开发手册 欢迎您!
软件开发者资料库

流(Stream) | Node.js

Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境。Node.js 使用了一个事件驱动、非阻塞式 I/O 的模型,使其轻量又高效。Node.js 的包管理器 npm,是全球最大的开源库生态系统。

Node.js v8.x 中文文档


目录

stream (流)#

稳定性: 2 - 稳定的

流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。

Node.js 提供了多种流对象。 例如,HTTP 请求process.stdout就都是流的实例。

流可以是可读的、可写的,或是可读写的。所有的流都是EventEmitter 的实例。

stream 模块可以通过以下方式引入:

const stream = require('stream');

尽管所有的 Node.js 用户都应该理解流的工作方式,这点很重要,但是 stream 模块本身只对于那些需要创建新的流的实例的开发者最有用处。 对于主要是 消费 流的开发者来说,他们很少(如果有的话)需要直接使用 stream 模块。

本文档的组织#

本文档主要分为两节,第三节是一些额外的注意事项。第一节阐述了在应用中和 使用 流相关的 API 。 第二节阐述了和 实现 新的流类型相关的 API 。

流的类型#

Node.js 中有四种基本的流类型:

对象模式#

所有使用 Node.js API 创建的流对象都只能操作 strings 和 Buffer(或 Uint8Array)对象。但是,通过一些第三方流的实现,你依然能够处理其它类型的 JavaScript 值 (除了 null,它在流处理中有特殊意义)。 这些流被认为是工作在 “对象模式”(object mode)。

在创建流的实例时,可以通过 objectMode 选项使流的实例切换到对象模式。试图将已经存在的流切换到对象模式是不安全的。

缓冲#

WritableReadable 流都会将数据存储到内部的缓冲器(buffer)中。这些缓冲器可以通过相应的 writable._writableState.getBuffer()readable._readableState.buffer 来获取。

缓冲器的大小取决于传递给流构造函数的 highWaterMark 选项。对于普通的流, highWaterMark 选项指定了总共的字节数。对于工作在对象模式的流,highWaterMark 指定了对象的总数。

当可读流的实现调用 stream.push(chunk) 方法时,数据被放到缓冲器中。如果流的消费者没有调用 stream.read() 方法, 这些数据会始终存在于内部队列中,直到被消费。

当内部可读缓冲器的大小达到 highWaterMark 指定的阈值时,流会暂停从底层资源读取数据,直到当前缓冲器的数据被消费 (也就是说,流会在内部停止调用 readable._read() 来填充可读缓冲器)。

可写流通过反复调用writable.write(chunk) 方法将数据放到缓冲器。当内部可写缓冲器的总大小小于highWaterMark 指定的阈值时, 调用 writable.write() 将返回true。 一旦内部缓冲器的大小达到或超过 highWaterMark ,调用 writable.write() 将返回 false

stream API 的关键目标, 尤其对于 stream.pipe() 方法,就是限制缓冲器数据大小,以达到可接受的程度。这样,对于读写速度不匹配的源头和目标,就不会超出可用的内存大小。

DuplexTransform 都是可读写的。在内部,它们都维护了 两个 相互独立的缓冲器用于读和写。在维持了合理高效的数据流的同时,也使得对于读和写可以独立进行而互不影响。例如, net.Socket 就是 Duplex 的实例,它的可读端可以消费从套接字(socket)中接收的数据, 可写端则可以将数据写入到套接字。由于数据写入到套接字中的速度可能比从套接字接收数据的速度快或者慢,在读写两端使用独立缓冲器,并进行独立操作就显得很重要了。

流消费者的 API#

几乎所有的 Node.js 应用,不管多么简单,都在某种程度上使用了流。下面是在 Node.js 应用中使用流实现的一个简单的 HTTP 服务器:

