Skip to content

Commit 780597d

Browse files
author
John Simons
committed
Merge branch 'hotfix-4.0.6' into support-4.0
2 parents a437bd9 + 151cdb7 commit 780597d

File tree

4 files changed

+352
-45
lines changed

4 files changed

+352
-45
lines changed

src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@
192192
<Compile Include="Serializers\XML\SerializingEnumerableTests.cs" />
193193
<Compile Include="Serializers\XML\SomeEnum.cs" />
194194
<Compile Include="Timeout\FakeMessageSender.cs" />
195+
<Compile Include="Timeout\RavenTimeoutPersisterTests.cs" />
195196
<Compile Include="Timeout\When_fetching_timeouts_from_storage.cs" />
196197
<Compile Include="Timeout\When_pooling_timeouts.cs" />
197198
<Compile Include="Timeout\When_receiving_timeouts.cs" />
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
namespace NServiceBus.Core.Tests.Timeout
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Linq;
6+
using System.Threading;
7+
using NServiceBus.Persistence.Raven;
8+
using NServiceBus.Persistence.Raven.TimeoutPersister;
9+
using NServiceBus.Timeout.Core;
10+
using NUnit.Framework;
11+
using Raven.Client;
12+
using Raven.Client.Document;
13+
14+
[TestFixture]
15+
public class RavenTimeoutPersisterTests
16+
{
17+
[TestCase, Repeat(200)]
18+
public void Should_not_skip_timeouts()
19+
{
20+
var db = Guid.NewGuid().ToString();
21+
documentStore = new DocumentStore
22+
{
23+
Url = "http://localhost:8080",
24+
DefaultDatabase = db,
25+
}.Initialize();
26+
persister = new RavenTimeoutPersistence(new StoreAccessor(documentStore))
27+
{
28+
TriggerCleanupEvery = TimeSpan.FromHours(1), // Make sure cleanup doesn't run automatically
29+
};
30+
31+
var startSlice = DateTime.UtcNow.AddYears(-10);
32+
// avoid cleanup from running during the test by making it register as being run
33+
Assert.AreEqual(0, persister.GetCleanupChunk(startSlice).Count());
34+
35+
var expected = new List<Tuple<string, DateTime>>();
36+
var lastExpectedTimeout = DateTime.UtcNow;
37+
var finishedAdding = false;
38+
39+
new Thread(() =>
40+
{
41+
var sagaId = Guid.NewGuid();
42+
for (var i = 0; i < 10000; i++)
43+
{
44+
var td = new TimeoutData
45+
{
46+
SagaId = sagaId,
47+
Destination = new Address("queue", "machine"),
48+
Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(5, 20)),
49+
OwningTimeoutManager = string.Empty,
50+
};
51+
persister.Add(td);
52+
expected.Add(new Tuple<string, DateTime>(td.Id, td.Time));
53+
lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout;
54+
}
55+
finishedAdding = true;
56+
Console.WriteLine("*** Finished adding ***");
57+
}).Start();
58+
59+
// Mimic the behavior of the TimeoutPersister coordinator
60+
var found = 0;
61+
TimeoutData tempTd;
62+
while (!finishedAdding || startSlice < lastExpectedTimeout)
63+
{
64+
DateTime nextRetrieval;
65+
var timeoutDatas = persister.GetNextChunk(startSlice, out nextRetrieval);
66+
foreach (var timeoutData in timeoutDatas)
67+
{
68+
if (startSlice < timeoutData.Item2)
69+
{
70+
startSlice = timeoutData.Item2;
71+
}
72+
73+
Assert.IsTrue(persister.TryRemove(timeoutData.Item1, out tempTd));
74+
found++;
75+
}
76+
}
77+
78+
WaitForIndexing(documentStore);
79+
80+
// If the persister reports stale results have been seen at one point during its normal operation,
81+
// we need to perform manual cleaup.
82+
while (true)
83+
{
84+
var chunkToCleanup = persister.GetCleanupChunk(DateTime.UtcNow.AddDays(1)).ToArray();
85+
Console.WriteLine("Cleanup: got a chunk of size " + chunkToCleanup.Length);
86+
if (chunkToCleanup.Length == 0) break;
87+
88+
found += chunkToCleanup.Length;
89+
foreach (var tuple in chunkToCleanup)
90+
{
91+
Assert.IsTrue(persister.TryRemove(tuple.Item1, out tempTd));
92+
}
93+
94+
WaitForIndexing(documentStore);
95+
}
96+
97+
using (var session = documentStore.OpenSession())
98+
{
99+
var results = session.Query<TimeoutData>().ToList();
100+
Assert.AreEqual(0, results.Count);
101+
}
102+
103+
Assert.AreEqual(expected.Count, found);
104+
}
105+
106+
[TestCase, Repeat(200)]
107+
public void Should_not_skip_timeouts_also_with_multiple_clients_adding_timeouts()
108+
{
109+
var db = Guid.NewGuid().ToString();
110+
documentStore = new DocumentStore
111+
{
112+
Url = "http://localhost:8080",
113+
DefaultDatabase = db,
114+
}.Initialize();
115+
persister = new RavenTimeoutPersistence(new StoreAccessor(documentStore))
116+
{
117+
TriggerCleanupEvery = TimeSpan.FromDays(1), // Make sure cleanup doesn't run automatically
118+
};
119+
120+
var startSlice = DateTime.UtcNow.AddYears(-10);
121+
// avoid cleanup from running during the test by making it register as being run
122+
Assert.AreEqual(0, persister.GetCleanupChunk(startSlice).Count());
123+
124+
const int insertsPerThread = 10000;
125+
var expected1 = new List<Tuple<string, DateTime>>();
126+
var expected2 = new List<Tuple<string, DateTime>>();
127+
var lastExpectedTimeout = DateTime.UtcNow;
128+
var finishedAdding1 = false;
129+
var finishedAdding2 = false;
130+
131+
new Thread(() =>
132+
{
133+
var sagaId = Guid.NewGuid();
134+
for (var i = 0; i < insertsPerThread; i++)
135+
{
136+
var td = new TimeoutData
137+
{
138+
SagaId = sagaId,
139+
Destination = new Address("queue", "machine"),
140+
Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(1, 20)),
141+
OwningTimeoutManager = string.Empty,
142+
};
143+
persister.Add(td);
144+
expected1.Add(new Tuple<string, DateTime>(td.Id, td.Time));
145+
lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout;
146+
}
147+
finishedAdding1 = true;
148+
Console.WriteLine("*** Finished adding ***");
149+
}).Start();
150+
151+
new Thread(() =>
152+
{
153+
using (var store = new DocumentStore
154+
{
155+
Url = "http://localhost:8080",
156+
DefaultDatabase = db,
157+
}.Initialize())
158+
{
159+
var persister2 = new RavenTimeoutPersistence(new StoreAccessor(store));
160+
161+
var sagaId = Guid.NewGuid();
162+
for (var i = 0; i < insertsPerThread; i++)
163+
{
164+
var td = new TimeoutData
165+
{
166+
SagaId = sagaId,
167+
Destination = new Address("queue", "machine"),
168+
Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(1, 20)),
169+
OwningTimeoutManager = string.Empty,
170+
};
171+
persister2.Add(td);
172+
expected2.Add(new Tuple<string, DateTime>(td.Id, td.Time));
173+
lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout;
174+
}
175+
}
176+
finishedAdding2 = true;
177+
Console.WriteLine("*** Finished adding via a second client connection ***");
178+
}).Start();
179+
180+
// Mimic the behavior of the TimeoutPersister coordinator
181+
var found = 0;
182+
TimeoutData tempTd;
183+
while (!finishedAdding1 || !finishedAdding2 || startSlice < lastExpectedTimeout)
184+
{
185+
DateTime nextRetrieval;
186+
var timeoutDatas = persister.GetNextChunk(startSlice, out nextRetrieval);
187+
foreach (var timeoutData in timeoutDatas)
188+
{
189+
if (startSlice < timeoutData.Item2)
190+
{
191+
startSlice = timeoutData.Item2;
192+
}
193+
194+
Assert.IsTrue(persister.TryRemove(timeoutData.Item1, out tempTd)); // Raven returns duplicates, so we can't assert on this here
195+
found++;
196+
}
197+
}
198+
199+
WaitForIndexing(documentStore);
200+
201+
// If the persister reports stale results have been seen at one point during its normal operation,
202+
// we need to perform manual cleaup.
203+
while (true)
204+
{
205+
var chunkToCleanup = persister.GetCleanupChunk(DateTime.UtcNow.AddDays(1)).ToArray();
206+
Console.WriteLine("Cleanup: got a chunk of size " + chunkToCleanup.Length);
207+
if (chunkToCleanup.Length == 0) break;
208+
209+
found += chunkToCleanup.Length;
210+
foreach (var tuple in chunkToCleanup)
211+
{
212+
Assert.IsTrue(persister.TryRemove(tuple.Item1, out tempTd));
213+
}
214+
215+
WaitForIndexing(documentStore);
216+
}
217+
218+
using (var session = documentStore.OpenSession())
219+
{
220+
var results = session.Query<TimeoutData>().ToList();
221+
Assert.AreEqual(0, results.Count);
222+
}
223+
224+
Assert.AreEqual(expected1.Count + expected2.Count, found);
225+
}
226+
227+
IDocumentStore documentStore;
228+
RavenTimeoutPersistence persister;
229+
230+
[TearDown]
231+
public void TearDown()
232+
{
233+
if (documentStore != null)
234+
documentStore.Dispose();
235+
}
236+
237+
static void WaitForIndexing(IDocumentStore store, string db = null, TimeSpan? timeout = null)
238+
{
239+
var databaseCommands = store.DatabaseCommands;
240+
if (db != null)
241+
databaseCommands = databaseCommands.ForDatabase(db);
242+
var spinUntil = SpinWait.SpinUntil(() => databaseCommands.GetStatistics().StaleIndexes.Length == 0, timeout ?? TimeSpan.FromSeconds(20));
243+
Assert.True(spinUntil);
244+
}
245+
246+
static class RandomProvider
247+
{
248+
private static int seed = Environment.TickCount;
249+
250+
private static ThreadLocal<Random> randomWrapper = new ThreadLocal<Random>(() =>
251+
new Random(Interlocked.Increment(ref seed))
252+
);
253+
254+
public static Random GetThreadRandom()
255+
{
256+
return randomWrapper.Value;
257+
}
258+
}
259+
}
260+
}

