Module supertokens_python.recipe.emailpassword.recipe_implementation

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, Union
from supertokens_python.asyncio import get_user

from supertokens_python.normalised_url_path import NormalisedURLPath
from supertokens_python.recipe.accountlinking.recipe import AccountLinkingRecipe
from supertokens_python.recipe.emailverification.recipe import EmailVerificationRecipe
from supertokens_python.recipe.session import SessionContainer
from supertokens_python.types import RecipeUserId

from .interfaces import (
    CreateResetPasswordOkResult,
    UnknownUserIdError,
    RecipeInterface,
    ConsumePasswordResetTokenOkResult,
    PasswordResetTokenInvalidError,
    SignInOkResult,
    UpdateEmailOrPasswordEmailChangeNotAllowedError,
    WrongCredentialsError,
    EmailAlreadyExistsError,
    SignUpOkResult,
    UpdateEmailOrPasswordOkResult,
    PasswordPolicyViolationError,
)
from .utils import EmailPasswordConfig
from .constants import FORM_FIELD_PASSWORD_ID
from supertokens_python.auth_utils import (
    LinkingToSessionUserFailedError,
    link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info,
)
from ...types import User

if TYPE_CHECKING:
    from supertokens_python.querier import Querier


class RecipeImplementation(RecipeInterface):
    def __init__(
        self,
        querier: Querier,
        ep_config: EmailPasswordConfig,
    ):
        super().__init__()
        self.querier = querier
        self.ep_config = ep_config

    async def sign_up(
        self,
        email: str,
        password: str,
        tenant_id: str,
        session: Union[SessionContainer, None],
        should_try_linking_with_session_user: Union[bool, None],
        user_context: Dict[str, Any],
    ) -> Union[
        SignUpOkResult, EmailAlreadyExistsError, LinkingToSessionUserFailedError
    ]:
        response = await self.create_new_recipe_user(
            email=email,
            password=password,
            tenant_id=tenant_id,
            user_context=user_context,
        )
        if isinstance(response, EmailAlreadyExistsError):
            return response

        updated_user = response.user

        link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
            tenant_id=tenant_id,
            input_user=response.user,
            recipe_user_id=response.recipe_user_id,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
            user_context=user_context,
        )

        if isinstance(link_result, LinkingToSessionUserFailedError):
            return LinkingToSessionUserFailedError(reason=link_result.reason)

        updated_user = link_result.user

        return SignUpOkResult(
            user=updated_user,
            recipe_user_id=response.recipe_user_id,
        )

    async def create_new_recipe_user(
        self,
        email: str,
        password: str,
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[SignUpOkResult, EmailAlreadyExistsError]:
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/signup"),
            {
                "email": email,
                "password": password,
            },
            user_context=user_context,
        )
        if response["status"] == "OK":
            return SignUpOkResult(
                user=User.from_json(response["user"]),
                recipe_user_id=RecipeUserId(response["recipeUserId"]),
            )
        return EmailAlreadyExistsError()

    async def sign_in(
        self,
        email: str,
        password: str,
        tenant_id: str,
        session: Union[SessionContainer, None],
        should_try_linking_with_session_user: Union[bool, None],
        user_context: Dict[str, Any],
    ) -> Union[SignInOkResult, WrongCredentialsError, LinkingToSessionUserFailedError]:
        response = await self.verify_credentials(
            email, password, tenant_id, user_context
        )

        if isinstance(response, SignInOkResult):
            login_method = next(
                (
                    lm
                    for lm in response.user.login_methods
                    if lm.recipe_user_id.get_as_string()
                    == response.recipe_user_id.get_as_string()
                ),
                None,
            )

            assert login_method is not None

            if not login_method.verified:
                await AccountLinkingRecipe.get_instance().verify_email_for_recipe_user_if_linked_accounts_are_verified(
                    user=response.user,
                    recipe_user_id=response.recipe_user_id,
                    user_context=user_context,
                )

                # We do this to get the updated user (in case the above function updated the verification status)
                updated_user = await get_user(
                    response.recipe_user_id.get_as_string(), user_context
                )
                assert updated_user is not None
                response.user = updated_user

            link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
                tenant_id=tenant_id,
                input_user=response.user,
                recipe_user_id=response.recipe_user_id,
                session=session,
                should_try_linking_with_session_user=should_try_linking_with_session_user,
                user_context=user_context,
            )

            if isinstance(link_result, LinkingToSessionUserFailedError):
                return link_result

            response.user = link_result.user

        return response

    async def verify_credentials(
        self,
        email: str,
        password: str,
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[SignInOkResult, WrongCredentialsError]:
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/signin"),
            {
                "email": email,
                "password": password,
            },
            user_context=user_context,
        )

        if response["status"] == "OK":
            return SignInOkResult(
                user=User.from_json(response["user"]),
                recipe_user_id=RecipeUserId(response["recipeUserId"]),
            )

        return WrongCredentialsError()

    async def create_reset_password_token(
        self, user_id: str, email: str, tenant_id: str, user_context: Dict[str, Any]
    ) -> Union[CreateResetPasswordOkResult, UnknownUserIdError]:
        data = {"userId": user_id, "email": email}
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/user/password/reset/token"),
            data,
            user_context=user_context,
        )
        if "status" in response and response["status"] == "OK":
            return CreateResetPasswordOkResult(response["token"])
        return UnknownUserIdError()

    async def consume_password_reset_token(
        self,
        token: str,
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[ConsumePasswordResetTokenOkResult, PasswordResetTokenInvalidError]:
        data = {"token": token}
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/user/password/reset/token/consume"),
            data,
            user_context=user_context,
        )
        if "status" not in response or response["status"] != "OK":
            return PasswordResetTokenInvalidError()
        return ConsumePasswordResetTokenOkResult(response["email"], response["userId"])

    async def update_email_or_password(
        self,
        recipe_user_id: RecipeUserId,
        email: Union[str, None],
        password: Union[str, None],
        apply_password_policy: Union[bool, None],
        tenant_id_for_password_policy: str,
        user_context: Dict[str, Any],
    ) -> Union[
        UpdateEmailOrPasswordOkResult,
        EmailAlreadyExistsError,
        UnknownUserIdError,
        PasswordPolicyViolationError,
        UpdateEmailOrPasswordEmailChangeNotAllowedError,
    ]:
        account_linking = AccountLinkingRecipe.get_instance()
        data = {"recipeUserId": recipe_user_id.get_as_string()}

        if email is not None:
            user = await get_user(recipe_user_id.get_as_string(), user_context)
            if user is None:
                return UnknownUserIdError()

            ev_instance = EmailVerificationRecipe.get_instance_optional()
            is_email_verified = False
            if ev_instance:
                is_email_verified = (
                    await ev_instance.recipe_implementation.is_email_verified(
                        recipe_user_id=recipe_user_id,
                        email=email,
                        user_context=user_context,
                    )
                )

            is_email_change_allowed = await account_linking.is_email_change_allowed(
                user=user,
                is_verified=is_email_verified,
                new_email=email,
                session=None,
                user_context=user_context,
            )
            if not is_email_change_allowed.allowed:
                reason = (
                    "New email cannot be applied to existing account because of account takeover risks."
                    if is_email_change_allowed.reason == "ACCOUNT_TAKEOVER_RISK"
                    else "New email cannot be applied to existing account because of there is another primary user with the same email address."
                )
                return UpdateEmailOrPasswordEmailChangeNotAllowedError(reason)

            data["email"] = email

        if password is not None:
            if apply_password_policy is None or apply_password_policy:
                form_fields = self.ep_config.sign_up_feature.form_fields
                password_field = next(
                    field for field in form_fields if field.id == FORM_FIELD_PASSWORD_ID
                )
                error = await password_field.validate(
                    password, tenant_id_for_password_policy
                )
                if error is not None:
                    return PasswordPolicyViolationError(error)
            data["password"] = password

        response = await self.querier.send_put_request(
            NormalisedURLPath("/recipe/user"),
            data,
            user_context=user_context,
        )

        if response.get("status") == "OK":
            user = await get_user(recipe_user_id.get_as_string(), user_context)
            if user is None:
                return UnknownUserIdError()
            await AccountLinkingRecipe.get_instance().verify_email_for_recipe_user_if_linked_accounts_are_verified(
                user=user,
                recipe_user_id=recipe_user_id,
                user_context=user_context,
            )
            return UpdateEmailOrPasswordOkResult()
        elif response.get("status") == "EMAIL_ALREADY_EXISTS_ERROR":
            return EmailAlreadyExistsError()
        else:
            return UnknownUserIdError()

