2016-03-02 00:46:19 +01:00
|
|
|
import json
|
2018-01-20 13:57:25 +01:00
|
|
|
import threading
|
2017-01-15 11:55:14 +01:00
|
|
|
import time
|
|
|
|
import warnings
|
2018-01-20 13:57:25 +01:00
|
|
|
from collections import OrderedDict, defaultdict
|
2018-02-24 17:54:59 +01:00
|
|
|
from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple, Union
|
2015-01-18 15:53:03 +01:00
|
|
|
|
2016-05-29 08:29:14 +02:00
|
|
|
from channels import Channel, Group
|
2017-01-15 11:55:14 +01:00
|
|
|
from channels.asgi import get_channel_layer
|
2016-05-29 08:29:14 +02:00
|
|
|
from channels.auth import channel_session_user, channel_session_user_from_http
|
2017-08-18 07:56:16 +02:00
|
|
|
from django.apps import apps
|
2017-02-12 14:09:53 +01:00
|
|
|
from django.core.exceptions import ObjectDoesNotExist
|
2016-08-08 07:48:11 +02:00
|
|
|
from django.db import transaction
|
2017-08-24 12:26:55 +02:00
|
|
|
from django.db.models import Model
|
2016-01-10 00:17:00 +01:00
|
|
|
|
2016-09-17 22:26:23 +02:00
|
|
|
from ..core.config import config
|
|
|
|
from ..core.models import Projector
|
2017-06-10 09:25:27 +02:00
|
|
|
from .auth import anonymous_is_enabled, has_perm, user_to_collection_user
|
2017-08-18 07:56:16 +02:00
|
|
|
from .cache import restricted_data_cache, websocket_user_cache
|
2017-09-04 00:25:45 +02:00
|
|
|
from .collection import AutoupdateFormat # noqa
|
|
|
|
from .collection import (
|
|
|
|
ChannelMessageFormat,
|
|
|
|
Collection,
|
|
|
|
CollectionElement,
|
|
|
|
format_for_autoupdate,
|
|
|
|
from_channel_message,
|
|
|
|
to_channel_message,
|
|
|
|
)
|
2013-02-27 18:22:24 +01:00
|
|
|
|
2015-01-17 14:01:44 +01:00
|
|
|
|
2017-08-24 12:26:55 +02:00
|
|
|
def send_or_wait(send_func: Any, *args: Any, **kwargs: Any) -> None:
    """
    Wrapper for channels' send() method.

    If send() raises a ChannelFull exception the worker waits for 20
    milliseconds and tries again. After 5 seconds (250 attempts) it gives
    up, drops the channel message and emits a RuntimeWarning.

    Django channels' consumer atomicity feature is disabled.
    """
    # 'immediately' disables channels' consumer atomicity so the message is
    # pushed to the channel layer right away.
    kwargs['immediately'] = True
    for _ in range(250):
        try:
            send_func(*args, **kwargs)
        except get_channel_layer().ChannelFull:
            # Channel layer is congested; back off briefly and retry.
            time.sleep(0.02)
        else:
            break
    else:
        # All retries exhausted: drop the message but leave a trace.
        warnings.warn(
            'Channel layer is full. Channel message dropped.',
            RuntimeWarning
        )
