From bb8748a506b96ba014a1cf9022171de7bef52d7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Norman=20J=C3=A4ckel?= Date: Sun, 15 Jan 2017 11:55:14 +0100 Subject: [PATCH] Let channels' send method wait and retry if channel layer is full. Fixed #2353. --- CHANGELOG | 2 +- openslides/utils/autoupdate.py | 54 ++++++++++++++++++++++++++-------- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 1f2d1d363..d1eaa70ea 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -92,7 +92,7 @@ Other: - Fixed bug, that the last change of a config value was not send via autoupdate. - Added support for big assemblies with lots of users. - Used Django Channels instead of Tornado. Refactoring of the autoupdate - process. + process. Added retry with timeout in case of ChannelFull exception. - Added new caching system with support for Redis. - Support https as websocket protocol (wss). - Added migration path from 2.0. diff --git a/openslides/utils/autoupdate.py b/openslides/utils/autoupdate.py index d66477973..25537e81e 100644 --- a/openslides/utils/autoupdate.py +++ b/openslides/utils/autoupdate.py @@ -1,8 +1,10 @@ import json +import time +import warnings from collections import Iterable -from asgiref.inmemory import ChannelLayer from channels import Channel, Group +from channels.asgi import get_channel_layer from channels.auth import channel_session_user, channel_session_user_from_http from django.apps import apps from django.db import transaction @@ -14,6 +16,31 @@ from .cache import websocket_user_cache from .collection import Collection, CollectionElement, CollectionElementList +def send_or_wait(send_func, *args, **kwargs): + """ + Wrapper for channels' send() method. + + If the method send() raises ChannelFull exception the worker waits for 20 + milliseconds and tries again. After 5 secondes it gives up, drops the + channel message and writes a warning to stderr. + + Django channels' consumer atomicity feature is disabled. + """ + kwargs['immediately'] = True + for i in range(250): + try: + send_func(*args, **kwargs) + except get_channel_layer().ChannelFull: + time.sleep(0.02) + else: + break + else: + warnings.warn( + 'Channel layer is full. Channel message dropped.', + RuntimeWarning + ) + + @channel_session_user_from_http def ws_add_site(message): """ @@ -44,9 +71,9 @@ def ws_add_site(message): # Send all data. If there is no data, then only accept the connection if output: - message.reply_channel.send({'text': json.dumps(output)}) + send_or_wait(message.reply_channel.send, {'text': json.dumps(output)}) else: - message.reply_channel.send({'accept': True}) + send_or_wait(message.reply_channel.send, {'accept': True}) @channel_session_user @@ -70,12 +97,12 @@ def ws_add_projector(message, projector_id): user = AnonymousUser() if not user.has_perm('core.can_see_projector'): - message.reply_channel.send({'text': 'No permissions to see this projector.'}) + send_or_wait(message.reply_channel.send, {'text': 'No permissions to see this projector.'}) else: try: projector = Projector.objects.get(pk=projector_id) except Projector.DoesNotExist: - message.reply_channel.send({'text': 'The projector {} does not exist.'.format(projector_id)}) + send_or_wait(message.reply_channel.send, {'text': 'The projector {} does not exist.'.format(projector_id)}) else: # At first, the client is added to the projector group, so it is # informed if the data change. @@ -105,7 +132,7 @@ def ws_add_projector(message, projector_id): output.append(collection_element.as_autoupdate_for_projector()) # Send all the data that were only collected before. - message.reply_channel.send({'text': json.dumps(output)}) + send_or_wait(message.reply_channel.send, {'text': json.dumps(output)}) def ws_disconnect_projector(message, projector_id): @@ -128,7 +155,7 @@ def send_data(message): user = CollectionElement.from_values('users/user', user_id) output = collection_elements.as_autoupdate_for_user(user) for channel_name in channel_names: - Channel(channel_name).send({'text': json.dumps(output)}) + send_or_wait(Channel(channel_name).send, {'text': json.dumps(output)}) # Check whether broadcast is active at the moment and set the local # projector queryset. @@ -148,10 +175,12 @@ def send_data(message): output.append(element.as_autoupdate_for_projector()) if output: if config['projector_broadcast'] > 0: - Group('projector-all').send( + send_or_wait( + Group('projector-all').send, {'text': json.dumps(output)}) else: - Group('projector-{}'.format(projector.pk)).send( + send_or_wait( + Group('projector-{}'.format(projector.pk)).send, {'text': json.dumps(output)}) @@ -229,7 +258,6 @@ def send_autoupdate(collection_elements): Does nothing if collection_elements is empty. """ if collection_elements: - try: - Channel('autoupdate.send_data').send(collection_elements.as_channels_message()) - except ChannelLayer.ChannelFull: - pass + send_or_wait( + Channel('autoupdate.send_data').send, + collection_elements.as_channels_message())