@@ -13,8 +13,12 @@ const FORK_WHEN_MORE_THAN = 100000,
13
13
FORK_MAX = 5 ,
14
14
SEND_AHEAD = 5 * 60000 ,
15
15
BATCH = 50000 ;
16
-
16
+ /** proces jo class */
17
17
class ProcessJob extends J . IPCJob {
18
+ /** class constructr
19
+ * @param {string } name - name
20
+ * @param {object } data - data
21
+ */
18
22
constructor ( name , data ) {
19
23
super ( name , data ) ;
20
24
if ( this . isFork ) {
@@ -23,26 +27,46 @@ class ProcessJob extends J.IPCJob {
23
27
log . d ( 'initializing ProcessJob with %j & %j' , name , data ) ;
24
28
}
25
29
30
+ /** gets cid
31
+ * @returns {string } cid
32
+ */
26
33
get cid ( ) {
27
34
return this . data . cid ;
28
35
}
29
36
37
+ /** gets aid
38
+ * @returns {string } aid
39
+ */
30
40
get aid ( ) {
31
41
return this . data . aid ;
32
42
}
33
43
44
+ /** gets data.field
45
+ * @returns {object } data.field
46
+ */
34
47
get field ( ) {
35
48
return this . data . field ;
36
49
}
37
50
51
+ /** gets platform
52
+ * @returns {string } data.field.substr(0, 1)
53
+ */
38
54
get platform ( ) {
39
55
return this . data . field . substr ( 0 , 1 ) ;
40
56
}
41
57
58
+ /** checks if is fork
59
+ * @returns {boolean } true - if fork
60
+ */
42
61
get isFork ( ) {
43
62
return ! ! this . data . fork ;
44
63
}
45
64
65
+ /** prepares job
66
+ * @param {object } manager - manager
67
+ * @param {object } db - db connection
68
+ * @returns {Promise } - resolved or rejected
69
+ */
46
70
prepare ( manager , db ) {
47
71
log . d ( 'Loading credentials for %j' , this . data ) ;
48
72
this . creds = new C . Credentials ( this . data . cid ) ;
@@ -77,22 +101,41 @@ class ProcessJob extends J.IPCJob {
77
101
} ) ;
78
102
}
79
103
104
+ /** gets resource name
105
+ * @returns {string } name
106
+ */
80
107
resourceName ( ) {
81
108
return 'process:' + this . cid + ':' + this . field ;
82
109
}
83
110
111
+ /** creates resource
112
+ * @param {string } _id - id
113
+ * @param {string } name - name
114
+ * @param {object } db - db connection
115
+ * @returns {object } Resource
116
+ */
84
117
createResource ( _id , name , db ) {
85
118
return new Resource ( _id , name , { cid : this . cid , field : this . field } , db ) ;
86
119
}
87
120
121
+ /** gets new retry policy
122
+ * @returns {object } retry policy
123
+ */
88
124
retryPolicy ( ) {
89
125
return new R . IPCRetryPolicy ( 3 ) ;
90
126
}
91
127
128
+ /** rescedule
129
+ * @param {number } date - timestamp
130
+ * @returns {Promise } - resolved if updated
131
+ */
92
132
reschedule ( date ) {
93
133
return this . replaceAfter ( date ) ;
94
134
}
95
135
136
+ /** fork
137
+ * @returns {Promise } promise
138
+ */
96
139
fork ( ) {
97
140
if ( ! this . maxFork ) {
98
141
this . maxFork = 0 ;
@@ -102,10 +145,18 @@ class ProcessJob extends J.IPCJob {
102
145
return ProcessJob . insert ( this . db ( ) , { name : this . name , status : 0 , data : data , next : Date . now ( ) } ) ;
103
146
}
104
147
148
+ /** gets current timestamp
149
+ * @returns {number } timestamp
150
+ */
105
151
now ( ) {
106
152
return Date . now ( ) ;
107
153
}
108
154
155
+ /**
156
+ * @param {object } notes - notes
157
+ * @param {object } msgs - messages
158
+ * @returns {object } m
159
+ */
109
160
compile ( notes , msgs ) {
110
161
// let pm, pn, pp, po;
111
162
@@ -126,6 +177,9 @@ class ProcessJob extends J.IPCJob {
126
177
} ) . filter ( m => ! ! m . m ) ;
127
178
}
128
179
180
+ /** finish
181
+ * @param {object } err - error message or object
182
+ */
129
183
async _finish ( err ) {
130
184
if ( err ) {
131
185
let counts = await this . loader . counts ( Date . now ( ) + SEND_AHEAD , this . _id ) ;
@@ -148,18 +202,23 @@ class ProcessJob extends J.IPCJob {
148
202
return await super . _finish ( err ) ;
149
203
}
150
204
205
+ /** run
206
+ * @param {object } db - db connection
207
+ * @param {function } done - callback function
208
+ */
151
209
async run ( db , done ) {
152
210
let resourceError , affected ;
153
211
try {
154
212
let count = await this . loader . count ( this . now ( ) + SEND_AHEAD ) , recheck = [ ] , sending = new Set ( ) ;
155
213
156
- if ( count == 0 ) {
214
+ if ( count === 0 ) {
157
215
return done ( ) ;
158
216
}
159
217
else if ( this . isFork && count < FORK_WHEN_MORE_THAN ) {
160
218
return done ( ) ;
161
219
}
162
220
221
+
163
222
do {
164
223
let date = this . now ( ) + SEND_AHEAD ,
165
224
discarded = await this . loader . discard ( date - 3600000 ) ;
@@ -181,9 +240,9 @@ class ProcessJob extends J.IPCJob {
181
240
// check for aborted or deleted messages, delete notes if needed
182
241
if ( ! this . isFork ) {
183
242
await Promise . all ( Object . values ( notes ) . filter ( n => ( n . result . status & ( N . Status . Aborted | N . Status . Deleted ) ) > 0 ) . map ( note => {
184
- let count = counts [ note . _id . toString ( ) ] ;
185
- log . w ( 'Note %s has been aborted, clearing %d notifications' , note . _id , count ) ;
186
- return this . loader . abortNote ( note . _id , count , this . now ( ) , this . field , ( note . result . status & N . Status . Aborted ) ? 'Aborted' : 'Deleted' ) ;
243
+ let count1 = counts [ note . _id . toString ( ) ] ;
244
+ log . w ( 'Note %s has been aborted, clearing %d notifications' , note . _id , count1 ) ;
245
+ return this . loader . abortNote ( note . _id , count1 , this . now ( ) , this . field , ( note . result . status & N . Status . Aborted ) ? 'Aborted' : 'Deleted' ) ;
187
246
} ) ) ;
188
247
}
189
248
@@ -473,13 +532,18 @@ class ProcessJob extends J.IPCJob {
473
532
try {
474
533
await this . loader . reload ( this . _id ) ;
475
534
}
476
- catch ( e ) {
477
- log . e ( 'Error when reloading for %s: %j' , this . _id , e ) ;
535
+ catch ( err ) {
536
+ log . e ( 'Error when reloading for %s: %j' , this . _id , err ) ;
478
537
}
479
538
done ( e ) ;
480
539
}
481
540
}
482
541
542
+ /** result handling
543
+ * @param {object } resourceError - error object or string
544
+ * @param {array } affected - list of affected
545
+ * @returns {boolean } true or false
546
+ */
483
547
async handleResults ( resourceError , affected ) {
484
548
// in case main job ends with resource error, record it in all messages affected
485
549
// when too much same errors gathered in messages within retry period of 30 minutes, don't reschedule this process job
0 commit comments