Module supertokens_python.recipe.session.session_functions

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
from typing import TYPE_CHECKING, Any, Dict, List, Union

from supertokens_python.recipe.session.interfaces import \
    SessionInformationResult

from .access_token import get_info_from_access_token
from .jwt import get_payload_without_verifying

if TYPE_CHECKING:
    from .recipe_implementation import RecipeImplementation

from supertokens_python.logger import log_debug_message
from supertokens_python.normalised_url_path import NormalisedURLPath
from supertokens_python.process_state import AllowedProcessStates, ProcessState

from .exceptions import (TryRefreshTokenError, raise_token_theft_exception,
                         raise_try_refresh_token_exception,
                         raise_unauthorised_exception)


async def create_new_session(recipe_implementation: RecipeImplementation, user_id: str,
                             access_token_payload: Union[None, Dict[str, Any]],
                             session_data: Union[None, Dict[str, Any]]):
    """Create a session for ``user_id`` via the ``/recipe/session`` endpoint.

    ``None`` for ``access_token_payload`` or ``session_data`` is sent as an
    empty dict. Anti-CSRF is requested from the core only when the handshake
    info reports the ``'VIA_TOKEN'`` mode.

    The signing-key fields of the response are handed to
    ``recipe_implementation.update_jwt_signing_public_key_info`` and then
    removed, together with ``'status'``, from the response dict, which is
    returned (mutated in place).
    """
    jwt_payload = {} if access_token_payload is None else access_token_payload
    db_data = {} if session_data is None else session_data

    info = await recipe_implementation.get_handshake_info()
    request_body = {
        'userId': user_id,
        'userDataInJWT': jwt_payload,
        'userDataInDatabase': db_data,
        'enableAntiCsrf': info.anti_csrf == 'VIA_TOKEN'
    }
    result = await recipe_implementation.querier.send_post_request(
        NormalisedURLPath('/recipe/session'), request_body)

    # Keep the cached signing keys in sync with what the core just returned.
    recipe_implementation.update_jwt_signing_public_key_info(
        result['jwtSigningPublicKeyList'],
        result['jwtSigningPublicKey'],
        result['jwtSigningPublicKeyExpiryTime'])

    # Strip the bookkeeping fields so only session data is returned.
    for transient_key in ('status', 'jwtSigningPublicKey',
                          'jwtSigningPublicKeyExpiryTime', 'jwtSigningPublicKeyList'):
        result.pop(transient_key, None)

    return result


