Locking service and locks the history build process (fixes #4039)

This commit is contained in:
FinnStutzenstein 2019-12-03 08:25:48 +01:00
parent 93d9e6b169
commit f7cdfb7c02
5 changed files with 112 additions and 76 deletions

View File

@ -6,6 +6,7 @@ from jsonfield import JSONField
from ..utils.autoupdate import Element from ..utils.autoupdate import Element
from ..utils.cache import element_cache, get_element_id from ..utils.cache import element_cache, get_element_id
from ..utils.locking import locking
from ..utils.models import SET_NULL_AND_AUTOUPDATE, RESTModelMixin from ..utils.models import SET_NULL_AND_AUTOUPDATE, RESTModelMixin
from .access_permissions import ( from .access_permissions import (
ConfigAccessPermissions, ConfigAccessPermissions,
@ -284,11 +285,15 @@ class HistoryManager(models.Manager):
""" """
Method to add all cachables to the history. Method to add all cachables to the history.
""" """
# TODO: Add lock to prevent multiple history builds at once. See #4039. async_to_sync(self.async_build_history)()
instances = None
async def async_build_history(self):
lock_name = "build_cache"
if await locking.set(lock_name):
try:
if self.all().count() == 0: if self.all().count() == 0:
elements = [] elements = []
all_full_data = async_to_sync(element_cache.get_all_data_list)() all_full_data = await element_cache.get_all_data_list()
for collection_string, data in all_full_data.items(): for collection_string, data in all_full_data.items():
for full_data in data: for full_data in data:
elements.append( elements.append(
@ -298,8 +303,9 @@ class HistoryManager(models.Manager):
full_data=full_data, full_data=full_data,
) )
) )
instances = self.add_elements(elements) self.add_elements(elements)
return instances finally:
await locking.delete(lock_name)
class History(models.Model): class History(models.Model):

View File

@ -14,6 +14,7 @@ from .cache_providers import (
MemoryCacheProvider, MemoryCacheProvider,
RedisCacheProvider, RedisCacheProvider,
) )
from .locking import locking
from .redis import use_redis from .redis import use_redis
from .schema_version import SchemaVersion, schema_version_handler from .schema_version import SchemaVersion, schema_version_handler
from .utils import get_element_id, split_element_id from .utils import get_element_id, split_element_id
@ -128,7 +129,7 @@ class ElementCache:
) -> None: ) -> None:
lock_name = "build_cache" lock_name = "build_cache"
# Set a lock so only one process builds the cache # Set a lock so only one process builds the cache
if await self.cache_provider.set_lock(lock_name): if await locking.set(lock_name):
logger.info("Building up the cache data...") logger.info("Building up the cache data...")
try: try:
mapping = {} mapping = {}
@ -157,10 +158,10 @@ class ElementCache:
await self.cache_provider.set_schema_version(schema_version) await self.cache_provider.set_schema_version(schema_version)
logger.info("Done saving the cache data.") logger.info("Done saving the cache data.")
finally: finally:
await self.cache_provider.del_lock(lock_name) await locking.delete(lock_name)
else: else:
logger.info("Wait for another process to build up the cache...") logger.info("Wait for another process to build up the cache...")
while await self.cache_provider.get_lock(lock_name): while await locking.get(lock_name):
sleep(0.01) sleep(0.01)
logger.info("Cache is ready (built by another process).") logger.info("Cache is ready (built by another process).")

View File

