1
1
const { FRAME , FRAME_NAME } = require ( '../../send/proto' ) ,
2
2
{ DoFinish } = require ( './do_finish' ) ,
3
3
{ ERROR , TriggerKind, State, Status, PushError, Result } = require ( '../../send/data' ) ;
4
+ /**
5
+ * @typedef {import("mongodb").ObjectId } ObjectId
6
+ */
7
+
8
+ /**
9
+ * PushStat object (collection: push_stats)
10
+ * @typedef {Object } PushStat
11
+ * @property {ObjectId } a - application id
12
+ * @property {ObjectId } m - message id from "messages" collection
13
+ * @property {string } u - uid from app_users{appId}
14
+ * @property {string } t - token from "push_{appId}" collection
15
+ * @property {string= } r - id returned from provider
16
+ * @property {Date } d - date this message sent to this user
17
+ * @property {string= } e - error message
18
+ * @property {string } p - platform: "a" for android, "i" for ios and "h" for huawei
19
+ * @property {string } f - token type: "p" for production
20
+ */
4
21
5
22
/**
6
23
* Stream responsible for handling sending results:
@@ -34,7 +51,10 @@ class Resultor extends DoFinish {
34
51
this . fatalErrors = { } ; // {mid: []}
35
52
this . toDelete = [ ] ; // [push id, push id, ...]
36
53
this . count = 0 ; // number of results cached
37
- this . last = null ; // time of last data from
54
+ this . last = null ; // time of last data from
55
+
56
+ /** @type {PushStat[] } */
57
+ this . pushStats = [ ] ;
38
58
39
59
this . data . on ( 'app' , app => {
40
60
this . changed [ app . _id ] = { } ;
@@ -118,11 +138,17 @@ class Resultor extends DoFinish {
118
138
if ( id < 0 ) {
119
139
return ;
120
140
}
121
- let { p, m, pr} = this . data . pushes [ id ] ,
141
+ const p = this . data . pushes [ id ] ;
142
+ let { p : platform , m, pr} = p ,
122
143
msg = this . data . message ( m ) ,
123
144
result ,
124
145
rp , rl ;
125
146
147
+ // additional fields to keep this in push_stats
148
+ if ( msg && msg . saveStats ) {
149
+ this . pushStats . push ( { a : p . a , m : p . m , p : p . p , f : p . f , u : p . u , t : p . t , d : new Date , r : null , e : results . toString ( ) } ) ;
150
+ }
151
+
126
152
if ( msg ) {
127
153
result = msg . result ;
128
154
result . lastRun . processed ++ ;
@@ -131,7 +157,7 @@ class Resultor extends DoFinish {
131
157
else {
132
158
result = this . noMessage [ m ] || ( this . noMessage [ m ] = new Result ( ) ) ;
133
159
}
134
- rp = result . sub ( p , undefined , PLATFORM [ p ] . parent ) ;
160
+ rp = result . sub ( platform , undefined , PLATFORM [ platform ] . parent ) ;
135
161
rl = rp . sub ( pr . la || 'default' ) ;
136
162
137
163
result . processed ++ ;
@@ -141,8 +167,8 @@ class Resultor extends DoFinish {
141
167
rl . recordError ( results . message , 1 ) ;
142
168
rl . processed ++ ;
143
169
144
- if ( PLATFORM [ p ] . parent ) {
145
- rp = result . sub ( PLATFORM [ p ] . parent ) ,
170
+ if ( PLATFORM [ platform ] . parent ) {
171
+ rp = result . sub ( PLATFORM [ platform ] . parent ) ,
146
172
rl = rp . sub ( pr . la || 'default' ) ;
147
173
rp . recordError ( results . message , 1 ) ;
148
174
rp . processed ++ ;
@@ -159,29 +185,39 @@ class Resultor extends DoFinish {
159
185
}
160
186
else {
161
187
results . forEach ( res => {
162
- let id , token ;
163
- if ( typeof res === 'string' ) {
164
- this . log . d ( 'Ok for %s' , id ) ;
165
- id = res ;
166
- }
167
- else {
188
+ let id , resultId , token ;
189
+
190
+ if ( Array . isArray ( res ) ) {
168
191
this . log . d ( 'New token for %s' , id ) ;
169
192
id = res [ 0 ] ;
170
193
token = res [ 1 ] ;
171
194
}
195
+ else {
196
+ id = res ;
197
+ }
198
+
199
+ if ( typeof id !== "string" ) {
200
+ resultId = id . r ;
201
+ id = id . p ;
202
+ }
172
203
173
204
let p = this . data . pushes [ id ] ;
174
205
if ( ! p ) { // 2 or more resultors on one pool
175
206
return ;
176
207
}
177
208
178
- this . data . decSending ( p . m ) ;
179
-
180
- let m = this . data . message ( p . m ) ,
209
+ let msg = this . data . message ( p . m ) ,
181
210
result , rp , rl ;
182
211
183
- if ( m ) {
184
- result = m . result ;
212
+ // additional fields to keep this in push_stats
213
+ if ( msg && msg . saveStats ) {
214
+ this . pushStats . push ( { a : p . a , m : p . m , p : p . p , f : p . f , u : p . u , t : p . t , d : new Date , r : resultId , e : null } ) ;
215
+ }
216
+
217
+ this . data . decSending ( p . m ) ;
218
+
219
+ if ( msg ) {
220
+ result = msg . result ;
185
221
result . lastRun . processed ++ ;
186
222
}
187
223
else {
@@ -220,14 +256,6 @@ class Resultor extends DoFinish {
220
256
} ) ;
221
257
this . log . d ( 'Added %d results' , results . length ) ;
222
258
}
223
-
224
- // // in case no more data is expected, we can safely close the stream
225
- // if (this.check()) {
226
- // for (let _ in this.state.pushes) {
227
- // return;
228
- // }
229
- // this.do_flush(() => this.end());
230
- // }
231
259
}
232
260
else if ( frame & FRAME . ERROR ) {
233
261
let error = results . messageError ( ) ,
@@ -241,28 +269,35 @@ class Resultor extends DoFinish {
241
269
return ;
242
270
}
243
271
this . log . d ( 'Error %d %s for %s' , results . type , results . name , id ) ;
244
- let { m, p, pr} = this . data . pushes [ id ] ,
272
+ const p = this . data . pushes [ id ] ;
273
+ let { m, p : platform , pr} = p ,
245
274
result , rp , rl ;
275
+ let msg = this . data . message ( m ) ;
276
+
277
+ // additional fields to keep this in push_stats
278
+ if ( msg && msg . saveStats ) {
279
+ this . pushStats . push ( { a : p . a , m : p . m , p : p . p , f : p . f , u : p . u , t : p . t , d : new Date , r : null , e : results . toString ( ) } ) ;
280
+ }
281
+
246
282
mids [ m ] = ( mids [ m ] || 0 ) + 1 ;
247
283
delete this . data . pushes [ id ] ;
248
284
this . toDelete . push ( id ) ;
249
285
250
- let msg = this . data . message ( m ) ;
251
286
if ( msg ) {
252
287
result = msg . result ;
253
288
}
254
289
else {
255
290
result = this . noMessage [ m ] || ( this . noMessage [ m ] = new Result ( ) ) ;
256
291
}
257
292
258
- rp = result . sub ( p , undefined , PLATFORM [ p ] . parent ) ;
293
+ rp = result . sub ( platform , undefined , PLATFORM [ platform ] . parent ) ;
259
294
rl = rp . sub ( pr . la || 'default' ) ;
260
295
261
296
rp . processed ++ ;
262
297
rl . processed ++ ;
263
298
264
- if ( PLATFORM [ p ] . parent ) {
265
- rp = result . sub ( PLATFORM [ p ] . parent ) ,
299
+ if ( PLATFORM [ platform ] . parent ) {
300
+ rp = result . sub ( PLATFORM [ platform ] . parent ) ,
266
301
rl = rp . sub ( pr . la || 'default' ) ;
267
302
rp . processed ++ ;
268
303
rl . processed ++ ;
@@ -514,6 +549,11 @@ class Resultor extends DoFinish {
514
549
}
515
550
}
516
551
552
+ if ( this . pushStats . length ) {
553
+ promises . push ( this . db . collection ( "push_stats" ) . insertMany ( this . pushStats ) ) ;
554
+ this . pushStats = [ ] ;
555
+ }
556
+
517
557
Promise . all ( promises ) . then ( ( ) => {
518
558
this . log . d ( 'do_flush done' ) ;
519
559
callback ( ) ;
0 commit comments