Module supertokens_python.recipe.thirdparty.providers.linkedin

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Any, Dict, Optional, Union
from supertokens_python.recipe.thirdparty.providers.utils import do_get_request

from supertokens_python.recipe.thirdparty.types import (
    RawUserInfoFromProvider,
    UserInfo,
    UserInfoEmail,
)
from ..provider import (
    Provider,
    ProviderConfigForClient,
    ProviderInput,
    UserFields,
    UserInfoMap,
)

from .custom import (
    GenericProvider,
    NewProvider,
)


class LinkedinImpl(GenericProvider):
    async def get_config_for_client_type(
        self, client_type: Optional[str], user_context: Dict[str, Any]
    ) -> ProviderConfigForClient:
        config = await super().get_config_for_client_type(client_type, user_context)

        if config.scope is None:
            config.scope = ["r_emailaddress", "r_liteprofile"]

        return config

    async def get_user_info(
        self, oauth_tokens: Dict[str, Any], user_context: Dict[str, Any]
    ) -> UserInfo:
        access_token: Union[str, None] = oauth_tokens.get("access_token")

        if access_token is None:
            raise Exception("Access token not found")

        headers = {
            "Authorization": f"Bearer {access_token}",
        }

        raw_user_info_from_provider = RawUserInfoFromProvider({}, {})
        user_info = await do_get_request(
            "https://api.linkedin.com/v2/me", headers=headers
        )
        raw_user_info_from_provider.from_user_info_api = user_info

        email_api_url = "https://api.linkedin.com/v2/emailAddress"
        email_info: Dict[str, Any] = await do_get_request(
            email_api_url,
            query_params={"q": "members", "projection": "(elements*(handle~))"},
            headers=headers,
        )

        if email_info.get("elements") is not None and len(email_info.get("elements")) > 0:  # type: ignore
            raw_user_info_from_provider.from_user_info_api["email"] = email_info.get("elements")[0].get("handle~").get("emailAddress")  # type: ignore

        raw_user_info_from_provider.from_user_info_api = {
            **raw_user_info_from_provider.from_user_info_api,
            **email_info,
        }

        return UserInfo(
            third_party_user_id=raw_user_info_from_provider.from_user_info_api.get("id"),  # type: ignore
            email=UserInfoEmail(
                email=raw_user_info_from_provider.from_user_info_api.get("email"),  # type: ignore
                is_verified=False,
            ),
        )


def Linkedin(input: ProviderInput) -> Provider:  # pylint: disable=redefined-builtin
    if input.config.name is None:
        input.config.name = "Linkedin"

    if input.config.authorization_endpoint is None:
        input.config.authorization_endpoint = (
            "https://www.linkedin.com/oauth/v2/authorization"
        )

    if input.config.token_endpoint is None:
        input.config.token_endpoint = "https://www.linkedin.com/oauth/v2/accessToken"

    if input.config.user_info_map is None:
        input.config.user_info_map = UserInfoMap(UserFields(), UserFields())

    if input.config.user_info_map.from_user_info_api is None:
        input.config.user_info_map.from_user_info_api = UserFields()

    if input.config.user_info_map.from_user_info_api.user_id is None:
        input.config.user_info_map.from_user_info_api.user_id = "id"

    if input.config.user_info_map.from_user_info_api.email is None:
        input.config.user_info_map.from_user_info_api.email = "email"

    if input.config.user_info_map.from_user_info_api.email_verified is None:
        input.config.user_info_map.from_user_info_api.email = "verified"

    return NewProvider(input, LinkedinImpl)

Functions

def Linkedin(input: ProviderInput) ‑> Provider
Expand source code
def Linkedin(input: ProviderInput) -> Provider:  # pylint: disable=redefined-builtin
    if input.config.name is None:
        input.config.name = "Linkedin"

    if input.config.authorization_endpoint is None:
        input.config.authorization_endpoint = (
            "https://www.linkedin.com/oauth/v2/authorization"
        )

    if input.config.token_endpoint is None:
        input.config.token_endpoint = "https://www.linkedin.com/oauth/v2/accessToken"

    if input.config.user_info_map is None:
        input.config.user_info_map = UserInfoMap(UserFields(), UserFields())

    if input.config.user_info_map.from_user_info_api is None:
        input.config.user_info_map.from_user_info_api = UserFields()

    if input.config.user_info_map.from_user_info_api.user_id is None:
        input.config.user_info_map.from_user_info_api.user_id = "id"

    if input.config.user_info_map.from_user_info_api.email is None:
        input.config.user_info_map.from_user_info_api.email = "email"

    if input.config.user_info_map.from_user_info_api.email_verified is None:
        input.config.user_info_map.from_user_info_api.email = "verified"

    return NewProvider(input, LinkedinImpl)

