@@ -323,7 +323,7 @@ class Resultor extends DoFinish {
323
323
324
324
if ( this . data . isSending ( m . id ) ) {
325
325
this . log . d ( 'message %s is still in processing (%d out of %d)' , m . id , m . result . processed , m . result . total ) ;
326
- return m . save ( ) ;
326
+ return syncTotalProp ( this . db , m ) . then ( ( ) => m . save ( ) ) ;
327
327
}
328
328
this . log . d ( 'message %s is done processing' , m . id ) ;
329
329
let state , status , error ;
@@ -392,11 +392,11 @@ class Resultor extends DoFinish {
392
392
if ( error ) {
393
393
m . result . error = error ;
394
394
}
395
- return m . save ( ) ;
395
+ return syncTotalProp ( this . db , m ) . then ( ( ) => m . save ( ) ) ;
396
396
}
397
397
else {
398
398
this . log . d ( 'message %s is in processing (%d out of %d)' , m . id , m . result . processed , m . result . total ) ;
399
- return m . save ( ) ;
399
+ return syncTotalProp ( this . db , m ) . then ( ( ) => m . save ( ) ) ;
400
400
}
401
401
} ) . concat ( Object . keys ( this . noMessage ) . map ( mid => {
402
402
this . log . e ( 'Message %s doesn\'t exist, ignoring result %j' , mid , this . noMessage [ mid ] ) ;
@@ -822,4 +822,35 @@ class Resultor extends DoFinish {
822
822
// }
823
823
}
824
824
825
- module . exports = { Resultor } ;
825
+ module . exports = { Resultor } ;
826
+ /**
827
+ * This function is required to be called before making any updates to the "message".
828
+ * It is implemented because of the race condition between the Job and the Runner.
829
+ * Sometimes Job starts creating "push" records (queue items) but the Runner
830
+ * starts consuming before all of the push records created. Then the race condition
831
+ * occurs and the "total" property gets overwritten by the Runner.
832
+ * @param {MongoClient } db mongo client
833
+ * @param {Message } message message model instance
834
+ */
835
+ async function syncTotalProp ( db , message ) {
836
+ const saved = await db . collection ( "messages" ) . findOne ( { _id : message . _id } ) ;
837
+
838
+ // fields to sync:
839
+ // m.result.total
840
+ // m.result.subs.a.total
841
+ // m.result.subs.a.subs.en.total
842
+
843
+ if ( saved ?. result ?. total !== undefined ) {
844
+ message . result . total = saved . result . total ;
845
+ }
846
+ if ( typeof saved ?. result ?. subs === "object" ) {
847
+ for ( let key in saved . result . subs ) {
848
+ message . result . subs [ key ] . total = saved . result . subs [ key ] . total ;
849
+ if ( typeof saved . result . subs [ key ] ?. subs === "object" ) {
850
+ for ( let key2 in saved . result . subs [ key ] . subs ) {
851
+ message . result . subs [ key ] . subs [ key2 ] . total = saved . result . subs [ key ] . subs [ key2 ] . total ;
852
+ }
853
+ }
854
+ }
855
+ }
856
+ }
0 commit comments