fix: clear CommitState buffer on new commit to prevent duplicate rows#15651

Open
t3hw wants to merge 2 commits into apache:main from t3hw:recovery-fix

Conversation


@t3hw t3hw commented Mar 16, 2026

Fix: Per-commitId group separation prevents duplicate rows and data loss after failed commits

Background

The previous version of this fix cleared commitBuffer unconditionally in startNewCommit(). While simple, @koodin9 identified that this causes data loss in the partial commit (timeout) scenario: workers that finish after the timeout have already committed source offsets atomically via sendOffsetsToTransaction(), so their DataWritten events cannot be re-produced. Clearing the buffer discards them, orphaning data files on storage with no way to recover.

This prompted a redesign. Rather than discarding stale events, we commit them separately —
each commitId group in its own RowDelta with a distinct Iceberg sequence number.

Problem

When Coordinator.doCommit() fails, clearResponses() is never called, so stale DataWritten events persist in commitBuffer. On the next successful commit, tableCommitMap() returns all events — stale and current — in a single RowDelta. All files receive the same sequence number, making equality deletes powerless (data.seq < delete.seq is required, strictly less than), and duplicate rows appear.
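The strict inequality is the crux of the bug, so here is a minimal, standalone illustration (not Iceberg code; the method name is made up for this sketch) of why same-sequence files escape equality deletes:

```java
// Minimal illustration of Iceberg's equality delete applicability rule:
// a delete applies to a data file only when the data file's sequence
// number is strictly less than the delete file's sequence number.
class SequenceRule {
  static boolean deleteApplies(long dataSeq, long deleteSeq) {
    // Strictly less than: files committed in the same RowDelta
    // (equal sequence numbers) are never matched by the delete.
    return dataSeq < deleteSeq;
  }
}
```

With stale and current files in one RowDelta, both sides get the same sequence number, so the delete never applies and the old row survives alongside the new one.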

This is workload-agnostic — it affects CDC, upsert, and append-only modes.

Fix

CommitState.java

startNewCommit() no longer clears the buffer. Instead, tableCommitGroups() groups
events by commitId (preserving insertion order via LinkedHashMap) and returns an ordered
list of CommitGroup objects per table. Stale groups sort before current groups because
they were consumed from the control topic earlier.
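The grouping step can be sketched as follows. This is an illustrative, self-contained version of the idea, not the PR's exact API — the Envelope and CommitGroup shapes are simplified stand-ins:

```java
import java.util.*;

// Sketch: bucket buffered events by commitId in first-seen order, so
// groups consumed earlier from the control topic come out first.
class CommitGrouping {
  record Envelope(UUID commitId, String dataFile) {}
  record CommitGroup(UUID commitId, List<Envelope> envelopes) {}

  static List<CommitGroup> tableCommitGroups(List<Envelope> buffer) {
    // LinkedHashMap preserves insertion (first-seen) order of commitIds.
    Map<UUID, List<Envelope>> byCommitId = new LinkedHashMap<>();
    for (Envelope e : buffer) {
      byCommitId.computeIfAbsent(e.commitId(), k -> new ArrayList<>()).add(e);
    }
    List<CommitGroup> groups = new ArrayList<>();
    byCommitId.forEach((id, envs) -> groups.add(new CommitGroup(id, envs)));
    return groups;
  }
}
```

Because stale events were consumed earlier, their commitId is seen first and their group sorts first, which is what guarantees the lower sequence number when groups are committed in order.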

Additional changes:

  • removeEnvelopes(Collection) — selectively removes committed groups' envelopes
  • Adaptive retry tracking — recordGroupFailure(), isGroupBlocking(), per-commitId
    retry counter
  • TTL eviction — stale events older than stale-ttl-ms (default 1 hour) are evicted with
    an ERROR log containing orphaned file paths for manual recovery
  • CommitStateMXBean — JMX interface exposing EvictedStaleEventCount,
    StaleGroupCount, BufferSize
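The TTL eviction could look roughly like this — a hedged sketch assuming each stale group records when it was first seen (the real code also logs the orphaned file paths at ERROR; names here are illustrative):

```java
import java.util.*;

// Sketch: evict commitId groups whose first-seen timestamp is older
// than ttlMs. A ttl of 0 disables eviction, matching the config default.
class TtlEviction {
  static List<UUID> evictExpired(Map<UUID, Long> firstSeenMs, long nowMs, long ttlMs) {
    List<UUID> evicted = new ArrayList<>();
    if (ttlMs <= 0) return evicted; // 0 disables eviction
    Iterator<Map.Entry<UUID, Long>> it = firstSeenMs.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<UUID, Long> e = it.next();
      if (nowMs - e.getValue() > ttlMs) {
        evicted.add(e.getKey()); // caller logs orphaned file paths for manual recovery
        it.remove();
      }
    }
    return evicted;
  }
}
```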

Coordinator.java

doCommit() rewritten to iterate commitId groups per table sequentially (oldest first).
Each group is committed via the existing commitToTable() method, which now accepts a
commitId parameter.

Error handling follows a three-stage escalation:

  1. Blocking retries (configurable, default 3): stale group blocks current data for the
    same table to preserve sequence number ordering.
  2. Failure policy (configurable):
    • fail (default): throws ConnectException, moving the connector to FAILED state.
    • non-blocking: proceeds with current group, accepting ordering inversion risk.
  3. TTL eviction: stale events past TTL are evicted from the buffer.
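The escalation above reduces to a small decision function. This is a sketch of the control flow only — the enum and parameter names are illustrative, not the PR's actual API:

```java
// Sketch of the three-stage escalation for a stale group that failed
// to commit: retry while under the limit, then apply the failure
// policy, with TTL eviction taking precedence once the group expires.
class StaleGroupPolicy {
  enum Action { RETRY_BLOCKING, FAIL_CONNECTOR, PROCEED_NON_BLOCKING, EVICT }

  static Action onGroupFailure(int failures, int maxRetries, boolean ttlExpired, boolean failPolicy) {
    if (ttlExpired) return Action.EVICT;                     // stage 3: TTL eviction
    if (failures < maxRetries) return Action.RETRY_BLOCKING; // stage 1: block current data
    return failPolicy
        ? Action.FAIL_CONNECTOR                              // stage 2a: fail (default)
        : Action.PROCEED_NON_BLOCKING;                       // stage 2b: accept inversion risk
  }
}
```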

Stale groups write only their own envelope offsets to the snapshot (not the global consumer
position), preventing an offset poisoning bug where subsequent groups' envelopes would be
incorrectly filtered out.

When the buffer is not fully drained after a commit, consumer offsets are advanced to the
minimum uncommitted envelope offset per partition, bounding re-consumption on restart.
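The min-offset rule can be sketched as a pure function over the uncommitted envelopes (a simplified stand-in for the real types — each entry is a {partition, offset} pair):

```java
import java.util.*;

// Sketch: for each partition, the consumer may only be advanced to the
// smallest offset still held in the buffer, so every uncommitted
// envelope is re-consumed after a restart while bounding re-reads.
class OffsetAdvance {
  static Map<Integer, Long> minUncommittedOffsets(List<long[]> uncommitted) {
    Map<Integer, Long> mins = new HashMap<>();
    for (long[] po : uncommitted) {
      mins.merge((int) po[0], po[1], Math::min); // keep the lowest offset per partition
    }
    return mins;
  }
}
```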

Channel.java

Added commitConsumerOffsetsTo(Map) for partial consumer offset advancement.

IcebergSinkConfig.java

Three new configuration properties:

| Property | Default | Description |
| --- | --- | --- |
| `iceberg.control.commit.stale-ttl-ms` | 3,600,000 | TTL for stale events (ms). 0 disables. |
| `iceberg.control.commit.stale-max-blocking-retries` | 3 | Blocking retries before the failure policy applies. |
| `iceberg.control.commit.stale-failure-policy` | `fail` | `fail` or `non-blocking`. |
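For example, a connector configuration using the defaults explicitly (property keys as introduced by this PR, values are the documented defaults):

```properties
iceberg.control.commit.stale-ttl-ms=3600000
iceberg.control.commit.stale-max-blocking-retries=3
iceberg.control.commit.stale-failure-policy=fail
```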

Test coverage

CommitState unit tests (TestCommitState.java — 8 tests):

  • testTableCommitGroupsSingleCommitId — single group, correct envelope count
  • testTableCommitGroupsMultipleCommitIds — stale + current groups, ordered correctly
  • testTableCommitGroupsPreservesInsertionOrder — interleaved commitIds, first-seen order
  • testStartNewCommitDoesNotClearBuffer — stale events survive across cycles
  • testRemoveEnvelopesSelectiveRemoval — only specified envelopes removed
  • testIsBufferEmpty — empty after clear, not empty after add
  • testIsCommitReady — partition count filtering by commitId (existing)
  • testGetValidThroughTs — timestamp calculation (existing)

Coordinator integration tests (TestCoordinator.java — 3 new):

  • testMultiGroupCommitProducesSequentialSnapshots — stale + current groups produce 2
    snapshots with strictly increasing sequence numbers and distinct commitIds
  • testStaleGroupFailureSkipsCurrentGroup — bad stale data blocks current group, 0
    snapshots, buffer retained, no CommitComplete sent
  • testStaleSucceedsCurrentFails — stale group commits (1 snapshot), current group's
    envelopes retained, no CommitComplete sent

Recovery scenario tests (TestRecoveryScenario.java — 1 new):

  • testSequentialCommitsProduceSeparateSnapshots — sequential commits produce 2 snapshots
    with increasing sequence numbers (prerequisite for equality deletes to work across batches;
    full CDC proof is in TestCDCDeltaWriterTableLevel)

Verification

All tests pass:

```shell
./gradlew :iceberg-kafka-connect:iceberg-kafka-connect:test
./gradlew :iceberg-kafka-connect:iceberg-kafka-connect:spotlessApply
cd site && make lint
```

Red-green: removing the per-group offset computation in doCommit() (reverting to the
global ctlOffsets for all groups) causes testMultiGroupCommitProducesSequentialSnapshots
to fail — the current group's envelopes are filtered out by the stale group's snapshot
offsets. Restoring per-group offsets fixes the test.

Stale DataWritten events accumulate in commitBuffer when
Coordinator.doCommit() throws, causing duplicate rows when committed
alongside new events in a single RowDelta. Clear the buffer at the
top of startNewCommit() so each cycle starts clean.
@t3hw t3hw marked this pull request as ready for review March 16, 2026 12:19

t3hw commented Mar 16, 2026

@bryanck Hey, I was researching a duplication issue and it led me to this proposed solution. Can you assist in reviewing this change?


koodin9 commented Mar 20, 2026

Hi, thanks for looking into this issue. I've been working on the same problem in our fork (based on https://github.com/databricks/iceberg-kafka-connect) and wanted to share some observations about the partial commit (timeout) scenario that this fix may not fully address.

When commitTimeoutMs is exceeded, the Coordinator performs a partial commit — it commits only the DataWritten events received so far and moves on. Workers that were still processing at the time will send their DataWritten(A) + DataComplete(A) later to the control topic.

Here's the timeline:

  • Cycle A:

    • startNewCommit(A) → commitBuffer.clear() + new UUID
    • consumeAvailable() → Worker 0 still processing, nothing consumed
    • isCommitTimedOut() → true
    • doCommit(true) → partial commit succeeds (without Worker 0's data)
    • clearResponses()
    • endCurrentCommit()
  • Worker 0 finishes and sends DataWritten(A) + DataComplete(A) to control topic

  • process() call N:

    • consumeAvailable() → DataWritten(A) consumed → added to commitBuffer
  • process() call N+1:

    • startNewCommit(B) → commitBuffer.clear() ← DataWritten(A) DISCARDED
    • consumeAvailable() → ...

With this PR's fix, commitBuffer.clear() in startNewCommit() discards Worker 0's DataWritten(A). Since the partial commit already advanced the control topic consumer offsets (via commitConsumerOffsets()), this event won't be re-consumed. The data files exist on storage but are never committed to the Iceberg table — resulting in data loss.

The root cause is broader

The underlying issue isn't just stale events lingering in the buffer — it's that tableCommitMap() doesn't distinguish events by commitId. Even if stale events survive in the buffer (which they should, since they represent legitimate data), they get merged into the same RowDelta as current-cycle events, receiving the same sequence number. This breaks equality delete semantics (data_sequence_number < delete_sequence_number is required for equality deletes to apply).

Alternative approach: commit-id-based separation

Instead of discarding stale events, a safer fix would be to separate them by commitId and commit each group in its own RowDelta. This ensures:

  • Stale data files get a lower sequence number (committed first)
  • Current data files get a higher sequence number (committed last)
  • Equality deletes from the current cycle correctly apply to stale data (stale_seq < current_seq)
  • No data loss — all legitimate DataWritten events are committed

This approach also covers the failed-commit and recovery scenarios your PR addresses, since stale events from any prior cycle are naturally separated by their commitId.

We've already implemented this approach in our fork and will be deploying it to production soon. We'll also be opening a PR here shortly.


jerryzhujing commented Mar 20, 2026

@koodin9 Can you please open a PR as soon as possible so I can test it? In my testing, #15651 is actually losing data every day.

@t3hw t3hw marked this pull request as draft March 20, 2026 21:49
prevent duplicate rows without data loss

The previous fix (edef7f9) cleared `commitBuffer` in `startNewCommit()`
to prevent stale `DataWritten` events from producing same-sequence
equality deletes. However, this discards late events from workers that
finished after a partial commit (timeout) — those workers already
committed source offsets via `sendOffsetsToTransaction()`, so the data
is unrecoverable.

Instead, group events by commitId and commit each group as a separate
RowDelta with its own Iceberg sequence number. Stale groups are
committed first (lower seq), so equality deletes from newer groups
can apply to older data (`data.seq < delete.seq`).

Error handling uses a three-stage escalation:
1. Blocking retries (configurable, default 3) preserve ordering
2. Failure policy (`fail` stops the connector, `non-blocking` proceeds
   with ordering inversion risk)
3. TTL eviction (default 1 hour) with ERROR log of orphaned file paths

Other changes:
- Per-group offset computation prevents stale snapshots from filtering
  out current envelopes (offset poisoning fix)
- Min-offset consumer advancement on partial success bounds
  re-consumption on restart
- JMX MBean (`CommitStateMXBean`) exposes buffer metrics
- Three new configs: `stale-ttl-ms`, `stale-max-blocking-retries`,
  `stale-failure-policy`

t3hw commented Mar 20, 2026

I'd like to share some context on how this fix was developed and what alternatives were considered.

Disclaimer: I used Claude extensively for the investigation, design, and implementation of this fix. I validated every conclusion against the code, but want to be transparent about the tooling.


Approaches rejected

After @koodin9's comment identified the data loss with commitBuffer.clear(), I evaluated several alternatives before settling on per-commitId group separation:

| Approach | Why rejected |
| --- | --- |
| Conditional clear (boolean flag) | Too coarse — successful partial → failed commit → clear discards legitimate late events. Same-sequence problem persists. |
| Two-bucket stale/current split | Consecutive failures lump stale events into one RowDelta — equality deletes within the bucket share the same sequence number. |
| Iceberg Transaction API | No per-RowDelta offset validator (#14509/#14510), uncertain txn.table() pending state, all-or-nothing retry. |
| Data-then-deletes split | Equality deletes are key-based, not cycle-aware — cycle 1's deletes would remove cycle 2's data too. |
| Position delete injection | BaseRowDelta validation prevents referencing files added in the same RowDelta. |

Future enhancement: Worker-side rollforward

The current fix handles stale events at the Coordinator. A cleaner long-term approach would eliminate them at the source.

Today, Workers fire-and-forget DataWritten — they don't know if it was committed. If Workers consumed CommitComplete events (currently ignored in Worker.receive()) and tracked pending writes, they could detect uncommitted work when the next StartCommit arrives. On detection, the Worker re-seeks to the uncommitted source offsets and re-processes under the new commitId — producing a fresh DataWritten instead of leaving a stale one.

This eliminates stale events entirely, removing the need for per-group ordering, retry escalation, and TTL eviction. It's deferred because it touches Worker, Channel, SinkWriter, and the control topic protocol — a scope that warrants its own design discussion.

@t3hw t3hw marked this pull request as ready for review March 20, 2026 23:39
@github-actions github-actions bot added the docs label Mar 20, 2026

koodin9 commented Mar 21, 2026

@jerryzhujing @t3hw
I just opened a PR, could you please take a look?

@jerryzhujing

Fix stale DataWritten handling by separating commits into distinct RowDeltas #15710

@koodin9 Thanks for your quick response. I will merge your code and do the testing today
