Skip to content

Commit 2bc70be

Browse files
committed
feat(webapp): gate v2 minting on ClickHouse publication readiness; scope reads on v2-may-exist
#1 publication-readiness interlock: a v2 run minted before task_run_v2 is in the ClickHouse replication publication is permanently absent from ClickHouse (Postgres only decodes changes for transactions that begin after ALTER PUBLICATION ADD TABLE, which the replication leader runs at its own startup, not via a migration), and the run list/metrics/tags are ClickHouse-only. Add a cached, periodically-refreshed status (runTableV2Status.server.ts) and gate minting (triggerTask, triggerFailedTask) through canMintV2Run = org cut over AND table published. Minting fails safe to legacy until the publication carries the table and self-heals once it does, removing the manual pg_publication_tables enable step. #2 native-rollback read scope: cross-table read scoping (idempotency dedup, ApiRetrieveRun hierarchy) keyed on the native master switch alone, so disabling native realtime after v2 runs exist re-scoped reads to legacy and hid existing v2 runs (an idempotency dedup miss means duplicate execution). Scope on v2RunsMayExist (native on OR task_run_v2 has rows) instead; it is monotonic, so the read scope cannot regress once v2 runs exist.
1 parent 60d8662 commit 2bc70be

6 files changed

Lines changed: 189 additions & 28 deletions

