Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,58 @@
* {@link TestInputTopic} will auto advance it's time when the record is piped.
*/
public class TestRecord<K, V> {
private static final int NO_PARTITION = -1;
private final Headers headers;
private final K key;
private final V value;
private final Instant recordTime;

public boolean equalsIgnorePartition(final TestRecord<? extends K, ? super V> o) {
return false;
}
/**
* The partition this record is assigned to.
* A value of {@code -1} is a sentinel meaning "no explicit partition set" and is used
* only on <em>input</em> records created without an explicit partition argument.
* Output records read from {@link org.apache.kafka.streams.TestOutputTopic#readRecordsToList()}
* always carry the real resolved partition ({@code >= 0}).
*/
private final int partition;

/**
* Creates a record.
* Creates a record with an explicit partition.
*
* @param key The key that will be included in the record
* @param value The value of the record
* @param headers the record headers that will be included in the record
* @param recordTime The timestamp of the record.
* @param headers The record headers that will be included in the record
* @param recordTime The timestamp of the record
* @param partition The partition this record is assigned to
*/
public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime) {
public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime, final int partition) {
this.key = key;
this.value = value;
this.recordTime = recordTime;
this.headers = new RecordHeaders(headers);
this.partition = partition;
}

/**
* Creates a record.
* Partition defaults to {@code -1} (no explicit partition set).
*
* @param key The key that will be included in the record
* @param value The value of the record
* @param headers the record headers that will be included in the record
* @param timestampMs The timestamp of the record, in milliseconds since the beginning of the epoch.
* @param headers The record headers that will be included in the record
* @param recordTime The timestamp of the record
*/
public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime) {
this(key, value, headers, recordTime, NO_PARTITION);
}

