MB5 aggregate: cross-config KV-pool + latency comparison

Reads sweep root + tag, for each (config, rep):
- merges per-PID snapshots into cluster-wide KV timeline (carry-forward
  for PIDs without a sample in the bin)
- computes peak (max) and steady-state (10-90% median) pool utilization
- pulls latency p50/p90/p99 from replay_metrics.summary.json

Produces 4 outputs in --out-dir:
- mb5_kv_timeline.png    — N-panel cluster KV % over time, one panel per
                            config, faint per-rep lines + bold median
- mb5_peak_utilization.png — bar chart (peak vs steady) with ±std error bars
- mb5_latency_compare.png  — bar chart p50/p90/p99 e2e latency per config
- mb5_summary.csv          — flat per-(config, rep) table for the writeup

Validated on 4P+4D × 20-req smoke:
  4P+4D rep1: peak=12.8% steady=10.7% peak_wait=1
  p50=1.3s p90=10.5s p99=17.1s (vs. <1s for 8C — expected gap).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-28 00:49:21 +08:00
parent a9c7310f4a
commit a66f24d242

View File

@@ -0,0 +1,323 @@
#!/usr/bin/env python3
"""Aggregate MB5 sweep data into cross-config comparison figures.
Reads a sweep root (e.g. /home/admin/.../mb5_runs/) and a tag
(e.g. "20260527_164040"). For each (config, rep) tuple, loads:
${tag}_${config}_rep${N}/replay_metrics.summary.json -> aggregate stats
${tag}_${config}_rep${N}/replay_metrics.jsonl -> per-request latency (lives in the run dir; not read by this script — latency percentiles come from the summary JSON)
${tag}_${config}_rep${N}_${config}/kv_snapshots/ -> per-instance KV state
Produces, in --out-dir:
mb5_kv_timeline.png — one panel per config, cluster-wide KV utilization over time
(1 faint line per rep + bold median across reps)
mb5_peak_utilization.png — bar chart: peak / steady KV util per config
(mean across reps + error bars)
mb5_latency_compare.png — bar chart: p50 / p90 / p99 e2e latency per config
mb5_summary.csv — flat table for the writeup
Use case:
python aggregate_mb5.py --sweep-root /home/.../mb5_runs \\
--tag 20260527_164040 \\
--configs "8C 6P+2D 4P+4D 2P+6D" \\
--reps 3 \\
--out-dir figs/mb5
"""
from __future__ import annotations
import argparse
import csv
import json
from collections import defaultdict
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
def load_snapshots_for_run(snap_dir: Path) -> list[dict]:
    """Merge every per-PID snapshot file in ``snap_dir`` into one sorted list.

    Files are matched by the glob ``mb5_kv_snapshot_pid*.jsonl``. The PID is
    parsed from the file name and stored on each record under ``"pid"``.

    A line is skipped, instead of aborting the whole aggregation, when it is
    blank, is not valid JSON (e.g. a truncated final line), does not decode
    to a JSON object, or has no numeric ``"t_unix"``. The last two cases
    would otherwise crash the PID tagging or the final sort. A single notice
    is printed when any non-blank line was dropped.

    Args:
        snap_dir: Directory containing the per-PID ``.jsonl`` snapshot files.

    Returns:
        All usable records from all files, sorted ascending by ``"t_unix"``.

    Raises:
        ValueError: If a matching file name does not end in an integer PID.
    """
    prefix = "mb5_kv_snapshot_pid"
    records: list[dict] = []
    skipped = 0
    for path in sorted(snap_dir.glob(f"{prefix}*.jsonl")):
        pid = int(path.stem.replace(prefix, ""))
        with path.open(encoding="utf-8") as fh:
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    record = json.loads(raw)
                except json.JSONDecodeError:
                    skipped += 1
                    continue
                # Valid JSON is not enough: the sort below needs an object
                # carrying a numeric timestamp.
                if not isinstance(record, dict) or not isinstance(
                    record.get("t_unix"), (int, float)
                ):
                    skipped += 1
                    continue
                record["pid"] = pid
                records.append(record)
    if skipped:
        print(f"[agg] skipped {skipped} malformed snapshot line(s) in {snap_dir}")
    records.sort(key=lambda rec: rec["t_unix"])
    return records
