diff --git a/openslides/utils/cache.py b/openslides/utils/cache.py index 64673b3c6..ce910d958 100644 --- a/openslides/utils/cache.py +++ b/openslides/utils/cache.py @@ -11,7 +11,7 @@ from django.apps import apps from .cache_providers import ( Cachable, ElementCacheProvider, - MemmoryCacheProvider, + MemoryCacheProvider, RedisCacheProvider, ) from .redis import use_redis @@ -71,19 +71,7 @@ class ElementCache: self.cache_provider = cache_provider_class(self.async_ensure_cache) self.cachable_provider = cachable_provider self._cachables: Optional[Dict[str, Cachable]] = None - self.set_default_change_id(default_change_id) - - def set_default_change_id(self, default_change_id: Optional[int] = None) -> None: - """ - Sets the default change id for the cache. Needs to update, if the cache gets generated. - """ - # The current time is used as the first change_id if there is non in redis - if default_change_id is None: - # Use the miliseconds (rounted) since the 2016-02-29. - default_change_id = ( - int((datetime.utcnow() - datetime(2016, 2, 29)).total_seconds()) * 1000 - ) - self.default_change_id = default_change_id + self.default_change_id: Optional[int] = default_change_id @property def cachables(self) -> Dict[str, Cachable]: @@ -151,8 +139,16 @@ class ElementCache: ) logger.info("Done building the cache data.") logger.info("Saving cache data into the cache...") - self.set_default_change_id(default_change_id=default_change_id) - await self.cache_provider.reset_full_cache(mapping) + if default_change_id is None: + if self.default_change_id is not None: + default_change_id = self.default_change_id + else: + # Use the miliseconds (rounded) since the 2016-02-29. + default_change_id = int( + (datetime.utcnow() - datetime(2016, 2, 29)).total_seconds() + ) + default_change_id *= 1000 + await self.cache_provider.reset_full_cache(mapping, default_change_id) if schema_version: await self.cache_provider.set_schema_version(schema_version) logger.info("Done saving the cache data.") @@ -187,7 +183,7 @@ class ElementCache: deleted_elements.append(element_id) return await self.cache_provider.add_changed_elements( - changed_elements, deleted_elements, self.default_change_id + 1 + changed_elements, deleted_elements ) async def get_all_data_list( @@ -331,8 +327,7 @@ class ElementCache: Returns default_change_id if there is no change id yet. """ - value = await self.cache_provider.get_current_change_id() - return value if value is not None else self.default_change_id + return await self.cache_provider.get_current_change_id() async def get_lowest_change_id(self) -> int: """ @@ -340,11 +335,7 @@ class ElementCache: Raises a RuntimeError if there is no change_id. """ - value = await self.cache_provider.get_lowest_change_id() - if not value: - raise RuntimeError("There is no known change_id.") - # Return the score (second element) of the first (and only) element - return value + return await self.cache_provider.get_lowest_change_id() def load_element_cache() -> ElementCache: @@ -354,7 +345,7 @@ def load_element_cache() -> ElementCache: if use_redis: cache_provider_class: Type[ElementCacheProvider] = RedisCacheProvider else: - cache_provider_class = MemmoryCacheProvider + cache_provider_class = MemoryCacheProvider return ElementCache(cache_provider_class=cache_provider_class) diff --git a/openslides/utils/cache_providers.py b/openslides/utils/cache_providers.py index c7a3fa35c..6a564d54e 100644 --- a/openslides/utils/cache_providers.py +++ b/openslides/utils/cache_providers.py @@ -39,7 +39,9 @@ class ElementCacheProvider(Protocol): async def clear_cache(self) -> None: ... - async def reset_full_cache(self, data: Dict[str, str]) -> None: + async def reset_full_cache( + self, data: Dict[str, str], default_change_id: int + ) -> None: ... async def data_exists(self) -> bool: @@ -55,10 +57,7 @@ class ElementCacheProvider(Protocol): ... async def add_changed_elements( - self, - changed_elements: List[str], - deleted_element_ids: List[str], - default_change_id: int, + self, changed_elements: List[str], deleted_element_ids: List[str] ) -> int: ... @@ -76,10 +75,10 @@ class ElementCacheProvider(Protocol): async def del_lock(self, lock_name: str) -> None: ... - async def get_current_change_id(self) -> Optional[int]: + async def get_current_change_id(self) -> int: ... - async def get_lowest_change_id(self) -> Optional[int]: + async def get_lowest_change_id(self) -> int: ... async def get_schema_version(self) -> Optional[SchemaVersion]: @@ -127,7 +126,6 @@ class RedisCacheProvider: full_data_cache_key: str = "full_data" change_id_cache_key: str = "change_id" schema_cache_key: str = "schema" - prefix: str = "element_cache_" # All lua-scripts used by this provider. Every entry is a Tuple (str, bool) with the # script and an ensure_cache-indicator. If the indicator is True, a short ensure_cache-script @@ -164,36 +162,33 @@ class RedisCacheProvider: local change_id if next(tmp) == nil then -- The key does not exist - change_id = ARGV[1] + return redis.error_reply("cache_reset") else change_id = tmp[2] + 1 end - local nc = tonumber(ARGV[2]) - local nd = tonumber(ARGV[3]) + local nc = tonumber(ARGV[1]) + local nd = tonumber(ARGV[2]) local i, max -- Add changed_elements to the cache and sorted set (the first of the pairs) if (nc > 0) then - max = 2 + nc - redis.call('hmset', KEYS[1], unpack(ARGV, 4, max + 1)) - for i = 4, max, 2 do + max = 1 + nc + redis.call('hmset', KEYS[1], unpack(ARGV, 3, max + 1)) + for i = 3, max, 2 do redis.call('zadd', KEYS[2], change_id, ARGV[i]) end end -- Delete deleted_element_ids and add them to sorted set if (nd > 0) then - max = 3 + nc + nd - redis.call('hdel', KEYS[1], unpack(ARGV, 4 + nc, max)) - for i = 4 + nc, max, 1 do + max = 2 + nc + nd + redis.call('hdel', KEYS[1], unpack(ARGV, 3 + nc, max)) + for i = 3 + nc, max, 1 do redis.call('zadd', KEYS[2], change_id, ARGV[i]) end end - -- Set lowest_change_id if it does not exist - redis.call('zadd', KEYS[2], 'NX', change_id, '_config:lowest_change_id') - return change_id """, True, @@ -244,31 +239,27 @@ class RedisCacheProvider: async def ensure_cache(self) -> None: await self._ensure_cache() - def get_full_data_cache_key(self) -> str: - return "".join((self.prefix, self.full_data_cache_key)) - - def get_change_id_cache_key(self) -> str: - return "".join((self.prefix, self.change_id_cache_key)) - - def get_schema_cache_key(self) -> str: - return "".join((self.prefix, self.schema_cache_key)) - async def clear_cache(self) -> None: """ Deleted all cache entries created with this element cache. """ - await self.eval("clear_cache", keys=[], args=[f"{self.prefix}*"]) + await self.eval("clear_cache", keys=[], args=["*"]) - async def reset_full_cache(self, data: Dict[str, str]) -> None: + async def reset_full_cache( + self, data: Dict[str, str], default_change_id: int + ) -> None: """ Deletes the full_data_cache and write new data in it. Clears the change id key. Does not clear locks. """ async with get_connection() as redis: tr = redis.multi_exec() - tr.delete(self.get_change_id_cache_key()) - tr.delete(self.get_full_data_cache_key()) - tr.hmset_dict(self.get_full_data_cache_key(), data) + tr.delete(self.change_id_cache_key) + tr.delete(self.full_data_cache_key) + tr.hmset_dict(self.full_data_cache_key, data) + tr.zadd( + self.change_id_cache_key, default_change_id, "_config:lowest_change_id" + ) await tr.execute() async def data_exists(self) -> bool: @@ -276,7 +267,11 @@ class RedisCacheProvider: Returns True, when there is data in the cache. """ async with get_connection() as redis: - return await redis.exists(self.get_full_data_cache_key()) + return await redis.exists(self.full_data_cache_key) and bool( + await redis.zrangebyscore( + self.change_id_cache_key, withscores=True, count=1, offset=0 + ) + ) @ensure_cache_wrapper() async def get_all_data(self) -> Dict[bytes, bytes]: @@ -284,7 +279,7 @@ class RedisCacheProvider: Returns all data from the full_data_cache in a mapping from element_id to the element. """ return await aioredis.util.wait_make_dict( - self.eval("get_all_data", [self.get_full_data_cache_key()]) + self.eval("get_all_data", [self.full_data_cache_key]) ) @ensure_cache_wrapper() @@ -294,7 +289,7 @@ class RedisCacheProvider: from element_id to the element. """ response = await self.eval( - "get_collection_data", [self.get_full_data_cache_key()], [f"{collection}:*"] + "get_collection_data", [self.full_data_cache_key], [f"{collection}:*"] ) collection_data = {} @@ -310,15 +305,12 @@ class RedisCacheProvider: Returns one element from the cache. Returns None, when the element does not exist. """ return await self.eval( - "get_element_data", [self.get_full_data_cache_key()], [element_id] + "get_element_data", [self.full_data_cache_key], [element_id] ) @ensure_cache_wrapper() async def add_changed_elements( - self, - changed_elements: List[str], - deleted_element_ids: List[str], - default_change_id: int, + self, changed_elements: List[str], deleted_element_ids: List[str] ) -> int: """ Modified the full_data_cache to insert the changed_elements and removes the @@ -329,9 +321,8 @@ class RedisCacheProvider: return int( await self.eval( "add_changed_elements", - keys=[self.get_full_data_cache_key(), self.get_change_id_cache_key()], + keys=[self.full_data_cache_key, self.change_id_cache_key], args=[ - default_change_id, len(changed_elements), len(deleted_element_ids), *(changed_elements + deleted_element_ids), @@ -363,7 +354,7 @@ class RedisCacheProvider: elements: Dict[bytes, Optional[bytes]] = await aioredis.util.wait_make_dict( self.eval( "get_data_since", - keys=[self.get_full_data_cache_key(), self.get_change_id_cache_key()], + keys=[self.full_data_cache_key, self.change_id_cache_key], args=[change_id, redis_max_change_id], ) ) @@ -388,48 +379,53 @@ class RedisCacheProvider: """ # TODO: Improve lock. See: https://redis.io/topics/distlock async with get_connection() as redis: - return await redis.setnx(f"{self.prefix}lock_{lock_name}", 1) + return await redis.setnx(f"lock_{lock_name}", 1) async def get_lock(self, lock_name: str) -> bool: """ Returns True, when the lock is set. Else False. """ async with get_connection() as redis: - return await redis.get(f"{self.prefix}lock_{lock_name}") + return await redis.get(f"lock_{lock_name}") async def del_lock(self, lock_name: str) -> None: """ Deletes the lock. Does nothing when the lock is not set. """ async with get_connection() as redis: - await redis.delete(f"{self.prefix}lock_{lock_name}") + await redis.delete(f"lock_{lock_name}") - async def get_current_change_id(self) -> Optional[int]: + @ensure_cache_wrapper() + async def get_current_change_id(self) -> int: """ Get the highest change_id from redis. """ async with get_connection() as redis: value = await redis.zrevrangebyscore( - self.get_change_id_cache_key(), withscores=True, count=1, offset=0 + self.change_id_cache_key, withscores=True, count=1, offset=0 ) - # Return the score (second element) of the first (and only) element, if exists. - return value[0][1] if value else None + # Return the score (second element) of the first (and only) element, if exists. + if not value: + raise CacheReset() + return value[0][1] - async def get_lowest_change_id(self) -> Optional[int]: + @ensure_cache_wrapper() + async def get_lowest_change_id(self) -> int: """ Get the lowest change_id from redis. - - Returns None if lowest score does not exist. """ async with get_connection() as redis: - return await redis.zscore( - self.get_change_id_cache_key(), "_config:lowest_change_id" + value = await redis.zscore( + self.change_id_cache_key, "_config:lowest_change_id" ) + if not value: + raise CacheReset() + return value async def get_schema_version(self) -> Optional[SchemaVersion]: """ Retrieves the schema version of the cache or None, if not existent """ async with get_connection() as redis: - schema_version = await redis.hgetall(self.get_schema_cache_key()) + schema_version = await redis.hgetall(self.schema_cache_key) if not schema_version: return None @@ -442,7 +438,7 @@ class RedisCacheProvider: async def set_schema_version(self, schema_version: SchemaVersion) -> None: """ Sets the schema version for this cache. """ async with get_connection() as redis: - await redis.hmset_dict(self.get_schema_cache_key(), schema_version) + await redis.hmset_dict(self.schema_cache_key, schema_version) async def eval( self, script_name: str, keys: List[str] = [], args: List[Any] = [] @@ -458,10 +454,7 @@ class RedisCacheProvider: python, if the lua-script returns a "cache_reset" string as an error response. """ hash = self._script_hashes[script_name] - if ( - self.scripts[script_name][1] - and not keys[0] == self.get_full_data_cache_key() - ): + if self.scripts[script_name][1] and not keys[0] == self.full_data_cache_key: raise ImproperlyConfigured( "A script with a ensure_cache prefix must have the full_data cache key as its first key" ) @@ -490,7 +483,7 @@ class RedisCacheProvider: raise e -class MemmoryCacheProvider: +class MemoryCacheProvider: """ CacheProvider for the ElementCache that uses only the memory. @@ -510,6 +503,7 @@ class MemmoryCacheProvider: self.full_data: Dict[str, str] = {} self.change_id_data: Dict[int, Set[str]] = {} self.locks: Dict[str, str] = {} + self.default_change_id: int = -1 async def ensure_cache(self) -> None: pass @@ -517,12 +511,15 @@ class MemmoryCacheProvider: async def clear_cache(self) -> None: self.set_data_dicts() - async def reset_full_cache(self, data: Dict[str, str]) -> None: + async def reset_full_cache( + self, data: Dict[str, str], default_change_id: int + ) -> None: self.change_id_data = {} self.full_data = data + self.default_change_id = default_change_id async def data_exists(self) -> bool: - return bool(self.full_data) + return bool(self.full_data) and self.default_change_id >= 0 async def get_all_data(self) -> Dict[bytes, bytes]: return str_dict_to_bytes(self.full_data) @@ -541,16 +538,9 @@ class MemmoryCacheProvider: return value.encode() if value is not None else None async def add_changed_elements( - self, - changed_elements: List[str], - deleted_element_ids: List[str], - default_change_id: int, + self, changed_elements: List[str], deleted_element_ids: List[str] ) -> int: - current_change_id = await self.get_current_change_id() - if current_change_id is None: - change_id = default_change_id - else: - change_id = current_change_id + 1 + change_id = await self.get_current_change_id() + 1 for i in range(0, len(changed_elements), 2): element_id = changed_elements[i] @@ -610,17 +600,14 @@ class MemmoryCacheProvider: except KeyError: pass - async def get_current_change_id(self) -> Optional[int]: - change_data = self.change_id_data - if change_data: - return max(change_data.keys()) - return None + async def get_current_change_id(self) -> int: + if self.change_id_data: + return max(self.change_id_data.keys()) + else: + return await self.get_lowest_change_id() - async def get_lowest_change_id(self) -> Optional[int]: - change_data = self.change_id_data - if change_data: - return min(change_data.keys()) - return None + async def get_lowest_change_id(self) -> int: + return self.default_change_id async def get_schema_version(self) -> Optional[SchemaVersion]: return None diff --git a/tests/conftest.py b/tests/conftest.py index ddd7ceb3a..56041dd25 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import os +from typing import cast import pytest from asgiref.sync import async_to_sync @@ -7,6 +8,7 @@ from pytest_django.django_compat import is_django_unittest from pytest_django.plugin import validate_django_db from openslides.utils.cache import element_cache +from openslides.utils.cache_providers import MemoryCacheProvider # Set an environment variable to stop the startup command @@ -83,4 +85,4 @@ def reset_cache(request): element_cache.ensure_cache(reset=True) # Set constant default change_id - element_cache.set_default_change_id(1) + cast(MemoryCacheProvider, element_cache.cache_provider).default_change_id = 1 diff --git a/tests/integration/utils/test_consumers.py b/tests/integration/utils/test_consumers.py index 09d66da63..6697073c8 100644 --- a/tests/integration/utils/test_consumers.py +++ b/tests/integration/utils/test_consumers.py @@ -39,7 +39,7 @@ async def prepare_element_cache(settings): [Collection1(), Collection2(), TConfig(), TUser(), TProjector()] ) element_cache._cachables = None - await element_cache.async_ensure_cache(default_change_id=1) + await element_cache.async_ensure_cache(default_change_id=2) yield # Reset the cachable_provider element_cache.cachable_provider = orig_cachable_provider @@ -332,7 +332,7 @@ async def test_send_get_elements_too_big_change_id(communicator, set_config): @pytest.mark.asyncio -async def test_send_get_elements_to_small_change_id(communicator, set_config): +async def test_send_get_elements_too_small_change_id(communicator, set_config): await set_config("general_system_enable_anonymous", True) await communicator.connect() diff --git a/tests/unit/utils/cache_provider.py b/tests/unit/utils/cache_provider.py index 96619e015..27387ef84 100644 --- a/tests/unit/utils/cache_provider.py +++ b/tests/unit/utils/cache_provider.py @@ -1,7 +1,7 @@ import asyncio from typing import Any, Callable, Dict, List -from openslides.utils.cache_providers import Cachable, MemmoryCacheProvider +from openslides.utils.cache_providers import Cachable, MemoryCacheProvider def restrict_elements(elements: List[Dict[str, Any]]) -> List[Dict[str, Any]]: @@ -62,9 +62,9 @@ def example_data(): } -class TTestCacheProvider(MemmoryCacheProvider): +class TTestCacheProvider(MemoryCacheProvider): """ - CacheProvider simular to the MemmoryCacheProvider with special methods for + CacheProvider simular to the MemoryCacheProvider with special methods for testing. """ diff --git a/tests/unit/utils/test_cache.py b/tests/unit/utils/test_cache.py index 59876c588..da6639e2c 100644 --- a/tests/unit/utils/test_cache.py +++ b/tests/unit/utils/test_cache.py @@ -34,7 +34,6 @@ def element_cache(): default_change_id=0, ) element_cache.ensure_cache() - element_cache.set_default_change_id(0) return element_cache @@ -148,13 +147,14 @@ async def test_get_data_since_change_id_0(element_cache): @pytest.mark.asyncio -async def test_get_data_since_change_id_lower_then_in_redis(element_cache): +async def test_get_data_since_change_id_lower_than_in_redis(element_cache): element_cache.cache_provider.full_data = { "app/collection1:1": '{"id": 1, "value": "value1"}', "app/collection1:2": '{"id": 2, "value": "value2"}', "app/collection2:1": '{"id": 1, "key": "value1"}', "app/collection2:2": '{"id": 2, "key": "value2"}', } + element_cache.cache_provider.default_change_id = 2 element_cache.cache_provider.change_id_data = {2: {"app/collection1:1"}} with pytest.raises(RuntimeError): await element_cache.get_data_since(None, 1) @@ -196,8 +196,9 @@ async def test_get_data_since_change_id_data_in_db(element_cache): @pytest.mark.asyncio async def test_get_gata_since_change_id_data_in_db_empty_change_id(element_cache): - with pytest.raises(RuntimeError): - await element_cache.get_data_since(None, 1) + result = await element_cache.get_data_since(None, 1) + + assert result == ({}, []) @pytest.mark.asyncio @@ -279,8 +280,8 @@ async def test_get_restricted_data_2(element_cache): @pytest.mark.asyncio -async def test_get_restricted_data_change_id_lower_then_in_redis(element_cache): - element_cache.cache_provider.change_id_data = {2: {"app/collection1:1"}} +async def test_get_restricted_data_change_id_lower_than_in_redis(element_cache): + element_cache.cache_provider.default_change_id = 2 with pytest.raises(RuntimeError): await element_cache.get_data_since(0, 1) @@ -310,5 +311,5 @@ async def test_lowest_change_id_after_updating_lowest_element(element_cache): ) second_lowest_change_id = await element_cache.get_lowest_change_id() - assert first_lowest_change_id == 1 - assert second_lowest_change_id == 1 # The lowest_change_id should not change + assert first_lowest_change_id == 0 + assert second_lowest_change_id == 0 # The lowest_change_id should not change