Module supertokens_python.recipe.session
Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Callable, Dict, Union
from typing_extensions import Literal
from supertokens_python.framework import BaseRequest
from .interfaces import SessionContainer
from .recipe import SessionRecipe
from .utils import (
InputErrorHandlers,
InputOverrideConfig,
SessionOverrideConfig,
TokenTransferMethod,
)
if TYPE_CHECKING:
from supertokens_python.supertokens import RecipeInit
def init(
    cookie_domain: Union[str, None] = None,
    older_cookie_domain: Union[str, None] = None,
    cookie_secure: Union[bool, None] = None,
    cookie_same_site: Union[Literal["lax", "none", "strict"], None] = None,
    session_expired_status_code: Union[int, None] = None,
    anti_csrf: Union[Literal["VIA_TOKEN", "VIA_CUSTOM_HEADER", "NONE"], None] = None,
    get_token_transfer_method: Union[
        Callable[
            [BaseRequest, bool, Dict[str, Any]],
            Union[TokenTransferMethod, Literal["any"]],
        ],
        None,
    ] = None,
    error_handlers: Union[InputErrorHandlers, None] = None,
    override: Union[SessionOverrideConfig, None] = None,
    invalid_claim_status_code: Union[int, None] = None,
    use_dynamic_access_token_signing_key: Union[bool, None] = None,
    expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None,
    jwks_refresh_interval_sec: Union[int, None] = None,
) -> RecipeInit:
    """Package-level shortcut for ``SessionRecipe.init``.

    Every option is handed to ``SessionRecipe.init`` unchanged, and whatever
    that call returns is returned to the caller. All options default to
    ``None``.
    """
    # Forward by name so each value is visibly bound to the parameter of the
    # same name on SessionRecipe.init.
    recipe_init = SessionRecipe.init(
        cookie_domain=cookie_domain,
        older_cookie_domain=older_cookie_domain,
        cookie_secure=cookie_secure,
        cookie_same_site=cookie_same_site,
        session_expired_status_code=session_expired_status_code,
        anti_csrf=anti_csrf,
        get_token_transfer_method=get_token_transfer_method,
        error_handlers=error_handlers,
        override=override,
        invalid_claim_status_code=invalid_claim_status_code,
        use_dynamic_access_token_signing_key=use_dynamic_access_token_signing_key,
        expose_access_token_to_frontend_in_cookie_based_auth=expose_access_token_to_frontend_in_cookie_based_auth,
        jwks_refresh_interval_sec=jwks_refresh_interval_sec,
    )
    return recipe_init
