Files
agentic-kvc/microbench/fresh_setup/aggregate_mb5.py
Gahow Wang 8596135680 MB5 analysis: per-role KV split proves static-partition mismatch
aggregate_mb5.py:
- Split the cluster KV timeline by role (P-pool vs D-pool) using a
  PID->role map parsed from vllm_logs filenames. The cluster average
  hid the result — 6P+2D/4P+4D look ~45% utilized but the decode pool
  is actually pegged at ~100% while prefill idles at ~30%.
- Two-stage reduce/plot: --reduce-to (numpy-only, runs on the serving
  host over multi-GB snapshot dirs) dumps a compact JSON; --from-reduced
  (matplotlib) renders locally. matplotlib import is now lazy.
- New plot_role_split figure + p/d peak/steady columns in the CSV.

PD_DISAGG_RESULTS.md: consolidated writeup with figures inline.
Verdict: no static P:D ratio beats 8C colocation. The binding
constraint moves with the ratio (D-pool saturates at 6P+2D/4P+4D,
P-pool jams at 2P+6D -> 91% request loss); 8C's shared pool stays
elastic at 34% steady, 100% completion. PD wins TPOT (10-35x cleaner,
the MB1 phase-isolation benefit is real) but loses TTFT and sheds
load. Round-robin P routing also zeroes prefix-cache reuse; a
session-affinity re-run of 6P+2D is in flight to test the fix.

Figures (rep1): mb5_kv_timeline, mb5_role_split, mb5_peak_utilization,
mb5_latency_compare + mb5_summary.csv.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 12:05:17 +08:00

