Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

getLastEventOfEachAggregate, onBeforeSet middleware #90

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,18 @@ It can be very useful as eventdenormalizer component if you work with (d)ddd, cq
startRevisionNumber: 1, // optional, if defined the denormaizer waits for an event with that revision to be used as first event

type: 'redis',
host: 'localhost', // optional
port: 6379, // optional
db: 0, // optional
prefix: 'readmodel_revision', // optional
timeout: 10000 // optional
// password: 'secret' // optional
host: 'localhost', // optional
port: 6379, // optional
db: 0, // optional
prefix: 'readmodel_revision', // optional
timeout: 10000 // optional
// password: 'secret', // optional
middlewares: { // optional
onBeforeSet: (evt, next) => {
var occuredAt = dotty.get(evt, 'occuredAt');
next({ occuredAt }); // the value passed will be stored within the rev guard, next to the revision
} // optional
},
},
skipExtendEvent: false, // optional
skipOnEventMissing: false, // optional
Expand Down Expand Up @@ -313,7 +319,7 @@ The values describes the path to that property in the notification message.
callback(null, evt);
});

### skip default event extensions
### skip default event extensions
You can skip all event extenders and the default extensions from being executed by adding the option `skipExtendEvent` to the denormalizer. Checkout the usage section for more information.


Expand All @@ -331,7 +337,7 @@ To do that, you need to include a loading method in the options object passed to
collection.addViewBuilder(new options.definitions.ViewBuilder({
name: 'evt',
aggregate: 'agg',
context: 'ctx'
context: 'ctx'
}, function() {}));
return {
collections: [
Expand Down Expand Up @@ -661,7 +667,7 @@ A lot of viewmodels can slow down the denormalization process!
// or
//.useAsId(function (evt, callback) {
// callback(null, 'newId');
//});
//});
// optional define a function that returns a query that will be used as query to find the viewmodels (but do not define the query in the options)
//.useAsQuery(function (evt) {
// return { my: evt.payload.my };
Expand Down Expand Up @@ -801,8 +807,8 @@ A lot of viewmodels can slow down the denormalization process!
// or
//.useAsId(function (evt, callback) {
// callback(null, 'newId');
//});
//});


### not for a collection

Expand Down Expand Up @@ -914,7 +920,7 @@ Importing ES6 style default exports is supported for all definitions where you a
```
module.exports = defineCollection({...});
```
works as well as
works as well as
```
exports.default = defineCollection({...});
```
Expand All @@ -923,7 +929,7 @@ as well as (must be transpiled by babel or tsc to be runnable in node)
export default defineCollection({...});
```

Also:
Also:
```
exports.default = defineViewBuilder({...});
exports.default = defineEventExtender({...});
Expand Down
12 changes: 9 additions & 3 deletions lib/denormalizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ function Denormalizer(options) {

var defaultRevOpt = {
queueTimeout: 1000,
queueTimeoutMaxLoops: 3//,
queueTimeoutMaxLoops: 3, //,
// startRevisionNumber: 1
middlewares: {
onBeforeSet: null //(evt, currentRev, next) => {}
}
};

options.revisionGuard = options.revisionGuard || {};
Expand Down Expand Up @@ -493,7 +496,7 @@ _.extend(Denormalizer.prototype, {
return res;
}

this.revisionGuardStore.get(evtPayload.reason.aggregateId, function (err, rev) {
this.revisionGuardStore.get(this.options.revisionGuard.prefix, evtPayload.reason.aggregateId, function (err, rev) {
if (err) {
debug(err);
if (callback) {
Expand Down Expand Up @@ -714,9 +717,12 @@ _.extend(Denormalizer.prototype, {
* `function(err, evt){}` evt is of type Object.
*/
getLastEvent: function (callback) {
this.revisionGuardStore.getLastEvent(callback);
this.revisionGuardStore.getLastEvent(this.options.revisionGuard.prefix, callback);
},

getLastEventOfEachAggregate: function (callback = (err, aggregateHandleFns) => {}) {
this.revisionGuardStore.getValueOfEachKey(this.options.revisionGuard.prefix, callback);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idk, getValueOfEachKey sounds very specific to the redis implementation...
What about mongodb, etc...?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how would you like getValueOfEachId?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nanov Any better idea?

Copy link
Contributor Author

@robinfehr robinfehr Jun 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will now add the getValueOfEachId version and update the PR :)

},
/**
* Clears all collections and the revisionGuardStore.
* @param {Function} callback The function, that will be called when this action is completed.
Expand Down
67 changes: 48 additions & 19 deletions lib/replayHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ ReplayHandler.prototype = {
}, callback);
},
function (callback) {
self.store.clear(callback);
self.store.clear(self.options.revisionGuard.prefix, callback);
}
], callback);
},
Expand All @@ -68,39 +68,66 @@ ReplayHandler.prototype = {
* @returns {string}
*/
getConcatenatedId: function (evt) {
var aggregateId = '';
if (dotty.exists(evt, this.definition.aggregateId)) {
aggregateId = dotty.get(evt, this.definition.aggregateId);
var concatenatedId = '';
if (dotty.exists(evt, this.definition.context)) {
context = dotty.get(evt, this.definition.context);
concatenatedId = 'ctx:' + context;
}

var aggregate = '';
if (dotty.exists(evt, this.definition.aggregate)) {
aggregate = dotty.get(evt, this.definition.aggregate);
concatenatedId = concatenatedId ? (concatenatedId + '::') : concatenatedId;
concatenatedId = concatenatedId + 'agg:' + aggregate;
}

var context = '';
if (dotty.exists(evt, this.definition.context)) {
context = dotty.get(evt, this.definition.context);
if (dotty.exists(evt, this.definition.aggregateId)) {
aggregateId = dotty.get(evt, this.definition.aggregateId);
concatenatedId = concatenatedId ? (concatenatedId + '::') : concatenatedId;
concatenatedId = concatenatedId + 'aggId:' + aggregateId;
}

return context + aggregate + aggregateId;
return concatenatedId;
},

/**
* Updates the revision in the store.
* @param {Object} revisionMap The revision map.
* @param {Object} dataMap The data from the revision guard map.
* @param {Function} callback The function, that will be called when this action is completed.
* `function(err){}`
*/
updateRevision: function (revisionMap, callback) {
updateRevision: function (dataMap, callback) {
var self = this;
var ids = _.keys(revisionMap);
var ids = _.keys(dataMap);

async.each(ids, function (id, callback) {
self.store.get(id, function (err, rev) {
self.store.get(self.options.revisionGuard.prefix, id, function (err, data) {
if (err) {
return callback(err);
}
self.store.set(id, revisionMap[id] + 1, rev, callback);

var nextCalled = false;
var rev = (data && data.revision) || data;
var evt = dataMap[id]['evt'];
var revInEvt = dataMap[id]['revision'];

function next(revInEvt, oldRev, data) {
nextCalled = true;
self.store.set(self.options.revisionGuard.prefix, id, data, revInEvt + 1, oldRev, callback);
}

var onBeforeSetMiddleware = (
self.options &&
self.options.revisionGuard &&
self.options.revisionGuard.middlewares &&
self.options.revisionGuard.middlewares.onBeforeSet) || null;
if (!onBeforeSetMiddleware) {
next(revInEvt, rev, null);
} else {
onBeforeSetMiddleware(evt, next.bind(null, revInEvt, rev));
if (nextCalled === false) {
callback('next within the middleware onBeforeSet, never got called. You must call it.');
}
}
});
}, callback);
},
Expand Down Expand Up @@ -136,7 +163,7 @@ ReplayHandler.prototype = {
var doneCalled = false;
var doneClb = null;

var revisionMap = {};
var dataMap = {};
var collections = {};

var lastEvent;
Expand All @@ -150,11 +177,13 @@ ReplayHandler.prototype = {
if (!!self.definition.revision && dotty.exists(evt, self.definition.revision) &&
!!self.definition.aggregateId && dotty.exists(evt, self.definition.aggregateId)) {
concatenatedId = self.getConcatenatedId(evt);
revisionMap[concatenatedId] = dotty.get(evt, self.definition.revision);
dataMap[concatenatedId] = dataMap[concatenatedId] || {};
dataMap[concatenatedId]['evt'] = evt;
dataMap[concatenatedId]['revision'] = dotty.get(evt, self.definition.revision);
}

var concatenatedWithEventId = evtId;
if (concatenatedId) concatenatedWithEventId = concatenatedId + ':' + evtId;
if (concatenatedId) concatenatedWithEventId = concatenatedId + '::' + 'evtId:' + evtId;
if (seenEvents[concatenatedWithEventId]) return;
seenEvents[concatenatedWithEventId] = true;

Expand Down Expand Up @@ -254,10 +283,10 @@ ReplayHandler.prototype = {
}, callback);
},
function (callback) {
self.updateRevision(revisionMap, callback);
self.updateRevision(dataMap, callback);
},
function (callback) {
self.store.saveLastEvent(lastEvent, callback);
self.store.saveLastEvent(self.options.revisionGuard.prefix, lastEvent, callback);
}
], function (err) {
seenEvents = {};
Expand Down
Loading