#!/usr/bin/env python3
"""Unified benchmark report — the canonical analyzer for all EAR runs.

Per run dir (produced by b3_isolated_policy.sh) it reads:

    metrics.jsonl      per-request latency (ttft/tpot/e2e), output tokens, timing
    breakdown.json     per-request routing (chosen_idx, input_length, cache_hit, decision)
    run_window.json    replay window {t_start_unix, t_end_unix}
    bench_config.json  (optional) {base_port, gpu_indices, n_instances}; gpu_indices
                       gives the worker->gpu map (list position = worker idx)
    gpu_util.csv       (optional) timestamp,gpu,util_pct,mem_used_mb,mem_total_mb,power_w

and emits, for every run (TTFT additionally per input-class):

    TTFT / TPOT / E2E  mean + p50 + p90 + p99
    TPS                decode(output) tok/s, prefill(new) tok/s (aggregate + per-worker)
    APC                proxy-side prefix-cache hit ratio
    per-worker         n, out-TPS, prefill-TPS, TTFT p90, GPU util mean/max, mem
    spread             n / TTFT-p90 / GPU-util max:min ratios
    decisions          affinity vs lmetric_fallback counts (unified family)

Usage:
    bench_report.py --root <run_dir>               # single run
    bench_report.py --root <root> arm1 arm2 ...    # multi-arm compare
    (no arms given under a root => auto-discover subdirs with metrics.jsonl)
    --json <path> writes the summary there instead of <root>/bench_summary.json
"""
import argparse
import csv
import json
import statistics
import sys
from pathlib import Path

# (label, lo, hi): a request belongs to a class when lo <= input_length < hi.
CLASSES = [("WARM<5k", 0, 5000), ("MED5-20k", 5000, 20000),
           ("HEAVY20-50k", 20000, 50000), ("HEAVY+>50k", 50000, 10**12)]

# Fields tried, in order, to identify a request in metrics / breakdown records.
_ID_KEYS = ("request_id", "proxy_request_id", "req_id", "id")


def pct(xs, p):
    """Return the nearest-rank p-th percentile of xs (0 <= p <= 100), or None if empty."""
    if not xs:
        return None
    xs = sorted(xs)
    k = max(0, min(len(xs) - 1, int(round(p / 100.0 * (len(xs) - 1)))))
    return xs[k]


def stat(xs):
    """Summarize xs as {n, mean, p50, p90, p99}; None entries are ignored."""
    xs = [x for x in xs if x is not None]
    if not xs:
        return {"n": 0, "mean": None, "p50": None, "p90": None, "p99": None}
    return {"n": len(xs), "mean": statistics.fmean(xs),
            "p50": pct(xs, 50), "p90": pct(xs, 90), "p99": pct(xs, 99)}


def rid(r):
    """Return the first id field present in record r, or None.

    Only None and "" count as missing, so a legitimate id such as 0 is kept.
    """
    for k in _ID_KEYS:
        v = r.get(k)
        if v is not None and v != "":
            return v
    return None


def _load_json(p):
    """Parse a whole JSON file, closing the handle afterwards."""
    with open(p, encoding="utf-8") as fh:
        return json.load(fh)


