From 138081292426730b1f8911e61cb5ddc581db9b53 Mon Sep 17 00:00:00 2001 From: Finn Stutzenstein Date: Wed, 27 Jan 2021 14:40:45 +0100 Subject: [PATCH] Fixes offline bar on successful reconnections (closes #5810). --- autoupdate | 2 +- .../autoupdate-throttle.service.ts | 19 ++++++++++-- .../core/core-services/autoupdate.service.ts | 1 + .../communication-manager.service.ts | 23 ++++++++------ .../streaming-communication.service.ts | 31 +++++++++++-------- 5 files changed, 50 insertions(+), 26 deletions(-) diff --git a/autoupdate b/autoupdate index b187dd439..03d86865c 160000 --- a/autoupdate +++ b/autoupdate @@ -1 +1 @@ -Subproject commit b187dd439bd7456e105901ca96cd0995862d4419 +Subproject commit 03d86865c063059878f4c3d616b6a6a9099b90d3 diff --git a/client/src/app/core/core-services/autoupdate-throttle.service.ts b/client/src/app/core/core-services/autoupdate-throttle.service.ts index e0127b9ef..77f05e537 100644 --- a/client/src/app/core/core-services/autoupdate-throttle.service.ts +++ b/client/src/app/core/core-services/autoupdate-throttle.service.ts @@ -68,6 +68,12 @@ export class AutoupdateThrottleService { this.disabledUntil = null; console.log('Throttling autoupdates again'); } + } else if (autoupdate.all_data) { + // all_data=true (aka initial data) should be processed immediatly + // but since there can be pending autoupdates, add it there and + // process them now! + this.pendingAutoupdates.push(autoupdate); + this.processPendingAutoupdates(); } else { this.pendingAutoupdates.push(autoupdate); this.receivedAutoupdate.emit(); @@ -91,12 +97,19 @@ export class AutoupdateThrottleService { this.disabledUntil = changeId; } + /** + * discard all pending autoupdates and resets the timer + */ + public discard(): void { + this.pendingAutoupdates = []; + } + private processPendingAutoupdates(): void { - const autoupdates = this.pendingAutoupdates; - if (autoupdates.length === 0) { + if (this.pendingAutoupdates.length === 0) { return; } - this.pendingAutoupdates = []; + const autoupdates = this.pendingAutoupdates; + this.discard(); console.log(`Processing ${autoupdates.length} pending autoupdates`); const autoupdate = this.mergeAutoupdates(autoupdates); diff --git a/client/src/app/core/core-services/autoupdate.service.ts b/client/src/app/core/core-services/autoupdate.service.ts index e478d796a..6f5d394ab 100644 --- a/client/src/app/core/core-services/autoupdate.service.ts +++ b/client/src/app/core/core-services/autoupdate.service.ts @@ -59,6 +59,7 @@ export class AutoupdateService { this.streamCloseFn(); this.streamCloseFn = null; } + this.autoupdateThrottle.discard(); } /** diff --git a/client/src/app/core/core-services/communication-manager.service.ts b/client/src/app/core/core-services/communication-manager.service.ts index 0a576ecaf..67f602154 100644 --- a/client/src/app/core/core-services/communication-manager.service.ts +++ b/client/src/app/core/core-services/communication-manager.service.ts @@ -54,7 +54,7 @@ export class CommunicationManagerService { return this._stopCommunicationEvent.asObservable(); } - private streamContainers: { [id: number]: StreamContainer } = {}; + private streamContainers: { [id: number]: StreamContainer } = {}; public constructor( private streamingCommunicationService: StreamingCommunicationService, @@ -87,7 +87,7 @@ export class CommunicationManagerService { } } - private async connectWithWrapper(streamContainer: StreamContainer): Promise<() => void> { + private async connectWithWrapper(streamContainer: StreamContainer): Promise<() => void> { console.log('connect', streamContainer, streamContainer.stream); const errorHandler = (type: ErrorType, error: CommunicationError, message: string) => this.handleError(streamContainer, type, error, message); @@ -96,8 +96,8 @@ export class CommunicationManagerService { return () => this.close(streamContainer); } - private async handleError( - streamContainer: StreamContainer, + private async handleError( + streamContainer: StreamContainer, type: ErrorType, error: CommunicationError, message: string @@ -109,7 +109,7 @@ export class CommunicationManagerService { streamContainer.hasErroredAmount++; if (streamContainer.hasErroredAmount > MAX_STREAM_FAILURE_RETRIES) { this.goOffline(streamContainer, OfflineReason.ConnectionLost); - } else if (type === ErrorType.Client && error.type === 'ErrorAuth') { + } else if (type === ErrorType.Client && error.type === 'auth_required') { this.goOffline(streamContainer, OfflineReason.WhoAmIFailed); } else { // retry it after some time: @@ -117,7 +117,7 @@ export class CommunicationManagerService { `Retry no. ${streamContainer.hasErroredAmount} of ${MAX_STREAM_FAILURE_RETRIES} for ${streamContainer.url}` ); try { - await this.delayAndCheckReconnection(); + await this.delayAndCheckReconnection(streamContainer); await this.connectWithWrapper(streamContainer); } catch (e) { // delayAndCheckReconnection can throw an OfflineError, @@ -126,8 +126,13 @@ export class CommunicationManagerService { } } - private async delayAndCheckReconnection(): Promise { - const delay = Math.floor(Math.random() * 3000 + 2000); + private async delayAndCheckReconnection(streamContainer: StreamContainer): Promise { + let delay; + if (streamContainer.hasErroredAmount === 1) { + delay = 500; // the first error has a small delay since these error can happen normally. + } else { + delay = Math.floor(Math.random() * 3000 + 2000); + } console.log(`retry again in ${delay} ms`); await SleepPromise(delay); @@ -156,7 +161,7 @@ export class CommunicationManagerService { this._stopCommunicationEvent.emit(); } - private goOffline(streamContainer: StreamContainer, reason: OfflineReason): void { + private goOffline(streamContainer: StreamContainer, reason: OfflineReason): void { delete this.streamContainers[streamContainer.id]; this.closeConnections(); // here we close the connections early. this.offlineBroadcastService.goOffline(reason); diff --git a/client/src/app/core/core-services/streaming-communication.service.ts b/client/src/app/core/core-services/streaming-communication.service.ts index 7ea42eda1..b66c24cd7 100644 --- a/client/src/app/core/core-services/streaming-communication.service.ts +++ b/client/src/app/core/core-services/streaming-communication.service.ts @@ -55,18 +55,26 @@ export class StreamConnectionError extends Error { } } -export class StreamContainer { +export class StreamContainer { public readonly id = Math.floor(Math.random() * (900000 - 1) + 100000); // [100000, 999999] + public messageHandler: (message: T) => void; + public hasErroredAmount = 0; - public stream?: Stream; + public stream?: Stream; - public constructor( - public url: string, - public messageHandler: (message: any) => void, - public params: () => Params - ) {} + public constructor(public url: string, messageHandler: (message: T) => void, public params: () => Params) { + this.messageHandler = (message: T) => { + // {connected: true} is a special message just to trigger the code below + if ((message).connected) { + console.log(`resetting error amount for ${this.url} since there was a connect message`); + this.hasErroredAmount = 0; + } else { + messageHandler(message); + } + }; + } } export class Stream { @@ -176,11 +184,8 @@ export class Stream { } return; } else { - // check, if we didn't get a keep alive - if (Object.keys(parsedContent).length > 0) { - console.debug('received', parsedContent); - this.messageHandler(parsedContent); - } + console.debug('received', parsedContent); + this.messageHandler(parsedContent); } } else { this.checkedUntilIndex = event.loaded; @@ -247,7 +252,7 @@ export class Stream { export class StreamingCommunicationService { public constructor(private http: HttpClient) {} - public subscribe(streamContainer: StreamContainer, errorHandler: ErrorHandler): void { + public subscribe(streamContainer: StreamContainer, errorHandler: ErrorHandler): void { const options: { body?: any; headers?: HttpHeaders | { [header: string]: string | string[] };