KAFKA-20699: TopologyTestDriver.getStateStore must not reset the record context#22590

Open
SouquieresAdam wants to merge 1 commit into apache:trunk from SouquieresAdam:kafka-ttd-getstatestore-context


SouquieresAdam commented Jun 16, 2026

Summary

KAFKA-20699. A
regression introduced by KAFKA-19638 (#20403):
TopologyTestDriver#getStateStore now sets a dummy
ProcessorRecordContext (null topic, -1 offset/partition, timestamp
0) unconditionally and never restores it. Before KAFKA-19638 the
dummy context was set once at task construction and getStateStore was
read-only.

As a result, a store lookup mutates the task's live record context as a
side effect. When a store handle is fetched while a record's context is
active — an interactive query interleaved with processing, or a test
seam that resolves a store handle through TTD from production code — the
in-flight record's RecordMetadata is wiped: recordMetadata().topic()
returns null and offset/partition become -1. Code building
provenance from recordMetadata().topic() then fails, and direct store
writes capture timestamp 0.

The normal process() path masks this because doProcess rebuilds the
context per record, so it surfaces on direct store writes after a lookup
and on any context read not preceded by a fresh process().
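
To illustrate the mechanism, here is a minimal sketch in plain Java. It does not use any Kafka Streams classes: `ToyContext`, `ToyTask`, `lookupStore`, and `directPut` are hypothetical stand-ins for `ProcessorRecordContext`, the task, `getStateStore`, and a direct store write, and the real code paths are more involved.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: every class here is a hypothetical stand-in, not Kafka Streams code.
class ContextResetSketch {

    // Stand-in for ProcessorRecordContext, reduced to the fields discussed above.
    static final class ToyContext {
        final String topic;
        final long offset;
        final long timestamp;

        ToyContext(String topic, long offset, long timestamp) {
            this.topic = topic;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for the task that owns the live record context.
    static final class ToyTask {
        ToyContext context;
        final Map<String, Long> changelogTimestamps = new HashMap<>();

        // Models doProcess: the context is rebuilt for every record.
        void process(String topic, long offset, long timestamp) {
            context = new ToyContext(topic, offset, timestamp);
        }

        // Models the regressed lookup: the dummy context replaces whatever was live.
        void lookupStore() {
            context = new ToyContext(null, -1L, 0L);
        }

        // Models a direct store write, logged at the current context timestamp.
        void directPut(String key) {
            changelogTimestamps.put(key, context.timestamp);
        }
    }

    // Process one record at timestamp 5000, look up the store, then write directly.
    static ToyTask runScenario() {
        ToyTask task = new ToyTask();
        task.process("input", 0L, 5000L);
        task.lookupStore();
        task.directPut("seeded");
        return task;
    }

    public static void main(String[] args) {
        ToyTask task = runScenario();
        System.out.println("topic=" + task.context.topic
            + " offset=" + task.context.offset
            + " loggedAt=" + task.changelogTimestamps.get("seeded"));
    }
}
```

In this toy model the lookup wipes the topic and offset of the record processed just before it, and the direct write is logged at timestamp 0 rather than 5000.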

Fix

Only materialize the dummy context when none exists yet (before any
record has been processed); never overwrite a live one. Direct store
access still works, while an in-flight record's context is left intact.
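
The guard can be sketched in the same toy style. This is not the actual diff: it assumes the check is a plain null test on the task's context, and `ToyContext`, `ToyTask`, and `lookupStore` are hypothetical names used for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: hypothetical stand-ins, not the actual TopologyTestDriver change.
class GuardedContextSketch {

    // Stand-in for ProcessorRecordContext, reduced to topic and timestamp.
    static final class ToyContext {
        final String topic;
        final long timestamp;

        ToyContext(String topic, long timestamp) {
            this.topic = topic;
            this.timestamp = timestamp;
        }
    }

    static final class ToyTask {
        ToyContext context; // null until a record is processed or a store is looked up
        final Map<String, Long> changelogTimestamps = new HashMap<>();

        // Models doProcess: the context is rebuilt for every record.
        void process(String topic, long timestamp) {
            context = new ToyContext(topic, timestamp);
        }

        // Guarded lookup: create the dummy context only when none exists yet.
        void lookupStore() {
            if (context == null) {
                context = new ToyContext(null, 0L);
            }
        }

        // Models a direct store write, logged at the current context timestamp.
        void directPut(String key) {
            changelogTimestamps.put(key, context.timestamp);
        }
    }

    // A record is in flight: the lookup must leave its context alone.
    static long loggedAtAfterProcessThenLookup() {
        ToyTask task = new ToyTask();
        task.process("input", 5000L);
        task.lookupStore();
        task.directPut("seeded");
        return task.changelogTimestamps.get("seeded");
    }

    // No record processed yet: the lookup supplies the dummy context.
    static long loggedAtWithLookupOnly() {
        ToyTask task = new ToyTask();
        task.lookupStore();
        task.directPut("seeded");
        return task.changelogTimestamps.get("seeded");
    }

    public static void main(String[] args) {
        System.out.println(loggedAtAfterProcessThenLookup());
        System.out.println(loggedAtWithLookupOnly());
    }
}
```

Both cases from the description are covered: a write after `process()` keeps the live timestamp, and a write with no prior record still has a context to read from.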

Testing

Added shouldNotResetRecordContextWhenAccessingStateStore to
TopologyTestDriverTest (runs under both at-least-once and EOS). It
processes a record at stream-time 5000, then grabs the store handle and
writes directly to it; the write must be logged to the changelog at
5000. Fails before the fix (expected: <5000> but was: <0>), passes
after. Full :streams:test-utils:test + checkstyle + spotbugs pass.

Reviewers: Matthias J. Sax <matthias@confluent.io>

…rd context

KAFKA-19638 moved the dummy ProcessorRecordContext initialization out of task
construction and into TopologyTestDriver#getStateStore, but set it
unconditionally. As a result every store lookup overwrites the task's live
record context (null topic, -1 offset/partition, timestamp 0) and never
restores it, so a store handle fetched while a record's context is active wipes
the in-flight RecordMetadata: recordMetadata().topic() returns null and a
subsequent direct write is logged to the changelog at timestamp 0.

The normal process() path masks this because doProcess rebuilds the context per
record, so it surfaces on direct store writes after a lookup, and on any context
read not preceded by a fresh process().

Only materialize the dummy context when none exists yet (before any record has
been processed); never overwrite a live one. This keeps direct store access
working while leaving an in-flight record's context intact.
github-actions bot added the triage, streams, and small labels Jun 16, 2026
@SouquieresAdam
Author

Hi @mjsax & @eduwercamacaro. Migrating to 4.2 raised this issue on our side. First PR for a bug, feel free to educate me on the process.

// the direct write should be logged at the live stream-time (5000), not epoch 0
final KeyValueStore<String, Long> handle = testDriver.getKeyValueStore(storeName);
handle.put("seeded", 2L);
testDriver.advanceWallClockTime(Duration.ZERO);

Member

Why do we call this and pass in ZERO? Looks like a no-op?

Author

This is intentional: the call forces the changelog topic to be flushed, which lets the test inspect the output. I chose 0 to make it explicit that the value is time-neutral; 1 ms gives the same result.

mjsax added the ci-approved label and removed the triage label Jun 16, 2026