Merge pull request #4607 from FinnStutzenstein/prioritizeClients

Prioritize clients and WS ping-pong-mechanism
This commit is contained in:
Finn Stutzenstein 2019-05-07 14:04:09 +02:00 committed by GitHub
commit c560f511b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 315 additions and 46 deletions

View File

@ -13,6 +13,8 @@ import { ServertimeService } from './core/core-services/servertime.service';
import { ThemeService } from './core/ui-services/theme.service'; import { ThemeService } from './core/ui-services/theme.service';
import { DataStoreUpgradeService } from './core/core-services/data-store-upgrade.service'; import { DataStoreUpgradeService } from './core/core-services/data-store-upgrade.service';
import { UpdateService } from './core/ui-services/update.service'; import { UpdateService } from './core/ui-services/update.service';
import { PrioritizeService } from './core/core-services/prioritize.service';
import { PingService } from './core/core-services/ping.service';
/** /**
* Angular's global App Component * Angular's global App Component
@ -54,7 +56,9 @@ export class AppComponent {
configService: ConfigService, configService: ConfigService,
loadFontService: LoadFontService, loadFontService: LoadFontService,
dataStoreUpgradeService: DataStoreUpgradeService, // to start it. dataStoreUpgradeService: DataStoreUpgradeService, // to start it.
update: UpdateService update: UpdateService,
prioritizeService: PrioritizeService,
pingService: PingService
) { ) {
// manually add the supported languages // manually add the supported languages
translate.addLangs(['en', 'de', 'cs']); translate.addLangs(['en', 'de', 'cs']);

View File

@ -78,7 +78,11 @@ export class HttpService {
responseType = 'json'; responseType = 'json';
} }
const url = path + formatQueryParams(queryParams); let url = path + formatQueryParams(queryParams);
if (this.OSStatus.isPrioritizedClient) {
url = '/prioritize' + url;
}
const options = { const options = {
body: data, body: data,
headers: customHeader ? customHeader : this.defaultHeaders, headers: customHeader ? customHeader : this.defaultHeaders,

View File

@ -22,6 +22,8 @@ export class OpenSlidesStatusService {
return !!this.history; return !!this.history;
} }
public isPrioritizedClient = false;
/** /**
* Ctor, does nothing. * Ctor, does nothing.
*/ */

View File

@ -120,9 +120,9 @@ export class OpenSlidesService {
// is changed, the WS needs to reconnect, so the new connection holds the new // is changed, the WS needs to reconnect, so the new connection holds the new
// user information. // user information.
if (this.websocketService.isConnected) { if (this.websocketService.isConnected) {
await this.websocketService.close(); await this.websocketService.close(); // Wait for the disconnect.
} }
this.websocketService.connect({ changeId: changeId }); // Request changes after changeId. await this.websocketService.connect({ changeId: changeId }); // Request changes after changeId.
} }
/** /**

View File

@ -308,13 +308,21 @@ export class OperatorService implements OnAfterAppsLoaded {
* @param groups The group ids to check * @param groups The group ids to check
*/ */
public isInGroupIds(...groupIds: number[]): boolean { public isInGroupIds(...groupIds: number[]): boolean {
if (!this.isInGroupIdsNonAdminCheck(...groupIds)) {
// An admin has all perms and is technically in every group.
return this.user.groups_id.includes(2);
}
return true;
}
/**
* Returns true, if the operator is in at least one group.
* @param groups The group ids to check
*/
public isInGroupIdsNonAdminCheck(...groupIds: number[]): boolean {
if (!this.user) { if (!this.user) {
return groupIds.includes(1); // any anonymous is in the default group. return groupIds.includes(1); // any anonymous is in the default group.
} }
if (this.user.groups_id.includes(2)) {
// An admin has all perms and is technically in every group.
return true;
}
return groupIds.some(id => this.user.groups_id.includes(id)); return groupIds.some(id => this.user.groups_id.includes(id));
} }

View File

@ -0,0 +1,17 @@
import { TestBed, inject } from '@angular/core/testing';
import { E2EImportsModule } from '../../../e2e-imports.module';
import { PingService } from './ping.service';
describe('PingService', () => {
beforeEach(() => {
TestBed.configureTestingModule({
imports: [E2EImportsModule],
providers: [PingService]
});
});
it('should be created', inject([PingService], (service: PingService) => {
expect(service).toBeTruthy();
}));
});

View File

