diff --git a/openslides/utils/cache_providers.py b/openslides/utils/cache_providers.py index 156d9945b..44f04e0ae 100644 --- a/openslides/utils/cache_providers.py +++ b/openslides/utils/cache_providers.py @@ -256,7 +256,7 @@ class RedisCacheProvider: """ Returns True, when there is data in the cache. """ - async with get_connection() as redis: + async with get_connection(read_only=True) as redis: return await redis.exists(self.full_data_cache_key) and bool( await redis.zrangebyscore( self.change_id_cache_key, withscores=True, count=1, offset=0 @@ -269,7 +269,7 @@ class RedisCacheProvider: Returns all data from the full_data_cache in a mapping from element_id to the element. """ return await aioredis.util.wait_make_dict( - self.eval("get_all_data", [self.full_data_cache_key]) + self.eval("get_all_data", [self.full_data_cache_key], read_only=True) ) @ensure_cache_wrapper() @@ -279,7 +279,10 @@ class RedisCacheProvider: from element_id to the element. """ response = await self.eval( - "get_collection_data", [self.full_data_cache_key], [f"{collection}:*"] + "get_collection_data", + [self.full_data_cache_key], + [f"{collection}:*"], + read_only=True, ) collection_data = {} @@ -295,7 +298,7 @@ class RedisCacheProvider: Returns one element from the cache. Returns None, when the element does not exist. """ return await self.eval( - "get_element_data", [self.full_data_cache_key], [element_id] + "get_element_data", [self.full_data_cache_key], [element_id], read_only=True ) @ensure_cache_wrapper() @@ -346,6 +349,7 @@ class RedisCacheProvider: "get_data_since", keys=[self.full_data_cache_key, self.change_id_cache_key], args=[change_id, redis_max_change_id], + read_only=True, ) ) @@ -366,7 +370,7 @@ class RedisCacheProvider: """ Get the highest change_id from redis. """ - async with get_connection() as redis: + async with get_connection(read_only=True) as redis: value = await redis.zrevrangebyscore( self.change_id_cache_key, withscores=True, count=1, offset=0 ) @@ -380,7 +384,7 @@ class RedisCacheProvider: """ Get the lowest change_id from redis. """ - async with get_connection() as redis: + async with get_connection(read_only=True) as redis: value = await redis.zscore( self.change_id_cache_key, "_config:lowest_change_id" ) @@ -390,7 +394,7 @@ class RedisCacheProvider: async def get_schema_version(self) -> Optional[SchemaVersion]: """ Retrieves the schema version of the cache or None, if not existent """ - async with get_connection() as redis: + async with get_connection(read_only=True) as redis: schema_version = await redis.hgetall(self.schema_cache_key) if not schema_version: return None @@ -407,7 +411,11 @@ class RedisCacheProvider: await redis.hmset_dict(self.schema_cache_key, schema_version) async def eval( - self, script_name: str, keys: List[str] = [], args: List[Any] = [] + self, + script_name: str, + keys: List[str] = [], + args: List[Any] = [], + read_only: bool = False, ) -> Any: """ Runs a lua script in redis. This wrapper around redis.eval tries to make @@ -425,7 +433,7 @@ class RedisCacheProvider: "A script with a ensure_cache prefix must have the full_data cache key as its first key" ) - async with get_connection() as redis: + async with get_connection(read_only=read_only) as redis: try: return await redis.evalsha(hash, keys, args) except aioredis.errors.ReplyError as e: diff --git a/openslides/utils/locking.py b/openslides/utils/locking.py index ee7c87f13..d71ca0015 100644 --- a/openslides/utils/locking.py +++ b/openslides/utils/locking.py @@ -37,6 +37,8 @@ class RedisLockProvider: """ Returns True, when the lock is set. Else False. """ + # Execute the lookup on the main redis server (no readonly) to avoid + # eventual consistency between the master and replicas async with get_connection() as redis: return await redis.get(f"{self.lock_prefix}{lock_name}") diff --git a/openslides/utils/redis.py b/openslides/utils/redis.py index 9744ee5fe..287efdeba 100644 --- a/openslides/utils/redis.py +++ b/openslides/utils/redis.py @@ -1,4 +1,3 @@ -import asyncio from typing import Any from django.conf import settings @@ -12,49 +11,51 @@ try: import aioredis except ImportError: use_redis = False + use_read_only_redis = False else: - from channels_redis.core import ConnectionPool + from .redis_connection_pool import ConnectionPool # set use_redis to true, if there is a value for REDIS_ADDRESS in the settings redis_address = getattr(settings, "REDIS_ADDRESS", "") use_redis = bool(redis_address) + if use_redis: logger.info(f"Redis address {redis_address}") + pool = ConnectionPool({"address": redis_address}) - pool = ConnectionPool({"address": redis_address}) - counter = 0 + redis_read_only_address = getattr(settings, "REDIS_READ_ONLY_ADDRESS", "") + use_read_only_redis = bool(redis_read_only_address) + if use_read_only_redis: + logger.info(f"Redis read only address {redis_read_only_address}") + read_only_pool = ConnectionPool({"address": redis_read_only_address}) + else: + logger.info("Redis is not configured.") +# TODO: contextlib.asynccontextmanager can be used in python 3.7 class RedisConnectionContextManager: """ Async context manager for connections """ - # TODO: contextlib.asynccontextmanager can be used in python 3.7 + def __init__(self, read_only: bool) -> None: + self.pool = read_only_pool if read_only and use_read_only_redis else pool async def __aenter__(self) -> "aioredis.RedisConnection": - global counter - while counter > 100: - await asyncio.sleep(0.1) - counter += 1 - - self.conn = await pool.pop() + self.conn = await self.pool.pop() return self.conn async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None: if exc: logger.warn(f"Redis Exception: {exc}. Do not reuse connection...") - pool.conn_error(self.conn) + self.pool.conn_error(self.conn) else: - pool.push(self.conn) + self.pool.push(self.conn) self.conn = None - global counter - counter -= 1 - -def get_connection() -> RedisConnectionContextManager: +def get_connection(read_only: bool = False) -> RedisConnectionContextManager: """ Returns contextmanager for a redis connection. """ - return RedisConnectionContextManager() + return RedisConnectionContextManager(read_only) diff --git a/openslides/utils/redis_connection_pool.py b/openslides/utils/redis_connection_pool.py new file mode 100644 index 000000000..894211bf8 --- /dev/null +++ b/openslides/utils/redis_connection_pool.py @@ -0,0 +1,42 @@ +import asyncio +from typing import Any, Dict, List + +import aioredis +from channels_redis.core import ConnectionPool as ChannelRedisConnectionPool +from django.conf import settings + +from . import logging + + +logger = logging.getLogger(__name__) +connection_pool_limit = getattr(settings, "CONNECTION_POOL_LIMIT", 100) +logger.info(f"CONNECTION_POOL_LIMIT={connection_pool_limit}") + + +class ConnectionPool(ChannelRedisConnectionPool): + """ Adds a trivial, soft limit for the pool """ + + def __init__(self, host: Any) -> None: + self.counter = 0 + super().__init__(host) + + async def pop( + self, *args: List[Any], **kwargs: Dict[str, Any] + ) -> aioredis.commands.Redis: + while self.counter > connection_pool_limit: + await asyncio.sleep(0.1) + self.counter += 1 + + return await super().pop(*args, **kwargs) + + def push(self, conn: aioredis.commands.Redis) -> None: + super().push(conn) + self.counter -= 1 + + def conn_error(self, conn: aioredis.commands.Redis) -> None: + super().conn_error(conn) + self.counter -= 1 + + def reset(self) -> None: + super().reset() + self.counter = 0