Merge pull request #5341 from FinnStutzenstein/loadConfigsBeforeModels

Load configs before models
This commit is contained in:
Emanuel Schütze 2020-04-27 10:00:53 +02:00 committed by GitHub
commit 46d0bbd8f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 114 additions and 81 deletions

View File

@ -85,7 +85,7 @@ class ConfigHandler:
before this is called. before this is called.
""" """
async with build_key_to_id_lock: async with build_key_to_id_lock:
# Another cliend could have build the key_to_id_dict, check and return early # Another worker could have build the key_to_id_dict, check and return early
if self.key_to_id is not None: if self.key_to_id is not None:
return return
@ -101,12 +101,7 @@ class ConfigHandler:
""" """
Returns True, if the config varialbe was defined. Returns True, if the config varialbe was defined.
""" """
try: return key in self.config_variables
self.config_variables[key]
except KeyError:
return False
else:
return True
# TODO: Remove the any by using right types in INPUT_TYPE_MAPPING # TODO: Remove the any by using right types in INPUT_TYPE_MAPPING
def __setitem__(self, key: str, value: Any) -> None: def __setitem__(self, key: str, value: Any) -> None:

View File

@ -4,7 +4,7 @@ from datetime import datetime
from time import sleep from time import sleep
from typing import Any, Callable, Dict, List, Optional, Tuple, Type from typing import Any, Callable, Dict, List, Optional, Tuple, Type
from asgiref.sync import async_to_sync from asgiref.sync import async_to_sync, sync_to_async
from django.apps import apps from django.apps import apps
from . import logging from . import logging
@ -53,7 +53,7 @@ class ElementCache:
There is one redis Hash (simular to python dict) for the full_data There is one redis Hash (simular to python dict) for the full_data
The key of the Hashes is COLLECTIONSTRING:ID where COLLECTIONSTRING is the The key of the Hashes is COLLECTIONSTRING:ID where COLLECTIONSTRING is the
collection_string of a collection and id the id of an element. collection of a collection and id the id of an element.
There is an sorted set in redis with the change id as score. The values are There is an sorted set in redis with the change id as score. The values are
COLLETIONSTRING:ID for the elements that have been changed with that change COLLETIONSTRING:ID for the elements that have been changed with that change
@ -81,7 +81,7 @@ class ElementCache:
@property @property
def cachables(self) -> Dict[str, Cachable]: def cachables(self) -> Dict[str, Cachable]:
""" """
Returns all cachables as a dict where the key is the collection_string of the cachable. Returns all cachables as a dict where the key is the collection of the cachable.
""" """
# This method is neccessary to lazy load the cachables # This method is neccessary to lazy load the cachables
if self._cachables is None: if self._cachables is None:
@ -130,33 +130,10 @@ class ElementCache:
lock_name = "build_cache" lock_name = "build_cache"
# Set a lock so only one process builds the cache # Set a lock so only one process builds the cache
if await locking.set(lock_name): if await locking.set(lock_name):
logger.info("Building up the cache data...")
try: try:
mapping = {} await self._build_cache(
for collection_string, cachable in self.cachables.items(): default_change_id=default_change_id, schema_version=schema_version
for element in cachable.get_elements():
mapping.update(
{
get_element_id(
collection_string, element["id"]
): json.dumps(element)
}
) )
logger.info("Done building the cache data.")
logger.info("Saving cache data into the cache...")
if default_change_id is None:
if self.default_change_id is not None:
default_change_id = self.default_change_id
else:
# Use the miliseconds (rounded) since the 2016-02-29.
default_change_id = int(
(datetime.utcnow() - datetime(2016, 2, 29)).total_seconds()
)
default_change_id *= 1000
await self.cache_provider.reset_full_cache(mapping, default_change_id)
if schema_version:
await self.cache_provider.set_schema_version(schema_version)
logger.info("Done saving the cache data.")
finally: finally:
await locking.delete(lock_name) await locking.delete(lock_name)
else: else:
@ -165,6 +142,67 @@ class ElementCache:
sleep(0.01) sleep(0.01)
logger.info("Cache is ready (built by another process).") logger.info("Cache is ready (built by another process).")
async def _build_cache(
    self,
    default_change_id: Optional[int] = None,
    schema_version: Optional[SchemaVersion] = None,
) -> None:
    """
    Builds the cache in two phases, so that the config data is stored first.

    Phase one resets the full cache with the elements of the config
    collection only and stores the schema version, if one is given. Phase two
    adds the elements of all other collections to the full data.

    The mappings are built through sync_to_async, because building them
    accesses django's model system which requires a synchronous context.
    """
    # Phase one: config elements only. This also resets the whole cache.
    logger.info("Building config data and resetting cache...")
    build_config_mapping = sync_to_async(
        self._build_cache_get_elementid_model_mapping
    )
    config_elements = await build_config_mapping(config_only=True)
    initial_change_id = self._build_cache_get_change_id(default_change_id)
    await self.cache_provider.reset_full_cache(config_elements, initial_change_id)
    if schema_version:
        await self.cache_provider.set_schema_version(schema_version)
    logger.info("Done building and resetting.")

    # Phase two: every collection except the config collection.
    logger.info("Building up the cache data...")
    build_model_mapping = sync_to_async(
        self._build_cache_get_elementid_model_mapping
    )
    remaining_elements = await build_model_mapping()
    logger.info("Done building the cache data.")

    logger.info("Saving cache data into the cache...")
    await self.cache_provider.add_to_full_data(remaining_elements)
    logger.info("Done saving the cache data.")
