Module supertokens_python.recipe.multifactorauth.api.implementation

Expand source code
# Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import importlib

from typing import Any, Dict, List, Union
from supertokens_python.recipe.multifactorauth.multi_factor_auth_claim import (
    MultiFactorAuthClaim,
)

from supertokens_python.recipe.session import SessionContainer
from supertokens_python.recipe.multitenancy.asyncio import get_tenant
from supertokens_python.asyncio import get_user
from supertokens_python.recipe.session.exceptions import (
    InvalidClaimsError,
    SuperTokensSessionError,
    UnauthorisedError,
)

from supertokens_python.types import GeneralErrorResponse
from ..interfaces import (
    APIInterface,
    APIOptions,
    NextFactors,
    ResyncSessionAndFetchMFAInfoPUTOkResult,
)


class APIImplementation(APIInterface):
    async def resync_session_and_fetch_mfa_info_put(
        self,
        api_options: APIOptions,
        session: SessionContainer,
        user_context: Dict[str, Any],
    ) -> Union[ResyncSessionAndFetchMFAInfoPUTOkResult, GeneralErrorResponse]:

        module = importlib.import_module(
            "supertokens_python.recipe.multifactorauth.utils"
        )

        session_user = await get_user(session.get_user_id(), user_context)

        if session_user is None:
            raise UnauthorisedError(
                "Session user not found",
            )

        mfa_info = await module.update_and_get_mfa_related_info_in_session(
            input_session=session,
            user_context=user_context,
        )
        factors_setup_for_user = (
            await api_options.recipe_implementation.get_factors_setup_for_user(
                user=session_user,
                user_context=user_context,
            )
        )
        tenant_info = await get_tenant(
            session.get_tenant_id(user_context), user_context
        )
        if tenant_info is None:
            raise UnauthorisedError(
                "Tenant not found",
            )
        all_available_secondary_factors = (
            await api_options.recipe_instance.get_all_available_secondary_factor_ids(
                tenant_info
            )
        )

        factors_allowed_to_setup: List[str] = []

        async def get_factors_set_up_for_user():
            return factors_setup_for_user

        async def get_mfa_requirements_for_auth():
            return mfa_info.mfa_requirements_for_auth

        for factor_id in all_available_secondary_factors:
            try:
                await api_options.recipe_implementation.assert_allowed_to_setup_factor_else_throw_invalid_claim_error(
                    session=session,
                    factor_id=factor_id,
                    factors_set_up_for_user=get_factors_set_up_for_user,
                    mfa_requirements_for_auth=get_mfa_requirements_for_auth,
                    user_context=user_context,
                )
                factors_allowed_to_setup.append(factor_id)
            except SuperTokensSessionError as err:
                if not isinstance(err, InvalidClaimsError):
                    raise err

        next_set_of_unsatisfied_factors = (
            MultiFactorAuthClaim.get_next_set_of_unsatisfied_factors(
                mfa_info.completed_factors, mfa_info.mfa_requirements_for_auth
            )
        )

        get_emails_for_factors_result = (
            await api_options.recipe_instance.get_emails_for_factors(
                session_user, session.get_recipe_user_id(user_context)
            )
        )
        get_phone_numbers_for_factors_result = (
            await api_options.recipe_instance.get_phone_numbers_for_factors(
                session_user, session.get_recipe_user_id(user_context)
            )
        )
        if (
            get_emails_for_factors_result.status == "UNKNOWN_SESSION_RECIPE_USER_ID"
            or get_phone_numbers_for_factors_result.status
            == "UNKNOWN_SESSION_RECIPE_USER_ID"
        ):
            raise UnauthorisedError(
                "User no longer associated with the session",
            )

        next_factors = [
            factor_id
            for factor_id in next_set_of_unsatisfied_factors.factor_ids
            if factor_id in factors_allowed_to_setup
            or factor_id in factors_setup_for_user
        ]

        if (
            len(next_factors) == 0
            and len(next_set_of_unsatisfied_factors.factor_ids) != 0
        ):
            raise Exception(
                f"The user is required to complete secondary factors they are not allowed to "
                f"({', '.join(next_set_of_unsatisfied_factors.factor_ids)}), likely because of configuration issues."
            )
        return ResyncSessionAndFetchMFAInfoPUTOkResult(
            factors=NextFactors(
                next_=next_factors,
                already_setup=factors_setup_for_user,
                allowed_to_setup=factors_allowed_to_setup,
            ),
            emails=get_emails_for_factors_result.factor_id_to_emails_map,
            phone_numbers=get_phone_numbers_for_factors_result.factor_id_to_phone_number_map,
        )

