Skip to content

Commit 5f04ed7

Browse files
committed
[taskmanager] fallback to gridfs for storing larger data sets
1 parent f705e8d commit 5f04ed7

File tree

1 file changed

+79
-38
lines changed

1 file changed

+79
-38
lines changed

api/utils/taskmanager.js

+79-38
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
/** @lends module:api/utils/taskmanager */
77
var taskmanager = {};
88
var common = require("./common.js");
9+
var countlyFs = require("./countlyFs.js");
910
var crypto = require("crypto");
1011
var request = require("request");
1112
const log = require('./log.js')('core:taskmanager');
@@ -227,6 +228,13 @@ taskmanager.createTask = function(options, callback) {
227228
*/
228229
taskmanager.saveResult = function(options, data, callback) {
229230
options.db = options.db || common.db;
231+
var update = {
232+
end: new Date().getTime(),
233+
status: "completed",
234+
hasData: true,
235+
data: JSON.stringify(data || {}),
236+
};
237+
230238
if (options.errored) {
231239
var message = "";
232240
if (options.errormsg) {
@@ -235,56 +243,43 @@ taskmanager.saveResult = function(options, data, callback) {
235243
if (message.message) {
236244
message = message.message;
237245
}
246+
update.status = "errored";
247+
update.errormsg = message;
248+
}
238249

250+
options.db.collection("long_tasks").findOne({_id: options.id}, function(error, task) {
239251
options.db.collection("long_tasks").update({_id: options.id}, {
240-
$set: {
241-
end: new Date().getTime(),
242-
status: "errored",
243-
hasData: true,
244-
data: JSON.stringify(data || {}),
245-
errormsg: message
246-
}
252+
$set: update
247253
}, {'upsert': false}, function(err, res) {
248254
if (options.subtask && !err) {
249255
var updateObj = {$set: {}};
250-
updateObj.$set["subtasks." + options.id + ".status"] = "errored";
256+
updateObj.$set["subtasks." + options.id + ".status"] = options.errored ? "errored" : "completed";
251257
updateObj.$set["subtasks." + options.id + ".hasData"] = true;
252258
updateObj.$set["subtasks." + options.id + ".end"] = new Date().getTime();
253259

254-
options.db.collection("long_tasks").update({_id: options.subtask}, updateObj, {'upsert': false}, callback);
255-
}
256-
else {
257-
if (callback) {
258-
callback(err, res);
259-
}
260-
}
261-
});
262-
}
263-
else {
264-
options.db.collection("long_tasks").update({_id: options.id}, {
265-
$set: {
266-
end: new Date().getTime(),
267-
status: "completed",
268-
hasData: true,
269-
data: JSON.stringify(data || {})
260+
options.db.collection("long_tasks").update({_id: options.subtask}, updateObj, {'upsert': false}, function() {});
270261
}
271-
}, {'upsert': false}, function(err, res) {
272-
if (options.subtask && !err) {
273-
var updateObj = {$set: {}};
274-
updateObj.$set["subtasks." + options.id + ".status"] = "completed";
275-
updateObj.$set["subtasks." + options.id + ".hasData"] = true;
276-
updateObj.$set["subtasks." + options.id + ".end"] = new Date().getTime();
277-
options.db.collection("long_tasks").update({_id: options.subtask}, updateObj, {'upsert': false}, callback);
262+
263+
//document too large for update or it was already previous stored in gridfs
264+
if ((err && err.code === 17419) || (task && task.gridfs)) {
265+
//let's store it in gridfs
266+
update.data = {};
267+
update.gridfs = true;
268+
options.db.collection("long_tasks").update({_id: options.id}, {$set: update}, function() {
269+
countlyFs.gridfs.saveData("task_results", options.id, JSON.stringify(data || {}), {id: options.id}, function(err2, res2) {
270+
if (callback) {
271+
callback(err2, res2);
272+
}
273+
});
274+
});
278275
}
279276
else {
280277
if (callback) {
281278
callback(err, res);
282279
}
283280
}
284281
});
285-
286-
287-
}
282+
});
288283
};
289284

290285
/**
@@ -310,19 +305,19 @@ taskmanager.nameResult = function(options, data, callback) {
310305
*/
311306
taskmanager.getResult = function(options, callback) {
312307
options.db = options.db || common.db;
313-
options.db.collection("long_tasks").findOne({_id: options.id}, callback);
308+
options.db.collection("long_tasks").findOne({_id: options.id}, getResult(callback));
314309
};
315310

316311
/**
317312
* Get specific task result
318313
* @param {object} options - options for the task
319314
* @param {object} options.db - database connection
320-
* @param {string} options.id - id of the task result
315+
* @param {string} options.query - query for the task result
321316
* @param {funciton} callback - callback for the result
322317
*/
323318
taskmanager.getResultByQuery = function(options, callback) {
324319
options.db = options.db || common.db;
325-
options.db.collection("long_tasks").findOne(options.query, callback);
320+
options.db.collection("long_tasks").findOne(options.query, getResult(callback));
326321
};
327322

328323
/**
@@ -531,7 +526,24 @@ taskmanager.getTableQueryResult = async function(options, callback) {
531526
*/
532527
taskmanager.deleteResult = function(options, callback) {
533528
options.db = options.db || common.db;
534-
options.db.collection("long_tasks").remove({$or: [{_id: options.id}, {subtask: options.id}]}, callback);
529+
options.db.collection("long_tasks").findOne({_id: options.id}, function(err, task) {
530+
if (err || !task) {
531+
return callback(err);
532+
}
533+
if (task.gridfs) {
534+
countlyFs.gridfs.deleteFile("task_results", options.id, {id: options.id}, function() {});
535+
}
536+
options.db.collection("long_tasks").remove({_id: options.id}, callback);
537+
if (task.taskgroup) {
538+
options.db.collection("long_tasks").find({subtask: options.id}, {_id: 1}).toArray(function(err2, tasks) {
539+
if (tasks && tasks.length) {
540+
for (var i = 0; i < tasks.length; i++) {
541+
taskmanager.deleteResult({id: tasks[i]._id, db: options.db}, function() {});
542+
}
543+
}
544+
});
545+
}
546+
});
535547
};
536548

537549
/**
@@ -657,4 +669,33 @@ taskmanager.rerunTask = function(options, callback) {
657669
}
658670
});
659671
};
672+
673+
/**
674+
* Create a callback for getting result, including checking gridfs
675+
* @param {function} callback - callback for the result
676+
* @returns {function} callback to use for db query
677+
*/
678+
function getResult(callback) {
679+
return function(err, data) {
680+
if (!err) {
681+
if (data && data.gridfs) {
682+
countlyFs.gridfs.getData("task_results", data._id + "", {id: data._id}, function(err2, largeData) {
683+
if (!err2) {
684+
data.data = largeData;
685+
callback(null, data);
686+
}
687+
else {
688+
callback(err2, data);
689+
}
690+
});
691+
}
692+
else {
693+
callback(err, data);
694+
}
695+
}
696+
else {
697+
callback(err, data);
698+
}
699+
};
700+
}
660701
module.exports = taskmanager;

0 commit comments

Comments
 (0)