Skip to content

Commit b35eebb

Browse files
authored
Rate limit API requests and changed SQS reading speed (#969)
* Trying to use the @upstash/ratelimit package with ioredis… * WIP using the redis package instead * Revert the action back * Removed redis * Started refactoring * SQS setting for the poll interval. Set the default queue reading to be slower * API rate limiter as Express middleware * Organise imports * Fixed spelling mistake “limitter” * No authorization header response is problem+json
1 parent 9ecf077 commit b35eebb

File tree

7 files changed

+237
-15
lines changed

7 files changed

+237
-15
lines changed

apps/webapp/app/entry.server.tsx

+1
Original file line numberDiff line numberDiff line change
@@ -198,3 +198,4 @@ const sqsEventConsumer = singleton("sqsEventConsumer", getSharedSqsEventConsumer
198198
export { wss } from "./v3/handleWebsockets.server";
199199
export { socketIo } from "./v3/handleSocketIo.server";
200200
export { registryProxy } from "./v3/registryProxy.server";
201+
export { apiRateLimiter } from "./services/apiRateLimit.server";

apps/webapp/app/env.server.ts

+14-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ const EnvironmentSchema = z.object({
5050
AWS_SQS_SECRET_ACCESS_KEY: z.string().optional(),
5151
/** Optional. Only used if you use the apps/proxy */
5252
AWS_SQS_QUEUE_URL: z.string().optional(),
53-
AWS_SQS_BATCH_SIZE: z.coerce.number().int().optional().default(10),
53+
AWS_SQS_BATCH_SIZE: z.coerce.number().int().optional().default(1),
54+
AWS_SQS_WAIT_TIME_MS: z.coerce.number().int().optional().default(100),
5455
DISABLE_SSE: z.string().optional(),
5556

5657
// Redis options
@@ -68,6 +69,18 @@ const EnvironmentSchema = z.object({
6869
TUNNEL_HOST: z.string().optional(),
6970
TUNNEL_SECRET_KEY: z.string().optional(),
7071

72+
//API Rate limiting
73+
/**
74+
* @example "60s"
75+
* @example "1m"
76+
* @example "1h"
77+
* @example "1d"
78+
* @example "1000ms"
79+
* @example "1000s"
80+
*/
81+
API_RATE_LIMIT_WINDOW: z.string().default("60s"),
82+
API_RATE_LIMIT_MAX: z.coerce.number().int().default(600),
83+
7184
//v3
7285
V3_ENABLED: z.string().default("false"),
7386
OTLP_EXPORTER_TRACES_URL: z.string().optional(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import { Ratelimit } from "@upstash/ratelimit";
2+
import { Request as ExpressRequest, Response as ExpressResponse, NextFunction } from "express";
3+
import Redis, { RedisOptions } from "ioredis";
4+
import { createHash } from "node:crypto";
5+
import { env } from "~/env.server";
6+
import { logger } from "./logger.server";
7+
8+
function createRedisRateLimitClient(
9+
redisOptions: RedisOptions
10+
): ConstructorParameters<typeof Ratelimit>[0]["redis"] {
11+
const redis = new Redis(redisOptions);
12+
13+
return {
14+
sadd: async <TData>(key: string, ...members: TData[]): Promise<number> => {
15+
return redis.sadd(key, members as (string | number | Buffer)[]);
16+
},
17+
eval: <TArgs extends unknown[], TData = unknown>(
18+
...args: [script: string, keys: string[], args: TArgs]
19+
): Promise<TData> => {
20+
const script = args[0];
21+
const keys = args[1];
22+
const argsArray = args[2];
23+
return redis.eval(
24+
script,
25+
keys.length,
26+
...keys,
27+
...(argsArray as (string | Buffer | number)[])
28+
) as Promise<TData>;
29+
},
30+
};
31+
}
32+
33+
type Options = {
34+
log?: {
35+
requests?: boolean;
36+
rejections?: boolean;
37+
};
38+
redis: RedisOptions;
39+
keyPrefix: string;
40+
pathMatchers: (RegExp | string)[];
41+
limiter: ConstructorParameters<typeof Ratelimit>[0]["limiter"];
42+
};
43+
44+
//returns an Express middleware that rate limits using the Bearer token in the Authorization header
45+
export function authorizationRateLimitMiddleware({
46+
redis,
47+
keyPrefix,
48+
limiter,
49+
pathMatchers,
50+
log = {
51+
rejections: true,
52+
requests: true,
53+
},
54+
}: Options) {
55+
const rateLimiter = new Ratelimit({
56+
redis: createRedisRateLimitClient(redis),
57+
limiter: limiter,
58+
ephemeralCache: new Map(),
59+
analytics: false,
60+
prefix: keyPrefix,
61+
});
62+
63+
return async (req: ExpressRequest, res: ExpressResponse, next: NextFunction) => {
64+
if (log.requests) {
65+
logger.info(`RateLimiter (${keyPrefix}): request to ${req.path}`);
66+
}
67+
68+
//first check if any of the pathMatchers match the request path
69+
const path = req.path;
70+
if (
71+
!pathMatchers.some((matcher) =>
72+
matcher instanceof RegExp ? matcher.test(path) : path === matcher
73+
)
74+
) {
75+
if (log.requests) {
76+
logger.info(`RateLimiter (${keyPrefix}): didn't match ${req.path}`);
77+
}
78+
return next();
79+
}
80+
81+
if (log.requests) {
82+
logger.info(`RateLimiter (${keyPrefix}): matched ${req.path}`);
83+
}
84+
85+
const authorizationValue = req.headers.authorization;
86+
if (!authorizationValue) {
87+
if (log.requests) {
88+
logger.info(`RateLimiter (${keyPrefix}): no key`);
89+
}
90+
res.setHeader("Content-Type", "application/problem+json");
91+
return res
92+
.status(401)
93+
.send(
94+
JSON.stringify(
95+
{
96+
title: "Unauthorized",
97+
status: 401,
98+
type: "https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401",
99+
detail: "No authorization header provided",
100+
},
101+
null,
102+
2
103+
)
104+
);
105+
}
106+
107+
const hash = createHash("sha256");
108+
hash.update(authorizationValue);
109+
const hashedAuthorizationValue = hash.digest("hex");
110+
111+
const { success, pending, limit, reset, remaining } = await rateLimiter.limit(
112+
hashedAuthorizationValue
113+
);
114+
115+
res.set("x-ratelimit-limit", limit.toString());
116+
res.set("x-ratelimit-remaining", remaining.toString());
117+
res.set("x-ratelimit-reset", reset.toString());
118+
119+
if (success) {
120+
if (log.requests) {
121+
logger.info(`RateLimiter (${keyPrefix}): under rate limit`, {
122+
limit,
123+
reset,
124+
remaining,
125+
hashedAuthorizationValue,
126+
});
127+
}
128+
return next();
129+
}
130+
131+
if (log.rejections) {
132+
logger.warn(`RateLimiter (${keyPrefix}): rate limit exceeded`, {
133+
limit,
134+
reset,
135+
remaining,
136+
pending,
137+
hashedAuthorizationValue,
138+
});
139+
}
140+
141+
res.setHeader("Content-Type", "application/problem+json");
142+
return res.status(429).send(
143+
JSON.stringify(
144+
{
145+
title: "Rate Limit Exceeded",
146+
status: 429,
147+
type: "https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429",
148+
detail: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry after ${reset} seconds.`,
149+
reset: reset,
150+
limit: limit,
151+
},
152+
null,
153+
2
154+
)
155+
);
156+
};
157+
}
158+
159+
type Duration = Parameters<typeof Ratelimit.slidingWindow>[1];
160+
161+
export const apiRateLimiter = authorizationRateLimitMiddleware({
162+
keyPrefix: "ratelimit:api",
163+
redis: {
164+
port: env.REDIS_PORT,
165+
host: env.REDIS_HOST,
166+
username: env.REDIS_USERNAME,
167+
password: env.REDIS_PASSWORD,
168+
enableAutoPipelining: true,
169+
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
170+
},
171+
limiter: Ratelimit.slidingWindow(env.API_RATE_LIMIT_MAX, env.API_RATE_LIMIT_WINDOW as Duration),
172+
pathMatchers: [/^\/api/],
173+
log: {
174+
rejections: true,
175+
requests: false,
176+
},
177+
});
178+
179+
export type RateLimitMiddleware = ReturnType<typeof authorizationRateLimitMiddleware>;

apps/webapp/app/services/events/sqsEventConsumer.ts

+7-6
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
1-
import { Consumer } from "sqs-consumer";
2-
import { PrismaClientOrTransaction, prisma } from "~/db.server";
3-
import { logger, trace } from "../logger.server";
41
import { Message, SQSClient } from "@aws-sdk/client-sqs";
5-
import { authenticateApiKey } from "../apiAuth.server";
62
import { SendEventBodySchema } from "@trigger.dev/core";
3+
import { Consumer } from "sqs-consumer";
74
import { z } from "zod";
85
import { fromZodError } from "zod-validation-error";
9-
import { IngestSendEvent } from "./ingestSendEvent.server";
6+
import { PrismaClientOrTransaction, prisma } from "~/db.server";
107
import { env } from "~/env.server";
11-
import { singleton } from "~/utils/singleton";
8+
import { authenticateApiKey } from "../apiAuth.server";
9+
import { logger, trace } from "../logger.server";
10+
import { IngestSendEvent } from "./ingestSendEvent.server";
1211

1312
type SqsEventConsumerOptions = {
1413
queueUrl: string;
@@ -17,6 +16,7 @@ type SqsEventConsumerOptions = {
1716
region: string;
1817
accessKeyId: string;
1918
secretAccessKey: string;
19+
pollingWaitTimeMs: number;
2020
};
2121

2222
const messageSchema = SendEventBodySchema.extend({
@@ -137,6 +137,7 @@ export function getSharedSqsEventConsumer() {
137137
const consumer = new SqsEventConsumer(undefined, {
138138
queueUrl: env.AWS_SQS_QUEUE_URL,
139139
batchSize: env.AWS_SQS_BATCH_SIZE,
140+
pollingWaitTimeMs: env.AWS_SQS_WAIT_TIME_MS,
140141
region: env.AWS_SQS_REGION,
141142
accessKeyId: env.AWS_SQS_ACCESS_KEY_ID,
142143
secretAccessKey: env.AWS_SQS_SECRET_ACCESS_KEY,

apps/webapp/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
"@trigger.dev/yalt": "workspace:*",
9797
"@types/pg": "8.6.6",
9898
"@uiw/react-codemirror": "^4.19.5",
99+
"@upstash/ratelimit": "^1.0.1",
99100
"@whatwg-node/fetch": "^0.9.14",
100101
"class-variance-authority": "^0.5.2",
101102
"clsx": "^1.2.1",

apps/webapp/server.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { broadcastDevReady, logDevReady } from "@remix-run/server-runtime";
88
import type { Server as IoServer } from "socket.io";
99
import type { Server as EngineServer } from "engine.io";
1010
import { RegistryProxy } from "~/v3/registryProxy.server";
11+
import { RateLimitMiddleware, apiRateLimiter } from "~/services/apiRateLimit.server";
1112

1213
const app = express();
1314

@@ -39,6 +40,7 @@ if (process.env.HTTP_SERVER_DISABLED !== "true") {
3940
const socketIo: { io: IoServer } | undefined = build.entry.module.socketIo;
4041
const wss: WebSocketServer | undefined = build.entry.module.wss;
4142
const registryProxy: RegistryProxy | undefined = build.entry.module.registryProxy;
43+
const apiRateLimiter: RateLimitMiddleware = build.entry.module.apiRateLimiter;
4244

4345
if (registryProxy && process.env.ENABLE_REGISTRY_PROXY === "true") {
4446
console.log(`🐳 Enabling container registry proxy to ${registryProxy.origin}`);
@@ -69,6 +71,8 @@ if (process.env.HTTP_SERVER_DISABLED !== "true") {
6971
});
7072

7173
if (process.env.DASHBOARD_AND_API_DISABLED !== "true") {
74+
app.use(apiRateLimiter);
75+
7276
app.all(
7377
"*",
7478
// @ts-ignore
@@ -84,8 +88,6 @@ if (process.env.HTTP_SERVER_DISABLED !== "true") {
8488
});
8589
}
8690

87-
88-
8991
const server = app.listen(port, () => {
9092
console.log(`✅ server ready: http://localhost:${port} [NODE_ENV: ${MODE}]`);
9193

pnpm-lock.yaml

+31-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)