Node.js Streams
1. Streams(ストリーム)とは?
Node.js において、Streams(ストリーム) はデータの集合を扱うための仕組みです。最大の特徴は、データが一度にすべて揃っている必要がなく、またデータ全体がメモリに収まる必要もないという点です。
ストリームは、データをある場所から別の場所へと運ぶ 「ベルトコンベア」 のようなものだと考えてください。データセット全体が揃うのを待つのではなく、到着したデータ片から順次処理を行うことができます。
ストリームは Node.js の中でも特に強力な機能の一つであり、以下のような場面で広く活用されています。
- File System 操作(ファイルの読み書き)
- HTTP リクエストとレスポンス
- データの圧縮・解凍
- データベース操作
- リアルタイムデータ処理
2. ストリームを使い始める
ストリームは、データを効率的に扱うための Node.js の基本概念です。すべてを一度にメモリにロードするのではなく、利用可能になったデータから チャンク(Chunks) 単位で処理することができます。
2.1 基本的なストリームの例
const fs = require('fs');
// ファイルから読み取り可能なストリーム(Readable Stream)を作成
const readableStream = fs.createReadStream('input.txt', 'utf8');
// ファイルへの書き込み可能なストリーム(Writable Stream)を作成
const writableStream = fs.createWriteStream('output.txt');
// 読み取りストリームから書き込みストリームへデータをパイプ(転送)する
readableStream.pipe(writableStream);
// 完了とエラーのハンドリング
writableStream.on('finish', () => {
console.log('ファイルのコピーが完了しました!');
});
readableStream.on('error', (err) => {
console.error('読み取りエラー:', err);
});
writableStream.on('error', (err) => {
console.error('書き込みエラー:', err);
});3. なぜストリームを使用するのか?
ストリームを使用することには、いくつかの大きな利点があります。
- メモリ効率(Memory Efficiency): 大容量のファイルをメモリにすべてロードすることなく処理できます。
- 時間効率(Time Efficiency): すべてのデータが揃うのを待たずに、データが届き次第すぐに処理を開始できます。
- 合成可能性(Composability): ストリーム同士を繋ぐことで、強力なデータパイプラインを構築できます。
- ユーザーエクスペリエンスの向上: ビデオストリーミングのように、利用可能になったデータから順次ユーザーに届けることができます。
例えば、メモリが 512MB しかないサーバーで 1GB のファイルを読み込む場合:
- ストリームなし: ファイル全体をメモリに載せようとしてプロセスがクラッシュします。
- ストリームあり: ファイルを小さなチャンク(例:64KB ずつ)に分けて処理するため、安全に実行できます。
4. 主要なストリームの種類
Node.js は 4 つの基本タイプのストリームを提供しており、それぞれデータ処理において特定の役割を担います。
| ストリームの種類 | 説明 | 一般的な例 |
|---|---|---|
| Readable | データを読み取ることができるストリーム(データソース) | fs.createReadStream(), HTTP レスポンス, process.stdin |
| Writable | データを書き込むことができるストリーム(データの出力先) | fs.createWriteStream(), HTTP リクエスト, process.stdout |
| Duplex | 読み取りと書き込みの両方が可能なストリーム | TCP ソケット, Zlib ストリーム |
| Transform | 書き込みと読み取りの際にデータを修正・変換できる Duplex ストリーム | Zlib ストリーム, crypto ストリーム |
注意: Node.js のすべてのストリームは EventEmitter のインスタンスであり、イベントを発行・監視することが可能です。
5. Readable ストリーム
Readable ストリーム は、ソースからデータを読み取るためのものです。
5.1 Readable ストリームの作成
const fs = require('fs');
// ファイルから読み取り可能なストリームを作成
const readableStream = fs.createReadStream('myfile.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB チャンク
});
// Readable ストリームのイベント
readableStream.on('data', (chunk) => {
console.log(`${chunk.length} バイトのデータを受信しました。`);
console.log(chunk);
});
readableStream.on('end', () => {
console.log('読み取るデータはもうありません。');
});
readableStream.on('error', (err) => {
console.error('ストリーム読み取りエラー:', err);
});5.2 読み取りモード
Readable ストリームには 2 つの動作モードがあります。
- Flowing モード(自動): ソースからデータが読み取られ、イベントを使用して可能な限り速くアプリケーションに提供されます。
- Paused モード(手動): 明示的に
stream.read()を呼び出してチャンクを取得する必要があります。
const fs = require('fs');
// Paused モードの例
const readableStream = fs.createReadStream('myfile.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024
});
// read() を使用して手動でストリームを消費する
readableStream.on('readable', () => {
let chunk;
while (null !== (chunk = readableStream.read())) {
console.log(`${chunk.length} バイトのデータを読み取りました。`);
console.log(chunk);
}
});
readableStream.on('end', () => {
console.log('読み取りが完了しました。');
});6. Writable ストリーム
Writable ストリーム は、目的地にデータを書き込むためのものです。
6.1 Writable ストリームの作成
const fs = require('fs');
// ファイルへの書き込みストリームを作成
const writableStream = fs.createWriteStream('output.txt');
// ストリームにデータを書き込む
writableStream.write('こんにちは、');
writableStream.write('世界!');
writableStream.write('\nストリームへの書き込みは簡単です!');
// ストリームを終了する
writableStream.end();
// Writable ストリームのイベント
writableStream.on('finish', () => {
console.log('すべてのデータがファイルに書き込まれました。');
});
writableStream.on('error', (err) => {
console.error('ストリーム書き込みエラー:', err);
});6.2 バックプレッシャー(Backpressure)の処理
ストリームへの書き込み速度が処理速度を上回ると、バックプレッシャー が発生します。write() メソッドは、書き込みを続けても安全かどうかを示すブール値を返します。
const fs = require('fs');
const writableStream = fs.createWriteStream('output.txt');
function writeData() {
let i = 100;
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// 最後にストリームを閉じる
writableStream.write('最後のチャンク!\n');
writableStream.end();
} else {
const data = `データチャンク ${i}\n`;
// 書き込みを行い、継続可能かチェック
ok = writableStream.write(data);
}
} while (i > 0 && ok);
if (i > 0) {
// バッファが空になる(drain イベント)まで待機してから書き込みを再開
writableStream.once('drain', write);
}
}
write();
}
writeData();7. Pipe(パイプ)
pipe() メソッドは、読み取りストリームと書き取りストリームを接続し、データの流れを自動的に管理しながらバックプレッシャーを適切に処理します。
const fs = require('fs');
const readableStream = fs.createReadStream('source.txt');
const writableStream = fs.createWriteStream('destination.txt');
// 読み取りストリームを書き込みストリームにパイプする
readableStream.pipe(writableStream);
writableStream.on('finish', () => {
console.log('ファイルのコピーが完了しました!');
});7.1 パイプのチェイン(連結)
pipe() を使用して複数のストリームを連結できます。これは Transform ストリーム を扱う際に特に便利です。
const fs = require('fs');
const zlib = require('zlib');
// ファイルを読み込み、圧縮して、新しいファイルに書き込むパイプライン
fs.createReadStream('source.txt')
.pipe(zlib.createGzip()) // データを圧縮
.pipe(fs.createWriteStream('destination.txt.gz'))
.on('finish', () => {
console.log('ファイルの圧縮が正常に完了しました!');
});8. Duplex と Transform ストリーム
8.1 Duplex ストリーム
Duplex ストリーム は、読み取りと書き込みの両方が可能な、双方向のパイプのようなものです。TCP ソケットがその好例です。
const net = require('net');
// TCP サーバーを作成
const server = net.createServer((socket) => {
// 'socket' は Duplex ストリームです
// データの受信を処理 (Readable 側)
socket.on('data', (data) => {
console.log('受信:', data.toString());
// エコーバック (Writable 側)
socket.write(`エコー: ${data}`);
});
});
server.listen(8080);8.2 Transform ストリーム
Transform ストリーム は、データが通過する際にその内容を修正できる Duplex ストリームです。
const { Transform } = require('stream');
// テキストを大文字に変換する Transform ストリーム
class UppercaseTransform extends Transform {
_transform(chunk, encoding, callback) {
const upperChunk = chunk.toString().toUpperCase();
this.push(upperChunk);
callback();
}
}
const uppercaseTransform = new UppercaseTransform();
process.stdin.pipe(uppercaseTransform).pipe(process.stdout);9. ストリームのイベント
すべてのストリームは EventEmitter のインスタンスであり、以下のイベントを発行します。
- Readable ストリーム:
data: 読み取り可能なデータがあるときに発行end: 消費すべきデータがもうないときに発行error: 読み取り中にエラーが発生したときに発行readable: データを読み取る準備ができたときに発行- Writable ストリーム:
drain:write()が false を返した後、再び書き込み可能になったときに発行finish: すべてのデータがシステムにフラッシュされたときに発行pipe: Readable ストリームでpipe()が呼ばれたときに発行
10. stream.pipeline() メソッド
pipeline() 関数(Node.js v10.0.0+)は、ストリームを連結するためのより堅牢な方法です。特にエラーハンドリングにおいて pipe() よりも優れています。
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
pipeline(
fs.createReadStream('source.txt'),
zlib.createGzip(),
fs.createWriteStream('destination.txt.gz'),
(err) => {
if (err) {
console.error('パイプライン失敗:', err);
} else {
console.log('パイプライン成功!');
}
}
); 注意:pipeline() は、いずれかのストリームでエラーが発生した場合、すべてのストリームを適切にクリーンアップし、メモリリークを防ぎます。
11. オブジェクトモード・ストリーム
デフォルトではストリームは Buffer または文字列を扱いますが、objectMode を true に設定することで JavaScript オブジェクトを流すことができます。
const { Readable, Writable } = require('stream');
const objectReadable = new Readable({
objectMode: true,
read() {}
});
const objectWritable = new Writable({
objectMode: true,
write(chunk, encoding, callback) {
console.log('受信オブジェクト:', chunk);
callback();
}
});
objectReadable.pipe(objectWritable);
objectReadable.push({ id: 1, name: 'Alice' });
objectReadable.push(null);12. 高度なストリーム・パターン
12.1 pipeline() によるエラーハンドリングの推奨
複数のストリームを繋ぐ場合、各ストリームのエラーを個別に監視するのは困難です。pipeline() を使うのがベストプラクティスです。
12.2 オブジェクトモードの活用
API のレスポンスデータやデータベースのレコードを 1 つずつ処理する場合に、オブジェクトモードは非常に有効です。
13. 実践的な例
13.1 HTTP ストリーミング(大容量ファイルの配信)
const http = require('http');
const fs = require('fs');
const server = http.createServer((req, res) => {
if (req.url === '/file') {
res.writeHead(200, { 'Content-Type': 'text/plain' });
const fileStream = fs.createReadStream('largefile.txt');
// 自動的にバックプレッシャーを管理しながらレスポンスを返す
fileStream.pipe(res);
}
});
server.listen(8080);14. ベストプラクティス
- エラーハンドリング: アプリケーションのクラッシュを防ぐため、必ずエラーイベントを処理してください。
- pipeline() の使用:
pipe()よりもエラー管理とクリーンアップが容易なpipeline()を優先しましょう。 - バックプレッシャーの尊重:
write()の戻り値を無視してデータを送り続けると、メモリを過剰に消費します。 - ストリームの終了: Writable ストリームの処理が終わったら、必ず
end()を呼び出してください。 - 同期処理の回避: ストリームのハンドラ内で重い同期処理を行うと、イベントループをブロックしてしまいます。
15. まとめ
ストリームは、Node.js において効率的なデータ処理を実現するための根幹技術です。
- データをメモリに一括ロードせず、チャンクごとに処理する
- 大容量データにおいて圧倒的なメモリ効率を誇る
- パイプライン化により、拡張性の高いデータ処理を構築できる
これらをマスターすることで、パフォーマンスと安定性を両立させた Node.js アプリケーションの開発が可能になります。