diff --git a/package.json b/package.json index 66e91d9..d4537f1 100644 --- a/package.json +++ b/package.json @@ -30,6 +30,7 @@ "lerna": "^2.9.1", "mocha": "^5.0.5", "most": "^1.7.3", + "@most/create": "^2.0.1", "nyc": "^11.6.0", "typescript": "^2.8.1" }, diff --git a/packages/most-buffer/src/buffer-sink.js b/packages/most-buffer/src/buffer-sink.js index 55f8fc0..d9f4509 100644 --- a/packages/most-buffer/src/buffer-sink.js +++ b/packages/most-buffer/src/buffer-sink.js @@ -1,7 +1,8 @@ class BufferSink { - constructor(count, sink) { + constructor(count, sink, flushAfter) { this.active = true; this.sink = sink; + this.flushAfter = flushAfter; this.count = count; this.buffer = count === undefined ? [] : new Array(count); this.length = 0; @@ -15,6 +16,18 @@ class BufferSink { // Buffering the new value this.buffer[this.length++] = value; + if (this.flushAfter) { + // make sure we flush this if some time has passed and count is not reached + this.flushTimeout && clearTimeout(this.flushTimeout); + + setTimeout(() => { + if (this.length > 0) { + this.sink.event(time, value); + this.length = 0; + } + }, this.flushAfter); + } + // If the buffer has a limit and is full, let's emit it if (this.length === this.count) { const value = this.buffer; diff --git a/packages/most-buffer/src/index.js b/packages/most-buffer/src/index.js index f55231c..ad46e02 100644 --- a/packages/most-buffer/src/index.js +++ b/packages/most-buffer/src/index.js @@ -1,8 +1,8 @@ const BufferSink = require('./buffer-sink.js'); -function buffer(count = undefined) { +function buffer(count = undefined, flushAfter) { return (stream) => new stream.constructor({ - run: (sink, scheduler) => stream.source.run(new BufferSink(count, sink), scheduler) + run: (sink, scheduler) => stream.source.run(new BufferSink(count, sink, flushAfter), scheduler) }); } diff --git a/packages/most-buffer/test/test-buffer.js b/packages/most-buffer/test/test-buffer.js index 2594b23..42448f1 100644 --- a/packages/most-buffer/test/test-buffer.js +++ b/packages/most-buffer/test/test-buffer.js @@ -2,6 +2,7 @@ const { expect } = require('chai'); const buffer = require('../src'); const BufferSink = require('../src/buffer-sink'); const most = require('most'); +const { create } = require('@most/create'); describe('buffer', function() { it('can group stream events 10 by 10', function() { @@ -54,4 +55,25 @@ describe('buffer', function() { sink.end(time + 1); sink.event(time + 2); }); + it('should flush after the given flush period even if the buffer is not full', function(done) { + const stream$ = create((add, end) => { + setTimeout(() => add('data'), 10); + setTimeout(() => add('data'), 20); + setTimeout(() => add('data'), 50); + setTimeout(() => { + add('data'); + end(); + }, 100); + }); + + return stream$ + .take(10) + .thru(buffer(5, 700)) + .subscribe({ + next: (data) => { + expect(data.length < 5).to.be.true; + }, + complete: () => done() + }); + }); });