Module supertokens_python.recipe.session.recipe

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from os import environ
from typing import TYPE_CHECKING, Any, Dict, List, Union, Callable, Optional

from supertokens_python.framework.response import BaseResponse
from typing_extensions import Literal

from .cookie_and_header import (
    get_cors_allowed_headers,
)
from .exceptions import (
    ClearDuplicateSessionCookiesError,
    SuperTokensSessionError,
    TokenTheftError,
    UnauthorisedError,
    InvalidClaimsError,
)
from ...types import MaybeAwaitable

if TYPE_CHECKING:
    from supertokens_python.framework import BaseRequest
    from supertokens_python.supertokens import AppInfo

from supertokens_python.exceptions import SuperTokensError, raise_general_exception
from supertokens_python.logger import log_debug_message
from supertokens_python.normalised_url_path import NormalisedURLPath
from supertokens_python.querier import Querier
from supertokens_python.recipe.openid.recipe import OpenIdRecipe
from supertokens_python.recipe_module import APIHandled, RecipeModule

from .constants import SESSION_REFRESH, SIGNOUT
from .interfaces import (
    APIInterface,
    APIOptions,
    RecipeInterface,
    SessionClaim,
    SessionClaimValidator,
    SessionContainer,
)
from .recipe_implementation import (
    RecipeImplementation,
)
from .api import handle_refresh_api, handle_signout_api
from .utils import (
    InputErrorHandlers,
    InputOverrideConfig,
    TokenTransferMethod,
    validate_and_normalise_user_input,
)
from .cookie_and_header import clear_session_from_all_token_transfer_methods


