diff --git a/lib/eventstore.js b/lib/eventstore.js index 3539e1cd..15e0ce3f 100644 --- a/lib/eventstore.js +++ b/lib/eventstore.js @@ -474,64 +474,72 @@ _.extend(Eventstore.prototype, { commit: function(eventstream, callback) { var self = this; + var waterfallFuncs = []; + var idSet = []; + + waterfallFuncs.push(function getNewId(callback) { + self.getNewId(callback); + }); - async.waterfall([ - - function getNewCommitId(callback) { + // Generate an id for each uncommitted event + for (var i= 0, len = eventstream.uncommittedEvents.length; i < len; i++) { + waterfallFuncs.push(function getAnotherId(previousId, callback) { + idSet.push(previousId); self.getNewId(callback); - }, + }); + } + waterfallFuncs.push(function commitEvents(id, callback) { + idSet.push(id); + + // start committing. + var event, + currentRevision = eventstream.currentRevision(), + uncommittedEvents = [].concat(eventstream.uncommittedEvents); + eventstream.uncommittedEvents = []; + + self.store.getNextPositions(uncommittedEvents.length, function(err, positions) { + if (err) + return callback(err) + + for (var i = 0, len = uncommittedEvents.length; i < len; i++) { + event = uncommittedEvents[i]; + event.id = idSet[i+1]; // additional ids generated by getAnotherId function + event.commitId = idSet[0]; // first id generated is for the commit + event.commitSequence = i; + event.restInCommitStream = len - 1 - i; + event.commitStamp = new Date(); + currentRevision++; + event.streamRevision = currentRevision; + if (positions) + event.position = positions[i]; + + event.applyMappings(); + } - function commitEvents(id, callback) { - // start committing. - var event, - currentRevision = eventstream.currentRevision(), - uncommittedEvents = [].concat(eventstream.uncommittedEvents); - eventstream.uncommittedEvents = []; - - self.store.getNextPositions(uncommittedEvents.length, function(err, positions) { - if (err) - return callback(err) - - for (var i = 0, len = uncommittedEvents.length; i < len; i++) { - event = uncommittedEvents[i]; - event.id = id + i.toString(); - event.commitId = id; - event.commitSequence = i; - event.restInCommitStream = len - 1 - i; - event.commitStamp = new Date(); - currentRevision++; - event.streamRevision = currentRevision; - if (positions) - event.position = positions[i]; - - event.applyMappings(); + self.store.addEvents(uncommittedEvents, function(err) { + if (err) { + // add uncommitted events back to eventstream + eventstream.uncommittedEvents = uncommittedEvents.concat(eventstream.uncommittedEvents); + return callback(err); } - self.store.addEvents(uncommittedEvents, function(err) { - if (err) { - // add uncommitted events back to eventstream - eventstream.uncommittedEvents = uncommittedEvents.concat(eventstream.uncommittedEvents); - return callback(err); - } - - if (self.publisher && self.dispatcher) { - // push to undispatchedQueue - self.dispatcher.addUndispatchedEvents(uncommittedEvents); - } else { - eventstream.eventsToDispatch = [].concat(uncommittedEvents); - } + if (self.publisher && self.dispatcher) { + // push to undispatchedQueue + self.dispatcher.addUndispatchedEvents(uncommittedEvents); + } else { + eventstream.eventsToDispatch = [].concat(uncommittedEvents); + } - // move uncommitted events to events - eventstream.events = eventstream.events.concat(uncommittedEvents); - eventstream.currentRevision(); + // move uncommitted events to events + eventstream.events = eventstream.events.concat(uncommittedEvents); + eventstream.currentRevision(); - callback(null, eventstream); - }); + callback(null, eventstream); }); - }], - - callback - ); + }); + }); + + async.waterfall(waterfallFuncs, callback); }, /**