Module supertokens_python.recipe.thirdparty.api

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from .apple_redirect import handle_apple_redirect_api
from .authorisation_url import handle_authorisation_url_api
from .signinup import handle_sign_in_up_api

# Names exported by this package: the request handlers imported above from
# the apple_redirect, authorisation_url and signinup sub-modules.
__all__ = [
    "handle_apple_redirect_api",
    "handle_authorisation_url_api",
    "handle_sign_in_up_api",
]

Sub-modules

supertokens_python.recipe.thirdparty.api.apple_redirect
supertokens_python.recipe.thirdparty.api.authorisation_url
supertokens_python.recipe.thirdparty.api.implementation
supertokens_python.recipe.thirdparty.api.signinup

Functions

async def handle_apple_redirect_api(api_implementation: APIInterface, api_options: APIOptions, user_context: Dict[str, Any])
Expand source code
async def handle_apple_redirect_api(
    api_implementation: APIInterface,
    api_options: APIOptions,
    user_context: Dict[str, Any],
):
    """Dispatch the Apple redirect POST request to the API implementation.

    Args:
        api_implementation: Object exposing ``apple_redirect_handler_post``
            and the ``disable_apple_redirect_handler_post`` flag.
        api_options: Carries the incoming ``request`` and the outgoing
            ``response``.
        user_context: Caller-supplied context, forwarded unchanged.

    Returns:
        ``None`` when the handler is disabled on ``api_implementation``;
        otherwise ``api_options.response`` after the handler has run.
    """
    if not api_implementation.disable_apple_redirect_handler_post:
        form_fields = await api_options.request.form_data()

        # The handler's return value is ignored here; it is the call that is
        # expected to redirect the user.
        await api_implementation.apple_redirect_handler_post(
            form_fields, api_options, user_context
        )

        return api_options.response

    return None
async def handle_authorisation_url_api(api_implementation: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any])
Expand source code
async def handle_authorisation_url_api(
    api_implementation: APIInterface,
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
):
    """Handle the GET request that asks for a provider's authorisation URL.

    Reads ``thirdPartyId``, ``redirectURIOnProviderDashboard`` and the
    optional ``clientType`` from the query string, looks the provider up for
    ``tenant_id`` and sends the JSON form of the ``authorisation_url_get``
    result with a 200 response.

    Args:
        api_implementation: Object exposing ``authorisation_url_get`` and the
            ``disable_authorisation_url_get`` flag.
        tenant_id: Tenant used for the provider lookup.
        api_options: Carries the request, the response and the recipe
            implementation.
        user_context: Caller-supplied context, forwarded unchanged.

    Returns:
        ``None`` when the API is disabled, otherwise whatever
        ``send_200_response`` returns.

    Raises:
        BadInputError: If the provider is not found in the configuration.
            Missing required query parameters are reported through
            ``raise_bad_input_exception``.
    """
    if api_implementation.disable_authorisation_url_get:
        return None

    request = api_options.request
    provider_id = request.get_query_param("thirdPartyId")
    dashboard_redirect_uri = request.get_query_param("redirectURIOnProviderDashboard")
    client_type = request.get_query_param("clientType")

    # Both parameters below are mandatory; clientType may be absent.
    if provider_id is None:
        raise_bad_input_exception("Please provide the thirdPartyId as a GET param")
    if dashboard_redirect_uri is None:
        raise_bad_input_exception(
            "Please provide the redirectURIOnProviderDashboard as a GET param"
        )

    provider = await api_options.recipe_implementation.get_provider(
        third_party_id=provider_id,
        client_type=client_type,
        tenant_id=tenant_id,
        user_context=user_context,
    )
    if provider is None:
        raise BadInputError(
            f"the provider {provider_id} could not be found in the configuration"
        )

    auth_url_result = await api_implementation.authorisation_url_get(
        provider=provider,
        redirect_uri_on_provider_dashboard=dashboard_redirect_uri,
        api_options=api_options,
        user_context=user_context,
    )
    return send_200_response(auth_url_result.to_json(), api_options.response)
