Module supertokens_python.recipe.session

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Callable, Dict, Union

from typing_extensions import Literal

from supertokens_python.framework import BaseRequest

from .interfaces import SessionContainer
from .recipe import SessionRecipe
from .utils import (
    InputErrorHandlers,
    InputOverrideConfig,
    SessionOverrideConfig,
    TokenTransferMethod,
)

if TYPE_CHECKING:
    from supertokens_python.supertokens import RecipeInit


def init(
    cookie_domain: Union[str, None] = None,
    older_cookie_domain: Union[str, None] = None,
    cookie_secure: Union[bool, None] = None,
    cookie_same_site: Union[Literal["lax", "none", "strict"], None] = None,
    session_expired_status_code: Union[int, None] = None,
    anti_csrf: Union[Literal["VIA_TOKEN", "VIA_CUSTOM_HEADER", "NONE"], None] = None,
    get_token_transfer_method: Union[
        Callable[
            [BaseRequest, bool, Dict[str, Any]],
            Union[TokenTransferMethod, Literal["any"]],
        ],
        None,
    ] = None,
    error_handlers: Union[InputErrorHandlers, None] = None,
    override: Union[SessionOverrideConfig, None] = None,
    invalid_claim_status_code: Union[int, None] = None,
    use_dynamic_access_token_signing_key: Union[bool, None] = None,
    expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None,
    jwks_refresh_interval_sec: Union[int, None] = None,
) -> RecipeInit:
    """Return the initialiser for the session recipe.

    Package-level convenience wrapper: every argument is handed through
    unchanged to ``SessionRecipe.init`` and that call's result is returned.
    All parameters are optional and default to ``None``.
    """
    # Forward by keyword; the parameter names mirror SessionRecipe.init.
    recipe_init = SessionRecipe.init(
        cookie_domain=cookie_domain,
        older_cookie_domain=older_cookie_domain,
        cookie_secure=cookie_secure,
        cookie_same_site=cookie_same_site,
        session_expired_status_code=session_expired_status_code,
        anti_csrf=anti_csrf,
        get_token_transfer_method=get_token_transfer_method,
        error_handlers=error_handlers,
        override=override,
        invalid_claim_status_code=invalid_claim_status_code,
        use_dynamic_access_token_signing_key=use_dynamic_access_token_signing_key,
        expose_access_token_to_frontend_in_cookie_based_auth=expose_access_token_to_frontend_in_cookie_based_auth,
        jwks_refresh_interval_sec=jwks_refresh_interval_sec,
    )
    return recipe_init


# Names re-exported as the public API of this package (what
# ``from supertokens_python.recipe.session import *`` provides).
__all__ = [
    "InputErrorHandlers",
    "InputOverrideConfig",  # deprecated, use SessionOverrideConfig instead
    "SessionContainer",
    "SessionOverrideConfig",
    "SessionRecipe",
    "TokenTransferMethod",
    "init",
]

Sub-modules

supertokens_python.recipe.session.access_token
supertokens_python.recipe.session.api
supertokens_python.recipe.session.asyncio
supertokens_python.recipe.session.claim_base_classes
supertokens_python.recipe.session.claims
supertokens_python.recipe.session.constants
supertokens_python.recipe.session.cookie_and_header
supertokens_python.recipe.session.exceptions
supertokens_python.recipe.session.framework
supertokens_python.recipe.session.interfaces
supertokens_python.recipe.session.jwks
supertokens_python.recipe.session.jwt
supertokens_python.recipe.session.recipe
supertokens_python.recipe.session.recipe_implementation
supertokens_python.recipe.session.session_class
supertokens_python.recipe.session.session_functions
supertokens_python.recipe.session.session_request_functions
supertokens_python.recipe.session.syncio
supertokens_python.recipe.session.utils

Functions

