from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

from django.apps import apps
from typing_extensions import Protocol

from .redis import use_redis
from .utils import split_element_id, str_dict_to_bytes


if use_redis:
    from .redis import get_connection, aioredis


class ElementCacheProvider(Protocol):
    """
    Base class for cache provider.

    See RedisCacheProvider as reference implementation.
    """

    async def clear_cache(self) -> None:
        ...

    async def reset_full_cache(self, data: Dict[str, str]) -> None:
        ...

    async def data_exists(self, user_id: Optional[int] = None) -> bool:
        ...

    async def add_elements(self, elements: List[str]) -> None:
        ...

    async def del_elements(
        self, elements: List[str], user_id: Optional[int] = None
    ) -> None:
        ...

    async def add_changed_elements(
        self, default_change_id: int, element_ids: Iterable[str]
    ) -> int:
        ...

    async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
        ...

    async def get_collection_data(
        self, collection: str, user_id: Optional[int] = None
    ) -> Dict[bytes, bytes]:
        ...

    async def get_data_since(
        self, change_id: int, user_id: Optional[int] = None, max_change_id: int = -1
    ) -> Tuple[Dict[str, List[bytes]], List[str]]:
        ...

    async def get_element(
        self, element_id: str, user_id: Optional[int] = None
    ) -> Optional[bytes]:
        ...

    async def del_restricted_data(self, user_id: int) -> None:
        ...

    async def set_lock(self, lock_name: str) -> bool:
        ...

    async def get_lock(self, lock_name: str) -> bool:
        ...

    async def del_lock(self, lock_name: str) -> None:
        ...

    async def get_change_id_user(self, user_id: int) -> Optional[int]:
        ...

    async def update_restricted_data(self, user_id: int, data: Dict[str, str]) -> None:
        ...

    async def get_current_change_id(self) -> List[Tuple[str, int]]:
        ...

    async def get_lowest_change_id(self) -> Optional[int]:
        ...