# Names exported by `from supertokens_python.recipe.session import *`.
__all__ = [
    "InputErrorHandlers",
    "InputOverrideConfig",  # deprecated, use SessionOverrideConfig instead
    "SessionContainer",
    "SessionOverrideConfig",
    "SessionRecipe",
    "TokenTransferMethod",
    "init",
]
Sub-modules
- supertokens_python.recipe.session.access_token
- supertokens_python.recipe.session.api
- supertokens_python.recipe.session.asyncio
- supertokens_python.recipe.session.claim_base_classes
- supertokens_python.recipe.session.claims
- supertokens_python.recipe.session.constants
- supertokens_python.recipe.session.cookie_and_header
- supertokens_python.recipe.session.exceptions
- supertokens_python.recipe.session.framework
- supertokens_python.recipe.session.interfaces
- supertokens_python.recipe.session.jwks
- supertokens_python.recipe.session.jwt
- supertokens_python.recipe.session.recipe
- supertokens_python.recipe.session.recipe_implementation
- supertokens_python.recipe.session.session_class
- supertokens_python.recipe.session.session_functions
- supertokens_python.recipe.session.session_request_functions
- supertokens_python.recipe.session.syncio
- supertokens_python.recipe.session.utils
Functions
def init(cookie_domain: Union[str, None] = None, older_cookie_domain: Union[str, None] = None, cookie_secure: Union[bool, None] = None, cookie_same_site: "Union[Literal['lax', 'none', 'strict'], None]" = None, session_expired_status_code: Union[int, None] = None, anti_csrf: "Union[Literal['VIA_TOKEN', 'VIA_CUSTOM_HEADER', 'NONE'], None]" = None, get_token_transfer_method: "Union[Callable[[BaseRequest, bool, Dict[str, Any]], Union[TokenTransferMethod, Literal['any']]], None]" = None, error_handlers: Union[InputErrorHandlers, None] = None, override: Union[BaseOverrideConfig[RecipeInterface, APIInterface], None] = None, invalid_claim_status_code: Union[int, None] = None, use_dynamic_access_token_signing_key: Union[bool, None] = None, expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None, jwks_refresh_interval_sec: Union[int, None] = None)
Classes
class InputOverrideConfig (**data: Any)-
Base class for input override config with API overrides.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- BaseOverrideConfig
- BaseOverrideConfigWithoutAPI
- CamelCaseBaseModel
- APIResponse
- abc.ABC
- pydantic.main.BaseModel
- typing.Generic
Class variables
var model_config-
The type of the None singleton.
class SessionOverrideConfig (**data: Any)-
Base class for input override config with API overrides.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- BaseOverrideConfig
- BaseOverrideConfigWithoutAPI
- CamelCaseBaseModel
- APIResponse
- abc.ABC
- pydantic.main.BaseModel
- typing.Generic
Inherited members
class InputErrorHandlers (on_token_theft_detected: Union[None, Callable[[BaseRequest, str, str, RecipeUserId, BaseResponse], Union[BaseResponse, Awaitable[BaseResponse]]]] = None, on_try_refresh_token: Union[None, Callable[[BaseRequest, str, BaseResponse], Union[BaseResponse, Awaitable[BaseResponse]]]] = None, on_unauthorised: Union[Callable[[BaseRequest, str, BaseResponse], Union[BaseResponse, Awaitable[BaseResponse]]], None] = None, on_invalid_claim: Union[Callable[[BaseRequest, List[ClaimValidationError], BaseResponse], Union[BaseResponse, Awaitable[BaseResponse]]], None] = None, on_clear_duplicate_session_cookies: Union[None, Callable[[BaseRequest, str, BaseResponse], Union[BaseResponse, Awaitable[BaseResponse]]]] = None)-
Expand source code
class InputErrorHandlers(ErrorHandlers):
    """Session error handlers in which every omitted callback gets a default.

    Each argument left as ``None`` is swapped for the corresponding
    ``default_*_callback``; the five resulting callables are then passed, in
    declaration order, to the parent ``ErrorHandlers`` initialiser.
    """

    def __init__(
        self,
        on_token_theft_detected: Union[
            None,
            Callable[
                [BaseRequest, str, str, RecipeUserId, BaseResponse],
                Union[BaseResponse, Awaitable[BaseResponse]],
            ],
        ] = None,
        on_try_refresh_token: Union[
            None,
            Callable[
                [BaseRequest, str, BaseResponse],
                Union[BaseResponse, Awaitable[BaseResponse]],
            ],
        ] = None,
        on_unauthorised: Union[
            Callable[
                [BaseRequest, str, BaseResponse],
                Union[BaseResponse, Awaitable[BaseResponse]],
            ],
            None,
        ] = None,
        on_invalid_claim: Union[
            Callable[
                [BaseRequest, List[ClaimValidationError], BaseResponse],
                Union[BaseResponse, Awaitable[BaseResponse]],
            ],
            None,
        ] = None,
        on_clear_duplicate_session_cookies: Union[
            None,
            Callable[
                [BaseRequest, str, BaseResponse],
                Union[BaseResponse, Awaitable[BaseResponse]],
            ],
        ] = None,
    ):
        # Resolve each handler: keep the caller's callable when one was given,
        # otherwise fall back to the default for that event.
        theft_handler = (
            default_token_theft_detected_callback
            if on_token_theft_detected is None
            else on_token_theft_detected
        )
        refresh_handler = (
            default_try_refresh_token_callback
            if on_try_refresh_token is None
            else on_try_refresh_token
        )
        unauthorised_handler = (
            default_unauthorised_callback
            if on_unauthorised is None
            else on_unauthorised
        )
        invalid_claim_handler = (
            default_invalid_claim_callback
            if on_invalid_claim is None
            else on_invalid_claim
        )
        duplicate_cookies_handler = (
            default_clear_duplicate_session_cookies_callback
            if on_clear_duplicate_session_cookies is None
            else on_clear_duplicate_session_cookies
        )

        super().__init__(
            theft_handler,
            refresh_handler,
            unauthorised_handler,
            invalid_claim_handler,
            duplicate_cookies_handler,
        )

