Skip to content
Draft
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d9f58a1
feat(correlation): add open-telemetry as msg correlation scope
stijnmoreels Jul 3, 2025
01bf8aa
fix(test): use correct auto-tracking 'is successful' + strictken acti…
stijnmoreels Jul 3, 2025
183c50a
chore(test): provide better test assertion failure messages + use bug…
stijnmoreels Jul 3, 2025
6e5e66e
chore(deps): upgrade sql client package
stijnmoreels Jul 3, 2025
7293eee
chore(deps): use stable open-telemetry http exporter
stijnmoreels Jul 3, 2025
fde3632
chore(log): correct log message with missing closing ')'
stijnmoreels Jul 3, 2025
6ad28b2
chore(otel): use additional missing tags
stijnmoreels Jul 3, 2025
c162084
Merge branch 'main' into feature/add-open-telemetry-msg-correlation-s…
stijnmoreels Jul 4, 2025
ffce365
Merge branch 'main' into feature/add-open-telemetry-msg-correlation-s…
stijnmoreels Jul 21, 2025
a43da08
chore(main): finalize merge w/ 'main'
stijnmoreels Jul 21, 2025
d85e713
chore(main): finalize merge w/ 'main'
stijnmoreels Jul 21, 2025
a01f689
Update docs/preview/03-Features/01-Azure/01-service-bus.mdx
stijnmoreels Jul 28, 2025
9cc8121
Merge branch 'main' into feature/add-open-telemetry-msg-correlation-s…
stijnmoreels Aug 20, 2025
b45a9a5
Merge branch 'feature/add-open-telemetry-msg-correlation-scope' of ht…
stijnmoreels Aug 20, 2025
b8ef4b0
Update docs/preview/03-Features/01-Azure/01-service-bus.mdx
stijnmoreels Aug 20, 2025
6df1db3
Update src/Arcus.Messaging.Abstractions/MessageHandling/MessageTeleme…
stijnmoreels Aug 20, 2025
0fa96dd
Update docs/preview/03-Features/01-Azure/01-service-bus.mdx
stijnmoreels Aug 20, 2025
1f59973
Merge branch 'main' into feature/add-open-telemetry-msg-correlation-s…
stijnmoreels Aug 21, 2025
c3d9fe2
Merge branch 'feature/add-open-telemetry-msg-correlation-scope' of ht…
stijnmoreels Aug 21, 2025
80ac955
Merge branch 'main' into feature/add-open-telemetry-msg-correlation-s…
stijnmoreels Nov 4, 2025
2acba82
Delete src/Arcus.Messaging.Abstractions.ServiceBus/Telemetry/IService…
stijnmoreels Nov 4, 2025
5e8d652
Update src/Arcus.Messaging.Core/MessageHandling/MessageTelemetryOptio…
stijnmoreels Nov 4, 2025
5b5c1f2
Update src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs
stijnmoreels Nov 4, 2025
b5b0736
Update src/Arcus.Messaging.ServiceBus/ServiceBusReceiverMessagePump.cs
stijnmoreels Nov 4, 2025
b7de48a
Merge branch 'main' into feature/add-open-telemetry-msg-correlation-s…
stijnmoreels Nov 6, 2025
c54fbb4
Merge branch 'feature/add-open-telemetry-msg-correlation-scope' of ht…
stijnmoreels Nov 6, 2025
fe3ecda
Merge branch 'main' into feature/add-open-telemetry-msg-correlation-s…
stijnmoreels Dec 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/preview/02-getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,4 @@ Host.CreateDefaultBuilder()

The way *'message handlers'* are registered determines when the received message will be routed to them.

> 🔗 See the [Azure Service Bus messaging feature documentation](./03-Features/01-Azure/01-service-bus.md) for more information on providing additional routing filters to your message handlers.
> 🔗 See the [Azure Service Bus messaging feature documentation](./03-Features/01-Azure/01-service-bus.mdx) for more information on providing additional routing filters to your message handlers.
Comment thread
fgheysels marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
sidebar_label: Service Bus
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

# Azure Service Bus messaging
The `Arcus.Messaging.Pumps.ServiceBus` library provides a way to process Azure Service Bus messages on queues/topic subscriptions via custom routed *message handlers*, instead of interacting with the [`ServiceBusReceiver`](https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusreceiver) yourself.

