Skip to content

Commit 97c562f

Browse files
committed
Allow V3.0.1 to be compatible with V2.6 Return messages.
Closes #236
1 parent 812299e commit 97c562f

File tree

8 files changed

+95
-30
lines changed

8 files changed

+95
-30
lines changed

src/core/NServiceBus/Headers.cs

+10
Original file line numberDiff line numberDiff line change
@@ -67,5 +67,15 @@ public static class Headers
6767
/// Header telling the NServiceBus Version (beginning NServiceBus V3.0.1).
6868
/// </summary>
6969
public const string NServiceBusVersion = "NServiceBus.Version";
70+
71+
/// <summary>
72+
/// Used in a header when doing a callback (bus.return)
73+
/// </summary>
74+
public const string ReturnMessageErrorCodeHeader = "NServiceBus.ReturnMessage.ErrorCode";
75+
76+
/// <summary>
77+
/// Header that tells if this transport message is a control message
78+
/// </summary>
79+
public const string ControlMessageHeader = "NServiceBus.ControlMessage";
7080
}
7181
}

src/unicast/NServiceBus.Unicast.BackwardCompatibility/CompletionMessage.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
using System;
2-
// Completion message is used by a V3.X subsciber with a 2.6 publisher.
2+
// Completion message is used by a V3.X subscriber with a 2.6 publisher.
33
// Do no change the namespace namespace
44

55
namespace NServiceBus.Unicast.Transport
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
using System.Globalization;
2+
using Common.Logging;
3+
using NServiceBus.Config;
4+
using NServiceBus.MessageMutator;
5+
using NServiceBus.Unicast.Transport;
6+
7+
namespace NServiceBus.Unicast.BackwardCompatibility
8+
{
9+
/// <summary>
10+
/// If this is a V26 message, extract completion message return error code and place it in the transport headers
11+
/// </summary>
12+
public class IncomingReturnMessageMutator : IMutateIncomingMessages, INeedInitialization
13+
{
14+
/// <summary>
15+
/// Reference to the BUS to get a hold of the current TransportMessage
16+
/// </summary>
17+
public IBus Bus { get; set; }
18+
19+
/// <summary>
20+
/// If this is a completion message from a 2.6 sender, copy the error code.
21+
/// </summary>
22+
/// <param name="message">Message to copy ErrorCode from.</param>
23+
/// <returns>Same message as received.</returns>
24+
public object MutateIncoming(object message)
25+
{
26+
var completionMessage = message as CompletionMessage;
27+
if (completionMessage == null)
28+
return message;
29+
30+
if(!Bus.CurrentMessageContext.Headers.ContainsKey(Headers.ReturnMessageErrorCodeHeader))
31+
Bus.CurrentMessageContext.Headers.Add(Headers.ReturnMessageErrorCodeHeader,
32+
completionMessage.ErrorCode.ToString(CultureInfo.InvariantCulture));
33+
34+
//Change to Transport to be a Control Message so no need to find a handler for that.
35+
if(!Bus.CurrentMessageContext.Headers.ContainsKey(Headers.ControlMessageHeader))
36+
Bus.CurrentMessageContext.Headers.Add(Headers.ControlMessageHeader, true.ToString(CultureInfo.InvariantCulture));
37+
38+
return message;
39+
}
40+
41+
/// <summary>
42+
/// Register the IncomingReturnMessageMutator
43+
/// </summary>
44+
public void Init()
45+
{
46+
Configure.Instance.Configurer.ConfigureComponent<IncomingReturnMessageMutator>(DependencyLifecycle.InstancePerCall);
47+
Log.Debug("Configured IncomingReturnMessageMutator");
48+
}
49+
50+
private readonly static ILog Log = LogManager.GetLogger(typeof(IncomingSubscriptionMessageMutator));
51+
}
52+
}

src/unicast/NServiceBus.Unicast.BackwardCompatibility/MutateTransportIncomingSubscriptionMessages.cs src/unicast/NServiceBus.Unicast.BackwardCompatibility/IncomingSubscriptionMessageMutator.cs

+8-6
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55

