File length: 92998 # Additional Verification - Attack Protection Suite - Initial setup Source: https://supertokens.com/docs/additional-verification/attack-protection-suite/initial-setup ## Overview The following page shows you how to include the **Attack Protection Suite** feature in your **SuperTokens** integration. ## Before you start This feature is **in beta**. To get access to it, please [reach out](mailto:support@supertokens.com) to get it set up for you. Once you have access to it, you receive: - **Public API key** - use this on your frontend for generating request IDs - **Secret API key** - use this on your backend for making requests to the anomaly detection API - **Environment ID** - use this for identifying the environment you are using both on the backend and the frontend You can use the feature with either the `Email Password` or the `Passwordless` authentication methods. For social or enterprise login, it is not needed for multiple reasons: - **Existing anomaly detection**: Most reputable third-party authentication providers (like Google, Facebook, Apple, etc.) have robust security measures in place, including their own anomaly detection systems. These systems are typically more comprehensive and tailored to their specific platforms. - **Limited visibility**: When using third-party authentication, you have limited visibility into the authentication process. This makes it difficult to accurately detect anomalies or suspicious activities that occur on the third-party's side. - **Potential false positives**: Applying anomaly detection to third-party logins might lead to an increase in false positives, as you don't have full context of the user's interactions with the third-party provider. - **User experience**: Additional security checks on top of third-party authentication could negatively impact the user experience, defeating the purpose of offering third-party login as a convenient option. ## Steps ### 1. Attach request IDs to backend API calls The **Attack Protection Suite** feature relies on identifying each request through a unique ID. This way the fingerprinting process can determine if it's a potential threat or not. :::info Important This step applies only to bot detection and anomaly IP-based detection such as impossible travel detection. Also, check for bot detection only on the email password login flows. ::: #### 1.1 Generate a request ID To generate a request ID, import, and initialize the SDK using your public API key. This SDK generates a unique request ID for each authentication event attempt. ```tsx const ENVIRONMENT_ID = ""; // Your environment ID that you received from the SuperTokens team // Initialize the agent on page load using your public API key that you received from the SuperTokens team. // @ts-expect-error const supertokensRequestIdPromise = import("https://deviceid.supertokens.io/PqWNQ35Ydhm6WDUK/k9bwGCuvuA83Ad6s?apiKey=") .then((RequestId: any) => RequestId.load({ endpoint: [ 'https://deviceid.supertokens.io/PqWNQ35Ydhm6WDUK/CnsdzKsyFKU8Q3h2', RequestId.defaultEndpoint ] })); async function getRequestId() { const sdk = await supertokensRequestIdPromise; const result = await sdk.get({ tag: { environmentId: ENVIRONMENT_ID, } }); return result.requestId; } ``` #### 1.2 Pass the request ID to the backend Include the `requestId` property along with the value as part of the `preAPIHook` body from the initialisation of the recipes. :::info Important If the request ID is not passed to the backend, the anomaly detection can only detect password breaches and brute force attacks. ::: Below is a full example of how to configure the SDK and pass the request ID to the backend. The request ID generates only for the email password sign in, sign up, and reset password actions because these are the only actions that require bot detection. For all the other recipes, this is not needed. ```tsx const ENVIRONMENT_ID = ""; // Your environment ID that you received from the SuperTokens team // Initialize the agent on page load using your public API key that you received from the SuperTokens team. // @ts-expect-error const supertokensRequestIdPromise = import("https://deviceid.supertokens.io/PqWNQ35Ydhm6WDUK/k9bwGCuvuA83Ad6s?apiKey=") .then((RequestId: any) => RequestId.load({ endpoint: [ 'https://deviceid.supertokens.io/PqWNQ35Ydhm6WDUK/CnsdzKsyFKU8Q3h2', RequestId.defaultEndpoint ] })); async function getRequestId() { const sdk = await supertokensRequestIdPromise; const result = await sdk.get({ tag: { environmentId: ENVIRONMENT_ID, } }); return result.requestId; } export const SuperTokensConfig = { // ... other config options appInfo: { appName: "...", apiDomain: '...', websiteDomain: '...', }, // recipeList contains all the modules that you want to // use from SuperTokens. See the full list here: https://supertokens.com/docs/guides recipeList: [ EmailPassword.init({ // highlight-start preAPIHook: async (context) => { let url = context.url; let requestInit = context.requestInit; let action = context.action; if (action === "EMAIL_PASSWORD_SIGN_IN" || action === "EMAIL_PASSWORD_SIGN_UP" || action === "SEND_RESET_PASSWORD_EMAIL") { let requestId = await getRequestId(); let body = context.requestInit.body; if (body !== undefined) { let bodyJson = JSON.parse(body as string); bodyJson.requestId = requestId; requestInit.body = JSON.stringify(bodyJson); } } return { requestInit, url }; } // highlight-end }), ], }; ``` ### 2. Retrieve the request ID To retrieve the request ID in the backend you have to override the recipe implementations. #### Email and password ```tsx function getIpFromRequest(req: Request): string { let headers: { [key: string]: string } = {}; for (let key of Object.keys(req.headers)) { headers[key] = (req as any).headers[key]!; } return (req as any).headers["x-forwarded-for"] || "127.0.0.1"; } const getBruteForceConfig = ( userIdentifier: string, ip: string, prefix?: string, ) => [ { key: `${prefix ? `${prefix}-` : ""}${userIdentifier}`, maxRequests: [ { limit: 5, perTimeIntervalMS: 60 * 1000 }, { limit: 15, perTimeIntervalMS: 60 * 60 * 1000 }, ], }, { key: `${prefix ? `${prefix}-` : ""}${ip}`, maxRequests: [ { limit: 5, perTimeIntervalMS: 60 * 1000 }, { limit: 15, perTimeIntervalMS: 60 * 60 * 1000 }, ], }, ]; SuperTokens.init({ appInfo: { apiDomain: "...", appName: "...", websiteDomain: "...", }, supertokens: { connectionURI: "...", }, recipeList: [ EmailPassword.init({ // highlight-start override: { apis: (originalImplementation) => { return { ...originalImplementation, signUpPOST: async function (input) { // We need to generate a request ID in order to detect possible bots, suspicious IP addresses, etc. const requestId = (await input.options.req.getJSONBody()) .requestId; if (!requestId) { return { status: "GENERAL_ERROR", message: "The request ID is required", }; } const actionType = "emailpassword-sign-up"; const ip = getIpFromRequest(input.options.req.original); let email = input.formFields.filter((f) => f.id === "email")[0] .value as string; let password = input.formFields.filter( (f) => f.id === "password", )[0].value as string; const bruteForceConfig = getBruteForceConfig( email, ip, actionType, ); return originalImplementation.signUpPOST!(input); }, signInPOST: async function (input) { // We need to generate a request ID in order to detect possible bots, suspicious IP addresses, etc. const requestId = (await input.options.req.getJSONBody()) .requestId; if (!requestId) { return { status: "GENERAL_ERROR", message: "The request ID is required", }; } const actionType = "emailpassword-sign-up"; const ip = getIpFromRequest(input.options.req.original); let email = input.formFields.filter((f) => f.id === "email")[0] .value as string; let password = input.formFields.filter( (f) => f.id === "password", )[0].value as string; const bruteForceConfig = getBruteForceConfig( email, ip, actionType, ); return originalImplementation.signInPOST!(input); }, generatePasswordResetTokenPOST: async function (input) { // We need to generate a request ID in order to detect possible bots, suspicious IP addresses, etc. const requestId = (await input.options.req.getJSONBody()) .requestId; if (!requestId) { return { status: "GENERAL_ERROR", message: "The request ID is required", }; } const actionType = "emailpassword-sign-up"; const ip = getIpFromRequest(input.options.req.original); let email = input.formFields.filter((f) => f.id === "email")[0] .value as string; let password = input.formFields.filter( (f) => f.id === "password", )[0].value as string; const bruteForceConfig = getBruteForceConfig( email, ip, actionType, ); return originalImplementation.generatePasswordResetTokenPOST!( input, ); }, }; }, }, // highlight-end }), ], }); ``` ```go return forwardedFor } return "127.0.0.1" } func getBruteForceConfig(userIdentifier string, ip string, prefix string) []BruteForceConfig { var key string if prefix != "" { key = prefix + "-" } return []BruteForceConfig{ { Key: key + userIdentifier, MaxRequests: []MaxRequests{ {Limit: 5, PerTimeIntervalMS: 60 * 1000}, {Limit: 15, PerTimeIntervalMS: 60 * 60 * 1000}, }, }, { Key: key + ip, MaxRequests: []MaxRequests{ {Limit: 5, PerTimeIntervalMS: 60 * 1000}, {Limit: 15, PerTimeIntervalMS: 60 * 60 * 1000}, }, }, } } func main() { supertokens.Init(supertokens.TypeInput{ RecipeList: []supertokens.Recipe{ emailpassword.Init(&epmodels.TypeInput{ Override: &epmodels.OverrideStruct{ APIs: func(originalImplementation epmodels.APIInterface) epmodels.APIInterface { // rewrite the original implementation of SignUpPOST originalSignUpPOST := *originalImplementation.SignUpPOST (*originalImplementation.SignUpPOST) = func(formFields []epmodels.TypeFormField, tenantId string, options epmodels.APIOptions, userContext supertokens.UserContext) (epmodels.SignUpPOSTResponse, error) { // Generate request ID for bot and suspicious IP detection var reqBody ReqBody err := json.NewDecoder(options.Req.Body).Decode(&reqBody) if err != nil { return epmodels.SignUpPOSTResponse{}, err } if reqBody.RequestID == nil { return epmodels.SignUpPOSTResponse{ GeneralError: &supertokens.GeneralErrorResponse{ Message: "The request ID is required", }, }, nil } requestId := *reqBody.RequestID fmt.Println(requestId) actionType := "emailpassword-sign-up" ip := getIpFromRequest(options.Req) email := "" password := "" for _, field := range formFields { if field.ID == "email" || field.ID == "password" { valueAsString, asStrOk := field.Value.(string) if !asStrOk { return epmodels.SignUpPOSTResponse{}, errors.New("Should never come here as we check the type during validation") } if field.ID == "email" { email = valueAsString } else { password = valueAsString } } } fmt.Println(password) bruteForceConfig := getBruteForceConfig(email, ip, actionType) fmt.Println(bruteForceConfig) // pre API logic... resp, err := originalSignUpPOST(formFields, tenantId, options, userContext) if err != nil { return epmodels.SignUpPOSTResponse{}, err } return resp, nil } // rewrite the original implementation of SignInPOST originalSignInPOST := *originalImplementation.SignInPOST (*originalImplementation.SignInPOST) = func(formFields []epmodels.TypeFormField, tenantId string, options epmodels.APIOptions, userContext supertokens.UserContext) (epmodels.SignInPOSTResponse, error) { // Generate request ID for bot and suspicious IP detection var reqBody ReqBody err := json.NewDecoder(options.Req.Body).Decode(&reqBody) if err != nil { return epmodels.SignInPOSTResponse{}, err } if reqBody.RequestID == nil { return epmodels.SignInPOSTResponse{ GeneralError: &supertokens.GeneralErrorResponse{ Message: "The request ID is required", }, }, nil } requestId := *reqBody.RequestID fmt.Println(requestId) actionType := "emailpassword-sign-in" ip := getIpFromRequest(options.Req) email := "" password := "" for _, field := range formFields { if field.ID == "email" || field.ID == "password" { valueAsString, asStrOk := field.Value.(string) if !asStrOk { return epmodels.SignInPOSTResponse{}, errors.New("Should never come here as we check the type during validation") } if field.ID == "email" { email = valueAsString } else { password = valueAsString } } } fmt.Println(password) bruteForceConfig := getBruteForceConfig(email, ip, actionType) fmt.Println(bruteForceConfig) // pre API logic... resp, err := originalSignInPOST(formFields, tenantId, options, userContext) if err != nil { return epmodels.SignInPOSTResponse{}, err } return resp, nil } // rewrite the original implementation of GeneratePasswordResetTokenPOST originalGeneratePasswordResetTokenPOST := *originalImplementation.GeneratePasswordResetTokenPOST (*originalImplementation.GeneratePasswordResetTokenPOST) = func(formFields []epmodels.TypeFormField, tenantId string, options epmodels.APIOptions, userContext supertokens.UserContext) (epmodels.GeneratePasswordResetTokenPOSTResponse, error) { // Generate request ID for bot and suspicious IP detection var reqBody ReqBody err := json.NewDecoder(options.Req.Body).Decode(&reqBody) if err != nil { return epmodels.GeneratePasswordResetTokenPOSTResponse{}, err } if reqBody.RequestID == nil { return epmodels.GeneratePasswordResetTokenPOSTResponse{ GeneralError: &supertokens.GeneralErrorResponse{ Message: "The request ID is required", }, }, nil } requestId := *reqBody.RequestID fmt.Println(requestId) actionType := "send-password-reset-email" ip := getIpFromRequest(options.Req) email := "" for _, field := range formFields { if field.ID == "email" { valueAsString, asStrOk := field.Value.(string) if !asStrOk { return epmodels.GeneratePasswordResetTokenPOSTResponse{}, errors.New("Should never come here as we check the type during validation") } email = valueAsString } } bruteForceConfig := getBruteForceConfig(email, ip, actionType) fmt.Println(bruteForceConfig) // pre API logic... resp, err := originalGeneratePasswordResetTokenPOST(formFields, tenantId, options, userContext) if err != nil { return epmodels.GeneratePasswordResetTokenPOSTResponse{}, err } return resp, nil } return originalImplementation }, Functions: func(originalImplementation epmodels.RecipeInterface) epmodels.RecipeInterface { return originalImplementation }, }, }), }, }) } ``` ```python from typing import Dict, Any, Union, List from supertokens_python import init, InputAppInfo from supertokens_python.recipe import emailpassword from supertokens_python.recipe.emailpassword.interfaces import APIInterface, APIOptions from supertokens_python.recipe.emailpassword.types import FormField from supertokens_python.framework import BaseRequest from supertokens_python.types import GeneralErrorResponse from supertokens_python.recipe.session import SessionContainer def get_ip_from_request(req: BaseRequest) -> str: forwarded_for = req.get_header("x-forwarded-for") if forwarded_for: return forwarded_for return "127.0.0.1" def get_brute_force_config( user_identifier: Union[str, None], ip: str, prefix: Union[str, None] = None ) -> List[Dict[str, Any]]: return [ { "key": f"{prefix}-{user_identifier}" if prefix else user_identifier, "maxRequests": [ {"limit": 5, "perTimeIntervalMS": 60 * 1000}, {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000}, ], }, { "key": f"{prefix}-{ip}" if prefix else ip, "maxRequests": [ {"limit": 5, "perTimeIntervalMS": 60 * 1000}, {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000}, ], }, ] # highlight-start def override_email_password_apis(original_implementation: APIInterface): original_sign_up_post = original_implementation.sign_up_post async def sign_up_post( form_fields: List[FormField], tenant_id: str, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], api_options: APIOptions, user_context: Dict[str, Any], ): request_body = await api_options.request.json() if not request_body: return GeneralErrorResponse(message="The request body is required") request_id = request_body.get("requestId") if not request_id: return GeneralErrorResponse(message="The request ID is required") action_type = "emailpassword-sign-in" ip = get_ip_from_request(api_options.request) email = None for field in form_fields: if field.id == "email": email = field.value brute_force_config = get_brute_force_config(email, ip, action_type) print(brute_force_config) response = await original_sign_up_post( form_fields, tenant_id, session, should_try_linking_with_session_user, api_options, user_context, ) return response original_implementation.sign_up_post = sign_up_post original_sign_in_post = original_implementation.sign_in_post async def sign_in_post( form_fields: List[FormField], tenant_id: str, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], api_options: APIOptions, user_context: Dict[str, Any], ): request_body = await api_options.request.json() if not request_body: return GeneralErrorResponse(message="The request body is required") request_id = request_body.get("requestId") if not request_id: return GeneralErrorResponse(message="The request ID is required") action_type = "emailpassword-sign-in" ip = get_ip_from_request(api_options.request) email = None for field in form_fields: if field.id == "email": email = field.value brute_force_config = get_brute_force_config(email, ip, action_type) print(brute_force_config) response = await original_sign_in_post( form_fields, tenant_id, session, should_try_linking_with_session_user, api_options, user_context, ) return response original_implementation.sign_in_post = sign_in_post original_generate_password_reset_token_post = ( original_implementation.generate_password_reset_token_post ) async def generate_password_reset_token_post( form_fields: List[FormField], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ): request_body = await api_options.request.json() if not request_body: return GeneralErrorResponse(message="The request body is required") request_id = request_body.get("requestId") if not request_id: return GeneralErrorResponse(message="The request ID is required") action_type = "send-password-reset-email" ip = get_ip_from_request(api_options.request) email = None for field in form_fields: if field.id == "email": email = field.value brute_force_config = get_brute_force_config(email, ip, action_type) print(brute_force_config) response = await original_generate_password_reset_token_post( form_fields, tenant_id, api_options, user_context ) return response original_implementation.generate_password_reset_token_post = ( generate_password_reset_token_post ) return original_implementation # highlight-end init( app_info=InputAppInfo(api_domain="...", app_name="...", website_domain="..."), framework="...", # type: ignore recipe_list=[ emailpassword.init( # highlight-start override=emailpassword.InputOverrideConfig( apis=override_email_password_apis ) # highlight-end ) ], ) ``` #### Passwordless ```tsx function getIpFromRequest(req: Request): string { let headers: { [key: string]: string } = {}; for (let key of Object.keys(req.headers)) { headers[key] = (req as any).headers[key]!; } return (req as any).headers["x-forwarded-for"] || "127.0.0.1"; } const getBruteForceConfig = ( userIdentifier: string, ip: string, prefix?: string, ) => [ { key: `${prefix ? `${prefix}-` : ""}${userIdentifier}`, maxRequests: [ { limit: 5, perTimeIntervalMS: 60 * 1000 }, { limit: 15, perTimeIntervalMS: 60 * 60 * 1000 }, ], }, { key: `${prefix ? `${prefix}-` : ""}${ip}`, maxRequests: [ { limit: 5, perTimeIntervalMS: 60 * 1000 }, { limit: 15, perTimeIntervalMS: 60 * 60 * 1000 }, ], }, ]; SuperTokens.init({ // @ts-ignore framework: "...", // @ts-ignore appInfo: { /*...*/ }, recipeList: [ Passwordless.init({ // ... other customisations ... // highlight-start contactMethod: "EMAIL_OR_PHONE", flowType: "USER_INPUT_CODE_AND_MAGIC_LINK", override: { apis: (originalImplementation) => { return { ...originalImplementation, createCodePOST: async function (input) { const actionType = "passwordless-send-sms"; const ip = getIpFromRequest(input.options.req.original); const emailOrPhoneNumber = "email" in input ? input.email : input.phoneNumber; const bruteForceConfig = getBruteForceConfig( emailOrPhoneNumber, ip, actionType, ); return originalImplementation.createCodePOST!(input); }, resendCodePOST: async function (input) { const actionType = "passwordless-send-sms"; const ip = getIpFromRequest(input.options.req.original); let codesInfo = await Passwordless.listCodesByPreAuthSessionId({ tenantId: input.tenantId, preAuthSessionId: input.preAuthSessionId, }); const phoneNumber = codesInfo && "phoneNumber" in codesInfo ? codesInfo.phoneNumber : undefined; const email = codesInfo && "email" in codesInfo ? codesInfo.email : undefined; const userIdentifier = email || phoneNumber || input.deviceId; const bruteForceConfig = getBruteForceConfig( userIdentifier, ip, actionType, ); return originalImplementation.resendCodePOST!(input); }, }; }, }, // highlight-end }), ], }); ``` ```go return forwardedFor } return "127.0.0.1" } func getBruteForceConfig(userIdentifier string, ip string, prefix string) []BruteForceConfig { var key string if prefix != "" { key = prefix + "-" } return []BruteForceConfig{ { Key: key + userIdentifier, MaxRequests: []MaxRequests{ {Limit: 5, PerTimeIntervalMS: 60 * 1000}, {Limit: 15, PerTimeIntervalMS: 60 * 60 * 1000}, }, }, { Key: key + ip, MaxRequests: []MaxRequests{ {Limit: 5, PerTimeIntervalMS: 60 * 1000}, {Limit: 15, PerTimeIntervalMS: 60 * 60 * 1000}, }, }, } } func main() { supertokens.Init(supertokens.TypeInput{ RecipeList: []supertokens.Recipe{ passwordless.Init(plessmodels.TypeInput{ FlowType: "USER_INPUT_CODE", ContactMethodPhone: plessmodels.ContactMethodPhoneConfig{ Enabled: true, }, Override: &plessmodels.OverrideStruct{ APIs: func(originalImplementation plessmodels.APIInterface) plessmodels.APIInterface { originalCreateCodePOST := *originalImplementation.CreateCodePOST (*originalImplementation.CreateCodePOST) = func(email *string, phoneNumber *string, tenantId string, options plessmodels.APIOptions, userContext supertokens.UserContext) (plessmodels.CreateCodePOSTResponse, error) { actionType := "passwordless-send-sms" ip := getIpFromRequest(options.Req) var key string if email != nil { key = *email } else { key = *phoneNumber } bruteForceConfig := getBruteForceConfig(key, ip, actionType) fmt.Println(bruteForceConfig) return originalCreateCodePOST(email, phoneNumber, tenantId, options, userContext) } originalResendCodePOST := *originalImplementation.ResendCodePOST (*originalImplementation.ResendCodePOST) = func(deviceID string, preAuthSessionID string, tenantId string, options plessmodels.APIOptions, userContext supertokens.UserContext) (plessmodels.ResendCodePOSTResponse, error) { // retreive user details codesInfo, err := passwordless.ListCodesByDeviceID(tenantId, deviceID, userContext) if err != nil { return plessmodels.ResendCodePOSTResponse{}, err } var email *string var phoneNumber *string if codesInfo.Email != nil { email = codesInfo.Email } if codesInfo.PhoneNumber != nil { phoneNumber = codesInfo.PhoneNumber } actionType := "passwordless-send-sms" ip := getIpFromRequest(options.Req) key := "" if email != nil { key = *email } else { key = *phoneNumber } bruteForceConfig := getBruteForceConfig(key, ip, actionType) fmt.Println(bruteForceConfig) return originalResendCodePOST(deviceID, preAuthSessionID, tenantId, options, userContext) } return originalImplementation }, }, }), }, }) } ``` ```python from typing import Dict, Any, Union, List from supertokens_python import init, InputAppInfo from supertokens_python.recipe import passwordless from supertokens_python.recipe.passwordless.interfaces import APIInterface, APIOptions from supertokens_python.recipe.passwordless.asyncio import list_codes_by_device_id from supertokens_python.framework import BaseRequest from supertokens_python.recipe.session import SessionContainer def get_ip_from_request(req: BaseRequest) -> str: forwarded_for = req.get_header("x-forwarded-for") if forwarded_for: return forwarded_for return "127.0.0.1" def get_brute_force_config( user_identifier: Union[str, None], ip: str, prefix: Union[str, None] = None ) -> List[Dict[str, Any]]: return [ { "key": f"{prefix}-{user_identifier}" if prefix else user_identifier, "maxRequests": [ {"limit": 5, "perTimeIntervalMS": 60 * 1000}, {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000}, ], }, { "key": f"{prefix}-{ip}" if prefix else ip, "maxRequests": [ {"limit": 5, "perTimeIntervalMS": 60 * 1000}, {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000}, ], }, ] # highlight-start def override_passwordless_apis(original_implementation: APIInterface): original_create_code_post = original_implementation.create_code_post async def create_code_post( email: Union[str, None], phone_number: Union[str, None], session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ): action_type = "passwordless-send-sms" ip = get_ip_from_request(api_options.request) identifier = None if email is not None: identifier = email elif phone_number is not None: identifier = phone_number brute_force_config = get_brute_force_config(identifier, ip, action_type) print(brute_force_config) # We need to call the original implementation of create_code_post. response = await original_create_code_post( email, phone_number, session, should_try_linking_with_session_user, tenant_id, api_options, user_context, ) return response original_implementation.create_code_post = create_code_post original_resend_code_post = original_implementation.resend_code_post async def resend_code_post( device_id: str, pre_auth_session_id: str, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ): action_type = "passwordless-send-sms" ip = get_ip_from_request(api_options.request) email = None phone_number = None codes = await list_codes_by_device_id( tenant_id=tenant_id, device_id=device_id, user_context=user_context ) if codes is not None: email = codes.email phone_number = codes.phone_number identifier = None if email is not None: identifier = email elif phone_number is not None: identifier = phone_number brute_force_config = get_brute_force_config(identifier, ip, action_type) print(brute_force_config) # We need to call the original implementation of resend_code_post. response = await original_resend_code_post( device_id, pre_auth_session_id, session, should_try_linking_with_session_user, tenant_id, api_options, user_context, ) return response original_implementation.resend_code_post = resend_code_post return original_implementation # highlight-end init( app_info=InputAppInfo(api_domain="...", app_name="...", website_domain="..."), framework="...", # type: ignore recipe_list=[ passwordless.init( # highlight-start flow_type="USER_INPUT_CODE_AND_MAGIC_LINK", contact_config=passwordless.ContactEmailOrPhoneConfig(), override=passwordless.InputOverrideConfig(apis=override_passwordless_apis), # highlight-end ) ], ) ``` ### 3. Call the protection service To use the service, send requests to the appropriate regional endpoint based on your location: - **US Region (N. Virginia)**: `https://security-us-east-1.aws.supertokens.io/v1/security` - **EU Region (Ireland)**: `https://security-eu-west-1.aws.supertokens.io/v1/security` - **APAC Region (Singapore)**: `https://security-ap-southeast-1.aws.SuperTokens.io/v1/security` ```bash curl --location --request POST 'https://security-us-east-1.aws.supertokens.io/v1/security' \ --header 'Authorization: Bearer ' \ --header 'Content-Type: application/json' \ --data-raw '{ "email": "user@email.com", "phoneNumber": "+1234567890", "passwordHash": "9cf95dacd226dcf43da376cdb6cbba7035218920", "requestId": "some-request-id", "actionType": "emailpassword-sign-in", "bruteForce": [ { "key": "some-key", "maxRequests": [ { "limit": 1, "perTimeIntervalMS": 1000 } ] } ] }' ``` ```tsx const REGION = "us-east-1"; // or "eu-west-1" or "ap-southeast-1" const SECRET_API_KEY = ""; const url = `https://security-${REGION}.aws.supertokens.io/v1/security`; const payload = { email: "user@email.com", phoneNumber: "+1234567890", passwordHashPrefix: "9cf95dacd226dcf43da376cdb6cbba7035218920", requestId: "some-request-id", actionType: "emailpassword-sign-in", bruteForce: [ { key: "some-key", maxRequests: [ { limit: 1, perTimeIntervalMS: 1000, }, ], }, ], }; fetch(url, { method: 'POST', headers: { 'Authorization': 'Bearer ' + SECRET_API_KEY, 'Content-Type': 'application/json', }, body: JSON.stringify(payload) }) .then(response => response.json()) .then(json => console.log(json)) .catch(err => console.error(err)); ``` ```python const SECRET_API_KEY = ""; // Your secret API key that you received from the SuperTokens team // The full URL with the correct region will be provided by the SuperTokens team const ANOMALY_DETECTION_API_URL = "https://security-.aws.supertokens.io/v1/security"; async function handleSecurityChecks(input: { actionType?: string; email?: string; phoneNumber?: string; password?: string; requestId?: string; bruteForceConfig?: { key: string; maxRequests: { limit: number; perTimeIntervalMS: number; }[]; }[]; }): Promise< | { status: "GENERAL_ERROR"; message: string; } | undefined > { let requestBody: { email?: string; phoneNumber?: string; actionType?: string; requestId?: string; passwordHashPrefix?: string; bruteForce?: { key: string; maxRequests: { limit: number; perTimeIntervalMS: number; }[]; }[]; } = {}; if (input.requestId !== undefined) { requestBody.requestId = input.requestId; } let passwordHash: string | undefined; if (input.password !== undefined) { let shasum = createHash("sha1"); shasum.update(input.password); passwordHash = shasum.digest("hex"); requestBody.passwordHashPrefix = passwordHash.slice(0, 5); } requestBody.bruteForce = input.bruteForceConfig; requestBody.email = input.email; requestBody.phoneNumber = input.phoneNumber; requestBody.actionType = input.actionType; let response; try { response = await axios.post(ANOMALY_DETECTION_API_URL, requestBody, { headers: { Authorization: `Bearer ${SECRET_API_KEY}`, "Content-Type": "application/json", }, }); } catch (err) { // silently fail in order to not break the auth flow console.error(err); return; } let responseData = response.data; if (responseData.bruteForce.detected) { return { status: "GENERAL_ERROR", message: "Too many requests. Please try again later.", }; } if (responseData.requestIdInfo?.isUsingTor) { return { status: "GENERAL_ERROR", message: "Tor activity detected. Please use a regular browser.", }; } if (responseData.requestIdInfo?.vpn?.result) { return { status: "GENERAL_ERROR", message: "VPN activity detected. Please use a regular network.", }; } if (responseData.requestIdInfo?.botDetected) { return { status: "GENERAL_ERROR", message: "Bot activity detected.", }; } if (responseData?.passwordBreaches && passwordHash) { const suffix = passwordHash.slice(5).toUpperCase(); const foundPasswordHash = responseData?.passwordBreaches[suffix]; if (foundPasswordHash) { return { status: "GENERAL_ERROR", message: "This password has been detected in a breach. Please set a different password.", }; } } return undefined; } function getIpFromRequest(req: Request): string { let headers: { [key: string]: string } = {}; for (let key of Object.keys(req.headers)) { headers[key] = (req as any).headers[key]!; } return (req as any).headers["x-forwarded-for"] || "127.0.0.1"; } const getBruteForceConfig = ( userIdentifier: string, ip: string, prefix?: string, ) => [ { key: `${prefix ? `${prefix}-` : ""}${userIdentifier}`, maxRequests: [ { limit: 5, perTimeIntervalMS: 60 * 1000 }, { limit: 15, perTimeIntervalMS: 60 * 60 * 1000 }, ], }, { key: `${prefix ? `${prefix}-` : ""}${ip}`, maxRequests: [ { limit: 5, perTimeIntervalMS: 60 * 1000 }, { limit: 15, perTimeIntervalMS: 60 * 60 * 1000 }, ], }, ]; // backend SuperTokens.init({ appInfo: { apiDomain: "...", appName: "...", websiteDomain: "...", }, supertokens: { connectionURI: "...", }, recipeList: [ EmailPassword.init({ // highlight-start override: { apis: (originalImplementation) => { return { ...originalImplementation, signUpPOST: async function (input) { // We need to generate a request ID in order to detect possible bots, suspicious IP addresses, etc. const requestId = (await input.options.req.getJSONBody()) .requestId; if (!requestId) { return { status: "GENERAL_ERROR", message: "The request ID is required", }; } const actionType = "emailpassword-sign-up"; const ip = getIpFromRequest(input.options.req.original); let email = input.formFields.filter((f) => f.id === "email")[0] .value as string; let password = input.formFields.filter( (f) => f.id === "password", )[0].value as string; const bruteForceConfig = getBruteForceConfig( email, ip, actionType, ); // we check the anomaly detection service before calling the original implementation of signUp let securityCheckResponse = await handleSecurityChecks({ requestId, email, password, bruteForceConfig, actionType, }); if (securityCheckResponse !== undefined) { return securityCheckResponse; } return originalImplementation.signUpPOST!(input); }, signInPOST: async function (input) { // We need to generate a request ID in order to detect possible bots, suspicious IP addresses, etc. const requestId = (await input.options.req.getJSONBody()) .requestId; if (!requestId) { return { status: "GENERAL_ERROR", message: "The request ID is required", }; } const actionType = "emailpassword-sign-in"; const ip = getIpFromRequest(input.options.req.original); let email = input.formFields.filter((f) => f.id === "email")[0] .value as string; const bruteForceConfig = getBruteForceConfig( email, ip, actionType, ); // we check the anomaly detection service before calling the original implementation of signIn let securityCheckResponse = await handleSecurityChecks({ requestId, email, bruteForceConfig, actionType, }); if (securityCheckResponse !== undefined) { return securityCheckResponse; } return originalImplementation.signInPOST!(input); }, generatePasswordResetTokenPOST: async function (input) { // We need to generate a request ID in order to detect possible bots, suspicious IP addresses, etc. const requestId = (await input.options.req.getJSONBody()) .requestId; if (!requestId) { return { status: "GENERAL_ERROR", message: "The request ID is required", }; } const actionType = "send-password-reset-email"; const ip = getIpFromRequest(input.options.req.original); let email = input.formFields.filter((f) => f.id === "email")[0] .value as string; const bruteForceConfig = getBruteForceConfig( email, ip, actionType, ); // we check the anomaly detection service before calling the original implementation of generatePasswordResetToken let securityCheckResponse = await handleSecurityChecks({ requestId, email, bruteForceConfig, actionType, }); if (securityCheckResponse !== undefined) { return securityCheckResponse; } return originalImplementation.generatePasswordResetTokenPOST!( input, ); }, passwordResetPOST: async function (input) { let password = input.formFields.filter( (f) => f.id === "password", )[0].value as string; let securityCheckResponse = await handleSecurityChecks({ password, }); if (securityCheckResponse !== undefined) { return securityCheckResponse; } return originalImplementation.passwordResetPOST!(input); }, }; }, }, // highlight-end }), ], }); ``` The above code overrides the SuperTokens APIs and adding custom logic for anomaly detection. The steps when overriding the APIs are as follows: - We get the request ID from the request body. This is a unique ID for the request. - Define the action type based on the API you call. - We get the email and password from the form fields. - We get the IP address from the request. - We create the brute force configuration from the email, IP address, and action type. This configuration allows a number of requests over a time interval per: 1. Action and email/phone number. 2. Action and IP address. - We call the anomaly detection service to check if the request is permissible. - If the request is not allowed, it returns a descriptive error response. - If the request is permissible, it calls the original implementation of the API. - We return the response from the original implementation of the API. ```go // The full URL with the correct region will be provided by the SuperTokens team const ANOMALY_DETECTION_API_URL = "https://security-.aws.supertokens.io/v1/security" type SecurityCheckInput struct { ActionType string `json:"actionType,omitempty"` Email string `json:"email,omitempty"` PhoneNumber string `json:"phoneNumber,omitempty"` Password string `json:"password,omitempty"` RequestID string `json:"requestId,omitempty"` BruteForceConfig []BruteForceConfig `json:"bruteForceConfig,omitempty"` } type BruteForceConfig struct { Key string `json:"key"` MaxRequests []MaxRequests `json:"maxRequests"` } type MaxRequests struct { Limit int `json:"limit"` PerTimeIntervalMS int `json:"perTimeIntervalMS"` } type ReqBody struct { RequestID *string `json:"requestId"` } func getIpFromRequest(req *http.Request) string { if forwardedFor := req.Header.Get("X-Forwarded-For"); forwardedFor != "" { return forwardedFor } return "127.0.0.1" } func getBruteForceConfig(userIdentifier string, ip string, prefix string) []BruteForceConfig { var key string if prefix != "" { key = prefix + "-" } return []BruteForceConfig{ { Key: key + userIdentifier, MaxRequests: []MaxRequests{ {Limit: 5, PerTimeIntervalMS: 60 * 1000}, {Limit: 15, PerTimeIntervalMS: 60 * 60 * 1000}, }, }, { Key: key + ip, MaxRequests: []MaxRequests{ {Limit: 5, PerTimeIntervalMS: 60 * 1000}, {Limit: 15, PerTimeIntervalMS: 60 * 60 * 1000}, }, }, } } func handleSecurityChecks(input SecurityCheckInput) (*supertokens.GeneralErrorResponse, error) { requestBody := make(map[string]interface{}) if input.RequestID != "" { requestBody["requestId"] = input.RequestID } var passwordHash string if input.Password != "" { hash := sha1.New() hash.Write([]byte(input.Password)) passwordHash = hex.EncodeToString(hash.Sum(nil)) requestBody["passwordHashPrefix"] = passwordHash[:5] } requestBody["bruteForce"] = input.BruteForceConfig requestBody["email"] = input.Email requestBody["phoneNumber"] = input.PhoneNumber requestBody["actionType"] = input.ActionType jsonBody, err := json.Marshal(requestBody) if err != nil { return nil, err } req, err := http.NewRequest("POST", ANOMALY_DETECTION_API_URL, bytes.NewBuffer(jsonBody)) if err != nil { // silently fail in order to not break the auth flow return nil, nil } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+SECRET_API_KEY) client := &http.Client{} resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() var responseData map[string]interface{} err = json.NewDecoder(resp.Body).Decode(&responseData) if err != nil { return nil, err } if bruteForce, ok := responseData["bruteForce"].(map[string]interface{}); ok { if detected, ok := bruteForce["detected"].(bool); ok && detected { return &supertokens.GeneralErrorResponse{ Message: "Too many requests. Please try again later.", }, nil } } if requestIdInfo, ok := responseData["requestIdInfo"].(map[string]interface{}); ok { if isUsingTor, ok := requestIdInfo["isUsingTor"].(bool); ok && isUsingTor { return &supertokens.GeneralErrorResponse{ Message: "Tor activity detected. Please use a regular browser.", }, nil } if vpn, ok := requestIdInfo["vpn"].(map[string]interface{}); ok { if result, ok := vpn["result"].(bool); ok && result { return &supertokens.GeneralErrorResponse{ Message: "VPN activity detected. Please use a regular network.", }, nil } } if botDetected, ok := requestIdInfo["botDetected"].(bool); ok && botDetected { return &supertokens.GeneralErrorResponse{ Message: "Bot activity detected.", }, nil } } if passwordBreaches, ok := responseData["passwordBreaches"].(map[string]interface{}); ok { passwordHashSuffix := passwordHash[5:] if _, ok := passwordBreaches[passwordHashSuffix]; ok { return &supertokens.GeneralErrorResponse{ Message: "This password has been detected in a breach. Please set a different password.", }, nil } } return nil, nil } func main() { supertokens.Init(supertokens.TypeInput{ RecipeList: []supertokens.Recipe{ emailpassword.Init(&epmodels.TypeInput{ Override: &epmodels.OverrideStruct{ APIs: func(originalImplementation epmodels.APIInterface) epmodels.APIInterface { // rewrite the original implementation of SignUpPOST originalSignUpPOST := *originalImplementation.SignUpPOST (*originalImplementation.SignUpPOST) = func(formFields []epmodels.TypeFormField, tenantId string, options epmodels.APIOptions, userContext supertokens.UserContext) (epmodels.SignUpPOSTResponse, error) { // Generate request ID for bot and suspicious IP detection var reqBody ReqBody err := json.NewDecoder(options.Req.Body).Decode(&reqBody) if err != nil { return epmodels.SignUpPOSTResponse{}, err } if reqBody.RequestID == nil { return epmodels.SignUpPOSTResponse{ GeneralError: &supertokens.GeneralErrorResponse{ Message: "The request ID is required", }, }, nil } requestId := *reqBody.RequestID actionType := "emailpassword-sign-up" ip := getIpFromRequest(options.Req) email := "" password := "" for _, field := range formFields { if field.ID == "email" || field.ID == "password" { valueAsString, asStrOk := field.Value.(string) if !asStrOk { return epmodels.SignUpPOSTResponse{}, errors.New("Should never come here as we check the type during validation") } if field.ID == "email" { email = valueAsString } else { password = valueAsString } } } bruteForceConfig := getBruteForceConfig(email, ip, actionType) // Check anomaly detection service before proceeding checkErr, err := handleSecurityChecks( SecurityCheckInput{ ActionType: actionType, Email: email, RequestID: requestId, BruteForceConfig: bruteForceConfig, Password: password, }, ) if err != nil { return epmodels.SignUpPOSTResponse{}, err } if checkErr != nil { return epmodels.SignUpPOSTResponse{ GeneralError: checkErr, }, nil } // pre API logic... resp, err := originalSignUpPOST(formFields, tenantId, options, userContext) if err != nil { return epmodels.SignUpPOSTResponse{}, err } return resp, nil } // rewrite the original implementation of SignInPOST originalSignInPOST := *originalImplementation.SignInPOST (*originalImplementation.SignInPOST) = func(formFields []epmodels.TypeFormField, tenantId string, options epmodels.APIOptions, userContext supertokens.UserContext) (epmodels.SignInPOSTResponse, error) { // Generate request ID for bot and suspicious IP detection var reqBody ReqBody err := json.NewDecoder(options.Req.Body).Decode(&reqBody) if err != nil { return epmodels.SignInPOSTResponse{}, err } if reqBody.RequestID == nil { return epmodels.SignInPOSTResponse{ GeneralError: &supertokens.GeneralErrorResponse{ Message: "The request ID is required", }, }, nil } requestId := *reqBody.RequestID actionType := "emailpassword-sign-in" ip := getIpFromRequest(options.Req) email := "" password := "" for _, field := range formFields { if field.ID == "email" || field.ID == "password" { valueAsString, asStrOk := field.Value.(string) if !asStrOk { return epmodels.SignInPOSTResponse{}, errors.New("Should never come here as we check the type during validation") } if field.ID == "email" { email = valueAsString } else { password = valueAsString } } } bruteForceConfig := getBruteForceConfig(email, ip, actionType) // Check anomaly detection service before proceeding checkErr, err := handleSecurityChecks( SecurityCheckInput{ ActionType: actionType, Email: email, RequestID: requestId, BruteForceConfig: bruteForceConfig, Password: password, }, ) if err != nil { return epmodels.SignInPOSTResponse{}, err } if checkErr != nil { return epmodels.SignInPOSTResponse{ GeneralError: checkErr, }, nil } // pre API logic... resp, err := originalSignInPOST(formFields, tenantId, options, userContext) if err != nil { return epmodels.SignInPOSTResponse{}, err } return resp, nil } // rewrite the original implementation of GeneratePasswordResetTokenPOST originalGeneratePasswordResetTokenPOST := *originalImplementation.GeneratePasswordResetTokenPOST (*originalImplementation.GeneratePasswordResetTokenPOST) = func(formFields []epmodels.TypeFormField, tenantId string, options epmodels.APIOptions, userContext supertokens.UserContext) (epmodels.GeneratePasswordResetTokenPOSTResponse, error) { // Generate request ID for bot and suspicious IP detection var reqBody ReqBody err := json.NewDecoder(options.Req.Body).Decode(&reqBody) if err != nil { return epmodels.GeneratePasswordResetTokenPOSTResponse{}, err } if reqBody.RequestID == nil { return epmodels.GeneratePasswordResetTokenPOSTResponse{ GeneralError: &supertokens.GeneralErrorResponse{ Message: "The request ID is required", }, }, nil } requestId := *reqBody.RequestID actionType := "send-password-reset-email" ip := getIpFromRequest(options.Req) email := "" for _, field := range formFields { if field.ID == "email" { valueAsString, asStrOk := field.Value.(string) if !asStrOk { return epmodels.GeneratePasswordResetTokenPOSTResponse{}, errors.New("Should never come here as we check the type during validation") } email = valueAsString } } bruteForceConfig := getBruteForceConfig(email, ip, actionType) // Check anomaly detection service before proceeding checkErr, err := handleSecurityChecks( SecurityCheckInput{ ActionType: actionType, Email: email, RequestID: requestId, BruteForceConfig: bruteForceConfig, }, ) if err != nil { return epmodels.GeneratePasswordResetTokenPOSTResponse{}, err } if checkErr != nil { return epmodels.GeneratePasswordResetTokenPOSTResponse{ GeneralError: checkErr, }, nil } // pre API logic... resp, err := originalGeneratePasswordResetTokenPOST(formFields, tenantId, options, userContext) if err != nil { return epmodels.GeneratePasswordResetTokenPOSTResponse{}, err } return resp, nil } // rewrite the original implementation of PasswordResetPOST originalPasswordResetPOST := *originalImplementation.PasswordResetPOST (*originalImplementation.PasswordResetPOST) = func(formFields []epmodels.TypeFormField, token string, tenantId string, options epmodels.APIOptions, userContext supertokens.UserContext) (epmodels.ResetPasswordPOSTResponse, error) { password := "" for _, field := range formFields { if field.ID == "password" { valueAsString, asStrOk := field.Value.(string) if !asStrOk { return epmodels.ResetPasswordPOSTResponse{}, errors.New("Should never come here as we check the type during validation") } password = valueAsString } } // Check anomaly detection service before proceeding checkErr, err := handleSecurityChecks( SecurityCheckInput{ Password: password, }, ) if err != nil { return epmodels.ResetPasswordPOSTResponse{}, err } if checkErr != nil { return epmodels.ResetPasswordPOSTResponse{ GeneralError: checkErr, }, nil } // First we call the original implementation resp, err := originalPasswordResetPOST(formFields, token, tenantId, options, userContext) if err != nil { return epmodels.ResetPasswordPOSTResponse{}, err } return resp, nil } return originalImplementation }, Functions: func(originalImplementation epmodels.RecipeInterface) epmodels.RecipeInterface { return originalImplementation }, }, }), }, }) } ``` The above code overrides the SuperTokens APIs and adding custom logic for anomaly detection. The steps when overriding the APIs are as follows: - We get the request ID from the request body. This is a unique ID for the request. - Define the action type based on the API you call. - We get the email and password from the form fields. - We get the IP address from the request. - We create the brute force configuration from the email, IP address, and action type. This configuration allows a number of requests over a time interval per: 1. Action and email/phone number. 2. Action and IP address. - We call the anomaly detection service to check if the request is permissible. - If the request is not allowed, it returns a descriptive error response. - If the request is permissible, it calls the original implementation of the API. - We return the response from the original implementation of the API. ```python from httpx import AsyncClient from hashlib import sha1 from typing import Dict, Any, Union, List from supertokens_python import init, InputAppInfo from supertokens_python.recipe import emailpassword from supertokens_python.recipe.emailpassword.interfaces import APIInterface, APIOptions from supertokens_python.recipe.emailpassword.types import FormField from supertokens_python.framework import BaseRequest from supertokens_python.types import GeneralErrorResponse from supertokens_python.recipe.session import SessionContainer SECRET_API_KEY = "" # Your secret API key that you received from the SuperTokens team # The full URL with the correct region will be provided by the SuperTokens team ANOMALY_DETECTION_API_URL = "https://security-.aws.supertokens.io/v1/security" async def handle_security_checks( request_id: Union[str, None], password: Union[str, None], brute_force_config: Union[List[Dict[str, Any]], None], email: Union[str, None], phone_number: Union[str, None], action_type: Union[str, None], ) -> Union[GeneralErrorResponse, None]: request_body: Dict[str, Any] = {} if request_id is not None: request_body["requestId"] = request_id password_hash = None if password is not None: password_hash = sha1(password.encode()).hexdigest() request_body["passwordHashPrefix"] = password_hash[:5] request_body["bruteForce"] = brute_force_config request_body["email"] = email request_body["phoneNumber"] = phone_number request_body["actionType"] = action_type try: async with AsyncClient(timeout=10.0) as client: response = await client.post( ANOMALY_DETECTION_API_URL, json=request_body, headers={ "Authorization": f"Bearer {SECRET_API_KEY}", "Content-Type": "application/json", }, ) # type: ignore response_data = response.json() except: # silently fail in order to not break the auth flow return None if response_data.get("bruteForce", {}).get("detected"): return GeneralErrorResponse( message="Too many requests. Please try again later." ) if response_data.get("requestIdInfo", {}).get("isUsingTor"): return GeneralErrorResponse( message="Tor activity detected. Please use a regular browser." ) if response_data.get("requestIdInfo", {}).get("vpn", {}).get("result"): return GeneralErrorResponse( message="VPN activity detected. Please use a regular network." ) if response_data.get("requestIdInfo", {}).get("botDetected"): return GeneralErrorResponse(message="Bot activity detected.") if response_data.get("passwordBreaches") and password_hash is not None: password_hash_suffix = password_hash[5:] if password_hash_suffix in response_data["passwordBreaches"]: return GeneralErrorResponse( message="This password has been detected in a breach. Please set a different password." ) return None def get_ip_from_request(req: BaseRequest) -> str: forwarded_for = req.get_header("x-forwarded-for") if forwarded_for: return forwarded_for return "127.0.0.1" def get_brute_force_config( user_identifier: Union[str, None], ip: str, prefix: Union[str, None] = None ) -> List[Dict[str, Any]]: return [ { "key": f"{prefix}-{user_identifier}" if prefix else user_identifier, "maxRequests": [ {"limit": 5, "perTimeIntervalMS": 60 * 1000}, {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000}, ], }, { "key": f"{prefix}-{ip}" if prefix else ip, "maxRequests": [ {"limit": 5, "perTimeIntervalMS": 60 * 1000}, {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000}, ], }, ] # highlight-start def override_email_password_apis(original_implementation: APIInterface): original_sign_up_post = original_implementation.sign_up_post async def sign_up_post( form_fields: List[FormField], tenant_id: str, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], api_options: APIOptions, user_context: Dict[str, Any], ): request_body = await api_options.request.json() if not request_body: return GeneralErrorResponse(message="The request body is required") request_id = request_body.get("requestId") if not request_id: return GeneralErrorResponse(message="The request ID is required") action_type = "emailpassword-sign-in" ip = get_ip_from_request(api_options.request) email = None password = None for field in form_fields: if field.id == "email": email = field.value if field.id == "password": password = field.value brute_force_config = get_brute_force_config(email, ip, action_type) # we check the anomaly detection service before calling the original implementation of signUp security_check_response = await handle_security_checks( request_id=request_id, password=password, brute_force_config=brute_force_config, email=email, phone_number=None, action_type=action_type, ) if security_check_response is not None: return security_check_response # We need to call the original implementation of sign_up_post. response = await original_sign_up_post( form_fields, tenant_id, session, should_try_linking_with_session_user, api_options, user_context, ) return response original_implementation.sign_up_post = sign_up_post original_sign_in_post = original_implementation.sign_in_post async def sign_in_post( form_fields: List[FormField], tenant_id: str, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], api_options: APIOptions, user_context: Dict[str, Any], ): request_body = await api_options.request.json() if not request_body: return GeneralErrorResponse(message="The request body is required") request_id = request_body.get("requestId") if not request_id: return GeneralErrorResponse(message="The request ID is required") action_type = "emailpassword-sign-in" ip = get_ip_from_request(api_options.request) email = None for field in form_fields: if field.id == "email": email = field.value brute_force_config = get_brute_force_config(email, ip, action_type) # we check the anomaly detection service before calling the original implementation of sign_in_post security_check_response = await handle_security_checks( request_id=request_id, password=None, brute_force_config=brute_force_config, email=email, phone_number=None, action_type=action_type, ) if security_check_response is not None: return security_check_response # We need to call the original implementation of sign_in_post. response = await original_sign_in_post( form_fields, tenant_id, session, should_try_linking_with_session_user, api_options, user_context, ) return response original_implementation.sign_in_post = sign_in_post original_generate_password_reset_token_post = ( original_implementation.generate_password_reset_token_post ) async def generate_password_reset_token_post( form_fields: List[FormField], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ): request_body = await api_options.request.json() if not request_body: return GeneralErrorResponse(message="The request body is required") request_id = request_body.get("requestId") if not request_id: return GeneralErrorResponse(message="The request ID is required") action_type = "send-password-reset-email" ip = get_ip_from_request(api_options.request) email = None for field in form_fields: if field.id == "email": email = field.value brute_force_config = get_brute_force_config(email, ip, action_type) # we check the anomaly detection service before calling the original implementation of generate_password_reset_token_post security_check_response = await handle_security_checks( request_id=request_id, password=None, brute_force_config=brute_force_config, email=email, phone_number=None, action_type=action_type, ) if security_check_response is not None: return security_check_response # We need to call the original implementation of generate_password_reset_token_post. response = await original_generate_password_reset_token_post( form_fields, tenant_id, api_options, user_context ) return response original_implementation.generate_password_reset_token_post = ( generate_password_reset_token_post ) original_password_reset_post = original_implementation.password_reset_post async def password_reset_post( form_fields: List[FormField], token: str, tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ): password = None for field in form_fields: if field.id == "password": password = field.value # we check the anomaly detection service before calling the original implementation of password_reset_post security_check_response = await handle_security_checks( request_id=None, password=password, brute_force_config=None, email=None, phone_number=None, action_type=None, ) if security_check_response is not None: return security_check_response response = await original_password_reset_post( form_fields, token, tenant_id, api_options, user_context ) return response original_implementation.password_reset_post = password_reset_post return original_implementation # highlight-end init( app_info=InputAppInfo(api_domain="...", app_name="...", website_domain="..."), framework="...", # type: ignore recipe_list=[ emailpassword.init( # highlight-start override=emailpassword.InputOverrideConfig( apis=override_email_password_apis ) # highlight-end ) ], ) ``` The above code overrides the SuperTokens APIs and adding custom logic for anomaly detection. The steps when overriding the APIs are as follows: - We get the request ID from the request body. This is a unique ID for the request. - Define the action type based on the API you call. - We get the email and password from the form fields. - We get the IP address from the request. - We create the brute force configuration from the email, IP address, and action type. This configuration allows a number of requests over a time interval per: 1. Action and email/phone number. 2. Action and IP address. - We call the anomaly detection service to check if the request is permissible. - If the request is not allowed, it returns a descriptive error response. - If the request is permissible, it calls the original implementation of the API. - We return the response from the original implementation of the API. ### Passwordless ```tsx const SECRET_API_KEY = ""; // Your secret API key that you received from the SuperTokens team const ANOMALY_DETECTION_API_URL = "https://security-us-east-1.aws.supertokens.io/v1/security"; async function handleSecurityChecks(input: { actionType?: string; email?: string; phoneNumber?: string; bruteForceConfig?: { key: string; maxRequests: { limit: number; perTimeIntervalMS: number; }[]; }[]; }): Promise< | { status: "GENERAL_ERROR"; message: string; } | undefined > { let requestBody: { email?: string; phoneNumber?: string; actionType?: string; bruteForce?: { key: string; maxRequests: { limit: number; perTimeIntervalMS: number; }[]; }[]; } = {}; requestBody.bruteForce = input.bruteForceConfig; requestBody.email = input.email; requestBody.phoneNumber = input.phoneNumber; requestBody.actionType = input.actionType; let response; try { response = await axios.post(ANOMALY_DETECTION_API_URL, requestBody, { headers: { Authorization: `Bearer ${SECRET_API_KEY}`, "Content-Type": "application/json", }, }); } catch (err) { // silently fail in order to not break the auth flow console.error(err); return; } let responseData = response.data; if (responseData.bruteForce.detected) { return { status: "GENERAL_ERROR", message: "Too many requests. Please try again later.", }; } return undefined; } function getIpFromRequest(req: Request): string { let headers: { [key: string]: string } = {}; for (let key of Object.keys(req.headers)) { headers[key] = (req as any).headers[key]!; } return (req as any).headers["x-forwarded-for"] || "127.0.0.1"; } const getBruteForceConfig = ( userIdentifier: string, ip: string, prefix?: string, ) => [ { key: `${prefix ? `${prefix}-` : ""}${userIdentifier}`, maxRequests: [ { limit: 5, perTimeIntervalMS: 60 * 1000 }, { limit: 15, perTimeIntervalMS: 60 * 60 * 1000 }, ], }, { key: `${prefix ? `${prefix}-` : ""}${ip}`, maxRequests: [ { limit: 5, perTimeIntervalMS: 60 * 1000 }, { limit: 15, perTimeIntervalMS: 60 * 60 * 1000 }, ], }, ]; SuperTokens.init({ // @ts-ignore framework: "...", // @ts-ignore appInfo: { /*...*/ }, recipeList: [ Passwordless.init({ // ... other customisations ... // highlight-start contactMethod: "EMAIL_OR_PHONE", flowType: "USER_INPUT_CODE_AND_MAGIC_LINK", override: { apis: (originalImplementation) => { return { ...originalImplementation, createCodePOST: async function (input) { const actionType = "passwordless-send-sms"; const ip = getIpFromRequest(input.options.req.original); const emailOrPhoneNumber = "email" in input ? input.email : input.phoneNumber; const bruteForceConfig = getBruteForceConfig( emailOrPhoneNumber, ip, actionType, ); // we check the anomaly detection service before calling the original implementation of createCodePOST let securityCheckResponse = await handleSecurityChecks({ bruteForceConfig, actionType, }); if (securityCheckResponse !== undefined) { return securityCheckResponse; } return originalImplementation.createCodePOST!(input); }, resendCodePOST: async function (input) { const actionType = "passwordless-send-sms"; const ip = getIpFromRequest(input.options.req.original); let codesInfo = await Passwordless.listCodesByPreAuthSessionId({ tenantId: input.tenantId, preAuthSessionId: input.preAuthSessionId, }); const phoneNumber = codesInfo && "phoneNumber" in codesInfo ? codesInfo.phoneNumber : undefined; const email = codesInfo && "email" in codesInfo ? codesInfo.email : undefined; const userIdentifier = email || phoneNumber || input.deviceId; const bruteForceConfig = getBruteForceConfig( userIdentifier, ip, actionType, ); // we check the anomaly detection service before calling the original implementation of resendCodePOST let securityCheckResponse = await handleSecurityChecks({ phoneNumber, email, bruteForceConfig, actionType, }); if (securityCheckResponse !== undefined) { return securityCheckResponse; } return originalImplementation.resendCodePOST!(input); }, }; }, }, // highlight-end }), ], }); ``` The above code overrides the SuperTokens APIs and adding custom logic for anomaly detection. The steps when overriding the APIs are as follows: - Define the action type based on the API you call. - We get the email or the phone number from the form fields. - We get the IP address from the request. - We create the brute force configuration from the email, IP address, and action type. This configuration allows a number of requests over a time interval per: 1. Action and email/phone number. 2. Action and IP address. - The anomaly detection service checks if the request passes the allowed criteria (only brute force detection occurs here). - If the request is not allowed, the system returns a descriptive error response. - If the request passes the allowed criteria, the original implementation of the API executes. - We return the response from the original implementation of the API. ```go // The full URL with the correct region will be provided by the SuperTokens team const ANOMALY_DETECTION_API_URL = "https://security-.aws.supertokens.io/v1/security" type SecurityCheckInput struct { ActionType string `json:"actionType,omitempty"` Email string `json:"email,omitempty"` PhoneNumber string `json:"phoneNumber,omitempty"` BruteForceConfig []BruteForceConfig `json:"bruteForceConfig,omitempty"` } type BruteForceConfig struct { Key string `json:"key"` MaxRequests []MaxRequests `json:"maxRequests"` } type MaxRequests struct { Limit int `json:"limit"` PerTimeIntervalMS int `json:"perTimeIntervalMS"` } func getIpFromRequest(req *http.Request) string { if forwardedFor := req.Header.Get("X-Forwarded-For"); forwardedFor != "" { return forwardedFor } return "127.0.0.1" } func getBruteForceConfig(userIdentifier string, ip string, prefix string) []BruteForceConfig { var key string if prefix != "" { key = prefix + "-" } return []BruteForceConfig{ { Key: key + userIdentifier, MaxRequests: []MaxRequests{ {Limit: 5, PerTimeIntervalMS: 60 * 1000}, {Limit: 15, PerTimeIntervalMS: 60 * 60 * 1000}, }, }, { Key: key + ip, MaxRequests: []MaxRequests{ {Limit: 5, PerTimeIntervalMS: 60 * 1000}, {Limit: 15, PerTimeIntervalMS: 60 * 60 * 1000}, }, }, } } func handleSecurityChecks(input SecurityCheckInput) (*supertokens.GeneralErrorResponse, error) { requestBody := make(map[string]interface{}) requestBody["bruteForce"] = input.BruteForceConfig requestBody["email"] = input.Email requestBody["phoneNumber"] = input.PhoneNumber requestBody["actionType"] = input.ActionType jsonBody, err := json.Marshal(requestBody) if err != nil { return nil, err } req, err := http.NewRequest("POST", ANOMALY_DETECTION_API_URL, bytes.NewBuffer(jsonBody)) if err != nil { // silently fail in order to not break the auth flow return nil, nil } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+SECRET_API_KEY) client := &http.Client{} resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() var responseData map[string]interface{} err = json.NewDecoder(resp.Body).Decode(&responseData) if err != nil { return nil, err } if bruteForce, ok := responseData["bruteForce"].(map[string]interface{}); ok { if detected, ok := bruteForce["detected"].(bool); ok && detected { return &supertokens.GeneralErrorResponse{ Message: "Too many requests. Please try again later.", }, nil } } return nil, nil } func main() { supertokens.Init(supertokens.TypeInput{ RecipeList: []supertokens.Recipe{ passwordless.Init(plessmodels.TypeInput{ FlowType: "USER_INPUT_CODE", ContactMethodPhone: plessmodels.ContactMethodPhoneConfig{ Enabled: true, }, Override: &plessmodels.OverrideStruct{ APIs: func(originalImplementation plessmodels.APIInterface) plessmodels.APIInterface { originalCreateCodePOST := *originalImplementation.CreateCodePOST (*originalImplementation.CreateCodePOST) = func(email *string, phoneNumber *string, tenantId string, options plessmodels.APIOptions, userContext supertokens.UserContext) (plessmodels.CreateCodePOSTResponse, error) { actionType := "passwordless-send-sms" ip := getIpFromRequest(options.Req) var key string if email != nil { key = *email } else { key = *phoneNumber } bruteForceConfig := getBruteForceConfig(key, ip, actionType) // Check anomaly detection service before proceeding checkErr, err := handleSecurityChecks( SecurityCheckInput{ ActionType: actionType, Email: *email, PhoneNumber: *phoneNumber, BruteForceConfig: bruteForceConfig, }, ) if err != nil { return plessmodels.CreateCodePOSTResponse{}, err } if checkErr != nil { return plessmodels.CreateCodePOSTResponse{ GeneralError: checkErr, }, nil } return originalCreateCodePOST(email, phoneNumber, tenantId, options, userContext) } originalResendCodePOST := *originalImplementation.ResendCodePOST (*originalImplementation.ResendCodePOST) = func(deviceID string, preAuthSessionID string, tenantId string, options plessmodels.APIOptions, userContext supertokens.UserContext) (plessmodels.ResendCodePOSTResponse, error) { // retreive user details codesInfo, err := passwordless.ListCodesByDeviceID(tenantId, deviceID, userContext) if err != nil { return plessmodels.ResendCodePOSTResponse{}, err } var email *string var phoneNumber *string if codesInfo.Email != nil { email = codesInfo.Email } if codesInfo.PhoneNumber != nil { phoneNumber = codesInfo.PhoneNumber } actionType := "passwordless-send-sms" ip := getIpFromRequest(options.Req) key := "" if email != nil { key = *email } else { key = *phoneNumber } bruteForceConfig := getBruteForceConfig(key, ip, actionType) // Check anomaly detection service before proceeding checkErr, err := handleSecurityChecks( SecurityCheckInput{ ActionType: actionType, Email: *email, PhoneNumber: *phoneNumber, BruteForceConfig: bruteForceConfig, }, ) if err != nil { return plessmodels.ResendCodePOSTResponse{}, err } if checkErr != nil { return plessmodels.ResendCodePOSTResponse{ GeneralError: checkErr, }, nil } return originalResendCodePOST(deviceID, preAuthSessionID, tenantId, options, userContext) } return originalImplementation }, }, }), }, }) } ``` The above code overrides the SuperTokens APIs and adding custom logic for anomaly detection. The steps when overriding the APIs are as follows: - Define the action type based on the API you call. - We get the email or the phone number from the form fields. - We get the IP address from the request. - We create the brute force configuration from the email, IP address, and action type. This configuration allows a number of requests over a time interval per: 1. Action and email/phone number. 2. Action and IP address. - The anomaly detection service checks if the request passes the allowed criteria (only brute force detection occurs here). - If the request is not allowed, the system returns a descriptive error response. - If the request passes the allowed criteria, the original implementation of the API executes. - We return the response from the original implementation of the API. ```python from httpx import AsyncClient from typing import Dict, Any, Union, List from supertokens_python import init, InputAppInfo from supertokens_python.recipe import passwordless from supertokens_python.recipe.passwordless.interfaces import APIInterface, APIOptions from supertokens_python.recipe.passwordless.asyncio import list_codes_by_device_id from supertokens_python.framework import BaseRequest from supertokens_python.types import GeneralErrorResponse from supertokens_python.recipe.session import SessionContainer SECRET_API_KEY = "" # Your secret API key that you received from the SuperTokens team # The full URL with the correct region will be provided by the SuperTokens team ANOMALY_DETECTION_API_URL = "https://security-.aws.supertokens.io/v1/security" async def handle_security_checks( request_id: Union[str, None], password: Union[str, None], brute_force_config: Union[List[Dict[str, Any]], None], email: Union[str, None], phone_number: Union[str, None], action_type: Union[str, None], ) -> Union[GeneralErrorResponse, None]: request_body: Dict[str, Any] = {} request_body["bruteForce"] = brute_force_config request_body["email"] = email request_body["phoneNumber"] = phone_number request_body["actionType"] = action_type try: async with AsyncClient(timeout=10.0) as client: response = await client.post( ANOMALY_DETECTION_API_URL, json=request_body, headers={ "Authorization": f"Bearer {SECRET_API_KEY}", "Content-Type": "application/json", }, ) # type: ignore response_data = response.json() except: # silently fail in order to not break the auth flow return None if response_data.get("bruteForce", {}).get("detected"): return GeneralErrorResponse( message="Too many requests. Please try again later." ) return None def get_ip_from_request(req: BaseRequest) -> str: forwarded_for = req.get_header("x-forwarded-for") if forwarded_for: return forwarded_for return "127.0.0.1" def get_brute_force_config( user_identifier: Union[str, None], ip: str, prefix: Union[str, None] = None ) -> List[Dict[str, Any]]: return [ { "key": f"{prefix}-{user_identifier}" if prefix else user_identifier, "maxRequests": [ {"limit": 5, "perTimeIntervalMS": 60 * 1000}, {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000}, ], }, { "key": f"{prefix}-{ip}" if prefix else ip, "maxRequests": [ {"limit": 5, "perTimeIntervalMS": 60 * 1000}, {"limit": 15, "perTimeIntervalMS": 60 * 60 * 1000}, ], }, ] # highlight-start def override_passwordless_apis(original_implementation: APIInterface): original_create_code_post = original_implementation.create_code_post async def create_code_post( email: Union[str, None], phone_number: Union[str, None], session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ): action_type = "passwordless-send-sms" ip = get_ip_from_request(api_options.request) identifier = None if email is not None: identifier = email elif phone_number is not None: identifier = phone_number brute_force_config = get_brute_force_config(identifier, ip, action_type) # we check the anomaly detection service before calling the original implementation of create_code_post security_check_response = await handle_security_checks( request_id=None, password=None, brute_force_config=brute_force_config, email=email, phone_number=phone_number, action_type=action_type, ) if security_check_response is not None: return security_check_response # We need to call the original implementation of create_code_post. response = await original_create_code_post( email, phone_number, session, should_try_linking_with_session_user, tenant_id, api_options, user_context, ) return response original_implementation.create_code_post = create_code_post original_resend_code_post = original_implementation.resend_code_post async def resend_code_post( device_id: str, pre_auth_session_id: str, session: Union[SessionContainer, None], should_try_linking_with_session_user: Union[bool, None], tenant_id: str, api_options: APIOptions, user_context: Dict[str, Any], ): action_type = "passwordless-send-sms" ip = get_ip_from_request(api_options.request) email = None phone_number = None codes = await list_codes_by_device_id( tenant_id=tenant_id, device_id=device_id, user_context=user_context ) if codes is not None: email = codes.email phone_number = codes.phone_number identifier = None if email is not None: identifier = email elif phone_number is not None: identifier = phone_number brute_force_config = get_brute_force_config(identifier, ip, action_type) # we check the anomaly detection service before calling the original implementation of resend_code_post security_check_response = await handle_security_checks( request_id=None, password=None, brute_force_config=brute_force_config, email=email, phone_number=phone_number, action_type=action_type, ) if security_check_response is not None: return security_check_response # We need to call the original implementation of resend_code_post. response = await original_resend_code_post( device_id, pre_auth_session_id, session, should_try_linking_with_session_user, tenant_id, api_options, user_context, ) return response original_implementation.resend_code_post = resend_code_post return original_implementation # highlight-end init( app_info=InputAppInfo(api_domain="...", app_name="...", website_domain="..."), framework="...", # type: ignore recipe_list=[ passwordless.init( # highlight-start flow_type="USER_INPUT_CODE_AND_MAGIC_LINK", contact_config=passwordless.ContactEmailOrPhoneConfig(), override=passwordless.InputOverrideConfig(apis=override_passwordless_apis), # highlight-end ) ], ) ``` The above code overrides the SuperTokens APIs and adding custom logic for anomaly detection. The steps when overriding the APIs are as follows: - Define the action type based on the API you call. - We get the email or the phone number from the form fields. - We get the IP address from the request. - We create the brute force configuration from the email, IP address, and action type. This configuration allows a number of requests over a time interval per: 1. Action and email/phone number. 2. Action and IP address. - The anomaly detection service checks if the request passes the allowed criteria (only brute force detection occurs here). - If the request is not allowed, the system returns a descriptive error response. - If the request passes the allowed criteria, the original implementation of the API executes. - We return the response from the original implementation of the API.