diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 2738458062a58..bad4fd58a7f45 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -938,7 +938,12 @@ public StateStore getStateStore(final String name) throws IllegalArgumentExcepti private StateStore getStateStore(final String name, final boolean throwForBuiltInStores) { if (task != null) { - task.processorContext().setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders())); + // Accessing a store must not corrupt the task's record context. Only set a dummy + // context when none exists yet (i.e. before any record has been processed) so that + // direct store operations have a context to work with; never overwrite a live one. + if (task.processorContext().recordContext() == null) { + task.processorContext().setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders())); + } final StateStore stateStore = ((ProcessorContextImpl) task.processorContext()).stateManager().store(name); if (stateStore != null) { if (throwForBuiltInStores) { diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index c8651c50ec6cb..e8e31a7532c35 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -1499,6 +1499,63 @@ public void shouldPunctuateIfWallClockTimeAdvances() { assertTrue(testDriver.isEmpty("result-topic")); } + @Test + public void shouldNotResetRecordContextWhenAccessingStateStore() { + final String storeName = "recordContextStore"; + final Topology topology = new Topology(); + topology.addSource("source", "input-topic"); + topology.addProcessor("writer", () -> new StoreWriter(storeName), "source"); + topology.addStateStore( + Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.Long()), + "writer"); + + config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName()); + testDriver = new TopologyTestDriver(topology, config); + + final TestInputTopic input = + testDriver.createInputTopic("input-topic", new StringSerializer(), new LongSerializer()); + final TestOutputTopic changelog = testDriver.createOutputTopic( + config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + storeName + "-changelog", + stringDeserializer, longDeserializer); + + // a record processed at stream-time 5000 anchors the task's record context there + input.pipeInput("processed", 1L, 5000L); + changelog.readRecordsToList(); + + // grabbing the store handle and writing to it must not reset the live record context: + // the direct write should be logged at the live stream-time (5000), not epoch 0 + final KeyValueStore handle = testDriver.getKeyValueStore(storeName); + handle.put("seeded", 2L); + testDriver.advanceWallClockTime(Duration.ZERO); + + final TestRecord seeded = changelog.readRecordsToList().stream() + .filter(record -> record.key().equals("seeded")) + .findFirst() + .orElseThrow(() -> new AssertionError("seeded entry was not logged to the changelog")); + assertEquals(5000L, seeded.timestamp(), + "getStateStore reset the live record context, so the direct write was logged at epoch 0"); + } + + private static final class StoreWriter implements Processor { + private final String storeName; + private KeyValueStore store; + + StoreWriter(final String storeName) { + this.storeName = storeName; + } + + @Override + public void init(final ProcessorContext context) { + this.store = context.getStateStore(storeName); + } + + @Override + public void process(final Record record) { + store.put(record.key(), record.value()); + } + } + private static class CustomMaxAggregatorSupplier implements ProcessorSupplier { @Override public Processor get() {