Skip to content

Commit dedd470

Browse files
committed
optimize handling of guarding the first events
1 parent 8829cfd commit dedd470

File tree

4 files changed

+127
-26
lines changed

4 files changed

+127
-26
lines changed

.eslintignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
test/*

lib/revisionGuard.js

+13-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
var debug = require('debug')('saga:revisionGuard'),
44
_ = require('lodash'),
5-
async = require('async'),
65
Queue = require('./orderQueue'),
76
ConcurrencyError = require('./errors/concurrencyError'),
87
AlreadyHandledError = require('./errors/alreadyHandledError'),
@@ -252,30 +251,37 @@ RevisionGuard.prototype = {
252251
var concatenatedId = this.getConcatenatedId(evt);
253252

254253
function proceed (revInStore) {
255-
if (!revInStore) {
254+
if (!revInStore && !self.currentHandlingRevisions[concatenatedId]) {
255+
self.currentHandlingRevisions[concatenatedId] = revInEvt;
256256
debug('first revision to store [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt);
257257
callback(null, function (clb) {
258258
self.finishGuard(evt, revInStore, clb);
259259
});
260260
return;
261261
}
262262

263-
if (revInEvt < revInStore) {
264-
debug('event already denormalized [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
263+
if (revInStore && revInEvt < revInStore) {
264+
debug('event already handled [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
265265
callback(new AlreadyHandledError('Event: [id]=' + dotty.get(evt, self.definition.id) + ', [revision]=' + revInEvt + ', [concatenatedId]=' + concatenatedId + ' already handled!'), function (clb) {
266266
clb(null);
267267
});
268268
return;
269269
}
270270

271-
if (revInEvt > revInStore) {
271+
if (revInStore && revInEvt > revInStore) {
272272
debug('queue event [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
273273
self.queueEvent(evt, callback);
274274
return;
275275
}
276276

277+
if (!revInStore && revInEvt > self.currentHandlingRevisions[concatenatedId]) {
278+
debug('queue event [concatenatedId]=' + concatenatedId + ', [currentlyHandling]=' + self.currentHandlingRevisions[concatenatedId] + ', [revInEvt]=' + revInEvt);
279+
self.queueEvent(evt, callback);
280+
return;
281+
}
282+
277283
if (self.currentHandlingRevisions[concatenatedId] >= revInEvt) {
278-
debug('event already denormalizing [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
284+
debug('event already handling [concatenatedId]=' + concatenatedId + ', [revInStore]=' + revInStore + ', [revInEvt]=' + revInEvt);
279285
callback(new AlreadyHandlingError('Event: [id]=' + dotty.get(evt, self.definition.id) + ', [revision]=' + revInEvt + ', [concatenatedId]=' + concatenatedId + ' already handling!'), function (clb) {
280286
clb(null);
281287
});
@@ -322,7 +328,7 @@ RevisionGuard.prototype = {
322328
return callback(err);
323329
}
324330

325-
if (!revInStore && revInEvt !== 1) {
331+
if (!revInStore && revInEvt !== 1 && !self.currentHandlingRevisions[concatenatedId]) {
326332
var max = (self.options.queueTimeout * self.options.queueTimeoutMaxLoops) / 3;
327333
max = max < 10 ? 10 : max;
328334
retry(max, self.options.queueTimeoutMaxLoops);

releasenotes.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## [v1.6.11](https://github.com/adrai/node-cqrs-saga/compare/v1.6.10...v1.6.11)
2+
- optimize handling of guarding the first events
3+
14
## [v1.6.10](https://github.com/adrai/node-cqrs-saga/compare/v1.6.9...v1.6.10)
25
- remove trycatch dependency due to memory leaks
36

test/unit/revisionGuardTest.js

+110-19
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ var expect = require('expect.js'),
44
revGuardStore = require('../../lib/revisionGuardStore');
55

66
describe('revisionGuard', function () {
7-
7+
88
var store;
9-
9+
1010
before(function (done) {
1111
revGuardStore.create(function (err, s) {
1212
store = s;
1313
done();
14-
})
14+
});
1515
});
1616

1717
describe('creating a new guard', function () {
@@ -122,9 +122,9 @@ describe('revisionGuard', function () {
122122
});
123123

124124
});
125-
125+
126126
describe('guarding an event', function () {
127-
127+
128128
var guard;
129129

130130
var evt1 = {
@@ -162,7 +162,7 @@ describe('revisionGuard', function () {
162162
},
163163
revision: 3
164164
};
165-
165+
166166
before(function () {
167167
guard = new RevisionGuard(store, { queueTimeout: 200 });
168168
guard.defineEvent({
@@ -178,36 +178,36 @@ describe('revisionGuard', function () {
178178
meta: 'meta'
179179
});
180180
});
181-
181+
182182
beforeEach(function (done) {
183183
guard.currentHandlingRevisions = {};
184184
store.clear(done);
185185
});
186-
186+
187187
describe('in correct order', function () {
188-
188+
189189
it('it should work as expected', function (done) {
190-
190+
191191
var guarded = 0;
192-
192+
193193
function check () {
194194
guarded++;
195-
195+
196196
if (guarded === 3) {
197197
done();
198198
}
199199
}
200-
200+
201201
guard.guard(evt1, function (err, finish) {
202202
expect(err).not.to.be.ok();
203-
203+
204204
finish(function (err) {
205205
expect(err).not.to.be.ok();
206206
expect(guarded).to.eql(0);
207207
check();
208208
});
209209
});
210-
210+
211211
setTimeout(function () {
212212
guard.guard(evt2, function (err, finish) {
213213
expect(err).not.to.be.ok();
@@ -231,9 +231,100 @@ describe('revisionGuard', function () {
231231
});
232232
});
233233
}, 20);
234-
234+
235235
});
236-
236+
237+
describe('but with slow beginning events', function () {
238+
239+
var specialGuard;
240+
241+
before(function () {
242+
specialGuard = new RevisionGuard(store, { queueTimeout: 2000, queueTimeoutMaxLoops: 15 });
243+
specialGuard.defineEvent({
244+
correlationId: 'correlationId',
245+
id: 'id',
246+
payload: 'payload',
247+
name: 'name',
248+
aggregateId: 'aggregate.id',
249+
aggregate: 'aggregate.name',
250+
context: 'context.name',
251+
revision: 'revision',
252+
version: 'version',
253+
meta: 'meta'
254+
});
255+
});
256+
257+
beforeEach(function (done) {
258+
specialGuard.currentHandlingRevisions = {};
259+
store.clear(done);
260+
});
261+
262+
it('it should work as expected', function (done) {
263+
264+
var guarded = 0;
265+
266+
function check () {
267+
guarded++;
268+
269+
if (guarded === 3) {
270+
done();
271+
}
272+
}
273+
274+
var start1 = Date.now();
275+
specialGuard.guard(evt1, function (err, finish1) {
276+
var diff1 = Date.now() - start1;
277+
console.log('guarded 1: ' + diff1);
278+
expect(err).not.to.be.ok();
279+
280+
setTimeout(function () {
281+
start1 = Date.now();
282+
finish1(function (err) {
283+
diff1 = Date.now() - start1;
284+
console.log('finished 1: ' + diff1);
285+
expect(err).not.to.be.ok();
286+
expect(guarded).to.eql(0);
287+
check();
288+
});
289+
}, 250);
290+
});
291+
292+
var start2 = Date.now();
293+
specialGuard.guard(evt2, function (err, finish2) {
294+
var diff2 = Date.now() - start2;
295+
console.log('guarded 2: ' + diff2);
296+
expect(err).not.to.be.ok();
297+
298+
start2 = Date.now();
299+
finish2(function (err) {
300+
diff2 = Date.now() - start2;
301+
console.log('finished 2: ' + diff2);
302+
expect(err).not.to.be.ok();
303+
expect(guarded).to.eql(1);
304+
check();
305+
});
306+
});
307+
308+
var start3 = Date.now();
309+
specialGuard.guard(evt3, function (err, finish3) {
310+
var diff3 = Date.now() - start3;
311+
console.log('guarded 3: ' + diff3);
312+
expect(err).not.to.be.ok();
313+
314+
start3 = Date.now();
315+
finish3(function (err) {
316+
diff3 = Date.now() - start3;
317+
console.log('finished 3: ' + diff3);
318+
expect(err).not.to.be.ok();
319+
expect(guarded).to.eql(2);
320+
check();
321+
});
322+
});
323+
324+
});
325+
326+
});
327+
237328
});
238329

239330
describe('in wrong order', function () {
@@ -293,7 +384,7 @@ describe('revisionGuard', function () {
293384
expect(err).to.be.ok();
294385
expect(err.name).to.eql('AlreadyHandledError');
295386
expect(guarded).to.eql(3);
296-
387+
297388
guard.guard(evt3, function (err) {
298389
expect(err).to.be.ok();
299390
expect(err.name).to.eql('AlreadyHandledError');
@@ -362,7 +453,7 @@ describe('revisionGuard', function () {
362453
});
363454

364455
});
365-
456+
366457
});
367458

368459
});

0 commit comments

Comments
 (0)