class RedisCacheProvider:
    """
    Cache provider that loads and saves the data to redis.

    All keys written by this provider start with `prefix`. The full data
    and each user's restricted data are stored as redis hashes; the
    change ids are stored in one sorted set.
    """

    full_data_cache_key: str = "full_data"
    restricted_user_cache_key: str = "restricted_data:{user_id}"
    change_id_cache_key: str = "change_id"
    prefix: str = "element_cache_"

    def get_full_data_cache_key(self) -> str:
        """
        Returns the redis key of the full_data hash.
        """
        return "".join((self.prefix, self.full_data_cache_key))

    def get_restricted_data_cache_key(self, user_id: int) -> str:
        """
        Returns the redis key of the restricted_data hash of one user.
        """
        return "".join(
            (self.prefix, self.restricted_user_cache_key.format(user_id=user_id))
        )

    def get_change_id_cache_key(self) -> str:
        """
        Returns the redis key of the change_id sorted set.
        """
        return "".join((self.prefix, self.change_id_cache_key))

    def _get_cache_key(self, user_id: Optional[int]) -> str:
        """
        Returns the full_data key if user_id is None, else the
        restricted_data key for this user.
        """
        if user_id is None:
            return self.get_full_data_cache_key()
        return self.get_restricted_data_cache_key(user_id)

    async def clear_cache(self) -> None:
        """
        Deletes all cache entries created with this element cache.
        """
        async with get_connection() as redis:
            # 'fake_key' guarantees that 'del' gets at least one argument,
            # even when no key matches the pattern.
            await redis.eval(
                "return redis.call('del', 'fake_key', unpack(redis.call('keys', ARGV[1])))",
                keys=[],
                args=[f"{self.prefix}*"],
            )

    async def reset_full_cache(self, data: Dict[str, str]) -> None:
        """
        Deletes the full_data_cache and write new data in it.

        Also deletes the restricted_data_cache and the change_id_cache.
        """
        # The key template contains the placeholder "{user_id}". It has to be
        # filled with the glob "*" to match the keys of all users. Using the
        # raw template would search for the literal text "{user_id}" and
        # never delete anything.
        restricted_pattern = "".join(
            (self.prefix, self.restricted_user_cache_key.format(user_id="*"))
        )
        async with get_connection() as redis:
            tr = redis.multi_exec()
            # like clear_cache but does not delete a lock
            tr.eval(
                "return redis.call('del', 'fake_key', unpack(redis.call('keys', ARGV[1])))",
                keys=[],
                args=[restricted_pattern],
            )
            tr.delete(self.get_change_id_cache_key())
            tr.delete(self.get_full_data_cache_key())
            tr.hmset_dict(self.get_full_data_cache_key(), data)
            await tr.execute()

    async def data_exists(self, user_id: Optional[int] = None) -> bool:
        """
        Returns True, when there is data in the cache.

        If user_id is None, the method tests for full_data. If user_id is an int, it tests
        for the restricted_data_cache for the user with the user_id. 0 is for anonymous.
        """
        cache_key = self._get_cache_key(user_id)
        async with get_connection() as redis:
            # Convert the reply of EXISTS to the annotated bool.
            return bool(await redis.exists(cache_key))

    async def add_elements(self, elements: List[str]) -> None:
        """
        Add or change elements to the cache.

        elements is a list with an even len. The values alternate between
        element_id and element: elements[0] is an element_id, elements[1]
        is the element that belongs to it, and so on.

        The elements have to be encoded, for example with json.
        """
        async with get_connection() as redis:
            await redis.hmset(self.get_full_data_cache_key(), *elements)

    async def del_elements(
        self, elements: List[str], user_id: Optional[int] = None
    ) -> None:
        """
        Deletes elements from the cache.

        elements has to be a list of element_ids.

        If user_id is None, the elements are deleted from the full_data cache. If user_id is an
        int, the elements are deleted from one restricted_data_cache. 0 is for anonymous.
        """
        cache_key = self._get_cache_key(user_id)
        async with get_connection() as redis:
            await redis.hdel(cache_key, *elements)

    async def add_changed_elements(
        self, default_change_id: int, element_ids: Iterable[str]
    ) -> int:
        """
        Saves which elements are changed with a change_id.

        default_change_id is only used when no change_id exists yet.

        Generates and returns the change_id.
        """
        async with get_connection() as redis:
            return int(
                await redis.eval(
                    lua_script_change_data,
                    keys=[self.get_change_id_cache_key()],
                    args=[default_change_id, *element_ids],
                )
            )

    async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
        """
        Returns all data from a cache.

        If user_id is None, then the data is returned from the full_data_cache. If it is an
        int, it is returned from a restricted_data_cache. 0 is for anonymous.
        """
        cache_key = self._get_cache_key(user_id)
        async with get_connection() as redis:
            return await redis.hgetall(cache_key)

    async def get_collection_data(
        self, collection: str, user_id: Optional[int] = None
    ) -> Dict[bytes, bytes]:
        """
        Returns all elements for a collection from the cache.
        """
        cache_key = self._get_cache_key(user_id)
        async with get_connection() as redis:
            out: Dict[bytes, bytes] = {}
            async for key, value in redis.ihscan(cache_key, match=f"{collection}:*"):
                out[key] = value
            return out

    async def get_element(
        self, element_id: str, user_id: Optional[int] = None
    ) -> Optional[bytes]:
        """
        Returns one element from the cache.

        Returns None, when the element does not exist.
        """
        cache_key = self._get_cache_key(user_id)
        async with get_connection() as redis:
            return await redis.hget(cache_key, element_id)

    async def get_data_since(
        self, change_id: int, user_id: Optional[int] = None, max_change_id: int = -1
    ) -> Tuple[Dict[str, List[bytes]], List[str]]:
        """
        Returns all elements since a change_id.

        The returned value is a two element tuple. The first value is a dict
        of the elements where the key is the collection_string and the value
        a list of (json-) encoded elements. The second element is a list of
        element_ids, that have been deleted since the change_id.

        If user_id is None, the full_data is returned. If user_id is an int, the restricted_data
        for a user is used. 0 is for the anonymous user.

        A negative max_change_id means that there is no upper bound.
        """
        changed_elements: Dict[str, List[bytes]] = defaultdict(list)
        deleted_elements: List[str] = []
        cache_key = self._get_cache_key(user_id)

        # Convert max_change_id to a string. If its negative, use the string '+inf'
        redis_max_change_id = "+inf" if max_change_id < 0 else str(max_change_id)
        async with get_connection() as redis:
            # lua script that gets all element_ids from change_id_cache_key
            # and then uses each element_id on full_data or restricted_data.
            # It returns a flat list that alternates between element_id and
            # the element as json. The function wait_make_dict creates
            # a python dict from the returned list.
            elements: Dict[bytes, Optional[bytes]] = await aioredis.util.wait_make_dict(
                redis.eval(
                    """
                    -- Get change ids of changed elements
                    local element_ids = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2])

                    -- Save elements in array. Rotate element_id and element_json
                    local elements = {}
                    for _, element_id in pairs(element_ids) do
                        table.insert(elements, element_id)
                        table.insert(elements, redis.call('hget', KEYS[2], element_id))
                    end
                    return elements
                    """,
                    keys=[self.get_change_id_cache_key(), cache_key],
                    args=[change_id, redis_max_change_id],
                )
            )

        for element_id, element_json in elements.items():
            if element_id.startswith(b"_config"):
                # Ignore config values from the change_id cache key
                continue
            if element_json is None:
                # The element is not in the cache. It has to be deleted.
                deleted_elements.append(element_id.decode())
            else:
                collection_string, _ = split_element_id(element_id)
                changed_elements[collection_string].append(element_json)
        return changed_elements, deleted_elements

    async def del_restricted_data(self, user_id: int) -> None:
        """
        Deletes all restricted_data for a user. 0 is for the anonymous user.
        """
        async with get_connection() as redis:
            await redis.delete(self.get_restricted_data_cache_key(user_id))

    async def set_lock(self, lock_name: str) -> bool:
        """
        Tries to set a lock.

        Returns True when the lock could be set.

        Returns False when the lock was already set.
        """
        # TODO: Improve lock. See: https://redis.io/topics/distlock
        async with get_connection() as redis:
            return await redis.setnx(f"{self.prefix}lock_{lock_name}", 1)

    async def get_lock(self, lock_name: str) -> bool:
        """
        Returns True, when the lock with the given name is set. Else False.
        """
        async with get_connection() as redis:
            # GET returns the stored value or None. Convert it to the
            # annotated bool.
            return bool(await redis.get(f"{self.prefix}lock_{lock_name}"))

    async def del_lock(self, lock_name: str) -> None:
        """
        Deletes the lock with the given name. Does nothing when the lock is
        not set.
        """
        async with get_connection() as redis:
            await redis.delete(f"{self.prefix}lock_{lock_name}")

    async def get_change_id_user(self, user_id: int) -> Optional[int]:
        """
        Get the change_id for the restricted_data of a user.

        This is the change_id where the restricted_data was last calculated.

        Returns None when no change_id is saved for this user.
        """
        async with get_connection() as redis:
            value = await redis.hget(
                self.get_restricted_data_cache_key(user_id), "_config:change_id"
            )
        # Convert the raw redis value, like the MemmoryCacheProvider does.
        return int(value) if value is not None else None

    async def update_restricted_data(self, user_id: int, data: Dict[str, str]) -> None:
        """
        Updates the restricted_data for a user.

        data has to be a dict where the key is an element_id and the value the (json-) encoded
        element.
        """
        async with get_connection() as redis:
            await redis.hmset_dict(self.get_restricted_data_cache_key(user_id), data)

    async def get_current_change_id(self) -> List[Tuple[str, int]]:
        """
        Get the highest change_id from redis.

        Returns what zrevrangebyscore returns for at most one entry with
        its score. The list is empty when no change_id exists.
        """
        async with get_connection() as redis:
            return await redis.zrevrangebyscore(
                self.get_change_id_cache_key(), withscores=True, count=1, offset=0
            )

    async def get_lowest_change_id(self) -> Optional[int]:
        """
        Get the lowest change_id from redis.

        Returns None if lowest score does not exist.
        """
        async with get_connection() as redis:
            return await redis.zscore(
                self.get_change_id_cache_key(), "_config:lowest_change_id"
            )


