Module supertokens_python.recipe.session.interfaces
Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
List,
Optional,
TypeVar,
Union,
)
from typing_extensions import TypedDict
from supertokens_python.async_to_sync_wrapper import sync
from supertokens_python.types import (
APIResponse,
GeneralErrorResponse,
MaybeAwaitable,
RecipeUserId,
)
from ...utils import resolve
from .exceptions import ClaimValidationError
from .utils import SessionConfig, TokenTransferMethod
if TYPE_CHECKING:
from supertokens_python.framework import BaseRequest
from supertokens_python.framework import BaseResponse
class SessionObj:
def __init__(
self,
handle: str,
user_id: str,
recipe_user_id: RecipeUserId,
user_data_in_jwt: Dict[str, Any],
tenant_id: str,
):
self.handle = handle
self.user_id = user_id
self.user_data_in_jwt = user_data_in_jwt
self.tenant_id = tenant_id
self.recipe_user_id = recipe_user_id
def to_json(self) -> Dict[str, Any]:
return {
"handle": self.handle,
"userId": self.user_id,
"recipeUserId": self.recipe_user_id.get_as_string(),
"tenantId": self.tenant_id,
"userDataInJWT": self.user_data_in_jwt,
}
class AccessTokenObj:
def __init__(self, token: str, expiry: int, created_time: int):
self.token = token
self.expiry = expiry
self.created_time = created_time
def to_json(self) -> Dict[str, Any]:
return {
"token": self.token,
"expiry": self.expiry,
"createdTime": self.created_time,
}
class RegenerateAccessTokenOkResult:
def __init__(self, session: SessionObj, access_token: Union[AccessTokenObj, None]):
self.session = session
self.access_token = access_token
def to_json(self) -> Dict[str, Any]:
return {
"session": self.session.to_json(),
"accessToken": (
self.access_token.to_json() if self.access_token is not None else None
),
}
class SessionInformationResult:
def __init__(
self,
session_handle: str,
user_id: str,
recipe_user_id: RecipeUserId,
session_data_in_database: Dict[str, Any],
expiry: int,
custom_claims_in_access_token_payload: Dict[str, Any],
time_created: int,
tenant_id: str,
):
self.session_handle = session_handle
self.user_id = user_id
self.session_data_in_database = session_data_in_database
self.expiry = expiry
self.custom_claims_in_access_token_payload = (
custom_claims_in_access_token_payload
)
self.time_created = time_created
self.tenant_id = tenant_id
self.recipe_user_id = recipe_user_id
def to_json(self) -> Dict[str, Any]:
return {
"sessionHandle": self.session_handle,
"userId": self.user_id,
"recipeUserId": self.recipe_user_id.get_as_string(),
"sessionDataInDatabase": self.session_data_in_database,
"expiry": self.expiry,
"customClaimsInAccessTokenPayload": self.custom_claims_in_access_token_payload,
"timeCreated": self.time_created,
"tenantId": self.tenant_id,
}
class ReqResInfo:
def __init__(
self,
request: BaseRequest,
transfer_method: TokenTransferMethod,
):
self.request = request
self.transfer_method = transfer_method
_T = TypeVar("_T")
JSONObject = Dict[str, Any]
JSONPrimitive = Union[str, int, bool, None, Dict[str, Any]]
JSONPrimitiveList = Union[
List[str], List[int], List[bool], List[None], List[Dict[str, Any]]
]
class SessionDoesNotExistError:
pass
class GetClaimValueOkResult(Generic[_T]):
def __init__(self, value: Optional[_T]):
self.value = value
class ClaimsValidationResult:
def __init__(
self,
invalid_claims: List[ClaimValidationError],
access_token_payload_update: Optional[Dict[str, Any]] = None,
):
self.invalid_claims = invalid_claims
self.access_token_payload_update = access_token_payload_update
def to_json(self) -> Dict[str, Any]:
return {
"status": "OK",
"invalidClaims": [i.to_json() for i in self.invalid_claims],
"accessTokenPayloadUpdate": self.access_token_payload_update,
}
class GetSessionTokensDangerouslyDict(TypedDict):
accessToken: str
accessAndFrontTokenUpdated: bool
refreshToken: Optional[str]
frontToken: str
antiCsrfToken: Optional[str]
class RecipeInterface(ABC): # pylint: disable=too-many-public-methods
def __init__(self):
pass
@abstractmethod
async def create_new_session(
self,
user_id: str,
recipe_user_id: RecipeUserId,
access_token_payload: Optional[Dict[str, Any]],
session_data_in_database: Optional[Dict[str, Any]],
disable_anti_csrf: Optional[bool],
tenant_id: str,
user_context: Dict[str, Any],
) -> SessionContainer:
pass
@abstractmethod
def get_global_claim_validators(
self,
tenant_id: str,
user_id: str,
recipe_user_id: RecipeUserId,
claim_validators_added_by_other_recipes: List[SessionClaimValidator],
user_context: Dict[str, Any],
) -> MaybeAwaitable[List[SessionClaimValidator]]:
pass
@abstractmethod
async def get_session(
self,
access_token: Optional[str],
anti_csrf_token: Optional[str] = None,
anti_csrf_check: Optional[bool] = None,
session_required: Optional[bool] = None,
check_database: Optional[bool] = None,
override_global_claim_validators: Optional[
Callable[
[List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
MaybeAwaitable[List[SessionClaimValidator]],
]
] = None,
user_context: Optional[Dict[str, Any]] = None,
) -> Optional[SessionContainer]:
pass
@abstractmethod
async def validate_claims(
self,
user_id: str,
recipe_user_id: RecipeUserId,
access_token_payload: Dict[str, Any],
claim_validators: List[SessionClaimValidator],
user_context: Dict[str, Any],
) -> ClaimsValidationResult:
pass
@abstractmethod
async def refresh_session(
self,
refresh_token: str,
anti_csrf_token: Optional[str],
disable_anti_csrf: bool,
user_context: Dict[str, Any],
) -> SessionContainer:
pass
@abstractmethod
async def revoke_session(
self, session_handle: str, user_context: Dict[str, Any]
) -> bool:
pass
@abstractmethod
async def revoke_all_sessions_for_user(
self,
user_id: str,
revoke_sessions_for_linked_accounts: bool,
tenant_id: str,
revoke_across_all_tenants: bool,
user_context: Dict[str, Any],
) -> List[str]:
pass
@abstractmethod
async def get_all_session_handles_for_user(
self,
user_id: str,
fetch_sessions_for_linked_accounts: bool,
tenant_id: str,
fetch_across_all_tenants: bool,
user_context: Dict[str, Any],
) -> List[str]:
pass
@abstractmethod
async def revoke_multiple_sessions(
self, session_handles: List[str], user_context: Dict[str, Any]
) -> List[str]:
pass
@abstractmethod
async def get_session_information(
self, session_handle: str, user_context: Dict[str, Any]
) -> Union[SessionInformationResult, None]:
pass
@abstractmethod
async def update_session_data_in_database(
self,
session_handle: str,
new_session_data: Dict[str, Any],
user_context: Dict[str, Any],
) -> bool:
pass
@abstractmethod
async def merge_into_access_token_payload(
self,
session_handle: str,
access_token_payload_update: JSONObject,
user_context: Dict[str, Any],
) -> bool:
pass
@abstractmethod
async def fetch_and_set_claim(
self,
session_handle: str,
claim: SessionClaim[Any],
user_context: Dict[str, Any],
) -> bool:
pass
@abstractmethod
async def set_claim_value(
self,
session_handle: str,
claim: SessionClaim[_T],
value: _T,
user_context: Dict[str, Any],
) -> bool:
pass
@abstractmethod
async def get_claim_value(
self,
session_handle: str,
claim: SessionClaim[Any],
user_context: Dict[str, Any],
) -> Union[SessionDoesNotExistError, GetClaimValueOkResult[Any]]:
pass
@abstractmethod
async def remove_claim(
self,
session_handle: str,
claim: SessionClaim[Any],
user_context: Dict[str, Any],
) -> bool:
pass
@abstractmethod
async def regenerate_access_token(
self,
access_token: str,
new_access_token_payload: Union[Dict[str, Any], None],
user_context: Dict[str, Any],
) -> Union[RegenerateAccessTokenOkResult, None]:
pass
class SignOutOkayResponse(APIResponse):
def __init__(self):
self.status = "OK"
def to_json(self) -> Dict[str, Any]:
return {"status": self.status}
class APIOptions:
def __init__(
self,
request: BaseRequest,
response: Optional[BaseResponse],
recipe_id: str,
config: SessionConfig,
recipe_implementation: RecipeInterface,
):
self.request = request
self.response = response
self.recipe_id = recipe_id
self.config = config
self.recipe_implementation = recipe_implementation
class APIInterface(ABC):
def __init__(self):
self.disable_refresh_post = False
self.disable_signout_post = False
# We do not add a GeneralErrorResponse response to this API
# since it's not something that is directly called by the user on the
# frontend anyway
@abstractmethod
async def refresh_post(
self, api_options: APIOptions, user_context: Dict[str, Any]
) -> SessionContainer:
pass
@abstractmethod
async def signout_post(
self,
session: SessionContainer,
api_options: APIOptions,
user_context: Dict[str, Any],
) -> Union[SignOutOkayResponse, GeneralErrorResponse]:
pass
@abstractmethod
async def verify_session(
self,
api_options: APIOptions,
anti_csrf_check: Union[bool, None],
session_required: bool,
check_database: bool,
override_global_claim_validators: Optional[
Callable[
[List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
MaybeAwaitable[List[SessionClaimValidator]],
]
],
user_context: Dict[str, Any],
) -> Union[SessionContainer, None]:
pass
ResponseMutator = Callable[[BaseResponse, Dict[str, Any]], None]
class TokenInfo:
def __init__(self, token: str, expiry: int, created_time: int):
self.token = token
self.expiry = expiry
self.created_time = created_time
def to_json(self) -> Dict[str, Any]:
return {
"token": self.token,
"expiry": self.expiry,
"createdTime": self.created_time,
}
class SessionContainer(ABC): # pylint: disable=too-many-public-methods
def __init__(
self,
recipe_implementation: RecipeInterface,
config: SessionConfig,
access_token: str,
front_token: str,
refresh_token: Optional[TokenInfo],
anti_csrf_token: Optional[str],
session_handle: str,
user_id: str,
recipe_user_id: RecipeUserId,
user_data_in_access_token: Optional[Dict[str, Any]],
req_res_info: Optional[ReqResInfo],
access_token_updated: bool,
tenant_id: str,
):
self.recipe_implementation = recipe_implementation
self.config = config
self.access_token = access_token
self.front_token = front_token
self.refresh_token = refresh_token
self.anti_csrf_token = anti_csrf_token
self.session_handle = session_handle
self.user_id = user_id
self.user_data_in_access_token = user_data_in_access_token
self.req_res_info: Optional[ReqResInfo] = req_res_info
self.access_token_updated = access_token_updated
self.tenant_id = tenant_id
self.recipe_user_id = recipe_user_id
self.response_mutators: List[ResponseMutator] = []
@abstractmethod
async def revoke_session(
self, user_context: Optional[Dict[str, Any]] = None
) -> None:
pass
@abstractmethod
async def get_session_data_from_database(
self, user_context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
pass
@abstractmethod
async def update_session_data_in_database(
self,
new_session_data: Dict[str, Any],
user_context: Optional[Dict[str, Any]] = None,
) -> None:
pass
@abstractmethod
async def attach_to_request_response(
self,
request: BaseRequest,
transfer_method: TokenTransferMethod,
user_context: Dict[str, Any],
):
pass
@abstractmethod
async def merge_into_access_token_payload(
self,
access_token_payload_update: JSONObject,
user_context: Optional[Dict[str, Any]] = None,
) -> None:
pass
@abstractmethod
def get_user_id(self, user_context: Optional[Dict[str, Any]] = None) -> str:
pass
@abstractmethod
def get_recipe_user_id(
self, user_context: Optional[Dict[str, Any]] = None
) -> RecipeUserId:
pass
@abstractmethod
def get_tenant_id(self, user_context: Optional[Dict[str, Any]] = None) -> str:
pass
@abstractmethod
def get_access_token_payload(
self, user_context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
pass
@abstractmethod
def get_handle(self, user_context: Optional[Dict[str, Any]] = None) -> str:
pass
@abstractmethod
def get_all_session_tokens_dangerously(self) -> GetSessionTokensDangerouslyDict:
pass
@abstractmethod
def get_access_token(self, user_context: Optional[Dict[str, Any]] = None) -> str:
pass
@abstractmethod
async def get_time_created(
self, user_context: Optional[Dict[str, Any]] = None
) -> int:
pass
@abstractmethod
async def get_expiry(self, user_context: Optional[Dict[str, Any]] = None) -> int:
pass
@abstractmethod
async def assert_claims(
self,
claim_validators: List[SessionClaimValidator],
user_context: Optional[Dict[str, Any]] = None,
) -> None:
pass
@abstractmethod
async def fetch_and_set_claim(
self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None
) -> None:
pass
@abstractmethod
async def set_claim_value(
self,
claim: SessionClaim[_T],
value: _T,
user_context: Optional[Dict[str, Any]] = None,
) -> None:
pass
@abstractmethod
async def get_claim_value(
self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None
) -> Union[_T, None]:
pass
@abstractmethod
async def remove_claim(
self,
claim: SessionClaim[Any],
user_context: Optional[Dict[str, Any]] = None,
) -> None:
pass
def sync_get_expiry(self, user_context: Optional[Dict[str, Any]] = None) -> int:
return sync(self.get_expiry(user_context))
def sync_revoke_session(
self, user_context: Optional[Dict[str, Any]] = None
) -> None:
return sync(self.revoke_session(user_context=user_context))
def sync_get_session_data_from_database(
self, user_context: Union[Dict[str, Any], None] = None
) -> Dict[str, Any]:
return sync(self.get_session_data_from_database(user_context))
def sync_get_time_created(
self, user_context: Optional[Dict[str, Any]] = None
) -> int:
return sync(self.get_time_created(user_context))
def sync_merge_into_access_token_payload(
self,
access_token_payload_update: Dict[str, Any],
user_context: Optional[Dict[str, Any]] = None,
) -> None:
return sync(
self.merge_into_access_token_payload(
access_token_payload_update, user_context
)
)
def sync_update_session_data_in_database(
self,
new_session_data: Dict[str, Any],
user_context: Optional[Dict[str, Any]] = None,
) -> None:
return sync(
self.update_session_data_in_database(new_session_data, user_context)
)
# Session claims sync functions:
def sync_assert_claims(
self,
claim_validators: List[SessionClaimValidator],
user_context: Optional[Dict[str, Any]] = None,
) -> None:
return sync(self.assert_claims(claim_validators, user_context))
def sync_fetch_and_set_claim(
self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None
) -> None:
return sync(self.fetch_and_set_claim(claim, user_context))
def sync_set_claim_value(
self,
claim: SessionClaim[_T],
value: _T,
user_context: Optional[Dict[str, Any]] = None,
) -> None:
return sync(self.set_claim_value(claim, value, user_context))
def sync_get_claim_value(
self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None
) -> Union[_T, None]:
return sync(self.get_claim_value(claim, user_context))
def sync_remove_claim(
self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None
) -> None:
return sync(self.remove_claim(claim, user_context))
def sync_attach_to_request_response(
self,
request: BaseRequest,
transfer_method: TokenTransferMethod,
user_context: Dict[str, Any],
) -> None:
return sync(
self.attach_to_request_response(request, transfer_method, user_context)
)
# This is there so that we can do session["..."] to access some of the members of this class
def __getitem__(self, item: str):
return getattr(self, item)
class SessionClaim(ABC, Generic[_T]):
def __init__(
self,
key: str,
fetch_value: Callable[
[str, RecipeUserId, str, Dict[str, Any], Dict[str, Any]],
MaybeAwaitable[Optional[_T]],
],
) -> None:
"""
Args:
key: The key to use when storing the claim in the payload.
fetch_value: a method that fetches the current value of this claim for the user.
A None return value signifies that we don't want to update the claim payload and or the claim value is
not present in the database. For example, this can happen with a second factor auth claim, where we
don't want to add the claim to the session automatically
"""
self.key = key
self.fetch_value = fetch_value
@abstractmethod
def add_to_payload_(
self,
payload: JSONObject,
value: _T,
user_context: Union[Dict[str, Any], None] = None,
) -> JSONObject:
"""Saves the provided value into the payload, by cloning and updating the entire object"""
@abstractmethod
def remove_from_payload_by_merge_(
self, payload: JSONObject, user_context: Optional[Dict[str, Any]] = None
) -> JSONObject:
"""Removes the claim from the payload by setting it to None, so merge_into_access_token_payload can clear it"""
@abstractmethod
def remove_from_payload(
self, payload: JSONObject, user_context: Optional[Dict[str, Any]] = None
) -> JSONObject:
"""Removes the claim from the payload, by cloning and updating the entire object."""
@abstractmethod
def get_value_from_payload(
self, payload: JSONObject, user_context: Union[Dict[str, Any], None] = None
) -> Union[_T, None]:
"""Gets the value of the claim stored in the payload"""
async def build(
self,
user_id: str,
recipe_user_id: RecipeUserId,
tenant_id: str,
current_payload: Dict[str, Any],
user_context: Dict[str, Any],
) -> JSONObject:
value = await resolve(
self.fetch_value(
user_id, recipe_user_id, tenant_id, current_payload, user_context
)
)
if value is None:
return {}
return self.add_to_payload_({}, value, user_context)
class ClaimValidationResult:
def __init__(self, is_valid: bool, reason: Optional[Dict[str, Any]] = None):
self.is_valid = is_valid
self.reason = {} if is_valid else reason
class SessionClaimValidator(ABC):
def __init__(
self,
id_: str,
) -> None:
self.id = id_
self.claim: Optional[SessionClaim[Any]] = None
@abstractmethod
async def validate(
self, payload: JSONObject, user_context: Dict[str, Any]
) -> ClaimValidationResult:
pass
def should_refetch(
self, payload: JSONObject, user_context: Dict[str, Any]
) -> MaybeAwaitable[bool]:
raise NotImplementedError()
Classes
class APIInterface
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class APIInterface(ABC): def __init__(self): self.disable_refresh_post = False self.disable_signout_post = False # We do not add a GeneralErrorResponse response to this API # since it's not something that is directly called by the user on the # frontend anyway @abstractmethod async def refresh_post( self, api_options: APIOptions, user_context: Dict[str, Any] ) -> SessionContainer: pass @abstractmethod async def signout_post( self, session: SessionContainer, api_options: APIOptions, user_context: Dict[str, Any], ) -> Union[SignOutOkayResponse, GeneralErrorResponse]: pass @abstractmethod async def verify_session( self, api_options: APIOptions, anti_csrf_check: Union[bool, None], session_required: bool, check_database: bool, override_global_claim_validators: Optional[ Callable[ [List[SessionClaimValidator], SessionContainer, Dict[str, Any]], MaybeAwaitable[List[SessionClaimValidator]], ] ], user_context: Dict[str, Any], ) -> Union[SessionContainer, None]: pass
Ancestors
- abc.ABC
Subclasses
Methods
async def refresh_post(self, api_options: APIOptions, user_context: Dict[str, Any]) ‑> SessionContainer
async def signout_post(self, session: SessionContainer, api_options: APIOptions, user_context: Dict[str, Any]) ‑> Union[SignOutOkayResponse, GeneralErrorResponse]
async def verify_session(self, api_options: APIOptions, anti_csrf_check: Union[bool, None], session_required: bool, check_database: bool, override_global_claim_validators: Optional[Callable[[List[SessionClaimValidator], SessionContainer, Dict[str, Any]], MaybeAwaitable[List[SessionClaimValidator]]]], user_context: Dict[str, Any]) ‑> Optional[SessionContainer]
class APIOptions (request: BaseRequest, response: Optional[BaseResponse], recipe_id: str, config: SessionConfig, recipe_implementation: RecipeInterface)
-
Expand source code
class APIOptions: def __init__( self, request: BaseRequest, response: Optional[BaseResponse], recipe_id: str, config: SessionConfig, recipe_implementation: RecipeInterface, ): self.request = request self.response = response self.recipe_id = recipe_id self.config = config self.recipe_implementation = recipe_implementation
class AccessTokenObj (token: str, expiry: int, created_time: int)
-
Expand source code
class AccessTokenObj: def __init__(self, token: str, expiry: int, created_time: int): self.token = token self.expiry = expiry self.created_time = created_time def to_json(self) -> Dict[str, Any]: return { "token": self.token, "expiry": self.expiry, "createdTime": self.created_time, }
Methods
def to_json(self) ‑> Dict[str, Any]
class ClaimValidationResult (is_valid: bool, reason: Optional[Dict[str, Any]] = None)
-
Expand source code
class ClaimValidationResult: def __init__(self, is_valid: bool, reason: Optional[Dict[str, Any]] = None): self.is_valid = is_valid self.reason = {} if is_valid else reason
class ClaimsValidationResult (invalid_claims: List[ClaimValidationError], access_token_payload_update: Optional[Dict[str, Any]] = None)
-
Expand source code
class ClaimsValidationResult: def __init__( self, invalid_claims: List[ClaimValidationError], access_token_payload_update: Optional[Dict[str, Any]] = None, ): self.invalid_claims = invalid_claims self.access_token_payload_update = access_token_payload_update def to_json(self) -> Dict[str, Any]: return { "status": "OK", "invalidClaims": [i.to_json() for i in self.invalid_claims], "accessTokenPayloadUpdate": self.access_token_payload_update, }
Methods
def to_json(self) ‑> Dict[str, Any]
class GetClaimValueOkResult (value: Optional[_T])
-
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def getitem(self, key: KT) -> VT: … # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Expand source code
class GetClaimValueOkResult(Generic[_T]): def __init__(self, value: Optional[_T]): self.value = value
Ancestors
- typing.Generic
class GetSessionTokensDangerouslyDict (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Expand source code
class GetSessionTokensDangerouslyDict(TypedDict): accessToken: str accessAndFrontTokenUpdated: bool refreshToken: Optional[str] frontToken: str antiCsrfToken: Optional[str]
Ancestors
- builtins.dict
Class variables
var accessAndFrontTokenUpdated : bool
var accessToken : str
var antiCsrfToken : Optional[str]
var frontToken : str
var refreshToken : Optional[str]
class RecipeInterface
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class RecipeInterface(ABC): # pylint: disable=too-many-public-methods def __init__(self): pass @abstractmethod async def create_new_session( self, user_id: str, recipe_user_id: RecipeUserId, access_token_payload: Optional[Dict[str, Any]], session_data_in_database: Optional[Dict[str, Any]], disable_anti_csrf: Optional[bool], tenant_id: str, user_context: Dict[str, Any], ) -> SessionContainer: pass @abstractmethod def get_global_claim_validators( self, tenant_id: str, user_id: str, recipe_user_id: RecipeUserId, claim_validators_added_by_other_recipes: List[SessionClaimValidator], user_context: Dict[str, Any], ) -> MaybeAwaitable[List[SessionClaimValidator]]: pass @abstractmethod async def get_session( self, access_token: Optional[str], anti_csrf_token: Optional[str] = None, anti_csrf_check: Optional[bool] = None, session_required: Optional[bool] = None, check_database: Optional[bool] = None, override_global_claim_validators: Optional[ Callable[ [List[SessionClaimValidator], SessionContainer, Dict[str, Any]], MaybeAwaitable[List[SessionClaimValidator]], ] ] = None, user_context: Optional[Dict[str, Any]] = None, ) -> Optional[SessionContainer]: pass @abstractmethod async def validate_claims( self, user_id: str, recipe_user_id: RecipeUserId, access_token_payload: Dict[str, Any], claim_validators: List[SessionClaimValidator], user_context: Dict[str, Any], ) -> ClaimsValidationResult: pass @abstractmethod async def refresh_session( self, refresh_token: str, anti_csrf_token: Optional[str], disable_anti_csrf: bool, user_context: Dict[str, Any], ) -> SessionContainer: pass @abstractmethod async def revoke_session( self, session_handle: str, user_context: Dict[str, Any] ) -> bool: pass @abstractmethod async def revoke_all_sessions_for_user( self, user_id: str, revoke_sessions_for_linked_accounts: bool, tenant_id: str, revoke_across_all_tenants: bool, user_context: Dict[str, Any], ) -> List[str]: pass @abstractmethod async def get_all_session_handles_for_user( self, user_id: str, fetch_sessions_for_linked_accounts: bool, tenant_id: str, fetch_across_all_tenants: bool, user_context: Dict[str, Any], ) -> List[str]: pass @abstractmethod async def revoke_multiple_sessions( self, session_handles: List[str], user_context: Dict[str, Any] ) -> List[str]: pass @abstractmethod async def get_session_information( self, session_handle: str, user_context: Dict[str, Any] ) -> Union[SessionInformationResult, None]: pass @abstractmethod async def update_session_data_in_database( self, session_handle: str, new_session_data: Dict[str, Any], user_context: Dict[str, Any], ) -> bool: pass @abstractmethod async def merge_into_access_token_payload( self, session_handle: str, access_token_payload_update: JSONObject, user_context: Dict[str, Any], ) -> bool: pass @abstractmethod async def fetch_and_set_claim( self, session_handle: str, claim: SessionClaim[Any], user_context: Dict[str, Any], ) -> bool: pass @abstractmethod async def set_claim_value( self, session_handle: str, claim: SessionClaim[_T], value: _T, user_context: Dict[str, Any], ) -> bool: pass @abstractmethod async def get_claim_value( self, session_handle: str, claim: SessionClaim[Any], user_context: Dict[str, Any], ) -> Union[SessionDoesNotExistError, GetClaimValueOkResult[Any]]: pass @abstractmethod async def remove_claim( self, session_handle: str, claim: SessionClaim[Any], user_context: Dict[str, Any], ) -> bool: pass @abstractmethod async def regenerate_access_token( self, access_token: str, new_access_token_payload: Union[Dict[str, Any], None], user_context: Dict[str, Any], ) -> Union[RegenerateAccessTokenOkResult, None]: pass
Ancestors
- abc.ABC
Subclasses
Methods
async def create_new_session(self, user_id: str, recipe_user_id: RecipeUserId, access_token_payload: Optional[Dict[str, Any]], session_data_in_database: Optional[Dict[str, Any]], disable_anti_csrf: Optional[bool], tenant_id: str, user_context: Dict[str, Any]) ‑> SessionContainer
async def fetch_and_set_claim(self, session_handle: str, claim: SessionClaim[Any], user_context: Dict[str, Any]) ‑> bool
async def get_all_session_handles_for_user(self, user_id: str, fetch_sessions_for_linked_accounts: bool, tenant_id: str, fetch_across_all_tenants: bool, user_context: Dict[str, Any]) ‑> List[str]
async def get_claim_value(self, session_handle: str, claim: SessionClaim[Any], user_context: Dict[str, Any]) ‑> Union[SessionDoesNotExistError, GetClaimValueOkResult[Any]]
def get_global_claim_validators(self, tenant_id: str, user_id: str, recipe_user_id: RecipeUserId, claim_validators_added_by_other_recipes: List[SessionClaimValidator], user_context: Dict[str, Any]) ‑> Union[Awaitable[List[SessionClaimValidator]], List[SessionClaimValidator]]
async def get_session(self, access_token: Optional[str], anti_csrf_token: Optional[str] = None, anti_csrf_check: Optional[bool] = None, session_required: Optional[bool] = None, check_database: Optional[bool] = None, override_global_claim_validators: Optional[Callable[[List[SessionClaimValidator], SessionContainer, Dict[str, Any]], MaybeAwaitable[List[SessionClaimValidator]]]] = None, user_context: Optional[Dict[str, Any]] = None) ‑> Optional[SessionContainer]
async def get_session_information(self, session_handle: str, user_context: Dict[str, Any]) ‑> Optional[SessionInformationResult]
async def merge_into_access_token_payload(self, session_handle: str, access_token_payload_update: JSONObject, user_context: Dict[str, Any]) ‑> bool
async def refresh_session(self, refresh_token: str, anti_csrf_token: Optional[str], disable_anti_csrf: bool, user_context: Dict[str, Any]) ‑> SessionContainer
async def regenerate_access_token(self, access_token: str, new_access_token_payload: Union[Dict[str, Any], None], user_context: Dict[str, Any]) ‑> Optional[RegenerateAccessTokenOkResult]
async def remove_claim(self, session_handle: str, claim: SessionClaim[Any], user_context: Dict[str, Any]) ‑> bool
async def revoke_all_sessions_for_user(self, user_id: str, revoke_sessions_for_linked_accounts: bool, tenant_id: str, revoke_across_all_tenants: bool, user_context: Dict[str, Any]) ‑> List[str]
async def revoke_multiple_sessions(self, session_handles: List[str], user_context: Dict[str, Any]) ‑> List[str]
async def revoke_session(self, session_handle: str, user_context: Dict[str, Any]) ‑> bool
async def set_claim_value(self, session_handle: str, claim: SessionClaim[_T], value: _T, user_context: Dict[str, Any]) ‑> bool
async def update_session_data_in_database(self, session_handle: str, new_session_data: Dict[str, Any], user_context: Dict[str, Any]) ‑> bool
async def validate_claims(self, user_id: str, recipe_user_id: RecipeUserId, access_token_payload: Dict[str, Any], claim_validators: List[SessionClaimValidator], user_context: Dict[str, Any]) ‑> ClaimsValidationResult
class RegenerateAccessTokenOkResult (session: SessionObj, access_token: Union[AccessTokenObj, None])
-
Expand source code
class RegenerateAccessTokenOkResult: def __init__(self, session: SessionObj, access_token: Union[AccessTokenObj, None]): self.session = session self.access_token = access_token def to_json(self) -> Dict[str, Any]: return { "session": self.session.to_json(), "accessToken": ( self.access_token.to_json() if self.access_token is not None else None ), }
Methods
def to_json(self) ‑> Dict[str, Any]
class ReqResInfo (request: BaseRequest, transfer_method: TokenTransferMethod)
-
Expand source code
class ReqResInfo: def __init__( self, request: BaseRequest, transfer_method: TokenTransferMethod, ): self.request = request self.transfer_method = transfer_method
class SessionClaim (key: str, fetch_value: Callable[[str, RecipeUserId, str, Dict[str, Any], Dict[str, Any]], MaybeAwaitable[Optional[_T]]])
-
Helper class that provides a standard way to create an ABC using inheritance.
Args
key
- The key to use when storing the claim in the payload.
fetch_value
- a method that fetches the current value of this claim for the user. A None return value signifies that we don't want to update the claim payload and or the claim value is not present in the database. For example, this can happen with a second factor auth claim, where we don't want to add the claim to the session automatically
Expand source code
class SessionClaim(ABC, Generic[_T]): def __init__( self, key: str, fetch_value: Callable[ [str, RecipeUserId, str, Dict[str, Any], Dict[str, Any]], MaybeAwaitable[Optional[_T]], ], ) -> None: """ Args: key: The key to use when storing the claim in the payload. fetch_value: a method that fetches the current value of this claim for the user. A None return value signifies that we don't want to update the claim payload and or the claim value is not present in the database. For example, this can happen with a second factor auth claim, where we don't want to add the claim to the session automatically """ self.key = key self.fetch_value = fetch_value @abstractmethod def add_to_payload_( self, payload: JSONObject, value: _T, user_context: Union[Dict[str, Any], None] = None, ) -> JSONObject: """Saves the provided value into the payload, by cloning and updating the entire object""" @abstractmethod def remove_from_payload_by_merge_( self, payload: JSONObject, user_context: Optional[Dict[str, Any]] = None ) -> JSONObject: """Removes the claim from the payload by setting it to None, so merge_into_access_token_payload can clear it""" @abstractmethod def remove_from_payload( self, payload: JSONObject, user_context: Optional[Dict[str, Any]] = None ) -> JSONObject: """Removes the claim from the payload, by cloning and updating the entire object.""" @abstractmethod def get_value_from_payload( self, payload: JSONObject, user_context: Union[Dict[str, Any], None] = None ) -> Union[_T, None]: """Gets the value of the claim stored in the payload""" async def build( self, user_id: str, recipe_user_id: RecipeUserId, tenant_id: str, current_payload: Dict[str, Any], user_context: Dict[str, Any], ) -> JSONObject: value = await resolve( self.fetch_value( user_id, recipe_user_id, tenant_id, current_payload, user_context ) ) if value is None: return {} return self.add_to_payload_({}, value, user_context)
Ancestors
- abc.ABC
- typing.Generic
Subclasses
Methods
def add_to_payload_(self, payload: JSONObject, value: _T, user_context: Union[Dict[str, Any], None] = None) ‑> Dict[str, Any]
-
Saves the provided value into the payload, by cloning and updating the entire object
async def build(self, user_id: str, recipe_user_id: RecipeUserId, tenant_id: str, current_payload: Dict[str, Any], user_context: Dict[str, Any]) ‑> Dict[str, Any]
def get_value_from_payload(self, payload: JSONObject, user_context: Union[Dict[str, Any], None] = None) ‑> Optional[~_T]
-
Gets the value of the claim stored in the payload
def remove_from_payload(self, payload: JSONObject, user_context: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]
-
Removes the claim from the payload, by cloning and updating the entire object.
def remove_from_payload_by_merge_(self, payload: JSONObject, user_context: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]
-
Removes the claim from the payload by setting it to None, so merge_into_access_token_payload can clear it
class SessionClaimValidator (id_: str)
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class SessionClaimValidator(ABC): def __init__( self, id_: str, ) -> None: self.id = id_ self.claim: Optional[SessionClaim[Any]] = None @abstractmethod async def validate( self, payload: JSONObject, user_context: Dict[str, Any] ) -> ClaimValidationResult: pass def should_refetch( self, payload: JSONObject, user_context: Dict[str, Any] ) -> MaybeAwaitable[bool]: raise NotImplementedError()
Ancestors
- abc.ABC
Subclasses
- IsVerifiedSCV
- HasCompletedMFARequirementsForAuthSCV
- HasCompletedRequirementListSCV
- Validator
- SCVMixin
- HasValueSCV
Methods
def should_refetch(self, payload: JSONObject, user_context: Dict[str, Any]) ‑> Union[Awaitable[bool], bool]
async def validate(self, payload: JSONObject, user_context: Dict[str, Any]) ‑> ClaimValidationResult
class SessionContainer (recipe_implementation: RecipeInterface, config: SessionConfig, access_token: str, front_token: str, refresh_token: Optional[TokenInfo], anti_csrf_token: Optional[str], session_handle: str, user_id: str, recipe_user_id: RecipeUserId, user_data_in_access_token: Optional[Dict[str, Any]], req_res_info: Optional[ReqResInfo], access_token_updated: bool, tenant_id: str)
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class SessionContainer(ABC): # pylint: disable=too-many-public-methods def __init__( self, recipe_implementation: RecipeInterface, config: SessionConfig, access_token: str, front_token: str, refresh_token: Optional[TokenInfo], anti_csrf_token: Optional[str], session_handle: str, user_id: str, recipe_user_id: RecipeUserId, user_data_in_access_token: Optional[Dict[str, Any]], req_res_info: Optional[ReqResInfo], access_token_updated: bool, tenant_id: str, ): self.recipe_implementation = recipe_implementation self.config = config self.access_token = access_token self.front_token = front_token self.refresh_token = refresh_token self.anti_csrf_token = anti_csrf_token self.session_handle = session_handle self.user_id = user_id self.user_data_in_access_token = user_data_in_access_token self.req_res_info: Optional[ReqResInfo] = req_res_info self.access_token_updated = access_token_updated self.tenant_id = tenant_id self.recipe_user_id = recipe_user_id self.response_mutators: List[ResponseMutator] = [] @abstractmethod async def revoke_session( self, user_context: Optional[Dict[str, Any]] = None ) -> None: pass @abstractmethod async def get_session_data_from_database( self, user_context: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: pass @abstractmethod async def update_session_data_in_database( self, new_session_data: Dict[str, Any], user_context: Optional[Dict[str, Any]] = None, ) -> None: pass @abstractmethod async def attach_to_request_response( self, request: BaseRequest, transfer_method: TokenTransferMethod, user_context: Dict[str, Any], ): pass @abstractmethod async def merge_into_access_token_payload( self, access_token_payload_update: JSONObject, user_context: Optional[Dict[str, Any]] = None, ) -> None: pass @abstractmethod def get_user_id(self, user_context: Optional[Dict[str, Any]] = None) -> str: pass @abstractmethod def get_recipe_user_id( self, user_context: Optional[Dict[str, Any]] = None ) -> RecipeUserId: pass @abstractmethod def get_tenant_id(self, user_context: Optional[Dict[str, Any]] = None) -> str: pass @abstractmethod def get_access_token_payload( self, user_context: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: pass @abstractmethod def get_handle(self, user_context: Optional[Dict[str, Any]] = None) -> str: pass @abstractmethod def get_all_session_tokens_dangerously(self) -> GetSessionTokensDangerouslyDict: pass @abstractmethod def get_access_token(self, user_context: Optional[Dict[str, Any]] = None) -> str: pass @abstractmethod async def get_time_created( self, user_context: Optional[Dict[str, Any]] = None ) -> int: pass @abstractmethod async def get_expiry(self, user_context: Optional[Dict[str, Any]] = None) -> int: pass @abstractmethod async def assert_claims( self, claim_validators: List[SessionClaimValidator], user_context: Optional[Dict[str, Any]] = None, ) -> None: pass @abstractmethod async def fetch_and_set_claim( self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None ) -> None: pass @abstractmethod async def set_claim_value( self, claim: SessionClaim[_T], value: _T, user_context: Optional[Dict[str, Any]] = None, ) -> None: pass @abstractmethod async def get_claim_value( self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None ) -> Union[_T, None]: pass @abstractmethod async def remove_claim( self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None, ) -> None: pass def sync_get_expiry(self, user_context: Optional[Dict[str, Any]] = None) -> int: return sync(self.get_expiry(user_context)) def sync_revoke_session( self, user_context: Optional[Dict[str, Any]] = None ) -> None: return sync(self.revoke_session(user_context=user_context)) def sync_get_session_data_from_database( self, user_context: Union[Dict[str, Any], None] = None ) -> Dict[str, Any]: return sync(self.get_session_data_from_database(user_context)) def sync_get_time_created( self, user_context: Optional[Dict[str, Any]] = None ) -> int: return sync(self.get_time_created(user_context)) def sync_merge_into_access_token_payload( self, access_token_payload_update: Dict[str, Any], user_context: Optional[Dict[str, Any]] = None, ) -> None: return sync( self.merge_into_access_token_payload( access_token_payload_update, user_context ) ) def sync_update_session_data_in_database( self, new_session_data: Dict[str, Any], user_context: Optional[Dict[str, Any]] = None, ) -> None: return sync( self.update_session_data_in_database(new_session_data, user_context) ) # Session claims sync functions: def sync_assert_claims( self, claim_validators: List[SessionClaimValidator], user_context: Optional[Dict[str, Any]] = None, ) -> None: return sync(self.assert_claims(claim_validators, user_context)) def sync_fetch_and_set_claim( self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None ) -> None: return sync(self.fetch_and_set_claim(claim, user_context)) def sync_set_claim_value( self, claim: SessionClaim[_T], value: _T, user_context: Optional[Dict[str, Any]] = None, ) -> None: return sync(self.set_claim_value(claim, value, user_context)) def sync_get_claim_value( self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None ) -> Union[_T, None]: return sync(self.get_claim_value(claim, user_context)) def sync_remove_claim( self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None ) -> None: return sync(self.remove_claim(claim, user_context)) def sync_attach_to_request_response( self, request: BaseRequest, transfer_method: TokenTransferMethod, user_context: Dict[str, Any], ) -> None: return sync( self.attach_to_request_response(request, transfer_method, user_context) ) # This is there so that we can do session["..."] to access some of the members of this class def __getitem__(self, item: str): return getattr(self, item)
Ancestors
- abc.ABC
Subclasses
Methods
async def assert_claims(self, claim_validators: List[SessionClaimValidator], user_context: Optional[Dict[str, Any]] = None) ‑> None
async def attach_to_request_response(self, request: BaseRequest, transfer_method: TokenTransferMethod, user_context: Dict[str, Any])
async def fetch_and_set_claim(self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
def get_access_token(self, user_context: Optional[Dict[str, Any]] = None) ‑> str
def get_access_token_payload(self, user_context: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]
def get_all_session_tokens_dangerously(self) ‑> GetSessionTokensDangerouslyDict
async def get_claim_value(self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None) ‑> Optional[~_T]
async def get_expiry(self, user_context: Optional[Dict[str, Any]] = None) ‑> int
def get_handle(self, user_context: Optional[Dict[str, Any]] = None) ‑> str
def get_recipe_user_id(self, user_context: Optional[Dict[str, Any]] = None) ‑> RecipeUserId
async def get_session_data_from_database(self, user_context: Optional[Dict[str, Any]] = None) ‑> Dict[str, Any]
def get_tenant_id(self, user_context: Optional[Dict[str, Any]] = None) ‑> str
async def get_time_created(self, user_context: Optional[Dict[str, Any]] = None) ‑> int
def get_user_id(self, user_context: Optional[Dict[str, Any]] = None) ‑> str
async def merge_into_access_token_payload(self, access_token_payload_update: JSONObject, user_context: Optional[Dict[str, Any]] = None) ‑> None
async def remove_claim(self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
async def revoke_session(self, user_context: Optional[Dict[str, Any]] = None) ‑> None
async def set_claim_value(self, claim: SessionClaim[_T], value: _T, user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_assert_claims(self, claim_validators: List[SessionClaimValidator], user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_attach_to_request_response(self, request: BaseRequest, transfer_method: TokenTransferMethod, user_context: Dict[str, Any])
def sync_fetch_and_set_claim(self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_get_claim_value(self, claim: SessionClaim[_T], user_context: Optional[Dict[str, Any]] = None) ‑> Optional[~_T]
def sync_get_expiry(self, user_context: Optional[Dict[str, Any]] = None) ‑> int
def sync_get_session_data_from_database(self, user_context: Union[Dict[str, Any], None] = None) ‑> Dict[str, Any]
def sync_get_time_created(self, user_context: Optional[Dict[str, Any]] = None) ‑> int
def sync_merge_into_access_token_payload(self, access_token_payload_update: Dict[str, Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_remove_claim(self, claim: SessionClaim[Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_revoke_session(self, user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_set_claim_value(self, claim: SessionClaim[_T], value: _T, user_context: Optional[Dict[str, Any]] = None) ‑> None
def sync_update_session_data_in_database(self, new_session_data: Dict[str, Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
async def update_session_data_in_database(self, new_session_data: Dict[str, Any], user_context: Optional[Dict[str, Any]] = None) ‑> None
class SessionDoesNotExistError
-
Expand source code
class SessionDoesNotExistError: pass
class SessionInformationResult (session_handle: str, user_id: str, recipe_user_id: RecipeUserId, session_data_in_database: Dict[str, Any], expiry: int, custom_claims_in_access_token_payload: Dict[str, Any], time_created: int, tenant_id: str)
-
Expand source code
class SessionInformationResult: def __init__( self, session_handle: str, user_id: str, recipe_user_id: RecipeUserId, session_data_in_database: Dict[str, Any], expiry: int, custom_claims_in_access_token_payload: Dict[str, Any], time_created: int, tenant_id: str, ): self.session_handle = session_handle self.user_id = user_id self.session_data_in_database = session_data_in_database self.expiry = expiry self.custom_claims_in_access_token_payload = ( custom_claims_in_access_token_payload ) self.time_created = time_created self.tenant_id = tenant_id self.recipe_user_id = recipe_user_id def to_json(self) -> Dict[str, Any]: return { "sessionHandle": self.session_handle, "userId": self.user_id, "recipeUserId": self.recipe_user_id.get_as_string(), "sessionDataInDatabase": self.session_data_in_database, "expiry": self.expiry, "customClaimsInAccessTokenPayload": self.custom_claims_in_access_token_payload, "timeCreated": self.time_created, "tenantId": self.tenant_id, }
Methods
def to_json(self) ‑> Dict[str, Any]
class SessionObj (handle: str, user_id: str, recipe_user_id: RecipeUserId, user_data_in_jwt: Dict[str, Any], tenant_id: str)
-
Expand source code
class SessionObj: def __init__( self, handle: str, user_id: str, recipe_user_id: RecipeUserId, user_data_in_jwt: Dict[str, Any], tenant_id: str, ): self.handle = handle self.user_id = user_id self.user_data_in_jwt = user_data_in_jwt self.tenant_id = tenant_id self.recipe_user_id = recipe_user_id def to_json(self) -> Dict[str, Any]: return { "handle": self.handle, "userId": self.user_id, "recipeUserId": self.recipe_user_id.get_as_string(), "tenantId": self.tenant_id, "userDataInJWT": self.user_data_in_jwt, }
Methods
def to_json(self) ‑> Dict[str, Any]
class SignOutOkayResponse
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class SignOutOkayResponse(APIResponse): def __init__(self): self.status = "OK" def to_json(self) -> Dict[str, Any]: return {"status": self.status}
Ancestors
- APIResponse
- abc.ABC
Methods
def to_json(self) ‑> Dict[str, Any]
class TokenInfo (token: str, expiry: int, created_time: int)
-
Expand source code
class TokenInfo: def __init__(self, token: str, expiry: int, created_time: int): self.token = token self.expiry = expiry self.created_time = created_time def to_json(self) -> Dict[str, Any]: return { "token": self.token, "expiry": self.expiry, "createdTime": self.created_time, }
Methods
def to_json(self) ‑> Dict[str, Any]