Module supertokens_python.recipe.multifactorauth.utils

Expand source code
# Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import TYPE_CHECKING, List, Optional, Union, Dict, Any
from supertokens_python.recipe.multifactorauth.multi_factor_auth_claim import (
    MultiFactorAuthClaim,
)
from supertokens_python.recipe.multitenancy.asyncio import get_tenant
from supertokens_python.recipe.multitenancy.recipe import MultitenancyRecipe
from supertokens_python.recipe.session import SessionContainer
from supertokens_python.recipe.session.asyncio import get_session_information
from supertokens_python.recipe.session.exceptions import UnauthorisedError
from supertokens_python.recipe.accountlinking.recipe import AccountLinkingRecipe
from supertokens_python.recipe.multifactorauth.types import (
    MFAClaimValue,
    MFARequirementList,
    FactorIds,
)
from supertokens_python.types import RecipeUserId
import math
import time
from typing_extensions import Literal
from supertokens_python.utils import log_debug_message

if TYPE_CHECKING:
    from .types import OverrideConfig, MultiFactorAuthConfig


# IMPORTANT: If this function signature is modified, please update all tha places where this function is called.
# There will be no type errors cause we use importLib to dynamically import if to prevent cyclic import issues.
def validate_and_normalise_user_input(
    first_factors: Optional[List[str]],
    override: Union[OverrideConfig, None] = None,
) -> MultiFactorAuthConfig:
    if first_factors is not None and len(first_factors) == 0:
        raise ValueError("'first_factors' can be either None or a non-empty list")

    from .types import OverrideConfig as OC, MultiFactorAuthConfig as MFAC

    if override is None:
        override = OC()

    return MFAC(
        first_factors=first_factors,
        override=override,
    )


class UpdateAndGetMFARelatedInfoInSessionResult:
    def __init__(
        self,
        completed_factors: Dict[str, int],
        mfa_requirements_for_auth: MFARequirementList,
        is_mfa_requirements_for_auth_satisfied: bool,
    ):
        self.completed_factors = completed_factors
        self.mfa_requirements_for_auth = mfa_requirements_for_auth
        self.is_mfa_requirements_for_auth_satisfied = (
            is_mfa_requirements_for_auth_satisfied
        )


