Files
agentic-kvc/microbench/fresh_setup/analyze_goodput.py
Gahow Wang 1262c9c22e Migration transfer-cost study: KV transfer is slow on busy GPUs
MIGRATION_TRANSFER_COST.md: under real load, migration KV transfer runs at
~3 GB/s vs ~10 GB/s idle. Decomposed (instruments + MB6 microbench) into
~55% RDMA-actual (HBM/PCIe contention with running kernels: 7.6->4.0 GB/s)
+ ~45% control-plane GIL starvation during long prefills. Reproduced on a
fresh upstream venv (byte-identical transfer path) -> upstream/hardware
inherent, not our patch. Layerwise is the wrong lever; the tax is structural
on a loaded agentic cluster. Includes mb6_transfer_under_load + run_mb6,
instrument_dst_migration/mooncake, and the dst/transfer decomposition analyzers.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-29 11:53:01 +08:00

199 lines
7.4 KiB
Python

"""SLO-goodput analyzer + PD_advantage for the PD-disagg crossover study.
Reads per-arm replayer output (replay_metrics.jsonl) and computes, per arm:
- completion rate (error-free fraction)
- raw TTFT / TPOT / E2E percentiles (over successes — reported for context
only; NEVER the verdict metric, since failing arms have a small success set)
- SLO-goodput: fraction of OFFERED requests that are error-free AND meet a
(TTFT, TPOT) SLO. This is the verdict metric.
The two arms must replay the IDENTICAL trace (same seed), so they are paired
request-for-request. PD_advantage = goodput(arm) / goodput(baseline); y=1 is
the crossover line — PD_advantage >= 1 means PD-disagg wins.
Goodput is computed over a grid of SLO thresholds so the conclusion does not
hinge on one arbitrary cutoff.
Usage:
python analyze_goodput.py \
--arm 8C-proxy .../8C-proxy/replay_metrics.jsonl \
--arm 4P+4D .../4P+4D/replay_metrics.jsonl \
--baseline 8C-proxy \
--ttft-slo 0.5 1 2 5 --tpot-slo 0.05 0.1 0.2
"""
from __future__ import annotations
import argparse
import json
import statistics
import sys
from pathlib import Path
def load_metrics(path: Path) -> list[dict]:
    """Parse a JSON-Lines file into a list of records.

    Each line is stripped of surrounding whitespace; blank lines are skipped
    and every remaining line is decoded with ``json.loads`` (a malformed line
    propagates the decoder's exception). Records keep file order.
    """
    with path.open("r", encoding="utf-8") as handle:
        stripped = (raw.strip() for raw in handle)
        return [json.loads(text) for text in stripped if text]
def percentile(sorted_vals: list[float], pct: float) -> float:
    """Linearly interpolated percentile of an already-sorted sequence.

    ``pct`` is a fraction (0.5 = median, 0.99 = p99). The result interpolates
    between the two order statistics bracketing ``pct * (n - 1)``. An empty
    input yields NaN; a single-element input yields that element.
    """
    count = len(sorted_vals)
    if not count:
        return float("nan")
    if count == 1:
        return sorted_vals[0]
    position = pct * (count - 1)
    below = int(position)
    # Clamp the upper neighbour so pct == 1.0 does not index past the end.
    above = below + 1 if below + 1 < count else count - 1
    weight = position - below
    return sorted_vals[below] * (1 - weight) + sorted_vals[above] * weight
def pstats(vals: list[float]) -> dict:
    """Summarize a list of values, ignoring ``None`` entries.

    Returns ``{"count": 0}`` when nothing remains after filtering; otherwise
    a dict with ``count``, ``mean`` and interpolated ``p50``/``p90``/``p99``.
    """
    present = [v for v in vals if v is not None]
    if len(present) == 0:
        return {"count": 0}
    present.sort()
    summary = {"count": len(present), "mean": statistics.fmean(present)}
    for label, frac in (("p50", 0.50), ("p90", 0.90), ("p99", 0.99)):
        summary[label] = percentile(present, frac)
    return summary
def offered_window_s(rows: list[dict]) -> float:
    """Span in seconds between the earliest and latest ``trace_timestamp_s``.

    Rows lacking a timestamp (missing or ``None``) are ignored; with fewer
    than two timestamps the window is 0.0.
    """
    stamps = []
    for row in rows:
        stamp = row.get("trace_timestamp_s")
        if stamp is not None:
            stamps.append(stamp)
    if len(stamps) >= 2:
        return max(stamps) - min(stamps)
    return 0.0
