Merge pull request #3807 from ostcar/websocket_protocol
Add a protocol for websocket
This commit is contained in:
commit
7dd7bb1b8d
@ -36,6 +36,7 @@ Core:
|
|||||||
- Support Python 3.7 [#3786].
|
- Support Python 3.7 [#3786].
|
||||||
- Updated pdfMake to 0.1.37 [#3766].
|
- Updated pdfMake to 0.1.37 [#3766].
|
||||||
- Updated Django to 2.1 [#3777, #3786].
|
- Updated Django to 2.1 [#3777, #3786].
|
||||||
|
- Adds a websocket protocol [#3807].
|
||||||
|
|
||||||
|
|
||||||
Version 2.2 (2018-06-06)
|
Version 2.2 (2018-06-06)
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
import { Component, OnInit } from '@angular/core';
|
import { Component, OnInit } from '@angular/core';
|
||||||
import { TranslateService } from '@ngx-translate/core';
|
import { TranslateService } from '@ngx-translate/core';
|
||||||
|
import { AutoupdateService } from './core/services/autoupdate.service';
|
||||||
|
import { NotifyService } from './core/services/notify.service';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Angular's global App Component
|
* Angular's global App Component
|
||||||
@ -12,11 +14,15 @@ import { TranslateService } from '@ngx-translate/core';
|
|||||||
export class AppComponent {
|
export class AppComponent {
|
||||||
/**
|
/**
|
||||||
* Initialises the operator, the auto update (and therefore a websocket) feature and the translation unit.
|
* Initialises the operator, the auto update (and therefore a websocket) feature and the translation unit.
|
||||||
* @param operator
|
* @param autoupdateService
|
||||||
* @param autoupdate
|
* @param notifyService
|
||||||
* @param translate
|
* @param translate
|
||||||
*/
|
*/
|
||||||
constructor(private translate: TranslateService) {
|
constructor(
|
||||||
|
private autoupdateService: AutoupdateService,
|
||||||
|
private notifyService: NotifyService,
|
||||||
|
private translate: TranslateService
|
||||||
|
) {
|
||||||
// manually add the supported languages
|
// manually add the supported languages
|
||||||
translate.addLangs(['en', 'de', 'fr']);
|
translate.addLangs(['en', 'de', 'fr']);
|
||||||
// this language will be used as a fallback when a translation isn't found in the current language
|
// this language will be used as a fallback when a translation isn't found in the current language
|
||||||
|
@ -33,26 +33,13 @@ import { User } from 'app/shared/models/users/user';
|
|||||||
providedIn: 'root'
|
providedIn: 'root'
|
||||||
})
|
})
|
||||||
export class AutoupdateService extends OpenSlidesComponent {
|
export class AutoupdateService extends OpenSlidesComponent {
|
||||||
/**
|
|
||||||
* Stores the to create the socket created using {@link WebsocketService}.
|
|
||||||
*/
|
|
||||||
private socket;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor to create the AutoupdateService. Calls the constructor of the parent class.
|
* Constructor to create the AutoupdateService. Calls the constructor of the parent class.
|
||||||
* @param websocketService
|
* @param websocketService
|
||||||
*/
|
*/
|
||||||
constructor(private websocketService: WebsocketService) {
|
constructor(private websocketService: WebsocketService) {
|
||||||
super();
|
super();
|
||||||
}
|
websocketService.getOberservable<any>('autoupdate').subscribe(response => {
|
||||||
|
|
||||||
/**
|
|
||||||
* Function to start the automatic update process
|
|
||||||
* will build up a websocket connection using {@link WebsocketService}
|
|
||||||
*/
|
|
||||||
startAutoupdate(): void {
|
|
||||||
this.socket = this.websocketService.connect();
|
|
||||||
this.socket.subscribe(response => {
|
|
||||||
this.storeResponse(response);
|
this.storeResponse(response);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -69,8 +56,6 @@ export class AutoupdateService extends OpenSlidesComponent {
|
|||||||
socketResponse.forEach(jsonObj => {
|
socketResponse.forEach(jsonObj => {
|
||||||
const targetClass = this.getClassFromCollectionString(jsonObj.collection);
|
const targetClass = this.getClassFromCollectionString(jsonObj.collection);
|
||||||
if (jsonObj.action === 'deleted') {
|
if (jsonObj.action === 'deleted') {
|
||||||
console.log('storeResponse detect delete');
|
|
||||||
|
|
||||||
this.DS.remove(jsonObj.collection, jsonObj.id);
|
this.DS.remove(jsonObj.collection, jsonObj.id);
|
||||||
} else {
|
} else {
|
||||||
this.DS.add(new targetClass().deserialize(jsonObj.data));
|
this.DS.add(new targetClass().deserialize(jsonObj.data));
|
||||||
|
15
client/src/app/core/services/notify.service.spec.ts
Normal file
15
client/src/app/core/services/notify.service.spec.ts
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
import { TestBed, inject } from '@angular/core/testing';
|
||||||
|
|
||||||
|
import { NotifyService } from './notify.service';
|
||||||
|
|
||||||
|
describe('WebsocketService', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
TestBed.configureTestingModule({
|
||||||
|
providers: [NotifyService]
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should be created', inject([NotifyService], (service: NotifyService) => {
|
||||||
|
expect(service).toBeTruthy();
|
||||||
|
}));
|
||||||
|
});
|
42
client/src/app/core/services/notify.service.ts
Normal file
42
client/src/app/core/services/notify.service.ts
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
import { Injectable } from '@angular/core';
|
||||||
|
|
||||||
|
import { OpenSlidesComponent } from 'app/openslides.component';
|
||||||
|
import { WebsocketService } from './websocket.service';
|
||||||
|
|
||||||
|
interface NotifyFormat {
|
||||||
|
id: number; //Dummy
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles all incoming and outgoing notify messages via {@link WebsocketService}.
|
||||||
|
*/
|
||||||
|
@Injectable({
|
||||||
|
providedIn: 'root'
|
||||||
|
})
|
||||||
|
export class NotifyService extends OpenSlidesComponent {
|
||||||
|
/**
|
||||||
|
* Constructor to create the NotifyService. Registers itself to the WebsocketService.
|
||||||
|
* @param websocketService
|
||||||
|
*/
|
||||||
|
constructor(private websocketService: WebsocketService) {
|
||||||
|
super();
|
||||||
|
websocketService.getOberservable<any>('notify').subscribe(notify => {
|
||||||
|
this.receive(notify);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Implement this
|
||||||
|
private receive(notify: NotifyFormat): void {
|
||||||
|
console.log('recv', notify);
|
||||||
|
// TODO: Use a Subject, so one can subscribe and get notifies.
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Make this api better: e.g. send(data, users?, projectors?, channel?, ...)
|
||||||
|
/**
|
||||||
|
* Sents a notify object to the server
|
||||||
|
* @param notify the notify objects
|
||||||
|
*/
|
||||||
|
public send(notify: NotifyFormat): void {
|
||||||
|
this.websocketService.send('notify', notify);
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,13 @@
|
|||||||
import { Injectable } from '@angular/core';
|
import { Injectable } from '@angular/core';
|
||||||
import { Router } from '@angular/router';
|
import { Router } from '@angular/router';
|
||||||
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
|
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
|
||||||
|
import { Observable, Subject } from 'rxjs';
|
||||||
|
|
||||||
|
interface WebsocketMessage {
|
||||||
|
type: string;
|
||||||
|
content: any;
|
||||||
|
id: string;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Service that handles WebSocket connections.
|
* Service that handles WebSocket connections.
|
||||||
@ -20,21 +27,71 @@ export class WebsocketService {
|
|||||||
/**
|
/**
|
||||||
* Observable subject that might be `any` for simplicity, `MessageEvent` or something appropriate
|
* Observable subject that might be `any` for simplicity, `MessageEvent` or something appropriate
|
||||||
*/
|
*/
|
||||||
private subject: WebSocketSubject<any>;
|
private _websocketSubject: WebSocketSubject<WebsocketMessage>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subjects for types of websocket messages. A subscriber can get an Observable by {@function getOberservable}.
|
||||||
|
*/
|
||||||
|
private _subjects: { [type: string]: Subject<any> } = {};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new WebSocket connection as WebSocketSubject
|
* Creates a new WebSocket connection as WebSocketSubject
|
||||||
*
|
*
|
||||||
* Can return old Subjects to prevent multiple WebSocket connections.
|
* Can return old Subjects to prevent multiple WebSocket connections.
|
||||||
*/
|
*/
|
||||||
public connect(): WebSocketSubject<any> {
|
public connect(): void {
|
||||||
const socketProtocol = this.getWebSocketProtocol();
|
const socketProtocol = this.getWebSocketProtocol();
|
||||||
const socketPath = this.getWebSocketPath();
|
const socketPath = this.getWebSocketPath();
|
||||||
const socketServer = window.location.hostname + ':' + window.location.port;
|
const socketServer = window.location.hostname + ':' + window.location.port;
|
||||||
if (!this.subject) {
|
if (!this._websocketSubject) {
|
||||||
this.subject = webSocket(socketProtocol + socketServer + socketPath);
|
this._websocketSubject = webSocket(socketProtocol + socketServer + socketPath);
|
||||||
|
// directly subscribe. The messages are distributes below
|
||||||
|
this._websocketSubject.subscribe(message => {
|
||||||
|
const type: string = message.type;
|
||||||
|
if (type === 'error') {
|
||||||
|
console.error('Websocket error', message.content);
|
||||||
|
} else if (this._subjects[type]) {
|
||||||
|
this._subjects[type].next(message.content);
|
||||||
|
} else {
|
||||||
|
console.log(`Got unknown websocket message type "${type}" with content`, message.content);
|
||||||
}
|
}
|
||||||
return this.subject;
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an observable for messages of the given type.
|
||||||
|
* @param type the message type
|
||||||
|
*/
|
||||||
|
public getOberservable<T>(type: string): Observable<T> {
|
||||||
|
if (!this._subjects[type]) {
|
||||||
|
this._subjects[type] = new Subject<T>();
|
||||||
|
}
|
||||||
|
return this._subjects[type].asObservable();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends a message to the server with the content and the given type.
|
||||||
|
*
|
||||||
|
* @param type the message type
|
||||||
|
* @param content the actual content
|
||||||
|
*/
|
||||||
|
public send<T>(type: string, content: T): void {
|
||||||
|
if (!this._websocketSubject) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const message: WebsocketMessage = {
|
||||||
|
type: type,
|
||||||
|
content: content,
|
||||||
|
id: ''
|
||||||
|
};
|
||||||
|
|
||||||
|
const possible = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz';
|
||||||
|
for (let i = 0; i < 8; i++) {
|
||||||
|
message.id += possible.charAt(Math.floor(Math.random() * possible.length));
|
||||||
|
}
|
||||||
|
this._websocketSubject.next(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3,8 +3,8 @@ import { Router } from '@angular/router';
|
|||||||
import { BreakpointObserver, Breakpoints, BreakpointState } from '@angular/cdk/layout';
|
import { BreakpointObserver, Breakpoints, BreakpointState } from '@angular/cdk/layout';
|
||||||
|
|
||||||
import { AuthService } from 'app/core/services/auth.service';
|
import { AuthService } from 'app/core/services/auth.service';
|
||||||
import { AutoupdateService } from 'app/core/services/autoupdate.service';
|
|
||||||
import { OperatorService } from 'app/core/services/operator.service';
|
import { OperatorService } from 'app/core/services/operator.service';
|
||||||
|
import { WebsocketService } from 'app/core/services/websocket.service';
|
||||||
|
|
||||||
import { TranslateService } from '@ngx-translate/core'; //showcase
|
import { TranslateService } from '@ngx-translate/core'; //showcase
|
||||||
import { BaseComponent } from 'app/base.component';
|
import { BaseComponent } from 'app/base.component';
|
||||||
@ -33,7 +33,7 @@ export class SiteComponent extends BaseComponent implements OnInit {
|
|||||||
* Constructor
|
* Constructor
|
||||||
*
|
*
|
||||||
* @param authService
|
* @param authService
|
||||||
* @param autoupdateService
|
* @param websocketService
|
||||||
* @param operator
|
* @param operator
|
||||||
* @param router
|
* @param router
|
||||||
* @param breakpointObserver
|
* @param breakpointObserver
|
||||||
@ -42,7 +42,7 @@ export class SiteComponent extends BaseComponent implements OnInit {
|
|||||||
*/
|
*/
|
||||||
constructor(
|
constructor(
|
||||||
private authService: AuthService,
|
private authService: AuthService,
|
||||||
private autoupdateService: AutoupdateService,
|
private websocketService: WebsocketService,
|
||||||
private operator: OperatorService,
|
private operator: OperatorService,
|
||||||
private router: Router,
|
private router: Router,
|
||||||
public vp: ViewportService,
|
public vp: ViewportService,
|
||||||
@ -66,7 +66,7 @@ export class SiteComponent extends BaseComponent implements OnInit {
|
|||||||
// start autoupdate if the user is logged in:
|
// start autoupdate if the user is logged in:
|
||||||
this.operator.whoAmI().subscribe(resp => {
|
this.operator.whoAmI().subscribe(resp => {
|
||||||
if (resp.user) {
|
if (resp.user) {
|
||||||
this.autoupdateService.startAutoupdate();
|
this.websocketService.connect();
|
||||||
} else {
|
} else {
|
||||||
//if whoami is not sucsessfull, forward to login again
|
//if whoami is not sucsessfull, forward to login again
|
||||||
this.operator.clear();
|
this.operator.clear();
|
||||||
|
@ -117,11 +117,11 @@ angular.module('OpenSlidesApp.core', [
|
|||||||
$timeout(runRetryConnectCallbacks, getTimeoutTime());
|
$timeout(runRetryConnectCallbacks, getTimeoutTime());
|
||||||
};
|
};
|
||||||
socket.onmessage = function (event) {
|
socket.onmessage = function (event) {
|
||||||
var dataList = [];
|
var data;
|
||||||
try {
|
try {
|
||||||
dataList = JSON.parse(event.data);
|
data = JSON.parse(event.data);
|
||||||
_.forEach(Autoupdate.messageReceivers, function (receiver) {
|
_.forEach(Autoupdate.messageReceivers, function (receiver) {
|
||||||
receiver(dataList);
|
receiver(data);
|
||||||
});
|
});
|
||||||
} catch(err) {
|
} catch(err) {
|
||||||
console.error(err);
|
console.error(err);
|
||||||
@ -133,10 +133,23 @@ angular.module('OpenSlidesApp.core', [
|
|||||||
ErrorMessage.clearConnectionError();
|
ErrorMessage.clearConnectionError();
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
Autoupdate.send = function (message) {
|
Autoupdate.send = function (type, content) {
|
||||||
if (socket) {
|
if (!socket) {
|
||||||
socket.send(JSON.stringify(message));
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var message = {
|
||||||
|
type: type,
|
||||||
|
content: content,
|
||||||
|
id: '',
|
||||||
|
};
|
||||||
|
|
||||||
|
// Generate random id
|
||||||
|
var possible = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz';
|
||||||
|
for (var i = 0; i < 8; i++) {
|
||||||
|
message.id += possible.charAt(Math.floor(Math.random() * possible.length));
|
||||||
|
}
|
||||||
|
socket.send(JSON.stringify(message));
|
||||||
};
|
};
|
||||||
Autoupdate.closeConnection = function () {
|
Autoupdate.closeConnection = function () {
|
||||||
if (socket) {
|
if (socket) {
|
||||||
@ -369,7 +382,12 @@ angular.module('OpenSlidesApp.core', [
|
|||||||
'dsEject',
|
'dsEject',
|
||||||
function (DS, autoupdate, dsEject) {
|
function (DS, autoupdate, dsEject) {
|
||||||
// Handler for normal autoupdate messages.
|
// Handler for normal autoupdate messages.
|
||||||
autoupdate.onMessage(function(dataList) {
|
autoupdate.onMessage(function(data) {
|
||||||
|
if (data.type !== 'autoupdate') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var dataList = data.content;
|
||||||
var dataListByCollection = _.groupBy(dataList, 'collection');
|
var dataListByCollection = _.groupBy(dataList, 'collection');
|
||||||
_.forEach(dataListByCollection, function (list, key) {
|
_.forEach(dataListByCollection, function (list, key) {
|
||||||
var changedElements = [];
|
var changedElements = [];
|
||||||
@ -379,8 +397,6 @@ angular.module('OpenSlidesApp.core', [
|
|||||||
// Uncomment this line for debugging to log all autoupdates:
|
// Uncomment this line for debugging to log all autoupdates:
|
||||||
// console.log("Received object: " + data.collection + ", " + data.id);
|
// console.log("Received object: " + data.collection + ", " + data.id);
|
||||||
|
|
||||||
// Now handle autoupdate message but do not handle notify messages.
|
|
||||||
if (data.collection !== 'notify') {
|
|
||||||
// remove (=eject) object from local DS store
|
// remove (=eject) object from local DS store
|
||||||
var instance = DS.get(data.collection, data.id);
|
var instance = DS.get(data.collection, data.id);
|
||||||
if (instance) {
|
if (instance) {
|
||||||
@ -395,7 +411,6 @@ angular.module('OpenSlidesApp.core', [
|
|||||||
console.error('Error: Undefined action for received object' +
|
console.error('Error: Undefined action for received object' +
|
||||||
'(' + data.collection + ', ' + data.id + ')');
|
'(' + data.collection + ', ' + data.id + ')');
|
||||||
}
|
}
|
||||||
}
|
|
||||||
});
|
});
|
||||||
// add (=inject) all given objects into local DS store
|
// add (=inject) all given objects into local DS store
|
||||||
if (changedElements.length > 0) {
|
if (changedElements.length > 0) {
|
||||||
@ -418,7 +433,12 @@ angular.module('OpenSlidesApp.core', [
|
|||||||
var anonymousTrackId;
|
var anonymousTrackId;
|
||||||
|
|
||||||
// Handler for notify messages.
|
// Handler for notify messages.
|
||||||
autoupdate.onMessage(function(dataList) {
|
autoupdate.onMessage(function(data) {
|
||||||
|
if (data.type !== 'notify') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var dataList = data.content;
|
||||||
var dataListByCollection = _.groupBy(dataList, 'collection');
|
var dataListByCollection = _.groupBy(dataList, 'collection');
|
||||||
_.forEach(dataListByCollection.notify, function (notifyItem) {
|
_.forEach(dataListByCollection.notify, function (notifyItem) {
|
||||||
// Check, if this current user (or anonymous instance) has send this notify.
|
// Check, if this current user (or anonymous instance) has send this notify.
|
||||||
@ -505,7 +525,7 @@ angular.module('OpenSlidesApp.core', [
|
|||||||
}
|
}
|
||||||
notifyItem.anonymousTrackId = anonymousTrackId;
|
notifyItem.anonymousTrackId = anonymousTrackId;
|
||||||
}
|
}
|
||||||
autoupdate.send([notifyItem]);
|
autoupdate.send('notify', [notifyItem]);
|
||||||
} else {
|
} else {
|
||||||
throw 'eventName should only consist of [a-zA-Z0-9_-]';
|
throw 'eventName should only consist of [a-zA-Z0-9_-]';
|
||||||
}
|
}
|
||||||
@ -514,6 +534,18 @@ angular.module('OpenSlidesApp.core', [
|
|||||||
}
|
}
|
||||||
])
|
])
|
||||||
|
|
||||||
|
.run([
|
||||||
|
'autoupdate',
|
||||||
|
function (autoupdate) {
|
||||||
|
// Handler for normal autoupdate messages.
|
||||||
|
autoupdate.onMessage(function (data) {
|
||||||
|
if (data.type === 'error') {
|
||||||
|
console.error("Websocket error", data.content);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
])
|
||||||
|
|
||||||
// Save the server time to the rootscope.
|
// Save the server time to the rootscope.
|
||||||
.run([
|
.run([
|
||||||
'$http',
|
'$http',
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
|
import jsonschema
|
||||||
from asgiref.sync import sync_to_async
|
from asgiref.sync import sync_to_async
|
||||||
from channels.db import database_sync_to_async
|
from channels.db import database_sync_to_async
|
||||||
from channels.generic.websocket import AsyncJsonWebsocketConsumer
|
from channels.generic.websocket import AsyncJsonWebsocketConsumer
|
||||||
@ -16,7 +17,73 @@ from .collection import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class SiteConsumer(AsyncJsonWebsocketConsumer):
|
class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
|
||||||
|
"""
|
||||||
|
Mixin for JSONWebsocketConsumers, that speaks the a special protocol.
|
||||||
|
"""
|
||||||
|
schema = {
|
||||||
|
"$schema": "http://json-schema.org/draft-04/schema#",
|
||||||
|
"title": "OpenSlidesWebsocketProtocol",
|
||||||
|
"description": "The base packages that OpenSlides sends between the server and the client.",
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"type": {
|
||||||
|
"description": "Defines what kind of packages is packed.",
|
||||||
|
"type": "string",
|
||||||
|
"pattern": "notify", # The server can sent other types
|
||||||
|
},
|
||||||
|
"content": {
|
||||||
|
"description": "The content of the package.",
|
||||||
|
},
|
||||||
|
"id": {
|
||||||
|
"description": "An identifier of the package.",
|
||||||
|
"type": "string",
|
||||||
|
},
|
||||||
|
"in_response": {
|
||||||
|
"description": "The id of another package that the other part sent before.",
|
||||||
|
"type": "string",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": ["type", "content", "id"],
|
||||||
|
}
|
||||||
|
|
||||||
|
async def send_json(self, type: str, content: Any, id: Optional[str] = None, in_response: Optional[str] = None) -> None:
|
||||||
|
"""
|
||||||
|
Sends the data with the type.
|
||||||
|
"""
|
||||||
|
out = {'type': type, 'content': content}
|
||||||
|
if id:
|
||||||
|
out['id'] = id
|
||||||
|
if in_response:
|
||||||
|
out['in_response'] = in_response
|
||||||
|
await super().send_json(out)
|
||||||
|
|
||||||
|
async def receive_json(self, content: Any) -> None:
|
||||||
|
"""
|
||||||
|
Receives the json data, parses it and calls receive_content.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
jsonschema.validate(content, self.schema)
|
||||||
|
except jsonschema.ValidationError as err:
|
||||||
|
try:
|
||||||
|
in_response = content['id']
|
||||||
|
except (TypeError, KeyError):
|
||||||
|
# content is not a dict (TypeError) or has not the key id (KeyError)
|
||||||
|
in_response = None
|
||||||
|
|
||||||
|
await self.send_json(
|
||||||
|
type='error',
|
||||||
|
content=str(err),
|
||||||
|
in_response=in_response)
|
||||||
|
return
|
||||||
|
|
||||||
|
await self.receive_content(type=content['type'], content=content['content'], id=content['id'])
|
||||||
|
|
||||||
|
async def receive_content(self, type: str, content: object, id: str) -> None:
|
||||||
|
raise NotImplementedError("ProtocollAsyncJsonWebsocketConsumer needs the method receive_content()")
|
||||||
|
|
||||||
|
|
||||||
|
class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
||||||
"""
|
"""
|
||||||
Websocket Consumer for the site.
|
Websocket Consumer for the site.
|
||||||
"""
|
"""
|
||||||
@ -36,9 +103,9 @@ class SiteConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
else:
|
else:
|
||||||
await self.accept()
|
await self.accept()
|
||||||
data = await startup_data(self.scope['user'])
|
data = await startup_data(self.scope['user'])
|
||||||
await self.send_json(data)
|
await self.send_json(type='autoupdate', content=data)
|
||||||
|
|
||||||
async def receive_json(self, content: Any) -> None:
|
async def receive_content(self, type: str, content: Any, id: str) -> None:
|
||||||
"""
|
"""
|
||||||
If we recieve something from the client we currently just interpret this
|
If we recieve something from the client we currently just interpret this
|
||||||
as a notify message.
|
as a notify message.
|
||||||
@ -47,6 +114,7 @@ class SiteConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
channel name so that a receiver client may reply to the sender or to all
|
channel name so that a receiver client may reply to the sender or to all
|
||||||
sender's instances.
|
sender's instances.
|
||||||
"""
|
"""
|
||||||
|
if type == 'notify':
|
||||||
if notify_message_is_valid(content):
|
if notify_message_is_valid(content):
|
||||||
await self.channel_layer.group_send(
|
await self.channel_layer.group_send(
|
||||||
"projector",
|
"projector",
|
||||||
@ -67,7 +135,7 @@ class SiteConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await self.send_json({'error': 'invalid message'})
|
await self.send_json(type='error', content='Invalid notify message', in_response=id)
|
||||||
|
|
||||||
async def send_notify(self, event: Dict[str, Any]) -> None:
|
async def send_notify(self, event: Dict[str, Any]) -> None:
|
||||||
"""
|
"""
|
||||||
@ -89,7 +157,7 @@ class SiteConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
out.append(item)
|
out.append(item)
|
||||||
|
|
||||||
if out:
|
if out:
|
||||||
await self.send_json(out)
|
await self.send_json(type='notify', content=out)
|
||||||
|
|
||||||
async def send_data(self, event: Dict[str, Any]) -> None:
|
async def send_data(self, event: Dict[str, Any]) -> None:
|
||||||
"""
|
"""
|
||||||
@ -111,10 +179,10 @@ class SiteConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
collection_string=collection_string,
|
collection_string=collection_string,
|
||||||
id=id,
|
id=id,
|
||||||
action='deleted'))
|
action='deleted'))
|
||||||
await self.send_json(output)
|
await self.send_json(type='autoupdate', content=output)
|
||||||
|
|
||||||
|
|
||||||
class ProjectorConsumer(AsyncJsonWebsocketConsumer):
|
class ProjectorConsumer(ProtocollAsyncJsonWebsocketConsumer):
|
||||||
"""
|
"""
|
||||||
Websocket Consumer for the projector.
|
Websocket Consumer for the projector.
|
||||||
"""
|
"""
|
||||||
@ -131,14 +199,14 @@ class ProjectorConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
await self.accept()
|
await self.accept()
|
||||||
|
|
||||||
if not await database_sync_to_async(has_perm)(user, 'core.can_see_projector'):
|
if not await database_sync_to_async(has_perm)(user, 'core.can_see_projector'):
|
||||||
await self.send_json({'text': 'No permissions to see this projector.'})
|
await self.send_json(type='error', content='No permissions to see this projector.')
|
||||||
# TODO: Shouldend we just close the websocket connection with an error message?
|
# TODO: Shouldend we just close the websocket connection with an error message?
|
||||||
# self.close(code=4403)
|
# self.close(code=4403)
|
||||||
else:
|
else:
|
||||||
out = await sync_to_async(projector_startup_data)(projector_id)
|
out = await sync_to_async(projector_startup_data)(projector_id)
|
||||||
await self.send_json(out)
|
await self.send_json(type='autoupdate', content=out)
|
||||||
|
|
||||||
async def receive_json(self, content: Any) -> None:
|
async def receive_content(self, type: str, content: Any, id: str) -> None:
|
||||||
"""
|
"""
|
||||||
If we recieve something from the client we currently just interpret this
|
If we recieve something from the client we currently just interpret this
|
||||||
as a notify message.
|
as a notify message.
|
||||||
@ -187,7 +255,7 @@ class ProjectorConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
out.append(item)
|
out.append(item)
|
||||||
|
|
||||||
if out:
|
if out:
|
||||||
await self.send_json(out)
|
await self.send_json(type='notify', content=out)
|
||||||
|
|
||||||
async def send_data(self, event: Dict[str, Any]) -> None:
|
async def send_data(self, event: Dict[str, Any]) -> None:
|
||||||
"""
|
"""
|
||||||
@ -198,7 +266,7 @@ class ProjectorConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
|
|
||||||
output = await projector_sync_send_data(projector_id, collection_elements)
|
output = await projector_sync_send_data(projector_id, collection_elements)
|
||||||
if output:
|
if output:
|
||||||
await self.send_json(output)
|
await self.send_json(type='autoupdate', content=output)
|
||||||
|
|
||||||
|
|
||||||
async def startup_data(user: Optional[CollectionElement], change_id: int = 0) -> List[Any]:
|
async def startup_data(user: Optional[CollectionElement], change_id: int = 0) -> List[Any]:
|
||||||
@ -287,19 +355,33 @@ def notify_message_is_valid(message: object) -> bool:
|
|||||||
"""
|
"""
|
||||||
Returns True, when the message is a valid notify_message.
|
Returns True, when the message is a valid notify_message.
|
||||||
"""
|
"""
|
||||||
if not isinstance(message, list):
|
schema = {
|
||||||
# message has to be a list
|
"$schema": "http://json-schema.org/draft-04/schema#",
|
||||||
|
"title": "Notify elements.",
|
||||||
|
"description": "Elements that one client can send to one or many other clients.",
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"projectors": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {"type": "integer"},
|
||||||
|
},
|
||||||
|
"reply_channels": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {"type": "string"},
|
||||||
|
},
|
||||||
|
"users": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {"type": "integer"},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"minItems": 1,
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
jsonschema.validate(message, schema)
|
||||||
|
except jsonschema.ValidationError:
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
if not message:
|
|
||||||
# message must contain at least one element
|
|
||||||
return False
|
|
||||||
|
|
||||||
for element in message:
|
|
||||||
if not isinstance(element, dict):
|
|
||||||
# All elements have to be a dict
|
|
||||||
return False
|
|
||||||
# TODO: There could be more checks. For example 'users' has to be a list of int
|
|
||||||
# Check could be done with json-schema:
|
|
||||||
# https://pypi.org/project/jsonschema/
|
|
||||||
return True
|
return True
|
||||||
|
@ -5,6 +5,7 @@ daphne>=2.2,<2.3
|
|||||||
Django>=1.11,<2.2
|
Django>=1.11,<2.2
|
||||||
djangorestframework>=3.4,<3.9
|
djangorestframework>=3.4,<3.9
|
||||||
jsonfield2>=3.0,<3.1
|
jsonfield2>=3.0,<3.1
|
||||||
|
jsonschema>=2.6.0<2.7
|
||||||
mypy_extensions>=0.3,<0.4
|
mypy_extensions>=0.3,<0.4
|
||||||
PyPDF2>=1.26,<1.27
|
PyPDF2>=1.26,<1.27
|
||||||
roman>=2.0,<3.1
|
roman>=2.0,<3.1
|
||||||
|
@ -56,8 +56,11 @@ async def test_normal_connection(communicator):
|
|||||||
|
|
||||||
response = await communicator.receive_json_from()
|
response = await communicator.receive_json_from()
|
||||||
|
|
||||||
# Test, that there is a lot of startup data.
|
type = response.get('type')
|
||||||
assert len(response) > 5
|
content = response.get('content')
|
||||||
|
assert type == 'autoupdate'
|
||||||
|
# Test, that both example objects are returned
|
||||||
|
assert len(content) > 10
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@ -71,7 +74,10 @@ async def test_receive_changed_data(communicator):
|
|||||||
response = await communicator.receive_json_from()
|
response = await communicator.receive_json_from()
|
||||||
|
|
||||||
id = config.get_key_to_id()['general_event_name']
|
id = config.get_key_to_id()['general_event_name']
|
||||||
assert response == [
|
type = response.get('type')
|
||||||
|
content = response.get('content')
|
||||||
|
assert type == 'autoupdate'
|
||||||
|
assert content == [
|
||||||
{'action': 'changed',
|
{'action': 'changed',
|
||||||
'collection': 'core/config',
|
'collection': 'core/config',
|
||||||
'data': {'id': id, 'key': 'general_event_name', 'value': 'Test Event'},
|
'data': {'id': id, 'key': 'general_event_name', 'value': 'Test Event'},
|
||||||
@ -115,7 +121,10 @@ async def test_receive_deleted_data(communicator):
|
|||||||
await sync_to_async(inform_deleted_data)([(Collection1().get_collection_string(), 1)])
|
await sync_to_async(inform_deleted_data)([(Collection1().get_collection_string(), 1)])
|
||||||
response = await communicator.receive_json_from()
|
response = await communicator.receive_json_from()
|
||||||
|
|
||||||
assert response == [{'action': 'deleted', 'collection': Collection1().get_collection_string(), 'id': 1}]
|
type = response.get('type')
|
||||||
|
content = response.get('content')
|
||||||
|
assert type == 'autoupdate'
|
||||||
|
assert content == [{'action': 'deleted', 'collection': Collection1().get_collection_string(), 'id': 1}]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@ -125,11 +134,12 @@ async def test_send_invalid_notify_not_a_list(communicator):
|
|||||||
# Await the startup data
|
# Await the startup data
|
||||||
await communicator.receive_json_from()
|
await communicator.receive_json_from()
|
||||||
|
|
||||||
await communicator.send_json_to({'testmessage': 'foobar, what else.'})
|
await communicator.send_json_to({'type': 'notify', 'content': {'testmessage': 'foobar, what else.'}, 'id': 'test_send_invalid_notify_not_a_list'})
|
||||||
|
|
||||||
response = await communicator.receive_json_from()
|
response = await communicator.receive_json_from()
|
||||||
|
|
||||||
assert response == {'error': 'invalid message'}
|
assert response['type'] == 'error'
|
||||||
|
assert response['content'] == 'Invalid notify message'
|
||||||
|
assert response['in_response'] == 'test_send_invalid_notify_not_a_list'
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@ -139,11 +149,12 @@ async def test_send_invalid_notify_no_elements(communicator):
|
|||||||
# Await the startup data
|
# Await the startup data
|
||||||
await communicator.receive_json_from()
|
await communicator.receive_json_from()
|
||||||
|
|
||||||
await communicator.send_json_to([])
|
await communicator.send_json_to({'type': 'notify', 'content': [], 'id': 'test_send_invalid_notify_no_elements'})
|
||||||
|
|
||||||
response = await communicator.receive_json_from()
|
response = await communicator.receive_json_from()
|
||||||
|
|
||||||
assert response == {'error': 'invalid message'}
|
assert response['type'] == 'error'
|
||||||
|
assert response['content'] == 'Invalid notify message'
|
||||||
|
assert response['in_response'] == 'test_send_invalid_notify_no_elements'
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@ -153,11 +164,12 @@ async def test_send_invalid_notify_str_in_list(communicator):
|
|||||||
# Await the startup data
|
# Await the startup data
|
||||||
await communicator.receive_json_from()
|
await communicator.receive_json_from()
|
||||||
|
|
||||||
await communicator.send_json_to([{}, 'testmessage'])
|
await communicator.send_json_to({'type': 'notify', 'content': [{}, 'testmessage'], 'id': 'test_send_invalid_notify_str_in_list'})
|
||||||
|
|
||||||
response = await communicator.receive_json_from()
|
response = await communicator.receive_json_from()
|
||||||
|
|
||||||
assert response == {'error': 'invalid message'}
|
assert response['type'] == 'error'
|
||||||
|
assert response['content'] == 'Invalid notify message'
|
||||||
|
assert response['in_response'] == 'test_send_invalid_notify_str_in_list'
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@ -167,12 +179,65 @@ async def test_send_valid_notify(communicator):
|
|||||||
# Await the startup data
|
# Await the startup data
|
||||||
await communicator.receive_json_from()
|
await communicator.receive_json_from()
|
||||||
|
|
||||||
await communicator.send_json_to([{'testmessage': 'foobar, what else.'}])
|
await communicator.send_json_to({'type': 'notify', 'content': [{'testmessage': 'foobar, what else.'}], 'id': 'test'})
|
||||||
|
|
||||||
response = await communicator.receive_json_from()
|
response = await communicator.receive_json_from()
|
||||||
|
|
||||||
assert isinstance(response, list)
|
content = response['content']
|
||||||
assert len(response) == 1
|
assert isinstance(content, list)
|
||||||
assert response[0]['testmessage'] == 'foobar, what else.'
|
assert len(content) == 1
|
||||||
assert 'senderReplyChannelName' in response[0]
|
assert content[0]['testmessage'] == 'foobar, what else.'
|
||||||
assert response[0]['senderUserId'] == 0
|
assert 'senderReplyChannelName' in content[0]
|
||||||
|
assert content[0]['senderUserId'] == 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_invalid_websocket_message_type(communicator):
|
||||||
|
await set_config('general_system_enable_anonymous', True)
|
||||||
|
await communicator.connect()
|
||||||
|
# Await the startup data
|
||||||
|
await communicator.receive_json_from()
|
||||||
|
|
||||||
|
await communicator.send_json_to([])
|
||||||
|
|
||||||
|
response = await communicator.receive_json_from()
|
||||||
|
assert response['type'] == 'error'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_invalid_websocket_message_no_id(communicator):
|
||||||
|
await set_config('general_system_enable_anonymous', True)
|
||||||
|
await communicator.connect()
|
||||||
|
# Await the startup data
|
||||||
|
await communicator.receive_json_from()
|
||||||
|
|
||||||
|
await communicator.send_json_to({'type': 'test', 'content': 'foobar'})
|
||||||
|
|
||||||
|
response = await communicator.receive_json_from()
|
||||||
|
assert response['type'] == 'error'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_invalid_websocket_message_no_content(communicator):
|
||||||
|
await set_config('general_system_enable_anonymous', True)
|
||||||
|
await communicator.connect()
|
||||||
|
# Await the startup data
|
||||||
|
await communicator.receive_json_from()
|
||||||
|
|
||||||
|
await communicator.send_json_to({'type': 'test', 'id': 'test_id'})
|
||||||
|
|
||||||
|
response = await communicator.receive_json_from()
|
||||||
|
assert response['type'] == 'error'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_unknown_type(communicator):
|
||||||
|
await set_config('general_system_enable_anonymous', True)
|
||||||
|
await communicator.connect()
|
||||||
|
# Await the startup data
|
||||||
|
await communicator.receive_json_from()
|
||||||
|
|
||||||
|
await communicator.send_json_to({'type': 'if_you_add_this_type_to_openslides_I_will_be_sad', 'content': True, 'id': 'test_id'})
|
||||||
|
|
||||||
|
response = await communicator.receive_json_from()
|
||||||
|
assert response['type'] == 'error'
|
||||||
|
assert response['in_response'] == 'test_id'
|
||||||
|
@ -43,21 +43,8 @@ DATABASES = {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# Configure session in the cache
|
|
||||||
|
|
||||||
CACHES = {
|
|
||||||
'default': {
|
|
||||||
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
|
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
|
||||||
|
|
||||||
# When use_redis is True, the restricted data cache caches the data individuel
|
|
||||||
# for each user. This requires a lot of memory if there are a lot of active
|
|
||||||
# users. If use_redis is False, this setting has no effect.
|
|
||||||
DISABLE_USER_CACHE = False
|
|
||||||
|
|
||||||
# Internationalization
|
# Internationalization
|
||||||
# https://docs.djangoproject.com/en/1.10/topics/i18n/
|
# https://docs.djangoproject.com/en/1.10/topics/i18n/
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user