Skip to content

Cache - client

InMemoryCache

InMemoryCache()

Bases: BaseCache

In-memory implementation of the cache client. DO NOT USE IN PRODUCTION.

Source code in webtool/cache/client.py
def __init__(self):
    """
    Creates an empty in-memory cache.
    """
    # Backing store for every cached entry; it starts out empty.
    self.cache: dict = dict()

cache instance-attribute

cache = {}

lock

lock(
    key,
    ttl_ms=100,
    blocking=True,
    blocking_timeout=DEFAULT_CAP,
    blocking_sleep=DEFAULT_BASE,
)

Creates a lock object for the given key, backed by this cache.

PARAMETER DESCRIPTION
key

The key to be locked.

TYPE: Union[bytes, str, memoryview]

ttl_ms

The time-to-live for the lock in milliseconds.

TYPE: Union[int, timedelta, None] DEFAULT: 100

blocking

If True, the method will block until the lock is acquired.

TYPE: bool DEFAULT: True

blocking_timeout

The maximum time in seconds to block while waiting for the lock to be acquired.

TYPE: float DEFAULT: DEFAULT_CAP

blocking_sleep

The time in seconds to wait between attempts to acquire the lock when blocking.

TYPE: float DEFAULT: DEFAULT_BASE

RETURNS DESCRIPTION
AsyncInMemoryLock

The Lock object representing the acquired lock.

TYPE: AsyncInMemoryLock

Source code in webtool/cache/client.py
def lock(
    self,
    key: Union[bytes, str, memoryview],
    ttl_ms: Union[int, timedelta, None] = 100,
    blocking: bool = True,
    blocking_timeout: float = DEFAULT_CAP,
    blocking_sleep: float = DEFAULT_BASE,
) -> AsyncInMemoryLock:
    """
    Builds a lock object for a key, backed by this cache.

    This method only constructs the lock; every argument is forwarded
    unchanged to ``AsyncInMemoryLock``.

    Parameters:
        key: The key to be locked.
        ttl_ms: The time-to-live for the lock in milliseconds.
        blocking: If True, acquiring the lock waits until it becomes available.
        blocking_timeout: The maximum time in seconds to wait while acquiring the lock.
        blocking_sleep: The time in seconds to wait between acquisition attempts when blocking.

    Returns:
        AsyncInMemoryLock: The lock object bound to this cache and key.
    """
    # The cache instance itself is the first constructor argument.
    lock_options = (key, ttl_ms, blocking, blocking_timeout, blocking_sleep)
    return AsyncInMemoryLock(self, *lock_options)

set async

set(key, value, ex=None, exat=None, nx=False)

Sets a key-value pair.

PARAMETER DESCRIPTION
key

The key for the data to be set.

TYPE: Union[bytes, str, memoryview]

value

The value associated with the key.

TYPE: Union[bytes, memoryview, str, int, float]

ex

Expiration time for the key, in seconds or as a timedelta.

TYPE: Union[int, timedelta, None] DEFAULT: None

exat

Expiration time as an absolute timestamp.

TYPE: Union[int, datetime, None] DEFAULT: None

nx

if set to True, set the value at key to value only if it does not exist.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Any

return of set.

TYPE: Any

Source code in webtool/cache/client.py
async def set(
    self,
    key: Union[bytes, str, memoryview],
    value: Union[bytes, memoryview, str, int, float],
    ex: Union[int, timedelta, None] = None,
    exat: Union[int, datetime, None] = None,
    nx: bool = False,
) -> Any:
    """
    Sets a key-value pair.

    Expiry deadlines are always stored on the running event loop's clock.
    A relative ``ex`` is added to the loop time directly; an absolute
    ``exat`` (wall-clock) is first converted to "seconds from now" so that
    both kinds of deadline are comparable with each other.

    Parameters:
        key: The key for the data to be set.
        value: The value associated with the key.
        ex: Expiration time for the key, in seconds or as a timedelta. Takes precedence over exat.
        exat: Expiration time as an absolute Unix timestamp (int) or a datetime.
        nx: if set to True, set the value at key to value only if it does not exist.

    Returns:
        Any: the stored ``(value, expiry)`` tuple, or None when ``nx`` is True and the key already exists.
    """
    # Local import: only needed to translate wall-clock deadlines.
    import time

    await self._expire()
    if nx and key in self.cache:
        return None

    # NOTE(review): assumes `_expire` compares deadlines against the event
    # loop clock (the clock the `ex` branch has always used) - confirm.
    now = asyncio.get_running_loop().time()

    if ex:
        ttl = ex if isinstance(ex, (float, int)) else ex.total_seconds()
        exp = now + ttl
    elif isinstance(exat, datetime):
        # Wall-clock deadline -> remaining seconds -> loop-clock deadline.
        exp = now + (exat.timestamp() - time.time())
    elif isinstance(exat, int):
        exp = now + (exat - time.time())
    else:
        # No expiry requested: the entry never expires.
        exp = float("inf")

    self.cache[key] = (value, exp)
    return self.cache[key]

