Module supertokens_python.auth_utils
Expand source code
from typing import Awaitable, Callable, Dict, Any, Optional, Union, List
from typing_extensions import Literal
from supertokens_python.framework import BaseRequest
from supertokens_python.recipe.accountlinking import (
AccountInfoWithRecipeIdAndUserId,
ShouldAutomaticallyLink,
ShouldNotAutomaticallyLink,
)
from supertokens_python.recipe.accountlinking.recipe import AccountLinkingRecipe
from supertokens_python.recipe.accountlinking.types import AccountInfoWithRecipeId
from supertokens_python.recipe.accountlinking.utils import (
recipe_init_defined_should_do_automatic_account_linking,
)
from supertokens_python.recipe.multifactorauth.asyncio import (
mark_factor_as_complete_in_session,
)
from supertokens_python.recipe.multifactorauth.recipe import MultiFactorAuthRecipe
from supertokens_python.recipe.multifactorauth.utils import (
is_valid_first_factor,
update_and_get_mfa_related_info_in_session,
)
from supertokens_python.recipe.multitenancy.asyncio import associate_user_to_tenant
from supertokens_python.recipe.session.interfaces import SessionContainer
from supertokens_python.recipe.session.asyncio import create_new_session, get_session
from supertokens_python.recipe.thirdparty.types import ThirdPartyInfo
from supertokens_python.types import (
AccountInfo,
User,
LoginMethod,
)
from supertokens_python.types import (
RecipeUserId,
)
from supertokens_python.recipe.session.exceptions import UnauthorisedError
from supertokens_python.recipe.emailverification import (
EmailVerificationClaim,
)
from supertokens_python.exceptions import BadInputError, raise_bad_input_exception
from supertokens_python.utils import log_debug_message
from .asyncio import get_user
class LinkingToSessionUserFailedError:
status: Literal["LINKING_TO_SESSION_USER_FAILED"] = "LINKING_TO_SESSION_USER_FAILED"
reason: Literal[
"EMAIL_VERIFICATION_REQUIRED",
"RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR",
"ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR",
"SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR",
"INPUT_USER_IS_NOT_A_PRIMARY_USER",
]
def __init__(
self,
reason: Literal[
"EMAIL_VERIFICATION_REQUIRED",
"RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR",
"ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR",
"SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR",
"INPUT_USER_IS_NOT_A_PRIMARY_USER",
],
):
self.reason = reason
class OkResponse:
status: Literal["OK"]
valid_factor_ids: List[str]
is_first_factor: bool
def __init__(self, valid_factor_ids: List[str], is_first_factor: bool):
self.status = "OK"
self.valid_factor_ids = valid_factor_ids
self.is_first_factor = is_first_factor
class SignUpNotAllowedResponse:
status: Literal["SIGN_UP_NOT_ALLOWED"] = "SIGN_UP_NOT_ALLOWED"
class SignInNotAllowedResponse:
status: Literal["SIGN_IN_NOT_ALLOWED"] = "SIGN_IN_NOT_ALLOWED"
async def pre_auth_checks(
authenticating_account_info: AccountInfoWithRecipeId,
authenticating_user: Union[User, None],
tenant_id: str,
factor_ids: List[str],
is_sign_up: bool,
is_verified: bool,
sign_in_verifies_login_method: bool,
skip_session_user_update_in_core: bool,
session: Union[SessionContainer, None],
should_try_linking_with_session_user: Union[bool, None],
user_context: Dict[str, Any],
) -> Union[
OkResponse,
SignUpNotAllowedResponse,
SignInNotAllowedResponse,
LinkingToSessionUserFailedError,
]:
valid_factor_ids: List[str] = []
if len(factor_ids) == 0:
raise Exception(
"This should never happen: empty factorIds array passed to preSignInChecks"
)
log_debug_message("preAuthChecks checking auth types")
auth_type_info = await check_auth_type_and_linking_status(
session,
should_try_linking_with_session_user,
authenticating_account_info,
authenticating_user,
skip_session_user_update_in_core,
user_context,
)
if auth_type_info.status != "OK":
log_debug_message(
f"preAuthChecks returning {auth_type_info.status} from checkAuthType results"
)
return auth_type_info
if auth_type_info.is_first_factor:
log_debug_message("preAuthChecks getting valid first factors")
valid_first_factors = (
await filter_out_invalid_first_factors_or_throw_if_all_are_invalid(
factor_ids, tenant_id, session is not None, user_context
)
)
valid_factor_ids = valid_first_factors
else:
assert isinstance(
auth_type_info,
(OkSecondFactorNotLinkedResponse, OkSecondFactorLinkedResponse),
)
assert session is not None
log_debug_message("preAuthChecks getting valid secondary factors")
valid_factor_ids = (
await filter_out_invalid_second_factors_or_throw_if_all_are_invalid(
factor_ids,
auth_type_info.input_user_already_linked_to_session_user,
auth_type_info.session_user,
session,
user_context,
)
)
if not is_sign_up and authenticating_user is None:
raise Exception(
"This should never happen: preAuthChecks called with isSignUp: false, authenticatingUser: None"
)
if is_sign_up:
verified_in_session_user = not isinstance(
auth_type_info, OkFirstFactorResponse
) and any(
lm.verified
and (
lm.has_same_email_as(authenticating_account_info.email)
or lm.has_same_phone_number_as(authenticating_account_info.phone_number)
)
for lm in auth_type_info.session_user.login_methods
)
log_debug_message("preAuthChecks checking if the user is allowed to sign up")
if not await AccountLinkingRecipe.get_instance().is_sign_up_allowed(
new_user=authenticating_account_info,
is_verified=is_verified
or sign_in_verifies_login_method
or verified_in_session_user,
tenant_id=tenant_id,
session=session,
user_context=user_context,
):
return SignUpNotAllowedResponse()
elif authenticating_user is not None:
log_debug_message("preAuthChecks checking if the user is allowed to sign in")
if not await AccountLinkingRecipe.get_instance().is_sign_in_allowed(
user=authenticating_user,
account_info=authenticating_account_info,
sign_in_verifies_login_method=sign_in_verifies_login_method,
tenant_id=tenant_id,
session=session,
user_context=user_context,
):
return SignInNotAllowedResponse()
log_debug_message("preAuthChecks returning OK")
return OkResponse(
valid_factor_ids=valid_factor_ids,
is_first_factor=auth_type_info.is_first_factor,
)
class PostAuthChecksOkResponse:
status: Literal["OK"]
session: SessionContainer
user: User
def __init__(self, status: Literal["OK"], session: SessionContainer, user: User):
self.status = status
self.session = session
self.user = user
class PostAuthChecksSignInNotAllowedResponse:
status: Literal["SIGN_IN_NOT_ALLOWED"]
async def post_auth_checks(
authenticated_user: User,
recipe_user_id: RecipeUserId,
is_sign_up: bool,
factor_id: str,
session: Union[SessionContainer, None],
tenant_id: str,
user_context: Dict[str, Any],
request: BaseRequest,
) -> Union[PostAuthChecksOkResponse, PostAuthChecksSignInNotAllowedResponse]:
log_debug_message(
f"postAuthChecks called {'with' if session is not None else 'without'} a session to "
f"{'sign up' if is_sign_up else 'sign in'} with {factor_id}"
)
mfa_instance = MultiFactorAuthRecipe.get_instance()
resp_session = session
if session is not None:
authenticated_user_linked_to_session_user = any(
lm.recipe_user_id.get_as_string()
== session.get_recipe_user_id(user_context).get_as_string()
for lm in authenticated_user.login_methods
)
if authenticated_user_linked_to_session_user:
log_debug_message("postAuthChecks session and input user got linked")
if mfa_instance is not None:
log_debug_message("postAuthChecks marking factor as completed")
# if the authenticating user is linked to the current session user (it means that the factor got set up or completed),
# we mark it as completed in the session.
assert resp_session is not None
await mark_factor_as_complete_in_session(
resp_session, factor_id, user_context
)
else:
log_debug_message("postAuthChecks checking overwriteSessionDuringSignInUp")
# If the new user wasn't linked to the current one, we check the config and overwrite the session if required
# Note: we could also get here if MFA is enabled, but the app didn't want to link the user to the session user.
# This is intentional, since the MFA and overwriteSessionDuringSignInUp configs should work independently.
resp_session = await create_new_session(
request, tenant_id, recipe_user_id, {}, {}, user_context
)
if mfa_instance is not None:
await mark_factor_as_complete_in_session(
resp_session, factor_id, user_context
)
else:
log_debug_message("postAuthChecks creating session for first factor sign in/up")
# If there is no input session, we do not need to do anything other checks and create a new session
resp_session = await create_new_session(
request, tenant_id, recipe_user_id, {}, {}, user_context
)
# Here we can always mark the factor as completed, since we just created the session
if mfa_instance is not None:
await mark_factor_as_complete_in_session(
resp_session, factor_id, user_context
)
assert resp_session is not None
return PostAuthChecksOkResponse(
status="OK", session=resp_session, user=authenticated_user
)
class AuthenticatingUserInfo:
def __init__(self, user: User, login_method: Union[LoginMethod, None]):
self.user = user
self.login_method = login_method
async def get_authenticating_user_and_add_to_current_tenant_if_required(
recipe_id: str,
email: Optional[str],
phone_number: Optional[str],
third_party: Optional[ThirdPartyInfo],
tenant_id: str,
session: Optional[SessionContainer],
check_credentials_on_tenant: Callable[[str], Awaitable[bool]],
user_context: Dict[str, Any],
) -> Optional[AuthenticatingUserInfo]:
i = 0
while i < 300:
account_info = {
"email": email,
"phoneNumber": phone_number,
"thirdParty": third_party,
}
log_debug_message(
f"getAuthenticatingUserAndAddToCurrentTenantIfRequired called with {account_info}"
)
existing_users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info(
tenant_id=tenant_id,
account_info=AccountInfo(
email=email, phone_number=phone_number, third_party=third_party
),
do_union_of_account_info=True,
user_context=user_context,
)
log_debug_message(
f"getAuthenticatingUserAndAddToCurrentTenantIfRequired got {len(existing_users)} users from the core resp"
)
users_with_matching_login_methods = [
AuthenticatingUserInfo(
user=user,
login_method=next(
(
lm
for lm in user.login_methods
if lm.recipe_id == recipe_id
and (
(email is not None and lm.has_same_email_as(email))
or lm.has_same_phone_number_as(phone_number)
or lm.has_same_third_party_info_as(third_party)
)
),
None,
),
)
for user in existing_users
]
users_with_matching_login_methods = [
u for u in users_with_matching_login_methods if u.login_method is not None
]
log_debug_message(
f"getAuthenticatingUserAndAddToCurrentTenantIfRequired got {len(users_with_matching_login_methods)} users with matching login methods"
)
if len(users_with_matching_login_methods) > 1:
raise Exception(
"You have found a bug. Please report it on https://github.com/supertokens/supertokens-node/issues"
)
authenticating_user = (
AuthenticatingUserInfo(
users_with_matching_login_methods[0].user,
users_with_matching_login_methods[0].login_method,
)
if users_with_matching_login_methods
else None
)
if authenticating_user is None and session is not None:
log_debug_message(
"getAuthenticatingUserAndAddToCurrentTenantIfRequired checking session user"
)
session_user = await get_user(
session.get_user_id(user_context), user_context
)
if session_user is None:
raise UnauthorisedError(
"Session user not found",
)
if not session_user.is_primary_user:
log_debug_message(
"getAuthenticatingUserAndAddToCurrentTenantIfRequired session user is non-primary so returning early without checking other tenants"
)
return None
matching_login_methods_from_session_user = [
lm
for lm in session_user.login_methods
if lm.recipe_id == recipe_id
and (
lm.has_same_email_as(email)
or lm.has_same_phone_number_as(phone_number)
or lm.has_same_third_party_info_as(third_party)
)
]
log_debug_message(
f"getAuthenticatingUserAndAddToCurrentTenantIfRequired session has {len(matching_login_methods_from_session_user)} matching login methods"
)
if any(
tenant_id in lm.tenant_ids
for lm in matching_login_methods_from_session_user
):
log_debug_message(
f"getAuthenticatingUserAndAddToCurrentTenantIfRequired session has {len(matching_login_methods_from_session_user)} matching login methods"
)
return AuthenticatingUserInfo(
user=session_user,
login_method=next(
lm
for lm in matching_login_methods_from_session_user
if tenant_id in lm.tenant_ids
),
)
go_to_retry = False
for lm in matching_login_methods_from_session_user:
log_debug_message(
f"getAuthenticatingUserAndAddToCurrentTenantIfRequired session checking credentials on {lm.tenant_ids[0]}"
)
if await check_credentials_on_tenant(lm.tenant_ids[0]):
log_debug_message(
f"getAuthenticatingUserAndAddToCurrentTenantIfRequired associating user from {lm.tenant_ids[0]} with current tenant"
)
associate_res = await associate_user_to_tenant(
tenant_id, lm.recipe_user_id, user_context
)
log_debug_message(
f"getAuthenticatingUserAndAddToCurrentTenantIfRequired associating returned {associate_res.status}"
)
if associate_res.status == "OK":
lm.tenant_ids.append(tenant_id)
return AuthenticatingUserInfo(
user=session_user, login_method=lm
)
if associate_res.status in [
"UNKNOWN_USER_ID_ERROR",
"EMAIL_ALREADY_EXISTS_ERROR",
"PHONE_NUMBER_ALREADY_EXISTS_ERROR",
"THIRD_PARTY_USER_ALREADY_EXISTS_ERROR",
]:
go_to_retry = True
break
if associate_res.status == "ASSOCIATION_NOT_ALLOWED_ERROR":
raise UnauthorisedError(
"Session user not associated with the session tenant"
)
if go_to_retry:
log_debug_message(
"getAuthenticatingUserAndAddToCurrentTenantIfRequired retrying"
)
i += 1
continue
return authenticating_user
raise Exception(
"This should never happen: ran out of retries for getAuthenticatingUserAndAddToCurrentTenantIfRequired"
)
class OkFirstFactorResponse:
status: Literal["OK"] = "OK"
is_first_factor: Literal[True] = True
class OkSecondFactorLinkedResponse:
status: Literal["OK"] = "OK"
is_first_factor: Literal[False] = False
input_user_already_linked_to_session_user: Literal[True] = True
session_user: User
def __init__(self, session_user: User):
self.session_user = session_user
class OkSecondFactorNotLinkedResponse:
status: Literal["OK"] = "OK"
is_first_factor: Literal[False] = False
input_user_already_linked_to_session_user: Literal[False] = False
session_user: User
linking_to_session_user_requires_verification: bool
def __init__(
self,
session_user: User,
linking_to_session_user_requires_verification: bool,
):
self.session_user = session_user
self.linking_to_session_user_requires_verification = (
linking_to_session_user_requires_verification
)
async def check_auth_type_and_linking_status(
session: Union[SessionContainer, None],
should_try_linking_with_session_user: Union[bool, None],
account_info: AccountInfoWithRecipeId,
input_user: Union[User, None],
skip_session_user_update_in_core: bool,
user_context: Dict[str, Any],
) -> Union[
OkFirstFactorResponse,
OkSecondFactorLinkedResponse,
OkSecondFactorNotLinkedResponse,
LinkingToSessionUserFailedError,
]:
log_debug_message("check_auth_type_and_linking_status called")
session_user: Union[User, None] = None
if session is None:
if should_try_linking_with_session_user is True:
raise UnauthorisedError(
"Session not found but shouldTryLinkingWithSessionUser is true"
)
log_debug_message(
"check_auth_type_and_linking_status returning first factor because there is no session"
)
return OkFirstFactorResponse()
else:
if should_try_linking_with_session_user is False:
# In our normal flows this should never happen - but some user overrides might do this.
# Anyway, since should_try_linking_with_session_user explicitly set to false, it's safe to consider this a first factor
log_debug_message(
"check_auth_type_and_linking_status returning first factor because should_try_linking_with_session_user is False"
)
return OkFirstFactorResponse()
if not recipe_init_defined_should_do_automatic_account_linking():
if should_try_linking_with_session_user is True:
raise Exception(
"Please initialise the account linking recipe and define should_do_automatic_account_linking to enable MFA"
)
else:
if MultiFactorAuthRecipe.get_instance() is not None:
raise Exception(
"Please initialise the account linking recipe and define should_do_automatic_account_linking to enable MFA"
)
else:
return OkFirstFactorResponse()
if input_user is not None and input_user.id == session.get_user_id():
log_debug_message(
"check_auth_type_and_linking_status returning secondary factor, session and input user are the same"
)
return OkSecondFactorLinkedResponse(
session_user=input_user,
)
log_debug_message(
f"check_auth_type_and_linking_status loading session user, {input_user.id if input_user else None} === {session.get_user_id()}"
)
session_user_result = await try_and_make_session_user_into_a_primary_user(
session, skip_session_user_update_in_core, user_context
)
if session_user_result.status == "SHOULD_AUTOMATICALLY_LINK_FALSE":
if should_try_linking_with_session_user is True:
raise BadInputError(
"should_do_automatic_account_linking returned false when creating primary user but shouldTryLinkingWithSessionUser is true"
)
return OkFirstFactorResponse()
elif (
session_user_result.status
== "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR"
):
return LinkingToSessionUserFailedError(
reason="SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR"
)
session_user = session_user_result.user
should_link = await AccountLinkingRecipe.get_instance().config.should_do_automatic_account_linking(
AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method(
account_info
),
session_user,
session,
session.get_tenant_id(),
user_context,
)
log_debug_message(
f"check_auth_type_and_linking_status session user <-> input user should_do_automatic_account_linking returned {should_link}"
)
if isinstance(should_link, ShouldNotAutomaticallyLink):
if should_try_linking_with_session_user is True:
raise BadInputError(
"should_do_automatic_account_linking returned false when creating primary user but shouldTryLinkingWithSessionUser is true"
)
return OkFirstFactorResponse()
else:
return OkSecondFactorNotLinkedResponse(
session_user=session_user,
linking_to_session_user_requires_verification=should_link.should_require_verification,
)
class OkResponse2:
status: Literal["OK"]
user: User
def __init__(self, user: User):
self.status = "OK"
self.user: User = user
async def link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
tenant_id: str,
input_user: User,
recipe_user_id: RecipeUserId,
session: Union[SessionContainer, None],
should_try_linking_with_session_user: Union[bool, None],
user_context: Dict[str, Any],
) -> Union[OkResponse2, LinkingToSessionUserFailedError,]:
log_debug_message(
"link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info called"
)
async def retry():
log_debug_message(
"link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info retrying...."
)
return await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
tenant_id=tenant_id,
input_user=input_user,
session=session,
recipe_user_id=recipe_user_id,
should_try_linking_with_session_user=should_try_linking_with_session_user,
user_context=user_context,
)
auth_login_method = next(
(
lm
for lm in input_user.login_methods
if lm.recipe_user_id.get_as_string() == recipe_user_id.get_as_string()
),
None,
)
if auth_login_method is None:
raise Exception(
"This should never happen: the recipe_user_id and user is inconsistent in create_primary_user_id_or_link_by_account_info params"
)
auth_type_res = await check_auth_type_and_linking_status(
session,
should_try_linking_with_session_user,
AccountInfoWithRecipeId(
recipe_id=auth_login_method.recipe_id,
email=auth_login_method.email,
phone_number=auth_login_method.phone_number,
third_party=auth_login_method.third_party,
),
input_user,
False,
user_context,
)
if not isinstance(
auth_type_res,
(
OkFirstFactorResponse,
OkSecondFactorLinkedResponse,
OkSecondFactorNotLinkedResponse,
),
):
return LinkingToSessionUserFailedError(reason=auth_type_res.reason)
if isinstance(auth_type_res, OkFirstFactorResponse):
if not recipe_init_defined_should_do_automatic_account_linking():
log_debug_message(
"link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info skipping link by account info because this is a first factor auth and the app hasn't defined should_do_automatic_account_linking"
)
return OkResponse2(user=input_user)
log_debug_message(
"link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info trying to link by account info because this is a first factor auth"
)
link_res = await AccountLinkingRecipe.get_instance().try_linking_by_account_info_or_create_primary_user(
input_user=input_user,
session=session,
tenant_id=tenant_id,
user_context=user_context,
)
if link_res.status == "OK":
assert link_res.user is not None
return OkResponse2(user=link_res.user)
if link_res.status == "NO_LINK":
return OkResponse2(user=input_user)
return await retry()
if isinstance(auth_type_res, OkSecondFactorLinkedResponse):
return OkResponse2(user=auth_type_res.session_user)
log_debug_message(
"link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info trying to link by session info"
)
session_linking_res = await try_linking_by_session(
session_user=auth_type_res.session_user,
authenticated_user=input_user,
auth_login_method=auth_login_method,
linking_to_session_user_requires_verification=auth_type_res.linking_to_session_user_requires_verification,
user_context=user_context,
)
if isinstance(session_linking_res, LinkingToSessionUserFailedError):
if session_linking_res.reason == "INPUT_USER_IS_NOT_A_PRIMARY_USER":
return await retry()
else:
return session_linking_res
else:
return session_linking_res
class ShouldAutomaticallyLinkFalseResponse:
status: Literal["SHOULD_AUTOMATICALLY_LINK_FALSE"]
def __init__(self):
self.status = "SHOULD_AUTOMATICALLY_LINK_FALSE"
class AccountInfoAlreadyAssociatedResponse:
status: Literal[
"ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR"
]
def __init__(self):
self.status = (
"ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR"
)
async def try_and_make_session_user_into_a_primary_user(
session: SessionContainer,
skip_session_user_update_in_core: bool,
user_context: Dict[str, Any],
) -> Union[
OkResponse2,
ShouldAutomaticallyLinkFalseResponse,
AccountInfoAlreadyAssociatedResponse,
]:
log_debug_message("try_and_make_session_user_into_a_primary_user called")
session_user = await get_user(session.get_user_id(), user_context)
if session_user is None:
raise UnauthorisedError("Session user not found")
if session_user.is_primary_user:
log_debug_message(
"try_and_make_session_user_into_a_primary_user session user already primary"
)
return OkResponse2(user=session_user)
else:
log_debug_message(
"try_and_make_session_user_into_a_primary_user not primary user yet"
)
account_linking_instance = AccountLinkingRecipe.get_instance()
should_do_account_linking = (
await account_linking_instance.config.should_do_automatic_account_linking(
AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method(
session_user.login_methods[0]
),
None,
session,
session.get_tenant_id(),
user_context,
)
)
log_debug_message(
f"try_and_make_session_user_into_a_primary_user should_do_account_linking: {should_do_account_linking}"
)
if isinstance(should_do_account_linking, ShouldAutomaticallyLink):
if skip_session_user_update_in_core:
return OkResponse2(user=session_user)
if (
should_do_account_linking.should_require_verification
and not session_user.login_methods[0].verified
):
if (
await session.get_claim_value(EmailVerificationClaim, user_context)
) is not False:
log_debug_message(
"try_and_make_session_user_into_a_primary_user updating emailverification status in session"
)
await session.set_claim_value(
EmailVerificationClaim, False, user_context
)
log_debug_message(
"try_and_make_session_user_into_a_primary_user throwing validation error"
)
await session.assert_claims(
[EmailVerificationClaim.validators.is_verified()], user_context
)
raise Exception(
"This should never happen: email verification claim validator passed after setting value to false"
)
create_primary_user_res = await account_linking_instance.recipe_implementation.create_primary_user(
recipe_user_id=session_user.login_methods[0].recipe_user_id,
user_context=user_context,
)
log_debug_message(
f"try_and_make_session_user_into_a_primary_user create_primary_user returned {create_primary_user_res.status}"
)
if (
create_primary_user_res.status
== "RECIPE_USER_ID_ALREADY_LINKED_WITH_PRIMARY_USER_ID_ERROR"
):
raise UnauthorisedError("Session user not found")
elif create_primary_user_res.status == "OK":
return OkResponse2(user=create_primary_user_res.user)
else:
return AccountInfoAlreadyAssociatedResponse()
else:
return ShouldAutomaticallyLinkFalseResponse()
async def try_linking_by_session(
linking_to_session_user_requires_verification: bool,
auth_login_method: LoginMethod,
authenticated_user: User,
session_user: User,
user_context: Dict[str, Any],
) -> Union[OkResponse2, LinkingToSessionUserFailedError,]:
log_debug_message("tryLinkingBySession called")
session_user_has_verified_account_info = any(
(
lm.has_same_email_as(auth_login_method.email)
or lm.has_same_phone_number_as(auth_login_method.phone_number)
)
and lm.verified
for lm in session_user.login_methods
)
can_link_based_on_verification = (
not linking_to_session_user_requires_verification
or auth_login_method.verified
or session_user_has_verified_account_info
)
if not can_link_based_on_verification:
return LinkingToSessionUserFailedError(reason="EMAIL_VERIFICATION_REQUIRED")
link_accounts_result = (
await AccountLinkingRecipe.get_instance().recipe_implementation.link_accounts(
recipe_user_id=authenticated_user.login_methods[0].recipe_user_id,
primary_user_id=session_user.id,
user_context=user_context,
)
)
if link_accounts_result.status == "OK":
log_debug_message(
"tryLinkingBySession successfully linked input user to session user"
)
return OkResponse2(user=link_accounts_result.user)
elif (
link_accounts_result.status
== "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR"
):
log_debug_message(
"tryLinkingBySession linking to session user failed because of a race condition - input user linked to another user"
)
return LinkingToSessionUserFailedError(
reason="RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR"
)
elif link_accounts_result.status == "INPUT_USER_IS_NOT_A_PRIMARY_USER":
log_debug_message(
"tryLinkingBySession linking to session user failed because of a race condition - INPUT_USER_IS_NOT_A_PRIMARY_USER, should retry"
)
return LinkingToSessionUserFailedError(
reason="INPUT_USER_IS_NOT_A_PRIMARY_USER"
)
else:
log_debug_message(
"tryLinkingBySession linking to session user failed because of a race condition - input user has another primary user it can be linked to"
)
return LinkingToSessionUserFailedError(
reason="ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR"
)
async def filter_out_invalid_first_factors_or_throw_if_all_are_invalid(
factor_ids: List[str],
tenant_id: str,
has_session: bool,
user_context: Dict[str, Any],
) -> List[str]:
valid_factor_ids: List[str] = []
for _id in factor_ids:
valid_res = await is_valid_first_factor(tenant_id, _id, user_context)
if valid_res == "TENANT_NOT_FOUND_ERROR":
if has_session:
raise UnauthorisedError("Tenant not found")
else:
raise Exception("Tenant not found error.")
elif valid_res == "OK":
valid_factor_ids.append(_id)
if len(valid_factor_ids) == 0:
if not has_session:
raise UnauthorisedError(
"A valid session is required to authenticate with secondary factors"
)
else:
raise_bad_input_exception(
"First factor sign in/up called for a non-first factor with an active session. This might indicate that you are trying to use this as a secondary factor, but disabled account linking."
)
return valid_factor_ids
async def filter_out_invalid_second_factors_or_throw_if_all_are_invalid(
factor_ids: List[str],
input_user_already_linked_to_session_user: bool,
session_user: User,
session: SessionContainer,
user_context: Dict[str, Any],
) -> List[str]:
log_debug_message(
f"filter_out_invalid_second_factors_or_throw_if_all_are_invalid called for {', '.join(factor_ids)}"
)
mfa_instance = MultiFactorAuthRecipe.get_instance()
if mfa_instance is not None:
if not input_user_already_linked_to_session_user:
factors_set_up_for_user_prom: Optional[List[str]] = None
mfa_info_prom = None
async def get_factors_set_up_for_user() -> List[str]:
nonlocal factors_set_up_for_user_prom
if factors_set_up_for_user_prom is None:
factors_set_up_for_user_prom = await mfa_instance.recipe_implementation.get_factors_setup_for_user(
user=session_user, user_context=user_context
)
assert factors_set_up_for_user_prom is not None
return factors_set_up_for_user_prom
async def get_mfa_requirements_for_auth():
nonlocal mfa_info_prom
if mfa_info_prom is None:
mfa_info_prom = await update_and_get_mfa_related_info_in_session(
input_session=session,
user_context=user_context,
)
return mfa_info_prom.mfa_requirements_for_auth
log_debug_message(
"filter_out_invalid_second_factors_or_throw_if_all_are_invalid checking if linking is allowed by the mfa recipe"
)
caught_setup_factor_error: Optional[Exception] = None
valid_factor_ids: List[str] = []
for _id in factor_ids:
log_debug_message(
"filter_out_invalid_second_factors_or_throw_if_all_are_invalid checking assert_allowed_to_setup_factor_else_throw_invalid_claim_error"
)
try:
await mfa_instance.recipe_implementation.assert_allowed_to_setup_factor_else_throw_invalid_claim_error(
factor_id=_id,
session=session,
factors_set_up_for_user=get_factors_set_up_for_user,
mfa_requirements_for_auth=get_mfa_requirements_for_auth,
user_context=user_context,
)
log_debug_message(
f"filter_out_invalid_second_factors_or_throw_if_all_are_invalid {id} valid because assert_allowed_to_setup_factor_else_throw_invalid_claim_error passed"
)
valid_factor_ids.append(_id)
except Exception as err:
log_debug_message(
f"filter_out_invalid_second_factors_or_throw_if_all_are_invalid assert_allowed_to_setup_factor_else_throw_invalid_claim_error failed for {id}"
)
caught_setup_factor_error = err
if len(valid_factor_ids) == 0:
log_debug_message(
"filter_out_invalid_second_factors_or_throw_if_all_are_invalid rethrowing error from assert_allowed_to_setup_factor_else_throw_invalid_claim_error because we found no valid factors"
)
if caught_setup_factor_error is not None:
raise caught_setup_factor_error
else:
raise Exception("Should never come here")
return valid_factor_ids
else:
log_debug_message(
"filter_out_invalid_second_factors_or_throw_if_all_are_invalid allowing all factors because it'll not create new link"
)
return factor_ids
else:
log_debug_message(
"filter_out_invalid_second_factors_or_throw_if_all_are_invalid allowing all factors because MFA is not enabled"
)
return factor_ids
def is_fake_email(email: str) -> bool:
return email.endswith("@stfakeemail.supertokens.com") or email.endswith(
".fakeemail.com"
) # .fakeemail.com for older users
async def load_session_in_auth_api_if_needed(
request: BaseRequest,
should_try_linking_with_session_user: Optional[bool],
user_context: Dict[str, Any],
) -> Optional[SessionContainer]:
if should_try_linking_with_session_user is not False:
return await get_session(
request,
session_required=should_try_linking_with_session_user is True,
override_global_claim_validators=lambda _, __, ___: [],
user_context=user_context,
)
return None
Functions
async def check_auth_type_and_linking_status(session: Optional[SessionContainer], should_try_linking_with_session_user: Optional[bool], account_info: AccountInfoWithRecipeId, input_user: Optional[User], skip_session_user_update_in_core: bool, user_context: Dict[str, Any]) ‑> Union[OkFirstFactorResponse, OkSecondFactorLinkedResponse, OkSecondFactorNotLinkedResponse, LinkingToSessionUserFailedError]
-
Expand source code
async def check_auth_type_and_linking_status( session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], account_info: AccountInfoWithRecipeId, input_user: Union[User, None], skip_session_user_update_in_core: bool, user_context: Dict[str, Any], ) -> Union[ OkFirstFactorResponse, OkSecondFactorLinkedResponse, OkSecondFactorNotLinkedResponse, LinkingToSessionUserFailedError, ]: log_debug_message("check_auth_type_and_linking_status called") session_user: Union[User, None] = None if session is None: if should_try_linking_with_session_user is True: raise UnauthorisedError( "Session not found but shouldTryLinkingWithSessionUser is true" ) log_debug_message( "check_auth_type_and_linking_status returning first factor because there is no session" ) return OkFirstFactorResponse() else: if should_try_linking_with_session_user is False: # In our normal flows this should never happen - but some user overrides might do this. # Anyway, since should_try_linking_with_session_user explicitly set to false, it's safe to consider this a first factor log_debug_message( "check_auth_type_and_linking_status returning first factor because should_try_linking_with_session_user is False" ) return OkFirstFactorResponse() if not recipe_init_defined_should_do_automatic_account_linking(): if should_try_linking_with_session_user is True: raise Exception( "Please initialise the account linking recipe and define should_do_automatic_account_linking to enable MFA" ) else: if MultiFactorAuthRecipe.get_instance() is not None: raise Exception( "Please initialise the account linking recipe and define should_do_automatic_account_linking to enable MFA" ) else: return OkFirstFactorResponse() if input_user is not None and input_user.id == session.get_user_id(): log_debug_message( "check_auth_type_and_linking_status returning secondary factor, session and input user are the same" ) return OkSecondFactorLinkedResponse( session_user=input_user, ) log_debug_message( f"check_auth_type_and_linking_status loading session user, {input_user.id if input_user else None} === {session.get_user_id()}" ) session_user_result = await try_and_make_session_user_into_a_primary_user( session, skip_session_user_update_in_core, user_context ) if session_user_result.status == "SHOULD_AUTOMATICALLY_LINK_FALSE": if should_try_linking_with_session_user is True: raise BadInputError( "should_do_automatic_account_linking returned false when creating primary user but shouldTryLinkingWithSessionUser is true" ) return OkFirstFactorResponse() elif ( session_user_result.status == "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR" ): return LinkingToSessionUserFailedError( reason="SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR" ) session_user = session_user_result.user should_link = await AccountLinkingRecipe.get_instance().config.should_do_automatic_account_linking( AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method( account_info ), session_user, session, session.get_tenant_id(), user_context, ) log_debug_message( f"check_auth_type_and_linking_status session user <-> input user should_do_automatic_account_linking returned {should_link}" ) if isinstance(should_link, ShouldNotAutomaticallyLink): if should_try_linking_with_session_user is True: raise BadInputError( "should_do_automatic_account_linking returned false when creating primary user but shouldTryLinkingWithSessionUser is true" ) return OkFirstFactorResponse() else: return OkSecondFactorNotLinkedResponse( session_user=session_user, linking_to_session_user_requires_verification=should_link.should_require_verification, )
async def filter_out_invalid_first_factors_or_throw_if_all_are_invalid(factor_ids: List[str], tenant_id: str, has_session: bool, user_context: Dict[str, Any]) ‑> List[str]
-
Expand source code
async def filter_out_invalid_first_factors_or_throw_if_all_are_invalid( factor_ids: List[str], tenant_id: str, has_session: bool, user_context: Dict[str, Any], ) -> List[str]: valid_factor_ids: List[str] = [] for _id in factor_ids: valid_res = await is_valid_first_factor(tenant_id, _id, user_context) if valid_res == "TENANT_NOT_FOUND_ERROR": if has_session: raise UnauthorisedError("Tenant not found") else: raise Exception("Tenant not found error.") elif valid_res == "OK": valid_factor_ids.append(_id) if len(valid_factor_ids) == 0: if not has_session: raise UnauthorisedError( "A valid session is required to authenticate with secondary factors" ) else: raise_bad_input_exception( "First factor sign in/up called for a non-first factor with an active session. This might indicate that you are trying to use this as a secondary factor, but disabled account linking." ) return valid_factor_ids
async def filter_out_invalid_second_factors_or_throw_if_all_are_invalid(factor_ids: List[str], input_user_already_linked_to_session_user: bool, session_user: User, session: SessionContainer, user_context: Dict[str, Any]) ‑> List[str]
-
Expand source code
async def filter_out_invalid_second_factors_or_throw_if_all_are_invalid( factor_ids: List[str], input_user_already_linked_to_session_user: bool, session_user: User, session: SessionContainer, user_context: Dict[str, Any], ) -> List[str]: log_debug_message( f"filter_out_invalid_second_factors_or_throw_if_all_are_invalid called for {', '.join(factor_ids)}" ) mfa_instance = MultiFactorAuthRecipe.get_instance() if mfa_instance is not None: if not input_user_already_linked_to_session_user: factors_set_up_for_user_prom: Optional[List[str]] = None mfa_info_prom = None async def get_factors_set_up_for_user() -> List[str]: nonlocal factors_set_up_for_user_prom if factors_set_up_for_user_prom is None: factors_set_up_for_user_prom = await mfa_instance.recipe_implementation.get_factors_setup_for_user( user=session_user, user_context=user_context ) assert factors_set_up_for_user_prom is not None return factors_set_up_for_user_prom async def get_mfa_requirements_for_auth(): nonlocal mfa_info_prom if mfa_info_prom is None: mfa_info_prom = await update_and_get_mfa_related_info_in_session( input_session=session, user_context=user_context, ) return mfa_info_prom.mfa_requirements_for_auth log_debug_message( "filter_out_invalid_second_factors_or_throw_if_all_are_invalid checking if linking is allowed by the mfa recipe" ) caught_setup_factor_error: Optional[Exception] = None valid_factor_ids: List[str] = [] for _id in factor_ids: log_debug_message( "filter_out_invalid_second_factors_or_throw_if_all_are_invalid checking assert_allowed_to_setup_factor_else_throw_invalid_claim_error" ) try: await mfa_instance.recipe_implementation.assert_allowed_to_setup_factor_else_throw_invalid_claim_error( factor_id=_id, session=session, factors_set_up_for_user=get_factors_set_up_for_user, mfa_requirements_for_auth=get_mfa_requirements_for_auth, user_context=user_context, ) log_debug_message( f"filter_out_invalid_second_factors_or_throw_if_all_are_invalid {id} valid because assert_allowed_to_setup_factor_else_throw_invalid_claim_error passed" ) valid_factor_ids.append(_id) except Exception as err: log_debug_message( f"filter_out_invalid_second_factors_or_throw_if_all_are_invalid assert_allowed_to_setup_factor_else_throw_invalid_claim_error failed for {id}" ) caught_setup_factor_error = err if len(valid_factor_ids) == 0: log_debug_message( "filter_out_invalid_second_factors_or_throw_if_all_are_invalid rethrowing error from assert_allowed_to_setup_factor_else_throw_invalid_claim_error because we found no valid factors" ) if caught_setup_factor_error is not None: raise caught_setup_factor_error else: raise Exception("Should never come here") return valid_factor_ids else: log_debug_message( "filter_out_invalid_second_factors_or_throw_if_all_are_invalid allowing all factors because it'll not create new link" ) return factor_ids else: log_debug_message( "filter_out_invalid_second_factors_or_throw_if_all_are_invalid allowing all factors because MFA is not enabled" ) return factor_ids
async def get_authenticating_user_and_add_to_current_tenant_if_required(recipe_id: str, email: Optional[str], phone_number: Optional[str], third_party: Optional[ThirdPartyInfo], tenant_id: str, session: Optional[SessionContainer], check_credentials_on_tenant: Callable[[str], Awaitable[bool]], user_context: Dict[str, Any]) ‑> Optional[AuthenticatingUserInfo]
-
Expand source code
async def get_authenticating_user_and_add_to_current_tenant_if_required( recipe_id: str, email: Optional[str], phone_number: Optional[str], third_party: Optional[ThirdPartyInfo], tenant_id: str, session: Optional[SessionContainer], check_credentials_on_tenant: Callable[[str], Awaitable[bool]], user_context: Dict[str, Any], ) -> Optional[AuthenticatingUserInfo]: i = 0 while i < 300: account_info = { "email": email, "phoneNumber": phone_number, "thirdParty": third_party, } log_debug_message( f"getAuthenticatingUserAndAddToCurrentTenantIfRequired called with {account_info}" ) existing_users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info( tenant_id=tenant_id, account_info=AccountInfo( email=email, phone_number=phone_number, third_party=third_party ), do_union_of_account_info=True, user_context=user_context, ) log_debug_message( f"getAuthenticatingUserAndAddToCurrentTenantIfRequired got {len(existing_users)} users from the core resp" ) users_with_matching_login_methods = [ AuthenticatingUserInfo( user=user, login_method=next( ( lm for lm in user.login_methods if lm.recipe_id == recipe_id and ( (email is not None and lm.has_same_email_as(email)) or lm.has_same_phone_number_as(phone_number) or lm.has_same_third_party_info_as(third_party) ) ), None, ), ) for user in existing_users ] users_with_matching_login_methods = [ u for u in users_with_matching_login_methods if u.login_method is not None ] log_debug_message( f"getAuthenticatingUserAndAddToCurrentTenantIfRequired got {len(users_with_matching_login_methods)} users with matching login methods" ) if len(users_with_matching_login_methods) > 1: raise Exception( "You have found a bug. Please report it on https://github.com/supertokens/supertokens-node/issues" ) authenticating_user = ( AuthenticatingUserInfo( users_with_matching_login_methods[0].user, users_with_matching_login_methods[0].login_method, ) if users_with_matching_login_methods else None ) if authenticating_user is None and session is not None: log_debug_message( "getAuthenticatingUserAndAddToCurrentTenantIfRequired checking session user" ) session_user = await get_user( session.get_user_id(user_context), user_context ) if session_user is None: raise UnauthorisedError( "Session user not found", ) if not session_user.is_primary_user: log_debug_message( "getAuthenticatingUserAndAddToCurrentTenantIfRequired session user is non-primary so returning early without checking other tenants" ) return None matching_login_methods_from_session_user = [ lm for lm in session_user.login_methods if lm.recipe_id == recipe_id and ( lm.has_same_email_as(email) or lm.has_same_phone_number_as(phone_number) or lm.has_same_third_party_info_as(third_party) ) ] log_debug_message( f"getAuthenticatingUserAndAddToCurrentTenantIfRequired session has {len(matching_login_methods_from_session_user)} matching login methods" ) if any( tenant_id in lm.tenant_ids for lm in matching_login_methods_from_session_user ): log_debug_message( f"getAuthenticatingUserAndAddToCurrentTenantIfRequired session has {len(matching_login_methods_from_session_user)} matching login methods" ) return AuthenticatingUserInfo( user=session_user, login_method=next( lm for lm in matching_login_methods_from_session_user if tenant_id in lm.tenant_ids ), ) go_to_retry = False for lm in matching_login_methods_from_session_user: log_debug_message( f"getAuthenticatingUserAndAddToCurrentTenantIfRequired session checking credentials on {lm.tenant_ids[0]}" ) if await check_credentials_on_tenant(lm.tenant_ids[0]): log_debug_message( f"getAuthenticatingUserAndAddToCurrentTenantIfRequired associating user from {lm.tenant_ids[0]} with current tenant" ) associate_res = await associate_user_to_tenant( tenant_id, lm.recipe_user_id, user_context ) log_debug_message( f"getAuthenticatingUserAndAddToCurrentTenantIfRequired associating returned {associate_res.status}" ) if associate_res.status == "OK": lm.tenant_ids.append(tenant_id) return AuthenticatingUserInfo( user=session_user, login_method=lm ) if associate_res.status in [ "UNKNOWN_USER_ID_ERROR", "EMAIL_ALREADY_EXISTS_ERROR", "PHONE_NUMBER_ALREADY_EXISTS_ERROR", "THIRD_PARTY_USER_ALREADY_EXISTS_ERROR", ]: go_to_retry = True break if associate_res.status == "ASSOCIATION_NOT_ALLOWED_ERROR": raise UnauthorisedError( "Session user not associated with the session tenant" ) if go_to_retry: log_debug_message( "getAuthenticatingUserAndAddToCurrentTenantIfRequired retrying" ) i += 1 continue return authenticating_user raise Exception( "This should never happen: ran out of retries for getAuthenticatingUserAndAddToCurrentTenantIfRequired" )
def is_fake_email(email: str) ‑> bool
-
Expand source code
def is_fake_email(email: str) -> bool: return email.endswith("@stfakeemail.supertokens.com") or email.endswith( ".fakeemail.com" ) # .fakeemail.com for older users
async def link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(tenant_id: str, input_user: User, recipe_user_id: RecipeUserId, session: Optional[SessionContainer], should_try_linking_with_session_user: Optional[bool], user_context: Dict[str, Any]) ‑> Union[OkResponse2, LinkingToSessionUserFailedError]
-
Expand source code
async def link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info( tenant_id: str, input_user: User, recipe_user_id: RecipeUserId, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], user_context: Dict[str, Any], ) -> Union[OkResponse2, LinkingToSessionUserFailedError,]: log_debug_message( "link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info called" ) async def retry(): log_debug_message( "link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info retrying...." ) return await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info( tenant_id=tenant_id, input_user=input_user, session=session, recipe_user_id=recipe_user_id, should_try_linking_with_session_user=should_try_linking_with_session_user, user_context=user_context, ) auth_login_method = next( ( lm for lm in input_user.login_methods if lm.recipe_user_id.get_as_string() == recipe_user_id.get_as_string() ), None, ) if auth_login_method is None: raise Exception( "This should never happen: the recipe_user_id and user is inconsistent in create_primary_user_id_or_link_by_account_info params" ) auth_type_res = await check_auth_type_and_linking_status( session, should_try_linking_with_session_user, AccountInfoWithRecipeId( recipe_id=auth_login_method.recipe_id, email=auth_login_method.email, phone_number=auth_login_method.phone_number, third_party=auth_login_method.third_party, ), input_user, False, user_context, ) if not isinstance( auth_type_res, ( OkFirstFactorResponse, OkSecondFactorLinkedResponse, OkSecondFactorNotLinkedResponse, ), ): return LinkingToSessionUserFailedError(reason=auth_type_res.reason) if isinstance(auth_type_res, OkFirstFactorResponse): if not recipe_init_defined_should_do_automatic_account_linking(): log_debug_message( "link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info skipping link by account info because this is a first factor auth and the app hasn't defined should_do_automatic_account_linking" ) return OkResponse2(user=input_user) log_debug_message( "link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info trying to link by account info because this is a first factor auth" ) link_res = await AccountLinkingRecipe.get_instance().try_linking_by_account_info_or_create_primary_user( input_user=input_user, session=session, tenant_id=tenant_id, user_context=user_context, ) if link_res.status == "OK": assert link_res.user is not None return OkResponse2(user=link_res.user) if link_res.status == "NO_LINK": return OkResponse2(user=input_user) return await retry() if isinstance(auth_type_res, OkSecondFactorLinkedResponse): return OkResponse2(user=auth_type_res.session_user) log_debug_message( "link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info trying to link by session info" ) session_linking_res = await try_linking_by_session( session_user=auth_type_res.session_user, authenticated_user=input_user, auth_login_method=auth_login_method, linking_to_session_user_requires_verification=auth_type_res.linking_to_session_user_requires_verification, user_context=user_context, ) if isinstance(session_linking_res, LinkingToSessionUserFailedError): if session_linking_res.reason == "INPUT_USER_IS_NOT_A_PRIMARY_USER": return await retry() else: return session_linking_res else: return session_linking_res
async def load_session_in_auth_api_if_needed(request: BaseRequest, should_try_linking_with_session_user: Optional[bool], user_context: Dict[str, Any]) ‑> Optional[SessionContainer]
-
Expand source code
async def load_session_in_auth_api_if_needed( request: BaseRequest, should_try_linking_with_session_user: Optional[bool], user_context: Dict[str, Any], ) -> Optional[SessionContainer]: if should_try_linking_with_session_user is not False: return await get_session( request, session_required=should_try_linking_with_session_user is True, override_global_claim_validators=lambda _, __, ___: [], user_context=user_context, ) return None
async def post_auth_checks(authenticated_user: User, recipe_user_id: RecipeUserId, is_sign_up: bool, factor_id: str, session: Optional[SessionContainer], tenant_id: str, user_context: Dict[str, Any], request: BaseRequest) ‑> Union[PostAuthChecksOkResponse, PostAuthChecksSignInNotAllowedResponse]
-
Expand source code
async def post_auth_checks( authenticated_user: User, recipe_user_id: RecipeUserId, is_sign_up: bool, factor_id: str, session: Union[SessionContainer, None], tenant_id: str, user_context: Dict[str, Any], request: BaseRequest, ) -> Union[PostAuthChecksOkResponse, PostAuthChecksSignInNotAllowedResponse]: log_debug_message( f"postAuthChecks called {'with' if session is not None else 'without'} a session to " f"{'sign up' if is_sign_up else 'sign in'} with {factor_id}" ) mfa_instance = MultiFactorAuthRecipe.get_instance() resp_session = session if session is not None: authenticated_user_linked_to_session_user = any( lm.recipe_user_id.get_as_string() == session.get_recipe_user_id(user_context).get_as_string() for lm in authenticated_user.login_methods ) if authenticated_user_linked_to_session_user: log_debug_message("postAuthChecks session and input user got linked") if mfa_instance is not None: log_debug_message("postAuthChecks marking factor as completed") # if the authenticating user is linked to the current session user (it means that the factor got set up or completed), # we mark it as completed in the session. assert resp_session is not None await mark_factor_as_complete_in_session( resp_session, factor_id, user_context ) else: log_debug_message("postAuthChecks checking overwriteSessionDuringSignInUp") # If the new user wasn't linked to the current one, we check the config and overwrite the session if required # Note: we could also get here if MFA is enabled, but the app didn't want to link the user to the session user. # This is intentional, since the MFA and overwriteSessionDuringSignInUp configs should work independently. resp_session = await create_new_session( request, tenant_id, recipe_user_id, {}, {}, user_context ) if mfa_instance is not None: await mark_factor_as_complete_in_session( resp_session, factor_id, user_context ) else: log_debug_message("postAuthChecks creating session for first factor sign in/up") # If there is no input session, we do not need to do anything other checks and create a new session resp_session = await create_new_session( request, tenant_id, recipe_user_id, {}, {}, user_context ) # Here we can always mark the factor as completed, since we just created the session if mfa_instance is not None: await mark_factor_as_complete_in_session( resp_session, factor_id, user_context ) assert resp_session is not None return PostAuthChecksOkResponse( status="OK", session=resp_session, user=authenticated_user )
async def pre_auth_checks(authenticating_account_info: AccountInfoWithRecipeId, authenticating_user: Optional[User], tenant_id: str, factor_ids: List[str], is_sign_up: bool, is_verified: bool, sign_in_verifies_login_method: bool, skip_session_user_update_in_core: bool, session: Optional[SessionContainer], should_try_linking_with_session_user: Optional[bool], user_context: Dict[str, Any]) ‑> Union[OkResponse, SignUpNotAllowedResponse, SignInNotAllowedResponse, LinkingToSessionUserFailedError]
-
Expand source code
async def pre_auth_checks( authenticating_account_info: AccountInfoWithRecipeId, authenticating_user: Union[User, None], tenant_id: str, factor_ids: List[str], is_sign_up: bool, is_verified: bool, sign_in_verifies_login_method: bool, skip_session_user_update_in_core: bool, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], user_context: Dict[str, Any], ) -> Union[ OkResponse, SignUpNotAllowedResponse, SignInNotAllowedResponse, LinkingToSessionUserFailedError, ]: valid_factor_ids: List[str] = [] if len(factor_ids) == 0: raise Exception( "This should never happen: empty factorIds array passed to preSignInChecks" ) log_debug_message("preAuthChecks checking auth types") auth_type_info = await check_auth_type_and_linking_status( session, should_try_linking_with_session_user, authenticating_account_info, authenticating_user, skip_session_user_update_in_core, user_context, ) if auth_type_info.status != "OK": log_debug_message( f"preAuthChecks returning {auth_type_info.status} from checkAuthType results" ) return auth_type_info if auth_type_info.is_first_factor: log_debug_message("preAuthChecks getting valid first factors") valid_first_factors = ( await filter_out_invalid_first_factors_or_throw_if_all_are_invalid( factor_ids, tenant_id, session is not None, user_context ) ) valid_factor_ids = valid_first_factors else: assert isinstance( auth_type_info, (OkSecondFactorNotLinkedResponse, OkSecondFactorLinkedResponse), ) assert session is not None log_debug_message("preAuthChecks getting valid secondary factors") valid_factor_ids = ( await filter_out_invalid_second_factors_or_throw_if_all_are_invalid( factor_ids, auth_type_info.input_user_already_linked_to_session_user, auth_type_info.session_user, session, user_context, ) ) if not is_sign_up and authenticating_user is None: raise Exception( "This should never happen: preAuthChecks called with isSignUp: false, authenticatingUser: None" ) if is_sign_up: verified_in_session_user = not isinstance( auth_type_info, OkFirstFactorResponse ) and any( lm.verified and ( lm.has_same_email_as(authenticating_account_info.email) or lm.has_same_phone_number_as(authenticating_account_info.phone_number) ) for lm in auth_type_info.session_user.login_methods ) log_debug_message("preAuthChecks checking if the user is allowed to sign up") if not await AccountLinkingRecipe.get_instance().is_sign_up_allowed( new_user=authenticating_account_info, is_verified=is_verified or sign_in_verifies_login_method or verified_in_session_user, tenant_id=tenant_id, session=session, user_context=user_context, ): return SignUpNotAllowedResponse() elif authenticating_user is not None: log_debug_message("preAuthChecks checking if the user is allowed to sign in") if not await AccountLinkingRecipe.get_instance().is_sign_in_allowed( user=authenticating_user, account_info=authenticating_account_info, sign_in_verifies_login_method=sign_in_verifies_login_method, tenant_id=tenant_id, session=session, user_context=user_context, ): return SignInNotAllowedResponse() log_debug_message("preAuthChecks returning OK") return OkResponse( valid_factor_ids=valid_factor_ids, is_first_factor=auth_type_info.is_first_factor, )
async def try_and_make_session_user_into_a_primary_user(session: SessionContainer, skip_session_user_update_in_core: bool, user_context: Dict[str, Any]) ‑> Union[OkResponse2, ShouldAutomaticallyLinkFalseResponse, AccountInfoAlreadyAssociatedResponse]
-
Expand source code
async def try_and_make_session_user_into_a_primary_user( session: SessionContainer, skip_session_user_update_in_core: bool, user_context: Dict[str, Any], ) -> Union[ OkResponse2, ShouldAutomaticallyLinkFalseResponse, AccountInfoAlreadyAssociatedResponse, ]: log_debug_message("try_and_make_session_user_into_a_primary_user called") session_user = await get_user(session.get_user_id(), user_context) if session_user is None: raise UnauthorisedError("Session user not found") if session_user.is_primary_user: log_debug_message( "try_and_make_session_user_into_a_primary_user session user already primary" ) return OkResponse2(user=session_user) else: log_debug_message( "try_and_make_session_user_into_a_primary_user not primary user yet" ) account_linking_instance = AccountLinkingRecipe.get_instance() should_do_account_linking = ( await account_linking_instance.config.should_do_automatic_account_linking( AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method( session_user.login_methods[0] ), None, session, session.get_tenant_id(), user_context, ) ) log_debug_message( f"try_and_make_session_user_into_a_primary_user should_do_account_linking: {should_do_account_linking}" ) if isinstance(should_do_account_linking, ShouldAutomaticallyLink): if skip_session_user_update_in_core: return OkResponse2(user=session_user) if ( should_do_account_linking.should_require_verification and not session_user.login_methods[0].verified ): if ( await session.get_claim_value(EmailVerificationClaim, user_context) ) is not False: log_debug_message( "try_and_make_session_user_into_a_primary_user updating emailverification status in session" ) await session.set_claim_value( EmailVerificationClaim, False, user_context ) log_debug_message( "try_and_make_session_user_into_a_primary_user throwing validation error" ) await session.assert_claims( [EmailVerificationClaim.validators.is_verified()], user_context ) raise Exception( "This should never happen: email verification claim validator passed after setting value to false" ) create_primary_user_res = await account_linking_instance.recipe_implementation.create_primary_user( recipe_user_id=session_user.login_methods[0].recipe_user_id, user_context=user_context, ) log_debug_message( f"try_and_make_session_user_into_a_primary_user create_primary_user returned {create_primary_user_res.status}" ) if ( create_primary_user_res.status == "RECIPE_USER_ID_ALREADY_LINKED_WITH_PRIMARY_USER_ID_ERROR" ): raise UnauthorisedError("Session user not found") elif create_primary_user_res.status == "OK": return OkResponse2(user=create_primary_user_res.user) else: return AccountInfoAlreadyAssociatedResponse() else: return ShouldAutomaticallyLinkFalseResponse()
async def try_linking_by_session(linking_to_session_user_requires_verification: bool, auth_login_method: LoginMethod, authenticated_user: User, session_user: User, user_context: Dict[str, Any]) ‑> Union[OkResponse2, LinkingToSessionUserFailedError]
-
Expand source code
async def try_linking_by_session( linking_to_session_user_requires_verification: bool, auth_login_method: LoginMethod, authenticated_user: User, session_user: User, user_context: Dict[str, Any], ) -> Union[OkResponse2, LinkingToSessionUserFailedError,]: log_debug_message("tryLinkingBySession called") session_user_has_verified_account_info = any( ( lm.has_same_email_as(auth_login_method.email) or lm.has_same_phone_number_as(auth_login_method.phone_number) ) and lm.verified for lm in session_user.login_methods ) can_link_based_on_verification = ( not linking_to_session_user_requires_verification or auth_login_method.verified or session_user_has_verified_account_info ) if not can_link_based_on_verification: return LinkingToSessionUserFailedError(reason="EMAIL_VERIFICATION_REQUIRED") link_accounts_result = ( await AccountLinkingRecipe.get_instance().recipe_implementation.link_accounts( recipe_user_id=authenticated_user.login_methods[0].recipe_user_id, primary_user_id=session_user.id, user_context=user_context, ) ) if link_accounts_result.status == "OK": log_debug_message( "tryLinkingBySession successfully linked input user to session user" ) return OkResponse2(user=link_accounts_result.user) elif ( link_accounts_result.status == "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR" ): log_debug_message( "tryLinkingBySession linking to session user failed because of a race condition - input user linked to another user" ) return LinkingToSessionUserFailedError( reason="RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR" ) elif link_accounts_result.status == "INPUT_USER_IS_NOT_A_PRIMARY_USER": log_debug_message( "tryLinkingBySession linking to session user failed because of a race condition - INPUT_USER_IS_NOT_A_PRIMARY_USER, should retry" ) return LinkingToSessionUserFailedError( reason="INPUT_USER_IS_NOT_A_PRIMARY_USER" ) else: log_debug_message( "tryLinkingBySession linking to session user failed because of a race condition - input user has another primary user it can be linked to" ) return LinkingToSessionUserFailedError( reason="ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR" )
Classes
class AccountInfoAlreadyAssociatedResponse
-
Expand source code
class AccountInfoAlreadyAssociatedResponse: status: Literal[ "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR" ] def __init__(self): self.status = ( "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR" )
Class variables
var status : Literal['ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR']
class AuthenticatingUserInfo (user: User, login_method: Optional[LoginMethod])
-
Expand source code
class AuthenticatingUserInfo: def __init__(self, user: User, login_method: Union[LoginMethod, None]): self.user = user self.login_method = login_method
class LinkingToSessionUserFailedError (reason: Literal['EMAIL_VERIFICATION_REQUIRED', 'RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR', 'ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR', 'SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR', 'INPUT_USER_IS_NOT_A_PRIMARY_USER'])
-
Expand source code
class LinkingToSessionUserFailedError: status: Literal["LINKING_TO_SESSION_USER_FAILED"] = "LINKING_TO_SESSION_USER_FAILED" reason: Literal[ "EMAIL_VERIFICATION_REQUIRED", "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR", "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR", "SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR", "INPUT_USER_IS_NOT_A_PRIMARY_USER", ] def __init__( self, reason: Literal[ "EMAIL_VERIFICATION_REQUIRED", "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR", "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR", "SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR", "INPUT_USER_IS_NOT_A_PRIMARY_USER", ], ): self.reason = reason
Class variables
var reason : Literal['EMAIL_VERIFICATION_REQUIRED', 'RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR', 'ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR', 'SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR', 'INPUT_USER_IS_NOT_A_PRIMARY_USER']
var status : Literal['LINKING_TO_SESSION_USER_FAILED']
class OkFirstFactorResponse
-
Expand source code
class OkFirstFactorResponse: status: Literal["OK"] = "OK" is_first_factor: Literal[True] = True
Class variables
var is_first_factor : Literal[True]
var status : Literal['OK']
class OkResponse (valid_factor_ids: List[str], is_first_factor: bool)
-
Expand source code
class OkResponse: status: Literal["OK"] valid_factor_ids: List[str] is_first_factor: bool def __init__(self, valid_factor_ids: List[str], is_first_factor: bool): self.status = "OK" self.valid_factor_ids = valid_factor_ids self.is_first_factor = is_first_factor
Class variables
var is_first_factor : bool
var status : Literal['OK']
var valid_factor_ids : List[str]
class OkResponse2 (user: User)
-
Expand source code
class OkResponse2: status: Literal["OK"] user: User def __init__(self, user: User): self.status = "OK" self.user: User = user
Class variables
var status : Literal['OK']
var user : User
class OkSecondFactorLinkedResponse (session_user: User)
-
Expand source code
class OkSecondFactorLinkedResponse: status: Literal["OK"] = "OK" is_first_factor: Literal[False] = False input_user_already_linked_to_session_user: Literal[True] = True session_user: User def __init__(self, session_user: User): self.session_user = session_user
Class variables
var input_user_already_linked_to_session_user : Literal[True]
var is_first_factor : Literal[False]
var session_user : User
var status : Literal['OK']
class OkSecondFactorNotLinkedResponse (session_user: User, linking_to_session_user_requires_verification: bool)
-
Expand source code
class OkSecondFactorNotLinkedResponse: status: Literal["OK"] = "OK" is_first_factor: Literal[False] = False input_user_already_linked_to_session_user: Literal[False] = False session_user: User linking_to_session_user_requires_verification: bool def __init__( self, session_user: User, linking_to_session_user_requires_verification: bool, ): self.session_user = session_user self.linking_to_session_user_requires_verification = ( linking_to_session_user_requires_verification )
Class variables
var input_user_already_linked_to_session_user : Literal[False]
var is_first_factor : Literal[False]
var linking_to_session_user_requires_verification : bool
var session_user : User
var status : Literal['OK']
class PostAuthChecksOkResponse (status: Literal['OK'], session: SessionContainer, user: User)
-
Expand source code
class PostAuthChecksOkResponse: status: Literal["OK"] session: SessionContainer user: User def __init__(self, status: Literal["OK"], session: SessionContainer, user: User): self.status = status self.session = session self.user = user
Class variables
var session : SessionContainer
var status : Literal['OK']
var user : User
class PostAuthChecksSignInNotAllowedResponse
-
Expand source code
class PostAuthChecksSignInNotAllowedResponse: status: Literal["SIGN_IN_NOT_ALLOWED"]
Class variables
var status : Literal['SIGN_IN_NOT_ALLOWED']
class ShouldAutomaticallyLinkFalseResponse
-
Expand source code
class ShouldAutomaticallyLinkFalseResponse: status: Literal["SHOULD_AUTOMATICALLY_LINK_FALSE"] def __init__(self): self.status = "SHOULD_AUTOMATICALLY_LINK_FALSE"
Class variables
var status : Literal['SHOULD_AUTOMATICALLY_LINK_FALSE']
class SignInNotAllowedResponse
-
Expand source code
class SignInNotAllowedResponse: status: Literal["SIGN_IN_NOT_ALLOWED"] = "SIGN_IN_NOT_ALLOWED"
Class variables
var status : Literal['SIGN_IN_NOT_ALLOWED']
class SignUpNotAllowedResponse
-
Expand source code
class SignUpNotAllowedResponse: status: Literal["SIGN_UP_NOT_ALLOWED"] = "SIGN_UP_NOT_ALLOWED"
Class variables
var status : Literal['SIGN_UP_NOT_ALLOWED']