Module supertokens_python.recipe.userroles

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Optional, Union

from .recipe import PermissionClaim, UserRoleClaim, UserRolesRecipe
from .utils import InputOverrideConfig, UserRolesOverrideConfig

if TYPE_CHECKING:
    from supertokens_python.supertokens import RecipeInit


def init(
    skip_adding_roles_to_access_token: Optional[bool] = None,
    skip_adding_permissions_to_access_token: Optional[bool] = None,
    override: Union[UserRolesOverrideConfig, None] = None,
) -> RecipeInit:
    """Build the initialiser for the user roles recipe.

    Thin package-level convenience wrapper: every argument is handed
    unchanged to ``UserRolesRecipe.init`` and its result is returned as-is.

    Args:
        skip_adding_roles_to_access_token: Forwarded to
            ``UserRolesRecipe.init``.
        skip_adding_permissions_to_access_token: Forwarded to
            ``UserRolesRecipe.init``.
        override: Optional override configuration, forwarded to
            ``UserRolesRecipe.init``.

    Returns:
        Whatever ``UserRolesRecipe.init`` returns for these arguments.
    """
    recipe_init = UserRolesRecipe.init(
        skip_adding_roles_to_access_token=skip_adding_roles_to_access_token,
        skip_adding_permissions_to_access_token=skip_adding_permissions_to_access_token,
        override=override,
    )
    return recipe_init


# Names re-exported as the public API of this package (kept in alphabetical
# order). These are what `from ... import *` exposes.
__all__ = [
    "InputOverrideConfig",  # deprecated, use `UserRolesOverrideConfig` instead
    "PermissionClaim",
    "UserRoleClaim",
    "UserRolesOverrideConfig",
    "UserRolesRecipe",
    "init",
]

Sub-modules

supertokens_python.recipe.userroles.asyncio
supertokens_python.recipe.userroles.exceptions
supertokens_python.recipe.userroles.interfaces
supertokens_python.recipe.userroles.recipe
supertokens_python.recipe.userroles.recipe_implementation
supertokens_python.recipe.userroles.syncio
supertokens_python.recipe.userroles.utils

Functions

def init(skip_adding_roles_to_access_token: Optional[bool] = None, skip_adding_permissions_to_access_token: Optional[bool] = None, override: Union[BaseOverrideConfig[RecipeInterface, APIInterface], None] = None)

Classes

class InputOverrideConfig (**data: Any)

Deprecated: use `UserRolesOverrideConfig` instead. Base class for input override config with API overrides.

Create a new model by parsing and validating input data from keyword arguments.

Raises `pydantic_core.ValidationError` if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var model_config

Pydantic model configuration for this class.

class UserRolesOverrideConfig (**data: Any)

Base class for input override config with API overrides.

Create a new model by parsing and validating input data from keyword arguments.

Raises `pydantic_core.ValidationError` if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Inherited members

class UserRolesRecipe (recipe_id: str, app_info: AppInfo, config: UserRolesConfig)

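Recipe module for user roles and permissions. After SuperTokens initialisation it registers the role and permission claims with the session recipe (unless skipped via config) and registers access-token, ID-token and user-info builders with the OAuth2 provider recipe so that the `roles` and `permissions` scopes are populated.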

Expand source code
class UserRolesRecipe(RecipeModule):
    """Recipe module for user roles and permissions.

    The recipe handles no HTTP APIs itself. On construction it schedules a
    post-init callback that:

    * adds ``UserRoleClaim`` / ``PermissionClaim`` to the session recipe
      unless the corresponding ``skip_adding_*`` config flag is set, and
    * registers access-token, ID-token and user-info builders with the
      OAuth2 provider recipe that fill in ``roles`` / ``permissions`` when
      those scopes are requested.

    A single instance is kept in a class-level slot; use ``init`` to create
    it and ``get_instance`` to retrieve it.
    """

    recipe_id = "userroles"
    __instance = None

    def __init__(
        self,
        recipe_id: str,
        app_info: AppInfo,
        config: UserRolesConfig,
    ):
        """Normalise the config, build the recipe implementation and schedule
        the post-init registration callback.

        Args:
            recipe_id: Identifier passed to the base class and the querier.
            app_info: Application info passed to the base class and to config
                normalisation.
            config: Raw recipe config; normalised via
                ``validate_and_normalise_user_input``.
        """
        # Imported here rather than at module level (as in the original);
        # presumably to avoid a circular import -- TODO confirm.
        from ..oauth2provider.recipe import OAuth2ProviderRecipe

        super().__init__(recipe_id, app_info)
        self.config = validate_and_normalise_user_input(
            _recipe=self,
            _app_info=app_info,
            config=config,
        )
        recipe_implementation = RecipeImplementation(Querier.get_instance(recipe_id))
        # Let user-supplied overrides wrap the default implementation.
        self.recipe_implementation = self.config.override.functions(
            recipe_implementation
        )

        def callback():
            # Runs after SuperTokens init, when the other recipes' instances
            # can be looked up.
            if self.config.skip_adding_roles_to_access_token is False:
                SessionRecipe.get_instance().add_claim_from_other_recipe(UserRoleClaim)
            if self.config.skip_adding_permissions_to_access_token is False:
                SessionRecipe.get_instance().add_claim_from_other_recipe(
                    PermissionClaim
                )

            async def token_payload_builder(
                user: User,
                scopes: List[str],
                session_handle: str,
                user_context: Dict[str, Any],
            ) -> Dict[str, Any]:
                # The tenant is not passed in directly; it is read from the
                # session that the token is being issued for.
                session_info = await get_session_information(
                    session_handle, user_context
                )

                if session_info is None:
                    raise Exception("should never come here")

                return await self._get_roles_and_permissions_for_scopes(
                    tenant_id=session_info.tenant_id,
                    user_id=user.id,
                    scopes=scopes,
                    user_context=user_context,
                )

            OAuth2ProviderRecipe.get_instance().add_access_token_builder_from_other_recipe(
                token_payload_builder
            )
            OAuth2ProviderRecipe.get_instance().add_id_token_builder_from_other_recipe(
                token_payload_builder
            )

            async def user_info_builder(
                user: User,
                _access_token_payload: Dict[str, Any],
                scopes: List[str],
                tenant_id: str,
                user_context: Dict[str, Any],
            ) -> Dict[str, Any]:
                return await self._get_roles_and_permissions_for_scopes(
                    tenant_id=tenant_id,
                    user_id=user.id,
                    scopes=scopes,
                    user_context=user_context,
                )

            OAuth2ProviderRecipe.get_instance().add_user_info_builder_from_other_recipe(
                user_info_builder
            )

        PostSTInitCallbacks.add_post_init_callback(callback)

    async def _get_roles_and_permissions_for_scopes(
        self,
        tenant_id: str,
        user_id: str,
        scopes: List[str],
        user_context: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Collect the ``roles`` / ``permissions`` entries for the given scopes.

        Shared by the token payload builder and the user-info builder.

        Args:
            tenant_id: Tenant to look the user's roles up in.
            user_id: Id of the user whose roles are fetched.
            scopes: Requested scopes; only ``"roles"`` and ``"permissions"``
                are inspected.
            user_context: Passed through to the recipe implementation.

        Returns:
            A dict with keys ``"roles"`` and ``"permissions"``. Each value is
            ``None`` unless the matching scope was requested, in which case it
            is a list of strings. Permissions are de-duplicated and listed in
            the order they are first encountered while walking the user's
            roles.

        Raises:
            Exception: If fetching the permissions of one of the user's roles
                yields ``UnknownRoleError``.
        """
        result: Dict[str, Any] = {"roles": None, "permissions": None}

        wants_roles = "roles" in scopes
        wants_permissions = "permissions" in scopes
        if not (wants_roles or wants_permissions):
            # Nothing requested: skip the roles lookup entirely.
            return result

        # Roles are needed for both scopes, since permissions are resolved
        # per role.
        res = await self.recipe_implementation.get_roles_for_user(
            tenant_id=tenant_id,
            user_id=user_id,
            user_context=user_context,
        )
        user_roles: List[str] = res.roles

        if wants_roles:
            result["roles"] = user_roles

        if wants_permissions:
            # A dict is used as an insertion-ordered set so the emitted list
            # has a stable order; list(set(...)) order is arbitrary and can
            # differ between processes because of string hash randomisation.
            user_permissions: Dict[str, None] = {}
            for role in user_roles:
                role_permissions = (
                    await self.recipe_implementation.get_permissions_for_role(
                        role=role,
                        user_context=user_context,
                    )
                )

                if isinstance(role_permissions, UnknownRoleError):
                    raise Exception("Failed to fetch permissions for the role")

                for perm in role_permissions.permissions:
                    user_permissions[perm] = None

            result["permissions"] = list(user_permissions)

        return result

    def is_error_from_this_recipe_based_on_instance(self, err: Exception) -> bool:
        """Return True when ``err`` is a ``SuperTokensError`` that is also a
        ``SuperTokensUserRolesError``."""
        return isinstance(err, SuperTokensError) and (
            isinstance(err, SuperTokensUserRolesError)
        )

    def get_apis_handled(self) -> List[APIHandled]:
        """Return the APIs handled by this recipe: always an empty list."""
        return []

    async def handle_api_request(
        self,
        request_id: str,
        tenant_id: Optional[str],
        request: BaseRequest,
        path: NormalisedURLPath,
        method: str,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> Union[BaseResponse, None]:
        """Always raises; ``get_apis_handled`` returns no APIs for this recipe.

        Raises:
            Exception: Unconditionally.
        """
        raise Exception("Should never come here")

    async def handle_error(
        self,
        request: BaseRequest,
        err: SuperTokensError,
        response: BaseResponse,
        user_context: Dict[str, Any],
    ) -> BaseResponse:
        """Re-raise ``err`` unchanged; no error is converted to a response.

        Raises:
            SuperTokensError: The ``err`` that was passed in.
        """
        raise err

    def get_all_cors_headers(self) -> List[str]:
        """Return the CORS headers used by this recipe: always an empty list."""
        return []

    @staticmethod
    def init(
        skip_adding_roles_to_access_token: Optional[bool] = None,
        skip_adding_permissions_to_access_token: Optional[bool] = None,
        override: Union[UserRolesOverrideConfig, None] = None,
    ):
        """Create the factory that instantiates this recipe.

        Args:
            skip_adding_roles_to_access_token: Stored in the recipe config.
            skip_adding_permissions_to_access_token: Stored in the recipe
                config.
            override: Optional override configuration stored in the recipe
                config.

        Returns:
            A callable ``func(app_info, plugins)`` that applies the plugins to
            the config, creates the singleton instance and returns it. Calling
            it while an instance already exists raises ``Exception``.
        """
        from supertokens_python.plugins import OverrideMap, apply_plugins

        config = UserRolesConfig(
            skip_adding_roles_to_access_token=skip_adding_roles_to_access_token,
            skip_adding_permissions_to_access_token=skip_adding_permissions_to_access_token,
            override=override,
        )

        def func(app_info: AppInfo, plugins: List[OverrideMap]):
            if UserRolesRecipe.__instance is None:
                UserRolesRecipe.__instance = UserRolesRecipe(
                    recipe_id=UserRolesRecipe.recipe_id,
                    app_info=app_info,
                    config=apply_plugins(
                        recipe_id=UserRolesRecipe.recipe_id,
                        config=config,
                        plugins=plugins,
                    ),
                )
                return UserRolesRecipe.__instance
            raise Exception(
                None,
                "UserRoles recipe has already been initialised. Please check your code for bugs.",
            )

        return func

    @staticmethod
    def reset():
        """Clear the singleton instance.

        Only permitted when the ``SUPERTOKENS_ENV`` environment variable is
        set to ``"testing"``; otherwise ``raise_general_exception`` is called.
        """
        if ("SUPERTOKENS_ENV" not in environ) or (
            environ["SUPERTOKENS_ENV"] != "testing"
        ):
            raise_general_exception("calling testing function in non testing env")
        UserRolesRecipe.__instance = None

    @staticmethod
    def get_instance() -> UserRolesRecipe:
        """Return the singleton instance.

        If the recipe has not been initialised, ``raise_general_exception`` is
        called with an explanatory message.
        """
        if UserRolesRecipe.__instance is not None:
            return UserRolesRecipe.__instance
        raise_general_exception(
            "Initialisation not done. Did you forget to call the SuperTokens.init or UserRoles.init function?"
        )

Ancestors

Class variables

var recipe_id

The type of the None singleton.

Static methods

def get_instance() ‑> UserRolesRecipe
def init(skip_adding_roles_to_access_token: Optional[bool] = None, skip_adding_permissions_to_access_token: Optional[bool] = None, override: Union[BaseOverrideConfig[RecipeInterface, APIInterface], None] = None)
def reset()

Methods

def get_all_cors_headers(self) ‑> List[str]
def get_apis_handled(self) ‑> List[APIHandled]
async def handle_api_request(self, request_id: str, tenant_id: Optional[str], request: BaseRequest, path: NormalisedURLPath, method: str, response: BaseResponse, user_context: Dict[str, Any]) ‑> Optional[BaseResponse]
async def handle_error(self, request: BaseRequest, err: SuperTokensError, response: BaseResponse, user_context: Dict[str, Any]) ‑> BaseResponse
def is_error_from_this_recipe_based_on_instance(self, err: Exception) ‑> bool

Inherited members