get async

get(key)

Gets a key-value pair.

PARAMETER DESCRIPTION
key

The key of the entry to retrieve.

TYPE: Union[bytes, str, memoryview]

Returns: Any: return of get.

Source code in webtool/cache/client.py
async def get(
    self,
    key: Union[bytes, str, memoryview],
) -> Any:
    """
    Looks up the value stored under a key.

    Parameters:
        key: The key of the entry to retrieve.
    Returns:
        Any: the stored value, or None when there is no entry for the key.
    """

    # Run the expiry pass before reading the store.
    await self._expire()

    # Entries are stored as (value, expiry); only the value is handed back.
    entry = self.cache.get(key)
    return entry[0] if entry else None

delete async

delete(key)

Deletes a key-value pair.

PARAMETER DESCRIPTION
key

The key of the entry to delete.

TYPE: Union[bytes, str, memoryview]

Returns: None

Source code in webtool/cache/client.py
async def delete(
    self,
    key: Union[bytes, str, memoryview],
) -> Any:
    """
    Deletes a key-value pair.

    Deleting a key that is absent (never set, or already expired) is a
    no-op rather than an error.

    Parameters:
        key: The key of the entry to delete.
    Returns:
        Any: the removed ``(value, expiry)`` entry, or None if the key was not present.
    """

    await self._expire()

    # A default of None keeps a missing key from raising KeyError; the
    # expiry pass above may already have removed the entry.
    return self.cache.pop(key, None)

aclose async

aclose()

Closes the Cache client.

Source code in webtool/cache/client.py
async def aclose(self) -> None:
    """
    Closes the cache client by dropping every stored entry.
    """

    # The same dict object is emptied in place, not replaced.
    store = self.cache
    store.clear()

RedisCache

RedisCache(
    redis_url=None,
    connection_pool=None,
    logger=None,
    config=None,
)

Bases: BaseCache

Implementation of a Redis client with failover capabilities.

Initializes the Redis client.

PARAMETER DESCRIPTION
redis_url

Redis data source name for connection.

TYPE: str DEFAULT: None

connection_pool

An existing connection pool to use.

TYPE: Optional[ConnectionPool] DEFAULT: None

Source code in webtool/cache/client.py
def __init__(
    self,
    redis_url: str = None,
    connection_pool: Optional[ConnectionPool] = None,
    logger: logging.Logger = None,
    config: RedisConfig | None = None,
):
    """
    Initializes the Redis client.

    An existing ``connection_pool`` takes precedence; otherwise a new pool
    is created from ``redis_url`` using the options in ``config``.

    Parameters:
        redis_url: Redis data source name for connection.
        connection_pool: An existing connection pool to use.
        logger: Logger to use; defaults to this module's logger.
        config: Connection settings; defaults to ``RedisConfig()``.

    Raises:
        TypeError: If neither redis_url nor connection_pool is provided.
    """

    self.logger = logger if logger else logging.getLogger(__name__)
    self.config = config if config else RedisConfig()

    # Guard clause: at least one way to reach Redis is required.
    if not connection_pool and not redis_url:
        raise TypeError("RedisClient must be provided with either redis_url or connection_pool")

    if connection_pool:
        self.connection_pool = connection_pool
    else:
        # The retry policy is deep-copied, so the pool receives its own
        # instance rather than the one held by the config object.
        pool_options = {**self.config.to_dict(), "retry": deepcopy(self.config.retry)}
        self.connection_pool = ConnectionPool.from_url(redis_url, **pool_options)

    self.cache: Redis = Redis.from_pool(self.connection_pool)

logger instance-attribute

logger = logger or getLogger(__name__)

config instance-attribute

config = config or RedisConfig()

connection_pool instance-attribute

connection_pool = connection_pool

cache instance-attribute

cache = from_pool(connection_pool)

lock