const http = require('http');const server = http.createServer((req, res) => {  // req 是 http.IncomingMessage 的实例,这是一个 Readable Stream  // res 是 http.ServerResponse 的实例,这是一个 Writable Stream  let body = '';  // 接收数据为 utf8 字符串,  // 如果没有设置字符编码,将接收到 Buffer 对象。  req.setEncoding('utf8');  // 如果监听了 'data' 事件,Readable streams 触发 'data' 事件   req.on('data', (chunk) => {    body += chunk;  });  // end 事件表明整个 body 都接收完毕了   req.on('end', () => {    try {      const data = JSON.parse(body);      // 发送一些信息给用户      res.write(typeof data);      res.end();    } catch (er) {      // json 数据解析失败       res.statusCode = 400;      return res.end(`error: ${er.message}`);    }  });});server.listen(1337);// $ curl localhost:1337 -d "{}"// object// $ curl localhost:1337 -d "\"foo\""// string// $ curl localhost:1337 -d "not json"// error: Unexpected token o in JSON at position 1

Writable 流 (比如例子中的 res) 暴露了一些方法,比如write()end() 。这些方法可以将数据写入到流中。

当流中的数据可以读取时,Readable 流使用 EventEmitter API 来通知应用。这些数据可以使用多种方法从流中读取。

WritableReadable 流都使用了 EventEmitter API ,通过多种方式,与流的当前状态进行交互。

DuplexTransform 都是同时满足 WritableReadable

对于只是简单写入数据到流和从流中消费数据的应用来说,不要求直接实现流接口,通常也不需要调用 require('stream')

需要实现两种类型流的开发者可以参考 API for Stream Implementers

可写流#

可写流是对数据写入'目的地'的一种抽象。

Writable 的例子包括了:

注意: 上面的某些例子事实上是 Duplex 流,只是实现了 Writable 接口。

所有 Writable 流都实现了stream.Writable 类定义的接口。

尽管特定的 Writable 流的实现可能略有差别,所有的 Writable streams 都可以按一种基本模式进行使用,如下面例子所示:

const myStream = getWritableStreamSomehow();myStream.write('some data');myStream.write('some more data');myStream.end('done writing data');

stream.Writable 类#

'close' 事件#

'close' 事件将在流或其底层资源(比如一个文件)关闭后触发。'close' 事件触发后,该流将不会再触发任何事件。

不是所有可写流都会触发 'close' 事件。

'drain' 事件#

如果调用 stream.write(chunk) 方法返回 false'drain' 事件会在适合恢复写入数据到流的时候触发。

// 向可写流中写入数据一百万次。// 需要注意背压 (back-pressure)。function writeOneMillionTimes(writer, data, encoding, callback) {  let i = 1000000;  write();  function write() {    let ok = true;    do {      i--;      if (i === 0) {        // 最后 一次        writer.write(data, encoding, callback);      } else {        // 检查是否可以继续写入。         // 这里不要传递 callback, 因为写入还没有结束!         ok = writer.write(data, encoding);      }    } while (i > 0 && ok);    if (i > 0) {      // 不得不提前停下!      // 当 'drain' 事件触发后继续写入        writer.once('drain', write);    }  }}
'error' 事件#

'error' 事件在写入数据出错或者使用管道出错时触发。事件发生时,回调函数仅会接收到一个 Error 参数。

注意: 'error' 事件发生时,流并不会关闭。

'finish' 事件#

在调用了 stream.end() 方法,且缓冲区数据都已经传给底层系统(underlying system)之后, 'finish' 事件将被触发。

const writer = getWritableStreamSomehow();for (let i = 0; i < 100; i++) {  writer.write(`hello, #${i}!\n`);}writer.end('This is the end\n');writer.on('finish', () => {  console.error('All writes are now complete.');});
'pipe' 事件#
  • src 输出到目标可写流(writable)的源流(source stream)

在可读流(readable stream)上调用 stream.pipe() 方法,并在目标流向 (destinations) 中添加当前可写流 ( writable ) 时,将会在可写流上触发 'pipe' 事件。

const writer = getWritableStreamSomehow();const reader = getReadableStreamSomehow();writer.on('pipe', (src) => {  console.error('something is piping into the writer');  assert.equal(src, reader);});reader.pipe(writer);
'unpipe' 事件#
  • src unpiped 当前可写流的源流

Readable 上调用 stream.unpipe() 方法,从目标流向中移除当前 Writable 时,将会触发 'unpipe' 事件。

const writer = getWritableStreamSomehow();const reader = getReadableStreamSomehow();writer.on('unpipe', (src) => {  console.error('Something has stopped piping into the writer.');  assert.equal(src, reader);});reader.pipe(writer);reader.unpipe(writer);
writable.cork()#

调用 writable.cork() 方法将强制所有写入数据都存放到内存中的缓冲区里。直到调用 stream.uncork()stream.end() 方法时,缓冲区里的数据才会被输出。

在向流中写入大量小块数据(small chunks of data)时,内部缓冲区(internalbuffer)可能失效,从而导致性能下降。writable.cork() 方法主要就是用来避免这种情况。 对于这种情况,实现了 writable._writev() 方法的流可以对写入的数据进行缓冲,从而提高写入效率。

也可查看 writable.uncork()

writable.end([chunk][, encoding][, callback])#
  • chunk | | | 可选的,需要写入的数据。对于非对象模式下的流, chunk 必须是字符串、或 Buffer、或 Uint8Array。对于对象模式下的流, chunk 可以是任意的 JavaScript 值,除了 null
  • encoding 如果 chunk 是字符串,这里指定字符编码。
  • callback 可选的,流结束时的回调函数。

调用 writable.end() 方法表明接下来没有数据要被写入 Writable。通过传入可选的 chunkencoding 参数,可以在关闭流之前再写入一段数据。如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数。

在调用了 stream.end() 方法之后,再调用 stream.write() 方法将会导致错误。

// 写入 'hello, ' ,并用 'world!' 来结束写入const file = fs.createWriteStream('example.txt');file.write('hello, ');file.end('world!');// 后面不允许再写入数据!
writable.setDefaultEncoding(encoding)#
  • encoding 新的默认编码
  • 返回: this

writable.setDefaultEncoding() 用于为 Writable 设置 encoding

writable.uncork()#

writable.uncork() 将输出在 stream.cork() 方法被调用之后缓冲在内存中的所有数据。

如果使用 writable.cork()writable.uncork() 来管理写入缓存,建议使用 process.nextTick() 来延迟调用 writable.uncork() 方法。通过这种方式,可以对单个 Node.js 事件循环中调用的所有 writable.write() 方法进行批处理。

stream.cork();stream.write('some ');stream.write('data ');process.nextTick(() => stream.uncork());

如果一个流多次调用了 writable.cork() 方法,那么也必须调用同样次数的 writable.uncork() 方法以输出缓冲区数据。

stream.cork();stream.write('some ');stream.cork();stream.write('data ');process.nextTick(() => {  stream.uncork();  // 之前的数据只有在 uncork() 被二次调用后才会输出  stream.uncork();});

也可查看 writable.cork()

writable.writableHighWaterMark#

返回构造该可写流时传入的 highWaterMark 参数值。

writable.write(chunk[, encoding][, callback])#
  • chunk | | | 要写入的数据。可选的。对于非对象模式下的流, chunk 必须是字符串, Buffer 或者Uint8Array。对于对象模式下的流,chunk 可以是除 null 外的任意 JavaScript 值。
  • encoding 如果 chunk 是字符串,这里指定字符编码
  • callback 缓冲数据输出时的回调函数
  • 返回: 如果流需要等待 'drain' 事件触发才能继续写入数据,这里将返回 false ; 否则返回 true

writable.write() 方法向流中写入数据,并在数据处理完成后调用 callback 。如果有错误发生, callback 不一定 以这个错误作为第一个参数并被调用。要确保可靠地检测到写入错误,应该监听'error' 事件。

在确认了 chunk 后,如果内部缓冲区的大小小于创建流时设定的 highWaterMark 阈值,函数将返回 true 。如果返回值为 false ,应该停止向流中写入数据,直到 'drain' 事件被触发。

当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 'drain' 事件就会被触发。我们建议, 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块。 然而,当流不处在 'drain' 状态时, 调用 write() 是被允许的, Node.js 会缓存所有已经写入的数据块, 直到达到最大内存占用, 这时它会无条件中止。 甚至在它中止之前, 高内存占用将会导致差的垃圾回收器的性能和高的系统相对敏感性(即使内存不再需要,也通常不会被释放回系统)。 如果远程的另一端没有读取数据, TCP sockets 可能永远也不会 drain , 所以写入到一个不会drain的socket可能会导致远程可利用的漏洞。

对于一个 Transform, 写入数据到一个不会drain的流尤其成问题, 因为 Transform 流默认被暂停, 直到它们被pipe或者被添加了 'data''readable' 事件处理函数。

如果将要被写入的数据可以根据需要生成或者取得,我们建议将逻辑封装为一个 Readable 流并且使用 stream.pipe()。 但是如果调用 write() 优先, 那么可以使用 'drain' 事件来防止回压并且避免内存问题:

function write(data, cb) {  if (!stream.write(data)) {    stream.once('drain', cb);  } else {    process.nextTick(cb);  }}// 在回调函数被执行后再进行其他的写入write('hello', () => {  console.log('write completed, do more writes now');});

对象模式的写入流将忽略 encoding 参数。

writable.destroy([error])#
  • 返回: this

摧毁这个流,并发出传过来的错误。当这个函数被调用后,这个写入流就结束了。使用者不应该重写这个函数,而是重写 writable._destroy

可读流#

可读流(Readable streams)是对提供数据的 源头 (source)的抽象。

可读流的例子包括:

所有的 Readable 都实现了stream.Readable 类定义的接口。

两种模式#

可读流事实上工作在下面两种模式之一:flowing 和 paused 。

在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。

在 paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。

所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing模式:

可读流可以通过下面途径切换到 paused 模式:

  • 如果不存在管道目标(pipe destination),可以通过调用stream.pause() 方法实现。
  • 如果存在管道目标,可以通过取消 'data' 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。

这里需要记住的重要概念就是,可读流需要先为其提供消费或忽略数据的机制,才能开始提供数据。如果消费机制被禁用或取消,可读流将 尝试停止生成数据。

注意: 为了向后兼容,取消 'data' 事件监听并 不会 自动将流暂停。同时,如果存在管道目标(pipe destination),且目标状态变为可以接收数据(drain and ask formore data),调用了 stream.pause() 方法也并不保证流会一直 保持 暂停状态。

注意: 如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。比如, 调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况。

三种状态#

可读流的“两种操作模式”是一种简单抽象。它抽象了在可读流实现(Readable stream implementation)内部发生的复杂的状态管理过程。

在任意时刻,任意可读流应确切处于下面三种状态之一:

  • readable._readableState.flowing = null
  • readable._readableState.flowing = false
  • readable._readableState.flowing = true

readable._readableState.flowingnull,由于不存在数据消费者,可读流将不会产生数据。在这个状态下,监听 'data' 事件,调用 readable.pipe()方法,或者调用 readable.resume() 方法,readable._readableState.flowing 的值将会变为 true 。这时,随着数据生成,可读流开始频繁触发事件。

调用 readable.pause() 方法, readable.unpipe() 方法, 或者接收 “背压”(back pressure),将导致 readable._readableState.flowing 值变为 false。这将暂停事件流,但 不会 暂停数据生成。在这种情况下,为 'data' 事件设置监听函数不会导致 readable._readableState.flowing 变为 true

const { PassThrough, Writable } = require('stream');const pass = new PassThrough();const writable = new Writable();pass.pipe(writable);pass.unpipe(writable);// flowing 现在为 falsepass.on('data', (chunk) => { console.log(chunk.toString()); });pass.write('ok'); // 不会触发 'data' 事件pass.resume(); // 只有被调用了才会触发 'data' 事件

readable._readableState.flowing 值为 false 时, 数据可能堆积到流的内部缓存中。

选择一种#

可读流 API 的演化贯穿了多个 Node.js 版本,提供了多种方法来消费流数据。通常开发者应该选择其中 一种 来消费数据,而 不应该 在单个流使用多种方法来消费数据。

对于大多数用户,建议使用 readable.pipe() 方法来消费流数据,因为它是最简单的一种实现。开发者如果要精细地控制数据传递和产生的过程,可以使用 EventEmitterreadable.pause()/readable.resume() 提供的 API 。

stream.Readable 类#

'close' 事件#

'close' 事件将在流或其底层资源(比如一个文件)关闭后触发。'close' 事件触发后,该流将不会再触发任何事件。

不是所有 Readable 都会触发 'close' 事件。

'data' 事件#
  • chunk | | 数据片段。对于非对象模式的可读流,这是一个字符串或者 Buffer。对于对象模式的可读流,这可以是除 null 以外的任意类型 JavaScript 值。

'data' 事件会在流将数据传递给消费者时触发。当流转换到 flowing 模式时会触发该事件。调用 readable.pipe()readable.resume() 方法,或为 'data' 事件添加回调可以将流转换到 flowing 模式。 'data' 事件也会在调用 readable.read() 方法并有数据返回时触发。

在没有明确暂停的流上添加 'data' 事件监听会将流转换为 flowing 模式。 数据会在可用时尽快传递给下个流程。

如果调用 readable.setEncoding() 方法明确为流指定了默认编码,回调函数将接收到一个字符串,否则接收到的数据将是一个Buffer 实例。

const readable = getReadableStreamSomehow();readable.on('data', (chunk) => {  console.log(`Received ${chunk.length} bytes of data.`);});
'end' 事件#

'end' 事件将在流中再没有数据可供消费时触发。

注意'end' 事件只有在数据被完全消费后 才会触发 。 可以在数据被完全消费后,通过将流转换到 flowing 模式, 或反复调用 stream.read() 方法来实现这一点。

const readable = getReadableStreamSomehow();readable.on('data', (chunk) => {  console.log(`Received ${chunk.length} bytes of data.`);});readable.on('end', () => {  console.log('There will be no more data.');});
'error' 事件#

'error' 事件可以在任何时候在可读流实现(Readable implementation)上触发。通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。

回调函数将接收到一个 Error 对象。

'readable' 事件#

'readable' 事件将在流中有数据可供读取时触发。在某些情况下,为 'readable' 事件添加回调将会导致一些数据被读取到内部缓存中。

const readable = getReadableStreamSomehow();readable.on('readable', () => {  // 有一些数据可读了});

当到达流数据尾部时, 'readable' 事件也会触发。触发顺序在 'end' 事件之前。

事实上, 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回null。 例如,下面的例子中的 foo.txt 是一个空文件:

const fs = require('fs');const rr = fs.createReadStream('foo.txt');rr.on('readable', () => {  console.log(`readable: ${rr.read()}`);});rr.on('end', () => {  console.log('end');});

上面脚本的输出如下:

$ node test.jsreadable: nullend

注意: 通常情况下,readable.pipe() 方法和 'data' 事件机制比 'readable' 事件更容易理解。然而处理 'readable'事件可能造成吞吐量升高。

readable.isPaused()#
  • 返回:

readable.isPaused() 方法返回可读流的当前操作状态。 该方法主要是在readable.pipe() 方法的底层机制中用到。大多数情况下,没有必要直接使用该方法。

const readable = new stream.Readable();readable.isPaused(); // === falsereadable.pause();readable.isPaused(); // === truereadable.resume();readable.isPaused(); // === false
readable.pause()#
  • 返回: this

readable.pause() 方法将会使 flowing 模式的流停止触发 'data' 事件, 进而切出 flowing 模式。任何可用的数据都将保存在内部缓存中。

const readable = getReadableStreamSomehow();readable.on('data', (chunk) => {  console.log(`Received ${chunk.length} bytes of data.`);  readable.pause();  console.log('There will be no additional data for 1 second.');  setTimeout(() => {    console.log('Now data will start flowing again.');    readable.resume();  }, 1000);});
readable.pipe(destination[, options])#