Skip to content

Commit dadd875

Browse files
committed
stream: improve validation of options in stream/iter
1 parent f177ef3 commit dadd875

File tree

9 files changed

+684
-88
lines changed

9 files changed

+684
-88
lines changed

lib/internal/streams/iter/broadcast.js

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ const {
3232
ERR_INVALID_STATE,
3333
},
3434
} = require('internal/errors');
35+
const {
36+
validateAbortSignal,
37+
validateInteger,
38+
validateObject,
39+
} = require('internal/validators');
3540

3641
const {
3742
broadcastProtocol,
@@ -48,6 +53,7 @@ const {
4853
} = require('internal/streams/iter/pull');
4954

5055
const {
56+
kMultiConsumerDefaultHWM,
5157
allUint8Array,
5258
toUint8Array,
5359
validateBackpressure,
@@ -639,23 +645,35 @@ class BroadcastWriter {
639645
* @param {{ highWaterMark?: number, backpressure?: string, signal?: AbortSignal }} [options]
640646
* @returns {{ writer: Writer, broadcast: Broadcast }}
641647
*/
642-
function broadcast(options) {
648+
function broadcast(options = { __proto__: null }) {
649+
validateObject(options, 'options');
650+
const {
651+
highWaterMark = kMultiConsumerDefaultHWM,
652+
backpressure = 'strict',
653+
signal,
654+
} = options;
655+
validateInteger(highWaterMark, 'options.highWaterMark', 1);
656+
validateBackpressure(backpressure);
657+
if (signal !== undefined) {
658+
validateAbortSignal(signal, 'options.signal');
659+
}
660+
643661
const opts = {
644662
__proto__: null,
645-
highWaterMark: MathMax(1, options?.highWaterMark ?? 16),
646-
backpressure: options?.backpressure ?? 'strict',
647-
signal: options?.signal,
663+
highWaterMark: MathMax(1, highWaterMark),
664+
backpressure,
665+
signal,
648666
};
649667

650668
const broadcastImpl = new BroadcastImpl(opts);
651669
const writer = new BroadcastWriter(broadcastImpl);
652670
broadcastImpl.setWriter(writer);
653671

654-
if (opts.signal) {
655-
if (opts.signal.aborted) {
672+
if (signal) {
673+
if (signal.aborted) {
656674
broadcastImpl.cancel();
657675
} else {
658-
opts.signal.addEventListener('abort', () => {
676+
signal.addEventListener('abort', () => {
659677
broadcastImpl.cancel();
660678
}, { __proto__: null, once: true });
661679
}
@@ -684,6 +702,11 @@ const Broadcast = {
684702
return { __proto__: null, writer: { __proto__: null }, broadcast: bc };
685703
}
686704

705+
if (!isAsyncIterable(input) && !isSyncIterable(input)) {
706+
throw new ERR_INVALID_ARG_TYPE(
707+
'input', ['Broadcastable', 'AsyncIterable', 'Iterable'], input);
708+
}
709+
687710
const result = broadcast(options);
688711
const signal = options?.signal;
689712

lib/internal/streams/iter/consumers.js

Lines changed: 83 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,17 @@ const {
2929
const {
3030
codes: {
3131
ERR_INVALID_ARG_TYPE,
32+
ERR_INVALID_ARG_VALUE,
3233
ERR_OUT_OF_RANGE,
3334
},
3435
} = require('internal/errors');
3536
const { TextDecoder } = require('internal/encoding');
37+
const {
38+
validateAbortSignal,
39+
validateFunction,
40+
validateInteger,
41+
validateObject,
42+
} = require('internal/validators');
3643

3744
const {
3845
isAsyncIterable,
@@ -189,6 +196,49 @@ function toArrayBuffer(data) {
189196
byteOffset + byteLength);
190197
}
191198

199+
// =============================================================================
200+
// Shared option validation
201+
// =============================================================================
202+
203+
function validateConsumerOptions(options) {
204+
validateObject(options, 'options');
205+
if (options.signal !== undefined) {
206+
validateAbortSignal(options.signal, 'options.signal');
207+
}
208+
if (options.limit !== undefined) {
209+
validateInteger(options.limit, 'options.limit', 0);
210+
}
211+
if (options.encoding !== undefined) {
212+
if (typeof options.encoding !== 'string') {
213+
throw new ERR_INVALID_ARG_TYPE('options.encoding', 'string',
214+
options.encoding);
215+
}
216+
try {
217+
new TextDecoder(options.encoding);
218+
} catch {
219+
throw new ERR_INVALID_ARG_VALUE('options.encoding', options.encoding);
220+
}
221+
}
222+
}
223+
224+
function validateSyncConsumerOptions(options) {
225+
validateObject(options, 'options');
226+
if (options.limit !== undefined) {
227+
validateInteger(options.limit, 'options.limit', 0);
228+
}
229+
if (options.encoding !== undefined) {
230+
if (typeof options.encoding !== 'string') {
231+
throw new ERR_INVALID_ARG_TYPE('options.encoding', 'string',
232+
options.encoding);
233+
}
234+
try {
235+
new TextDecoder(options.encoding);
236+
} catch {
237+
throw new ERR_INVALID_ARG_VALUE('options.encoding', options.encoding);
238+
}
239+
}
240+
}
241+
192242
// =============================================================================
193243
// Sync Consumers
194244
// =============================================================================
@@ -199,7 +249,8 @@ function toArrayBuffer(data) {
199249
* @param {{ limit?: number }} [options]
200250
* @returns {Uint8Array}
201251
*/
202-
function bytesSync(source, options) {
252+
function bytesSync(source, options = { __proto__: null }) {
253+
validateSyncConsumerOptions(options);
203254
return concatBytes(collectSync(source, options?.limit));
204255
}
205256

@@ -209,9 +260,10 @@ function bytesSync(source, options) {
209260
* @param {{ encoding?: string, limit?: number }} [options]
210261
* @returns {string}
211262
*/
212-
function textSync(source, options) {
213-
const data = bytesSync(source, options);
214-
const decoder = new TextDecoder(options?.encoding ?? 'utf-8', {
263+
function textSync(source, options = { __proto__: null }) {
264+
validateSyncConsumerOptions(options);
265+
const data = concatBytes(collectSync(source, options.limit));
266+
const decoder = new TextDecoder(options.encoding ?? 'utf-8', {
215267
__proto__: null,
216268
fatal: true,
217269
ignoreBOM: true,
@@ -225,8 +277,9 @@ function textSync(source, options) {
225277
* @param {{ limit?: number }} [options]
226278
* @returns {ArrayBuffer}
227279
*/
228-
function arrayBufferSync(source, options) {
229-
return toArrayBuffer(bytesSync(source, options));
280+
function arrayBufferSync(source, options = { __proto__: null }) {
281+
validateSyncConsumerOptions(options);
282+
return toArrayBuffer(concatBytes(collectSync(source, options.limit)));
230283
}
231284

232285
/**
@@ -235,8 +288,9 @@ function arrayBufferSync(source, options) {
235288
* @param {{ limit?: number }} [options]
236289
* @returns {Uint8Array[]}
237290
*/
238-
function arraySync(source, options) {
239-
return collectSync(source, options?.limit);
291+
function arraySync(source, options = { __proto__: null }) {
292+
validateSyncConsumerOptions(options);
293+
return collectSync(source, options.limit);
240294
}
241295

242296
// =============================================================================
@@ -249,8 +303,9 @@ function arraySync(source, options) {
249303
* @param {{ signal?: AbortSignal, limit?: number }} [options]
250304
* @returns {Promise<Uint8Array>}
251305
*/
252-
async function bytes(source, options) {
253-
const chunks = await collectAsync(source, options?.signal, options?.limit);
306+
async function bytes(source, options = { __proto__: null }) {
307+
validateConsumerOptions(options);
308+
const chunks = await collectAsync(source, options.signal, options.limit);
254309
return concatBytes(chunks);
255310
}
256311

@@ -260,9 +315,11 @@ async function bytes(source, options) {
260315
* @param {{ encoding?: string, signal?: AbortSignal, limit?: number }} [options]
261316
* @returns {Promise<string>}
262317
*/
263-
async function text(source, options) {
264-
const data = await bytes(source, options);
265-
const decoder = new TextDecoder(options?.encoding ?? 'utf-8', {
318+
async function text(source, options = { __proto__: null }) {
319+
validateConsumerOptions(options);
320+
const chunks = await collectAsync(source, options.signal, options.limit);
321+
const data = concatBytes(chunks);
322+
const decoder = new TextDecoder(options.encoding ?? 'utf-8', {
266323
__proto__: null,
267324
fatal: true,
268325
ignoreBOM: true,
@@ -276,8 +333,10 @@ async function text(source, options) {
276333
* @param {{ signal?: AbortSignal, limit?: number }} [options]
277334
* @returns {Promise<ArrayBuffer>}
278335
*/
279-
async function arrayBuffer(source, options) {
280-
return toArrayBuffer(await bytes(source, options));
336+
async function arrayBuffer(source, options = { __proto__: null }) {
337+
validateConsumerOptions(options);
338+
const chunks = await collectAsync(source, options.signal, options.limit);
339+
return toArrayBuffer(concatBytes(chunks));
281340
}
282341

283342
/**
@@ -286,8 +345,9 @@ async function arrayBuffer(source, options) {
286345
* @param {{ signal?: AbortSignal, limit?: number }} [options]
287346
* @returns {Promise<Uint8Array[]>}
288347
*/
289-
async function array(source, options) {
290-
return collectAsync(source, options?.signal, options?.limit);
348+
async function array(source, options = { __proto__: null }) {
349+
validateConsumerOptions(options);
350+
return collectAsync(source, options.signal, options.limit);
291351
}
292352

293353
// =============================================================================
@@ -300,6 +360,7 @@ async function array(source, options) {
300360
* @returns {Function}
301361
*/
302362
function tap(callback) {
363+
validateFunction(callback, 'callback');
303364
return async (chunks, options) => {
304365
await callback(chunks, options);
305366
return chunks;
@@ -312,6 +373,7 @@ function tap(callback) {
312373
* @returns {Function}
313374
*/
314375
function tapSync(callback) {
376+
validateFunction(callback, 'callback');
315377
return (chunks) => {
316378
callback(chunks);
317379
return chunks;
@@ -370,6 +432,10 @@ function merge(...args) {
370432
sources = args;
371433
}
372434

435+
if (options?.signal !== undefined) {
436+
validateAbortSignal(options.signal, 'options.signal');
437+
}
438+
373439
return {
374440
__proto__: null,
375441
async *[SymbolAsyncIterator]() {

lib/internal/streams/iter/duplex.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,25 @@ const {
1212
const {
1313
push,
1414
} = require('internal/streams/iter/push');
15+
const {
16+
validateObject,
17+
} = require('internal/validators');
1518

1619
/**
1720
* Create a pair of connected duplex channels for bidirectional communication.
1821
* @param {{ highWaterMark?: number, backpressure?: string, signal?: AbortSignal,
1922
* a?: object, b?: object }} [options]
2023
* @returns {[DuplexChannel, DuplexChannel]}
2124
*/
22-
function duplex(options) {
23-
const { highWaterMark, backpressure, signal, a, b } = options ?? {};
25+
function duplex(options = { __proto__: null }) {
26+
validateObject(options, 'options');
27+
const { highWaterMark, backpressure, signal, a, b } = options;
28+
if (a !== undefined) {
29+
validateObject(a, 'options.a');
30+
}
31+
if (b !== undefined) {
32+
validateObject(b, 'options.b');
33+
}
2434

2535
// Channel A writes to B's readable (A->B direction)
2636
const { writer: aWriter, readable: bReadable } = push({

lib/internal/streams/iter/pull.js

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const {
2323
},
2424
} = require('internal/errors');
2525
const { isError, lazyDOMException } = require('internal/util');
26+
const { validateAbortSignal } = require('internal/validators');
2627
const {
2728
isPromise,
2829
isUint8Array,
@@ -39,6 +40,7 @@ const {
3940

4041
const {
4142
isPullOptions,
43+
isTransform,
4244
parsePullArgs,
4345
toUint8Array,
4446
} = require('internal/streams/iter/utils');
@@ -102,9 +104,18 @@ function parsePipeToArgs(args) {
102104
throw new ERR_INVALID_ARG_TYPE('writer', 'object with a write method', writer);
103105
}
104106

107+
const transforms = ArrayPrototypeSlice(args, 0, writerIndex);
108+
for (let i = 0; i < transforms.length; i++) {
109+
if (!isTransform(transforms[i])) {
110+
throw new ERR_INVALID_ARG_TYPE(
111+
`transforms[${i}]`, ['Function', 'Object with transform()'],
112+
transforms[i]);
113+
}
114+
}
115+
105116
return {
106117
__proto__: null,
107-
transforms: ArrayPrototypeSlice(args, 0, writerIndex),
118+
transforms,
108119
writer,
109120
options,
110121
};
@@ -534,6 +545,13 @@ async function* createAsyncPipeline(source, transforms, signal) {
534545
* @returns {Iterable<Uint8Array[]>}
535546
*/
536547
function pullSync(source, ...transforms) {
548+
for (let i = 0; i < transforms.length; i++) {
549+
if (!isTransform(transforms[i])) {
550+
throw new ERR_INVALID_ARG_TYPE(
551+
`transforms[${i}]`, ['Function', 'Object with transform()'],
552+
transforms[i]);
553+
}
554+
}
537555
return {
538556
__proto__: null,
539557
*[SymbolIterator]() {
@@ -551,6 +569,9 @@ function pullSync(source, ...transforms) {
551569
*/
552570
function pull(source, ...args) {
553571
const { transforms, options } = parsePullArgs(args);
572+
if (options?.signal !== undefined) {
573+
validateAbortSignal(options.signal, 'options.signal');
574+
}
554575

555576
return {
556577
__proto__: null,
@@ -626,6 +647,9 @@ function pipeToSync(source, ...args) {
626647
*/
627648
async function pipeTo(source, ...args) {
628649
const { transforms, writer, options } = parsePipeToArgs(args);
650+
if (options?.signal !== undefined) {
651+
validateAbortSignal(options.signal, 'options.signal');
652+
}
629653

630654
// Handle transform-writer
631655
const finalTransforms = ArrayPrototypeSlice(transforms);

0 commit comments

Comments
 (0)