Skip to content

Commit 2fea4fd

Browse files
committed
propagate scope in async failures
Signed-off-by: Igor Macedo Quintanilha <[email protected]>
1 parent 6425682 commit 2fea4fd

File tree

3 files changed

+217
-83
lines changed

3 files changed

+217
-83
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,6 @@
1616

1717
package org.springframework.kafka.listener;
1818

19-
import java.nio.ByteBuffer;
20-
import java.time.Duration;
21-
import java.util.AbstractMap.SimpleEntry;
22-
import java.util.ArrayList;
23-
import java.util.Arrays;
24-
import java.util.Collection;
25-
import java.util.Collections;
26-
import java.util.HashMap;
27-
import java.util.HashSet;
28-
import java.util.Iterator;
29-
import java.util.LinkedHashMap;
30-
import java.util.LinkedHashSet;
31-
import java.util.LinkedList;
32-
import java.util.List;
33-
import java.util.Map;
34-
import java.util.Map.Entry;
35-
import java.util.Objects;
36-
import java.util.Properties;
37-
import java.util.Set;
38-
import java.util.concurrent.BlockingQueue;
39-
import java.util.concurrent.CompletableFuture;
40-
import java.util.concurrent.ConcurrentHashMap;
41-
import java.util.concurrent.ConcurrentLinkedDeque;
42-
import java.util.concurrent.CountDownLatch;
43-
import java.util.concurrent.LinkedBlockingQueue;
44-
import java.util.concurrent.ScheduledFuture;
45-
import java.util.concurrent.TimeUnit;
46-
import java.util.concurrent.atomic.AtomicBoolean;
47-
import java.util.function.BiConsumer;
48-
import java.util.function.Function;
49-
import java.util.regex.Pattern;
50-
import java.util.stream.Collectors;
51-
5219
import io.micrometer.observation.Observation;
5320
import io.micrometer.observation.ObservationRegistry;
5421
import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -76,7 +43,6 @@
7643
import org.apache.kafka.common.header.Header;
7744
import org.apache.kafka.common.header.internals.RecordHeader;
7845
import org.jspecify.annotations.Nullable;
79-
8046
import org.springframework.aop.support.AopUtils;
8147
import org.springframework.beans.BeanUtils;
8248
import org.springframework.context.ApplicationContext;
@@ -139,6 +105,39 @@
139105
import org.springframework.util.ObjectUtils;
140106
import org.springframework.util.StringUtils;
141107

