Ensures change id across multiple workers
This commit is contained in:
parent
eaf4d8180d
commit
d4dc13706f
@ -11,7 +11,7 @@ from django.apps import apps
|
|||||||
from .cache_providers import (
|
from .cache_providers import (
|
||||||
Cachable,
|
Cachable,
|
||||||
ElementCacheProvider,
|
ElementCacheProvider,
|
||||||
MemmoryCacheProvider,
|
MemoryCacheProvider,
|
||||||
RedisCacheProvider,
|
RedisCacheProvider,
|
||||||
)
|
)
|
||||||
from .redis import use_redis
|
from .redis import use_redis
|
||||||
@ -71,19 +71,7 @@ class ElementCache:
|
|||||||
self.cache_provider = cache_provider_class(self.async_ensure_cache)
|
self.cache_provider = cache_provider_class(self.async_ensure_cache)
|
||||||
self.cachable_provider = cachable_provider
|
self.cachable_provider = cachable_provider
|
||||||
self._cachables: Optional[Dict[str, Cachable]] = None
|
self._cachables: Optional[Dict[str, Cachable]] = None
|
||||||
self.set_default_change_id(default_change_id)
|
self.default_change_id: Optional[int] = default_change_id
|
||||||
|
|
||||||
def set_default_change_id(self, default_change_id: Optional[int] = None) -> None:
|
|
||||||
"""
|
|
||||||
Sets the default change id for the cache. Needs to update, if the cache gets generated.
|
|
||||||
"""
|
|
||||||
# The current time is used as the first change_id if there is non in redis
|
|
||||||
if default_change_id is None:
|
|
||||||
# Use the miliseconds (rounted) since the 2016-02-29.
|
|
||||||
default_change_id = (
|
|
||||||
int((datetime.utcnow() - datetime(2016, 2, 29)).total_seconds()) * 1000
|
|
||||||
)
|
|
||||||
self.default_change_id = default_change_id
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def cachables(self) -> Dict[str, Cachable]:
|
def cachables(self) -> Dict[str, Cachable]:
|
||||||
@ -151,8 +139,16 @@ class ElementCache:
|
|||||||
)
|
)
|
||||||
logger.info("Done building the cache data.")
|
logger.info("Done building the cache data.")
|
||||||
logger.info("Saving cache data into the cache...")
|
logger.info("Saving cache data into the cache...")
|
||||||
self.set_default_change_id(default_change_id=default_change_id)
|
if default_change_id is None:
|
||||||
await self.cache_provider.reset_full_cache(mapping)
|
if self.default_change_id is not None:
|
||||||
|
default_change_id = self.default_change_id
|
||||||
|
else:
|
||||||
|
# Use the miliseconds (rounded) since the 2016-02-29.
|
||||||
|
default_change_id = int(
|
||||||
|
(datetime.utcnow() - datetime(2016, 2, 29)).total_seconds()
|
||||||
|
)
|
||||||
|
default_change_id *= 1000
|
||||||
|
await self.cache_provider.reset_full_cache(mapping, default_change_id)
|
||||||
if schema_version:
|
if schema_version:
|
||||||
await self.cache_provider.set_schema_version(schema_version)
|
await self.cache_provider.set_schema_version(schema_version)
|
||||||
logger.info("Done saving the cache data.")
|
logger.info("Done saving the cache data.")
|
||||||
@ -187,7 +183,7 @@ class ElementCache:
|
|||||||
deleted_elements.append(element_id)
|
deleted_elements.append(element_id)
|
||||||
|
|
||||||
return await self.cache_provider.add_changed_elements(
|
return await self.cache_provider.add_changed_elements(
|
||||||
changed_elements, deleted_elements, self.default_change_id + 1
|
changed_elements, deleted_elements
|
||||||
)
|
)
|
||||||
|
|
||||||
async def get_all_data_list(
|
async def get_all_data_list(
|
||||||
@ -331,8 +327,7 @@ class ElementCache:
|
|||||||
|
|
||||||
Returns default_change_id if there is no change id yet.
|
Returns default_change_id if there is no change id yet.
|
||||||
"""
|
"""
|
||||||
value = await self.cache_provider.get_current_change_id()
|
return await self.cache_provider.get_current_change_id()
|
||||||
return value if value is not None else self.default_change_id
|
|
||||||
|
|
||||||
async def get_lowest_change_id(self) -> int:
|
async def get_lowest_change_id(self) -> int:
|
||||||
"""
|
"""
|
||||||
@ -340,11 +335,7 @@ class ElementCache:
|
|||||||
|
|
||||||
Raises a RuntimeError if there is no change_id.
|
Raises a RuntimeError if there is no change_id.
|
||||||
"""
|
"""
|
||||||
value = await self.cache_provider.get_lowest_change_id()
|
return await self.cache_provider.get_lowest_change_id()
|
||||||
if not value:
|
|
||||||
raise RuntimeError("There is no known change_id.")
|
|
||||||
# Return the score (second element) of the first (and only) element
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
def load_element_cache() -> ElementCache:
|
def load_element_cache() -> ElementCache:
|
||||||
@ -354,7 +345,7 @@ def load_element_cache() -> ElementCache:
|
|||||||
if use_redis:
|
if use_redis:
|
||||||
cache_provider_class: Type[ElementCacheProvider] = RedisCacheProvider
|
cache_provider_class: Type[ElementCacheProvider] = RedisCacheProvider
|
||||||
else:
|
else:
|
||||||
cache_provider_class = MemmoryCacheProvider
|
cache_provider_class = MemoryCacheProvider
|
||||||
|
|
||||||
return ElementCache(cache_provider_class=cache_provider_class)
|
return ElementCache(cache_provider_class=cache_provider_class)
|
||||||
|
|
||||||
|
@ -39,7 +39,9 @@ class ElementCacheProvider(Protocol):
|
|||||||
async def clear_cache(self) -> None:
|
async def clear_cache(self) -> None:
|
||||||
...
|
...
|
||||||
|
|
||||||
async def reset_full_cache(self, data: Dict[str, str]) -> None:
|
async def reset_full_cache(
|
||||||
|
self, data: Dict[str, str], default_change_id: int
|
||||||
|
) -> None:
|
||||||
...
|
...
|
||||||
|
|
||||||
async def data_exists(self) -> bool:
|
async def data_exists(self) -> bool:
|
||||||
@ -55,10 +57,7 @@ class ElementCacheProvider(Protocol):
|
|||||||
...
|
...
|
||||||
|
|
||||||
async def add_changed_elements(
|
async def add_changed_elements(
|
||||||
self,
|
self, changed_elements: List[str], deleted_element_ids: List[str]
|
||||||
changed_elements: List[str],
|
|
||||||
deleted_element_ids: List[str],
|
|
||||||
default_change_id: int,
|
|
||||||
) -> int:
|
) -> int:
|
||||||
...
|
...
|
||||||
|
|
||||||
@ -76,10 +75,10 @@ class ElementCacheProvider(Protocol):
|
|||||||
async def del_lock(self, lock_name: str) -> None:
|
async def del_lock(self, lock_name: str) -> None:
|
||||||
...
|
...
|
||||||
|
|
||||||
async def get_current_change_id(self) -> Optional[int]:
|
async def get_current_change_id(self) -> int:
|
||||||
...
|
...
|
||||||
|
|
||||||
async def get_lowest_change_id(self) -> Optional[int]:
|
async def get_lowest_change_id(self) -> int:
|
||||||
...
|
...
|
||||||
|
|
||||||
async def get_schema_version(self) -> Optional[SchemaVersion]:
|
async def get_schema_version(self) -> Optional[SchemaVersion]:
|
||||||
@ -127,7 +126,6 @@ class RedisCacheProvider:
|
|||||||
full_data_cache_key: str = "full_data"
|
full_data_cache_key: str = "full_data"
|
||||||
change_id_cache_key: str = "change_id"
|
change_id_cache_key: str = "change_id"
|
||||||
schema_cache_key: str = "schema"
|
schema_cache_key: str = "schema"
|
||||||
prefix: str = "element_cache_"
|
|
||||||
|
|
||||||
# All lua-scripts used by this provider. Every entry is a Tuple (str, bool) with the
|
# All lua-scripts used by this provider. Every entry is a Tuple (str, bool) with the
|
||||||
# script and an ensure_cache-indicator. If the indicator is True, a short ensure_cache-script
|
# script and an ensure_cache-indicator. If the indicator is True, a short ensure_cache-script
|
||||||
@ -164,36 +162,33 @@ class RedisCacheProvider:
|
|||||||
local change_id
|
local change_id
|
||||||
if next(tmp) == nil then
|
if next(tmp) == nil then
|
||||||
-- The key does not exist
|
-- The key does not exist
|
||||||
change_id = ARGV[1]
|
return redis.error_reply("cache_reset")
|
||||||
else
|
else
|
||||||
change_id = tmp[2] + 1
|
change_id = tmp[2] + 1
|
||||||
end
|
end
|
||||||
|
|
||||||
local nc = tonumber(ARGV[2])
|
local nc = tonumber(ARGV[1])
|
||||||
local nd = tonumber(ARGV[3])
|
local nd = tonumber(ARGV[2])
|
||||||
|
|
||||||
local i, max
|
local i, max
|
||||||
-- Add changed_elements to the cache and sorted set (the first of the pairs)
|
-- Add changed_elements to the cache and sorted set (the first of the pairs)
|
||||||
if (nc > 0) then
|
if (nc > 0) then
|
||||||
max = 2 + nc
|
max = 1 + nc
|
||||||
redis.call('hmset', KEYS[1], unpack(ARGV, 4, max + 1))
|
redis.call('hmset', KEYS[1], unpack(ARGV, 3, max + 1))
|
||||||
for i = 4, max, 2 do
|
for i = 3, max, 2 do
|
||||||
redis.call('zadd', KEYS[2], change_id, ARGV[i])
|
redis.call('zadd', KEYS[2], change_id, ARGV[i])
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
-- Delete deleted_element_ids and add them to sorted set
|
-- Delete deleted_element_ids and add them to sorted set
|
||||||
if (nd > 0) then
|
if (nd > 0) then
|
||||||
max = 3 + nc + nd
|
max = 2 + nc + nd
|
||||||
redis.call('hdel', KEYS[1], unpack(ARGV, 4 + nc, max))
|
redis.call('hdel', KEYS[1], unpack(ARGV, 3 + nc, max))
|
||||||
for i = 4 + nc, max, 1 do
|
for i = 3 + nc, max, 1 do
|
||||||
redis.call('zadd', KEYS[2], change_id, ARGV[i])
|
redis.call('zadd', KEYS[2], change_id, ARGV[i])
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
-- Set lowest_change_id if it does not exist
|
|
||||||
redis.call('zadd', KEYS[2], 'NX', change_id, '_config:lowest_change_id')
|
|
||||||
|
|
||||||
return change_id
|
return change_id
|
||||||
""",
|
""",
|
||||||
True,
|
True,
|
||||||
@ -244,31 +239,27 @@ class RedisCacheProvider:
|
|||||||
async def ensure_cache(self) -> None:
|
async def ensure_cache(self) -> None:
|
||||||
await self._ensure_cache()
|
await self._ensure_cache()
|
||||||
|
|
||||||
def get_full_data_cache_key(self) -> str:
|
|
||||||
return "".join((self.prefix, self.full_data_cache_key))
|
|
||||||
|
|
||||||
def get_change_id_cache_key(self) -> str:
|
|
||||||
return "".join((self.prefix, self.change_id_cache_key))
|
|
||||||
|
|
||||||
def get_schema_cache_key(self) -> str:
|
|
||||||
return "".join((self.prefix, self.schema_cache_key))
|
|
||||||
|
|
||||||
async def clear_cache(self) -> None:
|
async def clear_cache(self) -> None:
|
||||||
"""
|
"""
|
||||||
Deleted all cache entries created with this element cache.
|
Deleted all cache entries created with this element cache.
|
||||||
"""
|
"""
|
||||||
await self.eval("clear_cache", keys=[], args=[f"{self.prefix}*"])
|
await self.eval("clear_cache", keys=[], args=["*"])
|
||||||
|
|
||||||
async def reset_full_cache(self, data: Dict[str, str]) -> None:
|
async def reset_full_cache(
|
||||||
|
self, data: Dict[str, str], default_change_id: int
|
||||||
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Deletes the full_data_cache and write new data in it. Clears the change id key.
|
Deletes the full_data_cache and write new data in it. Clears the change id key.
|
||||||
Does not clear locks.
|
Does not clear locks.
|
||||||
"""
|
"""
|
||||||
async with get_connection() as redis:
|
async with get_connection() as redis:
|
||||||
tr = redis.multi_exec()
|
tr = redis.multi_exec()
|
||||||
tr.delete(self.get_change_id_cache_key())
|
tr.delete(self.change_id_cache_key)
|
||||||
tr.delete(self.get_full_data_cache_key())
|
tr.delete(self.full_data_cache_key)
|
||||||
tr.hmset_dict(self.get_full_data_cache_key(), data)
|
tr.hmset_dict(self.full_data_cache_key, data)
|
||||||
|
tr.zadd(
|
||||||
|
self.change_id_cache_key, default_change_id, "_config:lowest_change_id"
|
||||||
|
)
|
||||||
await tr.execute()
|
await tr.execute()
|
||||||
|
|
||||||
async def data_exists(self) -> bool:
|
async def data_exists(self) -> bool:
|
||||||
@ -276,7 +267,11 @@ class RedisCacheProvider:
|
|||||||
Returns True, when there is data in the cache.
|
Returns True, when there is data in the cache.
|
||||||
"""
|
"""
|
||||||
async with get_connection() as redis:
|
async with get_connection() as redis:
|
||||||
return await redis.exists(self.get_full_data_cache_key())
|
return await redis.exists(self.full_data_cache_key) and bool(
|
||||||
|
await redis.zrangebyscore(
|
||||||
|
self.change_id_cache_key, withscores=True, count=1, offset=0
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
@ensure_cache_wrapper()
|
@ensure_cache_wrapper()
|
||||||
async def get_all_data(self) -> Dict[bytes, bytes]:
|
async def get_all_data(self) -> Dict[bytes, bytes]:
|
||||||
@ -284,7 +279,7 @@ class RedisCacheProvider:
|
|||||||
Returns all data from the full_data_cache in a mapping from element_id to the element.
|
Returns all data from the full_data_cache in a mapping from element_id to the element.
|
||||||
"""
|
"""
|
||||||
return await aioredis.util.wait_make_dict(
|
return await aioredis.util.wait_make_dict(
|
||||||
self.eval("get_all_data", [self.get_full_data_cache_key()])
|
self.eval("get_all_data", [self.full_data_cache_key])
|
||||||
)
|
)
|
||||||
|
|
||||||
@ensure_cache_wrapper()
|
@ensure_cache_wrapper()
|
||||||
@ -294,7 +289,7 @@ class RedisCacheProvider:
|
|||||||
from element_id to the element.
|
from element_id to the element.
|
||||||
"""
|
"""
|
||||||
response = await self.eval(
|
response = await self.eval(
|
||||||
"get_collection_data", [self.get_full_data_cache_key()], [f"{collection}:*"]
|
"get_collection_data", [self.full_data_cache_key], [f"{collection}:*"]
|
||||||
)
|
)
|
||||||
|
|
||||||
collection_data = {}
|
collection_data = {}
|
||||||
@ -310,15 +305,12 @@ class RedisCacheProvider:
|
|||||||
Returns one element from the cache. Returns None, when the element does not exist.
|
Returns one element from the cache. Returns None, when the element does not exist.
|
||||||
"""
|
"""
|
||||||
return await self.eval(
|
return await self.eval(
|
||||||
"get_element_data", [self.get_full_data_cache_key()], [element_id]
|
"get_element_data", [self.full_data_cache_key], [element_id]
|
||||||
)
|
)
|
||||||
|
|
||||||
@ensure_cache_wrapper()
|
@ensure_cache_wrapper()
|
||||||
async def add_changed_elements(
|
async def add_changed_elements(
|
||||||
self,
|
self, changed_elements: List[str], deleted_element_ids: List[str]
|
||||||
changed_elements: List[str],
|
|
||||||
deleted_element_ids: List[str],
|
|
||||||
default_change_id: int,
|
|
||||||
) -> int:
|
) -> int:
|
||||||
"""
|
"""
|
||||||
Modified the full_data_cache to insert the changed_elements and removes the
|
Modified the full_data_cache to insert the changed_elements and removes the
|
||||||
@ -329,9 +321,8 @@ class RedisCacheProvider:
|
|||||||
return int(
|
return int(
|
||||||
await self.eval(
|
await self.eval(
|
||||||
"add_changed_elements",
|
"add_changed_elements",
|
||||||
keys=[self.get_full_data_cache_key(), self.get_change_id_cache_key()],
|
keys=[self.full_data_cache_key, self.change_id_cache_key],
|
||||||
args=[
|
args=[
|
||||||
default_change_id,
|
|
||||||
len(changed_elements),
|
len(changed_elements),
|
||||||
len(deleted_element_ids),
|
len(deleted_element_ids),
|
||||||
*(changed_elements + deleted_element_ids),
|
*(changed_elements + deleted_element_ids),
|
||||||
@ -363,7 +354,7 @@ class RedisCacheProvider:
|
|||||||
elements: Dict[bytes, Optional[bytes]] = await aioredis.util.wait_make_dict(
|
elements: Dict[bytes, Optional[bytes]] = await aioredis.util.wait_make_dict(
|
||||||
self.eval(
|
self.eval(
|
||||||
"get_data_since",
|
"get_data_since",
|
||||||
keys=[self.get_full_data_cache_key(), self.get_change_id_cache_key()],
|
keys=[self.full_data_cache_key, self.change_id_cache_key],
|
||||||
args=[change_id, redis_max_change_id],
|
args=[change_id, redis_max_change_id],
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@ -388,48 +379,53 @@ class RedisCacheProvider:
|
|||||||
"""
|
"""
|
||||||
# TODO: Improve lock. See: https://redis.io/topics/distlock
|
# TODO: Improve lock. See: https://redis.io/topics/distlock
|
||||||
async with get_connection() as redis:
|
async with get_connection() as redis:
|
||||||
return await redis.setnx(f"{self.prefix}lock_{lock_name}", 1)
|
return await redis.setnx(f"lock_{lock_name}", 1)
|
||||||
|
|
||||||
async def get_lock(self, lock_name: str) -> bool:
|
async def get_lock(self, lock_name: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Returns True, when the lock is set. Else False.
|
Returns True, when the lock is set. Else False.
|
||||||
"""
|
"""
|
||||||
async with get_connection() as redis:
|
async with get_connection() as redis:
|
||||||
return await redis.get(f"{self.prefix}lock_{lock_name}")
|
return await redis.get(f"lock_{lock_name}")
|
||||||
|
|
||||||
async def del_lock(self, lock_name: str) -> None:
|
async def del_lock(self, lock_name: str) -> None:
|
||||||
"""
|
"""
|
||||||
Deletes the lock. Does nothing when the lock is not set.
|
Deletes the lock. Does nothing when the lock is not set.
|
||||||
"""
|
"""
|
||||||
async with get_connection() as redis:
|
async with get_connection() as redis:
|
||||||
await redis.delete(f"{self.prefix}lock_{lock_name}")
|
await redis.delete(f"lock_{lock_name}")
|
||||||
|
|
||||||
async def get_current_change_id(self) -> Optional[int]:
|
@ensure_cache_wrapper()
|
||||||
|
async def get_current_change_id(self) -> int:
|
||||||
"""
|
"""
|
||||||
Get the highest change_id from redis.
|
Get the highest change_id from redis.
|
||||||
"""
|
"""
|
||||||
async with get_connection() as redis:
|
async with get_connection() as redis:
|
||||||
value = await redis.zrevrangebyscore(
|
value = await redis.zrevrangebyscore(
|
||||||
self.get_change_id_cache_key(), withscores=True, count=1, offset=0
|
self.change_id_cache_key, withscores=True, count=1, offset=0
|
||||||
)
|
)
|
||||||
# Return the score (second element) of the first (and only) element, if exists.
|
# Return the score (second element) of the first (and only) element, if exists.
|
||||||
return value[0][1] if value else None
|
if not value:
|
||||||
|
raise CacheReset()
|
||||||
|
return value[0][1]
|
||||||
|
|
||||||
async def get_lowest_change_id(self) -> Optional[int]:
|
@ensure_cache_wrapper()
|
||||||
|
async def get_lowest_change_id(self) -> int:
|
||||||
"""
|
"""
|
||||||
Get the lowest change_id from redis.
|
Get the lowest change_id from redis.
|
||||||
|
|
||||||
Returns None if lowest score does not exist.
|
|
||||||
"""
|
"""
|
||||||
async with get_connection() as redis:
|
async with get_connection() as redis:
|
||||||
return await redis.zscore(
|
value = await redis.zscore(
|
||||||
self.get_change_id_cache_key(), "_config:lowest_change_id"
|
self.change_id_cache_key, "_config:lowest_change_id"
|
||||||
)
|
)
|
||||||
|
if not value:
|
||||||
|
raise CacheReset()
|
||||||
|
return value
|
||||||
|
|
||||||
async def get_schema_version(self) -> Optional[SchemaVersion]:
|
async def get_schema_version(self) -> Optional[SchemaVersion]:
|
||||||
""" Retrieves the schema version of the cache or None, if not existent """
|
""" Retrieves the schema version of the cache or None, if not existent """
|
||||||
async with get_connection() as redis:
|
async with get_connection() as redis:
|
||||||
schema_version = await redis.hgetall(self.get_schema_cache_key())
|
schema_version = await redis.hgetall(self.schema_cache_key)
|
||||||
if not schema_version:
|
if not schema_version:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@ -442,7 +438,7 @@ class RedisCacheProvider:
|
|||||||
async def set_schema_version(self, schema_version: SchemaVersion) -> None:
|
async def set_schema_version(self, schema_version: SchemaVersion) -> None:
|
||||||
""" Sets the schema version for this cache. """
|
""" Sets the schema version for this cache. """
|
||||||
async with get_connection() as redis:
|
async with get_connection() as redis:
|
||||||
await redis.hmset_dict(self.get_schema_cache_key(), schema_version)
|
await redis.hmset_dict(self.schema_cache_key, schema_version)
|
||||||
|
|
||||||
async def eval(
|
async def eval(
|
||||||
self, script_name: str, keys: List[str] = [], args: List[Any] = []
|
self, script_name: str, keys: List[str] = [], args: List[Any] = []
|
||||||
@ -458,10 +454,7 @@ class RedisCacheProvider:
|
|||||||
python, if the lua-script returns a "cache_reset" string as an error response.
|
python, if the lua-script returns a "cache_reset" string as an error response.
|
||||||
"""
|
"""
|
||||||
hash = self._script_hashes[script_name]
|
hash = self._script_hashes[script_name]
|
||||||
if (
|
if self.scripts[script_name][1] and not keys[0] == self.full_data_cache_key:
|
||||||
self.scripts[script_name][1]
|
|
||||||
and not keys[0] == self.get_full_data_cache_key()
|
|
||||||
):
|
|
||||||
raise ImproperlyConfigured(
|
raise ImproperlyConfigured(
|
||||||
"A script with a ensure_cache prefix must have the full_data cache key as its first key"
|
"A script with a ensure_cache prefix must have the full_data cache key as its first key"
|
||||||
)
|
)
|
||||||
@ -490,7 +483,7 @@ class RedisCacheProvider:
|
|||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
|
||||||
class MemmoryCacheProvider:
|
class MemoryCacheProvider:
|
||||||
"""
|
"""
|
||||||
CacheProvider for the ElementCache that uses only the memory.
|
CacheProvider for the ElementCache that uses only the memory.
|
||||||
|
|
||||||
@ -510,6 +503,7 @@ class MemmoryCacheProvider:
|
|||||||
self.full_data: Dict[str, str] = {}
|
self.full_data: Dict[str, str] = {}
|
||||||
self.change_id_data: Dict[int, Set[str]] = {}
|
self.change_id_data: Dict[int, Set[str]] = {}
|
||||||
self.locks: Dict[str, str] = {}
|
self.locks: Dict[str, str] = {}
|
||||||
|
self.default_change_id: int = -1
|
||||||
|
|
||||||
async def ensure_cache(self) -> None:
|
async def ensure_cache(self) -> None:
|
||||||
pass
|
pass
|
||||||
@ -517,12 +511,15 @@ class MemmoryCacheProvider:
|
|||||||
async def clear_cache(self) -> None:
|
async def clear_cache(self) -> None:
|
||||||
self.set_data_dicts()
|
self.set_data_dicts()
|
||||||
|
|
||||||
async def reset_full_cache(self, data: Dict[str, str]) -> None:
|
async def reset_full_cache(
|
||||||
|
self, data: Dict[str, str], default_change_id: int
|
||||||
|
) -> None:
|
||||||
self.change_id_data = {}
|
self.change_id_data = {}
|
||||||
self.full_data = data
|
self.full_data = data
|
||||||
|
self.default_change_id = default_change_id
|
||||||
|
|
||||||
async def data_exists(self) -> bool:
|
async def data_exists(self) -> bool:
|
||||||
return bool(self.full_data)
|
return bool(self.full_data) and self.default_change_id >= 0
|
||||||
|
|
||||||
async def get_all_data(self) -> Dict[bytes, bytes]:
|
async def get_all_data(self) -> Dict[bytes, bytes]:
|
||||||
return str_dict_to_bytes(self.full_data)
|
return str_dict_to_bytes(self.full_data)
|
||||||
@ -541,16 +538,9 @@ class MemmoryCacheProvider:
|
|||||||
return value.encode() if value is not None else None
|
return value.encode() if value is not None else None
|
||||||
|
|
||||||
async def add_changed_elements(
|
async def add_changed_elements(
|
||||||
self,
|
self, changed_elements: List[str], deleted_element_ids: List[str]
|
||||||
changed_elements: List[str],
|
|
||||||
deleted_element_ids: List[str],
|
|
||||||
default_change_id: int,
|
|
||||||
) -> int:
|
) -> int:
|
||||||
current_change_id = await self.get_current_change_id()
|
change_id = await self.get_current_change_id() + 1
|
||||||
if current_change_id is None:
|
|
||||||
change_id = default_change_id
|
|
||||||
else:
|
|
||||||
change_id = current_change_id + 1
|
|
||||||
|
|
||||||
for i in range(0, len(changed_elements), 2):
|
for i in range(0, len(changed_elements), 2):
|
||||||
element_id = changed_elements[i]
|
element_id = changed_elements[i]
|
||||||
@ -610,17 +600,14 @@ class MemmoryCacheProvider:
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def get_current_change_id(self) -> Optional[int]:
|
async def get_current_change_id(self) -> int:
|
||||||
change_data = self.change_id_data
|
if self.change_id_data:
|
||||||
if change_data:
|
return max(self.change_id_data.keys())
|
||||||
return max(change_data.keys())
|
else:
|
||||||
return None
|
return await self.get_lowest_change_id()
|
||||||
|
|
||||||
async def get_lowest_change_id(self) -> Optional[int]:
|
async def get_lowest_change_id(self) -> int:
|
||||||
change_data = self.change_id_data
|
return self.default_change_id
|
||||||
if change_data:
|
|
||||||
return min(change_data.keys())
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def get_schema_version(self) -> Optional[SchemaVersion]:
|
async def get_schema_version(self) -> Optional[SchemaVersion]:
|
||||||
return None
|
return None
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import os
|
import os
|
||||||
|
from typing import cast
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from asgiref.sync import async_to_sync
|
from asgiref.sync import async_to_sync
|
||||||
@ -7,6 +8,7 @@ from pytest_django.django_compat import is_django_unittest
|
|||||||
from pytest_django.plugin import validate_django_db
|
from pytest_django.plugin import validate_django_db
|
||||||
|
|
||||||
from openslides.utils.cache import element_cache
|
from openslides.utils.cache import element_cache
|
||||||
|
from openslides.utils.cache_providers import MemoryCacheProvider
|
||||||
|
|
||||||
|
|
||||||
# Set an environment variable to stop the startup command
|
# Set an environment variable to stop the startup command
|
||||||
@ -83,4 +85,4 @@ def reset_cache(request):
|
|||||||
element_cache.ensure_cache(reset=True)
|
element_cache.ensure_cache(reset=True)
|
||||||
|
|
||||||
# Set constant default change_id
|
# Set constant default change_id
|
||||||
element_cache.set_default_change_id(1)
|
cast(MemoryCacheProvider, element_cache.cache_provider).default_change_id = 1
|
||||||
|
@ -39,7 +39,7 @@ async def prepare_element_cache(settings):
|
|||||||
[Collection1(), Collection2(), TConfig(), TUser(), TProjector()]
|
[Collection1(), Collection2(), TConfig(), TUser(), TProjector()]
|
||||||
)
|
)
|
||||||
element_cache._cachables = None
|
element_cache._cachables = None
|
||||||
await element_cache.async_ensure_cache(default_change_id=1)
|
await element_cache.async_ensure_cache(default_change_id=2)
|
||||||
yield
|
yield
|
||||||
# Reset the cachable_provider
|
# Reset the cachable_provider
|
||||||
element_cache.cachable_provider = orig_cachable_provider
|
element_cache.cachable_provider = orig_cachable_provider
|
||||||
@ -332,7 +332,7 @@ async def test_send_get_elements_too_big_change_id(communicator, set_config):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_send_get_elements_to_small_change_id(communicator, set_config):
|
async def test_send_get_elements_too_small_change_id(communicator, set_config):
|
||||||
await set_config("general_system_enable_anonymous", True)
|
await set_config("general_system_enable_anonymous", True)
|
||||||
await communicator.connect()
|
await communicator.connect()
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from typing import Any, Callable, Dict, List
|
from typing import Any, Callable, Dict, List
|
||||||
|
|
||||||
from openslides.utils.cache_providers import Cachable, MemmoryCacheProvider
|
from openslides.utils.cache_providers import Cachable, MemoryCacheProvider
|
||||||
|
|
||||||
|
|
||||||
def restrict_elements(elements: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
def restrict_elements(elements: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||||
@ -62,9 +62,9 @@ def example_data():
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class TTestCacheProvider(MemmoryCacheProvider):
|
class TTestCacheProvider(MemoryCacheProvider):
|
||||||
"""
|
"""
|
||||||
CacheProvider simular to the MemmoryCacheProvider with special methods for
|
CacheProvider simular to the MemoryCacheProvider with special methods for
|
||||||
testing.
|
testing.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@ -34,7 +34,6 @@ def element_cache():
|
|||||||
default_change_id=0,
|
default_change_id=0,
|
||||||
)
|
)
|
||||||
element_cache.ensure_cache()
|
element_cache.ensure_cache()
|
||||||
element_cache.set_default_change_id(0)
|
|
||||||
return element_cache
|
return element_cache
|
||||||
|
|
||||||
|
|
||||||
@ -148,13 +147,14 @@ async def test_get_data_since_change_id_0(element_cache):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_data_since_change_id_lower_then_in_redis(element_cache):
|
async def test_get_data_since_change_id_lower_than_in_redis(element_cache):
|
||||||
element_cache.cache_provider.full_data = {
|
element_cache.cache_provider.full_data = {
|
||||||
"app/collection1:1": '{"id": 1, "value": "value1"}',
|
"app/collection1:1": '{"id": 1, "value": "value1"}',
|
||||||
"app/collection1:2": '{"id": 2, "value": "value2"}',
|
"app/collection1:2": '{"id": 2, "value": "value2"}',
|
||||||
"app/collection2:1": '{"id": 1, "key": "value1"}',
|
"app/collection2:1": '{"id": 1, "key": "value1"}',
|
||||||
"app/collection2:2": '{"id": 2, "key": "value2"}',
|
"app/collection2:2": '{"id": 2, "key": "value2"}',
|
||||||
}
|
}
|
||||||
|
element_cache.cache_provider.default_change_id = 2
|
||||||
element_cache.cache_provider.change_id_data = {2: {"app/collection1:1"}}
|
element_cache.cache_provider.change_id_data = {2: {"app/collection1:1"}}
|
||||||
with pytest.raises(RuntimeError):
|
with pytest.raises(RuntimeError):
|
||||||
await element_cache.get_data_since(None, 1)
|
await element_cache.get_data_since(None, 1)
|
||||||
@ -196,8 +196,9 @@ async def test_get_data_since_change_id_data_in_db(element_cache):
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_gata_since_change_id_data_in_db_empty_change_id(element_cache):
|
async def test_get_gata_since_change_id_data_in_db_empty_change_id(element_cache):
|
||||||
with pytest.raises(RuntimeError):
|
result = await element_cache.get_data_since(None, 1)
|
||||||
await element_cache.get_data_since(None, 1)
|
|
||||||
|
assert result == ({}, [])
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@ -279,8 +280,8 @@ async def test_get_restricted_data_2(element_cache):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_restricted_data_change_id_lower_then_in_redis(element_cache):
|
async def test_get_restricted_data_change_id_lower_than_in_redis(element_cache):
|
||||||
element_cache.cache_provider.change_id_data = {2: {"app/collection1:1"}}
|
element_cache.cache_provider.default_change_id = 2
|
||||||
|
|
||||||
with pytest.raises(RuntimeError):
|
with pytest.raises(RuntimeError):
|
||||||
await element_cache.get_data_since(0, 1)
|
await element_cache.get_data_since(0, 1)
|
||||||
@ -310,5 +311,5 @@ async def test_lowest_change_id_after_updating_lowest_element(element_cache):
|
|||||||
)
|
)
|
||||||
second_lowest_change_id = await element_cache.get_lowest_change_id()
|
second_lowest_change_id = await element_cache.get_lowest_change_id()
|
||||||
|
|
||||||
assert first_lowest_change_id == 1
|
assert first_lowest_change_id == 0
|
||||||
assert second_lowest_change_id == 1 # The lowest_change_id should not change
|
assert second_lowest_change_id == 0 # The lowest_change_id should not change
|
||||||
|
Loading…
Reference in New Issue
Block a user