Module supertokens_python.recipe.thirdparty.api.implementation

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Union
from urllib.parse import urlencode

from httpx import AsyncClient
from supertokens_python.exceptions import raise_general_exception
from supertokens_python.recipe.session.asyncio import create_new_session
from supertokens_python.recipe.thirdparty.interfaces import APIInterface, SignInUpPostOkResponse, \
    AuthorisationUrlGetOkResponse, SignInUpPostNoEmailGivenByProviderResponse, SignInUpPostFieldErrorResponse

if TYPE_CHECKING:
    from supertokens_python.recipe.thirdparty.interfaces import APIOptions, SignInUpPostResponse, \
        AuthorisationUrlGetResponse
    from supertokens_python.recipe.thirdparty.provider import Provider

DEV_OAUTH_CLIENT_IDS = [
    '1060725074195-kmeum4crr01uirfl2op9kd5acmi9jutn.apps.googleusercontent.com',  # google client id
    '467101b197249757c71f'  # github client id
]
DEV_KEY_IDENTIFIER = "4398792-"
DEV_OAUTH_AUTHORIZATION_URL = 'https://supertokens.io/dev/oauth/redirect-to-provider'
DEV_OAUTH_REDIRECT_URL = 'https://supertokens.io/dev/oauth/redirect-to-app'


def is_using_oauth_development_client_id(client_id: str):
    return client_id.startswith(DEV_KEY_IDENTIFIER) or client_id in DEV_OAUTH_CLIENT_IDS


def get_actual_client_id_from_development_client_id(client_id: str):
    if client_id.startswith(DEV_KEY_IDENTIFIER):
        return client_id.split(DEV_KEY_IDENTIFIER, 1)[1]
    return client_id


class APIImplementation(APIInterface):
    def __init__(self):
        super().__init__()

    async def authorisation_url_get(self, provider: Provider, api_options: APIOptions) -> AuthorisationUrlGetResponse:
        authorisation_url_info = provider.get_authorisation_redirect_api_info()

        params = {}
        for key, value in authorisation_url_info.params.items():
            params[key] = value if not callable(
                value) else value(api_options.request)

        if provider.get_redirect_uri() is not None and not is_using_oauth_development_client_id(provider.client_id):
            # the backend wants to set the redirectURI - so we set that here.
            # we add the not development keys because the oauth provider will
            # redirect to supertokens.io's URL which will redirect the app
            # to the the user's website, which will handle the callback as usual.
            # If we add this, then instead, the supertokens' site will redirect
            # the user to this API layer, which is not needed.
            params['redirect_uri'] = provider.get_redirect_uri()

        auth_url = authorisation_url_info.url
        if is_using_oauth_development_client_id(provider.client_id):
            params['actual_redirect_uri'] = authorisation_url_info.url

            for k, v in params.items():
                if params[k] == provider.client_id:
                    params[k] = get_actual_client_id_from_development_client_id(provider.client_id)
            auth_url = DEV_OAUTH_AUTHORIZATION_URL

        query_string = urlencode(params)

        url = auth_url + '?' + query_string
        return AuthorisationUrlGetOkResponse(url)

    async def sign_in_up_post(self, provider: Provider, code: str, redirect_uri: str, client_id: Union[str, None],
                              auth_code_response: Union[str, None], api_options: APIOptions) -> SignInUpPostResponse:
        if is_using_oauth_development_client_id(provider.client_id):
            redirect_uri = DEV_OAUTH_REDIRECT_URL
        elif provider.get_redirect_uri() is not None:
            # we overwrite the redirectURI provided by the frontend
            # since the backend wants to take charge of setting this.
            redirect_uri = provider.get_redirect_uri()
        try:
            if auth_code_response is None:
                access_token_api_info = provider.get_access_token_api_info(
                    redirect_uri, code)
                if is_using_oauth_development_client_id(provider.client_id):
                    for k, v in access_token_api_info.params.items():
                        if access_token_api_info.params[k] == provider.client_id:
                            access_token_api_info.params[k] = get_actual_client_id_from_development_client_id(
                                provider.client_id)
                headers = {
                    'Accept': 'application/json',
                    'Content-Type': 'application/x-www-form-urlencoded'
                }
                async with AsyncClient() as client:
                    access_token_response = await client.post(access_token_api_info.url,
                                                              data=access_token_api_info.params,
                                                              headers=headers)
                    access_token_response = access_token_response.json()
            else:
                access_token_response = auth_code_response
        except Exception as e:
            raise_general_exception(e)

        try:
            user_info = await provider.get_profile_info(access_token_response)
        except Exception as e:
            return SignInUpPostFieldErrorResponse(str(e))
        email = user_info.email.id if user_info.email is not None else None
        email_verified = user_info.email.is_verified if user_info.email is not None else None
        if email is None or email_verified is None:
            return SignInUpPostNoEmailGivenByProviderResponse()

        signinup_response = await api_options.recipe_implementation.sign_in_up(provider.id, user_info.user_id, email,
                                                                               email_verified)
        user = signinup_response.user
        await create_new_session(api_options.request, user.user_id)

        return SignInUpPostOkResponse(
            user, signinup_response.created_new_user, access_token_response)

    async def apple_redirect_handler_post(self, code: str, state: str, api_options: APIOptions):
        app_info = api_options.app_info
        redirect_uri = app_info.website_domain.get_as_string_dangerous() + app_info.website_base_path.get_as_string_dangerous() + '/callback/apple?state=' + state + '&code=' + code
        html_content = '<html><head><script>window.location.replace("' + redirect_uri + '");</script></head></html>'
        api_options.response.set_html_content(html_content)

