Files
agentic-kvc/microbench/connector_tax/cache_sweep/analyze_trace_replay.py
Gahow Wang ef9e0102ec Connector tax: trace-replay confirms +45% kv_both penalty is gone; DR-fix adds 22% more
Re-runs the elastic_migration_v2 trace (w600 r0.0015 st30, 1214 reqs,
274 sessions, 8×TP1 vLLM + cache_aware_proxy) with three configs:
- plain unified
- unified + Mooncake kv_both
- unified + Mooncake kv_both + DR-fix (env-gated O(|cache|) hash sync removal)

TTFT p90: 11.97 s → 9.74 s (−18.6%) → 7.58 s (−36.6% vs plain)
E2E p90:  23.48 s → 21.25 s (−9.5%) → 17.93 s (−23.6% vs plain)

Two findings:
1. The "+45% kv_both penalty" claim from elastic_migration_v2 is OBSOLETE
   on the current codebase — kv_both is now *faster* than plain at p90.
   Likely fixed by e3a1d70 (RDMA-READ → bootstrap PUSH refactor) and
   the connector-mode delay_free_blocks extending cross-turn prefix
   cache hits on a 93%-intra-session-reuse trace.
2. DR-fix removes another 22% from TTFT p90 by skipping the
   O(|cache|) hash sync in build_connector_meta. The cache sweep with
   DR-fix shows the slope dropping from +94.5 to +2.3 μs/1k blocks.

Adds:
- run_trace_replay_drfix.sh: A/B/C harness (env CT_DR_FIX gates patch)
- analyze_trace_replay.py: TTFT/TPOT/E2E delta analysis
- REPORT_TRACE_REPLAY.md: summary + reproduction
- results/20260526_1627_drfix/: cache-sweep with DR-fix
- results/trace_replay_20260526_1652/: full trace-replay A/B/C

Implication for EAR paper: the kv_both substrate is no longer the
bottleneck blocking session migration. The prior 4 migration reverts
were dominated by transfer overhead that has now been characterized
and (partially) removed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-27 09:13:50 +08:00

116 lines
4.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""Compute TTFT/TPOT/E2E mean/p50/p90/p99 from trace-replay metrics.jsonl.
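Usage:
    python analyze_trace_replay.py --root <outroot>

where <outroot>/{unified,unified_kv_both,unified_kv_both_drfix}/metrics.jsonl
each contain one JSON object per line, one line per request (with ttft_s,
tpot_s, latency_s). A comparison table is printed and the aggregated
statistics are written to <outroot>/trace_replay_summary.json.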
"""
import argparse
import json
import statistics
from pathlib import Path
def pct(xs, p):
    """Return the ``p``-th percentile of ``xs`` by floor-index selection.

    The values are sorted and the element at index
    ``int(p / 100 * (n - 1))`` is returned, with that index clamped to
    ``[0, n - 1]``. No interpolation is done, so the result is always one
    of the input values.

    Returns ``None`` when ``xs`` is empty.
    """
    if not xs:
        return None
    ordered = sorted(xs)
    last = len(ordered) - 1
    idx = int(p / 100.0 * last)
    # Clamp so out-of-range percentiles still pick a valid element.
    if idx > last:
        idx = last
    if idx < 0:
        idx = 0
    return ordered[idx]
def summarise(rows):
    """Aggregate per-request metric rows into latency summary statistics.

    ``rows`` is a list of dicts, one per request. A row counts as failed
    when its ``"error"`` entry is truthy; failed rows only contribute to
    the counts. For the remaining rows, ``ttft_s``, ``tpot_s`` and
    ``latency_s`` are converted from seconds to milliseconds and reduced
    to mean / p50 / p90 / p99.

    Returns a dict with ``n_total``, ``n_ok``, ``n_err`` followed by
    ``{ttft,tpot,e2e}_{mean,p50,p90,p99}_ms``. A statistic is ``None``
    when no sample is available for that metric.
    """
    succeeded = [row for row in rows if not row.get("error")]

    # Samples in milliseconds, keyed by the prefix used in the output names.
    samples_ms = {
        "ttft": [
            row["ttft_s"] * 1000
            for row in succeeded
            if row.get("ttft_s") is not None
        ],
        # TPOT is filtered by truthiness, so 0 / 0.0 values are dropped
        # together with missing ones (presumably requests with no
        # meaningful per-token time -- confirm against the replay tool).
        "tpot": [
            row["tpot_s"] * 1000
            for row in succeeded
            if row.get("tpot_s")
        ],
        "e2e": [
            row["latency_s"] * 1000
            for row in succeeded
            if row.get("latency_s") is not None
        ],
    }

    summary = {
        "n_total": len(rows),
        "n_ok": len(succeeded),
        "n_err": len(rows) - len(succeeded),
    }
    for prefix, values in samples_ms.items():
        summary[f"{prefix}_mean_ms"] = statistics.mean(values) if values else None
        for q in (50, 90, 99):
            summary[f"{prefix}_p{q}_ms"] = pct(values, q)
    return summary
def main():
    """Summarise trace-replay metrics per config and print comparison tables.

    Command-line arguments:
        --root     Directory holding one sub-directory per config, each
                   with a ``metrics.jsonl`` file (required).
        --configs  Config names to load, in column order. The first one is
                   the baseline for the "vs" tables.

    For every config whose ``metrics.jsonl`` exists, the rows are reduced
    with ``summarise``; missing files are reported and skipped. The
    function then prints a side-by-side table, per-config deltas against
    the first config (only when every requested config was loaded), a
    DR-fix vs kv_both table when both are present, and finally writes all
    summaries to ``<root>/trace_replay_summary.json``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--root", type=Path, required=True)
    ap.add_argument("--configs", nargs="+",
                    default=["unified", "unified_kv_both", "unified_kv_both_drfix"])
    args = ap.parse_args()

    results = {}
    for cfg in args.configs:
        p = args.root / cfg / "metrics.jsonl"
        if not p.exists():
            print(f"MISSING: {p}")
            continue
        # Use a context manager so the file handle is closed deterministically.
        with open(p, encoding="utf-8") as f:
            rows = [json.loads(line) for line in f if line.strip()]
        results[cfg] = summarise(rows)

    def fmt(v):
        """Format a statistic as a whole number, or '-' when unavailable."""
        return f"{v:.0f}" if v is not None else "-"

    def pctd(a, b):
        """Relative change of ``b`` against baseline ``a`` as a signed percent."""
        if a is None or b is None or a == 0:
            return "-"
        return f"{(b/a-1)*100:+.1f}%"

    # Percentile metrics shown in every delta table, in display order.
    percentile_metrics = [
        f"{prefix}_{q}_ms"
        for prefix in ("ttft", "tpot", "e2e")
        for q in ("p50", "p90", "p99")
    ]

    def print_delta(title, base_cfg, other_cfg):
        """Print baseline value, compared value and relative change per metric."""
        print(f"\n=== {title} ===")
        for m in percentile_metrics:
            a = results[base_cfg][m]
            b = results[other_cfg][m]
            print(f" {m:<14} {fmt(a):>10}{fmt(b):>10} ({pctd(a, b)})")

    # Side-by-side table: one column per requested config.
    print(f"{'metric':<14}", end="")
    for cfg in args.configs:
        # Columns are 22 wide; truncating at 21 keeps one separating space
        # while still showing the full default name "unified_kv_both_drfix".
        print(f"{cfg[:21]:>22}", end="")
    print()
    print("-" * (14 + 22 * len(args.configs)))
    for m in ["n_ok", "ttft_mean_ms", "ttft_p50_ms", "ttft_p90_ms", "ttft_p99_ms",
              "tpot_mean_ms", "tpot_p50_ms", "tpot_p90_ms", "tpot_p99_ms",
              "e2e_mean_ms", "e2e_p50_ms", "e2e_p90_ms", "e2e_p99_ms"]:
        print(f"{m:<14}", end="")
        for cfg in args.configs:
            if cfg not in results:
                print(f"{'-':>22}", end="")
            else:
                print(f"{fmt(results[cfg][m]):>22}", end="")
        print()

    # Tax tables: every other config against the first one. Skipped unless
    # all requested configs were loaded.
    if len(args.configs) >= 2 and all(c in results for c in args.configs):
        plain = args.configs[0]
        for ref_cfg in args.configs[1:]:
            print_delta(f"{ref_cfg} vs {plain}", plain, ref_cfg)

    if "unified_kv_both" in results and "unified_kv_both_drfix" in results:
        print_delta(
            "DR-fix improvement: unified_kv_both_drfix vs unified_kv_both",
            "unified_kv_both",
            "unified_kv_both_drfix",
        )

    # Save machine-readable
    with open(args.root / "trace_replay_summary.json", "w") as f:
        json.dump(results, f, indent=2)
    print(f"\nWrote {args.root}/trace_replay_summary.json")
# Run the analysis only when executed as a script, not when imported.
if __name__ == "__main__":
    main()