diff --git a/packages/polling/package.json b/packages/polling/package.json index ec1ba6b4b..4b0466c9f 100644 --- a/packages/polling/package.json +++ b/packages/polling/package.json @@ -54,6 +54,7 @@ "@web/test-runner-playwright": "^0.11.0", "chai": "^4.3.4", "mocha": "^9.0.3", + "mock-socket": "^9.3.1", "postcss": "^8.4.24", "rimraf": "^5.0.1", "rollup": "^3.25.1", diff --git a/packages/polling/src/index.ts b/packages/polling/src/index.ts index 4d66fae59..6f95ecea9 100644 --- a/packages/polling/src/index.ts +++ b/packages/polling/src/index.ts @@ -13,6 +13,8 @@ export { Poll } from './poll'; export { Debouncer, RateLimiter, Throttler } from './ratelimiter'; +export { SocketStream } from './socketstream'; + /** * A readonly poll that calls an asynchronous function with each tick. * diff --git a/packages/polling/src/socketstream.ts b/packages/polling/src/socketstream.ts new file mode 100644 index 000000000..5361849ce --- /dev/null +++ b/packages/polling/src/socketstream.ts @@ -0,0 +1,96 @@ +// Copyright (c) Jupyter Development Team. +// Distributed under the terms of the Modified BSD License. + +import { IDisposable } from '@lumino/disposable'; + +import { Signal, Stream } from '@lumino/signaling'; + +import { Poll } from '.'; + +/** + * A utility class to wrap and augment a web socket. A socket stream emits web + * socket messages as an async iterable and also as a Lumino signal. It uses + * an internal poll instance to manage reconnection logic automatically. + * + * @typeparam T - The type of the stream owner (i.e., the `sender` of a signal). + * + * @typeparam U - The type of the socket stream's emissions. + */ +export class SocketStream extends Stream implements IDisposable { + /** + * Construct a new web socket stream. + * + * @param sender - The sender which owns the stream. + * + * @param connector - A factory that returns a new web socket connection. + */ + constructor( + sender: T, + protected readonly connector: () => WebSocket + ) { + super(sender); + } + + /** + * Whether the stream is disposed. + */ + get isDisposed() { + return this.connection.isDisposed; + } + + /** + * Dispose the stream. + */ + dispose() { + const { connection, socket } = this; + connection.dispose(); + if (socket) { + socket.onclose = null; + socket.onerror = null; + socket.onmessage = null; + socket.onopen = null; + socket.close(); + } + this.socket = null; + Signal.clearData(this); + super.stop(); + } + + /** + * Send a message via the underlying web socket. + * + * @param data - The payload of the message sent via the web socket. + */ + send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void { + if (this.isDisposed) { + return; + } + this.socket!.send(data); + } + + /** + * A handle to the socket connection poll. + */ + protected readonly connection = new Poll({ factory: () => this.reconnect() }); + + /** + * The current active socket. This value is updated by the `reconnect` method. + */ + protected socket: WebSocket | null = null; + + /** + * (Re)open a web socket connection and subscribe to its updates. + * + * @returns A promise that rejects when the socket connection is closed. + */ + protected async reconnect(): Promise { + if (this.isDisposed) { + return; + } + return new Promise((_, reject) => { + this.socket = this.connector(); + this.socket.onclose = () => reject(new Error('socket stream has closed')); + this.socket.onmessage = ({ data }) => data && this.emit(JSON.parse(data)); + }); + } +} diff --git a/packages/polling/tests/src/index.spec.ts b/packages/polling/tests/src/index.spec.ts index 67c8cf46e..db3f4d7b2 100644 --- a/packages/polling/tests/src/index.spec.ts +++ b/packages/polling/tests/src/index.spec.ts @@ -3,3 +3,4 @@ import './poll.spec'; import './ratelimiter.spec'; +import './socketstream.spec'; diff --git a/packages/polling/tests/src/poll.spec.ts b/packages/polling/tests/src/poll.spec.ts index 3da5d8515..2f514f271 100644 --- a/packages/polling/tests/src/poll.spec.ts +++ b/packages/polling/tests/src/poll.spec.ts @@ -334,11 +334,11 @@ describe('Poll', () => { const tock = (poll: Poll): void => { tocker.push(poll.state.phase); expect(ticker.join(' ')).to.equal(tocker.join(' ')); - poll.tick.then(tock); + poll.tick.then(tock).catch(_ => undefined); }; // Kick off the promise listener, but void its settlement to verify that // the poll's internal sync of the promise and the signal is correct. - poll.tick.then(tock); + void poll.tick.then(tock); await poll.stop(); await poll.start(); await poll.tick; diff --git a/packages/polling/tests/src/socketstream.spec.ts b/packages/polling/tests/src/socketstream.spec.ts new file mode 100644 index 000000000..cd4bd3693 --- /dev/null +++ b/packages/polling/tests/src/socketstream.spec.ts @@ -0,0 +1,76 @@ +// Copyright (c) Jupyter Development Team. +// Distributed under the terms of the Modified BSD License. + +import { expect } from 'chai'; + +import { Server, WebSocket } from 'mock-socket'; + +import { IPoll, SocketStream } from '@lumino/polling'; + +/** + * Returns a promise that resolves to `value` after `milliseconds` elapse. + */ +const sleep = (milliseconds: number = 0, value?: unknown): Promise => + new Promise(resolve => void setTimeout(() => resolve(value), milliseconds)); + +class TestSocketStream extends SocketStream { + constructor(sender: T, connector: () => WebSocket) { + super(sender, connector); + this.connection.ticked.connect((_, state) => { + this.phases.push(state.phase); + }); + void this.collect(); + } + + async collect() { + for await (const message of this) { + this.messages.push(message); + } + } + + messages: U[] = []; + phases: IPoll.Phase[] = []; +} + +describe('SocketStream', () => { + const url = 'ws://localhost:8888'; + let server: Server; + let stream: TestSocketStream; + + before(async () => { + server = new Server(url); + }); + + afterEach(() => { + stream.dispose(); + }); + + after(async () => new Promise(resolve => server.stop(() => resolve()))); + + describe('#constructor()', () => { + it('should create a socket stream', () => { + stream = new TestSocketStream(null, () => new WebSocket(url)); + expect(stream).to.be.an.instanceof(SocketStream); + }); + }); + + describe('#dispose()', () => { + it('should clean up after itself upon dispose', async () => { + stream = new TestSocketStream(null, () => new WebSocket(url)); + stream.dispose(); + expect(stream.isDisposed).to.equal(true); + }); + }); + + describe('[Symbol.asyncIterator]', () => { + it('should receive socket messages', async () => { + stream = new TestSocketStream(null, () => new WebSocket(url)); + server.on('connection', socket => { + socket.send('{ "alpha": 1 }'); + socket.send('{ "bravo": 2 }'); + }); + await sleep(250); + expect(stream.messages).to.eql([{ alpha: 1 }, { bravo: 2 }]); + }); + }); +}); diff --git a/yarn.lock b/yarn.lock index d4efbd479..2ac79010a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -710,6 +710,7 @@ __metadata: "@web/test-runner-playwright": ^0.11.0 chai: ^4.3.4 mocha: ^9.0.3 + mock-socket: ^9.3.1 postcss: ^8.4.24 rimraf: ^5.0.1 rollup: ^3.25.1 @@ -8320,6 +8321,13 @@ __metadata: languageName: node linkType: hard +"mock-socket@npm:^9.3.1": + version: 9.3.1 + resolution: "mock-socket@npm:9.3.1" + checksum: cb2dde4fc5dde280dd5ccb78eaaa223382ee16437f46b86558017655584ad08c22e733bde2dd5cc86927def506b6caeb0147e3167b9a62d70d5cf19d44103853 + languageName: node + linkType: hard + "modify-values@npm:^1.0.1": version: 1.0.1 resolution: "modify-values@npm:1.0.1"