JavaScript/ReadableStream

ReadableStream オブジェクト

概要

ReadableStream は JavaScript のストリーム API の中核をなすオブジェクトであり、データを小さなチャンクに分割して順次処理することを可能にします。ファイル、ネットワークリソース、生成データなど、さまざまなソースからデータを効率的に読み取るための標準化されたインターフェイスを提供します。

基本的な概念

ReadableStream は以下の主要コンポーネントで構成されています:

  • 内部キュー: 読み取り可能なチャンクを保持するバッファ
  • リーダー: ストリームからデータを読み取るインターフェイス
  • コントローラー: ストリームの内部状態を管理するオブジェクト
  • キューイング戦略: バックプレッシャーを制御するメカニズム

基本的な使い方

ReadableStream の作成

最もシンプルな ReadableStream は以下のように作成できます:

const stream = new ReadableStream({
  start(controller) {
    // ストリームの初期化
    // データソースの設定など
  },
  pull(controller) {
    // データが必要なときに呼び出される
    // controller.enqueue() でデータを追加
  },
  cancel(reason) {
    // ストリームがキャンセルされたときの処理
  }
});

具体的な例:カウンターストリーム

const countStream = new ReadableStream({
  start(controller) {
    this.counter = 0;
  },
  pull(controller) {
    // 1秒ごとにカウンターの値をエンキュー
    const chunk = this.counter++;
    controller.enqueue(chunk);
    
    // 10までカウントしたらストリームを閉じる
    if (this.counter > 10) {
      controller.close();
    }
    
    // 次のチャンクまで1秒待つ
    return new Promise(resolve => setTimeout(resolve, 1000));
  }
});

ストリームからの読み取り

ReadableStream からデータを読み取るには、主に二つの方法があります:

1. デフォルトリーダーを使用する

// ストリームからリーダーを取得
const reader = stream.getReader();

async function readAll() {
  try {
    while (true) {
      // チャンクごとに読み取り
      const { done, value } = await reader.read();
      
      // ストリームが終了した場合
      if (done) {
        console.log('ストリームからの読み取りが完了しました');
        break;
      }
      
      // 読み取ったデータを処理
      console.log(`受信データ: ${value}`);
    }
  } finally {
    // リーダーをリリース
    reader.releaseLock();
  }
}

readAll();

2. for await...of を使用する(より簡潔)

async function readWithForAwait() {
  try {
    // ストリームを反復処理のためにリーダーに変換
    for await (const chunk of stream.values()) {
      console.log(`受信データ: ${chunk}`);
    }
    console.log('ストリームからの読み取りが完了しました');
  } catch (error) {
    console.error('読み取りエラー:', error);
  }
}

readWithForAwait();

実践的な例:Fetch API との連携

fetch()ReadableStream を返すため、大きなファイルを効率的に処理できます:

async function processLargeFile() {
  const response = await fetch('https://example.com/large-file.txt');
  const reader = response.body.getReader();
  
  let receivedLength = 0;
  const chunks = [];
  
  while (true) {
    const { done, value } = await reader.read();
    
    if (done) {
      break;
    }
    
    chunks.push(value);
    receivedLength += value.length;
    console.log(`受信済み: ${receivedLength} バイト`);
  }
  
  // 受信したチャンクを連結
  const allChunks = new Uint8Array(receivedLength);
  let position = 0;
  
  for (const chunk of chunks) {
    allChunks.set(chunk, position);
    position += chunk.length;
  }
  
  // バイナリデータをテキストに変換
  const result = new TextDecoder().decode(allChunks);
  console.log(`受信完了: ${result.length} 文字`);
}

変換とパイピング

TransformStream を使った変換

ReadableStreamTransformStream にパイプして、データを処理できます:

const textStream = new ReadableStream({
  start(controller) {
    const lines = ['これは', '複数行の', 'テキストです'];
    for (const line of lines) {
      controller.enqueue(line);
    }
    controller.close();
  }
});

// 大文字に変換するトランスフォーマー
const uppercaseTransformer = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }
});

// ストリームをパイプして変換
const uppercaseStream = textStream.pipeThrough(uppercaseTransformer);

// 結果を読み取る
const reader = uppercaseStream.getReader();
reader.read().then(function process({ done, value }) {
  if (done) return;
  console.log(value); // 大文字に変換されたテキスト
  return reader.read().then(process);
});

