Module supertokens_python.recipe.session.session_request_functions

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Any, Callable, Dict, List, Optional, Union, TYPE_CHECKING

from supertokens_python.logger import log_debug_message
from supertokens_python.recipe.session.access_token import (
    validate_access_token_structure,
)
from supertokens_python.recipe.session.constants import available_token_transfer_methods
from supertokens_python.recipe.session.cookie_and_header import (
    clear_session_cookies_from_older_cookie_domain,
    clear_session_mutator,
    get_anti_csrf_header,
    get_token,
    has_multiple_cookies_for_token_type,
    set_cookie_response_mutator,
)
from supertokens_python.recipe.session.exceptions import (
    raise_try_refresh_token_exception,
    raise_unauthorised_exception,
)
from supertokens_python.recipe.session.interfaces import (
    RecipeInterface as SessionRecipeInterface,
)
from supertokens_python.recipe.session.interfaces import (
    SessionClaimValidator,
    SessionContainer,
)
from supertokens_python.recipe.session.exceptions import (
    SuperTokensSessionError,
    TokenTheftError,
    UnauthorisedError,
)
from supertokens_python.recipe.session.jwt import (
    ParsedJWTInfo,
    parse_jwt_without_signature_verification,
)
from supertokens_python.recipe.session.utils import (
    SessionConfig,
    TokenTransferMethod,
    get_required_claim_validators,
    get_auth_mode_from_header,
)
from supertokens_python.types import MaybeAwaitable, RecipeUserId
from supertokens_python.utils import (
    FRAMEWORKS,
    get_rid_from_header,
    is_an_ip_address,
    normalise_http_method,
    set_request_in_user_context_if_not_defined,
)
from supertokens_python.supertokens import Supertokens
from .constants import protected_props

if TYPE_CHECKING:
    from supertokens_python.recipe.session.recipe import SessionRecipe
    from supertokens_python.supertokens import AppInfo
    from .interfaces import ResponseMutator

LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME = "sIdRefreshToken"