108+
import java.nio.ByteBuffer;
109+
import java.time.Duration;
110+
import java.util.AbstractMap.SimpleEntry;
111+
import java.util.ArrayList;
112+
import java.util.Arrays;
113+
import java.util.Collection;
114+
import java.util.Collections;
115+
import java.util.HashMap;
116+
import java.util.HashSet;
117+
import java.util.Iterator;
118+
import java.util.LinkedHashMap;
119+
import java.util.LinkedHashSet;
120+
import java.util.LinkedList;
121+
import java.util.List;
122+
import java.util.Map;
123+
import java.util.Map.Entry;
124+
import java.util.Objects;
125+
import java.util.Properties;
126+
import java.util.Set;
127+
import java.util.concurrent.BlockingQueue;
128+
import java.util.concurrent.CompletableFuture;
129+
import java.util.concurrent.ConcurrentHashMap;
130+
import java.util.concurrent.ConcurrentLinkedDeque;
131+
import java.util.concurrent.CountDownLatch;
132+
import java.util.concurrent.LinkedBlockingQueue;
133+
import java.util.concurrent.ScheduledFuture;
134+
import java.util.concurrent.TimeUnit;
135+
import java.util.concurrent.atomic.AtomicBoolean;
136+
import java.util.function.BiConsumer;
137+
import java.util.function.Function;
138+
import java.util.regex.Pattern;
139+
import java.util.stream.Collectors;
140+
142141
/**
143142
* Single-threaded Message listener container using the Java {@link Consumer} supporting
144143
* auto-partition assignment or user-configured assignment.
@@ -680,16 +679,16 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
680679

681680
private final @Nullable CommonErrorHandler commonErrorHandler;
682681

683-
@Deprecated(since = "3.2", forRemoval = true)
684682
@SuppressWarnings("removal")
685683
private final @Nullable PlatformTransactionManager transactionManager =
686684
this.containerProperties.getKafkaAwareTransactionManager() != null ?
687685
this.containerProperties.getKafkaAwareTransactionManager() :
688686
this.containerProperties.getTransactionManager();
689687

690688
private final @Nullable KafkaAwareTransactionManager<?, ?> kafkaTxManager =
691-
this.transactionManager instanceof KafkaAwareTransactionManager<?, ?> kafkaAwareTransactionManager ?
692-
kafkaAwareTransactionManager : null;
689+
this.transactionManager instanceof KafkaAwareTransactionManager
690+
? (KafkaAwareTransactionManager<?, ?>) this.transactionManager
691+
: null;
693692

694693
private final @Nullable TransactionTemplate transactionTemplate;
695694

@@ -1498,7 +1497,13 @@ protected void handleAsyncFailure() {
14981497
// We will give up on retrying with the remaining copied and failed Records.
14991498
for (FailedRecordTuple<K, V> copyFailedRecord : copyFailedRecords) {
15001499
try {
1501-
invokeErrorHandlerBySingleRecord(copyFailedRecord);
1500+
KafkaListenerObservation.LISTENER_OBSERVATION.observation(
1501+
getContainerProperties().getObservationConvention(),
1502+
DefaultKafkaListenerObservationConvention.INSTANCE,
1503+
() -> new KafkaRecordReceiverContext(copyFailedRecord.record(), getListenerId(),
1504+
getClientId(), this.consumerGroupId, this::clusterId),
1505+
this.observationRegistry)
1506+
.observe(() -> invokeErrorHandlerBySingleRecord(copyFailedRecord));
15021507
}
15031508
catch (Exception e) {
15041509
this.logger.warn(() ->

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,6 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19-
import java.lang.reflect.Method;
20-
import java.lang.reflect.ParameterizedType;
21-
import java.lang.reflect.Type;
22-
import java.lang.reflect.WildcardType;
23-
import java.nio.ByteBuffer;
24-
import java.nio.charset.StandardCharsets;
25-
import java.util.Collection;
26-
import java.util.Iterator;
27-
import java.util.List;
28-
import java.util.Map;
29-
import java.util.Objects;
30-
import java.util.concurrent.CompletableFuture;
31-
import java.util.concurrent.CompletionException;
32-
import java.util.function.BiConsumer;
33-
import java.util.stream.Collectors;
34-
3519
import io.micrometer.observation.Observation;
3620
import io.micrometer.observation.ObservationRegistry;
3721
import org.apache.commons.logging.LogFactory;
@@ -41,8 +25,6 @@
4125
import org.apache.kafka.common.TopicPartition;
4226
import org.jspecify.annotations.NonNull;
4327
import org.jspecify.annotations.Nullable;
44-
import reactor.core.publisher.Mono;
45-
4628
import org.springframework.context.expression.MapAccessor;
4729
import org.springframework.core.MethodParameter;
4830
import org.springframework.core.log.LogAccessor;
@@ -78,6 +60,23 @@
7860
import org.springframework.util.ObjectUtils;
7961
import org.springframework.util.StringUtils;
8062
import org.springframework.util.TypeUtils;
63+
import reactor.core.publisher.Mono;
64+
65+
import java.lang.reflect.Method;
66+
import java.lang.reflect.ParameterizedType;
67+
import java.lang.reflect.Type;
68+
import java.lang.reflect.WildcardType;
69+
import java.nio.ByteBuffer;
70+
import java.nio.charset.StandardCharsets;
71+
import java.util.Collection;
72+
import java.util.Iterator;
73+
import java.util.List;
74+
import java.util.Map;
75+
import java.util.Objects;
76+
import java.util.concurrent.CompletableFuture;
77+
import java.util.concurrent.CompletionException;
78+
import java.util.function.BiConsumer;
79+
import java.util.stream.Collectors;
8180

8281
/**
8382
* An abstract {@link MessageListener} adapter
@@ -736,13 +735,15 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
736735
"Async Fail", Objects.requireNonNull(source).getPayload()), cause));
737736
}
738737
catch (Throwable ex) {
739-
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
740738
acknowledge(acknowledgment);
741739
if (canAsyncRetry(request, ex) && this.asyncRetryCallback != null) {
742740
@SuppressWarnings("unchecked")
743741
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
744742
this.asyncRetryCallback.accept(record, (RuntimeException) ex);
745743
}
744+
else {
745+
this.logger.error(ex, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
746+
}
746747
}
747748
}
748749

0 commit comments

Comments
 (0)