Merge pull request #5152 from FinnStutzenstein/lockingService
Adds a locking service and locks the history build process (closes #4039)
This commit is contained in:
commit
7c61076837
@ -6,6 +6,7 @@ from jsonfield import JSONField
|
||||
|
||||
from ..utils.autoupdate import Element
|
||||
from ..utils.cache import element_cache, get_element_id
|
||||
from ..utils.locking import locking
|
||||
from ..utils.models import SET_NULL_AND_AUTOUPDATE, RESTModelMixin
|
||||
from .access_permissions import (
|
||||
ConfigAccessPermissions,
|
||||
@ -284,22 +285,27 @@ class HistoryManager(models.Manager):
|
||||
"""
|
||||
Method to add all cachables to the history.
|
||||
"""
|
||||
# TODO: Add lock to prevent multiple history builds at once. See #4039.
|
||||
instances = None
|
||||
if self.all().count() == 0:
|
||||
elements = []
|
||||
all_full_data = async_to_sync(element_cache.get_all_data_list)()
|
||||
for collection_string, data in all_full_data.items():
|
||||
for full_data in data:
|
||||
elements.append(
|
||||
Element(
|
||||
id=full_data["id"],
|
||||
collection_string=collection_string,
|
||||
full_data=full_data,
|
||||
)
|
||||
)
|
||||
instances = self.add_elements(elements)
|
||||
return instances
|
||||
async_to_sync(self.async_build_history)()
|
||||
|
||||
async def async_build_history(self) -> None:
    """
    Builds the initial history: when the history table is empty, a
    snapshot of every element currently in the cache is written as
    history entries.

    A distributed lock guards the build so only one process creates the
    initial history at a time (see #4039).
    """
    # Use a lock name distinct from the "build_cache" lock used by
    # ElementCache.build(): this method awaits
    # element_cache.get_all_data_list() while holding its lock, so
    # sharing the name could make the two build processes block (or
    # deadlock) each other.
    lock_name = "build_history"
    if await locking.set(lock_name):
        # We own the lock; any concurrent caller skips the build.
        try:
            if self.all().count() == 0:
                elements = []
                all_full_data = await element_cache.get_all_data_list()
                for collection_string, data in all_full_data.items():
                    for full_data in data:
                        elements.append(
                            Element(
                                id=full_data["id"],
                                collection_string=collection_string,
                                full_data=full_data,
                            )
                        )
                self.add_elements(elements)
        finally:
            # Always release the lock, even if the build fails.
            await locking.delete(lock_name)
|
||||
|
||||
|
||||
class History(models.Model):
|
||||
|
@ -14,6 +14,7 @@ from .cache_providers import (
|
||||
MemoryCacheProvider,
|
||||
RedisCacheProvider,
|
||||
)
|
||||
from .locking import locking
|
||||
from .redis import use_redis
|
||||
from .schema_version import SchemaVersion, schema_version_handler
|
||||
from .utils import get_element_id, split_element_id
|
||||
@ -128,7 +129,7 @@ class ElementCache:
|
||||
) -> None:
|
||||
lock_name = "build_cache"
|
||||
# Set a lock so only one process builds the cache
|
||||
if await self.cache_provider.set_lock(lock_name):
|
||||
if await locking.set(lock_name):
|
||||
logger.info("Building up the cache data...")
|
||||
try:
|
||||
mapping = {}
|
||||
@ -157,10 +158,10 @@ class ElementCache:
|
||||
await self.cache_provider.set_schema_version(schema_version)
|
||||
logger.info("Done saving the cache data.")
|
||||
finally:
|
||||
await self.cache_provider.del_lock(lock_name)
|
||||
await locking.delete(lock_name)
|
||||
else:
|
||||
logger.info("Wait for another process to build up the cache...")
|
||||
while await self.cache_provider.get_lock(lock_name):
|
||||
while await locking.get(lock_name):
|
||||
sleep(0.01)
|
||||
logger.info("Cache is ready (built by another process).")
|
||||
|
||||
|
@ -66,15 +66,6 @@ class ElementCacheProvider(Protocol):
|
||||
) -> Tuple[Dict[str, List[bytes]], List[str]]:
|
||||
...
|
||||
|
||||
async def set_lock(self, lock_name: str) -> bool:
|
||||
...
|
||||
|
||||
async def get_lock(self, lock_name: str) -> bool:
|
||||
...
|
||||
|
||||
async def del_lock(self, lock_name: str) -> None:
|
||||
...
|
||||
|
||||
async def get_current_change_id(self) -> int:
|
||||
...
|
||||
|
||||
@ -250,7 +241,6 @@ class RedisCacheProvider:
|
||||
) -> None:
|
||||
"""
|
||||
Deletes the full_data_cache and write new data in it. Clears the change id key.
|
||||
Does not clear locks.
|
||||
"""
|
||||
async with get_connection() as redis:
|
||||
tr = redis.multi_exec()
|
||||
@ -371,30 +361,6 @@ class RedisCacheProvider:
|
||||
changed_elements[collection_string].append(element_json)
|
||||
return changed_elements, deleted_elements
|
||||
|
||||
async def set_lock(self, lock_name: str) -> bool:
    """
    Tries to set a lock.

    Returns True when the lock could be set and False, if it was already set.
    """
    # TODO: Improve lock. See: https://redis.io/topics/distlock
    async with get_connection() as redis:
        # SETNX replies with an integer (1/0); coerce it so the
        # annotated bool return type is accurate.
        return bool(await redis.setnx(f"lock_{lock_name}", 1))
