Module supertokens_python.recipe.thirdparty.api.implementation

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING
from urllib.parse import urlencode

from httpx import AsyncClient
from supertokens_python.recipe.thirdparty.constants import DEV_OAUTH_AUTHORIZATION_URL, DEV_OAUTH_REDIRECT_URL
from supertokens_python.recipe.thirdparty.utils import is_using_oauth_development_keys
from supertokens_python.exceptions import raise_general_exception
from supertokens_python.recipe.session.asyncio import create_new_session
from supertokens_python.recipe.thirdparty.interfaces import APIInterface, SignInUpPostOkResponse, \
    AuthorisationUrlGetOkResponse, SignInUpPostNoEmailGivenByProviderResponse

if TYPE_CHECKING:
    from supertokens_python.recipe.thirdparty.interfaces import APIOptions, SignInUpPostResponse, \
        AuthorisationUrlGetResponse
    from supertokens_python.recipe.thirdparty.provider import Provider


class APIImplementation(APIInterface):
    """Concrete `APIInterface` for the third-party recipe's login endpoints."""

    def __init__(self):
        super().__init__()

    async def authorisation_url_get(self, provider: Provider, api_options: APIOptions) -> AuthorisationUrlGetResponse:
        """Build the URL the user is sent to in order to authorise with `provider`.

        Each query parameter supplied by the provider is used verbatim unless
        it is callable, in which case it is called with the current request
        and its result is used instead.

        Returns an `AuthorisationUrlGetOkResponse` wrapping the final URL.
        """
        redirect_info = provider.get_authorisation_redirect_api_info()

        query_params = {
            name: (entry(api_options.request) if callable(entry) else entry)
            for name, entry in redirect_info.params.items()
        }

        base_url = redirect_info.url
        if is_using_oauth_development_keys(provider.client_id):
            # With development keys the request goes to the development
            # authorisation URL, and the provider's own URL is carried along
            # in the `actual_redirect_uri` query parameter.
            query_params['actual_redirect_uri'] = redirect_info.url
            base_url = DEV_OAUTH_AUTHORIZATION_URL

        return AuthorisationUrlGetOkResponse(
            base_url + '?' + urlencode(query_params))

    async def sign_in_up_post(self, provider: Provider, code: str, redirect_uri: str,
                              api_options: APIOptions) -> SignInUpPostResponse:
        """Exchange `code` with the provider, then sign the user in or up.

        The provider's access-token endpoint is POSTed to, the JSON reply is
        handed to `provider.get_profile_info`, and the resulting profile is
        passed to the recipe implementation's `sign_in_up`. A new session is
        then created for the returned user.

        Returns `SignInUpPostNoEmailGivenByProviderResponse` when the profile
        has no email, or its email id / verified flag is `None`. Otherwise
        returns `SignInUpPostOkResponse` with the user, the created-new-user
        flag and the provider's token response.

        Any exception raised while talking to the provider is forwarded to
        `raise_general_exception`.
        """
        if is_using_oauth_development_keys(provider.client_id):
            # Development keys always use the fixed development redirect URL.
            redirect_uri = DEV_OAUTH_REDIRECT_URL

        request_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        try:
            token_request = provider.get_access_token_api_info(
                redirect_uri, code)
            async with AsyncClient() as http_client:
                raw_reply = await http_client.post(
                    token_request.url,
                    data=token_request.params,
                    headers=request_headers)
                access_token_response = raw_reply.json()
                user_info = await provider.get_profile_info(access_token_response)
        except Exception as e:
            # NOTE(review): the code below relies on this call raising;
            # otherwise `user_info` / `access_token_response` may be unbound.
            raise_general_exception(e)

        provider_email = user_info.email
        if provider_email is None:
            return SignInUpPostNoEmailGivenByProviderResponse()

        email = provider_email.id
        email_verified = provider_email.is_verified
        if email is None or email_verified is None:
            return SignInUpPostNoEmailGivenByProviderResponse()

        outcome = await api_options.recipe_implementation.sign_in_up(
            provider.id, user_info.user_id, email, email_verified)
        signed_in_user = outcome.user
        await create_new_session(api_options.request, signed_in_user.user_id)

        return SignInUpPostOkResponse(
            signed_in_user, outcome.created_new_user, access_token_response)

Classes

