Setting up the Backend
To use the anomaly detection service, make a request to the endpoint for your region:
US Region (N. Virginia):
POST https://security-us-east-1.aws.supertokens.io/v1/security
EU Region (Ireland):
POST https://security-eu-west-1.aws.supertokens.io/v1/security
APAC Region (Singapore):
POST https://security-ap-southeast-1.aws.supertokens.io/v1/security
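As a minimal sketch, the three endpoints above can be kept in a lookup table so the backend selects the URL from configuration. The names `REGION_ENDPOINTS` and `get_security_endpoint` are hypothetical, and using the region identifiers embedded in the URLs as keys is an assumption made for illustration.

```python
from typing import Dict

# Endpoints copied from the list above; the dictionary keys are an assumption
# (the region identifiers that appear inside each URL).
REGION_ENDPOINTS: Dict[str, str] = {
    "us-east-1": "https://security-us-east-1.aws.supertokens.io/v1/security",
    "eu-west-1": "https://security-eu-west-1.aws.supertokens.io/v1/security",
    "ap-southeast-1": "https://security-ap-southeast-1.aws.supertokens.io/v1/security",
}


def get_security_endpoint(region: str) -> str:
    """Return the anomaly detection endpoint for a region, or raise ValueError."""
    try:
        return REGION_ENDPOINTS[region]
    except KeyError:
        raise ValueError(f"Unsupported region: {region!r}") from None


endpoint = get_security_endpoint("eu-west-1")
```

Failing loudly on an unknown region keeps a configuration typo from silently sending requests to the wrong place.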
You can view the HTTP API reference for this endpoint below:
Headers
const headers = {
"Content-Type": "application/json",
"Authorization": "Bearer <secret-api-key>"
};
Payload
const payload = {
// all of the fields are optional
"email": "user@example.com",
"phoneNumber": "+1234567890",
"passwordHashPrefix": "9cf95", // the first 5 characters of the SHA-1 hash of the password
"requestId": "some-request-id",
"actionType": "emailpassword-sign-in",
"bruteForce": [
{
"key": "some-key",
"maxRequests": [
{
"limit": 1,
"perTimeIntervalMS": 1000
}
]
}
]
};
Response
const response = {
id: "0191bc35-d527-7bbd-88df-1e7669e82cc0", // the id of the anomaly detection check
bruteForce: {
detected: true,
key: "some-key" // present only when brute force has been detected; holds the key that triggered the detection
},
emailRisk: null,
phoneNumberRisk: null,
passwordBreaches: {
'c1d808e04732adf679965ccc34ca7ae3441': '120', // the suffix of the password hash and the number of times it has been breached
'7acba4f54f55aafc33bb06bbbf6ca803e9a': '399', // the suffix of the password hash and the number of times it has been breached
}, // can be null if the password hash is not provided
isNewDevice: false, // can be null if the email or phone number is not provided
isImpossibleTravel: false, // can be null if the email or phone number is not provided
numberOfUniqueDevicesForUser: 1, // can be null if the email or phone number is not provided
/*
All the values below can be null based on the request ID provided and what has been detected
*/
requestIdInfo: { // can be null if the request ID is not provided
vpn: {
result: true, // this is true if the user is using a VPN
methods: {
publicVPN: true, // this is true if the user is using a public VPN
osMismatch: false,
auxiliaryMobile: false,
timezoneMismatch: true,
},
originCountry: 'unknown',
originTimezone: 'Europe/Bucharest',
},
frida: false,
proxy: false, // this is true if the user is using a proxy
valid: true,
ipInfo: {
v4: {
asn: { asn: '16509', name: 'AMAZON-02', network: '127.0.0.1/13' },
address: '127.0.0.1',
datacenter: { name: 'Amazon AWS', result: true },
geolocation: {
city: { name: 'Frankfurt am Main' },
country: { code: 'DE', name: 'Germany' },
latitude: 51.1187,
timezone: 'Europe/Berlin',
continent: { code: 'EU', name: 'Europe' },
longitude: 9.6842,
postalCode: '12345',
subdivisions: [{ name: 'Hesse', isoCode: 'HE' }],
accuracyRadius: 200,
},
},
v6: null, // contains same information as v4 if the user is using IPv6
},
velocity: {
events: { intervals: { '1h': 3, '5m': 3, '24h': 5 } },
distinctIp: { intervals: { '1h': 1, '5m': 1, '24h': 1 } },
distinctCountry: { intervals: { '1h': 1, '5m': 1, '24h': 1 } },
distinctLinkedId: { intervals: null },
},
clonedApp: false,
incognito: false,
tampering: { result: false, anomalyScore: 0 },
isEmulator: false,
isUsingTor: false, // this is true if the user is using Tor
jailbroken: false,
botDetected: false, // this is true if the user is a bot
ipBlocklist: {
result: false,
details: { emailSpam: false, attackSource: false },
},
factoryReset: { time: '1970-01-01T00:00:00Z', timestamp: 0 },
highActivity: false,
remoteControl: false,
identification: {
tag: { environmentId: 'cddd8855-ff50-4bbe-bb82-62b5057fa4f4' }, // this is the environment ID that you will receive from the SuperTokens team
url: 'http://example.com/index.html?eid=cddd8855-ff50-4bbe-bb82-62b5057fa4f4', // this is the URL that has been used to generate the request ID
linkedId: null,
timeInMS: 1723130887458,
incognito: false,
requestId: '1723130887451.92r32x', // this is the request ID that has been generated on the frontend
visitorId: 'mEYaqlY67Z55cHgzt37y',
confidence: { score: 1 },
browserDetails: {
os: 'Mac OS X',
device: 'Other',
osVersion: '10.15.7',
userAgent:
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36',
browserName: 'Chrome',
browserFullVersion: '127.0.0',
browserMajorVersion: '127',
},
},
virtualMachine: false,
privacySettings: false,
locationSpoofing: false,
rawDeviceAttributes: {
// These are the raw device attributes that are being sent from the frontend
// They might vary based on the device and browser that is being used
audio: { value: 124.04346607114712 },
fonts: {
value: ['Arial Unicode MS', 'Gill Sans', 'Helvetica Neue', 'Menlo'],
},
canvas: {
value: {
Text: '32a115bd05e0f411c5ecd7e285fd36e2',
Winding: true,
Geometry: 'd45e7d71dc99e368affd8a40840c833d',
},
},
contrast: { value: 0 },
cpuClass: {},
colorDepth: { value: 124.04346607114712 },
colorGamut: { value: 'p3' },
architecture: { value: 127 },
cookiesEnabled: { value: true },
},
}
};
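The response above contains several nullable fields, so any code that reads it has to guard against `null` values. The following is a minimal sketch of one way to reduce a response to a list of detected signals. The function name `collect_risk_signals` and the signal labels are hypothetical; only the field names come from the response shown above.

```python
from typing import Any, Dict, List


def collect_risk_signals(response: Dict[str, Any]) -> List[str]:
    """Return labels for the risk signals that are set in a response."""
    signals: List[str] = []

    # bruteForce and requestIdInfo may be null, so fall back to empty dicts
    brute_force = response.get("bruteForce") or {}
    info = response.get("requestIdInfo") or {}

    if brute_force.get("detected"):
        signals.append("brute_force")
    if info.get("isUsingTor"):
        signals.append("tor")
    if (info.get("vpn") or {}).get("result"):
        signals.append("vpn")
    if info.get("botDetected"):
        signals.append("bot")
    # These two are null when neither email nor phone number was sent
    if response.get("isNewDevice"):
        signals.append("new_device")
    if response.get("isImpossibleTravel"):
        signals.append("impossible_travel")
    return signals


sample = {
    "bruteForce": {"detected": True, "key": "some-key"},
    "isNewDevice": False,
    "isImpossibleTravel": False,
    "requestIdInfo": {"vpn": {"result": True}, "isUsingTor": False, "botDetected": False},
}
signals = collect_risk_signals(sample)
```

Which signals should block a request and which should only be logged is an application decision; the sketch only gathers them.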
Retrieving the Request ID
The frontend passes a requestId to the email password sign in, sign up and reset password APIs (see the frontend setup section). You can retrieve this ID by overriding these APIs in the emailpassword recipe on the backend.
A more complete example of how to retrieve the request ID from the input body can be found in the examples section.
Important
The requestId should be required when resetting a password, signing in or signing up. If the requestId is not present, an error should be returned. You can see more in the examples section.
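The requirement above can be sketched as a small, framework-independent check on the parsed JSON body. The helper names `extract_request_id` and `request_id_error` are hypothetical, and the returned dictionary simply mirrors the `GENERAL_ERROR` shape used in the integration examples.

```python
from typing import Any, Dict, Optional


def extract_request_id(body: Optional[Dict[str, Any]]) -> Optional[str]:
    """Return the requestId from a parsed JSON body, or None if missing or blank."""
    if not isinstance(body, dict):
        return None
    request_id = body.get("requestId")
    if isinstance(request_id, str) and request_id.strip():
        return request_id
    return None


def request_id_error(body: Optional[Dict[str, Any]]) -> Optional[Dict[str, str]]:
    """Return a GENERAL_ERROR-shaped dict when the requestId is absent, else None."""
    if extract_request_id(body) is None:
        return {"status": "GENERAL_ERROR", "message": "The request ID is required"}
    return None


error = request_id_error({"formFields": []})
```

Treating a blank string the same as a missing value is a design choice of this sketch, not something the service mandates.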
Making the Request to the Attack Protection Suite endpoint
The request body is a JSON object that contains the following properties (all fields are optional):
- requestId: The request ID that has been generated on the frontend. If this is omitted, the bot detection, impossible travel detection, new device detection, device count detection and request ID info will be skipped.
- bruteForce: An array of brute force checks that have been configured on the frontend.
type BruteForceCheck = {
key: string; // the key against which the brute force check is performed. This should be unique for each user (e.g. email, phone number, IP)
maxRequests: {
limit: number; // the maximum number of requests allowed within the time interval
perTimeIntervalMS: number; // the time interval in milliseconds within which the maximum number of requests is allowed
}[];
}[]
Here you can see some examples of different types of brute force checks that can be performed:
const userIp = "127.0.0.1"; // this should be the user's IP address
const userEmail = "user@example.com"; // this should be the user's email
// Useful for limiting a user's attempts from the same network
// This is the most common use case
// ---
// This performs two checks:
// 1. 1 request per second - fast rate of requests
// 2. 100 requests per 60 minutes - slow brute force - some attackers might try sidestepping the regular brute force detection by using a slower rate of requests
const checkUserInSameNetwork = [{
key: `${userIp}-${userEmail}`,
maxRequests: [
{
limit: 1,
perTimeIntervalMS: 1000,
},
{
limit: 100,
perTimeIntervalMS: 60 * 1000 * 60,
}
]
}]
const userIp = "127.0.0.1"; // this should be the user's IP address
// Useful for limiting requests from the same network
// This should usually have a higher number of requests/time interval allowed
const checkNetwork = [{
key: `${userIp}`,
maxRequests: [
{
limit: 100,
perTimeIntervalMS: 1000,
},
]
}]
const userEmail = "user@example.com"; // this should be the user's email
// Useful for limiting requests for the user only
const checkUserOnly = [{
key: `${userEmail}`,
maxRequests: [
{
limit: 1,
perTimeIntervalMS: 1000,
},
]
}]
const userIp = "127.0.0.1"; // this should be the user's IP address
const userEmail = "user@example.com"; // this should be the user's email
// Checking by multiple keys at once
const checkMultipleKeys = [
{
key: `${userEmail}-${userIp}`,
maxRequests: [
{
limit: 1,
perTimeIntervalMS: 1000,
},
{
limit: 100,
perTimeIntervalMS: 60 * 1000 * 60,
}
]
},
{
key: `${userIp}`,
maxRequests: [
{
limit: 100,
perTimeIntervalMS: 1000,
},
]
}
]
- passwordHashPrefix: The first 5 characters of the SHA-1 hash of the password that needs to be checked against the breach database. If this is not provided, the password breach check will be skipped.
- email: The email address that is being used for the authentication event. If this (or phoneNumber) is not provided, the impossible travel detection, new device detection and device count detection will be skipped.
- phoneNumber: The phone number that is being used for the authentication event. If this (or email) is not provided, the impossible travel detection, new device detection and device count detection will be skipped.
- actionType: The type of action that is being performed. The possible values are:
  - "emailpassword-sign-in"
  - "emailpassword-sign-up"
  - "send-password-reset-email"
  - "passwordless-send-email"
  - "passwordless-send-sms"
  - "totp-verify-device"
  - "totp-verify-totp"
  - "thirdparty-login"
  - "emailverification-send-email"
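The properties described above can be assembled with a small helper. This is a sketch under stated assumptions: `build_request_body` and `ACTION_TYPES` are hypothetical names, and dropping unset fields relies on the statement that all fields are optional. The helper only builds the body; it does not call the service.

```python
from hashlib import sha1
from typing import Any, Dict, List, Optional

# The action types listed above
ACTION_TYPES = {
    "emailpassword-sign-in", "emailpassword-sign-up", "send-password-reset-email",
    "passwordless-send-email", "passwordless-send-sms", "totp-verify-device",
    "totp-verify-totp", "thirdparty-login", "emailverification-send-email",
}


def build_request_body(
    *,
    request_id: Optional[str] = None,
    email: Optional[str] = None,
    phone_number: Optional[str] = None,
    password: Optional[str] = None,
    action_type: Optional[str] = None,
    brute_force: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Build the JSON body, leaving out every field that was not provided."""
    if action_type is not None and action_type not in ACTION_TYPES:
        raise ValueError(f"Unknown actionType: {action_type!r}")
    body: Dict[str, Any] = {
        "requestId": request_id,
        "email": email,
        "phoneNumber": phone_number,
        "actionType": action_type,
        "bruteForce": brute_force,
    }
    if password is not None:
        # Only the first 5 hex characters of the SHA-1 hash are sent
        body["passwordHashPrefix"] = sha1(password.encode("utf-8")).hexdigest()[:5]
    return {key: value for key, value in body.items() if value is not None}


body = build_request_body(
    request_id="some-request-id",
    email="user@example.com",
    password="password",
    action_type="emailpassword-sign-in",
    brute_force=[{"key": "user@example.com", "maxRequests": [{"limit": 5, "perTimeIntervalMS": 60 * 1000}]}],
)
```

Because only the hash prefix is included, the plain password and its full hash never leave the backend.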
Example integration code
Here are a few examples of full implementations of the anomaly detection:
Email and password
- NodeJS
- GoLang
- Python
- Other Frameworks
import SuperTokens from "supertokens-node";
import EmailPassword from "supertokens-node/recipe/emailpassword";
import Session from "supertokens-node/recipe/session";
import axios from "axios";
import { createHash } from 'crypto';
const SECRET_API_KEY = "<secret-api-key>"; // Your secret API key that you received from the SuperTokens team
// The full URL with the correct region will be provided by the SuperTokens team
const ANOMALY_DETECTION_API_URL = "https://security-<region>.aws.supertokens.io/v1/security";
async function handleSecurityChecks(input: {
actionType?: string,
email?: string,
phoneNumber?: string,
password?: string,
requestId?: string,
bruteForceConfig?: {
key: string,
maxRequests: {
limit: number,
perTimeIntervalMS: number
}[]
}[]
}): Promise<{
status: "GENERAL_ERROR",
message: string
} | undefined> {
let requestBody: {
email?: string;
phoneNumber?: string;
actionType?: string;
requestId?: string;
passwordHashPrefix?: string;
bruteForce?: {
key: string;
maxRequests: {
limit: number;
perTimeIntervalMS: number;
}[];
}[];
} = {}
if (input.requestId !== undefined) {
requestBody.requestId = input.requestId;
}
let passwordHash: string | undefined;
if (input.password !== undefined) {
let shasum = createHash('sha1');
shasum.update(input.password);
passwordHash = shasum.digest('hex');
requestBody.passwordHashPrefix = passwordHash.slice(0, 5);
}
requestBody.bruteForce = input.bruteForceConfig;
requestBody.email = input.email;
requestBody.phoneNumber = input.phoneNumber;
requestBody.actionType = input.actionType;
let response;
try {
response = await axios.post(ANOMALY_DETECTION_API_URL, requestBody, {
headers: {
"Authorization": `Bearer ${SECRET_API_KEY}`,
"Content-Type": "application/json"
}
});
} catch (err) {
// silently fail in order to not break the auth flow
console.error(err);
return;
}
let responseData = response.data;
if (responseData.bruteForce.detected) {
return {
status: "GENERAL_ERROR",
message: "Too many requests. Please try again later."
}
}
if(responseData.requestIdInfo?.isUsingTor) {
return {
status: "GENERAL_ERROR",
message: "Tor activity detected. Please use a regular browser."
}
}
if(responseData.requestIdInfo?.vpn?.result) {
return {
status: "GENERAL_ERROR",
message: "VPN activity detected. Please use a regular network."
}
}
if (responseData.requestIdInfo?.botDetected) {
return {
status: "GENERAL_ERROR",
message: "Bot activity detected."
}
}
if (responseData?.passwordBreaches && passwordHash) {
const suffix = passwordHash.slice(5).toUpperCase();
const foundPasswordHash = responseData?.passwordBreaches[suffix];
if (foundPasswordHash) {
return {
status: "GENERAL_ERROR",
message: "This password has been detected in a breach. Please set a different password."
}
}
}
return undefined;
}
function getIpFromRequest(req: Request): string {
// Falls back to localhost when the x-forwarded-for header is not set
return (req as any).headers['x-forwarded-for'] || "127.0.0.1"
}
const getBruteForceConfig = (userIdentifier: string, ip: string, prefix?: string) => [
{
key: `${prefix ? `${prefix}-` : ""}${userIdentifier}`,
maxRequests: [
{ limit: 5, perTimeIntervalMS: 60 * 1000 },
{ limit: 15, perTimeIntervalMS: 60 * 60 * 1000 }
]
},
{
key: `${prefix ? `${prefix}-` : ""}${ip}`,
maxRequests: [
{ limit: 5, perTimeIntervalMS: 60 * 1000 },
{ limit: 15, perTimeIntervalMS: 60 * 60 * 1000 }
]
}
];
// backend
SuperTokens.init({
appInfo: {
apiDomain: "...",
appName: "...",
websiteDomain: "..."
},
supertokens: {
connectionURI: "...",
},
recipeList: [
EmailPassword.init({
override: {
apis: (originalImplementation) => {
return {
...originalImplementation,
signUpPOST: async function (input) {
// Read the request ID sent by the frontend; it is needed to detect possible bots, suspicious IP addresses, etc.
const requestId = (await input.options.req.getJSONBody()).requestId;
if(!requestId) {
return {
status: "GENERAL_ERROR",
message: "The request ID is required"
}
}
const actionType = 'emailpassword-sign-up';
const ip = getIpFromRequest(input.options.req.original);
let email = input.formFields.filter((f) => f.id === "email")[0].value as string;
let password = input.formFields.filter((f) => f.id === "password")[0].value as string;
const bruteForceConfig = getBruteForceConfig(email, ip, actionType);
// we check the anomaly detection service before calling the original implementation of signUp
let securityCheckResponse = await handleSecurityChecks({ requestId, email, password, bruteForceConfig, actionType });
if(securityCheckResponse !== undefined) {
return securityCheckResponse;
}
return originalImplementation.signUpPOST!(input);
},
signInPOST: async function (input) {
// Read the request ID sent by the frontend; it is needed to detect possible bots, suspicious IP addresses, etc.
const requestId = (await input.options.req.getJSONBody()).requestId;
if(!requestId) {
return {
status: "GENERAL_ERROR",
message: "The request ID is required"
}
}
const actionType = 'emailpassword-sign-in';
const ip = getIpFromRequest(input.options.req.original);
let email = input.formFields.filter((f) => f.id === "email")[0].value as string;
const bruteForceConfig = getBruteForceConfig(email, ip, actionType);
// we check the anomaly detection service before calling the original implementation of signIn
let securityCheckResponse = await handleSecurityChecks({ requestId, email, bruteForceConfig, actionType });
if(securityCheckResponse !== undefined) {
return securityCheckResponse;
}
return originalImplementation.signInPOST!(input);
},
generatePasswordResetTokenPOST: async function (input) {
// Read the request ID sent by the frontend; it is needed to detect possible bots, suspicious IP addresses, etc.
const requestId = (await input.options.req.getJSONBody()).requestId;
if(!requestId) {
return {
status: "GENERAL_ERROR",
message: "The request ID is required"
}
}
const actionType = 'send-password-reset-email';
const ip = getIpFromRequest(input.options.req.original);
let email = input.formFields.filter((f) => f.id === "email")[0].value as string;
const bruteForceConfig = getBruteForceConfig(email, ip, actionType);
// we check the anomaly detection service before calling the original implementation of generatePasswordResetToken
let securityCheckResponse = await handleSecurityChecks({ requestId, email, bruteForceConfig, actionType });
if(securityCheckResponse !== undefined) {
return securityCheckResponse;
}
return originalImplementation.generatePasswordResetTokenPOST!(input);
},
passwordResetPOST: async function (input) {
let password = input.formFields.filter((f) => f.id === "password")[0].value as string;
let securityCheckResponse = await handleSecurityChecks({ password });
if (securityCheckResponse !== undefined) {
return securityCheckResponse;
}
return originalImplementation.passwordResetPOST!(input);
}
}
}
}
}),
]
});
The above code overrides the SuperTokens APIs and adds custom logic for anomaly detection. The steps when overriding the APIs are as follows:
- We get the request ID from the request body. This is a unique ID for the request.
- We define the action type based on the API that is being called.
- We get the email and password from the form fields.
- We get the IP address from the request.
- We create the brute force config from the email, IP address and action type. This config allows a number of requests over a time interval per:
  - Action and email/phone number.
  - Action and IP address.
- We call the anomaly detection service to check if the request is allowed.
- If the request is not allowed, we return a descriptive error response.
- If the request is allowed, we call the original implementation of the API.
- We return the response from the original implementation of the API.
import (
"bytes"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"net/http"
"errors"
"github.com/supertokens/supertokens-golang/recipe/emailpassword"
"github.com/supertokens/supertokens-golang/recipe/emailpassword/epmodels"
"github.com/supertokens/supertokens-golang/supertokens"
)
const SECRET_API_KEY = "<secret-api-key>" // Your secret API key that you received from the SuperTokens team
// The full URL with the correct region will be provided by the SuperTokens team
const ANOMALY_DETECTION_API_URL = "https://security-<region>.aws.supertokens.io/v1/security"
type SecurityCheckInput struct {
ActionType string `json:"actionType,omitempty"`
Email string `json:"email,omitempty"`
PhoneNumber string `json:"phoneNumber,omitempty"`
Password string `json:"password,omitempty"`
RequestID string `json:"requestId,omitempty"`
BruteForceConfig []BruteForceConfig `json:"bruteForceConfig,omitempty"`
}
type BruteForceConfig struct {
Key string `json:"key"`
MaxRequests []MaxRequests `json:"maxRequests"`
}
type MaxRequests struct {
Limit int `json:"limit"`
PerTimeIntervalMS int `json:"perTimeIntervalMS"`
}
type ReqBody struct {
RequestID *string `json:"requestId"`
}
func getIpFromRequest(req *http.Request) string {
if forwardedFor := req.Header.Get("X-Forwarded-For"); forwardedFor != "" {
return forwardedFor
}
return "127.0.0.1"
}
func getBruteForceConfig(userIdentifier string, ip string, prefix string) []BruteForceConfig {
var key string
if prefix != "" {
key = prefix + "-"
}
return []BruteForceConfig{
{
Key: key + userIdentifier,
MaxRequests: []MaxRequests{
{Limit: 5, PerTimeIntervalMS: 60 * 1000},
{Limit: 15, PerTimeIntervalMS: 60 * 60 * 1000},
},
},
{
Key: key + ip,
MaxRequests: []MaxRequests{
{Limit: 5, PerTimeIntervalMS: 60 * 1000},
{Limit: 15, PerTimeIntervalMS: 60 * 60 * 1000},
},
},
}
}
func handleSecurityChecks(input SecurityCheckInput) (*supertokens.GeneralErrorResponse, error) {
requestBody := make(map[string]interface{})
if input.RequestID != "" {
requestBody["requestId"] = input.RequestID
}
var passwordHash string
if input.Password != "" {
hash := sha1.New()
hash.Write([]byte(input.Password))
passwordHash = hex.EncodeToString(hash.Sum(nil))
requestBody["passwordHashPrefix"] = passwordHash[:5]
}
requestBody["bruteForce"] = input.BruteForceConfig
requestBody["email"] = input.Email
requestBody["phoneNumber"] = input.PhoneNumber
requestBody["actionType"] = input.ActionType
jsonBody, err := json.Marshal(requestBody)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", ANOMALY_DETECTION_API_URL, bytes.NewBuffer(jsonBody))
if err != nil {
// silently fail in order to not break the auth flow
return nil, nil
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+SECRET_API_KEY)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
// silently fail in order to not break the auth flow
return nil, nil
}
defer resp.Body.Close()
var responseData map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&responseData)
if err != nil {
return nil, err
}
if bruteForce, ok := responseData["bruteForce"].(map[string]interface{}); ok {
if detected, ok := bruteForce["detected"].(bool); ok && detected {
return &supertokens.GeneralErrorResponse{
Message: "Too many requests. Please try again later.",
}, nil
}
}
if requestIdInfo, ok := responseData["requestIdInfo"].(map[string]interface{}); ok {
if isUsingTor, ok := requestIdInfo["isUsingTor"].(bool); ok && isUsingTor {
return &supertokens.GeneralErrorResponse{
Message: "Tor activity detected. Please use a regular browser.",
}, nil
}
if vpn, ok := requestIdInfo["vpn"].(map[string]interface{}); ok {
if result, ok := vpn["result"].(bool); ok && result {
return &supertokens.GeneralErrorResponse{
Message: "VPN activity detected. Please use a regular network.",
}, nil
}
}
if botDetected, ok := requestIdInfo["botDetected"].(bool); ok && botDetected {
return &supertokens.GeneralErrorResponse{
Message: "Bot activity detected.",
}, nil
}
}
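// Only check breaches when a password was provided; slicing an empty hash would panic
if passwordBreaches, ok := responseData["passwordBreaches"].(map[string]interface{}); ok && len(passwordHash) > 5 {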
passwordHashSuffix := passwordHash[5:]
if _, ok := passwordBreaches[passwordHashSuffix]; ok {
return &supertokens.GeneralErrorResponse{
Message: "This password has been detected in a breach. Please set a different password.",
}, nil
}
}
return nil, nil
}
func main() {
supertokens.Init(supertokens.TypeInput{
RecipeList: []supertokens.Recipe{
emailpassword.Init(&epmodels.TypeInput{
Override: &epmodels.OverrideStruct{
APIs: func(originalImplementation epmodels.APIInterface) epmodels.APIInterface {
// rewrite the original implementation of SignUpPOST
originalSignUpPOST := *originalImplementation.SignUpPOST
(*originalImplementation.SignUpPOST) = func(formFields []epmodels.TypeFormField, tenantId string, options epmodels.APIOptions, userContext supertokens.UserContext) (epmodels.SignUpPOSTResponse, error) {
// Read the request ID sent by the frontend; it is used for bot and suspicious IP detection
var reqBody ReqBody
err := json.NewDecoder(options.Req.Body).Decode(&reqBody)
if err != nil {
return epmodels.SignUpPOSTResponse{}, err
}
if reqBody.RequestID == nil {
return epmodels.SignUpPOSTResponse{
GeneralError: &supertokens.GeneralErrorResponse{
Message: "The request ID is required",
},
}, nil
}
requestId := *reqBody.RequestID
actionType := "emailpassword-sign-up"
ip := getIpFromRequest(options.Req)
email := ""
password := ""
for _, field := range formFields {
if field.ID == "email" || field.ID == "password" {
valueAsString, asStrOk := field.Value.(string)
if !asStrOk {
return epmodels.SignUpPOSTResponse{}, errors.New("Should never come here as we check the type during validation")
}
if field.ID == "email" {
email = valueAsString
} else {
password = valueAsString
}
}
}
bruteForceConfig := getBruteForceConfig(email, ip, actionType)
// Check anomaly detection service before proceeding
checkErr, err := handleSecurityChecks(
SecurityCheckInput{
ActionType: actionType,
Email: email,
RequestID: requestId,
BruteForceConfig: bruteForceConfig,
Password: password,
},
)
if err != nil {
return epmodels.SignUpPOSTResponse{}, err
}
if checkErr != nil {
return epmodels.SignUpPOSTResponse{
GeneralError: checkErr,
}, nil
}
// pre API logic...
resp, err := originalSignUpPOST(formFields, tenantId, options, userContext)
if err != nil {
return epmodels.SignUpPOSTResponse{}, err
}
return resp, nil
}
// rewrite the original implementation of SignInPOST
originalSignInPOST := *originalImplementation.SignInPOST
(*originalImplementation.SignInPOST) = func(formFields []epmodels.TypeFormField, tenantId string, options epmodels.APIOptions, userContext supertokens.UserContext) (epmodels.SignInPOSTResponse, error) {
// Read the request ID sent by the frontend; it is used for bot and suspicious IP detection
var reqBody ReqBody
err := json.NewDecoder(options.Req.Body).Decode(&reqBody)
if err != nil {
return epmodels.SignInPOSTResponse{}, err
}
if reqBody.RequestID == nil {
return epmodels.SignInPOSTResponse{
GeneralError: &supertokens.GeneralErrorResponse{
Message: "The request ID is required",
},
}, nil
}
requestId := *reqBody.RequestID
actionType := "emailpassword-sign-in"
ip := getIpFromRequest(options.Req)
email := ""
password := ""
for _, field := range formFields {
if field.ID == "email" || field.ID == "password" {
valueAsString, asStrOk := field.Value.(string)
if !asStrOk {
return epmodels.SignInPOSTResponse{}, errors.New("Should never come here as we check the type during validation")
}
if field.ID == "email" {
email = valueAsString
} else {
password = valueAsString
}
}
}
bruteForceConfig := getBruteForceConfig(email, ip, actionType)
// Check anomaly detection service before proceeding
checkErr, err := handleSecurityChecks(
SecurityCheckInput{
ActionType: actionType,
Email: email,
RequestID: requestId,
BruteForceConfig: bruteForceConfig,
Password: password,
},
)
if err != nil {
return epmodels.SignInPOSTResponse{}, err
}
if checkErr != nil {
return epmodels.SignInPOSTResponse{
GeneralError: checkErr,
}, nil
}
// pre API logic...
resp, err := originalSignInPOST(formFields, tenantId, options, userContext)
if err != nil {
return epmodels.SignInPOSTResponse{}, err
}
return resp, nil
}
// rewrite the original implementation of GeneratePasswordResetTokenPOST
originalGeneratePasswordResetTokenPOST := *originalImplementation.GeneratePasswordResetTokenPOST
(*originalImplementation.GeneratePasswordResetTokenPOST) = func(formFields []epmodels.TypeFormField, tenantId string, options epmodels.APIOptions, userContext supertokens.UserContext) (epmodels.GeneratePasswordResetTokenPOSTResponse, error) {
// Read the request ID sent by the frontend; it is used for bot and suspicious IP detection
var reqBody ReqBody
err := json.NewDecoder(options.Req.Body).Decode(&reqBody)
if err != nil {
return epmodels.GeneratePasswordResetTokenPOSTResponse{}, err
}
if reqBody.RequestID == nil {
return epmodels.GeneratePasswordResetTokenPOSTResponse{
GeneralError: &supertokens.GeneralErrorResponse{
Message: "The request ID is required",
},
}, nil
}
requestId := *reqBody.RequestID
actionType := "send-password-reset-email"
ip := getIpFromRequest(options.Req)
email := ""
for _, field := range formFields {
if field.ID == "email" {
valueAsString, asStrOk := field.Value.(string)
if !asStrOk {
return epmodels.GeneratePasswordResetTokenPOSTResponse{}, errors.New("Should never come here as we check the type during validation")
}
email = valueAsString
}
}
bruteForceConfig := getBruteForceConfig(email, ip, actionType)
// Check anomaly detection service before proceeding
checkErr, err := handleSecurityChecks(
SecurityCheckInput{
ActionType: actionType,
Email: email,
RequestID: requestId,
BruteForceConfig: bruteForceConfig,
},
)
if err != nil {
return epmodels.GeneratePasswordResetTokenPOSTResponse{}, err
}
if checkErr != nil {
return epmodels.GeneratePasswordResetTokenPOSTResponse{
GeneralError: checkErr,
}, nil
}
// pre API logic...
resp, err := originalGeneratePasswordResetTokenPOST(formFields, tenantId, options, userContext)
if err != nil {
return epmodels.GeneratePasswordResetTokenPOSTResponse{}, err
}
return resp, nil
}
// rewrite the original implementation of PasswordResetPOST
originalPasswordResetPOST := *originalImplementation.PasswordResetPOST
(*originalImplementation.PasswordResetPOST) = func(formFields []epmodels.TypeFormField, token string, tenantId string, options epmodels.APIOptions, userContext supertokens.UserContext) (epmodels.ResetPasswordPOSTResponse, error) {
password := ""
for _, field := range formFields {
if field.ID == "password" {
valueAsString, asStrOk := field.Value.(string)
if !asStrOk {
return epmodels.ResetPasswordPOSTResponse{}, errors.New("Should never come here as we check the type during validation")
}
password = valueAsString
}
}
// Check anomaly detection service before proceeding
checkErr, err := handleSecurityChecks(
SecurityCheckInput{
Password: password,
},
)
if err != nil {
return epmodels.ResetPasswordPOSTResponse{}, err
}
if checkErr != nil {
return epmodels.ResetPasswordPOSTResponse{
GeneralError: checkErr,
}, nil
}
// First we call the original implementation
resp, err := originalPasswordResetPOST(formFields, token, tenantId, options, userContext)
if err != nil {
return epmodels.ResetPasswordPOSTResponse{}, err
}
return resp, nil
}
return originalImplementation
},
Functions: func(originalImplementation epmodels.RecipeInterface) epmodels.RecipeInterface {
return originalImplementation
},
},
}),
},
})
}
The above code overrides the SuperTokens APIs and adds custom logic for anomaly detection. The steps when overriding the APIs are as follows:
- We get the request ID from the request body. This is a unique ID for the request.
- We define the action type based on the API that is being called.
- We get the email and password from the form fields.
- We get the IP address from the request.
- We create the brute force config from the email, IP address and action type. This config allows a number of requests over a time interval per:
  - Action and email/phone number.
  - Action and IP address.
- We call the anomaly detection service to check if the request is allowed.
- If the request is not allowed, we return a descriptive error response.
- If the request is allowed, we call the original implementation of the API.
- We return the response from the original implementation of the API.
from httpx import AsyncClient
from hashlib import sha1
from typing import Dict, Any, Union, List
from supertokens_python import init, InputAppInfo
from supertokens_python.recipe import emailpassword
from supertokens_python.recipe.emailpassword.interfaces import APIInterface, APIOptions
from supertokens_python.recipe.emailpassword.types import FormField
from supertokens_python.framework import BaseRequest
from supertokens_python.types import GeneralErrorResponse
from supertokens_python.recipe.session import SessionContainer
SECRET_API_KEY = "<secret-api-key>"  # Your secret API key that you received from the SuperTokens team
# The full URL with the correct region will be provided by the SuperTokens team
ANOMALY_DETECTION_API_URL = "https://security-<region>.aws.supertokens.io/v1/security"
async def handle_security_checks(request_id: Union[str, None], password: Union[str, None], brute_force_config: Union[List[Dict[str, Any]], None], email: Union[str, None], phone_number: Union[str, None], action_type: Union[str, None]) -> Union[GeneralErrorResponse, None]:
    request_body: Dict[str, Any] = {}
    if request_id is not None:
        request_body['requestId'] = request_id
    password_hash = None
    if password is not None:
        password_hash = sha1(password.encode()).hexdigest()
        request_body['passwordHashPrefix'] = password_hash[:5]
    request_body['bruteForce'] = brute_force_config
    request_body['email'] = email
    request_body['phoneNumber'] = phone_number
    request_body['actionType'] = action_type
    try:
        async with AsyncClient(timeout=10.0) as client:
            response = await client.post(ANOMALY_DETECTION_API_URL, json=request_body, headers={
                "Authorization": f"Bearer {SECRET_API_KEY}",
                "Content-Type": "application/json"
            })
            response_data = response.json()
    except Exception:
        # silently fail in order to not break the auth flow
        return None
    # bruteForce and requestIdInfo can be null in the response, so fall back to empty dicts
    brute_force = response_data.get('bruteForce') or {}
    request_id_info = response_data.get('requestIdInfo') or {}
    if brute_force.get('detected'):
        return GeneralErrorResponse(message="Too many requests. Please try again later.")
    if request_id_info.get('isUsingTor'):
        return GeneralErrorResponse(message="Tor activity detected. Please use a regular browser.")
    if (request_id_info.get('vpn') or {}).get('result'):
        return GeneralErrorResponse(message="VPN activity detected. Please use a regular network.")
    if request_id_info.get('botDetected'):
        return GeneralErrorResponse(message="Bot activity detected.")
    if response_data.get('passwordBreaches') and password_hash is not None:
        password_hash_suffix = password_hash[5:]
        if password_hash_suffix in response_data['passwordBreaches']:
            return GeneralErrorResponse(message="This password has been detected in a breach. Please set a different password.")
    return None
def get_ip_from_request(req: BaseRequest) -> str:
    forwarded_for = req.get_header('x-forwarded-for')
    if forwarded_for:
        return forwarded_for
    return '127.0.0.1'
def get_brute_force_config(user_identifier: Union[str, None], ip: str, prefix: Union[str, None] = None) -> List[Dict[str, Any]]:
    return [
        {
            "key": f"{prefix}-{user_identifier}" if prefix else user_identifier,
            "maxRequests": [
                {"limit": 5, "perTimeIntervalMS": 60 * 1000},
                {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000}
            ]
        },
        {
            "key": f"{prefix}-{ip}" if prefix else ip,
            "maxRequests": [
                {"limit": 5, "perTimeIntervalMS": 60 * 1000},
                {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000}
            ]
        }
    ]
def override_email_password_apis(original_implementation: APIInterface):
    original_sign_up_post = original_implementation.sign_up_post
    async def sign_up_post(form_fields: List[FormField], tenant_id: str, session: Union[SessionContainer, None],
                           should_try_linking_with_session_user: Union[bool, None], api_options: APIOptions, user_context: Dict[str, Any]):
        request_body = await api_options.request.json()
        if not request_body:
            return GeneralErrorResponse(message="The request body is required")
        request_id = request_body.get("requestId")
        if not request_id:
            return GeneralErrorResponse(message="The request ID is required")
        action_type = 'emailpassword-sign-up'
        ip = get_ip_from_request(api_options.request)
        email = None
        password = None
        for field in form_fields:
            if field.id == "email":
                email = field.value
            if field.id == "password":
                password = field.value
        brute_force_config = get_brute_force_config(email, ip, action_type)
        # we check the anomaly detection service before calling the original implementation of sign_up_post
        security_check_response = await handle_security_checks(
            request_id=request_id,
            password=password,
            brute_force_config=brute_force_config,
            email=email,
            phone_number=None,
            action_type=action_type
        )
        if security_check_response is not None:
            return security_check_response
        # We need to call the original implementation of sign_up_post.
        response = await original_sign_up_post(form_fields, tenant_id, session, should_try_linking_with_session_user, api_options, user_context)
        return response
    original_implementation.sign_up_post = sign_up_post
    original_sign_in_post = original_implementation.sign_in_post
    async def sign_in_post(form_fields: List[FormField], tenant_id: str, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], api_options: APIOptions, user_context: Dict[str, Any]):
        request_body = await api_options.request.json()
        if not request_body:
            return GeneralErrorResponse(message="The request body is required")
        request_id = request_body.get("requestId")
        if not request_id:
            return GeneralErrorResponse(message="The request ID is required")
        action_type = 'emailpassword-sign-in'
        ip = get_ip_from_request(api_options.request)
        email = None
        for field in form_fields:
            if field.id == "email":
                email = field.value
        brute_force_config = get_brute_force_config(email, ip, action_type)
        # we check the anomaly detection service before calling the original implementation of sign_in_post
        security_check_response = await handle_security_checks(
            request_id=request_id,
            password=None,
            brute_force_config=brute_force_config,
            email=email,
            phone_number=None,
            action_type=action_type
        )
        if security_check_response is not None:
            return security_check_response
        # We need to call the original implementation of sign_in_post.
        response = await original_sign_in_post(form_fields, tenant_id, session, should_try_linking_with_session_user, api_options, user_context)
        return response
    original_implementation.sign_in_post = sign_in_post
    original_generate_password_reset_token_post = original_implementation.generate_password_reset_token_post
    async def generate_password_reset_token_post(form_fields: List[FormField], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]):
        request_body = await api_options.request.json()
        if not request_body:
            return GeneralErrorResponse(message="The request body is required")
        request_id = request_body.get("requestId")
        if not request_id:
            return GeneralErrorResponse(message="The request ID is required")
        action_type = 'send-password-reset-email'
        ip = get_ip_from_request(api_options.request)
        email = None
        for field in form_fields:
            if field.id == "email":
                email = field.value
        brute_force_config = get_brute_force_config(email, ip, action_type)
        # we check the anomaly detection service before calling the original implementation of generate_password_reset_token_post
        security_check_response = await handle_security_checks(
            request_id=request_id,
            password=None,
            brute_force_config=brute_force_config,
            email=email,
            phone_number=None,
            action_type=action_type
        )
        if security_check_response is not None:
            return security_check_response
        # We need to call the original implementation of generate_password_reset_token_post.
        response = await original_generate_password_reset_token_post(form_fields, tenant_id, api_options, user_context)
        return response
    original_implementation.generate_password_reset_token_post = generate_password_reset_token_post
    original_password_reset_post = original_implementation.password_reset_post
    async def password_reset_post(
        form_fields: List[FormField],
        token: str,
        tenant_id: str,
        api_options: APIOptions,
        user_context: Dict[str, Any],
    ):
        password = None
        for field in form_fields:
            if field.id == "password":
                password = field.value
        # we check the anomaly detection service before calling the original implementation of password_reset_post
        security_check_response = await handle_security_checks(
            request_id=None,
            password=password,
            brute_force_config=None,
            email=None,
            phone_number=None,
            action_type=None
        )
        if security_check_response is not None:
            return security_check_response
        response = await original_password_reset_post(
            form_fields, token, tenant_id, api_options, user_context
        )
        return response
    original_implementation.password_reset_post = password_reset_post
    return original_implementation
init(
    app_info=InputAppInfo(api_domain="...", app_name="...", website_domain="..."),
    framework='...',
    recipe_list=[
        emailpassword.init(
            override=emailpassword.InputOverrideConfig(
                apis=override_email_password_apis
            )
        )
    ]
)
The above code overrides the SuperTokens APIs and adds custom logic for anomaly detection. The steps when overriding the APIs are as follows:
- We get the request ID from the request body. This is a unique ID for the request.
- We define the action type based on the API that is being called.
- We get the email and password from the form fields.
- We get the IP address from the request.
- We create the brute force config from the email, IP address and action type. This config allows a number of requests over a time interval per:
  - Action and email/phone number.
  - Action and IP address.
- We call the anomaly detection service to check if the request is allowed.
- If the request is not allowed, we return a descriptive error response.
- If the request is allowed, we call the original implementation of the API.
- We return the response from the original implementation of the API.
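The password breach check in the code above does not send the full hash: only the first five characters of the SHA-1 hex digest are sent, and the rest is compared locally against the suffixes returned in `passwordBreaches`. The following is a minimal, self-contained sketch of that comparison. The helper names `split_password_hash` and `is_password_breached` are hypothetical, and the sample `passwordBreaches` value is made up for illustration.

```python
from hashlib import sha1
from typing import Dict, Optional, Tuple


def split_password_hash(password: str) -> Tuple[str, str]:
    """Return (prefix, suffix) of the SHA-1 hex digest, split after 5 characters."""
    digest = sha1(password.encode()).hexdigest()
    return digest[:5], digest[5:]


def is_password_breached(password: str, password_breaches: Optional[Dict[str, str]]) -> bool:
    """Compare the locally computed suffix with the suffixes returned by the service."""
    if not password_breaches:
        # passwordBreaches can be null when no password hash was provided
        return False
    _, suffix = split_password_hash(password)
    return suffix in password_breaches


prefix, suffix = split_password_hash("password123")
# Illustrative response value: pretend the service reported this suffix as breached.
sample_breaches = {suffix: "120"}
print(is_password_breached("password123", sample_breaches))
print(is_password_breached("a-different-password", sample_breaches))
```

Because only the prefix leaves your server, the service cannot tell which of the returned suffixes, if any, belongs to the user's password.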
note
To use anomaly detection with other frameworks, follow the same steps as in the examples for the other languages and refer to the API documentation.
Passwordless
- NodeJS
- GoLang
- Python
- Other Frameworks
import SuperTokens from "supertokens-node";
import Passwordless from "supertokens-node/recipe/passwordless";
import axios from "axios";
import { createHash } from 'crypto';
const SECRET_API_KEY = "<secret-api-key>"; // Your secret API key that you received from the SuperTokens team
const ANOMALY_DETECTION_API_URL = "https://security-us-east-1.aws.supertokens.io/v1/security";
async function handleSecurityChecks(input: {
actionType?: string,
email?: string,
phoneNumber?: string,
bruteForceConfig?: {
key: string,
maxRequests: {
limit: number,
perTimeIntervalMS: number
}[]
}[]
}): Promise<{
status: "GENERAL_ERROR",
message: string
} | undefined> {
let requestBody: {
email?: string;
phoneNumber?: string;
actionType?: string;
bruteForce?: {
key: string;
maxRequests: {
limit: number;
perTimeIntervalMS: number;
}[];
}[];
} = {}
requestBody.bruteForce = input.bruteForceConfig;
requestBody.email = input.email;
requestBody.phoneNumber = input.phoneNumber;
requestBody.actionType = input.actionType;
let response;
try {
response = await axios.post(ANOMALY_DETECTION_API_URL, requestBody, {
headers: {
"Authorization": `Bearer ${SECRET_API_KEY}`,
"Content-Type": "application/json"
}
});
} catch (err) {
// silently fail in order to not break the auth flow
console.error(err);
return;
}
let responseData = response.data;
if (responseData?.bruteForce?.detected) {
return {
status: "GENERAL_ERROR",
message: "Too many requests. Please try again later."
}
}
return undefined;
}
function getIpFromRequest(req: Request): string {
// use the address forwarded by the proxy, falling back to localhost
return (req as any).headers['x-forwarded-for'] || "127.0.0.1"
}
const getBruteForceConfig = (userIdentifier: string, ip: string, prefix?: string) => [
{
key: `${prefix ? `${prefix}-` : ""}${userIdentifier}`,
maxRequests: [
{ limit: 5, perTimeIntervalMS: 60 * 1000 },
{ limit: 15, perTimeIntervalMS: 60 * 60 * 1000 }
]
},
{
key: `${prefix ? `${prefix}-` : ""}${ip}`,
maxRequests: [
{ limit: 5, perTimeIntervalMS: 60 * 1000 },
{ limit: 15, perTimeIntervalMS: 60 * 60 * 1000 }
]
}
];
SuperTokens.init({
framework: "...",
appInfo: { /*...*/ },
recipeList: [
Passwordless.init({
// ... other customisations ...
contactMethod: "EMAIL_OR_PHONE",
flowType: "USER_INPUT_CODE_AND_MAGIC_LINK",
override: {
apis: (originalImplementation) => {
return {
...originalImplementation,
createCodePOST: async function (input) {
const actionType = 'passwordless-send-sms';
const ip = getIpFromRequest(input.options.req.original);
const emailOrPhoneNumber = "email" in input ? input.email : input.phoneNumber;
const bruteForceConfig = getBruteForceConfig(emailOrPhoneNumber, ip, actionType);
// we check the anomaly detection service before calling the original implementation of createCodePOST
let securityCheckResponse = await handleSecurityChecks({ bruteForceConfig, actionType });
if(securityCheckResponse !== undefined) {
return securityCheckResponse;
}
return originalImplementation.createCodePOST!(input);
},
resendCodePOST: async function (input) {
const actionType = 'passwordless-send-sms';
const ip = getIpFromRequest(input.options.req.original);
let codesInfo = await Passwordless.listCodesByPreAuthSessionId({
tenantId: input.tenantId,
preAuthSessionId: input.preAuthSessionId
})
const phoneNumber = codesInfo && "phoneNumber" in codesInfo ? codesInfo.phoneNumber : undefined;
const email = codesInfo && "email" in codesInfo ? codesInfo.email : undefined;
const userIdentifier = email || phoneNumber || input.deviceId;
const bruteForceConfig = getBruteForceConfig(userIdentifier, ip, actionType);
// we check the anomaly detection service before calling the original implementation of resendCodePOST
let securityCheckResponse = await handleSecurityChecks({ phoneNumber, email, bruteForceConfig, actionType });
if(securityCheckResponse !== undefined) {
return securityCheckResponse;
}
return originalImplementation.resendCodePOST!(input);
},
};
},
},
}),
]
})
The above code overrides the SuperTokens APIs and adds custom logic for anomaly detection. The steps when overriding the APIs are as follows:
- We define the action type based on the API that is being called.
- We get the email or the phone number from the API input (or, when resending a code, from the stored code information).
- We get the IP address from the request.
- We create the brute force config from the email or phone number, IP address and action type. This config allows a number of requests over a time interval per:
  - Action and email/phone number.
  - Action and IP address.
- We call the anomaly detection service to check if the request is allowed (only brute force detection is done here).
- If the request is not allowed, we return a descriptive error response.
- If the request is allowed, we call the original implementation of the API.
- We return the response from the original implementation of the API.
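The examples use the raw `x-forwarded-for` header as the IP address. When a request passes through several proxies, this header can contain a comma-separated list of addresses, so the raw value may differ between requests from the same client and weaken the per-IP brute force key. The sketch below shows one way to normalise it, shown in Python for brevity. It assumes the first entry is the client address, which depends on your proxy setup (the header can also be set by the client unless a trusted proxy overwrites it). `client_ip_from_forwarded_for` is a hypothetical helper name.

```python
from typing import Optional


def client_ip_from_forwarded_for(header_value: Optional[str], default: str = "127.0.0.1") -> str:
    """Return the first non-empty entry of an X-Forwarded-For header value."""
    if not header_value:
        return default
    for part in header_value.split(","):
        candidate = part.strip()
        if candidate:
            return candidate
    # the header only contained separators or whitespace
    return default


print(client_ip_from_forwarded_for("203.0.113.7, 10.0.0.2, 10.0.0.3"))
print(client_ip_from_forwarded_for(None))
```

If your infrastructure appends the client address at a different position in the list, pick that entry instead of the first one.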
import (
"bytes"
"encoding/json"
"net/http"
"github.com/supertokens/supertokens-golang/recipe/passwordless"
"github.com/supertokens/supertokens-golang/recipe/passwordless/plessmodels"
"github.com/supertokens/supertokens-golang/supertokens"
)
const SECRET_API_KEY = "<secret-api-key>" // Your secret API key that you received from the SuperTokens team
// The full URL with the correct region will be provided by the SuperTokens team
const ANOMALY_DETECTION_API_URL = "https://security-<region>.aws.supertokens.io/v1/security"
type SecurityCheckInput struct {
ActionType string `json:"actionType,omitempty"`
Email string `json:"email,omitempty"`
PhoneNumber string `json:"phoneNumber,omitempty"`
BruteForceConfig []BruteForceConfig `json:"bruteForceConfig,omitempty"`
}
type BruteForceConfig struct {
Key string `json:"key"`
MaxRequests []MaxRequests `json:"maxRequests"`
}
type MaxRequests struct {
Limit int `json:"limit"`
PerTimeIntervalMS int `json:"perTimeIntervalMS"`
}
func getIpFromRequest(req *http.Request) string {
if forwardedFor := req.Header.Get("X-Forwarded-For"); forwardedFor != "" {
return forwardedFor
}
return "127.0.0.1"
}
func getBruteForceConfig(userIdentifier string, ip string, prefix string) []BruteForceConfig {
var key string
if prefix != "" {
key = prefix + "-"
}
return []BruteForceConfig{
{
Key: key + userIdentifier,
MaxRequests: []MaxRequests{
{Limit: 5, PerTimeIntervalMS: 60 * 1000},
{Limit: 15, PerTimeIntervalMS: 60 * 60 * 1000},
},
},
{
Key: key + ip,
MaxRequests: []MaxRequests{
{Limit: 5, PerTimeIntervalMS: 60 * 1000},
{Limit: 15, PerTimeIntervalMS: 60 * 60 * 1000},
},
},
}
}
func handleSecurityChecks(input SecurityCheckInput) (*supertokens.GeneralErrorResponse, error) {
requestBody := make(map[string]interface{})
requestBody["bruteForce"] = input.BruteForceConfig
requestBody["actionType"] = input.ActionType
// only send the identifiers that are set, instead of empty strings
if input.Email != "" {
requestBody["email"] = input.Email
}
if input.PhoneNumber != "" {
requestBody["phoneNumber"] = input.PhoneNumber
}
jsonBody, err := json.Marshal(requestBody)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", ANOMALY_DETECTION_API_URL, bytes.NewBuffer(jsonBody))
if err != nil {
// silently fail in order to not break the auth flow
return nil, nil
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+SECRET_API_KEY)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
// silently fail in order to not break the auth flow
return nil, nil
}
defer resp.Body.Close()
var responseData map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&responseData)
if err != nil {
// an unreadable response is treated like an unreachable service
return nil, nil
}
if bruteForce, ok := responseData["bruteForce"].(map[string]interface{}); ok {
if detected, ok := bruteForce["detected"].(bool); ok && detected {
return &supertokens.GeneralErrorResponse{
Message: "Too many requests. Please try again later.",
}, nil
}
}
return nil, nil
}
func main() {
supertokens.Init(supertokens.TypeInput{
RecipeList: []supertokens.Recipe{
passwordless.Init(plessmodels.TypeInput{
FlowType: "USER_INPUT_CODE",
ContactMethodPhone: plessmodels.ContactMethodPhoneConfig{
Enabled: true,
},
Override: &plessmodels.OverrideStruct{
APIs: func(originalImplementation plessmodels.APIInterface) plessmodels.APIInterface {
originalCreateCodePOST := *originalImplementation.CreateCodePOST
(*originalImplementation.CreateCodePOST) = func(email *string, phoneNumber *string, tenantId string, options plessmodels.APIOptions, userContext supertokens.UserContext) (plessmodels.CreateCodePOSTResponse, error) {
actionType := "passwordless-send-sms"
ip := getIpFromRequest(options.Req)
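// only one of email / phoneNumber is set, so dereference only the non-nil pointer
var key, emailValue, phoneNumberValue string
if email != nil {
emailValue = *email
key = emailValue
} else if phoneNumber != nil {
phoneNumberValue = *phoneNumber
key = phoneNumberValue
}
bruteForceConfig := getBruteForceConfig(key, ip, actionType)
// Check anomaly detection service before proceeding
checkErr, err := handleSecurityChecks(
SecurityCheckInput{
ActionType: actionType,
Email: emailValue,
PhoneNumber: phoneNumberValue,
BruteForceConfig: bruteForceConfig,
},
)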
if err != nil {
return plessmodels.CreateCodePOSTResponse{}, err
}
if checkErr != nil {
return plessmodels.CreateCodePOSTResponse{
GeneralError: checkErr,
}, nil
}
return originalCreateCodePOST(email, phoneNumber, tenantId, options, userContext)
}
originalResendCodePOST := *originalImplementation.ResendCodePOST
(*originalImplementation.ResendCodePOST) = func(deviceID string, preAuthSessionID string, tenantId string, options plessmodels.APIOptions, userContext supertokens.UserContext) (plessmodels.ResendCodePOSTResponse, error) {
// retrieve user details
codesInfo, err := passwordless.ListCodesByDeviceID(tenantId, deviceID, userContext)
if err != nil {
return plessmodels.ResendCodePOSTResponse{}, err
}
// dereference only the pointers that are set to avoid a nil pointer panic
var emailValue, phoneNumberValue string
if codesInfo != nil && codesInfo.Email != nil {
emailValue = *codesInfo.Email
}
if codesInfo != nil && codesInfo.PhoneNumber != nil {
phoneNumberValue = *codesInfo.PhoneNumber
}
actionType := "passwordless-send-sms"
ip := getIpFromRequest(options.Req)
// fall back to the device ID when no contact detail is stored for the device
key := deviceID
if emailValue != "" {
key = emailValue
} else if phoneNumberValue != "" {
key = phoneNumberValue
}
bruteForceConfig := getBruteForceConfig(key, ip, actionType)
// Check anomaly detection service before proceeding
checkErr, err := handleSecurityChecks(
SecurityCheckInput{
ActionType: actionType,
Email: emailValue,
PhoneNumber: phoneNumberValue,
BruteForceConfig: bruteForceConfig,
},
)
if err != nil {
return plessmodels.ResendCodePOSTResponse{}, err
}
if checkErr != nil {
return plessmodels.ResendCodePOSTResponse{
GeneralError: checkErr,
}, nil
}
return originalResendCodePOST(deviceID, preAuthSessionID, tenantId, options, userContext)
}
return originalImplementation
},
},
}),
},
})
}
The above code overrides the SuperTokens APIs and adds custom logic for anomaly detection. The steps when overriding the APIs are as follows:
- We define the action type based on the API that is being called.
- We get the email or the phone number from the API input (or, when resending a code, from the stored code information).
- We get the IP address from the request.
- We create the brute force config from the email or phone number, IP address and action type. This config allows a number of requests over a time interval per:
  - Action and email/phone number.
  - Action and IP address.
- We call the anomaly detection service to check if the request is allowed (only brute force detection is done here).
- If the request is not allowed, we return a descriptive error response.
- If the request is allowed, we call the original implementation of the API.
- We return the response from the original implementation of the API.
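To make the `maxRequests` rules more concrete, the sketch below simulates locally how a key with the limits used in the examples (5 requests per minute, 15 per hour) might behave. This is an illustration only: how the anomaly detection service actually counts requests is not described here, and the sliding-window approach and the `LocalRateLimiter` class are assumptions, shown in Python for brevity.

```python
from collections import deque
from typing import Deque, Dict, List


class LocalRateLimiter:
    """Illustrative only: counts accepted requests per key in sliding time windows."""

    def __init__(self, max_requests: List[Dict[str, int]]) -> None:
        self.max_requests = max_requests
        self.timestamps: Dict[str, Deque[int]] = {}

    def is_allowed(self, key: str, now_ms: int) -> bool:
        history = self.timestamps.setdefault(key, deque())
        longest = max(rule["perTimeIntervalMS"] for rule in self.max_requests)
        # Drop entries that fall outside the longest window.
        while history and now_ms - history[0] >= longest:
            history.popleft()
        # Every rule must have room for one more request.
        for rule in self.max_requests:
            window_start = now_ms - rule["perTimeIntervalMS"]
            recent = sum(1 for t in history if t > window_start)
            if recent >= rule["limit"]:
                return False
        history.append(now_ms)
        return True


rules = [
    {"limit": 5, "perTimeIntervalMS": 60 * 1000},
    {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000},
]
limiter = LocalRateLimiter(rules)
key = "passwordless-send-sms-user@example.com"
results = [limiter.is_allowed(key, second * 1000) for second in range(6)]
print(results)
```

In this simulation only accepted requests are recorded, so a blocked client is not penalised further for retrying. That is a design choice of the sketch, not a statement about the service.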
from httpx import AsyncClient
from typing import Dict, Any, Union, List
from supertokens_python import init, InputAppInfo
from supertokens_python.recipe import passwordless
from supertokens_python.recipe.passwordless.interfaces import APIInterface, APIOptions
from supertokens_python.recipe.passwordless.asyncio import list_codes_by_device_id
from supertokens_python.framework import BaseRequest
from supertokens_python.types import GeneralErrorResponse
from supertokens_python.recipe.session import SessionContainer
SECRET_API_KEY = "<secret-api-key>" # Your secret API key that you received from the SuperTokens team
# The full URL with the correct region will be provided by the SuperTokens team
ANOMALY_DETECTION_API_URL = "https://security-<region>.aws.supertokens.io/v1/security"
async def handle_security_checks(
    request_id: Union[str, None],
    password: Union[str, None],
    brute_force_config: Union[List[Dict[str, Any]], None],
    email: Union[str, None],
    phone_number: Union[str, None],
    action_type: Union[str, None],
) -> Union[GeneralErrorResponse, None]:
    # request_id and password are not used here: only brute force detection is done for passwordless
    request_body: Dict[str, Any] = {}
    request_body['bruteForce'] = brute_force_config
    request_body['email'] = email
    request_body['phoneNumber'] = phone_number
    request_body['actionType'] = action_type
    try:
        async with AsyncClient(timeout=10.0) as client:
            response = await client.post(ANOMALY_DETECTION_API_URL, json=request_body, headers={
                "Authorization": f"Bearer {SECRET_API_KEY}",
                "Content-Type": "application/json"
            })
        response_data = response.json()
    except Exception:
        # silently fail in order to not break the auth flow
        return None
    # fall back to an empty dict in case the field is null in the response
    if (response_data.get('bruteForce') or {}).get('detected'):
        return GeneralErrorResponse(message="Too many requests. Please try again later.")
    return None
def get_ip_from_request(req: BaseRequest) -> str:
    forwarded_for = req.get_header('x-forwarded-for')
    if forwarded_for:
        return forwarded_for
    return '127.0.0.1'
def get_brute_force_config(user_identifier: Union[str, None], ip: str, prefix: Union[str, None] = None) -> List[Dict[str, Any]]:
    return [
        {
            "key": f"{prefix}-{user_identifier}" if prefix else user_identifier,
            "maxRequests": [
                {"limit": 5, "perTimeIntervalMS": 60 * 1000},
                {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000}
            ]
        },
        {
            "key": f"{prefix}-{ip}" if prefix else ip,
            "maxRequests": [
                {"limit": 5, "perTimeIntervalMS": 60 * 1000},
                {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000}
            ]
        }
    ]
def override_passwordless_apis(original_implementation: APIInterface):
    original_create_code_post = original_implementation.create_code_post
    async def create_code_post(email: Union[str, None], phone_number: Union[str, None], session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]):
        action_type = 'passwordless-send-sms'
        ip = get_ip_from_request(api_options.request)
        identifier = None
        if email is not None:
            identifier = email
        elif phone_number is not None:
            identifier = phone_number
        brute_force_config = get_brute_force_config(identifier, ip, action_type)
        # we check the anomaly detection service before calling the original implementation of create_code_post
        security_check_response = await handle_security_checks(
            request_id=None,
            password=None,
            brute_force_config=brute_force_config,
            email=email,
            phone_number=phone_number,
            action_type=action_type
        )
        if security_check_response is not None:
            return security_check_response
        # We need to call the original implementation of create_code_post.
        response = await original_create_code_post(email, phone_number, session, should_try_linking_with_session_user, tenant_id, api_options, user_context)
        return response
    original_implementation.create_code_post = create_code_post
    original_resend_code_post = original_implementation.resend_code_post
    async def resend_code_post(device_id: str, pre_auth_session_id: str, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any]):
        action_type = 'passwordless-send-sms'
        ip = get_ip_from_request(api_options.request)
        email = None
        phone_number = None
        codes = await list_codes_by_device_id(tenant_id=tenant_id, device_id=device_id, user_context=user_context)
        if codes is not None:
            email = codes.email
            phone_number = codes.phone_number
        identifier = None
        if email is not None:
            identifier = email
        elif phone_number is not None:
            identifier = phone_number
        brute_force_config = get_brute_force_config(identifier, ip, action_type)
        # we check the anomaly detection service before calling the original implementation of resend_code_post
        security_check_response = await handle_security_checks(
            request_id=None,
            password=None,
            brute_force_config=brute_force_config,
            email=email,
            phone_number=phone_number,
            action_type=action_type
        )
        if security_check_response is not None:
            return security_check_response
        # We need to call the original implementation of resend_code_post.
        response = await original_resend_code_post(device_id, pre_auth_session_id, session, should_try_linking_with_session_user, tenant_id, api_options, user_context)
        return response
    original_implementation.resend_code_post = resend_code_post
    return original_implementation
init(
    app_info=InputAppInfo(api_domain="...", app_name="...", website_domain="..."),
    framework='...',
    recipe_list=[
        passwordless.init(
            flow_type="USER_INPUT_CODE_AND_MAGIC_LINK",
            contact_config=passwordless.ContactEmailOrPhoneConfig(),
            override=passwordless.InputOverrideConfig(
                apis=override_passwordless_apis
            )
        )
    ]
)
The above code overrides the SuperTokens APIs and adds custom logic for anomaly detection. The steps when overriding the APIs are as follows:
- We define the action type based on the API that is being called.
- We get the email or the phone number from the API input (or, when resending a code, from the stored code information).
- We get the IP address from the request.
- We create the brute force config from the email or phone number, IP address and action type. This config allows a number of requests over a time interval per:
  - Action and email/phone number.
  - Action and IP address.
- We call the anomaly detection service to check if the request is allowed (only brute force detection is done here).
- If the request is not allowed, we return a descriptive error response.
- If the request is allowed, we call the original implementation of the API.
- We return the response from the original implementation of the API.
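According to the API reference, `bruteForce.key` is present only when brute force has been detected, and it identifies which of your keys (user or IP) exceeded its limit. The sketch below reads both values from a response without raising on missing fields, which can be useful for logging which bucket was hit. `brute_force_result` is a hypothetical helper, and treating a missing or null `bruteForce` field as "not detected" is an assumption made for safety.

```python
from typing import Any, Dict, Optional, Tuple


def brute_force_result(response_data: Optional[Dict[str, Any]]) -> Tuple[bool, Optional[str]]:
    """Return (detected, key); key is None unless brute force was detected."""
    brute_force = (response_data or {}).get("bruteForce") or {}
    detected = bool(brute_force.get("detected"))
    key = brute_force.get("key") if detected else None
    return detected, key


# Sample responses shaped like the API reference example.
blocked = {"bruteForce": {"detected": True, "key": "some-key"}}
allowed = {"bruteForce": {"detected": False}}
print(brute_force_result(blocked))
print(brute_force_result(allowed))
```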
info
To use anomaly detection with other frameworks, follow the same steps as in the examples for the other languages and refer to the API documentation.
Third party
It's important to note that anomaly detection is not recommended for use with third-party providers. There are several reasons for this:
- Existing anomaly detection: Most reputable third-party authentication providers (like Google, Facebook, Apple, etc.) have robust security measures in place, including their own anomaly detection systems. These systems are typically more comprehensive and tailored to their specific platforms.
- Limited visibility: When using third-party authentication, you have limited visibility into the authentication process. This makes it difficult to accurately detect anomalies or suspicious activities that occur on the third-party's side.
- Potential false positives: Applying anomaly detection to third-party logins might lead to an increase in false positives, as you don't have full context of the user's interactions with the third-party provider.
- User experience: Additional security checks on top of third-party authentication could negatively impact the user experience, potentially defeating the purpose of offering third-party login as a convenient option.
For these reasons, it's generally best to rely on the security measures provided by the third-party authentication providers themselves when offering this login option to your users.