Module supertokens_python.recipe.thirdparty.providers.okta

Expand source code
# # Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.

# # This software is licensed under the Apache License, Version 2.0 (the
# # "License") as published by the Apache Software Foundation.

# # You may not use this file except in compliance with the License. You may
# # obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

# # Unless required by applicable law or agreed to in writing, software
# # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# # License for the specific language governing permissions and limitations
# # under the License.
# from __future__ import annotations

# from typing import TYPE_CHECKING, Any, Callable, Dict, List, Union

# from httpx import AsyncClient
# from supertokens_python.recipe.thirdparty.provider import Provider
# from supertokens_python.recipe.thirdparty.types import (
#     AccessTokenAPI, AuthorisationRedirectAPI, UserInfo, UserInfoEmail)

# if TYPE_CHECKING:
#     from supertokens_python.framework.request import BaseRequest


# class Okta(Provider):
#     def __init__(self, okta_domain: str, client_id: str, client_secret: str, scope: Union[None, List[str]] = None,
#                  authorisation_redirect: Union[None, Dict[str, Union[str, Callable[[BaseRequest], str]]]] = None,
#                  authorization_server_id: str = 'default', is_default: bool = False):
#         super().__init__('okta', client_id, is_default)
#         default_scopes = ["openid", "email"]
#         if scope is None:
#             scope = default_scopes
#         self.client_secret = client_secret
#         self.base_url = 'https://' + okta_domain + '/oauth2/' + authorization_server_id
#         self.scopes = list(set(scope))
#         self.access_token_api_url = self.base_url + '/v1/token'
#         self.authorisation_redirect_url = self.base_url + '/v1/authorize'
#         self.authorisation_redirect_params = {}
#         if authorisation_redirect is not None:
#             self.authorisation_redirect_params = authorisation_redirect

#     async def get_profile_info(self, auth_code_response: Dict[str, Any]) -> UserInfo:
#         access_token: str = auth_code_response['access_token']
#         headers = {
#             'Authorization': 'Bearer ' + access_token
#         }
#         async with AsyncClient() as client:
#             response = await client.get(url=self.base_url + '/v1/userinfo', headers=headers)
#             user_info = response.json()
#             user_id = user_info['sub']
#             if 'email' not in user_info or user_info['email'] is None:
#                 return UserInfo(user_id)
#             is_email_verified = user_info['email_verified'] if 'email_verified' in user_info else False
#             return UserInfo(user_id, UserInfoEmail(
#                 user_info['email'], is_email_verified))

#     def get_authorisation_redirect_api_info(self) -> AuthorisationRedirectAPI:
#         params = {
#             'scope': ' '.join(self.scopes),
#             'client_id': self.client_id,
#             'response_type': 'code',
#             **self.authorisation_redirect_params
#         }
#         return AuthorisationRedirectAPI(
#             self.authorisation_redirect_url, params)

#     def get_access_token_api_info(
#             self, redirect_uri: str, auth_code_from_request: str) -> AccessTokenAPI:
#         params = {
#             'client_id': self.client_id,
#             'client_secret': self.client_secret,
#             'grant_type': 'authorization_code',
#             'code': auth_code_from_request,
#             'redirect_uri': redirect_uri
#         }
#         return AccessTokenAPI(self.access_token_api_url, params)

# def get_redirect_uri(self, user_context: Dict[str, Any]) -> Union[None, str]:
#     return None