Redis: Wait for replication on writes
Since channels_redis does not support dedicated read-redis instances, the autoupdate message may be received before the data was replicated. All workers read the autoupdate message from the write host, so there is a race between getting this message and a finished replication. For large payloads, the replication is slower in the most cases (even more in a distributed setup, where the master and replica are on different nodes). The easy way is to wait for replication. But there is one difficulty: The number of replicas has to be known. There is a new settings-variable "AMOUNT_REPLICAS" which defaults to 1. It needs to be set correctly! If it is too high, every autoupdate will be delayed by 1 seconds because of a timeout witing for non-existent replicas. If it is too low, some autoupdates may be wrong (and not detectable by the client!) becuase of reading from non-synchronised relicas. The other possibility is to fork channel_redis and add the feature of a read-only redis. This ould help, because on a single redis instance all commands are ordered: First, the data is synced, then the autoupdate message. Attention: This means, if redis-replicas are scaled up, one must make sure to read from the same instance. I think this is not possible in the way how dockers overlay networks work. The only way would be to open one connection and reuse the connection from channels_redis in OpenSlides. This would mean a heavy integration of channels_redis (meaning including the source code in our repo). For the first fix, this one is easy and should work.
This commit is contained in:
parent
8119507b8a
commit
bb2f958eb5
@ -8,7 +8,7 @@ from django.core.exceptions import ImproperlyConfigured
|
|||||||
from typing_extensions import Protocol
|
from typing_extensions import Protocol
|
||||||
|
|
||||||
from . import logging
|
from . import logging
|
||||||
from .redis import use_redis
|
from .redis import read_only_redis_amount_replicas, use_redis
|
||||||
from .schema_version import SchemaVersion
|
from .schema_version import SchemaVersion
|
||||||
from .utils import split_element_id, str_dict_to_bytes
|
from .utils import split_element_id, str_dict_to_bytes
|
||||||
|
|
||||||
@ -435,14 +435,23 @@ class RedisCacheProvider:
|
|||||||
|
|
||||||
async with get_connection(read_only=read_only) as redis:
|
async with get_connection(read_only=read_only) as redis:
|
||||||
try:
|
try:
|
||||||
return await redis.evalsha(hash, keys, args)
|
result = await redis.evalsha(hash, keys, args)
|
||||||
except aioredis.errors.ReplyError as e:
|
except aioredis.errors.ReplyError as e:
|
||||||
if str(e).startswith("NOSCRIPT"):
|
if str(e).startswith("NOSCRIPT"):
|
||||||
return await self._eval(redis, script_name, keys=keys, args=args)
|
result = await self._eval(redis, script_name, keys=keys, args=args)
|
||||||
elif str(e) == "cache_reset":
|
elif str(e) == "cache_reset":
|
||||||
raise CacheReset()
|
raise CacheReset()
|
||||||
else:
|
else:
|
||||||
raise e
|
raise e
|
||||||
|
if not read_only and read_only_redis_amount_replicas is not None:
|
||||||
|
reported_amount = await redis.wait(
|
||||||
|
read_only_redis_amount_replicas, 1000
|
||||||
|
)
|
||||||
|
if reported_amount != read_only_redis_amount_replicas:
|
||||||
|
logger.warn(
|
||||||
|
f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} requested!"
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|
||||||
async def _eval(
|
async def _eval(
|
||||||
self, redis: Any, script_name: str, keys: List[str] = [], args: List[Any] = []
|
self, redis: Any, script_name: str, keys: List[str] = [], args: List[Any] = []
|
||||||
|
@ -7,11 +7,15 @@ from . import logging
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Defaults
|
||||||
|
use_redis = False
|
||||||
|
use_read_only_redis = False
|
||||||
|
read_only_redis_amount_replicas = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import aioredis
|
import aioredis
|
||||||
except ImportError:
|
except ImportError:
|
||||||
use_redis = False
|
pass
|
||||||
use_read_only_redis = False
|
|
||||||
else:
|
else:
|
||||||
from .redis_connection_pool import ConnectionPool
|
from .redis_connection_pool import ConnectionPool
|
||||||
|
|
||||||
@ -28,6 +32,9 @@ else:
|
|||||||
if use_read_only_redis:
|
if use_read_only_redis:
|
||||||
logger.info(f"Redis read only address {redis_read_only_address}")
|
logger.info(f"Redis read only address {redis_read_only_address}")
|
||||||
read_only_pool = ConnectionPool({"address": redis_read_only_address})
|
read_only_pool = ConnectionPool({"address": redis_read_only_address})
|
||||||
|
|
||||||
|
read_only_redis_amount_replicas = getattr(settings, "AMOUNT_REPLICAS", 1)
|
||||||
|
logger.info(f"AMOUNT_REPLICAS={read_only_redis_amount_replicas}")
|
||||||
else:
|
else:
|
||||||
logger.info("Redis is not configured.")
|
logger.info("Redis is not configured.")
|
||||||
|
|
||||||
|
@ -103,6 +103,8 @@ if use_redis:
|
|||||||
# a (host, port) tuple — ('localhost', 6379);
|
# a (host, port) tuple — ('localhost', 6379);
|
||||||
# or a unix domain socket path string — "/path/to/redis.sock".
|
# or a unix domain socket path string — "/path/to/redis.sock".
|
||||||
REDIS_ADDRESS = "redis://127.0.0.1"
|
REDIS_ADDRESS = "redis://127.0.0.1"
|
||||||
|
# REDIS_READ_ONLY_ADDRESS
|
||||||
|
AMOUNT_REPLICAS = 1
|
||||||
|
|
||||||
# Session backend
|
# Session backend
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user