Module supertokens_python.querier

Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import asyncio
from json import JSONDecodeError
from os import environ
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Tuple

from httpx import AsyncClient, ConnectTimeout, NetworkError, Response

from .constants import (
    API_KEY_HEADER,
    API_VERSION,
    API_VERSION_HEADER,
    RID_KEY_HEADER,
    SUPPORTED_CDI_VERSIONS,
    RATE_LIMIT_STATUS_CODE,
)
from .normalised_url_path import NormalisedURLPath

if TYPE_CHECKING:
    from .supertokens import Host

from typing import List, Set, Union

from .process_state import PROCESS_STATE, ProcessState
from .utils import find_max_version, is_4xx_error, is_5xx_error
from sniffio import AsyncLibraryNotFoundError
from supertokens_python.async_to_sync_wrapper import create_or_get_event_loop
from supertokens_python.utils import get_timestamp_ms


class Querier:
    __init_called = False
    __hosts: List[Host] = []
    __api_key: Union[None, str] = None
    api_version = None
    __last_tried_index: int = 0
    __hosts_alive_for_testing: Set[str] = set()
    network_interceptor: Optional[
        Callable[
            [
                str,
                str,
                Dict[str, Any],
                Optional[Dict[str, Any]],
                Optional[Dict[str, Any]],
                Optional[Dict[str, Any]],
            ],
            Tuple[
                str,
                str,
                Dict[str, Any],
                Optional[Dict[str, Any]],
                Optional[Dict[str, Any]],
            ],
        ]
    ] = None
    __global_cache_tag = get_timestamp_ms()
    __disable_cache = False

    def __init__(self, hosts: List[Host], rid_to_core: Union[None, str] = None):
        self.__hosts = hosts
        self.__rid_to_core = None
        if rid_to_core is not None:
            self.__rid_to_core = rid_to_core

    @staticmethod
    def reset():
        if ("SUPERTOKENS_ENV" not in environ) or (
            environ["SUPERTOKENS_ENV"] != "testing"
        ):
            raise Exception("calling testing function in non testing env")
        Querier.__init_called = False

    @staticmethod
    def get_hosts_alive_for_testing():
        if ("SUPERTOKENS_ENV" not in environ) or (
            environ["SUPERTOKENS_ENV"] != "testing"
        ):
            raise Exception("calling testing function in non testing env")
        return Querier.__hosts_alive_for_testing

    async def api_request(
        self,
        url: str,
        method: str,
        attempts_remaining: int,
        *args: Any,
        **kwargs: Any,
    ) -> Response:
        if attempts_remaining == 0:
            raise Exception("Retry request failed")

        try:
            async with AsyncClient(timeout=30.0) as client:
                if method == "GET":
                    return await client.get(url, *args, **kwargs)  # type: ignore
                if method == "POST":
                    return await client.post(url, *args, **kwargs)  # type: ignore
                if method == "PUT":
                    return await client.put(url, *args, **kwargs)  # type: ignore
                if method == "DELETE":
                    return await client.delete(url, *args, **kwargs)  # type: ignore
                raise Exception("Shouldn't come here")
        except AsyncLibraryNotFoundError:
            # Retry
            loop = create_or_get_event_loop()
            return loop.run_until_complete(
                self.api_request(url, method, attempts_remaining - 1, *args, **kwargs)
            )

    async def get_api_version(self, user_context: Union[Dict[str, Any], None] = None):
        if user_context is None:
            user_context = {}

        if Querier.api_version is not None:
            return Querier.api_version

        ProcessState.get_instance().add_state(
            PROCESS_STATE.CALLING_SERVICE_IN_GET_API_VERSION
        )

        async def f(url: str, method: str) -> Response:
            from supertokens_python.supertokens import (
                Supertokens,
                get_request_from_user_context,
            )

            headers = {}
            if Querier.__api_key is not None:
                headers = {API_KEY_HEADER: Querier.__api_key}

            # Get app info
            app_info = Supertokens.get_instance().app_info

            req = get_request_from_user_context(user_context)
            website_domain = app_info.get_origin(req, user_context)
            # Prepare query parameters
            query_params = {
                "apiDomain": app_info.api_domain.get_as_string_dangerous(),
                "websiteDomain": website_domain.get_as_string_dangerous(),
            }

            if Querier.network_interceptor is not None:
                (
                    url,
                    method,
                    headers,
                    query_params,
                    _,
                ) = Querier.network_interceptor(  # pylint:disable=not-callable
                    url, method, headers, query_params, {}, user_context
                )

            return await self.api_request(
                url, method, 2, headers=headers, params=query_params
            )

        response = await self.__send_request_helper(
            NormalisedURLPath(API_VERSION), "GET", f, len(self.__hosts)
        )
        cdi_supported_by_server = response["versions"]
        api_version = find_max_version(cdi_supported_by_server, SUPPORTED_CDI_VERSIONS)

        if api_version is None:
            raise Exception(
                "The running SuperTokens core version is not compatible with this python "
                "SDK. Please visit https://supertokens.io/docs/community/compatibility-table "
                "to find the right versions"
            )

        Querier.api_version = api_version
        return Querier.api_version

    @staticmethod
    def get_instance(rid_to_core: Union[str, None] = None):
        if not Querier.__init_called:
            raise Exception(
                "Please call the supertokens.init function before using SuperTokens"
            )
        return Querier(Querier.__hosts, rid_to_core)

    @staticmethod
    def init(
        hosts: List[Host],
        api_key: Union[str, None] = None,
        network_interceptor: Optional[
            Callable[
                [
                    str,
                    str,
                    Dict[str, Any],
                    Optional[Dict[str, Any]],
                    Optional[Dict[str, Any]],
                    Optional[Dict[str, Any]],
                ],
                Tuple[
                    str,
                    str,
                    Dict[str, Any],
                    Optional[Dict[str, Any]],
                    Optional[Dict[str, Any]],
                ],
            ]
        ] = None,
        disable_cache: bool = False,
    ):
        if not Querier.__init_called:
            Querier.__init_called = True
            Querier.__hosts = hosts
            Querier.__api_key = api_key
            Querier.api_version = None
            Querier.__last_tried_index = 0
            Querier.__hosts_alive_for_testing = set()
            Querier.network_interceptor = network_interceptor
            Querier.__disable_cache = disable_cache

    async def __get_headers_with_api_version(
        self, path: NormalisedURLPath, user_context: Union[Dict[str, Any], None]
    ):
        headers = {API_VERSION_HEADER: await self.get_api_version(user_context)}
        if Querier.__api_key is not None:
            headers = {**headers, API_KEY_HEADER: Querier.__api_key}
        if path.is_a_recipe_path() and self.__rid_to_core is not None:
            headers = {**headers, RID_KEY_HEADER: self.__rid_to_core}
        return headers

    async def send_get_request(
        self,
        path: NormalisedURLPath,
        params: Union[Dict[str, Any], None],
        user_context: Union[Dict[str, Any], None],
    ) -> Dict[str, Any]:
        if params is None:
            params = {}

        async def f(url: str, method: str) -> Response:
            headers = await self.__get_headers_with_api_version(path, user_context)
            nonlocal params

            assert params is not None

            # Sort the keys for deterministic order
            sorted_keys = sorted(params.keys())
            sorted_header_keys = sorted(headers.keys())

            # Start with the path as the unique key
            unique_key = path.get_as_string_dangerous()

            # Append sorted params to the unique key
            for key in sorted_keys:
                value = params[key]
                unique_key += f";{key}={value}"

            # Append a separator for headers
            unique_key += ";hdrs"

            # Append sorted headers to the unique key
            for key in sorted_header_keys:
                value = headers[key]
                unique_key += f";{key}={value}"

            if user_context is not None:
                if (
                    user_context.get("_default", {}).get("global_cache_tag", -1)
                    != Querier.__global_cache_tag
                ):
                    self.invalidate_core_call_cache(user_context, False)

                if not Querier.__disable_cache and unique_key in user_context.get(
                    "_default", {}
                ).get("core_call_cache", {}):
                    return user_context["_default"]["core_call_cache"][unique_key]

            if Querier.network_interceptor is not None:
                (
                    url,
                    method,
                    headers,
                    params,
                    _,
                ) = Querier.network_interceptor(  # pylint:disable=not-callable
                    url, method, headers, params, {}, user_context
                )

            response = await self.api_request(
                url,
                method,
                2,
                headers=headers,
                params=params,
            )

            if (
                response.status_code == 200
                and not Querier.__disable_cache
                and user_context is not None
            ):
                user_context["_default"] = {
                    **user_context.get("_default", {}),
                    "core_call_cache": {
                        **user_context.get("_default", {}).get("core_call_cache", {}),
                        unique_key: response,
                    },
                    "global_cache_tag": Querier.__global_cache_tag,
                }

            return response

        return await self.__send_request_helper(path, "GET", f, len(self.__hosts))

    async def send_post_request(
        self,
        path: NormalisedURLPath,
        data: Union[Dict[str, Any], None],
        user_context: Union[Dict[str, Any], None],
        test: bool = False,
    ) -> Dict[str, Any]:
        self.invalidate_core_call_cache(user_context)
        if data is None:
            data = {}

        if (
            ("SUPERTOKENS_ENV" in environ)
            and (environ["SUPERTOKENS_ENV"] == "testing")
            and test
        ):
            return data

        headers = await self.__get_headers_with_api_version(path, user_context)
        headers["content-type"] = "application/json; charset=utf-8"

        async def f(url: str, method: str) -> Response:
            nonlocal headers, data
            if Querier.network_interceptor is not None:
                (
                    url,
                    method,
                    headers,
                    _,
                    data,
                ) = Querier.network_interceptor(  # pylint:disable=not-callable
                    url, method, headers, {}, data, user_context
                )
            return await self.api_request(
                url,
                method,
                2,
                headers=headers,
                json=data,
            )

        return await self.__send_request_helper(path, "POST", f, len(self.__hosts))

    async def send_delete_request(
        self,
        path: NormalisedURLPath,
        params: Union[Dict[str, Any], None],
        user_context: Union[Dict[str, Any], None],
    ) -> Dict[str, Any]:
        self.invalidate_core_call_cache(user_context)
        if params is None:
            params = {}

        async def f(url: str, method: str) -> Response:
            headers = await self.__get_headers_with_api_version(path, user_context)
            nonlocal params
            if Querier.network_interceptor is not None:
                (
                    url,
                    method,
                    headers,
                    params,
                    _,
                ) = Querier.network_interceptor(  # pylint:disable=not-callable
                    url, method, headers, params, {}, user_context
                )
            return await self.api_request(
                url,
                method,
                2,
                headers=headers,
                params=params,
            )

        return await self.__send_request_helper(path, "DELETE", f, len(self.__hosts))

    async def send_put_request(
        self,
        path: NormalisedURLPath,
        data: Union[Dict[str, Any], None],
        user_context: Union[Dict[str, Any], None],
    ) -> Dict[str, Any]:
        self.invalidate_core_call_cache(user_context)
        if data is None:
            data = {}

        headers = await self.__get_headers_with_api_version(path, user_context)
        headers["content-type"] = "application/json; charset=utf-8"

        async def f(url: str, method: str) -> Response:
            nonlocal headers, data
            if Querier.network_interceptor is not None:
                (
                    url,
                    method,
                    headers,
                    _,
                    data,
                ) = Querier.network_interceptor(  # pylint:disable=not-callable
                    url, method, headers, {}, data, user_context
                )
            return await self.api_request(url, method, 2, headers=headers, json=data)

        return await self.__send_request_helper(path, "PUT", f, len(self.__hosts))

    def invalidate_core_call_cache(
        self,
        user_context: Union[Dict[str, Any], None],
        upd_global_cache_tag_if_necessary: bool = True,
    ):
        if user_context is None:
            # this is done so that the code below runs as expected.
            # It will reset the __global_cache_tag if needed, and the
            # stuff we assign to the user_context will just be ignored (as expected)
            user_context = {}

        if upd_global_cache_tag_if_necessary and (
            user_context.get("_default", {}).get("keep_cache_alive", False) is not True
        ):
            # there can be race conditions here, but i think we can ignore them.
            Querier.__global_cache_tag = get_timestamp_ms()

        user_context["_default"] = {
            **user_context.get("_default", {}),
            "core_call_cache": {},
        }

    def get_all_core_urls_for_path(self, path: str) -> List[str]:
        normalized_path = NormalisedURLPath(path)

        result: List[str] = []

        for h in self.__hosts:
            current_domain = h.domain.get_as_string_dangerous()
            current_base_path = h.base_path.get_as_string_dangerous()

            result.append(
                current_domain
                + current_base_path
                + normalized_path.get_as_string_dangerous()
            )
        return result

    async def __send_request_helper(
        self,
        path: NormalisedURLPath,
        method: str,
        http_function: Callable[[str, str], Awaitable[Response]],
        no_of_tries: int,
        retry_info_map: Optional[Dict[str, int]] = None,
    ) -> Dict[str, Any]:
        if no_of_tries == 0:
            raise Exception("No SuperTokens core available to query")

        try:
            current_host_domain = self.__hosts[
                Querier.__last_tried_index
            ].domain.get_as_string_dangerous()
            current_host_base_path = self.__hosts[
                Querier.__last_tried_index
            ].base_path.get_as_string_dangerous()
            current_host: str = current_host_domain + current_host_base_path
            Querier.__last_tried_index += 1
            Querier.__last_tried_index %= len(self.__hosts)
            url = current_host + path.get_as_string_dangerous()

            max_retries = 5

            if retry_info_map is None:
                retry_info_map = {}

            if retry_info_map.get(url) is None:
                retry_info_map[url] = max_retries

            ProcessState.get_instance().add_state(
                PROCESS_STATE.CALLING_SERVICE_IN_REQUEST_HELPER
            )
            response = await http_function(url, method)
            if ("SUPERTOKENS_ENV" in environ) and (
                environ["SUPERTOKENS_ENV"] == "testing"
            ):
                Querier.__hosts_alive_for_testing.add(current_host)

            if response.status_code == RATE_LIMIT_STATUS_CODE:
                retries_left = retry_info_map[url]

                if retries_left > 0:
                    retry_info_map[url] = retries_left - 1

                    attempts_made = max_retries - retries_left
                    delay = (10 + attempts_made * 250) / 1000

                    await asyncio.sleep(delay)
                    return await self.__send_request_helper(
                        path, method, http_function, no_of_tries, retry_info_map
                    )

            if is_4xx_error(response.status_code) or is_5xx_error(response.status_code):  # type: ignore
                raise Exception(
                    "SuperTokens core threw an error for a "
                    + method
                    + " request to path: '"
                    + path.get_as_string_dangerous()
                    + "' with status code: "
                    + str(response.status_code)
                    + " and message: "
                    + response.text  # type: ignore
                )

            res: Dict[str, Any] = {"_headers": dict(response.headers)}

            try:
                res.update(response.json())
            except JSONDecodeError:
                res["_text"] = response.text

            return res

        except (ConnectionError, NetworkError, ConnectTimeout) as _:
            return await self.__send_request_helper(
                path, method, http_function, no_of_tries - 1, retry_info_map
            )

