Refinements from end-to-end bring-up on the GPU host:
- Run each system start→suites→stop in sequence. Two BF16 8B models don't
co-reside on one 32GB GPU, and a resident idle engine would distort the
other's latency/throughput.
- Match generation mode: xserv hardcodes Qwen3 thinking off, so send
chat_template_kwargs={enable_thinking:false} to llama.cpp via a per-endpoint
extra_body. --enable-thinking opts back into thinking mode.
- Add tools/__init__.py so `python3 -m tools.bench.runner` resolves our package
instead of a site-packages `tools` (nvfuser ships one that shadowed it).
- Document offline-GPU-host workflow, thinking-match, and the xserv 8192 OOM
finding that the bench surfaced.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
148 lines
4.9 KiB
Python
148 lines
4.9 KiB
Python
"""Quality suite — run dataset tasks against each system, score, report.
|
|
|
|
Each task module exposes the same surface:
|
|
load() -> list[{id, problem, answer, source}]
|
|
make_messages(problem) -> list[dict]
|
|
extract_answer(text) -> str | None
|
|
score(pred, gold) -> bool
|
|
|
|
Concurrency is fixed at 1 per system for quality runs. Mixing concurrent
|
|
requests with quality scoring is fine (deterministic temperature=0) but the
|
|
extra moving parts aren't worth it for the first iteration.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import statistics
|
|
import time
|
|
from dataclasses import asdict, dataclass
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from .client import chat_stream
|
|
from .config import BenchConfig, SystemEndpoint
|
|
from .tasks import aime, gsm8k
|
|
|
|
# Registry of runnable quality tasks. Each entry maps the task name to a pair:
# the task module, and the name of the config attribute that run_quality reads
# (via getattr) to obtain that task's max-token budget.
TASKS = {
    "aime2025": (aime, "quality_max_tokens_aime"),
    "gsm8k": (gsm8k, "quality_max_tokens_gsm8k"),
}
|
|
|
|
|
|
@dataclass
class QualityRow:
    """Aggregate quality result for one (system, task) pair."""

    system: str  # name of the endpoint that was evaluated
    task: str  # task name, e.g. a key of TASKS
    n_total: int  # number of problems attempted
    n_correct: int  # number of problems scored as correct
    n_errors: int  # number of requests that reported an error
    accuracy: float  # n_correct divided by the number of error-free cases
    mean_completion_tokens: float  # averaged over error-free cases
    mean_ttft_ms: float  # mean time to first token in ms; -1.0 if unavailable
    mean_tpot_ms: float  # mean time per output token in ms; -1.0 if unavailable
    wall_s: float  # wall-clock seconds spent on the whole task
|
|
|
|
|
|
@dataclass
class QualityCase:
    """Per-problem record: what was asked, what came back, and how long it took."""

    system: str  # name of the endpoint that answered
    task: str  # task name the problem belongs to
    problem_id: str  # the problem's "id" field
    gold: str  # reference answer (the problem's "answer" field)
    pred: str | None  # extracted answer; None on request error or failed extraction
    correct: bool  # task score of pred vs. gold; always False on request error
    completion_tokens: int  # completion token count reported for the response
    ttft_ms: float  # time to first token in ms, or -1.0 when not positive
    tpot_ms: float  # time per output token in ms, or -1.0 when not positive
    e2e_s: float  # end-to-end request duration in seconds
    error: str | None  # error reported for the request, None on success
    response_preview: str  # first 240 chars of the response, newlines flattened
|
|
|
|
|
|
def _mean_positive(values, default: float = -1.0) -> float:
    """Return the mean of the strictly positive entries of *values*.

    Falls back to *default* when no entry is positive, so callers never hit
    ``statistics.StatisticsError`` on an empty selection.
    """
    positive = [v for v in values if v > 0]
    return statistics.mean(positive) if positive else default


async def _run_one_task(
    ep: SystemEndpoint, task_name: str, task_mod, max_tokens: int, cfg: BenchConfig,
) -> tuple[QualityRow, list[QualityCase]]:
    """Run every problem of one task against one endpoint, sequentially.

    Args:
        ep: Endpoint under test; its name, base_url, model_id, api_key and
            extra_body are forwarded to the streaming chat client.
        task_name: Label stored on the resulting row and cases.
        task_mod: Task module providing ``load``, ``make_messages``,
            ``extract_answer`` and ``score``.
        max_tokens: Completion-token budget passed with each request.
        cfg: Supplies ``quality_limit``, ``quality_temperature`` and
            ``request_timeout_s``.

    Returns:
        ``(row, cases)``: the aggregate row for this endpoint/task and one
        ``QualityCase`` per problem, in the order the problems were run.
    """
    problems = task_mod.load()
    if cfg.quality_limit is not None:
        problems = problems[: cfg.quality_limit]
    print(f"[quality] {ep.name} / {task_name}: {len(problems)} problems "
          f"(max_tokens={max_tokens})")

    cases: list[QualityCase] = []
    t_wall = time.perf_counter()
    async with httpx.AsyncClient(timeout=cfg.request_timeout_s) as client:
        # One request at a time: quality runs use a concurrency of 1.
        for prob in problems:
            messages = task_mod.make_messages(prob["problem"])
            r = await chat_stream(
                client, ep.base_url, ep.model_id, messages,
                max_tokens=max_tokens,
                temperature=cfg.quality_temperature,
                api_key=ep.api_key,
                timeout=cfg.request_timeout_s,
                extra_body=ep.extra_body,
            )
            failed = r.error is not None
            # A failed request is never scored: no prediction, not correct.
            pred = None if failed else task_mod.extract_answer(r.text)
            is_correct = False if failed else task_mod.score(pred, prob["answer"])
            cases.append(QualityCase(
                system=ep.name, task=task_name,
                problem_id=prob["id"], gold=prob["answer"], pred=pred,
                correct=is_correct, completion_tokens=r.completion_tokens,
                # Non-positive timings are recorded as the sentinel -1.0.
                ttft_ms=r.ttft_s * 1000 if r.ttft_s > 0 else -1.0,
                tpot_ms=r.tpot_s * 1000 if r.tpot_s > 0 else -1.0,
                e2e_s=r.e2e_s, error=r.error,
                response_preview=(r.text or "")[:240].replace("\n", " "),
            ))
            mark = "✓" if is_correct else ("E" if failed else "✗")
            print(f" [{mark}] {prob['id']:>4s} gold={prob['answer']:>6s} "
                  f"pred={str(pred):>6s} tok={r.completion_tokens:5d} "
                  f"{r.e2e_s:6.1f}s")
    wall = time.perf_counter() - t_wall

    ok = [c for c in cases if c.error is None]
    n_correct = sum(1 for c in cases if c.correct)
    # Derived from `ok` so both counts use the same "error is None" criterion.
    n_errors = len(cases) - len(ok)
    row = QualityRow(
        system=ep.name,
        task=task_name,
        n_total=len(cases),
        n_correct=n_correct,
        n_errors=n_errors,
        # Errored requests are excluded from the denominator; the floor of 1
        # avoids division by zero when every request failed.
        accuracy=n_correct / max(len(ok), 1),
        mean_completion_tokens=(
            statistics.mean(c.completion_tokens for c in ok) if ok else 0.0
        ),
        # Successful cases may still carry the -1.0 sentinel; averaging only
        # the positive ones must not raise when none remain.
        mean_ttft_ms=_mean_positive(c.ttft_ms for c in ok),
        mean_tpot_ms=_mean_positive(c.tpot_ms for c in ok),
        wall_s=wall,
    )
    return row, cases
|
|
|
|
|
|
def run_quality(
    endpoints: list[SystemEndpoint], cfg: BenchConfig, tasks: list[str],
) -> tuple[list[QualityRow], list[QualityCase]]:
    """Run the requested quality tasks against every endpoint, in order.

    Args:
        endpoints: Systems to evaluate; processed one after another.
        cfg: Benchmark configuration; also holds the per-task max-token
            attributes named in ``TASKS``.
        tasks: Task names to run; each must be a key of ``TASKS``.

    Returns:
        ``(rows, cases)``: one aggregate row per (endpoint, task) and the
        concatenated per-problem cases, both in execution order.

    Raises:
        ValueError: If any entry of ``tasks`` is not a known task.
    """
    # Fail fast: reject a bad task name before any request is sent, rather
    # than after earlier (potentially long) tasks have already run.
    for task_name in tasks:
        if task_name not in TASKS:
            raise ValueError(f"unknown task: {task_name}")

    all_rows: list[QualityRow] = []
    all_cases: list[QualityCase] = []
    for ep in endpoints:
        print(f"[quality] === {ep.name} ===")
        for task_name in tasks:
            task_mod, max_tok_attr = TASKS[task_name]
            # Each task runs in its own event loop via asyncio.run.
            row, cases = asyncio.run(_run_one_task(
                ep, task_name, task_mod, getattr(cfg, max_tok_attr), cfg,
            ))
            all_rows.append(row)
            all_cases.extend(cases)
            print(f" -> {row.task}: {row.n_correct}/{row.n_total} = "
                  f"{row.accuracy * 100:.1f}% ({row.wall_s:.1f}s wall)")
    return all_rows, all_cases
|
|
|
|
|
|
def rows_to_dicts(rows: list[QualityRow]) -> list[dict[str, Any]]:
    """Convert aggregate rows to plain dicts with ``dataclasses.asdict``."""
    return list(map(asdict, rows))
|
|
|
|
|
|
def cases_to_dicts(cases: list[QualityCase]) -> list[dict[str, Any]]:
    """Convert per-problem cases to plain dicts with ``dataclasses.asdict``."""
    return list(map(asdict, cases))
|