Module supertokens_python.recipe.session.asyncio

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import Any, Callable, Dict, List, Optional, TypeVar, Union

from supertokens_python.recipe.openid.interfaces import (
    GetOpenIdDiscoveryConfigurationResult,
)
from supertokens_python.recipe.session.interfaces import (
    ClaimsValidationResult,
    GetClaimValueOkResult,
    SessionClaim,
    SessionClaimValidator,
    SessionContainer,
    SessionDoesNotExistError,
    SessionInformationResult,
)
from supertokens_python.recipe.session.recipe import SessionRecipe
from supertokens_python.types import MaybeAwaitable, RecipeUserId
from supertokens_python.utils import FRAMEWORKS, resolve

from ...jwt.interfaces import (
    CreateJwtOkResult,
    CreateJwtResultUnsupportedAlgorithm,
    GetJWKSResult,
)
from ..session_request_functions import (
    create_new_session_in_request,
    get_session_from_request,
    refresh_session_in_request,
)
from ..constants import protected_props
from ..utils import get_required_claim_validators

from supertokens_python.recipe.multitenancy.constants import DEFAULT_TENANT_ID
from supertokens_python.asyncio import get_user

_T = TypeVar("_T")


async def create_new_session(
    request: Any,
    tenant_id: str,
    recipe_user_id: RecipeUserId,
    access_token_payload: Union[Dict[str, Any], None] = None,
    session_data_in_database: Union[Dict[str, Any], None] = None,
    user_context: Union[None, Dict[str, Any]] = None,
) -> SessionContainer:
    if user_context is None:
        user_context = {}
    if session_data_in_database is None:
        session_data_in_database = {}
    if access_token_payload is None:
        access_token_payload = {}

    recipe_instance = SessionRecipe.get_instance()
    config = recipe_instance.config
    app_info = recipe_instance.app_info

    user = await get_user(recipe_user_id.get_as_string(), user_context)
    user_id = recipe_user_id.get_as_string()
    if user is not None:
        user_id = user.id

    return await create_new_session_in_request(
        request,
        user_context,
        recipe_instance,
        access_token_payload,
        user_id,
        recipe_user_id,
        config,
        app_info,
        session_data_in_database,
        tenant_id,
    )


async def create_new_session_without_request_response(
    tenant_id: str,
    recipe_user_id: RecipeUserId,
    access_token_payload: Union[Dict[str, Any], None] = None,
    session_data_in_database: Union[Dict[str, Any], None] = None,
    disable_anti_csrf: bool = False,
    user_context: Union[None, Dict[str, Any]] = None,
) -> SessionContainer:
    if user_context is None:
        user_context = {}
    if session_data_in_database is None:
        session_data_in_database = {}
    if access_token_payload is None:
        access_token_payload = {}

    claims_added_by_other_recipes = (
        SessionRecipe.get_instance().get_claims_added_by_other_recipes()
    )
    app_info = SessionRecipe.get_instance().app_info
    issuer = (
        app_info.api_domain.get_as_string_dangerous()
        + app_info.api_base_path.get_as_string_dangerous()
    )

    final_access_token_payload = {**access_token_payload, "iss": issuer}

    for prop in protected_props:
        if prop in final_access_token_payload:
            del final_access_token_payload[prop]

    user = await get_user(recipe_user_id.get_as_string(), user_context)
    user_id = recipe_user_id.get_as_string()
    if user is not None:
        user_id = user.id

    for claim in claims_added_by_other_recipes:
        update = await claim.build(
            user_id, recipe_user_id, tenant_id, final_access_token_payload, user_context
        )
        final_access_token_payload = {**final_access_token_payload, **update}

    return await SessionRecipe.get_instance().recipe_implementation.create_new_session(
        user_id,
        recipe_user_id,
        final_access_token_payload,
        session_data_in_database,
        disable_anti_csrf,
        tenant_id,
        user_context=user_context,
    )


