Shell-Skripting mit Node.js
Sie können die Offline-Version dieses Buches (HTML, PDF, EPUB, MOBI) kaufen und damit die kostenlose Online-Version unterstützen.
(Werbung, bitte nicht blockieren.)

10 Web Streams in Node.js



Web Streams sind ein Standard für Streams, der jetzt auf allen wichtigen Web-Plattformen unterstützt wird: Webbrowser, Node.js und Deno. (Streams sind eine Abstraktion zum sequenziellen Lesen und Schreiben von Daten in kleinen Stücken aus allen Arten von Quellen – Dateien, auf Servern gehostete Daten usw.)

Zum Beispiel gibt die globale Funktion fetch() (die Online-Ressourcen herunterlädt) asynchron eine Response zurück, die eine Eigenschaft .body mit einem Web-Stream hat.

Dieses Kapitel behandelt Web Streams in Node.js, aber die meisten Dinge, die wir lernen, gelten für alle Web-Plattformen, die sie unterstützen.

10.1 Was sind Web Streams?

Wir beginnen mit einem Überblick über einige Grundlagen von Web Streams. Danach gehen wir schnell zu Beispielen über.

Streams sind eine Datenstruktur für den Zugriff auf Daten wie

Zwei ihrer Vorteile sind

Web Streams (oft wird "Web" weggelassen) sind ein relativ neuer Standard, der in Webbrowsern entstanden ist, aber jetzt auch von Node.js und Deno unterstützt wird (wie in dieser MDN-Kompatibilitätstabelle gezeigt).

In Web Streams sind Chunks normalerweise entweder

10.1.1 Arten von Streams

Es gibt drei Hauptarten von Web Streams

ReadableStreams, WritableStreams und TransformStreams können zum Transport von Text- oder Binärdaten verwendet werden. Wir werden uns in diesem Kapitel meistens mit ersterem beschäftigen. Byte-Streams für Binärdaten werden am Ende kurz erwähnt.

10.1.2 Pipe Chains

Piping ist eine Operation, die es uns ermöglicht, einen ReadableStream an einen WritableStream zu pipen: Solange der ReadableStream Daten produziert, liest diese Operation diese Daten und schreibt sie in den WritableStream. Wenn wir nur zwei Streams verbinden, erhalten wir eine bequeme Möglichkeit, Daten von einem Ort zum anderen zu übertragen (z.B. eine Datei zu kopieren). Wir können aber auch mehr als zwei Streams verbinden und Pipe Chains erhalten, die Daten auf vielfältige Weise verarbeiten können. Dies ist ein Beispiel für eine Pipe Chain

Ein ReadableStream wird mit einem TransformStream verbunden, indem der erstere an die beschreibbare Seite letzteren gepiped wird. Ebenso wird ein TransformStream mit einem anderen TransformStream verbunden, indem die lesbare Seite des ersteren an die beschreibbare Seite des letzteren gepiped wird. Und ein TransformStream wird mit einem WritableStream verbunden, indem die lesbare Seite des ersteren an letzteren gepiped wird.

10.1.3 Backpressure

Ein Problem bei Pipe Chains ist, dass ein Mitglied mehr Daten empfangen kann, als es im Moment verarbeiten kann. Backpressure ist eine Technik zur Lösung dieses Problems: Sie ermöglicht es einem Datenempfänger, seinem Sender mitzuteilen, dass er vorübergehend keine Daten mehr senden soll, damit der Empfänger nicht überfordert wird.

Eine andere Sichtweise auf Backpressure ist ein Signal, das rückwärts durch eine Pipe Chain wandert, von einem Mitglied, das überlastet wird, zum Anfang der Kette. Betrachten wir als Beispiel die folgende Pipe Chain

ReadableStream -pipeTo-> TransformStream -pipeTo-> WriteableStream

So wandert Backpressure durch diese Kette

Wir haben den Anfang der Pipe Chain erreicht. Daher sammeln sich keine Daten im ReadableStream an (der ebenfalls gepuffert ist), und der WritableStream hat Zeit, sich zu erholen. Sobald dies geschehen ist, signalisiert er, dass er wieder bereit ist, Daten zu empfangen. Dieses Signal wandert ebenfalls durch die Kette zurück, bis es den ReadableStream erreicht und die Datenverarbeitung wieder aufgenommen wird.

Bei diesem ersten Blick auf Backpressure wurden mehrere Details weggelassen, um die Sache einfacher zu gestalten. Diese werden später behandelt.

10.1.4 Unterstützung für Web Streams in Node.js

In Node.js sind Web Streams aus zwei Quellen verfügbar

Momentan unterstützt nur eine API Web Streams direkt in Node.js – die Fetch API

const response = await fetch('https://example.com');
const readableStream = response.body;

Für andere Dinge müssen wir eine der folgenden statischen Methoden im Modul 'node:stream' verwenden, um entweder einen Node.js-Stream in einen Web-Stream oder umgekehrt zu konvertieren

Eine weitere API unterstützt Web Streams teilweise: FileHandles haben die Methode .readableWebStream().

10.2 Lesen von ReadableStreams

ReadableStreams ermöglichen es uns, Daten-Chunks aus verschiedenen Quellen zu lesen. Sie haben den folgenden Typ (scheuen Sie sich nicht, diesen Typ und die Erklärungen seiner Eigenschaften zu überfliegen; sie werden erneut erklärt, wenn wir ihnen in Beispielen begegnen)

interface ReadableStream<TChunk> {
  getReader(): ReadableStreamDefaultReader<TChunk>;
  readonly locked: boolean;
  [Symbol.asyncIterator](): AsyncIterator<TChunk>;

  cancel(reason?: any): Promise<void>;

  pipeTo(
    destination: WritableStream<TChunk>,
    options?: StreamPipeOptions
  ): Promise<void>;
  pipeThrough<TChunk2>(
    transform: ReadableWritablePair<TChunk2, TChunk>,
    options?: StreamPipeOptions
  ): ReadableStream<TChunk2>;
  
  // Not used in this chapter:
  tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>];
}

interface StreamPipeOptions {
  signal?: AbortSignal;
  preventClose?: boolean;
  preventAbort?: boolean;
  preventCancel?: boolean;
}

Erklärungen zu diesen Eigenschaften

Die folgenden Unterkapitel behandeln drei Möglichkeiten, ReadableStreams zu konsumieren

10.2.1 Consuming ReadableStreams über Reader

