Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,12 @@ public StateStore getStateStore(final String name) throws IllegalArgumentExcepti
private StateStore getStateStore(final String name,
final boolean throwForBuiltInStores) {
if (task != null) {
task.processorContext().setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
// Accessing a store must not corrupt the task's record context. Only set a dummy
// context when none exists yet (i.e. before any record has been processed) so that
// direct store operations have a context to work with; never overwrite a live one.
if (task.processorContext().recordContext() == null) {
task.processorContext().setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
}
final StateStore stateStore = ((ProcessorContextImpl) task.processorContext()).stateManager().store(name);
if (stateStore != null) {
if (throwForBuiltInStores) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,63 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
assertTrue(testDriver.isEmpty("result-topic"));
}

@Test
public void shouldNotResetRecordContextWhenAccessingStateStore() {
final String storeName = "recordContextStore";
final Topology topology = new Topology();
topology.addSource("source", "input-topic");
topology.addProcessor("writer", () -> new StoreWriter(storeName), "source");
topology.addStateStore(
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.Long()),
"writer");

config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName());
testDriver = new TopologyTestDriver(topology, config);

final TestInputTopic<String, Long> input =
testDriver.createInputTopic("input-topic", new StringSerializer(), new LongSerializer());
final TestOutputTopic<String, Long> changelog = testDriver.createOutputTopic(
config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + storeName + "-changelog",
stringDeserializer, longDeserializer);

// a record processed at stream-time 5000 anchors the task's record context there
input.pipeInput("processed", 1L, 5000L);
changelog.readRecordsToList();

// grabbing the store handle and writing to it must not reset the live record context:
// the direct write should be logged at the live stream-time (5000), not epoch 0
final KeyValueStore<String, Long> handle = testDriver.getKeyValueStore(storeName);
handle.put("seeded", 2L);
testDriver.advanceWallClockTime(Duration.ZERO);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we call this and pass in ZERO? Looks like a no-op?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's on purpose here: this line force the changelog topic to be flushed and allow to test the output. I chose 0 to explicitly use a time neutral value. Same result with 1ms.


final TestRecord<String, Long> seeded = changelog.readRecordsToList().stream()
.filter(record -> record.key().equals("seeded"))
.findFirst()
.orElseThrow(() -> new AssertionError("seeded entry was not logged to the changelog"));
assertEquals(5000L, seeded.timestamp(),
"getStateStore reset the live record context, so the direct write was logged at epoch 0");
}

private static final class StoreWriter implements Processor<String, Long, Void, Void> {
private final String storeName;
private KeyValueStore<String, Long> store;

StoreWriter(final String storeName) {
this.storeName = storeName;
}

@Override
public void init(final ProcessorContext<Void, Void> context) {
this.store = context.getStateStore(storeName);
}

@Override
public void process(final Record<String, Long> record) {
store.put(record.key(), record.value());
}
}

private static class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long, String, Long> {
@Override
public Processor<String, Long, String, Long> get() {
Expand Down
Loading