Module supertokens_python.recipe.thirdpartyemailpassword.utils
Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import List, Callable, TYPE_CHECKING, Union
from .interfaces import RecipeInterface, APIInterface
from .types import (
INPUT_SCHEMA,
NextPaginationToken
)
if TYPE_CHECKING:
from .recipe import ThirdPartyEmailPasswordRecipe
from supertokens_python.utils import validate_the_structure_of_user_input, utf_base64decode, utf_base64encode
from supertokens_python.recipe.emailpassword.types import UsersResponse
class SignUpFeature:
def __init__(self, form_fields: List):
self.form_fields = form_fields
def validate_and_normalise_sign_up_config(config=None) -> SignUpFeature:
if config is None:
config = {}
form_fields = config['form_fields'] if 'form_fields' in config else []
return SignUpFeature(form_fields)
def email_verification_create_and_send_custom_email(recipe: ThirdPartyEmailPasswordRecipe,
create_and_send_custom_email):
async def func(user, link):
user_info = await recipe.recipe_implementation.get_user_by_id(user.id)
if user_info is None:
raise Exception('User ID unknown')
return await create_and_send_custom_email(user_info, link)
return func
def email_verification_get_email_verification_url(
recipe: ThirdPartyEmailPasswordRecipe, get_email_verification_url):
async def func(user):
user_info = await recipe.recipe_implementation.get_user_by_id(user.id)
if user_info is None:
raise Exception('User ID unknown')
return await get_email_verification_url(user_info)
return func
def validate_and_normalise_email_verification_config(
recipe: ThirdPartyEmailPasswordRecipe, config=None, override=None):
create_and_send_custom_email = None
get_email_verification_url = None
if config is None:
config = {}
if override is None:
override = {}
if 'create_and_send_custom_email' in config:
create_and_send_custom_email = email_verification_create_and_send_custom_email(recipe, config[
'create_and_send_custom_email'])
if 'get_email_verification_url' in config:
get_email_verification_url = email_verification_get_email_verification_url(recipe,
config['get_email_verification_url'])
return {
'get_email_for_user_id': recipe.get_email_for_user_id,
'create_and_send_custom_email': create_and_send_custom_email,
'get_email_verification_url': get_email_verification_url,
'override': override
}
class OverrideConfig:
def __init__(self, functions: Union[Callable[[RecipeInterface], RecipeInterface], None],
apis: Union[Callable[[APIInterface], APIInterface], None]):
self.functions = functions
self.apis = apis
class ThirdPartyEmailPasswordConfig:
def __init__(self,
sign_up_feature: SignUpFeature,
email_verification_feature: any,
providers: List,
reset_password_using_token_feature: any,
override: OverrideConfig):
self.sign_up_feature = sign_up_feature
self.email_verification_feature = email_verification_feature
self.providers = providers
self.reset_password_using_token_feature = reset_password_using_token_feature
self.override = override
def validate_and_normalise_user_input(
recipe: ThirdPartyEmailPasswordRecipe, config) -> ThirdPartyEmailPasswordConfig:
validate_the_structure_of_user_input(
config, INPUT_SCHEMA, 'thirdpartyemailpassword recipe', recipe)
sign_up_feature = validate_and_normalise_sign_up_config(
config['sign_up_feature'] if 'sign_up_feature' in config else None)
email_verification_feature = validate_and_normalise_email_verification_config(
recipe,
config['email_verification_feature'] if 'email_verification_feature' in config else None)
providers = config['providers'] if 'providers' in config else []
reset_password_using_token_feature = config[
'reset_password_using_token_feature'] if 'reset_password_using_token_feature' in config else {}
override_functions = config['override']['functions'] if 'override' in config and 'functions' in config[
'override'] else None
override_apis = config['override']['apis'] if 'override' in config and 'apis' in config[
'override'] else None
override = OverrideConfig(override_functions, override_apis)
return ThirdPartyEmailPasswordConfig(sign_up_feature, email_verification_feature, providers,
reset_password_using_token_feature, override)
def create_new_pagination_token(user_id: str, time_joined: int) -> str:
return utf_base64encode(user_id + ';' + str(time_joined))
def combine_pagination_tokens(third_party_pagination_token: Union[str, None],
email_password_pagination_token: Union[str, None]):
if third_party_pagination_token is None:
third_party_pagination_token = 'null'
if email_password_pagination_token is None:
email_password_pagination_token = 'null'
return utf_base64encode(third_party_pagination_token + ';' + email_password_pagination_token)
def extract_pagination_token(
next_pagination_token: str) -> NextPaginationToken:
extracted_tokens = utf_base64decode(next_pagination_token).split(';')
if len(extracted_tokens) != 2:
raise Exception('Pagination token is invalid')
return NextPaginationToken(None if extracted_tokens[0] == 'null' else extracted_tokens[0],
None if extracted_tokens[1] == 'null' else extracted_tokens[1])
def combine_pagination_results(third_party_result: UsersResponse, email_password_result: UsersResponse, limit: int,
oldest_first: bool) -> UsersResponse:
max_loop = min(
limit, len(
third_party_result.users), len(
email_password_result.users))
third_party_result_loop_index = 0
email_password_result_loop_index = 0
users = []
for i in range(max_loop):
if (
third_party_result_loop_index != len(third_party_result.users)
and
(
email_password_result_loop_index == len(
email_password_result.users)
or
(
oldest_first and third_party_result.users[third_party_result_loop_index].time_joined <
email_password_result.users[email_password_result_loop_index].time_joined
)
or
(
not oldest_first and third_party_result.users[
third_party_result_loop_index].time_joined >
email_password_result.users[email_password_result_loop_index].time_joined
)
)
):
users.append(
third_party_result.users[third_party_result_loop_index])
third_party_result_loop_index += 1
else:
users.append(
email_password_result.users[third_party_result_loop_index])
email_password_result_loop_index += 1
if third_party_result_loop_index == len(third_party_result.users):
third_party_pagination_token = third_party_result.next_pagination_token
else:
third_party_pagination_token = create_new_pagination_token(
third_party_result.users[third_party_result_loop_index].user_id,
third_party_result.users[third_party_result_loop_index].time_joined
)
if email_password_result_loop_index == len(email_password_result.users):
email_password_pagination_token = email_password_result.next_pagination_token
else:
email_password_pagination_token = create_new_pagination_token(
email_password_result.users[email_password_result_loop_index].user_id,
email_password_result.users[email_password_result_loop_index].time_joined
)
next_pagination_token = combine_pagination_tokens(
third_party_pagination_token, email_password_pagination_token)
return UsersResponse(users, next_pagination_token)
Functions
def combine_pagination_results(third_party_result: UsersResponse, email_password_result: UsersResponse, limit: int, oldest_first: bool) ‑> UsersResponse
-
Expand source code
def combine_pagination_results(third_party_result: UsersResponse, email_password_result: UsersResponse, limit: int, oldest_first: bool) -> UsersResponse: max_loop = min( limit, len( third_party_result.users), len( email_password_result.users)) third_party_result_loop_index = 0 email_password_result_loop_index = 0 users = [] for i in range(max_loop): if ( third_party_result_loop_index != len(third_party_result.users) and ( email_password_result_loop_index == len( email_password_result.users) or ( oldest_first and third_party_result.users[third_party_result_loop_index].time_joined < email_password_result.users[email_password_result_loop_index].time_joined ) or ( not oldest_first and third_party_result.users[ third_party_result_loop_index].time_joined > email_password_result.users[email_password_result_loop_index].time_joined ) ) ): users.append( third_party_result.users[third_party_result_loop_index]) third_party_result_loop_index += 1 else: users.append( email_password_result.users[third_party_result_loop_index]) email_password_result_loop_index += 1 if third_party_result_loop_index == len(third_party_result.users): third_party_pagination_token = third_party_result.next_pagination_token else: third_party_pagination_token = create_new_pagination_token( third_party_result.users[third_party_result_loop_index].user_id, third_party_result.users[third_party_result_loop_index].time_joined ) if email_password_result_loop_index == len(email_password_result.users): email_password_pagination_token = email_password_result.next_pagination_token else: email_password_pagination_token = create_new_pagination_token( email_password_result.users[email_password_result_loop_index].user_id, email_password_result.users[email_password_result_loop_index].time_joined ) next_pagination_token = combine_pagination_tokens( third_party_pagination_token, email_password_pagination_token) return UsersResponse(users, next_pagination_token)
def combine_pagination_tokens(third_party_pagination_token: Union[str, None], email_password_pagination_token: Union[str, None])
-
Expand source code
def combine_pagination_tokens(third_party_pagination_token: Union[str, None], email_password_pagination_token: Union[str, None]): if third_party_pagination_token is None: third_party_pagination_token = 'null' if email_password_pagination_token is None: email_password_pagination_token = 'null' return utf_base64encode(third_party_pagination_token + ';' + email_password_pagination_token)
def create_new_pagination_token(user_id: str, time_joined: int) ‑> str
-
Expand source code
def create_new_pagination_token(user_id: str, time_joined: int) -> str: return utf_base64encode(user_id + ';' + str(time_joined))
def email_verification_create_and_send_custom_email(recipe: ThirdPartyEmailPasswordRecipe, create_and_send_custom_email)
-
Expand source code
def email_verification_create_and_send_custom_email(recipe: ThirdPartyEmailPasswordRecipe, create_and_send_custom_email): async def func(user, link): user_info = await recipe.recipe_implementation.get_user_by_id(user.id) if user_info is None: raise Exception('User ID unknown') return await create_and_send_custom_email(user_info, link) return func
def email_verification_get_email_verification_url(recipe: ThirdPartyEmailPasswordRecipe, get_email_verification_url)
-
Expand source code
def email_verification_get_email_verification_url( recipe: ThirdPartyEmailPasswordRecipe, get_email_verification_url): async def func(user): user_info = await recipe.recipe_implementation.get_user_by_id(user.id) if user_info is None: raise Exception('User ID unknown') return await get_email_verification_url(user_info) return func
def extract_pagination_token(next_pagination_token: str) ‑> NextPaginationToken
-
Expand source code
def extract_pagination_token( next_pagination_token: str) -> NextPaginationToken: extracted_tokens = utf_base64decode(next_pagination_token).split(';') if len(extracted_tokens) != 2: raise Exception('Pagination token is invalid') return NextPaginationToken(None if extracted_tokens[0] == 'null' else extracted_tokens[0], None if extracted_tokens[1] == 'null' else extracted_tokens[1])
def validate_and_normalise_email_verification_config(recipe: ThirdPartyEmailPasswordRecipe, config=None, override=None)
-
Expand source code
def validate_and_normalise_email_verification_config( recipe: ThirdPartyEmailPasswordRecipe, config=None, override=None): create_and_send_custom_email = None get_email_verification_url = None if config is None: config = {} if override is None: override = {} if 'create_and_send_custom_email' in config: create_and_send_custom_email = email_verification_create_and_send_custom_email(recipe, config[ 'create_and_send_custom_email']) if 'get_email_verification_url' in config: get_email_verification_url = email_verification_get_email_verification_url(recipe, config['get_email_verification_url']) return { 'get_email_for_user_id': recipe.get_email_for_user_id, 'create_and_send_custom_email': create_and_send_custom_email, 'get_email_verification_url': get_email_verification_url, 'override': override }
def validate_and_normalise_sign_up_config(config=None) ‑> SignUpFeature
-
Expand source code
def validate_and_normalise_sign_up_config(config=None) -> SignUpFeature: if config is None: config = {} form_fields = config['form_fields'] if 'form_fields' in config else [] return SignUpFeature(form_fields)
def validate_and_normalise_user_input(recipe: ThirdPartyEmailPasswordRecipe, config) ‑> ThirdPartyEmailPasswordConfig
-
Expand source code
def validate_and_normalise_user_input( recipe: ThirdPartyEmailPasswordRecipe, config) -> ThirdPartyEmailPasswordConfig: validate_the_structure_of_user_input( config, INPUT_SCHEMA, 'thirdpartyemailpassword recipe', recipe) sign_up_feature = validate_and_normalise_sign_up_config( config['sign_up_feature'] if 'sign_up_feature' in config else None) email_verification_feature = validate_and_normalise_email_verification_config( recipe, config['email_verification_feature'] if 'email_verification_feature' in config else None) providers = config['providers'] if 'providers' in config else [] reset_password_using_token_feature = config[ 'reset_password_using_token_feature'] if 'reset_password_using_token_feature' in config else {} override_functions = config['override']['functions'] if 'override' in config and 'functions' in config[ 'override'] else None override_apis = config['override']['apis'] if 'override' in config and 'apis' in config[ 'override'] else None override = OverrideConfig(override_functions, override_apis) return ThirdPartyEmailPasswordConfig(sign_up_feature, email_verification_feature, providers, reset_password_using_token_feature, override)
Classes
class OverrideConfig (functions: Union[Callable[[RecipeInterface], RecipeInterface], None], apis: Union[Callable[[APIInterface], APIInterface], None])
-
Expand source code
class OverrideConfig: def __init__(self, functions: Union[Callable[[RecipeInterface], RecipeInterface], None], apis: Union[Callable[[APIInterface], APIInterface], None]): self.functions = functions self.apis = apis
class SignUpFeature (form_fields: List)
-
Expand source code
class SignUpFeature: def __init__(self, form_fields: List): self.form_fields = form_fields
class ThirdPartyEmailPasswordConfig (sign_up_feature: SignUpFeature, email_verification_feature: any, providers: List, reset_password_using_token_feature: any, override: OverrideConfig)
-
Expand source code
class ThirdPartyEmailPasswordConfig: def __init__(self, sign_up_feature: SignUpFeature, email_verification_feature: any, providers: List, reset_password_using_token_feature: any, override: OverrideConfig): self.sign_up_feature = sign_up_feature self.email_verification_feature = email_verification_feature self.providers = providers self.reset_password_using_token_feature = reset_password_using_token_feature self.override = override