Module supertokens_python.recipe.thirdparty.api.implementation
Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from base64 import b64decode
import json
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from urllib.parse import parse_qs, urlencode, urlparse
from supertokens_python.recipe.emailverification import EmailVerificationRecipe
from supertokens_python.recipe.emailverification.interfaces import (
CreateEmailVerificationTokenOkResult,
)
from supertokens_python.recipe.session.asyncio import create_new_session
from supertokens_python.recipe.thirdparty.interfaces import (
APIInterface,
AuthorisationUrlGetOkResult,
SignInUpPostNoEmailGivenByProviderResponse,
SignInUpPostOkResult,
)
from supertokens_python.recipe.thirdparty.provider import RedirectUriInfo
from supertokens_python.recipe.thirdparty.types import UserInfoEmail
if TYPE_CHECKING:
from supertokens_python.recipe.thirdparty.interfaces import APIOptions
from supertokens_python.recipe.thirdparty.provider import Provider
from supertokens_python.types import GeneralErrorResponse
class APIImplementation(APIInterface):
async def authorisation_url_get(
self,
provider: Provider,
redirect_uri_on_provider_dashboard: str,
api_options: APIOptions,
user_context: Dict[str, Any],
) -> Union[AuthorisationUrlGetOkResult, GeneralErrorResponse]:
authorisation_url_info = await provider.get_authorisation_redirect_url(
redirect_uri_on_provider_dashboard=redirect_uri_on_provider_dashboard,
user_context=user_context,
)
return AuthorisationUrlGetOkResult(
url_with_query_params=authorisation_url_info.url_with_query_params,
pkce_code_verifier=authorisation_url_info.pkce_code_verifier,
)
async def sign_in_up_post(
self,
provider: Provider,
redirect_uri_info: Optional[RedirectUriInfo],
oauth_tokens: Optional[Dict[str, Any]],
tenant_id: str,
api_options: APIOptions,
user_context: Dict[str, Any],
) -> Union[
SignInUpPostOkResult,
SignInUpPostNoEmailGivenByProviderResponse,
GeneralErrorResponse,
]:
oauth_tokens_to_use: Dict[str, Any] = {}
if redirect_uri_info is not None:
oauth_tokens_to_use = await provider.exchange_auth_code_for_oauth_tokens(
redirect_uri_info=redirect_uri_info,
user_context=user_context,
)
else:
oauth_tokens_to_use = oauth_tokens # type: ignore
user_info = await provider.get_user_info(
oauth_tokens=oauth_tokens_to_use,
user_context=user_context,
)
if user_info.email is None and not provider.config.require_email:
if provider.config.generate_fake_email is not None:
user_info.email = UserInfoEmail(
email=await provider.config.generate_fake_email(
user_info.third_party_user_id, tenant_id, user_context
),
is_verified=True,
)
email = user_info.email.id if user_info.email is not None else None
email_verified = (
user_info.email.is_verified if user_info.email is not None else None
)
if email is None:
return SignInUpPostNoEmailGivenByProviderResponse()
signinup_response = await api_options.recipe_implementation.sign_in_up(
third_party_id=provider.id,
third_party_user_id=user_info.third_party_user_id,
email=email,
oauth_tokens=oauth_tokens_to_use,
raw_user_info_from_provider=user_info.raw_user_info_from_provider,
tenant_id=tenant_id,
user_context=user_context,
)
if email_verified:
ev_instance = EmailVerificationRecipe.get_instance_optional()
if ev_instance is not None:
token_response = await ev_instance.recipe_implementation.create_email_verification_token(
tenant_id=tenant_id,
user_id=signinup_response.user.user_id,
email=signinup_response.user.email,
user_context=user_context,
)
if isinstance(token_response, CreateEmailVerificationTokenOkResult):
await ev_instance.recipe_implementation.verify_email_using_token(
token=token_response.token,
tenant_id=tenant_id,
user_context=user_context,
)
user = signinup_response.user
session = await create_new_session(
request=api_options.request,
tenant_id=tenant_id,
user_id=user.user_id,
user_context=user_context,
)
return SignInUpPostOkResult(
created_new_user=signinup_response.created_new_user,
user=user,
session=session,
oauth_tokens=oauth_tokens_to_use,
raw_user_info_from_provider=user_info.raw_user_info_from_provider,
)
async def apple_redirect_handler_post(
self,
form_post_info: Dict[str, Any],
api_options: APIOptions,
user_context: Dict[str, Any],
):
state_in_b64: str = form_post_info["state"]
state = b64decode(state_in_b64).decode("utf-8")
state_obj = json.loads(state)
redirect_uri: str = state_obj["frontendRedirectURI"]
url_obj = urlparse(redirect_uri)
qparams = parse_qs(url_obj.query)
for k, v in form_post_info.items():
qparams[k] = [v]
redirect_uri = url_obj._replace(query=urlencode(qparams, doseq=True)).geturl()
api_options.response.set_header("Location", redirect_uri)
api_options.response.set_status_code(303)
api_options.response.set_html_content("")
Classes
class APIImplementation
-
Expand source code
class APIImplementation(APIInterface): async def authorisation_url_get( self, provider: Provider, redirect_uri_on_provider_dashboard: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[AuthorisationUrlGetOkResult, GeneralErrorResponse]: authorisation_url_info = await provider.get_authorisation_redirect_url( redirect_uri_on_provider_dashboard=redirect_uri_on_provider_dashboard, user_context=user_context, ) return AuthorisationUrlGetOkResult( url_with_query_params=authorisation_url_info.url_with_query_params, pkce_code_verifier=authorisation_url_info.pkce_code_verifier, ) async def sign_in_up_post( self, provider: Provider, redirect_uri_info: Optional[RedirectUriInfo], oauth_tokens: Optional[Dict[str, Any]], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[ SignInUpPostOkResult, SignInUpPostNoEmailGivenByProviderResponse, GeneralErrorResponse, ]: oauth_tokens_to_use: Dict[str, Any] = {} if redirect_uri_info is not None: oauth_tokens_to_use = await provider.exchange_auth_code_for_oauth_tokens( redirect_uri_info=redirect_uri_info, user_context=user_context, ) else: oauth_tokens_to_use = oauth_tokens # type: ignore user_info = await provider.get_user_info( oauth_tokens=oauth_tokens_to_use, user_context=user_context, ) if user_info.email is None and not provider.config.require_email: if provider.config.generate_fake_email is not None: user_info.email = UserInfoEmail( email=await provider.config.generate_fake_email( user_info.third_party_user_id, tenant_id, user_context ), is_verified=True, ) email = user_info.email.id if user_info.email is not None else None email_verified = ( user_info.email.is_verified if user_info.email is not None else None ) if email is None: return SignInUpPostNoEmailGivenByProviderResponse() signinup_response = await api_options.recipe_implementation.sign_in_up( third_party_id=provider.id, third_party_user_id=user_info.third_party_user_id, email=email, oauth_tokens=oauth_tokens_to_use, raw_user_info_from_provider=user_info.raw_user_info_from_provider, tenant_id=tenant_id, user_context=user_context, ) if email_verified: ev_instance = EmailVerificationRecipe.get_instance_optional() if ev_instance is not None: token_response = await ev_instance.recipe_implementation.create_email_verification_token( tenant_id=tenant_id, user_id=signinup_response.user.user_id, email=signinup_response.user.email, user_context=user_context, ) if isinstance(token_response, CreateEmailVerificationTokenOkResult): await ev_instance.recipe_implementation.verify_email_using_token( token=token_response.token, tenant_id=tenant_id, user_context=user_context, ) user = signinup_response.user session = await create_new_session( request=api_options.request, tenant_id=tenant_id, user_id=user.user_id, user_context=user_context, ) return SignInUpPostOkResult( created_new_user=signinup_response.created_new_user, user=user, session=session, oauth_tokens=oauth_tokens_to_use, raw_user_info_from_provider=user_info.raw_user_info_from_provider, ) async def apple_redirect_handler_post( self, form_post_info: Dict[str, Any], api_options: APIOptions, user_context: Dict[str, Any], ): state_in_b64: str = form_post_info["state"] state = b64decode(state_in_b64).decode("utf-8") state_obj = json.loads(state) redirect_uri: str = state_obj["frontendRedirectURI"] url_obj = urlparse(redirect_uri) qparams = parse_qs(url_obj.query) for k, v in form_post_info.items(): qparams[k] = [v] redirect_uri = url_obj._replace(query=urlencode(qparams, doseq=True)).geturl() api_options.response.set_header("Location", redirect_uri) api_options.response.set_status_code(303) api_options.response.set_html_content("")
Ancestors
Methods
async def apple_redirect_handler_post(self, form_post_info: Dict[str, Any], api_options: APIOptions, user_context: Dict[str, Any])
-
Expand source code
async def apple_redirect_handler_post( self, form_post_info: Dict[str, Any], api_options: APIOptions, user_context: Dict[str, Any], ): state_in_b64: str = form_post_info["state"] state = b64decode(state_in_b64).decode("utf-8") state_obj = json.loads(state) redirect_uri: str = state_obj["frontendRedirectURI"] url_obj = urlparse(redirect_uri) qparams = parse_qs(url_obj.query) for k, v in form_post_info.items(): qparams[k] = [v] redirect_uri = url_obj._replace(query=urlencode(qparams, doseq=True)).geturl() api_options.response.set_header("Location", redirect_uri) api_options.response.set_status_code(303) api_options.response.set_html_content("")
-
Expand source code
async def authorisation_url_get( self, provider: Provider, redirect_uri_on_provider_dashboard: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[AuthorisationUrlGetOkResult, GeneralErrorResponse]: authorisation_url_info = await provider.get_authorisation_redirect_url( redirect_uri_on_provider_dashboard=redirect_uri_on_provider_dashboard, user_context=user_context, ) return AuthorisationUrlGetOkResult( url_with_query_params=authorisation_url_info.url_with_query_params, pkce_code_verifier=authorisation_url_info.pkce_code_verifier, )
async def sign_in_up_post(self, provider: Provider, redirect_uri_info: Optional[RedirectUriInfo], oauth_tokens: Optional[Dict[str, Any]], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[SignInUpPostOkResult, SignInUpPostNoEmailGivenByProviderResponse, GeneralErrorResponse]
-
Expand source code
async def sign_in_up_post( self, provider: Provider, redirect_uri_info: Optional[RedirectUriInfo], oauth_tokens: Optional[Dict[str, Any]], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[ SignInUpPostOkResult, SignInUpPostNoEmailGivenByProviderResponse, GeneralErrorResponse, ]: oauth_tokens_to_use: Dict[str, Any] = {} if redirect_uri_info is not None: oauth_tokens_to_use = await provider.exchange_auth_code_for_oauth_tokens( redirect_uri_info=redirect_uri_info, user_context=user_context, ) else: oauth_tokens_to_use = oauth_tokens # type: ignore user_info = await provider.get_user_info( oauth_tokens=oauth_tokens_to_use, user_context=user_context, ) if user_info.email is None and not provider.config.require_email: if provider.config.generate_fake_email is not None: user_info.email = UserInfoEmail( email=await provider.config.generate_fake_email( user_info.third_party_user_id, tenant_id, user_context ), is_verified=True, ) email = user_info.email.id if user_info.email is not None else None email_verified = ( user_info.email.is_verified if user_info.email is not None else None ) if email is None: return SignInUpPostNoEmailGivenByProviderResponse() signinup_response = await api_options.recipe_implementation.sign_in_up( third_party_id=provider.id, third_party_user_id=user_info.third_party_user_id, email=email, oauth_tokens=oauth_tokens_to_use, raw_user_info_from_provider=user_info.raw_user_info_from_provider, tenant_id=tenant_id, user_context=user_context, ) if email_verified: ev_instance = EmailVerificationRecipe.get_instance_optional() if ev_instance is not None: token_response = await ev_instance.recipe_implementation.create_email_verification_token( tenant_id=tenant_id, user_id=signinup_response.user.user_id, email=signinup_response.user.email, user_context=user_context, ) if isinstance(token_response, CreateEmailVerificationTokenOkResult): await ev_instance.recipe_implementation.verify_email_using_token( token=token_response.token, tenant_id=tenant_id, user_context=user_context, ) user = signinup_response.user session = await create_new_session( request=api_options.request, tenant_id=tenant_id, user_id=user.user_id, user_context=user_context, ) return SignInUpPostOkResult( created_new_user=signinup_response.created_new_user, user=user, session=session, oauth_tokens=oauth_tokens_to_use, raw_user_info_from_provider=user_info.raw_user_info_from_provider, )