def _build_cache_get_elementid_model_mapping(
    self, config_only: bool = False,
) -> Dict[str, str]:
    """
    Collects the elements of the cachables as a dict, where the key is the
    element id and the value is the json encoded element.

    Do NOT call this in an asynchronous context!
    This accesses the django's model system which requires a synchronous context.

    config_only=True only includes the config collection
    config_only=False *excludes* the config collection
    """
    config_collection = "core/config"
    want_config = bool(config_only)

    mapping = {}
    for collection, cachable in self.cachables.items():
        is_config = collection == config_collection
        # Keep the config collection exactly when it was asked for, and
        # every other collection exactly when it was not.
        if is_config != want_config:
            continue

        for element in cachable.get_elements():
            element_id = get_element_id(collection, element["id"])
            mapping[element_id] = json.dumps(element)
    return mapping
def _build_cache_get_change_id(
    self, default_change_id: Optional[int] = None
) -> int:
    """
    Returns the change id to use when the cache is built.

    The given default_change_id wins, if it is not None. Otherwise
    self.default_change_id is used, if it is not None. If both are None, the
    change id is generated from the current UTC time.
    """
    if default_change_id is not None:
        return default_change_id
    if self.default_change_id is not None:
        return self.default_change_id

    # Milliseconds since 2016-02-29, truncated to whole seconds.
    elapsed = datetime.utcnow() - datetime(2016, 2, 29)
    return int(elapsed.total_seconds()) * 1000