# IMPORTANT: If this function signature is modified, please update all tha places where this function is called.
# There will be no type errors cause we use importLib to dynamically import if to prevent cyclic import issues.
async def update_and_get_mfa_related_info_in_session(
    user_context: Dict[str, Any],
    input_session_recipe_user_id: Optional[RecipeUserId] = None,
    input_tenant_id: Optional[str] = None,
    input_access_token_payload: Optional[Dict[str, Any]] = None,
    input_session: Optional[SessionContainer] = None,
    input_updated_factor_id: Optional[str] = None,
) -> UpdateAndGetMFARelatedInfoInSessionResult:
    from supertokens_python.recipe.multifactorauth.recipe import (
        MultiFactorAuthRecipe as Recipe,
    )

    session_recipe_user_id: RecipeUserId
    tenant_id: str
    access_token_payload: Dict[str, Any]
    session_handle: str

    if input_session is not None:
        session_recipe_user_id = input_session.get_recipe_user_id(user_context)
        tenant_id = input_session.get_tenant_id(user_context)
        access_token_payload = input_session.get_access_token_payload(user_context)
        session_handle = input_session.get_handle(user_context)
    else:
        assert input_session_recipe_user_id is not None
        assert input_tenant_id is not None
        assert input_access_token_payload is not None
        session_recipe_user_id = input_session_recipe_user_id
        tenant_id = input_tenant_id
        access_token_payload = input_access_token_payload
        session_handle = access_token_payload["sessionHandle"]

    updated_claim_val = False
    mfa_claim_value = MultiFactorAuthClaim.get_value_from_payload(access_token_payload)

    if input_updated_factor_id is not None:
        if mfa_claim_value is None:
            updated_claim_val = True
            mfa_claim_value = MFAClaimValue(
                c={input_updated_factor_id: math.floor(time.time())},
                v=True,  # updated later in the function
            )
        else:
            updated_claim_val = True
            mfa_claim_value.c[input_updated_factor_id] = math.floor(time.time())

    if mfa_claim_value is None:
        session_user = (
            await AccountLinkingRecipe.get_instance().recipe_implementation.get_user(
                session_recipe_user_id.get_as_string(), user_context
            )
        )
        if session_user is None:
            raise UnauthorisedError("Session user not found")

        session_info = await get_session_information(session_handle, user_context)
        if session_info is None:
            raise UnauthorisedError("Session not found")

        first_factor_time = session_info.time_created
        computed_first_factor_id_for_session = None

        for login_method in session_user.login_methods:
            if (
                login_method.recipe_user_id.get_as_string()
                == session_recipe_user_id.get_as_string()
            ):
                if login_method.recipe_id == "emailpassword":
                    valid_res = await is_valid_first_factor(
                        tenant_id, FactorIds.EMAILPASSWORD, user_context
                    )
                    if valid_res == "TENANT_NOT_FOUND_ERROR":
                        raise UnauthorisedError("Tenant not found")
                    elif valid_res == "OK":
                        computed_first_factor_id_for_session = FactorIds.EMAILPASSWORD
                        break
                elif login_method.recipe_id == "thirdparty":
                    valid_res = await is_valid_first_factor(
                        tenant_id, FactorIds.THIRDPARTY, user_context
                    )
                    if valid_res == "TENANT_NOT_FOUND_ERROR":
                        raise UnauthorisedError("Tenant not found")
                    elif valid_res == "OK":
                        computed_first_factor_id_for_session = FactorIds.THIRDPARTY
                        break
                else:
                    factors_to_check: List[str] = []
                    if login_method.email is not None:
                        factors_to_check.extend(
                            [FactorIds.LINK_EMAIL, FactorIds.OTP_EMAIL]
                        )
                    if login_method.phone_number is not None:
                        factors_to_check.extend(
                            [FactorIds.LINK_PHONE, FactorIds.OTP_PHONE]
                        )

                    for factor_id in factors_to_check:
                        valid_res = await is_valid_first_factor(
                            tenant_id, factor_id, user_context
                        )
                        if valid_res == "TENANT_NOT_FOUND_ERROR":
                            raise UnauthorisedError("Tenant not found")
                        elif valid_res == "OK":
                            computed_first_factor_id_for_session = factor_id
                            break

                    if computed_first_factor_id_for_session is not None:
                        break

        if computed_first_factor_id_for_session is None:
            raise UnauthorisedError("Incorrect login method used")

        updated_claim_val = True
        mfa_claim_value = MFAClaimValue(
            c={computed_first_factor_id_for_session: first_factor_time},
            v=True,  # updated later in this function
        )

    completed_factors = mfa_claim_value.c

    async def user_getter():
        resp = await AccountLinkingRecipe.get_instance().recipe_implementation.get_user(
            session_recipe_user_id.get_as_string(), user_context
        )
        if resp is None:
            raise UnauthorisedError("Session user not found")
        return resp

    async def get_required_secondary_factors_for_tenant(
        tenant_id: str, user_context: Dict[str, Any]
    ) -> List[str]:
        tenant_info = await get_tenant(tenant_id, user_context)
        if tenant_info is None:
            raise UnauthorisedError("Tenant not found")
        return (
            tenant_info.required_secondary_factors
            if tenant_info.required_secondary_factors is not None
            else []
        )

    async def get_factors_setup_for_user() -> List[str]:
        return await Recipe.get_instance_or_throw_error().recipe_implementation.get_factors_setup_for_user(
            user=(await user_getter()), user_context=user_context
        )

    async def get_required_secondary_factors_for_user() -> List[str]:
        return await Recipe.get_instance_or_throw_error().recipe_implementation.get_required_secondary_factors_for_user(
            user_id=(await user_getter()).id, user_context=user_context
        )

    async def get_required_secondary_factors_for_tenant_helper() -> List[str]:
        return await get_required_secondary_factors_for_tenant(
            tenant_id=tenant_id, user_context=user_context
        )

    mfa_requirements_for_auth = await Recipe.get_instance_or_throw_error().recipe_implementation.get_mfa_requirements_for_auth(
        tenant_id=tenant_id,
        access_token_payload=access_token_payload,
        user=user_getter,
        factors_set_up_for_user=get_factors_setup_for_user,
        required_secondary_factors_for_user=get_required_secondary_factors_for_user,
        required_secondary_factors_for_tenant=get_required_secondary_factors_for_tenant_helper,
        completed_factors=completed_factors,
        user_context=user_context,
    )

    are_auth_reqs_complete = (
        len(
            MultiFactorAuthClaim.get_next_set_of_unsatisfied_factors(
                completed_factors, mfa_requirements_for_auth
            ).factor_ids
        )
        == 0
    )

    if mfa_claim_value.v != are_auth_reqs_complete:
        updated_claim_val = True
        mfa_claim_value.v = are_auth_reqs_complete

    if input_session is not None and updated_claim_val:
        await input_session.set_claim_value(
            MultiFactorAuthClaim, mfa_claim_value, user_context
        )

    return UpdateAndGetMFARelatedInfoInSessionResult(
        completed_factors=completed_factors,
        mfa_requirements_for_auth=mfa_requirements_for_auth,
        is_mfa_requirements_for_auth_satisfied=mfa_claim_value.v,
    )