WritableStream へのパイピング

ReadableStream から WritableStream へデータを直接パイプできます:

const readableStream = new ReadableStream({
  start(controller) {
    for (let i = 0; i < 5; i++) {
      controller.enqueue(`チャンク ${i}`);
    }
    controller.close();
  }
});

const writableStream = new WritableStream({
  write(chunk) {
    console.log(`書き込み: ${chunk}`);
    return new Promise(resolve => setTimeout(resolve, 100)); // 書き込み遅延をシミュレート
  },
  close() {
    console.log('書き込みが完了しました');
  }
});

// readableStream から writableStream へパイプ
readableStream.pipeTo(writableStream)
  .then(() => console.log('パイピング完了'))
  .catch(err => console.error('パイピングエラー:', err));

テキストとバイナリデータの処理

TextDecoderStream を使ったテキスト処理

fetch('https://example.com/data.txt')
  .then(response => {
    const textStream = response.body
      .pipeThrough(new TextDecoderStream()) // バイト → テキスト
      .pipeThrough(new TransformStream({
        transform(chunk, controller) {
          // 行ごとに処理
          const lines = chunk.split('\n');
          for (const line of lines) {
            if (line) controller.enqueue(line);
          }
        }
      }));
      
    // 行ごとに処理
    const reader = textStream.getReader();
    return readLines(reader);
  });

async function readLines(reader) {
  let result = [];
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      result.push(value);
    }
  } finally {
    reader.releaseLock();
  }
  return result;
}

バイナリデータの処理

async function processBinaryStream() {
  const response = await fetch('https://example.com/image.jpg');
  const contentLength = response.headers.get('Content-Length');
  const total = parseInt(contentLength, 10);
  
  let loaded = 0;
  
  // プログレスを報告するトランスフォーマー
  const progressTransformer = new TransformStream({
    transform(chunk, controller) {
      loaded += chunk.length;
      console.log(`ダウンロード進捗: ${Math.round(loaded / total * 100)}%`);
      controller.enqueue(chunk);
    }
  });
  
  // プログレス付きのストリーム
  const progressStream = response.body.pipeThrough(progressTransformer);
  
  // チャンクを集めてバイナリデータを構築
  const reader = progressStream.getReader();
  const chunks = [];
  
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  
  // 完全な Blob を作成
  const blob = new Blob(chunks);
  console.log(`ダウンロード完了: ${blob.size} バイト`);
  return blob;
}

カスタムキューイング戦略

ReadableStream はキューイング戦略を使用してバックプレッシャーを管理します:

// 10MBの高水準マークを持つバイト長キューイング戦略
const hugeStream = new ReadableStream({
  start(controller) {
    // 大量のデータをエンキュー
  },
  pull(controller) {
    // データの生成ロジック
  }
}, new ByteLengthQueuingStrategy({
  highWaterMark: 10 * 1024 * 1024 // 10MB
}));

// 最大50チャンクのカウントキューイング戦略
const countStream = new ReadableStream({
  /* ソースロジック */
}, new CountQueuingStrategy({
  highWaterMark: 50 // 最大50チャンク
}));

実際のアプリケーション例

ファイルアップローダーの進捗表示

async function uploadFileWithProgress(file, url, progressCallback) {
  // ファイルを ReadableStream に変換
  const fileStream = file.stream();
  
  // 進捗を追跡するトランスフォーマー
  const progressTracker = new TransformStream({
    transform(chunk, controller) {
      this.loaded = (this.loaded || 0) + chunk.length;
      progressCallback(this.loaded, file.size);
      controller.enqueue(chunk);
    }
  });
  
  // 進捗付きのストリーム
  const trackedStream = fileStream.pipeThrough(progressTracker);
  
  // ストリームを使用してファイルをアップロード
  await fetch(url, {
    method: 'POST',
    body: trackedStream,
    headers: {
      'Content-Type': file.type,
      'Content-Length': file.size.toString()
    }
  });
  
  return { success: true, fileName: file.name };
}

