From 0d38dbd05de7bec98df1ef097c5d3e9953886a38 Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Thu, 18 Jun 2026 10:46:37 +0200 Subject: [PATCH 1/2] Add partition support to TestRecord Co-authored-by: Marie-Laure Momplot Co-authored-by: Julien Brunet Co-authored-by: Adam Souquieres --- .../apache/kafka/streams/test/TestRecord.java | 115 ++++++++++++++---- .../kafka/streams/test/TestRecordTest.java | 67 +++++++++- 2 files changed, 152 insertions(+), 30 deletions(-) diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/TestRecord.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/TestRecord.java index eb724a08e16c5..3825b90379bb6 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/TestRecord.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/TestRecord.java @@ -33,28 +33,35 @@ * {@link TestInputTopic} will auto advance it's time when the record is piped. */ public class TestRecord { + private static final int NO_PARTITION = -1; private final Headers headers; private final K key; private final V value; private final Instant recordTime; - - public boolean equalsIgnorePartition(final TestRecord o) { - return false; - } + /** + * The partition this record is assigned to. + * A value of {@code -1} is a sentinel meaning "no explicit partition set" and is used + * only on input records created without an explicit partition argument. + * Output records read from {@link org.apache.kafka.streams.TestOutputTopic#readRecordsToList()} + * always carry the real resolved partition ({@code >= 0}). + */ + private final int partition; /** - * Creates a record. + * Creates a record with an explicit partition. * * @param key The key that will be included in the record * @param value The value of the record - * @param headers the record headers that will be included in the record - * @param recordTime The timestamp of the record. + * @param headers The record headers that will be included in the record + * @param recordTime The timestamp of the record + * @param partition The partition this record is assigned to */ - public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime) { + public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime, final int partition) { this.key = key; this.value = value; this.recordTime = recordTime; this.headers = new RecordHeaders(headers); + this.partition = partition; } /** @@ -62,8 +69,21 @@ public TestRecord(final K key, final V value, final Headers headers, final Insta * * @param key The key that will be included in the record * @param value The value of the record - * @param headers the record headers that will be included in the record - * @param timestampMs The timestamp of the record, in milliseconds since the beginning of the epoch. + * @param headers The record headers that will be included in the record + * @param recordTime The timestamp of the record + */ + public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime) { + this(key, value, headers, recordTime, NO_PARTITION); + } + + /** + * Creates a record. + * Partition defaults to {@code -1} (no explicit partition set). + * + * @param key The key that will be included in the record + * @param value The value of the record + * @param headers The record headers that will be included in the record + * @param timestampMs The timestamp of the record, in milliseconds since the beginning of the epoch */ public TestRecord(final K key, final V value, final Headers headers, final Long timestampMs) { if (timestampMs != null) { @@ -78,35 +98,36 @@ public TestRecord(final K key, final V value, final Headers headers, final Long this.key = key; this.value = value; this.headers = new RecordHeaders(headers); + this.partition = NO_PARTITION; } /** * Creates a record. + * Partition defaults to {@code -1} (no explicit partition set). * * @param key The key of the record * @param value The value of the record - * @param recordTime The timestamp of the record as Instant. + * @param recordTime The timestamp of the record as Instant */ public TestRecord(final K key, final V value, final Instant recordTime) { - this(key, value, null, recordTime); + this(key, value, null, recordTime, NO_PARTITION); } /** * Creates a record. + * Partition defaults to {@code -1} (no explicit partition set). * * @param key The key of the record * @param value The value of the record * @param headers The record headers that will be included in the record */ public TestRecord(final K key, final V value, final Headers headers) { - this.key = key; - this.value = value; - this.headers = new RecordHeaders(headers); - this.recordTime = null; + this(key, value, headers, (Instant) null, NO_PARTITION); } - + /** * Creates a record. + * Partition defaults to {@code -1} (no explicit partition set). * * @param key The key of the record * @param value The value of the record @@ -116,10 +137,12 @@ public TestRecord(final K key, final V value) { this.value = value; this.headers = new RecordHeaders(); this.recordTime = null; + this.partition = NO_PARTITION; } /** * Create a record with {@code null} key. + * Partition defaults to {@code -1} (no explicit partition set). * * @param value The value of the record */ @@ -129,6 +152,7 @@ public TestRecord(final V value) { /** * Create a {@code TestRecord} from a {@link ConsumerRecord}. + * The partition is taken from {@link ConsumerRecord#partition()}. * * @param record The v */ @@ -138,10 +162,12 @@ public TestRecord(final ConsumerRecord record) { this.value = record.value(); this.headers = record.headers(); this.recordTime = Instant.ofEpochMilli(record.timestamp()); + this.partition = record.partition(); } /** * Create a {@code TestRecord} from a {@link ProducerRecord}. + * If the producer record carries an explicit partition it is used; otherwise defaults to {@code -1}. * * @param record The record contents */ @@ -151,6 +177,7 @@ public TestRecord(final ProducerRecord record) { this.value = record.value(); this.headers = record.headers(); this.recordTime = Instant.ofEpochMilli(record.timestamp()); + this.partition = record.partition() != null ? record.partition() : NO_PARTITION; } /** @@ -181,6 +208,13 @@ public Long timestamp() { return this.recordTime == null ? null : this.recordTime.toEpochMilli(); } + /** + * @return the partition number, or {@code -1} if no partition was explicitly set + */ + public int partition() { + return partition; + } + /** * @return The headers. */ @@ -209,14 +243,44 @@ public Instant getRecordTime() { return recordTime; } + /** + * @return the partition number, or {@code -1} if no partition was explicitly set + */ + public int getPartition() { + return partition; + } + + /** + * Compares this record to {@code o} without considering the {@code partition} field. + * + *

