Skip to content

throttle - middleware

LimitMiddleware

LimitMiddleware(
    app, cache, auth_backend, anno_backend=None
)

Middleware for implementing rate limiting in ASGI applications.

This middleware supports both authenticated and anonymous users, applying rate limits based on user identifiers or session IDs.

:param app: ASGI application :param cache: Cache client instance for storing rate limit data :param auth_backend: Authentication backend for identifying users :param anno_backend: Backend for handling anonymous users (defaults to AnnoSessionBackend)

Source code in webtool/throttle/middleware.py
def __init__(
    self,
    app,
    cache,
    auth_backend: "BaseBackend",
    anno_backend: "BaseAnnoBackend" = None,
) -> None:
    """
    :param app: ASGI application
    :param cache: Cache client instance for storing rate limit data
    :param auth_backend: Authentication backend for identifying users
    :param anno_backend: Backend for handling anonymous users (defaults to AnnoSessionBackend)
    """

    self.app = app
    self.limiter = RedisLimiter(cache)
    self.auth_backend = auth_backend
    self.anno_backend = anno_backend or AnnoSessionBackend("th-session")

app instance-attribute

app = app

limiter instance-attribute

limiter = RedisLimiter(cache)

auth_backend instance-attribute

auth_backend = auth_backend

anno_backend instance-attribute

anno_backend = anno_backend or AnnoSessionBackend(
    "th-session"
)

apply async

apply(scope, receive, send, identifier, rules)

Applies rate limiting rules and handles the request.

:param scope: ASGI request scope :param receive: ASGI receive function :param send: ASGI send function :param identifier: ASGI identifier string :param rules: List of rate limit rules

:return: Response from app or rate limit exceeded response

Source code in webtool/throttle/middleware.py
async def apply(self, scope, receive, send, identifier: str, rules: list["LimitRule"]):
    """
    Applies rate limiting rules and handles the request.

    :param scope: ASGI request scope
    :param receive: ASGI receive function
    :param send: ASGI send function
    :param identifier: ASGI identifier string
    :param rules: List of rate limit rules

    :return: Response from app or rate limit exceeded response
    """

    if rules:
        deny = await self.limiter.is_deny(identifier, rules)
        deny.sort(key=lambda limit: limit[2])

        if any(s[0] < s[1] for s in deny):
            return await _default_callback(scope, send, deny)

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                headers = message.setdefault("headers", [])
                headers.extend(
                    [
                        (b"Retry-After", str(deny[-1][2]).encode()),
                        (b"x-ratelimit-limit", str(deny[-1][1]).encode()),
                        (b"x-ratelimit-remaining", str(deny[-1][0] - deny[-1][1]).encode()),
                        (
                            b"access-control-expose-headers",
                            b"x-ratelimit-limit, x-ratelimit-remaining, retry-after",
                        ),
                    ]
                )
            await send(message)

        return await self.app(scope, receive, send_wrapper)

    return await self.app(scope, receive, send)