Module supertokens_python.recipe.accountlinking.recipe
Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from os import environ
from typing import Any, Dict, List, Union, TYPE_CHECKING, Optional, Callable, Awaitable
from supertokens_python.supertokens import Supertokens
from supertokens_python.normalised_url_path import NormalisedURLPath
from supertokens_python.recipe_module import APIHandled, RecipeModule
from .utils import validate_and_normalise_user_input
from supertokens_python.exceptions import SuperTokensError, raise_general_exception
from .recipe_implementation import RecipeImplementation
from supertokens_python.querier import Querier
from supertokens_python.logger import (
log_debug_message,
)
from supertokens_python.process_state import PROCESS_STATE, ProcessState
from typing_extensions import Literal
from .types import (
RecipeLevelUser,
ShouldAutomaticallyLink,
ShouldNotAutomaticallyLink,
AccountInfoWithRecipeIdAndUserId,
InputOverrideConfig,
AccountInfoWithRecipeId,
AccountInfo,
)
from .interfaces import RecipeInterface
if TYPE_CHECKING:
from supertokens_python.supertokens import AppInfo
from supertokens_python.types import User, LoginMethod, RecipeUserId
from supertokens_python.recipe.session import SessionContainer
from supertokens_python.framework import BaseRequest, BaseResponse
from supertokens_python.recipe.emailverification.recipe import (
EmailVerificationRecipe,
)
class EmailChangeAllowedResult:
def __init__(
self,
allowed: bool,
reason: Literal["OK", "PRIMARY_USER_CONFLICT", "ACCOUNT_TAKEOVER_RISK"],
):
self.allowed = allowed
self.reason: Literal["OK", "PRIMARY_USER_CONFLICT", "ACCOUNT_TAKEOVER_RISK"] = (
reason
)
class TryLinkingByAccountInfoOrCreatePrimaryUserResult:
def __init__(self, status: Literal["OK", "NO_LINK"], user: Optional[User]):
self.status: Literal["OK", "NO_LINK"] = status
self.user = user
class AccountLinkingRecipe(RecipeModule):
recipe_id = "accountlinking"
__instance = None
def __init__(
self,
recipe_id: str,
app_info: AppInfo,
on_account_linked: Optional[
Callable[[User, RecipeLevelUser, Dict[str, Any]], Awaitable[None]]
] = None,
should_do_automatic_account_linking: Optional[
Callable[
[
AccountInfoWithRecipeIdAndUserId,
Optional[User],
Optional[SessionContainer],
str,
Dict[str, Any],
],
Awaitable[Union[ShouldNotAutomaticallyLink, ShouldAutomaticallyLink]],
]
] = None,
override: Optional[InputOverrideConfig] = None,
):
super().__init__(recipe_id, app_info)
self.config = validate_and_normalise_user_input(
app_info, on_account_linked, should_do_automatic_account_linking, override
)
recipe_implementation: RecipeInterface = RecipeImplementation(
Querier.get_instance(recipe_id), self, self.config
)
self.recipe_implementation: RecipeInterface = (
recipe_implementation
if self.config.override.functions is None
else self.config.override.functions(recipe_implementation)
)
self.email_verification_recipe: EmailVerificationRecipe | None = None
def register_email_verification_recipe(
self, email_verification_recipe: EmailVerificationRecipe
):
self.email_verification_recipe = email_verification_recipe
def is_error_from_this_recipe_based_on_instance(self, err: Exception) -> bool:
return False
def get_apis_handled(self) -> List[APIHandled]:
return []
async def handle_api_request(
self,
request_id: str,
tenant_id: Optional[str],
request: BaseRequest,
path: NormalisedURLPath,
method: str,
response: BaseResponse,
user_context: Dict[str, Any],
) -> Union[BaseResponse, None]:
raise Exception("Should never come here")
async def handle_error(
self,
request: BaseRequest,
err: SuperTokensError,
response: BaseResponse,
user_context: Dict[str, Any],
) -> BaseResponse:
raise err
def get_all_cors_headers(self) -> List[str]:
return []
@staticmethod
def init(
on_account_linked: Optional[
Callable[[User, RecipeLevelUser, Dict[str, Any]], Awaitable[None]]
] = None,
should_do_automatic_account_linking: Optional[
Callable[
[
AccountInfoWithRecipeIdAndUserId,
Optional[User],
Optional[SessionContainer],
str,
Dict[str, Any],
],
Awaitable[Union[ShouldNotAutomaticallyLink, ShouldAutomaticallyLink]],
]
] = None,
override: Optional[InputOverrideConfig] = None,
):
def func(app_info: AppInfo):
if AccountLinkingRecipe.__instance is None:
AccountLinkingRecipe.__instance = AccountLinkingRecipe(
AccountLinkingRecipe.recipe_id,
app_info,
on_account_linked,
should_do_automatic_account_linking,
override,
)
return AccountLinkingRecipe.__instance
raise Exception(
None,
"Accountlinking recipe has already been initialised. Please check your code for bugs.",
)
return func
@staticmethod
def get_instance() -> AccountLinkingRecipe:
if AccountLinkingRecipe.__instance is None:
AccountLinkingRecipe.init()(Supertokens.get_instance().app_info)
assert AccountLinkingRecipe.__instance is not None
return AccountLinkingRecipe.__instance
@staticmethod
def reset():
if ("SUPERTOKENS_ENV" not in environ) or (
environ["SUPERTOKENS_ENV"] != "testing"
):
raise_general_exception("calling testing function in non testing env")
AccountLinkingRecipe.__instance = None
async def get_primary_user_that_can_be_linked_to_recipe_user_id(
self,
tenant_id: str,
user: User,
user_context: Dict[str, Any],
) -> Optional[User]:
# First we check if this user itself is a primary user or not. If it is, we return that.
if user.is_primary_user:
return user
# Then, we try and find a primary user based on the email / phone number / third party ID.
users = await self.recipe_implementation.list_users_by_account_info(
tenant_id=tenant_id,
account_info=user.login_methods[0],
do_union_of_account_info=True,
user_context=user_context,
)
log_debug_message(
"getPrimaryUserThatCanBeLinkedToRecipeUserId found %d matching users"
% len(users)
)
primary_users = [u for u in users if u.is_primary_user]
log_debug_message(
"getPrimaryUserThatCanBeLinkedToRecipeUserId found %d matching primary users"
% len(primary_users)
)
if len(primary_users) > 1:
# This means that the new user has account info such that it's
# spread across multiple primary user IDs. In this case, even
# if we return one of them, it won't be able to be linked anyway
# cause if we did, it would mean 2 primary users would have the
# same account info. So we return None
# This being said, with the current set of auth recipes, it should
# never come here - cause:
# ----> If the recipeuserid is a passwordless user, then it can have either a phone
# email or both. If it has just one of them, then anyway 2 primary users can't
# exist with the same phone number / email. If it has both, then the only way
# that it can have multiple primary users returned is if there is another passwordless
# primary user with the same phone number - which is not possible, cause phone
# numbers are unique across passwordless users.
#
# ----> If the input is a third party user, then it has third party info and an email. Now there can be able to primary user with the same email, but
# there can't be another thirdparty user with the same third party info (since that is unique).
# Nor can there an email password primary user with the same email along with another
# thirdparty primary user with the same email (since emails can't be the same across primary users).
#
# ----> If the input is an email password user, then it has an email. There can't be multiple primary users with the same email anyway.
raise Exception(
"You found a bug. Please report it on github.com/supertokens/supertokens-node"
)
return primary_users[0] if len(primary_users) > 0 else None
async def get_oldest_user_that_can_be_linked_to_recipe_user(
self,
tenant_id: str,
user: User,
user_context: Dict[str, Any],
) -> Optional[User]:
# First we check if this user itself is a primary user or not. If it is, we return that since it cannot be linked to anything else
if user.is_primary_user:
return user
# Then, we try and find matching users based on the email / phone number / third party ID.
users = await self.recipe_implementation.list_users_by_account_info(
tenant_id=tenant_id,
account_info=user.login_methods[0],
do_union_of_account_info=True,
user_context=user_context,
)
log_debug_message(
f"getOldestUserThatCanBeLinkedToRecipeUser found {len(users)} matching users"
)
# Finally select the oldest one
oldest_user = min(users, key=lambda u: u.time_joined) if users else None
return oldest_user
async def is_sign_in_allowed(
self,
user: User,
account_info: Union[AccountInfoWithRecipeId, LoginMethod],
tenant_id: str,
session: Optional[SessionContainer],
sign_in_verifies_login_method: bool,
user_context: Dict[str, Any],
) -> bool:
ProcessState.get_instance().add_state(PROCESS_STATE.IS_SIGN_IN_ALLOWED_CALLED)
if (
user.is_primary_user
or user.login_methods[0].verified
or sign_in_verifies_login_method
):
return True
return await self.is_sign_in_up_allowed_helper(
account_info=account_info,
is_verified=user.login_methods[0].verified,
session=session,
tenant_id=tenant_id,
is_sign_in=True,
user=user,
user_context=user_context,
)
async def is_sign_up_allowed(
self,
new_user: AccountInfoWithRecipeId,
is_verified: bool,
session: Optional[SessionContainer],
tenant_id: str,
user_context: Dict[str, Any],
) -> bool:
ProcessState.get_instance().add_state(PROCESS_STATE.IS_SIGN_UP_ALLOWED_CALLED)
if new_user.email is not None and new_user.phone_number is not None:
# We do this check cause below when we call list_users_by_account_info,
# we only pass in one of email or phone number
raise Exception("Please pass one of email or phone number, not both")
return await self.is_sign_in_up_allowed_helper(
account_info=new_user,
is_verified=is_verified,
session=session,
tenant_id=tenant_id,
user_context=user_context,
user=None,
is_sign_in=False,
)
async def is_sign_in_up_allowed_helper(
self,
account_info: Union[AccountInfoWithRecipeId, LoginMethod],
is_verified: bool,
session: Optional[SessionContainer],
tenant_id: str,
is_sign_in: bool,
user: Optional[User],
user_context: Dict[str, Any],
) -> bool:
ProcessState.get_instance().add_state(
PROCESS_STATE.IS_SIGN_IN_UP_ALLOWED_HELPER_CALLED
)
users = await self.recipe_implementation.list_users_by_account_info(
tenant_id=tenant_id,
account_info=account_info,
do_union_of_account_info=True,
user_context=user_context,
)
if not users:
log_debug_message(
"isSignInUpAllowedHelper returning true because no user with given account info"
)
return True
if is_sign_in and user is None:
raise Exception(
"This should never happen: isSignInUpAllowedHelper called with isSignIn: true, user: None"
)
if (
len(users) == 1
and is_sign_in
and user is not None
and users[0].id == user.id
):
log_debug_message(
"isSignInUpAllowedHelper returning true because this is sign in and there is only a single user with the given account info"
)
return True
primary_users = [u for u in users if u.is_primary_user]
if not primary_users:
log_debug_message("isSignInUpAllowedHelper no primary user exists")
should_do_account_linking = (
await self.config.should_do_automatic_account_linking(
AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method(
account_info
),
None,
session,
tenant_id,
user_context,
)
)
if isinstance(should_do_account_linking, ShouldNotAutomaticallyLink):
log_debug_message(
"isSignInUpAllowedHelper returning true because account linking is disabled"
)
return True
if not should_do_account_linking.should_require_verification:
log_debug_message(
"isSignInUpAllowedHelper returning true because dev does not require email verification"
)
return True
should_allow = True
for curr_user in users:
if session is not None and curr_user.id == session.get_user_id(
user_context
):
# We do not consider the current session user to be conflicting
# This can be useful in cases where the current sign in will mark the session user as verified
continue
this_iteration_is_verified = False
if account_info.email is not None:
if (
curr_user.login_methods[0].has_same_email_as(account_info.email)
and curr_user.login_methods[0].verified
):
log_debug_message(
"isSignInUpAllowedHelper found same email for another user and verified"
)
this_iteration_is_verified = True
if account_info.phone_number is not None:
if (
curr_user.login_methods[0].has_same_phone_number_as(
account_info.phone_number
)
and curr_user.login_methods[0].verified
):
log_debug_message(
"isSignInUpAllowedHelper found same phone number for another user and verified"
)
this_iteration_is_verified = True
if not this_iteration_is_verified:
# even if one of the users is not verified, we do not allow sign up (see why above).
# Sure, this allows attackers to create email password accounts with an email
# to block actual users from signing up, but that's ok, since those
# users will just see an email already exists error and then will try another
# login method. They can also still just go through the password reset flow
# and then gain access to their email password account (which can then be verified).
log_debug_message(
"isSignInUpAllowedHelper returning false cause one of the other recipe level users is not verified"
)
should_allow = False
break
ProcessState.get_instance().add_state(
PROCESS_STATE.IS_SIGN_IN_UP_ALLOWED_NO_PRIMARY_USER_EXISTS
)
log_debug_message(f"isSignInUpAllowedHelper returning {should_allow}")
return should_allow
else:
if len(primary_users) > 1:
raise Exception(
"You have found a bug. Please report to https://github.com/supertokens/supertokens-node/issues"
)
primary_user = primary_users[0]
log_debug_message("isSignInUpAllowedHelper primary user found")
should_do_account_linking = (
await self.config.should_do_automatic_account_linking(
AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method(
account_info
),
primary_user,
session,
tenant_id,
user_context,
)
)
if isinstance(should_do_account_linking, ShouldNotAutomaticallyLink):
log_debug_message(
"isSignInUpAllowedHelper returning true because account linking is disabled"
)
return True
if not should_do_account_linking.should_require_verification:
log_debug_message(
"isSignInUpAllowedHelper returning true because dev does not require email verification"
)
return True
if not is_verified:
log_debug_message(
"isSignInUpAllowedHelper returning false because new user's email is not verified, and primary user with the same email was found."
)
return False
if session is not None and primary_user.id == session.get_user_id(
user_context
):
return True
for login_method in primary_user.login_methods:
if login_method.email is not None:
if (
login_method.has_same_email_as(account_info.email)
and login_method.verified
):
log_debug_message(
"isSignInUpAllowedHelper returning true cause found same email for primary user and verified"
)
return True
if login_method.phone_number is not None:
if (
login_method.has_same_phone_number_as(account_info.phone_number)
and login_method.verified
):
log_debug_message(
"isSignInUpAllowedHelper returning true cause found same phone number for primary user and verified"
)
return True
log_debug_message(
"isSignInUpAllowedHelper returning false cause primary user does not have the same email or phone number that is verified"
)
return False
async def is_email_change_allowed(
self,
user: User,
new_email: str,
is_verified: bool,
session: Optional[SessionContainer],
user_context: Dict[str, Any],
) -> EmailChangeAllowedResult:
"""
The purpose of this function is to check if a recipe user ID's email
can be changed or not. There are two conditions for when it can't be changed:
- If the recipe user is a primary user, then we need to check that the new email
doesn't belong to any other primary user. If it does, we disallow the change
since multiple primary user's can't have the same account info.
- If the recipe user is NOT a primary user, and if is_verified is false, then
we check if there exists a primary user with the same email, and if it does
we disallow the email change cause if this email is changed, and an email
verification email is sent, then the primary user may end up clicking
on the link by mistake, causing account linking to happen which can result
in account take over if this recipe user is malicious.
"""
for tenant_id in user.tenant_ids:
existing_users_with_new_email = (
await self.recipe_implementation.list_users_by_account_info(
tenant_id=tenant_id,
account_info=AccountInfo(email=new_email),
do_union_of_account_info=False,
user_context=user_context,
)
)
other_users_with_new_email = [
u for u in existing_users_with_new_email if u.id != user.id
]
other_primary_user_for_new_email = [
u for u in other_users_with_new_email if u.is_primary_user
]
if len(other_primary_user_for_new_email) > 1:
raise Exception(
"You found a bug. Please report it on github.com/supertokens/supertokens-core"
)
if user.is_primary_user:
if other_primary_user_for_new_email:
log_debug_message(
f"isEmailChangeAllowed: returning false cause email change will lead to two primary users having same email on {tenant_id}"
)
return EmailChangeAllowedResult(
allowed=False, reason="PRIMARY_USER_CONFLICT"
)
if is_verified:
log_debug_message(
f"isEmailChangeAllowed: can change on {tenant_id} cause input user is primary, new email is verified and doesn't belong to any other primary user"
)
continue
if any(
lm.has_same_email_as(new_email) and lm.verified
for lm in user.login_methods
):
log_debug_message(
f"isEmailChangeAllowed: can change on {tenant_id} cause input user is primary, new email is verified in another login method and doesn't belong to any other primary user"
)
continue
if not other_users_with_new_email:
log_debug_message(
f"isEmailChangeAllowed: can change on {tenant_id} cause input user is primary and the new email doesn't belong to any other user (primary or non-primary)"
)
continue
should_do_account_linking = await self.config.should_do_automatic_account_linking(
AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method(
other_users_with_new_email[0].login_methods[0]
),
user,
session,
tenant_id,
user_context,
)
if isinstance(should_do_account_linking, ShouldNotAutomaticallyLink):
log_debug_message(
f"isEmailChangeAllowed: can change on {tenant_id} cause linking is disabled"
)
continue
if not should_do_account_linking.should_require_verification:
log_debug_message(
f"isEmailChangeAllowed: can change on {tenant_id} cause linking doesn't require email verification"
)
continue
log_debug_message(
f"isEmailChangeAllowed: returning false because the user hasn't verified the new email address and there exists another user with it on {tenant_id} and linking requires verification"
)
return EmailChangeAllowedResult(
allowed=False, reason="ACCOUNT_TAKEOVER_RISK"
)
else:
if is_verified:
log_debug_message(
f"isEmailChangeAllowed: can change on {tenant_id} cause input user is not a primary and new email is verified"
)
continue
if user.login_methods[0].has_same_email_as(new_email):
log_debug_message(
f"isEmailChangeAllowed: can change on {tenant_id} cause input user is not a primary and new email is same as the older one"
)
continue
if other_primary_user_for_new_email:
should_do_account_linking = (
await self.config.should_do_automatic_account_linking(
AccountInfoWithRecipeIdAndUserId(
recipe_id=user.login_methods[0].recipe_id,
email=user.login_methods[0].email,
recipe_user_id=user.login_methods[0].recipe_user_id,
phone_number=user.login_methods[0].phone_number,
third_party=user.login_methods[0].third_party,
),
other_primary_user_for_new_email[0],
session,
tenant_id,
user_context,
)
)
if isinstance(
should_do_account_linking, ShouldNotAutomaticallyLink
):
log_debug_message(
f"isEmailChangeAllowed: can change on {tenant_id} cause input user is not a primary there exists a primary user exists with the new email, but the dev does not have account linking enabled."
)
continue
if not should_do_account_linking.should_require_verification:
log_debug_message(
f"isEmailChangeAllowed: can change on {tenant_id} cause input user is not a primary there exists a primary user exists with the new email, but the dev does not require email verification."
)
continue
log_debug_message(
"isEmailChangeAllowed: returning false cause input user is not a primary there exists a primary user exists with the new email."
)
return EmailChangeAllowedResult(
allowed=False, reason="ACCOUNT_TAKEOVER_RISK"
)
log_debug_message(
f"isEmailChangeAllowed: can change on {tenant_id} cause input user is not a primary no primary user exists with the new email"
)
continue
log_debug_message(
"isEmailChangeAllowed: returning true cause email change can happen on all tenants the user is part of"
)
return EmailChangeAllowedResult(allowed=True, reason="OK")
async def verify_email_for_recipe_user_if_linked_accounts_are_verified(
self,
user: User,
recipe_user_id: RecipeUserId,
user_context: Dict[str, Any],
) -> None:
if self.email_verification_recipe is None:
return
if user.is_primary_user:
recipe_user_email: Optional[str] = None
is_already_verified = False
for lm in user.login_methods:
if lm.recipe_user_id.get_as_string() == recipe_user_id.get_as_string():
recipe_user_email = lm.email
is_already_verified = lm.verified
break
if recipe_user_email is not None:
if is_already_verified:
return
should_verify_email = False
for lm in user.login_methods:
if lm.has_same_email_as(recipe_user_email) and lm.verified:
should_verify_email = True
break
if should_verify_email:
ev_recipe = self.email_verification_recipe.get_instance_or_throw()
resp = await ev_recipe.recipe_implementation.create_email_verification_token(
tenant_id=user.tenant_ids[0],
recipe_user_id=recipe_user_id,
email=recipe_user_email,
user_context=user_context,
)
if resp.status == "OK":
# we purposely pass in false below cause we don't want account
# linking to happen
await ev_recipe.recipe_implementation.verify_email_using_token(
tenant_id=user.tenant_ids[0],
token=resp.token,
attempt_account_linking=False,
user_context=user_context,
)
async def should_become_primary_user(
self,
user: User,
tenant_id: str,
session: Optional[SessionContainer],
user_context: Dict[str, Any],
) -> bool:
should_do_account_linking = (
await self.config.should_do_automatic_account_linking(
AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method(
user.login_methods[0]
),
None,
session,
tenant_id,
user_context,
)
)
if isinstance(should_do_account_linking, ShouldNotAutomaticallyLink):
log_debug_message(
"should_become_primary_user returning false because shouldAutomaticallyLink is false"
)
return False
if (
should_do_account_linking.should_require_verification
and not user.login_methods[0].verified
):
log_debug_message(
"should_become_primary_user returning false because shouldRequireVerification is true but the login method is not verified"
)
return False
log_debug_message("should_become_primary_user returning true")
return True
async def try_linking_by_account_info_or_create_primary_user(
self,
input_user: User,
session: Optional[SessionContainer],
tenant_id: str,
user_context: Dict[str, Any],
) -> TryLinkingByAccountInfoOrCreatePrimaryUserResult:
tries = 0
while tries < 100:
tries += 1
primary_user_that_can_be_linked_to_the_input_user = (
await self.get_primary_user_that_can_be_linked_to_recipe_user_id(
tenant_id=tenant_id,
user=input_user,
user_context=user_context,
)
)
if primary_user_that_can_be_linked_to_the_input_user is not None:
log_debug_message(
"try_linking_by_account_info_or_create_primary_user: got primary user we can try linking"
)
# we check if the input_user and primary_user_that_can_be_linked_to_the_input_user are linked based on recipeIds because the input_user obj could be outdated
if not any(
lm.recipe_user_id.get_as_string()
== input_user.login_methods[0].recipe_user_id.get_as_string()
for lm in primary_user_that_can_be_linked_to_the_input_user.login_methods
):
should_do_account_linking = await self.config.should_do_automatic_account_linking(
AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method(
input_user.login_methods[0]
),
primary_user_that_can_be_linked_to_the_input_user,
session,
tenant_id,
user_context,
)
if isinstance(
should_do_account_linking, ShouldNotAutomaticallyLink
):
log_debug_message(
"try_linking_by_account_info_or_create_primary_user: not linking because shouldAutomaticallyLink is false"
)
return TryLinkingByAccountInfoOrCreatePrimaryUserResult(
status="NO_LINK", user=None
)
account_info_verified_in_prim_user = any(
(
input_user.login_methods[0].email is not None
and lm.has_same_email_as(input_user.login_methods[0].email)
)
or (
input_user.login_methods[0].phone_number is not None
and lm.has_same_phone_number_as(
input_user.login_methods[0].phone_number
)
and lm.verified
)
for lm in primary_user_that_can_be_linked_to_the_input_user.login_methods
)
if should_do_account_linking.should_require_verification and (
not input_user.login_methods[0].verified
or not account_info_verified_in_prim_user
):
log_debug_message(
"try_linking_by_account_info_or_create_primary_user: not linking because shouldRequireVerification is true but the login method is not verified in the new or the primary user"
)
return TryLinkingByAccountInfoOrCreatePrimaryUserResult(
status="NO_LINK", user=None
)
log_debug_message(
"try_linking_by_account_info_or_create_primary_user linking"
)
link_accounts_result = await self.recipe_implementation.link_accounts(
recipe_user_id=input_user.login_methods[0].recipe_user_id,
primary_user_id=primary_user_that_can_be_linked_to_the_input_user.id,
user_context=user_context,
)
if link_accounts_result.status == "OK":
log_debug_message(
"try_linking_by_account_info_or_create_primary_user successfully linked"
)
return TryLinkingByAccountInfoOrCreatePrimaryUserResult(
status="OK", user=link_accounts_result.user
)
elif (
link_accounts_result.status
== "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR"
):
log_debug_message(
"try_linking_by_account_info_or_create_primary_user already linked to another user"
)
return TryLinkingByAccountInfoOrCreatePrimaryUserResult(
status="OK", user=link_accounts_result.user
)
elif (
link_accounts_result.status
== "INPUT_USER_IS_NOT_A_PRIMARY_USER"
):
log_debug_message(
"try_linking_by_account_info_or_create_primary_user linking failed because of a race condition"
)
continue
else:
log_debug_message(
"try_linking_by_account_info_or_create_primary_user linking failed because of a race condition"
)
continue
return TryLinkingByAccountInfoOrCreatePrimaryUserResult(
status="OK", user=input_user
)
oldest_user_that_can_be_linked_to_the_input_user = (
await self.get_oldest_user_that_can_be_linked_to_recipe_user(
tenant_id=tenant_id,
user=input_user,
user_context=user_context,
)
)
if (
oldest_user_that_can_be_linked_to_the_input_user is not None
and oldest_user_that_can_be_linked_to_the_input_user.id != input_user.id
):
log_debug_message(
"try_linking_by_account_info_or_create_primary_user: got an older user we can try linking"
)
should_make_older_user_primary = await self.should_become_primary_user(
oldest_user_that_can_be_linked_to_the_input_user,
tenant_id,
session,
user_context,
)
if should_make_older_user_primary:
create_primary_user_result = await self.recipe_implementation.create_primary_user(
recipe_user_id=oldest_user_that_can_be_linked_to_the_input_user.login_methods[
0
].recipe_user_id,
user_context=user_context,
)
if (
create_primary_user_result.status
== "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR"
or create_primary_user_result.status
== "RECIPE_USER_ID_ALREADY_LINKED_WITH_PRIMARY_USER_ID_ERROR"
):
log_debug_message(
f"try_linking_by_account_info_or_create_primary_user: retrying because createPrimaryUser returned {create_primary_user_result.status}"
)
continue
should_do_account_linking = await self.config.should_do_automatic_account_linking(
AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method(
input_user.login_methods[0]
),
create_primary_user_result.user,
session,
tenant_id,
user_context,
)
if isinstance(
should_do_account_linking, ShouldNotAutomaticallyLink
):
log_debug_message(
"try_linking_by_account_info_or_create_primary_user: not linking because shouldAutomaticallyLink is false"
)
return TryLinkingByAccountInfoOrCreatePrimaryUserResult(
status="NO_LINK", user=None
)
if (
should_do_account_linking.should_require_verification
and not input_user.login_methods[0].verified
):
log_debug_message(
"try_linking_by_account_info_or_create_primary_user: not linking because shouldRequireVerification is true but the login method is not verified"
)
return TryLinkingByAccountInfoOrCreatePrimaryUserResult(
status="NO_LINK", user=None
)
log_debug_message(
"try_linking_by_account_info_or_create_primary_user linking"
)
link_accounts_result = (
await self.recipe_implementation.link_accounts(
recipe_user_id=input_user.login_methods[0].recipe_user_id,
primary_user_id=create_primary_user_result.user.id,
user_context=user_context,
)
)
if link_accounts_result.status == "OK":
log_debug_message(
"try_linking_by_account_info_or_create_primary_user successfully linked"
)
return TryLinkingByAccountInfoOrCreatePrimaryUserResult(
status="OK", user=link_accounts_result.user
)
elif (
link_accounts_result.status
== "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR"
):
log_debug_message(
"try_linking_by_account_info_or_create_primary_user already linked to another user"
)
return TryLinkingByAccountInfoOrCreatePrimaryUserResult(
status="OK", user=link_accounts_result.user
)
elif (
link_accounts_result.status
== "INPUT_USER_IS_NOT_A_PRIMARY_USER"
):
log_debug_message(
"try_linking_by_account_info_or_create_primary_user linking failed because of a race condition"
)
continue
else:
log_debug_message(
"try_linking_by_account_info_or_create_primary_user linking failed because of a race condition"
)
continue
log_debug_message(
"try_linking_by_account_info_or_create_primary_user: trying to make the current user primary"
)
if await self.should_become_primary_user(
input_user, tenant_id, session, user_context
):
create_primary_user_result = (
await self.recipe_implementation.create_primary_user(
recipe_user_id=input_user.login_methods[0].recipe_user_id,
user_context=user_context,
)
)
if (
create_primary_user_result.status
== "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR"
or create_primary_user_result.status
== "RECIPE_USER_ID_ALREADY_LINKED_WITH_PRIMARY_USER_ID_ERROR"
):
continue
return TryLinkingByAccountInfoOrCreatePrimaryUserResult(
status="OK",
user=create_primary_user_result.user,
)
else:
return TryLinkingByAccountInfoOrCreatePrimaryUserResult(
status="OK", user=input_user
)
raise Exception(
"This should never happen: ran out of retries for try_linking_by_account_info_or_create_primary_user"
)
Classes
class AccountLinkingRecipe (recipe_id: str, app_info: AppInfo, on_account_linked: Optional[Callable[[User, RecipeLevelUser, Dict[str, Any]], Awaitable[None]]] = None, should_do_automatic_account_linking: Optional[Callable[[AccountInfoWithRecipeIdAndUserId, Optional[User], Optional[SessionContainer], str, Dict[str, Any]], Awaitable[Union[ShouldNotAutomaticallyLink, ShouldAutomaticallyLink]]]] = None, override: Optional[InputOverrideConfig] = None)
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class AccountLinkingRecipe(RecipeModule): recipe_id = "accountlinking" __instance = None def __init__( self, recipe_id: str, app_info: AppInfo, on_account_linked: Optional[ Callable[[User, RecipeLevelUser, Dict[str, Any]], Awaitable[None]] ] = None, should_do_automatic_account_linking: Optional[ Callable[ [ AccountInfoWithRecipeIdAndUserId, Optional[User], Optional[SessionContainer], str, Dict[str, Any], ], Awaitable[Union[ShouldNotAutomaticallyLink, ShouldAutomaticallyLink]], ] ] = None, override: Optional[InputOverrideConfig] = None, ): super().__init__(recipe_id, app_info) self.config = validate_and_normalise_user_input( app_info, on_account_linked, should_do_automatic_account_linking, override ) recipe_implementation: RecipeInterface = RecipeImplementation( Querier.get_instance(recipe_id), self, self.config ) self.recipe_implementation: RecipeInterface = ( recipe_implementation if self.config.override.functions is None else self.config.override.functions(recipe_implementation) ) self.email_verification_recipe: EmailVerificationRecipe | None = None def register_email_verification_recipe( self, email_verification_recipe: EmailVerificationRecipe ): self.email_verification_recipe = email_verification_recipe def is_error_from_this_recipe_based_on_instance(self, err: Exception) -> bool: return False def get_apis_handled(self) -> List[APIHandled]: return [] async def handle_api_request( self, request_id: str, tenant_id: Optional[str], request: BaseRequest, path: NormalisedURLPath, method: str, response: BaseResponse, user_context: Dict[str, Any], ) -> Union[BaseResponse, None]: raise Exception("Should never come here") async def handle_error( self, request: BaseRequest, err: SuperTokensError, response: BaseResponse, user_context: Dict[str, Any], ) -> BaseResponse: raise err def get_all_cors_headers(self) -> List[str]: return [] @staticmethod def init( on_account_linked: Optional[ Callable[[User, RecipeLevelUser, Dict[str, Any]], Awaitable[None]] ] = None, should_do_automatic_account_linking: Optional[ Callable[ [ AccountInfoWithRecipeIdAndUserId, Optional[User], Optional[SessionContainer], str, Dict[str, Any], ], Awaitable[Union[ShouldNotAutomaticallyLink, ShouldAutomaticallyLink]], ] ] = None, override: Optional[InputOverrideConfig] = None, ): def func(app_info: AppInfo): if AccountLinkingRecipe.__instance is None: AccountLinkingRecipe.__instance = AccountLinkingRecipe( AccountLinkingRecipe.recipe_id, app_info, on_account_linked, should_do_automatic_account_linking, override, ) return AccountLinkingRecipe.__instance raise Exception( None, "Accountlinking recipe has already been initialised. Please check your code for bugs.", ) return func @staticmethod def get_instance() -> AccountLinkingRecipe: if AccountLinkingRecipe.__instance is None: AccountLinkingRecipe.init()(Supertokens.get_instance().app_info) assert AccountLinkingRecipe.__instance is not None return AccountLinkingRecipe.__instance @staticmethod def reset(): if ("SUPERTOKENS_ENV" not in environ) or ( environ["SUPERTOKENS_ENV"] != "testing" ): raise_general_exception("calling testing function in non testing env") AccountLinkingRecipe.__instance = None async def get_primary_user_that_can_be_linked_to_recipe_user_id( self, tenant_id: str, user: User, user_context: Dict[str, Any], ) -> Optional[User]: # First we check if this user itself is a primary user or not. If it is, we return that. if user.is_primary_user: return user # Then, we try and find a primary user based on the email / phone number / third party ID. users = await self.recipe_implementation.list_users_by_account_info( tenant_id=tenant_id, account_info=user.login_methods[0], do_union_of_account_info=True, user_context=user_context, ) log_debug_message( "getPrimaryUserThatCanBeLinkedToRecipeUserId found %d matching users" % len(users) ) primary_users = [u for u in users if u.is_primary_user] log_debug_message( "getPrimaryUserThatCanBeLinkedToRecipeUserId found %d matching primary users" % len(primary_users) ) if len(primary_users) > 1: # This means that the new user has account info such that it's # spread across multiple primary user IDs. In this case, even # if we return one of them, it won't be able to be linked anyway # cause if we did, it would mean 2 primary users would have the # same account info. So we return None # This being said, with the current set of auth recipes, it should # never come here - cause: # ----> If the recipeuserid is a passwordless user, then it can have either a phone # email or both. If it has just one of them, then anyway 2 primary users can't # exist with the same phone number / email. If it has both, then the only way # that it can have multiple primary users returned is if there is another passwordless # primary user with the same phone number - which is not possible, cause phone # numbers are unique across passwordless users. # # ----> If the input is a third party user, then it has third party info and an email. Now there can be able to primary user with the same email, but # there can't be another thirdparty user with the same third party info (since that is unique). # Nor can there an email password primary user with the same email along with another # thirdparty primary user with the same email (since emails can't be the same across primary users). # # ----> If the input is an email password user, then it has an email. There can't be multiple primary users with the same email anyway. raise Exception( "You found a bug. Please report it on github.com/supertokens/supertokens-node" ) return primary_users[0] if len(primary_users) > 0 else None async def get_oldest_user_that_can_be_linked_to_recipe_user( self, tenant_id: str, user: User, user_context: Dict[str, Any], ) -> Optional[User]: # First we check if this user itself is a primary user or not. If it is, we return that since it cannot be linked to anything else if user.is_primary_user: return user # Then, we try and find matching users based on the email / phone number / third party ID. users = await self.recipe_implementation.list_users_by_account_info( tenant_id=tenant_id, account_info=user.login_methods[0], do_union_of_account_info=True, user_context=user_context, ) log_debug_message( f"getOldestUserThatCanBeLinkedToRecipeUser found {len(users)} matching users" ) # Finally select the oldest one oldest_user = min(users, key=lambda u: u.time_joined) if users else None return oldest_user async def is_sign_in_allowed( self, user: User, account_info: Union[AccountInfoWithRecipeId, LoginMethod], tenant_id: str, session: Optional[SessionContainer], sign_in_verifies_login_method: bool, user_context: Dict[str, Any], ) -> bool: ProcessState.get_instance().add_state(PROCESS_STATE.IS_SIGN_IN_ALLOWED_CALLED) if ( user.is_primary_user or user.login_methods[0].verified or sign_in_verifies_login_method ): return True return await self.is_sign_in_up_allowed_helper( account_info=account_info, is_verified=user.login_methods[0].verified, session=session, tenant_id=tenant_id, is_sign_in=True, user=user, user_context=user_context, ) async def is_sign_up_allowed( self, new_user: AccountInfoWithRecipeId, is_verified: bool, session: Optional[SessionContainer], tenant_id: str, user_context: Dict[str, Any], ) -> bool: ProcessState.get_instance().add_state(PROCESS_STATE.IS_SIGN_UP_ALLOWED_CALLED) if new_user.email is not None and new_user.phone_number is not None: # We do this check cause below when we call list_users_by_account_info, # we only pass in one of email or phone number raise Exception("Please pass one of email or phone number, not both") return await self.is_sign_in_up_allowed_helper( account_info=new_user, is_verified=is_verified, session=session, tenant_id=tenant_id, user_context=user_context, user=None, is_sign_in=False, ) async def is_sign_in_up_allowed_helper( self, account_info: Union[AccountInfoWithRecipeId, LoginMethod], is_verified: bool, session: Optional[SessionContainer], tenant_id: str, is_sign_in: bool, user: Optional[User], user_context: Dict[str, Any], ) -> bool: ProcessState.get_instance().add_state( PROCESS_STATE.IS_SIGN_IN_UP_ALLOWED_HELPER_CALLED ) users = await self.recipe_implementation.list_users_by_account_info( tenant_id=tenant_id, account_info=account_info, do_union_of_account_info=True, user_context=user_context, ) if not users: log_debug_message( "isSignInUpAllowedHelper returning true because no user with given account info" ) return True if is_sign_in and user is None: raise Exception( "This should never happen: isSignInUpAllowedHelper called with isSignIn: true, user: None" ) if ( len(users) == 1 and is_sign_in and user is not None and users[0].id == user.id ): log_debug_message( "isSignInUpAllowedHelper returning true because this is sign in and there is only a single user with the given account info" ) return True primary_users = [u for u in users if u.is_primary_user] if not primary_users: log_debug_message("isSignInUpAllowedHelper no primary user exists") should_do_account_linking = ( await self.config.should_do_automatic_account_linking( AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method( account_info ), None, session, tenant_id, user_context, ) ) if isinstance(should_do_account_linking, ShouldNotAutomaticallyLink): log_debug_message( "isSignInUpAllowedHelper returning true because account linking is disabled" ) return True if not should_do_account_linking.should_require_verification: log_debug_message( "isSignInUpAllowedHelper returning true because dev does not require email verification" ) return True should_allow = True for curr_user in users: if session is not None and curr_user.id == session.get_user_id( user_context ): # We do not consider the current session user to be conflicting # This can be useful in cases where the current sign in will mark the session user as verified continue this_iteration_is_verified = False if account_info.email is not None: if ( curr_user.login_methods[0].has_same_email_as(account_info.email) and curr_user.login_methods[0].verified ): log_debug_message( "isSignInUpAllowedHelper found same email for another user and verified" ) this_iteration_is_verified = True if account_info.phone_number is not None: if ( curr_user.login_methods[0].has_same_phone_number_as( account_info.phone_number ) and curr_user.login_methods[0].verified ): log_debug_message( "isSignInUpAllowedHelper found same phone number for another user and verified" ) this_iteration_is_verified = True if not this_iteration_is_verified: # even if one of the users is not verified, we do not allow sign up (see why above). # Sure, this allows attackers to create email password accounts with an email # to block actual users from signing up, but that's ok, since those # users will just see an email already exists error and then will try another # login method. They can also still just go through the password reset flow # and then gain access to their email password account (which can then be verified). log_debug_message( "isSignInUpAllowedHelper returning false cause one of the other recipe level users is not verified" ) should_allow = False break ProcessState.get_instance().add_state( PROCESS_STATE.IS_SIGN_IN_UP_ALLOWED_NO_PRIMARY_USER_EXISTS ) log_debug_message(f"isSignInUpAllowedHelper returning {should_allow}") return should_allow else: if len(primary_users) > 1: raise Exception( "You have found a bug. Please report to https://github.com/supertokens/supertokens-node/issues" ) primary_user = primary_users[0] log_debug_message("isSignInUpAllowedHelper primary user found") should_do_account_linking = ( await self.config.should_do_automatic_account_linking( AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method( account_info ), primary_user, session, tenant_id, user_context, ) ) if isinstance(should_do_account_linking, ShouldNotAutomaticallyLink): log_debug_message( "isSignInUpAllowedHelper returning true because account linking is disabled" ) return True if not should_do_account_linking.should_require_verification: log_debug_message( "isSignInUpAllowedHelper returning true because dev does not require email verification" ) return True if not is_verified: log_debug_message( "isSignInUpAllowedHelper returning false because new user's email is not verified, and primary user with the same email was found." ) return False if session is not None and primary_user.id == session.get_user_id( user_context ): return True for login_method in primary_user.login_methods: if login_method.email is not None: if ( login_method.has_same_email_as(account_info.email) and login_method.verified ): log_debug_message( "isSignInUpAllowedHelper returning true cause found same email for primary user and verified" ) return True if login_method.phone_number is not None: if ( login_method.has_same_phone_number_as(account_info.phone_number) and login_method.verified ): log_debug_message( "isSignInUpAllowedHelper returning true cause found same phone number for primary user and verified" ) return True log_debug_message( "isSignInUpAllowedHelper returning false cause primary user does not have the same email or phone number that is verified" ) return False async def is_email_change_allowed( self, user: User, new_email: str, is_verified: bool, session: Optional[SessionContainer], user_context: Dict[str, Any], ) -> EmailChangeAllowedResult: """ The purpose of this function is to check if a recipe user ID's email can be changed or not. There are two conditions for when it can't be changed: - If the recipe user is a primary user, then we need to check that the new email doesn't belong to any other primary user. If it does, we disallow the change since multiple primary user's can't have the same account info. - If the recipe user is NOT a primary user, and if is_verified is false, then we check if there exists a primary user with the same email, and if it does we disallow the email change cause if this email is changed, and an email verification email is sent, then the primary user may end up clicking on the link by mistake, causing account linking to happen which can result in account take over if this recipe user is malicious. """ for tenant_id in user.tenant_ids: existing_users_with_new_email = ( await self.recipe_implementation.list_users_by_account_info( tenant_id=tenant_id, account_info=AccountInfo(email=new_email), do_union_of_account_info=False, user_context=user_context, ) ) other_users_with_new_email = [ u for u in existing_users_with_new_email if u.id != user.id ] other_primary_user_for_new_email = [ u for u in other_users_with_new_email if u.is_primary_user ] if len(other_primary_user_for_new_email) > 1: raise Exception( "You found a bug. Please report it on github.com/supertokens/supertokens-core" ) if user.is_primary_user: if other_primary_user_for_new_email: log_debug_message( f"isEmailChangeAllowed: returning false cause email change will lead to two primary users having same email on {tenant_id}" ) return EmailChangeAllowedResult( allowed=False, reason="PRIMARY_USER_CONFLICT" ) if is_verified: log_debug_message( f"isEmailChangeAllowed: can change on {tenant_id} cause input user is primary, new email is verified and doesn't belong to any other primary user" ) continue if any( lm.has_same_email_as(new_email) and lm.verified for lm in user.login_methods ): log_debug_message( f"isEmailChangeAllowed: can change on {tenant_id} cause input user is primary, new email is verified in another login method and doesn't belong to any other primary user" ) continue if not other_users_with_new_email: log_debug_message( f"isEmailChangeAllowed: can change on {tenant_id} cause input user is primary and the new email doesn't belong to any other user (primary or non-primary)" ) continue should_do_account_linking = await self.config.should_do_automatic_account_linking( AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method( other_users_with_new_email[0].login_methods[0] ), user, session, tenant_id, user_context, ) if isinstance(should_do_account_linking, ShouldNotAutomaticallyLink): log_debug_message( f"isEmailChangeAllowed: can change on {tenant_id} cause linking is disabled" ) continue if not should_do_account_linking.should_require_verification: log_debug_message( f"isEmailChangeAllowed: can change on {tenant_id} cause linking doesn't require email verification" ) continue log_debug_message( f"isEmailChangeAllowed: returning false because the user hasn't verified the new email address and there exists another user with it on {tenant_id} and linking requires verification" ) return EmailChangeAllowedResult( allowed=False, reason="ACCOUNT_TAKEOVER_RISK" ) else: if is_verified: log_debug_message( f"isEmailChangeAllowed: can change on {tenant_id} cause input user is not a primary and new email is verified" ) continue if user.login_methods[0].has_same_email_as(new_email): log_debug_message( f"isEmailChangeAllowed: can change on {tenant_id} cause input user is not a primary and new email is same as the older one" ) continue if other_primary_user_for_new_email: should_do_account_linking = ( await self.config.should_do_automatic_account_linking( AccountInfoWithRecipeIdAndUserId( recipe_id=user.login_methods[0].recipe_id, email=user.login_methods[0].email, recipe_user_id=user.login_methods[0].recipe_user_id, phone_number=user.login_methods[0].phone_number, third_party=user.login_methods[0].third_party, ), other_primary_user_for_new_email[0], session, tenant_id, user_context, ) ) if isinstance( should_do_account_linking, ShouldNotAutomaticallyLink ): log_debug_message( f"isEmailChangeAllowed: can change on {tenant_id} cause input user is not a primary there exists a primary user exists with the new email, but the dev does not have account linking enabled." ) continue if not should_do_account_linking.should_require_verification: log_debug_message( f"isEmailChangeAllowed: can change on {tenant_id} cause input user is not a primary there exists a primary user exists with the new email, but the dev does not require email verification." ) continue log_debug_message( "isEmailChangeAllowed: returning false cause input user is not a primary there exists a primary user exists with the new email." ) return EmailChangeAllowedResult( allowed=False, reason="ACCOUNT_TAKEOVER_RISK" ) log_debug_message( f"isEmailChangeAllowed: can change on {tenant_id} cause input user is not a primary no primary user exists with the new email" ) continue log_debug_message( "isEmailChangeAllowed: returning true cause email change can happen on all tenants the user is part of" ) return EmailChangeAllowedResult(allowed=True, reason="OK") async def verify_email_for_recipe_user_if_linked_accounts_are_verified( self, user: User, recipe_user_id: RecipeUserId, user_context: Dict[str, Any], ) -> None: if self.email_verification_recipe is None: return if user.is_primary_user: recipe_user_email: Optional[str] = None is_already_verified = False for lm in user.login_methods: if lm.recipe_user_id.get_as_string() == recipe_user_id.get_as_string(): recipe_user_email = lm.email is_already_verified = lm.verified break if recipe_user_email is not None: if is_already_verified: return should_verify_email = False for lm in user.login_methods: if lm.has_same_email_as(recipe_user_email) and lm.verified: should_verify_email = True break if should_verify_email: ev_recipe = self.email_verification_recipe.get_instance_or_throw() resp = await ev_recipe.recipe_implementation.create_email_verification_token( tenant_id=user.tenant_ids[0], recipe_user_id=recipe_user_id, email=recipe_user_email, user_context=user_context, ) if resp.status == "OK": # we purposely pass in false below cause we don't want account # linking to happen await ev_recipe.recipe_implementation.verify_email_using_token( tenant_id=user.tenant_ids[0], token=resp.token, attempt_account_linking=False, user_context=user_context, ) async def should_become_primary_user( self, user: User, tenant_id: str, session: Optional[SessionContainer], user_context: Dict[str, Any], ) -> bool: should_do_account_linking = ( await self.config.should_do_automatic_account_linking( AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method( user.login_methods[0] ), None, session, tenant_id, user_context, ) ) if isinstance(should_do_account_linking, ShouldNotAutomaticallyLink): log_debug_message( "should_become_primary_user returning false because shouldAutomaticallyLink is false" ) return False if ( should_do_account_linking.should_require_verification and not user.login_methods[0].verified ): log_debug_message( "should_become_primary_user returning false because shouldRequireVerification is true but the login method is not verified" ) return False log_debug_message("should_become_primary_user returning true") return True async def try_linking_by_account_info_or_create_primary_user( self, input_user: User, session: Optional[SessionContainer], tenant_id: str, user_context: Dict[str, Any], ) -> TryLinkingByAccountInfoOrCreatePrimaryUserResult: tries = 0 while tries < 100: tries += 1 primary_user_that_can_be_linked_to_the_input_user = ( await self.get_primary_user_that_can_be_linked_to_recipe_user_id( tenant_id=tenant_id, user=input_user, user_context=user_context, ) ) if primary_user_that_can_be_linked_to_the_input_user is not None: log_debug_message( "try_linking_by_account_info_or_create_primary_user: got primary user we can try linking" ) # we check if the input_user and primary_user_that_can_be_linked_to_the_input_user are linked based on recipeIds because the input_user obj could be outdated if not any( lm.recipe_user_id.get_as_string() == input_user.login_methods[0].recipe_user_id.get_as_string() for lm in primary_user_that_can_be_linked_to_the_input_user.login_methods ): should_do_account_linking = await self.config.should_do_automatic_account_linking( AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method( input_user.login_methods[0] ), primary_user_that_can_be_linked_to_the_input_user, session, tenant_id, user_context, ) if isinstance( should_do_account_linking, ShouldNotAutomaticallyLink ): log_debug_message( "try_linking_by_account_info_or_create_primary_user: not linking because shouldAutomaticallyLink is false" ) return TryLinkingByAccountInfoOrCreatePrimaryUserResult( status="NO_LINK", user=None ) account_info_verified_in_prim_user = any( ( input_user.login_methods[0].email is not None and lm.has_same_email_as(input_user.login_methods[0].email) ) or ( input_user.login_methods[0].phone_number is not None and lm.has_same_phone_number_as( input_user.login_methods[0].phone_number ) and lm.verified ) for lm in primary_user_that_can_be_linked_to_the_input_user.login_methods ) if should_do_account_linking.should_require_verification and ( not input_user.login_methods[0].verified or not account_info_verified_in_prim_user ): log_debug_message( "try_linking_by_account_info_or_create_primary_user: not linking because shouldRequireVerification is true but the login method is not verified in the new or the primary user" ) return TryLinkingByAccountInfoOrCreatePrimaryUserResult( status="NO_LINK", user=None ) log_debug_message( "try_linking_by_account_info_or_create_primary_user linking" ) link_accounts_result = await self.recipe_implementation.link_accounts( recipe_user_id=input_user.login_methods[0].recipe_user_id, primary_user_id=primary_user_that_can_be_linked_to_the_input_user.id, user_context=user_context, ) if link_accounts_result.status == "OK": log_debug_message( "try_linking_by_account_info_or_create_primary_user successfully linked" ) return TryLinkingByAccountInfoOrCreatePrimaryUserResult( status="OK", user=link_accounts_result.user ) elif ( link_accounts_result.status == "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR" ): log_debug_message( "try_linking_by_account_info_or_create_primary_user already linked to another user" ) return TryLinkingByAccountInfoOrCreatePrimaryUserResult( status="OK", user=link_accounts_result.user ) elif ( link_accounts_result.status == "INPUT_USER_IS_NOT_A_PRIMARY_USER" ): log_debug_message( "try_linking_by_account_info_or_create_primary_user linking failed because of a race condition" ) continue else: log_debug_message( "try_linking_by_account_info_or_create_primary_user linking failed because of a race condition" ) continue return TryLinkingByAccountInfoOrCreatePrimaryUserResult( status="OK", user=input_user ) oldest_user_that_can_be_linked_to_the_input_user = ( await self.get_oldest_user_that_can_be_linked_to_recipe_user( tenant_id=tenant_id, user=input_user, user_context=user_context, ) ) if ( oldest_user_that_can_be_linked_to_the_input_user is not None and oldest_user_that_can_be_linked_to_the_input_user.id != input_user.id ): log_debug_message( "try_linking_by_account_info_or_create_primary_user: got an older user we can try linking" ) should_make_older_user_primary = await self.should_become_primary_user( oldest_user_that_can_be_linked_to_the_input_user, tenant_id, session, user_context, ) if should_make_older_user_primary: create_primary_user_result = await self.recipe_implementation.create_primary_user( recipe_user_id=oldest_user_that_can_be_linked_to_the_input_user.login_methods[ 0 ].recipe_user_id, user_context=user_context, ) if ( create_primary_user_result.status == "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR" or create_primary_user_result.status == "RECIPE_USER_ID_ALREADY_LINKED_WITH_PRIMARY_USER_ID_ERROR" ): log_debug_message( f"try_linking_by_account_info_or_create_primary_user: retrying because createPrimaryUser returned {create_primary_user_result.status}" ) continue should_do_account_linking = await self.config.should_do_automatic_account_linking( AccountInfoWithRecipeIdAndUserId.from_account_info_or_login_method( input_user.login_methods[0] ), create_primary_user_result.user, session, tenant_id, user_context, ) if isinstance( should_do_account_linking, ShouldNotAutomaticallyLink ): log_debug_message( "try_linking_by_account_info_or_create_primary_user: not linking because shouldAutomaticallyLink is false" ) return TryLinkingByAccountInfoOrCreatePrimaryUserResult( status="NO_LINK", user=None ) if ( should_do_account_linking.should_require_verification and not input_user.login_methods[0].verified ): log_debug_message( "try_linking_by_account_info_or_create_primary_user: not linking because shouldRequireVerification is true but the login method is not verified" ) return TryLinkingByAccountInfoOrCreatePrimaryUserResult( status="NO_LINK", user=None ) log_debug_message( "try_linking_by_account_info_or_create_primary_user linking" ) link_accounts_result = ( await self.recipe_implementation.link_accounts( recipe_user_id=input_user.login_methods[0].recipe_user_id, primary_user_id=create_primary_user_result.user.id, user_context=user_context, ) ) if link_accounts_result.status == "OK": log_debug_message( "try_linking_by_account_info_or_create_primary_user successfully linked" ) return TryLinkingByAccountInfoOrCreatePrimaryUserResult( status="OK", user=link_accounts_result.user ) elif ( link_accounts_result.status == "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR" ): log_debug_message( "try_linking_by_account_info_or_create_primary_user already linked to another user" ) return TryLinkingByAccountInfoOrCreatePrimaryUserResult( status="OK", user=link_accounts_result.user ) elif ( link_accounts_result.status == "INPUT_USER_IS_NOT_A_PRIMARY_USER" ): log_debug_message( "try_linking_by_account_info_or_create_primary_user linking failed because of a race condition" ) continue else: log_debug_message( "try_linking_by_account_info_or_create_primary_user linking failed because of a race condition" ) continue log_debug_message( "try_linking_by_account_info_or_create_primary_user: trying to make the current user primary" ) if await self.should_become_primary_user( input_user, tenant_id, session, user_context ): create_primary_user_result = ( await self.recipe_implementation.create_primary_user( recipe_user_id=input_user.login_methods[0].recipe_user_id, user_context=user_context, ) ) if ( create_primary_user_result.status == "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR" or create_primary_user_result.status == "RECIPE_USER_ID_ALREADY_LINKED_WITH_PRIMARY_USER_ID_ERROR" ): continue return TryLinkingByAccountInfoOrCreatePrimaryUserResult( status="OK", user=create_primary_user_result.user, ) else: return TryLinkingByAccountInfoOrCreatePrimaryUserResult( status="OK", user=input_user ) raise Exception( "This should never happen: ran out of retries for try_linking_by_account_info_or_create_primary_user" )
Ancestors
- RecipeModule
- abc.ABC
Class variables
var get_tenant_id : Optional[Callable[[str, Dict[str, Any]], Awaitable[str]]]
var recipe_id
Static methods
def get_instance() ‑> AccountLinkingRecipe
def init(on_account_linked: Optional[Callable[[User, RecipeLevelUser, Dict[str, Any]], Awaitable[None]]] = None, should_do_automatic_account_linking: Optional[Callable[[AccountInfoWithRecipeIdAndUserId, Optional[User], Optional[SessionContainer], str, Dict[str, Any]], Awaitable[Union[ShouldNotAutomaticallyLink, ShouldAutomaticallyLink]]]] = None, override: Optional[InputOverrideConfig] = None)
def reset()
Methods
def get_all_cors_headers(self) ‑> List[str]
def get_apis_handled(self) ‑> List[APIHandled]
async def get_oldest_user_that_can_be_linked_to_recipe_user(self, tenant_id: str, user: User, user_context: Dict[str, Any])
async def get_primary_user_that_can_be_linked_to_recipe_user_id(self, tenant_id: str, user: User, user_context: Dict[str, Any])
async def handle_api_request(self, request_id: str, tenant_id: Optional[str], request: BaseRequest, path: NormalisedURLPath, method: str, response: BaseResponse, user_context: Dict[str, Any])
async def handle_error(self, request: BaseRequest, err: SuperTokensError, response: BaseResponse, user_context: Dict[str, Any])
async def is_email_change_allowed(self, user: User, new_email: str, is_verified: bool, session: Optional[SessionContainer], user_context: Dict[str, Any])
-
The purpose of this function is to check if a recipe user ID's email can be changed or not. There are two conditions for when it can't be changed: - If the recipe user is a primary user, then we need to check that the new email doesn't belong to any other primary user. If it does, we disallow the change since multiple primary user's can't have the same account info.
- If the recipe user is NOT a primary user, and if is_verified is false, then we check if there exists a primary user with the same email, and if it does we disallow the email change cause if this email is changed, and an email verification email is sent, then the primary user may end up clicking on the link by mistake, causing account linking to happen which can result in account take over if this recipe user is malicious.
def is_error_from_this_recipe_based_on_instance(self, err: Exception) ‑> bool
async def is_sign_in_allowed(self, user: User, account_info: Union[AccountInfoWithRecipeId, LoginMethod], tenant_id: str, session: Optional[SessionContainer], sign_in_verifies_login_method: bool, user_context: Dict[str, Any])
async def is_sign_in_up_allowed_helper(self, account_info: Union[AccountInfoWithRecipeId, LoginMethod], is_verified: bool, session: Optional[SessionContainer], tenant_id: str, is_sign_in: bool, user: Optional[User], user_context: Dict[str, Any])
async def is_sign_up_allowed(self, new_user: AccountInfoWithRecipeId, is_verified: bool, session: Optional[SessionContainer], tenant_id: str, user_context: Dict[str, Any])
def register_email_verification_recipe(self, email_verification_recipe: EmailVerificationRecipe)
async def should_become_primary_user(self, user: User, tenant_id: str, session: Optional[SessionContainer], user_context: Dict[str, Any])
async def try_linking_by_account_info_or_create_primary_user(self, input_user: User, session: Optional[SessionContainer], tenant_id: str, user_context: Dict[str, Any])
async def verify_email_for_recipe_user_if_linked_accounts_are_verified(self, user: User, recipe_user_id: RecipeUserId, user_context: Dict[str, Any])
class EmailChangeAllowedResult (allowed: bool, reason: "Literal[('OK', 'PRIMARY_USER_CONFLICT', 'ACCOUNT_TAKEOVER_RISK')]")
-
Expand source code
class EmailChangeAllowedResult: def __init__( self, allowed: bool, reason: Literal["OK", "PRIMARY_USER_CONFLICT", "ACCOUNT_TAKEOVER_RISK"], ): self.allowed = allowed self.reason: Literal["OK", "PRIMARY_USER_CONFLICT", "ACCOUNT_TAKEOVER_RISK"] = ( reason )
class TryLinkingByAccountInfoOrCreatePrimaryUserResult (status: "Literal[('OK', 'NO_LINK')]", user: Optional[User])
-
Expand source code
class TryLinkingByAccountInfoOrCreatePrimaryUserResult: def __init__(self, status: Literal["OK", "NO_LINK"], user: Optional[User]): self.status: Literal["OK", "NO_LINK"] = status self.user = user