From 1276f89525f6ba6f1fc9b31975193daecc3ba2b8 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Wed, 7 Mar 2018 10:56:37 -0800 Subject: [PATCH 1/4] Extracts queue classes from channel module --- src/simperium/channel.js | 196 +------------------------- src/simperium/queues/README.md | 25 ++++ src/simperium/queues/local-queue.js | 144 +++++++++++++++++++ src/simperium/queues/network-queue.js | 21 +++ src/simperium/queues/queue.js | 40 ++++++ 5 files changed, 232 insertions(+), 194 deletions(-) create mode 100644 src/simperium/queues/README.md create mode 100644 src/simperium/queues/local-queue.js create mode 100644 src/simperium/queues/network-queue.js create mode 100644 src/simperium/queues/queue.js diff --git a/src/simperium/channel.js b/src/simperium/channel.js index 4f59d15..7060bc2 100644 --- a/src/simperium/channel.js +++ b/src/simperium/channel.js @@ -3,6 +3,8 @@ import { format, inherits } from 'util' import { EventEmitter } from 'events' import { parseMessage, parseVersionMessage, change as change_util } from './util' import { v4 as uuid } from 'uuid' +import NetworkQueue from './queues/network-queue'; +import LocalQueue from './queues/local-queue'; const UNKNOWN_CV = '?'; const CODE_INVALID_VERSION = 405; @@ -342,7 +344,6 @@ internal.indexingComplete = function() { * @returns {Promise} - resolves once the change version is saved */ - /** * Maintains syncing state for a Simperium bucket. * @@ -662,199 +663,6 @@ Channel.prototype.onVersion = function( data ) { this.emit( 'version.' + ghost.id + '.' + ghost.version, ghost.data ); }; -function NetworkQueue() { - this.queues = {}; -} - -NetworkQueue.prototype.queueFor = function( id ) { - var queues = this.queues, - queue = queues[id]; - - if ( !queue ) { - queue = new Queue(); - queue.on( 'finish', function() { - delete queues[id]; - } ); - queues[id] = queue; - } - - return queue; -}; - -function Queue() { - this.queue = []; - this.running = false; -} - -inherits( Queue, EventEmitter ); - -// Add a function at the end of the queue -Queue.prototype.add = function( fn ) { - this.queue.push( fn ); - this.start(); - return this; -}; - -Queue.prototype.start = function() { - if ( this.running ) return; - this.running = true; - this.emit( 'start' ); - setImmediate( this.run.bind( this ) ); -} - -Queue.prototype.run = function() { - var fn; - this.running = true; - - if ( this.queue.length === 0 ) { - this.running = false; - this.emit( 'finish' ); - return; - } - - fn = this.queue.shift(); - fn( this.run.bind( this ) ); -} - -function LocalQueue( store ) { - this.store = store; - this.sent = {}; - this.queues = {}; - this.ready = false; -} - -inherits( LocalQueue, EventEmitter ); - -LocalQueue.prototype.start = function() { - var queueId; - this.ready = true; - for ( queueId in this.queues ) { - this.processQueue( queueId ); - } -} - -LocalQueue.prototype.pause = function() { - this.ready = false; -}; - -LocalQueue.prototype.acknowledge = function( change ) { - if ( this.sent[change.id] === change ) { - delete this.sent[change.id]; - } - - this.processQueue( change.id ); -} - -LocalQueue.prototype.queue = function( change ) { - var queue = this.queues[change.id]; - - if ( !queue ) { - queue = []; - this.queues[change.id] = queue; - } - - queue.push( change ); - - this.emit( 'queued', change.id, change, queue ); - - if ( !this.ready ) return; - - this.processQueue( change.id ); -}; - -LocalQueue.prototype.hasChanges = function() { - return Object.keys( this.queues ).length > 0; -}; - -LocalQueue.prototype.dequeueChangesFor = function( id ) { - var changes = [], sent = this.sent[id], queue = this.queues[id]; - - if ( sent ) { - delete this.sent[id]; - changes.push( sent ); - } - - if ( queue ) { - delete this.queues[id]; - changes = changes.concat( queue ); - } - - return changes; -}; - -LocalQueue.prototype.processQueue = function( id ) { - var queue = this.queues[id]; - var compressAndSend = this.compressAndSend.bind( this, id ); - - // there is no queue, don't do anything - if ( !queue ) return; - - // queue is empty, delete it from memory - if ( queue.length === 0 ) { - delete this.queues[id]; - return; - } - - // waiting for a previous sent change to get acknowledged - if ( this.sent[id] ) { - this.emit( 'wait', id ); - return; - } - - this.store.get( id ).then( compressAndSend ); -} - -LocalQueue.prototype.compressAndSend = function( id, ghost ) { - var changes = this.queues[id]; - var change; - var target = ghost.data; - var c; - var type; - - // a change was sent before we could compress and send - if ( this.sent[id] ) { - this.emit( 'wait', id ); - return; - } - - if ( changes.length === 1 ) { - change = changes.shift(); - this.sent[id] = change; - this.emit( 'send', change ); - return; - } - - if ( changes.length > 1 && changes[0].type === change_util.type.REMOVE ) { - change = changes.shift(); - changes.splice( 0, changes.length - 1 ); - this.sent[id] = change; - this.emit( 'send', change ); - } - - while ( changes.length > 0 ) { - c = changes.shift(); - - if ( c.o === change_util.type.REMOVE ) { - changes.unshift( c ); - break; - } - - target = change_util.apply( c.v, target ); - } - - type = target === null ? change_util.type.REMOVE : change_util.type.MODIFY; - change = change_util.buildChange( type, id, target, ghost ); - - this.sent[id] = change; - this.emit( 'send', change ); -} - -LocalQueue.prototype.resendSentChanges = function() { - for ( let ccid in this.sent ) { - this.emit( 'send', this.sent[ccid] ) - } -} - /** * Since revision data is basically immutable we can prevent the * need to refetch it after it has been loaded once. diff --git a/src/simperium/queues/README.md b/src/simperium/queues/README.md new file mode 100644 index 0000000..49acc89 --- /dev/null +++ b/src/simperium/queues/README.md @@ -0,0 +1,25 @@ +# Channel Queues + +These queues were originally in the Channel module. They have been extracted +and flowtyped to improve code quality and clarify the API. + +## LocalQueue + +Each `Channel` instance has a single `LocalQueue` that tracks changes that are sent +are pending to be sent to simperium. + +As bucket objects are updated, the `Channel` will reference this queue to determine +when an object should be sent. It also uses the `LocalQueue` to report if a bucket object +is currently being synced or not. + +## NetworkQueue + +Each `Channel` instance has a single `NetworkQueue`. As changes are received from simperium, +the channel will apply the changes in sequence. Together with the `LocalQueue` the channel +will be able to determine when pending changes in the `LocalQueue` have been accepted or +rejected by the server. + +## Queue + +A generic queue object used by `LocalQueue` and `RemoteQueue` that sequences tasks as +first-in-first-out execution order. diff --git a/src/simperium/queues/local-queue.js b/src/simperium/queues/local-queue.js new file mode 100644 index 0000000..c790106 --- /dev/null +++ b/src/simperium/queues/local-queue.js @@ -0,0 +1,144 @@ +// @flow +import { inherits } from 'util'; +import events from 'events'; + +const { EventEmitter } = events; + +export default function LocalQueue( store ) { + this.store = store; + this.sent = {}; + this.queues = {}; + this.ready = false; +} + +inherits( LocalQueue, EventEmitter ); + +LocalQueue.prototype.start = function() { + var queueId; + this.ready = true; + for ( queueId in this.queues ) { + this.processQueue( queueId ); + } +} + +LocalQueue.prototype.pause = function() { + this.ready = false; +}; + +LocalQueue.prototype.acknowledge = function( change ) { + if ( this.sent[change.id] === change ) { + delete this.sent[change.id]; + } + + this.processQueue( change.id ); +} + +LocalQueue.prototype.queue = function( change ) { + var queue = this.queues[change.id]; + + if ( !queue ) { + queue = []; + this.queues[change.id] = queue; + } + + queue.push( change ); + + this.emit( 'queued', change.id, change, queue ); + + if ( !this.ready ) return; + + this.processQueue( change.id ); +}; + +LocalQueue.prototype.hasChanges = function() { + return Object.keys( this.queues ).length > 0; +}; + +LocalQueue.prototype.dequeueChangesFor = function( id ) { + var changes = [], sent = this.sent[id], queue = this.queues[id]; + + if ( sent ) { + delete this.sent[id]; + changes.push( sent ); + } + + if ( queue ) { + delete this.queues[id]; + changes = changes.concat( queue ); + } + + return changes; +}; + +LocalQueue.prototype.processQueue = function( id ) { + var queue = this.queues[id]; + var compressAndSend = this.compressAndSend.bind( this, id ); + + // there is no queue, don't do anything + if ( !queue ) return; + + // queue is empty, delete it from memory + if ( queue.length === 0 ) { + delete this.queues[id]; + return; + } + + // waiting for a previous sent change to get acknowledged + if ( this.sent[id] ) { + this.emit( 'wait', id ); + return; + } + + this.store.get( id ).then( compressAndSend ); +} + +LocalQueue.prototype.compressAndSend = function( id, ghost ) { + var changes = this.queues[id]; + var change; + var target = ghost.data; + var c; + var type; + + // a change was sent before we could compress and send + if ( this.sent[id] ) { + this.emit( 'wait', id ); + return; + } + + if ( changes.length === 1 ) { + change = changes.shift(); + this.sent[id] = change; + this.emit( 'send', change ); + return; + } + + if ( changes.length > 1 && changes[0].type === change_util.type.REMOVE ) { + change = changes.shift(); + changes.splice( 0, changes.length - 1 ); + this.sent[id] = change; + this.emit( 'send', change ); + } + + while ( changes.length > 0 ) { + c = changes.shift(); + + if ( c.o === change_util.type.REMOVE ) { + changes.unshift( c ); + break; + } + + target = change_util.apply( c.v, target ); + } + + type = target === null ? change_util.type.REMOVE : change_util.type.MODIFY; + change = change_util.buildChange( type, id, target, ghost ); + + this.sent[id] = change; + this.emit( 'send', change ); +} + +LocalQueue.prototype.resendSentChanges = function() { + for ( let ccid in this.sent ) { + this.emit( 'send', this.sent[ccid] ) + } +} \ No newline at end of file diff --git a/src/simperium/queues/network-queue.js b/src/simperium/queues/network-queue.js new file mode 100644 index 0000000..5f37dd7 --- /dev/null +++ b/src/simperium/queues/network-queue.js @@ -0,0 +1,21 @@ +// @flow +import Queue from './queue'; + +export default function NetworkQueue() { + this.queues = {}; +} + +NetworkQueue.prototype.queueFor = function( id ) { + const queues: { [string]: ?Queue } = this.queues; + let queue: ?Queue = queues[id]; + + if ( !queue ) { + queue = new Queue(); + queue.on( 'finish', function() { + delete queues[id]; + } ); + queues[id] = queue; + } + + return queue; +}; diff --git a/src/simperium/queues/queue.js b/src/simperium/queues/queue.js new file mode 100644 index 0000000..67e0f6d --- /dev/null +++ b/src/simperium/queues/queue.js @@ -0,0 +1,40 @@ +// @flow +import events from 'events'; +import { inherits } from 'util'; + +const { EventEmitter } = events; + +export default function Queue() { + this.queue = []; + this.running = false; +} + +inherits( Queue, EventEmitter ); + +// Add a function at the end of the queue +Queue.prototype.add = function( fn ) { + this.queue.push( fn ); + this.start(); + return this; +}; + +Queue.prototype.start = function() { + if ( this.running ) return; + this.running = true; + this.emit( 'start' ); + setImmediate( this.run.bind( this ) ); +} + +Queue.prototype.run = function() { + var fn; + this.running = true; + + if ( this.queue.length === 0 ) { + this.running = false; + this.emit( 'finish' ); + return; + } + + fn = this.queue.shift(); + fn( this.run.bind( this ) ); +} From 75836b130a4d3e3d64276aab6b32bfec83f50450 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Wed, 7 Mar 2018 11:14:38 -0800 Subject: [PATCH 2/4] Uses es6 class syntax --- src/simperium/queues/local-queue.js | 213 +++++++++++++------------- src/simperium/queues/network-queue.js | 32 ++-- src/simperium/queues/queue.js | 58 +++---- 3 files changed, 153 insertions(+), 150 deletions(-) diff --git a/src/simperium/queues/local-queue.js b/src/simperium/queues/local-queue.js index c790106..ffbbf7a 100644 --- a/src/simperium/queues/local-queue.js +++ b/src/simperium/queues/local-queue.js @@ -1,144 +1,145 @@ // @flow -import { inherits } from 'util'; import events from 'events'; +import * as change_util from '../util/change'; const { EventEmitter } = events; -export default function LocalQueue( store ) { - this.store = store; - this.sent = {}; - this.queues = {}; - this.ready = false; -} - -inherits( LocalQueue, EventEmitter ); - -LocalQueue.prototype.start = function() { - var queueId; - this.ready = true; - for ( queueId in this.queues ) { - this.processQueue( queueId ); +export default class LocalQueue extends EventEmitter { + constructor( store ) { + super(); + this.store = store; + this.sent = {}; + this.queues = {}; + this.ready = false; } -} - -LocalQueue.prototype.pause = function() { - this.ready = false; -}; -LocalQueue.prototype.acknowledge = function( change ) { - if ( this.sent[change.id] === change ) { - delete this.sent[change.id]; + start() { + var queueId; + this.ready = true; + for ( queueId in this.queues ) { + this.processQueue( queueId ); + } } - this.processQueue( change.id ); -} + pause() { + this.ready = false; + }; -LocalQueue.prototype.queue = function( change ) { - var queue = this.queues[change.id]; + acknowledge( change ) { + if ( this.sent[change.id] === change ) { + delete this.sent[change.id]; + } - if ( !queue ) { - queue = []; - this.queues[change.id] = queue; + this.processQueue( change.id ); } - queue.push( change ); + queue( change ) { + var queue = this.queues[change.id]; - this.emit( 'queued', change.id, change, queue ); + if ( !queue ) { + queue = []; + this.queues[change.id] = queue; + } - if ( !this.ready ) return; + queue.push( change ); - this.processQueue( change.id ); -}; + this.emit( 'queued', change.id, change, queue ); -LocalQueue.prototype.hasChanges = function() { - return Object.keys( this.queues ).length > 0; -}; + if ( !this.ready ) return; -LocalQueue.prototype.dequeueChangesFor = function( id ) { - var changes = [], sent = this.sent[id], queue = this.queues[id]; + this.processQueue( change.id ); + }; - if ( sent ) { - delete this.sent[id]; - changes.push( sent ); - } + hasChanges() { + return Object.keys( this.queues ).length > 0; + }; - if ( queue ) { - delete this.queues[id]; - changes = changes.concat( queue ); - } + dequeueChangesFor( id ) { + var changes = [], sent = this.sent[id], queue = this.queues[id]; - return changes; -}; + if ( sent ) { + delete this.sent[id]; + changes.push( sent ); + } -LocalQueue.prototype.processQueue = function( id ) { - var queue = this.queues[id]; - var compressAndSend = this.compressAndSend.bind( this, id ); + if ( queue ) { + delete this.queues[id]; + changes = changes.concat( queue ); + } - // there is no queue, don't do anything - if ( !queue ) return; + return changes; + }; - // queue is empty, delete it from memory - if ( queue.length === 0 ) { - delete this.queues[id]; - return; - } + processQueue( id ) { + var queue = this.queues[id]; + var compressAndSend = this.compressAndSend.bind( this, id ); - // waiting for a previous sent change to get acknowledged - if ( this.sent[id] ) { - this.emit( 'wait', id ); - return; - } + // there is no queue, don't do anything + if ( !queue ) return; - this.store.get( id ).then( compressAndSend ); -} + // queue is empty, delete it from memory + if ( queue.length === 0 ) { + delete this.queues[id]; + return; + } -LocalQueue.prototype.compressAndSend = function( id, ghost ) { - var changes = this.queues[id]; - var change; - var target = ghost.data; - var c; - var type; - - // a change was sent before we could compress and send - if ( this.sent[id] ) { - this.emit( 'wait', id ); - return; - } + // waiting for a previous sent change to get acknowledged + if ( this.sent[id] ) { + this.emit( 'wait', id ); + return; + } - if ( changes.length === 1 ) { - change = changes.shift(); - this.sent[id] = change; - this.emit( 'send', change ); - return; + this.store.get( id ).then( compressAndSend ); } - if ( changes.length > 1 && changes[0].type === change_util.type.REMOVE ) { - change = changes.shift(); - changes.splice( 0, changes.length - 1 ); - this.sent[id] = change; - this.emit( 'send', change ); - } + compressAndSend( id, ghost ) { + var changes = this.queues[id]; + var change; + var target = ghost.data; + var c; + var type; + + // a change was sent before we could compress and send + if ( this.sent[id] ) { + this.emit( 'wait', id ); + return; + } - while ( changes.length > 0 ) { - c = changes.shift(); + if ( changes.length === 1 ) { + change = changes.shift(); + this.sent[id] = change; + this.emit( 'send', change ); + return; + } - if ( c.o === change_util.type.REMOVE ) { - changes.unshift( c ); - break; + if ( changes.length > 1 && changes[0].type === change_util.type.REMOVE ) { + change = changes.shift(); + changes.splice( 0, changes.length - 1 ); + this.sent[id] = change; + this.emit( 'send', change ); } - target = change_util.apply( c.v, target ); - } + while ( changes.length > 0 ) { + c = changes.shift(); - type = target === null ? change_util.type.REMOVE : change_util.type.MODIFY; - change = change_util.buildChange( type, id, target, ghost ); + if ( c.o === change_util.type.REMOVE ) { + changes.unshift( c ); + break; + } - this.sent[id] = change; - this.emit( 'send', change ); -} + target = change_util.apply( c.v, target ); + } + + type = target === null ? change_util.type.REMOVE : change_util.type.MODIFY; + change = change_util.buildChange( type, id, target, ghost ); -LocalQueue.prototype.resendSentChanges = function() { - for ( let ccid in this.sent ) { - this.emit( 'send', this.sent[ccid] ) + this.sent[id] = change; + this.emit( 'send', change ); } -} \ No newline at end of file + + resendSentChanges() { + for ( let ccid in this.sent ) { + this.emit( 'send', this.sent[ccid] ) + } + } +} diff --git a/src/simperium/queues/network-queue.js b/src/simperium/queues/network-queue.js index 5f37dd7..11d1fb4 100644 --- a/src/simperium/queues/network-queue.js +++ b/src/simperium/queues/network-queue.js @@ -1,21 +1,23 @@ // @flow import Queue from './queue'; -export default function NetworkQueue() { - this.queues = {}; -} +export default class NetworkQueue { + constructor() { + this.queues = {}; + } -NetworkQueue.prototype.queueFor = function( id ) { - const queues: { [string]: ?Queue } = this.queues; - let queue: ?Queue = queues[id]; + queueFor( id ) { + const queues: { [string]: ?Queue } = this.queues; + let queue: ?Queue = queues[id]; - if ( !queue ) { - queue = new Queue(); - queue.on( 'finish', function() { - delete queues[id]; - } ); - queues[id] = queue; - } + if ( !queue ) { + queue = new Queue(); + queue.on( 'finish', function() { + delete queues[id]; + } ); + queues[id] = queue; + } - return queue; -}; + return queue; + } +} diff --git a/src/simperium/queues/queue.js b/src/simperium/queues/queue.js index 67e0f6d..4484bcc 100644 --- a/src/simperium/queues/queue.js +++ b/src/simperium/queues/queue.js @@ -1,40 +1,40 @@ // @flow import events from 'events'; -import { inherits } from 'util'; const { EventEmitter } = events; -export default function Queue() { - this.queue = []; - this.running = false; -} - -inherits( Queue, EventEmitter ); +export default class Queue extends EventEmitter { + constructor() { + super(); + this.queue = []; + this.running = false; + } -// Add a function at the end of the queue -Queue.prototype.add = function( fn ) { - this.queue.push( fn ); - this.start(); - return this; -}; + // Add a function at the end of the queue + add( fn ) { + this.queue.push( fn ); + this.start(); + return this; + }; + + start() { + if ( this.running ) return; + this.running = true; + this.emit( 'start' ); + setImmediate( this.run.bind( this ) ); + } -Queue.prototype.start = function() { - if ( this.running ) return; - this.running = true; - this.emit( 'start' ); - setImmediate( this.run.bind( this ) ); -} + run() { + var fn; + this.running = true; -Queue.prototype.run = function() { - var fn; - this.running = true; + if ( this.queue.length === 0 ) { + this.running = false; + this.emit( 'finish' ); + return; + } - if ( this.queue.length === 0 ) { - this.running = false; - this.emit( 'finish' ); - return; + fn = this.queue.shift(); + fn( this.run.bind( this ) ); } - - fn = this.queue.shift(); - fn( this.run.bind( this ) ); } From 100db2496c2138a8e2f454fd4d6f0a40c2903d6b Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Wed, 7 Mar 2018 13:30:12 -0800 Subject: [PATCH 3/4] 100% flowtyped and docblocks --- src/simperium/channel.js | 80 ------------ src/simperium/queues/local-queue.js | 178 ++++++++++++++++++++++---- src/simperium/queues/network-queue.js | 22 +++- src/simperium/queues/queue.js | 29 ++++- 4 files changed, 190 insertions(+), 119 deletions(-) diff --git a/src/simperium/channel.js b/src/simperium/channel.js index 7060bc2..4b01468 100644 --- a/src/simperium/channel.js +++ b/src/simperium/channel.js @@ -264,86 +264,6 @@ internal.indexingComplete = function() { this.emit( 'ready' ) } -/** - * A ghost represents a version of a bucket object as known by Simperium - * - * Generally a client will keep the last known ghost stored locally for efficient - * diffing and patching of Simperium change operations. - * - * @typedef {Object} Ghost - * @property {Number} version - the ghost's version - * @property {String} key - the simperium bucket object id this ghost is for - * @property {Object} data - the data for the given ghost version - */ - -/** - * Callback function used by the ghost store to iterate over existing ghosts - * - * @callback ghostIterator - * @param {Ghost} - the current ghost - */ - -/** - * A GhostStore provides the store mechanism for ghost data that the Channel - * uses to maintain syncing state and producing change operations for - * Bucket objects. - * - * @interface GhostStore - */ - -/** - * Retrieve a Ghost for the given bucket object id - * - * @function - * @name GhostStore#get - * @param {String} id - bucket object id - * @returns {Promise} - the ghost for this object - */ - -/** - * Save a ghost in the store. - * - * @function - * @name GhostStore#put - * @param {String} id - bucket object id - * @param {Number} version - version of ghost data - * @param {Object} data - object literal to save as this ghost's data for this version - * @returns {Promise} - the ghost for this object - */ - -/** - * Delete a Ghost from the store. - * - * @function - * @name GhostStore#remove - * @param {String} id - bucket object id - * @returns {Promise} - the ghost for this object - */ - -/** - * Iterate over existing Ghost objects with the given callback. - * - * @function - * @name GhostStore#eachGhost - * @param {ghostIterator} - function to run against each ghost - */ - -/** - * Get the current change version (cv) that this channel has synced. - * - * @function - * @name GhostStore#getChangeVersion - * @returns {Promise} - the current change version for the bucket - */ - -/** - * Set the current change version. - * - * @function - * @name GhostStore#setChangeVersion - * @returns {Promise} - resolves once the change version is saved - */ - /** * Maintains syncing state for a Simperium bucket. * diff --git a/src/simperium/queues/local-queue.js b/src/simperium/queues/local-queue.js index ffbbf7a..59eaf2b 100644 --- a/src/simperium/queues/local-queue.js +++ b/src/simperium/queues/local-queue.js @@ -1,11 +1,104 @@ // @flow import events from 'events'; import * as change_util from '../util/change'; +import type { BucketOperation } from '../util/change'; const { EventEmitter } = events; +/** + * A ghost represents a version of a bucket object as known by Simperium + * + * Generally a client will keep the last known ghost stored locally for efficient + * diffing and patching of Simperium change operations. + * + * @typedef {Object} Ghost + * @property {Number} version - the ghost's version + * @property {String} key - the simperium bucket object id this ghost is for + * @property {Object} data - the data for the given ghost version + */ +type Ghost = { version: number, key: string, data: {} }; + +/** + * A GhostStore provides the store mechanism for ghost data that the Channel + * uses to maintain syncing state and producing change operations for + * Bucket objects. + * + * @interface GhostStore + */ +interface GhostStore { + + /** + * Callback function used by the ghost store to iterate over existing ghosts + * + * @callback ghostIterator + * @param {Ghost} - the current ghost + */ + + /** + * Retrieve a Ghost for the given bucket object id + * + * @function + * @name GhostStore#get + * @param {String} id - bucket object id + * @returns {Promise} - the ghost for this object + */ + get( id: string ): Promise; + + /** + * Save a ghost in the store. + * + * @function + * @name GhostStore#put + * @param {String} id - bucket object id + * @param {Number} version - version of ghost data + * @param {Object} data - object literal to save as this ghost's data for this version + * @returns {Promise} - the ghost for this object + */ + + /** + * Delete a Ghost from the store. + * + * @function + * @name GhostStore#remove + * @param {String} id - bucket object id + * @returns {Promise} - the ghost for this object + */ + + /** + * Iterate over existing Ghost objects with the given callback. + * + * @function + * @name GhostStore#eachGhost + * @param {ghostIterator} - function to run against each ghost + */ + + /** + * Get the current change version (cv) that this channel has synced. + * + * @function + * @name GhostStore#getChangeVersion + * @returns {Promise} - the current change version for the bucket + */ + + /** + * Set the current change version. + * + * @function + * @name GhostStore#setChangeVersion + * @returns {Promise} - resolves once the change version is saved + */ +} + export default class LocalQueue extends EventEmitter { - constructor( store ) { + store: GhostStore; + sent: { [objectId: string]: BucketOperation }; + queues: { [objectId: string]: BucketOperation[] }; + ready: boolean + + /* + * @param {GhostStore} store - the ghost store for retrieving ghost data + */ + constructor( store: GhostStore ) { super(); this.store = store; this.sent = {}; @@ -13,19 +106,33 @@ export default class LocalQueue extends EventEmitter { this.ready = false; } + /* + * Start processing any local changes + */ start() { - var queueId; this.ready = true; - for ( queueId in this.queues ) { + for ( const queueId in this.queues ) { this.processQueue( queueId ); } } + /** + * Pause execution of local changes. No local changes will be sent to + * simperium until .start is called. + */ pause() { this.ready = false; }; - acknowledge( change ) { + /** + * When a change is acknowledged and it matches the sent change for the + * given bucket operation clear it from the sent queue. + * + * Any pending changes for the bucket object will then be sent. + * + * @param {BucketOperation} change - the operation that is being acknowledged + */ + acknowledge( change: BucketOperation ) { if ( this.sent[change.id] === change ) { delete this.sent[change.id]; } @@ -33,8 +140,15 @@ export default class LocalQueue extends EventEmitter { this.processQueue( change.id ); } - queue( change ) { - var queue = this.queues[change.id]; + /** + * Queues a on operation that will modify a bucket object on simperium.com. If the + * local queue has been started the queue for the corresponding bucket object will + * be processed and the next change will be sent. + * + * @param {BucketOperation} change - the bucket operation to send to simperium + */ + queue( change: BucketOperation ) { + let queue = this.queues[change.id]; if ( !queue ) { queue = []; @@ -50,11 +164,17 @@ export default class LocalQueue extends EventEmitter { this.processQueue( change.id ); }; + /** + * Reports if there are any changes pending for this channel. + * + * @returns {boolean} true if there are any pending changes + */ hasChanges() { - return Object.keys( this.queues ).length > 0; + return Object.keys( this.queues ).length > 0 || + Object.keys( this.sent ).length > 0; }; - dequeueChangesFor( id ) { + dequeueChangesFor( id: string ) { var changes = [], sent = this.sent[id], queue = this.queues[id]; if ( sent ) { @@ -70,9 +190,8 @@ export default class LocalQueue extends EventEmitter { return changes; }; - processQueue( id ) { - var queue = this.queues[id]; - var compressAndSend = this.compressAndSend.bind( this, id ); + processQueue( id: string ) { + const queue = this.queues[id]; // there is no queue, don't do anything if ( !queue ) return; @@ -89,15 +208,15 @@ export default class LocalQueue extends EventEmitter { return; } - this.store.get( id ).then( compressAndSend ); + this.store.get( id ).then( ghost => { + this.compressAndSend( id, ghost ); + } ); } - compressAndSend( id, ghost ) { - var changes = this.queues[id]; - var change; - var target = ghost.data; - var c; - var type; + compressAndSend( id: string, ghost: Ghost ) { + const changes = this.queues[id]; + // the starting point of any changes will be the ghost's current data + let modifiedObject = ghost.data; // a change was sent before we could compress and send if ( this.sent[id] ) { @@ -105,33 +224,40 @@ export default class LocalQueue extends EventEmitter { return; } + // there is a single change, remove it from the bucket + // objects pending queue and send it if ( changes.length === 1 ) { - change = changes.shift(); + const change = changes.shift(); this.sent[id] = change; this.emit( 'send', change ); return; } + // there is more than one change but if the firest change is a delete type + // then the following local changes can be discarded if ( changes.length > 1 && changes[0].type === change_util.type.REMOVE ) { - change = changes.shift(); + const change = changes.shift(); changes.splice( 0, changes.length - 1 ); this.sent[id] = change; this.emit( 'send', change ); + return; } while ( changes.length > 0 ) { - c = changes.shift(); + const change = changes.shift(); - if ( c.o === change_util.type.REMOVE ) { - changes.unshift( c ); + if ( change.o === '-' ) { + changes.unshift( change ); break; } - target = change_util.apply( c.v, target ); + if ( change.o === 'M' ) { + modifiedObject = change_util.apply( change.v, modifiedObject ); + } } - type = target === null ? change_util.type.REMOVE : change_util.type.MODIFY; - change = change_util.buildChange( type, id, target, ghost ); + const type = modifiedObject === null ? change_util.type.REMOVE : change_util.type.MODIFY; + const change = change_util.buildChange( type, id, modifiedObject, ghost ); this.sent[id] = change; this.emit( 'send', change ); diff --git a/src/simperium/queues/network-queue.js b/src/simperium/queues/network-queue.js index 11d1fb4..09cbb12 100644 --- a/src/simperium/queues/network-queue.js +++ b/src/simperium/queues/network-queue.js @@ -1,21 +1,31 @@ // @flow import Queue from './queue'; +/** + * Stores a mapping of Queue objects to bucket object ids + */ export default class NetworkQueue { + queues: { [bucketObjectID: string]: ?Queue }; + constructor() { this.queues = {}; } - queueFor( id ) { - const queues: { [string]: ?Queue } = this.queues; - let queue: ?Queue = queues[id]; + /** + * Retrieve the queue for the given bucket object id + * + * @param {string} id - the bucket object id to retrieve the queue for + * @return {Queue} the queue for the giver bucket object, creates a new queue if none exists + */ + queueFor( id: string ) { + let queue: ?Queue = this.queues[id]; if ( !queue ) { queue = new Queue(); - queue.on( 'finish', function() { - delete queues[id]; + queue.on( 'finish', () => { + delete this.queues[id]; } ); - queues[id] = queue; + this.queues[id] = queue; } return queue; diff --git a/src/simperium/queues/queue.js b/src/simperium/queues/queue.js index 4484bcc..5dc380a 100644 --- a/src/simperium/queues/queue.js +++ b/src/simperium/queues/queue.js @@ -3,20 +3,31 @@ import events from 'events'; const { EventEmitter } = events; +type Task = ( onComplete: () => void ) => void + export default class Queue extends EventEmitter { + queue: Task[]; + running: boolean constructor() { super(); this.queue = []; this.running = false; } - - // Add a function at the end of the queue - add( fn ) { - this.queue.push( fn ); + /** + * Add a task to the queue. THe queue will start if it has not been started + * @param {Task} task - the task to execute + * @returns {Queue} the queue instance for chaining + */ + add( task: Task ) { + this.queue.push( task ); this.start(); return this; }; + /** + * Begins processing tasks if the queue is not already running + * @emits 'start' + */ start() { if ( this.running ) return; this.running = true; @@ -24,8 +35,12 @@ export default class Queue extends EventEmitter { setImmediate( this.run.bind( this ) ); } + /** + * Runs the next action on the queue + * @emits finish - when all tasks are completed + * @private + */ run() { - var fn; this.running = true; if ( this.queue.length === 0 ) { @@ -34,7 +49,7 @@ export default class Queue extends EventEmitter { return; } - fn = this.queue.shift(); - fn( this.run.bind( this ) ); + const task = this.queue.shift(); + task( () => this.run() ); } } From 6bac261b84b9253ef9f6f4104ceeee31c93a98c7 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Wed, 7 Mar 2018 13:44:15 -0800 Subject: [PATCH 4/4] adds more docblocks --- src/simperium/queues/local-queue.js | 32 +++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/src/simperium/queues/local-queue.js b/src/simperium/queues/local-queue.js index 59eaf2b..b819400 100644 --- a/src/simperium/queues/local-queue.js +++ b/src/simperium/queues/local-queue.js @@ -91,8 +91,8 @@ interface GhostStore { export default class LocalQueue extends EventEmitter { store: GhostStore; - sent: { [objectId: string]: BucketOperation }; - queues: { [objectId: string]: BucketOperation[] }; + sent: { [objectId: string]: ?BucketOperation }; + queues: { [objectId: string]: ?BucketOperation[] }; ready: boolean /* @@ -174,12 +174,19 @@ export default class LocalQueue extends EventEmitter { Object.keys( this.sent ).length > 0; }; + /** + * Removes pending and sent changes from the queue and returns them + * @param {string} id - bucket object id + * @returns {BucketOperation[]} list of changes removed from the queue + */ dequeueChangesFor( id: string ) { - var changes = [], sent = this.sent[id], queue = this.queues[id]; + let changes: BucketOperation[] = []; + const sent = this.sent[id]; + const queue = this.queues[id]; if ( sent ) { delete this.sent[id]; - changes.push( sent ); + changes = changes.concat( sent ); } if ( queue ) { @@ -190,6 +197,11 @@ export default class LocalQueue extends EventEmitter { return changes; }; + /** + * Send the changes queued for the given bucket object + * @emits wait + * @param {string} id - bucket object id + */ processQueue( id: string ) { const queue = this.queues[id]; @@ -213,6 +225,12 @@ export default class LocalQueue extends EventEmitter { } ); } + /** + * Sends queued changes if any to simperium for the given bucket object id + * + * @param {string} id - bucket object id to send changes for + * @param {*} ghost - the current ghost for the given bucket object + */ compressAndSend( id: string, ghost: Ghost ) { const changes = this.queues[id]; // the starting point of any changes will be the ghost's current data @@ -223,6 +241,9 @@ export default class LocalQueue extends EventEmitter { this.emit( 'wait', id ); return; } + if ( !changes ) { + return; + } // there is a single change, remove it from the bucket // objects pending queue and send it @@ -263,6 +284,9 @@ export default class LocalQueue extends EventEmitter { this.emit( 'send', change ); } + /** + * Retries sending any previously sent changes that have not been acknowledged by simperium + */ resendSentChanges() { for ( let ccid in this.sent ) { this.emit( 'send', this.sent[ccid] )