|
|
|
|
|
|
|
|
|
2016-05-29 08:29:14 +02:00
|
|
|
@channel_session_user_from_http
def ws_add_site(message: Any) -> None:
    """
    Adds the websocket connection to a group specific to the connecting user.

    The group with the name 'user-None' stands for all anonymous users.

    Send all "startup-data" through the connection.

    If anonymous access is disabled and the connection is unauthenticated,
    the connection is rejected ({'accept': False}) and nothing else happens.
    """
    if not anonymous_is_enabled() and not message.user.id:
        send_or_wait(message.reply_channel.send, {'accept': False})
        return

    Group('site').add(message.reply_channel)
    message.channel_session['user_id'] = message.user.id
    # Saves the reply channel to the user. Uses 0 for anonymous users.
    websocket_user_cache.add(message.user.id or 0, message.reply_channel.name)

    # Open the websocket connection.
    send_or_wait(message.reply_channel.send, {'accept': True})

    # Collect all elements that should be sent to the client when the
    # websocket connection is established.
    user = user_to_collection_user(message.user.id)
    user_id = user.id if user is not None else 0
    if restricted_data_cache.exists_for_user(user_id):
        # Fast path: the restricted data for this user is already cached.
        output = restricted_data_cache.get_data(user_id)
    else:
        # Slow path: build the restricted startup data from all apps and
        # fill the cache while doing so.
        output = []
        for collection in get_startup_collections():
            access_permissions = collection.get_access_permissions()
            restricted_data = access_permissions.get_restricted_data(collection.get_full_data(), user)

            for data in restricted_data:
                if data is None:
                    # We do not want to send 'deleted' objects on startup.
                    # That's why we skip such data.
                    continue

                formatted_data = format_for_autoupdate(
                    collection_string=collection.collection_string,
                    id=data['id'],
                    action='changed',
                    data=data)

                output.append(formatted_data)
                # Cache restricted data for user
                restricted_data_cache.add_element(
                    user_id,
                    collection.collection_string,
                    data['id'],
                    formatted_data)

    # Send all data.
    if output:
        send_or_wait(message.reply_channel.send, {'text': json.dumps(output)})
|
2013-02-27 18:22:24 +01:00
|
|
|
|
2015-01-17 14:01:44 +01:00
|
|
|
|
2016-05-29 08:29:14 +02:00
|
|
|
@channel_session_user
def ws_disconnect_site(message: Any) -> None:
    """
    Cleans up after a client on the site disconnects.

    Removes the reply channel from the 'site' group and drops it from the
    websocket user cache (0 is used as the id for anonymous users).
    """
    reply_channel = message.reply_channel
    Group('site').discard(reply_channel)
    websocket_user_cache.remove(message.user.id or 0, reply_channel.name)
|
2016-01-10 00:17:00 +01:00
|
|
|
|
2015-01-17 14:01:44 +01:00
|
|
|
|
2017-04-19 22:58:48 +02:00
|
|
|
@channel_session_user
def ws_receive_site(message: Any) -> None:
    """
    This function is called if a message from a client comes in. The message
    should be a list. Every item is broadcasted to the given users (or all
    users if no user list is given) if it is a notify element.

    The server adds the sender's user id (0 for anonymous) and reply
    channel name so that a receiver client may reply to the sender or to all
    sender's instances.
    """
    try:
        incomming = json.loads(message.content['text'])
    except ValueError:
        # Message content is invalid. Just do nothing.
        pass
    else:
        if isinstance(incomming, list):
            # Parse all items. Routing tables: per-user-id and
            # per-reply-channel lists of notify items, plus items that go
            # to everybody.
            receivers_users = defaultdict(list)  # type: Dict[int, List[Any]]
            receivers_reply_channels = defaultdict(list)  # type: Dict[str, List[Any]]
            items_for_all = []
            for item in incomming:
                if item.get('collection') == 'notify':
                    use_receivers_dict = False
                    # Stamp the item with the sender so receivers can reply.
                    item['senderReplyChannelName'] = message.reply_channel.name
                    item['senderUserId'] = message.user.id or 0

                    # Force the params to be a dict
                    if not isinstance(item.get('params'), dict):
                        item['params'] = {}

                    users = item.get('users')
                    if isinstance(users, list):
                        # Send this item only to all reply channels of some site users.
                        for user_id in users:
                            receivers_users[user_id].append(item)
                        use_receivers_dict = True

                    reply_channels = item.get('replyChannels')
                    if isinstance(reply_channels, list):
                        # Send this item only to some reply channels.
                        for reply_channel_name in reply_channels:
                            receivers_reply_channels[reply_channel_name].append(item)
                        use_receivers_dict = True

                    if not use_receivers_dict:
                        # Send this item to all reply channels.
                        items_for_all.append(item)

            # Send all items
            for user_id, channel_names in websocket_user_cache.get_all().items():
                output = receivers_users[user_id]
                if len(output) > 0:
                    for channel_name in channel_names:
                        send_or_wait(Channel(channel_name).send, {'text': json.dumps(output)})

            for channel_name, output in receivers_reply_channels.items():
                if len(output) > 0:
                    send_or_wait(Channel(channel_name).send, {'text': json.dumps(output)})

            if len(items_for_all) > 0:
                send_or_wait(Group('site').send, {'text': json.dumps(items_for_all)})
