Module supertokens_python.recipe.emailpassword.recipe_implementation

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, Union
from supertokens_python.asyncio import get_user

from supertokens_python.normalised_url_path import NormalisedURLPath
from supertokens_python.recipe.accountlinking.recipe import AccountLinkingRecipe
from supertokens_python.recipe.emailverification.recipe import EmailVerificationRecipe
from supertokens_python.recipe.session import SessionContainer
from supertokens_python.types import RecipeUserId

from .interfaces import (
    CreateResetPasswordOkResult,
    UnknownUserIdError,
    RecipeInterface,
    ConsumePasswordResetTokenOkResult,
    PasswordResetTokenInvalidError,
    SignInOkResult,
    UpdateEmailOrPasswordEmailChangeNotAllowedError,
    WrongCredentialsError,
    EmailAlreadyExistsError,
    SignUpOkResult,
    UpdateEmailOrPasswordOkResult,
    PasswordPolicyViolationError,
)
from .utils import EmailPasswordConfig
from .constants import FORM_FIELD_PASSWORD_ID
from supertokens_python.auth_utils import (
    LinkingToSessionUserFailedError,
    link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info,
)
from ...types import User

if TYPE_CHECKING:
    from supertokens_python.querier import Querier


class RecipeImplementation(RecipeInterface):
    def __init__(
        self,
        querier: Querier,
        ep_config: EmailPasswordConfig,
    ):
        super().__init__()
        self.querier = querier
        self.ep_config = ep_config

    async def sign_up(
        self,
        email: str,
        password: str,
        tenant_id: str,
        session: Union[SessionContainer, None],
        should_try_linking_with_session_user: Union[bool, None],
        user_context: Dict[str, Any],
    ) -> Union[
        SignUpOkResult, EmailAlreadyExistsError, LinkingToSessionUserFailedError
    ]:
        response = await self.create_new_recipe_user(
            email=email,
            password=password,
            tenant_id=tenant_id,
            user_context=user_context,
        )
        if isinstance(response, EmailAlreadyExistsError):
            return response

        updated_user = response.user

        link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
            tenant_id=tenant_id,
            input_user=response.user,
            recipe_user_id=response.recipe_user_id,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
            user_context=user_context,
        )

        if isinstance(link_result, LinkingToSessionUserFailedError):
            return LinkingToSessionUserFailedError(reason=link_result.reason)

        updated_user = link_result.user

        return SignUpOkResult(
            user=updated_user,
            recipe_user_id=response.recipe_user_id,
        )

    async def create_new_recipe_user(
        self,
        email: str,
        password: str,
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[SignUpOkResult, EmailAlreadyExistsError]:
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/signup"),
            {
                "email": email,
                "password": password,
            },
            user_context=user_context,
        )
        if response["status"] == "OK":
            return SignUpOkResult(
                user=User.from_json(response["user"]),
                recipe_user_id=RecipeUserId(response["recipeUserId"]),
            )
        return EmailAlreadyExistsError()

    async def sign_in(
        self,
        email: str,
        password: str,
        tenant_id: str,
        session: Union[SessionContainer, None],
        should_try_linking_with_session_user: Union[bool, None],
        user_context: Dict[str, Any],
    ) -> Union[SignInOkResult, WrongCredentialsError, LinkingToSessionUserFailedError]:
        response = await self.verify_credentials(
            email, password, tenant_id, user_context
        )

        if isinstance(response, SignInOkResult):
            login_method = next(
                (
                    lm
                    for lm in response.user.login_methods
                    if lm.recipe_user_id.get_as_string()
                    == response.recipe_user_id.get_as_string()
                ),
                None,
            )

            assert login_method is not None

            if not login_method.verified:
                await AccountLinkingRecipe.get_instance().verify_email_for_recipe_user_if_linked_accounts_are_verified(
                    user=response.user,
                    recipe_user_id=response.recipe_user_id,
                    user_context=user_context,
                )

                # We do this to get the updated user (in case the above function updated the verification status)
                updated_user = await get_user(
                    response.recipe_user_id.get_as_string(), user_context
                )
                assert updated_user is not None
                response.user = updated_user

            link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
                tenant_id=tenant_id,
                input_user=response.user,
                recipe_user_id=response.recipe_user_id,
                session=session,
                should_try_linking_with_session_user=should_try_linking_with_session_user,
                user_context=user_context,
            )

            if isinstance(link_result, LinkingToSessionUserFailedError):
                return link_result

            response.user = link_result.user

        return response

    async def verify_credentials(
        self,
        email: str,
        password: str,
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[SignInOkResult, WrongCredentialsError]:
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/signin"),
            {
                "email": email,
                "password": password,
            },
            user_context=user_context,
        )

        if response["status"] == "OK":
            return SignInOkResult(
                user=User.from_json(response["user"]),
                recipe_user_id=RecipeUserId(response["recipeUserId"]),
            )

        return WrongCredentialsError()

    async def create_reset_password_token(
        self, user_id: str, email: str, tenant_id: str, user_context: Dict[str, Any]
    ) -> Union[CreateResetPasswordOkResult, UnknownUserIdError]:
        data = {"userId": user_id, "email": email}
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/user/password/reset/token"),
            data,
            user_context=user_context,
        )
        if "status" in response and response["status"] == "OK":
            return CreateResetPasswordOkResult(response["token"])
        return UnknownUserIdError()

    async def consume_password_reset_token(
        self,
        token: str,
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[ConsumePasswordResetTokenOkResult, PasswordResetTokenInvalidError]:
        data = {"token": token}
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/user/password/reset/token/consume"),
            data,
            user_context=user_context,
        )
        if "status" not in response or response["status"] != "OK":
            return PasswordResetTokenInvalidError()
        return ConsumePasswordResetTokenOkResult(response["email"], response["userId"])

    async def update_email_or_password(
        self,
        recipe_user_id: RecipeUserId,
        email: Union[str, None],
        password: Union[str, None],
        apply_password_policy: Union[bool, None],
        tenant_id_for_password_policy: str,
        user_context: Dict[str, Any],
    ) -> Union[
        UpdateEmailOrPasswordOkResult,
        EmailAlreadyExistsError,
        UnknownUserIdError,
        PasswordPolicyViolationError,
        UpdateEmailOrPasswordEmailChangeNotAllowedError,
    ]:
        account_linking = AccountLinkingRecipe.get_instance()
        data = {"recipeUserId": recipe_user_id.get_as_string()}

        if email is not None:
            user = await get_user(recipe_user_id.get_as_string(), user_context)
            if user is None:
                return UnknownUserIdError()

            ev_instance = EmailVerificationRecipe.get_instance_optional()
            is_email_verified = False
            if ev_instance:
                is_email_verified = (
                    await ev_instance.recipe_implementation.is_email_verified(
                        recipe_user_id=recipe_user_id,
                        email=email,
                        user_context=user_context,
                    )
                )

            is_email_change_allowed = await account_linking.is_email_change_allowed(
                user=user,
                is_verified=is_email_verified,
                new_email=email,
                session=None,
                user_context=user_context,
            )
            if not is_email_change_allowed.allowed:
                reason = (
                    "New email cannot be applied to existing account because of account takeover risks."
                    if is_email_change_allowed.reason == "ACCOUNT_TAKEOVER_RISK"
                    else "New email cannot be applied to existing account because of there is another primary user with the same email address."
                )
                return UpdateEmailOrPasswordEmailChangeNotAllowedError(reason)

            data["email"] = email

        if password is not None:
            if apply_password_policy is None or apply_password_policy:
                form_fields = self.ep_config.sign_up_feature.form_fields
                password_field = next(
                    field for field in form_fields if field.id == FORM_FIELD_PASSWORD_ID
                )
                error = await password_field.validate(
                    password, tenant_id_for_password_policy
                )
                if error is not None:
                    return PasswordPolicyViolationError(error)
            data["password"] = password

        response = await self.querier.send_put_request(
            NormalisedURLPath("/recipe/user"),
            data,
            user_context=user_context,
        )

        if response.get("status") == "OK":
            user = await get_user(recipe_user_id.get_as_string(), user_context)
            if user is None:
                return UnknownUserIdError()
            await AccountLinkingRecipe.get_instance().verify_email_for_recipe_user_if_linked_accounts_are_verified(
                user=user,
                recipe_user_id=recipe_user_id,
                user_context=user_context,
            )
            return UpdateEmailOrPasswordOkResult()
        elif response.get("status") == "EMAIL_ALREADY_EXISTS_ERROR":
            return EmailAlreadyExistsError()
        else:
            return UnknownUserIdError()