async def validate_claims_for_session_handle(
    session_handle: str,
    override_global_claim_validators: Optional[
        Callable[
            [
                List[SessionClaimValidator],
                SessionInformationResult,
                Dict[str, Any],
            ],
            MaybeAwaitable[List[SessionClaimValidator]],
        ]
    ] = None,
    user_context: Union[None, Dict[str, Any]] = None,
) -> Union[SessionDoesNotExistError, ClaimsValidationResult]:
    if user_context is None:
        user_context = {}

    recipe_impl = SessionRecipe.get_instance().recipe_implementation
    session_info = await recipe_impl.get_session_information(
        session_handle, user_context
    )

    if session_info is None:
        return SessionDoesNotExistError()

    claim_validators_added_by_other_recipes = (
        SessionRecipe.get_instance().get_claim_validators_added_by_other_recipes()
    )
    global_claim_validators = await resolve(
        recipe_impl.get_global_claim_validators(
            session_info.tenant_id,
            session_info.user_id,
            session_info.recipe_user_id,
            claim_validators_added_by_other_recipes,
            user_context,
        )
    )

    if override_global_claim_validators is not None:
        claim_validators = await resolve(
            override_global_claim_validators(
                global_claim_validators, session_info, user_context
            )
        )
    else:
        claim_validators = global_claim_validators

    claim_validation_res = await recipe_impl.validate_claims(
        session_info.user_id,
        session_info.recipe_user_id,
        session_info.custom_claims_in_access_token_payload,
        claim_validators,
        user_context,
    )

    if claim_validation_res.access_token_payload_update is not None:
        updated = await recipe_impl.merge_into_access_token_payload(
            session_handle,
            claim_validation_res.access_token_payload_update,
            user_context,
        )
        if not updated:
            return SessionDoesNotExistError()

    return ClaimsValidationResult(claim_validation_res.invalid_claims)


async def fetch_and_set_claim(
    session_handle: str,
    claim: SessionClaim[Any],
    user_context: Union[None, Dict[str, Any]] = None,
) -> bool:
    if user_context is None:
        user_context = {}
    return await SessionRecipe.get_instance().recipe_implementation.fetch_and_set_claim(
        session_handle, claim, user_context
    )


async def get_claim_value(
    session_handle: str,
    claim: SessionClaim[_T],
    user_context: Union[None, Dict[str, Any]] = None,
) -> Union[SessionDoesNotExistError, GetClaimValueOkResult[_T]]:
    if user_context is None:
        user_context = {}
    return await SessionRecipe.get_instance().recipe_implementation.get_claim_value(
        session_handle, claim, user_context
    )


async def set_claim_value(
    session_handle: str,
    claim: SessionClaim[_T],
    value: _T,
    user_context: Union[None, Dict[str, Any]] = None,
) -> bool:
    if user_context is None:
        user_context = {}
    return await SessionRecipe.get_instance().recipe_implementation.set_claim_value(
        session_handle, claim, value, user_context
    )


async def remove_claim(
    session_handle: str,
    claim: SessionClaim[Any],
    user_context: Union[None, Dict[str, Any]] = None,
) -> bool:
    if user_context is None:
        user_context = {}
    return await SessionRecipe.get_instance().recipe_implementation.remove_claim(
        session_handle, claim, user_context
    )