Wir können Reader verwenden, um Daten aus ReadableStreams zu lesen. Sie haben den folgenden Typ (scheuen Sie sich nicht, diesen Typ und die Erklärungen seiner Eigenschaften zu überfliegen; sie werden erneut erklärt, wenn wir ihnen in Beispielen begegnen)

interface ReadableStreamGenericReader {
  readonly closed: Promise<undefined>;
  cancel(reason?: any): Promise<void>;
}
interface ReadableStreamDefaultReader<TChunk>
  extends ReadableStreamGenericReader
{
  releaseLock(): void;
  read(): Promise<ReadableStreamReadResult<TChunk>>;
}

interface ReadableStreamReadResult<TChunk> {
  done: boolean;
  value: TChunk | undefined;
}

Erklärungen zu diesen Eigenschaften

ReadableStreamReadResult mag Ihnen bekannt vorkommen, wenn Sie wissen, wie Iteration funktioniert: ReadableStreams ähneln Iterables, Reader ähneln Iteratoren, und ReadableStreamReadResults ähneln Objekten, die von der Iterator-Methode .next() zurückgegeben werden.

Der folgende Code demonstriert das Protokoll zur Verwendung von Readern

const reader = readableStream.getReader(); // (A)
assert.equal(readableStream.locked, true); // (B)
try {
  while (true) {
    const {done, value: chunk} = await reader.read(); // (C)
    if (done) break;
    // Use `chunk`
  }
} finally {
  reader.releaseLock(); // (D)
}

Einen Reader erhalten. Wir können nicht direkt von readableStream lesen, wir müssen zuerst einen Reader erwerben (Zeile A). Jeder ReadableStream kann höchstens einen Reader haben. Nachdem ein Reader erworben wurde, ist readableStream gesperrt (Zeile B). Bevor wir .getReader() erneut aufrufen können, müssen wir .releaseLock() aufrufen (Zeile D).

Chunks lesen. .read() gibt ein Promise für ein Objekt mit den Eigenschaften .done und .value zurück (Zeile C). Nachdem der letzte Chunk gelesen wurde, ist .done true. Dieser Ansatz ähnelt der Funktionsweise der asynchronen Iteration in JavaScript.

10.2.1.1 Beispiel: Lesen einer Datei über einen ReadableStream

Im folgenden Beispiel lesen wir Chunks (Strings) aus einer Textdatei data.txt

import * as fs from 'node:fs';
import {Readable} from 'node:stream';

const nodeReadable = fs.createReadStream(
  'data.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable); // (A)

const reader = webReadableStream.getReader();
try {
  while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    console.log(value);
  }
} finally {
  reader.releaseLock();
}
// Output:
// 'Content of text file\n'

Wir konvertieren einen Node.js Readable in einen Web ReadableStream (Zeile A). Dann verwenden wir das zuvor erklärte Protokoll, um die Chunks zu lesen.

10.2.1.2 Beispiel: Zusammenfügen eines Strings mit dem Inhalt eines ReadableStreams

Im nächsten Beispiel verketten wir alle Chunks eines ReadableStreams zu einem String und geben ihn zurück

/**
 * Returns a string with the contents of `readableStream`.
 */
async function readableStreamToString(readableStream) {
  const reader = readableStream.getReader();
  try {
    let result = '';
    while (true) {
      const {done, value} = await reader.read();
      if (done) {
        return result; // (A)
      }
      result += value;
    }
  } finally {
    reader.releaseLock(); // (B)
  }
}

Praktischerweise wird die finally-Klausel immer ausgeführt – egal wie wir die try-Klausel verlassen. Das heißt, der Lock wird korrekt freigegeben (Zeile B), wenn wir ein Ergebnis zurückgeben (Zeile A).

10.2.2 Consuming ReadableStreams über asynchrone Iteration

ReadableStreams können auch über asynchrone Iteration konsumiert werden

const iterator = readableStream[Symbol.asyncIterator]();
let exhaustive = false;
try {
  while (true) {
    let chunk;
    ({done: exhaustive, value: chunk} = await iterator.next());
    if (exhaustive) break;
    console.log(chunk);
  }
} finally {
  // If the loop was terminated before we could iterate exhaustively
  // (via an exception or `return`), we must call `iterator.return()`.
  // Check if that was the case.
  if (!exhaustive) {
    iterator.return();
  }
}

Glücklicherweise kümmert sich die for-await-of-Schleife um alle Details der asynchronen Iteration für uns

for await (const chunk of readableStream) {
  console.log(chunk);
}
10.2.2.1 Beispiel: Verwenden asynchroner Iteration zum Lesen eines Streams

Lassen Sie uns unseren vorherigen Versuch, Text aus einer Datei zu lesen, wiederholen. Diesmal verwenden wir asynchrone Iteration anstelle eines Readers

import * as fs from 'node:fs';
import {Readable} from 'node:stream';

