diff --git a/README.md b/README.md index ab0ded1..c617d4c 100644 --- a/README.md +++ b/README.md @@ -94,12 +94,18 @@ It can be very useful as eventdenormalizer component if you work with (d)ddd, cq startRevisionNumber: 1, // optional, if defined the denormaizer waits for an event with that revision to be used as first event type: 'redis', - host: 'localhost', // optional - port: 6379, // optional - db: 0, // optional - prefix: 'readmodel_revision', // optional - timeout: 10000 // optional - // password: 'secret' // optional + host: 'localhost', // optional + port: 6379, // optional + db: 0, // optional + prefix: 'readmodel_revision', // optional + timeout: 10000 // optional + // password: 'secret', // optional + middlewares: { // optional + onBeforeSet: (evt, next) => { + var occuredAt = dotty.get(evt, 'occuredAt'); + next({ occuredAt }); // the value passed will be stored within the rev guard, next to the revision + } // optional + }, }, skipExtendEvent: false, // optional skipOnEventMissing: false, // optional @@ -313,7 +319,7 @@ The values describes the path to that property in the notification message. callback(null, evt); }); -### skip default event extensions +### skip default event extensions You can skip all event extenders and the default extensions from being executed by adding the option `skipExtendEvent` to the denormalizer. Checkout the usage section for more information. @@ -331,7 +337,7 @@ To do that, you need to include a loading method in the options object passed to collection.addViewBuilder(new options.definitions.ViewBuilder({ name: 'evt', aggregate: 'agg', - context: 'ctx' + context: 'ctx' }, function() {})); return { collections: [ @@ -661,7 +667,7 @@ A lot of viewmodels can slow down the denormalization process! // or //.useAsId(function (evt, callback) { // callback(null, 'newId'); - //}); + //}); // optional define a function that returns a query that will be used as query to find the viewmodels (but do not define the query in the options) //.useAsQuery(function (evt) { // return { my: evt.payload.my }; @@ -801,8 +807,8 @@ A lot of viewmodels can slow down the denormalization process! // or //.useAsId(function (evt, callback) { // callback(null, 'newId'); - //}); - + //}); + ### not for a collection @@ -890,6 +896,22 @@ or depending on the last guarded event: }); +or depending on the last guarded event per aggregate: + + denorm.getLastEventOfEachAggregate((err, aggregateHandleFns) => { + + aggregateHandleFns.forEach(handleFn => { + + handleFn((err, data) => { + console.log('id', data.id); // id that is stored within the revision guard for the aggregate + console.log('value', data.value); // value that is stored within the revision guard for the aggregate + }); + + }); + + }); + + ### streamed denormalizer.replayStreamed(function (replay, done) { @@ -914,7 +936,7 @@ Importing ES6 style default exports is supported for all definitions where you a ``` module.exports = defineCollection({...}); ``` -works as well as +works as well as ``` exports.default = defineCollection({...}); ``` @@ -923,7 +945,7 @@ as well as (must be transpiled by babel or tsc to be runnable in node) export default defineCollection({...}); ``` -Also: +Also: ``` exports.default = defineViewBuilder({...}); exports.default = defineEventExtender({...}); diff --git a/lib/denormalizer.js b/lib/denormalizer.js index e00198d..9c894e1 100644 --- a/lib/denormalizer.js +++ b/lib/denormalizer.js @@ -57,8 +57,11 @@ function Denormalizer(options) { var defaultRevOpt = { queueTimeout: 1000, - queueTimeoutMaxLoops: 3//, + queueTimeoutMaxLoops: 3, //, // startRevisionNumber: 1 + middlewares: { + onBeforeSet: null //(evt, currentRev, next) => {} + } }; options.revisionGuard = options.revisionGuard || {}; @@ -493,7 +496,7 @@ _.extend(Denormalizer.prototype, { return res; } - this.revisionGuardStore.get(evtPayload.reason.aggregateId, function (err, rev) { + this.revisionGuardStore.get(this.options.revisionGuard.prefix, evtPayload.reason.aggregateId, function (err, rev) { if (err) { debug(err); if (callback) { @@ -714,7 +717,17 @@ _.extend(Denormalizer.prototype, { * `function(err, evt){}` evt is of type Object. */ getLastEvent: function (callback) { - this.revisionGuardStore.getLastEvent(callback); + this.revisionGuardStore.getLastEvent(this.options.revisionGuard.prefix, callback); + }, + + /** + * Returns an array of functions - for each aggregate one - which on invocation will return the saved value within the + * revision guard. To save a different value from an event than just the revision, the onBeforeSet middleware can be used. + * @param {Function} callback The function, that will be called when this action is completed. + * `function(err, aggregateHandleFns){}` aggregateHandleFns is of type Array. + */ + getLastEventOfEachAggregate: function (callback) { + this.revisionGuardStore.getValueOfEachId(this.options.revisionGuard.prefix, callback); }, /** diff --git a/lib/replayHandler.js b/lib/replayHandler.js index a3390d1..cd0ca77 100644 --- a/lib/replayHandler.js +++ b/lib/replayHandler.js @@ -57,7 +57,7 @@ ReplayHandler.prototype = { }, callback); }, function (callback) { - self.store.clear(callback); + self.store.clear(self.options.revisionGuard.prefix, callback); } ], callback); }, @@ -68,39 +68,66 @@ ReplayHandler.prototype = { * @returns {string} */ getConcatenatedId: function (evt) { - var aggregateId = ''; - if (dotty.exists(evt, this.definition.aggregateId)) { - aggregateId = dotty.get(evt, this.definition.aggregateId); + var concatenatedId = ''; + if (dotty.exists(evt, this.definition.context)) { + context = dotty.get(evt, this.definition.context); + concatenatedId = 'ctx:' + context; } - var aggregate = ''; if (dotty.exists(evt, this.definition.aggregate)) { aggregate = dotty.get(evt, this.definition.aggregate); + concatenatedId = concatenatedId ? (concatenatedId + '::') : concatenatedId; + concatenatedId = concatenatedId + 'agg:' + aggregate; } - var context = ''; - if (dotty.exists(evt, this.definition.context)) { - context = dotty.get(evt, this.definition.context); + if (dotty.exists(evt, this.definition.aggregateId)) { + aggregateId = dotty.get(evt, this.definition.aggregateId); + concatenatedId = concatenatedId ? (concatenatedId + '::') : concatenatedId; + concatenatedId = concatenatedId + 'aggId:' + aggregateId; } - return context + aggregate + aggregateId; + return concatenatedId; }, /** * Updates the revision in the store. - * @param {Object} revisionMap The revision map. + * @param {Object} dataMap The data from the revision guard map. * @param {Function} callback The function, that will be called when this action is completed. * `function(err){}` */ - updateRevision: function (revisionMap, callback) { + updateRevision: function (dataMap, callback) { var self = this; - var ids = _.keys(revisionMap); + var ids = _.keys(dataMap); + async.each(ids, function (id, callback) { - self.store.get(id, function (err, rev) { + self.store.get(self.options.revisionGuard.prefix, id, function (err, data) { if (err) { return callback(err); } - self.store.set(id, revisionMap[id] + 1, rev, callback); + + var nextCalled = false; + var rev = (data && data.revision) || data; + var evt = dataMap[id]['evt']; + var revInEvt = dataMap[id]['revision']; + + function next(revInEvt, oldRev, data) { + nextCalled = true; + self.store.set(self.options.revisionGuard.prefix, id, data, revInEvt + 1, oldRev, callback); + } + + var onBeforeSetMiddleware = ( + self.options && + self.options.revisionGuard && + self.options.revisionGuard.middlewares && + self.options.revisionGuard.middlewares.onBeforeSet) || null; + if (!onBeforeSetMiddleware) { + next(revInEvt, rev, null); + } else { + onBeforeSetMiddleware(evt, next.bind(null, revInEvt, rev)); + if (nextCalled === false) { + callback('next within the middleware onBeforeSet, never got called. You must call it.'); + } + } }); }, callback); }, @@ -136,7 +163,7 @@ ReplayHandler.prototype = { var doneCalled = false; var doneClb = null; - var revisionMap = {}; + var dataMap = {}; var collections = {}; var lastEvent; @@ -150,11 +177,13 @@ ReplayHandler.prototype = { if (!!self.definition.revision && dotty.exists(evt, self.definition.revision) && !!self.definition.aggregateId && dotty.exists(evt, self.definition.aggregateId)) { concatenatedId = self.getConcatenatedId(evt); - revisionMap[concatenatedId] = dotty.get(evt, self.definition.revision); + dataMap[concatenatedId] = dataMap[concatenatedId] || {}; + dataMap[concatenatedId]['evt'] = evt; + dataMap[concatenatedId]['revision'] = dotty.get(evt, self.definition.revision); } var concatenatedWithEventId = evtId; - if (concatenatedId) concatenatedWithEventId = concatenatedId + ':' + evtId; + if (concatenatedId) concatenatedWithEventId = concatenatedId + '::' + 'evtId:' + evtId; if (seenEvents[concatenatedWithEventId]) return; seenEvents[concatenatedWithEventId] = true; @@ -254,10 +283,10 @@ ReplayHandler.prototype = { }, callback); }, function (callback) { - self.updateRevision(revisionMap, callback); + self.updateRevision(dataMap, callback); }, function (callback) { - self.store.saveLastEvent(lastEvent, callback); + self.store.saveLastEvent(self.options.revisionGuard.prefix, lastEvent, callback); } ], function (err) { seenEvents = {}; diff --git a/lib/revisionGuard.js b/lib/revisionGuard.js index c6431b9..839dc76 100644 --- a/lib/revisionGuard.js +++ b/lib/revisionGuard.js @@ -17,7 +17,8 @@ function RevisionGuard (store, options) { var defaults = { queueTimeout: 1000, - queueTimeoutMaxLoops: 3//, + queueTimeoutMaxLoops: 3, + prefix: 'default' // must be dafault - store also uses it // startRevisionNumber: 1 }; @@ -109,22 +110,25 @@ RevisionGuard.prototype = { * @returns {string} */ getConcatenatedId: function (evt) { - var aggregateId = ''; - if (dotty.exists(evt, this.definition.aggregateId)) { - aggregateId = dotty.get(evt, this.definition.aggregateId); + var concatenatedId = ''; + if (dotty.exists(evt, this.definition.context)) { + context = dotty.get(evt, this.definition.context); + concatenatedId = 'ctx:' + context; } - var aggregate = ''; if (dotty.exists(evt, this.definition.aggregate)) { aggregate = dotty.get(evt, this.definition.aggregate); + concatenatedId = concatenatedId ? (concatenatedId + '::') : concatenatedId; + concatenatedId = concatenatedId + 'agg:' + aggregate; } - var context = ''; - if (dotty.exists(evt, this.definition.context)) { - context = dotty.get(evt, this.definition.context); + if (dotty.exists(evt, this.definition.aggregateId)) { + aggregateId = dotty.get(evt, this.definition.aggregateId); + concatenatedId = concatenatedId ? (concatenatedId + '::') : concatenatedId; + concatenatedId = concatenatedId + 'aggId:' + aggregateId; } - return context + aggregate + aggregateId; + return concatenatedId; }, /** @@ -144,10 +148,11 @@ RevisionGuard.prototype = { callback = _.once(callback); this.queue.push(concatenatedId, evtId, evt, callback, function (loopCount, waitAgain) { - self.store.get(concatenatedId, function (err, revInStore) { + self.store.get(self.options.prefix, concatenatedId, function (err, revInStore) { if (err) { debug(err); - self.store.remove(concatenatedId, evtId); + // TODO: this might not work - no remove fn on the store + self.store.remove(self.options.prefix, concatenatedId, evtId); return callback(err); } @@ -183,51 +188,68 @@ RevisionGuard.prototype = { * `function(err){}` */ finishGuard: function (evt, revInStore, callback) { + var self = this; + var nextCalled = false; var evtId = dotty.get(evt, this.definition.id); var revInEvt = dotty.get(evt, this.definition.revision); - var self = this; + function next (data) { + nextCalled = true; + var concatenatedId = self.getConcatenatedId(evt); - var concatenatedId = this.getConcatenatedId(evt); + self.store.set(self.options.prefix, concatenatedId, data, revInEvt + 1, revInStore, function (err) { + if (err) { + debug(err); + if (err instanceof ConcurrencyError) { + var retryIn = randomBetween(0, self.options.retryOnConcurrencyTimeout || 800); + debug('retry in ' + retryIn + 'ms for [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt); + setTimeout(function() { + self.guard(evt, callback); + }, retryIn); + return; + } - this.store.set(concatenatedId, revInEvt + 1, revInStore, function (err) { - if (err) { - debug(err); - if (err instanceof ConcurrencyError) { - var retryIn = randomBetween(0, self.options.retryOnConcurrencyTimeout || 800); - debug('retry in ' + retryIn + 'ms for [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt); - setTimeout(function() { - self.guard(evt, callback); - }, retryIn); - return; + return callback(err); } - return callback(err); - } + self.store.saveLastEvent(self.options.prefix, evt, function (err) { + if (err) { + debug('error while saving last event'); + debug(err); + } + }); - self.store.saveLastEvent(evt, function (err) { - if (err) { - debug('error while saving last event'); - debug(err); - } - }); + self.queue.remove(concatenatedId, evtId); + callback(null); + + var pendingEvents = self.queue.get(concatenatedId); + if (!pendingEvents || pendingEvents.length === 0) return debug('no other pending event found [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt); - self.queue.remove(concatenatedId, evtId); - callback(null); + var nextEvent = _.find(pendingEvents, function (e) { + var revInNextEvt = dotty.get(e.payload, self.definition.revision); + return revInNextEvt === revInEvt + 1; + }); - var pendingEvents = self.queue.get(concatenatedId); - if (!pendingEvents || pendingEvents.length === 0) return debug('no other pending event found [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt); + if (!nextEvent) return debug('no next pending event found [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt); - var nextEvent = _.find(pendingEvents, function (e) { - var revInNextEvt = dotty.get(e.payload, self.definition.revision); - return revInNextEvt === revInEvt + 1; + debug('found next pending event => guard [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt); + self.guard(nextEvent.payload, nextEvent.callback); }); + } - if (!nextEvent) return debug('no next pending event found [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt); - - debug('found next pending event => guard [concatenatedId]=' + concatenatedId + ', [revInStore]=' + (revInStore || 'null') + ', [revInEvt]=' + revInEvt); - self.guard(nextEvent.payload, nextEvent.callback); - }); + var onBeforeSetMiddleware = ( + self.options && + self.options.middlewares && + self.options.middlewares.onBeforeSet) || null; + + if (!onBeforeSetMiddleware) { + next(null); + } else { + onBeforeSetMiddleware(evt, next); + if (nextCalled === false) { + callback('next within the middleware onBeforeSet, never got called. You must call it.'); + } + } }, /** @@ -237,7 +259,7 @@ RevisionGuard.prototype = { */ guard: function (evt, callback) { if (!this.definition.aggregateId || !dotty.exists(evt, this.definition.aggregateId) || - !this.definition.revision || !dotty.exists(evt, this.definition.revision)) { + !this.definition.revision || !dotty.exists(evt, this.definition.revision)) { var err = new Error('Please define an aggregateId!'); debug(err); return callback(err); @@ -303,7 +325,7 @@ RevisionGuard.prototype = { function retry (max, loop) { setTimeout(function () { - self.store.get(concatenatedId, function(err, revInStore) { + self.store.get(self.options.prefix, concatenatedId, function(err, revInStore) { if (err) { debug(err); return callback(err); @@ -327,7 +349,7 @@ RevisionGuard.prototype = { } process.nextTick(function () { - self.store.get(concatenatedId, function (err, revInStore) { + self.store.get(self.options.prefix, concatenatedId, function (err, revInStore) { if (err) { debug(err); return callback(err); diff --git a/lib/revisionGuardStore/base.js b/lib/revisionGuardStore/base.js index 57ee4da..3541c43 100644 --- a/lib/revisionGuardStore/base.js +++ b/lib/revisionGuardStore/base.js @@ -43,7 +43,7 @@ _.extend(Guard.prototype, { * @param {Function} callback The function, that will be called when this action is completed. * `function(err, id){}` id is of type String. */ - getNewId: function (callback) { + getNewId: function (prefix, callback) { var id = uuid().toString(); if (callback) callback(null, id); }, @@ -54,19 +54,20 @@ _.extend(Guard.prototype, { * @param {Function} callback The function, that will be called when this action is completed. * `function(err, revision){}` id is of type String. */ - get: function (id, callback) { + get: function (prefix, id, callback) { implementError(callback); }, /** * Updates the revision number. - * @param {String} id The aggregate id. + * @param {String} id The unique id including the prefix. + * @param {Object} data The data that should be stored next to the revision * @param {Number} revision The new revision number. * @param {Number} oldRevision The old revision number. * @param {Function} callback The function, that will be called when this action is completed. * `function(err, revision){}` revision is of type Number. */ - set: function (id, revision, oldRevision, callback) { + set: function (prefix, id, data, revision, oldRevision, callback) { implementError(callback); }, @@ -76,7 +77,7 @@ _.extend(Guard.prototype, { * @param {Function} callback The function, that will be called when this action is completed. * `function(err){}` */ - saveLastEvent: function (evt, callback) { + saveLastEvent: function (prefix, evt, callback) { implementError(callback); }, @@ -85,7 +86,19 @@ _.extend(Guard.prototype, { * @param {Function} callback The function, that will be called when this action is completed. * `function(err, evt){}` evt is of type Object. */ - getLastEvent: function (callback) { + getLastEvent: function (prefix, callback) { + implementError(callback); + }, + + + /** + * Gets the last value of each id and will invoke the callback + * with all the handler function of which each on invokation will retrieve the + * value of the last event from the revision guard. + * @param {Function} callback The function, that will be called when this action is completed. + * `function(err, aggregateHandleFns){}` aggregateHandleFns is of type Array. + */ + getValueOfEachId: function (prefix, callback = (err, aggregateHandleFns) => {}) { implementError(callback); }, @@ -94,7 +107,7 @@ _.extend(Guard.prototype, { * clears the complete store... * @param {Function} callback the function that will be called when this action has finished [optional] */ - clear: function (callback) { + clear: function (prefix, callback) { implementError(callback); } diff --git a/lib/revisionGuardStore/databases/dynamodb.js b/lib/revisionGuardStore/databases/dynamodb.js index a713ba8..e8e6377 100644 --- a/lib/revisionGuardStore/databases/dynamodb.js +++ b/lib/revisionGuardStore/databases/dynamodb.js @@ -50,13 +50,15 @@ _.extend(DynamoDB.prototype, { if (callback) callback(null); }, - get: function(id, callback) { + get: function(prefix, id, callback) { + prefix = prefix || 'default'; + var self = this; if (_.isFunction(id)) { callback = id; id = null; - } + } if (!id) { id = uuid().toString(); @@ -66,6 +68,8 @@ _.extend(DynamoDB.prototype, { return callback(err); } + id = prefix + '::' + id; + var params = { TableName: self.options.tableName, Key: { @@ -88,7 +92,9 @@ _.extend(DynamoDB.prototype, { }); }, - set: function (id, revision, oldRevision, callback) { + set: function (prefix, id, data, revision, oldRevision, callback) { + prefix = prefix || 'default'; + var self = this; if (!id || !_.isString(id)) { @@ -96,28 +102,41 @@ _.extend(DynamoDB.prototype, { debug(err); return callback(err); } + + if (typeof data !== 'object') { + var err = new Error('Please pass a valid data object or null!'); + debug(err); + return callback(err); + } if (!revision || !_.isNumber(revision)) { var err = new Error('Please pass a valid revision!'); debug(err); return callback(err); } + id = prefix + '::' + id; + self.checkConnection(function(err) { if (err) { return callback(err); } var entity = { TableName: self.options.tableName, - Item: { - HashKey: id, - RangeKey: id, - Revision: revision + Item: { + HashKey: id, + RangeKey: id, + Revision: revision }, ConditionExpression: 'attribute_not_exists(HashKey) OR Revision = :oldRevision', ExpressionAttributeValues: { ':oldRevision': oldRevision } }; + + if (data) { + entity.Item.Data = data; + } + self.documentClient.put(entity, function(err, data) { if (err) { if (err.code == 'ConditionalCheckFailedException') @@ -129,7 +148,9 @@ _.extend(DynamoDB.prototype, { }); }, - saveLastEvent: function (evt, callback) { + saveLastEvent: function (prefix, evt, callback) { + prefix = prefix || 'default'; + var self = this; self.checkConnection(function(err) { if (err) { @@ -137,10 +158,10 @@ _.extend(DynamoDB.prototype, { } var entity = { TableName: self.options.tableName, - Item: { - HashKey: "THE_LAST_SEEN_EVENT", - RangeKey: "THE_LAST_SEEN_EVENT", - event: evt + Item: { + HashKey: prefix + '::' + 'THE_LAST_SEEN_EVENT', + RangeKey: prefix + '::' + 'THE_LAST_SEEN_EVENT', + event: evt } }; @@ -155,7 +176,9 @@ _.extend(DynamoDB.prototype, { }); }, - getLastEvent: function (callback) { + getLastEvent: function (prefix, callback) { + prefix = prefix || 'default'; + var self = this; self.checkConnection(function(err) { if (err) { @@ -164,8 +187,8 @@ _.extend(DynamoDB.prototype, { var params = { TableName: self.options.tableName, Key: { - HashKey: "THE_LAST_SEEN_EVENT", - RangeKey: "THE_LAST_SEEN_EVENT" + HashKey: prefix + '::' + 'THE_LAST_SEEN_EVENT', + RangeKey: prefix + '::' + 'THE_LAST_SEEN_EVENT' } }; @@ -192,11 +215,11 @@ _.extend(DynamoDB.prototype, { } createTableIfNotExists( - self.client, - RevisionTableDefinition(self.options.tableName, self.options), + self.client, + RevisionTableDefinition(self.options.tableName, self.options), function(err){ if (err) { - // ignore ResourceInUseException + // ignore ResourceInUseException // as there could be multiple requests attempt to create table concurrently if (err.code === 'ResourceInUseException') { return callback(null); @@ -213,7 +236,10 @@ _.extend(DynamoDB.prototype, { } ); }, - clear: function(callback) { + + clear: function(prefix, callback) { + prefix = prefix || 'default'; + var self = this; self.checkConnection(function(err) { if (err) { @@ -223,6 +249,7 @@ _.extend(DynamoDB.prototype, { var query = { TableName: self.options.tableName }; + self.documentClient.scan(query, function(err, entities) { if (err) { return callback(err); @@ -230,6 +257,10 @@ _.extend(DynamoDB.prototype, { async.each( entities.Items, function(entity, callback) { + if (entity.HashKey.indexOf(prefix) >= 0) { + return callback(null); + } + var params = { TableName: self.options.tableName, Key: { HashKey: entity.HashKey, RangeKey: entity.RangeKey } diff --git a/lib/revisionGuardStore/databases/inmemory.js b/lib/revisionGuardStore/databases/inmemory.js index 0e37f8a..a5510e1 100644 --- a/lib/revisionGuardStore/databases/inmemory.js +++ b/lib/revisionGuardStore/databases/inmemory.js @@ -7,7 +7,7 @@ var util = require('util'), function InMemory(options) { Store.call(this, options); this.store = {}; - this.lastEvent = null; + this.lastEvent = {}; } util.inherits(InMemory, Store); @@ -24,49 +24,101 @@ _.extend(InMemory.prototype, { if (callback) callback(null); }, - get: function (id, callback) { + get: function (prefix, id, callback) { + prefix = prefix || 'default'; if (!id || !_.isString(id)) { var err = new Error('Please pass a valid id!'); debug(err); return callback(err); } - callback(null, this.store[id] || null); + id = prefix + '::' + id; + + var rev = (this.store[id] && this.store[id]['revision']) || null; + callback(null, rev); }, - set: function (id, revision, oldRevision, callback) { + set: function (prefix, id, data, revision, oldRevision, callback) { + prefix = prefix || 'default'; + if (!id || !_.isString(id)) { var err = new Error('Please pass a valid id!'); debug(err); return callback(err); } + if (typeof data !== 'object') { + var err = new Error('Please pass a valid data object or null!'); + debug(err); + return callback(err); + } if (!revision || !_.isNumber(revision)) { var err = new Error('Please pass a valid revision!'); debug(err); return callback(err); } - if (this.store[id] && this.store[id] !== oldRevision) { + id = prefix + '::' + id; + + if (this.store[id] && this.store[id]['revision'] && this.store[id]['revision'] !== oldRevision) { return callback(new ConcurrencyError()); } - this.store[id] = revision; + + this.store[id] = this.store[id] || {}; + this.store[id]['revision'] = revision; + if (data) { + this.store[id]['data'] = data; + } callback(null); }, - saveLastEvent: function (evt, callback) { - this.lastEvent = evt; + saveLastEvent: function (prefix, evt, callback) { + prefix = prefix || 'default'; + this.lastEvent[prefix] = evt; if (callback) callback(null); }, - getLastEvent: function (callback) { - callback(null, this.lastEvent); + getLastEvent: function (prefix, callback) { + prefix = prefix || 'default'; + callback(null, this.lastEvent[prefix]); }, - clear: function (callback) { - this.store = {}; - this.lastEvent = null; + getValueOfId: function (id, callback) { + callback(null, { key: id, value: this.store[id] }); + }, + + getValueOfEachId: function (prefix, callback = (err, aggregateHandleFns) => {}) { + prefix = prefix || 'default'; + + var self = this; + var uniqueIds = {}; + var aggregateHandleFns = []; + var ids = _.keys(this.store); + ids.forEach(function (id) { + // don't reprocess an already handeled id + if ( !uniqueIds[id] && id.indexOf(prefix) >= 0) { + uniqueIds[id] = true; + aggregateHandleFns.push((cb) => self.getValueOfId(id, cb)); + } + }); + + callback(null, aggregateHandleFns); + }, + + clear: function (prefix, callback) { + var self = this; + prefix = prefix || 'default'; + + var keys = _.keys(self.store); + keys.forEach(function (key) { + if (key.indexOf(prefix) >= 0) { + self.store[key] = null; + } + }); + + self.lastEvent[prefix] = null; + if (callback) callback(null); } diff --git a/lib/revisionGuardStore/databases/mongodb.js b/lib/revisionGuardStore/databases/mongodb.js index f49e9ff..988a6e1 100644 --- a/lib/revisionGuardStore/databases/mongodb.js +++ b/lib/revisionGuardStore/databases/mongodb.js @@ -162,17 +162,21 @@ _.extend(Mongo.prototype, { this.db.close(callback || function () {}); }, - getNewId: function(callback) { + getNewId: function(prefix, callback) { callback(null, new ObjectID().toString()); }, - get: function (id, callback) { + get: function (prefix, id, callback) { + prefix = prefix || 'default'; + if (!id || !_.isString(id)) { var err = new Error('Please pass a valid id!'); debug(err); return callback(err); } + id = prefix + '::' + id; + this.store.findOne({ _id: id }, function (err, entry) { if (err) { return callback(err); @@ -186,19 +190,30 @@ _.extend(Mongo.prototype, { }); }, - set: function (id, revision, oldRevision, callback) { + set: function (prefix, id, data, revision, oldRevision, callback) { + prefix = prefix || 'default'; + if (!id || !_.isString(id)) { var err = new Error('Please pass a valid id!'); debug(err); return callback(err); } + if (typeof data !== 'object') { + var err = new Error('Please pass a valid data object or null!'); + debug(err); + return callback(err); + } if (!revision || !_.isNumber(revision)) { var err = new Error('Please pass a valid revision!'); debug(err); return callback(err); } - this.store.update({ _id: id, revision: oldRevision }, { _id: id, revision: revision }, { safe: true, upsert: true }, function (err, modifiedCount) { + + id = prefix + '::' + id; + + var newData = data ? { _id: id, revision, data } : { _id: id, revision }; + this.store.update({ _id: id, revision: oldRevision }, newData, { safe: true, upsert: true }, function (err, modifiedCount) { if (isNew) { if (modifiedCount && modifiedCount.result && modifiedCount.result.n === 0) { err = new ConcurrencyError(); @@ -231,14 +246,18 @@ _.extend(Mongo.prototype, { }); }, - saveLastEvent: function (evt, callback) { - this.store.save({ _id: 'THE_LAST_SEEN_EVENT', event: evt }, { safe: true }, function (err) { + saveLastEvent: function (prefix, evt, callback) { + prefix = prefix || 'default'; + + this.store.save({ _id: prefix + '::THE_LAST_SEEN_EVENT', event: evt }, { safe: true }, function (err) { if (callback) { callback(err); } }); }, - getLastEvent: function (callback) { - this.store.findOne({ _id: 'THE_LAST_SEEN_EVENT' }, function (err, entry) { + getLastEvent: function (prefix, callback) { + prefix = prefix || 'default'; + + this.store.findOne({ _id: prefix + '::THE_LAST_SEEN_EVENT' }, function (err, entry) { if (err) { return callback(err); } @@ -251,7 +270,8 @@ _.extend(Mongo.prototype, { }); }, - clear: function (callback) { + clear: function (prefix, callback) { + // TODO: clear only for prefix this.store.remove({}, { safe: true }, callback); } diff --git a/lib/revisionGuardStore/databases/redis.js b/lib/revisionGuardStore/databases/redis.js index 175df35..43efeba 100644 --- a/lib/revisionGuardStore/databases/redis.js +++ b/lib/revisionGuardStore/databases/redis.js @@ -13,7 +13,6 @@ function Redis(options) { var defaults = { host: 'localhost', port: 6379, - prefix: 'readmodel_revision', retry_strategy: function (options) { return undefined; }//, @@ -144,8 +143,9 @@ _.extend(Redis.prototype, { if (callback) callback(null, this); }, - getNewId: function(callback) { - this.client.incr('nextItemId:' + this.prefix, function(err, id) { + getNewId: function(prefix, callback) { + prefix = prefix || 'default'; + this.client.incr('nextItemId:' + prefix, function(err, id) { if (err) { return callback(err); } @@ -153,14 +153,18 @@ _.extend(Redis.prototype, { }); }, - get: function (id, callback) { + get: function (prefix, id, callback) { + prefix = prefix || 'default'; + if (!id || !_.isString(id)) { var err = new Error('Please pass a valid id!'); debug(err); return callback(err); } - this.client.get(this.options.prefix + ':' + id, function (err, entry) { + id = prefix + '::' + id; + + this.client.get(id, function (err, entry) { if (err) { return callback(err); } @@ -180,28 +184,33 @@ _.extend(Redis.prototype, { }); }, - set: function (id, revision, oldRevision, callback) { + set: function (prefix, id, data, revision, oldRevision, callback) { + prefix = prefix || 'default'; + if (!id || !_.isString(id)) { var err = new Error('Please pass a valid id!'); debug(err); return callback(err); } + if (typeof data !== 'object') { + var err = new Error('Please pass a valid data object!'); + debug(err); + return callback(err); + } if (!revision || !_.isNumber(revision)) { var err = new Error('Please pass a valid revision!'); debug(err); return callback(err); } - var key = this.options.prefix + ':' + id; - var self = this; - this.client.watch(key, function (err) { + this.client.watch(id, function (err) { if (err) { return callback(err); } - self.get(id, function (err, rev) { + self.get(prefix, id, function (err, rev) { if (err) { debug(err); if (callback) callback(err); @@ -223,7 +232,9 @@ _.extend(Redis.prototype, { return; } - self.client.multi([['set'].concat([key, JSON.stringify({ revision: revision })])]).exec(function (err, replies) { + id = prefix + '::' + id; + var newData = data ? { revision, data } : { revision }; + self.client.multi([['set'].concat([id, JSON.stringify(newData)])]).exec(function (err, replies) { if (err) { debug(err); if (callback) { @@ -249,16 +260,18 @@ _.extend(Redis.prototype, { }); }, - saveLastEvent: function (evt, callback) { - var key = this.options.prefix + ':THE_LAST_SEEN_EVENT'; + saveLastEvent: function (prefix, evt, callback) { + prefix = prefix || 'default'; - this.client.set(key, JSON.stringify({ event: evt }), function (err) { + this.client.set(prefix + '::THE_LAST_SEEN_EVENT', JSON.stringify({ event: evt }), function (err) { if (callback) { callback(err); } }); }, - getLastEvent: function (callback) { - this.client.get(this.options.prefix + ':THE_LAST_SEEN_EVENT', function (err, entry) { + getLastEvent: function (prefix, callback) { + prefix = prefix || 'default'; + + this.client.get(prefix + '::THE_LAST_SEEN_EVENT', function (err, entry) { if (err) { return callback(err); } @@ -278,14 +291,78 @@ _.extend(Redis.prototype, { }); }, - clear: function (callback) { + getValueOfId: function (id, callback) { + this.client.get(id, (err, value) => { + if (err) { + return callback(err); + } + + if (!value) { + return callback('No value for the id:' + id); + } + + try { + value = jsondate.parse(value.toString()); + } catch (err) { + return callback(err); + } + + return callback(null, { id, value }); + }); + }, + + getValueOfEachId: function (prefix, callback = (err, aggregateHandleFns) => {}) { + prefix = prefix || 'default'; + + var self = this; + var uniqueIds = {}; + var aggregateHandleFns = []; + + function scanRecursive(curs) { + self.client.scan(curs, 'MATCH', prefix + '::*', function (err, res) { + if (err) return callback(err); + + var cursor = res[0]; + var ids = res[1]; + + if (err) { + return callback(err); + } + + function next() { + // Check if we processed all ids from the redis store + if (cursor === '0') { + callback(null, aggregateHandleFns); + } else { + setImmediate(scanRecursive.bind(null, cursor)); + } + } + + ids.forEach(id => { + // don't reprocess an already handeled id + if (!uniqueIds[id] && id.indexOf('THE_LAST_SEEN_EVENT') === -1) { + uniqueIds[id] = true; + aggregateHandleFns.push((cb) => self.getValueOfId(id, cb)); + } + }); + + next(); + }); + } + + scanRecursive(0); + }, + + clear: function (prefix, callback) { + prefix = prefix || 'default'; + var self = this; async.parallel([ function (callback) { - self.client.del('nextItemId:' + self.options.prefix, callback); + self.client.del('nextItemId:' + prefix, callback); }, function (callback) { - self.client.keys(self.options.prefix + ':*', function(err, keys) { + self.client.keys(prefix + '::*', function(err, keys) { if (err) { return callback(err); } diff --git a/lib/revisionGuardStore/databases/tingodb.js b/lib/revisionGuardStore/databases/tingodb.js index 0d2f27a..89dfbaa 100644 --- a/lib/revisionGuardStore/databases/tingodb.js +++ b/lib/revisionGuardStore/databases/tingodb.js @@ -49,17 +49,21 @@ _.extend(Tingo.prototype, { this.db.close(callback || function () {}); }, - getNewId: function(callback) { + getNewId: function(prefix, callback) { callback(null, new ObjectID().toString()); }, - get: function (id, callback) { + get: function (prefix, id, callback) { + prefix = prefix || 'default'; + if (!id || !_.isString(id)) { var err = new Error('Please pass a valid id!'); debug(err); return callback(err); } + id = prefix + '::' + id; + this.store.findOne({ _id: id }, function (err, entry) { if (err) { return callback(err); @@ -73,19 +77,28 @@ _.extend(Tingo.prototype, { }); }, - set: function (id, revision, oldRevision, callback) { + set: function (prefix, id, data, revision, oldRevision, callback) { + prefix = prefix || 'default'; + if (!id || !_.isString(id)) { var err = new Error('Please pass a valid id!'); debug(err); return callback(err); } + if (typeof data !== 'object') { + var err = new Error('Please pass a valid data object or null!'); + debug(err); + return callback(err); + } if (!revision || !_.isNumber(revision)) { var err = new Error('Please pass a valid revision!'); debug(err); return callback(err); } - this.store.update({ _id: id, revision: oldRevision }, { _id: id, revision: revision }, { safe: true, upsert: true }, function (err, modifiedCount) { + id = prefix + '::' + id; + var newData = data ? { _id: id ,revision, data } : { _id: id, revision }; + this.store.update({ _id: id, revision: oldRevision }, newData, { safe: true, upsert: true }, function (err, modifiedCount) { if (modifiedCount === 0) { err = new ConcurrencyError(); debug(err); @@ -107,14 +120,18 @@ _.extend(Tingo.prototype, { }); }, - saveLastEvent: function (evt, callback) { - this.store.save({ _id: 'THE_LAST_SEEN_EVENT', event: evt }, { safe: true }, function (err) { + saveLastEvent: function (prefix, evt, callback) { + prefix = prefix || 'default'; + + this.store.save({ _id: prefix + '::THE_LAST_SEEN_EVENT', event: evt }, { safe: true }, function (err) { if (callback) { callback(err); } }); }, - getLastEvent: function (callback) { - this.store.findOne({ _id: 'THE_LAST_SEEN_EVENT' }, function (err, entry) { + getLastEvent: function (prefix, callback) { + prefix = prefix || 'default'; + + this.store.findOne({ _id: prefix + '::THE_LAST_SEEN_EVENT' }, function (err, entry) { if (err) { return callback(err); } @@ -127,7 +144,8 @@ _.extend(Tingo.prototype, { }); }, - clear: function (callback) { + clear: function (prefix, callback) { + // TODO: remove only for prefix this.store.remove({}, { safe: true }, callback); } diff --git a/lib/revisionGuardStore/index.js b/lib/revisionGuardStore/index.js index efcd24f..2cf2c9c 100644 --- a/lib/revisionGuardStore/index.js +++ b/lib/revisionGuardStore/index.js @@ -32,7 +32,7 @@ function getSpecificDbImplementation(options) { if (!exists(dbPath)) { var errMsg = 'Implementation for db "' + options.type + '" does not exist!'; - console.log(errMsg); + console.error(errMsg); throw new Error(errMsg); } @@ -46,7 +46,7 @@ function getSpecificDbImplementation(options) { err.message.lastIndexOf("'") !== err.message.indexOf("'")) { var moduleName = err.message.substring(err.message.indexOf("'") + 1, err.message.lastIndexOf("'")); - console.log('Please install module "' + moduleName + + console.error('Please install module "' + moduleName + '" to work with db implementation "' + options.type + '"!'); } diff --git a/package.json b/package.json index 3d7f89a..cc88046 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "author": "adrai", "name": "cqrs-eventdenormalizer", - "version": "1.17.0", + "version": "2.0.0", "private": false, "main": "index.js", "engines": { diff --git a/releasenotes.md b/releasenotes.md index 3b7c3db..d9d9faf 100644 --- a/releasenotes.md +++ b/releasenotes.md @@ -1,3 +1,13 @@ +## [v2.0.0](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.17.0...v2.0.0) +- IMPORTANT: FULL REBUILD NEEDED!! + new revision guard prefix strucutre got introduced to make parsing the key possible. + The API stays the same - The prefix option can still be added or omitted as before. +- a onBeforeSet middleware got added which allows to save a full / parts of an events to the revison guard next to the revision number itself. +- getLastEventOfEachAggregate function got added to the exposed denormalizer API. It returns an array of functions which on invocation + return the value that got saved for a key (aggregate) within the revision guard. The onBeforeSet middleware can be used to alter the value, + the revision will always be present. + +- introduce skip steps [#89](https://github.com/adrai/node-cqrs-eventdenormalizer/pull/89) thanks to [Robin Fehr](https://github.com/robinfehr) ## [v1.17.0](https://github.com/adrai/node-cqrs-eventdenormalizer/compare/v1.16.57...v1.17.0) - introduce skip steps [#89](https://github.com/adrai/node-cqrs-eventdenormalizer/pull/89) thanks to [Robin Fehr](https://github.com/robinfehr) diff --git a/test/integration/integrationTest.js b/test/integration/integrationTest.js index ba90eff..657c45b 100644 --- a/test/integration/integrationTest.js +++ b/test/integration/integrationTest.js @@ -2571,6 +2571,104 @@ describe('integration', function () { }); + describe('inject a onBeforeSet middleware', function () { + + describe('call getLastEventOfEachAggregate', function () { + + it('it should store the occuredAt date within the revision guard', function (done) { + var publishedEvents = []; + denorm.onEvent(function (evt) { + publishedEvents.push(evt); + }); + + var publishedNotis = []; + denorm.onNotification(function (noti) { + publishedNotis.push(noti); + }); + + var evt1= { + id: 'evtId', + commandId: 'cmdId', + event: 'registeredEMailAddress', + payload: { + id: '12345678911', + email: 'abc@d.e' + }, + head: { + revision: 4, + version: 2, + userId: 'userId' + }, + occuredAt: '2020-06-09T22:55:54.946Z' + }; + + var evt2= { + id: 'evtId', + commandId: 'cmdId', + event: 'registeredEMailAddress', + payload: { + id: '12345678910', + email: 'abc@d.e' + }, + head: { + revision: 2, + version: 2, + userId: 'userId' + }, + occuredAt: '2020-06-10T22:55:54.946Z' + }; + + denorm.options.revisionGuard.middlewares.onBeforeSet = function (evt, next) { + next({ occuredAt: evt.occuredAt }); + } + + denorm.handle(evt1, function (errs, e, notis) { + expect(errs).not.to.be.ok(); + expect(e).to.eql(evt1); + + + denorm.handle(evt2, function (errs, e, notis) { + expect(errs).not.to.be.ok(); + expect(e).to.eql(evt2); + + var count = 0; + denorm.getLastEventOfEachAggregate((err, aggregateHandleFns) => { + expect(err).not.to.be.ok(); + expect(aggregateHandleFns.length).to.eql(2); + aggregateHandleFns.forEach(handleFn => { + handleFn((err, data) => { + + expect(err).not.to.be.ok(); + expect(data.id).to.be.ok(); + + if (data.id.indexOf('12345678910') >= 0) { + expect(data.id).to.eql('readmodel_revision::aggId:12345678910'); + expect(data.value).to.eql({ revision: 3, data: { occuredAt: '2020-06-10T22:55:54.946Z' } }); + count++; + } + if (data.id.indexOf('12345678911') >= 0) { + expect(data.id).to.eql('readmodel_revision::aggId:12345678911'); + expect(data.value).to.eql({ revision: 5, data: { occuredAt: '2020-06-09T22:55:54.946Z' } }); + count++; + } + + if (count === 2) { + done(); + } + }); + }); + }); + + }); + + }); + + }); + + }); + + }); + describe('handling an command rejected event', function () { before(function (done) { denorm = api({ diff --git a/test/unit/denormalizerTest.js b/test/unit/denormalizerTest.js index 95ae8e3..f298014 100644 --- a/test/unit/denormalizerTest.js +++ b/test/unit/denormalizerTest.js @@ -53,6 +53,7 @@ describe('denormalizer', function () { expect(denorm.init).to.be.a('function'); expect(denorm.handle).to.be.a('function'); expect(denorm.getLastEvent).to.be.a('function'); + expect(denorm.getLastEventOfEachAggregate).to.be.a('function'); expect(denorm.options.retryOnConcurrencyTimeout).to.eql(800); expect(denorm.options.commandRejectedEventName).to.eql('commandRejected'); @@ -107,6 +108,7 @@ describe('denormalizer', function () { expect(denorm.init).to.be.a('function'); expect(denorm.handle).to.be.a('function'); expect(denorm.getLastEvent).to.be.a('function'); + expect(denorm.getLastEventOfEachAggregate).to.be.a('function'); expect(denorm.options.retryOnConcurrencyTimeout).to.eql(800); expect(denorm.options.commandRejectedEventName).to.eql('commandRejected'); @@ -932,7 +934,7 @@ describe('denormalizer', function () { var calledStore = false; denorm.revisionGuardStore = { - get: function (aggId, clb) { + get: function (prefix, aggId, clb) { expect(aggId).to.eql('aggId'); calledStore = true; @@ -992,7 +994,7 @@ describe('denormalizer', function () { var calledStore = false; denorm.revisionGuardStore = { - get: function (aggId, clb) { + get: function (prefix, aggId, clb) { expect(aggId).to.eql('aggId'); calledStore = true; @@ -1327,6 +1329,31 @@ describe('denormalizer', function () { }); + + describe('calling getLastEventOfEachAggregate', function () { + + var denorm; + + beforeEach(function () { + denorm = api({ denormalizerPath: __dirname }); + }); + + it('it should work as expected', function (done) { + var callback = function () {}; + + denorm.revisionGuardStore = { + getValueOfEachId: function (prefix, callbackFn) { + expect(callbackFn).to.eql(callback) + done(); + } + }; + + denorm.getLastEventOfEachAggregate(callback); + + }); + + }); + describe('loading custom structure', function() { it('it should return as expected', function(done) { var denorm = api({ diff --git a/test/unit/replayHandlerTest.js b/test/unit/replayHandlerTest.js index 51aa35b..4041033 100644 --- a/test/unit/replayHandlerTest.js +++ b/test/unit/replayHandlerTest.js @@ -114,7 +114,7 @@ describe('replayHandler', function () { }); beforeEach(function (done) { - store.clear(done); + store.clear(null, done); }); describe('normally', function () { @@ -183,7 +183,7 @@ describe('replayHandler', function () { } }, def); - repl = new ReplayHandler(disp, store, def); + repl = new ReplayHandler(disp, store, def, { revisionGuard: { prefix: 'readmodel_revision' } }); repl.replay(evts, function (err) { expect(err).not.to.be.ok(); @@ -206,11 +206,11 @@ describe('replayHandler', function () { expect(saveRvmsCalled3).to.eql(false); expect(saveRvmsCalled4).to.eql(false); - store.get('ctxagg1aggId1', function (err, rev) { + store.get('readmodel_revision', 'ctx:ctx::agg:agg1::aggId:aggId1', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(4); - store.get('ctxagg2aggId2', function (err, rev) { + store.get('readmodel_revision', 'ctx:ctx::agg:agg2::aggId:aggId2', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(6); @@ -278,7 +278,7 @@ describe('replayHandler', function () { } }, def); - repl = new ReplayHandler(disp, store, def); + repl = new ReplayHandler(disp, store, def, { revisionGuard: { prefix: 'readmodel_revision' } }); repl.replay(evts, function (err) { expect(err).not.to.be.ok(); @@ -298,11 +298,11 @@ describe('replayHandler', function () { expect(saveRvmsCalled2).to.eql(true); expect(saveRvmsCalled3).to.eql(false); - store.get('aggId1', function (err, rev) { + store.get(null, 'aggId1', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).not.to.be.ok(); - store.get('aggId2', function (err, rev) { + store.get(null, 'aggId2', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).not.to.be.ok(); @@ -391,7 +391,7 @@ describe('replayHandler', function () { } }, def); - repl = new ReplayHandler(disp, store, def); + repl = new ReplayHandler(disp, store, def, { revisionGuard: { prefix: 'readmodel_revision' } }); repl.replay(evts, function (err) { expect(err).not.to.be.ok(); @@ -421,11 +421,11 @@ describe('replayHandler', function () { expect(saveRvmsCalled3).to.eql(false); expect(saveRvmsCalled4).to.eql(false); - store.get('ctxagg1aggId1', function (err, rev) { + store.get('readmodel_revision', 'ctx:ctx::agg:agg1::aggId:aggId1', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(4); - store.get('ctxagg2aggId2', function (err, rev) { + store.get('readmodel_revision', 'ctx:ctx::agg:agg2::aggId:aggId2', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(6); @@ -495,7 +495,7 @@ describe('replayHandler', function () { } }, def); - repl = new ReplayHandler(disp, store, def); + repl = new ReplayHandler(disp, store, def, { revisionGuard: { prefix: 'readmodel_revision' } }); repl.replayStreamed(function (replay, finished) { @@ -521,11 +521,11 @@ describe('replayHandler', function () { expect(saveRvmsCalled2).to.eql(true); expect(saveRvmsCalled3).to.eql(false); - store.get('ctxagg1aggId1', function (err, rev) { + store.get('readmodel_revision', 'ctx:ctx::agg:agg1::aggId:aggId1', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(4); - store.get('ctxagg2aggId2', function (err, rev) { + store.get('readmodel_revision', 'ctx:ctx::agg:agg2::aggId:aggId2', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(6); @@ -591,7 +591,7 @@ describe('replayHandler', function () { } }, def); - repl = new ReplayHandler(disp, store, def); + repl = new ReplayHandler(disp, store, def, { revisionGuard: { prefix: 'readmodel_revision' } }); repl.replayStreamed(function (replay, finished) { @@ -642,11 +642,11 @@ describe('replayHandler', function () { expect(saveRvmsCalled2).to.eql(true); expect(saveRvmsCalled3).to.eql(false); - store.get('ctxagg1aggId1', function (err, rev) { + store.get('readmodel_revision', 'ctx:ctx::agg:agg1::aggId:aggId1', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(4); - store.get('ctxagg2aggId2', function (err, rev) { + store.get('readmodel_revision', 'ctx:ctx::agg:agg2::aggId:aggId2', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(6); @@ -720,7 +720,7 @@ describe('replayHandler', function () { } }, def); - repl = new ReplayHandler(disp, store, def); + repl = new ReplayHandler(disp, store, def, { revisionGuard: { prefix: 'readmodel_revision' } }); repl.replayStreamed(function (replay, finished) { @@ -746,11 +746,11 @@ describe('replayHandler', function () { expect(saveRvmsCalled2).to.eql(true); expect(saveRvmsCalled3).to.eql(false); - store.get('aggId1', function (err, rev) { + store.get(null, 'aggId1', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).not.to.be.ok(); - store.get('aggId2', function (err, rev) { + store.get(null, 'aggId2', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).not.to.be.ok(); diff --git a/test/unit/revisionGuardStoreTest.js b/test/unit/revisionGuardStoreTest.js index d0fdfff..10fa072 100644 --- a/test/unit/revisionGuardStoreTest.js +++ b/test/unit/revisionGuardStoreTest.js @@ -1,5 +1,4 @@ var expect = require('expect.js'), - async = require('async'), revisionGuardStore = require('../../lib/revisionGuardStore'), Base = require('../../lib/revisionGuardStore/base'), InMemory = require('../../lib/revisionGuardStore/databases/inmemory'); @@ -103,7 +102,10 @@ describe('revisionGuardStore', function() { expect(store.set).to.be.a('function'); expect(store.saveLastEvent).to.be.a('function'); expect(store.getLastEvent).to.be.a('function'); - + if (type === 'redis') { + expect(store.getValueOfEachId).to.be.a('function'); + expect(store.getValueOfId).to.be.a('function'); + } }); }); @@ -199,7 +201,7 @@ describe('revisionGuardStore', function() { it('it should callback with a new Id as string', function(done) { - store.getNewId(function(err, id) { + store.getNewId(null, function(err, id) { expect(err).not.to.be.ok(); expect(id).to.be.a('string'); done(); @@ -212,14 +214,14 @@ describe('revisionGuardStore', function() { describe('having no entries', function() { before(function(done) { - store.clear(done); + store.clear(null, done); }); describe('calling get', function() { it('it should callback with an empty revision', function(done) { - store.get('23', function (err, rev) { + store.get(null, '23', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).not.to.be.ok(); done(); @@ -233,10 +235,10 @@ describe('revisionGuardStore', function() { it('it should work as expected', function(done) { - store.set('23', 5, 4, function (err) { + store.set(null, '23', null, 5, 4, function (err) { expect(err).not.to.be.ok(); - store.get('23', function (err, rev) { + store.get(null, '23', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(5); @@ -250,11 +252,11 @@ describe('revisionGuardStore', function() { it('it should callback with a ConcurrencyError', function(done) { - store.set('23', 6, 4, function (err) { + store.set(null, '23', null, 6, 4, function (err) { expect(err).to.be.ok(); expect(err.name).to.eql('ConcurrencyError'); - store.get('23', function (err, rev) { + store.get(null, '23', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(5); @@ -270,11 +272,11 @@ describe('revisionGuardStore', function() { it('it should callback with a ConcurrencyError', function(done) { - store.set('23', 6, 7, function (err) { + store.set(null, '23', null, 6, 7, function (err) { expect(err).to.be.ok(); expect(err.name).to.eql('ConcurrencyError'); - store.get('23', function (err, rev) { + store.get(null, '23', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(5); @@ -290,11 +292,11 @@ describe('revisionGuardStore', function() { it('it should callback with a ConcurrencyError', function(done) { - store.set('23', 6, 6, function (err) { + store.set(null, '23', null, 6, 6, function (err) { expect(err).to.be.ok(); expect(err.name).to.eql('ConcurrencyError'); - store.get('23', function (err, rev) { + store.get(null, '23', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(5); @@ -310,10 +312,10 @@ describe('revisionGuardStore', function() { it('it should callback without an error', function(done) { - store.set('2345', 2, null, function (err) { + store.set(null, '2345', null, 2, null, function (err) { expect(err).not.to.be.ok(); - store.get('2345', function (err, rev) { + store.get(null, '2345', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(2); @@ -329,10 +331,10 @@ describe('revisionGuardStore', function() { it('it should callback without an error', function(done) { - store.set('23', 6, 5, function (err) { + store.set(null, '23', null, 6, 5, function (err) { expect(err).not.to.be.ok(); - store.get('23', function (err, rev) { + store.get(null, '23', function (err, rev) { expect(err).not.to.be.ok(); expect(rev).to.eql(6); @@ -350,21 +352,21 @@ describe('revisionGuardStore', function() { it('it should work as expected', function (done) { - store.getLastEvent(function (err, evt) { + store.getLastEvent(null, function (err, evt) { expect(err).not.to.be.ok(); expect(evt).not.to.be.ok(); - store.saveLastEvent({ my: 'evt' }, function (err) { + store.saveLastEvent(null, { my: 'evt' }, function (err) { expect(err).not.to.be.ok(); - store.getLastEvent(function (err, evt) { + store.getLastEvent(null, function (err, evt) { expect(err).not.to.be.ok(); expect(evt.my).to.eql('evt'); - store.clear(function (err) { + store.clear(null, function (err) { expect(err).not.to.be.ok(); - store.getLastEvent(function (err, evt) { + store.getLastEvent(null, function (err, evt) { expect(err).not.to.be.ok(); expect(evt).not.to.be.ok(); @@ -380,6 +382,67 @@ describe('revisionGuardStore', function() { }); + if (type === 'redis') { + describe('get value for each id', function() { + + it('it should work as expected', function (done) { + var count = 0; + + store.set(null, '23', { aggregate: 'test' }, 7, 6, function (err) { + expect(err).not.to.be.ok(); + + + store.set(null, '24', { aggregate: 'test1' }, 5, 4, function (err) { + expect(err).not.to.be.ok(); + + store.set(null, '25', null, 5, 4, function (err) { + expect(err).not.to.be.ok(); + + store.getValueOfEachId(null, (err, aggregateHandleFns) => { + expect(err).not.to.be.ok(); + + aggregateHandleFns.forEach(handleFn => { + handleFn((err, data) => { + + expect(err).to.not.be.ok(); + expect(data.id).to.be.ok(); + + if (data.id.indexOf('23') >= 0) { + expect(data.id).to.eql('default::23'); + expect(data.value).to.eql({ revision: 7, data: { aggregate: 'test' } }); + count++; + } + if (data.id.indexOf('24') >= 0) { + expect(data.id).to.eql('default::24'); + expect(data.value).to.eql({ revision: 5, data: { aggregate: 'test1' } }); + count++; + } + if (data.id.indexOf('25') >= 0) { + expect(data.id).to.eql('default::25'); + expect(data.value).to.eql({ revision: 5 }); + count++; + } + + if (count === 3) { + done(); + } + }); + + }); + + }); + + }) + + }); + + }); + + }); + + }); + } + }); }); diff --git a/test/unit/revisionGuardTest.js b/test/unit/revisionGuardTest.js index 7de39d8..ef0c4a4 100644 --- a/test/unit/revisionGuardTest.js +++ b/test/unit/revisionGuardTest.js @@ -181,7 +181,7 @@ describe('revisionGuard', function () { beforeEach(function (done) { guard.currentHandlingRevisions = {}; - store.clear(done); + store.clear(null, done); }); describe('in correct order', function () { @@ -256,7 +256,7 @@ describe('revisionGuard', function () { beforeEach(function (done) { specialGuard.currentHandlingRevisions = {}; - store.clear(done); + store.clear(null, done); }); it('it should work as expected', function (done) { @@ -274,14 +274,12 @@ describe('revisionGuard', function () { var start1 = Date.now(); specialGuard.guard(evt1, function (err, finish1) { var diff1 = Date.now() - start1; - console.log('guarded 1: ' + diff1); expect(err).not.to.be.ok(); setTimeout(function () { start1 = Date.now(); finish1(function (err) { diff1 = Date.now() - start1; - console.log('finished 1: ' + diff1); expect(err).not.to.be.ok(); expect(guarded).to.eql(0); check(); @@ -292,13 +290,11 @@ describe('revisionGuard', function () { var start2 = Date.now(); specialGuard.guard(evt2, function (err, finish2) { var diff2 = Date.now() - start2; - console.log('guarded 2: ' + diff2); expect(err).not.to.be.ok(); start2 = Date.now(); finish2(function (err) { diff2 = Date.now() - start2; - console.log('finished 2: ' + diff2); expect(err).not.to.be.ok(); expect(guarded).to.eql(1); check(); @@ -308,13 +304,11 @@ describe('revisionGuard', function () { var start3 = Date.now(); specialGuard.guard(evt3, function (err, finish3) { var diff3 = Date.now() - start3; - console.log('guarded 3: ' + diff3); expect(err).not.to.be.ok(); start3 = Date.now(); finish3(function (err) { diff3 = Date.now() - start3; - console.log('finished 3: ' + diff3); expect(err).not.to.be.ok(); expect(guarded).to.eql(2); check(); @@ -344,7 +338,7 @@ describe('revisionGuard', function () { meta: 'meta' }); specialGuard.currentHandlingRevisions = {}; - store.clear(done); + store.clear(null, done); }); it('and guarding an event with revision greater than expected, it should emit an eventMissing event', function (done) { @@ -455,7 +449,7 @@ describe('revisionGuard', function () { expect(err.name).to.eql('AlreadyDenormalizedError'); expect(guarded).to.eql(3); - store.getLastEvent(function (err, evt) { + store.getLastEvent(null, function (err, evt) { expect(err).not.to.be.ok(); expect(evt.id).to.eql(evt3.id); done();