Skip to content

Commit fb23272

Browse files
Daniel Marbachandreasohlund
Daniel Marbach
andauthored
Processing a control message causes the outbox to throw a null reference exception (#738) (#752)
* Reproduce the bug * Cleanup * Improve requesting partition key not to be mapped * Fix bug * Move test and align configs * Flip around logic * Move the test file * Add tests and checks for missing table name * Share table name check test and fix validation * Make missing partition key test shared as well * Move to extension method * More succinct check --------- Co-authored-by: danielmarbach <[email protected]> # Conflicts: # src/LogicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs # src/NServiceBus.Persistence.AzureTable/Outbox/LogicalOutboxBehavior.cs # src/NServiceBus.Persistence.AzureTable/Outbox/OutboxPersister.cs # src/PhysicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs Co-authored-by: Andreas Öhlund <[email protected]>
1 parent aee4ddb commit fb23272

11 files changed

+342
-32
lines changed

Diff for: src/LogicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs

+40-19
Original file line numberDiff line numberDiff line change
@@ -28,26 +28,26 @@ public Task Configure(string endpointName, EndpointConfiguration configuration,
2828
recoverabilitySettings.Immediate(c => c.NumberOfRetries(1));
2929
}
3030

31-
configuration.Pipeline.Register(new PartitionKeyProviderBehavior.PartitionKeyProviderBehaviorRegisterStep());
31+
if (!settings.TryGet<DoNotRegisterDefaultPartitionKeyProvider>(out _))
32+
{
33+
configuration.Pipeline.Register(new PartitionKeyProviderBehavior.Registration());
34+
}
35+
if (!settings.TryGet<DoNotRegisterDefaultTableNameProvider>(out _))
36+
{
37+
configuration.Pipeline.Register(new TableInformationProviderBehavior.Registration());
38+
}
3239

33-
return Task.FromResult(0);
40+
return Task.CompletedTask;
3441
}
3542

3643
Task IConfigureEndpointTestExecution.Cleanup()
3744
{
38-
return Task.FromResult(0);
45+
return Task.CompletedTask;
3946
}
4047

41-
class PartitionKeyProviderBehavior : Behavior<IIncomingLogicalMessageContext>
48+
class PartitionKeyProviderBehavior : Behavior<IIncomingLogicalMessageContext>
4249
{
43-
private readonly ScenarioContext scenarioContext;
44-
private readonly ReadOnlySettings settings;
45-
46-
public PartitionKeyProviderBehavior(ScenarioContext scenarioContext, ReadOnlySettings settings)
47-
{
48-
this.settings = settings;
49-
this.scenarioContext = scenarioContext;
50-
}
50+
public PartitionKeyProviderBehavior(ScenarioContext scenarioContext) => this.scenarioContext = scenarioContext;
5151

5252
public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
5353
{
@@ -56,22 +56,43 @@ public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> n
5656
context.Extensions.Set(new TableEntityPartitionKey(scenarioContext.TestRunId.ToString()));
5757
}
5858

59+
return next();
60+
}
61+
62+
readonly ScenarioContext scenarioContext;
63+
64+
public class Registration : RegisterStep
65+
{
66+
public Registration() : base(nameof(PartitionKeyProviderBehavior),
67+
typeof(PartitionKeyProviderBehavior),
68+
"Populates the partition key",
69+
provider => new PartitionKeyProviderBehavior(provider.Build<ScenarioContext>())) =>
70+
InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
71+
}
72+
}
73+
74+
class TableInformationProviderBehavior : Behavior<IIncomingLogicalMessageContext>
75+
{
76+
public TableInformationProviderBehavior(ReadOnlySettings settings) => this.settings = settings;
77+
78+
public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
79+
{
5980
if (!settings.TryGet<TableInformation>(out _) && !context.Extensions.TryGet<TableInformation>(out _))
6081
{
6182
context.Extensions.Set(new TableInformation(SetupFixture.TableName));
6283
}
6384
return next();
6485
}
6586

66-
public class PartitionKeyProviderBehaviorRegisterStep : RegisterStep
87+
readonly ReadOnlySettings settings;
88+
89+
public class Registration : RegisterStep
6790
{
68-
public PartitionKeyProviderBehaviorRegisterStep() : base(nameof(PartitionKeyProviderBehavior),
69-
typeof(PartitionKeyProviderBehavior),
70-
"Populates the partition key",
71-
builder => new PartitionKeyProviderBehavior(builder.Build<ScenarioContext>(), builder.Build<ReadOnlySettings>()))
72-
{
91+
public Registration() : base(nameof(TableInformationProviderBehavior),
92+
typeof(TableInformationProviderBehavior),
93+
"Populates the table information",
94+
provider => new TableInformationProviderBehavior(provider.Build<ReadOnlySettings>())) =>
7395
InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
74-
}
7596
}
7697
}
7798
}