Functions

def get_actual_client_id_from_development_client_id(client_id: str)
Expand source code
def get_actual_client_id_from_development_client_id(client_id: str):
    if client_id.startswith(DEV_KEY_IDENTIFIER):
        return client_id.split(DEV_KEY_IDENTIFIER, 1)[1]
    return client_id
def is_using_oauth_development_client_id(client_id: str)
Expand source code
def is_using_oauth_development_client_id(client_id: str):
    return client_id.startswith(DEV_KEY_IDENTIFIER) or client_id in DEV_OAUTH_CLIENT_IDS

Classes

class APIImplementation
Expand source code
class APIImplementation(APIInterface):
    def __init__(self):
        super().__init__()

    async def authorisation_url_get(self, provider: Provider, api_options: APIOptions) -> AuthorisationUrlGetResponse:
        authorisation_url_info = provider.get_authorisation_redirect_api_info()

        params = {}
        for key, value in authorisation_url_info.params.items():
            params[key] = value if not callable(
                value) else value(api_options.request)

        if provider.get_redirect_uri() is not None and not is_using_oauth_development_client_id(provider.client_id):
            # the backend wants to set the redirectURI - so we set that here.
            # we add the not development keys because the oauth provider will
            # redirect to supertokens.io's URL which will redirect the app
            # to the the user's website, which will handle the callback as usual.
            # If we add this, then instead, the supertokens' site will redirect
            # the user to this API layer, which is not needed.
            params['redirect_uri'] = provider.get_redirect_uri()

        auth_url = authorisation_url_info.url
        if is_using_oauth_development_client_id(provider.client_id):
            params['actual_redirect_uri'] = authorisation_url_info.url

            for k, v in params.items():
                if params[k] == provider.client_id:
                    params[k] = get_actual_client_id_from_development_client_id(provider.client_id)
            auth_url = DEV_OAUTH_AUTHORIZATION_URL

        query_string = urlencode(params)

        url = auth_url + '?' + query_string
        return AuthorisationUrlGetOkResponse(url)

    async def sign_in_up_post(self, provider: Provider, code: str, redirect_uri: str, client_id: Union[str, None],
                              auth_code_response: Union[str, None], api_options: APIOptions) -> SignInUpPostResponse:
        if is_using_oauth_development_client_id(provider.client_id):
            redirect_uri = DEV_OAUTH_REDIRECT_URL
        elif provider.get_redirect_uri() is not None:
            # we overwrite the redirectURI provided by the frontend
            # since the backend wants to take charge of setting this.
            redirect_uri = provider.get_redirect_uri()
        try:
            if auth_code_response is None:
                access_token_api_info = provider.get_access_token_api_info(
                    redirect_uri, code)
                if is_using_oauth_development_client_id(provider.client_id):
                    for k, v in access_token_api_info.params.items():
                        if access_token_api_info.params[k] == provider.client_id:
                            access_token_api_info.params[k] = get_actual_client_id_from_development_client_id(
                                provider.client_id)
                headers = {
                    'Accept': 'application/json',
                    'Content-Type': 'application/x-www-form-urlencoded'
                }
                async with AsyncClient() as client:
                    access_token_response = await client.post(access_token_api_info.url,
                                                              data=access_token_api_info.params,
                                                              headers=headers)
                    access_token_response = access_token_response.json()
            else:
                access_token_response = auth_code_response
        except Exception as e:
            raise_general_exception(e)

        try:
            user_info = await provider.get_profile_info(access_token_response)
        except Exception as e:
            return SignInUpPostFieldErrorResponse(str(e))
        email = user_info.email.id if user_info.email is not None else None
        email_verified = user_info.email.is_verified if user_info.email is not None else None
        if email is None or email_verified is None:
            return SignInUpPostNoEmailGivenByProviderResponse()

        signinup_response = await api_options.recipe_implementation.sign_in_up(provider.id, user_info.user_id, email,
                                                                               email_verified)
        user = signinup_response.user
        await create_new_session(api_options.request, user.user_id)

        return SignInUpPostOkResponse(
            user, signinup_response.created_new_user, access_token_response)

    async def apple_redirect_handler_post(self, code: str, state: str, api_options: APIOptions):
        app_info = api_options.app_info
        redirect_uri = app_info.website_domain.get_as_string_dangerous() + app_info.website_base_path.get_as_string_dangerous() + '/callback/apple?state=' + state + '&code=' + code
        html_content = '<html><head><script>window.location.replace("' + redirect_uri + '");</script></head></html>'
        api_options.response.set_html_content(html_content)

