Skip to content

Commit 51c031e

Browse files
author
Cookiezaurs
committed
changes in core
1 parent 2149bd8 commit 51c031e

13 files changed

+1111
-119
lines changed

api/aggregator.js

+35-12
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const common = require('./utils/common.js');
55
const {WriteBatcher} = require('./parts/data/batcher.js');
66
const {Cacher} = require('./parts/data/cacher.js');
77
const {changeStreamReader} = require('./parts/data/changeStreamReader.js');
8+
const usage = require('./aggregator/usage.js');
89
var t = ["countly:", "aggregator"];
910
t.push("node");
1011

@@ -21,6 +22,7 @@ plugins.connectToAllDatabases(true).then(function() {
2122
// common.writeBatcher = new WriteBatcher(common.db);
2223

2324
common.writeBatcher = new WriteBatcher(common.db);
25+
common.secondaryWriteBatcher = new WriteBatcher(common.db);
2426
common.readBatcher = new Cacher(common.db); //Used for Apps info
2527

2628

@@ -29,27 +31,47 @@ plugins.connectToAllDatabases(true).then(function() {
2931
var changeStream = new changeStreamReader(common.drillDb, {
3032
pipeline: [
3133
{"$match": {"operationType": "insert", "fullDocument.ce": true}},
32-
{"$project": {"a": "$fullDocument.a", "key": "$fullDocument.e", "ts": "$fullDocument.ts", "sg": "$fullDocument.sg", "count": "$fullDocument.c", "s": "$fullDocument.s", "dur": "$fullDocument.dur"}}
34+
{"$project": {"__iid": "$fullDocument._id", "cd": "$fullDocument.cd", "a": "$fullDocument.a", "key": "$fullDocument.e", "ts": "$fullDocument.ts", "sg": "$fullDocument.sg", "count": "$fullDocument.c", "s": "$fullDocument.s", "dur": "$fullDocument.dur"}}
3335
],
3436
"name": "event-ingestion"
3537
}, (token, currEvent) => {
3638
if (currEvent && currEvent.a && currEvent.e) {
37-
common.readBatcher.getOne("apps", currEvent.a, function(err, app) {
39+
// usage.processEventFromStream(currEvent));
40+
}
41+
// process next document
42+
});
43+
44+
common.writeBatcher.addFlushCallback("events_data", function(token) {
45+
console.log("flush callback");
46+
changeStream.acknowledgeToken(token);
47+
});
48+
});
49+
50+
plugins.register("/aggregator", function() {
51+
var changeStream = new changeStreamReader(common.drillDb, {
52+
pipeline: [
53+
{"$match": {"operationType": "insert", "fullDocument.e": "[CLY]_session"}},
54+
{"$addFields": {"__id": "$fullDocument._id", "cd": "$fullDocument.cd"}},
55+
],
56+
"name": "session-ingestion"
57+
}, (token, next) => {
58+
var currEvent = next.fullDocument;
59+
if (currEvent && currEvent.a) {
60+
//Record in session data
61+
common.readBatcher.getOne("apps", common.db.ObjectID(currEvent.a), function(err, app) {
3862
//record event totals in aggregated data
39-
if (currEvent.count && common.isNumber(currEvent.count)) {
40-
currEvent.count = parseInt(currEvent.count, 10);
63+
if (err) {
64+
log.e("Error getting app data for session", err);
65+
return;
4166
}
42-
else {
43-
currEvent.count = 1;
67+
if (app) {
68+
usage.processSessionFromStream(token, currEvent, {"app_id": currEvent.a, "app": app, "time": common.initTimeObj(app.timezone, currEvent.ts), "appTimezone": (app.timezone || "UTC")});
4469
}
45-
4670
});
4771
}
48-
// process next document
4972
});
5073

51-
common.writeBatcher.addFlushCallback("events_data", function(token) {
52-
console.log("flush callback");
74+
common.writeBatcher.addFlushCallback("users", function(token) {
5375
changeStream.acknowledgeToken(token);
5476
});
5577

@@ -157,8 +179,9 @@ plugins.connectToAllDatabases(true).then(function() {
157179
*/
158180
async function storeBatchedData(code) {
159181
try {
160-
//await common.writeBatcher.flushAll();
161-
//await common.insertBatcher.flushAll();
182+
await common.writeBatcher.flushAll();
183+
await common.secondaryWriteBatcher.flushAll();
184+
await common.insertBatcher.flushAll();
162185
console.log("Successfully stored batch state");
163186
}
164187
catch (ex) {

0 commit comments

Comments
 (0)