通过异步迭代简化Node.js流程

luochicun Web安全 2019年12月7日发布
Favorite收藏

导语:如果我们使用异步迭代,那么使用Node.js流程将更加高效。

如果我们使用异步迭代,那么使用Node.js流程将更加高效。

异步迭代和异步生成器

异步迭代是用于异步检索数据容器内容的协议,这也意味着当前“任务”可以在检索项目之前被暂停。

异步生成器有助于异步迭代,如下所示,就是一个异步生成器函数:

/**
 * @returns an asynchronous iterable
 */
async function* asyncGenerator(asyncIterable) {
  for await (const item of asyncIterable) { // input
    if (···) {
      yield '> ' + item; // output
    }
  }
}

for-await-of循环遍历输入asyncIterable,这个循环在普通的异步函数中也可用。另外,yield将值输入到此生成器返回的异步迭代中。

接下里,请密切关注以下函数是异步函数还是异步生成器函数:

/** @returns a Promise */
async function asyncFunction() { /*···*/ }

/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }

Node.js支持多种流程,例如:

1.可读流程(Readable stream)是我们可以从中读取数据的流程,换句话说,它们是数据的源头。比如可读的文件流程,它允许我们读取文件的内容。

2.可写流程(Writable stream)是我们可以写入数据的流程,换句话说,它们是数据的接收器。比如可写的文件流程,它允许我们将数据写入文件。

3.转换流程(transform stream)既可读又可写,作为可写流程时,它接收数据片段,对其进行转换,更改或删除它们,然后将它们作为可读流程输出。

流水线技术(Pipelining)

计算机中的流水线是把一个重复的过程分解为若干个子过程,每个子过程与其他子过程并行进行。由于这种工作方式与工厂中的生产流水线十分相似, 因此称为流水线技术。从本质上讲,流水线技术是一种时间并行技术。

要在多个步骤中处理流程数据,即可使用流水线技术:

1.通过可读的流程接收输入;

2.每个处理步骤都是通过转换流程执行的;

3.对于最后一个处理步骤,我们有两个选项:

3.1我们可以将最近的可读流程中的数据写入可写流程,也就是说,可写流程是流水线上的最后一个进程。

3.2我们可以以其他方式处理最近的可读流程中的数据;

其中,第2个选项是可选的。

文本编码

当创建文本流程时,最好一直指定一个编码:

Node.js文档有一个支持编码和默认拼写的列表,如下所示:

'utf8'
'utf16le'
'base64'

也可以使用一些不同的拼写,你可以使用Buffer.isEncoding()来检查哪些是:

> buffer.Buffer.isEncoding('utf8')
true
> buffer.Buffer.isEncoding('utf-8')
true
> buffer.Buffer.isEncoding('UTF-8')
true
> buffer.Buffer.isEncoding('UTF:8')
false

编码的默认值是null,它相当于'utf8'。

辅助函数:readableToString()  

有时候,我们会使用以下辅助函数。不过在本文中,你不需要具体了解它是如何工作的,大致了解既可。

import * as stream from 'stream';

/**
 * Reads all the text in a readable stream and returns it as a string,
 * via a Promise.
 * @param {stream.Readable} readable
 */
function readableToString(readable) {
  return new Promise((resolve, reject) => {
    let data = '';
    readable.on('data', function (chunk) {
      data += chunk;
    });
    readable.on('end', function () {
      resolve(data);
    });
    readable.on('error', function (err) {
      reject(err);
    });
  });
}

这个函数是通过基于事件的API实现的,稍后我们将看到一种更简单的方法,即通过异步迭代方法。

在这篇文章中,我们将只使用文本流程。

在这些示例中,我们偶尔会遇到在顶层使用await的情况。在本文的示例中,我们假设我们在模块内部或异步函数的主体内部。

每当有换行符时,我们都会使用以下函数:

Unix: '\n' (LF)
Windows: '\r\n' (CR LF)

可以通过模块os中的常量EOL访问当前平台的换行符

可读的流程

创建可读的流程

可以从文件中创建可读的流程,具体来说,我们可以使用fs.createReadStream()来创建可读的流程:

import * as fs from 'fs';

const readableStream = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});

assert.equal(
  await readableToString(readableStream),
  'This is a test!\n');

Readable.from():通过可迭代器创建可读流程

静态方法readable .from(iterable, options?)可以创建一个可读的流程,该流程包含可迭代器中包含的数据。可迭代可以是同步迭代的,也可以是异步迭代的。参数选项是可选的,还可以用于指定文本编码。

import * as stream from 'stream';

function* gen() {
  yield 'One line\n';
  yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'One line\nAnother line\n');

通过字符串创建可读的流程

Readable.from() 接受任何可迭代的对象,同时也可以用来将字符串转换成流程:

import {Readable} from 'stream';

const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
assert.equal(
  await readableToString(readable),
  'Some text!');

这样,Readable.from()可将其他可迭代器视为字符串,在其代码点上进行迭代。虽然这在迭代性能方面并不理想,但在大多数情况下应该是够用了。我希望Readable.from()要经常与字符串一起使用,所以它将来可能会有被优化。

通过for-await-of读取可读流程

每个可读的流程都是异步可迭代的,这意味着我们可以使用for-await-of循环来读取它的内容:

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'

在字符串中收集可读流程的内容

下面的函数是我们在这篇文章开头看到的函数的一个更简单的重新实现过程:

