(WIP) Ordered and delayed autoupdates:

- Extracted autoupdate code from consumers
- collect autoupdates until a AUTOUPDATE_DELAY is reached (since the first autoupdate)
- Added the AUTOUPDATE_DELAY parameter in the settings.py
- moved some autoupdate code to utils/autoupdate
- moved core/websocket to utils/websocket_client_messages
- add the autoupdate in the response (there are some todos left)
- do not send autoupdates on error (4xx, 5xx)
- the client blindly injects the autoupdate in the response
- removed the unused autoupdate on/off feature
- the clients sends now the maxChangeId (instead of maxChangeId+1) on connection
- the server accepts this.
This commit is contained in:
FinnStutzenstein 2020-05-15 18:24:21 +02:00
parent bf88cea200
commit d8b21c5fb5
No known key found for this signature in database
GPG Key ID: 9042F605C6324654
20 changed files with 501 additions and 350 deletions

View File

@ -143,3 +143,7 @@ not affect the client.
operator is in one of these groups, the client disconnected and reconnects again.
All requests urls (including websockets) are now prefixed with `/prioritize`, so
these requests from "prioritized clients" can be routed to different servers.
`AUTOUPDATE_DELAY`: The delay to send autoupdates. This feature can be
deactivated by setting it to `None`. It is deactivated per default. The Delay is
given in seconds

View File

