Merge pull request #2882 from normanjaeckel/ChannelSendWait

Let channels' send method wait and retry if channel layer is full.
This commit is contained in:
Norman Jäckel 2017-01-25 16:40:43 +01:00 committed by GitHub
commit df60700612
2 changed files with 42 additions and 14 deletions

View File

@ -94,7 +94,7 @@ Other:
- Fixed bug, that the last change of a config value was not send via autoupdate.
- Added support for big assemblies with lots of users.
- Used Django Channels instead of Tornado. Refactoring of the autoupdate
process.
process. Added retry with timeout in case of ChannelFull exception.
- Added new caching system with support for Redis.
- Support https as websocket protocol (wss).
- Added migration path from 2.0.

View File

@ -1,8 +1,10 @@
import json
import time
import warnings
from collections import Iterable
from asgiref.inmemory import ChannelLayer
from channels import Channel, Group
from channels.asgi import get_channel_layer
from channels.auth import channel_session_user, channel_session_user_from_http
from django.apps import apps
from django.db import transaction
@ -14,6 +16,31 @@ from .cache import websocket_user_cache
from .collection import Collection, CollectionElement, CollectionElementList
def send_or_wait(send_func, *args, **kwargs):
"""
Wrapper for channels' send() method.
If the method send() raises ChannelFull exception the worker waits for 20
milliseconds and tries again. After 5 secondes it gives up, drops the
channel message and writes a warning to stderr.
Django channels' consumer atomicity feature is disabled.
"""
kwargs['immediately'] = True
for i in range(250):
try:
send_func(*args, **kwargs)
except get_channel_layer().ChannelFull:
time.sleep(0.02)
else:
break
else:
warnings.warn(
'Channel layer is full. Channel message dropped.',
RuntimeWarning
)
@channel_session_user_from_http
def ws_add_site(message):
"""
@ -44,9 +71,9 @@ def ws_add_site(message):
# Send all data. If there is no data, then only accept the connection
if output:
message.reply_channel.send({'text': json.dumps(output)})
send_or_wait(message.reply_channel.send, {'text': json.dumps(output)})
else:
message.reply_channel.send({'accept': True})
send_or_wait(message.reply_channel.send, {'accept': True})
@channel_session_user
@ -70,12 +97,12 @@ def ws_add_projector(message, projector_id):
user = AnonymousUser()
if not user.has_perm('core.can_see_projector'):
message.reply_channel.send({'text': 'No permissions to see this projector.'})
send_or_wait(message.reply_channel.send, {'text': 'No permissions to see this projector.'})
else:
try:
projector = Projector.objects.get(pk=projector_id)
except Projector.DoesNotExist:
message.reply_channel.send({'text': 'The projector {} does not exist.'.format(projector_id)})
send_or_wait(message.reply_channel.send, {'text': 'The projector {} does not exist.'.format(projector_id)})
else:
# At first, the client is added to the projector group, so it is
# informed if the data change.
@ -105,7 +132,7 @@ def ws_add_projector(message, projector_id):
output.append(collection_element.as_autoupdate_for_projector())
# Send all the data that were only collected before.
message.reply_channel.send({'text': json.dumps(output)})
send_or_wait(message.reply_channel.send, {'text': json.dumps(output)})
def ws_disconnect_projector(message, projector_id):
@ -128,7 +155,7 @@ def send_data(message):
user = CollectionElement.from_values('users/user', user_id)
output = collection_elements.as_autoupdate_for_user(user)
for channel_name in channel_names:
Channel(channel_name).send({'text': json.dumps(output)})
send_or_wait(Channel(channel_name).send, {'text': json.dumps(output)})
# Check whether broadcast is active at the moment and set the local
# projector queryset.
@ -148,10 +175,12 @@ def send_data(message):
output.append(element.as_autoupdate_for_projector())
if output:
if config['projector_broadcast'] > 0:
Group('projector-all').send(
send_or_wait(
Group('projector-all').send,
{'text': json.dumps(output)})
else:
Group('projector-{}'.format(projector.pk)).send(
send_or_wait(
Group('projector-{}'.format(projector.pk)).send,
{'text': json.dumps(output)})
@ -229,7 +258,6 @@ def send_autoupdate(collection_elements):
Does nothing if collection_elements is empty.
"""
if collection_elements:
try:
Channel('autoupdate.send_data').send(collection_elements.as_channels_message())
except ChannelLayer.ChannelFull:
pass
send_or_wait(
Channel('autoupdate.send_data').send,
collection_elements.as_channels_message())