Module supertokens_python.recipe.dashboard.api

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from .analytics import handle_analytics_post
from .api_key_protector import api_key_protector
from .dashboard import handle_dashboard_api
from .search.getTags import handle_get_tags
from .signin import handle_emailpassword_signin_api
from .signout import handle_emailpassword_signout_api
from .userdetails.user_delete import handle_user_delete
from .userdetails.user_email_verify_get import handle_user_email_verify_get
from .userdetails.user_email_verify_put import handle_user_email_verify_put
from .userdetails.user_email_verify_token_post import handle_email_verify_token_post
from .userdetails.user_get import handle_user_get
from .userdetails.user_metadata_get import handle_metadata_get
from .userdetails.user_metadata_put import handle_metadata_put
from .userdetails.user_password_put import handle_user_password_put
from .userdetails.user_put import handle_user_put
from .userdetails.user_sessions_get import handle_sessions_get
from .userdetails.user_sessions_post import handle_user_sessions_post
from .users_count_get import handle_users_count_get_api
from .users_get import handle_users_get_api
from .validate_key import handle_validate_key_api
from .list_tenants import handle_list_tenants_api

__all__ = [
    "handle_dashboard_api",
    "api_key_protector",
    "handle_users_count_get_api",
    "handle_users_get_api",
    "handle_validate_key_api",
    "handle_user_email_verify_get",
    "handle_user_get",
    "handle_metadata_get",
    "handle_sessions_get",
    "handle_user_delete",
    "handle_user_put",
    "handle_user_email_verify_put",
    "handle_metadata_put",
    "handle_user_sessions_post",
    "handle_user_password_put",
    "handle_email_verify_token_post",
    "handle_emailpassword_signin_api",
    "handle_emailpassword_signout_api",
    "handle_get_tags",
    "handle_analytics_post",
    "handle_list_tenants_api",
]

Sub-modules

supertokens_python.recipe.dashboard.api.analytics
supertokens_python.recipe.dashboard.api.dashboard
supertokens_python.recipe.dashboard.api.implementation
supertokens_python.recipe.dashboard.api.list_tenants
supertokens_python.recipe.dashboard.api.search
supertokens_python.recipe.dashboard.api.signin
supertokens_python.recipe.dashboard.api.signout
supertokens_python.recipe.dashboard.api.userdetails
supertokens_python.recipe.dashboard.api.users_count_get
supertokens_python.recipe.dashboard.api.users_get
supertokens_python.recipe.dashboard.api.validate_key

Functions

async def api_key_protector(api_implementation: APIInterface, tenant_id: str, api_options: APIOptions, api_function: Callable[[APIInterface, str, APIOptions, Dict[str, Any]], Awaitable[APIResponse]], user_context: Dict[str, Any]) ‑> Optional[BaseResponse]
Expand source code
async def api_key_protector(
    api_implementation: APIInterface,
    tenant_id: str,
    api_options: APIOptions,
    api_function: Callable[
        [APIInterface, str, APIOptions, Dict[str, Any]], Awaitable[APIResponse]
    ],
    user_context: Dict[str, Any],
) -> Optional[BaseResponse]:
    should_allow_access = False

    try:
        should_allow_access = (
            await api_options.recipe_implementation.should_allow_access(
                api_options.request, api_options.config, user_context
            )
        )
    except DashboardOperationNotAllowedError as _:
        return send_non_200_response_with_message(
            "You are not permitted to perform this operation",
            403,
            api_options.response,
        )

    if should_allow_access is False:
        return send_non_200_response_with_message(
            "Unauthorised access", 401, api_options.response
        )

    response = await api_function(
        api_implementation, tenant_id, api_options, user_context
    )
    return send_200_response(response.to_json(), api_options.response)