class MemmoryCacheProvider:
    """
    CacheProvider for the ElementCache that uses only the memory.

    See the RedisCacheProvider for a description of the methods.

    This provider supports only one process. It saves the data into the memory.
    When you use different processes they will use different data.
    """

    def __init__(self) -> None:
        self.set_data_dicts()

    def set_data_dicts(self) -> None:
        """
        Creates all storage dicts empty.
        """
        self.full_data: Dict[str, str] = {}
        self.restricted_data: Dict[int, Dict[str, str]] = {}
        self.change_id_data: Dict[int, Set[str]] = {}
        self.locks: Dict[str, str] = {}

    def _get_cache_dict(self, user_id: Optional[int]) -> Dict[str, str]:
        """
        Returns full_data if user_id is None, else the restricted_data of
        this user. For an unknown user a new empty dict is returned, that is
        not saved in the provider.
        """
        if user_id is None:
            return self.full_data
        return self.restricted_data.get(user_id, {})

    async def clear_cache(self) -> None:
        self.set_data_dicts()

    async def reset_full_cache(self, data: Dict[str, str]) -> None:
        # Like the RedisCacheProvider: also drop the restricted data and the
        # change ids, but keep the locks.
        self.restricted_data = {}
        self.change_id_data = {}
        self.full_data = data

    async def data_exists(self, user_id: Optional[int] = None) -> bool:
        return bool(self._get_cache_dict(user_id))

    async def add_elements(self, elements: List[str]) -> None:
        if len(elements) % 2:
            raise ValueError(
                "The argument elements of add_elements has to be a list with an even number of elements."
            )

        # The list alternates between element_id and element.
        for element_id, element in zip(elements[::2], elements[1::2]):
            self.full_data[element_id] = element

    async def del_elements(
        self, elements: List[str], user_id: Optional[int] = None
    ) -> None:
        cache_dict = self._get_cache_dict(user_id)
        for element in elements:
            # Unknown element_ids are ignored.
            cache_dict.pop(element, None)

    async def add_changed_elements(
        self, default_change_id: int, element_ids: Iterable[str]
    ) -> int:
        element_ids = list(element_ids)

        try:
            change_id = (await self.get_current_change_id())[0][1] + 1
        except IndexError:
            # There is no change_id yet.
            change_id = default_change_id

        # The change_id is only saved, when there is at least one element_id.
        for element_id in element_ids:
            self.change_id_data.setdefault(change_id, set()).add(element_id)
        return change_id

    async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
        return str_dict_to_bytes(self._get_cache_dict(user_id))

    async def get_collection_data(
        self, collection: str, user_id: Optional[int] = None
    ) -> Dict[bytes, bytes]:
        key_prefix = f"{collection}:"
        out = {
            key: value
            for key, value in self._get_cache_dict(user_id).items()
            if key.startswith(key_prefix)
        }
        return str_dict_to_bytes(out)

    async def get_element(
        self, element_id: str, user_id: Optional[int] = None
    ) -> Optional[bytes]:
        value = self._get_cache_dict(user_id).get(element_id, None)
        return value.encode() if value is not None else None

    async def get_data_since(
        self, change_id: int, user_id: Optional[int] = None, max_change_id: int = -1
    ) -> Tuple[Dict[str, List[bytes]], List[str]]:
        changed_elements: Dict[str, List[bytes]] = defaultdict(list)
        deleted_elements: List[str] = []
        cache_dict = self._get_cache_dict(user_id)

        # Collect the element_ids of all change ids in the requested range.
        # Like in the RedisCacheProvider a negative max_change_id means,
        # that there is no upper bound.
        all_element_ids: Set[str] = set()
        for data_change_id, element_ids in self.change_id_data.items():
            if data_change_id >= change_id and (
                max_change_id < 0 or data_change_id <= max_change_id
            ):
                all_element_ids.update(element_ids)

        for element_id in all_element_ids:
            element_json = cache_dict.get(element_id, None)
            if element_json is None:
                # The element is not in the cache. It has to be deleted.
                deleted_elements.append(element_id)
            else:
                collection_string, _ = split_element_id(element_id)
                changed_elements[collection_string].append(element_json.encode())
        return changed_elements, deleted_elements

    async def del_restricted_data(self, user_id: int) -> None:
        self.restricted_data.pop(user_id, None)

    async def set_lock(self, lock_name: str) -> bool:
        if lock_name in self.locks:
            return False
        self.locks[lock_name] = "1"
        return True

    async def get_lock(self, lock_name: str) -> bool:
        return lock_name in self.locks

    async def del_lock(self, lock_name: str) -> None:
        self.locks.pop(lock_name, None)

    async def get_change_id_user(self, user_id: int) -> Optional[int]:
        data = self.restricted_data.get(user_id, {})
        change_id = data.get("_config:change_id", None)
        return int(change_id) if change_id is not None else None

    async def update_restricted_data(self, user_id: int, data: Dict[str, str]) -> None:
        redis_data = self.restricted_data.setdefault(user_id, {})
        redis_data.update(data)

    async def get_current_change_id(self) -> List[Tuple[str, int]]:
        change_data = self.change_id_data
        if change_data:
            # Same shape as the return value of the RedisCacheProvider. The
            # first tuple entry is only a placeholder.
            return [("no_usefull_value", max(change_data.keys()))]
        return []

    async def get_lowest_change_id(self) -> Optional[int]:
        change_data = self.change_id_data
        if change_data:
            return min(change_data.keys())
        return None


