Module supertokens_python.recipe.session.recipe_implementation

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict

from supertokens_python.logger import log_debug_message
from supertokens_python.normalised_url_path import NormalisedURLPath
from supertokens_python.process_state import AllowedProcessStates, ProcessState
from supertokens_python.utils import (FRAMEWORKS, execute_async,
                                      frontend_has_interceptor,
                                      get_timestamp_ms, normalise_http_method)

from . import session_functions
from .cookie_and_header import (get_access_token_from_cookie,
                                get_anti_csrf_header,
                                get_id_refresh_token_from_cookie,
                                get_refresh_token_from_cookie, get_rid_header)
from .exceptions import (raise_try_refresh_token_exception,
                         raise_unauthorised_exception)
from .interfaces import (AccessTokenObj, RecipeInterface,
                         RegenerateAccessTokenOkResult,
                         SessionInformationResult, SessionObj)
from .session_class import Session

if TYPE_CHECKING:
    from typing import List, Union

    from supertokens_python.querier import Querier

    from .interfaces import RegenerateAccessTokenResult
    from .utils import SessionConfig

from .interfaces import SessionContainer


class HandshakeInfo:

    def __init__(self, info: Dict[str, Any]):
        self.access_token_blacklisting_enabled = info['accessTokenBlacklistingEnabled']
        self.raw_jwt_signing_public_key_list: List[Dict[str, Any]] = []
        self.anti_csrf = info['antiCsrf']
        self.access_token_validity = info['accessTokenValidity']
        self.refresh_token_validity = info['refreshTokenValidity']

    def set_jwt_signing_public_key_list(self, updated_list: List[Dict[str, Any]]):
        self.raw_jwt_signing_public_key_list = updated_list

    def get_jwt_signing_public_key_list(self) -> List[Dict[str, Any]]:
        time_now = get_timestamp_ms()
        return [
            key for key in self.raw_jwt_signing_public_key_list if key['expiryTime'] > time_now]