Classes

class RecipeImplementation (querier: Querier, ep_config: EmailPasswordConfig)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class RecipeImplementation(RecipeInterface):
    def __init__(
        self,
        querier: Querier,
        ep_config: EmailPasswordConfig,
    ):
        super().__init__()
        self.querier = querier
        self.ep_config = ep_config

    async def sign_up(
        self,
        email: str,
        password: str,
        tenant_id: str,
        session: Union[SessionContainer, None],
        should_try_linking_with_session_user: Union[bool, None],
        user_context: Dict[str, Any],
    ) -> Union[
        SignUpOkResult, EmailAlreadyExistsError, LinkingToSessionUserFailedError
    ]:
        response = await self.create_new_recipe_user(
            email=email,
            password=password,
            tenant_id=tenant_id,
            user_context=user_context,
        )
        if isinstance(response, EmailAlreadyExistsError):
            return response

        updated_user = response.user

        link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
            tenant_id=tenant_id,
            input_user=response.user,
            recipe_user_id=response.recipe_user_id,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
            user_context=user_context,
        )

        if isinstance(link_result, LinkingToSessionUserFailedError):
            return LinkingToSessionUserFailedError(reason=link_result.reason)

        updated_user = link_result.user

        return SignUpOkResult(
            user=updated_user,
            recipe_user_id=response.recipe_user_id,
        )

    async def create_new_recipe_user(
        self,
        email: str,
        password: str,
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[SignUpOkResult, EmailAlreadyExistsError]:
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/signup"),
            {
                "email": email,
                "password": password,
            },
            user_context=user_context,
        )
        if response["status"] == "OK":
            return SignUpOkResult(
                user=User.from_json(response["user"]),
                recipe_user_id=RecipeUserId(response["recipeUserId"]),
            )
        return EmailAlreadyExistsError()

    async def sign_in(
        self,
        email: str,
        password: str,
        tenant_id: str,
        session: Union[SessionContainer, None],
        should_try_linking_with_session_user: Union[bool, None],
        user_context: Dict[str, Any],
    ) -> Union[SignInOkResult, WrongCredentialsError, LinkingToSessionUserFailedError]:
        response = await self.verify_credentials(
            email, password, tenant_id, user_context
        )

        if isinstance(response, SignInOkResult):
            login_method = next(
                (
                    lm
                    for lm in response.user.login_methods
                    if lm.recipe_user_id.get_as_string()
                    == response.recipe_user_id.get_as_string()
                ),
                None,
            )

            assert login_method is not None

            if not login_method.verified:
                await AccountLinkingRecipe.get_instance().verify_email_for_recipe_user_if_linked_accounts_are_verified(
                    user=response.user,
                    recipe_user_id=response.recipe_user_id,
                    user_context=user_context,
                )

                # We do this to get the updated user (in case the above function updated the verification status)
                updated_user = await get_user(
                    response.recipe_user_id.get_as_string(), user_context
                )
                assert updated_user is not None
                response.user = updated_user

            link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
                tenant_id=tenant_id,
                input_user=response.user,
                recipe_user_id=response.recipe_user_id,
                session=session,
                should_try_linking_with_session_user=should_try_linking_with_session_user,
                user_context=user_context,
            )

            if isinstance(link_result, LinkingToSessionUserFailedError):
                return link_result

            response.user = link_result.user

        return response

    async def verify_credentials(
        self,
        email: str,
        password: str,
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[SignInOkResult, WrongCredentialsError]:
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/signin"),
            {
                "email": email,
                "password": password,
            },
            user_context=user_context,
        )

        if response["status"] == "OK":
            return SignInOkResult(
                user=User.from_json(response["user"]),
                recipe_user_id=RecipeUserId(response["recipeUserId"]),
            )

        return WrongCredentialsError()

    async def create_reset_password_token(
        self, user_id: str, email: str, tenant_id: str, user_context: Dict[str, Any]
    ) -> Union[CreateResetPasswordOkResult, UnknownUserIdError]:
        data = {"userId": user_id, "email": email}
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/user/password/reset/token"),
            data,
            user_context=user_context,
        )
        if "status" in response and response["status"] == "OK":
            return CreateResetPasswordOkResult(response["token"])
        return UnknownUserIdError()

    async def consume_password_reset_token(
        self,
        token: str,
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[ConsumePasswordResetTokenOkResult, PasswordResetTokenInvalidError]:
        data = {"token": token}
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/user/password/reset/token/consume"),
            data,
            user_context=user_context,
        )
        if "status" not in response or response["status"] != "OK":
            return PasswordResetTokenInvalidError()
        return ConsumePasswordResetTokenOkResult(response["email"], response["userId"])

    async def update_email_or_password(
        self,
        recipe_user_id: RecipeUserId,
        email: Union[str, None],
        password: Union[str, None],
        apply_password_policy: Union[bool, None],
        tenant_id_for_password_policy: str,
        user_context: Dict[str, Any],
    ) -> Union[
        UpdateEmailOrPasswordOkResult,
        EmailAlreadyExistsError,
        UnknownUserIdError,
        PasswordPolicyViolationError,
        UpdateEmailOrPasswordEmailChangeNotAllowedError,
    ]:
        account_linking = AccountLinkingRecipe.get_instance()
        data = {"recipeUserId": recipe_user_id.get_as_string()}

        if email is not None:
            user = await get_user(recipe_user_id.get_as_string(), user_context)
            if user is None:
                return UnknownUserIdError()

            ev_instance = EmailVerificationRecipe.get_instance_optional()
            is_email_verified = False
            if ev_instance:
                is_email_verified = (
                    await ev_instance.recipe_implementation.is_email_verified(
                        recipe_user_id=recipe_user_id,
                        email=email,
                        user_context=user_context,
                    )
                )

            is_email_change_allowed = await account_linking.is_email_change_allowed(
                user=user,
                is_verified=is_email_verified,
                new_email=email,
                session=None,
                user_context=user_context,
            )
            if not is_email_change_allowed.allowed:
                reason = (
                    "New email cannot be applied to existing account because of account takeover risks."
                    if is_email_change_allowed.reason == "ACCOUNT_TAKEOVER_RISK"
                    else "New email cannot be applied to existing account because of there is another primary user with the same email address."
                )
                return UpdateEmailOrPasswordEmailChangeNotAllowedError(reason)

            data["email"] = email

        if password is not None:
            if apply_password_policy is None or apply_password_policy:
                form_fields = self.ep_config.sign_up_feature.form_fields
                password_field = next(
                    field for field in form_fields if field.id == FORM_FIELD_PASSWORD_ID
                )
                error = await password_field.validate(
                    password, tenant_id_for_password_policy
                )
                if error is not None:
                    return PasswordPolicyViolationError(error)
            data["password"] = password

        response = await self.querier.send_put_request(
            NormalisedURLPath("/recipe/user"),
            data,
            user_context=user_context,
        )

        if response.get("status") == "OK":
            user = await get_user(recipe_user_id.get_as_string(), user_context)
            if user is None:
                return UnknownUserIdError()
            await AccountLinkingRecipe.get_instance().verify_email_for_recipe_user_if_linked_accounts_are_verified(
                user=user,
                recipe_user_id=recipe_user_id,
                user_context=user_context,
            )
            return UpdateEmailOrPasswordOkResult()
        elif response.get("status") == "EMAIL_ALREADY_EXISTS_ERROR":
            return EmailAlreadyExistsError()
        else:
            return UnknownUserIdError()