def cluster_timeline(snaps: list[dict], bin_size_s: float = 1.0) -> tuple[np.ndarray, ...]:
    """Aggregate time-sorted per-PID snapshots into cluster-wide time bins.

    Time is cut into ``bin_size_s``-wide bins starting at the first
    snapshot's ``t_unix``. Within a bin the most recent snapshot of each PID
    wins. A PID with no snapshot in a bin keeps its previous values
    (carry-forward), so a quiet PID does not make the cluster total dip. A
    PID contributes zeros until its first snapshot is seen.

    Args:
        snaps: Snapshot dicts sorted ascending by ``"t_unix"``, each carrying
            a ``"pid"``. ``"used_blocks"`` / ``"total_blocks"`` default to 0
            and ``"waiting"`` / ``"running"`` to empty when absent.
        bin_size_s: Bin width in seconds.

    Returns:
        ``(times, total_used, pool_frac, total_waiting, total_running)``,
        one entry per bin. ``times`` is seconds since the first snapshot and
        ``pool_frac`` is ``total_used`` divided by the sum of each PID's
        latest ``total_blocks`` (at least 1). Five empty arrays are returned
        when ``snaps`` is empty.
    """
    if not snaps:
        empty = np.array([], dtype=float)
        return (empty,) * 5

    origin = snaps[0]["t_unix"]
    span = snaps[-1]["t_unix"] - origin
    n_bins = max(1, int(np.ceil(span / bin_size_s)) + 1)
    times = np.arange(n_bins) * bin_size_s

    row_of = {pid: row for row, pid in enumerate(sorted({s["pid"] for s in snaps}))}
    n_pids = len(row_of)

    used = np.zeros((n_pids, n_bins), dtype=np.int64)
    waiting = np.zeros((n_pids, n_bins), dtype=np.int64)
    running = np.zeros((n_pids, n_bins), dtype=np.int64)
    pool_size = np.zeros(n_pids, dtype=np.int64)

    # Most recent observation per PID; this is what gets carried forward.
    cur_used = [0] * n_pids
    cur_waiting = [0] * n_pids
    cur_running = [0] * n_pids

    cursor = 0
    n_snaps = len(snaps)
    for b in range(n_bins):
        bin_start = origin + b * bin_size_s
        bin_end = bin_start + bin_size_s
        # Consume every snapshot that falls before the end of this bin.
        while cursor < n_snaps and snaps[cursor]["t_unix"] < bin_end:
            snap = snaps[cursor]
            row = row_of[snap["pid"]]
            cur_used[row] = snap.get("used_blocks", 0)
            cur_waiting[row] = len(snap.get("waiting", []))
            cur_running[row] = len(snap.get("running", []))
            pool_size[row] = snap.get("total_blocks", 0)
            cursor += 1
        for row, (u, w, r) in enumerate(zip(cur_used, cur_waiting, cur_running)):
            used[row, b] = u
            waiting[row, b] = w
            running[row, b] = r

    total_used = used.sum(axis=0)
    # Guard against an all-zero pool so the fraction never divides by zero.
    pool_frac = total_used / max(int(pool_size.sum()), 1)
    return times, total_used, pool_frac, waiting.sum(axis=0), running.sum(axis=0)