async def get_session(
    request: Any,
    session_required: Optional[bool] = None,
    anti_csrf_check: Optional[bool] = None,
    check_database: Optional[bool] = None,
    override_global_claim_validators: Optional[
        Callable[
            [List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
            MaybeAwaitable[List[SessionClaimValidator]],
        ]
    ] = None,
    user_context: Union[None, Dict[str, Any]] = None,
) -> Union[SessionContainer, None]:
    if user_context is None:
        user_context = {}

    if session_required is None:
        session_required = True

    recipe_instance = SessionRecipe.get_instance()
    recipe_interface_impl = recipe_instance.recipe_implementation
    config = recipe_instance.config

    return await get_session_from_request(
        request,
        config,
        recipe_interface_impl,
        session_required=session_required,
        anti_csrf_check=anti_csrf_check,
        check_database=check_database,
        override_global_claim_validators=override_global_claim_validators,
        user_context=user_context,
    )


async def get_session_without_request_response(
    access_token: str,
    anti_csrf_token: Optional[str] = None,
    anti_csrf_check: Optional[bool] = None,
    session_required: Optional[bool] = None,
    check_database: Optional[bool] = None,
    override_global_claim_validators: Optional[
        Callable[
            [List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
            MaybeAwaitable[List[SessionClaimValidator]],
        ]
    ] = None,
    user_context: Union[None, Dict[str, Any]] = None,
) -> Optional[SessionContainer]:
    """Tries to validate an access token and build a Session object from it.

    Notes about anti-csrf checking:
    - if the `antiCsrf` is set to VIA_HEADER in the Session recipe config you have to handle anti-csrf checking before calling this function and set antiCsrfCheck to false in the options.
    - you can disable anti-csrf checks by setting antiCsrf to NONE in the Session recipe config. We only recommend this if you are always getting the access-token from the Authorization header.
    - if the antiCsrf check fails the returned status will be TRY_REFRESH_TOKEN_ERROR

    Args:
    - access_token: The access token extracted from the authorization header or cookies
    - anti_csrf_token: The anti-csrf token extracted from the authorization header or cookies. Can be undefined if antiCsrfCheck is false
    - anti_csrf_check: If true, anti-csrf checking will be done. If false, it will be skipped. Default behaviour is to check.
    - session_required: If true, throws an error if the session does not exist. Default is True.
    - check_database: If true, the session will be checked in the database. If false, it will be skipped. Default behaviour is to skip.
    - override_global_claim_validators: Alter the
    - user_context: user context

    Results:
    - OK: The session was successfully validated, including claim validation
    - CLAIM_VALIDATION_ERROR: While the access token is valid, one or more claim validators have failed. Our frontend SDKs expect a 403 response the contents matching the value returned from this function.
    - TRY_REFRESH_TOKEN_ERROR: This means, that the access token structure was valid, but it didn't pass validation for some reason and the user should call the refresh API.
        You can send a 401 response to trigger this behaviour if you are using our frontend SDKs
    - UNAUTHORISED: This means that the access token likely doesn't belong to a SuperTokens session. If this is unexpected, it's best handled by sending a 401 response.
    """
    if user_context is None:
        user_context = {}

    if session_required is None:
        session_required = True

    recipe_interface_impl = SessionRecipe.get_instance().recipe_implementation

    session = await recipe_interface_impl.get_session(
        access_token,
        anti_csrf_token,
        anti_csrf_check,
        session_required,
        check_database,
        override_global_claim_validators,
        user_context,
    )

    if session is not None:
        claim_validators = await get_required_claim_validators(
            session, override_global_claim_validators, user_context
        )
        await session.assert_claims(claim_validators, user_context)

    return session


async def refresh_session(
    request: Any,
    user_context: Union[None, Dict[str, Any]] = None,
) -> SessionContainer:
    if user_context is None:
        user_context = {}

    if not hasattr(request, "wrapper_used") or not request.wrapper_used:
        request = FRAMEWORKS[
            SessionRecipe.get_instance().app_info.framework
        ].wrap_request(request)

    recipe_instance = SessionRecipe.get_instance()
    config = recipe_instance.config
    recipe_interface_impl = recipe_instance.recipe_implementation

    return await refresh_session_in_request(
        request,
        user_context,
        config,
        recipe_interface_impl,
    )


async def refresh_session_without_request_response(
    refresh_token: str,
    disable_anti_csrf: bool = False,
    anti_csrf_token: Optional[str] = None,
    user_context: Optional[Dict[str, Any]] = None,
) -> SessionContainer:
    if user_context is None:
        user_context = {}

    return await SessionRecipe.get_instance().recipe_implementation.refresh_session(
        refresh_token, anti_csrf_token, disable_anti_csrf, user_context
    )


async def revoke_session(
    session_handle: str, user_context: Union[None, Dict[str, Any]] = None
) -> bool:
    if user_context is None:
        user_context = {}
    return await SessionRecipe.get_instance().recipe_implementation.revoke_session(
        session_handle, user_context
    )


async def revoke_all_sessions_for_user(
    user_id: str,
    revoke_sessions_for_linked_accounts: bool = True,
    tenant_id: Optional[str] = None,
    user_context: Union[None, Dict[str, Any]] = None,
) -> List[str]:
    if user_context is None:
        user_context = {}
    return await SessionRecipe.get_instance().recipe_implementation.revoke_all_sessions_for_user(
        user_id,
        revoke_sessions_for_linked_accounts,
        tenant_id or DEFAULT_TENANT_ID,
        tenant_id is None,
        user_context,
    )


async def get_all_session_handles_for_user(
    user_id: str,
    fetch_sessions_for_linked_accounts: bool = True,
    tenant_id: Optional[str] = None,
    user_context: Union[None, Dict[str, Any]] = None,
) -> List[str]:
    if user_context is None:
        user_context = {}
    return await SessionRecipe.get_instance().recipe_implementation.get_all_session_handles_for_user(
        user_id,
        fetch_sessions_for_linked_accounts,
        tenant_id or DEFAULT_TENANT_ID,
        tenant_id is None,
        user_context,
    )


async def revoke_multiple_sessions(
    session_handles: List[str], user_context: Union[None, Dict[str, Any]] = None
) -> List[str]:
    if user_context is None:
        user_context = {}
    return await SessionRecipe.get_instance().recipe_implementation.revoke_multiple_sessions(
        session_handles, user_context
    )


async def get_session_information(
    session_handle: str, user_context: Union[None, Dict[str, Any]] = None
) -> Union[SessionInformationResult, None]:
    if user_context is None:
        user_context = {}
    return await SessionRecipe.get_instance().recipe_implementation.get_session_information(
        session_handle, user_context
    )


async def update_session_data_in_database(
    session_handle: str,
    new_session_data: Dict[str, Any],
    user_context: Union[None, Dict[str, Any]] = None,
) -> bool:
    if user_context is None:
        user_context = {}
    return await SessionRecipe.get_instance().recipe_implementation.update_session_data_in_database(
        session_handle, new_session_data, user_context
    )


async def merge_into_access_token_payload(
    session_handle: str,
    new_access_token_payload: Dict[str, Any],
    user_context: Union[None, Dict[str, Any]] = None,
) -> bool:
    if user_context is None:
        user_context = {}

    return await SessionRecipe.get_instance().recipe_implementation.merge_into_access_token_payload(
        session_handle, new_access_token_payload, user_context
    )


async def create_jwt(
    payload: Dict[str, Any],
    validity_seconds: Optional[int] = None,
    use_static_signing_key: Optional[bool] = None,
    user_context: Union[None, Dict[str, Any]] = None,
) -> Union[CreateJwtOkResult, CreateJwtResultUnsupportedAlgorithm]:
    if user_context is None:
        user_context = {}
    openid_recipe = SessionRecipe.get_instance().openid_recipe

    return await openid_recipe.recipe_implementation.create_jwt(
        payload, validity_seconds, use_static_signing_key, user_context
    )


async def get_jwks(user_context: Union[None, Dict[str, Any]] = None) -> GetJWKSResult:
    if user_context is None:
        user_context = {}
    openid_recipe = SessionRecipe.get_instance().openid_recipe
    return await openid_recipe.recipe_implementation.get_jwks(user_context)


async def get_open_id_discovery_configuration(
    user_context: Union[None, Dict[str, Any]] = None
) -> GetOpenIdDiscoveryConfigurationResult:
    if user_context is None:
        user_context = {}
    openid_recipe = SessionRecipe.get_instance().openid_recipe

    return (
        await openid_recipe.recipe_implementation.get_open_id_discovery_configuration(
            user_context
        )
    )

Functions

async def create_jwt(payload: Dict[str, Any], validity_seconds: Optional[int] = None, use_static_signing_key: Optional[bool] = None, user_context: Union[None, Dict[str, Any]] = None) ‑> Union[CreateJwtOkResultCreateJwtResultUnsupportedAlgorithm]
async def create_new_session(request: Any, tenant_id: str, recipe_user_id: RecipeUserId, access_token_payload: Union[Dict[str, Any], None] = None, session_data_in_database: Union[Dict[str, Any], None] = None, user_context: Union[None, Dict[str, Any]] = None) ‑> SessionContainer
async def create_new_session_without_request_response(tenant_id: str, recipe_user_id: RecipeUserId, access_token_payload: Union[Dict[str, Any], None] = None, session_data_in_database: Union[Dict[str, Any], None] = None, disable_anti_csrf: bool = False, user_context: Union[None, Dict[str, Any]] = None) ‑> SessionContainer
async def fetch_and_set_claim(session_handle: str, claim: SessionClaim[Any], user_context: Union[None, Dict[str, Any]] = None) ‑> bool
async def get_all_session_handles_for_user(user_id: str, fetch_sessions_for_linked_accounts: bool = True, tenant_id: Optional[str] = None, user_context: Union[None, Dict[str, Any]] = None) ‑> List[str]
async def get_claim_value(session_handle: str, claim: SessionClaim[_T], user_context: Union[None, Dict[str, Any]] = None) ‑> Union[SessionDoesNotExistErrorGetClaimValueOkResult[~_T]]
async def get_jwks(user_context: Union[None, Dict[str, Any]] = None) ‑> GetJWKSResult
async def get_open_id_discovery_configuration(user_context: Union[None, Dict[str, Any]] = None) ‑> GetOpenIdDiscoveryConfigurationResult
async def get_session(request: Any, session_required: Optional[bool] = None, anti_csrf_check: Optional[bool] = None, check_database: Optional[bool] = None, override_global_claim_validators: Optional[Callable[[List[SessionClaimValidator], SessionContainer, Dict[str, Any]], MaybeAwaitable[List[SessionClaimValidator]]]] = None, user_context: Union[None, Dict[str, Any]] = None) ‑> Optional[SessionContainer]
async def get_session_information(session_handle: str, user_context: Union[None, Dict[str, Any]] = None) ‑> Optional[SessionInformationResult]
async def get_session_without_request_response(access_token: str, anti_csrf_token: Optional[str] = None, anti_csrf_check: Optional[bool] = None, session_required: Optional[bool] = None, check_database: Optional[bool] = None, override_global_claim_validators: Optional[Callable[[List[SessionClaimValidator], SessionContainer, Dict[str, Any]], MaybeAwaitable[List[SessionClaimValidator]]]] = None, user_context: Union[None, Dict[str, Any]] = None) ‑> Optional[SessionContainer]

Tries to validate an access token and build a Session object from it.

Notes about anti-csrf checking: - if the antiCsrf is set to VIA_HEADER in the Session recipe config you have to handle anti-csrf checking before calling this function and set antiCsrfCheck to false in the options. - you can disable anti-csrf checks by setting antiCsrf to NONE in the Session recipe config. We only recommend this if you are always getting the access-token from the Authorization header. - if the antiCsrf check fails the returned status will be TRY_REFRESH_TOKEN_ERROR

Args: - access_token: The access token extracted from the authorization header or cookies - anti_csrf_token: The anti-csrf token extracted from the authorization header or cookies. Can be undefined if antiCsrfCheck is false - anti_csrf_check: If true, anti-csrf checking will be done. If false, it will be skipped. Default behaviour is to check. - session_required: If true, throws an error if the session does not exist. Default is True. - check_database: If true, the session will be checked in the database. If false, it will be skipped. Default behaviour is to skip. - override_global_claim_validators: Alter the - user_context: user context

Results: - OK: The session was successfully validated, including claim validation - CLAIM_VALIDATION_ERROR: While the access token is valid, one or more claim validators have failed. Our frontend SDKs expect a 403 response the contents matching the value returned from this function. - TRY_REFRESH_TOKEN_ERROR: This means, that the access token structure was valid, but it didn't pass validation for some reason and the user should call the refresh API. You can send a 401 response to trigger this behaviour if you are using our frontend SDKs - UNAUTHORISED: This means that the access token likely doesn't belong to a SuperTokens session. If this is unexpected, it's best handled by sending a 401 response.

async def merge_into_access_token_payload(session_handle: str, new_access_token_payload: Dict[str, Any], user_context: Union[None, Dict[str, Any]] = None) ‑> bool
async def refresh_session(request: Any, user_context: Union[None, Dict[str, Any]] = None) ‑> SessionContainer
async def refresh_session_without_request_response(refresh_token: str, disable_anti_csrf: bool = False, anti_csrf_token: Optional[str] = None, user_context: Optional[Dict[str, Any]] = None) ‑> SessionContainer
async def remove_claim(session_handle: str, claim: SessionClaim[Any], user_context: Union[None, Dict[str, Any]] = None) ‑> bool
async def revoke_all_sessions_for_user(user_id: str, revoke_sessions_for_linked_accounts: bool = True, tenant_id: Optional[str] = None, user_context: Union[None, Dict[str, Any]] = None) ‑> List[str]
async def revoke_multiple_sessions(session_handles: List[str], user_context: Union[None, Dict[str, Any]] = None) ‑> List[str]
async def revoke_session(session_handle: str, user_context: Union[None, Dict[str, Any]] = None) ‑> bool
async def set_claim_value(session_handle: str, claim: SessionClaim[_T], value: _T, user_context: Union[None, Dict[str, Any]] = None) ‑> bool
async def update_session_data_in_database(session_handle: str, new_session_data: Dict[str, Any], user_context: Union[None, Dict[str, Any]] = None) ‑> bool
async def validate_claims_for_session_handle(session_handle: str, override_global_claim_validators: Optional[Callable[[List[SessionClaimValidator], SessionInformationResult, Dict[str, Any]], MaybeAwaitable[List[SessionClaimValidator]]]] = None, user_context: Union[None, Dict[str, Any]] = None) ‑> Union[SessionDoesNotExistErrorClaimsValidationResult]