async def get_session(recipe_implementation: RecipeImplementation, access_token: str,
                      anti_csrf_token: Union[str, None],
                      do_anti_csrf_check: bool, contains_custom_header: bool) -> Dict[str, Any]:
    """Verify ``access_token`` and return the session it identifies.

    The token is first checked locally against each cached JWT signing public
    key. When local verification succeeds, access-token blacklisting is
    disabled and the token carries no ``parentRefreshTokenHash1``, the session
    is built from the token alone. In every other case the core's
    ``/recipe/session/verify`` endpoint is called.

    Args:
        recipe_implementation: Provides handshake info, the querier and the
            signing-key cache update hook.
        access_token: The access token to verify.
        anti_csrf_token: Anti-CSRF token from the request, if any.
        do_anti_csrf_check: Whether anti-CSRF protection applies to this call.
        contains_custom_header: Whether the request carried the custom header
            required in ``'VIA_CUSTOM_HEADER'`` mode.

    Returns:
        ``{'session': {'handle', 'userId', 'userDataInJWT'}}`` when served
        locally, otherwise the core's ``OK`` response with ``'status'`` and the
        signing-key fields removed.

    Raises:
        Via ``raise_try_refresh_token_exception`` or
        ``raise_unauthorised_exception``, depending on the failure. Any
        exception from local verification whose class is not exactly
        ``TryRefreshTokenError`` is re-raised unchanged.
    """
    handshake_info = await recipe_implementation.get_handshake_info()
    verify_anti_csrf_token = handshake_info.anti_csrf == 'VIA_TOKEN' and do_anti_csrf_check
    access_token_info = None
    found_a_sign_key_that_is_older_than_the_access_token = False

    for key in handshake_info.get_jwt_signing_public_key_list():
        try:
            access_token_info = get_info_from_access_token(access_token,
                                                           key['publicKey'],
                                                           verify_anti_csrf_token)
            found_a_sign_key_that_is_older_than_the_access_token = True
            # The token is verified; trying the remaining keys could only fail
            # and mask this success with a spurious error.
            break
        except Exception as e:
            if e.__class__ != TryRefreshTokenError:
                raise e

            try:
                payload = get_payload_without_verifying(access_token)
            except Exception:
                # Undecodable token: report the original verification error.
                # (Exception, not BaseException, so interpreter-exit signals
                # are never converted into a token error.)
                raise e

            if payload is None:
                raise e

            # A payload lacking either timestamp is treated like any other
            # malformed token instead of surfacing a KeyError.
            time_created = payload.get('timeCreated')
            expiry_time = payload.get('expiryTime')
            if not isinstance(time_created, int) or not isinstance(expiry_time, int):
                raise e

            # NOTE(review): time.time() is in seconds; confirm 'expiryTime'
            # uses the same unit, otherwise this guard never fires.
            if expiry_time < time.time():
                raise e

            # The token is at least as new as this key, so the cached key list
            # may be stale; stop here and let the core decide.
            if time_created >= key['createdAt']:
                found_a_sign_key_that_is_older_than_the_access_token = True
                break

    if not found_a_sign_key_that_is_older_than_the_access_token:
        # Every cached key failed and the token predates all of them (or the
        # key list was empty).
        log_debug_message(
            "getSession: Returning TRY_REFRESH_TOKEN because the access token could not be verified with any cached signing key"
        )
        raise_try_refresh_token_exception(
            'Access token could not be verified with any cached signing key. Please call the refresh API')

    if verify_anti_csrf_token:
        if access_token_info is not None:
            if anti_csrf_token is None:
                log_debug_message(
                    "getSession: Returning TRY_REFRESH_TOKEN because antiCsrfToken is missing from request"
                )
                raise_try_refresh_token_exception('Provided antiCsrfToken is undefined. If you do not want anti-csrf check for this API, please set doAntiCsrfCheck to false for this API')
            elif anti_csrf_token != access_token_info['antiCsrfToken']:
                log_debug_message(
                    "getSession: Returning TRY_REFRESH_TOKEN because the passed antiCsrfToken is not the same as in the access token"
                )
                raise_try_refresh_token_exception(
                    'anti-csrf check failed')

    elif handshake_info.anti_csrf == 'VIA_CUSTOM_HEADER' and do_anti_csrf_check:
        if not contains_custom_header:
            log_debug_message("getSession: Returning TRY_REFRESH_TOKEN because custom header (rid) was not passed")
            raise_try_refresh_token_exception('anti-csrf check failed. Please pass \'rid: "anti-csrf"\' header in the request, or set doAntiCsrfCheck to false '
                                              'for this API')

    # Fast path: the verified token alone is enough, no call to the core.
    if access_token_info is not None and not handshake_info.access_token_blacklisting_enabled and \
            access_token_info['parentRefreshTokenHash1'] is None:
        return {
            'session': {
                'handle': access_token_info['sessionHandle'],
                'userId': access_token_info['userId'],
                'userDataInJWT': access_token_info['userData']
            }
        }

    ProcessState.get_instance().add_state(
        AllowedProcessStates.CALLING_SERVICE_IN_VERIFY)

    data = {
        'accessToken': access_token,
        'doAntiCsrfCheck': do_anti_csrf_check,
        'enableAntiCsrf': handshake_info.anti_csrf == "VIA_TOKEN"
    }
    if anti_csrf_token is not None:
        data['antiCsrfToken'] = anti_csrf_token

    response = await recipe_implementation.querier.send_post_request(NormalisedURLPath('/recipe/session/verify'), data)
    if response['status'] == 'OK':
        recipe_implementation.update_jwt_signing_public_key_info(response['jwtSigningPublicKeyList'], response['jwtSigningPublicKey'],
                                                                 response['jwtSigningPublicKeyExpiryTime'])
        response.pop('status', None)
        response.pop('jwtSigningPublicKey', None)
        response.pop('jwtSigningPublicKeyExpiryTime', None)
        response.pop('jwtSigningPublicKeyList', None)
        return response
    if response['status'] == 'UNAUTHORISED':
        log_debug_message("getSession: Returning UNAUTHORISED because of core response")
        raise_unauthorised_exception(response['message'])

    # Any other status is treated as "try refresh token". The signing-key
    # fields may be absent from such a response, so read them with .get();
    # subscripting would raise KeyError and the handshake refresh below could
    # never run.
    signing_key_list = response.get('jwtSigningPublicKeyList')
    signing_key = response.get('jwtSigningPublicKey')
    signing_key_expiry = response.get('jwtSigningPublicKeyExpiryTime')
    if signing_key_list is not None or signing_key is not None or signing_key_expiry is not None:
        recipe_implementation.update_jwt_signing_public_key_info(
            signing_key_list,
            signing_key,
            signing_key_expiry)
    else:
        await recipe_implementation.get_handshake_info(True)

    log_debug_message("getSession: Returning TRY_REFRESH_TOKEN because of core response")
    raise_try_refresh_token_exception(response['message'])


