Merge pull request #5828 from FinnStutzenstein/offlineBarReconnect

Fixes offline bar on successful reconnections (closes #5810).
This commit is contained in:
Finn Stutzenstein 2021-02-02 13:12:09 +01:00 committed by GitHub
commit a20641fe44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 50 additions and 26 deletions

@ -1 +1 @@
Subproject commit b187dd439bd7456e105901ca96cd0995862d4419
Subproject commit 03d86865c063059878f4c3d616b6a6a9099b90d3

View File

@ -68,6 +68,12 @@ export class AutoupdateThrottleService {
this.disabledUntil = null;
console.log('Throttling autoupdates again');
}
} else if (autoupdate.all_data) {
// all_data=true (aka initial data) should be processed immediatly
// but since there can be pending autoupdates, add it there and
// process them now!
this.pendingAutoupdates.push(autoupdate);
this.processPendingAutoupdates();
} else {
this.pendingAutoupdates.push(autoupdate);
this.receivedAutoupdate.emit();
@ -91,12 +97,19 @@ export class AutoupdateThrottleService {
this.disabledUntil = changeId;
}
/**
* discard all pending autoupdates and resets the timer
*/
public discard(): void {
this.pendingAutoupdates = [];
}
private processPendingAutoupdates(): void {
const autoupdates = this.pendingAutoupdates;
if (autoupdates.length === 0) {
if (this.pendingAutoupdates.length === 0) {
return;
}
this.pendingAutoupdates = [];
const autoupdates = this.pendingAutoupdates;
this.discard();
console.log(`Processing ${autoupdates.length} pending autoupdates`);
const autoupdate = this.mergeAutoupdates(autoupdates);

View File

@ -59,6 +59,7 @@ export class AutoupdateService {
this.streamCloseFn();
this.streamCloseFn = null;
}
this.autoupdateThrottle.discard();
}
/**

View File

@ -54,7 +54,7 @@ export class CommunicationManagerService {
return this._stopCommunicationEvent.asObservable();
}
private streamContainers: { [id: number]: StreamContainer } = {};
private streamContainers: { [id: number]: StreamContainer<any> } = {};
public constructor(
private streamingCommunicationService: StreamingCommunicationService,
@ -87,7 +87,7 @@ export class CommunicationManagerService {
}
}
private async connectWithWrapper(streamContainer: StreamContainer): Promise<() => void> {
private async connectWithWrapper<T>(streamContainer: StreamContainer<T>): Promise<() => void> {
console.log('connect', streamContainer, streamContainer.stream);
const errorHandler = (type: ErrorType, error: CommunicationError, message: string) =>
this.handleError(streamContainer, type, error, message);
@ -96,8 +96,8 @@ export class CommunicationManagerService {
return () => this.close(streamContainer);
}
private async handleError(
streamContainer: StreamContainer,
private async handleError<T>(
streamContainer: StreamContainer<T>,
type: ErrorType,
error: CommunicationError,
message: string
@ -109,7 +109,7 @@ export class CommunicationManagerService {
streamContainer.hasErroredAmount++;
if (streamContainer.hasErroredAmount > MAX_STREAM_FAILURE_RETRIES) {
this.goOffline(streamContainer, OfflineReason.ConnectionLost);
} else if (type === ErrorType.Client && error.type === 'ErrorAuth') {
} else if (type === ErrorType.Client && error.type === 'auth_required') {
this.goOffline(streamContainer, OfflineReason.WhoAmIFailed);
} else {
// retry it after some time:
@ -117,7 +117,7 @@ export class CommunicationManagerService {
`Retry no. ${streamContainer.hasErroredAmount} of ${MAX_STREAM_FAILURE_RETRIES} for ${streamContainer.url}`
);
try {
await this.delayAndCheckReconnection();
await this.delayAndCheckReconnection(streamContainer);
await this.connectWithWrapper(streamContainer);
} catch (e) {
// delayAndCheckReconnection can throw an OfflineError,
@ -126,8 +126,13 @@ export class CommunicationManagerService {
}
}
private async delayAndCheckReconnection(): Promise<void> {
const delay = Math.floor(Math.random() * 3000 + 2000);
private async delayAndCheckReconnection<T>(streamContainer: StreamContainer<T>): Promise<void> {
let delay;
if (streamContainer.hasErroredAmount === 1) {
delay = 500; // the first error has a small delay since these error can happen normally.
} else {
delay = Math.floor(Math.random() * 3000 + 2000);
}
console.log(`retry again in ${delay} ms`);
await SleepPromise(delay);
@ -156,7 +161,7 @@ export class CommunicationManagerService {
this._stopCommunicationEvent.emit();
}
private goOffline(streamContainer: StreamContainer, reason: OfflineReason): void {
private goOffline<T>(streamContainer: StreamContainer<T>, reason: OfflineReason): void {
delete this.streamContainers[streamContainer.id];
this.closeConnections(); // here we close the connections early.
this.offlineBroadcastService.goOffline(reason);

View File

@ -55,18 +55,26 @@ export class StreamConnectionError extends Error {
}
}
export class StreamContainer {
export class StreamContainer<T> {
public readonly id = Math.floor(Math.random() * (900000 - 1) + 100000); // [100000, 999999]
public messageHandler: (message: T) => void;
public hasErroredAmount = 0;
public stream?: Stream<any>;
public stream?: Stream<T>;
public constructor(
public url: string,
public messageHandler: (message: any) => void,
public params: () => Params
) {}
public constructor(public url: string, messageHandler: (message: T) => void, public params: () => Params) {
this.messageHandler = (message: T) => {
// {connected: true} is a special message just to trigger the code below
if ((<any>message).connected) {
console.log(`resetting error amount for ${this.url} since there was a connect message`);
this.hasErroredAmount = 0;
} else {
messageHandler(message);
}
};
}
}
export class Stream<T> {
@ -176,11 +184,8 @@ export class Stream<T> {
}
return;
} else {
// check, if we didn't get a keep alive
if (Object.keys(parsedContent).length > 0) {
console.debug('received', parsedContent);
this.messageHandler(parsedContent);
}
console.debug('received', parsedContent);
this.messageHandler(parsedContent);
}
} else {
this.checkedUntilIndex = event.loaded;
@ -247,7 +252,7 @@ export class Stream<T> {
export class StreamingCommunicationService {
public constructor(private http: HttpClient) {}
public subscribe<T>(streamContainer: StreamContainer, errorHandler: ErrorHandler): void {
public subscribe<T>(streamContainer: StreamContainer<T>, errorHandler: ErrorHandler): void {
const options: {
body?: any;
headers?: HttpHeaders | { [header: string]: string | string[] };