Merge pull request #3358 from ostcar/restricted_user_cache
Add restricted_data_cache
This commit is contained in:
commit
37b2996b6c
@ -1,12 +1,13 @@
|
|||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
import warnings
|
import warnings
|
||||||
from collections import Iterable, defaultdict
|
from collections import defaultdict
|
||||||
from typing import Any, Dict, Iterable, List, cast # noqa
|
from typing import Any, Dict, Generator, Iterable, List, Union, cast
|
||||||
|
|
||||||
from channels import Channel, Group
|
from channels import Channel, Group
|
||||||
from channels.asgi import get_channel_layer
|
from channels.asgi import get_channel_layer
|
||||||
from channels.auth import channel_session_user, channel_session_user_from_http
|
from channels.auth import channel_session_user, channel_session_user_from_http
|
||||||
|
from django.apps import apps
|
||||||
from django.core.exceptions import ObjectDoesNotExist
|
from django.core.exceptions import ObjectDoesNotExist
|
||||||
from django.db import transaction
|
from django.db import transaction
|
||||||
from django.db.models import Model
|
from django.db.models import Model
|
||||||
@ -14,7 +15,7 @@ from django.db.models import Model
|
|||||||
from ..core.config import config
|
from ..core.config import config
|
||||||
from ..core.models import Projector
|
from ..core.models import Projector
|
||||||
from .auth import anonymous_is_enabled, has_perm, user_to_collection_user
|
from .auth import anonymous_is_enabled, has_perm, user_to_collection_user
|
||||||
from .cache import startup_cache, websocket_user_cache
|
from .cache import restricted_data_cache, websocket_user_cache
|
||||||
from .collection import Collection, CollectionElement, CollectionElementList
|
from .collection import Collection, CollectionElement, CollectionElementList
|
||||||
|
|
||||||
|
|
||||||
@ -89,8 +90,12 @@ def ws_add_site(message: Any) -> None:
|
|||||||
# Collect all elements that shoud be send to the client when the websocket
|
# Collect all elements that shoud be send to the client when the websocket
|
||||||
# connection is established.
|
# connection is established.
|
||||||
user = user_to_collection_user(message.user.id)
|
user = user_to_collection_user(message.user.id)
|
||||||
|
user_id = user.id if user is not None else 0
|
||||||
|
if restricted_data_cache.exists_for_user(user_id):
|
||||||
|
output = restricted_data_cache.get_data(user_id)
|
||||||
|
else:
|
||||||
output = []
|
output = []
|
||||||
for collection in startup_cache.get_collections():
|
for collection in get_startup_collections():
|
||||||
access_permissions = collection.get_access_permissions()
|
access_permissions = collection.get_access_permissions()
|
||||||
restricted_data = access_permissions.get_restricted_data(collection, user)
|
restricted_data = access_permissions.get_restricted_data(collection, user)
|
||||||
|
|
||||||
@ -102,12 +107,20 @@ def ws_add_site(message: Any) -> None:
|
|||||||
# We do not want to send 'deleted' objects on startup.
|
# We do not want to send 'deleted' objects on startup.
|
||||||
# That's why we skip such data.
|
# That's why we skip such data.
|
||||||
continue
|
continue
|
||||||
output.append(
|
|
||||||
format_for_autoupdate(
|
formatted_data = format_for_autoupdate(
|
||||||
collection_string=collection.collection_string,
|
collection_string=collection.collection_string,
|
||||||
id=int(data['id']),
|
id=data['id'],
|
||||||
action='changed',
|
action='changed',
|
||||||
data=data))
|
data=data)
|
||||||
|
|
||||||
|
output.append(formatted_data)
|
||||||
|
# Cache restricted data for user
|
||||||
|
restricted_data_cache.add_element(
|
||||||
|
user_id,
|
||||||
|
collection.collection_string,
|
||||||
|
data['id'],
|
||||||
|
formatted_data)
|
||||||
|
|
||||||
# Send all data.
|
# Send all data.
|
||||||
if output:
|
if output:
|
||||||
@ -259,7 +272,23 @@ def send_data(message: Any) -> None:
|
|||||||
except ObjectDoesNotExist:
|
except ObjectDoesNotExist:
|
||||||
# The user does not exist. Skip him/her.
|
# The user does not exist. Skip him/her.
|
||||||
continue
|
continue
|
||||||
output = collection_elements.as_autoupdate_for_user(user)
|
|
||||||
|
output = []
|
||||||
|
for collection_element in collection_elements:
|
||||||
|
formatted_data = collection_element.as_autoupdate_for_user(user)
|
||||||
|
if formatted_data['action'] == 'changed':
|
||||||
|
restricted_data_cache.add_element(
|
||||||
|
user_id or 0,
|
||||||
|
collection_element.collection_string,
|
||||||
|
collection_element.id,
|
||||||
|
formatted_data)
|
||||||
|
else:
|
||||||
|
restricted_data_cache.del_element(
|
||||||
|
user_id or 0,
|
||||||
|
collection_element.collection_string,
|
||||||
|
collection_element.id)
|
||||||
|
output.append(formatted_data)
|
||||||
|
|
||||||
for channel_name in channel_names:
|
for channel_name in channel_names:
|
||||||
send_or_wait(Channel(channel_name).send, {'text': json.dumps(output)})
|
send_or_wait(Channel(channel_name).send, {'text': json.dumps(output)})
|
||||||
|
|
||||||
@ -290,7 +319,7 @@ def send_data(message: Any) -> None:
|
|||||||
{'text': json.dumps(output)})
|
{'text': json.dumps(output)})
|
||||||
|
|
||||||
|
|
||||||
def inform_changed_data(instances: Iterable[Model], information: Dict[str, Any]=None) -> None:
|
def inform_changed_data(instances: Union[Iterable[Model], Model], information: Dict[str, Any]=None) -> None:
|
||||||
"""
|
"""
|
||||||
Informs the autoupdate system and the caching system about the creation or
|
Informs the autoupdate system and the caching system about the creation or
|
||||||
update of an element.
|
update of an element.
|
||||||
@ -299,8 +328,8 @@ def inform_changed_data(instances: Iterable[Model], information: Dict[str, Any]=
|
|||||||
"""
|
"""
|
||||||
root_instances = set()
|
root_instances = set()
|
||||||
if not isinstance(instances, Iterable):
|
if not isinstance(instances, Iterable):
|
||||||
# Make sure instances is an iterable
|
|
||||||
instances = (instances, )
|
instances = (instances, )
|
||||||
|
|
||||||
for instance in instances:
|
for instance in instances:
|
||||||
try:
|
try:
|
||||||
root_instances.add(instance.get_root_rest_element())
|
root_instances.add(instance.get_root_rest_element())
|
||||||
@ -375,13 +404,25 @@ def send_autoupdate(collection_elements: CollectionElementList) -> None:
|
|||||||
Helper function, that sends collection_elements through a channel to the
|
Helper function, that sends collection_elements through a channel to the
|
||||||
autoupdate system.
|
autoupdate system.
|
||||||
|
|
||||||
Before sending the startup_cache is cleared because it is possibly out of
|
|
||||||
date.
|
|
||||||
|
|
||||||
Does nothing if collection_elements is empty.
|
Does nothing if collection_elements is empty.
|
||||||
"""
|
"""
|
||||||
if collection_elements:
|
if collection_elements:
|
||||||
startup_cache.clear()
|
|
||||||
send_or_wait(
|
send_or_wait(
|
||||||
Channel('autoupdate.send_data').send,
|
Channel('autoupdate.send_data').send,
|
||||||
collection_elements.as_channels_message())
|
collection_elements.as_channels_message())
|
||||||
|
|
||||||
|
|
||||||
|
def get_startup_collections() -> Generator[Collection, None, None]:
|
||||||
|
"""
|
||||||
|
Returns all Collections that should be send to the user at startup
|
||||||
|
"""
|
||||||
|
for app in apps.get_app_configs():
|
||||||
|
try:
|
||||||
|
# Get the method get_startup_elements() from an app.
|
||||||
|
# This method has to return an iterable of Collection objects.
|
||||||
|
get_startup_elements = app.get_startup_elements
|
||||||
|
except AttributeError:
|
||||||
|
# Skip apps that do not implement get_startup_elements.
|
||||||
|
continue
|
||||||
|
|
||||||
|
yield from get_startup_elements()
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
import json
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from typing import ( # noqa
|
from typing import ( # noqa
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
@ -9,11 +10,11 @@ from typing import ( # noqa
|
|||||||
List,
|
List,
|
||||||
Optional,
|
Optional,
|
||||||
Set,
|
Set,
|
||||||
|
Union,
|
||||||
)
|
)
|
||||||
|
|
||||||
from channels import Group
|
from channels import Group
|
||||||
from channels.sessions import session_for_reply_channel
|
from channels.sessions import session_for_reply_channel
|
||||||
from django.apps import apps
|
|
||||||
from django.core.cache import cache, caches
|
from django.core.cache import cache, caches
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@ -202,60 +203,76 @@ class DjangoCacheWebsocketUserCache(BaseWebsocketUserCache):
|
|||||||
cache.set(self.get_cache_key(), data)
|
cache.set(self.get_cache_key(), data)
|
||||||
|
|
||||||
|
|
||||||
class StartupCache:
|
class RestrictedDataCache:
|
||||||
"""
|
"""
|
||||||
Cache of all data that are required when a client connects via websocket.
|
Caches all Data for a specific users.
|
||||||
"""
|
|
||||||
cache_key = "full_data_startup_cache"
|
|
||||||
|
|
||||||
def build(self) -> Dict[str, List[str]]:
|
The cached values are expected to be formatted for outout via websocket.
|
||||||
"""
|
"""
|
||||||
Generate the cache by going through all apps. Returns a dict where the
|
|
||||||
key is the collection string and the value a list of the full_data from
|
|
||||||
the collection elements.
|
|
||||||
"""
|
|
||||||
cache_data = {} # type: Dict[str, List[str]]
|
|
||||||
for app in apps.get_app_configs():
|
|
||||||
try:
|
|
||||||
# Get the method get_startup_elements() from an app.
|
|
||||||
# This method has to return an iterable of Collection objects.
|
|
||||||
get_startup_elements = app.get_startup_elements # type: Callable[[], Iterable[Collection]]
|
|
||||||
except AttributeError:
|
|
||||||
# Skip apps that do not implement get_startup_elements.
|
|
||||||
continue
|
|
||||||
|
|
||||||
for collection in get_startup_elements():
|
base_cache_key = 'restricted_user_cache'
|
||||||
cache_data[collection.collection_string] = [
|
|
||||||
collection_element.get_full_data()
|
|
||||||
for collection_element
|
|
||||||
in collection.element_generator()]
|
|
||||||
|
|
||||||
cache.set(self.cache_key, cache_data, 86400)
|
def add_element(self, user_id: int, collection_string: str, id: int, data: object) -> None:
|
||||||
return cache_data
|
"""
|
||||||
|
Adds one element to the cache. If the cache does not exists for the user,
|
||||||
|
it is created.
|
||||||
|
"""
|
||||||
|
redis = get_redis_connection()
|
||||||
|
redis.hset(
|
||||||
|
self.get_cache_key(user_id),
|
||||||
|
"{}/{}".format(collection_string, id),
|
||||||
|
json.dumps(data))
|
||||||
|
|
||||||
def clear(self) -> None:
|
def del_element(self, user_id: int, collection_string: str, id: int) -> None:
|
||||||
"""
|
"""
|
||||||
Clears the cache.
|
Removes one element from the cache.
|
||||||
"""
|
|
||||||
cache.delete(self.cache_key)
|
|
||||||
|
|
||||||
def get_collections(self) -> Generator['Collection', None, None]:
|
Does nothing if the cache does not exist.
|
||||||
"""
|
"""
|
||||||
Generator that returns all cached Collections.
|
redis = get_redis_connection()
|
||||||
|
redis.hdel(
|
||||||
|
self.get_cache_key(user_id),
|
||||||
|
"{}/{}".format(collection_string, id))
|
||||||
|
|
||||||
The data is read from the cache if it exists. It builds the cache if it
|
def exists_for_user(self, user_id: int) -> bool:
|
||||||
does not exists.
|
|
||||||
"""
|
"""
|
||||||
from .collection import Collection # noqa
|
Returns True if the cache for the user exists, else False.
|
||||||
data = cache.get(self.cache_key)
|
"""
|
||||||
if data is None:
|
redis = get_redis_connection()
|
||||||
# The cache does not exist.
|
return redis.exists(self.get_cache_key(user_id))
|
||||||
data = self.build()
|
|
||||||
for collection_string, full_data in data.items():
|
def get_data(self, user_id: int) -> List[object]:
|
||||||
yield Collection(collection_string, full_data)
|
"""
|
||||||
|
Returns all data for the user.
|
||||||
|
|
||||||
|
The returned value is a list of the elements.
|
||||||
|
"""
|
||||||
|
redis = get_redis_connection()
|
||||||
|
return [json.loads(element) for element in redis.hvals(self.get_cache_key(user_id))]
|
||||||
|
|
||||||
|
def get_cache_key(self, user_id: int) -> str:
|
||||||
|
"""
|
||||||
|
Returns the cache key for a user.
|
||||||
|
"""
|
||||||
|
return cache.make_key('{}:{}'.format(self.base_cache_key, user_id))
|
||||||
|
|
||||||
|
|
||||||
startup_cache = StartupCache()
|
class DummyRestrictedDataCache:
|
||||||
|
"""
|
||||||
|
Dummy RestrictedDataCache that does nothing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def add_element(self, user_id: int, collection_string: str, id: int, data: object) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def del_element(self, user_id: int, collection_string: str, id: int) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def exists_for_user(self, user_id: int) -> bool:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_data(self, user_id: int) -> List[object]:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def use_redis_cache() -> bool:
|
def use_redis_cache() -> bool:
|
||||||
@ -279,5 +296,7 @@ def get_redis_connection() -> Any:
|
|||||||
|
|
||||||
if use_redis_cache():
|
if use_redis_cache():
|
||||||
websocket_user_cache = RedisWebsocketUserCache() # type: BaseWebsocketUserCache
|
websocket_user_cache = RedisWebsocketUserCache() # type: BaseWebsocketUserCache
|
||||||
|
restricted_data_cache = RestrictedDataCache() # type: Union[RestrictedDataCache, DummyRestrictedDataCache]
|
||||||
else:
|
else:
|
||||||
websocket_user_cache = DjangoCacheWebsocketUserCache()
|
websocket_user_cache = DjangoCacheWebsocketUserCache()
|
||||||
|
restricted_data_cache = DummyRestrictedDataCache()
|
||||||
|
Loading…
Reference in New Issue
Block a user