Ancestors

Methods

async def consume_password_reset_token(self, token: str, tenant_id: str, user_context: Dict[str, Any]) ‑> Union[ConsumePasswordResetTokenOkResultPasswordResetTokenInvalidError]
Expand source code
async def consume_password_reset_token(
    self,
    token: str,
    tenant_id: str,
    user_context: Dict[str, Any],
) -> Union[ConsumePasswordResetTokenOkResult, PasswordResetTokenInvalidError]:
    data = {"token": token}
    response = await self.querier.send_post_request(
        NormalisedURLPath(f"{tenant_id}/recipe/user/password/reset/token/consume"),
        data,
        user_context=user_context,
    )
    if "status" not in response or response["status"] != "OK":
        return PasswordResetTokenInvalidError()
    return ConsumePasswordResetTokenOkResult(response["email"], response["userId"])
async def create_new_recipe_user(self, email: str, password: str, tenant_id: str, user_context: Dict[str, Any]) ‑> Union[SignUpOkResultEmailAlreadyExistsError]
Expand source code
async def create_new_recipe_user(
    self,
    email: str,
    password: str,
    tenant_id: str,
    user_context: Dict[str, Any],
) -> Union[SignUpOkResult, EmailAlreadyExistsError]:
    response = await self.querier.send_post_request(
        NormalisedURLPath(f"{tenant_id}/recipe/signup"),
        {
            "email": email,
            "password": password,
        },
        user_context=user_context,
    )
    if response["status"] == "OK":
        return SignUpOkResult(
            user=User.from_json(response["user"]),
            recipe_user_id=RecipeUserId(response["recipeUserId"]),
        )
    return EmailAlreadyExistsError()
