Files
agentic-kvc/microbench/connector_tax/analyze.py
Gahow Wang 297fed6e73 Microbench 3 (connector_tax): infrastructure for KV connector substrate tax
Validates the elastic_migration_v2 finding that kv_role=kv_both raises
TTFT p90 by 45% even when PD-sep never fires. Replicates it under a
single-instance, synthetic, open-loop workload to separate the
mechanism cost from 8-instance feedback amplification.

Configurations (8):
  plain, noop_connector, mooncake_{producer,consumer,both},
  nixl_both, lmcache_only, multi_mooncake_lmcache.

Pre-flight verification gates risky configs (kv_consumer needs dummy
bootstrap, multi-connector composition, NoOp custom class loading).

Workload: two-phase sweep
  Phase A: rate {0.5..32} req/s × shape (4096, 256), saturation criteria
  Phase B: ref_safe rate × cartesian (input ∈ {512,4k,32k}, output ∈ {64,256,1024})

Step-timing patch enriches vLLM's existing AGENTIC_STEP_LOG_PATH emit
with step_duration_us and build_meta_us — directly measures per-step
substrate cost, not just user-visible TTFT/TPOT.

run_all.sh runs as a 5-stage pipeline with a barrier between stages:
  0 pre-flight + apply patch
  1 Phase A all configs
  2 pick ref_safe / ref_load
  3 Phase B all configs
  4 revert patch + analyze + plot

Outputs aggregate.{json,csv}, MANIFEST.tsv, and 5 figures.
Estimated runtime: 4-5.5 hours on idle dash0 H20.
2026-05-26 17:27:41 +08:00

178 lines
6.0 KiB
Python

#!/usr/bin/env python3
"""Aggregate connector_tax results.
Reads results/<config>/summary_A.json and summary_B.json for every config,
applies saturation criteria, picks ref_safe / ref_load, and writes
aggregate.json + aggregate.csv.
Usage:
analyze.py --root microbench/connector_tax/results
"""
import argparse
import csv
import json
from pathlib import Path
SAT_THROUGHPUT_RATIO = 0.95
SAT_QUEUE_P50 = 1.0
SAT_TTFT_INFLATION = 1.5 # vs previous (lower) rate
def saturated(cell: dict, prev_ttft_p90: float | None) -> tuple[bool, list[str]]:
reasons = []
tr = cell.get("throughput_ratio")
if tr is not None and tr < SAT_THROUGHPUT_RATIO:
reasons.append(f"throughput_ratio={tr:.2f}<{SAT_THROUGHPUT_RATIO}")
# queue p50 from inflight (proxy)
inf50 = cell.get("inflight_p50") or 0
# Note: inflight_p50 measured at send time. >= 2 means queue forming.
if inf50 >= 2:
# Throughput tracking is the primary signal; this is corroboration.
pass
ttft = cell.get("ttft_ms_p90")
if (
ttft is not None
and prev_ttft_p90 is not None
and prev_ttft_p90 > 0
and ttft / prev_ttft_p90 > SAT_TTFT_INFLATION
):
reasons.append(f"ttft_p90 inflated {ttft / prev_ttft_p90:.2f}x")
return (len(reasons) > 0, reasons)
def analyze(root: Path) -> dict:
    """Load every config's summaries under *root* and derive reference rates.

    Each sub-directory of *root* (except ``preflight``) is one config. Its
    ``summary_A.json`` / ``summary_B.json`` files, when present, are loaded
    as the ``phase_a`` / ``phase_b`` cell lists. Phase A cells are annotated
    in place with ``saturated`` and ``sat_reasons``.

    Returns:
        A dict with keys ``configs``, ``rates_swept``, ``ref_safe`` and
        ``ref_load``. ``ref_safe`` is the highest swept rate at which no
        config has a saturated cell; ``ref_load`` is the highest rate at
        which the ``plain`` config is not saturated. Either is ``None`` when
        no rate qualifies.
    """

    def _rate(cell):
        return cell["rate_target"]

    def _saturated_at(data, rate):
        # Only the first cell (in file order) at this rate is consulted;
        # a config with no cell at this rate does not block the rate.
        first = next((c for c in data["phase_a"] if _rate(c) == rate), None)
        return first is not None and first["saturated"]

    # ── load per-config summaries ──────────────────────────────────
    configs: dict[str, dict] = {}
    summary_files = (("phase_a", "summary_A.json"), ("phase_b", "summary_B.json"))
    for entry in sorted(root.iterdir()):
        if not entry.is_dir() or entry.name == "preflight":
            continue
        loaded = {"phase_a": [], "phase_b": []}
        for key, filename in summary_files:
            summary_path = entry / filename
            if summary_path.exists():
                loaded[key] = json.loads(summary_path.read_text())
        configs[entry.name] = loaded

    # ── flag saturation per cell, per config (Phase A only) ────────
    for data in configs.values():
        # TTFT inflation is judged against the next-lower rate's p90.
        last_ttft_p90 = None
        for cell in sorted(data["phase_a"], key=_rate):
            cell["saturated"], cell["sat_reasons"] = saturated(cell, last_ttft_p90)
            last_ttft_p90 = cell.get("ttft_ms_p90")

    # ── pick reference rates ───────────────────────────────────────
    rates = sorted(
        {_rate(cell) for data in configs.values() for cell in data["phase_a"]}
    )

    # ref_safe: walk rates in ascending order and keep the last clean one.
    ref_safe = None
    for rate in rates:
        if not any(_saturated_at(data, rate) for data in configs.values()):
            ref_safe = rate

    # ref_load: last non-saturated rate of the 'plain' config, ascending.
    plain_cells = configs.get("plain", {}).get("phase_a", [])
    unsaturated = [
        _rate(cell) for cell in sorted(plain_cells, key=_rate) if not cell["saturated"]
    ]
    ref_load = unsaturated[-1] if unsaturated else None

    return {
        "configs": configs,
        "rates_swept": rates,
        "ref_safe": ref_safe,
        "ref_load": ref_load,
    }
