Module supertokens_python.recipe.passwordless.api.implementation

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from typing import Any, Dict, Optional, Union
from supertokens_python.asyncio import get_user
from supertokens_python.auth_utils import (
    OkResponse,
    PostAuthChecksOkResponse,
    SignInNotAllowedResponse,
    SignUpNotAllowedResponse,
    check_auth_type_and_linking_status,
    filter_out_invalid_first_factors_or_throw_if_all_are_invalid,
    get_authenticating_user_and_add_to_current_tenant_if_required,
    post_auth_checks,
    pre_auth_checks,
)

from supertokens_python.logger import log_debug_message
from supertokens_python.recipe.accountlinking.recipe import AccountLinkingRecipe
from supertokens_python.recipe.accountlinking.types import AccountInfoWithRecipeId
from supertokens_python.recipe.multifactorauth.types import FactorIds
from supertokens_python.recipe.passwordless.interfaces import (
    APIInterface,
    APIOptions,
    CheckCodeOkResult,
    ConsumeCodeExpiredUserInputCodeError,
    ConsumeCodeIncorrectUserInputCodeError,
    ConsumeCodeOkResult,
    ConsumeCodePostExpiredUserInputCodeError,
    ConsumeCodePostIncorrectUserInputCodeError,
    ConsumeCodePostOkResult,
    ConsumeCodePostRestartFlowError,
    ConsumeCodeRestartFlowError,
    CreateCodePostOkResult,
    CreateNewCodeForDeviceOkResult,
    CreateNewCodeForDeviceUserInputCodeAlreadyUsedError,
    EmailExistsGetOkResult,
    PasswordlessLoginEmailTemplateVars,
    PhoneNumberExistsGetOkResult,
    ResendCodePostOkResult,
    ResendCodePostRestartFlowError,
    SignInUpPostNotAllowedResponse,
)
from supertokens_python.recipe.passwordless.types import (
    PasswordlessLoginSMSTemplateVars,
)
from supertokens_python.recipe.passwordless.utils import (
    ContactEmailOnlyConfig,
    ContactEmailOrPhoneConfig,
    ContactPhoneOnlyConfig,
    get_enabled_pwless_factors,
)
from supertokens_python.recipe.session import SessionContainer
from supertokens_python.recipe.session.exceptions import UnauthorisedError
from supertokens_python.types import (
    AccountInfo,
    User,
    GeneralErrorResponse,
    LoginMethod,
    RecipeUserId,
)
from ...emailverification import EmailVerificationRecipe
from ...emailverification.interfaces import CreateEmailVerificationTokenOkResult


class PasswordlessUserResult:
    user: User
    login_method: Union[LoginMethod, None]

    def __init__(self, user: User, login_method: Union[LoginMethod, None]):
        self.user = user
        self.login_method = login_method


async def get_passwordless_user_by_account_info(
    tenant_id: str,
    user_context: Dict[str, Any],
    account_info: AccountInfo,
) -> Optional[PasswordlessUserResult]:
    existing_users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info(
        tenant_id=tenant_id,
        account_info=account_info,
        do_union_of_account_info=False,
        user_context=user_context,
    )
    log_debug_message(
        f"get_passwordless_user_by_account_info got {len(existing_users)} from core resp {account_info}"
    )

    users_with_matching_login_methods = [
        PasswordlessUserResult(
            user=user,
            login_method=next(
                (
                    lm
                    for lm in user.login_methods
                    if lm.recipe_id == "passwordless"
                    and (
                        lm.has_same_email_as(account_info.email)
                        or lm.has_same_phone_number_as(account_info.phone_number)
                    )
                ),
                None,
            ),
        )
        for user in existing_users
    ]
    users_with_matching_login_methods = [
        user_data
        for user_data in users_with_matching_login_methods
        if user_data.login_method is not None
    ]

    log_debug_message(
        f"get_passwordless_user_by_account_info {len(users_with_matching_login_methods)} has matching login methods"
    )

    if len(users_with_matching_login_methods) > 1:
        raise Exception(
            "This should never happen: multiple users exist matching the accountInfo in passwordless createCode"
        )

    if len(users_with_matching_login_methods) == 0:
        return None

    return users_with_matching_login_methods[0]


