Files
agentic-kvc/scripts/render_b3_report.py
Gahow Wang 123a74a4b9 B3 report renderer: incremental markdown table from comparison JSON
Reads b3_policy_comparison.json (produced by b3_analyze.sh) and emits
a markdown report with three tables: headline latency + APC,
mechanism indices (interference / hotspot / reuse), and slow-request
cause breakdown. Rows for policies not yet present in the sweep are
left as "pending" so the same renderer can be re-invoked as each
policy finishes, producing an evolving report rather than waiting
for the full sweep.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 18:58:21 +08:00

152 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Render a B3 comparison markdown from b3_policy_comparison.json.
Policies not yet present in the sweep are rendered as "pending" rows
rather than dropped. Designed to be re-run whenever a new policy
finishes; by default the markdown is written inside the sweep
directory, so re-runs simply overwrite it.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
# Row order used by every table and section of the report. A policy that
# has no row in the comparison JSON yet is still listed (as "pending").
POLICY_ORDER = [
    "lmetric",
    "load_only",
    "sticky",
    "unified",
    "capped",
]

# One-line description per policy, emitted in the "Policy notes" section.
# Keys must cover every entry of POLICY_ORDER.
POLICY_DESCR = {
    "lmetric": "cache-aware P_tokens × BS (main baseline)",
    "load_only": "control: min(num_requests), no cache, no affinity",
    "sticky": "control: hard session affinity (never break)",
    "unified": "hybrid affinity + LMetric fallback",
    "capped": "lmetric on per-session turn-capped trace",
}
def _fmt(v, kind="f2"):
if v is None:
return ""
if kind == "f2":
return f"{v:.2f}"
if kind == "f3":
return f"{v:.3f}"
if kind == "pct":
return f"{v * 100:.1f}%"
if kind == "ms":
return f"{v * 1000:.1f}"
return str(v)
def _pending_row(pol, n_cells):
    """Return a table row for *pol* with *n_cells* ``_pending_`` cells."""
    return f"| **{pol}** | " + " | ".join(["_pending_"] * n_cells) + " |"


def _percentile_triple(row, prefix, kind="f2"):
    """Return ``p50/p90/p99`` of ``<prefix>_pXX_s`` from *row*, via ``_fmt``."""
    return "/".join(
        _fmt(row[f"{prefix}_{q}_s"], kind) for q in ("p50", "p90", "p99")
    )


def _header_lines(sweep_dir, by_pol):
    """Return the report title block, including present/pending policies."""
    present = [pol for pol in POLICY_ORDER if pol in by_pol]
    pending = [pol for pol in POLICY_ORDER if pol not in by_pol]
    return [
        "# B3 Routing Sweep Report",
        "",
        f"Sweep dir: `{sweep_dir.name}`",
        "Trace: w600_r0.0015_st30.jsonl (~1.2k reqs, 8 × TP1)",
        # An empty list gets an explicit placeholder instead of a blank.
        f"Policies present: {', '.join(present) or '(none)'}",
        f"Policies pending: {', '.join(pending) or '(none)'}",
        "",
    ]


def _headline_section(by_pol):
    """Return the "Headline latencies + APC" table, one row per policy."""
    lines = [
        "## Headline latencies + APC",
        "",
        "| policy | ok/total | TTFT p50/p90/p99 (s) | TPOT p50/p90/p99 (ms) | E2E p50/p90/p99 (s) | APC |",
        "|---|---:|---|---|---|---:|",
    ]
    for pol in POLICY_ORDER:
        row = by_pol.get(pol)
        if not row:
            lines.append(_pending_row(pol, 5))
            continue
        lines.append(
            f"| **{pol}** | {row['n_ok']}/{row['n_total']} | "
            f"{_percentile_triple(row, 'ttft')} | "
            f"{_percentile_triple(row, 'tpot', 'ms')} | "
            f"{_percentile_triple(row, 'e2e')} | "
            f"{_fmt(row['apc_ratio'], 'pct')} |"
        )
    lines.append("")
    return lines


def _mechanism_section(by_pol):
    """Return the "Mechanism indices" table followed by its two legends."""
    lines = [
        "## Mechanism indices",
        "",
        "| policy | interference_index | hotspot_index (TTFT p90) | intra-session reuse | cross-session reuse | n_slow |",
        "|---|---:|---:|---:|---:|---:|",
    ]
    for pol in POLICY_ORDER:
        row = by_pol.get(pol)
        if not row:
            lines.append(_pending_row(pol, 5))
            continue
        lines.append(
            f"| **{pol}** | {_fmt(row['interference_index'])} | "
            f"{_fmt(row['hotspot_index_ttft_p90'])} | "
            f"{_fmt(row['reuse_intra_frac'], 'pct')} | "
            f"{_fmt(row['reuse_cross_frac'], 'pct')} | "
            f"{row['n_slow']} |"
        )
    lines += [
        "",
        "- **interference_index** = TPOT_p90(decode overlapping same-worker prefill) / TPOT_p90(clean)",
        "- **hotspot_index** = max(worker TTFT_p90) / median(worker TTFT_p90)",
        "",
    ]
    return lines


