Module supertokens_python.supertokens
Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from os import environ
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set, Union, Tuple
from typing_extensions import Literal
from supertokens_python.logger import (
get_maybe_none_as_str,
log_debug_message,
enable_debug_logging,
)
from .constants import FDI_KEY_HEADER, RID_KEY_HEADER, USER_COUNT
from .exceptions import SuperTokensError
from .interfaces import (
CreateUserIdMappingOkResult,
DeleteUserIdMappingOkResult,
GetUserIdMappingOkResult,
UnknownMappingError,
UnknownSupertokensUserIDError,
UpdateOrDeleteUserIdMappingInfoOkResult,
UserIdMappingAlreadyExistsError,
UserIDTypes,
)
from .normalised_url_domain import NormalisedURLDomain
from .normalised_url_path import NormalisedURLPath
from .post_init_callbacks import PostSTInitCallbacks
from .querier import Querier
from .utils import (
get_rid_from_header,
get_top_level_domain_for_same_site_resolution,
is_version_gte,
normalise_http_method,
send_non_200_response_with_message,
)
if TYPE_CHECKING:
from .recipe_module import RecipeModule
from supertokens_python.framework.request import BaseRequest
from supertokens_python.framework.response import BaseResponse
from supertokens_python.recipe.session import SessionContainer
import json
from .exceptions import BadInputError, GeneralError, raise_general_exception
class SupertokensConfig:
def __init__(
self,
connection_uri: str,
api_key: Union[str, None] = None,
network_interceptor: Optional[
Callable[
[
str,
str,
Dict[str, Any],
Optional[Dict[str, Any]],
Optional[Dict[str, Any]],
Optional[Dict[str, Any]],
],
Tuple[
str,
str,
Dict[str, Any],
Optional[Dict[str, Any]],
Optional[Dict[str, Any]],
],
]
] = None,
disable_core_call_cache: bool = False,
): # We keep this = None here because this is directly used by the user.
self.connection_uri = connection_uri
self.api_key = api_key
self.network_interceptor = network_interceptor
self.disable_core_call_cache = disable_core_call_cache
class Host:
def __init__(self, domain: NormalisedURLDomain, base_path: NormalisedURLPath):
self.domain = domain
self.base_path = base_path
class InputAppInfo:
def __init__(
self,
app_name: str,
api_domain: str,
api_gateway_path: str = "",
api_base_path: str = "/auth",
website_base_path: str = "/auth",
website_domain: Optional[str] = None,
origin: Optional[
Union[str, Callable[[Optional[BaseRequest], Dict[str, Any]], str]]
] = None,
):
self.app_name = app_name
self.api_gateway_path = api_gateway_path
self.api_domain = api_domain
self.website_domain = website_domain
self.origin = origin
self.api_base_path = api_base_path
self.website_base_path = website_base_path
class AppInfo:
def __init__(
self,
app_name: str,
api_domain: str,
website_domain: Optional[str],
framework: Literal["fastapi", "flask", "django"],
api_gateway_path: str,
api_base_path: str,
website_base_path: str,
mode: Union[Literal["asgi", "wsgi"], None],
origin: Optional[
Union[str, Callable[[Optional[BaseRequest], Dict[str, Any]], str]]
],
):
self.app_name = app_name
self.api_gateway_path = NormalisedURLPath(api_gateway_path)
self.api_domain = NormalisedURLDomain(api_domain)
self.top_level_api_domain = get_top_level_domain_for_same_site_resolution(
self.api_domain.get_as_string_dangerous()
)
if website_domain is None and origin is None:
raise_general_exception(
"Please provide at least one of website_domain or origin"
)
self.__origin = origin
self.__website_domain = website_domain
self.api_base_path = self.api_gateway_path.append(
NormalisedURLPath(api_base_path)
)
self.website_base_path = NormalisedURLPath(website_base_path)
if mode is not None:
self.mode = mode
elif framework == "fastapi":
mode = "asgi"
else:
mode = "wsgi"
self.framework = framework
self.mode = mode
def get_top_level_website_domain(
self, request: Optional[BaseRequest], user_context: Dict[str, Any]
) -> str:
return get_top_level_domain_for_same_site_resolution(
self.get_origin(request, user_context).get_as_string_dangerous()
)
def get_origin(self, request: Optional[BaseRequest], user_context: Dict[str, Any]):
origin = self.__origin
if origin is None:
origin = self.__website_domain
# This should not be possible because we check for either origin or websiteDomain above
if origin is None:
raise_general_exception("should never come here")
if callable(origin):
origin = origin(request, user_context)
return NormalisedURLDomain(origin)
def toJSON(self):
def defaultImpl(o: Any):
if isinstance(o, (NormalisedURLDomain, NormalisedURLPath)):
return o.get_as_string_dangerous()
return o.__dict__
return json.dumps(self, default=defaultImpl, sort_keys=True, indent=4)
def manage_session_post_response(
session: SessionContainer, response: BaseResponse, user_context: Dict[str, Any]
):
# Something similar happens in handle_error of session/recipe.py
for mutator in session.response_mutators:
mutator(response, user_context)
class Supertokens:
__instance = None
def __init__(
self,
app_info: InputAppInfo,
framework: Literal["fastapi", "flask", "django"],
supertokens_config: SupertokensConfig,
recipe_list: List[Callable[[AppInfo], RecipeModule]],
mode: Optional[Literal["asgi", "wsgi"]],
telemetry: Optional[bool],
debug: Optional[bool],
):
if not isinstance(app_info, InputAppInfo): # type: ignore
raise ValueError("app_info must be an instance of InputAppInfo")
self.app_info = AppInfo(
app_info.app_name,
app_info.api_domain,
app_info.website_domain,
framework,
app_info.api_gateway_path,
app_info.api_base_path,
app_info.website_base_path,
mode,
app_info.origin,
)
self.supertokens_config = supertokens_config
if debug is True:
enable_debug_logging()
self._telemetry_status: str = "NONE"
log_debug_message(
"Started SuperTokens with debug logging (supertokens.init called)"
)
log_debug_message("app_info: %s", self.app_info.toJSON())
log_debug_message("framework: %s", framework)
hosts = list(
map(
lambda h: Host(
NormalisedURLDomain(h.strip()), NormalisedURLPath(h.strip())
),
filter(lambda x: x != "", supertokens_config.connection_uri.split(";")),
)
)
Querier.init(
hosts,
supertokens_config.api_key,
supertokens_config.network_interceptor,
supertokens_config.disable_core_call_cache,
)
if len(recipe_list) == 0:
raise_general_exception(
"Please provide at least one recipe to the supertokens.init function call"
)
multitenancy_found = False
totp_found = False
user_metadata_found = False
multi_factor_auth_found = False
def make_recipe(recipe: Callable[[AppInfo], RecipeModule]) -> RecipeModule:
nonlocal multitenancy_found, totp_found, user_metadata_found, multi_factor_auth_found
recipe_module = recipe(self.app_info)
if recipe_module.get_recipe_id() == "multitenancy":
multitenancy_found = True
elif recipe_module.get_recipe_id() == "usermetadata":
user_metadata_found = True
elif recipe_module.get_recipe_id() == "multifactorauth":
multi_factor_auth_found = True
elif recipe_module.get_recipe_id() == "totp":
totp_found = True
return recipe_module
self.recipe_modules: List[RecipeModule] = list(map(make_recipe, recipe_list))
if not multitenancy_found:
from supertokens_python.recipe.multitenancy.recipe import MultitenancyRecipe
self.recipe_modules.append(MultitenancyRecipe.init()(self.app_info))
if totp_found and not multi_factor_auth_found:
raise Exception("Please initialize the MultiFactorAuth recipe to use TOTP.")
if not user_metadata_found:
from supertokens_python.recipe.usermetadata.recipe import UserMetadataRecipe
self.recipe_modules.append(UserMetadataRecipe.init()(self.app_info))
self.telemetry = (
telemetry
if telemetry is not None
else (environ.get("TEST_MODE") != "testing")
)
@staticmethod
def init(
app_info: InputAppInfo,
framework: Literal["fastapi", "flask", "django"],
supertokens_config: SupertokensConfig,
recipe_list: List[Callable[[AppInfo], RecipeModule]],
mode: Optional[Literal["asgi", "wsgi"]],
telemetry: Optional[bool],
debug: Optional[bool],
):
if Supertokens.__instance is None:
Supertokens.__instance = Supertokens(
app_info,
framework,
supertokens_config,
recipe_list,
mode,
telemetry,
debug,
)
PostSTInitCallbacks.run_post_init_callbacks()
@staticmethod
def reset():
if ("SUPERTOKENS_ENV" not in environ) or (
environ["SUPERTOKENS_ENV"] != "testing"
):
raise_general_exception("calling testing function in non testing env")
from supertokens_python.recipe.usermetadata.recipe import UserMetadataRecipe
UserMetadataRecipe.reset()
Querier.reset()
Supertokens.__instance = None
@staticmethod
def get_instance() -> Supertokens:
if Supertokens.__instance is not None:
return Supertokens.__instance
raise_general_exception(
"Initialisation not done. Did you forget to call the SuperTokens.init function?"
)
def get_all_cors_headers(self) -> List[str]:
headers_set: Set[str] = set()
headers_set.add(RID_KEY_HEADER)
headers_set.add(FDI_KEY_HEADER)
for recipe in self.recipe_modules:
headers = recipe.get_all_cors_headers()
for header in headers:
headers_set.add(header)
return list(headers_set)
async def get_user_count(
self,
include_recipe_ids: Union[None, List[str]],
tenant_id: Optional[str] = None,
user_context: Optional[Dict[str, Any]] = None,
) -> int:
querier = Querier.get_instance(None)
include_recipe_ids_str = None
if include_recipe_ids is not None:
include_recipe_ids_str = ",".join(include_recipe_ids)
response = await querier.send_get_request(
NormalisedURLPath(f"/{tenant_id or 'public'}{USER_COUNT}"),
{
"includeRecipeIds": include_recipe_ids_str,
"includeAllTenants": tenant_id is None,
},
user_context=user_context,
)
return int(response["count"])
async def create_user_id_mapping(
self,
supertokens_user_id: str,
external_user_id: str,
external_user_id_info: Optional[str],
force: Optional[bool],
user_context: Optional[Dict[str, Any]],
) -> Union[
CreateUserIdMappingOkResult,
UnknownSupertokensUserIDError,
UserIdMappingAlreadyExistsError,
]:
querier = Querier.get_instance(None)
cdi_version = await querier.get_api_version(user_context)
if is_version_gte(cdi_version, "2.15"):
body: Dict[str, Any] = {
"superTokensUserId": supertokens_user_id,
"externalUserId": external_user_id,
"externalUserIdInfo": external_user_id_info,
}
if force:
body["force"] = force
res = await querier.send_post_request(
NormalisedURLPath("/recipe/userid/map"), body, user_context=user_context
)
if res["status"] == "OK":
return CreateUserIdMappingOkResult()
if res["status"] == "UNKNOWN_SUPERTOKENS_USER_ID_ERROR":
return UnknownSupertokensUserIDError()
if res["status"] == "USER_ID_MAPPING_ALREADY_EXISTS_ERROR":
return UserIdMappingAlreadyExistsError(
does_super_tokens_user_id_exist=res["doesSuperTokensUserIdExist"],
does_external_user_id_exist=res["does_external_user_id_exist"],
)
raise_general_exception("Unknown response")
raise_general_exception("Please upgrade the SuperTokens core to >= 3.15.0")
async def get_user_id_mapping(
self,
user_id: str,
user_id_type: Optional[UserIDTypes],
user_context: Optional[Dict[str, Any]],
) -> Union[GetUserIdMappingOkResult, UnknownMappingError]:
querier = Querier.get_instance(None)
cdi_version = await querier.get_api_version(user_context)
if is_version_gte(cdi_version, "2.15"):
body = {
"userId": user_id,
}
if user_id_type:
body["userIdType"] = user_id_type
res = await querier.send_get_request(
NormalisedURLPath("/recipe/userid/map"),
body,
user_context=user_context,
)
if res["status"] == "OK":
return GetUserIdMappingOkResult(
supertokens_user_id=res["superTokensUserId"],
external_user_id=res["externalUserId"],
external_user_info=res.get("externalUserIdInfo"),
)
if res["status"] == "UNKNOWN_MAPPING_ERROR":
return UnknownMappingError()
raise_general_exception("Unknown response")
raise_general_exception("Please upgrade the SuperTokens core to >= 3.15.0")
async def delete_user_id_mapping(
self,
user_id: str,
user_id_type: Optional[UserIDTypes],
force: Optional[bool],
user_context: Optional[Dict[str, Any]],
) -> DeleteUserIdMappingOkResult:
querier = Querier.get_instance(None)
cdi_version = await querier.get_api_version(user_context)
if is_version_gte(cdi_version, "2.15"):
body: Dict[str, Any] = {
"userId": user_id,
"userIdType": user_id_type,
}
if force:
body["force"] = force
res = await querier.send_post_request(
NormalisedURLPath("/recipe/userid/map/remove"),
body,
user_context=user_context,
)
if res["status"] == "OK":
return DeleteUserIdMappingOkResult(
did_mapping_exist=res["didMappingExist"]
)
raise_general_exception("Unknown response")
raise_general_exception("Please upgrade the SuperTokens core to >= 3.15.0")
async def update_or_delete_user_id_mapping_info(
self,
user_id: str,
user_id_type: Optional[UserIDTypes],
external_user_id_info: Optional[str],
user_context: Optional[Dict[str, Any]],
) -> Union[UpdateOrDeleteUserIdMappingInfoOkResult, UnknownMappingError]:
querier = Querier.get_instance(None)
cdi_version = await querier.get_api_version(user_context)
if is_version_gte(cdi_version, "2.15"):
res = await querier.send_post_request(
NormalisedURLPath("/recipe/userid/external-user-id-info"),
{
"userId": user_id,
"userIdType": user_id_type,
"externalUserIdInfo": external_user_id_info,
},
user_context=user_context,
)
if res["status"] == "OK":
return UpdateOrDeleteUserIdMappingInfoOkResult()
if res["status"] == "UNKNOWN_MAPPING_ERROR":
return UnknownMappingError()
raise_general_exception("Unknown response")
raise_general_exception("Please upgrade the SuperTokens core to >= 3.15.0")
async def middleware(
self, request: BaseRequest, response: BaseResponse, user_context: Dict[str, Any]
) -> Union[BaseResponse, None]:
log_debug_message("middleware: Started")
path = Supertokens.get_instance().app_info.api_gateway_path.append(
NormalisedURLPath(request.get_path())
)
method = normalise_http_method(request.method())
if not path.startswith(Supertokens.get_instance().app_info.api_base_path):
log_debug_message(
"middleware: Not handling because request path did not start with api base path. Request path: %s",
path.get_as_string_dangerous(),
)
return None
request_rid = get_rid_from_header(request)
log_debug_message(
"middleware: requestRID is: %s", get_maybe_none_as_str(request_rid)
)
if request_rid is not None and request_rid == "anti-csrf":
# see https://github.com/supertokens/supertokens-python/issues/54
request_rid = None
async def handle_without_rid():
for recipe in self.recipe_modules:
log_debug_message(
"middleware: Checking recipe ID for match: %s with path: %s and method: %s",
recipe.get_recipe_id(),
path.get_as_string_dangerous(),
method,
)
api_and_tenant_id = await recipe.return_api_id_if_can_handle_request(
path, method, user_context
)
if api_and_tenant_id is not None:
log_debug_message(
"middleware: Request being handled by recipe. ID is: %s",
api_and_tenant_id.api_id,
)
api_resp = await recipe.handle_api_request(
api_and_tenant_id.api_id,
api_and_tenant_id.tenant_id,
request,
path,
method,
response,
user_context,
)
if api_resp is None:
log_debug_message(
"middleware: Not handled because API returned None"
)
return None
log_debug_message("middleware: Ended")
return api_resp
log_debug_message("middleware: Not handling because no recipe matched")
return None
if request_rid is not None:
matched_recipes = [
recipe
for recipe in self.recipe_modules
if recipe.get_recipe_id() == request_rid
or (
request_rid == "thirdpartyemailpassword"
and recipe.get_recipe_id() in ["thirdparty", "emailpassword"]
)
or (
request_rid == "thirdpartypasswordless"
and recipe.get_recipe_id() in ["thirdparty", "passwordless"]
)
]
if len(matched_recipes) == 0:
log_debug_message(
"middleware: Not handling based on rid match. Trying without rid."
)
return await handle_without_rid()
for recipe in matched_recipes:
log_debug_message(
"middleware: Matched with recipe Ids: %s", recipe.get_recipe_id()
)
id_result = None
final_matched_recipe = None
for recipe in matched_recipes:
current_id_result = await recipe.return_api_id_if_can_handle_request(
path, method, user_context
)
if current_id_result is not None:
if id_result is not None:
raise ValueError(
"Two recipes have matched the same API path and method! This is a bug in the SDK. Please contact support."
)
final_matched_recipe = recipe
id_result = current_id_result
if id_result is None or final_matched_recipe is None:
return await handle_without_rid()
log_debug_message(
"middleware: Request being handled by recipe. ID is: %s",
id_result.api_id,
)
request_handled = await final_matched_recipe.handle_api_request(
id_result.api_id,
id_result.tenant_id,
request,
path,
method,
response,
user_context,
)
if request_handled is None:
log_debug_message(
"middleware: Not handled because API returned request_handled as None"
)
return None
log_debug_message("middleware: Ended")
return request_handled
return await handle_without_rid()
async def handle_supertokens_error(
self,
request: BaseRequest,
err: Exception,
response: BaseResponse,
user_context: Dict[str, Any],
) -> Optional[BaseResponse]:
log_debug_message("errorHandler: Started")
log_debug_message(
"errorHandler: Error is from SuperTokens recipe. Message: %s", str(err)
)
if isinstance(err, GeneralError):
raise err
if isinstance(err, BadInputError):
log_debug_message("errorHandler: Sending 400 status code response")
return send_non_200_response_with_message(str(err), 400, response)
for recipe in self.recipe_modules:
log_debug_message(
"errorHandler: Checking recipe for match: %s", recipe.get_recipe_id()
)
if recipe.is_error_from_this_recipe_based_on_instance(err) and isinstance(
err, SuperTokensError
):
log_debug_message(
"errorHandler: Matched with recipeID: %s", recipe.get_recipe_id()
)
return await recipe.handle_error(request, err, response, user_context)
raise err
def get_request_from_user_context(
self,
user_context: Optional[Dict[str, Any]] = None,
) -> Optional[BaseRequest]:
if user_context is None:
return None
if "_default" not in user_context:
return None
if not isinstance(user_context["_default"], dict):
return None
return user_context.get("_default", {}).get("request")
def get_request_from_user_context(
user_context: Optional[Dict[str, Any]],
) -> Optional[BaseRequest]:
return Supertokens.get_instance().get_request_from_user_context(user_context)
Functions
def get_request_from_user_context(user_context: Optional[Dict[str, Any]])
def manage_session_post_response(session: SessionContainer, response: BaseResponse, user_context: Dict[str, Any])
Classes
class AppInfo (app_name: str, api_domain: str, website_domain: Optional[str], framework: "Literal[('fastapi', 'flask', 'django')]", api_gateway_path: str, api_base_path: str, website_base_path: str, mode: "Union[Literal[('asgi', 'wsgi')], None]", origin: Optional[Union[str, Callable[[Optional[BaseRequest], Dict[str, Any]], str]]])
-
Expand source code
class AppInfo: def __init__( self, app_name: str, api_domain: str, website_domain: Optional[str], framework: Literal["fastapi", "flask", "django"], api_gateway_path: str, api_base_path: str, website_base_path: str, mode: Union[Literal["asgi", "wsgi"], None], origin: Optional[ Union[str, Callable[[Optional[BaseRequest], Dict[str, Any]], str]] ], ): self.app_name = app_name self.api_gateway_path = NormalisedURLPath(api_gateway_path) self.api_domain = NormalisedURLDomain(api_domain) self.top_level_api_domain = get_top_level_domain_for_same_site_resolution( self.api_domain.get_as_string_dangerous() ) if website_domain is None and origin is None: raise_general_exception( "Please provide at least one of website_domain or origin" ) self.__origin = origin self.__website_domain = website_domain self.api_base_path = self.api_gateway_path.append( NormalisedURLPath(api_base_path) ) self.website_base_path = NormalisedURLPath(website_base_path) if mode is not None: self.mode = mode elif framework == "fastapi": mode = "asgi" else: mode = "wsgi" self.framework = framework self.mode = mode def get_top_level_website_domain( self, request: Optional[BaseRequest], user_context: Dict[str, Any] ) -> str: return get_top_level_domain_for_same_site_resolution( self.get_origin(request, user_context).get_as_string_dangerous() ) def get_origin(self, request: Optional[BaseRequest], user_context: Dict[str, Any]): origin = self.__origin if origin is None: origin = self.__website_domain # This should not be possible because we check for either origin or websiteDomain above if origin is None: raise_general_exception("should never come here") if callable(origin): origin = origin(request, user_context) return NormalisedURLDomain(origin) def toJSON(self): def defaultImpl(o: Any): if isinstance(o, (NormalisedURLDomain, NormalisedURLPath)): return o.get_as_string_dangerous() return o.__dict__ return json.dumps(self, default=defaultImpl, sort_keys=True, indent=4)
Methods
def get_origin(self, request: Optional[BaseRequest], user_context: Dict[str, Any])
def get_top_level_website_domain(self, request: Optional[BaseRequest], user_context: Dict[str, Any])
def toJSON(self)
class Host (domain: NormalisedURLDomain, base_path: NormalisedURLPath)
-
Expand source code
class Host: def __init__(self, domain: NormalisedURLDomain, base_path: NormalisedURLPath): self.domain = domain self.base_path = base_path
class InputAppInfo (app_name: str, api_domain: str, api_gateway_path: str = '', api_base_path: str = '/auth', website_base_path: str = '/auth', website_domain: Optional[str] = None, origin: Optional[Union[str, Callable[[Optional[BaseRequest], Dict[str, Any]], str]]] = None)
-
Expand source code
class InputAppInfo: def __init__( self, app_name: str, api_domain: str, api_gateway_path: str = "", api_base_path: str = "/auth", website_base_path: str = "/auth", website_domain: Optional[str] = None, origin: Optional[ Union[str, Callable[[Optional[BaseRequest], Dict[str, Any]], str]] ] = None, ): self.app_name = app_name self.api_gateway_path = api_gateway_path self.api_domain = api_domain self.website_domain = website_domain self.origin = origin self.api_base_path = api_base_path self.website_base_path = website_base_path
class Supertokens (app_info: InputAppInfo, framework: "Literal[('fastapi', 'flask', 'django')]", supertokens_config: SupertokensConfig, recipe_list: List[Callable[[AppInfo], RecipeModule]], mode: "Optional[Literal[('asgi', 'wsgi')]]", telemetry: Optional[bool], debug: Optional[bool])
-
Expand source code
class Supertokens: __instance = None def __init__( self, app_info: InputAppInfo, framework: Literal["fastapi", "flask", "django"], supertokens_config: SupertokensConfig, recipe_list: List[Callable[[AppInfo], RecipeModule]], mode: Optional[Literal["asgi", "wsgi"]], telemetry: Optional[bool], debug: Optional[bool], ): if not isinstance(app_info, InputAppInfo): # type: ignore raise ValueError("app_info must be an instance of InputAppInfo") self.app_info = AppInfo( app_info.app_name, app_info.api_domain, app_info.website_domain, framework, app_info.api_gateway_path, app_info.api_base_path, app_info.website_base_path, mode, app_info.origin, ) self.supertokens_config = supertokens_config if debug is True: enable_debug_logging() self._telemetry_status: str = "NONE" log_debug_message( "Started SuperTokens with debug logging (supertokens.init called)" ) log_debug_message("app_info: %s", self.app_info.toJSON()) log_debug_message("framework: %s", framework) hosts = list( map( lambda h: Host( NormalisedURLDomain(h.strip()), NormalisedURLPath(h.strip()) ), filter(lambda x: x != "", supertokens_config.connection_uri.split(";")), ) ) Querier.init( hosts, supertokens_config.api_key, supertokens_config.network_interceptor, supertokens_config.disable_core_call_cache, ) if len(recipe_list) == 0: raise_general_exception( "Please provide at least one recipe to the supertokens.init function call" ) multitenancy_found = False totp_found = False user_metadata_found = False multi_factor_auth_found = False def make_recipe(recipe: Callable[[AppInfo], RecipeModule]) -> RecipeModule: nonlocal multitenancy_found, totp_found, user_metadata_found, multi_factor_auth_found recipe_module = recipe(self.app_info) if recipe_module.get_recipe_id() == "multitenancy": multitenancy_found = True elif recipe_module.get_recipe_id() == "usermetadata": user_metadata_found = True elif recipe_module.get_recipe_id() == "multifactorauth": multi_factor_auth_found = True elif recipe_module.get_recipe_id() == "totp": totp_found = True return recipe_module self.recipe_modules: List[RecipeModule] = list(map(make_recipe, recipe_list)) if not multitenancy_found: from supertokens_python.recipe.multitenancy.recipe import MultitenancyRecipe self.recipe_modules.append(MultitenancyRecipe.init()(self.app_info)) if totp_found and not multi_factor_auth_found: raise Exception("Please initialize the MultiFactorAuth recipe to use TOTP.") if not user_metadata_found: from supertokens_python.recipe.usermetadata.recipe import UserMetadataRecipe self.recipe_modules.append(UserMetadataRecipe.init()(self.app_info)) self.telemetry = ( telemetry if telemetry is not None else (environ.get("TEST_MODE") != "testing") ) @staticmethod def init( app_info: InputAppInfo, framework: Literal["fastapi", "flask", "django"], supertokens_config: SupertokensConfig, recipe_list: List[Callable[[AppInfo], RecipeModule]], mode: Optional[Literal["asgi", "wsgi"]], telemetry: Optional[bool], debug: Optional[bool], ): if Supertokens.__instance is None: Supertokens.__instance = Supertokens( app_info, framework, supertokens_config, recipe_list, mode, telemetry, debug, ) PostSTInitCallbacks.run_post_init_callbacks() @staticmethod def reset(): if ("SUPERTOKENS_ENV" not in environ) or ( environ["SUPERTOKENS_ENV"] != "testing" ): raise_general_exception("calling testing function in non testing env") from supertokens_python.recipe.usermetadata.recipe import UserMetadataRecipe UserMetadataRecipe.reset() Querier.reset() Supertokens.__instance = None @staticmethod def get_instance() -> Supertokens: if Supertokens.__instance is not None: return Supertokens.__instance raise_general_exception( "Initialisation not done. Did you forget to call the SuperTokens.init function?" ) def get_all_cors_headers(self) -> List[str]: headers_set: Set[str] = set() headers_set.add(RID_KEY_HEADER) headers_set.add(FDI_KEY_HEADER) for recipe in self.recipe_modules: headers = recipe.get_all_cors_headers() for header in headers: headers_set.add(header) return list(headers_set) async def get_user_count( self, include_recipe_ids: Union[None, List[str]], tenant_id: Optional[str] = None, user_context: Optional[Dict[str, Any]] = None, ) -> int: querier = Querier.get_instance(None) include_recipe_ids_str = None if include_recipe_ids is not None: include_recipe_ids_str = ",".join(include_recipe_ids) response = await querier.send_get_request( NormalisedURLPath(f"/{tenant_id or 'public'}{USER_COUNT}"), { "includeRecipeIds": include_recipe_ids_str, "includeAllTenants": tenant_id is None, }, user_context=user_context, ) return int(response["count"]) async def create_user_id_mapping( self, supertokens_user_id: str, external_user_id: str, external_user_id_info: Optional[str], force: Optional[bool], user_context: Optional[Dict[str, Any]], ) -> Union[ CreateUserIdMappingOkResult, UnknownSupertokensUserIDError, UserIdMappingAlreadyExistsError, ]: querier = Querier.get_instance(None) cdi_version = await querier.get_api_version(user_context) if is_version_gte(cdi_version, "2.15"): body: Dict[str, Any] = { "superTokensUserId": supertokens_user_id, "externalUserId": external_user_id, "externalUserIdInfo": external_user_id_info, } if force: body["force"] = force res = await querier.send_post_request( NormalisedURLPath("/recipe/userid/map"), body, user_context=user_context ) if res["status"] == "OK": return CreateUserIdMappingOkResult() if res["status"] == "UNKNOWN_SUPERTOKENS_USER_ID_ERROR": return UnknownSupertokensUserIDError() if res["status"] == "USER_ID_MAPPING_ALREADY_EXISTS_ERROR": return UserIdMappingAlreadyExistsError( does_super_tokens_user_id_exist=res["doesSuperTokensUserIdExist"], does_external_user_id_exist=res["does_external_user_id_exist"], ) raise_general_exception("Unknown response") raise_general_exception("Please upgrade the SuperTokens core to >= 3.15.0") async def get_user_id_mapping( self, user_id: str, user_id_type: Optional[UserIDTypes], user_context: Optional[Dict[str, Any]], ) -> Union[GetUserIdMappingOkResult, UnknownMappingError]: querier = Querier.get_instance(None) cdi_version = await querier.get_api_version(user_context) if is_version_gte(cdi_version, "2.15"): body = { "userId": user_id, } if user_id_type: body["userIdType"] = user_id_type res = await querier.send_get_request( NormalisedURLPath("/recipe/userid/map"), body, user_context=user_context, ) if res["status"] == "OK": return GetUserIdMappingOkResult( supertokens_user_id=res["superTokensUserId"], external_user_id=res["externalUserId"], external_user_info=res.get("externalUserIdInfo"), ) if res["status"] == "UNKNOWN_MAPPING_ERROR": return UnknownMappingError() raise_general_exception("Unknown response") raise_general_exception("Please upgrade the SuperTokens core to >= 3.15.0") async def delete_user_id_mapping( self, user_id: str, user_id_type: Optional[UserIDTypes], force: Optional[bool], user_context: Optional[Dict[str, Any]], ) -> DeleteUserIdMappingOkResult: querier = Querier.get_instance(None) cdi_version = await querier.get_api_version(user_context) if is_version_gte(cdi_version, "2.15"): body: Dict[str, Any] = { "userId": user_id, "userIdType": user_id_type, } if force: body["force"] = force res = await querier.send_post_request( NormalisedURLPath("/recipe/userid/map/remove"), body, user_context=user_context, ) if res["status"] == "OK": return DeleteUserIdMappingOkResult( did_mapping_exist=res["didMappingExist"] ) raise_general_exception("Unknown response") raise_general_exception("Please upgrade the SuperTokens core to >= 3.15.0") async def update_or_delete_user_id_mapping_info( self, user_id: str, user_id_type: Optional[UserIDTypes], external_user_id_info: Optional[str], user_context: Optional[Dict[str, Any]], ) -> Union[UpdateOrDeleteUserIdMappingInfoOkResult, UnknownMappingError]: querier = Querier.get_instance(None) cdi_version = await querier.get_api_version(user_context) if is_version_gte(cdi_version, "2.15"): res = await querier.send_post_request( NormalisedURLPath("/recipe/userid/external-user-id-info"), { "userId": user_id, "userIdType": user_id_type, "externalUserIdInfo": external_user_id_info, }, user_context=user_context, ) if res["status"] == "OK": return UpdateOrDeleteUserIdMappingInfoOkResult() if res["status"] == "UNKNOWN_MAPPING_ERROR": return UnknownMappingError() raise_general_exception("Unknown response") raise_general_exception("Please upgrade the SuperTokens core to >= 3.15.0") async def middleware( self, request: BaseRequest, response: BaseResponse, user_context: Dict[str, Any] ) -> Union[BaseResponse, None]: log_debug_message("middleware: Started") path = Supertokens.get_instance().app_info.api_gateway_path.append( NormalisedURLPath(request.get_path()) ) method = normalise_http_method(request.method()) if not path.startswith(Supertokens.get_instance().app_info.api_base_path): log_debug_message( "middleware: Not handling because request path did not start with api base path. Request path: %s", path.get_as_string_dangerous(), ) return None request_rid = get_rid_from_header(request) log_debug_message( "middleware: requestRID is: %s", get_maybe_none_as_str(request_rid) ) if request_rid is not None and request_rid == "anti-csrf": # see https://github.com/supertokens/supertokens-python/issues/54 request_rid = None async def handle_without_rid(): for recipe in self.recipe_modules: log_debug_message( "middleware: Checking recipe ID for match: %s with path: %s and method: %s", recipe.get_recipe_id(), path.get_as_string_dangerous(), method, ) api_and_tenant_id = await recipe.return_api_id_if_can_handle_request( path, method, user_context ) if api_and_tenant_id is not None: log_debug_message( "middleware: Request being handled by recipe. ID is: %s", api_and_tenant_id.api_id, ) api_resp = await recipe.handle_api_request( api_and_tenant_id.api_id, api_and_tenant_id.tenant_id, request, path, method, response, user_context, ) if api_resp is None: log_debug_message( "middleware: Not handled because API returned None" ) return None log_debug_message("middleware: Ended") return api_resp log_debug_message("middleware: Not handling because no recipe matched") return None if request_rid is not None: matched_recipes = [ recipe for recipe in self.recipe_modules if recipe.get_recipe_id() == request_rid or ( request_rid == "thirdpartyemailpassword" and recipe.get_recipe_id() in ["thirdparty", "emailpassword"] ) or ( request_rid == "thirdpartypasswordless" and recipe.get_recipe_id() in ["thirdparty", "passwordless"] ) ] if len(matched_recipes) == 0: log_debug_message( "middleware: Not handling based on rid match. Trying without rid." ) return await handle_without_rid() for recipe in matched_recipes: log_debug_message( "middleware: Matched with recipe Ids: %s", recipe.get_recipe_id() ) id_result = None final_matched_recipe = None for recipe in matched_recipes: current_id_result = await recipe.return_api_id_if_can_handle_request( path, method, user_context ) if current_id_result is not None: if id_result is not None: raise ValueError( "Two recipes have matched the same API path and method! This is a bug in the SDK. Please contact support." ) final_matched_recipe = recipe id_result = current_id_result if id_result is None or final_matched_recipe is None: return await handle_without_rid() log_debug_message( "middleware: Request being handled by recipe. ID is: %s", id_result.api_id, ) request_handled = await final_matched_recipe.handle_api_request( id_result.api_id, id_result.tenant_id, request, path, method, response, user_context, ) if request_handled is None: log_debug_message( "middleware: Not handled because API returned request_handled as None" ) return None log_debug_message("middleware: Ended") return request_handled return await handle_without_rid() async def handle_supertokens_error( self, request: BaseRequest, err: Exception, response: BaseResponse, user_context: Dict[str, Any], ) -> Optional[BaseResponse]: log_debug_message("errorHandler: Started") log_debug_message( "errorHandler: Error is from SuperTokens recipe. Message: %s", str(err) ) if isinstance(err, GeneralError): raise err if isinstance(err, BadInputError): log_debug_message("errorHandler: Sending 400 status code response") return send_non_200_response_with_message(str(err), 400, response) for recipe in self.recipe_modules: log_debug_message( "errorHandler: Checking recipe for match: %s", recipe.get_recipe_id() ) if recipe.is_error_from_this_recipe_based_on_instance(err) and isinstance( err, SuperTokensError ): log_debug_message( "errorHandler: Matched with recipeID: %s", recipe.get_recipe_id() ) return await recipe.handle_error(request, err, response, user_context) raise err def get_request_from_user_context( self, user_context: Optional[Dict[str, Any]] = None, ) -> Optional[BaseRequest]: if user_context is None: return None if "_default" not in user_context: return None if not isinstance(user_context["_default"], dict): return None return user_context.get("_default", {}).get("request")
Static methods
def get_instance() ‑> Supertokens
def init(app_info: InputAppInfo, framework: "Literal[('fastapi', 'flask', 'django')]", supertokens_config: SupertokensConfig, recipe_list: List[Callable[[AppInfo], RecipeModule]], mode: "Optional[Literal[('asgi', 'wsgi')]]", telemetry: Optional[bool], debug: Optional[bool])
def reset()
Methods
async def create_user_id_mapping(self, supertokens_user_id: str, external_user_id: str, external_user_id_info: Optional[str], force: Optional[bool], user_context: Optional[Dict[str, Any]]) ‑> Union[CreateUserIdMappingOkResult, UnknownSupertokensUserIDError, UserIdMappingAlreadyExistsError]
async def delete_user_id_mapping(self, user_id: str, user_id_type: Optional[UserIDTypes], force: Optional[bool], user_context: Optional[Dict[str, Any]]) ‑> DeleteUserIdMappingOkResult
def get_all_cors_headers(self) ‑> List[str]
def get_request_from_user_context(self, user_context: Optional[Dict[str, Any]] = None)
async def get_user_count(self, include_recipe_ids: Union[None, List[str]], tenant_id: Optional[str] = None, user_context: Optional[Dict[str, Any]] = None) ‑> int
async def get_user_id_mapping(self, user_id: str, user_id_type: Optional[UserIDTypes], user_context: Optional[Dict[str, Any]]) ‑> Union[GetUserIdMappingOkResult, UnknownMappingError]
async def handle_supertokens_error(self, request: BaseRequest, err: Exception, response: BaseResponse, user_context: Dict[str, Any])
async def middleware(self, request: BaseRequest, response: BaseResponse, user_context: Dict[str, Any])
async def update_or_delete_user_id_mapping_info(self, user_id: str, user_id_type: Optional[UserIDTypes], external_user_id_info: Optional[str], user_context: Optional[Dict[str, Any]]) ‑> Union[UpdateOrDeleteUserIdMappingInfoOkResult, UnknownMappingError]
class SupertokensConfig (connection_uri: str, api_key: Union[str, None] = None, network_interceptor: Optional[Callable[[str, str, Dict[str, Any], Optional[Dict[str, Any]], Optional[Dict[str, Any]], Optional[Dict[str, Any]]], Tuple[str, str, Dict[str, Any], Optional[Dict[str, Any]], Optional[Dict[str, Any]]]]] = None, disable_core_call_cache: bool = False)
-
Expand source code
class SupertokensConfig: def __init__( self, connection_uri: str, api_key: Union[str, None] = None, network_interceptor: Optional[ Callable[ [ str, str, Dict[str, Any], Optional[Dict[str, Any]], Optional[Dict[str, Any]], Optional[Dict[str, Any]], ], Tuple[ str, str, Dict[str, Any], Optional[Dict[str, Any]], Optional[Dict[str, Any]], ], ] ] = None, disable_core_call_cache: bool = False, ): # We keep this = None here because this is directly used by the user. self.connection_uri = connection_uri self.api_key = api_key self.network_interceptor = network_interceptor self.disable_core_call_cache = disable_core_call_cache