Skip to content

Feature/ingestion #6080

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions api/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ plugins.connectToAllDatabases(true).then(function() {
{"$match": {"operationType": "insert", "fullDocument.e": "[CLY]_custom"}},
{"$project": {"__iid": "$fullDocument._id", "cd": "$fullDocument.cd", "a": "$fullDocument.a", "e": "$fullDocument.e", "n": "$fullDocument.n", "ts": "$fullDocument.ts", "sg": "$fullDocument.sg", "c": "$fullDocument.c", "s": "$fullDocument.s", "dur": "$fullDocument.dur"}}
],
fallback: {
pipeline: [{
"$match": {"e": {"$in": ["[CLY]_custom"]}}
}, {"$project": {"__id": "$_id", "cd": "$cd", "a": "$a", "e": "$e", "n": "$n", "ts": "$ts", "sg": "$sg", "c": "$c", "s": "$s", "dur": "$dur"}}],
},
"name": "event-ingestion"
}, (token, currEvent) => {
if (currEvent && currEvent.a && currEvent.e) {
Expand All @@ -98,9 +103,17 @@ plugins.connectToAllDatabases(true).then(function() {
{"$match": {"operationType": "insert", "fullDocument.e": "[CLY]_session"}},
{"$addFields": {"__id": "$fullDocument._id", "cd": "$fullDocument.cd"}},
],
fallback: {
pipeline: [{
"$match": {"e": {"$in": ["[CLY]_session"]}}
}]
},
"name": "session-ingestion"
}, (token, next) => {
var currEvent = next.fullDocument;
if (next.fullDocument) {
next = next.fullDocument;
}
var currEvent = next;
if (currEvent && currEvent.a) {
//Record in session data
common.readBatcher.getOne("apps", common.db.ObjectID(currEvent.a), function(err, app) {
Expand All @@ -122,12 +135,15 @@ plugins.connectToAllDatabases(true).then(function() {

plugins.register("/aggregator", function() {
var writeBatcher = new WriteBatcher(common.db);

var changeStream = new changeStreamReader(common.drillDb, {
pipeline: [
{"$match": {"operationType": "update"}},
{"$addFields": {"__id": "$fullDocument._id", "cd": "$fullDocument.cd"}}
],
fallback: {
pipeline: [{"$match": {"e": {"$in": ["[CLY]_session"]}}}],
"timefield": "lu"
},
"options": {fullDocument: "updateLookup"},
"name": "session-updates",
"collection": "drill_events",
Expand All @@ -138,7 +154,12 @@ plugins.connectToAllDatabases(true).then(function() {
}
}
}, (token, fullDoc) => {
var next = fullDoc.fullDocument;
var fallback_processing = true;
var next = fullDoc;
if (next.fullDocument) {
fallback_processing = false;
next = fullDoc.fullDocument;
}
if (next && next.a && next.e && next.e === "[CLY]_session" && next.n && next.ts) {
common.readBatcher.getOne("apps", common.db.ObjectID(next.a), function(err, app) {
//record event totals in aggregated data
Expand All @@ -147,8 +168,13 @@ plugins.connectToAllDatabases(true).then(function() {
return;
}
if (app) {
var dur = (fullDoc && fullDoc.updateDescription && fullDoc.updateDescription.updatedFields && fullDoc.updateDescription.updatedFields.dur) || 0;
//if(dur){
var dur = 0;
if (fallback_processing) {
dur = next.dur || 0;
}
else {
dur = (fullDoc && fullDoc.updateDescription && fullDoc.updateDescription.updatedFields && fullDoc.updateDescription.updatedFields.dur) || 0;
}//if(dur){
usage.processSessionDurationRange(writeBatcher, token, dur, next.did, {"app_id": next.a, "app": app, "time": common.initTimeObj(app.timezone, next.ts), "appTimezone": (app.timezone || "UTC")});
//}
}
Expand Down
134 changes: 105 additions & 29 deletions api/aggregator/usage.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ usage.processSessionFromStream = function(token, currEvent, params) {
};


usage.processEventFromStream = function(token, currEvent) {
usage.processEventFromStream = function(token, currEvent, writeBatcher) {
writeBatcher = writeBatcher || common.writeBatcher;
var forbiddenSegValues = [];
for (let i = 1; i < 32; i++) {
forbiddenSegValues.push(i + "");
Expand All @@ -207,10 +208,15 @@ usage.processEventFromStream = function(token, currEvent) {
return;
}
else {
common.readBatcher.getOne("events", common.db.ObjectID(currEvent.a), {"transformation": "event_object"}, function(err2, eventColl) {
common.readBatcher.getOne("events", common.db.ObjectID(currEvent.a), {"transformation": "event_object"}, async function(err2, eventColl) {
var tmpEventObj = {};
var tmpEventColl = {};
var tmpTotalObj = {};
var pluginsGetConfig = plugins.getConfig("api", app.plugins, true);

var time = common.initTimeObj(app.timezone, currEvent.ts);
var params = {time: time, app_id: currEvent.a, app: app, appTimezone: app.timezone || "UTC"};

var shortEventName = currEvent.n;
if (currEvent.e !== "[CLY]_custom") {
shortEventName = currEvent.e;
Expand All @@ -230,6 +236,32 @@ usage.processEventFromStream = function(token, currEvent) {
}
}
eventColl._segments = eventColl._segments || {};
var eventCollectionName = crypto.createHash('sha1').update(shortEventName + params.app_id).digest('hex');
var updates = [];

if (currEvent.s && common.isNumber(currEvent.s)) {
common.fillTimeObjectMonth(params, tmpEventObj, common.dbMap.sum, currEvent.s);
common.fillTimeObjectMonth(params, tmpTotalObj, shortEventName + '.' + common.dbMap.sum, currEvent.s);
}
else {
currEvent.s = 0;
}

if (currEvent.dur && common.isNumber(currEvent.dur)) {
common.fillTimeObjectMonth(params, tmpEventObj, common.dbMap.dur, currEvent.dur);
common.fillTimeObjectMonth(params, tmpTotalObj, shortEventName + '.' + common.dbMap.dur, currEvent.dur);
}
else {
currEvent.dur = 0;
}
currEvent.c = currEvent.c || 1;
if (currEvent.c && common.isNumber(currEvent.c)) {
currEvent.count = parseInt(currEvent.c, 10);
}

common.fillTimeObjectMonth(params, tmpEventObj, common.dbMap.count, currEvent.count);
common.fillTimeObjectMonth(params, tmpTotalObj, shortEventName + '.' + common.dbMap.count, currEvent.count);


for (var seg in currEvent.sg) {
if (forbiddenSegValues.indexOf(currEvent.sg[seg] + "") !== -1) {
Expand All @@ -239,16 +271,17 @@ usage.processEventFromStream = function(token, currEvent) {
if (eventColl._omitted_segments[shortEventName][seg]) {
continue;
}

}
if (eventColl._whitelisted_segments && eventColl._whitelisted_segments[shortEventName]) {
if (!eventColl._whitelisted_segments[shortEventName][seg]) {
continue;
}
}
if (Array.isArray(currEvent.sg[seg])) {
continue; //Skipping arrays
}



//Segment is not registred in meta.
if (!eventColl._segments[shortEventName] || !eventColl._segments[shortEventName]._list[seg]) {
eventColl._segments[shortEventName] = eventColl._segments[shortEventName] || {_list: {}, _list_length: 0};
eventColl._segments[shortEventName]._list[seg] = true;
Expand All @@ -265,38 +298,67 @@ usage.processEventFromStream = function(token, currEvent) {
rootUpdate.$addToSet["segments." + shortEventName] = seg;
}
}
}

var time = common.initTimeObj(app.timezone, currEvent.ts);
var params = {time: time, app_id: currEvent.a, app: app, appTimezone: app.timezone || "UTC"};
//load meta for this segment in cacher. Add new value if needed

if (currEvent.s && common.isNumber(currEvent.s)) {
common.fillTimeObjectMonth(params, tmpEventObj, common.dbMap.sum, currEvent.s);
common.fillTimeObjectMonth(params, tmpTotalObj, shortEventName + '.' + common.dbMap.sum, currEvent.s);
}
var tmpSegVal = currEvent.sg[seg] + "";
tmpSegVal = tmpSegVal.replace(/^\$+/, "").replace(/\./g, ":");
tmpSegVal = common.encodeCharacters(tmpSegVal);

if (currEvent.dur && common.isNumber(currEvent.dur)) {
common.fillTimeObjectMonth(params, tmpEventObj, common.dbMap.dur, currEvent.dur);
common.fillTimeObjectMonth(params, tmpTotalObj, shortEventName + '.' + common.dbMap.dur, currEvent.dur);
}
currEvent.c = currEvent.c || 1;
if (currEvent.c && common.isNumber(currEvent.count)) {
currEvent.count = parseInt(currEvent.count, 10);
}
if (forbiddenSegValues.indexOf(tmpSegVal) !== -1) {
tmpSegVal = "[CLY]" + tmpSegVal;
}

common.fillTimeObjectMonth(params, tmpEventObj, common.dbMap.count, currEvent.count);
common.fillTimeObjectMonth(params, tmpTotalObj, shortEventName + '.' + common.dbMap.count, currEvent.count);
var postfix_seg = common.crypto.createHash("md5").update(tmpSegVal).digest('base64')[0];
var meta = await common.readBatcher.getOne("events_meta", {"_id": eventCollectionName + "no-segment_" + common.getDateIds(params).zero + "_" + postfix_seg});

if (pluginsGetConfig.event_segmentation_value_limit && meta.meta_v2 &&
meta.meta_v2[seg] &&
meta.meta_v2[seg].indexOf(tmpSegVal) === -1 &&
meta.meta_v2[seg].length >= pluginsGetConfig.event_segmentation_value_limit) {
continue;
}

if (!meta.meta_v2 || !meta.meta_v2[seg] || meta.meta_v2[seg].indexOf(tmpSegVal) === -1) {
meta.meta_v2 = meta.meta_v2 || {};
meta.meta_v2[seg] = meta.meta_v2[seg] || [];
meta.meta_v2[seg].push(tmpSegVal);
updates.push({
id: currEvent.a + "_" + eventCollectionName + "_no-segment_" + common.getDateIds(params).zero + "_" + postfix_seg,
update: {"$set": {["meta_v2." + seg + "." + tmpSegVal]: true, ["meta_v2.segments." + seg]: true, "s": "no-segment", "e": shortEventName, "m": common.getDateIds(params).zero, "a": params.app_id + ""}}
});
}
//record data
var tmpObj = {};

if (currEvent.s) {
common.fillTimeObjectMonth(params, tmpObj, tmpSegVal + '.' + common.dbMap.sum, currEvent.s);
}

if (currEvent.dur) {
common.fillTimeObjectMonth(params, tmpEventObj, tmpSegVal + '.' + common.dbMap.dur, currEvent.dur);
}

common.fillTimeObjectMonth(params, tmpObj, tmpSegVal + '.' + common.dbMap.count, currEvent.c);
updates.push({
id: currEvent.a + "_" + eventCollectionName + "_" + seg + "_" + common.getDateIds(params).month + "_" + postfix_seg,
update: {$inc: tmpObj, $set: {"s": seg, "e": shortEventName, m: common.getDateIds(params).month, a: params.app_id + ""}}
});
}

var dateIds = common.getDateIds(params);
var postfix2 = common.crypto.createHash("md5").update(shortEventName).digest('base64')[0];

tmpEventColl["no-segment" + "." + dateIds.month] = tmpEventObj;

for (var z = 0; z < updates.length; z++) {
writeBatcher.add("events_data", updates[z].id, updates[z].update, "countly", {token: token});
}
//ID is - appID_hash_no-segment_month
var eventCollectionName = crypto.createHash('sha1').update(shortEventName + params.app_id).digest('hex');

var _id = currEvent.a + "_" + eventCollectionName + "_no-segment_" + dateIds.month;
//Current event
common.writeBatcher.add("events_data", _id, {
writeBatcher.add("events_data", _id, {
"$set": {
"m": dateIds.month,
"s": "no-segment",
Expand All @@ -308,16 +370,30 @@ usage.processEventFromStream = function(token, currEvent) {
{token: token});

//Total event
common.writeBatcher.add("events_data", currEvent.a + "_all_key_" + dateIds.month + "_" + postfix2, {
writeBatcher.add("events_data", currEvent.a + "_all_key_" + dateIds.month + "_" + postfix2, {
"$set": {
"m": dateIds.month,
"s": "key",
"a": params.app_id + "",
"e": "all"
},
"$inc": tmpEventObj
});
"$inc": tmpTotalObj
}, "countly",
{token: token});

//Meta document for all events:
writeBatcher.add("events_data", params.app_id + "_all_" + "no-segment_" + dateIds.zero + "_" + postfix2, {
$set: {
m: dateIds.zero,
s: "no-segment",
a: params.app_id + "",
e: "all",
["meta_v2.key." + shortEventName]: true,
"meta_v2.segments.key": true

}
}, "countly",
{token: token});
//Total event meta data

if (Object.keys(rootUpdate).length) {
Expand All @@ -331,7 +407,6 @@ usage.processEventFromStream = function(token, currEvent) {


usage.processSessionMetricsFromStream = function(currEvent, uniqueLevelsZero, uniqueLevelsMonth, params) {

/**
*
* @param {string} id - document id
Expand All @@ -357,7 +432,8 @@ usage.processSessionMetricsFromStream = function(currEvent, uniqueLevelsZero, un

var dateIds = common.getDateIds(params);
var metaToFetch = {};
if (plugins.getConfig("api", params.app && params.app.plugins, true).metric_limit > 0) {

if ((plugins.getConfig("api", params.app && params.app.plugins, true).metric_limit || 1000) > 0) {
var postfix;
for (let i = 0; i < predefinedMetrics.length; i++) {
for (let j = 0; j < predefinedMetrics[i].metrics.length; j++) {
Expand Down
2 changes: 2 additions & 0 deletions api/ingestor.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ plugins.connectToAllDatabases(true).then(function() {
res: res,
req: req
};

console.log("recieved some data");
params.tt = Date.now().valueOf();
if (req.method.toLowerCase() === 'post') {
const formidableOptions = {};
Expand Down
25 changes: 17 additions & 8 deletions api/ingestor/requestProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -351,20 +351,20 @@ var processToDrill = async function(params, drill_updates, callback) {
if (!currEvent.key || (currEvent.key.indexOf('[CLY]_') === 0 && plugins.internalDrillEvents.indexOf(currEvent.key) === -1)) {
continue;
}

/*
if (currEvent.key === "[CLY]_session" && !plugins.getConfig("drill", params.app && params.app.plugins, true).record_sessions) {
continue;
}

if (currEvent.key === "[CLY]_view" && !plugins.getConfig("drill", params.app && params.app.plugins, true).record_views) {
continue;
}
}*/

if (currEvent.key === "[CLY]_view" && !(currEvent.segmentation && currEvent.segmentation.visit)) {
continue;
}


/*
if (currEvent.key === "[CLY]_action" && !plugins.getConfig("drill", params.app && params.app.plugins, true).record_actions) {
continue;
}
Expand All @@ -391,7 +391,7 @@ var processToDrill = async function(params, drill_updates, callback) {

if ((currEvent.key === "[CLY]_consent") && !plugins.getConfig("drill", params.app && params.app.plugins, true).record_consent) {
continue;
}
}*/


var dbEventObject = {
Expand Down Expand Up @@ -784,7 +784,7 @@ const validateAppForWriteAPI = (params, done) => {
}
if (!validateRedirect({params: params, app: app})) {
if (!params.res.finished && !params.waitForResponse) {
common.returnOutput(params, {result: 'Success', info: 'Request regirected: ' + params.cancelRequest});
common.returnOutput(params, {result: 'Success', info: 'Request redirected: ' + params.cancelRequest});
}
done();
return;
Expand Down Expand Up @@ -841,7 +841,7 @@ const validateAppForWriteAPI = (params, done) => {
params.collectedMetrics = {};

let payload = params.href.substr(3) || "";
if (params.req.method.toLowerCase() === 'post') {
if (params.req && params.req.method && params.req.method.toLowerCase() === 'post') {
payload += "&" + params.req.body;
}
//remove dynamic parameters
Expand Down Expand Up @@ -1116,8 +1116,17 @@ const processRequest = (params) => {
break;
}
default:
common.returnMessage(params, 400, 'Invalid path');
break;
if (!plugins.dispatch(apiPath, {
params: params,
paths: paths
})) {
if (!plugins.dispatch(params.fullPath, {
params: params,
paths: paths
})) {
common.returnMessage(params, 400, 'Invalid path');
}
}
}

};
Expand Down
Loading
Loading