@@ -14,6 +14,10 @@ public class RavenTimeoutPersistence : IPersistTimeouts
14
14
{
15
15
readonly IDocumentStore store ;
16
16
17
+ public TimeSpan CleanupGapFromTimeslice { get ; set ; }
18
+ public TimeSpan TriggerCleanupEvery { get ; set ; }
19
+ DateTime lastCleanupTime = DateTime . MinValue ;
20
+
17
21
public RavenTimeoutPersistence ( IDocumentStore store )
18
22
{
19
23
this . store = store ;
@@ -42,36 +46,72 @@ public RavenTimeoutPersistence(IDocumentStore store)
42
46
Map = docs => from doc in docs
43
47
select new { doc . SagaId }
44
48
} , true ) ;
45
-
49
+
50
+ TriggerCleanupEvery = TimeSpan . FromMinutes ( 2 ) ;
51
+ CleanupGapFromTimeslice = TimeSpan . FromMinutes ( 1 ) ;
52
+ }
53
+
54
+ private static IRavenQueryable < TimeoutData > GetChunkQuery ( IDocumentSession session )
55
+ {
56
+ session . Advanced . AllowNonAuthoritativeInformation = true ;
57
+ return session . Query < TimeoutData > ( "RavenTimeoutPersistence/TimeoutDataSortedByTime" )
58
+ . OrderBy ( t => t . Time )
59
+ . Where ( t =>
60
+ t . OwningTimeoutManager == String . Empty ||
61
+ t . OwningTimeoutManager == Configure . EndpointName ) ;
62
+ }
63
+
64
+ public IEnumerable < Tuple < string , DateTime > > GetCleanupChunk ( DateTime startSlice )
65
+ {
66
+ using ( var session = OpenSession ( ) )
67
+ {
68
+ var chunk = GetChunkQuery ( session )
69
+ . Where ( t => t . Time <= startSlice . Subtract ( CleanupGapFromTimeslice ) )
70
+ . Select ( t => new
71
+ {
72
+ t . Id ,
73
+ t . Time
74
+ } )
75
+ . Take ( 1024 )
76
+ . ToList ( )
77
+ . Select ( arg => new Tuple < string , DateTime > ( arg . Id , arg . Time ) ) ;
78
+
79
+ lastCleanupTime = DateTime . UtcNow ;
80
+
81
+ return chunk ;
82
+ }
46
83
}
47
84
48
85
public List < Tuple < string , DateTime > > GetNextChunk ( DateTime startSlice , out DateTime nextTimeToRunQuery )
49
86
{
50
87
try
51
88
{
52
89
var now = DateTime . UtcNow ;
53
- var skip = 0 ;
54
90
var results = new List < Tuple < string , DateTime > > ( ) ;
91
+
92
+ // Allow for occasionally cleaning up old timeouts for edge cases where timeouts have been
93
+ // added after startSlice have been set to a later timout and we might have missed them
94
+ // because of stale indexes.
95
+ if ( lastCleanupTime . Add ( TriggerCleanupEvery ) > now || lastCleanupTime == DateTime . MinValue )
96
+ {
97
+ results . AddRange ( GetCleanupChunk ( startSlice ) ) ;
98
+ }
99
+
100
+ var skip = 0 ;
55
101
var numberOfRequestsExecutedSoFar = 0 ;
56
102
RavenQueryStatistics stats ;
57
-
58
103
do
59
104
{
60
105
using ( var session = OpenSession ( ) )
61
106
{
62
107
session . Advanced . AllowNonAuthoritativeInformation = true ;
63
108
64
- var query = session . Query < TimeoutData > ( "RavenTimeoutPersistence/TimeoutDataSortedByTime" )
65
- . Where (
66
- t =>
67
- t . OwningTimeoutManager == String . Empty ||
68
- t . OwningTimeoutManager == Configure . EndpointName )
69
- . Where (
70
- t =>
71
- t . Time > startSlice &&
72
- t . Time <= now )
73
- . OrderBy ( t => t . Time )
74
- . Select ( t => new { t . Id , t . Time } )
109
+ var query = GetChunkQuery ( session )
110
+ . Where (
111
+ t =>
112
+ t . Time > startSlice &&
113
+ t . Time <= now )
114
+ . Select ( t => new { t . Id , t . Time } )
75
115
. Statistics ( out stats ) ;
76
116
do
77
117
{
@@ -87,33 +127,29 @@ public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateT
87
127
}
88
128
} while ( skip < stats . TotalResults ) ;
89
129
90
- using ( var session = OpenSession ( ) )
130
+ // Set next execution to be now if we received stale results.
131
+ // Delay the next execution a bit if we results weren't stale and we got the full chunk.
132
+ if ( stats . IsStale )
133
+ {
134
+ nextTimeToRunQuery = now ;
135
+ }
136
+ else
91
137
{
92
- session . Advanced . AllowNonAuthoritativeInformation = true ;
93
-
94
- //Retrieve next time we need to run query
95
- var startOfNextChunk =
96
- session . Query < TimeoutData > ( "RavenTimeoutPersistence/TimeoutDataSortedByTime" )
97
- . Where (
98
- t =>
99
- t . OwningTimeoutManager == String . Empty ||
100
- t . OwningTimeoutManager == Configure . EndpointName )
138
+ using ( var session = OpenSession ( ) )
139
+ {
140
+ var beginningOfNextChunk = GetChunkQuery ( session )
101
141
. Where ( t => t . Time > now )
102
- . OrderBy ( t => t . Time )
103
- . Select ( t => new { t . Id , t . Time } )
142
+ . Take ( 1 )
143
+ . Select ( t => t . Time )
104
144
. FirstOrDefault ( ) ;
105
145
106
- if ( startOfNextChunk != null )
107
- {
108
- nextTimeToRunQuery = startOfNextChunk . Time ;
109
- }
110
- else
111
- {
112
- nextTimeToRunQuery = DateTime . UtcNow . AddMinutes ( 10 ) ;
146
+ nextTimeToRunQuery = ( beginningOfNextChunk == default ( DateTime ) )
147
+ ? DateTime . UtcNow . AddMinutes ( 10 )
148
+ : beginningOfNextChunk . ToUniversalTime ( ) ;
113
149
}
114
-
115
- return results ;
116
150
}
151
+
152
+ return results ;
117
153
}
118
154
catch ( Exception )
119
155
{
@@ -151,9 +187,6 @@ public bool TryRemove(string timeoutId, out TimeoutData timeoutData)
151
187
if ( timeoutData == null )
152
188
return false ;
153
189
154
- timeoutData . Time = DateTime . UtcNow . AddYears ( - 1 ) ;
155
- session . SaveChanges ( ) ;
156
-
157
190
session . Delete ( timeoutData ) ;
158
191
session . SaveChanges ( ) ;
159
192
0 commit comments