fix(fair-queue): prevent unbounded cooloff states growth #2818
```diff
@@ -89,6 +89,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
   private cooloffEnabled: boolean;
   private cooloffThreshold: number;
   private cooloffPeriodMs: number;
+  private maxCooloffStatesSize: number;
   private queueCooloffStates = new Map<string, QueueCooloffState>();

   // Global rate limiter
```
```diff
@@ -142,6 +143,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     this.cooloffEnabled = options.cooloff?.enabled ?? true;
     this.cooloffThreshold = options.cooloff?.threshold ?? 10;
     this.cooloffPeriodMs = options.cooloff?.periodMs ?? 10_000;
+    this.maxCooloffStatesSize = options.cooloff?.maxStatesSize ?? 1000;

     // Global rate limiter
     this.globalRateLimiter = options.globalRateLimiter;
```
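For orientation (not part of the diff), a minimal configuration sketch showing where the new `maxStatesSize` option sits. Option names and defaults are taken from this hunk and the test below; `redisOptions`, `keys`, `scheduler`, and `TestPayloadSchema` are stand-ins, and other required constructor options are elided.

```ts
// Hypothetical usage sketch - defaults mirror this diff; maxStatesSize is
// lowered here only to make the new cap visible.
const queue = new FairQueue({
  redis: redisOptions,
  keys,
  scheduler,
  payloadSchema: TestPayloadSchema,
  cooloff: {
    enabled: true,      // default: true
    threshold: 10,      // default: 10 consecutive failures before entering cooloff
    periodMs: 10_000,   // default: 10s cooloff period
    maxStatesSize: 500, // new in this PR; default: 1000
  },
});
```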
|
|
```diff
@@ -878,8 +880,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
         }
         this.#resetCooloff(queueId);
       } else {
-        this.batchedSpanManager.incrementStat(loopId, "claim_failures");
-        this.#incrementCooloff(queueId);
+        // Don't increment cooloff here - the queue was either:
+        // 1. Empty (removed from master, cache cleaned up)
+        // 2. Concurrency blocked (message released back to queue)
+        // Neither case warrants cooloff as they're not failures
+        this.batchedSpanManager.incrementStat(loopId, "claim_skipped");
       }
     }
   }
```
```diff
@@ -904,6 +909,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     if (this.concurrencyManager) {
       const check = await this.concurrencyManager.canProcess(descriptor);
       if (!check.allowed) {
+        // Queue at max concurrency, back off to avoid repeated attempts
+        this.#incrementCooloff(queueId);
         return false;
       }
     }
```
```diff
@@ -953,6 +960,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
           queueItemsKey,
           masterQueueKey
         );
+        // Concurrency reservation failed, back off to avoid repeated attempts
+        this.#incrementCooloff(queueId);
         return false;
       }
     }
```
```diff
@@ -1214,8 +1223,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
         this.#resetCooloff(queueId);
         slotsUsed++;
       } else {
-        this.batchedSpanManager.incrementStat(loopId, "process_failures");
-        this.#incrementCooloff(queueId);
+        // Don't increment cooloff here - the queue was either:
+        // 1. Empty (removed from master, cache cleaned up)
+        // 2. Concurrency blocked (message released back to queue)
+        // Neither case warrants cooloff as they're not failures
+        this.batchedSpanManager.incrementStat(loopId, "process_skipped");
         break; // Queue empty or blocked, try next queue
       }
     }
```
```diff
@@ -1245,6 +1257,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     if (this.concurrencyManager) {
       const check = await this.concurrencyManager.canProcess(descriptor);
       if (!check.allowed) {
+        // Queue at max concurrency, back off to avoid repeated attempts
+        this.#incrementCooloff(queueId);
         return false;
       }
     }
```
```diff
@@ -1294,6 +1308,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
           queueItemsKey,
           masterQueueKey
         );
+        // Concurrency reservation failed, back off to avoid repeated attempts
+        this.#incrementCooloff(queueId);
         return false;
       }
     }
```
```diff
@@ -1717,6 +1733,15 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
   }

   #incrementCooloff(queueId: string): void {
+    // Safety check: if the cache is too large, just clear it
+    if (this.queueCooloffStates.size >= this.maxCooloffStatesSize) {
+      this.logger.warn("Cooloff states cache hit size cap, clearing all entries", {
+        size: this.queueCooloffStates.size,
+        cap: this.maxCooloffStatesSize,
+      });
+      this.queueCooloffStates.clear();
+    }
+
```
Comment on lines 1735 to +1743

Contributor

Disruptive cache clearing strategy. Clearing the entire cooloff cache when the size cap is reached has significant drawbacks:

Consider these alternatives:
🔎 Suggested LRU-based approach

Track access times and evict oldest entries:

```diff
+  private queueCooloffStatesAccessOrder: string[] = [];
+
   #incrementCooloff(queueId: string): void {
-    // Safety check: if the cache is too large, just clear it
-    if (this.queueCooloffStates.size >= this.maxCooloffStatesSize) {
-      this.logger.warn("Cooloff states cache hit size cap, clearing all entries", {
-        size: this.queueCooloffStates.size,
-        cap: this.maxCooloffStatesSize,
-      });
-      this.queueCooloffStates.clear();
-    }
+    // Safety check: if at capacity, remove oldest entry (LRU)
+    if (this.queueCooloffStates.size >= this.maxCooloffStatesSize) {
+      const oldestQueue = this.queueCooloffStatesAccessOrder.shift();
+      if (oldestQueue) {
+        this.queueCooloffStates.delete(oldestQueue);
+        this.logger.debug("Evicted oldest cooloff state", {
+          queueId: oldestQueue,
+          remainingSize: this.queueCooloffStates.size,
+        });
+      }
+    }
     const state = this.queueCooloffStates.get(queueId) ?? {
       tag: "normal" as const,
       consecutiveFailures: 0,
     };
+
+    // Update access order
+    const existingIndex = this.queueCooloffStatesAccessOrder.indexOf(queueId);
+    if (existingIndex !== -1) {
+      this.queueCooloffStatesAccessOrder.splice(existingIndex, 1);
+    }
+    this.queueCooloffStatesAccessOrder.push(queueId);
     // ... rest of function
   }
```
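A possible lighter-weight variant of the same idea, sketched here for comparison only (it is not part of the PR or the review): a JavaScript `Map` iterates in insertion order, so deleting and re-inserting a key on each touch gives LRU eviction without the O(n) `indexOf`/`splice` bookkeeping of a separate access-order array.

```ts
// Standalone sketch (generic helper, not FairQueue API): delete + set moves the
// key to the back of the Map's iteration order, so the first key is always the
// least recently touched one and can be evicted on its own.
function touchWithLruCap<K, V>(cache: Map<K, V>, maxSize: number, key: K, value: V): void {
  if (cache.has(key)) {
    cache.delete(key); // refresh recency
  } else if (cache.size >= maxSize) {
    const oldest = cache.keys().next().value; // least recently touched entry
    if (oldest !== undefined) {
      cache.delete(oldest);
    }
  }
  cache.set(key, value);
}

// e.g. touchWithLruCap(this.queueCooloffStates, this.maxCooloffStatesSize, queueId, state);
```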
```diff
     const state = this.queueCooloffStates.get(queueId) ?? {
       tag: "normal" as const,
       consecutiveFailures: 0,
```
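To make the behaviour under discussion concrete, here is the clear-on-cap policy from the hunk above restated in isolation (a sketch, not FairQueue code): once the map reaches the cap, the next distinct queue wipes every entry, including states for queues that are actively in cooloff.

```ts
// Minimal reproduction of the clear-on-cap policy added in this PR.
const maxStatesSize = 3;
const states = new Map<string, { consecutiveFailures: number }>();

function incrementCooloff(queueId: string): void {
  if (states.size >= maxStatesSize) {
    states.clear(); // the safety valve: everything is dropped at once
  }
  const state = states.get(queueId) ?? { consecutiveFailures: 0 };
  state.consecutiveFailures++;
  states.set(queueId, state);
}

for (let i = 0; i < 10; i++) {
  incrementCooloff(`queue-${i}`);
}
console.log(states.size); // never exceeds 3; ends at 1 because each clear restarts the count
```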
```diff
@@ -728,6 +728,72 @@ describe("FairQueue", () => {
         await queue.close();
       }
     );
+
+    redisTest(
+      "should clear cooloff states when size cap is exceeded",
+      { timeout: 15000 },
+      async ({ redisOptions }) => {
+        keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+
+        const scheduler = new DRRScheduler({
+          redis: redisOptions,
+          keys,
+          quantum: 10,
+          maxDeficit: 100,
+        });
+
+        const queue = new FairQueue({
+          redis: redisOptions,
+          keys,
+          scheduler,
+          payloadSchema: TestPayloadSchema,
+          shardCount: 1,
+          consumerCount: 1,
+          consumerIntervalMs: 20,
+          visibilityTimeoutMs: 5000,
+          cooloff: {
+            enabled: true,
+            threshold: 1, // Enter cooloff after 1 failure
+            periodMs: 100, // Short cooloff for testing
+            maxStatesSize: 5, // Very small cap for testing
+          },
+          startConsumers: false,
+        });
+
+        // Enqueue messages to multiple queues
+        for (let i = 0; i < 10; i++) {
+          await queue.enqueue({
+            queueId: `tenant:t${i}:queue:q1`,
+            tenantId: `t${i}`,
+            payload: { value: `msg-${i}` },
+          });
+        }
+
+        const processed: string[] = [];
+
+        // Handler that always fails to trigger cooloff
+        queue.onMessage(async (ctx) => {
+          processed.push(ctx.message.payload.value);
+          await ctx.fail(new Error("Forced failure"));
+        });
+
+        queue.start();
+
+        // Wait for some messages to be processed and fail
+        await vi.waitFor(
+          () => {
+            expect(processed.length).toBeGreaterThanOrEqual(5);
+          },
+          { timeout: 10000 }
+        );
+
+        // The cooloff states size should be capped (test that it doesn't grow unbounded)
+        const cacheSizes = queue.getCacheSizes();
+        expect(cacheSizes.cooloffStatesSize).toBeLessThanOrEqual(10); // Some buffer for race conditions
```
Test passes vacuously due to disabled cooloff

The test comment says "Handler that always fails to trigger cooloff" but …
```diff
+
+        await queue.close();
+      }
+    );
   });

   describe("inspection methods", () => {
```