async def handle_analytics_post(_: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any]) ‑> AnalyticsResponse
Expand source code
async def handle_analytics_post(
    _: APIInterface,
    _tenant_id: str,
    api_options: APIOptions,
    _user_context: Dict[str, Any],
) -> AnalyticsResponse:
    if not Supertokens.get_instance().telemetry:
        return AnalyticsResponse()
    body = await api_options.request.json()
    if body is None:
        raise_bad_input_exception("Please send body")
    email = body.get("email")
    dashboard_version = body.get("dashboardVersion")

    if email is None:
        raise_bad_input_exception("Missing required property 'email'")
    if dashboard_version is None:
        raise_bad_input_exception("Missing required property 'dashboardVersion'")

    telemetry_id = None

    try:
        response = await Querier.get_instance().send_get_request(
            NormalisedURLPath("/telemetry"),
            None,
            _user_context,
        )
        if "exists" in response and response["exists"] and "telemetryId" in response:
            telemetry_id = response["telemetryId"]

        number_of_users = await Supertokens.get_instance().get_user_count(
            include_recipe_ids=None
        )

    except Exception as __:
        # If either telemetry id API or user count fetch fails, no event should be sent
        return AnalyticsResponse()

    apiDomain, websiteDomain, appName = (
        api_options.app_info.api_domain,
        api_options.app_info.get_origin(api_options.request, {}),
        api_options.app_info.app_name,
    )

    data = {
        "websiteDomain": websiteDomain.get_as_string_dangerous(),
        "apiDomain": apiDomain.get_as_string_dangerous(),
        "appName": appName,
        "sdk": "python",
        "sdkVersion": SDKVersion,
        "numberOfUsers": number_of_users,
        "email": email,
        "dashboardVersion": dashboard_version,
    }

    if telemetry_id is not None:
        data["telemetryId"] = telemetry_id

    try:
        async with AsyncClient() as client:
            await client.post(  # type: ignore
                url=TELEMETRY_SUPERTOKENS_API_URL,
                json=data,
                headers={"api-version": TELEMETRY_SUPERTOKENS_API_VERSION},
            )
    except Exception as __:
        # If telemetry event fails, no error should be thrown
        pass

    return AnalyticsResponse()