class RecipeImplementation(RecipeInterface):
    def __init__(self, querier: Querier, config: SessionConfig):
        super().__init__()
        self.querier = querier
        self.config = config
        self.handshake_info: Union[HandshakeInfo, None] = None

        async def call_get_handshake_info():
            try:
                await self.get_handshake_info()
            except Exception:
                pass

        try:
            execute_async(config.mode, call_get_handshake_info)
        except Exception:
            pass

    async def get_handshake_info(self, force_refetch: bool = False) -> HandshakeInfo:
        if self.handshake_info is None or len(
                self.handshake_info.get_jwt_signing_public_key_list()) == 0 or force_refetch:
            ProcessState.get_instance().add_state(
                AllowedProcessStates.CALLING_SERVICE_IN_GET_HANDSHAKE_INFO)
            response = await self.querier.send_post_request(NormalisedURLPath('/recipe/handshake'), {})
            self.handshake_info = HandshakeInfo({
                **response,
                'antiCsrf': self.config.anti_csrf
            })

            self.update_jwt_signing_public_key_info(response['jwtSigningPublicKeyList'],
                                                    response['jwtSigningPublicKey'],
                                                    response['jwtSigningPublicKeyExpiryTime'])

        return self.handshake_info

    def update_jwt_signing_public_key_info(
            self, key_list: Union[List[Dict[str, Any]], None], public_key: str, expiry_time: int):
        if key_list is None:
            key_list = [{
                'publicKey': public_key,
                'expiryTime': expiry_time,
                'createdAt': get_timestamp_ms()
            }]

        if self.handshake_info is not None:
            self.handshake_info.set_jwt_signing_public_key_list(key_list)

    async def create_new_session(self, request: Any, user_id: str,
                                 access_token_payload: Union[None, Dict[str, Any]],
                                 session_data: Union[None, Dict[str, Any]], user_context: Dict[str, Any]) -> SessionContainer:
        if not hasattr(request, 'wrapper_used') or not request.wrapper_used:
            request = FRAMEWORKS[self.config.framework].wrap_request(request)
        session = await session_functions.create_new_session(self, user_id, access_token_payload, session_data)
        access_token = session['accessToken']
        refresh_token = session['refreshToken']
        id_refresh_token = session['idRefreshToken']
        new_session = Session(self, access_token['token'], session['session']['handle'],
                              session['session']['userId'], session['session']['userDataInJWT'])
        new_session.new_access_token_info = access_token
        new_session.new_refresh_token_info = refresh_token
        new_session.new_id_refresh_token_info = id_refresh_token
        if 'antiCsrfToken' in session and session['antiCsrfToken'] is not None:
            new_session.new_anti_csrf_token = session['antiCsrfToken']
        request.set_session(new_session)
        return request.get_session()

    async def get_session(self, request: Any, anti_csrf_check: Union[bool, None],
                          session_required: bool, user_context: Dict[str, Any]) -> Union[SessionContainer, None]:
        log_debug_message("getSession: Started")
        if not hasattr(request, 'wrapper_used') or not request.wrapper_used:
            request = FRAMEWORKS[self.config.framework].wrap_request(request)

        log_debug_message("getSession: rid in header: %s", str(frontend_has_interceptor(request)))
        log_debug_message("getSession: request method: %s", request.method())

        id_refresh_token = get_id_refresh_token_from_cookie(request)
        if id_refresh_token is None:
            if not session_required:
                log_debug_message("getSession: returning None because idRefreshToken is undefined and session_required is false")
                return None
            log_debug_message("getSession: UNAUTHORISED because idRefreshToken from cookies is undefined")
            raise_unauthorised_exception('Session does not exist. Are you sending the session tokens in the request as cookies?', False)
        access_token: Union[str, None] = get_access_token_from_cookie(request)
        if access_token is None:
            if session_required is True or frontend_has_interceptor(request) or normalise_http_method(
                    request.method()) == 'get':
                log_debug_message(
                    "getSession: Returning try refresh token because access token from cookies is undefined"
                )
                raise_try_refresh_token_exception(
                    'Access token has expired. Please call the refresh API')
            return None
        anti_csrf_token = get_anti_csrf_header(request)
        if anti_csrf_check is None:
            anti_csrf_check = normalise_http_method(request.method()) != 'get'

        log_debug_message("getSession: Value of doAntiCsrfCheck is: %s", str(anti_csrf_check))
        new_session = await session_functions.get_session(self, access_token, anti_csrf_token, anti_csrf_check,
                                                          get_rid_header(request) is not None)
        if 'accessToken' in new_session:
            access_token = new_session['accessToken']['token']

        if access_token is None:
            raise Exception("Should never come here")
        session = Session(self, access_token, new_session['session']['handle'],
                          new_session['session']['userId'], new_session['session']['userDataInJWT'])

        if 'accessToken' in new_session:
            session.new_access_token_info = new_session['accessToken']

        log_debug_message("getSession: Success!")
        request.set_session(session)
        return request.get_session()

    async def refresh_session(self, request: Any, user_context: Dict[str, Any]) -> SessionContainer:
        log_debug_message("refreshSession: Started")
        if not hasattr(request, 'wrapper_used') or not request.wrapper_used:
            request = FRAMEWORKS[self.config.framework].wrap_request(request)

        id_refresh_token = get_id_refresh_token_from_cookie(request)
        if id_refresh_token is None:
            log_debug_message("refreshSession: UNAUTHORISED because idRefreshToken from cookies is undefined")
            raise_unauthorised_exception('Session does not exist. Are you sending the session tokens in the request '
                                         'as cookies?', False)
        refresh_token = get_refresh_token_from_cookie(request)
        if refresh_token is None:
            log_debug_message("refreshSession: UNAUTHORISED because refresh token from cookies is undefined")
            raise_unauthorised_exception('Refresh token not found. Are you sending the refresh token in the '
                                         'request as a cookie?')
        anti_csrf_token = get_anti_csrf_header(request)
        new_session = await session_functions.refresh_session(self, refresh_token, anti_csrf_token,
                                                              get_rid_header(request) is not None)
        access_token = new_session['accessToken']
        refresh_token = new_session['refreshToken']
        id_refresh_token = new_session['idRefreshToken']
        session = Session(self, access_token['token'], new_session['session']['handle'],
                          new_session['session']['userId'], new_session['session']['userDataInJWT'])
        session.new_access_token_info = access_token
        session.new_refresh_token_info = refresh_token
        session.new_id_refresh_token_info = id_refresh_token
        if 'antiCsrfToken' in new_session and new_session['antiCsrfToken'] is not None:
            session.new_anti_csrf_token = new_session['antiCsrfToken']

        log_debug_message("refreshSession: Success!")
        request.set_session(session)
        return request.get_session()

    async def revoke_session(self, session_handle: str, user_context: Dict[str, Any]) -> bool:
        return await session_functions.revoke_session(self, session_handle)

    async def revoke_all_sessions_for_user(self, user_id: str, user_context: Dict[str, Any]) -> List[str]:
        return await session_functions.revoke_all_sessions_for_user(self, user_id)

    async def get_all_session_handles_for_user(self, user_id: str, user_context: Dict[str, Any]) -> List[str]:
        return await session_functions.get_all_session_handles_for_user(self, user_id)

    async def revoke_multiple_sessions(self, session_handles: List[str], user_context: Dict[str, Any]) -> List[str]:
        return await session_functions.revoke_multiple_sessions(self, session_handles)

    async def get_session_information(self, session_handle: str, user_context: Dict[str, Any]) -> SessionInformationResult:
        return await session_functions.get_session_information(self, session_handle)

    async def update_session_data(self, session_handle: str, new_session_data: Dict[str, Any], user_context: Dict[str, Any]) -> None:
        await session_functions.update_session_data(self, session_handle, new_session_data)

    async def update_access_token_payload(self, session_handle: str, new_access_token_payload: Dict[str, Any], user_context: Dict[str, Any]) -> None:
        await session_functions.update_access_token_payload(self, session_handle, new_access_token_payload)

    async def get_access_token_lifetime_ms(self, user_context: Dict[str, Any]) -> int:
        return (await self.get_handshake_info()).access_token_validity

    async def get_refresh_token_lifetime_ms(self, user_context: Dict[str, Any]) -> int:
        return (await self.get_handshake_info()).refresh_token_validity

    async def regenerate_access_token(self,
                                      access_token: str,
                                      new_access_token_payload: Union[Dict[str, Any], None], user_context: Dict[str, Any]) -> RegenerateAccessTokenResult:
        if new_access_token_payload is None:
            new_access_token_payload = {}
        response: Dict[str, Any] = await self.querier.send_post_request(NormalisedURLPath("/recipe/session/regenerate"), {
            'accessToken': access_token,
            'userDataInJWT': new_access_token_payload
        })
        if response['status'] == 'UNAUTHORISED':
            raise_unauthorised_exception(response['message'])
        access_token_obj: Union[None, AccessTokenObj] = None
        if 'accessToken' in response:
            access_token_obj = AccessTokenObj(
                response['accessToken']['token'],
                response['accessToken']['expiry'],
                response['accessToken']['createdTime']
            )
        session = SessionObj(
            response['session']['handle'],
            response['session']['userId'],
            response['session']['userDataInJWT']
        )
        return RegenerateAccessTokenOkResult(session, access_token_obj)