def load_summary(rundir: Path) -> dict | None:
p = rundir / "replay_metrics.summary.json"
if not p.is_file():
return None
return json.loads(p.read_text())
def per_run_metrics(snaps_dir: Path, rundir: Path) -> dict:
    """Compute KV-pool and queue statistics for a single (config, rep) run.

    Args:
        snaps_dir: Directory with the run's per-PID KV snapshot files.
        rundir: Run directory holding ``replay_metrics.summary.json``.

    Returns:
        A record with the raw snapshots (``"snaps"``), the binned cluster
        timeline (``"times"``, ``"total_used"``, ``"pool_frac"``,
        ``"total_waiting"``, ``"total_running"``), three scalar stats
        (``"peak_pool_frac"``, ``"steady_pool_frac"``, ``"peak_waiting"``)
        and the loaded ``"summary"`` (``{}`` when the file is missing).
        The scalar stats are 0 when there are no snapshots.
    """
    snaps = load_snapshots_for_run(snaps_dir)
    times, total_used, pool_frac, total_waiting, total_running = cluster_timeline(snaps)
    summary = load_summary(rundir) or {}

    # "Steady state" drops the first and last 10% of bins (warmup/cooldown).
    # With fewer than 10 bins the trim is skipped and the whole run is used.
    n = len(times)
    if n >= 10:
        frac_steady = pool_frac[int(n * 0.1):int(n * 0.9)]
    else:
        frac_steady = pool_frac

    return {
        "snaps": snaps,
        "times": times,
        "total_used": total_used,
        "pool_frac": pool_frac,
        "total_waiting": total_waiting,
        "total_running": total_running,
        "peak_pool_frac": float(pool_frac.max()) if n else 0.0,
        "steady_pool_frac": float(np.median(frac_steady)) if n else 0.0,
        "peak_waiting": int(total_waiting.max()) if n else 0,
        "summary": summary,
    }
def collect_sweep(sweep_root: Path, tag: str, configs: list[str], reps: int) -> dict:
    """Gather per-run metrics for every (config, rep) pair of a sweep.

    Reps are numbered from 1 to ``reps``. A rep whose ``kv_snapshots``
    directory is missing is reported on stdout and skipped, so a config with
    no usable rep never appears in the result.

    Args:
        sweep_root: Directory holding the per-run subdirectories.
        tag: Sweep tag that prefixes every run directory name.
        configs: Config names to collect, in output order.
        reps: Number of repetitions per config.

    Returns:
        ``{config: [run_record, ...]}``. Each record is the output of
        ``per_run_metrics`` plus a ``"rep"`` key.
    """
    collected: dict[str, list[dict]] = defaultdict(list)
    pairs = [(config, rep) for config in configs for rep in range(1, reps + 1)]
    for config, rep in pairs:
        snap_dir = sweep_root / f"{tag}_{config}_rep{rep}_{config}/kv_snapshots"
        if not snap_dir.is_dir():
            print(f"[agg] MISSING: {snap_dir}")
            continue
        rundir = sweep_root / f"{tag}_{config}_rep{rep}"
        record = per_run_metrics(snap_dir, rundir)
        record["rep"] = rep
        collected[config].append(record)
        print(
            f"[agg] {config} rep{rep}: peak={record['peak_pool_frac']:.1%} "
            f"steady={record['steady_pool_frac']:.1%} "
            f"peak_wait={record['peak_waiting']}"
        )
    return collected
def plot_kv_timeline(sweep: dict, out: Path) -> None:
    """Plot cluster-wide KV pool utilization over time, one panel per config.

    Each panel shows one faint line per rep plus a bold median across reps,
    and a dashed reference line at 90%. Reps of different length are padded
    with NaN before taking the median, so the tail of the median reflects
    only the reps that ran that long.

    Args:
        sweep: ``{config: [run_record, ...]}``. Each record provides
            ``"times"``, ``"pool_frac"`` and ``"rep"``.
        out: Destination image path. Its parent directory is created if
            needed. Nothing is written when ``sweep`` is empty.
    """
    n_configs = len(sweep)
    if n_configs == 0:
        return

    # squeeze=False always yields a 2-D axes array, so one config needs no
    # special case.
    fig, axes_grid = plt.subplots(
        n_configs, 1, figsize=(14, 2.5 * n_configs), sharex=True, squeeze=False
    )
    axes = axes_grid[:, 0]

    for ax, (config, reps) in zip(axes, sweep.items()):
        for rep_data in reps:
            ax.plot(rep_data["times"], rep_data["pool_frac"] * 100, alpha=0.4, lw=1.0,
                    label=f"rep{rep_data['rep']}")
        if reps:
            # Use the longest rep's own time axis for the median so it lines
            # up with the per-rep curves whatever the bin width is.
            longest = max(reps, key=lambda r: len(r["times"]))
            max_len = len(longest["times"])
            stacked = np.full((len(reps), max_len), np.nan)
            for row, r in enumerate(reps):
                stacked[row, :len(r["pool_frac"])] = r["pool_frac"]
            median = np.nanmedian(stacked, axis=0) * 100
            ax.plot(longest["times"], median, color="#222", lw=2.0, label="median")
        ax.axhline(90, color="#c44e52", ls="--", alpha=0.6, lw=1, label="90%")
        ax.set_ylim(0, 105)
        ax.set_ylabel(f"{config}\ncluster KV (%)")
        ax.grid(True, alpha=0.3)
        ax.legend(loc="upper right", fontsize=8)

    axes[-1].set_xlabel("wall-clock since first snapshot (s)")
    fig.suptitle("MB5: cluster-wide KV pool utilization over time", fontsize=12)
    fig.tight_layout()
    out.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out, dpi=120)
    plt.close(fig)
    print(f"wrote {out}")
