MIGRATION_TRANSFER_COST.md: under real load, migration KV transfer runs at ~3 GB/s vs ~10 GB/s idle. Decomposed (instruments + MB6 microbench) into ~55% RDMA-actual (HBM/PCIe contention with running kernels: 7.6->4.0 GB/s) + ~45% control-plane GIL starvation during long prefills. Reproduced on a fresh upstream venv (byte-identical transfer path) -> upstream/hardware inherent, not our patch. Layerwise is the wrong lever; the tax is structural on a loaded agentic cluster. Includes mb6_transfer_under_load + run_mb6, instrument_dst_migration/mooncake, and the dst/transfer decomposition analyzers. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
199 lines
7.4 KiB
Python
199 lines
7.4 KiB
Python
"""SLO-goodput analyzer + PD_advantage for the PD-disagg crossover study.
|
|
|
|
Reads per-arm replayer output (replay_metrics.jsonl) and computes, per arm:
|
|
- completion rate (error-free fraction)
|
|
- raw TTFT / TPOT / E2E percentiles (over successes — reported for context
|
|
only; NEVER the verdict metric, since failing arms have a small success set)
|
|
- SLO-goodput: fraction of OFFERED requests that are error-free AND meet a
|
|
(TTFT, TPOT) SLO. This is the verdict metric.
|
|
|
|
The two arms must replay the IDENTICAL trace (same seed), so they are paired
|
|
request-for-request. PD_advantage = goodput(arm) / goodput(baseline); y=1 is
|
|
the crossover line — PD_advantage >= 1 means PD-disagg wins.
|
|
|
|
Goodput is computed over a grid of SLO thresholds so the conclusion does not
|
|
hinge on one arbitrary cutoff.
|
|
|
|
Usage:
|
|
python analyze_goodput.py \
|
|
--arm 8C-proxy .../8C-proxy/replay_metrics.jsonl \
|
|
--arm 4P+4D .../4P+4D/replay_metrics.jsonl \
|
|
--baseline 8C-proxy \
|
|
--ttft-slo 0.5 1 2 5 --tpot-slo 0.05 0.1 0.2
|
|
"""
|
|
|
|
from __future__ import annotations

import argparse
import json
import math
import statistics
import sys
from pathlib import Path
|
|
|
|
|
|
def load_metrics(path: Path) -> list[dict]:
    """Parse a JSON-Lines file into a list with one decoded value per line.

    Lines that are empty after stripping surrounding whitespace are skipped;
    every other line is handed to ``json.loads`` (decode errors propagate).
    """
    with path.open("r", encoding="utf-8") as handle:
        stripped = (raw.strip() for raw in handle)
        return [json.loads(text) for text in stripped if text]
|
|
|
|
|
|
def percentile(sorted_vals: list[float], pct: float) -> float:
    """Linearly interpolated percentile of an already-sorted sequence.

    ``pct`` is a fraction (0.5 = median), not a 0-100 value. An empty input
    yields NaN; a one-element input yields that element.
    """
    count = len(sorted_vals)
    if not count:
        return float("nan")
    if count == 1:
        return sorted_vals[0]
    position = pct * (count - 1)
    below = int(position)
    # Clamp the upper neighbour so pct == 1.0 stays in range.
    above = below + 1 if below + 1 < count else count - 1
    weight = position - below
    return sorted_vals[below] * (1 - weight) + sorted_vals[above] * weight
|
|
|
|
|
|
def pstats(vals: list[float]) -> dict:
    """Summarize samples as count, mean and p50/p90/p99.

    ``None`` entries (missing measurements) are dropped first. When nothing
    remains only ``{"count": 0}`` is returned, so readers must use ``.get``
    for the other keys.
    """
    samples = sorted(v for v in vals if v is not None)
    if len(samples) == 0:
        return {"count": 0}
    summary = {"count": len(samples), "mean": statistics.fmean(samples)}
    for label, q in (("p50", 0.50), ("p90", 0.90), ("p99", 0.99)):
        summary[label] = percentile(samples, q)
    return summary
|
|
|
|
|
|
def offered_window_s(rows: list[dict]) -> float:
    """Seconds between the earliest and latest ``trace_timestamp_s`` in rows.

    Rows lacking a timestamp (missing or None) are ignored; with fewer than
    two timestamps the window is 0.0.
    """
    stamps = [
        stamp
        for stamp in (row.get("trace_timestamp_s") for row in rows)
        if stamp is not None
    ]
    if len(stamps) >= 2:
        return max(stamps) - min(stamps)
    return 0.0