Classes

class HandshakeInfo (info: Dict[str, Any])
Expand source code
class HandshakeInfo:

    def __init__(self, info: Dict[str, Any]):
        self.access_token_blacklisting_enabled = info['accessTokenBlacklistingEnabled']
        self.raw_jwt_signing_public_key_list: List[Dict[str, Any]] = []
        self.anti_csrf = info['antiCsrf']
        self.access_token_validity = info['accessTokenValidity']
        self.refresh_token_validity = info['refreshTokenValidity']

    def set_jwt_signing_public_key_list(self, updated_list: List[Dict[str, Any]]):
        self.raw_jwt_signing_public_key_list = updated_list

    def get_jwt_signing_public_key_list(self) -> List[Dict[str, Any]]:
        time_now = get_timestamp_ms()
        return [
            key for key in self.raw_jwt_signing_public_key_list if key['expiryTime'] > time_now]

Methods

def get_jwt_signing_public_key_list(self) ‑> List[Dict[str, Any]]
Expand source code
def get_jwt_signing_public_key_list(self) -> List[Dict[str, Any]]:
    time_now = get_timestamp_ms()
    return [
        key for key in self.raw_jwt_signing_public_key_list if key['expiryTime'] > time_now]
def set_jwt_signing_public_key_list(self, updated_list: List[Dict[str, Any]])
Expand source code
def set_jwt_signing_public_key_list(self, updated_list: List[Dict[str, Any]]):
    self.raw_jwt_signing_public_key_list = updated_list
