Commit a2a988a

Merge branch 'newarchitecture' into ar2rsawseen/feature/ingestion
2 parents c6807e5 + a8f2058

29 files changed: +965 −1524 lines

.github/workflows/main.yml (+1 −1)

@@ -6,7 +6,7 @@ name: CI
 on:
   # Triggers the workflow on push or pull request events but only for the master branch
   pull_request:
-    branches: [ master, next, release.*, flex ]
+    branches: [ master, next, release.*, flex, newarchitecture ]
 
   # Allows you to run this workflow manually from the Actions tab
   workflow_dispatch:

api/aggregator/usage.js (+19 −9)

@@ -147,6 +147,20 @@ usage.processSessionFromStream = function(token, currEvent, params) {
 };
 
 usage.processSessionMetricsFromStream = function(currEvent, uniqueLevelsZero, uniqueLevelsMonth, params) {
+
+    /**
+     *
+     * @param {string} id - document id
+     * @param {function} callback - callback function
+     */
+    function fetchMeta(id, callback) {
+        common.readBatcher.getOne(metaToFetch[id].coll, {'_id': metaToFetch[id].id}, {meta_v2: 1}, (err, metaDoc) => {
+            var retObj = metaDoc || {};
+            retObj.coll = metaToFetch[id].coll;
+            callback(null, retObj);
+        });
+    }
+
     var isNewUser = true;
     var userProps = {};
     if (currEvent.sg && currEvent.sg.prev_session) {
@@ -160,6 +174,7 @@ usage.processSessionMetricsFromStream = function(currEvent, uniqueLevelsZero, un
     var dateIds = common.getDateIds(params);
     var metaToFetch = {};
     if (plugins.getConfig("api", params.app && params.app.plugins, true).metric_limit > 0) {
+        var postfix;
         for (let i = 0; i < predefinedMetrics.length; i++) {
             for (let j = 0; j < predefinedMetrics[i].metrics.length; j++) {
                 let tmpMetric = predefinedMetrics[i].metrics[j],
@@ -185,13 +200,7 @@ usage.processSessionMetricsFromStream = function(currEvent, uniqueLevelsZero, un
             }
         }
     }
-    function fetchMeta(id, callback) {
-        common.readBatcher.getOne(metaToFetch[id].coll, {'_id': metaToFetch[id].id}, {meta_v2: 1}, (err, metaDoc) => {
-            var retObj = metaDoc || {};
-            retObj.coll = metaToFetch[id].coll;
-            callback(null, retObj);
-        });
-    }
+
     var metas = {};
     async.map(Object.keys(metaToFetch), fetchMeta, function(err, metaDocs) {
         for (let i = 0; i < metaDocs.length; i++) {
@@ -210,8 +219,9 @@ usage.processSessionMetricsFromStream = function(currEvent, uniqueLevelsZero, un
                 monthObjUpdate = [],
                 tmpMetric = predefinedMetrics[i].metrics[j],
                 recvMetricValue = "",
-                escapedMetricVal = "",
-                postfix = "";
+                escapedMetricVal = "";
+
+            postfix = "";
 
             recvMetricValue = currEvent.up[tmpMetric.short_code];
 
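A note on the refactor above: `fetchMeta` is hoisted to the top of `processSessionMetricsFromStream` so the helper is declared before `async.map` consumes it, and `postfix` moves from the per-metric `var` list into the enclosing block. A minimal sketch of the hoisted-fetcher pattern, assuming the `async` library this file already uses; the collection name, the `metaToFetch` shape, and the `fakeGetOne` stand-in for `common.readBatcher.getOne` are all illustrative:

const async = require('async');

// Illustrative stand-in for common.readBatcher.getOne(coll, query, projection, cb)
function fakeGetOne(coll, query, projection, cb) {
    setImmediate(() => cb(null, {_id: query._id, meta_v2: {}}));
}

const metaToFetch = {
    "meta_2024:0": {coll: "events_data", id: "meta_2024:0"} // hypothetical entry
};

// Declared before async.map references it, mirroring the hoist in the diff
function fetchMeta(id, callback) {
    fakeGetOne(metaToFetch[id].coll, {'_id': metaToFetch[id].id}, {meta_v2: 1}, (err, metaDoc) => {
        var retObj = metaDoc || {};
        retObj.coll = metaToFetch[id].coll;
        callback(null, retObj); // read errors are swallowed, as in the diff
    });
}

async.map(Object.keys(metaToFetch), fetchMeta, function(err, metaDocs) {
    console.log(metaDocs); // one merged meta doc per requested id
});

Since both `var` and function declarations are hoisted in JavaScript, this appears to be primarily a readability change rather than a behavioral one.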
api/api.js (+1 −1)

@@ -14,7 +14,7 @@ const pack = require('../package.json');
 const versionInfo = require('../frontend/express/version.info.js');
 const moment = require("moment");
 
-var {MongoDbQueryRunner} = require('../plugins/drill/api/parts/data/MongoDbQueryRunner.js');
+var {MongoDbQueryRunner} = require('./utils/mongoDbQueryRunner.js');
 
 var t = ["countly:", "api"];
 common.processRequest = processRequest;

api/ingestor/requestProcessor.js (+25 −15)

@@ -401,12 +401,18 @@ var processToDrill = async function(params, drill_updates, callback) {
             "ts": events[i].timestamp || Date.now().valueOf(),
             "uid": params.app_user.uid,
             "_uid": params.app_user._id,
-            "did": params.app_user.did,
-            "ce": true,
+            "did": params.app_user.did
             //d, w,m,h
         };
         if (currEvent.key.indexOf('[CLY]_') === 0) {
-            dbEventObject.ce = false;
+            dbEventObject.n = events[i].key;
+        }
+        else {
+            dbEventObject.n = events[i].key;
+            dbEventObject.e = "[CLY]_custom";
+        }
+        if (currEvent.name) {
+            dbEventObject.n = currEvent.name;
         }
 
         if (dbAppUser && dbAppUser[common.dbUserMap.user_id]) {
@@ -557,7 +563,7 @@ var processToDrill = async function(params, drill_updates, callback) {
         eventsToInsert.push({"insertOne": {"document": dbEventObject}});
         if (eventKey === "[CLY]_view") {
             var view_id = crypto.createHash('md5').update(currEvent.segmentation.name).digest('hex');
-            viewUpdate[view_id] = {"lvid": dbEventObject._id, "ts": dbEventObject.ts};
+            viewUpdate[view_id] = {"lvid": dbEventObject._id, "ts": dbEventObject.ts, "a": params.app_id + ""};
             if (currEvent.segmentation) {
                 var sgm = {};
                 var have_sgm = false;
@@ -580,25 +586,26 @@ var processToDrill = async function(params, drill_updates, callback) {
         for (var z4 = 0; z4 < drill_updates.length;z4++) {
             eventsToInsert.push(drill_updates[z4]);
         }
-
     }
     if (eventsToInsert.length > 0) {
         try {
             await common.drillDb.collection("drill_events").bulkWrite(eventsToInsert, {ordered: false});
+            callback(null);
             if (Object.keys(viewUpdate).length) {
                 //updates app_viewdata colelction.If delayed new incoming view updates will not have reference. (So can do in aggregator only if we can insure minimal delay)
                 try {
-                    await common.db.collection("app_userviews" + params.app_id).updateOne({_id: params.app_user.uid}, {$set: viewUpdate}, {upsert: true});
+                    await common.db.collection("app_userviews").updateOne({_id: params.app_id + "_" + params.app_user.uid}, {$set: viewUpdate}, {upsert: true});
                 }
                 catch (err) {
                     log.e(err);
                 }
             }
-            callback(null);
+
         }
         catch (errors) {
             var realError;
             if (errors && Array.isArray(errors)) {
+                log.e(JSON.stringify(errors));
                 for (let i = 0; i < errors.length; i++) {
                     if ([11000, 10334, 17419].indexOf(errors[i].code) === -1) {
                         realError = true;
@@ -609,16 +616,17 @@ var processToDrill = async function(params, drill_updates, callback) {
                 callback(realError);
             }
             else {
+                callback(null);
                 if (Object.keys(viewUpdate).length) {
                     //updates app_viewdata colelction.If delayed new incoming view updates will not have reference. (So can do in aggregator only if we can insure minimal delay)
                     try {
-                        await common.db.collection("app_userviews" + params.app_id).updateOne({_id: params.app_user.uid}, {$set: viewUpdate}, {upsert: true});
+                        await common.db.collection("app_userviews").updateOne({_id: params.app_id + "_" + params.app_user.uid}, {$set: viewUpdate}, {upsert: true});
                     }
                     catch (err) {
                         log.e(err);
                     }
                 }
-                callback(null);
+
             }
         }
         else {
@@ -701,7 +709,14 @@ const processRequestData = (ob, done) => {
             update = common.mergeQuery(update, ob.updates[i]);
         }
     }
-    Promise.all([ common.updateAppUser(ob.params, update, false, true)]).then(function() {
+    //var SaveAppUser = Date.now().valueOf();
+
+    common.updateAppUser(ob.params, update, false, function() {
+
+        /*var AfterSaveAppUser = Date.now().valueOf();
+        if (AfterSaveAppUser - SaveAppUser > treshold) {
+            console.log("SaveAppUser time: " + (AfterSaveAppUser - SaveAppUser));
+        }*/
         processToDrill(ob.params, ob.drill_updates, function(error) {
             if (error) {
                 common.returnMessage(ob.params, 400, 'Could not record events:' + error);
@@ -712,10 +727,6 @@ const processRequestData = (ob, done) => {
             }
         });
 
-    }).catch(function(err) {
-        log.e(err);
-        common.returnMessage(ob.params, 400, 'Cannot process request');
-        done();
     });
 };
 
@@ -730,7 +741,6 @@ plugins.register("/sdk/process_request", async function(ob) {
 *
 * @param {*} params - request parameters
 * @param {*} done - callback function
- * @returns {boolean} - returns false if request is cancelled
 *
 *
 * 1)Get App collection settings
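Two behavioral changes in the diff above are worth calling out. First, the boolean `ce` flag on drill event documents is replaced by explicit name and type fields: `n` carries the event key (or `currEvent.name` when present) and `e` is set to `"[CLY]_custom"` for non-internal events. Second, per-app `app_userviews<app_id>` collections are consolidated into one shared `app_userviews` collection keyed by a composite `<app_id>_<uid>` id. A minimal sketch of the new key scheme; the wrapper function is hypothetical, while the collection and field names follow the diff:

/**
 * Upserts the last-view references for one user under the new scheme.
 * @param {Db} db - connected MongoDB Db handle (e.g. client.db("countly"))
 * @param {string} appId - application id
 * @param {string} uid - app user uid
 * @param {object} viewUpdate - {<view_id>: {lvid, ts, a}} map, as built in the diff
 */
async function saveViewRefs(db, appId, uid, viewUpdate) {
    // Before: db.collection("app_userviews" + appId).updateOne({_id: uid}, ...)
    // After: one shared collection with a composite _id
    await db.collection("app_userviews").updateOne(
        {_id: appId + "_" + uid},
        {$set: viewUpdate},
        {upsert: true}
    );
}

Note also that `callback(null)` now fires right after the bulk write succeeds, before the view-reference upsert, so the response is no longer blocked on the `app_userviews` update.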

api/parts/data/batcher.js (+10 −5)

@@ -197,6 +197,11 @@ class WriteBatcher {
         });
     }
 
+    /**
+     * Function to call once flushed
+     * @param {string} name - name of collection
+     * @param {function} callback - callback function
+     */
     addFlushCallback(name, callback) {
         this.flushCallbacks[name] = callback;
     }
@@ -226,7 +231,7 @@ class WriteBatcher {
     * @param {string} db - name of the database for which to write data
     * @param {string} collection - name of the collection for which to write data
     */
-    async flush(db, collection, callback) {
+    async flush(db, collection) {
        var no_fallback_errors = [10334, 17419, 14, 56];
        var notify_errors = [10334, 17419];
        if (this.data[db] && this.data[db][collection] && this.data[db][collection].data && Object.keys(this.data[db][collection].data).length) {
@@ -247,7 +252,6 @@ class WriteBatcher {
                });
            }
        }
-        var token0 = this.data[db][collection].t;
        this.data[db][collection] = {"data": {}};
        batcherStats.update_queued -= queries.length;
        batcherStats.update_processing += queries.length;
@@ -348,6 +352,7 @@ class WriteBatcher {
     * @param {string} id - id of the document
     * @param {object} operation - operation
     * @param {string} db - name of the database for which to write data
+     * @param {object} options - options for the operation
     */
    add(collection, id, operation, db = "countly", options) {
        options = options || {};
@@ -451,9 +456,9 @@ class ReadBatcher {
            }
            else {
                try {
-                    var res = await this.db.collection(collection).findOne(query, projection);
-                    this.cache(collection, id, query, projection, res, false);
-                    return res;
+                    var res2 = await this.db.collection(collection).findOne(query, projection);
+                    this.cache(collection, id, query, projection, res2, false);
+                    return res2;
                }
                catch (err) {
                    if (this.data && this.data[collection] && this.data[collection][id] && this.data[collection][id].promise) {
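For context on the newly documented `addFlushCallback`: it registers a per-collection hook the batcher invokes once a flush completes, which is likely why the unused `callback` parameter was dropped from `flush` itself. A hypothetical usage sketch, assuming the write batcher is exposed as `common.writeBatcher` as elsewhere in this codebase; the collection name and callback body are illustrative:

const common = require('../../utils/common.js');

// Run bookkeeping only after queued "drill_events" writes actually hit the db,
// e.g. acknowledging a change-stream token once its data is persisted.
common.writeBatcher.addFlushCallback("drill_events", function() {
    console.log("drill_events batch flushed");
});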

api/parts/data/cacher.js (+1 −0)

@@ -8,6 +8,7 @@ class Cacher {
    /**
     * Create batcher instance
     * @param {Db} db - database object
+     * @param {object} options - options object
     */
    constructor(db, options) {
        this.db = db;

api/parts/data/changeStreamReader.js (+32 −13)

@@ -2,9 +2,11 @@ const common = require("../../utils/common");
 const log = require('../../utils/log.js')("changeStreamReader");
 var Timestamp = require('mongodb').Timestamp;
 
-
+/**
+ * Class to use change streams to read from mongodb.
+ */
 class changeStreamReader {
-    /*
+    /**
     * @param {Object} db - Database object
     * @param {Object} options - Options object
     * @param {function} onData - Finction to call when getting new data from stream
@@ -37,7 +39,7 @@ class changeStreamReader {
 
    }
 
-    /*
+    /**
     * Check if stream is closed and restart if needed
     */
    checkState() {
@@ -52,16 +54,19 @@ class changeStreamReader {
        }
    }
 
-    /*
+    /**
     * Process bad range((when token can't continue))
+     * @param {Object} options - Options object
+     * @param {Object} tokenInfo - Token info object
     */
    async processBadRange(options, tokenInfo) {
        console.log("Processing bad range");
        console.log(JSON.stringify({cd: {$gte: options.cd1, $lt: options.cd2}}));
        var gotTokenDoc = false;
+        var doc;
        var cursor = this.db.collection(this.collection).find({cd: {$gte: new Date(options.cd1), $lt: new Date(options.cd2)}}).sort({cd: 1});
        while (await cursor.hasNext() && !gotTokenDoc) {
-            var doc = await cursor.next();
+            doc = await cursor.next();
            if (JSON.stringify(doc._id) === JSON.stringify(tokenInfo._id) || doc.cd > tokenInfo.cd) {
                gotTokenDoc = true;
            }
@@ -75,7 +80,7 @@ class changeStreamReader {
        }
 
        while (await cursor.hasNext()) {
-            var doc = await cursor.next();
+            doc = await cursor.next();
            console.log("Process:" + JSON.stringify(doc));
            tokenInfo.cd = doc.cd;
            tokenInfo._id = doc._id;
@@ -84,7 +89,10 @@ class changeStreamReader {
        console.log("done");
    }
 
-    /*Opening stream*/
+    /**
+     * Sets up stream to read data from mongodb
+     * @param {function} onData - function to call on new data
+     */
    async setUp(onData) {
        var token;
        try {
@@ -149,14 +157,14 @@ class changeStreamReader {
            }
            else {
                this.stream.on('change', (change) => {
-                    var token = {token: self.stream.resumeToken};
-                    token._id = change.__id;
+                    var my_token = {token: self.stream.resumeToken};
+                    my_token._id = change.__id;
                    if (change.cd) {
-                        token.cd = change.cd;
-                        onData(token, change);
+                        my_token.cd = change.cd;
+                        onData(my_token, change);
                    }
                    else {
-                        onData(token, change);
+                        onData(my_token, change);
                    }
                });
 
@@ -184,12 +192,21 @@ class changeStreamReader {
            console.log("Set Failed token", token);
            this.failedToken = token;
        }
+        //Failed because of db does not support change streams. Run in "query mode";
+        else if (err.code === "look_for_right_code") {
+            //Call process bad range if there is any info about last token.
+            //Switch to query mode
+        }
        else {
            log.e("Error on change stream", err);
        }
    }
 
+    /**
+     * Acknowledges token as recorded
+     * @param {object} token - token info
+     */
    async acknowledgeToken(token) {
        this.lastToken = token;
        //Update last processed token to database
@@ -207,7 +224,9 @@ class changeStreamReader {
        }
    }
 
-    //Close stream by command
+    /**
+     * Closes stream permanently
+     */
    close() {
        console.log("Closing permanently");
        if (this.intervalRunner) {
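The `token` → `my_token` rename above removes variable shadowing inside the `change` handler, where a local `var token` hid the `token` from the enclosing `setUp`. The pattern the class wraps is MongoDB's resume token: persist the stream's current token with each processed change so reading can restart from the last acknowledged position. A minimal standalone sketch of that pattern; the connection string, database, and collection names are illustrative, and the bad-range recovery from `processBadRange` is omitted:

const {MongoClient} = require('mongodb');

async function watchDrillEvents(uri, lastToken) {
    const client = await MongoClient.connect(uri);
    const coll = client.db("countly_drill").collection("drill_events");

    // Resume from the persisted token when there is one
    const stream = coll.watch([], lastToken ? {resumeAfter: lastToken.token} : {});

    stream.on('change', (change) => {
        // Capture the stream's current resume token alongside the change,
        // mirroring my_token in the diff
        const myToken = {token: stream.resumeToken, cd: change.cd};
        // ...process the change, then persist myToken as the new lastToken
    });

    stream.on('error', (err) => {
        // A token that fell off the oplog would trigger the class's
        // bad-range recovery; here we just log
        console.error("change stream error", err);
    });
}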

api/parts/data/events.js (+5 −5)

@@ -13,25 +13,25 @@ var countlyEvents = {},
 
 
 
-countlyEvents.processEventFromStream = function(event) {
-    var forbiddenSegValues = [];
+countlyEvents.processEventFromStream = function() {
+    /*var forbiddenSegValues = [];
    for (let i = 1; i < 32; i++) {
        forbiddenSegValues.push(i + "");
    }
 
    //Write event totals for aggregated Data
 
    common.readBatcher.getOne("apps", {'_id': event.a}, function(err, app) {
-        /* common.readBatcher.getOne("events", {'_id': event.a}, {
+        common.readBatcher.getOne("events", {'_id': event.a}, {
            list: 1,
            segments: 1,
            omitted_segments: 1,
            whitelisted_segments: 1
        }, (err, eventColl) => {
 
-        });*/
+        });
 
-    });
+    });*/
};
/**
 * Process JSON decoded events data from request
