Skip to content

Commit 752c223

Browse files
authored
Play: Inline single-use functions in channel.js (#92)
There are several helper functions which get called in `channel.js` by only a single function. In this patch we've inlined those functions in order to make the sequence of logic and fetching more obvious. This isn't designed specifically to change the codebase as much as build a better understanding of the asynchronous points in the channel where concurrency issues could arise. This patch may or may not be considered for merging.
1 parent b586384 commit 752c223

File tree

1 file changed

+38
-86
lines changed

1 file changed

+38
-86
lines changed

src/simperium/channel.js

+38-86
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/*eslint no-shadow: 0*/
2-
import { format, inherits } from 'util'
3-
import { EventEmitter } from 'events'
4-
import { parseMessage, parseVersionMessage, change as change_util } from './util'
5-
import { v4 as uuid } from 'uuid'
2+
import {format, inherits} from 'util'
3+
import {EventEmitter} from 'events'
4+
import {change as change_util, parseMessage, parseVersionMessage} from './util'
5+
import {v4 as uuid} from 'uuid'
66
import buildOperation from './util/operation';
77

88
const UNKNOWN_CV = '?';
@@ -33,21 +33,6 @@ internal.updateChangeVersion = function( cv ) {
3333
} );
3434
};
3535

36-
/**
37-
* Called when receive a change from the network. Attempt to apply the change
38-
* to the ghost object and notify.
39-
*
40-
* @param {String} id - id of the object changed
41-
* @param {Object} change - the change to apply to the object
42-
*/
43-
internal.changeObject = function( id, change ) {
44-
// pull out the object from the store and apply the change delta
45-
var applyChange = internal.performChange.bind( this, change );
46-
this.networkQueue.queueFor( id ).add( function( done ) {
47-
return applyChange().then( done, done );
48-
} );
49-
};
50-
5136
/**
5237
* Creates a change operation for the object of `id` that changes
5338
* from the date stored in the `ghost` into the data of `object`.
@@ -79,28 +64,6 @@ internal.buildModifyChange = function( id, object, ghost ) {
7964
this.localQueue.queue( { type: 'modify', id, object } );
8065
};
8166

82-
/**
83-
* Creates a change object that deletes an object from a bucket.
84-
*
85-
* Queues the change for syncing.
86-
*
87-
* @param {String} id - object to remove
88-
* @param {Object} ghost - current ghost object for the given id
89-
*/
90-
internal.buildRemoveChange = function( id ) {
91-
this.localQueue.queue( { type: 'remove', id } );
92-
};
93-
94-
internal.diffAndSend = function( id, object ) {
95-
var modify = internal.buildModifyChange.bind( this, id, object );
96-
return this.store.get( id ).then( modify );
97-
};
98-
99-
internal.removeAndSend = function( id ) {
100-
var remove = internal.buildRemoveChange.bind( this, id );
101-
return this.store.get( id ).then( remove );
102-
};
103-
10467
/**
10568
* Updates the ghost store with the updated ghost data based on the data that
10669
* comes from applying the patch to the original.
@@ -169,11 +132,6 @@ internal.updateAcknowledged = function( change ) {
169132
}
170133
};
171134

172-
internal.performChange = function( change ) {
173-
var success = internal.applyChange.bind( this, change );
174-
return this.store.get( change.id ).then( success );
175-
};
176-
177135
internal.findAcknowledgedChange = function( change ) {
178136
var possibleChange = this.localQueue.sent[change.id];
179137
if ( possibleChange ) {
@@ -427,9 +385,10 @@ inherits( Channel, EventEmitter );
427385
*/
428386
Channel.prototype.update = function( object, sync = true ) {
429387
this.onBucketUpdate( object.id );
388+
430389
if ( sync === true ) {
431-
return internal
432-
.diffAndSend.call( this, object.id, object.data )
390+
return this.store.get( object.id )
391+
.then( ghost => internal.buildModifyChange.call( this, object.id, object.data, ghost ) )
433392
.then( () => object );
434393
}
435394
return Promise.resolve( object );
@@ -452,7 +411,7 @@ Channel.prototype.setIsIndexing = function( isIndexing ) {
452411
* @param {String} id - the id of the object to remove
453412
*/
454413
Channel.prototype.remove = function( id ) {
455-
internal.removeAndSend.call( this, id )
414+
this.store.get( id ).then( () => this.localQueue.queue( { type: 'remove', id } ) );
456415
}
457416

458417
/**
@@ -484,7 +443,7 @@ Channel.prototype.getRevisions = function( id ) {
484443
* @returns {Promise<Boolean>} true if there are still changes to sync
485444
*/
486445
Channel.prototype.hasLocalChanges = function() {
487-
return Promise.resolve( this.localQueue.hasChanges() );
446+
return Promise.resolve( Object.keys( this.localQueue.queues ).length > 0 );
488447
}
489448

490449
/**
@@ -663,12 +622,16 @@ Channel.prototype.sendChangeVersionRequest = function( cv ) {
663622
this.send( format( 'cv:%s', cv ) );
664623
};
665624

666-
Channel.prototype.onChanges = function( data ) {
667-
var changes = JSON.parse( data ),
668-
onChange = internal.changeObject.bind( this );
669-
670-
changes.forEach( function( change ) {
671-
onChange( change.id, change );
625+
Channel.prototype.onChanges = function( changes ) {
626+
JSON.parse( changes ).forEach( change => {
627+
// pull out the object from the store and apply the change delta
628+
this.networkQueue
629+
.queueFor( change.id )
630+
.add(
631+
done => this.store.get( change.id )
632+
.then( ghost => internal.applyChange.call( this, change, ghost ) )
633+
.then( done, done )
634+
);
672635
} );
673636
// emit ready after all server changes have been applied
674637
this.emit( 'ready' );
@@ -794,15 +757,10 @@ LocalQueue.prototype.queue = function( change ) {
794757
this.processQueue( change.id );
795758
};
796759

797-
LocalQueue.prototype.hasChanges = function() {
798-
return Object.keys( this.queues ).length > 0;
799-
};
800-
801760
LocalQueue.prototype.dequeueChangesFor = function( id ) {
802761
var changes = [], sent = this.sent[id], queue = this.queues[id];
803762

804763
if ( sent ) {
805-
this.sent[id];
806764
changes.push( sent );
807765
}
808766

@@ -816,7 +774,6 @@ LocalQueue.prototype.dequeueChangesFor = function( id ) {
816774

817775
LocalQueue.prototype.processQueue = function( id ) {
818776
var queue = this.queues[id];
819-
var compressAndSend = this.compressAndSend.bind( this, id );
820777

821778
// there is no queue, don't do anything
822779
if ( !queue ) return;
@@ -832,34 +789,29 @@ LocalQueue.prototype.processQueue = function( id ) {
832789
this.emit( 'wait', id );
833790
return;
834791
}
835-
this.store.get( id ).then( compressAndSend );
836-
}
837792

838-
LocalQueue.prototype.compressAndSend = function( id, ghost ) {
839-
var changes = this.queues[id];
840-
var change;
841-
842-
// a change was sent before we could compress and send
843-
if ( this.sent[id] ) {
844-
this.emit( 'wait', id );
845-
return;
846-
}
793+
this.store.get( id ).then( ghost => {
794+
const changes = this.queues[id];
847795

848-
const sending = changes.reduce( ( chosen, next ) => {
849-
if ( chosen.type === 'remove' ) {
850-
return chosen;
796+
// a change was sent before we could compress and send
797+
if ( this.sent[id] ) {
798+
this.emit( 'wait', id );
799+
return;
851800
}
852-
return next;
853-
} );
854801

855-
change = buildOperation( sending, ghost );
856-
this.queues[id] = [];
857-
if ( change_util.isEmptyChange( change ) ) {
858-
return;
859-
}
860-
this.sent[id] = change;
861-
this.emit( 'send', change );
862-
}
802+
const sending = changes.reduce(
803+
( chosen, next ) => 'remove' === chosen.type ? chosen : next
804+
);
805+
806+
const change = buildOperation( sending, ghost );
807+
this.queues[id] = [];
808+
if ( change_util.isEmptyChange( change ) ) {
809+
return;
810+
}
811+
this.sent[id] = change;
812+
this.emit( 'send', change );
813+
});
814+
};
863815

864816
LocalQueue.prototype.resendSentChanges = function() {
865817
for ( let ccid in this.sent ) {

0 commit comments

Comments
 (0)