def init(cookie_domain: Union[str, None] = None, older_cookie_domain: Union[str, None] = None, cookie_secure: Union[bool, None] = None, cookie_same_site: "Union[Literal['lax', 'none', 'strict'], None]" = None, session_expired_status_code: Union[int, None] = None, anti_csrf: "Union[Literal['VIA_TOKEN', 'VIA_CUSTOM_HEADER', 'NONE'], None]" = None, get_token_transfer_method: "Union[Callable[[BaseRequest, bool, Dict[str, Any]], Union[TokenTransferMethod, Literal['any']]], None]" = None, error_handlers: Union[InputErrorHandlers, None] = None, override: Union[BaseOverrideConfig[RecipeInterface, APIInterface], None] = None, invalid_claim_status_code: Union[int, None] = None, use_dynamic_access_token_signing_key: Union[bool, None] = None, expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None, jwks_refresh_interval_sec: Union[int, None] = None)

Classes

class InputOverrideConfig (**data: Any)

Base class for input override config with API overrides.

Create a new model by parsing and validating input data from keyword arguments.

Raises `pydantic_core.ValidationError` if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.

Ancestors

Class variables

var model_config

The type of the None singleton.

class SessionOverrideConfig (**data: Any)

Base class for input override config with API overrides.

Create a new model by parsing and validating input data from keyword arguments.

Raises `pydantic_core.ValidationError` if the input data cannot be validated to form a valid model.

`self` is explicitly positional-only to allow `self` as a field name.

Ancestors

Inherited members

class InputErrorHandlers (on_token_theft_detected: Union[None, Callable[[BaseRequest, str, str, RecipeUserId, BaseResponse], Union[BaseResponse, Awaitable[BaseResponse]]]] = None, on_try_refresh_token: Union[None, Callable[[BaseRequest, str, BaseResponse], Union[BaseResponse, Awaitable[BaseResponse]]]] = None, on_unauthorised: Union[Callable[[BaseRequest, str, BaseResponse], Union[BaseResponse, Awaitable[BaseResponse]]], None] = None, on_invalid_claim: Union[Callable[[BaseRequest, List[ClaimValidationError], BaseResponse], Union[BaseResponse, Awaitable[BaseResponse]]], None] = None, on_clear_duplicate_session_cookies: Union[None, Callable[[BaseRequest, str, BaseResponse], Union[BaseResponse, Awaitable[BaseResponse]]]] = None)
Expand source code
class InputErrorHandlers(ErrorHandlers):
    """Error-handler bundle in which every callback is optional.

    Each callback left as ``None`` is swapped for the corresponding
    ``default_*_callback`` before the full set is passed on to
    ``ErrorHandlers.__init__``.
    """

    def __init__(
        self,
        on_token_theft_detected: Union[
            None,
            Callable[
                [BaseRequest, str, str, RecipeUserId, BaseResponse],
                Union[BaseResponse, Awaitable[BaseResponse]],
            ],
        ] = None,
        on_try_refresh_token: Union[
            None,
            Callable[
                [BaseRequest, str, BaseResponse],
                Union[BaseResponse, Awaitable[BaseResponse]],
            ],
        ] = None,
        on_unauthorised: Union[
            Callable[
                [BaseRequest, str, BaseResponse],
                Union[BaseResponse, Awaitable[BaseResponse]],
            ],
            None,
        ] = None,
        on_invalid_claim: Union[
            Callable[
                [BaseRequest, List[ClaimValidationError], BaseResponse],
                Union[BaseResponse, Awaitable[BaseResponse]],
            ],
            None,
        ] = None,
        on_clear_duplicate_session_cookies: Union[
            None,
            Callable[
                [BaseRequest, str, BaseResponse],
                Union[BaseResponse, Awaitable[BaseResponse]],
            ],
        ] = None,
    ):
        # Resolve each handler to the caller's value or the library default,
        # in the positional order expected by the parent constructor.
        super().__init__(
            (
                default_token_theft_detected_callback
                if on_token_theft_detected is None
                else on_token_theft_detected
            ),
            (
                default_try_refresh_token_callback
                if on_try_refresh_token is None
                else on_try_refresh_token
            ),
            (
                default_unauthorised_callback
                if on_unauthorised is None
                else on_unauthorised
            ),
            (
                default_invalid_claim_callback
                if on_invalid_claim is None
                else on_invalid_claim
            ),
            (
                default_clear_duplicate_session_cookies_callback
                if on_clear_duplicate_session_cookies is None
                else on_clear_duplicate_session_cookies
            ),
        )

