Skip to content

Commit 111b7d9

Browse files
committed
added possibility to getUndispatchedEvents by query
1 parent d606a44 commit 111b7d9

12 files changed

+192
-45
lines changed

.eslintrc

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ globals:
3636
jQuery: false
3737

3838
rules:
39-
# CUSTOM RULES
40-
no-only-in-tests: 2
41-
4239
# ERRORS
4340
no-unused-vars: [2, {vars: all, args: none}]
4441
curly: [2, "multi-line"]
@@ -79,17 +76,3 @@ rules:
7976
eol-last: 0
8077
no-trailing-spaces: 0
8178
indent: 0
82-
83-
# REACT
84-
react/jsx-no-undef: 2
85-
react/jsx-uses-vars: 1
86-
react/jsx-quotes: 1
87-
react/jsx-uses-react: 1
88-
react/no-did-mount-set-state: 1
89-
react/no-did-update-set-state: 1
90-
react/prop-types: 1
91-
react/react-in-jsx-scope: 1
92-
react/self-closing-comp: 1
93-
react/no-multi-comp: 0
94-
react/wrap-multilines: 1
95-
react/display-name: 0

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,12 @@ create a snapshot point
282282
## own event dispatching (no event publisher function defined)
283283

284284
es.getUndispatchedEvents(function(err, evts) {
285+
// or es.getUndispatchedEvents('streamId', function(err, evts) {
286+
// or es.getUndispatchedEvents({ // free choice (all, only context, only aggregate, only aggregateId...)
287+
// context: 'hr',
288+
// aggregate: 'person',
289+
// aggregateId: 'uuid'
290+
// }, function(err, evts) {
285291

286292
// all undispatched events
287293
console.log(evts);
@@ -516,5 +522,3 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
516522
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
517523
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
518524
THE SOFTWARE.
519-
520-

lib/base.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,11 @@ _.extend(Store.prototype, {
117117

118118
/**
119119
* loads all undispatched events
120+
* @param {Object} query the query object [optional]
120121
* @param {Function} callback the function that will be called when this action has finished
121122
* `function(err, events){}`
122123
*/
123-
getUndispatchedEvents: function (callback) {
124+
getUndispatchedEvents: function (query, callback) {
124125
implementError(callback);
125126
},
126127

lib/databases/azuretable.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ _.extend(AzureTable.prototype, {
334334
});
335335
},
336336

337-
getUndispatchedEvents: function (callback) {
337+
getUndispatchedEvents: function (query, callback) {
338338

339339
var self = this;
340340
var tableQuery = new azure.TableQuery();

lib/databases/inmemory.js

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ function InMemory(options) {
1010
Store.call(this, options);
1111
this.store = {};
1212
this.snapshots = {};
13-
this.undispatchedEvents = {};
13+
this.undispatchedEvents = { _direct: {} };
1414
}
1515

1616
util.inherits(InMemory, Store);
@@ -57,7 +57,7 @@ _.extend(InMemory.prototype, {
5757
clear: function (callback) {
5858
this.store = {};
5959
this.snapshots = {};
60-
this.undispatchedEvents = {};
60+
this.undispatchedEvents = { _direct: {} };
6161
if (callback) callback(null);
6262
},
6363

@@ -84,14 +84,17 @@ _.extend(InMemory.prototype, {
8484

8585
this.store[context] = this.store[context] || {};
8686
this.store[context][aggregate] = this.store[context][aggregate] || {};
87-
8887
this.store[context][aggregate][aggregateId] = this.store[context][aggregate][aggregateId] || [];
89-
9088
this.store[context][aggregate][aggregateId] = this.store[context][aggregate][aggregateId].concat(events);
9189

90+
this.undispatchedEvents[context] = this.undispatchedEvents[context] || {};
91+
this.undispatchedEvents[context][aggregate] = this.undispatchedEvents[context][aggregate] || {};
92+
this.undispatchedEvents[context][aggregate][aggregateId] = this.undispatchedEvents[context][aggregate][aggregateId] || [];
93+
this.undispatchedEvents[context][aggregate][aggregateId] = this.undispatchedEvents[context][aggregate][aggregateId].concat(events);
94+
9295
var self = this;
9396
_.forEach(events, function(evt) {
94-
self.undispatchedEvents[evt.id] = evt;
97+
self.undispatchedEvents._direct[evt.id] = evt;
9598
});
9699

97100
callback(null);
@@ -250,16 +253,53 @@ _.extend(InMemory.prototype, {
250253
}
251254
},
252255

253-
getUndispatchedEvents: function (callback) {
254-
var res = _.map(this.undispatchedEvents, function(value, key) {
255-
return value;
256+
getUndispatchedEvents: function (query, callback) {
257+
var res = [];
258+
for (var s in this.undispatchedEvents) {
259+
if (s === '_direct') continue;
260+
for (var ss in this.undispatchedEvents[s]) {
261+
for (var sss in this.undispatchedEvents[s][ss]) {
262+
res = res.concat(this.undispatchedEvents[s][ss][sss]);
263+
}
264+
}
265+
}
266+
267+
res = _.sortBy(res, function (e) {
268+
return e.commitStamp.getTime();
256269
});
257270

271+
if (!_.isEmpty(query)) {
272+
res = _.filter(res, function(e) {
273+
var keys = _.keys(query);
274+
var values = _.values(query);
275+
var found = false;
276+
for (var i in keys) {
277+
var key = keys[i];
278+
var deepFound = deepFind(e, key);
279+
if (_.isArray(deepFound) && deepFound.length > 0) {
280+
found = true;
281+
} else if (deepFound === values[i]) {
282+
found = true;
283+
} else {
284+
found = false;
285+
break;
286+
}
287+
}
288+
return found;
289+
});
290+
}
291+
258292
callback(null, res);
259293
},
260294

261295
setEventToDispatched: function (id, callback) {
262-
delete this.undispatchedEvents[id];
296+
var evt = this.undispatchedEvents._direct[id];
297+
var aggregateId = evt.aggregateId;
298+
var aggregate = evt.aggregate || '_general';
299+
var context = evt.context || '_general';
300+
301+
this.undispatchedEvents[context][aggregate][aggregateId] = _.reject(this.undispatchedEvents[context][aggregate][aggregateId], evt);
302+
delete this.undispatchedEvents._direct[id];
263303
callback(null);
264304
},
265305

lib/databases/mongodb.js

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,8 +318,24 @@ _.extend(Mongo.prototype, {
318318
});
319319
},
320320

321-
getUndispatchedEvents: function (callback) {
322-
this.events.find({ 'dispatched' : false }, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).toArray(callback);
321+
getUndispatchedEvents: function (query, callback) {
322+
var findStatement = {
323+
dispatched: false
324+
};
325+
326+
if (query && query.aggregate) {
327+
findStatement.aggregate = query.aggregate;
328+
}
329+
330+
if (query && query.context) {
331+
findStatement.context = query.context;
332+
}
333+
334+
if (query && query.aggregateId) {
335+
findStatement.aggregateId = query.aggregateId;
336+
}
337+
338+
this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).toArray(callback);
323339
},
324340

325341
setEventToDispatched: function (id, callback) {

lib/databases/redis.js

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ function Redis(options) {
2828
var url = require('url').parse(options.url);
2929
if (url.protocol === 'redis:') {
3030
if (url.auth) {
31-
var userparts = url.auth.split(":");
31+
var userparts = url.auth.split(':');
3232
options.user = userparts[0];
3333
if (userparts.length === 2) {
3434
options.password = userparts[1];
@@ -37,7 +37,7 @@ function Redis(options) {
3737
options.host = url.hostname;
3838
options.port = url.port;
3939
if (url.pathname) {
40-
options.db = url.pathname.replace("/", "", 1);
40+
options.db = url.pathname.replace('/', '', 1);
4141
}
4242
}
4343
}
@@ -378,12 +378,22 @@ _.extend(Redis.prototype, {
378378
);
379379
},
380380

381-
getUndispatchedEvents: function (callback) {
381+
getUndispatchedEvents: function (query, callback) {
382382
var self = this;
383383

384+
var aggregateId = '*';
385+
var aggregate = '*';
386+
var context = '*';
387+
388+
if (query) {
389+
aggregateId = query.aggregateId || '*';
390+
aggregate = query.aggregate || '*';
391+
context = query.context || '*';
392+
}
393+
384394
var evts = [];
385395

386-
this.scan(this.options.prefix + ':undispatched_' + this.options.eventsCollectionName + ':*:*:*:*:*:*',
396+
this.scan(this.options.prefix + ':undispatched_' + this.options.eventsCollectionName + ':*:*:' + context + ':' + aggregate + ':' + aggregateId + ':*',
387397
function (keys, fn) {
388398
var args = keys.concat(function (err, res) {
389399
handleResultSet(err, res, function (err, events) {

lib/databases/tingodb.js

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,24 @@ _.extend(Tingo.prototype, {
246246
});
247247
},
248248

249-
getUndispatchedEvents: function (callback) {
250-
this.events.find({ 'dispatched' : false }, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).toArray(callback);
249+
getUndispatchedEvents: function (query, callback) {
250+
var findStatement = {
251+
dispatched: false
252+
};
253+
254+
if (query && query.aggregate) {
255+
findStatement.aggregate = query.aggregate;
256+
}
257+
258+
if (query && query.context) {
259+
findStatement.context = query.context;
260+
}
261+
262+
if (query && query.aggregateId) {
263+
findStatement.aggregateId = query.aggregateId;
264+
}
265+
266+
this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).toArray(callback);
251267
},
252268

253269
setEventToDispatched: function (id, callback) {

lib/eventstore.js

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -435,11 +435,21 @@ _.extend(Eventstore.prototype, {
435435

436436
/**
437437
* loads all undispatched events
438-
* @param {Function} callback the function that will be called when this action has finished
439-
* `function(err, events){}`
438+
* @param {Object || String} query the query object [optional]
439+
* @param {Function} callback the function that will be called when this action has finished
440+
* `function(err, events){}`
440441
*/
441-
getUndispatchedEvents: function (callback) {
442-
this.store.getUndispatchedEvents(callback);
442+
getUndispatchedEvents: function (query, callback) {
443+
if (!callback) {
444+
callback = query;
445+
query = null;
446+
}
447+
448+
if (typeof query === 'string') {
449+
query = { aggregateId: query };
450+
}
451+
452+
this.store.getUndispatchedEvents(query, callback);
443453
},
444454

445455
/**

releasenotes.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
#### [v1.5.0](https://github.com/adrai/node-eventstore/compare/v1.4.2...v1.5.0)
2+
- added possibility to getUndispatchedEvents by query
3+
14
#### [v1.4.2](https://github.com/adrai/node-eventstore/compare/v1.4.1...v1.4.2)
25
- optimization for `npm link`'ed development
36

test/eventstoreTest.js

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,6 +1116,70 @@ describe('eventstore', function () {
11161116

11171117
});
11181118

1119+
describe('requesting all undispatched events by streamId', function () {
1120+
1121+
it('it should return the correct events', function (done) {
1122+
1123+
es.getUndispatchedEvents('myAggId2', function (err, evts) {
1124+
expect(err).not.to.be.ok();
1125+
expect(evts.length).to.eql(5);
1126+
1127+
done();
1128+
});
1129+
1130+
});
1131+
1132+
});
1133+
1134+
describe('requesting all undispatched events by query', function () {
1135+
1136+
describe('aggregateId', function () {
1137+
1138+
it('it should return the correct events', function (done) {
1139+
1140+
es.getUndispatchedEvents({ aggregateId: 'myAggId' }, function (err, evts) {
1141+
expect(err).not.to.be.ok();
1142+
expect(evts.length).to.eql(3);
1143+
1144+
done();
1145+
});
1146+
1147+
});
1148+
1149+
});
1150+
1151+
describe('aggregate', function () {
1152+
1153+
it('it should return the correct events', function (done) {
1154+
1155+
es.getUndispatchedEvents({ aggregate: 'myAgg' }, function (err, evts) {
1156+
expect(err).not.to.be.ok();
1157+
expect(evts.length).to.eql(8);
1158+
1159+
done();
1160+
});
1161+
1162+
});
1163+
1164+
});
1165+
1166+
describe('context', function () {
1167+
1168+
it('it should return the correct events', function (done) {
1169+
1170+
es.getUndispatchedEvents({ context: 'myCont' }, function (err, evts) {
1171+
expect(err).not.to.be.ok();
1172+
expect(evts.length).to.eql(8);
1173+
1174+
done();
1175+
});
1176+
1177+
});
1178+
1179+
});
1180+
1181+
});
1182+
11191183
describe('setting an event to dispatched', function () {
11201184

11211185
it('it should work correctly', function (done) {

test/storeTest.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2293,7 +2293,7 @@ types.forEach(function (type) {
22932293

22942294
it('it should return the correct events', function (done) {
22952295

2296-
store.getUndispatchedEvents(function (err, evts) {
2296+
store.getUndispatchedEvents(null, function (err, evts) {
22972297
expect(err).not.to.be.ok();
22982298
expect(evts.length).to.eql(2);
22992299
expect(evts[0].id).to.eql(stream[0].id);
@@ -2313,7 +2313,7 @@ types.forEach(function (type) {
23132313
describe('calling setEventToDispatched', function () {
23142314

23152315
beforeEach(function (done) {
2316-
store.getUndispatchedEvents(function (err, evts) {
2316+
store.getUndispatchedEvents(null, function (err, evts) {
23172317
expect(evts.length).to.eql(2);
23182318
done();
23192319
});
@@ -2324,7 +2324,7 @@ types.forEach(function (type) {
23242324
store.setEventToDispatched('119', function (err) {
23252325
expect(err).not.to.be.ok();
23262326

2327-
store.getUndispatchedEvents(function (err, evts) {
2327+
store.getUndispatchedEvents(null, function (err, evts) {
23282328
expect(err).not.to.be.ok();
23292329
expect(evts.length).to.eql(1);
23302330
expect(evts[0].commitId).to.eql(stream[1].commitId);

0 commit comments

Comments
 (0)