Module supertokens_python.recipe.thirdparty.api.implementation
Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from base64 import b64decode
import json
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from urllib.parse import parse_qs, urlencode, urlparse
from supertokens_python.recipe.accountlinking.types import AccountInfoWithRecipeId
from supertokens_python.recipe.emailverification import EmailVerificationRecipe
from supertokens_python.recipe.emailverification.asyncio import is_email_verified
from supertokens_python.recipe.session import SessionContainer
from supertokens_python.recipe.thirdparty.interfaces import (
APIInterface,
AuthorisationUrlGetOkResult,
SignInUpNotAllowed,
SignInUpOkResult,
SignInUpPostNoEmailGivenByProviderResponse,
SignInUpPostOkResult,
)
from supertokens_python.recipe.thirdparty.provider import RedirectUriInfo
from supertokens_python.recipe.thirdparty.types import ThirdPartyInfo, UserInfoEmail
if TYPE_CHECKING:
from supertokens_python.recipe.thirdparty.interfaces import APIOptions
from supertokens_python.recipe.thirdparty.provider import Provider
from supertokens_python.types import GeneralErrorResponse
class APIImplementation(APIInterface):
async def authorisation_url_get(
self,
provider: Provider,
redirect_uri_on_provider_dashboard: str,
api_options: APIOptions,
user_context: Dict[str, Any],
) -> Union[AuthorisationUrlGetOkResult, GeneralErrorResponse]:
authorisation_url_info = await provider.get_authorisation_redirect_url(
redirect_uri_on_provider_dashboard=redirect_uri_on_provider_dashboard,
user_context=user_context,
)
return AuthorisationUrlGetOkResult(
url_with_query_params=authorisation_url_info.url_with_query_params,
pkce_code_verifier=authorisation_url_info.pkce_code_verifier,
)
async def sign_in_up_post(
self,
provider: Provider,
redirect_uri_info: Optional[RedirectUriInfo],
oauth_tokens: Optional[Dict[str, Any]],
session: Optional[SessionContainer],
should_try_linking_with_session_user: Union[bool, None],
tenant_id: str,
api_options: APIOptions,
user_context: Dict[str, Any],
) -> Union[
SignInUpPostOkResult,
SignInUpPostNoEmailGivenByProviderResponse,
SignInUpNotAllowed,
GeneralErrorResponse,
]:
from supertokens_python.auth_utils import (
OkResponse,
PostAuthChecksOkResponse,
SignInNotAllowedResponse,
SignUpNotAllowedResponse,
get_authenticating_user_and_add_to_current_tenant_if_required,
post_auth_checks,
pre_auth_checks,
)
error_code_map = {
"SIGN_UP_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_006)",
"SIGN_IN_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_004)",
"LINKING_TO_SESSION_USER_FAILED": {
"EMAIL_VERIFICATION_REQUIRED": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_020)",
"RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_021)",
"ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_022)",
"SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_023)",
},
}
oauth_tokens_to_use: Dict[str, Any] = {}
if redirect_uri_info is not None:
oauth_tokens_to_use = await provider.exchange_auth_code_for_oauth_tokens(
redirect_uri_info=redirect_uri_info,
user_context=user_context,
)
elif oauth_tokens is not None:
oauth_tokens_to_use = oauth_tokens
else:
raise Exception("should never come here")
user_info = await provider.get_user_info(
oauth_tokens=oauth_tokens_to_use,
user_context=user_context,
)
if user_info.email is None and provider.config.require_email is False:
# We don't expect to get an email from this provider.
# So we generate a fake one
assert provider.config.generate_fake_email is not None
user_info.email = UserInfoEmail(
email=await provider.config.generate_fake_email(
tenant_id, user_info.third_party_user_id, user_context
),
is_verified=True,
)
email_info = user_info.email
if email_info is None:
return SignInUpPostNoEmailGivenByProviderResponse()
recipe_id = "thirdparty"
async def check_credentials_on_tenant(_: str):
# We essentially did this above when calling exchange_auth_code_for_oauth_tokens
return True
authenticating_user = (
await get_authenticating_user_and_add_to_current_tenant_if_required(
third_party=ThirdPartyInfo(
third_party_user_id=user_info.third_party_user_id,
third_party_id=provider.id,
),
email=None,
phone_number=None,
recipe_id=recipe_id,
user_context=user_context,
session=session,
tenant_id=tenant_id,
check_credentials_on_tenant=check_credentials_on_tenant,
)
)
is_sign_up = authenticating_user is None
if authenticating_user is not None:
# This is a sign in. So before we proceed, we need to check if an email change
# is allowed since the email could have changed from the social provider's side.
# We do this check here and not in the recipe function cause we want to keep the
# recipe function checks to a minimum so that the dev has complete control of
# what they can do.
# The is_email_change_allowed and pre_auth_checks functions take an is_verified boolean.
# Now, even though we already have that from the input, that's just what the provider says.
# If the provider says that the email is NOT verified, it could have been that the email
# is verified on the user's account via supertokens on a previous sign in / up.
# So we just check that as well before calling is_email_change_allowed
assert authenticating_user.login_method is not None
recipe_user_id = authenticating_user.login_method.recipe_user_id
if (
not email_info.is_verified
and EmailVerificationRecipe.get_instance_optional() is not None
):
email_info.is_verified = await is_email_verified(
recipe_user_id,
email_info.id,
user_context,
)
pre_auth_checks_result = await pre_auth_checks(
authenticating_account_info=AccountInfoWithRecipeId(
recipe_id=recipe_id,
email=email_info.id,
third_party=ThirdPartyInfo(
third_party_user_id=user_info.third_party_user_id,
third_party_id=provider.id,
),
),
authenticating_user=(
authenticating_user.user if authenticating_user else None
),
factor_ids=["thirdparty"],
is_sign_up=is_sign_up,
is_verified=email_info.is_verified,
sign_in_verifies_login_method=email_info.is_verified,
skip_session_user_update_in_core=False,
tenant_id=tenant_id,
user_context=user_context,
session=session,
should_try_linking_with_session_user=should_try_linking_with_session_user,
)
if not isinstance(pre_auth_checks_result, OkResponse):
if isinstance(pre_auth_checks_result, SignUpNotAllowedResponse):
reason = error_code_map["SIGN_UP_NOT_ALLOWED"]
assert isinstance(reason, str)
return SignInUpNotAllowed(reason)
if isinstance(pre_auth_checks_result, SignInNotAllowedResponse):
reason = error_code_map["SIGN_IN_NOT_ALLOWED"]
assert isinstance(reason, str)
return SignInUpNotAllowed(reason)
reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
assert isinstance(reason_dict, Dict)
reason = reason_dict[pre_auth_checks_result.reason]
return SignInUpNotAllowed(reason=reason)
signinup_response = await api_options.recipe_implementation.sign_in_up(
third_party_id=provider.id,
third_party_user_id=user_info.third_party_user_id,
email=email_info.id,
is_verified=email_info.is_verified,
oauth_tokens=oauth_tokens_to_use,
raw_user_info_from_provider=user_info.raw_user_info_from_provider,
session=session,
tenant_id=tenant_id,
user_context=user_context,
should_try_linking_with_session_user=should_try_linking_with_session_user,
)
if isinstance(signinup_response, SignInUpNotAllowed):
return signinup_response
if not isinstance(signinup_response, SignInUpOkResult):
reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"]
assert isinstance(reason_dict, Dict)
reason = reason_dict[signinup_response.reason]
return SignInUpNotAllowed(reason=reason)
post_auth_checks_result = await post_auth_checks(
factor_id="thirdparty",
is_sign_up=is_sign_up,
authenticated_user=signinup_response.user,
recipe_user_id=signinup_response.recipe_user_id,
request=api_options.request,
tenant_id=tenant_id,
user_context=user_context,
session=session,
)
if not isinstance(post_auth_checks_result, PostAuthChecksOkResponse):
reason = error_code_map["SIGN_IN_NOT_ALLOWED"]
assert isinstance(reason, str)
return SignInUpNotAllowed(reason)
return SignInUpPostOkResult(
created_new_recipe_user=signinup_response.created_new_recipe_user,
user=post_auth_checks_result.user,
session=post_auth_checks_result.session,
oauth_tokens=oauth_tokens_to_use,
raw_user_info_from_provider=user_info.raw_user_info_from_provider,
)
async def apple_redirect_handler_post(
self,
form_post_info: Dict[str, Any],
api_options: APIOptions,
user_context: Dict[str, Any],
):
state_in_b64: str = form_post_info["state"]
state = b64decode(state_in_b64).decode("utf-8")
state_obj = json.loads(state)
redirect_uri: str = state_obj["frontendRedirectURI"]
url_obj = urlparse(redirect_uri)
qparams = parse_qs(url_obj.query)
for k, v in form_post_info.items():
qparams[k] = [v]
redirect_uri = url_obj._replace(query=urlencode(qparams, doseq=True)).geturl()
api_options.response.set_header("Location", redirect_uri)
api_options.response.set_status_code(303)
api_options.response.set_html_content("")
Classes
class APIImplementation
-
Expand source code
class APIImplementation(APIInterface): async def authorisation_url_get( self, provider: Provider, redirect_uri_on_provider_dashboard: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[AuthorisationUrlGetOkResult, GeneralErrorResponse]: authorisation_url_info = await provider.get_authorisation_redirect_url( redirect_uri_on_provider_dashboard=redirect_uri_on_provider_dashboard, user_context=user_context, ) return AuthorisationUrlGetOkResult( url_with_query_params=authorisation_url_info.url_with_query_params, pkce_code_verifier=authorisation_url_info.pkce_code_verifier, ) async def sign_in_up_post( self, provider: Provider, redirect_uri_info: Optional[RedirectUriInfo], oauth_tokens: Optional[Dict[str, Any]], session: Optional[SessionContainer], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[ SignInUpPostOkResult, SignInUpPostNoEmailGivenByProviderResponse, SignInUpNotAllowed, GeneralErrorResponse, ]: from supertokens_python.auth_utils import ( OkResponse, PostAuthChecksOkResponse, SignInNotAllowedResponse, SignUpNotAllowedResponse, get_authenticating_user_and_add_to_current_tenant_if_required, post_auth_checks, pre_auth_checks, ) error_code_map = { "SIGN_UP_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_006)", "SIGN_IN_NOT_ALLOWED": "Cannot sign in / up due to security reasons. Please try a different login method or contact support. (ERR_CODE_004)", "LINKING_TO_SESSION_USER_FAILED": { "EMAIL_VERIFICATION_REQUIRED": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_020)", "RECIPE_USER_ID_ALREADY_LINKED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_021)", "ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_022)", "SESSION_USER_ACCOUNT_INFO_ALREADY_ASSOCIATED_WITH_ANOTHER_PRIMARY_USER_ID_ERROR": "Cannot sign in / up due to security reasons. Please contact support. (ERR_CODE_023)", }, } oauth_tokens_to_use: Dict[str, Any] = {} if redirect_uri_info is not None: oauth_tokens_to_use = await provider.exchange_auth_code_for_oauth_tokens( redirect_uri_info=redirect_uri_info, user_context=user_context, ) elif oauth_tokens is not None: oauth_tokens_to_use = oauth_tokens else: raise Exception("should never come here") user_info = await provider.get_user_info( oauth_tokens=oauth_tokens_to_use, user_context=user_context, ) if user_info.email is None and provider.config.require_email is False: # We don't expect to get an email from this provider. # So we generate a fake one assert provider.config.generate_fake_email is not None user_info.email = UserInfoEmail( email=await provider.config.generate_fake_email( tenant_id, user_info.third_party_user_id, user_context ), is_verified=True, ) email_info = user_info.email if email_info is None: return SignInUpPostNoEmailGivenByProviderResponse() recipe_id = "thirdparty" async def check_credentials_on_tenant(_: str): # We essentially did this above when calling exchange_auth_code_for_oauth_tokens return True authenticating_user = ( await get_authenticating_user_and_add_to_current_tenant_if_required( third_party=ThirdPartyInfo( third_party_user_id=user_info.third_party_user_id, third_party_id=provider.id, ), email=None, phone_number=None, recipe_id=recipe_id, user_context=user_context, session=session, tenant_id=tenant_id, check_credentials_on_tenant=check_credentials_on_tenant, ) ) is_sign_up = authenticating_user is None if authenticating_user is not None: # This is a sign in. So before we proceed, we need to check if an email change # is allowed since the email could have changed from the social provider's side. # We do this check here and not in the recipe function cause we want to keep the # recipe function checks to a minimum so that the dev has complete control of # what they can do. # The is_email_change_allowed and pre_auth_checks functions take an is_verified boolean. # Now, even though we already have that from the input, that's just what the provider says. # If the provider says that the email is NOT verified, it could have been that the email # is verified on the user's account via supertokens on a previous sign in / up. # So we just check that as well before calling is_email_change_allowed assert authenticating_user.login_method is not None recipe_user_id = authenticating_user.login_method.recipe_user_id if ( not email_info.is_verified and EmailVerificationRecipe.get_instance_optional() is not None ): email_info.is_verified = await is_email_verified( recipe_user_id, email_info.id, user_context, ) pre_auth_checks_result = await pre_auth_checks( authenticating_account_info=AccountInfoWithRecipeId( recipe_id=recipe_id, email=email_info.id, third_party=ThirdPartyInfo( third_party_user_id=user_info.third_party_user_id, third_party_id=provider.id, ), ), authenticating_user=( authenticating_user.user if authenticating_user else None ), factor_ids=["thirdparty"], is_sign_up=is_sign_up, is_verified=email_info.is_verified, sign_in_verifies_login_method=email_info.is_verified, skip_session_user_update_in_core=False, tenant_id=tenant_id, user_context=user_context, session=session, should_try_linking_with_session_user=should_try_linking_with_session_user, ) if not isinstance(pre_auth_checks_result, OkResponse): if isinstance(pre_auth_checks_result, SignUpNotAllowedResponse): reason = error_code_map["SIGN_UP_NOT_ALLOWED"] assert isinstance(reason, str) return SignInUpNotAllowed(reason) if isinstance(pre_auth_checks_result, SignInNotAllowedResponse): reason = error_code_map["SIGN_IN_NOT_ALLOWED"] assert isinstance(reason, str) return SignInUpNotAllowed(reason) reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"] assert isinstance(reason_dict, Dict) reason = reason_dict[pre_auth_checks_result.reason] return SignInUpNotAllowed(reason=reason) signinup_response = await api_options.recipe_implementation.sign_in_up( third_party_id=provider.id, third_party_user_id=user_info.third_party_user_id, email=email_info.id, is_verified=email_info.is_verified, oauth_tokens=oauth_tokens_to_use, raw_user_info_from_provider=user_info.raw_user_info_from_provider, session=session, tenant_id=tenant_id, user_context=user_context, should_try_linking_with_session_user=should_try_linking_with_session_user, ) if isinstance(signinup_response, SignInUpNotAllowed): return signinup_response if not isinstance(signinup_response, SignInUpOkResult): reason_dict = error_code_map["LINKING_TO_SESSION_USER_FAILED"] assert isinstance(reason_dict, Dict) reason = reason_dict[signinup_response.reason] return SignInUpNotAllowed(reason=reason) post_auth_checks_result = await post_auth_checks( factor_id="thirdparty", is_sign_up=is_sign_up, authenticated_user=signinup_response.user, recipe_user_id=signinup_response.recipe_user_id, request=api_options.request, tenant_id=tenant_id, user_context=user_context, session=session, ) if not isinstance(post_auth_checks_result, PostAuthChecksOkResponse): reason = error_code_map["SIGN_IN_NOT_ALLOWED"] assert isinstance(reason, str) return SignInUpNotAllowed(reason) return SignInUpPostOkResult( created_new_recipe_user=signinup_response.created_new_recipe_user, user=post_auth_checks_result.user, session=post_auth_checks_result.session, oauth_tokens=oauth_tokens_to_use, raw_user_info_from_provider=user_info.raw_user_info_from_provider, ) async def apple_redirect_handler_post( self, form_post_info: Dict[str, Any], api_options: APIOptions, user_context: Dict[str, Any], ): state_in_b64: str = form_post_info["state"] state = b64decode(state_in_b64).decode("utf-8") state_obj = json.loads(state) redirect_uri: str = state_obj["frontendRedirectURI"] url_obj = urlparse(redirect_uri) qparams = parse_qs(url_obj.query) for k, v in form_post_info.items(): qparams[k] = [v] redirect_uri = url_obj._replace(query=urlencode(qparams, doseq=True)).geturl() api_options.response.set_header("Location", redirect_uri) api_options.response.set_status_code(303) api_options.response.set_html_content("")
Ancestors
Methods
async def apple_redirect_handler_post(self, form_post_info: Dict[str, Any], api_options: APIOptions, user_context: Dict[str, Any])
async def sign_in_up_post(self, provider: Provider, redirect_uri_info: Optional[RedirectUriInfo], oauth_tokens: Optional[Dict[str, Any]], session: Optional[SessionContainer], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any])