diff --git a/SETTINGS.rst b/SETTINGS.rst index 64163c001..dc42430f0 100644 --- a/SETTINGS.rst +++ b/SETTINGS.rst @@ -143,3 +143,7 @@ not affect the client. operator is in one of these groups, the client disconnected and reconnects again. All requests urls (including websockets) are now prefixed with `/prioritize`, so these requests from "prioritized clients" can be routed to different servers. + +`AUTOUPDATE_DELAY`: The delay to send autoupdates. This feature can be +deactivated by setting it to `None`. It is deactivated per default. The Delay is +given in seconds diff --git a/client/src/app/core/core-services/autoupdate.service.ts b/client/src/app/core/core-services/autoupdate.service.ts index ceaeda77a..a286bc3e9 100644 --- a/client/src/app/core/core-services/autoupdate.service.ts +++ b/client/src/app/core/core-services/autoupdate.service.ts @@ -3,9 +3,10 @@ import { Injectable } from '@angular/core'; import { BaseModel } from '../../shared/models/base/base-model'; import { CollectionStringMapperService } from './collection-string-mapper.service'; import { DataStoreService, DataStoreUpdateManagerService } from './data-store.service'; +import { Mutex } from '../promises/mutex'; import { WebsocketService, WEBSOCKET_ERROR_CODES } from './websocket.service'; -interface AutoupdateFormat { +export interface AutoupdateFormat { /** * All changed (and created) items as their full/restricted data grouped by their collection. */ @@ -36,6 +37,19 @@ interface AutoupdateFormat { all_data: boolean; } +export function isAutoupdateFormat(obj: any): obj is AutoupdateFormat { + const format = obj as AutoupdateFormat; + return ( + obj && + typeof obj === 'object' && + format.changed !== undefined && + format.deleted !== undefined && + format.from_change_id !== undefined && + format.to_change_id !== undefined && + format.all_data !== undefined + ); +} + /** * Handles the initial update and automatic updates using the {@link WebsocketService} * Incoming objects, usually BaseModels, will be saved in the dataStore (`this.DS`) @@ -45,6 +59,8 @@ interface AutoupdateFormat { providedIn: 'root' }) export class AutoupdateService { + private mutex = new Mutex(); + /** * Constructor to create the AutoupdateService. Calls the constructor of the parent class. * @param websocketService @@ -79,15 +95,17 @@ export class AutoupdateService { * Handles the change ids of all autoupdates. */ private async storeResponse(autoupdate: AutoupdateFormat): Promise { + const unlock = await this.mutex.lock(); if (autoupdate.all_data) { await this.storeAllData(autoupdate); } else { await this.storePartialAutoupdate(autoupdate); } + unlock(); } /** - * Stores all data from the autoupdate. This means, that the DS is resettet and filled with just the + * Stores all data from the autoupdate. This means, that the DS is resetted and filled with just the * given data from the autoupdate. * @param autoupdate The autoupdate */ @@ -116,27 +134,41 @@ export class AutoupdateService { // Normal autoupdate if (autoupdate.from_change_id <= maxChangeId + 1 && autoupdate.to_change_id > maxChangeId) { - const updateSlot = await this.DSUpdateManager.getNewUpdateSlot(this.DS); - - // Delete the removed objects from the DataStore - for (const collection of Object.keys(autoupdate.deleted)) { - await this.DS.remove(collection, autoupdate.deleted[collection]); - } - - // Add the objects to the DataStore. - for (const collection of Object.keys(autoupdate.changed)) { - await this.DS.add(this.mapObjectsToBaseModels(collection, autoupdate.changed[collection])); - } - - await this.DS.flushToStorage(autoupdate.to_change_id); - - this.DSUpdateManager.commit(updateSlot, autoupdate.to_change_id); + await this.injectAutupdateIntoDS(autoupdate, true); } else { // autoupdate fully in the future. we are missing something! + console.log('Autoupdate in the future', maxChangeId, autoupdate.from_change_id, autoupdate.to_change_id); this.requestChanges(); } } + public async injectAutoupdateIgnoreChangeId(autoupdate: AutoupdateFormat): Promise { + const unlock = await this.mutex.lock(); + console.debug('inject autoupdate', autoupdate); + await this.injectAutupdateIntoDS(autoupdate, false); + unlock(); + } + + private async injectAutupdateIntoDS(autoupdate: AutoupdateFormat, flush: boolean): Promise { + const updateSlot = await this.DSUpdateManager.getNewUpdateSlot(this.DS); + + // Delete the removed objects from the DataStore + for (const collection of Object.keys(autoupdate.deleted)) { + await this.DS.remove(collection, autoupdate.deleted[collection]); + } + + // Add the objects to the DataStore. + for (const collection of Object.keys(autoupdate.changed)) { + await this.DS.add(this.mapObjectsToBaseModels(collection, autoupdate.changed[collection])); + } + + if (flush) { + await this.DS.flushToStorage(autoupdate.to_change_id); + } + + this.DSUpdateManager.commit(updateSlot, autoupdate.to_change_id); + } + /** * Creates baseModels for each plain object. If the collection is not registered, * A console error will be issued and an empty list returned. @@ -160,9 +192,8 @@ export class AutoupdateService { * The server should return an autoupdate with all new data. */ public requestChanges(): void { - const changeId = this.DS.maxChangeId === 0 ? 0 : this.DS.maxChangeId + 1; - console.log(`requesting changed objects with DS max change id ${changeId}`); - this.websocketService.send('getElements', { change_id: changeId }); + console.log(`requesting changed objects with DS max change id ${this.DS.maxChangeId}`); + this.websocketService.send('getElements', { change_id: this.DS.maxChangeId }); } /** diff --git a/client/src/app/core/core-services/data-store.service.ts b/client/src/app/core/core-services/data-store.service.ts index a9ff9fc7e..2fa19e4c7 100644 --- a/client/src/app/core/core-services/data-store.service.ts +++ b/client/src/app/core/core-services/data-store.service.ts @@ -258,6 +258,7 @@ export class DataStoreUpdateManagerService { private serveNextSlot(): void { if (this.updateSlotRequests.length > 0) { + console.log('Concurrent update slots'); const request = this.updateSlotRequests.pop(); request.resolve(); } @@ -665,4 +666,11 @@ export class DataStoreService { await this.storageService.set(DataStoreService.cachePrefix + 'DS', this.jsonStore); await this.storageService.set(DataStoreService.cachePrefix + 'maxChangeId', changeId); } + + public print(): void { + console.log('Max change id', this.maxChangeId); + console.log('json storage'); + console.log(JSON.stringify(this.jsonStore)); + console.log(this.modelStore); + } } diff --git a/client/src/app/core/core-services/http.service.ts b/client/src/app/core/core-services/http.service.ts index 7dfffc0ae..fbf1f01fc 100644 --- a/client/src/app/core/core-services/http.service.ts +++ b/client/src/app/core/core-services/http.service.ts @@ -3,6 +3,7 @@ import { Injectable } from '@angular/core'; import { TranslateService } from '@ngx-translate/core'; +import { AutoupdateFormat, AutoupdateService, isAutoupdateFormat } from './autoupdate.service'; import { OpenSlidesStatusService } from './openslides-status.service'; import { formatQueryParams, QueryParams } from '../definitions/query-params'; @@ -17,12 +18,12 @@ export enum HTTPMethod { DELETE = 'delete' } -export interface DetailResponse { +export interface ErrorDetailResponse { detail: string | string[]; args?: string[]; } -function isDetailResponse(obj: any): obj is DetailResponse { +function isErrorDetailResponse(obj: any): obj is ErrorDetailResponse { return ( obj && typeof obj === 'object' && @@ -31,6 +32,15 @@ function isDetailResponse(obj: any): obj is DetailResponse { ); } +interface AutoupdateResponse { + autoupdate: AutoupdateFormat; + data?: any; +} + +function isAutoupdateReponse(obj: any): obj is AutoupdateResponse { + return obj && typeof obj === 'object' && isAutoupdateFormat((obj as AutoupdateResponse).autoupdate); +} + /** * Service for managing HTTP requests. Allows to send data for every method. Also (TODO) will do generic error handling. */ @@ -55,7 +65,8 @@ export class HttpService { public constructor( private http: HttpClient, private translate: TranslateService, - private OSStatus: OpenSlidesStatusService + private OSStatus: OpenSlidesStatusService, + private autoupdateService: AutoupdateService ) { this.defaultHeaders = new HttpHeaders().set('Content-Type', 'application/json'); } @@ -82,7 +93,7 @@ export class HttpService { ): Promise { // end early, if we are in history mode if (this.OSStatus.isInHistoryMode && method !== HTTPMethod.GET) { - throw this.handleError('You cannot make changes while in history mode'); + throw this.processError('You cannot make changes while in history mode'); } // there is a current bug with the responseType. @@ -108,9 +119,10 @@ export class HttpService { }; try { - return await this.http.request(method, url, options).toPromise(); - } catch (e) { - throw this.handleError(e); + const responseData: T = await this.http.request(method, url, options).toPromise(); + return this.processResponse(responseData); + } catch (error) { + throw this.processError(error); } } @@ -120,7 +132,7 @@ export class HttpService { * @param e The error thrown. * @returns The prepared and translated message for the user */ - private handleError(e: any): string { + private processError(e: any): string { let error = this.translate.instant('Error') + ': '; // If the error is a string already, return it. if (typeof e === 'string') { @@ -142,12 +154,14 @@ export class HttpService { } else if (!e.error) { error += this.translate.instant("The server didn't respond."); } else if (typeof e.error === 'object') { - if (isDetailResponse(e.error)) { - error += this.processDetailResponse(e.error); + if (isErrorDetailResponse(e.error)) { + error += this.processErrorDetailResponse(e.error); } else { const errorList = Object.keys(e.error).map(key => { const capitalizedKey = key.charAt(0).toUpperCase() + key.slice(1); - return `${this.translate.instant(capitalizedKey)}: ${this.processDetailResponse(e.error[key])}`; + return `${this.translate.instant(capitalizedKey)}: ${this.processErrorDetailResponse( + e.error[key] + )}`; }); error = errorList.join(', '); } @@ -168,11 +182,9 @@ export class HttpService { * @param str a string or a string array to join together. * @returns Error text(s) as single string */ - private processDetailResponse(response: DetailResponse): string { + private processErrorDetailResponse(response: ErrorDetailResponse): string { let message: string; - if (response instanceof Array) { - message = response.join(' '); - } else if (response.detail instanceof Array) { + if (response.detail instanceof Array) { message = response.detail.join(' '); } else { message = response.detail; @@ -187,6 +199,14 @@ export class HttpService { return message; } + private processResponse(responseData: T): T { + if (isAutoupdateReponse(responseData)) { + this.autoupdateService.injectAutoupdateIgnoreChangeId(responseData.autoupdate); + responseData = responseData.data; + } + return responseData; + } + /** * Executes a get on a path with a certain object * @param path The path to send the request to. diff --git a/client/src/app/core/core-services/openslides.service.ts b/client/src/app/core/core-services/openslides.service.ts index 07d27a469..a35c47ed8 100644 --- a/client/src/app/core/core-services/openslides.service.ts +++ b/client/src/app/core/core-services/openslides.service.ts @@ -130,10 +130,7 @@ export class OpenSlidesService { * Init DS from cache and after this start the websocket service. */ private async setupDataStoreAndWebSocket(): Promise { - let changeId = await this.DS.initFromStorage(); - if (changeId > 0) { - changeId += 1; - } + const changeId = await this.DS.initFromStorage(); // disconnect the WS connection, if there was one. This is needed // to update the connection parameters, namely the cookies. If the user // is changed, the WS needs to reconnect, so the new connection holds the new @@ -141,7 +138,7 @@ export class OpenSlidesService { if (this.websocketService.isConnected) { await this.websocketService.close(); // Wait for the disconnect. } - await this.websocketService.connect({ changeId: changeId }); // Request changes after changeId. + await this.websocketService.connect(changeId); // Request changes after changeId. } /** diff --git a/client/src/app/core/core-services/prioritize.service.ts b/client/src/app/core/core-services/prioritize.service.ts index a4bcf744d..e7677ab3b 100644 --- a/client/src/app/core/core-services/prioritize.service.ts +++ b/client/src/app/core/core-services/prioritize.service.ts @@ -40,7 +40,7 @@ export class PrioritizeService { if (this.openSlidesStatusService.isPrioritizedClient !== opPrioritized) { console.log('Alter prioritization:', opPrioritized); this.openSlidesStatusService.isPrioritizedClient = opPrioritized; - this.websocketService.reconnect({ changeId: this.DS.maxChangeId }); + this.websocketService.reconnect(this.DS.maxChangeId); } } } diff --git a/client/src/app/core/core-services/websocket.service.ts b/client/src/app/core/core-services/websocket.service.ts index 17b4ad62e..b7208416c 100644 --- a/client/src/app/core/core-services/websocket.service.ts +++ b/client/src/app/core/core-services/websocket.service.ts @@ -55,14 +55,6 @@ export const WEBSOCKET_ERROR_CODES = { WRONG_FORMAT: 102 }; -/* - * Options for (re-)connecting. - */ -interface ConnectOptions { - changeId?: number; - enableAutoupdates?: boolean; -} - /** * Service that handles WebSocket connections. Other services can register themselfs * with {@method getOberservable} for a specific type of messages. The content will be published. @@ -207,7 +199,7 @@ export class WebsocketService { * * Uses NgZone to let all callbacks run in the angular context. */ - public async connect(options: ConnectOptions = {}, retry: boolean = false): Promise { + public async connect(changeId: number | null = null, retry: boolean = false): Promise { const websocketId = Math.random().toString(36).substring(7); this.websocketId = websocketId; @@ -220,17 +212,10 @@ export class WebsocketService { this.shouldBeClosed = false; } - // set defaults - options = Object.assign(options, { - enableAutoupdates: true - }); + const queryParams: QueryParams = {}; - const queryParams: QueryParams = { - autoupdate: options.enableAutoupdates - }; - - if (options.changeId !== undefined) { - queryParams.change_id = options.changeId; + if (changeId !== null) { + queryParams.change_id = changeId; } // Create the websocket @@ -398,7 +383,7 @@ export class WebsocketService { const timeout = Math.floor(Math.random() * 3000 + 2000); this.retryTimeout = setTimeout(() => { this.retryTimeout = null; - this.connect({ enableAutoupdates: true }, true); + this.connect(null, true); }, timeout); } } @@ -438,9 +423,9 @@ export class WebsocketService { * * @param options The options for the new connection */ - public async reconnect(options: ConnectOptions = {}): Promise { + public async reconnect(changeId: number | null = null): Promise { await this.close(); - await this.connect(options); + await this.connect(changeId); } /** diff --git a/client/src/app/core/promises/mutex.ts b/client/src/app/core/promises/mutex.ts new file mode 100644 index 000000000..3de879915 --- /dev/null +++ b/client/src/app/core/promises/mutex.ts @@ -0,0 +1,30 @@ +/** + * A mutex as described in every textbook + * + * Usage: + * ``` + * mutex = new Mutex(); // create e.g. as class member + * + * // Somewhere in the code to lock (must be async code!) + * const unlock = await this.mutex.lock() + * // ...the code to synchronize + * unlock() + * ``` + */ +export class Mutex { + private mutex = Promise.resolve(); + + public lock(): PromiseLike<() => void> { + // this will capture the code-to-synchronize + let begin: (unlock: () => void) => void = () => {}; + + // All "requests" to execute code are chained in a promise-chain + this.mutex = this.mutex.then(() => { + return new Promise(begin); + }); + + return new Promise(res => { + begin = res; + }); + } +} diff --git a/client/src/app/site/common/components/legal-notice/legal-notice.component.html b/client/src/app/site/common/components/legal-notice/legal-notice.component.html index 31dbb9e57..56136f488 100644 --- a/client/src/app/site/common/components/legal-notice/legal-notice.component.html +++ b/client/src/app/site/common/components/legal-notice/legal-notice.component.html @@ -32,6 +32,26 @@ {{ 'Check for updates' | translate }} + +
+ +
+ + +
+ +
+ +
+ +
+
diff --git a/client/src/app/site/common/components/legal-notice/legal-notice.component.ts b/client/src/app/site/common/components/legal-notice/legal-notice.component.ts index bbacac6da..fd61ea876 100644 --- a/client/src/app/site/common/components/legal-notice/legal-notice.component.ts +++ b/client/src/app/site/common/components/legal-notice/legal-notice.component.ts @@ -4,6 +4,7 @@ import { Title } from '@angular/platform-browser'; import { TranslateService } from '@ngx-translate/core'; +import { DataStoreService } from 'app/core/core-services/data-store.service'; import { OpenSlidesService } from 'app/core/core-services/openslides.service'; import { OperatorService, Permission } from 'app/core/core-services/operator.service'; import { ConfigRepositoryService } from 'app/core/repositories/config/config-repository.service'; @@ -25,6 +26,8 @@ export class LegalNoticeComponent extends BaseViewComponent implements OnInit { */ public legalNotice = ''; + public showDevTools = false; + /** * Constructor. */ @@ -35,7 +38,8 @@ export class LegalNoticeComponent extends BaseViewComponent implements OnInit { private openSlidesService: OpenSlidesService, private update: UpdateService, private configRepo: ConfigRepositoryService, - private operator: OperatorService + private operator: OperatorService, + private DS: DataStoreService ) { super(title, translate, matSnackbar); } @@ -67,4 +71,12 @@ export class LegalNoticeComponent extends BaseViewComponent implements OnInit { public canManage(): boolean { return this.operator.hasPerms(Permission.coreCanManageConfig); } + + public printDS(): void { + this.DS.print(); + } + + public getThisComponent(): void { + console.log(this); + } } diff --git a/client/src/app/site/motions/modules/motion-detail/components/motion-detail/motion-detail.component.ts b/client/src/app/site/motions/modules/motion-detail/components/motion-detail/motion-detail.component.ts index beda8a319..8cf8d31c9 100644 --- a/client/src/app/site/motions/modules/motion-detail/components/motion-detail/motion-detail.component.ts +++ b/client/src/app/site/motions/modules/motion-detail/components/motion-detail/motion-detail.component.ts @@ -554,7 +554,7 @@ export class MotionDetailComponent extends BaseViewComponent implements OnInit, this.motionFilterService.initFilters(this.motionObserver); this.motionSortService.initSorting(this.motionFilterService.outputObservable); this.sortedMotionsObservable = this.motionSortService.outputObservable; - } else if (this.motion.parent_id) { + } else if (this.motion && this.motion.parent_id) { // only use the amendments for this motion this.amendmentFilterService.initFilters(this.repo.amendmentsTo(this.motion.parent_id)); this.amendmentSortService.initSorting(this.amendmentFilterService.outputObservable); diff --git a/client/src/app/site/polls/components/poll-progress/poll-progress.component.html b/client/src/app/site/polls/components/poll-progress/poll-progress.component.html index 6a5f5cfb5..f8cffffd3 100644 --- a/client/src/app/site/polls/components/poll-progress/poll-progress.component.html +++ b/client/src/app/site/polls/components/poll-progress/poll-progress.component.html @@ -1,6 +1,6 @@
- {{ poll.votescast }} / {{ max }} + {{ votescast }} / {{ max }}
{{ 'Received votes' | translate }} diff --git a/client/src/app/site/polls/components/poll-progress/poll-progress.component.ts b/client/src/app/site/polls/components/poll-progress/poll-progress.component.ts index 8ecedb614..909f86df0 100644 --- a/client/src/app/site/polls/components/poll-progress/poll-progress.component.ts +++ b/client/src/app/site/polls/components/poll-progress/poll-progress.component.ts @@ -1,58 +1,98 @@ -import { Component, Input, OnInit } from '@angular/core'; +import { Component, Input } from '@angular/core'; import { MatSnackBar } from '@angular/material/snack-bar'; import { Title } from '@angular/platform-browser'; import { TranslateService } from '@ngx-translate/core'; -import { map } from 'rxjs/operators'; +import { Subscription } from 'rxjs'; +import { MotionPollRepositoryService } from 'app/core/repositories/motions/motion-poll-repository.service'; import { UserRepositoryService } from 'app/core/repositories/users/user-repository.service'; import { BaseViewComponent } from 'app/site/base/base-view'; import { ViewBasePoll } from 'app/site/polls/models/view-base-poll'; +import { ViewUser } from 'app/site/users/models/view-user'; @Component({ selector: 'os-poll-progress', templateUrl: './poll-progress.component.html', styleUrls: ['./poll-progress.component.scss'] }) -export class PollProgressComponent extends BaseViewComponent implements OnInit { - @Input() - public poll: ViewBasePoll; +export class PollProgressComponent extends BaseViewComponent { + private pollId: number = null; + private pollSubscription: Subscription = null; + @Input() + public set poll(value: ViewBasePoll) { + if (value.id !== this.pollId) { + this.pollId = value.id; + + if (this.pollSubscription !== null) { + this.pollSubscription.unsubscribe(); + this.pollSubscription = null; + } + + this.pollSubscription = this.pollRepo.getViewModelObservable(this.pollId).subscribe(poll => { + if (poll) { + this._poll = poll; + + // We may cannot use this.poll.votescast during the voting, since it can + // be reported with false values from the server + // -> calculate the votes on our own. + const ids = new Set(); + for (const option of this.poll.options) { + for (const vote of option.votes) { + if (vote.user_id) { + ids.add(vote.user_id); + } + } + } + this.votescast = ids.size; + + // But sometimes there are not enough votes (poll.votescast is higher). + // If this happens, take the value from the poll + if (this.poll.votescast > this.votescast) { + this.votescast = this.poll.votescast; + } + + this.calculateMaxUsers(); + } + }); + } + } + public get poll(): ViewBasePoll { + return this._poll; + } + private _poll: ViewBasePoll; + + public votescast: number; public max: number; + public valueInPercent: number; public constructor( title: Title, protected translate: TranslateService, snackbar: MatSnackBar, - private userRepo: UserRepositoryService + private userRepo: UserRepositoryService, + private pollRepo: MotionPollRepositoryService ) { super(title, translate, snackbar); + this.userRepo.getViewModelListObservable().subscribe(users => { + if (users) { + this.calculateMaxUsers(users); + } + }); } - public get valueInPercent(): number { - if (this.poll) { - return (this.poll.votesvalid / this.max) * 100; - } else { - return 0; + private calculateMaxUsers(allUsers?: ViewUser[]): void { + if (!this.poll) { + return; + } + if (!allUsers) { + allUsers = this.userRepo.getViewModelList(); } - } - /** - * OnInit. - * Sets the observable for groups. - */ - public ngOnInit(): void { - if (this.poll) { - this.userRepo - .getViewModelListObservable() - .pipe( - map(users => - users.filter(user => user.is_present && this.poll.groups_id.intersect(user.groups_id).length) - ) - ) - .subscribe(users => { - this.max = users.length; - }); - } + allUsers = allUsers.filter(user => user.is_present && this.poll.groups_id.intersect(user.groups_id).length); + + this.max = allUsers.length; + this.valueInPercent = this.poll ? (this.votescast / this.max) * 100 : 0; } } diff --git a/openslides/agenda/projector.py b/openslides/agenda/projector.py index 36d0a47d4..eb8b305ca 100644 --- a/openslides/agenda/projector.py +++ b/openslides/agenda/projector.py @@ -3,9 +3,10 @@ from typing import Any, Dict, List, Union from ..users.projector import get_user_name from ..utils.projector import ( - AllData, + ProjectorAllDataProvider, ProjectorElementException, get_config, + get_model, register_projector_slide, ) @@ -15,20 +16,24 @@ from ..utils.projector import ( # side effects. -async def get_sorted_agenda_items(all_data: AllData) -> List[Dict[str, Any]]: +async def get_sorted_agenda_items( + agenda_items: Dict[int, Dict[str, Any]] +) -> List[Dict[str, Any]]: """ Returns all sorted agenda items by id first and then weight, resulting in ordered items, if some have the same weight. """ return sorted( - sorted(all_data["agenda/item"].values(), key=lambda item: item["id"]), + sorted(agenda_items.values(), key=lambda item: item["id"]), key=lambda item: item["weight"], ) -async def get_flat_tree(all_data: AllData, parent_id: int = 0) -> List[Dict[str, Any]]: +async def get_flat_tree( + agenda_items: Dict[int, Dict[str, Any]], parent_id: int = 0 +) -> List[Dict[str, Any]]: """ - Build the item tree from all_data. + Build the item tree from all_data_provider. Only build the tree from elements unterneath parent_id. @@ -38,16 +43,16 @@ async def get_flat_tree(all_data: AllData, parent_id: int = 0) -> List[Dict[str, # Build a dict from an item_id to all its children children: Dict[int, List[int]] = defaultdict(list) - if "agenda/item" in all_data: - for item in await get_sorted_agenda_items(all_data): - if item["type"] == 1: # only normal items - children[item["parent_id"] or 0].append(item["id"]) + + for item in await get_sorted_agenda_items(agenda_items): + if item["type"] == 1: # only normal items + children[item["parent_id"] or 0].append(item["id"]) tree = [] - async def get_children(item_ids: List[int], depth: int) -> None: + def build_tree(item_ids: List[int], depth: int) -> None: for item_id in item_ids: - item = all_data["agenda/item"][item_id] + item = agenda_items[item_id] title_information = item["title_information"] title_information["_agenda_item_number"] = item["item_number"] tree.append( @@ -57,25 +62,29 @@ async def get_flat_tree(all_data: AllData, parent_id: int = 0) -> List[Dict[str, "depth": depth, } ) - await get_children(children[item_id], depth + 1) + build_tree(children[item_id], depth + 1) - await get_children(children[parent_id], 0) + build_tree(children[parent_id], 0) return tree async def item_list_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Item list slide. Returns all root items or all children of an item. """ - only_main_items = element.get("only_main_items", True) + # fetch all items, so they are cached: + all_agenda_items = await all_data_provider.get_collection("agenda/item") + only_main_items = element.get("only_main_items", True) if only_main_items: agenda_items = [] - for item in await get_sorted_agenda_items(all_data): + for item in await get_sorted_agenda_items(all_agenda_items): if item["parent_id"] is None and item["type"] == 1: title_information = item["title_information"] title_information["_agenda_item_number"] = item["item_number"] @@ -86,13 +95,15 @@ async def item_list_slide( } ) else: - agenda_items = await get_flat_tree(all_data) + agenda_items = await get_flat_tree(all_agenda_items) return {"items": agenda_items} async def list_of_speakers_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ List of speakers slide. @@ -104,35 +115,35 @@ async def list_of_speakers_slide( if list_of_speakers_id is None: raise ProjectorElementException("id is required for list of speakers slide") - return await get_list_of_speakers_slide_data(all_data, list_of_speakers_id) + return await get_list_of_speakers_slide_data(all_data_provider, list_of_speakers_id) async def get_list_of_speakers_slide_data( - all_data: AllData, list_of_speakers_id: int + all_data_provider: ProjectorAllDataProvider, list_of_speakers_id: int ) -> Dict[str, Any]: - try: - list_of_speakers = all_data["agenda/list-of-speakers"][list_of_speakers_id] - except KeyError: - raise ProjectorElementException( - f"List of speakers {list_of_speakers_id} does not exist" - ) + list_of_speakers = await get_model( + all_data_provider, "agenda/list-of-speakers", list_of_speakers_id + ) title_information = list_of_speakers["title_information"] # try to get the agenda item for the content object (which must not exist) - agenda_item_id = all_data[list_of_speakers["content_object"]["collection"]][ - list_of_speakers["content_object"]["id"] - ].get("agenda_item_id") - if agenda_item_id: - title_information["_agenda_item_number"] = all_data["agenda/item"][ - agenda_item_id - ]["item_number"] + content_object = await get_model( + all_data_provider, + list_of_speakers["content_object"]["collection"], + list_of_speakers["content_object"]["id"], + ) + agenda_item_id = content_object.get("agenda_item_id") + if agenda_item_id is not None: + agenda_item = await all_data_provider.get("agenda/item", agenda_item_id) + if agenda_item is not None: + title_information["_agenda_item_number"] = agenda_item["item_number"] # Partition speaker objects to waiting, current and finished speakers_waiting = [] speakers_finished = [] current_speaker = None for speaker in list_of_speakers["speakers"]: - user = await get_user_name(all_data, speaker["user_id"]) + user = await get_user_name(all_data_provider, speaker["user_id"]) formatted_speaker = { "user": user, "marked": speaker["marked"], @@ -151,8 +162,12 @@ async def get_list_of_speakers_slide_data( speakers_waiting = sorted(speakers_waiting, key=lambda s: s["weight"]) speakers_finished = sorted(speakers_finished, key=lambda s: s["end_time"]) - number_of_last_speakers = await get_config(all_data, "agenda_show_last_speakers") - number_of_next_speakers = await get_config(all_data, "agenda_show_next_speakers") + number_of_last_speakers = await get_config( + all_data_provider, "agenda_show_last_speakers" + ) + number_of_next_speakers = await get_config( + all_data_provider, "agenda_show_next_speakers" + ) if number_of_last_speakers == 0: speakers_finished = [] @@ -174,7 +189,7 @@ async def get_list_of_speakers_slide_data( async def get_current_list_of_speakers_id_for_projector( - all_data: AllData, projector: Dict[str, Any] + all_data_provider: ProjectorAllDataProvider, projector: Dict[str, Any] ) -> Union[int, None]: """ Search for elements, that do have a list of speakers: @@ -189,94 +204,88 @@ async def get_current_list_of_speakers_id_for_projector( continue collection = element["name"] id = element["id"] - if collection not in all_data or id not in all_data[collection]: + model = await all_data_provider.get(collection, id) + if model is None: continue - model = all_data[collection][id] if "list_of_speakers_id" not in model: continue - if not model["list_of_speakers_id"] in all_data["agenda/list-of-speakers"]: + list_of_speakers_id = model["list_of_speakers_id"] + los_exists = await all_data_provider.exists( + "agenda/list-of-speakers", list_of_speakers_id + ) + if not los_exists: continue - list_of_speakers_id = model["list_of_speakers_id"] break return list_of_speakers_id async def get_reference_projector( - all_data: AllData, projector_id: int + all_data_provider: ProjectorAllDataProvider, projector_id: int ) -> Dict[str, Any]: """ Returns the reference projector to the given projector (by id) """ - try: - this_projector = all_data["core/projector"][projector_id] - except KeyError: - raise ProjectorElementException(f"Projector {projector_id} does not exist") + this_projector = await get_model(all_data_provider, "core/projector", projector_id) reference_projector_id = this_projector["reference_projector_id"] or projector_id - try: - reference_projector = all_data["core/projector"][reference_projector_id] - except KeyError: - raise ProjectorElementException( - f"Projector {reference_projector_id} does not exist" - ) - - return reference_projector + return await get_model(all_data_provider, "core/projector", reference_projector_id) async def current_list_of_speakers_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ The current list of speakers slide. Creates the data for the given projector. """ - reference_projector = await get_reference_projector(all_data, projector_id) + reference_projector = await get_reference_projector(all_data_provider, projector_id) list_of_speakers_id = await get_current_list_of_speakers_id_for_projector( - all_data, reference_projector + all_data_provider, reference_projector ) if list_of_speakers_id is None: # no element found return {} - return await get_list_of_speakers_slide_data(all_data, list_of_speakers_id) + return await get_list_of_speakers_slide_data(all_data_provider, list_of_speakers_id) async def current_speaker_chyron_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Returns the username for the current speaker. """ # get projector for color information - projector = all_data["core/projector"][projector_id] + projector = await get_model(all_data_provider, "core/projector", projector_id) slide_data = { "background_color": projector["chyron_background_color"], "font_color": projector["chyron_font_color"], } - reference_projector = await get_reference_projector(all_data, projector_id) + reference_projector = await get_reference_projector(all_data_provider, projector_id) list_of_speakers_id = await get_current_list_of_speakers_id_for_projector( - all_data, reference_projector + all_data_provider, reference_projector ) if list_of_speakers_id is None: # no element found return slide_data # get list of speakers to search current speaker - try: - list_of_speakers = all_data["agenda/list-of-speakers"][list_of_speakers_id] - except KeyError: - raise ProjectorElementException( - f"List of speakers {list_of_speakers_id} does not exist" - ) + list_of_speakers = await get_model( + all_data_provider, "agenda/list-of-speakers", list_of_speakers_id + ) # find current speaker current_speaker = None for speaker in list_of_speakers["speakers"]: if speaker["begin_time"] is not None and speaker["end_time"] is None: - current_speaker = await get_user_name(all_data, speaker["user_id"]) + current_speaker = await get_user_name(all_data_provider, speaker["user_id"]) break if current_speaker is not None: diff --git a/openslides/assignments/projector.py b/openslides/assignments/projector.py index c868ad103..97c0be82a 100644 --- a/openslides/assignments/projector.py +++ b/openslides/assignments/projector.py @@ -1,25 +1,29 @@ from typing import Any, Dict, List from ..users.projector import get_user_name -from ..utils.projector import AllData, get_model, get_models, register_projector_slide +from ..utils.projector import ( + ProjectorAllDataProvider, + get_model, + get_models, + register_projector_slide, +) from .models import AssignmentPoll -# Important: All functions have to be prune. This means, that thay can only -# access the data, that they get as argument and do not have any -# side effects. - - async def assignment_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Assignment slide. """ - assignment = get_model(all_data, "assignments/assignment", element.get("id")) + assignment = await get_model( + all_data_provider, "assignments/assignment", element.get("id") + ) assignment_related_users: List[Dict[str, Any]] = [ - {"user": await get_user_name(all_data, aru["user_id"])} + {"user": await get_user_name(all_data_provider, aru["user_id"])} for aru in sorted( assignment["assignment_related_users"], key=lambda aru: aru["weight"] ) @@ -36,13 +40,19 @@ async def assignment_slide( async def assignment_poll_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Poll slide. """ - poll = get_model(all_data, "assignments/assignment-poll", element.get("id")) - assignment = get_model(all_data, "assignments/assignment", poll["assignment_id"]) + poll = await get_model( + all_data_provider, "assignments/assignment-poll", element.get("id") + ) + assignment = await get_model( + all_data_provider, "assignments/assignment", poll["assignment_id"] + ) poll_data = { key: poll[key] @@ -60,10 +70,14 @@ async def assignment_poll_slide( # Add options: poll_data["options"] = [] - options = get_models(all_data, "assignments/assignment-option", poll["options_id"]) + options = await get_models( + all_data_provider, "assignments/assignment-option", poll["options_id"] + ) for option in sorted(options, key=lambda option: option["weight"]): option_data: Dict[str, Any] = { - "user": {"short_name": await get_user_name(all_data, option["user_id"])} + "user": { + "short_name": await get_user_name(all_data_provider, option["user_id"]) + } } if poll["state"] == AssignmentPoll.STATE_PUBLISHED: option_data["yes"] = float(option["yes"]) diff --git a/openslides/core/apps.py b/openslides/core/apps.py index 3b4711b6e..3481e402a 100644 --- a/openslides/core/apps.py +++ b/openslides/core/apps.py @@ -34,16 +34,10 @@ class CoreAppConfig(AppConfig): ProjectionDefaultViewSet, TagViewSet, ) - from .websocket import ( - NotifyWebsocketClientMessage, - ConstantsWebsocketClientMessage, - GetElementsWebsocketClientMessage, - AutoupdateWebsocketClientMessage, - ListenToProjectors, - PingPong, - ) from ..utils.rest_api import router - from ..utils.websocket import register_client_message + + # Let all client websocket message register + from ..utils import websocket_client_messages # noqa # Collect all config variables before getting the constants. config.collect_config_variables_from_apps() @@ -92,14 +86,6 @@ class CoreAppConfig(AppConfig): self.get_model("Countdown").get_collection_string(), CountdownViewSet ) - # Register client messages - register_client_message(NotifyWebsocketClientMessage()) - register_client_message(ConstantsWebsocketClientMessage()) - register_client_message(GetElementsWebsocketClientMessage()) - register_client_message(AutoupdateWebsocketClientMessage()) - register_client_message(ListenToProjectors()) - register_client_message(PingPong()) - if "runserver" in sys.argv or "changeconfig" in sys.argv: from openslides.utils.startup import run_startup_hooks diff --git a/openslides/core/projector.py b/openslides/core/projector.py index 9bbf75942..3c9f2ed82 100644 --- a/openslides/core/projector.py +++ b/openslides/core/projector.py @@ -1,20 +1,17 @@ from typing import Any, Dict from ..utils.projector import ( - AllData, - ProjectorElementException, + ProjectorAllDataProvider, get_config, + get_model, register_projector_slide, ) -# Important: All functions have to be prune. This means, that thay can only -# access the data, that they get as argument and do not have any -# side effects. - - async def countdown_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Countdown slide. @@ -26,23 +23,21 @@ async def countdown_slide( id: 5, # Countdown ID } """ - countdown_id = element.get("id") or 1 - - try: - countdown = all_data["core/countdown"][countdown_id] - except KeyError: - raise ProjectorElementException(f"Countdown {countdown_id} does not exist") - + countdown = await get_model(all_data_provider, "core/countdown", element.get("id")) return { "description": countdown["description"], "running": countdown["running"], "countdown_time": countdown["countdown_time"], - "warning_time": await get_config(all_data, "agenda_countdown_warning_time"), + "warning_time": await get_config( + all_data_provider, "agenda_countdown_warning_time" + ), } async def message_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Message slide. @@ -54,16 +49,15 @@ async def message_slide( id: 5, # ProjectorMessage ID } """ - message_id = element.get("id") or 1 - - try: - return all_data["core/projector-message"][message_id] - except KeyError: - raise ProjectorElementException(f"Message {message_id} does not exist") + return await get_model( + all_data_provider, "core/projector-message", element.get("id") + ) async def clock_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: return {} diff --git a/openslides/mediafiles/projector.py b/openslides/mediafiles/projector.py index 196d5e8c8..b996c823f 100644 --- a/openslides/mediafiles/projector.py +++ b/openslides/mediafiles/projector.py @@ -1,35 +1,23 @@ from typing import Any, Dict from ..utils.projector import ( - AllData, - ProjectorElementException, + ProjectorAllDataProvider, + get_model, register_projector_slide, ) -# Important: All functions have to be prune. This means, that thay can only -# access the data, that they get as argument and do not have any -# side effects. - - async def mediafile_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Slide for Mediafile. """ - mediafile_id = element.get("id") - - if mediafile_id is None: - raise ProjectorElementException("id is required for mediafile slide") - - try: - mediafile = all_data["mediafiles/mediafile"][mediafile_id] - except KeyError: - raise ProjectorElementException( - f"mediafile with id {mediafile_id} does not exist" - ) - + mediafile = await get_model( + all_data_provider, "mediafiles/mediafile", element.get("id") + ) return { "path": mediafile["path"], "mimetype": mediafile["mimetype"], diff --git a/openslides/motions/projector.py b/openslides/motions/projector.py index 74a9a399f..c7a39040c 100644 --- a/openslides/motions/projector.py +++ b/openslides/motions/projector.py @@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional from ..users.projector import get_user_name from ..utils.projector import ( - AllData, + ProjectorAllDataProvider, ProjectorElementException, get_config, get_model, @@ -14,33 +14,31 @@ from .models import MotionPoll motion_placeholder_regex = re.compile(r"\[motion:(\d+)\]") -# Important: All functions have to be prune. This means, that thay can only -# access the data, that they get as argument and do not have any -# side effects. - async def get_state( - all_data: AllData, motion: Dict[str, Any], state_id_key: str + all_data_provider: ProjectorAllDataProvider, + motion: Dict[str, Any], + state_id_key: str, ) -> Dict[str, Any]: """ Returns a state element from one motion. Raises an error if the state does not exist. """ - state = all_data["motions/state"].get(motion[state_id_key]) - if not state: + state = await all_data_provider.get("motions/state", motion[state_id_key]) + if state is None: raise ProjectorElementException( f"motion {motion['id']} can not be on the state with id {motion[state_id_key]}" ) return state -async def get_amendment_merge_into_motion_diff(all_data, amendment): +async def get_amendment_merge_into_motion_diff(all_data_provider, amendment): """ HINT: This implementation should be consistent to showInDiffView() in ViewMotionAmendedParagraph.ts """ if amendment["state_id"] is None: return 0 - state = await get_state(all_data, amendment, "state_id") + state = await get_state(all_data_provider, amendment, "state_id") if state["merge_amendment_into_final"] == -1: return 0 if state["merge_amendment_into_final"] == 1: @@ -48,36 +46,37 @@ async def get_amendment_merge_into_motion_diff(all_data, amendment): if amendment["recommendation_id"] is None: return 0 - recommendation = await get_state(all_data, amendment, "recommendation_id") + recommendation = await get_state(all_data_provider, amendment, "recommendation_id") if recommendation["merge_amendment_into_final"] == 1: return 1 return 0 -async def get_amendment_merge_into_motion_final(all_data, amendment): +async def get_amendment_merge_into_motion_final(all_data_provider, amendment): """ HINT: This implementation should be consistent to showInFinalView() in ViewMotionAmendedParagraph.ts """ if amendment["state_id"] is None: return 0 - state = await get_state(all_data, amendment, "state_id") + state = await get_state(all_data_provider, amendment, "state_id") if state["merge_amendment_into_final"] == 1: return 1 return 0 -async def get_amendments_for_motion(motion, all_data): +async def get_amendments_for_motion(motion, all_data_provider): amendment_data = [] - for amendment_id, amendment in all_data["motions/motion"].items(): + all_motions = await all_data_provider.get_collection("motions/motion") + for amendment_id, amendment in all_motions.items(): if amendment["parent_id"] == motion["id"]: merge_amendment_into_final = await get_amendment_merge_into_motion_final( - all_data, amendment + all_data_provider, amendment ) merge_amendment_into_diff = await get_amendment_merge_into_motion_diff( - all_data, amendment + all_data_provider, amendment ) amendment_data.append( { @@ -92,8 +91,10 @@ async def get_amendments_for_motion(motion, all_data): return amendment_data -async def get_amendment_base_motion(amendment, all_data): - motion = get_model(all_data, "motions/motion", amendment.get("parent_id")) +async def get_amendment_base_motion(amendment, all_data_provider): + motion = await get_model( + all_data_provider, "motions/motion", amendment.get("parent_id") + ) return { "identifier": motion["identifier"], @@ -102,15 +103,17 @@ async def get_amendment_base_motion(amendment, all_data): } -async def get_amendment_base_statute(amendment, all_data): - statute = get_model( - all_data, "motions/statute-paragraph", amendment.get("statute_paragraph_id") +async def get_amendment_base_statute(amendment, all_data_provider): + statute = await get_model( + all_data_provider, + "motions/statute-paragraph", + amendment.get("statute_paragraph_id"), ) return {"title": statute["title"], "text": statute["text"]} async def extend_reference_motion_dict( - all_data: AllData, + all_data_provider: ProjectorAllDataProvider, recommendation: Optional[str], referenced_motions: Dict[int, Dict[str, str]], ) -> None: @@ -127,15 +130,18 @@ async def extend_reference_motion_dict( ] for id in referenced_ids: # Put every referenced motion into the referenced_motions dict - if id not in referenced_motions and id in all_data["motions/motion"]: + referenced_motion = await all_data_provider.get("motions/motion", id) + if id not in referenced_motions and referenced_motion is not None: referenced_motions[id] = { - "title": all_data["motions/motion"][id]["title"], - "identifier": all_data["motions/motion"][id]["identifier"], + "title": referenced_motion["title"], + "identifier": referenced_motion["identifier"], } async def motion_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Motion slide. @@ -158,13 +164,16 @@ async def motion_slide( """ # Get motion mode = element.get( - "mode", await get_config(all_data, "motions_recommendation_text_mode") + "mode", await get_config(all_data_provider, "motions_recommendation_text_mode") ) - motion = get_model(all_data, "motions/motion", element.get("id")) + + # populate cache: + + motion = await get_model(all_data_provider, "motions/motion", element.get("id")) # Add submitters submitters = [ - await get_user_name(all_data, submitter["user_id"]) + await get_user_name(all_data_provider, submitter["user_id"]) for submitter in sorted( motion["submitters"], key=lambda submitter: submitter["weight"] ) @@ -172,14 +181,16 @@ async def motion_slide( # Get some needed config values show_meta_box = not await get_config( - all_data, "motions_disable_sidebox_on_projector" + all_data_provider, "motions_disable_sidebox_on_projector" ) show_referring_motions = not await get_config( - all_data, "motions_hide_referring_motions" + all_data_provider, "motions_hide_referring_motions" ) - line_length = await get_config(all_data, "motions_line_length") - line_numbering_mode = await get_config(all_data, "motions_default_line_numbering") - motions_preamble = await get_config(all_data, "motions_preamble") + line_length = await get_config(all_data_provider, "motions_line_length") + line_numbering_mode = await get_config( + all_data_provider, "motions_default_line_numbering" + ) + motions_preamble = await get_config(all_data_provider, "motions_preamble") # Query all change-recommendation and amendment related things. change_recommendations = [] # type: ignore @@ -187,17 +198,19 @@ async def motion_slide( base_motion = None base_statute = None if motion["statute_paragraph_id"]: - base_statute = await get_amendment_base_statute(motion, all_data) + base_statute = await get_amendment_base_statute(motion, all_data_provider) elif motion["parent_id"] is not None and motion["amendment_paragraphs"]: - base_motion = await get_amendment_base_motion(motion, all_data) + base_motion = await get_amendment_base_motion(motion, all_data_provider) else: for change_recommendation_id in motion["change_recommendations_id"]: - cr = all_data["motions/motion-change-recommendation"].get( - change_recommendation_id + cr = await get_model( + all_data_provider, + "motions/motion-change-recommendation", + change_recommendation_id, ) if cr is not None and not cr["internal"]: change_recommendations.append(cr) - amendments = await get_amendments_for_motion(motion, all_data) + amendments = await get_amendments_for_motion(motion, all_data_provider) # The base return value. More fields will get added below. return_value = { @@ -217,10 +230,10 @@ async def motion_slide( "line_numbering_mode": line_numbering_mode, } - if not await get_config(all_data, "motions_disable_text_on_projector"): + if not await get_config(all_data_provider, "motions_disable_text_on_projector"): return_value["text"] = motion["text"] - if not await get_config(all_data, "motions_disable_reason_on_projector"): + if not await get_config(all_data_provider, "motions_disable_reason_on_projector"): return_value["reason"] = motion["reason"] if mode == "final": @@ -228,40 +241,46 @@ async def motion_slide( # Add recommendation, if enabled in config (and the motion has one) if ( - not await get_config(all_data, "motions_disable_recommendation_on_projector") + not await get_config( + all_data_provider, "motions_disable_recommendation_on_projector" + ) and motion["recommendation_id"] ): - recommendation_state = await get_state(all_data, motion, "recommendation_id") + recommendation_state = await get_state( + all_data_provider, motion, "recommendation_id" + ) return_value["recommendation"] = recommendation_state["recommendation_label"] if recommendation_state["show_recommendation_extension_field"]: recommendation_extension = motion["recommendation_extension"] # All title information for referenced motions in the recommendation referenced_motions: Dict[int, Dict[str, str]] = {} await extend_reference_motion_dict( - all_data, recommendation_extension, referenced_motions + all_data_provider, recommendation_extension, referenced_motions ) return_value["recommendation_extension"] = recommendation_extension return_value["referenced_motions"] = referenced_motions if motion["statute_paragraph_id"]: return_value["recommender"] = await get_config( - all_data, "motions_statute_recommendations_by" + all_data_provider, "motions_statute_recommendations_by" ) else: return_value["recommender"] = await get_config( - all_data, "motions_recommendations_by" + all_data_provider, "motions_recommendations_by" ) if show_referring_motions: # Add recommendation-referencing motions return_value[ "recommendation_referencing_motions" - ] = await get_recommendation_referencing_motions(all_data, motion["id"]) + ] = await get_recommendation_referencing_motions( + all_data_provider, motion["id"] + ) return return_value async def get_recommendation_referencing_motions( - all_data: AllData, motion_id: int + all_data_provider: ProjectorAllDataProvider, motion_id: int ) -> Optional[List[Dict[str, Any]]]: """ Returns all title information for motions, that are referencing @@ -269,14 +288,15 @@ async def get_recommendation_referencing_motions( motions, None is returned (instead of []). """ recommendation_referencing_motions = [] - for motion in all_data["motions/motion"].values(): + all_motions = await all_data_provider.get_collection("motions/motion") + for motion in all_motions.values(): # Motion must have a recommendation and a recommendaiton extension if not motion["recommendation_id"] or not motion["recommendation_extension"]: continue # The recommendation must allow the extension field (there might be left-overs # in a motions recommendation extension..) - recommendation = await get_state(all_data, motion, "recommendation_id") + recommendation = await get_state(all_data_provider, motion, "recommendation_id") if not recommendation["show_recommendation_extension_field"]: continue @@ -297,12 +317,16 @@ async def get_recommendation_referencing_motions( async def motion_block_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Motion block slide. """ - motion_block = get_model(all_data, "motions/motion-block", element.get("id")) + motion_block = await get_model( + all_data_provider, "motions/motion-block", element.get("id") + ) # All motions in this motion block motions = [] @@ -311,7 +335,8 @@ async def motion_block_slide( referenced_motions: Dict[int, Dict[str, str]] = {} # Search motions. - for motion in all_data["motions/motion"].values(): + all_motions = await all_data_provider.get_collection("motions/motion") + for motion in all_motions.values(): if motion["motion_block_id"] == motion_block["id"]: motion_object = { "title": motion["title"], @@ -320,7 +345,9 @@ async def motion_block_slide( recommendation_id = motion["recommendation_id"] if recommendation_id is not None: - recommendation = await get_state(all_data, motion, "recommendation_id") + recommendation = await get_state( + all_data_provider, motion, "recommendation_id" + ) motion_object["recommendation"] = { "name": recommendation["recommendation_label"], "css_class": recommendation["css_class"], @@ -328,7 +355,7 @@ async def motion_block_slide( if recommendation["show_recommendation_extension_field"]: recommendation_extension = motion["recommendation_extension"] await extend_reference_motion_dict( - all_data, recommendation_extension, referenced_motions + all_data_provider, recommendation_extension, referenced_motions ) motion_object["recommendation_extension"] = recommendation_extension @@ -342,13 +369,15 @@ async def motion_block_slide( async def motion_poll_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Poll slide. """ - poll = get_model(all_data, "motions/motion-poll", element.get("id")) - motion = get_model(all_data, "motions/motion", poll["motion_id"]) + poll = await get_model(all_data_provider, "motions/motion-poll", element.get("id")) + motion = await get_model(all_data_provider, "motions/motion", poll["motion_id"]) poll_data = { key: poll[key] @@ -363,8 +392,8 @@ async def motion_poll_slide( } if poll["state"] == MotionPoll.STATE_PUBLISHED: - option = get_model( - all_data, "motions/motion-option", poll["options_id"][0] + option = await get_model( + all_data_provider, "motions/motion-option", poll["options_id"][0] ) # there can only be exactly one option poll_data["options"] = [ { diff --git a/openslides/poll/views.py b/openslides/poll/views.py index fabb87f9a..06da7637b 100644 --- a/openslides/poll/views.py +++ b/openslides/poll/views.py @@ -156,9 +156,11 @@ class BasePollViewSet(ModelViewSet): poll.state = BasePoll.STATE_PUBLISHED poll.save() - inform_changed_data(vote.user for vote in poll.get_votes().all() if vote.user) - inform_changed_data(poll.get_votes()) - inform_changed_data(poll.get_options()) + inform_changed_data( + (vote.user for vote in poll.get_votes().all() if vote.user), final_data=True + ) + inform_changed_data(poll.get_votes(), final_data=True) + inform_changed_data(poll.get_options(), final_data=True) return Response() @detail_route(methods=["POST"]) diff --git a/openslides/topics/projector.py b/openslides/topics/projector.py index 18306aa81..3687c0f31 100644 --- a/openslides/topics/projector.py +++ b/openslides/topics/projector.py @@ -1,19 +1,16 @@ from typing import Any, Dict from ..utils.projector import ( - AllData, - ProjectorElementException, + ProjectorAllDataProvider, + get_model, register_projector_slide, ) -# Important: All functions have to be prune. This means, that thay can only -# access the data, that they get as argument and do not have any -# side effects. - - async def topic_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Topic slide. @@ -22,22 +19,8 @@ async def topic_slide( * title * text """ - topic_id = element.get("id") - - if topic_id is None: - raise ProjectorElementException("id is required for topic slide") - - try: - topic = all_data["topics/topic"][topic_id] - except KeyError: - raise ProjectorElementException(f"topic with id {topic_id} does not exist") - - item_id = topic["agenda_item_id"] - try: - item = all_data["agenda/item"][item_id] - except KeyError: - raise ProjectorElementException(f"item with id {item_id} does not exist") - + topic = await get_model(all_data_provider, "topics/topic", element.get("id")) + item = await get_model(all_data_provider, "agenda/item", topic["agenda_item_id"]) return { "title": topic["title"], "text": topic["text"], diff --git a/openslides/users/projector.py b/openslides/users/projector.py index c1b02fc85..902a19a59 100644 --- a/openslides/users/projector.py +++ b/openslides/users/projector.py @@ -1,19 +1,16 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from ..utils.projector import ( - AllData, - ProjectorElementException, + ProjectorAllDataProvider, + get_model, register_projector_slide, ) -# Important: All functions have to be prune. This means, that thay can only -# access the data, that they get as argument and do not have any -# side effects. - - async def user_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ User slide. @@ -21,22 +18,16 @@ async def user_slide( The returned dict can contain the following fields: * user """ - user_id = element.get("id") - - if user_id is None: - raise ProjectorElementException("id is required for user slide") - - return {"user": await get_user_name(all_data, user_id)} + return {"user": await get_user_name(all_data_provider, element.get("id"))} -async def get_user_name(all_data: AllData, user_id: int) -> str: +async def get_user_name( + all_data_provider: ProjectorAllDataProvider, user_id: Optional[int] +) -> str: """ Returns the short name for an user_id. """ - try: - user = all_data["users/user"][user_id] - except KeyError: - raise ProjectorElementException(f"user with id {user_id} does not exist") + user = await get_model(all_data_provider, "users/user", user_id) name_parts: List[str] = [] for name_part in ("title", "first_name", "last_name"): diff --git a/openslides/utils/autoupdate.py b/openslides/utils/autoupdate.py index 7701b1598..d1fffe8df 100644 --- a/openslides/utils/autoupdate.py +++ b/openslides/utils/autoupdate.py @@ -1,3 +1,4 @@ +import json import threading from collections import defaultdict from typing import Any, Dict, Iterable, List, Optional, Tuple, Union @@ -7,9 +8,22 @@ from channels.layers import get_channel_layer from django.db.models import Model from mypy_extensions import TypedDict -from .cache import element_cache, get_element_id +from .cache import ChangeIdTooLowError, element_cache, get_element_id from .projector import get_projector_data -from .utils import get_model_from_collection_string, is_iterable +from .timing import Timing +from .utils import get_model_from_collection_string, is_iterable, split_element_id + + +AutoupdateFormat = TypedDict( + "AutoupdateFormat", + { + "changed": Dict[str, List[Dict[str, Any]]], + "deleted": Dict[str, List[int]], + "from_change_id": int, + "to_change_id": int, + "all_data": bool, + }, +) class AutoupdateElementBase(TypedDict): @@ -66,13 +80,15 @@ class AutoupdateBundle: element["id"] ] = element - def done(self) -> None: + def done(self) -> Optional[int]: """ Finishes the bundle by resolving all missing data and passing it to the history and element cache. + + Returns the change id, if there are autoupdate elements. Otherwise none. """ if not self.autoupdate_elements: - return + return None for collection, elements in self.autoupdate_elements.items(): # Get all ids, that do not have a full_data key @@ -92,13 +108,14 @@ class AutoupdateBundle: elements[full_data["id"]]["full_data"] = full_data # Save histroy here using sync code. - save_history(self.elements) + save_history(self.element_iterator) # Update cache and send autoupdate using async code. - async_to_sync(self.async_handle_collection_elements)() + change_id = async_to_sync(self.dispatch_autoupdate)() + return change_id @property - def elements(self) -> Iterable[AutoupdateElement]: + def element_iterator(self) -> Iterable[AutoupdateElement]: """ Iterator for all elements in this bundle """ for elements in self.autoupdate_elements.values(): yield from elements.values() @@ -110,7 +127,7 @@ class AutoupdateBundle: Returns the change_id """ cache_elements: Dict[str, Optional[Dict[str, Any]]] = {} - for element in self.elements: + for element in self.element_iterator: element_id = get_element_id(element["collection_string"], element["id"]) full_data = element.get("full_data") if full_data: @@ -120,9 +137,11 @@ class AutoupdateBundle: cache_elements[element_id] = full_data return await element_cache.change_elements(cache_elements) - async def async_handle_collection_elements(self) -> None: + async def dispatch_autoupdate(self) -> int: """ Async helper function to update cache and send autoupdate. + + Return the change_id """ # Update cache change_id = await self.update_cache() @@ -130,21 +149,23 @@ class AutoupdateBundle: # Send autoupdate channel_layer = get_channel_layer() await channel_layer.group_send( - "autoupdate", {"type": "send_data", "change_id": change_id} + "autoupdate", {"type": "msg_new_change_id", "change_id": change_id} ) - projector_data = await get_projector_data() # Send projector + projector_data = await get_projector_data() channel_layer = get_channel_layer() await channel_layer.group_send( "projector", { - "type": "projector_changed", + "type": "msg_projector_data", "data": projector_data, "change_id": change_id, }, ) + return change_id + def inform_changed_data( instances: Union[Iterable[Model], Model], @@ -152,6 +173,7 @@ def inform_changed_data( user_id: Optional[int] = None, disable_history: bool = False, no_delete_on_restriction: bool = False, + final_data: bool = False, ) -> None: """ Informs the autoupdate system and the caching system about the creation or @@ -167,8 +189,10 @@ def inform_changed_data( instances = (instances,) root_instances = set(instance.get_root_rest_element() for instance in instances) - elements = [ - AutoupdateElement( + + elements = [] + for root_instance in root_instances: + element = AutoupdateElement( id=root_instance.get_rest_pk(), collection_string=root_instance.get_collection_string(), disable_history=disable_history, @@ -176,8 +200,9 @@ def inform_changed_data( user_id=user_id, no_delete_on_restriction=no_delete_on_restriction, ) - for root_instance in root_instances - ] + if final_data: + element["full_data"] = root_instance.get_full_data() + elements.append(element) inform_elements(elements) @@ -246,14 +271,68 @@ class AutoupdateBundleMiddleware: thread_id = threading.get_ident() autoupdate_bundle[thread_id] = AutoupdateBundle() + timing = Timing("request") + response = self.get_response(request) + timing() + + # rewrite the response by adding the autoupdate on any success-case (2xx status) bundle: AutoupdateBundle = autoupdate_bundle.pop(thread_id) - bundle.done() + if response.status_code >= 200 and response.status_code < 300: + change_id = bundle.done() + + if change_id is not None: + user_id = request.user.pk or 0 + # Inject the autoupdate in the response. + # The complete response body will be overwritten! + autoupdate = async_to_sync(get_autoupdate_data)( + change_id, change_id, user_id + ) + content = {"autoupdate": autoupdate, "data": response.data} + # Note: autoupdate may be none on skipped ones (which should not happen + # since the user has made the request....) + response.content = json.dumps(content) + + timing(True) return response -def save_history(elements: Iterable[AutoupdateElement]) -> Iterable: +async def get_autoupdate_data( + from_change_id: int, to_change_id: int, user_id: int +) -> Optional[AutoupdateFormat]: + try: + changed_elements, deleted_element_ids = await element_cache.get_data_since( + user_id, from_change_id, to_change_id + ) + except ChangeIdTooLowError: + # The change_id is lower the the lowerst change_id in redis. Return all data + changed_elements = await element_cache.get_all_data_list(user_id) + all_data = True + deleted_elements: Dict[str, List[int]] = {} + else: + all_data = False + deleted_elements = defaultdict(list) + for element_id in deleted_element_ids: + collection_string, id = split_element_id(element_id) + deleted_elements[collection_string].append(id) + + # Check, if the autoupdate has any data. + if not changed_elements and not deleted_element_ids: + # Skip empty updates + return None + else: + # Normal autoupdate with data + return AutoupdateFormat( + changed=changed_elements, + deleted=deleted_elements, + from_change_id=from_change_id, + to_change_id=to_change_id, + all_data=all_data, + ) + + +def save_history(element_iterator: Iterable[AutoupdateElement]) -> Iterable: """ Thin wrapper around the call of history saving manager method. @@ -261,4 +340,4 @@ def save_history(elements: Iterable[AutoupdateElement]) -> Iterable: """ from ..core.models import History - return History.objects.add_elements(elements) + return History.objects.add_elements(element_iterator) diff --git a/openslides/utils/cache.py b/openslides/utils/cache.py index 00054ca74..14dec14d2 100644 --- a/openslides/utils/cache.py +++ b/openslides/utils/cache.py @@ -254,25 +254,6 @@ class ElementCache: all_data[collection] = await restricter(user_id, all_data[collection]) return dict(all_data) - async def get_all_data_dict(self) -> Dict[str, Dict[int, Dict[str, Any]]]: - """ - Returns all data with a dict (id <-> element) per collection: - { - : { - : - } - } - """ - all_data: Dict[str, Dict[int, Dict[str, Any]]] = defaultdict(dict) - for element_id, data in (await self.cache_provider.get_all_data()).items(): - collection, id = split_element_id(element_id) - element = json.loads(data.decode()) - element.pop( - "_no_delete_on_restriction", False - ) # remove special field for get_data_since - all_data[collection][id] = element - return dict(all_data) - async def get_collection_data(self, collection: str) -> Dict[int, Dict[str, Any]]: """ Returns the data for one collection as dict: {id: } diff --git a/openslides/utils/cache_providers.py b/openslides/utils/cache_providers.py index 0b5c1ba86..987904424 100644 --- a/openslides/utils/cache_providers.py +++ b/openslides/utils/cache_providers.py @@ -8,7 +8,11 @@ from django.core.exceptions import ImproperlyConfigured from typing_extensions import Protocol from . import logging -from .redis import read_only_redis_amount_replicas, use_redis +from .redis import ( + read_only_redis_amount_replicas, + read_only_redis_wait_timeout, + use_redis, +) from .schema_version import SchemaVersion from .utils import split_element_id, str_dict_to_bytes @@ -297,7 +301,7 @@ class RedisCacheProvider: async def add_to_full_data(self, data: Dict[str, str]) -> None: async with get_connection() as redis: - redis.hmset_dict(self.full_data_cache_key, data) + await redis.hmset_dict(self.full_data_cache_key, data) async def data_exists(self) -> bool: """ @@ -492,11 +496,12 @@ class RedisCacheProvider: raise e if not read_only and read_only_redis_amount_replicas is not None: reported_amount = await redis.wait( - read_only_redis_amount_replicas, 1000 + read_only_redis_amount_replicas, read_only_redis_wait_timeout ) if reported_amount != read_only_redis_amount_replicas: logger.warn( - f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} requested!" + f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} " + + f"requested after {read_only_redis_wait_timeout} ms!" ) return result diff --git a/openslides/utils/consumer_autoupdate_strategy.py b/openslides/utils/consumer_autoupdate_strategy.py new file mode 100644 index 000000000..f5a03cb29 --- /dev/null +++ b/openslides/utils/consumer_autoupdate_strategy.py @@ -0,0 +1,108 @@ +import asyncio +from asyncio import Task +from typing import Optional, cast + +from django.conf import settings + +from .autoupdate import get_autoupdate_data +from .cache import element_cache +from .websocket import ChangeIdTooHighException, ProtocollAsyncJsonWebsocketConsumer + + +AUTOUPDATE_DELAY = getattr(settings, "AUTOUPDATE_DELAY", None) + + +class ConsumerAutoupdateStrategy: + def __init__(self, consumer: ProtocollAsyncJsonWebsocketConsumer) -> None: + self.consumer = consumer + # client_change_id = None: unknown -> set on first autoupdate or request_change_id + # client_change_id is int: the change_id, the client knows about, so the next + # update must be from client_change_id+1 .. + self.client_change_id: Optional[int] = None + self.max_seen_change_id = 0 + self.next_send_time = None + self.timer_task_handle: Optional[Task[None]] = None + self.lock = asyncio.Lock() + + async def request_change_id( + self, change_id: int, in_response: Optional[str] = None + ) -> None: + """ + The change id is not inclusive, so the client is on change_id and wants + data from change_id+1 .. now + """ + # This resets the server side tracking of the client's change id. + async with self.lock: + await self.stop_timer() + + self.max_seen_change_id = await element_cache.get_current_change_id() + print(self.max_seen_change_id) + self.client_change_id = change_id + + if self.client_change_id == self.max_seen_change_id: + # The client is up-to-date, so nothing will be done + return None + + if self.client_change_id > self.max_seen_change_id: + message = ( + f"Requested change_id {self.client_change_id} is higher than the " + + f"highest change_id {self.max_seen_change_id}." + ) + raise ChangeIdTooHighException(message, in_response=in_response) + + await self.send_autoupdate(in_response=in_response) + + async def new_change_id(self, change_id: int) -> None: + async with self.lock: + if self.client_change_id is None: + # The -1 is to send this autoupdate as the first one to he client. + # Remember: the client_change_id is the change_id the client knows about + self.client_change_id = change_id - 1 + if change_id > self.max_seen_change_id: + self.max_seen_change_id = change_id + + if AUTOUPDATE_DELAY is None: # feature deactivated, send directly + await self.send_autoupdate() + elif self.timer_task_handle is None: + await self.start_timer() + + async def get_running_loop(self) -> asyncio.AbstractEventLoop: + if hasattr(asyncio, "get_running_loop"): + return asyncio.get_running_loop() # type: ignore + else: + return asyncio.get_event_loop() + + async def start_timer(self) -> None: + loop = await self.get_running_loop() + self.timer_task_handle = loop.create_task(self.timer_task()) + + async def stop_timer(self) -> None: + if self.timer_task_handle is not None: + self.timer_task_handle.cancel() + self.timer_task_handle = None + + async def timer_task(self) -> None: + try: + await asyncio.sleep(AUTOUPDATE_DELAY) + except asyncio.CancelledError: + return + + async with self.lock: + await self.send_autoupdate() + self.timer_task_handle = None + + async def send_autoupdate(self, in_response: Optional[str] = None) -> None: + # it is important to save this variable, because it can change during runtime. + max_change_id = self.max_seen_change_id + # here, 1 is added to the change_id, because the client_change_id is the id the client + # *knows* about -> the client needs client_change_id+1 since get_autoupdate_data is + # inclusive [change_id .. max_change_id]. + autoupdate = await get_autoupdate_data( + cast(int, self.client_change_id) + 1, max_change_id, self.consumer.user_id + ) + if autoupdate is not None: + # It will be send, so we can set the client_change_id + self.client_change_id = max_change_id + await self.consumer.send_json( + type="autoupdate", content=autoupdate, in_response=in_response, + ) diff --git a/openslides/utils/consumers.py b/openslides/utils/consumers.py index 1073f831d..7432423f9 100644 --- a/openslides/utils/consumers.py +++ b/openslides/utils/consumers.py @@ -1,32 +1,19 @@ import time -from collections import defaultdict from typing import Any, Dict, List, Optional, cast from urllib.parse import parse_qs from channels.generic.websocket import AsyncWebsocketConsumer -from mypy_extensions import TypedDict -from ..utils.websocket import WEBSOCKET_CHANGE_ID_TOO_HIGH from . import logging from .auth import UserDoesNotExist, async_anonymous_is_enabled -from .cache import ChangeIdTooLowError, element_cache, split_element_id +from .cache import element_cache +from .consumer_autoupdate_strategy import ConsumerAutoupdateStrategy from .utils import get_worker_id -from .websocket import ProtocollAsyncJsonWebsocketConsumer +from .websocket import BaseWebsocketException, ProtocollAsyncJsonWebsocketConsumer logger = logging.getLogger("openslides.websocket") -AutoupdateFormat = TypedDict( - "AutoupdateFormat", - { - "changed": Dict[str, List[Dict[str, Any]]], - "deleted": Dict[str, List[int]], - "from_change_id": int, - "to_change_id": int, - "all_data": bool, - }, -) - class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): """ @@ -40,12 +27,11 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): ID counter for assigning each instance of this class an unique id. """ - skipped_autoupdate_from_change_id: Optional[int] = None - def __init__(self, *args: Any, **kwargs: Any) -> None: self.projector_hash: Dict[int, int] = {} SiteConsumer.ID_COUNTER += 1 self._id = get_worker_id() + "-" + str(SiteConsumer.ID_COUNTER) + self.autoupdate_strategy = ConsumerAutoupdateStrategy(self) super().__init__(*args, **kwargs) async def connect(self) -> None: @@ -56,11 +42,13 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): Sends the startup data to the user. """ + self.user_id = self.scope["user"]["id"] + self.connect_time = time.time() # self.scope['user'] is the full_data dict of the user. For an # anonymous user is it the dict {'id': 0} change_id = None - if not await async_anonymous_is_enabled() and not self.scope["user"]["id"]: + if not await async_anonymous_is_enabled() and not self.user_id: await self.accept() # workaround for #4009 await self.close() logger.debug(f"connect: denied ({self._id})") @@ -74,24 +62,23 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): change_id = int(query_string[b"change_id"][0]) except ValueError: await self.accept() # workaround for #4009 - await self.close() # TODO: Find a way to send an error code + await self.close() logger.debug(f"connect: wrong change id ({self._id})") return - if b"autoupdate" in query_string and query_string[b"autoupdate"][ - 0 - ].lower() not in [b"0", b"off", b"false"]: - # a positive value in autoupdate. Start autoupdate - await self.channel_layer.group_add("autoupdate", self.channel_name) - await self.accept() if change_id is not None: logger.debug(f"connect: change id {change_id} ({self._id})") - await self.send_autoupdate(change_id) + try: + await self.request_autoupdate(change_id) + except BaseWebsocketException as e: + await self.send_exception(e) else: logger.debug(f"connect: no change id ({self._id})") + await self.channel_layer.group_add("autoupdate", self.channel_name) + async def disconnect(self, close_code: int) -> None: """ A user disconnects. Remove it from autoupdate. @@ -102,110 +89,19 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): f"disconnect code={close_code} active_secs={active_seconds} ({self._id})" ) - async def send_notify(self, event: Dict[str, Any]) -> None: - """ - Send a notify message to the user. - """ - user_id = self.scope["user"]["id"] - item = event["incomming"] - - users = item.get("users") - reply_channels = item.get("replyChannels") - if ( - (isinstance(users, bool) and users) - or (isinstance(users, list) and user_id in users) - or ( - isinstance(reply_channels, list) and self.channel_name in reply_channels - ) - or (users is None and reply_channels is None) - ): - item["senderChannelName"] = event["senderChannelName"] - item["senderUserId"] = event["senderUserId"] - await self.send_json(type="notify", content=item) - - async def send_autoupdate( - self, - change_id: int, - max_change_id: Optional[int] = None, - in_response: Optional[str] = None, - ) -> None: - """ - Sends an autoupdate to the client from change_id to max_change_id. - If max_change_id is None, the current change id will be used. - """ - user_id = self.scope["user"]["id"] - - if max_change_id is None: - max_change_id = await element_cache.get_current_change_id() - - if change_id == max_change_id + 1: - # The client is up-to-date, so nothing will be done - return - - if change_id > max_change_id: - message = f"Requested change_id {change_id} is higher this highest change_id {max_change_id}." - await self.send_error( - code=WEBSOCKET_CHANGE_ID_TOO_HIGH, - message=message, - in_response=in_response, - ) - return - - try: - changed_elements, deleted_element_ids = await element_cache.get_data_since( - user_id, change_id, max_change_id - ) - except ChangeIdTooLowError: - # The change_id is lower the the lowerst change_id in redis. Return all data - changed_elements = await element_cache.get_all_data_list(user_id) - all_data = True - deleted_elements: Dict[str, List[int]] = {} - except UserDoesNotExist: - # Maybe the user was deleted, but a websocket connection is still open to the user. - # So we can close this connection and return. - await self.close() - return - else: - all_data = False - deleted_elements = defaultdict(list) - for element_id in deleted_element_ids: - collection_string, id = split_element_id(element_id) - deleted_elements[collection_string].append(id) - - # Check, if the autoupdate has any data. - if not changed_elements and not deleted_element_ids: - # Set the current from_change_id, if it is the first skipped autoupdate - if not self.skipped_autoupdate_from_change_id: - self.skipped_autoupdate_from_change_id = change_id - else: - # Normal autoupdate with data - from_change_id = change_id - - # If there is at least one skipped autoupdate, take the saved from_change_id - if self.skipped_autoupdate_from_change_id: - from_change_id = self.skipped_autoupdate_from_change_id - self.skipped_autoupdate_from_change_id = None - - await self.send_json( - type="autoupdate", - content=AutoupdateFormat( - changed=changed_elements, - deleted=deleted_elements, - from_change_id=from_change_id, - to_change_id=max_change_id, - all_data=all_data, - ), - in_response=in_response, - ) - - async def send_data(self, event: Dict[str, Any]) -> None: + async def msg_new_change_id(self, event: Dict[str, Any]) -> None: """ Send changed or deleted elements to the user. """ change_id = event["change_id"] - await self.send_autoupdate(change_id, max_change_id=change_id) + try: + await self.autoupdate_strategy.new_change_id(change_id) + except UserDoesNotExist: + # Maybe the user was deleted, but a websocket connection is still open to the user. + # So we can close this connection and return. + await self.close() - async def projector_changed(self, event: Dict[str, Any]) -> None: + async def msg_projector_data(self, event: Dict[str, Any]) -> None: """ The projector has changed. """ @@ -223,6 +119,33 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): if projector_data: await self.send_projector_data(projector_data, change_id=change_id) + async def msg_notify(self, event: Dict[str, Any]) -> None: + """ + Send a notify message to the user. + """ + item = event["incomming"] + + users = item.get("users") + reply_channels = item.get("replyChannels") + if ( + (isinstance(users, bool) and users) + or (isinstance(users, list) and self.user_id in users) + or ( + isinstance(reply_channels, list) and self.channel_name in reply_channels + ) + or (users is None and reply_channels is None) + ): + item["senderChannelName"] = event["senderChannelName"] + item["senderUserId"] = event["senderUserId"] + await self.send_json(type="notify", content=item) + + async def request_autoupdate( + self, change_id: int, in_response: Optional[str] = None + ) -> None: + await self.autoupdate_strategy.request_change_id( + change_id, in_response=in_response + ) + async def send_projector_data( self, data: Dict[int, Dict[str, Any]], diff --git a/openslides/utils/models.py b/openslides/utils/models.py index 8a531ac17..91a43107c 100644 --- a/openslides/utils/models.py +++ b/openslides/utils/models.py @@ -175,7 +175,7 @@ class RESTModelMixin: current_time = time.time() if current_time > last_time + 5: last_time = current_time - logger.info(f"\t{i+1}/{instances_length}...") + logger.info(f" {i+1}/{instances_length}...") return full_data @classmethod diff --git a/openslides/utils/projector.py b/openslides/utils/projector.py index 9e20fddcd..3a8f1cc00 100644 --- a/openslides/utils/projector.py +++ b/openslides/utils/projector.py @@ -5,16 +5,14 @@ Functions that handel the registration of projector elements and the rendering of the data to present it on the projector. """ -from typing import Any, Awaitable, Callable, Dict, List +from collections import defaultdict +from typing import Any, Awaitable, Callable, Dict, List, Optional +from . import logging from .cache import element_cache -AllData = Dict[str, Dict[int, Dict[str, Any]]] -ProjectorSlide = Callable[[AllData, Dict[str, Any], int], Awaitable[Dict[str, Any]]] - - -projector_slides: Dict[str, ProjectorSlide] = {} +logger = logging.getLogger(__name__) class ProjectorElementException(Exception): @@ -23,6 +21,46 @@ class ProjectorElementException(Exception): """ +class ProjectorAllDataProvider: + NON_EXISTENT_MARKER = object() + + def __init__(self) -> None: + self.cache: Any = defaultdict(dict) # fuu you mypy + self.fetched_collection: Dict[str, bool] = {} + + async def get(self, collection: str, id: int) -> Optional[Dict[str, Any]]: + cache_data = self.cache[collection].get(id) + if cache_data is None: + data: Any = await element_cache.get_element_data(collection, id) + if data is None: + data = ProjectorAllDataProvider.NON_EXISTENT_MARKER + self.cache[collection][id] = data + + cache_data = self.cache[collection][id] + if cache_data == ProjectorAllDataProvider.NON_EXISTENT_MARKER: + return None + return cache_data + + async def get_collection(self, collection: str) -> Dict[int, Dict[str, Any]]: + if not self.fetched_collection.get(collection, False): + collection_data = await element_cache.get_collection_data(collection) + self.cache[collection] = collection_data + self.fetched_collection[collection] = True + return self.cache[collection] + + async def exists(self, collection: str, id: int) -> bool: + model = await self.get(collection, id) + return model is not None + + +ProjectorSlide = Callable[ + [ProjectorAllDataProvider, Dict[str, Any], int], Awaitable[Dict[str, Any]] +] + + +projector_slides: Dict[str, ProjectorSlide] = {} + + def register_projector_slide(name: str, slide: ProjectorSlide) -> None: """ Registers a projector slide. @@ -67,10 +105,11 @@ async def get_projector_data( if projector_ids is None: projector_ids = [] - all_data = await element_cache.get_all_data_dict() projector_data: Dict[int, List[Dict[str, Any]]] = {} + all_data_provider = ProjectorAllDataProvider() + projectors = await all_data_provider.get_collection("core/projector") - for projector_id, projector in all_data.get("core/projector", {}).items(): + for projector_id, projector in projectors.items(): if projector_ids and projector_id not in projector_ids: # only render the projector in question. continue @@ -83,7 +122,7 @@ async def get_projector_data( for element in projector["elements"]: projector_slide = projector_slides[element["name"]] try: - data = await projector_slide(all_data, element, projector_id) + data = await projector_slide(all_data_provider, element, projector_id) except ProjectorElementException as err: data = {"error": str(err)} projector_data[projector_id].append({"data": data, "element": element}) @@ -91,18 +130,23 @@ async def get_projector_data( return projector_data -async def get_config(all_data: AllData, key: str) -> Any: +async def get_config(all_data_provider: ProjectorAllDataProvider, key: str) -> Any: """ - Returns a config value from all_data. + Returns a config value from all_data_provider. + Triggers the cache early: It access `get_colelction` instead of `get`. It + allows for all successive queries for configs to be cached. """ from ..core.config import config config_id = (await config.async_get_key_to_id())[key] - return all_data[config.get_collection_string()][config_id]["value"] + configs = await all_data_provider.get_collection(config.get_collection_string()) + return configs[config_id]["value"] -def get_model(all_data: AllData, collection: str, id: Any) -> Dict[str, Any]: +async def get_model( + all_data_provider: ProjectorAllDataProvider, collection: str, id: Any +) -> Dict[str, Any]: """ Tries to get the model identified by the collection and id. If the id is invalid or the model not found, ProjectorElementExceptions will be raised. @@ -110,17 +154,19 @@ def get_model(all_data: AllData, collection: str, id: Any) -> Dict[str, Any]: if id is None: raise ProjectorElementException(f"id is required for {collection} slide") - try: - model = all_data[collection][id] - except KeyError: + model = await all_data_provider.get(collection, id) + if model is None: raise ProjectorElementException(f"{collection} with id {id} does not exist") return model -def get_models( - all_data: AllData, collection: str, ids: List[Any] +async def get_models( + all_data_provider: ProjectorAllDataProvider, collection: str, ids: List[Any] ) -> List[Dict[str, Any]]: """ Tries to fetch all given models. Models are required to be all of the collection `collection`. """ - return [get_model(all_data, collection, id) for id in ids] + logger.info( + f"Note: a call to `get_models` with {collection}/{ids}. This might be cache-intensive" + ) + return [await get_model(all_data_provider, collection, id) for id in ids] diff --git a/openslides/utils/redis.py b/openslides/utils/redis.py index acc370f54..d5ff24065 100644 --- a/openslides/utils/redis.py +++ b/openslides/utils/redis.py @@ -11,6 +11,7 @@ logger = logging.getLogger(__name__) use_redis = False use_read_only_redis = False read_only_redis_amount_replicas = None +read_only_redis_wait_timeout = None try: import aioredis @@ -35,6 +36,8 @@ else: read_only_redis_amount_replicas = getattr(settings, "AMOUNT_REPLICAS", 1) logger.info(f"AMOUNT_REPLICAS={read_only_redis_amount_replicas}") + read_only_redis_wait_timeout = getattr(settings, "WAIT_TIMEOUT", 1000) + logger.info(f"WAIT_TIMEOUT={read_only_redis_wait_timeout}") else: logger.info("Redis is not configured.") diff --git a/openslides/utils/timing.py b/openslides/utils/timing.py new file mode 100644 index 000000000..ebcc2f35c --- /dev/null +++ b/openslides/utils/timing.py @@ -0,0 +1,27 @@ +import time +from typing import List, Optional + +from . import logging + + +timelogger = logging.getLogger(__name__) + + +class Timing: + def __init__(self, name: str) -> None: + self.name = name + self.times: List[float] = [time.time()] + + def __call__(self, done: Optional[bool] = False) -> None: + self.times.append(time.time()) + if done: + self.printtime() + + def printtime(self) -> None: + s = f"{self.name}: " + for i in range(1, len(self.times)): + diff = self.times[i] - self.times[i - 1] + s += f"{i}: {diff:.5f} " + diff = self.times[-1] - self.times[0] + s += f"sum: {diff:.5f}" + timelogger.info(s) diff --git a/openslides/utils/websocket.py b/openslides/utils/websocket.py index b001b2dc9..42e5b802f 100644 --- a/openslides/utils/websocket.py +++ b/openslides/utils/websocket.py @@ -25,6 +25,26 @@ WEBSOCKET_WRONG_FORMAT = 102 # If the recieved data has not the expected format. +class BaseWebsocketException(Exception): + code: int + + def __init__(self, message: str, in_response: Optional[str] = None) -> None: + self.message = message + self.in_response = in_response + + +class NotAuthorizedException(BaseWebsocketException): + code = WEBSOCKET_NOT_AUTHORIZED + + +class ChangeIdTooHighException(BaseWebsocketException): + code = WEBSOCKET_CHANGE_ID_TOO_HIGH + + +class WrongFormatException(BaseWebsocketException): + code = WEBSOCKET_WRONG_FORMAT + + class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer): async def receive( self, @@ -122,6 +142,20 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer): silence_errors=silence_errors, ) + async def send_exception( + self, e: BaseWebsocketException, silence_errors: Optional[bool] = True, + ) -> None: + """ + Send generic error messages with a custom status code (see above) and a text message. + """ + await self.send_json( + "error", + {"code": e.code, "message": e.message}, + None, + in_response=e.in_response, + silence_errors=silence_errors, + ) + async def receive_json(self, content: Any) -> None: # type: ignore """ Receives the json data, parses it and calls receive_content. @@ -140,9 +174,12 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer): ) return - await websocket_client_messages[content["type"]].receive_content( - self, content["content"], id=content["id"] - ) + try: + await websocket_client_messages[content["type"]].receive_content( + self, content["content"], id=content["id"] + ) + except BaseWebsocketException as e: + await self.send_exception(e) schema: Dict[str, Any] = { diff --git a/openslides/core/websocket.py b/openslides/utils/websocket_client_messages.py similarity index 79% rename from openslides/core/websocket.py rename to openslides/utils/websocket_client_messages.py index a43447305..d66eb1f58 100644 --- a/openslides/core/websocket.py +++ b/openslides/utils/websocket_client_messages.py @@ -1,21 +1,22 @@ from typing import Any, Dict, Optional -from ..utils import logging -from ..utils.auth import async_has_perm -from ..utils.constants import get_constants -from ..utils.projector import get_projector_data -from ..utils.stats import WebsocketLatencyLogger -from ..utils.websocket import ( - WEBSOCKET_NOT_AUTHORIZED, +from . import logging +from .auth import async_has_perm +from .constants import get_constants +from .projector import get_projector_data +from .stats import WebsocketLatencyLogger +from .websocket import ( BaseWebsocketClientMessage, + NotAuthorizedException, ProtocollAsyncJsonWebsocketConsumer, + register_client_message, ) logger = logging.getLogger(__name__) -class NotifyWebsocketClientMessage(BaseWebsocketClientMessage): +class Notify(BaseWebsocketClientMessage): """ Websocket message from a client to send a message to other clients. """ @@ -59,13 +60,9 @@ class NotifyWebsocketClientMessage(BaseWebsocketClientMessage): ) -> None: # Check if the user is allowed to send this notify message perm = self.notify_permissions.get(content["name"]) - if perm is not None and not await async_has_perm( - consumer.scope["user"]["id"], perm - ): - await consumer.send_error( - code=WEBSOCKET_NOT_AUTHORIZED, - message=f"You need '{perm}' to send this message.", - in_response=id, + if perm is not None and not await async_has_perm(consumer.user_id, perm): + raise NotAuthorizedException( + f"You need '{perm}' to send this message.", in_response=id, ) else: # Some logging @@ -84,15 +81,18 @@ class NotifyWebsocketClientMessage(BaseWebsocketClientMessage): await consumer.channel_layer.group_send( "site", { - "type": "send_notify", + "type": "msg_notify", "incomming": content, "senderChannelName": consumer.channel_name, - "senderUserId": consumer.scope["user"]["id"], + "senderUserId": consumer.user_id, }, ) -class ConstantsWebsocketClientMessage(BaseWebsocketClientMessage): +register_client_message(Notify()) + + +class Constants(BaseWebsocketClientMessage): """ The Client requests the constants. """ @@ -109,7 +109,10 @@ class ConstantsWebsocketClientMessage(BaseWebsocketClientMessage): ) -class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage): +register_client_message(Constants()) + + +class GetElements(BaseWebsocketClientMessage): """ The Client request database elements. """ @@ -130,26 +133,10 @@ class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage): self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str ) -> None: requested_change_id = content.get("change_id", 0) - await consumer.send_autoupdate(requested_change_id, in_response=id) + await consumer.request_autoupdate(requested_change_id, in_response=id) -class AutoupdateWebsocketClientMessage(BaseWebsocketClientMessage): - """ - The Client turns autoupdate on or off. - """ - - identifier = "autoupdate" - - async def receive_content( - self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str - ) -> None: - # Turn on or off the autoupdate for the client - if content: # accept any value, that can be interpreted as bool - await consumer.channel_layer.group_add("autoupdate", consumer.channel_name) - else: - await consumer.channel_layer.group_discard( - "autoupdate", consumer.channel_name - ) +register_client_message(GetElements()) class ListenToProjectors(BaseWebsocketClientMessage): @@ -198,6 +185,9 @@ class ListenToProjectors(BaseWebsocketClientMessage): await consumer.send_projector_data(projector_data, in_response=id) +register_client_message(ListenToProjectors()) + + class PingPong(BaseWebsocketClientMessage): """ Responds to pings from the client. @@ -220,3 +210,6 @@ class PingPong(BaseWebsocketClientMessage): await consumer.send_json(type="pong", content=latency, in_response=id) if latency is not None: await WebsocketLatencyLogger.add_latency(latency) + + +register_client_message(PingPong()) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index f83011d6e..2791e70be 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -1,9 +1,13 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional, cast from openslides.core.config import config from openslides.core.models import Projector from openslides.users.models import User -from openslides.utils.projector import AllData, get_config, register_projector_slide +from openslides.utils.projector import ( + ProjectorAllDataProvider, + get_config, + register_projector_slide, +) class TConfig: @@ -94,19 +98,23 @@ class TProjector: async def slide1( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Slide that shows the general_event_name. """ return { "name": "slide1", - "event_name": await get_config(all_data, "general_event_name"), + "event_name": await get_config(all_data_provider, "general_event_name"), } async def slide2( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: return {"name": "slide2"} @@ -115,17 +123,26 @@ register_projector_slide("test/slide1", slide1) register_projector_slide("test/slide2", slide2) -def all_data_config() -> AllData: - return { - TConfig().get_collection_string(): { - element["id"]: element for element in TConfig().get_elements() - } - } +class TestProjectorAllDataProvider: + def __init__(self, data): + self.data = data + + async def get(self, collection: str, id: int) -> Optional[Dict[str, Any]]: + collection_data = await self.get_collection(collection) + return collection_data.get(id) + + async def get_collection(self, collection: str) -> Dict[int, Dict[str, Any]]: + return self.data.get(collection, {}) + + async def exists(self, collection: str, id: int) -> bool: + return (await self.get(collection, id)) is not None -def all_data_users() -> AllData: - return { - TUser().get_collection_string(): { - element["id"]: element for element in TUser().get_elements() - } +def get_all_data_provider(data) -> ProjectorAllDataProvider: + data[TConfig().get_collection_string()] = { + element["id"]: element for element in TConfig().get_elements() } + data[TUser().get_collection_string()] = { + element["id"]: element for element in TUser().get_elements() + } + return cast(ProjectorAllDataProvider, TestProjectorAllDataProvider(data)) diff --git a/tests/integration/utils/test_consumers.py b/tests/integration/utils/test_consumers.py index 80bf93729..764bb143e 100644 --- a/tests/integration/utils/test_consumers.py +++ b/tests/integration/utils/test_consumers.py @@ -1,4 +1,3 @@ -import asyncio from importlib import import_module from typing import Optional, Tuple from unittest.mock import patch @@ -51,7 +50,7 @@ async def prepare_element_cache(settings): ] ) element_cache._cachables = None - await element_cache.async_ensure_cache(default_change_id=2) + await element_cache.async_ensure_cache(default_change_id=10) yield # Reset the cachable_provider element_cache.cachable_provider = orig_cachable_provider @@ -158,19 +157,9 @@ async def test_connection_with_too_big_change_id(get_communicator, set_config): @pytest.mark.asyncio -async def test_changed_data_autoupdate_off(communicator, set_config): +async def test_changed_data_autoupdate(get_communicator, set_config): await set_config("general_system_enable_anonymous", True) - await communicator.connect() - - # Change a config value - await set_config("general_event_name", "Test Event") - assert await communicator.receive_nothing() - - -@pytest.mark.asyncio -async def test_changed_data_autoupdate_on(get_communicator, set_config): - await set_config("general_system_enable_anonymous", True) - communicator = get_communicator("autoupdate=on") + communicator = get_communicator() await communicator.connect() # Change a config value @@ -212,7 +201,7 @@ async def create_user_session_cookie(user_id: int) -> Tuple[bytes, bytes]: @pytest.mark.asyncio async def test_with_user(get_communicator): cookie_header = await create_user_session_cookie(1) - communicator = get_communicator("autoupdate=on", headers=[cookie_header]) + communicator = get_communicator(headers=[cookie_header]) connected, __ = await communicator.connect() @@ -222,7 +211,7 @@ async def test_with_user(get_communicator): @pytest.mark.asyncio async def test_skipping_autoupdate(set_config, get_communicator): cookie_header = await create_user_session_cookie(1) - communicator = get_communicator("autoupdate=on", headers=[cookie_header]) + communicator = get_communicator(headers=[cookie_header]) await communicator.connect() @@ -265,7 +254,7 @@ async def test_skipping_autoupdate(set_config, get_communicator): @pytest.mark.asyncio async def test_receive_deleted_data(get_communicator, set_config): await set_config("general_system_enable_anonymous", True) - communicator = get_communicator("autoupdate=on") + communicator = get_communicator() await communicator.connect() # Delete test element @@ -395,6 +384,7 @@ async def test_send_get_elements_too_big_change_id(communicator, set_config): @pytest.mark.asyncio async def test_send_get_elements_too_small_change_id(communicator, set_config): + # Note: this test depends on the default_change_id set in prepare_element_cache await set_config("general_system_enable_anonymous", True) await communicator.connect() @@ -422,12 +412,12 @@ async def test_send_connect_up_to_date(communicator, set_config): {"type": "getElements", "content": {"change_id": 0}, "id": "test_id"} ) response1 = await communicator.receive_json_from() - first_change_id = response1.get("content")["to_change_id"] + max_change_id = response1.get("content")["to_change_id"] await communicator.send_json_to( { "type": "getElements", - "content": {"change_id": first_change_id + 1}, + "content": {"change_id": max_change_id}, "id": "test_id", } ) @@ -510,43 +500,6 @@ async def test_send_invalid_get_elements(communicator, set_config): assert response.get("in_response") == "test_id" -@pytest.mark.asyncio -async def test_turn_on_autoupdate(communicator, set_config): - await set_config("general_system_enable_anonymous", True) - await communicator.connect() - - await communicator.send_json_to( - {"type": "autoupdate", "content": "on", "id": "test_id"} - ) - await asyncio.sleep(0.01) - # Change a config value - await set_config("general_event_name", "Test Event") - response = await communicator.receive_json_from() - - id = config.get_key_to_id()["general_event_name"] - type = response.get("type") - content = response.get("content") - assert type == "autoupdate" - assert content["changed"] == { - "core/config": [{"id": id, "key": "general_event_name", "value": "Test Event"}] - } - - -@pytest.mark.asyncio -async def test_turn_off_autoupdate(get_communicator, set_config): - await set_config("general_system_enable_anonymous", True) - communicator = get_communicator("autoupdate=on") - await communicator.connect() - - await communicator.send_json_to( - {"type": "autoupdate", "content": False, "id": "test_id"} - ) - await asyncio.sleep(0.01) - # Change a config value - await set_config("general_event_name", "Test Event") - assert await communicator.receive_nothing() - - @pytest.mark.asyncio async def test_listen_to_projector(communicator, set_config): await set_config("general_system_enable_anonymous", True) @@ -565,7 +518,7 @@ async def test_listen_to_projector(communicator, set_config): content = response.get("content") assert type == "projector" assert content == { - "change_id": 3, + "change_id": 11, "data": { "1": [ { @@ -588,17 +541,22 @@ async def test_update_projector(communicator, set_config): "id": "test_id", } ) - await communicator.receive_json_from() + await communicator.receive_json_from() # recieve initial projector data # Change a config value await set_config("general_event_name", "Test Event") + + # We need two messages: The autoupdate and the projector data in this order + response = await communicator.receive_json_from() + assert response.get("type") == "autoupdate" + response = await communicator.receive_json_from() type = response.get("type") content = response.get("content") assert type == "projector" assert content == { - "change_id": 4, + "change_id": 12, "data": { "1": [ { @@ -629,4 +587,8 @@ async def test_update_projector_to_current_value(communicator, set_config): # Change a config value to current_value await set_config("general_event_name", "OpenSlides") + # We await an autoupdate, bot no projector data + response = await communicator.receive_json_from() + assert response.get("type") == "autoupdate" + assert await communicator.receive_nothing() diff --git a/tests/unit/agenda/test_projector.py b/tests/unit/agenda/test_projector.py index 0f86a54ca..442be4079 100644 --- a/tests/unit/agenda/test_projector.py +++ b/tests/unit/agenda/test_projector.py @@ -4,10 +4,12 @@ import pytest from openslides.agenda import projector +from ...integration.helpers import get_all_data_provider + @pytest.fixture -def all_data(): - all_data = { +def all_data_provider(): + data = { "agenda/item": { 1: { "id": 1, @@ -82,14 +84,14 @@ def all_data(): } } - return all_data + return get_all_data_provider(data) @pytest.mark.asyncio -async def test_main_items(all_data): +async def test_main_items(all_data_provider): element: Dict[str, Any] = {} - data = await projector.item_list_slide(all_data, element, 1) + data = await projector.item_list_slide(all_data_provider, element, 1) assert data == { "items": [ @@ -106,10 +108,10 @@ async def test_main_items(all_data): @pytest.mark.asyncio -async def test_all_items(all_data): +async def test_all_items(all_data_provider): element: Dict[str, Any] = {"only_main_items": False} - data = await projector.item_list_slide(all_data, element, 1) + data = await projector.item_list_slide(all_data_provider, element, 1) assert data == { "items": [ diff --git a/tests/unit/motions/test_projector.py b/tests/unit/motions/test_projector.py index c829070d0..257d60150 100644 --- a/tests/unit/motions/test_projector.py +++ b/tests/unit/motions/test_projector.py @@ -4,14 +4,13 @@ import pytest from openslides.motions import projector -from ...integration.helpers import all_data_config, all_data_users +from ...integration.helpers import get_all_data_provider @pytest.fixture -def all_data(): - return_value = all_data_config() - return_value.update(all_data_users()) - return_value["motions/motion"] = { +def all_data_provider(): + data = {} + data["motions/motion"] = { 1: { "id": 1, "identifier": "4", @@ -143,7 +142,7 @@ def all_data(): "change_recommendations": [], }, } - return_value["motions/workflow"] = { + data["motions/workflow"] = { 1: { "id": 1, "name": "Simple Workflow", @@ -151,7 +150,7 @@ def all_data(): "first_state_id": 1, } } - return_value["motions/state"] = { + data["motions/state"] = { 1: { "id": 1, "name": "submitted", @@ -217,7 +216,7 @@ def all_data(): "workflow_id": 1, }, } - return_value["motions/statute-paragraph"] = { + data["motions/statute-paragraph"] = { 1: { "id": 1, "title": "§1 Preamble", @@ -225,7 +224,7 @@ def all_data(): "weight": 10000, } } - return_value["motions/motion-change-recommendation"] = { + data["motions/motion-change-recommendation"] = { 1: { "id": 1, "motion_id": 1, @@ -251,14 +250,14 @@ def all_data(): "creation_time": "2019-02-09T09:54:06.256378+01:00", }, } - return return_value + return get_all_data_provider(data) @pytest.mark.asyncio -async def test_motion_slide(all_data): +async def test_motion_slide(all_data_provider): element: Dict[str, Any] = {"id": 1} - data = await projector.motion_slide(all_data, element, 1) + data = await projector.motion_slide(all_data_provider, element, 1) assert data == { "identifier": "4", @@ -304,10 +303,10 @@ async def test_motion_slide(all_data): @pytest.mark.asyncio -async def test_amendment_slide(all_data): +async def test_amendment_slide(all_data_provider): element: Dict[str, Any] = {"id": 2} - data = await projector.motion_slide(all_data, element, 1) + data = await projector.motion_slide(all_data_provider, element, 1) assert data == { "identifier": "Ä1", @@ -331,10 +330,10 @@ async def test_amendment_slide(all_data): @pytest.mark.asyncio -async def test_statute_amendment_slide(all_data): +async def test_statute_amendment_slide(all_data_provider): element: Dict[str, Any] = {"id": 3} - data = await projector.motion_slide(all_data, element, 1) + data = await projector.motion_slide(all_data_provider, element, 1) assert data == { "identifier": None,