async def get_session_from_request(
    request: Any,
    config: SessionConfig,
    recipe_interface_impl: SessionRecipeInterface,
    session_required: Optional[bool] = None,
    anti_csrf_check: Optional[bool] = None,
    check_database: Optional[bool] = None,
    override_global_claim_validators: Optional[
        Callable[
            [List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
            MaybeAwaitable[List[SessionClaimValidator]],
        ]
    ] = None,
    user_context: Optional[Dict[str, Any]] = None,
) -> Optional[SessionContainer]:
    """Resolve the session for ``request`` from the access token it carries.

    Access tokens are looked up for every entry of
    ``available_token_transfer_methods``. Tokens that cannot be parsed or that
    fail ``validate_access_token_structure`` are ignored. When the transfer
    method returned by ``config.get_token_transfer_method`` permits both, the
    header token takes precedence over the cookie token.

    Args:
        request: Framework request; wrapped with the configured framework
            wrapper if it does not already report ``wrapper_used``.
        config: Session recipe configuration.
        recipe_interface_impl: Recipe implementation whose ``get_session`` is
            called with the selected token.
        session_required: Forwarded unchanged to ``get_session``.
        anti_csrf_check: Whether to perform the anti-CSRF check. When ``None``
            the check is enabled for every HTTP method except GET. It is always
            disabled when the token came from a header or no token was found.
        check_database: Forwarded unchanged to ``get_session``.
        override_global_claim_validators: Optional hook passed to
            ``get_session`` and ``get_required_claim_validators``.
        user_context: User context; the request is stored in it if not
            already defined.

    Returns:
        The value returned by ``recipe_interface_impl.get_session`` (possibly
        ``None``). A non-``None`` session has had its claims asserted and has
        been attached to the request/response.

    TRY_REFRESH_TOKEN is signalled through
    ``raise_try_refresh_token_exception`` when the legacy ``sIdRefreshToken``
    cookie is present, when multiple access token cookies are present, or when
    the ``VIA_CUSTOM_HEADER`` anti-CSRF check finds no ``rid`` header.
    """
    log_debug_message("getSession: Started")

    if not getattr(request, "wrapper_used", False):
        request = FRAMEWORKS[
            Supertokens.get_instance().app_info.framework
        ].wrap_request(request)

    log_debug_message("getSession: Wrapping done")

    user_context = set_request_in_user_context_if_not_defined(user_context, request)

    # This token isn't handled by getToken to limit the scope of this legacy/migration code
    if request.get_cookie(LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME) is not None:
        log_debug_message(
            "getSession: Throwing TRY_REFRESH_TOKEN because the request is using a legacy session"
        )
        # This could create a spike on refresh calls during the update of the backend SDK
        return raise_try_refresh_token_exception(
            "using legacy session, please call the refresh API"
        )

    session_optional = not session_required
    log_debug_message("getSession: optional validation: %s", session_optional)

    access_tokens: Dict[TokenTransferMethod, ParsedJWTInfo] = {}

    # We check all token transfer methods for available access tokens
    for transfer_method in available_token_transfer_methods:
        token_string = get_token(request, "access", transfer_method)
        if token_string is None:
            continue

        try:
            info = parse_jwt_without_signature_verification(token_string)
            validate_access_token_structure(info.payload, info.version)
            log_debug_message("getSession: got access token from %s", transfer_method)
            access_tokens[transfer_method] = info
        except Exception:
            # Deliberately best-effort: a malformed token is treated as absent.
            log_debug_message(
                "getSession: ignoring token in %s, because it doesn't match our access token structure",
                transfer_method,
            )

    allowed_transfer_method = config.get_token_transfer_method(
        request, False, user_context
    )
    request_transfer_method: Optional[TokenTransferMethod] = None
    request_access_token: Union[ParsedJWTInfo, None] = None

    if (allowed_transfer_method in ("any", "header")) and access_tokens.get(
        "header"
    ) is not None:
        log_debug_message("getSession: using header transfer method")
        request_transfer_method = "header"
        request_access_token = access_tokens["header"]
    elif (allowed_transfer_method in ("any", "cookie")) and access_tokens.get(
        "cookie"
    ) is not None:
        log_debug_message("getSession: using cookie transfer method")

        # If multiple access tokens exist in the request cookie, throw TRY_REFRESH_TOKEN.
        # This prompts the client to call the refresh endpoint, clearing older_cookie_domain
        # cookies (if set), ensuring an outdated token payload isn't used.
        if has_multiple_cookies_for_token_type(request, "access"):
            log_debug_message(
                "getSession: Throwing TRY_REFRESH_TOKEN because multiple access tokens are present in request cookies"
            )
            raise_try_refresh_token_exception(
                "Multiple access tokens present in the request cookies."
            )

        request_transfer_method = "cookie"
        request_access_token = access_tokens["cookie"]

    anti_csrf_token = get_anti_csrf_header(request)
    do_anti_csrf_check = anti_csrf_check

    if do_anti_csrf_check is None:
        do_anti_csrf_check = normalise_http_method(request.method()) != "get"
    if request_transfer_method == "header":
        do_anti_csrf_check = False
    if request_access_token is None:
        do_anti_csrf_check = False

    if callable(config.anti_csrf_function_or_string):
        anti_csrf = config.anti_csrf_function_or_string(request, user_context)
    else:
        anti_csrf = config.anti_csrf_function_or_string

    if do_anti_csrf_check and anti_csrf == "VIA_CUSTOM_HEADER":
        if get_rid_from_header(request) is None:
            log_debug_message(
                "getSession: Returning TRY_REFRESH_TOKEN because custom header (rid) was not passed"
            )
            raise_try_refresh_token_exception(
                "anti-csrf check failed. Please pass 'rid: \"session\"' header in the request, or set doAntiCsrfCheck to false for this API"
            )

        log_debug_message("getSession: VIA_CUSTOM_HEADER anti-csrf check passed")
        # The custom-header check is complete, so get_session must not repeat
        # an anti-CSRF check of its own.
        do_anti_csrf_check = False

    log_debug_message(
        "getSession: Value of doAntiCsrfCheck is: %s", do_anti_csrf_check
    )

    session = await recipe_interface_impl.get_session(
        access_token=(
            request_access_token.raw_token_string
            if request_access_token is not None
            else None
        ),
        anti_csrf_token=anti_csrf_token,
        anti_csrf_check=do_anti_csrf_check,
        session_required=session_required,
        check_database=check_database,
        override_global_claim_validators=override_global_claim_validators,
        user_context=user_context,
    )

    if session is not None:
        claim_validators = await get_required_claim_validators(
            session, override_global_claim_validators, user_context
        )
        await session.assert_claims(claim_validators, user_context)

        # request_transfer_method can only be None here if the user overrides get_session
        # to load the session by a custom method. In that (very niche) case they also need
        # to override how the session is attached to the response.
        # In that scenario the transferMethod passed to attachToRequestResponse likely
        # doesn't matter; still, we follow the general fallback logic.
        if request_transfer_method is not None:
            final_transfer_method = request_transfer_method
        elif allowed_transfer_method != "any":
            final_transfer_method = allowed_transfer_method
        else:
            final_transfer_method = "header"

        await session.attach_to_request_response(
            request, final_transfer_method, user_context
        )

    return session


async def create_new_session_in_request(
    request: Any,
    user_context: Dict[str, Any],
    recipe_instance: SessionRecipe,
    access_token_payload: Dict[str, Any],
    user_id: str,
    recipe_user_id: RecipeUserId,
    config: SessionConfig,
    app_info: AppInfo,
    session_data_in_database: Dict[str, Any],
    tenant_id: str,
) -> SessionContainer:
    """Create a new session and attach its tokens to the request/response.

    The access token payload is built from ``access_token_payload`` plus an
    ``iss`` entry, with every key listed in ``protected_props`` removed and the
    output of each claim returned by
    ``recipe_instance.get_claims_added_by_other_recipes()`` merged in. Access
    tokens found under any transfer method other than the chosen one are
    scheduled for clearing on the new session's response mutators.

    Raises:
        Exception: when the cookie transfer method is chosen with same-site
            ``"none"`` and ``config.cookie_secure`` is falsy, unless both the
            top-level API domain and the top-level website domain are
            ``localhost`` or an IP address.
    """
    log_debug_message("createNewSession: Started")

    # Handling framework specific request/response wrapping
    already_wrapped = hasattr(request, "wrapper_used") and request.wrapper_used
    if not already_wrapped:
        framework = Supertokens.get_instance().app_info.framework
        request = FRAMEWORKS[framework].wrap_request(request)

    log_debug_message("createNewSession: Wrapping done")
    user_context = set_request_in_user_context_if_not_defined(user_context, request)

    extra_claims = recipe_instance.get_claims_added_by_other_recipes()
    api_domain = app_info.api_domain.get_as_string_dangerous()
    api_base_path = app_info.api_base_path.get_as_string_dangerous()

    # Copy the caller's payload so it is never mutated, then stamp the issuer.
    payload: Dict[str, Any] = dict(access_token_payload)
    payload["iss"] = api_domain + api_base_path

    for protected_key in protected_props:
        payload.pop(protected_key, None)

    for claim in extra_claims:
        payload.update(
            await claim.build(
                user_id, recipe_user_id, tenant_id, payload, user_context
            )
        )

    log_debug_message("createNewSession: Access token payload built")

    output_transfer_method = config.get_token_transfer_method(
        request, True, user_context
    )
    if output_transfer_method == "any":
        # Only an explicit "cookie" auth mode selects cookies; anything else means header.
        output_transfer_method = (
            "cookie" if get_auth_mode_from_header(request) == "cookie" else "header"
        )

    log_debug_message(
        "createNewSession: using transfer method %s", output_transfer_method
    )

    insecure_cross_site_cookie = (
        output_transfer_method == "cookie"
        and config.get_cookie_same_site(request, user_context) == "none"
        and not config.cookie_secure
    )
    if insecure_cross_site_cookie:
        api_is_local = app_info.top_level_api_domain == "localhost" or is_an_ip_address(
            app_info.top_level_api_domain
        )
        # The website domain is only consulted when the API domain is local.
        both_local = api_is_local and (
            app_info.get_top_level_website_domain(request, user_context) == "localhost"
            or is_an_ip_address(
                app_info.get_top_level_website_domain(request, user_context)
            )
        )
        if not both_local:
            # We can allow insecure cookie when both website & API domain are localhost or an IP
            # When either of them is a different domain, API domain needs to have https and a secure cookie to work
            raise Exception(
                "Since your API and website domain are different, for sessions to work, please use https on your apiDomain and don't set cookieSecure to false."
            )

    disable_anti_csrf = output_transfer_method == "header"
    session = await recipe_instance.recipe_implementation.create_new_session(
        user_id,
        recipe_user_id,
        payload,
        session_data_in_database,
        disable_anti_csrf,
        tenant_id,
        user_context=user_context,
    )

    log_debug_message("createNewSession: Session created in core built")

    # Schedule clearing of access tokens left under the transfer methods we are not using.
    session.response_mutators.extend(
        clear_session_mutator(config, other_method, request)
        for other_method in available_token_transfer_methods
        if other_method != output_transfer_method
        and get_token(request, "access", other_method) is not None
    )

    log_debug_message("createNewSession: Cleared old tokens")

    await session.attach_to_request_response(
        request, output_transfer_method, user_context
    )
    log_debug_message("createNewSession: Attached new tokens to res")

    return session


# In all cases: if the sIdRefreshToken cookie exists (so it's a legacy session) we clear it.
# Check http://localhost:3002/docs/contribute/decisions/session/0008 for further details and a table of expected behaviours


async def refresh_session_in_request(
    request: Any,
    user_context: Dict[str, Any],
    config: SessionConfig,
    recipe_interface_impl: SessionRecipeInterface,
) -> SessionContainer:
    """Refresh the session identified by the refresh token carried in ``request``.

    Refresh tokens are read for every entry of
    ``available_token_transfer_methods``. When the transfer method returned by
    ``config.get_token_transfer_method`` permits both, the header token takes
    precedence over the cookie token. After a successful refresh, refresh
    tokens found under the other transfer methods are scheduled for clearing,
    and so is the legacy ``sIdRefreshToken`` cookie if the request carries it.

    Args:
        request: Framework request; wrapped with the configured framework
            wrapper if it does not already report ``wrapper_used``.
        user_context: User context; the request is stored in it if not
            already defined.
        config: Session recipe configuration.
        recipe_interface_impl: Recipe implementation whose ``refresh_session``
            is called with the selected refresh token.

    Returns:
        The refreshed session, attached to the request/response, with the
        collected response mutators appended to ``session.response_mutators``.

    UNAUTHORISED is signalled through ``raise_unauthorised_exception`` when no
    usable refresh token is found or when the ``VIA_CUSTOM_HEADER`` anti-CSRF
    check finds no ``rid`` header. A ``SuperTokensSessionError`` raised by
    ``refresh_session`` is re-raised after the collected response mutators have
    been added to it.
    """
    log_debug_message("refreshSession: Started")

    response_mutators: List[ResponseMutator] = []

    if not getattr(request, "wrapper_used", False):
        request = FRAMEWORKS[
            Supertokens.get_instance().app_info.framework
        ].wrap_request(request)

    log_debug_message("refreshSession: Wrapping done")
    user_context = set_request_in_user_context_if_not_defined(user_context, request)

    clear_session_cookies_from_older_cookie_domain(request, config, user_context)

    refresh_tokens: Dict[TokenTransferMethod, Optional[str]] = {}

    for transfer_method in available_token_transfer_methods:
        refresh_tokens[transfer_method] = get_token(request, "refresh", transfer_method)
        if refresh_tokens[transfer_method] is not None:
            log_debug_message(
                "refreshSession: got refresh token from %s", transfer_method
            )

    allowed_transfer_method = config.get_token_transfer_method(
        request, False, user_context
    )
    log_debug_message(
        "refreshSession: getTokenTransferMethod returned: %s",
        allowed_transfer_method,
    )

    request_transfer_method: TokenTransferMethod
    refresh_token: Optional[str]

    if (allowed_transfer_method in ("any", "header")) and (
        refresh_tokens.get("header")
    ):
        log_debug_message("refreshSession: using header transfer method")
        request_transfer_method = "header"
        refresh_token = refresh_tokens["header"]
    elif (allowed_transfer_method in ("any", "cookie")) and (
        refresh_tokens.get("cookie")
    ):
        log_debug_message("refreshSession: using cookie transfer method")
        request_transfer_method = "cookie"
        refresh_token = refresh_tokens["cookie"]
    else:
        _clear_legacy_id_refresh_token(
            request,
            config,
            response_mutators,
            "refreshSession: cleared legacy id refresh token because refresh token was not found",
        )

        # We need to clear the access token cookie if
        # - the refresh token is not found, and
        # - the allowed_transfer_method is 'cookie' or 'any', and
        # - an access token cookie exists (otherwise it'd be a no-op)
        # See: https://github.com/supertokens/supertokens-node/issues/790
        if (
            allowed_transfer_method in ("cookie", "any")
            and get_token(request, "access", "cookie") is not None
        ):
            log_debug_message(
                "refreshSession: cleared all session tokens and returning UNAUTHORISED because refresh_token in request is None"
            )

            # We're clearing all session tokens instead of just the access token and then throwing an UNAUTHORISED
            # error with `clear_tokens: False`. This approach avoids confusion and we don't want to retain session
            # tokens on the client in any case if the refresh API is called without a refresh token but with an access token.
            return raise_unauthorised_exception(
                "Refresh token not found but access token is present. Clearing all tokens.",
                clear_tokens=True,
                response_mutators=response_mutators,
            )

        return raise_unauthorised_exception(
            "Refresh token not found. Are you sending the refresh token in the request?",
            clear_tokens=True,
            response_mutators=response_mutators,
        )

    # Both branches above only select a truthy token; this narrows the type.
    assert refresh_token is not None

    disable_anti_csrf = request_transfer_method == "header"
    anti_csrf_token = get_anti_csrf_header(request)

    anti_csrf = config.anti_csrf_function_or_string
    if callable(anti_csrf):
        anti_csrf = anti_csrf(request, user_context)

    if anti_csrf == "VIA_CUSTOM_HEADER" and not disable_anti_csrf:
        if get_rid_from_header(request) is None:
            log_debug_message(
                "refreshSession: Returning UNAUTHORISED because custom header (rid) was not passed"
            )
            # see https://github.com/supertokens/supertokens-node/issues/141
            raise_unauthorised_exception(
                "anti-csrf check failed. Please pass 'rid: \"session\"' header in the request.",
                clear_tokens=True,
            )
        # The custom-header check is complete, so refresh_session must not
        # perform an anti-CSRF check of its own.
        disable_anti_csrf = True

    try:
        session = await recipe_interface_impl.refresh_session(
            refresh_token, anti_csrf_token, disable_anti_csrf, user_context
        )
    except SuperTokensSessionError as e:
        if isinstance(e, TokenTheftError) or (
            isinstance(e, UnauthorisedError) and getattr(e, "clear_tokens") is True
        ):
            # We clear the LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME here because we want to limit the scope of
            # this legacy/migration code so the token clearing functions in the error handlers do not.
            _clear_legacy_id_refresh_token(
                request,
                config,
                response_mutators,
                "refreshSession: cleared legacy id refresh token because refresh is clearing other tokens",
            )

        e.response_mutators.extend(response_mutators)
        raise

    log_debug_message(
        "refreshSession: Attaching refreshed session info as %s",
        request_transfer_method,
    )

    # We clear the tokens in all token transfer methods we are not going to overwrite:
    for transfer_method in available_token_transfer_methods:
        if (
            transfer_method != request_transfer_method
            and refresh_tokens[transfer_method] is not None
        ):
            response_mutators.append(
                clear_session_mutator(config, transfer_method, request)
            )

    await session.attach_to_request_response(
        request, request_transfer_method, user_context
    )
    log_debug_message("refreshSession: Success!")

    _clear_legacy_id_refresh_token(
        request,
        config,
        response_mutators,
        "refreshSession: cleared legacy id refresh token after successful refresh",
    )

    session.response_mutators.extend(response_mutators)
    return session


def _clear_legacy_id_refresh_token(
    request: Any,
    config: SessionConfig,
    response_mutators: List[ResponseMutator],
    debug_message: str,
) -> None:
    """Queue removal of the legacy ``sIdRefreshToken`` cookie if the request carries it.

    When the cookie is present, ``debug_message`` is logged and a mutator that
    sets the cookie to an empty value with expiry ``0`` is appended to
    ``response_mutators``. Otherwise nothing happens.
    """
    # This token isn't handled by getToken/setToken to limit the scope of this legacy/migration code
    if request.get_cookie(LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME) is None:
        return

    log_debug_message(debug_message)
    response_mutators.append(
        set_cookie_response_mutator(
            config,
            LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME,
            "",
            0,
            "access_token_path",
            request,
        )
    )

Functions

async def create_new_session_in_request(request: Any, user_context: Dict[str, Any], recipe_instance: SessionRecipe, access_token_payload: Dict[str, Any], user_id: str, recipe_user_id: RecipeUserId, config: SessionConfig, app_info: AppInfo, session_data_in_database: Dict[str, Any], tenant_id: str) ‑> SessionContainer
Expand source code
async def create_new_session_in_request(
    request: Any,
    user_context: Dict[str, Any],
    recipe_instance: SessionRecipe,
    access_token_payload: Dict[str, Any],
    user_id: str,
    recipe_user_id: RecipeUserId,
    config: SessionConfig,
    app_info: AppInfo,
    session_data_in_database: Dict[str, Any],
    tenant_id: str,
) -> SessionContainer:
    """Create a new session and attach its tokens to the request/response.

    The access token payload is built from ``access_token_payload`` plus an
    ``iss`` entry, with every key listed in ``protected_props`` removed and the
    output of each claim returned by
    ``recipe_instance.get_claims_added_by_other_recipes()`` merged in. Access
    tokens found under any transfer method other than the chosen one are
    scheduled for clearing on the new session's response mutators.

    Raises:
        Exception: when the cookie transfer method is chosen with same-site
            ``"none"`` and ``config.cookie_secure`` is falsy, unless both the
            top-level API domain and the top-level website domain are
            ``localhost`` or an IP address.
    """
    log_debug_message("createNewSession: Started")

    # Handling framework specific request/response wrapping
    already_wrapped = hasattr(request, "wrapper_used") and request.wrapper_used
    if not already_wrapped:
        framework = Supertokens.get_instance().app_info.framework
        request = FRAMEWORKS[framework].wrap_request(request)

    log_debug_message("createNewSession: Wrapping done")
    user_context = set_request_in_user_context_if_not_defined(user_context, request)

    extra_claims = recipe_instance.get_claims_added_by_other_recipes()
    api_domain = app_info.api_domain.get_as_string_dangerous()
    api_base_path = app_info.api_base_path.get_as_string_dangerous()

    # Copy the caller's payload so it is never mutated, then stamp the issuer.
    payload: Dict[str, Any] = dict(access_token_payload)
    payload["iss"] = api_domain + api_base_path

    for protected_key in protected_props:
        payload.pop(protected_key, None)

    for claim in extra_claims:
        payload.update(
            await claim.build(
                user_id, recipe_user_id, tenant_id, payload, user_context
            )
        )

    log_debug_message("createNewSession: Access token payload built")

    output_transfer_method = config.get_token_transfer_method(
        request, True, user_context
    )
    if output_transfer_method == "any":
        # Only an explicit "cookie" auth mode selects cookies; anything else means header.
        output_transfer_method = (
            "cookie" if get_auth_mode_from_header(request) == "cookie" else "header"
        )

    log_debug_message(
        "createNewSession: using transfer method %s", output_transfer_method
    )

    insecure_cross_site_cookie = (
        output_transfer_method == "cookie"
        and config.get_cookie_same_site(request, user_context) == "none"
        and not config.cookie_secure
    )
    if insecure_cross_site_cookie:
        api_is_local = app_info.top_level_api_domain == "localhost" or is_an_ip_address(
            app_info.top_level_api_domain
        )
        # The website domain is only consulted when the API domain is local.
        both_local = api_is_local and (
            app_info.get_top_level_website_domain(request, user_context) == "localhost"
            or is_an_ip_address(
                app_info.get_top_level_website_domain(request, user_context)
            )
        )
        if not both_local:
            # We can allow insecure cookie when both website & API domain are localhost or an IP
            # When either of them is a different domain, API domain needs to have https and a secure cookie to work
            raise Exception(
                "Since your API and website domain are different, for sessions to work, please use https on your apiDomain and don't set cookieSecure to false."
            )

    disable_anti_csrf = output_transfer_method == "header"
    session = await recipe_instance.recipe_implementation.create_new_session(
        user_id,
        recipe_user_id,
        payload,
        session_data_in_database,
        disable_anti_csrf,
        tenant_id,
        user_context=user_context,
    )

    log_debug_message("createNewSession: Session created in core built")

    # Schedule clearing of access tokens left under the transfer methods we are not using.
    session.response_mutators.extend(
        clear_session_mutator(config, other_method, request)
        for other_method in available_token_transfer_methods
        if other_method != output_transfer_method
        and get_token(request, "access", other_method) is not None
    )

    log_debug_message("createNewSession: Cleared old tokens")

    await session.attach_to_request_response(
        request, output_transfer_method, user_context
    )
    log_debug_message("createNewSession: Attached new tokens to res")

    return session
async def get_session_from_request(request: Any, config: SessionConfig, recipe_interface_impl: SessionRecipeInterface, session_required: Optional[bool] = None, anti_csrf_check: Optional[bool] = None, check_database: Optional[bool] = None, override_global_claim_validators: Optional[Callable[[List[SessionClaimValidator], SessionContainer, Dict[str, Any]], MaybeAwaitable[List[SessionClaimValidator]]]] = None, user_context: Optional[Dict[str, Any]] = None) ‑> Optional[SessionContainer]
Expand source code
async def get_session_from_request(
    request: Any,
    config: SessionConfig,
    recipe_interface_impl: SessionRecipeInterface,
    session_required: Optional[bool] = None,
    anti_csrf_check: Optional[bool] = None,
    check_database: Optional[bool] = None,
    override_global_claim_validators: Optional[
        Callable[
            [List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
            MaybeAwaitable[List[SessionClaimValidator]],
        ]
    ] = None,
    user_context: Optional[Dict[str, Any]] = None,
) -> Optional[SessionContainer]:
    """Resolve the session for ``request`` from the access token it carries.

    Access tokens are looked up for every entry of
    ``available_token_transfer_methods``. Tokens that cannot be parsed or that
    fail ``validate_access_token_structure`` are ignored. When the transfer
    method returned by ``config.get_token_transfer_method`` permits both, the
    header token takes precedence over the cookie token.

    Args:
        request: Framework request; wrapped with the configured framework
            wrapper if it does not already report ``wrapper_used``.
        config: Session recipe configuration.
        recipe_interface_impl: Recipe implementation whose ``get_session`` is
            called with the selected token.
        session_required: Forwarded unchanged to ``get_session``.
        anti_csrf_check: Whether to perform the anti-CSRF check. When ``None``
            the check is enabled for every HTTP method except GET. It is always
            disabled when the token came from a header or no token was found.
        check_database: Forwarded unchanged to ``get_session``.
        override_global_claim_validators: Optional hook passed to
            ``get_session`` and ``get_required_claim_validators``.
        user_context: User context; the request is stored in it if not
            already defined.

    Returns:
        The value returned by ``recipe_interface_impl.get_session`` (possibly
        ``None``). A non-``None`` session has had its claims asserted and has
        been attached to the request/response.

    TRY_REFRESH_TOKEN is signalled through
    ``raise_try_refresh_token_exception`` when the legacy ``sIdRefreshToken``
    cookie is present, when multiple access token cookies are present, or when
    the ``VIA_CUSTOM_HEADER`` anti-CSRF check finds no ``rid`` header.
    """
    log_debug_message("getSession: Started")

    if not getattr(request, "wrapper_used", False):
        request = FRAMEWORKS[
            Supertokens.get_instance().app_info.framework
        ].wrap_request(request)

    log_debug_message("getSession: Wrapping done")

    user_context = set_request_in_user_context_if_not_defined(user_context, request)

    # This token isn't handled by getToken to limit the scope of this legacy/migration code
    if request.get_cookie(LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME) is not None:
        log_debug_message(
            "getSession: Throwing TRY_REFRESH_TOKEN because the request is using a legacy session"
        )
        # This could create a spike on refresh calls during the update of the backend SDK
        return raise_try_refresh_token_exception(
            "using legacy session, please call the refresh API"
        )

    session_optional = not session_required
    log_debug_message("getSession: optional validation: %s", session_optional)

    access_tokens: Dict[TokenTransferMethod, ParsedJWTInfo] = {}

    # We check all token transfer methods for available access tokens
    for transfer_method in available_token_transfer_methods:
        token_string = get_token(request, "access", transfer_method)
        if token_string is None:
            continue

        try:
            info = parse_jwt_without_signature_verification(token_string)
            validate_access_token_structure(info.payload, info.version)
            log_debug_message("getSession: got access token from %s", transfer_method)
            access_tokens[transfer_method] = info
        except Exception:
            # Deliberately best-effort: a malformed token is treated as absent.
            log_debug_message(
                "getSession: ignoring token in %s, because it doesn't match our access token structure",
                transfer_method,
            )

    allowed_transfer_method = config.get_token_transfer_method(
        request, False, user_context
    )
    request_transfer_method: Optional[TokenTransferMethod] = None
    request_access_token: Union[ParsedJWTInfo, None] = None

    if (allowed_transfer_method in ("any", "header")) and access_tokens.get(
        "header"
    ) is not None:
        log_debug_message("getSession: using header transfer method")
        request_transfer_method = "header"
        request_access_token = access_tokens["header"]
    elif (allowed_transfer_method in ("any", "cookie")) and access_tokens.get(
        "cookie"
    ) is not None:
        log_debug_message("getSession: using cookie transfer method")

        # If multiple access tokens exist in the request cookie, throw TRY_REFRESH_TOKEN.
        # This prompts the client to call the refresh endpoint, clearing older_cookie_domain
        # cookies (if set), ensuring an outdated token payload isn't used.
        if has_multiple_cookies_for_token_type(request, "access"):
            log_debug_message(
                "getSession: Throwing TRY_REFRESH_TOKEN because multiple access tokens are present in request cookies"
            )
            raise_try_refresh_token_exception(
                "Multiple access tokens present in the request cookies."
            )

        request_transfer_method = "cookie"
        request_access_token = access_tokens["cookie"]

    anti_csrf_token = get_anti_csrf_header(request)
    do_anti_csrf_check = anti_csrf_check

    if do_anti_csrf_check is None:
        do_anti_csrf_check = normalise_http_method(request.method()) != "get"
    if request_transfer_method == "header":
        do_anti_csrf_check = False
    if request_access_token is None:
        do_anti_csrf_check = False

    if callable(config.anti_csrf_function_or_string):
        anti_csrf = config.anti_csrf_function_or_string(request, user_context)
    else:
        anti_csrf = config.anti_csrf_function_or_string

    if do_anti_csrf_check and anti_csrf == "VIA_CUSTOM_HEADER":
        if get_rid_from_header(request) is None:
            log_debug_message(
                "getSession: Returning TRY_REFRESH_TOKEN because custom header (rid) was not passed"
            )
            raise_try_refresh_token_exception(
                "anti-csrf check failed. Please pass 'rid: \"session\"' header in the request, or set doAntiCsrfCheck to false for this API"
            )

        log_debug_message("getSession: VIA_CUSTOM_HEADER anti-csrf check passed")
        # The custom-header check is complete, so get_session must not repeat
        # an anti-CSRF check of its own.
        do_anti_csrf_check = False

    log_debug_message(
        "getSession: Value of doAntiCsrfCheck is: %s", do_anti_csrf_check
    )

    session = await recipe_interface_impl.get_session(
        access_token=(
            request_access_token.raw_token_string
            if request_access_token is not None
            else None
        ),
        anti_csrf_token=anti_csrf_token,
        anti_csrf_check=do_anti_csrf_check,
        session_required=session_required,
        check_database=check_database,
        override_global_claim_validators=override_global_claim_validators,
        user_context=user_context,
    )

    if session is not None:
        claim_validators = await get_required_claim_validators(
            session, override_global_claim_validators, user_context
        )
        await session.assert_claims(claim_validators, user_context)

        # request_transfer_method can only be None here if the user overrides get_session
        # to load the session by a custom method. In that (very niche) case they also need
        # to override how the session is attached to the response.
        # In that scenario the transferMethod passed to attachToRequestResponse likely
        # doesn't matter; still, we follow the general fallback logic.
        if request_transfer_method is not None:
            final_transfer_method = request_transfer_method
        elif allowed_transfer_method != "any":
            final_transfer_method = allowed_transfer_method
        else:
            final_transfer_method = "header"

        await session.attach_to_request_response(
            request, final_transfer_method, user_context
        )

    return session
async def refresh_session_in_request(request: Any, user_context: Dict[str, Any], config: SessionConfig, recipe_interface_impl: SessionRecipeInterface) ‑> SessionContainer
Expand source code
def _legacy_id_refresh_token_clear_mutator(
    config: SessionConfig, request: Any
) -> ResponseMutator:
    """Build the response mutator that blanks the legacy id-refresh-token cookie.

    The cookie named ``LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME`` is overwritten with
    an empty value, ``0`` (presumably the expiry -- confirm against
    ``set_cookie_response_mutator``) and the ``"access_token_path"`` path key.
    """
    return set_cookie_response_mutator(
        config,
        LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME,
        "",
        0,
        "access_token_path",
        request,
    )


async def refresh_session_in_request(
    request: Any,
    user_context: Dict[str, Any],
    config: SessionConfig,
    recipe_interface_impl: SessionRecipeInterface,
) -> SessionContainer:
    """Refresh the session identified by the refresh token found in ``request``.

    Steps, in order:

    1. Wrap ``request`` with the configured framework wrapper if it is not
       wrapped yet, and store it in ``user_context`` if not already defined.
    2. Read the refresh token from every available transfer method and pick
       one according to ``config.get_token_transfer_method`` (header is
       preferred over cookie when both are allowed and present).
    3. Apply the ``VIA_CUSTOM_HEADER`` anti-CSRF check for non-header requests.
    4. Call ``recipe_interface_impl.refresh_session`` and attach the refreshed
       session to the request/response using the chosen transfer method.
    5. Queue response mutators that clear refresh tokens sent via the other
       transfer method(s) and the legacy id-refresh-token cookie, if present.

    Args:
        request: Framework request object (wrapped or unwrapped).
        user_context: User context dict forwarded to every recipe call.
        config: Session recipe configuration.
        recipe_interface_impl: Recipe implementation used to refresh the session.

    Returns:
        The refreshed session, with the queued response mutators appended to
        ``session.response_mutators``.

    Raises:
        SuperTokensSessionError: Re-raised from ``refresh_session`` after the
            queued response mutators were appended to the error.
        An unauthorised error (via ``raise_unauthorised_exception``) when no
        usable refresh token is found or the anti-CSRF header check fails.
    """
    log_debug_message("refreshSession: Started")

    response_mutators: List[ResponseMutator] = []

    if not hasattr(request, "wrapper_used") or not request.wrapper_used:
        request = FRAMEWORKS[
            Supertokens.get_instance().app_info.framework
        ].wrap_request(request)

    log_debug_message("refreshSession: Wrapping done")
    user_context = set_request_in_user_context_if_not_defined(user_context, request)

    clear_session_cookies_from_older_cookie_domain(request, config, user_context)

    # Collect the refresh token from every transfer method; the full map is
    # needed again after a successful refresh to clear the methods we don't use.
    refresh_tokens: Dict[TokenTransferMethod, Optional[str]] = {}

    for transfer_method in available_token_transfer_methods:
        refresh_tokens[transfer_method] = get_token(request, "refresh", transfer_method)
        if refresh_tokens[transfer_method] is not None:
            log_debug_message(
                "refreshSession: got refresh token from %s", transfer_method
            )

    allowed_transfer_method = config.get_token_transfer_method(
        request, False, user_context
    )
    log_debug_message(
        "refreshSession: getTokenTransferMethod returned: %s",
        allowed_transfer_method,
    )

    request_transfer_method: TokenTransferMethod
    refresh_token: Optional[str]

    if (allowed_transfer_method in ("any", "header")) and (
        refresh_tokens.get("header")
    ):
        log_debug_message("refreshSession: using header transfer method")
        request_transfer_method = "header"
        refresh_token = refresh_tokens["header"]
    elif (allowed_transfer_method in ("any", "cookie")) and (
        refresh_tokens.get("cookie")
    ):
        log_debug_message("refreshSession: using cookie transfer method")
        request_transfer_method = "cookie"
        refresh_token = refresh_tokens["cookie"]
    else:
        # This token isn't handled by getToken/setToken to limit the scope of this legacy/migration code
        if request.get_cookie(LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME) is not None:
            log_debug_message(
                "refreshSession: cleared legacy id refresh token because refresh token was not found"
            )
            response_mutators.append(
                _legacy_id_refresh_token_clear_mutator(config, request)
            )

        # We need to clear the access token cookie if
        # - the refresh token is not found, and
        # - the allowed_transfer_method is 'cookie' or 'any', and
        # - an access token cookie exists (otherwise it'd be a no-op)
        # See: https://github.com/supertokens/supertokens-node/issues/790
        if (
            allowed_transfer_method in ("cookie", "any")
            and get_token(request, "access", "cookie") is not None
        ):
            log_debug_message(
                "refreshSession: cleared all session tokens and returning UNAUTHORISED because refresh_token in request is None"
            )

            # We're clearing all session tokens instead of just the access token and then throwing an UNAUTHORISED
            # error with `clear_tokens: False`. This approach avoids confusion and we don't want to retain session
            # tokens on the client in any case if the refresh API is called without a refresh token but with an access token.
            return raise_unauthorised_exception(
                "Refresh token not found but access token is present. Clearing all tokens.",
                clear_tokens=True,
                response_mutators=response_mutators,
            )

        # NOTE(review): this fallback also passes clear_tokens=True, so it differs
        # from the branch above only in its message. Confirm whether
        # clear_tokens=False was intended here.
        return raise_unauthorised_exception(
            "Refresh token not found. Are you sending the refresh token in the request?",
            clear_tokens=True,
            response_mutators=response_mutators,
        )

    # Both branches above only select a truthy token; this narrows the type.
    assert refresh_token is not None

    # Header-based requests skip the anti-CSRF check.
    disable_anti_csrf = request_transfer_method == "header"
    anti_csrf_token = get_anti_csrf_header(request)

    anti_csrf = config.anti_csrf_function_or_string
    if callable(anti_csrf):
        anti_csrf = anti_csrf(request, user_context)

    if anti_csrf == "VIA_CUSTOM_HEADER" and not disable_anti_csrf:
        if get_rid_from_header(request) is None:
            log_debug_message(
                "refreshSession: Returning UNAUTHORISED because anti-csrf token is undefined"
            )
            # Tokens must NOT be cleared on this path: a request without the
            # custom header may be a cross-site forged one, and clearing the
            # session tokens would let a third party log the user out.
            # see https://github.com/supertokens/supertokens-node/issues/141
            raise_unauthorised_exception(
                "anti-csrf check failed. Please pass 'rid: \"session\"' header in the request.",
                clear_tokens=False,
            )
        # The custom-header check passed, so the token-based check is skipped.
        disable_anti_csrf = True

    session: Optional[SessionContainer] = None
    try:
        session = await recipe_interface_impl.refresh_session(
            refresh_token, anti_csrf_token, disable_anti_csrf, user_context
        )
    except SuperTokensSessionError as e:
        if isinstance(e, TokenTheftError) or (
            isinstance(e, UnauthorisedError) and getattr(e, "clear_tokens") is True
        ):
            # We clear the LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME here because we want to limit the scope of
            # this legacy/migration code so the token clearing functions in the error handlers do not.
            if request.get_cookie(LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME) is not None:
                log_debug_message(
                    "refreshSession: cleared legacy id refresh token because refresh token was not found"
                )
                response_mutators.append(
                    _legacy_id_refresh_token_clear_mutator(config, request)
                )

        # Hand the queued mutators to the error so they travel with it.
        e.response_mutators.extend(response_mutators)
        raise

    log_debug_message(
        "refreshSession: Attaching refreshed session info as %s",
        request_transfer_method,
    )

    # We clear the tokens in all token transfer methods we are not going to overwrite:
    for transfer_method in available_token_transfer_methods:
        if (
            transfer_method != request_transfer_method
            and refresh_tokens[transfer_method] is not None
        ):
            response_mutators.append(
                clear_session_mutator(config, transfer_method, request)
            )

    await session.attach_to_request_response(
        request, request_transfer_method, user_context
    )
    log_debug_message("refreshSession: Success!")

    # This token isn't handled by getToken/setToken to limit the scope of this legacy/migration code
    if request.get_cookie(LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME) is not None:
        log_debug_message(
            "refreshSession: cleared legacy id refresh token after successful refresh"
        )
        response_mutators.append(
            _legacy_id_refresh_token_clear_mutator(config, request)
        )

    session.response_mutators.extend(response_mutators)
    return session