Classes

class Querier (hosts: List[Host], rid_to_core: Union[None, str] = None)
Expand source code
class Querier:
    __init_called = False
    __hosts: List[Host] = []
    __api_key: Union[None, str] = None
    api_version = None
    __last_tried_index: int = 0
    __hosts_alive_for_testing: Set[str] = set()
    network_interceptor: Optional[
        Callable[
            [
                str,
                str,
                Dict[str, Any],
                Optional[Dict[str, Any]],
                Optional[Dict[str, Any]],
                Optional[Dict[str, Any]],
            ],
            Tuple[
                str,
                str,
                Dict[str, Any],
                Optional[Dict[str, Any]],
                Optional[Dict[str, Any]],
            ],
        ]
    ] = None
    __global_cache_tag = get_timestamp_ms()
    __disable_cache = False

    def __init__(self, hosts: List[Host], rid_to_core: Union[None, str] = None):
        self.__hosts = hosts
        self.__rid_to_core = None
        if rid_to_core is not None:
            self.__rid_to_core = rid_to_core

    @staticmethod
    def reset():
        if ("SUPERTOKENS_ENV" not in environ) or (
            environ["SUPERTOKENS_ENV"] != "testing"
        ):
            raise Exception("calling testing function in non testing env")
        Querier.__init_called = False

    @staticmethod
    def get_hosts_alive_for_testing():
        if ("SUPERTOKENS_ENV" not in environ) or (
            environ["SUPERTOKENS_ENV"] != "testing"
        ):
            raise Exception("calling testing function in non testing env")
        return Querier.__hosts_alive_for_testing

    async def api_request(
        self,
        url: str,
        method: str,
        attempts_remaining: int,
        *args: Any,
        **kwargs: Any,
    ) -> Response:
        if attempts_remaining == 0:
            raise Exception("Retry request failed")

        try:
            async with AsyncClient(timeout=30.0) as client:
                if method == "GET":
                    return await client.get(url, *args, **kwargs)  # type: ignore
                if method == "POST":
                    return await client.post(url, *args, **kwargs)  # type: ignore
                if method == "PUT":
                    return await client.put(url, *args, **kwargs)  # type: ignore
                if method == "DELETE":
                    return await client.delete(url, *args, **kwargs)  # type: ignore
                raise Exception("Shouldn't come here")
        except AsyncLibraryNotFoundError:
            # Retry
            loop = create_or_get_event_loop()
            return loop.run_until_complete(
                self.api_request(url, method, attempts_remaining - 1, *args, **kwargs)
            )

    async def get_api_version(self, user_context: Union[Dict[str, Any], None] = None):
        if user_context is None:
            user_context = {}

        if Querier.api_version is not None:
            return Querier.api_version

        ProcessState.get_instance().add_state(
            PROCESS_STATE.CALLING_SERVICE_IN_GET_API_VERSION
        )

        async def f(url: str, method: str) -> Response:
            from supertokens_python.supertokens import (
                Supertokens,
                get_request_from_user_context,
            )

            headers = {}
            if Querier.__api_key is not None:
                headers = {API_KEY_HEADER: Querier.__api_key}

            # Get app info
            app_info = Supertokens.get_instance().app_info

            req = get_request_from_user_context(user_context)
            website_domain = app_info.get_origin(req, user_context)
            # Prepare query parameters
            query_params = {
                "apiDomain": app_info.api_domain.get_as_string_dangerous(),
                "websiteDomain": website_domain.get_as_string_dangerous(),
            }

            if Querier.network_interceptor is not None:
                (
                    url,
                    method,
                    headers,
                    query_params,
                    _,
                ) = Querier.network_interceptor(  # pylint:disable=not-callable
                    url, method, headers, query_params, {}, user_context
                )

            return await self.api_request(
                url, method, 2, headers=headers, params=query_params
            )

        response = await self.__send_request_helper(
            NormalisedURLPath(API_VERSION), "GET", f, len(self.__hosts)
        )
        cdi_supported_by_server = response["versions"]
        api_version = find_max_version(cdi_supported_by_server, SUPPORTED_CDI_VERSIONS)

        if api_version is None:
            raise Exception(
                "The running SuperTokens core version is not compatible with this python "
                "SDK. Please visit https://supertokens.io/docs/community/compatibility-table "
                "to find the right versions"
            )

        Querier.api_version = api_version
        return Querier.api_version

    @staticmethod
    def get_instance(rid_to_core: Union[str, None] = None):
        if not Querier.__init_called:
            raise Exception(
                "Please call the supertokens.init function before using SuperTokens"
            )
        return Querier(Querier.__hosts, rid_to_core)

    @staticmethod
    def init(
        hosts: List[Host],
        api_key: Union[str, None] = None,
        network_interceptor: Optional[
            Callable[
                [
                    str,
                    str,
                    Dict[str, Any],
                    Optional[Dict[str, Any]],
                    Optional[Dict[str, Any]],
                    Optional[Dict[str, Any]],
                ],
                Tuple[
                    str,
                    str,
                    Dict[str, Any],
                    Optional[Dict[str, Any]],
                    Optional[Dict[str, Any]],
                ],
            ]
        ] = None,
        disable_cache: bool = False,
    ):
        if not Querier.__init_called:
            Querier.__init_called = True
            Querier.__hosts = hosts
            Querier.__api_key = api_key
            Querier.api_version = None
            Querier.__last_tried_index = 0
            Querier.__hosts_alive_for_testing = set()
            Querier.network_interceptor = network_interceptor
            Querier.__disable_cache = disable_cache

    async def __get_headers_with_api_version(
        self, path: NormalisedURLPath, user_context: Union[Dict[str, Any], None]
    ):
        headers = {API_VERSION_HEADER: await self.get_api_version(user_context)}
        if Querier.__api_key is not None:
            headers = {**headers, API_KEY_HEADER: Querier.__api_key}
        if path.is_a_recipe_path() and self.__rid_to_core is not None:
            headers = {**headers, RID_KEY_HEADER: self.__rid_to_core}
        return headers

    async def send_get_request(
        self,
        path: NormalisedURLPath,
        params: Union[Dict[str, Any], None],
        user_context: Union[Dict[str, Any], None],
    ) -> Dict[str, Any]:
        if params is None:
            params = {}

        async def f(url: str, method: str) -> Response:
            headers = await self.__get_headers_with_api_version(path, user_context)
            nonlocal params

            assert params is not None

            # Sort the keys for deterministic order
            sorted_keys = sorted(params.keys())
            sorted_header_keys = sorted(headers.keys())

            # Start with the path as the unique key
            unique_key = path.get_as_string_dangerous()

            # Append sorted params to the unique key
            for key in sorted_keys:
                value = params[key]
                unique_key += f";{key}={value}"

            # Append a separator for headers
            unique_key += ";hdrs"

            # Append sorted headers to the unique key
            for key in sorted_header_keys:
                value = headers[key]
                unique_key += f";{key}={value}"

            if user_context is not None:
                if (
                    user_context.get("_default", {}).get("global_cache_tag", -1)
                    != Querier.__global_cache_tag
                ):
                    self.invalidate_core_call_cache(user_context, False)

                if not Querier.__disable_cache and unique_key in user_context.get(
                    "_default", {}
                ).get("core_call_cache", {}):
                    return user_context["_default"]["core_call_cache"][unique_key]

            if Querier.network_interceptor is not None:
                (
                    url,
                    method,
                    headers,
                    params,
                    _,
                ) = Querier.network_interceptor(  # pylint:disable=not-callable
                    url, method, headers, params, {}, user_context
                )

            response = await self.api_request(
                url,
                method,
                2,
                headers=headers,
                params=params,
            )

            if (
                response.status_code == 200
                and not Querier.__disable_cache
                and user_context is not None
            ):
                user_context["_default"] = {
                    **user_context.get("_default", {}),
                    "core_call_cache": {
                        **user_context.get("_default", {}).get("core_call_cache", {}),
                        unique_key: response,
                    },
                    "global_cache_tag": Querier.__global_cache_tag,
                }

            return response

        return await self.__send_request_helper(path, "GET", f, len(self.__hosts))

    async def send_post_request(
        self,
        path: NormalisedURLPath,
        data: Union[Dict[str, Any], None],
        user_context: Union[Dict[str, Any], None],
        test: bool = False,
    ) -> Dict[str, Any]:
        self.invalidate_core_call_cache(user_context)
        if data is None:
            data = {}

        if (
            ("SUPERTOKENS_ENV" in environ)
            and (environ["SUPERTOKENS_ENV"] == "testing")
            and test
        ):
            return data

        headers = await self.__get_headers_with_api_version(path, user_context)
        headers["content-type"] = "application/json; charset=utf-8"

        async def f(url: str, method: str) -> Response:
            nonlocal headers, data
            if Querier.network_interceptor is not None:
                (
                    url,
                    method,
                    headers,
                    _,
                    data,
                ) = Querier.network_interceptor(  # pylint:disable=not-callable
                    url, method, headers, {}, data, user_context
                )
            return await self.api_request(
                url,
                method,
                2,
                headers=headers,
                json=data,
            )

        return await self.__send_request_helper(path, "POST", f, len(self.__hosts))

    async def send_delete_request(
        self,
        path: NormalisedURLPath,
        params: Union[Dict[str, Any], None],
        user_context: Union[Dict[str, Any], None],
    ) -> Dict[str, Any]:
        self.invalidate_core_call_cache(user_context)
        if params is None:
            params = {}

        async def f(url: str, method: str) -> Response:
            headers = await self.__get_headers_with_api_version(path, user_context)
            nonlocal params
            if Querier.network_interceptor is not None:
                (
                    url,
                    method,
                    headers,
                    params,
                    _,
                ) = Querier.network_interceptor(  # pylint:disable=not-callable
                    url, method, headers, params, {}, user_context
                )
            return await self.api_request(
                url,
                method,
                2,
                headers=headers,
                params=params,
            )

        return await self.__send_request_helper(path, "DELETE", f, len(self.__hosts))

    async def send_put_request(
        self,
        path: NormalisedURLPath,
        data: Union[Dict[str, Any], None],
        user_context: Union[Dict[str, Any], None],
    ) -> Dict[str, Any]:
        self.invalidate_core_call_cache(user_context)
        if data is None:
            data = {}

        headers = await self.__get_headers_with_api_version(path, user_context)
        headers["content-type"] = "application/json; charset=utf-8"

        async def f(url: str, method: str) -> Response:
            nonlocal headers, data
            if Querier.network_interceptor is not None:
                (
                    url,
                    method,
                    headers,
                    _,
                    data,
                ) = Querier.network_interceptor(  # pylint:disable=not-callable
                    url, method, headers, {}, data, user_context
                )
            return await self.api_request(url, method, 2, headers=headers, json=data)

        return await self.__send_request_helper(path, "PUT", f, len(self.__hosts))

    def invalidate_core_call_cache(
        self,
        user_context: Union[Dict[str, Any], None],
        upd_global_cache_tag_if_necessary: bool = True,
    ):
        if user_context is None:
            # this is done so that the code below runs as expected.
            # It will reset the __global_cache_tag if needed, and the
            # stuff we assign to the user_context will just be ignored (as expected)
            user_context = {}

        if upd_global_cache_tag_if_necessary and (
            user_context.get("_default", {}).get("keep_cache_alive", False) is not True
        ):
            # there can be race conditions here, but i think we can ignore them.
            Querier.__global_cache_tag = get_timestamp_ms()

        user_context["_default"] = {
            **user_context.get("_default", {}),
            "core_call_cache": {},
        }

    def get_all_core_urls_for_path(self, path: str) -> List[str]:
        normalized_path = NormalisedURLPath(path)

        result: List[str] = []

        for h in self.__hosts:
            current_domain = h.domain.get_as_string_dangerous()
            current_base_path = h.base_path.get_as_string_dangerous()

            result.append(
                current_domain
                + current_base_path
                + normalized_path.get_as_string_dangerous()
            )
        return result

    async def __send_request_helper(
        self,
        path: NormalisedURLPath,
        method: str,
        http_function: Callable[[str, str], Awaitable[Response]],
        no_of_tries: int,
        retry_info_map: Optional[Dict[str, int]] = None,
    ) -> Dict[str, Any]:
        if no_of_tries == 0:
            raise Exception("No SuperTokens core available to query")

        try:
            current_host_domain = self.__hosts[
                Querier.__last_tried_index
            ].domain.get_as_string_dangerous()
            current_host_base_path = self.__hosts[
                Querier.__last_tried_index
            ].base_path.get_as_string_dangerous()
            current_host: str = current_host_domain + current_host_base_path
            Querier.__last_tried_index += 1
            Querier.__last_tried_index %= len(self.__hosts)
            url = current_host + path.get_as_string_dangerous()

            max_retries = 5

            if retry_info_map is None:
                retry_info_map = {}

            if retry_info_map.get(url) is None:
                retry_info_map[url] = max_retries

            ProcessState.get_instance().add_state(
                PROCESS_STATE.CALLING_SERVICE_IN_REQUEST_HELPER
            )
            response = await http_function(url, method)
            if ("SUPERTOKENS_ENV" in environ) and (
                environ["SUPERTOKENS_ENV"] == "testing"
            ):
                Querier.__hosts_alive_for_testing.add(current_host)

            if response.status_code == RATE_LIMIT_STATUS_CODE:
                retries_left = retry_info_map[url]

                if retries_left > 0:
                    retry_info_map[url] = retries_left - 1

                    attempts_made = max_retries - retries_left
                    delay = (10 + attempts_made * 250) / 1000

                    await asyncio.sleep(delay)
                    return await self.__send_request_helper(
                        path, method, http_function, no_of_tries, retry_info_map
                    )

            if is_4xx_error(response.status_code) or is_5xx_error(response.status_code):  # type: ignore
                raise Exception(
                    "SuperTokens core threw an error for a "
                    + method
                    + " request to path: '"
                    + path.get_as_string_dangerous()
                    + "' with status code: "
                    + str(response.status_code)
                    + " and message: "
                    + response.text  # type: ignore
                )

            res: Dict[str, Any] = {"_headers": dict(response.headers)}

            try:
                res.update(response.json())
            except JSONDecodeError:
                res["_text"] = response.text

            return res

        except (ConnectionError, NetworkError, ConnectTimeout) as _:
            return await self.__send_request_helper(
                path, method, http_function, no_of_tries - 1, retry_info_map
            )