async def refresh_session(recipe_implementation: RecipeImplementation, refresh_token: str,
                          anti_csrf_token: Union[str, None],
                          contains_custom_header: bool):
    """Exchange ``refresh_token`` for a new session via ``/recipe/session/refresh``.

    In ``'VIA_CUSTOM_HEADER'`` anti-CSRF mode the request must have carried the
    custom header, otherwise ``raise_unauthorised_exception`` is invoked before
    the core is contacted.

    Returns the core's response without its ``'status'`` field when the status
    is ``'OK'``. An ``'UNAUTHORISED'`` status goes to
    ``raise_unauthorised_exception``; any other status is reported through
    ``raise_token_theft_exception`` with the session's user id and handle.
    """
    info = await recipe_implementation.get_handshake_info()
    anti_csrf_mode = info.anti_csrf

    if anti_csrf_mode == 'VIA_CUSTOM_HEADER' and not contains_custom_header:
        log_debug_message("refreshSession: Returning UNAUTHORISED because custom header (rid) was not passed")
        raise_unauthorised_exception('anti-csrf check failed. Please pass \'rid: "session"\' header '
                                     'in the request.', False)

    request_body = {
        'refreshToken': refresh_token,
        'enableAntiCsrf': anti_csrf_mode == 'VIA_TOKEN'
    }
    # The anti-CSRF token is only forwarded when the caller supplied one.
    if anti_csrf_token is not None:
        request_body['antiCsrfToken'] = anti_csrf_token

    result = await recipe_implementation.querier.send_post_request(
        NormalisedURLPath('/recipe/session/refresh'), request_body)

    outcome = result['status']
    if outcome == 'OK':
        result.pop('status', None)
        return result
    if outcome == 'UNAUTHORISED':
        log_debug_message("refreshSession: Returning UNAUTHORISED because of core response")
        raise_unauthorised_exception(result['message'])

    log_debug_message("refreshSession: Returning TOKEN_THEFT_DETECTED because of core response")
    stolen_session = result['session']
    raise_token_theft_exception(stolen_session['userId'], stolen_session['handle'])


async def revoke_all_sessions_for_user(recipe_implementation: RecipeImplementation, user_id: str) -> List[str]:
    """Ask the core to remove every session of ``user_id``.

    Returns the ``'sessionHandlesRevoked'`` list from the core's response.
    """
    request_body = {'userId': user_id}
    result = await recipe_implementation.querier.send_post_request(
        NormalisedURLPath('/recipe/session/remove'), request_body)
    return result['sessionHandlesRevoked']