lock(
    key,
    ttl_ms=100,
    blocking=True,
    blocking_timeout=DEFAULT_CAP,
    blocking_sleep=DEFAULT_BASE,
)

Creates a lock object for the given key, backed by this cache.

PARAMETER DESCRIPTION
key

The key to be locked.

TYPE: Union[bytes, str, memoryview]

ttl_ms

The time-to-live for the lock in milliseconds.

TYPE: Union[int, timedelta, None] DEFAULT: 100

blocking

If True, the method will block until the lock is acquired.

TYPE: bool DEFAULT: True

blocking_timeout

The maximum time in seconds to block while waiting for the lock to be acquired.

TYPE: float DEFAULT: DEFAULT_CAP

blocking_sleep

The time in seconds to wait between attempts to acquire the lock when blocking.

TYPE: float DEFAULT: DEFAULT_BASE

RETURNS DESCRIPTION
AsyncRedisLock

The Lock object representing the acquired lock.

TYPE: AsyncRedisLock

Source code in webtool/cache/client.py
def lock(
    self,
    key: Union[bytes, str, memoryview],
    ttl_ms: Union[int, timedelta, None] = 100,
    blocking: bool = True,
    blocking_timeout: float = DEFAULT_CAP,
    blocking_sleep: float = DEFAULT_BASE,
) -> AsyncRedisLock:
    """
    Builds a lock object for a key, backed by this cache.

    This method only constructs the lock; every argument is forwarded
    unchanged to ``AsyncRedisLock``.

    Parameters:
        key: The key to be locked.
        ttl_ms: The time-to-live for the lock in milliseconds.
        blocking: If True, acquiring the lock waits until it becomes available.
        blocking_timeout: The maximum time in seconds to wait while acquiring the lock.
        blocking_sleep: The time in seconds to wait between acquisition attempts when blocking.

    Returns:
        AsyncRedisLock: The lock object bound to this cache and key.
    """
    # The cache instance itself is the first constructor argument.
    lock_options = (key, ttl_ms, blocking, blocking_timeout, blocking_sleep)
    return AsyncRedisLock(self, *lock_options)

set async

set(key, value, ex=None, exat=None, nx=False)

Sets a key-value pair.

PARAMETER DESCRIPTION
key

The key for the data to be set.

TYPE: Union[bytes, str, memoryview]

value

The value associated with the key.

TYPE: Union[bytes, memoryview, str, int, float]

ex

Expiration time for the key, in seconds or as a timedelta.

TYPE: Union[int, timedelta, None] DEFAULT: None

exat

Expiration time as an absolute timestamp.

TYPE: Union[int, datetime, None] DEFAULT: None

nx

if set to True, set the value at key to value only if it does not exist.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Any

return of set.

TYPE: Any

Source code in webtool/cache/client.py
async def set(
    self,
    key: Union[bytes, str, memoryview],
    value: Union[bytes, memoryview, str, int, float],
    ex: Union[int, timedelta, None] = None,
    exat: Union[int, datetime, None] = None,
    nx: bool = False,
) -> Any:
    """
    Stores a key-value pair by delegating to the underlying Redis client.

    Parameters:
        key: The key for the data to be set.
        value: The value associated with the key.
        ex: Expiration time for the key, in seconds or as a timedelta.
        exat: Expiration time as an absolute timestamp.
        nx: if set to True, set the value at key to value only if it does not exist.

    Returns:
        Any: whatever the underlying client's ``set`` returns.
    """

    # Options are passed through by keyword, exactly as received.
    options = {"ex": ex, "exat": exat, "nx": nx}
    outcome = await self.cache.set(key, value, **options)
    return outcome

get async

get(key)

Gets a key-value pair.

PARAMETER DESCRIPTION
key

The key of the entry to retrieve.

TYPE: Union[bytes, str, memoryview]

Returns: Any: return of get.

Source code in webtool/cache/client.py
async def get(
    self,
    key: Union[bytes, str, memoryview],
) -> Any:
    """
    Fetches the value stored under a key from the underlying Redis client.

    Parameters:
        key: The key of the entry to retrieve.
    Returns:
        Any: whatever the underlying client's ``get`` returns.
    """

    stored = await self.cache.get(key)
    return stored

delete async

delete(key)

Deletes a key-value pair.

PARAMETER DESCRIPTION
key

The key of the entry to delete.

TYPE: Union[bytes, str, memoryview]

Returns: None