Class variables

var api_version
var network_interceptor : Optional[Callable[[str, str, Dict[str, Any], Optional[Dict[str, Any]], Optional[Dict[str, Any]], Optional[Dict[str, Any]]], Tuple[str, str, Dict[str, Any], Optional[Dict[str, Any]], Optional[Dict[str, Any]]]]]

Static methods

def get_hosts_alive_for_testing()
Expand source code
@staticmethod
def get_hosts_alive_for_testing():
    if ("SUPERTOKENS_ENV" not in environ) or (
        environ["SUPERTOKENS_ENV"] != "testing"
    ):
        raise Exception("calling testing function in non testing env")
    return Querier.__hosts_alive_for_testing
def get_instance(rid_to_core: Union[str, None] = None)
Expand source code
@staticmethod
def get_instance(rid_to_core: Union[str, None] = None):
    if not Querier.__init_called:
        raise Exception(
            "Please call the supertokens.init function before using SuperTokens"
        )
    return Querier(Querier.__hosts, rid_to_core)
def init(hosts: List[Host], api_key: Union[str, None] = None, network_interceptor: Optional[Callable[[str, str, Dict[str, Any], Optional[Dict[str, Any]], Optional[Dict[str, Any]], Optional[Dict[str, Any]]], Tuple[str, str, Dict[str, Any], Optional[Dict[str, Any]], Optional[Dict[str, Any]]]]] = None, disable_cache: bool = False)
Expand source code
@staticmethod
def init(
    hosts: List[Host],
    api_key: Union[str, None] = None,
    network_interceptor: Optional[
        Callable[
            [
                str,
                str,
                Dict[str, Any],
                Optional[Dict[str, Any]],
                Optional[Dict[str, Any]],
                Optional[Dict[str, Any]],
            ],
            Tuple[
                str,
                str,
                Dict[str, Any],
                Optional[Dict[str, Any]],
                Optional[Dict[str, Any]],
            ],
        ]
    ] = None,
    disable_cache: bool = False,
):
    if not Querier.__init_called:
        Querier.__init_called = True
        Querier.__hosts = hosts
        Querier.__api_key = api_key
        Querier.api_version = None
        Querier.__last_tried_index = 0
        Querier.__hosts_alive_for_testing = set()
        Querier.network_interceptor = network_interceptor
        Querier.__disable_cache = disable_cache
