Files
xserv/tools/bench/client.py
Gahow Wang 7cb9ee3870 bench: run one server at a time, match thinking mode, fix tools package
Refinements from end-to-end bring-up on the GPU host:

- Run each system start→suites→stop in sequence. Two BF16 8B models don't
  co-reside on one 32GB GPU, and a resident idle engine would distort the
  other's latency/throughput.
- Match generation mode: xserv hardcodes Qwen3 thinking off, so send
  chat_template_kwargs={enable_thinking:false} to llama.cpp via a per-endpoint
  extra_body. --enable-thinking opts back into thinking mode.
- Add tools/__init__.py so `python3 -m tools.bench.runner` resolves our package
  instead of a site-packages `tools` (nvfuser ships one that shadowed it).
- Document offline-GPU-host workflow, thinking-match, and the xserv 8192 OOM
  finding that the bench surfaced.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 11:40:07 +08:00

159 lines
5.5 KiB
Python

"""HTTP client for OpenAI-compatible /v1/chat/completions.
Records per-request: TTFT (time to first content token), TPOT (mean gap
between consecutive content chunks, i.e. inter-token latency when the server
streams one token per chunk), and end-to-end throughput.
We don't care about parsing exact OpenAI envelope semantics, just enough to
get the deltas + finish_reason + token counts.
"""
from __future__ import annotations
import asyncio
import json
import time
from dataclasses import dataclass, field
from typing import Any
import httpx
@dataclass
class StreamResult:
    """Outcome and timing record for a single streamed chat completion.

    Every timing field is in seconds and keeps the sentinel ``-1.0`` until it
    has been measured. ``error`` is left as ``None`` unless the request failed.
    """

    text: str = ""
    completion_tokens: int = 0
    prompt_tokens: int = 0
    finish_reason: str | None = None
    # Timings in seconds; -1.0 marks "not measured".
    ttft_s: float = -1.0
    e2e_s: float = -1.0
    # Absolute time.perf_counter() stamps, one per content-bearing chunk.
    chunk_times: list[float] = field(default_factory=list)
    error: str | None = None

    @property
    def tpot_s(self) -> float:
        """Average gap in seconds between consecutive content chunks.

        Returns -1.0 when fewer than two chunks were stamped. The value is per
        chunk; it equals per-token latency only if each chunk carries exactly
        one token.
        """
        stamps = self.chunk_times
        if len(stamps) >= 2:
            gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
            return sum(gaps) / len(gaps)
        return -1.0

    @property
    def throughput_tok_s(self) -> float:
        """Completion tokens per end-to-end second, or -1.0 if either is non-positive."""
        if self.e2e_s > 0 and self.completion_tokens > 0:
            return self.completion_tokens / self.e2e_s
        return -1.0