Classes

class RecipeImplementation (querier: Querier, ep_config: EmailPasswordConfig)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class RecipeImplementation(RecipeInterface):
    def __init__(
        self,
        querier: Querier,
        ep_config: EmailPasswordConfig,
    ):
        super().__init__()
        self.querier = querier
        self.ep_config = ep_config

    async def sign_up(
        self,
        email: str,
        password: str,
        tenant_id: str,
        session: Union[SessionContainer, None],
        should_try_linking_with_session_user: Union[bool, None],
        user_context: Dict[str, Any],
    ) -> Union[
        SignUpOkResult, EmailAlreadyExistsError, LinkingToSessionUserFailedError
    ]:
        response = await self.create_new_recipe_user(
            email=email,
            password=password,
            tenant_id=tenant_id,
            user_context=user_context,
        )
        if isinstance(response, EmailAlreadyExistsError):
            return response

        updated_user = response.user

        link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
            tenant_id=tenant_id,
            input_user=response.user,
            recipe_user_id=response.recipe_user_id,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
            user_context=user_context,
        )

        if isinstance(link_result, LinkingToSessionUserFailedError):
            return LinkingToSessionUserFailedError(reason=link_result.reason)

        updated_user = link_result.user

        return SignUpOkResult(
            user=updated_user,
            recipe_user_id=response.recipe_user_id,
        )

    async def create_new_recipe_user(
        self,
        email: str,
        password: str,
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[SignUpOkResult, EmailAlreadyExistsError]:
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/signup"),
            {
                "email": email,
                "password": password,
            },
            user_context=user_context,
        )
        if response["status"] == "OK":
            return SignUpOkResult(
                user=User.from_json(response["user"]),
                recipe_user_id=RecipeUserId(response["recipeUserId"]),
            )
        return EmailAlreadyExistsError()

    async def sign_in(
        self,
        email: str,
        password: str,
        tenant_id: str,
        session: Union[SessionContainer, None],
        should_try_linking_with_session_user: Union[bool, None],
        user_context: Dict[str, Any],
    ) -> Union[SignInOkResult, WrongCredentialsError, LinkingToSessionUserFailedError]:
        response = await self.verify_credentials(
            email, password, tenant_id, user_context
        )

        if isinstance(response, SignInOkResult):
            login_method = next(
                (
                    lm
                    for lm in response.user.login_methods
                    if lm.recipe_user_id.get_as_string()
                    == response.recipe_user_id.get_as_string()
                ),
                None,
            )

            assert login_method is not None

            if not login_method.verified:
                await AccountLinkingRecipe.get_instance().verify_email_for_recipe_user_if_linked_accounts_are_verified(
                    user=response.user,
                    recipe_user_id=response.recipe_user_id,
                    user_context=user_context,
                )

                # We do this to get the updated user (in case the above function updated the verification status)
                updated_user = await get_user(
                    response.recipe_user_id.get_as_string(), user_context
                )
                assert updated_user is not None
                response.user = updated_user

            link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
                tenant_id=tenant_id,
                input_user=response.user,
                recipe_user_id=response.recipe_user_id,
                session=session,
                should_try_linking_with_session_user=should_try_linking_with_session_user,
                user_context=user_context,
            )

            if isinstance(link_result, LinkingToSessionUserFailedError):
                return link_result

            response.user = link_result.user

        return response

    async def verify_credentials(
        self,
        email: str,
        password: str,
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[SignInOkResult, WrongCredentialsError]:
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/signin"),
            {
                "email": email,
                "password": password,
            },
            user_context=user_context,
        )

        if response["status"] == "OK":
            return SignInOkResult(
                user=User.from_json(response["user"]),
                recipe_user_id=RecipeUserId(response["recipeUserId"]),
            )

        return WrongCredentialsError()

    async def create_reset_password_token(
        self, user_id: str, email: str, tenant_id: str, user_context: Dict[str, Any]
    ) -> Union[CreateResetPasswordOkResult, UnknownUserIdError]:
        data = {"userId": user_id, "email": email}
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/user/password/reset/token"),
            data,
            user_context=user_context,
        )
        if "status" in response and response["status"] == "OK":
            return CreateResetPasswordOkResult(response["token"])
        return UnknownUserIdError()

    async def consume_password_reset_token(
        self,
        token: str,
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[ConsumePasswordResetTokenOkResult, PasswordResetTokenInvalidError]:
        data = {"token": token}
        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/user/password/reset/token/consume"),
            data,
            user_context=user_context,
        )
        if "status" not in response or response["status"] != "OK":
            return PasswordResetTokenInvalidError()
        return ConsumePasswordResetTokenOkResult(response["email"], response["userId"])

    async def update_email_or_password(
        self,
        recipe_user_id: RecipeUserId,
        email: Union[str, None],
        password: Union[str, None],
        apply_password_policy: Union[bool, None],
        tenant_id_for_password_policy: str,
        user_context: Dict[str, Any],
    ) -> Union[
        UpdateEmailOrPasswordOkResult,
        EmailAlreadyExistsError,
        UnknownUserIdError,
        PasswordPolicyViolationError,
        UpdateEmailOrPasswordEmailChangeNotAllowedError,
    ]:
        account_linking = AccountLinkingRecipe.get_instance()
        data = {"recipeUserId": recipe_user_id.get_as_string()}

        if email is not None:
            user = await get_user(recipe_user_id.get_as_string(), user_context)
            if user is None:
                return UnknownUserIdError()

            ev_instance = EmailVerificationRecipe.get_instance_optional()
            is_email_verified = False
            if ev_instance:
                is_email_verified = (
                    await ev_instance.recipe_implementation.is_email_verified(
                        recipe_user_id=recipe_user_id,
                        email=email,
                        user_context=user_context,
                    )
                )

            is_email_change_allowed = await account_linking.is_email_change_allowed(
                user=user,
                is_verified=is_email_verified,
                new_email=email,
                session=None,
                user_context=user_context,
            )
            if not is_email_change_allowed.allowed:
                reason = (
                    "New email cannot be applied to existing account because of account takeover risks."
                    if is_email_change_allowed.reason == "ACCOUNT_TAKEOVER_RISK"
                    else "New email cannot be applied to existing account because of there is another primary user with the same email address."
                )
                return UpdateEmailOrPasswordEmailChangeNotAllowedError(reason)

            data["email"] = email

        if password is not None:
            if apply_password_policy is None or apply_password_policy:
                form_fields = self.ep_config.sign_up_feature.form_fields
                password_field = next(
                    field for field in form_fields if field.id == FORM_FIELD_PASSWORD_ID
                )
                error = await password_field.validate(
                    password, tenant_id_for_password_policy
                )
                if error is not None:
                    return PasswordPolicyViolationError(error)
            data["password"] = password

        response = await self.querier.send_put_request(
            NormalisedURLPath("/recipe/user"),
            data,
            user_context=user_context,
        )

        if response.get("status") == "OK":
            user = await get_user(recipe_user_id.get_as_string(), user_context)
            if user is None:
                return UnknownUserIdError()
            await AccountLinkingRecipe.get_instance().verify_email_for_recipe_user_if_linked_accounts_are_verified(
                user=user,
                recipe_user_id=recipe_user_id,
                user_context=user_context,
            )
            return UpdateEmailOrPasswordOkResult()
        elif response.get("status") == "EMAIL_ALREADY_EXISTS_ERROR":
            return EmailAlreadyExistsError()
        else:
            return UnknownUserIdError()

