bb2f958eb5
Since channels_redis does not support dedicated read-redis instances, the autoupdate message may be received before the data was replicated. All workers read the autoupdate message from the write host, so there is a race between getting this message and a finished replication. For large payloads, the replication is slower in the most cases (even more in a distributed setup, where the master and replica are on different nodes). The easy way is to wait for replication. But there is one difficulty: The number of replicas has to be known. There is a new settings-variable "AMOUNT_REPLICAS" which defaults to 1. It needs to be set correctly! If it is too high, every autoupdate will be delayed by 1 seconds because of a timeout witing for non-existent replicas. If it is too low, some autoupdates may be wrong (and not detectable by the client!) becuase of reading from non-synchronised relicas. The other possibility is to fork channel_redis and add the feature of a read-only redis. This ould help, because on a single redis instance all commands are ordered: First, the data is synced, then the autoupdate message. Attention: This means, if redis-replicas are scaled up, one must make sure to read from the same instance. I think this is not possible in the way how dockers overlay networks work. The only way would be to open one connection and reuse the connection from channels_redis in OpenSlides. This would mean a heavy integration of channels_redis (meaning including the source code in our repo). For the first fix, this one is easy and should work.
69 lines
2.1 KiB
Python
69 lines
2.1 KiB
Python
from typing import Any
|
|
|
|
from django.conf import settings
|
|
|
|
from . import logging
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Defaults
|
|
use_redis = False
|
|
use_read_only_redis = False
|
|
read_only_redis_amount_replicas = None
|
|
|
|
try:
|
|
import aioredis
|
|
except ImportError:
|
|
pass
|
|
else:
|
|
from .redis_connection_pool import ConnectionPool
|
|
|
|
# set use_redis to true, if there is a value for REDIS_ADDRESS in the settings
|
|
redis_address = getattr(settings, "REDIS_ADDRESS", "")
|
|
use_redis = bool(redis_address)
|
|
|
|
if use_redis:
|
|
logger.info(f"Redis address {redis_address}")
|
|
pool = ConnectionPool({"address": redis_address})
|
|
|
|
redis_read_only_address = getattr(settings, "REDIS_READ_ONLY_ADDRESS", "")
|
|
use_read_only_redis = bool(redis_read_only_address)
|
|
if use_read_only_redis:
|
|
logger.info(f"Redis read only address {redis_read_only_address}")
|
|
read_only_pool = ConnectionPool({"address": redis_read_only_address})
|
|
|
|
read_only_redis_amount_replicas = getattr(settings, "AMOUNT_REPLICAS", 1)
|
|
logger.info(f"AMOUNT_REPLICAS={read_only_redis_amount_replicas}")
|
|
else:
|
|
logger.info("Redis is not configured.")
|
|
|
|
|
|
# TODO: contextlib.asynccontextmanager can be used in python 3.7
|
|
class RedisConnectionContextManager:
|
|
"""
|
|
Async context manager for connections
|
|
"""
|
|
|
|
def __init__(self, read_only: bool) -> None:
|
|
self.pool = read_only_pool if read_only and use_read_only_redis else pool
|
|
|
|
async def __aenter__(self) -> "aioredis.RedisConnection":
|
|
self.conn = await self.pool.pop()
|
|
return self.conn
|
|
|
|
async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
|
|
if exc:
|
|
logger.warn(f"Redis Exception: {exc}. Do not reuse connection...")
|
|
self.pool.conn_error(self.conn)
|
|
else:
|
|
self.pool.push(self.conn)
|
|
self.conn = None
|
|
|
|
|
|
def get_connection(read_only: bool = False) -> RedisConnectionContextManager:
|
|
"""
|
|
Returns contextmanager for a redis connection.
|
|
"""
|
|
return RedisConnectionContextManager(read_only)
|