diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 0932311f7..067de6fb7 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,7 +12,7 @@ Core: startup and caching system, dropped support for Geiss [#3796, #3789]. - Dropped support for Python 3.5 [#3805]. - Added a websocket protocol for server client communication using - JSON schema [#3807]. + JSON schema [#3807, #3949]. - Changed URL schema [#3798]. - Enabled docs for using OpenSlides with Gunicorn and Uvicorn in big mode [#3799, #3817]. diff --git a/openslides/core/apps.py b/openslides/core/apps.py index c643c3074..1d85f4dd4 100644 --- a/openslides/core/apps.py +++ b/openslides/core/apps.py @@ -40,6 +40,13 @@ class CoreAppConfig(AppConfig): TagViewSet, ) from ..utils.constants import set_constants, get_constants_from_apps + from .websocket import ( + NotifyWebsocketClientMessage, + ConstantsWebsocketClientMessage, + GetElementsWebsocketClientMessage, + AutoupdateWebsocketClientMessage, + ) + from ..utils.websocket import register_client_message # Collect all config variables before getting the constants. config.collect_config_variables_from_apps() @@ -82,6 +89,12 @@ class CoreAppConfig(AppConfig): # This happens in the tests or in migrations. Do nothing pass + # Register client messages + register_client_message(NotifyWebsocketClientMessage()) + register_client_message(ConstantsWebsocketClientMessage()) + register_client_message(GetElementsWebsocketClientMessage()) + register_client_message(AutoupdateWebsocketClientMessage()) + def get_config_variables(self): from .config_variables import get_config_variables return get_config_variables() diff --git a/openslides/core/websocket.py b/openslides/core/websocket.py new file mode 100644 index 000000000..8c04d348c --- /dev/null +++ b/openslides/core/websocket.py @@ -0,0 +1,104 @@ +from typing import Any + +from ..utils.constants import get_constants +from ..utils.websocket import ( + BaseWebsocketClientMessage, + ProtocollAsyncJsonWebsocketConsumer, + get_element_data, +) + + +class NotifyWebsocketClientMessage(BaseWebsocketClientMessage): + """ + Websocket message from a client to send a message to other clients. + """ + identifier = 'notify' + schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Notify elements.", + "description": "Elements that one client can send to one or many other clients.", + "type": "array", + "items": { + "type": "object", + "properties": { + "projectors": { + "type": "array", + "items": {"type": "integer"}, + }, + "reply_channels": { + "type": "array", + "items": {"type": "string"}, + }, + "users": { + "type": "array", + "items": {"type": "integer"}, + } + } + }, + "minItems": 1, + } + + async def receive_content(self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str) -> None: + await consumer.channel_layer.group_send( + "site", + { + "type": "send_notify", + "incomming": content, + "senderReplyChannelName": consumer.channel_name, + "senderUserId": consumer.scope['user'].id or 0, + }, + ) + + +class ConstantsWebsocketClientMessage(BaseWebsocketClientMessage): + """ + The Client requests the constants. + """ + identifier = 'constants' + content_required = False + + async def receive_content(self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str) -> None: + # Return all constants to the client. + await consumer.send_json(type='constants', content=get_constants(), in_response=id) + + +class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage): + """ + The Client request database elements. + """ + identifier = 'getElements' + schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "titel": "getElement request", + "description": "Request from the client to server to get elements.", + "type": "object", + "properties": { + # change_id is not required + "change_id": { + "type": "integer", + } + }, + } + + async def receive_content(self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str) -> None: + requested_change_id = content.get('change_id', 0) + try: + element_data = await get_element_data(consumer.scope['user'], requested_change_id) + except ValueError as error: + await consumer.send_json(type='error', content=str(error), in_response=id) + else: + await consumer.send_json(type='autoupdate', content=element_data, in_response=id) + + +class AutoupdateWebsocketClientMessage(BaseWebsocketClientMessage): + """ + The Client turns autoupdate on or off. + """ + identifier = 'autoupdate' + + async def receive_content(self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str) -> None: + # Turn on or off the autoupdate for the client + if content: # accept any value, that can be interpreted as bool + await consumer.channel_layer.group_add('autoupdate', consumer.channel_name) + else: + await consumer.channel_layer.group_discard('autoupdate', consumer.channel_name) diff --git a/openslides/utils/consumers.py b/openslides/utils/consumers.py index 5fb387f70..2a01b6ce0 100644 --- a/openslides/utils/consumers.py +++ b/openslides/utils/consumers.py @@ -1,11 +1,9 @@ from collections import defaultdict -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List from urllib.parse import parse_qs -import jsonschema from asgiref.sync import sync_to_async from channels.db import database_sync_to_async -from channels.generic.websocket import AsyncJsonWebsocketConsumer from ..core.config import config from ..core.models import Projector @@ -18,73 +16,7 @@ from .collection import ( format_for_autoupdate_old, from_channel_message, ) -from .constants import get_constants - - -class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer): - """ - Mixin for JSONWebsocketConsumers, that speaks the a special protocol. - """ - schema = { - "$schema": "http://json-schema.org/draft-04/schema#", - "title": "OpenSlidesWebsocketProtocol", - "description": "The base packages that OpenSlides sends between the server and the client.", - "type": "object", - "properties": { - "type": { - "description": "Defines what kind of packages is packed.", - "type": "string", - "pattern": "notify|constants|getElements|autoupdate", # The server can sent other types - }, - "content": { - "description": "The content of the package.", - }, - "id": { - "description": "An identifier of the package.", - "type": "string", - }, - "in_response": { - "description": "The id of another package that the other part sent before.", - "type": "string", - } - }, - "required": ["type", "content", "id"], - } - - async def send_json(self, type: str, content: Any, id: Optional[str] = None, in_response: Optional[str] = None) -> None: - """ - Sends the data with the type. - """ - out = {'type': type, 'content': content} - if id: - out['id'] = id - if in_response: - out['in_response'] = in_response - await super().send_json(out) - - async def receive_json(self, content: Any) -> None: - """ - Receives the json data, parses it and calls receive_content. - """ - try: - jsonschema.validate(content, self.schema) - except jsonschema.ValidationError as err: - try: - in_response = content['id'] - except (TypeError, KeyError): - # content is not a dict (TypeError) or has not the key id (KeyError) - in_response = None - - await self.send_json( - type='error', - content=str(err), - in_response=in_response) - return - - await self.receive_content(type=content['type'], content=content['content'], id=content['id']) - - async def receive_content(self, type: str, content: object, id: str) -> None: - raise NotImplementedError("ProtocollAsyncJsonWebsocketConsumer needs the method receive_content()") +from .websocket import ProtocollAsyncJsonWebsocketConsumer, get_element_data class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): @@ -130,62 +62,6 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): """ await self.channel_layer.group_discard('autoupdate', self.channel_name) - async def receive_content(self, type: str, content: Any, id: str) -> None: - """ - If we recieve something from the client we currently just interpret this - as a notify message. - - The server adds the sender's user id (0 for anonymous) and reply - channel name so that a receiver client may reply to the sender or to all - sender's instances. - """ - if type == 'notify': - if notify_message_is_valid(content): - await self.channel_layer.group_send( - "projector", - { - "type": "send_notify", - "incomming": content, - "senderReplyChannelName": self.channel_name, - "senderUserId": self.scope['user'].id or 0, - }, - ) - await self.channel_layer.group_send( - "site", - { - "type": "send_notify", - "incomming": content, - "senderReplyChannelName": self.channel_name, - "senderUserId": self.scope['user'].id or 0, - }, - ) - else: - await self.send_json(type='error', content='Invalid notify message', in_response=id) - - elif type == 'constants': - # Return all constants to the client. - await self.send_json(type='constants', content=get_constants(), in_response=id) - - elif type == 'getElements': - # Return all Elements since a change id - if elements_message_is_valid(content): - requested_change_id = content.get('change_id', 0) - try: - element_data = await get_element_data(self.scope['user'], requested_change_id) - except ValueError as error: - await self.send_json(type='error', content=str(error), in_response=id) - else: - await self.send_json(type='autoupdate', content=element_data, in_response=id) - else: - await self.send_json(type='error', content='Invalid getElements message', in_response=id) - - elif type == 'autoupdate': - # Turn on or off the autoupdate for the client - if content: # accept any value, that can be interpreted as bool - await self.channel_layer.group_add('autoupdate', self.channel_name) - else: - await self.channel_layer.group_discard('autoupdate', self.channel_name) - async def send_notify(self, event: Dict[str, Any]) -> None: """ Send a notify message to the user. @@ -314,35 +190,6 @@ class ProjectorConsumer(ProtocollAsyncJsonWebsocketConsumer): await self.send_json(type='autoupdate', content=output) -async def get_element_data(user: Optional[CollectionElement], change_id: int = 0) -> AutoupdateFormat: - """ - Returns all data for startup. - """ - current_change_id = await element_cache.get_current_change_id() - if change_id > current_change_id: - raise ValueError("Requested change_id is higher this highest change_id.") - try: - changed_elements, deleted_element_ids = await element_cache.get_restricted_data(user, change_id, current_change_id) - except RuntimeError: - # The change_id is lower the the lowerst change_id in redis. Return all data - changed_elements = await element_cache.get_all_restricted_data(user) - all_data = True - deleted_elements: Dict[str, List[int]] = {} - else: - all_data = False - deleted_elements = defaultdict(list) - for element_id in deleted_element_ids: - collection_string, id = split_element_id(element_id) - deleted_elements[collection_string].append(id) - - return AutoupdateFormat( - changed=changed_elements, - deleted=deleted_elements, - from_change_id=change_id, - to_change_id=current_change_id, - all_data=all_data) - - def projector_startup_data(projector_id: int) -> Any: """ Generate the startup data for a projector. @@ -404,63 +251,3 @@ def projector_sync_send_data(projector_id: int, collection_elements: List[Collec for element in projector.get_collection_elements_required_for_this(collection_element): output.append(element.as_autoupdate_for_projector()) return output - - -def notify_message_is_valid(message: object) -> bool: - """ - Returns True, when the message is a valid notify_message. - """ - schema = { - "$schema": "http://json-schema.org/draft-04/schema#", - "title": "Notify elements.", - "description": "Elements that one client can send to one or many other clients.", - "type": "array", - "items": { - "type": "object", - "properties": { - "projectors": { - "type": "array", - "items": {"type": "integer"}, - }, - "reply_channels": { - "type": "array", - "items": {"type": "string"}, - }, - "users": { - "type": "array", - "items": {"type": "integer"}, - } - } - }, - "minItems": 1, - } - try: - jsonschema.validate(message, schema) - except jsonschema.ValidationError: - return False - else: - return True - - -def elements_message_is_valid(message: object) -> bool: - """ - Return True, if the message is a valid getElement message. - """ - schema = { - "$schema": "http://json-schema.org/draft-04/schema#", - "titel": "getElement request", - "description": "Request from the client to server to get elements.", - "type": "object", - "properties": { - # propertie is not required - "change_id": { - "type": "integer", - } - }, - } - try: - jsonschema.validate(message, schema) - except jsonschema.ValidationError: - return False - else: - return True diff --git a/openslides/utils/websocket.py b/openslides/utils/websocket.py new file mode 100644 index 000000000..3c8248f22 --- /dev/null +++ b/openslides/utils/websocket.py @@ -0,0 +1,155 @@ +from collections import defaultdict +from typing import Any, Dict, List, Optional + +import jsonschema +from channels.generic.websocket import AsyncJsonWebsocketConsumer + +from .cache import element_cache +from .collection import AutoupdateFormat, CollectionElement +from .utils import split_element_id + + +class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer): + """ + Mixin for JSONWebsocketConsumers, that speaks the a special protocol. + """ + + async def send_json(self, type: str, content: Any, id: Optional[str] = None, in_response: Optional[str] = None) -> None: + """ + Sends the data with the type. + """ + out = {'type': type, 'content': content} + if id: + out['id'] = id + if in_response: + out['in_response'] = in_response + await super().send_json(out) + + async def receive_json(self, content: Any) -> None: + """ + Receives the json data, parses it and calls receive_content. + """ + try: + jsonschema.validate(content, schema) + except jsonschema.ValidationError as err: + try: + in_response = content['id'] + except (TypeError, KeyError): + # content is not a dict (TypeError) or has not the key id (KeyError) + in_response = None + + await self.send_json( + type='error', + content=str(err), + in_response=in_response) + return + + await websocket_client_messages[content['type']].receive_content(self, content['content'], id=content['id']) + + +schema: Dict[str, Any] = { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "OpenSlidesWebsocketProtocol", + "description": "The packages that OpenSlides sends between the server and the client.", + "type": "object", + "properties": { + "type": { + "description": "Defines what kind of packages is packed.", + "type": "string", + }, + "content": { + "description": "The content of the package.", + }, + "id": { + "description": "An identifier of the package.", + "type": "string", + }, + "in_response": { + "description": "The id of another package that the other part sent before.", + "type": "string", + }, + }, + "required": ["type", "content", "id"], + "anyOf": [], # This will be filled in register_client_message() +} + + +class BaseWebsocketClientMessage: + schema: Dict[str, object] = {} + """ + Optional schema. + + If schema is not set, any value in content is accepted. + """ + + identifier: str = "" + """ + A unique identifier for the websocket message. + + This is used as value in the 'type' property in the websocket message. + """ + + content_required = True + """ + Desiedes, if the content property is required. + """ + + async def receive_content(self, consumer: "ProtocollAsyncJsonWebsocketConsumer", message: Any, id: str) -> None: + raise NotImplementedError("WebsocketClientMessage needs the method receive_content().") + + +websocket_client_messages: Dict[str, BaseWebsocketClientMessage] = {} +""" +Saves all websocket client message object ordered by there identifier. +""" + + +def register_client_message(websocket_client_message: BaseWebsocketClientMessage) -> None: + """ + Registers one websocket client message class. + """ + if not websocket_client_message.identifier or websocket_client_message.identifier in websocket_client_messages: + raise NotImplementedError("WebsocketClientMessage needs a unique identifier.") + + websocket_client_messages[websocket_client_message.identifier] = websocket_client_message + + # Add the message schema to the schema + message_schema: Dict[str, Any] = { + 'properties': { + 'type': {'const': websocket_client_message.identifier}, + 'content': websocket_client_message.schema, + } + } + if websocket_client_message.content_required: + message_schema['required'] = ['content'] + + schema['anyOf'].append(message_schema) + + +async def get_element_data(user: Optional[CollectionElement], change_id: int = 0) -> AutoupdateFormat: + """ + Returns all element data since a change_id. + """ + current_change_id = await element_cache.get_current_change_id() + if change_id > current_change_id: + raise ValueError("Requested change_id is higher this highest change_id.") + try: + changed_elements, deleted_element_ids = await element_cache.get_restricted_data(user, change_id, current_change_id) + except RuntimeError: + # The change_id is lower the the lowerst change_id in redis. Return all data + changed_elements = await element_cache.get_all_restricted_data(user) + all_data = True + deleted_elements: Dict[str, List[int]] = {} + else: + all_data = False + deleted_elements = defaultdict(list) + for element_id in deleted_element_ids: + collection_string, id = split_element_id(element_id) + deleted_elements[collection_string].append(id) + + return AutoupdateFormat( + changed=changed_elements, + deleted=deleted_elements, + from_change_id=change_id, + to_change_id=current_change_id, + all_data=all_data) diff --git a/requirements/production.txt b/requirements/production.txt index 956983e3a..a0c6fa58e 100644 --- a/requirements/production.txt +++ b/requirements/production.txt @@ -5,7 +5,7 @@ daphne>=2.2,<2.3 Django>=1.11,<2.2 djangorestframework>=3.4,<3.10 jsonfield2>=3.0,<3.1 -jsonschema>=2.6,<2.7 +jsonschema==3.0.0a3 # TODO: use a stabel version before release mypy_extensions>=0.4,<0.5 PyPDF2>=1.26,<1.27 roman>=2.0,<3.1 diff --git a/tests/integration/utils/test_consumers.py b/tests/integration/utils/test_consumers.py index e319d4ca6..3556afc69 100644 --- a/tests/integration/utils/test_consumers.py +++ b/tests/integration/utils/test_consumers.py @@ -173,46 +173,7 @@ async def test_receive_deleted_data(get_communicator): @pytest.mark.asyncio -async def test_send_invalid_notify_not_a_list(communicator): - await set_config('general_system_enable_anonymous', True) - await communicator.connect() - - await communicator.send_json_to({'type': 'notify', 'content': {'testmessage': 'foobar, what else.'}, 'id': 'test_send_invalid_notify_not_a_list'}) - response = await communicator.receive_json_from() - - assert response['type'] == 'error' - assert response['content'] == 'Invalid notify message' - assert response['in_response'] == 'test_send_invalid_notify_not_a_list' - - -@pytest.mark.asyncio -async def test_send_invalid_notify_no_elements(communicator): - await set_config('general_system_enable_anonymous', True) - await communicator.connect() - - await communicator.send_json_to({'type': 'notify', 'content': [], 'id': 'test_send_invalid_notify_no_elements'}) - response = await communicator.receive_json_from() - - assert response['type'] == 'error' - assert response['content'] == 'Invalid notify message' - assert response['in_response'] == 'test_send_invalid_notify_no_elements' - - -@pytest.mark.asyncio -async def test_send_invalid_notify_str_in_list(communicator): - await set_config('general_system_enable_anonymous', True) - await communicator.connect() - - await communicator.send_json_to({'type': 'notify', 'content': [{}, 'testmessage'], 'id': 'test_send_invalid_notify_str_in_list'}) - response = await communicator.receive_json_from() - - assert response['type'] == 'error' - assert response['content'] == 'Invalid notify message' - assert response['in_response'] == 'test_send_invalid_notify_str_in_list' - - -@pytest.mark.asyncio -async def test_send_valid_notify(communicator): +async def test_send_notify(communicator): await set_config('general_system_enable_anonymous', True) await communicator.connect() @@ -249,17 +210,6 @@ async def test_invalid_websocket_message_no_id(communicator): assert response['type'] == 'error' -@pytest.mark.asyncio -async def test_invalid_websocket_message_no_content(communicator): - await set_config('general_system_enable_anonymous', True) - await communicator.connect() - - await communicator.send_json_to({'type': 'test', 'id': 'test_id'}) - - response = await communicator.receive_json_from() - assert response['type'] == 'error' - - @pytest.mark.asyncio async def test_send_unknown_type(communicator): await set_config('general_system_enable_anonymous', True) diff --git a/tests/unit/core/test_websocket.py b/tests/unit/core/test_websocket.py new file mode 100644 index 000000000..9cf3992b8 --- /dev/null +++ b/tests/unit/core/test_websocket.py @@ -0,0 +1,49 @@ +import jsonschema +import pytest + +from openslides.utils.websocket import schema + + +def test_notify_schema_validation(): + # This raises a validaten error if it fails + message = { + 'id': 'test-message', + 'type': 'notify', + 'content': [{ + 'users': [5], + }] + } + jsonschema.validate(message, schema) + + +def test_notify_schema_invalid_str_in_list(): + message = { + 'type': 'notify', + 'content': [ + {}, + 'testmessage' + ], + 'id': 'test_send_invalid_notify_str_in_list', + } + with pytest.raises(jsonschema.ValidationError): + jsonschema.validate(message, schema) + + +def test_notify_schema_invalid_no_elements(): + message = { + 'type': 'notify', + 'content': [], + 'id': 'test_send_invalid_notify_str_in_list', + } + with pytest.raises(jsonschema.ValidationError): + jsonschema.validate(message, schema) + + +def test_notify_schema_invalid_not_a_list(): + message = { + 'type': 'notify', + 'content': {'testmessage': 'foobar, what else.'}, + 'id': 'test_send_invalid_notify_str_in_list', + } + with pytest.raises(jsonschema.ValidationError): + jsonschema.validate(message, schema)