Merge pull request #3925 from ostcar/cache_improvements
Improve redis cache
This commit is contained in:
commit
1ba3af968d
@ -18,7 +18,7 @@ matrix:
|
|||||||
- flake8 openslides tests
|
- flake8 openslides tests
|
||||||
- isort --check-only --diff --recursive openslides tests
|
- isort --check-only --diff --recursive openslides tests
|
||||||
- python -m mypy openslides/
|
- python -m mypy openslides/
|
||||||
- pytest tests/old/ tests/integration/ tests/unit/ --cov --cov-fail-under=76
|
- pytest tests/old/ tests/integration/ tests/unit/ --cov --cov-fail-under=75
|
||||||
|
|
||||||
- language: python
|
- language: python
|
||||||
cache:
|
cache:
|
||||||
@ -35,7 +35,7 @@ matrix:
|
|||||||
- flake8 openslides tests
|
- flake8 openslides tests
|
||||||
- isort --check-only --diff --recursive openslides tests
|
- isort --check-only --diff --recursive openslides tests
|
||||||
- python -m mypy openslides/
|
- python -m mypy openslides/
|
||||||
- pytest tests/old/ tests/integration/ tests/unit/ --cov --cov-fail-under=76
|
- pytest tests/old/ tests/integration/ tests/unit/ --cov --cov-fail-under=75
|
||||||
|
|
||||||
- language: node_js
|
- language: node_js
|
||||||
node_js:
|
node_js:
|
||||||
|
@ -77,7 +77,8 @@ class ElementCache:
|
|||||||
|
|
||||||
# Start time is used as first change_id if there is non in redis
|
# Start time is used as first change_id if there is non in redis
|
||||||
if start_time is None:
|
if start_time is None:
|
||||||
start_time = int((datetime.utcnow() - datetime(1970, 1, 1)).total_seconds())
|
# Use the miliseconds (rounted) since the 2016-02-29.
|
||||||
|
start_time = int((datetime.utcnow() - datetime(2016, 2, 29)).total_seconds()) * 1000
|
||||||
self.start_time = start_time
|
self.start_time = start_time
|
||||||
|
|
||||||
# Contains Futures to controll, that only one client updates the restricted_data.
|
# Contains Futures to controll, that only one client updates the restricted_data.
|
||||||
@ -151,13 +152,7 @@ class ElementCache:
|
|||||||
if deleted_elements:
|
if deleted_elements:
|
||||||
await self.cache_provider.del_elements(deleted_elements)
|
await self.cache_provider.del_elements(deleted_elements)
|
||||||
|
|
||||||
# TODO: The provider has to define the new change_id with lua. In other
|
return await self.cache_provider.add_changed_elements(self.start_time + 1, elements.keys())
|
||||||
# case it is possible, that two changes get the same id (which
|
|
||||||
# would not be a big problem).
|
|
||||||
change_id = await self.get_next_change_id()
|
|
||||||
|
|
||||||
await self.cache_provider.add_changed_elements(change_id, elements.keys())
|
|
||||||
return change_id
|
|
||||||
|
|
||||||
async def get_all_full_data(self) -> Dict[str, List[Dict[str, Any]]]:
|
async def get_all_full_data(self) -> Dict[str, List[Dict[str, Any]]]:
|
||||||
"""
|
"""
|
||||||
@ -174,9 +169,10 @@ class ElementCache:
|
|||||||
return dict(out)
|
return dict(out)
|
||||||
|
|
||||||
async def get_full_data(
|
async def get_full_data(
|
||||||
self, change_id: int = 0) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
|
self, change_id: int = 0, max_change_id: int = -1) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
|
||||||
"""
|
"""
|
||||||
Returns all full_data since change_id. If it does not exist, it is created.
|
Returns all full_data since change_id until max_change_id (including).
|
||||||
|
max_change_id -1 means the highest change_id.
|
||||||
|
|
||||||
Returns two values inside a tuple. The first value is a dict where the
|
Returns two values inside a tuple. The first value is a dict where the
|
||||||
key is the collection_string and the value is a list of data. The second
|
key is the collection_string and the value is a list of data. The second
|
||||||
@ -202,7 +198,7 @@ class ElementCache:
|
|||||||
"Catch this exception and rerun the method with change_id=0."
|
"Catch this exception and rerun the method with change_id=0."
|
||||||
.format(change_id, lowest_change_id))
|
.format(change_id, lowest_change_id))
|
||||||
|
|
||||||
raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id)
|
raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id, max_change_id=max_change_id)
|
||||||
return (
|
return (
|
||||||
{collection_string: [json.loads(value.decode()) for value in value_list]
|
{collection_string: [json.loads(value.decode()) for value in value_list]
|
||||||
for collection_string, value_list in raw_changed_elements.items()},
|
for collection_string, value_list in raw_changed_elements.items()},
|
||||||
@ -323,7 +319,8 @@ class ElementCache:
|
|||||||
async def get_restricted_data(
|
async def get_restricted_data(
|
||||||
self,
|
self,
|
||||||
user: Optional['CollectionElement'],
|
user: Optional['CollectionElement'],
|
||||||
change_id: int = 0) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
|
change_id: int = 0,
|
||||||
|
max_change_id: int = -1) -> Tuple[Dict[str, List[Dict[str, Any]]], List[str]]:
|
||||||
"""
|
"""
|
||||||
Like get_full_data but with restricted_data for an user.
|
Like get_full_data but with restricted_data for an user.
|
||||||
"""
|
"""
|
||||||
@ -332,7 +329,7 @@ class ElementCache:
|
|||||||
return (await self.get_all_restricted_data(user), [])
|
return (await self.get_all_restricted_data(user), [])
|
||||||
|
|
||||||
if not self.use_restricted_data_cache:
|
if not self.use_restricted_data_cache:
|
||||||
changed_elements, deleted_elements = await self.get_full_data(change_id)
|
changed_elements, deleted_elements = await self.get_full_data(change_id, max_change_id)
|
||||||
restricted_data = {}
|
restricted_data = {}
|
||||||
for collection_string, full_data in changed_elements.items():
|
for collection_string, full_data in changed_elements.items():
|
||||||
restricter = self.cachables[collection_string].restrict_elements
|
restricter = self.cachables[collection_string].restrict_elements
|
||||||
@ -353,7 +350,7 @@ class ElementCache:
|
|||||||
# data, this waits until it is done.
|
# data, this waits until it is done.
|
||||||
await self.update_restricted_data(user)
|
await self.update_restricted_data(user)
|
||||||
|
|
||||||
raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id, get_user_id(user))
|
raw_changed_elements, deleted_elements = await self.cache_provider.get_data_since(change_id, get_user_id(user), max_change_id)
|
||||||
return (
|
return (
|
||||||
{collection_string: [json.loads(value.decode()) for value in value_list]
|
{collection_string: [json.loads(value.decode()) for value in value_list]
|
||||||
for collection_string, value_list in raw_changed_elements.items()},
|
for collection_string, value_list in raw_changed_elements.items()},
|
||||||
@ -371,15 +368,6 @@ class ElementCache:
|
|||||||
# Return the score (second element) of the first (and only) element
|
# Return the score (second element) of the first (and only) element
|
||||||
return value[0][1]
|
return value[0][1]
|
||||||
|
|
||||||
async def get_next_change_id(self) -> int:
|
|
||||||
"""
|
|
||||||
Returns the next change_id.
|
|
||||||
|
|
||||||
Returns the start time in seconds + 1, if there is no change_id in yet.
|
|
||||||
"""
|
|
||||||
current_id = await self.get_current_change_id()
|
|
||||||
return current_id + 1
|
|
||||||
|
|
||||||
async def get_lowest_change_id(self) -> int:
|
async def get_lowest_change_id(self) -> int:
|
||||||
"""
|
"""
|
||||||
Returns the lowest change id.
|
Returns the lowest change id.
|
||||||
|
@ -3,13 +3,11 @@ from typing import (
|
|||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
Any,
|
Any,
|
||||||
Dict,
|
Dict,
|
||||||
Generator,
|
|
||||||
Iterable,
|
Iterable,
|
||||||
List,
|
List,
|
||||||
Optional,
|
Optional,
|
||||||
Set,
|
Set,
|
||||||
Tuple,
|
Tuple,
|
||||||
Union,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
from django.apps import apps
|
from django.apps import apps
|
||||||
@ -35,22 +33,22 @@ class BaseCacheProvider:
|
|||||||
|
|
||||||
See RedisCacheProvider as reverence implementation.
|
See RedisCacheProvider as reverence implementation.
|
||||||
"""
|
"""
|
||||||
full_data_cache_key = 'full_data_cache'
|
full_data_cache_key = 'full_data'
|
||||||
restricted_user_cache_key = 'restricted_data_cache:{user_id}'
|
restricted_user_cache_key = 'restricted_data:{user_id}'
|
||||||
change_id_cache_key = 'change_id_cache'
|
change_id_cache_key = 'change_id'
|
||||||
lock_key = '_config:updating'
|
prefix = 'element_cache_'
|
||||||
|
|
||||||
def __init__(self, *args: Any) -> None:
|
def __init__(self, *args: Any) -> None:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def get_full_data_cache_key(self) -> str:
|
def get_full_data_cache_key(self) -> str:
|
||||||
return self.full_data_cache_key
|
return "".join((self.prefix, self.full_data_cache_key))
|
||||||
|
|
||||||
def get_restricted_data_cache_key(self, user_id: int) -> str:
|
def get_restricted_data_cache_key(self, user_id: int) -> str:
|
||||||
return self.restricted_user_cache_key.format(user_id=user_id)
|
return "".join((self.prefix, self.restricted_user_cache_key.format(user_id=user_id)))
|
||||||
|
|
||||||
def get_change_id_cache_key(self) -> str:
|
def get_change_id_cache_key(self) -> str:
|
||||||
return self.change_id_cache_key
|
return "".join((self.prefix, self.change_id_cache_key))
|
||||||
|
|
||||||
async def clear_cache(self) -> None:
|
async def clear_cache(self) -> None:
|
||||||
raise NotImplementedError("CacheProvider has to implement the method clear_cache().")
|
raise NotImplementedError("CacheProvider has to implement the method clear_cache().")
|
||||||
@ -67,13 +65,17 @@ class BaseCacheProvider:
|
|||||||
async def del_elements(self, elements: List[str], user_id: Optional[int] = None) -> None:
|
async def del_elements(self, elements: List[str], user_id: Optional[int] = None) -> None:
|
||||||
raise NotImplementedError("CacheProvider has to implement the method del_elements().")
|
raise NotImplementedError("CacheProvider has to implement the method del_elements().")
|
||||||
|
|
||||||
async def add_changed_elements(self, change_id: int, element_ids: Iterable[str]) -> None:
|
async def add_changed_elements(self, default_change_id: int, element_ids: Iterable[str]) -> int:
|
||||||
raise NotImplementedError("CacheProvider has to implement the method add_changed_elements().")
|
raise NotImplementedError("CacheProvider has to implement the method add_changed_elements().")
|
||||||
|
|
||||||
async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
|
async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
|
||||||
raise NotImplementedError("CacheProvider has to implement the method get_all_data().")
|
raise NotImplementedError("CacheProvider has to implement the method get_all_data().")
|
||||||
|
|
||||||
async def get_data_since(self, change_id: int, user_id: Optional[int] = None) -> Tuple[Dict[str, List[bytes]], List[str]]:
|
async def get_data_since(
|
||||||
|
self,
|
||||||
|
change_id: int,
|
||||||
|
user_id: Optional[int] = None,
|
||||||
|
max_change_id: int = -1) -> Tuple[Dict[str, List[bytes]], List[str]]:
|
||||||
raise NotImplementedError("CacheProvider has to implement the method get_data_since().")
|
raise NotImplementedError("CacheProvider has to implement the method get_data_since().")
|
||||||
|
|
||||||
async def get_element(self, element_id: str) -> Optional[bytes]:
|
async def get_element(self, element_id: str) -> Optional[bytes]:
|
||||||
@ -141,17 +143,17 @@ class RedisCacheProvider(BaseCacheProvider):
|
|||||||
Deleted all cache entries created with this element cache.
|
Deleted all cache entries created with this element cache.
|
||||||
"""
|
"""
|
||||||
async with self.get_connection() as redis:
|
async with self.get_connection() as redis:
|
||||||
# TODO: Fix me. Do only delete keys, that are created with this cache.
|
await redis.eval("return redis.call('del', 'fake_key', unpack(redis.call('keys', ARGV[1])))", keys=[], args=["{}*".format(self.prefix)])
|
||||||
await redis.flushall()
|
|
||||||
|
|
||||||
async def reset_full_cache(self, data: Dict[str, str]) -> None:
|
async def reset_full_cache(self, data: Dict[str, str]) -> None:
|
||||||
"""
|
"""
|
||||||
Deletes the cache and write new data in it.
|
Deletes the full_data_cache and write new data in it.
|
||||||
"""
|
"""
|
||||||
# TODO: lua or transaction
|
|
||||||
async with self.get_connection() as redis:
|
async with self.get_connection() as redis:
|
||||||
await redis.delete(self.get_full_data_cache_key())
|
tr = redis.multi_exec()
|
||||||
await redis.hmset_dict(self.get_full_data_cache_key(), data)
|
tr.delete(self.get_full_data_cache_key())
|
||||||
|
tr.hmset_dict(self.get_full_data_cache_key(), data)
|
||||||
|
await tr.execute()
|
||||||
|
|
||||||
async def data_exists(self, user_id: Optional[int] = None) -> bool:
|
async def data_exists(self, user_id: Optional[int] = None) -> bool:
|
||||||
"""
|
"""
|
||||||
@ -197,25 +199,18 @@ class RedisCacheProvider(BaseCacheProvider):
|
|||||||
cache_key,
|
cache_key,
|
||||||
*elements)
|
*elements)
|
||||||
|
|
||||||
async def add_changed_elements(self, change_id: int, element_ids: Iterable[str]) -> None:
|
async def add_changed_elements(self, default_change_id: int, element_ids: Iterable[str]) -> int:
|
||||||
"""
|
"""
|
||||||
Saves which elements are change with a change_id.
|
Saves which elements are change with a change_id.
|
||||||
|
|
||||||
args has to be an even iterable. The odd values have to be a change id (int) and the
|
Generates and returns the change_id.
|
||||||
even values have to be element_ids.
|
|
||||||
"""
|
"""
|
||||||
def zadd_args(change_id: int) -> Generator[Union[int, str], None, None]:
|
|
||||||
"""
|
|
||||||
Small helper to generates the arguments for the redis command zadd.
|
|
||||||
"""
|
|
||||||
for element_id in element_ids:
|
|
||||||
yield change_id
|
|
||||||
yield element_id
|
|
||||||
|
|
||||||
async with self.get_connection() as redis:
|
async with self.get_connection() as redis:
|
||||||
await redis.zadd(self.get_change_id_cache_key(), *zadd_args(change_id))
|
return int(await redis.eval(
|
||||||
# Saves the lowest_change_id if it does not exist
|
lua_script_change_data,
|
||||||
await redis.zadd(self.get_change_id_cache_key(), change_id, '_config:lowest_change_id', exist='ZSET_IF_NOT_EXIST')
|
keys=[self.get_change_id_cache_key()],
|
||||||
|
args=[default_change_id, *element_ids]
|
||||||
|
))
|
||||||
|
|
||||||
async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
|
async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
|
||||||
"""
|
"""
|
||||||
@ -242,7 +237,11 @@ class RedisCacheProvider(BaseCacheProvider):
|
|||||||
self.get_full_data_cache_key(),
|
self.get_full_data_cache_key(),
|
||||||
element_id)
|
element_id)
|
||||||
|
|
||||||
async def get_data_since(self, change_id: int, user_id: Optional[int] = None) -> Tuple[Dict[str, List[bytes]], List[str]]:
|
async def get_data_since(
|
||||||
|
self,
|
||||||
|
change_id: int,
|
||||||
|
user_id: Optional[int] = None,
|
||||||
|
max_change_id: int = -1) -> Tuple[Dict[str, List[bytes]], List[str]]:
|
||||||
"""
|
"""
|
||||||
Returns all elements since a change_id.
|
Returns all elements since a change_id.
|
||||||
|
|
||||||
@ -253,17 +252,44 @@ class RedisCacheProvider(BaseCacheProvider):
|
|||||||
if user_id is None, the full_data is returned. If user_id is an int, the restricted_data
|
if user_id is None, the full_data is returned. If user_id is an int, the restricted_data
|
||||||
for an user is used. 0 is for the anonymous user.
|
for an user is used. 0 is for the anonymous user.
|
||||||
"""
|
"""
|
||||||
# TODO: rewrite with lua to get all elements with one request
|
|
||||||
async with self.get_connection() as redis:
|
|
||||||
changed_elements: Dict[str, List[bytes]] = defaultdict(list)
|
changed_elements: Dict[str, List[bytes]] = defaultdict(list)
|
||||||
deleted_elements: List[str] = []
|
deleted_elements: List[str] = []
|
||||||
for element_id in await redis.zrangebyscore(self.get_change_id_cache_key(), min=change_id):
|
if user_id is None:
|
||||||
|
cache_key = self.get_full_data_cache_key()
|
||||||
|
else:
|
||||||
|
cache_key = self.get_restricted_data_cache_key(user_id)
|
||||||
|
|
||||||
|
# Convert max_change_id to a string. If its negative, use the string '+inf'
|
||||||
|
redis_max_change_id = "+inf" if max_change_id < 0 else str(max_change_id)
|
||||||
|
async with self.get_connection() as redis:
|
||||||
|
# lua script that returns gets all element_ids from change_id_cache_key
|
||||||
|
# and then uses each element_id on full_data or restricted_data.
|
||||||
|
# It returns a list where the odd values are the change_id and the
|
||||||
|
# even values the element as json. The function wait_make_dict creates
|
||||||
|
# a python dict from the returned list.
|
||||||
|
elements: Dict[bytes, Optional[bytes]] = await aioredis.util.wait_make_dict(redis.eval(
|
||||||
|
"""
|
||||||
|
-- Get change ids of changed elements
|
||||||
|
local element_ids = redis.call('zrangebyscore', KEYS[1], ARGV[1], ARGV[2])
|
||||||
|
|
||||||
|
-- Save elements in array. Rotate element_id and element_json
|
||||||
|
local elements = {}
|
||||||
|
for _, element_id in pairs(element_ids) do
|
||||||
|
table.insert(elements, element_id)
|
||||||
|
table.insert(elements, redis.call('hget', KEYS[2], element_id))
|
||||||
|
end
|
||||||
|
return elements
|
||||||
|
""",
|
||||||
|
keys=[self.get_change_id_cache_key(), cache_key],
|
||||||
|
args=[change_id, redis_max_change_id]))
|
||||||
|
|
||||||
|
for element_id, element_json in elements.items():
|
||||||
if element_id.startswith(b'_config'):
|
if element_id.startswith(b'_config'):
|
||||||
|
# Ignore config values from the change_id cache key
|
||||||
continue
|
continue
|
||||||
element_json = await redis.hget(self.get_full_data_cache_key(), element_id) # Optional[bytes]
|
|
||||||
if element_json is None:
|
if element_json is None:
|
||||||
# The element is not in the cache. It has to be deleted.
|
# The element is not in the cache. It has to be deleted.
|
||||||
deleted_elements.append(element_id)
|
deleted_elements.append(element_id.decode())
|
||||||
else:
|
else:
|
||||||
collection_string, id = split_element_id(element_id)
|
collection_string, id = split_element_id(element_id)
|
||||||
changed_elements[collection_string].append(element_json)
|
changed_elements[collection_string].append(element_json)
|
||||||
@ -284,15 +310,16 @@ class RedisCacheProvider(BaseCacheProvider):
|
|||||||
|
|
||||||
Returns False when the lock was already set.
|
Returns False when the lock was already set.
|
||||||
"""
|
"""
|
||||||
|
# TODO: Improve lock. See: https://redis.io/topics/distlock
|
||||||
async with self.get_connection() as redis:
|
async with self.get_connection() as redis:
|
||||||
return await redis.hsetnx("lock_{}".format(lock_name), self.lock_key, 1)
|
return await redis.setnx("{}lock_{}".format(self.prefix, lock_name), 1)
|
||||||
|
|
||||||
async def get_lock(self, lock_name: str) -> bool:
|
async def get_lock(self, lock_name: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Returns True, when the lock for the restricted_data of an user is set. Else False.
|
Returns True, when the lock for the restricted_data of an user is set. Else False.
|
||||||
"""
|
"""
|
||||||
async with self.get_connection() as redis:
|
async with self.get_connection() as redis:
|
||||||
return await redis.hget("lock_{}".format(lock_name), self.lock_key)
|
return await redis.get("{}lock_{}".format(self.prefix, lock_name))
|
||||||
|
|
||||||
async def del_lock(self, lock_name: str) -> None:
|
async def del_lock(self, lock_name: str) -> None:
|
||||||
"""
|
"""
|
||||||
@ -300,7 +327,7 @@ class RedisCacheProvider(BaseCacheProvider):
|
|||||||
lock is not set.
|
lock is not set.
|
||||||
"""
|
"""
|
||||||
async with self.get_connection() as redis:
|
async with self.get_connection() as redis:
|
||||||
await redis.hdel("lock_{}".format(lock_name), self.lock_key)
|
await redis.delete("{}lock_{}".format(self.prefix, lock_name))
|
||||||
|
|
||||||
async def get_change_id_user(self, user_id: int) -> Optional[int]:
|
async def get_change_id_user(self, user_id: int) -> Optional[int]:
|
||||||
"""
|
"""
|
||||||
@ -396,14 +423,19 @@ class MemmoryCacheProvider(BaseCacheProvider):
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def add_changed_elements(self, change_id: int, element_ids: Iterable[str]) -> None:
|
async def add_changed_elements(self, default_change_id: int, element_ids: Iterable[str]) -> int:
|
||||||
element_ids = list(element_ids)
|
element_ids = list(element_ids)
|
||||||
|
try:
|
||||||
|
change_id = (await self.get_current_change_id())[0][1] + 1
|
||||||
|
except IndexError:
|
||||||
|
change_id = default_change_id
|
||||||
|
|
||||||
for element_id in element_ids:
|
for element_id in element_ids:
|
||||||
if change_id in self.change_id_data:
|
if change_id in self.change_id_data:
|
||||||
self.change_id_data[change_id].add(element_id)
|
self.change_id_data[change_id].add(element_id)
|
||||||
else:
|
else:
|
||||||
self.change_id_data[change_id] = {element_id}
|
self.change_id_data[change_id] = {element_id}
|
||||||
|
return change_id
|
||||||
|
|
||||||
async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
|
async def get_all_data(self, user_id: Optional[int] = None) -> Dict[bytes, bytes]:
|
||||||
if user_id is None:
|
if user_id is None:
|
||||||
@ -418,7 +450,10 @@ class MemmoryCacheProvider(BaseCacheProvider):
|
|||||||
return value.encode() if value is not None else None
|
return value.encode() if value is not None else None
|
||||||
|
|
||||||
async def get_data_since(
|
async def get_data_since(
|
||||||
self, change_id: int, user_id: Optional[int] = None) -> Tuple[Dict[str, List[bytes]], List[str]]:
|
self,
|
||||||
|
change_id: int,
|
||||||
|
user_id: Optional[int] = None,
|
||||||
|
max_change_id: int = -1) -> Tuple[Dict[str, List[bytes]], List[str]]:
|
||||||
changed_elements: Dict[str, List[bytes]] = defaultdict(list)
|
changed_elements: Dict[str, List[bytes]] = defaultdict(list)
|
||||||
deleted_elements: List[str] = []
|
deleted_elements: List[str] = []
|
||||||
if user_id is None:
|
if user_id is None:
|
||||||
@ -427,7 +462,7 @@ class MemmoryCacheProvider(BaseCacheProvider):
|
|||||||
cache_dict = self.restricted_data.get(user_id, {})
|
cache_dict = self.restricted_data.get(user_id, {})
|
||||||
|
|
||||||
for data_change_id, element_ids in self.change_id_data.items():
|
for data_change_id, element_ids in self.change_id_data.items():
|
||||||
if data_change_id < change_id:
|
if data_change_id < change_id or (max_change_id > -1 and data_change_id > max_change_id):
|
||||||
continue
|
continue
|
||||||
for element_id in element_ids:
|
for element_id in element_ids:
|
||||||
element_json = cache_dict.get(element_id, None)
|
element_json = cache_dict.get(element_id, None)
|
||||||
@ -530,3 +565,28 @@ def get_all_cachables() -> List[Cachable]:
|
|||||||
continue
|
continue
|
||||||
out.extend(get_startup_elements())
|
out.extend(get_startup_elements())
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
lua_script_change_data = """
|
||||||
|
-- Generate a new change_id
|
||||||
|
local tmp = redis.call('zrevrangebyscore', KEYS[1], '+inf', '-inf', 'WITHSCORES', 'LIMIT', 0, 1)
|
||||||
|
local change_id
|
||||||
|
if next(tmp) == nil then
|
||||||
|
-- The key does not exist
|
||||||
|
change_id = ARGV[1]
|
||||||
|
else
|
||||||
|
change_id = tmp[2] + 1
|
||||||
|
end
|
||||||
|
|
||||||
|
-- Add elements to sorted set
|
||||||
|
local count = 2
|
||||||
|
while ARGV[count] do
|
||||||
|
redis.call('zadd', KEYS[1], change_id, ARGV[count])
|
||||||
|
count = count + 1
|
||||||
|
end
|
||||||
|
|
||||||
|
-- Set lowest_change_id if it does not exist
|
||||||
|
redis.call('zadd', KEYS[1], 'NX', change_id, '_config:lowest_change_id')
|
||||||
|
|
||||||
|
return change_id
|
||||||
|
"""
|
||||||
|
@ -170,7 +170,7 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
|||||||
"""
|
"""
|
||||||
change_id = event['change_id']
|
change_id = event['change_id']
|
||||||
output = []
|
output = []
|
||||||
changed_elements, deleted_elements = await element_cache.get_restricted_data(self.scope['user'], change_id)
|
changed_elements, deleted_elements = await element_cache.get_restricted_data(self.scope['user'], change_id, max_change_id=change_id)
|
||||||
for collection_string, elements in changed_elements.items():
|
for collection_string, elements in changed_elements.items():
|
||||||
for element in elements:
|
for element in elements:
|
||||||
output.append(format_for_autoupdate(
|
output.append(format_for_autoupdate(
|
||||||
|
@ -42,7 +42,7 @@ DATABASES = {
|
|||||||
'ENGINE': 'django.db.backends.sqlite3',
|
'ENGINE': 'django.db.backends.sqlite3',
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
REDIS_ADDRESS = "redis://127.0.0.1"
|
|
||||||
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
|
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
|
||||||
|
|
||||||
# Internationalization
|
# Internationalization
|
||||||
|
Loading…
Reference in New Issue
Block a user