From 160c29133daabbc85076131c6d9b0301dfcc4d38 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Fri, 29 May 2026 16:08:22 +0800 Subject: [PATCH] Unified bench report: mean+TPS+per-worker GPU util, auto-captured scripts/bench_report.py is now the canonical analyzer: per run + per input- class it emits TTFT/TPOT/E2E mean+p50+p90+p99, decode/prefill TPS (aggregate and per-worker), APC, per-worker GPU util mean/max, and load-spread ratios. b3_isolated_policy.sh auto-captures the inputs for every run: gpu_util.csv (via gpu_monitor.sh, 5s, replay-window only) + bench_config.json (worker->GPU map); teardown stops the sampler. Future runs populate per-worker GPU util automatically. Co-Authored-By: Claude Opus 4.8 --- scripts/b3_isolated_policy.sh | 14 ++ scripts/bench_report.py | 276 ++++++++++++++++++++++++++++++++++ 2 files changed, 290 insertions(+) create mode 100644 scripts/bench_report.py diff --git a/scripts/b3_isolated_policy.sh b/scripts/b3_isolated_policy.sh index 9634af0..9b6644e 100755 --- a/scripts/b3_isolated_policy.sh +++ b/scripts/b3_isolated_policy.sh @@ -50,6 +50,7 @@ mkdir -p "$RUNDIR/engine_state" "$RUNDIR/logs" echo "[isolated] policy=$POLICY trace=$(basename $TRACE) rundir=$RUNDIR" cleanup() { + pkill -f gpu_monitor.sh 2>/dev/null || true pkill -9 -f cache_aware_proxy 2>/dev/null || true pkill -9 -f "vllm serve" 2>/dev/null || true pkill -9 -f "EngineCore" 2>/dev/null || true @@ -183,6 +184,18 @@ until curl -sf "http://127.0.0.1:$PROXY_PORT/stats" >/dev/null 2>&1; do sleep 2 done +# Unified bench infra: record worker->GPU mapping + sample per-GPU util during +# the replay so bench_report.py can emit per-worker GPU util / TPS for every run. +python3 - "$RUNDIR" "$BASE_PORT" "$PROXY_PORT" "$GPU_INDICES" "$N_INSTANCES" <<'PY' +import json, sys +rundir, base_port, proxy_port, gpu_indices, n = sys.argv[1:] +json.dump({"base_port": int(base_port), "proxy_port": int(proxy_port), + "gpu_indices": [int(x) for x in gpu_indices.split()], + "n_instances": int(n)}, open(f"{rundir}/bench_config.json", "w"), indent=2) +PY +bash "$ROOT/scripts/gpu_monitor.sh" "$RUNDIR/gpu_util.csv" 5 >/dev/null 2>&1 & +GPU_MON_PID=$! + t_start=$(date +%s.%N) echo "[isolated] running replayer ..." PYTHONPATH="$ROOT" "$VENV/python" -m replayer \ @@ -192,6 +205,7 @@ PYTHONPATH="$ROOT" "$VENV/python" -m replayer \ --model "$MODEL" \ 2>&1 | tee "$RUNDIR/replayer.log" | tail -3 t_end=$(date +%s.%N) +kill "${GPU_MON_PID:-}" 2>/dev/null || true python3 - "$RUNDIR" "$POLICY" "$TRACE" "$t_start" "$t_end" <<'PY' import json, sys diff --git a/scripts/bench_report.py b/scripts/bench_report.py new file mode 100644 index 0000000..b3bf2bb --- /dev/null +++ b/scripts/bench_report.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +"""Unified benchmark report — the canonical analyzer for all EAR runs. + +Per run dir (produced by b3_isolated_policy.sh) it reads: + metrics.jsonl per-request latency (ttft/tpot/e2e), output tokens, timing + breakdown.json per-request routing (chosen_idx, input_length, cache_hit, decision) + run_window.json replay window {t_start_unix, t_end_unix} + bench_config.json (optional) {base_port, gpu_indices, n_instances} worker->gpu map + gpu_util.csv (optional) timestamp,gpu,util_pct,mem_used_mb,mem_total_mb,power_w + +and emits, for every run and every input-class: + TTFT / TPOT / E2E mean + p50 + p90 + p99 + TPS decode(output) tok/s, prefill(new) tok/s (aggregate + per-worker) + APC proxy-side prefix-cache hit ratio + per-worker n, out-TPS, prefill-TPS, TTFT p90, GPU util mean/max, mem + spread n / TTFT-p90 / GPU-util max:min ratios + decisions affinity vs lmetric_fallback counts (unified family) + +Usage: + bench_report.py --root # single run + bench_report.py --root arm1 arm2 ... # multi-arm compare + (no arms given under a root => auto-discover subdirs with metrics.jsonl) +""" +import argparse +import csv +import json +import statistics +from pathlib import Path + +CLASSES = [("WARM<5k", 0, 5000), ("MED5-20k", 5000, 20000), + ("HEAVY20-50k", 20000, 50000), ("HEAVY+>50k", 50000, 10**12)] + + +def pct(xs, p): + if not xs: + return None + xs = sorted(xs) + k = max(0, min(len(xs) - 1, int(round(p / 100.0 * (len(xs) - 1))))) + return xs[k] + + +def stat(xs): + xs = [x for x in xs if x is not None] + if not xs: + return {"n": 0, "mean": None, "p50": None, "p90": None, "p99": None} + return {"n": len(xs), "mean": statistics.fmean(xs), + "p50": pct(xs, 50), "p90": pct(xs, 90), "p99": pct(xs, 99)} + + +def rid(r): + for k in ("request_id", "proxy_request_id", "req_id", "id"): + if r.get(k): + return r[k] + return None + + +def load_jsonl(p): + return [json.loads(l) for l in open(p) if l.strip()] + + +def load_breakdown(p): + if not p.exists(): + return {} + raw = json.load(open(p)) + recs = raw if isinstance(raw, list) else raw.get("breakdown", raw.get("requests", [])) + if isinstance(recs, dict): + recs = list(recs.values()) + return {rid(r): r for r in recs if rid(r)} + + +def gpu_util_in_window(csv_path, t0, t1, worker_to_gpu): + """Return {worker_idx: {util_mean, util_max, mem_max_mb}} over [t0,t1].""" + if not csv_path.exists(): + return {} + by_gpu = {} + with open(csv_path) as f: + for row in csv.DictReader(f): + try: + ts = float(row["timestamp"]) + g = int(row["gpu"]) + u = float(row["util_pct"]) + m = float(row["mem_used_mb"]) + except (ValueError, KeyError, TypeError): + continue + if t0 is not None and not (t0 <= ts <= t1): + continue + by_gpu.setdefault(g, {"util": [], "mem": []}) + by_gpu[g]["util"].append(u) + by_gpu[g]["mem"].append(m) + out = {} + gpu_to_worker = {gpu: w for w, gpu in worker_to_gpu.items()} + for g, d in by_gpu.items(): + w = gpu_to_worker.get(g, g) + out[w] = {"util_mean": statistics.fmean(d["util"]) if d["util"] else None, + "util_max": max(d["util"]) if d["util"] else None, + "mem_max_mb": max(d["mem"]) if d["mem"] else None, + "samples": len(d["util"])} + return out + + +def analyze(run_dir): + m_path = run_dir / "metrics.jsonl" + if not m_path.exists(): + return None + metrics = load_jsonl(m_path) + bd = load_breakdown(run_dir / "breakdown.json") + + win = {} + wp = run_dir / "run_window.json" + if wp.exists(): + win = json.load(open(wp)) + cfg = {} + cp = run_dir / "bench_config.json" + if cp.exists(): + cfg = json.load(open(cp)) + gpu_indices = cfg.get("gpu_indices") # list aligned to worker idx + worker_to_gpu = ({i: g for i, g in enumerate(gpu_indices)} if gpu_indices else {}) + + ok = [r for r in metrics if not r.get("error")] + t0 = win.get("t_start_unix") + t1 = win.get("t_end_unix") + if t0 is None: + ds = [r.get("t_dispatch_unix") for r in ok if r.get("t_dispatch_unix")] + fs = [r.get("t_finish_unix") for r in ok if r.get("t_finish_unix")] + t0 = min(ds) if ds else None + t1 = max(fs) if fs else None + window_s = (t1 - t0) if (t0 and t1) else None + + def out_tok(r): + return r.get("actual_output_tokens") or r.get("output_length") or 0 + + def new_prefill_tok(r): + b = bd.get(rid(r), {}) + ih = b.get("input_length", r.get("input_length", 0)) or 0 + ch = b.get("cache_hit", r.get("cached_tokens", 0)) or 0 + return max(0, ih - ch) + + res = { + "n_total": len(metrics), "n_ok": len(ok), + "window_s": window_s, + "ttft_ms": stat([r["ttft_s"] * 1000 for r in ok if r.get("ttft_s") is not None]), + "tpot_ms": stat([r["tpot_s"] * 1000 for r in ok if r.get("tpot_s")]), + "e2e_ms": stat([r["latency_s"] * 1000 for r in ok if r.get("latency_s") is not None]), + } + + tot_out = sum(out_tok(r) for r in ok) + tot_new = sum(new_prefill_tok(r) for r in ok) + tot_in = sum((bd.get(rid(r), {}).get("input_length", r.get("input_length", 0)) or 0) for r in ok) + tot_cached = sum(min(bd.get(rid(r), {}).get("cache_hit", r.get("cached_tokens", 0)) or 0, + bd.get(rid(r), {}).get("input_length", r.get("input_length", 0)) or 0) + for r in ok) + res["throughput"] = { + "decode_tps": tot_out / window_s if window_s else None, + "prefill_tps": tot_new / window_s if window_s else None, + "total_tps": (tot_out + tot_new) / window_s if window_s else None, + "total_output_tokens": tot_out, "total_new_prefill_tokens": tot_new, + } + res["apc"] = tot_cached / tot_in if tot_in else None + + # per-worker (route from breakdown chosen_idx) + gpu = gpu_util_in_window(run_dir / "gpu_util.csv", t0, t1, worker_to_gpu) + by_w = {} + decisions = {} + for r in ok: + b = bd.get(rid(r)) + if not b: + continue + w = b.get("chosen_idx", b.get("routed_to")) + by_w.setdefault(w, []).append(r) + d = b.get("decision") + if d: + decisions[d] = decisions.get(d, 0) + 1 + per_worker = {} + for w, rs in sorted(by_w.items(), key=lambda x: str(x[0])): + o = sum(out_tok(r) for r in rs) + npf = sum(new_prefill_tok(r) for r in rs) + gw = gpu.get(w, {}) + per_worker[str(w)] = { + "n": len(rs), + "decode_tps": o / window_s if window_s else None, + "prefill_tps": npf / window_s if window_s else None, + "ttft_p90_ms": pct([r["ttft_s"] * 1000 for r in rs if r.get("ttft_s") is not None], 90), + "gpu_util_mean": gw.get("util_mean"), + "gpu_util_max": gw.get("util_max"), + "gpu_mem_max_mb": gw.get("mem_max_mb"), + } + res["per_worker"] = per_worker + res["decisions"] = decisions + res["gpu_captured"] = bool(gpu) + + # spread + ns = [v["n"] for v in per_worker.values()] + p90s = [v["ttft_p90_ms"] for v in per_worker.values() if v["ttft_p90_ms"]] + uts = [v["gpu_util_mean"] for v in per_worker.values() if v["gpu_util_mean"] is not None] + res["spread"] = { + "n_ratio": (max(ns) / max(min(ns), 1)) if ns else None, + "ttft_p90_ratio": (max(p90s) / max(min(p90s), 1e-9)) if p90s else None, + "gpu_util_ratio": (max(uts) / max(min(uts), 1e-9)) if uts else None, + "gpu_util_min": min(uts) if uts else None, "gpu_util_max": max(uts) if uts else None, + } + + # per-class TTFT + per_class = {} + for name, lo, hi in CLASSES: + rs = [r for r in ok if lo <= (bd.get(rid(r), {}).get("input_length", + r.get("input_length", 0)) or 0) < hi] + per_class[name] = {"n": len(rs), + "ttft_ms": stat([r["ttft_s"] * 1000 for r in rs if r.get("ttft_s") is not None])} + res["per_class"] = per_class + return res + + +def f(v, d=0): + return f"{v:.{d}f}" if isinstance(v, (int, float)) else "-" + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--root", type=Path, required=True) + ap.add_argument("arms", nargs="*") + ap.add_argument("--json", type=Path, default=None) + a = ap.parse_args() + + if (a.root / "metrics.jsonl").exists() and not a.arms: + runs = {a.root.name: analyze(a.root)} + else: + arms = a.arms or [p.name for p in sorted(a.root.iterdir()) + if p.is_dir() and (p / "metrics.jsonl").exists()] + runs = {arm: analyze(a.root / arm) for arm in arms} + runs = {k: v for k, v in runs.items() if v} + + for label, key in [("TTFT (ms)", "ttft_ms"), ("TPOT (ms)", "tpot_ms"), ("E2E (ms)", "e2e_ms")]: + print(f"\n================ {label} ================") + h = f"{'arm':<14}{'n_ok':>6}{'mean':>10}{'p50':>10}{'p90':>10}{'p99':>10}" + print(h); print("-" * len(h)) + for arm, r in runs.items(): + s = r[key] + print(f"{arm:<14}{r['n_ok']:>6}{f(s['mean']):>10}{f(s['p50']):>10}{f(s['p90']):>10}{f(s['p99']):>10}") + + print("\n================ THROUGHPUT / APC / GPU / SPREAD ================") + h = (f"{'arm':<14}{'win_s':>7}{'decTPS':>9}{'pfTPS':>9}{'APC':>7}" + f"{'GPUutil%':>9}{'n_ratio':>8}{'ttftR':>7}{'utilR':>7}") + print(h); print("-" * len(h)) + for arm, r in runs.items(): + tp = r["throughput"]; sp = r["spread"] + gu = (f"{f(sp['gpu_util_min'])}-{f(sp['gpu_util_max'])}" if sp.get("gpu_util_max") else "n/a") + print(f"{arm:<14}{f(r['window_s']):>7}{f(tp['decode_tps']):>9}{f(tp['prefill_tps']):>9}" + f"{f(r['apc'],3):>7}{gu:>9}{f(sp['n_ratio'],2):>8}{f(sp['ttft_p90_ratio'],2):>7}" + f"{f(sp['gpu_util_ratio'],2) if sp.get('gpu_util_ratio') else '-':>7}") + for arm, r in runs.items(): + if r["decisions"]: + print(f" {arm} decisions: {r['decisions']}") + + print("\n================ PER-WORKER ================") + for arm, r in runs.items(): + gflag = "" if r["gpu_captured"] else " (gpu_util.csv absent — N/A)" + print(f"-- {arm}{gflag} --") + print(f" {'w':<4}{'n':>5}{'decTPS':>9}{'pfTPS':>9}{'TTFTp90':>9}{'util%mean':>10}{'util%max':>9}{'memMB':>9}") + for w, v in r["per_worker"].items(): + print(f" {w:<4}{v['n']:>5}{f(v['decode_tps']):>9}{f(v['prefill_tps']):>9}" + f"{f(v['ttft_p90_ms']):>9}{f(v['gpu_util_mean'],1):>10}{f(v['gpu_util_max'],1):>9}{f(v['gpu_mem_max_mb']):>9}") + + print("\n================ PER-CLASS TTFT (ms): mean / p50 / p90 / p99 ================") + for arm, r in runs.items(): + print(f"-- {arm} --") + for name, v in r["per_class"].items(): + s = v["ttft_ms"] + print(f" {name:<12} n={v['n']:>4} mean={f(s['mean']):>7} p50={f(s['p50']):>7} p90={f(s['p90']):>8} p99={f(s['p99']):>9}") + + out = a.json or (a.root / "bench_summary.json") + out.write_text(json.dumps(runs, indent=2)) + print(f"\nWrote {out}") + + +if __name__ == "__main__": + main()