Module supertokens_python.recipe.emailpassword.recipe

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from os import environ
from typing import TYPE_CHECKING, Any, Dict, List, Union, cast

from supertokens_python.ingredients.emaildelivery import \
    EmailDeliveryIngredient
from supertokens_python.ingredients.emaildelivery.types import \
    EmailDeliveryConfig
from supertokens_python.normalised_url_path import NormalisedURLPath
from supertokens_python.recipe.emailpassword.types import (
    EmailPasswordIngredients, EmailTemplateVars)
from supertokens_python.recipe.emailverification.types import \
    EmailVerificationIngredients, VerificationEmailTemplateVars
from supertokens_python.recipe_module import APIHandled, RecipeModule

from ...exceptions import SuperTokensError
from .api.implementation import APIImplementation
from .exceptions import FieldError, SuperTokensEmailPasswordError
from .interfaces import APIOptions
from .recipe_implementation import RecipeImplementation

if TYPE_CHECKING:
    from supertokens_python.framework.request import BaseRequest
    from supertokens_python.framework.response import BaseResponse
    from supertokens_python.supertokens import AppInfo

from supertokens_python.exceptions import (SuperTokensError,
                                           raise_general_exception)
from supertokens_python.querier import Querier
from supertokens_python.recipe.emailverification import EmailVerificationRecipe

from .api import (handle_email_exists_api,
                  handle_generate_password_reset_token_api,
                  handle_password_reset_api, handle_sign_in_api,
                  handle_sign_up_api)
from .constants import (SIGNIN, SIGNUP, SIGNUP_EMAIL_EXISTS,
                        USER_PASSWORD_RESET, USER_PASSWORD_RESET_TOKEN)
from .utils import (InputEmailVerificationConfig, InputOverrideConfig,
                    InputResetPasswordUsingTokenFeature, InputSignUpFeature,
                    validate_and_normalise_user_input)