def reset()
Expand source code
@staticmethod
def reset():
    if ("SUPERTOKENS_ENV" not in environ) or (
        environ["SUPERTOKENS_ENV"] != "testing"
    ):
        raise Exception("calling testing function in non testing env")
    Querier.__init_called = False

Methods

async def api_request(self, url: str, method: str, attempts_remaining: int, *args: Any, **kwargs: Any) ‑> httpx.Response
Expand source code
async def api_request(
    self,
    url: str,
    method: str,
    attempts_remaining: int,
    *args: Any,
    **kwargs: Any,
) -> Response:
    if attempts_remaining == 0:
        raise Exception("Retry request failed")

    try:
        async with AsyncClient(timeout=30.0) as client:
            if method == "GET":
                return await client.get(url, *args, **kwargs)  # type: ignore
            if method == "POST":
                return await client.post(url, *args, **kwargs)  # type: ignore
            if method == "PUT":
                return await client.put(url, *args, **kwargs)  # type: ignore
            if method == "DELETE":
                return await client.delete(url, *args, **kwargs)  # type: ignore
            raise Exception("Shouldn't come here")
    except AsyncLibraryNotFoundError:
        # Retry
        loop = create_or_get_event_loop()
        return loop.run_until_complete(
            self.api_request(url, method, attempts_remaining - 1, *args, **kwargs)
        )