const nodeReadable = fs.createReadStream(
  'text-file.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable);
for await (const chunk of webReadableStream) {
  console.log(chunk);
}
// Output:
// 'Content of text file'
10.2.2.2 Beispiel: Zusammenfügen eines Strings mit dem Inhalt eines ReadableStreams

Wir haben zuvor einen Reader verwendet, um einen String mit dem Inhalt eines ReadableStreams zusammenzufügen. Mit asynchroner Iteration wird der Code einfacher

/**
 * Returns a string with the contents of `readableStream`.
 */
async function readableStreamToString2(readableStream) {
  let result = '';
  for await (const chunk of readableStream) {
    result += chunk;
  }
  return result;
}
10.2.2.3 Vorbehalt: Browser unterstützen keine asynchrone Iteration über ReadableStreams

Momentan unterstützen Node.js und Deno asynchrone Iteration über ReadableStreams, Webbrowser jedoch nicht: Es gibt ein GitHub-Issue, das auf Bug-Reports verweist.

Da noch nicht ganz klar ist, wie die asynchrone Iteration in Browsern unterstützt wird, ist Wrapping eine sicherere Wahl als Polyfilling. Der folgende Code basiert auf einem Vorschlag im Chromium-Bugreport

async function* getAsyncIterableFor(readableStream) {
  const reader = readableStream.getReader();
  try {
    while (true) {
      const {done, value} = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
}

10.2.3 Piping von ReadableStreams zu WritableStreams

ReadableStreams haben zwei Methoden zum Piping

10.3 Datenquellen in ReadableStreams über Wrapping umwandeln

Wenn wir eine externe Quelle über einen ReadableStream lesen möchten, können wir sie in ein Adapterobjekt wickeln und dieses Objekt dem ReadableStream-Konstruktor übergeben. Das Adapterobjekt wird als underlying source des ReadableStreams bezeichnet (Quing-Strategien werden später erklärt, wenn wir uns näher mit Backpressure befassen)

new ReadableStream(underlyingSource?, queuingStrategy?)

Dies ist der Typ von underlying sources (scheuen Sie sich nicht, diesen Typ und die Erklärungen seiner Eigenschaften zu überfliegen; sie werden erneut erklärt, wenn wir ihnen in Beispielen begegnen)

interface UnderlyingSource<TChunk> {
  start?(
    controller: ReadableStreamController<TChunk>
  ): void | Promise<void>;
  pull?(
    controller: ReadableStreamController<TChunk>
  ): void | Promise<void>;
  cancel?(reason?: any): void | Promise<void>;

  // Only used in byte streams and ignored in this section:
  type: 'bytes' | undefined;
  autoAllocateChunkSize: bigint;
}

Dies ist, wann der ReadableStream diese Methoden aufruft

Jede dieser Methoden kann ein Promise zurückgeben, und erst wenn das Promise erfüllt oder abgelehnt ist, werden weitere Schritte unternommen. Das ist nützlich, wenn wir etwas Asynchrones tun möchten.

Der Parameter controller von .start() und .pull() ermöglicht ihnen den Zugriff auf den Stream. Er hat den folgenden Typ

type ReadableStreamController<TChunk> =
  | ReadableStreamDefaultController<TChunk>
  | ReadableByteStreamController<TChunk> // ignored here
;

interface ReadableStreamDefaultController<TChunk> {
  enqueue(chunk?: TChunk): void;
  readonly desiredSize: number | null;
  close(): void;
  error(err?: any): void;
}

Vorerst sind Chunks Strings. Wir werden später zu Byte-Streams kommen, wo Uint8Arrays üblich sind. Dies ist, was die Methoden tun

10.3.1 Ein erstes Beispiel für die Implementierung einer Underlying Source

In unserem ersten Beispiel für die Implementierung einer underlying source stellen wir nur die Methode .start() bereit. Wir werden Anwendungsfälle für .pull() im nächsten Unterkapitel sehen.

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('First line\n'); // (A)
    controller.enqueue('Second line\n'); // (B)
    controller.close(); // (C)
  },
});
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'First line\n'
// 'Second line\n'

Wir verwenden den Controller, um einen Stream mit zwei Chunks zu erstellen (Zeile A und Zeile B). Es ist wichtig, dass wir den Stream schließen (Zeile C). Andernfalls würde die for-await-of-Schleife niemals enden!

Beachten Sie, dass diese Art des Enqueuings nicht ganz sicher ist: Es besteht die Gefahr, die Kapazität der internen Warteschlange zu überschreiten. Wir werden bald sehen, wie wir dieses Risiko vermeiden können.

10.3.2 Verwendung eines ReadableStreams zum Umwickeln einer Push- oder Pull-Source

Ein gängiges Szenario ist die Umwandlung einer Push- oder Pull-Source in einen ReadableStream. Ob die Quelle Push oder Pull ist, bestimmt, wie wir uns mit unserer UnderlyingSource an den ReadableStream hängen

Wir werden als Nächstes Beispiele für beide Arten von Quellen sehen.

10.3.2.1 Beispiel: Erstellen eines ReadableStreams aus einer Push-Quelle mit Backpressure-Unterstützung

Im folgenden Beispiel wickeln wir einen ReadableStream um einen Socket – der uns seine Daten pusht (er ruft uns an). Dieses Beispiel ist der Web-Stream-Spezifikation entnommen

function makeReadableBackpressureSocketStream(host, port) {
  const socket = createBackpressureSocket(host, port);

  return new ReadableStream({
    start(controller) {
      socket.ondata = event => {
        controller.enqueue(event.data);

        if (controller.desiredSize <= 0) {
          // The internal queue is full, so propagate
          // the backpressure signal to the underlying source.
          socket.readStop();
        }
      };

      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(
        new Error('The socket errored!'));
    },

    pull() {
      // This is called if the internal queue has been emptied, but the
      // stream’s consumer still wants more data. In that case, restart
      // the flow of data if we have previously paused it.
      socket.readStart();
    },

    cancel() {
      socket.close();
    },
  });
}
10.3.2.2 Beispiel: Erstellen eines ReadableStreams aus einer Pull-Quelle

Die Hilfsfunktion iterableToReadableStream() nimmt ein Iterable von Chunks und wandelt es in einen ReadableStream um

/**
 * @param iterable an iterable (asynchronous or synchronous)
 */
 function iterableToReadableStream(iterable) {
  return new ReadableStream({
    start() {
      if (typeof iterable[Symbol.asyncIterator] === 'function') {
        this.iterator = iterable[Symbol.asyncIterator]();
      } else if (typeof iterable[Symbol.iterator] === 'function') {
        this.iterator = iterable[Symbol.iterator]();
      } else {
        throw new Error('Not an iterable: ' + iterable);
      }
    },

    async pull(controller) {
      if (this.iterator === null) return;
      // Sync iterators return non-Promise values,
      // but `await` doesn’t mind and simply passes them on
      const {value, done} = await this.iterator.next();
      if (done) {
        this.iterator = null;
        controller.close();
        return;
      }
      controller.enqueue(value);
    },

    cancel() {
      this.iterator = null;
      controller.close();
    },
  });
}

Lassen Sie uns eine Async-Generatorfunktion verwenden, um ein asynchrones Iterable zu erstellen und dieses Iterable in einen ReadableStream umzuwandeln

async function* genAsyncIterable() {
  yield 'how';
  yield 'are';
  yield 'you';
}
const readableStream = iterableToReadableStream(genAsyncIterable());
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'how'
// 'are'
// 'you'

iterableToReadableStream() funktioniert auch mit synchronen Iterables

const syncIterable = ['hello', 'everyone'];
const readableStream = iterableToReadableStream(syncIterable);
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'hello'
// 'everyone'

Es wird möglicherweise irgendwann eine statische Hilfsmethode ReadableStream.from() geben, die diese Funktionalität bietet (siehe ihr Pull Request für weitere Informationen).

