Skip to content

Auth - service

JWTService

JWTService(
    cache,
    secret_key="",
    access_token_expire_time=3600,
    refresh_token_expire_time=604800,
    jwt_manager=None,
    algorithm=None,
)

Bases: BaseJWTService[PayloadT], PayloadFactory, Generic[PayloadT]

Generates access and refresh tokens.

Info

In most cases, the algorithm parameter is determined automatically from the secret_key, so there is no need to specify it. When an asymmetric encryption key is used, providing the secret_key automatically selects the correct public key. The secret_key can be generated using the webtool.utils package.

Source code in webtool/auth/service.py
def __init__(
    self,
    cache: "BaseCache",
    secret_key: str | bytes = "",
    access_token_expire_time: int = 3600,
    refresh_token_expire_time: int = 604800,
    jwt_manager: BaseJWTManager | None = None,
    algorithm: str | None = None,
):
    """
    Store the service configuration and load the signing key.

    Parameters:
        cache: cache backend kept on the instance as `_cache`.
        secret_key: key material handed to `load_key`.
        access_token_expire_time: stored as the access token lifetime.
        refresh_token_expire_time: stored as the refresh token lifetime.
        jwt_manager: token manager to use; a new `JWTManager` is created when falsy.
        algorithm: explicitly requested algorithm, stored as-is (may be `None`).
    """
    # Plain configuration values.
    self._cache = cache
    self._secret_key = secret_key
    self.algorithm = algorithm
    self.access_token_expire_time = access_token_expire_time
    self.refresh_token_expire_time = refresh_token_expire_time

    # Collaborators: fall back to the default manager when none is supplied.
    self._jwt_manager = jwt_manager or JWTManager()
    self._json_encoder = ORJSONEncoder()
    self._json_decoder = ORJSONDecoder()

    # Key slots stay None unless load_key yields a (private, public, algorithm) triple.
    self._private_key = self._public_key = None

    loaded_key = load_key(secret_key)
    if len(loaded_key) == 3:
        self._private_key, self._public_key, detected_algorithm = loaded_key
    else:
        # Any other shape: only the trailing item (the algorithm) is used.
        detected_algorithm = loaded_key[-1]

    # NOTE(review): presumably reconciles the detected algorithm with
    # self.algorithm -- confirm against _verify_key_algorithm.
    self._verify_key_algorithm(detected_algorithm)

algorithm instance-attribute

algorithm = algorithm

access_token_expire_time instance-attribute

access_token_expire_time = access_token_expire_time

refresh_token_expire_time instance-attribute

refresh_token_expire_time = refresh_token_expire_time

create_token async

create_token(data)

Create Access and Refresh Tokens.

PARAMETER DESCRIPTION
data

must include 'sub' field.

TYPE: dict

RETURNS DESCRIPTION
tuple

Access, Refresh Token.

TYPE: tuple[str, str]

Source code in webtool/auth/service.py
async def create_token(self, data: dict) -> tuple[str, str]:
    """
    Issue a new access/refresh token pair and persist the refresh token.

    Parameters:
        data: claims for the tokens; checked with `_validate_sub` first
            (must include 'sub' field).

    Returns:
        tuple: Access, Refresh Token.
    """
    self._validate_sub(data)

    # Build both payloads first; each gets its own lifetime.
    access_payload = self._create_metadata(data, self.access_token_expire_time)
    refresh_payload = self._create_metadata(data, self.refresh_token_expire_time)

    # Encode access first, then refresh, and only then store the pair.
    token_pair = (
        self._create_token(access_payload),
        self._create_token(refresh_payload),
    )
    await self._save_refresh_token(access_payload, refresh_payload)

    return token_pair

validate_access_token async

validate_access_token(access_token, options=None)

Validate Access Token.

PARAMETER DESCRIPTION
access_token

Access Token.

TYPE: str

options

Optional parameters for additional handling of additional errors.

TYPE: dict[str, bool | list[str]] | None DEFAULT: None

RETURNS DESCRIPTION
Optional[PayloadT]

Optional[PayloadType]: Access Token Data

