Skip to content

Commit 0400e13

Browse files
lailabougriaHEskandarimikeminutilloTim Bussmann
authored
Initial support for OpenTelemetry (#6410)
Co-Authored-By: Hadi Eskandari <[email protected]> Co-authored-by: Mike Minutillo <[email protected]> Co-authored-by: Laila Bougria <[email protected]> Co-authored-by: Tim Bussmann <[email protected]>
1 parent f7d93a4 commit 0400e13

File tree

86 files changed

+3969
-50
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+3969
-50
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry;
2+
3+
using System;
4+
using System.Collections.Immutable;
5+
using System.Diagnostics;
6+
using NUnit.Framework;
7+
8+
public static class ActivityTestingExtensions
9+
{
10+
public static void VerifyTag(this ImmutableDictionary<string, string> tags, string tagName, string expectedValue)
11+
{
12+
Assert.IsTrue(tags.TryGetValue(tagName, out var tagValue), $"Tags should contain key '{tagName}'");
13+
Assert.AreEqual(expectedValue, tagValue, $"Tag value with key '{tagName}' is incorrect");
14+
}
15+
16+
/// <summary>
17+
/// Checks tags for duplicate tag keys.
18+
/// </summary>
19+
public static void VerifyUniqueTags(this Activity activity)
20+
{
21+
var tagsList = activity.Tags.ToImmutableList();
22+
23+
if (tagsList.Count < 2)
24+
{
25+
return;
26+
}
27+
28+
var sortedTags = tagsList.Sort((a, b) => StringComparer.CurrentCultureIgnoreCase.Compare(a.Key, b.Key));
29+
30+
for (int i = 0; i < sortedTags.Count - 1; i++)
31+
{
32+
if (StringComparer.InvariantCultureIgnoreCase.Equals(sortedTags[i].Key, sortedTags[i + 1].Key))
33+
{
34+
Assert.Fail($"duplicate tag found: {sortedTags[i].Key}. Tags should be unique.");
35+
}
36+
}
37+
}
38+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry;
2+
3+
using NUnit.Framework;
4+
5+
[NonParallelizable] // Ensure only activities for the current test are captured
6+
public class OpenTelemetryAcceptanceTest : NServiceBusAcceptanceTest
7+
{
8+
protected TestingActivityListener NServicebusActivityListener { get; private set; }
9+
10+
[SetUp]
11+
public void Setup() => NServicebusActivityListener = TestingActivityListener.SetupDiagnosticListener("NServiceBus.Core");
12+
13+
[TearDown]
14+
public void Cleanup()
15+
{
16+
NServicebusActivityListener?.VerifyAllActivitiesCompleted();
17+
NServicebusActivityListener?.Dispose();
18+
}
19+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry
2+
{
3+
using System;
4+
using System.Collections.Concurrent;
5+
using System.Collections.Generic;
6+
using System.Diagnostics;
7+
using System.Linq;
8+
using NUnit.Framework;
9+
10+
public class TestingActivityListener : IDisposable
11+
{
12+
readonly ActivityListener activityListener;
13+
14+
public static TestingActivityListener SetupDiagnosticListener(string sourceName)
15+
{
16+
var testingListener = new TestingActivityListener(sourceName);
17+
18+
ActivitySource.AddActivityListener(testingListener.activityListener);
19+
return testingListener;
20+
}
21+
22+
TestingActivityListener(string sourceName = null)
23+
{
24+
activityListener = new ActivityListener
25+
{
26+
ShouldListenTo = source => string.IsNullOrEmpty(sourceName) || source.Name == sourceName,
27+
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
28+
SampleUsingParentId = (ref ActivityCreationOptions<string> options) => ActivitySamplingResult.AllData
29+
};
30+
activityListener.ActivityStarted += activity =>
31+
{
32+
StartedActivities.Enqueue(activity);
33+
};
34+
activityListener.ActivityStopped += activity =>
35+
{
36+
CompletedActivities.Enqueue(activity);
37+
};
38+
}
39+
40+
public void Dispose() => activityListener?.Dispose();
41+
42+
public ConcurrentQueue<Activity> StartedActivities { get; } = new ConcurrentQueue<Activity>();
43+
public ConcurrentQueue<Activity> CompletedActivities { get; } = new ConcurrentQueue<Activity>();
44+
45+
public void VerifyAllActivitiesCompleted()
46+
{
47+
Assert.AreEqual(StartedActivities.Count, CompletedActivities.Count, "all started activities should be completed");
48+
}
49+
}
50+
51+
static class ActivityExtensions
52+
{
53+
public static List<Activity> GetReceiveMessageActivities(this ConcurrentQueue<Activity> activities, bool includeControlMessages = false)
54+
=> activities.Where(a => a.OperationName == "NServiceBus.Diagnostics.ReceiveMessage")
55+
.Where(a => includeControlMessages || !Convert.ToBoolean(a.GetTagItem("nservicebus.control_message")))
56+
.ToList();
57+
58+
public static List<Activity> GetSendMessageActivities(this ConcurrentQueue<Activity> activities) => activities.Where(a => a.OperationName == "NServiceBus.Diagnostics.SendMessage").ToList();
59+
public static List<Activity> GetPublishEventActivities(this ConcurrentQueue<Activity> activities) => activities.Where(a => a.OperationName == "NServiceBus.Diagnostics.PublishMessage").ToList();
60+
public static List<Activity> GetInvokedHandlerActivities(this ConcurrentQueue<Activity> activities) => activities.Where(a => a.OperationName == "NServiceBus.Diagnostics.InvokeHandler").ToList();
61+
}
62+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry
2+
{
3+
using System;
4+
using System.Collections.Concurrent;
5+
using System.Collections.Generic;
6+
using System.Diagnostics.Metrics;
7+
using System.Linq;
8+
using NUnit.Framework;
9+
10+
class TestingMetricListener : IDisposable
11+
{
12+
readonly MeterListener meterListener;
13+
14+
public TestingMetricListener(string sourceName)
15+
{
16+
meterListener = new()
17+
{
18+
InstrumentPublished = (instrument, listener) =>
19+
{
20+
if (instrument.Meter.Name == sourceName)
21+
{
22+
TestContext.WriteLine($"Subscribing to {instrument.Meter.Name}\\{instrument.Name}");
23+
listener.EnableMeasurementEvents(instrument);
24+
}
25+
}
26+
};
27+
28+
meterListener.SetMeasurementEventCallback((Instrument instrument,
29+
long measurement,
30+
ReadOnlySpan<KeyValuePair<string, object>> t,
31+
object _) =>
32+
{
33+
TestContext.WriteLine($"{instrument.Meter.Name}\\{instrument.Name}:{measurement}");
34+
35+
var tags = t.ToArray();
36+
ReportedMeters.AddOrUpdate(instrument.Name, measurement, (_, val) => val + measurement);
37+
Tags.AddOrUpdate(instrument.Name, _ => tags, (_, _) => tags);
38+
});
39+
meterListener.Start();
40+
}
41+
42+
public static TestingMetricListener SetupNServiceBusMetricsListener() =>
43+
SetupMetricsListener("NServiceBus.Core");
44+
45+
public static TestingMetricListener SetupMetricsListener(string sourceName)
46+
{
47+
var testingMetricListener = new TestingMetricListener(sourceName);
48+
return testingMetricListener;
49+
}
50+
51+
public void Dispose() => meterListener?.Dispose();
52+
53+
public ConcurrentDictionary<string, long> ReportedMeters { get; } = new();
54+
public ConcurrentDictionary<string, KeyValuePair<string, object>[]> Tags { get; } = new();
55+
56+
public void AssertMetric(string metricName, long expected)
57+
{
58+
if (expected == 0)
59+
{
60+
Assert.False(ReportedMeters.ContainsKey(metricName), $"Should not have '{metricName}' metric reported.");
61+
}
62+
else
63+
{
64+
Assert.True(ReportedMeters.ContainsKey(metricName), $"'{metricName}' metric was not reported.");
65+
Assert.AreEqual(expected, ReportedMeters[metricName]);
66+
}
67+
}
68+
69+
public object AssertTagKeyExists(string metricName, string tagKey)
70+
{
71+
if (!Tags.ContainsKey(metricName))
72+
{
73+
Assert.Fail($"'{metricName}' metric was not reported");
74+
}
75+
76+
var emptyTag = default(KeyValuePair<string, object>);
77+
var meterTag = Tags[metricName].FirstOrDefault(t => t.Key == tagKey);
78+
if (meterTag.Equals(emptyTag))
79+
{
80+
Assert.Fail($"'{tagKey}' tag was not found.");
81+
}
82+
83+
return meterTag.Value;
84+
}
85+
}
86+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry
2+
{
3+
using System.Diagnostics;
4+
using System.Linq;
5+
using System.Threading.Tasks;
6+
using NServiceBus.AcceptanceTesting;
7+
using NServiceBus.AcceptanceTests.EndpointTemplates;
8+
using NUnit.Framework;
9+
10+
public class When_ambient_trace_in_message_session : OpenTelemetryAcceptanceTest
11+
{
12+
[Test]
13+
public async Task Should_attach_to_ambient_trace()
14+
{
15+
using var externalActivitySource = new ActivitySource("external trace source");
16+
using var _ = TestingActivityListener.SetupDiagnosticListener(externalActivitySource.Name); // need to have a registered listener for activities to be created
17+
18+
const string wrapperActivityTraceState = "test trace state";
19+
20+
var context = await Scenario.Define<Context>()
21+
.WithEndpoint<EndpointWithAmbientActivity>(b => b
22+
.When(async (s, ctx) =>
23+
{
24+
// Otherwise the activity is created with a hierarchical ID format on .NET Framework which resets the RootId once it's converted to W3C format in the send pipeline.
25+
var activityTraceContext = new ActivityContext(ActivityTraceId.CreateRandom(),
26+
ActivitySpanId.CreateRandom(), ActivityTraceFlags.Recorded);
27+
using (var wrapperActivity = externalActivitySource.StartActivity("ambient span", ActivityKind.Server, activityTraceContext))
28+
{
29+
wrapperActivity.TraceStateString = wrapperActivityTraceState;
30+
ctx.WrapperActivityId = wrapperActivity.Id;
31+
ctx.WrapperActivityRootId = wrapperActivity.RootId;
32+
await s.SendLocal(new LocalMessage());
33+
}
34+
}))
35+
.Done(c => c.OutgoingMessageReceived)
36+
.Run();
37+
38+
var outgoingMessageActivity = NServicebusActivityListener.CompletedActivities.GetSendMessageActivities().Single();
39+
var incomingMessageActivity = NServicebusActivityListener.CompletedActivities.GetReceiveMessageActivities().Single();
40+
41+
Assert.AreEqual(context.WrapperActivityId, outgoingMessageActivity.ParentId, "outgoing message should be connected to the ambient span");
42+
Assert.AreEqual(context.WrapperActivityRootId, outgoingMessageActivity.RootId, "outgoing message should be connected to the ambient trace");
43+
Assert.AreEqual(wrapperActivityTraceState, outgoingMessageActivity.TraceStateString, "ambient trace state should be floated to outgoing message span");
44+
Assert.AreEqual(outgoingMessageActivity.Id, incomingMessageActivity.ParentId, "received message should be connected to send operation span");
45+
Assert.AreEqual(context.WrapperActivityRootId, incomingMessageActivity.RootId, "received message should be connected to the ambient trace");
46+
Assert.AreEqual(wrapperActivityTraceState, incomingMessageActivity.TraceStateString, "ambient trace state should be floated to incoming message span");
47+
}
48+
49+
class Context : ScenarioContext
50+
{
51+
public bool OutgoingMessageReceived { get; set; }
52+
public string WrapperActivityId { get; set; }
53+
public string WrapperActivityRootId { get; set; }
54+
}
55+
56+
class EndpointWithAmbientActivity : EndpointConfigurationBuilder
57+
{
58+
public EndpointWithAmbientActivity() => EndpointSetup<DefaultServer>();
59+
60+
public class MessageHandler : IHandleMessages<LocalMessage>
61+
{
62+
Context testContext;
63+
64+
public MessageHandler(Context testContext) => this.testContext = testContext;
65+
66+
public Task Handle(LocalMessage message, IMessageHandlerContext context)
67+
{
68+
testContext.OutgoingMessageReceived = true;
69+
return Task.CompletedTask;
70+
}
71+
}
72+
}
73+
74+
public class LocalMessage : IMessage
75+
{
76+
}
77+
}
78+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry
2+
{
3+
using System;
4+
using System.Diagnostics;
5+
using System.Linq;
6+
using System.Threading.Tasks;
7+
using NServiceBus.AcceptanceTesting;
8+
using NServiceBus.AcceptanceTests.EndpointTemplates;
9+
using NUnit.Framework;
10+
11+
public class When_ambient_trace_in_pipeline : OpenTelemetryAcceptanceTest
12+
{
13+
static ActivitySource externalActivitySource = new(Guid.NewGuid().ToString());
14+
15+
[Test]
16+
public async Task Should_attach_to_ambient_trace()
17+
{
18+
using var _ = TestingActivityListener.SetupDiagnosticListener(externalActivitySource.Name); // need to have a registered listener for activities to be created
19+
20+
var context = await Scenario.Define<Context>()
21+
.WithEndpoint<EndpointWithAmbientActivity>(e => e
22+
.When(s => s.SendLocal(new TriggerMessage())))
23+
.Done(c => c.MessageReceived)
24+
.Run();
25+
26+
var handlerActivity = NServicebusActivityListener.CompletedActivities.GetInvokedHandlerActivities().First();
27+
var sendFromHandlerActivity = NServicebusActivityListener.CompletedActivities.GetSendMessageActivities().Last();
28+
Assert.AreEqual(context.AmbientActivityId, sendFromHandlerActivity.ParentId, "the outgoing message should be connected to the ambient span");
29+
Assert.AreEqual(context.AmbientActivityRootId, sendFromHandlerActivity.RootId, "outgoing and ambient activity should belong to same trace");
30+
Assert.AreEqual(ExpectedTraceState, sendFromHandlerActivity.TraceStateString, "outgoing activity should capture ambient trace state");
31+
Assert.AreEqual(handlerActivity.Id, context.AmbientActivityParentId, "the ambient activity should be connected to the handler span");
32+
Assert.AreEqual(handlerActivity.RootId, context.AmbientActivityRootId, "handler and ambient activity should belong to same trace");
33+
}
34+
35+
class Context : ScenarioContext
36+
{
37+
public bool MessageReceived { get; set; }
38+
public string AmbientActivityId { get; set; }
39+
public string AmbientActivityParentId { get; set; }
40+
public string AmbientActivityRootId { get; set; }
41+
public string AmbientActivityState { get; set; }
42+
}
43+
44+
class EndpointWithAmbientActivity : EndpointConfigurationBuilder
45+
{
46+
public EndpointWithAmbientActivity() => EndpointSetup<DefaultServer>();
47+
48+
class MessageHandler : IHandleMessages<TriggerMessage>, IHandleMessages<MessageFromAmbientTrace>
49+
{
50+
Context testContext;
51+
52+
public MessageHandler(Context testContext) => this.testContext = testContext;
53+
54+
public async Task Handle(TriggerMessage message, IMessageHandlerContext context)
55+
{
56+
using (var ambientActivity = externalActivitySource.StartActivity())
57+
{
58+
// set/modify trace state:
59+
ambientActivity.TraceStateString = ExpectedTraceState;
60+
61+
testContext.AmbientActivityId = ambientActivity.Id;
62+
testContext.AmbientActivityParentId = ambientActivity.ParentId;
63+
testContext.AmbientActivityRootId = ambientActivity.RootId;
64+
testContext.AmbientActivityState = ambientActivity.TraceStateString;
65+
await context.SendLocal(new MessageFromAmbientTrace());
66+
}
67+
}
68+
69+
public Task Handle(MessageFromAmbientTrace message, IMessageHandlerContext context)
70+
{
71+
testContext.MessageReceived = true;
72+
return Task.CompletedTask;
73+
}
74+
}
75+
}
76+
77+
public class TriggerMessage : IMessage
78+
{
79+
}
80+
81+
public class MessageFromAmbientTrace : IMessage
82+
{
83+
}
84+
85+
const string ExpectedTraceState = "trace state from ambient activity";
86+
}
87+
}

0 commit comments

Comments
 (0)