Improve redis cache

* delete only keys with the cache prefix
* Make redis_provider atomic with transactions and lua scripts
* improve lock
* generate change_id in redis to make sure it is unique
* use milliseconds as start time
* add argument max_change_id to get_{full|restricted}_data
This commit is contained in:
Oskar Hahn 2018-09-23 22:02:09 +02:00
parent becdef26a8
commit bc442210fb
5 changed files with 127 additions and 79 deletions

View File

@ -18,7 +18,7 @@ matrix:
- flake8 openslides tests - flake8 openslides tests
- isort --check-only --diff --recursive openslides tests - isort --check-only --diff --recursive openslides tests
- python -m mypy openslides/ - python -m mypy openslides/
- pytest tests/old/ tests/integration/ tests/unit/ --cov --cov-fail-under=76 - pytest tests/old/ tests/integration/ tests/unit/ --cov --cov-fail-under=75
- language: python - language: python
cache: cache:
@ -35,7 +35,7 @@ matrix:
- flake8 openslides tests - flake8 openslides tests
- isort --check-only --diff --recursive openslides tests - isort --check-only --diff --recursive openslides tests
- python -m mypy openslides/ - python -m mypy openslides/
- pytest tests/old/ tests/integration/ tests/unit/ --cov --cov-fail-under=76 - pytest tests/old/ tests/integration/ tests/unit/ --cov --cov-fail-under=75
- language: node_js - language: node_js
node_js: node_js:

View File