class RecipeImplementation (querier: Querier, config: SessionConfig)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class RecipeImplementation(RecipeInterface):
    def __init__(self, querier: Querier, config: SessionConfig):
        super().__init__()
        self.querier = querier
        self.config = config
        self.handshake_info: Union[HandshakeInfo, None] = None

        async def call_get_handshake_info():
            try:
                await self.get_handshake_info()
            except Exception:
                pass

        try:
            execute_async(config.mode, call_get_handshake_info)
        except Exception:
            pass

    async def get_handshake_info(self, force_refetch: bool = False) -> HandshakeInfo:
        if self.handshake_info is None or len(
                self.handshake_info.get_jwt_signing_public_key_list()) == 0 or force_refetch:
            ProcessState.get_instance().add_state(
                AllowedProcessStates.CALLING_SERVICE_IN_GET_HANDSHAKE_INFO)
            response = await self.querier.send_post_request(NormalisedURLPath('/recipe/handshake'), {})
            self.handshake_info = HandshakeInfo({
                **response,
                'antiCsrf': self.config.anti_csrf
            })

            self.update_jwt_signing_public_key_info(response['jwtSigningPublicKeyList'],
                                                    response['jwtSigningPublicKey'],
                                                    response['jwtSigningPublicKeyExpiryTime'])

        return self.handshake_info

    def update_jwt_signing_public_key_info(
            self, key_list: Union[List[Dict[str, Any]], None], public_key: str, expiry_time: int):
        if key_list is None:
            key_list = [{
                'publicKey': public_key,
                'expiryTime': expiry_time,
                'createdAt': get_timestamp_ms()
            }]

        if self.handshake_info is not None:
            self.handshake_info.set_jwt_signing_public_key_list(key_list)

    async def create_new_session(self, request: Any, user_id: str,
                                 access_token_payload: Union[None, Dict[str, Any]],
                                 session_data: Union[None, Dict[str, Any]], user_context: Dict[str, Any]) -> SessionContainer:
        if not hasattr(request, 'wrapper_used') or not request.wrapper_used:
            request = FRAMEWORKS[self.config.framework].wrap_request(request)
        session = await session_functions.create_new_session(self, user_id, access_token_payload, session_data)
        access_token = session['accessToken']
        refresh_token = session['refreshToken']
        id_refresh_token = session['idRefreshToken']
        new_session = Session(self, access_token['token'], session['session']['handle'],
                              session['session']['userId'], session['session']['userDataInJWT'])
        new_session.new_access_token_info = access_token
        new_session.new_refresh_token_info = refresh_token
        new_session.new_id_refresh_token_info = id_refresh_token
        if 'antiCsrfToken' in session and session['antiCsrfToken'] is not None:
            new_session.new_anti_csrf_token = session['antiCsrfToken']
        request.set_session(new_session)
        return request.get_session()

    async def get_session(self, request: Any, anti_csrf_check: Union[bool, None],
                          session_required: bool, user_context: Dict[str, Any]) -> Union[SessionContainer, None]:
        log_debug_message("getSession: Started")
        if not hasattr(request, 'wrapper_used') or not request.wrapper_used:
            request = FRAMEWORKS[self.config.framework].wrap_request(request)

        log_debug_message("getSession: rid in header: %s", str(frontend_has_interceptor(request)))
        log_debug_message("getSession: request method: %s", request.method())

        id_refresh_token = get_id_refresh_token_from_cookie(request)
        if id_refresh_token is None:
            if not session_required:
                log_debug_message("getSession: returning None because idRefreshToken is undefined and session_required is false")
                return None
            log_debug_message("getSession: UNAUTHORISED because idRefreshToken from cookies is undefined")
            raise_unauthorised_exception('Session does not exist. Are you sending the session tokens in the request as cookies?', False)
        access_token: Union[str, None] = get_access_token_from_cookie(request)
        if access_token is None:
            if session_required is True or frontend_has_interceptor(request) or normalise_http_method(
                    request.method()) == 'get':
                log_debug_message(
                    "getSession: Returning try refresh token because access token from cookies is undefined"
                )
                raise_try_refresh_token_exception(
                    'Access token has expired. Please call the refresh API')
            return None
        anti_csrf_token = get_anti_csrf_header(request)
        if anti_csrf_check is None:
            anti_csrf_check = normalise_http_method(request.method()) != 'get'

        log_debug_message("getSession: Value of doAntiCsrfCheck is: %s", str(anti_csrf_check))
        new_session = await session_functions.get_session(self, access_token, anti_csrf_token, anti_csrf_check,
                                                          get_rid_header(request) is not None)
        if 'accessToken' in new_session:
            access_token = new_session['accessToken']['token']

        if access_token is None:
            raise Exception("Should never come here")
        session = Session(self, access_token, new_session['session']['handle'],
                          new_session['session']['userId'], new_session['session']['userDataInJWT'])

        if 'accessToken' in new_session:
            session.new_access_token_info = new_session['accessToken']

        log_debug_message("getSession: Success!")
        request.set_session(session)
        return request.get_session()

    async def refresh_session(self, request: Any, user_context: Dict[str, Any]) -> SessionContainer:
        log_debug_message("refreshSession: Started")
        if not hasattr(request, 'wrapper_used') or not request.wrapper_used:
            request = FRAMEWORKS[self.config.framework].wrap_request(request)

        id_refresh_token = get_id_refresh_token_from_cookie(request)
        if id_refresh_token is None:
            log_debug_message("refreshSession: UNAUTHORISED because idRefreshToken from cookies is undefined")
            raise_unauthorised_exception('Session does not exist. Are you sending the session tokens in the request '
                                         'as cookies?', False)
        refresh_token = get_refresh_token_from_cookie(request)
        if refresh_token is None:
            log_debug_message("refreshSession: UNAUTHORISED because refresh token from cookies is undefined")
            raise_unauthorised_exception('Refresh token not found. Are you sending the refresh token in the '
                                         'request as a cookie?')
        anti_csrf_token = get_anti_csrf_header(request)
        new_session = await session_functions.refresh_session(self, refresh_token, anti_csrf_token,
                                                              get_rid_header(request) is not None)
        access_token = new_session['accessToken']
        refresh_token = new_session['refreshToken']
        id_refresh_token = new_session['idRefreshToken']
        session = Session(self, access_token['token'], new_session['session']['handle'],
                          new_session['session']['userId'], new_session['session']['userDataInJWT'])
        session.new_access_token_info = access_token
        session.new_refresh_token_info = refresh_token
        session.new_id_refresh_token_info = id_refresh_token
        if 'antiCsrfToken' in new_session and new_session['antiCsrfToken'] is not None:
            session.new_anti_csrf_token = new_session['antiCsrfToken']

        log_debug_message("refreshSession: Success!")
        request.set_session(session)
        return request.get_session()

    async def revoke_session(self, session_handle: str, user_context: Dict[str, Any]) -> bool:
        return await session_functions.revoke_session(self, session_handle)

    async def revoke_all_sessions_for_user(self, user_id: str, user_context: Dict[str, Any]) -> List[str]:
        return await session_functions.revoke_all_sessions_for_user(self, user_id)

    async def get_all_session_handles_for_user(self, user_id: str, user_context: Dict[str, Any]) -> List[str]:
        return await session_functions.get_all_session_handles_for_user(self, user_id)

    async def revoke_multiple_sessions(self, session_handles: List[str], user_context: Dict[str, Any]) -> List[str]:
        return await session_functions.revoke_multiple_sessions(self, session_handles)

    async def get_session_information(self, session_handle: str, user_context: Dict[str, Any]) -> SessionInformationResult:
        return await session_functions.get_session_information(self, session_handle)

    async def update_session_data(self, session_handle: str, new_session_data: Dict[str, Any], user_context: Dict[str, Any]) -> None:
        await session_functions.update_session_data(self, session_handle, new_session_data)

    async def update_access_token_payload(self, session_handle: str, new_access_token_payload: Dict[str, Any], user_context: Dict[str, Any]) -> None:
        await session_functions.update_access_token_payload(self, session_handle, new_access_token_payload)

    async def get_access_token_lifetime_ms(self, user_context: Dict[str, Any]) -> int:
        return (await self.get_handshake_info()).access_token_validity

    async def get_refresh_token_lifetime_ms(self, user_context: Dict[str, Any]) -> int:
        return (await self.get_handshake_info()).refresh_token_validity

    async def regenerate_access_token(self,
                                      access_token: str,
                                      new_access_token_payload: Union[Dict[str, Any], None], user_context: Dict[str, Any]) -> RegenerateAccessTokenResult:
        if new_access_token_payload is None:
            new_access_token_payload = {}
        response: Dict[str, Any] = await self.querier.send_post_request(NormalisedURLPath("/recipe/session/regenerate"), {
            'accessToken': access_token,
            'userDataInJWT': new_access_token_payload
        })
        if response['status'] == 'UNAUTHORISED':
            raise_unauthorised_exception(response['message'])
        access_token_obj: Union[None, AccessTokenObj] = None
        if 'accessToken' in response:
            access_token_obj = AccessTokenObj(
                response['accessToken']['token'],
                response['accessToken']['expiry'],
                response['accessToken']['createdTime']
            )
        session = SessionObj(
            response['session']['handle'],
            response['session']['userId'],
            response['session']['userDataInJWT']
        )
        return RegenerateAccessTokenOkResult(session, access_token_obj)