class EmailPasswordRecipe(RecipeModule):
    recipe_id = 'emailpassword'
    __instance = None
    email_delivery: EmailDeliveryIngredient[EmailTemplateVars]

    def __init__(self, recipe_id: str, app_info: AppInfo,
                 ingredients: EmailPasswordIngredients,
                 sign_up_feature: Union[InputSignUpFeature, None] = None,
                 reset_password_using_token_feature: Union[InputResetPasswordUsingTokenFeature, None] = None,
                 email_verification_feature: Union[InputEmailVerificationConfig, None] = None,
                 override: Union[InputOverrideConfig, None] = None,
                 email_verification_recipe: Union[EmailVerificationRecipe, None] = None,
                 email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None,
                 ):
        super().__init__(recipe_id, app_info)
        self.config = validate_and_normalise_user_input(self, app_info, sign_up_feature,
                                                        reset_password_using_token_feature,
                                                        email_verification_feature, override, email_delivery)
        recipe_implementation = RecipeImplementation(Querier.get_instance(recipe_id))
        self.recipe_implementation = recipe_implementation if self.config.override.functions is None else \
            self.config.override.functions(recipe_implementation)

        email_delivery_ingredient = ingredients.email_delivery
        if email_delivery_ingredient is None:
            self.email_delivery = EmailDeliveryIngredient(self.config.get_email_delivery_config(self.recipe_implementation))
        else:
            self.email_delivery = email_delivery_ingredient

        if email_verification_recipe is not None:
            self.email_verification_recipe = email_verification_recipe
        else:
            ev_email_delivery_ingredient = cast(
                EmailDeliveryIngredient[VerificationEmailTemplateVars],
                self.email_delivery
            )

            email_verification_ingredients = EmailVerificationIngredients(email_delivery=ev_email_delivery_ingredient)
            self.email_verification_recipe = EmailVerificationRecipe(recipe_id, app_info,
                                                                     self.config.email_verification_feature,
                                                                     ingredients=email_verification_ingredients)

        api_implementation = APIImplementation()
        self.api_implementation = api_implementation if self.config.override.apis is None else \
            self.config.override.apis(api_implementation)

    def is_error_from_this_recipe_based_on_instance(
            self, err: Exception) -> bool:
        return isinstance(err, SuperTokensError) and (
            isinstance(err, SuperTokensEmailPasswordError)
            or
            self.email_verification_recipe.is_error_from_this_recipe_based_on_instance(
                err)
        )

    def get_apis_handled(self) -> List[APIHandled]:
        return [
            APIHandled(NormalisedURLPath(SIGNUP), 'post', SIGNUP,
                       self.api_implementation.disable_sign_up_post),
            APIHandled(NormalisedURLPath(SIGNIN), 'post', SIGNIN,
                       self.api_implementation.disable_sign_in_post),
            APIHandled(NormalisedURLPath(USER_PASSWORD_RESET_TOKEN), 'post', USER_PASSWORD_RESET_TOKEN,
                       self.api_implementation.disable_generate_password_reset_token_post),
            APIHandled(NormalisedURLPath(USER_PASSWORD_RESET), 'post', USER_PASSWORD_RESET,
                       self.api_implementation.disable_password_reset_post),
            APIHandled(NormalisedURLPath(SIGNUP_EMAIL_EXISTS), 'get', SIGNUP_EMAIL_EXISTS,
                       self.api_implementation.disable_email_exists_get)

        ] + self.email_verification_recipe.get_apis_handled()

    async def handle_api_request(self, request_id: str, request: BaseRequest, path: NormalisedURLPath, method: str,
                                 response: BaseResponse):
        api_options = APIOptions(request, response, self.recipe_id, self.config, self.recipe_implementation, self.email_verification_recipe.recipe_implementation, self.email_delivery)
        if request_id == SIGNUP:
            return await handle_sign_up_api(self.api_implementation,
                                            api_options)
        if request_id == SIGNIN:
            return await handle_sign_in_api(self.api_implementation,
                                            api_options)
        if request_id == SIGNUP_EMAIL_EXISTS:
            return await handle_email_exists_api(self.api_implementation,
                                                 api_options)
        if request_id == USER_PASSWORD_RESET_TOKEN:
            return await handle_generate_password_reset_token_api(self.api_implementation,
                                                                  api_options)
        if request_id == USER_PASSWORD_RESET:
            return await handle_password_reset_api(self.api_implementation,
                                                   api_options)
        return await self.email_verification_recipe.handle_api_request(request_id, request, path, method, response)

    async def handle_error(self, request: BaseRequest, err: SuperTokensError, response: BaseResponse) -> BaseResponse:
        if isinstance(err, SuperTokensEmailPasswordError):
            if isinstance(err, FieldError):
                response.set_json_content(
                    {'status': 'FIELD_ERROR', 'formFields': err.get_json_form_fields()})
                return response
        return await self.email_verification_recipe.handle_error(request, err, response)

    def get_all_cors_headers(self) -> List[str]:
        return self.email_verification_recipe.get_all_cors_headers()

    @staticmethod
    def init(sign_up_feature: Union[InputSignUpFeature, None] = None,
             reset_password_using_token_feature: Union[InputResetPasswordUsingTokenFeature, None] = None,
             email_verification_feature: Union[InputEmailVerificationConfig, None] = None,
             override: Union[InputOverrideConfig, None] = None,
             email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None):
        def func(app_info: AppInfo):
            if EmailPasswordRecipe.__instance is None:
                ingredients = EmailPasswordIngredients(None)
                EmailPasswordRecipe.__instance = EmailPasswordRecipe(EmailPasswordRecipe.recipe_id, app_info, ingredients, sign_up_feature, reset_password_using_token_feature, email_verification_feature, override, email_delivery=email_delivery)
                return EmailPasswordRecipe.__instance
            raise Exception(None, 'Emailpassword recipe has already been initialised. Please check your code for bugs.')

        return func

    @staticmethod
    def get_instance() -> EmailPasswordRecipe:
        if EmailPasswordRecipe.__instance is not None:
            return EmailPasswordRecipe.__instance
        raise_general_exception(
            'Initialisation not done. Did you forget to call the SuperTokens.init function?')

    @staticmethod
    def reset():
        if ('SUPERTOKENS_ENV' not in environ) or (
                environ['SUPERTOKENS_ENV'] != 'testing'):
            raise_general_exception(
                'calling testing function in non testing env')
        EmailPasswordRecipe.__instance = None

    # instance functions below...............

    async def get_email_for_user_id(self, user_id: str, user_context: Dict[str, Any]) -> str:
        user_info = await self.recipe_implementation.get_user_by_id(user_id, user_context)
        if user_info is None:
            raise Exception('Unknown User ID provided')
        return user_info.email

Classes