@ -0,0 +1,100 @@
import { Injectable, ApplicationRef } from '@angular/core';
import { first, take } from 'rxjs/operators';
import { WebsocketService } from './websocket.service';
import { TimeoutPromise } from '../timeout-promise';
import { ConstantsService } from './constants.service';
import { Deferred } from '../deferred';
interface OpenSlidesSettings {
PING_INTERVAL?: number;
PING_TIMEOUT?: number;
}
@Injectable({
providedIn: 'root'
})
export class PingService {
/**
* The interval.
*/
private pingInterval: any;
private intervalTime: number;
private timeoutTime: number;
private lastLatency: number | null = null;
public constructor(
private websocketService: WebsocketService,
private appRef: ApplicationRef,
private constantsService: ConstantsService
) {
this.setup();
}
private async setup(): Promise<void> {
const gotConstants = new Deferred();
const isStable = new Deferred();
this.constantsService
.get<OpenSlidesSettings>('Settings')
.pipe(take(1))
.subscribe(settings => {
this.intervalTime = settings.PING_INTERVAL || 30000;
this.timeoutTime = settings.PING_TIMEOUT || 5000;
gotConstants.resolve();
});
this.appRef.isStable.pipe(first(s => s)).subscribe(() => {
isStable.resolve();
});
await Promise.all([gotConstants.promise, isStable.promise]);
// Connects the ping-pong mechanism to the opening and closing of the connection.
this.websocketService.closeEvent.subscribe(() => this.stopPing());
this.websocketService.connectEvent.subscribe(() => this.startPing());
if (this.websocketService.isConnected) {
this.startPing();
}
}
/**
* Starts the ping-mechanism
*/
private startPing(): void {
if (this.pingInterval) {
return;
}
this.pingInterval = setInterval(async () => {
const start = performance.now();
try {
await TimeoutPromise(
this.websocketService.sendAndGetResponse('ping', this.lastLatency),
this.timeoutTime
);
this.lastLatency = performance.now() - start;
if (this.lastLatency > 1000) {
console.warn(`Ping took ${this.lastLatency / 1000} seconds.`);
}
} catch (e) {
console.warn(`The server didn't respond to ping within ${this.timeoutTime / 1000} seconds.`);
this.stopPing();
this.websocketService.simulateAbnormalClose();
}
}, this.intervalTime);
}
/**
* Clears the ping interval
*/
private stopPing(): void {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
}
}

View File

@ -0,0 +1,17 @@
import { TestBed, inject } from '@angular/core/testing';
import { PrioritizeService } from './prioritize.service';
import { E2EImportsModule } from '../../../e2e-imports.module';
describe('PrioritizeService', () => {
beforeEach(() => {
TestBed.configureTestingModule({
imports: [E2EImportsModule],
providers: [PrioritizeService]
});
});
it('should be created', inject([PrioritizeService], (service: PrioritizeService) => {
expect(service).toBeTruthy();
}));
});

View File

@ -0,0 +1,46 @@
import { Injectable } from '@angular/core';
import { WebsocketService } from './websocket.service';
import { ConstantsService } from './constants.service';
import { OpenSlidesStatusService } from './openslides-status.service';
import { OperatorService } from './operator.service';
import { DataStoreService } from './data-store.service';
interface OpenSlidesSettings {
PRIORITIZED_GROUP_IDS?: number[];
}
/**
* Cares about prioritizing a client. Checks, if the operator is in one of
* some prioritized groups. These group ids come from the server. If the prio-
* ritization changes, the websocket connection gets reconnected.
*/
@Injectable({
providedIn: 'root'
})
export class PrioritizeService {
private prioritizedGroupIds: number[] = [];
public constructor(
constantsService: ConstantsService,
private websocketService: WebsocketService,
private DS: DataStoreService,
private openSlidesStatusService: OpenSlidesStatusService,
private operator: OperatorService
) {
constantsService.get<OpenSlidesSettings>('Settings').subscribe(settings => {
this.prioritizedGroupIds = settings.PRIORITIZED_GROUP_IDS || [];
this.checkPrioritization();
});
operator.getUserObservable().subscribe(() => this.checkPrioritization());
}
private checkPrioritization(): void {
const opPrioritized = this.operator.isInGroupIdsNonAdminCheck(...this.prioritizedGroupIds);
if (this.openSlidesStatusService.isPrioritizedClient !== opPrioritized) {
console.log('Alter prioritization:', opPrioritized);
this.openSlidesStatusService.isPrioritizedClient = opPrioritized;
this.websocketService.reconnect({ changeId: this.DS.maxChangeId });
}
}
}

View File