@ -6,7 +6,7 @@ import { DataStoreService, DataStoreUpdateManagerService } from './data-store.se
import { Mutex } from '../promises/mutex';
import { WebsocketService, WEBSOCKET_ERROR_CODES } from './websocket.service';
interface AutoupdateFormat {
export interface AutoupdateFormat {
/**
* All changed (and created) items as their full/restricted data grouped by their collection.
*/
@ -37,6 +37,19 @@ interface AutoupdateFormat {
all_data: boolean;
}
export function isAutoupdateFormat(obj: any): obj is AutoupdateFormat {
const format = obj as AutoupdateFormat;
return (
obj &&
typeof obj === 'object' &&
format.changed !== undefined &&
format.deleted !== undefined &&
format.from_change_id !== undefined &&
format.to_change_id !== undefined &&
format.all_data !== undefined
);
}
/**
* Handles the initial update and automatic updates using the {@link WebsocketService}
* Incoming objects, usually BaseModels, will be saved in the dataStore (`this.DS`)
@ -120,27 +133,40 @@ export class AutoupdateService {
// Normal autoupdate
if (autoupdate.from_change_id <= maxChangeId + 1 && autoupdate.to_change_id > maxChangeId) {
const updateSlot = await this.DSUpdateManager.getNewUpdateSlot(this.DS);
// Delete the removed objects from the DataStore
for (const collection of Object.keys(autoupdate.deleted)) {
await this.DS.remove(collection, autoupdate.deleted[collection]);
}
// Add the objects to the DataStore.
for (const collection of Object.keys(autoupdate.changed)) {
await this.DS.add(this.mapObjectsToBaseModels(collection, autoupdate.changed[collection]));
}
await this.DS.flushToStorage(autoupdate.to_change_id);
this.DSUpdateManager.commit(updateSlot, autoupdate.to_change_id);
this.injectAutupdateIntoDS(autoupdate, true);
} else {
// autoupdate fully in the future. we are missing something!
this.requestChanges();
}
}
public async injectAutoupdateIgnoreChangeId(autoupdate: AutoupdateFormat): Promise<void> {
const unlock = await this.mutex.lock();
console.debug('inject autoupdate', autoupdate);
this.injectAutupdateIntoDS(autoupdate, false);
unlock();
}
private async injectAutupdateIntoDS(autoupdate: AutoupdateFormat, flush: boolean): Promise<void> {
const updateSlot = await this.DSUpdateManager.getNewUpdateSlot(this.DS);
// Delete the removed objects from the DataStore
for (const collection of Object.keys(autoupdate.deleted)) {
await this.DS.remove(collection, autoupdate.deleted[collection]);
}
// Add the objects to the DataStore.
for (const collection of Object.keys(autoupdate.changed)) {
await this.DS.add(this.mapObjectsToBaseModels(collection, autoupdate.changed[collection]));
}
if (flush) {
await this.DS.flushToStorage(autoupdate.to_change_id);
}
this.DSUpdateManager.commit(updateSlot, autoupdate.to_change_id);
}
/**
* Creates baseModels for each plain object. If the collection is not registered,
* A console error will be issued and an empty list returned.
@ -164,9 +190,8 @@ export class AutoupdateService {
* The server should return an autoupdate with all new data.
*/
public requestChanges(): void {
const changeId = this.DS.maxChangeId === 0 ? 0 : this.DS.maxChangeId + 1;
console.log(`requesting changed objects with DS max change id ${changeId}`);
this.websocketService.send('getElements', { change_id: changeId });
console.log(`requesting changed objects with DS max change id ${this.DS.maxChangeId}`);
this.websocketService.send('getElements', { change_id: this.DS.maxChangeId });
}
/**

View File

@ -258,7 +258,6 @@ export class DataStoreUpdateManagerService {
private serveNextSlot(): void {
if (this.updateSlotRequests.length > 0) {
console.warn('Concurrent update slots');
const request = this.updateSlotRequests.pop();
request.resolve();
}

View File

@ -3,6 +3,7 @@ import { Injectable } from '@angular/core';
import { TranslateService } from '@ngx-translate/core';
import { AutoupdateFormat, AutoupdateService, isAutoupdateFormat } from './autoupdate.service';
import { OpenSlidesStatusService } from './openslides-status.service';
import { formatQueryParams, QueryParams } from '../definitions/query-params';
@ -17,12 +18,12 @@ export enum HTTPMethod {
DELETE = 'delete'
}
export interface DetailResponse {
export interface ErrorDetailResponse {
detail: string | string[];
args?: string[];
}
function isDetailResponse(obj: any): obj is DetailResponse {
function isErrorDetailResponse(obj: any): obj is ErrorDetailResponse {
return (
obj &&
typeof obj === 'object' &&
@ -31,6 +32,15 @@ function isDetailResponse(obj: any): obj is DetailResponse {
);
}
interface AutoupdateResponse {
autoupdate: AutoupdateFormat;
data?: any;
}
function isAutoupdateReponse(obj: any): obj is AutoupdateResponse {
return obj && typeof obj === 'object' && isAutoupdateFormat((obj as AutoupdateResponse).autoupdate);
}
/**
* Service for managing HTTP requests. Allows to send data for every method. Also (TODO) will do generic error handling.
*/
@ -55,7 +65,8 @@ export class HttpService {
public constructor(
private http: HttpClient,
private translate: TranslateService,
private OSStatus: OpenSlidesStatusService
private OSStatus: OpenSlidesStatusService,
private autoupdateService: AutoupdateService
) {
this.defaultHeaders = new HttpHeaders().set('Content-Type', 'application/json');
}
@ -82,7 +93,7 @@ export class HttpService {
): Promise<T> {
// end early, if we are in history mode
if (this.OSStatus.isInHistoryMode && method !== HTTPMethod.GET) {
throw this.handleError('You cannot make changes while in history mode');
throw this.processError('You cannot make changes while in history mode');
}
// there is a current bug with the responseType.
@ -108,9 +119,10 @@ export class HttpService {
};
try {
return await this.http.request<T>(method, url, options).toPromise();
} catch (e) {
throw this.handleError(e);
const responseData: T = await this.http.request<T>(method, url, options).toPromise();
return this.processResponse(responseData);
} catch (error) {
throw this.processError(error);
}
}
@ -120,7 +132,7 @@ export class HttpService {
* @param e The error thrown.
* @returns The prepared and translated message for the user
*/
private handleError(e: any): string {
private processError(e: any): string {
let error = this.translate.instant('Error') + ': ';
// If the error is a string already, return it.
if (typeof e === 'string') {
@ -142,12 +154,14 @@ export class HttpService {
} else if (!e.error) {
error += this.translate.instant("The server didn't respond.");
} else if (typeof e.error === 'object') {
if (isDetailResponse(e.error)) {
error += this.processDetailResponse(e.error);
if (isErrorDetailResponse(e.error)) {
error += this.processErrorDetailResponse(e.error);
} else {
const errorList = Object.keys(e.error).map(key => {
const capitalizedKey = key.charAt(0).toUpperCase() + key.slice(1);
return `${this.translate.instant(capitalizedKey)}: ${this.processDetailResponse(e.error[key])}`;
return `${this.translate.instant(capitalizedKey)}: ${this.processErrorDetailResponse(
e.error[key]
)}`;
});
error = errorList.join(', ');
}
@ -168,11 +182,9 @@ export class HttpService {
* @param str a string or a string array to join together.
* @returns Error text(s) as single string
*/
private processDetailResponse(response: DetailResponse): string {
private processErrorDetailResponse(response: ErrorDetailResponse): string {
let message: string;
if (response instanceof Array) {
message = response.join(' ');
} else if (response.detail instanceof Array) {
if (response.detail instanceof Array) {
message = response.detail.join(' ');
} else {
message = response.detail;
@ -187,6 +199,14 @@ export class HttpService {
return message;
}
private processResponse<T>(responseData: T): T {
if (isAutoupdateReponse(responseData)) {
this.autoupdateService.injectAutoupdateIgnoreChangeId(responseData.autoupdate);
responseData = responseData.data;
}
return responseData;
}
/**
* Executes a get on a path with a certain object
* @param path The path to send the request to.

View File

@ -130,10 +130,7 @@ export class OpenSlidesService {
* Init DS from cache and after this start the websocket service.
*/
private async setupDataStoreAndWebSocket(): Promise<void> {
let changeId = await this.DS.initFromStorage();
if (changeId > 0) {
changeId += 1;
}
const changeId = await this.DS.initFromStorage();
// disconnect the WS connection, if there was one. This is needed
// to update the connection parameters, namely the cookies. If the user
// is changed, the WS needs to reconnect, so the new connection holds the new
@ -141,7 +138,7 @@ export class OpenSlidesService {
if (this.websocketService.isConnected) {
await this.websocketService.close(); // Wait for the disconnect.
}
await this.websocketService.connect({ changeId: changeId }); // Request changes after changeId.
await this.websocketService.connect(changeId); // Request changes after changeId.
}
/**

View File

@ -456,7 +456,8 @@ export class OperatorService implements OnAfterAppsLoaded {
* Set the operators presence to isPresent
*/
public async setPresence(isPresent: boolean): Promise<void> {
await this.http.post(environment.urlPrefix + '/users/setpresence/', isPresent);
const r = await this.http.post(environment.urlPrefix + '/users/setpresence/', isPresent);
console.log('operator', r);
}
/**

View File

@ -40,7 +40,7 @@ export class PrioritizeService {
if (this.openSlidesStatusService.isPrioritizedClient !== opPrioritized) {
console.log('Alter prioritization:', opPrioritized);
this.openSlidesStatusService.isPrioritizedClient = opPrioritized;
this.websocketService.reconnect({ changeId: this.DS.maxChangeId });
this.websocketService.reconnect(this.DS.maxChangeId);
}
}
}

View File

@ -55,14 +55,6 @@ export const WEBSOCKET_ERROR_CODES = {
WRONG_FORMAT: 102
};
/*
* Options for (re-)connecting.
*/
interface ConnectOptions {
changeId?: number;
enableAutoupdates?: boolean;
}
/**
* Service that handles WebSocket connections. Other services can register themselfs
* with {@method getOberservable} for a specific type of messages. The content will be published.
@ -207,7 +199,7 @@ export class WebsocketService {
*
* Uses NgZone to let all callbacks run in the angular context.
*/
public async connect(options: ConnectOptions = {}, retry: boolean = false): Promise<void> {
public async connect(changeId: number | null = null, retry: boolean = false): Promise<void> {
const websocketId = Math.random().toString(36).substring(7);
this.websocketId = websocketId;
@ -220,17 +212,10 @@ export class WebsocketService {
this.shouldBeClosed = false;
}
// set defaults
options = Object.assign(options, {
enableAutoupdates: true
});
const queryParams: QueryParams = {};
const queryParams: QueryParams = {
autoupdate: options.enableAutoupdates
};
if (options.changeId !== undefined) {
queryParams.change_id = options.changeId;
if (changeId !== null) {
queryParams.change_id = changeId;
}
// Create the websocket
@ -398,7 +383,7 @@ export class WebsocketService {
const timeout = Math.floor(Math.random() * 3000 + 2000);
this.retryTimeout = setTimeout(() => {
this.retryTimeout = null;
this.connect({ enableAutoupdates: true }, true);
this.connect(null, true);
}, timeout);
}
}
@ -438,9 +423,9 @@ export class WebsocketService {
*
* @param options The options for the new connection
*/
public async reconnect(options: ConnectOptions = {}): Promise<void> {
public async reconnect(changeId: number | null = null): Promise<void> {
await this.close();
await this.connect(options);
await this.connect(changeId);
}
/**

View File

@ -34,16 +34,10 @@ class CoreAppConfig(AppConfig):
ProjectionDefaultViewSet,
TagViewSet,
)
from .websocket import (
NotifyWebsocketClientMessage,
ConstantsWebsocketClientMessage,
GetElementsWebsocketClientMessage,
AutoupdateWebsocketClientMessage,
ListenToProjectors,
PingPong,
)
from ..utils.rest_api import router
from ..utils.websocket import register_client_message
# Let all client websocket message register
from ..utils import websocket_client_messages # noqa
# Collect all config variables before getting the constants.
config.collect_config_variables_from_apps()
@ -92,14 +86,6 @@ class CoreAppConfig(AppConfig):
self.get_model("Countdown").get_collection_string(), CountdownViewSet
)
# Register client messages
register_client_message(NotifyWebsocketClientMessage())
register_client_message(ConstantsWebsocketClientMessage())
register_client_message(GetElementsWebsocketClientMessage())
register_client_message(AutoupdateWebsocketClientMessage())
register_client_message(ListenToProjectors())
register_client_message(PingPong())
if "runserver" in sys.argv or "changeconfig" in sys.argv:
from openslides.utils.startup import run_startup_hooks

View File

@ -1,3 +1,4 @@
import json
import threading
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
@ -7,9 +8,22 @@ from channels.layers import get_channel_layer
from django.db.models import Model
from mypy_extensions import TypedDict
from .cache import element_cache, get_element_id
from .cache import ChangeIdTooLowError, element_cache, get_element_id
from .projector import get_projector_data
from .utils import get_model_from_collection_string, is_iterable
from .timing import Timing
from .utils import get_model_from_collection_string, is_iterable, split_element_id
AutoupdateFormat = TypedDict(
"AutoupdateFormat",
{
"changed": Dict[str, List[Dict[str, Any]]],
"deleted": Dict[str, List[int]],
"from_change_id": int,
"to_change_id": int,
"all_data": bool,
},
)
class AutoupdateElementBase(TypedDict):
@ -66,13 +80,15 @@ class AutoupdateBundle:
element["id"]
] = element
def done(self) -> None:
def done(self) -> Optional[int]:
"""
Finishes the bundle by resolving all missing data and passing it to
the history and element cache.
Returns the change id, if there are autoupdate elements. Otherwise none.
"""
if not self.autoupdate_elements:
return
return None
for collection, elements in self.autoupdate_elements.items():
# Get all ids, that do not have a full_data key
@ -95,7 +111,7 @@ class AutoupdateBundle:
save_history(self.element_iterator)
# Update cache and send autoupdate using async code.
async_to_sync(self.async_handle_collection_elements)()
return async_to_sync(self.dispatch_autoupdate)()
@property
def element_iterator(self) -> Iterable[AutoupdateElement]:
@ -120,9 +136,11 @@ class AutoupdateBundle:
cache_elements[element_id] = full_data
return await element_cache.change_elements(cache_elements)
async def async_handle_collection_elements(self) -> None:
async def dispatch_autoupdate(self) -> int:
"""
Async helper function to update cache and send autoupdate.
Return the change_id
"""
# Update cache
change_id = await self.update_cache()
@ -130,21 +148,23 @@ class AutoupdateBundle:
# Send autoupdate
channel_layer = get_channel_layer()
await channel_layer.group_send(
"autoupdate", {"type": "send_data", "change_id": change_id}
"autoupdate", {"type": "msg_new_change_id", "change_id": change_id}
)
projector_data = await get_projector_data()
# Send projector
projector_data = await get_projector_data()
channel_layer = get_channel_layer()
await channel_layer.group_send(
"projector",
{
"type": "projector_changed",
"type": "msg_projector_data",
"data": projector_data,
"change_id": change_id,
},
)
return change_id
def inform_changed_data(
instances: Union[Iterable[Model], Model],
@ -246,13 +266,67 @@ class AutoupdateBundleMiddleware:
thread_id = threading.get_ident()
autoupdate_bundle[thread_id] = AutoupdateBundle()
timing = Timing("request")
response = self.get_response(request)
timing()
# rewrite the response by adding the autoupdate on any success-case (2xx status)
bundle: AutoupdateBundle = autoupdate_bundle.pop(thread_id)
bundle.done()
if response.status_code >= 200 and response.status_code < 300:
change_id = bundle.done()
if change_id is not None:
user_id = request.user.pk or 0
# Inject the autoupdate in the response.
# The complete response body will be overwritten!
autoupdate = async_to_sync(get_autoupdate_data)(
change_id, change_id, user_id
)
content = {"autoupdate": autoupdate, "data": response.data}
# Note: autoupdate may be none on skipped ones (which should not happen
# since the user has made the request....)
response.content = json.dumps(content)
timing(True)
return response
async def get_autoupdate_data(
from_change_id: int, to_change_id: int, user_id: int
) -> Optional[AutoupdateFormat]:
try:
changed_elements, deleted_element_ids = await element_cache.get_data_since(
user_id, from_change_id, to_change_id
)
except ChangeIdTooLowError:
# The change_id is lower the the lowerst change_id in redis. Return all data
changed_elements = await element_cache.get_all_data_list(user_id)
all_data = True
deleted_elements: Dict[str, List[int]] = {}
else:
all_data = False
deleted_elements = defaultdict(list)
for element_id in deleted_element_ids:
collection_string, id = split_element_id(element_id)
deleted_elements[collection_string].append(id)
# Check, if the autoupdate has any data.
if not changed_elements and not deleted_element_ids:
# Skip empty updates
return None
else:
# Normal autoupdate with data
return AutoupdateFormat(
changed=changed_elements,
deleted=deleted_elements,
from_change_id=from_change_id,
to_change_id=to_change_id,
all_data=all_data,
)
def save_history(element_iterator: Iterable[AutoupdateElement]) -> Iterable:
"""
Thin wrapper around the call of history saving manager method.

View File

@ -0,0 +1,99 @@
import asyncio
from asyncio import Task
from typing import Optional, cast
from django.conf import settings
from .autoupdate import get_autoupdate_data
from .cache import element_cache
from .websocket import ChangeIdTooHighException, ProtocollAsyncJsonWebsocketConsumer
AUTOUPDATE_DELAY = getattr(settings, "AUTOUPDATE_DELAY", None)
class ConsumerAutoupdateStrategy:
def __init__(self, consumer: ProtocollAsyncJsonWebsocketConsumer) -> None:
self.consumer = consumer
# client_change_id = None: unknown -> set on first autoupdate or request_change_id
# client_change_id is int: the one
self.client_change_id: Optional[int] = None
self.max_seen_change_id = 0
self.next_send_time = None
self.timer_task_handle: Optional[Task[None]] = None
self.lock = asyncio.Lock()
async def request_change_id(
self, change_id: int, in_response: Optional[str] = None
) -> None:
# This resets the server side tracking of the client's change id.
async with self.lock:
await self.stop_timer()
self.max_seen_change_id = await element_cache.get_current_change_id()
self.client_change_id = change_id
if self.client_change_id == self.max_seen_change_id:
# The client is up-to-date, so nothing will be done
return None
if self.client_change_id > self.max_seen_change_id:
message = (
f"Requested change_id {self.client_change_id} is higher than the "
+ f"highest change_id {self.max_seen_change_id}."
)
raise ChangeIdTooHighException(message, in_response=in_response)
await self.send_autoupdate(in_response=in_response)
async def new_change_id(self, change_id: int) -> None:
async with self.lock:
if self.client_change_id is None:
self.client_change_id = change_id
if change_id > self.max_seen_change_id:
self.max_seen_change_id = change_id
if AUTOUPDATE_DELAY is None: # feature deactivated, send directly
await self.send_autoupdate()
elif self.timer_task_handle is None:
await self.start_timer()
async def get_running_loop(self) -> asyncio.AbstractEventLoop:
if hasattr(asyncio, "get_running_loop"):
return asyncio.get_running_loop() # type: ignore
else:
return asyncio.get_event_loop()
async def start_timer(self) -> None:
loop = await self.get_running_loop()
self.timer_task_handle = loop.create_task(self.timer_task())
async def stop_timer(self) -> None:
if self.timer_task_handle is not None:
self.timer_task_handle.cancel()
self.timer_task_handle = None
async def timer_task(self) -> None:
try:
await asyncio.sleep(AUTOUPDATE_DELAY)
except asyncio.CancelledError:
return
async with self.lock:
await self.send_autoupdate()
self.timer_task_handle = None
async def send_autoupdate(self, in_response: Optional[str] = None) -> None:
max_change_id = (
self.max_seen_change_id
) # important to save this variable, because
# it can change during runtime.
autoupdate = await get_autoupdate_data(
cast(int, self.client_change_id), max_change_id, self.consumer.user_id
)
if autoupdate is not None:
# It will be send, so we can set the client_change_id
self.client_change_id = max_change_id
await self.consumer.send_json(
type="autoupdate", content=autoupdate, in_response=in_response,
)

View File

@ -1,32 +1,19 @@
import time
from collections import defaultdict
from typing import Any, Dict, List, Optional, cast
from urllib.parse import parse_qs
from channels.generic.websocket import AsyncWebsocketConsumer
from mypy_extensions import TypedDict
from ..utils.websocket import WEBSOCKET_CHANGE_ID_TOO_HIGH
from . import logging
from .auth import UserDoesNotExist, async_anonymous_is_enabled
from .cache import ChangeIdTooLowError, element_cache, split_element_id
from .cache import element_cache
from .consumer_autoupdate_strategy import ConsumerAutoupdateStrategy
from .utils import get_worker_id
from .websocket import ProtocollAsyncJsonWebsocketConsumer
from .websocket import BaseWebsocketException, ProtocollAsyncJsonWebsocketConsumer
logger = logging.getLogger("openslides.websocket")
AutoupdateFormat = TypedDict(
"AutoupdateFormat",
{
"changed": Dict[str, List[Dict[str, Any]]],
"deleted": Dict[str, List[int]],
"from_change_id": int,
"to_change_id": int,
"all_data": bool,
},
)
class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
"""
@ -40,12 +27,11 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
ID counter for assigning each instance of this class an unique id.
"""
skipped_autoupdate_from_change_id: Optional[int] = None
def __init__(self, *args: Any, **kwargs: Any) -> None:
self.projector_hash: Dict[int, int] = {}
SiteConsumer.ID_COUNTER += 1
self._id = get_worker_id() + "-" + str(SiteConsumer.ID_COUNTER)
self.autoupdate_strategy = ConsumerAutoupdateStrategy(self)
super().__init__(*args, **kwargs)
async def connect(self) -> None:
@ -56,11 +42,13 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
Sends the startup data to the user.
"""
self.user_id = self.scope["user"]["id"]
self.connect_time = time.time()
# self.scope['user'] is the full_data dict of the user. For an
# anonymous user is it the dict {'id': 0}
change_id = None
if not await async_anonymous_is_enabled() and not self.scope["user"]["id"]:
if not await async_anonymous_is_enabled() and not self.user_id:
await self.accept() # workaround for #4009
await self.close()
logger.debug(f"connect: denied ({self._id})")
@ -74,24 +62,23 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
change_id = int(query_string[b"change_id"][0])
except ValueError:
await self.accept() # workaround for #4009
await self.close() # TODO: Find a way to send an error code
await self.close()
logger.debug(f"connect: wrong change id ({self._id})")
return
if b"autoupdate" in query_string and query_string[b"autoupdate"][
0
].lower() not in [b"0", b"off", b"false"]:
# a positive value in autoupdate. Start autoupdate
await self.channel_layer.group_add("autoupdate", self.channel_name)
await self.accept()
if change_id is not None:
logger.debug(f"connect: change id {change_id} ({self._id})")
await self.send_autoupdate(change_id)
try:
await self.request_autoupdate(change_id)
except BaseWebsocketException as e:
await self.send_exception(e)
else:
logger.debug(f"connect: no change id ({self._id})")
await self.channel_layer.group_add("autoupdate", self.channel_name)
async def disconnect(self, close_code: int) -> None:
"""
A user disconnects. Remove it from autoupdate.
@ -102,110 +89,19 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
f"disconnect code={close_code} active_secs={active_seconds} ({self._id})"
)
async def send_notify(self, event: Dict[str, Any]) -> None:
"""
Send a notify message to the user.
"""
user_id = self.scope["user"]["id"]
item = event["incomming"]
users = item.get("users")
reply_channels = item.get("replyChannels")
if (
(isinstance(users, bool) and users)
or (isinstance(users, list) and user_id in users)
or (
isinstance(reply_channels, list) and self.channel_name in reply_channels
)
or (users is None and reply_channels is None)
):
item["senderChannelName"] = event["senderChannelName"]
item["senderUserId"] = event["senderUserId"]
await self.send_json(type="notify", content=item)
async def send_autoupdate(
self,
change_id: int,
max_change_id: Optional[int] = None,
in_response: Optional[str] = None,
) -> None:
"""
Sends an autoupdate to the client from change_id to max_change_id.
If max_change_id is None, the current change id will be used.
"""
user_id = self.scope["user"]["id"]
if max_change_id is None:
max_change_id = await element_cache.get_current_change_id()
if change_id == max_change_id + 1:
# The client is up-to-date, so nothing will be done
return
if change_id > max_change_id:
message = f"Requested change_id {change_id} is higher this highest change_id {max_change_id}."
await self.send_error(
code=WEBSOCKET_CHANGE_ID_TOO_HIGH,
message=message,
in_response=in_response,
)
return
try:
changed_elements, deleted_element_ids = await element_cache.get_data_since(
user_id, change_id, max_change_id
)
except ChangeIdTooLowError:
# The change_id is lower the the lowerst change_id in redis. Return all data
changed_elements = await element_cache.get_all_data_list(user_id)
all_data = True
deleted_elements: Dict[str, List[int]] = {}
except UserDoesNotExist:
# Maybe the user was deleted, but a websocket connection is still open to the user.
# So we can close this connection and return.
await self.close()
return
else:
all_data = False
deleted_elements = defaultdict(list)
for element_id in deleted_element_ids:
collection_string, id = split_element_id(element_id)
deleted_elements[collection_string].append(id)
# Check, if the autoupdate has any data.
if not changed_elements and not deleted_element_ids:
# Set the current from_change_id, if it is the first skipped autoupdate
if not self.skipped_autoupdate_from_change_id:
self.skipped_autoupdate_from_change_id = change_id
else:
# Normal autoupdate with data
from_change_id = change_id
# If there is at least one skipped autoupdate, take the saved from_change_id
if self.skipped_autoupdate_from_change_id:
from_change_id = self.skipped_autoupdate_from_change_id
self.skipped_autoupdate_from_change_id = None
await self.send_json(
type="autoupdate",
content=AutoupdateFormat(
changed=changed_elements,
deleted=deleted_elements,
from_change_id=from_change_id,
to_change_id=max_change_id,
all_data=all_data,
),
in_response=in_response,
)
async def send_data(self, event: Dict[str, Any]) -> None:
async def msg_new_change_id(self, event: Dict[str, Any]) -> None:
"""
Send changed or deleted elements to the user.
"""
change_id = event["change_id"]
await self.send_autoupdate(change_id, max_change_id=change_id)
try:
await self.autoupdate_strategy.new_change_id(change_id)
except UserDoesNotExist:
# Maybe the user was deleted, but a websocket connection is still open to the user.
# So we can close this connection and return.
await self.close()
async def projector_changed(self, event: Dict[str, Any]) -> None:
async def msg_projector_data(self, event: Dict[str, Any]) -> None:
"""
The projector has changed.
"""
@ -223,6 +119,33 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer):
if projector_data:
await self.send_projector_data(projector_data, change_id=change_id)
async def msg_notify(self, event: Dict[str, Any]) -> None:
"""
Send a notify message to the user.
"""
item = event["incomming"]
users = item.get("users")
reply_channels = item.get("replyChannels")
if (
(isinstance(users, bool) and users)
or (isinstance(users, list) and self.user_id in users)
or (
isinstance(reply_channels, list) and self.channel_name in reply_channels
)
or (users is None and reply_channels is None)
):
item["senderChannelName"] = event["senderChannelName"]
item["senderUserId"] = event["senderUserId"]
await self.send_json(type="notify", content=item)
async def request_autoupdate(
self, change_id: int, in_response: Optional[str] = None
) -> None:
await self.autoupdate_strategy.request_change_id(
change_id, in_response=in_response
)
async def send_projector_data(
self,
data: Dict[int, Dict[str, Any]],

View File

@ -35,9 +35,11 @@ class ProjectorAllDataProvider:
if data is None:
data = ProjectorAllDataProvider.NON_EXISTENT_MARKER
self.cache[collection][id] = data
elif cache_data == ProjectorAllDataProvider.NON_EXISTENT_MARKER:
cache_data = self.cache[collection][id]
if cache_data == ProjectorAllDataProvider.NON_EXISTENT_MARKER:
return None
return self.cache[collection][id]
return cache_data
async def get_collection(self, collection: str) -> Dict[int, Dict[str, Any]]:
if not self.fetched_collection.get(collection, False):

View File

@ -0,0 +1,27 @@
import time
from typing import List, Optional
from . import logging
timelogger = logging.getLogger(__name__)
class Timing:
def __init__(self, name: str) -> None:
self.name = name
self.times: List[float] = [time.time()]
def __call__(self, done: Optional[bool] = False) -> None:
self.times.append(time.time())
if done:
self.printtime()
def printtime(self) -> None:
s = f"{self.name}: "
for i in range(1, len(self.times)):
diff = self.times[i] - self.times[i - 1]
s += f"{i}: {diff:.5f} "
diff = self.times[-1] - self.times[0]
s += f"sum: {diff:.5f}"
timelogger.info(s)

View File

@ -25,6 +25,26 @@ WEBSOCKET_WRONG_FORMAT = 102
# If the recieved data has not the expected format.
class BaseWebsocketException(Exception):
code: int
def __init__(self, message: str, in_response: Optional[str] = None) -> None:
self.message = message
self.in_response = in_response
class NotAuthorizedException(BaseWebsocketException):
code = WEBSOCKET_NOT_AUTHORIZED
class ChangeIdTooHighException(BaseWebsocketException):
code = WEBSOCKET_CHANGE_ID_TOO_HIGH
class WrongFormatException(BaseWebsocketException):
code = WEBSOCKET_WRONG_FORMAT
class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer):
async def receive(
self,
@ -122,6 +142,20 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer):
silence_errors=silence_errors,
)
async def send_exception(
self, e: BaseWebsocketException, silence_errors: Optional[bool] = True,
) -> None:
"""
Send generic error messages with a custom status code (see above) and a text message.
"""
await self.send_json(
"error",
{"code": e.code, "message": e.message},
None,
in_response=e.in_response,
silence_errors=silence_errors,
)
async def receive_json(self, content: Any) -> None: # type: ignore
"""
Receives the json data, parses it and calls receive_content.
@ -140,9 +174,12 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer):
)
return
await websocket_client_messages[content["type"]].receive_content(
self, content["content"], id=content["id"]
)
try:
await websocket_client_messages[content["type"]].receive_content(
self, content["content"], id=content["id"]
)
except BaseWebsocketException as e:
await self.send_exception(e)
schema: Dict[str, Any] = {

View File

@ -1,21 +1,22 @@
from typing import Any, Dict, Optional
from ..utils import logging
from ..utils.auth import async_has_perm
from ..utils.constants import get_constants
from ..utils.projector import get_projector_data
from ..utils.stats import WebsocketLatencyLogger
from ..utils.websocket import (
WEBSOCKET_NOT_AUTHORIZED,
from . import logging
from .auth import async_has_perm
from .constants import get_constants
from .projector import get_projector_data
from .stats import WebsocketLatencyLogger
from .websocket import (
BaseWebsocketClientMessage,
NotAuthorizedException,
ProtocollAsyncJsonWebsocketConsumer,
register_client_message,
)
logger = logging.getLogger(__name__)
class NotifyWebsocketClientMessage(BaseWebsocketClientMessage):
class Notify(BaseWebsocketClientMessage):
"""
Websocket message from a client to send a message to other clients.
"""
@ -59,13 +60,9 @@ class NotifyWebsocketClientMessage(BaseWebsocketClientMessage):
) -> None:
# Check if the user is allowed to send this notify message
perm = self.notify_permissions.get(content["name"])
if perm is not None and not await async_has_perm(
consumer.scope["user"]["id"], perm
):
await consumer.send_error(
code=WEBSOCKET_NOT_AUTHORIZED,
message=f"You need '{perm}' to send this message.",
in_response=id,
if perm is not None and not await async_has_perm(consumer.user_id, perm):
raise NotAuthorizedException(
f"You need '{perm}' to send this message.", in_response=id,
)
else:
# Some logging
@ -84,15 +81,18 @@ class NotifyWebsocketClientMessage(BaseWebsocketClientMessage):
await consumer.channel_layer.group_send(
"site",
{
"type": "send_notify",
"type": "msg_notify",
"incomming": content,
"senderChannelName": consumer.channel_name,
"senderUserId": consumer.scope["user"]["id"],
"senderUserId": consumer.user_id,
},
)
class ConstantsWebsocketClientMessage(BaseWebsocketClientMessage):
register_client_message(Notify())
class Constants(BaseWebsocketClientMessage):
"""
The Client requests the constants.
"""
@ -109,7 +109,10 @@ class ConstantsWebsocketClientMessage(BaseWebsocketClientMessage):
)
class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage):
register_client_message(Constants())
class GetElements(BaseWebsocketClientMessage):
"""
The Client request database elements.
"""
@ -130,26 +133,10 @@ class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage):
self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str
) -> None:
requested_change_id = content.get("change_id", 0)
await consumer.send_autoupdate(requested_change_id, in_response=id)
await consumer.request_autoupdate(requested_change_id, in_response=id)
class AutoupdateWebsocketClientMessage(BaseWebsocketClientMessage):
"""
The Client turns autoupdate on or off.
"""
identifier = "autoupdate"
async def receive_content(
self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str
) -> None:
# Turn on or off the autoupdate for the client
if content: # accept any value, that can be interpreted as bool
await consumer.channel_layer.group_add("autoupdate", consumer.channel_name)
else:
await consumer.channel_layer.group_discard(
"autoupdate", consumer.channel_name
)
register_client_message(GetElements())
class ListenToProjectors(BaseWebsocketClientMessage):
@ -198,6 +185,9 @@ class ListenToProjectors(BaseWebsocketClientMessage):
await consumer.send_projector_data(projector_data, in_response=id)
register_client_message(ListenToProjectors())
class PingPong(BaseWebsocketClientMessage):
"""
Responds to pings from the client.
@ -220,3 +210,6 @@ class PingPong(BaseWebsocketClientMessage):
await consumer.send_json(type="pong", content=latency, in_response=id)
if latency is not None:
await WebsocketLatencyLogger.add_latency(latency)
register_client_message(PingPong())

View File

@ -1,9 +1,13 @@
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional, cast
from openslides.core.config import config
from openslides.core.models import Projector
from openslides.users.models import User
from openslides.utils.projector import AllData, get_config, register_projector_slide
from openslides.utils.projector import (
ProjectorAllDataProvider,
get_config,
register_projector_slide,
)
class TConfig:
@ -94,19 +98,23 @@ class TProjector:
async def slide1(
all_data: AllData, element: Dict[str, Any], projector_id: int
all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]:
"""
Slide that shows the general_event_name.
"""
return {
"name": "slide1",
"event_name": await get_config(all_data, "general_event_name"),
"event_name": await get_config(all_data_provider, "general_event_name"),
}
async def slide2(
all_data: AllData, element: Dict[str, Any], projector_id: int
all_data_provider: ProjectorAllDataProvider,
element: Dict[str, Any],
projector_id: int,
) -> Dict[str, Any]:
return {"name": "slide2"}
@ -115,17 +123,26 @@ register_projector_slide("test/slide1", slide1)
register_projector_slide("test/slide2", slide2)
def all_data_config() -> AllData:
return {
TConfig().get_collection_string(): {
element["id"]: element for element in TConfig().get_elements()
}
}
class TestProjectorAllDataProvider:
def __init__(self, data):
self.data = data
async def get(self, collection: str, id: int) -> Optional[Dict[str, Any]]:
collection_data = await self.get_collection(collection)
return collection_data.get(id)
async def get_collection(self, collection: str) -> Dict[int, Dict[str, Any]]:
return self.data.get(collection, {})
async def exists(self, collection: str, id: int) -> bool:
return (await self.get(collection, id)) is not None
def all_data_users() -> AllData:
return {
TUser().get_collection_string(): {
element["id"]: element for element in TUser().get_elements()
}
def get_all_data_provider(data) -> ProjectorAllDataProvider:
data[TConfig().get_collection_string()] = {
element["id"]: element for element in TConfig().get_elements()
}
data[TUser().get_collection_string()] = {
element["id"]: element for element in TUser().get_elements()
}
return cast(ProjectorAllDataProvider, TestProjectorAllDataProvider(data))

View File

@ -1,4 +1,3 @@
import asyncio
from importlib import import_module
from typing import Optional, Tuple
from unittest.mock import patch
@ -158,17 +157,7 @@ async def test_connection_with_too_big_change_id(get_communicator, set_config):
@pytest.mark.asyncio
async def test_changed_data_autoupdate_off(communicator, set_config):
await set_config("general_system_enable_anonymous", True)
await communicator.connect()
# Change a config value
await set_config("general_event_name", "Test Event")
assert await communicator.receive_nothing()
@pytest.mark.asyncio
async def test_changed_data_autoupdate_on(get_communicator, set_config):
async def test_changed_data_autoupdate(get_communicator, set_config):
await set_config("general_system_enable_anonymous", True)
communicator = get_communicator("autoupdate=on")
await communicator.connect()
@ -422,12 +411,12 @@ async def test_send_connect_up_to_date(communicator, set_config):
{"type": "getElements", "content": {"change_id": 0}, "id": "test_id"}
)
response1 = await communicator.receive_json_from()
first_change_id = response1.get("content")["to_change_id"]
max_change_id = response1.get("content")["to_change_id"]
await communicator.send_json_to(
{
"type": "getElements",
"content": {"change_id": first_change_id + 1},
"content": {"change_id": max_change_id},
"id": "test_id",
}
)
@ -510,43 +499,6 @@ async def test_send_invalid_get_elements(communicator, set_config):
assert response.get("in_response") == "test_id"
@pytest.mark.asyncio
async def test_turn_on_autoupdate(communicator, set_config):
await set_config("general_system_enable_anonymous", True)
await communicator.connect()
await communicator.send_json_to(
{"type": "autoupdate", "content": "on", "id": "test_id"}
)
await asyncio.sleep(0.01)
# Change a config value
await set_config("general_event_name", "Test Event")
response = await communicator.receive_json_from()
id = config.get_key_to_id()["general_event_name"]
type = response.get("type")
content = response.get("content")
assert type == "autoupdate"
assert content["changed"] == {
"core/config": [{"id": id, "key": "general_event_name", "value": "Test Event"}]
}
@pytest.mark.asyncio
async def test_turn_off_autoupdate(get_communicator, set_config):
await set_config("general_system_enable_anonymous", True)
communicator = get_communicator("autoupdate=on")
await communicator.connect()
await communicator.send_json_to(
{"type": "autoupdate", "content": False, "id": "test_id"}
)
await asyncio.sleep(0.01)
# Change a config value
await set_config("general_event_name", "Test Event")
assert await communicator.receive_nothing()
@pytest.mark.asyncio
async def test_listen_to_projector(communicator, set_config):
await set_config("general_system_enable_anonymous", True)
@ -588,10 +540,15 @@ async def test_update_projector(communicator, set_config):
"id": "test_id",
}
)
await communicator.receive_json_from()
await communicator.receive_json_from() # recieve initial projector data
# Change a config value
await set_config("general_event_name", "Test Event")
# We need two messages: The autoupdate and the projector data in this order
response = await communicator.receive_json_from()
assert response.get("type") == "autoupdate"
response = await communicator.receive_json_from()
type = response.get("type")
@ -629,4 +586,8 @@ async def test_update_projector_to_current_value(communicator, set_config):
# Change a config value to current_value
await set_config("general_event_name", "OpenSlides")
# We await an autoupdate, bot no projector data
response = await communicator.receive_json_from()
assert response.get("type") == "autoupdate"
assert await communicator.receive_nothing()

View File

@ -4,10 +4,12 @@ import pytest
from openslides.agenda import projector
from ...integration.helpers import get_all_data_provider
@pytest.fixture
def all_data():
all_data = {
def all_data_provider():
data = {
"agenda/item": {
1: {
"id": 1,
@ -82,14 +84,14 @@ def all_data():
}
}
return all_data
return get_all_data_provider(data)
@pytest.mark.asyncio
async def test_main_items(all_data):
async def test_main_items(all_data_provider):
element: Dict[str, Any] = {}
data = await projector.item_list_slide(all_data, element, 1)
data = await projector.item_list_slide(all_data_provider, element, 1)
assert data == {
"items": [
@ -106,10 +108,10 @@ async def test_main_items(all_data):
@pytest.mark.asyncio
async def test_all_items(all_data):
async def test_all_items(all_data_provider):
element: Dict[str, Any] = {"only_main_items": False}
data = await projector.item_list_slide(all_data, element, 1)
data = await projector.item_list_slide(all_data_provider, element, 1)
assert data == {
"items": [

View File

@ -4,14 +4,13 @@ import pytest
from openslides.motions import projector
from ...integration.helpers import all_data_config, all_data_users
from ...integration.helpers import get_all_data_provider
@pytest.fixture
def all_data():
return_value = all_data_config()
return_value.update(all_data_users())
return_value["motions/motion"] = {
def all_data_provider():
data = {}
data["motions/motion"] = {
1: {
"id": 1,
"identifier": "4",
@ -143,7 +142,7 @@ def all_data():
"change_recommendations": [],
},
}
return_value["motions/workflow"] = {
data["motions/workflow"] = {
1: {
"id": 1,
"name": "Simple Workflow",
@ -151,7 +150,7 @@ def all_data():
"first_state_id": 1,
}
}
return_value["motions/state"] = {
data["motions/state"] = {
1: {
"id": 1,
"name": "submitted",
@ -217,7 +216,7 @@ def all_data():
"workflow_id": 1,
},
}
return_value["motions/statute-paragraph"] = {
data["motions/statute-paragraph"] = {
1: {
"id": 1,
"title": "§1 Preamble",
@ -225,7 +224,7 @@ def all_data():
"weight": 10000,
}
}
return_value["motions/motion-change-recommendation"] = {
data["motions/motion-change-recommendation"] = {
1: {
"id": 1,
"motion_id": 1,
@ -251,14 +250,14 @@ def all_data():
"creation_time": "2019-02-09T09:54:06.256378+01:00",
},
}
return return_value
return get_all_data_provider(data)
@pytest.mark.asyncio
async def test_motion_slide(all_data):
async def test_motion_slide(all_data_provider):
element: Dict[str, Any] = {"id": 1}
data = await projector.motion_slide(all_data, element, 1)
data = await projector.motion_slide(all_data_provider, element, 1)
assert data == {
"identifier": "4",
@ -304,10 +303,10 @@ async def test_motion_slide(all_data):
@pytest.mark.asyncio
async def test_amendment_slide(all_data):
async def test_amendment_slide(all_data_provider):
element: Dict[str, Any] = {"id": 2}
data = await projector.motion_slide(all_data, element, 1)
data = await projector.motion_slide(all_data_provider, element, 1)
assert data == {
"identifier": "Ä1",
@ -331,10 +330,10 @@ async def test_amendment_slide(all_data):
@pytest.mark.asyncio
async def test_statute_amendment_slide(all_data):
async def test_statute_amendment_slide(all_data_provider):
element: Dict[str, Any] = {"id": 3}
data = await projector.motion_slide(all_data, element, 1)
data = await projector.motion_slide(all_data_provider, element, 1)
assert data == {
"identifier": None,