Ancestors

Methods

async def create_new_session(self, request: Any, user_id: str, access_token_payload: Union[None, Dict[str, Any]], session_data: Union[None, Dict[str, Any]], user_context: Dict[str, Any]) ‑> SessionContainer
Expand source code
async def create_new_session(self, request: Any, user_id: str,
                             access_token_payload: Union[None, Dict[str, Any]],
                             session_data: Union[None, Dict[str, Any]], user_context: Dict[str, Any]) -> SessionContainer:
    if not hasattr(request, 'wrapper_used') or not request.wrapper_used:
        request = FRAMEWORKS[self.config.framework].wrap_request(request)
    session = await session_functions.create_new_session(self, user_id, access_token_payload, session_data)
    access_token = session['accessToken']
    refresh_token = session['refreshToken']
    id_refresh_token = session['idRefreshToken']
    new_session = Session(self, access_token['token'], session['session']['handle'],
                          session['session']['userId'], session['session']['userDataInJWT'])
    new_session.new_access_token_info = access_token
    new_session.new_refresh_token_info = refresh_token
    new_session.new_id_refresh_token_info = id_refresh_token
    if 'antiCsrfToken' in session and session['antiCsrfToken'] is not None:
        new_session.new_anti_csrf_token = session['antiCsrfToken']
    request.set_session(new_session)
    return request.get_session()
