diff --git a/README.md b/README.md index 2038dcc..5192828 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # hypercore-streams -External implementation of a WriteStream and ReadStream for Hypercore +Implementation of WriteStream and ReadStream for [hypercore](https://github.com/hypercore-protocol/hypercore). ``` npm install hypercore-streams @@ -11,12 +11,31 @@ npm install hypercore-streams ``` js const { WriteStream, ReadStream } = require('hypercore-streams') -const ws = new WriteStream(feed) -const rs = new ReadStream(feed, { - start: 0, - live: true, - valueEncoding: 'json' -}) +const ws = new WriteStream(feed, [options]) +const rs = new ReadStream(feed, [options]) +``` + +Options for `WriteStream`: + +```js +{ + maxBlockSize: Infinity // set this to auto chunk individual blocks if they are larger than this number +} +``` + +Options for `ReadStream`: + +```js +{ + start: 0, // read from this index + end: feed.length, // read until this index + snapshot: true, // if set to false it will update `end` to `feed.length` on every read + tail: false, // sets `start` to `feed.length` + live: false, // set to true to keep reading forever + timeout: 0, // timeout for each data event (0 means no timeout) + wait: true, // wait for data to be downloaded + batch: 1 // amount of messages to read in batch, increasing it (e.g. 100) can improve the performance reading +} ``` ## License diff --git a/index.js b/index.js index b33e99f..2aceb0c 100644 --- a/index.js +++ b/index.js @@ -42,6 +42,7 @@ class ReadStream extends Readable { this.tail = !!opts.tail this.index = this.start this.options = { wait: opts.wait !== false, ifAvailable: !!opts.ifAvailable, valueEncoding: opts.valueEncoding } + this.batch = opts.batch || 1 } _open (cb) { @@ -63,6 +64,21 @@ class ReadStream extends Readable { this.push(null) return cb(null) } + if (this.batch > 1) { + const batchStart = this.index + const batchEnd = Math.min(batchStart + this.batch, this.end, this.feed.length) + if (batchStart < batchEnd) { + this.index = batchEnd + this.feed.getBatch(batchStart, batchEnd, this.options, (err, blocks) => { + if (err) return cb(err) + for (const block of blocks) { + this.push(block) + } + cb(null) + }) + return + } + } this.feed.get(this.index++, this.options, (err, block) => { if (err) return cb(err) this.push(block) diff --git a/package.json b/package.json index 2ea7c29..fb1d14c 100644 --- a/package.json +++ b/package.json @@ -10,19 +10,20 @@ "hypercore": "^9.2.1", "random-access-memory": "^3.1.1", "standard": "^14.3.4", + "stream-collector": "^1.0.1", "tape": "^5.0.1" }, "scripts": { - "test": "standard && tape test.js" + "test": "standard && tape test/index.js" }, "repository": { "type": "git", - "url": "https://github.com/mafintosh/hypercore-streams.git" + "url": "https://github.com/hypercore-protocol/hypercore-streams.git" }, "author": "Mathias Buus (@mafintosh)", "license": "MIT", "bugs": { - "url": "https://github.com/mafintosh/hypercore-streams/issues" + "url": "https://github.com/hypercore-protocol/hypercore-streams/issues" }, - "homepage": "https://github.com/mafintosh/hypercore-streams" + "homepage": "https://github.com/hypercore-protocol/hypercore-streams" } diff --git a/test.js b/test.js deleted file mode 100644 index c1bf4e0..0000000 --- a/test.js +++ /dev/null @@ -1,109 +0,0 @@ -const tape = require('tape') -const { WriteStream, ReadStream } = require('./') -const ram = require('random-access-memory') -const hypercore = require('hypercore') - -tape('basic readstream', function (t) { - const feed = hypercore(ram) - - feed.append(['a', 'b', 'c'], function () { - const rs = new ReadStream(feed) - const expected = ['a', 'b', 'c'] - - rs.on('data', function (data) { - t.same(data, Buffer.from(expected.shift())) - }) - rs.on('end', function () { - t.end() - }) - }) -}) - -tape('tail reading stream', function (t) { - const feed = hypercore(ram) - t.plan(2) - - feed.append(['a', 'b', 'c'], function () { - const rs = new ReadStream(feed, { tail: true, live: true }) - const expected = ['d', 'e'] - - rs.on('data', function (data) { - t.same(data, Buffer.from(expected.shift())) - }) - - feed.ready(function () { - feed.append(['d', 'e']) - }) - - rs.on('end', function () { - t.fail('should not end') - }) - }) -}) - -tape('live readstream', function (t) { - t.plan(2) - - const feed = hypercore(ram) - - feed.append(['a', 'b', 'c'], function () { - const rs = new ReadStream(feed, { start: 1, live: true }) - const expected = ['b', 'c'] - - rs.on('data', function (data) { - t.same(data, Buffer.from(expected.shift())) - }) - rs.on('end', function () { - t.fail('should not end') - }) - }) -}) - -tape('basic writestream', function (t) { - t.plan(1 + 2 * 3) - - const feed = hypercore(ram) - - const ws = new WriteStream(feed) - - ws.write(Buffer.from('a')) - ws.write(Buffer.from('b')) - ws.write(Buffer.from('c')) - - ws.end() - - ws.on('finish', function () { - t.same(feed.length, 3) - - feed.get(0, function (err, buf) { - t.error(err, 'no error') - t.same(buf, Buffer.from('a')) - }) - - feed.get(1, function (err, buf) { - t.error(err, 'no error') - t.same(buf, Buffer.from('b')) - }) - - feed.get(2, function (err, buf) { - t.error(err, 'no error') - t.same(buf, Buffer.from('c')) - }) - }) -}) - -tape('valueEncoding test', function (t) { - const feed = hypercore(ram, { valueEncoding: 'json' }) - - feed.append(['a', 'b', 'c'], function () { - const rs = new ReadStream(feed, { valueEncoding: 'buffer' }) - const expected = ['a', 'b', 'c'] - - rs.on('data', function (data) { - t.same(data, Buffer.from('"' + expected.shift() + '"\n')) - }) - rs.on('end', function () { - t.end() - }) - }) -}) diff --git a/test/helpers/create-tracking-ram.js b/test/helpers/create-tracking-ram.js new file mode 100644 index 0000000..70fdbe8 --- /dev/null +++ b/test/helpers/create-tracking-ram.js @@ -0,0 +1,27 @@ +const ram = require('random-access-memory') + +module.exports = function () { + const logByFilename = {} + const factory = function (filename) { + const memory = ram() + const log = [] + logByFilename[filename] = log + return { + read: logAndForward('read'), + write: logAndForward('write'), + del: logAndForward('del') + } + + function logAndForward (op) { + return function () { + var statement = {} + statement[op] = [].slice.apply(arguments) + statement[op].pop() + log.push(statement) + return memory[op].apply(memory, arguments) + } + } + } + factory.log = logByFilename + return factory +} diff --git a/test/index.js b/test/index.js new file mode 100644 index 0000000..4fce284 --- /dev/null +++ b/test/index.js @@ -0,0 +1,439 @@ +const tape = require('tape') +const { WriteStream, ReadStream } = require('..') +const ram = require('random-access-memory') +const collect = require('stream-collector') +const hypercore = require('hypercore') +const createTrackingRam = require('./helpers/create-tracking-ram') + +const create = (key, opts) => hypercore(ram, key, opts) + +tape('basic readstream', function (t) { + const feed = create() + + feed.append(['a', 'b', 'c'], function () { + const rs = new ReadStream(feed) + const expected = ['a', 'b', 'c'] + + rs.on('data', function (data) { + t.same(data, Buffer.from(expected.shift())) + }) + rs.on('end', function () { + t.end() + }) + }) +}) + +tape('tail reading stream', function (t) { + const feed = create() + t.plan(2) + + feed.append(['a', 'b', 'c'], function () { + const rs = new ReadStream(feed, { tail: true, live: true }) + const expected = ['d', 'e'] + + rs.on('data', function (data) { + t.same(data, Buffer.from(expected.shift())) + }) + + feed.ready(function () { + feed.append(['d', 'e']) + }) + + rs.on('end', function () { + t.fail('should not end') + }) + }) +}) + +tape('live readstream', function (t) { + t.plan(2) + + const feed = create() + + feed.append(['a', 'b', 'c'], function () { + const rs = new ReadStream(feed, { start: 1, live: true }) + const expected = ['b', 'c'] + + rs.on('data', function (data) { + t.same(data, Buffer.from(expected.shift())) + }) + rs.on('end', function () { + t.fail('should not end') + }) + }) +}) + +tape('basic writestream', function (t) { + t.plan(1 + 2 * 3) + + const feed = create() + + const ws = new WriteStream(feed) + + ws.write(Buffer.from('a')) + ws.write(Buffer.from('b')) + ws.write(Buffer.from('c')) + + ws.end() + + ws.on('finish', function () { + t.same(feed.length, 3) + + feed.get(0, function (err, buf) { + t.error(err, 'no error') + t.same(buf, Buffer.from('a')) + }) + + feed.get(1, function (err, buf) { + t.error(err, 'no error') + t.same(buf, Buffer.from('b')) + }) + + feed.get(2, function (err, buf) { + t.error(err, 'no error') + t.same(buf, Buffer.from('c')) + }) + }) +}) + +tape('valueEncoding test', function (t) { + const feed = create({ valueEncoding: 'json' }) + + feed.append(['a', 'b', 'c'], function () { + const rs = new ReadStream(feed, { valueEncoding: 'buffer' }) + const expected = ['a', 'b', 'c'] + + rs.on('data', function (data) { + t.same(data, Buffer.from('"' + expected.shift() + '"\n')) + }) + rs.on('end', function () { + t.end() + }) + }) +}) + +tape('append and WriteStreams preserve seq', function (t) { + const feed = create() + + const ws = new WriteStream(feed) + + ws.write('a') + ws.write('b') + ws.write('c') + ws.end(function () { + t.same(feed.length, 3) + feed.append('d', function (err, seq) { + t.error(err) + t.same(seq, 3) + t.same(feed.length, 4) + + const ws1 = new WriteStream(feed) + + ws1.write('e') + ws1.write('f') + ws1.end(function () { + feed.append('g', function (err, seq) { + t.error(err) + t.same(seq, 6) + t.same(feed.length, 7) + t.end() + }) + }) + }) + }) +}) + +tape('close calls pending callbacks', function (t) { + t.plan(5) + + const feed = create() + + ;(new ReadStream(feed, { live: true })) + .once('error', function (err) { + t.ok(err, 'read stream errors') + }) + .resume() + + feed.get(0, function (err) { + t.ok(err, 'get errors') + }) + + feed.once('close', function () { + t.pass('close emitted') + }) + + feed.ready(function () { + feed.close(function () { + (new ReadStream(feed, { live: true })) + .once('error', function (err) { + t.ok(err, 'read stream still errors') + }) + .resume() + + feed.get(0, function (err) { + t.ok(err, 'get still errors') + }) + }) + }) +}) + +tape('closing all streams on close', function (t) { + var memories = {} + var feed = hypercore(function (filename) { + var memory = memories[filename] + if (!memory) { + memory = ram() + memories[filename] = memory + } + return memory + }) + var expectedFiles = ['key', 'secret_key', 'tree', 'data', 'bitfield', 'signatures'] + feed.ready(function () { + t.deepEquals(Object.keys(memories), expectedFiles, 'all files are open') + feed.close(function () { + expectedFiles.forEach(function (filename) { + var memory = memories[filename] + t.ok(memory.closed, filename + ' is closed') + }) + t.end() + }) + }) +}) + +tape('writes are batched', function (t) { + var trackingRam = createTrackingRam() + var feed = hypercore(trackingRam) + var ws = new WriteStream(feed) + + ws.write('ab') + ws.write('cd') + setImmediate(function () { + ws.write('ef') + ws.write('gh') + ws.end(function () { + t.deepEquals(trackingRam.log.data, [ + { write: [0, Buffer.from('abcd')] }, + { write: [4, Buffer.from('efgh')] } + ]) + feed.close(function () { + t.end() + }) + }) + }) +}) + +tape('reads are batched', function (t) { + const feed = hypercore(ram) + const _getBatch = feed.getBatch + const batchCalls = [] + feed.getBatch = function (start, end, opts) { + batchCalls.push({ start, end, opts }) + return _getBatch.apply(feed, arguments) + } + + feed.append(['a', 'b', 'c'], function () { + const rs = new ReadStream(feed, { batch: 2 }) + const expected = ['a', 'b', 'c'] + + rs.on('data', function (data) { + t.same(data, Buffer.from(expected.shift())) + }) + rs.on('end', function () { + t.deepEquals(batchCalls, [ + { start: 0, end: 2, opts: { wait: true, ifAvailable: false, valueEncoding: undefined } }, + { start: 2, end: 3, opts: { wait: true, ifAvailable: false, valueEncoding: undefined } } + ]) + t.end() + }) + }) +}) + +tape('value encoding read-stream', function (t) { + const feed = hypercore(ram, { valueEncoding: 'json' }) + + feed.append({ hello: 'world' }, function () { + (new ReadStream(feed)) + .on('data', function (data) { + t.same(data, { hello: 'world' }) + }) + .on('end', function () { + (new ReadStream(feed, { valueEncoding: 'utf-8' })) + .on('data', function (data) { + t.same(data, '{"hello":"world"}\n') + t.end() + }) + }) + }) +}) + +tape('value encoding write-stream', function (t) { + const feed = hypercore(ram, { valueEncoding: 'json' }) + + const ws = new WriteStream(feed) + ws.write([1, 2, 3]) + ws.end(function () { + feed.get(0, function (err, val) { + t.error(err, 'no error') + t.same(val, [1, 2, 3]) + t.end() + }) + }) +}) + +function test (batch = 1) { + tape('ReadStream to WriteStream', function (t) { + var feed1 = create() + var feed2 = create() + + feed1.append(['hello', 'world'], function () { + var r = new ReadStream(feed1, { batch }) + var w = new WriteStream(feed2) + + r.pipe(w).on('finish', function () { + collect(new ReadStream(feed2, { batch }), function (err, data) { + t.error(err, 'no error') + t.same(data, [Buffer.from('hello'), Buffer.from('world')]) + t.end() + }) + }) + }) + }) + + tape('ReadStream with start, end', function (t) { + var feed = create({ valueEncoding: 'utf-8' }) + + feed.append(['hello', 'multiple', 'worlds'], function () { + collect(new ReadStream(feed, { start: 1, end: 2, batch }), function (err, data) { + t.error(err, 'no error') + t.same(data, ['multiple']) + t.end() + }) + }) + }) + + tape('ReadStream with start, no end', function (t) { + var feed = create({ valueEncoding: 'utf-8' }) + + feed.append(['hello', 'multiple', 'worlds'], function () { + collect(new ReadStream(feed, { start: 1, batch }), function (err, data) { + t.error(err, 'no error') + t.same(data, ['multiple', 'worlds']) + t.end() + }) + }) + }) + + tape('ReadStream with no start, end', function (t) { + var feed = create({ valueEncoding: 'utf-8' }) + + feed.append(['hello', 'multiple', 'worlds'], function () { + collect(new ReadStream(feed, { end: 2, batch }), function (err, data) { + t.error(err, 'no error') + t.same(data, ['hello', 'multiple']) + t.end() + }) + }) + }) + + tape('ReadStream with live: true', function (t) { + var feed = create({ valueEncoding: 'utf-8' }) + var expected = ['a', 'b', 'c', 'd', 'e'] + + t.plan(expected.length) + + var rs = new ReadStream(feed, { live: true, batch }) + + rs.on('data', function (data) { + t.same(data, expected.shift()) + }) + + rs.on('end', function () { + t.fail('should never end') + }) + + feed.append('a', function () { + feed.append('b', function () { + feed.append(['c', 'd', 'e']) + }) + }) + }) + + tape('ReadStream with live: true after append', function (t) { + var feed = create({ valueEncoding: 'utf-8' }) + var expected = ['a', 'b', 'c', 'd', 'e'] + + t.plan(expected.length) + + feed.append(['a', 'b'], function () { + var rs = new ReadStream(feed, { live: true, batch }) + + rs.on('data', function (data) { + t.same(data, expected.shift()) + }) + + rs.on('end', function () { + t.fail('should never end') + }) + + feed.append(['c', 'd', 'e']) + }) + }) + + tape('ReadStream with live: true and tail: true', function (t) { + var feed = create({ valueEncoding: 'utf-8' }) + var expected = ['c', 'd', 'e'] + + t.plan(expected.length) + + feed.append(['a', 'b'], function () { + var rs = new ReadStream(feed, { live: true, tail: true, batch }) + + rs.on('data', function (data) { + t.same(data, expected.shift()) + }) + + rs.on('end', function () { + t.fail('should never end') + }) + + setImmediate(function () { + feed.append(['c', 'd', 'e']) + }) + }) + }) +} + +tape('WriteStream with maxBlockSize', function (t) { + t.plan(11 * 2 + 1) + + var feed = create() + + var ws = new WriteStream(feed, { maxBlockSize: 100 * 1024 }) + + ws.write(Buffer.alloc(1024 * 1024)) + ws.end(function () { + t.same(feed.length, 11) + + sameSize(0, 100 * 1024) + sameSize(1, 100 * 1024) + sameSize(2, 100 * 1024) + sameSize(3, 100 * 1024) + sameSize(4, 100 * 1024) + sameSize(5, 100 * 1024) + sameSize(6, 100 * 1024) + sameSize(7, 100 * 1024) + sameSize(8, 100 * 1024) + sameSize(9, 100 * 1024) + sameSize(10, 1024 * 1024 - 10 * 100 * 1024) + + function sameSize (idx, size) { + feed.get(idx, function (err, blk) { + t.error(err, 'no error') + t.same(blk.length, size) + }) + } + }) +}) + +test() +test(10)