Diff for: src/NServiceBus.Persistence.AzureTable/Outbox/LogicalOutboxBehavior.cs

+2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingL
5757
outboxTransaction.PartitionKey = partitionKey;
5858
outboxTransaction.StorageSession.TableHolder = tableHolder;
5959

60+
setAsDispatchedHolder.ThrowIfTableClientIsNotSet();
61+
6062
var outboxRecord = await tableHolder.Table.ReadOutboxRecord(context.MessageId, outboxTransaction.PartitionKey.Value, context.Extensions)
6163
.ConfigureAwait(false);
6264

Diff for: src/NServiceBus.Persistence.AzureTable/Outbox/OutboxPersister.cs

+15-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Threading.Tasks;
44
using Microsoft.Azure.Cosmos.Table;
55
using Extensibility;
6+
using NServiceBus.Transport;
67
using Outbox;
78

89
class OutboxPersister : IOutboxStorage
@@ -33,10 +34,20 @@ public async Task<OutboxMessage> Get(string messageId, ContextBag context)
3334

3435
if (!context.TryGet<TableEntityPartitionKey>(out var partitionKey))
3536
{
36-
// we return null here to enable outbox work at logical stage
37-
return null;
37+
// because of the transactional session we cannot assume the incoming message is always present
38+
if (!context.TryGet<IncomingMessage>(out var incomingMessage) ||
39+
!incomingMessage.Headers.ContainsKey(Headers.ControlMessageHeader))
40+
{
41+
// we return null here to enable outbox work at logical stage
42+
return null;
43+
}
44+
45+
partitionKey = new TableEntityPartitionKey(messageId);
46+
context.Set(partitionKey);
3847
}
3948

49+
setAsDispatchedHolder.ThrowIfTableClientIsNotSet();
50+
4051
var outboxRecord = await setAsDispatchedHolder.TableHolder.Table
4152
.ReadOutboxRecord(messageId, partitionKey, context)
4253
.ConfigureAwait(false);
@@ -57,6 +68,7 @@ public Task Store(OutboxMessage message, OutboxTransaction transaction, ContextB
5768
}
5869

5970
var setAsDispatchedHolder = context.Get<SetAsDispatchedHolder>();
71+
setAsDispatchedHolder.ThrowIfTableClientIsNotSet();
6072