@ -77,7 +77,8 @@ class ElementCache:
# Start time is used as first change_id if there is non in redis # Start time is used as first change_id if there is non in redis
if start_time is None: if start_time is None:
start_time = int((datetime.utcnow() - datetime(1970, 1, 1)).total_seconds()) # Use the miliseconds (rounted) since the 2016-02-29.
start_time = int((datetime.utcnow() - datetime(2016, 2, 29)).total_seconds()) * 1000
self.start_time = start_time self.start_time = start_time
# Contains Futures to controll, that only one client updates the restricted_data. # Contains Futures to controll, that only one client updates the restricted_data.
@ -151,13 +152,7 @@ class ElementCache:
if deleted_elements: if deleted_elements:
await self.cache_provider.del_elements(deleted_elements) await self.cache_provider.del_elements(deleted_elements)
# TODO: The provider has to define the new change_id with lua. In other return await self.cache_provider.add_changed_elements(self.start_time + 1, elements.keys())
# case it is possible, that two changes get the same id (which
# would not be a big problem).
change_id = await self.get_next_change_id()
await self.cache_provider.add_changed_elements(change_id, elements.keys())
return change_id
async def get_all_full_data(self) -> Dict[str, List[Dict[str, Any]]]: async def get_all_full_data(self) -> Dict[str, List[Dict[str, Any]]]:
""" """
@ -174,9 +169,10 @@ class ElementCache:
return dict(out) return dict(out)
async def get_full_data( async def get_full_data(
self, change_id: int = 0) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]: self, change_id: int = 0, max_change_id: int = -1) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
""" """
Returns all full_data since change_id. If it does not exist, it is created. Returns all full_data since change_id until max_change_id (including).
max_change_id -1 means the highest change_id.
Returns two values inside a tuple. The first value is a dict where the Returns two values inside a tuple. The first value is a dict where the
key is the collection_string and the value is a list of data. The second key is the collection_string and the value is a list of data. The second
@ -202,7 +198,7 @@ class ElementCache:
"Catch this exception and rerun the method with change_id=0." "Catch this exception and rerun the method with change_id=0."
.format(change_id, lowest_change_id)) .format(change_id, lowest_change_id))
raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id) raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id, max_change_id=max_change_id)
return ( return (
{collection_string: [json.loads(value.decode()) for value in value_list] {collection_string: [json.loads(value.decode()) for value in value_list]
for collection_string, value_list in raw_changed_elements.items()}, for collection_string, value_list in raw_changed_elements.items()},
@ -323,7 +319,8 @@ class ElementCache:
async def get_restricted_data( async def get_restricted_data(
self, self,
user: Optional['CollectionElement'], user: Optional['CollectionElement'],
change_id: int = 0) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]: change_id: int = 0,
max_change_id: int = -1) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
""" """
Like get_full_data but with restricted_data for an user. Like get_full_data but with restricted_data for an user.
""" """
@ -332,7 +329,7 @@ class ElementCache:
return (await self.get_all_restricted_data(user), []) return (await self.get_all_restricted_data(user), [])
if not self.use_restricted_data_cache: if not self.use_restricted_data_cache:
changed_elements, deleted_elements = await self.get_full_data(change_id) changed_elements, deleted_elements = await self.get_full_data(change_id, max_change_id)
restricted_data = {} restricted_data = {}
for collection_string, full_data in changed_elements.items(): for collection_string, full_data in changed_elements.items():
restricter = self.cachables[collection_string].restrict_elements restricter = self.cachables[collection_string].restrict_elements
@ -353,7 +350,7 @@ class ElementCache:
# data, this waits until it is done. # data, this waits until it is done.
await self.update_restricted_data(user) await self.update_restricted_data(user)
raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id, get_user_id(user)) raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id, get_user_id(user), max_change_id)
return ( return (
{collection_string: [json.loads(value.decode()) for value in value_list] {collection_string: [json.loads(value.decode()) for value in value_list]
for collection_string, value_list in raw_changed_elements.items()}, for collection_string, value_list in raw_changed_elements.items()},
@ -371,15 +368,6 @@ class ElementCache:
# Return the score (second element) of the first (and only) element # Return the score (second element) of the first (and only) element
return value[0][1] return value[0][1]
async def get_next_change_id(self) -> int:
"""
Returns the next change_id.
Returns the start time in seconds + 1, if there is no change_id in yet.
"""
current_id = await self.get_current_change_id()
return current_id + 1
async def get_lowest_change_id(self) -> int: async def get_lowest_change_id(self) -> int:
""" """
Returns the lowest change id. Returns the lowest change id.

View File

@ -3,13 +3,11 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
Dict, Dict,
Generator,
Iterable, Iterable,
List, List,
Optional, Optional,
Set, Set,
Tuple, Tuple,
Union,
) )
from django.apps import apps from django.apps import apps
@ -35,22 +33,22 @@ class BaseCacheProvider:
See RedisCacheProvider as reverence implementation. See RedisCacheProvider as reverence implementation.
""" """
full_data_cache_key = 'full_data_cache' full_data_cache_key = 'full_data'
restricted_user_cache_key = 'restricted_data_cache:{user_id}' restricted_user_cache_key = 'restricted_data:{user_id}'
change_id_cache_key = 'change_id_cache' change_id_cache_key = 'change_id'
lock_key = '_config:updating' prefix = 'element_cache_'
def __init__(self, *args: Any) -> None: def __init__(self, *args: Any) -> None:
pass pass
def get_full_data_cache_key(self) -> str: def get_full_data_cache_key(self) -> str:
return self.full_data_cache_key return "".join((self.prefix, self.full_data_cache_key))
def get_restricted_data_cache_key(self, user_id: int) -> str: def get_restricted_data_cache_key(self, user_id: int) -> str:
return self.restricted_user_cache_key.format(user_id=user_id) return "".join((self.prefix, self.restricted_user_cache_key.format(user_id=user_id)))
def get_change_id_cache_key(self) -> str: def get_change_id_cache_key(self) -> str:
return self.change_id_cache_key return "".join((self.prefix, self.change_id_cache_key))
async def clear_cache(self) -> None: async def clear_cache(self) -> None:
raise NotImplementedError("CacheProvider has to implement the method clear_cache().") raise NotImplementedError("CacheProvider has to implement the method clear_cache().")
@ -67,13 +65,17 @@ class BaseCacheProvider:
async def del_elements(self, elements: List[str], user_id: Optional[int] = None) -> None: async def del_elements(self, elements: List[str], user_id: Optional[int] = None) -> None:
raise NotImplementedError("CacheProvider has to implement the method del_elements().") raise NotImplementedError("CacheProvider has to implement the method del_elements().")
async def add_changed_elements(self, change_id: int, element_ids: Iterable[str]) -> None: async def add_changed_elements(self, default_change_id: int, element_ids: Iterable[str]) -> int:
raise NotImplementedError("CacheProvider has to implement the method add_changed_elements().") raise NotImplementedError("CacheProvider has to implement the method add_changed_elements().")
async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]: async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
raise NotImplementedError("CacheProvider has to implement the method get_all_data().") raise NotImplementedError("CacheProvider has to implement the method get_all_data().")
async def get_data_since(self, change_id: int, user_id: Optional[int] = None) -> Tuple[Dict[str, List[bytes]], List[str]]: async def get_data_since(
self,
change_id: int,
user_id: Optional[int] = None,
max_change_id: int = -1) -> Tuple[Dict[str, List[bytes]], List[str]]:
raise NotImplementedError("CacheProvider has to implement the method get_data_since().") raise NotImplementedError("CacheProvider has to implement the method get_data_since().")
async def get_element(self, element_id: str) -> Optional[bytes]: async def get_element(self, element_id: str) -> Optional[bytes]:
@ -141,17 +143,17 @@ class RedisCacheProvider(BaseCacheProvider):
Deleted all cache entries created with this element cache. Deleted all cache entries created with this element cache.
""" """
async with self.get_connection() as redis: async with self.get_connection() as redis:
# TODO: Fix me. Do only delete keys, that are created with this cache. await redis.eval("return redis.call('del', 'fake_key', unpack(redis.call('keys', ARGV[1])))", keys=[], args=["{}*".format(self.prefix)])
await redis.flushall()
async def reset_full_cache(self, data: Dict[str, str]) -> None: async def reset_full_cache(self, data: Dict[str, str]) -> None:
""" """
Deletes the cache and write new data in it. Deletes the full_data_cache and write new data in it.
""" """
# TODO: lua or transaction
async with self.get_connection() as redis: async with self.get_connection() as redis:
await redis.delete(self.get_full_data_cache_key()) tr = redis.multi_exec()
await redis.hmset_dict(self.get_full_data_cache_key(), data) tr.delete(self.get_full_data_cache_key())
tr.hmset_dict(self.get_full_data_cache_key(), data)
await tr.execute()
async def data_exists(self, user_id: Optional[int] = None) -> bool: async def data_exists(self, user_id: Optional[int] = None) -> bool:
""" """
@ -197,25 +199,18 @@ class RedisCacheProvider(BaseCacheProvider):
cache_key, cache_key,
*elements) *elements)
async def add_changed_elements(self, change_id: int, element_ids: Iterable[str]) -> None: async def add_changed_elements(self, default_change_id: int, element_ids: Iterable[str]) -> int:
""" """
Saves which elements are change with a change_id. Saves which elements are change with a change_id.
args has to be an even iterable. The odd values have to be a change id (int) and the Generates and returns the change_id.
even values have to be element_ids.
""" """
def zadd_args(change_id: int) -> Generator[Union[int, str], None, None]:
"""
Small helper to generates the arguments for the redis command zadd.
"""
for element_id in element_ids:
yield change_id
yield element_id
async with self.get_connection() as redis: async with self.get_connection() as redis:
await redis.zadd(self.get_change_id_cache_key(), *zadd_args(change_id)) return int(await redis.eval(
# Saves the lowest_change_id if it does not exist lua_script_change_data,
await redis.zadd(self.get_change_id_cache_key(), change_id, '_config:lowest_change_id', exist='ZSET_IF_NOT_EXIST') keys=[self.get_change_id_cache_key()],
args=[default_change_id, *element_ids]
))
async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]: async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
""" """
@ -242,7 +237,11 @@ class RedisCacheProvider(BaseCacheProvider):
self.get_full_data_cache_key(), self.get_full_data_cache_key(),
element_id) element_id)
async def get_data_since(self, change_id: int, user_id: Optional[int] = None) -> Tuple[Dict[str, List[bytes]], List[str]]: async def get_data_since(
self,
change_id: int,
user_id: Optional[int] = None,
max_change_id: int = -1) -> Tuple[Dict[str, List[bytes]], List[str]]:
""" """
Returns all elements since a change_id. Returns all elements since a change_id.
@ -253,21 +252,48 @@ class RedisCacheProvider(BaseCacheProvider):
if user_id is None, the full_data is returned. If user_id is an int, the restricted_data if user_id is None, the full_data is returned. If user_id is an int, the restricted_data
for an user is used. 0 is for the anonymous user. for an user is used. 0 is for the anonymous user.
""" """
# TODO: rewrite with lua to get all elements with one request changed_elements: Dict[str, List[bytes]] = defaultdict(list)
deleted_elements: List[str] = []
if user_id is None:
cache_key = self.get_full_data_cache_key()
else:
cache_key = self.get_restricted_data_cache_key(user_id)
# Convert max_change_id to a string. If its negative, use the string '+inf'
redis_max_change_id = "+inf" if max_change_id < 0 else str(max_change_id)
async with self.get_connection() as redis: async with self.get_connection() as redis:
changed_elements: Dict[str, List[bytes]] = defaultdict(list) # lua script that returns gets all element_ids from change_id_cache_key
deleted_elements: List[str] = [] # and then uses each element_id on full_data or restricted_data.
for element_id in await redis.zrangebyscore(self.get_change_id_cache_key(), min=change_id): # It returns a list where the odd values are the change_id and the
if element_id.startswith(b'_config'): # even values the element as json. The function wait_make_dict creates
continue # a python dict from the returned list.
element_json = await redis.hget(self.get_full_data_cache_key(), element_id) # Optional[bytes] elements: Dict[bytes, Optional[bytes]] = await aioredis.util.wait_make_dict(redis.eval(
if element_json is None: """
# The element is not in the cache. It has to be deleted. -- Get change ids of changed elements
deleted_elements.append(element_id) local element_ids = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2])
else:
collection_string, id = split_element_id(element_id) -- Save elements in array. Rotate element_id and element_json
changed_elements[collection_string].append(element_json) local elements = {}
return changed_elements, deleted_elements for _, element_id in pairs(element_ids) do
table.insert(elements, element_id)
table.insert(elements, redis.call('hget', KEYS[2], element_id))
end
return elements
""",
keys=[self.get_change_id_cache_key(), cache_key],
args=[change_id, redis_max_change_id]))
for element_id, element_json in elements.items():
if element_id.startswith(b'_config'):
# Ignore config values from the change_id cache key
continue
if element_json is None:
# The element is not in the cache. It has to be deleted.
deleted_elements.append(element_id.decode())
else:
collection_string, id = split_element_id(element_id)
changed_elements[collection_string].append(element_json)
return changed_elements, deleted_elements
async def del_restricted_data(self, user_id: int) -> None: async def del_restricted_data(self, user_id: int) -> None:
""" """
@ -284,15 +310,16 @@ class RedisCacheProvider(BaseCacheProvider):
Returns False when the lock was already set. Returns False when the lock was already set.
""" """
# TODO: Improve lock. See: https://redis.io/topics/distlock
async with self.get_connection() as redis: async with self.get_connection() as redis:
return await redis.hsetnx("lock_{}".format(lock_name), self.lock_key, 1) return await redis.setnx("{}lock_{}".format(self.prefix, lock_name), 1)
async def get_lock(self, lock_name: str) -> bool: async def get_lock(self, lock_name: str) -> bool:
""" """
Returns True, when the lock for the restricted_data of an user is set. Else False. Returns True, when the lock for the restricted_data of an user is set. Else False.
""" """
async with self.get_connection() as redis: async with self.get_connection() as redis:
return await redis.hget("lock_{}".format(lock_name), self.lock_key) return await redis.get("{}lock_{}".format(self.prefix, lock_name))
async def del_lock(self, lock_name: str) -> None: async def del_lock(self, lock_name: str) -> None:
""" """
@ -300,7 +327,7 @@ class RedisCacheProvider(BaseCacheProvider):
lock is not set. lock is not set.
""" """
async with self.get_connection() as redis: async with self.get_connection() as redis:
await redis.hdel("lock_{}".format(lock_name), self.lock_key) await redis.delete("{}lock_{}".format(self.prefix, lock_name))
async def get_change_id_user(self, user_id: int) -> Optional[int]: async def get_change_id_user(self, user_id: int) -> Optional[int]:
""" """
@ -396,14 +423,19 @@ class MemmoryCacheProvider(BaseCacheProvider):
except KeyError: except KeyError:
pass pass
async def add_changed_elements(self, change_id: int, element_ids: Iterable[str]) -> None: async def add_changed_elements(self, default_change_id: int, element_ids: Iterable[str]) -> int:
element_ids = list(element_ids) element_ids = list(element_ids)
try:
change_id = (await self.get_current_change_id())[0][1] + 1
except IndexError:
change_id = default_change_id
for element_id in element_ids: for element_id in element_ids:
if change_id in self.change_id_data: if change_id in self.change_id_data:
self.change_id_data[change_id].add(element_id) self.change_id_data[change_id].add(element_id)
else: else:
self.change_id_data[change_id] = {element_id} self.change_id_data[change_id] = {element_id}
return change_id
async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]: async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
if user_id is None: if user_id is None:
@ -418,7 +450,10 @@ class MemmoryCacheProvider(BaseCacheProvider):
return value.encode() if value is not None else None return value.encode() if value is not None else None
async def get_data_since( async def get_data_since(
self, change_id: int, user_id: Optional[int] = None) -> Tuple[Dict[str, List[bytes]], List[str]]: self,
change_id: int,
user_id: Optional[int] = None,
max_change_id: int = -1) -> Tuple[Dict[str, List[bytes]], List[str]]:
changed_elements: Dict[str, List[bytes]] = defaultdict(list) changed_elements: Dict[str, List[bytes]] = defaultdict(list)
deleted_elements: List[str] = [] deleted_elements: List[str] = []
if user_id is None: if user_id is None:
@ -427,7 +462,7 @@ class MemmoryCacheProvider(BaseCacheProvider):
cache_dict = self.restricted_data.get(user_id, {}) cache_dict = self.restricted_data.get(user_id, {})
for data_change_id, element_ids in self.change_id_data.items(): for data_change_id, element_ids in self.change_id_data.items():
if data_change_id < change_id: if data_change_id < change_id or (max_change_id > -1 and data_change_id > max_change_id):
continue continue
for element_id in element_ids: for element_id in element_ids:
element_json = cache_dict.get(element_id, None) element_json = cache_dict.get(element_id, None)
@ -530,3 +565,28 @@ def get_all_cachables() -> List[Cachable]:
continue continue
out.extend(get_startup_elements()) out.extend(get_startup_elements())
return out return out
# Lua script evaluated by RedisCacheProvider.add_changed_elements() via
# redis.eval(), so that choosing the change_id and writing the elements
# happen in a single script call.
#
# Inputs:
#   KEYS[1]  - key of the sorted set that maps element_ids to change_ids
#   ARGV[1]  - default change_id, used only when the sorted set is empty
#   ARGV[2+] - the element_ids that changed
#
# Behavior:
#   * The new change_id is the highest score in the sorted set plus one, or
#     ARGV[1] when the set has no entries yet.
#   * Every given element_id is added to the set with that change_id as score.
#   * The member '_config:lowest_change_id' is added with the same score only
#     if it is not in the set yet ('NX').
#
# Returns the change_id that was used; the Python caller converts it with int().
lua_script_change_data = """
-- Generate a new change_id
local tmp = redis.call('zrevrangebyscore', KEYS[1], '+inf', '-inf', 'WITHSCORES', 'LIMIT', 0, 1)
local change_id
if next(tmp) == nil then
-- The key does not exist
change_id = ARGV[1]
else
change_id = tmp[2] + 1
end
-- Add elements to sorted set
local count = 2
while ARGV[count] do
redis.call('zadd', KEYS[1], change_id, ARGV[count])
count = count + 1
end
-- Set lowest_change_id if it does not exist
redis.call('zadd', KEYS[1], 'NX', change_id, '_config:lowest_change_id')
return change_id
"""

View File

@ -170,7 +170,7 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
""" """
change_id = event['change_id'] change_id = event['change_id']
output = [] output = []
changed_elements, deleted_elements = await element_cache.get_restricted_data(self.scope['user'], change_id) changed_elements, deleted_elements = await element_cache.get_restricted_data(self.scope['user'], change_id, max_change_id=change_id)
for collection_string, elements in changed_elements.items(): for collection_string, elements in changed_elements.items():
for element in elements: for element in elements:
output.append(format_for_autoupdate( output.append(format_for_autoupdate(

View File

@ -42,7 +42,7 @@ DATABASES = {
'ENGINE': 'django.db.backends.sqlite3', 'ENGINE': 'django.db.backends.sqlite3',
} }
} }
REDIS_ADDRESS = "redis://127.0.0.1"
SESSION_ENGINE = "django.contrib.sessions.backends.cache" SESSION_ENGINE = "django.contrib.sessions.backends.cache"
# Internationalization # Internationalization