class APIImplementation
Expand source code
class APIImplementation(APIInterface):
    """Concrete `APIInterface` for the third-party recipe's login endpoints."""

    def __init__(self):
        super().__init__()

    async def authorisation_url_get(self, provider: Provider, api_options: APIOptions) -> AuthorisationUrlGetResponse:
        """Build the URL the user is sent to in order to authorise with `provider`.

        Each query parameter supplied by the provider is used verbatim unless
        it is callable, in which case it is called with the current request
        and its result is used instead.

        Returns an `AuthorisationUrlGetOkResponse` wrapping the final URL.
        """
        redirect_info = provider.get_authorisation_redirect_api_info()

        query_params = {
            name: (entry(api_options.request) if callable(entry) else entry)
            for name, entry in redirect_info.params.items()
        }

        base_url = redirect_info.url
        if is_using_oauth_development_keys(provider.client_id):
            # With development keys the request goes to the development
            # authorisation URL, and the provider's own URL is carried along
            # in the `actual_redirect_uri` query parameter.
            query_params['actual_redirect_uri'] = redirect_info.url
            base_url = DEV_OAUTH_AUTHORIZATION_URL

        return AuthorisationUrlGetOkResponse(
            base_url + '?' + urlencode(query_params))

    async def sign_in_up_post(self, provider: Provider, code: str, redirect_uri: str,
                              api_options: APIOptions) -> SignInUpPostResponse:
        """Exchange `code` with the provider, then sign the user in or up.

        The provider's access-token endpoint is POSTed to, the JSON reply is
        handed to `provider.get_profile_info`, and the resulting profile is
        passed to the recipe implementation's `sign_in_up`. A new session is
        then created for the returned user.

        Returns `SignInUpPostNoEmailGivenByProviderResponse` when the profile
        has no email, or its email id / verified flag is `None`. Otherwise
        returns `SignInUpPostOkResponse` with the user, the created-new-user
        flag and the provider's token response.

        Any exception raised while talking to the provider is forwarded to
        `raise_general_exception`.
        """
        if is_using_oauth_development_keys(provider.client_id):
            # Development keys always use the fixed development redirect URL.
            redirect_uri = DEV_OAUTH_REDIRECT_URL

        request_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        try:
            token_request = provider.get_access_token_api_info(
                redirect_uri, code)
            async with AsyncClient() as http_client:
                raw_reply = await http_client.post(
                    token_request.url,
                    data=token_request.params,
                    headers=request_headers)
                access_token_response = raw_reply.json()
                user_info = await provider.get_profile_info(access_token_response)
        except Exception as e:
            # NOTE(review): the code below relies on this call raising;
            # otherwise `user_info` / `access_token_response` may be unbound.
            raise_general_exception(e)

        provider_email = user_info.email
        if provider_email is None:
            return SignInUpPostNoEmailGivenByProviderResponse()

        email = provider_email.id
        email_verified = provider_email.is_verified
        if email is None or email_verified is None:
            return SignInUpPostNoEmailGivenByProviderResponse()

        outcome = await api_options.recipe_implementation.sign_in_up(
            provider.id, user_info.user_id, email, email_verified)
        signed_in_user = outcome.user
        await create_new_session(api_options.request, signed_in_user.user_id)

        return SignInUpPostOkResponse(
            signed_in_user, outcome.created_new_user, access_token_response)

Ancestors

Methods

async def authorisation_url_get(self, provider: Provider, api_options: APIOptions) ‑> AuthorisationUrlGetResponse
Expand source code
async def authorisation_url_get(self, provider: Provider, api_options: APIOptions) -> AuthorisationUrlGetResponse:
    """Build the URL the user is sent to in order to authorise with `provider`.

    Each query parameter supplied by the provider is used verbatim unless
    it is callable, in which case it is called with the current request
    and its result is used instead.

    Returns an `AuthorisationUrlGetOkResponse` wrapping the final URL.
    """
    redirect_info = provider.get_authorisation_redirect_api_info()

    query_params = {
        name: (entry(api_options.request) if callable(entry) else entry)
        for name, entry in redirect_info.params.items()
    }

    base_url = redirect_info.url
    if is_using_oauth_development_keys(provider.client_id):
        # With development keys the request goes to the development
        # authorisation URL, and the provider's own URL is carried along
        # in the `actual_redirect_uri` query parameter.
        query_params['actual_redirect_uri'] = redirect_info.url
        base_url = DEV_OAUTH_AUTHORIZATION_URL

    return AuthorisationUrlGetOkResponse(
        base_url + '?' + urlencode(query_params))
async def sign_in_up_post(self, provider: Provider, code: str, redirect_uri: str, api_options: APIOptions) ‑> SignInUpPostResponse
Expand source code
async def sign_in_up_post(self, provider: Provider, code: str, redirect_uri: str,
                          api_options: APIOptions) -> SignInUpPostResponse:
    """Exchange `code` with the provider, then sign the user in or up.

    The provider's access-token endpoint is POSTed to, the JSON reply is
    handed to `provider.get_profile_info`, and the resulting profile is
    passed to the recipe implementation's `sign_in_up`. A new session is
    then created for the returned user.

    Returns `SignInUpPostNoEmailGivenByProviderResponse` when the profile
    has no email, or its email id / verified flag is `None`. Otherwise
    returns `SignInUpPostOkResponse` with the user, the created-new-user
    flag and the provider's token response.

    Any exception raised while talking to the provider is forwarded to
    `raise_general_exception`.
    """
    if is_using_oauth_development_keys(provider.client_id):
        # Development keys always use the fixed development redirect URL.
        redirect_uri = DEV_OAUTH_REDIRECT_URL

    request_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    try:
        token_request = provider.get_access_token_api_info(
            redirect_uri, code)
        async with AsyncClient() as http_client:
            raw_reply = await http_client.post(
                token_request.url,
                data=token_request.params,
                headers=request_headers)
            access_token_response = raw_reply.json()
            user_info = await provider.get_profile_info(access_token_response)
    except Exception as e:
        # NOTE(review): the code below relies on this call raising;
        # otherwise `user_info` / `access_token_response` may be unbound.
        raise_general_exception(e)

    provider_email = user_info.email
    if provider_email is None:
        return SignInUpPostNoEmailGivenByProviderResponse()

    email = provider_email.id
    email_verified = provider_email.is_verified
    if email is None or email_verified is None:
        return SignInUpPostNoEmailGivenByProviderResponse()

    outcome = await api_options.recipe_implementation.sign_in_up(
        provider.id, user_info.user_id, email, email_verified)
    signed_in_user = outcome.user
    await create_new_session(api_options.request, signed_in_user.user_id)

    return SignInUpPostOkResponse(
        signed_in_user, outcome.created_new_user, access_token_response)