Expand Down Expand Up @@ -134,6 +137,10 @@ services.AddServiceBus[Topic/Queue]MessagePump(..., options =>
// this job instance in a multi-instance deployment (default: generated GUID).
options.JobId = Guid.NewGuid().ToString();

// The name for the request operation which will be used in the chosen message correlation system.
// Default: Process
Comment thread
stijnmoreels marked this conversation as resolved.
Outdated
options.Telemetry.OperationName = "ReceiveOrder";

// Indicate whether or not messages should be automatically marked as completed
// if no exceptions occurred and processing has finished (default: true).
options.Routing.AutoComplete = false;
Expand All @@ -144,6 +151,43 @@ services.AddServiceBus[Topic/Queue]MessagePump(..., options =>
});
```

#### Service Bus request tracking
Arcus Messaging makes it possible to make it visible in a logging system like Azure Application Insights, what happens when a message is received from a Service Bus topic or queue.
Below, you will find the different options that are supported to enable Service Bus request tracking. When this is enabled, Arcus.Messaging will log a request operation for every message that is received from Service Bus and all traces and interactions to dependent systems that happen during the processing of that message, will be logged as children of this request operation.

<Tabs groupId="correlation-systems">
<TabItem value="open-telemetry" label="OpenTelemetry">
Comment thread
fgheysels marked this conversation as resolved.

```powershell
PS> Install-Package -Name Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry
```

Make sure to include the following line to your message pump registration:
```diff
+ ActivitySource applicationSource = new("<activity-source-name>");

services.AddServiceBusTopicMessagePump(...)
+ .UseServiceBusOpenTelemetryRequestTracking(applicationSource)
.WithServiceBusMessageHandler<...>()
.WithServiceBusMessageHandler<...>();
```

Now Arcus will use the OpenTelemetry approach to track Azure Service Bus messages. Make sure that the `ActivitySource` that is passed, is also tracked by OpenTelemetry:
```csharp
IServiceCollection services = ...

services.AddOpenTelemetry()
.AddTraces(traces =>
{
traces.AddSource("<activity-source-name>");
});
```

> 🔗 [More info on OpenTelemetry on Azure](https://learn.microsoft.com/en-us/azure/azure-monitor/app/opentelemetry)

</TabItem>
</Tabs>

### Message handler routing customization
The following routing options are available when registering an Azure Service Bus message handler on a message pump.

Expand Down Expand Up @@ -267,7 +311,9 @@ Both the recovery period after the circuit is open and the interval between mess
}
```

#### 🔔 Get notified on a circuit breaker state transition
<details>
<summary>**🔔 Get notified on a circuit breaker state transition**</summary>

To get notified on circuit-breaker state transitions, you can register one or more event handlers on a message pump.

These event handlers should implement the `ICircuitBreakerEventHandler` interface:
Expand Down Expand Up @@ -295,4 +341,5 @@ using Microsoft.Extensions.DependencyInjection;
services.AddServiceBus[Queue/Topic]MessagePump(...)
.WithCircuitBreakerStateChangedEventHandler<MyFirstCircuitBreakerEventHandler>()
.WithCircuitBreakerStateChangedEventHandler<MySecondCircuitBreakerEventHandler>();
```
```
</details>
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Arcus.Messaging.Abstractions.MessageHandling;
using Arcus.Messaging.Abstractions.Telemetry;

