Skip to content

Commit 205e596

Browse files
authored
Merge pull request #4788 from Cookiezaurs/master-user-merging
[SER-849] Ensure data consistency when user merging
2 parents e1a3f92 + a44e30d commit 205e596

File tree

10 files changed

+816
-350
lines changed

10 files changed

+816
-350
lines changed

api/api.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ plugins.connectToAllDatabases().then(function() {
298298
jobs.job('api:clearTokens').replace().schedule('every 1 day');
299299
jobs.job('api:clearAutoTasks').replace().schedule('every 1 day');
300300
jobs.job('api:task').replace().schedule('every 5 minutes');
301-
//jobs.job('api:userMerge').replace().schedule('every 1 hour on the 10th min');
301+
jobs.job('api:userMerge').replace().schedule('every 10 minutes');
302302
//jobs.job('api:appExpire').replace().schedule('every 1 day');
303303
}, 10000);
304304
}

api/jobs/userMerge.js

+161-65
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,180 @@
11
'use strict';
22

33
const job = require('../parts/jobs/job.js'),
4-
async = require('async'),
5-
moment = require('moment'),
64
plugins = require('../../plugins/pluginManager.js'),
75
log = require('../utils/log.js')('job:userMerge');
6+
var Promise = require("bluebird");
7+
var usersApi = require('../parts/mgmt/app_users.js');
88

