Commit 5667461

fix(run-engine): decrement totalWeight in fair-queue weighted env shuffle (#4019)
## Summary

Fixes the fair-queue weighted environment shuffle, which biased environment ordering whenever fair-queue biases are enabled (the default configuration).

## Root cause

`#weightedShuffle` in `fairQueueSelectionStrategy.ts` computed the total weight once and drew its random pivot against that full-set total on every iteration, but never decremented the total as items were removed from the working set. After the first pick, the pivot frequently overshot the sum of the remaining items, so the inner selection loop ran off the end and clamped to the last remaining element.

The result systematically over-selected whichever environment sat at the tail of the set. The first slot stayed fair (the full total is correct on the first draw), but later positions were ordered by environment iteration order rather than by the intended concurrency-limit and available-capacity weighting. For four equal-weight environments, the final position landed on one env ~9% of the time and another ~42%, instead of ~25% each.

The two sibling selection paths (`#weightedRandomQueueOrder` and `#selectTopEnvs`) already decrement the total before splicing; this brings the env shuffle in line with them.

## Fix

```ts
result.push(items[index].envId);
totalWeight -= items[index].weight;
items.splice(index, 1);
```

Adds a regression test that runs the weighted shuffle over equal-weight envs with biases enabled and asserts each env lands in every position roughly uniformly. It fails on the old code (tail position ~37%) and passes with the fix.

Reported in #4001.
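The skew described above can be reproduced outside the run engine with a minimal standalone sketch. It is not the project's code: `weightedShuffle`, `lastPositionShares`, the `decrementTotal` flag and the `mulberry32` seeded generator are all hypothetical names introduced here, and the inner selection loop is an assumption, since the diff elides it. Under that assumed loop and four equal weights, the stale total leaves the first env in the final slot with probability 27/64 (about 42%) and the last env with probability 3/32 (about 9%), which matches the figures quoted in the commit message.

```ts
type WeightedItem = { id: string; weight: number };

// Small seeded PRNG (mulberry32) so the simulation is reproducible.
function mulberry32(seed: number): () => number {
  let a = seed >>> 0;
  return () => {
    a = (a + 0x6d2b79f5) >>> 0;
    let t = a;
    t = Math.imul(t ^ (t >>> 15), t | 1);
    t ^= t + Math.imul(t ^ (t >>> 7), t | 61);
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
  };
}

// Weighted shuffle sketch. With decrementTotal = false it keeps drawing
// against the original full-set total (the behaviour the commit fixes).
// ASSUMPTION: the selection loop below is a guess at the elided code.
function weightedShuffle(
  input: WeightedItem[],
  rng: () => number,
  decrementTotal: boolean
): string[] {
  let totalWeight = input.reduce((sum, item) => sum + item.weight, 0);
  const items = [...input];
  const result: string[] = [];

  while (items.length > 0) {
    let random = rng() * totalWeight;
    let index = 0;
    // Walk the remaining items until the pivot is used up or we run off the end.
    while (random > 0 && index < items.length) {
      random -= items[index].weight;
      index++;
    }
    index = Math.max(0, index - 1);

    result.push(items[index].id);
    if (decrementTotal) {
      totalWeight -= items[index].weight;
    }
    items.splice(index, 1);
  }
  return result;
}

// Share of runs in which each env ends up in the final position.
function lastPositionShares(decrementTotal: boolean, iterations = 20000): Record<string, number> {
  const rng = mulberry32(42);
  const envs: WeightedItem[] = ["env-1", "env-2", "env-3", "env-4"].map((id) => ({ id, weight: 1 }));
  const shares: Record<string, number> = {};
  for (const env of envs) shares[env.id] = 0;

  for (let i = 0; i < iterations; i++) {
    const order = weightedShuffle(envs, rng, decrementTotal);
    shares[order[order.length - 1]] += 1 / iterations;
  }
  return shares;
}

const buggy = lastPositionShares(false);
const fixed = lastPositionShares(true);
console.log("stale total:", buggy);
console.log("decremented total:", fixed);
```

The only difference between the two runs is the `decrementTotal` flag, so any gap between the printed distributions comes from the stale total alone.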
1 parent bf4c6e9 commit 5667461

2 files changed

Lines changed: 89 additions & 2 deletions

internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts

Lines changed: 5 additions & 2 deletions
@@ -209,7 +209,7 @@ export class FairQueueSelectionStrategy implements RunQueueSelectionStrategy {
   }
 
   #weightedShuffle(weightedItems: WeightedEnv[]): string[] {
-    const totalWeight = weightedItems.reduce((sum, item) => sum + item.weight, 0);
+    let totalWeight = weightedItems.reduce((sum, item) => sum + item.weight, 0);
     const result: string[] = [];
     const items = [...weightedItems];
 
@@ -224,8 +224,11 @@ export class FairQueueSelectionStrategy implements RunQueueSelectionStrategy {
       }
       index = Math.max(0, index - 1);
 
-      // Add selected item to result and remove from items
+      // Add selected item to result and remove from items. Decrement totalWeight
+      // so the next draw is scaled to the remaining items; otherwise random
+      // routinely overshoots the shrinking set and the tail item is over-picked.
       result.push(items[index].envId);
+      totalWeight -= items[index].weight;
       items.splice(index, 1);
     }

internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts

Lines changed: 84 additions & 0 deletions
@@ -1203,6 +1203,90 @@ describe("FairDequeuingStrategy", () => {
     expect(queuesByEnv["env-1"]).toBeDefined();
     expect(queuesByEnv["env-1"].length).toBe(2);
   });
+
+  redisTest(
+    "weighted env shuffle stays uniform across every position for equal-weight envs",
+    async ({ redisOptions: redis }) => {
+      const keyProducer = new RunQueueFullKeyProducer();
+      // Biases must be non-zero so env ordering goes through the weighted shuffle
+      // path rather than the plain shuffle short-circuit.
+      const strategy = new FairQueueSelectionStrategy({
+        redis,
+        keys: keyProducer,
+        defaultEnvConcurrencyLimit: 10,
+        parentQueueLimit: 100,
+        seed: "weighted-shuffle-seed",
+        biases: {
+          concurrencyLimitBias: 0.75,
+          availableCapacityBias: 0.3,
+          queueAgeRandomization: 0,
+        },
+      });
+
+      const now = Date.now();
+
+      // Four envs, each its own org, one queue each. Identical concurrency limit
+      // and current usage means identical weights, so a correct weighted shuffle
+      // should land each env in each position equally often. Insertion order is
+      // alphabetical by org, which is the order the tail-overshoot bug skews by.
+      const envIds = ["env-1", "env-2", "env-3", "env-4"];
+      for (let i = 0; i < envIds.length; i++) {
+        const orgId = `org-${i + 1}`;
+        const projectId = `proj-${i + 1}`;
+        const envId = envIds[i];
+
+        await setupQueue({
+          redis,
+          keyProducer,
+          parentQueue: "parent-queue",
+          score: now - 1000,
+          queueId: `queue-${envId}`,
+          orgId,
+          projectId,
+          envId,
+        });
+
+        await setupConcurrency({
+          redis,
+          keyProducer,
+          env: { envId, projectId, orgId, currentConcurrency: 5, limit: 10 },
+        });
+      }
+
+      const iterations = 2000;
+      // positionCounts[position][envId] = times envId landed in that position
+      const positionCounts: Array<Record<string, number>> = envIds.map(() =>
+        Object.fromEntries(envIds.map((envId) => [envId, 0]))
+      );
+
+      for (let i = 0; i < iterations; i++) {
+        const envResult = await strategy.distributeFairQueuesFromParentQueue(
+          "parent-queue",
+          `consumer-${i % 3}`
+        );
+        const result = flattenResults(envResult);
+        expect(result).toHaveLength(envIds.length);
+
+        result.forEach((queueId, position) => {
+          const envId = keyProducer.envIdFromQueue(queueId);
+          positionCounts[position][envId]++;
+        });
+      }
+
+      // For equal-weight envs the share at every position should be ~1/N. The
+      // tail-overshoot bug leaves position 0 fair but skews later positions hard
+      // (one env far below, the tail env far above). Assert each share stays
+      // within 40% of uniform at every position, which the bug violates.
+      const expectedShare = 100 / envIds.length;
+      for (let position = 0; position < envIds.length; position++) {
+        for (const envId of envIds) {
+          const share = (positionCounts[position][envId] / iterations) * 100;
+          expect(share).toBeGreaterThan(expectedShare * 0.6);
+          expect(share).toBeLessThan(expectedShare * 1.4);
+        }
+      }
+    }
+  );
 });
 
 // Helper function to flatten results for counting
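The 40% tolerance in the regression test can be sanity-checked with a little arithmetic. The sketch below is not part of the commit: `shareBand` is a hypothetical helper, and it assumes each iteration is an independent draw, so that the count for one env at one position is roughly binomial. Under that assumption, four envs over 2000 iterations give a band of about 15% to 35% around the 25% target, with a sampling standard deviation just under one percentage point. A correct shuffle should therefore sit well inside the band, while the ~9% and ~42% shares described in the commit message fall outside it.

```ts
// Hypothetical helper (not in the commit): describes the tolerance band of a
// position-share assertion, in percentage points.
interface ShareBand {
  expected: number; // uniform share, e.g. 25 for four envs
  lower: number; // lower assertion bound
  upper: number; // upper assertion bound
  sigma: number; // binomial standard deviation of the observed share
  marginInSigmas: number; // how many sigmas fit between expected and a bound
}

// ASSUMPTION: iterations are independent, so the share is approximately binomial.
function shareBand(envCount: number, iterations: number, tolerance: number): ShareBand {
  const p = 1 / envCount;
  const expected = 100 * p;
  const sigma = 100 * Math.sqrt((p * (1 - p)) / iterations);
  const margin = expected * tolerance;
  return {
    expected,
    lower: expected - margin,
    upper: expected + margin,
    sigma,
    marginInSigmas: margin / sigma,
  };
}

// Values taken from the regression test: 4 envs, 2000 iterations, 40% tolerance.
const band = shareBand(4, 2000, 0.4);
console.log(band);
```

A wide band like this trades sensitivity for stability: it is loose enough that sampling noise should not trip it, yet still tight enough to catch the skew this commit fixes.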
