@@ -149,7 +149,7 @@ const { Readable } = require('stream');
149
149
150
150
class MyReadable extends Readable {
151
151
_read () {
152
-
152
+ console . log ( ' 调用了 _read 函数 ' );
153
153
}
154
154
}
155
155
@@ -164,6 +164,8 @@ for (let i = 0; i < dataLen; i++) {
164
164
const pushResult = reader .push (Buffer .from (data[i]));
165
165
if (! pushResult) {
166
166
console .warn (' 达到highWater值了,最好不要再 push 了' , i);
167
+ } else {
168
+ console .log (' 没有达到highWater值' , i);
167
169
}
168
170
}
169
171
for (let i = 0 ; i < 3 ; i++ ) {
@@ -185,10 +187,12 @@ process.on('uncaughtException', (err) => {
185
187
186
188
** 代码 3.4.2.1.1 chapter3/stream/read_simple.js**
187
189
188
- 上述代码中创建了一个简单的可读流。可读流提供了两种读取模式,flow 模式和 no-flow 模式,可读流有一个 ` readableFlowing ` 属性,默认为 ` null ` 。从上述代码的输出中也可以发现,在没有做任何函数调用的情况下,可读流的 ` readableFlowing ` 为 ` null ` 。 我们将 reader 对象的 highWaterMark 值设置为 4,所以在下面的 for 循环中 i 为 4 的时候,就打印 ` 达到highWater值了,最好不要再 push 了 ` 的警告。可读流的 push 函数可以接受一个特殊值,就是 null 值,调用 push(null) 后,整个流就处于结束状态,不允许调用者再次调用 push 函数,否则会直接抛出 error 事件。
190
+ 我们将 reader 对象的 highWaterMark 值设置为 4,所以在下面的 for 循环中 i 为 4 的时候,就打印 ` 达到highWater值了,最好不要再 push 了 ` 的警告。可读流的 push 函数可以接受一个特殊值,就是 null 值,调用 push(null) 后,整个流就处于结束状态,不允许调用者再次调用 push 函数,否则会直接抛出 error 事件。
189
191
190
192
> Node 中对于对象的 error 事件一定要添加事件监听回调函数,否则从对象中抛出的 error 事件,会外溢到进程级别的 uncaughtException 事件,而一旦触发 uncaughtException 事件,进程默认就会退出。
191
193
194
+ 上述代码中创建了一个简单的可读流。可读流提供了两种读取模式,flow 模式和 no-flow 模式,可读流有一个 ` readableFlowing ` 属性,默认为 ` null ` 。从上述代码的输出中也可以发现,在没有做任何函数调用的情况下,可读流的 ` readableFlowing ` 为 ` null ` 。
195
+
192
196
如果给可读流对象增加 ` data ` 事件监听、调用函数 ` resume ` / ` pipe ` ,将会使用可读流进入 flow 模式,此时 ` readableFlowing ` 会被置为 true。调用 ` pause ` / ` unpipe ` 函数或者添加 ` readable ` 事件监听会将可读流切换到 no-flow 模式,并且将 ` readableFlowing ` 置为 false,这个时候必须手动调用函数 ` resume ` / ` pipe ` 才能将其切换回 flow 模式,如果在这种情况下添加 ` data ` 事件是无法切换为 flow 模式的。
193
197
194
198
注意,如果你同时给可读流添加了 ` readable ` 和 ` data ` 的事件,则 ` readable ` 的优先级高于 ` data ` ,流将回进入 no-flow 模式。当你将 ` readable ` 事件移出,只保留 ` data ` 事件时,则回到 flow 模式。同时需要注意到,添加了 ` readable ` 事件后,调用 ` pause ` ` resume ` 这两个函数是没有意义的。
@@ -261,11 +265,74 @@ readStream
261
265
262
266
** 代码 3.4.2.2.1**
263
267
264
- 前面讲我们只能将可读流 pipe 到可写流里面,但是这个中间 gzipStream 是什么鬼,从 readStream 角度看它是可写流,从 writeStream 的角度看它又是可读流,解开其神秘面纱的关键就是
268
+ 前面讲我们只能将可读流 pipe 到可写流里面,但是这个中间 gzipStream 是什么鬼,从 readStream 角度看它是可写流,从 writeStream 的角度看它又是可读流,解开其神秘面纱的关键就是 ` Transform ` 。它支持以可写流的身份接收从别处写入的数据,经过其加工后,再放置到内置的可写流中。
269
+
270
+ 下面是一个我们自己构建的自定义 Transform 类
271
+
272
+ ``` javascript
273
+ const { Transform , Readable , Writable } = require (' stream' );
274
+ class InputStream extends Readable {
275
+ _read () {
276
+ //
277
+ }
278
+ }
279
+ class OutputStream extends Writable {
280
+ constructor (options ) {
281
+ super (options);
282
+ this .data = [];// 调试用
283
+ }
284
+
285
+ _write (chunk , encoding , callback ) {
286
+ this .data .push (chunk);
287
+ callback ();
288
+ }
289
+ }
290
+ class MyTransform extends Transform {
291
+ _transform (chunk , encoding , callback ) {
292
+ const filterData = [];
293
+ for (const byte of chunk) {
294
+ if (byte % 2 === 0 ) {// 挑选出偶数
295
+ filterData .push (byte);
296
+ }
297
+ }
298
+ if (filterData .length > 0 ) {
299
+ this .push (Buffer .from (filterData));// 将数据写入可读流
300
+ }
301
+ callback ();// 不要忘记调用 callback 函数
302
+ }
303
+ }
304
+ const inputStream = new InputStream ();
305
+ const outputStream = new OutputStream ();
306
+ const myTransform = new MyTransform ();
307
+ inputStream .pipe (myTransform).pipe (outputStream);
308
+ const count = 6 ;
309
+ function readData (i ) {
310
+ if (i < count) {
311
+ inputStream .push (Buffer .from ([i & 0xff ]));
312
+ setTimeout (() => {
313
+ readData (i + 1 );
314
+ }, 100 );
315
+ } else {
316
+ inputStream .push (null );
317
+ }
318
+ }
319
+ readData (0 );
320
+
321
+ myTransform .on (' data' , function (data ) {
322
+ console .log (' transform 得到的转化数据' , data);
323
+ });
324
+ outputStream .on (' finish' , function () {
325
+ console .log (' 流写结束了' );
326
+ console .log (outputStream .data );
327
+ });
328
+ ```
329
+
330
+ ** 代码 3.4.2.2.2 chapter3/stream/transform_simple.js**
265
331
332
+ 上述代码中我们通过可读流 inputStream 来采集数据的数字, myTransfrom 转化的函数中将采集到的数字筛选出偶数来,最终可写流 outputStream 拿到最终转化的数字。
266
333
## 3.5 TCP
267
334
268
- 之所以将 TCP 的内容放到流教程的后面,是由于 TCP 中的 [ net.Socket ] ( https://nodejs.org/dist/latest-v12.x/docs/ api/net .html#net_class_net_socket ) 类就是继承自 [ stream.Duplex ] ( https://nodejs.org/dist/latest-v12.x/docs/api/stream .html#stream_class_stream_duplex ) 类。上一节中没有讲 ` Duplex ` 类,它相当于将可读流和可写流的功能合二为一,不过其内部读写的数据是分别存储在两个缓冲区中,不相互影响;于其对应的是类 [ stream.Transform ] ( https://nodejs.org/dist/latest-v12.x/docs/api/stream.html#stream_class_stream_transform ) ,它也是将可读流和可写流的功能合二为一,不过其内部读写流的缓冲区是共享的 。
335
+ Transform 流其实内部继承自 [ ` Duplex ` ] ( https://nodejs.org/api/stream .html#class-streamduplex ) ,Duplex 内部同时拥有可读流和可写流,但是跟 Transform 不同的是,Duplex 其内部读写的数据是分别存储在两个缓冲区中,两者没有关联关系,相互独立。来自TCP 中的 [ net.Socket ] ( https://nodejs.org/dist/latest-v12.x/docs/api/net .html#net_class_net_socket ) 类就是继承自 [ stream.Duplex ] ( https://nodejs.org/dist/latest-v12.x/docs/api/stream.html#stream_class_stream_duplex ) 类 。
269
336
270
337
TCP 属于传输层协议,大家都对 HTTP 的服务编写比较熟悉,我们可以通过 API 方便的发送请求、接收响应数据。但是你发送的 HTTP 请求时,在 API 底层要封装成符合 HTTP 协议的请求数据数据包,对端在接收响应后,也是 API 底层帮我们把 HTTP 数据包解析出来,抛到应用层。HTTP 1.x 时代,只能通过客户端发送请求来触发通信流动,服务器端不能主动给客户端发送数据,如果想实现全双工的通信,就直接的就是借助 TCP 层面的协议。
271
338
0 commit comments