9-
/** Class for the user mergind job **/
10-
class UserMergeJob extends job.Job {
11-
/**
12-
* Run the job
13-
* @param {Db} db connection
14-
* @param {done} done callback
15-
*/
16-
run(db, done) {
17-
log.d('Merging users ...');
18-
var startTime = moment().subtract(1, 'hour').startOf('hour');
19-
var endTime = moment(startTime).endOf("hour");
20-
log.d("query from", startTime, "to", endTime);
21-
/**
22-
* read historical merges for the lest hour and process them
23-
* @param {object} app - app db document
24-
* @param {object} callback - when procssing finished
25-
**/
26-
function handleMerge(app, callback) {
27-
db.collection('app_user_merges' + app._id).find({
28-
cd: {
29-
$gte: startTime.toDate(),
30-
$lte: endTime.toDate()
31-
}
32-
}).toArray(function(err, res) {
33-
if (!err && res && res.length) {
34-
log.d('Found merges for ' + app._id + ' ', res);
35-
var merged = [];
36-
async.eachSeries(res, function(user, done2) {
9+
var handleMerges = function(db, callback) {
10+
log.d('looking for unfinished merges ...');
11+
12+
var date = Math.round(new Date().getTime() / 1000) - 60;
13+
db.collection('app_user_merges').find({"lu": {"$lt": date}}).limit(100).toArray(function(err, mergedocs) {
14+
if (err) {
15+
callback(err);
16+
}
17+
if (mergedocs && mergedocs.length > 0) {
18+
log.d('found ' + mergedocs.length + ' unfinished merges');
19+
Promise.each(mergedocs, function(user) {
20+
return new Promise((resolve)=>{
21+
var dd = user._id.split("_");
22+
if (dd.length !== 3) {
23+
log.e("deleting unexpected document in merges with bad _id: " + user._id);
24+
db.collection('app_user_merges').remove({"_id": user._id}, (err2)=>{
25+
if (err2) {
26+
log.e("error deleting document in merges with bad _id: " + user._id);
27+
log.e(err2);
28+
}
29+
resolve();
30+
});
31+
}
32+
else if (user.t > 100) {
33+
log.e("deleting document in merges with too many retries: " + user._id);
34+
db.collection('app_user_merges').remove({"_id": user._id}, (err2)=>{
35+
if (err2) {
36+
log.e("error deleting document in merges with _id: " + user._id);
37+
log.e(err2);
38+
}
39+
resolve();
40+
});
41+
}
42+
else {
43+
var app_id = dd[0];
44+
var olduid = dd[2];
45+
//user docuument is not saved merged - try merginfg it at first
3746
if (user.merged_to) {
38-
merged.push(user._id);
39-
log.d('Dispatching', {
40-
app_id: app._id + "",
41-
oldUser: {uid: user._id},
42-
newUser: {uid: user.merged_to}
43-
});
44-
plugins.dispatch("/i/device_id", {
45-
app_id: app._id + "",
46-
oldUser: {uid: user._id},
47-
newUser: {uid: user.merged_to}
48-
}, function() {
49-
done2();
50-
});
51-
}
52-
else {
53-
done2();
54-
}
55-
}, function() {
56-
//delete merged users if they still exist
57-
if (merged.length) {
58-
db.collection("app_users" + app._id).remove({uid: {$in: merged}}, function() {
59-
callback();
60-
});
47+
if (!user.u) { //user documents are not merged. Could be just failed state.
48+
log.e("user doc not saved as merged. Processing it.");
49+
db.collection('app_users' + app_id).find({"uid": {"$in": [olduid, user.merged_to]}}).toArray((err5, docs)=>{
50+
if (err5) {
51+
log.e("error fetching users for merge", err5);
52+
resolve();
53+
return;
54+
}
55+
var oldAppUser;
56+
var newAppUser;
57+
for (var z = 0; z < docs.length;z++) {
58+
if (docs[z].uid === olduid) {
59+
oldAppUser = docs[z];
60+
}
61+
if (docs[z].uid === user.merged_to) {
62+
newAppUser = docs[z];
63+
}
64+
}
65+
if (!oldAppUser && newAppUser) {
66+
//old user was merged to new user, but state update failed - we can mark it as merged and process other plugins
67+
usersApi.mergeOtherPlugins(db, app_id, {uid: user.merged_to}, {uid: olduid}, {"mc": true, "cc": true, "u": true}, resolve);
68+
}
69+
if (!newAppUser) {
70+
//new user do not exists - we can delete merging record
71+
db.collection('app_user_merges').remove({"_id": user._id}, (err4)=>{
72+
if (err4) {
73+
log.e("error deleting document in merges with bad _id: " + user._id);
74+
log.e(err4);
75+
}
76+
resolve();
77+
});
78+
}
79+
else if (oldAppUser && newAppUser) {
80+
db.collection('app_user_merges').update({"_id": user._id}, {"$inc": {"t": 1}}, {upsert: false}, function(err0) {
81+
if (err0) {
82+
log.e(err0);
83+
}
84+
//Both documents exists. We can assume that documents were not merged
85+
plugins.dispatch("/i/user_merge", {
86+
app_id: app_id,
87+
newAppUser: newAppUser,
88+
oldAppUser: oldAppUser
89+
}, function() {
90+
//merge user data
91+
usersApi.mergeUserProperties(newAppUser, oldAppUser);
92+
//update new user
93+
94+
db.collection('app_users' + app_id).update({_id: newAppUser._id}, {'$set': newAppUser}, function(err6) {
95+
if (callback && typeof callback === 'function') {
96+
callback(null, newAppUser);//we do not return error as merge is already registred. Doc merging will be retried in job.
97+
}
98+
//Dispatch to other plugins only after callback.
99+
if (!err6) {
100+
//update metric changes document
101+
db.collection("metric_changes" + app_id).update({uid: oldAppUser.uid}, {'$set': {uid: newAppUser.uid}}, {multi: true}, function(err7) {
102+
if (err7) {
103+
log.e("Failed metric changes update in app_users merge", err7);
104+
}
105+
});
106+
//delete old app users document
107+
db.collection('app_users' + app_id).remove({_id: oldAppUser._id}, function(errRemoving) {
108+
if (errRemoving) {
109+
log.e("Failed to remove merged user from database", errRemoving);
110+
}
111+
else {
112+
usersApi.mergeOtherPlugins(db, app_id, newAppUser, oldAppUser, {"cc": true, "u": true}, resolve);
113+
}
114+
});
115+
}
116+
});
117+
});
118+
});
119+
}
120+
});
121+
}
122+
else if (!user.mc) { //documents are merged, but metric changes and other plugins are not yet
123+
db.collection('app_user_merges').update({"_id": user._id}, {"$inc": {"t": 1}}, {upsert: false}, function(err0) {
124+
if (err0) {
125+
log.e(err0);
126+
}
127+
db.collection("metric_changes" + app_id).update({uid: olduid}, {'$set': {uid: usersApi.merged_to}}, {multi: true}, function(err7) {
128+
if (err7) {
129+
log.e("Failed metric changes update in app_users merge", err7);
130+
}
131+
else {
132+
usersApi.mergeOtherPlugins(db, app_id, {uid: user.merged_to}, {uid: olduid}, {"cc": true, "mc": true}, resolve);
133+
}
134+
});
135+
});
136+
}
137+
else {
138+
usersApi.mergeOtherPlugins(db, app_id, {uid: user.merged_to}, {uid: olduid}, {"cc": true}, resolve);
139+
}
61140
}
62141
else {
63-
callback();
142+
resolve();
64143
}
65-
});
144+
}
145+
});
146+
}).then(()=>{
147+
if (mergedocs.length === 100) {
148+
setTimeout(()=>{
149+
handleMerges(db, callback);
150+
}, 0); //To do not grow stack.
66151
}
67152
else {
68153
callback();
69154
}
155+
}).catch((errThrown)=>{
156+
log.e("finished with errors");
157+
log.e(errThrown);
158+
callback(errThrown);
70159
});
71160
}
72-
db.collection('apps').find({}, {_id: 1}).toArray(function(err, apps) {
73-
if (!err && apps && apps.length) {
74-
async.eachSeries(apps, handleMerge, function() {
75-
log.d('Merging users finished ...');
76-
done();
77-
});
78-
}
79-
else {
80-
done(err);
81-
}
161+
else {
162+
log.d('all users merged');
163+
callback();
164+
}
165+
});
166+
};
167+
/** Class for the user mergind job **/
168+
class UserMergeJob extends job.Job {
169+
/**
170+
* Run the job
171+
* @param {Db} db connection
172+
* @param {done} done callback
173+
*/
174+
run(db, done) {
175+
log.d('finishing up not finished merges merges...');
176+
handleMerges(db, ()=>{
177+
done();
82178
});
83179
}
84180
}

0 commit comments

Comments
 (0)