Source code in webtool/cache/client.py
async def delete(
    self,
    key: Union[bytes, str, memoryview],
) -> Any:
    """
    Removes a key by delegating to the underlying Redis client.

    Parameters:
        key: The key of the entry to delete.
    Returns:
        Any: whatever the underlying client's ``delete`` returns.
    """

    outcome = await self.cache.delete(key)
    return outcome

aclose async

aclose()

Closes the Redis client connection and connection pool.

Source code in webtool/cache/client.py
async def aclose(self) -> None:
    """
    Closes the Redis client connection and connection pool.

    Shutdown is best-effort: an ``AttributeError`` from any individual
    step is logged as a warning and the remaining steps still run.
    """

    self.logger.info(f"Closing Redis client connection (id: {id(self)})")

    try:
        await self.cache.aclose()
    except AttributeError as e:
        self.logger.warning(f"Failed to close Redis connection: {e}")

    # Each pool call has its own guard, so an AttributeError from `aclose`
    # (e.g. a pool object without that method) no longer skips `disconnect`.
    try:
        await self.connection_pool.aclose()
    except AttributeError as e:
        self.logger.warning(f"Failed to close connection pool: {e}")

    try:
        await self.connection_pool.disconnect()
    except AttributeError as e:
        self.logger.warning(f"Failed to close connection pool: {e}")

RedisConfig dataclass

RedisConfig(
    username=None,
    password=None,
    health_check_interval=0,
    socket_timeout=0.5,
    socket_connect_timeout=2.0,
    socket_keepalive=True,
    retry=Retry(default_backoff(), retries=3),
    retry_on_error=lambda: [
        BusyLoadingError,
        ConnectionError,
        RedisError,
        OSError,
    ](),
    retry_on_timeout=True,
    ssl=False,
    max_connections=None,
    protocol=3,
)

Configuration settings for establishing a connection with a Redis server.

PARAMETER DESCRIPTION
username

username

TYPE: Optional[str] DEFAULT: None

password

password

TYPE: Optional[str] DEFAULT: None

health_check_interval

Interval in seconds for performing health checks.

TYPE: int DEFAULT: 0

socket_timeout

Timeout in seconds for socket operations, including reads and writes.

TYPE: float DEFAULT: 0.5

socket_connect_timeout

Timeout in seconds for establishing a new connection to Redis.

TYPE: float DEFAULT: 2.0

socket_keepalive

Whether to enable TCP keepalive for the connection. Default is True.

TYPE: bool DEFAULT: True

retry

Retry policy for handling transient failures.

TYPE: Optional[Retry] DEFAULT: Retry(default_backoff(), retries=3)

retry_on_error

A list of exception types that should trigger a retry.

TYPE: Optional[list[type[Exception]]] DEFAULT: lambda: [BusyLoadingError, ConnectionError, RedisError, OSError]()

retry_on_timeout

Whether to retry operations when a timeout occurs.

TYPE: bool DEFAULT: True

ssl

Specifies if SSL should be used for the Redis connection.

TYPE: bool DEFAULT: False

protocol

Redis protocol version to be used. Default is RESP3.

TYPE: Optional[int] DEFAULT: 3

METHOD DESCRIPTION
to_dict

Converts the configuration fields to a dictionary.

username class-attribute instance-attribute

username = None

password class-attribute instance-attribute

password = None

health_check_interval class-attribute instance-attribute

health_check_interval = 0

socket_timeout class-attribute instance-attribute

socket_timeout = 0.5

socket_connect_timeout class-attribute instance-attribute

socket_connect_timeout = 2.0

socket_keepalive class-attribute instance-attribute

socket_keepalive = True

retry class-attribute instance-attribute

retry = Retry(default_backoff(), retries=3)

retry_on_error class-attribute instance-attribute

retry_on_error = field(
    default_factory=lambda: [
        BusyLoadingError,
        ConnectionError,
        RedisError,
        OSError,
    ]
)

retry_on_timeout class-attribute instance-attribute

retry_on_timeout = True

ssl class-attribute instance-attribute

ssl = False

max_connections class-attribute instance-attribute

max_connections = None

protocol class-attribute instance-attribute

protocol = 3

to_dict

to_dict()
Source code in webtool/cache/client.py
def to_dict(self) -> dict[str, Any]:
    """
    Converts the configuration fields to a dictionary.

    Only fields holding a truthy value are included; fields set to None,
    False, 0, or an empty container are left out.

    Returns:
        dict[str, Any]: mapping of field name to value for the truthy fields.
    """
    return {name: value for name, value in vars(self).items() if value}