async def get_all_session_handles_for_user(recipe_implementation: RecipeImplementation, user_id: str) -> List[str]:
    """Fetch the session handles the core holds for ``user_id``.

    Returns the ``'sessionHandles'`` list from the core's response.
    """
    query = {'userId': user_id}
    result = await recipe_implementation.querier.send_get_request(
        NormalisedURLPath('/recipe/session/user'), query)
    return result['sessionHandles']


async def revoke_session(recipe_implementation: RecipeImplementation, session_handle: str) -> bool:
    """Ask the core to remove the session identified by ``session_handle``.

    Returns ``True`` only when the core reports exactly one revoked handle.
    """
    request_body = {'sessionHandles': [session_handle]}
    result = await recipe_implementation.querier.send_post_request(
        NormalisedURLPath('/recipe/session/remove'), request_body)
    revoked_handles = result['sessionHandlesRevoked']
    return len(revoked_handles) == 1


async def revoke_multiple_sessions(recipe_implementation: RecipeImplementation, session_handles: List[str]) -> List[str]:
    """Ask the core to remove all sessions listed in ``session_handles``.

    Returns the ``'sessionHandlesRevoked'`` list from the core's response.
    """
    request_body = {'sessionHandles': session_handles}
    result = await recipe_implementation.querier.send_post_request(
        NormalisedURLPath('/recipe/session/remove'), request_body)
    return result['sessionHandlesRevoked']


async def update_session_data(recipe_implementation: RecipeImplementation, session_handle: str, new_session_data: Dict[str, Any]):
    """Replace the database-side data of a session via ``/recipe/session/data``.

    Returns nothing. An ``'UNAUTHORISED'`` status from the core is passed to
    ``raise_unauthorised_exception`` with the core's message.
    """
    request_body = {
        'sessionHandle': session_handle,
        'userDataInDatabase': new_session_data
    }
    result = await recipe_implementation.querier.send_put_request(
        NormalisedURLPath('/recipe/session/data'), request_body)
    if result['status'] == 'UNAUTHORISED':
        raise_unauthorised_exception(result['message'])


async def update_access_token_payload(recipe_implementation: RecipeImplementation, session_handle: str, new_access_token_payload: Dict[str, Any]):
    """Replace the JWT-side data of a session via ``/recipe/jwt/data``.

    Returns nothing. An ``'UNAUTHORISED'`` status from the core is passed to
    ``raise_unauthorised_exception`` with the core's message.
    """
    request_body = {
        'sessionHandle': session_handle,
        'userDataInJWT': new_access_token_payload
    }
    result = await recipe_implementation.querier.send_put_request(
        NormalisedURLPath('/recipe/jwt/data'), request_body)
    if result['status'] == 'UNAUTHORISED':
        raise_unauthorised_exception(result['message'])


async def get_session_information(recipe_implementation: RecipeImplementation, session_handle: str) -> SessionInformationResult:
    """Look up the stored details of the session ``session_handle``.

    On an ``'OK'`` status the core's fields are wrapped in a
    ``SessionInformationResult``; any other status is passed to
    ``raise_unauthorised_exception`` with the core's message.
    """
    query = {'sessionHandle': session_handle}
    result = await recipe_implementation.querier.send_get_request(
        NormalisedURLPath('/recipe/session'), query)
    if result['status'] == 'OK':
        return SessionInformationResult(
            'OK',
            result['sessionHandle'],
            result['userId'],
            result['userDataInDatabase'],
            result['expiry'],
            result['userDataInJWT'],
            result['timeCreated'])
    raise_unauthorised_exception(result['message'])

Functions

