diff --git a/client/src/app/core/core-services/autoupdate.service.ts b/client/src/app/core/core-services/autoupdate.service.ts index ceaeda77a..3303d0740 100644 --- a/client/src/app/core/core-services/autoupdate.service.ts +++ b/client/src/app/core/core-services/autoupdate.service.ts @@ -3,6 +3,7 @@ import { Injectable } from '@angular/core'; import { BaseModel } from '../../shared/models/base/base-model'; import { CollectionStringMapperService } from './collection-string-mapper.service'; import { DataStoreService, DataStoreUpdateManagerService } from './data-store.service'; +import { Mutex } from '../promises/mutex'; import { WebsocketService, WEBSOCKET_ERROR_CODES } from './websocket.service'; interface AutoupdateFormat { @@ -45,6 +46,7 @@ interface AutoupdateFormat { providedIn: 'root' }) export class AutoupdateService { + private mutex = new Mutex(); /** * Constructor to create the AutoupdateService. Calls the constructor of the parent class. * @param websocketService @@ -79,11 +81,13 @@ export class AutoupdateService { * Handles the change ids of all autoupdates. */ private async storeResponse(autoupdate: AutoupdateFormat): Promise { + const unlock = await this.mutex.lock(); if (autoupdate.all_data) { await this.storeAllData(autoupdate); } else { await this.storePartialAutoupdate(autoupdate); } + unlock(); } /** diff --git a/client/src/app/core/core-services/data-store.service.ts b/client/src/app/core/core-services/data-store.service.ts index a9ff9fc7e..b9ed07a08 100644 --- a/client/src/app/core/core-services/data-store.service.ts +++ b/client/src/app/core/core-services/data-store.service.ts @@ -258,6 +258,7 @@ export class DataStoreUpdateManagerService { private serveNextSlot(): void { if (this.updateSlotRequests.length > 0) { + console.warn('Concurrent update slots'); const request = this.updateSlotRequests.pop(); request.resolve(); } diff --git a/client/src/app/core/promises/mutex.ts b/client/src/app/core/promises/mutex.ts new file mode 100644 index 000000000..3de879915 --- /dev/null +++ b/client/src/app/core/promises/mutex.ts @@ -0,0 +1,30 @@ +/** + * A mutex as described in every textbook + * + * Usage: + * ``` + * mutex = new Mutex(); // create e.g. as class member + * + * // Somewhere in the code to lock (must be async code!) + * const unlock = await this.mutex.lock() + * // ...the code to synchronize + * unlock() + * ``` + */ +export class Mutex { + private mutex = Promise.resolve(); + + public lock(): PromiseLike<() => void> { + // this will capture the code-to-synchronize + let begin: (unlock: () => void) => void = () => {}; + + // All "requests" to execute code are chained in a promise-chain + this.mutex = this.mutex.then(() => { + return new Promise(begin); + }); + + return new Promise(res => { + begin = res; + }); + } +} diff --git a/openslides/utils/cache_providers.py b/openslides/utils/cache_providers.py index 1fb0cc60f..e049a5211 100644 --- a/openslides/utils/cache_providers.py +++ b/openslides/utils/cache_providers.py @@ -8,7 +8,11 @@ from django.core.exceptions import ImproperlyConfigured from typing_extensions import Protocol from . import logging -from .redis import read_only_redis_amount_replicas, use_redis +from .redis import ( + read_only_redis_amount_replicas, + read_only_redis_wait_timeout, + use_redis, +) from .schema_version import SchemaVersion from .utils import split_element_id, str_dict_to_bytes @@ -452,11 +456,11 @@ class RedisCacheProvider: raise e if not read_only and read_only_redis_amount_replicas is not None: reported_amount = await redis.wait( - read_only_redis_amount_replicas, 1000 + read_only_redis_amount_replicas, read_only_redis_wait_timeout ) if reported_amount != read_only_redis_amount_replicas: logger.warn( - f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} requested!" + f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} requested after {read_only_redis_wait_timeout} ms!" ) return result diff --git a/openslides/utils/redis.py b/openslides/utils/redis.py index acc370f54..d5ff24065 100644 --- a/openslides/utils/redis.py +++ b/openslides/utils/redis.py @@ -11,6 +11,7 @@ logger = logging.getLogger(__name__) use_redis = False use_read_only_redis = False read_only_redis_amount_replicas = None +read_only_redis_wait_timeout = None try: import aioredis @@ -35,6 +36,8 @@ else: read_only_redis_amount_replicas = getattr(settings, "AMOUNT_REPLICAS", 1) logger.info(f"AMOUNT_REPLICAS={read_only_redis_amount_replicas}") + read_only_redis_wait_timeout = getattr(settings, "WAIT_TIMEOUT", 1000) + logger.info(f"WAIT_TIMEOUT={read_only_redis_wait_timeout}") else: logger.info("Redis is not configured.")