Ancestors

Methods

async def apple_redirect_handler_post(self, code: str, state: str, api_options: APIOptions)
Expand source code
async def apple_redirect_handler_post(self, code: str, state: str, api_options: APIOptions):
    app_info = api_options.app_info
    redirect_uri = app_info.website_domain.get_as_string_dangerous() + app_info.website_base_path.get_as_string_dangerous() + '/callback/apple?state=' + state + '&code=' + code
    html_content = '<html><head><script>window.location.replace("' + redirect_uri + '");</script></head></html>'
    api_options.response.set_html_content(html_content)
async def authorisation_url_get(self, provider: Provider, api_options: APIOptions) ‑> AuthorisationUrlGetResponse
Expand source code
async def authorisation_url_get(self, provider: Provider, api_options: APIOptions) -> AuthorisationUrlGetResponse:
    authorisation_url_info = provider.get_authorisation_redirect_api_info()

    params = {}
    for key, value in authorisation_url_info.params.items():
        params[key] = value if not callable(
            value) else value(api_options.request)

    if provider.get_redirect_uri() is not None and not is_using_oauth_development_client_id(provider.client_id):
        # the backend wants to set the redirectURI - so we set that here.
        # we add the not development keys because the oauth provider will
        # redirect to supertokens.io's URL which will redirect the app
        # to the the user's website, which will handle the callback as usual.
        # If we add this, then instead, the supertokens' site will redirect
        # the user to this API layer, which is not needed.
        params['redirect_uri'] = provider.get_redirect_uri()

    auth_url = authorisation_url_info.url
    if is_using_oauth_development_client_id(provider.client_id):
        params['actual_redirect_uri'] = authorisation_url_info.url

        for k, v in params.items():
            if params[k] == provider.client_id:
                params[k] = get_actual_client_id_from_development_client_id(provider.client_id)
        auth_url = DEV_OAUTH_AUTHORIZATION_URL

    query_string = urlencode(params)

    url = auth_url + '?' + query_string
    return AuthorisationUrlGetOkResponse(url)
async def sign_in_up_post(self, provider: Provider, code: str, redirect_uri: str, client_id: Union[str, None], auth_code_response: Union[str, None], api_options: APIOptions) ‑> SignInUpPostResponse
Expand source code
async def sign_in_up_post(self, provider: Provider, code: str, redirect_uri: str, client_id: Union[str, None],
                          auth_code_response: Union[str, None], api_options: APIOptions) -> SignInUpPostResponse:
    if is_using_oauth_development_client_id(provider.client_id):
        redirect_uri = DEV_OAUTH_REDIRECT_URL
    elif provider.get_redirect_uri() is not None:
        # we overwrite the redirectURI provided by the frontend
        # since the backend wants to take charge of setting this.
        redirect_uri = provider.get_redirect_uri()
    try:
        if auth_code_response is None:
            access_token_api_info = provider.get_access_token_api_info(
                redirect_uri, code)
            if is_using_oauth_development_client_id(provider.client_id):
                for k, v in access_token_api_info.params.items():
                    if access_token_api_info.params[k] == provider.client_id:
                        access_token_api_info.params[k] = get_actual_client_id_from_development_client_id(
                            provider.client_id)
            headers = {
                'Accept': 'application/json',
                'Content-Type': 'application/x-www-form-urlencoded'
            }
            async with AsyncClient() as client:
                access_token_response = await client.post(access_token_api_info.url,
                                                          data=access_token_api_info.params,
                                                          headers=headers)
                access_token_response = access_token_response.json()
        else:
            access_token_response = auth_code_response
    except Exception as e:
        raise_general_exception(e)

    try:
        user_info = await provider.get_profile_info(access_token_response)
    except Exception as e:
        return SignInUpPostFieldErrorResponse(str(e))
    email = user_info.email.id if user_info.email is not None else None
    email_verified = user_info.email.is_verified if user_info.email is not None else None
    if email is None or email_verified is None:
        return SignInUpPostNoEmailGivenByProviderResponse()

    signinup_response = await api_options.recipe_implementation.sign_in_up(provider.id, user_info.user_id, email,
                                                                           email_verified)
    user = signinup_response.user
    await create_new_session(api_options.request, user.user_id)

    return SignInUpPostOkResponse(
        user, signinup_response.created_new_user, access_token_response)