def load_jsonl(p):
    """Parse a JSON-Lines file into a list of records, skipping blank lines."""
    with open(p, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


def load_breakdown(p):
    """Load breakdown.json as {request_id: record}; {} when the file is absent.

    The file may be a top-level list of records, or a dict holding them under
    "breakdown" (preferred) or "requests"; that container may itself be a list
    or a dict whose values are the records. Records without an id are dropped.
    """
    if not p.exists():
        return {}
    raw = _load_json(p)
    recs = raw if isinstance(raw, list) else raw.get("breakdown", raw.get("requests", []))
    if isinstance(recs, dict):
        recs = list(recs.values())
    out = {}
    for r in recs:
        k = rid(r)
        if k is not None:
            out[k] = r
    return out


def gpu_util_in_window(csv_path, t0, t1, worker_to_gpu):
    """Return {worker_idx: {util_mean, util_max, mem_max_mb, samples}} over [t0, t1].

    Either bound may be None, meaning open-ended on that side. Rows that fail to
    parse are skipped. With a non-empty worker_to_gpu map, GPUs absent from the
    map are ignored (re-labelling them by their raw GPU index could collide with
    a real worker index); with an empty map the GPU index is the worker index.
    Returns {} when the CSV does not exist.
    """
    if not csv_path.exists():
        return {}
    gpu_to_worker = {gpu: w for w, gpu in worker_to_gpu.items()}
    by_gpu = {}
    with open(csv_path, newline="") as fh:
        for row in csv.DictReader(fh):
            try:
                ts = float(row["timestamp"])
                g = int(row["gpu"])
                u = float(row["util_pct"])
                m = float(row["mem_used_mb"])
            except (ValueError, KeyError, TypeError):
                continue
            if (t0 is not None and ts < t0) or (t1 is not None and ts > t1):
                continue
            d = by_gpu.setdefault(g, {"util": [], "mem": []})
            d["util"].append(u)
            d["mem"].append(m)
    out = {}
    for g, d in by_gpu.items():
        if gpu_to_worker and g not in gpu_to_worker:
            continue  # GPU not used by this run
        # util and mem are appended together, so both lists are non-empty here.
        out[gpu_to_worker.get(g, g)] = {"util_mean": statistics.fmean(d["util"]),
                                        "util_max": max(d["util"]),
                                        "mem_max_mb": max(d["mem"]),
                                        "samples": len(d["util"])}
    return out


def _worker_gpu_map(cfg):
    """Build {worker_idx: gpu_idx} from bench_config's gpu_indices.

    Accepts a list (of ints or numeric strings) or a "2,3" / "2 3" string.
    Returns {} (identity GPU->worker mapping downstream) if absent or unparseable.
    """
    gi = cfg.get("gpu_indices")
    if isinstance(gi, str):
        gi = [s for s in gi.replace(" ", ",").split(",") if s]
    out = {}
    for i, g in enumerate(gi or []):
        try:
            out[i] = int(g)
        except (TypeError, ValueError):
            return {}
    return out


def _worker_key(w):
    """Sort key: integer worker indices numerically (9 before 10), then others by str."""
    return (0, w, "") if isinstance(w, int) else (1, 0, str(w))


def analyze(run_dir):
    """Analyze one run directory; return its summary dict, or None without metrics.jsonl."""
    m_path = run_dir / "metrics.jsonl"
    if not m_path.exists():
        return None
    metrics = load_jsonl(m_path)
    bd = load_breakdown(run_dir / "breakdown.json")
    wp = run_dir / "run_window.json"
    win = _load_json(wp) if wp.exists() else {}
    cp = run_dir / "bench_config.json"
    cfg = _load_json(cp) if cp.exists() else {}
    worker_to_gpu = _worker_gpu_map(cfg)

    ok = [r for r in metrics if not r.get("error")]

    # Replay window: prefer run_window.json; fill each missing bound from the
    # earliest dispatch / latest finish among successful requests.
    t0 = win.get("t_start_unix")
    t1 = win.get("t_end_unix")
    if t0 is None:
        ds = [r["t_dispatch_unix"] for r in ok if r.get("t_dispatch_unix")]
        t0 = min(ds) if ds else None
    if t1 is None:
        fs = [r["t_finish_unix"] for r in ok if r.get("t_finish_unix")]
        t1 = max(fs) if fs else None
    window_s = (t1 - t0) if (t0 is not None and t1 is not None and t1 > t0) else None

    def rate(tokens):
        return tokens / window_s if window_s else None

    def brec(r):
        # Breakdown record for r ({} if none); its fields override metrics fields.
        return bd.get(rid(r), {})

    def in_len(r):
        return brec(r).get("input_length", r.get("input_length", 0)) or 0

    def cached(r):
        # Clamp to the prompt length so a bogus cache_hit cannot exceed it.
        return min(brec(r).get("cache_hit", r.get("cached_tokens", 0)) or 0, in_len(r))

    def out_tok(r):
        return r.get("actual_output_tokens") or r.get("output_length") or 0

    def new_prefill_tok(r):
        return max(0, in_len(r) - cached(r))

    def ttfts_ms(rs):
        return [r["ttft_s"] * 1000 for r in rs if r.get("ttft_s") is not None]

    res = {
        "n_total": len(metrics),
        "n_ok": len(ok),
        "window_s": window_s,
        "ttft_ms": stat(ttfts_ms(ok)),
        # tpot_s == 0 (e.g. single-token outputs) is excluded as meaningless.
        "tpot_ms": stat([r["tpot_s"] * 1000 for r in ok if r.get("tpot_s")]),
        "e2e_ms": stat([r["latency_s"] * 1000 for r in ok if r.get("latency_s") is not None]),
    }
    tot_out = sum(out_tok(r) for r in ok)
    tot_new = sum(new_prefill_tok(r) for r in ok)
    tot_in = sum(in_len(r) for r in ok)
    tot_cached = sum(cached(r) for r in ok)
    res["throughput"] = {
        "decode_tps": rate(tot_out),
        "prefill_tps": rate(tot_new),
        "total_tps": rate(tot_out + tot_new),
        "total_output_tokens": tot_out,
        "total_new_prefill_tokens": tot_new,
    }
    res["apc"] = tot_cached / tot_in if tot_in else None

    # Per-worker, routed by the breakdown's chosen_idx (or routed_to).
    gpu = gpu_util_in_window(run_dir / "gpu_util.csv", t0, t1, worker_to_gpu)
    by_w = {}
    decisions = {}
    for r in ok:
        b = bd.get(rid(r))
        if not b:
            continue
        w = b.get("chosen_idx", b.get("routed_to"))
        by_w.setdefault(w, []).append(r)
        d = b.get("decision")
        if d:
            decisions[d] = decisions.get(d, 0) + 1

    # Surface workers that received no requests at all, so spread ratios can
    # see starvation. Only done when routing data exists and uses int indices.
    try:
        n_workers = int(cfg.get("n_instances") or len(worker_to_gpu))
    except (TypeError, ValueError):
        n_workers = 0
    if by_w and all(isinstance(w, int) for w in by_w):
        for i in range(n_workers):
            by_w.setdefault(i, [])

    per_worker = {}
    for w in sorted(by_w, key=_worker_key):
        rs = by_w[w]
        gw = gpu.get(w, {})
        per_worker[str(w)] = {
            "n": len(rs),
            "decode_tps": rate(sum(out_tok(r) for r in rs)),
            "prefill_tps": rate(sum(new_prefill_tok(r) for r in rs)),
            "ttft_p90_ms": pct(ttfts_ms(rs), 90),
            "gpu_util_mean": gw.get("util_mean"),
            "gpu_util_max": gw.get("util_max"),
            "gpu_mem_max_mb": gw.get("mem_max_mb"),
        }
    res["per_worker"] = per_worker
    res["decisions"] = decisions
    res["gpu_captured"] = bool(gpu)

    # Spread: max/min ratios across workers (denominators floored to avoid /0).
    ns = [v["n"] for v in per_worker.values()]
    p90s = [v["ttft_p90_ms"] for v in per_worker.values() if v["ttft_p90_ms"]]
    uts = [v["gpu_util_mean"] for v in per_worker.values() if v["gpu_util_mean"] is not None]
    res["spread"] = {
        "n_ratio": (max(ns) / max(min(ns), 1)) if ns else None,
        "ttft_p90_ratio": (max(p90s) / max(min(p90s), 1e-9)) if p90s else None,
        "gpu_util_ratio": (max(uts) / max(min(uts), 1e-9)) if uts else None,
        "gpu_util_min": min(uts) if uts else None,
        "gpu_util_max": max(uts) if uts else None,
    }

    # Per-class TTFT, bucketed by input length.
    per_class = {}
    for name, lo, hi in CLASSES:
        rs = [r for r in ok if lo <= in_len(r) < hi]
        per_class[name] = {"n": len(rs), "ttft_ms": stat(ttfts_ms(rs))}
    res["per_class"] = per_class
    return res


def f(v, d=0):
    """Format a number with d decimals; anything non-numeric (e.g. None) becomes "-"."""
    return f"{v:.{d}f}" if isinstance(v, (int, float)) else "-"


def _print_latency(runs):
    """Print the TTFT / TPOT / E2E percentile tables."""
    for label, key in [("TTFT (ms)", "ttft_ms"), ("TPOT (ms)", "tpot_ms"), ("E2E (ms)", "e2e_ms")]:
        print(f"\n================ {label} ================")
        h = f"{'arm':<14}{'n_ok':>6}{'mean':>10}{'p50':>10}{'p90':>10}{'p99':>10}"
        print(h)
        print("-" * len(h))
        for arm, r in runs.items():
            s = r[key]
            print(f"{arm:<14}{r['n_ok']:>6}{f(s['mean']):>10}{f(s['p50']):>10}"
                  f"{f(s['p90']):>10}{f(s['p99']):>10}")


def _print_throughput(runs):
    """Print throughput / APC / GPU-util / spread, then routing-decision counts."""
    print("\n================ THROUGHPUT / APC / GPU / SPREAD ================")
    h = (f"{'arm':<14}{'win_s':>7}{'decTPS':>9}{'pfTPS':>9}{'APC':>7}"
         f"{'GPUutil%':>9}{'n_ratio':>8}{'ttftR':>7}{'utilR':>7}")
    print(h)
    print("-" * len(h))
    for arm, r in runs.items():
        tp = r["throughput"]
        sp = r["spread"]
        gu = (f"{f(sp['gpu_util_min'])}-{f(sp['gpu_util_max'])}"
              if sp.get("gpu_util_max") else "n/a")
        util_r = f(sp["gpu_util_ratio"], 2) if sp.get("gpu_util_ratio") else "-"
        print(f"{arm:<14}{f(r['window_s']):>7}{f(tp['decode_tps']):>9}{f(tp['prefill_tps']):>9}"
              f"{f(r['apc'], 3):>7}{gu:>9}{f(sp['n_ratio'], 2):>8}{f(sp['ttft_p90_ratio'], 2):>7}"
              f"{util_r:>7}")
    for arm, r in runs.items():
        if r["decisions"]:
            print(f" {arm} decisions: {r['decisions']}")


def _print_per_worker(runs):
    """Print one per-worker table per run."""
    print("\n================ PER-WORKER ================")
    for arm, r in runs.items():
        gflag = "" if r["gpu_captured"] else " (gpu_util.csv absent — N/A)"
        print(f"-- {arm}{gflag} --")
        print(f" {'w':<4}{'n':>5}{'decTPS':>9}{'pfTPS':>9}{'TTFTp90':>9}"
              f"{'util%mean':>10}{'util%max':>9}{'memMB':>9}")
        for w, v in r["per_worker"].items():
            print(f" {w:<4}{v['n']:>5}{f(v['decode_tps']):>9}{f(v['prefill_tps']):>9}"
                  f"{f(v['ttft_p90_ms']):>9}{f(v['gpu_util_mean'], 1):>10}"
                  f"{f(v['gpu_util_max'], 1):>9}{f(v['gpu_mem_max_mb']):>9}")


def _print_per_class(runs):
    """Print per-input-class TTFT statistics for each run."""
    print("\n================ PER-CLASS TTFT (ms): mean / p50 / p90 / p99 ================")
    for arm, r in runs.items():
        print(f"-- {arm} --")
        for name, v in r["per_class"].items():
            s = v["ttft_ms"]
            print(f" {name:<12} n={v['n']:>4} mean={f(s['mean']):>7} p50={f(s['p50']):>7}"
                  f" p90={f(s['p90']):>8} p99={f(s['p99']):>9}")


def main():
    """CLI entry point: analyze run(s) under --root, print tables, write a JSON summary."""
    ap = argparse.ArgumentParser(description="Unified EAR benchmark report.")
    ap.add_argument("--root", type=Path, required=True,
                    help="a run dir (with metrics.jsonl) or a directory of arm subdirs")
    ap.add_argument("arms", nargs="*",
                    help="arm subdirs of --root to compare (default: auto-discover)")
    ap.add_argument("--json", type=Path, default=None,
                    help="summary output path (default: <root>/bench_summary.json)")
    a = ap.parse_args()

    if (a.root / "metrics.jsonl").exists() and not a.arms:
        runs = {a.root.name: analyze(a.root)}
    else:
        if not a.root.is_dir():
            ap.error(f"--root {a.root} is not a directory")
        arms = a.arms or [p.name for p in sorted(a.root.iterdir())
                          if p.is_dir() and (p / "metrics.jsonl").exists()]
        runs = {arm: analyze(a.root / arm) for arm in arms}
    for arm in [k for k, v in runs.items() if not v]:
        print(f"warning: skipping {arm}: no metrics.jsonl", file=sys.stderr)
    runs = {k: v for k, v in runs.items() if v}
    if not runs:
        # Refuse to overwrite a previous summary with an empty one.
        ap.error(f"no runs with metrics.jsonl found under {a.root}")

    _print_latency(runs)
    _print_throughput(runs)
    _print_per_worker(runs)
    _print_per_class(runs)

    out = a.json or (a.root / "bench_summary.json")
    out.write_text(json.dumps(runs, indent=2))
    print(f"\nWrote {out}")


if __name__ == "__main__":
    main()