Module supertokens_python.recipe.thirdparty.recipe_implementation

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from supertokens_python.asyncio import get_user, list_users_by_account_info

from supertokens_python.normalised_url_path import NormalisedURLPath
from supertokens_python.recipe.accountlinking.recipe import AccountLinkingRecipe
from supertokens_python.recipe.multitenancy.constants import DEFAULT_TENANT_ID
from supertokens_python.recipe.multitenancy.recipe import MultitenancyRecipe
from supertokens_python.recipe.session import SessionContainer
from supertokens_python.recipe.thirdparty.provider import ProviderInput
from supertokens_python.recipe.thirdparty.providers.config_utils import (
    find_and_create_provider_instance,
    merge_providers_from_core_and_static,
)
from supertokens_python.types import AccountInfo, User, RecipeUserId

if TYPE_CHECKING:
    from supertokens_python.querier import Querier
    from supertokens_python.auth_utils import (
        LinkingToSessionUserFailedError,
    )

from .interfaces import (
    EmailChangeNotAllowedError,
    ManuallyCreateOrUpdateUserOkResult,
    RecipeInterface,
    SignInUpNotAllowed,
    SignInUpOkResult,
)
from .types import RawUserInfoFromProvider, ThirdPartyInfo


class RecipeImplementation(RecipeInterface):
    def __init__(self, querier: Querier, providers: List[ProviderInput]):
        super().__init__()
        self.querier = querier
        self.providers = providers

    async def sign_in_up(
        self,
        third_party_id: str,
        third_party_user_id: str,
        email: str,
        is_verified: bool,
        oauth_tokens: Dict[str, Any],
        raw_user_info_from_provider: RawUserInfoFromProvider,
        session: Optional[SessionContainer],
        should_try_linking_with_session_user: Union[bool, None],
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[SignInUpOkResult, SignInUpNotAllowed, LinkingToSessionUserFailedError]:
        response = await self.manually_create_or_update_user(
            third_party_id=third_party_id,
            third_party_user_id=third_party_user_id,
            email=email,
            tenant_id=tenant_id,
            is_verified=is_verified,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
            user_context=user_context,
        )

        if isinstance(response, EmailChangeNotAllowedError):
            return SignInUpNotAllowed(
                "Cannot sign in / up because new email cannot be applied to existing account. Please contact support. (ERR_CODE_005)"
                if response.reason
                == "Email already associated with another primary user."
                else "Cannot sign in / up because new email cannot be applied to existing account. Please contact support. (ERR_CODE_024)"
            )

        if isinstance(response, ManuallyCreateOrUpdateUserOkResult):
            return SignInUpOkResult(
                user=response.user,
                recipe_user_id=response.recipe_user_id,
                created_new_recipe_user=response.created_new_recipe_user,
                oauth_tokens=oauth_tokens,
                raw_user_info_from_provider=raw_user_info_from_provider,
            )

        return response

    async def manually_create_or_update_user(
        self,
        third_party_id: str,
        third_party_user_id: str,
        email: str,
        is_verified: bool,
        session: Optional[SessionContainer],
        should_try_linking_with_session_user: Union[bool, None],
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[
        ManuallyCreateOrUpdateUserOkResult,
        LinkingToSessionUserFailedError,
        SignInUpNotAllowed,
        EmailChangeNotAllowedError,
    ]:
        from supertokens_python.auth_utils import (
            link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info,
        )

        account_linking = AccountLinkingRecipe.get_instance()
        users = await list_users_by_account_info(
            tenant_id,
            AccountInfo(
                third_party=ThirdPartyInfo(
                    third_party_id=third_party_id,
                    third_party_user_id=third_party_user_id,
                ),
            ),
            False,
            user_context,
        )

        user = users[0] if users else None
        if user is not None:
            is_email_change_allowed = await account_linking.is_email_change_allowed(
                user=user,
                is_verified=is_verified,
                new_email=email,
                session=session,
                user_context=user_context,
            )
            if not is_email_change_allowed.allowed:
                reason = (
                    "Email already associated with another primary user."
                    if is_email_change_allowed.reason == "PRIMARY_USER_CONFLICT"
                    else "New email cannot be applied to existing account because of account takeover risks."
                )
                return EmailChangeNotAllowedError(reason)

        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/signinup"),
            {
                "thirdPartyId": third_party_id,
                "thirdPartyUserId": third_party_user_id,
                "email": {"id": email, "isVerified": is_verified},
            },
            user_context=user_context,
        )

        if response["status"] == "EMAIL_CHANGE_NOT_ALLOWED_ERROR":
            return EmailChangeNotAllowedError(response["reason"])

        # status is OK

        user = User.from_json(
            response["user"],
        )
        recipe_user_id = RecipeUserId(response["recipeUserId"])

        await account_linking.verify_email_for_recipe_user_if_linked_accounts_are_verified(
            user=user,
            recipe_user_id=recipe_user_id,
            user_context=user_context,
        )

        # Fetch updated user
        user = await get_user(recipe_user_id.get_as_string(), user_context)

        assert user is not None

        link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
            tenant_id=tenant_id,
            input_user=user,
            recipe_user_id=recipe_user_id,
            session=session,
            user_context=user_context,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
        )

        if link_result.status != "OK":
            return link_result

        return ManuallyCreateOrUpdateUserOkResult(
            user=link_result.user,
            recipe_user_id=recipe_user_id,
            created_new_recipe_user=response["createdNewUser"],
        )

    async def get_provider(
        self,
        third_party_id: str,
        client_type: Optional[str],
        tenant_id: str,
        user_context: Dict[str, Any],
    ):
        mt_recipe = MultitenancyRecipe.get_instance()
        tenant_config = await mt_recipe.recipe_implementation.get_tenant(
            tenant_id=tenant_id,
            user_context=user_context,
        )

        if tenant_config is None:
            raise Exception("Tenant not found")

        merged_providers = merge_providers_from_core_and_static(
            provider_configs_from_core=tenant_config.third_party_providers,
            provider_inputs_from_static=self.providers,
            include_all_providers=tenant_id == DEFAULT_TENANT_ID,
        )

        provider = await find_and_create_provider_instance(
            merged_providers, third_party_id, client_type, user_context
        )

        return provider

