diff --git a/.travis.yml b/.travis.yml
index 249826f5e..fe3e3712a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,7 +18,7 @@ matrix:
       - flake8 openslides tests
       - isort --check-only --diff --recursive openslides tests
       - python -m mypy openslides/
-      - pytest tests/old/ tests/integration/ tests/unit/ --cov --cov-fail-under=76
+      - pytest tests/old/ tests/integration/ tests/unit/ --cov --cov-fail-under=75
   - language: python
     cache:
@@ -35,7 +35,7 @@ matrix:
       - flake8 openslides tests
       - isort --check-only --diff --recursive openslides tests
       - python -m mypy openslides/
-      - pytest tests/old/ tests/integration/ tests/unit/ --cov --cov-fail-under=76
+      - pytest tests/old/ tests/integration/ tests/unit/ --cov --cov-fail-under=75
   - language: node_js
     node_js:
diff --git a/openslides/utils/cache.py b/openslides/utils/cache.py
index ce25e2e5f..f42df338b 100644
--- a/openslides/utils/cache.py
+++ b/openslides/utils/cache.py
@@ -77,7 +77,8 @@ class ElementCache:
         # Start time is used as first change_id if there is none in redis
         if start_time is None:
-            start_time = int((datetime.utcnow() - datetime(1970, 1, 1)).total_seconds())
+            # Use the milliseconds since 2016-02-29 (seconds rounded down, then scaled by 1000).
+            start_time = int((datetime.utcnow() - datetime(2016, 2, 29)).total_seconds()) * 1000
         self.start_time = start_time
 
         # Contains Futures to control that only one client updates the restricted_data.
@@ -151,13 +152,7 @@ class ElementCache:
         if deleted_elements:
             await self.cache_provider.del_elements(deleted_elements)
 
-        # TODO: The provider has to define the new change_id with lua. In other
-        #       case it is possible, that two changes get the same id (which
-        #       would not be a big problem).
-        change_id = await self.get_next_change_id()
-
-        await self.cache_provider.add_changed_elements(change_id, elements.keys())
-        return change_id
+        return await self.cache_provider.add_changed_elements(self.start_time + 1, elements.keys())
 
     async def get_all_full_data(self) -> Dict[str, List[Dict[str, Any]]]:
         """
@@ -174,9 +169,10 @@
         return dict(out)
 
     async def get_full_data(
-            self, change_id: int = 0) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
+            self, change_id: int = 0, max_change_id: int = -1) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
         """
-        Returns all full_data since change_id. If it does not exist, it is created.
+        Returns all full_data since change_id up to and including max_change_id.
+        A max_change_id of -1 means the highest change_id.
 
         Returns two values inside a tuple. The first value is a dict where the
         key is the collection_string and the value is a list of data. The second
@@ -202,7 +198,7 @@
                 "Catch this exception and rerun the method with change_id=0."
                 .format(change_id, lowest_change_id))
 
-        raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id)
+        raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id, max_change_id=max_change_id)
         return (
             {collection_string: [json.loads(value.decode()) for value in value_list]
              for collection_string, value_list in raw_changed_elements.items()},
@@ -323,7 +319,8 @@ class ElementCache:
     async def get_restricted_data(
             self,
             user: Optional['CollectionElement'],
-            change_id: int = 0) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
+            change_id: int = 0,
+            max_change_id: int = -1) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
         """
         Like get_full_data but with restricted_data for a user.
""" @@ -332,7 +329,7 @@ class ElementCache: return (await self.get_all_restricted_data(user), []) if not self.use_restricted_data_cache: - changed_elements, deleted_elements = await self.get_full_data(change_id) + changed_elements, deleted_elements = await self.get_full_data(change_id, max_change_id) restricted_data = {} for collection_string, full_data in changed_elements.items(): restricter = self.cachables[collection_string].restrict_elements @@ -353,7 +350,7 @@ class ElementCache: # data, this waits until it is done. await self.update_restricted_data(user) - raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id, get_user_id(user)) + raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id, get_user_id(user), max_change_id) return ( {collection_string: [json.loads(value.decode()) for value in value_list] for collection_string, value_list in raw_changed_elements.items()}, @@ -371,15 +368,6 @@ class ElementCache: # Return the score (second element) of the first (and only) element return value[0][1] - async def get_next_change_id(self) -> int: - """ - Returns the next change_id. - - Returns the start time in seconds + 1, if there is no change_id in yet. - """ - current_id = await self.get_current_change_id() - return current_id + 1 - async def get_lowest_change_id(self) -> int: """ Returns the lowest change id. diff --git a/openslides/utils/cache_providers.py b/openslides/utils/cache_providers.py index 5aba38741..c82a60675 100644 --- a/openslides/utils/cache_providers.py +++ b/openslides/utils/cache_providers.py @@ -3,13 +3,11 @@ from typing import ( TYPE_CHECKING, Any, Dict, - Generator, Iterable, List, Optional, Set, Tuple, - Union, ) from django.apps import apps @@ -35,22 +33,22 @@ class BaseCacheProvider: See RedisCacheProvider as reverence implementation. 
""" - full_data_cache_key = 'full_data_cache' - restricted_user_cache_key = 'restricted_data_cache:{user_id}' - change_id_cache_key = 'change_id_cache' - lock_key = '_config:updating' + full_data_cache_key = 'full_data' + restricted_user_cache_key = 'restricted_data:{user_id}' + change_id_cache_key = 'change_id' + prefix = 'element_cache_' def __init__(self, *args: Any) -> None: pass def get_full_data_cache_key(self) -> str: - return self.full_data_cache_key + return "".join((self.prefix, self.full_data_cache_key)) def get_restricted_data_cache_key(self, user_id: int) -> str: - return self.restricted_user_cache_key.format(user_id=user_id) + return "".join((self.prefix, self.restricted_user_cache_key.format(user_id=user_id))) def get_change_id_cache_key(self) -> str: - return self.change_id_cache_key + return "".join((self.prefix, self.change_id_cache_key)) async def clear_cache(self) -> None: raise NotImplementedError("CacheProvider has to implement the method clear_cache().") @@ -67,13 +65,17 @@ class BaseCacheProvider: async def del_elements(self, elements: List[str], user_id: Optional[int] = None) -> None: raise NotImplementedError("CacheProvider has to implement the method del_elements().") - async def add_changed_elements(self, change_id: int, element_ids: Iterable[str]) -> None: + async def add_changed_elements(self, default_change_id: int, element_ids: Iterable[str]) -> int: raise NotImplementedError("CacheProvider has to implement the method add_changed_elements().") async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]: raise NotImplementedError("CacheProvider has to implement the method get_all_data().") - async def get_data_since(self, change_id: int, user_id: Optional[int] = None) -> Tuple[Dict[str, List[bytes]], List[str]]: + async def get_data_since( + self, + change_id: int, + user_id: Optional[int] = None, + max_change_id: int = -1) -> Tuple[Dict[str, List[bytes]], List[str]]: raise NotImplementedError("CacheProvider has to implement the method get_data_since().") async def get_element(self, element_id: str) -> Optional[bytes]: @@ -141,17 +143,17 @@ class RedisCacheProvider(BaseCacheProvider): Deleted all cache entries created with this element cache. """ async with self.get_connection() as redis: - # TODO: Fix me. Do only delete keys, that are created with this cache. - await redis.flushall() + await redis.eval("return redis.call('del', 'fake_key', unpack(redis.call('keys', ARGV[1])))", keys=[], args=["{}*".format(self.prefix)]) async def reset_full_cache(self, data: Dict[str, str]) -> None: """ - Deletes the cache and write new data in it. + Deletes the full_data_cache and write new data in it. """ - # TODO: lua or transaction async with self.get_connection() as redis: - await redis.delete(self.get_full_data_cache_key()) - await redis.hmset_dict(self.get_full_data_cache_key(), data) + tr = redis.multi_exec() + tr.delete(self.get_full_data_cache_key()) + tr.hmset_dict(self.get_full_data_cache_key(), data) + await tr.execute() async def data_exists(self, user_id: Optional[int] = None) -> bool: """ @@ -197,25 +199,18 @@ class RedisCacheProvider(BaseCacheProvider): cache_key, *elements) - async def add_changed_elements(self, change_id: int, element_ids: Iterable[str]) -> None: + async def add_changed_elements(self, default_change_id: int, element_ids: Iterable[str]) -> int: """ Saves which elements are change with a change_id. - args has to be an even iterable. 
-        even values have to be element_ids.
+        Generates and returns the change_id.
         """
-        def zadd_args(change_id: int) -> Generator[Union[int, str], None, None]:
-            """
-            Small helper to generates the arguments for the redis command zadd.
-            """
-            for element_id in element_ids:
-                yield change_id
-                yield element_id
-
         async with self.get_connection() as redis:
-            await redis.zadd(self.get_change_id_cache_key(), *zadd_args(change_id))
-            # Saves the lowest_change_id if it does not exist
-            await redis.zadd(self.get_change_id_cache_key(), change_id, '_config:lowest_change_id', exist='ZSET_IF_NOT_EXIST')
+            return int(await redis.eval(
+                lua_script_change_data,
+                keys=[self.get_change_id_cache_key()],
+                args=[default_change_id, *element_ids]
+            ))
 
     async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
         """
@@ -242,7 +237,11 @@
             self.get_full_data_cache_key(),
             element_id)
 
-    async def get_data_since(self, change_id: int, user_id: Optional[int] = None) -> Tuple[Dict[str, List[bytes]], List[str]]:
+    async def get_data_since(
+            self,
+            change_id: int,
+            user_id: Optional[int] = None,
+            max_change_id: int = -1) -> Tuple[Dict[str, List[bytes]], List[str]]:
         """
         Returns all elements since a change_id.
 
@@ -253,21 +252,48 @@
         if user_id is None, the full_data is returned. If user_id is an int, the
         restricted_data for a user is used. 0 is for the anonymous user.
         """
-        # TODO: rewrite with lua to get all elements with one request
+        changed_elements: Dict[str, List[bytes]] = defaultdict(list)
+        deleted_elements: List[str] = []
+        if user_id is None:
+            cache_key = self.get_full_data_cache_key()
+        else:
+            cache_key = self.get_restricted_data_cache_key(user_id)
+
+        # Convert max_change_id to a string. If it's negative, use the string '+inf'.
+        redis_max_change_id = "+inf" if max_change_id < 0 else str(max_change_id)
         async with self.get_connection() as redis:
-            changed_elements: Dict[str, List[bytes]] = defaultdict(list)
-            deleted_elements: List[str] = []
-            for element_id in await redis.zrangebyscore(self.get_change_id_cache_key(), min=change_id):
-                if element_id.startswith(b'_config'):
-                    continue
-                element_json = await redis.hget(self.get_full_data_cache_key(), element_id)  # Optional[bytes]
-                if element_json is None:
-                    # The element is not in the cache. It has to be deleted.
-                    deleted_elements.append(element_id)
-                else:
-                    collection_string, id = split_element_id(element_id)
-                    changed_elements[collection_string].append(element_json)
-            return changed_elements, deleted_elements
+            # Lua script that gets all element_ids from the change_id_cache_key
+            # and then looks up each element_id in the full_data or
+            # restricted_data hash. It returns a flat list where the odd values
+            # are the element_ids and the even values the elements as json. The
+            # function wait_make_dict creates a python dict from that list.
+            elements: Dict[bytes, Optional[bytes]] = await aioredis.util.wait_make_dict(redis.eval(
+                """
+                -- Get the ids of all elements changed within the given range
+                local element_ids = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2])
+
+                -- Save elements in an array, alternating element_id and element_json
+                local elements = {}
+                for _, element_id in pairs(element_ids) do
+                    table.insert(elements, element_id)
+                    table.insert(elements, redis.call('hget', KEYS[2], element_id))
+                end
+                return elements
+                """,
+                keys=[self.get_change_id_cache_key(), cache_key],
+                args=[change_id, redis_max_change_id]))
+
+        for element_id, element_json in elements.items():
+            if element_id.startswith(b'_config'):
+                # Ignore config values from the change_id cache key
+                continue
+            if element_json is None:
+                # The element is not in the cache. It has to be deleted.
+                deleted_elements.append(element_id.decode())
+            else:
+                collection_string, id = split_element_id(element_id)
+                changed_elements[collection_string].append(element_json)
+        return changed_elements, deleted_elements
 
     async def del_restricted_data(self, user_id: int) -> None:
         """
@@ -284,15 +310,16 @@ class RedisCacheProvider(BaseCacheProvider):
         Returns False when the lock was already set.
         """
+        # TODO: Improve lock. See: https://redis.io/topics/distlock
         async with self.get_connection() as redis:
-            return await redis.hsetnx("lock_{}".format(lock_name), self.lock_key, 1)
+            return await redis.setnx("{}lock_{}".format(self.prefix, lock_name), 1)
 
     async def get_lock(self, lock_name: str) -> bool:
         """
         Returns True when the lock for the restricted_data of a user is set, else False.
         """
         async with self.get_connection() as redis:
-            return await redis.hget("lock_{}".format(lock_name), self.lock_key)
+            return await redis.get("{}lock_{}".format(self.prefix, lock_name))
 
     async def del_lock(self, lock_name: str) -> None:
         """
@@ -300,7 +327,7 @@
         lock is not set.
         """
         async with self.get_connection() as redis:
-            await redis.hdel("lock_{}".format(lock_name), self.lock_key)
+            await redis.delete("{}lock_{}".format(self.prefix, lock_name))
 
     async def get_change_id_user(self, user_id: int) -> Optional[int]:
         """
@@ -396,14 +423,19 @@ class MemmoryCacheProvider(BaseCacheProvider):
         except KeyError:
             pass
 
-    async def add_changed_elements(self, change_id: int, element_ids: Iterable[str]) -> None:
+    async def add_changed_elements(self, default_change_id: int, element_ids: Iterable[str]) -> int:
         element_ids = list(element_ids)
+        try:
+            change_id = (await self.get_current_change_id())[0][1] + 1
+        except IndexError:
+            change_id = default_change_id
 
         for element_id in element_ids:
            if change_id in self.change_id_data:
                 self.change_id_data[change_id].add(element_id)
             else:
                 self.change_id_data[change_id] = {element_id}
+        return change_id
 
     async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
         if user_id is None:
@@ -418,7 +450,10 @@
         return value.encode() if value is not None else None
 
     async def get_data_since(
-            self, change_id: int, user_id: Optional[int] = None) -> Tuple[Dict[str, List[bytes]], List[str]]:
+            self,
+            change_id: int,
+            user_id: Optional[int] = None,
+            max_change_id: int = -1) -> Tuple[Dict[str, List[bytes]], List[str]]:
         changed_elements: Dict[str, List[bytes]] = defaultdict(list)
         deleted_elements: List[str] = []
         if user_id is None:
@@ -427,7 +462,7 @@
             cache_dict = self.restricted_data.get(user_id, {})
 
         for data_change_id, element_ids in self.change_id_data.items():
-            if data_change_id < change_id:
+            if data_change_id < change_id or (max_change_id > -1 and data_change_id > max_change_id):
                 continue
             for element_id in element_ids:
                 element_json = cache_dict.get(element_id, None)
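The final hunk of cache_providers.py (below) adds lua_script_change_data. Because Redis executes a Lua script atomically, reading the highest existing change_id and writing the new one can no longer interleave between two concurrent writers, which is exactly the race the removed TODO in cache.py described. A rough pure-Python model of the script's logic, with the sorted set played by a dict mapping member to score (hypothetical names, illustration only):

```python
from typing import Dict, Iterable

def add_changed_elements_model(
        zset: Dict[str, int], default_change_id: int, element_ids: Iterable[str]) -> int:
    # ZREVRANGEBYSCORE ... LIMIT 0 1: take the highest existing score, if any.
    scores = sorted(zset.values(), reverse=True)
    change_id = scores[0] + 1 if scores else default_change_id
    # ZADD: index every element_id under the new change_id.
    for element_id in element_ids:
        zset[element_id] = change_id
    # ZADD ... NX: record the lowest change_id only once, never overwrite it.
    zset.setdefault('_config:lowest_change_id', change_id)
    return change_id
```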
@@ -530,3 +565,28 @@ def get_all_cachables() -> List[Cachable]:
             continue
         out.extend(get_startup_elements())
     return out
+
+
+lua_script_change_data = """
+-- Generate a new change_id
+local tmp = redis.call('zrevrangebyscore', KEYS[1], '+inf', '-inf', 'WITHSCORES', 'LIMIT', 0, 1)
+local change_id
+if next(tmp) == nil then
+    -- The key does not exist
+    change_id = ARGV[1]
+else
+    change_id = tmp[2] + 1
+end
+
+-- Add elements to the sorted set
+local count = 2
+while ARGV[count] do
+    redis.call('zadd', KEYS[1], change_id, ARGV[count])
+    count = count + 1
+end
+
+-- Set lowest_change_id if it does not exist
+redis.call('zadd', KEYS[1], 'NX', change_id, '_config:lowest_change_id')
+
+return change_id
+"""
diff --git a/openslides/utils/consumers.py b/openslides/utils/consumers.py
index 1fbe5edc7..ab05b79fa 100644
--- a/openslides/utils/consumers.py
+++ b/openslides/utils/consumers.py
@@ -170,7 +170,7 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
         """
         change_id = event['change_id']
         output = []
-        changed_elements, deleted_elements = await element_cache.get_restricted_data(self.scope['user'], change_id)
+        changed_elements, deleted_elements = await element_cache.get_restricted_data(self.scope['user'], change_id, max_change_id=change_id)
         for collection_string, elements in changed_elements.items():
             for element in elements:
                 output.append(format_for_autoupdate(
diff --git a/tests/settings.py b/tests/settings.py
index fa2083111..765edc90e 100644
--- a/tests/settings.py
+++ b/tests/settings.py
@@ -42,7 +42,7 @@ DATABASES = {
         'ENGINE': 'django.db.backends.sqlite3',
     }
 }
-REDIS_ADDRESS = "redis://127.0.0.1"
+
 SESSION_ENGINE = "django.contrib.sessions.backends.cache"
 
 # Internationalization
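Taken together, change_id and the new max_change_id parameter form a closed window of change_ids. A minimal sketch of the selection rule, mirroring the filter in MemmoryCacheProvider.get_data_since above (the helper name is made up for illustration):

```python
def in_window(data_change_id: int, change_id: int, max_change_id: int = -1) -> bool:
    # An entry is skipped when it is older than change_id, or when a real
    # (non-negative) max_change_id is given and the entry is newer than that.
    return data_change_id >= change_id and (
        max_change_id < 0 or data_change_id <= max_change_id)

# With change_id == max_change_id exactly one change_id is selected. That is
# what SiteConsumer now requests, so an autoupdate event only ships the
# elements of the change it announces:
assert in_window(5, change_id=5, max_change_id=5)
assert not in_window(6, change_id=5, max_change_id=5)
assert in_window(6, change_id=5)  # default -1: no upper bound
```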