JavaScript/TransformStream

TransformStreamオブジェクト

はじめに

TransformStreamは、JavaScriptのStreams APIの一部であり、データのストリーミング処理において重要な役割を果たします。入力ストリームからデータを受け取り、それを変換して出力ストリームに送信するパイプラインを構築することができます。

基本概念

TransformStreamは、読み取り可能なストリームと書き込み可能なストリームの間に位置し、データの変換を担当します。これにより、ストリームデータに対してさまざまな操作(フィルタリング、マッピング、集約など)を実行できます。

TransformStreamの構造

TransformStreamは次の3つの主要コンポーネントで構成されています:

  1. readable - 変換されたデータを読み取るためのReadableStream
  2. writable - 変換するデータを書き込むためのWritableStream
  3. transformer - データ変換のロジックを定義するオブジェクト

TransformStreamの作成

TransformStreamを作成するには、transformerオブジェクトを定義する必要があります。このオブジェクトには、次のメソッドを実装できます:

const transformStream = new TransformStream({
  start(controller) {
    // 変換の初期化処理
  },
  transform(chunk, controller) {
    // 各チャンクの変換処理
    controller.enqueue(transformedChunk);
  },
  flush(controller) {
    // 変換の終了処理
  }
});

各メソッドの役割は以下の通りです:

メソッド 説明 引数 任意/必須
start 変換の初期化時に一度だけ呼び出される controller 任意
transform 入力チャンクごとに呼び出される chunk, controller 必須
flush 入力ストリームが閉じられた後に呼び出される controller 任意

実用的な例

テキスト大文字変換

次の例では、テキストストリームの各文字を大文字に変換します:

async function uppercaseExample() {
  try {
    const response = await fetch('https://example.com/sometext.txt');

    if (!response.ok) {
      throw new Error(`HTTP error! status: ${response.status}`);
    }

    // レスポンスボディをTransformStreamに通して大文字に変換
    const transformedStream = response.body
      .pipeThrough(new TextDecoderStream()) // バイトをテキストに変換
      .pipeThrough(new TransformStream({ // テキストを大文字に変換
        transform(chunk, controller) {
          controller.enqueue(chunk.toUpperCase());
        }
      }))
      .pipeThrough(new TextEncoderStream()); // テキストをバイトに戻す

    // 結果の処理
    const reader = transformedStream.getReader();
    const chunks = []; // チャンクを格納する配列
    const textDecoder = new TextDecoder(); // TextDecoderを再利用

    for (;;) {
      const {
        done,
        value
      } = await reader.read();
      if (done) break;
      chunks.push(textDecoder.decode(value));
    }

    const result = chunks.join(''); // チャンクを連結
    console.log(result); // 大文字に変換されたテキスト

  } catch (error) {
    console.error('Error:', error);
  }
}

uppercaseExample();

JSONフィルタリング

この例では、JSONデータのストリームからある条件に基づいて項目をフィルタリングします:

const jsonFilterTransformer = new TransformStream({
  transform(chunk, controller) {
    try {
      const data = JSON.parse(chunk);
      
      // 例: scoreが80以上のオブジェクトのみをフィルタリング
      if (data.score >= 80) {
        controller.enqueue(JSON.stringify(data) + '\n');
      }
    } catch (error) {
      console.error('JSON解析エラー:', error);
    }
  }
});

async function filterJsonData() {
  const response = await fetch('https://example.com/data.json');
  
  // JSONデータをフィルタリング
  const filteredStream = response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(jsonFilterTransformer);
  
  const reader = filteredStream.getReader();
  
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    console.log('フィルタリングされたデータ:', value);
  }
}

画像処理の例

WebAssemblyと組み合わせることで、画像処理などの複雑な変換も可能です:

// 画像処理のためのTransformStream
const imageProcessor = new TransformStream({
  async start(controller) {
    // WebAssemblyモジュールの読み込み(例)
    this.wasm = await WebAssembly.instantiateStreaming(
      fetch('/image-processor.wasm')
    );
  },
  async transform(chunk, controller) {
    // WAMSを使って画像データを処理
    const processedImageData = this.wasm.instance.exports.processImage(chunk);
    controller.enqueue(processedImageData);
  }
});

