bc442210fb
* delete only keys with prefix * Make redis_provider atomic with transactions and lua scripts * improve lock * generate change_id in redis to make sure it is uniq * use miliseconds as starttime * add argument use max_change_id to get_{full|resticted}_data
400 lines
17 KiB
Python
400 lines
17 KiB
Python
import asyncio
|
|
import json
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from time import sleep
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Any,
|
|
Callable,
|
|
Dict,
|
|
List,
|
|
Optional,
|
|
Tuple,
|
|
Type,
|
|
)
|
|
|
|
from asgiref.sync import async_to_sync, sync_to_async
|
|
from django.conf import settings
|
|
|
|
from .cache_providers import (
|
|
BaseCacheProvider,
|
|
Cachable,
|
|
MemmoryCacheProvider,
|
|
RedisCacheProvider,
|
|
get_all_cachables,
|
|
no_redis_dependency,
|
|
)
|
|
from .utils import get_element_id, get_user_id, split_element_id
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
# Dummy import Collection for mypy, can be fixed with python 3.7
|
|
from .collection import CollectionElement # noqa
|
|
|
|
|
|
class ElementCache:
|
|
"""
|
|
Cache for the CollectionElements.
|
|
|
|
Saves the full_data and if enabled the restricted data.
|
|
|
|
There is one redis Hash (simular to python dict) for the full_data and one
|
|
Hash for every user.
|
|
|
|
The key of the Hashes is COLLECTIONSTRING:ID where COLLECTIONSTRING is the
|
|
collection_string of a collection and id the id of an element.
|
|
|
|
All elements have to be in the cache. If one element is missing, the cache
|
|
is invalid, but this can not be detected. When a plugin with a new
|
|
collection is added to OpenSlides, then the cache has to be rebuild manualy.
|
|
|
|
There is an sorted set in redis with the change id as score. The values are
|
|
COLLETIONSTRING:ID for the elements that have been changed with that change
|
|
id. With this key it is possible, to get all elements as full_data or as
|
|
restricted_data that are newer then a specific change id.
|
|
|
|
All method of this class are async. You either have to call them with
|
|
await in an async environment or use asgiref.sync.async_to_sync().
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
redis: str,
|
|
use_restricted_data_cache: bool = False,
|
|
cache_provider_class: Type[BaseCacheProvider] = RedisCacheProvider,
|
|
cachable_provider: Callable[[], List[Cachable]] = get_all_cachables,
|
|
start_time: int = None) -> None:
|
|
"""
|
|
Initializes the cache.
|
|
|
|
When restricted_data_cache is false, no restricted data is saved.
|
|
"""
|
|
self.use_restricted_data_cache = use_restricted_data_cache
|
|
self.cache_provider = cache_provider_class(redis)
|
|
self.cachable_provider = cachable_provider
|
|
self._cachables: Optional[Dict[str, Cachable]] = None
|
|
|
|
# Start time is used as first change_id if there is non in redis
|
|
if start_time is None:
|
|
# Use the miliseconds (rounted) since the 2016-02-29.
|
|
start_time = int((datetime.utcnow() - datetime(2016, 2, 29)).total_seconds()) * 1000
|
|
self.start_time = start_time
|
|
|
|
# Contains Futures to controll, that only one client updates the restricted_data.
|
|
self.restricted_data_cache_updater: Dict[int, asyncio.Future] = {}
|
|
|
|
# Tells if self.ensure_cache was called.
|
|
self.ensured = False
|
|
|
|
@property
|
|
def cachables(self) -> Dict[str, Cachable]:
|
|
"""
|
|
Returns all Cachables as a dict where the key is the collection_string of the cachable.
|
|
"""
|
|
# This method is neccessary to lazy load the cachables
|
|
if self._cachables is None:
|
|
self._cachables = {cachable.get_collection_string(): cachable for cachable in self.cachable_provider()}
|
|
return self._cachables
|
|
|
|
def ensure_cache(self, reset: bool = False) -> None:
|
|
"""
|
|
Makes sure that the cache exist.
|
|
|
|
Builds the cache if not. If reset is True, it will be reset in any case.
|
|
|
|
This method is sync, so it can be run when OpenSlides starts.
|
|
"""
|
|
cache_exists = async_to_sync(self.cache_provider.data_exists)()
|
|
|
|
if reset or not cache_exists:
|
|
lock_name = 'ensure_cache'
|
|
# Set a lock so only one process builds the cache
|
|
if async_to_sync(self.cache_provider.set_lock)(lock_name):
|
|
try:
|
|
mapping = {}
|
|
for collection_string, cachable in self.cachables.items():
|
|
for element in cachable.get_elements():
|
|
mapping.update(
|
|
{get_element_id(collection_string, element['id']):
|
|
json.dumps(element)})
|
|
async_to_sync(self.cache_provider.reset_full_cache)(mapping)
|
|
finally:
|
|
async_to_sync(self.cache_provider.del_lock)(lock_name)
|
|
else:
|
|
while async_to_sync(self.cache_provider.get_lock)(lock_name):
|
|
sleep(0.01)
|
|
|
|
self.ensured = True
|
|
|
|
async def change_elements(
|
|
self, elements: Dict[str, Optional[Dict[str, Any]]]) -> int:
|
|
"""
|
|
Changes elements in the cache.
|
|
|
|
elements is a list of the changed elements as dict. When the value is None,
|
|
it is interpreded as deleted. The key has to be an element_id.
|
|
|
|
Returns the new generated change_id.
|
|
"""
|
|
deleted_elements = []
|
|
changed_elements = []
|
|
for element_id, data in elements.items():
|
|
if data:
|
|
# The arguments for redis.hset is pairs of key value
|
|
changed_elements.append(element_id)
|
|
changed_elements.append(json.dumps(data))
|
|
else:
|
|
deleted_elements.append(element_id)
|
|
|
|
if changed_elements:
|
|
await self.cache_provider.add_elements(changed_elements)
|
|
if deleted_elements:
|
|
await self.cache_provider.del_elements(deleted_elements)
|
|
|
|
return await self.cache_provider.add_changed_elements(self.start_time + 1, elements.keys())
|
|
|
|
async def get_all_full_data(self) -> Dict[str, List[Dict[str, Any]]]:
|
|
"""
|
|
Returns all full_data. If it does not exist, it is created.
|
|
|
|
The returned value is a dict where the key is the collection_string and
|
|
the value is a list of data.
|
|
"""
|
|
out: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
|
|
full_data = await self.cache_provider.get_all_data()
|
|
for element_id, data in full_data.items():
|
|
collection_string, __ = split_element_id(element_id)
|
|
out[collection_string].append(json.loads(data.decode()))
|
|
return dict(out)
|
|
|
|
async def get_full_data(
|
|
self, change_id: int = 0, max_change_id: int = -1) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
|
|
"""
|
|
Returns all full_data since change_id until max_change_id (including).
|
|
max_change_id -1 means the highest change_id.
|
|
|
|
Returns two values inside a tuple. The first value is a dict where the
|
|
key is the collection_string and the value is a list of data. The second
|
|
is a list of element_ids with deleted elements.
|
|
|
|
Only returns elements with the change_id or newer. When change_id is 0,
|
|
all elements are returned.
|
|
|
|
Raises a RuntimeError when the lowest change_id in redis is higher then
|
|
the requested change_id. In this case the method has to be rerun with
|
|
change_id=0. This is importend because there could be deleted elements
|
|
that the cache does not know about.
|
|
"""
|
|
if change_id == 0:
|
|
return (await self.get_all_full_data(), [])
|
|
|
|
lowest_change_id = await self.get_lowest_change_id()
|
|
if change_id < lowest_change_id:
|
|
# When change_id is lower then the lowest change_id in redis, we can
|
|
# not inform the user about deleted elements.
|
|
raise RuntimeError(
|
|
"change_id {} is lower then the lowest change_id in redis {}. "
|
|
"Catch this exception and rerun the method with change_id=0."
|
|
.format(change_id, lowest_change_id))
|
|
|
|
raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id, max_change_id=max_change_id)
|
|
return (
|
|
{collection_string: [json.loads(value.decode()) for value in value_list]
|
|
for collection_string, value_list in raw_changed_elements.items()},
|
|
deleted_elements)
|
|
|
|
async def get_element_full_data(self, collection_string: str, id: int) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Returns one element as full data.
|
|
|
|
If the cache is empty, it is created.
|
|
|
|
Returns None if the element does not exist.
|
|
"""
|
|
element = await self.cache_provider.get_element(get_element_id(collection_string, id))
|
|
|
|
if element is None:
|
|
return None
|
|
return json.loads(element.decode())
|
|
|
|
async def exists_restricted_data(self, user: Optional['CollectionElement']) -> bool:
|
|
"""
|
|
Returns True, if the restricted_data exists for the user.
|
|
"""
|
|
if not self.use_restricted_data_cache:
|
|
return False
|
|
|
|
return await self.cache_provider.data_exists(get_user_id(user))
|
|
|
|
async def del_user(self, user: Optional['CollectionElement']) -> None:
|
|
"""
|
|
Removes one user from the resticted_data_cache.
|
|
"""
|
|
await self.cache_provider.del_restricted_data(get_user_id(user))
|
|
|
|
async def update_restricted_data(
|
|
self, user: Optional['CollectionElement']) -> None:
|
|
"""
|
|
Updates the restricted data for an user from the full_data_cache.
|
|
"""
|
|
# TODO: When elements are changed at the same time then this method run
|
|
# this could make the cache invalid.
|
|
# This could be fixed when get_full_data would be used with a
|
|
# max change_id.
|
|
if not self.use_restricted_data_cache:
|
|
# If the restricted_data_cache is not used, there is nothing to do
|
|
return
|
|
|
|
# Try to write a special key.
|
|
# If this succeeds, there is noone else currently updating the cache.
|
|
# TODO: Make a timeout. Else this could block forever
|
|
lock_name = "restricted_data_{}".format(get_user_id(user))
|
|
if await self.cache_provider.set_lock(lock_name):
|
|
future: asyncio.Future = asyncio.Future()
|
|
self.restricted_data_cache_updater[get_user_id(user)] = future
|
|
# Get change_id for this user
|
|
value = await self.cache_provider.get_change_id_user(get_user_id(user))
|
|
# If the change id is not in the cache yet, use -1 to get all data since 0
|
|
user_change_id = int(value) if value else -1
|
|
change_id = await self.get_current_change_id()
|
|
if change_id > user_change_id:
|
|
try:
|
|
full_data_elements, deleted_elements = await self.get_full_data(user_change_id + 1)
|
|
except RuntimeError:
|
|
# The user_change_id is lower then the lowest change_id in the cache.
|
|
# The whole restricted_data for that user has to be recreated.
|
|
full_data_elements = await self.get_all_full_data()
|
|
await self.cache_provider.del_restricted_data(get_user_id(user))
|
|
else:
|
|
# Remove deleted elements
|
|
if deleted_elements:
|
|
await self.cache_provider.del_elements(deleted_elements, get_user_id(user))
|
|
|
|
mapping = {}
|
|
for collection_string, full_data in full_data_elements.items():
|
|
restricter = self.cachables[collection_string].restrict_elements
|
|
elements = await sync_to_async(restricter)(user, full_data)
|
|
for element in elements:
|
|
mapping.update(
|
|
{get_element_id(collection_string, element['id']):
|
|
json.dumps(element)})
|
|
mapping['_config:change_id'] = str(change_id)
|
|
await self.cache_provider.update_restricted_data(get_user_id(user), mapping)
|
|
# Unset the lock
|
|
await self.cache_provider.del_lock(lock_name)
|
|
future.set_result(1)
|
|
else:
|
|
# Wait until the update if finshed
|
|
if get_user_id(user) in self.restricted_data_cache_updater:
|
|
# The active worker is on the same asgi server, we can use the future
|
|
await self.restricted_data_cache_updater[get_user_id(user)]
|
|
else:
|
|
while await self.cache_provider.get_lock(lock_name):
|
|
await asyncio.sleep(0.01)
|
|
|
|
async def get_all_restricted_data(self, user: Optional['CollectionElement']) -> Dict[str, List[Dict[str, Any]]]:
|
|
"""
|
|
Like get_all_full_data but with restricted_data for an user.
|
|
"""
|
|
if not self.use_restricted_data_cache:
|
|
all_restricted_data = {}
|
|
for collection_string, full_data in (await self.get_all_full_data()).items():
|
|
restricter = self.cachables[collection_string].restrict_elements
|
|
elements = await sync_to_async(restricter)(user, full_data)
|
|
all_restricted_data[collection_string] = elements
|
|
return all_restricted_data
|
|
|
|
await self.update_restricted_data(user)
|
|
|
|
out: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
|
|
restricted_data = await self.cache_provider.get_all_data(get_user_id(user))
|
|
for element_id, data in restricted_data.items():
|
|
if element_id.decode().startswith('_config'):
|
|
continue
|
|
collection_string, __ = split_element_id(element_id)
|
|
out[collection_string].append(json.loads(data.decode()))
|
|
return dict(out)
|
|
|
|
async def get_restricted_data(
|
|
self,
|
|
user: Optional['CollectionElement'],
|
|
change_id: int = 0,
|
|
max_change_id: int = -1) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
|
|
"""
|
|
Like get_full_data but with restricted_data for an user.
|
|
"""
|
|
if change_id == 0:
|
|
# Return all data
|
|
return (await self.get_all_restricted_data(user), [])
|
|
|
|
if not self.use_restricted_data_cache:
|
|
changed_elements, deleted_elements = await self.get_full_data(change_id, max_change_id)
|
|
restricted_data = {}
|
|
for collection_string, full_data in changed_elements.items():
|
|
restricter = self.cachables[collection_string].restrict_elements
|
|
elements = await sync_to_async(restricter)(user, full_data)
|
|
restricted_data[collection_string] = elements
|
|
return restricted_data, deleted_elements
|
|
|
|
lowest_change_id = await self.get_lowest_change_id()
|
|
if change_id < lowest_change_id:
|
|
# When change_id is lower then the lowest change_id in redis, we can
|
|
# not inform the user about deleted elements.
|
|
raise RuntimeError(
|
|
"change_id {} is lower then the lowest change_id in redis {}. "
|
|
"Catch this exception and rerun the method with change_id=0."
|
|
.format(change_id, lowest_change_id))
|
|
|
|
# If another coroutine or another daphne server also updates the restricted
|
|
# data, this waits until it is done.
|
|
await self.update_restricted_data(user)
|
|
|
|
raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id, get_user_id(user), max_change_id)
|
|
return (
|
|
{collection_string: [json.loads(value.decode()) for value in value_list]
|
|
for collection_string, value_list in raw_changed_elements.items()},
|
|
deleted_elements)
|
|
|
|
async def get_current_change_id(self) -> int:
|
|
"""
|
|
Returns the current change id.
|
|
|
|
Returns start_time if there is no change id yet.
|
|
"""
|
|
value = await self.cache_provider.get_current_change_id()
|
|
if not value:
|
|
return self.start_time
|
|
# Return the score (second element) of the first (and only) element
|
|
return value[0][1]
|
|
|
|
async def get_lowest_change_id(self) -> int:
|
|
"""
|
|
Returns the lowest change id.
|
|
|
|
Raises a RuntimeError if there is no change_id.
|
|
"""
|
|
value = await self.cache_provider.get_lowest_change_id()
|
|
if not value:
|
|
raise RuntimeError('There is no known change_id.')
|
|
# Return the score (second element) of the first (and only) element
|
|
return value
|
|
|
|
|
|
def load_element_cache(redis_addr: str = '', restricted_data: bool = True) -> ElementCache:
|
|
"""
|
|
Generates an element cache instance.
|
|
"""
|
|
if not redis_addr:
|
|
return ElementCache(redis='', cache_provider_class=MemmoryCacheProvider)
|
|
|
|
if no_redis_dependency:
|
|
raise ImportError("OpenSlides is configured to use redis as cache backend, but aioredis is not installed.")
|
|
return ElementCache(redis=redis_addr, use_restricted_data_cache=restricted_data)
|
|
|
|
|
|
# Set the element_cache
|
|
redis_address = getattr(settings, 'REDIS_ADDRESS', '')
|
|
use_restricted_data = getattr(settings, 'RESTRICTED_DATA_CACHE', True)
|
|
element_cache = load_element_cache(redis_addr=redis_address, restricted_data=use_restricted_data)
|