Web Streams sind ein Standard für Streams, der jetzt auf allen wichtigen Web-Plattformen unterstützt wird: Webbrowser, Node.js und Deno. (Streams sind eine Abstraktion zum sequenziellen Lesen und Schreiben von Daten in kleinen Stücken aus allen Arten von Quellen – Dateien, auf Servern gehostete Daten usw.)
Zum Beispiel gibt die globale Funktion fetch() (die Online-Ressourcen herunterlädt) asynchron eine Response zurück, die eine Eigenschaft .body mit einem Web-Stream hat.
Dieses Kapitel behandelt Web Streams in Node.js, aber die meisten Dinge, die wir lernen, gelten für alle Web-Plattformen, die sie unterstützen.
Wir beginnen mit einem Überblick über einige Grundlagen von Web Streams. Danach gehen wir schnell zu Beispielen über.
Streams sind eine Datenstruktur für den Zugriff auf Daten wie
Zwei ihrer Vorteile sind
Wir können mit großen Datenmengen arbeiten, da Streams es uns ermöglichen, diese in kleinere Stücke (sogenannte Chunks) aufzuteilen, die wir einzeln verarbeiten können.
Wir können mit derselben Datenstruktur, Streams, arbeiten, während wir verschiedene Daten verarbeiten. Das erleichtert die Wiederverwendung von Code.
Web Streams (oft wird "Web" weggelassen) sind ein relativ neuer Standard, der in Webbrowsern entstanden ist, aber jetzt auch von Node.js und Deno unterstützt wird (wie in dieser MDN-Kompatibilitätstabelle gezeigt).
In Web Streams sind Chunks normalerweise entweder
Es gibt drei Hauptarten von Web Streams
Ein ReadableStream dient zum Lesen von Daten aus einer Quelle. Code, der dies tut, wird als Konsument bezeichnet.
Ein WritableStream dient zum Schreiben von Daten in eine Senke. Code, der dies tut, wird als Produzent bezeichnet.
Ein TransformStream besteht aus zwei Streams
Die Idee ist, Daten zu transformieren, indem sie durch einen TransformStream "gepiped" werden. Das heißt, wir schreiben Daten in die beschreibbare Seite und lesen transformierte Daten von der lesbaren Seite. Die folgenden TransformStreams sind in die meisten JavaScript-Plattformen integriert (mehr dazu später)
TextDecoderStream wandelt solche Daten in Strings um.TextEncoderStream wandelt JavaScript-Strings in UTF-8-Daten um.CompressionStream komprimiert Binärdaten in GZIP und andere Komprimierungsformate.DecompressionStream dekomprimiert Binärdaten aus GZIP und anderen Komprimierungsformaten.ReadableStreams, WritableStreams und TransformStreams können zum Transport von Text- oder Binärdaten verwendet werden. Wir werden uns in diesem Kapitel meistens mit ersterem beschäftigen. Byte-Streams für Binärdaten werden am Ende kurz erwähnt.
Piping ist eine Operation, die es uns ermöglicht, einen ReadableStream an einen WritableStream zu pipen: Solange der ReadableStream Daten produziert, liest diese Operation diese Daten und schreibt sie in den WritableStream. Wenn wir nur zwei Streams verbinden, erhalten wir eine bequeme Möglichkeit, Daten von einem Ort zum anderen zu übertragen (z.B. eine Datei zu kopieren). Wir können aber auch mehr als zwei Streams verbinden und Pipe Chains erhalten, die Daten auf vielfältige Weise verarbeiten können. Dies ist ein Beispiel für eine Pipe Chain
Ein ReadableStream wird mit einem TransformStream verbunden, indem der erstere an die beschreibbare Seite letzteren gepiped wird. Ebenso wird ein TransformStream mit einem anderen TransformStream verbunden, indem die lesbare Seite des ersteren an die beschreibbare Seite des letzteren gepiped wird. Und ein TransformStream wird mit einem WritableStream verbunden, indem die lesbare Seite des ersteren an letzteren gepiped wird.
Ein Problem bei Pipe Chains ist, dass ein Mitglied mehr Daten empfangen kann, als es im Moment verarbeiten kann. Backpressure ist eine Technik zur Lösung dieses Problems: Sie ermöglicht es einem Datenempfänger, seinem Sender mitzuteilen, dass er vorübergehend keine Daten mehr senden soll, damit der Empfänger nicht überfordert wird.
Eine andere Sichtweise auf Backpressure ist ein Signal, das rückwärts durch eine Pipe Chain wandert, von einem Mitglied, das überlastet wird, zum Anfang der Kette. Betrachten wir als Beispiel die folgende Pipe Chain
ReadableStream -pipeTo-> TransformStream -pipeTo-> WriteableStream
So wandert Backpressure durch diese Kette
Wir haben den Anfang der Pipe Chain erreicht. Daher sammeln sich keine Daten im ReadableStream an (der ebenfalls gepuffert ist), und der WritableStream hat Zeit, sich zu erholen. Sobald dies geschehen ist, signalisiert er, dass er wieder bereit ist, Daten zu empfangen. Dieses Signal wandert ebenfalls durch die Kette zurück, bis es den ReadableStream erreicht und die Datenverarbeitung wieder aufgenommen wird.
Bei diesem ersten Blick auf Backpressure wurden mehrere Details weggelassen, um die Sache einfacher zu gestalten. Diese werden später behandelt.
In Node.js sind Web Streams aus zwei Quellen verfügbar
'node:stream/web'Momentan unterstützt nur eine API Web Streams direkt in Node.js – die Fetch API
const response = await fetch('https://example.com');
const readableStream = response.body;Für andere Dinge müssen wir eine der folgenden statischen Methoden im Modul 'node:stream' verwenden, um entweder einen Node.js-Stream in einen Web-Stream oder umgekehrt zu konvertieren
Readable.toWeb(nodeReadable)Readable.fromWeb(webReadableStream, options?)Writable.toWeb(nodeWritable)Writable.fromWeb(webWritableStream, options?)Duplex.toWeb(nodeDuplex)Duplex.fromWeb(webTransformStream, options?)Eine weitere API unterstützt Web Streams teilweise: FileHandles haben die Methode .readableWebStream().
ReadableStreams ermöglichen es uns, Daten-Chunks aus verschiedenen Quellen zu lesen. Sie haben den folgenden Typ (scheuen Sie sich nicht, diesen Typ und die Erklärungen seiner Eigenschaften zu überfliegen; sie werden erneut erklärt, wenn wir ihnen in Beispielen begegnen)
interface ReadableStream<TChunk> {
getReader(): ReadableStreamDefaultReader<TChunk>;
readonly locked: boolean;
[Symbol.asyncIterator](): AsyncIterator<TChunk>;
cancel(reason?: any): Promise<void>;
pipeTo(
destination: WritableStream<TChunk>,
options?: StreamPipeOptions
): Promise<void>;
pipeThrough<TChunk2>(
transform: ReadableWritablePair<TChunk2, TChunk>,
options?: StreamPipeOptions
): ReadableStream<TChunk2>;
// Not used in this chapter:
tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>];
}
interface StreamPipeOptions {
signal?: AbortSignal;
preventClose?: boolean;
preventAbort?: boolean;
preventCancel?: boolean;
}Erklärungen zu diesen Eigenschaften
.getReader() gibt einen Reader zurück – ein Objekt, über das wir von einem ReadableStream lesen können. ReadableStreams, die Reader zurückgeben, sind ähnlich wie Iterables, die Iteratoren zurückgeben..locked: Es kann nur einen aktiven Reader pro ReadableStream gleichzeitig geben. Solange ein Reader in Gebrauch ist, ist der ReadableStream gesperrt und .getReader() kann nicht aufgerufen werden.[Symbol.asyncIterator](https://exploringjs.de/impatient-js/ch_async-iteration.html): Diese Methode macht ReadableStreams asynchron iterierbar. Sie ist derzeit nur auf einigen Plattformen implementiert..cancel(reason) bricht den Stream ab, weil der Konsument kein Interesse mehr daran hat. reason wird an die .cancel()-Methode der underlying source des ReadableStreams weitergegeben (mehr dazu später). Das zurückgegebene Promise wird erfüllt, wenn diese Operation abgeschlossen ist..pipeTo() leitet den Inhalt seines ReadableStreams an einen WritableStream weiter. Das zurückgegebene Promise wird erfüllt, wenn diese Operation abgeschlossen ist. .pipeTo() stellt sicher, dass Backpressure, Schließen, Fehler usw. alle korrekt durch eine Pipe Chain propagiert werden. Wir können Optionen über den zweiten Parameter angeben.signal ermöglicht es uns, ein AbortSignal an diese Methode zu übergeben, was uns ermöglicht, das Piping über einen AbortController abzubrechen..preventClose: Wenn true, verhindert es, dass der WritableStream geschlossen wird, wenn der ReadableStream geschlossen wird. Das ist nützlich, wenn wir mehr als einen ReadableStream an denselben WritableStream pipen wollen..pipeThrough() verbindet seinen ReadableStream mit einem ReadableWritablePair (ungefähr: einem TransformStream, mehr dazu später). Es gibt den resultierenden ReadableStream zurück (d.h. die lesbare Seite des ReadableWritablePair).Die folgenden Unterkapitel behandeln drei Möglichkeiten, ReadableStreams zu konsumieren
Wir können Reader verwenden, um Daten aus ReadableStreams zu lesen. Sie haben den folgenden Typ (scheuen Sie sich nicht, diesen Typ und die Erklärungen seiner Eigenschaften zu überfliegen; sie werden erneut erklärt, wenn wir ihnen in Beispielen begegnen)
interface ReadableStreamGenericReader {
readonly closed: Promise<undefined>;
cancel(reason?: any): Promise<void>;
}
interface ReadableStreamDefaultReader<TChunk>
extends ReadableStreamGenericReader
{
releaseLock(): void;
read(): Promise<ReadableStreamReadResult<TChunk>>;
}
interface ReadableStreamReadResult<TChunk> {
done: boolean;
value: TChunk | undefined;
}Erklärungen zu diesen Eigenschaften
.closed: Dieses Promise wird erfüllt, nachdem der Stream geschlossen wurde. Es wird abgelehnt, wenn der Stream fehlerhaft ist oder wenn der Lock eines Readers vor dem Schließen des Streams freigegeben wird..cancel(): In einem aktiven Reader bricht diese Methode den zugehörigen ReadableStream ab..releaseLock() deaktiviert den Reader und entsperrt seinen Stream..read() gibt ein Promise für ein ReadableStreamReadResult (einen verpackten Chunk) zurück, das zwei Eigenschaften hat.done ist ein boolescher Wert, der false ist, solange Chunks gelesen werden können, und true nach dem letzten Chunk..value ist der Chunk (oder undefined nach dem letzten Chunk).ReadableStreamReadResult mag Ihnen bekannt vorkommen, wenn Sie wissen, wie Iteration funktioniert: ReadableStreams ähneln Iterables, Reader ähneln Iteratoren, und ReadableStreamReadResults ähneln Objekten, die von der Iterator-Methode .next() zurückgegeben werden.
Der folgende Code demonstriert das Protokoll zur Verwendung von Readern
const reader = readableStream.getReader(); // (A)
assert.equal(readableStream.locked, true); // (B)
try {
while (true) {
const {done, value: chunk} = await reader.read(); // (C)
if (done) break;
// Use `chunk`
}
} finally {
reader.releaseLock(); // (D)
}Einen Reader erhalten. Wir können nicht direkt von readableStream lesen, wir müssen zuerst einen Reader erwerben (Zeile A). Jeder ReadableStream kann höchstens einen Reader haben. Nachdem ein Reader erworben wurde, ist readableStream gesperrt (Zeile B). Bevor wir .getReader() erneut aufrufen können, müssen wir .releaseLock() aufrufen (Zeile D).
Chunks lesen. .read() gibt ein Promise für ein Objekt mit den Eigenschaften .done und .value zurück (Zeile C). Nachdem der letzte Chunk gelesen wurde, ist .done true. Dieser Ansatz ähnelt der Funktionsweise der asynchronen Iteration in JavaScript.
Im folgenden Beispiel lesen wir Chunks (Strings) aus einer Textdatei data.txt
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
const nodeReadable = fs.createReadStream(
'data.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable); // (A)
const reader = webReadableStream.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) break;
console.log(value);
}
} finally {
reader.releaseLock();
}
// Output:
// 'Content of text file\n'Wir konvertieren einen Node.js Readable in einen Web ReadableStream (Zeile A). Dann verwenden wir das zuvor erklärte Protokoll, um die Chunks zu lesen.
Im nächsten Beispiel verketten wir alle Chunks eines ReadableStreams zu einem String und geben ihn zurück
/**
* Returns a string with the contents of `readableStream`.
*/
async function readableStreamToString(readableStream) {
const reader = readableStream.getReader();
try {
let result = '';
while (true) {
const {done, value} = await reader.read();
if (done) {
return result; // (A)
}
result += value;
}
} finally {
reader.releaseLock(); // (B)
}
}Praktischerweise wird die finally-Klausel immer ausgeführt – egal wie wir die try-Klausel verlassen. Das heißt, der Lock wird korrekt freigegeben (Zeile B), wenn wir ein Ergebnis zurückgeben (Zeile A).
ReadableStreams können auch über asynchrone Iteration konsumiert werden
const iterator = readableStream[Symbol.asyncIterator]();
let exhaustive = false;
try {
while (true) {
let chunk;
({done: exhaustive, value: chunk} = await iterator.next());
if (exhaustive) break;
console.log(chunk);
}
} finally {
// If the loop was terminated before we could iterate exhaustively
// (via an exception or `return`), we must call `iterator.return()`.
// Check if that was the case.
if (!exhaustive) {
iterator.return();
}
}Glücklicherweise kümmert sich die for-await-of-Schleife um alle Details der asynchronen Iteration für uns
for await (const chunk of readableStream) {
console.log(chunk);
}Lassen Sie uns unseren vorherigen Versuch, Text aus einer Datei zu lesen, wiederholen. Diesmal verwenden wir asynchrone Iteration anstelle eines Readers
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
const nodeReadable = fs.createReadStream(
'text-file.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable);
for await (const chunk of webReadableStream) {
console.log(chunk);
}
// Output:
// 'Content of text file'Wir haben zuvor einen Reader verwendet, um einen String mit dem Inhalt eines ReadableStreams zusammenzufügen. Mit asynchroner Iteration wird der Code einfacher
/**
* Returns a string with the contents of `readableStream`.
*/
async function readableStreamToString2(readableStream) {
let result = '';
for await (const chunk of readableStream) {
result += chunk;
}
return result;
}Momentan unterstützen Node.js und Deno asynchrone Iteration über ReadableStreams, Webbrowser jedoch nicht: Es gibt ein GitHub-Issue, das auf Bug-Reports verweist.
Da noch nicht ganz klar ist, wie die asynchrone Iteration in Browsern unterstützt wird, ist Wrapping eine sicherere Wahl als Polyfilling. Der folgende Code basiert auf einem Vorschlag im Chromium-Bugreport
async function* getAsyncIterableFor(readableStream) {
const reader = readableStream.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) return;
yield value;
}
} finally {
reader.releaseLock();
}
}ReadableStreams haben zwei Methoden zum Piping
readableStream.pipeTo(writeableStream) gibt synchron ein Promise p zurück. Es liest asynchron alle Chunks von readableStream und schreibt sie in writableStream. Wenn es fertig ist, wird p erfüllt.
Wir werden Beispiele für .pipeTo() sehen, wenn wir WritableStreams untersuchen, da es eine bequeme Möglichkeit bietet, Daten in sie zu übertragen.
readableStream.pipeThrough(transformStream) pipet readableStream in transformStream.writable und gibt transformStream.readable zurück (jeder TransformStream hat diese Eigenschaften, die sich auf seine beschreibbare und seine lesbare Seite beziehen). Eine andere Sichtweise auf diese Operation ist, dass wir einen neuen ReadableStream erstellen, indem wir einen transformStream mit einem readableStream verbinden.
Wir werden Beispiele für .pipeThrough() sehen, wenn wir TransformStreams untersuchen, da diese Methode die Hauptverwendung dafür ist.
Wenn wir eine externe Quelle über einen ReadableStream lesen möchten, können wir sie in ein Adapterobjekt wickeln und dieses Objekt dem ReadableStream-Konstruktor übergeben. Das Adapterobjekt wird als underlying source des ReadableStreams bezeichnet (Quing-Strategien werden später erklärt, wenn wir uns näher mit Backpressure befassen)
new ReadableStream(underlyingSource?, queuingStrategy?)Dies ist der Typ von underlying sources (scheuen Sie sich nicht, diesen Typ und die Erklärungen seiner Eigenschaften zu überfliegen; sie werden erneut erklärt, wenn wir ihnen in Beispielen begegnen)
interface UnderlyingSource<TChunk> {
start?(
controller: ReadableStreamController<TChunk>
): void | Promise<void>;
pull?(
controller: ReadableStreamController<TChunk>
): void | Promise<void>;
cancel?(reason?: any): void | Promise<void>;
// Only used in byte streams and ignored in this section:
type: 'bytes' | undefined;
autoAllocateChunkSize: bigint;
}Dies ist, wann der ReadableStream diese Methoden aufruft
.start(controller) wird unmittelbar nach dem Aufruf des Konstruktors von ReadableStream aufgerufen.
.pull(controller) wird aufgerufen, wann immer Platz in der internen Warteschlange des ReadableStreams ist. Es wird wiederholt aufgerufen, bis die Warteschlange wieder voll ist. Diese Methode wird nur aufgerufen, nachdem .start() abgeschlossen ist. Wenn .pull() nichts enqueued, wird es nicht wieder aufgerufen.
.cancel(reason) wird aufgerufen, wenn der Konsument eines ReadableStreams ihn über readableStream.cancel() oder reader.cancel() abbricht. reason ist der Wert, der an diese Methoden übergeben wurde.
Jede dieser Methoden kann ein Promise zurückgeben, und erst wenn das Promise erfüllt oder abgelehnt ist, werden weitere Schritte unternommen. Das ist nützlich, wenn wir etwas Asynchrones tun möchten.
Der Parameter controller von .start() und .pull() ermöglicht ihnen den Zugriff auf den Stream. Er hat den folgenden Typ
type ReadableStreamController<TChunk> =
| ReadableStreamDefaultController<TChunk>
| ReadableByteStreamController<TChunk> // ignored here
;
interface ReadableStreamDefaultController<TChunk> {
enqueue(chunk?: TChunk): void;
readonly desiredSize: number | null;
close(): void;
error(err?: any): void;
}Vorerst sind Chunks Strings. Wir werden später zu Byte-Streams kommen, wo Uint8Arrays üblich sind. Dies ist, was die Methoden tun
.enqueue(chunk) fügt chunk der internen Warteschlange des ReadableStreams hinzu..desiredSize gibt an, wie viel Platz in der Warteschlange vorhanden ist, in die .enqueue() schreibt. Sie ist null, wenn die Warteschlange voll ist, und negativ, wenn sie ihre maximale Größe überschritten hat. Daher müssen wir das Enqueuing stoppen, wenn die gewünschte Größe null oder negativ ist.null..close() schließt den ReadableStream. Konsumenten können die Warteschlange immer noch leeren, aber danach endet der Stream. Es ist wichtig, dass eine underlying source diese Methode aufruft – andernfalls wird das Lesen ihres Streams niemals enden..error(err) versetzt den Stream in einen Fehlerzustand: Alle zukünftigen Interaktionen damit werden mit dem Fehlerwert err fehlschlagen.In unserem ersten Beispiel für die Implementierung einer underlying source stellen wir nur die Methode .start() bereit. Wir werden Anwendungsfälle für .pull() im nächsten Unterkapitel sehen.
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue('First line\n'); // (A)
controller.enqueue('Second line\n'); // (B)
controller.close(); // (C)
},
});
for await (const chunk of readableStream) {
console.log(chunk);
}
// Output:
// 'First line\n'
// 'Second line\n'Wir verwenden den Controller, um einen Stream mit zwei Chunks zu erstellen (Zeile A und Zeile B). Es ist wichtig, dass wir den Stream schließen (Zeile C). Andernfalls würde die for-await-of-Schleife niemals enden!
Beachten Sie, dass diese Art des Enqueuings nicht ganz sicher ist: Es besteht die Gefahr, die Kapazität der internen Warteschlange zu überschreiten. Wir werden bald sehen, wie wir dieses Risiko vermeiden können.
Ein gängiges Szenario ist die Umwandlung einer Push- oder Pull-Source in einen ReadableStream. Ob die Quelle Push oder Pull ist, bestimmt, wie wir uns mit unserer UnderlyingSource an den ReadableStream hängen
Push-Quelle: Eine solche Quelle benachrichtigt uns, wenn neue Daten vorhanden sind. Wir verwenden .start(), um Listener und unterstützende Datenstrukturen einzurichten. Wenn wir zu viele Daten erhalten und die gewünschte Größe nicht mehr positiv ist, müssen wir unserer Quelle mitteilen, dass sie pausieren soll. Wenn .pull() später aufgerufen wird, können wir sie wieder aktivieren. Das Pausieren einer externen Quelle als Reaktion auf das Nicht-mehr-positiv-Werden der gewünschten Größe wird als Anwenden von Backpressure bezeichnet.
Pull-Quelle: Wir fordern von einer solchen Quelle neue Daten an – oft asynchron. Daher tun wir normalerweise nicht viel in .start() und rufen Daten ab, wann immer .pull() aufgerufen wird.
Wir werden als Nächstes Beispiele für beide Arten von Quellen sehen.
Im folgenden Beispiel wickeln wir einen ReadableStream um einen Socket – der uns seine Daten pusht (er ruft uns an). Dieses Beispiel ist der Web-Stream-Spezifikation entnommen
function makeReadableBackpressureSocketStream(host, port) {
const socket = createBackpressureSocket(host, port);
return new ReadableStream({
start(controller) {
socket.ondata = event => {
controller.enqueue(event.data);
if (controller.desiredSize <= 0) {
// The internal queue is full, so propagate
// the backpressure signal to the underlying source.
socket.readStop();
}
};
socket.onend = () => controller.close();
socket.onerror = () => controller.error(
new Error('The socket errored!'));
},
pull() {
// This is called if the internal queue has been emptied, but the
// stream’s consumer still wants more data. In that case, restart
// the flow of data if we have previously paused it.
socket.readStart();
},
cancel() {
socket.close();
},
});
}Die Hilfsfunktion iterableToReadableStream() nimmt ein Iterable von Chunks und wandelt es in einen ReadableStream um
/**
* @param iterable an iterable (asynchronous or synchronous)
*/
function iterableToReadableStream(iterable) {
return new ReadableStream({
start() {
if (typeof iterable[Symbol.asyncIterator] === 'function') {
this.iterator = iterable[Symbol.asyncIterator]();
} else if (typeof iterable[Symbol.iterator] === 'function') {
this.iterator = iterable[Symbol.iterator]();
} else {
throw new Error('Not an iterable: ' + iterable);
}
},
async pull(controller) {
if (this.iterator === null) return;
// Sync iterators return non-Promise values,
// but `await` doesn’t mind and simply passes them on
const {value, done} = await this.iterator.next();
if (done) {
this.iterator = null;
controller.close();
return;
}
controller.enqueue(value);
},
cancel() {
this.iterator = null;
controller.close();
},
});
}Lassen Sie uns eine Async-Generatorfunktion verwenden, um ein asynchrones Iterable zu erstellen und dieses Iterable in einen ReadableStream umzuwandeln
async function* genAsyncIterable() {
yield 'how';
yield 'are';
yield 'you';
}
const readableStream = iterableToReadableStream(genAsyncIterable());
for await (const chunk of readableStream) {
console.log(chunk);
}
// Output:
// 'how'
// 'are'
// 'you'iterableToReadableStream() funktioniert auch mit synchronen Iterables
const syncIterable = ['hello', 'everyone'];
const readableStream = iterableToReadableStream(syncIterable);
for await (const chunk of readableStream) {
console.log(chunk);
}
// Output:
// 'hello'
// 'everyone'Es wird möglicherweise irgendwann eine statische Hilfsmethode ReadableStream.from() geben, die diese Funktionalität bietet (siehe ihr Pull Request für weitere Informationen).
WritableStreams ermöglichen es uns, Daten-Chunks in verschiedene Senken zu schreiben. Sie haben den folgenden Typ (scheuen Sie sich nicht, diesen Typ und die Erklärungen seiner Eigenschaften zu überfliegen; sie werden erneut erklärt, wenn wir ihnen in Beispielen begegnen)
interface WritableStream<TChunk> {
getWriter(): WritableStreamDefaultWriter<TChunk>;
readonly locked: boolean;
close(): Promise<void>;
abort(reason?: any): Promise<void>;
}Erklärungen zu diesen Eigenschaften
.getWriter() gibt einen Writer zurück – ein Objekt, über das wir in einen WritableStream schreiben können..locked: Es kann nur einen aktiven Writer pro WritableStream gleichzeitig geben. Solange ein Writer in Gebrauch ist, ist der WritableStream gesperrt und .getWriter() kann nicht aufgerufen werden..close() schließt den Stream.abort() bricht den Stream abDie folgenden Unterkapitel behandeln zwei Ansätze zum Senden von Daten an WritableStreams
Wir können Writer verwenden, um in WritableStreams zu schreiben. Sie haben den folgenden Typ (scheuen Sie sich nicht, diesen Typ und die Erklärungen seiner Eigenschaften zu überfliegen; sie werden erneut erklärt, wenn wir ihnen in Beispielen begegnen)
interface WritableStreamDefaultWriter<TChunk> {
readonly desiredSize: number | null;
readonly ready: Promise<undefined>;
write(chunk?: TChunk): Promise<void>;
releaseLock(): void;
close(): Promise<void>;
readonly closed: Promise<undefined>;
abort(reason?: any): Promise<void>;
}Erklärungen zu diesen Eigenschaften
.desiredSize gibt an, wie viel Platz in der Warteschlange dieses WriteStreams vorhanden ist. Sie ist null, wenn die Warteschlange voll ist, und negativ, wenn sie ihre maximale Größe überschritten hat. Daher müssen wir mit dem Schreiben aufhören, wenn die gewünschte Größe null oder negativ ist.
null..ready gibt ein Promise zurück, das erfüllt wird, wenn sich die gewünschte Größe von nicht-positiv zu positiv ändert. Das bedeutet, dass keine Backpressure aktiv ist und es in Ordnung ist, Daten zu schreiben. Wenn sich die gewünschte Größe später wieder in nicht-positiv ändert, wird ein neues ausstehendes Promise erstellt und zurückgegeben.
.write() schreibt einen Chunk in den Stream. Es gibt ein Promise zurück, das nach erfolgreichem Schreiben erfüllt und bei einem Fehler abgelehnt wird.
.releaseLock() gibt den Lock des Writers auf seinen Stream frei.
.close() hat die gleiche Wirkung wie das Schließen des Streams des Writers.
.closed gibt ein Promise zurück, das erfüllt wird, wenn der Stream geschlossen ist.
.abort() hat die gleiche Wirkung wie das Abbrechen des Streams des Writers.
Der folgende Code zeigt das Protokoll zur Verwendung von Write-Objekten
const writer = writableStream.getWriter(); // (A)
assert.equal(writableStream.locked, true); // (B)
try {
// Writing the chunks (explained later)
} finally {
writer.releaseLock(); // (C)
}Wir können nicht direkt in einen writableStream schreiben, wir müssen zuerst einen Writer erwerben (Zeile A). Jeder WritableStream kann höchstens einen Writer haben. Nachdem ein Writer erworben wurde, ist writableStream gesperrt (Zeile B). Bevor wir .getWriter() erneut aufrufen können, müssen wir .releaseLock() aufrufen (Zeile C).
Es gibt drei Ansätze zum Schreiben von Chunks.
.write() (ineffiziente Behandlung von Backpressure)Der erste Schreibansatz besteht darin, auf jedes Ergebnis von .write() zu warten
await writer.write('Chunk 1');
await writer.write('Chunk 2');
await writer.close();Das von .write() zurückgegebene Promise wird erfüllt, wenn der Chunk, den wir ihm übergeben haben, erfolgreich geschrieben wurde. Was genau "erfolgreich geschrieben" bedeutet, hängt davon ab, wie ein WritableStream implementiert ist – z.B. bei einem Dateistream könnte der Chunk an das Betriebssystem gesendet worden sein, aber immer noch in einem Cache liegen und daher noch nicht tatsächlich auf die Festplatte geschrieben worden sein.
Das von .close() zurückgegebene Promise wird erfüllt, wenn der Stream geschlossen wird.
Ein Nachteil dieses Schreibansatzes ist, dass das Warten, bis das Schreiben erfolgreich ist, bedeutet, dass die Warteschlange nicht genutzt wird. Infolgedessen kann der Datendurchsatz geringer sein.
.write()-Ablehnungen (Ignorieren von Backpressure)Beim zweiten Schreibansatz ignorieren wir die von .write() zurückgegebenen Promises und warten nur auf das von .close() zurückgegebene Promise
writer.write('Chunk 1').catch(() => {}); // (A)
writer.write('Chunk 2').catch(() => {}); // (B)
await writer.close(); // reports errorsDie synchronen Aufrufe von .write() fügen Chunks zur internen Warteschlange des WritableStreams hinzu. Indem wir die zurückgegebenen Promises nicht abwarten, warten wir nicht, bis jeder Chunk geschrieben ist. Das Abwarten von .close() stellt jedoch sicher, dass die Warteschlange leer ist und alle Schreibvorgänge erfolgreich waren, bevor wir fortfahren.
Das Aufrufen von .catch() in Zeile A und Zeile B ist notwendig, um Warnungen über nicht behandelte Promise-Ablehnungen zu vermeiden, wenn beim Schreiben etwas schief geht. Solche Warnungen werden oft in die Konsole geloggt. Wir können es uns leisten, die von .write() gemeldeten Fehler zu ignorieren, da .close() sie uns ebenfalls melden wird.
Der vorherige Code kann durch Verwendung einer Hilfsfunktion, die Promise-Ablehnungen ignoriert, verbessert werden
ignoreRejections(
writer.write('Chunk 1'),
writer.write('Chunk 2'),
);
await writer.close(); // reports errors
function ignoreRejections(...promises) {
for (const promise of promises) {
promise.catch(() => {});
}
}Ein Nachteil dieses Ansatzes ist, dass Backpressure ignoriert wird: Wir gehen einfach davon aus, dass die Warteschlange groß genug ist, um alles aufzunehmen, was wir schreiben.
.ready (effiziente Behandlung von Backpressure)In diesem Schreibansatz behandeln wir Backpressure effizient, indem wir auf den Writer-Getter .ready warten
await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 1').catch(() => {});
await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 2').catch(() => {});
await writer.close(); // reports errorsDas Promise in .ready wird erfüllt, wann immer der Stream von Backpressure zu keiner Backpressure wechselt.
In diesem Beispiel erstellen wir eine Textdatei data.txt über einen WritableStream
import * as fs from 'node:fs';
import {Writable} from 'node:stream';
const nodeWritable = fs.createWriteStream(
'new-file.txt', {encoding: 'utf-8'}); // (A)
const webWritableStream = Writable.toWeb(nodeWritable); // (B)
const writer = webWritableStream.getWriter();
try {
await writer.write('First line\n');
await writer.write('Second line\n');
await writer.close();
} finally {
writer.releaseLock()
}In Zeile A erstellen wir einen Node.js-Stream für die Datei data.txt. In Zeile B konvertieren wir diesen Stream in einen Web-Stream. Dann verwenden wir einen Writer, um Strings hineinzuschreiben.
Anstatt Writer zu verwenden, können wir auch zu WritableStreams schreiben, indem wir ReadableStreams an sie pipen
await readableStream.pipeTo(writableStream);Das von .pipeTo() zurückgegebene Promise wird erfüllt, wenn das Piping erfolgreich abgeschlossen ist.
Piping wird nach Abschluss oder Pause der aktuellen Aufgabe ausgeführt. Der folgende Code demonstriert dies
const readableStream = new ReadableStream({ // (A)
start(controller) {
controller.enqueue('First line\n');
controller.enqueue('Second line\n');
controller.close();
},
});
const writableStream = new WritableStream({ // (B)
write(chunk) {
console.log('WRITE: ' + JSON.stringify(chunk));
},
close() {
console.log('CLOSE WritableStream');
},
});
console.log('Before .pipeTo()');
const promise = readableStream.pipeTo(writableStream); // (C)
promise.then(() => console.log('Promise fulfilled'));
console.log('After .pipeTo()');
// Output:
// 'Before .pipeTo()'
// 'After .pipeTo()'
// 'WRITE: "First line\n"'
// 'WRITE: "Second line\n"'
// 'CLOSE WritableStream'
// 'Promise fulfilled'In Zeile A erstellen wir einen ReadableStream. In Zeile B erstellen wir einen WritableStream.
Wir sehen, dass .pipeTo() (Zeile C) sofort zurückkehrt. In einer neuen Aufgabe werden Chunks gelesen und geschrieben. Dann wird writableStream geschlossen und schließlich promise erfüllt.
Im folgenden Beispiel erstellen wir einen WritableStream für eine Datei und pipen einen ReadableStream dorthin
const webReadableStream = new ReadableStream({ // (A)
async start(controller) {
controller.enqueue('First line\n');
controller.enqueue('Second line\n');
controller.close();
},
});
const nodeWritable = fs.createWriteStream( // (B)
'data.txt', {encoding: 'utf-8'});
const webWritableStream = Writable.toWeb(nodeWritable); // (C)
await webReadableStream.pipeTo(webWritableStream); // (D)In Zeile A erstellen wir einen ReadableStream. In Zeile B erstellen wir einen Node.js-Stream für die Datei data.txt. In Zeile C konvertieren wir diesen Stream in einen Web-Stream. In Zeile D pipen wir unseren webReadableStream an den WritableStream für die Datei.
Im folgenden Beispiel schreiben wir zwei ReadableStreams in einen einzigen WritableStream.
function createReadableStream(prefix) {
return new ReadableStream({
async start(controller) {
controller.enqueue(prefix + 'chunk 1');
controller.enqueue(prefix + 'chunk 2');
controller.close();
},
});
}
const writableStream = new WritableStream({
write(chunk) {
console.log('WRITE ' + JSON.stringify(chunk));
},
close() {
console.log('CLOSE');
},
abort(err) {
console.log('ABORT ' + err);
},
});
await createReadableStream('Stream 1: ')
.pipeTo(writableStream, {preventClose: true}); // (A)
await createReadableStream('Stream 2: ')
.pipeTo(writableStream, {preventClose: true}); // (B)
await writableStream.close();
// Output
// 'WRITE "Stream 1: chunk 1"'
// 'WRITE "Stream 1: chunk 2"'
// 'WRITE "Stream 2: chunk 1"'
// 'WRITE "Stream 2: chunk 2"'
// 'CLOSE'Wir weisen .pipeTo() an, den WritableStream nach dem Schließen des ReadableStreams nicht zu schließen (Zeile A und Zeile B). Daher bleibt der WritableStream nach Zeile A offen und wir können einen weiteren ReadableStream daran pipen.
Wenn wir über einen WritableStream in eine externe Senke schreiben möchten, können wir sie in ein Adapterobjekt wickeln und dieses Objekt dem WritableStream-Konstruktor übergeben. Das Adapterobjekt wird als underlying sink des WritableStreams bezeichnet (Quing-Strategien werden später erklärt, wenn wir uns näher mit Backpressure befassen)
new WritableStream(underlyingSink?, queuingStrategy?)Dies ist die Art von zugrunde liegenden Sinks (können Sie diese Art und die Erklärungen ihrer Eigenschaften gerne überfliegen; sie werden wieder erklärt, wenn wir ihnen in Beispielen begegnen)
interface UnderlyingSink<TChunk> {
start?(
controller: WritableStreamDefaultController
): void | Promise<void>;
write?(
chunk: TChunk,
controller: WritableStreamDefaultController
): void | Promise<void>;
close?(): void | Promise<void>;;
abort?(reason?: any): void | Promise<void>;
}Erklärungen zu diesen Eigenschaften
.start(controller) wird unmittelbar nach dem Aufruf des Konstruktors von WritableStream aufgerufen. Wenn wir etwas Asynchrones tun, können wir ein Promise zurückgeben. In dieser Methode können wir uns auf das Schreiben vorbereiten.
.write(chunk, controller) wird aufgerufen, wenn ein neuer Chunk zum Schreiben an den externen Sink bereit ist. Wir können Backpressure ausüben, indem wir ein Promise zurückgeben, das erfüllt wird, sobald die Backpressure verschwunden ist.
.close() wird aufgerufen, nachdem writer.close() aufgerufen wurde und alle wartenden Schreibvorgänge erfolgreich waren. In dieser Methode können wir nach dem Schreiben aufräumen.
.abort(reason) wird aufgerufen, wenn writeStream.abort() oder writer.abort() aufgerufen wurden. reason ist der Wert, der an diese Methoden übergeben wird.
Der Parameter controller von .start() und .write() ermöglicht es ihnen, den WritableStream zu fehlerhaften. Er hat den folgenden Typ
interface WritableStreamDefaultController {
readonly signal: AbortSignal;
error(err?: any): void;
}.signal ist ein AbortSignal, auf das wir hören können, wenn wir einen Schreib- oder Schließvorgang abbrechen möchten, wenn der Stream abgebrochen wird..error(err) versetzt den WritableStream in einen Fehlerzustand: Er wird geschlossen und alle zukünftigen Interaktionen damit schlagen mit dem Fehlerwert err fehl.Im nächsten Beispiel leiten wir einen ReadableStream an einen WritableStream weiter, um zu überprüfen, wie der ReadableStream Chunks produziert.
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue('First chunk');
controller.enqueue('Second chunk');
controller.close();
},
});
await readableStream.pipeTo(
new WritableStream({
write(chunk) {
console.log('WRITE ' + JSON.stringify(chunk));
},
close() {
console.log('CLOSE');
},
abort(err) {
console.log('ABORT ' + err);
},
})
);
// Output:
// 'WRITE "First chunk"'
// 'WRITE "Second chunk"'
// 'CLOSE'Im nächsten Beispiel erstellen wir eine Unterklasse von WriteStream, die alle geschriebenen Chunks in einem String sammelt. Wir können auf diesen String über die Methode .getString() zugreifen.
class StringWritableStream extends WritableStream {
#string = '';
constructor() {
super({
// We need to access the `this` of `StringWritableStream`.
// Hence the arrow function (and not a method).
write: (chunk) => {
this.#string += chunk;
},
});
}
getString() {
return this.#string;
}
}
const stringStream = new StringWritableStream();
const writer = stringStream.getWriter();
try {
await writer.write('How are');
await writer.write(' you?');
await writer.close();
} finally {
writer.releaseLock()
}
assert.equal(
stringStream.getString(),
'How are you?'
);Ein Nachteil dieses Ansatzes ist, dass wir zwei APIs mischen: die API von WritableStream und unsere neue String-Stream-API. Eine Alternative ist, an den WritableStream zu delegieren, anstatt ihn zu erweitern.
function StringcreateWritableStream() {
let string = '';
return {
stream: new WritableStream({
write(chunk) {
string += chunk;
},
}),
getString() {
return string;
},
};
}
const stringStream = StringcreateWritableStream();
const writer = stringStream.stream.getWriter();
try {
await writer.write('How are');
await writer.write(' you?');
await writer.close();
} finally {
writer.releaseLock()
}
assert.equal(
stringStream.getString(),
'How are you?'
);Diese Funktionalität könnte auch über eine Klasse (anstatt als Factory-Funktion für Objekte) implementiert werden.
Ein TransformStream
Die gebräuchlichste Art, TransformStreams zu verwenden, ist, sie “durchzuleiten” (pipe through).
const transformedStream = readableStream.pipeThrough(transformStream);.pipeThrough() leitet readableStream an die beschreibbare Seite von transformStream weiter und gibt deren lesbare Seite zurück. Mit anderen Worten: Wir haben einen neuen ReadableStream erstellt, der eine transformierte Version von readableStream ist.
.pipeThrough() akzeptiert nicht nur TransformStreams, sondern jedes Objekt, das die folgende Form hat:
interface ReadableWritablePair<RChunk, WChunk> {
readable: ReadableStream<RChunk>;
writable: WritableStream<WChunk>;
}Node.js unterstützt die folgenden Standard-TransformStreams:
TextEncoderStream und TextDecoderStreamTextDecoderStream behandelt diese Fälle korrekt.TextEncoderStream, TextDecoderStream).CompressionStream, DecompressionStreamdeflate (ZLIB Compressed Data Format), deflate-raw (DEFLATE-Algorithmus), gzip (GZIP-Dateiformat).CompressionStream, DecompressionStream).Im folgenden Beispiel dekodieren wir einen Stream von UTF-8-kodierten Bytes.
const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream
.pipeThrough(new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
console.log(stringChunk);
}response.body ist ein ReadableByteStream, dessen Chunks Instanzen von Uint8Array sind (TypedArrays). Wir leiten diesen Stream durch einen TextDecoderStream, um einen Stream mit String-Chunks zu erhalten.
Beachten Sie, dass das separate Übersetzen jedes Byte-Chunks (z. B. über einen TextDecoder) nicht funktioniert, da ein einzelner Unicode-Codepunkt in UTF-8 als bis zu vier Bytes kodiert wird und diese Bytes möglicherweise nicht alle im selben Chunk enthalten sind.
Das folgende Node.js-Modul protokolliert alles, was ihm über die Standardeingabe gesendet wird.
// echo-stdin.mjs
import {Readable} from 'node:stream';
const webStream = Readable.toWeb(process.stdin)
.pipeThrough(new TextDecoderStream('utf-8'));
for await (const chunk of webStream) {
console.log('>>>', chunk);
}Wir können auf die Standardeingabe über einen Stream zugreifen, der in process.stdin gespeichert ist (process ist eine globale Node.js-Variable). Wenn wir keine Kodierung für diesen Stream festlegen und ihn über Readable.toWeb() konvertieren, erhalten wir einen Byte-Stream. Wir leiten ihn durch einen TextDecoderStream, um einen Text-Stream zu erhalten.
Beachten Sie, dass wir die Standardeingabe inkrementell verarbeiten: Sobald ein weiterer Chunk verfügbar ist, protokollieren wir ihn. Mit anderen Worten, wir warten nicht, bis die Standardeingabe abgeschlossen ist. Das ist nützlich, wenn die Daten entweder groß sind oder nur intermittierend gesendet werden.
Wir können einen benutzerdefinierten TransformStream implementieren, indem wir ein Transformer-Objekt an den Konstruktor von TransformStream übergeben. Ein solches Objekt hat den folgenden Typ (können Sie diesen Typ und die Erklärungen seiner Eigenschaften gerne überfliegen; sie werden wieder erklärt, wenn wir ihnen in Beispielen begegnen).
interface Transformer<TInChunk, TOutChunk> {
start?(
controller: TransformStreamDefaultController<TOutChunk>
): void | Promise<void>;
transform?(
chunk: TInChunk,
controller: TransformStreamDefaultController<TOutChunk>
): void | Promise<void>;
flush?(
controller: TransformStreamDefaultController<TOutChunk>
): void | Promise<void>;
}Erklärungen zu diesen Eigenschaften
.start(controller) wird unmittelbar nach dem Aufruf des Konstruktors von TransformStream aufgerufen. Hier können wir Dinge vorbereiten, bevor die Transformationen beginnen..transform(chunk, controller) führt die eigentlichen Transformationen durch. Er empfängt einen Eingabe-Chunk und kann seinen Parameter controller verwenden, um einen oder mehrere transformierte Ausgabe-Chunks einzureihen. Er kann auch wählen, nichts einzureihen..flush(controller) wird aufgerufen, nachdem alle Eingabe-Chunks erfolgreich transformiert wurden. Hier können wir nach Abschluss der Transformationen aufräumen.Jede dieser Methoden kann ein Promise zurückgeben, und erst wenn das Promise erfüllt oder abgelehnt ist, werden weitere Schritte unternommen. Das ist nützlich, wenn wir etwas Asynchrones tun möchten.
Der Parameter controller hat den folgenden Typ.
interface TransformStreamDefaultController<TOutChunk> {
enqueue(chunk?: TOutChunk): void;
readonly desiredSize: number | null;
terminate(): void;
error(err?: any): void;
}.enqueue(chunk) fügt chunk zur lesbaren Seite (Ausgabe) des TransformStream hinzu..desiredSize gibt die gewünschte Größe der internen Warteschlange der lesbaren Seite (Ausgabe) des TransformStream zurück..terminate() schließt die lesbare Seite (Ausgabe) und versetzt die beschreibbare Seite (Eingabe) des TransformStream in einen Fehlerzustand. Sie kann verwendet werden, wenn ein Transformer nicht an den verbleibenden Chunks der beschreibbaren Seite (Eingabe) interessiert ist und diese überspringen möchte..error(err) versetzt den TransformStream in einen Fehlerzustand: Alle zukünftigen Interaktionen damit schlagen mit dem Fehlerwert err fehl.Was ist mit Backpressure in einem TransformStream? Die Klasse leitet die Backpressure von ihrer lesbaren Seite (Ausgabe) an ihre beschreibbare Seite (Eingabe) weiter. Die Annahme ist, dass die Transformation die Datenmenge nicht wesentlich verändert. Daher können Transfomers davon absehen, Backpressure zu ignorieren. Sie könnte jedoch über transformStreamDefaultController.desiredSize erkannt und durch Rückgabe eines Promises von transformer.transform() weitergegeben werden.
Die folgende Unterklasse von TransformStream konvertiert einen Stream mit beliebigen Chunks in einen Stream, bei dem jeder Chunk genau eine Textzeile enthält. Das heißt, mit Ausnahme des letzten Chunks endet jeder Chunk mit einem Zeilenende (EOL)-String: '\n' unter Unix (inkl. macOS) und '\r\n' unter Windows.
class ChunksToLinesTransformer {
#previous = '';
transform(chunk, controller) {
let startSearch = this.#previous.length;
this.#previous += chunk;
while (true) {
// Works for EOL === '\n' and EOL === '\r\n'
const eolIndex = this.#previous.indexOf('\n', startSearch);
if (eolIndex < 0) break;
// Line includes the EOL
const line = this.#previous.slice(0, eolIndex+1);
controller.enqueue(line);
this.#previous = this.#previous.slice(eolIndex+1);
startSearch = 0;
}
}
flush(controller) {
// Clean up and enqueue any text we’re still holding on to
if (this.#previous.length > 0) {
controller.enqueue(this.#previous);
}
}
}
class ChunksToLinesStream extends TransformStream {
constructor() {
super(new ChunksToLinesTransformer());
}
}
const stream = new ReadableStream({
async start(controller) {
controller.enqueue('multiple\nlines of\ntext');
controller.close();
},
});
const transformStream = new ChunksToLinesStream();
const transformed = stream.pipeThrough(transformStream);
for await (const line of transformed) {
console.log('>>>', JSON.stringify(line));
}
// Output:
// '>>> "multiple\n"'
// '>>> "lines of\n"'
// '>>> "text"'Beachten Sie, dass Deno's integrierter TextLineStream ähnliche Funktionalität bietet.
Tipp: Wir können diese Transformation auch über einen asynchronen Generator realisieren. Er würde asynchron über einen ReadableStream iterieren und ein asynchrones Iterable mit Zeilen zurückgeben. Seine Implementierung wird in §9.4 “Transforming readable streams via async generators” gezeigt.
Aufgrund der Tatsache, dass ReadableStreams asynchron iterierbar sind, können wir asynchrone Generatoren verwenden, um sie zu transformieren. Das führt zu sehr elegantem Code.
const stream = new ReadableStream({
async start(controller) {
controller.enqueue('one');
controller.enqueue('two');
controller.enqueue('three');
controller.close();
},
});
async function* prefixChunks(prefix, asyncIterable) {
for await (const chunk of asyncIterable) {
yield '> ' + chunk;
}
}
const transformedAsyncIterable = prefixChunks('> ', stream);
for await (const transformedChunk of transformedAsyncIterable) {
console.log(transformedChunk);
}
// Output:
// '> one'
// '> two'
// '> three'Werfen wir einen genaueren Blick auf Backpressure. Betrachten Sie die folgende Pipe-Kette:
rs.pipeThrough(ts).pipeTo(ws);rs ist ein ReadableStream, ts ist ein TransformStream, ws ist ein WritableStream. Dies sind die Verbindungen, die durch den vorherigen Ausdruck erstellt werden (.pipeThrough verwendet .pipeTo, um rs mit der beschreibbaren Seite von ts zu verbinden).
rs -pipeTo-> ts{writable,readable} -pipeTo-> ws
Beobachtungen
rs kann als Mitglied einer Pipe-Kette betrachtet werden, das vor rs kommt.ws kann als Mitglied einer Pipe-Kette betrachtet werden, das nach ws kommt.Nehmen wir an, dass der zugrunde liegende Sink von ws langsam ist und der Puffer von ws schließlich voll ist. Dann geschehen die folgenden Schritte:
ws signalisiert, dass es voll ist.pipeTo stoppt das Lesen von ts.readable.ts.readable signalisiert, dass es voll ist.ts hört auf, Chunks von ts.writable nach ts.readable zu verschieben.ts.writable signalisiert, dass es voll ist.pipeTo stoppt das Lesen von rs.rs signalisiert, dass es für seine zugrunde liegende Quelle voll ist.Dieses Beispiel veranschaulicht, dass wir zwei Arten von Funktionalität benötigen:
Lassen Sie uns untersuchen, wie diese Funktionalitäten in der Web Streams API implementiert sind.
Backpressure wird von den Daten empfangenden Entitäten signalisiert. Web Streams haben zwei solcher Entitäten:
.write()..enqueue() aufruft.In beiden Fällen werden die Eingaben über Warteschlangen gepuffert. Das Signal zum Anwenden von Backpressure ist, wenn eine Warteschlange voll ist. Lassen Sie uns sehen, wie das erkannt werden kann.
Hier sind die Speicherorte der Warteschlangen:
Die *gewünschte Größe* (desired size) einer Warteschlange ist eine Zahl, die angibt, wie viel Platz in der Warteschlange noch übrig ist.
Daher müssen wir Backpressure anwenden, wenn die gewünschte Größe null oder kleiner ist. Sie ist über den Getter .desiredSize des Objekts verfügbar, das die Warteschlange enthält.
Wie wird die gewünschte Größe berechnet? Über ein Objekt, das eine sogenannte *Queuing-Strategie* spezifiziert. ReadableStream und WritableStream haben Standard-Queuing-Strategien, die über optionale Parameter ihrer Konstruktoren überschrieben werden können. Die Schnittstelle QueuingStrategy hat zwei Eigenschaften:
.size(chunk) gibt eine Größe für chunk zurück..highWaterMark gibt die maximale Größe einer Warteschlange an.Die gewünschte Größe einer Warteschlange ist die High Water Mark abzüglich der aktuellen Größe der Warteschlange.
Daten sendende Entitäten müssen auf signalisierte Backpressure reagieren, indem sie Backpressure ausüben.
Wir können auf das Promise in writer.ready warten. Währenddessen sind wir blockiert und die gewünschte Backpressure wird erreicht. Das Promise wird erfüllt, sobald Platz in der Warteschlange ist. Die Erfüllung wird ausgelöst, wenn writer.desiredSize einen Wert größer als null hat.
Alternativ können wir auf das von writer.write() zurückgegebene Promise warten. Wenn wir das tun, wird die Warteschlange gar nicht erst gefüllt.
Wenn wir möchten, können wir zusätzlich die Größe unserer Chunks basierend auf writer.desiredSize festlegen.
Das zugrunde liegende Source-Objekt, das an einen ReadableStream übergeben werden kann, umschließt eine externe Quelle. In gewisser Weise ist es auch ein Mitglied der Pipe-Kette; eines, das vor seinem ReadableStream kommt.
Unterliegende Pull-Quellen werden nur dann nach neuen Daten gefragt, wenn Platz in der Warteschlange ist. Solange dies nicht der Fall ist, wird automatisch Backpressure ausgeübt, da keine Daten gezogen werden.
Unterliegende Push-Quellen sollten controller.desiredSize überprüfen, nachdem sie etwas eingereiht haben: Wenn es null oder weniger ist, sollten sie Backpressure ausüben, indem sie ihre externen Quellen pausieren.
Das zugrunde liegende Sink-Objekt, das an einen WritableStream übergeben werden kann, umschließt einen externen Sink. In gewisser Weise ist es auch ein Mitglied der Pipe-Kette; eines, das nach seinem WritableStream kommt.
Jeder externe Sink signalisiert Backpressure unterschiedlich (in einigen Fällen gar nicht). Der zugrunde liegende Sink kann Backpressure ausüben, indem er aus der Methode .write() ein Promise zurückgibt, das erfüllt wird, sobald das Schreiben abgeschlossen ist. Es gibt ein Beispiel im Web Streams Standard, das zeigt, wie das funktioniert.
.writable → .readable)Der TransformStream verbindet seine beschreibbare Seite mit seiner lesbaren Seite, indem er einen zugrunde liegenden Sink für erstere und eine zugrunde liegende Quelle für letztere implementiert. Er hat einen internen Slot .[[backpressure]], der anzeigt, ob interne Backpressure derzeit aktiv ist oder nicht.
Die Methode .write() des zugrunde liegenden Sinks der beschreibbaren Seite wartet asynchron, bis keine interne Backpressure mehr vorhanden ist, bevor sie einen weiteren Chunk an den Transformer des TransformStream füttert (Web Streams Standard: TransformStreamDefaultSinkWriteAlgorithm). Der Transformer kann dann etwas über seinen TransformStreamDefaultController einreihen. Beachten Sie, dass .write() ein Promise zurückgibt, das erfüllt wird, wenn die Methode abgeschlossen ist. Bis dahin puffert der WriteStream eingehende Schreibanforderungen über seine Warteschlange. Daher wird Backpressure für die beschreibbare Seite über diese Warteschlange und ihre gewünschte Größe signalisiert.
Die Backpressure des TransformStream wird aktiviert, wenn ein Chunk über den TransformStreamDefaultController eingereiht wird und die Warteschlange der lesbaren Seite voll wird (Web Streams Standard: TransformStreamDefaultControllerEnqueue).
Die Backpressure des TransformStream kann deaktiviert werden, wenn etwas vom Reader gelesen wird (Web Streams Standard: ReadableStreamDefaultReaderRead).
.pull() der zugrunde liegenden Quelle aufzurufen (Web Streams Standard: .[[PullSteps]])..pull() der zugrunde liegenden Quelle der lesbaren Seite deaktiviert die Backpressure (Web Streams Standard: TransformStreamDefaultSourcePullAlgorithm)..pipeTo() (ReadableStream → WritableStream).pipeTo() liest Chunks vom ReadableStream über einen Reader und schreibt sie über einen Writer in den WritableStream. Er pausiert, wann immer writer.desiredSize null oder kleiner ist (Web Streams Standard: Schritt 15 von ReadableStreamPipeTo).
Bisher haben wir nur mit *Text-Streams* gearbeitet, Streams, deren Chunks Strings waren. Aber die Web Streams API unterstützt auch *Byte-Streams* für Binärdaten, bei denen Chunks Uint8Arrays sind (TypedArrays).
ReadableStream hat einen speziellen 'bytes'-Modus.WritableStream selbst kümmert sich nicht darum, ob Chunks Strings oder Uint8Arrays sind. Daher hängt es davon ab, welche Art von Chunks der zugrunde liegende Sink verarbeiten kann, ob eine Instanz ein Text-Stream oder ein Byte-Stream ist.TransformStream verarbeiten kann, hängt auch von seinem Transformer ab.Als Nächstes lernen wir, wie man lesbare Byte-Streams erstellt.
Welche Art von Stream vom ReadableStream-Konstruktor erstellt wird, hängt von der optionalen Eigenschaft .type des optionalen ersten Parameters underlyingSource ab.
Wenn .type weggelassen wird oder keine zugrunde liegende Quelle angegeben wird, ist die neue Instanz ein Text-Stream.
Wenn .type der String 'bytes' ist, ist die neue Instanz ein Byte-Stream.
const readableByteStream = new ReadableStream({
type: 'bytes',
async start() { /*...*/ }
// ...
});Was ändert sich, wenn ein ReadableStream im 'bytes'-Modus ist?
Im Standardmodus kann die zugrunde liegende Quelle jede Art von Chunk zurückgeben. Im Byte-Modus müssen die Chunks ArrayBufferViews sein, d. h. TypedArrays (wie Uint8Arrays) oder DataViews.
Zusätzlich kann ein lesbarer Byte-Stream zwei Arten von Readern erstellen:
.getReader() gibt eine Instanz von ReadableStreamDefaultReader zurück..getReader({mode: 'byob'}) gibt eine Instanz von ReadableStreamBYOBReader zurück.“BYOB” steht für “Bring Your Own Buffer” und bedeutet, dass wir einen Puffer (einen ArrayBufferView) an reader.read() übergeben können. Danach wird dieser ArrayBufferView freigegeben und kann nicht mehr verwendet werden. Aber .read() gibt seine Daten in einem neuen ArrayBufferView zurück, der denselben Typ hat und auf denselben Bereich desselben ArrayBuffers zugreift.
Zusätzlich haben lesbare Byte-Streams unterschiedliche Controller: Sie sind Instanzen von ReadableByteStreamController (im Gegensatz zu ReadableStreamDefaultController). Abgesehen davon, dass zugrunde liegende Quellen gezwungen werden, ArrayBufferViews (TypedArrays oder DataViews) einzureihen, unterstützt er auch ReadableStreamBYOBReaders über seine Eigenschaft .byobRequest. Eine zugrunde liegende Quelle schreibt ihre Daten in die BYOBRequest, die in dieser Eigenschaft gespeichert ist. Der Web Streams Standard hat zwei Beispiele für die Verwendung von .byobRequest in seinem Abschnitt “Examples of creating streams”.
Im nächsten Beispiel erstellen wir einen unendlichen lesbaren Byte-Stream, der seine Chunks mit Zufallsdaten füllt (Inspiration: example4.mjs in “Implementing the Web Streams API in Node.js”).
import {promisify} from 'node:util';
import {randomFill} from 'node:crypto';
const asyncRandomFill = promisify(randomFill);
const readableByteStream = new ReadableStream({
type: 'bytes',
async pull(controller) {
const byobRequest = controller.byobRequest;
await asyncRandomFill(byobRequest.view);
byobRequest.respond(byobRequest.view.byteLength);
},
});
const reader = readableByteStream.getReader({mode: 'byob'});
const buffer = new Uint8Array(10); // (A)
const firstChunk = await reader.read(buffer); // (B)
console.log(firstChunk);Da readableByteStream unendlich ist, können wir ihn nicht durchlaufen. Deshalb lesen wir nur seinen ersten Chunk (Zeile B).
Der Puffer, den wir in Zeile A erstellen, wird übertragen und ist daher nach Zeile B unlesbar.
Im folgenden Beispiel erstellen wir einen lesbaren Byte-Stream und leiten ihn durch einen Stream, der ihn in das GZIP-Format komprimiert.
const readableByteStream = new ReadableStream({
type: 'bytes',
start(controller) {
// 256 zeros
controller.enqueue(new Uint8Array(256));
controller.close();
},
});
const transformedStream = readableByteStream.pipeThrough(
new CompressionStream('gzip'));
await logChunks(transformedStream);
async function logChunks(readableByteStream) {
const reader = readableByteStream.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) break;
console.log(value);
}
} finally {
reader.releaseLock();
}
}fetch()Das Ergebnis von fetch() wird zu einem Response-Objekt aufgelöst, dessen Eigenschaft .body ein lesbarer Byte-Stream ist. Wir konvertieren diesen Byte-Stream in einen Text-Stream über TextDecoderStream.
const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream.pipeThrough(
new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
console.log(stringChunk);
}Node.js ist die einzige Web-Plattform, die die folgenden Hilfsfunktionen unterstützt, die sie als Utility Consumers bezeichnet.
import {
arrayBuffer,
blob,
buffer,
json,
text,
} from 'node:stream/consumers';Diese Funktionen konvertieren Web ReadableStreams, Node.js Readables und AsyncIteratoren in Promises, die mit Folgendem erfüllt werden:
arrayBuffer())blob())buffer())json())text())Binärdaten werden als UTF-8-kodiert angenommen.
import * as streamConsumers from 'node:stream/consumers';
const readableByteStream = new ReadableStream({
type: 'bytes',
start(controller) {
// TextEncoder converts strings to UTF-8 encoded Uint8Arrays
const encoder = new TextEncoder();
const view = encoder.encode('"😀"');
assert.deepEqual(
view,
Uint8Array.of(34, 240, 159, 152, 128, 34)
);
controller.enqueue(view);
controller.close();
},
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');String-Streams funktionieren wie erwartet.
import * as streamConsumers from 'node:stream/consumers';
const readableByteStream = new ReadableStream({
start(controller) {
controller.enqueue('"😀"');
controller.close();
},
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');Alle in diesem Abschnitt genannten Materialien waren eine Quelle für dieses Kapitel.
Dieses Kapitel deckt nicht alle Aspekte der Web Streams API ab. Weitere Informationen finden Sie hier:
Weiteres Material