From febbe9bba2bce38a3ad996646ad30738935a707c Mon Sep 17 00:00:00 2001 From: FinnStutzenstein Date: Mon, 15 Apr 2019 14:19:40 +0200 Subject: [PATCH] Prioritize clients with the right operator and added ping-pong mechanism --- client/src/app/app.component.ts | 6 +- .../app/core/core-services/http.service.ts | 6 +- .../openslides-status.service.ts | 2 + .../core/core-services/openslides.service.ts | 4 +- .../core/core-services/operator.service.ts | 16 ++- .../core/core-services/ping.service.spec.ts | 17 +++ .../app/core/core-services/ping.service.ts | 100 ++++++++++++++++ .../core-services/prioritize.service.spec.ts | 17 +++ .../core/core-services/prioritize.service.ts | 46 +++++++ .../core/core-services/websocket.service.ts | 112 ++++++++++++------ client/src/app/core/timeout-promise.ts | 11 ++ openslides/core/apps.py | 5 + openslides/core/websocket.py | 19 +++ 13 files changed, 315 insertions(+), 46 deletions(-) create mode 100644 client/src/app/core/core-services/ping.service.spec.ts create mode 100644 client/src/app/core/core-services/ping.service.ts create mode 100644 client/src/app/core/core-services/prioritize.service.spec.ts create mode 100644 client/src/app/core/core-services/prioritize.service.ts create mode 100644 client/src/app/core/timeout-promise.ts diff --git a/client/src/app/app.component.ts b/client/src/app/app.component.ts index b76f759d6..5b7b5c99f 100644 --- a/client/src/app/app.component.ts +++ b/client/src/app/app.component.ts @@ -13,6 +13,8 @@ import { ServertimeService } from './core/core-services/servertime.service'; import { ThemeService } from './core/ui-services/theme.service'; import { DataStoreUpgradeService } from './core/core-services/data-store-upgrade.service'; import { UpdateService } from './core/ui-services/update.service'; +import { PrioritizeService } from './core/core-services/prioritize.service'; +import { PingService } from './core/core-services/ping.service'; /** * Angular's global App Component @@ -54,7 +56,9 @@ export class AppComponent { configService: ConfigService, loadFontService: LoadFontService, dataStoreUpgradeService: DataStoreUpgradeService, // to start it. - update: UpdateService + update: UpdateService, + prioritizeService: PrioritizeService, + pingService: PingService ) { // manually add the supported languages translate.addLangs(['en', 'de', 'cs']); diff --git a/client/src/app/core/core-services/http.service.ts b/client/src/app/core/core-services/http.service.ts index 690c0898e..0265000d7 100644 --- a/client/src/app/core/core-services/http.service.ts +++ b/client/src/app/core/core-services/http.service.ts @@ -78,7 +78,11 @@ export class HttpService { responseType = 'json'; } - const url = path + formatQueryParams(queryParams); + let url = path + formatQueryParams(queryParams); + if (this.OSStatus.isPrioritizedClient) { + url = '/prioritize' + url; + } + const options = { body: data, headers: customHeader ? customHeader : this.defaultHeaders, diff --git a/client/src/app/core/core-services/openslides-status.service.ts b/client/src/app/core/core-services/openslides-status.service.ts index 90775c564..0829c761c 100644 --- a/client/src/app/core/core-services/openslides-status.service.ts +++ b/client/src/app/core/core-services/openslides-status.service.ts @@ -22,6 +22,8 @@ export class OpenSlidesStatusService { return !!this.history; } + public isPrioritizedClient = false; + /** * Ctor, does nothing. */ diff --git a/client/src/app/core/core-services/openslides.service.ts b/client/src/app/core/core-services/openslides.service.ts index 16e0568da..014595cec 100644 --- a/client/src/app/core/core-services/openslides.service.ts +++ b/client/src/app/core/core-services/openslides.service.ts @@ -120,9 +120,9 @@ export class OpenSlidesService { // is changed, the WS needs to reconnect, so the new connection holds the new // user information. if (this.websocketService.isConnected) { - await this.websocketService.close(); + await this.websocketService.close(); // Wait for the disconnect. } - this.websocketService.connect({ changeId: changeId }); // Request changes after changeId. + await this.websocketService.connect({ changeId: changeId }); // Request changes after changeId. } /** diff --git a/client/src/app/core/core-services/operator.service.ts b/client/src/app/core/core-services/operator.service.ts index c3e20a83a..81b58ecfe 100644 --- a/client/src/app/core/core-services/operator.service.ts +++ b/client/src/app/core/core-services/operator.service.ts @@ -308,13 +308,21 @@ export class OperatorService implements OnAfterAppsLoaded { * @param groups The group ids to check */ public isInGroupIds(...groupIds: number[]): boolean { + if (!this.isInGroupIdsNonAdminCheck(...groupIds)) { + // An admin has all perms and is technically in every group. + return this.user.groups_id.includes(2); + } + return true; + } + + /** + * Returns true, if the operator is in at least one group. + * @param groups The group ids to check + */ + public isInGroupIdsNonAdminCheck(...groupIds: number[]): boolean { if (!this.user) { return groupIds.includes(1); // any anonymous is in the default group. } - if (this.user.groups_id.includes(2)) { - // An admin has all perms and is technically in every group. - return true; - } return groupIds.some(id => this.user.groups_id.includes(id)); } diff --git a/client/src/app/core/core-services/ping.service.spec.ts b/client/src/app/core/core-services/ping.service.spec.ts new file mode 100644 index 000000000..c9121d692 --- /dev/null +++ b/client/src/app/core/core-services/ping.service.spec.ts @@ -0,0 +1,17 @@ +import { TestBed, inject } from '@angular/core/testing'; + +import { E2EImportsModule } from '../../../e2e-imports.module'; +import { PingService } from './ping.service'; + +describe('PingService', () => { + beforeEach(() => { + TestBed.configureTestingModule({ + imports: [E2EImportsModule], + providers: [PingService] + }); + }); + + it('should be created', inject([PingService], (service: PingService) => { + expect(service).toBeTruthy(); + })); +}); diff --git a/client/src/app/core/core-services/ping.service.ts b/client/src/app/core/core-services/ping.service.ts new file mode 100644 index 000000000..b684c99fd --- /dev/null +++ b/client/src/app/core/core-services/ping.service.ts @@ -0,0 +1,100 @@ +import { Injectable, ApplicationRef } from '@angular/core'; + +import { first, take } from 'rxjs/operators'; + +import { WebsocketService } from './websocket.service'; +import { TimeoutPromise } from '../timeout-promise'; +import { ConstantsService } from './constants.service'; +import { Deferred } from '../deferred'; + +interface OpenSlidesSettings { + PING_INTERVAL?: number; + PING_TIMEOUT?: number; +} + +@Injectable({ + providedIn: 'root' +}) +export class PingService { + /** + * The interval. + */ + private pingInterval: any; + + private intervalTime: number; + + private timeoutTime: number; + + private lastLatency: number | null = null; + + public constructor( + private websocketService: WebsocketService, + private appRef: ApplicationRef, + private constantsService: ConstantsService + ) { + this.setup(); + } + + private async setup(): Promise { + const gotConstants = new Deferred(); + const isStable = new Deferred(); + + this.constantsService + .get('Settings') + .pipe(take(1)) + .subscribe(settings => { + this.intervalTime = settings.PING_INTERVAL || 30000; + this.timeoutTime = settings.PING_TIMEOUT || 5000; + gotConstants.resolve(); + }); + this.appRef.isStable.pipe(first(s => s)).subscribe(() => { + isStable.resolve(); + }); + + await Promise.all([gotConstants.promise, isStable.promise]); + + // Connects the ping-pong mechanism to the opening and closing of the connection. + this.websocketService.closeEvent.subscribe(() => this.stopPing()); + this.websocketService.connectEvent.subscribe(() => this.startPing()); + if (this.websocketService.isConnected) { + this.startPing(); + } + } + + /** + * Starts the ping-mechanism + */ + private startPing(): void { + if (this.pingInterval) { + return; + } + + this.pingInterval = setInterval(async () => { + const start = performance.now(); + try { + await TimeoutPromise( + this.websocketService.sendAndGetResponse('ping', this.lastLatency), + this.timeoutTime + ); + this.lastLatency = performance.now() - start; + if (this.lastLatency > 1000) { + console.warn(`Ping took ${this.lastLatency / 1000} seconds.`); + } + } catch (e) { + console.warn(`The server didn't respond to ping within ${this.timeoutTime / 1000} seconds.`); + this.stopPing(); + this.websocketService.simulateAbnormalClose(); + } + }, this.intervalTime); + } + + /** + * Clears the ping interval + */ + private stopPing(): void { + if (this.pingInterval) { + clearInterval(this.pingInterval); + this.pingInterval = null; + } + } +} diff --git a/client/src/app/core/core-services/prioritize.service.spec.ts b/client/src/app/core/core-services/prioritize.service.spec.ts new file mode 100644 index 000000000..764290a72 --- /dev/null +++ b/client/src/app/core/core-services/prioritize.service.spec.ts @@ -0,0 +1,17 @@ +import { TestBed, inject } from '@angular/core/testing'; + +import { PrioritizeService } from './prioritize.service'; +import { E2EImportsModule } from '../../../e2e-imports.module'; + +describe('PrioritizeService', () => { + beforeEach(() => { + TestBed.configureTestingModule({ + imports: [E2EImportsModule], + providers: [PrioritizeService] + }); + }); + + it('should be created', inject([PrioritizeService], (service: PrioritizeService) => { + expect(service).toBeTruthy(); + })); +}); diff --git a/client/src/app/core/core-services/prioritize.service.ts b/client/src/app/core/core-services/prioritize.service.ts new file mode 100644 index 000000000..41c95c77e --- /dev/null +++ b/client/src/app/core/core-services/prioritize.service.ts @@ -0,0 +1,46 @@ +import { Injectable } from '@angular/core'; + +import { WebsocketService } from './websocket.service'; +import { ConstantsService } from './constants.service'; +import { OpenSlidesStatusService } from './openslides-status.service'; +import { OperatorService } from './operator.service'; +import { DataStoreService } from './data-store.service'; + +interface OpenSlidesSettings { + PRIORITIZED_GROUP_IDS?: number[]; +} + +/** + * Cares about prioritizing a client. Checks, if the operator is in one of + * some prioritized groups. These group ids come from the server. If the prio- + * ritization changes, the websocket connection gets reconnected. + */ +@Injectable({ + providedIn: 'root' +}) +export class PrioritizeService { + private prioritizedGroupIds: number[] = []; + + public constructor( + constantsService: ConstantsService, + private websocketService: WebsocketService, + private DS: DataStoreService, + private openSlidesStatusService: OpenSlidesStatusService, + private operator: OperatorService + ) { + constantsService.get('Settings').subscribe(settings => { + this.prioritizedGroupIds = settings.PRIORITIZED_GROUP_IDS || []; + this.checkPrioritization(); + }); + operator.getUserObservable().subscribe(() => this.checkPrioritization()); + } + + private checkPrioritization(): void { + const opPrioritized = this.operator.isInGroupIdsNonAdminCheck(...this.prioritizedGroupIds); + if (this.openSlidesStatusService.isPrioritizedClient !== opPrioritized) { + console.log('Alter prioritization:', opPrioritized); + this.openSlidesStatusService.isPrioritizedClient = opPrioritized; + this.websocketService.reconnect({ changeId: this.DS.maxChangeId }); + } + } +} diff --git a/client/src/app/core/core-services/websocket.service.ts b/client/src/app/core/core-services/websocket.service.ts index d70a9bce6..4221abe0d 100644 --- a/client/src/app/core/core-services/websocket.service.ts +++ b/client/src/app/core/core-services/websocket.service.ts @@ -2,11 +2,12 @@ import { Injectable, NgZone, EventEmitter } from '@angular/core'; import { MatSnackBar, MatSnackBarRef, SimpleSnackBar } from '@angular/material'; import { Router } from '@angular/router'; +import { TranslateService } from '@ngx-translate/core'; import { Observable, Subject } from 'rxjs'; import { take } from 'rxjs/operators'; -import { TranslateService } from '@ngx-translate/core'; import { formatQueryParams, QueryParams } from '../query-params'; +import { OpenSlidesStatusService } from './openslides-status.service'; /** * The generic message format in which messages are send and recieved by the server. @@ -31,6 +32,14 @@ interface IncommingWebsocketMessage extends BaseWebsocketMessage { in_response?: string; } +/* + * Options for (re-)connecting. + */ +interface ConnectOptions { + changeId?: number; + enableAutoupdates?: boolean; +} + /** * Service that handles WebSocket connections. Other services can register themselfs * with {@method getOberservable} for a specific type of messages. The content will be published. @@ -104,6 +113,9 @@ export class WebsocketService { */ private subjects: { [type: string]: Subject } = {}; + /** + * Callbacks for a waiting response + */ private responseCallbacks: { [id: string]: [(val: any) => boolean, (error: string) => void | null] } = {}; /** @@ -128,7 +140,8 @@ export class WebsocketService { private matSnackBar: MatSnackBar, private zone: NgZone, private translate: TranslateService, - private router: Router + private router: Router, + private openSlidesStatusService: OpenSlidesStatusService ) {} /** @@ -136,15 +149,9 @@ export class WebsocketService { * * Uses NgZone to let all callbacks run in the angular context. */ - public connect( - options: { - changeId?: number; - enableAutoupdates?: boolean; - } = {}, - retry: boolean = false - ): void { + public async connect(options: ConnectOptions = {}, retry: boolean = false): Promise { if (this.websocket) { - this.close(); + await this.close(); } if (!retry) { @@ -166,7 +173,11 @@ export class WebsocketService { // Create the websocket let socketPath = location.protocol === 'https:' ? 'wss://' : 'ws://'; - socketPath += window.location.host + '/ws/'; + socketPath += window.location.host; + if (this.openSlidesStatusService.isPrioritizedClient) { + socketPath += '/prioritize'; + } + socketPath += '/ws/'; socketPath += formatQueryParams(queryParams); this.websocket = new WebSocket(socketPath); @@ -203,33 +214,7 @@ export class WebsocketService { this.websocket.onclose = (event: CloseEvent) => { this.zone.run(() => { - this.websocket = null; - this._connectionOpen = false; - // 1000 is a normal close, like the close on logout - this._closeEvent.emit(); - if (!this.shouldBeClosed && event.code !== 1000) { - // Do not show the message snackbar on the projector - // tests for /projector and /projector/ - const onProjector = this.router.url.match(/^\/projector(\/[0-9]+\/?)?$/); - if (this.retryCounter <= 3) { - this.retryCounter++; - } - - if (!this.connectionErrorNotice && !onProjector && this.retryCounter > 3) { - // So here we have a connection failure that wasn't intendet. - this.connectionErrorNotice = this.matSnackBar.open( - this.translate.instant('Offline mode: You can use OpenSlides but changes are not saved.'), - '', - { duration: 0 } - ); - } - - // A random retry timeout between 2000 and 5000 ms. - const timeout = Math.floor(Math.random() * 3000 + 2000); - setTimeout(() => { - this.connect({ enableAutoupdates: true }, true); - }, timeout); - } + this.onclose(event.code === 1000); }); }; @@ -281,9 +266,49 @@ export class WebsocketService { } } + /** + * Simulates an abnormal close. + */ + public simulateAbnormalClose(): void { + if (this.websocket) { + this.websocket.close(); + } + this.onclose(false); + } + /** * Closes the connection error notice */ + private onclose(normalClose: boolean): void { + this.websocket = null; + this._connectionOpen = false; + // 1000 is a normal close, like the close on logout + this._closeEvent.emit(); + if (!this.shouldBeClosed && !normalClose) { + // Do not show the message snackbar on the projector + // tests for /projector and /projector/ + const onProjector = this.router.url.match(/^\/projector(\/[0-9]+\/?)?$/); + if (this.retryCounter <= 3) { + this.retryCounter++; + } + + if (!this.connectionErrorNotice && !onProjector && this.retryCounter > 3) { + // So here we have a connection failure that wasn't intendet. + this.connectionErrorNotice = this.matSnackBar.open( + this.translate.instant('Offline mode: You can use OpenSlides but changes are not saved.'), + '', + { duration: 0 } + ); + } + + // A random retry timeout between 2000 and 5000 ms. + const timeout = Math.floor(Math.random() * 3000 + 2000); + setTimeout(() => { + this.connect({ enableAutoupdates: true }, true); + }, timeout); + } + } + private dismissConnectionErrorNotice(): void { if (this.connectionErrorNotice) { this.connectionErrorNotice.dismiss(); @@ -304,6 +329,17 @@ export class WebsocketService { } } + /** + * closes and reopens the connection. If the connection was closed before, + * it will be just opened. + * + * @param options The options for the new connection + */ + public async reconnect(options: ConnectOptions = {}): Promise { + await this.close(); + await this.connect(options); + } + /** * Returns an observable for messages of the given type. * @param type the message type diff --git a/client/src/app/core/timeout-promise.ts b/client/src/app/core/timeout-promise.ts new file mode 100644 index 000000000..dc2b7a586 --- /dev/null +++ b/client/src/app/core/timeout-promise.ts @@ -0,0 +1,11 @@ +/** + * Wraps a promise and let it reject after the given timeout (in ms), if it was + * not resolved before this timeout. + * + * @param promise The promise to wrap + * @param timeout The timeout + * @returns a new Promise + */ +export function TimeoutPromise(promise: Promise, timeout: number): Promise { + return Promise.race([promise, new Promise((_, reject) => setTimeout(reject, timeout))]) as Promise; +} diff --git a/openslides/core/apps.py b/openslides/core/apps.py index b8a38f592..3d04c5e79 100644 --- a/openslides/core/apps.py +++ b/openslides/core/apps.py @@ -43,6 +43,7 @@ class CoreAppConfig(AppConfig): GetElementsWebsocketClientMessage, AutoupdateWebsocketClientMessage, ListenToProjectors, + PingPong, ) from ..utils.access_permissions import required_user from ..utils.rest_api import router @@ -109,6 +110,7 @@ class CoreAppConfig(AppConfig): register_client_message(GetElementsWebsocketClientMessage()) register_client_message(AutoupdateWebsocketClientMessage()) register_client_message(ListenToProjectors()) + register_client_message(PingPong()) # register required_users required_user.add_collection_string( @@ -147,6 +149,9 @@ class CoreAppConfig(AppConfig): "MOTION_IDENTIFIER_MIN_DIGITS", "MOTION_IDENTIFIER_WITHOUT_BLANKS", "MOTIONS_ALLOW_AMENDMENTS_OF_AMENDMENTS", + "PRIORITIZED_GROUP_IDS", + "PING_INTERVAL", + "PING_TIMEOUT", ] client_settings_dict = {} for key in client_settings_keys: diff --git a/openslides/core/websocket.py b/openslides/core/websocket.py index 47118bfa2..da9825cae 100644 --- a/openslides/core/websocket.py +++ b/openslides/core/websocket.py @@ -190,3 +190,22 @@ class ListenToProjectors(BaseWebsocketClientMessage): await consumer.send_json( type="projector", content=projector_data, in_response=id ) + + +class PingPong(BaseWebsocketClientMessage): + """ + Responds to pings from the client. + """ + + identifier = "ping" + schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "titel": "PingPong", + "description": "Does a ping pong handshake", + "anyOf": [{"type": "number"}, {"type": "null"}], + } + + async def receive_content( + self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str + ) -> None: + await consumer.send_json(type="pong", content=content, in_response=id)