Source code in webtool/auth/service.py
async def validate_access_token(
    self,
    access_token: str,
    options: dict[str, bool | list[str]] | None = None,
) -> Optional[PayloadT]:
    """
    Decode an access token and return its payload.

    Parameters:
        access_token: Access Token.
        options: forwarded unchanged to `_decode_token`.

    Returns:
        Optional[PayloadType]: Access Token Data, or `None` when decoding
        yields nothing.
    """
    decoded = self._decode_token(access_token, options=options)

    # Any falsy decode result is normalised to None.
    return decoded if decoded else None

validate_refresh_token async

validate_refresh_token(refresh_token, options=None)

Validate Refresh Token.

PARAMETER DESCRIPTION
refresh_token

Refresh Token.

TYPE: str

options

Optional parameters for additional handling of additional errors.

TYPE: dict[str, bool | list[str]] | None DEFAULT: None

RETURNS DESCRIPTION
Optional[PayloadT]

Optional[PayloadType]: Refresh Token Data

Source code in webtool/auth/service.py
async def validate_refresh_token(
    self,
    refresh_token: str,
    options: dict[str, bool | list[str]] | None = None,
) -> Optional[PayloadT]:
    """
    Validate Refresh Token.

    The token is decoded and then compared with the entry read back through
    `_read_refresh_token`; the cached entry's "access_jti" field is ignored
    for that comparison.

    Parameters:
        refresh_token: Refresh Token.
        options: forwarded unchanged to `_decode_token`.

    Returns:
        Optional[PayloadType]: Refresh Token Data, or `None` when the token
        cannot be decoded, has no cached entry, or differs from the cached entry.
    """
    refresh_data = self._decode_token(refresh_token, options=options)
    if not refresh_data:
        return None

    cached_refresh_data = await self._read_refresh_token(refresh_data)
    if not cached_refresh_data:
        return None

    # Compare against a filtered copy rather than popping "access_jti":
    # popping mutates whatever object the cache handed back and raises
    # KeyError when the field is absent.
    comparable = {key: value for key, value in cached_refresh_data.items() if key != "access_jti"}
    if comparable != refresh_data:
        return None

    return refresh_data

invalidate_token async

invalidate_token(refresh_token)

Invalidates the Refresh token and the Access token issued with it.

PARAMETER DESCRIPTION
refresh_token

Refresh Token.

TYPE: str

RETURNS DESCRIPTION
bool

Returns true on success.

TYPE: bool

Source code in webtool/auth/service.py
async def invalidate_token(self, refresh_token: str) -> bool:
    """
    Invalidate a refresh token after checking that it is still valid.

    Parameters:
        refresh_token: Refresh Token.

    Returns:
        bool: `True` once the token data has been passed to
        `_invalidate_refresh_token`; `False` when validation fails.
    """
    payload = await self.validate_refresh_token(refresh_token)

    if payload:
        await self._invalidate_refresh_token(payload)
        return True

    # Validation produced nothing: there is nothing to invalidate.
    return False

update_token async

update_token(data, refresh_token)

Invalidates the Refresh token and the Access token issued with it, and issues new Access and Refresh Tokens.

PARAMETER DESCRIPTION
data

Token data.

TYPE: dict

refresh_token

Refresh Token.

TYPE: str

RETURNS DESCRIPTION
tuple

Access, Refresh Token.

TYPE: tuple[str, str] | None

Source code in webtool/auth/service.py
async def update_token(self, data: dict, refresh_token: str) -> tuple[str, str] | None:
    """
    Invalidates the Refresh token and the Access token issued with it and issue New Access and Refresh Tokens.

    Parameters:
        data: Token data.
        refresh_token: Access Token.

    Returns:
        tuple: Access, Refresh Token.
    """
    refresh_data = await self.invalidate_token(refresh_token)

    if not refresh_data:
        return None

    new_access_token, new_refresh_token = await self.create_token(data)

    return new_access_token, new_refresh_token

RedisJWTService

RedisJWTService(
    cache,
    secret_key="",
    access_token_expire_time=3600,
    refresh_token_expire_time=604800,
    jwt_manager=None,
    algorithm=None,
)

Bases: JWTService, Generic[PayloadT]

Generates access and refresh tokens.

Info

In most cases, the algorithm parameter is determined automatically from the secret_key, so there is no need to specify it. When an asymmetric encryption key is used, providing the secret_key automatically selects the correct public key. The secret_key can be generated using the webtool.utils package.