6173
var outboxRecord = new OutboxRecord
6274
{
@@ -75,6 +87,7 @@ public Task Store(OutboxMessage message, OutboxTransaction transaction, ContextB
7587
public Task SetAsDispatched(string messageId, ContextBag context)
7688
{
7789
var setAsDispatchedHolder = context.Get<SetAsDispatchedHolder>();
90+
setAsDispatchedHolder.ThrowIfTableClientIsNotSet();
7891

7992
var tableHolder = setAsDispatchedHolder.TableHolder;
8093
var record = setAsDispatchedHolder.Record;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
namespace NServiceBus.Persistence.AzureTable
2+
{
3+
using System;
4+
5+
static class SetAsDispatchedHolderExtensions
6+
{
7+
public static void ThrowIfTableClientIsNotSet(this SetAsDispatchedHolder setAsDispatchedHolder)
8+
{
9+
if (setAsDispatchedHolder.TableHolder != null && setAsDispatchedHolder.TableHolder.Table != null)
10+
{
11+
return;
12+
}
13+
14+
throw new Exception($"For the outbox to work a table name must be configured. Either configure a default one using '{nameof(ConfigureAzureStorage.DefaultTable)}' or set one via a behavior calling `context.Extensions.Set(new {nameof(TableInformation)}(\"SomeTableName\"))`");
15+
}
16+
}
17+
}

Diff for: src/PhysicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs

+26-11
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,27 @@ public Task Configure(string endpointName, EndpointConfiguration configuration,
2929
}
3030

3131
// This populates the partition key at the physical stage to test the conventional outbox use-case
32-
configuration.Pipeline.Register(typeof(PartitionKeyProviderBehavior), "Populates the partition key");
32+
if (!settings.TryGet<DoNotRegisterDefaultPartitionKeyProvider>(out _))
33+
{
34+
configuration.Pipeline.Register(typeof(PartitionKeyProviderBehavior), "Populates the partition key");
35+
}
36+
37+
if (!settings.TryGet<DoNotRegisterDefaultTableNameProvider>(out _))
38+
{
39+
configuration.Pipeline.Register(typeof(TableInformationProviderBehavior), "Populates the table information key");
40+
}
3341

34-
return Task.FromResult(0);
42+
return Task.CompletedTask;
3543
}
3644

3745
Task IConfigureEndpointTestExecution.Cleanup()
3846
{
39-
return Task.FromResult(0);
47+
return Task.CompletedTask;
4048
}
4149

4250
class PartitionKeyProviderBehavior : Behavior<ITransportReceiveContext>
4351
{
44-
private readonly ScenarioContext scenarioContext;
45-
private readonly ReadOnlySettings settings;
46-
47-
public PartitionKeyProviderBehavior(ScenarioContext scenarioContext, ReadOnlySettings settings)
48-
{
49-
this.settings = settings;
50-
this.scenarioContext = scenarioContext;
51-
}
52+
public PartitionKeyProviderBehavior(ScenarioContext scenarioContext) => this.scenarioContext = scenarioContext;
5253

5354
public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
5455
{
@@ -57,11 +58,25 @@ public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
5758
context.Extensions.Set(new TableEntityPartitionKey(scenarioContext.TestRunId.ToString()));
5859
}
5960

61+
return next();
62+
}
63+
64+
readonly ScenarioContext scenarioContext;
65+
}
66+
67+
class TableInformationProviderBehavior : Behavior<ITransportReceiveContext>
68+
{
69+
public TableInformationProviderBehavior(ReadOnlySettings settings) => this.settings = settings;
70+
71+
public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
72+
{
6073
if (!settings.TryGet<TableInformation>(out _) && !context.Extensions.TryGet<TableInformation>(out _))
6174
{
6275
context.Extensions.Set(new TableInformation(SetupFixture.TableName));
6376
}
6477
return next();
6578
}
79+
80+
readonly ReadOnlySettings settings;
6681
}
6782
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
namespace NServiceBus.AcceptanceTests
2+
{
3+
using System;
4+
using System.Threading.Tasks;
5+
using AcceptanceTesting;
6+
using EndpointTemplates;
7+
using NServiceBus.AcceptanceTesting.Support;
8+
using NServiceBus.Extensibility;
9+
using NServiceBus.Features;
10+
using NServiceBus.Pipeline;
11+
using NServiceBus.Routing;
12+
using NServiceBus.Transport;
13+
using NServiceBus.Unicast.Transport;
14+
using NUnit.Framework;
15+
16+
[TestFixture]
17+
public class When_using_outbox_control_message : NServiceBusAcceptanceTest
18+
{
19+
[Test]
20+
public async Task Should_work()
21+
{
22+
var runSettings = new RunSettings();
23+
runSettings.DoNotRegisterDefaultPartitionKeyProvider();
24+
25+
var context = await Scenario.Define<Context>()
26+
.WithEndpoint<Endpoint>()
27+
.Done(c => c.ProcessedControlMessage)
28+
.Run(runSettings)
29+
.ConfigureAwait(false);
30+
31+
Assert.True(context.ProcessedControlMessage);
32+
}
33+
34+
public class Context : ScenarioContext
35+
{
36+
public bool ProcessedControlMessage { get; set; }
37+
}
38+
39+
public class Endpoint : EndpointConfigurationBuilder
40+
{
41+
public Endpoint() =>
42+
EndpointSetup<DefaultServer>((config, runDescriptor) =>
43+
{
44+
config.EnableOutbox();
45+
config.ConfigureTransport().Transactions(TransportTransactionMode.ReceiveOnly);
46+
config.EnableFeature<ControlMessageSender>();
47+
config.Pipeline.Register(new ControlMessageBehavior(runDescriptor.ScenarioContext as Context), "Checks that the control message was processed successfully");
48+
});
49+
50+
class ControlMessageSender : Feature
51+
{
52+
protected override void Setup(FeatureConfigurationContext context)
53+
{
54+
context.Container.ConfigureComponent<StartupTask>(DependencyLifecycle.InstancePerCall);
55+
context.RegisterStartupTask(b => b.Build<StartupTask>());
56+
}
57+
58+
class StartupTask : FeatureStartupTask
59+
{
60+
public StartupTask(IDispatchMessages dispatcher) => this.dispatcher = dispatcher;
61+
62+
protected override Task OnStart(IMessageSession session)
63+
{
64+
var controlMessage = ControlMessageFactory.Create(MessageIntentEnum.Subscribe);
65+
var messageOperation = new TransportOperation(controlMessage, new UnicastAddressTag(AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(Endpoint))));
66+
67+
return dispatcher.Dispatch(new TransportOperations(messageOperation), new TransportTransaction(), new ContextBag());
68+
}
69+
70+
protected override Task OnStop(IMessageSession sessio) => Task.CompletedTask;
71+
72+
readonly IDispatchMessages dispatcher;
73+
}
74+
}
75+
76+
class ControlMessageBehavior : Behavior<IIncomingPhysicalMessageContext>
77+
{
78+
public ControlMessageBehavior(Context testContext) => this.testContext = testContext;
79+
80+
public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
81+
{
82+
await next();
83+
84+
testContext.ProcessedControlMessage = true;
85+
}
86+
87+
readonly Context testContext;
88+
}
89+
}
90+
}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace NServiceBus.AcceptanceTests
2+
{
3+
class DoNotRegisterDefaultPartitionKeyProvider
4+
{
5+
public DoNotRegisterDefaultPartitionKeyProvider()
6+
{
7+
}
8+
}
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace NServiceBus.AcceptanceTests
2+
{
3+
class DoNotRegisterDefaultTableNameProvider
4+
{
5+
public DoNotRegisterDefaultTableNameProvider()
6+
{
7+
}
8+
}
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
namespace NServiceBus.AcceptanceTests
2+
{
3+
using NServiceBus.AcceptanceTesting.Support;
4+
5+
public static partial class RunSettingsExtensions
6+
{
7+
public static void DoNotRegisterDefaultPartitionKeyProvider(this RunSettings runSettings) =>
8+
runSettings.Set(new DoNotRegisterDefaultPartitionKeyProvider());
9+
10+
public static void DoNotRegisterDefaultTableNameProvider(this RunSettings runSettings) =>
11+
runSettings.Set(new DoNotRegisterDefaultTableNameProvider());
12+
}
13+
}

0 commit comments

Comments
 (0)