From 49c7653222e42960dd36860ffc313436166958de Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Thu, 28 May 2026 11:18:52 +0800 Subject: [PATCH] tools: add llama.cpp comparison baseline + standard benchmark suite Vendor llama.cpp as a submodule pinned to b9371 and add a one-click benchmark driver that compares xserv against it on identical workloads: - setup-llama-cpp.sh: network-optional CUDA build (SM120); convert-to-gguf.sh converts the same safetensors to BF16 GGUF for an apples-to-apples baseline. - tools/bench/: black-box OpenAI-API driver measuring TTFT/TPOT/throughput (single-stream + concurrent) and response quality on AIME 2025 + GSM8K. - fetch_datasets.py pulls datasets to local JSON (GPU host has no network); task loaders prefer the local JSON. - sync-and-build.sh: `bench` subcommand transfers source + datasets to the GPU host via tar-over-ssh (no rsync there), builds, and runs the suite. Co-Authored-By: Claude Opus 4.7 --- .gitignore | 12 ++ .gitmodules | 3 + docs/16-llama-cpp-comparison.md | 153 ++++++++++++++++++++++++ third_party/llama.cpp | 1 + tools/bench/__init__.py | 0 tools/bench/client.py | 154 ++++++++++++++++++++++++ tools/bench/config.py | 51 ++++++++ tools/bench/fetch_datasets.py | 40 +++++++ tools/bench/quality.py | 146 +++++++++++++++++++++++ tools/bench/report.py | 122 +++++++++++++++++++ tools/bench/requirements.txt | 2 + tools/bench/runner.py | 202 ++++++++++++++++++++++++++++++++ tools/bench/servers.py | 145 +++++++++++++++++++++++ tools/bench/speed.py | 169 ++++++++++++++++++++++++++ tools/bench/tasks/__init__.py | 46 ++++++++ tools/bench/tasks/aime.py | 114 ++++++++++++++++++ tools/bench/tasks/gsm8k.py | 90 ++++++++++++++ tools/convert-to-gguf.sh | 55 +++++++++ tools/setup-llama-cpp.sh | 94 +++++++++++++++ tools/sync-and-build.sh | 105 ++++++++++++++--- 20 files changed, 1690 insertions(+), 14 deletions(-) create mode 100644 .gitmodules create mode 100644 docs/16-llama-cpp-comparison.md create mode 160000 third_party/llama.cpp create mode 100644 tools/bench/__init__.py create mode 100644 tools/bench/client.py create mode 100644 tools/bench/config.py create mode 100644 tools/bench/fetch_datasets.py create mode 100644 tools/bench/quality.py create mode 100644 tools/bench/report.py create mode 100644 tools/bench/requirements.txt create mode 100644 tools/bench/runner.py create mode 100644 tools/bench/servers.py create mode 100644 tools/bench/speed.py create mode 100644 tools/bench/tasks/__init__.py create mode 100644 tools/bench/tasks/aime.py create mode 100644 tools/bench/tasks/gsm8k.py create mode 100755 tools/convert-to-gguf.sh create mode 100755 tools/setup-llama-cpp.sh diff --git a/.gitignore b/.gitignore index 47e1d95..6866abf 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,15 @@ **/*.rs.bk .env *.npy + +# llama.cpp baseline (cloned/submoduled by tools/setup-llama-cpp.sh) +/third_party/llama.cpp/build/ +/third_party/llama.cpp/models/ +*.gguf + +# Benchmark output + fetched datasets (transferred to GPU host, not committed) +/bench-out/ +/tools/bench/data/ +/tools/bench/__pycache__/ +/tools/bench/**/__pycache__/ + diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..f919647 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "third_party/llama.cpp"] + path = third_party/llama.cpp + url = https://github.com/ggerganov/llama.cpp diff --git a/docs/16-llama-cpp-comparison.md b/docs/16-llama-cpp-comparison.md new file mode 100644 index 0000000..406053a --- /dev/null +++ b/docs/16-llama-cpp-comparison.md @@ -0,0 +1,153 @@ +# Phase 16: llama.cpp Comparison Baseline + +> **Goal.** Replace HF transformers with **llama.cpp** as the standing +> performance baseline, and add a standard quality (response correctness) +> benchmark suite (AIME 2025, GSM8K). Provide a one-click entrypoint that runs +> both systems under identical workloads and emits a side-by-side report. + +## Motivation + +xserv has cleared 140% of HF transformers throughput on Qwen3-8B (Phase 15). +HF is no longer a useful performance bar — it's a *correctness* baseline. + +**llama.cpp** is the right next bar because: +- It's a serious C++/CUDA inference engine with active optimization +- Same OpenAI-compatible API → black-box, fair comparison +- Same GGUF↔safetensors weight source (we convert BF16, no quantization shortcuts) +- Used widely as a reference point in the community + +We also need **quality benchmarks** so that performance improvements don't +silently regress model quality (numerical precision, sampling, prompt +formatting). AIME and GSM8K are the cheapest credible signals. + +## Architecture + +``` +xserv/ +├── third_party/llama.cpp/ # cloned by setup-llama-cpp.sh +│ └── build/bin/llama-server # CUDA build (SM120) +├── tools/ +│ ├── setup-llama-cpp.sh # clone + cmake build (idempotent) +│ ├── convert-to-gguf.sh # safetensors → BF16 GGUF (same weights) +│ ├── sync-and-build.sh # extended with `bench` subcommand +│ └── bench/ # Python benchmark driver +│ ├── runner.py # entrypoint +│ ├── servers.py # subprocess lifecycle (start/stop both) +│ ├── client.py # OpenAI streaming client + TTFT/TPOT +│ ├── speed.py # speed suite +│ ├── quality.py # quality suite +│ ├── tasks/{aime,gsm8k}.py # dataset loaders + scorers +│ ├── report.py # markdown + json output +│ └── requirements.txt # httpx, datasets +└── bench-out/ # report artifacts (gitignored) + ├── comparison-.md + ├── comparison-.json + └── logs/{xserv,llama_cpp}.log +``` + +Both systems are treated as **black-box HTTP servers** speaking the OpenAI +streaming chat API. No in-process integration, no shared Python bindings. This +keeps the comparison fair (same protocol, same prompt-template path) and +isolates the test harness from internal API churn on either side. + +## Workflow + +``` +local repo dash5 (GPU host) +────────── ──────────────── +tools/sync-and-build.sh bench → rsync project (excl. target, third_party, bench-out) + → setup-llama-cpp.sh (no-op if built) + → convert-to-gguf.sh (no-op if .gguf exists) + → cargo build --release + → python3 -m tools.bench.runner ... + → bench-out/comparison-.md +tools/sync-and-build.sh fetch-bench-out ← rsync bench-out back +``` + +## What gets measured + +### Speed (TTFT / TPOT / throughput) + +- **Single-stream**, three prompt lengths (short / medium / long), `cfg.speed_prompts` repeats each + - `TTFT p50/p95`, `TPOT p50/p95`, per-request throughput +- **Concurrent**, fixed medium prompt, sweep `concurrency ∈ {1, 2, 4, 8}` + - Aggregate `tok/s`, `TTFT p95`, error count +- Both at `temperature=0`, `max_tokens=128` by default. + +### Quality (response correctness) + +| Task | N | Source | Scoring | Why | +|---|---|---|---|---| +| AIME 2025 | 30 | `MathArena/aime_2025` (HF) | exact-match boxed integer (0..999) | reasoning + math, hard signal | +| GSM8K | 1319 | `openai/gsm8k` (HF), `test` split | exact-match `\boxed{n}` or last number | broad sanity, decimals allowed | + +Same `temperature=0` sampling across both systems. Max tokens: 16384 for AIME +(reasoning long), 2048 for GSM8K. Subsample with `--quality-limit N` for smoke. + +### Report + +`bench-out/comparison-.md` contains: +- Environment (GPU, driver, xserv commit, python) +- Speed table per scenario (xserv | llama.cpp | xserv÷llama.cpp speedup) +- Quality table per task (n, correct, accuracy, mean tokens, TTFT, TPOT, wall) + +A sibling `.json` holds all per-request raw rows and per-problem case detail +(prediction, gold, response preview) so we can diff regressions in CI later. + +## Running it + +**Full sweep on dash5 (recommended):** +```bash +./tools/sync-and-build.sh bench +./tools/sync-and-build.sh fetch-bench-out +open bench-out/comparison-*.md +``` + +**Speed-only smoke (fast):** +```bash +./tools/sync-and-build.sh bench -- --suite speed --speed-prompts 2 +``` + +**Quality smoke with 5 problems each:** +```bash +./tools/sync-and-build.sh bench -- --suite quality --quality-limit 5 +``` + +**On a host that already has both servers running** (e.g. local dev with two +shells open): +```bash +python3 -m tools.bench.runner \ + --xserv-base-url http://127.0.0.1:8080 \ + --llama-base-url http://127.0.0.1:8081 \ + --suite all +``` + +## Design choices + +1. **Black-box HTTP, not FFI.** Both engines bind the same OpenAI surface and + real serving traffic uses HTTP. Anything that doesn't show up over the wire + doesn't matter for serving. +2. **Same BF16 weights.** We convert the same safetensors with llama.cpp's + `convert_hf_to_gguf.py --outtype bf16`. No quantization at this stage; if we + want a quant comparison later we'll add a separate column, not replace this + one. +3. **Streaming everywhere.** TTFT and TPOT only make sense with streaming. We + ask both servers for `stream=true` with `include_usage` so we can read + server-reported token counts when available. +4. **Idempotent setup.** `setup-llama-cpp.sh` and `convert-to-gguf.sh` are + safe to re-run — they no-op when the build / file already exists. The + `bench` subcommand wires them so the first run does a full setup and + subsequent runs are fast. +5. **Subprocess lifecycle owned by the driver.** We spawn each server in its + own process group and SIGTERM the group on exit so half-dead llama-server + children don't survive. If the user is already running a server somewhere, + pass `--xserv-base-url` / `--llama-base-url` to skip launch. + +## Future extensions + +- Add quant runs (Q8_0, Q4_K_M) as separate "system" columns +- Wire to GitHub Actions for nightly regression +- Track results across commits to flag regressions (per-commit JSON in + `docs/benchmarks/history/`) +- Add MMLU-Pro / HumanEval when budget allows +- Long-context benchmark (8K, 32K prompts) to compare prefill scaling diff --git a/third_party/llama.cpp b/third_party/llama.cpp new file mode 160000 index 0000000..f12cc6d --- /dev/null +++ b/third_party/llama.cpp @@ -0,0 +1 @@ +Subproject commit f12cc6d0fa96d6a3c33952f06b7439ac43a3c3fe diff --git a/tools/bench/__init__.py b/tools/bench/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tools/bench/client.py b/tools/bench/client.py new file mode 100644 index 0000000..977c32e --- /dev/null +++ b/tools/bench/client.py @@ -0,0 +1,154 @@ +"""HTTP client for OpenAI-compatible /v1/chat/completions. + +Records per-request: TTFT (time to first content token), TPOT (mean +inter-token latency over the decode phase), and end-to-end throughput. + +We don't care about parsing exact OpenAI envelope semantics, just enough to +get the deltas + finish_reason + token counts. +""" + +from __future__ import annotations + +import asyncio +import json +import time +from dataclasses import dataclass, field +from typing import Any + +import httpx + + +@dataclass +class StreamResult: + text: str = "" + completion_tokens: int = 0 + prompt_tokens: int = 0 + finish_reason: str | None = None + # Timings (seconds; -1 means not measured) + ttft_s: float = -1.0 + e2e_s: float = -1.0 + chunk_times: list[float] = field(default_factory=list) # absolute monotonic times of content chunks + error: str | None = None + + @property + def tpot_s(self) -> float: + """Mean inter-content-chunk latency after the first chunk (seconds/token).""" + if len(self.chunk_times) < 2: + return -1.0 + deltas = [self.chunk_times[i] - self.chunk_times[i - 1] for i in range(1, len(self.chunk_times))] + return sum(deltas) / len(deltas) + + @property + def throughput_tok_s(self) -> float: + if self.e2e_s <= 0 or self.completion_tokens <= 0: + return -1.0 + return self.completion_tokens / self.e2e_s + + +async def chat_stream( + client: httpx.AsyncClient, + base_url: str, + model: str, + messages: list[dict[str, str]], + *, + max_tokens: int, + temperature: float = 0.0, + api_key: str | None = None, + timeout: float = 1800.0, +) -> StreamResult: + payload: dict[str, Any] = { + "model": model, + "messages": messages, + "max_tokens": max_tokens, + "temperature": temperature, + "stream": True, + } + # llama-server returns usage in the final stream chunk when this is set; + # xserv ignores unknown fields, so this is harmless there. + payload["stream_options"] = {"include_usage": True} + + headers = {"Content-Type": "application/json"} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + url = base_url.rstrip("/") + "/v1/chat/completions" + res = StreamResult() + t_start = time.perf_counter() + + try: + async with client.stream( + "POST", url, json=payload, headers=headers, timeout=timeout, + ) as resp: + if resp.status_code != 200: + body = await resp.aread() + res.error = f"HTTP {resp.status_code}: {body.decode(errors='replace')[:400]}" + res.e2e_s = time.perf_counter() - t_start + return res + + async for line in resp.aiter_lines(): + if not line or not line.startswith("data:"): + continue + data = line[len("data:"):].strip() + if data == "[DONE]": + break + try: + chunk = json.loads(data) + except json.JSONDecodeError: + continue + + if "usage" in chunk and chunk["usage"]: + usage = chunk["usage"] + res.prompt_tokens = usage.get("prompt_tokens", res.prompt_tokens) + res.completion_tokens = usage.get("completion_tokens", res.completion_tokens) + + choices = chunk.get("choices") or [] + if not choices: + continue + choice = choices[0] + delta = choice.get("delta") or {} + content = delta.get("content") + if content: + now = time.perf_counter() + if res.ttft_s < 0: + res.ttft_s = now - t_start + res.chunk_times.append(now) + res.text += content + if choice.get("finish_reason"): + res.finish_reason = choice["finish_reason"] + except Exception as e: # noqa: BLE001 — surface any failure to the report + res.error = f"{type(e).__name__}: {e}" + + res.e2e_s = time.perf_counter() - t_start + # Fall back to chunk count when server doesn't report usage (xserv stream path). + if res.completion_tokens == 0: + res.completion_tokens = len(res.chunk_times) + return res + + +async def chat_concurrent( + base_url: str, + model: str, + prompts: list[list[dict[str, str]]], + *, + max_tokens: int, + temperature: float = 0.0, + api_key: str | None = None, + timeout: float = 1800.0, + concurrency: int, +) -> tuple[list[StreamResult], float]: + """Fire `concurrency` requests in parallel waves. Returns per-request results + plus wall-clock elapsed time of the entire batch.""" + sem = asyncio.Semaphore(concurrency) + limits = httpx.Limits(max_connections=concurrency * 2, max_keepalive_connections=concurrency) + async with httpx.AsyncClient(timeout=timeout, limits=limits) as client: + async def one(messages: list[dict[str, str]]) -> StreamResult: + async with sem: + return await chat_stream( + client, base_url, model, messages, + max_tokens=max_tokens, temperature=temperature, + api_key=api_key, timeout=timeout, + ) + t0 = time.perf_counter() + results = await asyncio.gather(*(one(p) for p in prompts)) + wall = time.perf_counter() - t0 + return results, wall diff --git a/tools/bench/config.py b/tools/bench/config.py new file mode 100644 index 0000000..7309231 --- /dev/null +++ b/tools/bench/config.py @@ -0,0 +1,51 @@ +"""Defaults + CLI argument shapes for the benchmark driver. + +All paths default to the dash5 layout (/opt/wjh/...) because that's where the +GPU lives — see docs/16-llama-cpp-comparison.md. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass, field + + +# Names used in reports and as logical keys throughout the driver. +SYSTEM_XSERV = "xserv" +SYSTEM_LLAMA_CPP = "llama.cpp" +DEFAULT_SYSTEMS = (SYSTEM_XSERV, SYSTEM_LLAMA_CPP) + + +@dataclass +class SystemEndpoint: + """How to reach (or how to start) one of the systems under test.""" + + name: str + base_url: str # http://host:port (OpenAI-compatible root, no /v1) + model_id: str # what to put in the request body's "model" field + api_key: str | None = None # llama-server doesn't need one; xserv ignores it + # Process supervision is optional — if base_url is already serving, we skip launch. + launch_cmd: list[str] | None = None + launch_env: dict[str, str] = field(default_factory=dict) + launch_cwd: str | None = None + health_path: str = "/health" + ready_timeout_s: float = 600.0 # cold loads of 8B BF16 take a while + + +@dataclass +class BenchConfig: + out_dir: str = "bench-out" + # Speed suite + speed_prompts: int = 8 # synthetic prompts per length bucket + speed_max_tokens: int = 128 + speed_concurrency: tuple[int, ...] = (1, 2, 4, 8) + # Quality suite + quality_max_tokens_aime: int = 16384 + quality_max_tokens_gsm8k: int = 2048 + quality_limit: int | None = None # subsample for smoke tests; None = all + quality_temperature: float = 0.0 + request_timeout_s: float = 1800.0 + + +def env_default(key: str, fallback: str) -> str: + return os.environ.get(key, fallback) diff --git a/tools/bench/fetch_datasets.py b/tools/bench/fetch_datasets.py new file mode 100644 index 0000000..783d839 --- /dev/null +++ b/tools/bench/fetch_datasets.py @@ -0,0 +1,40 @@ +"""Pre-fetch quality-benchmark datasets into local JSON. + +Run this on a machine WITH network (e.g. your laptop). The resulting +tools/bench/data/*.json files are then shipped to the GPU host (which has no +network) by the bench sync step. + +Usage: + python3 -m tools.bench.fetch_datasets # all tasks + python3 -m tools.bench.fetch_datasets aime2025 # one task +""" + +from __future__ import annotations + +import os +import sys + +if __package__ in (None, ""): + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + +from tools.bench.tasks import aime, gsm8k, save_local + +FETCHERS = { + "aime2025": aime.load_remote, + "gsm8k": gsm8k.load_remote, +} + + +def main() -> None: + wanted = sys.argv[1:] or list(FETCHERS) + for name in wanted: + if name not in FETCHERS: + raise SystemExit(f"unknown task: {name} (have: {', '.join(FETCHERS)})") + print(f"[fetch] {name} ...") + records = FETCHERS[name]() + path = save_local(name, records) + print(f"[fetch] {name}: {len(records)} records -> {path}") + + +if __name__ == "__main__": + main() diff --git a/tools/bench/quality.py b/tools/bench/quality.py new file mode 100644 index 0000000..082e3de --- /dev/null +++ b/tools/bench/quality.py @@ -0,0 +1,146 @@ +"""Quality suite — run dataset tasks against each system, score, report. + +Each task module exposes the same surface: + load() -> list[{id, problem, answer, source}] + make_messages(problem) -> list[dict] + extract_answer(text) -> str | None + score(pred, gold) -> bool + +Concurrency is fixed at 1 per system for quality runs. Mixing concurrent +requests with quality scoring is fine (deterministic temperature=0) but the +extra moving parts aren't worth it for the first iteration. +""" + +from __future__ import annotations + +import asyncio +import statistics +import time +from dataclasses import asdict, dataclass +from typing import Any + +import httpx + +from .client import chat_stream +from .config import BenchConfig, SystemEndpoint +from .tasks import aime, gsm8k + +TASKS = { + "aime2025": (aime, "quality_max_tokens_aime"), + "gsm8k": (gsm8k, "quality_max_tokens_gsm8k"), +} + + +@dataclass +class QualityRow: + system: str + task: str + n_total: int + n_correct: int + n_errors: int + accuracy: float + mean_completion_tokens: float + mean_ttft_ms: float + mean_tpot_ms: float + wall_s: float + + +@dataclass +class QualityCase: + system: str + task: str + problem_id: str + gold: str + pred: str | None + correct: bool + completion_tokens: int + ttft_ms: float + tpot_ms: float + e2e_s: float + error: str | None + response_preview: str + + +async def _run_one_task( + ep: SystemEndpoint, task_name: str, task_mod, max_tokens: int, cfg: BenchConfig, +) -> tuple[QualityRow, list[QualityCase]]: + problems = task_mod.load() + if cfg.quality_limit is not None: + problems = problems[: cfg.quality_limit] + print(f"[quality] {ep.name} / {task_name}: {len(problems)} problems " + f"(max_tokens={max_tokens})") + + cases: list[QualityCase] = [] + t_wall = time.perf_counter() + async with httpx.AsyncClient(timeout=cfg.request_timeout_s) as client: + for prob in problems: + messages = task_mod.make_messages(prob["problem"]) + r = await chat_stream( + client, ep.base_url, ep.model_id, messages, + max_tokens=max_tokens, + temperature=cfg.quality_temperature, + api_key=ep.api_key, + timeout=cfg.request_timeout_s, + ) + pred = task_mod.extract_answer(r.text) if r.error is None else None + correct = task_mod.score(pred, prob["answer"]) if r.error is None else False + cases.append(QualityCase( + system=ep.name, task=task_name, + problem_id=prob["id"], gold=prob["answer"], pred=pred, + correct=correct, completion_tokens=r.completion_tokens, + ttft_ms=r.ttft_s * 1000 if r.ttft_s > 0 else -1.0, + tpot_ms=r.tpot_s * 1000 if r.tpot_s > 0 else -1.0, + e2e_s=r.e2e_s, error=r.error, + response_preview=(r.text or "")[:240].replace("\n", " "), + )) + mark = "✓" if correct else ("E" if r.error else "✗") + print(f" [{mark}] {prob['id']:>4s} gold={prob['answer']:>6s} " + f"pred={str(pred):>6s} tok={r.completion_tokens:5d} " + f"{r.e2e_s:6.1f}s") + wall = time.perf_counter() - t_wall + + ok = [c for c in cases if c.error is None] + correct = sum(1 for c in cases if c.correct) + errors = sum(1 for c in cases if c.error) + row = QualityRow( + system=ep.name, + task=task_name, + n_total=len(cases), + n_correct=correct, + n_errors=errors, + accuracy=correct / max(len(cases) - errors, 1), + mean_completion_tokens=statistics.mean(c.completion_tokens for c in ok) if ok else 0.0, + mean_ttft_ms=statistics.mean(c.ttft_ms for c in ok if c.ttft_ms > 0) if ok else -1.0, + mean_tpot_ms=statistics.mean(c.tpot_ms for c in ok if c.tpot_ms > 0) if ok else -1.0, + wall_s=wall, + ) + return row, cases + + +def run_quality( + endpoints: list[SystemEndpoint], cfg: BenchConfig, tasks: list[str], +) -> tuple[list[QualityRow], list[QualityCase]]: + all_rows: list[QualityRow] = [] + all_cases: list[QualityCase] = [] + for ep in endpoints: + print(f"[quality] === {ep.name} ===") + for task_name in tasks: + if task_name not in TASKS: + raise ValueError(f"unknown task: {task_name}") + task_mod, max_tok_attr = TASKS[task_name] + row, cases = asyncio.run(_run_one_task( + ep, task_name, task_mod, getattr(cfg, max_tok_attr), cfg, + )) + all_rows.append(row) + all_cases.extend(cases) + print(f" -> {row.task}: {row.n_correct}/{row.n_total} = " + f"{row.accuracy * 100:.1f}% ({row.wall_s:.1f}s wall)") + return all_rows, all_cases + + +def rows_to_dicts(rows: list[QualityRow]) -> list[dict[str, Any]]: + return [asdict(r) for r in rows] + + +def cases_to_dicts(cases: list[QualityCase]) -> list[dict[str, Any]]: + return [asdict(c) for c in cases] diff --git a/tools/bench/report.py b/tools/bench/report.py new file mode 100644 index 0000000..75e2f8b --- /dev/null +++ b/tools/bench/report.py @@ -0,0 +1,122 @@ +"""Combined speed + quality report (markdown + json side-cars).""" + +from __future__ import annotations + +import datetime as dt +import json +import os +from typing import Any + +from .config import DEFAULT_SYSTEMS + + +def _fmt(x: float, nd: int = 1) -> str: + if x is None or x < 0: + return "—" + return f"{x:.{nd}f}" + + +def _speed_table(rows: list[dict[str, Any]]) -> str: + if not rows: + return "_(no speed results)_\n" + + # scenarios in stable order + scenarios: list[str] = [] + for r in rows: + if r["scenario"] not in scenarios: + scenarios.append(r["scenario"]) + systems: list[str] = [] + for r in rows: + if r["system"] not in systems: + systems.append(r["system"]) + + by = {(r["system"], r["scenario"]): r for r in rows} + out = [] + out.append("| scenario | metric | " + " | ".join(systems) + " | speedup (xserv ÷ llama.cpp) |") + out.append("|---|---|" + "|".join(["---"] * (len(systems) + 1)) + "|") + + metrics = [ + ("ttft_ms_p50", "TTFT p50 (ms)", "lower"), + ("ttft_ms_p95", "TTFT p95 (ms)", "lower"), + ("tpot_ms_p50", "TPOT p50 (ms/tok)", "lower"), + ("throughput_tok_s", "Throughput (tok/s)", "higher"), + ] + for sc in scenarios: + for key, label, direction in metrics: + cells = [] + vals = {} + for s in systems: + row = by.get((s, sc)) + v = row[key] if row else -1.0 + vals[s] = v + cells.append(_fmt(v, 2 if "tpot" in key else 1)) + x = vals.get("xserv", -1.0) + l = vals.get("llama.cpp", -1.0) + if x > 0 and l > 0: + ratio = (x / l) if direction == "higher" else (l / x) + cells.append(f"{ratio:.2f}×") + else: + cells.append("—") + out.append(f"| {sc} | {label} | " + " | ".join(cells) + " |") + return "\n".join(out) + "\n" + + +def _quality_table(rows: list[dict[str, Any]]) -> str: + if not rows: + return "_(no quality results)_\n" + by_task: dict[str, list[dict[str, Any]]] = {} + for r in rows: + by_task.setdefault(r["task"], []).append(r) + out: list[str] = [] + out.append("| task | system | n | correct | accuracy | mean tokens | TTFT (ms) | TPOT (ms/tok) | wall (s) |") + out.append("|---|---|---|---|---|---|---|---|---|") + for task, task_rows in by_task.items(): + for r in task_rows: + out.append( + f"| {task} | {r['system']} | {r['n_total']} | {r['n_correct']} | " + f"{r['accuracy'] * 100:.1f}% | {r['mean_completion_tokens']:.0f} | " + f"{_fmt(r['mean_ttft_ms'])} | {_fmt(r['mean_tpot_ms'], 2)} | {r['wall_s']:.1f} |" + ) + return "\n".join(out) + "\n" + + +def write_report( + out_dir: str, + speed_rows: list[dict[str, Any]], + speed_raw: list[dict[str, Any]], + quality_rows: list[dict[str, Any]], + quality_cases: list[dict[str, Any]], + env: dict[str, Any], +) -> str: + os.makedirs(out_dir, exist_ok=True) + stamp = dt.datetime.now().strftime("%Y%m%d-%H%M%S") + md_path = os.path.join(out_dir, f"comparison-{stamp}.md") + json_path = os.path.join(out_dir, f"comparison-{stamp}.json") + + with open(json_path, "w") as f: + json.dump({ + "stamp": stamp, + "env": env, + "speed": {"summary": speed_rows, "raw": speed_raw}, + "quality": {"summary": quality_rows, "cases": quality_cases}, + }, f, indent=2) + + lines: list[str] = [] + lines.append(f"# xserv vs llama.cpp — comparison\n") + lines.append(f"_Generated: {stamp}_\n") + lines.append("## Environment\n") + for k, v in env.items(): + lines.append(f"- **{k}**: {v}") + lines.append("") + lines.append("## Speed\n") + lines.append(_speed_table(speed_rows)) + lines.append("\n## Quality\n") + lines.append(_quality_table(quality_rows)) + lines.append(f"\n_Raw results: `{os.path.basename(json_path)}`_\n") + + with open(md_path, "w") as f: + f.write("\n".join(lines)) + + print(f"\n[report] wrote {md_path}") + print(f"[report] wrote {json_path}") + return md_path diff --git a/tools/bench/requirements.txt b/tools/bench/requirements.txt new file mode 100644 index 0000000..75e1dce --- /dev/null +++ b/tools/bench/requirements.txt @@ -0,0 +1,2 @@ +httpx>=0.27 +datasets>=2.20 diff --git a/tools/bench/runner.py b/tools/bench/runner.py new file mode 100644 index 0000000..44e0b17 --- /dev/null +++ b/tools/bench/runner.py @@ -0,0 +1,202 @@ +"""One-click entrypoint: spin up both servers, run suites, write report. + +Usage examples: + + # Full sweep against both systems + python3 -m tools.bench.runner \ + --xserv-bin ./target/release/xserv-server \ + --xserv-model /opt/wjh/models/qwen3-8b \ + --llama-bin third_party/llama.cpp/build/bin/llama-server \ + --llama-gguf /opt/wjh/models/qwen3-8b/qwen3-8b-bf16.gguf \ + --suite all + + # Speed-only smoke test + python3 -m tools.bench.runner ... --suite speed + + # Quality with 5-problem subsample + python3 -m tools.bench.runner ... --suite quality --quality-limit 5 +""" + +from __future__ import annotations + +import argparse +import os +import platform +import subprocess +import sys +from contextlib import ExitStack +from typing import Any + +# Allow running as `python3 tools/bench/runner.py` from repo root. +if __package__ in (None, ""): + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + +from tools.bench.config import ( + BenchConfig, SystemEndpoint, SYSTEM_XSERV, SYSTEM_LLAMA_CPP, +) +from tools.bench.servers import ( + ServerHandle, start_server, stop_server, + xserv_launch_cmd, llama_cpp_launch_cmd, +) +from tools.bench.speed import run_speed, rows_to_dicts as speed_rows_to_dicts +from tools.bench.quality import ( + run_quality, rows_to_dicts as q_rows_to_dicts, cases_to_dicts, +) +from tools.bench.report import write_report + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description="xserv vs llama.cpp benchmark suite") + # Targets + p.add_argument("--xserv-bin", default="./target/release/xserv-server") + p.add_argument("--xserv-model", required=False, + help="HF model directory for xserv-server (defaults to $XSERV_MODEL_DIR)") + p.add_argument("--xserv-port", type=int, default=18080) + p.add_argument("--xserv-base-url", default=None, + help="If set, skip launching xserv and target this URL.") + p.add_argument("--xserv-model-id", default="qwen3-8b") + + p.add_argument("--llama-bin", default="third_party/llama.cpp/build/bin/llama-server") + p.add_argument("--llama-gguf", required=False, + help="GGUF model for llama-server (defaults to $LLAMA_GGUF)") + p.add_argument("--llama-port", type=int, default=18081) + p.add_argument("--llama-base-url", default=None, + help="If set, skip launching llama-server and target this URL.") + p.add_argument("--llama-model-id", default="qwen3-8b", + help="String to send in OpenAI 'model' field; llama-server is permissive.") + + # Shared + p.add_argument("--max-batch", type=int, default=4) + p.add_argument("--max-seq-len", type=int, default=8192) + p.add_argument("--systems", default="xserv,llama.cpp", + help="Comma-separated subset to run, e.g. 'xserv' to skip llama.cpp") + + # Suites + p.add_argument("--suite", choices=["speed", "quality", "all"], default="all") + p.add_argument("--quality-tasks", default="aime2025,gsm8k") + p.add_argument("--quality-limit", type=int, default=None, + help="Cap problems per task (smoke test). None = all problems.") + p.add_argument("--speed-prompts", type=int, default=8) + p.add_argument("--speed-max-tokens", type=int, default=128) + p.add_argument("--speed-concurrency", default="1,2,4,8") + + p.add_argument("--out-dir", default="bench-out") + return p.parse_args() + + +def build_endpoints(args) -> list[SystemEndpoint]: + wanted = set(s.strip() for s in args.systems.split(",") if s.strip()) + eps: list[SystemEndpoint] = [] + + if SYSTEM_XSERV in wanted: + if args.xserv_base_url: + eps.append(SystemEndpoint( + name=SYSTEM_XSERV, base_url=args.xserv_base_url, + model_id=args.xserv_model_id, launch_cmd=None, + )) + else: + model_dir = args.xserv_model or os.environ.get("XSERV_MODEL_DIR") + if not model_dir: + raise SystemExit("--xserv-model or XSERV_MODEL_DIR required (or pass --xserv-base-url)") + eps.append(SystemEndpoint( + name=SYSTEM_XSERV, + base_url=f"http://127.0.0.1:{args.xserv_port}", + model_id=args.xserv_model_id, + launch_cmd=xserv_launch_cmd( + args.xserv_bin, model_dir, args.xserv_port, + max_batch=args.max_batch, max_seq_len=args.max_seq_len, + ), + health_path="/health", + ready_timeout_s=900.0, + )) + + if SYSTEM_LLAMA_CPP in wanted: + if args.llama_base_url: + eps.append(SystemEndpoint( + name=SYSTEM_LLAMA_CPP, base_url=args.llama_base_url, + model_id=args.llama_model_id, launch_cmd=None, + )) + else: + gguf = args.llama_gguf or os.environ.get("LLAMA_GGUF") + if not gguf: + raise SystemExit("--llama-gguf or LLAMA_GGUF required (or pass --llama-base-url)") + eps.append(SystemEndpoint( + name=SYSTEM_LLAMA_CPP, + base_url=f"http://127.0.0.1:{args.llama_port}", + model_id=args.llama_model_id, + launch_cmd=llama_cpp_launch_cmd( + args.llama_bin, gguf, args.llama_port, + n_parallel=args.max_batch, ctx_size=args.max_seq_len, + ), + # llama-server's health endpoint also returns 200 only when model is loaded. + health_path="/health", + ready_timeout_s=900.0, + )) + return eps + + +def collect_env() -> dict[str, Any]: + env: dict[str, Any] = { + "platform": platform.platform(), + "python": sys.version.split()[0], + } + for cmd, key in [ + (["nvidia-smi", "--query-gpu=name,driver_version,memory.total", "--format=csv,noheader"], "gpu"), + (["git", "rev-parse", "HEAD"], "xserv_commit"), + ]: + try: + out = subprocess.check_output(cmd, text=True, stderr=subprocess.DEVNULL, timeout=5).strip() + env[key] = out.splitlines()[0] if out else "?" + except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired): + env[key] = "?" + return env + + +def main() -> None: + args = parse_args() + endpoints = build_endpoints(args) + if not endpoints: + raise SystemExit("no systems selected (check --systems)") + + cfg = BenchConfig( + out_dir=args.out_dir, + speed_prompts=args.speed_prompts, + speed_max_tokens=args.speed_max_tokens, + speed_concurrency=tuple(int(c) for c in args.speed_concurrency.split(",") if c.strip()), + quality_limit=args.quality_limit, + ) + + os.makedirs(args.out_dir, exist_ok=True) + log_dir = os.path.join(args.out_dir, "logs") + + handles: list[ServerHandle] = [] + speed_rows: list[Any] = [] + speed_raw: list[dict[str, Any]] = [] + quality_rows: list[Any] = [] + quality_cases: list[Any] = [] + + with ExitStack() as stack: + for ep in endpoints: + h = start_server(ep, log_dir) + handles.append(h) + stack.callback(stop_server, h) + + if args.suite in ("speed", "all"): + speed_rows, speed_raw = run_speed(endpoints, cfg) + + if args.suite in ("quality", "all"): + tasks = [t.strip() for t in args.quality_tasks.split(",") if t.strip()] + quality_rows, quality_cases = run_quality(endpoints, cfg, tasks) + + write_report( + out_dir=args.out_dir, + speed_rows=speed_rows_to_dicts(speed_rows) if speed_rows else [], + speed_raw=speed_raw, + quality_rows=q_rows_to_dicts(quality_rows) if quality_rows else [], + quality_cases=cases_to_dicts(quality_cases) if quality_cases else [], + env=collect_env(), + ) + + +if __name__ == "__main__": + main() diff --git a/tools/bench/servers.py b/tools/bench/servers.py new file mode 100644 index 0000000..6961feb --- /dev/null +++ b/tools/bench/servers.py @@ -0,0 +1,145 @@ +"""Start/stop xserv-server and llama-server as subprocesses. + +The benchmark driver treats both systems as black-box HTTP servers — it does +not import their Rust/C++ code. This keeps the comparison fair (same wire +protocol, no in-process shortcut) and avoids coupling the bench harness to +internal APIs. +""" + +from __future__ import annotations + +import contextlib +import os +import signal +import subprocess +import sys +import time +import urllib.error +import urllib.request +from dataclasses import dataclass + +from .config import SystemEndpoint + + +@dataclass +class ServerHandle: + endpoint: SystemEndpoint + proc: subprocess.Popen[bytes] | None + log_path: str | None + + +def _wait_ready(base_url: str, health_path: str, timeout_s: float) -> bool: + url = base_url.rstrip("/") + health_path + deadline = time.monotonic() + timeout_s + last_err = "" + while time.monotonic() < deadline: + try: + with urllib.request.urlopen(url, timeout=5) as r: + if r.status == 200: + return True + except (urllib.error.URLError, ConnectionError, TimeoutError) as e: + last_err = repr(e) + time.sleep(1.0) + print(f"[servers] not ready after {timeout_s}s ({url}): {last_err}", file=sys.stderr) + return False + + +def start_server(ep: SystemEndpoint, log_dir: str) -> ServerHandle: + """Launch `ep.launch_cmd` if set; otherwise assume it's already running.""" + if ep.launch_cmd is None: + if _wait_ready(ep.base_url, ep.health_path, timeout_s=10.0): + print(f"[servers] reusing already-running {ep.name} at {ep.base_url}") + return ServerHandle(endpoint=ep, proc=None, log_path=None) + raise RuntimeError(f"{ep.name}: no launch_cmd and not reachable at {ep.base_url}") + + os.makedirs(log_dir, exist_ok=True) + log_path = os.path.join(log_dir, f"{ep.name.replace('.', '_')}.log") + log_f = open(log_path, "wb") + env = os.environ.copy() + env.update(ep.launch_env) + + print(f"[servers] launching {ep.name}: {' '.join(ep.launch_cmd)}") + print(f"[servers] log: {log_path}") + proc = subprocess.Popen( + ep.launch_cmd, + cwd=ep.launch_cwd, + env=env, + stdout=log_f, + stderr=subprocess.STDOUT, + # Own process group so SIGTERM kills children (llama-server in particular). + preexec_fn=os.setsid, + ) + + ok = _wait_ready(ep.base_url, ep.health_path, timeout_s=ep.ready_timeout_s) + if not ok: + # Hand back enough info so caller can drain logs before dying. + log_f.flush() + try: + os.killpg(proc.pid, signal.SIGTERM) + except ProcessLookupError: + pass + raise RuntimeError( + f"{ep.name} failed to become ready (see {log_path}). " + "Common causes: model path wrong, port already in use, OOM." + ) + + return ServerHandle(endpoint=ep, proc=proc, log_path=log_path) + + +def stop_server(h: ServerHandle, *, grace_s: float = 10.0) -> None: + if h.proc is None: + return + print(f"[servers] stopping {h.endpoint.name} (pid {h.proc.pid})") + try: + os.killpg(h.proc.pid, signal.SIGTERM) + except ProcessLookupError: + return + try: + h.proc.wait(timeout=grace_s) + except subprocess.TimeoutExpired: + print(f"[servers] {h.endpoint.name} did not exit, sending SIGKILL") + with contextlib.suppress(ProcessLookupError): + os.killpg(h.proc.pid, signal.SIGKILL) + h.proc.wait(timeout=5) + + +# ---------- launch-command builders ---------- + + +def xserv_launch_cmd( + bin_path: str, + model_dir: str, + port: int, + *, + max_batch: int, + max_seq_len: int, +) -> list[str]: + return [ + bin_path, + model_dir, + "--port", str(port), + "--max-batch", str(max_batch), + "--max-seq-len", str(max_seq_len), + ] + + +def llama_cpp_launch_cmd( + bin_path: str, + gguf_path: str, + port: int, + *, + n_parallel: int, + ctx_size: int, + n_gpu_layers: int = 99, +) -> list[str]: + return [ + bin_path, + "-m", gguf_path, + "--port", str(port), + "--host", "0.0.0.0", + "-c", str(ctx_size), + "-ngl", str(n_gpu_layers), + "--parallel", str(n_parallel), + # Be quiet by default; the log file already captures stderr. + "--log-disable", + ] diff --git a/tools/bench/speed.py b/tools/bench/speed.py new file mode 100644 index 0000000..2c57b0b --- /dev/null +++ b/tools/bench/speed.py @@ -0,0 +1,169 @@ +"""Speed suite: TTFT, TPOT, throughput; serial and concurrent. + +Single-stream and concurrent throughput are reported separately because they +stress different things — TTFT/TPOT are kernel/latency bound (single stream), +throughput at high concurrency is scheduler/batching bound. +""" + +from __future__ import annotations + +import asyncio +import statistics +from dataclasses import asdict, dataclass +from typing import Any + +from .client import StreamResult, chat_concurrent +from .config import BenchConfig, SystemEndpoint + + +# Three prompt-length buckets cover the common interesting points: +# short = greeting-style; medium = QA; long = summarize-ish (prefill-heavy). +SPEED_PROMPTS = { + "short": "What is 2 + 2?", + "medium": "Explain the difference between TCP and UDP, briefly.", + "long": ( + "Write a detailed comparison of Python and Rust for systems programming. " + "Cover memory management, performance, ergonomics, ecosystem, and typical " + "use cases. Be specific." + ), +} + + +@dataclass +class SpeedRow: + system: str + scenario: str # e.g. "single/short", "concurrent-4" + requests: int + completion_tokens_total: int + wall_s: float + ttft_ms_p50: float + ttft_ms_p95: float + tpot_ms_p50: float + tpot_ms_p95: float + throughput_tok_s: float # aggregate completion_tokens / wall + per_req_throughput_tok_s_mean: float + errors: int + + +def _percentile(values: list[float], p: float) -> float: + if not values: + return -1.0 + s = sorted(values) + idx = max(0, min(len(s) - 1, int(round((p / 100.0) * (len(s) - 1))))) + return s[idx] + + +def _summarize(system: str, scenario: str, results: list[StreamResult], wall_s: float) -> SpeedRow: + ok = [r for r in results if r.error is None] + ttft_ms = [r.ttft_s * 1000 for r in ok if r.ttft_s >= 0] + tpot_ms = [r.tpot_s * 1000 for r in ok if r.tpot_s >= 0] + per_req_tps = [r.throughput_tok_s for r in ok if r.throughput_tok_s > 0] + total_tokens = sum(r.completion_tokens for r in ok) + return SpeedRow( + system=system, + scenario=scenario, + requests=len(results), + completion_tokens_total=total_tokens, + wall_s=wall_s, + ttft_ms_p50=_percentile(ttft_ms, 50), + ttft_ms_p95=_percentile(ttft_ms, 95), + tpot_ms_p50=_percentile(tpot_ms, 50), + tpot_ms_p95=_percentile(tpot_ms, 95), + throughput_tok_s=total_tokens / wall_s if wall_s > 0 else -1.0, + per_req_throughput_tok_s_mean=statistics.mean(per_req_tps) if per_req_tps else -1.0, + errors=len(results) - len(ok), + ) + + +async def run_single_stream( + ep: SystemEndpoint, cfg: BenchConfig, +) -> tuple[list[SpeedRow], list[dict[str, Any]]]: + """One request at a time, three prompt lengths. Repeat each `cfg.speed_prompts` times.""" + rows: list[SpeedRow] = [] + raw: list[dict[str, Any]] = [] + for bucket, prompt in SPEED_PROMPTS.items(): + messages = [[{"role": "user", "content": prompt}]] * cfg.speed_prompts + results, wall = await chat_concurrent( + ep.base_url, ep.model_id, messages, + max_tokens=cfg.speed_max_tokens, + temperature=0.0, + api_key=ep.api_key, + timeout=cfg.request_timeout_s, + concurrency=1, + ) + rows.append(_summarize(ep.name, f"single/{bucket}", results, wall)) + for i, r in enumerate(results): + raw.append({ + "system": ep.name, "scenario": f"single/{bucket}", "i": i, + "ttft_s": r.ttft_s, "tpot_s": r.tpot_s, + "completion_tokens": r.completion_tokens, + "e2e_s": r.e2e_s, "error": r.error, + "finish_reason": r.finish_reason, + }) + return rows, raw + + +async def run_concurrent( + ep: SystemEndpoint, cfg: BenchConfig, +) -> tuple[list[SpeedRow], list[dict[str, Any]]]: + """Fixed medium-length prompt, sweep concurrency.""" + rows: list[SpeedRow] = [] + raw: list[dict[str, Any]] = [] + prompt = SPEED_PROMPTS["medium"] + for c in cfg.speed_concurrency: + # Send 4x concurrency requests so the scheduler sees sustained load, + # not just one wave. + n = max(c * 4, 8) + messages = [[{"role": "user", "content": prompt}]] * n + results, wall = await chat_concurrent( + ep.base_url, ep.model_id, messages, + max_tokens=cfg.speed_max_tokens, + temperature=0.0, + api_key=ep.api_key, + timeout=cfg.request_timeout_s, + concurrency=c, + ) + rows.append(_summarize(ep.name, f"concurrent-{c}", results, wall)) + for i, r in enumerate(results): + raw.append({ + "system": ep.name, "scenario": f"concurrent-{c}", "i": i, + "ttft_s": r.ttft_s, "tpot_s": r.tpot_s, + "completion_tokens": r.completion_tokens, + "e2e_s": r.e2e_s, "error": r.error, + "finish_reason": r.finish_reason, + }) + return rows, raw + + +def run_speed( + endpoints: list[SystemEndpoint], cfg: BenchConfig, +) -> tuple[list[SpeedRow], list[dict[str, Any]]]: + all_rows: list[SpeedRow] = [] + all_raw: list[dict[str, Any]] = [] + for ep in endpoints: + print(f"[speed] === {ep.name} ===") + # Tiny warmup so the first row isn't penalized by lazy cache allocation. + warm_messages = [[{"role": "user", "content": "Hello"}]] + asyncio.run(chat_concurrent( + ep.base_url, ep.model_id, warm_messages, + max_tokens=8, temperature=0.0, api_key=ep.api_key, + timeout=120, concurrency=1, + )) + + rows1, raw1 = asyncio.run(run_single_stream(ep, cfg)) + all_rows.extend(rows1); all_raw.extend(raw1) + for r in rows1: + print(f" {r.scenario:18s} ttft p50={r.ttft_ms_p50:7.1f}ms " + f"tpot p50={r.tpot_ms_p50:6.2f}ms thpt={r.throughput_tok_s:6.1f} tok/s") + + rows2, raw2 = asyncio.run(run_concurrent(ep, cfg)) + all_rows.extend(rows2); all_raw.extend(raw2) + for r in rows2: + print(f" {r.scenario:18s} reqs={r.requests:3d} thpt={r.throughput_tok_s:6.1f} tok/s " + f"ttft p95={r.ttft_ms_p95:7.1f}ms errs={r.errors}") + + return all_rows, all_raw + + +def rows_to_dicts(rows: list[SpeedRow]) -> list[dict[str, Any]]: + return [asdict(r) for r in rows] diff --git a/tools/bench/tasks/__init__.py b/tools/bench/tasks/__init__.py new file mode 100644 index 0000000..9718cef --- /dev/null +++ b/tools/bench/tasks/__init__.py @@ -0,0 +1,46 @@ +"""Shared helpers for quality tasks. + +Each task can be backed by a pre-fetched local JSON file (so the GPU host +doesn't need network). The JSON is a list of records: + [{"id": str, "problem": str, "answer": str, "source": str}, ...] + +Use tools/bench/fetch_datasets.py on a networked machine to produce these +files, then ship them to the GPU host (the bench sync does this automatically). +""" + +from __future__ import annotations + +import json +import os +from typing import Any + + +def data_dir() -> str: + """Directory holding pre-fetched dataset JSON. Override via BENCH_DATA_DIR.""" + return os.environ.get( + "BENCH_DATA_DIR", + os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "data"), + ) + + +def local_json_path(task_name: str) -> str: + return os.path.normpath(os.path.join(data_dir(), f"{task_name}.json")) + + +def load_local(task_name: str) -> list[dict[str, Any]] | None: + """Return records from the local JSON file if present, else None.""" + path = local_json_path(task_name) + if not os.path.isfile(path): + return None + with open(path) as f: + records = json.load(f) + print(f"[tasks] loaded {len(records)} records from {path}") + return records + + +def save_local(task_name: str, records: list[dict[str, Any]]) -> str: + path = local_json_path(task_name) + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w") as f: + json.dump(records, f, ensure_ascii=False, indent=1) + return path diff --git a/tools/bench/tasks/aime.py b/tools/bench/tasks/aime.py new file mode 100644 index 0000000..00baedf --- /dev/null +++ b/tools/bench/tasks/aime.py @@ -0,0 +1,114 @@ +"""AIME 2025 — 30 problems, integer answers 0..999. + +Scoring: exact-match of the integer in the last `\\boxed{...}` in the response, +falling back to the last standalone integer in the response. Matches the +convention used by most reasoning-model leaderboards. +""" + +from __future__ import annotations + +import re +from typing import Any + +from . import load_local + +TASK_NAME = "aime2025" + +# Tried in order; first one to load wins. These are the most-cited HF datasets +# for AIME 2025 at time of writing; we don't depend on any one being present. +DATASET_CANDIDATES = [ + ("MathArena/aime_2025", None, "test"), + ("yentinglin/aime_2025", None, "train"), + ("opencompass/AIME2025", "AIME2025-I", "test"), +] + + +def load() -> list[dict[str, Any]]: + # Prefer the pre-fetched local JSON (GPU host has no network). + local = load_local(TASK_NAME) + if local is not None: + return local + return load_remote() + + +def load_remote() -> list[dict[str, Any]]: + """Fetch from HuggingFace. Requires network — used by fetch_datasets.py.""" + from datasets import load_dataset # noqa: PLC0415 — optional dep, see requirements.txt + + last_err: Exception | None = None + for repo, config, split in DATASET_CANDIDATES: + try: + ds = load_dataset(repo, config, split=split) if config else load_dataset(repo, split=split) + except Exception as e: # noqa: BLE001 — try the next candidate + last_err = e + continue + + problems: list[dict[str, Any]] = [] + for i, row in enumerate(ds): + problem = row.get("problem") or row.get("question") or row.get("Problem") + answer = row.get("answer") or row.get("Answer") or row.get("solution_int") + if problem is None or answer is None: + continue + problems.append({ + "id": str(row.get("id") or row.get("ID") or i), + "problem": problem, + "answer": str(answer).strip(), + "source": repo, + }) + if problems: + return problems + + raise RuntimeError( + f"Could not load AIME 2025 from any of {[c[0] for c in DATASET_CANDIDATES]} " + f"(last error: {last_err!r}). Set HF_HOME / HF_TOKEN if needed." + ) + + +SYSTEM_PROMPT = ( + "You are a careful math problem solver. Solve the problem step by step. " + "Put your final integer answer (an integer from 0 to 999) inside \\boxed{}." +) + + +def make_messages(problem: str) -> list[dict[str, str]]: + return [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": problem}, + ] + + +_BOXED_RE = re.compile(r"\\boxed\s*\{([^{}]*)\}") +_INT_RE = re.compile(r"-?\d+") + + +def extract_answer(text: str) -> str | None: + """Return canonical integer string, or None if nothing parseable.""" + if not text: + return None + boxed = _BOXED_RE.findall(text) + candidates: list[str] = [] + if boxed: + # Inside the \boxed{} there may be extra latex; grab the last integer. + ints = _INT_RE.findall(boxed[-1]) + if ints: + candidates.append(ints[-1]) + # Fallback: the last integer anywhere in the response. + if not candidates: + ints = _INT_RE.findall(text) + if ints: + candidates.append(ints[-1]) + if not candidates: + return None + try: + return str(int(candidates[-1])) + except ValueError: + return None + + +def score(pred: str | None, gold: str) -> bool: + if pred is None: + return False + try: + return int(pred) == int(gold) + except ValueError: + return False diff --git a/tools/bench/tasks/gsm8k.py b/tools/bench/tasks/gsm8k.py new file mode 100644 index 0000000..21acde3 --- /dev/null +++ b/tools/bench/tasks/gsm8k.py @@ -0,0 +1,90 @@ +"""GSM8K — 1319 grade-school math problems with integer/decimal answers. + +Gold answers in the dataset are in the form `... #### 42`. We score by +exact-match of the final number, with the same `\\boxed{}` / last-number +extraction used for AIME, since for instruction-tuned models the response +follows the prompt instructions, not the dataset's `####` convention. +""" + +from __future__ import annotations + +import re +from typing import Any + +from . import load_local + +TASK_NAME = "gsm8k" + + +def load() -> list[dict[str, Any]]: + local = load_local(TASK_NAME) + if local is not None: + return local + return load_remote() + + +def load_remote() -> list[dict[str, Any]]: + """Fetch from HuggingFace. Requires network — used by fetch_datasets.py.""" + from datasets import load_dataset # noqa: PLC0415 + + ds = load_dataset("openai/gsm8k", "main", split="test") + out: list[dict[str, Any]] = [] + for i, row in enumerate(ds): + ans_full: str = row["answer"] + # gold format: "\n#### 42" + gold = ans_full.split("####")[-1].strip().replace(",", "") + out.append({ + "id": str(i), + "problem": row["question"], + "answer": gold, + "source": "openai/gsm8k", + }) + return out + + +SYSTEM_PROMPT = ( + "You are a careful math problem solver. Solve the problem step by step. " + "Put your final numeric answer inside \\boxed{}." +) + + +def make_messages(problem: str) -> list[dict[str, str]]: + return [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": problem}, + ] + + +_BOXED_RE = re.compile(r"\\boxed\s*\{([^{}]*)\}") +# Allow comma-grouped thousands (e.g. "3,500"); _normalize_num strips them. +_NUM_RE = re.compile(r"-?\d+(?:,\d{3})*(?:\.\d+)?") + + +def _normalize_num(s: str) -> str | None: + s = s.replace(",", "").strip() + try: + f = float(s) + except ValueError: + return None + return str(int(f)) if f.is_integer() else f"{f:g}" + + +def extract_answer(text: str) -> str | None: + if not text: + return None + boxed = _BOXED_RE.findall(text) + if boxed: + nums = _NUM_RE.findall(boxed[-1]) + if nums: + return _normalize_num(nums[-1]) + nums = _NUM_RE.findall(text) + if nums: + return _normalize_num(nums[-1]) + return None + + +def score(pred: str | None, gold: str) -> bool: + if pred is None: + return False + gold_norm = _normalize_num(gold) + return gold_norm is not None and pred == gold_norm diff --git a/tools/convert-to-gguf.sh b/tools/convert-to-gguf.sh new file mode 100755 index 0000000..d631e00 --- /dev/null +++ b/tools/convert-to-gguf.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# Convert a HuggingFace safetensors model dir into a BF16 GGUF for llama.cpp. +# +# Why BF16: we run xserv in BF16, so the baseline must run BF16 too. If we +# compared xserv-BF16 against llama.cpp-Q4_K_M the speed delta would be +# dominated by quantization, not by our kernels — that's not an apples-to- +# apples comparison. +# +# Usage: +# tools/convert-to-gguf.sh [out.gguf] +# +# Example: +# tools/convert-to-gguf.sh /opt/wjh/models/qwen3-8b +# # → /opt/wjh/models/qwen3-8b/qwen3-8b-bf16.gguf + +set -euo pipefail + +if [ "$#" -lt 1 ]; then + echo "Usage: $0 [out.gguf]" >&2 + exit 1 +fi + +SRC="$(realpath "$1")" +ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)" +CONVERT_PY="$ROOT_DIR/third_party/llama.cpp/convert_hf_to_gguf.py" + +if [ ! -f "$CONVERT_PY" ]; then + echo "convert script not found: $CONVERT_PY" >&2 + echo "Run tools/setup-llama-cpp.sh first." >&2 + exit 1 +fi + +if [ ! -d "$SRC" ]; then + echo "source model dir not found: $SRC" >&2 + exit 1 +fi + +if [ "$#" -ge 2 ]; then + OUT="$2" +else + BASENAME="$(basename "$SRC")" + OUT="$SRC/${BASENAME}-bf16.gguf" +fi + +if [ -f "$OUT" ]; then + echo "==> already exists: $OUT (skipping; remove to force re-convert)" + echo "$OUT" + exit 0 +fi + +echo "==> converting $SRC -> $OUT (BF16)" +python3 "$CONVERT_PY" "$SRC" --outfile "$OUT" --outtype bf16 + +echo "=== done ===" +echo "$OUT" diff --git a/tools/setup-llama-cpp.sh b/tools/setup-llama-cpp.sh new file mode 100755 index 0000000..4cca9b9 --- /dev/null +++ b/tools/setup-llama-cpp.sh @@ -0,0 +1,94 @@ +#!/usr/bin/env bash +# Build the llama.cpp baseline (third_party/llama.cpp) with CUDA. +# +# Source is vendored as a git submodule pinned to a fixed tag (see .gitmodules +# and the recorded gitlink commit). This script does NOT fetch from the network +# by default — it expects the source to already be present, either via: +# - `git submodule update --init` (on a host with network), or +# - rsync/tar transfer (how it reaches dash5, which has no network). +# +# It only fetches as a convenience fallback when the source is missing AND +# network is reachable. +# +# Idempotent. Safe to re-run. +# +# Usage: +# tools/setup-llama-cpp.sh # build (configure if needed) +# tools/setup-llama-cpp.sh --rebuild # wipe build dir, reconfigure, rebuild +# +# Env: +# CUDA_ARCH CUDA architectures for cmake (default 120-real = RTX 5090 SM120) +# CUDA_HOME CUDA toolkit root (auto-detected: /usr/local/cuda-12.9 then cuda) + +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)" +VENDOR_DIR="$ROOT_DIR/third_party/llama.cpp" +CUDA_ARCH="${CUDA_ARCH:-120-real}" +REBUILD=0 +for arg in "$@"; do + case "$arg" in + --rebuild) REBUILD=1 ;; + --help|-h) grep -E '^#' "$0" | sed 's/^# \{0,1\}//'; exit 0 ;; + esac +done + +if [ -d /usr/local/cuda-12.9 ]; then + export CUDA_HOME="${CUDA_HOME:-/usr/local/cuda-12.9}" +elif [ -d /usr/local/cuda ]; then + export CUDA_HOME="${CUDA_HOME:-/usr/local/cuda}" +fi +[ -n "${CUDA_HOME:-}" ] && export PATH="$CUDA_HOME/bin:$PATH" + +echo "=== llama.cpp build ===" +echo " vendor dir : $VENDOR_DIR" +echo " CUDA arch : $CUDA_ARCH" +echo " CUDA_HOME : ${CUDA_HOME:-}" + +# --- Ensure source is present --- +if [ ! -f "$VENDOR_DIR/CMakeLists.txt" ]; then + echo "==> source missing at $VENDOR_DIR" + if git -C "$ROOT_DIR" rev-parse --git-dir >/dev/null 2>&1 \ + && timeout 8 git ls-remote https://github.com/ggerganov/llama.cpp HEAD >/dev/null 2>&1; then + echo "==> network OK, initializing submodule" + git -C "$ROOT_DIR" submodule update --init --recursive third_party/llama.cpp + else + echo "ERROR: llama.cpp source not present and network unavailable." >&2 + echo " On a networked host run: git submodule update --init third_party/llama.cpp" >&2 + echo " Then transfer the source here (the bench tooling does this via rsync)." >&2 + exit 1 + fi +fi + +BUILD_DIR="$VENDOR_DIR/build" +if [ "$REBUILD" -eq 1 ] && [ -d "$BUILD_DIR" ]; then + echo "==> --rebuild: removing $BUILD_DIR" + rm -rf "$BUILD_DIR" +fi + +SERVER_BIN="$BUILD_DIR/bin/llama-server" +if [ -x "$SERVER_BIN" ] && [ "$REBUILD" -eq 0 ]; then + echo "==> already built: $SERVER_BIN (use --rebuild to force)" + echo "$SERVER_BIN" + exit 0 +fi + +echo "==> cmake configure" +cmake -S "$VENDOR_DIR" -B "$BUILD_DIR" \ + -DGGML_CUDA=ON \ + -DLLAMA_CURL=OFF \ + -DLLAMA_BUILD_TESTS=OFF \ + -DLLAMA_BUILD_EXAMPLES=OFF \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_CUDA_ARCHITECTURES="$CUDA_ARCH" + +echo "==> build llama-server llama-cli (jobs: $(nproc))" +cmake --build "$BUILD_DIR" --target llama-server llama-cli -j "$(nproc)" + +if [ ! -x "$SERVER_BIN" ]; then + echo "ERROR: llama-server did not build at $SERVER_BIN" >&2 + exit 1 +fi + +echo "=== done ===" +echo "$SERVER_BIN" diff --git a/tools/sync-and-build.sh b/tools/sync-and-build.sh index aa4db1d..abf1b1a 100755 --- a/tools/sync-and-build.sh +++ b/tools/sync-and-build.sh @@ -1,25 +1,102 @@ #!/bin/bash -# Sync local project to dash5 and build/test there. -# Usage: ./tools/sync-and-build.sh [test|build|run] +# Sync local project to dash5 and build/test/bench there. +# +# Usage: +# ./tools/sync-and-build.sh [test|build|run] +# Runs `cargo --release` on dash5. +# +# ./tools/sync-and-build.sh bench [-- ] +# Ensures llama.cpp is built (tools/setup-llama-cpp.sh) and a BF16 GGUF +# exists, then runs tools/bench/runner.py against both xserv-server and +# llama-server. Result lands in $REMOTE_DIR/bench-out/. +# +# ./tools/sync-and-build.sh fetch-bench-out +# Copies dash5:$REMOTE_DIR/bench-out/ back to ./bench-out/. set -e REMOTE="dash5" REMOTE_DIR="/opt/wjh/projects/xserv" +REMOTE_MODEL_DIR="${REMOTE_MODEL_DIR:-/opt/wjh/models/qwen3-8b}" LOCAL_DIR="$(cd "$(dirname "$0")/.." && pwd)" ACTION="${1:-build}" +shift || true -echo "=== Syncing to $REMOTE:$REMOTE_DIR ===" -ssh "$REMOTE" "mkdir -p $REMOTE_DIR" -rsync -az --delete \ - --exclude target \ - --exclude .git \ - "$LOCAL_DIR/" "$REMOTE:$REMOTE_DIR/" +cuda_env='if [ -d /usr/local/cuda-12.9 ]; then export CUDA_HOME=/usr/local/cuda-12.9; else export CUDA_HOME=/usr/local/cuda; fi && export PATH=$CUDA_HOME/bin:/usr/local/cuda/bin:$PATH' -echo "=== Running: cargo $ACTION ===" -ssh "$REMOTE" "source \$HOME/.cargo/env && \ - export PATH=/usr/local/cuda/bin:\$PATH && \ - export CUDA_HOME=/usr/local/cuda && \ - cd $REMOTE_DIR && \ - cargo $ACTION --release 2>&1" +sync_project() { + echo "=== Syncing to $REMOTE:$REMOTE_DIR ===" + # Preserve `target/`, `third_party/` (large + arch-specific) and `bench-out/` + # on the remote side. Everything else is wiped + replaced. + ssh "$REMOTE" "mkdir -p $REMOTE_DIR && find $REMOTE_DIR -mindepth 1 -maxdepth 1 ! -name target ! -name third_party ! -name bench-out -exec rm -rf {} +" + tar --exclude target --exclude third_party --exclude bench-out --exclude .git \ + -C "$LOCAL_DIR" -czf - . \ + | ssh "$REMOTE" "tar -xzf - -C $REMOTE_DIR" +} + +sync_llama_src() { + # dash5 has no network (and no rsync), so we transfer the llama.cpp submodule + # working tree (source only — never the build dir or .git) via tar-over-ssh. + local src="$LOCAL_DIR/third_party/llama.cpp" + if [ ! -f "$src/CMakeLists.txt" ]; then + echo "ERROR: llama.cpp source not found at $src" >&2 + echo " Run: git submodule update --init third_party/llama.cpp" >&2 + exit 1 + fi + echo "=== Syncing llama.cpp source to $REMOTE (tar) ===" + # Preserve the remote build/ dir; only refresh source files. + ssh "$REMOTE" "mkdir -p $REMOTE_DIR/third_party/llama.cpp" + tar --exclude build --exclude .git --exclude '*.gguf' \ + -C "$src" -czf - . \ + | ssh "$REMOTE" "tar -xzf - -C $REMOTE_DIR/third_party/llama.cpp" +} + +case "$ACTION" in + test|build|run|check|clippy) + sync_project + echo "=== Running: cargo $ACTION ===" + ssh "$REMOTE" "source \$HOME/.cargo/env && $cuda_env && cd $REMOTE_DIR && cargo $ACTION --release 2>&1" + ;; + + bench) + sync_project + sync_llama_src + echo "=== Ensuring llama.cpp baseline is built ===" + ssh "$REMOTE" "source \$HOME/.cargo/env && $cuda_env && cd $REMOTE_DIR && \ + ./tools/setup-llama-cpp.sh 2>&1" + + echo "=== Ensuring BF16 GGUF exists for $REMOTE_MODEL_DIR ===" + # Returned path on stdout's last line is what we feed --llama-gguf. + GGUF_PATH=$(ssh "$REMOTE" "$cuda_env && cd $REMOTE_DIR && \ + ./tools/convert-to-gguf.sh $REMOTE_MODEL_DIR 2>&1 | tail -1") + echo " gguf: $GGUF_PATH" + + echo "=== Building xserv (release) ===" + ssh "$REMOTE" "source \$HOME/.cargo/env && $cuda_env && cd $REMOTE_DIR && \ + cargo build --release 2>&1" + + echo "=== Running benchmark suite ===" + ssh "$REMOTE" "$cuda_env && cd $REMOTE_DIR && \ + python3 -m tools.bench.runner \ + --xserv-bin ./target/release/xserv-server \ + --xserv-model $REMOTE_MODEL_DIR \ + --llama-bin third_party/llama.cpp/build/bin/llama-server \ + --llama-gguf $GGUF_PATH \ + $* 2>&1" + ;; + + fetch-bench-out) + mkdir -p "$LOCAL_DIR/bench-out" + echo "=== Fetching bench-out from $REMOTE:$REMOTE_DIR/bench-out (tar) ===" + ssh "$REMOTE" "tar -C $REMOTE_DIR/bench-out -czf - ." \ + | tar -xzf - -C "$LOCAL_DIR/bench-out" + echo " -> $LOCAL_DIR/bench-out/" + ;; + + *) + echo "Unknown action: $ACTION" >&2 + echo "Usage: $0 {build|test|run|check|clippy|bench|fetch-bench-out} [-- extra args]" >&2 + exit 2 + ;; +esac