diff --git a/doc/api/diagnostics_channel.md b/doc/api/diagnostics_channel.md index 9f843b642c49e0..e9ac279cc62917 100644 --- a/doc/api/diagnostics_channel.md +++ b/doc/api/diagnostics_channel.md @@ -1243,6 +1243,23 @@ Emitted when an error occurs during the processing of a stream on the client. Emitted when a stream is received on the client. +##### Event: `'http2.client.stream.bodyChunkSent'` + +* `stream` {ClientHttp2Stream} +* `writev` {boolean} +* `data` {Buffer | string | Buffer\[] | Object\[]} + * `chunk` {Buffer|string} + * `encoding` {string} +* `encoding` {string} + +Emitted when a chunk of the client stream body is being sent. + +##### Event: `'http2.client.stream.bodySent'` + +* `stream` {ClientHttp2Stream} + +Emitted after the client stream body has been fully sent. + ##### Event: `'http2.client.stream.close'` * `stream` {ClientHttp2Stream} diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 32e02aa3f6c640..1f4ef57593fe4f 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -190,6 +190,8 @@ const dc = require('diagnostics_channel'); const onClientStreamCreatedChannel = dc.channel('http2.client.stream.created'); const onClientStreamStartChannel = dc.channel('http2.client.stream.start'); const onClientStreamErrorChannel = dc.channel('http2.client.stream.error'); +const onClientStreamBodyChunkSentChannel = dc.channel('http2.client.stream.bodyChunkSent'); +const onClientStreamBodySentChannel = dc.channel('http2.client.stream.bodySent'); const onClientStreamFinishChannel = dc.channel('http2.client.stream.finish'); const onClientStreamCloseChannel = dc.channel('http2.client.stream.close'); const onServerStreamCreatedChannel = dc.channel('http2.server.stream.created'); @@ -2300,6 +2302,15 @@ class Http2Stream extends Duplex { req = writeGeneric(this, data, encoding, writeCallback); trackWriteState(this, req.bytes); + + if (this.session[kType] === NGHTTP2_SESSION_CLIENT && onClientStreamBodyChunkSentChannel.hasSubscribers) { + onClientStreamBodyChunkSentChannel.publish({ + stream: this, + writev, + data, + encoding, + }); + } } _write(data, encoding, cb) { @@ -2317,6 +2328,10 @@ class Http2Stream extends Duplex { } debugStreamObj(this, 'shutting down writable on _final'); ReflectApply(shutdownWritable, this, [cb]); + + if (this.session[kType] === NGHTTP2_SESSION_CLIENT && onClientStreamBodySentChannel.hasSubscribers) { + onClientStreamBodySentChannel.publish({ stream: this }); + } } _read(nread) { diff --git a/test/parallel/test-diagnostics-channel-http2-client-stream-body-multiple-buffers-and-strings.js b/test/parallel/test-diagnostics-channel-http2-client-stream-body-multiple-buffers-and-strings.js new file mode 100644 index 00000000000000..2985beb5cbe526 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-http2-client-stream-body-multiple-buffers-and-strings.js @@ -0,0 +1,73 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); + +// This test ensures that the built-in HTTP/2 diagnostics channels are reporting +// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and +// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are +// being sent with multiple Buffers and strings. + +const assert = require('assert'); +const dc = require('diagnostics_channel'); +const http2 = require('http2'); +const { Duplex } = require('stream'); + +let bodyChunkSent = false; + +dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, writev, data, encoding }) => { + // Since ClientHttp2Stream is not exported from any module, this just checks + // if the stream is an instance of Duplex. + assert.ok(stream instanceof Duplex); + assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream'); + + assert.strictEqual(writev, true); + + assert.ok(Array.isArray(data)); + assert.strictEqual(data.length, 3); + + assert.strictEqual(data[0].chunk, 'héllo'); + assert.strictEqual(data[0].encoding, 'latin1'); + + assert.ok(Buffer.from('foo').equals(data[1].chunk)); + assert.strictEqual(data[1].encoding, 'buffer'); + + assert.ok(Buffer.from('bar').equals(data[2].chunk)); + assert.strictEqual(data[2].encoding, 'buffer'); + + assert.strictEqual(encoding, ''); + + bodyChunkSent = true; +})); + +dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => { + // 'http2.client.stream.bodyChunkSent' must run first. + assert.ok(bodyChunkSent); + + // Since ClientHttp2Stream is not exported from any module, this just checks + // if the stream is an instance of Duplex. + assert.ok(stream instanceof Duplex); + assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream'); +})); + +const server = http2.createServer(); +server.on('stream', common.mustCall((stream) => { + stream.respond({}, { endStream: true }); +})); + +server.listen(0, common.mustCall(() => { + const port = server.address().port; + const client = http2.connect(`http://localhost:${port}`); + + const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' }); + stream.write('héllo', 'latin1'); + stream.write(Buffer.from('foo')); + stream.write(new TextEncoder().encode('bar')); + stream.end(); + + stream.on('response', common.mustCall(() => { + client.close(); + server.close(); + })); +}, 1)); diff --git a/test/parallel/test-diagnostics-channel-http2-client-stream-body-multiple-buffers.js b/test/parallel/test-diagnostics-channel-http2-client-stream-body-multiple-buffers.js new file mode 100644 index 00000000000000..ddb3a5ef001b2c --- /dev/null +++ b/test/parallel/test-diagnostics-channel-http2-client-stream-body-multiple-buffers.js @@ -0,0 +1,66 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); + +// This test ensures that the built-in HTTP/2 diagnostics channels are reporting +// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and +// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are +// being sent with multiple Buffers. + +const assert = require('assert'); +const dc = require('diagnostics_channel'); +const http2 = require('http2'); +const { Duplex } = require('stream'); + +let bodyChunkSent = false; + +dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, writev, data, encoding }) => { + // Since ClientHttp2Stream is not exported from any module, this just checks + // if the stream is an instance of Duplex. + assert.ok(stream instanceof Duplex); + assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream'); + + assert.strictEqual(writev, true); + + assert.ok(Array.isArray(data)); + assert.strictEqual(data.length, 2); + + assert.ok(Buffer.from('foo').equals(data[0])); + assert.ok(Buffer.from('bar').equals(data[1])); + + assert.strictEqual(encoding, ''); + + bodyChunkSent = true; +})); + +dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => { + // 'http2.client.stream.bodyChunkSent' must run first. + assert.ok(bodyChunkSent); + + // Since ClientHttp2Stream is not exported from any module, this just checks + // if the stream is an instance of Duplex. + assert.ok(stream instanceof Duplex); + assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream'); +})); + +const server = http2.createServer(); +server.on('stream', common.mustCall((stream) => { + stream.respond({}, { endStream: true }); +})); + +server.listen(0, common.mustCall(() => { + const port = server.address().port; + const client = http2.connect(`http://localhost:${port}`); + + const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' }); + stream.write(Buffer.from('foo')); + stream.write(Buffer.from('bar')); + stream.end(); + + stream.on('response', common.mustCall(() => { + client.close(); + server.close(); + })); +}, 1)); diff --git a/test/parallel/test-diagnostics-channel-http2-client-stream-body-no-chunks.js b/test/parallel/test-diagnostics-channel-http2-client-stream-body-no-chunks.js new file mode 100644 index 00000000000000..1053d79c5375b3 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-http2-client-stream-body-no-chunks.js @@ -0,0 +1,42 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); + +// This test ensures that the built-in HTTP/2 diagnostics channels are reporting +// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and +// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are +// being sent with no chunks. + +const assert = require('assert'); +const dc = require('diagnostics_channel'); +const http2 = require('http2'); +const { Duplex } = require('stream'); + +dc.subscribe('http2.client.stream.bodyChunkSent', common.mustNotCall()); + +dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => { + // Since ClientHttp2Stream is not exported from any module, this just checks + // if the stream is an instance of Duplex. + assert.ok(stream instanceof Duplex); + assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream'); +})); + +const server = http2.createServer(); +server.on('stream', common.mustCall((stream) => { + stream.respond({}, { endStream: true }); +})); + +server.listen(0, common.mustCall(() => { + const port = server.address().port; + const client = http2.connect(`http://localhost:${port}`); + + const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' }); + stream.end(); + + stream.on('response', common.mustCall(() => { + client.close(); + server.close(); + })); +}, 1)); diff --git a/test/parallel/test-diagnostics-channel-http2-client-stream-body-single-buffer.js b/test/parallel/test-diagnostics-channel-http2-client-stream-body-single-buffer.js new file mode 100644 index 00000000000000..43d02ce839b7d0 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-http2-client-stream-body-single-buffer.js @@ -0,0 +1,59 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); + +// This test ensures that the built-in HTTP/2 diagnostics channels are reporting +// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and +// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are +// being sent with a single Buffer. + +const assert = require('assert'); +const dc = require('diagnostics_channel'); +const http2 = require('http2'); +const { Duplex } = require('stream'); + +let bodyChunkSent = false; + +dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, writev, data, encoding }) => { + // Since ClientHttp2Stream is not exported from any module, this just checks + // if the stream is an instance of Duplex. + assert.ok(stream instanceof Duplex); + assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream'); + + assert.strictEqual(writev, false); + assert.ok(Buffer.from('foo').equals(data)); + assert.strictEqual(encoding, 'buffer'); + + bodyChunkSent = true; +})); + +dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => { + // 'http2.client.stream.bodyChunkSent' must run first. + assert.ok(bodyChunkSent); + + // Since ClientHttp2Stream is not exported from any module, this just checks + // if the stream is an instance of Duplex. + assert.ok(stream instanceof Duplex); + assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream'); +})); + +const server = http2.createServer(); +server.on('stream', common.mustCall((stream) => { + stream.respond({}, { endStream: true }); +})); + +server.listen(0, common.mustCall(() => { + const port = server.address().port; + const client = http2.connect(`http://localhost:${port}`); + + const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' }); + stream.write(Buffer.from('foo')); + stream.end(); + + stream.on('response', common.mustCall(() => { + client.close(); + server.close(); + })); +}, 1)); diff --git a/test/parallel/test-diagnostics-channel-http2-client-stream-body-single-string.js b/test/parallel/test-diagnostics-channel-http2-client-stream-body-single-string.js new file mode 100644 index 00000000000000..76d9b449871ebd --- /dev/null +++ b/test/parallel/test-diagnostics-channel-http2-client-stream-body-single-string.js @@ -0,0 +1,59 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); + +// This test ensures that the built-in HTTP/2 diagnostics channels are reporting +// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and +// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are +// being sent with a single string. + +const assert = require('assert'); +const dc = require('diagnostics_channel'); +const http2 = require('http2'); +const { Duplex } = require('stream'); + +let bodyChunkSent = false; + +dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, writev, data, encoding }) => { + // Since ClientHttp2Stream is not exported from any module, this just checks + // if the stream is an instance of Duplex. + assert.ok(stream instanceof Duplex); + assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream'); + + assert.strictEqual(writev, false); + assert.strictEqual(data, 'foo'); + assert.strictEqual(encoding, 'utf8'); + + bodyChunkSent = true; +})); + +dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => { + // 'http2.client.stream.bodyChunkSent' must run first. + assert.ok(bodyChunkSent); + + // Since ClientHttp2Stream is not exported from any module, this just checks + // if the stream is an instance of Duplex. + assert.ok(stream instanceof Duplex); + assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream'); +})); + +const server = http2.createServer(); +server.on('stream', common.mustCall((stream) => { + stream.respond({}, { endStream: true }); +})); + +server.listen(0, common.mustCall(() => { + const port = server.address().port; + const client = http2.connect(`http://localhost:${port}`); + + const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' }); + stream.write('foo'); + stream.end(); + + stream.on('response', common.mustCall(() => { + client.close(); + server.close(); + })); +}, 1));