diff --git a/client/src/app/core/core-services/autoupdate.service.ts b/client/src/app/core/core-services/autoupdate.service.ts index d10109ede..912304767 100644 --- a/client/src/app/core/core-services/autoupdate.service.ts +++ b/client/src/app/core/core-services/autoupdate.service.ts @@ -1,6 +1,6 @@ import { Injectable } from '@angular/core'; -import { WebsocketService } from './websocket.service'; +import { WebsocketService, WEBSOCKET_ERROR_CODES } from './websocket.service'; import { CollectionStringMapperService } from './collection-string-mapper.service'; import { DataStoreService } from './data-store.service'; import { BaseModel } from '../../shared/models/base/base-model'; @@ -61,6 +61,13 @@ export class AutoupdateService { this.websocketService.getOberservable('autoupdate').subscribe(response => { this.storeResponse(response); }); + + // Check for too high change id-errors. If this happens, reset the DS and get fresh data. + this.websocketService.errorResponseObservable.subscribe(error => { + if (error.code === WEBSOCKET_ERROR_CODES.CHANGE_ID_TOO_HIGH) { + this.doFullUpdate(); + } + }); } /** @@ -159,6 +166,7 @@ export class AutoupdateService { public async doFullUpdate(): Promise { const response = await this.websocketService.sendAndGetResponse<{}, AutoupdateFormat>('getElements', {}); + const updateSlot = await this.DSUpdateManager.getNewUpdateSlot(this.DS); let allModels: BaseModel[] = []; for (const collection of Object.keys(response.changed)) { if (this.modelMapper.isCollectionRegistered(collection)) { @@ -169,5 +177,6 @@ export class AutoupdateService { } await this.DS.set(allModels, response.to_change_id); + this.DSUpdateManager.commit(updateSlot); } } diff --git a/client/src/app/core/core-services/websocket.service.ts b/client/src/app/core/core-services/websocket.service.ts index 49cfa4346..3b0f229c8 100644 --- a/client/src/app/core/core-services/websocket.service.ts +++ b/client/src/app/core/core-services/websocket.service.ts @@ -32,6 +32,28 @@ interface IncommingWebsocketMessage extends BaseWebsocketMessage { in_response?: string; } +/** + * The format of a messages content, if the message type is "error" + */ +interface WebsocketErrorContent { + code: number; + message: string; +} + +function isWebsocketErrorContent(obj: any): obj is WebsocketErrorContent { + return !!obj && obj.code !== undefined && obj.message !== undefined; +} + +/** + * All (custom) error codes that are used to pass error information + * from the server to the client + */ +export const WEBSOCKET_ERROR_CODES = { + NOT_AUTHORIZED: 100, + CHANGE_ID_TOO_HIGH: 101, + WRONG_FORMAT: 102 +}; + /* * Options for (re-)connecting. */ @@ -57,7 +79,7 @@ export class WebsocketService { * Subjects that will be called, if a reconnect after a retry (e.g. with a previous * connection loss) was successful. */ - private _retryReconnectEvent: EventEmitter = new EventEmitter(); + private readonly _retryReconnectEvent: EventEmitter = new EventEmitter(); /** * Getter for the retry reconnect event. @@ -69,7 +91,7 @@ export class WebsocketService { /** * Listeners will be nofitied, if the wesocket connection is establiched. */ - private _connectEvent: EventEmitter = new EventEmitter(); + private readonly _connectEvent: EventEmitter = new EventEmitter(); /** * Getter for the connect event. @@ -81,7 +103,7 @@ export class WebsocketService { /** * Listeners will be nofitied, if the wesocket connection is closed. */ - private _closeEvent: EventEmitter = new EventEmitter(); + private readonly _closeEvent: EventEmitter = new EventEmitter(); /** * Getter for the close event. @@ -90,6 +112,18 @@ export class WebsocketService { return this._closeEvent; } + /** + * The subject for all websocket *message* errors (no connection errors). + */ + private readonly _errorResponseSubject = new Subject(); + + /** + * The error response obersable for all websocket message errors. + */ + public get errorResponseObservable(): Observable { + return this._errorResponseSubject.asObservable(); + } + /** * Saves, if the connection is open */ @@ -115,9 +149,12 @@ export class WebsocketService { private subjects: { [type: string]: Subject } = {}; /** - * Callbacks for a waiting response + * Callbacks for a waiting response. If any callback returns true, the message/error will not be propagated with the + * responsible subjects for the message type. */ - private responseCallbacks: { [id: string]: [(val: any) => boolean, (error: string) => void | null] } = {}; + private responseCallbacks: { + [id: string]: [(val: any) => boolean, (error: WebsocketErrorContent) => boolean]; + } = {}; /** * Saves, if the WS Connection should be closed (e.g. after an explicit `close()`). Prohibits @@ -243,10 +280,24 @@ export class WebsocketService { } if (type === 'error') { - console.error('Websocket error', message.content); - if (inResponse && callbacks && callbacks[1]) { - callbacks[1](message.content as string); + if (!isWebsocketErrorContent(message.content)) { + console.error('Websocket error without standard form!', message); + return; } + + // Print this to the console. + const error = message.content; + const errorDescription = + Object.keys(WEBSOCKET_ERROR_CODES).find(key => WEBSOCKET_ERROR_CODES[key] === error.code) || + 'unknown code'; + console.error(`Websocket error with code=${error.code} (${errorDescription}):`, error.message); + + // call the error callback, if there is any. If it returns true (means "handled"), + // the errorResponseSubject will not be called + if (inResponse && callbacks && callbacks[1] && callbacks[1](error)) { + return; + } + this._errorResponseSubject.next(error); return; } @@ -357,8 +408,10 @@ export class WebsocketService { * * @param type the message type * @param content the actual content - * @param success an optional success callback for a response - * @param error an optional error callback for a response + * @param success an optional success callback for a response. If it returns true, the message will not be + * propagated through the recieve subjects. + * @param error an optional error callback for a response. If it returns true, the error will not be propagated + * with the error subject. * @param id an optional id for the message. If not given, a random id will be generated and returned. * @returns the message id */ @@ -366,7 +419,7 @@ export class WebsocketService { type: string, content: T, success?: (val: R) => boolean, - error?: (error: string) => void, + error?: (error: WebsocketErrorContent) => boolean, id?: string ): string { if (!this.websocket) { @@ -419,7 +472,10 @@ export class WebsocketService { resolve(val); return true; }, - reject, + val => { + reject(val); + return true; + }, id ); }); diff --git a/openslides/core/websocket.py b/openslides/core/websocket.py index ff4d8e760..a711f0430 100644 --- a/openslides/core/websocket.py +++ b/openslides/core/websocket.py @@ -5,6 +5,8 @@ from ..utils.constants import get_constants from ..utils.projector import get_projector_data from ..utils.stats import WebsocketLatencyLogger from ..utils.websocket import ( + WEBSOCKET_CHANGE_ID_TOO_HIGH, + WEBSOCKET_NOT_AUTHORIZED, BaseWebsocketClientMessage, ProtocollAsyncJsonWebsocketConsumer, get_element_data, @@ -58,9 +60,9 @@ class NotifyWebsocketClientMessage(BaseWebsocketClientMessage): if perm is not None and not await async_has_perm( consumer.scope["user"]["id"], perm ): - await consumer.send_json( - type="error", - content=f"You need '{perm}' to send this message.", + await consumer.send_error( + code=WEBSOCKET_NOT_AUTHORIZED, + message=f"You need '{perm}' to send this message.", in_response=id, ) else: @@ -119,7 +121,9 @@ class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage): consumer.scope["user"]["id"], requested_change_id ) except ValueError as error: - await consumer.send_json(type="error", content=str(error), in_response=id) + await consumer.send_error( + code=WEBSOCKET_CHANGE_ID_TOO_HIGH, message=str(error), in_response=id + ) else: await consumer.send_json( type="autoupdate", content=element_data, in_response=id diff --git a/openslides/utils/websocket.py b/openslides/utils/websocket.py index 032fb342c..af090fb68 100644 --- a/openslides/utils/websocket.py +++ b/openslides/utils/websocket.py @@ -10,6 +10,21 @@ from .cache import element_cache from .utils import split_element_id +# Custom Websocket error codes (not to be confused with the websocket *connection* +# status codes like 1000 or 1006. These are custom ones for OpenSlides to give a +# machine parseable response, so the client can react on errors. + +WEBSOCKET_NOT_AUTHORIZED = 100 +# E.g. if a user does not have the right permission(s) for a message. + +WEBSOCKET_CHANGE_ID_TOO_HIGH = 101 +# If data is requested and the given change id is higher than the highest change id +# from the element_cache. + +WEBSOCKET_WRONG_FORMAT = 10 +# If the recieved data has not the expected format. + + class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer): """ Mixin for JSONWebsocketConsumers, that speaks the a special protocol. @@ -44,6 +59,24 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer): if not silence_errors: raise e + async def send_error( + self, + code: int, + message: str, + in_response: Optional[str] = None, + silence_errors: Optional[bool] = True, + ) -> None: + """ + Send generic error messages with a custom status code (see above) and a text message. + """ + await self.send_json( + "error", + {"code": code, "message": message}, + None, + in_response=in_response, + silence_errors=silence_errors, + ) + async def receive_json(self, content: Any) -> None: """ Receives the json data, parses it and calls receive_content. @@ -57,8 +90,8 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncJsonWebsocketConsumer): # content is not a dict (TypeError) or has not the key id (KeyError) in_response = None - await self.send_json( - type="error", content=str(err), in_response=in_response + await self.send_error( + code=WEBSOCKET_WRONG_FORMAT, message=str(err), in_response=in_response ) return @@ -158,7 +191,9 @@ async def get_element_data(user_id: int, change_id: int = 0) -> AutoupdateFormat """ current_change_id = await element_cache.get_current_change_id() if change_id > current_change_id: - raise ValueError("Requested change_id is higher this highest change_id.") + raise ValueError( + f"Requested change_id {change_id} is higher this highest change_id {current_change_id}." + ) try: changed_elements, deleted_element_ids = await element_cache.get_restricted_data( user_id, change_id, current_change_id diff --git a/tests/integration/utils/test_consumers.py b/tests/integration/utils/test_consumers.py index e08946847..05f05e12e 100644 --- a/tests/integration/utils/test_consumers.py +++ b/tests/integration/utils/test_consumers.py @@ -16,6 +16,7 @@ from openslides.utils.autoupdate import ( inform_deleted_data, ) from openslides.utils.cache import element_cache +from openslides.utils.websocket import WEBSOCKET_CHANGE_ID_TOO_HIGH from ...unit.utils.cache_provider import Collection1, Collection2, get_cachable_provider from ..helpers import TConfig, TProjector, TUser @@ -396,10 +397,10 @@ async def test_send_connect_twice_with_clear_change_id_cache(communicator, set_c response2 = await communicator.receive_json_from() assert response2["type"] == "error" - assert ( - response2.get("content") - == "Requested change_id is higher this highest change_id." - ) + assert response2.get("content") == { + "code": WEBSOCKET_CHANGE_ID_TOO_HIGH, + "message": "Requested change_id 2 is higher this highest change_id 1.", + } @pytest.mark.xfail # This test is broken