def get_all_core_urls_for_path(self, path: str) ‑> List[str]
Expand source code
def get_all_core_urls_for_path(self, path: str) -> List[str]:
    normalized_path = NormalisedURLPath(path)

    result: List[str] = []

    for h in self.__hosts:
        current_domain = h.domain.get_as_string_dangerous()
        current_base_path = h.base_path.get_as_string_dangerous()

        result.append(
            current_domain
            + current_base_path
            + normalized_path.get_as_string_dangerous()
        )
    return result
async def get_api_version(self, user_context: Union[Dict[str, Any], None] = None)
Expand source code
async def get_api_version(self, user_context: Union[Dict[str, Any], None] = None):
    if user_context is None:
        user_context = {}

    if Querier.api_version is not None:
        return Querier.api_version

    ProcessState.get_instance().add_state(
        PROCESS_STATE.CALLING_SERVICE_IN_GET_API_VERSION
    )

    async def f(url: str, method: str) -> Response:
        from supertokens_python.supertokens import (
            Supertokens,
            get_request_from_user_context,
        )

        headers = {}
        if Querier.__api_key is not None:
            headers = {API_KEY_HEADER: Querier.__api_key}

        # Get app info
        app_info = Supertokens.get_instance().app_info

        req = get_request_from_user_context(user_context)
        website_domain = app_info.get_origin(req, user_context)
        # Prepare query parameters
        query_params = {
            "apiDomain": app_info.api_domain.get_as_string_dangerous(),
            "websiteDomain": website_domain.get_as_string_dangerous(),
        }

        if Querier.network_interceptor is not None:
            (
                url,
                method,
                headers,
                query_params,
                _,
            ) = Querier.network_interceptor(  # pylint:disable=not-callable
                url, method, headers, query_params, {}, user_context
            )

        return await self.api_request(
            url, method, 2, headers=headers, params=query_params
        )

    response = await self.__send_request_helper(
        NormalisedURLPath(API_VERSION), "GET", f, len(self.__hosts)
    )
    cdi_supported_by_server = response["versions"]
    api_version = find_max_version(cdi_supported_by_server, SUPPORTED_CDI_VERSIONS)

    if api_version is None:
        raise Exception(
            "The running SuperTokens core version is not compatible with this python "
            "SDK. Please visit https://supertokens.io/docs/community/compatibility-table "
            "to find the right versions"
        )

    Querier.api_version = api_version
    return Querier.api_version
