JavaScript/ReadableStream
ReadableStream オブジェクト
概要
ReadableStream
は JavaScript のストリーム API の中核をなすオブジェクトであり、データを小さなチャンクに分割して順次処理することを可能にします。ファイル、ネットワークリソース、生成データなど、さまざまなソースからデータを効率的に読み取るための標準化されたインターフェイスを提供します。
基本的な概念
ReadableStream
は以下の主要コンポーネントで構成されています:
- 内部キュー: 読み取り可能なチャンクを保持するバッファ
- リーダー: ストリームからデータを読み取るインターフェイス
- コントローラー: ストリームの内部状態を管理するオブジェクト
- キューイング戦略: バックプレッシャーを制御するメカニズム
基本的な使い方
ReadableStream の作成
最もシンプルな ReadableStream
は以下のように作成できます:
const stream = new ReadableStream({ start(controller) { // ストリームの初期化 // データソースの設定など }, pull(controller) { // データが必要なときに呼び出される // controller.enqueue() でデータを追加 }, cancel(reason) { // ストリームがキャンセルされたときの処理 } });
具体的な例:カウンターストリーム
const countStream = new ReadableStream({ start(controller) { this.counter = 0; }, pull(controller) { // 1秒ごとにカウンターの値をエンキュー const chunk = this.counter++; controller.enqueue(chunk); // 10までカウントしたらストリームを閉じる if (this.counter > 10) { controller.close(); } // 次のチャンクまで1秒待つ return new Promise(resolve => setTimeout(resolve, 1000)); } });
ストリームからの読み取り
ReadableStream
からデータを読み取るには、主に二つの方法があります:
1. デフォルトリーダーを使用する
// ストリームからリーダーを取得 const reader = stream.getReader(); async function readAll() { try { while (true) { // チャンクごとに読み取り const { done, value } = await reader.read(); // ストリームが終了した場合 if (done) { console.log('ストリームからの読み取りが完了しました'); break; } // 読み取ったデータを処理 console.log(`受信データ: ${value}`); } } finally { // リーダーをリリース reader.releaseLock(); } } readAll();
2. for await...of を使用する(より簡潔)
async function readWithForAwait() { try { // ストリームを反復処理のためにリーダーに変換 for await (const chunk of stream.values()) { console.log(`受信データ: ${chunk}`); } console.log('ストリームからの読み取りが完了しました'); } catch (error) { console.error('読み取りエラー:', error); } } readWithForAwait();
実践的な例:Fetch API との連携
fetch()
は ReadableStream
を返すため、大きなファイルを効率的に処理できます:
async function processLargeFile() { const response = await fetch('https://example.com/large-file.txt'); const reader = response.body.getReader(); let receivedLength = 0; const chunks = []; while (true) { const { done, value } = await reader.read(); if (done) { break; } chunks.push(value); receivedLength += value.length; console.log(`受信済み: ${receivedLength} バイト`); } // 受信したチャンクを連結 const allChunks = new Uint8Array(receivedLength); let position = 0; for (const chunk of chunks) { allChunks.set(chunk, position); position += chunk.length; } // バイナリデータをテキストに変換 const result = new TextDecoder().decode(allChunks); console.log(`受信完了: ${result.length} 文字`); }
変換とパイピング
TransformStream を使った変換
ReadableStream
を TransformStream
にパイプして、データを処理できます:
const textStream = new ReadableStream({ start(controller) { const lines = ['これは', '複数行の', 'テキストです']; for (const line of lines) { controller.enqueue(line); } controller.close(); } }); // 大文字に変換するトランスフォーマー const uppercaseTransformer = new TransformStream({ transform(chunk, controller) { controller.enqueue(chunk.toUpperCase()); } }); // ストリームをパイプして変換 const uppercaseStream = textStream.pipeThrough(uppercaseTransformer); // 結果を読み取る const reader = uppercaseStream.getReader(); reader.read().then(function process({ done, value }) { if (done) return; console.log(value); // 大文字に変換されたテキスト return reader.read().then(process); });
WritableStream へのパイピング
ReadableStream
から WritableStream
へデータを直接パイプできます:
const readableStream = new ReadableStream({ start(controller) { for (let i = 0; i < 5; i++) { controller.enqueue(`チャンク ${i}`); } controller.close(); } }); const writableStream = new WritableStream({ write(chunk) { console.log(`書き込み: ${chunk}`); return new Promise(resolve => setTimeout(resolve, 100)); // 書き込み遅延をシミュレート }, close() { console.log('書き込みが完了しました'); } }); // readableStream から writableStream へパイプ readableStream.pipeTo(writableStream) .then(() => console.log('パイピング完了')) .catch(err => console.error('パイピングエラー:', err));
テキストとバイナリデータの処理
TextDecoderStream を使ったテキスト処理
fetch('https://example.com/data.txt') .then(response => { const textStream = response.body .pipeThrough(new TextDecoderStream()) // バイト → テキスト .pipeThrough(new TransformStream({ transform(chunk, controller) { // 行ごとに処理 const lines = chunk.split('\n'); for (const line of lines) { if (line) controller.enqueue(line); } } })); // 行ごとに処理 const reader = textStream.getReader(); return readLines(reader); }); async function readLines(reader) { let result = []; try { while (true) { const { done, value } = await reader.read(); if (done) break; result.push(value); } } finally { reader.releaseLock(); } return result; }
バイナリデータの処理
async function processBinaryStream() { const response = await fetch('https://example.com/image.jpg'); const contentLength = response.headers.get('Content-Length'); const total = parseInt(contentLength, 10); let loaded = 0; // プログレスを報告するトランスフォーマー const progressTransformer = new TransformStream({ transform(chunk, controller) { loaded += chunk.length; console.log(`ダウンロード進捗: ${Math.round(loaded / total * 100)}%`); controller.enqueue(chunk); } }); // プログレス付きのストリーム const progressStream = response.body.pipeThrough(progressTransformer); // チャンクを集めてバイナリデータを構築 const reader = progressStream.getReader(); const chunks = []; while (true) { const { done, value } = await reader.read(); if (done) break; chunks.push(value); } // 完全な Blob を作成 const blob = new Blob(chunks); console.log(`ダウンロード完了: ${blob.size} バイト`); return blob; }
カスタムキューイング戦略
ReadableStream
はキューイング戦略を使用してバックプレッシャーを管理します:
// 10MBの高水準マークを持つバイト長キューイング戦略 const hugeStream = new ReadableStream({ start(controller) { // 大量のデータをエンキュー }, pull(controller) { // データの生成ロジック } }, new ByteLengthQueuingStrategy({ highWaterMark: 10 * 1024 * 1024 // 10MB })); // 最大50チャンクのカウントキューイング戦略 const countStream = new ReadableStream({ /* ソースロジック */ }, new CountQueuingStrategy({ highWaterMark: 50 // 最大50チャンク }));
実際のアプリケーション例
ファイルアップローダーの進捗表示
async function uploadFileWithProgress(file, url, progressCallback) { // ファイルを ReadableStream に変換 const fileStream = file.stream(); // 進捗を追跡するトランスフォーマー const progressTracker = new TransformStream({ transform(chunk, controller) { this.loaded = (this.loaded || 0) + chunk.length; progressCallback(this.loaded, file.size); controller.enqueue(chunk); } }); // 進捗付きのストリーム const trackedStream = fileStream.pipeThrough(progressTracker); // ストリームを使用してファイルをアップロード await fetch(url, { method: 'POST', body: trackedStream, headers: { 'Content-Type': file.type, 'Content-Length': file.size.toString() } }); return { success: true, fileName: file.name }; } // 使用例 const fileInput = document.querySelector('input[type="file"]'); fileInput.addEventListener('change', async () => { const file = fileInput.files[0]; const progressBar = document.querySelector('.progress-bar'); await uploadFileWithProgress( file, 'https://example.com/upload', (loaded, total) => { const percent = Math.round((loaded / total) * 100); progressBar.style.width = `${percent}%`; progressBar.textContent = `${percent}%`; } ); console.log('アップロード完了'); });
データのリアルタイム処理(WebSocket 例)
function createWebSocketStream(url) { // WebSocket 接続を ReadableStream として提供 return new ReadableStream({ start(controller) { this.socket = new WebSocket(url); this.socket.onmessage = event => { // メッセージを受信したらエンキュー controller.enqueue(event.data); }; this.socket.onclose = () => { // 接続が閉じられたらストリームも閉じる controller.close(); }; this.socket.onerror = error => { // エラーが発生したらストリームにエラーを発生させる controller.error(error); }; }, cancel() { // ストリームがキャンセルされたら WebSocket も閉じる this.socket.close(); } }); } // 使用例:株価のリアルタイム処理 const stockStream = createWebSocketStream('wss://example.com/stocks') .pipeThrough(new TextDecoderStream()) .pipeThrough(new TransformStream({ transform(chunk, controller) { // JSON 文字列をパース try { const data = JSON.parse(chunk); // 特定の株価が閾値を超えた場合のみエンキュー if (data.price > 1000) { controller.enqueue(data); } } catch (e) { console.error('不正な JSON:', chunk); } } })); // フィルタリングされた株価データを処理 const reader = stockStream.getReader(); (async function processStocks() { try { while (true) { const { done, value } = await reader.read(); if (done) break; // 高価格の株を処理 console.log(`${value.symbol}: ${value.price}円`); updateStockChart(value); // UIの更新関数 } } catch (error) { console.error('ストック処理エラー:', error); } finally { console.log('ストック処理完了'); } })();
ブラウザ互換性とポリフィル
ReadableStream
は最新のブラウザで広くサポートされていますが、古いブラウザでは動作しない場合があります。互換性の問題に対処するには、何種類かのポリフィルが利用可能です:
// ポリフィルが必要かどうか確認 if (typeof ReadableStream === 'undefined') { // ポリフィルをロード import('web-streams-polyfill') .then(() => { console.log('ReadableStream ポリフィルがロードされました'); // ストリーム処理コード }); } else { // ネイティブ実装を使用 processStreams(); }
パフォーマンスとメモリ効率
ReadableStream
を使用することで、特に大きなデータセットを扱う場合に以下のメリットが得られます:
- メモリ使用量の削減(一度にすべてのデータをメモリに読み込む必要がない)
- 初期表示の高速化(最初のチャンクが到着したらすぐに処理を開始できる)
- ユーザー体験の向上(処理の進捗を表示できる)
以下の表は、異なるアプローチのメモリ使用量とレスポンス時間を比較したものです:
データサイズ | 方法 | メモリピーク | 初期表示までの時間 |
---|---|---|---|
100MB | 一括ダウンロード | 約100MB | 全体をダウンロード後 |
100MB | ReadableStream |
約2MB | 最初のチャンク到着後 |
1GB | 一括ダウンロード | 約1GB+ | 全体をダウンロード後 |
1GB | ReadableStream |
約5MB | 最初のチャンク到着後 |
まとめ
ReadableStream
は、大規模なデータセットを効率的に処理するための強力なツールです。主な利点は次のとおりです:
- チャンク単位でのデータ処理によるメモリ効率の向上
- 標準化されたインターフェイスによる様々なデータソースの統合
- バックプレッシャーの自動管理によるメモリオーバーフローの防止
- 変換ストリームによる柔軟なデータ処理パイプラインの構築
- 非同期プログラミングモデルとの自然な統合
様々なブラウザ API(Fetch
、File
、WebSocket
など)と組み合わせることで、効率的なデータ処理アプリケーションを構築できます。ストリームのコンセプトを理解し、適切に活用することで、特に大規模データや継続的なデータフローを扱うアプリケーションのパフォーマンスと応答性を大幅に向上させることができます。