async def create_reset_password_token(self, user_id: str, email: str, tenant_id: str, user_context: Dict[str, Any]) ‑> Union[CreateResetPasswordOkResultUnknownUserIdError]
Expand source code
async def create_reset_password_token(
    self, user_id: str, email: str, tenant_id: str, user_context: Dict[str, Any]
) -> Union[CreateResetPasswordOkResult, UnknownUserIdError]:
    data = {"userId": user_id, "email": email}
    response = await self.querier.send_post_request(
        NormalisedURLPath(f"{tenant_id}/recipe/user/password/reset/token"),
        data,
        user_context=user_context,
    )
    if "status" in response and response["status"] == "OK":
        return CreateResetPasswordOkResult(response["token"])
    return UnknownUserIdError()
async def sign_in(self, email: str, password: str, tenant_id: str, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], user_context: Dict[str, Any]) ‑> Union[SignInOkResultWrongCredentialsErrorLinkingToSessionUserFailedError]
Expand source code
async def sign_in(
    self,
    email: str,
    password: str,
    tenant_id: str,
    session: Union[SessionContainer, None],
    should_try_linking_with_session_user: Union[bool, None],
    user_context: Dict[str, Any],
) -> Union[SignInOkResult, WrongCredentialsError, LinkingToSessionUserFailedError]:
    response = await self.verify_credentials(
        email, password, tenant_id, user_context
    )

    if isinstance(response, SignInOkResult):
        login_method = next(
            (
                lm
                for lm in response.user.login_methods
                if lm.recipe_user_id.get_as_string()
                == response.recipe_user_id.get_as_string()
            ),
            None,
        )

        assert login_method is not None

        if not login_method.verified:
            await AccountLinkingRecipe.get_instance().verify_email_for_recipe_user_if_linked_accounts_are_verified(
                user=response.user,
                recipe_user_id=response.recipe_user_id,
                user_context=user_context,
            )

            # We do this to get the updated user (in case the above function updated the verification status)
            updated_user = await get_user(
                response.recipe_user_id.get_as_string(), user_context
            )
            assert updated_user is not None
            response.user = updated_user

        link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
            tenant_id=tenant_id,
            input_user=response.user,
            recipe_user_id=response.recipe_user_id,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
            user_context=user_context,
        )

        if isinstance(link_result, LinkingToSessionUserFailedError):
            return link_result

        response.user = link_result.user

    return response