def plot_peak_utilization(sweep: dict, out: Path) -> None:
    """Draw grouped bars of peak vs steady-state KV pool utilization per config.

    Bar heights are the mean across reps, in percent. Error bars are
    ``np.std`` across reps (its default, population form) and are zero for
    a config with a single rep.

    Args:
        sweep: ``{config: [run_record, ...]}``. Each record provides
            ``"peak_pool_frac"`` and ``"steady_pool_frac"`` as fractions.
        out: Destination image path. Its parent directory is created if
            needed. Nothing is written when ``sweep`` is empty.
    """
    configs = list(sweep)
    if not configs:
        print(f"[agg] no runs; skipping {out.name}")
        return

    def _mean_std(values: list[float]) -> tuple[float, float]:
        # A single rep has no spread to show, so its error bar is zero.
        mean = float(np.mean(values)) if values else 0.0
        std = float(np.std(values)) if len(values) > 1 else 0.0
        return mean, std

    peak_stats = [_mean_std([r["peak_pool_frac"] * 100 for r in sweep[c]]) for c in configs]
    steady_stats = [_mean_std([r["steady_pool_frac"] * 100 for r in sweep[c]]) for c in configs]
    peak_means, peak_std = zip(*peak_stats)
    steady_means, steady_std = zip(*steady_stats)

    x = np.arange(len(configs))
    width = 0.35
    fig, ax = plt.subplots(figsize=(9, 4.5))
    ax.bar(x - width / 2, peak_means, width, yerr=peak_std, label="peak",
           color="#c44e52", capsize=4)
    ax.bar(x + width / 2, steady_means, width, yerr=steady_std, label="steady (10-90%)",
           color="#4c72b0", capsize=4)
    ax.axhline(90, color="#444", ls="--", alpha=0.5, lw=1, label="90% red line")
    ax.set_xticks(x)
    ax.set_xticklabels(configs)
    ax.set_ylabel("Cluster KV pool utilization (%)")
    ax.set_ylim(0, 105)
    ax.set_title("MB5: KV pool pressure — peak vs steady-state")
    ax.legend(loc="upper left", fontsize=9)
    ax.grid(True, axis="y", alpha=0.3)
    fig.tight_layout()
    # Create the output directory here as well, so this function does not
    # depend on another plot having run first.
    out.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out, dpi=120)
    plt.close(fig)
    print(f"wrote {out}")
def plot_latency_compare(sweep: dict, out: Path) -> None:
    """Draw grouped bars of p50 / p90 / p99 end-to-end latency per config.

    Each bar is the mean across reps of the matching percentile found under
    ``summary["latency_stats_s"]``. Reps without that value are left out of
    the mean. A config with no value at all for a percentile gets a bar of
    height 0.0.

    Args:
        sweep: ``{config: [run_record, ...]}``. Each record provides a
            ``"summary"`` dict.
        out: Destination image path. Its parent directory is created if
            needed. Nothing is written when ``sweep`` is empty.
    """
    configs = list(sweep)
    if not configs:
        print(f"[agg] no runs; skipping {out.name}")
        return

    percentiles = ["p50", "p90", "p99"]
    data: dict[str, list[float]] = {m: [] for m in percentiles}
    for c in configs:
        for m in percentiles:
            vals = []
            for r in sweep[c]:
                stats = r["summary"].get("latency_stats_s")
                if stats and stats.get(m) is not None:
                    vals.append(stats[m])
            data[m].append(float(np.mean(vals)) if vals else 0.0)

    x = np.arange(len(configs))
    width = 0.25
    colors = {"p50": "#4c72b0", "p90": "#dd8452", "p99": "#c44e52"}
    fig, ax = plt.subplots(figsize=(9, 4.5))
    for i, m in enumerate(percentiles):
        # Offsets -width, 0, +width centre the three bars on each tick.
        ax.bar(x + (i - 1) * width, data[m], width, label=m, color=colors[m])
    ax.set_xticks(x)
    ax.set_xticklabels(configs)
    ax.set_ylabel("End-to-end latency (s)")
    ax.set_title("MB5: e2e latency by PD configuration")
    ax.legend()
    ax.grid(True, axis="y", alpha=0.3)
    fig.tight_layout()
    # Create the output directory here as well, so this function does not
    # depend on another plot having run first.
    out.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out, dpi=120)
    plt.close(fig)
    print(f"wrote {out}")