482 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Aggregate MB5 sweep data into cross-config comparison figures.
Reads a sweep root (e.g. /home/admin/.../mb5_runs/) and a tag
(e.g. "20260527_164040"). For each (config, rep) tuple, loads:
${tag}_${config}_rep${N}/replay_metrics.summary.json -> aggregate stats
${tag}_${config}_rep${N}/replay_metrics.jsonl -> per-request latency
${tag}_${config}_rep${N}_${config}/kv_snapshots/ -> per-instance KV state
Produces, in --out-dir:
mb5_kv_timeline.png — 4 panels, cluster-wide KV utilization over time
(1 faint line per rep + bold median across reps)
mb5_peak_utilization.png — bar chart: peak / steady KV util per config
(mean across reps + error bars)
mb5_latency_compare.png — bar chart: p50 / p90 / p99 e2e latency per config
mb5_summary.csv — flat table for the writeup
Use case:
python aggregate_mb5.py --sweep-root /home/.../mb5_runs \\
--tag 20260527_164040 \\
--configs "8C 6P+2D 4P+4D 2P+6D" \\
--reps 3 \\
--out-dir figs/mb5
"""
from __future__ import annotations
import argparse
import csv
import json
from collections import defaultdict
from pathlib import Path
import numpy as np
# matplotlib is imported lazily inside the plot functions so the --reduce-to
# path (numpy-only) can run on a serving host without matplotlib installed.
def load_snapshots_for_run(snap_dir: Path) -> list[dict]:
    """Merge every per-PID snapshot file in ``snap_dir`` into one sorted list.

    Files are matched by the glob ``mb5_kv_snapshot_pid*.jsonl``; the integer
    that follows ``pid`` in the file stem is attached to each record under the
    ``"pid"`` key.

    Loading is best-effort, so one bad file or line cannot abort the whole
    aggregation. The following are skipped silently:

    * files whose stem does not end in an integer PID,
    * blank lines and lines that are not valid JSON,
    * JSON values that are not objects,
    * records without a numeric ``t_unix`` (they could not be sorted).

    Returns:
        All surviving records, sorted ascending by ``t_unix``.
    """
    prefix = "mb5_kv_snapshot_pid"
    records: list[dict] = []
    for path in sorted(snap_dir.glob(f"{prefix}*.jsonl")):
        try:
            pid = int(path.stem[len(prefix):])
        except ValueError:
            # e.g. "mb5_kv_snapshot_pid123.bak.jsonl": not a per-PID dump.
            continue
        # Undecodable bytes are replaced rather than raised; an affected line
        # then either still parses or is dropped by the JSON check below.
        with path.open(encoding="utf-8", errors="replace") as fh:
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    rec = json.loads(raw)
                except json.JSONDecodeError:
                    continue
                if not isinstance(rec, dict):
                    continue
                if not isinstance(rec.get("t_unix"), (int, float)):
                    continue
                rec["pid"] = pid
                records.append(rec)
    records.sort(key=lambda rec: rec["t_unix"])
    return records
def load_pid_roles(logs_dir: Path) -> dict[int, str]:
    """Build an EngineCore-PID -> role lookup from a vllm_logs directory.

    Every ``vllm_idx*_kv_{producer|consumer|both}.log`` file is scanned for
    the first line whose ``EngineCore pid=NNNN`` marker parses as an integer.
    That PID is mapped to 'P' (producer), 'D' (consumer) or 'C' (both)
    according to the file-name suffix. Files with any other suffix are
    ignored. A missing directory yields an empty dict.
    """
    suffix_to_role = {"producer": "P", "consumer": "D", "both": "C"}
    marker = "EngineCore pid="
    pid_roles: dict[int, str] = {}
    if not logs_dir.is_dir():
        return pid_roles
    for log_path in logs_dir.glob("vllm_idx*_kv_*.log"):
        role = next(
            (short for key, short in suffix_to_role.items()
             if log_path.name.endswith(f"kv_{key}.log")),
            None,
        )
        if role is None:
            continue
        with log_path.open(errors="ignore") as fh:
            for line in fh:
                if marker not in line:
                    continue
                try:
                    token = line.split(marker)[1].split(")")[0].split()[0]
                    pid = int(token)
                except (ValueError, IndexError):
                    # Marker present but unparsable; keep scanning this file.
                    continue
                pid_roles[pid] = role
                break  # only the first parsable marker per file counts
    return pid_roles
def cluster_timeline(snaps: list[dict], bin_size_s: float = 1.0,
                     keep_pids: set | None = None,
                     t0: float | None = None,
                     n_bins: int | None = None) -> tuple[np.ndarray, ...]:
    """Aggregate per-PID snapshots into one cluster-level time series.

    Time is cut into ``n_bins`` bins of ``bin_size_s`` seconds starting at
    ``t0``. Within a bin the latest snapshot of each PID wins, and a PID that
    emitted nothing in a bin keeps its previous values (carry-forward), so a
    quiet PID does not make the cluster total dip. ``snaps`` is consumed in
    list order; its first and last records are taken as the time range, so it
    is assumed to be sorted by ``t_unix``.

    Args:
        snaps: snapshot records carrying ``pid`` and ``t_unix``. The keys
            ``used_blocks``, ``total_blocks``, ``waiting`` and ``running`` are
            read when present (0 / empty otherwise).
        bin_size_s: bin width in seconds.
        keep_pids: when given, only snapshots from these PIDs are counted, and
            the pool ceiling is summed over the same subset (used for the
            P-pool / D-pool split).
        t0: origin of the time grid; defaults to the first snapshot's time.
        n_bins: number of bins; defaults to just enough to cover the last
            snapshot. Pass a shared ``t0`` / ``n_bins`` so several calls land
            on the same grid.

    Returns:
        ``(times, total_used, pool_frac, total_waiting, total_running)``, one
        value per bin. ``pool_frac`` is ``total_used`` divided by the sum of
        each PID's most recently seen ``total_blocks`` (floored at 1). Five
        empty arrays are returned when no snapshot survives the filter.
    """
    if keep_pids is not None:
        snaps = [rec for rec in snaps if rec["pid"] in keep_pids]
    if not snaps:
        nothing = np.array([], dtype=float)
        return nothing, nothing, nothing, nothing, nothing

    origin = snaps[0]["t_unix"] if t0 is None else t0
    if n_bins is None:
        span = snaps[-1]["t_unix"] - origin
        n_bins = max(1, int(np.ceil(span / bin_size_s)) + 1)
    times = np.arange(n_bins) * bin_size_s

    row_of = {pid: row for row, pid in enumerate(sorted({rec["pid"] for rec in snaps}))}
    n_pids = len(row_of)

    used = np.zeros((n_pids, n_bins), dtype=np.int64)
    waiting = np.zeros((n_pids, n_bins), dtype=np.int64)
    running = np.zeros((n_pids, n_bins), dtype=np.int64)
    total_per_pid = np.zeros(n_pids, dtype=np.int64)

    # Latest (used, waiting, running) seen so far for each PID row.
    latest = [(0, 0, 0)] * n_pids

    cursor = 0
    n_snaps = len(snaps)
    for col in range(n_bins):
        bin_end = origin + col * bin_size_s + bin_size_s
        # Absorb every snapshot that falls before the end of this bin.
        while cursor < n_snaps and snaps[cursor]["t_unix"] < bin_end:
            rec = snaps[cursor]
            row = row_of[rec["pid"]]
            latest[row] = (
                rec.get("used_blocks", 0),
                len(rec.get("waiting", [])),
                len(rec.get("running", [])),
            )
            total_per_pid[row] = rec.get("total_blocks", 0)
            cursor += 1
        # Stamp the carried-forward state of every PID into this bin.
        for row, (n_used, n_wait, n_run) in enumerate(latest):
            used[row, col] = n_used
            waiting[row, col] = n_wait
            running[row, col] = n_run

    total_used = used.sum(axis=0)
    total_waiting = waiting.sum(axis=0)
    total_running = running.sum(axis=0)
    total_pool = int(total_per_pid.sum())
    pool_frac = total_used / max(total_pool, 1)
    return times, total_used, pool_frac, total_waiting, total_running
def load_summary(rundir: Path) -> dict | None:
    """Return the parsed ``replay_metrics.summary.json`` from ``rundir``.

    Returns ``None`` when the file does not exist. A file that exists but is
    not valid JSON raises ``json.JSONDecodeError``.
    """
    summary_path = rundir / "replay_metrics.summary.json"
    if summary_path.is_file():
        return json.loads(summary_path.read_text())
    return None
def _steady_median(arr: np.ndarray) -> float:
n = len(arr)
if n == 0:
return 0.0
if n >= 10:
return float(np.median(arr[int(n * 0.1):int(n * 0.9)]))
return float(np.median(arr))
def per_run_metrics(snaps_dir: Path, rundir: Path) -> dict:
    """Reduce one run (one config, one rep) to a JSON-serialisable record.

    Loads the KV snapshots from ``snaps_dir`` and the replay summary from
    ``rundir`` (``{}`` when absent), bins the snapshots into a cluster
    timeline, and derives peak / steady utilisation plus the peak waiting
    count.

    When the ``vllm_logs`` directory next to ``snaps_dir`` identifies at
    least one producer and one consumer PID, the same reduction is repeated
    per role. Those results are stored under ``p_*`` / ``d_*`` keys, on the
    same time grid as the overall timeline.
    """
    snaps = load_snapshots_for_run(snaps_dir)
    summary = load_summary(rundir) or {}

    # One global grid (1-second bins) shared by the overall and per-role
    # timelines so that they all line up on the same x axis.
    t0 = n_bins = None
    if snaps:
        t0 = snaps[0]["t_unix"]
        n_bins = max(1, int(np.ceil(snaps[-1]["t_unix"] - t0)) + 1)

    times, total_used, pool_frac, total_waiting, total_running = cluster_timeline(
        snaps, t0=t0, n_bins=n_bins
    )
    has_bins = len(times) > 0
    record = {
        "times": times.tolist(),
        "total_used": total_used.tolist(),
        "pool_frac": pool_frac.tolist(),
        "total_waiting": total_waiting.tolist(),
        "total_running": total_running.tolist(),
        "peak_pool_frac": float(pool_frac.max()) if has_bins else 0.0,
        "steady_pool_frac": _steady_median(pool_frac),
        "peak_waiting": int(total_waiting.max()) if has_bins else 0,
        "summary": summary,
    }

    # Per-role (P-pool vs D-pool) split; only meaningful for PD configs.
    roles = load_pid_roles(snaps_dir.parent / "vllm_logs")
    pids_by_tag = {
        "p": {pid for pid, role in roles.items() if role == "P"},
        "d": {pid for pid, role in roles.items() if role == "D"},
    }
    if pids_by_tag["p"] and pids_by_tag["d"]:
        for tag, subset in pids_by_tag.items():
            role_series = cluster_timeline(
                snaps, keep_pids=subset, t0=t0, n_bins=n_bins
            )
            frac, run = role_series[2], role_series[4]
            record[f"{tag}_pool_frac"] = frac.tolist()
            record[f"{tag}_running"] = run.tolist()
            record[f"{tag}_peak_frac"] = float(frac.max()) if len(frac) else 0.0
            record[f"{tag}_steady_frac"] = _steady_median(frac)
    return record
def collect_sweep(sweep_root: Path, tag: str, configs: list[str], reps: int) -> dict:
    """Gather per-run records for every (config, rep) pair of a sweep.

    For rep ``N`` of a config, the run directory is
    ``{tag}_{config}_rep{N}`` and the snapshot directory is
    ``{tag}_{config}_rep{N}_{config}/kv_snapshots``, both under
    ``sweep_root``. Runs whose snapshot directory is missing are reported and
    skipped.

    Returns:
        ``{config: [record, ...]}``, where each record comes from
        ``per_run_metrics`` and additionally carries its 1-based ``"rep"``.
        Configs without any usable run do not appear as keys.
    """
    sweep: dict[str, list[dict]] = defaultdict(list)
    for config in configs:
        for rep in range(1, reps + 1):
            run_name = f"{tag}_{config}_rep{rep}"
            snap_dir = sweep_root / f"{run_name}_{config}/kv_snapshots"
            if not snap_dir.is_dir():
                print(f"[agg] MISSING: {snap_dir}")
                continue
            record = per_run_metrics(snap_dir, sweep_root / run_name)
            record["rep"] = rep
            sweep[config].append(record)
            peak = record["peak_pool_frac"]
            steady = record["steady_pool_frac"]
            peak_wait = record["peak_waiting"]
            print(
                f"[agg] {config} rep{rep}: peak={peak:.1%} "
                f"steady={steady:.1%} peak_wait={peak_wait}"
            )
    return sweep
def plot_kv_timeline(sweep: dict, out: Path) -> None:
    """Write a stacked figure of cluster-wide KV utilisation over time.

    One panel per config: a faint line per rep plus a bold median across
    reps (shorter reps are NaN-padded to the longest one), with a dashed 90%
    reference line. Nothing is written when ``sweep`` is empty. The parent
    directory of ``out`` is created if needed.
    """
    import matplotlib
    matplotlib.use("Agg")  # headless backend: no display required
    import matplotlib.pyplot as plt

    n_panels = len(sweep)
    if n_panels == 0:
        return
    # squeeze=False always yields a 2-D array, so the single-panel case needs
    # no special handling.
    fig, grid = plt.subplots(n_panels, 1, figsize=(14, 2.5 * n_panels),
                             sharex=True, squeeze=False)
    axes = grid[:, 0]

    for ax, (config, reps) in zip(axes, sweep.items()):
        for rec in reps:
            ax.plot(np.asarray(rec["times"]),
                    np.asarray(rec["pool_frac"]) * 100,
                    alpha=0.4, lw=1.0, label=f"rep{rec['rep']}")
        # Bold median across reps, aligned on the longest series.
        if reps:
            longest = max(len(rec["times"]) for rec in reps)
            padded = np.full((len(reps), longest), np.nan)
            for row, rec in enumerate(reps):
                series = rec["pool_frac"]
                padded[row, :len(series)] = series
            median_pct = np.nanmedian(padded, axis=0) * 100
            ax.plot(np.arange(longest), median_pct, color="#222", lw=2.0,
                    label="median")
        ax.axhline(90, color="#c44e52", ls="--", alpha=0.6, lw=1, label="90%")
        ax.set_ylim(0, 105)
        ax.set_ylabel(f"{config}\ncluster KV (%)")
        ax.grid(True, alpha=0.3)
        ax.legend(loc="upper right", fontsize=8)

    axes[-1].set_xlabel("wall-clock since first snapshot (s)")
    fig.suptitle("MB5: cluster-wide KV pool utilization over time", fontsize=12)
    fig.tight_layout()
    out.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out, dpi=120)
    plt.close(fig)
    print(f"wrote {out}")
def plot_peak_utilization(sweep: dict, out: Path) -> None:
    """Write a grouped bar chart of peak vs steady KV utilisation per config.

    For each config, ``peak_pool_frac`` and ``steady_pool_frac`` are averaged
    over its reps and shown in percent. The error bars are the population
    standard deviation (``np.std``) across reps, or 0 when there are fewer
    than two reps. The parent directory of ``out`` is created if needed, so
    the function also works when called on its own.
    """
    import matplotlib
    matplotlib.use("Agg")  # headless backend: no display required
    import matplotlib.pyplot as plt

    configs = list(sweep.keys())

    def _mean_and_std(key: str) -> tuple[list, list]:
        """Per-config mean and std (in percent) of ``key`` across reps."""
        means, stds = [], []
        for config in configs:
            vals = [rec[key] * 100 for rec in sweep[config]]
            means.append(np.mean(vals) if vals else 0)
            stds.append(np.std(vals) if len(vals) > 1 else 0)
        return means, stds

    peak_means, peak_std = _mean_and_std("peak_pool_frac")
    steady_means, steady_std = _mean_and_std("steady_pool_frac")

    x = np.arange(len(configs))
    width = 0.35
    fig, ax = plt.subplots(figsize=(9, 4.5))
    ax.bar(x - width / 2, peak_means, width, yerr=peak_std, label="peak",
           color="#c44e52", capsize=4)
    # "steady" is the median over the 10%-90% window of the run; the label
    # previously read "1090%" because the range separator had been lost.
    ax.bar(x + width / 2, steady_means, width, yerr=steady_std,
           label="steady (10-90%)", color="#4c72b0", capsize=4)
    ax.axhline(90, color="#444", ls="--", alpha=0.5, lw=1, label="90% red line")
    ax.set_xticks(x)
    ax.set_xticklabels(configs)
    ax.set_ylabel("Cluster KV pool utilization (%)")
    ax.set_ylim(0, 105)
    ax.set_title("MB5: KV pool pressure — peak vs steady-state")
    ax.legend(loc="upper left", fontsize=9)
    ax.grid(True, axis="y", alpha=0.3)
    fig.tight_layout()
    # savefig does not create missing directories.
    out.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out, dpi=120)
    plt.close(fig)
    print(f"wrote {out}")
def plot_latency_compare(sweep: dict, out: Path) -> None:
    """Write a grouped bar chart of p50 / p90 / p99 end-to-end latency.

    For every config, each percentile is averaged over the reps whose
    ``summary["latency_stats_s"]`` provides a non-null value for it. A config
    with no usable value for a percentile is drawn as a zero-height bar. The
    parent directory of ``out`` is created if needed, so the function also
    works when called on its own.
    """
    import matplotlib
    matplotlib.use("Agg")  # headless backend: no display required
    import matplotlib.pyplot as plt

    configs = list(sweep.keys())
    percentiles = ["p50", "p90", "p99"]
    colors = {"p50": "#4c72b0", "p90": "#dd8452", "p99": "#c44e52"}

    def _mean_latency(config: str, pct: str) -> float:
        """Mean of ``pct`` over the reps of ``config`` that report it."""
        vals = []
        for rec in sweep[config]:
            stats = rec["summary"].get("latency_stats_s")
            if stats and stats.get(pct) is not None:
                vals.append(stats[pct])
        return np.mean(vals) if vals else 0.0

    data = {pct: [_mean_latency(config, pct) for config in configs]
            for pct in percentiles}

    x = np.arange(len(configs))
    width = 0.25
    fig, ax = plt.subplots(figsize=(9, 4.5))
    for i, pct in enumerate(percentiles):
        # (i - 1) centres the three bars of a group on the tick.
        ax.bar(x + (i - 1) * width, data[pct], width, label=pct,
               color=colors[pct])
    ax.set_xticks(x)
    ax.set_xticklabels(configs)
    ax.set_ylabel("End-to-end latency (s)")
    ax.set_title("MB5: e2e latency by PD configuration")
    ax.legend()
    ax.grid(True, axis="y", alpha=0.3)
    fig.tight_layout()
    # savefig does not create missing directories.
    out.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out, dpi=120)
    plt.close(fig)
    print(f"wrote {out}")
def plot_role_split(sweep: dict, out: Path) -> None:
    """Write per-config panels of P-pool vs D-pool KV utilisation over time.

    Only the first rep of each config is drawn. When that record carries
    ``p_pool_frac`` and ``d_pool_frac``, both role pools are plotted together
    with the dotted cluster average, exposing the imbalance the average
    hides. Otherwise (no role split, e.g. 8C) only the overall cluster line
    is shown. Nothing is written when ``sweep`` is empty.
    """
    import matplotlib
    matplotlib.use("Agg")  # headless backend: no display required
    import matplotlib.pyplot as plt

    n_panels = len(sweep)
    if n_panels == 0:
        return
    # squeeze=False always yields a 2-D array, so the single-panel case needs
    # no special handling.
    fig, grid = plt.subplots(n_panels, 1, figsize=(14, 2.6 * n_panels),
                             sharex=True, squeeze=False)
    axes = grid[:, 0]

    for ax, (config, reps) in zip(axes, sweep.items()):
        if not reps:
            continue
        first = reps[0]  # rep1
        t = np.asarray(first["times"])
        has_split = "p_pool_frac" in first and "d_pool_frac" in first
        if has_split:
            series = [
                ("p_pool_frac", dict(color="#4c72b0", lw=1.5,
                                     label="P-pool (prefill)")),
                ("d_pool_frac", dict(color="#c44e52", lw=1.5,
                                     label="D-pool (decode)")),
                ("pool_frac", dict(color="#999", lw=1.0, ls=":",
                                   label="cluster avg")),
            ]
        else:
            series = [
                ("pool_frac", dict(color="#222", lw=1.5,
                                   label="cluster (kv_both)")),
            ]
        for key, line_kwargs in series:
            ax.plot(t, np.asarray(first[key]) * 100, **line_kwargs)
        ax.axhline(90, color="#444", ls="--", alpha=0.5, lw=1)
        ax.set_ylim(0, 105)
        ax.set_ylabel(f"{config}\nKV pool (%)")
        ax.grid(True, alpha=0.3)
        ax.legend(loc="upper right", fontsize=8, ncol=3)

    axes[-1].set_xlabel("wall-clock since first snapshot (s)")
    fig.suptitle("MB5: per-role KV pool utilization (P-pool vs D-pool), rep1",
                 fontsize=12)
    fig.tight_layout()
    out.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out, dpi=120)
    plt.close(fig)
    print(f"wrote {out}")
def write_summary_csv(sweep: dict, out: Path) -> None:
    """Flatten the sweep into one CSV row per (config, rep) and write it.

    Each row combines fields from the replay summary (request counts, wall
    clock, latency / TTFT percentiles, prefix-cache hit ratio) with the KV
    utilisation figures of the run record. Values that are absent are
    written as empty cells. Nothing is written (and no directory is created)
    when the sweep holds no runs.
    """
    def _row(config: str, rec: dict) -> dict:
        """Build the flat CSV row for one run record."""
        summary = rec["summary"]
        latency = summary.get("latency_stats_s") or {}
        ttft = summary.get("ttft_stats_s") or {}
        return {
            "config": config,
            "rep": rec["rep"],
            "n_requests": summary.get("request_count"),
            "n_success": summary.get("success_count"),
            "wall_clock_s": summary.get("wall_clock_s"),
            "peak_pool_frac": rec["peak_pool_frac"],
            "steady_pool_frac": rec["steady_pool_frac"],
            "p_pool_peak_frac": rec.get("p_peak_frac"),
            "p_pool_steady_frac": rec.get("p_steady_frac"),
            "d_pool_peak_frac": rec.get("d_peak_frac"),
            "d_pool_steady_frac": rec.get("d_steady_frac"),
            "peak_waiting": rec["peak_waiting"],
            "latency_p50_s": latency.get("p50"),
            "latency_p90_s": latency.get("p90"),
            "latency_p99_s": latency.get("p99"),
            "ttft_p50_s": ttft.get("p50"),
            "ttft_p90_s": ttft.get("p90"),
            "ttft_p99_s": ttft.get("p99"),
            "prefix_cache_hit_ratio": summary.get("prefix_cache_hit_ratio"),
        }

    rows = [_row(config, rec) for config, reps in sweep.items() for rec in reps]
    if not rows:
        print("[agg] no rows; skipping CSV")
        return

    out.parent.mkdir(parents=True, exist_ok=True)
    with out.open("w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
    print(f"wrote {out} ({len(rows)} rows)")
def render_all(sweep: dict, out_dir: Path) -> None:
    """Render every MB5 figure and the summary CSV into ``out_dir``.

    Outputs are produced in a fixed order: KV timeline, role split, peak
    utilisation, latency comparison, then the CSV.
    """
    outputs = (
        (plot_kv_timeline, "mb5_kv_timeline.png"),
        (plot_role_split, "mb5_role_split.png"),
        (plot_peak_utilization, "mb5_peak_utilization.png"),
        (plot_latency_compare, "mb5_latency_compare.png"),
        (write_summary_csv, "mb5_summary.csv"),
    )
    for produce, filename in outputs:
        produce(sweep, out_dir / filename)
def main(argv: list[str] | None = None) -> None:
    """Command-line entry point for the MB5 aggregation.

    Three modes, selected by flags:

    * ``--from-reduced FILE``: load a reduced sweep JSON and render figures
      into ``--out-dir``. Takes precedence over every other flag.
    * ``--reduce-to FILE``: reduce raw snapshots (needs ``--sweep-root`` and
      ``--tag``) and dump the result as JSON without plotting.
    * neither: reduce raw snapshots and render the figures directly.

    Args:
        argv: argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, so existing ``main()`` callers are
            unaffected. Passing a list lets the CLI be driven
            programmatically.

    Raises:
        SystemExit: on ``--help`` or on invalid / missing arguments (raised
            by argparse).
    """
    p = argparse.ArgumentParser(
        description="MB5 aggregate. Two-stage: --reduce-to (numpy-only, runs "
                    "on a serving host) dumps a compact JSON; --from-reduced "
                    "(needs matplotlib) renders figures locally. Or run "
                    "directly (raw snapshots -> figures) when both the data "
                    "and matplotlib are local."
    )
    p.add_argument("--sweep-root", type=Path,
                   help="dir containing ${tag}_${config}_rep${N}/ subdirs")
    p.add_argument("--tag")
    p.add_argument("--configs", default="8C 6P+2D 4P+4D 2P+6D",
                   help="space-separated config names")
    p.add_argument("--reps", type=int, default=3)
    p.add_argument("--out-dir", type=Path, default=Path("figs/mb5"))
    p.add_argument("--reduce-to", type=Path,
                   help="numpy-only: write reduced sweep JSON here and exit "
                        "(no plotting, no matplotlib needed)")
    p.add_argument("--from-reduced", type=Path,
                   help="load a reduced sweep JSON (from --reduce-to) and "
                        "render figures into --out-dir")
    args = p.parse_args(argv)

    if args.from_reduced:
        sweep = json.loads(args.from_reduced.read_text())
        render_all(sweep, args.out_dir)
        return

    if not (args.sweep_root and args.tag):
        p.error("--sweep-root and --tag are required unless --from-reduced is given")

    configs = args.configs.split()
    sweep = collect_sweep(args.sweep_root, args.tag, configs, args.reps)

    if args.reduce_to:
        args.reduce_to.parent.mkdir(parents=True, exist_ok=True)
        args.reduce_to.write_text(json.dumps(sweep))
        print(f"wrote reduced sweep -> {args.reduce_to}")
        return

    render_all(sweep, args.out_dir)


if __name__ == "__main__":
    main()