Ancestors

class SessionContainer (recipe_implementation: RecipeInterface, config: NormalisedSessionConfig, access_token: str, front_token: str, refresh_token: Optional[TokenInfo], anti_csrf_token: Optional[str], session_handle: str, user_id: str, recipe_user_id: RecipeUserId, user_data_in_access_token: Optional[Dict[str, Any]], req_res_info: Optional[ReqResInfo], access_token_updated: bool, tenant_id: str)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class SessionContainer(ABC):  # pylint: disable=too-many-public-methods
    """Abstract interface for a single session object.

    The constructor stores the session's tokens, identifiers and config on
    the instance. All core operations are declared abstract here and must be
    supplied by a concrete subclass. Every ``sync_*`` method is a thin
    wrapper that passes the matching coroutine to the ``sync`` helper and
    returns its result (presumably running it to completion for
    non-async callers; confirm against ``sync``'s implementation).
    """

    def __init__(
        self,
        recipe_implementation: RecipeInterface,
        config: NormalisedSessionConfig,
        access_token: str,
        front_token: str,
        refresh_token: Optional[TokenInfo],
        anti_csrf_token: Optional[str],
        session_handle: str,
        user_id: str,
        recipe_user_id: RecipeUserId,
        user_data_in_access_token: Optional[Dict[str, Any]],
        req_res_info: Optional[ReqResInfo],
        access_token_updated: bool,
        tenant_id: str,
    ):
        """Store every constructor argument as a same-named attribute."""
        self.recipe_implementation = recipe_implementation
        self.config = config
        self.access_token = access_token
        self.front_token = front_token
        self.refresh_token = refresh_token
        self.anti_csrf_token = anti_csrf_token
        self.session_handle = session_handle
        self.user_id = user_id
        self.user_data_in_access_token = user_data_in_access_token
        self.req_res_info: Optional[ReqResInfo] = req_res_info
        self.access_token_updated = access_token_updated
        self.tenant_id = tenant_id
        self.recipe_user_id = recipe_user_id
        # Not a constructor argument: always starts out empty.
        self.response_mutators: List[ResponseMutator] = []

    # ------------------------------------------------------------------
    # Abstract API. The bodies below are placeholders; the behaviour is
    # defined entirely by the concrete subclass.
    # ------------------------------------------------------------------

    @abstractmethod
    async def revoke_session(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> None:
        pass

    @abstractmethod
    async def get_session_data_from_database(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        pass

    @abstractmethod
    async def update_session_data_in_database(
        self,
        new_session_data: Dict[str, Any],
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        pass

    @abstractmethod
    async def attach_to_request_response(
        self,
        request: BaseRequest,
        transfer_method: TokenTransferMethod,
        user_context: Dict[str, Any],
    ):
        pass

    @abstractmethod
    async def merge_into_access_token_payload(
        self,
        access_token_payload_update: JSONObject,
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        pass

    @abstractmethod
    def get_user_id(self, user_context: Optional[Dict[str, Any]] = None) -> str:
        pass

    @abstractmethod
    def get_recipe_user_id(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> RecipeUserId:
        pass

    @abstractmethod
    def get_tenant_id(self, user_context: Optional[Dict[str, Any]] = None) -> str:
        pass

    @abstractmethod
    def get_access_token_payload(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        pass

    @abstractmethod
    def get_handle(self, user_context: Optional[Dict[str, Any]] = None) -> str:
        pass

    @abstractmethod
    def get_all_session_tokens_dangerously(self) -> GetSessionTokensDangerouslyDict:
        pass

    @abstractmethod
    def get_access_token(self, user_context: Optional[Dict[str, Any]] = None) -> str:
        pass

    @abstractmethod
    async def get_time_created(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> int:
        pass

    @abstractmethod
    async def get_expiry(self, user_context: Optional[Dict[str, Any]] = None) -> int:
        pass

    @abstractmethod
    async def assert_claims(
        self,
        claim_validators: List[SessionClaimValidator],
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        pass

    @abstractmethod
    async def fetch_and_set_claim(
        self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None
    ) -> None:
        pass

    @abstractmethod
    async def set_claim_value(
        self,
        claim: SessionClaim[_T],
        value: _T,
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        pass

    @abstractmethod
    async def get_claim_value(
        self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None
    ) -> Union[_T, None]:
        pass

    @abstractmethod
    async def remove_claim(
        self,
        claim: SessionClaim[Any],
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        pass

    # ------------------------------------------------------------------
    # Synchronous wrappers: each one hands the coroutine of the same-named
    # async method to ``sync(...)`` and returns whatever that returns.
    # ------------------------------------------------------------------

    def sync_get_expiry(self, user_context: Optional[Dict[str, Any]] = None) -> int:
        """Synchronous wrapper around ``get_expiry``."""
        return sync(self.get_expiry(user_context))

    def sync_revoke_session(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> None:
        """Synchronous wrapper around ``revoke_session``."""
        return sync(self.revoke_session(user_context=user_context))

    def sync_get_session_data_from_database(
        self, user_context: Union[Dict[str, Any], None] = None
    ) -> Dict[str, Any]:
        """Synchronous wrapper around ``get_session_data_from_database``."""
        return sync(self.get_session_data_from_database(user_context))

    def sync_get_time_created(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> int:
        """Synchronous wrapper around ``get_time_created``."""
        return sync(self.get_time_created(user_context))

    def sync_merge_into_access_token_payload(
        self,
        access_token_payload_update: Dict[str, Any],
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Synchronous wrapper around ``merge_into_access_token_payload``."""
        return sync(
            self.merge_into_access_token_payload(
                access_token_payload_update, user_context
            )
        )

    def sync_update_session_data_in_database(
        self,
        new_session_data: Dict[str, Any],
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Synchronous wrapper around ``update_session_data_in_database``."""
        return sync(
            self.update_session_data_in_database(new_session_data, user_context)
        )

    # Session claims sync functions:
    def sync_assert_claims(
        self,
        claim_validators: List[SessionClaimValidator],
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Synchronous wrapper around ``assert_claims``."""
        return sync(self.assert_claims(claim_validators, user_context))

    def sync_fetch_and_set_claim(
        self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None
    ) -> None:
        """Synchronous wrapper around ``fetch_and_set_claim``."""
        return sync(self.fetch_and_set_claim(claim, user_context))

    def sync_set_claim_value(
        self,
        claim: SessionClaim[_T],
        value: _T,
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Synchronous wrapper around ``set_claim_value``."""
        return sync(self.set_claim_value(claim, value, user_context))

    def sync_get_claim_value(
        self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None
    ) -> Union[_T, None]:
        """Synchronous wrapper around ``get_claim_value``."""
        return sync(self.get_claim_value(claim, user_context))

    def sync_remove_claim(
        self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None
    ) -> None:
        """Synchronous wrapper around ``remove_claim``."""
        return sync(self.remove_claim(claim, user_context))

    def sync_attach_to_request_response(
        self,
        request: BaseRequest,
        transfer_method: TokenTransferMethod,
        user_context: Dict[str, Any],
    ) -> None:
        """Synchronous wrapper around ``attach_to_request_response``."""
        return sync(
            self.attach_to_request_response(request, transfer_method, user_context)
        )

    # This is there so that we can do session["..."] to access some of the members of this class
    def __getitem__(self, item: str):
        """Return the attribute named ``item``.

        Lookup goes through ``getattr``, so an unknown name raises
        ``AttributeError`` rather than ``KeyError``.
        """
        return getattr(self, item)

Ancestors

  • abc.ABC

Subclasses

Methods

async def assert_claims(self, claim_validators: List[SessionClaimValidator], user_context: Optional[Dict[str, Any]] = None) ‑> None
async def attach_to_request_response(self, request: BaseRequest, transfer_method: TokenTransferMethod, user_context: Dict[str, Any])
async def fetch_and_set_claim(self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
def get_access_token(self, user_context: Optional[Dict[str, Any]] = None) ‑> str
def get_access_token_payload(self, user_context: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]
def get_all_session_tokens_dangerously(self) ‑> GetSessionTokensDangerouslyDict
async def get_claim_value(self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None) ‑> Optional[~_T]
async def get_expiry(self, user_context: Optional[Dict[str, Any]] = None) ‑> int
def get_handle(self, user_context: Optional[Dict[str, Any]] = None) ‑> str
def get_recipe_user_id(self, user_context: Optional[Dict[str, Any]] = None) ‑> RecipeUserId
async def get_session_data_from_database(self, user_context: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]
def get_tenant_id(self, user_context: Optional[Dict[str, Any]] = None) ‑> str
async def get_time_created(self, user_context: Optional[Dict[str, Any]] = None) ‑> int
def get_user_id(self, user_context: Optional[Dict[str, Any]] = None) ‑> str
async def merge_into_access_token_payload(self, access_token_payload_update: JSONObject, user_context: Optional[Dict[str, Any]] = None) ‑> None
async def remove_claim(self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
async def revoke_session(self, user_context: Optional[Dict[str, Any]] = None) ‑> None
async def set_claim_value(self, claim: SessionClaim[_T], value: _T, user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_assert_claims(self, claim_validators: List[SessionClaimValidator], user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_attach_to_request_response(self, request: BaseRequest, transfer_method: TokenTransferMethod, user_context: Dict[str, Any]) ‑> None
def sync_fetch_and_set_claim(self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_get_claim_value(self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None) ‑> Optional[~_T]
def sync_get_expiry(self, user_context: Optional[Dict[str, Any]] = None) ‑> int
def sync_get_session_data_from_database(self, user_context: Union[Dict[str, Any], None] = None) ‑> Dict[str, Any]
def sync_get_time_created(self, user_context: Optional[Dict[str, Any]] = None) ‑> int
def sync_merge_into_access_token_payload(self, access_token_payload_update: Dict[str, Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_remove_claim(self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_revoke_session(self, user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_set_claim_value(self, claim: SessionClaim[_T], value: _T, user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_update_session_data_in_database(self, new_session_data: Dict[str, Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
async def update_session_data_in_database(self, new_session_data: Dict[str, Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
class SessionRecipe (recipe_id: str, app_info: AppInfo, config: SessionConfig)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class SessionRecipe(RecipeModule):
    """Recipe module for sessions, held as a process-wide singleton.

    The instance is created by the function returned from ``init`` and
    fetched with ``get_instance``. It exposes the session-refresh and
    sign-out APIs, routes session errors to the configured error handlers,
    and keeps the claims and claim validators that other recipes register.
    """

    recipe_id = "session"
    # The singleton instance; assigned by the function returned from init()
    # and cleared by reset().
    __instance = None

    def __init__(
        self,
        recipe_id: str,
        app_info: AppInfo,
        config: SessionConfig,
    ):
        """Normalise ``config``, log the effective settings and build the
        recipe and API implementations with the user's overrides applied.
        """
        super().__init__(recipe_id, app_info)
        self.config = validate_and_normalise_user_input(
            app_info=app_info,
            config=config,
        )
        log_debug_message(
            "session init: anti_csrf: %s", self.config.anti_csrf_function_or_string
        )
        if self.config.cookie_domain is not None:
            log_debug_message(
                "session init: cookie_domain: %s", self.config.cookie_domain
            )
        else:
            log_debug_message("session init: cookie_domain: None")

        # we check the input cookie_same_site because the normalised version is
        # always a function.
        if config.cookie_same_site is not None:
            log_debug_message(
                "session init: cookie_same_site: %s", config.cookie_same_site
            )
        else:
            log_debug_message("session init: cookie_same_site: function")

        log_debug_message(
            "session init: cookie_secure: %s", str(self.config.cookie_secure)
        )
        log_debug_message(
            "session init: refresh_token_path: %s ",
            self.config.refresh_token_path.get_as_string_dangerous(),
        )
        log_debug_message(
            "session init: session_expired_status_code: %s",
            str(self.config.session_expired_status_code),
        )
        recipe_implementation = RecipeImplementation(
            Querier.get_instance(recipe_id), self.config, self.app_info
        )
        # The override hook receives the stock implementation and returns
        # the one that is actually used.
        self.recipe_implementation: RecipeInterface = self.config.override.functions(
            recipe_implementation
        )

        # NOTE(review): imported locally, presumably to avoid a circular
        # import at module load time; confirm before moving to the top.
        from .api.implementation import APIImplementation

        api_implementation = APIImplementation()
        self.api_implementation: APIInterface = self.config.override.apis(
            api_implementation
        )

        # Filled in later through the add_*_from_other_recipe methods.
        self.claims_added_by_other_recipes: List[SessionClaim[Any]] = []
        self.claim_validators_added_by_other_recipes: List[SessionClaimValidator] = []

    def is_error_from_this_recipe_based_on_instance(self, err: Exception) -> bool:
        """Return True when ``err`` is both a ``SuperTokensError`` and a
        ``SuperTokensSessionError``.
        """
        return isinstance(err, SuperTokensError) and (
            isinstance(err, SuperTokensSessionError)
        )

    def get_apis_handled(self) -> List[APIHandled]:
        """Return the two POST routes this recipe serves: session refresh
        and sign-out, each with its disabled flag taken from the API
        implementation.
        """
        apis_handled = [
            APIHandled(
                NormalisedURLPath(SESSION_REFRESH),
                "post",
                SESSION_REFRESH,
                self.api_implementation.disable_refresh_post,
            ),
            APIHandled(
                NormalisedURLPath(SIGNOUT),
                "post",
                SIGNOUT,
                self.api_implementation.disable_signout_post,
            ),
        ]

        return apis_handled

    async def handle_api_request(
        self,
        request_id: str,
        tenant_id: str,
        request: BaseRequest,
        path: NormalisedURLPath,
        method: str,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> Union[BaseResponse, None]:
        """Dispatch ``request_id`` to the refresh or sign-out handler.

        ``tenant_id``, ``path`` and ``method`` are accepted but not used
        here. Raises a plain ``Exception`` for any other ``request_id``.
        """
        if request_id == SESSION_REFRESH:
            return await handle_refresh_api(
                self.api_implementation,
                APIOptions(
                    request,
                    response,
                    self.recipe_id,
                    self.config,
                    self.recipe_implementation,
                ),
                user_context,
            )
        if request_id == SIGNOUT:
            return await handle_signout_api(
                self.api_implementation,
                APIOptions(
                    request,
                    response,
                    self.recipe_id,
                    self.config,
                    self.recipe_implementation,
                ),
                user_context,
            )
        raise Exception("should never happen")

    async def handle_error(
        self,
        request: BaseRequest,
        err: SuperTokensError,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> BaseResponse:
        """Turn a session error into a response via the configured handlers.

        Response mutators carried by a ``SuperTokensSessionError`` are applied
        first. The error type then selects the handler; anything that matches
        none of the explicit checks goes to ``on_try_refresh_token``.
        """
        if isinstance(err, SuperTokensSessionError):
            for mutator in err.response_mutators:
                mutator(response, user_context)

        if isinstance(err, UnauthorisedError):
            log_debug_message("errorHandler: returning UNAUTHORISED")
            # Tokens are cleared only when the error asks for it.
            if err.clear_tokens:
                log_debug_message("Clearing tokens because of UNAUTHORISED response")
                clear_session_from_all_token_transfer_methods(
                    response, self, request, user_context
                )
            return await self.config.error_handlers.on_unauthorised(
                request, str(err), response
            )
        if isinstance(err, TokenTheftError):
            log_debug_message("errorHandler: returning TOKEN_THEFT_DETECTED")
            log_debug_message(
                "Clearing tokens because of TOKEN_THEFT_DETECTED response"
            )
            # Token theft always clears the session tokens.
            clear_session_from_all_token_transfer_methods(
                response, self, request, user_context
            )
            return await self.config.error_handlers.on_token_theft_detected(
                request, err.session_handle, err.user_id, err.recipe_user_id, response
            )
        if isinstance(err, InvalidClaimsError):
            log_debug_message("errorHandler: returning INVALID_CLAIMS")
            # NOTE(review): unlike the other handlers, this one is also given
            # the recipe instance; confirm against ErrorHandlers.on_invalid_claim.
            return await self.config.error_handlers.on_invalid_claim(
                self, request, err.payload, response
            )
        if isinstance(err, ClearDuplicateSessionCookiesError):
            log_debug_message("errorHandler: returning CLEAR_DUPLICATE_SESSION_COOKIES")
            return await self.config.error_handlers.on_clear_duplicate_session_cookies(
                request, str(err), response
            )

        # Fallback for every error not matched above.
        log_debug_message("errorHandler: returning TRY_REFRESH_TOKEN")
        return await self.config.error_handlers.on_try_refresh_token(
            request, str(err), response
        )

    def get_all_cors_headers(self) -> List[str]:
        """Return the header list produced by ``get_cors_allowed_headers``."""
        cors_headers = get_cors_allowed_headers()

        return cors_headers

    @staticmethod
    def init(
        cookie_domain: Union[str, None] = None,
        older_cookie_domain: Union[str, None] = None,
        cookie_secure: Union[bool, None] = None,
        cookie_same_site: Union[Literal["lax", "none", "strict"], None] = None,
        session_expired_status_code: Union[int, None] = None,
        anti_csrf: Union[
            Literal["VIA_TOKEN", "VIA_CUSTOM_HEADER", "NONE"], None
        ] = None,
        get_token_transfer_method: Union[
            Callable[
                [BaseRequest, bool, Dict[str, Any]],
                Union[TokenTransferMethod, Literal["any"]],
            ],
            None,
        ] = None,
        error_handlers: Union[InputErrorHandlers, None] = None,
        override: Union[SessionOverrideConfig, None] = None,
        invalid_claim_status_code: Union[int, None] = None,
        use_dynamic_access_token_signing_key: Union[bool, None] = None,
        expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None,
        jwks_refresh_interval_sec: Union[int, None] = None,
    ):
        """Bundle the arguments into a ``SessionConfig`` and return a factory.

        The returned function takes ``(app_info, plugins)``, creates the
        singleton with the plugin-adjusted config and returns it. Calling the
        factory while an instance already exists goes to
        ``raise_general_exception``.
        """
        from supertokens_python.plugins import OverrideMap, apply_plugins

        config = SessionConfig(
            cookie_domain=cookie_domain,
            older_cookie_domain=older_cookie_domain,
            cookie_secure=cookie_secure,
            cookie_same_site=cookie_same_site,
            session_expired_status_code=session_expired_status_code,
            anti_csrf=anti_csrf,
            get_token_transfer_method=get_token_transfer_method,
            error_handlers=error_handlers,
            override=override,
            invalid_claim_status_code=invalid_claim_status_code,
            use_dynamic_access_token_signing_key=use_dynamic_access_token_signing_key,
            expose_access_token_to_frontend_in_cookie_based_auth=expose_access_token_to_frontend_in_cookie_based_auth,
            jwks_refresh_interval_sec=jwks_refresh_interval_sec,
        )

        def func(app_info: AppInfo, plugins: List[OverrideMap]):
            # Only the first call may construct the recipe.
            if SessionRecipe.__instance is None:
                SessionRecipe.__instance = SessionRecipe(
                    recipe_id=SessionRecipe.recipe_id,
                    app_info=app_info,
                    config=apply_plugins(
                        recipe_id=SessionRecipe.recipe_id,
                        config=config,
                        plugins=plugins,
                    ),
                )
                return SessionRecipe.__instance
            raise_general_exception(
                "Session recipe has already been initialised. Please check your code for bugs."
            )

        return func

    @staticmethod
    def get_instance() -> SessionRecipe:
        """Return the singleton, or call ``raise_general_exception`` when it
        has not been created yet.
        """
        if SessionRecipe.__instance is not None:
            return SessionRecipe.__instance
        raise_general_exception(
            "Initialisation not done. Did you forget to call the SuperTokens.init function?"
        )

    @staticmethod
    def reset():
        """Clear the singleton. Permitted only when the ``SUPERTOKENS_ENV``
        environment variable equals ``"testing"``.
        """
        if ("SUPERTOKENS_ENV" not in environ) or (
            environ["SUPERTOKENS_ENV"] != "testing"
        ):
            raise_general_exception("calling testing function in non testing env")
        SessionRecipe.__instance = None

    def add_claim_from_other_recipe(self, claim: SessionClaim[Any]):
        """Register ``claim``; raises ``Exception`` when a claim with the same
        key is already registered.
        """
        # We are throwing here (and not in add_claim_validator_from_other_recipe) because if multiple
        # claims are added with the same key they will overwrite each other. Validators will all run
        # and work as expected even if they are added multiple times.
        if claim.key in [c.key for c in self.claims_added_by_other_recipes]:
            raise Exception("Claim added by multiple recipes")

        self.claims_added_by_other_recipes.append(claim)

    def get_claims_added_by_other_recipes(self) -> List[SessionClaim[Any]]:
        """Return the internal list of registered claims (not a copy)."""
        return self.claims_added_by_other_recipes

    def add_claim_validator_from_other_recipe(
        self, claim_validator: SessionClaimValidator
    ):
        """Append ``claim_validator``; duplicates are not checked for."""
        self.claim_validators_added_by_other_recipes.append(claim_validator)

    def get_claim_validators_added_by_other_recipes(
        self,
    ) -> List[SessionClaimValidator]:
        """Return the internal list of registered validators (not a copy)."""
        return self.claim_validators_added_by_other_recipes

    async def verify_session(
        self,
        request: BaseRequest,
        anti_csrf_check: Union[bool, None],
        session_required: bool,
        check_database: bool,
        override_global_claim_validators: Optional[
            Callable[
                [List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
                MaybeAwaitable[List[SessionClaimValidator]],
            ]
        ],
        user_context: Dict[str, Any],
    ):
        """Delegate to ``api_implementation.verify_session`` and return its
        result. The ``APIOptions`` passed along carry ``None`` as the response.
        """
        _ = user_context

        return await self.api_implementation.verify_session(
            APIOptions(
                request,
                None,
                self.recipe_id,
                self.config,
                self.recipe_implementation,
            ),
            anti_csrf_check,
            session_required,
            check_database,
            override_global_claim_validators,
            user_context,
        )

Ancestors

Class variables

var recipe_id

The recipe identifier; for this class it is the string `"session"`.

Static methods

def get_instance() ‑> SessionRecipe
def init(cookie_domain: Union[str, None] = None, older_cookie_domain: Union[str, None] = None, cookie_secure: Union[bool, None] = None, cookie_same_site: "Union[Literal['lax', 'none', 'strict'], None]" = None, session_expired_status_code: Union[int, None] = None, anti_csrf: "Union[Literal['VIA_TOKEN', 'VIA_CUSTOM_HEADER', 'NONE'], None]" = None, get_token_transfer_method: "Union[Callable[[BaseRequest, bool, Dict[str, Any]], Union[TokenTransferMethod, Literal['any']]], None]" = None, error_handlers: Union[InputErrorHandlers, None] = None, override: Union[BaseOverrideConfig[RecipeInterface, APIInterface], None] = None, invalid_claim_status_code: Union[int, None] = None, use_dynamic_access_token_signing_key: Union[bool, None] = None, expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None, jwks_refresh_interval_sec: Union[int, None] = None)
def reset()

Methods

def add_claim_from_other_recipe(self, claim: SessionClaim[Any])
def add_claim_validator_from_other_recipe(self, claim_validator: SessionClaimValidator)
def get_all_cors_headers(self) ‑> List[str]
def get_apis_handled(self) ‑> List[APIHandled]
def get_claim_validators_added_by_other_recipes(self) ‑> List[SessionClaimValidator]
def get_claims_added_by_other_recipes(self) ‑> List[SessionClaim[Any]]
async def handle_api_request(self, request_id: str, tenant_id: str, request: BaseRequest, path: NormalisedURLPath, method: str, response: BaseResponse, user_context: Dict[str, Any])
async def handle_error(self, request: BaseRequest, err: SuperTokensError, response: BaseResponse, user_context: Dict[str, Any])
def is_error_from_this_recipe_based_on_instance(self, err: Exception) ‑> bool
async def verify_session(self, request: BaseRequest, anti_csrf_check: Union[bool, None], session_required: bool, check_database: bool, override_global_claim_validators: Optional[Callable[[List[SessionClaimValidator], SessionContainer, Dict[str, Any]], MaybeAwaitable[List[SessionClaimValidator]]]], user_context: Dict[str, Any])

Inherited members