def _slow_cause_section(by_pol):
    """Return the "Slow-request cause breakdown" table.

    Counts come from each row's optional ``failure_counts`` mapping; a
    missing mapping or a missing cause is shown as 0.
    """
    causes = (
        "same_worker_prefill_overlap",
        "hot_worker_queue",
        "cache_miss_large_append",
        "high_kv_occupancy",
        "unknown",
    )
    lines = [
        "## Slow-request cause breakdown",
        "",
        "| policy | n_slow | same-worker overlap | hot worker queue | cache miss large append | high KV | unknown |",
        "|---|---:|---:|---:|---:|---:|---:|",
    ]
    for pol in POLICY_ORDER:
        row = by_pol.get(pol)
        if not row:
            lines.append(_pending_row(pol, 6))
            continue
        counts = row.get("failure_counts") or {}
        cells = " | ".join(str(counts.get(cause, 0)) for cause in causes)
        lines.append(f"| **{pol}** | {row['n_slow']} | {cells} |")
    lines.append("")
    return lines


def _policy_notes_section():
    """Return the "Policy notes" bullet list built from ``POLICY_DESCR``."""
    lines = ["## Policy notes", ""]
    lines += [f"- **{pol}** — {POLICY_DESCR[pol]}" for pol in POLICY_ORDER]
    lines.append("")
    return lines


def _per_worker_section(sweep_dir, by_pol):
    """Return per-policy tables of per-worker TTFT p90.

    Values are read from ``<sweep_dir>/<policy>/joined/hotspot_index.json``
    (key ``per_worker_ttft_p90_s``). A policy with a comparison row but no
    such file is skipped without a heading, as before.
    """
    lines = ["## Per-policy per-worker TTFT p90 (s)", ""]
    for pol in POLICY_ORDER:
        if not by_pol.get(pol):
            lines += [f"### {pol} _(pending)_", ""]
            continue
        path = sweep_dir / pol / "joined" / "hotspot_index.json"
        if not path.exists():
            continue
        hot = json.loads(path.read_text(encoding="utf-8"))
        per_worker = hot.get("per_worker_ttft_p90_s") or {}
        lines += [f"### {pol}", "", "| worker | TTFT p90 (s) |", "|---|---:|"]
        # _fmt renders a null worker value as an empty cell, where a bare
        # f"{v:.2f}" would raise.
        lines += [
            f"| {worker} | {_fmt(value)} |"
            for worker, value in sorted(per_worker.items())
        ]
        lines.append("")
    return lines


def main() -> None:
    """Render ``b3_policy_comparison.json`` of a sweep into a markdown report.

    Command-line arguments:
        --sweep-dir: Directory containing ``b3_policy_comparison.json``
            (required).
        --output: Markdown path to write; defaults to
            ``<sweep-dir>/b3_report.md``.

    Raises:
        SystemExit: If the comparison JSON does not exist.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--sweep-dir", type=Path, required=True)
    parser.add_argument("--output", type=Path, default=None,
                        help="Markdown output; default <sweep-dir>/b3_report.md")
    args = parser.parse_args()

    comp_path = args.sweep_dir / "b3_policy_comparison.json"
    if not comp_path.exists():
        raise SystemExit(f"missing {comp_path}; run b3_analyze.sh first")

    data = json.loads(comp_path.read_text(encoding="utf-8"))
    by_pol = {row["policy"]: row for row in data["rows"]}

    lines: list[str] = []
    lines += _header_lines(args.sweep_dir, by_pol)
    lines += _headline_section(by_pol)
    lines += _mechanism_section(by_pol)
    lines += _slow_cause_section(by_pol)
    lines += _policy_notes_section()
    lines += _per_worker_section(args.sweep_dir, by_pol)

    out = args.output or args.sweep_dir / "b3_report.md"
    # The report contains non-ASCII characters ("×", "—"); pin the
    # encoding so the output does not depend on the platform locale.
    out.write_text("\n".join(lines), encoding="utf-8")
    print(f"wrote {out}")
# Run the renderer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()