# IMPORTANT: If this function signature is modified, please update all tha places where this function is called.
# There will be no type errors cause we use importLib to dynamically import if to prevent cyclic import issues.
async def is_valid_first_factor(
    tenant_id: str, factor_id: str, user_context: Dict[str, Any]
) -> Literal["OK", "INVALID_FIRST_FACTOR_ERROR", "TENANT_NOT_FOUND_ERROR"]:

    mt_recipe = MultitenancyRecipe.get_instance()
    tenant_info = await get_tenant(tenant_id=tenant_id, user_context=user_context)
    if tenant_info is None:
        return "TENANT_NOT_FOUND_ERROR"

    tenant_config = tenant_info

    first_factors_from_mfa = mt_recipe.static_first_factors

    log_debug_message(
        f"is_valid_first_factor got {', '.join(tenant_config.first_factors) if tenant_config.first_factors else None} from tenant config"
    )
    log_debug_message(f"is_valid_first_factor got {first_factors_from_mfa} from MFA")

    configured_first_factors: Union[List[str], None] = (
        tenant_config.first_factors or first_factors_from_mfa
    )

    if configured_first_factors is None:
        configured_first_factors = mt_recipe.all_available_first_factors

    if is_factor_configured_for_tenant(
        all_available_first_factors=mt_recipe.all_available_first_factors,
        first_factors=configured_first_factors,
        factor_id=factor_id,
    ):
        return "OK"

    return "INVALID_FIRST_FACTOR_ERROR"


def is_factor_configured_for_tenant(
    all_available_first_factors: List[str],
    first_factors: List[str],
    factor_id: str,
) -> bool:
    configured_first_factors = [
        f
        for f in first_factors
        if f in all_available_first_factors or f not in FactorIds.__dict__.values()
    ]

    return factor_id in configured_first_factors

Functions

def is_factor_configured_for_tenant(all_available_first_factors: List[str], first_factors: List[str], factor_id: str) ‑> bool
Expand source code
def is_factor_configured_for_tenant(
    all_available_first_factors: List[str],
    first_factors: List[str],
    factor_id: str,
) -> bool:
    configured_first_factors = [
        f
        for f in first_factors
        if f in all_available_first_factors or f not in FactorIds.__dict__.values()
    ]

    return factor_id in configured_first_factors
async def is_valid_first_factor(tenant_id: str, factor_id: str, user_context: Dict[str, Any]) ‑> Literal['OK', 'INVALID_FIRST_FACTOR_ERROR', 'TENANT_NOT_FOUND_ERROR']
Expand source code
async def is_valid_first_factor(
    tenant_id: str, factor_id: str, user_context: Dict[str, Any]
) -> Literal["OK", "INVALID_FIRST_FACTOR_ERROR", "TENANT_NOT_FOUND_ERROR"]:

    mt_recipe = MultitenancyRecipe.get_instance()
    tenant_info = await get_tenant(tenant_id=tenant_id, user_context=user_context)
    if tenant_info is None:
        return "TENANT_NOT_FOUND_ERROR"

    tenant_config = tenant_info

    first_factors_from_mfa = mt_recipe.static_first_factors

    log_debug_message(
        f"is_valid_first_factor got {', '.join(tenant_config.first_factors) if tenant_config.first_factors else None} from tenant config"
    )
    log_debug_message(f"is_valid_first_factor got {first_factors_from_mfa} from MFA")

    configured_first_factors: Union[List[str], None] = (
        tenant_config.first_factors or first_factors_from_mfa
    )

    if configured_first_factors is None:
        configured_first_factors = mt_recipe.all_available_first_factors

    if is_factor_configured_for_tenant(
        all_available_first_factors=mt_recipe.all_available_first_factors,
        first_factors=configured_first_factors,
        factor_id=factor_id,
    ):
        return "OK"

    return "INVALID_FIRST_FACTOR_ERROR"