class EmailPasswordRecipe (recipe_id: str, app_info: AppInfo, ingredients: EmailPasswordIngredients, sign_up_feature: Union[InputSignUpFeature, None] = None, reset_password_using_token_feature: Union[InputResetPasswordUsingTokenFeature, None] = None, email_verification_feature: Union[InputEmailVerificationConfig, None] = None, override: Union[InputOverrideConfig, None] = None, email_verification_recipe: Union[EmailVerificationRecipe, None] = None, email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class EmailPasswordRecipe(RecipeModule):
    recipe_id = 'emailpassword'
    __instance = None
    email_delivery: EmailDeliveryIngredient[EmailTemplateVars]

    def __init__(self, recipe_id: str, app_info: AppInfo,
                 ingredients: EmailPasswordIngredients,
                 sign_up_feature: Union[InputSignUpFeature, None] = None,
                 reset_password_using_token_feature: Union[InputResetPasswordUsingTokenFeature, None] = None,
                 email_verification_feature: Union[InputEmailVerificationConfig, None] = None,
                 override: Union[InputOverrideConfig, None] = None,
                 email_verification_recipe: Union[EmailVerificationRecipe, None] = None,
                 email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None,
                 ):
        super().__init__(recipe_id, app_info)
        self.config = validate_and_normalise_user_input(self, app_info, sign_up_feature,
                                                        reset_password_using_token_feature,
                                                        email_verification_feature, override, email_delivery)
        recipe_implementation = RecipeImplementation(Querier.get_instance(recipe_id))
        self.recipe_implementation = recipe_implementation if self.config.override.functions is None else \
            self.config.override.functions(recipe_implementation)

        email_delivery_ingredient = ingredients.email_delivery
        if email_delivery_ingredient is None:
            self.email_delivery = EmailDeliveryIngredient(self.config.get_email_delivery_config(self.recipe_implementation))
        else:
            self.email_delivery = email_delivery_ingredient

        if email_verification_recipe is not None:
            self.email_verification_recipe = email_verification_recipe
        else:
            ev_email_delivery_ingredient = cast(
                EmailDeliveryIngredient[VerificationEmailTemplateVars],
                self.email_delivery
            )

            email_verification_ingredients = EmailVerificationIngredients(email_delivery=ev_email_delivery_ingredient)
            self.email_verification_recipe = EmailVerificationRecipe(recipe_id, app_info,
                                                                     self.config.email_verification_feature,
                                                                     ingredients=email_verification_ingredients)

        api_implementation = APIImplementation()
        self.api_implementation = api_implementation if self.config.override.apis is None else \
            self.config.override.apis(api_implementation)

    def is_error_from_this_recipe_based_on_instance(
            self, err: Exception) -> bool:
        return isinstance(err, SuperTokensError) and (
            isinstance(err, SuperTokensEmailPasswordError)
            or
            self.email_verification_recipe.is_error_from_this_recipe_based_on_instance(
                err)
        )

    def get_apis_handled(self) -> List[APIHandled]:
        return [
            APIHandled(NormalisedURLPath(SIGNUP), 'post', SIGNUP,
                       self.api_implementation.disable_sign_up_post),
            APIHandled(NormalisedURLPath(SIGNIN), 'post', SIGNIN,
                       self.api_implementation.disable_sign_in_post),
            APIHandled(NormalisedURLPath(USER_PASSWORD_RESET_TOKEN), 'post', USER_PASSWORD_RESET_TOKEN,
                       self.api_implementation.disable_generate_password_reset_token_post),
            APIHandled(NormalisedURLPath(USER_PASSWORD_RESET), 'post', USER_PASSWORD_RESET,
                       self.api_implementation.disable_password_reset_post),
            APIHandled(NormalisedURLPath(SIGNUP_EMAIL_EXISTS), 'get', SIGNUP_EMAIL_EXISTS,
                       self.api_implementation.disable_email_exists_get)

        ] + self.email_verification_recipe.get_apis_handled()

    async def handle_api_request(self, request_id: str, request: BaseRequest, path: NormalisedURLPath, method: str,
                                 response: BaseResponse):
        api_options = APIOptions(request, response, self.recipe_id, self.config, self.recipe_implementation, self.email_verification_recipe.recipe_implementation, self.email_delivery)
        if request_id == SIGNUP:
            return await handle_sign_up_api(self.api_implementation,
                                            api_options)
        if request_id == SIGNIN:
            return await handle_sign_in_api(self.api_implementation,
                                            api_options)
        if request_id == SIGNUP_EMAIL_EXISTS:
            return await handle_email_exists_api(self.api_implementation,
                                                 api_options)
        if request_id == USER_PASSWORD_RESET_TOKEN:
            return await handle_generate_password_reset_token_api(self.api_implementation,
                                                                  api_options)
        if request_id == USER_PASSWORD_RESET:
            return await handle_password_reset_api(self.api_implementation,
                                                   api_options)
        return await self.email_verification_recipe.handle_api_request(request_id, request, path, method, response)

    async def handle_error(self, request: BaseRequest, err: SuperTokensError, response: BaseResponse) -> BaseResponse:
        if isinstance(err, SuperTokensEmailPasswordError):
            if isinstance(err, FieldError):
                response.set_json_content(
                    {'status': 'FIELD_ERROR', 'formFields': err.get_json_form_fields()})
                return response
        return await self.email_verification_recipe.handle_error(request, err, response)

    def get_all_cors_headers(self) -> List[str]:
        return self.email_verification_recipe.get_all_cors_headers()

    @staticmethod
    def init(sign_up_feature: Union[InputSignUpFeature, None] = None,
             reset_password_using_token_feature: Union[InputResetPasswordUsingTokenFeature, None] = None,
             email_verification_feature: Union[InputEmailVerificationConfig, None] = None,
             override: Union[InputOverrideConfig, None] = None,
             email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None):
        def func(app_info: AppInfo):
            if EmailPasswordRecipe.__instance is None:
                ingredients = EmailPasswordIngredients(None)
                EmailPasswordRecipe.__instance = EmailPasswordRecipe(EmailPasswordRecipe.recipe_id, app_info, ingredients, sign_up_feature, reset_password_using_token_feature, email_verification_feature, override, email_delivery=email_delivery)
                return EmailPasswordRecipe.__instance
            raise Exception(None, 'Emailpassword recipe has already been initialised. Please check your code for bugs.')

        return func

    @staticmethod
    def get_instance() -> EmailPasswordRecipe:
        if EmailPasswordRecipe.__instance is not None:
            return EmailPasswordRecipe.__instance
        raise_general_exception(
            'Initialisation not done. Did you forget to call the SuperTokens.init function?')

    @staticmethod
    def reset():
        if ('SUPERTOKENS_ENV' not in environ) or (
                environ['SUPERTOKENS_ENV'] != 'testing'):
            raise_general_exception(
                'calling testing function in non testing env')
        EmailPasswordRecipe.__instance = None

    # instance functions below...............

    async def get_email_for_user_id(self, user_id: str, user_context: Dict[str, Any]) -> str:
        user_info = await self.recipe_implementation.get_user_by_id(user_id, user_context)
        if user_info is None:
            raise Exception('Unknown User ID provided')
        return user_info.email

Ancestors

Class variables

var email_deliveryEmailDeliveryIngredient[typing.Union[PasswordResetEmailTemplateVarsVerificationEmailTemplateVars]]
var recipe_id

Static methods

def get_instance() ‑> EmailPasswordRecipe
Expand source code
@staticmethod
def get_instance() -> EmailPasswordRecipe:
    if EmailPasswordRecipe.__instance is not None:
        return EmailPasswordRecipe.__instance
    raise_general_exception(
        'Initialisation not done. Did you forget to call the SuperTokens.init function?')
def init(sign_up_feature: Union[InputSignUpFeature, None] = None, reset_password_using_token_feature: Union[InputResetPasswordUsingTokenFeature, None] = None, email_verification_feature: Union[InputEmailVerificationConfig, None] = None, override: Union[InputOverrideConfig, None] = None, email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None)
Expand source code
@staticmethod
def init(sign_up_feature: Union[InputSignUpFeature, None] = None,
         reset_password_using_token_feature: Union[InputResetPasswordUsingTokenFeature, None] = None,
         email_verification_feature: Union[InputEmailVerificationConfig, None] = None,
         override: Union[InputOverrideConfig, None] = None,
         email_delivery: Union[EmailDeliveryConfig[EmailTemplateVars], None] = None):
    def func(app_info: AppInfo):
        if EmailPasswordRecipe.__instance is None:
            ingredients = EmailPasswordIngredients(None)
            EmailPasswordRecipe.__instance = EmailPasswordRecipe(EmailPasswordRecipe.recipe_id, app_info, ingredients, sign_up_feature, reset_password_using_token_feature, email_verification_feature, override, email_delivery=email_delivery)
            return EmailPasswordRecipe.__instance
        raise Exception(None, 'Emailpassword recipe has already been initialised. Please check your code for bugs.')

    return func
def reset()
Expand source code
@staticmethod
def reset():
    if ('SUPERTOKENS_ENV' not in environ) or (
            environ['SUPERTOKENS_ENV'] != 'testing'):
        raise_general_exception(
            'calling testing function in non testing env')
    EmailPasswordRecipe.__instance = None

Methods

def get_all_cors_headers(self) ‑> List[str]
Expand source code
def get_all_cors_headers(self) -> List[str]:
    return self.email_verification_recipe.get_all_cors_headers()
def get_apis_handled(self) ‑> List[APIHandled]
Expand source code
def get_apis_handled(self) -> List[APIHandled]:
    return [
        APIHandled(NormalisedURLPath(SIGNUP), 'post', SIGNUP,
                   self.api_implementation.disable_sign_up_post),
        APIHandled(NormalisedURLPath(SIGNIN), 'post', SIGNIN,
                   self.api_implementation.disable_sign_in_post),
        APIHandled(NormalisedURLPath(USER_PASSWORD_RESET_TOKEN), 'post', USER_PASSWORD_RESET_TOKEN,
                   self.api_implementation.disable_generate_password_reset_token_post),
        APIHandled(NormalisedURLPath(USER_PASSWORD_RESET), 'post', USER_PASSWORD_RESET,
                   self.api_implementation.disable_password_reset_post),
        APIHandled(NormalisedURLPath(SIGNUP_EMAIL_EXISTS), 'get', SIGNUP_EMAIL_EXISTS,
                   self.api_implementation.disable_email_exists_get)

    ] + self.email_verification_recipe.get_apis_handled()
async def get_email_for_user_id(self, user_id: str, user_context: Dict[str, Any]) ‑> str
Expand source code
async def get_email_for_user_id(self, user_id: str, user_context: Dict[str, Any]) -> str:
    user_info = await self.recipe_implementation.get_user_by_id(user_id, user_context)
    if user_info is None:
        raise Exception('Unknown User ID provided')
    return user_info.email
async def handle_api_request(self, request_id: str, request: BaseRequest, path: NormalisedURLPath, method: str, response: BaseResponse)
Expand source code
async def handle_api_request(self, request_id: str, request: BaseRequest, path: NormalisedURLPath, method: str,
                             response: BaseResponse):
    api_options = APIOptions(request, response, self.recipe_id, self.config, self.recipe_implementation, self.email_verification_recipe.recipe_implementation, self.email_delivery)
    if request_id == SIGNUP:
        return await handle_sign_up_api(self.api_implementation,
                                        api_options)
    if request_id == SIGNIN:
        return await handle_sign_in_api(self.api_implementation,
                                        api_options)
    if request_id == SIGNUP_EMAIL_EXISTS:
        return await handle_email_exists_api(self.api_implementation,
                                             api_options)
    if request_id == USER_PASSWORD_RESET_TOKEN:
        return await handle_generate_password_reset_token_api(self.api_implementation,
                                                              api_options)
    if request_id == USER_PASSWORD_RESET:
        return await handle_password_reset_api(self.api_implementation,
                                               api_options)
    return await self.email_verification_recipe.handle_api_request(request_id, request, path, method, response)
async def handle_error(self, request: BaseRequest, err: SuperTokensError, response: BaseResponse) ‑> BaseResponse
Expand source code
async def handle_error(self, request: BaseRequest, err: SuperTokensError, response: BaseResponse) -> BaseResponse:
    if isinstance(err, SuperTokensEmailPasswordError):
        if isinstance(err, FieldError):
            response.set_json_content(
                {'status': 'FIELD_ERROR', 'formFields': err.get_json_form_fields()})
            return response
    return await self.email_verification_recipe.handle_error(request, err, response)
def is_error_from_this_recipe_based_on_instance(self, err: Exception) ‑> bool
Expand source code
def is_error_from_this_recipe_based_on_instance(
        self, err: Exception) -> bool:
    return isinstance(err, SuperTokensError) and (
        isinstance(err, SuperTokensEmailPasswordError)
        or
        self.email_verification_recipe.is_error_from_this_recipe_based_on_instance(
            err)
    )