Source code in webtool/auth/service.py
def __init__(
    self,
    cache: "RedisCache",
    secret_key: str | bytes = "",
    access_token_expire_time: int = 3600,
    refresh_token_expire_time: int = 604800,
    jwt_manager: BaseJWTManager | None = None,
    algorithm: str | None = None,
):
    """
    Initialise the base service and register the Lua scripts on the cache client.

    Parameters:
        cache: cache wrapper whose `.cache` attribute exposes `register_script`.
        secret_key: forwarded to the base initialiser.
        access_token_expire_time: forwarded to the base initialiser.
        refresh_token_expire_time: forwarded to the base initialiser.
        jwt_manager: forwarded to the base initialiser.
        algorithm: forwarded to the base initialiser.
    """
    super().__init__(
        cache,
        secret_key,
        access_token_expire_time,
        refresh_token_expire_time,
        jwt_manager,
        algorithm,
    )

    # Register the save, invalidate and search scripts, in that order.
    register_script = self._cache.cache.register_script
    self._save_script = register_script(RedisJWTService._LUA_SAVE_TOKEN_SCRIPT)
    self._invalidate_script = register_script(RedisJWTService._LUA_INVALIDATE_TOKEN_SCRIPT)
    self._search_script = register_script(RedisJWTService._LUA_SEARCH_TOKEN_SCRIPT)

algorithm instance-attribute

algorithm = algorithm

access_token_expire_time instance-attribute

access_token_expire_time = access_token_expire_time

refresh_token_expire_time instance-attribute

refresh_token_expire_time = refresh_token_expire_time

create_token async

create_token(data)

Create Access and Refresh Tokens.

PARAMETER DESCRIPTION
data

must include 'sub' field.

TYPE: dict

RETURNS DESCRIPTION
tuple

Access, Refresh Token.

TYPE: tuple[str, str]

Source code in webtool/auth/service.py
async def create_token(self, data: dict) -> tuple[str, str]:
    """
    Issue a new access/refresh token pair and persist the refresh token.

    Parameters:
        data: claims for the tokens; checked with `_validate_sub` first
            (must include 'sub' field).

    Returns:
        tuple: Access, Refresh Token.
    """
    self._validate_sub(data)

    # Build both payloads first; each gets its own lifetime.
    access_payload = self._create_metadata(data, self.access_token_expire_time)
    refresh_payload = self._create_metadata(data, self.refresh_token_expire_time)

    # Encode access first, then refresh, and only then store the pair.
    token_pair = (
        self._create_token(access_payload),
        self._create_token(refresh_payload),
    )
    await self._save_refresh_token(access_payload, refresh_payload)

    return token_pair

validate_access_token async

validate_access_token(access_token, options=None)

Validate Access Token.

PARAMETER DESCRIPTION
access_token

Access Token.

TYPE: str

options

Optional parameters for additional handling of additional errors.

TYPE: dict[str, bool | list[str]] | None DEFAULT: None

RETURNS DESCRIPTION
Optional[PayloadT]

Optional[PayloadType]: Access Token Data

Source code in webtool/auth/service.py
async def validate_access_token(
    self,
    access_token: str,
    options: dict[str, bool | list[str]] | None = None,
) -> Optional[PayloadT]:
    """
    Decode an access token and return its payload.

    Parameters:
        access_token: Access Token.
        options: forwarded unchanged to `_decode_token`.

    Returns:
        Optional[PayloadType]: Access Token Data, or `None` when decoding
        yields nothing.
    """
    decoded = self._decode_token(access_token, options=options)

    # Any falsy decode result is normalised to None.
    return decoded if decoded else None

validate_refresh_token async

validate_refresh_token(refresh_token, options=None)

Validate Refresh Token.

PARAMETER DESCRIPTION
refresh_token

Refresh Token.

TYPE: str

options

Optional parameters for additional handling of additional errors.

TYPE: dict[str, bool | list[str]] | None DEFAULT: None

RETURNS DESCRIPTION
Optional[PayloadT]

Optional[PayloadType]: Refresh Token Data

