Skip to content

Commit 8957a01

Browse files
matt-aitkenclaude
andcommitted
fix(webapp): recover from ClickHouse JSON parse failures in runs replication
On a `Cannot parse JSON object` rejection, sanitize lone UTF-16 surrogates across the batch via the existing `sanitizeRows` helper and retry once. Drop the batch loudly if the sanitizer found nothing or the retry also fails, so the surrounding `#insertWithRetry` layer doesn't spin on a deterministic failure. Non-parse errors propagate unchanged. Mirrors the pattern shipped for ClickhouseEventRepository in #3659 — same root cause (lone surrogates in user-provided JSON), same recovery shape, same shared helpers. Fixes the customer-facing symptom from TRI-9755: one row's bad output JSON used to kill the COMPLETED updates for its 50+ batch-mates, stranding them in EXECUTING in ClickHouse forever and inflating "Running" counts on the Tasks page. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 1015876 commit 8957a01

2 files changed

Lines changed: 183 additions & 34 deletions

File tree

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Recover from ClickHouse `JSONEachRow` parse failures in the runs
7+
replication path. `RunsReplicationService` now wraps its task-run and
8+
payload inserts with the same reactive-sanitisation pattern used by
9+
`ClickhouseEventRepository` since #3659: on `Cannot parse JSON object`,
10+
sanitize lone UTF-16 surrogates across the batch (via the shared
11+
`sanitizeRows` helper) and retry once. If the sanitiser found nothing
12+
or the retry also fails, the batch is dropped, `permanentlyDroppedBatches`
13+
increments, and a loud error log is emitted — preventing the surrounding
14+
`#insertWithRetry` layer from spinning on the same deterministic
15+
failure. Non-parse errors propagate unchanged.
16+
17+
Stops the bleeding behind the customer-visible "Tasks page shows a huge
18+
Running count" symptom: one row with bad output JSON used to take down
19+
the COMPLETED updates for its 50+ batch-mates, leaving every one of
20+
them stranded in `EXECUTING` in ClickHouse forever (Postgres unaffected).

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 163 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ import EventEmitter from "node:events";
3838
import pLimit from "p-limit";
3939
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
4040
import { calculateErrorFingerprint } from "~/utils/errorFingerprinting";
41+
import {
42+
isClickHouseJsonParseError,
43+
parseRowNumberFromError,
44+
sanitizeRows,
45+
} from "~/v3/eventRepository/sanitizeRowsOnParseError.server";
4146

4247
interface TransactionEvent<T = any> {
4348
tag: "insert" | "update" | "delete";
@@ -129,6 +134,15 @@ export class RunsReplicationService {
129134
private _disablePayloadInsert: boolean;
130135
private _disableErrorFingerprinting: boolean;
131136

137+
/**
138+
* Counts batches that hit a ClickHouse `Cannot parse JSON object` failure
139+
* that survived one sanitize-retry. These batches are dropped on the floor
140+
* (returning success-ish to the caller so the retry layer doesn't spin on
141+
* the same deterministic failure), and we track the drop count for
142+
* observability. Counter only — does not gate behaviour.
143+
*/
144+
private _permanentlyDroppedBatches = 0;
145+
132146
// Metrics
133147
private _replicationLagHistogram: Histogram;
134148
private _batchesFlushedCounter: Counter;
@@ -283,6 +297,11 @@ export class RunsReplicationService {
283297
this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000;
284298
}
285299

300+
/** Exposed for tests and metrics — total batches lost to unrecoverable parse errors. */
301+
get permanentlyDroppedBatches() {
302+
return this._permanentlyDroppedBatches;
303+
}
304+
286305
public async shutdown() {
287306
if (this._isShuttingDown) return;
288307

@@ -837,24 +856,29 @@ export class RunsReplicationService {
837856
return;
838857
}
839858
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
840-
const [insertError, insertResult] =
841-
await clickhouse.taskRuns.insertCompactArrays(taskRunInserts, {
842-
params: {
843-
clickhouse_settings: this.#getClickhouseInsertSettings(),
844-
},
845-
});
846-
847-
if (insertError) {
848-
this.logger.error("Error inserting task run inserts attempt", {
849-
error: insertError,
850-
attempt,
851-
});
852-
853-
recordSpanError(span, insertError);
854-
throw insertError;
855-
}
856-
857-
return insertResult;
859+
const doInsert = async () => {
860+
const [insertError, insertResult] = await clickhouse.taskRuns.insertCompactArrays(
861+
taskRunInserts,
862+
{ params: { clickhouse_settings: this.#getClickhouseInsertSettings() } }
863+
);
864+
if (insertError) {
865+
this.logger.error("Error inserting task run inserts attempt", {
866+
error: insertError,
867+
attempt,
868+
});
869+
recordSpanError(span, insertError);
870+
throw insertError;
871+
}
872+
return insertResult;
873+
};
874+
875+
const outcome = await this.#insertWithJsonParseRecovery(
876+
taskRunInserts,
877+
doInsert,
878+
"task_runs_v2",
879+
attempt
880+
);
881+
return outcome.kind === "dropped" ? undefined : outcome.insertResult;
858882
});
859883
}
860884

@@ -867,25 +891,130 @@ export class RunsReplicationService {
867891
return;
868892
}
869893
return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
870-
const [insertError, insertResult] =
871-
await clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, {
872-
params: {
873-
clickhouse_settings: this.#getClickhouseInsertSettings(),
874-
},
875-
});
876-
877-
if (insertError) {
878-
this.logger.error("Error inserting payload inserts attempt", {
879-
error: insertError,
880-
attempt,
881-
});
894+
const doInsert = async () => {
895+
const [insertError, insertResult] = await clickhouse.taskRuns.insertPayloadsCompactArrays(
896+
payloadInserts,
897+
{ params: { clickhouse_settings: this.#getClickhouseInsertSettings() } }
898+
);
899+
if (insertError) {
900+
this.logger.error("Error inserting payload inserts attempt", {
901+
error: insertError,
902+
attempt,
903+
});
904+
recordSpanError(span, insertError);
905+
throw insertError;
906+
}
907+
return insertResult;
908+
};
909+
910+
const outcome = await this.#insertWithJsonParseRecovery(
911+
payloadInserts,
912+
doInsert,
913+
"raw_task_runs_payload_v1",
914+
attempt
915+
);
916+
return outcome.kind === "dropped" ? undefined : outcome.insertResult;
917+
});
918+
}
882919