async function processImageStream() {
  const response = await fetch('https://example.com/large-image.jpg');
  const processedStream = response.body.pipeThrough(imageProcessor);
  
  // 処理された画像データをBlobに変換
  const reader = processedStream.getReader();
  const chunks = [];
  
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  
  const blob = new Blob(chunks, { type: 'image/jpeg' });
  const url = URL.createObjectURL(blob);
  
  // 画像を表示(例)
  const img = document.createElement('img');
  img.src = url;
  document.body.appendChild(img);
}

高度な使用法

バックプレッシャー制御

TransformStreamは自動的にバックプレッシャーを処理します。これにより、入力が出力より速い場合に、書き込み側がブロックされ、システムメモリの溢れを防ぎます:

// 遅延処理のシミュレーション
const slowTransformer = new TransformStream({
  async transform(chunk, controller) {
    // 重い処理をシミュレート
    await new Promise(resolve => setTimeout(resolve, 500));
    controller.enqueue(`処理済み: ${chunk}`);
  }
});

// バックプレッシャーが効いているのを確認するために
// 書き込みを高速で行う
async function demonstrateBackpressure() {
  const writer = slowTransformer.writable.getWriter();
  const reader = slowTransformer.readable.getReader();
  
  // 読み取りループを開始(バックグラウンドで)
  const readPromise = (async () => {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      console.log('読み取り:', value);
    }
  })();
  
  console.time('書き込み時間');
  
  // 高速で書き込み - バックプレッシャーによって自動的に制御される
  for (let i = 0; i < 10; i++) {
    console.log(`書き込み #${i} 開始`);
    await writer.write(`チャンク ${i}`);
    console.log(`書き込み #${i} 完了`);
  }
  
  console.timeEnd('書き込み時間');
  writer.close();
}

エラー処理

TransformStreamでのエラー処理は重要です。transformメソッド内での例外は自動的に伝播します:

const errorHandlingTransformer = new TransformStream({
  transform(chunk, controller) {
    try {
      if (chunk === 'error') {
        throw new Error('エラーを発生させるチャンク');
      }
      controller.enqueue(`変換済み: ${chunk}`);
    } catch (error) {
      controller.error(error); // エラーをストリームに伝播
    }
  }
});

async function handleErrors() {
  const { writable, readable } = errorHandlingTransformer;
  
  // 読み取り側で、エラーを処理
  const reader = readable.getReader();
  
  // 書き込み側
  const writer = writable.getWriter();
  
  try {
    await writer.write('正常なデータ');
    await writer.write('error'); // エラー発生
    await writer.write('これは処理されない');
  } catch (error) {
    console.error('書き込み側でエラーを捕捉:', error);
  }
  
  // 読み取り側
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      console.log(value);
    }
  } catch (error) {
    console.error('読み取り側でエラーを捕捉:', error);
  }
}

パフォーマンスと考慮事項

TransformStreamを使用する際のパフォーマンスの考慮事項は次のとおりです:

考慮事項 説明
チャンクサイズ 大きすぎるチャンクは処理遅延を招き、小さすぎるとオーバーヘッドが増加
メモリ使用量 大量のデータを一時的に保持するよりも、チャンク単位で処理する
計算複雑性 複雑な変換は別スレッド(Web Worker)での実行を検討する
非同期処理 async/awaitを使用できるが、必要以上に待機しないように注意

ブラウザ互換性

TransformStreamは比較的新しいAPIですが、主要なモダンブラウザでサポートされています:

ブラウザ バージョン
Chrome 67以上
Firefox 102以上
Safari 14.1以上
Edge 79以上

古いブラウザをサポートする必要がある場合は、polyfillの使用を検討してください。

まとめ

TransformStreamは、ストリームデータの処理において強力かつ柔軟なツールです。メモリ効率が良く、大量のデータを扱う際にも適しています。適切に使用することで、ウェブアプリケーションのパフォーマンスとユーザーエクスペリエンスを向上させることができます。

ストリーム処理のパイプラインを構築することで、複雑なデータ変換を効率的に実装できます。また、自動的なバックプレッシャー制御機能により、安全にストリームを扱うことができます。

カテゴリ:JavaScript
カテゴリ:JavaScript