/**
* Creates a record.
* Partition defaults to {@code -1} (no explicit partition set).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this comment be also on the above constructor?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

*
* @param key The key that will be included in the record
* @param value The value of the record
* @param headers The record headers that will be included in the record
* @param timestampMs The timestamp of the record, in milliseconds since the beginning of the epoch
*/
public TestRecord(final K key, final V value, final Headers headers, final Long timestampMs) {
if (timestampMs != null) {
Expand All @@ -78,35 +99,36 @@ public TestRecord(final K key, final V value, final Headers headers, final Long
this.key = key;
this.value = value;
this.headers = new RecordHeaders(headers);
this.partition = NO_PARTITION;
}

/**
* Creates a record.
* Partition defaults to {@code -1} (no explicit partition set).
*
* @param key The key of the record
* @param value The value of the record
* @param recordTime The timestamp of the record as Instant.
* @param recordTime The timestamp of the record as Instant
*/
public TestRecord(final K key, final V value, final Instant recordTime) {
this(key, value, null, recordTime);
this(key, value, null, recordTime, NO_PARTITION);
}

/**
* Creates a record.
* Partition defaults to {@code -1} (no explicit partition set).
*
* @param key The key of the record
* @param value The value of the record
* @param headers The record headers that will be included in the record
*/
public TestRecord(final K key, final V value, final Headers headers) {
this.key = key;
this.value = value;
this.headers = new RecordHeaders(headers);
this.recordTime = null;
this(key, value, headers, (Instant) null, NO_PARTITION);
}

/**
* Creates a record.
* Partition defaults to {@code -1} (no explicit partition set).
*
* @param key The key of the record
* @param value The value of the record
Expand All @@ -116,10 +138,12 @@ public TestRecord(final K key, final V value) {
this.value = value;
this.headers = new RecordHeaders();
this.recordTime = null;
this.partition = NO_PARTITION;
}

/**
* Create a record with {@code null} key.
* Partition defaults to {@code -1} (no explicit partition set).
*
* @param value The value of the record
*/
Expand All @@ -129,6 +153,7 @@ public TestRecord(final V value) {

/**
* Create a {@code TestRecord} from a {@link ConsumerRecord}.
* The partition is taken from {@link ConsumerRecord#partition()}.
*
* @param record The v
*/
Expand All @@ -138,10 +163,12 @@ public TestRecord(final ConsumerRecord<K, V> record) {
this.value = record.value();
this.headers = record.headers();
this.recordTime = Instant.ofEpochMilli(record.timestamp());
this.partition = record.partition() < 0 ? NO_PARTITION : record.partition();
}

/**
* Create a {@code TestRecord} from a {@link ProducerRecord}.
* If the producer record carries an explicit partition it is used; otherwise defaults to {@code -1}.
*
* @param record The record contents
*/
Expand All @@ -151,6 +178,13 @@ public TestRecord(final ProducerRecord<K, V> record) {
this.value = record.value();
this.headers = record.headers();
this.recordTime = Instant.ofEpochMilli(record.timestamp());
final Integer partition = record.partition();
if (partition != null && partition < 0) {
throw new IllegalArgumentException(
"Partition must be >= 0 or null, got: " + partition
);
}
this.partition = partition != null ? partition : NO_PARTITION;
}

/**
Expand Down Expand Up @@ -181,6 +215,13 @@ public Long timestamp() {
return this.recordTime == null ? null : this.recordTime.toEpochMilli();
}

/**
* @return the partition number, or {@code -1} if no partition was explicitly set
*/
public int partition() {
return partition;
}

/**
* @return The headers.
*/
Expand Down Expand Up @@ -209,14 +250,44 @@ public Instant getRecordTime() {
return recordTime;
}

/**
* @return the partition number, or {@code -1} if no partition was explicitly set
*/
public int getPartition() {
return partition;
}

/**
* Compares this record to {@code otherRecord} without considering the {@code partition} field.
*
* <p>Use this in tests that do not care about which partition a record was routed to:
* <pre>{@code
* assertTrue(expected.equalsIgnorePartition(actual));
* }</pre>
*
* @param otherRecord the record to compare against; {@code null} returns {@code false}
* @return {@code true} if all fields except {@code partition} are equal
*/
public boolean equalsIgnorePartition(final TestRecord<K, V> otherRecord) {
return otherRecord != null && (this == otherRecord || equalsFields(otherRecord));
}

private boolean equalsFields(final TestRecord<?, ?> otherRecord) {
return Objects.equals(headers, otherRecord.headers)
&& Objects.equals(key, otherRecord.key)
&& Objects.equals(value, otherRecord.value)
&& Objects.equals(recordTime, otherRecord.recordTime);
}

@Override
public String toString() {
return new StringJoiner(", ", TestRecord.class.getSimpleName() + "[", "]")
.add("key=" + key)
.add("value=" + value)
.add("headers=" + headers)
.add("recordTime=" + recordTime)
.toString();
.add("key=" + key)
.add("value=" + value)
.add("headers=" + headers)
.add("recordTime=" + recordTime)
.add("partition=" + partition)
.toString();
}

@Override
Expand All @@ -228,14 +299,11 @@ public boolean equals(final Object o) {
return false;
}
final TestRecord<?, ?> that = (TestRecord<?, ?>) o;
return Objects.equals(headers, that.headers) &&
Objects.equals(key, that.key) &&
Objects.equals(value, that.value) &&
Objects.equals(recordTime, that.recordTime);
return equalsFields(that) && partition == that.partition;
}

@Override
public int hashCode() {
return Objects.hash(headers, key, value, recordTime);
return Objects.hash(headers, key, value, recordTime, partition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasProperty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestRecordTest {
private final String key = "testKey";
Expand Down Expand Up @@ -147,7 +149,7 @@ public void testToString() {
assertThat(testRecord.toString(), equalTo("TestRecord[key=testKey, value=1, "
+ "headers=RecordHeaders(headers = [RecordHeader(key = foo, value = [118, 97, 108, 117, 101]), "
+ "RecordHeader(key = bar, value = null), RecordHeader(key = \"A\\u00ea\\u00f1\\u00fcC\", value = [118, 97, 108, 117, 101])], isReadOnly = false), "
+ "recordTime=2019-06-01T10:00:00Z]"));
+ "recordTime=2019-06-01T10:00:00Z, partition=-1]"));
}

@Test
Expand All @@ -156,7 +158,7 @@ public void testConsumerRecord() {
final ConsumerRecord<String, Integer> consumerRecord = new ConsumerRecord<>(topicName, 1, 0, recordMs,
TimestampType.CREATE_TIME, 0, 0, key, value, headers, Optional.empty());
final TestRecord<String, Integer> testRecord = new TestRecord<>(consumerRecord);
final TestRecord<String, Integer> expectedRecord = new TestRecord<>(key, value, headers, recordTime);
final TestRecord<String, Integer> expectedRecord = new TestRecord<>(key, value, headers, recordTime, 1);
assertEquals(expectedRecord, testRecord);
}

Expand All @@ -166,7 +168,66 @@ public void testProducerRecord() {
final ProducerRecord<String, Integer> producerRecord =
new ProducerRecord<>(topicName, 1, recordMs, key, value, headers);
final TestRecord<String, Integer> testRecord = new TestRecord<>(producerRecord);
final TestRecord<String, Integer> expectedRecord = new TestRecord<>(key, value, headers, recordTime);
final TestRecord<String, Integer> expectedRecord = new TestRecord<>(key, value, headers, recordTime, 1);
assertEquals(expectedRecord, testRecord);
}

@Test
public void testPartitionDefaultsToUnset() {
// Records built without an explicit partition, default to -1
assertEquals(-1, new TestRecord<>(key, value, headers, recordTime).partition());
assertEquals(-1, new TestRecord<>(key, value, headers, recordMs).partition());
assertEquals(-1, new TestRecord<>(key, value, headers).partition());
assertEquals(-1, new TestRecord<>(key, value).partition());
assertEquals(-1, new TestRecord<>(value).partition());
}

@Test
public void testExplicitPartitionConstructor() {
// Records built with an explicit partition.
final TestRecord<String, Integer> testRecord = new TestRecord<>(key, value, headers, recordTime, 3);
assertEquals(3, testRecord.partition());
}

@Test
public void testEqualsConsidersPartition() {
// equals()/hashCode() take the partition into account.
final TestRecord<String, Integer> record1 = new TestRecord<>(key, value, headers, recordTime, 0);
final TestRecord<String, Integer> record2 = new TestRecord<>(key, value, headers, recordTime, 1);
assertNotEquals(record1, record2);

final TestRecord<String, Integer> record1Again = new TestRecord<>(key, value, headers, recordTime, 0);
assertEquals(record1, record1Again);
assertEquals(record1.hashCode(), record1Again.hashCode());

// an unset (default) partition differs from an explicit one
assertNotEquals(new TestRecord<>(key, value, headers, recordTime), record1);
}

@Test
public void testEqualsIgnorePartition() {
// equalsIgnorePartition() matches on every field except the partition.
final TestRecord<String, Integer> record1 = new TestRecord<>(key, value, headers, recordTime, 0);
final TestRecord<String, Integer> record2 = new TestRecord<>(key, value, headers, recordTime, 1);
assertNotEquals(record1, record2);
assertTrue(record1.equalsIgnorePartition(record2));
assertTrue(record2.equalsIgnorePartition(record1));

// a genuine field mismatch is still detected
assertFalse(record1.equalsIgnorePartition(new TestRecord<>("other", value, headers, recordTime, 0)));
assertFalse(record1.equalsIgnorePartition(new TestRecord<>(key, 2, headers, recordTime, 0)));

// reflexive / null guards
assertTrue(record1.equalsIgnorePartition(record1));
assertFalse(record1.equalsIgnorePartition(null));
}

@Test
public void testToStringIncludesPartitionWhenSet() {
final TestRecord<String, Integer> testRecord = new TestRecord<>(key, value, headers, recordTime, 2);
assertThat(testRecord.toString(), equalTo("TestRecord[key=testKey, value=1, "
+ "headers=RecordHeaders(headers = [RecordHeader(key = foo, value = [118, 97, 108, 117, 101]), "
+ "RecordHeader(key = bar, value = null), RecordHeader(key = \"A\\u00ea\\u00f1\\u00fcC\", value = [118, 97, 108, 117, 101])], isReadOnly = false), "
+ "recordTime=2019-06-01T10:00:00Z, partition=2]"));
}
}