Unified bench report: mean+TPS+per-worker GPU util, auto-captured

scripts/bench_report.py is now the canonical analyzer: per run + per input-
class it emits TTFT/TPOT/E2E mean+p50+p90+p99, decode/prefill TPS (aggregate
and per-worker), APC, per-worker GPU util mean/max, and load-spread ratios.

b3_isolated_policy.sh auto-captures the inputs for every run: gpu_util.csv
(via gpu_monitor.sh, 5s, replay-window only) + bench_config.json (worker->GPU
map); teardown stops the sampler. Future runs populate per-worker GPU util
automatically.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-05-29 16:08:22 +08:00
parent d9046322c6
commit 160c29133d
2 changed files with 290 additions and 0 deletions

View File

@@ -50,6 +50,7 @@ mkdir -p "$RUNDIR/engine_state" "$RUNDIR/logs"
echo "[isolated] policy=$POLICY trace=$(basename $TRACE) rundir=$RUNDIR" echo "[isolated] policy=$POLICY trace=$(basename $TRACE) rundir=$RUNDIR"
cleanup() { cleanup() {
pkill -f gpu_monitor.sh 2>/dev/null || true
pkill -9 -f cache_aware_proxy 2>/dev/null || true pkill -9 -f cache_aware_proxy 2>/dev/null || true
pkill -9 -f "vllm serve" 2>/dev/null || true pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true pkill -9 -f "EngineCore" 2>/dev/null || true
@@ -183,6 +184,18 @@ until curl -sf "http://127.0.0.1:$PROXY_PORT/stats" >/dev/null 2>&1; do
sleep 2 sleep 2
done done
# Unified bench infra: record worker->GPU mapping + sample per-GPU util during
# the replay so bench_report.py can emit per-worker GPU util / TPS for every run.
python3 - "$RUNDIR" "$BASE_PORT" "$PROXY_PORT" "$GPU_INDICES" "$N_INSTANCES" <<'PY'
import json, sys
rundir, base_port, proxy_port, gpu_indices, n = sys.argv[1:]
json.dump({"base_port": int(base_port), "proxy_port": int(proxy_port),
"gpu_indices": [int(x) for x in gpu_indices.split()],
"n_instances": int(n)}, open(f"{rundir}/bench_config.json", "w"), indent=2)
PY
bash "$ROOT/scripts/gpu_monitor.sh" "$RUNDIR/gpu_util.csv" 5 >/dev/null 2>&1 &
GPU_MON_PID=$!
t_start=$(date +%s.%N) t_start=$(date +%s.%N)
echo "[isolated] running replayer ..." echo "[isolated] running replayer ..."
PYTHONPATH="$ROOT" "$VENV/python" -m replayer \ PYTHONPATH="$ROOT" "$VENV/python" -m replayer \
@@ -192,6 +205,7 @@ PYTHONPATH="$ROOT" "$VENV/python" -m replayer \
--model "$MODEL" \ --model "$MODEL" \
2>&1 | tee "$RUNDIR/replayer.log" | tail -3 2>&1 | tee "$RUNDIR/replayer.log" | tail -3
t_end=$(date +%s.%N) t_end=$(date +%s.%N)
kill "${GPU_MON_PID:-}" 2>/dev/null || true
python3 - "$RUNDIR" "$POLICY" "$TRACE" "$t_start" "$t_end" <<'PY' python3 - "$RUNDIR" "$POLICY" "$TRACE" "$t_start" "$t_end" <<'PY'
import json, sys import json, sys

276
scripts/bench_report.py Normal file
View File

@@ -0,0 +1,276 @@
#!/usr/bin/env python3
"""Unified benchmark report — the canonical analyzer for all EAR runs.
Per run dir (produced by b3_isolated_policy.sh) it reads:
metrics.jsonl per-request latency (ttft/tpot/e2e), output tokens, timing
breakdown.json per-request routing (chosen_idx, input_length, cache_hit, decision)
run_window.json replay window {t_start_unix, t_end_unix}
bench_config.json (optional) {base_port, gpu_indices, n_instances} worker->gpu map
gpu_util.csv (optional) timestamp,gpu,util_pct,mem_used_mb,mem_total_mb,power_w
and emits, for every run and every input-class:
TTFT / TPOT / E2E mean + p50 + p90 + p99
TPS decode(output) tok/s, prefill(new) tok/s (aggregate + per-worker)
APC proxy-side prefix-cache hit ratio
per-worker n, out-TPS, prefill-TPS, TTFT p90, GPU util mean/max, mem
spread n / TTFT-p90 / GPU-util max:min ratios
decisions affinity vs lmetric_fallback counts (unified family)
Usage:
bench_report.py --root <run_dir> # single run
bench_report.py --root <root_dir> arm1 arm2 ... # multi-arm compare
(no arms given under a root => auto-discover subdirs with metrics.jsonl)
"""
import argparse
import csv
import json
import statistics
from pathlib import Path
CLASSES = [("WARM<5k", 0, 5000), ("MED5-20k", 5000, 20000),
("HEAVY20-50k", 20000, 50000), ("HEAVY+>50k", 50000, 10**12)]
def pct(xs, p):
if not xs:
return None
xs = sorted(xs)
k = max(0, min(len(xs) - 1, int(round(p / 100.0 * (len(xs) - 1)))))
return xs[k]
def stat(xs):
xs = [x for x in xs if x is not None]
if not xs:
return {"n": 0, "mean": None, "p50": None, "p90": None, "p99": None}
return {"n": len(xs), "mean": statistics.fmean(xs),
"p50": pct(xs, 50), "p90": pct(xs, 90), "p99": pct(xs, 99)}
def rid(r):
for k in ("request_id", "proxy_request_id", "req_id", "id"):
if r.get(k):
return r[k]
return None
def load_jsonl(p):
return [json.loads(l) for l in open(p) if l.strip()]
def load_breakdown(p):
if not p.exists():
return {}
raw = json.load(open(p))
recs = raw if isinstance(raw, list) else raw.get("breakdown", raw.get("requests", []))
if isinstance(recs, dict):
recs = list(recs.values())
return {rid(r): r for r in recs if rid(r)}
def gpu_util_in_window(csv_path, t0, t1, worker_to_gpu):
"""Return {worker_idx: {util_mean, util_max, mem_max_mb}} over [t0,t1]."""
if not csv_path.exists():
return {}
by_gpu = {}
with open(csv_path) as f:
for row in csv.DictReader(f):
try:
ts = float(row["timestamp"])
g = int(row["gpu"])
u = float(row["util_pct"])
m = float(row["mem_used_mb"])
except (ValueError, KeyError, TypeError):
continue
if t0 is not None and not (t0 <= ts <= t1):
continue
by_gpu.setdefault(g, {"util": [], "mem": []})
by_gpu[g]["util"].append(u)
by_gpu[g]["mem"].append(m)
out = {}
gpu_to_worker = {gpu: w for w, gpu in worker_to_gpu.items()}
for g, d in by_gpu.items():
w = gpu_to_worker.get(g, g)
out[w] = {"util_mean": statistics.fmean(d["util"]) if d["util"] else None,
"util_max": max(d["util"]) if d["util"] else None,
"mem_max_mb": max(d["mem"]) if d["mem"] else None,
"samples": len(d["util"])}
return out
def analyze(run_dir):
m_path = run_dir / "metrics.jsonl"
if not m_path.exists():
return None
metrics = load_jsonl(m_path)
bd = load_breakdown(run_dir / "breakdown.json")
win = {}
wp = run_dir / "run_window.json"
if wp.exists():
win = json.load(open(wp))
cfg = {}
cp = run_dir / "bench_config.json"
if cp.exists():
cfg = json.load(open(cp))
gpu_indices = cfg.get("gpu_indices") # list aligned to worker idx
worker_to_gpu = ({i: g for i, g in enumerate(gpu_indices)} if gpu_indices else {})
ok = [r for r in metrics if not r.get("error")]
t0 = win.get("t_start_unix")
t1 = win.get("t_end_unix")
if t0 is None:
ds = [r.get("t_dispatch_unix") for r in ok if r.get("t_dispatch_unix")]
fs = [r.get("t_finish_unix") for r in ok if r.get("t_finish_unix")]
t0 = min(ds) if ds else None
t1 = max(fs) if fs else None
window_s = (t1 - t0) if (t0 and t1) else None
def out_tok(r):
return r.get("actual_output_tokens") or r.get("output_length") or 0
def new_prefill_tok(r):
b = bd.get(rid(r), {})
ih = b.get("input_length", r.get("input_length", 0)) or 0
ch = b.get("cache_hit", r.get("cached_tokens", 0)) or 0
return max(0, ih - ch)
res = {
"n_total": len(metrics), "n_ok": len(ok),
"window_s": window_s,
"ttft_ms": stat([r["ttft_s"] * 1000 for r in ok if r.get("ttft_s") is not None]),
"tpot_ms": stat([r["tpot_s"] * 1000 for r in ok if r.get("tpot_s")]),
"e2e_ms": stat([r["latency_s"] * 1000 for r in ok if r.get("latency_s") is not None]),
}
tot_out = sum(out_tok(r) for r in ok)
tot_new = sum(new_prefill_tok(r) for r in ok)
tot_in = sum((bd.get(rid(r), {}).get("input_length", r.get("input_length", 0)) or 0) for r in ok)
tot_cached = sum(min(bd.get(rid(r), {}).get("cache_hit", r.get("cached_tokens", 0)) or 0,
bd.get(rid(r), {}).get("input_length", r.get("input_length", 0)) or 0)
for r in ok)
res["throughput"] = {
"decode_tps": tot_out / window_s if window_s else None,
"prefill_tps": tot_new / window_s if window_s else None,
"total_tps": (tot_out + tot_new) / window_s if window_s else None,
"total_output_tokens": tot_out, "total_new_prefill_tokens": tot_new,
}
res["apc"] = tot_cached / tot_in if tot_in else None
# per-worker (route from breakdown chosen_idx)
gpu = gpu_util_in_window(run_dir / "gpu_util.csv", t0, t1, worker_to_gpu)
by_w = {}
decisions = {}
for r in ok:
b = bd.get(rid(r))
if not b:
continue
w = b.get("chosen_idx", b.get("routed_to"))
by_w.setdefault(w, []).append(r)
d = b.get("decision")
if d:
decisions[d] = decisions.get(d, 0) + 1
per_worker = {}
for w, rs in sorted(by_w.items(), key=lambda x: str(x[0])):
o = sum(out_tok(r) for r in rs)
npf = sum(new_prefill_tok(r) for r in rs)
gw = gpu.get(w, {})
per_worker[str(w)] = {
"n": len(rs),
"decode_tps": o / window_s if window_s else None,
"prefill_tps": npf / window_s if window_s else None,
"ttft_p90_ms": pct([r["ttft_s"] * 1000 for r in rs if r.get("ttft_s") is not None], 90),
"gpu_util_mean": gw.get("util_mean"),
"gpu_util_max": gw.get("util_max"),
"gpu_mem_max_mb": gw.get("mem_max_mb"),
}
res["per_worker"] = per_worker
res["decisions"] = decisions
res["gpu_captured"] = bool(gpu)
# spread
ns = [v["n"] for v in per_worker.values()]
p90s = [v["ttft_p90_ms"] for v in per_worker.values() if v["ttft_p90_ms"]]
uts = [v["gpu_util_mean"] for v in per_worker.values() if v["gpu_util_mean"] is not None]
res["spread"] = {
"n_ratio": (max(ns) / max(min(ns), 1)) if ns else None,
"ttft_p90_ratio": (max(p90s) / max(min(p90s), 1e-9)) if p90s else None,
"gpu_util_ratio": (max(uts) / max(min(uts), 1e-9)) if uts else None,
"gpu_util_min": min(uts) if uts else None, "gpu_util_max": max(uts) if uts else None,
}
# per-class TTFT
per_class = {}
for name, lo, hi in CLASSES:
rs = [r for r in ok if lo <= (bd.get(rid(r), {}).get("input_length",
r.get("input_length", 0)) or 0) < hi]
per_class[name] = {"n": len(rs),
"ttft_ms": stat([r["ttft_s"] * 1000 for r in rs if r.get("ttft_s") is not None])}
res["per_class"] = per_class
return res
def f(v, d=0):
return f"{v:.{d}f}" if isinstance(v, (int, float)) else "-"
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--root", type=Path, required=True)
ap.add_argument("arms", nargs="*")
ap.add_argument("--json", type=Path, default=None)
a = ap.parse_args()
if (a.root / "metrics.jsonl").exists() and not a.arms:
runs = {a.root.name: analyze(a.root)}
else:
arms = a.arms or [p.name for p in sorted(a.root.iterdir())
if p.is_dir() and (p / "metrics.jsonl").exists()]
runs = {arm: analyze(a.root / arm) for arm in arms}
runs = {k: v for k, v in runs.items() if v}
for label, key in [("TTFT (ms)", "ttft_ms"), ("TPOT (ms)", "tpot_ms"), ("E2E (ms)", "e2e_ms")]:
print(f"\n================ {label} ================")
h = f"{'arm':<14}{'n_ok':>6}{'mean':>10}{'p50':>10}{'p90':>10}{'p99':>10}"
print(h); print("-" * len(h))
for arm, r in runs.items():
s = r[key]
print(f"{arm:<14}{r['n_ok']:>6}{f(s['mean']):>10}{f(s['p50']):>10}{f(s['p90']):>10}{f(s['p99']):>10}")
print("\n================ THROUGHPUT / APC / GPU / SPREAD ================")
h = (f"{'arm':<14}{'win_s':>7}{'decTPS':>9}{'pfTPS':>9}{'APC':>7}"
f"{'GPUutil%':>9}{'n_ratio':>8}{'ttftR':>7}{'utilR':>7}")
print(h); print("-" * len(h))
for arm, r in runs.items():
tp = r["throughput"]; sp = r["spread"]
gu = (f"{f(sp['gpu_util_min'])}-{f(sp['gpu_util_max'])}" if sp.get("gpu_util_max") else "n/a")
print(f"{arm:<14}{f(r['window_s']):>7}{f(tp['decode_tps']):>9}{f(tp['prefill_tps']):>9}"
f"{f(r['apc'],3):>7}{gu:>9}{f(sp['n_ratio'],2):>8}{f(sp['ttft_p90_ratio'],2):>7}"
f"{f(sp['gpu_util_ratio'],2) if sp.get('gpu_util_ratio') else '-':>7}")
for arm, r in runs.items():
if r["decisions"]:
print(f" {arm} decisions: {r['decisions']}")
print("\n================ PER-WORKER ================")
for arm, r in runs.items():
gflag = "" if r["gpu_captured"] else " (gpu_util.csv absent — N/A)"
print(f"-- {arm}{gflag} --")
print(f" {'w':<4}{'n':>5}{'decTPS':>9}{'pfTPS':>9}{'TTFTp90':>9}{'util%mean':>10}{'util%max':>9}{'memMB':>9}")
for w, v in r["per_worker"].items():
print(f" {w:<4}{v['n']:>5}{f(v['decode_tps']):>9}{f(v['prefill_tps']):>9}"
f"{f(v['ttft_p90_ms']):>9}{f(v['gpu_util_mean'],1):>10}{f(v['gpu_util_max'],1):>9}{f(v['gpu_mem_max_mb']):>9}")
print("\n================ PER-CLASS TTFT (ms): mean / p50 / p90 / p99 ================")
for arm, r in runs.items():
print(f"-- {arm} --")
for name, v in r["per_class"].items():
s = v["ttft_ms"]
print(f" {name:<12} n={v['n']:>4} mean={f(s['mean']):>7} p50={f(s['p50']):>7} p90={f(s['p90']):>8} p99={f(s['p99']):>9}")
out = a.json or (a.root / "bench_summary.json")
out.write_text(json.dumps(runs, indent=2))
print(f"\nWrote {out}")
if __name__ == "__main__":
main()