ストリーミングは、8 秒間まったく反応しないチャットボックスと、1 秒も かからずに文字を打ち始めるチャットボックスとの違いそのものです。仕組みは、 base_url の背後にあるモデルが Claude でも Gemini でも GPT でも 変わりません。stream=True を設定し、チャンクを反復処理し、 それぞれの delta を読み、[DONE] という終端マーカーで止めるだけです。Brievio は OpenAI の Chat Completions プロトコルを話すので、まったく同じループがすべての本物のファーストパーティ モデルで動作します — 変えるのは model の文字列だけで、 ほかには何もありません。
この記事では、OpenAI 互換エンドポイント越しに Server-Sent Events ストリーミングが実際どう動くのか、stream_options を使って最後のチャンクで正確なトークン使用量を得る方法、Python と Node で 同一になるパターン、そしてストリームが見た目は問題なさそうに 見えながら静かに裏切ってくる 2 つのサイレントな故障モード — 偽の (バッファリングされた)ストリーミングと、欠落した usage — を取り上げます。
HTTP 上で「ストリーミング」とは何を意味するのか
ストリーミングしない呼び出しは、1 リクエスト・1 レスポンスです。サーバーが 数秒考え、それから補完結果をまとめて一度に渡してきます。ストリーミングは HTTP 接続を開いたまま保ち、モデルが生成するそばから答えを少しずつ 押し出してきます。これに使われるのが Server-Sent Events (SSE) です。通信路の上では、各断片は data: で始まる行に JSON オブジェクトが続く形で届き、ストリームは data: [DONE] というそのままの行で終わります。
そのテキストを自分でパースすることはまずありません — SDK がやってくれます。 コードで手にするのはチャンクの iterable です。各チャンクは通常の 補完オブジェクトとほぼ同じですが、内容が choices[0].message ではなく choices[0].delta に入っており、前のチャンク以降に 生成された断片だけを保持しています。すべての delta.content を順番に連結すれば、完全なメッセージが復元できます。ここで重要になる 唯一の指標が 最初のトークンまでの時間 (TTFB) です。 最初の空でない delta が現れるまでにどれだけかかるか。この数字こそが、 ストリーミングする理由のすべてです。
Python のパターン
これが全体像です — usage も取得する、本物のストリーミングループ。 Brievio に固有なのは base_url の行だけです。
from openai import OpenAI
client = OpenAI(
api_key="sk-brievio-...",
base_url="https://api.brievio.com/v1", # ひとつの base_url、本物のファーストパーティモデル
)
# stream=True にすると、レスポンスが Server-Sent Events ストリームに切り替わる。
# オブジェクトを反復処理し、各要素が部分的な "delta" を持つ 1 チャンクになる。
stream = client.chat.completions.create(
model="claude-sonnet-4-6", # または gemini-2.5-flash、gpt-... など
messages=[{"role": "user", "content": "Explain Raft consensus in 200 words."}],
stream=True,
stream_options={"include_usage": True}, # 最後のチャンクで usage を返すよう要求する
)
usage = None
for chunk in stream:
# [DONE] の直前の最後のデータイベントが usage を運び、choices は空のリストになる。
if chunk.usage is not None:
usage = chunk.usage
continue
delta = chunk.choices[0].delta.content
if delta:
print(delta, end="", flush=True) # 届いたそばからトークンを描画する
print()
# usage が入っているのは include_usage を設定したからにほかならない。これは本物の数値。
print(usage.prompt_tokens, usage.completion_tokens, usage.total_tokens)多くの人がつまずく 3 つの細部があります。第一に、content の delta は 一部のチャンクで None や空になりえます(先頭のチャンクは role を設定するだけのことが多い)。なので print する前にガードしてください。 第二に、usage を運ぶチャンクは content が終わったあとに 来て、choices は空のリストになります — 例で chunk.usage を先にチェックして continue しているのはそのためです。第三に、[DONE] を自分で探す必要はありません。SDK がその終端マーカーを消費して、反復処理を 終わらせてくれます。もし生の requests や fetch でエンドポイントを呼んでいるなら、改行で分割し [DONE] で手動でループを抜けることになります。
Node でも同じループ
Node SDK はストリームを async iterable として公開するので、構造は 同一です — for の代わりに for await ... of を、 flush する print の代わりに process.stdout.write を使います。
import OpenAI from "openai";
const client = new OpenAI({
apiKey: "sk-brievio-...",
baseURL: "https://api.brievio.com/v1",
});
// Node でも同じ契約: stream=true はチャンクの async iterable を返す。
const stream = await client.chat.completions.create({
model: "claude-sonnet-4-6",
messages: [{ role: "user", content: "Explain Raft consensus in 200 words." }],
stream: true,
stream_options: { include_usage: true },
});
let usage = null;
for await (const chunk of stream) {
// 最後のイベント: choices は空で、usage が入っている。
if (chunk.usage) {
usage = chunk.usage;
continue;
}
const delta = chunk.choices[0]?.delta?.content;
if (delta) process.stdout.write(delta); // 各トークンをターミナルへ flush する
}
console.log();
console.log(usage?.prompt_tokens, usage?.completion_tokens, usage?.total_tokens);オプショナルチェイニング(chunk.choices[0]?.delta?.content)に注目してください。usage を運ぶ最後のチャンクでは choices が空なので、ガードなしで [0] を添字参照すると、まさに ゴール直前で例外を投げてしまいます。レスポンス全体は完璧に動いて いるように見えたのに、Node のストリーミングハンドラが最後のイベントで クラッシュする — その最も多い原因がこれです。
最後のチャンクで本物の usage を得る
デフォルトでは、ストリーミングレスポンスにトークン数は含まれません — どのチャンクにも usage オブジェクトはありません。これは OpenAI プロトコルの意図的な仕様であり、本番でストリーミングしておきながら 請求と突き合わせができないチームを苦しめます。直し方はパラメータひとつです。
stream_options={"include_usage": True}(Python)またはstream_options: { include_usage: true }(Node)を設定する。- するとサーバーは
[DONE]の直前に追加のチャンクを 1 つ 発行します。そのチャンクはchoicesが空で、usageにprompt_tokens、completion_tokens、total_tokensを保持しています。 - Brievio では、これらはモデルが報告する本物の数値です — ストリーミングしない呼び出しで得られるのと同じ数字で、公式レートより おおよそ 15% 安く課金されます。水増しされた usage オブジェクトも、 入力側を膨らませる注入されたシステムプロンプトもありません。
include_usage を省いたうえでなおトークンの概算が必要なら、 唯一の手段はモデルのトークナイザーでローカルに数えることです — これは 近似でしかなく、メンテナンスの負担にもなります。フラグを設定するだけに してください。
サイレントな破綻: 偽のストリーミングと欠落した usage
ざっと目視するテストはすり抜け、よく調べたときにだけ姿を現す故障モードが 2 つあります。どちらも、本物のトラフィックをゲートウェイに任せる前に 20 秒のチェックをする価値があります。
- バッファリングされた「偽の」ストリーミング。 ゲートウェイによっては、
stream=Trueを受け付けたうえで 上流の補完全体を待ち、それから最後にまとめてチャンクの束として 再生してきます。ループは走り、delta は届き、何もかもストリーミングされて いるように見えます — けれども TTFB はストリーミングしない呼び出しと まったく同じです。モデルが終わるまで何も送られていないからです。 見分け方は単純です。リクエストを送ってから最初の空でない delta までの 間隔を計ってください。本物のストリーミングなら 1 秒を十分に下回りますが、 バッファリングされた再生では生成全体にかかった時間と等しくなります。 最初のトークンの遅延が全体の遅延に連動しているなら、それはストリーミング ではなく、録画を見ているだけです。 - 欠落した、またはでっち上げられた usage。
include_usageを尊重しないゲートウェイは、ストリーミング 呼び出しでトークン数を一切返してくれません — つまり請求書を何もない空気と 突き合わせることになります。さらに悪いのは、不誠実なゲートウェイは膨らませた 数値を載せたusageオブジェクトを付けることができる点です。 ストリームではクライアントが数え直すことはまずないからです。地味なやり方で 検証してください。同じプロンプトをストリーミングありで 1 回、なしで 1 回走らせ、ストリーミングの最終チャンクの usage が、ストリーミングしない ときのusageと一致することを確かめます。両者は同一になるはずです。 - きれいな終了に見えるストリーム途中のエラー。 上流のモデルが途中でエラーになった場合、正しいゲートウェイはそれを サイレントな打ち切りではなく、ループ内の例外として表面化させます。 テキストを完成品として扱う前に、finish reason(または usage チャンク)を 受け取ったかどうかを必ず確認してください — ただ止まっただけのストリームは、 きちんと終わったストリームとは別物です。
要点
OpenAI 互換エンドポイント越しのストリーミングは、4 つの可動部品です。 stream=True、チャンクの反復処理、各 delta の読み取り、 そして [DONE] の処理は SDK に任せること。さらに stream_options={"include_usage": True} を加えれば、最後のチャンクで正直なトークン数も手に入ります。この同じ 15 行が、 ひとつの base_url の背後にある Claude Sonnet 4.6、Gemini 2.5 Flash、GPT ファミリーで、そのまま動きます — モデルの文字列を入れ替えても、 ループはそのままです。
出荷する前に、最初のトークンまでの時間を計測し、ストリーミングあり/なしの usage を突き合わせてください。本物のストリーミングなら 1 秒未満の TTFB と 一致する数値が得られ、バッファリングされた再生は遅延でそれを露呈します。 Brievio では失敗した 4xx/5xx の呼び出しは課金されないので、これらのチェックを無料で実行できます。完全な パラメータ一覧は Chat Completions リファレンスを、同じ ストリーム上でのツールやビジョンについては残りの API ドキュメントを、ストリーミングしない基礎に ついては OpenAI SDK で Claude を呼び出すガイド を、このループを向けられるあらゆるスラッグについては モデルカタログをご覧ください。