namespace Arcus.Messaging.Abstractions.ServiceBus.Telemetry
{
/// <summary>
/// Represents an approach to track the correlation information of a received Azure Service Bus message within a message pump.
/// </summary>
public interface IServiceBusMessageCorrelationScope
Comment thread
fgheysels marked this conversation as resolved.
{
/// <summary>
/// Starts a new Azure Service bus request operation on the telemetry system.
/// </summary>
/// <param name="messageContext">The message context for the currently received Azure Service bus message.</param>
/// <param name="options">The user-configurable options to manipulate the telemetry.</param>
MessageOperationResult StartOperation(AzureServiceBusMessageContext messageContext, MessageTelemetryOptions options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Arcus.Messaging.Abstractions.MessageHandling
/// </summary>
public class MessageTelemetryOptions
{
private string _operationName;
private string _operationName = "Service Bus message processing";

/// <summary>
/// Gets or sets the name of the operation that is used when a request telemetry is tracked - default 'Process' is used as operation name.
Expand Down
41 changes: 34 additions & 7 deletions src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
using Arcus.Messaging.Abstractions.MessageHandling;
using Arcus.Messaging.Abstractions.ServiceBus;
using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling;
using Arcus.Messaging.Abstractions.ServiceBus.Telemetry;
using Arcus.Messaging.Abstractions.Telemetry;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;
using Arcus.Messaging.Pumps.ServiceBus.Configuration;
using Azure.Messaging.ServiceBus;
Expand Down Expand Up @@ -284,21 +286,46 @@ private async Task<MessageProcessingResult> ProcessMessageAsync(ServiceBusReceiv
_logger.LogTrace("No operation ID was found on the message '{MessageId}' during processing in the Azure Service Bus {EntityType} message pump '{JobId}'", message.MessageId, EntityType, JobId);
}

#pragma warning disable CS0618 // Type or member is obsolete
using MessageCorrelationResult correlationResult = DetermineMessageCorrelation(message);
var messageContext = AzureServiceBusMessageContext.Create(JobId, EntityType, _messageReceiver, message);
using MessageOperationResult correlationResult = DetermineMessageCorrelation(message, messageContext);

MessageProcessingResult routingResult = await _messageRouter.RouteMessageAsync(_messageReceiver, message, messageContext, correlationResult.Correlation, cancellationToken);
correlationResult.IsSuccessful = routingResult.IsSuccessful;

MessageProcessingResult routingResult = await _messageRouter.RouteMessageAsync(_messageReceiver, message, messageContext, correlationResult.CorrelationInfo, cancellationToken);
return routingResult;
}

private MessageCorrelationResult DetermineMessageCorrelation(ServiceBusReceivedMessage message)
private MessageOperationResult DetermineMessageCorrelation(ServiceBusReceivedMessage message, AzureServiceBusMessageContext messageContext)
{
var correlationScope = _serviceProvider.GetService<IServiceBusMessageCorrelationScope>();
if (correlationScope is null)
{
(string transactionId, string operationParentId) = message.ApplicationProperties.GetTraceParent();
var client = _serviceProvider.GetRequiredService<TelemetryClient>();

#pragma warning disable CS0618 // Type or member is obsolete
var deprecatedResult = MessageCorrelationResult.Create(client, transactionId, operationParentId);
return new W3CAdapterMessageOperationResult(deprecatedResult);
#pragma warning restore CS0618 // Type or member is obsolete
}

return correlationScope.StartOperation(messageContext, Options.Telemetry);
}

[Obsolete("Will be removed once " + nameof(MessageCorrelationResult) + " is removed")]
private sealed class W3CAdapterMessageOperationResult : MessageOperationResult
{
(string transactionId, string operationParentId) = message.ApplicationProperties.GetTraceParent();
var client = _serviceProvider.GetRequiredService<TelemetryClient>();
private readonly MessageCorrelationResult _deprecatedResult;

return MessageCorrelationResult.Create(client, transactionId, operationParentId);
internal W3CAdapterMessageOperationResult(MessageCorrelationResult deprecatedResult) : base(deprecatedResult.CorrelationInfo)
{
_deprecatedResult = deprecatedResult;
}

protected override void StopOperation(bool isSuccessful, DateTimeOffset startTime, TimeSpan duration)
{
_deprecatedResult.Dispose();
}
}

private static async Task UntilCancelledAsync(CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<LangVersion>latest</LangVersion>
<Authors>Arcus</Authors>
<Company>Arcus</Company>
<Product>Arcus.Messaging</Product>
<Description>Provides capability to track message correlation information using OpenTelemetry for Azure Service Bus message pumps</Description>
<Copyright>Copyright (c) Arcus</Copyright>
<PackageProjectUrl>https://messaging.arcus-azure.net/</PackageProjectUrl>
<RepositoryUrl>https://github.com/arcus-azure/arcus.messaging</RepositoryUrl>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageIcon>icon.png</PackageIcon>
<PackageReadmeFile>README.md</PackageReadmeFile>
<RepositoryType>Git</RepositoryType>
<PackageTags>Azure;Messaging;ServiceBus</PackageTags>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>

<ItemGroup>
<None Include="..\..\README.md" Pack="true" PackagePath="\" />
<None Include="..\..\LICENSE" Pack="true" PackagePath="\" />
<None Include="..\..\docs\static\img\icon.png" Pack="true" PackagePath="\" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Arcus.Messaging.Abstractions.ServiceBus\Arcus.Messaging.Abstractions.ServiceBus.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using System.Diagnostics;
using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling;
using Arcus.Messaging.Abstractions.ServiceBus.Telemetry;
using Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
/// <summary>
/// Extensions on the <see cref="ServiceBusMessageHandlerCollection"/> to register OpenTelemetry services for Azure Service Bus message pumps.
/// </summary>
public static class ServiceBusMessageHandlerCollectionExtensions
{
/// <summary>
/// Register OpenTelemetry as the correlation system to track Azure Service Bus message requests within the message pump.
/// </summary>
/// <param name="handlers">The collection of Azure Service Bus message handler collection.</param>
/// <param name="activitySource">The activity source to start <see cref="Activity"/> instances from upon receiving Azure Service Bus messages.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="handlers"/> or the <paramref name="activitySource"/> is <c>null</c>.</exception>
public static ServiceBusMessageHandlerCollection UseServiceBusOpenTelemetryRequestTracking(
this ServiceBusMessageHandlerCollection handlers,
ActivitySource activitySource)
{
ArgumentNullException.ThrowIfNull(handlers);
ArgumentNullException.ThrowIfNull(activitySource);

handlers.Services.TryAddSingleton<IServiceBusMessageCorrelationScope>(serviceProvider =>
{
var logger = serviceProvider.GetService<ILogger<OpenTelemetryServiceBusMessageCorrelationScope>>();
return new OpenTelemetryServiceBusMessageCorrelationScope(activitySource, logger);
});

return handlers;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System;
using System.Diagnostics;
using Arcus.Messaging.Abstractions;
using Arcus.Messaging.Abstractions.MessageHandling;
using Arcus.Messaging.Abstractions.ServiceBus;
using Arcus.Messaging.Abstractions.ServiceBus.Telemetry;
using Arcus.Messaging.Abstractions.Telemetry;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry
{
/// <summary>
/// Represents the OpenTelemetry implementation of the <see cref="IServiceBusMessageCorrelationScope"/>
/// to track the correlation information of a received Azure Service Bus message within a message pump.
/// </summary>
internal class OpenTelemetryServiceBusMessageCorrelationScope : IServiceBusMessageCorrelationScope
{
private readonly ActivitySource _activitySource;
private readonly ILogger _logger;

/// <summary>
/// Initializes a new instance of the <see cref="OpenTelemetryServiceBusMessageCorrelationScope"/> class.
/// </summary>
internal OpenTelemetryServiceBusMessageCorrelationScope(ActivitySource activitySource, ILogger<OpenTelemetryServiceBusMessageCorrelationScope> logger)
{
ArgumentNullException.ThrowIfNull(activitySource);
_activitySource = activitySource;
_logger = logger ?? NullLogger<OpenTelemetryServiceBusMessageCorrelationScope>.Instance;
}

/// <summary>
/// Starts a new Azure Service bus request operation on the telemetry system.
/// </summary>
/// <param name="messageContext">The message context for the currently received Azure Service bus message.</param>
/// <param name="options">The user-configurable options to manipulate the telemetry.</param>
public MessageOperationResult StartOperation(AzureServiceBusMessageContext messageContext, MessageTelemetryOptions options)
{
ArgumentNullException.ThrowIfNull(messageContext);
ArgumentNullException.ThrowIfNull(options);

_logger.LogTrace("Start Azure Service Bus request '{OperationName}' operation...", options.OperationName);
(string transactionId, string operationParentId) = messageContext.Properties.GetTraceParent();

ActivityContext context = new(
ActivityTraceId.CreateFromString(transactionId),
ActivitySpanId.CreateFromString(operationParentId),
ActivityTraceFlags.None);

Activity activity = _activitySource.CreateActivity(
name: options.OperationName,
kind: ActivityKind.Consumer,
context);

activity?.Start();
if (activity is null)
{
return new UnlinkedMessageOperationResult(transactionId, operationParentId);
}

activity.SetTag("az.namespace", "Microsoft.ServiceBus");
activity.SetTag("messaging.system", "servicebus");
activity.SetTag("messaging.operation.type", "receive");
activity.SetTag("messaging.destination.name", messageContext.EntityPath);
activity.SetTag("messaging.message.id", messageContext.MessageId);
activity.SetTag("network.protocol.name", "amqp");

activity.SetTag("ServiceBus-Endpoint", messageContext.FullyQualifiedNamespace);
activity.SetTag("ServiceBus-Entity", messageContext.EntityPath);
activity.SetTag("ServiceBus-EntityType", messageContext.EntityType.ToString());

return new OpenTelemetryMessageOperationResult(activity, _logger);
}

private sealed class OpenTelemetryMessageOperationResult : MessageOperationResult
{
private readonly Activity _activity;
private readonly ILogger _logger;

internal OpenTelemetryMessageOperationResult(Activity activity, ILogger logger)
: base(new MessageCorrelationInfo(activity.TraceId.ToString(), activity.SpanId.ToString(), activity.ParentSpanId.ToString()))
{
_activity = activity;
_logger = logger;
}

protected override void StopOperation(bool isSuccessful, DateTimeOffset startTime, TimeSpan duration)
{
_logger.LogTrace("Stop Azure Service Bus request '{OperationName}' operation (isSuccessful={IsSuccessful})", _activity.OperationName, isSuccessful);

_activity.SetStatus(isSuccessful ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
_activity.SetTag("messaging.operation.name", isSuccessful ? "ack" : "nack");

_activity.SetEndTime(_activity.StartTimeUtc.Add(duration));
_activity.Dispose();
}
}

private sealed class UnlinkedMessageOperationResult : MessageOperationResult
{
internal UnlinkedMessageOperationResult(string transactionId, string operationParentId)
: base(new MessageCorrelationInfo(Guid.NewGuid().ToString(), transactionId, operationParentId))
{
}

protected override void StopOperation(bool isSuccessful, DateTimeOffset startTime, TimeSpan duration)
{
}
}
}
}
Loading
Loading