Classes

class RecipeImplementation (querier: Querier, providers: List[ProviderInput])

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class RecipeImplementation(RecipeInterface):
    def __init__(self, querier: Querier, providers: List[ProviderInput]):
        super().__init__()
        self.querier = querier
        self.providers = providers

    async def sign_in_up(
        self,
        third_party_id: str,
        third_party_user_id: str,
        email: str,
        is_verified: bool,
        oauth_tokens: Dict[str, Any],
        raw_user_info_from_provider: RawUserInfoFromProvider,
        session: Optional[SessionContainer],
        should_try_linking_with_session_user: Union[bool, None],
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[SignInUpOkResult, SignInUpNotAllowed, LinkingToSessionUserFailedError]:
        response = await self.manually_create_or_update_user(
            third_party_id=third_party_id,
            third_party_user_id=third_party_user_id,
            email=email,
            tenant_id=tenant_id,
            is_verified=is_verified,
            session=session,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
            user_context=user_context,
        )

        if isinstance(response, EmailChangeNotAllowedError):
            return SignInUpNotAllowed(
                "Cannot sign in / up because new email cannot be applied to existing account. Please contact support. (ERR_CODE_005)"
                if response.reason
                == "Email already associated with another primary user."
                else "Cannot sign in / up because new email cannot be applied to existing account. Please contact support. (ERR_CODE_024)"
            )

        if isinstance(response, ManuallyCreateOrUpdateUserOkResult):
            return SignInUpOkResult(
                user=response.user,
                recipe_user_id=response.recipe_user_id,
                created_new_recipe_user=response.created_new_recipe_user,
                oauth_tokens=oauth_tokens,
                raw_user_info_from_provider=raw_user_info_from_provider,
            )

        return response

    async def manually_create_or_update_user(
        self,
        third_party_id: str,
        third_party_user_id: str,
        email: str,
        is_verified: bool,
        session: Optional[SessionContainer],
        should_try_linking_with_session_user: Union[bool, None],
        tenant_id: str,
        user_context: Dict[str, Any],
    ) -> Union[
        ManuallyCreateOrUpdateUserOkResult,
        LinkingToSessionUserFailedError,
        SignInUpNotAllowed,
        EmailChangeNotAllowedError,
    ]:
        from supertokens_python.auth_utils import (
            link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info,
        )

        account_linking = AccountLinkingRecipe.get_instance()
        users = await list_users_by_account_info(
            tenant_id,
            AccountInfo(
                third_party=ThirdPartyInfo(
                    third_party_id=third_party_id,
                    third_party_user_id=third_party_user_id,
                ),
            ),
            False,
            user_context,
        )

        user = users[0] if users else None
        if user is not None:
            is_email_change_allowed = await account_linking.is_email_change_allowed(
                user=user,
                is_verified=is_verified,
                new_email=email,
                session=session,
                user_context=user_context,
            )
            if not is_email_change_allowed.allowed:
                reason = (
                    "Email already associated with another primary user."
                    if is_email_change_allowed.reason == "PRIMARY_USER_CONFLICT"
                    else "New email cannot be applied to existing account because of account takeover risks."
                )
                return EmailChangeNotAllowedError(reason)

        response = await self.querier.send_post_request(
            NormalisedURLPath(f"{tenant_id}/recipe/signinup"),
            {
                "thirdPartyId": third_party_id,
                "thirdPartyUserId": third_party_user_id,
                "email": {"id": email, "isVerified": is_verified},
            },
            user_context=user_context,
        )

        if response["status"] == "EMAIL_CHANGE_NOT_ALLOWED_ERROR":
            return EmailChangeNotAllowedError(response["reason"])

        # status is OK

        user = User.from_json(
            response["user"],
        )
        recipe_user_id = RecipeUserId(response["recipeUserId"])

        await account_linking.verify_email_for_recipe_user_if_linked_accounts_are_verified(
            user=user,
            recipe_user_id=recipe_user_id,
            user_context=user_context,
        )

        # Fetch updated user
        user = await get_user(recipe_user_id.get_as_string(), user_context)

        assert user is not None

        link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
            tenant_id=tenant_id,
            input_user=user,
            recipe_user_id=recipe_user_id,
            session=session,
            user_context=user_context,
            should_try_linking_with_session_user=should_try_linking_with_session_user,
        )

        if link_result.status != "OK":
            return link_result

        return ManuallyCreateOrUpdateUserOkResult(
            user=link_result.user,
            recipe_user_id=recipe_user_id,
            created_new_recipe_user=response["createdNewUser"],
        )

    async def get_provider(
        self,
        third_party_id: str,
        client_type: Optional[str],
        tenant_id: str,
        user_context: Dict[str, Any],
    ):
        mt_recipe = MultitenancyRecipe.get_instance()
        tenant_config = await mt_recipe.recipe_implementation.get_tenant(
            tenant_id=tenant_id,
            user_context=user_context,
        )

        if tenant_config is None:
            raise Exception("Tenant not found")

        merged_providers = merge_providers_from_core_and_static(
            provider_configs_from_core=tenant_config.third_party_providers,
            provider_inputs_from_static=self.providers,
            include_all_providers=tenant_id == DEFAULT_TENANT_ID,
        )

        provider = await find_and_create_provider_instance(
            merged_providers, third_party_id, client_type, user_context
        )

        return provider

