Add a protocol for websocket
{'type': STRING, 'content': ANY}
This commit is contained in:
parent
cbf8a33b8d
commit
fed6d6f435
@ -35,6 +35,7 @@ Core:
|
||||
- Support Python 3.7 [#3786].
|
||||
- Updated pdfMake to 0.1.37 [#3766].
|
||||
- Updated Django to 2.1 [#3777, #3786].
|
||||
- Adds a websocket protocol [#3807].
|
||||
|
||||
|
||||
Version 2.2 (2018-06-06)
|
||||
|
@ -1,5 +1,6 @@
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import jsonschema
|
||||
from asgiref.sync import sync_to_async
|
||||
from channels.db import database_sync_to_async
|
||||
from channels.generic.websocket import AsyncJsonWebsocketConsumer
|
||||
@ -17,7 +18,73 @@ from .collection import (
|
||||
)
|
||||
|
||||
|
||||
class SiteConsumer(AsyncJsonWebsocketConsumer):
|
||||
class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
|
||||
"""
|
||||
Mixin for JSONWebsocketConsumers, that speaks the a special protocol.
|
||||
"""
|
||||
schema = {
|
||||
"$schema": "http://json-schema.org/draft-04/schema#",
|
||||
"title": "OpenSlidesWebsocketProtocol",
|
||||
"description": "The base packages that OpenSlides sends between the server and the client.",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"description": "Defines what kind of packages is packed.",
|
||||
"type": "string",
|
||||
"pattern": "notify", # The server can sent other types
|
||||
},
|
||||
"content": {
|
||||
"description": "The content of the package.",
|
||||
},
|
||||
"id": {
|
||||
"description": "An identifier of the package.",
|
||||
"type": "string",
|
||||
},
|
||||
"in_response": {
|
||||
"description": "The id of another package that the other part sent before.",
|
||||
"type": "string",
|
||||
}
|
||||
},
|
||||
"required": ["type", "content", "id"],
|
||||
}
|
||||
|
||||
async def send_json(self, type: str, content: Any, id: Optional[str] = None, in_response: Optional[str] = None) -> None:
|
||||
"""
|
||||
Sends the data with the type.
|
||||
"""
|
||||
out = {'type': type, 'content': content}
|
||||
if id:
|
||||
out['id'] = id
|
||||
if in_response:
|
||||
out['in_response'] = in_response
|
||||
await super().send_json(out)
|
||||
|
||||
async def receive_json(self, content: Any) -> None:
|
||||
"""
|
||||
Receives the json data, parses it and calls receive_content.
|
||||
"""
|
||||
try:
|
||||
jsonschema.validate(content, self.schema)
|
||||
except jsonschema.ValidationError as err:
|
||||
try:
|
||||
in_response = content['id']
|
||||
except (TypeError, KeyError):
|
||||
# content is not a dict (TypeError) or has not the key id (KeyError)
|
||||
in_response = None
|
||||
|
||||
await self.send_json(
|
||||
type='error',
|
||||
content=str(err),
|
||||
in_response=in_response)
|
||||
return
|
||||
|
||||
await self.receive_content(type=content['type'], content=content['content'], id=content['id'])
|
||||
|
||||
async def receive_content(self, type: str, content: object, id: str) -> None:
|
||||
raise NotImplementedError("ProtocollAsyncJsonWebsocketConsumer needs the method receive_content()")
|
||||
|
||||
|
||||
class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
||||
"""
|
||||
Websocket Consumer for the site.
|
||||
"""
|
||||
@ -37,9 +104,9 @@ class SiteConsumer(AsyncJsonWebsocketConsumer):
|
||||
else:
|
||||
await self.accept()
|
||||
data = await startup_data(self.scope['user'])
|
||||
await self.send_json(data)
|
||||
await self.send_json(type='autoupdate', content=data)
|
||||
|
||||
async def receive_json(self, content: Any) -> None:
|
||||
async def receive_content(self, type: str, content: Any, id: str) -> None:
|
||||
"""
|
||||
If we recieve something from the client we currently just interpret this
|
||||
as a notify message.
|
||||
@ -48,27 +115,28 @@ class SiteConsumer(AsyncJsonWebsocketConsumer):
|
||||
channel name so that a receiver client may reply to the sender or to all
|
||||
sender's instances.
|
||||
"""
|
||||
if notify_message_is_valid(content):
|
||||
await self.channel_layer.group_send(
|
||||
"projector",
|
||||
{
|
||||
"type": "send_notify",
|
||||
"incomming": content,
|
||||
"senderReplyChannelName": self.channel_name,
|
||||
"senderUserId": self.scope['user'].id or 0,
|
||||
},
|
||||
)
|
||||
await self.channel_layer.group_send(
|
||||
"site",
|
||||
{
|
||||
"type": "send_notify",
|
||||
"incomming": content,
|
||||
"senderReplyChannelName": self.channel_name,
|
||||
"senderUserId": self.scope['user'].id or 0,
|
||||
},
|
||||
)
|
||||
else:
|
||||
await self.send_json({'error': 'invalid message'})
|
||||
if type == 'notify':
|
||||
if notify_message_is_valid(content):
|
||||
await self.channel_layer.group_send(
|
||||
"projector",
|
||||
{
|
||||
"type": "send_notify",
|
||||
"incomming": content,
|
||||
"senderReplyChannelName": self.channel_name,
|
||||
"senderUserId": self.scope['user'].id or 0,
|
||||
},
|
||||
)
|
||||
await self.channel_layer.group_send(
|
||||
"site",
|
||||
{
|
||||
"type": "send_notify",
|
||||
"incomming": content,
|
||||
"senderReplyChannelName": self.channel_name,
|
||||
"senderUserId": self.scope['user'].id or 0,
|
||||
},
|
||||
)
|
||||
else:
|
||||
await self.send_json(type='error', content='Invalid notify message', in_response=id)
|
||||
|
||||
async def send_notify(self, event: Dict[str, Any]) -> None:
|
||||
"""
|
||||
@ -90,7 +158,7 @@ class SiteConsumer(AsyncJsonWebsocketConsumer):
|
||||
out.append(item)
|
||||
|
||||
if out:
|
||||
await self.send_json(out)
|
||||
await self.send_json(type='notify', content=out)
|
||||
|
||||
async def send_data(self, event: Dict[str, Any]) -> None:
|
||||
"""
|
||||
@ -112,10 +180,10 @@ class SiteConsumer(AsyncJsonWebsocketConsumer):
|
||||
collection_string=collection_string,
|
||||
id=id,
|
||||
action='deleted'))
|
||||
await self.send_json(output)
|
||||
await self.send_json(type='autoupdate', content=output)
|
||||
|
||||
|
||||
class ProjectorConsumer(AsyncJsonWebsocketConsumer):
|
||||
class ProjectorConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
||||
"""
|
||||
Websocket Consumer for the projector.
|
||||
"""
|
||||
@ -132,14 +200,14 @@ class ProjectorConsumer(AsyncJsonWebsocketConsumer):
|
||||
await self.accept()
|
||||
|
||||
if not await database_sync_to_async(has_perm)(user, 'core.can_see_projector'):
|
||||
await self.send_json({'text': 'No permissions to see this projector.'})
|
||||
await self.send_json(type='error', content='No permissions to see this projector.')
|
||||
# TODO: Shouldend we just close the websocket connection with an error message?
|
||||
# self.close(code=4403)
|
||||
else:
|
||||
out = await sync_to_async(projector_startup_data)(projector_id)
|
||||
await self.send_json(out)
|
||||
await self.send_json(type='autoupdate', content=out)
|
||||
|
||||
async def receive_json(self, content: Any) -> None:
|
||||
async def receive_content(self, type: str, content: Any, id: str) -> None:
|
||||
"""
|
||||
If we recieve something from the client we currently just interpret this
|
||||
as a notify message.
|
||||
@ -188,7 +256,7 @@ class ProjectorConsumer(AsyncJsonWebsocketConsumer):
|
||||
out.append(item)
|
||||
|
||||
if out:
|
||||
await self.send_json(out)
|
||||
await self.send_json(type='notify', content=out)
|
||||
|
||||
async def send_data(self, event: Dict[str, Any]) -> None:
|
||||
"""
|
||||
@ -199,7 +267,7 @@ class ProjectorConsumer(AsyncJsonWebsocketConsumer):
|
||||
|
||||
output = await projector_sync_send_data(projector_id, collection_elements)
|
||||
if output:
|
||||
await self.send_json(output)
|
||||
await self.send_json(type='autoupdate', content=output)
|
||||
|
||||
|
||||
async def startup_data(user: Optional[CollectionElement], change_id: int = 0) -> List[Any]:
|
||||
@ -288,19 +356,33 @@ def notify_message_is_valid(message: object) -> bool:
|
||||
"""
|
||||
Returns True, when the message is a valid notify_message.
|
||||
"""
|
||||
if not isinstance(message, list):
|
||||
# message has to be a list
|
||||
schema = {
|
||||
"$schema": "http://json-schema.org/draft-04/schema#",
|
||||
"title": "Notify elements.",
|
||||
"description": "Elements that one client can send to one or many other clients.",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"projectors": {
|
||||
"type": "array",
|
||||
"items": {"type": "integer"},
|
||||
},
|
||||
"reply_channels": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"},
|
||||
},
|
||||
"users": {
|
||||
"type": "array",
|
||||
"items": {"type": "integer"},
|
||||
}
|
||||
}
|
||||
},
|
||||
"minItems": 1,
|
||||
}
|
||||
try:
|
||||
jsonschema.validate(message, schema)
|
||||
except jsonschema.ValidationError:
|
||||
return False
|
||||
|
||||
if not message:
|
||||
# message must contain at least one element
|
||||
return False
|
||||
|
||||
for element in message:
|
||||
if not isinstance(element, dict):
|
||||
# All elements have to be a dict
|
||||
return False
|
||||
# TODO: There could be more checks. For example 'users' has to be a list of int
|
||||
# Check could be done with json-schema:
|
||||
# https://pypi.org/project/jsonschema/
|
||||
return True
|
||||
else:
|
||||
return True
|
||||
|
@ -5,6 +5,7 @@ daphne>=2.2,<2.3
|
||||
Django>=1.11,<2.2
|
||||
djangorestframework>=3.4,<3.9
|
||||
jsonfield2>=3.0,<3.1
|
||||
jsonschema>=2.6.0<2.7
|
||||
mypy_extensions>=0.3,<0.4
|
||||
PyPDF2>=1.26,<1.27
|
||||
roman>=2.0,<3.1
|
||||
|
@ -63,8 +63,11 @@ async def test_normal_connection(communicator):
|
||||
|
||||
response = await communicator.receive_json_from()
|
||||
|
||||
# Test, that there is a lot of startup data.
|
||||
assert len(response) > 5
|
||||
type = response.get('type')
|
||||
content = response.get('content')
|
||||
assert type == 'autoupdate'
|
||||
# Test, that both example objects are returned
|
||||
assert len(content) > 10
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -78,7 +81,10 @@ async def test_receive_changed_data(communicator):
|
||||
response = await communicator.receive_json_from()
|
||||
|
||||
id = config.get_key_to_id()['general_event_name']
|
||||
assert response == [
|
||||
type = response.get('type')
|
||||
content = response.get('content')
|
||||
assert type == 'autoupdate'
|
||||
assert content == [
|
||||
{'action': 'changed',
|
||||
'collection': 'core/config',
|
||||
'data': {'id': id, 'key': 'general_event_name', 'value': 'Test Event'},
|
||||
@ -122,7 +128,10 @@ async def test_receive_deleted_data(communicator):
|
||||
await sync_to_async(inform_deleted_data)([(Collection1().get_collection_string(), 1)])
|
||||
response = await communicator.receive_json_from()
|
||||
|
||||
assert response == [{'action': 'deleted', 'collection': Collection1().get_collection_string(), 'id': 1}]
|
||||
type = response.get('type')
|
||||
content = response.get('content')
|
||||
assert type == 'autoupdate'
|
||||
assert content == [{'action': 'deleted', 'collection': Collection1().get_collection_string(), 'id': 1}]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -132,11 +141,12 @@ async def test_send_invalid_notify_not_a_list(communicator):
|
||||
# Await the startup data
|
||||
await communicator.receive_json_from()
|
||||
|
||||
await communicator.send_json_to({'testmessage': 'foobar, what else.'})
|
||||
|
||||
await communicator.send_json_to({'type': 'notify', 'content': {'testmessage': 'foobar, what else.'}, 'id': 'test_send_invalid_notify_not_a_list'})
|
||||
response = await communicator.receive_json_from()
|
||||
|
||||
assert response == {'error': 'invalid message'}
|
||||
assert response['type'] == 'error'
|
||||
assert response['content'] == 'Invalid notify message'
|
||||
assert response['in_response'] == 'test_send_invalid_notify_not_a_list'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -146,11 +156,12 @@ async def test_send_invalid_notify_no_elements(communicator):
|
||||
# Await the startup data
|
||||
await communicator.receive_json_from()
|
||||
|
||||
await communicator.send_json_to([])
|
||||
|
||||
await communicator.send_json_to({'type': 'notify', 'content': [], 'id': 'test_send_invalid_notify_no_elements'})
|
||||
response = await communicator.receive_json_from()
|
||||
|
||||
assert response == {'error': 'invalid message'}
|
||||
assert response['type'] == 'error'
|
||||
assert response['content'] == 'Invalid notify message'
|
||||
assert response['in_response'] == 'test_send_invalid_notify_no_elements'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -160,11 +171,12 @@ async def test_send_invalid_notify_str_in_list(communicator):
|
||||
# Await the startup data
|
||||
await communicator.receive_json_from()
|
||||
|
||||
await communicator.send_json_to([{}, 'testmessage'])
|
||||
|
||||
await communicator.send_json_to({'type': 'notify', 'content': [{}, 'testmessage'], 'id': 'test_send_invalid_notify_str_in_list'})
|
||||
response = await communicator.receive_json_from()
|
||||
|
||||
assert response == {'error': 'invalid message'}
|
||||
assert response['type'] == 'error'
|
||||
assert response['content'] == 'Invalid notify message'
|
||||
assert response['in_response'] == 'test_send_invalid_notify_str_in_list'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -174,12 +186,65 @@ async def test_send_valid_notify(communicator):
|
||||
# Await the startup data
|
||||
await communicator.receive_json_from()
|
||||
|
||||
await communicator.send_json_to([{'testmessage': 'foobar, what else.'}])
|
||||
|
||||
await communicator.send_json_to({'type': 'notify', 'content': [{'testmessage': 'foobar, what else.'}], 'id': 'test'})
|
||||
response = await communicator.receive_json_from()
|
||||
|
||||
assert isinstance(response, list)
|
||||
assert len(response) == 1
|
||||
assert response[0]['testmessage'] == 'foobar, what else.'
|
||||
assert 'senderReplyChannelName' in response[0]
|
||||
assert response[0]['senderUserId'] == 0
|
||||
content = response['content']
|
||||
assert isinstance(content, list)
|
||||
assert len(content) == 1
|
||||
assert content[0]['testmessage'] == 'foobar, what else.'
|
||||
assert 'senderReplyChannelName' in content[0]
|
||||
assert content[0]['senderUserId'] == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invalid_websocket_message_type(communicator):
|
||||
await set_config('general_system_enable_anonymous', True)
|
||||
await communicator.connect()
|
||||
# Await the startup data
|
||||
await communicator.receive_json_from()
|
||||
|
||||
await communicator.send_json_to([])
|
||||
|
||||
response = await communicator.receive_json_from()
|
||||
assert response['type'] == 'error'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invalid_websocket_message_no_id(communicator):
|
||||
await set_config('general_system_enable_anonymous', True)
|
||||
await communicator.connect()
|
||||
# Await the startup data
|
||||
await communicator.receive_json_from()
|
||||
|
||||
await communicator.send_json_to({'type': 'test', 'content': 'foobar'})
|
||||
|
||||
response = await communicator.receive_json_from()
|
||||
assert response['type'] == 'error'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invalid_websocket_message_no_content(communicator):
|
||||
await set_config('general_system_enable_anonymous', True)
|
||||
await communicator.connect()
|
||||
# Await the startup data
|
||||
await communicator.receive_json_from()
|
||||
|
||||
await communicator.send_json_to({'type': 'test', 'id': 'test_id'})
|
||||
|
||||
response = await communicator.receive_json_from()
|
||||
assert response['type'] == 'error'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_unknown_type(communicator):
|
||||
await set_config('general_system_enable_anonymous', True)
|
||||
await communicator.connect()
|
||||
# Await the startup data
|
||||
await communicator.receive_json_from()
|
||||
|
||||
await communicator.send_json_to({'type': 'if_you_add_this_type_to_openslides_I_will_be_sad', 'content': True, 'id': 'test_id'})
|
||||
|
||||
response = await communicator.receive_json_from()
|
||||
assert response['type'] == 'error'
|
||||
assert response['in_response'] == 'test_id'
|
||||
|
@ -43,21 +43,8 @@ DATABASES = {
|
||||
}
|
||||
}
|
||||
|
||||
# Configure session in the cache
|
||||
|
||||
CACHES = {
|
||||
'default': {
|
||||
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
|
||||
}
|
||||
}
|
||||
|
||||
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
|
||||
|
||||
# When use_redis is True, the restricted data cache caches the data individuel
|
||||
# for each user. This requires a lot of memory if there are a lot of active
|
||||
# users. If use_redis is False, this setting has no effect.
|
||||
DISABLE_USER_CACHE = False
|
||||
|
||||
# Internationalization
|
||||
# https://docs.djangoproject.com/en/1.10/topics/i18n/
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user