Fixes offline bar on successful reconnections (closes #5810).
This commit is contained in:
parent
cc65b756c7
commit
1380812924
@ -1 +1 @@
|
|||||||
Subproject commit b187dd439bd7456e105901ca96cd0995862d4419
|
Subproject commit 03d86865c063059878f4c3d616b6a6a9099b90d3
|
@ -68,6 +68,12 @@ export class AutoupdateThrottleService {
|
|||||||
this.disabledUntil = null;
|
this.disabledUntil = null;
|
||||||
console.log('Throttling autoupdates again');
|
console.log('Throttling autoupdates again');
|
||||||
}
|
}
|
||||||
|
} else if (autoupdate.all_data) {
|
||||||
|
// all_data=true (aka initial data) should be processed immediatly
|
||||||
|
// but since there can be pending autoupdates, add it there and
|
||||||
|
// process them now!
|
||||||
|
this.pendingAutoupdates.push(autoupdate);
|
||||||
|
this.processPendingAutoupdates();
|
||||||
} else {
|
} else {
|
||||||
this.pendingAutoupdates.push(autoupdate);
|
this.pendingAutoupdates.push(autoupdate);
|
||||||
this.receivedAutoupdate.emit();
|
this.receivedAutoupdate.emit();
|
||||||
@ -91,12 +97,19 @@ export class AutoupdateThrottleService {
|
|||||||
this.disabledUntil = changeId;
|
this.disabledUntil = changeId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* discard all pending autoupdates and resets the timer
|
||||||
|
*/
|
||||||
|
public discard(): void {
|
||||||
|
this.pendingAutoupdates = [];
|
||||||
|
}
|
||||||
|
|
||||||
private processPendingAutoupdates(): void {
|
private processPendingAutoupdates(): void {
|
||||||
const autoupdates = this.pendingAutoupdates;
|
if (this.pendingAutoupdates.length === 0) {
|
||||||
if (autoupdates.length === 0) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.pendingAutoupdates = [];
|
const autoupdates = this.pendingAutoupdates;
|
||||||
|
this.discard();
|
||||||
|
|
||||||
console.log(`Processing ${autoupdates.length} pending autoupdates`);
|
console.log(`Processing ${autoupdates.length} pending autoupdates`);
|
||||||
const autoupdate = this.mergeAutoupdates(autoupdates);
|
const autoupdate = this.mergeAutoupdates(autoupdates);
|
||||||
|
@ -59,6 +59,7 @@ export class AutoupdateService {
|
|||||||
this.streamCloseFn();
|
this.streamCloseFn();
|
||||||
this.streamCloseFn = null;
|
this.streamCloseFn = null;
|
||||||
}
|
}
|
||||||
|
this.autoupdateThrottle.discard();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -54,7 +54,7 @@ export class CommunicationManagerService {
|
|||||||
return this._stopCommunicationEvent.asObservable();
|
return this._stopCommunicationEvent.asObservable();
|
||||||
}
|
}
|
||||||
|
|
||||||
private streamContainers: { [id: number]: StreamContainer } = {};
|
private streamContainers: { [id: number]: StreamContainer<any> } = {};
|
||||||
|
|
||||||
public constructor(
|
public constructor(
|
||||||
private streamingCommunicationService: StreamingCommunicationService,
|
private streamingCommunicationService: StreamingCommunicationService,
|
||||||
@ -87,7 +87,7 @@ export class CommunicationManagerService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async connectWithWrapper(streamContainer: StreamContainer): Promise<() => void> {
|
private async connectWithWrapper<T>(streamContainer: StreamContainer<T>): Promise<() => void> {
|
||||||
console.log('connect', streamContainer, streamContainer.stream);
|
console.log('connect', streamContainer, streamContainer.stream);
|
||||||
const errorHandler = (type: ErrorType, error: CommunicationError, message: string) =>
|
const errorHandler = (type: ErrorType, error: CommunicationError, message: string) =>
|
||||||
this.handleError(streamContainer, type, error, message);
|
this.handleError(streamContainer, type, error, message);
|
||||||
@ -96,8 +96,8 @@ export class CommunicationManagerService {
|
|||||||
return () => this.close(streamContainer);
|
return () => this.close(streamContainer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async handleError(
|
private async handleError<T>(
|
||||||
streamContainer: StreamContainer,
|
streamContainer: StreamContainer<T>,
|
||||||
type: ErrorType,
|
type: ErrorType,
|
||||||
error: CommunicationError,
|
error: CommunicationError,
|
||||||
message: string
|
message: string
|
||||||
@ -109,7 +109,7 @@ export class CommunicationManagerService {
|
|||||||
streamContainer.hasErroredAmount++;
|
streamContainer.hasErroredAmount++;
|
||||||
if (streamContainer.hasErroredAmount > MAX_STREAM_FAILURE_RETRIES) {
|
if (streamContainer.hasErroredAmount > MAX_STREAM_FAILURE_RETRIES) {
|
||||||
this.goOffline(streamContainer, OfflineReason.ConnectionLost);
|
this.goOffline(streamContainer, OfflineReason.ConnectionLost);
|
||||||
} else if (type === ErrorType.Client && error.type === 'ErrorAuth') {
|
} else if (type === ErrorType.Client && error.type === 'auth_required') {
|
||||||
this.goOffline(streamContainer, OfflineReason.WhoAmIFailed);
|
this.goOffline(streamContainer, OfflineReason.WhoAmIFailed);
|
||||||
} else {
|
} else {
|
||||||
// retry it after some time:
|
// retry it after some time:
|
||||||
@ -117,7 +117,7 @@ export class CommunicationManagerService {
|
|||||||
`Retry no. ${streamContainer.hasErroredAmount} of ${MAX_STREAM_FAILURE_RETRIES} for ${streamContainer.url}`
|
`Retry no. ${streamContainer.hasErroredAmount} of ${MAX_STREAM_FAILURE_RETRIES} for ${streamContainer.url}`
|
||||||
);
|
);
|
||||||
try {
|
try {
|
||||||
await this.delayAndCheckReconnection();
|
await this.delayAndCheckReconnection(streamContainer);
|
||||||
await this.connectWithWrapper(streamContainer);
|
await this.connectWithWrapper(streamContainer);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
// delayAndCheckReconnection can throw an OfflineError,
|
// delayAndCheckReconnection can throw an OfflineError,
|
||||||
@ -126,8 +126,13 @@ export class CommunicationManagerService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async delayAndCheckReconnection(): Promise<void> {
|
private async delayAndCheckReconnection<T>(streamContainer: StreamContainer<T>): Promise<void> {
|
||||||
const delay = Math.floor(Math.random() * 3000 + 2000);
|
let delay;
|
||||||
|
if (streamContainer.hasErroredAmount === 1) {
|
||||||
|
delay = 500; // the first error has a small delay since these error can happen normally.
|
||||||
|
} else {
|
||||||
|
delay = Math.floor(Math.random() * 3000 + 2000);
|
||||||
|
}
|
||||||
console.log(`retry again in ${delay} ms`);
|
console.log(`retry again in ${delay} ms`);
|
||||||
|
|
||||||
await SleepPromise(delay);
|
await SleepPromise(delay);
|
||||||
@ -156,7 +161,7 @@ export class CommunicationManagerService {
|
|||||||
this._stopCommunicationEvent.emit();
|
this._stopCommunicationEvent.emit();
|
||||||
}
|
}
|
||||||
|
|
||||||
private goOffline(streamContainer: StreamContainer, reason: OfflineReason): void {
|
private goOffline<T>(streamContainer: StreamContainer<T>, reason: OfflineReason): void {
|
||||||
delete this.streamContainers[streamContainer.id];
|
delete this.streamContainers[streamContainer.id];
|
||||||
this.closeConnections(); // here we close the connections early.
|
this.closeConnections(); // here we close the connections early.
|
||||||
this.offlineBroadcastService.goOffline(reason);
|
this.offlineBroadcastService.goOffline(reason);
|
||||||
|
@ -55,18 +55,26 @@ export class StreamConnectionError extends Error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export class StreamContainer {
|
export class StreamContainer<T> {
|
||||||
public readonly id = Math.floor(Math.random() * (900000 - 1) + 100000); // [100000, 999999]
|
public readonly id = Math.floor(Math.random() * (900000 - 1) + 100000); // [100000, 999999]
|
||||||
|
|
||||||
|
public messageHandler: (message: T) => void;
|
||||||
|
|
||||||
public hasErroredAmount = 0;
|
public hasErroredAmount = 0;
|
||||||
|
|
||||||
public stream?: Stream<any>;
|
public stream?: Stream<T>;
|
||||||
|
|
||||||
public constructor(
|
public constructor(public url: string, messageHandler: (message: T) => void, public params: () => Params) {
|
||||||
public url: string,
|
this.messageHandler = (message: T) => {
|
||||||
public messageHandler: (message: any) => void,
|
// {connected: true} is a special message just to trigger the code below
|
||||||
public params: () => Params
|
if ((<any>message).connected) {
|
||||||
) {}
|
console.log(`resetting error amount for ${this.url} since there was a connect message`);
|
||||||
|
this.hasErroredAmount = 0;
|
||||||
|
} else {
|
||||||
|
messageHandler(message);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Stream<T> {
|
export class Stream<T> {
|
||||||
@ -176,12 +184,9 @@ export class Stream<T> {
|
|||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
// check, if we didn't get a keep alive
|
|
||||||
if (Object.keys(parsedContent).length > 0) {
|
|
||||||
console.debug('received', parsedContent);
|
console.debug('received', parsedContent);
|
||||||
this.messageHandler(parsedContent);
|
this.messageHandler(parsedContent);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
this.checkedUntilIndex = event.loaded;
|
this.checkedUntilIndex = event.loaded;
|
||||||
}
|
}
|
||||||
@ -247,7 +252,7 @@ export class Stream<T> {
|
|||||||
export class StreamingCommunicationService {
|
export class StreamingCommunicationService {
|
||||||
public constructor(private http: HttpClient) {}
|
public constructor(private http: HttpClient) {}
|
||||||
|
|
||||||
public subscribe<T>(streamContainer: StreamContainer, errorHandler: ErrorHandler): void {
|
public subscribe<T>(streamContainer: StreamContainer<T>, errorHandler: ErrorHandler): void {
|
||||||
const options: {
|
const options: {
|
||||||
body?: any;
|
body?: any;
|
||||||
headers?: HttpHeaders | { [header: string]: string | string[] };
|
headers?: HttpHeaders | { [header: string]: string | string[] };
|
||||||
|
Loading…
Reference in New Issue
Block a user