diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 84e9ce1d7..0932311f7 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -17,6 +17,7 @@ Core: - Enabled docs for using OpenSlides with Gunicorn and Uvicorn in big mode [#3799, #3817]. - Changed format for elements send via autoupdate [#3926]. + - Add a change-id system to get only new elements [#3938]. Motions: - Option to customly sort motions [#3894]. diff --git a/openslides/utils/autoupdate.py b/openslides/utils/autoupdate.py index ee5c59ae8..044f37a89 100644 --- a/openslides/utils/autoupdate.py +++ b/openslides/utils/autoupdate.py @@ -157,7 +157,7 @@ async def send_autoupdate(collection_elements: Iterable[CollectionElement]) -> N }, ) await channel_layer.group_send( - "site", + "autoupdate", { "type": "send_data", "change_id": change_id, diff --git a/openslides/utils/collection.py b/openslides/utils/collection.py index 62317c1f3..8d329cc9d 100644 --- a/openslides/utils/collection.py +++ b/openslides/utils/collection.py @@ -28,7 +28,9 @@ AutoupdateFormat = TypedDict( { 'changed': Dict[str, List[Dict[str, Any]]], 'deleted': Dict[str, List[int]], - 'change_id': int, + 'from_change_id': int, + 'to_change_id': int, + 'all_data': bool, }, ) diff --git a/openslides/utils/consumers.py b/openslides/utils/consumers.py index 56bf91ec3..5fb387f70 100644 --- a/openslides/utils/consumers.py +++ b/openslides/utils/consumers.py @@ -1,5 +1,6 @@ from collections import defaultdict from typing import Any, Dict, List, Optional +from urllib.parse import parse_qs import jsonschema from asgiref.sync import sync_to_async @@ -33,7 +34,7 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer): "type": { "description": "Defines what kind of packages is packed.", "type": "string", - "pattern": "notify|constants", # The server can sent other types + "pattern": "notify|constants|getElements|autoupdate", # The server can sent other types }, "content": { "description": "The content of the package.", @@ -90,6 +91,7 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): """ Websocket Consumer for the site. """ + groups = ['site'] async def connect(self) -> None: @@ -100,14 +102,34 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): Sends the startup data to the user. """ - # TODO: add a way to ask for the data since a change_id and send only data that is newer + change_id = None if not await async_anonymous_is_enabled() and self.scope['user'].id is None: await self.close() - else: - await self.accept() - data = await startup_data(self.scope['user']) + return + + query_string = parse_qs(self.scope['query_string']) + if b'change_id' in query_string: + try: + change_id = int(query_string[b'change_id'][0]) + except ValueError: + await self.close() # TODO: Find a way to send an error code + return + + if b'autoupdate' in query_string and query_string[b'autoupdate'][0].lower() not in [b'0', b'off', b'false']: + # a positive value in autoupdate. Start autoupdate + await self.channel_layer.group_add('autoupdate', self.channel_name) + + await self.accept() + if change_id is not None: + data = await get_element_data(self.scope['user'], change_id) await self.send_json(type='autoupdate', content=data) + async def disconnect(self, close_code: int) -> None: + """ + A user disconnects. Remove it from autoupdate. + """ + await self.channel_layer.group_discard('autoupdate', self.channel_name) + async def receive_content(self, type: str, content: Any, id: str) -> None: """ If we recieve something from the client we currently just interpret this @@ -144,6 +166,26 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): # Return all constants to the client. await self.send_json(type='constants', content=get_constants(), in_response=id) + elif type == 'getElements': + # Return all Elements since a change id + if elements_message_is_valid(content): + requested_change_id = content.get('change_id', 0) + try: + element_data = await get_element_data(self.scope['user'], requested_change_id) + except ValueError as error: + await self.send_json(type='error', content=str(error), in_response=id) + else: + await self.send_json(type='autoupdate', content=element_data, in_response=id) + else: + await self.send_json(type='error', content='Invalid getElements message', in_response=id) + + elif type == 'autoupdate': + # Turn on or off the autoupdate for the client + if content: # accept any value, that can be interpreted as bool + await self.channel_layer.group_add('autoupdate', self.channel_name) + else: + await self.channel_layer.group_discard('autoupdate', self.channel_name) + async def send_notify(self, event: Dict[str, Any]) -> None: """ Send a notify message to the user. @@ -177,7 +219,12 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): for element_id in deleted_elements_ids: collection_string, id = split_element_id(element_id) deleted_elements[collection_string].append(id) - await self.send_json(type='autoupdate', content=AutoupdateFormat(changed=changed_elements, deleted=deleted_elements, change_id=change_id)) + await self.send_json(type='autoupdate', content=AutoupdateFormat( + changed=changed_elements, + deleted=deleted_elements, + from_change_id=change_id, + to_change_id=change_id, + all_data=False)) class ProjectorConsumer(ProtocollAsyncJsonWebsocketConsumer): @@ -267,21 +314,33 @@ class ProjectorConsumer(ProtocollAsyncJsonWebsocketConsumer): await self.send_json(type='autoupdate', content=output) -async def startup_data(user: Optional[CollectionElement], change_id: int = 0) -> AutoupdateFormat: +async def get_element_data(user: Optional[CollectionElement], change_id: int = 0) -> AutoupdateFormat: """ Returns all data for startup. """ - # TODO: use the change_id argument - # TODO: This two calls have to be atomic - changed_elements, deleted_element_ids = await element_cache.get_restricted_data(user) current_change_id = await element_cache.get_current_change_id() + if change_id > current_change_id: + raise ValueError("Requested change_id is higher this highest change_id.") + try: + changed_elements, deleted_element_ids = await element_cache.get_restricted_data(user, change_id, current_change_id) + except RuntimeError: + # The change_id is lower the the lowerst change_id in redis. Return all data + changed_elements = await element_cache.get_all_restricted_data(user) + all_data = True + deleted_elements: Dict[str, List[int]] = {} + else: + all_data = False + deleted_elements = defaultdict(list) + for element_id in deleted_element_ids: + collection_string, id = split_element_id(element_id) + deleted_elements[collection_string].append(id) - deleted_elements: Dict[str, List[int]] = defaultdict(list) - for element_id in deleted_element_ids: - collection_string, id = split_element_id(element_id) - deleted_elements[collection_string].append(id) - - return AutoupdateFormat(changed=changed_elements, deleted=deleted_elements, change_id=current_change_id) + return AutoupdateFormat( + changed=changed_elements, + deleted=deleted_elements, + from_change_id=change_id, + to_change_id=current_change_id, + all_data=all_data) def projector_startup_data(projector_id: int) -> Any: @@ -381,3 +440,27 @@ def notify_message_is_valid(message: object) -> bool: return False else: return True + + +def elements_message_is_valid(message: object) -> bool: + """ + Return True, if the message is a valid getElement message. + """ + schema = { + "$schema": "http://json-schema.org/draft-04/schema#", + "titel": "getElement request", + "description": "Request from the client to server to get elements.", + "type": "object", + "properties": { + # propertie is not required + "change_id": { + "type": "integer", + } + }, + } + try: + jsonschema.validate(message, schema) + except jsonschema.ValidationError: + return False + else: + return True diff --git a/tests/integration/utils/test_consumers.py b/tests/integration/utils/test_consumers.py index 4d2c79559..6ed3e3884 100644 --- a/tests/integration/utils/test_consumers.py +++ b/tests/integration/utils/test_consumers.py @@ -1,3 +1,4 @@ +import asyncio from importlib import import_module import pytest @@ -43,15 +44,37 @@ async def prepare_element_cache(settings): @pytest.fixture -async def communicator(request, event_loop): - communicator = WebsocketCommunicator(application, "/ws/site/") - yield communicator - await communicator.disconnect() +async def get_communicator(): + communicator: WebsocketCommunicator = None + + def get_communicator(query_string=''): + nonlocal communicator # use the outer communicator variable + if query_string: + query_string = "?{}".format(query_string) + communicator = WebsocketCommunicator(application, "/ws/site/{}".format(query_string)) + return communicator + + yield get_communicator + if communicator: + await communicator.disconnect() + + +@pytest.fixture +async def communicator(get_communicator): + yield get_communicator() @pytest.mark.asyncio -async def test_normal_connection(communicator): +async def test_normal_connection(get_communicator): await set_config('general_system_enable_anonymous', True) + connected, __ = await get_communicator().connect() + assert connected + + +@pytest.mark.asyncio +async def test_connection_with_change_id(get_communicator): + await set_config('general_system_enable_anonymous', True) + communicator = get_communicator('change_id=0') await communicator.connect() response = await communicator.receive_json_from() @@ -61,7 +84,8 @@ async def test_normal_connection(communicator): assert type == 'autoupdate' assert 'changed' in content assert 'deleted' in content - assert 'change_id' in content + assert 'from_change_id' in content + assert 'to_change_id' in content assert Collection1().get_collection_string() in content['changed'] assert Collection2().get_collection_string() in content['changed'] assert TConfig().get_collection_string() in content['changed'] @@ -69,12 +93,31 @@ async def test_normal_connection(communicator): @pytest.mark.asyncio -async def test_receive_changed_data(communicator): +async def test_connection_with_invalid_change_id(get_communicator): + await set_config('general_system_enable_anonymous', True) + communicator = get_communicator('change_id=invalid') + connected, __ = await communicator.connect() + + assert connected is False + + +@pytest.mark.asyncio +async def test_changed_data_autoupdate_off(communicator): await set_config('general_system_enable_anonymous', True) await communicator.connect() - await communicator.receive_json_from() - # Change a config value after the startup data has been received + # Change a config value + await set_config('general_event_name', 'Test Event') + assert await communicator.receive_nothing() + + +@pytest.mark.asyncio +async def test_changed_data_autoupdate_on(get_communicator): + await set_config('general_system_enable_anonymous', True) + communicator = get_communicator('autoupdate=on') + await communicator.connect() + + # Change a config value await set_config('general_event_name', 'Test Event') response = await communicator.receive_json_from() @@ -114,10 +157,10 @@ async def test_with_user(): @pytest.mark.asyncio -async def test_receive_deleted_data(communicator): +async def test_receive_deleted_data(get_communicator): await set_config('general_system_enable_anonymous', True) + communicator = get_communicator('autoupdate=on') await communicator.connect() - await communicator.receive_json_from() # Delete test element await sync_to_async(inform_deleted_data)([(Collection1().get_collection_string(), 1)]) @@ -133,8 +176,6 @@ async def test_receive_deleted_data(communicator): async def test_send_invalid_notify_not_a_list(communicator): await set_config('general_system_enable_anonymous', True) await communicator.connect() - # Await the startup data - await communicator.receive_json_from() await communicator.send_json_to({'type': 'notify', 'content': {'testmessage': 'foobar, what else.'}, 'id': 'test_send_invalid_notify_not_a_list'}) response = await communicator.receive_json_from() @@ -148,8 +189,6 @@ async def test_send_invalid_notify_not_a_list(communicator): async def test_send_invalid_notify_no_elements(communicator): await set_config('general_system_enable_anonymous', True) await communicator.connect() - # Await the startup data - await communicator.receive_json_from() await communicator.send_json_to({'type': 'notify', 'content': [], 'id': 'test_send_invalid_notify_no_elements'}) response = await communicator.receive_json_from() @@ -163,8 +202,6 @@ async def test_send_invalid_notify_no_elements(communicator): async def test_send_invalid_notify_str_in_list(communicator): await set_config('general_system_enable_anonymous', True) await communicator.connect() - # Await the startup data - await communicator.receive_json_from() await communicator.send_json_to({'type': 'notify', 'content': [{}, 'testmessage'], 'id': 'test_send_invalid_notify_str_in_list'}) response = await communicator.receive_json_from() @@ -178,8 +215,6 @@ async def test_send_invalid_notify_str_in_list(communicator): async def test_send_valid_notify(communicator): await set_config('general_system_enable_anonymous', True) await communicator.connect() - # Await the startup data - await communicator.receive_json_from() await communicator.send_json_to({'type': 'notify', 'content': [{'testmessage': 'foobar, what else.'}], 'id': 'test'}) response = await communicator.receive_json_from() @@ -196,8 +231,6 @@ async def test_send_valid_notify(communicator): async def test_invalid_websocket_message_type(communicator): await set_config('general_system_enable_anonymous', True) await communicator.connect() - # Await the startup data - await communicator.receive_json_from() await communicator.send_json_to([]) @@ -209,8 +242,6 @@ async def test_invalid_websocket_message_type(communicator): async def test_invalid_websocket_message_no_id(communicator): await set_config('general_system_enable_anonymous', True) await communicator.connect() - # Await the startup data - await communicator.receive_json_from() await communicator.send_json_to({'type': 'test', 'content': 'foobar'}) @@ -222,8 +253,6 @@ async def test_invalid_websocket_message_no_id(communicator): async def test_invalid_websocket_message_no_content(communicator): await set_config('general_system_enable_anonymous', True) await communicator.connect() - # Await the startup data - await communicator.receive_json_from() await communicator.send_json_to({'type': 'test', 'id': 'test_id'}) @@ -235,8 +264,6 @@ async def test_invalid_websocket_message_no_content(communicator): async def test_send_unknown_type(communicator): await set_config('general_system_enable_anonymous', True) await communicator.connect() - # Await the startup data - await communicator.receive_json_from() await communicator.send_json_to({'type': 'if_you_add_this_type_to_openslides_I_will_be_sad', 'content': True, 'id': 'test_id'}) @@ -249,8 +276,6 @@ async def test_send_unknown_type(communicator): async def test_request_constants(communicator, settings): await set_config('general_system_enable_anonymous', True) await communicator.connect() - # Await the startup data - await communicator.receive_json_from() await communicator.send_json_to({'type': 'constants', 'content': '', 'id': 'test_id'}) @@ -258,3 +283,96 @@ async def test_request_constants(communicator, settings): assert response['type'] == 'constants' # See conftest.py for the content of 'content' assert response['content'] == {'constant1': 'value1', 'constant2': 'value2'} + + +@pytest.mark.asyncio +async def test_send_get_elements(communicator): + await set_config('general_system_enable_anonymous', True) + await communicator.connect() + + await communicator.send_json_to({'type': 'getElements', 'content': {}, 'id': 'test_id'}) + response = await communicator.receive_json_from() + + type = response.get('type') + content = response.get('content') + assert type == 'autoupdate' + assert 'changed' in content + assert 'deleted' in content + assert 'from_change_id' in content + assert 'to_change_id' in content + assert Collection1().get_collection_string() in content['changed'] + assert Collection2().get_collection_string() in content['changed'] + assert TConfig().get_collection_string() in content['changed'] + assert TUser().get_collection_string() in content['changed'] + + +@pytest.mark.asyncio +async def test_send_get_elements_to_big_change_id(communicator): + await set_config('general_system_enable_anonymous', True) + await communicator.connect() + + await communicator.send_json_to({'type': 'getElements', 'content': {'change_id': 1_000_000_000_000}, 'id': 'test_id'}) + response = await communicator.receive_json_from() + + type = response.get('type') + assert type == 'error' + assert response.get('in_response') == 'test_id' + + +@pytest.mark.asyncio +async def test_send_get_elements_to_small_change_id(communicator): + await set_config('general_system_enable_anonymous', True) + await communicator.connect() + + await communicator.send_json_to({'type': 'getElements', 'content': {'change_id': 1}, 'id': 'test_id'}) + response = await communicator.receive_json_from() + + type = response.get('type') + assert type == 'autoupdate' + assert response.get('in_response') == 'test_id' + assert response.get('content')['all_data'] + + +@pytest.mark.asyncio +async def test_send_invalid_get_elements(communicator): + await set_config('general_system_enable_anonymous', True) + await communicator.connect() + + await communicator.send_json_to({'type': 'getElements', 'content': {'change_id': 'some value'}, 'id': 'test_id'}) + response = await communicator.receive_json_from() + + type = response.get('type') + assert type == 'error' + assert response.get('in_response') == 'test_id' + + +@pytest.mark.asyncio +async def test_turn_on_autoupdate(communicator): + await set_config('general_system_enable_anonymous', True) + await communicator.connect() + + await communicator.send_json_to({'type': 'autoupdate', 'content': 'on', 'id': 'test_id'}) + await asyncio.sleep(0.01) + # Change a config value + await set_config('general_event_name', 'Test Event') + response = await communicator.receive_json_from() + + id = config.get_key_to_id()['general_event_name'] + type = response.get('type') + content = response.get('content') + assert type == 'autoupdate' + assert content['changed'] == { + 'core/config': [{'id': id, 'key': 'general_event_name', 'value': 'Test Event'}]} + + +@pytest.mark.asyncio +async def test_turn_off_autoupdate(get_communicator): + await set_config('general_system_enable_anonymous', True) + communicator = get_communicator('autoupdate=on') + await communicator.connect() + + await communicator.send_json_to({'type': 'autoupdate', 'content': False, 'id': 'test_id'}) + await asyncio.sleep(0.01) + # Change a config value + await set_config('general_event_name', 'Test Event') + assert await communicator.receive_nothing()