Split up autoupdate functions (sync and async).
This commit is contained in:
parent
eeb29d140b
commit
30c0773838
@ -39,7 +39,7 @@ def inform_changed_data(instances: Union[Iterable[Model], Model]) -> None:
|
||||
bundle.update(collection_elements)
|
||||
else:
|
||||
# Send autoupdate directly
|
||||
async_to_sync(send_autoupdate)(collection_elements.values())
|
||||
handle_collection_elements(collection_elements.values())
|
||||
|
||||
|
||||
def inform_deleted_data(elements: Iterable[Tuple[str, int]]) -> None:
|
||||
@ -62,7 +62,7 @@ def inform_deleted_data(elements: Iterable[Tuple[str, int]]) -> None:
|
||||
bundle.update(collection_elements)
|
||||
else:
|
||||
# Send autoupdate directly
|
||||
async_to_sync(send_autoupdate)(collection_elements.values())
|
||||
handle_collection_elements(collection_elements.values())
|
||||
|
||||
|
||||
def inform_data_collection_element_list(collection_elements: List[CollectionElement]) -> None:
|
||||
@ -81,7 +81,7 @@ def inform_data_collection_element_list(collection_elements: List[CollectionElem
|
||||
bundle.update(elements)
|
||||
else:
|
||||
# Send autoupdate directly
|
||||
async_to_sync(send_autoupdate)(elements.values())
|
||||
handle_collection_elements(elements.values())
|
||||
|
||||
|
||||
"""
|
||||
@ -105,20 +105,23 @@ class AutoupdateBundleMiddleware:
|
||||
response = self.get_response(request)
|
||||
|
||||
bundle: Dict[str, CollectionElement] = autoupdate_bundle.pop(thread_id)
|
||||
async_to_sync(send_autoupdate)(bundle.values())
|
||||
handle_collection_elements(bundle.values())
|
||||
return response
|
||||
|
||||
|
||||
async def send_autoupdate(collection_elements: Iterable[CollectionElement]) -> None:
|
||||
def handle_collection_elements(collection_elements: Iterable[CollectionElement]) -> None:
|
||||
"""
|
||||
Helper function, that sends collection_elements through a channel to the
|
||||
autoupdate system.
|
||||
|
||||
Also updates the redis cache.
|
||||
autoupdate system and updates the cache.
|
||||
|
||||
Does nothing if collection_elements is empty.
|
||||
"""
|
||||
if collection_elements:
|
||||
async def update_cache(collection_elements: Iterable[CollectionElement]) -> int:
|
||||
"""
|
||||
Async helper function to update the cache.
|
||||
|
||||
Returns the change_id
|
||||
"""
|
||||
cache_elements: Dict[str, Optional[Dict[str, Any]]] = {}
|
||||
for element in collection_elements:
|
||||
element_id = get_element_id(element.collection_string, element.id)
|
||||
@ -126,9 +129,16 @@ async def send_autoupdate(collection_elements: Iterable[CollectionElement]) -> N
|
||||
cache_elements[element_id] = None
|
||||
else:
|
||||
cache_elements[element_id] = element.get_full_data()
|
||||
return await element_cache.change_elements(cache_elements)
|
||||
|
||||
change_id = await element_cache.change_elements(cache_elements)
|
||||
async def async_handle_collection_elements(collection_elements: Iterable[CollectionElement]) -> None:
|
||||
"""
|
||||
Async helper function to update cache and send autoupdate.
|
||||
"""
|
||||
# Update cache
|
||||
change_id = await update_cache(collection_elements)
|
||||
|
||||
# Send autoupdate
|
||||
channel_layer = get_channel_layer()
|
||||
await channel_layer.group_send(
|
||||
"autoupdate",
|
||||
@ -137,3 +147,9 @@ async def send_autoupdate(collection_elements: Iterable[CollectionElement]) -> N
|
||||
"change_id": change_id,
|
||||
},
|
||||
)
|
||||
|
||||
if collection_elements:
|
||||
# TODO: Save histroy here using sync code
|
||||
|
||||
# Update cache and send autoupdate
|
||||
async_to_sync(async_handle_collection_elements)(collection_elements)
|
||||
|
Loading…
Reference in New Issue
Block a user