def meets_slo(r: dict, ttft_slo: float, tpot_slo: float) -> bool:
    """Return True iff request ``r`` succeeded and satisfies both latency SLOs.

    Requirements: ``error`` is absent/None, ``ttft_s`` is recorded and does
    not exceed ``ttft_slo``, and ``tpot_s`` (when recorded) does not exceed
    ``tpot_slo``. A missing TTFT fails the request; a missing TPOT does not.
    """
    if r.get("error") is not None:
        return False
    first_token = r.get("ttft_s")
    if first_token is None or first_token > ttft_slo:
        return False
    per_token = r.get("tpot_s")
    # tpot=0 happens only for single-token outputs; it passes any non-negative
    # SLO. An absent tpot is likewise not counted against the request.
    return per_token is None or not (per_token > tpot_slo)
def load_summary(jsonl_path: Path) -> dict:
    """Read the sibling replay_metrics.summary.json (wall-clock, amplification).

    For ``.../replay_metrics.jsonl`` the sidecar is
    ``.../replay_metrics.summary.json`` (the ``.jsonl`` suffix is replaced).

    Best-effort: a missing sidecar yields ``{}``. An unreadable or malformed
    sidecar, or one whose top-level JSON value is not an object, also yields
    ``{}`` so the analysis can proceed, but a warning is printed to stderr
    instead of the failure being swallowed silently.
    """
    sp = jsonl_path.with_suffix(".summary.json")
    try:
        text = sp.read_text(encoding="utf-8")
    except FileNotFoundError:
        return {}
    except (OSError, UnicodeDecodeError) as err:
        print(f"warning: cannot read summary {sp}: {err}", file=sys.stderr)
        return {}
    try:
        data = json.loads(text)
    except ValueError as err:  # json.JSONDecodeError subclasses ValueError
        print(f"warning: malformed summary {sp}: {err}", file=sys.stderr)
        return {}
    # Callers do summ.get(...), so anything but a JSON object would crash them.
    if not isinstance(data, dict):
        print(f"warning: summary {sp} is not a JSON object; ignoring it",
              file=sys.stderr)
        return {}
    return data
def summarize_arm(name: str, jsonl_path: Path, rows: list[dict]) -> dict:
    """Aggregate one arm's replay rows into the summary record used by main().

    Args:
        name: arm label, as given on the command line.
        jsonl_path: file the rows were loaded from; its sibling summary
            sidecar supplies ``wall_clock_s`` / ``amplification`` if present.
        rows: every offered request, successful or not.

    Returns:
        Dict with offered/success counts, completion rate, offered window and
        QPS, sidecar wall-clock fields (``None`` when absent), TTFT/TPOT/E2E
        stats over error-free rows only, and the raw rows under ``"_rows"``.
    """
    offered = len(rows)
    successes = [row for row in rows if row.get("error") is None]
    window = offered_window_s(rows)
    sidecar = load_summary(jsonl_path)

    def stats_of(key: str) -> dict:
        # Latency stats are computed over successes only (context, not verdict).
        return pstats([row.get(key) for row in successes])

    completion = len(successes) / offered if offered else 0.0
    qps = offered / window if window > 0 else 0.0
    return {
        "name": name,
        "n_offered": offered,
        "n_success": len(successes),
        "completion_rate": completion,
        "offered_window_s": window,
        "offered_qps": qps,
        # Throughput: how much longer than the offered window it took to drain.
        # ~1.0 = keeps up; >1 = falling behind.
        "wall_clock_s": sidecar.get("wall_clock_s"),
        "amplification": sidecar.get("amplification"),
        "ttft": stats_of("ttft_s"),
        "tpot": stats_of("tpot_s"),
        "e2e": stats_of("latency_s"),
        "_rows": rows,
    }
def _print_overview(arms: dict[str, dict]) -> None:
    """Print the per-arm completion / amplification / latency context table."""
    nan = float("nan")
    print("=" * 78)
    print("PER-ARM OVERVIEW (latency stats over successes only — context, not verdict)")
    print("=" * 78)
    hdr = f"{'arm':<12}{'offered':>8}{'compl%':>8}{'ampl':>6}{'oQPS':>7}" \
          f"{'TTFTp50':>9}{'TTFTp90':>9}{'TPOTp50':>9}{'TPOTp99':>9}{'E2Ep90':>9}"
    print(hdr)
    for name, a in arms.items():
        t, tp, e = a["ttft"], a["tpot"], a["e2e"]
        # amplification comes from the optional summary sidecar and may be None.
        ampl = a.get("amplification")
        ampl_s = f"{ampl:>6.2f}" if isinstance(ampl, (int, float)) else f"{'--':>6}"
        # A stats dict with no successes holds only "count", hence the NaN
        # defaults. TPOT columns are printed in milliseconds ("m" suffix).
        print(f"{name:<12}{a['n_offered']:>8}{100*a['completion_rate']:>7.1f}%"
              f"{ampl_s}{a['offered_qps']:>7.2f}"
              f"{t.get('p50', nan):>9.2f}{t.get('p90', nan):>9.2f}"
              f"{1000*tp.get('p50', nan):>8.0f}m{1000*tp.get('p99', nan):>8.0f}m"
              f"{e.get('p90', nan):>9.2f}")