async def create_new_session(recipe_implementation: RecipeImplementation, user_id: str, access_token_payload: Union[None, Dict[str, Any]], session_data: Union[None, Dict[str, Any]])
Expand source code
async def create_new_session(recipe_implementation: RecipeImplementation, user_id: str,
                             access_token_payload: Union[None, Dict[str, Any]],
                             session_data: Union[None, Dict[str, Any]]):
    """Create a session for ``user_id`` via the ``/recipe/session`` endpoint.

    ``None`` for ``access_token_payload`` or ``session_data`` is sent as an
    empty dict. Anti-CSRF is requested from the core only when the handshake
    info reports the ``'VIA_TOKEN'`` mode.

    The signing-key fields of the response are handed to
    ``recipe_implementation.update_jwt_signing_public_key_info`` and then
    removed, together with ``'status'``, from the response dict, which is
    returned (mutated in place).
    """
    jwt_payload = {} if access_token_payload is None else access_token_payload
    db_data = {} if session_data is None else session_data

    info = await recipe_implementation.get_handshake_info()
    request_body = {
        'userId': user_id,
        'userDataInJWT': jwt_payload,
        'userDataInDatabase': db_data,
        'enableAntiCsrf': info.anti_csrf == 'VIA_TOKEN'
    }
    result = await recipe_implementation.querier.send_post_request(
        NormalisedURLPath('/recipe/session'), request_body)

    # Keep the cached signing keys in sync with what the core just returned.
    recipe_implementation.update_jwt_signing_public_key_info(
        result['jwtSigningPublicKeyList'],
        result['jwtSigningPublicKey'],
        result['jwtSigningPublicKeyExpiryTime'])

    # Strip the bookkeeping fields so only session data is returned.
    for transient_key in ('status', 'jwtSigningPublicKey',
                          'jwtSigningPublicKeyExpiryTime', 'jwtSigningPublicKeyList'):
        result.pop(transient_key, None)

    return result
async def get_all_session_handles_for_user(recipe_implementation: RecipeImplementation, user_id: str) ‑> List[str]
Expand source code
async def get_all_session_handles_for_user(recipe_implementation: RecipeImplementation, user_id: str) -> List[str]:
    """Fetch the session handles the core holds for ``user_id``.

    Returns the ``'sessionHandles'`` list from the core's response.
    """
    query = {'userId': user_id}
    result = await recipe_implementation.querier.send_get_request(
        NormalisedURLPath('/recipe/session/user'), query)
    return result['sessionHandles']
