diff --git a/lib/databases/redis.js b/lib/databases/redis.js index 32e89560..ce2a1de1 100644 --- a/lib/databases/redis.js +++ b/lib/databases/redis.js @@ -251,8 +251,9 @@ _.extend(Redis.prototype, { } var savedKeysAndEvents = events.map(function(event, index) { - var key = prefix + ':' + eventKey(event); - event.streamRevision = parseInt(revisions[index], 10) - 1; + var streamRevision = parseInt(revisions[index], 10) - 1; + var key = prefix + ':' + eventKey(event) + ':' + streamRevision; + event.streamRevision = streamRevision; event.applyMappings(); return [key, JSON.stringify(event)]; }); @@ -427,13 +428,24 @@ _.extend(Redis.prototype, { return s; }); - if (revMax === -1) { - allKeys = allKeys.slice(revMin); + var getRevision = function(key) { + var parts = key.split(':') + return parts[parts.length - 1] } - else { - allKeys = allKeys.slice(revMin, revMax); + + var filterMin = function(key) { + return getRevision(key) >= revMin + } + + var filterMinAndMax = function(key) { + var revision = getRevision(key) + return revision >= revMin && revision < revMax } + var filterFunc = (revMax && revMax !== -1) ? filterMinAndMax : filterMin + + allKeys = allKeys.filter(filterFunc) + if (allKeys.length === 0) { return callback(null, []); } diff --git a/test/storeTest.js b/test/storeTest.js index 27e4b7d8..1bb6bee3 100644 --- a/test/storeTest.js +++ b/test/storeTest.js @@ -2167,6 +2167,36 @@ types.forEach(function (type) { }); + if (type === 'redis') { + describe('when events do not start at revision 0', function () { + + it('should return the events specified by the revMin and revMax', function (done) { + + var firstEventMatcher = options.prefix + ':' + options.eventsCollectionName + ':*:*:*:*:' + 'idWithAgg' + ':*:0' + var revMin = 1 + var revMax = 2 + + store.client.keys(firstEventMatcher, (err, keys) => { + if (err) { console.log(err) } + + store.client.del(keys[0], (err, keysDeleted) => { + expect(keysDeleted).to.equal(1) + + store.getEventsByRevision({ aggregateId: 'idWithAgg' }, revMin, revMax, function (err, evts) { + expect(err).not.to.be.ok(); + expect(evts.length).to.eql(1); + expect(evts[0].aggregateId).to.eql(stream2[1].aggregateId); + expect(evts[0].commitStamp.getTime()).to.eql(stream2[1].commitStamp.getTime()); + expect(evts[0].streamRevision).to.eql(stream2[1].streamRevision); + + done(); + }); + }) + }) + }) + }) + } + }); }); @@ -2283,7 +2313,7 @@ types.forEach(function (type) { }); - describe('and limit it with skip and limit', function () { + describe('and limit it with revMin and revMax', function () { it('it should return the correct events', function (done) {