From 23842fd496d966c6c69af28eef64108781c117b1 Mon Sep 17 00:00:00 2001 From: FinnStutzenstein Date: Fri, 8 May 2020 16:17:14 +0200 Subject: [PATCH 1/4] Synchronize autoupdate code in the client If autoupdates are too fast, the first one may not be fully executed. Especially when the maxChangeId is not yet updated, the second Autoupdate will trigger a refresh, because for the client it "lay in the future". This can be prevented by synchronizing the autoupdate-handling code with a mutex. --- .../core/core-services/autoupdate.service.ts | 4 +++ .../core/core-services/data-store.service.ts | 1 + client/src/app/core/promises/mutex.ts | 30 +++++++++++++++++++ openslides/utils/cache_providers.py | 10 +++++-- openslides/utils/redis.py | 3 ++ 5 files changed, 45 insertions(+), 3 deletions(-) create mode 100644 client/src/app/core/promises/mutex.ts diff --git a/client/src/app/core/core-services/autoupdate.service.ts b/client/src/app/core/core-services/autoupdate.service.ts index ceaeda77a..3303d0740 100644 --- a/client/src/app/core/core-services/autoupdate.service.ts +++ b/client/src/app/core/core-services/autoupdate.service.ts @@ -3,6 +3,7 @@ import { Injectable } from '@angular/core'; import { BaseModel } from '../../shared/models/base/base-model'; import { CollectionStringMapperService } from './collection-string-mapper.service'; import { DataStoreService, DataStoreUpdateManagerService } from './data-store.service'; +import { Mutex } from '../promises/mutex'; import { WebsocketService, WEBSOCKET_ERROR_CODES } from './websocket.service'; interface AutoupdateFormat { @@ -45,6 +46,7 @@ interface AutoupdateFormat { providedIn: 'root' }) export class AutoupdateService { + private mutex = new Mutex(); /** * Constructor to create the AutoupdateService. Calls the constructor of the parent class. * @param websocketService @@ -79,11 +81,13 @@ export class AutoupdateService { * Handles the change ids of all autoupdates. */ private async storeResponse(autoupdate: AutoupdateFormat): Promise { + const unlock = await this.mutex.lock(); if (autoupdate.all_data) { await this.storeAllData(autoupdate); } else { await this.storePartialAutoupdate(autoupdate); } + unlock(); } /** diff --git a/client/src/app/core/core-services/data-store.service.ts b/client/src/app/core/core-services/data-store.service.ts index a9ff9fc7e..b9ed07a08 100644 --- a/client/src/app/core/core-services/data-store.service.ts +++ b/client/src/app/core/core-services/data-store.service.ts @@ -258,6 +258,7 @@ export class DataStoreUpdateManagerService { private serveNextSlot(): void { if (this.updateSlotRequests.length > 0) { + console.warn('Concurrent update slots'); const request = this.updateSlotRequests.pop(); request.resolve(); } diff --git a/client/src/app/core/promises/mutex.ts b/client/src/app/core/promises/mutex.ts new file mode 100644 index 000000000..3de879915 --- /dev/null +++ b/client/src/app/core/promises/mutex.ts @@ -0,0 +1,30 @@ +/** + * A mutex as described in every textbook + * + * Usage: + * ``` + * mutex = new Mutex(); // create e.g. as class member + * + * // Somewhere in the code to lock (must be async code!) + * const unlock = await this.mutex.lock() + * // ...the code to synchronize + * unlock() + * ``` + */ +export class Mutex { + private mutex = Promise.resolve(); + + public lock(): PromiseLike<() => void> { + // this will capture the code-to-synchronize + let begin: (unlock: () => void) => void = () => {}; + + // All "requests" to execute code are chained in a promise-chain + this.mutex = this.mutex.then(() => { + return new Promise(begin); + }); + + return new Promise(res => { + begin = res; + }); + } +} diff --git a/openslides/utils/cache_providers.py b/openslides/utils/cache_providers.py index 1fb0cc60f..e049a5211 100644 --- a/openslides/utils/cache_providers.py +++ b/openslides/utils/cache_providers.py @@ -8,7 +8,11 @@ from django.core.exceptions import ImproperlyConfigured from typing_extensions import Protocol from . import logging -from .redis import read_only_redis_amount_replicas, use_redis +from .redis import ( + read_only_redis_amount_replicas, + read_only_redis_wait_timeout, + use_redis, +) from .schema_version import SchemaVersion from .utils import split_element_id, str_dict_to_bytes @@ -452,11 +456,11 @@ class RedisCacheProvider: raise e if not read_only and read_only_redis_amount_replicas is not None: reported_amount = await redis.wait( - read_only_redis_amount_replicas, 1000 + read_only_redis_amount_replicas, read_only_redis_wait_timeout ) if reported_amount != read_only_redis_amount_replicas: logger.warn( - f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} requested!" + f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} requested after {read_only_redis_wait_timeout} ms!" ) return result diff --git a/openslides/utils/redis.py b/openslides/utils/redis.py index acc370f54..d5ff24065 100644 --- a/openslides/utils/redis.py +++ b/openslides/utils/redis.py @@ -11,6 +11,7 @@ logger = logging.getLogger(__name__) use_redis = False use_read_only_redis = False read_only_redis_amount_replicas = None +read_only_redis_wait_timeout = None try: import aioredis @@ -35,6 +36,8 @@ else: read_only_redis_amount_replicas = getattr(settings, "AMOUNT_REPLICAS", 1) logger.info(f"AMOUNT_REPLICAS={read_only_redis_amount_replicas}") + read_only_redis_wait_timeout = getattr(settings, "WAIT_TIMEOUT", 1000) + logger.info(f"WAIT_TIMEOUT={read_only_redis_wait_timeout}") else: logger.info("Redis is not configured.") From bf88cea20048a15f167ff82a064e8251d61ed4c1 Mon Sep 17 00:00:00 2001 From: FinnStutzenstein Date: Fri, 15 May 2020 11:47:43 +0200 Subject: [PATCH 2/4] Rewrite projector code to be cache friendly This speeds up the requests/seconds by a factor of 100 --- openslides/agenda/projector.py | 147 ++++++++++++++------------- openslides/assignments/projector.py | 42 +++++--- openslides/core/projector.py | 42 ++++---- openslides/mediafiles/projector.py | 28 ++---- openslides/motions/projector.py | 151 +++++++++++++++++----------- openslides/topics/projector.py | 31 ++---- openslides/users/projector.py | 31 ++---- openslides/utils/autoupdate.py | 10 +- openslides/utils/cache.py | 19 ---- openslides/utils/cache_providers.py | 3 +- openslides/utils/projector.py | 82 +++++++++++---- 11 files changed, 310 insertions(+), 276 deletions(-) diff --git a/openslides/agenda/projector.py b/openslides/agenda/projector.py index 36d0a47d4..eb8b305ca 100644 --- a/openslides/agenda/projector.py +++ b/openslides/agenda/projector.py @@ -3,9 +3,10 @@ from typing import Any, Dict, List, Union from ..users.projector import get_user_name from ..utils.projector import ( - AllData, + ProjectorAllDataProvider, ProjectorElementException, get_config, + get_model, register_projector_slide, ) @@ -15,20 +16,24 @@ from ..utils.projector import ( # side effects. -async def get_sorted_agenda_items(all_data: AllData) -> List[Dict[str, Any]]: +async def get_sorted_agenda_items( + agenda_items: Dict[int, Dict[str, Any]] +) -> List[Dict[str, Any]]: """ Returns all sorted agenda items by id first and then weight, resulting in ordered items, if some have the same weight. """ return sorted( - sorted(all_data["agenda/item"].values(), key=lambda item: item["id"]), + sorted(agenda_items.values(), key=lambda item: item["id"]), key=lambda item: item["weight"], ) -async def get_flat_tree(all_data: AllData, parent_id: int = 0) -> List[Dict[str, Any]]: +async def get_flat_tree( + agenda_items: Dict[int, Dict[str, Any]], parent_id: int = 0 +) -> List[Dict[str, Any]]: """ - Build the item tree from all_data. + Build the item tree from all_data_provider. Only build the tree from elements unterneath parent_id. @@ -38,16 +43,16 @@ async def get_flat_tree(all_data: AllData, parent_id: int = 0) -> List[Dict[str, # Build a dict from an item_id to all its children children: Dict[int, List[int]] = defaultdict(list) - if "agenda/item" in all_data: - for item in await get_sorted_agenda_items(all_data): - if item["type"] == 1: # only normal items - children[item["parent_id"] or 0].append(item["id"]) + + for item in await get_sorted_agenda_items(agenda_items): + if item["type"] == 1: # only normal items + children[item["parent_id"] or 0].append(item["id"]) tree = [] - async def get_children(item_ids: List[int], depth: int) -> None: + def build_tree(item_ids: List[int], depth: int) -> None: for item_id in item_ids: - item = all_data["agenda/item"][item_id] + item = agenda_items[item_id] title_information = item["title_information"] title_information["_agenda_item_number"] = item["item_number"] tree.append( @@ -57,25 +62,29 @@ async def get_flat_tree(all_data: AllData, parent_id: int = 0) -> List[Dict[str, "depth": depth, } ) - await get_children(children[item_id], depth + 1) + build_tree(children[item_id], depth + 1) - await get_children(children[parent_id], 0) + build_tree(children[parent_id], 0) return tree async def item_list_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Item list slide. Returns all root items or all children of an item. """ - only_main_items = element.get("only_main_items", True) + # fetch all items, so they are cached: + all_agenda_items = await all_data_provider.get_collection("agenda/item") + only_main_items = element.get("only_main_items", True) if only_main_items: agenda_items = [] - for item in await get_sorted_agenda_items(all_data): + for item in await get_sorted_agenda_items(all_agenda_items): if item["parent_id"] is None and item["type"] == 1: title_information = item["title_information"] title_information["_agenda_item_number"] = item["item_number"] @@ -86,13 +95,15 @@ async def item_list_slide( } ) else: - agenda_items = await get_flat_tree(all_data) + agenda_items = await get_flat_tree(all_agenda_items) return {"items": agenda_items} async def list_of_speakers_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ List of speakers slide. @@ -104,35 +115,35 @@ async def list_of_speakers_slide( if list_of_speakers_id is None: raise ProjectorElementException("id is required for list of speakers slide") - return await get_list_of_speakers_slide_data(all_data, list_of_speakers_id) + return await get_list_of_speakers_slide_data(all_data_provider, list_of_speakers_id) async def get_list_of_speakers_slide_data( - all_data: AllData, list_of_speakers_id: int + all_data_provider: ProjectorAllDataProvider, list_of_speakers_id: int ) -> Dict[str, Any]: - try: - list_of_speakers = all_data["agenda/list-of-speakers"][list_of_speakers_id] - except KeyError: - raise ProjectorElementException( - f"List of speakers {list_of_speakers_id} does not exist" - ) + list_of_speakers = await get_model( + all_data_provider, "agenda/list-of-speakers", list_of_speakers_id + ) title_information = list_of_speakers["title_information"] # try to get the agenda item for the content object (which must not exist) - agenda_item_id = all_data[list_of_speakers["content_object"]["collection"]][ - list_of_speakers["content_object"]["id"] - ].get("agenda_item_id") - if agenda_item_id: - title_information["_agenda_item_number"] = all_data["agenda/item"][ - agenda_item_id - ]["item_number"] + content_object = await get_model( + all_data_provider, + list_of_speakers["content_object"]["collection"], + list_of_speakers["content_object"]["id"], + ) + agenda_item_id = content_object.get("agenda_item_id") + if agenda_item_id is not None: + agenda_item = await all_data_provider.get("agenda/item", agenda_item_id) + if agenda_item is not None: + title_information["_agenda_item_number"] = agenda_item["item_number"] # Partition speaker objects to waiting, current and finished speakers_waiting = [] speakers_finished = [] current_speaker = None for speaker in list_of_speakers["speakers"]: - user = await get_user_name(all_data, speaker["user_id"]) + user = await get_user_name(all_data_provider, speaker["user_id"]) formatted_speaker = { "user": user, "marked": speaker["marked"], @@ -151,8 +162,12 @@ async def get_list_of_speakers_slide_data( speakers_waiting = sorted(speakers_waiting, key=lambda s: s["weight"]) speakers_finished = sorted(speakers_finished, key=lambda s: s["end_time"]) - number_of_last_speakers = await get_config(all_data, "agenda_show_last_speakers") - number_of_next_speakers = await get_config(all_data, "agenda_show_next_speakers") + number_of_last_speakers = await get_config( + all_data_provider, "agenda_show_last_speakers" + ) + number_of_next_speakers = await get_config( + all_data_provider, "agenda_show_next_speakers" + ) if number_of_last_speakers == 0: speakers_finished = [] @@ -174,7 +189,7 @@ async def get_list_of_speakers_slide_data( async def get_current_list_of_speakers_id_for_projector( - all_data: AllData, projector: Dict[str, Any] + all_data_provider: ProjectorAllDataProvider, projector: Dict[str, Any] ) -> Union[int, None]: """ Search for elements, that do have a list of speakers: @@ -189,94 +204,88 @@ async def get_current_list_of_speakers_id_for_projector( continue collection = element["name"] id = element["id"] - if collection not in all_data or id not in all_data[collection]: + model = await all_data_provider.get(collection, id) + if model is None: continue - model = all_data[collection][id] if "list_of_speakers_id" not in model: continue - if not model["list_of_speakers_id"] in all_data["agenda/list-of-speakers"]: + list_of_speakers_id = model["list_of_speakers_id"] + los_exists = await all_data_provider.exists( + "agenda/list-of-speakers", list_of_speakers_id + ) + if not los_exists: continue - list_of_speakers_id = model["list_of_speakers_id"] break return list_of_speakers_id async def get_reference_projector( - all_data: AllData, projector_id: int + all_data_provider: ProjectorAllDataProvider, projector_id: int ) -> Dict[str, Any]: """ Returns the reference projector to the given projector (by id) """ - try: - this_projector = all_data["core/projector"][projector_id] - except KeyError: - raise ProjectorElementException(f"Projector {projector_id} does not exist") + this_projector = await get_model(all_data_provider, "core/projector", projector_id) reference_projector_id = this_projector["reference_projector_id"] or projector_id - try: - reference_projector = all_data["core/projector"][reference_projector_id] - except KeyError: - raise ProjectorElementException( - f"Projector {reference_projector_id} does not exist" - ) - - return reference_projector + return await get_model(all_data_provider, "core/projector", reference_projector_id) async def current_list_of_speakers_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ The current list of speakers slide. Creates the data for the given projector. """ - reference_projector = await get_reference_projector(all_data, projector_id) + reference_projector = await get_reference_projector(all_data_provider, projector_id) list_of_speakers_id = await get_current_list_of_speakers_id_for_projector( - all_data, reference_projector + all_data_provider, reference_projector ) if list_of_speakers_id is None: # no element found return {} - return await get_list_of_speakers_slide_data(all_data, list_of_speakers_id) + return await get_list_of_speakers_slide_data(all_data_provider, list_of_speakers_id) async def current_speaker_chyron_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Returns the username for the current speaker. """ # get projector for color information - projector = all_data["core/projector"][projector_id] + projector = await get_model(all_data_provider, "core/projector", projector_id) slide_data = { "background_color": projector["chyron_background_color"], "font_color": projector["chyron_font_color"], } - reference_projector = await get_reference_projector(all_data, projector_id) + reference_projector = await get_reference_projector(all_data_provider, projector_id) list_of_speakers_id = await get_current_list_of_speakers_id_for_projector( - all_data, reference_projector + all_data_provider, reference_projector ) if list_of_speakers_id is None: # no element found return slide_data # get list of speakers to search current speaker - try: - list_of_speakers = all_data["agenda/list-of-speakers"][list_of_speakers_id] - except KeyError: - raise ProjectorElementException( - f"List of speakers {list_of_speakers_id} does not exist" - ) + list_of_speakers = await get_model( + all_data_provider, "agenda/list-of-speakers", list_of_speakers_id + ) # find current speaker current_speaker = None for speaker in list_of_speakers["speakers"]: if speaker["begin_time"] is not None and speaker["end_time"] is None: - current_speaker = await get_user_name(all_data, speaker["user_id"]) + current_speaker = await get_user_name(all_data_provider, speaker["user_id"]) break if current_speaker is not None: diff --git a/openslides/assignments/projector.py b/openslides/assignments/projector.py index c868ad103..97c0be82a 100644 --- a/openslides/assignments/projector.py +++ b/openslides/assignments/projector.py @@ -1,25 +1,29 @@ from typing import Any, Dict, List from ..users.projector import get_user_name -from ..utils.projector import AllData, get_model, get_models, register_projector_slide +from ..utils.projector import ( + ProjectorAllDataProvider, + get_model, + get_models, + register_projector_slide, +) from .models import AssignmentPoll -# Important: All functions have to be prune. This means, that thay can only -# access the data, that they get as argument and do not have any -# side effects. - - async def assignment_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Assignment slide. """ - assignment = get_model(all_data, "assignments/assignment", element.get("id")) + assignment = await get_model( + all_data_provider, "assignments/assignment", element.get("id") + ) assignment_related_users: List[Dict[str, Any]] = [ - {"user": await get_user_name(all_data, aru["user_id"])} + {"user": await get_user_name(all_data_provider, aru["user_id"])} for aru in sorted( assignment["assignment_related_users"], key=lambda aru: aru["weight"] ) @@ -36,13 +40,19 @@ async def assignment_slide( async def assignment_poll_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Poll slide. """ - poll = get_model(all_data, "assignments/assignment-poll", element.get("id")) - assignment = get_model(all_data, "assignments/assignment", poll["assignment_id"]) + poll = await get_model( + all_data_provider, "assignments/assignment-poll", element.get("id") + ) + assignment = await get_model( + all_data_provider, "assignments/assignment", poll["assignment_id"] + ) poll_data = { key: poll[key] @@ -60,10 +70,14 @@ async def assignment_poll_slide( # Add options: poll_data["options"] = [] - options = get_models(all_data, "assignments/assignment-option", poll["options_id"]) + options = await get_models( + all_data_provider, "assignments/assignment-option", poll["options_id"] + ) for option in sorted(options, key=lambda option: option["weight"]): option_data: Dict[str, Any] = { - "user": {"short_name": await get_user_name(all_data, option["user_id"])} + "user": { + "short_name": await get_user_name(all_data_provider, option["user_id"]) + } } if poll["state"] == AssignmentPoll.STATE_PUBLISHED: option_data["yes"] = float(option["yes"]) diff --git a/openslides/core/projector.py b/openslides/core/projector.py index 9bbf75942..3c9f2ed82 100644 --- a/openslides/core/projector.py +++ b/openslides/core/projector.py @@ -1,20 +1,17 @@ from typing import Any, Dict from ..utils.projector import ( - AllData, - ProjectorElementException, + ProjectorAllDataProvider, get_config, + get_model, register_projector_slide, ) -# Important: All functions have to be prune. This means, that thay can only -# access the data, that they get as argument and do not have any -# side effects. - - async def countdown_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Countdown slide. @@ -26,23 +23,21 @@ async def countdown_slide( id: 5, # Countdown ID } """ - countdown_id = element.get("id") or 1 - - try: - countdown = all_data["core/countdown"][countdown_id] - except KeyError: - raise ProjectorElementException(f"Countdown {countdown_id} does not exist") - + countdown = await get_model(all_data_provider, "core/countdown", element.get("id")) return { "description": countdown["description"], "running": countdown["running"], "countdown_time": countdown["countdown_time"], - "warning_time": await get_config(all_data, "agenda_countdown_warning_time"), + "warning_time": await get_config( + all_data_provider, "agenda_countdown_warning_time" + ), } async def message_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Message slide. @@ -54,16 +49,15 @@ async def message_slide( id: 5, # ProjectorMessage ID } """ - message_id = element.get("id") or 1 - - try: - return all_data["core/projector-message"][message_id] - except KeyError: - raise ProjectorElementException(f"Message {message_id} does not exist") + return await get_model( + all_data_provider, "core/projector-message", element.get("id") + ) async def clock_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: return {} diff --git a/openslides/mediafiles/projector.py b/openslides/mediafiles/projector.py index 196d5e8c8..b996c823f 100644 --- a/openslides/mediafiles/projector.py +++ b/openslides/mediafiles/projector.py @@ -1,35 +1,23 @@ from typing import Any, Dict from ..utils.projector import ( - AllData, - ProjectorElementException, + ProjectorAllDataProvider, + get_model, register_projector_slide, ) -# Important: All functions have to be prune. This means, that thay can only -# access the data, that they get as argument and do not have any -# side effects. - - async def mediafile_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Slide for Mediafile. """ - mediafile_id = element.get("id") - - if mediafile_id is None: - raise ProjectorElementException("id is required for mediafile slide") - - try: - mediafile = all_data["mediafiles/mediafile"][mediafile_id] - except KeyError: - raise ProjectorElementException( - f"mediafile with id {mediafile_id} does not exist" - ) - + mediafile = await get_model( + all_data_provider, "mediafiles/mediafile", element.get("id") + ) return { "path": mediafile["path"], "mimetype": mediafile["mimetype"], diff --git a/openslides/motions/projector.py b/openslides/motions/projector.py index 74a9a399f..c7a39040c 100644 --- a/openslides/motions/projector.py +++ b/openslides/motions/projector.py @@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional from ..users.projector import get_user_name from ..utils.projector import ( - AllData, + ProjectorAllDataProvider, ProjectorElementException, get_config, get_model, @@ -14,33 +14,31 @@ from .models import MotionPoll motion_placeholder_regex = re.compile(r"\[motion:(\d+)\]") -# Important: All functions have to be prune. This means, that thay can only -# access the data, that they get as argument and do not have any -# side effects. - async def get_state( - all_data: AllData, motion: Dict[str, Any], state_id_key: str + all_data_provider: ProjectorAllDataProvider, + motion: Dict[str, Any], + state_id_key: str, ) -> Dict[str, Any]: """ Returns a state element from one motion. Raises an error if the state does not exist. """ - state = all_data["motions/state"].get(motion[state_id_key]) - if not state: + state = await all_data_provider.get("motions/state", motion[state_id_key]) + if state is None: raise ProjectorElementException( f"motion {motion['id']} can not be on the state with id {motion[state_id_key]}" ) return state -async def get_amendment_merge_into_motion_diff(all_data, amendment): +async def get_amendment_merge_into_motion_diff(all_data_provider, amendment): """ HINT: This implementation should be consistent to showInDiffView() in ViewMotionAmendedParagraph.ts """ if amendment["state_id"] is None: return 0 - state = await get_state(all_data, amendment, "state_id") + state = await get_state(all_data_provider, amendment, "state_id") if state["merge_amendment_into_final"] == -1: return 0 if state["merge_amendment_into_final"] == 1: @@ -48,36 +46,37 @@ async def get_amendment_merge_into_motion_diff(all_data, amendment): if amendment["recommendation_id"] is None: return 0 - recommendation = await get_state(all_data, amendment, "recommendation_id") + recommendation = await get_state(all_data_provider, amendment, "recommendation_id") if recommendation["merge_amendment_into_final"] == 1: return 1 return 0 -async def get_amendment_merge_into_motion_final(all_data, amendment): +async def get_amendment_merge_into_motion_final(all_data_provider, amendment): """ HINT: This implementation should be consistent to showInFinalView() in ViewMotionAmendedParagraph.ts """ if amendment["state_id"] is None: return 0 - state = await get_state(all_data, amendment, "state_id") + state = await get_state(all_data_provider, amendment, "state_id") if state["merge_amendment_into_final"] == 1: return 1 return 0 -async def get_amendments_for_motion(motion, all_data): +async def get_amendments_for_motion(motion, all_data_provider): amendment_data = [] - for amendment_id, amendment in all_data["motions/motion"].items(): + all_motions = await all_data_provider.get_collection("motions/motion") + for amendment_id, amendment in all_motions.items(): if amendment["parent_id"] == motion["id"]: merge_amendment_into_final = await get_amendment_merge_into_motion_final( - all_data, amendment + all_data_provider, amendment ) merge_amendment_into_diff = await get_amendment_merge_into_motion_diff( - all_data, amendment + all_data_provider, amendment ) amendment_data.append( { @@ -92,8 +91,10 @@ async def get_amendments_for_motion(motion, all_data): return amendment_data -async def get_amendment_base_motion(amendment, all_data): - motion = get_model(all_data, "motions/motion", amendment.get("parent_id")) +async def get_amendment_base_motion(amendment, all_data_provider): + motion = await get_model( + all_data_provider, "motions/motion", amendment.get("parent_id") + ) return { "identifier": motion["identifier"], @@ -102,15 +103,17 @@ async def get_amendment_base_motion(amendment, all_data): } -async def get_amendment_base_statute(amendment, all_data): - statute = get_model( - all_data, "motions/statute-paragraph", amendment.get("statute_paragraph_id") +async def get_amendment_base_statute(amendment, all_data_provider): + statute = await get_model( + all_data_provider, + "motions/statute-paragraph", + amendment.get("statute_paragraph_id"), ) return {"title": statute["title"], "text": statute["text"]} async def extend_reference_motion_dict( - all_data: AllData, + all_data_provider: ProjectorAllDataProvider, recommendation: Optional[str], referenced_motions: Dict[int, Dict[str, str]], ) -> None: @@ -127,15 +130,18 @@ async def extend_reference_motion_dict( ] for id in referenced_ids: # Put every referenced motion into the referenced_motions dict - if id not in referenced_motions and id in all_data["motions/motion"]: + referenced_motion = await all_data_provider.get("motions/motion", id) + if id not in referenced_motions and referenced_motion is not None: referenced_motions[id] = { - "title": all_data["motions/motion"][id]["title"], - "identifier": all_data["motions/motion"][id]["identifier"], + "title": referenced_motion["title"], + "identifier": referenced_motion["identifier"], } async def motion_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Motion slide. @@ -158,13 +164,16 @@ async def motion_slide( """ # Get motion mode = element.get( - "mode", await get_config(all_data, "motions_recommendation_text_mode") + "mode", await get_config(all_data_provider, "motions_recommendation_text_mode") ) - motion = get_model(all_data, "motions/motion", element.get("id")) + + # populate cache: + + motion = await get_model(all_data_provider, "motions/motion", element.get("id")) # Add submitters submitters = [ - await get_user_name(all_data, submitter["user_id"]) + await get_user_name(all_data_provider, submitter["user_id"]) for submitter in sorted( motion["submitters"], key=lambda submitter: submitter["weight"] ) @@ -172,14 +181,16 @@ async def motion_slide( # Get some needed config values show_meta_box = not await get_config( - all_data, "motions_disable_sidebox_on_projector" + all_data_provider, "motions_disable_sidebox_on_projector" ) show_referring_motions = not await get_config( - all_data, "motions_hide_referring_motions" + all_data_provider, "motions_hide_referring_motions" ) - line_length = await get_config(all_data, "motions_line_length") - line_numbering_mode = await get_config(all_data, "motions_default_line_numbering") - motions_preamble = await get_config(all_data, "motions_preamble") + line_length = await get_config(all_data_provider, "motions_line_length") + line_numbering_mode = await get_config( + all_data_provider, "motions_default_line_numbering" + ) + motions_preamble = await get_config(all_data_provider, "motions_preamble") # Query all change-recommendation and amendment related things. change_recommendations = [] # type: ignore @@ -187,17 +198,19 @@ async def motion_slide( base_motion = None base_statute = None if motion["statute_paragraph_id"]: - base_statute = await get_amendment_base_statute(motion, all_data) + base_statute = await get_amendment_base_statute(motion, all_data_provider) elif motion["parent_id"] is not None and motion["amendment_paragraphs"]: - base_motion = await get_amendment_base_motion(motion, all_data) + base_motion = await get_amendment_base_motion(motion, all_data_provider) else: for change_recommendation_id in motion["change_recommendations_id"]: - cr = all_data["motions/motion-change-recommendation"].get( - change_recommendation_id + cr = await get_model( + all_data_provider, + "motions/motion-change-recommendation", + change_recommendation_id, ) if cr is not None and not cr["internal"]: change_recommendations.append(cr) - amendments = await get_amendments_for_motion(motion, all_data) + amendments = await get_amendments_for_motion(motion, all_data_provider) # The base return value. More fields will get added below. return_value = { @@ -217,10 +230,10 @@ async def motion_slide( "line_numbering_mode": line_numbering_mode, } - if not await get_config(all_data, "motions_disable_text_on_projector"): + if not await get_config(all_data_provider, "motions_disable_text_on_projector"): return_value["text"] = motion["text"] - if not await get_config(all_data, "motions_disable_reason_on_projector"): + if not await get_config(all_data_provider, "motions_disable_reason_on_projector"): return_value["reason"] = motion["reason"] if mode == "final": @@ -228,40 +241,46 @@ async def motion_slide( # Add recommendation, if enabled in config (and the motion has one) if ( - not await get_config(all_data, "motions_disable_recommendation_on_projector") + not await get_config( + all_data_provider, "motions_disable_recommendation_on_projector" + ) and motion["recommendation_id"] ): - recommendation_state = await get_state(all_data, motion, "recommendation_id") + recommendation_state = await get_state( + all_data_provider, motion, "recommendation_id" + ) return_value["recommendation"] = recommendation_state["recommendation_label"] if recommendation_state["show_recommendation_extension_field"]: recommendation_extension = motion["recommendation_extension"] # All title information for referenced motions in the recommendation referenced_motions: Dict[int, Dict[str, str]] = {} await extend_reference_motion_dict( - all_data, recommendation_extension, referenced_motions + all_data_provider, recommendation_extension, referenced_motions ) return_value["recommendation_extension"] = recommendation_extension return_value["referenced_motions"] = referenced_motions if motion["statute_paragraph_id"]: return_value["recommender"] = await get_config( - all_data, "motions_statute_recommendations_by" + all_data_provider, "motions_statute_recommendations_by" ) else: return_value["recommender"] = await get_config( - all_data, "motions_recommendations_by" + all_data_provider, "motions_recommendations_by" ) if show_referring_motions: # Add recommendation-referencing motions return_value[ "recommendation_referencing_motions" - ] = await get_recommendation_referencing_motions(all_data, motion["id"]) + ] = await get_recommendation_referencing_motions( + all_data_provider, motion["id"] + ) return return_value async def get_recommendation_referencing_motions( - all_data: AllData, motion_id: int + all_data_provider: ProjectorAllDataProvider, motion_id: int ) -> Optional[List[Dict[str, Any]]]: """ Returns all title information for motions, that are referencing @@ -269,14 +288,15 @@ async def get_recommendation_referencing_motions( motions, None is returned (instead of []). """ recommendation_referencing_motions = [] - for motion in all_data["motions/motion"].values(): + all_motions = await all_data_provider.get_collection("motions/motion") + for motion in all_motions.values(): # Motion must have a recommendation and a recommendaiton extension if not motion["recommendation_id"] or not motion["recommendation_extension"]: continue # The recommendation must allow the extension field (there might be left-overs # in a motions recommendation extension..) - recommendation = await get_state(all_data, motion, "recommendation_id") + recommendation = await get_state(all_data_provider, motion, "recommendation_id") if not recommendation["show_recommendation_extension_field"]: continue @@ -297,12 +317,16 @@ async def get_recommendation_referencing_motions( async def motion_block_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Motion block slide. """ - motion_block = get_model(all_data, "motions/motion-block", element.get("id")) + motion_block = await get_model( + all_data_provider, "motions/motion-block", element.get("id") + ) # All motions in this motion block motions = [] @@ -311,7 +335,8 @@ async def motion_block_slide( referenced_motions: Dict[int, Dict[str, str]] = {} # Search motions. - for motion in all_data["motions/motion"].values(): + all_motions = await all_data_provider.get_collection("motions/motion") + for motion in all_motions.values(): if motion["motion_block_id"] == motion_block["id"]: motion_object = { "title": motion["title"], @@ -320,7 +345,9 @@ async def motion_block_slide( recommendation_id = motion["recommendation_id"] if recommendation_id is not None: - recommendation = await get_state(all_data, motion, "recommendation_id") + recommendation = await get_state( + all_data_provider, motion, "recommendation_id" + ) motion_object["recommendation"] = { "name": recommendation["recommendation_label"], "css_class": recommendation["css_class"], @@ -328,7 +355,7 @@ async def motion_block_slide( if recommendation["show_recommendation_extension_field"]: recommendation_extension = motion["recommendation_extension"] await extend_reference_motion_dict( - all_data, recommendation_extension, referenced_motions + all_data_provider, recommendation_extension, referenced_motions ) motion_object["recommendation_extension"] = recommendation_extension @@ -342,13 +369,15 @@ async def motion_block_slide( async def motion_poll_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Poll slide. """ - poll = get_model(all_data, "motions/motion-poll", element.get("id")) - motion = get_model(all_data, "motions/motion", poll["motion_id"]) + poll = await get_model(all_data_provider, "motions/motion-poll", element.get("id")) + motion = await get_model(all_data_provider, "motions/motion", poll["motion_id"]) poll_data = { key: poll[key] @@ -363,8 +392,8 @@ async def motion_poll_slide( } if poll["state"] == MotionPoll.STATE_PUBLISHED: - option = get_model( - all_data, "motions/motion-option", poll["options_id"][0] + option = await get_model( + all_data_provider, "motions/motion-option", poll["options_id"][0] ) # there can only be exactly one option poll_data["options"] = [ { diff --git a/openslides/topics/projector.py b/openslides/topics/projector.py index 18306aa81..3687c0f31 100644 --- a/openslides/topics/projector.py +++ b/openslides/topics/projector.py @@ -1,19 +1,16 @@ from typing import Any, Dict from ..utils.projector import ( - AllData, - ProjectorElementException, + ProjectorAllDataProvider, + get_model, register_projector_slide, ) -# Important: All functions have to be prune. This means, that thay can only -# access the data, that they get as argument and do not have any -# side effects. - - async def topic_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Topic slide. @@ -22,22 +19,8 @@ async def topic_slide( * title * text """ - topic_id = element.get("id") - - if topic_id is None: - raise ProjectorElementException("id is required for topic slide") - - try: - topic = all_data["topics/topic"][topic_id] - except KeyError: - raise ProjectorElementException(f"topic with id {topic_id} does not exist") - - item_id = topic["agenda_item_id"] - try: - item = all_data["agenda/item"][item_id] - except KeyError: - raise ProjectorElementException(f"item with id {item_id} does not exist") - + topic = await get_model(all_data_provider, "topics/topic", element.get("id")) + item = await get_model(all_data_provider, "agenda/item", topic["agenda_item_id"]) return { "title": topic["title"], "text": topic["text"], diff --git a/openslides/users/projector.py b/openslides/users/projector.py index c1b02fc85..902a19a59 100644 --- a/openslides/users/projector.py +++ b/openslides/users/projector.py @@ -1,19 +1,16 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from ..utils.projector import ( - AllData, - ProjectorElementException, + ProjectorAllDataProvider, + get_model, register_projector_slide, ) -# Important: All functions have to be prune. This means, that thay can only -# access the data, that they get as argument and do not have any -# side effects. - - async def user_slide( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ User slide. @@ -21,22 +18,16 @@ async def user_slide( The returned dict can contain the following fields: * user """ - user_id = element.get("id") - - if user_id is None: - raise ProjectorElementException("id is required for user slide") - - return {"user": await get_user_name(all_data, user_id)} + return {"user": await get_user_name(all_data_provider, element.get("id"))} -async def get_user_name(all_data: AllData, user_id: int) -> str: +async def get_user_name( + all_data_provider: ProjectorAllDataProvider, user_id: Optional[int] +) -> str: """ Returns the short name for an user_id. """ - try: - user = all_data["users/user"][user_id] - except KeyError: - raise ProjectorElementException(f"user with id {user_id} does not exist") + user = await get_model(all_data_provider, "users/user", user_id) name_parts: List[str] = [] for name_part in ("title", "first_name", "last_name"): diff --git a/openslides/utils/autoupdate.py b/openslides/utils/autoupdate.py index 7701b1598..a43c5261a 100644 --- a/openslides/utils/autoupdate.py +++ b/openslides/utils/autoupdate.py @@ -92,13 +92,13 @@ class AutoupdateBundle: elements[full_data["id"]]["full_data"] = full_data # Save histroy here using sync code. - save_history(self.elements) + save_history(self.element_iterator) # Update cache and send autoupdate using async code. async_to_sync(self.async_handle_collection_elements)() @property - def elements(self) -> Iterable[AutoupdateElement]: + def element_iterator(self) -> Iterable[AutoupdateElement]: """ Iterator for all elements in this bundle """ for elements in self.autoupdate_elements.values(): yield from elements.values() @@ -110,7 +110,7 @@ class AutoupdateBundle: Returns the change_id """ cache_elements: Dict[str, Optional[Dict[str, Any]]] = {} - for element in self.elements: + for element in self.element_iterator: element_id = get_element_id(element["collection_string"], element["id"]) full_data = element.get("full_data") if full_data: @@ -253,7 +253,7 @@ class AutoupdateBundleMiddleware: return response -def save_history(elements: Iterable[AutoupdateElement]) -> Iterable: +def save_history(element_iterator: Iterable[AutoupdateElement]) -> Iterable: """ Thin wrapper around the call of history saving manager method. @@ -261,4 +261,4 @@ def save_history(elements: Iterable[AutoupdateElement]) -> Iterable: """ from ..core.models import History - return History.objects.add_elements(elements) + return History.objects.add_elements(element_iterator) diff --git a/openslides/utils/cache.py b/openslides/utils/cache.py index 00054ca74..14dec14d2 100644 --- a/openslides/utils/cache.py +++ b/openslides/utils/cache.py @@ -254,25 +254,6 @@ class ElementCache: all_data[collection] = await restricter(user_id, all_data[collection]) return dict(all_data) - async def get_all_data_dict(self) -> Dict[str, Dict[int, Dict[str, Any]]]: - """ - Returns all data with a dict (id <-> element) per collection: - { - : { - : - } - } - """ - all_data: Dict[str, Dict[int, Dict[str, Any]]] = defaultdict(dict) - for element_id, data in (await self.cache_provider.get_all_data()).items(): - collection, id = split_element_id(element_id) - element = json.loads(data.decode()) - element.pop( - "_no_delete_on_restriction", False - ) # remove special field for get_data_since - all_data[collection][id] = element - return dict(all_data) - async def get_collection_data(self, collection: str) -> Dict[int, Dict[str, Any]]: """ Returns the data for one collection as dict: {id: } diff --git a/openslides/utils/cache_providers.py b/openslides/utils/cache_providers.py index e049a5211..d74c55378 100644 --- a/openslides/utils/cache_providers.py +++ b/openslides/utils/cache_providers.py @@ -460,7 +460,8 @@ class RedisCacheProvider: ) if reported_amount != read_only_redis_amount_replicas: logger.warn( - f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} requested after {read_only_redis_wait_timeout} ms!" + f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} " + + f"requested after {read_only_redis_wait_timeout} ms!" ) return result diff --git a/openslides/utils/projector.py b/openslides/utils/projector.py index 9e20fddcd..7b66e87c6 100644 --- a/openslides/utils/projector.py +++ b/openslides/utils/projector.py @@ -5,16 +5,14 @@ Functions that handel the registration of projector elements and the rendering of the data to present it on the projector. """ -from typing import Any, Awaitable, Callable, Dict, List +from collections import defaultdict +from typing import Any, Awaitable, Callable, Dict, List, Optional +from . import logging from .cache import element_cache -AllData = Dict[str, Dict[int, Dict[str, Any]]] -ProjectorSlide = Callable[[AllData, Dict[str, Any], int], Awaitable[Dict[str, Any]]] - - -projector_slides: Dict[str, ProjectorSlide] = {} +logger = logging.getLogger(__name__) class ProjectorElementException(Exception): @@ -23,6 +21,44 @@ class ProjectorElementException(Exception): """ +class ProjectorAllDataProvider: + NON_EXISTENT_MARKER = object() + + def __init__(self) -> None: + self.cache: Any = defaultdict(dict) # fuu you mypy + self.fetched_collection: Dict[str, bool] = {} + + async def get(self, collection: str, id: int) -> Optional[Dict[str, Any]]: + cache_data = self.cache[collection].get(id) + if cache_data is None: + data: Any = await element_cache.get_element_data(collection, id) + if data is None: + data = ProjectorAllDataProvider.NON_EXISTENT_MARKER + self.cache[collection][id] = data + elif cache_data == ProjectorAllDataProvider.NON_EXISTENT_MARKER: + return None + return self.cache[collection][id] + + async def get_collection(self, collection: str) -> Dict[int, Dict[str, Any]]: + if not self.fetched_collection.get(collection, False): + collection_data = await element_cache.get_collection_data(collection) + self.cache[collection] = collection_data + self.fetched_collection[collection] = True + return self.cache[collection] + + async def exists(self, collection: str, id: int) -> bool: + model = await self.get(collection, id) + return model is not None + + +ProjectorSlide = Callable[ + [ProjectorAllDataProvider, Dict[str, Any], int], Awaitable[Dict[str, Any]] +] + + +projector_slides: Dict[str, ProjectorSlide] = {} + + def register_projector_slide(name: str, slide: ProjectorSlide) -> None: """ Registers a projector slide. @@ -67,10 +103,11 @@ async def get_projector_data( if projector_ids is None: projector_ids = [] - all_data = await element_cache.get_all_data_dict() projector_data: Dict[int, List[Dict[str, Any]]] = {} + all_data_provider = ProjectorAllDataProvider() + projectors = await all_data_provider.get_collection("core/projector") - for projector_id, projector in all_data.get("core/projector", {}).items(): + for projector_id, projector in projectors.items(): if projector_ids and projector_id not in projector_ids: # only render the projector in question. continue @@ -83,7 +120,7 @@ async def get_projector_data( for element in projector["elements"]: projector_slide = projector_slides[element["name"]] try: - data = await projector_slide(all_data, element, projector_id) + data = await projector_slide(all_data_provider, element, projector_id) except ProjectorElementException as err: data = {"error": str(err)} projector_data[projector_id].append({"data": data, "element": element}) @@ -91,18 +128,23 @@ async def get_projector_data( return projector_data -async def get_config(all_data: AllData, key: str) -> Any: +async def get_config(all_data_provider: ProjectorAllDataProvider, key: str) -> Any: """ - Returns a config value from all_data. + Returns a config value from all_data_provider. + Triggers the cache early: It access `get_colelction` instead of `get`. It + allows for all successive queries for configs to be cached. """ from ..core.config import config config_id = (await config.async_get_key_to_id())[key] - return all_data[config.get_collection_string()][config_id]["value"] + configs = await all_data_provider.get_collection(config.get_collection_string()) + return configs[config_id]["value"] -def get_model(all_data: AllData, collection: str, id: Any) -> Dict[str, Any]: +async def get_model( + all_data_provider: ProjectorAllDataProvider, collection: str, id: Any +) -> Dict[str, Any]: """ Tries to get the model identified by the collection and id. If the id is invalid or the model not found, ProjectorElementExceptions will be raised. @@ -110,17 +152,19 @@ def get_model(all_data: AllData, collection: str, id: Any) -> Dict[str, Any]: if id is None: raise ProjectorElementException(f"id is required for {collection} slide") - try: - model = all_data[collection][id] - except KeyError: + model = await all_data_provider.get(collection, id) + if model is None: raise ProjectorElementException(f"{collection} with id {id} does not exist") return model -def get_models( - all_data: AllData, collection: str, ids: List[Any] +async def get_models( + all_data_provider: ProjectorAllDataProvider, collection: str, ids: List[Any] ) -> List[Dict[str, Any]]: """ Tries to fetch all given models. Models are required to be all of the collection `collection`. """ - return [get_model(all_data, collection, id) for id in ids] + logger.info( + f"Note: a call to `get_models` with {collection}/{ids}. This might be cache-intensive" + ) + return [await get_model(all_data_provider, collection, id) for id in ids] From d8b21c5fb5429b06b6b9253e032ce4f230485af0 Mon Sep 17 00:00:00 2001 From: FinnStutzenstein Date: Fri, 15 May 2020 18:24:21 +0200 Subject: [PATCH 3/4] (WIP) Ordered and delayed autoupdates: - Extracted autoupdate code from consumers - collect autoupdates until a AUTOUPDATE_DELAY is reached (since the first autoupdate) - Added the AUTOUPDATE_DELAY parameter in the settings.py - moved some autoupdate code to utils/autoupdate - moved core/websocket to utils/websocket_client_messages - add the autoupdate in the response (there are some todos left) - do not send autoupdates on error (4xx, 5xx) - the client blindly injects the autoupdate in the response - removed the unused autoupdate on/off feature - the clients sends now the maxChangeId (instead of maxChangeId+1) on connection - the server accepts this. --- SETTINGS.rst | 4 + .../core/core-services/autoupdate.service.ts | 63 +++++-- .../core/core-services/data-store.service.ts | 1 - .../app/core/core-services/http.service.ts | 50 +++-- .../core/core-services/openslides.service.ts | 7 +- .../core/core-services/operator.service.ts | 3 +- .../core/core-services/prioritize.service.ts | 2 +- .../core/core-services/websocket.service.ts | 29 +-- openslides/core/apps.py | 20 +- openslides/utils/autoupdate.py | 94 +++++++++- .../utils/consumer_autoupdate_strategy.py | 99 ++++++++++ openslides/utils/consumers.py | 175 +++++------------- openslides/utils/projector.py | 6 +- openslides/utils/timing.py | 27 +++ openslides/utils/websocket.py | 43 ++++- .../websocket_client_messages.py} | 67 +++---- tests/integration/helpers.py | 49 +++-- tests/integration/utils/test_consumers.py | 65 ++----- tests/unit/agenda/test_projector.py | 16 +- tests/unit/motions/test_projector.py | 31 ++-- 20 files changed, 501 insertions(+), 350 deletions(-) create mode 100644 openslides/utils/consumer_autoupdate_strategy.py create mode 100644 openslides/utils/timing.py rename openslides/{core/websocket.py => utils/websocket_client_messages.py} (79%) diff --git a/SETTINGS.rst b/SETTINGS.rst index 64163c001..dc42430f0 100644 --- a/SETTINGS.rst +++ b/SETTINGS.rst @@ -143,3 +143,7 @@ not affect the client. operator is in one of these groups, the client disconnected and reconnects again. All requests urls (including websockets) are now prefixed with `/prioritize`, so these requests from "prioritized clients" can be routed to different servers. + +`AUTOUPDATE_DELAY`: The delay to send autoupdates. This feature can be +deactivated by setting it to `None`. It is deactivated per default. The Delay is +given in seconds diff --git a/client/src/app/core/core-services/autoupdate.service.ts b/client/src/app/core/core-services/autoupdate.service.ts index 3303d0740..72fbc1f88 100644 --- a/client/src/app/core/core-services/autoupdate.service.ts +++ b/client/src/app/core/core-services/autoupdate.service.ts @@ -6,7 +6,7 @@ import { DataStoreService, DataStoreUpdateManagerService } from './data-store.se import { Mutex } from '../promises/mutex'; import { WebsocketService, WEBSOCKET_ERROR_CODES } from './websocket.service'; -interface AutoupdateFormat { +export interface AutoupdateFormat { /** * All changed (and created) items as their full/restricted data grouped by their collection. */ @@ -37,6 +37,19 @@ interface AutoupdateFormat { all_data: boolean; } +export function isAutoupdateFormat(obj: any): obj is AutoupdateFormat { + const format = obj as AutoupdateFormat; + return ( + obj && + typeof obj === 'object' && + format.changed !== undefined && + format.deleted !== undefined && + format.from_change_id !== undefined && + format.to_change_id !== undefined && + format.all_data !== undefined + ); +} + /** * Handles the initial update and automatic updates using the {@link WebsocketService} * Incoming objects, usually BaseModels, will be saved in the dataStore (`this.DS`) @@ -120,27 +133,40 @@ export class AutoupdateService { // Normal autoupdate if (autoupdate.from_change_id <= maxChangeId + 1 && autoupdate.to_change_id > maxChangeId) { - const updateSlot = await this.DSUpdateManager.getNewUpdateSlot(this.DS); - - // Delete the removed objects from the DataStore - for (const collection of Object.keys(autoupdate.deleted)) { - await this.DS.remove(collection, autoupdate.deleted[collection]); - } - - // Add the objects to the DataStore. - for (const collection of Object.keys(autoupdate.changed)) { - await this.DS.add(this.mapObjectsToBaseModels(collection, autoupdate.changed[collection])); - } - - await this.DS.flushToStorage(autoupdate.to_change_id); - - this.DSUpdateManager.commit(updateSlot, autoupdate.to_change_id); + this.injectAutupdateIntoDS(autoupdate, true); } else { // autoupdate fully in the future. we are missing something! this.requestChanges(); } } + public async injectAutoupdateIgnoreChangeId(autoupdate: AutoupdateFormat): Promise { + const unlock = await this.mutex.lock(); + console.debug('inject autoupdate', autoupdate); + this.injectAutupdateIntoDS(autoupdate, false); + unlock(); + } + + private async injectAutupdateIntoDS(autoupdate: AutoupdateFormat, flush: boolean): Promise { + const updateSlot = await this.DSUpdateManager.getNewUpdateSlot(this.DS); + + // Delete the removed objects from the DataStore + for (const collection of Object.keys(autoupdate.deleted)) { + await this.DS.remove(collection, autoupdate.deleted[collection]); + } + + // Add the objects to the DataStore. + for (const collection of Object.keys(autoupdate.changed)) { + await this.DS.add(this.mapObjectsToBaseModels(collection, autoupdate.changed[collection])); + } + + if (flush) { + await this.DS.flushToStorage(autoupdate.to_change_id); + } + + this.DSUpdateManager.commit(updateSlot, autoupdate.to_change_id); + } + /** * Creates baseModels for each plain object. If the collection is not registered, * A console error will be issued and an empty list returned. @@ -164,9 +190,8 @@ export class AutoupdateService { * The server should return an autoupdate with all new data. */ public requestChanges(): void { - const changeId = this.DS.maxChangeId === 0 ? 0 : this.DS.maxChangeId + 1; - console.log(`requesting changed objects with DS max change id ${changeId}`); - this.websocketService.send('getElements', { change_id: changeId }); + console.log(`requesting changed objects with DS max change id ${this.DS.maxChangeId}`); + this.websocketService.send('getElements', { change_id: this.DS.maxChangeId }); } /** diff --git a/client/src/app/core/core-services/data-store.service.ts b/client/src/app/core/core-services/data-store.service.ts index b9ed07a08..a9ff9fc7e 100644 --- a/client/src/app/core/core-services/data-store.service.ts +++ b/client/src/app/core/core-services/data-store.service.ts @@ -258,7 +258,6 @@ export class DataStoreUpdateManagerService { private serveNextSlot(): void { if (this.updateSlotRequests.length > 0) { - console.warn('Concurrent update slots'); const request = this.updateSlotRequests.pop(); request.resolve(); } diff --git a/client/src/app/core/core-services/http.service.ts b/client/src/app/core/core-services/http.service.ts index 7dfffc0ae..fbf1f01fc 100644 --- a/client/src/app/core/core-services/http.service.ts +++ b/client/src/app/core/core-services/http.service.ts @@ -3,6 +3,7 @@ import { Injectable } from '@angular/core'; import { TranslateService } from '@ngx-translate/core'; +import { AutoupdateFormat, AutoupdateService, isAutoupdateFormat } from './autoupdate.service'; import { OpenSlidesStatusService } from './openslides-status.service'; import { formatQueryParams, QueryParams } from '../definitions/query-params'; @@ -17,12 +18,12 @@ export enum HTTPMethod { DELETE = 'delete' } -export interface DetailResponse { +export interface ErrorDetailResponse { detail: string | string[]; args?: string[]; } -function isDetailResponse(obj: any): obj is DetailResponse { +function isErrorDetailResponse(obj: any): obj is ErrorDetailResponse { return ( obj && typeof obj === 'object' && @@ -31,6 +32,15 @@ function isDetailResponse(obj: any): obj is DetailResponse { ); } +interface AutoupdateResponse { + autoupdate: AutoupdateFormat; + data?: any; +} + +function isAutoupdateReponse(obj: any): obj is AutoupdateResponse { + return obj && typeof obj === 'object' && isAutoupdateFormat((obj as AutoupdateResponse).autoupdate); +} + /** * Service for managing HTTP requests. Allows to send data for every method. Also (TODO) will do generic error handling. */ @@ -55,7 +65,8 @@ export class HttpService { public constructor( private http: HttpClient, private translate: TranslateService, - private OSStatus: OpenSlidesStatusService + private OSStatus: OpenSlidesStatusService, + private autoupdateService: AutoupdateService ) { this.defaultHeaders = new HttpHeaders().set('Content-Type', 'application/json'); } @@ -82,7 +93,7 @@ export class HttpService { ): Promise { // end early, if we are in history mode if (this.OSStatus.isInHistoryMode && method !== HTTPMethod.GET) { - throw this.handleError('You cannot make changes while in history mode'); + throw this.processError('You cannot make changes while in history mode'); } // there is a current bug with the responseType. @@ -108,9 +119,10 @@ export class HttpService { }; try { - return await this.http.request(method, url, options).toPromise(); - } catch (e) { - throw this.handleError(e); + const responseData: T = await this.http.request(method, url, options).toPromise(); + return this.processResponse(responseData); + } catch (error) { + throw this.processError(error); } } @@ -120,7 +132,7 @@ export class HttpService { * @param e The error thrown. * @returns The prepared and translated message for the user */ - private handleError(e: any): string { + private processError(e: any): string { let error = this.translate.instant('Error') + ': '; // If the error is a string already, return it. if (typeof e === 'string') { @@ -142,12 +154,14 @@ export class HttpService { } else if (!e.error) { error += this.translate.instant("The server didn't respond."); } else if (typeof e.error === 'object') { - if (isDetailResponse(e.error)) { - error += this.processDetailResponse(e.error); + if (isErrorDetailResponse(e.error)) { + error += this.processErrorDetailResponse(e.error); } else { const errorList = Object.keys(e.error).map(key => { const capitalizedKey = key.charAt(0).toUpperCase() + key.slice(1); - return `${this.translate.instant(capitalizedKey)}: ${this.processDetailResponse(e.error[key])}`; + return `${this.translate.instant(capitalizedKey)}: ${this.processErrorDetailResponse( + e.error[key] + )}`; }); error = errorList.join(', '); } @@ -168,11 +182,9 @@ export class HttpService { * @param str a string or a string array to join together. * @returns Error text(s) as single string */ - private processDetailResponse(response: DetailResponse): string { + private processErrorDetailResponse(response: ErrorDetailResponse): string { let message: string; - if (response instanceof Array) { - message = response.join(' '); - } else if (response.detail instanceof Array) { + if (response.detail instanceof Array) { message = response.detail.join(' '); } else { message = response.detail; @@ -187,6 +199,14 @@ export class HttpService { return message; } + private processResponse(responseData: T): T { + if (isAutoupdateReponse(responseData)) { + this.autoupdateService.injectAutoupdateIgnoreChangeId(responseData.autoupdate); + responseData = responseData.data; + } + return responseData; + } + /** * Executes a get on a path with a certain object * @param path The path to send the request to. diff --git a/client/src/app/core/core-services/openslides.service.ts b/client/src/app/core/core-services/openslides.service.ts index 07d27a469..a35c47ed8 100644 --- a/client/src/app/core/core-services/openslides.service.ts +++ b/client/src/app/core/core-services/openslides.service.ts @@ -130,10 +130,7 @@ export class OpenSlidesService { * Init DS from cache and after this start the websocket service. */ private async setupDataStoreAndWebSocket(): Promise { - let changeId = await this.DS.initFromStorage(); - if (changeId > 0) { - changeId += 1; - } + const changeId = await this.DS.initFromStorage(); // disconnect the WS connection, if there was one. This is needed // to update the connection parameters, namely the cookies. If the user // is changed, the WS needs to reconnect, so the new connection holds the new @@ -141,7 +138,7 @@ export class OpenSlidesService { if (this.websocketService.isConnected) { await this.websocketService.close(); // Wait for the disconnect. } - await this.websocketService.connect({ changeId: changeId }); // Request changes after changeId. + await this.websocketService.connect(changeId); // Request changes after changeId. } /** diff --git a/client/src/app/core/core-services/operator.service.ts b/client/src/app/core/core-services/operator.service.ts index 822a5f855..e2f294be4 100644 --- a/client/src/app/core/core-services/operator.service.ts +++ b/client/src/app/core/core-services/operator.service.ts @@ -456,7 +456,8 @@ export class OperatorService implements OnAfterAppsLoaded { * Set the operators presence to isPresent */ public async setPresence(isPresent: boolean): Promise { - await this.http.post(environment.urlPrefix + '/users/setpresence/', isPresent); + const r = await this.http.post(environment.urlPrefix + '/users/setpresence/', isPresent); + console.log('operator', r); } /** diff --git a/client/src/app/core/core-services/prioritize.service.ts b/client/src/app/core/core-services/prioritize.service.ts index a4bcf744d..e7677ab3b 100644 --- a/client/src/app/core/core-services/prioritize.service.ts +++ b/client/src/app/core/core-services/prioritize.service.ts @@ -40,7 +40,7 @@ export class PrioritizeService { if (this.openSlidesStatusService.isPrioritizedClient !== opPrioritized) { console.log('Alter prioritization:', opPrioritized); this.openSlidesStatusService.isPrioritizedClient = opPrioritized; - this.websocketService.reconnect({ changeId: this.DS.maxChangeId }); + this.websocketService.reconnect(this.DS.maxChangeId); } } } diff --git a/client/src/app/core/core-services/websocket.service.ts b/client/src/app/core/core-services/websocket.service.ts index 17b4ad62e..b7208416c 100644 --- a/client/src/app/core/core-services/websocket.service.ts +++ b/client/src/app/core/core-services/websocket.service.ts @@ -55,14 +55,6 @@ export const WEBSOCKET_ERROR_CODES = { WRONG_FORMAT: 102 }; -/* - * Options for (re-)connecting. - */ -interface ConnectOptions { - changeId?: number; - enableAutoupdates?: boolean; -} - /** * Service that handles WebSocket connections. Other services can register themselfs * with {@method getOberservable} for a specific type of messages. The content will be published. @@ -207,7 +199,7 @@ export class WebsocketService { * * Uses NgZone to let all callbacks run in the angular context. */ - public async connect(options: ConnectOptions = {}, retry: boolean = false): Promise { + public async connect(changeId: number | null = null, retry: boolean = false): Promise { const websocketId = Math.random().toString(36).substring(7); this.websocketId = websocketId; @@ -220,17 +212,10 @@ export class WebsocketService { this.shouldBeClosed = false; } - // set defaults - options = Object.assign(options, { - enableAutoupdates: true - }); + const queryParams: QueryParams = {}; - const queryParams: QueryParams = { - autoupdate: options.enableAutoupdates - }; - - if (options.changeId !== undefined) { - queryParams.change_id = options.changeId; + if (changeId !== null) { + queryParams.change_id = changeId; } // Create the websocket @@ -398,7 +383,7 @@ export class WebsocketService { const timeout = Math.floor(Math.random() * 3000 + 2000); this.retryTimeout = setTimeout(() => { this.retryTimeout = null; - this.connect({ enableAutoupdates: true }, true); + this.connect(null, true); }, timeout); } } @@ -438,9 +423,9 @@ export class WebsocketService { * * @param options The options for the new connection */ - public async reconnect(options: ConnectOptions = {}): Promise { + public async reconnect(changeId: number | null = null): Promise { await this.close(); - await this.connect(options); + await this.connect(changeId); } /** diff --git a/openslides/core/apps.py b/openslides/core/apps.py index 3b4711b6e..3481e402a 100644 --- a/openslides/core/apps.py +++ b/openslides/core/apps.py @@ -34,16 +34,10 @@ class CoreAppConfig(AppConfig): ProjectionDefaultViewSet, TagViewSet, ) - from .websocket import ( - NotifyWebsocketClientMessage, - ConstantsWebsocketClientMessage, - GetElementsWebsocketClientMessage, - AutoupdateWebsocketClientMessage, - ListenToProjectors, - PingPong, - ) from ..utils.rest_api import router - from ..utils.websocket import register_client_message + + # Let all client websocket message register + from ..utils import websocket_client_messages # noqa # Collect all config variables before getting the constants. config.collect_config_variables_from_apps() @@ -92,14 +86,6 @@ class CoreAppConfig(AppConfig): self.get_model("Countdown").get_collection_string(), CountdownViewSet ) - # Register client messages - register_client_message(NotifyWebsocketClientMessage()) - register_client_message(ConstantsWebsocketClientMessage()) - register_client_message(GetElementsWebsocketClientMessage()) - register_client_message(AutoupdateWebsocketClientMessage()) - register_client_message(ListenToProjectors()) - register_client_message(PingPong()) - if "runserver" in sys.argv or "changeconfig" in sys.argv: from openslides.utils.startup import run_startup_hooks diff --git a/openslides/utils/autoupdate.py b/openslides/utils/autoupdate.py index a43c5261a..3d3a3e308 100644 --- a/openslides/utils/autoupdate.py +++ b/openslides/utils/autoupdate.py @@ -1,3 +1,4 @@ +import json import threading from collections import defaultdict from typing import Any, Dict, Iterable, List, Optional, Tuple, Union @@ -7,9 +8,22 @@ from channels.layers import get_channel_layer from django.db.models import Model from mypy_extensions import TypedDict -from .cache import element_cache, get_element_id +from .cache import ChangeIdTooLowError, element_cache, get_element_id from .projector import get_projector_data -from .utils import get_model_from_collection_string, is_iterable +from .timing import Timing +from .utils import get_model_from_collection_string, is_iterable, split_element_id + + +AutoupdateFormat = TypedDict( + "AutoupdateFormat", + { + "changed": Dict[str, List[Dict[str, Any]]], + "deleted": Dict[str, List[int]], + "from_change_id": int, + "to_change_id": int, + "all_data": bool, + }, +) class AutoupdateElementBase(TypedDict): @@ -66,13 +80,15 @@ class AutoupdateBundle: element["id"] ] = element - def done(self) -> None: + def done(self) -> Optional[int]: """ Finishes the bundle by resolving all missing data and passing it to the history and element cache. + + Returns the change id, if there are autoupdate elements. Otherwise none. """ if not self.autoupdate_elements: - return + return None for collection, elements in self.autoupdate_elements.items(): # Get all ids, that do not have a full_data key @@ -95,7 +111,7 @@ class AutoupdateBundle: save_history(self.element_iterator) # Update cache and send autoupdate using async code. - async_to_sync(self.async_handle_collection_elements)() + return async_to_sync(self.dispatch_autoupdate)() @property def element_iterator(self) -> Iterable[AutoupdateElement]: @@ -120,9 +136,11 @@ class AutoupdateBundle: cache_elements[element_id] = full_data return await element_cache.change_elements(cache_elements) - async def async_handle_collection_elements(self) -> None: + async def dispatch_autoupdate(self) -> int: """ Async helper function to update cache and send autoupdate. + + Return the change_id """ # Update cache change_id = await self.update_cache() @@ -130,21 +148,23 @@ class AutoupdateBundle: # Send autoupdate channel_layer = get_channel_layer() await channel_layer.group_send( - "autoupdate", {"type": "send_data", "change_id": change_id} + "autoupdate", {"type": "msg_new_change_id", "change_id": change_id} ) - projector_data = await get_projector_data() # Send projector + projector_data = await get_projector_data() channel_layer = get_channel_layer() await channel_layer.group_send( "projector", { - "type": "projector_changed", + "type": "msg_projector_data", "data": projector_data, "change_id": change_id, }, ) + return change_id + def inform_changed_data( instances: Union[Iterable[Model], Model], @@ -246,13 +266,67 @@ class AutoupdateBundleMiddleware: thread_id = threading.get_ident() autoupdate_bundle[thread_id] = AutoupdateBundle() + timing = Timing("request") + response = self.get_response(request) + timing() + + # rewrite the response by adding the autoupdate on any success-case (2xx status) bundle: AutoupdateBundle = autoupdate_bundle.pop(thread_id) - bundle.done() + if response.status_code >= 200 and response.status_code < 300: + change_id = bundle.done() + + if change_id is not None: + user_id = request.user.pk or 0 + # Inject the autoupdate in the response. + # The complete response body will be overwritten! + autoupdate = async_to_sync(get_autoupdate_data)( + change_id, change_id, user_id + ) + content = {"autoupdate": autoupdate, "data": response.data} + # Note: autoupdate may be none on skipped ones (which should not happen + # since the user has made the request....) + response.content = json.dumps(content) + + timing(True) return response +async def get_autoupdate_data( + from_change_id: int, to_change_id: int, user_id: int +) -> Optional[AutoupdateFormat]: + try: + changed_elements, deleted_element_ids = await element_cache.get_data_since( + user_id, from_change_id, to_change_id + ) + except ChangeIdTooLowError: + # The change_id is lower the the lowerst change_id in redis. Return all data + changed_elements = await element_cache.get_all_data_list(user_id) + all_data = True + deleted_elements: Dict[str, List[int]] = {} + else: + all_data = False + deleted_elements = defaultdict(list) + for element_id in deleted_element_ids: + collection_string, id = split_element_id(element_id) + deleted_elements[collection_string].append(id) + + # Check, if the autoupdate has any data. + if not changed_elements and not deleted_element_ids: + # Skip empty updates + return None + else: + # Normal autoupdate with data + return AutoupdateFormat( + changed=changed_elements, + deleted=deleted_elements, + from_change_id=from_change_id, + to_change_id=to_change_id, + all_data=all_data, + ) + + def save_history(element_iterator: Iterable[AutoupdateElement]) -> Iterable: """ Thin wrapper around the call of history saving manager method. diff --git a/openslides/utils/consumer_autoupdate_strategy.py b/openslides/utils/consumer_autoupdate_strategy.py new file mode 100644 index 000000000..139e3ecd7 --- /dev/null +++ b/openslides/utils/consumer_autoupdate_strategy.py @@ -0,0 +1,99 @@ +import asyncio +from asyncio import Task +from typing import Optional, cast + +from django.conf import settings + +from .autoupdate import get_autoupdate_data +from .cache import element_cache +from .websocket import ChangeIdTooHighException, ProtocollAsyncJsonWebsocketConsumer + + +AUTOUPDATE_DELAY = getattr(settings, "AUTOUPDATE_DELAY", None) + + +class ConsumerAutoupdateStrategy: + def __init__(self, consumer: ProtocollAsyncJsonWebsocketConsumer) -> None: + self.consumer = consumer + # client_change_id = None: unknown -> set on first autoupdate or request_change_id + # client_change_id is int: the one + self.client_change_id: Optional[int] = None + self.max_seen_change_id = 0 + self.next_send_time = None + self.timer_task_handle: Optional[Task[None]] = None + self.lock = asyncio.Lock() + + async def request_change_id( + self, change_id: int, in_response: Optional[str] = None + ) -> None: + # This resets the server side tracking of the client's change id. + async with self.lock: + await self.stop_timer() + + self.max_seen_change_id = await element_cache.get_current_change_id() + self.client_change_id = change_id + + if self.client_change_id == self.max_seen_change_id: + # The client is up-to-date, so nothing will be done + return None + + if self.client_change_id > self.max_seen_change_id: + message = ( + f"Requested change_id {self.client_change_id} is higher than the " + + f"highest change_id {self.max_seen_change_id}." + ) + raise ChangeIdTooHighException(message, in_response=in_response) + + await self.send_autoupdate(in_response=in_response) + + async def new_change_id(self, change_id: int) -> None: + async with self.lock: + if self.client_change_id is None: + self.client_change_id = change_id + if change_id > self.max_seen_change_id: + self.max_seen_change_id = change_id + + if AUTOUPDATE_DELAY is None: # feature deactivated, send directly + await self.send_autoupdate() + elif self.timer_task_handle is None: + await self.start_timer() + + async def get_running_loop(self) -> asyncio.AbstractEventLoop: + if hasattr(asyncio, "get_running_loop"): + return asyncio.get_running_loop() # type: ignore + else: + return asyncio.get_event_loop() + + async def start_timer(self) -> None: + loop = await self.get_running_loop() + self.timer_task_handle = loop.create_task(self.timer_task()) + + async def stop_timer(self) -> None: + if self.timer_task_handle is not None: + self.timer_task_handle.cancel() + self.timer_task_handle = None + + async def timer_task(self) -> None: + try: + await asyncio.sleep(AUTOUPDATE_DELAY) + except asyncio.CancelledError: + return + + async with self.lock: + await self.send_autoupdate() + self.timer_task_handle = None + + async def send_autoupdate(self, in_response: Optional[str] = None) -> None: + max_change_id = ( + self.max_seen_change_id + ) # important to save this variable, because + # it can change during runtime. + autoupdate = await get_autoupdate_data( + cast(int, self.client_change_id), max_change_id, self.consumer.user_id + ) + if autoupdate is not None: + # It will be send, so we can set the client_change_id + self.client_change_id = max_change_id + await self.consumer.send_json( + type="autoupdate", content=autoupdate, in_response=in_response, + ) diff --git a/openslides/utils/consumers.py b/openslides/utils/consumers.py index 1073f831d..7432423f9 100644 --- a/openslides/utils/consumers.py +++ b/openslides/utils/consumers.py @@ -1,32 +1,19 @@ import time -from collections import defaultdict from typing import Any, Dict, List, Optional, cast from urllib.parse import parse_qs from channels.generic.websocket import AsyncWebsocketConsumer -from mypy_extensions import TypedDict -from ..utils.websocket import WEBSOCKET_CHANGE_ID_TOO_HIGH from . import logging from .auth import UserDoesNotExist, async_anonymous_is_enabled -from .cache import ChangeIdTooLowError, element_cache, split_element_id +from .cache import element_cache +from .consumer_autoupdate_strategy import ConsumerAutoupdateStrategy from .utils import get_worker_id -from .websocket import ProtocollAsyncJsonWebsocketConsumer +from .websocket import BaseWebsocketException, ProtocollAsyncJsonWebsocketConsumer logger = logging.getLogger("openslides.websocket") -AutoupdateFormat = TypedDict( - "AutoupdateFormat", - { - "changed": Dict[str, List[Dict[str, Any]]], - "deleted": Dict[str, List[int]], - "from_change_id": int, - "to_change_id": int, - "all_data": bool, - }, -) - class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): """ @@ -40,12 +27,11 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): ID counter for assigning each instance of this class an unique id. """ - skipped_autoupdate_from_change_id: Optional[int] = None - def __init__(self, *args: Any, **kwargs: Any) -> None: self.projector_hash: Dict[int, int] = {} SiteConsumer.ID_COUNTER += 1 self._id = get_worker_id() + "-" + str(SiteConsumer.ID_COUNTER) + self.autoupdate_strategy = ConsumerAutoupdateStrategy(self) super().__init__(*args, **kwargs) async def connect(self) -> None: @@ -56,11 +42,13 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): Sends the startup data to the user. """ + self.user_id = self.scope["user"]["id"] + self.connect_time = time.time() # self.scope['user'] is the full_data dict of the user. For an # anonymous user is it the dict {'id': 0} change_id = None - if not await async_anonymous_is_enabled() and not self.scope["user"]["id"]: + if not await async_anonymous_is_enabled() and not self.user_id: await self.accept() # workaround for #4009 await self.close() logger.debug(f"connect: denied ({self._id})") @@ -74,24 +62,23 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): change_id = int(query_string[b"change_id"][0]) except ValueError: await self.accept() # workaround for #4009 - await self.close() # TODO: Find a way to send an error code + await self.close() logger.debug(f"connect: wrong change id ({self._id})") return - if b"autoupdate" in query_string and query_string[b"autoupdate"][ - 0 - ].lower() not in [b"0", b"off", b"false"]: - # a positive value in autoupdate. Start autoupdate - await self.channel_layer.group_add("autoupdate", self.channel_name) - await self.accept() if change_id is not None: logger.debug(f"connect: change id {change_id} ({self._id})") - await self.send_autoupdate(change_id) + try: + await self.request_autoupdate(change_id) + except BaseWebsocketException as e: + await self.send_exception(e) else: logger.debug(f"connect: no change id ({self._id})") + await self.channel_layer.group_add("autoupdate", self.channel_name) + async def disconnect(self, close_code: int) -> None: """ A user disconnects. Remove it from autoupdate. @@ -102,110 +89,19 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): f"disconnect code={close_code} active_secs={active_seconds} ({self._id})" ) - async def send_notify(self, event: Dict[str, Any]) -> None: - """ - Send a notify message to the user. - """ - user_id = self.scope["user"]["id"] - item = event["incomming"] - - users = item.get("users") - reply_channels = item.get("replyChannels") - if ( - (isinstance(users, bool) and users) - or (isinstance(users, list) and user_id in users) - or ( - isinstance(reply_channels, list) and self.channel_name in reply_channels - ) - or (users is None and reply_channels is None) - ): - item["senderChannelName"] = event["senderChannelName"] - item["senderUserId"] = event["senderUserId"] - await self.send_json(type="notify", content=item) - - async def send_autoupdate( - self, - change_id: int, - max_change_id: Optional[int] = None, - in_response: Optional[str] = None, - ) -> None: - """ - Sends an autoupdate to the client from change_id to max_change_id. - If max_change_id is None, the current change id will be used. - """ - user_id = self.scope["user"]["id"] - - if max_change_id is None: - max_change_id = await element_cache.get_current_change_id() - - if change_id == max_change_id + 1: - # The client is up-to-date, so nothing will be done - return - - if change_id > max_change_id: - message = f"Requested change_id {change_id} is higher this highest change_id {max_change_id}." - await self.send_error( - code=WEBSOCKET_CHANGE_ID_TOO_HIGH, - message=message, - in_response=in_response, - ) - return - - try: - changed_elements, deleted_element_ids = await element_cache.get_data_since( - user_id, change_id, max_change_id - ) - except ChangeIdTooLowError: - # The change_id is lower the the lowerst change_id in redis. Return all data - changed_elements = await element_cache.get_all_data_list(user_id) - all_data = True - deleted_elements: Dict[str, List[int]] = {} - except UserDoesNotExist: - # Maybe the user was deleted, but a websocket connection is still open to the user. - # So we can close this connection and return. - await self.close() - return - else: - all_data = False - deleted_elements = defaultdict(list) - for element_id in deleted_element_ids: - collection_string, id = split_element_id(element_id) - deleted_elements[collection_string].append(id) - - # Check, if the autoupdate has any data. - if not changed_elements and not deleted_element_ids: - # Set the current from_change_id, if it is the first skipped autoupdate - if not self.skipped_autoupdate_from_change_id: - self.skipped_autoupdate_from_change_id = change_id - else: - # Normal autoupdate with data - from_change_id = change_id - - # If there is at least one skipped autoupdate, take the saved from_change_id - if self.skipped_autoupdate_from_change_id: - from_change_id = self.skipped_autoupdate_from_change_id - self.skipped_autoupdate_from_change_id = None - - await self.send_json( - type="autoupdate", - content=AutoupdateFormat( - changed=changed_elements, - deleted=deleted_elements, - from_change_id=from_change_id, - to_change_id=max_change_id, - all_data=all_data, - ), - in_response=in_response, - ) - - async def send_data(self, event: Dict[str, Any]) -> None: + async def msg_new_change_id(self, event: Dict[str, Any]) -> None: """ Send changed or deleted elements to the user. """ change_id = event["change_id"] - await self.send_autoupdate(change_id, max_change_id=change_id) + try: + await self.autoupdate_strategy.new_change_id(change_id) + except UserDoesNotExist: + # Maybe the user was deleted, but a websocket connection is still open to the user. + # So we can close this connection and return. + await self.close() - async def projector_changed(self, event: Dict[str, Any]) -> None: + async def msg_projector_data(self, event: Dict[str, Any]) -> None: """ The projector has changed. """ @@ -223,6 +119,33 @@ class SiteConsumer(ProtocollAsyncJsonWebsocketConsumer): if projector_data: await self.send_projector_data(projector_data, change_id=change_id) + async def msg_notify(self, event: Dict[str, Any]) -> None: + """ + Send a notify message to the user. + """ + item = event["incomming"] + + users = item.get("users") + reply_channels = item.get("replyChannels") + if ( + (isinstance(users, bool) and users) + or (isinstance(users, list) and self.user_id in users) + or ( + isinstance(reply_channels, list) and self.channel_name in reply_channels + ) + or (users is None and reply_channels is None) + ): + item["senderChannelName"] = event["senderChannelName"] + item["senderUserId"] = event["senderUserId"] + await self.send_json(type="notify", content=item) + + async def request_autoupdate( + self, change_id: int, in_response: Optional[str] = None + ) -> None: + await self.autoupdate_strategy.request_change_id( + change_id, in_response=in_response + ) + async def send_projector_data( self, data: Dict[int, Dict[str, Any]], diff --git a/openslides/utils/projector.py b/openslides/utils/projector.py index 7b66e87c6..3a8f1cc00 100644 --- a/openslides/utils/projector.py +++ b/openslides/utils/projector.py @@ -35,9 +35,11 @@ class ProjectorAllDataProvider: if data is None: data = ProjectorAllDataProvider.NON_EXISTENT_MARKER self.cache[collection][id] = data - elif cache_data == ProjectorAllDataProvider.NON_EXISTENT_MARKER: + + cache_data = self.cache[collection][id] + if cache_data == ProjectorAllDataProvider.NON_EXISTENT_MARKER: return None - return self.cache[collection][id] + return cache_data async def get_collection(self, collection: str) -> Dict[int, Dict[str, Any]]: if not self.fetched_collection.get(collection, False): diff --git a/openslides/utils/timing.py b/openslides/utils/timing.py new file mode 100644 index 000000000..ebcc2f35c --- /dev/null +++ b/openslides/utils/timing.py @@ -0,0 +1,27 @@ +import time +from typing import List, Optional + +from . import logging + + +timelogger = logging.getLogger(__name__) + + +class Timing: + def __init__(self, name: str) -> None: + self.name = name + self.times: List[float] = [time.time()] + + def __call__(self, done: Optional[bool] = False) -> None: + self.times.append(time.time()) + if done: + self.printtime() + + def printtime(self) -> None: + s = f"{self.name}: " + for i in range(1, len(self.times)): + diff = self.times[i] - self.times[i - 1] + s += f"{i}: {diff:.5f} " + diff = self.times[-1] - self.times[0] + s += f"sum: {diff:.5f}" + timelogger.info(s) diff --git a/openslides/utils/websocket.py b/openslides/utils/websocket.py index b001b2dc9..42e5b802f 100644 --- a/openslides/utils/websocket.py +++ b/openslides/utils/websocket.py @@ -25,6 +25,26 @@ WEBSOCKET_WRONG_FORMAT = 102 # If the recieved data has not the expected format. +class BaseWebsocketException(Exception): + code: int + + def __init__(self, message: str, in_response: Optional[str] = None) -> None: + self.message = message + self.in_response = in_response + + +class NotAuthorizedException(BaseWebsocketException): + code = WEBSOCKET_NOT_AUTHORIZED + + +class ChangeIdTooHighException(BaseWebsocketException): + code = WEBSOCKET_CHANGE_ID_TOO_HIGH + + +class WrongFormatException(BaseWebsocketException): + code = WEBSOCKET_WRONG_FORMAT + + class AsyncCompressedJsonWebsocketConsumer(AsyncWebsocketConsumer): async def receive( self, @@ -122,6 +142,20 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer): silence_errors=silence_errors, ) + async def send_exception( + self, e: BaseWebsocketException, silence_errors: Optional[bool] = True, + ) -> None: + """ + Send generic error messages with a custom status code (see above) and a text message. + """ + await self.send_json( + "error", + {"code": e.code, "message": e.message}, + None, + in_response=e.in_response, + silence_errors=silence_errors, + ) + async def receive_json(self, content: Any) -> None: # type: ignore """ Receives the json data, parses it and calls receive_content. @@ -140,9 +174,12 @@ class ProtocollAsyncJsonWebsocketConsumer(AsyncCompressedJsonWebsocketConsumer): ) return - await websocket_client_messages[content["type"]].receive_content( - self, content["content"], id=content["id"] - ) + try: + await websocket_client_messages[content["type"]].receive_content( + self, content["content"], id=content["id"] + ) + except BaseWebsocketException as e: + await self.send_exception(e) schema: Dict[str, Any] = { diff --git a/openslides/core/websocket.py b/openslides/utils/websocket_client_messages.py similarity index 79% rename from openslides/core/websocket.py rename to openslides/utils/websocket_client_messages.py index a43447305..d66eb1f58 100644 --- a/openslides/core/websocket.py +++ b/openslides/utils/websocket_client_messages.py @@ -1,21 +1,22 @@ from typing import Any, Dict, Optional -from ..utils import logging -from ..utils.auth import async_has_perm -from ..utils.constants import get_constants -from ..utils.projector import get_projector_data -from ..utils.stats import WebsocketLatencyLogger -from ..utils.websocket import ( - WEBSOCKET_NOT_AUTHORIZED, +from . import logging +from .auth import async_has_perm +from .constants import get_constants +from .projector import get_projector_data +from .stats import WebsocketLatencyLogger +from .websocket import ( BaseWebsocketClientMessage, + NotAuthorizedException, ProtocollAsyncJsonWebsocketConsumer, + register_client_message, ) logger = logging.getLogger(__name__) -class NotifyWebsocketClientMessage(BaseWebsocketClientMessage): +class Notify(BaseWebsocketClientMessage): """ Websocket message from a client to send a message to other clients. """ @@ -59,13 +60,9 @@ class NotifyWebsocketClientMessage(BaseWebsocketClientMessage): ) -> None: # Check if the user is allowed to send this notify message perm = self.notify_permissions.get(content["name"]) - if perm is not None and not await async_has_perm( - consumer.scope["user"]["id"], perm - ): - await consumer.send_error( - code=WEBSOCKET_NOT_AUTHORIZED, - message=f"You need '{perm}' to send this message.", - in_response=id, + if perm is not None and not await async_has_perm(consumer.user_id, perm): + raise NotAuthorizedException( + f"You need '{perm}' to send this message.", in_response=id, ) else: # Some logging @@ -84,15 +81,18 @@ class NotifyWebsocketClientMessage(BaseWebsocketClientMessage): await consumer.channel_layer.group_send( "site", { - "type": "send_notify", + "type": "msg_notify", "incomming": content, "senderChannelName": consumer.channel_name, - "senderUserId": consumer.scope["user"]["id"], + "senderUserId": consumer.user_id, }, ) -class ConstantsWebsocketClientMessage(BaseWebsocketClientMessage): +register_client_message(Notify()) + + +class Constants(BaseWebsocketClientMessage): """ The Client requests the constants. """ @@ -109,7 +109,10 @@ class ConstantsWebsocketClientMessage(BaseWebsocketClientMessage): ) -class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage): +register_client_message(Constants()) + + +class GetElements(BaseWebsocketClientMessage): """ The Client request database elements. """ @@ -130,26 +133,10 @@ class GetElementsWebsocketClientMessage(BaseWebsocketClientMessage): self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str ) -> None: requested_change_id = content.get("change_id", 0) - await consumer.send_autoupdate(requested_change_id, in_response=id) + await consumer.request_autoupdate(requested_change_id, in_response=id) -class AutoupdateWebsocketClientMessage(BaseWebsocketClientMessage): - """ - The Client turns autoupdate on or off. - """ - - identifier = "autoupdate" - - async def receive_content( - self, consumer: "ProtocollAsyncJsonWebsocketConsumer", content: Any, id: str - ) -> None: - # Turn on or off the autoupdate for the client - if content: # accept any value, that can be interpreted as bool - await consumer.channel_layer.group_add("autoupdate", consumer.channel_name) - else: - await consumer.channel_layer.group_discard( - "autoupdate", consumer.channel_name - ) +register_client_message(GetElements()) class ListenToProjectors(BaseWebsocketClientMessage): @@ -198,6 +185,9 @@ class ListenToProjectors(BaseWebsocketClientMessage): await consumer.send_projector_data(projector_data, in_response=id) +register_client_message(ListenToProjectors()) + + class PingPong(BaseWebsocketClientMessage): """ Responds to pings from the client. @@ -220,3 +210,6 @@ class PingPong(BaseWebsocketClientMessage): await consumer.send_json(type="pong", content=latency, in_response=id) if latency is not None: await WebsocketLatencyLogger.add_latency(latency) + + +register_client_message(PingPong()) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index f83011d6e..2791e70be 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -1,9 +1,13 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional, cast from openslides.core.config import config from openslides.core.models import Projector from openslides.users.models import User -from openslides.utils.projector import AllData, get_config, register_projector_slide +from openslides.utils.projector import ( + ProjectorAllDataProvider, + get_config, + register_projector_slide, +) class TConfig: @@ -94,19 +98,23 @@ class TProjector: async def slide1( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: """ Slide that shows the general_event_name. """ return { "name": "slide1", - "event_name": await get_config(all_data, "general_event_name"), + "event_name": await get_config(all_data_provider, "general_event_name"), } async def slide2( - all_data: AllData, element: Dict[str, Any], projector_id: int + all_data_provider: ProjectorAllDataProvider, + element: Dict[str, Any], + projector_id: int, ) -> Dict[str, Any]: return {"name": "slide2"} @@ -115,17 +123,26 @@ register_projector_slide("test/slide1", slide1) register_projector_slide("test/slide2", slide2) -def all_data_config() -> AllData: - return { - TConfig().get_collection_string(): { - element["id"]: element for element in TConfig().get_elements() - } - } +class TestProjectorAllDataProvider: + def __init__(self, data): + self.data = data + + async def get(self, collection: str, id: int) -> Optional[Dict[str, Any]]: + collection_data = await self.get_collection(collection) + return collection_data.get(id) + + async def get_collection(self, collection: str) -> Dict[int, Dict[str, Any]]: + return self.data.get(collection, {}) + + async def exists(self, collection: str, id: int) -> bool: + return (await self.get(collection, id)) is not None -def all_data_users() -> AllData: - return { - TUser().get_collection_string(): { - element["id"]: element for element in TUser().get_elements() - } +def get_all_data_provider(data) -> ProjectorAllDataProvider: + data[TConfig().get_collection_string()] = { + element["id"]: element for element in TConfig().get_elements() } + data[TUser().get_collection_string()] = { + element["id"]: element for element in TUser().get_elements() + } + return cast(ProjectorAllDataProvider, TestProjectorAllDataProvider(data)) diff --git a/tests/integration/utils/test_consumers.py b/tests/integration/utils/test_consumers.py index 80bf93729..4f664ff2a 100644 --- a/tests/integration/utils/test_consumers.py +++ b/tests/integration/utils/test_consumers.py @@ -1,4 +1,3 @@ -import asyncio from importlib import import_module from typing import Optional, Tuple from unittest.mock import patch @@ -158,17 +157,7 @@ async def test_connection_with_too_big_change_id(get_communicator, set_config): @pytest.mark.asyncio -async def test_changed_data_autoupdate_off(communicator, set_config): - await set_config("general_system_enable_anonymous", True) - await communicator.connect() - - # Change a config value - await set_config("general_event_name", "Test Event") - assert await communicator.receive_nothing() - - -@pytest.mark.asyncio -async def test_changed_data_autoupdate_on(get_communicator, set_config): +async def test_changed_data_autoupdate(get_communicator, set_config): await set_config("general_system_enable_anonymous", True) communicator = get_communicator("autoupdate=on") await communicator.connect() @@ -422,12 +411,12 @@ async def test_send_connect_up_to_date(communicator, set_config): {"type": "getElements", "content": {"change_id": 0}, "id": "test_id"} ) response1 = await communicator.receive_json_from() - first_change_id = response1.get("content")["to_change_id"] + max_change_id = response1.get("content")["to_change_id"] await communicator.send_json_to( { "type": "getElements", - "content": {"change_id": first_change_id + 1}, + "content": {"change_id": max_change_id}, "id": "test_id", } ) @@ -510,43 +499,6 @@ async def test_send_invalid_get_elements(communicator, set_config): assert response.get("in_response") == "test_id" -@pytest.mark.asyncio -async def test_turn_on_autoupdate(communicator, set_config): - await set_config("general_system_enable_anonymous", True) - await communicator.connect() - - await communicator.send_json_to( - {"type": "autoupdate", "content": "on", "id": "test_id"} - ) - await asyncio.sleep(0.01) - # Change a config value - await set_config("general_event_name", "Test Event") - response = await communicator.receive_json_from() - - id = config.get_key_to_id()["general_event_name"] - type = response.get("type") - content = response.get("content") - assert type == "autoupdate" - assert content["changed"] == { - "core/config": [{"id": id, "key": "general_event_name", "value": "Test Event"}] - } - - -@pytest.mark.asyncio -async def test_turn_off_autoupdate(get_communicator, set_config): - await set_config("general_system_enable_anonymous", True) - communicator = get_communicator("autoupdate=on") - await communicator.connect() - - await communicator.send_json_to( - {"type": "autoupdate", "content": False, "id": "test_id"} - ) - await asyncio.sleep(0.01) - # Change a config value - await set_config("general_event_name", "Test Event") - assert await communicator.receive_nothing() - - @pytest.mark.asyncio async def test_listen_to_projector(communicator, set_config): await set_config("general_system_enable_anonymous", True) @@ -588,10 +540,15 @@ async def test_update_projector(communicator, set_config): "id": "test_id", } ) - await communicator.receive_json_from() + await communicator.receive_json_from() # recieve initial projector data # Change a config value await set_config("general_event_name", "Test Event") + + # We need two messages: The autoupdate and the projector data in this order + response = await communicator.receive_json_from() + assert response.get("type") == "autoupdate" + response = await communicator.receive_json_from() type = response.get("type") @@ -629,4 +586,8 @@ async def test_update_projector_to_current_value(communicator, set_config): # Change a config value to current_value await set_config("general_event_name", "OpenSlides") + # We await an autoupdate, bot no projector data + response = await communicator.receive_json_from() + assert response.get("type") == "autoupdate" + assert await communicator.receive_nothing() diff --git a/tests/unit/agenda/test_projector.py b/tests/unit/agenda/test_projector.py index 0f86a54ca..442be4079 100644 --- a/tests/unit/agenda/test_projector.py +++ b/tests/unit/agenda/test_projector.py @@ -4,10 +4,12 @@ import pytest from openslides.agenda import projector +from ...integration.helpers import get_all_data_provider + @pytest.fixture -def all_data(): - all_data = { +def all_data_provider(): + data = { "agenda/item": { 1: { "id": 1, @@ -82,14 +84,14 @@ def all_data(): } } - return all_data + return get_all_data_provider(data) @pytest.mark.asyncio -async def test_main_items(all_data): +async def test_main_items(all_data_provider): element: Dict[str, Any] = {} - data = await projector.item_list_slide(all_data, element, 1) + data = await projector.item_list_slide(all_data_provider, element, 1) assert data == { "items": [ @@ -106,10 +108,10 @@ async def test_main_items(all_data): @pytest.mark.asyncio -async def test_all_items(all_data): +async def test_all_items(all_data_provider): element: Dict[str, Any] = {"only_main_items": False} - data = await projector.item_list_slide(all_data, element, 1) + data = await projector.item_list_slide(all_data_provider, element, 1) assert data == { "items": [ diff --git a/tests/unit/motions/test_projector.py b/tests/unit/motions/test_projector.py index c829070d0..257d60150 100644 --- a/tests/unit/motions/test_projector.py +++ b/tests/unit/motions/test_projector.py @@ -4,14 +4,13 @@ import pytest from openslides.motions import projector -from ...integration.helpers import all_data_config, all_data_users +from ...integration.helpers import get_all_data_provider @pytest.fixture -def all_data(): - return_value = all_data_config() - return_value.update(all_data_users()) - return_value["motions/motion"] = { +def all_data_provider(): + data = {} + data["motions/motion"] = { 1: { "id": 1, "identifier": "4", @@ -143,7 +142,7 @@ def all_data(): "change_recommendations": [], }, } - return_value["motions/workflow"] = { + data["motions/workflow"] = { 1: { "id": 1, "name": "Simple Workflow", @@ -151,7 +150,7 @@ def all_data(): "first_state_id": 1, } } - return_value["motions/state"] = { + data["motions/state"] = { 1: { "id": 1, "name": "submitted", @@ -217,7 +216,7 @@ def all_data(): "workflow_id": 1, }, } - return_value["motions/statute-paragraph"] = { + data["motions/statute-paragraph"] = { 1: { "id": 1, "title": "§1 Preamble", @@ -225,7 +224,7 @@ def all_data(): "weight": 10000, } } - return_value["motions/motion-change-recommendation"] = { + data["motions/motion-change-recommendation"] = { 1: { "id": 1, "motion_id": 1, @@ -251,14 +250,14 @@ def all_data(): "creation_time": "2019-02-09T09:54:06.256378+01:00", }, } - return return_value + return get_all_data_provider(data) @pytest.mark.asyncio -async def test_motion_slide(all_data): +async def test_motion_slide(all_data_provider): element: Dict[str, Any] = {"id": 1} - data = await projector.motion_slide(all_data, element, 1) + data = await projector.motion_slide(all_data_provider, element, 1) assert data == { "identifier": "4", @@ -304,10 +303,10 @@ async def test_motion_slide(all_data): @pytest.mark.asyncio -async def test_amendment_slide(all_data): +async def test_amendment_slide(all_data_provider): element: Dict[str, Any] = {"id": 2} - data = await projector.motion_slide(all_data, element, 1) + data = await projector.motion_slide(all_data_provider, element, 1) assert data == { "identifier": "Ä1", @@ -331,10 +330,10 @@ async def test_amendment_slide(all_data): @pytest.mark.asyncio -async def test_statute_amendment_slide(all_data): +async def test_statute_amendment_slide(all_data_provider): element: Dict[str, Any] = {"id": 3} - data = await projector.motion_slide(all_data, element, 1) + data = await projector.motion_slide(all_data_provider, element, 1) assert data == { "identifier": None, From 0eee8397364be40cd90627000c9520bbeb54d20d Mon Sep 17 00:00:00 2001 From: FinnStutzenstein Date: Thu, 28 May 2020 13:53:01 +0200 Subject: [PATCH 4/4] Small improvements and first attempt to make to poll progress responsive to massive autoupdates. The "optimization" didn't help, so this has to be continued in another PR. --- .../core/core-services/autoupdate.service.ts | 8 +- .../core/core-services/data-store.service.ts | 8 ++ .../core/core-services/operator.service.ts | 3 +- .../legal-notice/legal-notice.component.html | 20 ++++ .../legal-notice/legal-notice.component.ts | 14 ++- .../motion-detail/motion-detail.component.ts | 2 +- .../poll-progress.component.html | 2 +- .../poll-progress/poll-progress.component.ts | 98 +++++++++++++------ openslides/poll/views.py | 8 +- openslides/utils/autoupdate.py | 15 ++- openslides/utils/cache_providers.py | 2 +- .../utils/consumer_autoupdate_strategy.py | 23 +++-- openslides/utils/models.py | 2 +- tests/integration/utils/test_consumers.py | 15 +-- 14 files changed, 159 insertions(+), 61 deletions(-) diff --git a/client/src/app/core/core-services/autoupdate.service.ts b/client/src/app/core/core-services/autoupdate.service.ts index 72fbc1f88..a286bc3e9 100644 --- a/client/src/app/core/core-services/autoupdate.service.ts +++ b/client/src/app/core/core-services/autoupdate.service.ts @@ -60,6 +60,7 @@ export function isAutoupdateFormat(obj: any): obj is AutoupdateFormat { }) export class AutoupdateService { private mutex = new Mutex(); + /** * Constructor to create the AutoupdateService. Calls the constructor of the parent class. * @param websocketService @@ -104,7 +105,7 @@ export class AutoupdateService { } /** - * Stores all data from the autoupdate. This means, that the DS is resettet and filled with just the + * Stores all data from the autoupdate. This means, that the DS is resetted and filled with just the * given data from the autoupdate. * @param autoupdate The autoupdate */ @@ -133,9 +134,10 @@ export class AutoupdateService { // Normal autoupdate if (autoupdate.from_change_id <= maxChangeId + 1 && autoupdate.to_change_id > maxChangeId) { - this.injectAutupdateIntoDS(autoupdate, true); + await this.injectAutupdateIntoDS(autoupdate, true); } else { // autoupdate fully in the future. we are missing something! + console.log('Autoupdate in the future', maxChangeId, autoupdate.from_change_id, autoupdate.to_change_id); this.requestChanges(); } } @@ -143,7 +145,7 @@ export class AutoupdateService { public async injectAutoupdateIgnoreChangeId(autoupdate: AutoupdateFormat): Promise { const unlock = await this.mutex.lock(); console.debug('inject autoupdate', autoupdate); - this.injectAutupdateIntoDS(autoupdate, false); + await this.injectAutupdateIntoDS(autoupdate, false); unlock(); } diff --git a/client/src/app/core/core-services/data-store.service.ts b/client/src/app/core/core-services/data-store.service.ts index a9ff9fc7e..2fa19e4c7 100644 --- a/client/src/app/core/core-services/data-store.service.ts +++ b/client/src/app/core/core-services/data-store.service.ts @@ -258,6 +258,7 @@ export class DataStoreUpdateManagerService { private serveNextSlot(): void { if (this.updateSlotRequests.length > 0) { + console.log('Concurrent update slots'); const request = this.updateSlotRequests.pop(); request.resolve(); } @@ -665,4 +666,11 @@ export class DataStoreService { await this.storageService.set(DataStoreService.cachePrefix + 'DS', this.jsonStore); await this.storageService.set(DataStoreService.cachePrefix + 'maxChangeId', changeId); } + + public print(): void { + console.log('Max change id', this.maxChangeId); + console.log('json storage'); + console.log(JSON.stringify(this.jsonStore)); + console.log(this.modelStore); + } } diff --git a/client/src/app/core/core-services/operator.service.ts b/client/src/app/core/core-services/operator.service.ts index e2f294be4..822a5f855 100644 --- a/client/src/app/core/core-services/operator.service.ts +++ b/client/src/app/core/core-services/operator.service.ts @@ -456,8 +456,7 @@ export class OperatorService implements OnAfterAppsLoaded { * Set the operators presence to isPresent */ public async setPresence(isPresent: boolean): Promise { - const r = await this.http.post(environment.urlPrefix + '/users/setpresence/', isPresent); - console.log('operator', r); + await this.http.post(environment.urlPrefix + '/users/setpresence/', isPresent); } /** diff --git a/client/src/app/site/common/components/legal-notice/legal-notice.component.html b/client/src/app/site/common/components/legal-notice/legal-notice.component.html index 31dbb9e57..56136f488 100644 --- a/client/src/app/site/common/components/legal-notice/legal-notice.component.html +++ b/client/src/app/site/common/components/legal-notice/legal-notice.component.html @@ -32,6 +32,26 @@ {{ 'Check for updates' | translate }} + +
+ +
+ + +
+ +
+ +
+ +
+
diff --git a/client/src/app/site/common/components/legal-notice/legal-notice.component.ts b/client/src/app/site/common/components/legal-notice/legal-notice.component.ts index bbacac6da..fd61ea876 100644 --- a/client/src/app/site/common/components/legal-notice/legal-notice.component.ts +++ b/client/src/app/site/common/components/legal-notice/legal-notice.component.ts @@ -4,6 +4,7 @@ import { Title } from '@angular/platform-browser'; import { TranslateService } from '@ngx-translate/core'; +import { DataStoreService } from 'app/core/core-services/data-store.service'; import { OpenSlidesService } from 'app/core/core-services/openslides.service'; import { OperatorService, Permission } from 'app/core/core-services/operator.service'; import { ConfigRepositoryService } from 'app/core/repositories/config/config-repository.service'; @@ -25,6 +26,8 @@ export class LegalNoticeComponent extends BaseViewComponent implements OnInit { */ public legalNotice = ''; + public showDevTools = false; + /** * Constructor. */ @@ -35,7 +38,8 @@ export class LegalNoticeComponent extends BaseViewComponent implements OnInit { private openSlidesService: OpenSlidesService, private update: UpdateService, private configRepo: ConfigRepositoryService, - private operator: OperatorService + private operator: OperatorService, + private DS: DataStoreService ) { super(title, translate, matSnackbar); } @@ -67,4 +71,12 @@ export class LegalNoticeComponent extends BaseViewComponent implements OnInit { public canManage(): boolean { return this.operator.hasPerms(Permission.coreCanManageConfig); } + + public printDS(): void { + this.DS.print(); + } + + public getThisComponent(): void { + console.log(this); + } } diff --git a/client/src/app/site/motions/modules/motion-detail/components/motion-detail/motion-detail.component.ts b/client/src/app/site/motions/modules/motion-detail/components/motion-detail/motion-detail.component.ts index beda8a319..8cf8d31c9 100644 --- a/client/src/app/site/motions/modules/motion-detail/components/motion-detail/motion-detail.component.ts +++ b/client/src/app/site/motions/modules/motion-detail/components/motion-detail/motion-detail.component.ts @@ -554,7 +554,7 @@ export class MotionDetailComponent extends BaseViewComponent implements OnInit, this.motionFilterService.initFilters(this.motionObserver); this.motionSortService.initSorting(this.motionFilterService.outputObservable); this.sortedMotionsObservable = this.motionSortService.outputObservable; - } else if (this.motion.parent_id) { + } else if (this.motion && this.motion.parent_id) { // only use the amendments for this motion this.amendmentFilterService.initFilters(this.repo.amendmentsTo(this.motion.parent_id)); this.amendmentSortService.initSorting(this.amendmentFilterService.outputObservable); diff --git a/client/src/app/site/polls/components/poll-progress/poll-progress.component.html b/client/src/app/site/polls/components/poll-progress/poll-progress.component.html index 6a5f5cfb5..f8cffffd3 100644 --- a/client/src/app/site/polls/components/poll-progress/poll-progress.component.html +++ b/client/src/app/site/polls/components/poll-progress/poll-progress.component.html @@ -1,6 +1,6 @@
- {{ poll.votescast }} / {{ max }} + {{ votescast }} / {{ max }}
{{ 'Received votes' | translate }} diff --git a/client/src/app/site/polls/components/poll-progress/poll-progress.component.ts b/client/src/app/site/polls/components/poll-progress/poll-progress.component.ts index 8ecedb614..909f86df0 100644 --- a/client/src/app/site/polls/components/poll-progress/poll-progress.component.ts +++ b/client/src/app/site/polls/components/poll-progress/poll-progress.component.ts @@ -1,58 +1,98 @@ -import { Component, Input, OnInit } from '@angular/core'; +import { Component, Input } from '@angular/core'; import { MatSnackBar } from '@angular/material/snack-bar'; import { Title } from '@angular/platform-browser'; import { TranslateService } from '@ngx-translate/core'; -import { map } from 'rxjs/operators'; +import { Subscription } from 'rxjs'; +import { MotionPollRepositoryService } from 'app/core/repositories/motions/motion-poll-repository.service'; import { UserRepositoryService } from 'app/core/repositories/users/user-repository.service'; import { BaseViewComponent } from 'app/site/base/base-view'; import { ViewBasePoll } from 'app/site/polls/models/view-base-poll'; +import { ViewUser } from 'app/site/users/models/view-user'; @Component({ selector: 'os-poll-progress', templateUrl: './poll-progress.component.html', styleUrls: ['./poll-progress.component.scss'] }) -export class PollProgressComponent extends BaseViewComponent implements OnInit { - @Input() - public poll: ViewBasePoll; +export class PollProgressComponent extends BaseViewComponent { + private pollId: number = null; + private pollSubscription: Subscription = null; + @Input() + public set poll(value: ViewBasePoll) { + if (value.id !== this.pollId) { + this.pollId = value.id; + + if (this.pollSubscription !== null) { + this.pollSubscription.unsubscribe(); + this.pollSubscription = null; + } + + this.pollSubscription = this.pollRepo.getViewModelObservable(this.pollId).subscribe(poll => { + if (poll) { + this._poll = poll; + + // We may cannot use this.poll.votescast during the voting, since it can + // be reported with false values from the server + // -> calculate the votes on our own. + const ids = new Set(); + for (const option of this.poll.options) { + for (const vote of option.votes) { + if (vote.user_id) { + ids.add(vote.user_id); + } + } + } + this.votescast = ids.size; + + // But sometimes there are not enough votes (poll.votescast is higher). + // If this happens, take the value from the poll + if (this.poll.votescast > this.votescast) { + this.votescast = this.poll.votescast; + } + + this.calculateMaxUsers(); + } + }); + } + } + public get poll(): ViewBasePoll { + return this._poll; + } + private _poll: ViewBasePoll; + + public votescast: number; public max: number; + public valueInPercent: number; public constructor( title: Title, protected translate: TranslateService, snackbar: MatSnackBar, - private userRepo: UserRepositoryService + private userRepo: UserRepositoryService, + private pollRepo: MotionPollRepositoryService ) { super(title, translate, snackbar); + this.userRepo.getViewModelListObservable().subscribe(users => { + if (users) { + this.calculateMaxUsers(users); + } + }); } - public get valueInPercent(): number { - if (this.poll) { - return (this.poll.votesvalid / this.max) * 100; - } else { - return 0; + private calculateMaxUsers(allUsers?: ViewUser[]): void { + if (!this.poll) { + return; + } + if (!allUsers) { + allUsers = this.userRepo.getViewModelList(); } - } - /** - * OnInit. - * Sets the observable for groups. - */ - public ngOnInit(): void { - if (this.poll) { - this.userRepo - .getViewModelListObservable() - .pipe( - map(users => - users.filter(user => user.is_present && this.poll.groups_id.intersect(user.groups_id).length) - ) - ) - .subscribe(users => { - this.max = users.length; - }); - } + allUsers = allUsers.filter(user => user.is_present && this.poll.groups_id.intersect(user.groups_id).length); + + this.max = allUsers.length; + this.valueInPercent = this.poll ? (this.votescast / this.max) * 100 : 0; } } diff --git a/openslides/poll/views.py b/openslides/poll/views.py index fabb87f9a..06da7637b 100644 --- a/openslides/poll/views.py +++ b/openslides/poll/views.py @@ -156,9 +156,11 @@ class BasePollViewSet(ModelViewSet): poll.state = BasePoll.STATE_PUBLISHED poll.save() - inform_changed_data(vote.user for vote in poll.get_votes().all() if vote.user) - inform_changed_data(poll.get_votes()) - inform_changed_data(poll.get_options()) + inform_changed_data( + (vote.user for vote in poll.get_votes().all() if vote.user), final_data=True + ) + inform_changed_data(poll.get_votes(), final_data=True) + inform_changed_data(poll.get_options(), final_data=True) return Response() @detail_route(methods=["POST"]) diff --git a/openslides/utils/autoupdate.py b/openslides/utils/autoupdate.py index 3d3a3e308..d1fffe8df 100644 --- a/openslides/utils/autoupdate.py +++ b/openslides/utils/autoupdate.py @@ -111,7 +111,8 @@ class AutoupdateBundle: save_history(self.element_iterator) # Update cache and send autoupdate using async code. - return async_to_sync(self.dispatch_autoupdate)() + change_id = async_to_sync(self.dispatch_autoupdate)() + return change_id @property def element_iterator(self) -> Iterable[AutoupdateElement]: @@ -172,6 +173,7 @@ def inform_changed_data( user_id: Optional[int] = None, disable_history: bool = False, no_delete_on_restriction: bool = False, + final_data: bool = False, ) -> None: """ Informs the autoupdate system and the caching system about the creation or @@ -187,8 +189,10 @@ def inform_changed_data( instances = (instances,) root_instances = set(instance.get_root_rest_element() for instance in instances) - elements = [ - AutoupdateElement( + + elements = [] + for root_instance in root_instances: + element = AutoupdateElement( id=root_instance.get_rest_pk(), collection_string=root_instance.get_collection_string(), disable_history=disable_history, @@ -196,8 +200,9 @@ def inform_changed_data( user_id=user_id, no_delete_on_restriction=no_delete_on_restriction, ) - for root_instance in root_instances - ] + if final_data: + element["full_data"] = root_instance.get_full_data() + elements.append(element) inform_elements(elements) diff --git a/openslides/utils/cache_providers.py b/openslides/utils/cache_providers.py index d74c55378..a99ef24df 100644 --- a/openslides/utils/cache_providers.py +++ b/openslides/utils/cache_providers.py @@ -261,7 +261,7 @@ class RedisCacheProvider: async def add_to_full_data(self, data: Dict[str, str]) -> None: async with get_connection() as redis: - redis.hmset_dict(self.full_data_cache_key, data) + await redis.hmset_dict(self.full_data_cache_key, data) async def data_exists(self) -> bool: """ diff --git a/openslides/utils/consumer_autoupdate_strategy.py b/openslides/utils/consumer_autoupdate_strategy.py index 139e3ecd7..f5a03cb29 100644 --- a/openslides/utils/consumer_autoupdate_strategy.py +++ b/openslides/utils/consumer_autoupdate_strategy.py @@ -16,7 +16,8 @@ class ConsumerAutoupdateStrategy: def __init__(self, consumer: ProtocollAsyncJsonWebsocketConsumer) -> None: self.consumer = consumer # client_change_id = None: unknown -> set on first autoupdate or request_change_id - # client_change_id is int: the one + # client_change_id is int: the change_id, the client knows about, so the next + # update must be from client_change_id+1 .. self.client_change_id: Optional[int] = None self.max_seen_change_id = 0 self.next_send_time = None @@ -26,11 +27,16 @@ class ConsumerAutoupdateStrategy: async def request_change_id( self, change_id: int, in_response: Optional[str] = None ) -> None: + """ + The change id is not inclusive, so the client is on change_id and wants + data from change_id+1 .. now + """ # This resets the server side tracking of the client's change id. async with self.lock: await self.stop_timer() self.max_seen_change_id = await element_cache.get_current_change_id() + print(self.max_seen_change_id) self.client_change_id = change_id if self.client_change_id == self.max_seen_change_id: @@ -49,7 +55,9 @@ class ConsumerAutoupdateStrategy: async def new_change_id(self, change_id: int) -> None: async with self.lock: if self.client_change_id is None: - self.client_change_id = change_id + # The -1 is to send this autoupdate as the first one to he client. + # Remember: the client_change_id is the change_id the client knows about + self.client_change_id = change_id - 1 if change_id > self.max_seen_change_id: self.max_seen_change_id = change_id @@ -84,12 +92,13 @@ class ConsumerAutoupdateStrategy: self.timer_task_handle = None async def send_autoupdate(self, in_response: Optional[str] = None) -> None: - max_change_id = ( - self.max_seen_change_id - ) # important to save this variable, because - # it can change during runtime. + # it is important to save this variable, because it can change during runtime. + max_change_id = self.max_seen_change_id + # here, 1 is added to the change_id, because the client_change_id is the id the client + # *knows* about -> the client needs client_change_id+1 since get_autoupdate_data is + # inclusive [change_id .. max_change_id]. autoupdate = await get_autoupdate_data( - cast(int, self.client_change_id), max_change_id, self.consumer.user_id + cast(int, self.client_change_id) + 1, max_change_id, self.consumer.user_id ) if autoupdate is not None: # It will be send, so we can set the client_change_id diff --git a/openslides/utils/models.py b/openslides/utils/models.py index 8a531ac17..91a43107c 100644 --- a/openslides/utils/models.py +++ b/openslides/utils/models.py @@ -175,7 +175,7 @@ class RESTModelMixin: current_time = time.time() if current_time > last_time + 5: last_time = current_time - logger.info(f"\t{i+1}/{instances_length}...") + logger.info(f" {i+1}/{instances_length}...") return full_data @classmethod diff --git a/tests/integration/utils/test_consumers.py b/tests/integration/utils/test_consumers.py index 4f664ff2a..764bb143e 100644 --- a/tests/integration/utils/test_consumers.py +++ b/tests/integration/utils/test_consumers.py @@ -50,7 +50,7 @@ async def prepare_element_cache(settings): ] ) element_cache._cachables = None - await element_cache.async_ensure_cache(default_change_id=2) + await element_cache.async_ensure_cache(default_change_id=10) yield # Reset the cachable_provider element_cache.cachable_provider = orig_cachable_provider @@ -159,7 +159,7 @@ async def test_connection_with_too_big_change_id(get_communicator, set_config): @pytest.mark.asyncio async def test_changed_data_autoupdate(get_communicator, set_config): await set_config("general_system_enable_anonymous", True) - communicator = get_communicator("autoupdate=on") + communicator = get_communicator() await communicator.connect() # Change a config value @@ -201,7 +201,7 @@ async def create_user_session_cookie(user_id: int) -> Tuple[bytes, bytes]: @pytest.mark.asyncio async def test_with_user(get_communicator): cookie_header = await create_user_session_cookie(1) - communicator = get_communicator("autoupdate=on", headers=[cookie_header]) + communicator = get_communicator(headers=[cookie_header]) connected, __ = await communicator.connect() @@ -211,7 +211,7 @@ async def test_with_user(get_communicator): @pytest.mark.asyncio async def test_skipping_autoupdate(set_config, get_communicator): cookie_header = await create_user_session_cookie(1) - communicator = get_communicator("autoupdate=on", headers=[cookie_header]) + communicator = get_communicator(headers=[cookie_header]) await communicator.connect() @@ -254,7 +254,7 @@ async def test_skipping_autoupdate(set_config, get_communicator): @pytest.mark.asyncio async def test_receive_deleted_data(get_communicator, set_config): await set_config("general_system_enable_anonymous", True) - communicator = get_communicator("autoupdate=on") + communicator = get_communicator() await communicator.connect() # Delete test element @@ -384,6 +384,7 @@ async def test_send_get_elements_too_big_change_id(communicator, set_config): @pytest.mark.asyncio async def test_send_get_elements_too_small_change_id(communicator, set_config): + # Note: this test depends on the default_change_id set in prepare_element_cache await set_config("general_system_enable_anonymous", True) await communicator.connect() @@ -517,7 +518,7 @@ async def test_listen_to_projector(communicator, set_config): content = response.get("content") assert type == "projector" assert content == { - "change_id": 3, + "change_id": 11, "data": { "1": [ { @@ -555,7 +556,7 @@ async def test_update_projector(communicator, set_config): content = response.get("content") assert type == "projector" assert content == { - "change_id": 4, + "change_id": 12, "data": { "1": [ {