Ancestors
class SessionContainer (recipe_implementation: RecipeInterface, config: NormalisedSessionConfig, access_token: str, front_token: str, refresh_token: Optional[TokenInfo], anti_csrf_token: Optional[str], session_handle: str, user_id: str, recipe_user_id: RecipeUserId, user_data_in_access_token: Optional[Dict[str, Any]], req_res_info: Optional[ReqResInfo], access_token_updated: bool, tenant_id: str)-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class SessionContainer(ABC):  # pylint: disable=too-many-public-methods
    """Abstract base class for a session object.

    The constructor stores the supplied tokens, identifiers and config on the
    instance. The ``@abstractmethod`` members declare the operations a
    concrete session class must implement. Each ``sync_*`` method is a
    non-async wrapper that builds the coroutine from the matching async
    method and passes it to ``sync``. ``session["name"]`` is supported via
    ``__getitem__`` and is equivalent to ``getattr(session, "name")``.
    """

    def __init__(
        self,
        recipe_implementation: RecipeInterface,
        config: NormalisedSessionConfig,
        access_token: str,
        front_token: str,
        refresh_token: Optional[TokenInfo],
        anti_csrf_token: Optional[str],
        session_handle: str,
        user_id: str,
        recipe_user_id: RecipeUserId,
        user_data_in_access_token: Optional[Dict[str, Any]],
        req_res_info: Optional[ReqResInfo],
        access_token_updated: bool,
        tenant_id: str,
    ):
        """Store every argument on the instance under the same name.

        Also initialises ``response_mutators`` to an empty list.
        """
        self.recipe_implementation = recipe_implementation
        self.config = config
        self.access_token = access_token
        self.front_token = front_token
        self.refresh_token = refresh_token
        self.anti_csrf_token = anti_csrf_token
        self.session_handle = session_handle
        self.user_id = user_id
        self.user_data_in_access_token = user_data_in_access_token
        self.req_res_info: Optional[ReqResInfo] = req_res_info
        self.access_token_updated = access_token_updated
        self.tenant_id = tenant_id
        self.recipe_user_id = recipe_user_id

        # Starts empty; nothing in this class appends to it.
        self.response_mutators: List[ResponseMutator] = []

    # ------------------------------------------------------------------
    # Abstract interface. Every method in this section has an empty body
    # here and must be overridden by a concrete subclass.
    # ------------------------------------------------------------------

    @abstractmethod
    async def revoke_session(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> None:
        pass

    @abstractmethod
    async def get_session_data_from_database(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        pass

    @abstractmethod
    async def update_session_data_in_database(
        self,
        new_session_data: Dict[str, Any],
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        pass

    # Unlike most methods here, user_context is required (no default) and no
    # return type is annotated.
    @abstractmethod
    async def attach_to_request_response(
        self,
        request: BaseRequest,
        transfer_method: TokenTransferMethod,
        user_context: Dict[str, Any],
    ):
        pass

    @abstractmethod
    async def merge_into_access_token_payload(
        self,
        access_token_payload_update: JSONObject,
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        pass

    @abstractmethod
    def get_user_id(self, user_context: Optional[Dict[str, Any]] = None) -> str:
        pass

    @abstractmethod
    def get_recipe_user_id(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> RecipeUserId:
        pass

    @abstractmethod
    def get_tenant_id(self, user_context: Optional[Dict[str, Any]] = None) -> str:
        pass

    @abstractmethod
    def get_access_token_payload(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        pass

    @abstractmethod
    def get_handle(self, user_context: Optional[Dict[str, Any]] = None) -> str:
        pass

    # The only accessor in this class that takes no user_context.
    @abstractmethod
    def get_all_session_tokens_dangerously(self) -> GetSessionTokensDangerouslyDict:
        pass

    @abstractmethod
    def get_access_token(self, user_context: Optional[Dict[str, Any]] = None) -> str:
        pass

    @abstractmethod
    async def get_time_created(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> int:
        pass

    @abstractmethod
    async def get_expiry(self, user_context: Optional[Dict[str, Any]] = None) -> int:
        pass

    @abstractmethod
    async def assert_claims(
        self,
        claim_validators: List[SessionClaimValidator],
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        pass

    @abstractmethod
    async def fetch_and_set_claim(
        self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None
    ) -> None:
        pass

    @abstractmethod
    async def set_claim_value(
        self,
        claim: SessionClaim[_T],
        value: _T,
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        pass

    @abstractmethod
    async def get_claim_value(
        self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None
    ) -> Union[_T, None]:
        pass

    @abstractmethod
    async def remove_claim(
        self,
        claim: SessionClaim[Any],
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        pass

    # ------------------------------------------------------------------
    # Non-async wrappers. Each one calls the async method of the same name
    # (minus the "sync_" prefix) and returns whatever `sync` returns for that
    # coroutine. `sync` is defined outside this listing; presumably it runs
    # the coroutine to completion (confirm against its definition).
    # ------------------------------------------------------------------

    def sync_get_expiry(self, user_context: Optional[Dict[str, Any]] = None) -> int:
        return sync(self.get_expiry(user_context))

    def sync_revoke_session(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> None:
        # Passes user_context by keyword; the other wrappers pass it positionally.
        return sync(self.revoke_session(user_context=user_context))

    def sync_get_session_data_from_database(
        self, user_context: Union[Dict[str, Any], None] = None
    ) -> Dict[str, Any]:
        return sync(self.get_session_data_from_database(user_context))

    def sync_get_time_created(
        self, user_context: Optional[Dict[str, Any]] = None
    ) -> int:
        return sync(self.get_time_created(user_context))

    def sync_merge_into_access_token_payload(
        self,
        access_token_payload_update: Dict[str, Any],
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        # NOTE(review): annotated Dict[str, Any] here but JSONObject on the
        # async method; confirm the two are meant to be the same type.
        return sync(
            self.merge_into_access_token_payload(
                access_token_payload_update, user_context
            )
        )

    def sync_update_session_data_in_database(
        self,
        new_session_data: Dict[str, Any],
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        return sync(
            self.update_session_data_in_database(new_session_data, user_context)
        )

    # Session claims sync functions:
    def sync_assert_claims(
        self,
        claim_validators: List[SessionClaimValidator],
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        return sync(self.assert_claims(claim_validators, user_context))

    def sync_fetch_and_set_claim(
        self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None
    ) -> None:
        return sync(self.fetch_and_set_claim(claim, user_context))

    def sync_set_claim_value(
        self,
        claim: SessionClaim[_T],
        value: _T,
        user_context: Optional[Dict[str, Any]] = None,
    ) -> None:
        return sync(self.set_claim_value(claim, value, user_context))

    def sync_get_claim_value(
        self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None
    ) -> Union[_T, None]:
        return sync(self.get_claim_value(claim, user_context))

    def sync_remove_claim(
        self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None
    ) -> None:
        return sync(self.remove_claim(claim, user_context))

    def sync_attach_to_request_response(
        self,
        request: BaseRequest,
        transfer_method: TokenTransferMethod,
        user_context: Dict[str, Any],
    ) -> None:
        return sync(
            self.attach_to_request_response(request, transfer_method, user_context)
        )

    # This is there so that we can do session["..."] to access some of the members of this class
    def __getitem__(self, item: str):
        # Plain getattr: a missing name raises AttributeError, not KeyError.
        return getattr(self, item)

Ancestors
- abc.ABC
Subclasses
Methods
async def assert_claims(self, claim_validators: List[SessionClaimValidator], user_context: Optional[Dict[str, Any]] = None) ‑> None
async def attach_to_request_response(self, request: BaseRequest, transfer_method: TokenTransferMethod, user_context: Dict[str, Any])
async def fetch_and_set_claim(self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
def get_access_token(self, user_context: Optional[Dict[str, Any]] = None) ‑> str
def get_access_token_payload(self, user_context: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]
def get_all_session_tokens_dangerously(self) ‑> GetSessionTokensDangerouslyDict
async def get_claim_value(self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None) ‑> Optional[~_T]
async def get_expiry(self, user_context: Optional[Dict[str, Any]] = None) ‑> int
def get_handle(self, user_context: Optional[Dict[str, Any]] = None) ‑> str
def get_recipe_user_id(self, user_context: Optional[Dict[str, Any]] = None) ‑> RecipeUserId
async def get_session_data_from_database(self, user_context: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]
def get_tenant_id(self, user_context: Optional[Dict[str, Any]] = None) ‑> str
async def get_time_created(self, user_context: Optional[Dict[str, Any]] = None) ‑> int
def get_user_id(self, user_context: Optional[Dict[str, Any]] = None) ‑> str
async def merge_into_access_token_payload(self, access_token_payload_update: JSONObject, user_context: Optional[Dict[str, Any]] = None) ‑> None
async def remove_claim(self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
async def revoke_session(self, user_context: Optional[Dict[str, Any]] = None) ‑> None
async def set_claim_value(self, claim: SessionClaim[_T], value: _T, user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_assert_claims(self, claim_validators: List[SessionClaimValidator], user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_attach_to_request_response(self, request: BaseRequest, transfer_method: TokenTransferMethod, user_context: Dict[str, Any])
def sync_fetch_and_set_claim(self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_get_claim_value(self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None) ‑> Optional[~_T]
def sync_get_expiry(self, user_context: Optional[Dict[str, Any]] = None) ‑> int
def sync_get_session_data_from_database(self, user_context: Union[Dict[str, Any], None] = None) ‑> Dict[str, Any]
def sync_get_time_created(self, user_context: Optional[Dict[str, Any]] = None) ‑> int
def sync_merge_into_access_token_payload(self, access_token_payload_update: Dict[str, Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_remove_claim(self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_revoke_session(self, user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_set_claim_value(self, claim: SessionClaim[_T], value: _T, user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_update_session_data_in_database(self, new_session_data: Dict[str, Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
async def update_session_data_in_database(self, new_session_data: Dict[str, Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
class SessionRecipe (recipe_id: str, app_info: AppInfo, config: SessionConfig)-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class SessionRecipe(RecipeModule):
    """Recipe module for sessions.

    A single shared instance is kept in the private class attribute
    ``__instance``. It is created by the callable returned from ``init``,
    read through ``get_instance`` and cleared by ``reset``.
    """

    recipe_id = "session"
    # Holds the one SessionRecipe created via init(); None until then.
    __instance = None

    def __init__(
        self,
        recipe_id: str,
        app_info: AppInfo,
        config: SessionConfig,
    ):
        """Normalise ``config``, log the main settings and build the implementations.

        The recipe and API implementations are each passed through the
        corresponding hook on ``self.config.override`` before being stored.
        """
        super().__init__(recipe_id, app_info)
        self.config = validate_and_normalise_user_input(
            app_info=app_info,
            config=config,
        )
        log_debug_message(
            "session init: anti_csrf: %s", self.config.anti_csrf_function_or_string
        )
        if self.config.cookie_domain is not None:
            log_debug_message(
                "session init: cookie_domain: %s", self.config.cookie_domain
            )
        else:
            log_debug_message("session init: cookie_domain: None")

        # we check the input cookie_same_site because the normalised version is
        # always a function.
        if config.cookie_same_site is not None:
            log_debug_message(
                "session init: cookie_same_site: %s", config.cookie_same_site
            )
        else:
            log_debug_message("session init: cookie_same_site: function")

        log_debug_message(
            "session init: cookie_secure: %s", str(self.config.cookie_secure)
        )
        log_debug_message(
            "session init: refresh_token_path: %s ",
            self.config.refresh_token_path.get_as_string_dangerous(),
        )
        log_debug_message(
            "session init: session_expired_status_code: %s",
            str(self.config.session_expired_status_code),
        )

        recipe_implementation = RecipeImplementation(
            Querier.get_instance(recipe_id), self.config, self.app_info
        )
        # The stored implementation is whatever the user's override returns
        # for the default one.
        self.recipe_implementation: RecipeInterface = self.config.override.functions(
            recipe_implementation
        )

        # Imported inside the method rather than at module top; presumably to
        # avoid a circular import (confirm).
        from .api.implementation import APIImplementation

        api_implementation = APIImplementation()
        self.api_implementation: APIInterface = self.config.override.apis(
            api_implementation
        )

        # Filled later through add_claim_from_other_recipe /
        # add_claim_validator_from_other_recipe.
        self.claims_added_by_other_recipes: List[SessionClaim[Any]] = []
        self.claim_validators_added_by_other_recipes: List[SessionClaimValidator] = []

    def is_error_from_this_recipe_based_on_instance(self, err: Exception) -> bool:
        """Return True when ``err`` is both a SuperTokensError and a SuperTokensSessionError."""
        return isinstance(err, SuperTokensError) and (
            isinstance(err, SuperTokensSessionError)
        )

    def get_apis_handled(self) -> List[APIHandled]:
        """Return the two POST routes of this recipe: session refresh and sign-out."""
        # The last argument of each entry is the matching `disable_*_post`
        # attribute of the API implementation; presumably a flag that turns
        # the route off (confirm against APIHandled).
        apis_handled = [
            APIHandled(
                NormalisedURLPath(SESSION_REFRESH),
                "post",
                SESSION_REFRESH,
                self.api_implementation.disable_refresh_post,
            ),
            APIHandled(
                NormalisedURLPath(SIGNOUT),
                "post",
                SIGNOUT,
                self.api_implementation.disable_signout_post,
            ),
        ]

        return apis_handled

    async def handle_api_request(
        self,
        request_id: str,
        tenant_id: str,
        request: BaseRequest,
        path: NormalisedURLPath,
        method: str,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> Union[BaseResponse, None]:
        """Dispatch a request to the refresh or sign-out handler by ``request_id``.

        ``tenant_id``, ``path`` and ``method`` are not used in this method.

        Raises:
            Exception: if ``request_id`` is neither SESSION_REFRESH nor SIGNOUT.
        """
        if request_id == SESSION_REFRESH:
            return await handle_refresh_api(
                self.api_implementation,
                APIOptions(
                    request,
                    response,
                    self.recipe_id,
                    self.config,
                    self.recipe_implementation,
                ),
                user_context,
            )
        if request_id == SIGNOUT:
            return await handle_signout_api(
                self.api_implementation,
                APIOptions(
                    request,
                    response,
                    self.recipe_id,
                    self.config,
                    self.recipe_implementation,
                ),
                user_context,
            )
        raise Exception("should never happen")

    async def handle_error(
        self,
        request: BaseRequest,
        err: SuperTokensError,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> BaseResponse:
        """Turn a session error into a response via the configured error handlers.

        The ``isinstance`` checks run in a fixed order and the first match
        returns. An error matching none of them is handled as
        "try refresh token".
        """
        # Apply any response mutators carried by the error before choosing a handler.
        if isinstance(err, SuperTokensSessionError):
            for mutator in err.response_mutators:
                mutator(response, user_context)

        if isinstance(err, UnauthorisedError):
            log_debug_message("errorHandler: returning UNAUTHORISED")
            # Tokens are cleared only when the error asks for it.
            if err.clear_tokens:
                log_debug_message("Clearing tokens because of UNAUTHORISED response")
                clear_session_from_all_token_transfer_methods(
                    response, self, request, user_context
                )
            return await self.config.error_handlers.on_unauthorised(
                request, str(err), response
            )
        if isinstance(err, TokenTheftError):
            log_debug_message("errorHandler: returning TOKEN_THEFT_DETECTED")
            log_debug_message(
                "Clearing tokens because of TOKEN_THEFT_DETECTED response"
            )
            # Token theft always clears the tokens.
            clear_session_from_all_token_transfer_methods(
                response, self, request, user_context
            )
            return await self.config.error_handlers.on_token_theft_detected(
                request, err.session_handle, err.user_id, err.recipe_user_id, response
            )
        if isinstance(err, InvalidClaimsError):
            log_debug_message("errorHandler: returning INVALID_CLAIMS")
            # NOTE(review): unlike the other handlers, this one is given the
            # recipe (`self`) as its first argument; confirm this matches the
            # signature of ErrorHandlers.on_invalid_claim.
            return await self.config.error_handlers.on_invalid_claim(
                self, request, err.payload, response
            )
        if isinstance(err, ClearDuplicateSessionCookiesError):
            log_debug_message("errorHandler: returning CLEAR_DUPLICATE_SESSION_COOKIES")
            return await self.config.error_handlers.on_clear_duplicate_session_cookies(
                request, str(err), response
            )

        # Fallback for every error not matched above.
        log_debug_message("errorHandler: returning TRY_REFRESH_TOKEN")
        return await self.config.error_handlers.on_try_refresh_token(
            request, str(err), response
        )

    def get_all_cors_headers(self) -> List[str]:
        """Return the result of ``get_cors_allowed_headers()``."""
        cors_headers = get_cors_allowed_headers()
        return cors_headers

    @staticmethod
    def init(
        cookie_domain: Union[str, None] = None,
        older_cookie_domain: Union[str, None] = None,
        cookie_secure: Union[bool, None] = None,
        cookie_same_site: Union[Literal["lax", "none", "strict"], None] = None,
        session_expired_status_code: Union[int, None] = None,
        anti_csrf: Union[
            Literal["VIA_TOKEN", "VIA_CUSTOM_HEADER", "NONE"], None
        ] = None,
        get_token_transfer_method: Union[
            Callable[
                [BaseRequest, bool, Dict[str, Any]],
                Union[TokenTransferMethod, Literal["any"]],
            ],
            None,
        ] = None,
        error_handlers: Union[InputErrorHandlers, None] = None,
        override: Union[SessionOverrideConfig, None] = None,
        invalid_claim_status_code: Union[int, None] = None,
        use_dynamic_access_token_signing_key: Union[bool, None] = None,
        expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None,
        jwks_refresh_interval_sec: Union[int, None] = None,
    ):
        """Bundle the options into a ``SessionConfig`` and return a factory.

        The returned ``func(app_info, plugins)`` creates and stores the shared
        instance on its first call. When an instance already exists it calls
        ``raise_general_exception`` instead.
        """
        from supertokens_python.plugins import OverrideMap, apply_plugins

        config = SessionConfig(
            cookie_domain=cookie_domain,
            older_cookie_domain=older_cookie_domain,
            cookie_secure=cookie_secure,
            cookie_same_site=cookie_same_site,
            session_expired_status_code=session_expired_status_code,
            anti_csrf=anti_csrf,
            get_token_transfer_method=get_token_transfer_method,
            error_handlers=error_handlers,
            override=override,
            invalid_claim_status_code=invalid_claim_status_code,
            use_dynamic_access_token_signing_key=use_dynamic_access_token_signing_key,
            expose_access_token_to_frontend_in_cookie_based_auth=expose_access_token_to_frontend_in_cookie_based_auth,
            jwks_refresh_interval_sec=jwks_refresh_interval_sec,
        )

        def func(app_info: AppInfo, plugins: List[OverrideMap]):
            if SessionRecipe.__instance is None:
                # The config is passed through apply_plugins before the recipe
                # is constructed.
                SessionRecipe.__instance = SessionRecipe(
                    recipe_id=SessionRecipe.recipe_id,
                    app_info=app_info,
                    config=apply_plugins(
                        recipe_id=SessionRecipe.recipe_id,
                        config=config,
                        plugins=plugins,
                    ),
                )
                return SessionRecipe.__instance
            # Reached only when an instance already exists.
            raise_general_exception(
                "Session recipe has already been initialised. Please check your code for bugs."
            )

        return func

    @staticmethod
    def get_instance() -> SessionRecipe:
        """Return the shared instance, or call ``raise_general_exception`` if none exists."""
        if SessionRecipe.__instance is not None:
            return SessionRecipe.__instance
        raise_general_exception(
            "Initialisation not done. Did you forget to call the SuperTokens.init function?"
        )

    @staticmethod
    def reset():
        """Clear the shared instance; allowed only when SUPERTOKENS_ENV is "testing"."""
        if ("SUPERTOKENS_ENV" not in environ) or (
            environ["SUPERTOKENS_ENV"] != "testing"
        ):
            raise_general_exception("calling testing function in non testing env")
        SessionRecipe.__instance = None

    def add_claim_from_other_recipe(self, claim: SessionClaim[Any]):
        """Register ``claim``; raise if a claim with the same ``key`` is already registered."""
        # We are throwing here (and not in addClaimValidatorFromOtherRecipe) because if multiple
        # claims are added with the same key they will overwrite each other. Validators will all run
        # and work as expected even if they are added multiple times.
        if claim.key in [c.key for c in self.claims_added_by_other_recipes]:
            raise Exception("Claim added by multiple recipes")

        self.claims_added_by_other_recipes.append(claim)

    def get_claims_added_by_other_recipes(self) -> List[SessionClaim[Any]]:
        """Return the list of registered claims (the stored list itself, not a copy)."""
        return self.claims_added_by_other_recipes

    def add_claim_validator_from_other_recipe(
        self, claim_validator: SessionClaimValidator
    ):
        """Append ``claim_validator``; no duplicate check is made."""
        self.claim_validators_added_by_other_recipes.append(claim_validator)

    def get_claim_validators_added_by_other_recipes(
        self,
    ) -> List[SessionClaimValidator]:
        """Return the list of registered claim validators (the stored list itself, not a copy)."""
        return self.claim_validators_added_by_other_recipes

    async def verify_session(
        self,
        request: BaseRequest,
        anti_csrf_check: Union[bool, None],
        session_required: bool,
        check_database: bool,
        override_global_claim_validators: Optional[
            Callable[
                [List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
                MaybeAwaitable[List[SessionClaimValidator]],
            ]
        ],
        user_context: Dict[str, Any],
    ):
        """Delegate to ``self.api_implementation.verify_session``.

        The ``APIOptions`` are built with ``None`` in the response position.
        """
        # No effect at runtime: user_context is also passed on below.
        _ = user_context

        return await self.api_implementation.verify_session(
            APIOptions(
                request,
                None,
                self.recipe_id,
                self.config,
                self.recipe_implementation,
            ),
            anti_csrf_check,
            session_required,
            check_database,
            override_global_claim_validators,
            user_context,
        )

Ancestors
- RecipeModule
- abc.ABC
Class variables
var recipe_id-
Class-level recipe identifier; the source listing above sets it to "session".
Static methods
def get_instance() ‑> SessionRecipe
def init(cookie_domain: Union[str, None] = None, older_cookie_domain: Union[str, None] = None, cookie_secure: Union[bool, None] = None, cookie_same_site: "Union[Literal['lax', 'none', 'strict'], None]" = None, session_expired_status_code: Union[int, None] = None, anti_csrf: "Union[Literal['VIA_TOKEN', 'VIA_CUSTOM_HEADER', 'NONE'], None]" = None, get_token_transfer_method: "Union[Callable[[BaseRequest, bool, Dict[str, Any]], Union[TokenTransferMethod, Literal['any']]], None]" = None, error_handlers: Union[InputErrorHandlers, None] = None, override: Union[BaseOverrideConfig[RecipeInterface, APIInterface], None] = None, invalid_claim_status_code: Union[int, None] = None, use_dynamic_access_token_signing_key: Union[bool, None] = None, expose_access_token_to_frontend_in_cookie_based_auth: Union[bool, None] = None, jwks_refresh_interval_sec: Union[int, None] = None)
def reset()
Methods
def add_claim_from_other_recipe(self, claim: SessionClaim[Any])
def add_claim_validator_from_other_recipe(self, claim_validator: SessionClaimValidator)
def get_all_cors_headers(self) ‑> List[str]
def get_apis_handled(self) ‑> List[APIHandled]
def get_claim_validators_added_by_other_recipes(self) ‑> List[SessionClaimValidator]
def get_claims_added_by_other_recipes(self) ‑> List[SessionClaim[Any]]
async def handle_api_request(self, request_id: str, tenant_id: str, request: BaseRequest, path: NormalisedURLPath, method: str, response: BaseResponse, user_context: Dict[str, Any])
async def handle_error(self, request: BaseRequest, err: SuperTokensError, response: BaseResponse, user_context: Dict[str, Any])
def is_error_from_this_recipe_based_on_instance(self, err: Exception) ‑> bool
async def verify_session(self, request: BaseRequest, anti_csrf_check: Union[bool, None], session_required: bool, check_database: bool, override_global_claim_validators: Optional[Callable[[List[SessionClaimValidator], SessionContainer, Dict[str, Any]], MaybeAwaitable[List[SessionClaimValidator]]]], user_context: Dict[str, Any])
Inherited members