Node.js Stream

流是 Node 中的一个核心模块,它能够优化对于文件或数据处理的内存优化与流程优化。本文是对与 Stream 的学习笔记。

为什么要使用 Stream

使用 fs 模块搭建的文件服务器

import http from 'http'

http.createServer((req, res) => {
  fs.readFile('./bigfile.txt', (err, file) => {
    if (err) {
        res.end(err);
    }
    res.end(file);
  });
}).listen(port)

这是一个简单的静态文件服务器,但是它存在一些问题,fs 在读取文件时会将文件全部内容读取到内存,如果在文件很大的情况下或者并发量很大的情况下服务器可能就会崩溃,所以使用 Stream 来建立服务器可以提高服务器的稳定性和高性能。

使用 Stream

import http from 'http'
import stream from 'stream'

http.createServer((req, res) => {
  const readable = fs.createReadStream('./bigfile.txt');
  readable.pipe(res)
}).listen(port)

Stream 允许数据以连续被读取和写入,这种机制特别适用于处理大量数据,因为它不需要一次性将所有数据加载到内存中。

Stream 使用了生产者和消费者的模型。有以下几个重要的概念

生产者(Producer)

生产者是指创建数据并将数据推送到流中的实体。在 Node.js 中生产者可以是任何向可写流写入数据的源。例如一个文件读取操作可以是生产者,它从文件中读取数据并推送到可写流。

消费者(Consumer)

消费者是指从流中读取并处理数据的实体。在 Node.js 中 它可以是任何监听可读流 data 事件的代码部分。

背压(Backpressure)

背压是流中的一个重要的概念。指的是当消费者无法跟上生产者的数据生产速度时,生产者需要减慢或者暂停数据的生产,以避免内存溢出或丢失。Node.js 默认实现了背压机制,当可读流数据没有被及时消费时,生产者会自动暂停数据的生产。

Stream 的四种类型

  • 可读流 Readable
  • 可写流 Writable
  • 双工流 Duplex
  • 转化流 Transform

Duplex 流,它的特点在于既可读又可写,但是写入的数据和读取的数据之间是没有关联的,就是说通过这个流写入的数据并不应该通过这个流的读取方法拿到,例如 TCP Socket 就是一个 Duplex,读与写的数据之间没有关联。

Transform 流,这像是一个特殊的 Duplex 流,写入的数据经过转化之后可以变为读取数据的源头被下游直接使用,例如压缩 zlib.createGzip。

<!-- ![image-20240704171238232](C:\Users\chaihuibin\AppData\Roaming\Typora\typora-user-images\image-20240704171238232.png) -->

可读流的两种模式

  • 流动模式
  • 暂停模式
import fs from 'fs';
const readStream = fs.createReadStream('./small.txt');

// 流动模式 null、ture、false
console.log(readStream._readableState.flowing)

初始的状态是 null 流动模式下为 true 暂停为 false

  • 当添加了 data 事件或者调用 pipe 方法都可以让可读流变为流动模式。

  • 通过 readStream.pause() 使可读流变为暂停模式

  • 通过 readStream.resume() 使可读流恢复流动模式

  • 也可以通过移除 data 事件的监听,或者调用 unpipe 方法使可读流变为暂停模式

我们可以利用 Stream 模块去实现一个满足足迹需求的流

Readable

实现 _read 方法,通过 push 将数据推入可读流的缓冲区供下游读取

class MyReadAble extends stream.Readable {
  count: number;
  constructor(options?: stream.ReadableOptions) {
    super(options)
    this.count = 0
  }

  _read(size: number) {
    if (this.count === 9) {
      this.push(null);
    } else {
      this.push(this.count.toString());
      this.count++;
    }
  }
}

Writable

实现 _write 方法,如果 _write 在函数中调用了这个 callback 参数说明传入的数据已经做了处理,写入到底层了,可以继续写入数据了,而这个方法可以通过同步调用,也可以通过异步来调用。

class MyWriteAble extends stream.Writable {

  constructor(opts?: stream.WritableOptions) {
    super(opts)
  }

  _write(chunk: any, encoding: BufferEncoding, callback: (error?: Error) => void): void {
    console.log('write data:', chunk.toString())
    process.nextTick(callback)
  }
}

Duplex

实现 _read 和 _write 方法

class MyDuplex extends stream.Duplex {
  data: any[]
  constructor(opts: stream.DuplexOptions) {
    super(opts)
    this.data = []
  }

  _read(size: number): void {
    if (!this.data.length) {
      this.push(null);
    } else {
      this.push(this.data.pop());
    }
  }

  _write(chunk: any, encoding: BufferEncoding, callback: (error?: Error) => void): void {
    console.log('write data:', chunk.toString())
    process.nextTick(callback)
  }
}

Transform

只需要实现 _transform 方法,类似于 _write 在这里对数据进行转换

class MyTransform extends stream.Transform {
  constructor(opts?: stream.TransformOptions) {
    super(opts)
  }

  _transform(chunk: any, encoding: BufferEncoding, callback: stream.TransformCallback): void {
    console.log('transform data:', chunk.toString())
    this.push(chunk.toString() + ' world')
    process.nextTick(callback)
  }

  _flush(callback: stream.TransformCallback): void {
    process.nextTick(callback)
  }
}

ObjectMode

流除了 Buffer String 这样的数据类型,还可以处理复杂的对象类型,只需要在选项中加入 ObjectMode: true 。

在 JavaScript 中,万物皆对象,因此有了 ObjectMode 流就可以处理几乎所有数据。

let source = ['1', '2', { a: 'test', b: 'stream' }, function() {}];
const read = new stream.Readable({ 
    objectMode: true,
    read: function(size) {
        if (source.length) {
            this.push(source.pop());
        } else {
            this.push(null);
        }
    }
});

read.on('data', (buf) => {
    console.log(typeof buf);
});