10.4 Schreiben in WritableStreams

WritableStreams ermöglichen es uns, Daten-Chunks in verschiedene Senken zu schreiben. Sie haben den folgenden Typ (scheuen Sie sich nicht, diesen Typ und die Erklärungen seiner Eigenschaften zu überfliegen; sie werden erneut erklärt, wenn wir ihnen in Beispielen begegnen)

interface WritableStream<TChunk> {
  getWriter(): WritableStreamDefaultWriter<TChunk>;
  readonly locked: boolean;

  close(): Promise<void>;
  abort(reason?: any): Promise<void>;
}

Erklärungen zu diesen Eigenschaften

Die folgenden Unterkapitel behandeln zwei Ansätze zum Senden von Daten an WritableStreams

10.4.1 Schreiben in WritableStreams über Writer

Wir können Writer verwenden, um in WritableStreams zu schreiben. Sie haben den folgenden Typ (scheuen Sie sich nicht, diesen Typ und die Erklärungen seiner Eigenschaften zu überfliegen; sie werden erneut erklärt, wenn wir ihnen in Beispielen begegnen)

interface WritableStreamDefaultWriter<TChunk> {
  readonly desiredSize: number | null;
  readonly ready: Promise<undefined>;
  write(chunk?: TChunk): Promise<void>;
  releaseLock(): void;

  close(): Promise<void>;
  readonly closed: Promise<undefined>;
  abort(reason?: any): Promise<void>;
}

Erklärungen zu diesen Eigenschaften

Der folgende Code zeigt das Protokoll zur Verwendung von Write-Objekten

const writer = writableStream.getWriter(); // (A)
assert.equal(writableStream.locked, true); // (B)
try {
  // Writing the chunks (explained later)
} finally {
  writer.releaseLock(); // (C)
}

Wir können nicht direkt in einen writableStream schreiben, wir müssen zuerst einen Writer erwerben (Zeile A). Jeder WritableStream kann höchstens einen Writer haben. Nachdem ein Writer erworben wurde, ist writableStream gesperrt (Zeile B). Bevor wir .getWriter() erneut aufrufen können, müssen wir .releaseLock() aufrufen (Zeile C).

Es gibt drei Ansätze zum Schreiben von Chunks.

10.4.1.1 Schreibansatz 1: Warten auf .write() (ineffiziente Behandlung von Backpressure)

Der erste Schreibansatz besteht darin, auf jedes Ergebnis von .write() zu warten

await writer.write('Chunk 1');
await writer.write('Chunk 2');
await writer.close();

Das von .write() zurückgegebene Promise wird erfüllt, wenn der Chunk, den wir ihm übergeben haben, erfolgreich geschrieben wurde. Was genau "erfolgreich geschrieben" bedeutet, hängt davon ab, wie ein WritableStream implementiert ist – z.B. bei einem Dateistream könnte der Chunk an das Betriebssystem gesendet worden sein, aber immer noch in einem Cache liegen und daher noch nicht tatsächlich auf die Festplatte geschrieben worden sein.

Das von .close() zurückgegebene Promise wird erfüllt, wenn der Stream geschlossen wird.

Ein Nachteil dieses Schreibansatzes ist, dass das Warten, bis das Schreiben erfolgreich ist, bedeutet, dass die Warteschlange nicht genutzt wird. Infolgedessen kann der Datendurchsatz geringer sein.

10.4.1.2 Schreibansatz 2: Ignorieren von .write()-Ablehnungen (Ignorieren von Backpressure)

Beim zweiten Schreibansatz ignorieren wir die von .write() zurückgegebenen Promises und warten nur auf das von .close() zurückgegebene Promise

writer.write('Chunk 1').catch(() => {}); // (A)
writer.write('Chunk 2').catch(() => {}); // (B)
await writer.close(); // reports errors

Die synchronen Aufrufe von .write() fügen Chunks zur internen Warteschlange des WritableStreams hinzu. Indem wir die zurückgegebenen Promises nicht abwarten, warten wir nicht, bis jeder Chunk geschrieben ist. Das Abwarten von .close() stellt jedoch sicher, dass die Warteschlange leer ist und alle Schreibvorgänge erfolgreich waren, bevor wir fortfahren.

Das Aufrufen von .catch() in Zeile A und Zeile B ist notwendig, um Warnungen über nicht behandelte Promise-Ablehnungen zu vermeiden, wenn beim Schreiben etwas schief geht. Solche Warnungen werden oft in die Konsole geloggt. Wir können es uns leisten, die von .write() gemeldeten Fehler zu ignorieren, da .close() sie uns ebenfalls melden wird.

Der vorherige Code kann durch Verwendung einer Hilfsfunktion, die Promise-Ablehnungen ignoriert, verbessert werden

ignoreRejections(
  writer.write('Chunk 1'),
  writer.write('Chunk 2'),
);
await writer.close(); // reports errors

function ignoreRejections(...promises) {
  for (const promise of promises) {
    promise.catch(() => {});
  }
}

Ein Nachteil dieses Ansatzes ist, dass Backpressure ignoriert wird: Wir gehen einfach davon aus, dass die Warteschlange groß genug ist, um alles aufzunehmen, was wir schreiben.

10.4.1.3 Schreibansatz 3: Warten auf .ready (effiziente Behandlung von Backpressure)

In diesem Schreibansatz behandeln wir Backpressure effizient, indem wir auf den Writer-Getter .ready warten

await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 1').catch(() => {});

await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 2').catch(() => {});

await writer.close(); // reports errors

Das Promise in .ready wird erfüllt, wann immer der Stream von Backpressure zu keiner Backpressure wechselt.

10.4.1.4 Beispiel: Schreiben in eine Datei über einen Writer

In diesem Beispiel erstellen wir eine Textdatei data.txt über einen WritableStream

import * as fs from 'node:fs';
import {Writable} from 'node:stream';

const nodeWritable = fs.createWriteStream(
  'new-file.txt', {encoding: 'utf-8'}); // (A)
const webWritableStream = Writable.toWeb(nodeWritable); // (B)

const writer = webWritableStream.getWriter();
try {
  await writer.write('First line\n');
  await writer.write('Second line\n');
  await writer.close();
} finally {
  writer.releaseLock()
}

In Zeile A erstellen wir einen Node.js-Stream für die Datei data.txt. In Zeile B konvertieren wir diesen Stream in einen Web-Stream. Dann verwenden wir einen Writer, um Strings hineinzuschreiben.

