Skip to content

Commit 68afa8b

Browse files
AndrewJSchofieldeduwercamacaro
authored andcommitted
KAFKA-19845: [1/N] Renew acks in share consumer (apache#20838)
Implements AcknowledgeType.RENEW in the share consumer as part of KIP-1222. There will be a future PR with additional tests. Reviewers: Apoorv Mittal <[email protected]>, Shivsundar R <[email protected]>, Abhinav Dixit <[email protected]>
1 parent 6691844 commit 68afa8b

File tree

20 files changed

+772
-154
lines changed

20 files changed

+772
-154
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
public class ShareConsumerTest {
134134
private final ClusterInstance cluster;
135135
private final TopicPartition tp = new TopicPartition("topic", 0);
136+
private Uuid tpId;
136137
private final TopicPartition tp2 = new TopicPartition("topic2", 0);
137138
private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
138139
private List<TopicPartition> sgsTopicPartitions;
@@ -151,7 +152,7 @@ public ShareConsumerTest(ClusterInstance cluster) {
151152
public void setup() {
152153
try {
153154
this.cluster.waitForReadyBrokers();
154-
createTopic("topic");
155+
tpId = createTopic("topic");
155156
createTopic("topic2");
156157
sgsTopicPartitions = IntStream.range(0, 3)
157158
.mapToObj(part -> new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, part))
@@ -2906,6 +2907,95 @@ public void testExplicitAcknowledgeReleaseAcceptInRecordLimitMode() {
29062907
}
29072908
}
29082909

2910+
@ClusterTest
2911+
public void testRenewAcknowledgementOnPoll() {
2912+
alterShareAutoOffsetReset("group1", "earliest");
2913+
try (Producer<byte[], byte[]> producer = createProducer();
2914+
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
2915+
"group1",
2916+
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))
2917+
) {
2918+
AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
2919+
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, exception) ->
2920+
offsetsByTopicPartition.forEach((tip, offsets) -> acknowledgementsCommitted.addAndGet(offsets.size())));
2921+
2922+
for (int i = 0; i < 10; i++) {
2923+
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message " + i).getBytes());
2924+
producer.send(record);
2925+
}
2926+
producer.flush();
2927+
2928+
shareConsumer.subscribe(List.of(tp.topic()));
2929+
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10);
2930+
assertEquals(10, records.count());
2931+
2932+
int count = 0;
2933+
for (ConsumerRecord<byte[], byte[]> record : records) {
2934+
if (count % 2 == 0) {
2935+
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
2936+
} else {
2937+
shareConsumer.acknowledge(record, AcknowledgeType.RENEW);
2938+
}
2939+
count++;
2940+
}
2941+
2942+
// Get the rest of all 5 records.
2943+
records = waitedPoll(shareConsumer, 2500L, 5);
2944+
assertEquals(5, records.count());
2945+
for (ConsumerRecord<byte[], byte[]> record : records) {
2946+
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
2947+
}
2948+
2949+
shareConsumer.commitSync();
2950+
assertEquals(15, acknowledgementsCommitted.get());
2951+
}
2952+
}
2953+
2954+
@ClusterTest
2955+
public void testRenewAcknowledgementOnCommitSync() {
2956+
alterShareAutoOffsetReset("group1", "earliest");
2957+
try (Producer<byte[], byte[]> producer = createProducer();
2958+
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
2959+
"group1",
2960+
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))
2961+
) {
2962+
AtomicInteger acknowledgementsCommitted = new AtomicInteger(0);
2963+
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, exception) ->
2964+
offsetsByTopicPartition.forEach((tip, offsets) -> acknowledgementsCommitted.addAndGet(offsets.size())));
2965+
2966+
for (int i = 0; i < 10; i++) {
2967+
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message " + i).getBytes());
2968+
producer.send(record);
2969+
}
2970+
producer.flush();
2971+
2972+
shareConsumer.subscribe(List.of(tp.topic()));
2973+
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10);
2974+
assertEquals(10, records.count());
2975+
2976+
int count = 0;
2977+
Map<TopicIdPartition, Optional<KafkaException>> result;
2978+
for (ConsumerRecord<byte[], byte[]> record : records) {
2979+
if (count % 2 == 0) {
2980+
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
2981+
} else {
2982+
shareConsumer.acknowledge(record, AcknowledgeType.RENEW);
2983+
}
2984+
result = shareConsumer.commitSync();
2985+
assertEquals(1, result.size());
2986+
assertEquals(Optional.empty(), result.get(new TopicIdPartition(tpId, tp.partition(), tp.topic())));
2987+
count++;
2988+
}
2989+
2990+
// Get the rest of all 5 records.
2991+
records = waitedPoll(shareConsumer, 2500L, 5);
2992+
assertEquals(5, records.count());
2993+
for (ConsumerRecord<byte[], byte[]> record : records) {
2994+
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
2995+
}
2996+
}
2997+
}
2998+
29092999
/**
29103000
* Util class to encapsulate state for a consumer/producer
29113001
* being executed by an {@link ExecutorService}.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,15 @@ public KafkaException getAcknowledgeException() {
143143
return acknowledgeException;
144144
}
145145

146+
/**
147+
* Whether an acknowledgement error code was received in the response from the broker.
148+
*
149+
* @return Whether an acknowledgement error code was received in the response from the broker.
150+
*/
151+
public boolean isCompletedExceptionally() {
152+
return acknowledgeException != null;
153+
}
154+
146155
/**
147156
* Merges two sets of acknowledgements. If there are overlapping acknowledgements, the
148157
* merged set wins.

0 commit comments

Comments
 (0)