diff --git a/projects/Directory.Packages.props b/projects/Directory.Packages.props index cf53e6134..26c03fde2 100644 --- a/projects/Directory.Packages.props +++ b/projects/Directory.Packages.props @@ -7,9 +7,10 @@ - + + - @@ -44,4 +44,4 @@ - + \ No newline at end of file diff --git a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj index fe1af035f..31ddffbee 100644 --- a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj +++ b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj @@ -50,6 +50,7 @@ + diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index 9ed4e0de1..c27628290 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -60,9 +60,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken) var cmd = new BasicPublish(exchange, routingKey, mandatory, default); - using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length) - : default; + using Activity? sendActivity = RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length); ulong publishSequenceNumber = 0; if (publisherConfirmationInfo is not null) @@ -115,9 +113,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken) var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); - using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length) - : default; + using Activity? sendActivity = RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length); ulong publishSequenceNumber = 0; if (publisherConfirmationInfo is not null) diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index f661669af..c9dc7104e 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -228,11 +228,10 @@ internal void TakeOver(Connection other) internal async ValueTask OpenAsync(CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); - + using Activity? connectionActivity = RabbitMQActivitySource.OpenConnection(_frameHandler); try { RabbitMqClientEventSource.Log.ConnectionOpened(); - cancellationToken.ThrowIfCancellationRequested(); // Note: this must happen *after* the frame handler is started @@ -250,8 +249,10 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken) return this; } - catch + catch (Exception ex) { + connectionActivity?.SetStatus(ActivityStatusCode.Error); + connectionActivity?.AddException(ex); try { var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen"); diff --git a/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs index 31d076cbd..7ee99f39c 100644 --- a/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs +++ b/projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs @@ -43,16 +43,20 @@ public static class RabbitMQActivitySource private static readonly ActivitySource s_subscriberSource = new ActivitySource(SubscriberSourceName, AssemblyVersion); + private static readonly ActivitySource s_connectionSource = + new ActivitySource(ConnectionSourceName, AssemblyVersion); + public const string PublisherSourceName = "RabbitMQ.Client.Publisher"; public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber"; + public const string ConnectionSourceName = "RabbitMQ.Client.Connection"; - public static Action> ContextInjector { get; set; } = DefaultContextInjector; + public static Action> ContextInjector { get; set; } = + DefaultContextInjector; public static Func ContextExtractor { get; set; } = DefaultContextExtractor; public static bool UseRoutingKeyAsOperationName { get; set; } = true; - internal static bool PublisherHasListeners => s_publisherSource.HasListeners(); internal static readonly IEnumerable> CreationTags = new[] { @@ -61,14 +65,18 @@ public static class RabbitMQActivitySource new KeyValuePair(ProtocolVersion, "0.9.1") }; + internal static Activity? OpenConnection(IFrameHandler frameHandler) + { + Activity? connectionActivity = + s_connectionSource.StartRabbitMQActivity("connection attempt", ActivityKind.Client); + connectionActivity? + .SetNetworkTags(frameHandler); + return connectionActivity; + } + internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize, ActivityContext linkedContext = default) { - if (!s_publisherSource.HasListeners()) - { - return null; - } - Activity? activity = linkedContext == default ? s_publisherSource.StartRabbitMQActivity( UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish, @@ -82,16 +90,10 @@ public static class RabbitMQActivitySource } return activity; - } internal static Activity? BasicGetEmpty(string queue) { - if (!s_subscriberSource.HasListeners()) - { - return null; - } - Activity? activity = s_subscriberSource.StartRabbitMQActivity( UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGetEmpty} {queue}" : MessagingOperationNameBasicGetEmpty, ActivityKind.Consumer); @@ -109,11 +111,6 @@ public static class RabbitMQActivitySource internal static Activity? BasicGet(string routingKey, string exchange, ulong deliveryTag, IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize) { - if (!s_subscriberSource.HasListeners()) - { - return null; - } - // Extract the PropagationContext of the upstream parent from the message headers. Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity( UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGet} {routingKey}" : MessagingOperationNameBasicGet, ActivityKind.Consumer, @@ -130,11 +127,6 @@ public static class RabbitMQActivitySource internal static Activity? Deliver(string routingKey, string exchange, ulong deliveryTag, IReadOnlyBasicProperties basicProperties, int bodySize) { - if (!s_subscriberSource.HasListeners()) - { - return null; - } - // Extract the PropagationContext of the upstream parent from the message headers. Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity( UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicDeliver} {routingKey}" : MessagingOperationNameBasicDeliver, @@ -197,7 +189,7 @@ private static void PopulateMessagingTags(string operationType, string operation internal static void PopulateMessageEnvelopeSize(Activity? activity, int size) { - if (activity != null && activity.IsAllDataRequested && PublisherHasListeners) + if (activity?.IsAllDataRequested ?? false) { activity.SetTag(MessagingEnvelopeSize, size); } @@ -205,7 +197,7 @@ internal static void PopulateMessageEnvelopeSize(Activity? activity, int size) internal static void SetNetworkTags(this Activity? activity, IFrameHandler frameHandler) { - if (PublisherHasListeners && activity != null && activity.IsAllDataRequested) + if (activity?.IsAllDataRequested ?? false) { switch (frameHandler.RemoteEndPoint.AddressFamily) { diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index e69de29bb..204448ddd 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -0,0 +1 @@ +const RabbitMQ.Client.RabbitMQActivitySource.ConnectionSourceName = "RabbitMQ.Client.Connection" -> string! \ No newline at end of file diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj index d552e6324..765704f0d 100644 --- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -67,6 +67,7 @@ + @@ -76,7 +77,6 @@ * https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299 * https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1594 --> - diff --git a/projects/Test/Common/ActivityRecorder.cs b/projects/Test/Common/ActivityRecorder.cs new file mode 100644 index 000000000..073ac124a --- /dev/null +++ b/projects/Test/Common/ActivityRecorder.cs @@ -0,0 +1,187 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; +using Xunit; + +namespace Test +{ + public class ActivityRecorder : IDisposable + { + private string _activitySourceName; + private string _activityName; + + private readonly ActivityListener _listener; + private List _finishedActivities = new(); + + private int _started; + private int _stopped; + + public int Started => _started; + public int Stopped => _stopped; + + public Predicate Filter { get; set; } = _ => true; + public bool VerifyParent { get; set; } = true; + public Activity ExpectedParent { get; set; } + + public Activity LastStartedActivity { get; private set; } + public Activity LastFinishedActivity { get; private set; } + public IEnumerable FinishedActivities => _finishedActivities; + + public ActivityRecorder(string activitySourceName, string activityName) + { + _activitySourceName = activitySourceName; + _activityName = activityName; + _listener = new ActivityListener + { + ShouldListenTo = (activitySource) => activitySource.Name == _activitySourceName, + Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllData, + ActivityStarted = (activity) => + { + if (activity.OperationName == _activityName && Filter(activity)) + { + if (VerifyParent) + { + Assert.Same(ExpectedParent, activity.Parent); + } + + Interlocked.Increment(ref _started); + + LastStartedActivity = activity; + } + }, + ActivityStopped = (activity) => + { + if (activity.OperationName == _activityName && Filter(activity)) + { + if (VerifyParent) + { + Assert.Same(ExpectedParent, activity.Parent); + } + + Interlocked.Increment(ref _stopped); + + lock (_finishedActivities) + { + LastFinishedActivity = activity; + _finishedActivities.Add(activity); + } + } + } + }; + + ActivitySource.AddActivityListener(_listener); + } + + public void Dispose() => _listener.Dispose(); + + public void VerifyActivityRecorded(int times) + { + Assert.Equal(times, Started); + Assert.Equal(times, Stopped); + } + + public Activity VerifyActivityRecordedOnce() + { + VerifyActivityRecorded(1); + return LastFinishedActivity; + } + } + + public static class ActivityAssert + { + public static KeyValuePair HasTag(this Activity activity, string name) + { + KeyValuePair tag = activity.TagObjects.SingleOrDefault(t => t.Key == name); + if (tag.Key is null) + { + Assert.Fail($"The Activity tags should contain {name}."); + } + + return tag; + } + + public static void HasTag(this Activity activity, string name, T expectedValue) + { + KeyValuePair tag = HasTag(activity, name); + Assert.Equal(expectedValue, (T)tag.Value); + } + + public static void HasRecordedException(this Activity activity, Exception exception) + { + var exceptionEvent = activity.Events.First(); + Assert.Equal("exception", activity.Events.First().Name); + Assert.Equal(exception.GetType().ToString(), + exceptionEvent.Tags.SingleOrDefault(t => t.Key == "exception.type").Value); + } + + public static void IsInError(this Activity activity) + { + Assert.Equal(ActivityStatusCode.Error, activity.Status); + } + + public static void HasNoTag(this Activity activity, string name) + { + bool contains = activity.TagObjects.Any(t => t.Key == name); + Assert.False(contains, $"The Activity tags should not contain {name}."); + } + + public static void FinishedInOrder(this Activity first, Activity second) + { + Assert.True(first.StartTimeUtc + first.Duration < second.StartTimeUtc + second.Duration, + $"{first.OperationName} should stop before {second.OperationName}"); + } + + public static string CamelToSnake(string camel) + { + if (string.IsNullOrEmpty(camel)) return camel; + StringBuilder bld = new(); + bld.Append(char.ToLower(camel[0])); + for (int i = 1; i < camel.Length; i++) + { + char c = camel[i]; + if (char.IsUpper(c)) + { + bld.Append('_'); + } + + bld.Append(char.ToLower(c)); + } + + return bld.ToString(); + } + } +} diff --git a/projects/Test/Integration/TestConnectionFactory.cs b/projects/Test/Integration/TestConnectionFactory.cs index 3a17efcd9..34fed3762 100644 --- a/projects/Test/Integration/TestConnectionFactory.cs +++ b/projects/Test/Integration/TestConnectionFactory.cs @@ -31,6 +31,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; @@ -434,5 +435,34 @@ public async Task TestCreateConnectionAsync_TruncatesWhenClientNameIsLong_GH980( Assert.Contains(conn.ClientProvidedName, cpn); } } + + [Fact] + public async Task TestCreateConnectionRegisterAnActivity() + { + using ActivityRecorder recorder = + new ActivityRecorder(RabbitMQActivitySource.ConnectionSourceName, "connection attempt"); + ConnectionFactory cf = CreateConnectionFactory(); + await using IConnection conn = await cf.CreateConnectionAsync(); + recorder.VerifyActivityRecordedOnce(); + await conn.CloseAsync(); + } + + [Fact] + public async Task TestCreateConnectionWithFailureRecordException() + { + using ActivityRecorder recorder = + new ActivityRecorder(RabbitMQActivitySource.ConnectionSourceName, "connection attempt"); + ConnectionFactory cf = CreateConnectionFactory(); + cf.AutomaticRecoveryEnabled = true; + var unreachablePort = 1234; + var ep = new AmqpTcpEndpoint("localhost", unreachablePort); + var exception = await Assert.ThrowsAsync(() => + { + return cf.CreateConnectionAsync(new List { ep }); + }); + Activity activity = recorder.VerifyActivityRecordedOnce(); + activity.HasRecordedException(exception); + activity.IsInError(); + } } }