Ancestors

Methods

async def get_provider(self, third_party_id: str, client_type: Optional[str], tenant_id: str, user_context: Dict[str, Any])
Expand source code
async def get_provider(
    self,
    third_party_id: str,
    client_type: Optional[str],
    tenant_id: str,
    user_context: Dict[str, Any],
):
    mt_recipe = MultitenancyRecipe.get_instance()
    tenant_config = await mt_recipe.recipe_implementation.get_tenant(
        tenant_id=tenant_id,
        user_context=user_context,
    )

    if tenant_config is None:
        raise Exception("Tenant not found")

    merged_providers = merge_providers_from_core_and_static(
        provider_configs_from_core=tenant_config.third_party_providers,
        provider_inputs_from_static=self.providers,
        include_all_providers=tenant_id == DEFAULT_TENANT_ID,
    )

    provider = await find_and_create_provider_instance(
        merged_providers, third_party_id, client_type, user_context
    )

    return provider
async def manually_create_or_update_user(self, third_party_id: str, third_party_user_id: str, email: str, is_verified: bool, session: Optional[SessionContainer], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, user_context: Dict[str, Any]) ‑> Union[ManuallyCreateOrUpdateUserOkResult, LinkingToSessionUserFailedError, SignInUpNotAllowed, EmailChangeNotAllowedError]
Expand source code
async def manually_create_or_update_user(
    self,
    third_party_id: str,
    third_party_user_id: str,
    email: str,
    is_verified: bool,
    session: Optional[SessionContainer],
    should_try_linking_with_session_user: Union[bool, None],
    tenant_id: str,
    user_context: Dict[str, Any],
) -> Union[
    ManuallyCreateOrUpdateUserOkResult,
    LinkingToSessionUserFailedError,
    SignInUpNotAllowed,
    EmailChangeNotAllowedError,
]:
    from supertokens_python.auth_utils import (
        link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info,
    )

    account_linking = AccountLinkingRecipe.get_instance()
    users = await list_users_by_account_info(
        tenant_id,
        AccountInfo(
            third_party=ThirdPartyInfo(
                third_party_id=third_party_id,
                third_party_user_id=third_party_user_id,
            ),
        ),
        False,
        user_context,
    )

    user = users[0] if users else None
    if user is not None:
        is_email_change_allowed = await account_linking.is_email_change_allowed(
            user=user,
            is_verified=is_verified,
            new_email=email,
            session=session,
            user_context=user_context,
        )
        if not is_email_change_allowed.allowed:
            reason = (
                "Email already associated with another primary user."
                if is_email_change_allowed.reason == "PRIMARY_USER_CONFLICT"
                else "New email cannot be applied to existing account because of account takeover risks."
            )
            return EmailChangeNotAllowedError(reason)

    response = await self.querier.send_post_request(
        NormalisedURLPath(f"{tenant_id}/recipe/signinup"),
        {
            "thirdPartyId": third_party_id,
            "thirdPartyUserId": third_party_user_id,
            "email": {"id": email, "isVerified": is_verified},
        },
        user_context=user_context,
    )

    if response["status"] == "EMAIL_CHANGE_NOT_ALLOWED_ERROR":
        return EmailChangeNotAllowedError(response["reason"])

    # status is OK

    user = User.from_json(
        response["user"],
    )
    recipe_user_id = RecipeUserId(response["recipeUserId"])

    await account_linking.verify_email_for_recipe_user_if_linked_accounts_are_verified(
        user=user,
        recipe_user_id=recipe_user_id,
        user_context=user_context,
    )

    # Fetch updated user
    user = await get_user(recipe_user_id.get_as_string(), user_context)

    assert user is not None

    link_result = await link_to_session_if_provided_else_create_primary_user_id_or_link_by_account_info(
        tenant_id=tenant_id,
        input_user=user,
        recipe_user_id=recipe_user_id,
        session=session,
        user_context=user_context,
        should_try_linking_with_session_user=should_try_linking_with_session_user,
    )

    if link_result.status != "OK":
        return link_result

    return ManuallyCreateOrUpdateUserOkResult(
        user=link_result.user,
        recipe_user_id=recipe_user_id,
        created_new_recipe_user=response["createdNewUser"],
    )
