Merge pull request #5375 from FinnStutzenstein/autoupdatePerformance

Autoupdate performance
This commit is contained in:
Emanuel Schütze 2020-05-29 17:31:32 +02:00 committed by GitHub
commit 7665634d42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 998 additions and 673 deletions

View File

@ -143,3 +143,7 @@ not affect the client.
operator is in one of these groups, the client disconnected and reconnects again. operator is in one of these groups, the client disconnected and reconnects again.
All requests urls (including websockets) are now prefixed with `/prioritize`, so All requests urls (including websockets) are now prefixed with `/prioritize`, so
these requests from "prioritized clients" can be routed to different servers. these requests from "prioritized clients" can be routed to different servers.
`AUTOUPDATE_DELAY`: The delay to send autoupdates. This feature can be
deactivated by setting it to `None`. It is deactivated per default. The Delay is
given in seconds

View File

@ -3,9 +3,10 @@ import { Injectable } from '@angular/core';
import { BaseModel } from '../../shared/models/base/base-model'; import { BaseModel } from '../../shared/models/base/base-model';
import { CollectionStringMapperService } from './collection-string-mapper.service'; import { CollectionStringMapperService } from './collection-string-mapper.service';
import { DataStoreService, DataStoreUpdateManagerService } from './data-store.service'; import { DataStoreService, DataStoreUpdateManagerService } from './data-store.service';
import { Mutex } from '../promises/mutex';
import { WebsocketService, WEBSOCKET_ERROR_CODES } from './websocket.service'; import { WebsocketService, WEBSOCKET_ERROR_CODES } from './websocket.service';
interface AutoupdateFormat { export interface AutoupdateFormat {
/** /**
* All changed (and created) items as their full/restricted data grouped by their collection. * All changed (and created) items as their full/restricted data grouped by their collection.
*/ */
@ -36,6 +37,19 @@ interface AutoupdateFormat {
all_data: boolean; all_data: boolean;
} }
export function isAutoupdateFormat(obj: any): obj is AutoupdateFormat {
const format = obj as AutoupdateFormat;
return (
obj &&
typeof obj === 'object' &&
format.changed !== undefined &&
format.deleted !== undefined &&
format.from_change_id !== undefined &&
format.to_change_id !== undefined &&
format.all_data !== undefined
);
}
/** /**
* Handles the initial update and automatic updates using the {@link WebsocketService} * Handles the initial update and automatic updates using the {@link WebsocketService}
* Incoming objects, usually BaseModels, will be saved in the dataStore (`this.DS`) * Incoming objects, usually BaseModels, will be saved in the dataStore (`this.DS`)
@ -45,6 +59,8 @@ interface AutoupdateFormat {
providedIn: 'root' providedIn: 'root'
}) })
export class AutoupdateService { export class AutoupdateService {
private mutex = new Mutex();
/** /**
* Constructor to create the AutoupdateService. Calls the constructor of the parent class. * Constructor to create the AutoupdateService. Calls the constructor of the parent class.
* @param websocketService * @param websocketService
@ -79,15 +95,17 @@ export class AutoupdateService {
* Handles the change ids of all autoupdates. * Handles the change ids of all autoupdates.
*/ */
private async storeResponse(autoupdate: AutoupdateFormat): Promise<void> { private async storeResponse(autoupdate: AutoupdateFormat): Promise<void> {
const unlock = await this.mutex.lock();
if (autoupdate.all_data) { if (autoupdate.all_data) {
await this.storeAllData(autoupdate); await this.storeAllData(autoupdate);
} else { } else {
await this.storePartialAutoupdate(autoupdate); await this.storePartialAutoupdate(autoupdate);
} }
unlock();
} }
/** /**
* Stores all data from the autoupdate. This means, that the DS is resettet and filled with just the * Stores all data from the autoupdate. This means, that the DS is resetted and filled with just the
* given data from the autoupdate. * given data from the autoupdate.
* @param autoupdate The autoupdate * @param autoupdate The autoupdate
*/ */
@ -116,27 +134,41 @@ export class AutoupdateService {
// Normal autoupdate // Normal autoupdate
if (autoupdate.from_change_id <= maxChangeId + 1 && autoupdate.to_change_id > maxChangeId) { if (autoupdate.from_change_id <= maxChangeId + 1 && autoupdate.to_change_id > maxChangeId) {
const updateSlot = await this.DSUpdateManager.getNewUpdateSlot(this.DS); await this.injectAutupdateIntoDS(autoupdate, true);
// Delete the removed objects from the DataStore
for (const collection of Object.keys(autoupdate.deleted)) {
await this.DS.remove(collection, autoupdate.deleted[collection]);
}
// Add the objects to the DataStore.
for (const collection of Object.keys(autoupdate.changed)) {
await this.DS.add(this.mapObjectsToBaseModels(collection, autoupdate.changed[collection]));
}
await this.DS.flushToStorage(autoupdate.to_change_id);
this.DSUpdateManager.commit(updateSlot, autoupdate.to_change_id);
} else { } else {
// autoupdate fully in the future. we are missing something! // autoupdate fully in the future. we are missing something!
console.log('Autoupdate in the future', maxChangeId, autoupdate.from_change_id, autoupdate.to_change_id);
this.requestChanges(); this.requestChanges();
} }
} }
public async injectAutoupdateIgnoreChangeId(autoupdate: AutoupdateFormat): Promise<void> {
const unlock = await this.mutex.lock();
console.debug('inject autoupdate', autoupdate);
await this.injectAutupdateIntoDS(autoupdate, false);
unlock();
}
private async injectAutupdateIntoDS(autoupdate: AutoupdateFormat, flush: boolean): Promise<void> {
const updateSlot = await this.DSUpdateManager.getNewUpdateSlot(this.DS);
// Delete the removed objects from the DataStore
for (const collection of Object.keys(autoupdate.deleted)) {
await this.DS.remove(collection, autoupdate.deleted[collection]);
}
// Add the objects to the DataStore.
for (const collection of Object.keys(autoupdate.changed)) {
await this.DS.add(this.mapObjectsToBaseModels(collection, autoupdate.changed[collection]));
}
if (flush) {
await this.DS.flushToStorage(autoupdate.to_change_id);
}
this.DSUpdateManager.commit(updateSlot, autoupdate.to_change_id);
}
/** /**
* Creates baseModels for each plain object. If the collection is not registered, * Creates baseModels for each plain object. If the collection is not registered,
* A console error will be issued and an empty list returned. * A console error will be issued and an empty list returned.
@ -160,9 +192,8 @@ export class AutoupdateService {
* The server should return an autoupdate with all new data. * The server should return an autoupdate with all new data.
*/ */
public requestChanges(): void { public requestChanges(): void {
const changeId = this.DS.maxChangeId === 0 ? 0 : this.DS.maxChangeId + 1; console.log(`requesting changed objects with DS max change id ${this.DS.maxChangeId}`);
console.log(`requesting changed objects with DS max change id ${changeId}`); this.websocketService.send('getElements', { change_id: this.DS.maxChangeId });
this.websocketService.send('getElements', { change_id: changeId });
} }
/** /**

View File

@ -258,6 +258,7 @@ export class DataStoreUpdateManagerService {
private serveNextSlot(): void { private serveNextSlot(): void {
if (this.updateSlotRequests.length > 0) { if (this.updateSlotRequests.length > 0) {
console.log('Concurrent update slots');
const request = this.updateSlotRequests.pop(); const request = this.updateSlotRequests.pop();
request.resolve(); request.resolve();
} }
@ -665,4 +666,11 @@ export class DataStoreService {
await this.storageService.set(DataStoreService.cachePrefix + 'DS', this.jsonStore); await this.storageService.set(DataStoreService.cachePrefix + 'DS', this.jsonStore);
await this.storageService.set(DataStoreService.cachePrefix + 'maxChangeId', changeId); await this.storageService.set(DataStoreService.cachePrefix + 'maxChangeId', changeId);
} }
public print(): void {
console.log('Max change id', this.maxChangeId);
console.log('json storage');
console.log(JSON.stringify(this.jsonStore));
console.log(this.modelStore);
}
} }

View File

@ -3,6 +3,7 @@ import { Injectable } from '@angular/core';
import { TranslateService } from '@ngx-translate/core'; import { TranslateService } from '@ngx-translate/core';
import { AutoupdateFormat, AutoupdateService, isAutoupdateFormat } from './autoupdate.service';
import { OpenSlidesStatusService } from './openslides-status.service'; import { OpenSlidesStatusService } from './openslides-status.service';
import { formatQueryParams, QueryParams } from '../definitions/query-params'; import { formatQueryParams, QueryParams } from '../definitions/query-params';
@ -17,12 +18,12 @@ export enum HTTPMethod {
DELETE = 'delete' DELETE = 'delete'
} }
export interface DetailResponse { export interface ErrorDetailResponse {
detail: string | string[]; detail: string | string[];
args?: string[]; args?: string[];
} }
function isDetailResponse(obj: any): obj is DetailResponse { function isErrorDetailResponse(obj: any): obj is ErrorDetailResponse {
return ( return (
obj && obj &&
typeof obj === 'object' && typeof obj === 'object' &&
@ -31,6 +32,15 @@ function isDetailResponse(obj: any): obj is DetailResponse {
); );
} }
interface AutoupdateResponse {
autoupdate: AutoupdateFormat;
data?: any;
}
function isAutoupdateReponse(obj: any): obj is AutoupdateResponse {
return obj && typeof obj === 'object' && isAutoupdateFormat((obj as AutoupdateResponse).autoupdate);
}
/** /**
* Service for managing HTTP requests. Allows to send data for every method. Also (TODO) will do generic error handling. * Service for managing HTTP requests. Allows to send data for every method. Also (TODO) will do generic error handling.
*/ */
@ -55,7 +65,8 @@ export class HttpService {
public constructor( public constructor(
private http: HttpClient, private http: HttpClient,
private translate: TranslateService, private translate: TranslateService,
private OSStatus: OpenSlidesStatusService private OSStatus: OpenSlidesStatusService,
private autoupdateService: AutoupdateService
) { ) {
this.defaultHeaders = new HttpHeaders().set('Content-Type', 'application/json'); this.defaultHeaders = new HttpHeaders().set('Content-Type', 'application/json');
} }
@ -82,7 +93,7 @@ export class HttpService {
): Promise<T> { ): Promise<T> {
// end early, if we are in history mode // end early, if we are in history mode
if (this.OSStatus.isInHistoryMode && method !== HTTPMethod.GET) { if (this.OSStatus.isInHistoryMode && method !== HTTPMethod.GET) {
throw this.handleError('You cannot make changes while in history mode'); throw this.processError('You cannot make changes while in history mode');
} }
// there is a current bug with the responseType. // there is a current bug with the responseType.
@ -108,9 +119,10 @@ export class HttpService {
}; };
try { try {
return await this.http.request<T>(method, url, options).toPromise(); const responseData: T = await this.http.request<T>(method, url, options).toPromise();
} catch (e) { return this.processResponse(responseData);
throw this.handleError(e); } catch (error) {
throw this.processError(error);
} }
} }
@ -120,7 +132,7 @@ export class HttpService {
* @param e The error thrown. * @param e The error thrown.
* @returns The prepared and translated message for the user * @returns The prepared and translated message for the user
*/ */
private handleError(e: any): string { private processError(e: any): string {
let error = this.translate.instant('Error') + ': '; let error = this.translate.instant('Error') + ': ';
// If the error is a string already, return it. // If the error is a string already, return it.
if (typeof e === 'string') { if (typeof e === 'string') {
@ -142,12 +154,14 @@ export class HttpService {
} else if (!e.error) { } else if (!e.error) {
error += this.translate.instant("The server didn't respond."); error += this.translate.instant("The server didn't respond.");
} else if (typeof e.error === 'object') { } else if (typeof e.error === 'object') {
if (isDetailResponse(e.error)) { if (isErrorDetailResponse(e.error)) {
error += this.processDetailResponse(e.error); error += this.processErrorDetailResponse(e.error);
} else { } else {
const errorList = Object.keys(e.error).map(key => { const errorList = Object.keys(e.error).map(key => {
const capitalizedKey = key.charAt(0).toUpperCase() + key.slice(1); const capitalizedKey = key.charAt(0).toUpperCase() + key.slice(1);
return `${this.translate.instant(capitalizedKey)}: ${this.processDetailResponse(e.error[key])}`; return `${this.translate.instant(capitalizedKey)}: ${this.processErrorDetailResponse(
e.error[key]
)}`;
}); });
error = errorList.join(', '); error = errorList.join(', ');
} }
@ -168,11 +182,9 @@ export class HttpService {
* @param str a string or a string array to join together. * @param str a string or a string array to join together.
* @returns Error text(s) as single string * @returns Error text(s) as single string
*/ */
private processDetailResponse(response: DetailResponse): string { private processErrorDetailResponse(response: ErrorDetailResponse): string {
let message: string; let message: string;
if (response instanceof Array) { if (response.detail instanceof Array) {
message = response.join(' ');
} else if (response.detail instanceof Array) {
message = response.detail.join(' '); message = response.detail.join(' ');
} else { } else {
message = response.detail; message = response.detail;
@ -187,6 +199,14 @@ export class HttpService {
return message; return message;
} }
private processResponse<T>(responseData: T): T {
if (isAutoupdateReponse(responseData)) {
this.autoupdateService.injectAutoupdateIgnoreChangeId(responseData.autoupdate);
responseData = responseData.data;
}
return responseData;
}
/** /**
* Executes a get on a path with a certain object * Executes a get on a path with a certain object
* @param path The path to send the request to. * @param path The path to send the request to.

View File

@ -130,10 +130,7 @@ export class OpenSlidesService {
* Init DS from cache and after this start the websocket service. * Init DS from cache and after this start the websocket service.
*/ */
private async setupDataStoreAndWebSocket(): Promise<void> { private async setupDataStoreAndWebSocket(): Promise<void> {
let changeId = await this.DS.initFromStorage(); const changeId = await this.DS.initFromStorage();
if (changeId > 0) {
changeId += 1;
}
// disconnect the WS connection, if there was one. This is needed // disconnect the WS connection, if there was one. This is needed
// to update the connection parameters, namely the cookies. If the user // to update the connection parameters, namely the cookies. If the user
// is changed, the WS needs to reconnect, so the new connection holds the new // is changed, the WS needs to reconnect, so the new connection holds the new
@ -141,7 +138,7 @@ export class OpenSlidesService {
if (this.websocketService.isConnected) { if (this.websocketService.isConnected) {
await this.websocketService.close(); // Wait for the disconnect. await this.websocketService.close(); // Wait for the disconnect.
} }
await this.websocketService.connect({ changeId: changeId }); // Request changes after changeId. await this.websocketService.connect(changeId); // Request changes after changeId.
} }
/** /**

View File

@ -40,7 +40,7 @@ export class PrioritizeService {
if (this.openSlidesStatusService.isPrioritizedClient !== opPrioritized) { if (this.openSlidesStatusService.isPrioritizedClient !== opPrioritized) {
console.log('Alter prioritization:', opPrioritized); console.log('Alter prioritization:', opPrioritized);
this.openSlidesStatusService.isPrioritizedClient = opPrioritized; this.openSlidesStatusService.isPrioritizedClient = opPrioritized;
this.websocketService.reconnect({ changeId: this.DS.maxChangeId }); this.websocketService.reconnect(this.DS.maxChangeId);
} }
} }
} }

View File

@ -55,14 +55,6 @@ export const WEBSOCKET_ERROR_CODES = {
WRONG_FORMAT: 102 WRONG_FORMAT: 102
}; };
/*
* Options for (re-)connecting.
*/
interface ConnectOptions {
changeId?: number;
enableAutoupdates?: boolean;
}
/** /**
* Service that handles WebSocket connections. Other services can register themselfs * Service that handles WebSocket connections. Other services can register themselfs
* with {@method getOberservable} for a specific type of messages. The content will be published. * with {@method getOberservable} for a specific type of messages. The content will be published.
@ -207,7 +199,7 @@ export class WebsocketService {
* *
* Uses NgZone to let all callbacks run in the angular context. * Uses NgZone to let all callbacks run in the angular context.
*/ */
public async connect(options: ConnectOptions = {}, retry: boolean = false): Promise<void> { public async connect(changeId: number | null = null, retry: boolean = false): Promise<void> {
const websocketId = Math.random().toString(36).substring(7); const websocketId = Math.random().toString(36).substring(7);
this.websocketId = websocketId; this.websocketId = websocketId;
@ -220,17 +212,10 @@ export class WebsocketService {
this.shouldBeClosed = false; this.shouldBeClosed = false;
} }
// set defaults const queryParams: QueryParams = {};
options = Object.assign(options, {
enableAutoupdates: true
});
const queryParams: QueryParams = { if (changeId !== null) {
autoupdate: options.enableAutoupdates queryParams.change_id = changeId;
};
if (options.changeId !== undefined) {
queryParams.change_id = options.changeId;
} }
// Create the websocket // Create the websocket
@ -398,7 +383,7 @@ export class WebsocketService {
const timeout = Math.floor(Math.random() * 3000 + 2000); const timeout = Math.floor(Math.random() * 3000 + 2000);
this.retryTimeout = setTimeout(() => { this.retryTimeout = setTimeout(() => {
this.retryTimeout = null; this.retryTimeout = null;
this.connect({ enableAutoupdates: true }, true); this.connect(null, true);
}, timeout); }, timeout);
} }
} }
@ -438,9 +423,9 @@ export class WebsocketService {
* *
* @param options The options for the new connection * @param options The options for the new connection
*/ */
public async reconnect(options: ConnectOptions = {}): Promise<void> { public async reconnect(changeId: number | null = null): Promise<void> {
await this.close(); await this.close();
await this.connect(options); await this.connect(changeId);
} }
/** /**

View File

@ -0,0 +1,30 @@
/**
* A mutex as described in every textbook
*
* Usage:
* ```
* mutex = new Mutex(); // create e.g. as class member
*
* // Somewhere in the code to lock (must be async code!)
* const unlock = await this.mutex.lock()
* // ...the code to synchronize
* unlock()
* ```
*/
export class Mutex {
private mutex = Promise.resolve();
public lock(): PromiseLike<() => void> {
// this will capture the code-to-synchronize
let begin: (unlock: () => void) => void = () => {};
// All "requests" to execute code are chained in a promise-chain
this.mutex = this.mutex.then(() => {
return new Promise(begin);
});
return new Promise(res => {
begin = res;
});
}
}

View File

@ -32,6 +32,26 @@
<span>{{ 'Check for updates' | translate }}</span> <span>{{ 'Check for updates' | translate }}</span>
</button> </button>
</div> </div>
<div>
<button type="button" mat-button (click)="showDevTools=!showDevTools">
<span>{{ 'Show devtools' | translate }}</span>
</button>
</div>
<mat-card class="os-card" *ngIf="showDevTools">
<div>
<button type="button" mat-button (click)="printDS()">
<span>Print DS</span>
</button>
</div>
<div>
<button type="button" mat-button (click)="getThisComponent()">
<span>Get this component</span>
</button>
</div>
</mat-card>
</mat-card> </mat-card>
<mat-card class="os-card" *osPerms="'users.can_manage'"> <mat-card class="os-card" *osPerms="'users.can_manage'">

View File

@ -4,6 +4,7 @@ import { Title } from '@angular/platform-browser';
import { TranslateService } from '@ngx-translate/core'; import { TranslateService } from '@ngx-translate/core';
import { DataStoreService } from 'app/core/core-services/data-store.service';
import { OpenSlidesService } from 'app/core/core-services/openslides.service'; import { OpenSlidesService } from 'app/core/core-services/openslides.service';
import { OperatorService, Permission } from 'app/core/core-services/operator.service'; import { OperatorService, Permission } from 'app/core/core-services/operator.service';
import { ConfigRepositoryService } from 'app/core/repositories/config/config-repository.service'; import { ConfigRepositoryService } from 'app/core/repositories/config/config-repository.service';
@ -25,6 +26,8 @@ export class LegalNoticeComponent extends BaseViewComponent implements OnInit {
*/ */
public legalNotice = ''; public legalNotice = '';
public showDevTools = false;
/** /**
* Constructor. * Constructor.
*/ */
@ -35,7 +38,8 @@ export class LegalNoticeComponent extends BaseViewComponent implements OnInit {
private openSlidesService: OpenSlidesService, private openSlidesService: OpenSlidesService,
private update: UpdateService, private update: UpdateService,
private configRepo: ConfigRepositoryService, private configRepo: ConfigRepositoryService,
private operator: OperatorService private operator: OperatorService,
private DS: DataStoreService
) { ) {
super(title, translate, matSnackbar); super(title, translate, matSnackbar);
} }
@ -67,4 +71,12 @@ export class LegalNoticeComponent extends BaseViewComponent implements OnInit {
public canManage(): boolean { public canManage(): boolean {
return this.operator.hasPerms(Permission.coreCanManageConfig); return this.operator.hasPerms(Permission.coreCanManageConfig);
} }
public printDS(): void {
this.DS.print();
}
public getThisComponent(): void {
console.log(this);
}
} }

View File

@ -554,7 +554,7 @@ export class MotionDetailComponent extends BaseViewComponent implements OnInit,
this.motionFilterService.initFilters(this.motionObserver); this.motionFilterService.initFilters(this.motionObserver);
this.motionSortService.initSorting(this.motionFilterService.outputObservable); this.motionSortService.initSorting(this.motionFilterService.outputObservable);
this.sortedMotionsObservable = this.motionSortService.outputObservable; this.sortedMotionsObservable = this.motionSortService.outputObservable;
} else if (this.motion.parent_id) { } else if (this.motion && this.motion.parent_id) {
// only use the amendments for this motion // only use the amendments for this motion
this.amendmentFilterService.initFilters(this.repo.amendmentsTo(this.motion.parent_id)); this.amendmentFilterService.initFilters(this.repo.amendmentsTo(this.motion.parent_id));
this.amendmentSortService.initSorting(this.amendmentFilterService.outputObservable); this.amendmentSortService.initSorting(this.amendmentFilterService.outputObservable);

View File

@ -1,6 +1,6 @@
<div *ngIf="poll" class="poll-progress-wrapper"> <div *ngIf="poll" class="poll-progress-wrapper">
<div class="vote-number"> <div class="vote-number">
<span>{{ poll.votescast }} / {{ max }}</span> <span>{{ votescast }} / {{ max }}</span>
</div> </div>
<span>{{ 'Received votes' | translate }}</span> <span>{{ 'Received votes' | translate }}</span>

View File

@ -1,58 +1,98 @@
import { Component, Input, OnInit } from '@angular/core'; import { Component, Input } from '@angular/core';
import { MatSnackBar } from '@angular/material/snack-bar'; import { MatSnackBar } from '@angular/material/snack-bar';
import { Title } from '@angular/platform-browser'; import { Title } from '@angular/platform-browser';
import { TranslateService } from '@ngx-translate/core'; import { TranslateService } from '@ngx-translate/core';
import { map } from 'rxjs/operators'; import { Subscription } from 'rxjs';
import { MotionPollRepositoryService } from 'app/core/repositories/motions/motion-poll-repository.service';
import { UserRepositoryService } from 'app/core/repositories/users/user-repository.service'; import { UserRepositoryService } from 'app/core/repositories/users/user-repository.service';
import { BaseViewComponent } from 'app/site/base/base-view'; import { BaseViewComponent } from 'app/site/base/base-view';
import { ViewBasePoll } from 'app/site/polls/models/view-base-poll'; import { ViewBasePoll } from 'app/site/polls/models/view-base-poll';
import { ViewUser } from 'app/site/users/models/view-user';
@Component({ @Component({
selector: 'os-poll-progress', selector: 'os-poll-progress',
templateUrl: './poll-progress.component.html', templateUrl: './poll-progress.component.html',
styleUrls: ['./poll-progress.component.scss'] styleUrls: ['./poll-progress.component.scss']
}) })
export class PollProgressComponent extends BaseViewComponent implements OnInit { export class PollProgressComponent extends BaseViewComponent {
@Input() private pollId: number = null;
public poll: ViewBasePoll; private pollSubscription: Subscription = null;
@Input()
public set poll(value: ViewBasePoll) {
if (value.id !== this.pollId) {
this.pollId = value.id;
if (this.pollSubscription !== null) {
this.pollSubscription.unsubscribe();
this.pollSubscription = null;
}
this.pollSubscription = this.pollRepo.getViewModelObservable(this.pollId).subscribe(poll => {
if (poll) {
this._poll = poll;
// We may cannot use this.poll.votescast during the voting, since it can
// be reported with false values from the server
// -> calculate the votes on our own.
const ids = new Set();
for (const option of this.poll.options) {
for (const vote of option.votes) {
if (vote.user_id) {
ids.add(vote.user_id);
}
}
}
this.votescast = ids.size;
// But sometimes there are not enough votes (poll.votescast is higher).
// If this happens, take the value from the poll
if (this.poll.votescast > this.votescast) {
this.votescast = this.poll.votescast;
}
this.calculateMaxUsers();
}
});
}
}
public get poll(): ViewBasePoll {
return this._poll;
}
private _poll: ViewBasePoll;
public votescast: number;
public max: number; public max: number;
public valueInPercent: number;
public constructor( public constructor(
title: Title, title: Title,
protected translate: TranslateService, protected translate: TranslateService,
snackbar: MatSnackBar, snackbar: MatSnackBar,
private userRepo: UserRepositoryService private userRepo: UserRepositoryService,
private pollRepo: MotionPollRepositoryService
) { ) {
super(title, translate, snackbar); super(title, translate, snackbar);
this.userRepo.getViewModelListObservable().subscribe(users => {
if (users) {
this.calculateMaxUsers(users);
}
});
} }
public get valueInPercent(): number { private calculateMaxUsers(allUsers?: ViewUser[]): void {
if (this.poll) { if (!this.poll) {
return (this.poll.votesvalid / this.max) * 100; return;
} else { }
return 0; if (!allUsers) {
allUsers = this.userRepo.getViewModelList();
} }
}
/** allUsers = allUsers.filter(user => user.is_present && this.poll.groups_id.intersect(user.groups_id).length);
* OnInit.
* Sets the observable for groups. this.max = allUsers.length;
*/ this.valueInPercent = this.poll ? (this.votescast / this.max) * 100 : 0;
public ngOnInit(): void {
if (this.poll) {
this.userRepo
.getViewModelListObservable()
.pipe(
map(users =>
users.filter(user => user.is_present && this.poll.groups_id.intersect(user.groups_id).length)
)
)
.subscribe(users => {
this.max = users.length;
});
}
} }
} }

View File

@ -3,9 +3,10 @@ from typing import Any, Dict, List, Union
from ..users.projector import get_user_name from ..users.projector import get_user_name
from ..utils.projector import ( from ..utils.projector import (
AllData, ProjectorAllDataProvider,
ProjectorElementException, ProjectorElementException,
get_config, get_config,
get_model,
register_projector_slide, register_projector_slide,
) )
@ -15,20 +16,24 @@ from ..utils.projector import (
# side effects. # side effects.
async def get_sorted_agenda_items(all_data: AllData) -> List[Dict[str, Any]]: async def get_sorted_agenda_items(
agenda_items: Dict[int, Dict[str, Any]]
) -> List[Dict[str, Any]]:
""" """
Returns all sorted agenda items by id first and then weight, resulting in Returns all sorted agenda items by id first and then weight, resulting in
ordered items, if some have the same weight. ordered items, if some have the same weight.
""" """
return sorted( return sorted(
sorted(all_data["agenda/item"].values(), key=lambda item: item["id"]), sorted(agenda_items.values(), key=lambda item: item["id"]),
key=lambda item: item["weight"], key=lambda item: item["weight"],
) )
async def get_flat_tree(all_data: AllData, parent_id: int = 0) -> List[Dict[str, Any]]: async def get_flat_tree(
agenda_items: Dict[int, Dict[str, Any]], parent_id: int = 0
) -> List[Dict[str, Any]]:
""" """
Build the item tree from all_data. Build the item tree from all_data_provider.
Only build the tree from elements unterneath parent_id. Only build the tree from elements unterneath parent_id.
@ -38,16 +43,16 @@ async def get_flat_tree(all_data: AllData, parent_id: int = 0) -> List[Dict[str,
# Build a dict from an item_id to all its children # Build a dict from an item_id to all its children
children: Dict[int, List[int]] = defaultdict(list) children: Dict[int, List[int]] = defaultdict(list)
if "agenda/item" in all_data:
for item in await get_sorted_agenda_items(all_data): for item in await get_sorted_agenda_items(agenda_items):
if item["type"] == 1: # only normal items if item["type"] == 1: # only normal items
children[item["parent_id"] or 0].append(item["id"]) children[item["parent_id"] or 0].append(item["id"])
tree = [] tree = []
async def get_children(item_ids: List[int], depth: int) -> None: def build_tree(item_ids: List[int], depth: int) -> None:
for item_id in item_ids: for item_id in item_ids:
item = all_data["agenda/item"][item_id] item = agenda_items[item_id]
title_information = item["title_information"] title_information = item["title_information"]
title_information["_agenda_item_number"] = item["item_number"] title_information["_agenda_item_number"] = item["item_number"]
tree.append( tree.append(
@ -57,25 +62,29 @@ async def get_flat_tree(all_data: AllData, parent_id: int = 0) -> List[Dict[str,
"depth": depth, "depth": depth,
} }
) )
await get_children(children[item_id], depth + 1) build_tree(children[item_id], depth + 1)
await get_children(children[parent_id], 0) build_tree(children[parent_id], 0)
return tree return tree
async def item_list_slide( async def item_list_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Item list slide. Item list slide.
Returns all root items or all children of an item. Returns all root items or all children of an item.
""" """
only_main_items = element.get("only_main_items", True) # fetch all items, so they are cached:
all_agenda_items = await all_data_provider.get_collection("agenda/item")
only_main_items = element.get("only_main_items", True)
if only_main_items: if only_main_items:
agenda_items = [] agenda_items = []
for item in await get_sorted_agenda_items(all_data): for item in await get_sorted_agenda_items(all_agenda_items):
if item["parent_id"] is None and item["type"] == 1: if item["parent_id"] is None and item["type"] == 1:
title_information = item["title_information"] title_information = item["title_information"]
title_information["_agenda_item_number"] = item["item_number"] title_information["_agenda_item_number"] = item["item_number"]
@ -86,13 +95,15 @@ async def item_list_slide(
} }
) )
else: else:
agenda_items = await get_flat_tree(all_data) agenda_items = await get_flat_tree(all_agenda_items)
return {"items": agenda_items} return {"items": agenda_items}
async def list_of_speakers_slide( async def list_of_speakers_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
List of speakers slide. List of speakers slide.
@ -104,35 +115,35 @@ async def list_of_speakers_slide(
if list_of_speakers_id is None: if list_of_speakers_id is None:
raise ProjectorElementException("id is required for list of speakers slide") raise ProjectorElementException("id is required for list of speakers slide")
return await get_list_of_speakers_slide_data(all_data, list_of_speakers_id) return await get_list_of_speakers_slide_data(all_data_provider, list_of_speakers_id)
async def get_list_of_speakers_slide_data( async def get_list_of_speakers_slide_data(
all_data: AllData, list_of_speakers_id: int all_data_provider: ProjectorAllDataProvider, list_of_speakers_id: int
) -> Dict[str, Any]: ) -> Dict[str, Any]:
try: list_of_speakers = await get_model(
list_of_speakers = all_data["agenda/list-of-speakers"][list_of_speakers_id] all_data_provider, "agenda/list-of-speakers", list_of_speakers_id
except KeyError: )
raise ProjectorElementException(
f"List of speakers {list_of_speakers_id} does not exist"
)
title_information = list_of_speakers["title_information"] title_information = list_of_speakers["title_information"]
# try to get the agenda item for the content object (which must not exist) # try to get the agenda item for the content object (which must not exist)
agenda_item_id = all_data[list_of_speakers["content_object"]["collection"]][ content_object = await get_model(
list_of_speakers["content_object"]["id"] all_data_provider,
].get("agenda_item_id") list_of_speakers["content_object"]["collection"],
if agenda_item_id: list_of_speakers["content_object"]["id"],
title_information["_agenda_item_number"] = all_data["agenda/item"][ )
agenda_item_id agenda_item_id = content_object.get("agenda_item_id")
]["item_number"] if agenda_item_id is not None:
agenda_item = await all_data_provider.get("agenda/item", agenda_item_id)
if agenda_item is not None:
title_information["_agenda_item_number"] = agenda_item["item_number"]
# Partition speaker objects to waiting, current and finished # Partition speaker objects to waiting, current and finished
speakers_waiting = [] speakers_waiting = []
speakers_finished = [] speakers_finished = []
current_speaker = None current_speaker = None
for speaker in list_of_speakers["speakers"]: for speaker in list_of_speakers["speakers"]:
user = await get_user_name(all_data, speaker["user_id"]) user = await get_user_name(all_data_provider, speaker["user_id"])
formatted_speaker = { formatted_speaker = {
"user": user, "user": user,
"marked": speaker["marked"], "marked": speaker["marked"],
@ -151,8 +162,12 @@ async def get_list_of_speakers_slide_data(
speakers_waiting = sorted(speakers_waiting, key=lambda s: s["weight"]) speakers_waiting = sorted(speakers_waiting, key=lambda s: s["weight"])
speakers_finished = sorted(speakers_finished, key=lambda s: s["end_time"]) speakers_finished = sorted(speakers_finished, key=lambda s: s["end_time"])
number_of_last_speakers = await get_config(all_data, "agenda_show_last_speakers") number_of_last_speakers = await get_config(
number_of_next_speakers = await get_config(all_data, "agenda_show_next_speakers") all_data_provider, "agenda_show_last_speakers"
)
number_of_next_speakers = await get_config(
all_data_provider, "agenda_show_next_speakers"
)
if number_of_last_speakers == 0: if number_of_last_speakers == 0:
speakers_finished = [] speakers_finished = []
@ -174,7 +189,7 @@ async def get_list_of_speakers_slide_data(
async def get_current_list_of_speakers_id_for_projector( async def get_current_list_of_speakers_id_for_projector(
all_data: AllData, projector: Dict[str, Any] all_data_provider: ProjectorAllDataProvider, projector: Dict[str, Any]
) -> Union[int, None]: ) -> Union[int, None]:
""" """
Search for elements, that do have a list of speakers: Search for elements, that do have a list of speakers:
@ -189,94 +204,88 @@ async def get_current_list_of_speakers_id_for_projector(
continue continue
collection = element["name"] collection = element["name"]
id = element["id"] id = element["id"]
if collection not in all_data or id not in all_data[collection]: model = await all_data_provider.get(collection, id)
if model is None:
continue continue
model = all_data[collection][id]
if "list_of_speakers_id" not in model: if "list_of_speakers_id" not in model:
continue continue
if not model["list_of_speakers_id"] in all_data["agenda/list-of-speakers"]: list_of_speakers_id = model["list_of_speakers_id"]
los_exists = await all_data_provider.exists(
"agenda/list-of-speakers", list_of_speakers_id
)
if not los_exists:
continue continue
list_of_speakers_id = model["list_of_speakers_id"]
break break
return list_of_speakers_id return list_of_speakers_id
async def get_reference_projector( async def get_reference_projector(
all_data: AllData, projector_id: int all_data_provider: ProjectorAllDataProvider, projector_id: int
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Returns the reference projector to the given projector (by id) Returns the reference projector to the given projector (by id)
""" """
try: this_projector = await get_model(all_data_provider, "core/projector", projector_id)
this_projector = all_data["core/projector"][projector_id]
except KeyError:
raise ProjectorElementException(f"Projector {projector_id} does not exist")
reference_projector_id = this_projector["reference_projector_id"] or projector_id reference_projector_id = this_projector["reference_projector_id"] or projector_id
try: return await get_model(all_data_provider, "core/projector", reference_projector_id)
reference_projector = all_data["core/projector"][reference_projector_id]
except KeyError:
raise ProjectorElementException(
f"Projector {reference_projector_id} does not exist"
)
return reference_projector
async def current_list_of_speakers_slide( async def current_list_of_speakers_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
The current list of speakers slide. Creates the data for the given projector. The current list of speakers slide. Creates the data for the given projector.
""" """
reference_projector = await get_reference_projector(all_data, projector_id) reference_projector = await get_reference_projector(all_data_provider, projector_id)
list_of_speakers_id = await get_current_list_of_speakers_id_for_projector( list_of_speakers_id = await get_current_list_of_speakers_id_for_projector(
all_data, reference_projector all_data_provider, reference_projector
) )
if list_of_speakers_id is None: # no element found if list_of_speakers_id is None: # no element found
return {} return {}
return await get_list_of_speakers_slide_data(all_data, list_of_speakers_id) return await get_list_of_speakers_slide_data(all_data_provider, list_of_speakers_id)
async def current_speaker_chyron_slide( async def current_speaker_chyron_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Returns the username for the current speaker. Returns the username for the current speaker.
""" """
# get projector for color information # get projector for color information
projector = all_data["core/projector"][projector_id] projector = await get_model(all_data_provider, "core/projector", projector_id)
slide_data = { slide_data = {
"background_color": projector["chyron_background_color"], "background_color": projector["chyron_background_color"],
"font_color": projector["chyron_font_color"], "font_color": projector["chyron_font_color"],
} }
reference_projector = await get_reference_projector(all_data, projector_id) reference_projector = await get_reference_projector(all_data_provider, projector_id)
list_of_speakers_id = await get_current_list_of_speakers_id_for_projector( list_of_speakers_id = await get_current_list_of_speakers_id_for_projector(
all_data, reference_projector all_data_provider, reference_projector
) )
if list_of_speakers_id is None: # no element found if list_of_speakers_id is None: # no element found
return slide_data return slide_data
# get list of speakers to search current speaker # get list of speakers to search current speaker
try: list_of_speakers = await get_model(
list_of_speakers = all_data["agenda/list-of-speakers"][list_of_speakers_id] all_data_provider, "agenda/list-of-speakers", list_of_speakers_id
except KeyError: )
raise ProjectorElementException(
f"List of speakers {list_of_speakers_id} does not exist"
)
# find current speaker # find current speaker
current_speaker = None current_speaker = None
for speaker in list_of_speakers["speakers"]: for speaker in list_of_speakers["speakers"]:
if speaker["begin_time"] is not None and speaker["end_time"] is None: if speaker["begin_time"] is not None and speaker["end_time"] is None:
current_speaker = await get_user_name(all_data, speaker["user_id"]) current_speaker = await get_user_name(all_data_provider, speaker["user_id"])
break break
if current_speaker is not None: if current_speaker is not None:

View File

@ -1,25 +1,29 @@
from typing import Any, Dict, List from typing import Any, Dict, List
from ..users.projector import get_user_name from ..users.projector import get_user_name
from ..utils.projector import AllData, get_model, get_models, register_projector_slide from ..utils.projector import (
ProjectorAllDataProvider,
get_model,
get_models,
register_projector_slide,
)
from .models import AssignmentPoll from .models import AssignmentPoll
# Important: All functions have to be prune. This means, that thay can only
# access the data, that they get as argument and do not have any
# side effects.
async def assignment_slide( async def assignment_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Assignment slide. Assignment slide.
""" """
assignment = get_model(all_data, "assignments/assignment", element.get("id")) assignment = await get_model(
all_data_provider, "assignments/assignment", element.get("id")
)
assignment_related_users: List[Dict[str, Any]] = [ assignment_related_users: List[Dict[str, Any]] = [
{"user": await get_user_name(all_data, aru["user_id"])} {"user": await get_user_name(all_data_provider, aru["user_id"])}
for aru in sorted( for aru in sorted(
assignment["assignment_related_users"], key=lambda aru: aru["weight"] assignment["assignment_related_users"], key=lambda aru: aru["weight"]
) )
@ -36,13 +40,19 @@ async def assignment_slide(
async def assignment_poll_slide( async def assignment_poll_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Poll slide. Poll slide.
""" """
poll = get_model(all_data, "assignments/assignment-poll", element.get("id")) poll = await get_model(
assignment = get_model(all_data, "assignments/assignment", poll["assignment_id"]) all_data_provider, "assignments/assignment-poll", element.get("id")
)
assignment = await get_model(
all_data_provider, "assignments/assignment", poll["assignment_id"]
)
poll_data = { poll_data = {
key: poll[key] key: poll[key]
@ -60,10 +70,14 @@ async def assignment_poll_slide(
# Add options: # Add options:
poll_data["options"] = [] poll_data["options"] = []
options = get_models(all_data, "assignments/assignment-option", poll["options_id"]) options = await get_models(
all_data_provider, "assignments/assignment-option", poll["options_id"]
)
for option in sorted(options, key=lambda option: option["weight"]): for option in sorted(options, key=lambda option: option["weight"]):
option_data: Dict[str, Any] = { option_data: Dict[str, Any] = {
"user": {"short_name": await get_user_name(all_data, option["user_id"])} "user": {
"short_name": await get_user_name(all_data_provider, option["user_id"])
}
} }
if poll["state"] == AssignmentPoll.STATE_PUBLISHED: if poll["state"] == AssignmentPoll.STATE_PUBLISHED:
option_data["yes"] = float(option["yes"]) option_data["yes"] = float(option["yes"])

View File

@ -34,16 +34,10 @@ class CoreAppConfig(AppConfig):
ProjectionDefaultViewSet, ProjectionDefaultViewSet,
TagViewSet, TagViewSet,
) )
from .websocket import (
NotifyWebsocketClientMessage,
ConstantsWebsocketClientMessage,
GetElementsWebsocketClientMessage,
AutoupdateWebsocketClientMessage,
ListenToProjectors,
PingPong,
)
from ..utils.rest_api import router from ..utils.rest_api import router
from ..utils.websocket import register_client_message
# Let all client websocket message register
from ..utils import websocket_client_messages # noqa
# Collect all config variables before getting the constants. # Collect all config variables before getting the constants.
config.collect_config_variables_from_apps() config.collect_config_variables_from_apps()
@ -92,14 +86,6 @@ class CoreAppConfig(AppConfig):
self.get_model("Countdown").get_collection_string(), CountdownViewSet self.get_model("Countdown").get_collection_string(), CountdownViewSet
) )
# Register client messages
register_client_message(NotifyWebsocketClientMessage())
register_client_message(ConstantsWebsocketClientMessage())
register_client_message(GetElementsWebsocketClientMessage())
register_client_message(AutoupdateWebsocketClientMessage())
register_client_message(ListenToProjectors())
register_client_message(PingPong())
if "runserver" in sys.argv or "changeconfig" in sys.argv: if "runserver" in sys.argv or "changeconfig" in sys.argv:
from openslides.utils.startup import run_startup_hooks from openslides.utils.startup import run_startup_hooks

View File

@ -1,20 +1,17 @@
from typing import Any, Dict from typing import Any, Dict
from ..utils.projector import ( from ..utils.projector import (
AllData, ProjectorAllDataProvider,
ProjectorElementException,
get_config, get_config,
get_model,
register_projector_slide, register_projector_slide,
) )
# Important: All functions have to be prune. This means, that thay can only
# access the data, that they get as argument and do not have any
# side effects.
async def countdown_slide( async def countdown_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Countdown slide. Countdown slide.
@ -26,23 +23,21 @@ async def countdown_slide(
id: 5, # Countdown ID id: 5, # Countdown ID
} }
""" """
countdown_id = element.get("id") or 1 countdown = await get_model(all_data_provider, "core/countdown", element.get("id"))
try:
countdown = all_data["core/countdown"][countdown_id]
except KeyError:
raise ProjectorElementException(f"Countdown {countdown_id} does not exist")
return { return {
"description": countdown["description"], "description": countdown["description"],
"running": countdown["running"], "running": countdown["running"],
"countdown_time": countdown["countdown_time"], "countdown_time": countdown["countdown_time"],
"warning_time": await get_config(all_data, "agenda_countdown_warning_time"), "warning_time": await get_config(
all_data_provider, "agenda_countdown_warning_time"
),
} }
async def message_slide( async def message_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Message slide. Message slide.
@ -54,16 +49,15 @@ async def message_slide(
id: 5, # ProjectorMessage ID id: 5, # ProjectorMessage ID
} }
""" """
message_id = element.get("id") or 1 return await get_model(
all_data_provider, "core/projector-message", element.get("id")
try: )
return all_data["core/projector-message"][message_id]
except KeyError:
raise ProjectorElementException(f"Message {message_id} does not exist")
async def clock_slide( async def clock_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
return {} return {}

View File

@ -1,35 +1,23 @@
from typing import Any, Dict from typing import Any, Dict
from ..utils.projector import ( from ..utils.projector import (
AllData, ProjectorAllDataProvider,
ProjectorElementException, get_model,
register_projector_slide, register_projector_slide,
) )
# Important: All functions have to be prune. This means, that thay can only
# access the data, that they get as argument and do not have any
# side effects.
async def mediafile_slide( async def mediafile_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Slide for Mediafile. Slide for Mediafile.
""" """
mediafile_id = element.get("id") mediafile = await get_model(
all_data_provider, "mediafiles/mediafile", element.get("id")
if mediafile_id is None: )
raise ProjectorElementException("id is required for mediafile slide")
try:
mediafile = all_data["mediafiles/mediafile"][mediafile_id]
except KeyError:
raise ProjectorElementException(
f"mediafile with id {mediafile_id} does not exist"
)
return { return {
"path": mediafile["path"], "path": mediafile["path"],
"mimetype": mediafile["mimetype"], "mimetype": mediafile["mimetype"],

View File

@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional
from ..users.projector import get_user_name from ..users.projector import get_user_name
from ..utils.projector import ( from ..utils.projector import (
AllData, ProjectorAllDataProvider,
ProjectorElementException, ProjectorElementException,
get_config, get_config,
get_model, get_model,
@ -14,33 +14,31 @@ from .models import MotionPoll
motion_placeholder_regex = re.compile(r"\[motion:(\d+)\]") motion_placeholder_regex = re.compile(r"\[motion:(\d+)\]")
# Important: All functions have to be prune. This means, that thay can only
# access the data, that they get as argument and do not have any
# side effects.
async def get_state( async def get_state(
all_data: AllData, motion: Dict[str, Any], state_id_key: str all_data_provider: ProjectorAllDataProvider,
motion: Dict[str, Any],
state_id_key: str,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Returns a state element from one motion. Raises an error if the state does not exist. Returns a state element from one motion. Raises an error if the state does not exist.
""" """
state = all_data["motions/state"].get(motion[state_id_key]) state = await all_data_provider.get("motions/state", motion[state_id_key])
if not state: if state is None:
raise ProjectorElementException( raise ProjectorElementException(
f"motion {motion['id']} can not be on the state with id {motion[state_id_key]}" f"motion {motion['id']} can not be on the state with id {motion[state_id_key]}"
) )
return state return state
async def get_amendment_merge_into_motion_diff(all_data, amendment): async def get_amendment_merge_into_motion_diff(all_data_provider, amendment):
""" """
HINT: This implementation should be consistent to showInDiffView() in ViewMotionAmendedParagraph.ts HINT: This implementation should be consistent to showInDiffView() in ViewMotionAmendedParagraph.ts
""" """
if amendment["state_id"] is None: if amendment["state_id"] is None:
return 0 return 0
state = await get_state(all_data, amendment, "state_id") state = await get_state(all_data_provider, amendment, "state_id")
if state["merge_amendment_into_final"] == -1: if state["merge_amendment_into_final"] == -1:
return 0 return 0
if state["merge_amendment_into_final"] == 1: if state["merge_amendment_into_final"] == 1:
@ -48,36 +46,37 @@ async def get_amendment_merge_into_motion_diff(all_data, amendment):
if amendment["recommendation_id"] is None: if amendment["recommendation_id"] is None:
return 0 return 0
recommendation = await get_state(all_data, amendment, "recommendation_id") recommendation = await get_state(all_data_provider, amendment, "recommendation_id")
if recommendation["merge_amendment_into_final"] == 1: if recommendation["merge_amendment_into_final"] == 1:
return 1 return 1
return 0 return 0
async def get_amendment_merge_into_motion_final(all_data, amendment): async def get_amendment_merge_into_motion_final(all_data_provider, amendment):
""" """
HINT: This implementation should be consistent to showInFinalView() in ViewMotionAmendedParagraph.ts HINT: This implementation should be consistent to showInFinalView() in ViewMotionAmendedParagraph.ts
""" """
if amendment["state_id"] is None: if amendment["state_id"] is None:
return 0 return 0
state = await get_state(all_data, amendment, "state_id") state = await get_state(all_data_provider, amendment, "state_id")
if state["merge_amendment_into_final"] == 1: if state["merge_amendment_into_final"] == 1:
return 1 return 1
return 0 return 0
async def get_amendments_for_motion(motion, all_data): async def get_amendments_for_motion(motion, all_data_provider):
amendment_data = [] amendment_data = []
for amendment_id, amendment in all_data["motions/motion"].items(): all_motions = await all_data_provider.get_collection("motions/motion")
for amendment_id, amendment in all_motions.items():
if amendment["parent_id"] == motion["id"]: if amendment["parent_id"] == motion["id"]:
merge_amendment_into_final = await get_amendment_merge_into_motion_final( merge_amendment_into_final = await get_amendment_merge_into_motion_final(
all_data, amendment all_data_provider, amendment
) )
merge_amendment_into_diff = await get_amendment_merge_into_motion_diff( merge_amendment_into_diff = await get_amendment_merge_into_motion_diff(
all_data, amendment all_data_provider, amendment
) )
amendment_data.append( amendment_data.append(
{ {
@ -92,8 +91,10 @@ async def get_amendments_for_motion(motion, all_data):
return amendment_data return amendment_data
async def get_amendment_base_motion(amendment, all_data): async def get_amendment_base_motion(amendment, all_data_provider):
motion = get_model(all_data, "motions/motion", amendment.get("parent_id")) motion = await get_model(
all_data_provider, "motions/motion", amendment.get("parent_id")
)
return { return {
"identifier": motion["identifier"], "identifier": motion["identifier"],
@ -102,15 +103,17 @@ async def get_amendment_base_motion(amendment, all_data):
} }
async def get_amendment_base_statute(amendment, all_data): async def get_amendment_base_statute(amendment, all_data_provider):
statute = get_model( statute = await get_model(
all_data, "motions/statute-paragraph", amendment.get("statute_paragraph_id") all_data_provider,
"motions/statute-paragraph",
amendment.get("statute_paragraph_id"),
) )
return {"title": statute["title"], "text": statute["text"]} return {"title": statute["title"], "text": statute["text"]}
async def extend_reference_motion_dict( async def extend_reference_motion_dict(
all_data: AllData, all_data_provider: ProjectorAllDataProvider,
recommendation: Optional[str], recommendation: Optional[str],
referenced_motions: Dict[int, Dict[str, str]], referenced_motions: Dict[int, Dict[str, str]],
) -> None: ) -> None:
@ -127,15 +130,18 @@ async def extend_reference_motion_dict(
] ]
for id in referenced_ids: for id in referenced_ids:
# Put every referenced motion into the referenced_motions dict # Put every referenced motion into the referenced_motions dict
if id not in referenced_motions and id in all_data["motions/motion"]: referenced_motion = await all_data_provider.get("motions/motion", id)
if id not in referenced_motions and referenced_motion is not None:
referenced_motions[id] = { referenced_motions[id] = {
"title": all_data["motions/motion"][id]["title"], "title": referenced_motion["title"],
"identifier": all_data["motions/motion"][id]["identifier"], "identifier": referenced_motion["identifier"],
} }
async def motion_slide( async def motion_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Motion slide. Motion slide.
@ -158,13 +164,16 @@ async def motion_slide(
""" """
# Get motion # Get motion
mode = element.get( mode = element.get(
"mode", await get_config(all_data, "motions_recommendation_text_mode") "mode", await get_config(all_data_provider, "motions_recommendation_text_mode")
) )
motion = get_model(all_data, "motions/motion", element.get("id"))
# populate cache:
motion = await get_model(all_data_provider, "motions/motion", element.get("id"))
# Add submitters # Add submitters
submitters = [ submitters = [
await get_user_name(all_data, submitter["user_id"]) await get_user_name(all_data_provider, submitter["user_id"])
for submitter in sorted( for submitter in sorted(
motion["submitters"], key=lambda submitter: submitter["weight"] motion["submitters"], key=lambda submitter: submitter["weight"]
) )
@ -172,14 +181,16 @@ async def motion_slide(
# Get some needed config values # Get some needed config values
show_meta_box = not await get_config( show_meta_box = not await get_config(
all_data, "motions_disable_sidebox_on_projector" all_data_provider, "motions_disable_sidebox_on_projector"
) )
show_referring_motions = not await get_config( show_referring_motions = not await get_config(
all_data, "motions_hide_referring_motions" all_data_provider, "motions_hide_referring_motions"
) )
line_length = await get_config(all_data, "motions_line_length") line_length = await get_config(all_data_provider, "motions_line_length")
line_numbering_mode = await get_config(all_data, "motions_default_line_numbering") line_numbering_mode = await get_config(
motions_preamble = await get_config(all_data, "motions_preamble") all_data_provider, "motions_default_line_numbering"
)
motions_preamble = await get_config(all_data_provider, "motions_preamble")
# Query all change-recommendation and amendment related things. # Query all change-recommendation and amendment related things.
change_recommendations = [] # type: ignore change_recommendations = [] # type: ignore
@ -187,17 +198,19 @@ async def motion_slide(
base_motion = None base_motion = None
base_statute = None base_statute = None
if motion["statute_paragraph_id"]: if motion["statute_paragraph_id"]:
base_statute = await get_amendment_base_statute(motion, all_data) base_statute = await get_amendment_base_statute(motion, all_data_provider)
elif motion["parent_id"] is not None and motion["amendment_paragraphs"]: elif motion["parent_id"] is not None and motion["amendment_paragraphs"]:
base_motion = await get_amendment_base_motion(motion, all_data) base_motion = await get_amendment_base_motion(motion, all_data_provider)
else: else:
for change_recommendation_id in motion["change_recommendations_id"]: for change_recommendation_id in motion["change_recommendations_id"]:
cr = all_data["motions/motion-change-recommendation"].get( cr = await get_model(
change_recommendation_id all_data_provider,
"motions/motion-change-recommendation",
change_recommendation_id,
) )
if cr is not None and not cr["internal"]: if cr is not None and not cr["internal"]:
change_recommendations.append(cr) change_recommendations.append(cr)
amendments = await get_amendments_for_motion(motion, all_data) amendments = await get_amendments_for_motion(motion, all_data_provider)
# The base return value. More fields will get added below. # The base return value. More fields will get added below.
return_value = { return_value = {
@ -217,10 +230,10 @@ async def motion_slide(
"line_numbering_mode": line_numbering_mode, "line_numbering_mode": line_numbering_mode,
} }
if not await get_config(all_data, "motions_disable_text_on_projector"): if not await get_config(all_data_provider, "motions_disable_text_on_projector"):
return_value["text"] = motion["text"] return_value["text"] = motion["text"]
if not await get_config(all_data, "motions_disable_reason_on_projector"): if not await get_config(all_data_provider, "motions_disable_reason_on_projector"):
return_value["reason"] = motion["reason"] return_value["reason"] = motion["reason"]
if mode == "final": if mode == "final":
@ -228,40 +241,46 @@ async def motion_slide(
# Add recommendation, if enabled in config (and the motion has one) # Add recommendation, if enabled in config (and the motion has one)
if ( if (
not await get_config(all_data, "motions_disable_recommendation_on_projector") not await get_config(
all_data_provider, "motions_disable_recommendation_on_projector"
)
and motion["recommendation_id"] and motion["recommendation_id"]
): ):
recommendation_state = await get_state(all_data, motion, "recommendation_id") recommendation_state = await get_state(
all_data_provider, motion, "recommendation_id"
)
return_value["recommendation"] = recommendation_state["recommendation_label"] return_value["recommendation"] = recommendation_state["recommendation_label"]
if recommendation_state["show_recommendation_extension_field"]: if recommendation_state["show_recommendation_extension_field"]:
recommendation_extension = motion["recommendation_extension"] recommendation_extension = motion["recommendation_extension"]
# All title information for referenced motions in the recommendation # All title information for referenced motions in the recommendation
referenced_motions: Dict[int, Dict[str, str]] = {} referenced_motions: Dict[int, Dict[str, str]] = {}
await extend_reference_motion_dict( await extend_reference_motion_dict(
all_data, recommendation_extension, referenced_motions all_data_provider, recommendation_extension, referenced_motions
) )
return_value["recommendation_extension"] = recommendation_extension return_value["recommendation_extension"] = recommendation_extension
return_value["referenced_motions"] = referenced_motions return_value["referenced_motions"] = referenced_motions
if motion["statute_paragraph_id"]: if motion["statute_paragraph_id"]:
return_value["recommender"] = await get_config( return_value["recommender"] = await get_config(
all_data, "motions_statute_recommendations_by" all_data_provider, "motions_statute_recommendations_by"
) )
else: else:
return_value["recommender"] = await get_config( return_value["recommender"] = await get_config(
all_data, "motions_recommendations_by" all_data_provider, "motions_recommendations_by"
) )
if show_referring_motions: if show_referring_motions:
# Add recommendation-referencing motions # Add recommendation-referencing motions
return_value[ return_value[
"recommendation_referencing_motions" "recommendation_referencing_motions"
] = await get_recommendation_referencing_motions(all_data, motion["id"]) ] = await get_recommendation_referencing_motions(
all_data_provider, motion["id"]
)
return return_value return return_value
async def get_recommendation_referencing_motions( async def get_recommendation_referencing_motions(
all_data: AllData, motion_id: int all_data_provider: ProjectorAllDataProvider, motion_id: int
) -> Optional[List[Dict[str, Any]]]: ) -> Optional[List[Dict[str, Any]]]:
""" """
Returns all title information for motions, that are referencing Returns all title information for motions, that are referencing
@ -269,14 +288,15 @@ async def get_recommendation_referencing_motions(
motions, None is returned (instead of []). motions, None is returned (instead of []).
""" """
recommendation_referencing_motions = [] recommendation_referencing_motions = []
for motion in all_data["motions/motion"].values(): all_motions = await all_data_provider.get_collection("motions/motion")
for motion in all_motions.values():
# Motion must have a recommendation and a recommendaiton extension # Motion must have a recommendation and a recommendaiton extension
if not motion["recommendation_id"] or not motion["recommendation_extension"]: if not motion["recommendation_id"] or not motion["recommendation_extension"]:
continue continue
# The recommendation must allow the extension field (there might be left-overs # The recommendation must allow the extension field (there might be left-overs
# in a motions recommendation extension..) # in a motions recommendation extension..)
recommendation = await get_state(all_data, motion, "recommendation_id") recommendation = await get_state(all_data_provider, motion, "recommendation_id")
if not recommendation["show_recommendation_extension_field"]: if not recommendation["show_recommendation_extension_field"]:
continue continue
@ -297,12 +317,16 @@ async def get_recommendation_referencing_motions(
async def motion_block_slide( async def motion_block_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Motion block slide. Motion block slide.
""" """
motion_block = get_model(all_data, "motions/motion-block", element.get("id")) motion_block = await get_model(
all_data_provider, "motions/motion-block", element.get("id")
)
# All motions in this motion block # All motions in this motion block
motions = [] motions = []
@ -311,7 +335,8 @@ async def motion_block_slide(
referenced_motions: Dict[int, Dict[str, str]] = {} referenced_motions: Dict[int, Dict[str, str]] = {}
# Search motions. # Search motions.
for motion in all_data["motions/motion"].values(): all_motions = await all_data_provider.get_collection("motions/motion")
for motion in all_motions.values():
if motion["motion_block_id"] == motion_block["id"]: if motion["motion_block_id"] == motion_block["id"]:
motion_object = { motion_object = {
"title": motion["title"], "title": motion["title"],
@ -320,7 +345,9 @@ async def motion_block_slide(
recommendation_id = motion["recommendation_id"] recommendation_id = motion["recommendation_id"]
if recommendation_id is not None: if recommendation_id is not None:
recommendation = await get_state(all_data, motion, "recommendation_id") recommendation = await get_state(
all_data_provider, motion, "recommendation_id"
)
motion_object["recommendation"] = { motion_object["recommendation"] = {
"name": recommendation["recommendation_label"], "name": recommendation["recommendation_label"],
"css_class": recommendation["css_class"], "css_class": recommendation["css_class"],
@ -328,7 +355,7 @@ async def motion_block_slide(
if recommendation["show_recommendation_extension_field"]: if recommendation["show_recommendation_extension_field"]:
recommendation_extension = motion["recommendation_extension"] recommendation_extension = motion["recommendation_extension"]
await extend_reference_motion_dict( await extend_reference_motion_dict(
all_data, recommendation_extension, referenced_motions all_data_provider, recommendation_extension, referenced_motions
) )
motion_object["recommendation_extension"] = recommendation_extension motion_object["recommendation_extension"] = recommendation_extension
@ -342,13 +369,15 @@ async def motion_block_slide(
async def motion_poll_slide( async def motion_poll_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Poll slide. Poll slide.
""" """
poll = get_model(all_data, "motions/motion-poll", element.get("id")) poll = await get_model(all_data_provider, "motions/motion-poll", element.get("id"))
motion = get_model(all_data, "motions/motion", poll["motion_id"]) motion = await get_model(all_data_provider, "motions/motion", poll["motion_id"])
poll_data = { poll_data = {
key: poll[key] key: poll[key]
@ -363,8 +392,8 @@ async def motion_poll_slide(
} }
if poll["state"] == MotionPoll.STATE_PUBLISHED: if poll["state"] == MotionPoll.STATE_PUBLISHED:
option = get_model( option = await get_model(
all_data, "motions/motion-option", poll["options_id"][0] all_data_provider, "motions/motion-option", poll["options_id"][0]
) # there can only be exactly one option ) # there can only be exactly one option
poll_data["options"] = [ poll_data["options"] = [
{ {

View File

@ -156,9 +156,11 @@ class BasePollViewSet(ModelViewSet):
poll.state = BasePoll.STATE_PUBLISHED poll.state = BasePoll.STATE_PUBLISHED
poll.save() poll.save()
inform_changed_data(vote.user for vote in poll.get_votes().all() if vote.user) inform_changed_data(
inform_changed_data(poll.get_votes()) (vote.user for vote in poll.get_votes().all() if vote.user), final_data=True
inform_changed_data(poll.get_options()) )
inform_changed_data(poll.get_votes(), final_data=True)
inform_changed_data(poll.get_options(), final_data=True)
return Response() return Response()
@detail_route(methods=["POST"]) @detail_route(methods=["POST"])

View File

@ -1,19 +1,16 @@
from typing import Any, Dict from typing import Any, Dict
from ..utils.projector import ( from ..utils.projector import (
AllData, ProjectorAllDataProvider,
ProjectorElementException, get_model,
register_projector_slide, register_projector_slide,
) )
# Important: All functions have to be prune. This means, that thay can only
# access the data, that they get as argument and do not have any
# side effects.
async def topic_slide( async def topic_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Topic slide. Topic slide.
@ -22,22 +19,8 @@ async def topic_slide(
* title * title
* text * text
""" """
topic_id = element.get("id") topic = await get_model(all_data_provider, "topics/topic", element.get("id"))
item = await get_model(all_data_provider, "agenda/item", topic["agenda_item_id"])
if topic_id is None:
raise ProjectorElementException("id is required for topic slide")
try:
topic = all_data["topics/topic"][topic_id]
except KeyError:
raise ProjectorElementException(f"topic with id {topic_id} does not exist")
item_id = topic["agenda_item_id"]
try:
item = all_data["agenda/item"][item_id]
except KeyError:
raise ProjectorElementException(f"item with id {item_id} does not exist")
return { return {
"title": topic["title"], "title": topic["title"],
"text": topic["text"], "text": topic["text"],

View File

@ -1,19 +1,16 @@
from typing import Any, Dict, List from typing import Any, Dict, List, Optional
from ..utils.projector import ( from ..utils.projector import (
AllData, ProjectorAllDataProvider,
ProjectorElementException, get_model,
register_projector_slide, register_projector_slide,
) )
# Important: All functions have to be prune. This means, that thay can only
# access the data, that they get as argument and do not have any
# side effects.
async def user_slide( async def user_slide(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
User slide. User slide.
@ -21,22 +18,16 @@ async def user_slide(
The returned dict can contain the following fields: The returned dict can contain the following fields:
* user * user
""" """
user_id = element.get("id") return {"user": await get_user_name(all_data_provider, element.get("id"))}
if user_id is None:
raise ProjectorElementException("id is required for user slide")
return {"user": await get_user_name(all_data, user_id)}
async def get_user_name(all_data: AllData, user_id: int) -> str: async def get_user_name(
all_data_provider: ProjectorAllDataProvider, user_id: Optional[int]
) -> str:
""" """
Returns the short name for an user_id. Returns the short name for an user_id.
""" """
try: user = await get_model(all_data_provider, "users/user", user_id)
user = all_data["users/user"][user_id]
except KeyError:
raise ProjectorElementException(f"user with id {user_id} does not exist")
name_parts: List[str] = [] name_parts: List[str] = []
for name_part in ("title", "first_name", "last_name"): for name_part in ("title", "first_name", "last_name"):

View File

@ -1,3 +1,4 @@
import json
import threading import threading
from collections import defaultdict from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
@ -7,9 +8,22 @@ from channels.layers import get_channel_layer
from django.db.models import Model from django.db.models import Model
from mypy_extensions import TypedDict from mypy_extensions import TypedDict
from .cache import element_cache, get_element_id from .cache import ChangeIdTooLowError, element_cache, get_element_id
from .projector import get_projector_data from .projector import get_projector_data
from .utils import get_model_from_collection_string, is_iterable from .timing import Timing
from .utils import get_model_from_collection_string, is_iterable, split_element_id
AutoupdateFormat = TypedDict(
"AutoupdateFormat",
{
"changed": Dict[str, List[Dict[str, Any]]],
"deleted": Dict[str, List[int]],
"from_change_id": int,
"to_change_id": int,
"all_data": bool,
},
)
class AutoupdateElementBase(TypedDict): class AutoupdateElementBase(TypedDict):
@ -66,13 +80,15 @@ class AutoupdateBundle:
element["id"] element["id"]
] = element ] = element
def done(self) -> None: def done(self) -> Optional[int]:
""" """
Finishes the bundle by resolving all missing data and passing it to Finishes the bundle by resolving all missing data and passing it to
the history and element cache. the history and element cache.
Returns the change id, if there are autoupdate elements. Otherwise none.
""" """
if not self.autoupdate_elements: if not self.autoupdate_elements:
return return None
for collection, elements in self.autoupdate_elements.items(): for collection, elements in self.autoupdate_elements.items():
# Get all ids, that do not have a full_data key # Get all ids, that do not have a full_data key
@ -92,13 +108,14 @@ class AutoupdateBundle:
elements[full_data["id"]]["full_data"] = full_data elements[full_data["id"]]["full_data"] = full_data
# Save histroy here using sync code. # Save histroy here using sync code.
save_history(self.elements) save_history(self.element_iterator)
# Update cache and send autoupdate using async code. # Update cache and send autoupdate using async code.
async_to_sync(self.async_handle_collection_elements)() change_id = async_to_sync(self.dispatch_autoupdate)()
return change_id
@property @property
def elements(self) -> Iterable[AutoupdateElement]: def element_iterator(self) -> Iterable[AutoupdateElement]:
""" Iterator for all elements in this bundle """ """ Iterator for all elements in this bundle """
for elements in self.autoupdate_elements.values(): for elements in self.autoupdate_elements.values():
yield from elements.values() yield from elements.values()
@ -110,7 +127,7 @@ class AutoupdateBundle:
Returns the change_id Returns the change_id
""" """
cache_elements: Dict[str, Optional[Dict[str, Any]]] = {} cache_elements: Dict[str, Optional[Dict[str, Any]]] = {}
for element in self.elements: for element in self.element_iterator:
element_id = get_element_id(element["collection_string"], element["id"]) element_id = get_element_id(element["collection_string"], element["id"])
full_data = element.get("full_data") full_data = element.get("full_data")
if full_data: if full_data:
@ -120,9 +137,11 @@ class AutoupdateBundle:
cache_elements[element_id] = full_data cache_elements[element_id] = full_data
return await element_cache.change_elements(cache_elements) return await element_cache.change_elements(cache_elements)
async def async_handle_collection_elements(self) -> None: async def dispatch_autoupdate(self) -> int:
""" """
Async helper function to update cache and send autoupdate. Async helper function to update cache and send autoupdate.
Return the change_id
""" """
# Update cache # Update cache
change_id = await self.update_cache() change_id = await self.update_cache()
@ -130,21 +149,23 @@ class AutoupdateBundle:
# Send autoupdate # Send autoupdate
channel_layer = get_channel_layer() channel_layer = get_channel_layer()
await channel_layer.group_send( await channel_layer.group_send(
"autoupdate", {"type": "send_data", "change_id": change_id} "autoupdate", {"type": "msg_new_change_id", "change_id": change_id}
) )
projector_data = await get_projector_data()
# Send projector # Send projector
projector_data = await get_projector_data()
channel_layer = get_channel_layer() channel_layer = get_channel_layer()
await channel_layer.group_send( await channel_layer.group_send(
"projector", "projector",
{ {
"type": "projector_changed", "type": "msg_projector_data",
"data": projector_data, "data": projector_data,
"change_id": change_id, "change_id": change_id,
}, },
) )
return change_id
def inform_changed_data( def inform_changed_data(
instances: Union[Iterable[Model], Model], instances: Union[Iterable[Model], Model],
@ -152,6 +173,7 @@ def inform_changed_data(
user_id: Optional[int] = None, user_id: Optional[int] = None,
disable_history: bool = False, disable_history: bool = False,
no_delete_on_restriction: bool = False, no_delete_on_restriction: bool = False,
final_data: bool = False,
) -> None: ) -> None:
""" """
Informs the autoupdate system and the caching system about the creation or Informs the autoupdate system and the caching system about the creation or
@ -167,8 +189,10 @@ def inform_changed_data(
instances = (instances,) instances = (instances,)
root_instances = set(instance.get_root_rest_element() for instance in instances) root_instances = set(instance.get_root_rest_element() for instance in instances)
elements = [
AutoupdateElement( elements = []
for root_instance in root_instances:
element = AutoupdateElement(
id=root_instance.get_rest_pk(), id=root_instance.get_rest_pk(),
collection_string=root_instance.get_collection_string(), collection_string=root_instance.get_collection_string(),
disable_history=disable_history, disable_history=disable_history,
@ -176,8 +200,9 @@ def inform_changed_data(
user_id=user_id, user_id=user_id,
no_delete_on_restriction=no_delete_on_restriction, no_delete_on_restriction=no_delete_on_restriction,
) )
for root_instance in root_instances if final_data:
] element["full_data"] = root_instance.get_full_data()
elements.append(element)
inform_elements(elements) inform_elements(elements)
@ -246,14 +271,68 @@ class AutoupdateBundleMiddleware:
thread_id = threading.get_ident() thread_id = threading.get_ident()
autoupdate_bundle[thread_id] = AutoupdateBundle() autoupdate_bundle[thread_id] = AutoupdateBundle()
timing = Timing("request")
response = self.get_response(request) response = self.get_response(request)
timing()
# rewrite the response by adding the autoupdate on any success-case (2xx status)
bundle: AutoupdateBundle = autoupdate_bundle.pop(thread_id) bundle: AutoupdateBundle = autoupdate_bundle.pop(thread_id)
bundle.done() if response.status_code >= 200 and response.status_code < 300:
change_id = bundle.done()
if change_id is not None:
user_id = request.user.pk or 0
# Inject the autoupdate in the response.
# The complete response body will be overwritten!
autoupdate = async_to_sync(get_autoupdate_data)(
change_id, change_id, user_id
)
content = {"autoupdate": autoupdate, "data": response.data}
# Note: autoupdate may be none on skipped ones (which should not happen
# since the user has made the request....)
response.content = json.dumps(content)
timing(True)
return response return response
def save_history(elements: Iterable[AutoupdateElement]) -> Iterable: async def get_autoupdate_data(
from_change_id: int, to_change_id: int, user_id: int
) -> Optional[AutoupdateFormat]:
try:
changed_elements, deleted_element_ids = await element_cache.get_data_since(
user_id, from_change_id, to_change_id
)
except ChangeIdTooLowError:
# The change_id is lower the the lowerst change_id in redis. Return all data
changed_elements = await element_cache.get_all_data_list(user_id)
all_data = True
deleted_elements: Dict[str, List[int]] = {}
else:
all_data = False
deleted_elements = defaultdict(list)
for element_id in deleted_element_ids:
collection_string, id = split_element_id(element_id)
deleted_elements[collection_string].append(id)
# Check, if the autoupdate has any data.
if not changed_elements and not deleted_element_ids:
# Skip empty updates
return None
else:
# Normal autoupdate with data
return AutoupdateFormat(
changed=changed_elements,
deleted=deleted_elements,
from_change_id=from_change_id,
to_change_id=to_change_id,
all_data=all_data,
)
def save_history(element_iterator: Iterable[AutoupdateElement]) -> Iterable:
""" """
Thin wrapper around the call of history saving manager method. Thin wrapper around the call of history saving manager method.
@ -261,4 +340,4 @@ def save_history(elements: Iterable[AutoupdateElement]) -> Iterable:
""" """
from ..core.models import History from ..core.models import History
return History.objects.add_elements(elements) return History.objects.add_elements(element_iterator)

View File

@ -254,25 +254,6 @@ class ElementCache:
all_data[collection] = await restricter(user_id, all_data[collection]) all_data[collection] = await restricter(user_id, all_data[collection])
return dict(all_data) return dict(all_data)
async def get_all_data_dict(self) -> Dict[str, Dict[int, Dict[str, Any]]]:
"""
Returns all data with a dict (id <-> element) per collection:
{
<collection>: {
<id>: <element>
}
}
"""
all_data: Dict[str, Dict[int, Dict[str, Any]]] = defaultdict(dict)
for element_id, data in (await self.cache_provider.get_all_data()).items():
collection, id = split_element_id(element_id)
element = json.loads(data.decode())
element.pop(
"_no_delete_on_restriction", False
) # remove special field for get_data_since
all_data[collection][id] = element
return dict(all_data)
async def get_collection_data(self, collection: str) -> Dict[int, Dict[str, Any]]: async def get_collection_data(self, collection: str) -> Dict[int, Dict[str, Any]]:
""" """
Returns the data for one collection as dict: {id: <element>} Returns the data for one collection as dict: {id: <element>}

View File

@ -8,7 +8,11 @@ from django.core.exceptions import ImproperlyConfigured
from typing_extensions import Protocol from typing_extensions import Protocol
from . import logging from . import logging
from .redis import read_only_redis_amount_replicas, use_redis from .redis import (
read_only_redis_amount_replicas,
read_only_redis_wait_timeout,
use_redis,
)
from .schema_version import SchemaVersion from .schema_version import SchemaVersion
from .utils import split_element_id, str_dict_to_bytes from .utils import split_element_id, str_dict_to_bytes
@ -297,7 +301,7 @@ class RedisCacheProvider:
async def add_to_full_data(self, data: Dict[str, str]) -> None: async def add_to_full_data(self, data: Dict[str, str]) -> None:
async with get_connection() as redis: async with get_connection() as redis:
redis.hmset_dict(self.full_data_cache_key, data) await redis.hmset_dict(self.full_data_cache_key, data)
async def data_exists(self) -> bool: async def data_exists(self) -> bool:
""" """
@ -492,11 +496,12 @@ class RedisCacheProvider:
raise e raise e
if not read_only and read_only_redis_amount_replicas is not None: if not read_only and read_only_redis_amount_replicas is not None:
reported_amount = await redis.wait( reported_amount = await redis.wait(
read_only_redis_amount_replicas, 1000 read_only_redis_amount_replicas, read_only_redis_wait_timeout
) )
if reported_amount != read_only_redis_amount_replicas: if reported_amount != read_only_redis_amount_replicas:
logger.warn( logger.warn(
f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} requested!" f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} "
+ f"requested after {read_only_redis_wait_timeout} ms!"
) )
return result return result

View File

@ -0,0 +1,108 @@
import asyncio
from asyncio import Task
from typing import Optional, cast
from django.conf import settings
from .autoupdate import get_autoupdate_data
from .cache import element_cache
from .websocket import ChangeIdTooHighException, ProtocollAsyncJsonWebsocketConsumer
AUTOUPDATE_DELAY = getattr(settings, "AUTOUPDATE_DELAY", None)
class ConsumerAutoupdateStrategy:
def __init__(self, consumer: ProtocollAsyncJsonWebsocketConsumer) -> None:
self.consumer = consumer
# client_change_id = None: unknown -> set on first autoupdate or request_change_id
# client_change_id is int: the change_id, the client knows about, so the next
# update must be from client_change_id+1 .. <next clange_id>
self.client_change_id: Optional[int] = None
self.max_seen_change_id = 0
self.next_send_time = None
self.timer_task_handle: Optional[Task[None]] = None
self.lock = asyncio.Lock()
async def request_change_id(
self, change_id: int, in_response: Optional[str] = None
) -> None:
"""
The change id is not inclusive, so the client is on change_id and wants
data from change_id+1 .. now
"""
# This resets the server side tracking of the client's change id.
async with self.lock:
await self.stop_timer()
self.max_seen_change_id = await element_cache.get_current_change_id()
print(self.max_seen_change_id)
self.client_change_id = change_id
if self.client_change_id == self.max_seen_change_id:
# The client is up-to-date, so nothing will be done
return None
if self.client_change_id > self.max_seen_change_id:
message = (
f"Requested change_id {self.client_change_id} is higher than the "
+ f"highest change_id {self.max_seen_change_id}."
)
raise ChangeIdTooHighException(message, in_response=in_response)
await self.send_autoupdate(in_response=in_response)
async def new_change_id(self, change_id: int) -> None:
async with self.lock:
if self.client_change_id is None:
# The -1 is to send this autoupdate as the first one to he client.
# Remember: the client_change_id is the change_id the client knows about
self.client_change_id = change_id - 1
if change_id > self.max_seen_change_id:
self.max_seen_change_id = change_id
if AUTOUPDATE_DELAY is None: # feature deactivated, send directly
await self.send_autoupdate()
elif self.timer_task_handle is None:
await self.start_timer()
async def get_running_loop(self) -> asyncio.AbstractEventLoop:
if hasattr(asyncio, "get_running_loop"):
return asyncio.get_running_loop() # type: ignore
else:
return asyncio.get_event_loop()
async def start_timer(self) -> None:
loop = await self.get_running_loop()
self.timer_task_handle = loop.create_task(self.timer_task())
async def stop_timer(self) -> None:
if self.timer_task_handle is not None:
self.timer_task_handle.cancel()
self.timer_task_handle = None
async def timer_task(self) -> None:
try:
await asyncio.sleep(AUTOUPDATE_DELAY)
except asyncio.CancelledError:
return
async with self.lock:
await self.send_autoupdate()
self.timer_task_handle = None
async def send_autoupdate(self, in_response: Optional[str] = None) -> None:
# it is important to save this variable, because it can change during runtime.
max_change_id = self.max_seen_change_id
# here, 1 is added to the change_id, because the client_change_id is the id the client
# *knows* about -> the client needs client_change_id+1 since get_autoupdate_data is
# inclusive [change_id .. max_change_id].
autoupdate = await get_autoupdate_data(
cast(int, self.client_change_id) + 1, max_change_id, self.consumer.user_id
)
if autoupdate is not None:
# It will be send, so we can set the client_change_id
self.client_change_id = max_change_id
await self.consumer.send_json(
type="autoupdate", content=autoupdate, in_response=in_response,
)

View File

@ -1,32 +1,19 @@
import time import time
from collections import defaultdict
from typing import Any, Dict, List, Optional, cast from typing import Any, Dict, List, Optional, cast
from urllib.parse import parse_qs from urllib.parse import parse_qs
from channels.generic.websocket import AsyncWebsocketConsumer from channels.generic.websocket import AsyncWebsocketConsumer
from mypy_extensions import TypedDict
from ..utils.websocket import WEBSOCKET_CHANGE_ID_TOO_HIGH
from . import logging from . import logging
from .auth import UserDoesNotExist, async_anonymous_is_enabled from .auth import UserDoesNotExist, async_anonymous_is_enabled
from .cache import ChangeIdTooLowError, element_cache, split_element_id from .cache import element_cache
from .consumer_autoupdate_strategy import ConsumerAutoupdateStrategy
from .utils import get_worker_id from .utils import get_worker_id
from .websocket import ProtocollAsyncJsonWebsocketConsumer from .websocket import BaseWebsocketException, ProtocollAsyncJsonWebsocketConsumer
logger = logging.getLogger("openslides.websocket") logger = logging.getLogger("openslides.websocket")
AutoupdateFormat = TypedDict(
"AutoupdateFormat",
{
"changed": Dict[str, List[Dict[str, Any]]],
"deleted": Dict[str, List[int]],
"from_change_id": int,
"to_change_id": int,
"all_data": bool,
},
)
class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
""" """
@ -40,12 +27,11 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
ID counter for assigning each instance of this class an unique id. ID counter for assigning each instance of this class an unique id.
""" """
skipped_autoupdate_from_change_id: Optional[int] = None
def __init__(self, *args: Any, **kwargs: Any) -> None: def __init__(self, *args: Any, **kwargs: Any) -> None:
self.projector_hash: Dict[int, int] = {} self.projector_hash: Dict[int, int] = {}
SiteConsumer.ID_COUNTER += 1 SiteConsumer.ID_COUNTER += 1
self._id = get_worker_id() + "-" + str(SiteConsumer.ID_COUNTER) self._id = get_worker_id() + "-" + str(SiteConsumer.ID_COUNTER)
self.autoupdate_strategy = ConsumerAutoupdateStrategy(self)
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
async def connect(self) -> None: async def connect(self) -> None:
@ -56,11 +42,13 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
Sends the startup data to the user. Sends the startup data to the user.
""" """
self.user_id = self.scope["user"]["id"]
self.connect_time = time.time() self.connect_time = time.time()
# self.scope['user'] is the full_data dict of the user. For an # self.scope['user'] is the full_data dict of the user. For an
# anonymous user is it the dict {'id': 0} # anonymous user is it the dict {'id': 0}
change_id = None change_id = None
if not await async_anonymous_is_enabled() and not self.scope["user"]["id"]: if not await async_anonymous_is_enabled() and not self.user_id:
await self.accept() # workaround for #4009 await self.accept() # workaround for #4009
await self.close() await self.close()
logger.debug(f"connect: denied ({self._id})") logger.debug(f"connect: denied ({self._id})")
@ -74,24 +62,23 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
change_id = int(query_string[b"change_id"][0]) change_id = int(query_string[b"change_id"][0])
except ValueError: except ValueError:
await self.accept() # workaround for #4009 await self.accept() # workaround for #4009
await self.close() # TODO: Find a way to send an error code await self.close()
logger.debug(f"connect: wrong change id ({self._id})") logger.debug(f"connect: wrong change id ({self._id})")
return return
if b"autoupdate" in query_string and query_string[b"autoupdate"][
0
].lower() not in [b"0", b"off", b"false"]:
# a positive value in autoupdate. Start autoupdate
await self.channel_layer.group_add("autoupdate", self.channel_name)
await self.accept() await self.accept()
if change_id is not None: if change_id is not None:
logger.debug(f"connect: change id {change_id} ({self._id})") logger.debug(f"connect: change id {change_id} ({self._id})")
await self.send_autoupdate(change_id) try:
await self.request_autoupdate(change_id)
except BaseWebsocketException as e:
await self.send_exception(e)
else: else:
logger.debug(f"connect: no change id ({self._id})") logger.debug(f"connect: no change id ({self._id})")
await self.channel_layer.group_add("autoupdate", self.channel_name)
async def disconnect(self, close_code: int) -> None: async def disconnect(self, close_code: int) -> None:
""" """
A user disconnects. Remove it from autoupdate. A user disconnects. Remove it from autoupdate.
@ -102,110 +89,19 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
f"disconnect code={close_code} active_secs={active_seconds} ({self._id})" f"disconnect code={close_code} active_secs={active_seconds} ({self._id})"
) )
async def send_notify(self, event: Dict[str, Any]) -> None: async def msg_new_change_id(self, event: Dict[str, Any]) -> None:
"""
Send a notify message to the user.
"""
user_id = self.scope["user"]["id"]
item = event["incomming"]
users = item.get("users")
reply_channels = item.get("replyChannels")
if (
(isinstance(users, bool) and users)
or (isinstance(users, list) and user_id in users)
or (
isinstance(reply_channels, list) and self.channel_name in reply_channels
)
or (users is None and reply_channels is None)
):
item["senderChannelName"] = event["senderChannelName"]
item["senderUserId"] = event["senderUserId"]
await self.send_json(type="notify", content=item)
async def send_autoupdate(
self,
change_id: int,
max_change_id: Optional[int] = None,
in_response: Optional[str] = None,
) -> None:
"""
Sends an autoupdate to the client from change_id to max_change_id.
If max_change_id is None, the current change id will be used.
"""
user_id = self.scope["user"]["id"]
if max_change_id is None:
max_change_id = await element_cache.get_current_change_id()
if change_id == max_change_id + 1:
# The client is up-to-date, so nothing will be done
return
if change_id > max_change_id:
message = f"Requested change_id {change_id} is higher this highest change_id {max_change_id}."
await self.send_error(
code=WEBSOCKET_CHANGE_ID_TOO_HIGH,
message=message,
in_response=in_response,
)
return
try:
changed_elements, deleted_element_ids = await element_cache.get_data_since(
user_id, change_id, max_change_id
)
except ChangeIdTooLowError:
# The change_id is lower the the lowerst change_id in redis. Return all data
changed_elements = await element_cache.get_all_data_list(user_id)
all_data = True
deleted_elements: Dict[str, List[int]] = {}
except UserDoesNotExist:
# Maybe the user was deleted, but a websocket connection is still open to the user.
# So we can close this connection and return.
await self.close()
return
else:
all_data = False
deleted_elements = defaultdict(list)
for element_id in deleted_element_ids:
collection_string, id = split_element_id(element_id)
deleted_elements[collection_string].append(id)
# Check, if the autoupdate has any data.
if not changed_elements and not deleted_element_ids:
# Set the current from_change_id, if it is the first skipped autoupdate
if not self.skipped_autoupdate_from_change_id:
self.skipped_autoupdate_from_change_id = change_id
else:
# Normal autoupdate with data
from_change_id = change_id
# If there is at least one skipped autoupdate, take the saved from_change_id
if self.skipped_autoupdate_from_change_id:
from_change_id = self.skipped_autoupdate_from_change_id
self.skipped_autoupdate_from_change_id = None
await self.send_json(
type="autoupdate",
content=AutoupdateFormat(
changed=changed_elements,
deleted=deleted_elements,
from_change_id=from_change_id,
to_change_id=max_change_id,
all_data=all_data,
),
in_response=in_response,
)
async def send_data(self, event: Dict[str, Any]) -> None:
""" """
Send changed or deleted elements to the user. Send changed or deleted elements to the user.
""" """
change_id = event["change_id"] change_id = event["change_id"]
await self.send_autoupdate(change_id, max_change_id=change_id) try:
await self.autoupdate_strategy.new_change_id(change_id)
except UserDoesNotExist:
# Maybe the user was deleted, but a websocket connection is still open to the user.
# So we can close this connection and return.
await self.close()
async def projector_changed(self, event: Dict[str, Any]) -> None: async def msg_projector_data(self, event: Dict[str, Any]) -> None:
""" """
The projector has changed. The projector has changed.
""" """
@ -223,6 +119,33 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
if projector_data: if projector_data:
await self.send_projector_data(projector_data, change_id=change_id) await self.send_projector_data(projector_data, change_id=change_id)
async def msg_notify(self, event: Dict[str, Any]) -> None:
"""
Send a notify message to the user.
"""
item = event["incomming"]
users = item.get("users")
reply_channels = item.get("replyChannels")
if (
(isinstance(users, bool) and users)
or (isinstance(users, list) and self.user_id in users)
or (
isinstance(reply_channels, list) and self.channel_name in reply_channels
)
or (users is None and reply_channels is None)
):
item["senderChannelName"] = event["senderChannelName"]
item["senderUserId"] = event["senderUserId"]
await self.send_json(type="notify", content=item)
async def request_autoupdate(
self, change_id: int, in_response: Optional[str] = None
) -> None:
await self.autoupdate_strategy.request_change_id(
change_id, in_response=in_response
)
async def send_projector_data( async def send_projector_data(
self, self,
data: Dict[int, Dict[str, Any]], data: Dict[int, Dict[str, Any]],

View File

@ -175,7 +175,7 @@ class RESTModelMixin:
current_time = time.time() current_time = time.time()
if current_time > last_time + 5: if current_time > last_time + 5:
last_time = current_time last_time = current_time
logger.info(f"\t{i+1}/{instances_length}...") logger.info(f" {i+1}/{instances_length}...")
return full_data return full_data
@classmethod @classmethod

View File

@ -5,16 +5,14 @@ Functions that handel the registration of projector elements and the rendering
of the data to present it on the projector. of the data to present it on the projector.
""" """
from typing import Any, Awaitable, Callable, Dict, List from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List, Optional
from . import logging
from .cache import element_cache from .cache import element_cache
AllData = Dict[str, Dict[int, Dict[str, Any]]] logger = logging.getLogger(__name__)
ProjectorSlide = Callable[[AllData, Dict[str, Any], int], Awaitable[Dict[str, Any]]]
projector_slides: Dict[str, ProjectorSlide] = {}
class ProjectorElementException(Exception): class ProjectorElementException(Exception):
@ -23,6 +21,46 @@ class ProjectorElementException(Exception):
""" """
class ProjectorAllDataProvider:
NON_EXISTENT_MARKER = object()
def __init__(self) -> None:
self.cache: Any = defaultdict(dict) # fuu you mypy
self.fetched_collection: Dict[str, bool] = {}
async def get(self, collection: str, id: int) -> Optional[Dict[str, Any]]:
cache_data = self.cache[collection].get(id)
if cache_data is None:
data: Any = await element_cache.get_element_data(collection, id)
if data is None:
data = ProjectorAllDataProvider.NON_EXISTENT_MARKER
self.cache[collection][id] = data
cache_data = self.cache[collection][id]
if cache_data == ProjectorAllDataProvider.NON_EXISTENT_MARKER:
return None
return cache_data
async def get_collection(self, collection: str) -> Dict[int, Dict[str, Any]]:
if not self.fetched_collection.get(collection, False):
collection_data = await element_cache.get_collection_data(collection)
self.cache[collection] = collection_data
self.fetched_collection[collection] = True
return self.cache[collection]
async def exists(self, collection: str, id: int) -> bool:
model = await self.get(collection, id)
return model is not None
ProjectorSlide = Callable[
[ProjectorAllDataProvider, Dict[str, Any], int], Awaitable[Dict[str, Any]]
]
projector_slides: Dict[str, ProjectorSlide] = {}
def register_projector_slide(name: str, slide: ProjectorSlide) -> None: def register_projector_slide(name: str, slide: ProjectorSlide) -> None:
""" """
Registers a projector slide. Registers a projector slide.
@ -67,10 +105,11 @@ async def get_projector_data(
if projector_ids is None: if projector_ids is None:
projector_ids = [] projector_ids = []
all_data = await element_cache.get_all_data_dict()
projector_data: Dict[int, List[Dict[str, Any]]] = {} projector_data: Dict[int, List[Dict[str, Any]]] = {}
all_data_provider = ProjectorAllDataProvider()
projectors = await all_data_provider.get_collection("core/projector")
for projector_id, projector in all_data.get("core/projector", {}).items(): for projector_id, projector in projectors.items():
if projector_ids and projector_id not in projector_ids: if projector_ids and projector_id not in projector_ids:
# only render the projector in question. # only render the projector in question.
continue continue
@ -83,7 +122,7 @@ async def get_projector_data(
for element in projector["elements"]: for element in projector["elements"]:
projector_slide = projector_slides[element["name"]] projector_slide = projector_slides[element["name"]]
try: try:
data = await projector_slide(all_data, element, projector_id) data = await projector_slide(all_data_provider, element, projector_id)
except ProjectorElementException as err: except ProjectorElementException as err:
data = {"error": str(err)} data = {"error": str(err)}
projector_data[projector_id].append({"data": data, "element": element}) projector_data[projector_id].append({"data": data, "element": element})
@ -91,18 +130,23 @@ async def get_projector_data(
return projector_data return projector_data
async def get_config(all_data: AllData, key: str) -> Any: async def get_config(all_data_provider: ProjectorAllDataProvider, key: str) -> Any:
""" """
Returns a config value from all_data. Returns a config value from all_data_provider.
Triggers the cache early: It access `get_colelction` instead of `get`. It
allows for all successive queries for configs to be cached.
""" """
from ..core.config import config from ..core.config import config
config_id = (await config.async_get_key_to_id())[key] config_id = (await config.async_get_key_to_id())[key]
return all_data[config.get_collection_string()][config_id]["value"] configs = await all_data_provider.get_collection(config.get_collection_string())
return configs[config_id]["value"]
def get_model(all_data: AllData, collection: str, id: Any) -> Dict[str, Any]: async def get_model(
all_data_provider: ProjectorAllDataProvider, collection: str, id: Any
) -> Dict[str, Any]:
""" """
Tries to get the model identified by the collection and id. Tries to get the model identified by the collection and id.
If the id is invalid or the model not found, ProjectorElementExceptions will be raised. If the id is invalid or the model not found, ProjectorElementExceptions will be raised.
@ -110,17 +154,19 @@ def get_model(all_data: AllData, collection: str, id: Any) -> Dict[str, Any]:
if id is None: if id is None:
raise ProjectorElementException(f"id is required for {collection} slide") raise ProjectorElementException(f"id is required for {collection} slide")
try: model = await all_data_provider.get(collection, id)
model = all_data[collection][id] if model is None:
except KeyError:
raise ProjectorElementException(f"{collection} with id {id} does not exist") raise ProjectorElementException(f"{collection} with id {id} does not exist")
return model return model
def get_models( async def get_models(
all_data: AllData, collection: str, ids: List[Any] all_data_provider: ProjectorAllDataProvider, collection: str, ids: List[Any]
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """
Tries to fetch all given models. Models are required to be all of the collection `collection`. Tries to fetch all given models. Models are required to be all of the collection `collection`.
""" """
return [get_model(all_data, collection, id) for id in ids] logger.info(
f"Note: a call to `get_models` with {collection}/{ids}. This might be cache-intensive"
)
return [await get_model(all_data_provider, collection, id) for id in ids]

View File

@ -11,6 +11,7 @@ logger = logging.getLogger(__name__)
use_redis = False use_redis = False
use_read_only_redis = False use_read_only_redis = False
read_only_redis_amount_replicas = None read_only_redis_amount_replicas = None
read_only_redis_wait_timeout = None
try: try:
import aioredis import aioredis
@ -35,6 +36,8 @@ else:
read_only_redis_amount_replicas = getattr(settings, "AMOUNT_REPLICAS", 1) read_only_redis_amount_replicas = getattr(settings, "AMOUNT_REPLICAS", 1)
logger.info(f"AMOUNT_REPLICAS={read_only_redis_amount_replicas}") logger.info(f"AMOUNT_REPLICAS={read_only_redis_amount_replicas}")
read_only_redis_wait_timeout = getattr(settings, "WAIT_TIMEOUT", 1000)
logger.info(f"WAIT_TIMEOUT={read_only_redis_wait_timeout}")
else: else:
logger.info("Redis is not configured.") logger.info("Redis is not configured.")

View File

@ -0,0 +1,27 @@
import time
from typing import List, Optional
from . import logging
timelogger = logging.getLogger(__name__)
class Timing:
def __init__(self, name: str) -> None:
self.name = name
self.times: List[float] = [time.time()]
def __call__(self, done: Optional[bool] = False) -> None:
self.times.append(time.time())
if done:
self.printtime()
def printtime(self) -> None:
s = f"{self.name}: "
for i in range(1, len(self.times)):
diff = self.times[i] - self.times[i - 1]
s += f"{i}: {diff:.5f} "
diff = self.times[-1] - self.times[0]
s += f"sum: {diff:.5f}"
timelogger.info(s)

View File

@ -25,6 +25,26 @@ WEBSOCKET_WRONG_FORMAT = 102
# If the recieved data has not the expected format. # If the recieved data has not the expected format.
class BaseWebsocketException(Exception):
code: int
def __init__(self, message: str, in_response: Optional[str] = None) -> None:
self.message = message
self.in_response = in_response
class NotAuthorizedException(BaseWebsocketException):
code = WEBSOCKET_NOT_AUTHORIZED
class ChangeIdTooHighException(BaseWebsocketException):
code = WEBSOCKET_CHANGE_ID_TOO_HIGH
class WrongFormatException(BaseWebsocketException):
code = WEBSOCKET_WRONG_FORMAT
class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer): class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer):
async def receive( async def receive(
self, self,
@ -122,6 +142,20 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer):
silence_errors=silence_errors, silence_errors=silence_errors,
) )
async def send_exception(
self, e: BaseWebsocketException, silence_errors: Optional[bool] = True,
) -> None:
"""
Send generic error messages with a custom status code (see above) and a text message.
"""
await self.send_json(
"error",
{"code": e.code, "message": e.message},
None,
in_response=e.in_response,
silence_errors=silence_errors,
)
async def receive_json(self, content: Any) -> None: # type: ignore async def receive_json(self, content: Any) -> None: # type: ignore
""" """
Receives the json data, parses it and calls receive_content. Receives the json data, parses it and calls receive_content.
@ -140,9 +174,12 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer):
) )
return return
await websocket_client_messages[content["type"]].receive_content( try:
self, content["content"], id=content["id"] await websocket_client_messages[content["type"]].receive_content(
) self, content["content"], id=content["id"]
)
except BaseWebsocketException as e:
await self.send_exception(e)
schema: Dict[str, Any] = { schema: Dict[str, Any] = {

View File

@ -1,21 +1,22 @@
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from ..utils import logging from . import logging
from ..utils.auth import async_has_perm from .auth import async_has_perm
from ..utils.constants import get_constants from .constants import get_constants
from ..utils.projector import get_projector_data from .projector import get_projector_data
from ..utils.stats import WebsocketLatencyLogger from .stats import WebsocketLatencyLogger
from ..utils.websocket import ( from .websocket import (
WEBSOCKET_NOT_AUTHORIZED,
BaseWebsocketClientMessage, BaseWebsocketClientMessage,
NotAuthorizedException,
ProtocollAsyncJsonWebsocketConsumer, ProtocollAsyncJsonWebsocketConsumer,
register_client_message,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class NotifyWebsocketClientMessage(BaseWebsocketClientMessage): class Notify(BaseWebsocketClientMessage):
""" """
Websocket message from a client to send a message to other clients. Websocket message from a client to send a message to other clients.
""" """
@ -59,13 +60,9 @@ class NotifyWebsocketClientMessage(BaseWebsocketClientMessage):
) -> None: ) -> None:
# Check if the user is allowed to send this notify message # Check if the user is allowed to send this notify message
perm = self.notify_permissions.get(content["name"]) perm = self.notify_permissions.get(content["name"])
if perm is not None and not await async_has_perm( if perm is not None and not await async_has_perm(consumer.user_id, perm):
consumer.scope["user"]["id"], perm raise NotAuthorizedException(
): f"You need '{perm}' to send this message.", in_response=id,
await consumer.send_error(
code=WEBSOCKET_NOT_AUTHORIZED,
message=f"You need '{perm}' to send this message.",
in_response=id,
) )
else: else:
# Some logging # Some logging
@ -84,15 +81,18 @@ class NotifyWebsocketClientMessage(BaseWebsocketClientMessage):
await consumer.channel_layer.group_send( await consumer.channel_layer.group_send(
"site", "site",
{ {
"type": "send_notify", "type": "msg_notify",
"incomming": content, "incomming": content,
"senderChannelName": consumer.channel_name, "senderChannelName": consumer.channel_name,
"senderUserId": consumer.scope["user"]["id"], "senderUserId": consumer.user_id,
}, },
) )
class ConstantsWebsocketClientMessage(BaseWebsocketClientMessage): register_client_message(Notify())
class Constants(BaseWebsocketClientMessage):
""" """
The Client requests the constants. The Client requests the constants.
""" """
@ -109,7 +109,10 @@ class ConstantsWebsocketClientMessage(BaseWebsocketClientMessage):
) )
class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage): register_client_message(Constants())
class GetElements(BaseWebsocketClientMessage):
""" """
The Client request database elements. The Client request database elements.
""" """
@ -130,26 +133,10 @@ class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage):
self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str
) -> None: ) -> None:
requested_change_id = content.get("change_id", 0) requested_change_id = content.get("change_id", 0)
await consumer.send_autoupdate(requested_change_id, in_response=id) await consumer.request_autoupdate(requested_change_id, in_response=id)
class AutoupdateWebsocketClientMessage(BaseWebsocketClientMessage): register_client_message(GetElements())
"""
The Client turns autoupdate on or off.
"""
identifier = "autoupdate"
async def receive_content(
self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str
) -> None:
# Turn on or off the autoupdate for the client
if content: # accept any value, that can be interpreted as bool
await consumer.channel_layer.group_add("autoupdate", consumer.channel_name)
else:
await consumer.channel_layer.group_discard(
"autoupdate", consumer.channel_name
)
class ListenToProjectors(BaseWebsocketClientMessage): class ListenToProjectors(BaseWebsocketClientMessage):
@ -198,6 +185,9 @@ class ListenToProjectors(BaseWebsocketClientMessage):
await consumer.send_projector_data(projector_data, in_response=id) await consumer.send_projector_data(projector_data, in_response=id)
register_client_message(ListenToProjectors())
class PingPong(BaseWebsocketClientMessage): class PingPong(BaseWebsocketClientMessage):
""" """
Responds to pings from the client. Responds to pings from the client.
@ -220,3 +210,6 @@ class PingPong(BaseWebsocketClientMessage):
await consumer.send_json(type="pong", content=latency, in_response=id) await consumer.send_json(type="pong", content=latency, in_response=id)
if latency is not None: if latency is not None:
await WebsocketLatencyLogger.add_latency(latency) await WebsocketLatencyLogger.add_latency(latency)
register_client_message(PingPong())

View File

@ -1,9 +1,13 @@
from typing import Any, Dict, List from typing import Any, Dict, List, Optional, cast
from openslides.core.config import config from openslides.core.config import config
from openslides.core.models import Projector from openslides.core.models import Projector
from openslides.users.models import User from openslides.users.models import User
from openslides.utils.projector import AllData, get_config, register_projector_slide from openslides.utils.projector import (
ProjectorAllDataProvider,
get_config,
register_projector_slide,
)
class TConfig: class TConfig:
@ -94,19 +98,23 @@ class TProjector:
async def slide1( async def slide1(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Slide that shows the general_event_name. Slide that shows the general_event_name.
""" """
return { return {
"name": "slide1", "name": "slide1",
"event_name": await get_config(all_data, "general_event_name"), "event_name": await get_config(all_data_provider, "general_event_name"),
} }
async def slide2( async def slide2(
all_data: AllData, element: Dict[str, Any], projector_id: int all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
return {"name": "slide2"} return {"name": "slide2"}
@ -115,17 +123,26 @@ register_projector_slide("test/slide1", slide1)
register_projector_slide("test/slide2", slide2) register_projector_slide("test/slide2", slide2)
def all_data_config() -> AllData: class TestProjectorAllDataProvider:
return { def __init__(self, data):
TConfig().get_collection_string(): { self.data = data
element["id"]: element for element in TConfig().get_elements()
} async def get(self, collection: str, id: int) -> Optional[Dict[str, Any]]:
} collection_data = await self.get_collection(collection)
return collection_data.get(id)
async def get_collection(self, collection: str) -> Dict[int, Dict[str, Any]]:
return self.data.get(collection, {})
async def exists(self, collection: str, id: int) -> bool:
return (await self.get(collection, id)) is not None
def all_data_users() -> AllData: def get_all_data_provider(data) -> ProjectorAllDataProvider:
return { data[TConfig().get_collection_string()] = {
TUser().get_collection_string(): { element["id"]: element for element in TConfig().get_elements()
element["id"]: element for element in TUser().get_elements()
}
} }
data[TUser().get_collection_string()] = {
element["id"]: element for element in TUser().get_elements()
}
return cast(ProjectorAllDataProvider, TestProjectorAllDataProvider(data))

View File

@ -1,4 +1,3 @@
import asyncio
from importlib import import_module from importlib import import_module
from typing import Optional, Tuple from typing import Optional, Tuple
from unittest.mock import patch from unittest.mock import patch
@ -51,7 +50,7 @@ async def prepare_element_cache(settings):
] ]
) )
element_cache._cachables = None element_cache._cachables = None
await element_cache.async_ensure_cache(default_change_id=2) await element_cache.async_ensure_cache(default_change_id=10)
yield yield
# Reset the cachable_provider # Reset the cachable_provider
element_cache.cachable_provider = orig_cachable_provider element_cache.cachable_provider = orig_cachable_provider
@ -158,19 +157,9 @@ async def test_connection_with_too_big_change_id(get_communicator, set_config):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_changed_data_autoupdate_off(communicator, set_config): async def test_changed_data_autoupdate(get_communicator, set_config):
await set_config("general_system_enable_anonymous", True) await set_config("general_system_enable_anonymous", True)
await communicator.connect() communicator = get_communicator()
# Change a config value
await set_config("general_event_name", "Test Event")
assert await communicator.receive_nothing()
@pytest.mark.asyncio
async def test_changed_data_autoupdate_on(get_communicator, set_config):
await set_config("general_system_enable_anonymous", True)
communicator = get_communicator("autoupdate=on")
await communicator.connect() await communicator.connect()
# Change a config value # Change a config value
@ -212,7 +201,7 @@ async def create_user_session_cookie(user_id: int) -> Tuple[bytes, bytes]:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_with_user(get_communicator): async def test_with_user(get_communicator):
cookie_header = await create_user_session_cookie(1) cookie_header = await create_user_session_cookie(1)
communicator = get_communicator("autoupdate=on", headers=[cookie_header]) communicator = get_communicator(headers=[cookie_header])
connected, __ = await communicator.connect() connected, __ = await communicator.connect()
@ -222,7 +211,7 @@ async def test_with_user(get_communicator):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_skipping_autoupdate(set_config, get_communicator): async def test_skipping_autoupdate(set_config, get_communicator):
cookie_header = await create_user_session_cookie(1) cookie_header = await create_user_session_cookie(1)
communicator = get_communicator("autoupdate=on", headers=[cookie_header]) communicator = get_communicator(headers=[cookie_header])
await communicator.connect() await communicator.connect()
@ -265,7 +254,7 @@ async def test_skipping_autoupdate(set_config, get_communicator):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_receive_deleted_data(get_communicator, set_config): async def test_receive_deleted_data(get_communicator, set_config):
await set_config("general_system_enable_anonymous", True) await set_config("general_system_enable_anonymous", True)
communicator = get_communicator("autoupdate=on") communicator = get_communicator()
await communicator.connect() await communicator.connect()
# Delete test element # Delete test element
@ -395,6 +384,7 @@ async def test_send_get_elements_too_big_change_id(communicator, set_config):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_get_elements_too_small_change_id(communicator, set_config): async def test_send_get_elements_too_small_change_id(communicator, set_config):
# Note: this test depends on the default_change_id set in prepare_element_cache
await set_config("general_system_enable_anonymous", True) await set_config("general_system_enable_anonymous", True)
await communicator.connect() await communicator.connect()
@ -422,12 +412,12 @@ async def test_send_connect_up_to_date(communicator, set_config):
{"type": "getElements", "content": {"change_id": 0}, "id": "test_id"} {"type": "getElements", "content": {"change_id": 0}, "id": "test_id"}
) )
response1 = await communicator.receive_json_from() response1 = await communicator.receive_json_from()
first_change_id = response1.get("content")["to_change_id"] max_change_id = response1.get("content")["to_change_id"]
await communicator.send_json_to( await communicator.send_json_to(
{ {
"type": "getElements", "type": "getElements",
"content": {"change_id": first_change_id + 1}, "content": {"change_id": max_change_id},
"id": "test_id", "id": "test_id",
} }
) )
@ -510,43 +500,6 @@ async def test_send_invalid_get_elements(communicator, set_config):
assert response.get("in_response") == "test_id" assert response.get("in_response") == "test_id"
@pytest.mark.asyncio
async def test_turn_on_autoupdate(communicator, set_config):
await set_config("general_system_enable_anonymous", True)
await communicator.connect()
await communicator.send_json_to(
{"type": "autoupdate", "content": "on", "id": "test_id"}
)
await asyncio.sleep(0.01)
# Change a config value
await set_config("general_event_name", "Test Event")
response = await communicator.receive_json_from()
id = config.get_key_to_id()["general_event_name"]
type = response.get("type")
content = response.get("content")
assert type == "autoupdate"
assert content["changed"] == {
"core/config": [{"id": id, "key": "general_event_name", "value": "Test Event"}]
}
@pytest.mark.asyncio
async def test_turn_off_autoupdate(get_communicator, set_config):
await set_config("general_system_enable_anonymous", True)
communicator = get_communicator("autoupdate=on")
await communicator.connect()
await communicator.send_json_to(
{"type": "autoupdate", "content": False, "id": "test_id"}
)
await asyncio.sleep(0.01)
# Change a config value
await set_config("general_event_name", "Test Event")
assert await communicator.receive_nothing()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_listen_to_projector(communicator, set_config): async def test_listen_to_projector(communicator, set_config):
await set_config("general_system_enable_anonymous", True) await set_config("general_system_enable_anonymous", True)
@ -565,7 +518,7 @@ async def test_listen_to_projector(communicator, set_config):
content = response.get("content") content = response.get("content")
assert type == "projector" assert type == "projector"
assert content == { assert content == {
"change_id": 3, "change_id": 11,
"data": { "data": {
"1": [ "1": [
{ {
@ -588,17 +541,22 @@ async def test_update_projector(communicator, set_config):
"id": "test_id", "id": "test_id",
} }
) )
await communicator.receive_json_from() await communicator.receive_json_from() # recieve initial projector data
# Change a config value # Change a config value
await set_config("general_event_name", "Test Event") await set_config("general_event_name", "Test Event")
# We need two messages: The autoupdate and the projector data in this order
response = await communicator.receive_json_from()
assert response.get("type") == "autoupdate"
response = await communicator.receive_json_from() response = await communicator.receive_json_from()
type = response.get("type") type = response.get("type")
content = response.get("content") content = response.get("content")
assert type == "projector" assert type == "projector"
assert content == { assert content == {
"change_id": 4, "change_id": 12,
"data": { "data": {
"1": [ "1": [
{ {
@ -629,4 +587,8 @@ async def test_update_projector_to_current_value(communicator, set_config):
# Change a config value to current_value # Change a config value to current_value
await set_config("general_event_name", "OpenSlides") await set_config("general_event_name", "OpenSlides")
# We await an autoupdate, bot no projector data
response = await communicator.receive_json_from()
assert response.get("type") == "autoupdate"
assert await communicator.receive_nothing() assert await communicator.receive_nothing()

View File

@ -4,10 +4,12 @@ import pytest
from openslides.agenda import projector from openslides.agenda import projector
from ...integration.helpers import get_all_data_provider
@pytest.fixture @pytest.fixture
def all_data(): def all_data_provider():
all_data = { data = {
"agenda/item": { "agenda/item": {
1: { 1: {
"id": 1, "id": 1,
@ -82,14 +84,14 @@ def all_data():
} }
} }
return all_data return get_all_data_provider(data)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_main_items(all_data): async def test_main_items(all_data_provider):
element: Dict[str, Any] = {} element: Dict[str, Any] = {}
data = await projector.item_list_slide(all_data, element, 1) data = await projector.item_list_slide(all_data_provider, element, 1)
assert data == { assert data == {
"items": [ "items": [
@ -106,10 +108,10 @@ async def test_main_items(all_data):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_all_items(all_data): async def test_all_items(all_data_provider):
element: Dict[str, Any] = {"only_main_items": False} element: Dict[str, Any] = {"only_main_items": False}
data = await projector.item_list_slide(all_data, element, 1) data = await projector.item_list_slide(all_data_provider, element, 1)
assert data == { assert data == {
"items": [ "items": [

View File

@ -4,14 +4,13 @@ import pytest
from openslides.motions import projector from openslides.motions import projector
from ...integration.helpers import all_data_config, all_data_users from ...integration.helpers import get_all_data_provider
@pytest.fixture @pytest.fixture
def all_data(): def all_data_provider():
return_value = all_data_config() data = {}
return_value.update(all_data_users()) data["motions/motion"] = {
return_value["motions/motion"] = {
1: { 1: {
"id": 1, "id": 1,
"identifier": "4", "identifier": "4",
@ -143,7 +142,7 @@ def all_data():
"change_recommendations": [], "change_recommendations": [],
}, },
} }
return_value["motions/workflow"] = { data["motions/workflow"] = {
1: { 1: {
"id": 1, "id": 1,
"name": "Simple Workflow", "name": "Simple Workflow",
@ -151,7 +150,7 @@ def all_data():
"first_state_id": 1, "first_state_id": 1,
} }
} }
return_value["motions/state"] = { data["motions/state"] = {
1: { 1: {
"id": 1, "id": 1,
"name": "submitted", "name": "submitted",
@ -217,7 +216,7 @@ def all_data():
"workflow_id": 1, "workflow_id": 1,
}, },
} }
return_value["motions/statute-paragraph"] = { data["motions/statute-paragraph"] = {
1: { 1: {
"id": 1, "id": 1,
"title": "§1 Preamble", "title": "§1 Preamble",
@ -225,7 +224,7 @@ def all_data():
"weight": 10000, "weight": 10000,
} }
} }
return_value["motions/motion-change-recommendation"] = { data["motions/motion-change-recommendation"] = {
1: { 1: {
"id": 1, "id": 1,
"motion_id": 1, "motion_id": 1,
@ -251,14 +250,14 @@ def all_data():
"creation_time": "2019-02-09T09:54:06.256378+01:00", "creation_time": "2019-02-09T09:54:06.256378+01:00",
}, },
} }
return return_value return get_all_data_provider(data)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_motion_slide(all_data): async def test_motion_slide(all_data_provider):
element: Dict[str, Any] = {"id": 1} element: Dict[str, Any] = {"id": 1}
data = await projector.motion_slide(all_data, element, 1) data = await projector.motion_slide(all_data_provider, element, 1)
assert data == { assert data == {
"identifier": "4", "identifier": "4",
@ -304,10 +303,10 @@ async def test_motion_slide(all_data):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_amendment_slide(all_data): async def test_amendment_slide(all_data_provider):
element: Dict[str, Any] = {"id": 2} element: Dict[str, Any] = {"id": 2}
data = await projector.motion_slide(all_data, element, 1) data = await projector.motion_slide(all_data_provider, element, 1)
assert data == { assert data == {
"identifier": "Ä1", "identifier": "Ä1",
@ -331,10 +330,10 @@ async def test_amendment_slide(all_data):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_statute_amendment_slide(all_data): async def test_statute_amendment_slide(all_data_provider):
element: Dict[str, Any] = {"id": 3} element: Dict[str, Any] = {"id": 3}
data = await projector.motion_slide(all_data, element, 1) data = await projector.motion_slide(all_data_provider, element, 1)
assert data == { assert data == {
"identifier": None, "identifier": None,