|
|
|
|
|
|
def meets_slo(r: dict, ttft_slo: float, tpot_slo: float) -> bool:
    """True when request ``r`` succeeded and satisfies both latency SLOs.

    A request counts only if its ``error`` is None/absent, it reports a
    ``ttft_s`` that is not above ``ttft_slo``, and its ``tpot_s`` is either
    None/absent or not above ``tpot_slo``.
    """
    if r.get("error") is not None:
        return False
    ttft = r.get("ttft_s")
    if ttft is None or ttft > ttft_slo:
        return False
    # A missing TPOT is not held against the request; a TPOT of 0 (single-token
    # output) passes any non-negative SLO through the comparison itself.
    tpot = r.get("tpot_s")
    return tpot is None or not (tpot > tpot_slo)
|
|
|
|
|
|
def load_summary(jsonl_path: Path) -> dict:
    """Read the sibling ``<stem>.summary.json`` of a replay metrics file.

    For ``.../replay_metrics.jsonl`` this reads
    ``.../replay_metrics.summary.json`` (wall-clock, amplification).

    The summary is optional context, so this never raises for a bad file:
      * missing file                  -> ``{}`` silently;
      * unreadable / undecodable file -> ``{}`` plus a warning on stderr;
      * JSON that is not an object    -> ``{}`` plus a warning on stderr
        (callers call ``.get`` on the result, so a list would crash them).
    """
    summary_path = jsonl_path.with_suffix(".summary.json")
    try:
        # EAFP: avoids the exists()/read race; JSON is UTF-8 per RFC 8259.
        data = json.loads(summary_path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as err:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"warning: ignoring unreadable summary {summary_path}: {err}",
              file=sys.stderr)
        return {}
    if not isinstance(data, dict):
        print(f"warning: ignoring summary {summary_path}: not a JSON object",
              file=sys.stderr)
        return {}
    return data
|
|
|
|
|
|
def summarize_arm(name: str, jsonl_path: Path, rows: list[dict]) -> dict:
    """Build the per-arm summary dict used by the report and the JSON dump.

    Latency percentiles (ttft / tpot / e2e) are taken over error-free rows
    only; ``_rows`` retains every offered row so SLO attainment can later be
    measured against the full offered set. ``wall_clock_s`` and
    ``amplification`` come from the sibling summary file and are ``None``
    when that file lacks them.
    """
    offered = len(rows)
    successes = [row for row in rows if row.get("error") is None]
    window = offered_window_s(rows)
    side = load_summary(jsonl_path)

    def _collect(key: str) -> list:
        return [row.get(key) for row in successes]

    return {
        "name": name,
        "n_offered": offered,
        "n_success": len(successes),
        "completion_rate": len(successes) / offered if offered else 0.0,
        "offered_window_s": window,
        "offered_qps": offered / window if window > 0 else 0.0,
        # Throughput signal: how much longer than the offered window the drain
        # took. ~1.0 = kept up; >1 = falling behind (cleanest PD-collapse sign).
        "wall_clock_s": side.get("wall_clock_s"),
        "amplification": side.get("amplification"),
        "ttft": pstats(_collect("ttft_s")),
        "tpot": pstats(_collect("tpot_s")),
        "e2e": pstats(_collect("latency_s")),
        "_rows": rows,
    }
