Let channels' send method wait and retry if channel layer is full. Fixed #2353.

This commit is contained in:
Norman Jäckel 2017-01-15 11:55:14 +01:00
parent 624fcc663b
commit bb8748a506
2 changed files with 42 additions and 14 deletions

View File

@ -92,7 +92,7 @@ Other:
- Fixed bug, that the last change of a config value was not send via autoupdate. - Fixed bug, that the last change of a config value was not send via autoupdate.
- Added support for big assemblies with lots of users. - Added support for big assemblies with lots of users.
- Used Django Channels instead of Tornado. Refactoring of the autoupdate - Used Django Channels instead of Tornado. Refactoring of the autoupdate
process. process. Added retry with timeout in case of ChannelFull exception.
- Added new caching system with support for Redis. - Added new caching system with support for Redis.
- Support https as websocket protocol (wss). - Support https as websocket protocol (wss).
- Added migration path from 2.0. - Added migration path from 2.0.

View File

@ -1,8 +1,10 @@
import json import json
import time
import warnings
from collections import Iterable from collections import Iterable
from asgiref.inmemory import ChannelLayer
from channels import Channel, Group from channels import Channel, Group
from channels.asgi import get_channel_layer
from channels.auth import channel_session_user, channel_session_user_from_http from channels.auth import channel_session_user, channel_session_user_from_http
from django.apps import apps from django.apps import apps
from django.db import transaction from django.db import transaction
@ -14,6 +16,31 @@ from .cache import websocket_user_cache
from .collection import Collection, CollectionElement, CollectionElementList from .collection import Collection, CollectionElement, CollectionElementList
def send_or_wait(send_func, *args, **kwargs):
"""
Wrapper for channels' send() method.
If the method send() raises ChannelFull exception the worker waits for 20
milliseconds and tries again. After 5 secondes it gives up, drops the
channel message and writes a warning to stderr.
Django channels' consumer atomicity feature is disabled.
"""
kwargs['immediately'] = True
for i in range(250):
try:
send_func(*args, **kwargs)
except get_channel_layer().ChannelFull:
time.sleep(0.02)
else:
break
else:
warnings.warn(
'Channel layer is full. Channel message dropped.',
RuntimeWarning
)
@channel_session_user_from_http @channel_session_user_from_http
def ws_add_site(message): def ws_add_site(message):
""" """
@ -44,9 +71,9 @@ def ws_add_site(message):
# Send all data. If there is no data, then only accept the connection # Send all data. If there is no data, then only accept the connection
if output: if output:
message.reply_channel.send({'text': json.dumps(output)}) send_or_wait(message.reply_channel.send, {'text': json.dumps(output)})
else: else:
message.reply_channel.send({'accept': True}) send_or_wait(message.reply_channel.send, {'accept': True})
@channel_session_user @channel_session_user
@ -70,12 +97,12 @@ def ws_add_projector(message, projector_id):
user = AnonymousUser() user = AnonymousUser()
if not user.has_perm('core.can_see_projector'): if not user.has_perm('core.can_see_projector'):
message.reply_channel.send({'text': 'No permissions to see this projector.'}) send_or_wait(message.reply_channel.send, {'text': 'No permissions to see this projector.'})
else: else:
try: try:
projector = Projector.objects.get(pk=projector_id) projector = Projector.objects.get(pk=projector_id)
except Projector.DoesNotExist: except Projector.DoesNotExist:
message.reply_channel.send({'text': 'The projector {} does not exist.'.format(projector_id)}) send_or_wait(message.reply_channel.send, {'text': 'The projector {} does not exist.'.format(projector_id)})
else: else:
# At first, the client is added to the projector group, so it is # At first, the client is added to the projector group, so it is
# informed if the data change. # informed if the data change.
@ -105,7 +132,7 @@ def ws_add_projector(message, projector_id):
output.append(collection_element.as_autoupdate_for_projector()) output.append(collection_element.as_autoupdate_for_projector())
# Send all the data that were only collected before. # Send all the data that were only collected before.
message.reply_channel.send({'text': json.dumps(output)}) send_or_wait(message.reply_channel.send, {'text': json.dumps(output)})
def ws_disconnect_projector(message, projector_id): def ws_disconnect_projector(message, projector_id):
@ -128,7 +155,7 @@ def send_data(message):
user = CollectionElement.from_values('users/user', user_id) user = CollectionElement.from_values('users/user', user_id)
output = collection_elements.as_autoupdate_for_user(user) output = collection_elements.as_autoupdate_for_user(user)
for channel_name in channel_names: for channel_name in channel_names:
Channel(channel_name).send({'text': json.dumps(output)}) send_or_wait(Channel(channel_name).send, {'text': json.dumps(output)})
# Check whether broadcast is active at the moment and set the local # Check whether broadcast is active at the moment and set the local
# projector queryset. # projector queryset.
@ -148,10 +175,12 @@ def send_data(message):
output.append(element.as_autoupdate_for_projector()) output.append(element.as_autoupdate_for_projector())
if output: if output:
if config['projector_broadcast'] > 0: if config['projector_broadcast'] > 0:
Group('projector-all').send( send_or_wait(
Group('projector-all').send,
{'text': json.dumps(output)}) {'text': json.dumps(output)})
else: else:
Group('projector-{}'.format(projector.pk)).send( send_or_wait(
Group('projector-{}'.format(projector.pk)).send,
{'text': json.dumps(output)}) {'text': json.dumps(output)})
@ -229,7 +258,6 @@ def send_autoupdate(collection_elements):
Does nothing if collection_elements is empty. Does nothing if collection_elements is empty.
""" """
if collection_elements: if collection_elements:
try: send_or_wait(
Channel('autoupdate.send_data').send(collection_elements.as_channels_message()) Channel('autoupdate.send_data').send,
except ChannelLayer.ChannelFull: collection_elements.as_channels_message())
pass