66
namespace NServiceBus.Unicast.BackwardCompatibility
77
{
8-
class MutateTransportIncomingSubscriptionMessages : IMutateIncomingTransportMessages, INeedInitialization
8+
class IncomingSubscriptionMessageMutator : IMutateIncomingTransportMessages, INeedInitialization
99
{
1010
/// <summary>
11-
/// Re-Adjust V3.0.0 subscribe & UnSubscribe messages. Version 3.0.0 Subs/Unsubs/Publish no NServiceBus.Version set it the headers.
11+
/// Re-Adjust V3.0.0 subscribe and unsubscribe messages.
12+
/// Version 3.0.0 subscribe and unsubscribe message have no NServiceBus.Version set it the headers.
13+
/// Version 3.0.0 Send message have it with "3.0.0" set as value.
1214
/// Do nothing If it is a V2.6 message (contains EnclosedMessageTypes key).
1315
/// </summary>
1416
/// <param name="transportMessage"></param>
@@ -24,14 +26,14 @@ public void MutateIncoming(TransportMessage transportMessage)
2426
}
2527

2628
/// <summary>
27-
/// Register the MutateTransportIncomingSubscriptionMessages mutator
29+
/// Register the IncomingSubscriptionMessageMutator mutator
2830
/// </summary>
2931
public void Init()
3032
{
31-
Configure.Instance.Configurer.ConfigureComponent<MutateTransportIncomingSubscriptionMessages>(DependencyLifecycle.InstancePerCall);
32-
Log.Debug("Configured Transport Incoming Message Mutator: MutateTransportIncomingSubscriptionMessages");
33+
Configure.Instance.Configurer.ConfigureComponent<IncomingSubscriptionMessageMutator>(DependencyLifecycle.InstancePerCall);
34+
Log.Debug("Configured Transport Incoming Message Mutator: IncomingSubscriptionMessageMutator");
3335
}
3436

35-
private readonly static ILog Log = LogManager.GetLogger(typeof(MutateTransportIncomingSubscriptionMessages));
37+
private readonly static ILog Log = LogManager.GetLogger(typeof(IncomingSubscriptionMessageMutator));
3638
}
3739
}

src/unicast/NServiceBus.Unicast.BackwardCompatibility/NServiceBus.Unicast.BackwardCompatibility.csproj

+3-2
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@
5757
</ItemGroup>
5858
<ItemGroup>
5959
<Compile Include="CompletionMessage.cs" />
60-
<Compile Include="MutateTransportIncomingSubscriptionMessages.cs" />
61-
<Compile Include="MutateTransportOutgoingSubscriptionMessages.cs" />
60+
<Compile Include="IncomingReturnMessageMutator.cs" />
61+
<Compile Include="IncomingSubscriptionMessageMutator.cs" />
62+
<Compile Include="OutgoingSubscriptionMessageMutator.cs" />
6263
<Compile Include="Properties\AssemblyInfo.cs" />
6364
</ItemGroup>
6465
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />

src/unicast/NServiceBus.Unicast.BackwardCompatibility/MutateTransportOutgoingSubscriptionMessages.cs src/unicast/NServiceBus.Unicast.BackwardCompatibility/OutgoingSubscriptionMessageMutator.cs

+12-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.IO;
2-
using System.Threading;
32
using Common.Logging;
43
using NServiceBus.Config;
54
using NServiceBus.MessageMutator;
@@ -11,7 +10,7 @@ namespace NServiceBus.Unicast.BackwardCompatibility
1110
/// <summary>
1211
/// Allow for a V3.X subscriber to subscribe/unsubscribe to a V2.6 publisher
1312
/// </summary>
14-
public class MutateTransportOutgoingSubscriptionMessages : IMutateOutgoingTransportMessages, INeedInitialization
13+
public class OutgoingSubscriptionMessageMutator : IMutateOutgoingTransportMessages, INeedInitialization
1514
{
1615
/// <summary>
1716
/// Allow for a V3.X subscriber to subscribe/unsubscribe to a V2.6 publisher
@@ -23,26 +22,32 @@ public class MutateTransportOutgoingSubscriptionMessages : IMutateOutgoingTransp
2322
public void MutateOutgoing(object[] messages, TransportMessage transportMessage)
2423
{
2524
if ((transportMessage.IsControlMessage() &&
26-
((transportMessage.MessageIntent == MessageIntentEnum.Subscribe) || (transportMessage.MessageIntent == MessageIntentEnum.Unsubscribe))))
25+
((transportMessage.MessageIntent == MessageIntentEnum.Subscribe) ||
26+
(transportMessage.MessageIntent == MessageIntentEnum.Unsubscribe) ||
27+
(transportMessage.MessageIntent == MessageIntentEnum.Send))))
2728
{
2829
var stream = new MemoryStream();
29-
MessageSerializer.Serialize(new object[] { new CompletionMessage() }, stream);
30+
var completionMessage = new CompletionMessage();
31+
if (transportMessage.Headers.ContainsKey(Headers.ReturnMessageErrorCodeHeader))
32+
completionMessage.ErrorCode = int.Parse(transportMessage.Headers[Headers.ReturnMessageErrorCodeHeader]);
33+
34+
MessageSerializer.Serialize(new object[] { completionMessage }, stream);
3035
transportMessage.Body = stream.ToArray();
3136
Log.Debug("Added Completion message and sending message intent: " + transportMessage.MessageIntent);
3237
}
3338
}
3439