async def get_session(recipe_implementation: RecipeImplementation, access_token: str, anti_csrf_token: Union[str, None], do_anti_csrf_check: bool, contains_custom_header: bool) ‑> Dict[str, Any]
Expand source code
async def get_session(recipe_implementation: RecipeImplementation, access_token: str,
                      anti_csrf_token: Union[str, None],
                      do_anti_csrf_check: bool, contains_custom_header: bool) -> Dict[str, Any]:
    """Verify ``access_token`` and return the session it identifies.

    The token is first checked locally against each cached JWT signing public
    key. When local verification succeeds, access-token blacklisting is
    disabled and the token carries no ``parentRefreshTokenHash1``, the session
    is built from the token alone. In every other case the core's
    ``/recipe/session/verify`` endpoint is called.

    Args:
        recipe_implementation: Provides handshake info, the querier and the
            signing-key cache update hook.
        access_token: The access token to verify.
        anti_csrf_token: Anti-CSRF token from the request, if any.
        do_anti_csrf_check: Whether anti-CSRF protection applies to this call.
        contains_custom_header: Whether the request carried the custom header
            required in ``'VIA_CUSTOM_HEADER'`` mode.

    Returns:
        ``{'session': {'handle', 'userId', 'userDataInJWT'}}`` when served
        locally, otherwise the core's ``OK`` response with ``'status'`` and the
        signing-key fields removed.

    Raises:
        Via ``raise_try_refresh_token_exception`` or
        ``raise_unauthorised_exception``, depending on the failure. Any
        exception from local verification whose class is not exactly
        ``TryRefreshTokenError`` is re-raised unchanged.
    """
    handshake_info = await recipe_implementation.get_handshake_info()
    verify_anti_csrf_token = handshake_info.anti_csrf == 'VIA_TOKEN' and do_anti_csrf_check
    access_token_info = None
    found_a_sign_key_that_is_older_than_the_access_token = False

    for key in handshake_info.get_jwt_signing_public_key_list():
        try:
            access_token_info = get_info_from_access_token(access_token,
                                                           key['publicKey'],
                                                           verify_anti_csrf_token)
            found_a_sign_key_that_is_older_than_the_access_token = True
            # The token is verified; trying the remaining keys could only fail
            # and mask this success with a spurious error.
            break
        except Exception as e:
            if e.__class__ != TryRefreshTokenError:
                raise e

            try:
                payload = get_payload_without_verifying(access_token)
            except Exception:
                # Undecodable token: report the original verification error.
                # (Exception, not BaseException, so interpreter-exit signals
                # are never converted into a token error.)
                raise e

            if payload is None:
                raise e

            # A payload lacking either timestamp is treated like any other
            # malformed token instead of surfacing a KeyError.
            time_created = payload.get('timeCreated')
            expiry_time = payload.get('expiryTime')
            if not isinstance(time_created, int) or not isinstance(expiry_time, int):
                raise e

            # NOTE(review): time.time() is in seconds; confirm 'expiryTime'
            # uses the same unit, otherwise this guard never fires.
            if expiry_time < time.time():
                raise e

            # The token is at least as new as this key, so the cached key list
            # may be stale; stop here and let the core decide.
            if time_created >= key['createdAt']:
                found_a_sign_key_that_is_older_than_the_access_token = True
                break

    if not found_a_sign_key_that_is_older_than_the_access_token:
        # Every cached key failed and the token predates all of them (or the
        # key list was empty).
        log_debug_message(
            "getSession: Returning TRY_REFRESH_TOKEN because the access token could not be verified with any cached signing key"
        )
        raise_try_refresh_token_exception(
            'Access token could not be verified with any cached signing key. Please call the refresh API')

    if verify_anti_csrf_token:
        if access_token_info is not None:
            if anti_csrf_token is None:
                log_debug_message(
                    "getSession: Returning TRY_REFRESH_TOKEN because antiCsrfToken is missing from request"
                )
                raise_try_refresh_token_exception('Provided antiCsrfToken is undefined. If you do not want anti-csrf check for this API, please set doAntiCsrfCheck to false for this API')
            elif anti_csrf_token != access_token_info['antiCsrfToken']:
                log_debug_message(
                    "getSession: Returning TRY_REFRESH_TOKEN because the passed antiCsrfToken is not the same as in the access token"
                )
                raise_try_refresh_token_exception(
                    'anti-csrf check failed')

    elif handshake_info.anti_csrf == 'VIA_CUSTOM_HEADER' and do_anti_csrf_check:
        if not contains_custom_header:
            log_debug_message("getSession: Returning TRY_REFRESH_TOKEN because custom header (rid) was not passed")
            raise_try_refresh_token_exception('anti-csrf check failed. Please pass \'rid: "anti-csrf"\' header in the request, or set doAntiCsrfCheck to false '
                                              'for this API')

    # Fast path: the verified token alone is enough, no call to the core.
    if access_token_info is not None and not handshake_info.access_token_blacklisting_enabled and \
            access_token_info['parentRefreshTokenHash1'] is None:
        return {
            'session': {
                'handle': access_token_info['sessionHandle'],
                'userId': access_token_info['userId'],
                'userDataInJWT': access_token_info['userData']
            }
        }

    ProcessState.get_instance().add_state(
        AllowedProcessStates.CALLING_SERVICE_IN_VERIFY)

    data = {
        'accessToken': access_token,
        'doAntiCsrfCheck': do_anti_csrf_check,
        'enableAntiCsrf': handshake_info.anti_csrf == "VIA_TOKEN"
    }
    if anti_csrf_token is not None:
        data['antiCsrfToken'] = anti_csrf_token

    response = await recipe_implementation.querier.send_post_request(NormalisedURLPath('/recipe/session/verify'), data)
    if response['status'] == 'OK':
        recipe_implementation.update_jwt_signing_public_key_info(response['jwtSigningPublicKeyList'], response['jwtSigningPublicKey'],
                                                                 response['jwtSigningPublicKeyExpiryTime'])
        response.pop('status', None)
        response.pop('jwtSigningPublicKey', None)
        response.pop('jwtSigningPublicKeyExpiryTime', None)
        response.pop('jwtSigningPublicKeyList', None)
        return response
    if response['status'] == 'UNAUTHORISED':
        log_debug_message("getSession: Returning UNAUTHORISED because of core response")
        raise_unauthorised_exception(response['message'])

    # Any other status is treated as "try refresh token". The signing-key
    # fields may be absent from such a response, so read them with .get();
    # subscripting would raise KeyError and the handshake refresh below could
    # never run.
    signing_key_list = response.get('jwtSigningPublicKeyList')
    signing_key = response.get('jwtSigningPublicKey')
    signing_key_expiry = response.get('jwtSigningPublicKeyExpiryTime')
    if signing_key_list is not None or signing_key is not None or signing_key_expiry is not None:
        recipe_implementation.update_jwt_signing_public_key_info(
            signing_key_list,
            signing_key,
            signing_key_expiry)
    else:
        await recipe_implementation.get_handshake_info(True)

    log_debug_message("getSession: Returning TRY_REFRESH_TOKEN because of core response")
    raise_try_refresh_token_exception(response['message'])
