Module supertokens_python.recipe.dashboard.api
Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from .analytics import handle_analytics_post
from .api_key_protector import api_key_protector
from .dashboard import handle_dashboard_api
from .search.getTags import handle_get_tags
from .signin import handle_emailpassword_signin_api
from .signout import handle_emailpassword_signout_api
from .userdetails.user_delete import handle_user_delete
from .userdetails.user_email_verify_get import handle_user_email_verify_get
from .userdetails.user_email_verify_put import handle_user_email_verify_put
from .userdetails.user_email_verify_token_post import handle_email_verify_token_post
from .userdetails.user_get import handle_user_get
from .userdetails.user_metadata_get import handle_metadata_get
from .userdetails.user_metadata_put import handle_metadata_put
from .userdetails.user_password_put import handle_user_password_put
from .userdetails.user_put import handle_user_put
from .userdetails.user_sessions_get import handle_sessions_get
from .userdetails.user_sessions_post import handle_user_sessions_post
from .users_count_get import handle_users_count_get_api
from .users_get import handle_users_get_api
from .validate_key import handle_validate_key_api
from .list_tenants import handle_list_tenants_api
__all__ = [
"handle_dashboard_api",
"api_key_protector",
"handle_users_count_get_api",
"handle_users_get_api",
"handle_validate_key_api",
"handle_user_email_verify_get",
"handle_user_get",
"handle_metadata_get",
"handle_sessions_get",
"handle_user_delete",
"handle_user_put",
"handle_user_email_verify_put",
"handle_metadata_put",
"handle_user_sessions_post",
"handle_user_password_put",
"handle_email_verify_token_post",
"handle_emailpassword_signin_api",
"handle_emailpassword_signout_api",
"handle_get_tags",
"handle_analytics_post",
"handle_list_tenants_api",
]
Sub-modules
supertokens_python.recipe.dashboard.api.analytics
supertokens_python.recipe.dashboard.api.dashboard
supertokens_python.recipe.dashboard.api.implementation
supertokens_python.recipe.dashboard.api.list_tenants
supertokens_python.recipe.dashboard.api.search
supertokens_python.recipe.dashboard.api.signin
supertokens_python.recipe.dashboard.api.signout
supertokens_python.recipe.dashboard.api.userdetails
supertokens_python.recipe.dashboard.api.users_count_get
supertokens_python.recipe.dashboard.api.users_get
supertokens_python.recipe.dashboard.api.validate_key
Functions
async def api_key_protector(api_implementation: APIInterface, tenant_id: str, api_options: APIOptions, api_function: Callable[[APIInterface, str, APIOptions, Dict[str, Any]], Awaitable[APIResponse]], user_context: Dict[str, Any]) ‑> Optional[BaseResponse]
-
Expand source code
async def api_key_protector( api_implementation: APIInterface, tenant_id: str, api_options: APIOptions, api_function: Callable[ [APIInterface, str, APIOptions, Dict[str, Any]], Awaitable[APIResponse] ], user_context: Dict[str, Any], ) -> Optional[BaseResponse]: should_allow_access = False try: should_allow_access = ( await api_options.recipe_implementation.should_allow_access( api_options.request, api_options.config, user_context ) ) except DashboardOperationNotAllowedError as _: return send_non_200_response_with_message( "You are not permitted to perform this operation", 403, api_options.response, ) if should_allow_access is False: return send_non_200_response_with_message( "Unauthorised access", 401, api_options.response ) response = await api_function( api_implementation, tenant_id, api_options, user_context ) return send_200_response(response.to_json(), api_options.response)
async def handle_analytics_post(_: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any]) ‑> AnalyticsResponse
-
Expand source code
async def handle_analytics_post( _: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any], ) -> AnalyticsResponse: if not Supertokens.get_instance().telemetry: return AnalyticsResponse() body = await api_options.request.json() if body is None: raise_bad_input_exception("Please send body") email = body.get("email") dashboard_version = body.get("dashboardVersion") if email is None: raise_bad_input_exception("Missing required property 'email'") if dashboard_version is None: raise_bad_input_exception("Missing required property 'dashboardVersion'") telemetry_id = None try: response = await Querier.get_instance().send_get_request( NormalisedURLPath("/telemetry"), None, _user_context, ) if "exists" in response and response["exists"] and "telemetryId" in response: telemetry_id = response["telemetryId"] number_of_users = await Supertokens.get_instance().get_user_count( include_recipe_ids=None ) except Exception as __: # If either telemetry id API or user count fetch fails, no event should be sent return AnalyticsResponse() apiDomain, websiteDomain, appName = ( api_options.app_info.api_domain, api_options.app_info.get_origin(api_options.request, {}), api_options.app_info.app_name, ) data = { "websiteDomain": websiteDomain.get_as_string_dangerous(), "apiDomain": apiDomain.get_as_string_dangerous(), "appName": appName, "sdk": "python", "sdkVersion": SDKVersion, "numberOfUsers": number_of_users, "email": email, "dashboardVersion": dashboard_version, } if telemetry_id is not None: data["telemetryId"] = telemetry_id try: async with AsyncClient() as client: await client.post( # type: ignore url=TELEMETRY_SUPERTOKENS_API_URL, json=data, headers={"api-version": TELEMETRY_SUPERTOKENS_API_VERSION}, ) except Exception as __: # If telemetry event fails, no error should be thrown pass return AnalyticsResponse()
async def handle_dashboard_api(api_implementation: APIInterface, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Optional[BaseResponse]
-
Expand source code
async def handle_dashboard_api( api_implementation: APIInterface, api_options: APIOptions, user_context: Dict[str, Any], ) -> Optional[BaseResponse]: if api_implementation.dashboard_get is None: return None html_str = await api_implementation.dashboard_get(api_options, user_context) api_options.response.set_html_content(html_str) return api_options.response
async def handle_email_verify_token_post(_api_interface: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[UserEmailVerifyTokenPostAPIOkResponse, UserEmailVerifyTokenPostAPIEmailAlreadyVerifiedErrorResponse]
-
Expand source code
async def handle_email_verify_token_post( _api_interface: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[ UserEmailVerifyTokenPostAPIOkResponse, UserEmailVerifyTokenPostAPIEmailAlreadyVerifiedErrorResponse, ]: request_body: Dict[str, Any] = await api_options.request.json() # type: ignore user_id = request_body.get("userId") if user_id is None or not isinstance(user_id, str): raise_bad_input_exception( "Required parameter 'userId' is missing or has an invalid type" ) res = await send_email_verification_email( tenant_id=tenant_id, user_id=user_id, email=None, user_context=user_context ) if isinstance(res, SendEmailVerificationEmailAlreadyVerifiedError): return UserEmailVerifyTokenPostAPIEmailAlreadyVerifiedErrorResponse() return UserEmailVerifyTokenPostAPIOkResponse()
async def handle_emailpassword_signin_api(_: APIInterface, api_options: APIOptions, _user_context: Dict[str, Any])
-
Expand source code
async def handle_emailpassword_signin_api( _: APIInterface, api_options: APIOptions, _user_context: Dict[str, Any] ): body = await api_options.request.json() if body is None: raise_bad_input_exception("Please send body") email = body.get("email") password = body.get("password") if email is None or not isinstance(email, str): raise_bad_input_exception("Missing required parameter 'email'") if password is None or not isinstance(password, str): raise_bad_input_exception("Missing required parameter 'password'") response = await Querier.get_instance().send_post_request( NormalisedURLPath("/recipe/dashboard/signin"), {"email": email, "password": password}, user_context=_user_context, ) if "status" in response and response["status"] == "OK": return send_200_response( {"status": "OK", "sessionId": response["sessionId"]}, api_options.response ) if "status" in response and response["status"] == "INVALID_CREDENTIALS_ERROR": return send_200_response( {"status": "INVALID_CREDENTIALS_ERROR"}, api_options.response, ) if "status" in response and response["status"] == "USER_SUSPENDED_ERROR": return send_200_response( {"status": "USER_SUSPENDED_ERROR", "message": response["message"]}, api_options.response, )
async def handle_emailpassword_signout_api(_: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any]) ‑> SignOutOK
-
Expand source code
async def handle_emailpassword_signout_api( _: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any], ) -> SignOutOK: if api_options.config.auth_mode == "api-key": return SignOutOK() session_id_form_auth_header = api_options.request.get_header("authorization") if not session_id_form_auth_header: return raise_bad_input_exception( "Neither 'API Key' nor 'Authorization' header was found" ) session_id_form_auth_header = session_id_form_auth_header.split()[1] await Querier.get_instance().send_delete_request( NormalisedURLPath("/recipe/dashboard/session"), {"sessionId": session_id_form_auth_header}, user_context=_user_context, ) return SignOutOK()
-
Expand source code
async def handle_get_tags( _: APIInterface, _tenant_id: str, __: APIOptions, _user_context: Dict[str, Any] ) -> SearchTagsOK: response = await Querier.get_instance().send_get_request( NormalisedURLPath("/user/search/tags"), None, _user_context ) return SearchTagsOK(tags=response["tags"])
async def handle_list_tenants_api(_api_implementation: APIInterface, _tenant_id: str, _api_options: APIOptions, user_context: Dict[str, Any]) ‑> APIResponse
-
Expand source code
async def handle_list_tenants_api( _api_implementation: APIInterface, _tenant_id: str, _api_options: APIOptions, user_context: Dict[str, Any], ) -> APIResponse: tenants = await list_all_tenants(user_context) final_tenants: List[DashboardListTenantItem] = [] for current_tenant in tenants.tenants: dashboard_tenant = DashboardListTenantItem( tenant_id=current_tenant.tenant_id, emailpassword=current_tenant.emailpassword, passwordless=current_tenant.passwordless, third_party=current_tenant.third_party, ) final_tenants.append(dashboard_tenant) return DashboardListTenantsGetResponse(final_tenants)
async def handle_metadata_get(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[UserMetadataGetAPIOkResponse, FeatureNotEnabledError]
-
Expand source code
async def handle_metadata_get( _api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[UserMetadataGetAPIOkResponse, FeatureNotEnabledError]: user_id = api_options.request.get_query_param("userId") if user_id is None: raise_bad_input_exception("Missing required parameter 'userId'") try: UserMetadataRecipe.get_instance() except Exception: return FeatureNotEnabledError() metadata_response = await get_user_metadata(user_id, user_context=user_context) return UserMetadataGetAPIOkResponse(metadata_response.metadata)
async def handle_metadata_put(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> UserMetadataPutAPIResponse
-
Expand source code
async def handle_metadata_put( _api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> UserMetadataPutAPIResponse: request_body: Dict[str, Any] = await api_options.request.json() # type: ignore user_id = request_body.get("userId") data = request_body.get("data") # This is to throw an error early in case the recipe has not been initialised UserMetadataRecipe.get_instance() if user_id is None or isinstance(user_id, str) is False: raise_bad_input_exception( "Required parameter 'userId' is missing or has an invalid type" ) if data is None or isinstance(data, str) is False: raise_bad_input_exception( "Required parameter 'data' is missing or has an invalid type" ) parsed_data: Dict[str, Any] = {} try: parsed_data = json.loads(data) if not isinstance(parsed_data, dict): # type: ignore raise Exception() except Exception: raise_bad_input_exception("'data' must be a valid JSON body") # This API is meant to set the user metadata of a user. We delete the existing data # before updating it because we want to make sure that shallow merging does not result # in the data being incorrect # # For example if the old data is {test: "test", test2: "test2"} and the user wants to delete # test2 from the data simply calling updateUserMetadata with {test: "test"} would not remove # test2 because of shallow merging. # # Removing first ensures that the final data is exactly what the user wanted it to be await clear_user_metadata(user_id, user_context) await update_user_metadata(user_id, parsed_data, user_context) return UserMetadataPutAPIResponse()
async def handle_sessions_get(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> UserSessionsGetAPIResponse
-
Expand source code
async def handle_sessions_get( _api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> UserSessionsGetAPIResponse: user_id = api_options.request.get_query_param("userId") if user_id is None: raise_bad_input_exception("Missing required parameter 'userId'") # Passing tenant id as None sets fetch_across_all_tenants to True # which is what we want here. session_handles = await get_all_session_handles_for_user( user_id, None, user_context ) sessions: List[Optional[SessionInfo]] = [None for _ in session_handles] async def call_(i: int, session_handle: str): try: session_response = await get_session_information( session_handle, user_context ) if session_response is not None: sessions[i] = SessionInfo(session_response) except Exception: sessions[i] = None session_info_promises = [ call_(i, handle) for i, handle in enumerate(session_handles) ] await asyncio.gather(*session_info_promises) return UserSessionsGetAPIResponse([s for s in sessions if s is not None])
async def handle_user_delete(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any]) ‑> UserDeleteAPIResponse
-
Expand source code
async def handle_user_delete( _api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any], ) -> UserDeleteAPIResponse: user_id = api_options.request.get_query_param("userId") if user_id is None: raise_bad_input_exception("Missing required parameter 'userId'") await Supertokens.get_instance().delete_user(user_id, _user_context) return UserDeleteAPIResponse()
async def handle_user_email_verify_get(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[UserEmailVerifyGetAPIResponse, FeatureNotEnabledError]
-
Expand source code
async def handle_user_email_verify_get( _api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[UserEmailVerifyGetAPIResponse, FeatureNotEnabledError]: req = api_options.request user_id = req.get_query_param("userId") if user_id is None: raise_bad_input_exception("Missing required parameter 'userId'") try: EmailVerificationRecipe.get_instance() except Exception: return FeatureNotEnabledError() is_verified = await is_email_verified(user_id, user_context=user_context) return UserEmailVerifyGetAPIResponse(is_verified)
async def handle_user_email_verify_put(_api_interface: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> UserEmailVerifyPutAPIResponse
-
Expand source code
async def handle_user_email_verify_put( _api_interface: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> UserEmailVerifyPutAPIResponse: request_body: Dict[str, Any] = await api_options.request.json() # type: ignore user_id = request_body.get("userId") verified = request_body.get("verified") if user_id is None or not isinstance(user_id, str): raise_bad_input_exception( "Required parameter 'userId' is missing or has an invalid type" ) if verified is None or not isinstance(verified, bool): raise_bad_input_exception( "Required parameter 'verified' is missing or has an invalid type" ) if verified: token_response = await create_email_verification_token( tenant_id=tenant_id, user_id=user_id, email=None, user_context=user_context ) if isinstance( token_response, CreateEmailVerificationTokenEmailAlreadyVerifiedError ): return UserEmailVerifyPutAPIResponse() verify_response = await verify_email_using_token( tenant_id=tenant_id, token=token_response.token, user_context=user_context ) if isinstance(verify_response, VerifyEmailUsingTokenInvalidTokenError): # This should never happen because we consume the token immediately after creating it raise Exception("Should not come here") else: await unverify_email(user_id, user_context=user_context) return UserEmailVerifyPutAPIResponse()
async def handle_user_get(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any]) ‑> Union[UserGetAPINoUserFoundError, UserGetAPIOkResponse, UserGetAPIRecipeNotInitialisedError]
-
Expand source code
async def handle_user_get( _api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any], ) -> Union[ UserGetAPINoUserFoundError, UserGetAPIOkResponse, UserGetAPIRecipeNotInitialisedError, ]: user_id = api_options.request.get_query_param("userId") recipe_id = api_options.request.get_query_param("recipeId") if user_id is None: raise_bad_input_exception("Missing required parameter 'userId'") if recipe_id is None: raise_bad_input_exception("Missing required parameter 'recipeId'") if not is_valid_recipe_id(recipe_id): raise_bad_input_exception("Invalid recipe id") if not is_recipe_initialised(recipe_id): return UserGetAPIRecipeNotInitialisedError() user_response = await get_user_for_recipe_id(user_id, recipe_id) if user_response is None: return UserGetAPINoUserFoundError() user = user_response.user try: UserMetadataRecipe.get_instance() except Exception: user.first_name = "FEATURE_NOT_ENABLED" user.last_name = "FEATURE_NOT_ENABLED" return UserGetAPIOkResponse(recipe_id, user) user_metadata = await get_user_metadata(user_id, user_context=_user_context) first_name = user_metadata.metadata.get("first_name", "") last_name = user_metadata.metadata.get("last_name", "") user.first_name = first_name user.last_name = last_name return UserGetAPIOkResponse(recipe_id, user)
async def handle_user_password_put(_api_interface: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[UserPasswordPutAPIResponse, UserPasswordPutAPIInvalidPasswordErrorResponse]
-
Expand source code
async def handle_user_password_put( _api_interface: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[UserPasswordPutAPIResponse, UserPasswordPutAPIInvalidPasswordErrorResponse]: request_body: Dict[str, Any] = await api_options.request.json() # type: ignore user_id = request_body.get("userId") new_password = request_body.get("newPassword") if user_id is None or not isinstance(user_id, str): raise_bad_input_exception("Missing required parameter 'userId'") if new_password is None or not isinstance(new_password, str): raise_bad_input_exception("Missing required parameter 'newPassword'") recipe_to_use: Union[ Literal["emailpassword", "thirdpartyemailpassword"], None ] = None try: EmailPasswordRecipe.get_instance() recipe_to_use = "emailpassword" except Exception: pass if recipe_to_use is None: try: ThirdPartyEmailPasswordRecipe.get_instance() recipe_to_use = "thirdpartyemailpassword" except Exception: pass if recipe_to_use is None: raise Exception("Should not come here") async def reset_password( form_fields: List[NormalisedFormField], create_reset_password_token: Callable[ [str, str, Dict[str, Any]], Awaitable[ Union[CreateResetPasswordOkResult, CreateResetPasswordWrongUserIdError] ], ], reset_password_using_token: Callable[ [str, str, str, Dict[str, Any]], Awaitable[ Union[ ResetPasswordUsingTokenOkResult, ResetPasswordUsingTokenInvalidTokenError, ] ], ], ) -> Union[ UserPasswordPutAPIResponse, UserPasswordPutAPIInvalidPasswordErrorResponse ]: password_form_field = [ field for field in form_fields if field.id == FORM_FIELD_PASSWORD_ID ][0] password_validation_error = await password_form_field.validate( new_password, tenant_id ) if password_validation_error is not None: return UserPasswordPutAPIInvalidPasswordErrorResponse( password_validation_error ) password_reset_token = await create_reset_password_token( tenant_id, user_id, user_context ) if isinstance(password_reset_token, CreateResetPasswordWrongUserIdError): # Techincally it can but its an edge case so we assume that it wont # UNKNOWN_USER_ID_ERROR raise Exception("Should never come here") password_reset_response = await reset_password_using_token( tenant_id, password_reset_token.token, new_password, user_context ) if isinstance( password_reset_response, ResetPasswordUsingTokenInvalidTokenError ): # RESET_PASSWORD_INVALID_TOKEN_ERROR raise Exception("Should not come here") return UserPasswordPutAPIResponse() if recipe_to_use == "emailpassword": return await reset_password( EmailPasswordRecipe.get_instance().config.sign_up_feature.form_fields, ep_create_reset_password_token, ep_reset_password_using_token, ) if recipe_to_use == "thirdpartyemailpassword": return await reset_password( ThirdPartyEmailPasswordRecipe.get_instance().email_password_recipe.config.sign_up_feature.form_fields, tpep_create_reset_password_token, tpep_reset_password_using_token, )
async def handle_user_put(_api_interface: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[UserPutAPIOkResponse, UserPutAPIInvalidEmailErrorResponse, UserPutAPIEmailAlreadyExistsErrorResponse, UserPutAPIInvalidPhoneErrorResponse, UserPutPhoneAlreadyExistsAPIResponse]
-
Expand source code
async def handle_user_put( _api_interface: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[ UserPutAPIOkResponse, UserPutAPIInvalidEmailErrorResponse, UserPutAPIEmailAlreadyExistsErrorResponse, UserPutAPIInvalidPhoneErrorResponse, UserPutPhoneAlreadyExistsAPIResponse, ]: request_body: Dict[str, Any] = await api_options.request.json() # type: ignore user_id: Optional[str] = request_body.get("userId") recipe_id: Optional[str] = request_body.get("recipeId") first_name: Optional[str] = request_body.get("firstName") last_name: Optional[str] = request_body.get("lastName") email: Optional[str] = request_body.get("email") phone: Optional[str] = request_body.get("phone") if not isinstance(user_id, str): return raise_bad_input_exception( "Required parameter 'userId' is missing or has an invalid type" ) if not isinstance(recipe_id, str): return raise_bad_input_exception( "Required parameter 'recipeId' is missing or has an invalid type" ) if not is_valid_recipe_id(recipe_id): raise_bad_input_exception("Invalid recipe id") if first_name is None and not isinstance(first_name, str): raise_bad_input_exception( "Required parameter 'firstName' is missing or has an invalid type" ) if last_name is None and not isinstance(last_name, str): raise_bad_input_exception( "Required parameter 'lastName' is missing or has an invalid type" ) if email is None and not isinstance(email, str): raise_bad_input_exception( "Required parameter 'email' is missing or has an invalid type" ) if phone is None and not isinstance(phone, str): raise_bad_input_exception( "Required parameter 'phone' is missing or has an invalid type" ) user_response = await get_user_for_recipe_id(user_id, recipe_id) if user_response is None: raise Exception("Should never come here") first_name = first_name.strip() last_name = last_name.strip() email = email.strip() phone = phone.strip() if first_name != "" or last_name != "": is_recipe_initialized = False try: UserMetadataRecipe.get_instance() is_recipe_initialized = True except Exception: pass if is_recipe_initialized: metadata_update = {} if first_name != "": metadata_update["first_name"] = first_name if last_name != "": metadata_update["last_name"] = last_name await update_user_metadata(user_id, metadata_update, user_context) if email != "": email_update_response = await update_email_for_recipe_id( user_response.recipe, user_id, email, tenant_id, user_context ) if not isinstance(email_update_response, UserPutAPIOkResponse): return email_update_response if phone != "": phone_update_response = await update_phone_for_recipe_id( user_response.recipe, user_id, phone, tenant_id, user_context ) if not isinstance(phone_update_response, UserPutAPIOkResponse): return phone_update_response return UserPutAPIOkResponse()
async def handle_user_sessions_post(_api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any]) ‑> UserSessionsPostAPIResponse
-
Expand source code
async def handle_user_sessions_post( _api_interface: APIInterface, _tenant_id: str, api_options: APIOptions, _user_context: Dict[str, Any], ) -> UserSessionsPostAPIResponse: request_body = await api_options.request.json() # type: ignore session_handles: Optional[List[str]] = request_body.get("sessionHandles") # type: ignore if not isinstance(session_handles, list): raise_bad_input_exception( "Required parameter 'sessionHandles' is missing or has an invalid type" ) await revoke_multiple_sessions(session_handles, _user_context) return UserSessionsPostAPIResponse()
async def handle_users_count_get_api(_: APIInterface, tenant_id: str, _api_options: APIOptions, _user_context: Dict[str, Any]) ‑> UserCountGetAPIResponse
-
Expand source code
async def handle_users_count_get_api( _: APIInterface, tenant_id: str, _api_options: APIOptions, _user_context: Dict[str, Any], ) -> UserCountGetAPIResponse: count = await Supertokens.get_instance().get_user_count( None, tenant_id, ) return UserCountGetAPIResponse(count=count)
async def handle_users_get_api(api_implementation: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> APIResponse
-
Expand source code
async def handle_users_get_api( api_implementation: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> APIResponse: _ = api_implementation limit = api_options.request.get_query_param("limit") if limit is None: raise_bad_input_exception("Missing required parameter 'limit'") time_joined_order: Literal["ASC", "DESC"] = api_options.request.get_query_param( # type: ignore "timeJoinedOrder", "DESC" ) if time_joined_order not in ["ASC", "DESC"]: raise_bad_input_exception("Invalid value recieved for 'timeJoinedOrder'") pagination_token = api_options.request.get_query_param("paginationToken") users_response = await Supertokens.get_instance().get_users( tenant_id, time_joined_order=time_joined_order, limit=int(limit), pagination_token=pagination_token, include_recipe_ids=None, query=api_options.request.get_query_params(), user_context=user_context, ) # user metadata bulk fetch with batches: try: UserMetadataRecipe.get_instance() except GeneralError: return DashboardUsersGetResponse( users_response.users, users_response.next_pagination_token ) users_with_metadata: List[UserWithMetadata] = [ UserWithMetadata().from_user(user) for user in users_response.users ] metadata_fetch_awaitables: List[Awaitable[Any]] = [] async def get_user_metadata_and_update_user(user_idx: int) -> None: user = users_response.users[user_idx] user_metadata = await get_user_metadata(user.user_id, user_context) first_name = user_metadata.metadata.get("first_name") last_name = user_metadata.metadata.get("last_name") # None becomes null which is acceptable for the dashboard. users_with_metadata[user_idx].first_name = first_name users_with_metadata[user_idx].last_name = last_name # Batch calls to get user metadata: for i, _ in enumerate(users_response.users): metadata_fetch_awaitables.append(get_user_metadata_and_update_user(i)) promise_arr_start_position = 0 batch_size = 5 while promise_arr_start_position < len(metadata_fetch_awaitables): # We want to query only 5 in parallel at a time promises_to_call = [ metadata_fetch_awaitables[i] for i in range( promise_arr_start_position, min( promise_arr_start_position + batch_size, len(metadata_fetch_awaitables), ), ) ] await asyncio.gather(*promises_to_call) promise_arr_start_position += batch_size return DashboardUsersGetResponse( users_with_metadata, users_response.next_pagination_token, )
async def handle_validate_key_api(_api_implementation: APIInterface, api_options: APIOptions, user_context: Dict[str, Any])
-
Expand source code
async def handle_validate_key_api( _api_implementation: APIInterface, api_options: APIOptions, user_context: Dict[str, Any], ): is_valid_key = await validate_api_key( api_options.request, api_options.config, user_context ) if is_valid_key: return send_200_response({"status": "OK"}, api_options.response) return send_non_200_response_with_message("Unauthorised", 401, api_options.response)