diff --git a/openslides/utils/autoupdate.py b/openslides/utils/autoupdate.py index e21a505e8..a1923bff1 100644 --- a/openslides/utils/autoupdate.py +++ b/openslides/utils/autoupdate.py @@ -1,12 +1,13 @@ import json import time import warnings -from collections import Iterable, defaultdict -from typing import Any, Dict, Iterable, List, cast # noqa +from collections import defaultdict +from typing import Any, Dict, Generator, Iterable, List, Union, cast from channels import Channel, Group from channels.asgi import get_channel_layer from channels.auth import channel_session_user, channel_session_user_from_http +from django.apps import apps from django.core.exceptions import ObjectDoesNotExist from django.db import transaction from django.db.models import Model @@ -14,7 +15,7 @@ from django.db.models import Model from ..core.config import config from ..core.models import Projector from .auth import anonymous_is_enabled, has_perm, user_to_collection_user -from .cache import startup_cache, websocket_user_cache +from .cache import restricted_data_cache, websocket_user_cache from .collection import Collection, CollectionElement, CollectionElementList @@ -89,25 +90,37 @@ def ws_add_site(message: Any) -> None: # Collect all elements that shoud be send to the client when the websocket # connection is established. user = user_to_collection_user(message.user.id) - output = [] - for collection in startup_cache.get_collections(): - access_permissions = collection.get_access_permissions() - restricted_data = access_permissions.get_restricted_data(collection, user) + user_id = user.id if user is not None else 0 + if restricted_data_cache.exists_for_user(user_id): + output = restricted_data_cache.get_data(user_id) + else: + output = [] + for collection in get_startup_collections(): + access_permissions = collection.get_access_permissions() + restricted_data = access_permissions.get_restricted_data(collection, user) - # At this point restricted_data has to be a list. So we have to tell it mypy - restricted_data = cast(List[Dict[str, Any]], restricted_data) + # At this point restricted_data has to be a list. So we have to tell it mypy + restricted_data = cast(List[Dict[str, Any]], restricted_data) - for data in restricted_data: - if data is None: - # We do not want to send 'deleted' objects on startup. - # That's why we skip such data. - continue - output.append( - format_for_autoupdate( - collection_string=collection.collection_string, - id=int(data['id']), - action='changed', - data=data)) + for data in restricted_data: + if data is None: + # We do not want to send 'deleted' objects on startup. + # That's why we skip such data. + continue + + formatted_data = format_for_autoupdate( + collection_string=collection.collection_string, + id=data['id'], + action='changed', + data=data) + + output.append(formatted_data) + # Cache restricted data for user + restricted_data_cache.add_element( + user_id, + collection.collection_string, + data['id'], + formatted_data) # Send all data. if output: @@ -259,7 +272,23 @@ def send_data(message: Any) -> None: except ObjectDoesNotExist: # The user does not exist. Skip him/her. continue - output = collection_elements.as_autoupdate_for_user(user) + + output = [] + for collection_element in collection_elements: + formatted_data = collection_element.as_autoupdate_for_user(user) + if formatted_data['action'] == 'changed': + restricted_data_cache.add_element( + user_id or 0, + collection_element.collection_string, + collection_element.id, + formatted_data) + else: + restricted_data_cache.del_element( + user_id or 0, + collection_element.collection_string, + collection_element.id) + output.append(formatted_data) + for channel_name in channel_names: send_or_wait(Channel(channel_name).send, {'text': json.dumps(output)}) @@ -290,7 +319,7 @@ def send_data(message: Any) -> None: {'text': json.dumps(output)}) -def inform_changed_data(instances: Iterable[Model], information: Dict[str, Any]=None) -> None: +def inform_changed_data(instances: Union[Iterable[Model], Model], information: Dict[str, Any]=None) -> None: """ Informs the autoupdate system and the caching system about the creation or update of an element. @@ -299,8 +328,8 @@ def inform_changed_data(instances: Iterable[Model], information: Dict[str, Any]= """ root_instances = set() if not isinstance(instances, Iterable): - # Make sure instances is an iterable instances = (instances, ) + for instance in instances: try: root_instances.add(instance.get_root_rest_element()) @@ -375,13 +404,25 @@ def send_autoupdate(collection_elements: CollectionElementList) -> None: Helper function, that sends collection_elements through a channel to the autoupdate system. - Before sending the startup_cache is cleared because it is possibly out of - date. - Does nothing if collection_elements is empty. """ if collection_elements: - startup_cache.clear() send_or_wait( Channel('autoupdate.send_data').send, collection_elements.as_channels_message()) + + +def get_startup_collections() -> Generator[Collection, None, None]: + """ + Returns all Collections that should be send to the user at startup + """ + for app in apps.get_app_configs(): + try: + # Get the method get_startup_elements() from an app. + # This method has to return an iterable of Collection objects. + get_startup_elements = app.get_startup_elements + except AttributeError: + # Skip apps that do not implement get_startup_elements. + continue + + yield from get_startup_elements() diff --git a/openslides/utils/cache.py b/openslides/utils/cache.py index 822add703..04db952db 100644 --- a/openslides/utils/cache.py +++ b/openslides/utils/cache.py @@ -1,3 +1,4 @@ +import json from collections import defaultdict from typing import ( # noqa TYPE_CHECKING, @@ -9,11 +10,11 @@ from typing import ( # noqa List, Optional, Set, + Union, ) from channels import Group from channels.sessions import session_for_reply_channel -from django.apps import apps from django.core.cache import cache, caches if TYPE_CHECKING: @@ -202,60 +203,76 @@ class DjangoCacheWebsocketUserCache(BaseWebsocketUserCache): cache.set(self.get_cache_key(), data) -class StartupCache: +class RestrictedDataCache: """ - Cache of all data that are required when a client connects via websocket. + Caches all Data for a specific users. + + The cached values are expected to be formatted for outout via websocket. """ - cache_key = "full_data_startup_cache" - def build(self) -> Dict[str, List[str]]: + base_cache_key = 'restricted_user_cache' + + def add_element(self, user_id: int, collection_string: str, id: int, data: object) -> None: """ - Generate the cache by going through all apps. Returns a dict where the - key is the collection string and the value a list of the full_data from - the collection elements. + Adds one element to the cache. If the cache does not exists for the user, + it is created. """ - cache_data = {} # type: Dict[str, List[str]] - for app in apps.get_app_configs(): - try: - # Get the method get_startup_elements() from an app. - # This method has to return an iterable of Collection objects. - get_startup_elements = app.get_startup_elements # type: Callable[[], Iterable[Collection]] - except AttributeError: - # Skip apps that do not implement get_startup_elements. - continue + redis = get_redis_connection() + redis.hset( + self.get_cache_key(user_id), + "{}/{}".format(collection_string, id), + json.dumps(data)) - for collection in get_startup_elements(): - cache_data[collection.collection_string] = [ - collection_element.get_full_data() - for collection_element - in collection.element_generator()] - - cache.set(self.cache_key, cache_data, 86400) - return cache_data - - def clear(self) -> None: + def del_element(self, user_id: int, collection_string: str, id: int) -> None: """ - Clears the cache. + Removes one element from the cache. + + Does nothing if the cache does not exist. """ - cache.delete(self.cache_key) + redis = get_redis_connection() + redis.hdel( + self.get_cache_key(user_id), + "{}/{}".format(collection_string, id)) - def get_collections(self) -> Generator['Collection', None, None]: + def exists_for_user(self, user_id: int) -> bool: """ - Generator that returns all cached Collections. - - The data is read from the cache if it exists. It builds the cache if it - does not exists. + Returns True if the cache for the user exists, else False. """ - from .collection import Collection # noqa - data = cache.get(self.cache_key) - if data is None: - # The cache does not exist. - data = self.build() - for collection_string, full_data in data.items(): - yield Collection(collection_string, full_data) + redis = get_redis_connection() + return redis.exists(self.get_cache_key(user_id)) + + def get_data(self, user_id: int) -> List[object]: + """ + Returns all data for the user. + + The returned value is a list of the elements. + """ + redis = get_redis_connection() + return [json.loads(element) for element in redis.hvals(self.get_cache_key(user_id))] + + def get_cache_key(self, user_id: int) -> str: + """ + Returns the cache key for a user. + """ + return cache.make_key('{}:{}'.format(self.base_cache_key, user_id)) -startup_cache = StartupCache() +class DummyRestrictedDataCache: + """ + Dummy RestrictedDataCache that does nothing. + """ + + def add_element(self, user_id: int, collection_string: str, id: int, data: object) -> None: + pass + + def del_element(self, user_id: int, collection_string: str, id: int) -> None: + pass + + def exists_for_user(self, user_id: int) -> bool: + return False + + def get_data(self, user_id: int) -> List[object]: + pass def use_redis_cache() -> bool: @@ -279,5 +296,7 @@ def get_redis_connection() -> Any: if use_redis_cache(): websocket_user_cache = RedisWebsocketUserCache() # type: BaseWebsocketUserCache + restricted_data_cache = RestrictedDataCache() # type: Union[RestrictedDataCache, DummyRestrictedDataCache] else: websocket_user_cache = DjangoCacheWebsocketUserCache() + restricted_data_cache = DummyRestrictedDataCache()