From 40bdbd43b65455b257e050fe9039b4dbc5be4fe3 Mon Sep 17 00:00:00 2001 From: "Afshin T. Darian" Date: Tue, 29 Apr 2025 14:02:36 +0100 Subject: [PATCH 1/7] Add socket stream --- packages/polling/src/socketstream.ts | 115 +++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 packages/polling/src/socketstream.ts diff --git a/packages/polling/src/socketstream.ts b/packages/polling/src/socketstream.ts new file mode 100644 index 000000000..c5a7b5bc0 --- /dev/null +++ b/packages/polling/src/socketstream.ts @@ -0,0 +1,115 @@ +// Copyright (c) Jupyter Development Team. +// Distributed under the terms of the Modified BSD License. + +import { IDisposable } from '@lumino/disposable'; + +import { Signal, Stream } from '@lumino/signaling'; + +import { Poll } from './poll'; + +/** + * A utility class to wrap and augment a web socket. A socket stream emits web + * socket messages as an async iterable and also as a Lumino signal. It uses + * an internal poll instance to manage reconnection logic automatically. + * + * @typeparam T - The type of the stream owner (i.e., the `sender` of a signal). + * + * @typeparam U - The type of the socket stream's emissions. + */ +export class SocketStream extends Stream implements IDisposable { + /** + * Construct a new socket stream. + * + * @param sender - The sender which owns the stream. + * + * @param options = The socket stream instantiation options. + */ + constructor( + sender: T, + protected readonly options: SocketStream.IOptions + ) { + super(sender); + this.subscription = new Poll({ factory: () => this.subscribe() }); + } + + /** + * Whether the stream is disposed. + */ + get isDisposed() { + return this.subscription.isDisposed; + } + + /** + * Dispose the stream. + */ + dispose() { + super.stop(); + this.subscription.dispose(); + const { socket } = this; + if (socket) { + this.socket = null; + socket.onclose = () => undefined; + socket.onerror = () => undefined; + socket.onmessage = () => undefined; + socket.onopen = () => undefined; + socket.close(); + } + Signal.clearData(this); + } + + /** + * Send a message to the underlying web socket. + * + * @param data - The payload of the message sent via the web socket. + */ + send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void { + this.socket?.send(data); + } + + /** + * The current active socket. This value is updated by the `subscribe` method. + */ + protected socket: WebSocket | null = null; + + /** + * The poll instance that mediates the web socket lifecycle. + */ + protected readonly subscription: Poll; + + /** + * Open a web socket and subscribe to its updates. + * + * @returns A promise that rejects when the socket connection is closed. + */ + protected async subscribe(): Promise { + if (this.isDisposed) { + return; + } + return new Promise((_, reject) => { + const Socket = this.options.WebSocket || WebSocket; + const socket = (this.socket = new Socket(this.options.url)); + socket.onclose = () => reject(new Error('socket stream: socket closed')); + socket.onmessage = msg => msg.data && this.emit(JSON.parse(msg.data)); + }); + } +} + +/** + * A namespace for `SocketStream` statics. + */ +export namespace SocketStream { + /** + * Instantiation options for a socket stream. + */ + export interface IOptions { + /** + * The web socket URL to open. + */ + url: string; + + /** + * An optional web socket constructor. + */ + WebSocket?: typeof WebSocket; + } +} From 7bc50928c0efa9d5f6aa20a13edabb44ec847886 Mon Sep 17 00:00:00 2001 From: "Afshin T. Darian" Date: Wed, 30 Apr 2025 19:27:12 +0100 Subject: [PATCH 2/7] Add SocketStream#factory --- packages/polling/src/socketstream.ts | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/packages/polling/src/socketstream.ts b/packages/polling/src/socketstream.ts index c5a7b5bc0..ffa0c18bd 100644 --- a/packages/polling/src/socketstream.ts +++ b/packages/polling/src/socketstream.ts @@ -24,11 +24,9 @@ export class SocketStream extends Stream implements IDisposable { * * @param options = The socket stream instantiation options. */ - constructor( - sender: T, - protected readonly options: SocketStream.IOptions - ) { + constructor(sender: T, options: SocketStream.IOptions) { super(sender); + this.factory = () => new (options.WebSocket || WebSocket)(options.url); this.subscription = new Poll({ factory: () => this.subscribe() }); } @@ -66,6 +64,11 @@ export class SocketStream extends Stream implements IDisposable { this.socket?.send(data); } + /** + * A factory that generates a new web socket instance for subscription. + */ + protected readonly factory: () => WebSocket; + /** * The current active socket. This value is updated by the `subscribe` method. */ @@ -86,10 +89,9 @@ export class SocketStream extends Stream implements IDisposable { return; } return new Promise((_, reject) => { - const Socket = this.options.WebSocket || WebSocket; - const socket = (this.socket = new Socket(this.options.url)); - socket.onclose = () => reject(new Error('socket stream: socket closed')); - socket.onmessage = msg => msg.data && this.emit(JSON.parse(msg.data)); + this.socket = this.factory(); + this.socket.onclose = () => reject(new Error('socket stream has closed')); + this.socket.onmessage = ({ data }) => data && this.emit(JSON.parse(data)); }); } } From 502a69b9d8a886d2254f45e200f5b7ccae660d46 Mon Sep 17 00:00:00 2001 From: "Afshin T. Darian" Date: Wed, 30 Apr 2025 20:27:25 +0100 Subject: [PATCH 3/7] Abstract the underlying poll --- packages/polling/src/socketstream.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/polling/src/socketstream.ts b/packages/polling/src/socketstream.ts index ffa0c18bd..1a1ac90df 100644 --- a/packages/polling/src/socketstream.ts +++ b/packages/polling/src/socketstream.ts @@ -18,11 +18,11 @@ import { Poll } from './poll'; */ export class SocketStream extends Stream implements IDisposable { /** - * Construct a new socket stream. + * Construct a new web socket stream. * * @param sender - The sender which owns the stream. * - * @param options = The socket stream instantiation options. + * @param options - Web socket `url` and optional `WebSocket` constructor. */ constructor(sender: T, options: SocketStream.IOptions) { super(sender); @@ -41,9 +41,8 @@ export class SocketStream extends Stream implements IDisposable { * Dispose the stream. */ dispose() { - super.stop(); - this.subscription.dispose(); - const { socket } = this; + const { socket, subscription } = this; + subscription.dispose(); if (socket) { this.socket = null; socket.onclose = () => undefined; @@ -53,6 +52,7 @@ export class SocketStream extends Stream implements IDisposable { socket.close(); } Signal.clearData(this); + super.stop(); } /** @@ -75,9 +75,9 @@ export class SocketStream extends Stream implements IDisposable { protected socket: WebSocket | null = null; /** - * The poll instance that mediates the web socket lifecycle. + * A handle to the socket subscription to dispose when necessary. */ - protected readonly subscription: Poll; + protected readonly subscription: IDisposable; /** * Open a web socket and subscribe to its updates. @@ -88,7 +88,7 @@ export class SocketStream extends Stream implements IDisposable { if (this.isDisposed) { return; } - return new Promise((_, reject) => { + return new Promise((_, reject) => { this.socket = this.factory(); this.socket.onclose = () => reject(new Error('socket stream has closed')); this.socket.onmessage = ({ data }) => data && this.emit(JSON.parse(data)); From ca3aef17efe52ab492e50c2730394ef1d0e61a6b Mon Sep 17 00:00:00 2001 From: "Afshin T. Darian" Date: Tue, 6 May 2025 22:47:25 +0100 Subject: [PATCH 4/7] update API --- packages/polling/src/socketstream.ts | 37 ++++++---------------------- 1 file changed, 7 insertions(+), 30 deletions(-) diff --git a/packages/polling/src/socketstream.ts b/packages/polling/src/socketstream.ts index 1a1ac90df..51be1ea7e 100644 --- a/packages/polling/src/socketstream.ts +++ b/packages/polling/src/socketstream.ts @@ -22,11 +22,13 @@ export class SocketStream extends Stream implements IDisposable { * * @param sender - The sender which owns the stream. * - * @param options - Web socket `url` and optional `WebSocket` constructor. + * @param connector - A factory that returns a new web socket connection. */ - constructor(sender: T, options: SocketStream.IOptions) { + constructor( + sender: T, + protected readonly connector: () => WebSocket + ) { super(sender); - this.factory = () => new (options.WebSocket || WebSocket)(options.url); this.subscription = new Poll({ factory: () => this.subscribe() }); } @@ -56,7 +58,7 @@ export class SocketStream extends Stream implements IDisposable { } /** - * Send a message to the underlying web socket. + * Send a message via the underlying web socket. * * @param data - The payload of the message sent via the web socket. */ @@ -64,11 +66,6 @@ export class SocketStream extends Stream implements IDisposable { this.socket?.send(data); } - /** - * A factory that generates a new web socket instance for subscription. - */ - protected readonly factory: () => WebSocket; - /** * The current active socket. This value is updated by the `subscribe` method. */ @@ -89,29 +86,9 @@ export class SocketStream extends Stream implements IDisposable { return; } return new Promise((_, reject) => { - this.socket = this.factory(); + this.socket = this.connector(); this.socket.onclose = () => reject(new Error('socket stream has closed')); this.socket.onmessage = ({ data }) => data && this.emit(JSON.parse(data)); }); } } - -/** - * A namespace for `SocketStream` statics. - */ -export namespace SocketStream { - /** - * Instantiation options for a socket stream. - */ - export interface IOptions { - /** - * The web socket URL to open. - */ - url: string; - - /** - * An optional web socket constructor. - */ - WebSocket?: typeof WebSocket; - } -} From 85cd242e9c6fcdfbf62315267988cb9c53bbc8ba Mon Sep 17 00:00:00 2001 From: "Afshin T. Darian" Date: Wed, 7 May 2025 17:25:16 +0100 Subject: [PATCH 5/7] Add rudimentary test --- packages/polling/package.json | 3 ++- packages/polling/src/index.ts | 2 ++ .../polling/tests/src/socketstream.spec.ts | 24 +++++++++++++++++++ yarn.lock | 16 +++++++++++++ 4 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 packages/polling/tests/src/socketstream.spec.ts diff --git a/packages/polling/package.json b/packages/polling/package.json index ec1ba6b4b..ea36f3027 100644 --- a/packages/polling/package.json +++ b/packages/polling/package.json @@ -61,7 +61,8 @@ "rollup-plugin-sourcemaps": "^0.6.3", "terser": "^5.18.1", "tslib": "^2.5.3", - "typescript": "~5.1.3" + "typescript": "~5.1.3", + "ws": "^8.18.2" }, "publishConfig": { "access": "public" diff --git a/packages/polling/src/index.ts b/packages/polling/src/index.ts index 4d66fae59..6f95ecea9 100644 --- a/packages/polling/src/index.ts +++ b/packages/polling/src/index.ts @@ -13,6 +13,8 @@ export { Poll } from './poll'; export { Debouncer, RateLimiter, Throttler } from './ratelimiter'; +export { SocketStream } from './socketstream'; + /** * A readonly poll that calls an asynchronous function with each tick. * diff --git a/packages/polling/tests/src/socketstream.spec.ts b/packages/polling/tests/src/socketstream.spec.ts new file mode 100644 index 000000000..9577c5720 --- /dev/null +++ b/packages/polling/tests/src/socketstream.spec.ts @@ -0,0 +1,24 @@ +// Copyright (c) Jupyter Development Team. +// Distributed under the terms of the Modified BSD License. + +import { expect } from 'chai'; + +import WebSocket from 'ws'; + +import { SocketStream } from '@lumino/polling'; + +describe('SocketStream', () => { + let stream: SocketStream; + + afterEach(() => { + stream.dispose(); + }); + + describe('#constructor()', () => { + it('should create a socket stream', () => { + const url = 'https://www.example.com/'; + stream = new SocketStream(null, () => new WebSocket(url) as any); + expect(stream).to.be.an.instanceof(SocketStream); + }); + }); +}); diff --git a/yarn.lock b/yarn.lock index d4efbd479..1cb25e34b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -718,6 +718,7 @@ __metadata: terser: ^5.18.1 tslib: ^2.5.3 typescript: ~5.1.3 + ws: ^8.18.2 languageName: unknown linkType: soft @@ -12600,6 +12601,21 @@ __metadata: languageName: node linkType: hard +"ws@npm:^8.18.2": + version: 8.18.2 + resolution: "ws@npm:8.18.2" + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: ">=5.0.2" + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + checksum: e38beae19ba4d68577ec24eb34fbfab376333fedd10f99b07511a8e842e22dbc102de39adac333a18e4c58868d0703cd5f239b04b345e22402d0ed8c34ea0aa0 + languageName: node + linkType: hard + "xtend@npm:~4.0.1": version: 4.0.2 resolution: "xtend@npm:4.0.2" From 2c81a5e708847def3a853bec95d3869f670cc3c6 Mon Sep 17 00:00:00 2001 From: "Afshin T. Darian" Date: Thu, 8 May 2025 18:11:37 +0100 Subject: [PATCH 6/7] Update tests --- packages/polling/package.json | 4 +- packages/polling/src/socketstream.ts | 6 +-- packages/polling/tests/src/index.spec.ts | 1 + packages/polling/tests/src/poll.spec.ts | 4 +- .../polling/tests/src/socketstream.spec.ts | 42 +++++++++++++++++-- yarn.lock | 24 ++++------- 6 files changed, 54 insertions(+), 27 deletions(-) diff --git a/packages/polling/package.json b/packages/polling/package.json index ea36f3027..4b0466c9f 100644 --- a/packages/polling/package.json +++ b/packages/polling/package.json @@ -54,6 +54,7 @@ "@web/test-runner-playwright": "^0.11.0", "chai": "^4.3.4", "mocha": "^9.0.3", + "mock-socket": "^9.3.1", "postcss": "^8.4.24", "rimraf": "^5.0.1", "rollup": "^3.25.1", @@ -61,8 +62,7 @@ "rollup-plugin-sourcemaps": "^0.6.3", "terser": "^5.18.1", "tslib": "^2.5.3", - "typescript": "~5.1.3", - "ws": "^8.18.2" + "typescript": "~5.1.3" }, "publishConfig": { "access": "public" diff --git a/packages/polling/src/socketstream.ts b/packages/polling/src/socketstream.ts index 51be1ea7e..9c8fa4f7b 100644 --- a/packages/polling/src/socketstream.ts +++ b/packages/polling/src/socketstream.ts @@ -5,7 +5,7 @@ import { IDisposable } from '@lumino/disposable'; import { Signal, Stream } from '@lumino/signaling'; -import { Poll } from './poll'; +import { Poll } from '.'; /** * A utility class to wrap and augment a web socket. A socket stream emits web @@ -72,9 +72,9 @@ export class SocketStream extends Stream implements IDisposable { protected socket: WebSocket | null = null; /** - * A handle to the socket subscription to dispose when necessary. + * A handle to the socket subscription. */ - protected readonly subscription: IDisposable; + protected readonly subscription: Poll; /** * Open a web socket and subscribe to its updates. diff --git a/packages/polling/tests/src/index.spec.ts b/packages/polling/tests/src/index.spec.ts index 67c8cf46e..db3f4d7b2 100644 --- a/packages/polling/tests/src/index.spec.ts +++ b/packages/polling/tests/src/index.spec.ts @@ -3,3 +3,4 @@ import './poll.spec'; import './ratelimiter.spec'; +import './socketstream.spec'; diff --git a/packages/polling/tests/src/poll.spec.ts b/packages/polling/tests/src/poll.spec.ts index 3da5d8515..2f514f271 100644 --- a/packages/polling/tests/src/poll.spec.ts +++ b/packages/polling/tests/src/poll.spec.ts @@ -334,11 +334,11 @@ describe('Poll', () => { const tock = (poll: Poll): void => { tocker.push(poll.state.phase); expect(ticker.join(' ')).to.equal(tocker.join(' ')); - poll.tick.then(tock); + poll.tick.then(tock).catch(_ => undefined); }; // Kick off the promise listener, but void its settlement to verify that // the poll's internal sync of the promise and the signal is correct. - poll.tick.then(tock); + void poll.tick.then(tock); await poll.stop(); await poll.start(); await poll.tick; diff --git a/packages/polling/tests/src/socketstream.spec.ts b/packages/polling/tests/src/socketstream.spec.ts index 9577c5720..f7fc6c487 100644 --- a/packages/polling/tests/src/socketstream.spec.ts +++ b/packages/polling/tests/src/socketstream.spec.ts @@ -3,12 +3,35 @@ import { expect } from 'chai'; -import WebSocket from 'ws'; +import { WebSocket } from 'mock-socket'; -import { SocketStream } from '@lumino/polling'; +import { IPoll, SocketStream } from '@lumino/polling'; + +window.WebSocket = WebSocket; + +/** + * Return a promise that resolves in the given milliseconds with the given value. + */ +function sleep(milliseconds: number = 0, value?: T): Promise { + return new Promise((resolve, reject) => { + setTimeout(() => { + resolve(value); + }, milliseconds); + }); +} + +class TestSocketStream extends SocketStream { + constructor(sender: T, connector: () => WebSocket) { + super(sender, connector); + this.subscription.ticked.connect((_, state) => { + this.phases.push(state.phase); + }); + } + phases: IPoll.Phase<'standby'>[] = []; +} describe('SocketStream', () => { - let stream: SocketStream; + let stream: TestSocketStream; afterEach(() => { stream.dispose(); @@ -17,8 +40,19 @@ describe('SocketStream', () => { describe('#constructor()', () => { it('should create a socket stream', () => { const url = 'https://www.example.com/'; - stream = new SocketStream(null, () => new WebSocket(url) as any); + stream = new TestSocketStream(null, () => new WebSocket(url)); expect(stream).to.be.an.instanceof(SocketStream); }); }); + + describe('#dispose()', () => { + it('should clean up after itself upon dispose', async () => { + const url = 'https://www.example.com/'; + stream = new TestSocketStream(null, () => new WebSocket(url)); + await sleep(500); + stream.dispose(); + expect(stream.phases[0]).to.equal('started'); + stream.phases.slice(1).every(phase => expect(phase).to.equal('rejected')); + }); + }); }); diff --git a/yarn.lock b/yarn.lock index 1cb25e34b..2ac79010a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -710,6 +710,7 @@ __metadata: "@web/test-runner-playwright": ^0.11.0 chai: ^4.3.4 mocha: ^9.0.3 + mock-socket: ^9.3.1 postcss: ^8.4.24 rimraf: ^5.0.1 rollup: ^3.25.1 @@ -718,7 +719,6 @@ __metadata: terser: ^5.18.1 tslib: ^2.5.3 typescript: ~5.1.3 - ws: ^8.18.2 languageName: unknown linkType: soft @@ -8321,6 +8321,13 @@ __metadata: languageName: node linkType: hard +"mock-socket@npm:^9.3.1": + version: 9.3.1 + resolution: "mock-socket@npm:9.3.1" + checksum: cb2dde4fc5dde280dd5ccb78eaaa223382ee16437f46b86558017655584ad08c22e733bde2dd5cc86927def506b6caeb0147e3167b9a62d70d5cf19d44103853 + languageName: node + linkType: hard + "modify-values@npm:^1.0.1": version: 1.0.1 resolution: "modify-values@npm:1.0.1" @@ -12601,21 +12608,6 @@ __metadata: languageName: node linkType: hard -"ws@npm:^8.18.2": - version: 8.18.2 - resolution: "ws@npm:8.18.2" - peerDependencies: - bufferutil: ^4.0.1 - utf-8-validate: ">=5.0.2" - peerDependenciesMeta: - bufferutil: - optional: true - utf-8-validate: - optional: true - checksum: e38beae19ba4d68577ec24eb34fbfab376333fedd10f99b07511a8e842e22dbc102de39adac333a18e4c58868d0703cd5f239b04b345e22402d0ed8c34ea0aa0 - languageName: node - linkType: hard - "xtend@npm:~4.0.1": version: 4.0.2 resolution: "xtend@npm:4.0.2" From 505a0dcc051b12f10ca87f772fed352d9e06e4ca Mon Sep 17 00:00:00 2001 From: "Afshin T. Darian" Date: Wed, 14 May 2025 13:38:49 +0100 Subject: [PATCH 7/7] Update API, add more tests --- packages/polling/src/socketstream.ts | 34 ++++++------ .../polling/tests/src/socketstream.spec.ts | 54 ++++++++++++------- 2 files changed, 54 insertions(+), 34 deletions(-) diff --git a/packages/polling/src/socketstream.ts b/packages/polling/src/socketstream.ts index 9c8fa4f7b..5361849ce 100644 --- a/packages/polling/src/socketstream.ts +++ b/packages/polling/src/socketstream.ts @@ -29,30 +29,29 @@ export class SocketStream extends Stream implements IDisposable { protected readonly connector: () => WebSocket ) { super(sender); - this.subscription = new Poll({ factory: () => this.subscribe() }); } /** * Whether the stream is disposed. */ get isDisposed() { - return this.subscription.isDisposed; + return this.connection.isDisposed; } /** * Dispose the stream. */ dispose() { - const { socket, subscription } = this; - subscription.dispose(); + const { connection, socket } = this; + connection.dispose(); if (socket) { - this.socket = null; - socket.onclose = () => undefined; - socket.onerror = () => undefined; - socket.onmessage = () => undefined; - socket.onopen = () => undefined; + socket.onclose = null; + socket.onerror = null; + socket.onmessage = null; + socket.onopen = null; socket.close(); } + this.socket = null; Signal.clearData(this); super.stop(); } @@ -63,25 +62,28 @@ export class SocketStream extends Stream implements IDisposable { * @param data - The payload of the message sent via the web socket. */ send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void { - this.socket?.send(data); + if (this.isDisposed) { + return; + } + this.socket!.send(data); } /** - * The current active socket. This value is updated by the `subscribe` method. + * A handle to the socket connection poll. */ - protected socket: WebSocket | null = null; + protected readonly connection = new Poll({ factory: () => this.reconnect() }); /** - * A handle to the socket subscription. + * The current active socket. This value is updated by the `reconnect` method. */ - protected readonly subscription: Poll; + protected socket: WebSocket | null = null; /** - * Open a web socket and subscribe to its updates. + * (Re)open a web socket connection and subscribe to its updates. * * @returns A promise that rejects when the socket connection is closed. */ - protected async subscribe(): Promise { + protected async reconnect(): Promise { if (this.isDisposed) { return; } diff --git a/packages/polling/tests/src/socketstream.spec.ts b/packages/polling/tests/src/socketstream.spec.ts index f7fc6c487..cd4bd3693 100644 --- a/packages/polling/tests/src/socketstream.spec.ts +++ b/packages/polling/tests/src/socketstream.spec.ts @@ -3,43 +3,52 @@ import { expect } from 'chai'; -import { WebSocket } from 'mock-socket'; +import { Server, WebSocket } from 'mock-socket'; import { IPoll, SocketStream } from '@lumino/polling'; -window.WebSocket = WebSocket; - /** - * Return a promise that resolves in the given milliseconds with the given value. + * Returns a promise that resolves to `value` after `milliseconds` elapse. */ -function sleep(milliseconds: number = 0, value?: T): Promise { - return new Promise((resolve, reject) => { - setTimeout(() => { - resolve(value); - }, milliseconds); - }); -} +const sleep = (milliseconds: number = 0, value?: unknown): Promise => + new Promise(resolve => void setTimeout(() => resolve(value), milliseconds)); class TestSocketStream extends SocketStream { constructor(sender: T, connector: () => WebSocket) { super(sender, connector); - this.subscription.ticked.connect((_, state) => { + this.connection.ticked.connect((_, state) => { this.phases.push(state.phase); }); + void this.collect(); } - phases: IPoll.Phase<'standby'>[] = []; + + async collect() { + for await (const message of this) { + this.messages.push(message); + } + } + + messages: U[] = []; + phases: IPoll.Phase[] = []; } describe('SocketStream', () => { + const url = 'ws://localhost:8888'; + let server: Server; let stream: TestSocketStream; + before(async () => { + server = new Server(url); + }); + afterEach(() => { stream.dispose(); }); + after(async () => new Promise(resolve => server.stop(() => resolve()))); + describe('#constructor()', () => { it('should create a socket stream', () => { - const url = 'https://www.example.com/'; stream = new TestSocketStream(null, () => new WebSocket(url)); expect(stream).to.be.an.instanceof(SocketStream); }); @@ -47,12 +56,21 @@ describe('SocketStream', () => { describe('#dispose()', () => { it('should clean up after itself upon dispose', async () => { - const url = 'https://www.example.com/'; stream = new TestSocketStream(null, () => new WebSocket(url)); - await sleep(500); stream.dispose(); - expect(stream.phases[0]).to.equal('started'); - stream.phases.slice(1).every(phase => expect(phase).to.equal('rejected')); + expect(stream.isDisposed).to.equal(true); + }); + }); + + describe('[Symbol.asyncIterator]', () => { + it('should receive socket messages', async () => { + stream = new TestSocketStream(null, () => new WebSocket(url)); + server.on('connection', socket => { + socket.send('{ "alpha": 1 }'); + socket.send('{ "bravo": 2 }'); + }); + await sleep(250); + expect(stream.messages).to.eql([{ alpha: 1 }, { bravo: 2 }]); }); }); });