async def get_session_information(recipe_implementation: RecipeImplementation, session_handle: str) ‑> SessionInformationResult
Expand source code
async def get_session_information(recipe_implementation: RecipeImplementation, session_handle: str) -> SessionInformationResult:
    """Look up the stored details of the session ``session_handle``.

    On an ``'OK'`` status the core's fields are wrapped in a
    ``SessionInformationResult``; any other status is passed to
    ``raise_unauthorised_exception`` with the core's message.
    """
    query = {'sessionHandle': session_handle}
    result = await recipe_implementation.querier.send_get_request(
        NormalisedURLPath('/recipe/session'), query)
    if result['status'] == 'OK':
        return SessionInformationResult(
            'OK',
            result['sessionHandle'],
            result['userId'],
            result['userDataInDatabase'],
            result['expiry'],
            result['userDataInJWT'],
            result['timeCreated'])
    raise_unauthorised_exception(result['message'])
async def refresh_session(recipe_implementation: RecipeImplementation, refresh_token: str, anti_csrf_token: Union[str, None], contains_custom_header: bool)
Expand source code
async def refresh_session(recipe_implementation: RecipeImplementation, refresh_token: str,
                          anti_csrf_token: Union[str, None],
                          contains_custom_header: bool):
    """Exchange ``refresh_token`` for a new session via ``/recipe/session/refresh``.

    In ``'VIA_CUSTOM_HEADER'`` anti-CSRF mode the request must have carried the
    custom header, otherwise ``raise_unauthorised_exception`` is invoked before
    the core is contacted.

    Returns the core's response without its ``'status'`` field when the status
    is ``'OK'``. An ``'UNAUTHORISED'`` status goes to
    ``raise_unauthorised_exception``; any other status is reported through
    ``raise_token_theft_exception`` with the session's user id and handle.
    """
    info = await recipe_implementation.get_handshake_info()
    anti_csrf_mode = info.anti_csrf

    if anti_csrf_mode == 'VIA_CUSTOM_HEADER' and not contains_custom_header:
        log_debug_message("refreshSession: Returning UNAUTHORISED because custom header (rid) was not passed")
        raise_unauthorised_exception('anti-csrf check failed. Please pass \'rid: "session"\' header '
                                     'in the request.', False)

    request_body = {
        'refreshToken': refresh_token,
        'enableAntiCsrf': anti_csrf_mode == 'VIA_TOKEN'
    }
    # The anti-CSRF token is only forwarded when the caller supplied one.
    if anti_csrf_token is not None:
        request_body['antiCsrfToken'] = anti_csrf_token

    result = await recipe_implementation.querier.send_post_request(
        NormalisedURLPath('/recipe/session/refresh'), request_body)

    outcome = result['status']
    if outcome == 'OK':
        result.pop('status', None)
        return result
    if outcome == 'UNAUTHORISED':
        log_debug_message("refreshSession: Returning UNAUTHORISED because of core response")
        raise_unauthorised_exception(result['message'])

    log_debug_message("refreshSession: Returning TOKEN_THEFT_DETECTED because of core response")
    stolen_session = result['session']
    raise_token_theft_exception(stolen_session['userId'], stolen_session['handle'])