Classes

class LinkedinImpl (provider_config: ProviderConfig)
Expand source code
class LinkedinImpl(GenericProvider):
    async def get_config_for_client_type(
        self, client_type: Optional[str], user_context: Dict[str, Any]
    ) -> ProviderConfigForClient:
        config = await super().get_config_for_client_type(client_type, user_context)

        if config.scope is None:
            config.scope = ["r_emailaddress", "r_liteprofile"]

        return config

    async def get_user_info(
        self, oauth_tokens: Dict[str, Any], user_context: Dict[str, Any]
    ) -> UserInfo:
        access_token: Union[str, None] = oauth_tokens.get("access_token")

        if access_token is None:
            raise Exception("Access token not found")

        headers = {
            "Authorization": f"Bearer {access_token}",
        }

        raw_user_info_from_provider = RawUserInfoFromProvider({}, {})
        user_info = await do_get_request(
            "https://api.linkedin.com/v2/me", headers=headers
        )
        raw_user_info_from_provider.from_user_info_api = user_info

        email_api_url = "https://api.linkedin.com/v2/emailAddress"
        email_info: Dict[str, Any] = await do_get_request(
            email_api_url,
            query_params={"q": "members", "projection": "(elements*(handle~))"},
            headers=headers,
        )

        if email_info.get("elements") is not None and len(email_info.get("elements")) > 0:  # type: ignore
            raw_user_info_from_provider.from_user_info_api["email"] = email_info.get("elements")[0].get("handle~").get("emailAddress")  # type: ignore

        raw_user_info_from_provider.from_user_info_api = {
            **raw_user_info_from_provider.from_user_info_api,
            **email_info,
        }

        return UserInfo(
            third_party_user_id=raw_user_info_from_provider.from_user_info_api.get("id"),  # type: ignore
            email=UserInfoEmail(
                email=raw_user_info_from_provider.from_user_info_api.get("email"),  # type: ignore
                is_verified=False,
            ),
        )

Ancestors

Methods

async def get_config_for_client_type(self, client_type: Optional[str], user_context: Dict[str, Any]) ‑> ProviderConfigForClient
Expand source code
async def get_config_for_client_type(
    self, client_type: Optional[str], user_context: Dict[str, Any]
) -> ProviderConfigForClient:
    config = await super().get_config_for_client_type(client_type, user_context)

    if config.scope is None:
        config.scope = ["r_emailaddress", "r_liteprofile"]

    return config
async def get_user_info(self, oauth_tokens: Dict[str, Any], user_context: Dict[str, Any]) ‑> UserInfo
Expand source code
async def get_user_info(
    self, oauth_tokens: Dict[str, Any], user_context: Dict[str, Any]
) -> UserInfo:
    access_token: Union[str, None] = oauth_tokens.get("access_token")

    if access_token is None:
        raise Exception("Access token not found")

    headers = {
        "Authorization": f"Bearer {access_token}",
    }

    raw_user_info_from_provider = RawUserInfoFromProvider({}, {})
    user_info = await do_get_request(
        "https://api.linkedin.com/v2/me", headers=headers
    )
    raw_user_info_from_provider.from_user_info_api = user_info

    email_api_url = "https://api.linkedin.com/v2/emailAddress"
    email_info: Dict[str, Any] = await do_get_request(
        email_api_url,
        query_params={"q": "members", "projection": "(elements*(handle~))"},
        headers=headers,
    )

    if email_info.get("elements") is not None and len(email_info.get("elements")) > 0:  # type: ignore
        raw_user_info_from_provider.from_user_info_api["email"] = email_info.get("elements")[0].get("handle~").get("emailAddress")  # type: ignore

    raw_user_info_from_provider.from_user_info_api = {
        **raw_user_info_from_provider.from_user_info_api,
        **email_info,
    }

    return UserInfo(
        third_party_user_id=raw_user_info_from_provider.from_user_info_api.get("id"),  # type: ignore
        email=UserInfoEmail(
            email=raw_user_info_from_provider.from_user_info_api.get("email"),  # type: ignore
            is_verified=False,
        ),
    )