class APIImplementation(APIInterface):
    async def create_code_post(
        self,
        email: Union[str, None],
        phone_number: Union[str, None],
        session: Optional[SessionContainer],
        should_try_linking_with_session_user: Union[bool, None],
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[
        CreateCodePostOkResult, SignInUpPostNotAllowedResponse, GeneralErrorResponse
    ]:
        error_code_map = {
            "SIGN_UP_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_002)",
            "LINKING_TO_SESSION_USER_FAILED": {
                "SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_019)",
            },
        }

        account_info = AccountInfo(
            email=email,
            phone_number=phone_number,
        )

        user_with_matching_login_method = await get_passwordless_user_by_account_info(
            tenant_id, user_context, account_info
        )

        factor_ids = []
        if session is not None:
            factor_ids = [
                FactorIds.OTP_EMAIL if email is not None else FactorIds.OTP_PHONE
            ]
        else:
            factor_ids = get_enabled_pwless_factors(api_options.config)
            if email is not None:
                factor_ids = [
                    f
                    for f in factor_ids
                    if f in [FactorIds.OTP_EMAIL, FactorIds.LINK_EMAIL]
                ]
            else:
                factor_ids = [
                    f
                    for f in factor_ids
                    if f in [FactorIds.OTP_PHONE, FactorIds.LINK_PHONE]
                ]

        is_verified_input = True
        if user_with_matching_login_method is not None:
            assert user_with_matching_login_method.login_method is not None
            is_verified_input = user_with_matching_login_method.login_method.verified

        pre_auth_checks_result = await pre_auth_checks(
            authenticating_account_info=AccountInfoWithRecipeId(
                recipe_id="passwordless",
                email=account_info.email,
                phone_number=account_info.phone_number,
            ),
            is_sign_up=user_with_matching_login_method is None,
            authenticating_user=(
                user_with_matching_login_method.user
                if user_with_matching_login_method
                else None
            ),
            is_verified=is_verified_input,
            sign_in_verifies_login_method=True,
            skip_session_user_update_in_core=True,
            tenant_id=tenant_id,
            factor_ids=factor_ids,
            user_context=user_context,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
        )

        if not isinstance(pre_auth_checks_result, OkResponse):
            if isinstance(pre_auth_checks_result, SignUpNotAllowedResponse):
                reason = error_code_map["SIGN_UP_NOT_ALLOWED"]
                assert isinstance(reason, str)
                return SignInUpPostNotAllowedResponse(reason)
            if isinstance(pre_auth_checks_result, SignInNotAllowedResponse):
                raise Exception("Should never come here")

            reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
            assert isinstance(reason_dict, Dict)
            reason = reason_dict[pre_auth_checks_result.reason]
            return SignInUpPostNotAllowedResponse(reason=reason)

        user_input_code = None
        if api_options.config.get_custom_user_input_code is not None:
            user_input_code = await api_options.config.get_custom_user_input_code(
                tenant_id, user_context
            )

        user_input_code_input = user_input_code
        if api_options.config.get_custom_user_input_code is not None:
            user_input_code_input = await api_options.config.get_custom_user_input_code(
                tenant_id, user_context
            )
        response = await api_options.recipe_implementation.create_code(
            email=account_info.email,
            phone_number=account_info.phone_number,
            user_input_code=user_input_code_input,
            tenant_id=tenant_id,
            user_context=user_context,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
        )

        magic_link = None
        user_input_code = None
        flow_type = api_options.config.flow_type

        if all(
            _id.startswith("link") for _id in pre_auth_checks_result.valid_factor_ids
        ):
            flow_type = "MAGIC_LINK"
        elif all(
            _id.startswith("otp") for _id in pre_auth_checks_result.valid_factor_ids
        ):
            flow_type = "USER_INPUT_CODE"
        else:
            flow_type = "USER_INPUT_CODE_AND_MAGIC_LINK"

        if flow_type in ("MAGIC_LINK", "USER_INPUT_CODE_AND_MAGIC_LINK"):
            magic_link = (
                api_options.app_info.get_origin(
                    api_options.request, user_context
                ).get_as_string_dangerous()
                + api_options.app_info.website_base_path.get_as_string_dangerous()
                + "/verify"
                + "?preAuthSessionId="
                + response.pre_auth_session_id
                + "&tenantId="
                + tenant_id
                + "#"
                + response.link_code
            )
        if flow_type in ("USER_INPUT_CODE", "USER_INPUT_CODE_AND_MAGIC_LINK"):
            user_input_code = response.user_input_code

        if isinstance(api_options.config.contact_config, ContactEmailOnlyConfig) or (
            isinstance(api_options.config.contact_config, ContactEmailOrPhoneConfig)
            and email is not None
        ):
            if email is None:
                raise Exception("Should never come here")

            log_debug_message("Sending passwordless login email to %s", email)
            passwordless_email_delivery_input = PasswordlessLoginEmailTemplateVars(
                email=email,
                user_input_code=user_input_code,
                url_with_link_code=magic_link,
                code_life_time=response.code_life_time,
                pre_auth_session_id=response.pre_auth_session_id,
                tenant_id=tenant_id,
                is_first_factor=pre_auth_checks_result.is_first_factor,
            )
            await api_options.email_delivery.ingredient_interface_impl.send_email(
                passwordless_email_delivery_input, user_context
            )
        elif isinstance(
            api_options.config.contact_config,
            (ContactEmailOrPhoneConfig, ContactPhoneOnlyConfig),
        ):
            if phone_number is None:
                raise Exception("Should never come here")
            log_debug_message("Sending passwordless login SMS to %s", phone_number)
            sms_input = PasswordlessLoginSMSTemplateVars(
                phone_number=phone_number,
                user_input_code=user_input_code,
                url_with_link_code=magic_link,
                code_life_time=response.code_life_time,
                pre_auth_session_id=response.pre_auth_session_id,
                tenant_id=tenant_id,
                is_first_factor=pre_auth_checks_result.is_first_factor,
            )
            await api_options.sms_delivery.ingredient_interface_impl.send_sms(
                sms_input, user_context
            )

        return CreateCodePostOkResult(
            response.device_id, response.pre_auth_session_id, flow_type
        )

    async def resend_code_post(
        self,
        device_id: str,
        pre_auth_session_id: str,
        session: Optional[SessionContainer],
        should_try_linking_with_session_user: Union[bool, None],
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[
        ResendCodePostOkResult, ResendCodePostRestartFlowError, GeneralErrorResponse
    ]:
        device_info = await api_options.recipe_implementation.list_codes_by_device_id(
            device_id=device_id, tenant_id=tenant_id, user_context=user_context
        )

        if device_info is None:
            return ResendCodePostRestartFlowError()

        if (
            api_options.config.contact_config.contact_method == "PHONE"
            and device_info.phone_number is None
        ) or (
            api_options.config.contact_config.contact_method == "EMAIL"
            and device_info.email is None
        ):
            return ResendCodePostRestartFlowError()

        user_with_matching_login_method = await get_passwordless_user_by_account_info(
            tenant_id=tenant_id,
            user_context=user_context,
            account_info=AccountInfo(
                email=device_info.email,
                phone_number=device_info.phone_number,
            ),
        )

        auth_type_info = await check_auth_type_and_linking_status(
            session=session,
            account_info=AccountInfoWithRecipeId(
                recipe_id="passwordless",
                email=device_info.email,
                phone_number=device_info.phone_number,
            ),
            input_user=(
                user_with_matching_login_method.user
                if user_with_matching_login_method
                else None
            ),
            skip_session_user_update_in_core=True,
            user_context=user_context,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
        )

        if auth_type_info.status == "LINKING_TO_SESSION_USER_FAILED":
            return ResendCodePostRestartFlowError()

        number_of_tries_to_create_new_code = 0
        while True:
            number_of_tries_to_create_new_code += 1
            user_input_code = None
            if api_options.config.get_custom_user_input_code is not None:
                user_input_code = await api_options.config.get_custom_user_input_code(
                    tenant_id, user_context
                )
            user_input_code_input = user_input_code
            if api_options.config.get_custom_user_input_code is not None:
                user_input_code_input = (
                    await api_options.config.get_custom_user_input_code(
                        tenant_id, user_context
                    )
                )
            response = (
                await api_options.recipe_implementation.create_new_code_for_device(
                    device_id=device_id,
                    user_input_code=user_input_code_input,
                    tenant_id=tenant_id,
                    user_context=user_context,
                )
            )

            if isinstance(
                response, CreateNewCodeForDeviceUserInputCodeAlreadyUsedError
            ):
                if number_of_tries_to_create_new_code >= 3:
                    return GeneralErrorResponse(
                        "Failed to generate a one time code. Please try again"
                    )
                continue

            if isinstance(response, CreateNewCodeForDeviceOkResult):
                magic_link = None
                user_input_code = None

                factor_ids = []
                if session is not None:
                    factor_ids = [
                        (
                            FactorIds.OTP_EMAIL
                            if device_info.email is not None
                            else FactorIds.OTP_PHONE
                        )
                    ]
                else:
                    factor_ids = get_enabled_pwless_factors(api_options.config)
                    factor_ids = await filter_out_invalid_first_factors_or_throw_if_all_are_invalid(
                        factor_ids, tenant_id, False, user_context
                    )

                flow_type = api_options.config.flow_type
                if all(id.startswith("link") for id in factor_ids):
                    flow_type = "MAGIC_LINK"
                elif all(id.startswith("otp") for id in factor_ids):
                    flow_type = "USER_INPUT_CODE"
                else:
                    flow_type = "USER_INPUT_CODE_AND_MAGIC_LINK"

                if flow_type in ("MAGIC_LINK", "USER_INPUT_CODE_AND_MAGIC_LINK"):
                    magic_link = (
                        api_options.app_info.get_origin(
                            api_options.request, user_context
                        ).get_as_string_dangerous()
                        + api_options.app_info.website_base_path.get_as_string_dangerous()
                        + "/verify"
                        + "?preAuthSessionId="
                        + response.pre_auth_session_id
                        + "&tenantId="
                        + tenant_id
                        + "#"
                        + response.link_code
                    )
                if flow_type in ("USER_INPUT_CODE", "USER_INPUT_CODE_AND_MAGIC_LINK"):
                    user_input_code = response.user_input_code

                if api_options.config.contact_config.contact_method == "PHONE" or (
                    api_options.config.contact_config.contact_method == "EMAIL_OR_PHONE"
                    and device_info.phone_number is not None
                ):
                    log_debug_message(
                        "Sending passwordless login SMS to %s", device_info.phone_number
                    )
                    assert device_info.phone_number is not None
                    sms_input = PasswordlessLoginSMSTemplateVars(
                        phone_number=device_info.phone_number,
                        user_input_code=user_input_code,
                        url_with_link_code=magic_link,
                        code_life_time=response.code_life_time,
                        pre_auth_session_id=response.pre_auth_session_id,
                        tenant_id=tenant_id,
                        is_first_factor=auth_type_info.is_first_factor,
                    )
                    await api_options.sms_delivery.ingredient_interface_impl.send_sms(
                        sms_input, user_context
                    )
                else:
                    log_debug_message(
                        "Sending passwordless login email to %s", device_info.email
                    )
                    assert device_info.email is not None
                    passwordless_email_delivery_input = (
                        PasswordlessLoginEmailTemplateVars(
                            email=device_info.email,
                            user_input_code=user_input_code,
                            url_with_link_code=magic_link,
                            code_life_time=response.code_life_time,
                            pre_auth_session_id=response.pre_auth_session_id,
                            tenant_id=tenant_id,
                            is_first_factor=auth_type_info.is_first_factor,
                        )
                    )
                    await api_options.email_delivery.ingredient_interface_impl.send_email(
                        passwordless_email_delivery_input, user_context
                    )

                return ResendCodePostOkResult()

            return ResendCodePostRestartFlowError()

    async def consume_code_post(
        self,
        pre_auth_session_id: str,
        user_input_code: Union[str, None],
        device_id: Union[str, None],
        link_code: Union[str, None],
        session: Optional[SessionContainer],
        should_try_linking_with_session_user: Union[bool, None],
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[
        ConsumeCodePostOkResult,
        ConsumeCodePostRestartFlowError,
        GeneralErrorResponse,
        ConsumeCodePostIncorrectUserInputCodeError,
        ConsumeCodePostExpiredUserInputCodeError,
        SignInUpPostNotAllowedResponse,
    ]:
        error_code_map = {
            "SIGN_UP_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_002)",
            "SIGN_IN_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_003)",
            "LINKING_TO_SESSION_USER_FAILED": {
                "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_017)",
                "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_018)",
                "SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_019)",
            },
        }

        device_info = (
            await api_options.recipe_implementation.list_codes_by_pre_auth_session_id(
                tenant_id=tenant_id,
                pre_auth_session_id=pre_auth_session_id,
                user_context=user_context,
            )
        )

        if not device_info:
            return ConsumeCodePostRestartFlowError()

        recipe_id = "passwordless"
        account_info = AccountInfo(
            phone_number=device_info.phone_number, email=device_info.email
        )

        async def check_credentials(_: str):
            nonlocal check_credentials_response
            if check_credentials_response is None:
                check_credentials_response = (
                    await api_options.recipe_implementation.check_code(
                        pre_auth_session_id=pre_auth_session_id,
                        device_id=device_id,
                        user_input_code=user_input_code,
                        link_code=link_code,
                        tenant_id=tenant_id,
                        user_context=user_context,
                    )
                )
            return isinstance(check_credentials_response, CheckCodeOkResult)

        check_credentials_response = None
        authenticating_user = (
            await get_authenticating_user_and_add_to_current_tenant_if_required(
                email=account_info.email,
                phone_number=account_info.phone_number,
                third_party=None,
                recipe_id=recipe_id,
                user_context=user_context,
                session=session,
                tenant_id=tenant_id,
                check_credentials_on_tenant=check_credentials,
            )
        )

        ev_instance = EmailVerificationRecipe.get_instance_optional()
        if account_info.email and session and ev_instance:
            session_user = await get_user(session.get_user_id(), user_context)
            if session_user is None:
                raise UnauthorisedError(
                    "Session user not found",
                )

            login_method = next(
                (
                    lm
                    for lm in session_user.login_methods
                    if lm.recipe_user_id.get_as_string()
                    == session.get_recipe_user_id().get_as_string()
                ),
                None,
            )
            if login_method is None:
                raise UnauthorisedError(
                    "Session user and session recipeUserId is inconsistent",
                )

            if (
                login_method.has_same_email_as(account_info.email)
                and not login_method.verified
            ):
                if await check_credentials(tenant_id):
                    token_response = await ev_instance.recipe_implementation.create_email_verification_token(
                        tenant_id=tenant_id,
                        recipe_user_id=login_method.recipe_user_id,
                        email=account_info.email,
                        user_context=user_context,
                    )
                    if isinstance(token_response, CreateEmailVerificationTokenOkResult):
                        await ev_instance.recipe_implementation.verify_email_using_token(
                            tenant_id=tenant_id,
                            token=token_response.token,
                            attempt_account_linking=False,
                            user_context=user_context,
                        )

        factor_id = (
            FactorIds.OTP_EMAIL
            if device_info.email and user_input_code
            else (
                FactorIds.LINK_EMAIL
                if device_info.email
                else (FactorIds.OTP_PHONE if user_input_code else FactorIds.LINK_PHONE)
            )
        )

        is_sign_up = authenticating_user is None
        pre_auth_checks_result = await pre_auth_checks(
            authenticating_account_info=AccountInfoWithRecipeId(
                recipe_id="passwordless",
                email=device_info.email,
                phone_number=device_info.phone_number,
            ),
            factor_ids=[factor_id],
            authenticating_user=(
                authenticating_user.user if authenticating_user else None
            ),
            is_sign_up=is_sign_up,
            is_verified=(
                authenticating_user.login_method.verified
                if authenticating_user and authenticating_user.login_method
                else True
            ),
            sign_in_verifies_login_method=True,
            skip_session_user_update_in_core=False,
            tenant_id=tenant_id,
            user_context=user_context,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
        )

        if not isinstance(pre_auth_checks_result, OkResponse):
            if isinstance(pre_auth_checks_result, SignUpNotAllowedResponse):
                reason = error_code_map["SIGN_UP_NOT_ALLOWED"]
                assert isinstance(reason, str)
                return SignInUpPostNotAllowedResponse(reason)
            if isinstance(pre_auth_checks_result, SignInNotAllowedResponse):
                reason = error_code_map["SIGN_IN_NOT_ALLOWED"]
                assert isinstance(reason, str)
                return SignInUpPostNotAllowedResponse(reason)

            reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
            assert isinstance(reason_dict, Dict)
            reason = reason_dict[pre_auth_checks_result.reason]
            return SignInUpPostNotAllowedResponse(reason=reason)

        if check_credentials_response is not None:
            if not isinstance(check_credentials_response, CheckCodeOkResult):
                return check_credentials_response

        response = await api_options.recipe_implementation.consume_code(
            pre_auth_session_id=pre_auth_session_id,
            device_id=device_id,
            user_input_code=user_input_code,
            link_code=link_code,
            session=session,
            tenant_id=tenant_id,
            user_context=user_context,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
        )

        if isinstance(response, ConsumeCodeRestartFlowError):
            return ConsumeCodePostRestartFlowError()
        if isinstance(response, ConsumeCodeIncorrectUserInputCodeError):
            return ConsumeCodePostIncorrectUserInputCodeError(
                response.failed_code_input_attempt_count,
                response.maximum_code_input_attempts,
            )
        if isinstance(response, ConsumeCodeExpiredUserInputCodeError):
            return ConsumeCodePostExpiredUserInputCodeError(
                response.failed_code_input_attempt_count,
                response.maximum_code_input_attempts,
            )
        if not isinstance(response, ConsumeCodeOkResult):
            reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
            assert isinstance(reason_dict, Dict)
            reason = reason_dict[response.reason]
            return SignInUpPostNotAllowedResponse(reason=reason)

        authenticating_user_input: User
        if response.user:
            authenticating_user_input = response.user
        elif authenticating_user:
            authenticating_user_input = authenticating_user.user
        else:
            raise Exception("Should never come here")
        recipe_user_id_input: RecipeUserId
        if response.recipe_user_id:
            recipe_user_id_input = response.recipe_user_id
        elif authenticating_user:
            assert authenticating_user.login_method is not None
            recipe_user_id_input = authenticating_user.login_method.recipe_user_id
        else:
            raise Exception("Should never come here")

        post_auth_checks_result = await post_auth_checks(
            factor_id=factor_id,
            is_sign_up=is_sign_up,
            authenticated_user=authenticating_user_input,
            recipe_user_id=recipe_user_id_input,
            tenant_id=tenant_id,
            user_context=user_context,
            session=session,
            request=api_options.request,
        )

        if not isinstance(post_auth_checks_result, PostAuthChecksOkResponse):
            reason = error_code_map["SIGN_IN_NOT_ALLOWED"]
            assert isinstance(reason, str)
            return SignInUpPostNotAllowedResponse(reason)

        return ConsumeCodePostOkResult(
            created_new_recipe_user=response.created_new_recipe_user,
            user=post_auth_checks_result.user,
            session=post_auth_checks_result.session,
        )

    async def email_exists_get(
        self,
        email: str,
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[EmailExistsGetOkResult, GeneralErrorResponse]:
        users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info(
            tenant_id=tenant_id,
            account_info=AccountInfo(email=email),
            do_union_of_account_info=False,
            user_context=user_context,
        )
        user_exists = any(
            any(
                lm.recipe_id == "passwordless" and lm.has_same_email_as(email)
                for lm in u.login_methods
            )
            for u in users
        )

        return EmailExistsGetOkResult(exists=user_exists)

    async def phone_number_exists_get(
        self,
        phone_number: str,
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[PhoneNumberExistsGetOkResult, GeneralErrorResponse]:
        users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info(
            tenant_id=tenant_id,
            account_info=AccountInfo(phone_number=phone_number),
            do_union_of_account_info=False,
            user_context=user_context,
        )
        return PhoneNumberExistsGetOkResult(exists=len(users) > 0)

Functions

async def get_passwordless_user_by_account_info(tenant_id: str, user_context: Dict[str, Any], account_info: AccountInfo) ‑> Optional[PasswordlessUserResult]
Expand source code
async def get_passwordless_user_by_account_info(
    tenant_id: str,
    user_context: Dict[str, Any],
    account_info: AccountInfo,
) -> Optional[PasswordlessUserResult]:
    existing_users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info(
        tenant_id=tenant_id,
        account_info=account_info,
        do_union_of_account_info=False,
        user_context=user_context,
    )
    log_debug_message(
        f"get_passwordless_user_by_account_info got {len(existing_users)} from core resp {account_info}"
    )

    users_with_matching_login_methods = [
        PasswordlessUserResult(
            user=user,
            login_method=next(
                (
                    lm
                    for lm in user.login_methods
                    if lm.recipe_id == "passwordless"
                    and (
                        lm.has_same_email_as(account_info.email)
                        or lm.has_same_phone_number_as(account_info.phone_number)
                    )
                ),
                None,
            ),
        )
        for user in existing_users
    ]
    users_with_matching_login_methods = [
        user_data
        for user_data in users_with_matching_login_methods
        if user_data.login_method is not None
    ]

    log_debug_message(
        f"get_passwordless_user_by_account_info {len(users_with_matching_login_methods)} has matching login methods"
    )

    if len(users_with_matching_login_methods) > 1:
        raise Exception(
            "This should never happen: multiple users exist matching the accountInfo in passwordless createCode"
        )

    if len(users_with_matching_login_methods) == 0:
        return None

    return users_with_matching_login_methods[0]

Classes

class APIImplementation
Expand source code
class APIImplementation(APIInterface):
    async def create_code_post(
        self,
        email: Union[str, None],
        phone_number: Union[str, None],
        session: Optional[SessionContainer],
        should_try_linking_with_session_user: Union[bool, None],
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[
        CreateCodePostOkResult, SignInUpPostNotAllowedResponse, GeneralErrorResponse
    ]:
        error_code_map = {
            "SIGN_UP_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_002)",
            "LINKING_TO_SESSION_USER_FAILED": {
                "SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_019)",
            },
        }

        account_info = AccountInfo(
            email=email,
            phone_number=phone_number,
        )

        user_with_matching_login_method = await get_passwordless_user_by_account_info(
            tenant_id, user_context, account_info
        )

        factor_ids = []
        if session is not None:
            factor_ids = [
                FactorIds.OTP_EMAIL if email is not None else FactorIds.OTP_PHONE
            ]
        else:
            factor_ids = get_enabled_pwless_factors(api_options.config)
            if email is not None:
                factor_ids = [
                    f
                    for f in factor_ids
                    if f in [FactorIds.OTP_EMAIL, FactorIds.LINK_EMAIL]
                ]
            else:
                factor_ids = [
                    f
                    for f in factor_ids
                    if f in [FactorIds.OTP_PHONE, FactorIds.LINK_PHONE]
                ]

        is_verified_input = True
        if user_with_matching_login_method is not None:
            assert user_with_matching_login_method.login_method is not None
            is_verified_input = user_with_matching_login_method.login_method.verified

        pre_auth_checks_result = await pre_auth_checks(
            authenticating_account_info=AccountInfoWithRecipeId(
                recipe_id="passwordless",
                email=account_info.email,
                phone_number=account_info.phone_number,
            ),
            is_sign_up=user_with_matching_login_method is None,
            authenticating_user=(
                user_with_matching_login_method.user
                if user_with_matching_login_method
                else None
            ),
            is_verified=is_verified_input,
            sign_in_verifies_login_method=True,
            skip_session_user_update_in_core=True,
            tenant_id=tenant_id,
            factor_ids=factor_ids,
            user_context=user_context,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
        )

        if not isinstance(pre_auth_checks_result, OkResponse):
            if isinstance(pre_auth_checks_result, SignUpNotAllowedResponse):
                reason = error_code_map["SIGN_UP_NOT_ALLOWED"]
                assert isinstance(reason, str)
                return SignInUpPostNotAllowedResponse(reason)
            if isinstance(pre_auth_checks_result, SignInNotAllowedResponse):
                raise Exception("Should never come here")

            reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
            assert isinstance(reason_dict, Dict)
            reason = reason_dict[pre_auth_checks_result.reason]
            return SignInUpPostNotAllowedResponse(reason=reason)

        user_input_code = None
        if api_options.config.get_custom_user_input_code is not None:
            user_input_code = await api_options.config.get_custom_user_input_code(
                tenant_id, user_context
            )

        user_input_code_input = user_input_code
        if api_options.config.get_custom_user_input_code is not None:
            user_input_code_input = await api_options.config.get_custom_user_input_code(
                tenant_id, user_context
            )
        response = await api_options.recipe_implementation.create_code(
            email=account_info.email,
            phone_number=account_info.phone_number,
            user_input_code=user_input_code_input,
            tenant_id=tenant_id,
            user_context=user_context,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
        )

        magic_link = None
        user_input_code = None
        flow_type = api_options.config.flow_type

        if all(
            _id.startswith("link") for _id in pre_auth_checks_result.valid_factor_ids
        ):
            flow_type = "MAGIC_LINK"
        elif all(
            _id.startswith("otp") for _id in pre_auth_checks_result.valid_factor_ids
        ):
            flow_type = "USER_INPUT_CODE"
        else:
            flow_type = "USER_INPUT_CODE_AND_MAGIC_LINK"

        if flow_type in ("MAGIC_LINK", "USER_INPUT_CODE_AND_MAGIC_LINK"):
            magic_link = (
                api_options.app_info.get_origin(
                    api_options.request, user_context
                ).get_as_string_dangerous()
                + api_options.app_info.website_base_path.get_as_string_dangerous()
                + "/verify"
                + "?preAuthSessionId="
                + response.pre_auth_session_id
                + "&tenantId="
                + tenant_id
                + "#"
                + response.link_code
            )
        if flow_type in ("USER_INPUT_CODE", "USER_INPUT_CODE_AND_MAGIC_LINK"):
            user_input_code = response.user_input_code

        if isinstance(api_options.config.contact_config, ContactEmailOnlyConfig) or (
            isinstance(api_options.config.contact_config, ContactEmailOrPhoneConfig)
            and email is not None
        ):
            if email is None:
                raise Exception("Should never come here")

            log_debug_message("Sending passwordless login email to %s", email)
            passwordless_email_delivery_input = PasswordlessLoginEmailTemplateVars(
                email=email,
                user_input_code=user_input_code,
                url_with_link_code=magic_link,
                code_life_time=response.code_life_time,
                pre_auth_session_id=response.pre_auth_session_id,
                tenant_id=tenant_id,
                is_first_factor=pre_auth_checks_result.is_first_factor,
            )
            await api_options.email_delivery.ingredient_interface_impl.send_email(
                passwordless_email_delivery_input, user_context
            )
        elif isinstance(
            api_options.config.contact_config,
            (ContactEmailOrPhoneConfig, ContactPhoneOnlyConfig),
        ):
            if phone_number is None:
                raise Exception("Should never come here")
            log_debug_message("Sending passwordless login SMS to %s", phone_number)
            sms_input = PasswordlessLoginSMSTemplateVars(
                phone_number=phone_number,
                user_input_code=user_input_code,
                url_with_link_code=magic_link,
                code_life_time=response.code_life_time,
                pre_auth_session_id=response.pre_auth_session_id,
                tenant_id=tenant_id,
                is_first_factor=pre_auth_checks_result.is_first_factor,
            )
            await api_options.sms_delivery.ingredient_interface_impl.send_sms(
                sms_input, user_context
            )

        return CreateCodePostOkResult(
            response.device_id, response.pre_auth_session_id, flow_type
        )

    async def resend_code_post(
        self,
        device_id: str,
        pre_auth_session_id: str,
        session: Optional[SessionContainer],
        should_try_linking_with_session_user: Union[bool, None],
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[
        ResendCodePostOkResult, ResendCodePostRestartFlowError, GeneralErrorResponse
    ]:
        device_info = await api_options.recipe_implementation.list_codes_by_device_id(
            device_id=device_id, tenant_id=tenant_id, user_context=user_context
        )

        if device_info is None:
            return ResendCodePostRestartFlowError()

        if (
            api_options.config.contact_config.contact_method == "PHONE"
            and device_info.phone_number is None
        ) or (
            api_options.config.contact_config.contact_method == "EMAIL"
            and device_info.email is None
        ):
            return ResendCodePostRestartFlowError()

        user_with_matching_login_method = await get_passwordless_user_by_account_info(
            tenant_id=tenant_id,
            user_context=user_context,
            account_info=AccountInfo(
                email=device_info.email,
                phone_number=device_info.phone_number,
            ),
        )

        auth_type_info = await check_auth_type_and_linking_status(
            session=session,
            account_info=AccountInfoWithRecipeId(
                recipe_id="passwordless",
                email=device_info.email,
                phone_number=device_info.phone_number,
            ),
            input_user=(
                user_with_matching_login_method.user
                if user_with_matching_login_method
                else None
            ),
            skip_session_user_update_in_core=True,
            user_context=user_context,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
        )

        if auth_type_info.status == "LINKING_TO_SESSION_USER_FAILED":
            return ResendCodePostRestartFlowError()

        number_of_tries_to_create_new_code = 0
        while True:
            number_of_tries_to_create_new_code += 1
            user_input_code = None
            if api_options.config.get_custom_user_input_code is not None:
                user_input_code = await api_options.config.get_custom_user_input_code(
                    tenant_id, user_context
                )
            user_input_code_input = user_input_code
            if api_options.config.get_custom_user_input_code is not None:
                user_input_code_input = (
                    await api_options.config.get_custom_user_input_code(
                        tenant_id, user_context
                    )
                )
            response = (
                await api_options.recipe_implementation.create_new_code_for_device(
                    device_id=device_id,
                    user_input_code=user_input_code_input,
                    tenant_id=tenant_id,
                    user_context=user_context,
                )
            )

            if isinstance(
                response, CreateNewCodeForDeviceUserInputCodeAlreadyUsedError
            ):
                if number_of_tries_to_create_new_code >= 3:
                    return GeneralErrorResponse(
                        "Failed to generate a one time code. Please try again"
                    )
                continue

            if isinstance(response, CreateNewCodeForDeviceOkResult):
                magic_link = None
                user_input_code = None

                factor_ids = []
                if session is not None:
                    factor_ids = [
                        (
                            FactorIds.OTP_EMAIL
                            if device_info.email is not None
                            else FactorIds.OTP_PHONE
                        )
                    ]
                else:
                    factor_ids = get_enabled_pwless_factors(api_options.config)
                    factor_ids = await filter_out_invalid_first_factors_or_throw_if_all_are_invalid(
                        factor_ids, tenant_id, False, user_context
                    )

                flow_type = api_options.config.flow_type
                if all(id.startswith("link") for id in factor_ids):
                    flow_type = "MAGIC_LINK"
                elif all(id.startswith("otp") for id in factor_ids):
                    flow_type = "USER_INPUT_CODE"
                else:
                    flow_type = "USER_INPUT_CODE_AND_MAGIC_LINK"

                if flow_type in ("MAGIC_LINK", "USER_INPUT_CODE_AND_MAGIC_LINK"):
                    magic_link = (
                        api_options.app_info.get_origin(
                            api_options.request, user_context
                        ).get_as_string_dangerous()
                        + api_options.app_info.website_base_path.get_as_string_dangerous()
                        + "/verify"
                        + "?preAuthSessionId="
                        + response.pre_auth_session_id
                        + "&tenantId="
                        + tenant_id
                        + "#"
                        + response.link_code
                    )
                if flow_type in ("USER_INPUT_CODE", "USER_INPUT_CODE_AND_MAGIC_LINK"):
                    user_input_code = response.user_input_code

                if api_options.config.contact_config.contact_method == "PHONE" or (
                    api_options.config.contact_config.contact_method == "EMAIL_OR_PHONE"
                    and device_info.phone_number is not None
                ):
                    log_debug_message(
                        "Sending passwordless login SMS to %s", device_info.phone_number
                    )
                    assert device_info.phone_number is not None
                    sms_input = PasswordlessLoginSMSTemplateVars(
                        phone_number=device_info.phone_number,
                        user_input_code=user_input_code,
                        url_with_link_code=magic_link,
                        code_life_time=response.code_life_time,
                        pre_auth_session_id=response.pre_auth_session_id,
                        tenant_id=tenant_id,
                        is_first_factor=auth_type_info.is_first_factor,
                    )
                    await api_options.sms_delivery.ingredient_interface_impl.send_sms(
                        sms_input, user_context
                    )
                else:
                    log_debug_message(
                        "Sending passwordless login email to %s", device_info.email
                    )
                    assert device_info.email is not None
                    passwordless_email_delivery_input = (
                        PasswordlessLoginEmailTemplateVars(
                            email=device_info.email,
                            user_input_code=user_input_code,
                            url_with_link_code=magic_link,
                            code_life_time=response.code_life_time,
                            pre_auth_session_id=response.pre_auth_session_id,
                            tenant_id=tenant_id,
                            is_first_factor=auth_type_info.is_first_factor,
                        )
                    )
                    await api_options.email_delivery.ingredient_interface_impl.send_email(
                        passwordless_email_delivery_input, user_context
                    )

                return ResendCodePostOkResult()

            return ResendCodePostRestartFlowError()

    async def consume_code_post(
        self,
        pre_auth_session_id: str,
        user_input_code: Union[str, None],
        device_id: Union[str, None],
        link_code: Union[str, None],
        session: Optional[SessionContainer],
        should_try_linking_with_session_user: Union[bool, None],
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[
        ConsumeCodePostOkResult,
        ConsumeCodePostRestartFlowError,
        GeneralErrorResponse,
        ConsumeCodePostIncorrectUserInputCodeError,
        ConsumeCodePostExpiredUserInputCodeError,
        SignInUpPostNotAllowedResponse,
    ]:
        error_code_map = {
            "SIGN_UP_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_002)",
            "SIGN_IN_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_003)",
            "LINKING_TO_SESSION_USER_FAILED": {
                "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_017)",
                "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_018)",
                "SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_019)",
            },
        }

        device_info = (
            await api_options.recipe_implementation.list_codes_by_pre_auth_session_id(
                tenant_id=tenant_id,
                pre_auth_session_id=pre_auth_session_id,
                user_context=user_context,
            )
        )

        if not device_info:
            return ConsumeCodePostRestartFlowError()

        recipe_id = "passwordless"
        account_info = AccountInfo(
            phone_number=device_info.phone_number, email=device_info.email
        )

        async def check_credentials(_: str):
            nonlocal check_credentials_response
            if check_credentials_response is None:
                check_credentials_response = (
                    await api_options.recipe_implementation.check_code(
                        pre_auth_session_id=pre_auth_session_id,
                        device_id=device_id,
                        user_input_code=user_input_code,
                        link_code=link_code,
                        tenant_id=tenant_id,
                        user_context=user_context,
                    )
                )
            return isinstance(check_credentials_response, CheckCodeOkResult)

        check_credentials_response = None
        authenticating_user = (
            await get_authenticating_user_and_add_to_current_tenant_if_required(
                email=account_info.email,
                phone_number=account_info.phone_number,
                third_party=None,
                recipe_id=recipe_id,
                user_context=user_context,
                session=session,
                tenant_id=tenant_id,
                check_credentials_on_tenant=check_credentials,
            )
        )

        ev_instance = EmailVerificationRecipe.get_instance_optional()
        if account_info.email and session and ev_instance:
            session_user = await get_user(session.get_user_id(), user_context)
            if session_user is None:
                raise UnauthorisedError(
                    "Session user not found",
                )

            login_method = next(
                (
                    lm
                    for lm in session_user.login_methods
                    if lm.recipe_user_id.get_as_string()
                    == session.get_recipe_user_id().get_as_string()
                ),
                None,
            )
            if login_method is None:
                raise UnauthorisedError(
                    "Session user and session recipeUserId is inconsistent",
                )

            if (
                login_method.has_same_email_as(account_info.email)
                and not login_method.verified
            ):
                if await check_credentials(tenant_id):
                    token_response = await ev_instance.recipe_implementation.create_email_verification_token(
                        tenant_id=tenant_id,
                        recipe_user_id=login_method.recipe_user_id,
                        email=account_info.email,
                        user_context=user_context,
                    )
                    if isinstance(token_response, CreateEmailVerificationTokenOkResult):
                        await ev_instance.recipe_implementation.verify_email_using_token(
                            tenant_id=tenant_id,
                            token=token_response.token,
                            attempt_account_linking=False,
                            user_context=user_context,
                        )

        factor_id = (
            FactorIds.OTP_EMAIL
            if device_info.email and user_input_code
            else (
                FactorIds.LINK_EMAIL
                if device_info.email
                else (FactorIds.OTP_PHONE if user_input_code else FactorIds.LINK_PHONE)
            )
        )

        is_sign_up = authenticating_user is None
        pre_auth_checks_result = await pre_auth_checks(
            authenticating_account_info=AccountInfoWithRecipeId(
                recipe_id="passwordless",
                email=device_info.email,
                phone_number=device_info.phone_number,
            ),
            factor_ids=[factor_id],
            authenticating_user=(
                authenticating_user.user if authenticating_user else None
            ),
            is_sign_up=is_sign_up,
            is_verified=(
                authenticating_user.login_method.verified
                if authenticating_user and authenticating_user.login_method
                else True
            ),
            sign_in_verifies_login_method=True,
            skip_session_user_update_in_core=False,
            tenant_id=tenant_id,
            user_context=user_context,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
        )

        if not isinstance(pre_auth_checks_result, OkResponse):
            if isinstance(pre_auth_checks_result, SignUpNotAllowedResponse):
                reason = error_code_map["SIGN_UP_NOT_ALLOWED"]
                assert isinstance(reason, str)
                return SignInUpPostNotAllowedResponse(reason)
            if isinstance(pre_auth_checks_result, SignInNotAllowedResponse):
                reason = error_code_map["SIGN_IN_NOT_ALLOWED"]
                assert isinstance(reason, str)
                return SignInUpPostNotAllowedResponse(reason)

            reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
            assert isinstance(reason_dict, Dict)
            reason = reason_dict[pre_auth_checks_result.reason]
            return SignInUpPostNotAllowedResponse(reason=reason)

        if check_credentials_response is not None:
            if not isinstance(check_credentials_response, CheckCodeOkResult):
                return check_credentials_response

        response = await api_options.recipe_implementation.consume_code(
            pre_auth_session_id=pre_auth_session_id,
            device_id=device_id,
            user_input_code=user_input_code,
            link_code=link_code,
            session=session,
            tenant_id=tenant_id,
            user_context=user_context,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
        )

        if isinstance(response, ConsumeCodeRestartFlowError):
            return ConsumeCodePostRestartFlowError()
        if isinstance(response, ConsumeCodeIncorrectUserInputCodeError):
            return ConsumeCodePostIncorrectUserInputCodeError(
                response.failed_code_input_attempt_count,
                response.maximum_code_input_attempts,
            )
        if isinstance(response, ConsumeCodeExpiredUserInputCodeError):
            return ConsumeCodePostExpiredUserInputCodeError(
                response.failed_code_input_attempt_count,
                response.maximum_code_input_attempts,
            )
        if not isinstance(response, ConsumeCodeOkResult):
            reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
            assert isinstance(reason_dict, Dict)
            reason = reason_dict[response.reason]
            return SignInUpPostNotAllowedResponse(reason=reason)

        authenticating_user_input: User
        if response.user:
            authenticating_user_input = response.user
        elif authenticating_user:
            authenticating_user_input = authenticating_user.user
        else:
            raise Exception("Should never come here")
        recipe_user_id_input: RecipeUserId
        if response.recipe_user_id:
            recipe_user_id_input = response.recipe_user_id
        elif authenticating_user:
            assert authenticating_user.login_method is not None
            recipe_user_id_input = authenticating_user.login_method.recipe_user_id
        else:
            raise Exception("Should never come here")

        post_auth_checks_result = await post_auth_checks(
            factor_id=factor_id,
            is_sign_up=is_sign_up,
            authenticated_user=authenticating_user_input,
            recipe_user_id=recipe_user_id_input,
            tenant_id=tenant_id,
            user_context=user_context,
            session=session,
            request=api_options.request,
        )

        if not isinstance(post_auth_checks_result, PostAuthChecksOkResponse):
            reason = error_code_map["SIGN_IN_NOT_ALLOWED"]
            assert isinstance(reason, str)
            return SignInUpPostNotAllowedResponse(reason)

        return ConsumeCodePostOkResult(
            created_new_recipe_user=response.created_new_recipe_user,
            user=post_auth_checks_result.user,
            session=post_auth_checks_result.session,
        )

    async def email_exists_get(
        self,
        email: str,
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[EmailExistsGetOkResult, GeneralErrorResponse]:
        users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info(
            tenant_id=tenant_id,
            account_info=AccountInfo(email=email),
            do_union_of_account_info=False,
            user_context=user_context,
        )
        user_exists = any(
            any(
                lm.recipe_id == "passwordless" and lm.has_same_email_as(email)
                for lm in u.login_methods
            )
            for u in users
        )

        return EmailExistsGetOkResult(exists=user_exists)

    async def phone_number_exists_get(
        self,
        phone_number: str,
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[PhoneNumberExistsGetOkResult, GeneralErrorResponse]:
        users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info(
            tenant_id=tenant_id,
            account_info=AccountInfo(phone_number=phone_number),
            do_union_of_account_info=False,
            user_context=user_context,
        )
        return PhoneNumberExistsGetOkResult(exists=len(users) > 0)

Ancestors

Methods

async def consume_code_post(self, pre_auth_session_id: str, user_input_code: Optional[str], device_id: Optional[str], link_code: Optional[str], session: Optional[SessionContainer], should_try_linking_with_session_user: Optional[bool], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[ConsumeCodePostOkResultConsumeCodePostRestartFlowErrorGeneralErrorResponseConsumeCodePostIncorrectUserInputCodeErrorConsumeCodePostExpiredUserInputCodeErrorSignInUpPostNotAllowedResponse]
Expand source code
async def consume_code_post(
    self,
    pre_auth_session_id: str,
    user_input_code: Union[str, None],
    device_id: Union[str, None],
    link_code: Union[str, None],
    session: Optional[SessionContainer],
    should_try_linking_with_session_user: Union[bool, None],
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Union[
    ConsumeCodePostOkResult,
    ConsumeCodePostRestartFlowError,
    GeneralErrorResponse,
    ConsumeCodePostIncorrectUserInputCodeError,
    ConsumeCodePostExpiredUserInputCodeError,
    SignInUpPostNotAllowedResponse,
]:
    error_code_map = {
        "SIGN_UP_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_002)",
        "SIGN_IN_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_003)",
        "LINKING_TO_SESSION_USER_FAILED": {
            "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_017)",
            "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_018)",
            "SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_019)",
        },
    }

    device_info = (
        await api_options.recipe_implementation.list_codes_by_pre_auth_session_id(
            tenant_id=tenant_id,
            pre_auth_session_id=pre_auth_session_id,
            user_context=user_context,
        )
    )

    if not device_info:
        return ConsumeCodePostRestartFlowError()

    recipe_id = "passwordless"
    account_info = AccountInfo(
        phone_number=device_info.phone_number, email=device_info.email
    )

    async def check_credentials(_: str):
        nonlocal check_credentials_response
        if check_credentials_response is None:
            check_credentials_response = (
                await api_options.recipe_implementation.check_code(
                    pre_auth_session_id=pre_auth_session_id,
                    device_id=device_id,
                    user_input_code=user_input_code,
                    link_code=link_code,
                    tenant_id=tenant_id,
                    user_context=user_context,
                )
            )
        return isinstance(check_credentials_response, CheckCodeOkResult)

    check_credentials_response = None
    authenticating_user = (
        await get_authenticating_user_and_add_to_current_tenant_if_required(
            email=account_info.email,
            phone_number=account_info.phone_number,
            third_party=None,
            recipe_id=recipe_id,
            user_context=user_context,
            session=session,
            tenant_id=tenant_id,
            check_credentials_on_tenant=check_credentials,
        )
    )

    ev_instance = EmailVerificationRecipe.get_instance_optional()
    if account_info.email and session and ev_instance:
        session_user = await get_user(session.get_user_id(), user_context)
        if session_user is None:
            raise UnauthorisedError(
                "Session user not found",
            )

        login_method = next(
            (
                lm
                for lm in session_user.login_methods
                if lm.recipe_user_id.get_as_string()
                == session.get_recipe_user_id().get_as_string()
            ),
            None,
        )
        if login_method is None:
            raise UnauthorisedError(
                "Session user and session recipeUserId is inconsistent",
            )

        if (
            login_method.has_same_email_as(account_info.email)
            and not login_method.verified
        ):
            if await check_credentials(tenant_id):
                token_response = await ev_instance.recipe_implementation.create_email_verification_token(
                    tenant_id=tenant_id,
                    recipe_user_id=login_method.recipe_user_id,
                    email=account_info.email,
                    user_context=user_context,
                )
                if isinstance(token_response, CreateEmailVerificationTokenOkResult):
                    await ev_instance.recipe_implementation.verify_email_using_token(
                        tenant_id=tenant_id,
                        token=token_response.token,
                        attempt_account_linking=False,
                        user_context=user_context,
                    )

    factor_id = (
        FactorIds.OTP_EMAIL
        if device_info.email and user_input_code
        else (
            FactorIds.LINK_EMAIL
            if device_info.email
            else (FactorIds.OTP_PHONE if user_input_code else FactorIds.LINK_PHONE)
        )
    )

    is_sign_up = authenticating_user is None
    pre_auth_checks_result = await pre_auth_checks(
        authenticating_account_info=AccountInfoWithRecipeId(
            recipe_id="passwordless",
            email=device_info.email,
            phone_number=device_info.phone_number,
        ),
        factor_ids=[factor_id],
        authenticating_user=(
            authenticating_user.user if authenticating_user else None
        ),
        is_sign_up=is_sign_up,
        is_verified=(
            authenticating_user.login_method.verified
            if authenticating_user and authenticating_user.login_method
            else True
        ),
        sign_in_verifies_login_method=True,
        skip_session_user_update_in_core=False,
        tenant_id=tenant_id,
        user_context=user_context,
        session=session,
        should_try_linking_with_session_user=should_try_linking_with_session_user,
    )

    if not isinstance(pre_auth_checks_result, OkResponse):
        if isinstance(pre_auth_checks_result, SignUpNotAllowedResponse):
            reason = error_code_map["SIGN_UP_NOT_ALLOWED"]
            assert isinstance(reason, str)
            return SignInUpPostNotAllowedResponse(reason)
        if isinstance(pre_auth_checks_result, SignInNotAllowedResponse):
            reason = error_code_map["SIGN_IN_NOT_ALLOWED"]
            assert isinstance(reason, str)
            return SignInUpPostNotAllowedResponse(reason)

        reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
        assert isinstance(reason_dict, Dict)
        reason = reason_dict[pre_auth_checks_result.reason]
        return SignInUpPostNotAllowedResponse(reason=reason)

    if check_credentials_response is not None:
        if not isinstance(check_credentials_response, CheckCodeOkResult):
            return check_credentials_response

    response = await api_options.recipe_implementation.consume_code(
        pre_auth_session_id=pre_auth_session_id,
        device_id=device_id,
        user_input_code=user_input_code,
        link_code=link_code,
        session=session,
        tenant_id=tenant_id,
        user_context=user_context,
        should_try_linking_with_session_user=should_try_linking_with_session_user,
    )

    if isinstance(response, ConsumeCodeRestartFlowError):
        return ConsumeCodePostRestartFlowError()
    if isinstance(response, ConsumeCodeIncorrectUserInputCodeError):
        return ConsumeCodePostIncorrectUserInputCodeError(
            response.failed_code_input_attempt_count,
            response.maximum_code_input_attempts,
        )
    if isinstance(response, ConsumeCodeExpiredUserInputCodeError):
        return ConsumeCodePostExpiredUserInputCodeError(
            response.failed_code_input_attempt_count,
            response.maximum_code_input_attempts,
        )
    if not isinstance(response, ConsumeCodeOkResult):
        reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
        assert isinstance(reason_dict, Dict)
        reason = reason_dict[response.reason]
        return SignInUpPostNotAllowedResponse(reason=reason)

    authenticating_user_input: User
    if response.user:
        authenticating_user_input = response.user
    elif authenticating_user:
        authenticating_user_input = authenticating_user.user
    else:
        raise Exception("Should never come here")
    recipe_user_id_input: RecipeUserId
    if response.recipe_user_id:
        recipe_user_id_input = response.recipe_user_id
    elif authenticating_user:
        assert authenticating_user.login_method is not None
        recipe_user_id_input = authenticating_user.login_method.recipe_user_id
    else:
        raise Exception("Should never come here")

    post_auth_checks_result = await post_auth_checks(
        factor_id=factor_id,
        is_sign_up=is_sign_up,
        authenticated_user=authenticating_user_input,
        recipe_user_id=recipe_user_id_input,
        tenant_id=tenant_id,
        user_context=user_context,
        session=session,
        request=api_options.request,
    )

    if not isinstance(post_auth_checks_result, PostAuthChecksOkResponse):
        reason = error_code_map["SIGN_IN_NOT_ALLOWED"]
        assert isinstance(reason, str)
        return SignInUpPostNotAllowedResponse(reason)

    return ConsumeCodePostOkResult(
        created_new_recipe_user=response.created_new_recipe_user,
        user=post_auth_checks_result.user,
        session=post_auth_checks_result.session,
    )
async def create_code_post(self, email: Optional[str], phone_number: Optional[str], session: Optional[SessionContainer], should_try_linking_with_session_user: Optional[bool], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[CreateCodePostOkResultSignInUpPostNotAllowedResponseGeneralErrorResponse]
Expand source code
async def create_code_post(
    self,
    email: Union[str, None],
    phone_number: Union[str, None],
    session: Optional[SessionContainer],
    should_try_linking_with_session_user: Union[bool, None],
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Union[
    CreateCodePostOkResult, SignInUpPostNotAllowedResponse, GeneralErrorResponse
]:
    error_code_map = {
        "SIGN_UP_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_002)",
        "LINKING_TO_SESSION_USER_FAILED": {
            "SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_019)",
        },
    }

    account_info = AccountInfo(
        email=email,
        phone_number=phone_number,
    )

    user_with_matching_login_method = await get_passwordless_user_by_account_info(
        tenant_id, user_context, account_info
    )

    factor_ids = []
    if session is not None:
        factor_ids = [
            FactorIds.OTP_EMAIL if email is not None else FactorIds.OTP_PHONE
        ]
    else:
        factor_ids = get_enabled_pwless_factors(api_options.config)
        if email is not None:
            factor_ids = [
                f
                for f in factor_ids
                if f in [FactorIds.OTP_EMAIL, FactorIds.LINK_EMAIL]
            ]
        else:
            factor_ids = [
                f
                for f in factor_ids
                if f in [FactorIds.OTP_PHONE, FactorIds.LINK_PHONE]
            ]

    is_verified_input = True
    if user_with_matching_login_method is not None:
        assert user_with_matching_login_method.login_method is not None
        is_verified_input = user_with_matching_login_method.login_method.verified

    pre_auth_checks_result = await pre_auth_checks(
        authenticating_account_info=AccountInfoWithRecipeId(
            recipe_id="passwordless",
            email=account_info.email,
            phone_number=account_info.phone_number,
        ),
        is_sign_up=user_with_matching_login_method is None,
        authenticating_user=(
            user_with_matching_login_method.user
            if user_with_matching_login_method
            else None
        ),
        is_verified=is_verified_input,
        sign_in_verifies_login_method=True,
        skip_session_user_update_in_core=True,
        tenant_id=tenant_id,
        factor_ids=factor_ids,
        user_context=user_context,
        session=session,
        should_try_linking_with_session_user=should_try_linking_with_session_user,
    )

    if not isinstance(pre_auth_checks_result, OkResponse):
        if isinstance(pre_auth_checks_result, SignUpNotAllowedResponse):
            reason = error_code_map["SIGN_UP_NOT_ALLOWED"]
            assert isinstance(reason, str)
            return SignInUpPostNotAllowedResponse(reason)
        if isinstance(pre_auth_checks_result, SignInNotAllowedResponse):
            raise Exception("Should never come here")

        reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
        assert isinstance(reason_dict, Dict)
        reason = reason_dict[pre_auth_checks_result.reason]
        return SignInUpPostNotAllowedResponse(reason=reason)

    user_input_code = None
    if api_options.config.get_custom_user_input_code is not None:
        user_input_code = await api_options.config.get_custom_user_input_code(
            tenant_id, user_context
        )

    user_input_code_input = user_input_code
    if api_options.config.get_custom_user_input_code is not None:
        user_input_code_input = await api_options.config.get_custom_user_input_code(
            tenant_id, user_context
        )
    response = await api_options.recipe_implementation.create_code(
        email=account_info.email,
        phone_number=account_info.phone_number,
        user_input_code=user_input_code_input,
        tenant_id=tenant_id,
        user_context=user_context,
        session=session,
        should_try_linking_with_session_user=should_try_linking_with_session_user,
    )

    magic_link = None
    user_input_code = None
    flow_type = api_options.config.flow_type

    if all(
        _id.startswith("link") for _id in pre_auth_checks_result.valid_factor_ids
    ):
        flow_type = "MAGIC_LINK"
    elif all(
        _id.startswith("otp") for _id in pre_auth_checks_result.valid_factor_ids
    ):
        flow_type = "USER_INPUT_CODE"
    else:
        flow_type = "USER_INPUT_CODE_AND_MAGIC_LINK"

    if flow_type in ("MAGIC_LINK", "USER_INPUT_CODE_AND_MAGIC_LINK"):
        magic_link = (
            api_options.app_info.get_origin(
                api_options.request, user_context
            ).get_as_string_dangerous()
            + api_options.app_info.website_base_path.get_as_string_dangerous()
            + "/verify"
            + "?preAuthSessionId="
            + response.pre_auth_session_id
            + "&tenantId="
            + tenant_id
            + "#"
            + response.link_code
        )
    if flow_type in ("USER_INPUT_CODE", "USER_INPUT_CODE_AND_MAGIC_LINK"):
        user_input_code = response.user_input_code

    if isinstance(api_options.config.contact_config, ContactEmailOnlyConfig) or (
        isinstance(api_options.config.contact_config, ContactEmailOrPhoneConfig)
        and email is not None
    ):
        if email is None:
            raise Exception("Should never come here")

        log_debug_message("Sending passwordless login email to %s", email)
        passwordless_email_delivery_input = PasswordlessLoginEmailTemplateVars(
            email=email,
            user_input_code=user_input_code,
            url_with_link_code=magic_link,
            code_life_time=response.code_life_time,
            pre_auth_session_id=response.pre_auth_session_id,
            tenant_id=tenant_id,
            is_first_factor=pre_auth_checks_result.is_first_factor,
        )
        await api_options.email_delivery.ingredient_interface_impl.send_email(
            passwordless_email_delivery_input, user_context
        )
    elif isinstance(
        api_options.config.contact_config,
        (ContactEmailOrPhoneConfig, ContactPhoneOnlyConfig),
    ):
        if phone_number is None:
            raise Exception("Should never come here")
        log_debug_message("Sending passwordless login SMS to %s", phone_number)
        sms_input = PasswordlessLoginSMSTemplateVars(
            phone_number=phone_number,
            user_input_code=user_input_code,
            url_with_link_code=magic_link,
            code_life_time=response.code_life_time,
            pre_auth_session_id=response.pre_auth_session_id,
            tenant_id=tenant_id,
            is_first_factor=pre_auth_checks_result.is_first_factor,
        )
        await api_options.sms_delivery.ingredient_interface_impl.send_sms(
            sms_input, user_context
        )

    return CreateCodePostOkResult(
        response.device_id, response.pre_auth_session_id, flow_type
    )
async def email_exists_get(self, email: str, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[EmailExistsGetOkResultGeneralErrorResponse]
Expand source code
async def email_exists_get(
    self,
    email: str,
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Union[EmailExistsGetOkResult, GeneralErrorResponse]:
    users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info(
        tenant_id=tenant_id,
        account_info=AccountInfo(email=email),
        do_union_of_account_info=False,
        user_context=user_context,
    )
    user_exists = any(
        any(
            lm.recipe_id == "passwordless" and lm.has_same_email_as(email)
            for lm in u.login_methods
        )
        for u in users
    )

    return EmailExistsGetOkResult(exists=user_exists)
async def phone_number_exists_get(self, phone_number: str, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[PhoneNumberExistsGetOkResultGeneralErrorResponse]
Expand source code
async def phone_number_exists_get(
    self,
    phone_number: str,
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Union[PhoneNumberExistsGetOkResult, GeneralErrorResponse]:
    users = await AccountLinkingRecipe.get_instance().recipe_implementation.list_users_by_account_info(
        tenant_id=tenant_id,
        account_info=AccountInfo(phone_number=phone_number),
        do_union_of_account_info=False,
        user_context=user_context,
    )
    return PhoneNumberExistsGetOkResult(exists=len(users) > 0)
async def resend_code_post(self, device_id: str, pre_auth_session_id: str, session: Optional[SessionContainer], should_try_linking_with_session_user: Optional[bool], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[ResendCodePostOkResultResendCodePostRestartFlowErrorGeneralErrorResponse]
Expand source code
async def resend_code_post(
    self,
    device_id: str,
    pre_auth_session_id: str,
    session: Optional[SessionContainer],
    should_try_linking_with_session_user: Union[bool, None],
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Union[
    ResendCodePostOkResult, ResendCodePostRestartFlowError, GeneralErrorResponse
]:
    device_info = await api_options.recipe_implementation.list_codes_by_device_id(
        device_id=device_id, tenant_id=tenant_id, user_context=user_context
    )

    if device_info is None:
        return ResendCodePostRestartFlowError()

    if (
        api_options.config.contact_config.contact_method == "PHONE"
        and device_info.phone_number is None
    ) or (
        api_options.config.contact_config.contact_method == "EMAIL"
        and device_info.email is None
    ):
        return ResendCodePostRestartFlowError()

    user_with_matching_login_method = await get_passwordless_user_by_account_info(
        tenant_id=tenant_id,
        user_context=user_context,
        account_info=AccountInfo(
            email=device_info.email,
            phone_number=device_info.phone_number,
        ),
    )

    auth_type_info = await check_auth_type_and_linking_status(
        session=session,
        account_info=AccountInfoWithRecipeId(
            recipe_id="passwordless",
            email=device_info.email,
            phone_number=device_info.phone_number,
        ),
        input_user=(
            user_with_matching_login_method.user
            if user_with_matching_login_method
            else None
        ),
        skip_session_user_update_in_core=True,
        user_context=user_context,
        should_try_linking_with_session_user=should_try_linking_with_session_user,
    )

    if auth_type_info.status == "LINKING_TO_SESSION_USER_FAILED":
        return ResendCodePostRestartFlowError()

    number_of_tries_to_create_new_code = 0
    while True:
        number_of_tries_to_create_new_code += 1
        user_input_code = None
        if api_options.config.get_custom_user_input_code is not None:
            user_input_code = await api_options.config.get_custom_user_input_code(
                tenant_id, user_context
            )
        user_input_code_input = user_input_code
        if api_options.config.get_custom_user_input_code is not None:
            user_input_code_input = (
                await api_options.config.get_custom_user_input_code(
                    tenant_id, user_context
                )
            )
        response = (
            await api_options.recipe_implementation.create_new_code_for_device(
                device_id=device_id,
                user_input_code=user_input_code_input,
                tenant_id=tenant_id,
                user_context=user_context,
            )
        )

        if isinstance(
            response, CreateNewCodeForDeviceUserInputCodeAlreadyUsedError
        ):
            if number_of_tries_to_create_new_code >= 3:
                return GeneralErrorResponse(
                    "Failed to generate a one time code. Please try again"
                )
            continue

        if isinstance(response, CreateNewCodeForDeviceOkResult):
            magic_link = None
            user_input_code = None

            factor_ids = []
            if session is not None:
                factor_ids = [
                    (
                        FactorIds.OTP_EMAIL
                        if device_info.email is not None
                        else FactorIds.OTP_PHONE
                    )
                ]
            else:
                factor_ids = get_enabled_pwless_factors(api_options.config)
                factor_ids = await filter_out_invalid_first_factors_or_throw_if_all_are_invalid(
                    factor_ids, tenant_id, False, user_context
                )

            flow_type = api_options.config.flow_type
            if all(id.startswith("link") for id in factor_ids):
                flow_type = "MAGIC_LINK"
            elif all(id.startswith("otp") for id in factor_ids):
                flow_type = "USER_INPUT_CODE"
            else:
                flow_type = "USER_INPUT_CODE_AND_MAGIC_LINK"

            if flow_type in ("MAGIC_LINK", "USER_INPUT_CODE_AND_MAGIC_LINK"):
                magic_link = (
                    api_options.app_info.get_origin(
                        api_options.request, user_context
                    ).get_as_string_dangerous()
                    + api_options.app_info.website_base_path.get_as_string_dangerous()
                    + "/verify"
                    + "?preAuthSessionId="
                    + response.pre_auth_session_id
                    + "&tenantId="
                    + tenant_id
                    + "#"
                    + response.link_code
                )
            if flow_type in ("USER_INPUT_CODE", "USER_INPUT_CODE_AND_MAGIC_LINK"):
                user_input_code = response.user_input_code

            if api_options.config.contact_config.contact_method == "PHONE" or (
                api_options.config.contact_config.contact_method == "EMAIL_OR_PHONE"
                and device_info.phone_number is not None
            ):
                log_debug_message(
                    "Sending passwordless login SMS to %s", device_info.phone_number
                )
                assert device_info.phone_number is not None
                sms_input = PasswordlessLoginSMSTemplateVars(
                    phone_number=device_info.phone_number,
                    user_input_code=user_input_code,
                    url_with_link_code=magic_link,
                    code_life_time=response.code_life_time,
                    pre_auth_session_id=response.pre_auth_session_id,
                    tenant_id=tenant_id,
                    is_first_factor=auth_type_info.is_first_factor,
                )
                await api_options.sms_delivery.ingredient_interface_impl.send_sms(
                    sms_input, user_context
                )
            else:
                log_debug_message(
                    "Sending passwordless login email to %s", device_info.email
                )
                assert device_info.email is not None
                passwordless_email_delivery_input = (
                    PasswordlessLoginEmailTemplateVars(
                        email=device_info.email,
                        user_input_code=user_input_code,
                        url_with_link_code=magic_link,
                        code_life_time=response.code_life_time,
                        pre_auth_session_id=response.pre_auth_session_id,
                        tenant_id=tenant_id,
                        is_first_factor=auth_type_info.is_first_factor,
                    )
                )
                await api_options.email_delivery.ingredient_interface_impl.send_email(
                    passwordless_email_delivery_input, user_context
                )

            return ResendCodePostOkResult()

        return ResendCodePostRestartFlowError()
class PasswordlessUserResult (user: User, login_method: Optional[LoginMethod])
Expand source code
class PasswordlessUserResult:
    user: User
    login_method: Union[LoginMethod, None]

    def __init__(self, user: User, login_method: Union[LoginMethod, None]):
        self.user = user
        self.login_method = login_method

Class variables

var login_method : Optional[LoginMethod]
var userUser