Module supertokens_python.recipe.multitenancy.recipe_implementation
Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Dict, Any, Union, List
from supertokens_python.recipe.multitenancy.interfaces import (
AssociateUserToTenantOkResult,
AssociateUserToTenantUnknownUserIdError,
AssociateUserToTenantEmailAlreadyExistsError,
AssociateUserToTenantPhoneNumberAlreadyExistsError,
AssociateUserToTenantThirdPartyUserAlreadyExistsError,
DisassociateUserFromTenantOkResult,
)
from .interfaces import (
RecipeInterface,
TenantConfig,
CreateOrUpdateTenantOkResult,
DeleteTenantOkResult,
TenantConfigResponse,
GetTenantOkResult,
EmailPasswordConfig,
PasswordlessConfig,
ThirdPartyConfig,
ListAllTenantsOkResult,
CreateOrUpdateThirdPartyConfigOkResult,
DeleteThirdPartyConfigOkResult,
ListAllTenantsItem,
)
if TYPE_CHECKING:
from supertokens_python.querier import Querier
from supertokens_python.recipe.thirdparty.provider import ProviderConfig
from .utils import MultitenancyConfig
from supertokens_python.querier import NormalisedURLPath
from .constants import DEFAULT_TENANT_ID
def parse_tenant_config(tenant: Dict[str, Any]) -> TenantConfigResponse:
from supertokens_python.recipe.thirdparty.provider import (
UserInfoMap,
UserFields,
ProviderClientConfig,
ProviderConfig,
)
providers: List[ProviderConfig] = []
for p in tenant["thirdParty"]["providers"]:
user_info_map: Optional[UserInfoMap] = None
if "userInfoMap" in p:
map_from_payload = p["userInfoMap"].get("fromIdTokenPayload", {})
map_from_api = p["userInfoMap"].get("fromUserInfoAPI", {})
user_info_map = UserInfoMap(
UserFields(
map_from_payload.get("userId"),
map_from_payload.get("email"),
map_from_payload.get("emailVerified"),
),
UserFields(
map_from_api.get("userId"),
map_from_api.get("email"),
map_from_api.get("emailVerified"),
),
)
providers.append(
ProviderConfig(
third_party_id=p["thirdPartyId"],
name=p.get("name"),
clients=[
ProviderClientConfig(
client_id=c["clientId"],
client_secret=c.get("clientSecret"),
client_type=c.get("clientType"),
scope=c.get("scope"),
force_pkce=c.get("forcePKCE", False),
additional_config=c.get("additionalConfig"),
)
for c in p["clients"]
],
authorization_endpoint=p.get("authorizationEndpoint"),
authorization_endpoint_query_params=p.get(
"authorizationEndpointQueryParams"
),
token_endpoint=p.get("tokenEndpoint"),
token_endpoint_body_params=p.get("tokenEndpointBodyParams"),
user_info_endpoint=p.get("userInfoEndpoint"),
user_info_endpoint_query_params=p.get("userInfoEndpointQueryParams"),
user_info_endpoint_headers=p.get("userInfoEndpointHeaders"),
jwks_uri=p.get("jwksURI"),
oidc_discovery_endpoint=p.get("oidcDiscoveryEndpoint"),
user_info_map=user_info_map,
require_email=p.get("requireEmail", True),
validate_id_token_payload=None,
generate_fake_email=None,
validate_access_token=None,
)
)
return TenantConfigResponse(
emailpassword=EmailPasswordConfig(tenant["emailPassword"]["enabled"]),
passwordless=PasswordlessConfig(tenant["passwordless"]["enabled"]),
third_party=ThirdPartyConfig(
tenant["thirdParty"]["enabled"],
providers,
),
core_config=tenant["coreConfig"],
)
class RecipeImplementation(RecipeInterface):
def __init__(self, querier: Querier, config: MultitenancyConfig):
super().__init__()
self.querier = querier
self.config = config
async def get_tenant_id(
self, tenant_id_from_frontend: str, user_context: Dict[str, Any]
) -> str:
return tenant_id_from_frontend
async def create_or_update_tenant(
self,
tenant_id: str,
config: Optional[TenantConfig],
user_context: Dict[str, Any],
) -> CreateOrUpdateTenantOkResult:
response = await self.querier.send_put_request(
NormalisedURLPath("/recipe/multitenancy/tenant"),
{
"tenantId": tenant_id,
**(config.to_json() if config is not None else {}),
},
user_context=user_context,
)
return CreateOrUpdateTenantOkResult(
created_new=response["createdNew"],
)
async def delete_tenant(
self, tenant_id: str, user_context: Dict[str, Any]
) -> DeleteTenantOkResult:
response = await self.querier.send_post_request(
NormalisedURLPath("/recipe/multitenancy/tenant/remove"),
{"tenantId": tenant_id},
user_context=user_context,
)
return DeleteTenantOkResult(
did_exist=response["didExist"],
)
async def get_tenant(
self, tenant_id: Optional[str], user_context: Dict[str, Any]
) -> Optional[GetTenantOkResult]:
res = await self.querier.send_get_request(
NormalisedURLPath(
f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/tenant"
),
None,
user_context=user_context,
)
if res["status"] == "TENANT_NOT_FOUND_ERROR":
return None
tenant_config = parse_tenant_config(res)
return GetTenantOkResult(
emailpassword=tenant_config.emailpassword,
passwordless=tenant_config.passwordless,
third_party=tenant_config.third_party,
core_config=tenant_config.core_config,
)
async def list_all_tenants(
self, user_context: Dict[str, Any]
) -> ListAllTenantsOkResult:
response = await self.querier.send_get_request(
NormalisedURLPath("/recipe/multitenancy/tenant/list"),
{},
user_context=user_context,
)
tenant_items: List[ListAllTenantsItem] = []
for tenant in response["tenants"]:
config = parse_tenant_config(tenant)
item = ListAllTenantsItem(
tenant["tenantId"],
config.emailpassword,
config.passwordless,
config.third_party,
config.core_config,
)
tenant_items.append(item)
return ListAllTenantsOkResult(
tenants=tenant_items,
)
async def create_or_update_third_party_config(
self,
tenant_id: Optional[str],
config: ProviderConfig,
skip_validation: Optional[bool],
user_context: Dict[str, Any],
) -> CreateOrUpdateThirdPartyConfigOkResult:
response = await self.querier.send_put_request(
NormalisedURLPath(
f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/config/thirdparty"
),
{
"config": config.to_json(),
"skipValidation": skip_validation is True,
},
user_context=user_context,
)
return CreateOrUpdateThirdPartyConfigOkResult(
created_new=response["createdNew"],
)
async def delete_third_party_config(
self,
tenant_id: Optional[str],
third_party_id: str,
user_context: Dict[str, Any],
) -> DeleteThirdPartyConfigOkResult:
response = await self.querier.send_post_request(
NormalisedURLPath(
f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/config/thirdparty/remove"
),
{
"thirdPartyId": third_party_id,
},
user_context=user_context,
)
return DeleteThirdPartyConfigOkResult(
did_config_exist=response["didConfigExist"],
)
async def associate_user_to_tenant(
self, tenant_id: Optional[str], user_id: str, user_context: Dict[str, Any]
) -> Union[
AssociateUserToTenantOkResult,
AssociateUserToTenantUnknownUserIdError,
AssociateUserToTenantEmailAlreadyExistsError,
AssociateUserToTenantPhoneNumberAlreadyExistsError,
AssociateUserToTenantThirdPartyUserAlreadyExistsError,
]:
response = await self.querier.send_post_request(
NormalisedURLPath(
f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/tenant/user"
),
{
"userId": user_id,
},
user_context=user_context,
)
if response["status"] == "OK":
return AssociateUserToTenantOkResult(
was_already_associated=response["wasAlreadyAssociated"],
)
if response["status"] == AssociateUserToTenantUnknownUserIdError.status:
return AssociateUserToTenantUnknownUserIdError()
if response["status"] == AssociateUserToTenantEmailAlreadyExistsError.status:
return AssociateUserToTenantEmailAlreadyExistsError()
if (
response["status"]
== AssociateUserToTenantPhoneNumberAlreadyExistsError.status
):
return AssociateUserToTenantPhoneNumberAlreadyExistsError()
if (
response["status"]
== AssociateUserToTenantThirdPartyUserAlreadyExistsError.status
):
return AssociateUserToTenantThirdPartyUserAlreadyExistsError()
raise Exception("Should never come here")
async def dissociate_user_from_tenant(
self, tenant_id: Optional[str], user_id: str, user_context: Dict[str, Any]
) -> DisassociateUserFromTenantOkResult:
response = await self.querier.send_post_request(
NormalisedURLPath(
f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/tenant/user/remove"
),
{
"userId": user_id,
},
user_context=user_context,
)
return DisassociateUserFromTenantOkResult(
was_associated=response["wasAssociated"],
)
Functions
def parse_tenant_config(tenant: Dict[str, Any]) ‑> TenantConfigResponse
-
Expand source code
def parse_tenant_config(tenant: Dict[str, Any]) -> TenantConfigResponse: from supertokens_python.recipe.thirdparty.provider import ( UserInfoMap, UserFields, ProviderClientConfig, ProviderConfig, ) providers: List[ProviderConfig] = [] for p in tenant["thirdParty"]["providers"]: user_info_map: Optional[UserInfoMap] = None if "userInfoMap" in p: map_from_payload = p["userInfoMap"].get("fromIdTokenPayload", {}) map_from_api = p["userInfoMap"].get("fromUserInfoAPI", {}) user_info_map = UserInfoMap( UserFields( map_from_payload.get("userId"), map_from_payload.get("email"), map_from_payload.get("emailVerified"), ), UserFields( map_from_api.get("userId"), map_from_api.get("email"), map_from_api.get("emailVerified"), ), ) providers.append( ProviderConfig( third_party_id=p["thirdPartyId"], name=p.get("name"), clients=[ ProviderClientConfig( client_id=c["clientId"], client_secret=c.get("clientSecret"), client_type=c.get("clientType"), scope=c.get("scope"), force_pkce=c.get("forcePKCE", False), additional_config=c.get("additionalConfig"), ) for c in p["clients"] ], authorization_endpoint=p.get("authorizationEndpoint"), authorization_endpoint_query_params=p.get( "authorizationEndpointQueryParams" ), token_endpoint=p.get("tokenEndpoint"), token_endpoint_body_params=p.get("tokenEndpointBodyParams"), user_info_endpoint=p.get("userInfoEndpoint"), user_info_endpoint_query_params=p.get("userInfoEndpointQueryParams"), user_info_endpoint_headers=p.get("userInfoEndpointHeaders"), jwks_uri=p.get("jwksURI"), oidc_discovery_endpoint=p.get("oidcDiscoveryEndpoint"), user_info_map=user_info_map, require_email=p.get("requireEmail", True), validate_id_token_payload=None, generate_fake_email=None, validate_access_token=None, ) ) return TenantConfigResponse( emailpassword=EmailPasswordConfig(tenant["emailPassword"]["enabled"]), passwordless=PasswordlessConfig(tenant["passwordless"]["enabled"]), third_party=ThirdPartyConfig( tenant["thirdParty"]["enabled"], providers, ), core_config=tenant["coreConfig"], )
Classes
class RecipeImplementation (querier: Querier, config: MultitenancyConfig)
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class RecipeImplementation(RecipeInterface): def __init__(self, querier: Querier, config: MultitenancyConfig): super().__init__() self.querier = querier self.config = config async def get_tenant_id( self, tenant_id_from_frontend: str, user_context: Dict[str, Any] ) -> str: return tenant_id_from_frontend async def create_or_update_tenant( self, tenant_id: str, config: Optional[TenantConfig], user_context: Dict[str, Any], ) -> CreateOrUpdateTenantOkResult: response = await self.querier.send_put_request( NormalisedURLPath("/recipe/multitenancy/tenant"), { "tenantId": tenant_id, **(config.to_json() if config is not None else {}), }, user_context=user_context, ) return CreateOrUpdateTenantOkResult( created_new=response["createdNew"], ) async def delete_tenant( self, tenant_id: str, user_context: Dict[str, Any] ) -> DeleteTenantOkResult: response = await self.querier.send_post_request( NormalisedURLPath("/recipe/multitenancy/tenant/remove"), {"tenantId": tenant_id}, user_context=user_context, ) return DeleteTenantOkResult( did_exist=response["didExist"], ) async def get_tenant( self, tenant_id: Optional[str], user_context: Dict[str, Any] ) -> Optional[GetTenantOkResult]: res = await self.querier.send_get_request( NormalisedURLPath( f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/tenant" ), None, user_context=user_context, ) if res["status"] == "TENANT_NOT_FOUND_ERROR": return None tenant_config = parse_tenant_config(res) return GetTenantOkResult( emailpassword=tenant_config.emailpassword, passwordless=tenant_config.passwordless, third_party=tenant_config.third_party, core_config=tenant_config.core_config, ) async def list_all_tenants( self, user_context: Dict[str, Any] ) -> ListAllTenantsOkResult: response = await self.querier.send_get_request( NormalisedURLPath("/recipe/multitenancy/tenant/list"), {}, user_context=user_context, ) tenant_items: List[ListAllTenantsItem] = [] for tenant in response["tenants"]: config = parse_tenant_config(tenant) item = ListAllTenantsItem( tenant["tenantId"], config.emailpassword, config.passwordless, config.third_party, config.core_config, ) tenant_items.append(item) return ListAllTenantsOkResult( tenants=tenant_items, ) async def create_or_update_third_party_config( self, tenant_id: Optional[str], config: ProviderConfig, skip_validation: Optional[bool], user_context: Dict[str, Any], ) -> CreateOrUpdateThirdPartyConfigOkResult: response = await self.querier.send_put_request( NormalisedURLPath( f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/config/thirdparty" ), { "config": config.to_json(), "skipValidation": skip_validation is True, }, user_context=user_context, ) return CreateOrUpdateThirdPartyConfigOkResult( created_new=response["createdNew"], ) async def delete_third_party_config( self, tenant_id: Optional[str], third_party_id: str, user_context: Dict[str, Any], ) -> DeleteThirdPartyConfigOkResult: response = await self.querier.send_post_request( NormalisedURLPath( f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/config/thirdparty/remove" ), { "thirdPartyId": third_party_id, }, user_context=user_context, ) return DeleteThirdPartyConfigOkResult( did_config_exist=response["didConfigExist"], ) async def associate_user_to_tenant( self, tenant_id: Optional[str], user_id: str, user_context: Dict[str, Any] ) -> Union[ AssociateUserToTenantOkResult, AssociateUserToTenantUnknownUserIdError, AssociateUserToTenantEmailAlreadyExistsError, AssociateUserToTenantPhoneNumberAlreadyExistsError, AssociateUserToTenantThirdPartyUserAlreadyExistsError, ]: response = await self.querier.send_post_request( NormalisedURLPath( f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/tenant/user" ), { "userId": user_id, }, user_context=user_context, ) if response["status"] == "OK": return AssociateUserToTenantOkResult( was_already_associated=response["wasAlreadyAssociated"], ) if response["status"] == AssociateUserToTenantUnknownUserIdError.status: return AssociateUserToTenantUnknownUserIdError() if response["status"] == AssociateUserToTenantEmailAlreadyExistsError.status: return AssociateUserToTenantEmailAlreadyExistsError() if ( response["status"] == AssociateUserToTenantPhoneNumberAlreadyExistsError.status ): return AssociateUserToTenantPhoneNumberAlreadyExistsError() if ( response["status"] == AssociateUserToTenantThirdPartyUserAlreadyExistsError.status ): return AssociateUserToTenantThirdPartyUserAlreadyExistsError() raise Exception("Should never come here") async def dissociate_user_from_tenant( self, tenant_id: Optional[str], user_id: str, user_context: Dict[str, Any] ) -> DisassociateUserFromTenantOkResult: response = await self.querier.send_post_request( NormalisedURLPath( f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/tenant/user/remove" ), { "userId": user_id, }, user_context=user_context, ) return DisassociateUserFromTenantOkResult( was_associated=response["wasAssociated"], )
Ancestors
- RecipeInterface
- abc.ABC
Methods
async def associate_user_to_tenant(self, tenant_id: Optional[str], user_id: str, user_context: Dict[str, Any]) ‑> Union[AssociateUserToTenantOkResult, AssociateUserToTenantUnknownUserIdError, AssociateUserToTenantEmailAlreadyExistsError, AssociateUserToTenantPhoneNumberAlreadyExistsError, AssociateUserToTenantThirdPartyUserAlreadyExistsError]
-
Expand source code
async def associate_user_to_tenant( self, tenant_id: Optional[str], user_id: str, user_context: Dict[str, Any] ) -> Union[ AssociateUserToTenantOkResult, AssociateUserToTenantUnknownUserIdError, AssociateUserToTenantEmailAlreadyExistsError, AssociateUserToTenantPhoneNumberAlreadyExistsError, AssociateUserToTenantThirdPartyUserAlreadyExistsError, ]: response = await self.querier.send_post_request( NormalisedURLPath( f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/tenant/user" ), { "userId": user_id, }, user_context=user_context, ) if response["status"] == "OK": return AssociateUserToTenantOkResult( was_already_associated=response["wasAlreadyAssociated"], ) if response["status"] == AssociateUserToTenantUnknownUserIdError.status: return AssociateUserToTenantUnknownUserIdError() if response["status"] == AssociateUserToTenantEmailAlreadyExistsError.status: return AssociateUserToTenantEmailAlreadyExistsError() if ( response["status"] == AssociateUserToTenantPhoneNumberAlreadyExistsError.status ): return AssociateUserToTenantPhoneNumberAlreadyExistsError() if ( response["status"] == AssociateUserToTenantThirdPartyUserAlreadyExistsError.status ): return AssociateUserToTenantThirdPartyUserAlreadyExistsError() raise Exception("Should never come here")
async def create_or_update_tenant(self, tenant_id: str, config: Optional[TenantConfig], user_context: Dict[str, Any]) ‑> CreateOrUpdateTenantOkResult
-
Expand source code
async def create_or_update_tenant( self, tenant_id: str, config: Optional[TenantConfig], user_context: Dict[str, Any], ) -> CreateOrUpdateTenantOkResult: response = await self.querier.send_put_request( NormalisedURLPath("/recipe/multitenancy/tenant"), { "tenantId": tenant_id, **(config.to_json() if config is not None else {}), }, user_context=user_context, ) return CreateOrUpdateTenantOkResult( created_new=response["createdNew"], )
async def create_or_update_third_party_config(self, tenant_id: Optional[str], config: ProviderConfig, skip_validation: Optional[bool], user_context: Dict[str, Any]) ‑> CreateOrUpdateThirdPartyConfigOkResult
-
Expand source code
async def create_or_update_third_party_config( self, tenant_id: Optional[str], config: ProviderConfig, skip_validation: Optional[bool], user_context: Dict[str, Any], ) -> CreateOrUpdateThirdPartyConfigOkResult: response = await self.querier.send_put_request( NormalisedURLPath( f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/config/thirdparty" ), { "config": config.to_json(), "skipValidation": skip_validation is True, }, user_context=user_context, ) return CreateOrUpdateThirdPartyConfigOkResult( created_new=response["createdNew"], )
async def delete_tenant(self, tenant_id: str, user_context: Dict[str, Any]) ‑> DeleteTenantOkResult
-
Expand source code
async def delete_tenant( self, tenant_id: str, user_context: Dict[str, Any] ) -> DeleteTenantOkResult: response = await self.querier.send_post_request( NormalisedURLPath("/recipe/multitenancy/tenant/remove"), {"tenantId": tenant_id}, user_context=user_context, ) return DeleteTenantOkResult( did_exist=response["didExist"], )
async def delete_third_party_config(self, tenant_id: Optional[str], third_party_id: str, user_context: Dict[str, Any]) ‑> DeleteThirdPartyConfigOkResult
-
Expand source code
async def delete_third_party_config( self, tenant_id: Optional[str], third_party_id: str, user_context: Dict[str, Any], ) -> DeleteThirdPartyConfigOkResult: response = await self.querier.send_post_request( NormalisedURLPath( f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/config/thirdparty/remove" ), { "thirdPartyId": third_party_id, }, user_context=user_context, ) return DeleteThirdPartyConfigOkResult( did_config_exist=response["didConfigExist"], )
async def dissociate_user_from_tenant(self, tenant_id: Optional[str], user_id: str, user_context: Dict[str, Any]) ‑> DisassociateUserFromTenantOkResult
-
Expand source code
async def dissociate_user_from_tenant( self, tenant_id: Optional[str], user_id: str, user_context: Dict[str, Any] ) -> DisassociateUserFromTenantOkResult: response = await self.querier.send_post_request( NormalisedURLPath( f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/tenant/user/remove" ), { "userId": user_id, }, user_context=user_context, ) return DisassociateUserFromTenantOkResult( was_associated=response["wasAssociated"], )
async def get_tenant(self, tenant_id: Optional[str], user_context: Dict[str, Any]) ‑> Optional[GetTenantOkResult]
-
Expand source code
async def get_tenant( self, tenant_id: Optional[str], user_context: Dict[str, Any] ) -> Optional[GetTenantOkResult]: res = await self.querier.send_get_request( NormalisedURLPath( f"{tenant_id or DEFAULT_TENANT_ID}/recipe/multitenancy/tenant" ), None, user_context=user_context, ) if res["status"] == "TENANT_NOT_FOUND_ERROR": return None tenant_config = parse_tenant_config(res) return GetTenantOkResult( emailpassword=tenant_config.emailpassword, passwordless=tenant_config.passwordless, third_party=tenant_config.third_party, core_config=tenant_config.core_config, )
async def get_tenant_id(self, tenant_id_from_frontend: str, user_context: Dict[str, Any]) ‑> str
-
Expand source code
async def get_tenant_id( self, tenant_id_from_frontend: str, user_context: Dict[str, Any] ) -> str: return tenant_id_from_frontend
async def list_all_tenants(self, user_context: Dict[str, Any]) ‑> ListAllTenantsOkResult
-
Expand source code
async def list_all_tenants( self, user_context: Dict[str, Any] ) -> ListAllTenantsOkResult: response = await self.querier.send_get_request( NormalisedURLPath("/recipe/multitenancy/tenant/list"), {}, user_context=user_context, ) tenant_items: List[ListAllTenantsItem] = [] for tenant in response["tenants"]: config = parse_tenant_config(tenant) item = ListAllTenantsItem( tenant["tenantId"], config.emailpassword, config.passwordless, config.third_party, config.core_config, ) tenant_items.append(item) return ListAllTenantsOkResult( tenants=tenant_items, )