Skip to content

Commit 8f558b2

Browse files
committed
[push] Results handling refactoring
Plus error messages localizations, processed fixes, extra logging temporarily, flushing in workers, sending counter
1 parent ecf2c44 commit 8f558b2

File tree

11 files changed

+356
-214
lines changed

11 files changed

+356
-214
lines changed

plugins/push/api/api-auto.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ module.exports.autoOnCohort = function(entry, cohort, uids) {
5252
audience.push(trigger).setUIDs(uids).setStart(new Date()).run().then(result => {
5353
logCohorts.d('processing %s %s, result: %j', typ, msg._id, result);
5454
if (result.total) {
55-
return msg.update({$inc: {'result.total': result.total}}).then(() => Audience.resetQueue(result.next));
55+
return msg.update({$inc: {'result.total': result.total}}, () => {
56+
result.total += result.total;
57+
}).then(() => Audience.resetQueue(result.next));
5658
}
5759
}).then(() => {
5860
logCohorts.d('done processing %s %s', typ, msg._id);

plugins/push/api/jobs/util/batcher.js

+49-15
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
const { PushError, ERROR } = require('../../send/data');
2+
const { FRAME_NAME } = require('../../send/proto');
23
const { SynFlushTransform } = require('./syn'),
34
{ encode, FRAME, pools } = require('../../send');
45

@@ -35,6 +36,7 @@ class Batcher extends SynFlushTransform {
3536
this.ids = {}; // {aid: {p: {f: id}}}
3637
this.buffers = {}; // {id: [push, push, ...]}
3738
this.listeners = {}; // {id: function}
39+
this.flushes = {}; // {flushid: [pool id, pool id, ...]}
3840
this.count = 0;
3941
this.size = size;
4042
}
@@ -47,22 +49,59 @@ class Batcher extends SynFlushTransform {
4749
* @param {function} callback callback
4850
*/
4951
_transform(push, encoding, callback) {
52+
this.log.d('in batcher _transform', FRAME_NAME[push.frame], push._id);
5053
if (push.frame & FRAME.CMD) {
5154
if (push.frame & (FRAME.FLUSH | FRAME.SYN)) {
52-
if (this.do_flush(() => {}) === true) {
53-
this.push(push);
54-
}
55+
this.do_flush(() => {
56+
this.flushes[push.payload] = [];
57+
this.log.d('in batcher _transform', FRAME_NAME[push.frame], push._id, 'sending to', Object.keys(this.listeners));
58+
for (let id in this.listeners) {
59+
pools.pools[id].write(encode(push.frame, push.payload));
60+
this.flushes[push.payload].push(id);
61+
}
62+
callback();
63+
});
5564
}
56-
for (let id in this.listeners) {
57-
pools.pools[id].write(encode(push.frame, push.payload));
65+
else {
66+
for (let id in this.listeners) {
67+
pools.pools[id].write(encode(push.frame, push.payload));
68+
}
69+
callback();
5870
}
71+
return;
72+
}
73+
else if (push.frame & FRAME.RESULTS) {
74+
this.push(push);
5975
callback();
6076
return;
6177
}
6278

6379
let id = this.ids[push.a][push.p][push.f];
6480
this.buffers[id].push(push);
6581
this.count++;
82+
this.state.incSending(push.m);
83+
84+
if (!this.listeners[id]) {
85+
// this.listeners[id] = true;
86+
// pools.pools[id].pipe(this);
87+
this.listeners[id] = result => {
88+
if (result.frame & FRAME.FLUSH) {
89+
let pls = this.flushes[result.payload];
90+
if (pls && pls.indexOf(id) !== -1) {
91+
pls.splice(pls.indexOf(id), 1);
92+
if (!pls.length) {
93+
this.log.d('flush %d is done', result.payload);
94+
this.push(result);
95+
delete this.flushes[result.payload];
96+
}
97+
}
98+
}
99+
else {
100+
this.push(result);
101+
}
102+
};
103+
pools.pools[id].on('data', this.listeners[id]);
104+
}
66105

67106
if (this.count >= this.size) {
68107
this.log.d('flushing');
@@ -98,12 +137,15 @@ class Batcher extends SynFlushTransform {
98137
* @param {function} callback callback
99138
*/
100139
_flush(callback) {
140+
this.log.d('in batcher _flush');
101141
// this.do_flush(callback);
102142
this.do_flush(err => {
103143
if (err) {
144+
this.log.d('_flush err, callback', err);
104145
callback(err);
105146
}
106147
else {
148+
this.log.d('_flush ok, synIt');
107149
this.synIt(callback);
108150
}
109151
});
@@ -116,6 +158,7 @@ class Batcher extends SynFlushTransform {
116158
* @returns {boolean|undefined} true in case nothing has been written
117159
*/
118160
do_flush(callback) {
161+
this.log.d('in batcher do_flush');
119162
let count = 0,
120163
anything = false,
121164
/**
@@ -140,17 +183,7 @@ class Batcher extends SynFlushTransform {
140183
anything = true;
141184
count++;
142185
this.count -= this.buffers[id].length;
143-
144-
if (!this.listeners[id]) {
145-
// this.listeners[id] = true;
146-
// pools.pools[id].pipe(this);
147-
this.listeners[id] = result => {
148-
this.push(result);
149-
};
150-
pools.pools[id].on('data', this.listeners[id]);
151-
}
152186
pools.pools[id].write(encode(FRAME.SEND, this.buffers[id]), cb);
153-
154187
this.buffers[id] = [];
155188
}
156189
}
@@ -170,6 +203,7 @@ class Batcher extends SynFlushTransform {
170203
* @param {function} callback callback
171204
*/
172205
_final(callback) {
206+
this.log.d('in batcher _final');
173207
if (this.count) {
174208
callback(new PushError('final with data left', ERROR.EXCEPTION));
175209
// this.log.d('final: flushing');

0 commit comments

Comments
 (0)