readableToString()Dieses Kapitel ist eine Einführung in die nativen Streams von Node. Sie unterstützen asynchrone Iteration, was die Arbeit damit erleichtert und was wir in diesem Kapitel hauptsächlich verwenden werden.
Beachten Sie, dass plattformübergreifende Web Streams in §10 „Web Streams auf Node.js verwenden“ behandelt werden. Wir werden diese hauptsächlich in diesem Buch verwenden. Daher können Sie das aktuelle Kapitel überspringen, wenn Sie möchten.
Asynchrone Iteration ist ein Protokoll zum asynchronen Abrufen der Inhalte eines Datenbehälters (was bedeutet, dass die aktuelle „Aufgabe“ vor dem Abrufen eines Elements pausiert werden kann).
Asynchrone Generatoren helfen bei der asynchronen Iteration. Zum Beispiel ist dies eine asynchrone Generatorfunktion
/**
* @returns an asynchronous iterable
*/
async function* asyncGenerator(asyncIterable) {
for await (const item of asyncIterable) { // input
if (···) {
yield '> ' + item; // output
}
}
}for-await-of Schleife iteriert über das Eingabe-asyncIterable. Diese Schleife ist auch in normalen asynchronen Funktionen verfügbar.yield liefert Werte in das vom Generator zurückgegebene asynchrone Iterable ein.Achten Sie im Rest des Kapitels genau darauf, ob eine Funktion eine async-Funktion oder eine async-Generatorfunktion ist.
/** @returns a Promise */
async function asyncFunction() { /*···*/ }
/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }Ein Stream ist ein Muster, dessen Kernidee darin besteht, eine große Datenmenge zu „teilen und erobern“: Wir können sie handhaben, wenn wir sie in kleinere Teile aufteilen und jeweils einen Teil verarbeiten.
Node.js unterstützt verschiedene Arten von Streams – zum Beispiel
Readable Streams sind Streams, aus denen wir Daten lesen können. Mit anderen Worten, sie sind Datenquellen. Ein Beispiel ist ein lesbarer Dateistream, mit dem wir den Inhalt einer Datei lesen können.
Writable Streams sind Streams, in die wir Daten schreiben können. Mit anderen Worten, sie sind Senken für Daten. Ein Beispiel ist ein beschreibbarer Dateistream, mit dem wir Daten in eine Datei schreiben können.
Ein Transform Stream ist sowohl lesbar als auch beschreibbar. Als beschreibbarer Stream empfängt er Datenstücke, transformiert (ändert oder verwirft) sie und gibt sie dann als lesbaren Stream aus.
Um gestreamte Daten in mehreren Schritten zu verarbeiten, können wir Streams pipen (verbinden)
Teil (2) ist optional.
Beim Erstellen von Text-Streams ist es am besten, immer eine Kodierung anzugeben.
Die Node.js-Dokumentation enthält eine Liste unterstützter Kodierungen und ihrer Standardbezeichnungen – zum Beispiel
'utf8''utf16le''base64'Es sind auch einige verschiedene Schreibweisen erlaubt. Sie können Buffer.isEncoding() verwenden, um zu prüfen, welche das sind.
> buffer.Buffer.isEncoding('utf8')
true
> buffer.Buffer.isEncoding('utf-8')
true
> buffer.Buffer.isEncoding('UTF-8')
true
> buffer.Buffer.isEncoding('UTF:8')
falseDer Standardwert für Kodierungen ist null, was 'utf8' entspricht.
readableToString()Wir werden gelegentlich die folgende Hilfsfunktion verwenden. Sie müssen nicht verstehen, wie sie funktioniert, nur (ungefähr), was sie tut.
import * as stream from 'stream';
/**
* Reads all the text in a readable stream and returns it as a string,
* via a Promise.
* @param {stream.Readable} readable
*/
function readableToString(readable) {
return new Promise((resolve, reject) => {
let data = '';
readable.on('data', function (chunk) {
data += chunk;
});
readable.on('end', function () {
resolve(data);
});
readable.on('error', function (err) {
reject(err);
});
});
}Diese Funktion wird über die ereignisbasierte API implementiert. Später werden wir eine einfachere Methode dafür sehen – über asynchrone Iteration.
await, das auf der obersten Ebene verwendet wird. In diesem Fall stellen wir uns vor, dass wir uns innerhalb eines Moduls oder innerhalb des Körpers einer async-Funktion befinden.'\n' (LF)'\r\n' (CR LF)EOL im Modul os abgerufen werden.Wir können fs.createReadStream() verwenden, um Readable Streams zu erstellen.
import * as fs from 'fs';
const readableStream = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
assert.equal(
await readableToString(readableStream),
'This is a test!\n');Readable.from(): Erstellen von Readable Streams aus IterablesDie statische Methode Readable.from(iterable, options?) erstellt einen lesbaren Stream, der die im iterable enthaltenen Daten enthält. iterable kann ein synchrones oder asynchrones Iterable sein. Der Parameter options ist optional und kann unter anderem zur Angabe einer Textkodierung verwendet werden.
import * as stream from 'stream';
function* gen() {
yield 'One line\n';
yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
assert.equal(
await readableToString(readableStream),
'One line\nAnother line\n');Readable.from() akzeptiert jedes Iterable und kann daher auch verwendet werden, um Strings in Streams umzuwandeln.
import {Readable} from 'stream';
const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
assert.equal(
await readableToString(readable),
'Some text!');Derzeit behandelt Readable.from() einen String wie jedes andere Iterable und iteriert daher über seine Codepunkte. Das ist aus Leistungssicht nicht ideal, sollte aber für die meisten Anwendungsfälle in Ordnung sein. Ich gehe davon aus, dass Readable.from() häufig mit Strings verwendet wird, daher wird es in Zukunft möglicherweise Optimierungen geben.
for-await-ofJeder lesbare Stream ist asynchron iterierbar, was bedeutet, dass wir eine for-await-of Schleife verwenden können, um seinen Inhalt zu lesen.
import * as fs from 'fs';
async function logChunks(readable) {
for await (const chunk of readable) {
console.log(chunk);
}
}
const readable = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);
// Output:
// 'This is a test!\n'Die folgende Funktion ist eine einfachere Neuimplementierung der Funktion, die wir zu Beginn dieses Kapitels gesehen haben.
import {Readable} from 'stream';
async function readableToString2(readable) {
let result = '';
for await (const chunk of readable) {
result += chunk;
}
return result;
}
const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');Beachten Sie, dass wir in diesem Fall eine async-Funktion verwenden mussten, da wir ein Promise zurückgeben wollten.
'node:readlines'Das eingebaute Modul 'node:readline' ermöglicht es uns, Zeilen aus lesbaren Streams zu lesen.
import * as fs from 'node:fs';
import * as readline from 'node:readline/promises';
const filePath = process.argv[2]; // first command line argument
const rl = readline.createInterface({
input: fs.createReadStream(filePath, {encoding: 'utf-8'}),
});
for await (const line of rl) {
console.log('>', line);
}
rl.close();Asynchrone Iteration bietet eine elegante Alternative zu Transform Streams für die Verarbeitung von gestreamten Daten in mehreren Schritten.
Readable.from() in einen lesbaren Stream umwandeln (der später in einen beschreibbaren Stream gepiped werden kann).Zusammenfassend lässt sich sagen, dass dies die Teile solcher Verarbeitungspipelines sind
Im nächsten Beispiel sehen wir ein Beispiel für eine Verarbeitungspipeline, wie sie gerade erklärt wurde.
import {Readable} from 'stream';
/**
* @param chunkIterable An asynchronous or synchronous iterable
* over “chunks” (arbitrary strings)
* @returns An asynchronous iterable over “lines”
* (strings with at most one newline that always appears at the end)
*/
async function* chunksToLines(chunkIterable) {
let previous = '';
for await (const chunk of chunkIterable) {
let startSearch = previous.length;
previous += chunk;
while (true) {
// Works for EOL === '\n' and EOL === '\r\n'
const eolIndex = previous.indexOf('\n', startSearch);
if (eolIndex < 0) break;
// Line includes the EOL
const line = previous.slice(0, eolIndex+1);
yield line;
previous = previous.slice(eolIndex+1);
startSearch = 0;
}
}
if (previous.length > 0) {
yield previous;
}
}
async function* numberLines(lineIterable) {
let lineNumber = 1;
for await (const line of lineIterable) {
yield lineNumber + ' ' + line;
lineNumber++;
}
}
async function logLines(lineIterable) {
for await (const line of lineIterable) {
console.log(line);
}
}
const chunks = Readable.from(
'Text with\nmultiple\nlines.\n',
{encoding: 'utf8'});
await logLines(numberLines(chunksToLines(chunks))); // (A)
// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'Die Verarbeitungspipeline wird in Zeile A eingerichtet. Die Schritte sind
chunksToLines(): Von einem asynchronen Iterable mit Chunks zu einem asynchronen Iterable mit Zeilen.numberLines(): Von einem asynchronen Iterable mit Zeilen zu einem asynchronen Iterable mit nummerierten Zeilen.logLines(): Protokolliert die Elemente eines asynchronen Iterables.Beobachtung
chunksToLines() und numberLines() sind asynchrone Iterables. Deshalb sind es asynchrone Generatoren (erkennbar an async und *).logLines() ist ein asynchrones Iterable. Deshalb ist es eine async-Funktion (erkennbar an async).Wir können fs.createWriteStream() verwenden, um beschreibbare Streams zu erstellen.
const writableStream = fs.createWriteStream(
'tmp/log.txt', {encoding: 'utf8'});In diesem Abschnitt betrachten wir Ansätze zum Schreiben in einen beschreibbaren Stream.
.write().pipeline() aus dem Modul stream, um einen lesbaren Stream in den beschreibbaren Stream zu pipen.Um diese Ansätze zu demonstrieren, implementieren wir damit dieselbe Funktion writeIterableToFile().
Die Methode .pipe() von lesbaren Streams unterstützt ebenfalls das Pipelining, hat aber einen Nachteil und es ist besser, sie zu vermeiden.
writable.write(chunk)Beim Schreiben von Daten in Streams gibt es zwei Callback-basierte Mechanismen, die uns helfen.
'drain' signalisiert, dass der Backpressure beendet ist.finished() ruft einen Callback auf, wenn ein StreamIm folgenden Beispiel promisifizieren wir diese Mechanismen, damit wir sie über eine async-Funktion verwenden können.
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';
const finished = util.promisify(stream.finished); // (A)
async function writeIterableToFile(iterable, filePath) {
const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
for await (const chunk of iterable) {
if (!writable.write(chunk)) { // (B)
// Handle backpressure
await once(writable, 'drain');
}
}
writable.end(); // (C)
// Wait until done. Throws if there are errors.
await finished(writable);
}
await writeIterableToFile(
['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
'One line of text.\n');Die Standardversion von stream.finished() ist Callback-basiert, kann aber über util.promisify() (Zeile A) in eine Promise-basierte Version umgewandelt werden.
Wir haben die folgenden beiden Muster verwendet.
Schreiben in einen beschreibbaren Stream unter Berücksichtigung des Backpressures (Zeile B)
if (!writable.write(chunk)) {
await once(writable, 'drain');
}Schließen eines beschreibbaren Streams und Warten, bis das Schreiben abgeschlossen ist (Zeile C)
writable.end();
await finished(writable);stream.pipeline()In Zeile A verwenden wir eine promisifizierte Version von stream.pipeline(), um einen lesbaren Stream readable in einen beschreibbaren Stream writable zu pipen.
import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline);
async function writeIterableToFile(iterable, filePath) {
const readable = stream.Readable.from(
iterable, {encoding: 'utf8'});
const writable = fs.createWriteStream(filePath);
await pipeline(readable, writable); // (A)
}
await writeIterableToFile(
['One', ' line of text.\n'], 'tmp/log.txt');
// ···readable.pipe(destination)Die Methode readable.pipe() unterstützt ebenfalls das Pipelining, hat aber einen Vorbehalt: Wenn der lesbare Stream einen Fehler auslöst, wird der beschreibbare Stream nicht automatisch geschlossen. pipeline() hat diesen Vorbehalt nicht.
Modul os
const EOL: string (seit 0.7.8)
Enthält die Zeilenende-Zeichensequenz, die von der aktuellen Plattform verwendet wird.
Modul buffer
Buffer.isEncoding(encoding: string): boolean (seit 0.9.1)
Gibt true zurück, wenn encoding korrekt eine der unterstützten Node.js-Kodierungen für Text benennt. Unterstützte Kodierungen umfassen
'utf8''utf16le''ascii''latin1'base64''hex' (jedes Byte als zwei Hexadezimalzeichen)Modul stream
Readable.prototype[Symbol.asyncIterator](): AsyncIterableIterator<any> (seit 10.0.0)
Readable Streams sind asynchron iterierbar. Sie können zum Beispiel for-await-of Schleifen in async-Funktionen oder async-Generatoren verwenden, um darüber zu iterieren.
finished(stream: ReadableStream | WritableStream | ReadWriteStream, callback: (err?: ErrnoException | null) => void): () => Promise<void> (seit 10.0.0)
Das zurückgegebene Promise wird erfüllt, wenn das Lesen/Schreiben abgeschlossen ist oder ein Fehler aufgetreten ist.
Diese promisifizierte Version wird wie folgt erstellt.
const finished = util.promisify(stream.finished);pipeline(...streams: Array<ReadableStream|ReadWriteStream|WritableStream>): Promise<void> (seit 10.0.0)
Pipes zwischen Streams. Das zurückgegebene Promise wird erfüllt, wenn die Pipeline abgeschlossen ist oder ein Fehler aufgetreten ist.
Diese promisifizierte Version wird wie folgt erstellt.
const pipeline = util.promisify(stream.pipeline);Readable.from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable (seit 12.3.0)
Konvertiert ein Iterable in einen lesbaren Stream.
interface ReadableOptions {
highWaterMark?: number;
encoding?: string;
objectMode?: boolean;
read?(this: Readable, size: number): void;
destroy?(this: Readable, error: Error | null,
callback: (error: Error | null) => void): void;
autoDestroy?: boolean;
}Diese Optionen sind dieselben wie die Optionen für den Readable Konstruktor und dort dokumentiert.
Modul fs
createReadStream(path: string | Buffer | URL, options?: string | {encoding?: string; start?: number}): ReadStream (seit 2.3.0)
Erstellt einen lesbaren Stream. Weitere Optionen sind verfügbar.
createWriteStream(path: PathLike, options?: string | {encoding?: string; flags?: string; mode?: number; start?: number}): WriteStream (seit 2.3.0)
Mit der Option .flags können Sie angeben, ob Sie schreiben oder anhängen möchten und was passiert, wenn eine Datei existiert oder nicht existiert. Weitere Optionen sind verfügbar.
Die statischen Typinformationen in diesem Abschnitt basieren auf Definitely Typed.