Merge pull request #5077 from FinnStutzenstein/readOnlyRedis

Adding a second optional redis for read only accesses
This commit is contained in:
Finn Stutzenstein 2019-12-04 11:01:21 +01:00 committed by GitHub
commit 60e3282286
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 80 additions and 27 deletions

View File

@ -256,7 +256,7 @@ class RedisCacheProvider:
"""
Returns True, when there is data in the cache.
"""
async with get_connection() as redis:
async with get_connection(read_only=True) as redis:
return await redis.exists(self.full_data_cache_key) and bool(
await redis.zrangebyscore(
self.change_id_cache_key, withscores=True, count=1, offset=0
@ -269,7 +269,7 @@ class RedisCacheProvider:
Returns all data from the full_data_cache in a mapping from element_id to the element.
"""
return await aioredis.util.wait_make_dict(
self.eval("get_all_data", [self.full_data_cache_key])
self.eval("get_all_data", [self.full_data_cache_key], read_only=True)
)
@ensure_cache_wrapper()
@ -279,7 +279,10 @@ class RedisCacheProvider:
from element_id to the element.
"""
response = await self.eval(
"get_collection_data", [self.full_data_cache_key], [f"{collection}:*"]
"get_collection_data",
[self.full_data_cache_key],
[f"{collection}:*"],
read_only=True,
)
collection_data = {}
@ -295,7 +298,7 @@ class RedisCacheProvider:
Returns one element from the cache. Returns None, when the element does not exist.
"""
return await self.eval(
"get_element_data", [self.full_data_cache_key], [element_id]
"get_element_data", [self.full_data_cache_key], [element_id], read_only=True
)
@ensure_cache_wrapper()
@ -346,6 +349,7 @@ class RedisCacheProvider:
"get_data_since",
keys=[self.full_data_cache_key, self.change_id_cache_key],
args=[change_id, redis_max_change_id],
read_only=True,
)
)
@ -366,7 +370,7 @@ class RedisCacheProvider:
"""
Get the highest change_id from redis.
"""
async with get_connection() as redis:
async with get_connection(read_only=True) as redis:
value = await redis.zrevrangebyscore(
self.change_id_cache_key, withscores=True, count=1, offset=0
)
@ -380,7 +384,7 @@ class RedisCacheProvider:
"""
Get the lowest change_id from redis.
"""
async with get_connection() as redis:
async with get_connection(read_only=True) as redis:
value = await redis.zscore(
self.change_id_cache_key, "_config:lowest_change_id"
)
@ -390,7 +394,7 @@ class RedisCacheProvider:
async def get_schema_version(self) -> Optional[SchemaVersion]:
""" Retrieves the schema version of the cache or None, if not existent """
async with get_connection() as redis:
async with get_connection(read_only=True) as redis:
schema_version = await redis.hgetall(self.schema_cache_key)
if not schema_version:
return None
@ -407,7 +411,11 @@ class RedisCacheProvider:
await redis.hmset_dict(self.schema_cache_key, schema_version)
async def eval(
self, script_name: str, keys: List[str] = [], args: List[Any] = []
self,
script_name: str,
keys: List[str] = [],
args: List[Any] = [],
read_only: bool = False,
) -> Any:
"""
Runs a lua script in redis. This wrapper around redis.eval tries to make
@ -425,7 +433,7 @@ class RedisCacheProvider:
"A script with a ensure_cache prefix must have the full_data cache key as its first key"
)
async with get_connection() as redis:
async with get_connection(read_only=read_only) as redis:
try:
return await redis.evalsha(hash, keys, args)
except aioredis.errors.ReplyError as e:

View File

@ -37,6 +37,8 @@ class RedisLockProvider:
"""
Returns True, when the lock is set. Else False.
"""
# Execute the lookup on the main redis server (no readonly) to avoid
# eventual consistency between the master and replicas
async with get_connection() as redis:
return await redis.get(f"{self.lock_prefix}{lock_name}")

View File

@ -1,4 +1,3 @@
import asyncio
from typing import Any
from django.conf import settings
@ -12,49 +11,51 @@ try:
import aioredis
except ImportError:
use_redis = False
use_read_only_redis = False
else:
from channels_redis.core import ConnectionPool
from .redis_connection_pool import ConnectionPool
# set use_redis to true, if there is a value for REDIS_ADDRESS in the settings
redis_address = getattr(settings, "REDIS_ADDRESS", "")
use_redis = bool(redis_address)
if use_redis:
logger.info(f"Redis address {redis_address}")
pool = ConnectionPool({"address": redis_address})
pool = ConnectionPool({"address": redis_address})
counter = 0
redis_read_only_address = getattr(settings, "REDIS_READ_ONLY_ADDRESS", "")
use_read_only_redis = bool(redis_read_only_address)
if use_read_only_redis:
logger.info(f"Redis read only address {redis_read_only_address}")
read_only_pool = ConnectionPool({"address": redis_read_only_address})
else:
logger.info("Redis is not configured.")
# TODO: contextlib.asynccontextmanager can be used in python 3.7
class RedisConnectionContextManager:
"""
Async context manager for connections
"""
# TODO: contextlib.asynccontextmanager can be used in python 3.7
def __init__(self, read_only: bool) -> None:
self.pool = read_only_pool if read_only and use_read_only_redis else pool
async def __aenter__(self) -> "aioredis.RedisConnection":
global counter
while counter > 100:
await asyncio.sleep(0.1)
counter += 1
self.conn = await pool.pop()
self.conn = await self.pool.pop()
return self.conn
async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
if exc:
logger.warn(f"Redis Exception: {exc}. Do not reuse connection...")
pool.conn_error(self.conn)
self.pool.conn_error(self.conn)
else:
pool.push(self.conn)
self.pool.push(self.conn)
self.conn = None
global counter
counter -= 1
def get_connection() -> RedisConnectionContextManager:
def get_connection(read_only: bool = False) -> RedisConnectionContextManager:
"""
Returns contextmanager for a redis connection.
"""
return RedisConnectionContextManager()
return RedisConnectionContextManager(read_only)

View File

@ -0,0 +1,42 @@
import asyncio
from typing import Any, Dict, List
import aioredis
from channels_redis.core import ConnectionPool as ChannelRedisConnectionPool
from django.conf import settings
from . import logging
logger = logging.getLogger(__name__)
connection_pool_limit = getattr(settings, "CONNECTION_POOL_LIMIT", 100)
logger.info(f"CONNECTION_POOL_LIMIT={connection_pool_limit}")
class ConnectionPool(ChannelRedisConnectionPool):
""" Adds a trivial, soft limit for the pool """
def __init__(self, host: Any) -> None:
self.counter = 0
super().__init__(host)
async def pop(
self, *args: List[Any], **kwargs: Dict[str, Any]
) -> aioredis.commands.Redis:
while self.counter > connection_pool_limit:
await asyncio.sleep(0.1)
self.counter += 1
return await super().pop(*args, **kwargs)
def push(self, conn: aioredis.commands.Redis) -> None:
super().push(conn)
self.counter -= 1
def conn_error(self, conn: aioredis.commands.Redis) -> None:
super().conn_error(conn)
self.counter -= 1
def reset(self) -> None:
super().reset()
self.counter = 0