Skip to content

Commit 2a34a4b

Browse files
committed
Refactor fixTxOffsets to use ConsumerRecords#nextOffsets
Replaced the position()-based offset correction with nextOffsets() to address known issues with transactional topics: - Works correctly with both read_committed and read_uncommitted - Handles empty poll() advancement and leader epoch propagation - Fixes the case when max.poll.records equals transaction batch size Signed-off-by: Su Ko <[email protected]>
1 parent 1abc7c1 commit 2a34a4b

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ public class TransactionalContainerTests {
156156

157157
public static final String topic15 = "txTopic15";
158158

159-
160159
private static EmbeddedKafkaBroker embeddedKafka;
161160

162161
@BeforeAll
@@ -1323,7 +1322,8 @@ void testFixTxOffsetsWithEmptyPollAdvance() throws Exception {
13231322
containerProps.setFixTxOffsets(true);
13241323
containerProps.setIdleEventInterval(1000L);
13251324

1326-
containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {});
1325+
containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {
1326+
});
13271327

13281328
DefaultKafkaProducerFactory<Integer, String> pf =
13291329
new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
@@ -1371,7 +1371,8 @@ void testFixTxOffsetsRetainsLeaderEpoch() throws Exception {
13711371
containerProps.setFixTxOffsets(true);
13721372
containerProps.setIdleEventInterval(1000L);
13731373

1374-
containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {});
1374+
containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {
1375+
});
13751376

13761377
DefaultKafkaProducerFactory<Integer, String> pf =
13771378
new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
@@ -1417,7 +1418,8 @@ void testFixLagWhenMaxPollEqualsTxBatchSize() throws Exception {
14171418
containerProps.setPollTimeout(500L);
14181419
containerProps.setFixTxOffsets(true);
14191420
containerProps.setIdleEventInterval(1000L);
1420-
containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {});
1421+
containerProps.setMessageListener((MessageListener<Integer, String>) rec -> {
1422+
});
14211423

14221424
DefaultKafkaProducerFactory<Integer, String> pf =
14231425
new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));

0 commit comments

Comments
 (0)