async def get_access_token_lifetime_ms(self, user_context: Dict[str, Any]) ‑> int
Expand source code
async def get_access_token_lifetime_ms(self, user_context: Dict[str, Any]) -> int:
    return (await self.get_handshake_info()).access_token_validity
async def get_all_session_handles_for_user(self, user_id: str, user_context: Dict[str, Any]) ‑> List[str]
Expand source code
async def get_all_session_handles_for_user(self, user_id: str, user_context: Dict[str, Any]) -> List[str]:
    return await session_functions.get_all_session_handles_for_user(self, user_id)
async def get_handshake_info(self, force_refetch: bool = False) ‑> HandshakeInfo
Expand source code
async def get_handshake_info(self, force_refetch: bool = False) -> HandshakeInfo:
    if self.handshake_info is None or len(
            self.handshake_info.get_jwt_signing_public_key_list()) == 0 or force_refetch:
        ProcessState.get_instance().add_state(
            AllowedProcessStates.CALLING_SERVICE_IN_GET_HANDSHAKE_INFO)
        response = await self.querier.send_post_request(NormalisedURLPath('/recipe/handshake'), {})
        self.handshake_info = HandshakeInfo({
            **response,
            'antiCsrf': self.config.anti_csrf
        })

        self.update_jwt_signing_public_key_info(response['jwtSigningPublicKeyList'],
                                                response['jwtSigningPublicKey'],
                                                response['jwtSigningPublicKeyExpiryTime'])

    return self.handshake_info
