@@ -171,6 +171,77 @@ function prepareIterationList(collections, seconds) {
171
171
return listed ;
172
172
173
173
}
174
+
175
+ function processDrillCollection ( collection , seconds , callback ) {
176
+ var listed = [ ] ;
177
+
178
+ if ( start === 0 ) {
179
+ getMinTs ( function ( err , minTs ) {
180
+ if ( err ) {
181
+ console . log ( "ERROR: Could not fetch min ts for collection " + collection . collection ) ;
182
+ callback ( err ) ;
183
+ }
184
+ else {
185
+ console . log ( "Min ts for collection " + collection . collection + ": " + minTs ) ;
186
+ generateIterationList ( minTs ) ;
187
+ processCollection ( ) ;
188
+ }
189
+ } ) ;
190
+ }
191
+ else {
192
+ generateIterationList ( start ) ;
193
+ processCollection ( ) ;
194
+ }
195
+
196
+ function getMinTs ( cb ) {
197
+ collection . db . collection ( collection . collection ) . findOne ( { } , { sort : { ts : 1 } , projection : { ts : 1 } } , function ( err , doc ) {
198
+ if ( err ) {
199
+ return callback ( err ) ;
200
+ }
201
+ if ( doc && doc . ts ) {
202
+ cb ( null , doc . ts ) ;
203
+ }
204
+ else {
205
+ cb ( null , end ) ;
206
+ }
207
+ } ) ;
208
+ }
209
+
210
+ function generateIterationList ( z ) {
211
+ z = ( start === 0 && z ) ? z : start ;
212
+ if ( timeSpan === 0 && z === 0 ) {
213
+ listed . push ( { "collection" : collection . collection , "db" : collection . db , "start" : 0 , "end" : end , "query" : { "ts" : { "$lt" : end } } } ) ;
214
+ }
215
+ else if ( timeSpan === 0 ) {
216
+ listed . push ( { "collection" : collection . collection , "db" : collection . db , "start" : z , "end" : end , "query" : { "ts" : { "$gte" : z , "$lt" : end } } } ) ;
217
+ }
218
+ else {
219
+ if ( seconds ) {
220
+ z = Math . floor ( z / 1000 ) ;
221
+ for ( ; z <= Math . floor ( end / 1000 ) ; z += timeSpan ) {
222
+ listed . push ( { "collection" : collection . collection , "db" : collection . db , "start" : z , "end" : Math . min ( z + timeSpan , end ) , "seconds" : true } ) ;
223
+ }
224
+ }
225
+ else {
226
+ for ( ; z <= end ; z += timeSpan * 1000 ) {
227
+ listed . push ( { "collection" : collection . collection , "db" : collection . db , "start" : z , "end" : Math . min ( z + timeSpan * 1000 , end ) } ) ;
228
+ }
229
+ }
230
+ }
231
+ }
232
+
233
+ function processCollection ( ) {
234
+ async . eachLimit ( listed , paralelCn , eventIterator , function ( err ) {
235
+ if ( err ) {
236
+ console . log ( "ERROR: Error while processing drill collection: " + collection . collection ) ;
237
+ return callback ( err ) ;
238
+ }
239
+ console . log ( 'Finished processing collection ' + collection . collection ) ;
240
+ callback ( ) ;
241
+ } ) ;
242
+ }
243
+ }
244
+
174
245
function processDrillCollections ( db , drill_db , callback ) {
175
246
if ( process && process . drill_events ) {
176
247
var collections = [ ] ;
@@ -192,10 +263,19 @@ function processDrillCollections(db, drill_db, callback) {
192
263
collections . push ( { 'db' : drill_db , 'collection' : "drill_events" + crypto . createHash ( 'sha1' ) . update ( eventData . list [ i ] + APP_ID ) . digest ( 'hex' ) } ) ;
193
264
}
194
265
}
195
- var iteratorList = prepareIterationList ( collections ) ;
196
- async . eachLimit ( iteratorList , paralelCn , eventIterator , function ( ) {
197
- console . log ( 'Drill collections processed' ) ;
198
- callback ( ) ;
266
+
267
+ async . eachSeries ( collections , function ( collection , done ) {
268
+ processDrillCollection ( collection , false , function ( err ) {
269
+ if ( err ) {
270
+ console . log ( "ERROR: Error while processing drill collection: " + collection . collection ) ;
271
+ }
272
+ done ( err ) ;
273
+ } ) ;
274
+ } , function ( err ) {
275
+ if ( err ) {
276
+ console . log ( "ERROR: Error processing collections." ) ;
277
+ }
278
+ callback ( err ) ;
199
279
} ) ;
200
280
} ) ;
201
281
}
@@ -224,15 +304,22 @@ Promise.all([plugins.dbConnection("countly"), plugins.dbConnection("countly_dril
224
304
}
225
305
}
226
306
}
227
- var iteratorList = prepareIterationList ( processCols , true ) ;
228
- async . eachLimit ( iteratorList , paralelCn , eventIterator , function ( ) {
229
- if ( errorCn > 0 ) {
230
- console . log ( "There were errors. Please recheck logs for those." ) ;
231
- }
232
- console . log ( 'finished' ) ;
307
+ if ( processCols . length === 0 ) {
308
+ console . log ( "Finished" ) ;
233
309
db . close ( ) ;
234
310
db_drill . close ( ) ;
235
- } ) ;
311
+ }
312
+ else {
313
+ var iteratorList = prepareIterationList ( processCols , true ) ;
314
+ async . eachLimit ( iteratorList , paralelCn , eventIterator , function ( ) {
315
+ if ( errorCn > 0 ) {
316
+ console . log ( "There were errors. Please recheck logs for those." ) ;
317
+ }
318
+ console . log ( 'finished' ) ;
319
+ db . close ( ) ;
320
+ db_drill . close ( ) ;
321
+ } ) ;
322
+ }
236
323
} ) ;
237
324
}
238
325
} ) ;
0 commit comments