async def change_elements( async def change_elements(
self, elements: Dict[str, Optional[Dict[str, Any]]] self, elements: Dict[str, Optional[Dict[str, Any]]]
) -> int: ) -> int:
@ -203,19 +241,17 @@ class ElementCache:
""" """
all_data: Dict[str, List[Dict[str, Any]]] = defaultdict(list) all_data: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for element_id, data in (await self.cache_provider.get_all_data()).items(): for element_id, data in (await self.cache_provider.get_all_data()).items():
collection_string, _ = split_element_id(element_id) collection, _ = split_element_id(element_id)
element = json.loads(data.decode()) element = json.loads(data.decode())
element.pop( element.pop(
"_no_delete_on_restriction", False "_no_delete_on_restriction", False
) # remove special field for get_data_since ) # remove special field for get_data_since
all_data[collection_string].append(element) all_data[collection].append(element)
if user_id is not None: if user_id is not None:
for collection_string in all_data.keys(): for collection in all_data.keys():
restricter = self.cachables[collection_string].restrict_elements restricter = self.cachables[collection].restrict_elements
all_data[collection_string] = await restricter( all_data[collection] = await restricter(user_id, all_data[collection])
user_id, all_data[collection_string]
)
return dict(all_data) return dict(all_data)
async def get_all_data_dict(self) -> Dict[str, Dict[int, Dict[str, Any]]]: async def get_all_data_dict(self) -> Dict[str, Dict[int, Dict[str, Any]]]:
@ -229,22 +265,20 @@ class ElementCache:
""" """
all_data: Dict[str, Dict[int, Dict[str, Any]]] = defaultdict(dict) all_data: Dict[str, Dict[int, Dict[str, Any]]] = defaultdict(dict)
for element_id, data in (await self.cache_provider.get_all_data()).items(): for element_id, data in (await self.cache_provider.get_all_data()).items():
collection_string, id = split_element_id(element_id) collection, id = split_element_id(element_id)
element = json.loads(data.decode()) element = json.loads(data.decode())
element.pop( element.pop(
"_no_delete_on_restriction", False "_no_delete_on_restriction", False
) # remove special field for get_data_since ) # remove special field for get_data_since
all_data[collection_string][id] = element all_data[collection][id] = element
return dict(all_data) return dict(all_data)
async def get_collection_data( async def get_collection_data(self, collection: str) -> Dict[int, Dict[str, Any]]:
self, collection_string: str
) -> Dict[int, Dict[str, Any]]:
""" """
Returns the data for one collection as dict: {id: <element>} Returns the data for one collection as dict: {id: <element>}
""" """
encoded_collection_data = await self.cache_provider.get_collection_data( encoded_collection_data = await self.cache_provider.get_collection_data(
collection_string collection
) )
collection_data = {} collection_data = {}
for id in encoded_collection_data.keys(): for id in encoded_collection_data.keys():
@ -255,14 +289,14 @@ class ElementCache:
return collection_data return collection_data
async def get_element_data( async def get_element_data(
self, collection_string: str, id: int, user_id: Optional[int] = None self, collection: str, id: int, user_id: Optional[int] = None
) -> Optional[Dict[str, Any]]: ) -> Optional[Dict[str, Any]]:
""" """
Returns one element or None, if the element does not exist. Returns one element or None, if the element does not exist.
If the user id is given the data will be restricted for this user. If the user id is given the data will be restricted for this user.
""" """
encoded_element = await self.cache_provider.get_element_data( encoded_element = await self.cache_provider.get_element_data(
get_element_id(collection_string, id) get_element_id(collection, id)
) )
if encoded_element is None: if encoded_element is None:
@ -273,15 +307,13 @@ class ElementCache:
) # remove special field for get_data_since ) # remove special field for get_data_since
if user_id is not None: if user_id is not None:
element = await self.restrict_element_data( element = await self.restrict_element_data(element, collection, user_id)
element, collection_string, user_id
)
return element return element
async def restrict_element_data( async def restrict_element_data(
self, element: Dict[str, Any], collection_string: str, user_id: int self, element: Dict[str, Any], collection: str, user_id: int
) -> Optional[Dict[str, Any]]: ) -> Optional[Dict[str, Any]]:
restricter = self.cachables[collection_string].restrict_elements restricter = self.cachables[collection].restrict_elements
restricted_elements = await restricter(user_id, [element]) restricted_elements = await restricter(user_id, [element])
return restricted_elements[0] if restricted_elements else None return restricted_elements[0] if restricted_elements else None
@ -294,7 +326,7 @@ class ElementCache:
data will be restricted for this user. data will be restricted for this user.
Returns two values inside a tuple. The first value is a dict where the Returns two values inside a tuple. The first value is a dict where the
key is the collection_string and the value is a list of data. The second key is the collection and the value is a list of data. The second
is a list of element_ids with deleted elements. is a list of element_ids with deleted elements.
Only returns elements with the change_id or newer. When change_id is 0, Only returns elements with the change_id or newer. When change_id is 0,
@ -325,8 +357,8 @@ class ElementCache:
change_id, max_change_id=max_change_id change_id, max_change_id=max_change_id
) )
changed_elements = { changed_elements = {
collection_string: [json.loads(value.decode()) for value in value_list] collection: [json.loads(value.decode()) for value in value_list]
for collection_string, value_list in raw_changed_elements.items() for collection, value_list in raw_changed_elements.items()
} }
if user_id is None: if user_id is None:
@ -336,7 +368,7 @@ class ElementCache:
else: else:
# the list(...) is important, because `changed_elements` will be # the list(...) is important, because `changed_elements` will be
# altered during iteration and restricting data # altered during iteration and restricting data
for collection_string, elements in list(changed_elements.items()): for collection, elements in list(changed_elements.items()):
# Remove the _no_delete_on_restriction from each element. Collect all ids, where # Remove the _no_delete_on_restriction from each element. Collect all ids, where
# this field is absent or False. # this field is absent or False.
unrestricted_ids = set() unrestricted_ids = set()
@ -347,7 +379,7 @@ class ElementCache:
if not no_delete_on_restriction: if not no_delete_on_restriction:
unrestricted_ids.add(element["id"]) unrestricted_ids.add(element["id"])
cacheable = self.cachables[collection_string] cacheable = self.cachables[collection]
restricted_elements = await cacheable.restrict_elements( restricted_elements = await cacheable.restrict_elements(
user_id, elements user_id, elements
) )
@ -361,12 +393,12 @@ class ElementCache:
# Delete all ids, that are allowed to be deleted (see unrestricted_ids) and are # Delete all ids, that are allowed to be deleted (see unrestricted_ids) and are
# not present after restricting the data. # not present after restricting the data.
for id in unrestricted_ids - restricted_element_ids: for id in unrestricted_ids - restricted_element_ids:
deleted_elements.append(get_element_id(collection_string, id)) deleted_elements.append(get_element_id(collection, id))
if not restricted_elements: if not restricted_elements:
del changed_elements[collection_string] del changed_elements[collection]
else: else:
changed_elements[collection_string] = restricted_elements changed_elements[collection] = restricted_elements
return (changed_elements, deleted_elements) return (changed_elements, deleted_elements)