async def get_refresh_token_lifetime_ms(self, user_context: Dict[str, Any]) ‑> int
Expand source code
async def get_refresh_token_lifetime_ms(self, user_context: Dict[str, Any]) -> int:
    return (await self.get_handshake_info()).refresh_token_validity
async def get_session(self, request: Any, anti_csrf_check: Union[bool, None], session_required: bool, user_context: Dict[str, Any]) ‑> Union[SessionContainer, None]
Expand source code
async def get_session(self, request: Any, anti_csrf_check: Union[bool, None],
                      session_required: bool, user_context: Dict[str, Any]) -> Union[SessionContainer, None]:
    log_debug_message("getSession: Started")
    if not hasattr(request, 'wrapper_used') or not request.wrapper_used:
        request = FRAMEWORKS[self.config.framework].wrap_request(request)

    log_debug_message("getSession: rid in header: %s", str(frontend_has_interceptor(request)))
    log_debug_message("getSession: request method: %s", request.method())

    id_refresh_token = get_id_refresh_token_from_cookie(request)
    if id_refresh_token is None:
        if not session_required:
            log_debug_message("getSession: returning None because idRefreshToken is undefined and session_required is false")
            return None
        log_debug_message("getSession: UNAUTHORISED because idRefreshToken from cookies is undefined")
        raise_unauthorised_exception('Session does not exist. Are you sending the session tokens in the request as cookies?', False)
    access_token: Union[str, None] = get_access_token_from_cookie(request)
    if access_token is None:
        if session_required is True or frontend_has_interceptor(request) or normalise_http_method(
                request.method()) == 'get':
            log_debug_message(
                "getSession: Returning try refresh token because access token from cookies is undefined"
            )
            raise_try_refresh_token_exception(
                'Access token has expired. Please call the refresh API')
        return None
    anti_csrf_token = get_anti_csrf_header(request)
    if anti_csrf_check is None:
        anti_csrf_check = normalise_http_method(request.method()) != 'get'

    log_debug_message("getSession: Value of doAntiCsrfCheck is: %s", str(anti_csrf_check))
    new_session = await session_functions.get_session(self, access_token, anti_csrf_token, anti_csrf_check,
                                                      get_rid_header(request) is not None)
    if 'accessToken' in new_session:
        access_token = new_session['accessToken']['token']

    if access_token is None:
        raise Exception("Should never come here")
    session = Session(self, access_token, new_session['session']['handle'],
                      new_session['session']['userId'], new_session['session']['userDataInJWT'])

    if 'accessToken' in new_session:
        session.new_access_token_info = new_session['accessToken']

    log_debug_message("getSession: Success!")
    request.set_session(session)
    return request.get_session()
async def get_session_information(self, session_handle: str, user_context: Dict[str, Any]) ‑> SessionInformationResult
Expand source code
async def get_session_information(self, session_handle: str, user_context: Dict[str, Any]) -> SessionInformationResult:
    return await session_functions.get_session_information(self, session_handle)
async def refresh_session(self, request: Any, user_context: Dict[str, Any]) ‑> SessionContainer
Expand source code
async def refresh_session(self, request: Any, user_context: Dict[str, Any]) -> SessionContainer:
    log_debug_message("refreshSession: Started")
    if not hasattr(request, 'wrapper_used') or not request.wrapper_used:
        request = FRAMEWORKS[self.config.framework].wrap_request(request)

    id_refresh_token = get_id_refresh_token_from_cookie(request)
    if id_refresh_token is None:
        log_debug_message("refreshSession: UNAUTHORISED because idRefreshToken from cookies is undefined")
        raise_unauthorised_exception('Session does not exist. Are you sending the session tokens in the request '
                                     'as cookies?', False)
    refresh_token = get_refresh_token_from_cookie(request)
    if refresh_token is None:
        log_debug_message("refreshSession: UNAUTHORISED because refresh token from cookies is undefined")
        raise_unauthorised_exception('Refresh token not found. Are you sending the refresh token in the '
                                     'request as a cookie?')
    anti_csrf_token = get_anti_csrf_header(request)
    new_session = await session_functions.refresh_session(self, refresh_token, anti_csrf_token,
                                                          get_rid_header(request) is not None)
    access_token = new_session['accessToken']
    refresh_token = new_session['refreshToken']
    id_refresh_token = new_session['idRefreshToken']
    session = Session(self, access_token['token'], new_session['session']['handle'],
                      new_session['session']['userId'], new_session['session']['userDataInJWT'])
    session.new_access_token_info = access_token
    session.new_refresh_token_info = refresh_token
    session.new_id_refresh_token_info = id_refresh_token
    if 'antiCsrfToken' in new_session and new_session['antiCsrfToken'] is not None:
        session.new_anti_csrf_token = new_session['antiCsrfToken']

    log_debug_message("refreshSession: Success!")
    request.set_session(session)
    return request.get_session()