import {Readable} from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');

注意,在本文的示例中,我们必须使用异步函数,因为我们希望返回一个Promise。

通过异步生成器转换可读流程

异步迭代提供了一个轻松的选择方式,将原本一个整体的流程处理流程分解成多个步骤:

1.输入进程便是一个可读的流程;

2.第一个转换是由异步生成器执行的,异步生成器在可读流程上迭代,并根据需要生成相应的结果。

3.我们可以选择使用更多的异步生成器来进一步转换。

4.最后,我们有几个选项来处理异步迭代返回的最后一个生成器:

4.1我们可以通过readable .from()将其转换为可读的流程(稍后可以通过流水线技术将其转换为可写的流程)。

4.2我们可以使用异步函数来处理它。

在下面的示例中,最后一步由async函数logLines()执行,该函数以可迭代的方式将项目记录到控制台。

/**
 * @param chunkIterable An asynchronous or synchronous iterable
 * over “chunks” (arbitrary strings)
 * @returns An asynchronous iterable over “lines”
 * (strings with at most one newline that always appears at the end)
 */
async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    previous += chunk;
    while (true) {
      const eolIndex = previous.indexOf('\n');
      if (eolIndex < 0) break;

      // line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}

async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}

const chunks = Readable.from(
  'Text with\nmultiple\nlines.\n',
  {encoding: 'utf8'});
logLines(numberLines(chunksToLines(chunks)));

// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'

可写的流程

为文件创建可写的流程

我们可以使用fs.createWriteStream()来创建可写的流程:

const writableStream = fs.createWriteStream(
  'tmp/log.txt', {encoding: 'utf8'});

写入可写流程

在本节中,我们将介绍三种将数据写入可写流程的方法:

1.通过.write()方法直接写入可写流程;

2.使用可读流程的.pipe()方法将其导入可写流程;

3.使用函数pipeline ()从模块流程将可读流程导入可写流程;

为了演示这些方法,我们会以三种不同的方式实现了同一个函数writeIterableToFile()。

在异步函数中写入可写流程

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

stream.finished() 的默认版本是基于回调的,但是可以通过util.promisify()将其转换为基于约定的版本(A行)。

为此,我们使用了以下两种模式:

1.在处理backpressure 时写入可写流程(B行):

if (!writable.write(chunk)) {
  await once(writable, 'drain');
}

2.关闭一个可写流程,并等待写入完成(C行):

writable.end();
await finished(writable);

流水线技术(可读、可写)

import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline);

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(
    iterable, {encoding: 'utf8'});
  const writable = fs.createWriteStream(filePath);
  await pipeline(readable, writable); // (A)
}
await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
// ···

我们使用以下模式(A行):

await pipeline(readable, writable);

还有readable .prototype.pipe(),但是该方法有一个警告(如果可读对象发出错误,那么可写的就不会自动关闭),但stream.pipeline() 就没有这样的警告。

与流程相关的功能

模块操作系统:

const EOL: string(since 0.7.8)

包含当前平台使用的行尾字符序列。

模块缓冲区:

Buffer.isEncoding(encoding: string): boolean (since 0.9.1)

如果编码正确地为文本指定一个受支持的Node.js编码,则返回true。支持的编码包括:

'utf8'
'utf16le'
'ascii'
'latin1
'base64'
'hex'

模块流程:

Readable.prototype[Symbol.asyncIterator](): AsyncIterableIterator<any> (since 10.0.0)

可读流程是异步可迭代的,例如,你可以在asyc函数或异步生成器中使用For -await-of循环来遍历它们。

finished(stream: ReadableStream | WritableStream | ReadWriteStream, callback: (err?: ErrnoException | null) => void): () => Promise<void> (since 10.0.0)

当读取/写入完成或出现错误时,返回的承诺将被解决。

这个承诺的版本是这样创建的:

const finished = util.promisify(stream.finished);
pipeline(...streams: Array<ReadableStream|ReadWriteStream|WritableStream>): Promise<void>(since 10.0.0)

流程之间的流水线技术,当流水线技术完成或出现错误时,将解决返回的承诺。

这个承诺的版本是这样创建的:

const pipeline = util.promisify(stream.pipeline);
Readable.from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable (since 12.3.0)

将迭代器转换为可读的流程:

interface ReadableOptions {
  highWaterMark?: number;
  encoding?: string;
  objectMode?: boolean;
  read?(this: Readable, size: number): void;
  destroy?(this: Readable, error: Error | null,
    callback: (error: Error | null) => void): void;
  autoDestroy?: boolean;
}

这些选项与可读构造函数的选项相同,并在此处进行了说明。

模块fs:

createReadStream(path: string | Buffer | URL, options?: string | {encoding?: string; start?: number}): ReadStream (since 2.3.0)

该模块为创建可读的流程提供了更多的选择:

createWriteStream(path: PathLike, options?: string | {encoding?: string; flags?: string; mode?: number; start?: number}): WriteStream(since 2.3.0)

通过选项.flags,你可以指定是否要写入还是要追加,并对文件在存在或不存在时发生的情况进行快速处理。

本文翻译自:https://2ality.com/2019/11/nodejs-streams-async-iteration.html#pipelining如若转载,请注明原文地址: https://beta.4hou.com/web/21955.html
点赞 1
  • 分享至
取消

感谢您的支持,我会继续努力的!

扫码支持

打开微信扫一扫后点击右上角即可分享哟

发表评论