|
|
|
|
|
|
|
|
|
2016-09-17 22:26:23 +02:00
|
|
|
@channel_session_user_from_http
def ws_add_projector(message: Any, projector_id: int) -> None:
    """
    Adds the websocket connection to a group specific to the projector with the given id.
    Also sends all data that are shown on the projector.

    Requires the 'core.can_see_projector' permission; otherwise a plain text
    error is sent instead.
    """
    user = user_to_collection_user(message.user.id)

    if not has_perm(user, 'core.can_see_projector'):
        send_or_wait(message.reply_channel.send, {'text': 'No permissions to see this projector.'})
    else:
        try:
            projector = Projector.objects.get(pk=projector_id)
        except Projector.DoesNotExist:
            send_or_wait(message.reply_channel.send, {'text': 'The projector {} does not exist.'.format(projector_id)})
        else:
            # At first, the client is added to the projector group, so it is
            # informed if the data change.
            Group('projector-{}'.format(projector_id)).add(message.reply_channel)

            # Then it is also added to the global projector group which is
            # used for broadcasting data.
            Group('projector-all').add(message.reply_channel)

            # Now check whether broadcast is active at the moment. If yes,
            # change the local projector variable.
            if config['projector_broadcast'] > 0:
                projector = Projector.objects.get(pk=config['projector_broadcast'])

            # Collect all elements that are on the projector.
            output = []  # type: List[AutoupdateFormat]
            for requirement in projector.get_all_requirements():
                required_collection_element = CollectionElement.from_instance(requirement)
                output.append(required_collection_element.as_autoupdate_for_projector())

            # Collect all config elements.
            config_collection = Collection(config.get_collection_string())
            projector_data = (config_collection.get_access_permissions()
                              .get_projector_data(config_collection.get_full_data()))
            for data in projector_data:
                output.append(format_for_autoupdate(
                    config_collection.collection_string,
                    data['id'],
                    'changed',
                    data))

            # Collect the projector instance.
            collection_element = CollectionElement.from_instance(projector)
            output.append(collection_element.as_autoupdate_for_projector())

            # Send all the data that were only collected before.
            send_or_wait(message.reply_channel.send, {'text': json.dumps(output)})
|
2016-09-17 22:26:23 +02:00
|
|
|
|
|
|
|
|
2017-08-24 12:26:55 +02:00
|
|
|
def ws_disconnect_projector(message: Any, projector_id: int) -> None:
    """
    Cleans up after a client on the projector disconnects.

    The reply channel is removed both from the projector specific group and
    from the broadcast group for all projectors.
    """
    for group_name in ('projector-{}'.format(projector_id), 'projector-all'):
        Group(group_name).discard(message.reply_channel)
|
2016-09-17 22:26:23 +02:00
|
|
|
|
|
|
|
|
2018-01-20 09:45:02 +01:00
|
|
|
def send_data_projector(message: ChannelMessageFormat) -> None:
    """
    Informs all projector clients about changed data.

    Runs as a channel worker: unpacks the collection elements from the
    channel message and pushes the per-projector autoupdate payloads.
    """
    collection_elements = from_channel_message(message)

    # Check whether broadcast is active at the moment and set the local
    # projector queryset.
    if config['projector_broadcast'] > 0:
        queryset = Projector.objects.filter(pk=config['projector_broadcast'])
    else:
        queryset = Projector.objects.all()

    # Loop over all projectors and send data that they need.
    for projector in queryset:
        output = []
        for collection_element in collection_elements:
            if collection_element.is_deleted():
                # Deletions are always forwarded so clients can drop the
                # element regardless of projector requirements.
                output.append(collection_element.as_autoupdate_for_projector())
            else:
                for element in projector.get_collection_elements_required_for_this(collection_element):
                    output.append(element.as_autoupdate_for_projector())
        if output:
            if config['projector_broadcast'] > 0:
                # Broadcast mode: every projector client shows the same
                # content, so send to the global group once per payload.
                send_or_wait(
                    Group('projector-all').send,
                    {'text': json.dumps(output)})
            else:
                send_or_wait(
                    Group('projector-{}'.format(projector.pk)).send,
                    {'text': json.dumps(output)})
