Source code for core.cache.redis

from __future__ import annotations

from dogpile.cache import CacheRegion
from dogpile.cache.api import NO_VALUE
from functools import lru_cache
from onegov.core.custom import msgpack
from redis import ConnectionPool


from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
    from dogpile.cache.api import NoValue


[docs] def msgpack_deserialize(value: bytes | NoValue) -> Any: if value is NO_VALUE: return value return msgpack.unpackb(value)
[docs] class RedisCacheRegion(CacheRegion): """ A slightly more specific CacheRegion that will be configured to a single non-clustered Redis backend with name-mangling based on a given namespace as well as a couple of additional convenience methods specific to Redis. It will use dill to serialize/deserialize values. """ def __init__( self, namespace: str, expiration_time: float, redis_url: str, ): super().__init__( serializer=msgpack.packb, deserializer=msgpack_deserialize )
[docs] self.namespace = namespace
self.configure( 'dogpile.cache.redis', arguments={ 'url': redis_url, 'redis_expiration_time': expiration_time + 1, 'connection_pool': get_pool(redis_url) } ) # remove instance level key_mangler if 'key_mangler' in self.__dict__: del self.__dict__['key_mangler']
[docs] def key_mangler(self, key: str) -> bytes: # type:ignore[override] return f'{self.namespace}:{key}'.encode('utf-8')
[docs] def keys(self) -> list[str]: # note, this cannot be used in a Redis cluster - if we use that # we have to keep track of all keys separately return self.backend.reader_client.eval( "return redis.pcall('keys', ARGV[1])", 0, f'{self.namespace}:*' )
[docs] def flush(self) -> int: # note, this cannot be used in a Redis cluster - if we use that # we have to keep track of all keys separately return self.backend.reader_client.eval(""" local keys = redis.call('keys', ARGV[1]) for i=1,#keys,5000 do redis.call('del', unpack(keys, i, math.min(i+4999, #keys))) end return #keys """, 0, f'{self.namespace}:*')
# TODO: Remove these deprecated aliases
[docs] keys = RedisCacheRegion.keys
[docs] flush = RedisCacheRegion.flush
@lru_cache(maxsize=1024)
[docs] def get( namespace: str, expiration_time: float, redis_url: str ) -> RedisCacheRegion: return RedisCacheRegion( namespace=namespace, expiration_time=expiration_time, redis_url=redis_url )
@lru_cache(maxsize=16)
[docs] def get_pool(redis_url: str) -> ConnectionPool: return ConnectionPool.from_url(redis_url)