Classes

class APIImplementation
Expand source code
class APIImplementation(APIInterface):
    async def resync_session_and_fetch_mfa_info_put(
        self,
        api_options: APIOptions,
        session: SessionContainer,
        user_context: Dict[str, Any],
    ) -> Union[ResyncSessionAndFetchMFAInfoPUTOkResult, GeneralErrorResponse]:

        module = importlib.import_module(
            "supertokens_python.recipe.multifactorauth.utils"
        )

        session_user = await get_user(session.get_user_id(), user_context)

        if session_user is None:
            raise UnauthorisedError(
                "Session user not found",
            )

        mfa_info = await module.update_and_get_mfa_related_info_in_session(
            input_session=session,
            user_context=user_context,
        )
        factors_setup_for_user = (
            await api_options.recipe_implementation.get_factors_setup_for_user(
                user=session_user,
                user_context=user_context,
            )
        )
        tenant_info = await get_tenant(
            session.get_tenant_id(user_context), user_context
        )
        if tenant_info is None:
            raise UnauthorisedError(
                "Tenant not found",
            )
        all_available_secondary_factors = (
            await api_options.recipe_instance.get_all_available_secondary_factor_ids(
                tenant_info
            )
        )

        factors_allowed_to_setup: List[str] = []

        async def get_factors_set_up_for_user():
            return factors_setup_for_user

        async def get_mfa_requirements_for_auth():
            return mfa_info.mfa_requirements_for_auth

        for factor_id in all_available_secondary_factors:
            try:
                await api_options.recipe_implementation.assert_allowed_to_setup_factor_else_throw_invalid_claim_error(
                    session=session,
                    factor_id=factor_id,
                    factors_set_up_for_user=get_factors_set_up_for_user,
                    mfa_requirements_for_auth=get_mfa_requirements_for_auth,
                    user_context=user_context,
                )
                factors_allowed_to_setup.append(factor_id)
            except SuperTokensSessionError as err:
                if not isinstance(err, InvalidClaimsError):
                    raise err

        next_set_of_unsatisfied_factors = (
            MultiFactorAuthClaim.get_next_set_of_unsatisfied_factors(
                mfa_info.completed_factors, mfa_info.mfa_requirements_for_auth
            )
        )

        get_emails_for_factors_result = (
            await api_options.recipe_instance.get_emails_for_factors(
                session_user, session.get_recipe_user_id(user_context)
            )
        )
        get_phone_numbers_for_factors_result = (
            await api_options.recipe_instance.get_phone_numbers_for_factors(
                session_user, session.get_recipe_user_id(user_context)
            )
        )
        if (
            get_emails_for_factors_result.status == "UNKNOWN_SESSION_RECIPE_USER_ID"
            or get_phone_numbers_for_factors_result.status
            == "UNKNOWN_SESSION_RECIPE_USER_ID"
        ):
            raise UnauthorisedError(
                "User no longer associated with the session",
            )

        next_factors = [
            factor_id
            for factor_id in next_set_of_unsatisfied_factors.factor_ids
            if factor_id in factors_allowed_to_setup
            or factor_id in factors_setup_for_user
        ]

        if (
            len(next_factors) == 0
            and len(next_set_of_unsatisfied_factors.factor_ids) != 0
        ):
            raise Exception(
                f"The user is required to complete secondary factors they are not allowed to "
                f"({', '.join(next_set_of_unsatisfied_factors.factor_ids)}), likely because of configuration issues."
            )
        return ResyncSessionAndFetchMFAInfoPUTOkResult(
            factors=NextFactors(
                next_=next_factors,
                already_setup=factors_setup_for_user,
                allowed_to_setup=factors_allowed_to_setup,
            ),
            emails=get_emails_for_factors_result.factor_id_to_emails_map,
            phone_numbers=get_phone_numbers_for_factors_result.factor_id_to_phone_number_map,
        )

Ancestors

Methods

async def resync_session_and_fetch_mfa_info_put(self, api_options: APIOptions, session: SessionContainer, user_context: Dict[str, Any]) ‑> Union[ResyncSessionAndFetchMFAInfoPUTOkResultGeneralErrorResponse]