Merge pull request #5290 from FinnStutzenstein/redisWaitForReplication
Redis: Wait for replication on writes
This commit is contained in:
commit
83d57e9da7
@ -8,7 +8,7 @@ from django.core.exceptions import ImproperlyConfigured
|
|||||||
from typing_extensions import Protocol
|
from typing_extensions import Protocol
|
||||||
|
|
||||||
from . import logging
|
from . import logging
|
||||||
from .redis import use_redis
|
from .redis import read_only_redis_amount_replicas, use_redis
|
||||||
from .schema_version import SchemaVersion
|
from .schema_version import SchemaVersion
|
||||||
from .utils import split_element_id, str_dict_to_bytes
|
from .utils import split_element_id, str_dict_to_bytes
|
||||||
|
|
||||||
@ -435,14 +435,23 @@ class RedisCacheProvider:
|
|||||||
|
|
||||||
async with get_connection(read_only=read_only) as redis:
|
async with get_connection(read_only=read_only) as redis:
|
||||||
try:
|
try:
|
||||||
return await redis.evalsha(hash, keys, args)
|
result = await redis.evalsha(hash, keys, args)
|
||||||
except aioredis.errors.ReplyError as e:
|
except aioredis.errors.ReplyError as e:
|
||||||
if str(e).startswith("NOSCRIPT"):
|
if str(e).startswith("NOSCRIPT"):
|
||||||
return await self._eval(redis, script_name, keys=keys, args=args)
|
result = await self._eval(redis, script_name, keys=keys, args=args)
|
||||||
elif str(e) == "cache_reset":
|
elif str(e) == "cache_reset":
|
||||||
raise CacheReset()
|
raise CacheReset()
|
||||||
else:
|
else:
|
||||||
raise e
|
raise e
|
||||||
|
if not read_only and read_only_redis_amount_replicas is not None:
|
||||||
|
reported_amount = await redis.wait(
|
||||||
|
read_only_redis_amount_replicas, 1000
|
||||||
|
)
|
||||||
|
if reported_amount != read_only_redis_amount_replicas:
|
||||||
|
logger.warn(
|
||||||
|
f"WAIT reported {reported_amount} replicas of {read_only_redis_amount_replicas} requested!"
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|
||||||
async def _eval(
|
async def _eval(
|
||||||
self, redis: Any, script_name: str, keys: List[str] = [], args: List[Any] = []
|
self, redis: Any, script_name: str, keys: List[str] = [], args: List[Any] = []
|
||||||
|
@ -7,11 +7,15 @@ from . import logging
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Defaults
|
||||||
|
use_redis = False
|
||||||
|
use_read_only_redis = False
|
||||||
|
read_only_redis_amount_replicas = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import aioredis
|
import aioredis
|
||||||
except ImportError:
|
except ImportError:
|
||||||
use_redis = False
|
pass
|
||||||
use_read_only_redis = False
|
|
||||||
else:
|
else:
|
||||||
from .redis_connection_pool import ConnectionPool
|
from .redis_connection_pool import ConnectionPool
|
||||||
|
|
||||||
@ -28,6 +32,9 @@ else:
|
|||||||
if use_read_only_redis:
|
if use_read_only_redis:
|
||||||
logger.info(f"Redis read only address {redis_read_only_address}")
|
logger.info(f"Redis read only address {redis_read_only_address}")
|
||||||
read_only_pool = ConnectionPool({"address": redis_read_only_address})
|
read_only_pool = ConnectionPool({"address": redis_read_only_address})
|
||||||
|
|
||||||
|
read_only_redis_amount_replicas = getattr(settings, "AMOUNT_REPLICAS", 1)
|
||||||
|
logger.info(f"AMOUNT_REPLICAS={read_only_redis_amount_replicas}")
|
||||||
else:
|
else:
|
||||||
logger.info("Redis is not configured.")
|
logger.info("Redis is not configured.")
|
||||||
|
|
||||||
|
@ -103,6 +103,8 @@ if use_redis:
|
|||||||
# a (host, port) tuple — ('localhost', 6379);
|
# a (host, port) tuple — ('localhost', 6379);
|
||||||
# or a unix domain socket path string — "/path/to/redis.sock".
|
# or a unix domain socket path string — "/path/to/redis.sock".
|
||||||
REDIS_ADDRESS = "redis://127.0.0.1"
|
REDIS_ADDRESS = "redis://127.0.0.1"
|
||||||
|
# REDIS_READ_ONLY_ADDRESS
|
||||||
|
AMOUNT_REPLICAS = 1
|
||||||
|
|
||||||
# Session backend
|
# Session backend
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user