Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

getLastEventOfEachAggregate, onBeforeSet middleware #90

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,18 @@ It can be very useful as eventdenormalizer component if you work with (d)ddd, cq
startRevisionNumber: 1, // optional, if defined the denormaizer waits for an event with that revision to be used as first event

type: 'redis',
host: 'localhost', // optional
port: 6379, // optional
db: 0, // optional
prefix: 'readmodel_revision', // optional
timeout: 10000 // optional
// password: 'secret' // optional
host: 'localhost', // optional
port: 6379, // optional
db: 0, // optional
prefix: 'readmodel_revision', // optional
timeout: 10000 // optional
// password: 'secret', // optional
middlewares: { // optional
onBeforeSet: (evt, next) => {
var occuredAt = dotty.get(evt, 'occuredAt');
next({ occuredAt }); // the value passed will be stored within the rev guard, next to the revision
} // optional
},
},
skipExtendEvent: false, // optional
skipOnEventMissing: false, // optional
Expand Down Expand Up @@ -313,7 +319,7 @@ The values describes the path to that property in the notification message.
callback(null, evt);
});

### skip default event extensions
### skip default event extensions
You can skip all event extenders and the default extensions from being executed by adding the option `skipExtendEvent` to the denormalizer. Checkout the usage section for more information.


Expand All @@ -331,7 +337,7 @@ To do that, you need to include a loading method in the options object passed to
collection.addViewBuilder(new options.definitions.ViewBuilder({
name: 'evt',
aggregate: 'agg',
context: 'ctx'
context: 'ctx'
}, function() {}));
return {
collections: [
Expand Down Expand Up @@ -661,7 +667,7 @@ A lot of viewmodels can slow down the denormalization process!
// or
//.useAsId(function (evt, callback) {
// callback(null, 'newId');
//});
//});
// optional define a function that returns a query that will be used as query to find the viewmodels (but do not define the query in the options)
//.useAsQuery(function (evt) {
// return { my: evt.payload.my };
Expand Down Expand Up @@ -801,8 +807,8 @@ A lot of viewmodels can slow down the denormalization process!
// or
//.useAsId(function (evt, callback) {
// callback(null, 'newId');
//});
//});


### not for a collection

Expand Down Expand Up @@ -890,6 +896,22 @@ or depending on the last guarded event:

});

or depending on the last guarded event per aggregate:

denorm.getLastEventOfEachAggregate((err, aggregateHandleFns) => {

aggregateHandleFns.forEach(handleFn => {

handleFn((err, data) => {
console.log('id', data.id); // id that is stored within the revision guard for the aggregate
console.log('value', data.value); // value that is stored within the revision guard for the aggregate
});

});

});


### streamed