|
|
|
|
|
2018-01-20 09:45:02 +01:00
|
|
|
|
|
|
|
def send_data_site(message: ChannelMessageFormat) -> None:
    """
    Informs all site users about changed data.

    Runs as a channel worker: for every connected user, restricts the
    changed elements to what that user may see, keeps the restricted data
    cache in sync and sends the formatted payload to all of the user's
    reply channels.
    """
    collection_elements = from_channel_message(message)

    # Send data to site users.
    for user_id, channel_names in websocket_user_cache.get_all().items():
        if not user_id:
            # Anonymous user
            user = None
        else:
            try:
                user = user_to_collection_user(user_id)
            except ObjectDoesNotExist:
                # The user does not exist. Skip him/her.
                continue

        output = []
        for collection_element in collection_elements:
            formatted_data = collection_element.as_autoupdate_for_user(user)
            if formatted_data['action'] == 'changed':
                # Keep the per-user restricted data cache up to date.
                restricted_data_cache.update_element(
                    user_id or 0,
                    collection_element.collection_string,
                    collection_element.id,
                    formatted_data)
            else:
                restricted_data_cache.del_element(
                    user_id or 0,
                    collection_element.collection_string,
                    collection_element.id)
            output.append(formatted_data)

        # NOTE(review): output is sent even when empty (unlike ws_add_site,
        # which guards with `if output:`) — presumably harmless, but confirm
        # whether an empty autoupdate message is intended here.
        for channel_name in channel_names:
            send_or_wait(Channel(channel_name).send, {'text': json.dumps(output)})
|
2016-09-18 16:00:31 +02:00
|
|
|
|
2016-05-29 08:29:14 +02:00
|
|
|
|
2018-02-24 16:38:17 +01:00
|
|
|
def to_ordered_dict(d: Optional[Dict]) -> Optional[OrderedDict]:
    """
    Little helper to hash information dict in inform_*_data.

    Dicts are converted recursively into OrderedDicts with keys in sorted
    order so that str() of the result is deterministic; any non-dict value
    is returned unchanged.
    """
    if not isinstance(d, dict):
        return d
    ordered = OrderedDict()  # type: OrderedDict
    for key in sorted(d.keys()):
        ordered[key] = to_ordered_dict(d[key])
    return ordered
|
|
|
|
|
|
|
|
|
2017-08-18 07:56:16 +02:00
|
|
|
def inform_changed_data(instances: Union[Iterable[Model], Model], information: Optional[Dict[str, Any]] = None) -> None:
    """
    Informs the autoupdate system and the caching system about the creation or
    update of an element. This is done via the AutoupdateBundleMiddleware.

    The argument instances can be one instance or an iterable over instances.
    The optional information dict is attached to each collection element.

    Outside of a request-response cycle (no bundle for this thread) the call
    is a no-op.
    """
    root_instances = set()
    if not isinstance(instances, Iterable):
        instances = (instances, )

    for instance in instances:
        try:
            # Autoupdate always operates on the root REST element of an
            # instance, so nested elements update their parent.
            root_instances.add(instance.get_root_rest_element())
        except AttributeError:
            # Instance has no method get_root_rest_element. Just ignore it.
            pass

    # Put all collection elements into the autoupdate_bundle.
    bundle = autoupdate_bundle.get(threading.get_ident())
    if bundle is not None:
        # Run autoupdate only if the bundle exists because we are in a request-response-cycle.
        for root_instance in root_instances:
            collection_element = CollectionElement.from_instance(
                root_instance,
                information=information)
            # The key deduplicates repeated updates of the same element with
            # the same information within one request.
            key = root_instance.get_collection_string() + str(root_instance.get_rest_pk()) + str(to_ordered_dict(information))
            bundle[key] = collection_element
|
2015-01-17 14:01:44 +01:00
|
|
|
|
2016-09-30 20:42:58 +02:00
|
|
|
|
2017-09-04 00:25:45 +02:00
|
|
|
def inform_deleted_data(elements: Iterable[Tuple[str, int]], information: Optional[Dict[str, Any]] = None) -> None:
    """
    Informs the autoupdate system and the caching system about the deletion of
    elements. This is done via the AutoupdateBundleMiddleware.

    Each element is a tuple (collection_string, id).

    The argument information is added to each collection element.

    Outside of a request-response cycle (no bundle for this thread) the call
    is a no-op.
    """
    # Put all stuff to be deleted into the autoupdate_bundle.
    bundle = autoupdate_bundle.get(threading.get_ident())
    if bundle is not None:
        # Run autoupdate only if the bundle exists because we are in a request-response-cycle.
        for element in elements:
            collection_element = CollectionElement.from_values(
                collection_string=element[0],
                id=element[1],
                deleted=True,
                information=information)
            # Same dedup key scheme as inform_changed_data.
            key = element[0] + str(element[1]) + str(to_ordered_dict(information))
            bundle[key] = collection_element
