diff --git a/SETTINGS.rst b/SETTINGS.rst index 64163c001..dc42430f0 100644 --- a/SETTINGS.rst +++ b/SETTINGS.rst @@ -143,3 +143,7 @@ not affect the client. operator is in one of these groups, the client disconnected and reconnects again. All requests urls (including websockets) are now prefixed with `/prioritize`, so these requests from "prioritized clients" can be routed to different servers. + +`AUTOUPDATE_DELAY`: The delay to send autoupdates. This feature can be +deactivated by setting it to `None`. It is deactivated per default. The Delay is +given in seconds diff --git a/client/src/app/core/core-services/autoupdate.service.ts b/client/src/app/core/core-services/autoupdate.service.ts index 3303d0740..72fbc1f88 100644 --- a/client/src/app/core/core-services/autoupdate.service.ts +++ b/client/src/app/core/core-services/autoupdate.service.ts @@ -6,7 +6,7 @@ import { DataStoreService, DataStoreUpdateManagerService } from './data-store.se import { Mutex } from '../promises/mutex'; import { WebsocketService, WEBSOCKET_ERROR_CODES } from './websocket.service'; -interface AutoupdateFormat { +export interface AutoupdateFormat { /** * All changed (and created) items as their full/restricted data grouped by their collection. */ @@ -37,6 +37,19 @@ interface AutoupdateFormat { all_data: boolean; } +export function isAutoupdateFormat(obj: any): obj is AutoupdateFormat { + const format = obj as AutoupdateFormat; + return ( + obj && + typeof obj === 'object' && + format.changed !== undefined && + format.deleted !== undefined && + format.from_change_id !== undefined && + format.to_change_id !== undefined && + format.all_data !== undefined + ); +} + /** * Handles the initial update and automatic updates using the {@link WebsocketService} * Incoming objects, usually BaseModels, will be saved in the dataStore (`this.DS`) @@ -120,27 +133,40 @@ export class AutoupdateService { // Normal autoupdate if (autoupdate.from_change_id <= maxChangeId + 1 && autoupdate.to_change_id > maxChangeId) { - const updateSlot = await this.DSUpdateManager.getNewUpdateSlot(this.DS); - - // Delete the removed objects from the DataStore - for (const collection of Object.keys(autoupdate.deleted)) { - await this.DS.remove(collection, autoupdate.deleted[collection]); - } - - // Add the objects to the DataStore. - for (const collection of Object.keys(autoupdate.changed)) { - await this.DS.add(this.mapObjectsToBaseModels(collection, autoupdate.changed[collection])); - } - - await this.DS.flushToStorage(autoupdate.to_change_id); - - this.DSUpdateManager.commit(updateSlot, autoupdate.to_change_id); + this.injectAutupdateIntoDS(autoupdate, true); } else { // autoupdate fully in the future. we are missing something! this.requestChanges(); } } + public async injectAutoupdateIgnoreChangeId(autoupdate: AutoupdateFormat): Promise { + const unlock = await this.mutex.lock(); + console.debug('inject autoupdate', autoupdate); + this.injectAutupdateIntoDS(autoupdate, false); + unlock(); + } + + private async injectAutupdateIntoDS(autoupdate: AutoupdateFormat, flush: boolean): Promise { + const updateSlot = await this.DSUpdateManager.getNewUpdateSlot(this.DS); + + // Delete the removed objects from the DataStore + for (const collection of Object.keys(autoupdate.deleted)) { + await this.DS.remove(collection, autoupdate.deleted[collection]); + } + + // Add the objects to the DataStore. + for (const collection of Object.keys(autoupdate.changed)) { + await this.DS.add(this.mapObjectsToBaseModels(collection, autoupdate.changed[collection])); + } + + if (flush) { + await this.DS.flushToStorage(autoupdate.to_change_id); + } + + this.DSUpdateManager.commit(updateSlot, autoupdate.to_change_id); + } + /** * Creates baseModels for each plain object. If the collection is not registered, * A console error will be issued and an empty list returned. @@ -164,9 +190,8 @@ export class AutoupdateService { * The server should return an autoupdate with all new data. */ public requestChanges(): void { - const changeId = this.DS.maxChangeId === 0 ? 0 : this.DS.maxChangeId + 1; - console.log(`requesting changed objects with DS max change id ${changeId}`); - this.websocketService.send('getElements', { change_id: changeId }); + console.log(`requesting changed objects with DS max change id ${this.DS.maxChangeId}`); + this.websocketService.send('getElements', { change_id: this.DS.maxChangeId }); } /** diff --git a/client/src/app/core/core-services/data-store.service.ts b/client/src/app/core/core-services/data-store.service.ts index b9ed07a08..a9ff9fc7e 100644 --- a/client/src/app/core/core-services/data-store.service.ts +++ b/client/src/app/core/core-services/data-store.service.ts @@ -258,7 +258,6 @@ export class DataStoreUpdateManagerService { private serveNextSlot(): void { if (this.updateSlotRequests.length > 0) { - console.warn('Concurrent update slots'); const request = this.updateSlotRequests.pop(); request.resolve(); } diff --git a/client/src/app/core/core-services/http.service.ts b/client/src/app/core/core-services/http.service.ts index 7dfffc0ae..fbf1f01fc 100644 --- a/client/src/app/core/core-services/http.service.ts +++ b/client/src/app/core/core-services/http.service.ts @@ -3,6 +3,7 @@ import { Injectable } from '@angular/core'; import { TranslateService } from '@ngx-translate/core'; +import { AutoupdateFormat, AutoupdateService, isAutoupdateFormat } from './autoupdate.service'; import { OpenSlidesStatusService } from './openslides-status.service'; import { formatQueryParams, QueryParams } from '../definitions/query-params'; @@ -17,12 +18,12 @@ export enum HTTPMethod { DELETE = 'delete' } -export interface DetailResponse { +export interface ErrorDetailResponse { detail: string | string[]; args?: string[]; } -function isDetailResponse(obj: any): obj is DetailResponse { +function isErrorDetailResponse(obj: any): obj is ErrorDetailResponse { return ( obj && typeof obj === 'object' && @@ -31,6 +32,15 @@ function isDetailResponse(obj: any): obj is DetailResponse { ); } +interface AutoupdateResponse { + autoupdate: AutoupdateFormat; + data?: any; +} + +function isAutoupdateReponse(obj: any): obj is AutoupdateResponse { + return obj && typeof obj === 'object' && isAutoupdateFormat((obj as AutoupdateResponse).autoupdate); +} + /** * Service for managing HTTP requests. Allows to send data for every method. Also (TODO) will do generic error handling. */ @@ -55,7 +65,8 @@ export class HttpService { public constructor( private http: HttpClient, private translate: TranslateService, - private OSStatus: OpenSlidesStatusService + private OSStatus: OpenSlidesStatusService, + private autoupdateService: AutoupdateService ) { this.defaultHeaders = new HttpHeaders().set('Content-Type', 'application/json'); } @@ -82,7 +93,7 @@ export class HttpService { ): Promise { // end early, if we are in history mode if (this.OSStatus.isInHistoryMode && method !== HTTPMethod.GET) { - throw this.handleError('You cannot make changes while in history mode'); + throw this.processError('You cannot make changes while in history mode'); } // there is a current bug with the responseType. @@ -108,9 +119,10 @@ export class HttpService { }; try { - return await this.http.request(method, url, options).toPromise(); - } catch (e) { - throw this.handleError(e); + const responseData: T = await this.http.request(method, url, options).toPromise(); + return this.processResponse(responseData); + } catch (error) { + throw this.processError(error); } } @@ -120,7 +132,7 @@ export class HttpService { * @param e The error thrown. * @returns The prepared and translated message for the user */ - private handleError(e: any): string { + private processError(e: any): string { let error = this.translate.instant('Error') + ': '; // If the error is a string already, return it. if (typeof e === 'string') { @@ -142,12 +154,14 @@ export class HttpService { } else if (!e.error) { error += this.translate.instant("The server didn't respond."); } else if (typeof e.error === 'object') { - if (isDetailResponse(e.error)) { - error += this.processDetailResponse(e.error); + if (isErrorDetailResponse(e.error)) { + error += this.processErrorDetailResponse(e.error); } else { const errorList = Object.keys(e.error).map(key => { const capitalizedKey = key.charAt(0).toUpperCase() + key.slice(1); - return `${this.translate.instant(capitalizedKey)}: ${this.processDetailResponse(e.error[key])}`; + return `${this.translate.instant(capitalizedKey)}: ${this.processErrorDetailResponse( + e.error[key] + )}`; }); error = errorList.join(', '); } @@ -168,11 +182,9 @@ export class HttpService { * @param str a string or a string array to join together. * @returns Error text(s) as single string */ - private processDetailResponse(response: DetailResponse): string { + private processErrorDetailResponse(response: ErrorDetailResponse): string { let message: string; - if (response instanceof Array) { - message = response.join(' '); - } else if (response.detail instanceof Array) { + if (response.detail instanceof Array) { message = response.detail.join(' '); } else { message = response.detail; @@ -187,6 +199,14 @@ export class HttpService { return message; } + private processResponse(responseData: T): T { + if (isAutoupdateReponse(responseData)) { + this.autoupdateService.injectAutoupdateIgnoreChangeId(responseData.autoupdate); + responseData = responseData.data; + } + return responseData; + } + /** * Executes a get on a path with a certain object * @param path The path to send the request to. diff --git a/client/src/app/core/core-services/openslides.service.ts b/client/src/app/core/core-services/openslides.service.ts index 07d27a469..a35c47ed8 100644 --- a/client/src/app/core/core-services/openslides.service.ts +++ b/client/src/app/core/core-services/openslides.service.ts @@ -130,10 +130,7 @@ export class OpenSlidesService { * Init DS from cache and after this start the websocket service. */ private async setupDataStoreAndWebSocket(): Promise { - let changeId = await this.DS.initFromStorage(); - if (changeId > 0) { - changeId += 1; - } + const changeId = await this.DS.initFromStorage(); // disconnect the WS connection, if there was one. This is needed // to update the connection parameters, namely the cookies. If the user // is changed, the WS needs to reconnect, so the new connection holds the new @@ -141,7 +138,7 @@ export class OpenSlidesService { if (this.websocketService.isConnected) { await this.websocketService.close(); // Wait for the disconnect. } - await this.websocketService.connect({ changeId: changeId }); // Request changes after changeId. + await this.websocketService.connect(changeId); // Request changes after changeId. } /** diff --git a/client/src/app/core/core-services/operator.service.ts b/client/src/app/core/core-services/operator.service.ts index 822a5f855..e2f294be4 100644 --- a/client/src/app/core/core-services/operator.service.ts +++ b/client/src/app/core/core-services/operator.service.ts @@ -456,7 +456,8 @@ export class OperatorService implements OnAfterAppsLoaded { * Set the operators presence to isPresent */ public async setPresence(isPresent: boolean): Promise { - await this.http.post(environment.urlPrefix + '/users/setpresence/', isPresent); + const r = await this.http.post(environment.urlPrefix + '/users/setpresence/', isPresent); + console.log('operator', r); } /** diff --git a/client/src/app/core/core-services/prioritize.service.ts b/client/src/app/core/core-services/prioritize.service.ts index a4bcf744d..e7677ab3b 100644 --- a/client/src/app/core/core-services/prioritize.service.ts +++ b/client/src/app/core/core-services/prioritize.service.ts @@ -40,7 +40,7 @@ export class PrioritizeService { if (this.openSlidesStatusService.isPrioritizedClient !== opPrioritized) { console.log('Alter prioritization:', opPrioritized); this.openSlidesStatusService.isPrioritizedClient = opPrioritized; - this.websocketService.reconnect({ changeId: this.DS.maxChangeId }); + this.websocketService.reconnect(this.DS.maxChangeId); } } } diff --git a/client/src/app/core/core-services/websocket.service.ts b/client/src/app/core/core-services/websocket.service.ts index 17b4ad62e..b7208416c 100644 --- a/client/src/app/core/core-services/websocket.service.ts +++ b/client/src/app/core/core-services/websocket.service.ts @@ -55,14 +55,6 @@ export const WEBSOCKET_ERROR_CODES = { WRONG_FORMAT: 102 }; -/* - * Options for (re-)connecting. - */ -interface ConnectOptions { - changeId?: number; - enableAutoupdates?: boolean; -} - /** * Service that handles WebSocket connections. Other services can register themselfs * with {@method getOberservable} for a specific type of messages. The content will be published. @@ -207,7 +199,7 @@ export class WebsocketService { * * Uses NgZone to let all callbacks run in the angular context. */ - public async connect(options: ConnectOptions = {}, retry: boolean = false): Promise { + public async connect(changeId: number | null = null, retry: boolean = false): Promise { const websocketId = Math.random().toString(36).substring(7); this.websocketId = websocketId; @@ -220,17 +212,10 @@ export class WebsocketService { this.shouldBeClosed = false; } - // set defaults - options = Object.assign(options, { - enableAutoupdates: true - }); + const queryParams: QueryParams = {}; - const queryParams: QueryParams = { - autoupdate: options.enableAutoupdates - }; - - if (options.changeId !== undefined) { - queryParams.change_id = options.changeId; + if (changeId !== null) { + queryParams.change_id = changeId; } // Create the websocket @@ -398,7 +383,7 @@ export class WebsocketService { const timeout = Math.floor(Math.random() * 3000 + 2000); this.retryTimeout = setTimeout(() => { this.retryTimeout = null; - this.connect({ enableAutoupdates: true }, true); + this.connect(null, true); }, timeout); } } @@ -438,9 +423,9 @@ export class WebsocketService { * * @param options The options for the new connection */ - public async reconnect(options: ConnectOptions = {}): Promise { + public async reconnect(changeId: number | null = null): Promise { await this.close(); - await this.connect(options); + await this.connect(changeId); } /** diff --git a/openslides/core/apps.py b/openslides/core/apps.py index 3b4711b6e..3481e402a 100644 --- a/openslides/core/apps.py +++ b/openslides/core/apps.py @@ -34,16 +34,10 @@ class CoreAppConfig(AppConfig): ProjectionDefaultViewSet, TagViewSet, ) - from .websocket import ( - NotifyWebsocketClientMessage, - ConstantsWebsocketClientMessage, - GetElementsWebsocketClientMessage, - AutoupdateWebsocketClientMessage, - ListenToProjectors, - PingPong, - ) from ..utils.rest_api import router - from ..utils.websocket import register_client_message + + # Let all client websocket message register + from ..utils import websocket_client_messages # noqa # Collect all config variables before getting the constants. config.collect_config_variables_from_apps() @@ -92,14 +86,6 @@ class CoreAppConfig(AppConfig): self.get_model("Countdown").get_collection_string(), CountdownViewSet ) - # Register client messages - register_client_message(NotifyWebsocketClientMessage()) - register_client_message(ConstantsWebsocketClientMessage()) - register_client_message(GetElementsWebsocketClientMessage()) - register_client_message(AutoupdateWebsocketClientMessage()) - register_client_message(ListenToProjectors()) - register_client_message(PingPong()) - if "runserver" in sys.argv or "changeconfig" in sys.argv: from openslides.utils.startup import run_startup_hooks diff --git a/openslides/utils/autoupdate.py b/openslides/utils/autoupdate.py index a43c5261a..3d3a3e308 100644 --- a/openslides/utils/autoupdate.py +++ b/openslides/utils/autoupdate.py @@ -1,3 +1,4 @@ +import json import threading from collections import defaultdict from typing import Any, Dict, Iterable, List, Optional, Tuple, Union @@ -7,9 +8,22 @@ from channels.layers import get_channel_layer from django.db.models import Model from mypy_extensions import TypedDict -from .cache import element_cache, get_element_id +from .cache import ChangeIdTooLowError, element_cache, get_element_id from .projector import get_projector_data -from .utils import get_model_from_collection_string, is_iterable +from .timing import Timing +from .utils import get_model_from_collection_string, is_iterable, split_element_id + + +AutoupdateFormat = TypedDict( + "AutoupdateFormat", + { + "changed": Dict[str, List[Dict[str, Any]]], + "deleted": Dict[str, List[int]], + "from_change_id": int, + "to_change_id": int, + "all_data": bool, + }, +) class AutoupdateElementBase(TypedDict): @@ -66,13 +80,15 @@ class AutoupdateBundle: element["id"] ] = element - def done(self) -> None: + def done(self) -> Optional[int]: """ Finishes the bundle by resolving all missing data and passing it to the history and element cache. + + Returns the change id, if there are autoupdate elements. Otherwise none. """ if not self.autoupdate_elements: - return + return None for collection, elements in self.autoupdate_elements.items(): # Get all ids, that do not have a full_data key @@ -95,7 +111,7 @@ class AutoupdateBundle: save_history(self.element_iterator) # Update cache and send autoupdate using async code. - async_to_sync(self.async_handle_collection_elements)() + return async_to_sync(self.dispatch_autoupdate)() @property def element_iterator(self) -> Iterable[AutoupdateElement]: @@ -120,9 +136,11 @@ class AutoupdateBundle: cache_elements[element_id] = full_data return await element_cache.change_elements(cache_elements) - async def async_handle_collection_elements(self) -> None: + async def dispatch_autoupdate(self) -> int: """ Async helper function to update cache and send autoupdate. + + Return the change_id """ # Update cache change_id = await self.update_cache() @@ -130,21 +148,23 @@ class AutoupdateBundle: # Send autoupdate channel_layer = get_channel_layer() await channel_layer.group_send( - "autoupdate", {"type": "send_data", "change_id": change_id} + "autoupdate", {"type": "msg_new_change_id", "change_id": change_id} ) - projector_data = await get_projector_data() # Send projector + projector_data = await get_projector_data() channel_layer = get_channel_layer() await channel_layer.group_send( "projector", { - "type": "projector_changed", + "type": "msg_projector_data", "data": projector_data, "change_id": change_id, }, ) + return change_id + def inform_changed_data( instances: Union[Iterable[Model], Model], @@ -246,13 +266,67 @@ class AutoupdateBundleMiddleware: thread_id = threading.get_ident() autoupdate_bundle[thread_id] = AutoupdateBundle() + timing = Timing("request") + response = self.get_response(request) + timing() + + # rewrite the response by adding the autoupdate on any success-case (2xx status) bundle: AutoupdateBundle = autoupdate_bundle.pop(thread_id) - bundle.done() + if response.status_code >= 200 and response.status_code < 300: + change_id = bundle.done() + + if change_id is not None: + user_id = request.user.pk or 0 + # Inject the autoupdate in the response. + # The complete response body will be overwritten! + autoupdate = async_to_sync(get_autoupdate_data)( + change_id, change_id, user_id + ) + content = {"autoupdate": autoupdate, "data": response.data} + # Note: autoupdate may be none on skipped ones (which should not happen + # since the user has made the request....) + response.content = json.dumps(content) + + timing(True) return response +async def get_autoupdate_data( + from_change_id: int, to_change_id: int, user_id: int +) -> Optional[AutoupdateFormat]: + try: + changed_elements, deleted_element_ids = await element_cache.get_data_since( + user_id, from_change_id, to_change_id + ) + except ChangeIdTooLowError: + # The change_id is lower the the lowerst change_id in redis. Return all data + changed_elements = await element_cache.get_all_data_list(user_id) + all_data = True + deleted_elements: Dict[str, List[int]] = {} + else: + all_data = False + deleted_elements = defaultdict(list) + for element_id in deleted_element_ids: + collection_string, id = split_element_id(element_id) + deleted_elements[collection_string].append(id) + + # Check, if the autoupdate has any data. + if not changed_elements and not deleted_element_ids: + # Skip empty updates + return None + else: + # Normal autoupdate with data + return AutoupdateFormat( + changed=changed_elements, + deleted=deleted_elements, + from_change_id=from_change_id, + to_change_id=to_change_id, + all_data=all_data, + ) + + def save_history(element_iterator: Iterable[AutoupdateElement]) -> Iterable: """ Thin wrapper around the call of history saving manager method. diff --git a/openslides/utils/consumer_autoupdate_strategy.py b/openslides/utils/consumer_autoupdate_strategy.py new file mode 100644 index 000000000..139e3ecd7 --- /dev/null +++ b/openslides/utils/consumer_autoupdate_strategy.py @@ -0,0 +1,99 @@ +import asyncio +from asyncio import Task +from typing import Optional, cast + +from django.conf import settings + +from .autoupdate import get_autoupdate_data +from .cache import element_cache +from .websocket import ChangeIdTooHighException, ProtocollAsyncJsonWebsocketConsumer + + +AUTOUPDATE_DELAY = getattr(settings, "AUTOUPDATE_DELAY", None) + + +class ConsumerAutoupdateStrategy: + def __init__(self, consumer: ProtocollAsyncJsonWebsocketConsumer) -> None: + self.consumer = consumer + # client_change_id = None: unknown -> set on first autoupdate or request_change_id + # client_change_id is int: the one + self.client_change_id: Optional[int] = None + self.max_seen_change_id = 0 + self.next_send_time = None + self.timer_task_handle: Optional[Task[None]] = None + self.lock = asyncio.Lock() + + async def request_change_id( + self, change_id: int, in_response: Optional[str] = None + ) -> None: + # This resets the server side tracking of the client's change id. + async with self.lock: + await self.stop_timer() + + self.max_seen_change_id = await element_cache.get_current_change_id() + self.client_change_id = change_id + + if self.client_change_id == self.max_seen_change_id: + # The client is up-to-date, so nothing will be done + return None + + if self.client_change_id > self.max_seen_change_id: + message = ( + f"Requested change_id {self.client_change_id} is higher than the " + + f"highest change_id {self.max_seen_change_id}." + ) + raise ChangeIdTooHighException(message, in_response=in_response) + + await self.send_autoupdate(in_response=in_response) + + async def new_change_id(self, change_id: int) -> None: + async with self.lock: + if self.client_change_id is None: + self.client_change_id = change_id + if change_id > self.max_seen_change_id: + self.max_seen_change_id = change_id + + if AUTOUPDATE_DELAY is None: # feature deactivated, send directly + await self.send_autoupdate() + elif self.timer_task_handle is None: + await self.start_timer() + + async def get_running_loop(self) -> asyncio.AbstractEventLoop: + if hasattr(asyncio, "get_running_loop"): + return asyncio.get_running_loop() # type: ignore + else: + return asyncio.get_event_loop() + + async def start_timer(self) -> None: + loop = await self.get_running_loop() + self.timer_task_handle = loop.create_task(self.timer_task()) + + async def stop_timer(self) -> None: + if self.timer_task_handle is not None: + self.timer_task_handle.cancel() + self.timer_task_handle = None + + async def timer_task(self) -> None: + try: + await asyncio.sleep(AUTOUPDATE_DELAY) + except asyncio.CancelledError: + return + + async with self.lock: + await self.send_autoupdate() + self.timer_task_handle = None + + async def send_autoupdate(self, in_response: Optional[str] = None) -> None: + max_change_id = ( + self.max_seen_change_id + ) # important to save this variable, because + # it can change during runtime. + autoupdate = await get_autoupdate_data( + cast(int, self.client_change_id), max_change_id, self.consumer.user_id + ) + if autoupdate is not None: + # It will be send, so we can set the client_change_id + self.client_change_id = max_change_id + await self.consumer.send_json( + type="autoupdate", content=autoupdate, in_response=in_response, + ) diff --git a/openslides/utils/consumers.py b/openslides/utils/consumers.py index 1073f831d..7432423f9 100644 --- a/openslides/utils/consumers.py +++ b/openslides/utils/consumers.py @@ -1,32 +1,19 @@ import time -from collections import defaultdict from typing import Any, Dict, List, Optional, cast from urllib.parse import parse_qs from channels.generic.websocket import AsyncWebsocketConsumer -from mypy_extensions import TypedDict -from ..utils.websocket import WEBSOCKET_CHANGE_ID_TOO_HIGH from . import logging from .auth import UserDoesNotExist, async_anonymous_is_enabled -from .cache import ChangeIdTooLowError, element_cache, split_element_id +from .cache import element_cache +from .consumer_autoupdate_strategy import ConsumerAutoupdateStrategy from .utils import get_worker_id -from .websocket import ProtocollAsyncJsonWebsocketConsumer +from .websocket import BaseWebsocketException, ProtocollAsyncJsonWebsocketConsumer logger = logging.getLogger("openslides.websocket") -AutoupdateFormat = TypedDict( - "AutoupdateFormat", - { - "changed": Dict[str, List[Dict[str, Any]]], - "deleted": Dict[str, List[int]], - "from_change_id": int, - "to_change_id": int, - "all_data": bool, - }, -) - class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): """ @@ -40,12 +27,11 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): ID counter for assigning each instance of this class an unique id. """ - skipped_autoupdate_from_change_id: Optional[int] = None - def __init__(self, *args: Any, **kwargs: Any) -> None: self.projector_hash: Dict[int, int] = {} SiteConsumer.ID_COUNTER += 1 self._id = get_worker_id() + "-" + str(SiteConsumer.ID_COUNTER) + self.autoupdate_strategy = ConsumerAutoupdateStrategy(self) super().__init__(*args, **kwargs) async def connect(self) -> None: @@ -56,11 +42,13 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): Sends the startup data to the user. """ + self.user_id = self.scope["user"]["id"] + self.connect_time = time.time() # self.scope['user'] is the full_data dict of the user. For an # anonymous user is it the dict {'id': 0} change_id = None - if not await async_anonymous_is_enabled() and not self.scope["user"]["id"]: + if not await async_anonymous_is_enabled() and not self.user_id: await self.accept() # workaround for #4009 await self.close() logger.debug(f"connect: denied ({self._id})") @@ -74,24 +62,23 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): change_id = int(query_string[b"change_id"][0]) except ValueError: await self.accept() # workaround for #4009 - await self.close() # TODO: Find a way to send an error code + await self.close() logger.debug(f"connect: wrong change id ({self._id})") return - if b"autoupdate" in query_string and query_string[b"autoupdate"][ - 0 - ].lower() not in [b"0", b"off", b"false"]: - # a positive value in autoupdate. Start autoupdate - await self.channel_layer.group_add("autoupdate", self.channel_name) - await self.accept() if change_id is not None: logger.debug(f"connect: change id {change_id} ({self._id})") - await self.send_autoupdate(change_id) + try: + await self.request_autoupdate(change_id) + except BaseWebsocketException as e: + await self.send_exception(e) else: logger.debug(f"connect: no change id ({self._id})") + await self.channel_layer.group_add("autoupdate", self.channel_name) + async def disconnect(self, close_code: int) -> None: """ A user disconnects. Remove it from autoupdate. @@ -102,110 +89,19 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): f"disconnect code={close_code} active_secs={active_seconds} ({self._id})" ) - async def send_notify(self, event: Dict[str, Any]) -> None: - """ - Send a notify message to the user. - """ - user_id = self.scope["user"]["id"] - item = event["incomming"] - - users = item.get("users") - reply_channels = item.get("replyChannels") - if ( - (isinstance(users, bool) and users) - or (isinstance(users, list) and user_id in users) - or ( - isinstance(reply_channels, list) and self.channel_name in reply_channels - ) - or (users is None and reply_channels is None) - ): - item["senderChannelName"] = event["senderChannelName"] - item["senderUserId"] = event["senderUserId"] - await self.send_json(type="notify", content=item) - - async def send_autoupdate( - self, - change_id: int, - max_change_id: Optional[int] = None, - in_response: Optional[str] = None, - ) -> None: - """ - Sends an autoupdate to the client from change_id to max_change_id. - If max_change_id is None, the current change id will be used. - """ - user_id = self.scope["user"]["id"] - - if max_change_id is None: - max_change_id = await element_cache.get_current_change_id() - - if change_id == max_change_id + 1: - # The client is up-to-date, so nothing will be done - return - - if change_id > max_change_id: - message = f"Requested change_id {change_id} is higher this highest change_id {max_change_id}." - await self.send_error( - code=WEBSOCKET_CHANGE_ID_TOO_HIGH, - message=message, - in_response=in_response, - ) - return - - try: - changed_elements, deleted_element_ids = await element_cache.get_data_since( - user_id, change_id, max_change_id - ) - except ChangeIdTooLowError: - # The change_id is lower the the lowerst change_id in redis. Return all data - changed_elements = await element_cache.get_all_data_list(user_id) - all_data = True - deleted_elements: Dict[str, List[int]] = {} - except UserDoesNotExist: - # Maybe the user was deleted, but a websocket connection is still open to the user. - # So we can close this connection and return. - await self.close() - return - else: - all_data = False - deleted_elements = defaultdict(list) - for element_id in deleted_element_ids: - collection_string, id = split_element_id(element_id) - deleted_elements[collection_string].append(id) - - # Check, if the autoupdate has any data. - if not changed_elements and not deleted_element_ids: - # Set the current from_change_id, if it is the first skipped autoupdate - if not self.skipped_autoupdate_from_change_id: - self.skipped_autoupdate_from_change_id = change_id - else: - # Normal autoupdate with data - from_change_id = change_id - - # If there is at least one skipped autoupdate, take the saved from_change_id - if self.skipped_autoupdate_from_change_id: - from_change_id = self.skipped_autoupdate_from_change_id - self.skipped_autoupdate_from_change_id = None - - await self.send_json( - type="autoupdate", - content=AutoupdateFormat( - changed=changed_elements, - deleted=deleted_elements, - from_change_id=from_change_id, - to_change_id=max_change_id, - all_data=all_data, - ), - in_response=in_response, - ) - - async def send_data(self, event: Dict[str, Any]) -> None: + async def msg_new_change_id(self, event: Dict[str, Any]) -> None: """ Send changed or deleted elements to the user. """ change_id = event["change_id"] - await self.send_autoupdate(change_id, max_change_id=change_id) + try: + await self.autoupdate_strategy.new_change_id(change_id) + except UserDoesNotExist: + # Maybe the user was deleted, but a websocket connection is still open to the user. + # So we can close this connection and return. + await self.close() - async def projector_changed(self, event: Dict[str, Any]) -> None: + async def msg_projector_data(self, event: Dict[str, Any]) -> None: """ The projector has changed. """ @@ -223,6 +119,33 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): if projector_data: await self.send_projector_data(projector_data, change_id=change_id) + async def msg_notify(self, event: Dict[str, Any]) -> None: + """ + Send a notify message to the user. + """ + item = event["incomming"] + + users = item.get("users") + reply_channels = item.get("replyChannels") + if ( + (isinstance(users, bool) and users) + or (isinstance(users, list) and self.user_id in users) + or ( + isinstance(reply_channels, list) and self.channel_name in reply_channels + ) + or (users is None and reply_channels is None) + ): + item["senderChannelName"] = event["senderChannelName"] + item["senderUserId"] = event["senderUserId"] + await self.send_json(type="notify", content=item) + + async def request_autoupdate( + self, change_id: int, in_response: Optional[str] = None + ) -> None: + await self.autoupdate_strategy.request_change_id( + change_id, in_response=in_response + ) + async def send_projector_data( self, data: Dict[int, Dict[str, Any]], diff --git a/openslides/utils/projector.py b/openslides/utils/projector.py index 7b66e87c6..3a8f1cc00 100644 --- a/openslides/utils/projector.py +++ b/openslides/utils/projector.py @@ -35,9 +35,11 @@ class ProjectorAllDataProvider: if data is None: data = ProjectorAllDataProvider.NON_EXISTENT_MARKER self.cache[collection][id] = data - elif cache_data == ProjectorAllDataProvider.NON_EXISTENT_MARKER: + + cache_data = self.cache[collection][id] + if cache_data == ProjectorAllDataProvider.NON_EXISTENT_MARKER: return None - return self.cache[collection][id] + return cache_data async def get_collection(self, collection: str) -> Dict[int, Dict[str, Any]]: if not self.fetched_collection.get(collection, False): diff --git a/openslides/utils/timing.py b/openslides/utils/timing.py new file mode 100644 index 000000000..ebcc2f35c --- /dev/null +++ b/openslides/utils/timing.py @@ -0,0 +1,27 @@ +import time +from typing import List, Optional + +from . import logging + + +timelogger = logging.getLogger(__name__) + + +class Timing: + def __init__(self, name: str) -> None: + self.name = name + self.times: List[float] = [time.time()] + + def __call__(self, done: Optional[bool] = False) -> None: + self.times.append(time.time()) + if done: + self.printtime() + + def printtime(self) -> None: + s = f"{self.name}: " + for i in range(1, len(self.times)): + diff = self.times[i] - self.times[i - 1] + s += f"{i}: {diff:.5f} " + diff = self.times[-1] - self.times[0] + s += f"sum: {diff:.5f}" + timelogger.info(s) diff --git a/openslides/utils/websocket.py b/openslides/utils/websocket.py index b001b2dc9..42e5b802f 100644 --- a/openslides/utils/websocket.py +++ b/openslides/utils/websocket.py @@ -25,6 +25,26 @@ WEBSOCKET_WRONG_FORMAT = 102 # If the recieved data has not the expected format. +class BaseWebsocketException(Exception): + code: int + + def __init__(self, message: str, in_response: Optional[str] = None) -> None: + self.message = message + self.in_response = in_response + + +class NotAuthorizedException(BaseWebsocketException): + code = WEBSOCKET_NOT_AUTHORIZED + + +class ChangeIdTooHighException(BaseWebsocketException): + code = WEBSOCKET_CHANGE_ID_TOO_HIGH + + +class WrongFormatException(BaseWebsocketException): + code = WEBSOCKET_WRONG_FORMAT + + class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer): async def receive( self, @@ -122,6 +142,20 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer): silence_errors=silence_errors, ) + async def send_exception( + self, e: BaseWebsocketException, silence_errors: Optional[bool] = True, + ) -> None: + """ + Send generic error messages with a custom status code (see above) and a text message. + """ + await self.send_json( + "error", + {"code": e.code, "message": e.message}, + None, + in_response=e.in_response, + silence_errors=silence_errors, + ) + async def receive_json(self, content: Any) -> None: # type: ignore """ Receives the json data, parses it and calls receive_content. @@ -140,9 +174,12 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer): ) return - await websocket_client_messages[content["type"]].receive_content( - self, content["content"], id=content["id"] - ) + try: + await websocket_client_messages[content["type"]].receive_content( + self, content["content"], id=content["id"] + ) + except BaseWebsocketException as e: + await self.send_exception(e) schema: Dict[str, Any] = { diff --git a/openslides/core/websocket.py b/openslides/utils/websocket_client_messages.py similarity index 79% rename from openslides/core/websocket.py rename to openslides/utils/websocket_client_messages.py index a43447305..d66eb1f58 100644 --- a/openslides/core/websocket.py +++ b/openslides/utils/websocket_client_messages.py @@ -1,21 +1,22 @@ from typing import Any, Dict, Optional -from ..utils import logging -from ..utils.auth import async_has_perm -from ..utils.constants import get_constants -from ..utils.projector import get_projector_data -from ..utils.stats import WebsocketLatencyLogger -from ..utils.websocket import ( - WEBSOCKET_NOT_AUTHORIZED, +from . import logging +from .auth import async_has_perm +from .constants import get_constants +from .projector import get_projector_data +from .stats import WebsocketLatencyLogger +from .websocket import ( BaseWebsocketClientMessage, + NotAuthorizedException, ProtocollAsyncJsonWebsocketConsumer, + register_client_message, ) logger = logging.getLogger(__name__) -class NotifyWebsocketClientMessage(BaseWebsocketClientMessage): +class Notify(BaseWebsocketClientMessage): """ Websocket message from a client to send a message to other clients. """ @@ -59,13 +60,9 @@ class NotifyWebsocketClientMessage(BaseWebsocketClientMessage): ) -> None: # Check if the user is allowed to send this notify message perm = self.notify_permissions.get(content["name"]) - if perm is not None and not await async_has_perm( - consumer.scope["user"]["id"], perm - ): - await consumer.send_error( - code=WEBSOCKET_NOT_AUTHORIZED, - message=f"You need '{perm}' to send this message.", - in_response=id, + if perm is not None and not await async_has_perm(consumer.user_id, perm): + raise NotAuthorizedException( + f"You need '{perm}' to send this message.", in_response=id, ) else: # Some logging @@ -84,15 +81,18 @@ class NotifyWebsocketClientMessage(BaseWebsocketClientMessage): await consumer.channel_layer.group_send( "site", { - "type": "send_notify", + "type": "msg_notify", "incomming": content, "senderChannelName": consumer.channel_name, - "senderUserId": consumer.scope["user"]["id"], + "senderUserId": consumer.user_id, }, ) -class ConstantsWebsocketClientMessage(BaseWebsocketClientMessage): +register_client_message(Notify()) + + +class Constants(BaseWebsocketClientMessage): """ The Client requests the constants. """ @@ -109,7 +109,10 @@ class ConstantsWebsocketClientMessage(BaseWebsocketClientMessage): ) -class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage): +register_client_message(Constants()) + + +class GetElements(BaseWebsocketClientMessage): """ The Client request database elements. """ @@ -130,26 +133,10 @@ class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage): self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str ) -> None: requested_change_id = content.get("change_id", 0) - await consumer.send_autoupdate(requested_change_id, in_response=id) + await consumer.request_autoupdate(requested_change_id, in_response=id) -class AutoupdateWebsocketClientMessage(BaseWebsocketClientMessage): - """ - The Client turns autoupdate on or off. - """ - - identifier = "autoupdate" - - async def receive_content( - self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str - ) -> None: - # Turn on or off the autoupdate for the client - if content: # accept any value, that can be interpreted as bool - await consumer.channel_layer.group_add("autoupdate", consumer.channel_name) - else: - await consumer.channel_layer.group_discard( - "autoupdate", consumer.channel_name - ) +register_client_message(GetElements()) class ListenToProjectors(BaseWebsocketClientMessage): @@ -198,6 +185,9 @@ class ListenToProjectors(BaseWebsocketClientMessage): await consumer.send_projector_data(projector_data, in_response=id) +register_client_message(ListenToProjectors()) + + class PingPong(BaseWebsocketClientMessage): """ Responds to pings from the client. @@ -220,3 +210,6 @@ class PingPong(BaseWebsocketClientMessage): await consumer.send_json(type="pong", content=latency, in_response=id) if latency is not None: await WebsocketLatencyLogger.add_latency(latency) + + +register_client_message(PingPong()) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index f83011d6e..2791e70be 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -1,9 +1,13 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional, cast from openslides.core.config import config from openslides.core.models import Projector from openslides.users.models import User -from openslides.utils.projector import AllData, get_config, register_projector_slide +from openslides.utils.projector import ( + ProjectorAllDataProvider, + get_config, + register_projector_slide, +) class TConfig: @@ -94,19 +98,23 @@ class TProjector: async def slide1( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Slide that shows the general_event_name. """ return { "name": "slide1", - "event_name": await get_config(all_data, "general_event_name"), + "event_name": await get_config(all_data_provider, "general_event_name"), } async def slide2( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: return {"name": "slide2"} @@ -115,17 +123,26 @@ register_projector_slide("test/slide1", slide1) register_projector_slide("test/slide2", slide2) -def all_data_config() -> AllData: - return { - TConfig().get_collection_string(): { - element["id"]: element for element in TConfig().get_elements() - } - } +class TestProjectorAllDataProvider: + def __init__(self, data): + self.data = data + + async def get(self, collection: str, id: int) -> Optional[Dict[str, Any]]: + collection_data = await self.get_collection(collection) + return collection_data.get(id) + + async def get_collection(self, collection: str) -> Dict[int, Dict[str, Any]]: + return self.data.get(collection, {}) + + async def exists(self, collection: str, id: int) -> bool: + return (await self.get(collection, id)) is not None -def all_data_users() -> AllData: - return { - TUser().get_collection_string(): { - element["id"]: element for element in TUser().get_elements() - } +def get_all_data_provider(data) -> ProjectorAllDataProvider: + data[TConfig().get_collection_string()] = { + element["id"]: element for element in TConfig().get_elements() } + data[TUser().get_collection_string()] = { + element["id"]: element for element in TUser().get_elements() + } + return cast(ProjectorAllDataProvider, TestProjectorAllDataProvider(data)) diff --git a/tests/integration/utils/test_consumers.py b/tests/integration/utils/test_consumers.py index 80bf93729..4f664ff2a 100644 --- a/tests/integration/utils/test_consumers.py +++ b/tests/integration/utils/test_consumers.py @@ -1,4 +1,3 @@ -import asyncio from importlib import import_module from typing import Optional, Tuple from unittest.mock import patch @@ -158,17 +157,7 @@ async def test_connection_with_too_big_change_id(get_communicator, set_config): @pytest.mark.asyncio -async def test_changed_data_autoupdate_off(communicator, set_config): - await set_config("general_system_enable_anonymous", True) - await communicator.connect() - - # Change a config value - await set_config("general_event_name", "Test Event") - assert await communicator.receive_nothing() - - -@pytest.mark.asyncio -async def test_changed_data_autoupdate_on(get_communicator, set_config): +async def test_changed_data_autoupdate(get_communicator, set_config): await set_config("general_system_enable_anonymous", True) communicator = get_communicator("autoupdate=on") await communicator.connect() @@ -422,12 +411,12 @@ async def test_send_connect_up_to_date(communicator, set_config): {"type": "getElements", "content": {"change_id": 0}, "id": "test_id"} ) response1 = await communicator.receive_json_from() - first_change_id = response1.get("content")["to_change_id"] + max_change_id = response1.get("content")["to_change_id"] await communicator.send_json_to( { "type": "getElements", - "content": {"change_id": first_change_id + 1}, + "content": {"change_id": max_change_id}, "id": "test_id", } ) @@ -510,43 +499,6 @@ async def test_send_invalid_get_elements(communicator, set_config): assert response.get("in_response") == "test_id" -@pytest.mark.asyncio -async def test_turn_on_autoupdate(communicator, set_config): - await set_config("general_system_enable_anonymous", True) - await communicator.connect() - - await communicator.send_json_to( - {"type": "autoupdate", "content": "on", "id": "test_id"} - ) - await asyncio.sleep(0.01) - # Change a config value - await set_config("general_event_name", "Test Event") - response = await communicator.receive_json_from() - - id = config.get_key_to_id()["general_event_name"] - type = response.get("type") - content = response.get("content") - assert type == "autoupdate" - assert content["changed"] == { - "core/config": [{"id": id, "key": "general_event_name", "value": "Test Event"}] - } - - -@pytest.mark.asyncio -async def test_turn_off_autoupdate(get_communicator, set_config): - await set_config("general_system_enable_anonymous", True) - communicator = get_communicator("autoupdate=on") - await communicator.connect() - - await communicator.send_json_to( - {"type": "autoupdate", "content": False, "id": "test_id"} - ) - await asyncio.sleep(0.01) - # Change a config value - await set_config("general_event_name", "Test Event") - assert await communicator.receive_nothing() - - @pytest.mark.asyncio async def test_listen_to_projector(communicator, set_config): await set_config("general_system_enable_anonymous", True) @@ -588,10 +540,15 @@ async def test_update_projector(communicator, set_config): "id": "test_id", } ) - await communicator.receive_json_from() + await communicator.receive_json_from() # recieve initial projector data # Change a config value await set_config("general_event_name", "Test Event") + + # We need two messages: The autoupdate and the projector data in this order + response = await communicator.receive_json_from() + assert response.get("type") == "autoupdate" + response = await communicator.receive_json_from() type = response.get("type") @@ -629,4 +586,8 @@ async def test_update_projector_to_current_value(communicator, set_config): # Change a config value to current_value await set_config("general_event_name", "OpenSlides") + # We await an autoupdate, bot no projector data + response = await communicator.receive_json_from() + assert response.get("type") == "autoupdate" + assert await communicator.receive_nothing() diff --git a/tests/unit/agenda/test_projector.py b/tests/unit/agenda/test_projector.py index 0f86a54ca..442be4079 100644 --- a/tests/unit/agenda/test_projector.py +++ b/tests/unit/agenda/test_projector.py @@ -4,10 +4,12 @@ import pytest from openslides.agenda import projector +from ...integration.helpers import get_all_data_provider + @pytest.fixture -def all_data(): - all_data = { +def all_data_provider(): + data = { "agenda/item": { 1: { "id": 1, @@ -82,14 +84,14 @@ def all_data(): } } - return all_data + return get_all_data_provider(data) @pytest.mark.asyncio -async def test_main_items(all_data): +async def test_main_items(all_data_provider): element: Dict[str, Any] = {} - data = await projector.item_list_slide(all_data, element, 1) + data = await projector.item_list_slide(all_data_provider, element, 1) assert data == { "items": [ @@ -106,10 +108,10 @@ async def test_main_items(all_data): @pytest.mark.asyncio -async def test_all_items(all_data): +async def test_all_items(all_data_provider): element: Dict[str, Any] = {"only_main_items": False} - data = await projector.item_list_slide(all_data, element, 1) + data = await projector.item_list_slide(all_data_provider, element, 1) assert data == { "items": [ diff --git a/tests/unit/motions/test_projector.py b/tests/unit/motions/test_projector.py index c829070d0..257d60150 100644 --- a/tests/unit/motions/test_projector.py +++ b/tests/unit/motions/test_projector.py @@ -4,14 +4,13 @@ import pytest from openslides.motions import projector -from ...integration.helpers import all_data_config, all_data_users +from ...integration.helpers import get_all_data_provider @pytest.fixture -def all_data(): - return_value = all_data_config() - return_value.update(all_data_users()) - return_value["motions/motion"] = { +def all_data_provider(): + data = {} + data["motions/motion"] = { 1: { "id": 1, "identifier": "4", @@ -143,7 +142,7 @@ def all_data(): "change_recommendations": [], }, } - return_value["motions/workflow"] = { + data["motions/workflow"] = { 1: { "id": 1, "name": "Simple Workflow", @@ -151,7 +150,7 @@ def all_data(): "first_state_id": 1, } } - return_value["motions/state"] = { + data["motions/state"] = { 1: { "id": 1, "name": "submitted", @@ -217,7 +216,7 @@ def all_data(): "workflow_id": 1, }, } - return_value["motions/statute-paragraph"] = { + data["motions/statute-paragraph"] = { 1: { "id": 1, "title": "§1 Preamble", @@ -225,7 +224,7 @@ def all_data(): "weight": 10000, } } - return_value["motions/motion-change-recommendation"] = { + data["motions/motion-change-recommendation"] = { 1: { "id": 1, "motion_id": 1, @@ -251,14 +250,14 @@ def all_data(): "creation_time": "2019-02-09T09:54:06.256378+01:00", }, } - return return_value + return get_all_data_provider(data) @pytest.mark.asyncio -async def test_motion_slide(all_data): +async def test_motion_slide(all_data_provider): element: Dict[str, Any] = {"id": 1} - data = await projector.motion_slide(all_data, element, 1) + data = await projector.motion_slide(all_data_provider, element, 1) assert data == { "identifier": "4", @@ -304,10 +303,10 @@ async def test_motion_slide(all_data): @pytest.mark.asyncio -async def test_amendment_slide(all_data): +async def test_amendment_slide(all_data_provider): element: Dict[str, Any] = {"id": 2} - data = await projector.motion_slide(all_data, element, 1) + data = await projector.motion_slide(all_data_provider, element, 1) assert data == { "identifier": "Ä1", @@ -331,10 +330,10 @@ async def test_amendment_slide(all_data): @pytest.mark.asyncio -async def test_statute_amendment_slide(all_data): +async def test_statute_amendment_slide(all_data_provider): element: Dict[str, Any] = {"id": 3} - data = await projector.motion_slide(all_data, element, 1) + data = await projector.motion_slide(all_data_provider, element, 1) assert data == { "identifier": None,