Files
xserv/tools/xserv_vs_llama.py
Gahow Wang cf1e9e41db tools: single-stream decode benchmark vs llama.cpp
xserv_vs_llama.py runs each server one at a time on the same GPUs (drains VRAM
between), streams identical prompts through /v1/chat/completions, and reports
median TTFT/TPOT/throughput. Counts llama's reasoning_content as real decode
tokens so the gpt-oss CoT is measured fairly.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-12 15:01:42 +08:00

227 lines
8.5 KiB
Python

#!/usr/bin/env python3
"""Single-stream decode-speed comparison: xserv vs llama.cpp on the same GPUs.

Runs each server one at a time (draining VRAM in between), streams identical
prompts through /v1/chat/completions, and reports median TTFT / TPOT /
throughput. Both servers are OpenAI-compatible, so the same streaming client
drives both.

Run ON the GPU box:

    python3 tools/xserv_vs_llama.py \
        --xserv-model /opt/wjh/models/gpt-oss-20b-fp8 --xserv-tp 2 \
        --llama-gguf /opt/wjh/models/gpt-oss-20b-gguf/gpt-oss-20b-mxfp4.gguf \
        --llama-tp 2 --gpus 0,1 --reps 6 --max-tokens 256
"""
import argparse
import json
import os
import signal
import subprocess
import time
import urllib.request
from pathlib import Path
# Both server binaries are located relative to the directory that contains
# this script's parent (the script is invoked as tools/xserv_vs_llama.py).
SCRIPT_DIR = Path(__file__).parent
XSERV_BIN = SCRIPT_DIR.parent.joinpath("target", "release", "xserv-server")
LLAMA_BIN = SCRIPT_DIR.parent.joinpath(
    "third_party", "llama.cpp", "build", "bin", "llama-server"
)

# Benchmark prompts keyed by a short label; bench() iterates them in this
# insertion order. The "long" prompt repeats the same claim sentence six times
# to lengthen the prefill, then appends the final instruction once.
PROMPTS = {
    "short": "What is the capital of France? Answer in one sentence.",
    "medium": (
        "Explain how backpropagation trains a neural network, covering the "
        "forward pass, the chain rule, gradient descent, and weight updates."
    ),
    "long": "".join(
        [
            "Summarize, then critique, the following claim in detail: modern large "
            "language models understand language the way humans do. "
        ] * 6
        + ["Give a structured, multi-paragraph response."]
    ),
}
def gpu_max_mem_mb(gpus):
    """Return the largest `memory.used` figure among the GPU indices in `gpus`.

    Runs `nvidia-smi` once in CSV mode (no header, no units) and parses each
    `index, memory.used` row. An index that nvidia-smi does not list counts
    as 0.

    Raises:
        subprocess.CalledProcessError: if nvidia-smi exits non-zero.
        ValueError: if a row cannot be parsed, or if `gpus` is empty.
    """
    query = ["nvidia-smi", "--query-gpu=index,memory.used",
             "--format=csv,noheader,nounits"]
    report = subprocess.check_output(query, text=True)

    mem_by_index = {}
    for row in report.strip().splitlines():
        # int() tolerates the space nvidia-smi puts after the comma.
        idx_field, mem_field = row.split(",")
        mem_by_index[int(idx_field)] = int(mem_field)

    return max(mem_by_index.get(g, 0) for g in gpus)
