JavaScript/CountQueuingStrategy

CountQueuingStrategy オブジェクト

概要

CountQueuingStrategy は JavaScript のストリーム API における重要なクラスで、チャンクの数に基づいてストリームのバックプレッシャーを制御します。ByteLengthQueuingStrategy がチャンクのバイトサイズを考慮するのに対し、CountQueuingStrategy はシンプルにチャンクの数だけを数えます。

基本的な使い方

CountQueuingStrategy オブジェクトは、チャンクの数を追跡し、キューに入れられるチャンクの最大数を制限します。

const readableStream = new ReadableStream({
  start(controller) {
    this.counter = 0;
  },
  pull(controller) {
    if (this.counter < 100) {
      controller.enqueue(`チャンク ${this.counter++}`);
    } else {
      controller.close();
    }
  }
}, new CountQueuingStrategy({ highWaterMark: 10 }));

この例では、キュー内に最大10個のチャンクを許容するストリームを作成しています。キュー内のチャンク数が10を超えると、pull メソッドは一時的に呼び出されなくなります。

コンストラクタ

CountQueuingStrategy のコンストラクタは、次のようなオプションオブジェクトを受け取ります:

new CountQueuingStrategy({ highWaterMark });

highWaterMark パラメータは必須で、キューに許容される最大チャンク数を指定します。

メソッド

CountQueuingStrategy オブジェクトには以下のメソッドがあります:

// チャンクのサイズを計算するメソッド
const size = countStrategy.size(chunk);

size() メソッドは、どのようなチャンクに対しても常に 1 を返します。これにより、チャンクのサイズに関係なく、単純にチャンクの数だけをカウントします。

実際の使用例

テキストデータを処理する例を見てみましょう:

// テキスト行を生成する関数
function* generateLines() {
  for (let i = 0; i < 100; i++) {
    yield `これは行 ${i + 1} のテキストデータです。`;
  }
}

// ReadableStream の作成
const lineGenerator = generateLines();
const readableStream = new ReadableStream({
  pull(controller) {
    const { value, done } = lineGenerator.next();
    if (!done) {
      controller.enqueue(value);
    } else {
      controller.close();
    }
  }
}, new CountQueuingStrategy({ highWaterMark: 5 })); // 最大5行をキューに保持

// WritableStream の作成
const writableStream = new WritableStream({
  write(chunk) {
    console.log(`処理中: ${chunk}`);
    // 書き込み処理のシミュレーション(例: ネットワーク遅延)
    return new Promise(resolve => setTimeout(resolve, 200));
  }
}, new CountQueuingStrategy({ highWaterMark: 2 })); // 最大2行をキューに保持

// ストリームのパイピング
readableStream.pipeTo(writableStream)
  .then(() => console.log('すべての行が処理されました'))
  .catch(err => console.error('エラーが発生しました:', err));

この例では、ReadableStream は最大5行をキューに保持し、WritableStream は最大2行をキューに保持します。これにより、メモリ使用量を制御しながら、データの生成速度と消費速度のバランスを取ります。

ストリームの変換と CountQueuingStrategy

TransformStream でも CountQueuingStrategy を使用できます:

// 大文字に変換する TransformStream
const upperCaseTransformer = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }
}, 
new CountQueuingStrategy({ highWaterMark: 3 }), // 読み取り側のキューイング戦略
new CountQueuingStrategy({ highWaterMark: 1 })  // 書き込み側のキューイング戦略
);

// 使用例
fetch('https://example.com/data.txt')
  .then(response => response.body
    .pipeThrough(new TextDecoderStream())     // テキストにデコード
    .pipeThrough(upperCaseTransformer)        // 大文字に変換
    .pipeTo(new WritableStream({
      write(chunk) {
        console.log(chunk);
      }
    }))
  );

この例では、テキストストリームを大文字に変換し、入力側と出力側で異なるキューサイズを設定しています。

ByteLengthQueuingStrategy との比較

以下の表は、両方のキューイング戦略の具体的な使用シナリオを示しています:

シナリオ CountQueuingStrategy ByteLengthQueuingStrategy
テキスト行の処理 適切(行数で制御) 過剰(行の長さにばらつきがある場合)
画像ストリーム 不適切(画像サイズが大きく異なる場合) 適切(メモリ使用量を正確に制御)
均一なオブジェクト 適切(シンプルで効率的) 必要以上に複雑
ビデオフレーム 場合により適切(フレーム数で制御) 適切(各フレームのサイズで制御)
JSONオブジェクト 小さくて均一な場合に適切 サイズが大きく異なる場合に適切

カスタム実装

独自のカウントベースのキューイング戦略を実装することも可能です:

// 特定の条件下でのみチャンクをカウントするカスタム戦略
const conditionalCountStrategy = {
  highWaterMark: 20,
  size(chunk) {
    // 例: 重要なチャンクのみカウント
    return chunk.priority === 'high' ? 1 : 0;
  }
};

const writableStream = new WritableStream({
  write(chunk) {
    console.log(`チャンク処理中: ${JSON.stringify(chunk)}`);
    return Promise.resolve();
  }
}, conditionalCountStrategy);

この例では、priority プロパティが 'high' のチャンクのみをカウントするカスタム戦略を実装しています。

パフォーマンスの考慮事項

CountQueuingStrategy は計算オーバーヘッドが非常に小さいため、パフォーマンスが重要な場合に適しています。各チャンクは常に「1」としてカウントされるため、サイズ計算のための追加処理は必要ありません。

以下のシナリオでは CountQueuingStrategy が特に有用です:

  1. 均一なサイズのデータを処理する場合
  2. 処理速度がチャンク数に比例する場合
  3. メモリ使用量より処理のシンプルさを優先する場合
  4. データのサイズが予測不可能または計算が難しい場合

ストリーム処理を設計する際には、データの性質と処理要件に基づいて、CountQueuingStrategyByteLengthQueuingStrategy のどちらが適切かを判断することが重要です。データの性質に合わせた適切な highWaterMark 値を設定することで、メモリ効率とパフォーマンスのバランスを最適化できます。

カテゴリ:JavaScript
カテゴリ:JavaScript