|
2016-02-11 11:29:19 +01:00
|
|
|
|
|
|
|
|
2017-09-04 00:25:45 +02:00
|
|
|
def inform_data_collection_element_list(collection_elements: List[CollectionElement],
                                        information: Optional[Dict[str, Any]] = None) -> None:
    """
    Informs the autoupdate system about some collection elements. This is
    used just to send some data to all users.

    Outside of a request-response cycle (no bundle for this thread) the call
    is a no-op.
    """
    # Put all stuff into the autoupdate_bundle.
    bundle = autoupdate_bundle.get(threading.get_ident())
    if bundle is not None:
        # Run autoupdate only if the bundle exists because we are in a request-response-cycle.
        for collection_element in collection_elements:
            # Same dedup key scheme as inform_changed_data.
            key = collection_element.collection_string + str(collection_element.id) + str(to_ordered_dict(information))
            bundle[key] = collection_element
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
Global container for autoupdate bundles
|
|
|
|
"""
|
|
|
|
autoupdate_bundle = {} # type: Dict[int, Dict[str, CollectionElement]]
|
|
|
|
|
|
|
|
|
|
|
|
class AutoupdateBundleMiddleware:
    """
    Middleware to handle autoupdate bundling.

    Opens an empty autoupdate bundle for the current thread before the view
    runs and flushes it via send_autoupdate() after the response, so that
    all changes of one request are sent as a single autoupdate.
    """
    def __init__(self, get_response: Any) -> None:
        """
        Stores the next middleware/view callable, as required by Django's
        middleware protocol.
        """
        self.get_response = get_response
        # One-time configuration and initialization.

    def __call__(self, request: Any) -> Any:
        """
        Wraps one request-response cycle with an autoupdate bundle.
        """
        thread_id = threading.get_ident()
        autoupdate_bundle[thread_id] = {}

        response = self.get_response(request)

        # Remove the bundle from the global container so the thread can be
        # reused for the next request; keep a local reference for the
        # on_commit closure below.
        bundle = autoupdate_bundle.pop(thread_id)  # type: Dict[str, CollectionElement]
        # If currently there is an open database transaction, then the
        # send_autoupdate function is only called, when the transaction is
        # commited. If there is currently no transaction, then the function
        # is called immediately.
        transaction.on_commit(lambda: send_autoupdate(bundle.values()))
        return response
|
2017-03-06 16:34:20 +01:00
|
|
|
|
|
|
|
|
2018-01-20 13:57:25 +01:00
|
|
|
def send_autoupdate(collection_elements: Iterable[CollectionElement]) -> None:
    """
    Helper function, that sends collection_elements through a channel to the
    autoupdate system.

    Does nothing if collection_elements is empty.
    """
    # Materialize the iterable first: it is consumed twice below (once per
    # channel message) and the parameter type allows one-shot iterators,
    # which would otherwise silently produce an empty second message.
    collection_elements = list(collection_elements)
    if collection_elements:
        send_or_wait(
            Channel('autoupdate.send_data_projector').send,
            to_channel_message(collection_elements))
        send_or_wait(
            Channel('autoupdate.send_data_site').send,
            to_channel_message(collection_elements))
|
2017-08-18 07:56:16 +02:00
|
|
|
|
|
|
|
|
|
|
|
def get_startup_collections() -> Generator[Collection, None, None]:
    """
    Returns all Collections that should be send to the user at startup

    Every installed app may provide a get_startup_elements() method that
    returns an iterable of Collection objects; apps without this method are
    skipped.
    """
    for app_config in apps.get_app_configs():
        startup_elements = getattr(app_config, 'get_startup_elements', None)
        if startup_elements is None:
            # This app does not provide startup elements.
            continue
        yield from startup_elements()
|