Module supertokens_python.recipe.totp.api.implementation
Expand source code
# Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from typing import Dict, Any, Union
from supertokens_python.recipe.session import SessionContainer
from supertokens_python.recipe.session.exceptions import UnauthorisedError
from supertokens_python.recipe.multifactorauth.asyncio import (
assert_allowed_to_setup_factor_else_throw_invalid_claim_error,
)
from supertokens_python.recipe.multifactorauth.recipe import MultiFactorAuthRecipe
from ..interfaces import APIInterface, APIOptions
from ..types import (
CreateDeviceOkResult,
DeviceAlreadyExistsError,
ListDevicesOkResult,
RemoveDeviceOkResult,
VerifyDeviceOkResult,
UnknownDeviceError,
InvalidTOTPError,
LimitReachedError,
VerifyTOTPOkResult,
UnknownUserIdError,
)
from supertokens_python.types import GeneralErrorResponse
class APIImplementation(APIInterface):
async def create_device_post(
self,
device_name: Union[str, None],
options: APIOptions,
session: SessionContainer,
user_context: Dict[str, Any],
) -> Union[CreateDeviceOkResult, DeviceAlreadyExistsError, GeneralErrorResponse]:
user_id = session.get_user_id()
mfa_instance = MultiFactorAuthRecipe.get_instance()
if mfa_instance is None:
raise Exception("should never come here")
await assert_allowed_to_setup_factor_else_throw_invalid_claim_error(
session, "totp", user_context
)
create_device_res = await options.recipe_implementation.create_device(
user_id=user_id,
user_identifier_info=None,
device_name=device_name,
skew=None,
period=None,
user_context=user_context,
)
if isinstance(create_device_res, UnknownUserIdError):
raise UnauthorisedError("Session user not found")
return create_device_res
async def list_devices_get(
self,
options: APIOptions,
session: SessionContainer,
user_context: Dict[str, Any],
) -> Union[ListDevicesOkResult, GeneralErrorResponse]:
user_id = session.get_user_id()
return await options.recipe_implementation.list_devices(
user_id=user_id,
user_context=user_context,
)
async def remove_device_post(
self,
device_name: str,
options: APIOptions,
session: SessionContainer,
user_context: Dict[str, Any],
) -> Union[RemoveDeviceOkResult, GeneralErrorResponse]:
user_id = session.get_user_id()
return await options.recipe_implementation.remove_device(
user_id=user_id,
device_name=device_name,
user_context=user_context,
)
async def verify_device_post(
self,
device_name: str,
totp: str,
options: APIOptions,
session: SessionContainer,
user_context: Dict[str, Any],
) -> Union[
VerifyDeviceOkResult,
UnknownDeviceError,
InvalidTOTPError,
LimitReachedError,
GeneralErrorResponse,
]:
user_id = session.get_user_id()
tenant_id = session.get_tenant_id()
mfa_instance = MultiFactorAuthRecipe.get_instance()
if mfa_instance is None:
raise Exception("should never come here")
await assert_allowed_to_setup_factor_else_throw_invalid_claim_error(
session, "totp", user_context
)
res = await options.recipe_implementation.verify_device(
tenant_id=tenant_id,
user_id=user_id,
device_name=device_name,
totp=totp,
user_context=user_context,
)
if isinstance(res, VerifyDeviceOkResult):
await mfa_instance.recipe_implementation.mark_factor_as_complete_in_session(
session=session,
factor_id="totp",
user_context=user_context,
)
return res
async def verify_totp_post(
self,
totp: str,
options: APIOptions,
session: SessionContainer,
user_context: Dict[str, Any],
) -> Union[
VerifyTOTPOkResult,
UnknownUserIdError,
InvalidTOTPError,
LimitReachedError,
GeneralErrorResponse,
]:
user_id = session.get_user_id()
tenant_id = session.get_tenant_id()
mfa_instance = MultiFactorAuthRecipe.get_instance()
if mfa_instance is None:
raise Exception("should never come here")
res = await options.recipe_implementation.verify_totp(
tenant_id=tenant_id,
user_id=user_id,
totp=totp,
user_context=user_context,
)
if isinstance(res, VerifyTOTPOkResult):
await mfa_instance.recipe_implementation.mark_factor_as_complete_in_session(
session=session,
factor_id="totp",
user_context=user_context,
)
return res
Classes
class APIImplementation
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class APIImplementation(APIInterface): async def create_device_post( self, device_name: Union[str, None], options: APIOptions, session: SessionContainer, user_context: Dict[str, Any], ) -> Union[CreateDeviceOkResult, DeviceAlreadyExistsError, GeneralErrorResponse]: user_id = session.get_user_id() mfa_instance = MultiFactorAuthRecipe.get_instance() if mfa_instance is None: raise Exception("should never come here") await assert_allowed_to_setup_factor_else_throw_invalid_claim_error( session, "totp", user_context ) create_device_res = await options.recipe_implementation.create_device( user_id=user_id, user_identifier_info=None, device_name=device_name, skew=None, period=None, user_context=user_context, ) if isinstance(create_device_res, UnknownUserIdError): raise UnauthorisedError("Session user not found") return create_device_res async def list_devices_get( self, options: APIOptions, session: SessionContainer, user_context: Dict[str, Any], ) -> Union[ListDevicesOkResult, GeneralErrorResponse]: user_id = session.get_user_id() return await options.recipe_implementation.list_devices( user_id=user_id, user_context=user_context, ) async def remove_device_post( self, device_name: str, options: APIOptions, session: SessionContainer, user_context: Dict[str, Any], ) -> Union[RemoveDeviceOkResult, GeneralErrorResponse]: user_id = session.get_user_id() return await options.recipe_implementation.remove_device( user_id=user_id, device_name=device_name, user_context=user_context, ) async def verify_device_post( self, device_name: str, totp: str, options: APIOptions, session: SessionContainer, user_context: Dict[str, Any], ) -> Union[ VerifyDeviceOkResult, UnknownDeviceError, InvalidTOTPError, LimitReachedError, GeneralErrorResponse, ]: user_id = session.get_user_id() tenant_id = session.get_tenant_id() mfa_instance = MultiFactorAuthRecipe.get_instance() if mfa_instance is None: raise Exception("should never come here") await assert_allowed_to_setup_factor_else_throw_invalid_claim_error( session, "totp", user_context ) res = await options.recipe_implementation.verify_device( tenant_id=tenant_id, user_id=user_id, device_name=device_name, totp=totp, user_context=user_context, ) if isinstance(res, VerifyDeviceOkResult): await mfa_instance.recipe_implementation.mark_factor_as_complete_in_session( session=session, factor_id="totp", user_context=user_context, ) return res async def verify_totp_post( self, totp: str, options: APIOptions, session: SessionContainer, user_context: Dict[str, Any], ) -> Union[ VerifyTOTPOkResult, UnknownUserIdError, InvalidTOTPError, LimitReachedError, GeneralErrorResponse, ]: user_id = session.get_user_id() tenant_id = session.get_tenant_id() mfa_instance = MultiFactorAuthRecipe.get_instance() if mfa_instance is None: raise Exception("should never come here") res = await options.recipe_implementation.verify_totp( tenant_id=tenant_id, user_id=user_id, totp=totp, user_context=user_context, ) if isinstance(res, VerifyTOTPOkResult): await mfa_instance.recipe_implementation.mark_factor_as_complete_in_session( session=session, factor_id="totp", user_context=user_context, ) return res
Ancestors
- APIInterface
- abc.ABC
Methods
async def create_device_post(self, device_name: Optional[str], options: APIOptions, session: SessionContainer, user_context: Dict[str, Any]) ‑> Union[CreateDeviceOkResult, DeviceAlreadyExistsError, GeneralErrorResponse]
-
Expand source code
async def create_device_post( self, device_name: Union[str, None], options: APIOptions, session: SessionContainer, user_context: Dict[str, Any], ) -> Union[CreateDeviceOkResult, DeviceAlreadyExistsError, GeneralErrorResponse]: user_id = session.get_user_id() mfa_instance = MultiFactorAuthRecipe.get_instance() if mfa_instance is None: raise Exception("should never come here") await assert_allowed_to_setup_factor_else_throw_invalid_claim_error( session, "totp", user_context ) create_device_res = await options.recipe_implementation.create_device( user_id=user_id, user_identifier_info=None, device_name=device_name, skew=None, period=None, user_context=user_context, ) if isinstance(create_device_res, UnknownUserIdError): raise UnauthorisedError("Session user not found") return create_device_res
async def list_devices_get(self, options: APIOptions, session: SessionContainer, user_context: Dict[str, Any]) ‑> Union[ListDevicesOkResult, GeneralErrorResponse]
-
Expand source code
async def list_devices_get( self, options: APIOptions, session: SessionContainer, user_context: Dict[str, Any], ) -> Union[ListDevicesOkResult, GeneralErrorResponse]: user_id = session.get_user_id() return await options.recipe_implementation.list_devices( user_id=user_id, user_context=user_context, )
async def remove_device_post(self, device_name: str, options: APIOptions, session: SessionContainer, user_context: Dict[str, Any]) ‑> Union[RemoveDeviceOkResult, GeneralErrorResponse]
-
Expand source code
async def remove_device_post( self, device_name: str, options: APIOptions, session: SessionContainer, user_context: Dict[str, Any], ) -> Union[RemoveDeviceOkResult, GeneralErrorResponse]: user_id = session.get_user_id() return await options.recipe_implementation.remove_device( user_id=user_id, device_name=device_name, user_context=user_context, )
async def verify_device_post(self, device_name: str, totp: str, options: APIOptions, session: SessionContainer, user_context: Dict[str, Any]) ‑> Union[VerifyDeviceOkResult, UnknownDeviceError, InvalidTOTPError, LimitReachedError, GeneralErrorResponse]
-
Expand source code
async def verify_device_post( self, device_name: str, totp: str, options: APIOptions, session: SessionContainer, user_context: Dict[str, Any], ) -> Union[ VerifyDeviceOkResult, UnknownDeviceError, InvalidTOTPError, LimitReachedError, GeneralErrorResponse, ]: user_id = session.get_user_id() tenant_id = session.get_tenant_id() mfa_instance = MultiFactorAuthRecipe.get_instance() if mfa_instance is None: raise Exception("should never come here") await assert_allowed_to_setup_factor_else_throw_invalid_claim_error( session, "totp", user_context ) res = await options.recipe_implementation.verify_device( tenant_id=tenant_id, user_id=user_id, device_name=device_name, totp=totp, user_context=user_context, ) if isinstance(res, VerifyDeviceOkResult): await mfa_instance.recipe_implementation.mark_factor_as_complete_in_session( session=session, factor_id="totp", user_context=user_context, ) return res
async def verify_totp_post(self, totp: str, options: APIOptions, session: SessionContainer, user_context: Dict[str, Any]) ‑> Union[VerifyTOTPOkResult, UnknownUserIdError, InvalidTOTPError, LimitReachedError, GeneralErrorResponse]
-
Expand source code
async def verify_totp_post( self, totp: str, options: APIOptions, session: SessionContainer, user_context: Dict[str, Any], ) -> Union[ VerifyTOTPOkResult, UnknownUserIdError, InvalidTOTPError, LimitReachedError, GeneralErrorResponse, ]: user_id = session.get_user_id() tenant_id = session.get_tenant_id() mfa_instance = MultiFactorAuthRecipe.get_instance() if mfa_instance is None: raise Exception("should never come here") res = await options.recipe_implementation.verify_totp( tenant_id=tenant_id, user_id=user_id, totp=totp, user_context=user_context, ) if isinstance(res, VerifyTOTPOkResult): await mfa_instance.recipe_implementation.mark_factor_as_complete_in_session( session=session, factor_id="totp", user_context=user_context, ) return res