Expand source code
async def update_and_get_mfa_related_info_in_session(
    user_context: Dict[str, Any],
    input_session_recipe_user_id: Optional[RecipeUserId] = None,
    input_tenant_id: Optional[str] = None,
    input_access_token_payload: Optional[Dict[str, Any]] = None,
    input_session: Optional[SessionContainer] = None,
    input_updated_factor_id: Optional[str] = None,
) -> UpdateAndGetMFARelatedInfoInSessionResult:
    from supertokens_python.recipe.multifactorauth.recipe import (
        MultiFactorAuthRecipe as Recipe,
    )

    session_recipe_user_id: RecipeUserId
    tenant_id: str
    access_token_payload: Dict[str, Any]
    session_handle: str

    if input_session is not None:
        session_recipe_user_id = input_session.get_recipe_user_id(user_context)
        tenant_id = input_session.get_tenant_id(user_context)
        access_token_payload = input_session.get_access_token_payload(user_context)
        session_handle = input_session.get_handle(user_context)
    else:
        assert input_session_recipe_user_id is not None
        assert input_tenant_id is not None
        assert input_access_token_payload is not None
        session_recipe_user_id = input_session_recipe_user_id
        tenant_id = input_tenant_id
        access_token_payload = input_access_token_payload
        session_handle = access_token_payload["sessionHandle"]

    updated_claim_val = False
    mfa_claim_value = MultiFactorAuthClaim.get_value_from_payload(access_token_payload)

    if input_updated_factor_id is not None:
        if mfa_claim_value is None:
            updated_claim_val = True
            mfa_claim_value = MFAClaimValue(
                c={input_updated_factor_id: math.floor(time.time())},
                v=True,  # updated later in the function
            )
        else:
            updated_claim_val = True
            mfa_claim_value.c[input_updated_factor_id] = math.floor(time.time())

    if mfa_claim_value is None:
        session_user = (
            await AccountLinkingRecipe.get_instance().recipe_implementation.get_user(
                session_recipe_user_id.get_as_string(), user_context
            )
        )
        if session_user is None:
            raise UnauthorisedError("Session user not found")

        session_info = await get_session_information(session_handle, user_context)
        if session_info is None:
            raise UnauthorisedError("Session not found")

        first_factor_time = session_info.time_created
        computed_first_factor_id_for_session = None

        for login_method in session_user.login_methods:
            if (
                login_method.recipe_user_id.get_as_string()
                == session_recipe_user_id.get_as_string()
            ):
                if login_method.recipe_id == "emailpassword":
                    valid_res = await is_valid_first_factor(
                        tenant_id, FactorIds.EMAILPASSWORD, user_context
                    )
                    if valid_res == "TENANT_NOT_FOUND_ERROR":
                        raise UnauthorisedError("Tenant not found")
                    elif valid_res == "OK":
                        computed_first_factor_id_for_session = FactorIds.EMAILPASSWORD
                        break
                elif login_method.recipe_id == "thirdparty":
                    valid_res = await is_valid_first_factor(
                        tenant_id, FactorIds.THIRDPARTY, user_context
                    )
                    if valid_res == "TENANT_NOT_FOUND_ERROR":
                        raise UnauthorisedError("Tenant not found")
                    elif valid_res == "OK":
                        computed_first_factor_id_for_session = FactorIds.THIRDPARTY
                        break
                else:
                    factors_to_check: List[str] = []
                    if login_method.email is not None:
                        factors_to_check.extend(
                            [FactorIds.LINK_EMAIL, FactorIds.OTP_EMAIL]
                        )
                    if login_method.phone_number is not None:
                        factors_to_check.extend(
                            [FactorIds.LINK_PHONE, FactorIds.OTP_PHONE]
                        )

                    for factor_id in factors_to_check:
                        valid_res = await is_valid_first_factor(
                            tenant_id, factor_id, user_context
                        )
                        if valid_res == "TENANT_NOT_FOUND_ERROR":
                            raise UnauthorisedError("Tenant not found")
                        elif valid_res == "OK":
                            computed_first_factor_id_for_session = factor_id
                            break

                    if computed_first_factor_id_for_session is not None:
                        break

        if computed_first_factor_id_for_session is None:
            raise UnauthorisedError("Incorrect login method used")

        updated_claim_val = True
        mfa_claim_value = MFAClaimValue(
            c={computed_first_factor_id_for_session: first_factor_time},
            v=True,  # updated later in this function
        )

    completed_factors = mfa_claim_value.c

    async def user_getter():
        resp = await AccountLinkingRecipe.get_instance().recipe_implementation.get_user(
            session_recipe_user_id.get_as_string(), user_context
        )
        if resp is None:
            raise UnauthorisedError("Session user not found")
        return resp

    async def get_required_secondary_factors_for_tenant(
        tenant_id: str, user_context: Dict[str, Any]
    ) -> List[str]:
        tenant_info = await get_tenant(tenant_id, user_context)
        if tenant_info is None:
            raise UnauthorisedError("Tenant not found")
        return (
            tenant_info.required_secondary_factors
            if tenant_info.required_secondary_factors is not None
            else []
        )

    async def get_factors_setup_for_user() -> List[str]:
        return await Recipe.get_instance_or_throw_error().recipe_implementation.get_factors_setup_for_user(
            user=(await user_getter()), user_context=user_context
        )

    async def get_required_secondary_factors_for_user() -> List[str]:
        return await Recipe.get_instance_or_throw_error().recipe_implementation.get_required_secondary_factors_for_user(
            user_id=(await user_getter()).id, user_context=user_context
        )

    async def get_required_secondary_factors_for_tenant_helper() -> List[str]:
        return await get_required_secondary_factors_for_tenant(
            tenant_id=tenant_id, user_context=user_context
        )

    mfa_requirements_for_auth = await Recipe.get_instance_or_throw_error().recipe_implementation.get_mfa_requirements_for_auth(
        tenant_id=tenant_id,
        access_token_payload=access_token_payload,
        user=user_getter,
        factors_set_up_for_user=get_factors_setup_for_user,
        required_secondary_factors_for_user=get_required_secondary_factors_for_user,
        required_secondary_factors_for_tenant=get_required_secondary_factors_for_tenant_helper,
        completed_factors=completed_factors,
        user_context=user_context,
    )

    are_auth_reqs_complete = (
        len(
            MultiFactorAuthClaim.get_next_set_of_unsatisfied_factors(
                completed_factors, mfa_requirements_for_auth
            ).factor_ids
        )
        == 0
    )

    if mfa_claim_value.v != are_auth_reqs_complete:
        updated_claim_val = True
        mfa_claim_value.v = are_auth_reqs_complete

    if input_session is not None and updated_claim_val:
        await input_session.set_claim_value(
            MultiFactorAuthClaim, mfa_claim_value, user_context
        )

    return UpdateAndGetMFARelatedInfoInSessionResult(
        completed_factors=completed_factors,
        mfa_requirements_for_auth=mfa_requirements_for_auth,
        is_mfa_requirements_for_auth_satisfied=mfa_claim_value.v,
    )