async def sign_up(self, email: str, password: str, tenant_id: str, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], user_context: Dict[str, Any]) ‑> Union[SignUpOkResultEmailAlreadyExistsErrorLinkingToSessionUserFailedError]
Expand source code
async def sign_up(
    self,
    email: str,
    password: str,
    tenant_id: str,
    session: Union[SessionContainer, None],
    should_try_linking_with_session_user: Union[bool, None],
    user_context: Dict[str, Any],
) -> Union[
    SignUpOkResult, EmailAlreadyExistsError, LinkingToSessionUserFailedError
]:
    response = await self.create_new_recipe_user(
        email=email,
        password=password,
        tenant_id=tenant_id,
        user_context=user_context,
    )
    if isinstance(response, EmailAlreadyExistsError):
        return response

    updated_user = response.user

    link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
        tenant_id=tenant_id,
        input_user=response.user,
        recipe_user_id=response.recipe_user_id,
        session=session,
        should_try_linking_with_session_user=should_try_linking_with_session_user,
        user_context=user_context,
    )

    if isinstance(link_result, LinkingToSessionUserFailedError):
        return LinkingToSessionUserFailedError(reason=link_result.reason)

    updated_user = link_result.user

    return SignUpOkResult(
        user=updated_user,
        recipe_user_id=response.recipe_user_id,
    )