def invalidate_core_call_cache(self, user_context: Union[Dict[str, Any], None], upd_global_cache_tag_if_necessary: bool = True)
Expand source code
def invalidate_core_call_cache(
    self,
    user_context: Union[Dict[str, Any], None],
    upd_global_cache_tag_if_necessary: bool = True,
):
    if user_context is None:
        # this is done so that the code below runs as expected.
        # It will reset the __global_cache_tag if needed, and the
        # stuff we assign to the user_context will just be ignored (as expected)
        user_context = {}

    if upd_global_cache_tag_if_necessary and (
        user_context.get("_default", {}).get("keep_cache_alive", False) is not True
    ):
        # there can be race conditions here, but i think we can ignore them.
        Querier.__global_cache_tag = get_timestamp_ms()

    user_context["_default"] = {
        **user_context.get("_default", {}),
        "core_call_cache": {},
    }
async def send_delete_request(self, path: NormalisedURLPath, params: Union[Dict[str, Any], None], user_context: Union[Dict[str, Any], None]) ‑> Dict[str, Any]
Expand source code
async def send_delete_request(
    self,
    path: NormalisedURLPath,
    params: Union[Dict[str, Any], None],
    user_context: Union[Dict[str, Any], None],
) -> Dict[str, Any]:
    self.invalidate_core_call_cache(user_context)
    if params is None:
        params = {}

    async def f(url: str, method: str) -> Response:
        headers = await self.__get_headers_with_api_version(path, user_context)
        nonlocal params
        if Querier.network_interceptor is not None:
            (
                url,
                method,
                headers,
                params,
                _,
            ) = Querier.network_interceptor(  # pylint:disable=not-callable
                url, method, headers, params, {}, user_context
            )
        return await self.api_request(
            url,
            method,
            2,
            headers=headers,
            params=params,
        )

    return await self.__send_request_helper(path, "DELETE", f, len(self.__hosts))
