From 23842fd496d966c6c69af28eef64108781c117b1 Mon Sep 17 00:00:00 2001 From: FinnStutzenstein Date: Fri, 8 May 2020 16:17:14 +0200 Subject: [PATCH] Synchronize autoupdate code in the client If autoupdates are too fast, the first one may not be fully executed. Especially when the maxChangeId is not yet updated, the second Autoupdate will trigger a refresh, because for the client it "lay in the future". This can be prevented by synchronizing the autoupdate-handling code with a mutex. --- .../core/core-services/autoupdate.service.ts | 4 +++ .../core/core-services/data-store.service.ts | 1 + client/src/app/core/promises/mutex.ts | 30 +++++++++++++++++++ openslides/utils/cache_providers.py | 10 +++++-- openslides/utils/redis.py | 3 ++ 5 files changed, 45 insertions(+), 3 deletions(-) create mode 100644 client/src/app/core/promises/mutex.ts diff --git a/client/src/app/core/core-services/autoupdate.service.ts b/client/src/app/core/core-services/autoupdate.service.ts index ceaeda77a..3303d0740 100644 --- a/client/src/app/core/core-services/autoupdate.service.ts +++ b/client/src/app/core/core-services/autoupdate.service.ts @@ -3,6 +3,7 @@ import { Injectable } from '@angular/core'; import { BaseModel } from '../../shared/models/base/base-model'; import { CollectionStringMapperService } from './collection-string-mapper.service'; import { DataStoreService, DataStoreUpdateManagerService } from './data-store.service'; +import { Mutex } from '../promises/mutex'; import { WebsocketService, WEBSOCKET_ERROR_CODES } from './websocket.service'; interface AutoupdateFormat { @@ -45,6 +46,7 @@ interface AutoupdateFormat { providedIn: 'root' }) export class AutoupdateService { + private mutex = new Mutex(); /** * Constructor to create the AutoupdateService. Calls the constructor of the parent class. * @param websocketService @@ -79,11 +81,13 @@ export class AutoupdateService { * Handles the change ids of all autoupdates. */ private async storeResponse(autoupdate: AutoupdateFormat): Promise { + const unlock = await this.mutex.lock(); if (autoupdate.all_data) { await this.storeAllData(autoupdate); } else { await this.storePartialAutoupdate(autoupdate); } + unlock(); } /** diff --git a/client/src/app/core/core-services/data-store.service.ts b/client/src/app/core/core-services/data-store.service.ts index a9ff9fc7e..b9ed07a08 100644 --- a/client/src/app/core/core-services/data-store.service.ts +++ b/client/src/app/core/core-services/data-store.service.ts @@ -258,6 +258,7 @@ export class DataStoreUpdateManagerService { private serveNextSlot(): void { if (this.updateSlotRequests.length > 0) { + console.warn('Concurrent update slots'); const request = this.updateSlotRequests.pop(); request.resolve(); } diff --git a/client/src/app/core/promises/mutex.ts b/client/src/app/core/promises/mutex.ts new file mode 100644 index 000000000..3de879915 --- /dev/null +++ b/client/src/app/core/promises/mutex.ts @@ -0,0 +1,30 @@ +/** + * A mutex as described in every textbook + * + * Usage: + * ``` + * mutex = new Mutex(); // create e.g. as class member + * + * // Somewhere in the code to lock (must be async code!) + * const unlock = await this.mutex.lock() + * // ...the code to synchronize + * unlock() + * ``` + */ +export class Mutex { + private mutex = Promise.resolve(); + + public lock(): PromiseLike<() => void> { + // this will capture the code-to-synchronize + let begin: (unlock: () => void) => void = () => {}; + + // All "requests" to execute code are chained in a promise-chain + this.mutex = this.mutex.then(() => { + return new Promise(begin); + }); + + return new Promise(res => { + begin = res; + }); + } +} diff --git a/openslides/utils/cache_providers.py b/openslides/utils/cache_providers.py index 1fb0cc60f..e049a5211 100644 --- a/openslides/utils/cache_providers.py +++ b/openslides/utils/cache_providers.py @@ -8,7 +8,11 @@ from django.core.exceptions import ImproperlyConfigured from typing_extensions import Protocol from . import logging -from .redis import read_only_redis_amount_replicas, use_redis +from .redis import ( + read_only_redis_amount_replicas, + read_only_redis_wait_timeout, + use_redis, +) from .schema_version import SchemaVersion from .utils import split_element_id, str_dict_to_bytes @@ -452,11 +456,11 @@ class RedisCacheProvider: raise e if not read_only and read_only_redis_amount_replicas is not None: reported_amount = await redis.wait( - read_only_redis_amount_replicas, 1000 + read_only_redis_amount_replicas, read_only_redis_wait_timeout ) if reported_amount != read_only_redis_amount_replicas: logger.warn( - f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} requested!" + f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} requested after {read_only_redis_wait_timeout} ms!" ) return result diff --git a/openslides/utils/redis.py b/openslides/utils/redis.py index acc370f54..d5ff24065 100644 --- a/openslides/utils/redis.py +++ b/openslides/utils/redis.py @@ -11,6 +11,7 @@ logger = logging.getLogger(__name__) use_redis = False use_read_only_redis = False read_only_redis_amount_replicas = None +read_only_redis_wait_timeout = None try: import aioredis @@ -35,6 +36,8 @@ else: read_only_redis_amount_replicas = getattr(settings, "AMOUNT_REPLICAS", 1) logger.info(f"AMOUNT_REPLICAS={read_only_redis_amount_replicas}") + read_only_redis_wait_timeout = getattr(settings, "WAIT_TIMEOUT", 1000) + logger.info(f"WAIT_TIMEOUT={read_only_redis_wait_timeout}") else: logger.info("Redis is not configured.")