@ -66,15 +66,6 @@ class ElementCacheProvider(Protocol):
) -> Tuple[Dict[str, List[bytes]], List[str]]: ) -> Tuple[Dict[str, List[bytes]], List[str]]:
... ...
async def set_lock(self, lock_name: str) -> bool:
...
async def get_lock(self, lock_name: str) -> bool:
...
async def del_lock(self, lock_name: str) -> None:
...
async def get_current_change_id(self) -> int: async def get_current_change_id(self) -> int:
... ...
@ -250,7 +241,6 @@ class RedisCacheProvider:
) -> None: ) -> None:
""" """
Deletes the full_data_cache and write new data in it. Clears the change id key. Deletes the full_data_cache and write new data in it. Clears the change id key.
Does not clear locks.
""" """
async with get_connection() as redis: async with get_connection() as redis:
tr = redis.multi_exec() tr = redis.multi_exec()
@ -371,30 +361,6 @@ class RedisCacheProvider:
changed_elements[collection_string].append(element_json) changed_elements[collection_string].append(element_json)
return changed_elements, deleted_elements return changed_elements, deleted_elements
async def set_lock(self, lock_name: str) -> bool:
"""
Tries to sets a lock.
Returns True when the lock could be set and False, if it was already set.
"""
# TODO: Improve lock. See: https://redis.io/topics/distlock
async with get_connection() as redis:
return await redis.setnx(f"lock_{lock_name}", 1)
async def get_lock(self, lock_name: str) -> bool:
"""
Returns True, when the lock is set. Else False.
"""
async with get_connection() as redis:
return await redis.get(f"lock_{lock_name}")
async def del_lock(self, lock_name: str) -> None:
"""
Deletes the lock. Does nothing when the lock is not set.
"""
async with get_connection() as redis:
await redis.delete(f"lock_{lock_name}")
@ensure_cache_wrapper() @ensure_cache_wrapper()
async def get_current_change_id(self) -> int: async def get_current_change_id(self) -> int:
""" """
@ -585,21 +551,6 @@ class MemoryCacheProvider:
changed_elements[collection_string].append(element_json.encode()) changed_elements[collection_string].append(element_json.encode())
return changed_elements, deleted_elements return changed_elements, deleted_elements
async def set_lock(self, lock_name: str) -> bool:
if lock_name in self.locks:
return False
self.locks[lock_name] = "1"
return True
async def get_lock(self, lock_name: str) -> bool:
return lock_name in self.locks
async def del_lock(self, lock_name: str) -> None:
try:
del self.locks[lock_name]
except KeyError:
pass
async def get_current_change_id(self) -> int: async def get_current_change_id(self) -> int:
if self.change_id_data: if self.change_id_data:
return max(self.change_id_data.keys()) return max(self.change_id_data.keys())

View File

@ -0,0 +1,83 @@
from typing import Dict
from typing_extensions import Protocol
from .redis import use_redis
if use_redis:
from .redis import get_connection
class LockProtocol(Protocol):
async def set(self, lock_name: str) -> bool:
...
async def get(self, lock_name: str) -> bool:
...
async def delete(self, lock_name: str) -> None:
...
class RedisLockProvider:
lock_prefix = "lock_"
async def set(self, lock_name: str) -> bool:
"""
Tries to sets a lock.
Returns True when the lock could be set and False, if it was already set.
"""
# TODO: Improve lock. See: https://redis.io/topics/distlock
async with get_connection() as redis:
return await redis.setnx(f"{self.lock_prefix}{lock_name}", 1)
async def get(self, lock_name: str) -> bool:
"""
Returns True, when the lock is set. Else False.
"""
async with get_connection() as redis:
return await redis.get(f"{self.lock_prefix}{lock_name}")
async def delete(self, lock_name: str) -> None:
"""
Deletes the lock. Does nothing when the lock is not set.
"""
async with get_connection() as redis:
await redis.delete(f"{self.lock_prefix}{lock_name}")
class MemoryLockProvider:
def __init__(self) -> None:
self.locks: Dict[str, str] = {}
async def set(self, lock_name: str) -> bool:
if lock_name in self.locks:
return False
self.locks[lock_name] = "1"
return True
async def get(self, lock_name: str) -> bool:
return lock_name in self.locks
async def delete(self, lock_name: str) -> None:
try:
del self.locks[lock_name]
except KeyError:
pass
def load_lock_provider() -> LockProtocol:
"""
Generates an lock provider singleton.
"""
if use_redis:
lock_provider: LockProtocol = RedisLockProvider()
else:
lock_provider = MemoryLockProvider()
return lock_provider
locking = load_lock_provider()

View File

@ -1,4 +1,3 @@
import asyncio
from typing import Any, Callable, Dict, List from typing import Any, Callable, Dict, List
from openslides.utils.cache_providers import Cachable, MemoryCacheProvider from openslides.utils.cache_providers import Cachable, MemoryCacheProvider
@ -92,12 +91,8 @@ class TTestCacheProvider(MemoryCacheProvider):
""" """
CacheProvider simular to the MemoryCacheProvider with special methods for CacheProvider simular to the MemoryCacheProvider with special methods for
testing. testing.
Currently just a dummy for future extensions.
""" """
async def del_lock_after_wait( pass
self, lock_name: str, future: asyncio.Future = None
) -> None:
async def set_future() -> None:
await self.del_lock(lock_name)
asyncio.ensure_future(set_future())