async def send_get_request(self, path: NormalisedURLPath, params: Union[Dict[str, Any], None], user_context: Union[Dict[str, Any], None]) ‑> Dict[str, Any]
Expand source code
async def send_get_request(
    self,
    path: NormalisedURLPath,
    params: Union[Dict[str, Any], None],
    user_context: Union[Dict[str, Any], None],
) -> Dict[str, Any]:
    if params is None:
        params = {}

    async def f(url: str, method: str) -> Response:
        headers = await self.__get_headers_with_api_version(path, user_context)
        nonlocal params

        assert params is not None

        # Sort the keys for deterministic order
        sorted_keys = sorted(params.keys())
        sorted_header_keys = sorted(headers.keys())

        # Start with the path as the unique key
        unique_key = path.get_as_string_dangerous()

        # Append sorted params to the unique key
        for key in sorted_keys:
            value = params[key]
            unique_key += f";{key}={value}"

        # Append a separator for headers
        unique_key += ";hdrs"

        # Append sorted headers to the unique key
        for key in sorted_header_keys:
            value = headers[key]
            unique_key += f";{key}={value}"

        if user_context is not None:
            if (
                user_context.get("_default", {}).get("global_cache_tag", -1)
                != Querier.__global_cache_tag
            ):
                self.invalidate_core_call_cache(user_context, False)

            if not Querier.__disable_cache and unique_key in user_context.get(
                "_default", {}
            ).get("core_call_cache", {}):
                return user_context["_default"]["core_call_cache"][unique_key]

        if Querier.network_interceptor is not None:
            (
                url,
                method,
                headers,
                params,
                _,
            ) = Querier.network_interceptor(  # pylint:disable=not-callable
                url, method, headers, params, {}, user_context
            )

        response = await self.api_request(
            url,
            method,
            2,
            headers=headers,
            params=params,
        )

        if (
            response.status_code == 200
            and not Querier.__disable_cache
            and user_context is not None
        ):
            user_context["_default"] = {
                **user_context.get("_default", {}),
                "core_call_cache": {
                    **user_context.get("_default", {}).get("core_call_cache", {}),
                    unique_key: response,
                },
                "global_cache_tag": Querier.__global_cache_tag,
            }

        return response

    return await self.__send_request_helper(path, "GET", f, len(self.__hosts))