3540
/// <summary>
36-
/// Register the MutateTransportOutgoingSubscriptionMessages mutator
41+
/// Register the OutgoingSubscriptionMessageMutator mutator
3742
/// </summary>
3843
public void Init()
3944
{
40-
Configure.Instance.Configurer.ConfigureComponent<MutateTransportOutgoingSubscriptionMessages>(DependencyLifecycle.InstancePerCall);
45+
Configure.Instance.Configurer.ConfigureComponent<OutgoingSubscriptionMessageMutator>(DependencyLifecycle.InstancePerCall);
4146
}
4247
/// <summary>
4348
/// Gets or sets the message serializer
4449
/// </summary>
4550
public IMessageSerializer MessageSerializer { get; set; }
46-
private readonly static ILog Log = LogManager.GetLogger(typeof(MutateTransportOutgoingSubscriptionMessages));
51+
private readonly static ILog Log = LogManager.GetLogger(typeof(OutgoingSubscriptionMessageMutator));
4752
}
4853
}

src/unicast/NServiceBus.Unicast/UnicastBus.cs

+5-5
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ public class UnicastBus : IUnicastBus, IStartableBus
3232
/// </summary>
3333
public const string SubscriptionMessageType = "SubscriptionMessageType";
3434

35-
private const string ReturnMessageErrorCodeHeader = "NServiceBus.ReturnMessage.ErrorCode";
36-
3735
#region config properties
3836

3937
private bool autoSubscribe = true;
@@ -480,10 +478,12 @@ void IBus.Return<T>(T errorCode)
480478
{
481479
var returnMessage = ControlMessage.Create();
482480

483-
returnMessage.Headers[ReturnMessageErrorCodeHeader] = errorCode.GetHashCode().ToString();
481+
returnMessage.Headers[Headers.ReturnMessageErrorCodeHeader] = errorCode.GetHashCode().ToString();
484482
returnMessage.CorrelationId = _messageBeingHandled.IdForCorrelation;
485483
returnMessage.MessageIntent = MessageIntentEnum.Send;
486484

485+
InvokeOutgoingTransportMessagesMutators(new object[] { }, returnMessage);
486+
487487
MessageSender.Send(returnMessage, _messageBeingHandled.ReplyToAddress);
488488
}
489489

@@ -1111,8 +1111,8 @@ void HandleCorellatedMessage(TransportMessage msg, object[] messages)
11111111

11121112
var statusCode = int.MinValue;
11131113

1114-
if (msg.IsControlMessage() && msg.Headers.ContainsKey(ReturnMessageErrorCodeHeader))
1115-
statusCode = int.Parse(msg.Headers[ReturnMessageErrorCodeHeader]);
1114+
if (msg.IsControlMessage() && msg.Headers.ContainsKey(Headers.ReturnMessageErrorCodeHeader))
1115+
statusCode = int.Parse(msg.Headers[Headers.ReturnMessageErrorCodeHeader]);
11161116

11171117
busAsyncResult.Complete(statusCode, messages);
11181118
}

src/unicastTransport/NServiceBus.Unicast.Transport/ControlMessage.cs

+4-9
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
using System.Collections.Generic;
44

55
/// <summary>
6-
/// Helper for creating controll messages
6+
/// Helper for creating control messages
77
/// </summary>
88
public static class ControlMessage
99
{
@@ -20,15 +20,10 @@ public static TransportMessage Create()
2020
Recoverable = true,
2121
MessageIntent = MessageIntentEnum.Send
2222
};
23-
transportMessage.Headers.Add(ControlMessageHeader, true.ToString());
23+
transportMessage.Headers.Add(Headers.ControlMessageHeader, true.ToString());
2424

2525
return transportMessage;
2626
}
27-
28-
/// <summary>
29-
/// Header which tells that this transportmessage is a controll message
30-
/// </summary>
31-
public static string ControlMessageHeader = "NServiceBus.ControlMessage";
3227
}
3328

3429
/// <summary>
@@ -37,14 +32,14 @@ public static TransportMessage Create()
3732
public static class TransportMessageExtensions
3833
{
3934
/// <summary>
40-
/// True if the transportmessage is a control message
35+
/// True if the transport message is a control message
4136
/// </summary>
4237
/// <param name="transportMessage"></param>
4338
/// <returns></returns>
4439
public static bool IsControlMessage(this TransportMessage transportMessage)
4540
{
4641
return transportMessage.Headers != null &&
47-
transportMessage.Headers.ContainsKey(ControlMessage.ControlMessageHeader);
42+
transportMessage.Headers.ContainsKey(Headers.ControlMessageHeader);
4843
}
4944

5045
}

0 commit comments

Comments
 (0)