async def chat_stream(
    client: httpx.AsyncClient,
    base_url: str,
    model: str,
    messages: list[dict[str, str]],
    *,
    max_tokens: int,
    temperature: float = 0.0,
    api_key: str | None = None,
    timeout: float = 1800.0,
    extra_body: dict | None = None,
) -> StreamResult:
    """Stream one chat completion and record its text, token counts and timings.

    Args:
        client: Async HTTP client used to issue the request.
        base_url: Server root; ``/v1/chat/completions`` is appended.
        model: Value sent as the ``model`` field.
        messages: Chat messages sent verbatim.
        max_tokens: Generation cap sent as ``max_tokens``.
        temperature: Sampling temperature sent to the server.
        api_key: If given, sent as an ``Authorization: Bearer`` header.
        timeout: Passed to ``client.stream`` as the request timeout.
        extra_body: Extra top-level JSON fields merged into the payload last,
            so they override the defaults built here.

    Returns:
        A StreamResult. A non-200 status, or any ``Exception`` raised while
        sending or streaming, is recorded in ``error`` instead of propagating.
        Whatever content arrived before the failure is kept.
    """
    payload: dict[str, Any] = {
        "model": model,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "stream": True,
        # llama-server returns usage in the final stream chunk when this is
        # set; xserv ignores unknown fields, so this is harmless there.
        "stream_options": {"include_usage": True},
    }
    if extra_body:
        payload.update(extra_body)
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    url = base_url.rstrip("/") + "/v1/chat/completions"

    res = StreamResult()
    # Collect deltas in a list and join once at the end. `res.text += ...` on an
    # attribute is quadratic in output length. That work would also run on the
    # same event loop that timestamps every other in-flight stream.
    parts: list[str] = []
    t_start = time.perf_counter()
    try:
        async with client.stream(
            "POST", url, json=payload, headers=headers, timeout=timeout,
        ) as resp:
            if resp.status_code != 200:
                body = await resp.aread()
                res.error = f"HTTP {resp.status_code}: {body.decode(errors='replace')[:400]}"
                res.e2e_s = time.perf_counter() - t_start
                return res
            async for line in resp.aiter_lines():
                # Only SSE data lines matter; blank keep-alives and other
                # fields do not start with "data:".
                if not line.startswith("data:"):
                    continue
                data = line[len("data:"):].strip()
                if data == "[DONE]":
                    break
                try:
                    chunk = json.loads(data)
                except json.JSONDecodeError:
                    continue
                if not isinstance(chunk, dict):
                    # Valid JSON but not an event object: skip it. Without this,
                    # an AttributeError on .get() would abort the whole stream.
                    continue
                usage = chunk.get("usage")
                if isinstance(usage, dict):
                    # Accept only int counts so an explicit null cannot leak
                    # None into the integer counters.
                    prompt_tokens = usage.get("prompt_tokens")
                    if isinstance(prompt_tokens, int):
                        res.prompt_tokens = prompt_tokens
                    completion_tokens = usage.get("completion_tokens")
                    if isinstance(completion_tokens, int):
                        res.completion_tokens = completion_tokens
                choices = chunk.get("choices") or []
                if not choices:
                    continue
                choice = choices[0]
                delta = choice.get("delta") or {}
                content = delta.get("content")
                # String check keeps the final "".join from raising outside the try.
                if isinstance(content, str) and content:
                    now = time.perf_counter()
                    if res.ttft_s < 0:
                        res.ttft_s = now - t_start
                    res.chunk_times.append(now)
                    parts.append(content)
                if choice.get("finish_reason"):
                    res.finish_reason = choice["finish_reason"]
    except Exception as e:  # noqa: BLE001 — surface any failure to the report
        res.error = f"{type(e).__name__}: {e}"
    # Stop the clock before the join so post-processing is not counted.
    res.e2e_s = time.perf_counter() - t_start
    res.text = "".join(parts)
    # Fall back to chunk count when server doesn't report usage (xserv stream path).
    if res.completion_tokens == 0:
        res.completion_tokens = len(res.chunk_times)
    return res
async def chat_concurrent(
    base_url: str,
    model: str,
    prompts: list[list[dict[str, str]]],
    *,
    max_tokens: int,
    temperature: float = 0.0,
    api_key: str | None = None,
    timeout: float = 1800.0,
    concurrency: int,
    extra_body: dict | None = None,
) -> tuple[list[StreamResult], float]:
    """Stream every prompt with at most ``concurrency`` requests in flight.

    A semaphore bounds the number of simultaneous requests. As soon as one
    finishes, the next waiting prompt starts, so this is a sliding window
    rather than fixed waves. All requests share one ``httpx.AsyncClient``.

    Args:
        base_url: Server root passed through to ``chat_stream``.
        model: Model name passed through to ``chat_stream``.
        prompts: One message list per request.
        max_tokens: Generation cap for every request.
        temperature: Sampling temperature for every request.
        api_key: Optional bearer token for every request.
        timeout: Used both as the client default and as the per-request timeout.
        concurrency: Maximum number of simultaneous requests; must be >= 1.
        extra_body: Extra JSON fields merged into every request payload.

    Returns:
        ``(results, wall)``. ``results`` is the list gathered from one
        ``chat_stream`` call per prompt, in ``prompts`` order. ``wall`` is the
        elapsed seconds around that gather; client setup and teardown are
        excluded.

    Raises:
        ValueError: If ``concurrency`` < 1. A zero-permit semaphore would block
            forever, and a negative one is invalid.
    """
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")
    sem = asyncio.Semaphore(concurrency)
    limits = httpx.Limits(max_connections=concurrency * 2, max_keepalive_connections=concurrency)
    async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:

        async def one(messages: list[dict[str, str]]) -> StreamResult:
            # Hold a permit for the whole request so in-flight count <= concurrency.
            async with sem:
                return await chat_stream(
                    client, base_url, model, messages,
                    max_tokens=max_tokens, temperature=temperature,
                    api_key=api_key, timeout=timeout, extra_body=extra_body,
                )

        t0 = time.perf_counter()
        results = await asyncio.gather(*(one(p) for p in prompts))
        wall = time.perf_counter() - t0
        return results, wall