def validate_and_normalise_user_input(first_factors: Optional[List[str]], override: Union[OverrideConfig, None] = None) ‑> MultiFactorAuthConfig
Expand source code
def validate_and_normalise_user_input(
    first_factors: Optional[List[str]],
    override: Union[OverrideConfig, None] = None,
) -> MultiFactorAuthConfig:
    if first_factors is not None and len(first_factors) == 0:
        raise ValueError("'first_factors' can be either None or a non-empty list")

    from .types import OverrideConfig as OC, MultiFactorAuthConfig as MFAC

    if override is None:
        override = OC()

    return MFAC(
        first_factors=first_factors,
        override=override,
    )

Classes

class UpdateAndGetMFARelatedInfoInSessionResult (completed_factors: Dict[str, int], mfa_requirements_for_auth: MFARequirementList, is_mfa_requirements_for_auth_satisfied: bool)
Expand source code
class UpdateAndGetMFARelatedInfoInSessionResult:
    def __init__(
        self,
        completed_factors: Dict[str, int],
        mfa_requirements_for_auth: MFARequirementList,
        is_mfa_requirements_for_auth_satisfied: bool,
    ):
        self.completed_factors = completed_factors
        self.mfa_requirements_for_auth = mfa_requirements_for_auth
        self.is_mfa_requirements_for_auth_satisfied = (
            is_mfa_requirements_for_auth_satisfied
        )