def drain(gpus, below_mb=2000, timeout=120):
    """Wait until every GPU in `gpus` reports less than `below_mb` memory used.

    Polls gpu_max_mem_mb() every 2 seconds. The deadline is measured with the
    monotonic clock so wall-clock adjustments cannot shorten or extend it.

    Args:
        gpus: iterable of integer GPU indices to watch.
        below_mb: threshold that the busiest GPU must drop under.
        timeout: maximum number of seconds to wait.

    Returns:
        True once usage is below the threshold, False if `timeout` elapsed
        first (a warning is printed in that case so a polluted follow-up run
        is not mistaken for a clean one).
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if gpu_max_mem_mb(gpus) < below_mb:
            return True
        time.sleep(2)
    print(f"  WARNING: GPUs {gpus} still at or above {below_mb} MB after {timeout}s; "
          "the next server may start with VRAM still occupied", flush=True)
    return False
def start(cmd, gpus, log_path):
    """Launch `cmd` as a server process restricted to the given GPUs.

    The child inherits the current environment with CUDA_VISIBLE_DEVICES set
    to the comma-joined `gpus`, writes stdout and stderr to `log_path`
    (truncated on open), and runs in its own session so that stop() can signal
    the whole process group.

    Args:
        cmd: argv list for the server.
        gpus: iterable of GPU indices to expose to the child.
        log_path: file that receives the child's combined output.

    Returns:
        The subprocess.Popen handle of the started process.
    """
    env = dict(os.environ)
    env["CUDA_VISIBLE_DEVICES"] = ",".join(str(g) for g in gpus)
    # The child gets its own duplicate of the descriptor when it is spawned,
    # so the parent's handle can be closed right away instead of leaking one
    # open file per launched server.
    with open(log_path, "wb") as logf:
        return subprocess.Popen(cmd, stdout=logf, stderr=subprocess.STDOUT,
                                env=env, start_new_session=True)
def _signal_group(p, sig):
    """Send `sig` to the process group of `p`; ignore a group that is already gone."""
    try:
        os.killpg(os.getpgid(p.pid), sig)
    except ProcessLookupError:
        pass


def stop(p, gpus):
    """Terminate the server started by start() and wait for its GPUs to drain.

    Sends SIGTERM to the server's process group and waits up to 30 s. If the
    process is still alive it is sent SIGKILL and waited on again, so the
    child is reaped rather than left behind as a zombie. drain(gpus) is always
    called afterwards, even when the process had already exited.

    Args:
        p: subprocess.Popen handle returned by start().
        gpus: GPU indices passed on to drain().
    """
    if p.poll() is None:
        _signal_group(p, signal.SIGTERM)
        try:
            p.wait(timeout=30)
        except subprocess.TimeoutExpired:
            _signal_group(p, signal.SIGKILL)
            try:
                # Bounded wait: reap the child, but do not hang the benchmark
                # forever if the process cannot be killed promptly.
                p.wait(timeout=30)
            except subprocess.TimeoutExpired:
                pass
    drain(gpus)
def wait_ready(base, model_id, timeout=900):
    """Poll the server until a 1-token chat completion succeeds.

    Repeatedly POSTs a minimal non-streaming request to
    `base + "/v1/chat/completions"`. The server counts as ready when it
    answers with HTTP 200 and a body that parses as JSON.

    Args:
        base: server base URL, e.g. "http://127.0.0.1:18080".
        model_id: value sent in the request's "model" field.
        timeout: overall number of seconds to keep polling.

    Returns:
        True as soon as one probe succeeds, False if `timeout` elapsed first.
    """
    payload = json.dumps({"model": model_id, "messages": [{"role": "user", "content": "hi"}],
                          "max_tokens": 1, "temperature": 0.0, "stream": False}).encode()
    url = base + "/v1/chat/completions"
    # Monotonic clock: the deadline must not move with wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            req = urllib.request.Request(url, data=payload,
                                         headers={"Content-Type": "application/json"})
            with urllib.request.urlopen(req, timeout=120) as r:
                if r.status == 200:
                    json.loads(r.read())
                    return True
        except Exception:
            # Deliberately broad: connection refused, HTTP errors while the
            # model loads, and malformed bodies all just mean "not ready yet".
            pass
        # Back off after every failed probe, including a successful response
        # whose status is not 200, so the loop never spins without sleeping.
        time.sleep(3)
    return False
def stream_chat(base, model_id, user, max_tokens):
    """Stream one chat completion and time its decode steps.

    Sends a streaming request (temperature 0) to
    `base + "/v1/chat/completions"` and reads the server-sent events until
    `[DONE]` or end of stream. Every event whose first choice carries a
    non-empty `content` or `reasoning_content` delta is counted as one decode
    step. Events that are not valid JSON, or that have no choices (for
    example usage-only chunks), are skipped.

    Args:
        base: server base URL.
        model_id: value sent in the request's "model" field.
        user: text of the single user message.
        max_tokens: generation limit sent to the server.

    Returns:
        (ttft, tpot, n) where
        ttft is the seconds from sending the request to the first counted
        delta (the total elapsed time if none arrived),
        tpot is the mean seconds between counted deltas after the first
        (0.0 when fewer than two were counted), and
        n is the number of counted deltas. This equals the token count only
        if the server emits one token per event.

    Raises:
        RuntimeError: if the server streams an object containing "error".
    """
    body = json.dumps({"model": model_id, "messages": [{"role": "user", "content": user}],
                       "max_tokens": max_tokens, "temperature": 0.0, "stream": True}).encode()
    req = urllib.request.Request(base + "/v1/chat/completions", data=body,
                                 headers={"Content-Type": "application/json"})
    t0 = time.perf_counter()
    t_first = None
    t_last = t0
    n = 0
    with urllib.request.urlopen(req, timeout=300) as resp:
        for raw in resp:
            line = raw.decode("utf-8", "ignore").strip()
            if not line.startswith("data:"):
                continue
            data = line[5:].strip()
            if data == "[DONE]":
                break
            try:
                obj = json.loads(data)
            except json.JSONDecodeError:
                continue
            if not isinstance(obj, dict):
                continue
            if "error" in obj:
                # Surface the server's own message instead of an opaque
                # KeyError on the missing "choices" field.
                raise RuntimeError(f"server returned an error event: {obj['error']}")
            choices = obj.get("choices")
            if not choices:
                # Chunks with an empty or missing choices list carry no delta.
                continue
            delta = choices[0].get("delta") or {}
            # gpt-oss reasoning models split CoT into reasoning_content (llama.cpp)
            # vs raw harmony in content (xserv); count BOTH as real decode steps.
            piece = delta.get("content") or delta.get("reasoning_content")
            if not piece:
                continue
            now = time.perf_counter()
            if t_first is None:
                t_first = now
            n += 1
            t_last = now
    if t_first is None:
        ttft = time.perf_counter() - t0
    else:
        ttft = t_first - t0
    # The span from the first to the last counted delta covers n - 1 intervals.
    tpot = (t_last - t_first) / (n - 1) if n > 1 else 0.0
    return ttft, tpot, n
def median(xs):
    """Return the median of `xs`, or 0.0 when `xs` is empty.

    For an even number of samples the two middle values are averaged, so the
    result is not biased toward the larger one.
    """
    ordered = sorted(xs)
    if not ordered:
        return 0.0
    mid = len(ordered) // 2
    if len(ordered) % 2:
        return ordered[mid]
    return (ordered[mid - 1] + ordered[mid]) / 2
def bench(base, model_id, reps, max_tokens):
    """Benchmark every prompt in PROMPTS against one running server.

    Two short warmup requests are sent first and discarded. Each prompt is
    then streamed `reps` times via stream_chat(), and a one-line summary is
    printed per prompt.

    Args:
        base: server base URL.
        model_id: value sent in each request's "model" field.
        reps: number of measured repetitions per prompt.
        max_tokens: generation limit for the measured requests.

    Returns:
        Dict mapping prompt name to
        {"ttft_ms", "tpot_ms", "tok_s", "mean_tok"}. Runs that produced fewer
        than two decode steps (tpot == 0) are left out of the TPOT median.
        All figures are 0.0 when nothing was measured.
    """
    # warmup
    for _ in range(2):
        stream_chat(base, model_id, PROMPTS["short"], 16)
    out = {}
    for name, prompt in PROMPTS.items():
        ttfts, tpots, toks = [], [], []
        for _ in range(reps):
            ttft, tpot, n = stream_chat(base, model_id, prompt, max_tokens)
            ttfts.append(ttft * 1000)
            if tpot > 0:
                tpots.append(tpot * 1000)
            toks.append(n)
        tpot_ms = median(tpots)
        out[name] = {
            "ttft_ms": median(ttfts),
            "tpot_ms": tpot_ms,
            "tok_s": 1000.0 / tpot_ms if tpot_ms > 0 else 0.0,
            # Guard against reps <= 0, where no samples were collected.
            "mean_tok": sum(toks) / len(toks) if toks else 0.0,
        }
        print(f" {name:7s} ttft={out[name]['ttft_ms']:7.1f}ms tpot={out[name]['tpot_ms']:6.2f}ms "
              f"{out[name]['tok_s']:6.1f} tok/s (n={out[name]['mean_tok']:.0f})", flush=True)
    return out
def _run_one(label, cmd, gpus, log_path, base, model_id, reps, max_tokens,
             ready_timeout, tail_lines):
    """Start one server, benchmark it once it answers, and always stop it.

    Returns the bench() result dict, or None when the server never became
    ready within `ready_timeout` seconds (the last `tail_lines` lines of its
    log are printed in that case).
    """
    p = start(cmd, gpus, log_path)
    try:
        if wait_ready(base, model_id, timeout=ready_timeout):
            return bench(base, model_id, reps, max_tokens)
        tail = subprocess.run(["tail", f"-{tail_lines}", log_path],
                              capture_output=True, text=True).stdout
        print(f" {label} NOT READY:", tail)
        return None
    finally:
        stop(p, gpus)


def main():
    """Parse arguments, benchmark xserv and then llama.cpp, and print a summary.

    The two servers are run one after the other on the same GPUs and port.
    Results are printed as a comparison table and written as JSON to
    /tmp/xserv_vs_llama_<unix time>.json.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--xserv-model", required=True)
    ap.add_argument("--xserv-tp", type=int, default=2)
    ap.add_argument("--llama-gguf", required=True)
    ap.add_argument("--llama-tp", type=int, default=2)
    ap.add_argument("--gpus", default="0,1")
    ap.add_argument("--reps", type=int, default=6)
    ap.add_argument("--max-tokens", type=int, default=256)
    ap.add_argument("--port", type=int, default=18080)
    ap.add_argument("--ctx", type=int, default=4096)
    args = ap.parse_args()
    gpus = [int(g) for g in args.gpus.split(",")]
    base = f"http://127.0.0.1:{args.port}"
    results = {}

    # ---- xserv ----
    xid = Path(args.xserv_model).name
    # NOTE(review): --max-seq-len is fixed at 2048 here while --ctx only
    # affects llama.cpp; confirm this asymmetry is intended.
    xcmd = [str(XSERV_BIN), str(args.xserv_model), "--port", str(args.port),
            "--tp", str(args.xserv_tp), "--max-seq-len", "2048", "--max-batch", "8"]
    print(f"=== xserv ({xid}, tp={args.xserv_tp}, gpus={gpus}) ===", flush=True)
    xserv_result = _run_one("xserv", xcmd, gpus, "/tmp/cmp_xserv.log", base, xid,
                            args.reps, args.max_tokens, ready_timeout=900, tail_lines=20)
    if xserv_result is not None:
        results["xserv"] = xserv_result

    # ---- llama.cpp ----
    lcmd = [str(LLAMA_BIN), "-m", str(args.llama_gguf), "--port", str(args.port),
            "--host", "127.0.0.1", "-c", str(args.ctx), "-ngl", "99", "--parallel", "1"]
    if args.llama_tp > 1:
        lcmd += ["--split-mode", "row"]
    print(f"\n=== llama.cpp ({Path(args.llama_gguf).name}, tp={args.llama_tp}, gpus={gpus}) ===", flush=True)
    # llama-server accepts any model field
    llama_result = _run_one("llama", lcmd, gpus, "/tmp/cmp_llama.log", base, "gpt-oss",
                            args.reps, args.max_tokens, ready_timeout=300, tail_lines=30)
    if llama_result is not None:
        results["llama"] = llama_result

    # ---- summary ----
    print(f"\n{'='*70}\n SUMMARY — single-stream decode (gpt-oss-20b)\n{'='*70}")
    print(f"{'prompt':8s} {'metric':10s} {'xserv-FP8':>12s} {'llama':>12s} {'ratio':>8s}")
    for name in PROMPTS:
        xrow = results.get("xserv", {}).get(name)
        lrow = results.get("llama", {}).get(name)
        if not xrow or not lrow:
            continue
        for key, lab in [("ttft_ms", "TTFT ms"), ("tpot_ms", "TPOT ms"), ("tok_s", "tok/s")]:
            xv, lv = xrow[key], lrow[key]
            # ratio is always llama / xserv: above 1 favours xserv for the
            # latency rows (TTFT, TPOT) but favours llama for the tok/s row.
            ratio = (lv / xv) if xv else 0
            print(f"{name:8s} {lab:10s} {xv:12.2f} {lv:12.2f} {ratio:7.2f}x")

    out_path = f"/tmp/xserv_vs_llama_{int(time.time())}.json"
    with open(out_path, "w") as f:
        json.dump(results, f, indent=2)
    print(f"\nresults written to {out_path}", flush=True)
# Run the benchmark only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()