Use this in tests that do not care about which partition a record was routed to: + *

{@code
+     * assertTrue(expected.equalsIgnorePartition(actual));
+     * }
+ * + * @param o the record to compare against; {@code null} returns {@code false} + * @return {@code true} if all fields except {@code partition} are equal + */ + public boolean equalsIgnorePartition(final TestRecord o) { + return o != null && (this == o || equalsFields(o)); + } + + private boolean equalsFields(final TestRecord other) { + return Objects.equals(headers, other.headers) + && Objects.equals(key, other.key) + && Objects.equals(value, other.value) + && Objects.equals(recordTime, other.recordTime); + } + @Override public String toString() { return new StringJoiner(", ", TestRecord.class.getSimpleName() + "[", "]") - .add("key=" + key) - .add("value=" + value) - .add("headers=" + headers) - .add("recordTime=" + recordTime) - .toString(); + .add("key=" + key) + .add("value=" + value) + .add("headers=" + headers) + .add("recordTime=" + recordTime) + .add("partition=" + partition) + .toString(); } @Override @@ -228,14 +292,11 @@ public boolean equals(final Object o) { return false; } final TestRecord that = (TestRecord) o; - return Objects.equals(headers, that.headers) && - Objects.equals(key, that.key) && - Objects.equals(value, that.value) && - Objects.equals(recordTime, that.recordTime); + return equalsFields(that) && partition == that.partition; } @Override public int hashCode() { - return Objects.hash(headers, key, value, recordTime); + return Objects.hash(headers, key, value, recordTime, partition); } } diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java index ad3b1a2a20a43..e6e1531420077 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/TestRecordTest.java @@ -34,8 +34,10 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.hasProperty; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestRecordTest { private final String key = "testKey"; @@ -147,7 +149,7 @@ public void testToString() { assertThat(testRecord.toString(), equalTo("TestRecord[key=testKey, value=1, " + "headers=RecordHeaders(headers = [RecordHeader(key = foo, value = [118, 97, 108, 117, 101]), " + "RecordHeader(key = bar, value = null), RecordHeader(key = \"A\\u00ea\\u00f1\\u00fcC\", value = [118, 97, 108, 117, 101])], isReadOnly = false), " - + "recordTime=2019-06-01T10:00:00Z]")); + + "recordTime=2019-06-01T10:00:00Z, partition=-1]")); } @Test @@ -156,7 +158,7 @@ public void testConsumerRecord() { final ConsumerRecord consumerRecord = new ConsumerRecord<>(topicName, 1, 0, recordMs, TimestampType.CREATE_TIME, 0, 0, key, value, headers, Optional.empty()); final TestRecord testRecord = new TestRecord<>(consumerRecord); - final TestRecord expectedRecord = new TestRecord<>(key, value, headers, recordTime); + final TestRecord expectedRecord = new TestRecord<>(key, value, headers, recordTime, 1); assertEquals(expectedRecord, testRecord); } @@ -166,7 +168,66 @@ public void testProducerRecord() { final ProducerRecord producerRecord = new ProducerRecord<>(topicName, 1, recordMs, key, value, headers); final TestRecord testRecord = new TestRecord<>(producerRecord); - final TestRecord expectedRecord = new TestRecord<>(key, value, headers, recordTime); + final TestRecord expectedRecord = new TestRecord<>(key, value, headers, recordTime, 1); assertEquals(expectedRecord, testRecord); } + + @Test + public void testPartitionDefaultsToUnset() { + // Records built without an explicit partition, default to -1 + assertEquals(-1, new TestRecord<>(key, value, headers, recordTime).partition()); + assertEquals(-1, new TestRecord<>(key, value, headers, recordMs).partition()); + assertEquals(-1, new TestRecord<>(key, value, headers).partition()); + assertEquals(-1, new TestRecord<>(key, value).partition()); + assertEquals(-1, new TestRecord<>(value).partition()); + } + + @Test + public void testExplicitPartitionConstructor() { + // Records built with an explicit partition. + final TestRecord testRecord = new TestRecord<>(key, value, headers, recordTime, 3); + assertEquals(3, testRecord.partition()); + } + + @Test + public void testEqualsConsidersPartition() { + // equals()/hashCode() take the partition into account. + final TestRecord record1 = new TestRecord<>(key, value, headers, recordTime, 0); + final TestRecord record2 = new TestRecord<>(key, value, headers, recordTime, 1); + assertNotEquals(record1, record2); + + final TestRecord record1Again = new TestRecord<>(key, value, headers, recordTime, 0); + assertEquals(record1, record1Again); + assertEquals(record1.hashCode(), record1Again.hashCode()); + + // an unset (default) partition differs from an explicit one + assertNotEquals(new TestRecord<>(key, value, headers, recordTime), record1); + } + + @Test + public void testEqualsIgnorePartition() { + // equalsIgnorePartition() matches on every field except the partition. + final TestRecord record1 = new TestRecord<>(key, value, headers, recordTime, 0); + final TestRecord record2 = new TestRecord<>(key, value, headers, recordTime, 1); + assertNotEquals(record1, record2); + assertTrue(record1.equalsIgnorePartition(record2)); + assertTrue(record2.equalsIgnorePartition(record1)); + + // a genuine field mismatch is still detected + assertFalse(record1.equalsIgnorePartition(new TestRecord<>("other", value, headers, recordTime, 0))); + assertFalse(record1.equalsIgnorePartition(new TestRecord<>(key, 2, headers, recordTime, 0))); + + // reflexive / null guards + assertTrue(record1.equalsIgnorePartition(record1)); + assertFalse(record1.equalsIgnorePartition(null)); + } + + @Test + public void testToStringIncludesPartitionWhenSet() { + final TestRecord testRecord = new TestRecord<>(key, value, headers, recordTime, 2); + assertThat(testRecord.toString(), equalTo("TestRecord[key=testKey, value=1, " + + "headers=RecordHeaders(headers = [RecordHeader(key = foo, value = [118, 97, 108, 117, 101]), " + + "RecordHeader(key = bar, value = null), RecordHeader(key = \"A\\u00ea\\u00f1\\u00fcC\", value = [118, 97, 108, 117, 101])], isReadOnly = false), " + + "recordTime=2019-06-01T10:00:00Z, partition=2]")); + } } From bf47373d7b95d45f88060a205b26117ee0741f39 Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Fri, 19 Jun 2026 09:41:11 +0200 Subject: [PATCH 2/2] Add partition support to TestRecord Co-authored-by: Marie-Laure Momplot Co-authored-by: Julien Brunet Co-authored-by: Adam Souquieres --- .../apache/kafka/streams/test/TestRecord.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/TestRecord.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/TestRecord.java index 3825b90379bb6..b39b87981a599 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/test/TestRecord.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/test/TestRecord.java @@ -66,6 +66,7 @@ public TestRecord(final K key, final V value, final Headers headers, final Insta /** * Creates a record. + * Partition defaults to {@code -1} (no explicit partition set). * * @param key The key that will be included in the record * @param value The value of the record @@ -162,7 +163,7 @@ public TestRecord(final ConsumerRecord record) { this.value = record.value(); this.headers = record.headers(); this.recordTime = Instant.ofEpochMilli(record.timestamp()); - this.partition = record.partition(); + this.partition = record.partition() < 0 ? NO_PARTITION : record.partition(); } /** @@ -177,7 +178,13 @@ public TestRecord(final ProducerRecord record) { this.value = record.value(); this.headers = record.headers(); this.recordTime = Instant.ofEpochMilli(record.timestamp()); - this.partition = record.partition() != null ? record.partition() : NO_PARTITION; + final Integer partition = record.partition(); + if (partition != null && partition < 0) { + throw new IllegalArgumentException( + "Partition must be >= 0 or null, got: " + partition + ); + } + this.partition = partition != null ? partition : NO_PARTITION; } /** @@ -251,25 +258,25 @@ public int getPartition() { } /** - * Compares this record to {@code o} without considering the {@code partition} field. + * Compares this record to {@code otherRecord} without considering the {@code partition} field. * *

Use this in tests that do not care about which partition a record was routed to: *

{@code
      * assertTrue(expected.equalsIgnorePartition(actual));
      * }
* - * @param o the record to compare against; {@code null} returns {@code false} + * @param otherRecord the record to compare against; {@code null} returns {@code false} * @return {@code true} if all fields except {@code partition} are equal */ - public boolean equalsIgnorePartition(final TestRecord o) { - return o != null && (this == o || equalsFields(o)); + public boolean equalsIgnorePartition(final TestRecord otherRecord) { + return otherRecord != null && (this == otherRecord || equalsFields(otherRecord)); } - private boolean equalsFields(final TestRecord other) { - return Objects.equals(headers, other.headers) - && Objects.equals(key, other.key) - && Objects.equals(value, other.value) - && Objects.equals(recordTime, other.recordTime); + private boolean equalsFields(final TestRecord otherRecord) { + return Objects.equals(headers, otherRecord.headers) + && Objects.equals(key, otherRecord.key) + && Objects.equals(value, otherRecord.value) + && Objects.equals(recordTime, otherRecord.recordTime); } @Override