|
|
|
|
|
|
def _finite_or_none(value):
    """Recursively replace NaN/inf floats with None so output is strict JSON."""
    if isinstance(value, float) and not math.isfinite(value):
        return None
    if isinstance(value, dict):
        return {k: _finite_or_none(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [_finite_or_none(v) for v in value]
    return value


def _warn_if_unpaired(arms: dict[str, dict]) -> None:
    """Warn on stderr when arms offered different request counts.

    PD_advantage compares attainments request-for-request, which is only
    meaningful if every arm replayed the identical trace.
    """
    sizes = {name: a["n_offered"] for name, a in arms.items()}
    if len(set(sizes.values())) > 1:
        print(f"warning: arms offered different request counts {sizes}; "
              "PD_advantage assumes every arm replayed the identical trace",
              file=sys.stderr)


def _print_overview(arms: dict[str, dict]) -> None:
    """Print the per-arm table (latency stats over successes; context only)."""
    nan = float("nan")
    print("=" * 78)
    print("PER-ARM OVERVIEW (latency stats over successes only — context, not verdict)")
    print("=" * 78)
    print(f"{'arm':<12}{'offered':>8}{'compl%':>8}{'ampl':>6}{'oQPS':>7}"
          f"{'TTFTp50':>9}{'TTFTp90':>9}{'TPOTp50':>9}{'TPOTp99':>9}{'E2Ep90':>9}")
    for name, a in arms.items():
        t, tp, e = a["ttft"], a["tpot"], a["e2e"]
        ampl = a.get("amplification")
        # amplification comes from an optional summary file and may be absent.
        ampl_s = f"{ampl:>6.2f}" if isinstance(ampl, (int, float)) else f"{'--':>6}"
        print(f"{name:<12}{a['n_offered']:>8}{100*a['completion_rate']:>7.1f}%"
              f"{ampl_s}{a['offered_qps']:>7.2f}"
              f"{t.get('p50', nan):>9.2f}{t.get('p90', nan):>9.2f}"
              f"{1000*tp.get('p50', nan):>8.0f}m{1000*tp.get('p99', nan):>8.0f}m"
              f"{e.get('p90', nan):>9.2f}")


def _attainment(a: dict, ttft_slo: float, tpot_slo: float) -> tuple[int, float]:
    """Return (#offered requests meeting the SLO pair, that count / #offered)."""
    n_slo = sum(1 for r in a["_rows"] if meets_slo(r, ttft_slo, tpot_slo))
    return n_slo, (n_slo / a["n_offered"] if a["n_offered"] else 0.0)


def _print_slo_grid(arms: dict[str, dict], baseline: str,
                    ttft_slos: list[float], tpot_slos: list[float]) -> list[dict]:
    """Print SLO-goodput and PD_advantage for every (TTFT, TPOT) SLO pair.

    Returns one row per SLO pair. pd_advantage is NaN when the baseline
    attains 0% for that pair (ratio undefined).
    """
    print()
    print("=" * 78)
    print("SLO-GOODPUT (attainment = error-free AND TTFT<=slo AND TPOT<=slo)")
    print(f"PD_advantage = attainment(arm) / attainment(baseline={baseline}); "
          ">=1 means arm wins")
    print("=" * 78)
    grid = []
    for ttft_slo in ttft_slos:
        for tpot_slo in tpot_slos:
            results = {name: _attainment(a, ttft_slo, tpot_slo)
                       for name, a in arms.items()}
            base_att = results[baseline][1]
            row = {"ttft_slo_s": ttft_slo, "tpot_slo_s": tpot_slo, "arms": {}}
            cells = []
            for name, (n_slo, att) in results.items():
                adv = (att / base_att) if base_att > 0 else float("nan")
                row["arms"][name] = {"attainment": att, "pd_advantage": adv,
                                     "n_slo": n_slo}
                tag = "" if name == baseline else f" adv={adv:.2f}"
                cells.append(f"{name}={100*att:>5.1f}%{tag}")
            # :g instead of int() so float products like 1000*0.05 ->
            # 50.000...01 print as 50 without truncating values such as 14.5.
            label = f"TTFT<={ttft_slo:>4}s TPOT<={1000 * tpot_slo:>4g}ms | "
            print(label + " ".join(cells))
            grid.append(row)
    return grid


def main() -> None:
    """CLI entry point for the goodput / PD_advantage analysis.

    Loads every ``--arm``, prints the per-arm overview and the SLO-goodput
    grid, and with ``--out-json`` writes the summaries (without raw rows)
    plus the grid. Non-finite numbers (e.g. an undefined pd_advantage) are
    written as JSON ``null`` so the file is valid strict JSON.

    Raises:
        SystemExit: on duplicate arm names or an unknown ``--baseline``.
    """
    p = argparse.ArgumentParser(description=__doc__,
                                formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("--arm", nargs=2, action="append", metavar=("NAME", "PATH"),
                   required=True, help="arm name + replay_metrics.jsonl path (repeatable)")
    p.add_argument("--baseline", required=True, help="arm name to use as PD_advantage denominator")
    p.add_argument("--ttft-slo", nargs="+", type=float, default=[0.5, 1.0, 2.0, 5.0])
    p.add_argument("--tpot-slo", nargs="+", type=float, default=[0.05, 0.1, 0.2])
    p.add_argument("--out-json", type=Path, default=None)
    args = p.parse_args()

    # Validate names before loading any (possibly large) metrics files; a
    # repeated name would otherwise silently overwrite the earlier arm.
    names: list[str] = []
    for name, _ in args.arm:
        if name in names:
            raise SystemExit(f"duplicate --arm name {name!r}; arm names must be unique")
        names.append(name)
    if args.baseline not in names:
        raise SystemExit(f"baseline {args.baseline!r} not among arms {names}")

    arms = {name: summarize_arm(name, Path(path), load_metrics(Path(path)))
            for name, path in args.arm}
    _warn_if_unpaired(arms)

    _print_overview(arms)
    grid = _print_slo_grid(arms, args.baseline, args.ttft_slo, args.tpot_slo)

    if args.out_json:
        out = {
            "baseline": args.baseline,
            "arms": {n: {k: v for k, v in a.items() if k != "_rows"}
                     for n, a in arms.items()},
            "slo_grid": grid,
        }
        args.out_json.write_text(
            json.dumps(_finite_or_none(out), indent=2, allow_nan=False),
            encoding="utf-8",
        )
        print(f"\nwrote {args.out_json}")


if __name__ == "__main__":
    main()
|