10.4.2 Piping zu WritableStreams

Anstatt Writer zu verwenden, können wir auch zu WritableStreams schreiben, indem wir ReadableStreams an sie pipen

await readableStream.pipeTo(writableStream);

Das von .pipeTo() zurückgegebene Promise wird erfüllt, wenn das Piping erfolgreich abgeschlossen ist.

10.4.2.1 Piping erfolgt asynchron

Piping wird nach Abschluss oder Pause der aktuellen Aufgabe ausgeführt. Der folgende Code demonstriert dies

const readableStream = new ReadableStream({ // (A)
  start(controller) {
    controller.enqueue('First line\n');
    controller.enqueue('Second line\n');
    controller.close();
  },
});
const writableStream = new WritableStream({ // (B)
  write(chunk) {
    console.log('WRITE: ' + JSON.stringify(chunk));
  },
  close() {
    console.log('CLOSE WritableStream');
  },
});


console.log('Before .pipeTo()');
const promise = readableStream.pipeTo(writableStream); // (C)
promise.then(() => console.log('Promise fulfilled'));
console.log('After .pipeTo()');

// Output:
// 'Before .pipeTo()'
// 'After .pipeTo()'
// 'WRITE: "First line\n"'
// 'WRITE: "Second line\n"'
// 'CLOSE WritableStream'
// 'Promise fulfilled'

In Zeile A erstellen wir einen ReadableStream. In Zeile B erstellen wir einen WritableStream.

Wir sehen, dass .pipeTo() (Zeile C) sofort zurückkehrt. In einer neuen Aufgabe werden Chunks gelesen und geschrieben. Dann wird writableStream geschlossen und schließlich promise erfüllt.

10.4.2.2 Beispiel: Piping zu einem WritableStream für eine Datei

Im folgenden Beispiel erstellen wir einen WritableStream für eine Datei und pipen einen ReadableStream dorthin

const webReadableStream = new ReadableStream({ // (A)
  async start(controller) {
    controller.enqueue('First line\n');
    controller.enqueue('Second line\n');
    controller.close();
  },
});

const nodeWritable = fs.createWriteStream( // (B)
  'data.txt', {encoding: 'utf-8'});
const webWritableStream = Writable.toWeb(nodeWritable); // (C)

await webReadableStream.pipeTo(webWritableStream); // (D)

In Zeile A erstellen wir einen ReadableStream. In Zeile B erstellen wir einen Node.js-Stream für die Datei data.txt. In Zeile C konvertieren wir diesen Stream in einen Web-Stream. In Zeile D pipen wir unseren webReadableStream an den WritableStream für die Datei.

10.4.2.3 Beispiel: Schreiben von zwei ReadableStreams in einen WritableStream

Im folgenden Beispiel schreiben wir zwei ReadableStreams in einen einzigen WritableStream.

function createReadableStream(prefix) {
  return new ReadableStream({
    async start(controller) {
      controller.enqueue(prefix + 'chunk 1');
      controller.enqueue(prefix + 'chunk 2');
      controller.close();
    },
  });
}

const writableStream = new WritableStream({
  write(chunk) {
    console.log('WRITE ' + JSON.stringify(chunk));
  },
  close() {
    console.log('CLOSE');
  },
  abort(err) {
    console.log('ABORT ' + err);
  },
});

await createReadableStream('Stream 1: ')
  .pipeTo(writableStream, {preventClose: true}); // (A)
await createReadableStream('Stream 2: ')
  .pipeTo(writableStream, {preventClose: true}); // (B)
await writableStream.close();

// Output
// 'WRITE "Stream 1: chunk 1"'
// 'WRITE "Stream 1: chunk 2"'
// 'WRITE "Stream 2: chunk 1"'
// 'WRITE "Stream 2: chunk 2"'
// 'CLOSE'

Wir weisen .pipeTo() an, den WritableStream nach dem Schließen des ReadableStreams nicht zu schließen (Zeile A und Zeile B). Daher bleibt der WritableStream nach Zeile A offen und wir können einen weiteren ReadableStream daran pipen.

10.5 Datensenken in WritableStreams über Wrapping umwandeln

Wenn wir über einen WritableStream in eine externe Senke schreiben möchten, können wir sie in ein Adapterobjekt wickeln und dieses Objekt dem WritableStream-Konstruktor übergeben. Das Adapterobjekt wird als underlying sink des WritableStreams bezeichnet (Quing-Strategien werden später erklärt, wenn wir uns näher mit Backpressure befassen)

new WritableStream(underlyingSink?, queuingStrategy?)

Dies ist die Art von zugrunde liegenden Sinks (können Sie diese Art und die Erklärungen ihrer Eigenschaften gerne überfliegen; sie werden wieder erklärt, wenn wir ihnen in Beispielen begegnen)

interface UnderlyingSink<TChunk> {
  start?(
    controller: WritableStreamDefaultController
  ): void | Promise<void>;
  write?(
    chunk: TChunk,
    controller: WritableStreamDefaultController
  ): void | Promise<void>;
  close?(): void | Promise<void>;;
  abort?(reason?: any): void | Promise<void>;
}

Erklärungen zu diesen Eigenschaften

Der Parameter controller von .start() und .write() ermöglicht es ihnen, den WritableStream zu fehlerhaften. Er hat den folgenden Typ

interface WritableStreamDefaultController {
  readonly signal: AbortSignal;
  error(err?: any): void;
}

10.5.1 Beispiel: Verfolgen eines ReadableStream

Im nächsten Beispiel leiten wir einen ReadableStream an einen WritableStream weiter, um zu überprüfen, wie der ReadableStream Chunks produziert.

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('First chunk');
    controller.enqueue('Second chunk');
    controller.close();
  },
});
await readableStream.pipeTo(
  new WritableStream({
    write(chunk) {
      console.log('WRITE ' + JSON.stringify(chunk));
    },
    close() {
      console.log('CLOSE');
    },
    abort(err) {
      console.log('ABORT ' + err);
    },
  })
);
// Output:
// 'WRITE "First chunk"'
// 'WRITE "Second chunk"'
// 'CLOSE'

10.5.2 Beispiel: Sammeln von Chunks, die in einen WriteStream geschrieben wurden, in einem String

Im nächsten Beispiel erstellen wir eine Unterklasse von WriteStream, die alle geschriebenen Chunks in einem String sammelt. Wir können auf diesen String über die Methode .getString() zugreifen.