src/NServiceBus.Core.Tests/Timeout/When_pooling_timeouts.cs

+20-8
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace NServiceBus.Core.Tests.Timeout
1515
using Raven.Client.Document;
1616
using Raven.Client.Embedded;
1717

18-
[TestFixture, Category("Integration")]
18+
[TestFixture]
1919
public class When_pooling_timeouts_with_raven : When_pooling_timeouts
2020
{
2121
private IDocumentStore store;
@@ -36,10 +36,22 @@ public void Cleanup()
3636
{
3737
store.Dispose();
3838
}
39+
40+
[Test]
41+
public void Should_retrieve_all_timeout_messages_that_expired_even_if_it_needs_to_page()
42+
{
43+
expected = 1024 + 5;
44+
45+
Enumerable.Range(1, expected).ToList().ForEach(i => persister.Add(CreateData(DateTime.UtcNow.AddSeconds(-5))));
46+
47+
StartAndStopReceiver(5);
48+
49+
WaitForMessagesThenAssert(5);
50+
}
3951
}
4052

41-
[TestFixture, Category("Integration")]
42-
public class When_pooling_timeouts_with_inmemory : When_pooling_timeouts
53+
[TestFixture]
54+
public class When_pooling_timeouts_with_inMemory : When_pooling_timeouts
4355
{
4456
protected override IPersistTimeouts CreateTimeoutPersister()
4557
{
@@ -52,7 +64,7 @@ public abstract class When_pooling_timeouts
5264
private IManageTimeouts manager;
5365
private FakeMessageSender messageSender;
5466
readonly Random rand = new Random();
55-
private int expected;
67+
protected int expected;
5668

5769
protected IPersistTimeouts persister;
5870
protected TimeoutPersisterReceiver receiver;
@@ -185,14 +197,14 @@ private void Push(int total, DateTime time)
185197
Enumerable.Range(1, total).ToList().ForEach(i => manager.PushTimeout(CreateData(time)));
186198
}
187199

188-
private void StartAndStopReceiver(int secondsToWaitBeforeCallingStop = 1)
200+
protected void StartAndStopReceiver(int secondsToWaitBeforeCallingStop = 1)
189201
{
190202
receiver.Start();
191203
Thread.Sleep(TimeSpan.FromSeconds(secondsToWaitBeforeCallingStop));
192204
receiver.Stop();
193205
}
194206

195-
private static TimeoutData CreateData(DateTime time)
207+
protected static TimeoutData CreateData(DateTime time)
196208
{
197209
return new TimeoutData
198210
{
@@ -202,7 +214,7 @@ private static TimeoutData CreateData(DateTime time)
202214
};
203215
}
204216

205-
private void WaitForMessagesThenAssert(int maxSecondsToWait)
217+
protected void WaitForMessagesThenAssert(int maxSecondsToWait)
206218
{
207219
var maxTime = DateTime.Now.AddSeconds(maxSecondsToWait);
208220

@@ -214,4 +226,4 @@ private void WaitForMessagesThenAssert(int maxSecondsToWait)
214226
Assert.AreEqual(expected, messageSender.MessagesSent);
215227
}
216228
}
217-
}
229+
}

0 commit comments

Comments
 (0)