def _warn_if_unpaired(arms: dict[str, dict], baseline: str) -> None:
    """Warn on stderr when an arm's offered-request count differs from the baseline's."""
    base_n = arms[baseline]["n_offered"]
    for name, a in arms.items():
        if a["n_offered"] != base_n:
            print(f"warning: arm {name!r} has {a['n_offered']} offered requests but "
                  f"baseline {baseline!r} has {base_n}; the arms did not replay the "
                  f"same trace, so PD_advantage is not a paired comparison",
                  file=sys.stderr)


def _slo_goodput_grid(arms: dict[str, dict], baseline: str,
                      ttft_slos: list[float], tpot_slos: list[float]) -> list[dict]:
    """Print and return attainment + PD_advantage for every (TTFT, TPOT) SLO pair."""
    print()
    print("=" * 78)
    print("SLO-GOODPUT (attainment = error-free AND TTFT<=slo AND TPOT<=slo)")
    print(f"PD_advantage = attainment(arm) / attainment(baseline={baseline}); "
          f">=1 means arm wins")
    print("=" * 78)
    grid = []
    for ttft_slo in ttft_slos:
        for tpot_slo in tpot_slos:
            # Attainment is over OFFERED requests, so errored requests count as misses.
            counts = {name: sum(1 for r in a["_rows"] if meets_slo(r, ttft_slo, tpot_slo))
                      for name, a in arms.items()}
            atts = {name: (counts[name] / a["n_offered"] if a["n_offered"] else 0.0)
                    for name, a in arms.items()}
            base_att = atts[baseline]
            row = {"ttft_slo_s": ttft_slo, "tpot_slo_s": tpot_slo, "arms": {}}
            cells = []
            for name in arms:
                att = atts[name]
                adv = (att / base_att) if base_att > 0 else float("nan")
                row["arms"][name] = {"attainment": att, "pd_advantage": adv,
                                     "n_slo": counts[name]}
                tag = "" if name == baseline else f" adv={adv:.2f}"
                cells.append(f"{name}={100*att:>5.1f}%{tag}")
            # ':g' instead of int() so sub-ms or float-rounded SLOs are not
            # truncated (e.g. 0.0005 s prints as "0.5ms", not "0ms").
            line = f"TTFT<={ttft_slo:>4}s TPOT<={1000*tpot_slo:>4g}ms | "
            print(line + " ".join(cells))
            grid.append(row)
    return grid


def main() -> None:
    """CLI entry point: load each arm, print the overview and the SLO-goodput grid.

    Exits (SystemExit) if an arm name is repeated or the baseline is not one
    of the given arms. Optionally writes the summaries and grid (without the
    raw rows) to ``--out-json``.
    """
    p = argparse.ArgumentParser(description=__doc__,
                                formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("--arm", nargs=2, action="append", metavar=("NAME", "PATH"),
                   required=True, help="arm name + replay_metrics.jsonl path (repeatable)")
    p.add_argument("--baseline", required=True, help="arm name to use as PD_advantage denominator")
    p.add_argument("--ttft-slo", nargs="+", type=float, default=[0.5, 1.0, 2.0, 5.0])
    p.add_argument("--tpot-slo", nargs="+", type=float, default=[0.05, 0.1, 0.2])
    p.add_argument("--out-json", type=Path, default=None)
    args = p.parse_args()

    arms: dict[str, dict] = {}
    for name, path in args.arm:
        # A repeated NAME used to silently replace the earlier arm's data.
        if name in arms:
            raise SystemExit(f"duplicate arm name {name!r}; each --arm needs a unique NAME")
        arms[name] = summarize_arm(name, Path(path), load_metrics(Path(path)))
    if args.baseline not in arms:
        raise SystemExit(f"baseline {args.baseline!r} not among arms {list(arms)}")

    # ---- per-arm overview ------------------------------------------------
    _print_overview(arms)
    # PD_advantage assumes request-for-request pairing on an identical trace.
    _warn_if_unpaired(arms, args.baseline)

    # ---- SLO-goodput grid + PD_advantage --------------------------------
    grid = _slo_goodput_grid(arms, args.baseline, args.ttft_slo, args.tpot_slo)

    if args.out_json:
        out = {
            "baseline": args.baseline,
            "arms": {n: {k: v for k, v in a.items() if k != "_rows"}
                     for n, a in arms.items()},
            "slo_grid": grid,
        }
        args.out_json.write_text(json.dumps(out, indent=2), encoding="utf-8")
        print(f"\nwrote {args.out_json}")
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()