Skip to content

Commit 59e3318

Browse files
authored
fix: Kafka initialization occasionally failed due to concurrent injection of OpenTelemetryMetricsReporter (to #12538) (#12583)
1 parent c8bd230 commit 59e3318

File tree

2 files changed

+36
-8
lines changed

2 files changed

+36
-8
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,24 @@ public static class ConstructorMapAdvice {
4242
@Advice.OnMethodEnter(suppress = Throwable.class)
4343
public static void onEnter(
4444
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
45-
// ensure config is a mutable map
46-
if (config.getClass() != HashMap.class) {
47-
config = new HashMap<>(config);
48-
}
45+
46+
// In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread`
47+
// of DefaultKafkaProducerFactory is set to true, the `config` object entering
48+
// this advice block can be shared across multiple threads. Directly modifying
49+
// `config` could lead to unexpected item loss due to race conditions, where
50+
// some entries might be lost as different threads attempt to modify it
51+
// concurrently.
52+
//
53+
// To prevent such issues, a copy of the `config` should be created here before
54+
// any modifications are made. This ensures that each thread operates on its
55+
// own independent copy of the configuration, thereby eliminating the risk of
56+
// configurations corruption.
57+
//
58+
// More detailed information:
59+
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538
60+
61+
// ensure config is a mutable map and avoid concurrency conflicts
62+
config = new HashMap<>(config);
4963
enhanceConfig(config);
5064
}
5165
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,24 @@ public static class ConstructorMapAdvice {
4242
@Advice.OnMethodEnter(suppress = Throwable.class)
4343
public static void onEnter(
4444
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
45-
// ensure config is a mutable map
46-
if (config.getClass() != HashMap.class) {
47-
config = new HashMap<>(config);
48-
}
45+
46+
// In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread`
47+
// of DefaultKafkaProducerFactory is set to true, the `config` object entering
48+
// this advice block can be shared across multiple threads. Directly modifying
49+
// `config` could lead to unexpected item loss due to race conditions, where
50+
// some entries might be lost as different threads attempt to modify it
51+
// concurrently.
52+
//
53+
// To prevent such issues, a copy of the `config` should be created here before
54+
// any modifications are made. This ensures that each thread operates on its
55+
// own independent copy of the configuration, thereby eliminating the risk of
56+
// configurations corruption.
57+
//
58+
// More detailed information:
59+
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538
60+
61+
// ensure config is a mutable map and avoid concurrency conflicts
62+
config = new HashMap<>(config);
4963
enhanceConfig(config);
5064
}
5165
}

0 commit comments

Comments
 (0)