async def regenerate_access_token(self, access_token: str, new_access_token_payload: Union[Dict[str, Any], None], user_context: Dict[str, Any]) ‑> RegenerateAccessTokenResult
Expand source code
async def regenerate_access_token(self,
                                  access_token: str,
                                  new_access_token_payload: Union[Dict[str, Any], None], user_context: Dict[str, Any]) -> RegenerateAccessTokenResult:
    if new_access_token_payload is None:
        new_access_token_payload = {}
    response: Dict[str, Any] = await self.querier.send_post_request(NormalisedURLPath("/recipe/session/regenerate"), {
        'accessToken': access_token,
        'userDataInJWT': new_access_token_payload
    })
    if response['status'] == 'UNAUTHORISED':
        raise_unauthorised_exception(response['message'])
    access_token_obj: Union[None, AccessTokenObj] = None
    if 'accessToken' in response:
        access_token_obj = AccessTokenObj(
            response['accessToken']['token'],
            response['accessToken']['expiry'],
            response['accessToken']['createdTime']
        )
    session = SessionObj(
        response['session']['handle'],
        response['session']['userId'],
        response['session']['userDataInJWT']
    )
    return RegenerateAccessTokenOkResult(session, access_token_obj)
async def revoke_all_sessions_for_user(self, user_id: str, user_context: Dict[str, Any]) ‑> List[str]
Expand source code
async def revoke_all_sessions_for_user(self, user_id: str, user_context: Dict[str, Any]) -> List[str]:
    return await session_functions.revoke_all_sessions_for_user(self, user_id)
async def revoke_multiple_sessions(self, session_handles: List[str], user_context: Dict[str, Any]) ‑> List[str]
Expand source code
async def revoke_multiple_sessions(self, session_handles: List[str], user_context: Dict[str, Any]) -> List[str]:
    return await session_functions.revoke_multiple_sessions(self, session_handles)
async def revoke_session(self, session_handle: str, user_context: Dict[str, Any]) ‑> bool
Expand source code
async def revoke_session(self, session_handle: str, user_context: Dict[str, Any]) -> bool:
    return await session_functions.revoke_session(self, session_handle)
async def update_access_token_payload(self, session_handle: str, new_access_token_payload: Dict[str, Any], user_context: Dict[str, Any]) ‑> None
Expand source code
async def update_access_token_payload(self, session_handle: str, new_access_token_payload: Dict[str, Any], user_context: Dict[str, Any]) -> None:
    await session_functions.update_access_token_payload(self, session_handle, new_access_token_payload)
def update_jwt_signing_public_key_info(self, key_list: Union[List[Dict[str, Any]], None], public_key: str, expiry_time: int)
Expand source code
def update_jwt_signing_public_key_info(
        self, key_list: Union[List[Dict[str, Any]], None], public_key: str, expiry_time: int):
    if key_list is None:
        key_list = [{
            'publicKey': public_key,
            'expiryTime': expiry_time,
            'createdAt': get_timestamp_ms()
        }]

    if self.handshake_info is not None:
        self.handshake_info.set_jwt_signing_public_key_list(key_list)
async def update_session_data(self, session_handle: str, new_session_data: Dict[str, Any], user_context: Dict[str, Any]) ‑> None
Expand source code
async def update_session_data(self, session_handle: str, new_session_data: Dict[str, Any], user_context: Dict[str, Any]) -> None:
    await session_functions.update_session_data(self, session_handle, new_session_data)