|
||||
|
||||
async def get_lock(self, lock_name: str) -> bool:
    """
    Returns True, when the lock is set. Else False.
    """
    async with get_connection() as redis:
        # GET returns the stored value (or None), not a bool; coerce to
        # match the annotated return type. Truthiness is unchanged.
        return bool(await redis.get(f"lock_{lock_name}"))
|
||||
|
||||
async def del_lock(self, lock_name: str) -> None:
    """
    Removes the lock from redis. A no-op when the lock is not set.
    """
    key = f"lock_{lock_name}"
    async with get_connection() as connection:
        await connection.delete(key)
|
||||
|
||||
@ensure_cache_wrapper()
|
||||
async def get_current_change_id(self) -> int:
|
||||
"""
|
||||
@ -585,21 +551,6 @@ class MemoryCacheProvider:
|
||||
changed_elements[collection_string].append(element_json.encode())
|
||||
return changed_elements, deleted_elements
|
||||
|
||||
async def set_lock(self, lock_name: str) -> bool:
    """
    Acquires the named in-memory lock.

    Returns False when it is already held, True after taking it.
    """
    if lock_name not in self.locks:
        self.locks[lock_name] = "1"
        return True
    return False
|
||||
|
||||
async def get_lock(self, lock_name: str) -> bool:
    """
    Reports whether the named in-memory lock is currently held.
    """
    held = lock_name in self.locks
    return held
|
||||
|
||||
async def del_lock(self, lock_name: str) -> None:
    """
    Releases the named in-memory lock; does nothing when it is not held.
    """
    # pop with a default is the one-step equivalent of try/del/except KeyError.
    self.locks.pop(lock_name, None)
|
||||
|
||||
async def get_current_change_id(self) -> int:
|
||||
if self.change_id_data:
|
||||
return max(self.change_id_data.keys())
|
||||
|
83
openslides/utils/locking.py
Normal file
83
openslides/utils/locking.py
Normal file
@ -0,0 +1,83 @@
|
||||
from typing import Dict
|
||||
|
||||
from typing_extensions import Protocol
|
||||
|
||||
from .redis import use_redis
|
||||
|
||||
|
||||
if use_redis:
|
||||
from .redis import get_connection
|
||||
|
||||
|
||||
class LockProtocol(Protocol):
    """
    Structural interface every lock provider has to implement.
    """

    async def set(self, lock_name: str) -> bool:
        """Tries to acquire the lock. True on success, False if already set."""
        ...

    async def get(self, lock_name: str) -> bool:
        """Returns whether the lock is currently set."""
        ...

    async def delete(self, lock_name: str) -> None:
        """Releases the lock; does nothing when it is not set."""
        ...
|
||||
|
||||
|
||||
class RedisLockProvider:
    """
    Lock provider that keeps locks in redis, so they are shared between
    all server processes.
    """

    # Every redis key written by this provider starts with this prefix.
    lock_prefix = "lock_"

    async def set(self, lock_name: str) -> bool:
        """
        Tries to set a lock.

        Returns True when the lock could be set and False, if it was already set.
        """
        # TODO: Improve lock. See: https://redis.io/topics/distlock
        async with get_connection() as redis:
            # SETNX replies with an integer (1/0); coerce it so the
            # annotated bool return type is accurate.
            return bool(await redis.setnx(f"{self.lock_prefix}{lock_name}", 1))

    async def get(self, lock_name: str) -> bool:
        """
        Returns True, when the lock is set. Else False.
        """
        async with get_connection() as redis:
            # GET returns the stored value (or None), not a bool; coerce
            # to match the annotated return type. Truthiness is unchanged.
            return bool(await redis.get(f"{self.lock_prefix}{lock_name}"))

    async def delete(self, lock_name: str) -> None:
        """
        Deletes the lock. Does nothing when the lock is not set.
        """
        async with get_connection() as redis:
            await redis.delete(f"{self.lock_prefix}{lock_name}")
|
||||
|
||||
|
||||
class MemoryLockProvider:
    """
    Lock provider that keeps the locks in a process-local dict. Only
    suitable when a single server process is running.
    """

    def __init__(self) -> None:
        # Presence of a key means "locked"; the stored value is irrelevant.
        self.locks: Dict[str, str] = {}

    async def set(self, lock_name: str) -> bool:
        """
        Acquires the lock. Returns False when it is already held,
        True after taking it.
        """
        if lock_name not in self.locks:
            self.locks[lock_name] = "1"
            return True
        return False

    async def get(self, lock_name: str) -> bool:
        """
        Returns whether the lock is currently held.
        """
        return lock_name in self.locks

    async def delete(self, lock_name: str) -> None:
        """
        Releases the lock; a no-op when it is not held.
        """
        self.locks.pop(lock_name, None)
|
||||
|
||||
|
||||
def load_lock_provider() -> LockProtocol:
    """
    Creates the lock provider: redis-backed when redis is in use,
    otherwise an in-memory provider.
    """
    if use_redis:
        provider: LockProtocol = RedisLockProvider()
    else:
        provider = MemoryLockProvider()
    return provider


# Module-level singleton used by the rest of the code base.
locking = load_lock_provider()
|
@ -1,4 +1,3 @@
|
||||
import asyncio
|
||||
from typing import Any, Callable, Dict, List
|
||||
|
||||
from openslides.utils.cache_providers import Cachable, MemoryCacheProvider
|
||||
@ -92,12 +91,8 @@ class TTestCacheProvider(MemoryCacheProvider):
|
||||
"""
|
||||
CacheProvider simular to the MemoryCacheProvider with special methods for
|
||||
testing.
|
||||
|
||||
Currently just a dummy for future extensions.
|
||||
"""
|
||||
|
||||
async def del_lock_after_wait(
    self, lock_name: str, future: asyncio.Future = None
) -> None:
    """
    Schedules deletion of the given lock as a separate task on the
    running event loop and returns immediately.

    NOTE(review): the ``future`` parameter is accepted but never used —
    presumably kept for interface compatibility; confirm before removing.
    """

    async def release() -> None:
        # Runs once control returns to the event loop.
        await self.del_lock(lock_name)

    asyncio.ensure_future(release())
|
||||
|
Loading…
Reference in New Issue
Block a user