Module supertokens_python.recipe.passwordless.api.implementation
Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from typing import Any, Dict, Optional, Union
from supertokens_python.asyncio import get_user
from supertokens_python.auth_utils import (
OkResponse,
PostAuthChecksOkResponse,
SignInNotAllowedResponse,
SignUpNotAllowedResponse,
check_auth_type_and_linking_status,
filter_out_invalid_first_factors_or_throw_if_all_are_invalid,
get_authenticating_user_and_add_to_current_tenant_if_required,
post_auth_checks,
pre_auth_checks,
)
from supertokens_python.logger import log_debug_message
from supertokens_python.recipe.accountlinking.recipe import AccountLinkingRecipe
from supertokens_python.recipe.accountlinking.types import AccountInfoWithRecipeId
from supertokens_python.recipe.multifactorauth.types import FactorIds
from supertokens_python.recipe.passwordless.interfaces import (
APIInterface,
APIOptions,
CheckCodeOkResult,
CheckCodeIncorrectUserInputCodeError,
CheckCodeExpiredUserInputCodeError,
CheckCodeRestartFlowError,
ConsumeCodeExpiredUserInputCodeError,
ConsumeCodeIncorrectUserInputCodeError,
ConsumeCodeOkResult,
ConsumeCodePostExpiredUserInputCodeError,
ConsumeCodePostIncorrectUserInputCodeError,
ConsumeCodePostOkResult,
ConsumeCodePostRestartFlowError,
ConsumeCodeRestartFlowError,
CreateCodePostOkResult,
CreateNewCodeForDeviceOkResult,
CreateNewCodeForDeviceUserInputCodeAlreadyUsedError,
EmailExistsGetOkResult,
PasswordlessLoginEmailTemplateVars,
PhoneNumberExistsGetOkResult,
ResendCodePostOkResult,
ResendCodePostRestartFlowError,
SignInUpPostNotAllowedResponse,
)
from supertokens_python.recipe.passwordless.types import (
PasswordlessLoginSMSTemplateVars,
)
from supertokens_python.recipe.passwordless.utils import (
ContactEmailOnlyConfig,
ContactEmailOrPhoneConfig,
ContactPhoneOnlyConfig,
get_enabled_pwless_factors,
)
from supertokens_python.recipe.session import SessionContainer
from supertokens_python.recipe.session.exceptions import UnauthorisedError
from supertokens_python.types import (
AccountInfo,
User,
GeneralErrorResponse,
LoginMethod,
RecipeUserId,
)
from ...emailverification import EmailVerificationRecipe
from ...emailverification.interfaces import CreateEmailVerificationTokenOkResult
class PasswordlessUserResult:
user: User
login_method: Union[LoginMethod, None]
def __init__(self, user: User, login_method: Union[LoginMethod, None]):
self.user = user
self.login_method = login_method
async def get_passwordless_user_by_account_info(
tenant_id: str,
user_context: Dict[str, Any],
account_info: AccountInfo,
) -> Optional[PasswordlessUserResult]:
existing_users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info(
tenant_id=tenant_id,
account_info=account_info,
do_union_of_account_info=False,
user_context=user_context,
)
log_debug_message(
f"get_passwordless_user_by_account_info got {len(existing_users)} from core resp {account_info}"
)
users_with_matching_login_methods = [
PasswordlessUserResult(
user=user,
login_method=next(
(
lm
for lm in user.login_methods
if lm.recipe_id == "passwordless"
and (
lm.has_same_email_as(account_info.email)
or lm.has_same_phone_number_as(account_info.phone_number)
)
),
None,
),
)
for user in existing_users
]
users_with_matching_login_methods = [
user_data
for user_data in users_with_matching_login_methods
if user_data.login_method is not None
]
log_debug_message(
f"get_passwordless_user_by_account_info {len(users_with_matching_login_methods)} has matching login methods"
)
if len(users_with_matching_login_methods) > 1:
raise Exception(
"This should never happen: multiple users exist matching the accountInfo in passwordless createCode"
)
if len(users_with_matching_login_methods) == 0:
return None
return users_with_matching_login_methods[0]
class APIImplementation(APIInterface):
async def create_code_post(
self,
email: Union[str, None],
phone_number: Union[str, None],
session: Optional[SessionContainer],
should_try_linking_with_session_user: Union[bool, None],
tenant_id: str,
api_options: APIOptions,
user_context: Dict[str, Any],
) -> Union[
CreateCodePostOkResult, SignInUpPostNotAllowedResponse, GeneralErrorResponse
]:
error_code_map = {
"SIGN_UP_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_002)",
"LINKING_TO_SESSION_USER_FAILED": {
"SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_019)",
},
}
account_info = AccountInfo(
email=email,
phone_number=phone_number,
)
user_with_matching_login_method = await get_passwordless_user_by_account_info(
tenant_id, user_context, account_info
)
factor_ids = []
if session is not None:
factor_ids = [
FactorIds.OTP_EMAIL if email is not None else FactorIds.OTP_PHONE
]
else:
factor_ids = get_enabled_pwless_factors(api_options.config)
if email is not None:
factor_ids = [
f
for f in factor_ids
if f in [FactorIds.OTP_EMAIL, FactorIds.LINK_EMAIL]
]
else:
factor_ids = [
f
for f in factor_ids
if f in [FactorIds.OTP_PHONE, FactorIds.LINK_PHONE]
]
is_verified_input = True
if user_with_matching_login_method is not None:
assert user_with_matching_login_method.login_method is not None
is_verified_input = user_with_matching_login_method.login_method.verified
pre_auth_checks_result = await pre_auth_checks(
authenticating_account_info=AccountInfoWithRecipeId(
recipe_id="passwordless",
email=account_info.email,
phone_number=account_info.phone_number,
),
is_sign_up=user_with_matching_login_method is None,
authenticating_user=(
user_with_matching_login_method.user
if user_with_matching_login_method
else None
),
is_verified=is_verified_input,
sign_in_verifies_login_method=True,
skip_session_user_update_in_core=True,
tenant_id=tenant_id,
factor_ids=factor_ids,
user_context=user_context,
session=session,
should_try_linking_with_session_user=should_try_linking_with_session_user,
)
if not isinstance(pre_auth_checks_result, OkResponse):
if isinstance(pre_auth_checks_result, SignUpNotAllowedResponse):
reason = error_code_map["SIGN_UP_NOT_ALLOWED"]
assert isinstance(reason, str)
return SignInUpPostNotAllowedResponse(reason)
if isinstance(pre_auth_checks_result, SignInNotAllowedResponse):
raise Exception("Should never come here")
reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
assert isinstance(reason_dict, Dict)
reason = reason_dict[pre_auth_checks_result.reason]
return SignInUpPostNotAllowedResponse(reason=reason)
user_input_code = None
if api_options.config.get_custom_user_input_code is not None:
user_input_code = await api_options.config.get_custom_user_input_code(
tenant_id, user_context
)
user_input_code_input = user_input_code
if api_options.config.get_custom_user_input_code is not None:
user_input_code_input = await api_options.config.get_custom_user_input_code(
tenant_id, user_context
)
response = await api_options.recipe_implementation.create_code(
email=account_info.email,
phone_number=account_info.phone_number,
user_input_code=user_input_code_input,
tenant_id=tenant_id,
user_context=user_context,
session=session,
should_try_linking_with_session_user=should_try_linking_with_session_user,
)
magic_link = None
user_input_code = None
flow_type = api_options.config.flow_type
if all(
_id.startswith("link") for _id in pre_auth_checks_result.valid_factor_ids
):
flow_type = "MAGIC_LINK"
elif all(
_id.startswith("otp") for _id in pre_auth_checks_result.valid_factor_ids
):
flow_type = "USER_INPUT_CODE"
else:
flow_type = "USER_INPUT_CODE_AND_MAGIC_LINK"
if flow_type in ("MAGIC_LINK", "USER_INPUT_CODE_AND_MAGIC_LINK"):
magic_link = (
api_options.app_info.get_origin(
api_options.request, user_context
).get_as_string_dangerous()
+ api_options.app_info.website_base_path.get_as_string_dangerous()
+ "/verify"
+ "?preAuthSessionId="
+ response.pre_auth_session_id
+ "&tenantId="
+ tenant_id
+ "#"
+ response.link_code
)
if flow_type in ("USER_INPUT_CODE", "USER_INPUT_CODE_AND_MAGIC_LINK"):
user_input_code = response.user_input_code
if isinstance(api_options.config.contact_config, ContactEmailOnlyConfig) or (
isinstance(api_options.config.contact_config, ContactEmailOrPhoneConfig)
and email is not None
):
if email is None:
raise Exception("Should never come here")
log_debug_message("Sending passwordless login email to %s", email)
passwordless_email_delivery_input = PasswordlessLoginEmailTemplateVars(
email=email,
user_input_code=user_input_code,
url_with_link_code=magic_link,
code_life_time=response.code_life_time,
pre_auth_session_id=response.pre_auth_session_id,
tenant_id=tenant_id,
is_first_factor=pre_auth_checks_result.is_first_factor,
)
await api_options.email_delivery.ingredient_interface_impl.send_email(
passwordless_email_delivery_input, user_context
)
elif isinstance(
api_options.config.contact_config,
(ContactEmailOrPhoneConfig, ContactPhoneOnlyConfig),
):
if phone_number is None:
raise Exception("Should never come here")
log_debug_message("Sending passwordless login SMS to %s", phone_number)
sms_input = PasswordlessLoginSMSTemplateVars(
phone_number=phone_number,
user_input_code=user_input_code,
url_with_link_code=magic_link,
code_life_time=response.code_life_time,
pre_auth_session_id=response.pre_auth_session_id,
tenant_id=tenant_id,
is_first_factor=pre_auth_checks_result.is_first_factor,
)
await api_options.sms_delivery.ingredient_interface_impl.send_sms(
sms_input, user_context
)
return CreateCodePostOkResult(
response.device_id, response.pre_auth_session_id, flow_type
)
async def resend_code_post(
self,
device_id: str,
pre_auth_session_id: str,
session: Optional[SessionContainer],
should_try_linking_with_session_user: Union[bool, None],
tenant_id: str,
api_options: APIOptions,
user_context: Dict[str, Any],
) -> Union[
ResendCodePostOkResult, ResendCodePostRestartFlowError, GeneralErrorResponse
]:
device_info = await api_options.recipe_implementation.list_codes_by_device_id(
device_id=device_id, tenant_id=tenant_id, user_context=user_context
)
if device_info is None:
return ResendCodePostRestartFlowError()
if (
api_options.config.contact_config.contact_method == "PHONE"
and device_info.phone_number is None
) or (
api_options.config.contact_config.contact_method == "EMAIL"
and device_info.email is None
):
return ResendCodePostRestartFlowError()
user_with_matching_login_method = await get_passwordless_user_by_account_info(
tenant_id=tenant_id,
user_context=user_context,
account_info=AccountInfo(
email=device_info.email,
phone_number=device_info.phone_number,
),
)
auth_type_info = await check_auth_type_and_linking_status(
session=session,
account_info=AccountInfoWithRecipeId(
recipe_id="passwordless",
email=device_info.email,
phone_number=device_info.phone_number,
),
input_user=(
user_with_matching_login_method.user
if user_with_matching_login_method
else None
),
skip_session_user_update_in_core=True,
user_context=user_context,
should_try_linking_with_session_user=should_try_linking_with_session_user,
)
if auth_type_info.status == "LINKING_TO_SESSION_USER_FAILED":
return ResendCodePostRestartFlowError()
number_of_tries_to_create_new_code = 0
while True:
number_of_tries_to_create_new_code += 1
user_input_code = None
if api_options.config.get_custom_user_input_code is not None:
user_input_code = await api_options.config.get_custom_user_input_code(
tenant_id, user_context
)
user_input_code_input = user_input_code
if api_options.config.get_custom_user_input_code is not None:
user_input_code_input = (
await api_options.config.get_custom_user_input_code(
tenant_id, user_context
)
)
response = (
await api_options.recipe_implementation.create_new_code_for_device(
device_id=device_id,
user_input_code=user_input_code_input,
tenant_id=tenant_id,
user_context=user_context,
)
)
if isinstance(
response, CreateNewCodeForDeviceUserInputCodeAlreadyUsedError
):
if number_of_tries_to_create_new_code >= 3:
return GeneralErrorResponse(
"Failed to generate a one time code. Please try again"
)
continue
if isinstance(response, CreateNewCodeForDeviceOkResult):
magic_link = None
user_input_code = None
factor_ids = []
if session is not None:
factor_ids = [
(
FactorIds.OTP_EMAIL
if device_info.email is not None
else FactorIds.OTP_PHONE
)
]
else:
factor_ids = get_enabled_pwless_factors(api_options.config)
factor_ids = await filter_out_invalid_first_factors_or_throw_if_all_are_invalid(
factor_ids, tenant_id, False, user_context
)
flow_type = api_options.config.flow_type
if all(id.startswith("link") for id in factor_ids):
flow_type = "MAGIC_LINK"
elif all(id.startswith("otp") for id in factor_ids):
flow_type = "USER_INPUT_CODE"
else:
flow_type = "USER_INPUT_CODE_AND_MAGIC_LINK"
if flow_type in ("MAGIC_LINK", "USER_INPUT_CODE_AND_MAGIC_LINK"):
magic_link = (
api_options.app_info.get_origin(
api_options.request, user_context
).get_as_string_dangerous()
+ api_options.app_info.website_base_path.get_as_string_dangerous()
+ "/verify"
+ "?preAuthSessionId="
+ response.pre_auth_session_id
+ "&tenantId="
+ tenant_id
+ "#"
+ response.link_code
)
if flow_type in ("USER_INPUT_CODE", "USER_INPUT_CODE_AND_MAGIC_LINK"):
user_input_code = response.user_input_code
if api_options.config.contact_config.contact_method == "PHONE" or (
api_options.config.contact_config.contact_method == "EMAIL_OR_PHONE"
and device_info.phone_number is not None
):
log_debug_message(
"Sending passwordless login SMS to %s", device_info.phone_number
)
assert device_info.phone_number is not None
sms_input = PasswordlessLoginSMSTemplateVars(
phone_number=device_info.phone_number,
user_input_code=user_input_code,
url_with_link_code=magic_link,
code_life_time=response.code_life_time,
pre_auth_session_id=response.pre_auth_session_id,
tenant_id=tenant_id,
is_first_factor=auth_type_info.is_first_factor,
)
await api_options.sms_delivery.ingredient_interface_impl.send_sms(
sms_input, user_context
)
else:
log_debug_message(
"Sending passwordless login email to %s", device_info.email
)
assert device_info.email is not None
passwordless_email_delivery_input = (
PasswordlessLoginEmailTemplateVars(
email=device_info.email,
user_input_code=user_input_code,
url_with_link_code=magic_link,
code_life_time=response.code_life_time,
pre_auth_session_id=response.pre_auth_session_id,
tenant_id=tenant_id,
is_first_factor=auth_type_info.is_first_factor,
)
)
await api_options.email_delivery.ingredient_interface_impl.send_email(
passwordless_email_delivery_input, user_context
)
return ResendCodePostOkResult()
return ResendCodePostRestartFlowError()
async def consume_code_post(
self,
pre_auth_session_id: str,
user_input_code: Union[str, None],
device_id: Union[str, None],
link_code: Union[str, None],
session: Optional[SessionContainer],
should_try_linking_with_session_user: Union[bool, None],
tenant_id: str,
api_options: APIOptions,
user_context: Dict[str, Any],
) -> Union[
ConsumeCodePostOkResult,
ConsumeCodePostRestartFlowError,
GeneralErrorResponse,
ConsumeCodePostIncorrectUserInputCodeError,
ConsumeCodePostExpiredUserInputCodeError,
SignInUpPostNotAllowedResponse,
]:
error_code_map = {
"SIGN_UP_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_002)",
"SIGN_IN_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_003)",
"LINKING_TO_SESSION_USER_FAILED": {
"RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_017)",
"ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_018)",
"SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_019)",
},
}
device_info = (
await api_options.recipe_implementation.list_codes_by_pre_auth_session_id(
tenant_id=tenant_id,
pre_auth_session_id=pre_auth_session_id,
user_context=user_context,
)
)
if not device_info:
return ConsumeCodePostRestartFlowError()
recipe_id = "passwordless"
account_info = AccountInfo(
phone_number=device_info.phone_number, email=device_info.email
)
check_credentials_response: Optional[
Union[
CheckCodeOkResult,
CheckCodeIncorrectUserInputCodeError,
CheckCodeExpiredUserInputCodeError,
CheckCodeRestartFlowError,
]
] = None
async def check_credentials(_: str):
nonlocal check_credentials_response
if check_credentials_response is None:
check_credentials_response = (
await api_options.recipe_implementation.check_code(
pre_auth_session_id=pre_auth_session_id,
device_id=device_id,
user_input_code=user_input_code,
link_code=link_code,
tenant_id=tenant_id,
user_context=user_context,
)
)
return isinstance(check_credentials_response, CheckCodeOkResult)
check_credentials_response = None
authenticating_user = (
await get_authenticating_user_and_add_to_current_tenant_if_required(
email=account_info.email,
phone_number=account_info.phone_number,
third_party=None,
recipe_id=recipe_id,
user_context=user_context,
session=session,
tenant_id=tenant_id,
check_credentials_on_tenant=check_credentials,
)
)
ev_instance = EmailVerificationRecipe.get_instance_optional()
if account_info.email and session and ev_instance:
session_user = await get_user(session.get_user_id(), user_context)
if session_user is None:
raise UnauthorisedError(
"Session user not found",
)
login_method = next(
(
lm
for lm in session_user.login_methods
if lm.recipe_user_id.get_as_string()
== session.get_recipe_user_id().get_as_string()
),
None,
)
if login_method is None:
raise UnauthorisedError(
"Session user and session recipeUserId is inconsistent",
)
if (
login_method.has_same_email_as(account_info.email)
and not login_method.verified
):
if await check_credentials(tenant_id):
token_response = await ev_instance.recipe_implementation.create_email_verification_token(
tenant_id=tenant_id,
recipe_user_id=login_method.recipe_user_id,
email=account_info.email,
user_context=user_context,
)
if isinstance(token_response, CreateEmailVerificationTokenOkResult):
await ev_instance.recipe_implementation.verify_email_using_token(
tenant_id=tenant_id,
token=token_response.token,
attempt_account_linking=False,
user_context=user_context,
)
factor_id = (
FactorIds.OTP_EMAIL
if device_info.email and user_input_code
else (
FactorIds.LINK_EMAIL
if device_info.email
else (FactorIds.OTP_PHONE if user_input_code else FactorIds.LINK_PHONE)
)
)
is_sign_up = authenticating_user is None
pre_auth_checks_result = await pre_auth_checks(
authenticating_account_info=AccountInfoWithRecipeId(
recipe_id="passwordless",
email=device_info.email,
phone_number=device_info.phone_number,
),
factor_ids=[factor_id],
authenticating_user=(
authenticating_user.user if authenticating_user else None
),
is_sign_up=is_sign_up,
is_verified=(
authenticating_user.login_method.verified
if authenticating_user and authenticating_user.login_method
else True
),
sign_in_verifies_login_method=True,
skip_session_user_update_in_core=False,
tenant_id=tenant_id,
user_context=user_context,
session=session,
should_try_linking_with_session_user=should_try_linking_with_session_user,
)
if not isinstance(pre_auth_checks_result, OkResponse):
if isinstance(pre_auth_checks_result, SignUpNotAllowedResponse):
reason = error_code_map["SIGN_UP_NOT_ALLOWED"]
assert isinstance(reason, str)
return SignInUpPostNotAllowedResponse(reason)
if isinstance(pre_auth_checks_result, SignInNotAllowedResponse):
reason = error_code_map["SIGN_IN_NOT_ALLOWED"]
assert isinstance(reason, str)
return SignInUpPostNotAllowedResponse(reason)
reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
assert isinstance(reason_dict, Dict)
reason = reason_dict[pre_auth_checks_result.reason]
return SignInUpPostNotAllowedResponse(reason=reason)
if check_credentials_response is not None:
if not isinstance(check_credentials_response, CheckCodeOkResult):
return check_credentials_response
response = await api_options.recipe_implementation.consume_code(
pre_auth_session_id=pre_auth_session_id,
device_id=device_id,
user_input_code=user_input_code,
link_code=link_code,
session=session,
tenant_id=tenant_id,
user_context=user_context,
should_try_linking_with_session_user=should_try_linking_with_session_user,
)
if isinstance(response, ConsumeCodeRestartFlowError):
return ConsumeCodePostRestartFlowError()
if isinstance(response, ConsumeCodeIncorrectUserInputCodeError):
return ConsumeCodePostIncorrectUserInputCodeError(
response.failed_code_input_attempt_count,
response.maximum_code_input_attempts,
)
if isinstance(response, ConsumeCodeExpiredUserInputCodeError):
return ConsumeCodePostExpiredUserInputCodeError(
response.failed_code_input_attempt_count,
response.maximum_code_input_attempts,
)
if not isinstance(response, ConsumeCodeOkResult):
reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
assert isinstance(reason_dict, Dict)
reason = reason_dict[response.reason]
return SignInUpPostNotAllowedResponse(reason=reason)
authenticating_user_input: User
if response.user:
authenticating_user_input = response.user
elif authenticating_user:
authenticating_user_input = authenticating_user.user
else:
raise Exception("Should never come here")
recipe_user_id_input: RecipeUserId
if response.recipe_user_id:
recipe_user_id_input = response.recipe_user_id
elif authenticating_user:
assert authenticating_user.login_method is not None
recipe_user_id_input = authenticating_user.login_method.recipe_user_id
else:
raise Exception("Should never come here")
post_auth_checks_result = await post_auth_checks(
factor_id=factor_id,
is_sign_up=is_sign_up,
authenticated_user=authenticating_user_input,
recipe_user_id=recipe_user_id_input,
tenant_id=tenant_id,
user_context=user_context,
session=session,
request=api_options.request,
)
if not isinstance(post_auth_checks_result, PostAuthChecksOkResponse):
reason = error_code_map["SIGN_IN_NOT_ALLOWED"]
assert isinstance(reason, str)
return SignInUpPostNotAllowedResponse(reason)
return ConsumeCodePostOkResult(
created_new_recipe_user=response.created_new_recipe_user,
user=post_auth_checks_result.user,
session=post_auth_checks_result.session,
)
async def email_exists_get(
self,
email: str,
tenant_id: str,
api_options: APIOptions,
user_context: Dict[str, Any],
) -> Union[EmailExistsGetOkResult, GeneralErrorResponse]:
users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info(
tenant_id=tenant_id,
account_info=AccountInfo(email=email),
do_union_of_account_info=False,
user_context=user_context,
)
user_exists = any(
any(
lm.recipe_id == "passwordless" and lm.has_same_email_as(email)
for lm in u.login_methods
)
for u in users
)
return EmailExistsGetOkResult(exists=user_exists)
async def phone_number_exists_get(
self,
phone_number: str,
tenant_id: str,
api_options: APIOptions,
user_context: Dict[str, Any],
) -> Union[PhoneNumberExistsGetOkResult, GeneralErrorResponse]:
users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info(
tenant_id=tenant_id,
account_info=AccountInfo(phone_number=phone_number),
do_union_of_account_info=False,
user_context=user_context,
)
return PhoneNumberExistsGetOkResult(exists=len(users) > 0)
Functions
async def get_passwordless_user_by_account_info(tenant_id: str, user_context: Dict[str, Any], account_info: AccountInfo) ‑> Optional[PasswordlessUserResult]
Classes
class APIImplementation
-
Expand source code
class APIImplementation(APIInterface): async def create_code_post( self, email: Union[str, None], phone_number: Union[str, None], session: Optional[SessionContainer], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[ CreateCodePostOkResult, SignInUpPostNotAllowedResponse, GeneralErrorResponse ]: error_code_map = { "SIGN_UP_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_002)", "LINKING_TO_SESSION_USER_FAILED": { "SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_019)", }, } account_info = AccountInfo( email=email, phone_number=phone_number, ) user_with_matching_login_method = await get_passwordless_user_by_account_info( tenant_id, user_context, account_info ) factor_ids = [] if session is not None: factor_ids = [ FactorIds.OTP_EMAIL if email is not None else FactorIds.OTP_PHONE ] else: factor_ids = get_enabled_pwless_factors(api_options.config) if email is not None: factor_ids = [ f for f in factor_ids if f in [FactorIds.OTP_EMAIL, FactorIds.LINK_EMAIL] ] else: factor_ids = [ f for f in factor_ids if f in [FactorIds.OTP_PHONE, FactorIds.LINK_PHONE] ] is_verified_input = True if user_with_matching_login_method is not None: assert user_with_matching_login_method.login_method is not None is_verified_input = user_with_matching_login_method.login_method.verified pre_auth_checks_result = await pre_auth_checks( authenticating_account_info=AccountInfoWithRecipeId( recipe_id="passwordless", email=account_info.email, phone_number=account_info.phone_number, ), is_sign_up=user_with_matching_login_method is None, authenticating_user=( user_with_matching_login_method.user if user_with_matching_login_method else None ), is_verified=is_verified_input, sign_in_verifies_login_method=True, skip_session_user_update_in_core=True, tenant_id=tenant_id, factor_ids=factor_ids, user_context=user_context, session=session, should_try_linking_with_session_user=should_try_linking_with_session_user, ) if not isinstance(pre_auth_checks_result, OkResponse): if isinstance(pre_auth_checks_result, SignUpNotAllowedResponse): reason = error_code_map["SIGN_UP_NOT_ALLOWED"] assert isinstance(reason, str) return SignInUpPostNotAllowedResponse(reason) if isinstance(pre_auth_checks_result, SignInNotAllowedResponse): raise Exception("Should never come here") reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"] assert isinstance(reason_dict, Dict) reason = reason_dict[pre_auth_checks_result.reason] return SignInUpPostNotAllowedResponse(reason=reason) user_input_code = None if api_options.config.get_custom_user_input_code is not None: user_input_code = await api_options.config.get_custom_user_input_code( tenant_id, user_context ) user_input_code_input = user_input_code if api_options.config.get_custom_user_input_code is not None: user_input_code_input = await api_options.config.get_custom_user_input_code( tenant_id, user_context ) response = await api_options.recipe_implementation.create_code( email=account_info.email, phone_number=account_info.phone_number, user_input_code=user_input_code_input, tenant_id=tenant_id, user_context=user_context, session=session, should_try_linking_with_session_user=should_try_linking_with_session_user, ) magic_link = None user_input_code = None flow_type = api_options.config.flow_type if all( _id.startswith("link") for _id in pre_auth_checks_result.valid_factor_ids ): flow_type = "MAGIC_LINK" elif all( _id.startswith("otp") for _id in pre_auth_checks_result.valid_factor_ids ): flow_type = "USER_INPUT_CODE" else: flow_type = "USER_INPUT_CODE_AND_MAGIC_LINK" if flow_type in ("MAGIC_LINK", "USER_INPUT_CODE_AND_MAGIC_LINK"): magic_link = ( api_options.app_info.get_origin( api_options.request, user_context ).get_as_string_dangerous() + api_options.app_info.website_base_path.get_as_string_dangerous() + "/verify" + "?preAuthSessionId=" + response.pre_auth_session_id + "&tenantId=" + tenant_id + "#" + response.link_code ) if flow_type in ("USER_INPUT_CODE", "USER_INPUT_CODE_AND_MAGIC_LINK"): user_input_code = response.user_input_code if isinstance(api_options.config.contact_config, ContactEmailOnlyConfig) or ( isinstance(api_options.config.contact_config, ContactEmailOrPhoneConfig) and email is not None ): if email is None: raise Exception("Should never come here") log_debug_message("Sending passwordless login email to %s", email) passwordless_email_delivery_input = PasswordlessLoginEmailTemplateVars( email=email, user_input_code=user_input_code, url_with_link_code=magic_link, code_life_time=response.code_life_time, pre_auth_session_id=response.pre_auth_session_id, tenant_id=tenant_id, is_first_factor=pre_auth_checks_result.is_first_factor, ) await api_options.email_delivery.ingredient_interface_impl.send_email( passwordless_email_delivery_input, user_context ) elif isinstance( api_options.config.contact_config, (ContactEmailOrPhoneConfig, ContactPhoneOnlyConfig), ): if phone_number is None: raise Exception("Should never come here") log_debug_message("Sending passwordless login SMS to %s", phone_number) sms_input = PasswordlessLoginSMSTemplateVars( phone_number=phone_number, user_input_code=user_input_code, url_with_link_code=magic_link, code_life_time=response.code_life_time, pre_auth_session_id=response.pre_auth_session_id, tenant_id=tenant_id, is_first_factor=pre_auth_checks_result.is_first_factor, ) await api_options.sms_delivery.ingredient_interface_impl.send_sms( sms_input, user_context ) return CreateCodePostOkResult( response.device_id, response.pre_auth_session_id, flow_type ) async def resend_code_post( self, device_id: str, pre_auth_session_id: str, session: Optional[SessionContainer], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[ ResendCodePostOkResult, ResendCodePostRestartFlowError, GeneralErrorResponse ]: device_info = await api_options.recipe_implementation.list_codes_by_device_id( device_id=device_id, tenant_id=tenant_id, user_context=user_context ) if device_info is None: return ResendCodePostRestartFlowError() if ( api_options.config.contact_config.contact_method == "PHONE" and device_info.phone_number is None ) or ( api_options.config.contact_config.contact_method == "EMAIL" and device_info.email is None ): return ResendCodePostRestartFlowError() user_with_matching_login_method = await get_passwordless_user_by_account_info( tenant_id=tenant_id, user_context=user_context, account_info=AccountInfo( email=device_info.email, phone_number=device_info.phone_number, ), ) auth_type_info = await check_auth_type_and_linking_status( session=session, account_info=AccountInfoWithRecipeId( recipe_id="passwordless", email=device_info.email, phone_number=device_info.phone_number, ), input_user=( user_with_matching_login_method.user if user_with_matching_login_method else None ), skip_session_user_update_in_core=True, user_context=user_context, should_try_linking_with_session_user=should_try_linking_with_session_user, ) if auth_type_info.status == "LINKING_TO_SESSION_USER_FAILED": return ResendCodePostRestartFlowError() number_of_tries_to_create_new_code = 0 while True: number_of_tries_to_create_new_code += 1 user_input_code = None if api_options.config.get_custom_user_input_code is not None: user_input_code = await api_options.config.get_custom_user_input_code( tenant_id, user_context ) user_input_code_input = user_input_code if api_options.config.get_custom_user_input_code is not None: user_input_code_input = ( await api_options.config.get_custom_user_input_code( tenant_id, user_context ) ) response = ( await api_options.recipe_implementation.create_new_code_for_device( device_id=device_id, user_input_code=user_input_code_input, tenant_id=tenant_id, user_context=user_context, ) ) if isinstance( response, CreateNewCodeForDeviceUserInputCodeAlreadyUsedError ): if number_of_tries_to_create_new_code >= 3: return GeneralErrorResponse( "Failed to generate a one time code. Please try again" ) continue if isinstance(response, CreateNewCodeForDeviceOkResult): magic_link = None user_input_code = None factor_ids = [] if session is not None: factor_ids = [ ( FactorIds.OTP_EMAIL if device_info.email is not None else FactorIds.OTP_PHONE ) ] else: factor_ids = get_enabled_pwless_factors(api_options.config) factor_ids = await filter_out_invalid_first_factors_or_throw_if_all_are_invalid( factor_ids, tenant_id, False, user_context ) flow_type = api_options.config.flow_type if all(id.startswith("link") for id in factor_ids): flow_type = "MAGIC_LINK" elif all(id.startswith("otp") for id in factor_ids): flow_type = "USER_INPUT_CODE" else: flow_type = "USER_INPUT_CODE_AND_MAGIC_LINK" if flow_type in ("MAGIC_LINK", "USER_INPUT_CODE_AND_MAGIC_LINK"): magic_link = ( api_options.app_info.get_origin( api_options.request, user_context ).get_as_string_dangerous() + api_options.app_info.website_base_path.get_as_string_dangerous() + "/verify" + "?preAuthSessionId=" + response.pre_auth_session_id + "&tenantId=" + tenant_id + "#" + response.link_code ) if flow_type in ("USER_INPUT_CODE", "USER_INPUT_CODE_AND_MAGIC_LINK"): user_input_code = response.user_input_code if api_options.config.contact_config.contact_method == "PHONE" or ( api_options.config.contact_config.contact_method == "EMAIL_OR_PHONE" and device_info.phone_number is not None ): log_debug_message( "Sending passwordless login SMS to %s", device_info.phone_number ) assert device_info.phone_number is not None sms_input = PasswordlessLoginSMSTemplateVars( phone_number=device_info.phone_number, user_input_code=user_input_code, url_with_link_code=magic_link, code_life_time=response.code_life_time, pre_auth_session_id=response.pre_auth_session_id, tenant_id=tenant_id, is_first_factor=auth_type_info.is_first_factor, ) await api_options.sms_delivery.ingredient_interface_impl.send_sms( sms_input, user_context ) else: log_debug_message( "Sending passwordless login email to %s", device_info.email ) assert device_info.email is not None passwordless_email_delivery_input = ( PasswordlessLoginEmailTemplateVars( email=device_info.email, user_input_code=user_input_code, url_with_link_code=magic_link, code_life_time=response.code_life_time, pre_auth_session_id=response.pre_auth_session_id, tenant_id=tenant_id, is_first_factor=auth_type_info.is_first_factor, ) ) await api_options.email_delivery.ingredient_interface_impl.send_email( passwordless_email_delivery_input, user_context ) return ResendCodePostOkResult() return ResendCodePostRestartFlowError() async def consume_code_post( self, pre_auth_session_id: str, user_input_code: Union[str, None], device_id: Union[str, None], link_code: Union[str, None], session: Optional[SessionContainer], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[ ConsumeCodePostOkResult, ConsumeCodePostRestartFlowError, GeneralErrorResponse, ConsumeCodePostIncorrectUserInputCodeError, ConsumeCodePostExpiredUserInputCodeError, SignInUpPostNotAllowedResponse, ]: error_code_map = { "SIGN_UP_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_002)", "SIGN_IN_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_003)", "LINKING_TO_SESSION_USER_FAILED": { "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_017)", "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_018)", "SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_019)", }, } device_info = ( await api_options.recipe_implementation.list_codes_by_pre_auth_session_id( tenant_id=tenant_id, pre_auth_session_id=pre_auth_session_id, user_context=user_context, ) ) if not device_info: return ConsumeCodePostRestartFlowError() recipe_id = "passwordless" account_info = AccountInfo( phone_number=device_info.phone_number, email=device_info.email ) check_credentials_response: Optional[ Union[ CheckCodeOkResult, CheckCodeIncorrectUserInputCodeError, CheckCodeExpiredUserInputCodeError, CheckCodeRestartFlowError, ] ] = None async def check_credentials(_: str): nonlocal check_credentials_response if check_credentials_response is None: check_credentials_response = ( await api_options.recipe_implementation.check_code( pre_auth_session_id=pre_auth_session_id, device_id=device_id, user_input_code=user_input_code, link_code=link_code, tenant_id=tenant_id, user_context=user_context, ) ) return isinstance(check_credentials_response, CheckCodeOkResult) check_credentials_response = None authenticating_user = ( await get_authenticating_user_and_add_to_current_tenant_if_required( email=account_info.email, phone_number=account_info.phone_number, third_party=None, recipe_id=recipe_id, user_context=user_context, session=session, tenant_id=tenant_id, check_credentials_on_tenant=check_credentials, ) ) ev_instance = EmailVerificationRecipe.get_instance_optional() if account_info.email and session and ev_instance: session_user = await get_user(session.get_user_id(), user_context) if session_user is None: raise UnauthorisedError( "Session user not found", ) login_method = next( ( lm for lm in session_user.login_methods if lm.recipe_user_id.get_as_string() == session.get_recipe_user_id().get_as_string() ), None, ) if login_method is None: raise UnauthorisedError( "Session user and session recipeUserId is inconsistent", ) if ( login_method.has_same_email_as(account_info.email) and not login_method.verified ): if await check_credentials(tenant_id): token_response = await ev_instance.recipe_implementation.create_email_verification_token( tenant_id=tenant_id, recipe_user_id=login_method.recipe_user_id, email=account_info.email, user_context=user_context, ) if isinstance(token_response, CreateEmailVerificationTokenOkResult): await ev_instance.recipe_implementation.verify_email_using_token( tenant_id=tenant_id, token=token_response.token, attempt_account_linking=False, user_context=user_context, ) factor_id = ( FactorIds.OTP_EMAIL if device_info.email and user_input_code else ( FactorIds.LINK_EMAIL if device_info.email else (FactorIds.OTP_PHONE if user_input_code else FactorIds.LINK_PHONE) ) ) is_sign_up = authenticating_user is None pre_auth_checks_result = await pre_auth_checks( authenticating_account_info=AccountInfoWithRecipeId( recipe_id="passwordless", email=device_info.email, phone_number=device_info.phone_number, ), factor_ids=[factor_id], authenticating_user=( authenticating_user.user if authenticating_user else None ), is_sign_up=is_sign_up, is_verified=( authenticating_user.login_method.verified if authenticating_user and authenticating_user.login_method else True ), sign_in_verifies_login_method=True, skip_session_user_update_in_core=False, tenant_id=tenant_id, user_context=user_context, session=session, should_try_linking_with_session_user=should_try_linking_with_session_user, ) if not isinstance(pre_auth_checks_result, OkResponse): if isinstance(pre_auth_checks_result, SignUpNotAllowedResponse): reason = error_code_map["SIGN_UP_NOT_ALLOWED"] assert isinstance(reason, str) return SignInUpPostNotAllowedResponse(reason) if isinstance(pre_auth_checks_result, SignInNotAllowedResponse): reason = error_code_map["SIGN_IN_NOT_ALLOWED"] assert isinstance(reason, str) return SignInUpPostNotAllowedResponse(reason) reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"] assert isinstance(reason_dict, Dict) reason = reason_dict[pre_auth_checks_result.reason] return SignInUpPostNotAllowedResponse(reason=reason) if check_credentials_response is not None: if not isinstance(check_credentials_response, CheckCodeOkResult): return check_credentials_response response = await api_options.recipe_implementation.consume_code( pre_auth_session_id=pre_auth_session_id, device_id=device_id, user_input_code=user_input_code, link_code=link_code, session=session, tenant_id=tenant_id, user_context=user_context, should_try_linking_with_session_user=should_try_linking_with_session_user, ) if isinstance(response, ConsumeCodeRestartFlowError): return ConsumeCodePostRestartFlowError() if isinstance(response, ConsumeCodeIncorrectUserInputCodeError): return ConsumeCodePostIncorrectUserInputCodeError( response.failed_code_input_attempt_count, response.maximum_code_input_attempts, ) if isinstance(response, ConsumeCodeExpiredUserInputCodeError): return ConsumeCodePostExpiredUserInputCodeError( response.failed_code_input_attempt_count, response.maximum_code_input_attempts, ) if not isinstance(response, ConsumeCodeOkResult): reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"] assert isinstance(reason_dict, Dict) reason = reason_dict[response.reason] return SignInUpPostNotAllowedResponse(reason=reason) authenticating_user_input: User if response.user: authenticating_user_input = response.user elif authenticating_user: authenticating_user_input = authenticating_user.user else: raise Exception("Should never come here") recipe_user_id_input: RecipeUserId if response.recipe_user_id: recipe_user_id_input = response.recipe_user_id elif authenticating_user: assert authenticating_user.login_method is not None recipe_user_id_input = authenticating_user.login_method.recipe_user_id else: raise Exception("Should never come here") post_auth_checks_result = await post_auth_checks( factor_id=factor_id, is_sign_up=is_sign_up, authenticated_user=authenticating_user_input, recipe_user_id=recipe_user_id_input, tenant_id=tenant_id, user_context=user_context, session=session, request=api_options.request, ) if not isinstance(post_auth_checks_result, PostAuthChecksOkResponse): reason = error_code_map["SIGN_IN_NOT_ALLOWED"] assert isinstance(reason, str) return SignInUpPostNotAllowedResponse(reason) return ConsumeCodePostOkResult( created_new_recipe_user=response.created_new_recipe_user, user=post_auth_checks_result.user, session=post_auth_checks_result.session, ) async def email_exists_get( self, email: str, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[EmailExistsGetOkResult, GeneralErrorResponse]: users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info( tenant_id=tenant_id, account_info=AccountInfo(email=email), do_union_of_account_info=False, user_context=user_context, ) user_exists = any( any( lm.recipe_id == "passwordless" and lm.has_same_email_as(email) for lm in u.login_methods ) for u in users ) return EmailExistsGetOkResult(exists=user_exists) async def phone_number_exists_get( self, phone_number: str, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[PhoneNumberExistsGetOkResult, GeneralErrorResponse]: users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info( tenant_id=tenant_id, account_info=AccountInfo(phone_number=phone_number), do_union_of_account_info=False, user_context=user_context, ) return PhoneNumberExistsGetOkResult(exists=len(users) > 0)
Ancestors
Methods
async def consume_code_post(self, pre_auth_session_id: str, user_input_code: Optional[str], device_id: Optional[str], link_code: Optional[str], session: Optional[SessionContainer], should_try_linking_with_session_user: Optional[bool], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[ConsumeCodePostOkResult, ConsumeCodePostRestartFlowError, GeneralErrorResponse, ConsumeCodePostIncorrectUserInputCodeError, ConsumeCodePostExpiredUserInputCodeError, SignInUpPostNotAllowedResponse]
async def create_code_post(self, email: Optional[str], phone_number: Optional[str], session: Optional[SessionContainer], should_try_linking_with_session_user: Optional[bool], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[CreateCodePostOkResult, SignInUpPostNotAllowedResponse, GeneralErrorResponse]
async def email_exists_get(self, email: str, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[EmailExistsGetOkResult, GeneralErrorResponse]
async def phone_number_exists_get(self, phone_number: str, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[PhoneNumberExistsGetOkResult, GeneralErrorResponse]
async def resend_code_post(self, device_id: str, pre_auth_session_id: str, session: Optional[SessionContainer], should_try_linking_with_session_user: Optional[bool], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[ResendCodePostOkResult, ResendCodePostRestartFlowError, GeneralErrorResponse]
class PasswordlessUserResult (user: User, login_method: Optional[LoginMethod])
-
Expand source code
class PasswordlessUserResult: user: User login_method: Union[LoginMethod, None] def __init__(self, user: User, login_method: Union[LoginMethod, None]): self.user = user self.login_method = login_method
Class variables
var login_method : Optional[LoginMethod]
var user : User