Module supertokens_python.recipe.emailverification.recipe

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from os import environ
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from supertokens_python.exceptions import SuperTokensError, raise_general_exception
from supertokens_python.ingredients.emaildelivery import EmailDeliveryIngredient
from supertokens_python.recipe.emailverification.exceptions import (
    EmailVerificationInvalidTokenError,
)
from supertokens_python.recipe.emailverification.types import (
    EmailTemplateVars,
    EmailVerificationIngredients,
    VerificationEmailTemplateVars,
    VerificationEmailTemplateVarsUser,
)
from supertokens_python.recipe_module import APIHandled, RecipeModule

from ...ingredients.emaildelivery.types import EmailDeliveryConfig
from ...logger import log_debug_message
from ...post_init_callbacks import PostSTInitCallbacks
from ...utils import get_timestamp_ms
from ..session import SessionRecipe
from ..session.asyncio import revoke_all_sessions_for_user, create_new_session
from ..session.claim_base_classes.boolean_claim import (
    BooleanClaim,
    BooleanClaimValidators,
)
from ..session.exceptions import raise_unauthorised_exception
from ..session.interfaces import (
    ClaimValidationResult,
    JSONObject,
    SessionClaimValidator,
    SessionContainer,
)
from .interfaces import (
    APIInterface,
    APIOptions,
    CreateEmailVerificationTokenEmailAlreadyVerifiedError,
    EmailDoesNotExistError,
    EmailVerifyPostInvalidTokenError,
    EmailVerifyPostOkResult,
    GenerateEmailVerifyTokenPostEmailAlreadyVerifiedError,
    GenerateEmailVerifyTokenPostOkResult,
    GetEmailForUserIdOkResult,
    IsEmailVerifiedGetOkResult,
    TypeGetEmailForUserIdFunction,
    UnknownUserIdError,
    VerifyEmailUsingTokenOkResult,
)
from .recipe_implementation import RecipeImplementation

if TYPE_CHECKING:
    from supertokens_python.framework.request import BaseRequest
    from supertokens_python.framework.response import BaseResponse
    from supertokens_python.supertokens import AppInfo
    from supertokens_python.types import RecipeUserId
    from ...types import User, MaybeAwaitable

from supertokens_python.normalised_url_path import NormalisedURLPath
from supertokens_python.querier import Querier
from supertokens_python.recipe.emailverification.utils import get_email_verify_link

from .api import handle_email_verify_api, handle_generate_email_verify_token_api
from .constants import USER_EMAIL_VERIFY, USER_EMAIL_VERIFY_TOKEN
from .exceptions import SuperTokensEmailVerificationError
from .utils import MODE_TYPE, OverrideConfig, validate_and_normalise_user_input