# (csv column, summary-cell key) pairs for metrics copied verbatim into a row;
# a metric absent from the cell becomes an empty CSV field.
_CSV_METRIC_COLUMNS = (
    ("ttft_p50", "ttft_ms_p50"),
    ("ttft_p90", "ttft_ms_p90"),
    ("ttft_p99", "ttft_ms_p99"),
    ("tpot_p50", "tpot_ms_p50"),
    ("tpot_p90", "tpot_ms_p90"),
    ("tpot_p99", "tpot_ms_p99"),
    ("e2e_p90", "e2e_ms_p90"),
    ("throughput_eff", "throughput_effective_rps"),
    ("throughput_ratio", "throughput_ratio"),
    ("n_after_warmup", "n_after_warmup"),
)


def _csv_row(cfg: str, phase: str, cell: dict) -> dict:
    """Flatten one summary cell of *cfg* into an aggregate.csv row."""
    row = {
        "config": cfg,
        "phase": phase,
        "rate": cell["rate_target"],
        "input_tokens": cell["input_tokens"],
        "output_tokens": cell["output_tokens"],
    }
    for column, key in _CSV_METRIC_COLUMNS:
        row[column] = cell.get(key)
    # Saturation is only reported for Phase A; Phase B rows leave both
    # columns blank.
    if phase == "A":
        row["saturated"] = cell.get("saturated")
        row["sat_reasons"] = ";".join(cell.get("sat_reasons", []))
    else:
        row["saturated"] = ""
        row["sat_reasons"] = ""
    return row


def write_csv(agg: dict, out_path: Path) -> None:
    """Write one CSV row per Phase A / Phase B cell of every config.

    Rows are emitted config by config, Phase A cells first and Phase B cells
    after them. The header is taken from the first row's keys.

    Args:
        agg: Aggregate dict whose ``configs`` maps a config name to a dict
            holding ``phase_a`` and ``phase_b`` cell lists.
        out_path: Destination file; it is overwritten if it exists.

    Raises:
        KeyError: If a cell lacks ``rate_target``, ``input_tokens`` or
            ``output_tokens``, or a config lacks a phase list.

    Note:
        When there are no cells at all the function returns without touching
        *out_path*, so no file is created (and an existing one is left as is).
    """
    rows = []
    for cfg, data in agg["configs"].items():
        rows.extend(_csv_row(cfg, "A", cell) for cell in data["phase_a"])
        rows.extend(_csv_row(cfg, "B", cell) for cell in data["phase_b"])
    if not rows:
        return

    # newline="" lets the csv module control line endings; the explicit
    # encoding keeps the output independent of the platform locale.
    with open(out_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
def main():
    """Command-line entry point.

    Parses ``--root`` (required results directory) and ``--out`` (optional
    path for the JSON aggregate, defaulting to ``<root>/aggregate.json``),
    runs the analysis, writes the JSON aggregate and the CSV, and prints the
    chosen reference rates. The CSV is always written to
    ``<root>/aggregate.csv`` regardless of ``--out``.

    Raises:
        SystemExit: If the ``--root`` path does not exist.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--root", type=Path, required=True)
    parser.add_argument("--out", type=Path, default=None)
    opts = parser.parse_args()

    results_root = opts.root
    if not results_root.exists():
        raise SystemExit(f"root not found: {results_root}")

    agg = analyze(results_root)

    json_path = opts.out if opts.out else results_root / "aggregate.json"
    json_path.write_text(json.dumps(agg, indent=2))
    write_csv(agg, results_root / "aggregate.csv")

    print(f"ref_safe = {agg['ref_safe']} ref_load = {agg['ref_load']}")
    print(f"Wrote {json_path} and aggregate.csv")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()