denormalizer.replayStreamed(function (replay, done) {
Expand All @@ -914,7 +936,7 @@ Importing ES6 style default exports is supported for all definitions where you a
```
module.exports = defineCollection({...});
```
works as well as
works as well as
```
exports.default = defineCollection({...});
```
Expand All @@ -923,7 +945,7 @@ as well as (must be transpiled by babel or tsc to be runnable in node)
export default defineCollection({...});
```

Also:
Also:
```
exports.default = defineViewBuilder({...});
exports.default = defineEventExtender({...});
Expand Down
19 changes: 16 additions & 3 deletions lib/denormalizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ function Denormalizer(options) {

var defaultRevOpt = {
queueTimeout: 1000,
queueTimeoutMaxLoops: 3//,
queueTimeoutMaxLoops: 3, //,
// startRevisionNumber: 1
middlewares: {
onBeforeSet: null //(evt, currentRev, next) => {}
}
};

options.revisionGuard = options.revisionGuard || {};
Expand Down Expand Up @@ -493,7 +496,7 @@ _.extend(Denormalizer.prototype, {
return res;
}

this.revisionGuardStore.get(evtPayload.reason.aggregateId, function (err, rev) {
this.revisionGuardStore.get(this.options.revisionGuard.prefix, evtPayload.reason.aggregateId, function (err, rev) {
if (err) {
debug(err);
if (callback) {
Expand Down Expand Up @@ -714,7 +717,17 @@ _.extend(Denormalizer.prototype, {
* `function(err, evt){}` evt is of type Object.
*/
getLastEvent: function (callback) {
this.revisionGuardStore.getLastEvent(callback);
this.revisionGuardStore.getLastEvent(this.options.revisionGuard.prefix, callback);
},

/**
* Returns an array of functions - for each aggregate one - which on invocation will return the saved value within the
* revision guard. To save a different value from an event than just the revision, the onBeforeSet middleware can be used.
* @param {Function} callback The function, that will be called when this action is completed.
* `function(err, aggregateHandleFns){}` aggregateHandleFns is of type Array.
*/
getLastEventOfEachAggregate: function (callback) {
this.revisionGuardStore.getValueOfEachId(this.options.revisionGuard.prefix, callback);
},

/**
Expand Down
67 changes: 48 additions & 19 deletions lib/replayHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ ReplayHandler.prototype = {
}, callback);
},
function (callback) {
self.store.clear(callback);
self.store.clear(self.options.revisionGuard.prefix, callback);
}
], callback);
},
Expand All @@ -68,39 +68,66 @@ ReplayHandler.prototype = {
* @returns {string}
*/
getConcatenatedId: function (evt) {
var aggregateId = '';
if (dotty.exists(evt, this.definition.aggregateId)) {
aggregateId = dotty.get(evt, this.definition.aggregateId);
var concatenatedId = '';
if (dotty.exists(evt, this.definition.context)) {
context = dotty.get(evt, this.definition.context);
concatenatedId = 'ctx:' + context;
}

var aggregate = '';
if (dotty.exists(evt, this.definition.aggregate)) {
aggregate = dotty.get(evt, this.definition.aggregate);
concatenatedId = concatenatedId ? (concatenatedId + '::') : concatenatedId;
concatenatedId = concatenatedId + 'agg:' + aggregate;
}

var context = '';
if (dotty.exists(evt, this.definition.context)) {
context = dotty.get(evt, this.definition.context);
if (dotty.exists(evt, this.definition.aggregateId)) {
aggregateId = dotty.get(evt, this.definition.aggregateId);
concatenatedId = concatenatedId ? (concatenatedId + '::') : concatenatedId;
concatenatedId = concatenatedId + 'aggId:' + aggregateId;
}

return context + aggregate + aggregateId;
return concatenatedId;
},

/**
* Updates the revision in the store.
* @param {Object} revisionMap The revision map.
* @param {Object} dataMap The data from the revision guard map.
* @param {Function} callback The function, that will be called when this action is completed.
* `function(err){}`
*/
updateRevision: function (revisionMap, callback) {
updateRevision: function (dataMap, callback) {
var self = this;
var ids = _.keys(revisionMap);
var ids = _.keys(dataMap);

async.each(ids, function (id, callback) {
self.store.get(id, function (err, rev) {
self.store.get(self.options.revisionGuard.prefix, id, function (err, data) {
if (err) {
return callback(err);
}
self.store.set(id, revisionMap[id] + 1, rev, callback);

var nextCalled = false;
var rev = (data && data.revision) || data;
var evt = dataMap[id]['evt'];
var revInEvt = dataMap[id]['revision'];

function next(revInEvt, oldRev, data) {
nextCalled = true;
self.store.set(self.options.revisionGuard.prefix, id, data, revInEvt + 1, oldRev, callback);
}

var onBeforeSetMiddleware = (
self.options &&
self.options.revisionGuard &&
self.options.revisionGuard.middlewares &&
self.options.revisionGuard.middlewares.onBeforeSet) || null;
if (!onBeforeSetMiddleware) {
next(revInEvt, rev, null);
} else {
onBeforeSetMiddleware(evt, next.bind(null, revInEvt, rev));
if (nextCalled === false) {
callback('next within the middleware onBeforeSet, never got called. You must call it.');
}
}
});
}, callback);
},
Expand Down Expand Up @@ -136,7 +163,7 @@ ReplayHandler.prototype = {
var doneCalled = false;
var doneClb = null;

var revisionMap = {};
var dataMap = {};
var collections = {};

var lastEvent;
Expand All @@ -150,11 +177,13 @@ ReplayHandler.prototype = {
if (!!self.definition.revision && dotty.exists(evt, self.definition.revision) &&
!!self.definition.aggregateId && dotty.exists(evt, self.definition.aggregateId)) {
concatenatedId = self.getConcatenatedId(evt);
revisionMap[concatenatedId] = dotty.get(evt, self.definition.revision);
dataMap[concatenatedId] = dataMap[concatenatedId] || {};
dataMap[concatenatedId]['evt'] = evt;
dataMap[concatenatedId]['revision'] = dotty.get(evt, self.definition.revision);
}

var concatenatedWithEventId = evtId;
if (concatenatedId) concatenatedWithEventId = concatenatedId + ':' + evtId;
if (concatenatedId) concatenatedWithEventId = concatenatedId + '::' + 'evtId:' + evtId;
if (seenEvents[concatenatedWithEventId]) return;
seenEvents[concatenatedWithEventId] = true;

Expand Down Expand Up @@ -254,10 +283,10 @@ ReplayHandler.prototype = {
}, callback);
},
function (callback) {
self.updateRevision(revisionMap, callback);
self.updateRevision(dataMap, callback);
},
function (callback) {
self.store.saveLastEvent(lastEvent, callback);
self.store.saveLastEvent(self.options.revisionGuard.prefix, lastEvent, callback);
}
], function (err) {
seenEvents = {};
Expand Down
Loading