// 使用例
const fileInput = document.querySelector('input[type="file"]');
fileInput.addEventListener('change', async () => {
  const file = fileInput.files[0];
  const progressBar = document.querySelector('.progress-bar');
  
  await uploadFileWithProgress(
    file,
    'https://example.com/upload',
    (loaded, total) => {
      const percent = Math.round((loaded / total) * 100);
      progressBar.style.width = `${percent}%`;
      progressBar.textContent = `${percent}%`;
    }
  );
  
  console.log('アップロード完了');
});

データのリアルタイム処理(WebSocket 例)

function createWebSocketStream(url) {
  // WebSocket 接続を ReadableStream として提供
  return new ReadableStream({
    start(controller) {
      this.socket = new WebSocket(url);
      
      this.socket.onmessage = event => {
        // メッセージを受信したらエンキュー
        controller.enqueue(event.data);
      };
      
      this.socket.onclose = () => {
        // 接続が閉じられたらストリームも閉じる
        controller.close();
      };
      
      this.socket.onerror = error => {
        // エラーが発生したらストリームにエラーを発生させる
        controller.error(error);
      };
    },
    
    cancel() {
      // ストリームがキャンセルされたら WebSocket も閉じる
      this.socket.close();
    }
  });
}

// 使用例:株価のリアルタイム処理
const stockStream = createWebSocketStream('wss://example.com/stocks')
  .pipeThrough(new TextDecoderStream())
  .pipeThrough(new TransformStream({
    transform(chunk, controller) {
      // JSON 文字列をパース
      try {
        const data = JSON.parse(chunk);
        // 特定の株価が閾値を超えた場合のみエンキュー
        if (data.price > 1000) {
          controller.enqueue(data);
        }
      } catch (e) {
        console.error('不正な JSON:', chunk);
      }
    }
  }));

// フィルタリングされた株価データを処理
const reader = stockStream.getReader();

(async function processStocks() {
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      
      // 高価格の株を処理
      console.log(`${value.symbol}: ${value.price}円`);
      updateStockChart(value); // UIの更新関数
    }
  } catch (error) {
    console.error('ストック処理エラー:', error);
  } finally {
    console.log('ストック処理完了');
  }
})();

ブラウザ互換性とポリフィル

ReadableStream は最新のブラウザで広くサポートされていますが、古いブラウザでは動作しない場合があります。互換性の問題に対処するには、何種類かのポリフィルが利用可能です:

// ポリフィルが必要かどうか確認
if (typeof ReadableStream === 'undefined') {
  // ポリフィルをロード
  import('web-streams-polyfill')
    .then(() => {
      console.log('ReadableStream ポリフィルがロードされました');
      // ストリーム処理コード
    });
} else {
  // ネイティブ実装を使用
  processStreams();
}

パフォーマンスとメモリ効率

ReadableStream を使用することで、特に大きなデータセットを扱う場合に以下のメリットが得られます:

  • メモリ使用量の削減(一度にすべてのデータをメモリに読み込む必要がない)
  • 初期表示の高速化(最初のチャンクが到着したらすぐに処理を開始できる)
  • ユーザー体験の向上(処理の進捗を表示できる)

以下の表は、異なるアプローチのメモリ使用量とレスポンス時間を比較したものです:

データサイズ 方法 メモリピーク 初期表示までの時間
100MB 一括ダウンロード 約100MB 全体をダウンロード後
100MB ReadableStream 約2MB 最初のチャンク到着後
1GB 一括ダウンロード 約1GB+ 全体をダウンロード後
1GB ReadableStream 約5MB 最初のチャンク到着後

まとめ

ReadableStream は、大規模なデータセットを効率的に処理するための強力なツールです。主な利点は次のとおりです:

  • チャンク単位でのデータ処理によるメモリ効率の向上
  • 標準化されたインターフェイスによる様々なデータソースの統合
  • バックプレッシャーの自動管理によるメモリオーバーフローの防止
  • 変換ストリームによる柔軟なデータ処理パイプラインの構築
  • 非同期プログラミングモデルとの自然な統合

様々なブラウザ API(FetchFileWebSocket など)と組み合わせることで、効率的なデータ処理アプリケーションを構築できます。ストリームのコンセプトを理解し、適切に活用することで、特に大規模データや継続的なデータフローを扱うアプリケーションのパフォーマンスと応答性を大幅に向上させることができます。

カテゴリ:JavaScript
カテゴリ:JavaScript