class StringWritableStream extends WritableStream {
  #string = '';
  constructor() {
    super({
      // We need to access the `this` of `StringWritableStream`.
      // Hence the arrow function (and not a method).
      write: (chunk) => {
        this.#string += chunk;
      },
    });
  }
  getString() {
    return this.#string;
  }
}
const stringStream = new StringWritableStream();
const writer = stringStream.getWriter();
try {
  await writer.write('How are');
  await writer.write(' you?');
  await writer.close();
} finally {
  writer.releaseLock()
}
assert.equal(
  stringStream.getString(),
  'How are you?'
);

Ein Nachteil dieses Ansatzes ist, dass wir zwei APIs mischen: die API von WritableStream und unsere neue String-Stream-API. Eine Alternative ist, an den WritableStream zu delegieren, anstatt ihn zu erweitern.

function StringcreateWritableStream() {
  let string = '';
  return {
    stream: new WritableStream({
      write(chunk) {
        string += chunk;
      },
    }),
    getString() {
      return string;
    },
  };
}

const stringStream = StringcreateWritableStream();
const writer = stringStream.stream.getWriter();
try {
  await writer.write('How are');
  await writer.write(' you?');
  await writer.close();
} finally {
  writer.releaseLock()
}
assert.equal(
  stringStream.getString(),
  'How are you?'
);

Diese Funktionalität könnte auch über eine Klasse (anstatt als Factory-Funktion für Objekte) implementiert werden.

10.6 Verwenden von TransformStreams

Ein TransformStream

Die gebräuchlichste Art, TransformStreams zu verwenden, ist, sie “durchzuleiten” (pipe through).

const transformedStream = readableStream.pipeThrough(transformStream);

.pipeThrough() leitet readableStream an die beschreibbare Seite von transformStream weiter und gibt deren lesbare Seite zurück. Mit anderen Worten: Wir haben einen neuen ReadableStream erstellt, der eine transformierte Version von readableStream ist.

.pipeThrough() akzeptiert nicht nur TransformStreams, sondern jedes Objekt, das die folgende Form hat:

interface ReadableWritablePair<RChunk, WChunk> {
  readable: ReadableStream<RChunk>;
  writable: WritableStream<WChunk>;
}

10.6.1 Standard TransformStreams

Node.js unterstützt die folgenden Standard-TransformStreams:

10.6.1.1 Beispiel: Dekodieren eines Streams von UTF-8-kodierten Bytes

Im folgenden Beispiel dekodieren wir einen Stream von UTF-8-kodierten Bytes.

const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream
  .pipeThrough(new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
  console.log(stringChunk);
}

response.body ist ein ReadableByteStream, dessen Chunks Instanzen von Uint8Array sind (TypedArrays). Wir leiten diesen Stream durch einen TextDecoderStream, um einen Stream mit String-Chunks zu erhalten.

Beachten Sie, dass das separate Übersetzen jedes Byte-Chunks (z. B. über einen TextDecoder) nicht funktioniert, da ein einzelner Unicode-Codepunkt in UTF-8 als bis zu vier Bytes kodiert wird und diese Bytes möglicherweise nicht alle im selben Chunk enthalten sind.

10.6.1.2 Beispiel: Erstellen eines lesbaren Textstreams für die Standardeingabe

Das folgende Node.js-Modul protokolliert alles, was ihm über die Standardeingabe gesendet wird.

// echo-stdin.mjs
import {Readable} from 'node:stream';

const webStream = Readable.toWeb(process.stdin)
  .pipeThrough(new TextDecoderStream('utf-8'));
for await (const chunk of webStream) {
  console.log('>>>', chunk);
}

Wir können auf die Standardeingabe über einen Stream zugreifen, der in process.stdin gespeichert ist (process ist eine globale Node.js-Variable). Wenn wir keine Kodierung für diesen Stream festlegen und ihn über Readable.toWeb() konvertieren, erhalten wir einen Byte-Stream. Wir leiten ihn durch einen TextDecoderStream, um einen Text-Stream zu erhalten.

Beachten Sie, dass wir die Standardeingabe inkrementell verarbeiten: Sobald ein weiterer Chunk verfügbar ist, protokollieren wir ihn. Mit anderen Worten, wir warten nicht, bis die Standardeingabe abgeschlossen ist. Das ist nützlich, wenn die Daten entweder groß sind oder nur intermittierend gesendet werden.

10.7 Implementieren benutzerdefinierter TransformStreams

Wir können einen benutzerdefinierten TransformStream implementieren, indem wir ein Transformer-Objekt an den Konstruktor von TransformStream übergeben. Ein solches Objekt hat den folgenden Typ (können Sie diesen Typ und die Erklärungen seiner Eigenschaften gerne überfliegen; sie werden wieder erklärt, wenn wir ihnen in Beispielen begegnen).

