diff --git a/openslides/core/config.py b/openslides/core/config.py index 9d6989466..a1862d2f5 100644 --- a/openslides/core/config.py +++ b/openslides/core/config.py @@ -85,28 +85,23 @@ class ConfigHandler: before this is called. """ async with build_key_to_id_lock: - # Another cliend could have build the key_to_id_dict, check and return early + # Another worker could have build the key_to_id_dict, check and return early if self.key_to_id is not None: return - config_full_data = await element_cache.get_collection_data( - self.get_collection_string() - ) - elements = config_full_data.values() - self.key_to_id = {} - for element in elements: - self.key_to_id[element["key"]] = element["id"] + config_full_data = await element_cache.get_collection_data( + self.get_collection_string() + ) + elements = config_full_data.values() + self.key_to_id = {} + for element in elements: + self.key_to_id[element["key"]] = element["id"] def exists(self, key: str) -> bool: """ Returns True, if the config varialbe was defined. """ - try: - self.config_variables[key] - except KeyError: - return False - else: - return True + return key in self.config_variables # TODO: Remove the any by using right types in INPUT_TYPE_MAPPING def __setitem__(self, key: str, value: Any) -> None: diff --git a/openslides/utils/cache.py b/openslides/utils/cache.py index 92cc0385a..00054ca74 100644 --- a/openslides/utils/cache.py +++ b/openslides/utils/cache.py @@ -4,7 +4,7 @@ from datetime import datetime from time import sleep from typing import Any, Callable, Dict, List, Optional, Tuple, Type -from asgiref.sync import async_to_sync +from asgiref.sync import async_to_sync, sync_to_async from django.apps import apps from . import logging @@ -53,7 +53,7 @@ class ElementCache: There is one redis Hash (simular to python dict) for the full_data The key of the Hashes is COLLECTIONSTRING:ID where COLLECTIONSTRING is the - collection_string of a collection and id the id of an element. + collection of a collection and id the id of an element. There is an sorted set in redis with the change id as score. The values are COLLETIONSTRING:ID for the elements that have been changed with that change @@ -81,7 +81,7 @@ class ElementCache: @property def cachables(self) -> Dict[str, Cachable]: """ - Returns all cachables as a dict where the key is the collection_string of the cachable. + Returns all cachables as a dict where the key is the collection of the cachable. """ # This method is neccessary to lazy load the cachables if self._cachables is None: @@ -130,33 +130,10 @@ class ElementCache: lock_name = "build_cache" # Set a lock so only one process builds the cache if await locking.set(lock_name): - logger.info("Building up the cache data...") try: - mapping = {} - for collection_string, cachable in self.cachables.items(): - for element in cachable.get_elements(): - mapping.update( - { - get_element_id( - collection_string, element["id"] - ): json.dumps(element) - } - ) - logger.info("Done building the cache data.") - logger.info("Saving cache data into the cache...") - if default_change_id is None: - if self.default_change_id is not None: - default_change_id = self.default_change_id - else: - # Use the miliseconds (rounded) since the 2016-02-29. - default_change_id = int( - (datetime.utcnow() - datetime(2016, 2, 29)).total_seconds() - ) - default_change_id *= 1000 - await self.cache_provider.reset_full_cache(mapping, default_change_id) - if schema_version: - await self.cache_provider.set_schema_version(schema_version) - logger.info("Done saving the cache data.") + await self._build_cache( + default_change_id=default_change_id, schema_version=schema_version + ) finally: await locking.delete(lock_name) else: @@ -165,6 +142,67 @@ class ElementCache: sleep(0.01) logger.info("Cache is ready (built by another process).") + async def _build_cache( + self, + default_change_id: Optional[int] = None, + schema_version: Optional[SchemaVersion] = None, + ) -> None: + logger.info("Building config data and resetting cache...") + config_mapping = await sync_to_async( + self._build_cache_get_elementid_model_mapping + )(config_only=True) + change_id = self._build_cache_get_change_id(default_change_id) + await self.cache_provider.reset_full_cache(config_mapping, change_id) + if schema_version: + await self.cache_provider.set_schema_version(schema_version) + logger.info("Done building and resetting.") + + logger.info("Building up the cache data...") + mapping = await sync_to_async(self._build_cache_get_elementid_model_mapping)() + logger.info("Done building the cache data.") + logger.info("Saving cache data into the cache...") + await self.cache_provider.add_to_full_data(mapping) + logger.info("Done saving the cache data.") + + def _build_cache_get_elementid_model_mapping( + self, config_only: bool = False, + ) -> Dict[str, str]: + """ + Do NOT call this in an asynchronous context! + This accesses the django's model system which requires a synchronous context. + + config_only=True only includes the config collection + config_only=False *excludes* the config collection + """ + mapping = {} + config_collection = "core/config" + for collection, cachable in self.cachables.items(): + if (config_only and collection != config_collection) or ( + not config_only and collection == config_collection + ): + continue + for element in cachable.get_elements(): + mapping.update( + {get_element_id(collection, element["id"]): json.dumps(element)} + ) + return mapping + + def _build_cache_get_change_id( + self, default_change_id: Optional[int] = None + ) -> int: + if default_change_id is None: + if self.default_change_id is not None: + change_id = self.default_change_id + else: + # Use the miliseconds (rounded) since the 2016-02-29. + change_id = ( + int((datetime.utcnow() - datetime(2016, 2, 29)).total_seconds()) + * 1000 + ) + else: + change_id = default_change_id + return change_id + async def change_elements( self, elements: Dict[str, Optional[Dict[str, Any]]] ) -> int: @@ -203,19 +241,17 @@ class ElementCache: """ all_data: Dict[str, List[Dict[str, Any]]] = defaultdict(list) for element_id, data in (await self.cache_provider.get_all_data()).items(): - collection_string, _ = split_element_id(element_id) + collection, _ = split_element_id(element_id) element = json.loads(data.decode()) element.pop( "_no_delete_on_restriction", False ) # remove special field for get_data_since - all_data[collection_string].append(element) + all_data[collection].append(element) if user_id is not None: - for collection_string in all_data.keys(): - restricter = self.cachables[collection_string].restrict_elements - all_data[collection_string] = await restricter( - user_id, all_data[collection_string] - ) + for collection in all_data.keys(): + restricter = self.cachables[collection].restrict_elements + all_data[collection] = await restricter(user_id, all_data[collection]) return dict(all_data) async def get_all_data_dict(self) -> Dict[str, Dict[int, Dict[str, Any]]]: @@ -229,22 +265,20 @@ class ElementCache: """ all_data: Dict[str, Dict[int, Dict[str, Any]]] = defaultdict(dict) for element_id, data in (await self.cache_provider.get_all_data()).items(): - collection_string, id = split_element_id(element_id) + collection, id = split_element_id(element_id) element = json.loads(data.decode()) element.pop( "_no_delete_on_restriction", False ) # remove special field for get_data_since - all_data[collection_string][id] = element + all_data[collection][id] = element return dict(all_data) - async def get_collection_data( - self, collection_string: str - ) -> Dict[int, Dict[str, Any]]: + async def get_collection_data(self, collection: str) -> Dict[int, Dict[str, Any]]: """ Returns the data for one collection as dict: {id: } """ encoded_collection_data = await self.cache_provider.get_collection_data( - collection_string + collection ) collection_data = {} for id in encoded_collection_data.keys(): @@ -255,14 +289,14 @@ class ElementCache: return collection_data async def get_element_data( - self, collection_string: str, id: int, user_id: Optional[int] = None + self, collection: str, id: int, user_id: Optional[int] = None ) -> Optional[Dict[str, Any]]: """ Returns one element or None, if the element does not exist. If the user id is given the data will be restricted for this user. """ encoded_element = await self.cache_provider.get_element_data( - get_element_id(collection_string, id) + get_element_id(collection, id) ) if encoded_element is None: @@ -273,15 +307,13 @@ class ElementCache: ) # remove special field for get_data_since if user_id is not None: - element = await self.restrict_element_data( - element, collection_string, user_id - ) + element = await self.restrict_element_data(element, collection, user_id) return element async def restrict_element_data( - self, element: Dict[str, Any], collection_string: str, user_id: int + self, element: Dict[str, Any], collection: str, user_id: int ) -> Optional[Dict[str, Any]]: - restricter = self.cachables[collection_string].restrict_elements + restricter = self.cachables[collection].restrict_elements restricted_elements = await restricter(user_id, [element]) return restricted_elements[0] if restricted_elements else None @@ -294,7 +326,7 @@ class ElementCache: data will be restricted for this user. Returns two values inside a tuple. The first value is a dict where the - key is the collection_string and the value is a list of data. The second + key is the collection and the value is a list of data. The second is a list of element_ids with deleted elements. Only returns elements with the change_id or newer. When change_id is 0, @@ -325,8 +357,8 @@ class ElementCache: change_id, max_change_id=max_change_id ) changed_elements = { - collection_string: [json.loads(value.decode()) for value in value_list] - for collection_string, value_list in raw_changed_elements.items() + collection: [json.loads(value.decode()) for value in value_list] + for collection, value_list in raw_changed_elements.items() } if user_id is None: @@ -336,7 +368,7 @@ class ElementCache: else: # the list(...) is important, because `changed_elements` will be # altered during iteration and restricting data - for collection_string, elements in list(changed_elements.items()): + for collection, elements in list(changed_elements.items()): # Remove the _no_delete_on_restriction from each element. Collect all ids, where # this field is absent or False. unrestricted_ids = set() @@ -347,7 +379,7 @@ class ElementCache: if not no_delete_on_restriction: unrestricted_ids.add(element["id"]) - cacheable = self.cachables[collection_string] + cacheable = self.cachables[collection] restricted_elements = await cacheable.restrict_elements( user_id, elements ) @@ -361,12 +393,12 @@ class ElementCache: # Delete all ids, that are allowed to be deleted (see unrestricted_ids) and are # not present after restricting the data. for id in unrestricted_ids - restricted_element_ids: - deleted_elements.append(get_element_id(collection_string, id)) + deleted_elements.append(get_element_id(collection, id)) if not restricted_elements: - del changed_elements[collection_string] + del changed_elements[collection] else: - changed_elements[collection_string] = restricted_elements + changed_elements[collection] = restricted_elements return (changed_elements, deleted_elements) diff --git a/openslides/utils/cache_providers.py b/openslides/utils/cache_providers.py index 36accc1c0..1fb0cc60f 100644 --- a/openslides/utils/cache_providers.py +++ b/openslides/utils/cache_providers.py @@ -44,6 +44,9 @@ class ElementCacheProvider(Protocol): ) -> None: ... + async def add_to_full_data(self, data: Dict[str, str]) -> None: + ... + async def data_exists(self) -> bool: ... @@ -252,6 +255,10 @@ class RedisCacheProvider: ) await tr.execute() + async def add_to_full_data(self, data: Dict[str, str]) -> None: + async with get_connection() as redis: + redis.hmset_dict(self.full_data_cache_key, data) + async def data_exists(self) -> bool: """ Returns True, when there is data in the cache. @@ -331,7 +338,7 @@ class RedisCacheProvider: Returns all elements since a change_id (included) and until the max_change_id (included). The returend value is a two element tuple. The first value is a dict the elements where - the key is the collection_string and the value a list of (json-) encoded elements. The + the key is the collection and the value a list of (json-) encoded elements. The second element is a list of element_ids, that have been deleted since the change_id. """ changed_elements: Dict[str, List[bytes]] = defaultdict(list) @@ -361,8 +368,8 @@ class RedisCacheProvider: # The element is not in the cache. It has to be deleted. deleted_elements.append(element_id.decode()) else: - collection_string, id = split_element_id(element_id) - changed_elements[collection_string].append(element_json) + collection, id = split_element_id(element_id) + changed_elements[collection].append(element_json) return changed_elements, deleted_elements @ensure_cache_wrapper() @@ -501,6 +508,9 @@ class MemoryCacheProvider: self.full_data = data self.default_change_id = default_change_id + async def add_to_full_data(self, data: Dict[str, str]) -> None: + self.full_data.update(data) + async def data_exists(self) -> bool: return bool(self.full_data) and self.default_change_id >= 0 @@ -564,8 +574,8 @@ class MemoryCacheProvider: if element_json is None: deleted_elements.append(element_id) else: - collection_string, id = split_element_id(element_id) - changed_elements[collection_string].append(element_json.encode()) + collection, id = split_element_id(element_id) + changed_elements[collection].append(element_json.encode()) return changed_elements, deleted_elements async def get_current_change_id(self) -> int: diff --git a/openslides/utils/main.py b/openslides/utils/main.py index 8ec54eda2..37191f35f 100644 --- a/openslides/utils/main.py +++ b/openslides/utils/main.py @@ -351,11 +351,7 @@ def is_local_installation() -> bool: This is the case if manage.py is used, or when the --local-installation flag is set. """ - return ( - True - if "--local-installation" in sys.argv or "manage.py" in sys.argv[0] - else False - ) + return "--local-installation" in sys.argv or "manage.py" in sys.argv[0] def is_windows() -> bool: