From f7cdfb7c02af6bb166090e813efba176f2253487 Mon Sep 17 00:00:00 2001
From: FinnStutzenstein
Date: Tue, 3 Dec 2019 08:25:48 +0100
Subject: [PATCH] Locking service and locks the history build process (fixes #4039)

---
 openslides/core/models.py           | 38 +++++++------
 openslides/utils/cache.py           |  7 +--
 openslides/utils/cache_providers.py | 49 -----------------
 openslides/utils/locking.py         | 83 +++++++++++++++++++++++++++++
 tests/unit/utils/cache_provider.py  | 11 ++--
 5 files changed, 112 insertions(+), 76 deletions(-)
 create mode 100644 openslides/utils/locking.py

diff --git a/openslides/core/models.py b/openslides/core/models.py
index c2324609d..ab8697a7e 100644
--- a/openslides/core/models.py
+++ b/openslides/core/models.py
@@ -6,6 +6,7 @@ from jsonfield import JSONField
 
 from ..utils.autoupdate import Element
 from ..utils.cache import element_cache, get_element_id
+from ..utils.locking import locking
 from ..utils.models import SET_NULL_AND_AUTOUPDATE, RESTModelMixin
 from .access_permissions import (
     ConfigAccessPermissions,
@@ -284,22 +285,27 @@ class HistoryManager(models.Manager):
         """
         Method to add all cachables to the history.
         """
-        # TODO: Add lock to prevent multiple history builds at once. See #4039.
-        instances = None
-        if self.all().count() == 0:
-            elements = []
-            all_full_data = async_to_sync(element_cache.get_all_data_list)()
-            for collection_string, data in all_full_data.items():
-                for full_data in data:
-                    elements.append(
-                        Element(
-                            id=full_data["id"],
-                            collection_string=collection_string,
-                            full_data=full_data,
-                        )
-                    )
-            instances = self.add_elements(elements)
-        return instances
+        async_to_sync(self.async_build_history)()
+
+    async def async_build_history(self):
+        lock_name = "build_cache"
+        if await locking.set(lock_name):
+            try:
+                if self.all().count() == 0:
+                    elements = []
+                    all_full_data = await element_cache.get_all_data_list()
+                    for collection_string, data in all_full_data.items():
+                        for full_data in data:
+                            elements.append(
+                                Element(
+                                    id=full_data["id"],
+                                    collection_string=collection_string,
+                                    full_data=full_data,
+                                )
+                            )
+                    self.add_elements(elements)
+            finally:
+                await locking.delete(lock_name)
 
 
 class History(models.Model):
diff --git a/openslides/utils/cache.py b/openslides/utils/cache.py
index f6d7e4ee6..59bd2d2d5 100644
--- a/openslides/utils/cache.py
+++ b/openslides/utils/cache.py
@@ -14,6 +14,7 @@ from .cache_providers import (
     MemoryCacheProvider,
     RedisCacheProvider,
 )
+from .locking import locking
 from .redis import use_redis
 from .schema_version import SchemaVersion, schema_version_handler
 from .utils import get_element_id, split_element_id
@@ -128,7 +129,7 @@ class ElementCache:
     ) -> None:
         lock_name = "build_cache"
         # Set a lock so only one process builds the cache
-        if await self.cache_provider.set_lock(lock_name):
+        if await locking.set(lock_name):
             logger.info("Building up the cache data...")
             try:
                 mapping = {}
@@ -157,10 +158,10 @@ class ElementCache:
                     await self.cache_provider.set_schema_version(schema_version)
                 logger.info("Done saving the cache data.")
             finally:
-                await self.cache_provider.del_lock(lock_name)
+                await locking.delete(lock_name)
         else:
             logger.info("Wait for another process to build up the cache...")
-            while await self.cache_provider.get_lock(lock_name):
+            while await locking.get(lock_name):
                 sleep(0.01)
             logger.info("Cache is ready (built by another process).")
 
diff --git a/openslides/utils/cache_providers.py b/openslides/utils/cache_providers.py
index 84adea402..156d9945b 100644
--- a/openslides/utils/cache_providers.py
+++ b/openslides/utils/cache_providers.py
@@ -66,15 +66,6 @@ class ElementCacheProvider(Protocol):
     ) -> Tuple[Dict[str, List[bytes]], List[str]]:
         ...
 
-    async def set_lock(self, lock_name: str) -> bool:
-        ...
-
-    async def get_lock(self, lock_name: str) -> bool:
-        ...
-
-    async def del_lock(self, lock_name: str) -> None:
-        ...
-
     async def get_current_change_id(self) -> int:
         ...
 
@@ -250,7 +241,6 @@
     ) -> None:
         """
         Deletes the full_data_cache and write new data in it. Clears the change id key.
-        Does not clear locks.
         """
         async with get_connection() as redis:
             tr = redis.multi_exec()
@@ -371,30 +361,6 @@
                 changed_elements[collection_string].append(element_json)
         return changed_elements, deleted_elements
 
-    async def set_lock(self, lock_name: str) -> bool:
-        """
-        Tries to sets a lock.
-
-        Returns True when the lock could be set and False, if it was already set.
-        """
-        # TODO: Improve lock. See: https://redis.io/topics/distlock
-        async with get_connection() as redis:
-            return await redis.setnx(f"lock_{lock_name}", 1)
-
-    async def get_lock(self, lock_name: str) -> bool:
-        """
-        Returns True, when the lock is set. Else False.
-        """
-        async with get_connection() as redis:
-            return await redis.get(f"lock_{lock_name}")
-
-    async def del_lock(self, lock_name: str) -> None:
-        """
-        Deletes the lock. Does nothing when the lock is not set.
-        """
-        async with get_connection() as redis:
-            await redis.delete(f"lock_{lock_name}")
-
     @ensure_cache_wrapper()
     async def get_current_change_id(self) -> int:
         """
@@ -585,21 +551,6 @@
                 changed_elements[collection_string].append(element_json.encode())
         return changed_elements, deleted_elements
 
-    async def set_lock(self, lock_name: str) -> bool:
-        if lock_name in self.locks:
-            return False
-        self.locks[lock_name] = "1"
-        return True
-
-    async def get_lock(self, lock_name: str) -> bool:
-        return lock_name in self.locks
-
-    async def del_lock(self, lock_name: str) -> None:
-        try:
-            del self.locks[lock_name]
-        except KeyError:
-            pass
-
     async def get_current_change_id(self) -> int:
         if self.change_id_data:
             return max(self.change_id_data.keys())
diff --git a/openslides/utils/locking.py b/openslides/utils/locking.py
new file mode 100644
index 000000000..ee7c87f13
--- /dev/null
+++ b/openslides/utils/locking.py
@@ -0,0 +1,83 @@
+from typing import Dict
+
+from typing_extensions import Protocol
+
+from .redis import use_redis
+
+
+if use_redis:
+    from .redis import get_connection
+
+
+class LockProtocol(Protocol):
+    async def set(self, lock_name: str) -> bool:
+        ...
+
+    async def get(self, lock_name: str) -> bool:
+        ...
+
+    async def delete(self, lock_name: str) -> None:
+        ...
+
+
+class RedisLockProvider:
+    lock_prefix = "lock_"
+
+    async def set(self, lock_name: str) -> bool:
+        """
+        Tries to set a lock.
+
+        Returns True when the lock could be set and False if it was already set.
+        """
+        # TODO: Improve lock. See: https://redis.io/topics/distlock
+        async with get_connection() as redis:
+            return await redis.setnx(f"{self.lock_prefix}{lock_name}", 1)
+
+    async def get(self, lock_name: str) -> bool:
+        """
+        Returns True when the lock is set, else False.
+        """
+        async with get_connection() as redis:
+            return await redis.get(f"{self.lock_prefix}{lock_name}")
+
+    async def delete(self, lock_name: str) -> None:
+        """
+        Deletes the lock. Does nothing when the lock is not set.
+        """
+        async with get_connection() as redis:
+            await redis.delete(f"{self.lock_prefix}{lock_name}")
+
+
+class MemoryLockProvider:
+    def __init__(self) -> None:
+        self.locks: Dict[str, str] = {}
+
+    async def set(self, lock_name: str) -> bool:
+        if lock_name in self.locks:
+            return False
+        self.locks[lock_name] = "1"
+        return True
+
+    async def get(self, lock_name: str) -> bool:
+        return lock_name in self.locks
+
+    async def delete(self, lock_name: str) -> None:
+        try:
+            del self.locks[lock_name]
+        except KeyError:
+            pass
+
+
+def load_lock_provider() -> LockProtocol:
+    """
+    Generates a lock provider singleton.
+    """
+    if use_redis:
+        lock_provider: LockProtocol = RedisLockProvider()
+    else:
+        lock_provider = MemoryLockProvider()
+
+    return lock_provider
+
+
+locking = load_lock_provider()
diff --git a/tests/unit/utils/cache_provider.py b/tests/unit/utils/cache_provider.py
index e26dbd788..1bd6a6caa 100644
--- a/tests/unit/utils/cache_provider.py
+++ b/tests/unit/utils/cache_provider.py
@@ -1,4 +1,3 @@
-import asyncio
 from typing import Any, Callable, Dict, List
 
 from openslides.utils.cache_providers import Cachable, MemoryCacheProvider
@@ -92,12 +91,8 @@ class TTestCacheProvider(MemoryCacheProvider):
     """
    CacheProvider simular to the MemoryCacheProvider with special
     methods for testing.
+
+    Currently just a dummy for future extensions.
     """
 
-    async def del_lock_after_wait(
-        self, lock_name: str, future: asyncio.Future = None
-    ) -> None:
-        async def set_future() -> None:
-            await self.del_lock(lock_name)
-
-        asyncio.ensure_future(set_future())
+    pass
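
Usage note: a minimal sketch of how the new locking service is meant to be used,
mirroring the try/finally pattern of HistoryManager.async_build_history and
ElementCache.build_cache above. The coroutine name do_build_once and the lock
name "my_critical_section" are illustrative only and not part of the patch;
which provider backs `locking` depends on whether Redis is configured.

    from openslides.utils.locking import locking

    async def do_build_once() -> None:
        lock_name = "my_critical_section"
        # locking.set() is non-blocking: it returns True only for the first
        # caller; every other process gets False and skips the critical section.
        if await locking.set(lock_name):
            try:
                pass  # critical section, e.g. building the history or the cache
            finally:
                # Always release the lock, even if the critical section raised.
                await locking.delete(lock_name)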