async def handle_dashboard_api(api_implementation: APIInterface, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Optional[BaseResponse]
Expand source code
async def handle_dashboard_api(
    api_implementation: APIInterface,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Optional[BaseResponse]:
    if api_implementation.dashboard_get is None:
        return None

    html_str = await api_implementation.dashboard_get(api_options, user_context)

    api_options.response.set_html_content(html_str)
    return api_options.response
async def handle_email_verify_token_post(_api_interface: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[UserEmailVerifyTokenPostAPIOkResponseUserEmailVerifyTokenPostAPIEmailAlreadyVerifiedErrorResponse]
Expand source code
async def handle_email_verify_token_post(
    _api_interface: APIInterface,
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Union[
    UserEmailVerifyTokenPostAPIOkResponse,
    UserEmailVerifyTokenPostAPIEmailAlreadyVerifiedErrorResponse,
]:
    request_body: Dict[str, Any] = await api_options.request.json()  # type: ignore
    user_id = request_body.get("userId")

    if user_id is None or not isinstance(user_id, str):
        raise_bad_input_exception(
            "Required parameter 'userId' is missing or has an invalid type"
        )

    res = await send_email_verification_email(
        tenant_id=tenant_id, user_id=user_id, email=None, user_context=user_context
    )

    if isinstance(res, SendEmailVerificationEmailAlreadyVerifiedError):
        return UserEmailVerifyTokenPostAPIEmailAlreadyVerifiedErrorResponse()

    return UserEmailVerifyTokenPostAPIOkResponse()
async def handle_emailpassword_signin_api(_: APIInterface, api_options: APIOptions, _user_context: Dict[str, Any])
Expand source code
async def handle_emailpassword_signin_api(
    _: APIInterface, api_options: APIOptions, _user_context: Dict[str, Any]
):
    body = await api_options.request.json()
    if body is None:
        raise_bad_input_exception("Please send body")
    email = body.get("email")
    password = body.get("password")

    if email is None or not isinstance(email, str):
        raise_bad_input_exception("Missing required parameter 'email'")
    if password is None or not isinstance(password, str):
        raise_bad_input_exception("Missing required parameter 'password'")
    response = await Querier.get_instance().send_post_request(
        NormalisedURLPath("/recipe/dashboard/signin"),
        {"email": email, "password": password},
        user_context=_user_context,
    )

    if "status" in response and response["status"] == "OK":
        return send_200_response(
            {"status": "OK", "sessionId": response["sessionId"]}, api_options.response
        )
    if "status" in response and response["status"] == "INVALID_CREDENTIALS_ERROR":
        return send_200_response(
            {"status": "INVALID_CREDENTIALS_ERROR"},
            api_options.response,
        )
    if "status" in response and response["status"] == "USER_SUSPENDED_ERROR":
        return send_200_response(
            {"status": "USER_SUSPENDED_ERROR", "message": response["message"]},
            api_options.response,
        )
async def handle_emailpassword_signout_api(_: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any]) ‑> SignOutOK
Expand source code
async def handle_emailpassword_signout_api(
    _: APIInterface,
    _tenant_id: str,
    api_options: APIOptions,
    _user_context: Dict[str, Any],
) -> SignOutOK:
    if api_options.config.auth_mode == "api-key":
        return SignOutOK()
    session_id_form_auth_header = api_options.request.get_header("authorization")
    if not session_id_form_auth_header:
        return raise_bad_input_exception(
            "Neither 'API Key' nor 'Authorization' header was found"
        )
    session_id_form_auth_header = session_id_form_auth_header.split()[1]
    await Querier.get_instance().send_delete_request(
        NormalisedURLPath("/recipe/dashboard/session"),
        {"sessionId": session_id_form_auth_header},
        user_context=_user_context,
    )
    return SignOutOK()
async def handle_get_tags(_: APIInterface, _tenant_id: str, __: APIOptions, _user_context: Dict[str, Any]) ‑> SearchTagsOK
Expand source code
async def handle_get_tags(
    _: APIInterface, _tenant_id: str, __: APIOptions, _user_context: Dict[str, Any]
) -> SearchTagsOK:
    response = await Querier.get_instance().send_get_request(
        NormalisedURLPath("/user/search/tags"), None, _user_context
    )
    return SearchTagsOK(tags=response["tags"])
async def handle_list_tenants_api(_api_implementation: APIInterface, _tenant_id: str, _api_options: APIOptions, user_context: Dict[str, Any]) ‑> APIResponse
Expand source code
async def handle_list_tenants_api(
    _api_implementation: APIInterface,
    _tenant_id: str,
    _api_options: APIOptions,
    user_context: Dict[str, Any],
) -> APIResponse:
    tenants = await list_all_tenants(user_context)

    final_tenants: List[DashboardListTenantItem] = []

    for current_tenant in tenants.tenants:
        dashboard_tenant = DashboardListTenantItem(
            tenant_id=current_tenant.tenant_id,
            emailpassword=current_tenant.emailpassword,
            passwordless=current_tenant.passwordless,
            third_party=current_tenant.third_party,
        )
        final_tenants.append(dashboard_tenant)

    return DashboardListTenantsGetResponse(final_tenants)
async def handle_metadata_get(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[UserMetadataGetAPIOkResponseFeatureNotEnabledError]
Expand source code
async def handle_metadata_get(
    _api_interface: APIInterface,
    _tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Union[UserMetadataGetAPIOkResponse, FeatureNotEnabledError]:
    user_id = api_options.request.get_query_param("userId")

    if user_id is None:
        raise_bad_input_exception("Missing required parameter 'userId'")

    try:
        UserMetadataRecipe.get_instance()
    except Exception:
        return FeatureNotEnabledError()

    metadata_response = await get_user_metadata(user_id, user_context=user_context)
    return UserMetadataGetAPIOkResponse(metadata_response.metadata)
async def handle_metadata_put(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> UserMetadataPutAPIResponse
Expand source code
async def handle_metadata_put(
    _api_interface: APIInterface,
    _tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> UserMetadataPutAPIResponse:
    request_body: Dict[str, Any] = await api_options.request.json()  # type: ignore
    user_id = request_body.get("userId")
    data = request_body.get("data")

    # This is to throw an error early in case the recipe has not been initialised
    UserMetadataRecipe.get_instance()

    if user_id is None or isinstance(user_id, str) is False:
        raise_bad_input_exception(
            "Required parameter 'userId' is missing or has an invalid type"
        )

    if data is None or isinstance(data, str) is False:
        raise_bad_input_exception(
            "Required parameter 'data' is missing or has an invalid type"
        )

    parsed_data: Dict[str, Any] = {}
    try:
        parsed_data = json.loads(data)
        if not isinstance(parsed_data, dict):  # type: ignore
            raise Exception()

    except Exception:
        raise_bad_input_exception("'data' must be a valid JSON body")

    # This API is meant to set the user metadata of a user. We delete the existing data
    # before updating it because we want to make sure that shallow merging does not result
    # in the data being incorrect
    #
    # For example if the old data is {test: "test", test2: "test2"} and the user wants to delete
    # test2 from the data simply calling updateUserMetadata with {test: "test"} would not remove
    # test2 because of shallow merging.
    #
    # Removing first ensures that the final data is exactly what the user wanted it to be

    await clear_user_metadata(user_id, user_context)
    await update_user_metadata(user_id, parsed_data, user_context)

    return UserMetadataPutAPIResponse()
async def handle_sessions_get(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> UserSessionsGetAPIResponse
Expand source code
async def handle_sessions_get(
    _api_interface: APIInterface,
    _tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> UserSessionsGetAPIResponse:
    user_id = api_options.request.get_query_param("userId")

    if user_id is None:
        raise_bad_input_exception("Missing required parameter 'userId'")

    # Passing tenant id as None sets fetch_across_all_tenants to True
    # which is what we want here.
    session_handles = await get_all_session_handles_for_user(
        user_id, None, user_context
    )
    sessions: List[Optional[SessionInfo]] = [None for _ in session_handles]

    async def call_(i: int, session_handle: str):
        try:
            session_response = await get_session_information(
                session_handle, user_context
            )
            if session_response is not None:
                sessions[i] = SessionInfo(session_response)
        except Exception:
            sessions[i] = None

    session_info_promises = [
        call_(i, handle) for i, handle in enumerate(session_handles)
    ]

    await asyncio.gather(*session_info_promises)

    return UserSessionsGetAPIResponse([s for s in sessions if s is not None])
async def handle_user_delete(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any]) ‑> UserDeleteAPIResponse
Expand source code
async def handle_user_delete(
    _api_interface: APIInterface,
    _tenant_id: str,
    api_options: APIOptions,
    _user_context: Dict[str, Any],
) -> UserDeleteAPIResponse:
    user_id = api_options.request.get_query_param("userId")

    if user_id is None:
        raise_bad_input_exception("Missing required parameter 'userId'")

    await Supertokens.get_instance().delete_user(user_id, _user_context)

    return UserDeleteAPIResponse()
async def handle_user_email_verify_get(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[UserEmailVerifyGetAPIResponseFeatureNotEnabledError]
Expand source code
async def handle_user_email_verify_get(
    _api_interface: APIInterface,
    _tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Union[UserEmailVerifyGetAPIResponse, FeatureNotEnabledError]:
    req = api_options.request
    user_id = req.get_query_param("userId")

    if user_id is None:
        raise_bad_input_exception("Missing required parameter 'userId'")

    try:
        EmailVerificationRecipe.get_instance()
    except Exception:
        return FeatureNotEnabledError()

    is_verified = await is_email_verified(user_id, user_context=user_context)
    return UserEmailVerifyGetAPIResponse(is_verified)
async def handle_user_email_verify_put(_api_interface: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> UserEmailVerifyPutAPIResponse
Expand source code
async def handle_user_email_verify_put(
    _api_interface: APIInterface,
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> UserEmailVerifyPutAPIResponse:
    request_body: Dict[str, Any] = await api_options.request.json()  # type: ignore
    user_id = request_body.get("userId")
    verified = request_body.get("verified")

    if user_id is None or not isinstance(user_id, str):
        raise_bad_input_exception(
            "Required parameter 'userId' is missing or has an invalid type"
        )

    if verified is None or not isinstance(verified, bool):
        raise_bad_input_exception(
            "Required parameter 'verified' is missing or has an invalid type"
        )

    if verified:
        token_response = await create_email_verification_token(
            tenant_id=tenant_id, user_id=user_id, email=None, user_context=user_context
        )

        if isinstance(
            token_response, CreateEmailVerificationTokenEmailAlreadyVerifiedError
        ):
            return UserEmailVerifyPutAPIResponse()

        verify_response = await verify_email_using_token(
            tenant_id=tenant_id, token=token_response.token, user_context=user_context
        )

        if isinstance(verify_response, VerifyEmailUsingTokenInvalidTokenError):
            # This should never happen because we consume the token immediately after creating it
            raise Exception("Should not come here")

    else:
        await unverify_email(user_id, user_context=user_context)

    return UserEmailVerifyPutAPIResponse()
async def handle_user_get(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any]) ‑> Union[UserGetAPINoUserFoundErrorUserGetAPIOkResponseUserGetAPIRecipeNotInitialisedError]
Expand source code
async def handle_user_get(
    _api_interface: APIInterface,
    _tenant_id: str,
    api_options: APIOptions,
    _user_context: Dict[str, Any],
) -> Union[
    UserGetAPINoUserFoundError,
    UserGetAPIOkResponse,
    UserGetAPIRecipeNotInitialisedError,
]:
    user_id = api_options.request.get_query_param("userId")
    recipe_id = api_options.request.get_query_param("recipeId")

    if user_id is None:
        raise_bad_input_exception("Missing required parameter 'userId'")

    if recipe_id is None:
        raise_bad_input_exception("Missing required parameter 'recipeId'")

    if not is_valid_recipe_id(recipe_id):
        raise_bad_input_exception("Invalid recipe id")

    if not is_recipe_initialised(recipe_id):
        return UserGetAPIRecipeNotInitialisedError()

    user_response = await get_user_for_recipe_id(user_id, recipe_id)
    if user_response is None:
        return UserGetAPINoUserFoundError()

    user = user_response.user

    try:
        UserMetadataRecipe.get_instance()
    except Exception:
        user.first_name = "FEATURE_NOT_ENABLED"
        user.last_name = "FEATURE_NOT_ENABLED"

        return UserGetAPIOkResponse(recipe_id, user)

    user_metadata = await get_user_metadata(user_id, user_context=_user_context)
    first_name = user_metadata.metadata.get("first_name", "")
    last_name = user_metadata.metadata.get("last_name", "")

    user.first_name = first_name
    user.last_name = last_name

    return UserGetAPIOkResponse(recipe_id, user)
async def handle_user_password_put(_api_interface: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[UserPasswordPutAPIResponseUserPasswordPutAPIInvalidPasswordErrorResponse]
Expand source code
async def handle_user_password_put(
    _api_interface: APIInterface,
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Union[UserPasswordPutAPIResponse, UserPasswordPutAPIInvalidPasswordErrorResponse]:
    request_body: Dict[str, Any] = await api_options.request.json()  # type: ignore
    user_id = request_body.get("userId")
    new_password = request_body.get("newPassword")

    if user_id is None or not isinstance(user_id, str):
        raise_bad_input_exception("Missing required parameter 'userId'")

    if new_password is None or not isinstance(new_password, str):
        raise_bad_input_exception("Missing required parameter 'newPassword'")

    recipe_to_use: Union[
        Literal["emailpassword", "thirdpartyemailpassword"], None
    ] = None

    try:
        EmailPasswordRecipe.get_instance()
        recipe_to_use = "emailpassword"
    except Exception:
        pass

    if recipe_to_use is None:
        try:
            ThirdPartyEmailPasswordRecipe.get_instance()
            recipe_to_use = "thirdpartyemailpassword"
        except Exception:
            pass

    if recipe_to_use is None:
        raise Exception("Should not come here")

    async def reset_password(
        form_fields: List[NormalisedFormField],
        create_reset_password_token: Callable[
            [str, str, Dict[str, Any]],
            Awaitable[
                Union[CreateResetPasswordOkResult, CreateResetPasswordWrongUserIdError]
            ],
        ],
        reset_password_using_token: Callable[
            [str, str, str, Dict[str, Any]],
            Awaitable[
                Union[
                    ResetPasswordUsingTokenOkResult,
                    ResetPasswordUsingTokenInvalidTokenError,
                ]
            ],
        ],
    ) -> Union[
        UserPasswordPutAPIResponse, UserPasswordPutAPIInvalidPasswordErrorResponse
    ]:
        password_form_field = [
            field for field in form_fields if field.id == FORM_FIELD_PASSWORD_ID
        ][0]

        password_validation_error = await password_form_field.validate(
            new_password, tenant_id
        )

        if password_validation_error is not None:
            return UserPasswordPutAPIInvalidPasswordErrorResponse(
                password_validation_error
            )

        password_reset_token = await create_reset_password_token(
            tenant_id, user_id, user_context
        )

        if isinstance(password_reset_token, CreateResetPasswordWrongUserIdError):
            # Techincally it can but its an edge case so we assume that it wont
            # UNKNOWN_USER_ID_ERROR
            raise Exception("Should never come here")

        password_reset_response = await reset_password_using_token(
            tenant_id, password_reset_token.token, new_password, user_context
        )

        if isinstance(
            password_reset_response, ResetPasswordUsingTokenInvalidTokenError
        ):
            # RESET_PASSWORD_INVALID_TOKEN_ERROR
            raise Exception("Should not come here")

        return UserPasswordPutAPIResponse()

    if recipe_to_use == "emailpassword":
        return await reset_password(
            EmailPasswordRecipe.get_instance().config.sign_up_feature.form_fields,
            ep_create_reset_password_token,
            ep_reset_password_using_token,
        )

    if recipe_to_use == "thirdpartyemailpassword":
        return await reset_password(
            ThirdPartyEmailPasswordRecipe.get_instance().email_password_recipe.config.sign_up_feature.form_fields,
            tpep_create_reset_password_token,
            tpep_reset_password_using_token,
        )
async def handle_user_put(_api_interface: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[UserPutAPIOkResponseUserPutAPIInvalidEmailErrorResponseUserPutAPIEmailAlreadyExistsErrorResponseUserPutAPIInvalidPhoneErrorResponseUserPutPhoneAlreadyExistsAPIResponse]
Expand source code
async def handle_user_put(
    _api_interface: APIInterface,
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> Union[
    UserPutAPIOkResponse,
    UserPutAPIInvalidEmailErrorResponse,
    UserPutAPIEmailAlreadyExistsErrorResponse,
    UserPutAPIInvalidPhoneErrorResponse,
    UserPutPhoneAlreadyExistsAPIResponse,
]:
    request_body: Dict[str, Any] = await api_options.request.json()  # type: ignore
    user_id: Optional[str] = request_body.get("userId")
    recipe_id: Optional[str] = request_body.get("recipeId")
    first_name: Optional[str] = request_body.get("firstName")
    last_name: Optional[str] = request_body.get("lastName")
    email: Optional[str] = request_body.get("email")
    phone: Optional[str] = request_body.get("phone")

    if not isinstance(user_id, str):
        return raise_bad_input_exception(
            "Required parameter 'userId' is missing or has an invalid type"
        )

    if not isinstance(recipe_id, str):
        return raise_bad_input_exception(
            "Required parameter 'recipeId' is missing or has an invalid type"
        )

    if not is_valid_recipe_id(recipe_id):
        raise_bad_input_exception("Invalid recipe id")

    if first_name is None and not isinstance(first_name, str):
        raise_bad_input_exception(
            "Required parameter 'firstName' is missing or has an invalid type"
        )

    if last_name is None and not isinstance(last_name, str):
        raise_bad_input_exception(
            "Required parameter 'lastName' is missing or has an invalid type"
        )

    if email is None and not isinstance(email, str):
        raise_bad_input_exception(
            "Required parameter 'email' is missing or has an invalid type"
        )

    if phone is None and not isinstance(phone, str):
        raise_bad_input_exception(
            "Required parameter 'phone' is missing or has an invalid type"
        )

    user_response = await get_user_for_recipe_id(user_id, recipe_id)

    if user_response is None:
        raise Exception("Should never come here")

    first_name = first_name.strip()
    last_name = last_name.strip()
    email = email.strip()
    phone = phone.strip()

    if first_name != "" or last_name != "":
        is_recipe_initialized = False

        try:
            UserMetadataRecipe.get_instance()
            is_recipe_initialized = True
        except Exception:
            pass

        if is_recipe_initialized:
            metadata_update = {}

            if first_name != "":
                metadata_update["first_name"] = first_name

            if last_name != "":
                metadata_update["last_name"] = last_name

            await update_user_metadata(user_id, metadata_update, user_context)

    if email != "":
        email_update_response = await update_email_for_recipe_id(
            user_response.recipe, user_id, email, tenant_id, user_context
        )

        if not isinstance(email_update_response, UserPutAPIOkResponse):
            return email_update_response

    if phone != "":
        phone_update_response = await update_phone_for_recipe_id(
            user_response.recipe, user_id, phone, tenant_id, user_context
        )

        if not isinstance(phone_update_response, UserPutAPIOkResponse):
            return phone_update_response

    return UserPutAPIOkResponse()
async def handle_user_sessions_post(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any]) ‑> UserSessionsPostAPIResponse
Expand source code
async def handle_user_sessions_post(
    _api_interface: APIInterface,
    _tenant_id: str,
    api_options: APIOptions,
    _user_context: Dict[str, Any],
) -> UserSessionsPostAPIResponse:
    request_body = await api_options.request.json()  # type: ignore
    session_handles: Optional[List[str]] = request_body.get("sessionHandles")  # type: ignore

    if not isinstance(session_handles, list):
        raise_bad_input_exception(
            "Required parameter 'sessionHandles' is missing or has an invalid type"
        )

    await revoke_multiple_sessions(session_handles, _user_context)
    return UserSessionsPostAPIResponse()
async def handle_users_count_get_api(_: APIInterface, tenant_id: str, _api_options: APIOptions, _user_context: Dict[str, Any]) ‑> UserCountGetAPIResponse
Expand source code
async def handle_users_count_get_api(
    _: APIInterface,
    tenant_id: str,
    _api_options: APIOptions,
    _user_context: Dict[str, Any],
) -> UserCountGetAPIResponse:
    count = await Supertokens.get_instance().get_user_count(
        None,
        tenant_id,
    )
    return UserCountGetAPIResponse(count=count)
async def handle_users_get_api(api_implementation: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> APIResponse
Expand source code
async def handle_users_get_api(
    api_implementation: APIInterface,
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
) -> APIResponse:
    _ = api_implementation

    limit = api_options.request.get_query_param("limit")
    if limit is None:
        raise_bad_input_exception("Missing required parameter 'limit'")

    time_joined_order: Literal["ASC", "DESC"] = api_options.request.get_query_param(  # type: ignore
        "timeJoinedOrder", "DESC"
    )
    if time_joined_order not in ["ASC", "DESC"]:
        raise_bad_input_exception("Invalid value recieved for 'timeJoinedOrder'")

    pagination_token = api_options.request.get_query_param("paginationToken")

    users_response = await Supertokens.get_instance().get_users(
        tenant_id,
        time_joined_order=time_joined_order,
        limit=int(limit),
        pagination_token=pagination_token,
        include_recipe_ids=None,
        query=api_options.request.get_query_params(),
        user_context=user_context,
    )

    # user metadata bulk fetch with batches:

    try:
        UserMetadataRecipe.get_instance()
    except GeneralError:
        return DashboardUsersGetResponse(
            users_response.users, users_response.next_pagination_token
        )

    users_with_metadata: List[UserWithMetadata] = [
        UserWithMetadata().from_user(user) for user in users_response.users
    ]
    metadata_fetch_awaitables: List[Awaitable[Any]] = []

    async def get_user_metadata_and_update_user(user_idx: int) -> None:
        user = users_response.users[user_idx]
        user_metadata = await get_user_metadata(user.user_id, user_context)
        first_name = user_metadata.metadata.get("first_name")
        last_name = user_metadata.metadata.get("last_name")

        # None becomes null which is acceptable for the dashboard.
        users_with_metadata[user_idx].first_name = first_name
        users_with_metadata[user_idx].last_name = last_name

    # Batch calls to get user metadata:
    for i, _ in enumerate(users_response.users):
        metadata_fetch_awaitables.append(get_user_metadata_and_update_user(i))

    promise_arr_start_position = 0
    batch_size = 5

    while promise_arr_start_position < len(metadata_fetch_awaitables):
        # We want to query only 5 in parallel at a time
        promises_to_call = [
            metadata_fetch_awaitables[i]
            for i in range(
                promise_arr_start_position,
                min(
                    promise_arr_start_position + batch_size,
                    len(metadata_fetch_awaitables),
                ),
            )
        ]
        await asyncio.gather(*promises_to_call)

        promise_arr_start_position += batch_size

    return DashboardUsersGetResponse(
        users_with_metadata,
        users_response.next_pagination_token,
    )
async def handle_validate_key_api(_api_implementation: APIInterface, api_options: APIOptions, user_context: Dict[str, Any])
Expand source code
async def handle_validate_key_api(
    _api_implementation: APIInterface,
    api_options: APIOptions,
    user_context: Dict[str, Any],
):

    is_valid_key = await validate_api_key(
        api_options.request, api_options.config, user_context
    )

    if is_valid_key:
        return send_200_response({"status": "OK"}, api_options.response)
    return send_non_200_response_with_message("Unauthorised", 401, api_options.response)