Module supertokens_python.querier
Expand source code
# Copyright (c) 2021, VRAI Labs and/or its affiliates. All rights reserved.
#
# This software is licensed under the Apache License, Version 2.0 (the
# "License") as published by the Apache Software Foundation.
#
# You may not use this file except in compliance with the License. You may
# obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from json import JSONDecodeError
from os import environ
from typing import TYPE_CHECKING
from httpx import AsyncClient, NetworkError, ConnectTimeout
from .constants import (
API_VERSION,
API_KEY_HEADER,
RID_KEY_HEADER,
SUPPORTED_CDI_VERSIONS,
API_VERSION_HEADER
)
from .normalised_url_path import NormalisedURLPath
if TYPE_CHECKING:
pass
from .exceptions import raise_general_exception
from .utils import (
is_4xx_error,
is_5xx_error,
find_max_version
)
from .process_state import AllowedProcessStates, ProcessState
class Querier:
__init_called = False
__hosts = None
__api_key = None
__api_version = None
__last_tried_index: int = 0
__hosts_alive_for_testing = set()
def __init__(self, hosts: list, rid_to_core=None):
self.__hosts = hosts
self.__rid_to_core = None
if rid_to_core is not None:
self.__rid_to_core = rid_to_core
@staticmethod
def reset():
if ('SUPERTOKENS_ENV' not in environ) or (
environ['SUPERTOKENS_ENV'] != 'testing'):
raise_general_exception(
None, 'calling testing function in non testing env')
Querier.__init_called = False
@staticmethod
def get_hosts_alive_for_testing():
if ('SUPERTOKENS_ENV' not in environ) or (
environ['SUPERTOKENS_ENV'] != 'testing'):
raise_general_exception(
None, 'calling testing function in non testing env')
return Querier.__hosts_alive_for_testing
async def get_api_version(self):
if Querier.__api_version is not None:
return Querier.__api_version
ProcessState.get_instance().add_state(
AllowedProcessStates.CALLING_SERVICE_IN_GET_API_VERSION)
async def f(url):
headers = {}
if Querier.__api_key is not None:
headers = {
API_KEY_HEADER: Querier.__api_key
}
async with AsyncClient() as client:
return await client.get(url, headers=headers)
response = await self.__send_request_helper(
NormalisedURLPath(API_VERSION), 'GET', f, len(self.__hosts))
cdi_supported_by_server = response['versions']
api_version = find_max_version(
cdi_supported_by_server,
SUPPORTED_CDI_VERSIONS)
if api_version is None:
raise_general_exception(None, 'The running SuperTokens core version is not compatible with this FastAPI '
'SDK. Please visit https://supertokens.com/docs/community/compatibility-table '
'to find the right versions')
Querier.__api_version = api_version
# TODO: server-less
return Querier.__api_version
@staticmethod
def get_instance(rid_to_core=None):
if (not Querier.__init_called) or (Querier.__hosts is None):
# TODO
raise Exception(
"Please call the supertokens.init function before using SuperTokens")
return Querier(Querier.__hosts, rid_to_core)
@staticmethod
def init(hosts, api_key=None):
if not Querier.__init_called:
Querier.__init_called = True
Querier.__hosts = hosts
Querier.__api_key = api_key
Querier.__api_version = None
Querier.__last_tried_index = 0
Querier.__hosts_alive_for_testing = set()
async def __get_headers_with_api_version(self, path):
headers = {
API_VERSION_HEADER: await self.get_api_version()
}
if Querier.__api_key is not None:
headers = {
**headers,
API_KEY_HEADER: Querier.__api_key
}
if path.is_a_recipe_path() and self.__rid_to_core is not None:
headers = {
**headers,
RID_KEY_HEADER: self.__rid_to_core
}
return headers
async def send_get_request(self, path: NormalisedURLPath, params=None):
if params is None:
params = {}
async def f(url):
async with AsyncClient() as client:
return await client.get(url, params=params, headers=await self.__get_headers_with_api_version(path))
return await self.__send_request_helper(path, 'GET', f, len(self.__hosts))
async def send_post_request(self, path: NormalisedURLPath, data=None, test=False):
if data is None:
data = {}
if ('SUPERTOKENS_ENV' in environ) and (
environ['SUPERTOKENS_ENV'] == 'testing') and test:
return data
headers = await self.__get_headers_with_api_version(path)
headers['content-type'] = 'application/json; charset=utf-8'
async def f(url):
async with AsyncClient() as client:
return await client.post(url, json=data, headers=headers)
return await self.__send_request_helper(path, 'POST', f, len(self.__hosts))
async def send_delete_request(self, path: NormalisedURLPath):
async def f(url):
async with AsyncClient() as client:
return await client.delete(url, headers=await self.__get_headers_with_api_version(path))
return await self.__send_request_helper(path, 'DELETE', f, len(self.__hosts))
async def send_put_request(self, path: NormalisedURLPath, data=None):
if data is None:
data = {}
headers = await self.__get_headers_with_api_version(path)
headers['content-type'] = 'application/json; charset=utf-8'
async def f(url):
async with AsyncClient() as client:
return await client.put(url, json=data, headers=headers)
return await self.__send_request_helper(path, 'PUT', f, len(self.__hosts))
async def __send_request_helper(self, path: NormalisedURLPath, method, http_function, no_of_tries):
if no_of_tries == 0:
raise_general_exception('No SuperTokens core available to query')
try:
current_host = self.__hosts[Querier.__last_tried_index].get_as_string_dangerous(
)
Querier.__last_tried_index += 1
Querier.__last_tried_index %= len(self.__hosts)
url = current_host + path.get_as_string_dangerous()
ProcessState.get_instance().add_state(
AllowedProcessStates.CALLING_SERVICE_IN_REQUEST_HELPER)
response = await http_function(url)
if ('SUPERTOKENS_ENV' in environ) and (
environ['SUPERTOKENS_ENV'] == 'testing'):
Querier.__hosts_alive_for_testing.add(current_host)
if is_4xx_error(response.status_code) or is_5xx_error(
response.status_code):
raise_general_exception('SuperTokens core threw an error for a ' + method + ' request to path: ' +
path.get_as_string_dangerous() + ' with status code: ' + str(
response.status_code) + ' and message: ' +
response.text)
try:
return response.json()
except JSONDecodeError:
return response.text
except (ConnectionError, NetworkError, ConnectTimeout):
return await self.__send_request_helper(
path, method, http_function, no_of_tries - 1)
except Exception as e:
raise_general_exception(e)
Classes
class Querier (hosts: list, rid_to_core=None)
-
Expand source code
class Querier: __init_called = False __hosts = None __api_key = None __api_version = None __last_tried_index: int = 0 __hosts_alive_for_testing = set() def __init__(self, hosts: list, rid_to_core=None): self.__hosts = hosts self.__rid_to_core = None if rid_to_core is not None: self.__rid_to_core = rid_to_core @staticmethod def reset(): if ('SUPERTOKENS_ENV' not in environ) or ( environ['SUPERTOKENS_ENV'] != 'testing'): raise_general_exception( None, 'calling testing function in non testing env') Querier.__init_called = False @staticmethod def get_hosts_alive_for_testing(): if ('SUPERTOKENS_ENV' not in environ) or ( environ['SUPERTOKENS_ENV'] != 'testing'): raise_general_exception( None, 'calling testing function in non testing env') return Querier.__hosts_alive_for_testing async def get_api_version(self): if Querier.__api_version is not None: return Querier.__api_version ProcessState.get_instance().add_state( AllowedProcessStates.CALLING_SERVICE_IN_GET_API_VERSION) async def f(url): headers = {} if Querier.__api_key is not None: headers = { API_KEY_HEADER: Querier.__api_key } async with AsyncClient() as client: return await client.get(url, headers=headers) response = await self.__send_request_helper( NormalisedURLPath(API_VERSION), 'GET', f, len(self.__hosts)) cdi_supported_by_server = response['versions'] api_version = find_max_version( cdi_supported_by_server, SUPPORTED_CDI_VERSIONS) if api_version is None: raise_general_exception(None, 'The running SuperTokens core version is not compatible with this FastAPI ' 'SDK. Please visit https://supertokens.com/docs/community/compatibility-table ' 'to find the right versions') Querier.__api_version = api_version # TODO: server-less return Querier.__api_version @staticmethod def get_instance(rid_to_core=None): if (not Querier.__init_called) or (Querier.__hosts is None): # TODO raise Exception( "Please call the supertokens.init function before using SuperTokens") return Querier(Querier.__hosts, rid_to_core) @staticmethod def init(hosts, api_key=None): if not Querier.__init_called: Querier.__init_called = True Querier.__hosts = hosts Querier.__api_key = api_key Querier.__api_version = None Querier.__last_tried_index = 0 Querier.__hosts_alive_for_testing = set() async def __get_headers_with_api_version(self, path): headers = { API_VERSION_HEADER: await self.get_api_version() } if Querier.__api_key is not None: headers = { **headers, API_KEY_HEADER: Querier.__api_key } if path.is_a_recipe_path() and self.__rid_to_core is not None: headers = { **headers, RID_KEY_HEADER: self.__rid_to_core } return headers async def send_get_request(self, path: NormalisedURLPath, params=None): if params is None: params = {} async def f(url): async with AsyncClient() as client: return await client.get(url, params=params, headers=await self.__get_headers_with_api_version(path)) return await self.__send_request_helper(path, 'GET', f, len(self.__hosts)) async def send_post_request(self, path: NormalisedURLPath, data=None, test=False): if data is None: data = {} if ('SUPERTOKENS_ENV' in environ) and ( environ['SUPERTOKENS_ENV'] == 'testing') and test: return data headers = await self.__get_headers_with_api_version(path) headers['content-type'] = 'application/json; charset=utf-8' async def f(url): async with AsyncClient() as client: return await client.post(url, json=data, headers=headers) return await self.__send_request_helper(path, 'POST', f, len(self.__hosts)) async def send_delete_request(self, path: NormalisedURLPath): async def f(url): async with AsyncClient() as client: return await client.delete(url, headers=await self.__get_headers_with_api_version(path)) return await self.__send_request_helper(path, 'DELETE', f, len(self.__hosts)) async def send_put_request(self, path: NormalisedURLPath, data=None): if data is None: data = {} headers = await self.__get_headers_with_api_version(path) headers['content-type'] = 'application/json; charset=utf-8' async def f(url): async with AsyncClient() as client: return await client.put(url, json=data, headers=headers) return await self.__send_request_helper(path, 'PUT', f, len(self.__hosts)) async def __send_request_helper(self, path: NormalisedURLPath, method, http_function, no_of_tries): if no_of_tries == 0: raise_general_exception('No SuperTokens core available to query') try: current_host = self.__hosts[Querier.__last_tried_index].get_as_string_dangerous( ) Querier.__last_tried_index += 1 Querier.__last_tried_index %= len(self.__hosts) url = current_host + path.get_as_string_dangerous() ProcessState.get_instance().add_state( AllowedProcessStates.CALLING_SERVICE_IN_REQUEST_HELPER) response = await http_function(url) if ('SUPERTOKENS_ENV' in environ) and ( environ['SUPERTOKENS_ENV'] == 'testing'): Querier.__hosts_alive_for_testing.add(current_host) if is_4xx_error(response.status_code) or is_5xx_error( response.status_code): raise_general_exception('SuperTokens core threw an error for a ' + method + ' request to path: ' + path.get_as_string_dangerous() + ' with status code: ' + str( response.status_code) + ' and message: ' + response.text) try: return response.json() except JSONDecodeError: return response.text except (ConnectionError, NetworkError, ConnectTimeout): return await self.__send_request_helper( path, method, http_function, no_of_tries - 1) except Exception as e: raise_general_exception(e)
Static methods
def get_hosts_alive_for_testing()
-
Expand source code
@staticmethod def get_hosts_alive_for_testing(): if ('SUPERTOKENS_ENV' not in environ) or ( environ['SUPERTOKENS_ENV'] != 'testing'): raise_general_exception( None, 'calling testing function in non testing env') return Querier.__hosts_alive_for_testing
def get_instance(rid_to_core=None)
-
Expand source code
@staticmethod def get_instance(rid_to_core=None): if (not Querier.__init_called) or (Querier.__hosts is None): # TODO raise Exception( "Please call the supertokens.init function before using SuperTokens") return Querier(Querier.__hosts, rid_to_core)
def init(hosts, api_key=None)
-
Expand source code
@staticmethod def init(hosts, api_key=None): if not Querier.__init_called: Querier.__init_called = True Querier.__hosts = hosts Querier.__api_key = api_key Querier.__api_version = None Querier.__last_tried_index = 0 Querier.__hosts_alive_for_testing = set()
def reset()
-
Expand source code
@staticmethod def reset(): if ('SUPERTOKENS_ENV' not in environ) or ( environ['SUPERTOKENS_ENV'] != 'testing'): raise_general_exception( None, 'calling testing function in non testing env') Querier.__init_called = False
Methods
async def get_api_version(self)
-
Expand source code
async def get_api_version(self): if Querier.__api_version is not None: return Querier.__api_version ProcessState.get_instance().add_state( AllowedProcessStates.CALLING_SERVICE_IN_GET_API_VERSION) async def f(url): headers = {} if Querier.__api_key is not None: headers = { API_KEY_HEADER: Querier.__api_key } async with AsyncClient() as client: return await client.get(url, headers=headers) response = await self.__send_request_helper( NormalisedURLPath(API_VERSION), 'GET', f, len(self.__hosts)) cdi_supported_by_server = response['versions'] api_version = find_max_version( cdi_supported_by_server, SUPPORTED_CDI_VERSIONS) if api_version is None: raise_general_exception(None, 'The running SuperTokens core version is not compatible with this FastAPI ' 'SDK. Please visit https://supertokens.com/docs/community/compatibility-table ' 'to find the right versions') Querier.__api_version = api_version # TODO: server-less return Querier.__api_version
async def send_delete_request(self, path: NormalisedURLPath)
-
Expand source code
async def send_delete_request(self, path: NormalisedURLPath): async def f(url): async with AsyncClient() as client: return await client.delete(url, headers=await self.__get_headers_with_api_version(path)) return await self.__send_request_helper(path, 'DELETE', f, len(self.__hosts))
async def send_get_request(self, path: NormalisedURLPath, params=None)
-
Expand source code
async def send_get_request(self, path: NormalisedURLPath, params=None): if params is None: params = {} async def f(url): async with AsyncClient() as client: return await client.get(url, params=params, headers=await self.__get_headers_with_api_version(path)) return await self.__send_request_helper(path, 'GET', f, len(self.__hosts))
async def send_post_request(self, path: NormalisedURLPath, data=None, test=False)
-
Expand source code
async def send_post_request(self, path: NormalisedURLPath, data=None, test=False): if data is None: data = {} if ('SUPERTOKENS_ENV' in environ) and ( environ['SUPERTOKENS_ENV'] == 'testing') and test: return data headers = await self.__get_headers_with_api_version(path) headers['content-type'] = 'application/json; charset=utf-8' async def f(url): async with AsyncClient() as client: return await client.post(url, json=data, headers=headers) return await self.__send_request_helper(path, 'POST', f, len(self.__hosts))
async def send_put_request(self, path: NormalisedURLPath, data=None)
-
Expand source code
async def send_put_request(self, path: NormalisedURLPath, data=None): if data is None: data = {} headers = await self.__get_headers_with_api_version(path) headers['content-type'] = 'application/json; charset=utf-8' async def f(url): async with AsyncClient() as client: return await client.put(url, json=data, headers=headers) return await self.__send_request_helper(path, 'PUT', f, len(self.__hosts))