Change_id API

* Improve AutoUpdateFormat
* Receive autoupdate requested via websocket
* Support change_id in query string
* Disable autoupdate at default
* Add websocket protocoll to turn on or off autoupdate
This commit is contained in:
Oskar Hahn 2018-10-19 16:32:48 +02:00
parent 1c99c0c40a
commit e7d14a8081
5 changed files with 251 additions and 47 deletions

View File

@ -17,6 +17,7 @@ Core:
- Enabled docs for using OpenSlides with Gunicorn and Uvicorn in big
mode [#3799, #3817].
- Changed format for elements send via autoupdate [#3926].
- Add a change-id system to get only new elements [#3938].
Motions:
- Option to customly sort motions [#3894].

View File

@ -157,7 +157,7 @@ async def send_autoupdate(collection_elements: Iterable[CollectionElement]) -> N
},
)
await channel_layer.group_send(
"site",
"autoupdate",
{
"type": "send_data",
"change_id": change_id,

View File

@ -28,7 +28,9 @@ AutoupdateFormat = TypedDict(
{
'changed': Dict[str, List[Dict[str, Any]]],
'deleted': Dict[str, List[int]],
'change_id': int,
'from_change_id': int,
'to_change_id': int,
'all_data': bool,
},
)

View File

@ -1,5 +1,6 @@
from collections import defaultdict
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs
import jsonschema
from asgiref.sync import sync_to_async
@ -33,7 +34,7 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
"type": {
"description": "Defines what kind of packages is packed.",
"type": "string",
"pattern": "notify|constants", # The server can sent other types
"pattern": "notify|constants|getElements|autoupdate", # The server can sent other types
},
"content": {
"description": "The content of the package.",
@ -90,6 +91,7 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
"""
Websocket Consumer for the site.
"""
groups = ['site']
async def connect(self) -> None:
@ -100,14 +102,34 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
Sends the startup data to the user.
"""
# TODO: add a way to ask for the data since a change_id and send only data that is newer
change_id = None
if not await async_anonymous_is_enabled() and self.scope['user'].id is None:
await self.close()
else:
await self.accept()
data = await startup_data(self.scope['user'])
return
query_string = parse_qs(self.scope['query_string'])
if b'change_id' in query_string:
try:
change_id = int(query_string[b'change_id'][0])
except ValueError:
await self.close() # TODO: Find a way to send an error code
return
if b'autoupdate' in query_string and query_string[b'autoupdate'][0].lower() not in [b'0', b'off', b'false']:
# a positive value in autoupdate. Start autoupdate
await self.channel_layer.group_add('autoupdate', self.channel_name)
await self.accept()
if change_id is not None:
data = await get_element_data(self.scope['user'], change_id)
await self.send_json(type='autoupdate', content=data)
async def disconnect(self, close_code: int) -> None:
"""
A user disconnects. Remove it from autoupdate.
"""
await self.channel_layer.group_discard('autoupdate', self.channel_name)
async def receive_content(self, type: str, content: Any, id: str) -> None:
"""
If we recieve something from the client we currently just interpret this
@ -144,6 +166,26 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
# Return all constants to the client.
await self.send_json(type='constants', content=get_constants(), in_response=id)
elif type == 'getElements':
# Return all Elements since a change id
if elements_message_is_valid(content):
requested_change_id = content.get('change_id', 0)
try:
element_data = await get_element_data(self.scope['user'], requested_change_id)
except ValueError as error:
await self.send_json(type='error', content=str(error), in_response=id)
else:
await self.send_json(type='autoupdate', content=element_data, in_response=id)
else:
await self.send_json(type='error', content='Invalid getElements message', in_response=id)
elif type == 'autoupdate':
# Turn on or off the autoupdate for the client
if content: # accept any value, that can be interpreted as bool
await self.channel_layer.group_add('autoupdate', self.channel_name)
else:
await self.channel_layer.group_discard('autoupdate', self.channel_name)
async def send_notify(self, event: Dict[str, Any]) -> None:
"""
Send a notify message to the user.
@ -177,7 +219,12 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
for element_id in deleted_elements_ids:
collection_string, id = split_element_id(element_id)
deleted_elements[collection_string].append(id)
await self.send_json(type='autoupdate', content=AutoupdateFormat(changed=changed_elements, deleted=deleted_elements, change_id=change_id))
await self.send_json(type='autoupdate', content=AutoupdateFormat(
changed=changed_elements,
deleted=deleted_elements,
from_change_id=change_id,
to_change_id=change_id,
all_data=False))
class ProjectorConsumer(ProtocollAsyncJsonWebsocketConsumer):
@ -267,21 +314,33 @@ class ProjectorConsumer(ProtocollAsyncJsonWebsocketConsumer):
await self.send_json(type='autoupdate', content=output)
async def startup_data(user: Optional[CollectionElement], change_id: int = 0) -> AutoupdateFormat:
async def get_element_data(user: Optional[CollectionElement], change_id: int = 0) -> AutoupdateFormat:
"""
Returns all data for startup.
"""
# TODO: use the change_id argument
# TODO: This two calls have to be atomic
changed_elements, deleted_element_ids = await element_cache.get_restricted_data(user)
current_change_id = await element_cache.get_current_change_id()
if change_id > current_change_id:
raise ValueError("Requested change_id is higher this highest change_id.")
try:
changed_elements, deleted_element_ids = await element_cache.get_restricted_data(user, change_id, current_change_id)
except RuntimeError:
# The change_id is lower the the lowerst change_id in redis. Return all data
changed_elements = await element_cache.get_all_restricted_data(user)
all_data = True
deleted_elements: Dict[str, List[int]] = {}
else:
all_data = False
deleted_elements = defaultdict(list)
for element_id in deleted_element_ids:
collection_string, id = split_element_id(element_id)
deleted_elements[collection_string].append(id)
deleted_elements: Dict[str, List[int]] = defaultdict(list)
for element_id in deleted_element_ids:
collection_string, id = split_element_id(element_id)
deleted_elements[collection_string].append(id)
return AutoupdateFormat(changed=changed_elements, deleted=deleted_elements, change_id=current_change_id)
return AutoupdateFormat(
changed=changed_elements,
deleted=deleted_elements,
from_change_id=change_id,
to_change_id=current_change_id,
all_data=all_data)
def projector_startup_data(projector_id: int) -> Any:
@ -381,3 +440,27 @@ def notify_message_is_valid(message: object) -> bool:
return False
else:
return True
def elements_message_is_valid(message: object) -> bool:
"""
Return True, if the message is a valid getElement message.
"""
schema = {
"$schema": "http://json-schema.org/draft-04/schema#",
"titel": "getElement request",
"description": "Request from the client to server to get elements.",
"type": "object",
"properties": {
# propertie is not required
"change_id": {
"type": "integer",
}
},
}
try:
jsonschema.validate(message, schema)
except jsonschema.ValidationError:
return False
else:
return True

View File

@ -1,3 +1,4 @@
import asyncio
from importlib import import_module
import pytest
@ -43,15 +44,37 @@ async def prepare_element_cache(settings):
@pytest.fixture
async def communicator(request, event_loop):
communicator = WebsocketCommunicator(application, "/ws/site/")
yield communicator
await communicator.disconnect()
async def get_communicator():
communicator: WebsocketCommunicator = None
def get_communicator(query_string=''):
nonlocal communicator # use the outer communicator variable
if query_string:
query_string = "?{}".format(query_string)
communicator = WebsocketCommunicator(application, "/ws/site/{}".format(query_string))
return communicator
yield get_communicator
if communicator:
await communicator.disconnect()
@pytest.fixture
async def communicator(get_communicator):
yield get_communicator()
@pytest.mark.asyncio
async def test_normal_connection(communicator):
async def test_normal_connection(get_communicator):
await set_config('general_system_enable_anonymous', True)
connected, __ = await get_communicator().connect()
assert connected
@pytest.mark.asyncio
async def test_connection_with_change_id(get_communicator):
await set_config('general_system_enable_anonymous', True)
communicator = get_communicator('change_id=0')
await communicator.connect()
response = await communicator.receive_json_from()
@ -61,7 +84,8 @@ async def test_normal_connection(communicator):
assert type == 'autoupdate'
assert 'changed' in content
assert 'deleted' in content
assert 'change_id' in content
assert 'from_change_id' in content
assert 'to_change_id' in content
assert Collection1().get_collection_string() in content['changed']
assert Collection2().get_collection_string() in content['changed']
assert TConfig().get_collection_string() in content['changed']
@ -69,12 +93,31 @@ async def test_normal_connection(communicator):
@pytest.mark.asyncio
async def test_receive_changed_data(communicator):
async def test_connection_with_invalid_change_id(get_communicator):
await set_config('general_system_enable_anonymous', True)
communicator = get_communicator('change_id=invalid')
connected, __ = await communicator.connect()
assert connected is False
@pytest.mark.asyncio
async def test_changed_data_autoupdate_off(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
await communicator.receive_json_from()
# Change a config value after the startup data has been received
# Change a config value
await set_config('general_event_name', 'Test Event')
assert await communicator.receive_nothing()
@pytest.mark.asyncio
async def test_changed_data_autoupdate_on(get_communicator):
await set_config('general_system_enable_anonymous', True)
communicator = get_communicator('autoupdate=on')
await communicator.connect()
# Change a config value
await set_config('general_event_name', 'Test Event')
response = await communicator.receive_json_from()
@ -114,10 +157,10 @@ async def test_with_user():
@pytest.mark.asyncio
async def test_receive_deleted_data(communicator):
async def test_receive_deleted_data(get_communicator):
await set_config('general_system_enable_anonymous', True)
communicator = get_communicator('autoupdate=on')
await communicator.connect()
await communicator.receive_json_from()
# Delete test element
await sync_to_async(inform_deleted_data)([(Collection1().get_collection_string(), 1)])
@ -133,8 +176,6 @@ async def test_receive_deleted_data(communicator):
async def test_send_invalid_notify_not_a_list(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
# Await the startup data
await communicator.receive_json_from()
await communicator.send_json_to({'type': 'notify', 'content': {'testmessage': 'foobar, what else.'}, 'id': 'test_send_invalid_notify_not_a_list'})
response = await communicator.receive_json_from()
@ -148,8 +189,6 @@ async def test_send_invalid_notify_not_a_list(communicator):
async def test_send_invalid_notify_no_elements(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
# Await the startup data
await communicator.receive_json_from()
await communicator.send_json_to({'type': 'notify', 'content': [], 'id': 'test_send_invalid_notify_no_elements'})
response = await communicator.receive_json_from()
@ -163,8 +202,6 @@ async def test_send_invalid_notify_no_elements(communicator):
async def test_send_invalid_notify_str_in_list(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
# Await the startup data
await communicator.receive_json_from()
await communicator.send_json_to({'type': 'notify', 'content': [{}, 'testmessage'], 'id': 'test_send_invalid_notify_str_in_list'})
response = await communicator.receive_json_from()
@ -178,8 +215,6 @@ async def test_send_invalid_notify_str_in_list(communicator):
async def test_send_valid_notify(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
# Await the startup data
await communicator.receive_json_from()
await communicator.send_json_to({'type': 'notify', 'content': [{'testmessage': 'foobar, what else.'}], 'id': 'test'})
response = await communicator.receive_json_from()
@ -196,8 +231,6 @@ async def test_send_valid_notify(communicator):
async def test_invalid_websocket_message_type(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
# Await the startup data
await communicator.receive_json_from()
await communicator.send_json_to([])
@ -209,8 +242,6 @@ async def test_invalid_websocket_message_type(communicator):
async def test_invalid_websocket_message_no_id(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
# Await the startup data
await communicator.receive_json_from()
await communicator.send_json_to({'type': 'test', 'content': 'foobar'})
@ -222,8 +253,6 @@ async def test_invalid_websocket_message_no_id(communicator):
async def test_invalid_websocket_message_no_content(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
# Await the startup data
await communicator.receive_json_from()
await communicator.send_json_to({'type': 'test', 'id': 'test_id'})
@ -235,8 +264,6 @@ async def test_invalid_websocket_message_no_content(communicator):
async def test_send_unknown_type(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
# Await the startup data
await communicator.receive_json_from()
await communicator.send_json_to({'type': 'if_you_add_this_type_to_openslides_I_will_be_sad', 'content': True, 'id': 'test_id'})
@ -249,8 +276,6 @@ async def test_send_unknown_type(communicator):
async def test_request_constants(communicator, settings):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
# Await the startup data
await communicator.receive_json_from()
await communicator.send_json_to({'type': 'constants', 'content': '', 'id': 'test_id'})
@ -258,3 +283,96 @@ async def test_request_constants(communicator, settings):
assert response['type'] == 'constants'
# See conftest.py for the content of 'content'
assert response['content'] == {'constant1': 'value1', 'constant2': 'value2'}
@pytest.mark.asyncio
async def test_send_get_elements(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
await communicator.send_json_to({'type': 'getElements', 'content': {}, 'id': 'test_id'})
response = await communicator.receive_json_from()
type = response.get('type')
content = response.get('content')
assert type == 'autoupdate'
assert 'changed' in content
assert 'deleted' in content
assert 'from_change_id' in content
assert 'to_change_id' in content
assert Collection1().get_collection_string() in content['changed']
assert Collection2().get_collection_string() in content['changed']
assert TConfig().get_collection_string() in content['changed']
assert TUser().get_collection_string() in content['changed']
@pytest.mark.asyncio
async def test_send_get_elements_to_big_change_id(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
await communicator.send_json_to({'type': 'getElements', 'content': {'change_id': 1_000_000_000_000}, 'id': 'test_id'})
response = await communicator.receive_json_from()
type = response.get('type')
assert type == 'error'
assert response.get('in_response') == 'test_id'
@pytest.mark.asyncio
async def test_send_get_elements_to_small_change_id(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
await communicator.send_json_to({'type': 'getElements', 'content': {'change_id': 1}, 'id': 'test_id'})
response = await communicator.receive_json_from()
type = response.get('type')
assert type == 'autoupdate'
assert response.get('in_response') == 'test_id'
assert response.get('content')['all_data']
@pytest.mark.asyncio
async def test_send_invalid_get_elements(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
await communicator.send_json_to({'type': 'getElements', 'content': {'change_id': 'some value'}, 'id': 'test_id'})
response = await communicator.receive_json_from()
type = response.get('type')
assert type == 'error'
assert response.get('in_response') == 'test_id'
@pytest.mark.asyncio
async def test_turn_on_autoupdate(communicator):
await set_config('general_system_enable_anonymous', True)
await communicator.connect()
await communicator.send_json_to({'type': 'autoupdate', 'content': 'on', 'id': 'test_id'})
await asyncio.sleep(0.01)
# Change a config value
await set_config('general_event_name', 'Test Event')
response = await communicator.receive_json_from()
id = config.get_key_to_id()['general_event_name']
type = response.get('type')
content = response.get('content')
assert type == 'autoupdate'
assert content['changed'] == {
'core/config': [{'id': id, 'key': 'general_event_name', 'value': 'Test Event'}]}
@pytest.mark.asyncio
async def test_turn_off_autoupdate(get_communicator):
await set_config('general_system_enable_anonymous', True)
communicator = get_communicator('autoupdate=on')
await communicator.connect()
await communicator.send_json_to({'type': 'autoupdate', 'content': False, 'id': 'test_id'})
await asyncio.sleep(0.01)
# Change a config value
await set_config('general_event_name', 'Test Event')
assert await communicator.receive_nothing()