Merge pull request #4921 from FinnStutzenstein/ensureChangeId

Ensures change id across multiple workers
This commit is contained in:
Finn Stutzenstein 2019-08-19 13:14:07 +02:00 committed by GitHub
commit 7e2c84c2eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 105 additions and 124 deletions

View File

@ -11,7 +11,7 @@ from django.apps import apps
from .cache_providers import (
Cachable,
ElementCacheProvider,
MemmoryCacheProvider,
MemoryCacheProvider,
RedisCacheProvider,
)
from .redis import use_redis
@ -71,19 +71,7 @@ class ElementCache:
self.cache_provider = cache_provider_class(self.async_ensure_cache)
self.cachable_provider = cachable_provider
self._cachables: Optional[Dict[str, Cachable]] = None
self.set_default_change_id(default_change_id)
def set_default_change_id(self, default_change_id: Optional[int] = None) -> None:
"""
Sets the default change id for the cache. Needs to update, if the cache gets generated.
"""
# The current time is used as the first change_id if there is none in redis
if default_change_id is None:
# Use the milliseconds (rounded) since the 2016-02-29.
default_change_id = (
int((datetime.utcnow() - datetime(2016, 2, 29)).total_seconds()) * 1000
)
self.default_change_id = default_change_id
self.default_change_id: Optional[int] = default_change_id
@property
def cachables(self) -> Dict[str, Cachable]:
@ -151,8 +139,16 @@ class ElementCache:
)
logger.info("Done building the cache data.")
logger.info("Saving cache data into the cache...")
self.set_default_change_id(default_change_id=default_change_id)
await self.cache_provider.reset_full_cache(mapping)
if default_change_id is None:
if self.default_change_id is not None:
default_change_id = self.default_change_id
else:
# Use the milliseconds (rounded) since the 2016-02-29.
default_change_id = int(
(datetime.utcnow() - datetime(2016, 2, 29)).total_seconds()
)
default_change_id *= 1000
await self.cache_provider.reset_full_cache(mapping, default_change_id)
if schema_version:
await self.cache_provider.set_schema_version(schema_version)
logger.info("Done saving the cache data.")
@ -187,7 +183,7 @@ class ElementCache:
deleted_elements.append(element_id)
return await self.cache_provider.add_changed_elements(
changed_elements, deleted_elements, self.default_change_id + 1
changed_elements, deleted_elements
)
async def get_all_data_list(
@ -331,8 +327,7 @@ class ElementCache:
Returns default_change_id if there is no change id yet.
"""
value = await self.cache_provider.get_current_change_id()
return value if value is not None else self.default_change_id
return await self.cache_provider.get_current_change_id()
async def get_lowest_change_id(self) -> int:
"""
@ -340,11 +335,7 @@ class ElementCache:
Raises a RuntimeError if there is no change_id.
"""
value = await self.cache_provider.get_lowest_change_id()
if not value:
raise RuntimeError("There is no known change_id.")
# Return the score (second element) of the first (and only) element
return value
return await self.cache_provider.get_lowest_change_id()
def load_element_cache() -> ElementCache:
@ -354,7 +345,7 @@ def load_element_cache() -> ElementCache:
if use_redis:
cache_provider_class: Type[ElementCacheProvider] = RedisCacheProvider
else:
cache_provider_class = MemmoryCacheProvider
cache_provider_class = MemoryCacheProvider
return ElementCache(cache_provider_class=cache_provider_class)

View File

@ -39,7 +39,9 @@ class ElementCacheProvider(Protocol):
async def clear_cache(self) -> None:
...
async def reset_full_cache(self, data: Dict[str, str]) -> None:
async def reset_full_cache(
self, data: Dict[str, str], default_change_id: int
) -> None:
...
async def data_exists(self) -> bool:
@ -55,10 +57,7 @@ class ElementCacheProvider(Protocol):
...
async def add_changed_elements(
self,
changed_elements: List[str],
deleted_element_ids: List[str],
default_change_id: int,
self, changed_elements: List[str], deleted_element_ids: List[str]
) -> int:
...
@ -76,10 +75,10 @@ class ElementCacheProvider(Protocol):
async def del_lock(self, lock_name: str) -> None:
...
async def get_current_change_id(self) -> Optional[int]:
async def get_current_change_id(self) -> int:
...
async def get_lowest_change_id(self) -> Optional[int]:
async def get_lowest_change_id(self) -> int:
...
async def get_schema_version(self) -> Optional[SchemaVersion]:
@ -127,7 +126,6 @@ class RedisCacheProvider:
full_data_cache_key: str = "full_data"
change_id_cache_key: str = "change_id"
schema_cache_key: str = "schema"
prefix: str = "element_cache_"
# All lua-scripts used by this provider. Every entry is a Tuple (str, bool) with the
# script and an ensure_cache-indicator. If the indicator is True, a short ensure_cache-script
@ -164,36 +162,33 @@ class RedisCacheProvider:
local change_id
if next(tmp) == nil then
-- The key does not exist
change_id = ARGV[1]
return redis.error_reply("cache_reset")
else
change_id = tmp[2] + 1
end
local nc = tonumber(ARGV[2])
local nd = tonumber(ARGV[3])
local nc = tonumber(ARGV[1])
local nd = tonumber(ARGV[2])
local i, max
-- Add changed_elements to the cache and sorted set (the first of the pairs)
if (nc > 0) then
max = 2 + nc
redis.call('hmset', KEYS[1], unpack(ARGV, 4, max + 1))
for i = 4, max, 2 do
max = 1 + nc
redis.call('hmset', KEYS[1], unpack(ARGV, 3, max + 1))
for i = 3, max, 2 do
redis.call('zadd', KEYS[2], change_id, ARGV[i])
end
end
-- Delete deleted_element_ids and add them to sorted set
if (nd > 0) then
max = 3 + nc + nd
redis.call('hdel', KEYS[1], unpack(ARGV, 4 + nc, max))
for i = 4 + nc, max, 1 do
max = 2 + nc + nd
redis.call('hdel', KEYS[1], unpack(ARGV, 3 + nc, max))
for i = 3 + nc, max, 1 do
redis.call('zadd', KEYS[2], change_id, ARGV[i])
end
end
-- Set lowest_change_id if it does not exist
redis.call('zadd', KEYS[2], 'NX', change_id, '_config:lowest_change_id')
return change_id
""",
True,
@ -244,31 +239,27 @@ class RedisCacheProvider:
async def ensure_cache(self) -> None:
await self._ensure_cache()
def get_full_data_cache_key(self) -> str:
return "".join((self.prefix, self.full_data_cache_key))
def get_change_id_cache_key(self) -> str:
return "".join((self.prefix, self.change_id_cache_key))
def get_schema_cache_key(self) -> str:
return "".join((self.prefix, self.schema_cache_key))
async def clear_cache(self) -> None:
"""
Deletes all cache entries created with this element cache.
"""
await self.eval("clear_cache", keys=[], args=[f"{self.prefix}*"])
await self.eval("clear_cache", keys=[], args=["*"])
async def reset_full_cache(self, data: Dict[str, str]) -> None:
async def reset_full_cache(
self, data: Dict[str, str], default_change_id: int
) -> None:
"""
Deletes the full_data_cache and writes new data into it. Clears the change id key.
Does not clear locks.
"""
async with get_connection() as redis:
tr = redis.multi_exec()
tr.delete(self.get_change_id_cache_key())
tr.delete(self.get_full_data_cache_key())
tr.hmset_dict(self.get_full_data_cache_key(), data)
tr.delete(self.change_id_cache_key)
tr.delete(self.full_data_cache_key)
tr.hmset_dict(self.full_data_cache_key, data)
tr.zadd(
self.change_id_cache_key, default_change_id, "_config:lowest_change_id"
)
await tr.execute()
async def data_exists(self) -> bool:
@ -276,7 +267,11 @@ class RedisCacheProvider:
Returns True, when there is data in the cache.
"""
async with get_connection() as redis:
return await redis.exists(self.get_full_data_cache_key())
return await redis.exists(self.full_data_cache_key) and bool(
await redis.zrangebyscore(
self.change_id_cache_key, withscores=True, count=1, offset=0
)
)
@ensure_cache_wrapper()
async def get_all_data(self) -> Dict[bytes, bytes]:
@ -284,7 +279,7 @@ class RedisCacheProvider:
Returns all data from the full_data_cache in a mapping from element_id to the element.
"""
return await aioredis.util.wait_make_dict(
self.eval("get_all_data", [self.get_full_data_cache_key()])
self.eval("get_all_data", [self.full_data_cache_key])
)
@ensure_cache_wrapper()
@ -294,7 +289,7 @@ class RedisCacheProvider:
from element_id to the element.
"""
response = await self.eval(
"get_collection_data", [self.get_full_data_cache_key()], [f"{collection}:*"]
"get_collection_data", [self.full_data_cache_key], [f"{collection}:*"]
)
collection_data = {}
@ -310,15 +305,12 @@ class RedisCacheProvider:
Returns one element from the cache. Returns None, when the element does not exist.
"""
return await self.eval(
"get_element_data", [self.get_full_data_cache_key()], [element_id]
"get_element_data", [self.full_data_cache_key], [element_id]
)
@ensure_cache_wrapper()
async def add_changed_elements(
self,
changed_elements: List[str],
deleted_element_ids: List[str],
default_change_id: int,
self, changed_elements: List[str], deleted_element_ids: List[str]
) -> int:
"""
Modifies the full_data_cache to insert the changed_elements and removes the
@ -329,9 +321,8 @@ class RedisCacheProvider:
return int(
await self.eval(
"add_changed_elements",
keys=[self.get_full_data_cache_key(), self.get_change_id_cache_key()],
keys=[self.full_data_cache_key, self.change_id_cache_key],
args=[
default_change_id,
len(changed_elements),
len(deleted_element_ids),
*(changed_elements + deleted_element_ids),
@ -363,7 +354,7 @@ class RedisCacheProvider:
elements: Dict[bytes, Optional[bytes]] = await aioredis.util.wait_make_dict(
self.eval(
"get_data_since",
keys=[self.get_full_data_cache_key(), self.get_change_id_cache_key()],
keys=[self.full_data_cache_key, self.change_id_cache_key],
args=[change_id, redis_max_change_id],
)
)
@ -388,48 +379,53 @@ class RedisCacheProvider:
"""
# TODO: Improve lock. See: https://redis.io/topics/distlock
async with get_connection() as redis:
return await redis.setnx(f"{self.prefix}lock_{lock_name}", 1)
return await redis.setnx(f"lock_{lock_name}", 1)
async def get_lock(self, lock_name: str) -> bool:
"""
Returns True, when the lock is set. Else False.
"""
async with get_connection() as redis:
return await redis.get(f"{self.prefix}lock_{lock_name}")
return await redis.get(f"lock_{lock_name}")
async def del_lock(self, lock_name: str) -> None:
"""
Deletes the lock. Does nothing when the lock is not set.
"""
async with get_connection() as redis:
await redis.delete(f"{self.prefix}lock_{lock_name}")
await redis.delete(f"lock_{lock_name}")
async def get_current_change_id(self) -> Optional[int]:
@ensure_cache_wrapper()
async def get_current_change_id(self) -> int:
"""
Get the highest change_id from redis.
"""
async with get_connection() as redis:
value = await redis.zrevrangebyscore(
self.get_change_id_cache_key(), withscores=True, count=1, offset=0
self.change_id_cache_key, withscores=True, count=1, offset=0
)
# Return the score (second element) of the first (and only) element, if exists.
return value[0][1] if value else None
# Return the score (second element) of the first (and only) element, if exists.
if not value:
raise CacheReset()
return value[0][1]
async def get_lowest_change_id(self) -> Optional[int]:
@ensure_cache_wrapper()
async def get_lowest_change_id(self) -> int:
"""
Get the lowest change_id from redis.
Returns None if lowest score does not exist.
"""
async with get_connection() as redis:
return await redis.zscore(
self.get_change_id_cache_key(), "_config:lowest_change_id"
value = await redis.zscore(
self.change_id_cache_key, "_config:lowest_change_id"
)
if not value:
raise CacheReset()
return value
async def get_schema_version(self) -> Optional[SchemaVersion]:
""" Retrieves the schema version of the cache or None, if not existent """
async with get_connection() as redis:
schema_version = await redis.hgetall(self.get_schema_cache_key())
schema_version = await redis.hgetall(self.schema_cache_key)
if not schema_version:
return None
@ -442,7 +438,7 @@ class RedisCacheProvider:
async def set_schema_version(self, schema_version: SchemaVersion) -> None:
""" Sets the schema version for this cache. """
async with get_connection() as redis:
await redis.hmset_dict(self.get_schema_cache_key(), schema_version)
await redis.hmset_dict(self.schema_cache_key, schema_version)
async def eval(
self, script_name: str, keys: List[str] = [], args: List[Any] = []
@ -458,10 +454,7 @@ class RedisCacheProvider:
python, if the lua-script returns a "cache_reset" string as an error response.
"""
hash = self._script_hashes[script_name]
if (
self.scripts[script_name][1]
and not keys[0] == self.get_full_data_cache_key()
):
if self.scripts[script_name][1] and not keys[0] == self.full_data_cache_key:
raise ImproperlyConfigured(
"A script with a ensure_cache prefix must have the full_data cache key as its first key"
)
@ -490,7 +483,7 @@ class RedisCacheProvider:
raise e
class MemmoryCacheProvider:
class MemoryCacheProvider:
"""
CacheProvider for the ElementCache that uses only the memory.
@ -510,6 +503,7 @@ class MemmoryCacheProvider:
self.full_data: Dict[str, str] = {}
self.change_id_data: Dict[int, Set[str]] = {}
self.locks: Dict[str, str] = {}
self.default_change_id: int = -1
async def ensure_cache(self) -> None:
pass
@ -517,12 +511,15 @@ class MemmoryCacheProvider:
async def clear_cache(self) -> None:
self.set_data_dicts()
async def reset_full_cache(self, data: Dict[str, str]) -> None:
async def reset_full_cache(
self, data: Dict[str, str], default_change_id: int
) -> None:
self.change_id_data = {}
self.full_data = data
self.default_change_id = default_change_id
async def data_exists(self) -> bool:
return bool(self.full_data)
return bool(self.full_data) and self.default_change_id >= 0
async def get_all_data(self) -> Dict[bytes, bytes]:
return str_dict_to_bytes(self.full_data)
@ -541,16 +538,9 @@ class MemmoryCacheProvider:
return value.encode() if value is not None else None
async def add_changed_elements(
self,
changed_elements: List[str],
deleted_element_ids: List[str],
default_change_id: int,
self, changed_elements: List[str], deleted_element_ids: List[str]
) -> int:
current_change_id = await self.get_current_change_id()
if current_change_id is None:
change_id = default_change_id
else:
change_id = current_change_id + 1
change_id = await self.get_current_change_id() + 1
for i in range(0, len(changed_elements), 2):
element_id = changed_elements[i]
@ -610,17 +600,14 @@ class MemmoryCacheProvider:
except KeyError:
pass
async def get_current_change_id(self) -> Optional[int]:
change_data = self.change_id_data
if change_data:
return max(change_data.keys())
return None
async def get_current_change_id(self) -> int:
if self.change_id_data:
return max(self.change_id_data.keys())
else:
return await self.get_lowest_change_id()
async def get_lowest_change_id(self) -> Optional[int]:
change_data = self.change_id_data
if change_data:
return min(change_data.keys())
return None
async def get_lowest_change_id(self) -> int:
return self.default_change_id
async def get_schema_version(self) -> Optional[SchemaVersion]:
return None

View File

@ -1,4 +1,5 @@
import os
from typing import cast
import pytest
from asgiref.sync import async_to_sync
@ -7,6 +8,7 @@ from pytest_django.django_compat import is_django_unittest
from pytest_django.plugin import validate_django_db
from openslides.utils.cache import element_cache
from openslides.utils.cache_providers import MemoryCacheProvider
# Set an environment variable to stop the startup command
@ -83,4 +85,4 @@ def reset_cache(request):
element_cache.ensure_cache(reset=True)
# Set constant default change_id
element_cache.set_default_change_id(1)
cast(MemoryCacheProvider, element_cache.cache_provider).default_change_id = 1

View File

@ -39,7 +39,7 @@ async def prepare_element_cache(settings):
[Collection1(), Collection2(), TConfig(), TUser(), TProjector()]
)
element_cache._cachables = None
await element_cache.async_ensure_cache(default_change_id=1)
await element_cache.async_ensure_cache(default_change_id=2)
yield
# Reset the cachable_provider
element_cache.cachable_provider = orig_cachable_provider
@ -332,7 +332,7 @@ async def test_send_get_elements_too_big_change_id(communicator, set_config):
@pytest.mark.asyncio
async def test_send_get_elements_to_small_change_id(communicator, set_config):
async def test_send_get_elements_too_small_change_id(communicator, set_config):
await set_config("general_system_enable_anonymous", True)
await communicator.connect()

View File

@ -1,7 +1,7 @@
import asyncio
from typing import Any, Callable, Dict, List
from openslides.utils.cache_providers import Cachable, MemmoryCacheProvider
from openslides.utils.cache_providers import Cachable, MemoryCacheProvider
def restrict_elements(elements: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
@ -62,9 +62,9 @@ def example_data():
}
class TTestCacheProvider(MemmoryCacheProvider):
class TTestCacheProvider(MemoryCacheProvider):
"""
CacheProvider similar to the MemmoryCacheProvider with special methods for
CacheProvider similar to the MemoryCacheProvider with special methods for
testing.
"""

View File

@ -34,7 +34,6 @@ def element_cache():
default_change_id=0,
)
element_cache.ensure_cache()
element_cache.set_default_change_id(0)
return element_cache
@ -148,13 +147,14 @@ async def test_get_data_since_change_id_0(element_cache):
@pytest.mark.asyncio
async def test_get_data_since_change_id_lower_then_in_redis(element_cache):
async def test_get_data_since_change_id_lower_than_in_redis(element_cache):
element_cache.cache_provider.full_data = {
"app/collection1:1": '{"id": 1, "value": "value1"}',
"app/collection1:2": '{"id": 2, "value": "value2"}',
"app/collection2:1": '{"id": 1, "key": "value1"}',
"app/collection2:2": '{"id": 2, "key": "value2"}',
}
element_cache.cache_provider.default_change_id = 2
element_cache.cache_provider.change_id_data = {2: {"app/collection1:1"}}
with pytest.raises(RuntimeError):
await element_cache.get_data_since(None, 1)
@ -196,8 +196,9 @@ async def test_get_data_since_change_id_data_in_db(element_cache):
@pytest.mark.asyncio
async def test_get_gata_since_change_id_data_in_db_empty_change_id(element_cache):
with pytest.raises(RuntimeError):
await element_cache.get_data_since(None, 1)
result = await element_cache.get_data_since(None, 1)
assert result == ({}, [])
@pytest.mark.asyncio
@ -279,8 +280,8 @@ async def test_get_restricted_data_2(element_cache):
@pytest.mark.asyncio
async def test_get_restricted_data_change_id_lower_then_in_redis(element_cache):
element_cache.cache_provider.change_id_data = {2: {"app/collection1:1"}}
async def test_get_restricted_data_change_id_lower_than_in_redis(element_cache):
element_cache.cache_provider.default_change_id = 2
with pytest.raises(RuntimeError):
await element_cache.get_data_since(0, 1)
@ -310,5 +311,5 @@ async def test_lowest_change_id_after_updating_lowest_element(element_cache):
)
second_lowest_change_id = await element_cache.get_lowest_change_id()
assert first_lowest_change_id == 1
assert second_lowest_change_id == 1 # The lowest_change_id should not change
assert first_lowest_change_id == 0
assert second_lowest_change_id == 0 # The lowest_change_id should not change