import itertools import threading from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from asgiref.sync import async_to_sync from channels.layers import get_channel_layer from django.db.models import Model from mypy_extensions import TypedDict from .cache import element_cache, get_element_id Element = TypedDict( "Element", { "id": int, "collection_string": str, "full_data": Optional[Dict[str, Any]], "information": str, "user_id": Optional[int], "disable_history": bool, }, ) AutoupdateFormat = TypedDict( "AutoupdateFormat", { "changed": Dict[str, List[Dict[str, Any]]], "deleted": Dict[str, List[int]], "from_change_id": int, "to_change_id": int, "all_data": bool, }, ) def inform_changed_data( instances: Union[Iterable[Model], Model], information: str = "", user_id: Optional[int] = None, ) -> None: """ Informs the autoupdate system and the caching system about the creation or update of an element. The argument instances can be one instance or an iterable over instances. History creation is enabled. """ root_instances = set() if not isinstance(instances, Iterable): instances = (instances,) for instance in instances: try: root_instances.add(instance.get_root_rest_element()) except AttributeError: # Instance has no method get_root_rest_element. Just ignore it. pass elements: Dict[str, Element] = {} for root_instance in root_instances: key = root_instance.get_collection_string() + str(root_instance.get_rest_pk()) elements[key] = Element( id=root_instance.get_rest_pk(), collection_string=root_instance.get_collection_string(), full_data=root_instance.get_full_data(), information=information, user_id=user_id, disable_history=False, ) bundle = autoupdate_bundle.get(threading.get_ident()) if bundle is not None: # Put all elements into the autoupdate_bundle. bundle.update(elements) else: # Send autoupdate directly handle_changed_elements(elements.values()) def inform_deleted_data( deleted_elements: Iterable[Tuple[str, int]], information: str = "", user_id: Optional[int] = None, ) -> None: """ Informs the autoupdate system and the caching system about the deletion of elements. History creation is enabled. """ elements: Dict[str, Element] = {} for deleted_element in deleted_elements: key = deleted_element[0] + str(deleted_element[1]) elements[key] = Element( id=deleted_element[1], collection_string=deleted_element[0], full_data=None, information=information, user_id=user_id, disable_history=False, ) bundle = autoupdate_bundle.get(threading.get_ident()) if bundle is not None: # Put all elements into the autoupdate_bundle. bundle.update(elements) else: # Send autoupdate directly handle_changed_elements(elements.values()) def inform_changed_elements(changed_elements: Iterable[Element]) -> None: """ Informs the autoupdate system about some elements. This is used just to send some data to all users. If you want to save history information, user id or disable history you have to put information or flag inside the elements. """ elements = {} for changed_element in changed_elements: key = changed_element["collection_string"] + str(changed_element["id"]) elements[key] = changed_element bundle = autoupdate_bundle.get(threading.get_ident()) if bundle is not None: # Put all collection elements into the autoupdate_bundle. bundle.update(elements) else: # Send autoupdate directly handle_changed_elements(elements.values()) """ Global container for autoupdate bundles """ autoupdate_bundle: Dict[int, Dict[str, Element]] = {} class AutoupdateBundleMiddleware: """ Middleware to handle autoupdate bundling. """ def __init__(self, get_response: Any) -> None: self.get_response = get_response # One-time configuration and initialization. def __call__(self, request: Any) -> Any: thread_id = threading.get_ident() autoupdate_bundle[thread_id] = {} response = self.get_response(request) bundle: Dict[str, Element] = autoupdate_bundle.pop(thread_id) handle_changed_elements(bundle.values()) return response def handle_changed_elements(elements: Iterable[Element]) -> None: """ Helper function, that sends elements through a channel to the autoupdate system and updates the cache. Does nothing if elements is empty. """ async def update_cache(elements: Iterable[Element]) -> int: """ Async helper function to update the cache. Returns the change_id """ cache_elements: Dict[str, Optional[Dict[str, Any]]] = {} for element in elements: element_id = get_element_id(element["collection_string"], element["id"]) cache_elements[element_id] = element["full_data"] return await element_cache.change_elements(cache_elements) async def async_handle_collection_elements(elements: Iterable[Element]) -> None: """ Async helper function to update cache and send autoupdate. """ # Update cache change_id = await update_cache(elements) # Send autoupdate channel_layer = get_channel_layer() await channel_layer.group_send( "autoupdate", {"type": "send_data", "change_id": change_id} ) if elements: # Save histroy here using sync code. history_instances = save_history(elements) # Convert history instances to Elements. history_elements: List[Element] = [] for history_instance in history_instances: history_elements.append( Element( id=history_instance.get_rest_pk(), collection_string=history_instance.get_collection_string(), full_data=history_instance.get_full_data(), information="", user_id=None, disable_history=True, # This does not matter because history elements can never be part of the history itself. ) ) # Chain elements and history elements. itertools.chain(elements, history_elements) # Update cache and send autoupdate using async code. async_to_sync(async_handle_collection_elements)( itertools.chain(elements, history_elements) ) def save_history( elements: Iterable[Element] ) -> Iterable: # TODO: Try to write Iterable[History] here """ Thin wrapper around the call of history saving manager method. This is separated to patch it during tests. """ from ..core.models import History return History.objects.add_elements(elements)