883-
recordSpanError(span, insertError);
884-
throw insertError;
920+
/**
921+
* Wraps a ClickHouse insert with reactive UTF-16 sanitization for
922+
* `Cannot parse JSON object` rejections. Mirrors the pattern from
923+
* `ClickhouseEventRepository.#insertWithJsonParseRecovery` introduced
924+
* in #3659 — same root cause (lone UTF-16 surrogates in user-provided
925+
* JSON), same recovery shape:
926+
*
927+
* 1. Try the insert. Healthy batches pay zero scan cost.
928+
* 2. On parse error, walk the whole batch via `sanitizeRows` and
929+
* replace any lone-surrogate string with `"[invalid-utf16]"`.
930+
* 3. Retry once. If the sanitizer found nothing or the retry also
931+
* fails with the same error class, drop the batch loudly and
932+
* return — do NOT rethrow, otherwise the surrounding
933+
* `#insertWithRetry` layer would spin three more times on the
934+
* same deterministic failure.
935+
* 4. Non-parse errors propagate unchanged so the existing
936+
* transient-retry path still handles them.
937+
*
938+
* The whole-batch scan (rather than slicing on the `at row N` hint) is
939+
* deliberate: `at row N` semantics under `input_format_parallel_parsing`
940+
* aren't stable enough to safely skip rows. The cost is bounded because
941+
* `detectBadJsonStrings` exits in O(1) for clean strings.
942+
*/
943+
async #insertWithJsonParseRecovery<T extends object>(
944+
rows: T[],
945+
doInsert: () => Promise<unknown>,
946+
contextLabel: string,
947+
attempt: number
948+
): Promise<
949+
| { kind: "inserted"; insertResult: unknown }
950+
| { kind: "sanitized"; insertResult: unknown }
951+
| { kind: "dropped" }
952+
> {
953+
try {
954+
return { kind: "inserted", insertResult: await doInsert() };
955+
} catch (firstError) {
956+
if (!isClickHouseJsonParseError(firstError)) throw firstError;
957+
958+
const firstMessage =
959+
typeof firstError === "object" && firstError !== null && "message" in firstError
960+
? String((firstError as { message?: unknown }).message ?? "")
961+
: String(firstError);
962+
963+
const rowHint = parseRowNumberFromError(firstMessage);
964+
const { rowsTouched, fieldsSanitized } = sanitizeRows(rows);
965+
966+
if (fieldsSanitized === 0) {
967+
this._permanentlyDroppedBatches += 1;
968+
this.logger.error(
969+
"Dropped batch — ClickHouse JSON parse error but sanitizer found nothing to fix",
970+
{
971+
contextLabel,
972+
attempt,
973+
batchSize: rows.length,
974+
clickhouseRowHint: rowHint,
975+
permanentlyDroppedBatches: this._permanentlyDroppedBatches,
976+
sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024),
977+
clickhouseError: firstMessage.split("\n")[0],
978+
}
979+
);
980+
return { kind: "dropped" };
885981
}
886982

887-
return insertResult;
888-
});
983+
this.logger.warn("Sanitizing batch after ClickHouse JSON parse error", {
984+
contextLabel,
985+
attempt,
986+
batchSize: rows.length,
987+
clickhouseRowHint: rowHint,
988+
rowsTouched,
989+
fieldsSanitized,
990+
clickhouseError: firstMessage.split("\n")[0],
991+
});
992+
993+
try {
994+
return { kind: "sanitized", insertResult: await doInsert() };
995+
} catch (retryError) {
996+
if (!isClickHouseJsonParseError(retryError)) throw retryError;
997+
998+
this._permanentlyDroppedBatches += 1;
999+
const retryMessage =
1000+
typeof retryError === "object" && retryError !== null && "message" in retryError
1001+
? String((retryError as { message?: unknown }).message ?? "")
1002+
: String(retryError);
1003+
this.logger.error(
1004+
"Dropped batch after sanitize-retry still hit ClickHouse JSON parse error",
1005+
{
1006+
contextLabel,
1007+
attempt,
1008+
batchSize: rows.length,
1009+
permanentlyDroppedBatches: this._permanentlyDroppedBatches,
1010+
sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024),
1011+
firstError: firstMessage.split("\n")[0],
1012+
retryError: retryMessage.split("\n")[0],
1013+
}
1014+
);
1015+
return { kind: "dropped" };
1016+
}
1017+
}
8891018
}
8901019

8911020
async #prepareRunInserts(

0 commit comments

Comments
 (0)