def write_summary_csv(sweep: dict, out: Path) -> None:
    """Write one CSV row per (config, rep) with pool and latency statistics.

    Values missing from a run's summary are written as empty cells. When the
    sweep holds no runs, a notice is printed and no file is created.

    Args:
        sweep: ``{config: [run_record, ...]}``. Each record provides
            ``"rep"``, ``"summary"``, ``"peak_pool_frac"``,
            ``"steady_pool_frac"`` and ``"peak_waiting"``.
        out: Destination CSV path. Its parent directory is created if
            needed.
    """

    def _as_row(config: str, run: dict) -> dict:
        summary = run["summary"]
        # Either stats block may be absent or null in the summary.
        latency = summary.get("latency_stats_s") or {}
        ttft = summary.get("ttft_stats_s") or {}
        return {
            "config": config,
            "rep": run["rep"],
            "n_requests": summary.get("request_count"),
            "n_success": summary.get("success_count"),
            "wall_clock_s": summary.get("wall_clock_s"),
            "peak_pool_frac": run["peak_pool_frac"],
            "steady_pool_frac": run["steady_pool_frac"],
            "peak_waiting": run["peak_waiting"],
            "latency_p50_s": latency.get("p50"),
            "latency_p90_s": latency.get("p90"),
            "latency_p99_s": latency.get("p99"),
            "ttft_p50_s": ttft.get("p50"),
            "ttft_p90_s": ttft.get("p90"),
            "ttft_p99_s": ttft.get("p99"),
            "prefix_cache_hit_ratio": summary.get("prefix_cache_hit_ratio"),
        }

    table = [_as_row(config, run) for config, runs in sweep.items() for run in runs]
    if len(table) == 0:
        print("[agg] no rows; skipping CSV")
        return

    out.parent.mkdir(parents=True, exist_ok=True)
    header = list(table[0].keys())
    with out.open("w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=header)
        writer.writeheader()
        writer.writerows(table)
    print(f"wrote {out} ({len(table)} rows)")
def main() -> None:
    """Parse the command line, collect the sweep and write all four outputs.

    Outputs are written to ``--out-dir`` in a fixed order: KV timeline plot,
    peak-utilization plot, latency comparison plot, then the summary CSV.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--sweep-root", type=Path, required=True,
                        help="dir containing ${tag}_${config}_rep${N}/ subdirs")
    parser.add_argument("--tag", required=True)
    parser.add_argument("--configs", default="8C 6P+2D 4P+4D 2P+6D",
                        help="space-separated config names")
    parser.add_argument("--reps", type=int, default=3)
    parser.add_argument("--out-dir", type=Path, default=Path("figs/mb5"))
    args = parser.parse_args()

    config_names = args.configs.split()
    sweep = collect_sweep(args.sweep_root, args.tag, config_names, args.reps)

    # (producer, file name) pairs, run in this order.
    outputs = (
        (plot_kv_timeline, "mb5_kv_timeline.png"),
        (plot_peak_utilization, "mb5_peak_utilization.png"),
        (plot_latency_compare, "mb5_latency_compare.png"),
        (write_summary_csv, "mb5_summary.csv"),
    )
    for produce, filename in outputs:
        produce(sweep, args.out_dir / filename)


# Run the aggregation only when executed as a script, not on import.
if __name__ == "__main__":
    main()