From fed6d6f435cca54119084a7396717047ac3003e8 Mon Sep 17 00:00:00 2001 From: Oskar Hahn Date: Wed, 22 Aug 2018 16:50:23 +0200 Subject: [PATCH 1/2] Add a protocol for websocket {'type': STRING, 'content': ANY} --- CHANGELOG.rst | 1 + openslides/utils/consumers.py | 176 ++++++++++++++++------ requirements_production.txt | 1 + tests/integration/utils/test_consumers.py | 105 ++++++++++--- tests/settings.py | 13 -- 5 files changed, 216 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index aa6b8d022..3c669534e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -35,6 +35,7 @@ Core: - Support Python 3.7 [#3786]. - Updated pdfMake to 0.1.37 [#3766]. - Updated Django to 2.1 [#3777, #3786]. + - Adds a websocket protocol [#3807]. Version 2.2 (2018-06-06) diff --git a/openslides/utils/consumers.py b/openslides/utils/consumers.py index 820b1ad36..e01a66f4a 100644 --- a/openslides/utils/consumers.py +++ b/openslides/utils/consumers.py @@ -1,5 +1,6 @@ from typing import Any, Dict, List, Optional +import jsonschema from asgiref.sync import sync_to_async from channels.db import database_sync_to_async from channels.generic.websocket import AsyncJsonWebsocketConsumer @@ -17,7 +18,73 @@ from .collection import ( ) -class SiteConsumer(AsyncJsonWebsocketConsumer): +class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer): + """ + Mixin for JSONWebsocketConsumers, that speaks the a special protocol. + """ + schema = { + "$schema": "http://json-schema.org/draft-04/schema#", + "title": "OpenSlidesWebsocketProtocol", + "description": "The base packages that OpenSlides sends between the server and the client.", + "type": "object", + "properties": { + "type": { + "description": "Defines what kind of packages is packed.", + "type": "string", + "pattern": "notify", # The server can sent other types + }, + "content": { + "description": "The content of the package.", + }, + "id": { + "description": "An identifier of the package.", + "type": "string", + }, + "in_response": { + "description": "The id of another package that the other part sent before.", + "type": "string", + } + }, + "required": ["type", "content", "id"], + } + + async def send_json(self, type: str, content: Any, id: Optional[str] = None, in_response: Optional[str] = None) -> None: + """ + Sends the data with the type. + """ + out = {'type': type, 'content': content} + if id: + out['id'] = id + if in_response: + out['in_response'] = in_response + await super().send_json(out) + + async def receive_json(self, content: Any) -> None: + """ + Receives the json data, parses it and calls receive_content. + """ + try: + jsonschema.validate(content, self.schema) + except jsonschema.ValidationError as err: + try: + in_response = content['id'] + except (TypeError, KeyError): + # content is not a dict (TypeError) or has not the key id (KeyError) + in_response = None + + await self.send_json( + type='error', + content=str(err), + in_response=in_response) + return + + await self.receive_content(type=content['type'], content=content['content'], id=content['id']) + + async def receive_content(self, type: str, content: object, id: str) -> None: + raise NotImplementedError("ProtocollAsyncJsonWebsocketConsumer needs the method receive_content()") + + +class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): """ Websocket Consumer for the site. """ @@ -37,9 +104,9 @@ class SiteConsumer(AsyncJsonWebsocketConsumer): else: await self.accept() data = await startup_data(self.scope['user']) - await self.send_json(data) + await self.send_json(type='autoupdate', content=data) - async def receive_json(self, content: Any) -> None: + async def receive_content(self, type: str, content: Any, id: str) -> None: """ If we recieve something from the client we currently just interpret this as a notify message. @@ -48,27 +115,28 @@ class SiteConsumer(AsyncJsonWebsocketConsumer): channel name so that a receiver client may reply to the sender or to all sender's instances. """ - if notify_message_is_valid(content): - await self.channel_layer.group_send( - "projector", - { - "type": "send_notify", - "incomming": content, - "senderReplyChannelName": self.channel_name, - "senderUserId": self.scope['user'].id or 0, - }, - ) - await self.channel_layer.group_send( - "site", - { - "type": "send_notify", - "incomming": content, - "senderReplyChannelName": self.channel_name, - "senderUserId": self.scope['user'].id or 0, - }, - ) - else: - await self.send_json({'error': 'invalid message'}) + if type == 'notify': + if notify_message_is_valid(content): + await self.channel_layer.group_send( + "projector", + { + "type": "send_notify", + "incomming": content, + "senderReplyChannelName": self.channel_name, + "senderUserId": self.scope['user'].id or 0, + }, + ) + await self.channel_layer.group_send( + "site", + { + "type": "send_notify", + "incomming": content, + "senderReplyChannelName": self.channel_name, + "senderUserId": self.scope['user'].id or 0, + }, + ) + else: + await self.send_json(type='error', content='Invalid notify message', in_response=id) async def send_notify(self, event: Dict[str, Any]) -> None: """ @@ -90,7 +158,7 @@ class SiteConsumer(AsyncJsonWebsocketConsumer): out.append(item) if out: - await self.send_json(out) + await self.send_json(type='notify', content=out) async def send_data(self, event: Dict[str, Any]) -> None: """ @@ -112,10 +180,10 @@ class SiteConsumer(AsyncJsonWebsocketConsumer): collection_string=collection_string, id=id, action='deleted')) - await self.send_json(output) + await self.send_json(type='autoupdate', content=output) -class ProjectorConsumer(AsyncJsonWebsocketConsumer): +class ProjectorConsumer(ProtocollAsyncJsonWebsocketConsumer): """ Websocket Consumer for the projector. """ @@ -132,14 +200,14 @@ class ProjectorConsumer(AsyncJsonWebsocketConsumer): await self.accept() if not await database_sync_to_async(has_perm)(user, 'core.can_see_projector'): - await self.send_json({'text': 'No permissions to see this projector.'}) + await self.send_json(type='error', content='No permissions to see this projector.') # TODO: Shouldend we just close the websocket connection with an error message? # self.close(code=4403) else: out = await sync_to_async(projector_startup_data)(projector_id) - await self.send_json(out) + await self.send_json(type='autoupdate', content=out) - async def receive_json(self, content: Any) -> None: + async def receive_content(self, type: str, content: Any, id: str) -> None: """ If we recieve something from the client we currently just interpret this as a notify message. @@ -188,7 +256,7 @@ class ProjectorConsumer(AsyncJsonWebsocketConsumer): out.append(item) if out: - await self.send_json(out) + await self.send_json(type='notify', content=out) async def send_data(self, event: Dict[str, Any]) -> None: """ @@ -199,7 +267,7 @@ class ProjectorConsumer(AsyncJsonWebsocketConsumer): output = await projector_sync_send_data(projector_id, collection_elements) if output: - await self.send_json(output) + await self.send_json(type='autoupdate', content=output) async def startup_data(user: Optional[CollectionElement], change_id: int = 0) -> List[Any]: @@ -288,19 +356,33 @@ def notify_message_is_valid(message: object) -> bool: """ Returns True, when the message is a valid notify_message. """ - if not isinstance(message, list): - # message has to be a list + schema = { + "$schema": "http://json-schema.org/draft-04/schema#", + "title": "Notify elements.", + "description": "Elements that one client can send to one or many other clients.", + "type": "array", + "items": { + "type": "object", + "properties": { + "projectors": { + "type": "array", + "items": {"type": "integer"}, + }, + "reply_channels": { + "type": "array", + "items": {"type": "string"}, + }, + "users": { + "type": "array", + "items": {"type": "integer"}, + } + } + }, + "minItems": 1, + } + try: + jsonschema.validate(message, schema) + except jsonschema.ValidationError: return False - - if not message: - # message must contain at least one element - return False - - for element in message: - if not isinstance(element, dict): - # All elements have to be a dict - return False - # TODO: There could be more checks. For example 'users' has to be a list of int - # Check could be done with json-schema: - # https://pypi.org/project/jsonschema/ - return True + else: + return True diff --git a/requirements_production.txt b/requirements_production.txt index 624f06fe7..8bb0190df 100644 --- a/requirements_production.txt +++ b/requirements_production.txt @@ -5,6 +5,7 @@ daphne>=2.2,<2.3 Django>=1.11,<2.2 djangorestframework>=3.4,<3.9 jsonfield2>=3.0,<3.1 +jsonschema>=2.6.0<2.7 mypy_extensions>=0.3,<0.4 PyPDF2>=1.26,<1.27 roman>=2.0,<3.1 diff --git a/tests/integration/utils/test_consumers.py b/tests/integration/utils/test_consumers.py index c510f79ab..43efacbcc 100644 --- a/tests/integration/utils/test_consumers.py +++ b/tests/integration/utils/test_consumers.py @@ -63,8 +63,11 @@ async def test_normal_connection(communicator): response = await communicator.receive_json_from() - # Test, that there is a lot of startup data. - assert len(response) > 5 + type = response.get('type') + content = response.get('content') + assert type == 'autoupdate' + # Test, that both example objects are returned + assert len(content) > 10 @pytest.mark.asyncio @@ -78,7 +81,10 @@ async def test_receive_changed_data(communicator): response = await communicator.receive_json_from() id = config.get_key_to_id()['general_event_name'] - assert response == [ + type = response.get('type') + content = response.get('content') + assert type == 'autoupdate' + assert content == [ {'action': 'changed', 'collection': 'core/config', 'data': {'id': id, 'key': 'general_event_name', 'value': 'Test Event'}, @@ -122,7 +128,10 @@ async def test_receive_deleted_data(communicator): await sync_to_async(inform_deleted_data)([(Collection1().get_collection_string(), 1)]) response = await communicator.receive_json_from() - assert response == [{'action': 'deleted', 'collection': Collection1().get_collection_string(), 'id': 1}] + type = response.get('type') + content = response.get('content') + assert type == 'autoupdate' + assert content == [{'action': 'deleted', 'collection': Collection1().get_collection_string(), 'id': 1}] @pytest.mark.asyncio @@ -132,11 +141,12 @@ async def test_send_invalid_notify_not_a_list(communicator): # Await the startup data await communicator.receive_json_from() - await communicator.send_json_to({'testmessage': 'foobar, what else.'}) - + await communicator.send_json_to({'type': 'notify', 'content': {'testmessage': 'foobar, what else.'}, 'id': 'test_send_invalid_notify_not_a_list'}) response = await communicator.receive_json_from() - assert response == {'error': 'invalid message'} + assert response['type'] == 'error' + assert response['content'] == 'Invalid notify message' + assert response['in_response'] == 'test_send_invalid_notify_not_a_list' @pytest.mark.asyncio @@ -146,11 +156,12 @@ async def test_send_invalid_notify_no_elements(communicator): # Await the startup data await communicator.receive_json_from() - await communicator.send_json_to([]) - + await communicator.send_json_to({'type': 'notify', 'content': [], 'id': 'test_send_invalid_notify_no_elements'}) response = await communicator.receive_json_from() - assert response == {'error': 'invalid message'} + assert response['type'] == 'error' + assert response['content'] == 'Invalid notify message' + assert response['in_response'] == 'test_send_invalid_notify_no_elements' @pytest.mark.asyncio @@ -160,11 +171,12 @@ async def test_send_invalid_notify_str_in_list(communicator): # Await the startup data await communicator.receive_json_from() - await communicator.send_json_to([{}, 'testmessage']) - + await communicator.send_json_to({'type': 'notify', 'content': [{}, 'testmessage'], 'id': 'test_send_invalid_notify_str_in_list'}) response = await communicator.receive_json_from() - assert response == {'error': 'invalid message'} + assert response['type'] == 'error' + assert response['content'] == 'Invalid notify message' + assert response['in_response'] == 'test_send_invalid_notify_str_in_list' @pytest.mark.asyncio @@ -174,12 +186,65 @@ async def test_send_valid_notify(communicator): # Await the startup data await communicator.receive_json_from() - await communicator.send_json_to([{'testmessage': 'foobar, what else.'}]) - + await communicator.send_json_to({'type': 'notify', 'content': [{'testmessage': 'foobar, what else.'}], 'id': 'test'}) response = await communicator.receive_json_from() - assert isinstance(response, list) - assert len(response) == 1 - assert response[0]['testmessage'] == 'foobar, what else.' - assert 'senderReplyChannelName' in response[0] - assert response[0]['senderUserId'] == 0 + content = response['content'] + assert isinstance(content, list) + assert len(content) == 1 + assert content[0]['testmessage'] == 'foobar, what else.' + assert 'senderReplyChannelName' in content[0] + assert content[0]['senderUserId'] == 0 + + +@pytest.mark.asyncio +async def test_invalid_websocket_message_type(communicator): + await set_config('general_system_enable_anonymous', True) + await communicator.connect() + # Await the startup data + await communicator.receive_json_from() + + await communicator.send_json_to([]) + + response = await communicator.receive_json_from() + assert response['type'] == 'error' + + +@pytest.mark.asyncio +async def test_invalid_websocket_message_no_id(communicator): + await set_config('general_system_enable_anonymous', True) + await communicator.connect() + # Await the startup data + await communicator.receive_json_from() + + await communicator.send_json_to({'type': 'test', 'content': 'foobar'}) + + response = await communicator.receive_json_from() + assert response['type'] == 'error' + + +@pytest.mark.asyncio +async def test_invalid_websocket_message_no_content(communicator): + await set_config('general_system_enable_anonymous', True) + await communicator.connect() + # Await the startup data + await communicator.receive_json_from() + + await communicator.send_json_to({'type': 'test', 'id': 'test_id'}) + + response = await communicator.receive_json_from() + assert response['type'] == 'error' + + +@pytest.mark.asyncio +async def test_send_unknown_type(communicator): + await set_config('general_system_enable_anonymous', True) + await communicator.connect() + # Await the startup data + await communicator.receive_json_from() + + await communicator.send_json_to({'type': 'if_you_add_this_type_to_openslides_I_will_be_sad', 'content': True, 'id': 'test_id'}) + + response = await communicator.receive_json_from() + assert response['type'] == 'error' + assert response['in_response'] == 'test_id' diff --git a/tests/settings.py b/tests/settings.py index 5d9b1fe05..3ff83ea81 100644 --- a/tests/settings.py +++ b/tests/settings.py @@ -43,21 +43,8 @@ DATABASES = { } } -# Configure session in the cache - -CACHES = { - 'default': { - 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', - } -} - SESSION_ENGINE = "django.contrib.sessions.backends.cache" -# When use_redis is True, the restricted data cache caches the data individuel -# for each user. This requires a lot of memory if there are a lot of active -# users. If use_redis is False, this setting has no effect. -DISABLE_USER_CACHE = False - # Internationalization # https://docs.djangoproject.com/en/1.10/topics/i18n/ From 22f7d84caebe88088589491cee5aa958dc7510e6 Mon Sep 17 00:00:00 2001 From: FinnStutzenstein Date: Thu, 23 Aug 2018 15:28:57 +0200 Subject: [PATCH 2/2] New websocket message format for both clients --- client/src/app/app.component.ts | 12 ++- .../app/core/services/autoupdate.service.ts | 17 +--- .../app/core/services/notify.service.spec.ts | 15 ++++ .../src/app/core/services/notify.service.ts | 42 ++++++++++ .../app/core/services/websocket.service.ts | 67 +++++++++++++-- client/src/app/site/site.component.ts | 8 +- openslides/core/static/js/core/base.js | 82 +++++++++++++------ 7 files changed, 190 insertions(+), 53 deletions(-) create mode 100644 client/src/app/core/services/notify.service.spec.ts create mode 100644 client/src/app/core/services/notify.service.ts diff --git a/client/src/app/app.component.ts b/client/src/app/app.component.ts index ecddb29a0..30fce4041 100644 --- a/client/src/app/app.component.ts +++ b/client/src/app/app.component.ts @@ -1,5 +1,7 @@ import { Component, OnInit } from '@angular/core'; import { TranslateService } from '@ngx-translate/core'; +import { AutoupdateService } from './core/services/autoupdate.service'; +import { NotifyService } from './core/services/notify.service'; /** * Angular's global App Component @@ -12,11 +14,15 @@ import { TranslateService } from '@ngx-translate/core'; export class AppComponent { /** * Initialises the operator, the auto update (and therefore a websocket) feature and the translation unit. - * @param operator - * @param autoupdate + * @param autoupdateService + * @param notifyService * @param translate */ - constructor(private translate: TranslateService) { + constructor( + private autoupdateService: AutoupdateService, + private notifyService: NotifyService, + private translate: TranslateService + ) { // manually add the supported languages translate.addLangs(['en', 'de', 'fr']); // this language will be used as a fallback when a translation isn't found in the current language diff --git a/client/src/app/core/services/autoupdate.service.ts b/client/src/app/core/services/autoupdate.service.ts index c60c9fb5b..66c1606fc 100644 --- a/client/src/app/core/services/autoupdate.service.ts +++ b/client/src/app/core/services/autoupdate.service.ts @@ -33,26 +33,13 @@ import { User } from 'app/shared/models/users/user'; providedIn: 'root' }) export class AutoupdateService extends OpenSlidesComponent { - /** - * Stores the to create the socket created using {@link WebsocketService}. - */ - private socket; - /** * Constructor to create the AutoupdateService. Calls the constructor of the parent class. * @param websocketService */ constructor(private websocketService: WebsocketService) { super(); - } - - /** - * Function to start the automatic update process - * will build up a websocket connection using {@link WebsocketService} - */ - startAutoupdate(): void { - this.socket = this.websocketService.connect(); - this.socket.subscribe(response => { + websocketService.getOberservable('autoupdate').subscribe(response => { this.storeResponse(response); }); } @@ -69,8 +56,6 @@ export class AutoupdateService extends OpenSlidesComponent { socketResponse.forEach(jsonObj => { const targetClass = this.getClassFromCollectionString(jsonObj.collection); if (jsonObj.action === 'deleted') { - console.log('storeResponse detect delete'); - this.DS.remove(jsonObj.collection, jsonObj.id); } else { this.DS.add(new targetClass().deserialize(jsonObj.data)); diff --git a/client/src/app/core/services/notify.service.spec.ts b/client/src/app/core/services/notify.service.spec.ts new file mode 100644 index 000000000..4963d15b5 --- /dev/null +++ b/client/src/app/core/services/notify.service.spec.ts @@ -0,0 +1,15 @@ +import { TestBed, inject } from '@angular/core/testing'; + +import { NotifyService } from './notify.service'; + +describe('WebsocketService', () => { + beforeEach(() => { + TestBed.configureTestingModule({ + providers: [NotifyService] + }); + }); + + it('should be created', inject([NotifyService], (service: NotifyService) => { + expect(service).toBeTruthy(); + })); +}); diff --git a/client/src/app/core/services/notify.service.ts b/client/src/app/core/services/notify.service.ts new file mode 100644 index 000000000..a96e2e5f1 --- /dev/null +++ b/client/src/app/core/services/notify.service.ts @@ -0,0 +1,42 @@ +import { Injectable } from '@angular/core'; + +import { OpenSlidesComponent } from 'app/openslides.component'; +import { WebsocketService } from './websocket.service'; + +interface NotifyFormat { + id: number; //Dummy +} + +/** + * Handles all incoming and outgoing notify messages via {@link WebsocketService}. + */ +@Injectable({ + providedIn: 'root' +}) +export class NotifyService extends OpenSlidesComponent { + /** + * Constructor to create the NotifyService. Registers itself to the WebsocketService. + * @param websocketService + */ + constructor(private websocketService: WebsocketService) { + super(); + websocketService.getOberservable('notify').subscribe(notify => { + this.receive(notify); + }); + } + + // TODO: Implement this + private receive(notify: NotifyFormat): void { + console.log('recv', notify); + // TODO: Use a Subject, so one can subscribe and get notifies. + } + + // TODO: Make this api better: e.g. send(data, users?, projectors?, channel?, ...) + /** + * Sents a notify object to the server + * @param notify the notify objects + */ + public send(notify: NotifyFormat): void { + this.websocketService.send('notify', notify); + } +} diff --git a/client/src/app/core/services/websocket.service.ts b/client/src/app/core/services/websocket.service.ts index 99e3099a8..7ce4edcdb 100644 --- a/client/src/app/core/services/websocket.service.ts +++ b/client/src/app/core/services/websocket.service.ts @@ -1,6 +1,13 @@ import { Injectable } from '@angular/core'; import { Router } from '@angular/router'; import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; +import { Observable, Subject } from 'rxjs'; + +interface WebsocketMessage { + type: string; + content: any; + id: string; +} /** * Service that handles WebSocket connections. @@ -20,21 +27,71 @@ export class WebsocketService { /** * Observable subject that might be `any` for simplicity, `MessageEvent` or something appropriate */ - private subject: WebSocketSubject; + private _websocketSubject: WebSocketSubject; + + /** + * Subjects for types of websocket messages. A subscriber can get an Observable by {@function getOberservable}. + */ + private _subjects: { [type: string]: Subject } = {}; /** * Creates a new WebSocket connection as WebSocketSubject * * Can return old Subjects to prevent multiple WebSocket connections. */ - public connect(): WebSocketSubject { + public connect(): void { const socketProtocol = this.getWebSocketProtocol(); const socketPath = this.getWebSocketPath(); const socketServer = window.location.hostname + ':' + window.location.port; - if (!this.subject) { - this.subject = webSocket(socketProtocol + socketServer + socketPath); + if (!this._websocketSubject) { + this._websocketSubject = webSocket(socketProtocol + socketServer + socketPath); + // directly subscribe. The messages are distributes below + this._websocketSubject.subscribe(message => { + const type: string = message.type; + if (type === 'error') { + console.error('Websocket error', message.content); + } else if (this._subjects[type]) { + this._subjects[type].next(message.content); + } else { + console.log(`Got unknown websocket message type "${type}" with content`, message.content); + } + }); } - return this.subject; + } + + /** + * Returns an observable for messages of the given type. + * @param type the message type + */ + public getOberservable(type: string): Observable { + if (!this._subjects[type]) { + this._subjects[type] = new Subject(); + } + return this._subjects[type].asObservable(); + } + + /** + * Sends a message to the server with the content and the given type. + * + * @param type the message type + * @param content the actual content + */ + public send(type: string, content: T): void { + if (!this._websocketSubject) { + return; + } + + const message: WebsocketMessage = { + type: type, + content: content, + id: '' + }; + + const possible = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'; + for (let i = 0; i < 8; i++) { + message.id += possible.charAt(Math.floor(Math.random() * possible.length)); + } + this._websocketSubject.next(message); } /** diff --git a/client/src/app/site/site.component.ts b/client/src/app/site/site.component.ts index 005584584..20ba8782a 100644 --- a/client/src/app/site/site.component.ts +++ b/client/src/app/site/site.component.ts @@ -3,8 +3,8 @@ import { Router } from '@angular/router'; import { BreakpointObserver, Breakpoints, BreakpointState } from '@angular/cdk/layout'; import { AuthService } from 'app/core/services/auth.service'; -import { AutoupdateService } from 'app/core/services/autoupdate.service'; import { OperatorService } from 'app/core/services/operator.service'; +import { WebsocketService } from 'app/core/services/websocket.service'; import { TranslateService } from '@ngx-translate/core'; //showcase import { BaseComponent } from 'app/base.component'; @@ -33,7 +33,7 @@ export class SiteComponent extends BaseComponent implements OnInit { * Constructor * * @param authService - * @param autoupdateService + * @param websocketService * @param operator * @param router * @param breakpointObserver @@ -42,7 +42,7 @@ export class SiteComponent extends BaseComponent implements OnInit { */ constructor( private authService: AuthService, - private autoupdateService: AutoupdateService, + private websocketService: WebsocketService, private operator: OperatorService, private router: Router, public vp: ViewportService, @@ -66,7 +66,7 @@ export class SiteComponent extends BaseComponent implements OnInit { // start autoupdate if the user is logged in: this.operator.whoAmI().subscribe(resp => { if (resp.user) { - this.autoupdateService.startAutoupdate(); + this.websocketService.connect(); } else { //if whoami is not sucsessfull, forward to login again this.operator.clear(); diff --git a/openslides/core/static/js/core/base.js b/openslides/core/static/js/core/base.js index 60acb14fa..4f1992ed5 100644 --- a/openslides/core/static/js/core/base.js +++ b/openslides/core/static/js/core/base.js @@ -117,11 +117,11 @@ angular.module('OpenSlidesApp.core', [ $timeout(runRetryConnectCallbacks, getTimeoutTime()); }; socket.onmessage = function (event) { - var dataList = []; + var data; try { - dataList = JSON.parse(event.data); + data = JSON.parse(event.data); _.forEach(Autoupdate.messageReceivers, function (receiver) { - receiver(dataList); + receiver(data); }); } catch(err) { console.error(err); @@ -133,10 +133,23 @@ angular.module('OpenSlidesApp.core', [ ErrorMessage.clearConnectionError(); }; }; - Autoupdate.send = function (message) { - if (socket) { - socket.send(JSON.stringify(message)); + Autoupdate.send = function (type, content) { + if (!socket) { + return; } + + var message = { + type: type, + content: content, + id: '', + }; + + // Generate random id + var possible = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'; + for (var i = 0; i < 8; i++) { + message.id += possible.charAt(Math.floor(Math.random() * possible.length)); + } + socket.send(JSON.stringify(message)); }; Autoupdate.closeConnection = function () { if (socket) { @@ -369,7 +382,12 @@ angular.module('OpenSlidesApp.core', [ 'dsEject', function (DS, autoupdate, dsEject) { // Handler for normal autoupdate messages. - autoupdate.onMessage(function(dataList) { + autoupdate.onMessage(function(data) { + if (data.type !== 'autoupdate') { + return; + } + + var dataList = data.content; var dataListByCollection = _.groupBy(dataList, 'collection'); _.forEach(dataListByCollection, function (list, key) { var changedElements = []; @@ -379,22 +397,19 @@ angular.module('OpenSlidesApp.core', [ // Uncomment this line for debugging to log all autoupdates: // console.log("Received object: " + data.collection + ", " + data.id); - // Now handle autoupdate message but do not handle notify messages. - if (data.collection !== 'notify') { - // remove (=eject) object from local DS store - var instance = DS.get(data.collection, data.id); - if (instance) { - dsEject(data.collection, instance); - } - // check if object changed or deleted - if (data.action === 'changed') { - changedElements.push(data.data); - } else if (data.action === 'deleted') { - deletedElements.push(data.id); - } else { - console.error('Error: Undefined action for received object' + - '(' + data.collection + ', ' + data.id + ')'); - } + // remove (=eject) object from local DS store + var instance = DS.get(data.collection, data.id); + if (instance) { + dsEject(data.collection, instance); + } + // check if object changed or deleted + if (data.action === 'changed') { + changedElements.push(data.data); + } else if (data.action === 'deleted') { + deletedElements.push(data.id); + } else { + console.error('Error: Undefined action for received object' + + '(' + data.collection + ', ' + data.id + ')'); } }); // add (=inject) all given objects into local DS store @@ -418,7 +433,12 @@ angular.module('OpenSlidesApp.core', [ var anonymousTrackId; // Handler for notify messages. - autoupdate.onMessage(function(dataList) { + autoupdate.onMessage(function(data) { + if (data.type !== 'notify') { + return; + } + + var dataList = data.content; var dataListByCollection = _.groupBy(dataList, 'collection'); _.forEach(dataListByCollection.notify, function (notifyItem) { // Check, if this current user (or anonymous instance) has send this notify. @@ -505,7 +525,7 @@ angular.module('OpenSlidesApp.core', [ } notifyItem.anonymousTrackId = anonymousTrackId; } - autoupdate.send([notifyItem]); + autoupdate.send('notify', [notifyItem]); } else { throw 'eventName should only consist of [a-zA-Z0-9_-]'; } @@ -514,6 +534,18 @@ angular.module('OpenSlidesApp.core', [ } ]) +.run([ + 'autoupdate', + function (autoupdate) { + // Handler for normal autoupdate messages. + autoupdate.onMessage(function (data) { + if (data.type === 'error') { + console.error("Websocket error", data.content); + } + }); + } +]) + // Save the server time to the rootscope. .run([ '$http',