View File

@ -44,6 +44,9 @@ class ElementCacheProvider(Protocol):
) -> None: ) -> None:
... ...
async def add_to_full_data(self, data: Dict[str, str]) -> None:
    """
    Adds the given entries to the full data of the cache.

    The cache build passes a mapping from element id to the json encoded
    element here, after the cache was reset with the config elements.
    """
    ...
async def data_exists(self) -> bool: async def data_exists(self) -> bool:
... ...
@ -252,6 +255,10 @@ class RedisCacheProvider:
) )
await tr.execute() await tr.execute()
async def add_to_full_data(self, data: Dict[str, str]) -> None:
    """
    Adds the given entries to the full data hash in redis.

    `data` maps element ids to (json-) encoded elements. The method only
    returns after redis has answered the command, so callers can rely on the
    data being stored and see errors raised by the command.
    """
    if not data:
        # Nothing to store, so skip the round trip.
        # NOTE(review): hmset_dict presumably rejects an empty dict; confirm
        # against the redis client in use.
        return

    async with get_connection() as redis:
        # Outside of a transaction the command has to be awaited. Otherwise
        # the connection is released before the reply arrived and a failure
        # is never noticed.
        await redis.hmset_dict(self.full_data_cache_key, data)
async def data_exists(self) -> bool: async def data_exists(self) -> bool:
""" """
Returns True, when there is data in the cache. Returns True, when there is data in the cache.
@ -331,7 +338,7 @@ class RedisCacheProvider:
Returns all elements since a change_id (included) and until the max_change_id (included). Returns all elements since a change_id (included) and until the max_change_id (included).
The returend value is a two element tuple. The first value is a dict the elements where The returend value is a two element tuple. The first value is a dict the elements where
the key is the collection_string and the value a list of (json-) encoded elements. The the key is the collection and the value a list of (json-) encoded elements. The
second element is a list of element_ids, that have been deleted since the change_id. second element is a list of element_ids, that have been deleted since the change_id.
""" """
changed_elements: Dict[str, List[bytes]] = defaultdict(list) changed_elements: Dict[str, List[bytes]] = defaultdict(list)
@ -361,8 +368,8 @@ class RedisCacheProvider:
# The element is not in the cache. It has to be deleted. # The element is not in the cache. It has to be deleted.
deleted_elements.append(element_id.decode()) deleted_elements.append(element_id.decode())
else: else:
collection_string, id = split_element_id(element_id) collection, id = split_element_id(element_id)
changed_elements[collection_string].append(element_json) changed_elements[collection].append(element_json)
return changed_elements, deleted_elements return changed_elements, deleted_elements
@ensure_cache_wrapper() @ensure_cache_wrapper()
@ -501,6 +508,9 @@ class MemoryCacheProvider:
self.full_data = data self.full_data = data
self.default_change_id = default_change_id self.default_change_id = default_change_id
async def add_to_full_data(self, data: Dict[str, str]) -> None:
    """
    Merges the given entries into the full data held in memory.

    Entries whose key already exists are replaced. The existing full_data
    dict is changed in place.
    """
    store = self.full_data
    store.update(data)
async def data_exists(self) -> bool: async def data_exists(self) -> bool:
return bool(self.full_data) and self.default_change_id >= 0 return bool(self.full_data) and self.default_change_id >= 0
@ -564,8 +574,8 @@ class MemoryCacheProvider:
if element_json is None: if element_json is None:
deleted_elements.append(element_id) deleted_elements.append(element_id)
else: else:
collection_string, id = split_element_id(element_id) collection, id = split_element_id(element_id)
changed_elements[collection_string].append(element_json.encode()) changed_elements[collection].append(element_json.encode())
return changed_elements, deleted_elements return changed_elements, deleted_elements
async def get_current_change_id(self) -> int: async def get_current_change_id(self) -> int:

View File

@ -351,11 +351,7 @@ def is_local_installation() -> bool:
This is the case if manage.py is used, or when the --local-installation flag is set. This is the case if manage.py is used, or when the --local-installation flag is set.
""" """
return ( return "--local-installation" in sys.argv or "manage.py" in sys.argv[0]
True
if "--local-installation" in sys.argv or "manage.py" in sys.argv[0]
else False
)
def is_windows() -> bool: def is_windows() -> bool: