Skip to content

Commit 271fb76

Browse files
committed
events: Add addDisposableListener method to EventEmitter
1 parent f58613a commit 271fb76

File tree

5 files changed

+150
-34
lines changed

5 files changed

+150
-34
lines changed

doc/api/events.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,33 @@ changes:
504504

505505
The `'removeListener'` event is emitted _after_ the `listener` is removed.
506506

507+
### `emitter.addDisposableListener(eventName, listener[, options])`
508+
509+
<!-- YAML
510+
added: REPLACEME
511+
-->
512+
513+
> Stability: 1 - Experimental
514+
515+
* `eventName` {string|symbol} The name of the event.
516+
* `listener` {Function} The callback function
517+
* `options` {Object}
518+
* `once` {boolean} If `true`, the listener will be removed after being called
519+
once.
520+
* Returns: {Object} An object with a dispose method that will remove the listener.
521+
The function will also have a `Symbol.dispose` method so the function can
522+
be used with the `using` keyword.
523+
524+
```mjs
525+
import { EventEmitter } from 'node:events';
526+
const myEmitter = new EventEmitter();
527+
{
528+
using disposer = myEmitter.addDisposableListener('event', console.log);
529+
console.log(myEmitter.listenerCount('event')); // Prints: 1
530+
}
531+
console.log(myEmitter.listenerCount('event')); // Prints: 0
532+
```
533+
507534
### `emitter.addListener(eventName, listener)`
508535

509536
<!-- YAML

lib/events.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,3 +1210,45 @@ function listenersController() {
12101210
},
12111211
};
12121212
}
1213+
1214+
function makeDisposer(self, type, listener) {
1215+
return {
1216+
dispose() {
1217+
if (self === undefined) return;
1218+
self.removeListener(type, listener);
1219+
self = undefined;
1220+
},
1221+
[SymbolDispose]() {
1222+
this.dispose();
1223+
},
1224+
};
1225+
}
1226+
1227+
/**
1228+
* A variation on `addListener` that returns a function that can be called
1229+
* to remove the listener. The function includes a Symbol.dispose property
1230+
* that allows the function to be used with `using` statements.
1231+
* @param {string|symbol} type
1232+
* @param {Function} listener
1233+
* @param {{
1234+
* once?: boolean;
1235+
* }} [options]
1236+
* @returns {{ dispose: Function, [SymbolDispose]: Function }}
1237+
*/
1238+
function addDisposableListener(type, listener, options = kEmptyObject) {
1239+
validateObject(options, 'options');
1240+
const {
1241+
once = false,
1242+
} = options;
1243+
validateBoolean(once, 'options.once');
1244+
if (once) {
1245+
this.once(type, listener);
1246+
} else {
1247+
this.on(type, listener);
1248+
}
1249+
1250+
// We use a function to create the disposer to further limiit what
1251+
// the closure captures.
1252+
return makeDisposer(this, type, listener);
1253+
};
1254+
EventEmitter.prototype.addDisposableListener = addDisposableListener;

lib/internal/streams/end-of-stream.js

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
'use strict';
55

66
const {
7+
DisposableStack,
78
Promise,
89
PromisePrototypeThen,
910
SymbolDispose,
@@ -177,36 +178,38 @@ function eos(stream, options, callback) {
177178
callback.call(stream);
178179
};
179180

181+
const disposableStack = new DisposableStack();
182+
180183
const onrequest = () => {
181-
stream.req.on('finish', onfinish);
184+
disposableStack.use(stream.req.addDisposableListener('finish', onfinish));
182185
};
183186

184187
if (isRequest(stream)) {
185-
stream.on('complete', onfinish);
188+
disposableStack.use(stream.addDisposableListener('complete', onfinish));
186189
if (!willEmitClose) {
187-
stream.on('abort', onclose);
190+
disposableStack.use(stream.addDisposableListener('abort', onclose));
188191
}
189192
if (stream.req) {
190193
onrequest();
191194
} else {
192-
stream.on('request', onrequest);
195+
disposableStack.use(stream.addDisposableListener('request', onrequest));
193196
}
194197
} else if (writable && !wState) { // legacy streams
195-
stream.on('end', onlegacyfinish);
196-
stream.on('close', onlegacyfinish);
198+
disposableStack.use(stream.addDisposableListener('end', onlegacyfinish));
199+
disposableStack.use(stream.addDisposableListener('close', onlegacyfinish));
197200
}
198201

199202
// Not all streams will emit 'close' after 'aborted'.
200203
if (!willEmitClose && typeof stream.aborted === 'boolean') {
201-
stream.on('aborted', onclose);
204+
disposableStack.use(stream.addDisposableListener('aborted', onclose));
202205
}
203206

204-
stream.on('end', onend);
205-
stream.on('finish', onfinish);
207+
disposableStack.use(stream.addDisposableListener('end', onend));
208+
disposableStack.use(stream.addDisposableListener('finish', onfinish));
206209
if (options.error !== false) {
207-
stream.on('error', onerror);
210+
disposableStack.use(stream.addDisposableListener('error', onerror));
208211
}
209-
stream.on('close', onclose);
212+
disposableStack.use(stream.addDisposableListener('close', onclose));
210213

211214
if (closed) {
212215
process.nextTick(onclose);
@@ -233,18 +236,10 @@ function eos(stream, options, callback) {
233236

234237
const cleanup = () => {
235238
callback = nop;
236-
stream.removeListener('aborted', onclose);
237-
stream.removeListener('complete', onfinish);
238-
stream.removeListener('abort', onclose);
239-
stream.removeListener('request', onrequest);
240-
if (stream.req) stream.req.removeListener('finish', onfinish);
241-
stream.removeListener('end', onlegacyfinish);
242-
stream.removeListener('close', onlegacyfinish);
243-
stream.removeListener('finish', onfinish);
244-
stream.removeListener('end', onend);
245-
stream.removeListener('error', onerror);
246-
stream.removeListener('close', onclose);
239+
disposableStack.dispose();
247240
};
241+
// Arrange for the cleanup function to call itself when disposed.
242+
cleanup[SymbolDispose] = cleanup;
248243

249244
if (options.signal && !closed) {
250245
const abort = () => {

lib/internal/test_runner/harness.js

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ const {
77
PromiseWithResolvers,
88
SafeMap,
99
SafePromiseAllReturnVoid,
10+
globalThis: { DisposableStack },
1011
} = primordials;
12+
1113
const { getCallerLocation } = internalBinding('util');
1214
const {
1315
createHook,
@@ -230,6 +232,9 @@ function setupProcessState(root, globalOptions) {
230232
const rejectionHandler =
231233
createProcessEventHandler('unhandledRejection', root);
232234
const coverage = configureCoverage(root, globalOptions);
235+
236+
const disposableStack = new DisposableStack();
237+
233238
const exitHandler = async (kill) => {
234239
if (root.subtests.length === 0 && (root.hooks.before.length > 0 || root.hooks.after.length > 0)) {
235240
// Run global before/after hooks in case there are no tests
@@ -254,27 +259,26 @@ function setupProcessState(root, globalOptions) {
254259
}
255260

256261
hook.disable();
257-
process.removeListener('uncaughtException', exceptionHandler);
258-
process.removeListener('unhandledRejection', rejectionHandler);
259-
process.removeListener('beforeExit', exitHandler);
260-
if (globalOptions.isTestRunner) {
261-
process.removeListener('SIGINT', terminationHandler);
262-
process.removeListener('SIGTERM', terminationHandler);
263-
}
262+
disposableStack.dispose();
264263
};
265264

266265
const terminationHandler = async () => {
267266
await exitHandler(true);
268267
process.exit();
269268
};
270269

271-
process.on('uncaughtException', exceptionHandler);
272-
process.on('unhandledRejection', rejectionHandler);
273-
process.on('beforeExit', exitHandler);
270+
disposableStack.use(
271+
process.addDisposableListener('uncaughtException', exceptionHandler));
272+
disposableStack.use(
273+
process.addDisposableListener('unhandledRejection', rejectionHandler));
274+
disposableStack.use(
275+
process.addDisposableListener('beforeExit', exitHandler));
274276
// TODO(MoLow): Make it configurable to hook when isTestRunner === false.
275277
if (globalOptions.isTestRunner) {
276-
process.on('SIGINT', terminationHandler);
277-
process.on('SIGTERM', terminationHandler);
278+
disposableStack.use(
279+
process.addDisposableListener('SIGINT', terminationHandler));
280+
disposableStack.use(
281+
process.addDisposableListener('SIGTERM', terminationHandler));
278282
}
279283

280284
root.harness.coverage = FunctionPrototypeBind(collectCoverage, null, root, coverage);
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { strictEqual, throws } = require('assert');
5+
const { EventEmitter } = require('events');
6+
7+
const emitter = new EventEmitter();
8+
9+
{
10+
// Verify that the disposable stack removes the handlers
11+
// when the stack is disposed.
12+
using ds = new DisposableStack();
13+
ds.use(emitter.addDisposableListener('foo', common.mustCall()));
14+
ds.use(emitter.addDisposableListener('bar', common.mustCall()));
15+
ds.use(emitter.addDisposableListener('baz', common.mustNotCall()),
16+
{ once: true });
17+
emitter.emit('foo');
18+
emitter.emit('bar');
19+
strictEqual(emitter.listenerCount('foo'), 1);
20+
strictEqual(emitter.listenerCount('bar'), 1);
21+
22+
// The disposer returned by addDisposableListener can be called manually.
23+
const disposer = emitter.addDisposableListener('foo', common.mustNotCall());
24+
strictEqual(emitter.listenerCount('foo'), 2);
25+
disposer.dispose();
26+
strictEqual(emitter.listenerCount('foo'), 1);
27+
// Disposer is callable multiple times without error.
28+
disposer.dispose();
29+
}
30+
emitter.emit('foo');
31+
emitter.emit('bar');
32+
emitter.emit('baz');
33+
strictEqual(emitter.listenerCount('foo'), 0);
34+
strictEqual(emitter.listenerCount('bar'), 0);
35+
36+
// ============================================================================
37+
// Type checking on inputs
38+
throws(() => emitter.addDisposableListener('foo', 'not a function'), {
39+
code: 'ERR_INVALID_ARG_TYPE',
40+
});
41+
42+
throws(() => emitter.addDisposableListener('foo', () => {}, ''), {
43+
code: 'ERR_INVALID_ARG_TYPE',
44+
});
45+
46+
throws(() => emitter.addDisposableListener('foo', () => {}, { once: '' }), {
47+
code: 'ERR_INVALID_ARG_TYPE',
48+
});

0 commit comments

Comments
 (0)