Merge pull request #3959 from normanjaeckel/AutoupdateSplitUp

Split up autoupdate functions (sync and async).
This commit is contained in:
Oskar Hahn 2018-10-30 17:18:45 +01:00 committed by GitHub
commit f4cf9f30a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -39,7 +39,7 @@ def inform_changed_data(instances: Union[Iterable[Model], Model]) -> None:
bundle.update(collection_elements)
else:
# Send autoupdate directly
async_to_sync(send_autoupdate)(collection_elements.values())
handle_collection_elements(collection_elements.values())
def inform_deleted_data(elements: Iterable[Tuple[str, int]]) -> None:
@ -62,7 +62,7 @@ def inform_deleted_data(elements: Iterable[Tuple[str, int]]) -> None:
bundle.update(collection_elements)
else:
# Send autoupdate directly
async_to_sync(send_autoupdate)(collection_elements.values())
handle_collection_elements(collection_elements.values())
def inform_data_collection_element_list(collection_elements: List[CollectionElement]) -> None:
@ -81,7 +81,7 @@ def inform_data_collection_element_list(collection_elements: List[CollectionElem
bundle.update(elements)
else:
# Send autoupdate directly
async_to_sync(send_autoupdate)(elements.values())
handle_collection_elements(elements.values())
"""
@ -105,20 +105,23 @@ class AutoupdateBundleMiddleware:
response = self.get_response(request)
bundle: Dict[str, CollectionElement] = autoupdate_bundle.pop(thread_id)
async_to_sync(send_autoupdate)(bundle.values())
handle_collection_elements(bundle.values())
return response
async def send_autoupdate(collection_elements: Iterable[CollectionElement]) -> None:
def handle_collection_elements(collection_elements: Iterable[CollectionElement]) -> None:
"""
Helper function, that sends collection_elements through a channel to the
autoupdate system.
Also updates the redis cache.
autoupdate system and updates the cache.
Does nothing if collection_elements is empty.
"""
if collection_elements:
async def update_cache(collection_elements: Iterable[CollectionElement]) -> int:
"""
Async helper function to update the cache.
Returns the change_id
"""
cache_elements: Dict[str, Optional[Dict[str, Any]]] = {}
for element in collection_elements:
element_id = get_element_id(element.collection_string, element.id)
@ -126,9 +129,16 @@ async def send_autoupdate(collection_elements: Iterable[CollectionElement]) -> N
cache_elements[element_id] = None
else:
cache_elements[element_id] = element.get_full_data()
return await element_cache.change_elements(cache_elements)
change_id = await element_cache.change_elements(cache_elements)
async def async_handle_collection_elements(collection_elements: Iterable[CollectionElement]) -> None:
"""
Async helper function to update cache and send autoupdate.
"""
# Update cache
change_id = await update_cache(collection_elements)
# Send autoupdate
channel_layer = get_channel_layer()
await channel_layer.group_send(
"autoupdate",
@ -137,3 +147,9 @@ async def send_autoupdate(collection_elements: Iterable[CollectionElement]) -> N
"change_id": change_id,
},
)
if collection_elements:
# TODO: Save histroy here using sync code
# Update cache and send autoupdate
async_to_sync(async_handle_collection_elements)(collection_elements)