Module supertokens_python.recipe.session.session_request_functions
Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import Any, Callable, Dict, List, Optional, Union, TYPE_CHECKING
from supertokens_python.logger import log_debug_message
from supertokens_python.recipe.session.access_token import (
from supertokens_python.recipe.session.constants import available_token_transfer_methods
from supertokens_python.recipe.session.cookie_and_header import (
from supertokens_python.recipe.session.exceptions import (
from supertokens_python.recipe.session.interfaces import (
RecipeInterface as SessionRecipeInterface,
from supertokens_python.recipe.session.interfaces import (
from supertokens_python.recipe.session.exceptions import (
from supertokens_python.recipe.session.jwt import (
from supertokens_python.recipe.session.utils import (
from supertokens_python.types import MaybeAwaitable
from supertokens_python.utils import (
from supertokens_python.supertokens import Supertokens
from .constants import protected_props
from supertokens_python.recipe.session.recipe import SessionRecipe
from supertokens_python.supertokens import AppInfo
from .interfaces import ResponseMutator
async def get_session_from_request(
    request: Any,
    config: SessionConfig,
    recipe_interface_impl: SessionRecipeInterface,
    session_required: Optional[bool] = None,
    anti_csrf_check: Optional[bool] = None,
    check_database: Optional[bool] = None,
    override_global_claim_validators: Optional[
        Callable[
            [List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
            MaybeAwaitable[List[SessionClaimValidator]],
        ]
    ] = None,
    user_context: Optional[Dict[str, Any]] = None,
) -> Optional[SessionContainer]:
    """Resolve, verify and attach the session carried by ``request``.

    Access tokens are collected from every entry of
    ``available_token_transfer_methods``; a token is kept only when it parses
    and passes ``validate_access_token_structure``. When the configured
    transfer method allows both, the header token is preferred over the
    cookie token.

    Args:
        request: Framework request. It is wrapped through ``FRAMEWORKS``
            unless it already reports ``wrapper_used``.
        config: Session configuration; supplies the allowed transfer method
            and the anti-CSRF mode.
        recipe_interface_impl: Implementation whose ``get_session`` performs
            the actual lookup.
        session_required: Forwarded unchanged to ``get_session``.
        anti_csrf_check: Explicit override for the anti-CSRF check. When
            ``None`` the check is enabled for every HTTP method except GET.
        check_database: Forwarded unchanged to ``get_session``.
        override_global_claim_validators: Forwarded to ``get_session`` and to
            ``get_required_claim_validators``.
        user_context: Passed through
            ``set_request_in_user_context_if_not_defined`` and then handed to
            every downstream call.

    Returns:
        The value returned by ``get_session`` (may be ``None``). A
        non-``None`` session has had its claims asserted and has been
        attached to the request/response before being returned.

    Raises:
        Whatever ``raise_try_refresh_token_exception`` raises, when the legacy
        id-refresh-token cookie is present or when the ``VIA_CUSTOM_HEADER``
        check finds no ``rid`` header. Errors from ``get_session`` and
        ``assert_claims`` propagate unchanged.
    """
    log_debug_message("getSession: Started")

    if not hasattr(request, "wrapper_used") or not request.wrapper_used:
        request = FRAMEWORKS[
            Supertokens.get_instance().app_info.framework
        ].wrap_request(request)

    log_debug_message("getSession: Wrapping done")
    user_context = set_request_in_user_context_if_not_defined(user_context, request)

    # This token isn't handled by getToken to limit the scope of this legacy/migration code
    if request.get_cookie(LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME) is not None:
        log_debug_message(
            "getSession: Throwing TRY_REFRESH_TOKEN because the request is using a legacy session"
        )
        # This could create a spike on refresh calls during the update of the backend SDK
        return raise_try_refresh_token_exception(
            "using legacy session, please call the refresh API"
        )

    # Only used for the debug log below.
    session_optional = not session_required
    log_debug_message("getSession: optional validation: %s", session_optional)

    access_tokens: Dict[TokenTransferMethod, ParsedJWTInfo] = {}

    # We check all token transfer methods for available access tokens
    for transfer_method in available_token_transfer_methods:
        token_string = get_token(request, "access", transfer_method)
        if token_string is None:
            continue
        try:
            info = parse_jwt_without_signature_verification(token_string)
            validate_access_token_structure(info.payload, info.version)
            log_debug_message(
                "getSession: got access token from %s", transfer_method
            )
            access_tokens[transfer_method] = info
        except Exception:
            # Deliberate best-effort: a token that does not look like one of
            # our access tokens is skipped instead of failing the request.
            log_debug_message(
                "getSession: ignoring token in %s, because it doesn't match our access token structure",
                transfer_method,
            )

    allowed_transfer_method = config.get_token_transfer_method(
        request, False, user_context
    )
    request_transfer_method: Optional[TokenTransferMethod] = None
    request_access_token: Union[ParsedJWTInfo, None] = None

    # Header is checked first, so it wins when both transports carry a token.
    if (allowed_transfer_method in ("any", "header")) and access_tokens.get(
        "header"
    ) is not None:
        log_debug_message("getSession: using header transfer method")
        request_transfer_method = "header"
        request_access_token = access_tokens["header"]
    elif (allowed_transfer_method in ("any", "cookie")) and access_tokens.get(
        "cookie"
    ) is not None:
        log_debug_message("getSession: using cookie transfer method")
        request_transfer_method = "cookie"
        request_access_token = access_tokens["cookie"]

    anti_csrf_token = get_anti_csrf_header(request)
    do_anti_csrf_check = anti_csrf_check

    if do_anti_csrf_check is None:
        do_anti_csrf_check = normalise_http_method(request.method()) != "get"
    if request_transfer_method == "header":
        do_anti_csrf_check = False
    if request_access_token is None:
        do_anti_csrf_check = False

    if callable(config.anti_csrf_function_or_string):
        anti_csrf = config.anti_csrf_function_or_string(request, user_context)
    else:
        anti_csrf = config.anti_csrf_function_or_string

    if do_anti_csrf_check and anti_csrf == "VIA_CUSTOM_HEADER":
        if get_rid_from_header(request) is None:
            log_debug_message(
                "getSession: Returning TRY_REFRESH_TOKEN because custom header (rid) was not passed"
            )
            raise_try_refresh_token_exception(
                "anti-csrf check failed. Please pass 'rid: \"session\"' header in the request, or set doAntiCsrfCheck to false for this API"
            )

        log_debug_message("getSession: VIA_CUSTOM_HEADER anti-csrf check passed")
        # The custom-header check has been done here, so get_session is told
        # not to perform an anti-CSRF check of its own.
        do_anti_csrf_check = False

    log_debug_message(
        "getSession: Value of doAntiCsrfCheck is: %s", do_anti_csrf_check
    )

    session = await recipe_interface_impl.get_session(
        access_token=(
            request_access_token.raw_token_string
            if request_access_token is not None
            else None
        ),
        anti_csrf_token=anti_csrf_token,
        anti_csrf_check=do_anti_csrf_check,
        session_required=session_required,
        check_database=check_database,
        override_global_claim_validators=override_global_claim_validators,
        user_context=user_context,
    )

    if session is not None:
        claim_validators = await get_required_claim_validators(
            session, override_global_claim_validators, user_context
        )
        await session.assert_claims(claim_validators, user_context)

        # request_transfer_method can only be None here if the user overrides get_session
        # to load the session by a custom method. In that (very niche) case they also need to
        # override how the session is attached to the response.
        # In that scenario the transferMethod passed to attachToRequestResponse likely doesn't
        # matter; still, we follow the general fallback logic.
        if request_transfer_method is not None:
            final_transfer_method = request_transfer_method
        elif allowed_transfer_method != "any":
            final_transfer_method = allowed_transfer_method
        else:
            final_transfer_method = "header"

        await session.attach_to_request_response(
            request, final_transfer_method, user_context
        )

    return session
async def create_new_session_in_request(
    request: Any,
    user_context: Dict[str, Any],
    recipe_instance: SessionRecipe,
    access_token_payload: Dict[str, Any],
    user_id: str,
    config: SessionConfig,
    app_info: AppInfo,
    session_data_in_database: Dict[str, Any],
    tenant_id: str,
) -> SessionContainer:
    """Create a new session and attach its tokens to the request/response.

    The access token payload is built from ``access_token_payload`` plus an
    ``iss`` entry, stripped of every key in ``protected_props`` and then
    extended with what each claim registered by other recipes builds.

    Args:
        request: Framework request. It is wrapped through ``FRAMEWORKS``
            unless it already reports ``wrapper_used``.
        user_context: Passed through
            ``set_request_in_user_context_if_not_defined`` and then handed to
            every downstream call.
        recipe_instance: Session recipe; provides the extra claims and the
            ``recipe_implementation`` used to create the session.
        access_token_payload: Caller-supplied payload. It is copied, not
            mutated.
        user_id: Forwarded to the claim builders and ``create_new_session``.
        config: Session configuration (transfer method, cookie settings).
        app_info: Supplies the API domain/base path used for ``iss`` and the
            top-level domains used by the insecure-cookie check.
        session_data_in_database: Forwarded to ``create_new_session``.
        tenant_id: Forwarded to the claim builders and ``create_new_session``.

    Returns:
        The session returned by ``create_new_session``, after it has been
        attached to the request/response.

    Raises:
        Exception: When cookies would be used with ``SameSite=none`` and
            without ``cookie_secure`` while the API or website domain is
            neither ``localhost`` nor an IP address.
    """
    log_debug_message("createNewSession: Started")

    # Handling framework specific request/response wrapping
    if not hasattr(request, "wrapper_used") or not request.wrapper_used:
        request = FRAMEWORKS[
            Supertokens.get_instance().app_info.framework
        ].wrap_request(request)

    log_debug_message("createNewSession: Wrapping done")
    user_context = set_request_in_user_context_if_not_defined(user_context, request)

    claims_added_by_other_recipes = recipe_instance.get_claims_added_by_other_recipes()
    issuer = (
        app_info.api_domain.get_as_string_dangerous()
        + app_info.api_base_path.get_as_string_dangerous()
    )

    final_access_token_payload = {**access_token_payload, "iss": issuer}

    # Callers may not set protected properties themselves.
    for prop in protected_props:
        final_access_token_payload.pop(prop, None)

    for claim in claims_added_by_other_recipes:
        # NOTE(review): the call target was garbled in the extracted source;
        # `claim.build(user_id, ...)` is reconstructed from the surrounding
        # variables - confirm against the repository.
        update = await claim.build(user_id, tenant_id, user_context)
        final_access_token_payload.update(update)

    log_debug_message("createNewSession: Access token payload built")

    output_transfer_method = config.get_token_transfer_method(
        request, True, user_context
    )
    if output_transfer_method == "any":
        # Let the client choose; anything other than "cookie" means header.
        auth_mode_header = get_auth_mode_from_header(request)
        if auth_mode_header == "cookie":
            output_transfer_method = auth_mode_header
        else:
            output_transfer_method = "header"

    log_debug_message(
        "createNewSession: using transfer method %s", output_transfer_method
    )

    def is_localhost_or_ip(domain: str) -> bool:
        """Return True when ``domain`` is ``localhost`` or an IP address."""
        return domain == "localhost" or is_an_ip_address(domain)

    if (
        output_transfer_method == "cookie"
        and config.get_cookie_same_site(request, user_context) == "none"
        and not config.cookie_secure
        and not (
            is_localhost_or_ip(app_info.top_level_api_domain)
            and is_localhost_or_ip(
                app_info.get_top_level_website_domain(request, user_context)
            )
        )
    ):
        # We can allow insecure cookie when both website & API domain are localhost or an IP
        # When either of them is a different domain, API domain needs to have https and a secure cookie to work
        raise Exception(
            "Since your API and website domain are different, for sessions to work, please use https on your apiDomain and don't set cookieSecure to false."
        )

    disable_anti_csrf = output_transfer_method == "header"
    session = await recipe_instance.recipe_implementation.create_new_session(
        user_id,
        final_access_token_payload,
        session_data_in_database,
        disable_anti_csrf,
        tenant_id,
        user_context=user_context,
    )

    log_debug_message("createNewSession: Session created in core built")

    # Clear access tokens left over in any transport we are not writing to.
    for transfer_method in available_token_transfer_methods:
        if (
            transfer_method != output_transfer_method
            and get_token(request, "access", transfer_method) is not None
        ):
            session.response_mutators.append(
                clear_session_mutator(config, transfer_method, request)
            )

    log_debug_message("createNewSession: Cleared old tokens")

    await session.attach_to_request_response(
        request, output_transfer_method, user_context
    )
    log_debug_message("createNewSession: Attached new tokens to res")

    return session
# In all cases: if sIdRefreshToken token exists (so it's a legacy session) we clear it.
# Check http://localhost:3002/docs/contribute/decisions/session/0008 for further details and a table of expected behaviours
async def refresh_session_in_request(
    request: Any,
    user_context: Dict[str, Any],
    config: SessionConfig,
    recipe_interface_impl: SessionRecipeInterface,
) -> SessionContainer:
    """Refresh the session identified by the refresh token in ``request``.

    Refresh tokens are read from every entry of
    ``available_token_transfer_methods``. When the configured transfer method
    allows both, the header token is preferred over the cookie token.
    Whenever the legacy id-refresh-token cookie is present it is cleared
    through a response mutator.

    Args:
        request: Framework request. It is wrapped through ``FRAMEWORKS``
            unless it already reports ``wrapper_used``.
        user_context: Passed through
            ``set_request_in_user_context_if_not_defined`` and then handed to
            every downstream call.
        config: Session configuration (transfer method, anti-CSRF mode,
            cookie settings).
        recipe_interface_impl: Implementation whose ``refresh_session``
            performs the actual refresh.

    Returns:
        The refreshed session, attached to the request/response, with the
        collected response mutators appended to its ``response_mutators``.

    Raises:
        Whatever ``raise_unauthorised_exception`` raises, when no usable
        refresh token is found or when the ``VIA_CUSTOM_HEADER`` check finds
        no ``rid`` header.
        SuperTokensSessionError: Re-raised from ``refresh_session`` after the
            collected response mutators have been added to it.
    """
    log_debug_message("refreshSession: Started")

    response_mutators: List[ResponseMutator] = []

    if not hasattr(request, "wrapper_used") or not request.wrapper_used:
        request = FRAMEWORKS[
            Supertokens.get_instance().app_info.framework
        ].wrap_request(request)

    log_debug_message("refreshSession: Wrapping done")
    user_context = set_request_in_user_context_if_not_defined(user_context, request)

    def clear_legacy_cookie_if_present(debug_message: str) -> None:
        """Queue a mutator that empties the legacy id-refresh-token cookie."""
        # This token isn't handled by getToken/setToken to limit the scope of this legacy/migration code
        if request.get_cookie(LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME) is not None:
            log_debug_message(debug_message)
            response_mutators.append(
                set_cookie_response_mutator(
                    config,
                    LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME,
                    "",
                    0,
                    "access_token_path",
                    request,
                )
            )

    refresh_tokens: Dict[TokenTransferMethod, Optional[str]] = {}

    for transfer_method in available_token_transfer_methods:
        refresh_tokens[transfer_method] = get_token(request, "refresh", transfer_method)
        if refresh_tokens[transfer_method] is not None:
            log_debug_message(
                "refreshSession: got refresh token from %s", transfer_method
            )

    allowed_transfer_method = config.get_token_transfer_method(
        request, False, user_context
    )
    log_debug_message(
        "refreshSession: getTokenTransferMethod returned: %s",
        allowed_transfer_method,
    )

    request_transfer_method: TokenTransferMethod
    refresh_token: Optional[str]

    # Header is checked first, so it wins when both transports carry a token.
    if (allowed_transfer_method in ("any", "header")) and (
        refresh_tokens.get("header")
    ):
        log_debug_message("refreshSession: using header transfer method")
        request_transfer_method = "header"
        refresh_token = refresh_tokens["header"]
    elif (allowed_transfer_method in ("any", "cookie")) and (
        refresh_tokens.get("cookie")
    ):
        log_debug_message("refreshSession: using cookie transfer method")
        request_transfer_method = "cookie"
        refresh_token = refresh_tokens["cookie"]
    else:
        clear_legacy_cookie_if_present(
            "refreshSession: cleared legacy id refresh token because refresh token was not found"
        )
        log_debug_message(
            "refreshSession: UNAUTHORISED because refresh_token in request is None"
        )
        return raise_unauthorised_exception(
            "Refresh token not found. Are you sending the refresh token in the request?",
            clear_tokens=False,
            response_mutators=response_mutators,
        )

    # Both branches above only select a truthy token; this narrows the type.
    assert refresh_token is not None

    disable_anti_csrf = request_transfer_method == "header"
    anti_csrf_token = get_anti_csrf_header(request)

    anti_csrf = config.anti_csrf_function_or_string
    if callable(anti_csrf):
        anti_csrf = anti_csrf(request, user_context)

    if anti_csrf == "VIA_CUSTOM_HEADER" and not disable_anti_csrf:
        if get_rid_from_header(request) is None:
            log_debug_message(
                "refreshSession: Returning UNAUTHORISED because anti-csrf token is undefined"
            )
            # clear_tokens=False: a failed custom-header check is reported
            # without asking for the session tokens to be cleared.
            raise_unauthorised_exception(
                "anti-csrf check failed. Please pass 'rid: \"session\"' header in the request.",
                clear_tokens=False,
            )
        # The custom-header check has been done here, so refresh_session is
        # told to skip its own anti-CSRF handling.
        disable_anti_csrf = True

    try:
        session = await recipe_interface_impl.refresh_session(
            refresh_token, anti_csrf_token, disable_anti_csrf, user_context
        )
    except SuperTokensSessionError as e:
        if isinstance(e, TokenTheftError) or (
            isinstance(e, UnauthorisedError) and getattr(e, "clear_tokens") is True
        ):
            # We clear the LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME here because we want to limit the scope of
            # this legacy/migration code so the token clearing functions in the error handlers do not.
            clear_legacy_cookie_if_present(
                "refreshSession: cleared legacy id refresh token because refresh token was not found"
            )

        e.response_mutators.extend(response_mutators)
        raise

    log_debug_message(
        "refreshSession: Attaching refreshed session info as %s",
        request_transfer_method,
    )

    # We clear the tokens in all token transfer methods we are not going to overwrite:
    for transfer_method in available_token_transfer_methods:
        if (
            transfer_method != request_transfer_method
            and refresh_tokens[transfer_method] is not None
        ):
            response_mutators.append(
                clear_session_mutator(config, transfer_method, request)
            )

    await session.attach_to_request_response(
        request, request_transfer_method, user_context
    )
    log_debug_message("refreshSession: Success!")

    clear_legacy_cookie_if_present(
        "refreshSession: cleared legacy id refresh token after successful refresh"
    )

    session.response_mutators.extend(response_mutators)
    return session
async def create_new_session_in_request(request: Any, user_context: Dict[str, Any], recipe_instance: SessionRecipe, access_token_payload: Dict[str, Any], user_id: str, config: SessionConfig, app_info: AppInfo, session_data_in_database: Dict[str, Any], tenant_id: str) ‑> SessionContainer
Expand source code
async def create_new_session_in_request(
    request: Any,
    user_context: Dict[str, Any],
    recipe_instance: SessionRecipe,
    access_token_payload: Dict[str, Any],
    user_id: str,
    config: SessionConfig,
    app_info: AppInfo,
    session_data_in_database: Dict[str, Any],
    tenant_id: str,
) -> SessionContainer:
    """Create a new session and attach its tokens to the request/response.

    The access token payload is built from ``access_token_payload`` plus an
    ``iss`` entry, stripped of every key in ``protected_props`` and then
    extended with what each claim registered by other recipes builds.

    Args:
        request: Framework request. It is wrapped through ``FRAMEWORKS``
            unless it already reports ``wrapper_used``.
        user_context: Passed through
            ``set_request_in_user_context_if_not_defined`` and then handed to
            every downstream call.
        recipe_instance: Session recipe; provides the extra claims and the
            ``recipe_implementation`` used to create the session.
        access_token_payload: Caller-supplied payload. It is copied, not
            mutated.
        user_id: Forwarded to the claim builders and ``create_new_session``.
        config: Session configuration (transfer method, cookie settings).
        app_info: Supplies the API domain/base path used for ``iss`` and the
            top-level domains used by the insecure-cookie check.
        session_data_in_database: Forwarded to ``create_new_session``.
        tenant_id: Forwarded to the claim builders and ``create_new_session``.

    Returns:
        The session returned by ``create_new_session``, after it has been
        attached to the request/response.

    Raises:
        Exception: When cookies would be used with ``SameSite=none`` and
            without ``cookie_secure`` while the API or website domain is
            neither ``localhost`` nor an IP address.
    """
    log_debug_message("createNewSession: Started")

    # Handling framework specific request/response wrapping
    if not hasattr(request, "wrapper_used") or not request.wrapper_used:
        request = FRAMEWORKS[
            Supertokens.get_instance().app_info.framework
        ].wrap_request(request)

    log_debug_message("createNewSession: Wrapping done")
    user_context = set_request_in_user_context_if_not_defined(user_context, request)

    claims_added_by_other_recipes = recipe_instance.get_claims_added_by_other_recipes()
    issuer = (
        app_info.api_domain.get_as_string_dangerous()
        + app_info.api_base_path.get_as_string_dangerous()
    )

    final_access_token_payload = {**access_token_payload, "iss": issuer}

    # Callers may not set protected properties themselves.
    for prop in protected_props:
        final_access_token_payload.pop(prop, None)

    for claim in claims_added_by_other_recipes:
        # NOTE(review): the call target was garbled in the extracted source;
        # `claim.build(user_id, ...)` is reconstructed from the surrounding
        # variables - confirm against the repository.
        update = await claim.build(user_id, tenant_id, user_context)
        final_access_token_payload.update(update)

    log_debug_message("createNewSession: Access token payload built")

    output_transfer_method = config.get_token_transfer_method(
        request, True, user_context
    )
    if output_transfer_method == "any":
        # Let the client choose; anything other than "cookie" means header.
        auth_mode_header = get_auth_mode_from_header(request)
        if auth_mode_header == "cookie":
            output_transfer_method = auth_mode_header
        else:
            output_transfer_method = "header"

    log_debug_message(
        "createNewSession: using transfer method %s", output_transfer_method
    )

    def is_localhost_or_ip(domain: str) -> bool:
        """Return True when ``domain`` is ``localhost`` or an IP address."""
        return domain == "localhost" or is_an_ip_address(domain)

    if (
        output_transfer_method == "cookie"
        and config.get_cookie_same_site(request, user_context) == "none"
        and not config.cookie_secure
        and not (
            is_localhost_or_ip(app_info.top_level_api_domain)
            and is_localhost_or_ip(
                app_info.get_top_level_website_domain(request, user_context)
            )
        )
    ):
        # We can allow insecure cookie when both website & API domain are localhost or an IP
        # When either of them is a different domain, API domain needs to have https and a secure cookie to work
        raise Exception(
            "Since your API and website domain are different, for sessions to work, please use https on your apiDomain and don't set cookieSecure to false."
        )

    disable_anti_csrf = output_transfer_method == "header"
    session = await recipe_instance.recipe_implementation.create_new_session(
        user_id,
        final_access_token_payload,
        session_data_in_database,
        disable_anti_csrf,
        tenant_id,
        user_context=user_context,
    )

    log_debug_message("createNewSession: Session created in core built")

    # Clear access tokens left over in any transport we are not writing to.
    for transfer_method in available_token_transfer_methods:
        if (
            transfer_method != output_transfer_method
            and get_token(request, "access", transfer_method) is not None
        ):
            session.response_mutators.append(
                clear_session_mutator(config, transfer_method, request)
            )

    log_debug_message("createNewSession: Cleared old tokens")

    await session.attach_to_request_response(
        request, output_transfer_method, user_context
    )
    log_debug_message("createNewSession: Attached new tokens to res")

    return session
async def get_session_from_request(request: Any, config: SessionConfig, recipe_interface_impl: SessionRecipeInterface, session_required: Optional[bool] = None, anti_csrf_check: Optional[bool] = None, check_database: Optional[bool] = None, override_global_claim_validators: Optional[Callable[[List[SessionClaimValidator], SessionContainer, Dict[str, Any]], MaybeAwaitable[List[SessionClaimValidator]]]] = None, user_context: Optional[Dict[str, Any]] = None) ‑> Optional[SessionContainer]
Expand source code
async def get_session_from_request(
    request: Any,
    config: SessionConfig,
    recipe_interface_impl: SessionRecipeInterface,
    session_required: Optional[bool] = None,
    anti_csrf_check: Optional[bool] = None,
    check_database: Optional[bool] = None,
    override_global_claim_validators: Optional[
        Callable[
            [List[SessionClaimValidator], SessionContainer, Dict[str, Any]],
            MaybeAwaitable[List[SessionClaimValidator]],
        ]
    ] = None,
    user_context: Optional[Dict[str, Any]] = None,
) -> Optional[SessionContainer]:
    """Resolve, verify and attach the session carried by ``request``.

    Access tokens are collected from every entry of
    ``available_token_transfer_methods``; a token is kept only when it parses
    and passes ``validate_access_token_structure``. When the configured
    transfer method allows both, the header token is preferred over the
    cookie token.

    Args:
        request: Framework request. It is wrapped through ``FRAMEWORKS``
            unless it already reports ``wrapper_used``.
        config: Session configuration; supplies the allowed transfer method
            and the anti-CSRF mode.
        recipe_interface_impl: Implementation whose ``get_session`` performs
            the actual lookup.
        session_required: Forwarded unchanged to ``get_session``.
        anti_csrf_check: Explicit override for the anti-CSRF check. When
            ``None`` the check is enabled for every HTTP method except GET.
        check_database: Forwarded unchanged to ``get_session``.
        override_global_claim_validators: Forwarded to ``get_session`` and to
            ``get_required_claim_validators``.
        user_context: Passed through
            ``set_request_in_user_context_if_not_defined`` and then handed to
            every downstream call.

    Returns:
        The value returned by ``get_session`` (may be ``None``). A
        non-``None`` session has had its claims asserted and has been
        attached to the request/response before being returned.

    Raises:
        Whatever ``raise_try_refresh_token_exception`` raises, when the legacy
        id-refresh-token cookie is present or when the ``VIA_CUSTOM_HEADER``
        check finds no ``rid`` header. Errors from ``get_session`` and
        ``assert_claims`` propagate unchanged.
    """
    log_debug_message("getSession: Started")

    if not hasattr(request, "wrapper_used") or not request.wrapper_used:
        request = FRAMEWORKS[
            Supertokens.get_instance().app_info.framework
        ].wrap_request(request)

    log_debug_message("getSession: Wrapping done")
    user_context = set_request_in_user_context_if_not_defined(user_context, request)

    # This token isn't handled by getToken to limit the scope of this legacy/migration code
    if request.get_cookie(LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME) is not None:
        log_debug_message(
            "getSession: Throwing TRY_REFRESH_TOKEN because the request is using a legacy session"
        )
        # This could create a spike on refresh calls during the update of the backend SDK
        return raise_try_refresh_token_exception(
            "using legacy session, please call the refresh API"
        )

    # Only used for the debug log below.
    session_optional = not session_required
    log_debug_message("getSession: optional validation: %s", session_optional)

    access_tokens: Dict[TokenTransferMethod, ParsedJWTInfo] = {}

    # We check all token transfer methods for available access tokens
    for transfer_method in available_token_transfer_methods:
        token_string = get_token(request, "access", transfer_method)
        if token_string is None:
            continue
        try:
            info = parse_jwt_without_signature_verification(token_string)
            validate_access_token_structure(info.payload, info.version)
            log_debug_message(
                "getSession: got access token from %s", transfer_method
            )
            access_tokens[transfer_method] = info
        except Exception:
            # Deliberate best-effort: a token that does not look like one of
            # our access tokens is skipped instead of failing the request.
            log_debug_message(
                "getSession: ignoring token in %s, because it doesn't match our access token structure",
                transfer_method,
            )

    allowed_transfer_method = config.get_token_transfer_method(
        request, False, user_context
    )
    request_transfer_method: Optional[TokenTransferMethod] = None
    request_access_token: Union[ParsedJWTInfo, None] = None

    # Header is checked first, so it wins when both transports carry a token.
    if (allowed_transfer_method in ("any", "header")) and access_tokens.get(
        "header"
    ) is not None:
        log_debug_message("getSession: using header transfer method")
        request_transfer_method = "header"
        request_access_token = access_tokens["header"]
    elif (allowed_transfer_method in ("any", "cookie")) and access_tokens.get(
        "cookie"
    ) is not None:
        log_debug_message("getSession: using cookie transfer method")
        request_transfer_method = "cookie"
        request_access_token = access_tokens["cookie"]

    anti_csrf_token = get_anti_csrf_header(request)
    do_anti_csrf_check = anti_csrf_check

    if do_anti_csrf_check is None:
        do_anti_csrf_check = normalise_http_method(request.method()) != "get"
    if request_transfer_method == "header":
        do_anti_csrf_check = False
    if request_access_token is None:
        do_anti_csrf_check = False

    if callable(config.anti_csrf_function_or_string):
        anti_csrf = config.anti_csrf_function_or_string(request, user_context)
    else:
        anti_csrf = config.anti_csrf_function_or_string

    if do_anti_csrf_check and anti_csrf == "VIA_CUSTOM_HEADER":
        if get_rid_from_header(request) is None:
            log_debug_message(
                "getSession: Returning TRY_REFRESH_TOKEN because custom header (rid) was not passed"
            )
            raise_try_refresh_token_exception(
                "anti-csrf check failed. Please pass 'rid: \"session\"' header in the request, or set doAntiCsrfCheck to false for this API"
            )

        log_debug_message("getSession: VIA_CUSTOM_HEADER anti-csrf check passed")
        # The custom-header check has been done here, so get_session is told
        # not to perform an anti-CSRF check of its own.
        do_anti_csrf_check = False

    log_debug_message(
        "getSession: Value of doAntiCsrfCheck is: %s", do_anti_csrf_check
    )

    session = await recipe_interface_impl.get_session(
        access_token=(
            request_access_token.raw_token_string
            if request_access_token is not None
            else None
        ),
        anti_csrf_token=anti_csrf_token,
        anti_csrf_check=do_anti_csrf_check,
        session_required=session_required,
        check_database=check_database,
        override_global_claim_validators=override_global_claim_validators,
        user_context=user_context,
    )

    if session is not None:
        claim_validators = await get_required_claim_validators(
            session, override_global_claim_validators, user_context
        )
        await session.assert_claims(claim_validators, user_context)

        # request_transfer_method can only be None here if the user overrides get_session
        # to load the session by a custom method. In that (very niche) case they also need to
        # override how the session is attached to the response.
        # In that scenario the transferMethod passed to attachToRequestResponse likely doesn't
        # matter; still, we follow the general fallback logic.
        if request_transfer_method is not None:
            final_transfer_method = request_transfer_method
        elif allowed_transfer_method != "any":
            final_transfer_method = allowed_transfer_method
        else:
            final_transfer_method = "header"

        await session.attach_to_request_response(
            request, final_transfer_method, user_context
        )

    return session
async def refresh_session_in_request(request: Any, user_context: Dict[str, Any], config: SessionConfig, recipe_interface_impl: SessionRecipeInterface) ‑> SessionContainer
Expand source code
async def refresh_session_in_request(
    request: Any,
    user_context: Dict[str, Any],
    config: SessionConfig,
    recipe_interface_impl: SessionRecipeInterface,
) -> SessionContainer:
    """Refresh the session identified by the refresh token in ``request``.

    Refresh tokens are read from every entry of
    ``available_token_transfer_methods``. When the configured transfer method
    allows both, the header token is preferred over the cookie token.
    Whenever the legacy id-refresh-token cookie is present it is cleared
    through a response mutator.

    Args:
        request: Framework request. It is wrapped through ``FRAMEWORKS``
            unless it already reports ``wrapper_used``.
        user_context: Passed through
            ``set_request_in_user_context_if_not_defined`` and then handed to
            every downstream call.
        config: Session configuration (transfer method, anti-CSRF mode,
            cookie settings).
        recipe_interface_impl: Implementation whose ``refresh_session``
            performs the actual refresh.

    Returns:
        The refreshed session, attached to the request/response, with the
        collected response mutators appended to its ``response_mutators``.

    Raises:
        Whatever ``raise_unauthorised_exception`` raises, when no usable
        refresh token is found or when the ``VIA_CUSTOM_HEADER`` check finds
        no ``rid`` header.
        SuperTokensSessionError: Re-raised from ``refresh_session`` after the
            collected response mutators have been added to it.
    """
    log_debug_message("refreshSession: Started")

    response_mutators: List[ResponseMutator] = []

    if not hasattr(request, "wrapper_used") or not request.wrapper_used:
        request = FRAMEWORKS[
            Supertokens.get_instance().app_info.framework
        ].wrap_request(request)

    log_debug_message("refreshSession: Wrapping done")
    user_context = set_request_in_user_context_if_not_defined(user_context, request)

    def clear_legacy_cookie_if_present(debug_message: str) -> None:
        """Queue a mutator that empties the legacy id-refresh-token cookie."""
        # This token isn't handled by getToken/setToken to limit the scope of this legacy/migration code
        if request.get_cookie(LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME) is not None:
            log_debug_message(debug_message)
            response_mutators.append(
                set_cookie_response_mutator(
                    config,
                    LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME,
                    "",
                    0,
                    "access_token_path",
                    request,
                )
            )

    refresh_tokens: Dict[TokenTransferMethod, Optional[str]] = {}

    for transfer_method in available_token_transfer_methods:
        refresh_tokens[transfer_method] = get_token(request, "refresh", transfer_method)
        if refresh_tokens[transfer_method] is not None:
            log_debug_message(
                "refreshSession: got refresh token from %s", transfer_method
            )

    allowed_transfer_method = config.get_token_transfer_method(
        request, False, user_context
    )
    log_debug_message(
        "refreshSession: getTokenTransferMethod returned: %s",
        allowed_transfer_method,
    )

    request_transfer_method: TokenTransferMethod
    refresh_token: Optional[str]

    # Header is checked first, so it wins when both transports carry a token.
    if (allowed_transfer_method in ("any", "header")) and (
        refresh_tokens.get("header")
    ):
        log_debug_message("refreshSession: using header transfer method")
        request_transfer_method = "header"
        refresh_token = refresh_tokens["header"]
    elif (allowed_transfer_method in ("any", "cookie")) and (
        refresh_tokens.get("cookie")
    ):
        log_debug_message("refreshSession: using cookie transfer method")
        request_transfer_method = "cookie"
        refresh_token = refresh_tokens["cookie"]
    else:
        clear_legacy_cookie_if_present(
            "refreshSession: cleared legacy id refresh token because refresh token was not found"
        )
        log_debug_message(
            "refreshSession: UNAUTHORISED because refresh_token in request is None"
        )
        return raise_unauthorised_exception(
            "Refresh token not found. Are you sending the refresh token in the request?",
            clear_tokens=False,
            response_mutators=response_mutators,
        )

    # Both branches above only select a truthy token; this narrows the type.
    assert refresh_token is not None

    disable_anti_csrf = request_transfer_method == "header"
    anti_csrf_token = get_anti_csrf_header(request)

    anti_csrf = config.anti_csrf_function_or_string
    if callable(anti_csrf):
        anti_csrf = anti_csrf(request, user_context)

    if anti_csrf == "VIA_CUSTOM_HEADER" and not disable_anti_csrf:
        if get_rid_from_header(request) is None:
            log_debug_message(
                "refreshSession: Returning UNAUTHORISED because anti-csrf token is undefined"
            )
            # clear_tokens=False: a failed custom-header check is reported
            # without asking for the session tokens to be cleared.
            raise_unauthorised_exception(
                "anti-csrf check failed. Please pass 'rid: \"session\"' header in the request.",
                clear_tokens=False,
            )
        # The custom-header check has been done here, so refresh_session is
        # told to skip its own anti-CSRF handling.
        disable_anti_csrf = True

    try:
        session = await recipe_interface_impl.refresh_session(
            refresh_token, anti_csrf_token, disable_anti_csrf, user_context
        )
    except SuperTokensSessionError as e:
        if isinstance(e, TokenTheftError) or (
            isinstance(e, UnauthorisedError) and getattr(e, "clear_tokens") is True
        ):
            # We clear the LEGACY_ID_REFRESH_TOKEN_COOKIE_NAME here because we want to limit the scope of
            # this legacy/migration code so the token clearing functions in the error handlers do not.
            clear_legacy_cookie_if_present(
                "refreshSession: cleared legacy id refresh token because refresh token was not found"
            )

        e.response_mutators.extend(response_mutators)
        raise

    log_debug_message(
        "refreshSession: Attaching refreshed session info as %s",
        request_transfer_method,
    )

    # We clear the tokens in all token transfer methods we are not going to overwrite:
    for transfer_method in available_token_transfer_methods:
        if (
            transfer_method != request_transfer_method
            and refresh_tokens[transfer_method] is not None
        ):
            response_mutators.append(
                clear_session_mutator(config, transfer_method, request)
            )

    await session.attach_to_request_response(
        request, request_transfer_method, user_context
    )
    log_debug_message("refreshSession: Success!")

    clear_legacy_cookie_if_present(
        "refreshSession: cleared legacy id refresh token after successful refresh"
    )

    session.response_mutators.extend(response_mutators)
    return session