class SessionRecipe(RecipeModule):
    recipe_id = "session"
    __instance = None

    def __init__(
        self,
        recipe_id: str,
        app_info: AppInfo,
        cookie_domain: Union[str, None] = None,
        older_cookie_domain: Union[str, None] = None,
        cookie_secure: Union[bool, None] = None,
        cookie_same_site: Union[Literal["lax", "none", "strict"], None] = None,
        session_expired_status_code: Union[int, None] = None,
        anti_csrf: Union[
            Literal["VIA_TOKEN", "VIA_CUSTOM_HEADER", "NONE"], None
        ] = None,
        get_token_transfer_method: Union[
            Callable[
                [BaseRequest, bool, Dict[str, Any]],
                Union[TokenTransferMethod, Literal["any"]],
            ],
            None,
        ] = None,
        error_handlers: Union[InputErrorHandlers, None] = None,
        override: Union[InputOverrideConfig, None] = None,
        invalid_claim_status_code: Union[int, None] = None,
        use_dynamic_access_token_signing_key: Union[bool, None] = None,
        expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None,
        jwks_refresh_interval_sec: Union[int, None] = None,
    ):
        super().__init__(recipe_id, app_info)
        self.config = validate_and_normalise_user_input(
            app_info,
            cookie_domain,
            older_cookie_domain,
            cookie_secure,
            cookie_same_site,
            session_expired_status_code,
            anti_csrf,
            get_token_transfer_method,
            error_handlers,
            override,
            invalid_claim_status_code,
            use_dynamic_access_token_signing_key,
            expose_access_token_to_frontend_in_cookie_based_auth,
            jwks_refresh_interval_sec,
        )
        self.openid_recipe = OpenIdRecipe(
            recipe_id,
            app_info,
            None,
            None,
            override.openid_feature if override is not None else None,
        )
        log_debug_message(
            "session init: anti_csrf: %s", self.config.anti_csrf_function_or_string
        )
        if self.config.cookie_domain is not None:
            log_debug_message(
                "session init: cookie_domain: %s", self.config.cookie_domain
            )
        else:
            log_debug_message("session init: cookie_domain: None")

        # we check the input cookie_same_site because the normalised version is
        # always a function.
        if cookie_same_site is not None:
            log_debug_message("session init: cookie_same_site: %s", cookie_same_site)
        else:
            log_debug_message("session init: cookie_same_site: function")

        log_debug_message(
            "session init: cookie_secure: %s", str(self.config.cookie_secure)
        )
        log_debug_message(
            "session init: refresh_token_path: %s ",
            self.config.refresh_token_path.get_as_string_dangerous(),
        )
        log_debug_message(
            "session init: session_expired_status_code: %s",
            str(self.config.session_expired_status_code),
        )
        recipe_implementation = RecipeImplementation(
            Querier.get_instance(recipe_id), self.config, self.app_info
        )
        self.recipe_implementation: RecipeInterface = (
            recipe_implementation
            if self.config.override.functions is None
            else self.config.override.functions(recipe_implementation)
        )

        from .api.implementation import APIImplementation

        api_implementation = APIImplementation()
        self.api_implementation: APIInterface = (
            api_implementation
            if self.config.override.apis is None
            else self.config.override.apis(api_implementation)
        )

        self.claims_added_by_other_recipes: List[SessionClaim[Any]] = []
        self.claim_validators_added_by_other_recipes: List[SessionClaimValidator] = []

    def is_error_from_this_recipe_based_on_instance(self, err: Exception) -> bool:
        return isinstance(err, SuperTokensError) and (
            isinstance(err, SuperTokensSessionError)
            or self.openid_recipe.is_error_from_this_recipe_based_on_instance(err)
        )

    def get_apis_handled(self) -> List[APIHandled]:
        apis_handled = [
            APIHandled(
                NormalisedURLPath(SESSION_REFRESH),
                "post",
                SESSION_REFRESH,
                self.api_implementation.disable_refresh_post,
            ),
            APIHandled(
                NormalisedURLPath(SIGNOUT),
                "post",
                SIGNOUT,
                self.api_implementation.disable_signout_post,
            ),
        ]
        apis_handled.extend(self.openid_recipe.get_apis_handled())

        return apis_handled

    async def handle_api_request(
        self,
        request_id: str,
        tenant_id: str,
        request: BaseRequest,
        path: NormalisedURLPath,
        method: str,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> Union[BaseResponse, None]:
        if request_id == SESSION_REFRESH:
            return await handle_refresh_api(
                self.api_implementation,
                APIOptions(
                    request,
                    response,
                    self.recipe_id,
                    self.config,
                    self.recipe_implementation,
                ),
                user_context,
            )
        if request_id == SIGNOUT:
            return await handle_signout_api(
                self.api_implementation,
                APIOptions(
                    request,
                    response,
                    self.recipe_id,
                    self.config,
                    self.recipe_implementation,
                ),
                user_context,
            )
        return await self.openid_recipe.handle_api_request(
            request_id, tenant_id, request, path, method, response, user_context
        )

    async def handle_error(
        self,
        request: BaseRequest,
        err: SuperTokensError,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> BaseResponse:
        if (
            isinstance(err, SuperTokensSessionError)
            and err.response_mutators is not None
        ):
            for mutator in err.response_mutators:
                mutator(response, user_context)

        if isinstance(err, UnauthorisedError):
            log_debug_message("errorHandler: returning UNAUTHORISED")
            if err.clear_tokens:
                log_debug_message("Clearing tokens because of UNAUTHORISED response")
                clear_session_from_all_token_transfer_methods(
                    response, self, request, user_context
                )
            return await self.config.error_handlers.on_unauthorised(
                request, str(err), response
            )
        if isinstance(err, TokenTheftError):
            log_debug_message("errorHandler: returning TOKEN_THEFT_DETECTED")
            log_debug_message(
                "Clearing tokens because of TOKEN_THEFT_DETECTED response"
            )
            clear_session_from_all_token_transfer_methods(
                response, self, request, user_context
            )
            return await self.config.error_handlers.on_token_theft_detected(
                request, err.session_handle, err.user_id, response
            )
        if isinstance(err, InvalidClaimsError):
            log_debug_message("errorHandler: returning INVALID_CLAIMS")
            return await self.config.error_handlers.on_invalid_claim(
                self, request, err.payload, response
            )
        if isinstance(err, ClearDuplicateSessionCookiesError):
            log_debug_message("errorHandler: returning CLEAR_DUPLICATE_SESSION_COOKIES")
            return await self.config.error_handlers.on_clear_duplicate_session_cookies(
                request, str(err), response
            )

        log_debug_message("errorHandler: returning TRY_REFRESH_TOKEN")
        return await self.config.error_handlers.on_try_refresh_token(
            request, str(err), response
        )

    def get_all_cors_headers(self) -> List[str]:
        cors_headers = get_cors_allowed_headers()
        cors_headers.extend(self.openid_recipe.get_all_cors_headers())

        return cors_headers

    @staticmethod
    def init(
        cookie_domain: Union[str, None] = None,
        older_cookie_domain: Union[str, None] = None,
        cookie_secure: Union[bool, None] = None,
        cookie_same_site: Union[Literal["lax", "none", "strict"], None] = None,
        session_expired_status_code: Union[int, None] = None,
        anti_csrf: Union[
            Literal["VIA_TOKEN", "VIA_CUSTOM_HEADER", "NONE"], None
        ] = None,
        get_token_transfer_method: Union[
            Callable[
                [BaseRequest, bool, Dict[str, Any]],
                Union[TokenTransferMethod, Literal["any"]],
            ],
            None,
        ] = None,
        error_handlers: Union[InputErrorHandlers, None] = None,
        override: Union[InputOverrideConfig, None] = None,
        invalid_claim_status_code: Union[int, None] = None,
        use_dynamic_access_token_signing_key: Union[bool, None] = None,
        expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None,
        jwks_refresh_interval_sec: Union[int, None] = None,
    ):
        def func(app_info: AppInfo):
            if SessionRecipe.__instance is None:
                SessionRecipe.__instance = SessionRecipe(
                    SessionRecipe.recipe_id,
                    app_info,
                    cookie_domain,
                    older_cookie_domain,
                    cookie_secure,
                    cookie_same_site,
                    session_expired_status_code,
                    anti_csrf,
                    get_token_transfer_method,
                    error_handlers,
                    override,
                    invalid_claim_status_code,
                    use_dynamic_access_token_signing_key,
                    expose_access_token_to_frontend_in_cookie_based_auth,
                    jwks_refresh_interval_sec,
                )
                return SessionRecipe.__instance
            raise_general_exception(
                "Session recipe has already been initialised. Please check your code for bugs."
            )

        return func

    @staticmethod
    def get_instance() -> SessionRecipe:
        if SessionRecipe.__instance is not None:
            return SessionRecipe.__instance
        raise_general_exception(
            "Initialisation not done. Did you forget to call the SuperTokens.init function?"
        )

    @staticmethod
    def reset():
        if ("SUPERTOKENS_ENV" not in environ) or (
            environ["SUPERTOKENS_ENV"] != "testing"
        ):
            raise_general_exception("calling testing function in non testing env")
        SessionRecipe.__instance = None

    def add_claim_from_other_recipe(self, claim: SessionClaim[Any]):
        # We are throwing here (and not in addClaimValidatorFromOtherRecipe) because if multiple
        # claims are added with the same key they will overwrite each other. Validators will all run
        # and work as expected even if they are added multiple times.
        if claim.key in [c.key for c in self.claims_added_by_other_recipes]:
            raise Exception("Claim added by multiple recipes")

        self.claims_added_by_other_recipes.append(claim)

    def get_claims_added_by_other_recipes(self) -> List[SessionClaim[Any]]:
        return self.claims_added_by_other_recipes

    def add_claim_validator_from_other_recipe(
        self, claim_validator: SessionClaimValidator
    ):
        self.claim_validators_added_by_other_recipes.append(claim_validator)

    def get_claim_validators_added_by_other_recipes(
        self,
    ) -> List[SessionClaimValidator]:
        return self.claim_validators_added_by_other_recipes

    async def verify_session(
        self,
        request: BaseRequest,
        anti_csrf_check: Union[bool, None],
        session_required: bool,
        check_database: bool,
        override_global_claim_validators: Optional[
            Callable[
                [List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
                MaybeAwaitable[List[SessionClaimValidator]],
            ]
        ],
        user_context: Dict[str, Any],
    ):
        _ = user_context

        return await self.api_implementation.verify_session(
            APIOptions(
                request,
                None,
                self.recipe_id,
                self.config,
                self.recipe_implementation,
            ),
            anti_csrf_check,
            session_required,
            check_database,
            override_global_claim_validators,
            user_context,
        )

Classes

class SessionRecipe (recipe_id: str, app_info: AppInfo, cookie_domain: Union[str, None] = None, older_cookie_domain: Union[str, None] = None, cookie_secure: Union[bool, None] = None, cookie_same_site: "Union[Literal['lax', 'none', 'strict'], None]" = None, session_expired_status_code: Union[int, None] = None, anti_csrf: "Union[Literal['VIA_TOKEN', 'VIA_CUSTOM_HEADER', 'NONE'], None]" = None, get_token_transfer_method: "Union[Callable[[BaseRequest, bool, Dict[str, Any]], Union[TokenTransferMethod, Literal['any']]], None]" = None, error_handlers: Union[InputErrorHandlers, None] = None, override: Union[InputOverrideConfig, None] = None, invalid_claim_status_code: Union[int, None] = None, use_dynamic_access_token_signing_key: Union[bool, None] = None, expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None, jwks_refresh_interval_sec: Union[int, None] = None)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class SessionRecipe(RecipeModule):
    recipe_id = "session"
    __instance = None

    def __init__(
        self,
        recipe_id: str,
        app_info: AppInfo,
        cookie_domain: Union[str, None] = None,
        older_cookie_domain: Union[str, None] = None,
        cookie_secure: Union[bool, None] = None,
        cookie_same_site: Union[Literal["lax", "none", "strict"], None] = None,
        session_expired_status_code: Union[int, None] = None,
        anti_csrf: Union[
            Literal["VIA_TOKEN", "VIA_CUSTOM_HEADER", "NONE"], None
        ] = None,
        get_token_transfer_method: Union[
            Callable[
                [BaseRequest, bool, Dict[str, Any]],
                Union[TokenTransferMethod, Literal["any"]],
            ],
            None,
        ] = None,
        error_handlers: Union[InputErrorHandlers, None] = None,
        override: Union[InputOverrideConfig, None] = None,
        invalid_claim_status_code: Union[int, None] = None,
        use_dynamic_access_token_signing_key: Union[bool, None] = None,
        expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None,
        jwks_refresh_interval_sec: Union[int, None] = None,
    ):
        super().__init__(recipe_id, app_info)
        self.config = validate_and_normalise_user_input(
            app_info,
            cookie_domain,
            older_cookie_domain,
            cookie_secure,
            cookie_same_site,
            session_expired_status_code,
            anti_csrf,
            get_token_transfer_method,
            error_handlers,
            override,
            invalid_claim_status_code,
            use_dynamic_access_token_signing_key,
            expose_access_token_to_frontend_in_cookie_based_auth,
            jwks_refresh_interval_sec,
        )
        self.openid_recipe = OpenIdRecipe(
            recipe_id,
            app_info,
            None,
            None,
            override.openid_feature if override is not None else None,
        )
        log_debug_message(
            "session init: anti_csrf: %s", self.config.anti_csrf_function_or_string
        )
        if self.config.cookie_domain is not None:
            log_debug_message(
                "session init: cookie_domain: %s", self.config.cookie_domain
            )
        else:
            log_debug_message("session init: cookie_domain: None")

        # we check the input cookie_same_site because the normalised version is
        # always a function.
        if cookie_same_site is not None:
            log_debug_message("session init: cookie_same_site: %s", cookie_same_site)
        else:
            log_debug_message("session init: cookie_same_site: function")

        log_debug_message(
            "session init: cookie_secure: %s", str(self.config.cookie_secure)
        )
        log_debug_message(
            "session init: refresh_token_path: %s ",
            self.config.refresh_token_path.get_as_string_dangerous(),
        )
        log_debug_message(
            "session init: session_expired_status_code: %s",
            str(self.config.session_expired_status_code),
        )
        recipe_implementation = RecipeImplementation(
            Querier.get_instance(recipe_id), self.config, self.app_info
        )
        self.recipe_implementation: RecipeInterface = (
            recipe_implementation
            if self.config.override.functions is None
            else self.config.override.functions(recipe_implementation)
        )

        from .api.implementation import APIImplementation

        api_implementation = APIImplementation()
        self.api_implementation: APIInterface = (
            api_implementation
            if self.config.override.apis is None
            else self.config.override.apis(api_implementation)
        )

        self.claims_added_by_other_recipes: List[SessionClaim[Any]] = []
        self.claim_validators_added_by_other_recipes: List[SessionClaimValidator] = []

    def is_error_from_this_recipe_based_on_instance(self, err: Exception) -> bool:
        return isinstance(err, SuperTokensError) and (
            isinstance(err, SuperTokensSessionError)
            or self.openid_recipe.is_error_from_this_recipe_based_on_instance(err)
        )

    def get_apis_handled(self) -> List[APIHandled]:
        apis_handled = [
            APIHandled(
                NormalisedURLPath(SESSION_REFRESH),
                "post",
                SESSION_REFRESH,
                self.api_implementation.disable_refresh_post,
            ),
            APIHandled(
                NormalisedURLPath(SIGNOUT),
                "post",
                SIGNOUT,
                self.api_implementation.disable_signout_post,
            ),
        ]
        apis_handled.extend(self.openid_recipe.get_apis_handled())

        return apis_handled

    async def handle_api_request(
        self,
        request_id: str,
        tenant_id: str,
        request: BaseRequest,
        path: NormalisedURLPath,
        method: str,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> Union[BaseResponse, None]:
        if request_id == SESSION_REFRESH:
            return await handle_refresh_api(
                self.api_implementation,
                APIOptions(
                    request,
                    response,
                    self.recipe_id,
                    self.config,
                    self.recipe_implementation,
                ),
                user_context,
            )
        if request_id == SIGNOUT:
            return await handle_signout_api(
                self.api_implementation,
                APIOptions(
                    request,
                    response,
                    self.recipe_id,
                    self.config,
                    self.recipe_implementation,
                ),
                user_context,
            )
        return await self.openid_recipe.handle_api_request(
            request_id, tenant_id, request, path, method, response, user_context
        )

    async def handle_error(
        self,
        request: BaseRequest,
        err: SuperTokensError,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> BaseResponse:
        if (
            isinstance(err, SuperTokensSessionError)
            and err.response_mutators is not None
        ):
            for mutator in err.response_mutators:
                mutator(response, user_context)

        if isinstance(err, UnauthorisedError):
            log_debug_message("errorHandler: returning UNAUTHORISED")
            if err.clear_tokens:
                log_debug_message("Clearing tokens because of UNAUTHORISED response")
                clear_session_from_all_token_transfer_methods(
                    response, self, request, user_context
                )
            return await self.config.error_handlers.on_unauthorised(
                request, str(err), response
            )
        if isinstance(err, TokenTheftError):
            log_debug_message("errorHandler: returning TOKEN_THEFT_DETECTED")
            log_debug_message(
                "Clearing tokens because of TOKEN_THEFT_DETECTED response"
            )
            clear_session_from_all_token_transfer_methods(
                response, self, request, user_context
            )
            return await self.config.error_handlers.on_token_theft_detected(
                request, err.session_handle, err.user_id, response
            )
        if isinstance(err, InvalidClaimsError):
            log_debug_message("errorHandler: returning INVALID_CLAIMS")
            return await self.config.error_handlers.on_invalid_claim(
                self, request, err.payload, response
            )
        if isinstance(err, ClearDuplicateSessionCookiesError):
            log_debug_message("errorHandler: returning CLEAR_DUPLICATE_SESSION_COOKIES")
            return await self.config.error_handlers.on_clear_duplicate_session_cookies(
                request, str(err), response
            )

        log_debug_message("errorHandler: returning TRY_REFRESH_TOKEN")
        return await self.config.error_handlers.on_try_refresh_token(
            request, str(err), response
        )

    def get_all_cors_headers(self) -> List[str]:
        cors_headers = get_cors_allowed_headers()
        cors_headers.extend(self.openid_recipe.get_all_cors_headers())

        return cors_headers

    @staticmethod
    def init(
        cookie_domain: Union[str, None] = None,
        older_cookie_domain: Union[str, None] = None,
        cookie_secure: Union[bool, None] = None,
        cookie_same_site: Union[Literal["lax", "none", "strict"], None] = None,
        session_expired_status_code: Union[int, None] = None,
        anti_csrf: Union[
            Literal["VIA_TOKEN", "VIA_CUSTOM_HEADER", "NONE"], None
        ] = None,
        get_token_transfer_method: Union[
            Callable[
                [BaseRequest, bool, Dict[str, Any]],
                Union[TokenTransferMethod, Literal["any"]],
            ],
            None,
        ] = None,
        error_handlers: Union[InputErrorHandlers, None] = None,
        override: Union[InputOverrideConfig, None] = None,
        invalid_claim_status_code: Union[int, None] = None,
        use_dynamic_access_token_signing_key: Union[bool, None] = None,
        expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None,
        jwks_refresh_interval_sec: Union[int, None] = None,
    ):
        def func(app_info: AppInfo):
            if SessionRecipe.__instance is None:
                SessionRecipe.__instance = SessionRecipe(
                    SessionRecipe.recipe_id,
                    app_info,
                    cookie_domain,
                    older_cookie_domain,
                    cookie_secure,
                    cookie_same_site,
                    session_expired_status_code,
                    anti_csrf,
                    get_token_transfer_method,
                    error_handlers,
                    override,
                    invalid_claim_status_code,
                    use_dynamic_access_token_signing_key,
                    expose_access_token_to_frontend_in_cookie_based_auth,
                    jwks_refresh_interval_sec,
                )
                return SessionRecipe.__instance
            raise_general_exception(
                "Session recipe has already been initialised. Please check your code for bugs."
            )

        return func

    @staticmethod
    def get_instance() -> SessionRecipe:
        if SessionRecipe.__instance is not None:
            return SessionRecipe.__instance
        raise_general_exception(
            "Initialisation not done. Did you forget to call the SuperTokens.init function?"
        )

    @staticmethod
    def reset():
        if ("SUPERTOKENS_ENV" not in environ) or (
            environ["SUPERTOKENS_ENV"] != "testing"
        ):
            raise_general_exception("calling testing function in non testing env")
        SessionRecipe.__instance = None

    def add_claim_from_other_recipe(self, claim: SessionClaim[Any]):
        # We are throwing here (and not in addClaimValidatorFromOtherRecipe) because if multiple
        # claims are added with the same key they will overwrite each other. Validators will all run
        # and work as expected even if they are added multiple times.
        if claim.key in [c.key for c in self.claims_added_by_other_recipes]:
            raise Exception("Claim added by multiple recipes")

        self.claims_added_by_other_recipes.append(claim)

    def get_claims_added_by_other_recipes(self) -> List[SessionClaim[Any]]:
        return self.claims_added_by_other_recipes

    def add_claim_validator_from_other_recipe(
        self, claim_validator: SessionClaimValidator
    ):
        self.claim_validators_added_by_other_recipes.append(claim_validator)

    def get_claim_validators_added_by_other_recipes(
        self,
    ) -> List[SessionClaimValidator]:
        return self.claim_validators_added_by_other_recipes

    async def verify_session(
        self,
        request: BaseRequest,
        anti_csrf_check: Union[bool, None],
        session_required: bool,
        check_database: bool,
        override_global_claim_validators: Optional[
            Callable[
                [List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
                MaybeAwaitable[List[SessionClaimValidator]],
            ]
        ],
        user_context: Dict[str, Any],
    ):
        _ = user_context

        return await self.api_implementation.verify_session(
            APIOptions(
                request,
                None,
                self.recipe_id,
                self.config,
                self.recipe_implementation,
            ),
            anti_csrf_check,
            session_required,
            check_database,
            override_global_claim_validators,
            user_context,
        )

Ancestors

Class variables

var get_tenant_id : Optional[Callable[[str, Dict[str, Any]], Awaitable[str]]]
var recipe_id

Static methods

def get_instance() ‑> SessionRecipe
Expand source code
@staticmethod
def get_instance() -> SessionRecipe:
    if SessionRecipe.__instance is not None:
        return SessionRecipe.__instance
    raise_general_exception(
        "Initialisation not done. Did you forget to call the SuperTokens.init function?"
    )
def init(cookie_domain: Union[str, None] = None, older_cookie_domain: Union[str, None] = None, cookie_secure: Union[bool, None] = None, cookie_same_site: "Union[Literal['lax', 'none', 'strict'], None]" = None, session_expired_status_code: Union[int, None] = None, anti_csrf: "Union[Literal['VIA_TOKEN', 'VIA_CUSTOM_HEADER', 'NONE'], None]" = None, get_token_transfer_method: "Union[Callable[[BaseRequest, bool, Dict[str, Any]], Union[TokenTransferMethod, Literal['any']]], None]" = None, error_handlers: Union[InputErrorHandlers, None] = None, override: Union[InputOverrideConfig, None] = None, invalid_claim_status_code: Union[int, None] = None, use_dynamic_access_token_signing_key: Union[bool, None] = None, expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None, jwks_refresh_interval_sec: Union[int, None] = None)
Expand source code
@staticmethod
def init(
    cookie_domain: Union[str, None] = None,
    older_cookie_domain: Union[str, None] = None,
    cookie_secure: Union[bool, None] = None,
    cookie_same_site: Union[Literal["lax", "none", "strict"], None] = None,
    session_expired_status_code: Union[int, None] = None,
    anti_csrf: Union[
        Literal["VIA_TOKEN", "VIA_CUSTOM_HEADER", "NONE"], None
    ] = None,
    get_token_transfer_method: Union[
        Callable[
            [BaseRequest, bool, Dict[str, Any]],
            Union[TokenTransferMethod, Literal["any"]],
        ],
        None,
    ] = None,
    error_handlers: Union[InputErrorHandlers, None] = None,
    override: Union[InputOverrideConfig, None] = None,
    invalid_claim_status_code: Union[int, None] = None,
    use_dynamic_access_token_signing_key: Union[bool, None] = None,
    expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None,
    jwks_refresh_interval_sec: Union[int, None] = None,
):
    def func(app_info: AppInfo):
        if SessionRecipe.__instance is None:
            SessionRecipe.__instance = SessionRecipe(
                SessionRecipe.recipe_id,
                app_info,
                cookie_domain,
                older_cookie_domain,
                cookie_secure,
                cookie_same_site,
                session_expired_status_code,
                anti_csrf,
                get_token_transfer_method,
                error_handlers,
                override,
                invalid_claim_status_code,
                use_dynamic_access_token_signing_key,
                expose_access_token_to_frontend_in_cookie_based_auth,
                jwks_refresh_interval_sec,
            )
            return SessionRecipe.__instance
        raise_general_exception(
            "Session recipe has already been initialised. Please check your code for bugs."
        )

    return func
def reset()
Expand source code
@staticmethod
def reset():
    if ("SUPERTOKENS_ENV" not in environ) or (
        environ["SUPERTOKENS_ENV"] != "testing"
    ):
        raise_general_exception("calling testing function in non testing env")
    SessionRecipe.__instance = None

Methods

def add_claim_from_other_recipe(self, claim: SessionClaim[Any])
Expand source code
def add_claim_from_other_recipe(self, claim: SessionClaim[Any]):
    # We are throwing here (and not in addClaimValidatorFromOtherRecipe) because if multiple
    # claims are added with the same key they will overwrite each other. Validators will all run
    # and work as expected even if they are added multiple times.
    if claim.key in [c.key for c in self.claims_added_by_other_recipes]:
        raise Exception("Claim added by multiple recipes")

    self.claims_added_by_other_recipes.append(claim)
def add_claim_validator_from_other_recipe(self, claim_validator: SessionClaimValidator)
Expand source code
def add_claim_validator_from_other_recipe(
    self, claim_validator: SessionClaimValidator
):
    self.claim_validators_added_by_other_recipes.append(claim_validator)
def get_all_cors_headers(self) ‑> List[str]
Expand source code
def get_all_cors_headers(self) -> List[str]:
    cors_headers = get_cors_allowed_headers()
    cors_headers.extend(self.openid_recipe.get_all_cors_headers())

    return cors_headers
def get_apis_handled(self) ‑> List[APIHandled]
Expand source code
def get_apis_handled(self) -> List[APIHandled]:
    apis_handled = [
        APIHandled(
            NormalisedURLPath(SESSION_REFRESH),
            "post",
            SESSION_REFRESH,
            self.api_implementation.disable_refresh_post,
        ),
        APIHandled(
            NormalisedURLPath(SIGNOUT),
            "post",
            SIGNOUT,
            self.api_implementation.disable_signout_post,
        ),
    ]
    apis_handled.extend(self.openid_recipe.get_apis_handled())

    return apis_handled
def get_claim_validators_added_by_other_recipes(self) ‑> List[SessionClaimValidator]
Expand source code
def get_claim_validators_added_by_other_recipes(
    self,
) -> List[SessionClaimValidator]:
    return self.claim_validators_added_by_other_recipes
def get_claims_added_by_other_recipes(self) ‑> List[SessionClaim[Any]]
Expand source code
def get_claims_added_by_other_recipes(self) -> List[SessionClaim[Any]]:
    return self.claims_added_by_other_recipes
async def handle_api_request(self, request_id: str, tenant_id: str, request: BaseRequest, path: NormalisedURLPath, method: str, response: BaseResponse, user_context: Dict[str, Any]) ‑> Union[BaseResponse, None]
Expand source code
async def handle_api_request(
    self,
    request_id: str,
    tenant_id: str,
    request: BaseRequest,
    path: NormalisedURLPath,
    method: str,
    response: BaseResponse,
    user_context: Dict[str, Any],
) -> Union[BaseResponse, None]:
    if request_id == SESSION_REFRESH:
        return await handle_refresh_api(
            self.api_implementation,
            APIOptions(
                request,
                response,
                self.recipe_id,
                self.config,
                self.recipe_implementation,
            ),
            user_context,
        )
    if request_id == SIGNOUT:
        return await handle_signout_api(
            self.api_implementation,
            APIOptions(
                request,
                response,
                self.recipe_id,
                self.config,
                self.recipe_implementation,
            ),
            user_context,
        )
    return await self.openid_recipe.handle_api_request(
        request_id, tenant_id, request, path, method, response, user_context
    )
async def handle_error(self, request: BaseRequest, err: SuperTokensError, response: BaseResponse, user_context: Dict[str, Any]) ‑> BaseResponse
Expand source code
async def handle_error(
    self,
    request: BaseRequest,
    err: SuperTokensError,
    response: BaseResponse,
    user_context: Dict[str, Any],
) -> BaseResponse:
    if (
        isinstance(err, SuperTokensSessionError)
        and err.response_mutators is not None
    ):
        for mutator in err.response_mutators:
            mutator(response, user_context)

    if isinstance(err, UnauthorisedError):
        log_debug_message("errorHandler: returning UNAUTHORISED")
        if err.clear_tokens:
            log_debug_message("Clearing tokens because of UNAUTHORISED response")
            clear_session_from_all_token_transfer_methods(
                response, self, request, user_context
            )
        return await self.config.error_handlers.on_unauthorised(
            request, str(err), response
        )
    if isinstance(err, TokenTheftError):
        log_debug_message("errorHandler: returning TOKEN_THEFT_DETECTED")
        log_debug_message(
            "Clearing tokens because of TOKEN_THEFT_DETECTED response"
        )
        clear_session_from_all_token_transfer_methods(
            response, self, request, user_context
        )
        return await self.config.error_handlers.on_token_theft_detected(
            request, err.session_handle, err.user_id, response
        )
    if isinstance(err, InvalidClaimsError):
        log_debug_message("errorHandler: returning INVALID_CLAIMS")
        return await self.config.error_handlers.on_invalid_claim(
            self, request, err.payload, response
        )
    if isinstance(err, ClearDuplicateSessionCookiesError):
        log_debug_message("errorHandler: returning CLEAR_DUPLICATE_SESSION_COOKIES")
        return await self.config.error_handlers.on_clear_duplicate_session_cookies(
            request, str(err), response
        )

    log_debug_message("errorHandler: returning TRY_REFRESH_TOKEN")
    return await self.config.error_handlers.on_try_refresh_token(
        request, str(err), response
    )
def is_error_from_this_recipe_based_on_instance(self, err: Exception) ‑> bool
Expand source code
def is_error_from_this_recipe_based_on_instance(self, err: Exception) -> bool:
    return isinstance(err, SuperTokensError) and (
        isinstance(err, SuperTokensSessionError)
        or self.openid_recipe.is_error_from_this_recipe_based_on_instance(err)
    )
async def verify_session(self, request: BaseRequest, anti_csrf_check: Union[bool, None], session_required: bool, check_database: bool, override_global_claim_validators: Optional[Callable[[List[SessionClaimValidator], SessionContainer, Dict[str, Any]], MaybeAwaitable[List[SessionClaimValidator]]]], user_context: Dict[str, Any])
Expand source code
async def verify_session(
    self,
    request: BaseRequest,
    anti_csrf_check: Union[bool, None],
    session_required: bool,
    check_database: bool,
    override_global_claim_validators: Optional[
        Callable[
            [List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
            MaybeAwaitable[List[SessionClaimValidator]],
        ]
    ],
    user_context: Dict[str, Any],
):
    _ = user_context

    return await self.api_implementation.verify_session(
        APIOptions(
            request,
            None,
            self.recipe_id,
            self.config,
            self.recipe_implementation,
        ),
        anti_csrf_check,
        session_required,
        check_database,
        override_global_claim_validators,
        user_context,
    )