class Cachable(Protocol):
    """
    A Cachable is an object that returns elements that can be cached.

    It needs at least the methods defined here.
    """

    def get_collection_string(self) -> str:
        """
        Returns the string representing the name of the cachable.
        """

    def get_elements(self) -> List[Dict[str, Any]]:
        """
        Returns all elements of the cachable.
        """

    async def restrict_elements(
        self, user_id: int, elements: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Converts full_data to restricted_data.

        elements can be an empty list, a list with some elements of the cachable or with all
        elements of the cachable.
        """


def get_all_cachables() -> List[Cachable]:
    """
    Returns all cachables of OpenSlides.

    The cachables are collected from every installed app that has the
    method get_startup_elements().
    """
    out: List[Cachable] = []
    for app in apps.get_app_configs():
        try:
            # Get the method get_startup_elements() from an app.
            # This method has to return an iterable of Cachable objects.
            get_startup_elements = app.get_startup_elements
        except AttributeError:
            # Skip apps that do not implement get_startup_elements.
            continue
        out.extend(get_startup_elements())
    return out


lua_script_change_data = """
-- Generate a new change_id
local tmp = redis.call('zrevrangebyscore', KEYS[1], '+inf', '-inf', 'WITHSCORES', 'LIMIT', 0, 1)
local change_id
if next(tmp) == nil then
    -- The key does not exist
    change_id = ARGV[1]
else
    change_id = tmp[2] + 1
end

-- Add elements to sorted set
local count = 2
while ARGV[count] do
    redis.call('zadd', KEYS[1], change_id, ARGV[count])
    count = count + 1
end

-- Set lowest_change_id if it does not exist
redis.call('zadd', KEYS[1], 'NX', change_id, '_config:lowest_change_id')

return change_id
"""