interface Transformer<TInChunk, TOutChunk> {
  start?(
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
  transform?(
    chunk: TInChunk,
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
  flush?(
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
}

Erklärungen zu diesen Eigenschaften

Jede dieser Methoden kann ein Promise zurückgeben, und erst wenn das Promise erfüllt oder abgelehnt ist, werden weitere Schritte unternommen. Das ist nützlich, wenn wir etwas Asynchrones tun möchten.

Der Parameter controller hat den folgenden Typ.

interface TransformStreamDefaultController<TOutChunk> {
  enqueue(chunk?: TOutChunk): void;
  readonly desiredSize: number | null;
  terminate(): void;
  error(err?: any): void;
}

Was ist mit Backpressure in einem TransformStream? Die Klasse leitet die Backpressure von ihrer lesbaren Seite (Ausgabe) an ihre beschreibbare Seite (Eingabe) weiter. Die Annahme ist, dass die Transformation die Datenmenge nicht wesentlich verändert. Daher können Transfomers davon absehen, Backpressure zu ignorieren. Sie könnte jedoch über transformStreamDefaultController.desiredSize erkannt und durch Rückgabe eines Promises von transformer.transform() weitergegeben werden.

10.7.1 Beispiel: Umwandeln eines Streams von beliebigen Chunks in einen Stream von Zeilen

Die folgende Unterklasse von TransformStream konvertiert einen Stream mit beliebigen Chunks in einen Stream, bei dem jeder Chunk genau eine Textzeile enthält. Das heißt, mit Ausnahme des letzten Chunks endet jeder Chunk mit einem Zeilenende (EOL)-String: '\n' unter Unix (inkl. macOS) und '\r\n' unter Windows.

class ChunksToLinesTransformer {
  #previous = '';

  transform(chunk, controller) {
    let startSearch = this.#previous.length;
    this.#previous += chunk;
    while (true) {
      // Works for EOL === '\n' and EOL === '\r\n'
      const eolIndex = this.#previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // Line includes the EOL
      const line = this.#previous.slice(0, eolIndex+1);
      controller.enqueue(line);
      this.#previous = this.#previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }

  flush(controller) {
    // Clean up and enqueue any text we’re still holding on to
    if (this.#previous.length > 0) {
      controller.enqueue(this.#previous);
    }
  }
}
class ChunksToLinesStream extends TransformStream {
  constructor() {
    super(new ChunksToLinesTransformer());
  }
}

const stream = new ReadableStream({
  async start(controller) {
    controller.enqueue('multiple\nlines of\ntext');
    controller.close();
  },
});
const transformStream = new ChunksToLinesStream();
const transformed = stream.pipeThrough(transformStream);

for await (const line of transformed) {
  console.log('>>>', JSON.stringify(line));
}

// Output:
// '>>> "multiple\n"'
// '>>> "lines of\n"'
// '>>> "text"'

Beachten Sie, dass Deno's integrierter TextLineStream ähnliche Funktionalität bietet.

Tipp: Wir können diese Transformation auch über einen asynchronen Generator realisieren. Er würde asynchron über einen ReadableStream iterieren und ein asynchrones Iterable mit Zeilen zurückgeben. Seine Implementierung wird in §9.4 “Transforming readable streams via async generators” gezeigt.

10.7.2 Tipp: Asynchrone Generatoren eignen sich auch hervorragend für die Transformation von Streams

Aufgrund der Tatsache, dass ReadableStreams asynchron iterierbar sind, können wir asynchrone Generatoren verwenden, um sie zu transformieren. Das führt zu sehr elegantem Code.

const stream = new ReadableStream({
  async start(controller) {
    controller.enqueue('one');
    controller.enqueue('two');
    controller.enqueue('three');
    controller.close();
  },
});

async function* prefixChunks(prefix, asyncIterable) {
  for await (const chunk of asyncIterable) {
    yield '> ' + chunk;
  }
}

const transformedAsyncIterable = prefixChunks('> ', stream);
for await (const transformedChunk of transformedAsyncIterable) {
  console.log(transformedChunk);
}

// Output:
// '> one'
// '> two'
// '> three'

10.8 Ein genauerer Blick auf Backpressure

Werfen wir einen genaueren Blick auf Backpressure. Betrachten Sie die folgende Pipe-Kette:

rs.pipeThrough(ts).pipeTo(ws);

rs ist ein ReadableStream, ts ist ein TransformStream, ws ist ein WritableStream. Dies sind die Verbindungen, die durch den vorherigen Ausdruck erstellt werden (.pipeThrough verwendet .pipeTo, um rs mit der beschreibbaren Seite von ts zu verbinden).

rs -pipeTo-> ts{writable,readable} -pipeTo-> ws

Beobachtungen

Nehmen wir an, dass der zugrunde liegende Sink von ws langsam ist und der Puffer von ws schließlich voll ist. Dann geschehen die folgenden Schritte:

Dieses Beispiel veranschaulicht, dass wir zwei Arten von Funktionalität benötigen:

Lassen Sie uns untersuchen, wie diese Funktionalitäten in der Web Streams API implementiert sind.

10.8.1 Backpressure signalisieren

Backpressure wird von den Daten empfangenden Entitäten signalisiert. Web Streams haben zwei solcher Entitäten:

In beiden Fällen werden die Eingaben über Warteschlangen gepuffert. Das Signal zum Anwenden von Backpressure ist, wenn eine Warteschlange voll ist. Lassen Sie uns sehen, wie das erkannt werden kann.

Hier sind die Speicherorte der Warteschlangen:

Die *gewünschte Größe* (desired size) einer Warteschlange ist eine Zahl, die angibt, wie viel Platz in der Warteschlange noch übrig ist.

Daher müssen wir Backpressure anwenden, wenn die gewünschte Größe null oder kleiner ist. Sie ist über den Getter .desiredSize des Objekts verfügbar, das die Warteschlange enthält.

Wie wird die gewünschte Größe berechnet? Über ein Objekt, das eine sogenannte *Queuing-Strategie* spezifiziert. ReadableStream und WritableStream haben Standard-Queuing-Strategien, die über optionale Parameter ihrer Konstruktoren überschrieben werden können. Die Schnittstelle QueuingStrategy hat zwei Eigenschaften:

Die gewünschte Größe einer Warteschlange ist die High Water Mark abzüglich der aktuellen Größe der Warteschlange.

10.8.2 Auf Backpressure reagieren

Daten sendende Entitäten müssen auf signalisierte Backpressure reagieren, indem sie Backpressure ausüben.

10.8.2.1 Schreiben von Code in einen WritableStream über einen Writer

Wenn wir möchten, können wir zusätzlich die Größe unserer Chunks basierend auf writer.desiredSize festlegen.

10.8.2.2 Die zugrunde liegende Quelle eines ReadableStream

Das zugrunde liegende Source-Objekt, das an einen ReadableStream übergeben werden kann, umschließt eine externe Quelle. In gewisser Weise ist es auch ein Mitglied der Pipe-Kette; eines, das vor seinem ReadableStream kommt.

10.8.2.3 Der zugrunde liegende Sink eines WritableStream

Das zugrunde liegende Sink-Objekt, das an einen WritableStream übergeben werden kann, umschließt einen externen Sink. In gewisser Weise ist es auch ein Mitglied der Pipe-Kette; eines, das nach seinem WritableStream kommt.

Jeder externe Sink signalisiert Backpressure unterschiedlich (in einigen Fällen gar nicht). Der zugrunde liegende Sink kann Backpressure ausüben, indem er aus der Methode .write() ein Promise zurückgibt, das erfüllt wird, sobald das Schreiben abgeschlossen ist. Es gibt ein Beispiel im Web Streams Standard, das zeigt, wie das funktioniert.

10.8.2.4 Ein TransformStream (.writable .readable)

Der TransformStream verbindet seine beschreibbare Seite mit seiner lesbaren Seite, indem er einen zugrunde liegenden Sink für erstere und eine zugrunde liegende Quelle für letztere implementiert. Er hat einen internen Slot .[[backpressure]], der anzeigt, ob interne Backpressure derzeit aktiv ist oder nicht.

10.8.2.5 .pipeTo() (ReadableStream WritableStream)

.pipeTo() liest Chunks vom ReadableStream über einen Reader und schreibt sie über einen Writer in den WritableStream. Er pausiert, wann immer writer.desiredSize null oder kleiner ist (Web Streams Standard: Schritt 15 von ReadableStreamPipeTo).

10.9 Byte-Streams

Bisher haben wir nur mit *Text-Streams* gearbeitet, Streams, deren Chunks Strings waren. Aber die Web Streams API unterstützt auch *Byte-Streams* für Binärdaten, bei denen Chunks Uint8Arrays sind (TypedArrays).

Als Nächstes lernen wir, wie man lesbare Byte-Streams erstellt.

10.9.1 Lesbare Byte-Streams

Welche Art von Stream vom ReadableStream-Konstruktor erstellt wird, hängt von der optionalen Eigenschaft .type des optionalen ersten Parameters underlyingSource ab.

Was ändert sich, wenn ein ReadableStream im 'bytes'-Modus ist?

Im Standardmodus kann die zugrunde liegende Quelle jede Art von Chunk zurückgeben. Im Byte-Modus müssen die Chunks ArrayBufferViews sein, d. h. TypedArrays (wie Uint8Arrays) oder DataViews.

Zusätzlich kann ein lesbarer Byte-Stream zwei Arten von Readern erstellen:

“BYOB” steht für “Bring Your Own Buffer” und bedeutet, dass wir einen Puffer (einen ArrayBufferView) an reader.read() übergeben können. Danach wird dieser ArrayBufferView freigegeben und kann nicht mehr verwendet werden. Aber .read() gibt seine Daten in einem neuen ArrayBufferView zurück, der denselben Typ hat und auf denselben Bereich desselben ArrayBuffers zugreift.

Zusätzlich haben lesbare Byte-Streams unterschiedliche Controller: Sie sind Instanzen von ReadableByteStreamController (im Gegensatz zu ReadableStreamDefaultController). Abgesehen davon, dass zugrunde liegende Quellen gezwungen werden, ArrayBufferViews (TypedArrays oder DataViews) einzureihen, unterstützt er auch ReadableStreamBYOBReaders über seine Eigenschaft .byobRequest. Eine zugrunde liegende Quelle schreibt ihre Daten in die BYOBRequest, die in dieser Eigenschaft gespeichert ist. Der Web Streams Standard hat zwei Beispiele für die Verwendung von .byobRequest in seinem Abschnitt “Examples of creating streams”.

10.9.2 Beispiel: ein unendlicher lesbarer Byte-Stream, gefüllt mit Zufallsdaten

Im nächsten Beispiel erstellen wir einen unendlichen lesbaren Byte-Stream, der seine Chunks mit Zufallsdaten füllt (Inspiration: example4.mjs in “Implementing the Web Streams API in Node.js”).

import {promisify} from 'node:util';
import {randomFill} from 'node:crypto';
const asyncRandomFill = promisify(randomFill);

const readableByteStream = new ReadableStream({
  type: 'bytes',
  async pull(controller) {
    const byobRequest = controller.byobRequest;
    await asyncRandomFill(byobRequest.view);
    byobRequest.respond(byobRequest.view.byteLength);
  },
});

const reader = readableByteStream.getReader({mode: 'byob'});
const buffer = new Uint8Array(10); // (A)
const firstChunk = await reader.read(buffer); // (B)
console.log(firstChunk);

Da readableByteStream unendlich ist, können wir ihn nicht durchlaufen. Deshalb lesen wir nur seinen ersten Chunk (Zeile B).

Der Puffer, den wir in Zeile A erstellen, wird übertragen und ist daher nach Zeile B unlesbar.

10.9.3 Beispiel: Komprimieren eines lesbaren Byte-Streams

Im folgenden Beispiel erstellen wir einen lesbaren Byte-Stream und leiten ihn durch einen Stream, der ihn in das GZIP-Format komprimiert.

const readableByteStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    // 256 zeros
    controller.enqueue(new Uint8Array(256));
    controller.close();
  },
});
const transformedStream = readableByteStream.pipeThrough(
  new CompressionStream('gzip'));
await logChunks(transformedStream);

async function logChunks(readableByteStream) {
  const reader = readableByteStream.getReader();
  try {
    while (true) {
      const {done, value} = await reader.read();
      if (done) break;
      console.log(value);
    }
  } finally {
    reader.releaseLock();
  }
}

10.9.4 Beispiel: Lesen einer Webseite über fetch()

Das Ergebnis von fetch() wird zu einem Response-Objekt aufgelöst, dessen Eigenschaft .body ein lesbarer Byte-Stream ist. Wir konvertieren diesen Byte-Stream in einen Text-Stream über TextDecoderStream.

const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream.pipeThrough(
  new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
  console.log(stringChunk);
}

10.10 Node.js-spezifische Helfer

Node.js ist die einzige Web-Plattform, die die folgenden Hilfsfunktionen unterstützt, die sie als Utility Consumers bezeichnet.

import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';

Diese Funktionen konvertieren Web ReadableStreams, Node.js Readables und AsyncIteratoren in Promises, die mit Folgendem erfüllt werden:

Binärdaten werden als UTF-8-kodiert angenommen.

import * as streamConsumers from 'node:stream/consumers';

const readableByteStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    // TextEncoder converts strings to UTF-8 encoded Uint8Arrays
    const encoder = new TextEncoder();
    const view = encoder.encode('"😀"');
    assert.deepEqual(
      view,
      Uint8Array.of(34, 240, 159, 152, 128, 34)
    );
    controller.enqueue(view);
    controller.close();
  },
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');

String-Streams funktionieren wie erwartet.

import * as streamConsumers from 'node:stream/consumers';

const readableByteStream = new ReadableStream({
  start(controller) {
    controller.enqueue('"😀"');
    controller.close();
  },
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');

10.11 Weitere Lektüre

Alle in diesem Abschnitt genannten Materialien waren eine Quelle für dieses Kapitel.

Dieses Kapitel deckt nicht alle Aspekte der Web Streams API ab. Weitere Informationen finden Sie hier:

Weiteres Material