Merge pull request #4713 from FinnStutzenstein/tooHighChangeIdHandling

Handles 'change id too high' and generic websocket error message format
This commit is contained in:
Emanuel Schütze 2019-05-16 20:25:13 +02:00 committed by GitHub
commit fef386ddf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 129 additions and 24 deletions

View File

@ -1,6 +1,6 @@
import { Injectable } from '@angular/core';
import { WebsocketService } from './websocket.service';
import { WebsocketService, WEBSOCKET_ERROR_CODES } from './websocket.service';
import { CollectionStringMapperService } from './collection-string-mapper.service';
import { DataStoreService } from './data-store.service';
import { BaseModel } from '../../shared/models/base/base-model';
@ -61,6 +61,13 @@ export class AutoupdateService {
this.websocketService.getOberservable<AutoupdateFormat>('autoupdate').subscribe(response => {
this.storeResponse(response);
});
// Check for too high change id-errors. If this happens, reset the DS and get fresh data.
this.websocketService.errorResponseObservable.subscribe(error => {
if (error.code === WEBSOCKET_ERROR_CODES.CHANGE_ID_TOO_HIGH) {
this.doFullUpdate();
}
});
}
/**
@ -159,6 +166,7 @@ export class AutoupdateService {
public async doFullUpdate(): Promise<void> {
const response = await this.websocketService.sendAndGetResponse<{}, AutoupdateFormat>('getElements', {});
const updateSlot = await this.DSUpdateManager.getNewUpdateSlot(this.DS);
let allModels: BaseModel[] = [];
for (const collection of Object.keys(response.changed)) {
if (this.modelMapper.isCollectionRegistered(collection)) {
@ -169,5 +177,6 @@ export class AutoupdateService {
}
await this.DS.set(allModels, response.to_change_id);
this.DSUpdateManager.commit(updateSlot);
}
}

View File

@ -32,6 +32,28 @@ interface IncommingWebsocketMessage extends BaseWebsocketMessage {
in_response?: string;
}
/**
* The format of a messages content, if the message type is "error"
*/
interface WebsocketErrorContent {
code: number;
message: string;
}
function isWebsocketErrorContent(obj: any): obj is WebsocketErrorContent {
return !!obj && obj.code !== undefined && obj.message !== undefined;
}
/**
* All (custom) error codes that are used to pass error information
* from the server to the client
*/
export const WEBSOCKET_ERROR_CODES = {
NOT_AUTHORIZED: 100,
CHANGE_ID_TOO_HIGH: 101,
WRONG_FORMAT: 102
};
/*
* Options for (re-)connecting.
*/
@ -57,7 +79,7 @@ export class WebsocketService {
* Subjects that will be called, if a reconnect after a retry (e.g. with a previous
* connection loss) was successful.
*/
private _retryReconnectEvent: EventEmitter<void> = new EventEmitter<void>();
private readonly _retryReconnectEvent: EventEmitter<void> = new EventEmitter<void>();
/**
* Getter for the retry reconnect event.
@ -69,7 +91,7 @@ export class WebsocketService {
/**
* Listeners will be nofitied, if the wesocket connection is establiched.
*/
private _connectEvent: EventEmitter<void> = new EventEmitter<void>();
private readonly _connectEvent: EventEmitter<void> = new EventEmitter<void>();
/**
* Getter for the connect event.
@ -81,7 +103,7 @@ export class WebsocketService {
/**
* Listeners will be nofitied, if the wesocket connection is closed.
*/
private _closeEvent: EventEmitter<void> = new EventEmitter<void>();
private readonly _closeEvent: EventEmitter<void> = new EventEmitter<void>();
/**
* Getter for the close event.
@ -90,6 +112,18 @@ export class WebsocketService {
return this._closeEvent;
}
/**
* The subject for all websocket *message* errors (no connection errors).
*/
private readonly _errorResponseSubject = new Subject<WebsocketErrorContent>();
/**
* The error response obersable for all websocket message errors.
*/
public get errorResponseObservable(): Observable<WebsocketErrorContent> {
return this._errorResponseSubject.asObservable();
}
/**
* Saves, if the connection is open
*/
@ -115,9 +149,12 @@ export class WebsocketService {
private subjects: { [type: string]: Subject<any> } = {};
/**
* Callbacks for a waiting response
* Callbacks for a waiting response. If any callback returns true, the message/error will not be propagated with the
* responsible subjects for the message type.
*/
private responseCallbacks: { [id: string]: [(val: any) => boolean, (error: string) => void | null] } = {};
private responseCallbacks: {
[id: string]: [(val: any) => boolean, (error: WebsocketErrorContent) => boolean];
} = {};
/**
* Saves, if the WS Connection should be closed (e.g. after an explicit `close()`). Prohibits
@ -243,10 +280,24 @@ export class WebsocketService {
}
if (type === 'error') {
console.error('Websocket error', message.content);
if (inResponse && callbacks && callbacks[1]) {
callbacks[1](message.content as string);
if (!isWebsocketErrorContent(message.content)) {
console.error('Websocket error without standard form!', message);
return;
}
// Print this to the console.
const error = message.content;
const errorDescription =
Object.keys(WEBSOCKET_ERROR_CODES).find(key => WEBSOCKET_ERROR_CODES[key] === error.code) ||
'unknown code';
console.error(`Websocket error with code=${error.code} (${errorDescription}):`, error.message);
// call the error callback, if there is any. If it returns true (means "handled"),
// the errorResponseSubject will not be called
if (inResponse && callbacks && callbacks[1] && callbacks[1](error)) {
return;
}
this._errorResponseSubject.next(error);
return;
}
@ -357,8 +408,10 @@ export class WebsocketService {
*
* @param type the message type
* @param content the actual content
* @param success an optional success callback for a response
* @param error an optional error callback for a response
* @param success an optional success callback for a response. If it returns true, the message will not be
* propagated through the recieve subjects.
* @param error an optional error callback for a response. If it returns true, the error will not be propagated
* with the error subject.
* @param id an optional id for the message. If not given, a random id will be generated and returned.
* @returns the message id
*/
@ -366,7 +419,7 @@ export class WebsocketService {
type: string,
content: T,
success?: (val: R) => boolean,
error?: (error: string) => void,
error?: (error: WebsocketErrorContent) => boolean,
id?: string
): string {
if (!this.websocket) {
@ -419,7 +472,10 @@ export class WebsocketService {
resolve(val);
return true;
},
reject,
val => {
reject(val);
return true;
},
id
);
});

View File

@ -5,6 +5,8 @@ from ..utils.constants import get_constants
from ..utils.projector import get_projector_data
from ..utils.stats import WebsocketLatencyLogger
from ..utils.websocket import (
WEBSOCKET_CHANGE_ID_TOO_HIGH,
WEBSOCKET_NOT_AUTHORIZED,
BaseWebsocketClientMessage,
ProtocollAsyncJsonWebsocketConsumer,
get_element_data,
@ -58,9 +60,9 @@ class NotifyWebsocketClientMessage(BaseWebsocketClientMessage):
if perm is not None and not await async_has_perm(
consumer.scope["user"]["id"], perm
):
await consumer.send_json(
type="error",
content=f"You need '{perm}' to send this message.",
await consumer.send_error(
code=WEBSOCKET_NOT_AUTHORIZED,
message=f"You need '{perm}' to send this message.",
in_response=id,
)
else:
@ -119,7 +121,9 @@ class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage):
consumer.scope["user"]["id"], requested_change_id
)
except ValueError as error:
await consumer.send_json(type="error", content=str(error), in_response=id)
await consumer.send_error(
code=WEBSOCKET_CHANGE_ID_TOO_HIGH, message=str(error), in_response=id
)
else:
await consumer.send_json(
type="autoupdate", content=element_data, in_response=id

View File

@ -10,6 +10,21 @@ from .cache import element_cache
from .utils import split_element_id
# Custom Websocket error codes (not to be confused with the websocket *connection*
# status codes like 1000 or 1006. These are custom ones for OpenSlides to give a
# machine parseable response, so the client can react on errors.
WEBSOCKET_NOT_AUTHORIZED = 100
# E.g. if a user does not have the right permission(s) for a message.
WEBSOCKET_CHANGE_ID_TOO_HIGH = 101
# If data is requested and the given change id is higher than the highest change id
# from the element_cache.
WEBSOCKET_WRONG_FORMAT = 10
# If the recieved data has not the expected format.
class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
"""
Mixin for JSONWebsocketConsumers, that speaks the a special protocol.
@ -44,6 +59,24 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
if not silence_errors:
raise e
async def send_error(
self,
code: int,
message: str,
in_response: Optional[str] = None,
silence_errors: Optional[bool] = True,
) -> None:
"""
Send generic error messages with a custom status code (see above) and a text message.
"""
await self.send_json(
"error",
{"code": code, "message": message},
None,
in_response=in_response,
silence_errors=silence_errors,
)
async def receive_json(self, content: Any) -> None:
"""
Receives the json data, parses it and calls receive_content.
@ -57,8 +90,8 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
# content is not a dict (TypeError) or has not the key id (KeyError)
in_response = None
await self.send_json(
type="error", content=str(err), in_response=in_response
await self.send_error(
code=WEBSOCKET_WRONG_FORMAT, message=str(err), in_response=in_response
)
return
@ -158,7 +191,9 @@ async def get_element_data(user_id: int, change_id: int = 0) -> AutoupdateFormat
"""
current_change_id = await element_cache.get_current_change_id()
if change_id > current_change_id:
raise ValueError("Requested change_id is higher this highest change_id.")
raise ValueError(
f"Requested change_id {change_id} is higher this highest change_id {current_change_id}."
)
try:
changed_elements, deleted_element_ids = await element_cache.get_restricted_data(
user_id, change_id, current_change_id

View File

@ -16,6 +16,7 @@ from openslides.utils.autoupdate import (
inform_deleted_data,
)
from openslides.utils.cache import element_cache
from openslides.utils.websocket import WEBSOCKET_CHANGE_ID_TOO_HIGH
from ...unit.utils.cache_provider import Collection1, Collection2, get_cachable_provider
from ..helpers import TConfig, TProjector, TUser
@ -396,10 +397,10 @@ async def test_send_connect_twice_with_clear_change_id_cache(communicator, set_c
response2 = await communicator.receive_json_from()
assert response2["type"] == "error"
assert (
response2.get("content")
== "Requested change_id is higher this highest change_id."
)
assert response2.get("content") == {
"code": WEBSOCKET_CHANGE_ID_TOO_HIGH,
"message": "Requested change_id 2 is higher this highest change_id 1.",
}
@pytest.mark.xfail # This test is broken