Ancestors

Methods

async def consume_password_reset_token(self, token: str, tenant_id: str, user_context: Dict[str, Any]) ‑> Union[ConsumePasswordResetTokenOkResultPasswordResetTokenInvalidError]
async def create_new_recipe_user(self, email: str, password: str, tenant_id: str, user_context: Dict[str, Any]) ‑> Union[SignUpOkResultEmailAlreadyExistsError]
async def create_reset_password_token(self, user_id: str, email: str, tenant_id: str, user_context: Dict[str, Any]) ‑> Union[CreateResetPasswordOkResultUnknownUserIdError]
async def sign_in(self, email: str, password: str, tenant_id: str, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], user_context: Dict[str, Any]) ‑> Union[SignInOkResultWrongCredentialsErrorLinkingToSessionUserFailedError]
async def sign_up(self, email: str, password: str, tenant_id: str, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], user_context: Dict[str, Any]) ‑> Union[SignUpOkResultEmailAlreadyExistsErrorLinkingToSessionUserFailedError]
async def update_email_or_password(self, recipe_user_id: RecipeUserId, email: Union[str, None], password: Union[str, None], apply_password_policy: Union[bool, None], tenant_id_for_password_policy: str, user_context: Dict[str, Any]) ‑> Union[UpdateEmailOrPasswordOkResultEmailAlreadyExistsErrorUnknownUserIdErrorUpdateEmailOrPasswordEmailChangeNotAllowedErrorPasswordPolicyViolationError]
async def verify_credentials(self, email: str, password: str, tenant_id: str, user_context: Dict[str, Any]) ‑> Union[SignInOkResultWrongCredentialsError]