Shell-Skripting mit Node.js
Sie können die Offline-Version dieses Buches (HTML, PDF, EPUB, MOBI) kaufen und damit die kostenlose Online-Version unterstützen.
(Werbung, bitte nicht blockieren.)

9 Native Node.js Streams



Dieses Kapitel ist eine Einführung in die nativen Streams von Node. Sie unterstützen asynchrone Iteration, was die Arbeit damit erleichtert und was wir in diesem Kapitel hauptsächlich verwenden werden.

Beachten Sie, dass plattformübergreifende Web Streams in §10 „Web Streams auf Node.js verwenden“ behandelt werden. Wir werden diese hauptsächlich in diesem Buch verwenden. Daher können Sie das aktuelle Kapitel überspringen, wenn Sie möchten.

9.1 Wiederholung: Asynchrone Iteration und asynchrone Generatoren

Asynchrone Iteration ist ein Protokoll zum asynchronen Abrufen der Inhalte eines Datenbehälters (was bedeutet, dass die aktuelle „Aufgabe“ vor dem Abrufen eines Elements pausiert werden kann).

Asynchrone Generatoren helfen bei der asynchronen Iteration. Zum Beispiel ist dies eine asynchrone Generatorfunktion

/**
 * @returns an asynchronous iterable
 */
async function* asyncGenerator(asyncIterable) {
  for await (const item of asyncIterable) { // input
    if (···) {
      yield '> ' + item; // output
    }
  }
}

Achten Sie im Rest des Kapitels genau darauf, ob eine Funktion eine async-Funktion oder eine async-Generatorfunktion ist.

/** @returns a Promise */
async function asyncFunction() { /*···*/ }

/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }

9.2 Streams

Ein Stream ist ein Muster, dessen Kernidee darin besteht, eine große Datenmenge zu „teilen und erobern“: Wir können sie handhaben, wenn wir sie in kleinere Teile aufteilen und jeweils einen Teil verarbeiten.

Node.js unterstützt verschiedene Arten von Streams – zum Beispiel

9.2.1 Pipelining

Um gestreamte Daten in mehreren Schritten zu verarbeiten, können wir Streams pipen (verbinden)

  1. Eingaben werden über einen lesbaren Stream empfangen.
  2. Jeder Verarbeitungsschritt wird über einen Transform Stream ausgeführt.
  3. Für den letzten Verarbeitungsschritt haben wir zwei Optionen
    • Wir können die Daten im neuesten lesbaren Stream in einen beschreibbaren Stream schreiben. Das heißt, der beschreibbare Stream ist das letzte Element unserer Pipeline.
    • Wir können die Daten im neuesten lesbaren Stream auf andere Weise verarbeiten.

Teil (2) ist optional.

9.2.2 Text-Kodierungen

Beim Erstellen von Text-Streams ist es am besten, immer eine Kodierung anzugeben.

Der Standardwert für Kodierungen ist null, was 'utf8' entspricht.

9.2.3 Hilfsfunktion: readableToString()

Wir werden gelegentlich die folgende Hilfsfunktion verwenden. Sie müssen nicht verstehen, wie sie funktioniert, nur (ungefähr), was sie tut.

import * as stream from 'stream';

/**
 * Reads all the text in a readable stream and returns it as a string,
 * via a Promise.
 * @param {stream.Readable} readable
 */
function readableToString(readable) {
  return new Promise((resolve, reject) => {
    let data = '';
    readable.on('data', function (chunk) {
      data += chunk;
    });
    readable.on('end', function () {
      resolve(data);
    });
    readable.on('error', function (err) {
      reject(err);
    });
  });
}

Diese Funktion wird über die ereignisbasierte API implementiert. Später werden wir eine einfachere Methode dafür sehen – über asynchrone Iteration.

9.2.4 Ein paar vorläufige Bemerkungen

9.3 Readable Streams

9.3.1 Erstellen von Readable Streams

9.3.1.1 Erstellen von Readable Streams aus Dateien

Wir können fs.createReadStream() verwenden, um Readable Streams zu erstellen.

import * as fs from 'fs';

const readableStream = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});

assert.equal(
  await readableToString(readableStream),
  'This is a test!\n');
9.3.1.2 Readable.from(): Erstellen von Readable Streams aus Iterables

Die statische Methode Readable.from(iterable, options?) erstellt einen lesbaren Stream, der die im iterable enthaltenen Daten enthält. iterable kann ein synchrones oder asynchrones Iterable sein. Der Parameter options ist optional und kann unter anderem zur Angabe einer Textkodierung verwendet werden.

import * as stream from 'stream';

function* gen() {
  yield 'One line\n';
  yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'One line\nAnother line\n');
9.3.1.2.1 Erstellen von Readable Streams aus Strings

Readable.from() akzeptiert jedes Iterable und kann daher auch verwendet werden, um Strings in Streams umzuwandeln.

import {Readable} from 'stream';

const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
assert.equal(
  await readableToString(readable),
  'Some text!');