async def handle_sign_in_up_api(api_implementation: APIInterface, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any])
Expand source code
async def handle_sign_in_up_api(
    api_implementation: APIInterface,
    tenant_id: str,
    api_options: APIOptions,
    user_context: Dict[str, Any],
):
    """Handle the third-party sign-in/up POST request.

    Validates the JSON body, resolves the provider named by ``thirdPartyId``,
    optionally loads an existing session, calls ``sign_in_up_post`` on the
    API implementation and sends the outcome with a 200 response.

    The body must be a JSON object containing a string ``thirdPartyId`` and
    one of ``redirectURIInfo`` (an object with a non-null
    ``redirectURIOnProviderDashboard``) or ``oAuthTokens``. ``clientType`` is
    optional. When both are present, ``redirectURIInfo`` takes precedence.

    Args:
        api_implementation: Object exposing ``sign_in_up_post`` and the
            ``disable_sign_in_up_post`` flag.
        tenant_id: Tenant used for the provider lookup; replaced by the
            session's tenant id for the sign-in/up call when a session is
            loaded.
        api_options: Carries the request, the response and the recipe
            implementation.
        user_context: Caller-supplied context, forwarded unchanged.

    Returns:
        ``None`` when the API is disabled, otherwise whatever
        ``send_200_response`` returns.

    Raises:
        BadInputError: If the provider is not found in the configuration.
            Malformed request bodies are reported through
            ``raise_bad_input_exception``.
    """
    from supertokens_python.auth_utils import load_session_in_auth_api_if_needed

    if api_implementation.disable_sign_in_up_post:
        return None

    raw_body = await api_options.request.json()
    # The body is untrusted: a JSON array, string or number would make the
    # ``.get`` lookups below fail with AttributeError instead of producing a
    # bad-input error, so anything that is not a JSON object is rejected here
    # (this also covers the missing-body ``None`` case).
    if not isinstance(raw_body, dict):
        raise_bad_input_exception("Please provide a JSON input")
    body: Dict[str, Any] = raw_body

    third_party_id = body.get("thirdPartyId")
    client_type = body.get("clientType")

    if not isinstance(third_party_id, str):
        raise_bad_input_exception("Please provide the thirdPartyId in request body")

    oauth_tokens = None
    redirect_uri_info: Optional[Dict[str, Any]] = None
    raw_redirect_uri_info = body.get("redirectURIInfo")
    raw_oauth_tokens = body.get("oAuthTokens")
    if raw_redirect_uri_info is not None:
        # Same reasoning as for the body: a non-object redirectURIInfo must be
        # reported as bad input rather than crash on ``.get``.
        if (
            not isinstance(raw_redirect_uri_info, dict)
            or raw_redirect_uri_info.get("redirectURIOnProviderDashboard") is None
        ):
            raise_bad_input_exception(
                "Please provide the redirectURIOnProviderDashboard in request body"
            )
        redirect_uri_info = raw_redirect_uri_info
    elif raw_oauth_tokens is not None:
        oauth_tokens = raw_oauth_tokens
    else:
        raise_bad_input_exception(
            "Please provide one of redirectURIInfo or oAuthTokens in the request body"
        )

    provider_response = await api_options.recipe_implementation.get_provider(
        third_party_id=third_party_id,
        client_type=client_type,
        tenant_id=tenant_id,
        user_context=user_context,
    )

    if provider_response is None:
        raise BadInputError(
            f"the provider {third_party_id} could not be found in the configuration"
        )

    provider = provider_response

    redirect_uri_info_parsed: Optional[RedirectUriInfo] = None
    if redirect_uri_info is not None:
        redirect_uri_info_parsed = RedirectUriInfo(
            redirect_uri_on_provider_dashboard=redirect_uri_info.get(
                "redirectURIOnProviderDashboard"
            ),
            redirect_uri_query_params=redirect_uri_info.get("redirectURIQueryParams"),
            pkce_code_verifier=redirect_uri_info.get("pkceCodeVerifier"),
        )

    should_try_linking_with_session_user = (
        get_normalised_should_try_linking_with_session_user_flag(
            api_options.request, body
        )
    )

    session = await load_session_in_auth_api_if_needed(
        api_options.request, should_try_linking_with_session_user, user_context
    )

    # When a session was loaded, its tenant id overrides the one passed in.
    # Note the provider lookup above has already used the original tenant_id.
    if session is not None:
        tenant_id = session.get_tenant_id()

    result = await api_implementation.sign_in_up_post(
        provider=provider,
        redirect_uri_info=redirect_uri_info_parsed,
        oauth_tokens=oauth_tokens,
        tenant_id=tenant_id,
        api_options=api_options,
        user_context=user_context,
        session=session,
        should_try_linking_with_session_user=should_try_linking_with_session_user,
    )

    if isinstance(result, SignInUpPostOkResult):
        # The success payload is "status": "OK" merged with the fields
        # produced by get_backwards_compatible_user_info.
        return send_200_response(
            {
                "status": "OK",
                **get_backwards_compatible_user_info(
                    req=api_options.request,
                    user_info=result.user,
                    session_container=result.session,
                    created_new_recipe_user=result.created_new_recipe_user,
                    user_context=user_context,
                ),
            },
            api_options.response,
        )

    # Every other result type is sent as its own JSON form, also with a 200.
    return send_200_response(result.to_json(), api_options.response)