File tree

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
import { generatePresignedUrl } from "~/v3/objectStore.server";
2525
import { runStore } from "~/v3/runStore.server";
2626
import { hydrateParentAndRoot, hydrateChildRuns } from "~/v3/runHierarchy.server";
27+
import { v2RunsMayExist } from "~/v3/runTableV2Status.server";
2728
import { env as serverEnv } from "~/env.server";
2829
import { tracer } from "~/v3/tracer.server";
2930
import { startSpanWithEnv } from "~/v3/tracing.server";
@@ -148,16 +149,19 @@ export class ApiRetrieveRunPresenter {
148149
// legacy run's v2 children), which arise in the mixed window, would come
149150
// back null/empty. Resolve parent/root by id (RunStore routes by format)
150151
// and children by a both-table predicate.
151-
// Scope the cross-table reads on whether ANY v2 run can exist in this
152-
// deployment (the native master switch), NOT the org's current flag: a
153-
// run's table is fixed by its id format, and an org that was on v2 then
154-
// flipped off still HAS v2 runs (and v2 children) that runTableV2.server.ts
155-
// documents as staying readable. pgRow is routed here by id format, so it
156-
// can be a v2 run for a now-non-v2 org; scoping to "legacy" off the per-org
157-
// flag would then silently drop its v2 children/parent. Until native is
158-
// enabled no v2 run exists yet (minting requires it), so "legacy" is safe
159-
// and skips the empty task_run_v2 query. The reads also run in parallel.
160-
const tables = serverEnv.REALTIME_BACKEND_NATIVE_ENABLED === "1" ? "both" : "legacy";
152+
// Scope the cross-table reads on whether a v2 run could exist at all, NOT
153+
// the org's current flag: a run's table is fixed by its id format, and an
154+
// org that was on v2 then flipped off still HAS v2 runs (and v2 children)
155+
// that stay readable. pgRow is routed here by id format, so it can be a v2
156+
// run for a now-non-v2 org; scoping to "legacy" would then silently drop
157+
// its v2 children/parent. v2RunsMayExist is monotonic (native on now, OR
158+
// task_run_v2 already has rows), so turning the native master switch off
159+
// does not re-scope to legacy and hide existing v2 runs. While no v2 run
160+
// has ever existed it stays "legacy" and skips the empty task_run_v2 query.
161+
// The reads also run in parallel.
162+
const tables = v2RunsMayExist(serverEnv.REALTIME_BACKEND_NATIVE_ENABLED === "1")
163+
? "both"
164+
: "legacy";
161165
const [{ parentTaskRun, rootTaskRun }, childRuns] = await Promise.all([
162166
hydrateParentAndRoot(
163167
{ parentTaskRunId: pgRow.parentTaskRunId, rootTaskRunId: pgRow.rootTaskRunId },

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server";
1212
import { makeResolveMollifierFlag } from "~/v3/mollifier/mollifierGate.server";
1313
import { runStore } from "~/v3/runStore.server";
1414
import { shouldUseV2RunTable } from "~/v3/runTableV2.server";
15+
import { v2RunsMayExist } from "~/v3/runTableV2Status.server";
1516
import type { TraceEventConcern, TriggerTaskRequest } from "../types";
1617

1718
// In-memory per-org mollifier-enabled check, shared with `evaluateGate`
@@ -313,16 +314,16 @@ export class IdempotencyKeyConcern {
313314
nativeRealtimeEnabled: env.REALTIME_BACKEND_NATIVE_ENABLED === "1",
314315
});
315316

316-
// Scope the idempotency dedup read on whether ANY v2 run can exist in this
317-
// deployment (the native master switch), NOT on whether this org currently
318-
// mints v2. A run's table is fixed by its id format, so an org that was on
319-
// v2 then flipped off still holds v2 runs an idempotency key can match
320-
// (runTableV2.server.ts documents they stay readable); gating the read on
321-
// orgUsesV2 would miss them and let a duplicate through. Until native is
322-
// enabled no v2 run exists yet (minting requires it), so "legacy" is safe and
323-
// skips the empty task_run_v2 query on the trigger hot path; once native is
324-
// on, read both.
325-
const anyV2RunsPossible = env.REALTIME_BACKEND_NATIVE_ENABLED === "1";
317+
// Scope the idempotency dedup read on whether a v2 run could exist at all,
318+
// NOT on whether this org currently mints v2. A run's table is fixed by its
319+
// id format, so an org that was on v2 then flipped off still holds v2 runs an
320+
// idempotency key can match; gating the read on orgUsesV2 would miss them and
321+
// let a duplicate through. v2RunsMayExist is monotonic (native on now, OR
322+
// task_run_v2 already has rows), so turning the native master switch off
323+
// after v2 runs exist does NOT re-scope the read back to legacy and hide
324+
// them. While no v2 run has ever existed it stays "legacy" and skips the
325+
// empty task_run_v2 query on the trigger hot path.
326+
const anyV2RunsPossible = v2RunsMayExist(env.REALTIME_BACKEND_NATIVE_ENABLED === "1");
326327

327328
const existingRun = idempotencyKey
328329
? await runStore.findRun(

apps/webapp/app/runEngine/services/triggerFailedTask.server.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRun
1010
import { DefaultQueueManager } from "../concerns/queues.server";
1111
import type { TriggerTaskRequest } from "../types";
1212
import { runStore } from "~/v3/runStore.server";
13-
import { shouldUseV2RunTable } from "~/v3/runTableV2.server";
13+
import { canMintV2Run } from "~/v3/runTableV2Status.server";
1414
import { env } from "~/env.server";
1515

1616
export type TriggerFailedTaskRequest = {
@@ -76,7 +76,7 @@ export class TriggerFailedTaskService {
7676
// batch, create an ongoing cross-table edge on the failure path. Mirrors the
7777
// mint gate in triggerTask.server.ts.
7878
const failedRunFriendlyId = (
79-
shouldUseV2RunTable(request.environment.organization.featureFlags, {
79+
canMintV2Run(request.environment.organization.featureFlags, {
8080
nativeRealtimeEnabled: env.REALTIME_BACKEND_NATIVE_ENABLED === "1",
8181
})
8282
? RunId.generateKsuid()
@@ -292,10 +292,9 @@ export class TriggerFailedTaskService {
292292
where: { id: opts.organizationId },
293293
select: { featureFlags: true },
294294
});
295-
useV2RunTable = shouldUseV2RunTable(
296-
(org?.featureFlags as Record<string, unknown>) ?? null,
297-
{ nativeRealtimeEnabled: env.REALTIME_BACKEND_NATIVE_ENABLED === "1" }
298-
);
295+
useV2RunTable = canMintV2Run((org?.featureFlags as Record<string, unknown>) ?? null, {
296+
nativeRealtimeEnabled: env.REALTIME_BACKEND_NATIVE_ENABLED === "1",
297+
});
299298
} catch {
300299
// Leave useV2RunTable=false (legacy id).
301300
}

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import { logger } from "~/services/logger.server";
2525
import { parseDelay } from "~/utils/delays";
2626
import { handleMetadataPacket } from "~/utils/packets";
2727
import { startSpan } from "~/v3/tracing.server";
28-
import { shouldUseV2RunTable } from "~/v3/runTableV2.server";
28+
import { canMintV2Run } from "~/v3/runTableV2Status.server";
2929
import type {
3030
TriggerTaskServiceOptions,
3131
TriggerTaskServiceResult,
@@ -159,7 +159,7 @@ export class RunEngineTriggerTaskService {
159159
// trigger hot path. Downstream routing is by id format only.
160160
const runFriendlyId =
161161
options?.runFriendlyId ??
162-
(shouldUseV2RunTable(environment.organization.featureFlags, {
162+
(canMintV2Run(environment.organization.featureFlags, {
163163
nativeRealtimeEnabled: env.REALTIME_BACKEND_NATIVE_ENABLED === "1",
164164
})
165165
? RunId.generateKsuid()
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import { prisma } from "~/db.server";
2+
import { env } from "~/env.server";
3+
import { logger } from "~/services/logger.server";
4+
import { singleton } from "~/utils/singleton";
5+
import { shouldUseV2RunTable, type ShouldUseV2RunTableOptions } from "~/v3/runTableV2.server";
6+
7+
/**
8+
* Cached, periodically-refreshed facts about the `task_run_v2` table, read OFF
9+
* the trigger hot path (no per-request DB query) to gate v2 minting and
10+
* cross-table read scoping.
11+
*/
12+
type RunTableV2Status = {
13+
/**
14+
* Is `task_run_v2` in the ClickHouse logical-replication publication?
15+
*
16+
* Postgres only decodes a table's changes for transactions that BEGIN after
17+
* the decoder sees `ALTER PUBLICATION ... ADD TABLE`, and that ADD TABLE is run
18+
* lazily by the replication leader on its own startup, NOT by a migration. So a
19+
* v2 run minted before the table is published is permanently absent from
20+
* ClickHouse with no backfill, and the run list / metrics / tags / bulk actions
21+
* are ClickHouse-only. Mint v2 ONLY when this is true; otherwise mint legacy
22+
* (fail-safe), self-healing once the leader publishes the table.
23+
*/
24+
published: boolean;
25+
/**
26+
* Has any v2 run ever existed (monotonic in practice)? Cross-table READ scoping
27+
* uses this (OR the native master switch) rather than the master switch alone,
28+
* so disabling native realtime cannot re-scope reads back to legacy and hide
29+
* already-minted v2 runs from idempotency dedup and hierarchy reads.
30+
*/
31+
hasRows: boolean;
32+
};
33+
34+
const REFRESH_INTERVAL_MS = 30_000;
35+
36+
const status = singleton("runTableV2Status", initialize);
37+
38+
function initialize(): RunTableV2Status {
39+
const state: RunTableV2Status = { published: false, hasRows: false };
40+
41+
// The publication only exists when runs replication is configured. Without it
42+
// no v2 run can be captured by ClickHouse, so leave published=false: minting
43+
// stays on legacy regardless of org flags.
44+
if (!env.RUN_REPLICATION_CLICKHOUSE_URL) {
45+
return state;
46+
}
47+
48+
const refresh = async () => {
49+
try {
50+
const published = await prisma.$queryRaw<Array<{ present: boolean }>>`
51+
SELECT EXISTS (
52+
SELECT 1 FROM pg_publication_tables
53+
WHERE pubname = ${env.RUN_REPLICATION_PUBLICATION_NAME}
54+
AND tablename = 'task_run_v2'
55+
) AS present`;
56+
state.published = published[0]?.present ?? false;
57+
58+
// hasRows is monotonic; once true, stop probing.
59+
if (!state.hasRows) {
60+
const hasRows = await prisma.$queryRaw<Array<{ present: boolean }>>`
61+
SELECT EXISTS (SELECT 1 FROM task_run_v2 LIMIT 1) AS present`;
62+
state.hasRows = hasRows[0]?.present ?? false;
63+
}
64+
} catch (error) {
65+
logger.warn("runTableV2Status refresh failed; keeping last-known status", {
66+
error: error instanceof Error ? error.message : String(error),
67+
});
68+
}
69+
};
70+
71+
void refresh();
72+
const timer = setInterval(() => void refresh(), REFRESH_INTERVAL_MS);
73+
timer.unref?.();
74+
75+
return state;
76+
}
77+
78+
/** `task_run_v2` is in the ClickHouse replication publication (cached, off the hot path). */
79+
export function isV2RunTablePublished(): boolean {
80+
return status.published;
81+
}
82+
83+
/**
84+
* Whether a v2 run could be relevant to a cross-table READ: native realtime is on
85+
* (v2 is being minted now) OR `task_run_v2` already holds rows. Scope cross-table
86+
* reads on this, not the native master switch alone, so turning native off cannot
87+
* hide already-minted v2 runs.
88+
*/
89+
export function v2RunsMayExist(nativeRealtimeEnabled: boolean): boolean {
90+
return nativeRealtimeEnabled || status.hasRows;
91+
}
92+
93+
/**
94+
* Mint gate: mint a v2 (KSUID) run only when the org is cut over to v2 AND
95+
* `task_run_v2` is in the ClickHouse publication, so a v2 run can never be
96+
* silently lost from ClickHouse by being minted before the replication leader
97+
* publishes the table. Fails safe to legacy until then; self-heals once published.
98+
*/
99+
export function canMintV2Run(
100+
orgFeatureFlags: unknown,
101+
options: ShouldUseV2RunTableOptions
102+
): boolean {
103+
return shouldUseV2RunTable(orgFeatureFlags, options) && isV2RunTablePublished();
104+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { describe, expect, it } from "vitest";
2+
import { canMintV2Run, v2RunsMayExist } from "~/v3/runTableV2Status.server";
3+
4+
// The module caches its status in a globalThis singleton ("runTableV2Status").
5+
// In the unit-test env runs replication is unconfigured, so it initializes to
6+
// { published:false, hasRows:false } with no background poller. Mutate that
7+
// cached object to exercise the gates deterministically.
8+
function setStatus(published: boolean, hasRows: boolean) {
9+
const singletons = (globalThis as any).__trigger_singletons;
10+
// Force module init (the singleton is created on first getter call/import).
11+
v2RunsMayExist(false);
12+
singletons.runTableV2Status.published = published;
13+
singletons.runTableV2Status.hasRows = hasRows;
14+
}
15+
16+
const CUTOVER_FLAGS = { realtimeBackend: "native", runTableV2: true };
17+
18+
describe("canMintV2Run (mint gate: org cut over AND task_run_v2 published)", () => {
19+
it("mints v2 only when the org is cut over AND the table is published", () => {
20+
setStatus(true, true);
21+
expect(canMintV2Run(CUTOVER_FLAGS, { nativeRealtimeEnabled: true })).toBe(true);
22+
});
23+
24+
it("fails safe to legacy when the org is cut over but the table is NOT published", () => {
25+
setStatus(false, true);
26+
expect(canMintV2Run(CUTOVER_FLAGS, { nativeRealtimeEnabled: true })).toBe(false);
27+
});
28+
29+
it("stays legacy when the org is not cut over, even if published", () => {
30+
setStatus(true, true);
31+
expect(
32+
canMintV2Run({ realtimeBackend: "electric", runTableV2: false }, { nativeRealtimeEnabled: true })
33+
).toBe(false);
34+
expect(canMintV2Run(CUTOVER_FLAGS, { nativeRealtimeEnabled: false })).toBe(false);
35+
});
36+
});
37+
38+
describe("v2RunsMayExist (read scope: native on OR table has rows)", () => {
39+
it("is true when native realtime is on (v2 being minted now)", () => {
40+
setStatus(false, false);
41+
expect(v2RunsMayExist(true)).toBe(true);
42+
});
43+
44+
it("is true when task_run_v2 already has rows even with native OFF (rollback safety)", () => {
45+
setStatus(false, true);
46+
expect(v2RunsMayExist(false)).toBe(true);
47+
});
48+
49+
it("is false only when native is off AND no v2 run has ever existed", () => {
50+
setStatus(false, false);
51+
expect(v2RunsMayExist(false)).toBe(false);
52+
});
53+
});

0 commit comments

Comments
 (0)