Skip to content

Commit bd71fbe

Browse files
authored
[kafka] do not create spans for PartitionEOF events (open-telemetry#3445)
1 parent f9ae4c9 commit bd71fbe

File tree

6 files changed

+72
-5
lines changed

6 files changed

+72
-5
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ This component adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.h
2121
- Do not use message creation context as a parent for consumer spans for `Confluent.Kafka`
2222
client instrumentation. See the [issue](https://github.com/open-telemetry/opentelemetry-dotnet-instrumentation/issues/3434)
2323
for details.
24+
- Do not create consumer spans related to `PartitionEOF` events
25+
for `Confluent.Kafka` client instrumentation.
2426

2527
#### Dependency updates
2628

src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/DuckTypes/IConsumeResult.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,7 @@ internal interface IConsumeResult
1313
public Offset Offset { get; set; }
1414

1515
public Partition Partition { get; set; }
16+
17+
// ReSharper disable once InconsistentNaming
18+
public bool IsPartitionEOF { get; set; }
1619
}

src/OpenTelemetry.AutoInstrumentation/Instrumentations/Kafka/Integrations/ConsumerConsumeSyncIntegration.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTar
4646
consumeResult = response == null ? null : response.DuckAs<IConsumeResult>();
4747
}
4848

49-
if (consumeResult is not null)
49+
if (consumeResult is not null && !consumeResult.IsPartitionEOF)
5050
{
5151
var activity = KafkaInstrumentation.StartConsumerActivity(consumeResult, (DateTimeOffset)state.StartTime!, instance!);
5252
if (exception is not null)

test/IntegrationTests/KafkaTests.cs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,18 +76,40 @@ public void SubmitsTraces(string packageVersion)
7676
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 2), "Third successful Consume attempt.");
7777

7878
collector.ExpectCollected(collection => ValidatePropagation(collection, topicName));
79-
8079
EnableBytecodeInstrumentation();
8180

8281
RunTestApplication(new TestSettings
8382
{
8483
PackageVersion = packageVersion,
85-
Arguments = topicName
84+
Arguments = $"--topic-name {topicName}"
8685
});
8786

8887
collector.AssertExpectations();
8988
}
9089

90+
[Theory]
91+
[Trait("Category", "EndToEnd")]
92+
[Trait("Containers", "Linux")]
93+
[MemberData(nameof(LibraryVersion.GetPlatformVersions), nameof(LibraryVersion.Kafka), MemberType = typeof(LibraryVersion))]
94+
// ReSharper disable once InconsistentNaming
95+
public void NoSpansForPartitionEOF(string packageVersion)
96+
{
97+
var topicName = $"test-topic2-{packageVersion}";
98+
99+
using var collector = new MockSpansCollector(Output);
100+
SetExporter(collector);
101+
102+
EnableBytecodeInstrumentation();
103+
104+
RunTestApplication(new TestSettings
105+
{
106+
PackageVersion = packageVersion,
107+
Arguments = $"--topic-name {topicName} --consume-only"
108+
});
109+
110+
collector.AssertEmpty(TimeSpan.FromSeconds(10));
111+
}
112+
91113
private static string GetConsumerGroupIdAttributeValue(string topicName)
92114
{
93115
return $"test-consumer-group-{topicName}";

test/test-applications/integrations/TestApplication.Kafka/Program.cs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,47 @@ internal static class Program
1313

1414
public static async Task<int> Main(string[] args)
1515
{
16-
var topicName = args[0];
16+
if (args.Length < 2)
17+
{
18+
throw new ArgumentException("Required parameters not provided.");
19+
}
20+
21+
var topicName = args[1];
22+
23+
if (args.Length == 3 && args[2] == "--consume-only")
24+
{
25+
return await ConsumeOnly(topicName);
26+
}
27+
28+
if (args.Length == 2)
29+
{
30+
return await ProduceAndConsume(topicName);
31+
}
32+
33+
throw new ArgumentException("Invalid parameters.");
34+
}
35+
36+
private static async Task<int> ConsumeOnly(string topicName)
37+
{
38+
await CreateTopic(BootstrapServers, topicName);
39+
40+
using var consumer = BuildConsumer(topicName, BootstrapServers);
41+
consumer.Subscribe(topicName);
42+
43+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
1744

45+
var consumeResult = consumer.Consume(cts.Token);
46+
if (consumeResult.IsPartitionEOF)
47+
{
48+
return 0;
49+
}
50+
51+
Console.WriteLine("Unexpected Consume result.");
52+
return 1;
53+
}
54+
55+
private static async Task<int> ProduceAndConsume(string topicName)
56+
{
1857
using var waitEvent = new ManualResetEventSlim();
1958

2059
using var producer = BuildProducer(BootstrapServers);
@@ -142,6 +181,7 @@ private static IConsumer<string, string> BuildConsumer(string topicName, string
142181
// https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example
143182
AutoOffsetReset = AutoOffsetReset.Earliest,
144183
CancellationDelayMaxMs = 5000,
184+
EnablePartitionEof = true,
145185
EnableAutoCommit = true
146186
};
147187

test/test-applications/integrations/TestApplication.Kafka/Properties/launchSettings.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"profiles": {
44
"instrumented": {
55
"commandName": "Project",
6-
"commandLineArgs": "test-topic",
6+
"commandLineArgs": "--topic-name test-topic",
77
"environmentVariables": {
88
"COR_ENABLE_PROFILING": "1",
99
"COR_PROFILER": "{918728DD-259F-4A6A-AC2B-B85E1B658318}",

0 commit comments

Comments
 (0)