@@ -4,7 +4,24 @@ const plugins = require('../../../plugins/pluginManager.js');
4
4
const log = require ( '../../utils/log.js' ) ( "batcher" ) ;
5
5
const common = require ( '../../utils/common.js' ) ;
6
6
7
-
7
+ var batcherStats = {
8
+ key : 'BATCHER_STATS' ,
9
+ pid : process . pid ,
10
+ insert_queued : 0 ,
11
+ insert_processing : 0 ,
12
+ insert_errored_fallback : 0 ,
13
+ insert_errored_no_fallback : 0 ,
14
+ insert_errored_no_fallback_last_error : "" ,
15
+ update_queued : 0 ,
16
+ update_processing : 0 ,
17
+ update_errored_fallback : 0 ,
18
+ update_errored_no_fallback : 0 ,
19
+ update_errored_no_fallback_last_error : "" ,
20
+ } ;
21
+
22
+ setInterval ( function ( ) {
23
+ log . i ( '%j' , batcherStats ) ;
24
+ } , 10000 ) ;
8
25
/**
9
26
* Class for batching database insert operations
10
27
* @example
@@ -56,13 +73,16 @@ class InsertBatcher {
56
73
if ( this . data [ db ] [ collection ] . length ) {
57
74
var docs = this . data [ db ] [ collection ] ;
58
75
this . data [ db ] [ collection ] = [ ] ;
76
+ batcherStats . insert_queued -= docs . length ;
77
+ batcherStats . insert_processing += docs . length ;
59
78
try {
60
79
await new Promise ( ( resolve , reject ) => {
61
80
this . dbs [ db ] . collection ( collection ) . insertMany ( docs , { ordered : false , ignore_errors : [ 11000 ] } , function ( err , res ) {
62
81
if ( err ) {
63
82
reject ( err ) ;
64
83
return ;
65
84
}
85
+ batcherStats . insert_processing -= docs . length ;
66
86
resolve ( res ) ;
67
87
} ) ;
68
88
} ) ;
@@ -75,7 +95,10 @@ class InsertBatcher {
75
95
//trying to rollback operations to try again on next iteration
76
96
if ( ex . writeErrors && ex . writeErrors . length ) {
77
97
for ( let i = 0 ; i < ex . writeErrors . length ; i ++ ) {
98
+ batcherStats . insert_processing -- ;
78
99
if ( no_fallback_errors . indexOf ( ex . writeErrors [ i ] . code ) !== - 1 ) {
100
+ batcherStats . insert_errored_no_fallback ++ ;
101
+ batcherStats . insert_errored_no_fallback_last_error = ex . writeErrors [ i ] . errmsg ;
79
102
//dispatch failure
80
103
if ( notify_fallback_errors . indexOf ( ex . writeErrors [ i ] . code ) !== - 1 ) {
81
104
var index0 = ex . writeErrors [ i ] . index ;
@@ -85,6 +108,7 @@ class InsertBatcher {
85
108
//we could record in diagnostic data
86
109
continue ;
87
110
}
111
+ batcherStats . insert_errored_fallback ++ ;
88
112
let index = ex . writeErrors [ i ] . index ;
89
113
if ( docs [ index ] ) {
90
114
this . data [ db ] [ collection ] . push ( docs [ index ] ) ;
@@ -136,8 +160,10 @@ class InsertBatcher {
136
160
for ( let i = 0 ; i < doc . length ; i ++ ) {
137
161
this . data [ db ] [ collection ] . push ( doc [ i ] ) ;
138
162
}
163
+ batcherStats . insert_queued += doc . length ;
139
164
}
140
165
else {
166
+ batcherStats . insert_queued ++ ;
141
167
this . data [ db ] [ collection ] . push ( doc ) ;
142
168
}
143
169
if ( ! this . process ) {
@@ -212,13 +238,16 @@ class WriteBatcher {
212
238
}
213
239
}
214
240
this . data [ db ] [ collection ] = { } ;
241
+ batcherStats . update_queued -= queries . length ;
242
+ batcherStats . update_processing += queries . length ;
215
243
try {
216
244
await new Promise ( ( resolve , reject ) => {
217
245
this . dbs [ db ] . collection ( collection ) . bulkWrite ( queries , { ordered : false , ignore_errors : [ 11000 ] } , function ( err , res ) {
218
246
if ( err ) {
219
247
reject ( err ) ;
220
248
return ;
221
249
}
250
+ batcherStats . update_processing -= queries . length ;
222
251
resolve ( res ) ;
223
252
} ) ;
224
253
} ) ;
@@ -231,8 +260,10 @@ class WriteBatcher {
231
260
//trying to rollback operations to try again on next iteration
232
261
if ( ex . writeErrors && ex . writeErrors . length ) {
233
262
for ( let i = 0 ; i < ex . writeErrors . length ; i ++ ) {
234
-
263
+ batcherStats . update_processing -- ;
235
264
if ( no_fallback_errors . indexOf ( ex . writeErrors [ i ] . code ) !== - 1 ) {
265
+ batcherStats . update_errored_no_fallback ++ ;
266
+ batcherStats . update_errored_no_fallback_last_error = ex . writeErrors [ i ] . errmsg ;
236
267
//dispatch failure
237
268
if ( notify_errors . indexOf ( ex . writeErrors [ i ] . code ) !== - 1 ) {
238
269
var index0 = ex . writeErrors [ i ] . index ;
@@ -242,6 +273,7 @@ class WriteBatcher {
242
273
//we could record in diagnostic data
243
274
continue ;
244
275
}
276
+ batcherStats . update_errored_fallback ++ ;
245
277
let index = ex . writeErrors [ i ] . index ;
246
278
if ( queries [ index ] ) {
247
279
//if we don't have anything for this document yet just use query
@@ -316,6 +348,7 @@ class WriteBatcher {
316
348
}
317
349
if ( ! this . data [ db ] [ collection ] [ id ] ) {
318
350
this . data [ db ] [ collection ] [ id ] = { id : id , value : operation } ;
351
+ batcherStats . update_queued ++ ;
319
352
}
320
353
else {
321
354
this . data [ db ] [ collection ] [ id ] . value = common . mergeQuery ( this . data [ db ] [ collection ] [ id ] . value , operation ) ;
@@ -653,4 +686,4 @@ function getId() {
653
686
return crypto . randomBytes ( 16 ) . toString ( "hex" ) ;
654
687
}
655
688
656
- module . exports = { WriteBatcher, ReadBatcher, InsertBatcher} ;
689
+ module . exports = { WriteBatcher, ReadBatcher, InsertBatcher} ;
0 commit comments