Derzeit behandelt Readable.from() einen String wie jedes andere Iterable und iteriert daher über seine Codepunkte. Das ist aus Leistungssicht nicht ideal, sollte aber für die meisten Anwendungsfälle in Ordnung sein. Ich gehe davon aus, dass Readable.from() häufig mit Strings verwendet wird, daher wird es in Zukunft möglicherweise Optimierungen geben.

9.3.2 Lesen von Chunks aus Readable Streams via for-await-of

Jeder lesbare Stream ist asynchron iterierbar, was bedeutet, dass wir eine for-await-of Schleife verwenden können, um seinen Inhalt zu lesen.

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'
9.3.2.1 Sammeln des Inhalts eines Readable Streams in einem String

Die folgende Funktion ist eine einfachere Neuimplementierung der Funktion, die wir zu Beginn dieses Kapitels gesehen haben.

import {Readable} from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');

Beachten Sie, dass wir in diesem Fall eine async-Funktion verwenden mussten, da wir ein Promise zurückgeben wollten.

9.3.3 Lesen von Zeilen aus Readable Streams via Modul 'node:readlines'

Das eingebaute Modul 'node:readline' ermöglicht es uns, Zeilen aus lesbaren Streams zu lesen.

import * as fs from 'node:fs';
import * as readline from 'node:readline/promises';

const filePath = process.argv[2]; // first command line argument

const rl = readline.createInterface({
  input: fs.createReadStream(filePath, {encoding: 'utf-8'}),
});
for await (const line of rl) {
  console.log('>', line);
}
rl.close();

9.4 Transformation von Readable Streams via Async Generatoren

Asynchrone Iteration bietet eine elegante Alternative zu Transform Streams für die Verarbeitung von gestreamten Daten in mehreren Schritten.

Zusammenfassend lässt sich sagen, dass dies die Teile solcher Verarbeitungspipelines sind

lesbar
→ erster async Generator [→ … → letzter async Generator]
→ lesbar oder async Funktion

9.4.1 Von Chunks zu nummerierten Zeilen in Async Iterables

Im nächsten Beispiel sehen wir ein Beispiel für eine Verarbeitungspipeline, wie sie gerade erklärt wurde.

import {Readable} from 'stream';

/**
 * @param chunkIterable An asynchronous or synchronous iterable
 * over “chunks” (arbitrary strings)
 * @returns An asynchronous iterable over “lines”
 * (strings with at most one newline that always appears at the end)
 */
async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    let startSearch = previous.length;
    previous += chunk;
    while (true) {
      // Works for EOL === '\n' and EOL === '\r\n'
      const eolIndex = previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // Line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}

async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}

const chunks = Readable.from(
  'Text with\nmultiple\nlines.\n',
  {encoding: 'utf8'});
await logLines(numberLines(chunksToLines(chunks))); // (A)

// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'

Die Verarbeitungspipeline wird in Zeile A eingerichtet. Die Schritte sind

Beobachtung

9.5 Writable Streams

9.5.1 Erstellen von Writable Streams für Dateien

Wir können fs.createWriteStream() verwenden, um beschreibbare Streams zu erstellen.

const writableStream = fs.createWriteStream(
  'tmp/log.txt', {encoding: 'utf8'});

9.5.2 Schreiben in Writable Streams

In diesem Abschnitt betrachten wir Ansätze zum Schreiben in einen beschreibbaren Stream.

  1. Direktes Schreiben in den beschreibbaren Stream über seine Methode .write().
  2. Verwendung der Funktion pipeline() aus dem Modul stream, um einen lesbaren Stream in den beschreibbaren Stream zu pipen.

Um diese Ansätze zu demonstrieren, implementieren wir damit dieselbe Funktion writeIterableToFile().

Die Methode .pipe() von lesbaren Streams unterstützt ebenfalls das Pipelining, hat aber einen Nachteil und es ist besser, sie zu vermeiden.

9.5.2.1 writable.write(chunk)

Beim Schreiben von Daten in Streams gibt es zwei Callback-basierte Mechanismen, die uns helfen.

Im folgenden Beispiel promisifizieren wir diese Mechanismen, damit wir sie über eine async-Funktion verwenden können.

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

Die Standardversion von stream.finished() ist Callback-basiert, kann aber über util.promisify() (Zeile A) in eine Promise-basierte Version umgewandelt werden.

Wir haben die folgenden beiden Muster verwendet.

9.5.2.2 Pipelining von Readable Streams zu Writable Streams via stream.pipeline()

In Zeile A verwenden wir eine promisifizierte Version von stream.pipeline(), um einen lesbaren Stream readable in einen beschreibbaren Stream writable zu pipen.

import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline);

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(
    iterable, {encoding: 'utf8'});
  const writable = fs.createWriteStream(filePath);
  await pipeline(readable, writable); // (A)
}
await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
// ···

Die Methode readable.pipe() unterstützt ebenfalls das Pipelining, hat aber einen Vorbehalt: Wenn der lesbare Stream einen Fehler auslöst, wird der beschreibbare Stream nicht automatisch geschlossen. pipeline() hat diesen Vorbehalt nicht.

Modul os

Modul buffer

Modul stream

Modul fs

Die statischen Typinformationen in diesem Abschnitt basieren auf Definitely Typed.

9.7 Weiterführende Lektüre und Quellen dieses Kapitels