async def update_email_or_password(self, recipe_user_id: RecipeUserId, email: Union[str, None], password: Union[str, None], apply_password_policy: Union[bool, None], tenant_id_for_password_policy: str, user_context: Dict[str, Any]) ‑> Union[UpdateEmailOrPasswordOkResultEmailAlreadyExistsErrorUnknownUserIdErrorUpdateEmailOrPasswordEmailChangeNotAllowedErrorPasswordPolicyViolationError]
Expand source code
async def update_email_or_password(
    self,
    recipe_user_id: RecipeUserId,
    email: Union[str, None],
    password: Union[str, None],
    apply_password_policy: Union[bool, None],
    tenant_id_for_password_policy: str,
    user_context: Dict[str, Any],
) -> Union[
    UpdateEmailOrPasswordOkResult,
    EmailAlreadyExistsError,
    UnknownUserIdError,
    PasswordPolicyViolationError,
    UpdateEmailOrPasswordEmailChangeNotAllowedError,
]:
    account_linking = AccountLinkingRecipe.get_instance()
    data = {"recipeUserId": recipe_user_id.get_as_string()}

    if email is not None:
        user = await get_user(recipe_user_id.get_as_string(), user_context)
        if user is None:
            return UnknownUserIdError()

        ev_instance = EmailVerificationRecipe.get_instance_optional()
        is_email_verified = False
        if ev_instance:
            is_email_verified = (
                await ev_instance.recipe_implementation.is_email_verified(
                    recipe_user_id=recipe_user_id,
                    email=email,
                    user_context=user_context,
                )
            )

        is_email_change_allowed = await account_linking.is_email_change_allowed(
            user=user,
            is_verified=is_email_verified,
            new_email=email,
            session=None,
            user_context=user_context,
        )
        if not is_email_change_allowed.allowed:
            reason = (
                "New email cannot be applied to existing account because of account takeover risks."
                if is_email_change_allowed.reason == "ACCOUNT_TAKEOVER_RISK"
                else "New email cannot be applied to existing account because of there is another primary user with the same email address."
            )
            return UpdateEmailOrPasswordEmailChangeNotAllowedError(reason)

        data["email"] = email

    if password is not None:
        if apply_password_policy is None or apply_password_policy:
            form_fields = self.ep_config.sign_up_feature.form_fields
            password_field = next(
                field for field in form_fields if field.id == FORM_FIELD_PASSWORD_ID
            )
            error = await password_field.validate(
                password, tenant_id_for_password_policy
            )
            if error is not None:
                return PasswordPolicyViolationError(error)
        data["password"] = password

    response = await self.querier.send_put_request(
        NormalisedURLPath("/recipe/user"),
        data,
        user_context=user_context,
    )

    if response.get("status") == "OK":
        user = await get_user(recipe_user_id.get_as_string(), user_context)
        if user is None:
            return UnknownUserIdError()
        await AccountLinkingRecipe.get_instance().verify_email_for_recipe_user_if_linked_accounts_are_verified(
            user=user,
            recipe_user_id=recipe_user_id,
            user_context=user_context,
        )
        return UpdateEmailOrPasswordOkResult()
    elif response.get("status") == "EMAIL_ALREADY_EXISTS_ERROR":
        return EmailAlreadyExistsError()
    else:
        return UnknownUserIdError()
async def verify_credentials(self, email: str, password: str, tenant_id: str, user_context: Dict[str, Any]) ‑> Union[SignInOkResultWrongCredentialsError]
Expand source code
async def verify_credentials(
    self,
    email: str,
    password: str,
    tenant_id: str,
    user_context: Dict[str, Any],
) -> Union[SignInOkResult, WrongCredentialsError]:
    response = await self.querier.send_post_request(
        NormalisedURLPath(f"{tenant_id}/recipe/signin"),
        {
            "email": email,
            "password": password,
        },
        user_context=user_context,
    )

    if response["status"] == "OK":
        return SignInOkResult(
            user=User.from_json(response["user"]),
            recipe_user_id=RecipeUserId(response["recipeUserId"]),
        )

    return WrongCredentialsError()