-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathOpenTelemetryServiceBusMessageCorrelationScope.cs
More file actions
111 lines (94 loc) · 5.18 KB
/
OpenTelemetryServiceBusMessageCorrelationScope.cs
File metadata and controls
111 lines (94 loc) · 5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
using System;
using System.Diagnostics;
using Arcus.Messaging.Abstractions;
using Arcus.Messaging.Abstractions.MessageHandling;
using Arcus.Messaging.Abstractions.ServiceBus;
using Arcus.Messaging.Abstractions.ServiceBus.Telemetry;
using Arcus.Messaging.Abstractions.Telemetry;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Arcus.Messaging.ServiceBus.Telemetry.OpenTelemetry
{
/// <summary>
/// Represents the OpenTelemetry implementation of the <see cref="IServiceBusMessageCorrelationScope"/>
/// to track the correlation information of a received Azure Service Bus message within a message pump.
/// </summary>
internal class OpenTelemetryServiceBusMessageCorrelationScope : IServiceBusMessageCorrelationScope
{
    private readonly ActivitySource _activitySource;
    private readonly ILogger _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="OpenTelemetryServiceBusMessageCorrelationScope"/> class.
    /// </summary>
    /// <param name="activitySource">The source used to create the activity that represents the message request operation.</param>
    /// <param name="logger">The logger to write diagnostic traces to; a no-op logger is used when <c>null</c>.</param>
    /// <exception cref="ArgumentNullException">Thrown when the <paramref name="activitySource"/> is <c>null</c>.</exception>
    internal OpenTelemetryServiceBusMessageCorrelationScope(ActivitySource activitySource, ILogger<OpenTelemetryServiceBusMessageCorrelationScope> logger)
    {
        ArgumentNullException.ThrowIfNull(activitySource);

        _activitySource = activitySource;
        _logger = logger ?? NullLogger<OpenTelemetryServiceBusMessageCorrelationScope>.Instance;
    }

    /// <summary>
    /// Starts a new Azure Service bus request operation on the telemetry system.
    /// </summary>
    /// <param name="messageContext">The message context for the currently received Azure Service bus message.</param>
    /// <param name="options">The user-configurable options to manipulate the telemetry.</param>
    /// <returns>
    ///     An operation result that is linked to the started activity;
    ///     or a result with a no-op stop when the activity source did not create an activity.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    ///     Thrown when the <paramref name="messageContext"/> or the <paramref name="options"/> is <c>null</c>.
    /// </exception>
    public MessageOperationResult StartOperation(AzureServiceBusMessageContext messageContext, MessageTelemetryOptions options)
    {
        ArgumentNullException.ThrowIfNull(messageContext);
        ArgumentNullException.ThrowIfNull(options);

        _logger.LogTrace("Start Azure Service Bus request '{OperationName}' operation...", options.OperationName);

        (string transactionId, string operationParentId) = messageContext.Properties.GetTraceParent();

        // The message's transaction ID acts as the trace ID and its operation parent ID as the parent span ID.
        // Only those two values are extracted from the message, so the parent context carries no trace flags.
        ActivityContext context = new(
            ActivityTraceId.CreateFromString(transactionId),
            ActivitySpanId.CreateFromString(operationParentId),
            ActivityTraceFlags.None);

        Activity activity = _activitySource.CreateActivity(
            name: options.OperationName,
            kind: ActivityKind.Consumer,
            context);

        if (activity is null)
        {
            // No activity was created by the source: still hand out the correlation of the received message,
            // but without any telemetry attached to it.
            return new UnlinkedMessageOperationResult(transactionId, operationParentId);
        }

        activity.Start();

        // Describe the received message on the activity.
        activity.SetTag("az.namespace", "Microsoft.ServiceBus");
        activity.SetTag("messaging.system", "servicebus");
        activity.SetTag("messaging.operation.type", "receive");
        activity.SetTag("messaging.destination.name", messageContext.EntityPath);
        activity.SetTag("messaging.message.id", messageContext.MessageId);
        activity.SetTag("network.protocol.name", "amqp");

        activity.SetTag("ServiceBus-Endpoint", messageContext.FullyQualifiedNamespace);
        activity.SetTag("ServiceBus-Entity", messageContext.EntityPath);
        activity.SetTag("ServiceBus-EntityType", messageContext.EntityType.ToString());

        return new OpenTelemetryMessageOperationResult(activity, _logger);
    }

    /// <summary>
    /// Represents a message operation that is backed by a started <see cref="Activity"/>,
    /// which gets finalized when the operation is stopped.
    /// </summary>
    private sealed class OpenTelemetryMessageOperationResult : MessageOperationResult
    {
        private readonly Activity _activity;
        private readonly ILogger _logger;

        /// <summary>
        /// Initializes a new instance of the <see cref="OpenTelemetryMessageOperationResult"/> class.
        /// </summary>
        /// <param name="activity">The started activity that represents the message request operation.</param>
        /// <param name="logger">The logger to write diagnostic traces to.</param>
        internal OpenTelemetryMessageOperationResult(Activity activity, ILogger logger)
            // Same argument order as the sibling unlinked result: (operation ID, transaction ID, operation parent ID).
            // The trace ID of the activity is created from the message's transaction ID, so the trace ID
            // is passed as the transaction and the activity's own span ID as the operation.
            : base(new MessageCorrelationInfo(activity.SpanId.ToString(), activity.TraceId.ToString(), activity.ParentSpanId.ToString()))
        {
            _activity = activity;
            _logger = logger;
        }

        /// <summary>
        /// Finalizes the activity with the outcome and the duration of the message processing.
        /// </summary>
        /// <param name="isSuccessful">The flag indicating whether the message was processed successfully.</param>
        /// <param name="startTime">The time when the operation was started (not used: the activity's own start time is used).</param>
        /// <param name="duration">The time it took to process the message.</param>
        protected override void StopOperation(bool isSuccessful, DateTimeOffset startTime, TimeSpan duration)
        {
            _logger.LogTrace("Stop Azure Service Bus request '{OperationName}' operation (isSuccessful={IsSuccessful})", _activity.OperationName, isSuccessful);

            _activity.SetStatus(isSuccessful ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
            _activity.SetTag("messaging.operation.name", isSuccessful ? "ack" : "nack");

            // Set the end time explicitly from the measured duration, before the activity gets disposed.
            _activity.SetEndTime(_activity.StartTimeUtc.Add(duration));
            _activity.Dispose();
        }
    }

    /// <summary>
    /// Represents a message operation without an activity attached to it:
    /// it only exposes the correlation of the received message and does nothing when stopped.
    /// </summary>
    private sealed class UnlinkedMessageOperationResult : MessageOperationResult
    {
        /// <summary>
        /// Initializes a new instance of the <see cref="UnlinkedMessageOperationResult"/> class.
        /// </summary>
        /// <param name="transactionId">The transaction ID that was extracted from the received message.</param>
        /// <param name="operationParentId">The operation parent ID that was extracted from the received message.</param>
        internal UnlinkedMessageOperationResult(string transactionId, string operationParentId)
            // There is no activity to take an ID from, so a new GUID is passed as the first argument.
            : base(new MessageCorrelationInfo(Guid.NewGuid().ToString(), transactionId, operationParentId))
        {
        }

        /// <summary>
        /// Does nothing, as there is no activity to finalize.
        /// </summary>
        protected override void StopOperation(bool isSuccessful, DateTimeOffset startTime, TimeSpan duration)
        {
        }
    }
}
}