@ -2,11 +2,12 @@ import { Injectable, NgZone, EventEmitter } from '@angular/core';
import { MatSnackBar, MatSnackBarRef, SimpleSnackBar } from '@angular/material'; import { MatSnackBar, MatSnackBarRef, SimpleSnackBar } from '@angular/material';
import { Router } from '@angular/router'; import { Router } from '@angular/router';
import { TranslateService } from '@ngx-translate/core';
import { Observable, Subject } from 'rxjs'; import { Observable, Subject } from 'rxjs';
import { take } from 'rxjs/operators'; import { take } from 'rxjs/operators';
import { TranslateService } from '@ngx-translate/core';
import { formatQueryParams, QueryParams } from '../query-params'; import { formatQueryParams, QueryParams } from '../query-params';
import { OpenSlidesStatusService } from './openslides-status.service';
/** /**
* The generic message format in which messages are send and recieved by the server. * The generic message format in which messages are send and recieved by the server.
@ -31,6 +32,14 @@ interface IncommingWebsocketMessage extends BaseWebsocketMessage {
in_response?: string; in_response?: string;
} }
/*
* Options for (re-)connecting.
*/
interface ConnectOptions {
changeId?: number;
enableAutoupdates?: boolean;
}
/** /**
* Service that handles WebSocket connections. Other services can register themselfs * Service that handles WebSocket connections. Other services can register themselfs
* with {@method getOberservable} for a specific type of messages. The content will be published. * with {@method getOberservable} for a specific type of messages. The content will be published.
@ -104,6 +113,9 @@ export class WebsocketService {
*/ */
private subjects: { [type: string]: Subject<any> } = {}; private subjects: { [type: string]: Subject<any> } = {};
/**
* Callbacks for a waiting response
*/
private responseCallbacks: { [id: string]: [(val: any) => boolean, (error: string) => void | null] } = {}; private responseCallbacks: { [id: string]: [(val: any) => boolean, (error: string) => void | null] } = {};
/** /**
@ -128,7 +140,8 @@ export class WebsocketService {
private matSnackBar: MatSnackBar, private matSnackBar: MatSnackBar,
private zone: NgZone, private zone: NgZone,
private translate: TranslateService, private translate: TranslateService,
private router: Router private router: Router,
private openSlidesStatusService: OpenSlidesStatusService
) {} ) {}
/** /**
@ -136,15 +149,9 @@ export class WebsocketService {
* *
* Uses NgZone to let all callbacks run in the angular context. * Uses NgZone to let all callbacks run in the angular context.
*/ */
public connect( public async connect(options: ConnectOptions = {}, retry: boolean = false): Promise<void> {
options: {
changeId?: number;
enableAutoupdates?: boolean;
} = {},
retry: boolean = false
): void {
if (this.websocket) { if (this.websocket) {
this.close(); await this.close();
} }
if (!retry) { if (!retry) {
@ -166,7 +173,11 @@ export class WebsocketService {
// Create the websocket // Create the websocket
let socketPath = location.protocol === 'https:' ? 'wss://' : 'ws://'; let socketPath = location.protocol === 'https:' ? 'wss://' : 'ws://';
socketPath += window.location.host + '/ws/'; socketPath += window.location.host;
if (this.openSlidesStatusService.isPrioritizedClient) {
socketPath += '/prioritize';
}
socketPath += '/ws/';
socketPath += formatQueryParams(queryParams); socketPath += formatQueryParams(queryParams);
this.websocket = new WebSocket(socketPath); this.websocket = new WebSocket(socketPath);
@ -203,33 +214,7 @@ export class WebsocketService {
this.websocket.onclose = (event: CloseEvent) => { this.websocket.onclose = (event: CloseEvent) => {
this.zone.run(() => { this.zone.run(() => {
this.websocket = null; this.onclose(event.code === 1000);
this._connectionOpen = false;
// 1000 is a normal close, like the close on logout
this._closeEvent.emit();
if (!this.shouldBeClosed && event.code !== 1000) {
// Do not show the message snackbar on the projector
// tests for /projector and /projector/<id>
const onProjector = this.router.url.match(/^\/projector(\/[0-9]+\/?)?$/);
if (this.retryCounter <= 3) {
this.retryCounter++;
}
if (!this.connectionErrorNotice && !onProjector && this.retryCounter > 3) {
// So here we have a connection failure that wasn't intendet.
this.connectionErrorNotice = this.matSnackBar.open(
this.translate.instant('Offline mode: You can use OpenSlides but changes are not saved.'),
'',
{ duration: 0 }
);
}
// A random retry timeout between 2000 and 5000 ms.
const timeout = Math.floor(Math.random() * 3000 + 2000);
setTimeout(() => {
this.connect({ enableAutoupdates: true }, true);
}, timeout);
}
}); });
}; };
@ -281,9 +266,49 @@ export class WebsocketService {
} }
} }
/**
* Simulates an abnormal close.
*/
public simulateAbnormalClose(): void {
if (this.websocket) {
this.websocket.close();
}
this.onclose(false);
}
/** /**
* Closes the connection error notice * Closes the connection error notice
*/ */
private onclose(normalClose: boolean): void {
this.websocket = null;
this._connectionOpen = false;
// 1000 is a normal close, like the close on logout
this._closeEvent.emit();
if (!this.shouldBeClosed && !normalClose) {
// Do not show the message snackbar on the projector
// tests for /projector and /projector/<id>
const onProjector = this.router.url.match(/^\/projector(\/[0-9]+\/?)?$/);
if (this.retryCounter <= 3) {
this.retryCounter++;
}
if (!this.connectionErrorNotice && !onProjector && this.retryCounter > 3) {
// So here we have a connection failure that wasn't intendet.
this.connectionErrorNotice = this.matSnackBar.open(
this.translate.instant('Offline mode: You can use OpenSlides but changes are not saved.'),
'',
{ duration: 0 }
);
}
// A random retry timeout between 2000 and 5000 ms.
const timeout = Math.floor(Math.random() * 3000 + 2000);
setTimeout(() => {
this.connect({ enableAutoupdates: true }, true);
}, timeout);
}
}
private dismissConnectionErrorNotice(): void { private dismissConnectionErrorNotice(): void {
if (this.connectionErrorNotice) { if (this.connectionErrorNotice) {
this.connectionErrorNotice.dismiss(); this.connectionErrorNotice.dismiss();
@ -304,6 +329,17 @@ export class WebsocketService {
} }
} }
/**
* closes and reopens the connection. If the connection was closed before,
* it will be just opened.
*
* @param options The options for the new connection
*/
public async reconnect(options: ConnectOptions = {}): Promise<void> {
await this.close();
await this.connect(options);
}
/** /**
* Returns an observable for messages of the given type. * Returns an observable for messages of the given type.
* @param type the message type * @param type the message type

View File

@ -0,0 +1,11 @@
/**
* Wraps a promise and let it reject after the given timeout (in ms), if it was
* not resolved before this timeout.
*
* @param promise The promise to wrap
* @param timeout The timeout
* @returns a new Promise
*/
export function TimeoutPromise<T>(promise: Promise<T>, timeout: number): Promise<T> {
return Promise.race([promise, new Promise((_, reject) => setTimeout(reject, timeout))]) as Promise<T>;
}

View File

@ -43,6 +43,7 @@ class CoreAppConfig(AppConfig):
GetElementsWebsocketClientMessage, GetElementsWebsocketClientMessage,
AutoupdateWebsocketClientMessage, AutoupdateWebsocketClientMessage,
ListenToProjectors, ListenToProjectors,
PingPong,
) )
from ..utils.access_permissions import required_user from ..utils.access_permissions import required_user
from ..utils.rest_api import router from ..utils.rest_api import router
@ -109,6 +110,7 @@ class CoreAppConfig(AppConfig):
register_client_message(GetElementsWebsocketClientMessage()) register_client_message(GetElementsWebsocketClientMessage())
register_client_message(AutoupdateWebsocketClientMessage()) register_client_message(AutoupdateWebsocketClientMessage())
register_client_message(ListenToProjectors()) register_client_message(ListenToProjectors())
register_client_message(PingPong())
# register required_users # register required_users
required_user.add_collection_string( required_user.add_collection_string(
@ -147,6 +149,9 @@ class CoreAppConfig(AppConfig):
"MOTION_IDENTIFIER_MIN_DIGITS", "MOTION_IDENTIFIER_MIN_DIGITS",
"MOTION_IDENTIFIER_WITHOUT_BLANKS", "MOTION_IDENTIFIER_WITHOUT_BLANKS",
"MOTIONS_ALLOW_AMENDMENTS_OF_AMENDMENTS", "MOTIONS_ALLOW_AMENDMENTS_OF_AMENDMENTS",
"PRIORITIZED_GROUP_IDS",
"PING_INTERVAL",
"PING_TIMEOUT",
] ]
client_settings_dict = {} client_settings_dict = {}
for key in client_settings_keys: for key in client_settings_keys:

View File

@ -190,3 +190,22 @@ class ListenToProjectors(BaseWebsocketClientMessage):
await consumer.send_json( await consumer.send_json(
type="projector", content=projector_data, in_response=id type="projector", content=projector_data, in_response=id
) )
class PingPong(BaseWebsocketClientMessage):
"""
Responds to pings from the client.
"""
identifier = "ping"
schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"titel": "PingPong",
"description": "Does a ping pong handshake",
"anyOf": [{"type": "number"}, {"type": "null"}],
}
async def receive_content(
self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str
) -> None:
await consumer.send_json(type="pong", content=content, in_response=id)