class EmailVerificationRecipe(RecipeModule):
    recipe_id = "emailverification"
    __instance = None
    email_delivery: EmailDeliveryIngredient[VerificationEmailTemplateVars]

    def __init__(
        self,
        recipe_id: str,
        app_info: AppInfo,
        ingredients: EmailVerificationIngredients,
        mode: MODE_TYPE,
        email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None,
        get_email_for_recipe_user_id: Optional[TypeGetEmailForUserIdFunction] = None,
        override: Union[OverrideConfig, None] = None,
    ) -> None:
        super().__init__(recipe_id, app_info)
        self.config = validate_and_normalise_user_input(
            app_info,
            mode,
            email_delivery,
            get_email_for_recipe_user_id,
            override,
        )

        recipe_implementation = RecipeImplementation(
            Querier.get_instance(recipe_id),
            self.get_email_for_recipe_user_id,
        )
        self.recipe_implementation = (
            recipe_implementation
            if self.config.override.functions is None
            else self.config.override.functions(recipe_implementation)
        )

        api_implementation = APIImplementation()
        self.api_implementation = (
            api_implementation
            if self.config.override.apis is None
            else self.config.override.apis(api_implementation)
        )

        email_delivery_ingredient = ingredients.email_delivery
        if email_delivery_ingredient is None:
            self.email_delivery = EmailDeliveryIngredient(
                self.config.get_email_delivery_config()
            )
        else:
            self.email_delivery = email_delivery_ingredient

    def is_error_from_this_recipe_based_on_instance(self, err: Exception) -> bool:
        return isinstance(err, SuperTokensError) and isinstance(
            err, SuperTokensEmailVerificationError
        )

    def get_apis_handled(self) -> List[APIHandled]:
        return [
            APIHandled(
                NormalisedURLPath(USER_EMAIL_VERIFY_TOKEN),
                "post",
                USER_EMAIL_VERIFY_TOKEN,
                self.api_implementation.disable_generate_email_verify_token_post,
            ),
            APIHandled(
                NormalisedURLPath(USER_EMAIL_VERIFY),
                "post",
                USER_EMAIL_VERIFY,
                self.api_implementation.disable_email_verify_post,
            ),
            APIHandled(
                NormalisedURLPath(USER_EMAIL_VERIFY),
                "get",
                USER_EMAIL_VERIFY,
                self.api_implementation.disable_is_email_verified_get,
            ),
        ]

    async def handle_api_request(
        self,
        request_id: str,
        tenant_id: str,
        request: BaseRequest,
        path: NormalisedURLPath,
        method: str,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> Union[BaseResponse, None]:
        api_options = APIOptions(
            request,
            response,
            self.recipe_id,
            self.config,
            self.recipe_implementation,
            self.get_app_info(),
            self.email_delivery,
        )
        if request_id == USER_EMAIL_VERIFY_TOKEN:
            return await handle_generate_email_verify_token_api(
                self.api_implementation, api_options, user_context
            )
        return await handle_email_verify_api(
            self.api_implementation, tenant_id, api_options, user_context
        )

    async def handle_error(
        self,
        request: BaseRequest,
        err: SuperTokensError,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> BaseResponse:
        if isinstance(err, EmailVerificationInvalidTokenError):
            response.set_json_content(
                {"status": "EMAIL_VERIFICATION_INVALID_TOKEN_ERROR"}
            )
            return response
        response.set_json_content({"status": "EMAIL_ALREADY_VERIFIED_ERROR"})
        return response

    def get_all_cors_headers(self) -> List[str]:
        return []

    @staticmethod
    def init(
        mode: MODE_TYPE,
        email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None,
        get_email_for_recipe_user_id: Optional[TypeGetEmailForUserIdFunction] = None,
        override: Union[OverrideConfig, None] = None,
    ):
        def func(app_info: AppInfo) -> EmailVerificationRecipe:
            if EmailVerificationRecipe.__instance is None:
                ingredients = EmailVerificationIngredients(email_delivery=None)
                EmailVerificationRecipe.__instance = EmailVerificationRecipe(
                    EmailVerificationRecipe.recipe_id,
                    app_info,
                    ingredients,
                    mode,
                    email_delivery,
                    get_email_for_recipe_user_id,
                    override,
                )

                def callback():
                    SessionRecipe.get_instance().add_claim_from_other_recipe(
                        EmailVerificationClaim
                    )
                    if mode == "REQUIRED":
                        SessionRecipe.get_instance().add_claim_validator_from_other_recipe(
                            EmailVerificationClaim.validators.is_verified()
                        )

                    from supertokens_python.recipe.accountlinking.recipe import (
                        AccountLinkingRecipe,
                    )

                    assert EmailVerificationRecipe.__instance is not None
                    AccountLinkingRecipe.get_instance().register_email_verification_recipe(
                        EmailVerificationRecipe.__instance
                    )

                PostSTInitCallbacks.add_post_init_callback(callback)

                return EmailVerificationRecipe.__instance
            raise_general_exception(
                "Emailverification recipe has already been initialised. Please check your code for bugs."
            )

        return func

    @staticmethod
    def get_instance_or_throw() -> EmailVerificationRecipe:
        if EmailVerificationRecipe.__instance is not None:
            return EmailVerificationRecipe.__instance
        raise_general_exception(
            "Initialisation not done. Did you forget to call the SuperTokens.init function?"
        )

    @staticmethod
    def get_instance_optional() -> Optional[EmailVerificationRecipe]:
        return EmailVerificationRecipe.__instance

    @staticmethod
    def reset():
        if ("SUPERTOKENS_ENV" not in environ) or (
            environ["SUPERTOKENS_ENV"] != "testing"
        ):
            raise_general_exception("calling testing function in non testing env")
        EmailVerificationRecipe.__instance = None

    async def get_email_for_recipe_user_id(
        self,
        user: Optional[User],
        recipe_user_id: RecipeUserId,
        user_context: Dict[str, Any],
    ) -> Union[GetEmailForUserIdOkResult, EmailDoesNotExistError, UnknownUserIdError]:
        if self.config.get_email_for_recipe_user_id is not None:
            user_res = await self.config.get_email_for_recipe_user_id(
                recipe_user_id, user_context
            )
            if not isinstance(user_res, UnknownUserIdError):
                return user_res

        if user is None:
            from supertokens_python.recipe.accountlinking.recipe import (
                AccountLinkingRecipe,
            )

            user = await AccountLinkingRecipe.get_instance().recipe_implementation.get_user(
                recipe_user_id.get_as_string(), user_context
            )

            if user is None:
                return UnknownUserIdError()

        for login_method in user.login_methods:
            if (
                login_method.recipe_user_id.get_as_string()
                == recipe_user_id.get_as_string()
            ):
                if login_method.email is not None:
                    return GetEmailForUserIdOkResult(email=login_method.email)
                else:
                    return EmailDoesNotExistError()

        return UnknownUserIdError()

    async def get_primary_user_id_for_recipe_user(
        self, recipe_user_id: RecipeUserId, user_context: Dict[str, Any]
    ) -> str:
        # We extract this into its own function like this cause we want to make sure that
        # this recipe does not get the email of the user ID from the getUser function.
        # In fact, there is a test "email verification recipe uses getUser function only in getEmailForRecipeUserId"
        # which makes sure that this function is only called in 3 places in this recipe:
        # - this function
        # - getEmailForRecipeUserId function (above)
        # - after verification to get the updated user in verifyEmailUsingToken
        # We want to isolate the result of calling this function as much as possible
        # so that the consumer of the getUser function does not read the email
        # from the primaryUser. Hence, this function only returns the string ID
        # and nothing else from the primaryUser.
        from supertokens_python.recipe.accountlinking.recipe import AccountLinkingRecipe

        primary_user = (
            await AccountLinkingRecipe.get_instance().recipe_implementation.get_user(
                recipe_user_id.get_as_string(), user_context
            )
        )
        if primary_user is None:
            # This can come here if the user is using session + email verification
            # recipe with a user ID that is not known to supertokens. In this case,
            # we do not allow linking for such users.
            return recipe_user_id.get_as_string()
        return primary_user.id

    async def update_session_if_required_post_email_verification(
        self,
        req: BaseRequest,
        session: Optional[SessionContainer],
        recipe_user_id_whose_email_got_verified: RecipeUserId,
        user_context: Dict[str, Any],
    ) -> Optional[SessionContainer]:
        primary_user_id = await self.get_primary_user_id_for_recipe_user(
            recipe_user_id_whose_email_got_verified, user_context
        )

        # if a session exists in the API, then we can update the session
        # claim related to email verification
        if session is not None:
            log_debug_message(
                "updateSessionIfRequiredPostEmailVerification got session"
            )
            # Due to linking, we will have to correct the current
            # session's user ID. There are four cases here:
            # --> (Case 1) User signed up and did email verification and the new account
            # became a primary user (user ID no change)
            # --> (Case 2) User signed up and did email verification and the new account got linked
            # to another primary user (user ID change)
            # --> (Case 3) This is post login account linking, in which the account that got verified
            # got linked to the session's account (user ID of account has changed to the session's user ID)
            # -->  (Case 4) This is post login account linking, in which the account that got verified
            # got linked to ANOTHER primary account (user ID of account has changed to a different user ID != session.getUserId, but
            # we should ignore this since it will result in the user's session changing.)

            if (
                session.get_recipe_user_id(user_context).get_as_string()
                == recipe_user_id_whose_email_got_verified.get_as_string()
            ):
                log_debug_message(
                    "updateSessionIfRequiredPostEmailVerification the session belongs to the verified user"
                )
                # this means that the session's login method's account is the
                # one that just got verified and that we are NOT doing post login
                # account linking. So this is only for (Case 1) and (Case 2)

                if session.get_user_id() == primary_user_id:
                    log_debug_message(
                        "updateSessionIfRequiredPostEmailVerification the session userId matches the primary user id, so we are only refreshing the claim"
                    )
                    # if the session's primary user ID is equal to the
                    # primary user ID that the account was linked to, then
                    # this means that the new account became a primary user (Case 1)
                    # We also have the sub cases here that the account that just
                    # got verified was already linked to the session's primary user ID,
                    # but either way, we don't need to change any user ID.

                    # In this case, all we do is to update the emailverification claim
                    try:
                        # EmailVerificationClaim will be based on the recipeUserId
                        # and not the primary user ID.
                        await session.fetch_and_set_claim(
                            EmailVerificationClaim, user_context
                        )
                    except Exception as err:
                        # This should never happen, since we've just set the status above.
                        if str(err) == "UNKNOWN_USER_ID":
                            raise_unauthorised_exception("Unknown User ID provided")
                        raise err

                    return None
                else:
                    log_debug_message(
                        "updateSessionIfRequiredPostEmailVerification the session user id doesn't match the primary user id, so we are revoking all sessions and creating a new one"
                    )
                    # if the session's primary user ID is NOT equal to the
                    # primary user ID that the account that it was linked to, then
                    # this means that the new account got linked to another primary user (Case 2)

                    # In this case, we need to update the session's user ID by creating
                    # a new session

                    # Revoke all session belonging to session.getRecipeUserId()
                    # We do not really need to do this, but we do it anyway.. no harm.
                    await revoke_all_sessions_for_user(
                        recipe_user_id_whose_email_got_verified.get_as_string(),
                        False,
                        None,
                        user_context,
                    )

                    # create a new session and return that..
                    return await create_new_session(
                        req,
                        session.get_tenant_id(),
                        session.get_recipe_user_id(user_context),
                        {},
                        {},
                        user_context,
                    )
            else:
                log_debug_message(
                    "updateSessionIfRequiredPostEmailVerification the verified user doesn't match the session"
                )
                # this means that the session's login method's account was NOT the
                # one that just got verified and that we ARE doing post login
                # account linking. So this is only for (Case 3) and (Case 4)

                # In both case 3 and case 4, we do not want to change anything in the
                # current session in terms of user ID or email verification claim (since
                # both of these refer to the current logged in user and not the newly
                # linked user's account).

                return None
        else:
            log_debug_message(
                "updateSessionIfRequiredPostEmailVerification got no session"
            )
            # the session is updated when the is email verification GET API is called
            # so we don't do anything in this API.
            return None


class EmailVerificationClaimValidators(BooleanClaimValidators):
    def __init__(self, claim: EmailVerificationClaimClass):
        super().__init__(claim, None)

    def is_verified(
        self,
        refetch_time_on_false_in_seconds: int = 10,
        max_age_in_seconds: Optional[int] = None,
        id_: Optional[str] = None,
    ) -> SessionClaimValidator:

        assert isinstance(self.claim, EmailVerificationClaimClass)
        return IsVerifiedSCV(
            (id_ or self.claim.key),
            self.claim,
            self,
            refetch_time_on_false_in_seconds,
            max_age_in_seconds,
        )


class EmailVerificationClaimClass(BooleanClaim):
    def __init__(self):
        async def fetch_value(
            _: str,
            recipe_user_id: RecipeUserId,
            __: str,
            ___: Dict[str, Any],
            user_context: Dict[str, Any],
        ) -> bool:
            recipe = EmailVerificationRecipe.get_instance_or_throw()
            email_info = await recipe.get_email_for_recipe_user_id(
                None, recipe_user_id, user_context
            )

            if isinstance(email_info, GetEmailForUserIdOkResult):
                return await recipe.recipe_implementation.is_email_verified(
                    recipe_user_id, email_info.email, user_context
                )
            if isinstance(email_info, EmailDoesNotExistError):
                # we consider people without email addresses as validated
                return True
            raise Exception("UNKNOWN_USER_ID")

        super().__init__("st-ev", fetch_value, None)

        self.validators = EmailVerificationClaimValidators(claim=self)


EmailVerificationClaim = EmailVerificationClaimClass()


class APIImplementation(APIInterface):
    async def email_verify_post(
        self,
        token: str,
        session: Optional[SessionContainer],
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[EmailVerifyPostOkResult, EmailVerifyPostInvalidTokenError]:

        response = await api_options.recipe_implementation.verify_email_using_token(
            token, tenant_id, True, user_context
        )
        if isinstance(response, VerifyEmailUsingTokenOkResult):
            email_verification_recipe = EmailVerificationRecipe.get_instance_or_throw()
            new_session = await email_verification_recipe.update_session_if_required_post_email_verification(
                api_options.request, session, response.user.recipe_user_id, user_context
            )

            return EmailVerifyPostOkResult(response.user, new_session)
        return EmailVerifyPostInvalidTokenError()

    async def is_email_verified_get(
        self,
        session: SessionContainer,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> IsEmailVerifiedGetOkResult:
        recipe = EmailVerificationRecipe.get_instance_or_throw()
        email_info = await recipe.get_email_for_recipe_user_id(
            None, session.get_recipe_user_id(user_context), user_context
        )

        if isinstance(email_info, GetEmailForUserIdOkResult):
            is_verified = await api_options.recipe_implementation.is_email_verified(
                session.get_recipe_user_id(user_context), email_info.email, user_context
            )

            if is_verified:
                new_session = (
                    await recipe.update_session_if_required_post_email_verification(
                        api_options.request,
                        session,
                        session.get_recipe_user_id(user_context),
                        user_context,
                    )
                )
                return IsEmailVerifiedGetOkResult(True, new_session)
            else:
                await session.set_claim_value(
                    EmailVerificationClaim, False, user_context
                )
                return IsEmailVerifiedGetOkResult(False, None)
        elif isinstance(email_info, EmailDoesNotExistError):
            # We consider people without email addresses as validated
            return IsEmailVerifiedGetOkResult(True, None)
        else:
            # This means that the user ID is not known to supertokens. This could
            # happen if the current session's user ID is not an auth user,
            # or if it belongs to a recipe user ID that got deleted. Either way,
            # we logout the user.
            raise_unauthorised_exception("Unknown User ID provided")

    async def generate_email_verify_token_post(
        self,
        session: SessionContainer,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[
        GenerateEmailVerifyTokenPostOkResult,
        GenerateEmailVerifyTokenPostEmailAlreadyVerifiedError,
    ]:
        tenant_id = session.get_tenant_id()

        email_info = await EmailVerificationRecipe.get_instance_or_throw().get_email_for_recipe_user_id(
            None, session.get_recipe_user_id(user_context), user_context
        )

        if isinstance(email_info, EmailDoesNotExistError):
            log_debug_message(
                "Email verification email not sent to user %s because it doesn't have an email address.",
                session.get_recipe_user_id(user_context).get_as_string(),
            )
            # This can happen if the user ID was found, but it has no email. In this
            # case, we treat it as a success case.
            new_session = await EmailVerificationRecipe.get_instance_or_throw().update_session_if_required_post_email_verification(
                api_options.request,
                session,
                session.get_recipe_user_id(user_context),
                user_context,
            )
            return GenerateEmailVerifyTokenPostEmailAlreadyVerifiedError(new_session)
        elif isinstance(email_info, GetEmailForUserIdOkResult):
            response = (
                await api_options.recipe_implementation.create_email_verification_token(
                    session.get_recipe_user_id(user_context),
                    email_info.email,
                    tenant_id,
                    user_context,
                )
            )

            if isinstance(
                response, CreateEmailVerificationTokenEmailAlreadyVerifiedError
            ):
                log_debug_message(
                    "Email verification email not sent to user %s because it is already verified.",
                    session.get_recipe_user_id(user_context).get_as_string(),
                )
                new_session = await EmailVerificationRecipe.get_instance_or_throw().update_session_if_required_post_email_verification(
                    api_options.request,
                    session,
                    session.get_recipe_user_id(user_context),
                    user_context,
                )
                return GenerateEmailVerifyTokenPostEmailAlreadyVerifiedError(
                    new_session
                )

            if (
                await session.get_claim_value(EmailVerificationClaim, user_context)
                is not False
            ):
                # This can happen if the email was unverified in another browser
                # and this session is still outdated - and the user has not
                # called the get email verification API yet.
                await session.fetch_and_set_claim(EmailVerificationClaim, user_context)

            email_verify_link = get_email_verify_link(
                api_options.app_info,
                response.token,
                tenant_id,
                api_options.request,
                user_context,
            )

            log_debug_message(
                "Sending email verification email to %s", email_info.email
            )
            email_verification_email_delivery_input = VerificationEmailTemplateVars(
                user=VerificationEmailTemplateVarsUser(
                    _id=session.get_user_id(user_context),
                    recipe_user_id=session.get_recipe_user_id(user_context),
                    email=email_info.email,
                ),
                email_verify_link=email_verify_link,
                tenant_id=tenant_id,
            )
            await api_options.email_delivery.ingredient_interface_impl.send_email(
                email_verification_email_delivery_input, user_context
            )
            return GenerateEmailVerifyTokenPostOkResult()

        log_debug_message(
            "generateEmailVerifyTokenPOST: Returning UNAUTHORISED because the user id provided is unknown"
        )
        raise_unauthorised_exception("Unknown User ID provided")


class IsVerifiedSCV(SessionClaimValidator):
    def __init__(
        self,
        id_: str,
        claim: EmailVerificationClaimClass,
        ev_claim_validators: EmailVerificationClaimValidators,
        refetch_time_on_false_in_seconds: int,
        max_age_in_seconds: Optional[int],
    ):
        super().__init__(id_)
        self.claim: EmailVerificationClaimClass = claim
        self.ev_claim_validators = ev_claim_validators
        self.refetch_time_on_false_in_ms = refetch_time_on_false_in_seconds * 1000
        self.max_age_in_sec = max_age_in_seconds

    async def validate(
        self, payload: JSONObject, user_context: Dict[str, Any]
    ) -> ClaimValidationResult:
        return await self.ev_claim_validators.has_value(
            True, self.max_age_in_sec
        ).validate(payload, user_context)

    def should_refetch(
        self, payload: JSONObject, user_context: Dict[str, Any]
    ) -> MaybeAwaitable[bool]:
        value = self.claim.get_value_from_payload(payload, user_context)
        if value is None:
            return True

        current_time = get_timestamp_ms()
        last_refetch_time = self.claim.get_last_refetch_time(payload, user_context)
        assert last_refetch_time is not None

        if self.max_age_in_sec is not None:
            if last_refetch_time < current_time - self.max_age_in_sec * 1000:
                return True

        if value is False:
            if last_refetch_time < current_time - self.refetch_time_on_false_in_ms:
                return True

        return False

Classes

class APIImplementation

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class APIImplementation(APIInterface):
    async def email_verify_post(
        self,
        token: str,
        session: Optional[SessionContainer],
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[EmailVerifyPostOkResult, EmailVerifyPostInvalidTokenError]:

        response = await api_options.recipe_implementation.verify_email_using_token(
            token, tenant_id, True, user_context
        )
        if isinstance(response, VerifyEmailUsingTokenOkResult):
            email_verification_recipe = EmailVerificationRecipe.get_instance_or_throw()
            new_session = await email_verification_recipe.update_session_if_required_post_email_verification(
                api_options.request, session, response.user.recipe_user_id, user_context
            )

            return EmailVerifyPostOkResult(response.user, new_session)
        return EmailVerifyPostInvalidTokenError()

    async def is_email_verified_get(
        self,
        session: SessionContainer,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> IsEmailVerifiedGetOkResult:
        recipe = EmailVerificationRecipe.get_instance_or_throw()
        email_info = await recipe.get_email_for_recipe_user_id(
            None, session.get_recipe_user_id(user_context), user_context
        )

        if isinstance(email_info, GetEmailForUserIdOkResult):
            is_verified = await api_options.recipe_implementation.is_email_verified(
                session.get_recipe_user_id(user_context), email_info.email, user_context
            )

            if is_verified:
                new_session = (
                    await recipe.update_session_if_required_post_email_verification(
                        api_options.request,
                        session,
                        session.get_recipe_user_id(user_context),
                        user_context,
                    )
                )
                return IsEmailVerifiedGetOkResult(True, new_session)
            else:
                await session.set_claim_value(
                    EmailVerificationClaim, False, user_context
                )
                return IsEmailVerifiedGetOkResult(False, None)
        elif isinstance(email_info, EmailDoesNotExistError):
            # We consider people without email addresses as validated
            return IsEmailVerifiedGetOkResult(True, None)
        else:
            # This means that the user ID is not known to supertokens. This could
            # happen if the current session's user ID is not an auth user,
            # or if it belongs to a recipe user ID that got deleted. Either way,
            # we logout the user.
            raise_unauthorised_exception("Unknown User ID provided")

    async def generate_email_verify_token_post(
        self,
        session: SessionContainer,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ) -> Union[
        GenerateEmailVerifyTokenPostOkResult,
        GenerateEmailVerifyTokenPostEmailAlreadyVerifiedError,
    ]:
        tenant_id = session.get_tenant_id()

        email_info = await EmailVerificationRecipe.get_instance_or_throw().get_email_for_recipe_user_id(
            None, session.get_recipe_user_id(user_context), user_context
        )

        if isinstance(email_info, EmailDoesNotExistError):
            log_debug_message(
                "Email verification email not sent to user %s because it doesn't have an email address.",
                session.get_recipe_user_id(user_context).get_as_string(),
            )
            # This can happen if the user ID was found, but it has no email. In this
            # case, we treat it as a success case.
            new_session = await EmailVerificationRecipe.get_instance_or_throw().update_session_if_required_post_email_verification(
                api_options.request,
                session,
                session.get_recipe_user_id(user_context),
                user_context,
            )
            return GenerateEmailVerifyTokenPostEmailAlreadyVerifiedError(new_session)
        elif isinstance(email_info, GetEmailForUserIdOkResult):
            response = (
                await api_options.recipe_implementation.create_email_verification_token(
                    session.get_recipe_user_id(user_context),
                    email_info.email,
                    tenant_id,
                    user_context,
                )
            )

            if isinstance(
                response, CreateEmailVerificationTokenEmailAlreadyVerifiedError
            ):
                log_debug_message(
                    "Email verification email not sent to user %s because it is already verified.",
                    session.get_recipe_user_id(user_context).get_as_string(),
                )
                new_session = await EmailVerificationRecipe.get_instance_or_throw().update_session_if_required_post_email_verification(
                    api_options.request,
                    session,
                    session.get_recipe_user_id(user_context),
                    user_context,
                )
                return GenerateEmailVerifyTokenPostEmailAlreadyVerifiedError(
                    new_session
                )

            if (
                await session.get_claim_value(EmailVerificationClaim, user_context)
                is not False
            ):
                # This can happen if the email was unverified in another browser
                # and this session is still outdated - and the user has not
                # called the get email verification API yet.
                await session.fetch_and_set_claim(EmailVerificationClaim, user_context)

            email_verify_link = get_email_verify_link(
                api_options.app_info,
                response.token,
                tenant_id,
                api_options.request,
                user_context,
            )

            log_debug_message(
                "Sending email verification email to %s", email_info.email
            )
            email_verification_email_delivery_input = VerificationEmailTemplateVars(
                user=VerificationEmailTemplateVarsUser(
                    _id=session.get_user_id(user_context),
                    recipe_user_id=session.get_recipe_user_id(user_context),
                    email=email_info.email,
                ),
                email_verify_link=email_verify_link,
                tenant_id=tenant_id,
            )
            await api_options.email_delivery.ingredient_interface_impl.send_email(
                email_verification_email_delivery_input, user_context
            )
            return GenerateEmailVerifyTokenPostOkResult()

        log_debug_message(
            "generateEmailVerifyTokenPOST: Returning UNAUTHORISED because the user id provided is unknown"
        )
        raise_unauthorised_exception("Unknown User ID provided")

Ancestors

Methods

async def email_verify_post(self, token: str, session: Optional[SessionContainer], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[EmailVerifyPostOkResultEmailVerifyPostInvalidTokenError]
Expand source code
async def email_verify_post(
    self,
    token: str,
    session: Optional[SessionContainer],
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Union[EmailVerifyPostOkResult, EmailVerifyPostInvalidTokenError]:

    response = await api_options.recipe_implementation.verify_email_using_token(
        token, tenant_id, True, user_context
    )
    if isinstance(response, VerifyEmailUsingTokenOkResult):
        email_verification_recipe = EmailVerificationRecipe.get_instance_or_throw()
        new_session = await email_verification_recipe.update_session_if_required_post_email_verification(
            api_options.request, session, response.user.recipe_user_id, user_context
        )

        return EmailVerifyPostOkResult(response.user, new_session)
    return EmailVerifyPostInvalidTokenError()
async def generate_email_verify_token_post(self, session: SessionContainer, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[GenerateEmailVerifyTokenPostOkResultGenerateEmailVerifyTokenPostEmailAlreadyVerifiedError]
Expand source code
async def generate_email_verify_token_post(
    self,
    session: SessionContainer,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Union[
    GenerateEmailVerifyTokenPostOkResult,
    GenerateEmailVerifyTokenPostEmailAlreadyVerifiedError,
]:
    tenant_id = session.get_tenant_id()

    email_info = await EmailVerificationRecipe.get_instance_or_throw().get_email_for_recipe_user_id(
        None, session.get_recipe_user_id(user_context), user_context
    )

    if isinstance(email_info, EmailDoesNotExistError):
        log_debug_message(
            "Email verification email not sent to user %s because it doesn't have an email address.",
            session.get_recipe_user_id(user_context).get_as_string(),
        )
        # This can happen if the user ID was found, but it has no email. In this
        # case, we treat it as a success case.
        new_session = await EmailVerificationRecipe.get_instance_or_throw().update_session_if_required_post_email_verification(
            api_options.request,
            session,
            session.get_recipe_user_id(user_context),
            user_context,
        )
        return GenerateEmailVerifyTokenPostEmailAlreadyVerifiedError(new_session)
    elif isinstance(email_info, GetEmailForUserIdOkResult):
        response = (
            await api_options.recipe_implementation.create_email_verification_token(
                session.get_recipe_user_id(user_context),
                email_info.email,
                tenant_id,
                user_context,
            )
        )

        if isinstance(
            response, CreateEmailVerificationTokenEmailAlreadyVerifiedError
        ):
            log_debug_message(
                "Email verification email not sent to user %s because it is already verified.",
                session.get_recipe_user_id(user_context).get_as_string(),
            )
            new_session = await EmailVerificationRecipe.get_instance_or_throw().update_session_if_required_post_email_verification(
                api_options.request,
                session,
                session.get_recipe_user_id(user_context),
                user_context,
            )
            return GenerateEmailVerifyTokenPostEmailAlreadyVerifiedError(
                new_session
            )

        if (
            await session.get_claim_value(EmailVerificationClaim, user_context)
            is not False
        ):
            # This can happen if the email was unverified in another browser
            # and this session is still outdated - and the user has not
            # called the get email verification API yet.
            await session.fetch_and_set_claim(EmailVerificationClaim, user_context)

        email_verify_link = get_email_verify_link(
            api_options.app_info,
            response.token,
            tenant_id,
            api_options.request,
            user_context,
        )

        log_debug_message(
            "Sending email verification email to %s", email_info.email
        )
        email_verification_email_delivery_input = VerificationEmailTemplateVars(
            user=VerificationEmailTemplateVarsUser(
                _id=session.get_user_id(user_context),
                recipe_user_id=session.get_recipe_user_id(user_context),
                email=email_info.email,
            ),
            email_verify_link=email_verify_link,
            tenant_id=tenant_id,
        )
        await api_options.email_delivery.ingredient_interface_impl.send_email(
            email_verification_email_delivery_input, user_context
        )
        return GenerateEmailVerifyTokenPostOkResult()

    log_debug_message(
        "generateEmailVerifyTokenPOST: Returning UNAUTHORISED because the user id provided is unknown"
    )
    raise_unauthorised_exception("Unknown User ID provided")
async def is_email_verified_get(self, session: SessionContainer, api_options: APIOptions, user_context: Dict[str, Any]) ‑> IsEmailVerifiedGetOkResult
Expand source code
async def is_email_verified_get(
    self,
    session: SessionContainer,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> IsEmailVerifiedGetOkResult:
    recipe = EmailVerificationRecipe.get_instance_or_throw()
    email_info = await recipe.get_email_for_recipe_user_id(
        None, session.get_recipe_user_id(user_context), user_context
    )

    if isinstance(email_info, GetEmailForUserIdOkResult):
        is_verified = await api_options.recipe_implementation.is_email_verified(
            session.get_recipe_user_id(user_context), email_info.email, user_context
        )

        if is_verified:
            new_session = (
                await recipe.update_session_if_required_post_email_verification(
                    api_options.request,
                    session,
                    session.get_recipe_user_id(user_context),
                    user_context,
                )
            )
            return IsEmailVerifiedGetOkResult(True, new_session)
        else:
            await session.set_claim_value(
                EmailVerificationClaim, False, user_context
            )
            return IsEmailVerifiedGetOkResult(False, None)
    elif isinstance(email_info, EmailDoesNotExistError):
        # We consider people without email addresses as validated
        return IsEmailVerifiedGetOkResult(True, None)
    else:
        # This means that the user ID is not known to supertokens. This could
        # happen if the current session's user ID is not an auth user,
        # or if it belongs to a recipe user ID that got deleted. Either way,
        # we logout the user.
        raise_unauthorised_exception("Unknown User ID provided")
class EmailVerificationClaimClass

Helper class that provides a standard way to create an ABC using inheritance.

Args

key
The key to use when storing the claim in the payload.
fetch_value
a method that fetches the current value of this claim for the user. A None return value signifies that we don't want to update the claim payload and or the claim value is not present in the database. For example, this can happen with a second factor auth claim, where we don't want to add the claim to the session automatically
Expand source code
class EmailVerificationClaimClass(BooleanClaim):
    def __init__(self):
        async def fetch_value(
            _: str,
            recipe_user_id: RecipeUserId,
            __: str,
            ___: Dict[str, Any],
            user_context: Dict[str, Any],
        ) -> bool:
            recipe = EmailVerificationRecipe.get_instance_or_throw()
            email_info = await recipe.get_email_for_recipe_user_id(
                None, recipe_user_id, user_context
            )

            if isinstance(email_info, GetEmailForUserIdOkResult):
                return await recipe.recipe_implementation.is_email_verified(
                    recipe_user_id, email_info.email, user_context
                )
            if isinstance(email_info, EmailDoesNotExistError):
                # we consider people without email addresses as validated
                return True
            raise Exception("UNKNOWN_USER_ID")

        super().__init__("st-ev", fetch_value, None)

        self.validators = EmailVerificationClaimValidators(claim=self)

Ancestors

Inherited members

class EmailVerificationClaimValidators (claim: EmailVerificationClaimClass)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def getitem(self, key: KT) -> VT: … # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

Expand source code
class EmailVerificationClaimValidators(BooleanClaimValidators):
    def __init__(self, claim: EmailVerificationClaimClass):
        super().__init__(claim, None)

    def is_verified(
        self,
        refetch_time_on_false_in_seconds: int = 10,
        max_age_in_seconds: Optional[int] = None,
        id_: Optional[str] = None,
    ) -> SessionClaimValidator:

        assert isinstance(self.claim, EmailVerificationClaimClass)
        return IsVerifiedSCV(
            (id_ or self.claim.key),
            self.claim,
            self,
            refetch_time_on_false_in_seconds,
            max_age_in_seconds,
        )

Ancestors

Methods

def is_verified(self, refetch_time_on_false_in_seconds: int = 10, max_age_in_seconds: Optional[int] = None, id_: Optional[str] = None) ‑> SessionClaimValidator
Expand source code
def is_verified(
    self,
    refetch_time_on_false_in_seconds: int = 10,
    max_age_in_seconds: Optional[int] = None,
    id_: Optional[str] = None,
) -> SessionClaimValidator:

    assert isinstance(self.claim, EmailVerificationClaimClass)
    return IsVerifiedSCV(
        (id_ or self.claim.key),
        self.claim,
        self,
        refetch_time_on_false_in_seconds,
        max_age_in_seconds,
    )
class EmailVerificationRecipe (recipe_id: str, app_info: AppInfo, ingredients: EmailVerificationIngredients, mode: MODE_TYPE, email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None, get_email_for_recipe_user_id: Optional[TypeGetEmailForUserIdFunction] = None, override: Union[OverrideConfig, None] = None)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class EmailVerificationRecipe(RecipeModule):
    recipe_id = "emailverification"
    __instance = None
    email_delivery: EmailDeliveryIngredient[VerificationEmailTemplateVars]

    def __init__(
        self,
        recipe_id: str,
        app_info: AppInfo,
        ingredients: EmailVerificationIngredients,
        mode: MODE_TYPE,
        email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None,
        get_email_for_recipe_user_id: Optional[TypeGetEmailForUserIdFunction] = None,
        override: Union[OverrideConfig, None] = None,
    ) -> None:
        super().__init__(recipe_id, app_info)
        self.config = validate_and_normalise_user_input(
            app_info,
            mode,
            email_delivery,
            get_email_for_recipe_user_id,
            override,
        )

        recipe_implementation = RecipeImplementation(
            Querier.get_instance(recipe_id),
            self.get_email_for_recipe_user_id,
        )
        self.recipe_implementation = (
            recipe_implementation
            if self.config.override.functions is None
            else self.config.override.functions(recipe_implementation)
        )

        api_implementation = APIImplementation()
        self.api_implementation = (
            api_implementation
            if self.config.override.apis is None
            else self.config.override.apis(api_implementation)
        )

        email_delivery_ingredient = ingredients.email_delivery
        if email_delivery_ingredient is None:
            self.email_delivery = EmailDeliveryIngredient(
                self.config.get_email_delivery_config()
            )
        else:
            self.email_delivery = email_delivery_ingredient

    def is_error_from_this_recipe_based_on_instance(self, err: Exception) -> bool:
        return isinstance(err, SuperTokensError) and isinstance(
            err, SuperTokensEmailVerificationError
        )

    def get_apis_handled(self) -> List[APIHandled]:
        return [
            APIHandled(
                NormalisedURLPath(USER_EMAIL_VERIFY_TOKEN),
                "post",
                USER_EMAIL_VERIFY_TOKEN,
                self.api_implementation.disable_generate_email_verify_token_post,
            ),
            APIHandled(
                NormalisedURLPath(USER_EMAIL_VERIFY),
                "post",
                USER_EMAIL_VERIFY,
                self.api_implementation.disable_email_verify_post,
            ),
            APIHandled(
                NormalisedURLPath(USER_EMAIL_VERIFY),
                "get",
                USER_EMAIL_VERIFY,
                self.api_implementation.disable_is_email_verified_get,
            ),
        ]

    async def handle_api_request(
        self,
        request_id: str,
        tenant_id: str,
        request: BaseRequest,
        path: NormalisedURLPath,
        method: str,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> Union[BaseResponse, None]:
        api_options = APIOptions(
            request,
            response,
            self.recipe_id,
            self.config,
            self.recipe_implementation,
            self.get_app_info(),
            self.email_delivery,
        )
        if request_id == USER_EMAIL_VERIFY_TOKEN:
            return await handle_generate_email_verify_token_api(
                self.api_implementation, api_options, user_context
            )
        return await handle_email_verify_api(
            self.api_implementation, tenant_id, api_options, user_context
        )

    async def handle_error(
        self,
        request: BaseRequest,
        err: SuperTokensError,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> BaseResponse:
        if isinstance(err, EmailVerificationInvalidTokenError):
            response.set_json_content(
                {"status": "EMAIL_VERIFICATION_INVALID_TOKEN_ERROR"}
            )
            return response
        response.set_json_content({"status": "EMAIL_ALREADY_VERIFIED_ERROR"})
        return response

    def get_all_cors_headers(self) -> List[str]:
        return []

    @staticmethod
    def init(
        mode: MODE_TYPE,
        email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None,
        get_email_for_recipe_user_id: Optional[TypeGetEmailForUserIdFunction] = None,
        override: Union[OverrideConfig, None] = None,
    ):
        def func(app_info: AppInfo) -> EmailVerificationRecipe:
            if EmailVerificationRecipe.__instance is None:
                ingredients = EmailVerificationIngredients(email_delivery=None)
                EmailVerificationRecipe.__instance = EmailVerificationRecipe(
                    EmailVerificationRecipe.recipe_id,
                    app_info,
                    ingredients,
                    mode,
                    email_delivery,
                    get_email_for_recipe_user_id,
                    override,
                )

                def callback():
                    SessionRecipe.get_instance().add_claim_from_other_recipe(
                        EmailVerificationClaim
                    )
                    if mode == "REQUIRED":
                        SessionRecipe.get_instance().add_claim_validator_from_other_recipe(
                            EmailVerificationClaim.validators.is_verified()
                        )

                    from supertokens_python.recipe.accountlinking.recipe import (
                        AccountLinkingRecipe,
                    )

                    assert EmailVerificationRecipe.__instance is not None
                    AccountLinkingRecipe.get_instance().register_email_verification_recipe(
                        EmailVerificationRecipe.__instance
                    )

                PostSTInitCallbacks.add_post_init_callback(callback)

                return EmailVerificationRecipe.__instance
            raise_general_exception(
                "Emailverification recipe has already been initialised. Please check your code for bugs."
            )

        return func

    @staticmethod
    def get_instance_or_throw() -> EmailVerificationRecipe:
        if EmailVerificationRecipe.__instance is not None:
            return EmailVerificationRecipe.__instance
        raise_general_exception(
            "Initialisation not done. Did you forget to call the SuperTokens.init function?"
        )

    @staticmethod
    def get_instance_optional() -> Optional[EmailVerificationRecipe]:
        return EmailVerificationRecipe.__instance

    @staticmethod
    def reset():
        if ("SUPERTOKENS_ENV" not in environ) or (
            environ["SUPERTOKENS_ENV"] != "testing"
        ):
            raise_general_exception("calling testing function in non testing env")
        EmailVerificationRecipe.__instance = None

    async def get_email_for_recipe_user_id(
        self,
        user: Optional[User],
        recipe_user_id: RecipeUserId,
        user_context: Dict[str, Any],
    ) -> Union[GetEmailForUserIdOkResult, EmailDoesNotExistError, UnknownUserIdError]:
        if self.config.get_email_for_recipe_user_id is not None:
            user_res = await self.config.get_email_for_recipe_user_id(
                recipe_user_id, user_context
            )
            if not isinstance(user_res, UnknownUserIdError):
                return user_res

        if user is None:
            from supertokens_python.recipe.accountlinking.recipe import (
                AccountLinkingRecipe,
            )

            user = await AccountLinkingRecipe.get_instance().recipe_implementation.get_user(
                recipe_user_id.get_as_string(), user_context
            )

            if user is None:
                return UnknownUserIdError()

        for login_method in user.login_methods:
            if (
                login_method.recipe_user_id.get_as_string()
                == recipe_user_id.get_as_string()
            ):
                if login_method.email is not None:
                    return GetEmailForUserIdOkResult(email=login_method.email)
                else:
                    return EmailDoesNotExistError()

        return UnknownUserIdError()

    async def get_primary_user_id_for_recipe_user(
        self, recipe_user_id: RecipeUserId, user_context: Dict[str, Any]
    ) -> str:
        # We extract this into its own function like this cause we want to make sure that
        # this recipe does not get the email of the user ID from the getUser function.
        # In fact, there is a test "email verification recipe uses getUser function only in getEmailForRecipeUserId"
        # which makes sure that this function is only called in 3 places in this recipe:
        # - this function
        # - getEmailForRecipeUserId function (above)
        # - after verification to get the updated user in verifyEmailUsingToken
        # We want to isolate the result of calling this function as much as possible
        # so that the consumer of the getUser function does not read the email
        # from the primaryUser. Hence, this function only returns the string ID
        # and nothing else from the primaryUser.
        from supertokens_python.recipe.accountlinking.recipe import AccountLinkingRecipe

        primary_user = (
            await AccountLinkingRecipe.get_instance().recipe_implementation.get_user(
                recipe_user_id.get_as_string(), user_context
            )
        )
        if primary_user is None:
            # This can come here if the user is using session + email verification
            # recipe with a user ID that is not known to supertokens. In this case,
            # we do not allow linking for such users.
            return recipe_user_id.get_as_string()
        return primary_user.id

    async def update_session_if_required_post_email_verification(
        self,
        req: BaseRequest,
        session: Optional[SessionContainer],
        recipe_user_id_whose_email_got_verified: RecipeUserId,
        user_context: Dict[str, Any],
    ) -> Optional[SessionContainer]:
        primary_user_id = await self.get_primary_user_id_for_recipe_user(
            recipe_user_id_whose_email_got_verified, user_context
        )

        # if a session exists in the API, then we can update the session
        # claim related to email verification
        if session is not None:
            log_debug_message(
                "updateSessionIfRequiredPostEmailVerification got session"
            )
            # Due to linking, we will have to correct the current
            # session's user ID. There are four cases here:
            # --> (Case 1) User signed up and did email verification and the new account
            # became a primary user (user ID no change)
            # --> (Case 2) User signed up and did email verification and the new account got linked
            # to another primary user (user ID change)
            # --> (Case 3) This is post login account linking, in which the account that got verified
            # got linked to the session's account (user ID of account has changed to the session's user ID)
            # -->  (Case 4) This is post login account linking, in which the account that got verified
            # got linked to ANOTHER primary account (user ID of account has changed to a different user ID != session.getUserId, but
            # we should ignore this since it will result in the user's session changing.)

            if (
                session.get_recipe_user_id(user_context).get_as_string()
                == recipe_user_id_whose_email_got_verified.get_as_string()
            ):
                log_debug_message(
                    "updateSessionIfRequiredPostEmailVerification the session belongs to the verified user"
                )
                # this means that the session's login method's account is the
                # one that just got verified and that we are NOT doing post login
                # account linking. So this is only for (Case 1) and (Case 2)

                if session.get_user_id() == primary_user_id:
                    log_debug_message(
                        "updateSessionIfRequiredPostEmailVerification the session userId matches the primary user id, so we are only refreshing the claim"
                    )
                    # if the session's primary user ID is equal to the
                    # primary user ID that the account was linked to, then
                    # this means that the new account became a primary user (Case 1)
                    # We also have the sub cases here that the account that just
                    # got verified was already linked to the session's primary user ID,
                    # but either way, we don't need to change any user ID.

                    # In this case, all we do is to update the emailverification claim
                    try:
                        # EmailVerificationClaim will be based on the recipeUserId
                        # and not the primary user ID.
                        await session.fetch_and_set_claim(
                            EmailVerificationClaim, user_context
                        )
                    except Exception as err:
                        # This should never happen, since we've just set the status above.
                        if str(err) == "UNKNOWN_USER_ID":
                            raise_unauthorised_exception("Unknown User ID provided")
                        raise err

                    return None
                else:
                    log_debug_message(
                        "updateSessionIfRequiredPostEmailVerification the session user id doesn't match the primary user id, so we are revoking all sessions and creating a new one"
                    )
                    # if the session's primary user ID is NOT equal to the
                    # primary user ID that the account that it was linked to, then
                    # this means that the new account got linked to another primary user (Case 2)

                    # In this case, we need to update the session's user ID by creating
                    # a new session

                    # Revoke all session belonging to session.getRecipeUserId()
                    # We do not really need to do this, but we do it anyway.. no harm.
                    await revoke_all_sessions_for_user(
                        recipe_user_id_whose_email_got_verified.get_as_string(),
                        False,
                        None,
                        user_context,
                    )

                    # create a new session and return that..
                    return await create_new_session(
                        req,
                        session.get_tenant_id(),
                        session.get_recipe_user_id(user_context),
                        {},
                        {},
                        user_context,
                    )
            else:
                log_debug_message(
                    "updateSessionIfRequiredPostEmailVerification the verified user doesn't match the session"
                )
                # this means that the session's login method's account was NOT the
                # one that just got verified and that we ARE doing post login
                # account linking. So this is only for (Case 3) and (Case 4)

                # In both case 3 and case 4, we do not want to change anything in the
                # current session in terms of user ID or email verification claim (since
                # both of these refer to the current logged in user and not the newly
                # linked user's account).

                return None
        else:
            log_debug_message(
                "updateSessionIfRequiredPostEmailVerification got no session"
            )
            # the session is updated when the is email verification GET API is called
            # so we don't do anything in this API.
            return None

Ancestors

Class variables

var email_deliveryEmailDeliveryIngredient[VerificationEmailTemplateVars]
var recipe_id

Static methods

def get_instance_optional() ‑> Optional[EmailVerificationRecipe]
Expand source code
@staticmethod
def get_instance_optional() -> Optional[EmailVerificationRecipe]:
    return EmailVerificationRecipe.__instance
def get_instance_or_throw() ‑> EmailVerificationRecipe
Expand source code
@staticmethod
def get_instance_or_throw() -> EmailVerificationRecipe:
    if EmailVerificationRecipe.__instance is not None:
        return EmailVerificationRecipe.__instance
    raise_general_exception(
        "Initialisation not done. Did you forget to call the SuperTokens.init function?"
    )
def init(mode: MODE_TYPE, email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None, get_email_for_recipe_user_id: Optional[TypeGetEmailForUserIdFunction] = None, override: Union[OverrideConfig, None] = None)
Expand source code
@staticmethod
def init(
    mode: MODE_TYPE,
    email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None,
    get_email_for_recipe_user_id: Optional[TypeGetEmailForUserIdFunction] = None,
    override: Union[OverrideConfig, None] = None,
):
    def func(app_info: AppInfo) -> EmailVerificationRecipe:
        if EmailVerificationRecipe.__instance is None:
            ingredients = EmailVerificationIngredients(email_delivery=None)
            EmailVerificationRecipe.__instance = EmailVerificationRecipe(
                EmailVerificationRecipe.recipe_id,
                app_info,
                ingredients,
                mode,
                email_delivery,
                get_email_for_recipe_user_id,
                override,
            )

            def callback():
                SessionRecipe.get_instance().add_claim_from_other_recipe(
                    EmailVerificationClaim
                )
                if mode == "REQUIRED":
                    SessionRecipe.get_instance().add_claim_validator_from_other_recipe(
                        EmailVerificationClaim.validators.is_verified()
                    )

                from supertokens_python.recipe.accountlinking.recipe import (
                    AccountLinkingRecipe,
                )

                assert EmailVerificationRecipe.__instance is not None
                AccountLinkingRecipe.get_instance().register_email_verification_recipe(
                    EmailVerificationRecipe.__instance
                )

            PostSTInitCallbacks.add_post_init_callback(callback)

            return EmailVerificationRecipe.__instance
        raise_general_exception(
            "Emailverification recipe has already been initialised. Please check your code for bugs."
        )

    return func
def reset()
Expand source code
@staticmethod
def reset():
    if ("SUPERTOKENS_ENV" not in environ) or (
        environ["SUPERTOKENS_ENV"] != "testing"
    ):
        raise_general_exception("calling testing function in non testing env")
    EmailVerificationRecipe.__instance = None

Methods

def get_all_cors_headers(self) ‑> List[str]
Expand source code
def get_all_cors_headers(self) -> List[str]:
    return []
def get_apis_handled(self) ‑> List[APIHandled]
Expand source code
def get_apis_handled(self) -> List[APIHandled]:
    return [
        APIHandled(
            NormalisedURLPath(USER_EMAIL_VERIFY_TOKEN),
            "post",
            USER_EMAIL_VERIFY_TOKEN,
            self.api_implementation.disable_generate_email_verify_token_post,
        ),
        APIHandled(
            NormalisedURLPath(USER_EMAIL_VERIFY),
            "post",
            USER_EMAIL_VERIFY,
            self.api_implementation.disable_email_verify_post,
        ),
        APIHandled(
            NormalisedURLPath(USER_EMAIL_VERIFY),
            "get",
            USER_EMAIL_VERIFY,
            self.api_implementation.disable_is_email_verified_get,
        ),
    ]
async def get_email_for_recipe_user_id(self, user: Optional[User], recipe_user_id: RecipeUserId, user_context: Dict[str, Any]) ‑> Union[GetEmailForUserIdOkResult, EmailDoesNotExistError, UnknownUserIdError]
Expand source code
async def get_email_for_recipe_user_id(
    self,
    user: Optional[User],
    recipe_user_id: RecipeUserId,
    user_context: Dict[str, Any],
) -> Union[GetEmailForUserIdOkResult, EmailDoesNotExistError, UnknownUserIdError]:
    if self.config.get_email_for_recipe_user_id is not None:
        user_res = await self.config.get_email_for_recipe_user_id(
            recipe_user_id, user_context
        )
        if not isinstance(user_res, UnknownUserIdError):
            return user_res

    if user is None:
        from supertokens_python.recipe.accountlinking.recipe import (
            AccountLinkingRecipe,
        )

        user = await AccountLinkingRecipe.get_instance().recipe_implementation.get_user(
            recipe_user_id.get_as_string(), user_context
        )

        if user is None:
            return UnknownUserIdError()

    for login_method in user.login_methods:
        if (
            login_method.recipe_user_id.get_as_string()
            == recipe_user_id.get_as_string()
        ):
            if login_method.email is not None:
                return GetEmailForUserIdOkResult(email=login_method.email)
            else:
                return EmailDoesNotExistError()

    return UnknownUserIdError()
async def get_primary_user_id_for_recipe_user(self, recipe_user_id: RecipeUserId, user_context: Dict[str, Any]) ‑> str
Expand source code
async def get_primary_user_id_for_recipe_user(
    self, recipe_user_id: RecipeUserId, user_context: Dict[str, Any]
) -> str:
    # We extract this into its own function like this cause we want to make sure that
    # this recipe does not get the email of the user ID from the getUser function.
    # In fact, there is a test "email verification recipe uses getUser function only in getEmailForRecipeUserId"
    # which makes sure that this function is only called in 3 places in this recipe:
    # - this function
    # - getEmailForRecipeUserId function (above)
    # - after verification to get the updated user in verifyEmailUsingToken
    # We want to isolate the result of calling this function as much as possible
    # so that the consumer of the getUser function does not read the email
    # from the primaryUser. Hence, this function only returns the string ID
    # and nothing else from the primaryUser.
    from supertokens_python.recipe.accountlinking.recipe import AccountLinkingRecipe

    primary_user = (
        await AccountLinkingRecipe.get_instance().recipe_implementation.get_user(
            recipe_user_id.get_as_string(), user_context
        )
    )
    if primary_user is None:
        # This can come here if the user is using session + email verification
        # recipe with a user ID that is not known to supertokens. In this case,
        # we do not allow linking for such users.
        return recipe_user_id.get_as_string()
    return primary_user.id
async def handle_api_request(self, request_id: str, tenant_id: str, request: BaseRequest, path: NormalisedURLPath, method: str, response: BaseResponse, user_context: Dict[str, Any]) ‑> Union[BaseResponse, None]
Expand source code
async def handle_api_request(
    self,
    request_id: str,
    tenant_id: str,
    request: BaseRequest,
    path: NormalisedURLPath,
    method: str,
    response: BaseResponse,
    user_context: Dict[str, Any],
) -> Union[BaseResponse, None]:
    api_options = APIOptions(
        request,
        response,
        self.recipe_id,
        self.config,
        self.recipe_implementation,
        self.get_app_info(),
        self.email_delivery,
    )
    if request_id == USER_EMAIL_VERIFY_TOKEN:
        return await handle_generate_email_verify_token_api(
            self.api_implementation, api_options, user_context
        )
    return await handle_email_verify_api(
        self.api_implementation, tenant_id, api_options, user_context
    )
async def handle_error(self, request: BaseRequest, err: SuperTokensError, response: BaseResponse, user_context: Dict[str, Any]) ‑> BaseResponse
Expand source code
async def handle_error(
    self,
    request: BaseRequest,
    err: SuperTokensError,
    response: BaseResponse,
    user_context: Dict[str, Any],
) -> BaseResponse:
    if isinstance(err, EmailVerificationInvalidTokenError):
        response.set_json_content(
            {"status": "EMAIL_VERIFICATION_INVALID_TOKEN_ERROR"}
        )
        return response
    response.set_json_content({"status": "EMAIL_ALREADY_VERIFIED_ERROR"})
    return response
def is_error_from_this_recipe_based_on_instance(self, err: Exception) ‑> bool
Expand source code
def is_error_from_this_recipe_based_on_instance(self, err: Exception) -> bool:
    return isinstance(err, SuperTokensError) and isinstance(
        err, SuperTokensEmailVerificationError
    )
async def update_session_if_required_post_email_verification(self, req: BaseRequest, session: Optional[SessionContainer], recipe_user_id_whose_email_got_verified: RecipeUserId, user_context: Dict[str, Any]) ‑> Optional[SessionContainer]
Expand source code
async def update_session_if_required_post_email_verification(
    self,
    req: BaseRequest,
    session: Optional[SessionContainer],
    recipe_user_id_whose_email_got_verified: RecipeUserId,
    user_context: Dict[str, Any],
) -> Optional[SessionContainer]:
    primary_user_id = await self.get_primary_user_id_for_recipe_user(
        recipe_user_id_whose_email_got_verified, user_context
    )

    # if a session exists in the API, then we can update the session
    # claim related to email verification
    if session is not None:
        log_debug_message(
            "updateSessionIfRequiredPostEmailVerification got session"
        )
        # Due to linking, we will have to correct the current
        # session's user ID. There are four cases here:
        # --> (Case 1) User signed up and did email verification and the new account
        # became a primary user (user ID no change)
        # --> (Case 2) User signed up and did email verification and the new account got linked
        # to another primary user (user ID change)
        # --> (Case 3) This is post login account linking, in which the account that got verified
        # got linked to the session's account (user ID of account has changed to the session's user ID)
        # -->  (Case 4) This is post login account linking, in which the account that got verified
        # got linked to ANOTHER primary account (user ID of account has changed to a different user ID != session.getUserId, but
        # we should ignore this since it will result in the user's session changing.)

        if (
            session.get_recipe_user_id(user_context).get_as_string()
            == recipe_user_id_whose_email_got_verified.get_as_string()
        ):
            log_debug_message(
                "updateSessionIfRequiredPostEmailVerification the session belongs to the verified user"
            )
            # this means that the session's login method's account is the
            # one that just got verified and that we are NOT doing post login
            # account linking. So this is only for (Case 1) and (Case 2)

            if session.get_user_id() == primary_user_id:
                log_debug_message(
                    "updateSessionIfRequiredPostEmailVerification the session userId matches the primary user id, so we are only refreshing the claim"
                )
                # if the session's primary user ID is equal to the
                # primary user ID that the account was linked to, then
                # this means that the new account became a primary user (Case 1)
                # We also have the sub cases here that the account that just
                # got verified was already linked to the session's primary user ID,
                # but either way, we don't need to change any user ID.

                # In this case, all we do is to update the emailverification claim
                try:
                    # EmailVerificationClaim will be based on the recipeUserId
                    # and not the primary user ID.
                    await session.fetch_and_set_claim(
                        EmailVerificationClaim, user_context
                    )
                except Exception as err:
                    # This should never happen, since we've just set the status above.
                    if str(err) == "UNKNOWN_USER_ID":
                        raise_unauthorised_exception("Unknown User ID provided")
                    raise err

                return None
            else:
                log_debug_message(
                    "updateSessionIfRequiredPostEmailVerification the session user id doesn't match the primary user id, so we are revoking all sessions and creating a new one"
                )
                # if the session's primary user ID is NOT equal to the
                # primary user ID that the account that it was linked to, then
                # this means that the new account got linked to another primary user (Case 2)

                # In this case, we need to update the session's user ID by creating
                # a new session

                # Revoke all session belonging to session.getRecipeUserId()
                # We do not really need to do this, but we do it anyway.. no harm.
                await revoke_all_sessions_for_user(
                    recipe_user_id_whose_email_got_verified.get_as_string(),
                    False,
                    None,
                    user_context,
                )

                # create a new session and return that..
                return await create_new_session(
                    req,
                    session.get_tenant_id(),
                    session.get_recipe_user_id(user_context),
                    {},
                    {},
                    user_context,
                )
        else:
            log_debug_message(
                "updateSessionIfRequiredPostEmailVerification the verified user doesn't match the session"
            )
            # this means that the session's login method's account was NOT the
            # one that just got verified and that we ARE doing post login
            # account linking. So this is only for (Case 3) and (Case 4)

            # In both case 3 and case 4, we do not want to change anything in the
            # current session in terms of user ID or email verification claim (since
            # both of these refer to the current logged in user and not the newly
            # linked user's account).

            return None
    else:
        log_debug_message(
            "updateSessionIfRequiredPostEmailVerification got no session"
        )
        # the session is updated when the is email verification GET API is called
        # so we don't do anything in this API.
        return None
class IsVerifiedSCV (id_: str, claim: EmailVerificationClaimClass, ev_claim_validators: EmailVerificationClaimValidators, refetch_time_on_false_in_seconds: int, max_age_in_seconds: Optional[int])

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class IsVerifiedSCV(SessionClaimValidator):
    def __init__(
        self,
        id_: str,
        claim: EmailVerificationClaimClass,
        ev_claim_validators: EmailVerificationClaimValidators,
        refetch_time_on_false_in_seconds: int,
        max_age_in_seconds: Optional[int],
    ):
        super().__init__(id_)
        self.claim: EmailVerificationClaimClass = claim
        self.ev_claim_validators = ev_claim_validators
        self.refetch_time_on_false_in_ms = refetch_time_on_false_in_seconds * 1000
        self.max_age_in_sec = max_age_in_seconds

    async def validate(
        self, payload: JSONObject, user_context: Dict[str, Any]
    ) -> ClaimValidationResult:
        return await self.ev_claim_validators.has_value(
            True, self.max_age_in_sec
        ).validate(payload, user_context)

    def should_refetch(
        self, payload: JSONObject, user_context: Dict[str, Any]
    ) -> MaybeAwaitable[bool]:
        value = self.claim.get_value_from_payload(payload, user_context)
        if value is None:
            return True

        current_time = get_timestamp_ms()
        last_refetch_time = self.claim.get_last_refetch_time(payload, user_context)
        assert last_refetch_time is not None

        if self.max_age_in_sec is not None:
            if last_refetch_time < current_time - self.max_age_in_sec * 1000:
                return True

        if value is False:
            if last_refetch_time < current_time - self.refetch_time_on_false_in_ms:
                return True

        return False

Ancestors

Methods

def should_refetch(self, payload: JSONObject, user_context: Dict[str, Any]) ‑> MaybeAwaitable[bool]
Expand source code
def should_refetch(
    self, payload: JSONObject, user_context: Dict[str, Any]
) -> MaybeAwaitable[bool]:
    value = self.claim.get_value_from_payload(payload, user_context)
    if value is None:
        return True

    current_time = get_timestamp_ms()
    last_refetch_time = self.claim.get_last_refetch_time(payload, user_context)
    assert last_refetch_time is not None

    if self.max_age_in_sec is not None:
        if last_refetch_time < current_time - self.max_age_in_sec * 1000:
            return True

    if value is False:
        if last_refetch_time < current_time - self.refetch_time_on_false_in_ms:
            return True

    return False
async def validate(self, payload: JSONObject, user_context: Dict[str, Any]) ‑> ClaimValidationResult
Expand source code
async def validate(
    self, payload: JSONObject, user_context: Dict[str, Any]
) -> ClaimValidationResult:
    return await self.ev_claim_validators.has_value(
        True, self.max_age_in_sec
    ).validate(payload, user_context)