async def sign_in_up(self, third_party_id: str, third_party_user_id: str, email: str, is_verified: bool, oauth_tokens: Dict[str, Any], raw_user_info_from_provider: RawUserInfoFromProvider, session: Optional[SessionContainer], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, user_context: Dict[str, Any]) ‑> Union[SignInUpOkResult, SignInUpNotAllowed, LinkingToSessionUserFailedError]
Expand source code
async def sign_in_up(
    self,
    third_party_id: str,
    third_party_user_id: str,
    email: str,
    is_verified: bool,
    oauth_tokens: Dict[str, Any],
    raw_user_info_from_provider: RawUserInfoFromProvider,
    session: Optional[SessionContainer],
    should_try_linking_with_session_user: Union[bool, None],
    tenant_id: str,
    user_context: Dict[str, Any],
) -> Union[SignInUpOkResult, SignInUpNotAllowed, LinkingToSessionUserFailedError]:
    response = await self.manually_create_or_update_user(
        third_party_id=third_party_id,
        third_party_user_id=third_party_user_id,
        email=email,
        tenant_id=tenant_id,
        is_verified=is_verified,
        session=session,
        should_try_linking_with_session_user=should_try_linking_with_session_user,
        user_context=user_context,
    )

    if isinstance(response, EmailChangeNotAllowedError):
        return SignInUpNotAllowed(
            "Cannot sign in / up because new email cannot be applied to existing account. Please contact support. (ERR_CODE_005)"
            if response.reason
            == "Email already associated with another primary user."
            else "Cannot sign in / up because new email cannot be applied to existing account. Please contact support. (ERR_CODE_024)"
        )

    if isinstance(response, ManuallyCreateOrUpdateUserOkResult):
        return SignInUpOkResult(
            user=response.user,
            recipe_user_id=response.recipe_user_id,
            created_new_recipe_user=response.created_new_recipe_user,
            oauth_tokens=oauth_tokens,
            raw_user_info_from_provider=raw_user_info_from_provider,
        )

    return response