From d9f58a1792a8f0fb443bc01c1d526ac2d982ec76 Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Thu, 3 Jul 2025 06:13:01 +0200
Subject: [PATCH 01/17] feat(correlation): add open-telemetry as msg
correlation scope
---
docs/preview/02-getting-started.md | 2 +-
.../{01-service-bus.md => 01-service-bus.mdx} | 50 ++++++-
.../IServiceBusMessageCorrelationScope.cs | 18 +++
.../MessageTelemetryOptions.cs | 2 +-
.../AzureServiceBusMessagePump.cs | 37 ++++-
....ServiceBus.Telemetry.OpenTelemetry.csproj | 33 +++++
...ceBusMessageHandlerCollectionExtensions.cs | 33 +++++
...emetryServiceBusMessageCorrelationScope.cs | 97 ++++++++++++
.../Arcus.Messaging.Tests.Integration.csproj | 5 +
.../MessagePump/Fixture/AssertX.cs | 26 ++++
.../ServiceBusMessagePump.TelemetryTests.cs | 139 +++++++++++++++++-
...toTrackingAzureServiceBusMessageHandler.cs | 16 ++
src/Arcus.Messaging.sln | 7 +
13 files changed, 447 insertions(+), 18 deletions(-)
rename docs/preview/03-Features/01-Azure/{01-service-bus.md => 01-service-bus.mdx} (87%)
create mode 100644 src/Arcus.Messaging.Abstractions.ServiceBus/Telemetry/IServiceBusMessageCorrelationScope.cs
create mode 100644 src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry.csproj
create mode 100644 src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/Extensions/ServiceBusMessageHandlerCollectionExtensions.cs
create mode 100644 src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs
diff --git a/docs/preview/02-getting-started.md b/docs/preview/02-getting-started.md
index e942b8d6..5ea8c0f3 100644
--- a/docs/preview/02-getting-started.md
+++ b/docs/preview/02-getting-started.md
@@ -80,4 +80,4 @@ Host.CreateDefaultBuilder()
The way *'message handlers'* are registered determines when the received message will be routed to them.
-> 🔗 See the [Azure Service Bus messaging feature documentation](./03-Features/01-Azure/01-service-bus.md) for more information on providing additional routing filters to your message handlers.
\ No newline at end of file
+> 🔗 See the [Azure Service Bus messaging feature documentation](./03-Features/01-Azure/01-service-bus.mdx) for more information on providing additional routing filters to your message handlers.
\ No newline at end of file
diff --git a/docs/preview/03-Features/01-Azure/01-service-bus.md b/docs/preview/03-Features/01-Azure/01-service-bus.mdx
similarity index 87%
rename from docs/preview/03-Features/01-Azure/01-service-bus.md
rename to docs/preview/03-Features/01-Azure/01-service-bus.mdx
index 62eb56dd..24cbf9d1 100644
--- a/docs/preview/03-Features/01-Azure/01-service-bus.md
+++ b/docs/preview/03-Features/01-Azure/01-service-bus.mdx
@@ -2,6 +2,9 @@
sidebar_label: Service Bus
---
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
# Azure Service Bus messaging
The `Arcus.Messaging.Pumps.ServiceBus` library provides a way to process Azure Service Bus messages on queues/topic subscriptions via custom routed *message handlers*, instead of interacting with the [`ServiceBusReceiver`](https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusreceiver) yourself.
@@ -138,12 +141,52 @@ services.AddServiceBus[Topic/Queue]MessagePump(..., options =>
// this job instance in a multi-instance deployment (default: generated GUID).
options.JobId = Guid.NewGuid().ToString();
+ // The name for the request operation which will be used in the chosen message correlation system.
+ // Default: Process
+ options.Telemetry.OperationName = "ReceiveOrder";
+
// Indicate whether or not the default built-in JSON deserialization should ignore additional members
// when deserializing the incoming message (default: AdditionalMemberHandling.Error).
options.Routing.Deserialization.AdditionalMembers = AdditionalMemberHandling.Ignore;
});
```
+#### Correlation system
+The following correlation systems are available when registering the message pump. These systems will use the incoming Azure Service Bus message to start a request operation. A `MessageCorrelationInfo` model is passed to your registered message handlers, which represents the current request operation. All interactions to dependent systems should be children of this operation for a transactional service-to-service relationship.
+
+
+
+
+```powershell
+PS> Install-Package -Name Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry
+```
+
+Make sure to include the following line to your message pump registration:
+```diff
++ ActivitySource applicationSource = new("");
+
+services.AddServiceBusTopicMessagePump(...)
++ .UseServiceBusOpenTelemetryRequestTracking(applicationSource)
+ .WithServiceBusMessageHandler<...>()
+ .WithServiceBusMessageHandler<...>();
+```
+
+Now Arcus will use the OpenTelemetry approach to track Azure Service Bus messages. Make sure that the `ActivitySource` that is passed, is also tracked by OpenTelemetry:
+```csharp
+IServiceCollection services = ...
+
+services.AddOpenTelemetry()
+ .AddTraces(traces =>
+ {
+ traces.AddSource("");
+ });
+```
+
+> 🔗 [More info on OpenTelemetry on Azure](https://learn.microsoft.com/en-us/azure/azure-monitor/app/opentelemetry)
+
+
+
+
### Message handler routing customization
The following routing options are available when registering an Azure Service Bus message handler on a message pump.
@@ -265,7 +308,9 @@ Both the recovery period after the circuit is open and the interval between mess
}
```
-#### 🔔 Get notified on a circuit breaker state transition
+
+**🔔 Get notified on a circuit breaker state transition**
+
To get notified on circuit-breaker state transitions, you can register one or more event handlers on a message pump.
These event handlers should implement the `ICircuitBreakerEventHandler` interface:
@@ -293,4 +338,5 @@ using Microsoft.Extensions.DependencyInjection;
services.AddServiceBus[Queue/Topic]MessagePump(...)
.WithCircuitBreakerStateChangedEventHandler()
.WithCircuitBreakerStateChangedEventHandler();
-```
\ No newline at end of file
+```
+
\ No newline at end of file
diff --git a/src/Arcus.Messaging.Abstractions.ServiceBus/Telemetry/IServiceBusMessageCorrelationScope.cs b/src/Arcus.Messaging.Abstractions.ServiceBus/Telemetry/IServiceBusMessageCorrelationScope.cs
new file mode 100644
index 00000000..2e96e3ae
--- /dev/null
+++ b/src/Arcus.Messaging.Abstractions.ServiceBus/Telemetry/IServiceBusMessageCorrelationScope.cs
@@ -0,0 +1,18 @@
+using Arcus.Messaging.Abstractions.MessageHandling;
+using Arcus.Messaging.Abstractions.Telemetry;
+
+namespace Arcus.Messaging.Abstractions.ServiceBus.Telemetry
+{
+ ///
+ /// Represents an approach to track the correlation information of a received Azure Service Bus message within a message pump.
+ ///
+ public interface IServiceBusMessageCorrelationScope
+ {
+ ///
+ /// Starts a new Azure Service bus request operation on the telemetry system.
+ ///
+ /// The message context for the currently received Azure Service bus message.
+ /// The user-configurable options to manipulate the telemetry.
+ MessageOperationResult StartOperation(AzureServiceBusMessageContext messageContext, MessageTelemetryOptions options);
+ }
+}
diff --git a/src/Arcus.Messaging.Abstractions/MessageHandling/MessageTelemetryOptions.cs b/src/Arcus.Messaging.Abstractions/MessageHandling/MessageTelemetryOptions.cs
index e8fd2688..851cc68b 100644
--- a/src/Arcus.Messaging.Abstractions/MessageHandling/MessageTelemetryOptions.cs
+++ b/src/Arcus.Messaging.Abstractions/MessageHandling/MessageTelemetryOptions.cs
@@ -7,7 +7,7 @@ namespace Arcus.Messaging.Abstractions.MessageHandling
///
public class MessageTelemetryOptions
{
- private string _operationName;
+ private string _operationName = "Process";
///
/// Gets or sets the name of the operation that is used when a request telemetry is tracked - default 'Process' is used as operation name.
diff --git a/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs b/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs
index b5290193..159b4c0f 100644
--- a/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs
+++ b/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs
@@ -7,6 +7,8 @@
using Arcus.Messaging.Abstractions.MessageHandling;
using Arcus.Messaging.Abstractions.ServiceBus;
using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling;
+using Arcus.Messaging.Abstractions.ServiceBus.Telemetry;
+using Arcus.Messaging.Abstractions.Telemetry;
using Arcus.Messaging.Pumps.Abstractions;
using Arcus.Messaging.Pumps.ServiceBus.Configuration;
using Azure.Messaging.ServiceBus;
@@ -419,10 +421,11 @@ private async Task ProcessMessageAsync(ServiceBusReceiv
Logger.LogTrace("No operation ID was found on the message '{MessageId}' during processing in the Azure Service Bus {EntityType} message pump '{JobId}'", message.MessageId, Settings.ServiceBusEntity, JobId);
}
- using MessageCorrelationResult correlationResult = DetermineMessageCorrelation(message);
var messageContext = AzureServiceBusMessageContext.Create(JobId, Settings.ServiceBusEntity, _messageReceiver, message);
+ using MessageOperationResult correlationResult = DetermineMessageCorrelation(message, messageContext);
- MessageProcessingResult routingResult = await _messageRouter.RouteMessageAsync(_messageReceiver, message, messageContext, correlationResult.CorrelationInfo, cancellationToken);
+ MessageProcessingResult routingResult = await _messageRouter.RouteMessageAsync(_messageReceiver, message, messageContext, correlationResult.Correlation, cancellationToken);
+ correlationResult.IsSuccessful = routingResult.IsSuccessful;
if (routingResult.IsSuccessful && Settings.Options.AutoComplete)
{
@@ -445,13 +448,35 @@ private async Task ProcessMessageAsync(ServiceBusReceiv
return routingResult;
}
- private MessageCorrelationResult DetermineMessageCorrelation(ServiceBusReceivedMessage message)
+ private MessageOperationResult DetermineMessageCorrelation(ServiceBusReceivedMessage message, AzureServiceBusMessageContext messageContext)
{
- (string transactionId, string operationParentId) = message.ApplicationProperties.GetTraceParent();
- var client = ServiceProvider.GetRequiredService();
+ var correlationScope = ServiceProvider.GetService();
+ if (correlationScope is null)
+ {
+ (string transactionId, string operationParentId) = message.ApplicationProperties.GetTraceParent();
+ var client = ServiceProvider.GetRequiredService();
+
+ var deprecatedResult = MessageCorrelationResult.Create(client, transactionId, operationParentId);
+ return new W3CAdapterMessageOperationResult(deprecatedResult);
+ }
- return MessageCorrelationResult.Create(client, transactionId, operationParentId);
+ return correlationScope.StartOperation(messageContext, Options.Telemetry);
+ }
+
+ [Obsolete("Will be removed once " + nameof(MessageCorrelationResult) + " is removed")]
+ private sealed class W3CAdapterMessageOperationResult : MessageOperationResult
+ {
+ private readonly MessageCorrelationResult _deprecatedResult;
+ internal W3CAdapterMessageOperationResult(MessageCorrelationResult deprecatedResult) : base(deprecatedResult.CorrelationInfo)
+ {
+ _deprecatedResult = deprecatedResult;
+ }
+
+ protected override void StopOperation(bool isSuccessful, DateTimeOffset startTime, TimeSpan duration)
+ {
+ _deprecatedResult.Dispose();
+ }
}
private static async Task UntilCancelledAsync(CancellationToken cancellationToken)
diff --git a/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry.csproj b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry.csproj
new file mode 100644
index 00000000..8181a23e
--- /dev/null
+++ b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry.csproj
@@ -0,0 +1,33 @@
+
+
+
+ net8.0
+ latest
+ Arcus
+ Arcus
+ Arcus.Messaging
+ Provides capability to track message correlation information using OpenTelemetry for Azure Service Bus message pumps
+ Copyright (c) Arcus
+ https://messaging.arcus-azure.net/
+ https://github.com/arcus-azure/arcus.messaging
+ LICENSE
+ icon.png
+ README.md
+ Git
+ Azure;Messaging;ServiceBus
+ true
+ true
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/Extensions/ServiceBusMessageHandlerCollectionExtensions.cs b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/Extensions/ServiceBusMessageHandlerCollectionExtensions.cs
new file mode 100644
index 00000000..c8289118
--- /dev/null
+++ b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/Extensions/ServiceBusMessageHandlerCollectionExtensions.cs
@@ -0,0 +1,33 @@
+using System;
+using System.Diagnostics;
+using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling;
+using Arcus.Messaging.Abstractions.ServiceBus.Telemetry;
+using Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+// ReSharper disable once CheckNamespace
+namespace Microsoft.Extensions.DependencyInjection
+{
+ ///
+ /// Extensions on the to register OpenTelemetry services for Azure Service Bus message pumps.
+ ///
+ public static class ServiceBusMessageHandlerCollectionExtensions
+ {
+ ///
+ /// Register OpenTelemetry as the correlation system to track Azure Service Bus message requests within the message pump.
+ ///
+ /// The collection of Azure Service Bus message handler collection.
+ /// The activity source to start instances from upon receiving Azure Service Bus messages.
+ /// Thrown when the or the is null.
+ public static ServiceBusMessageHandlerCollection UseServiceBusOpenTelemetryRequestTracking(
+ this ServiceBusMessageHandlerCollection handlers,
+ ActivitySource activitySource)
+ {
+ ArgumentNullException.ThrowIfNull(handlers);
+ ArgumentNullException.ThrowIfNull(activitySource);
+
+ handlers.Services.TryAddSingleton(new OpenTelemetryServiceBusMessageCorrelationScope(activitySource));
+ return handlers;
+ }
+ }
+}
diff --git a/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs
new file mode 100644
index 00000000..57c36283
--- /dev/null
+++ b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs
@@ -0,0 +1,97 @@
+using System;
+using System.Diagnostics;
+using Arcus.Messaging.Abstractions;
+using Arcus.Messaging.Abstractions.MessageHandling;
+using Arcus.Messaging.Abstractions.ServiceBus;
+using Arcus.Messaging.Abstractions.ServiceBus.Telemetry;
+using Arcus.Messaging.Abstractions.Telemetry;
+
+namespace Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry
+{
+ ///
+ /// Represents the OpenTelemetry implementation of the
+ /// to track the correlation information of a received Azure Service Bus message within a message pump.
+ ///
+ internal class OpenTelemetryServiceBusMessageCorrelationScope : IServiceBusMessageCorrelationScope
+ {
+ private readonly ActivitySource _activitySource;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ internal OpenTelemetryServiceBusMessageCorrelationScope(ActivitySource activitySource)
+ {
+ ArgumentNullException.ThrowIfNull(activitySource);
+ _activitySource = activitySource;
+ }
+
+ ///
+ /// Starts a new Azure Service bus request operation on the telemetry system.
+ ///
+ /// The message context for the currently received Azure Service bus message.
+ /// The user-configurable options to manipulate the telemetry.
+ public MessageOperationResult StartOperation(AzureServiceBusMessageContext messageContext, MessageTelemetryOptions options)
+ {
+ ArgumentNullException.ThrowIfNull(messageContext);
+ ArgumentNullException.ThrowIfNull(options);
+
+ (string transactionId, string operationParentId) = messageContext.Properties.GetTraceParent();
+
+ ActivityContext context = new(
+ ActivityTraceId.CreateFromString(transactionId),
+ ActivitySpanId.CreateFromString(operationParentId),
+ ActivityTraceFlags.None);
+
+ Activity activity = _activitySource.CreateActivity(
+ name: options.OperationName,
+ kind: ActivityKind.Server,
+ context);
+
+ activity?.Start();
+ if (activity is null)
+ {
+ return new UnlinkedMessageOperationResult(transactionId, operationParentId);
+ }
+
+ activity.SetTag("az.namespace", "Microsoft.ServiceBus");
+ activity.SetTag("messaging.system", "servicebus");
+ activity.SetTag("messaging.operation", "receive");
+
+ activity.SetTag("ServiceBus-Endpoint", messageContext.FullyQualifiedNamespace);
+ activity.SetTag("ServiceBus-Entity", messageContext.EntityPath);
+ activity.SetTag("ServiceBus-EntityType", messageContext.EntityType.ToString());
+
+ return new OpenTelemetryMessageOperationResult(activity);
+ }
+
+ private sealed class OpenTelemetryMessageOperationResult : MessageOperationResult
+ {
+ private readonly Activity _activity;
+
+ internal OpenTelemetryMessageOperationResult(Activity activity)
+ : base(new MessageCorrelationInfo(activity.TraceId.ToString(), activity.SpanId.ToString(), activity.ParentSpanId.ToString()))
+ {
+ _activity = activity;
+ }
+
+ protected override void StopOperation(bool isSuccessful, DateTimeOffset startTime, TimeSpan duration)
+ {
+ _activity.SetStatus(isSuccessful ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
+ _activity.SetEndTime(_activity.StartTimeUtc.Add(duration));
+ _activity.Dispose();
+ }
+ }
+
+ private sealed class UnlinkedMessageOperationResult : MessageOperationResult
+ {
+ internal UnlinkedMessageOperationResult(string transactionId, string operationParentId)
+ : base(new MessageCorrelationInfo(Guid.NewGuid().ToString(), transactionId, operationParentId))
+ {
+ }
+
+ protected override void StopOperation(bool isSuccessful, DateTimeOffset startTime, TimeSpan duration)
+ {
+ }
+ }
+ }
+}
diff --git a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj
index 19085f9a..7e42cdc2 100644
--- a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj
+++ b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj
@@ -14,6 +14,7 @@
+
@@ -22,6 +23,9 @@
+
+
+
@@ -31,6 +35,7 @@
+
diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/AssertX.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/AssertX.cs
index 3b8a7b6d..f8a15986 100644
--- a/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/AssertX.cs
+++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/Fixture/AssertX.cs
@@ -1,9 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading.Tasks;
using Microsoft.ApplicationInsights.Channel;
using Microsoft.ApplicationInsights.DataContracts;
using Xunit;
+using Xunit.Sdk;
namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture
{
@@ -12,6 +14,30 @@ namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture
///
public static class AssertX
{
+ public static T Any(IEnumerable collection, Action action)
+ {
+ Stack<(int index, object item, Exception exception)> failures = new();
+ T[] array = collection.ToArray();
+
+ for (int index = 0; index < array.Length; ++index)
+ {
+ T item = array[index];
+ try
+ {
+ action(item);
+ return item;
+ }
+ catch (Exception ex)
+ {
+ failures.Push((index, item, ex));
+ }
+ }
+
+ throw new XunitException(
+ $"None of the {array.Length} item(s) matches against the given action: {Environment.NewLine}" +
+ $"{string.Join(Environment.NewLine, failures.Select(f => $"- [{f.index}] {f.item}: {f.exception}"))}");
+ }
+
public static RequestTelemetry GetRequestFrom(
IEnumerable telemetries,
Predicate filter)
diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
index 582dc721..d84cc061 100644
--- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
+++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
@@ -1,16 +1,16 @@
using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Diagnostics;
using System.Linq;
+using System.Text;
+using System.Text.Json;
using System.Threading.Tasks;
-using Arcus.Messaging.Abstractions.MessageHandling;
-using Arcus.Messaging.Pumps.ServiceBus;
-using Arcus.Messaging.Tests.Core.Events.v1;
using Arcus.Messaging.Tests.Core.Generators;
using Arcus.Messaging.Tests.Core.Messages.v1;
using Arcus.Messaging.Tests.Integration.Fixture;
using Arcus.Messaging.Tests.Integration.Fixture.Logging;
using Arcus.Messaging.Tests.Integration.MessagePump.Fixture;
-using Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus;
-using Arcus.Messaging.Tests.Workers.MessageHandlers;
using Arcus.Messaging.Tests.Workers.ServiceBus.MessageHandlers;
using Arcus.Testing;
using Azure.Identity;
@@ -20,12 +20,11 @@
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
+using OpenTelemetry.Trace;
using Serilog;
-using Serilog.Events;
using Serilog.Sinks.ApplicationInsights.TelemetryConverters;
using Xunit;
-using static Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus.DiskMessageEventConsumer;
-using static Arcus.Observability.Telemetry.Core.ContextProperties.Correlation;
+using Xunit.Sdk;
using static Arcus.Observability.Telemetry.Core.ContextProperties.RequestTracking.ServiceBus;
using static Microsoft.Extensions.Logging.ServiceBusEntityType;
@@ -33,6 +32,130 @@ namespace Arcus.Messaging.Tests.Integration.MessagePump
{
public partial class ServiceBusMessagePumpTests
{
+ private const string DefaultSqlTable = "master";
+
+ private string CustomOperationName { get; } = $"operation-{Guid.NewGuid()}";
+ private bool IsSuccessful { get; } = Bogus.Random.Bool();
+
+ [Fact]
+ public async Task ServiceBusMessagePump_WithW3CCorrelationFormatNewParentViaOpenTelemetry_AutomaticallyTracksMicrosoftDependencies()
+ {
+ // Arrange
+ var options = new WorkerOptions();
+ using ActivitySource source = CreateActivitySource();
+
+ options.AddServiceBusQueueMessagePump(QueueName, HostName, new DefaultAzureCredential(), pump =>
+ {
+ pump.Telemetry.OperationName = CustomOperationName;
+
+ }).UseServiceBusOpenTelemetryRequestTracking(source)
+ .WithServiceBusMessageHandler(CreateAutoTrackingMessageHandler);
+
+ var activities = new Collection();
+ options.AddOpenTelemetry()
+ .WithTracing(traces =>
+ {
+ traces.AddSource(source.Name);
+ traces.AddInMemoryExporter(activities);
+ traces.AddSqlClientInstrumentation();
+ traces.SetSampler(new AlwaysOnSampler());
+ });
+
+ ServiceBusMessage message = CreateOrderServiceBusMessage();
+
+ // Act
+ await TestServiceBusMessageHandlingAsync(options, Queue, message, async () =>
+ {
+ Activity serviceBusRequest = await GetQueueRequestActivityAsync(activities, CustomOperationName);
+ Activity sqlDependency = await GetDependencyActivityAsync(activities, DefaultSqlTable, a => a.ParentId == serviceBusRequest.Id);
+
+ Assert.Equal(serviceBusRequest, sqlDependency.Parent);
+ });
+ }
+
+ private static ServiceBusMessage CreateOrderServiceBusMessage()
+ {
+ return new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(OrderGenerator.Generate())));
+ }
+
+ [Fact]
+ public async Task ServiceBusMessagePump_WithW3CCorrelationFormatViaOpenTelemetry_AutomaticallyTracksMicrosoftDependencies()
+ {
+ // Arrange
+ var options = new WorkerOptions();
+ using ActivitySource source = CreateActivitySource();
+
+ options.AddServiceBusQueueMessagePump(QueueName, HostName, new DefaultAzureCredential(), pump =>
+ {
+ pump.Telemetry.OperationName = CustomOperationName;
+
+ }).WithServiceBusMessageHandler(CreateAutoTrackingMessageHandler)
+ .UseServiceBusOpenTelemetryRequestTracking(source);
+
+ var activities = new Collection();
+ options.AddOpenTelemetry()
+ .WithTracing(traces =>
+ {
+ traces.AddSource(source.Name);
+ traces.AddInMemoryExporter(activities);
+ traces.AddSqlClientInstrumentation();
+ traces.SetSampler(new AlwaysOnSampler());
+ });
+
+ ServiceBusMessage message = CreateOrderServiceBusMessageForW3C();
+
+ // Act / Assert
+ await TestServiceBusMessageHandlingAsync(options, Queue, message, async () =>
+ {
+ (string transactionId, string operationParentId) = message.ApplicationProperties.GetTraceParent();
+
+ Activity serviceBusRequest = await GetQueueRequestActivityAsync(activities, CustomOperationName, a => a.TraceId.ToString() == transactionId && a.ParentSpanId.ToString() == operationParentId);
+ Activity sqlDependency = await GetDependencyActivityAsync(activities, DefaultSqlTable, a => a.TraceId.ToString() == transactionId && a.ParentId == serviceBusRequest.Id);
+
+ Assert.Equal(serviceBusRequest, sqlDependency.Parent);
+ });
+ }
+
+ private OrderWithAutoTrackingAzureServiceBusMessageHandler CreateAutoTrackingMessageHandler(IServiceProvider provider)
+ {
+ return new OrderWithAutoTrackingAzureServiceBusMessageHandler(
+ IsSuccessful,
+ provider.GetRequiredService>());
+ }
+
+ private static ActivitySource CreateActivitySource()
+ {
+ return new ActivitySource("Arcus.Messaging.Tests.Integration");
+ }
+
+ private async Task GetQueueRequestActivityAsync(IReadOnlyCollection activities, string operationName, Func filter = null)
+ {
+ return await Poll.Target(() =>
+ {
+ Assert.NotEmpty(activities);
+ return AssertX.Any(activities.Where(a => a.OperationName == operationName), request =>
+ {
+ Assert.True(IsSuccessful == request.Status is ActivityStatusCode.Ok, $"request for operation '{operationName}' did not match the expected status, expected '{(IsSuccessful ? ActivityStatusCode.Ok : ActivityStatusCode.Error)}' but got '{request.Status}'");
+ Assert.Contains(request.Tags, tag => tag is { Key: "ServiceBus-EntityType", Value: "Queue" });
+ Assert.True(filter is null || filter(request), $"request for operation '{operationName}' did not match the given custom filter assertion, please check whether the OpenTelemetry correlation system did add all the necessary properties");
+ });
+
+ }).FailWith("cannot find request telemetry in spied-upon OpenTelemetry activities");
+ }
+
+ private static async Task GetDependencyActivityAsync(IReadOnlyCollection activities, string operationName, Func filter = null)
+ {
+ return await Poll.Target(() =>
+ {
+ Assert.NotEmpty(activities);
+ return AssertX.Any(activities.Where(a => a.OperationName == operationName), dependency =>
+ {
+ Assert.True(filter is null || filter(dependency), $"dependency for operation '{operationName}' did not match the given custom filter assertion, please check whether the OpenTelemetry correlation system did add all the necessary properties");
+ });
+
+ }).FailWith("cannot find dependency telemetry in spied-upon OpenTelemetry activities");
+ }
+
[Fact]
public async Task ServiceBusMessagePump_WithW3CCorrelationFormat_AutomaticallyTracksMicrosoftDependencies()
{
diff --git a/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrderWithAutoTrackingAzureServiceBusMessageHandler.cs b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrderWithAutoTrackingAzureServiceBusMessageHandler.cs
index 97d4ef60..167b8020 100644
--- a/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrderWithAutoTrackingAzureServiceBusMessageHandler.cs
+++ b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrderWithAutoTrackingAzureServiceBusMessageHandler.cs
@@ -13,6 +13,7 @@ namespace Arcus.Messaging.Tests.Workers.ServiceBus.MessageHandlers
{
public class OrderWithAutoTrackingAzureServiceBusMessageHandler : IAzureServiceBusMessageHandler
{
+ private readonly bool _isSuccessful;
private readonly ILogger _logger;
///
@@ -23,6 +24,15 @@ public OrderWithAutoTrackingAzureServiceBusMessageHandler(ILogger
+ /// Initializes a new instance of the class.
+ ///
+ public OrderWithAutoTrackingAzureServiceBusMessageHandler(bool isSuccessful, ILogger logger)
+ : this(logger)
+ {
+ _isSuccessful = isSuccessful;
+ }
+
public Task ProcessMessageAsync(
Order message,
AzureServiceBusMessageContext messageContext,
@@ -32,6 +42,12 @@ public Task ProcessMessageAsync(
_logger.LogAzureKeyVaultDependency("https://my-vault.azure.net", "Sql-connection-string", isSuccessful: true, DateTimeOffset.UtcNow, TimeSpan.FromSeconds(5));
SimulateSqlQueryWithMicrosoftTracking();
+ if (!_isSuccessful)
+ {
+ throw new InvalidOperationException(
+ "[Test] Sabotage this message processing to let the message correlation system pick up an 'unsuccessful request'");
+ }
+
return Task.CompletedTask;
}
diff --git a/src/Arcus.Messaging.sln b/src/Arcus.Messaging.sln
index 23cb83d6..4bba2504 100644
--- a/src/Arcus.Messaging.sln
+++ b/src/Arcus.Messaging.sln
@@ -32,6 +32,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
.editorconfig = .editorconfig
EndProjectSection
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry", "Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry\Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry.csproj", "{4C2DCD91-2D47-493C-82FC-03A76CFA2CB1}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -78,6 +80,10 @@ Global
{F1B24FD4-099E-489D-B45D-A1A992CA5C3A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F1B24FD4-099E-489D-B45D-A1A992CA5C3A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F1B24FD4-099E-489D-B45D-A1A992CA5C3A}.Release|Any CPU.Build.0 = Release|Any CPU
+ {4C2DCD91-2D47-493C-82FC-03A76CFA2CB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {4C2DCD91-2D47-493C-82FC-03A76CFA2CB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {4C2DCD91-2D47-493C-82FC-03A76CFA2CB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {4C2DCD91-2D47-493C-82FC-03A76CFA2CB1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -90,6 +96,7 @@ Global
{55DE6D12-4C54-4570-BDFA-00B0FFDE5AB6} = {A1369CCD-42D1-43F6-98BC-D8EDA62C2B13}
{864C12DF-DE3D-421F-8687-EC3918FFB8BE} = {2CD090E7-7306-49A0-9680-6ED78CFECAE1}
{F1B24FD4-099E-489D-B45D-A1A992CA5C3A} = {A1369CCD-42D1-43F6-98BC-D8EDA62C2B13}
+ {4C2DCD91-2D47-493C-82FC-03A76CFA2CB1} = {2CD090E7-7306-49A0-9680-6ED78CFECAE1}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {066FD85A-3DDE-4615-B550-BF67ACCDAA51}
From 01bf8aaad3c052724efa6fce8a3b8d72ce7af1cd Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Thu, 3 Jul 2025 06:47:43 +0200
Subject: [PATCH 02/17] fix(test): use correct auto-tracking 'is successful' +
strictken activity assertion
---
...erviceBusMessageHandlerCollectionExtensions.cs | 8 +++++++-
...nTelemetryServiceBusMessageCorrelationScope.cs | 15 ++++++++++++---
.../ServiceBusMessagePump.TelemetryTests.cs | 12 ++++++++----
...thAutoTrackingAzureServiceBusMessageHandler.cs | 5 ++---
4 files changed, 29 insertions(+), 11 deletions(-)
diff --git a/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/Extensions/ServiceBusMessageHandlerCollectionExtensions.cs b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/Extensions/ServiceBusMessageHandlerCollectionExtensions.cs
index c8289118..5f76f005 100644
--- a/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/Extensions/ServiceBusMessageHandlerCollectionExtensions.cs
+++ b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/Extensions/ServiceBusMessageHandlerCollectionExtensions.cs
@@ -4,6 +4,7 @@
using Arcus.Messaging.Abstractions.ServiceBus.Telemetry;
using Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry;
using Microsoft.Extensions.DependencyInjection.Extensions;
+using Microsoft.Extensions.Logging;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
@@ -26,7 +27,12 @@ public static ServiceBusMessageHandlerCollection UseServiceBusOpenTelemetryReque
ArgumentNullException.ThrowIfNull(handlers);
ArgumentNullException.ThrowIfNull(activitySource);
- handlers.Services.TryAddSingleton(new OpenTelemetryServiceBusMessageCorrelationScope(activitySource));
+ handlers.Services.TryAddSingleton(serviceProvider =>
+ {
+ var logger = serviceProvider.GetService>();
+ return new OpenTelemetryServiceBusMessageCorrelationScope(activitySource, logger);
+ });
+
return handlers;
}
}
diff --git a/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs
index 57c36283..5ef48a31 100644
--- a/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs
+++ b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs
@@ -5,6 +5,8 @@
using Arcus.Messaging.Abstractions.ServiceBus;
using Arcus.Messaging.Abstractions.ServiceBus.Telemetry;
using Arcus.Messaging.Abstractions.Telemetry;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
namespace Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry
{
@@ -15,14 +17,16 @@ namespace Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry
internal class OpenTelemetryServiceBusMessageCorrelationScope : IServiceBusMessageCorrelationScope
{
private readonly ActivitySource _activitySource;
+ private readonly ILogger _logger;
///
/// Initializes a new instance of the class.
///
- internal OpenTelemetryServiceBusMessageCorrelationScope(ActivitySource activitySource)
+ internal OpenTelemetryServiceBusMessageCorrelationScope(ActivitySource activitySource, ILogger logger)
{
ArgumentNullException.ThrowIfNull(activitySource);
_activitySource = activitySource;
+ _logger = logger ?? NullLogger.Instance;
}
///
@@ -35,6 +39,7 @@ public MessageOperationResult StartOperation(AzureServiceBusMessageContext messa
ArgumentNullException.ThrowIfNull(messageContext);
ArgumentNullException.ThrowIfNull(options);
+ _logger.LogTrace("Start Azure Service Bus request '{OperationName}' operation...", options.OperationName);
(string transactionId, string operationParentId) = messageContext.Properties.GetTraceParent();
ActivityContext context = new(
@@ -61,21 +66,25 @@ public MessageOperationResult StartOperation(AzureServiceBusMessageContext messa
activity.SetTag("ServiceBus-Entity", messageContext.EntityPath);
activity.SetTag("ServiceBus-EntityType", messageContext.EntityType.ToString());
- return new OpenTelemetryMessageOperationResult(activity);
+ return new OpenTelemetryMessageOperationResult(activity, _logger);
}
private sealed class OpenTelemetryMessageOperationResult : MessageOperationResult
{
private readonly Activity _activity;
+ private readonly ILogger _logger;
- internal OpenTelemetryMessageOperationResult(Activity activity)
+ internal OpenTelemetryMessageOperationResult(Activity activity, ILogger logger)
: base(new MessageCorrelationInfo(activity.TraceId.ToString(), activity.SpanId.ToString(), activity.ParentSpanId.ToString()))
{
_activity = activity;
+ _logger = logger;
}
protected override void StopOperation(bool isSuccessful, DateTimeOffset startTime, TimeSpan duration)
{
+ _logger.LogTrace("Stop Azure Service Bus request '{OperationName}' operation (isSuccessful={IsSuccessful}", _activity.OperationName, isSuccessful);
+
_activity.SetStatus(isSuccessful ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
_activity.SetEndTime(_activity.StartTimeUtc.Add(duration));
_activity.Dispose();
diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
index d84cc061..99067744 100644
--- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
+++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
@@ -132,8 +132,10 @@ private async Task GetQueueRequestActivityAsync(IReadOnlyCollection(() =>
{
- Assert.NotEmpty(activities);
- return AssertX.Any(activities.Where(a => a.OperationName == operationName), request =>
+ var requestDependencies = activities.Where(a => a.OperationName == operationName).ToArray();
+ Assert.NotEmpty(requestDependencies);
+
+ return AssertX.Any(requestDependencies, request =>
{
Assert.True(IsSuccessful == request.Status is ActivityStatusCode.Ok, $"request for operation '{operationName}' did not match the expected status, expected '{(IsSuccessful ? ActivityStatusCode.Ok : ActivityStatusCode.Error)}' but got '{request.Status}'");
Assert.Contains(request.Tags, tag => tag is { Key: "ServiceBus-EntityType", Value: "Queue" });
@@ -147,8 +149,10 @@ private static async Task GetDependencyActivityAsync(IReadOnlyCollecti
{
return await Poll.Target(() =>
{
- Assert.NotEmpty(activities);
- return AssertX.Any(activities.Where(a => a.OperationName == operationName), dependency =>
+ var dependencyActivities = activities.Where(a => a.OperationName == operationName).ToArray();
+ Assert.NotEmpty(dependencyActivities);
+
+ return AssertX.Any(dependencyActivities, dependency =>
{
Assert.True(filter is null || filter(dependency), $"dependency for operation '{operationName}' did not match the given custom filter assertion, please check whether the OpenTelemetry correlation system did add all the necessary properties");
});
diff --git a/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrderWithAutoTrackingAzureServiceBusMessageHandler.cs b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrderWithAutoTrackingAzureServiceBusMessageHandler.cs
index 167b8020..dd5c46ea 100644
--- a/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrderWithAutoTrackingAzureServiceBusMessageHandler.cs
+++ b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrderWithAutoTrackingAzureServiceBusMessageHandler.cs
@@ -1,5 +1,4 @@
using System;
-using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions;
@@ -20,16 +19,16 @@ public class OrderWithAutoTrackingAzureServiceBusMessageHandler : IAzureServiceB
/// Initializes a new instance of the class.
///
public OrderWithAutoTrackingAzureServiceBusMessageHandler(ILogger logger)
+ : this(isSuccessful: true, logger)
{
- _logger = logger;
}
///
/// Initializes a new instance of the class.
///
public OrderWithAutoTrackingAzureServiceBusMessageHandler(bool isSuccessful, ILogger logger)
- : this(logger)
{
+ _logger = logger;
_isSuccessful = isSuccessful;
}
From 183c50a95c38cedf0bdb5bb650490a10a589cdb6 Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Thu, 3 Jul 2025 06:59:22 +0200
Subject: [PATCH 03/17] chore(test): provide better test assertion failure
messages + use bugfix of testing.core
---
.../Arcus.Messaging.Tests.Integration.csproj | 2 +-
.../MessagePump/ServiceBusMessagePump.TelemetryTests.cs | 8 ++++++--
2 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj
index 7e42cdc2..e9019cb4 100644
--- a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj
+++ b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj
@@ -12,7 +12,7 @@
-
+
diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
index 99067744..0e83f870 100644
--- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
+++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
@@ -133,7 +133,9 @@ private async Task GetQueueRequestActivityAsync(IReadOnlyCollection(() =>
{
var requestDependencies = activities.Where(a => a.OperationName == operationName).ToArray();
- Assert.NotEmpty(requestDependencies);
+ Assert.True(requestDependencies.Length > 0,
+ $"no request activities found with operation name '{operationName}' in" +
+ $"[{string.Join(", ", activities.Select(a => a.OperationName))}]");
return AssertX.Any(requestDependencies, request =>
{
@@ -150,7 +152,9 @@ private static async Task GetDependencyActivityAsync(IReadOnlyCollecti
return await Poll.Target(() =>
{
var dependencyActivities = activities.Where(a => a.OperationName == operationName).ToArray();
- Assert.NotEmpty(dependencyActivities);
+ Assert.True(dependencyActivities.Length > 0,
+ $"no dependency activities found with operation name '{operationName}' in " +
+ $"[{string.Join(", ", activities.Select(a => a.OperationName))}]");
return AssertX.Any(dependencyActivities, dependency =>
{
From 6e5e66e3d4f7be33d2c5c69fd85c6c99cab12608 Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Thu, 3 Jul 2025 07:12:38 +0200
Subject: [PATCH 04/17] chore(deps): upgrade sql client package
---
.../Arcus.Messaging.Tests.Integration.csproj | 1 +
.../Arcus.Messaging.Tests.Workers.ServiceBus.csproj | 2 +-
2 files changed, 2 insertions(+), 1 deletion(-)
diff --git a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj
index e9019cb4..95bf707a 100644
--- a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj
+++ b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj
@@ -19,6 +19,7 @@
+
diff --git a/src/Arcus.Messaging.Tests.Workers.ServiceBus/Arcus.Messaging.Tests.Workers.ServiceBus.csproj b/src/Arcus.Messaging.Tests.Workers.ServiceBus/Arcus.Messaging.Tests.Workers.ServiceBus.csproj
index 7a366f7e..5f22961b 100644
--- a/src/Arcus.Messaging.Tests.Workers.ServiceBus/Arcus.Messaging.Tests.Workers.ServiceBus.csproj
+++ b/src/Arcus.Messaging.Tests.Workers.ServiceBus/Arcus.Messaging.Tests.Workers.ServiceBus.csproj
@@ -10,7 +10,7 @@
-
+
From 7293eee24810d2a38204e7dc738837b78e8f0dac Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Thu, 3 Jul 2025 07:29:19 +0200
Subject: [PATCH 05/17] chore(deps): use stable open-telemetry http exporter
---
.../Arcus.Messaging.Tests.Integration.csproj | 1 +
.../ServiceBusMessagePump.TelemetryTests.cs | 16 +++++++++-------
...hAutoTrackingAzureServiceBusMessageHandler.cs | 13 ++++++++++---
3 files changed, 20 insertions(+), 10 deletions(-)
diff --git a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj
index 95bf707a..1a8cc305 100644
--- a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj
+++ b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj
@@ -26,6 +26,7 @@
+
diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
index 0e83f870..f92f4098 100644
--- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
+++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
@@ -6,6 +6,7 @@
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
+using System.Transactions;
using Arcus.Messaging.Tests.Core.Generators;
using Arcus.Messaging.Tests.Core.Messages.v1;
using Arcus.Messaging.Tests.Integration.Fixture;
@@ -32,7 +33,8 @@ namespace Arcus.Messaging.Tests.Integration.MessagePump
{
public partial class ServiceBusMessagePumpTests
{
- private const string DefaultSqlTable = "master";
+ private const string DefaultSqlTable = "master",
+ DefaultHttpOperationName = "System.Net.Http.HttpRequestOut";
private string CustomOperationName { get; } = $"operation-{Guid.NewGuid()}";
private bool IsSuccessful { get; } = Bogus.Random.Bool();
@@ -57,7 +59,7 @@ public async Task ServiceBusMessagePump_WithW3CCorrelationFormatNewParentViaOpen
{
traces.AddSource(source.Name);
traces.AddInMemoryExporter(activities);
- traces.AddSqlClientInstrumentation();
+ traces.AddHttpClientInstrumentation();
traces.SetSampler(new AlwaysOnSampler());
});
@@ -67,9 +69,9 @@ public async Task ServiceBusMessagePump_WithW3CCorrelationFormatNewParentViaOpen
await TestServiceBusMessageHandlingAsync(options, Queue, message, async () =>
{
Activity serviceBusRequest = await GetQueueRequestActivityAsync(activities, CustomOperationName);
- Activity sqlDependency = await GetDependencyActivityAsync(activities, DefaultSqlTable, a => a.ParentId == serviceBusRequest.Id);
+ Activity httpDependency = await GetDependencyActivityAsync(activities, DefaultHttpOperationName, a => a.ParentId == serviceBusRequest.Id);
- Assert.Equal(serviceBusRequest, sqlDependency.Parent);
+ Assert.Equal(serviceBusRequest, httpDependency.Parent);
});
}
@@ -98,7 +100,7 @@ public async Task ServiceBusMessagePump_WithW3CCorrelationFormatViaOpenTelemetry
{
traces.AddSource(source.Name);
traces.AddInMemoryExporter(activities);
- traces.AddSqlClientInstrumentation();
+ traces.AddHttpClientInstrumentation();
traces.SetSampler(new AlwaysOnSampler());
});
@@ -110,9 +112,9 @@ await TestServiceBusMessageHandlingAsync(options, Queue, message, async () =>
(string transactionId, string operationParentId) = message.ApplicationProperties.GetTraceParent();
Activity serviceBusRequest = await GetQueueRequestActivityAsync(activities, CustomOperationName, a => a.TraceId.ToString() == transactionId && a.ParentSpanId.ToString() == operationParentId);
- Activity sqlDependency = await GetDependencyActivityAsync(activities, DefaultSqlTable, a => a.TraceId.ToString() == transactionId && a.ParentId == serviceBusRequest.Id);
+ Activity httpDependency = await GetDependencyActivityAsync(activities, DefaultHttpOperationName, a => a.TraceId.ToString() == transactionId && a.ParentId == serviceBusRequest.Id);
- Assert.Equal(serviceBusRequest, sqlDependency.Parent);
+ Assert.Equal(serviceBusRequest, httpDependency.Parent);
});
}
diff --git a/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrderWithAutoTrackingAzureServiceBusMessageHandler.cs b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrderWithAutoTrackingAzureServiceBusMessageHandler.cs
index dd5c46ea..1214ea9d 100644
--- a/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrderWithAutoTrackingAzureServiceBusMessageHandler.cs
+++ b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrderWithAutoTrackingAzureServiceBusMessageHandler.cs
@@ -1,4 +1,5 @@
using System;
+using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions;
@@ -15,6 +16,8 @@ public class OrderWithAutoTrackingAzureServiceBusMessageHandler : IAzureServiceB
private readonly bool _isSuccessful;
private readonly ILogger _logger;
+ private static readonly HttpClient HttpClient = new();
+
///
/// Initializes a new instance of the class.
///
@@ -32,7 +35,7 @@ public OrderWithAutoTrackingAzureServiceBusMessageHandler(bool isSuccessful, ILo
_isSuccessful = isSuccessful;
}
- public Task ProcessMessageAsync(
+ public async Task ProcessMessageAsync(
Order message,
AzureServiceBusMessageContext messageContext,
MessageCorrelationInfo correlationInfo,
@@ -40,14 +43,13 @@ public Task ProcessMessageAsync(
{
_logger.LogAzureKeyVaultDependency("https://my-vault.azure.net", "Sql-connection-string", isSuccessful: true, DateTimeOffset.UtcNow, TimeSpan.FromSeconds(5));
SimulateSqlQueryWithMicrosoftTracking();
+ await SimulateHttpClientWithMicrosoftTrackingAsync();
if (!_isSuccessful)
{
throw new InvalidOperationException(
"[Test] Sabotage this message processing to let the message correlation system pick up an 'unsuccessful request'");
}
-
- return Task.CompletedTask;
}
private static void SimulateSqlQueryWithMicrosoftTracking()
@@ -73,5 +75,10 @@ private static void SimulateSqlQueryWithMicrosoftTracking()
// A failure will still result in a dependency telemetry instance that we can assert on.
}
}
+
+ private static async Task SimulateHttpClientWithMicrosoftTrackingAsync()
+ {
+ string _ = await HttpClient.GetStringAsync("https://codit.eu");
+ }
}
}
From fde3632f53d33d63909119e4293b010a14a51ca5 Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Thu, 3 Jul 2025 07:38:09 +0200
Subject: [PATCH 06/17] chore(log): correct log message with missing closing
')'
---
.../OpenTelemetryServiceBusMessageCorrelationScope.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs
index 5ef48a31..948ad447 100644
--- a/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs
+++ b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs
@@ -83,7 +83,7 @@ internal OpenTelemetryMessageOperationResult(Activity activity, ILogger logger)
protected override void StopOperation(bool isSuccessful, DateTimeOffset startTime, TimeSpan duration)
{
- _logger.LogTrace("Stop Azure Service Bus request '{OperationName}' operation (isSuccessful={IsSuccessful}", _activity.OperationName, isSuccessful);
+ _logger.LogTrace("Stop Azure Service Bus request '{OperationName}' operation (isSuccessful={IsSuccessful})", _activity.OperationName, isSuccessful);
_activity.SetStatus(isSuccessful ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
_activity.SetEndTime(_activity.StartTimeUtc.Add(duration));
From 6ad28b227b9a6f1e21823a4e985aa7e48e0c1083 Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Thu, 3 Jul 2025 09:06:05 +0200
Subject: [PATCH 07/17] chore(otel): use additional missing tags
---
.../OpenTelemetryServiceBusMessageCorrelationScope.cs | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs
index 948ad447..960075bf 100644
--- a/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs
+++ b/src/Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry/OpenTelemetryServiceBusMessageCorrelationScope.cs
@@ -49,7 +49,7 @@ public MessageOperationResult StartOperation(AzureServiceBusMessageContext messa
Activity activity = _activitySource.CreateActivity(
name: options.OperationName,
- kind: ActivityKind.Server,
+ kind: ActivityKind.Consumer,
context);
activity?.Start();
@@ -60,7 +60,10 @@ public MessageOperationResult StartOperation(AzureServiceBusMessageContext messa
activity.SetTag("az.namespace", "Microsoft.ServiceBus");
activity.SetTag("messaging.system", "servicebus");
- activity.SetTag("messaging.operation", "receive");
+ activity.SetTag("messaging.operation.type", "receive");
+ activity.SetTag("messaging.destination.name", messageContext.EntityPath);
+ activity.SetTag("messaging.message.id", messageContext.MessageId);
+ activity.SetTag("network.protocol.name", "amqp");
activity.SetTag("ServiceBus-Endpoint", messageContext.FullyQualifiedNamespace);
activity.SetTag("ServiceBus-Entity", messageContext.EntityPath);
@@ -86,6 +89,8 @@ protected override void StopOperation(bool isSuccessful, DateTimeOffset startTim
_logger.LogTrace("Stop Azure Service Bus request '{OperationName}' operation (isSuccessful={IsSuccessful})", _activity.OperationName, isSuccessful);
_activity.SetStatus(isSuccessful ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
+ _activity.SetTag("messaging.operation.name", isSuccessful ? "ack" : "nack");
+
_activity.SetEndTime(_activity.StartTimeUtc.Add(duration));
_activity.Dispose();
}
From a43da084e0a7fe1cad393fc830d07732ae295f0f Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Mon, 21 Jul 2025 12:58:32 +0200
Subject: [PATCH 08/17] chore(main): finalize merge w/ 'main'
---
.../ServiceBusMessagePump.TelemetryTests.cs | 130 ++++++++++--------
1 file changed, 70 insertions(+), 60 deletions(-)
diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
index aef0f4f3..c1829c53 100644
--- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
+++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.TelemetryTests.cs
@@ -3,28 +3,23 @@
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Linq;
-using System.Text;
-using System.Text.Json;
using System.Threading.Tasks;
-using System.Transactions;
-using Arcus.Messaging.Tests.Core.Generators;
using Arcus.Messaging.Tests.Core.Messages.v1;
using Arcus.Messaging.Tests.Integration.Fixture;
using Arcus.Messaging.Tests.Integration.Fixture.Logging;
-using Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus;
-using Arcus.Messaging.Tests.Workers.MessageHandlers;
using Arcus.Messaging.Tests.Workers.ServiceBus.MessageHandlers;
using Arcus.Testing;
+using Azure.Messaging.ServiceBus;
using Microsoft.ApplicationInsights.Channel;
using Microsoft.ApplicationInsights.DataContracts;
-using Microsoft.Extensions.Logging;
-using OpenTelemetry.Trace;
+using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
+using OpenTelemetry.Trace;
using Serilog;
+using Serilog.Sinks.ApplicationInsights.TelemetryConverters;
+using Xunit;
using Xunit.Sdk;
-using static Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus.DiskMessageEventConsumer;
-using static Arcus.Observability.Telemetry.Core.ContextProperties.Correlation;
using static Arcus.Observability.Telemetry.Core.ContextProperties.RequestTracking.ServiceBus;
using static Microsoft.Extensions.Logging.ServiceBusEntityType;
@@ -32,8 +27,7 @@ namespace Arcus.Messaging.Tests.Integration.MessagePump
{
public partial class ServiceBusMessagePumpTests
{
- private const string DefaultSqlTable = "master",
- DefaultHttpOperationName = "System.Net.Http.HttpRequestOut";
+ private const string DefaultHttpOperationName = "System.Net.Http.HttpRequestOut";
private string CustomOperationName { get; } = $"operation-{Guid.NewGuid()}";
private bool IsSuccessful { get; } = Bogus.Random.Bool();
@@ -42,51 +36,44 @@ public partial class ServiceBusMessagePumpTests
public async Task ServiceBusMessagePump_WithW3CCorrelationFormatNewParentViaOpenTelemetry_AutomaticallyTracksMicrosoftDependencies()
{
// Arrange
- var options = new WorkerOptions();
using ActivitySource source = CreateActivitySource();
+ await using var serviceBus = GivenServiceBus();
- options.AddServiceBusQueueMessagePump(QueueName, HostName, new DefaultAzureCredential(), pump =>
+ serviceBus.WhenServiceBusQueueMessagePump(pump =>
{
pump.Telemetry.OperationName = CustomOperationName;
-
}).UseServiceBusOpenTelemetryRequestTracking(source)
.WithServiceBusMessageHandler(CreateAutoTrackingMessageHandler);
var activities = new Collection();
- options.AddOpenTelemetry()
- .WithTracing(traces =>
- {
- traces.AddSource(source.Name);
- traces.AddInMemoryExporter(activities);
- traces.AddHttpClientInstrumentation();
- traces.SetSampler(new AlwaysOnSampler());
- });
-
- ServiceBusMessage message = CreateOrderServiceBusMessage();
+ serviceBus.Services.AddOpenTelemetry()
+ .WithTracing(traces =>
+ {
+ traces.AddSource(source.Name);
+ traces.AddInMemoryExporter(activities);
+ traces.AddHttpClientInstrumentation();
+ traces.SetSampler(new AlwaysOnSampler());
+ });
// Act
- await TestServiceBusMessageHandlingAsync(options, Queue, message, async () =>
- {
- Activity serviceBusRequest = await GetQueueRequestActivityAsync(activities, CustomOperationName);
- Activity httpDependency = await GetDependencyActivityAsync(activities, DefaultHttpOperationName, a => a.ParentId == serviceBusRequest.Id);
+ await serviceBus.WhenProducingMessagesAsync(msg => msg.WithoutTraceParent());
- Assert.Equal(serviceBusRequest, httpDependency.Parent);
- });
- }
+ // Assert
+ Activity serviceBusRequest = await GetQueueRequestActivityAsync(activities, CustomOperationName);
+ Activity httpDependency = await GetDependencyActivityAsync(activities, DefaultHttpOperationName, a => a.ParentId == serviceBusRequest.Id);
- private static ServiceBusMessage CreateOrderServiceBusMessage()
- {
- return new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(OrderGenerator.Generate())));
+ Assert.Equal(serviceBusRequest, httpDependency.Parent);
}
[Fact]
public async Task ServiceBusMessagePump_WithW3CCorrelationFormatViaOpenTelemetry_AutomaticallyTracksMicrosoftDependencies()
{
// Arrange
- var options = new WorkerOptions();
using ActivitySource source = CreateActivitySource();
- options.AddServiceBusQueueMessagePump(QueueName, HostName, new DefaultAzureCredential(), pump =>
+ await using var serviceBus = GivenServiceBus();
+
+ serviceBus.WhenServiceBusQueueMessagePump(pump =>
{
pump.Telemetry.OperationName = CustomOperationName;
@@ -94,27 +81,25 @@ public async Task ServiceBusMessagePump_WithW3CCorrelationFormatViaOpenTelemetry
.UseServiceBusOpenTelemetryRequestTracking(source);
var activities = new Collection();
- options.AddOpenTelemetry()
- .WithTracing(traces =>
- {
- traces.AddSource(source.Name);
- traces.AddInMemoryExporter(activities);
- traces.AddHttpClientInstrumentation();
- traces.SetSampler(new AlwaysOnSampler());
- });
-
- ServiceBusMessage message = CreateOrderServiceBusMessageForW3C();
-
- // Act / Assert
- await TestServiceBusMessageHandlingAsync(options, Queue, message, async () =>
- {
- (string transactionId, string operationParentId) = message.ApplicationProperties.GetTraceParent();
+ serviceBus.Services.AddOpenTelemetry()
+ .WithTracing(traces =>
+ {
+ traces.AddSource(source.Name);
+ traces.AddInMemoryExporter(activities);
+ traces.AddHttpClientInstrumentation();
+ traces.SetSampler(new AlwaysOnSampler());
+ });
- Activity serviceBusRequest = await GetQueueRequestActivityAsync(activities, CustomOperationName, a => a.TraceId.ToString() == transactionId && a.ParentSpanId.ToString() == operationParentId);
- Activity httpDependency = await GetDependencyActivityAsync(activities, DefaultHttpOperationName, a => a.TraceId.ToString() == transactionId && a.ParentId == serviceBusRequest.Id);
+ // Act
+ ServiceBusMessage message = await serviceBus.WhenProducingMessageAsync();
+
+ // Assert
+ (string transactionId, string operationParentId) = message.ApplicationProperties.GetTraceParent();
+
+ Activity serviceBusRequest = await GetQueueRequestActivityAsync(activities, CustomOperationName, a => a.TraceId.ToString() == transactionId && a.ParentSpanId.ToString() == operationParentId);
+ Activity httpDependency = await GetDependencyActivityAsync(activities, DefaultHttpOperationName, a => a.TraceId.ToString() == transactionId && a.ParentId == serviceBusRequest.Id);
- Assert.Equal(serviceBusRequest, httpDependency.Parent);
- });
+ Assert.Equal(serviceBusRequest, httpDependency.Parent);
}
private OrderWithAutoTrackingAzureServiceBusMessageHandler CreateAutoTrackingMessageHandler(IServiceProvider provider)
@@ -137,8 +122,8 @@ private async Task GetQueueRequestActivityAsync(IReadOnlyCollection 0,
$"no request activities found with operation name '{operationName}' in" +
$"[{string.Join(", ", activities.Select(a => a.OperationName))}]");
-
- return AssertX.Any(requestDependencies, request =>
+
+ return AssertAny(requestDependencies, request =>
{
Assert.True(IsSuccessful == request.Status is ActivityStatusCode.Ok, $"request for operation '{operationName}' did not match the expected status, expected '{(IsSuccessful ? ActivityStatusCode.Ok : ActivityStatusCode.Error)}' but got '{request.Status}'");
Assert.Contains(request.Tags, tag => tag is { Key: "ServiceBus-EntityType", Value: "Queue" });
@@ -153,18 +138,43 @@ private static async Task GetDependencyActivityAsync(IReadOnlyCollecti
return await Poll.Target(() =>
{
var dependencyActivities = activities.Where(a => a.OperationName == operationName).ToArray();
- Assert.True(dependencyActivities.Length > 0,
+ Assert.True(dependencyActivities.Length > 0,
$"no dependency activities found with operation name '{operationName}' in " +
$"[{string.Join(", ", activities.Select(a => a.OperationName))}]");
- return AssertX.Any(dependencyActivities, dependency =>
+ return AssertAny(dependencyActivities, dependency =>
{
Assert.True(filter is null || filter(dependency), $"dependency for operation '{operationName}' did not match the given custom filter assertion, please check whether the OpenTelemetry correlation system did add all the necessary properties");
+
});
}).FailWith("cannot find dependency telemetry in spied-upon OpenTelemetry activities");
}
+ public static T AssertAny(IEnumerable collection, Action action)
+ {
+ Stack<(int index, object item, Exception exception)> failures = new();
+ T[] array = collection.ToArray();
+
+ for (int index = 0; index < array.Length; ++index)
+ {
+ T item = array[index];
+ try
+ {
+ action(item);
+ return item;
+ }
+ catch (Exception ex)
+ {
+ failures.Push((index, item, ex));
+ }
+ }
+
+ throw new XunitException(
+ $"None of the {array.Length} item(s) matches against the given action: {Environment.NewLine}" +
+ $"{string.Join(Environment.NewLine, failures.Select(f => $"- [{f.index}] {f.item}: {f.exception}"))}");
+ }
+
[Fact]
public async Task ServiceBusMessagePump_WithW3CCorrelationFormat_AutomaticallyTracksMicrosoftDependencies()
{
From d85e713c0250448e5477fb34c273eb7d36ef092c Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Mon, 21 Jul 2025 13:00:14 +0200
Subject: [PATCH 09/17] chore(main): finalize merge w/ 'main'
---
src/Arcus.Messaging.sln | 5 -----
1 file changed, 5 deletions(-)
diff --git a/src/Arcus.Messaging.sln b/src/Arcus.Messaging.sln
index 9d75a9f2..9a0314a9 100644
--- a/src/Arcus.Messaging.sln
+++ b/src/Arcus.Messaging.sln
@@ -68,10 +68,6 @@ Global
{864C12DF-DE3D-421F-8687-EC3918FFB8BE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{864C12DF-DE3D-421F-8687-EC3918FFB8BE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{864C12DF-DE3D-421F-8687-EC3918FFB8BE}.Release|Any CPU.Build.0 = Release|Any CPU
- {F1B24FD4-099E-489D-B45D-A1A992CA5C3A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {F1B24FD4-099E-489D-B45D-A1A992CA5C3A}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {F1B24FD4-099E-489D-B45D-A1A992CA5C3A}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {F1B24FD4-099E-489D-B45D-A1A992CA5C3A}.Release|Any CPU.Build.0 = Release|Any CPU
{4C2DCD91-2D47-493C-82FC-03A76CFA2CB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4C2DCD91-2D47-493C-82FC-03A76CFA2CB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4C2DCD91-2D47-493C-82FC-03A76CFA2CB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -86,7 +82,6 @@ Global
{9EED9AD7-B69D-45D5-870F-D4F63A1C3495} = {A1369CCD-42D1-43F6-98BC-D8EDA62C2B13}
{55DE6D12-4C54-4570-BDFA-00B0FFDE5AB6} = {A1369CCD-42D1-43F6-98BC-D8EDA62C2B13}
{864C12DF-DE3D-421F-8687-EC3918FFB8BE} = {2CD090E7-7306-49A0-9680-6ED78CFECAE1}
- {F1B24FD4-099E-489D-B45D-A1A992CA5C3A} = {A1369CCD-42D1-43F6-98BC-D8EDA62C2B13}
{4C2DCD91-2D47-493C-82FC-03A76CFA2CB1} = {2CD090E7-7306-49A0-9680-6ED78CFECAE1}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
From a01f6891ea1a2d76398262ed1ef8b798bac8f468 Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Mon, 28 Jul 2025 20:41:11 +0200
Subject: [PATCH 10/17] Update
docs/preview/03-Features/01-Azure/01-service-bus.mdx
---
docs/preview/03-Features/01-Azure/01-service-bus.mdx | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/preview/03-Features/01-Azure/01-service-bus.mdx b/docs/preview/03-Features/01-Azure/01-service-bus.mdx
index 5efb7b86..efe036b4 100644
--- a/docs/preview/03-Features/01-Azure/01-service-bus.mdx
+++ b/docs/preview/03-Features/01-Azure/01-service-bus.mdx
@@ -151,7 +151,7 @@ services.AddServiceBus[Topic/Queue]MessagePump(..., options =>
});
```
-#### Correlation system
+#### Service Bus request tracking
The following correlation systems are available when registering the message pump. These systems will use the incoming Azure Service Bus message to start a request operation. A `MessageCorrelationInfo` model is passed to your registered message handlers, which represents the current request operation. All interactions to dependent systems should be children of this operation for a transactional service-to-service relationship.
From b8ef4b08b66cc4c2781ec265008b4da93064894f Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Wed, 20 Aug 2025 12:16:49 +0200
Subject: [PATCH 11/17] Update
docs/preview/03-Features/01-Azure/01-service-bus.mdx
---
docs/preview/03-Features/01-Azure/01-service-bus.mdx | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/docs/preview/03-Features/01-Azure/01-service-bus.mdx b/docs/preview/03-Features/01-Azure/01-service-bus.mdx
index 9d5b6b42..c45ee164 100644
--- a/docs/preview/03-Features/01-Azure/01-service-bus.mdx
+++ b/docs/preview/03-Features/01-Azure/01-service-bus.mdx
@@ -152,7 +152,8 @@ services.AddServiceBus[Topic/Queue]MessagePump(..., options =>
```
#### Service Bus request tracking
-The following correlation systems are available when registering the message pump. These systems will use the incoming Azure Service Bus message to start a request operation. A `MessageCorrelationInfo` model is passed to your registered message handlers, which represents the current request operation. All interactions to dependent systems should be children of this operation for a transactional service-to-service relationship.
+Arcus Messaging makes it possible to make it visible in a logging system like Azure Application Insights, what happens when a message is received from a Service Bus topic or queue.
+Below, you will find the different options that are supported to enable Service Bus request tracking. When this is enabled, Arcus.Messaging will log a request operation for every message that is received from Service Bus and all traces and interactions to dependent systems that happen during the processing of that message, will be logged as children of this request operation.
From 6df1db3ccc2d44f1767a5262a873e4f5b48e7ba2 Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Wed, 20 Aug 2025 12:17:48 +0200
Subject: [PATCH 12/17] Update
src/Arcus.Messaging.Abstractions/MessageHandling/MessageTelemetryOptions.cs
---
.../MessageHandling/MessageTelemetryOptions.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/Arcus.Messaging.Abstractions/MessageHandling/MessageTelemetryOptions.cs b/src/Arcus.Messaging.Abstractions/MessageHandling/MessageTelemetryOptions.cs
index 851cc68b..7f599ffd 100644
--- a/src/Arcus.Messaging.Abstractions/MessageHandling/MessageTelemetryOptions.cs
+++ b/src/Arcus.Messaging.Abstractions/MessageHandling/MessageTelemetryOptions.cs
@@ -7,7 +7,7 @@ namespace Arcus.Messaging.Abstractions.MessageHandling
///
public class MessageTelemetryOptions
{
- private string _operationName = "Process";
+ private string _operationName = "Service Bus message processing";
///
/// Gets or sets the name of the operation that is used when a request telemetry is tracked - default 'Process' is used as operation name.
From 0fa96ddd984e399d50a358ac885b18d1c63b7028 Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Wed, 20 Aug 2025 12:18:17 +0200
Subject: [PATCH 13/17] Update
docs/preview/03-Features/01-Azure/01-service-bus.mdx
---
docs/preview/03-Features/01-Azure/01-service-bus.mdx | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/preview/03-Features/01-Azure/01-service-bus.mdx b/docs/preview/03-Features/01-Azure/01-service-bus.mdx
index c45ee164..1032e1c0 100644
--- a/docs/preview/03-Features/01-Azure/01-service-bus.mdx
+++ b/docs/preview/03-Features/01-Azure/01-service-bus.mdx
@@ -138,7 +138,7 @@ services.AddServiceBus[Topic/Queue]MessagePump(..., options =>
options.JobId = Guid.NewGuid().ToString();
// The name for the request operation which will be used in the chosen message correlation system.
- // Default: Process
+ // Default: Service Bus message processing
options.Telemetry.OperationName = "ReceiveOrder";
// Indicate whether or not messages should be automatically marked as completed
From 2acba824f82adf25c4a5883f8589977f0bd5e476 Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Tue, 4 Nov 2025 10:19:11 +0100
Subject: [PATCH 14/17] Delete
src/Arcus.Messaging.Abstractions.ServiceBus/Telemetry/IServiceBusMessageCorrelationScope.cs
---
.../IServiceBusMessageCorrelationScope.cs | 18 ------------------
1 file changed, 18 deletions(-)
delete mode 100644 src/Arcus.Messaging.Abstractions.ServiceBus/Telemetry/IServiceBusMessageCorrelationScope.cs
diff --git a/src/Arcus.Messaging.Abstractions.ServiceBus/Telemetry/IServiceBusMessageCorrelationScope.cs b/src/Arcus.Messaging.Abstractions.ServiceBus/Telemetry/IServiceBusMessageCorrelationScope.cs
deleted file mode 100644
index 2e96e3ae..00000000
--- a/src/Arcus.Messaging.Abstractions.ServiceBus/Telemetry/IServiceBusMessageCorrelationScope.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-using Arcus.Messaging.Abstractions.MessageHandling;
-using Arcus.Messaging.Abstractions.Telemetry;
-
-namespace Arcus.Messaging.Abstractions.ServiceBus.Telemetry
-{
- ///
- /// Represents an approach to track the correlation information of a received Azure Service Bus message within a message pump.
- ///
- public interface IServiceBusMessageCorrelationScope
- {
- ///
- /// Starts a new Azure Service bus request operation on the telemetry system.
- ///
- /// The message context for the currently received Azure Service bus message.
- /// The user-configurable options to manipulate the telemetry.
- MessageOperationResult StartOperation(AzureServiceBusMessageContext messageContext, MessageTelemetryOptions options);
- }
-}
From 5e8d652a2e1a4a05d3e2edeb4f5c7f09b45832fd Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Tue, 4 Nov 2025 10:22:56 +0100
Subject: [PATCH 15/17] Update
src/Arcus.Messaging.Core/MessageHandling/MessageTelemetryOptions.cs
---
.../MessageHandling/MessageTelemetryOptions.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/Arcus.Messaging.Core/MessageHandling/MessageTelemetryOptions.cs b/src/Arcus.Messaging.Core/MessageHandling/MessageTelemetryOptions.cs
index 313aab72..7076a6ea 100644
--- a/src/Arcus.Messaging.Core/MessageHandling/MessageTelemetryOptions.cs
+++ b/src/Arcus.Messaging.Core/MessageHandling/MessageTelemetryOptions.cs
@@ -7,7 +7,7 @@ namespace Arcus.Messaging.Abstractions.MessageHandling
///
public class MessageTelemetryOptions
{
- private string _operationName = "Service Bus message processing";
+ private string _operationName;
///
/// Gets or sets the name of the operation that is used when a request telemetry is tracked - default 'Process' is used as operation name.
From 5b5c1f22facb0bd65dc45cf329e779bc03ed208a Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Tue, 4 Nov 2025 10:23:28 +0100
Subject: [PATCH 16/17] Update
src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs
---
src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs | 1 -
1 file changed, 1 deletion(-)
diff --git a/src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs b/src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs
index 40d0638d..86ca7116 100644
--- a/src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs
+++ b/src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs
@@ -213,7 +213,6 @@ private async Task ProcessMessageAsync(ServiceBusReceiv
var messageContext = ServiceBusMessageContext.Create(JobId, EntityType, _messageReceiver, message);
MessageProcessingResult routingResult = await RouteMessageAsync(message, messageContext, cancellationToken);
-
return routingResult;
}
From b5b0736899733a91c635c9bf9ba3db7b648d44c0 Mon Sep 17 00:00:00 2001
From: Stijn Moreels <9039753+stijnmoreels@users.noreply.github.com>
Date: Tue, 4 Nov 2025 10:23:54 +0100
Subject: [PATCH 17/17] Update
src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs
---
src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs | 1 +
1 file changed, 1 insertion(+)
diff --git a/src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs b/src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs
index 86ca7116..00ab3e2d 100644
--- a/src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs
+++ b/src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs
@@ -212,6 +212,7 @@ private async Task ProcessMessageAsync(ServiceBusReceiv
}
var messageContext = ServiceBusMessageContext.Create(JobId, EntityType, _messageReceiver, message);
+
MessageProcessingResult routingResult = await RouteMessageAsync(message, messageContext, cancellationToken);
return routingResult;
}