async def send_post_request(self, path: NormalisedURLPath, data: Union[Dict[str, Any], None], user_context: Union[Dict[str, Any], None], test: bool = False) ‑> Dict[str, Any]
Expand source code
async def send_post_request(
    self,
    path: NormalisedURLPath,
    data: Union[Dict[str, Any], None],
    user_context: Union[Dict[str, Any], None],
    test: bool = False,
) -> Dict[str, Any]:
    self.invalidate_core_call_cache(user_context)
    if data is None:
        data = {}

    if (
        ("SUPERTOKENS_ENV" in environ)
        and (environ["SUPERTOKENS_ENV"] == "testing")
        and test
    ):
        return data

    headers = await self.__get_headers_with_api_version(path, user_context)
    headers["content-type"] = "application/json; charset=utf-8"

    async def f(url: str, method: str) -> Response:
        nonlocal headers, data
        if Querier.network_interceptor is not None:
            (
                url,
                method,
                headers,
                _,
                data,
            ) = Querier.network_interceptor(  # pylint:disable=not-callable
                url, method, headers, {}, data, user_context
            )
        return await self.api_request(
            url,
            method,
            2,
            headers=headers,
            json=data,
        )

    return await self.__send_request_helper(path, "POST", f, len(self.__hosts))
async def send_put_request(self, path: NormalisedURLPath, data: Union[Dict[str, Any], None], user_context: Union[Dict[str, Any], None]) ‑> Dict[str, Any]
Expand source code
async def send_put_request(
    self,
    path: NormalisedURLPath,
    data: Union[Dict[str, Any], None],
    user_context: Union[Dict[str, Any], None],
) -> Dict[str, Any]:
    self.invalidate_core_call_cache(user_context)
    if data is None:
        data = {}

    headers = await self.__get_headers_with_api_version(path, user_context)
    headers["content-type"] = "application/json; charset=utf-8"

    async def f(url: str, method: str) -> Response:
        nonlocal headers, data
        if Querier.network_interceptor is not None:
            (
                url,
                method,
                headers,
                _,
                data,
            ) = Querier.network_interceptor(  # pylint:disable=not-callable
                url, method, headers, {}, data, user_context
            )
        return await self.api_request(url, method, 2, headers=headers, json=data)

    return await self.__send_request_helper(path, "PUT", f, len(self.__hosts))