Source code in webtool/auth/service.py
async def validate_refresh_token(
    self,
    refresh_token: str,
    options: dict[str, bool | list[str]] | None = None,
) -> Optional[PayloadT]:
    """
    Validate Refresh Token.

    The token is decoded and then compared with the entry read back through
    `_read_refresh_token`; the cached entry's "access_jti" field is ignored
    for that comparison.

    Parameters:
        refresh_token: Refresh Token.
        options: forwarded unchanged to `_decode_token`.

    Returns:
        Optional[PayloadType]: Refresh Token Data, or `None` when the token
        cannot be decoded, has no cached entry, or differs from the cached entry.
    """
    refresh_data = self._decode_token(refresh_token, options=options)
    if not refresh_data:
        return None

    cached_refresh_data = await self._read_refresh_token(refresh_data)
    if not cached_refresh_data:
        return None

    # Compare against a filtered copy rather than popping "access_jti":
    # popping mutates whatever object the cache handed back and raises
    # KeyError when the field is absent.
    comparable = {key: value for key, value in cached_refresh_data.items() if key != "access_jti"}
    if comparable != refresh_data:
        return None

    return refresh_data

update_token async

update_token(data, refresh_token)

Invalidates the Refresh token and the Access token issued with it, and issues new Access and Refresh Tokens.

PARAMETER DESCRIPTION
data

Token data.

TYPE: dict

refresh_token

Refresh Token.

TYPE: str

RETURNS DESCRIPTION
tuple

Access, Refresh Token.

TYPE: tuple[str, str] | None

Source code in webtool/auth/service.py
async def update_token(self, data: dict, refresh_token: str) -> tuple[str, str] | None:
    """
    Invalidates the Refresh token and the Access token issued with it and issue New Access and Refresh Tokens.

    Parameters:
        data: Token data.
        refresh_token: Access Token.

    Returns:
        tuple: Access, Refresh Token.
    """
    refresh_data = await self.invalidate_token(refresh_token)

    if not refresh_data:
        return None

    new_access_token, new_refresh_token = await self.create_token(data)

    return new_access_token, new_refresh_token

invalidate_token async

invalidate_token(
    refresh_token, refresh_jti_to_invalidate=None
)

Invalidates the Refresh token and the Access token issued with it.

PARAMETER DESCRIPTION
refresh_token

Refresh Token.

TYPE: str

refresh_jti_to_invalidate

Refresh Token JTI to invalidate can be found using search_token.

TYPE: str | bytes | None DEFAULT: None

RETURNS DESCRIPTION
bool

Returns true on success.

TYPE: bool

Source code in webtool/auth/service.py
async def invalidate_token(
    self,
    refresh_token: str,
    refresh_jti_to_invalidate: str | bytes | None = None,
) -> bool:
    """
    Invalidates the Refresh token and the Access token issued with it .

    Parameters:
        refresh_token: Access Token.
        refresh_jti_to_invalidate: Refresh Token JTI to invalidate can be found using search_token.

    Returns:
        bool: Returns `true` on success.
    """
    refresh_data = await self.validate_refresh_token(refresh_token)

    if not refresh_data:
        return False

    return await self._invalidate_token_data(refresh_data, refresh_jti_to_invalidate)

search_token async

search_token(refresh_token)

Returns the JTIs of the refresh tokens issued for the token’s sub claim.

PARAMETER DESCRIPTION
refresh_token

Refresh Token.

TYPE: str

RETURNS DESCRIPTION
list[bytes]

list[bytes]: Returns a list containing JTIs on success.

Source code in webtool/auth/service.py
async def search_token(self, refresh_token: str) -> list[bytes]:
    """
    Returns the JTIs of the refresh tokens issued for the token’s sub claim.

    The decoded token is JSON-encoded and passed as the single key to the
    registered search script, together with the current time and the
    refresh token lifetime as arguments.

    Parameters:
        refresh_token: Refresh Token.

    Returns:
        list[bytes]: Returns a list containing JTIs on success; an empty list
        when the token cannot be decoded.
    """
    refresh_data = self._decode_token(refresh_token)

    # Same guard as the validate_* methods: without a decoded payload there
    # is nothing to look up, so skip the script call entirely.
    if not refresh_data:
        return []

    refresh_json = self._json_encoder.encode(refresh_data)

    return await self._search_script(
        keys=[refresh_json],
        args=[
            time.time(),
            self.refresh_token_expire_time,
        ],
    )