async def revoke_all_sessions_for_user(recipe_implementation: RecipeImplementation, user_id: str) ‑> List[str]
Expand source code
async def revoke_all_sessions_for_user(recipe_implementation: RecipeImplementation, user_id: str) -> List[str]:
    """Ask the core to remove every session of ``user_id``.

    Returns the ``'sessionHandlesRevoked'`` list from the core's response.
    """
    request_body = {'userId': user_id}
    result = await recipe_implementation.querier.send_post_request(
        NormalisedURLPath('/recipe/session/remove'), request_body)
    return result['sessionHandlesRevoked']
async def revoke_multiple_sessions(recipe_implementation: RecipeImplementation, session_handles: List[str]) ‑> List[str]
Expand source code
async def revoke_multiple_sessions(recipe_implementation: RecipeImplementation, session_handles: List[str]) -> List[str]:
    """Ask the core to remove all sessions listed in ``session_handles``.

    Returns the ``'sessionHandlesRevoked'`` list from the core's response.
    """
    request_body = {'sessionHandles': session_handles}
    result = await recipe_implementation.querier.send_post_request(
        NormalisedURLPath('/recipe/session/remove'), request_body)
    return result['sessionHandlesRevoked']
async def revoke_session(recipe_implementation: RecipeImplementation, session_handle: str) ‑> bool
Expand source code
async def revoke_session(recipe_implementation: RecipeImplementation, session_handle: str) -> bool:
    """Ask the core to remove the session identified by ``session_handle``.

    Returns ``True`` only when the core reports exactly one revoked handle.
    """
    request_body = {'sessionHandles': [session_handle]}
    result = await recipe_implementation.querier.send_post_request(
        NormalisedURLPath('/recipe/session/remove'), request_body)
    revoked_handles = result['sessionHandlesRevoked']
    return len(revoked_handles) == 1
async def update_access_token_payload(recipe_implementation: RecipeImplementation, session_handle: str, new_access_token_payload: Dict[str, Any])
Expand source code
async def update_access_token_payload(recipe_implementation: RecipeImplementation, session_handle: str, new_access_token_payload: Dict[str, Any]):
    """Replace the JWT-side data of a session via ``/recipe/jwt/data``.

    Returns nothing. An ``'UNAUTHORISED'`` status from the core is passed to
    ``raise_unauthorised_exception`` with the core's message.
    """
    request_body = {
        'sessionHandle': session_handle,
        'userDataInJWT': new_access_token_payload
    }
    result = await recipe_implementation.querier.send_put_request(
        NormalisedURLPath('/recipe/jwt/data'), request_body)
    if result['status'] == 'UNAUTHORISED':
        raise_unauthorised_exception(result['message'])
async def update_session_data(recipe_implementation: RecipeImplementation, session_handle: str, new_session_data: Dict[str, Any])
Expand source code
async def update_session_data(recipe_implementation: RecipeImplementation, session_handle: str, new_session_data: Dict[str, Any]):
    """Replace the database-side data of a session via ``/recipe/session/data``.

    Returns nothing. An ``'UNAUTHORISED'`` status from the core is passed to
    ``raise_unauthorised_exception`` with the core's message.
    """
    request_body = {
        'sessionHandle': session_handle,
        'userDataInDatabase': new_session_data
    }
    result = await recipe_implementation.querier.send_put_request(
        NormalisedURLPath('/recipe/session/data'), request_body)
    if result['status'] == 'UNAUTHORISED':
        raise_unauthorised_exception(result['message'])