aggregate_mb5.py: - Split the cluster KV timeline by role (P-pool vs D-pool) using a PID->role map parsed from vllm_logs filenames. The cluster average hid the result — 6P+2D/4P+4D look ~45% utilized but the decode pool is actually pegged at ~100% while prefill idles at ~30%. - Two-stage reduce/plot: --reduce-to (numpy-only, runs on the serving host over multi-GB snapshot dirs) dumps a compact JSON; --from-reduced (matplotlib) renders locally. matplotlib import is now lazy. - New plot_role_split figure + p/d peak/steady columns in the CSV. PD_DISAGG_RESULTS.md: consolidated writeup with figures inline. Verdict: no static P:D ratio beats 8C colocation. The binding constraint moves with the ratio (D-pool saturates at 6P+2D/4P+4D, P-pool jams at 2P+6D -> 91% request loss); 8C's shared pool stays elastic at 34% steady, 100% completion. PD wins TPOT (10-35x cleaner, the MB1 phase-isolation benefit is real) but loses TTFT and sheds load. Round-robin P routing also zeroes prefix-cache reuse; a session-affinity re-run of 6P+2D is in flight to test the fix. Figures (rep1): mb5_kv_timeline, mb5_role_split, mb5_peak_utilization, mb5_latency_compare + mb5_summary.csv. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
482 lines
18 KiB
Python
482 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
"""Aggregate MB5 sweep data into cross-config comparison figures.
|
||
|
||
Reads a sweep root (e.g. /home/admin/.../mb5_runs/) and a tag
|
||
(e.g. "20260527_164040"). For each (config, rep) tuple, loads:
|
||
|
||
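    ${tag}_${config}_rep${N}/replay_metrics.summary.json  -> aggregate stats (incl. latency / TTFT percentiles)
    ${tag}_${config}_rep${N}/replay_metrics.jsonl         -> per-request latency (currently unused; percentiles come from the summary)
    ${tag}_${config}_rep${N}_${config}/kv_snapshots/      -> per-instance KV state
    ${tag}_${config}_rep${N}_${config}/vllm_logs/         -> EngineCore PID -> P/D role map (per-role split)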
|
||
|
||
Produces, in --out-dir:
    mb5_kv_timeline.png       — one panel per config, cluster-wide KV utilization over time
                                (1 faint line per rep + bold median across reps)
    mb5_role_split.png        — one panel per config, P-pool vs D-pool KV utilization over
                                time for the first collected rep (cluster line only for
                                configs without a P/D split, e.g. 8C)
    mb5_peak_utilization.png  — bar chart: peak / steady KV util per config
                                (mean across reps + error bars)
    mb5_latency_compare.png   — bar chart: p50 / p90 / p99 e2e latency per config
    mb5_summary.csv           — flat table for the writeup
|
||
|
||
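Use case (raw data and matplotlib on the same machine):
    python aggregate_mb5.py --sweep-root /home/.../mb5_runs \\
        --tag 20260527_164040 \\
        --configs "8C 6P+2D 4P+4D 2P+6D" \\
        --reps 3 \\
        --out-dir figs/mb5

Two-stage (reduce on the serving host with numpy only, render elsewhere):
    python aggregate_mb5.py --sweep-root /home/.../mb5_runs --tag 20260527_164040 \\
        --reduce-to mb5_reduced.json
    python aggregate_mb5.py --from-reduced mb5_reduced.json --out-dir figs/mb5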
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import csv
|
||
import json
|
||
from collections import defaultdict
|
||
from pathlib import Path
|
||
|
||
import numpy as np
|
||
|
||
# matplotlib is imported lazily inside the plot functions so the --reduce-to
# path (numpy-only) can run on a serving host without matplotlib installed.
|
||
|
||
|
||
def load_snapshots_for_run(snap_dir: Path) -> list[dict]:
    """Merge all per-PID snapshot files in snap_dir, tag with pid, sort by t_unix.

    Files are named ``mb5_kv_snapshot_pid<PID>.jsonl``. The PID comes from the
    filename and is written into each record under ``"pid"``.

    These inputs are skipped, so one bad line or stray file cannot abort a
    multi-GB reduce:
      * blank lines;
      * lines that are not valid JSON (e.g. a truncated final line);
      * records that are not JSON objects;
      * records without a numeric ``"t_unix"``;
      * files whose name does not end in an integer PID (with a message).

    Returns:
        All surviving records from every file, sorted by ``t_unix``. The sort
        is stable, so ties keep file-name order.
    """
    prefix = "mb5_kv_snapshot_pid"
    out: list[dict] = []
    for f in sorted(snap_dir.glob(f"{prefix}*.jsonl")):
        try:
            pid = int(f.stem[len(prefix):])
        except ValueError:
            print(f"[agg] skipping snapshot file with unparseable pid: {f}")
            continue
        # errors="replace": a torn multi-byte char must not raise mid-read;
        # the damaged line will simply fail JSON decoding below.
        with f.open(encoding="utf-8", errors="replace") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    d = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # The sort key and the pid tag below both need a dict with a
                # numeric timestamp; anything else would crash the whole run.
                if not isinstance(d, dict) or not isinstance(d.get("t_unix"), (int, float)):
                    continue
                d["pid"] = pid
                out.append(d)
    out.sort(key=lambda d: d["t_unix"])
    return out
|
||
|
||
|
||
def load_pid_roles(logs_dir: Path) -> dict[int, str]:
    """Build an EngineCore PID -> role map ('P', 'D' or 'C') from vllm_logs.

    Log files are named vllm_idx{i}_gpu{g}_kv_{producer|consumer|both}.log.
    The suffix selects the role: producer -> 'P', consumer -> 'D', both -> 'C'.
    Files with any other suffix are ignored.

    Each file is scanned line by line. The first '(EngineCore pid=NNNN)'
    marker whose PID parses as an int wins; unparseable markers are skipped.

    Returns:
        The PID -> role map, or {} when logs_dir is not a directory or no
        marker is found.
    """
    suffix_roles = (
        ("kv_producer.log", "P"),
        ("kv_consumer.log", "D"),
        ("kv_both.log", "C"),
    )
    marker = "EngineCore pid="
    pid_roles: dict[int, str] = {}
    if not logs_dir.is_dir():
        return pid_roles

    for log_path in logs_dir.glob("vllm_idx*_kv_*.log"):
        role = next(
            (short for suffix, short in suffix_roles if log_path.name.endswith(suffix)),
            None,
        )
        if role is None:
            continue
        with log_path.open(errors="ignore") as fh:
            for text in fh:
                if marker not in text:
                    continue
                tail = text.split(marker)[1]
                try:
                    pid = int(tail.split(")")[0].split()[0])
                except (ValueError, IndexError):
                    # Malformed marker on this line; keep scanning the file.
                    continue
                pid_roles[pid] = role
                break
    return pid_roles
|
||
|
||
|
||
def cluster_timeline(snaps: list[dict], bin_size_s: float = 1.0,
                     keep_pids: set | None = None,
                     t0: float | None = None,
                     n_bins: int | None = None) -> tuple[np.ndarray, ...]:
    """Bin per-PID snapshots into a cluster-wide timeline.

    For each time bin, sum used_blocks across PIDs that emitted a snapshot
    in that bin. PIDs without a sample in a bin carry their previous value
    forward (so a quiet PID doesn't artificially drop the total).

    If keep_pids is given, only those PIDs are counted (used for per-role
    P-pool / D-pool splits); the pool ceiling is summed over the same subset.
    Pass a shared t0/n_bins so role-splits land on the same time grid.

    Args:
        snaps: snapshot dicts tagged with "pid", expected sorted by "t_unix"
            (as load_snapshots_for_run returns). Optional keys:
            "used_blocks", "total_blocks", "waiting", "running". Only the
            lengths of "waiting" and "running" are used; missing or None
            values count as 0 / empty.
        bin_size_s: bin width in seconds.
        keep_pids: if given, restrict the timeline to these PIDs.
        t0: grid origin in the same units as "t_unix". Defaults to the first
            kept snapshot.
        n_bins: number of bins. Defaults to enough bins to cover the last
            kept snapshot.

    Returns:
        (times, total_used, pool_frac, total_waiting, total_running).
          * times: seconds since t0.
          * pool_frac: total_used divided by the summed last-seen
            total_blocks of the counted PIDs.
        If no snapshot survives filtering, all five arrays are empty when
        n_bins was not given. Otherwise they are zero-filled on the requested
        grid, so a role split always matches the overall timeline's length.
    """
    if keep_pids is not None:
        snaps = [s for s in snaps if s["pid"] in keep_pids]
    if not snaps:
        if n_bins is None:
            empty = np.array([], dtype=float)
            return empty, empty, empty, empty, empty
        # Caller asked for a specific grid: honour it with an all-zero series.
        zeros = np.zeros(n_bins, dtype=np.int64)
        return (np.arange(n_bins) * bin_size_s, zeros, np.zeros(n_bins, dtype=float),
                zeros.copy(), zeros.copy())

    if t0 is None:
        t0 = snaps[0]["t_unix"]
    if n_bins is None:
        t_end = snaps[-1]["t_unix"]
        n_bins = max(1, int(np.ceil((t_end - t0) / bin_size_s)) + 1)
    times = np.arange(n_bins) * bin_size_s

    pids = sorted({s["pid"] for s in snaps})
    pid_to_idx = {pid: i for i, pid in enumerate(pids)}
    n_pids = len(pids)

    # Per-PID x per-bin matrices of carried-forward state.
    used = np.zeros((n_pids, n_bins), dtype=np.int64)
    waiting = np.zeros((n_pids, n_bins), dtype=np.int64)
    running = np.zeros((n_pids, n_bins), dtype=np.int64)
    total_per_pid = np.zeros(n_pids, dtype=np.int64)

    # Last-known state per PID, copied into every subsequent bin column.
    last_used = np.zeros(n_pids, dtype=np.int64)
    last_waiting = np.zeros(n_pids, dtype=np.int64)
    last_running = np.zeros(n_pids, dtype=np.int64)

    k = 0
    n_snaps = len(snaps)
    for b in range(n_bins):
        t_lo = t0 + b * bin_size_s
        t_hi = t_lo + bin_size_s
        # Consume every snapshot before the end of this bin. Snapshots earlier
        # than t0 land in bin 0; snapshots past the last bin are never read.
        while k < n_snaps and snaps[k]["t_unix"] < t_hi:
            s = snaps[k]
            i = pid_to_idx[s["pid"]]
            last_used[i] = s.get("used_blocks") or 0
            last_waiting[i] = len(s.get("waiting") or ())
            last_running[i] = len(s.get("running") or ())
            total_per_pid[i] = s.get("total_blocks") or 0
            k += 1
        used[:, b] = last_used
        waiting[:, b] = last_waiting
        running[:, b] = last_running

    total_used = used.sum(axis=0)
    total_pool = int(total_per_pid.sum())
    total_waiting = waiting.sum(axis=0)
    total_running = running.sum(axis=0)
    pool_frac = total_used / max(total_pool, 1)
    return times, total_used, pool_frac, total_waiting, total_running
|
||
|
||
|
||
def load_summary(rundir: Path) -> dict | None:
    """Return the parsed replay_metrics.summary.json from rundir, or None if it is absent."""
    summary_path = rundir / "replay_metrics.summary.json"
    if summary_path.is_file():
        return json.loads(summary_path.read_text())
    return None
|
||
|
||
|
||
def _steady_median(arr: np.ndarray) -> float:
|
||
n = len(arr)
|
||
if n == 0:
|
||
return 0.0
|
||
if n >= 10:
|
||
return float(np.median(arr[int(n * 0.1):int(n * 0.9)]))
|
||
return float(np.median(arr))
|
||
|
||
|
||
def per_run_metrics(snaps_dir: Path, rundir: Path) -> dict:
    """Reduce one run's KV snapshots and replay summary into a JSON-able record.

    Args:
        snaps_dir: the run's kv_snapshots/ directory. Its sibling vllm_logs/
            (if present) supplies the PID -> role map for the P/D split.
        rundir: directory holding replay_metrics.summary.json.

    Returns:
        A dict with:
          * the overall timeline as lists: "times", "total_used",
            "pool_frac", "total_waiting", "total_running";
          * scalars: "peak_pool_frac", "steady_pool_frac", "peak_waiting";
          * "summary": the raw summary ({} if absent).
        Only when at least one P PID and one D PID from vllm_logs actually
        appear in the snapshots, it also has "{p,d}_pool_frac",
        "{p,d}_running", "{p,d}_peak_frac" and "{p,d}_steady_frac", on the
        same time grid as "times".
    """
    snaps = load_snapshots_for_run(snaps_dir)
    summary = load_summary(rundir) or {}

    # Establish a shared time grid (global t0 / n_bins) so the overall and
    # per-role timelines all line up on the same x axis. The bin count
    # assumes cluster_timeline's default 1 s bins.
    if snaps:
        t0 = snaps[0]["t_unix"]
        t_end = snaps[-1]["t_unix"]
        n_bins = max(1, int(np.ceil(t_end - t0)) + 1)
    else:
        t0, n_bins = None, None

    times, total_used, pool_frac, total_waiting, total_running = cluster_timeline(
        snaps, t0=t0, n_bins=n_bins
    )
    n = len(times)

    out = {
        "times": times.tolist(),
        "total_used": total_used.tolist(),
        "pool_frac": pool_frac.tolist(),
        "total_waiting": total_waiting.tolist(),
        "total_running": total_running.tolist(),
        "peak_pool_frac": float(pool_frac.max()) if n else 0.0,
        "steady_pool_frac": _steady_median(pool_frac),
        "peak_waiting": int(total_waiting.max()) if n else 0,
        "summary": summary,
    }

    # Per-role (P-pool vs D-pool) split for PD configs. Only PIDs that actually
    # emitted snapshots count. If the log PIDs don't match the snapshot PIDs
    # (stale or foreign logs), an empty role would report a meaningless 0% pool.
    roles = load_pid_roles(snaps_dir.parent / "vllm_logs")
    snap_pids = {s["pid"] for s in snaps}
    p_log = {pid for pid, r in roles.items() if r == "P"}
    d_log = {pid for pid, r in roles.items() if r == "D"}
    p_pids = p_log & snap_pids
    d_pids = d_log & snap_pids
    if p_pids and d_pids:
        for tag, subset in (("p", p_pids), ("d", d_pids)):
            _, _, frac, _, run = cluster_timeline(
                snaps, keep_pids=subset, t0=t0, n_bins=n_bins
            )
            out[f"{tag}_pool_frac"] = frac.tolist()
            out[f"{tag}_running"] = run.tolist()
            out[f"{tag}_peak_frac"] = float(frac.max()) if len(frac) else 0.0
            out[f"{tag}_steady_frac"] = _steady_median(frac)
    elif p_log and d_log:
        print(f"[agg] WARNING: P/D PIDs in vllm_logs do not match snapshot PIDs "
              f"under {snaps_dir}; skipping per-role split")
    return out
|
||
|
||
|
||
def collect_sweep(sweep_root: Path, tag: str, configs: list[str], reps: int) -> dict:
    """Reduce every (config, rep) run under sweep_root.

    Returns:
        {config: [run_record_per_rep]} as a defaultdict(list). A config
        appears only once at least one of its reps has a kv_snapshots dir.
        Each record is per_run_metrics() output plus its "rep" number, in
        ascending rep order. Missing snapshot dirs are reported and skipped.
    """
    sweep: dict[str, list[dict]] = defaultdict(list)
    for config in configs:
        for rep in range(1, reps + 1):
            run_name = f"{tag}_{config}_rep{rep}"
            rundir = sweep_root / run_name
            snap_dir = sweep_root / f"{run_name}_{config}" / "kv_snapshots"
            if not snap_dir.is_dir():
                print(f"[agg] MISSING: {snap_dir}")
                continue
            record = per_run_metrics(snap_dir, rundir)
            record["rep"] = rep
            sweep[config].append(record)
            print(
                f"[agg] {config} rep{rep}: peak={record['peak_pool_frac']:.1%} "
                f"steady={record['steady_pool_frac']:.1%} "
                f"peak_wait={record['peak_waiting']}"
            )
    return sweep
|
||
|
||
|
||
def plot_kv_timeline(sweep: dict, out: Path) -> None:
    """Plot cluster-wide KV pool utilization over time, one panel per config.

    Each panel shows:
      * one faint line per rep;
      * a bold per-bin median across reps;
      * a dashed 90% reference line.
    Writes a PNG to `out`, creating parent directories. Does nothing (and
    needs no matplotlib) for an empty sweep.

    Args:
        sweep: {config: [run_record, ...]} from collect_sweep() or a
            --reduce-to JSON. Each record needs "times", "pool_frac" and "rep".
        out: destination image path.
    """
    if not sweep:
        return
    import matplotlib
    matplotlib.use("Agg")
    import matplotlib.pyplot as plt

    n_configs = len(sweep)
    # squeeze=False always yields a 2-D grid, so no single-panel special case.
    fig, axes = plt.subplots(n_configs, 1, figsize=(14, 2.5 * n_configs),
                             sharex=True, squeeze=False)
    axes = axes[:, 0]
    for ax, (config, reps) in zip(axes, sweep.items()):
        for rep_data in reps:
            t = np.asarray(rep_data["times"])
            ax.plot(t, np.asarray(rep_data["pool_frac"]) * 100, alpha=0.4, lw=1.0,
                    label=f"rep{rep_data['rep']}")
        if reps:
            # Reps run for different durations: NaN-pad each to the longest
            # series, take the per-bin median, and plot it against that
            # longest rep's own time axis (not a bare bin index).
            longest = max(reps, key=lambda r: len(r["times"]))
            max_len = len(longest["times"])
            arr = np.full((len(reps), max_len), np.nan)
            for i, r in enumerate(reps):
                frac = np.asarray(r["pool_frac"], dtype=float)
                arr[i, :len(frac)] = frac
            median = np.nanmedian(arr, axis=0) * 100
            ax.plot(np.asarray(longest["times"], dtype=float), median,
                    color="#222", lw=2.0, label="median")
        ax.axhline(90, color="#c44e52", ls="--", alpha=0.6, lw=1, label="90%")
        ax.set_ylim(0, 105)
        ax.set_ylabel(f"{config}\ncluster KV (%)")
        ax.grid(True, alpha=0.3)
        ax.legend(loc="upper right", fontsize=8)
    axes[-1].set_xlabel("wall-clock since first snapshot (s)")
    fig.suptitle("MB5: cluster-wide KV pool utilization over time", fontsize=12)
    fig.tight_layout()
    out.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out, dpi=120)
    plt.close(fig)
    print(f"wrote {out}")
|
||
|
||
|
||
def plot_peak_utilization(sweep: dict, out: Path) -> None:
    """Bar chart of peak vs steady-state cluster KV utilization per config.

    For each config, the two bars are the mean across reps of
    "peak_pool_frac" and "steady_pool_frac" (in %). With at least two reps,
    error bars show the population std across reps. A dashed line marks 90%.

    Writes a PNG to `out`, creating parent directories. Does nothing (and
    needs no matplotlib) for an empty sweep.
    """
    # Nothing to draw. Returning here also avoids savefig into an out_dir
    # that no earlier plot has created.
    if not sweep:
        return
    import matplotlib
    matplotlib.use("Agg")
    import matplotlib.pyplot as plt

    configs = list(sweep.keys())
    peaks = [[r["peak_pool_frac"] * 100 for r in sweep[c]] for c in configs]
    steady = [[r["steady_pool_frac"] * 100 for r in sweep[c]] for c in configs]
    peak_means = [np.mean(p) if p else 0 for p in peaks]
    peak_std = [np.std(p) if len(p) > 1 else 0 for p in peaks]
    steady_means = [np.mean(s) if s else 0 for s in steady]
    steady_std = [np.std(s) if len(s) > 1 else 0 for s in steady]

    x = np.arange(len(configs))
    width = 0.35

    fig, ax = plt.subplots(figsize=(9, 4.5))
    ax.bar(x - width/2, peak_means, width, yerr=peak_std, label="peak",
           color="#c44e52", capsize=4)
    ax.bar(x + width/2, steady_means, width, yerr=steady_std, label="steady (10–90%)",
           color="#4c72b0", capsize=4)
    ax.axhline(90, color="#444", ls="--", alpha=0.5, lw=1, label="90% red line")
    ax.set_xticks(x)
    ax.set_xticklabels(configs)
    ax.set_ylabel("Cluster KV pool utilization (%)")
    ax.set_ylim(0, 105)
    ax.set_title("MB5: KV pool pressure — peak vs steady-state")
    ax.legend(loc="upper left", fontsize=9)
    ax.grid(True, axis="y", alpha=0.3)
    fig.tight_layout()
    # Create the directory here too: this plot may be called on its own.
    out.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out, dpi=120)
    plt.close(fig)
    print(f"wrote {out}")
|
||
|
||
|
||
def plot_latency_compare(sweep: dict, out: Path) -> None:
|
||
import matplotlib
|
||
matplotlib.use("Agg")
|
||
import matplotlib.pyplot as plt
|
||
|
||
configs = list(sweep.keys())
|
||
metrics = ["p50", "p90", "p99"]
|
||
data = {m: [] for m in metrics}
|
||
for c in configs:
|
||
for m in metrics:
|
||
vals = []
|
||
for r in sweep[c]:
|
||
s = r["summary"].get("latency_stats_s")
|
||
if s and s.get(m) is not None:
|
||
vals.append(s[m])
|
||
data[m].append(np.mean(vals) if vals else 0.0)
|
||
|
||
x = np.arange(len(configs))
|
||
width = 0.25
|
||
colors = {"p50": "#4c72b0", "p90": "#dd8452", "p99": "#c44e52"}
|
||
fig, ax = plt.subplots(figsize=(9, 4.5))
|
||
for i, m in enumerate(metrics):
|
||
ax.bar(x + (i - 1) * width, data[m], width, label=m, color=colors[m])
|
||
ax.set_xticks(x)
|
||
ax.set_xticklabels(configs)
|
||
ax.set_ylabel("End-to-end latency (s)")
|
||
ax.set_title("MB5: e2e latency by PD configuration")
|
||
ax.legend()
|
||
ax.grid(True, axis="y", alpha=0.3)
|
||
fig.tight_layout()
|
||
fig.savefig(out, dpi=120)
|
||
plt.close(fig)
|
||
print(f"wrote {out}")
|
||
|
||
|
||
def plot_role_split(sweep: dict, out: Path) -> None:
    """Plot P-pool vs D-pool KV utilization over time, one panel per config.

    Uses the first collected rep of each config, normally rep1; the panel
    label names the rep when it is not rep1.

    PD configs whose record carries "p_pool_frac" and "d_pool_frac" aligned
    with "times" get both role lines plus the dotted cluster average. This
    exposes the imbalance the cluster average hides. Configs without a usable
    split (e.g. 8C, all kv_both) show the overall cluster line for reference.

    Writes a PNG to `out`, creating parent directories. Does nothing (and
    needs no matplotlib) for an empty sweep.
    """
    if not sweep:
        return
    import matplotlib
    matplotlib.use("Agg")
    import matplotlib.pyplot as plt

    n_configs = len(sweep)
    fig, axes = plt.subplots(n_configs, 1, figsize=(14, 2.6 * n_configs),
                             sharex=True, squeeze=False)
    axes = axes[:, 0]
    for ax, (config, reps) in zip(axes, sweep.items()):
        if not reps:
            continue
        r = reps[0]  # collect_sweep appends reps in ascending rep order
        t = np.asarray(r["times"])
        p_frac = r.get("p_pool_frac")
        d_frac = r.get("d_pool_frac")
        # A role series whose length differs from "times" cannot be plotted
        # against t. Older reduced JSONs store an empty list when the log PIDs
        # matched no snapshot; fall back to the cluster line in that case.
        has_split = (
            p_frac is not None and d_frac is not None
            and len(p_frac) == len(t) and len(d_frac) == len(t)
        )
        if has_split:
            ax.plot(t, np.asarray(p_frac) * 100, color="#4c72b0",
                    lw=1.5, label="P-pool (prefill)")
            ax.plot(t, np.asarray(d_frac) * 100, color="#c44e52",
                    lw=1.5, label="D-pool (decode)")
            ax.plot(t, np.asarray(r["pool_frac"]) * 100, color="#999",
                    lw=1.0, ls=":", label="cluster avg")
        else:
            ax.plot(t, np.asarray(r["pool_frac"]) * 100, color="#222",
                    lw=1.5, label="cluster (kv_both)")
        ax.axhline(90, color="#444", ls="--", alpha=0.5, lw=1)
        ax.set_ylim(0, 105)
        rep_no = r.get("rep")
        # The suptitle says rep1; flag panels that had to use a later rep.
        rep_note = "" if rep_no == 1 else f" [rep{rep_no}]"
        ax.set_ylabel(f"{config}{rep_note}\nKV pool (%)")
        ax.grid(True, alpha=0.3)
        ax.legend(loc="upper right", fontsize=8, ncol=3)
    axes[-1].set_xlabel("wall-clock since first snapshot (s)")
    fig.suptitle("MB5: per-role KV pool utilization (P-pool vs D-pool), rep1",
                 fontsize=12)
    fig.tight_layout()
    out.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out, dpi=120)
    plt.close(fig)
    print(f"wrote {out}")
|
||
|
||
|
||
def write_summary_csv(sweep: dict, out: Path) -> None:
    """Write one CSV row per (config, rep) run record to `out`.

    Columns, in order:
      * run identity and request counts;
      * cluster / P-pool / D-pool KV peak and steady fractions (role columns
        are empty for configs without a P/D split);
      * peak waiting-queue depth;
      * e2e latency and TTFT p50/p90/p99;
      * prefix-cache hit ratio.
    Creates parent directories. With no runs at all, prints a message and
    writes nothing.
    """
    def _row(config: str, rec: dict) -> dict:
        summ = rec["summary"]
        lat = summ.get("latency_stats_s") or {}
        ttft = summ.get("ttft_stats_s") or {}
        return {
            "config": config,
            "rep": rec["rep"],
            "n_requests": summ.get("request_count"),
            "n_success": summ.get("success_count"),
            "wall_clock_s": summ.get("wall_clock_s"),
            "peak_pool_frac": rec["peak_pool_frac"],
            "steady_pool_frac": rec["steady_pool_frac"],
            "p_pool_peak_frac": rec.get("p_peak_frac"),
            "p_pool_steady_frac": rec.get("p_steady_frac"),
            "d_pool_peak_frac": rec.get("d_peak_frac"),
            "d_pool_steady_frac": rec.get("d_steady_frac"),
            "peak_waiting": rec["peak_waiting"],
            "latency_p50_s": lat.get("p50"),
            "latency_p90_s": lat.get("p90"),
            "latency_p99_s": lat.get("p99"),
            "ttft_p50_s": ttft.get("p50"),
            "ttft_p90_s": ttft.get("p90"),
            "ttft_p99_s": ttft.get("p99"),
            "prefix_cache_hit_ratio": summ.get("prefix_cache_hit_ratio"),
        }

    rows = [_row(config, rec) for config, reps in sweep.items() for rec in reps]
    if not rows:
        print("[agg] no rows; skipping CSV")
        return
    out.parent.mkdir(parents=True, exist_ok=True)
    with out.open("w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
    print(f"wrote {out} ({len(rows)} rows)")
|
||
|
||
|
||
def render_all(sweep: dict, out_dir: Path) -> None:
    """Render every MB5 figure and the summary CSV for `sweep` into out_dir.

    Outputs, in this order: KV timeline, per-role split, peak utilization,
    latency comparison, then the CSV.
    """
    outputs = (
        (plot_kv_timeline, "mb5_kv_timeline.png"),
        (plot_role_split, "mb5_role_split.png"),
        (plot_peak_utilization, "mb5_peak_utilization.png"),
        (plot_latency_compare, "mb5_latency_compare.png"),
        (write_summary_csv, "mb5_summary.csv"),
    )
    for render, filename in outputs:
        render(sweep, out_dir / filename)
|
||
|
||
|
||
def main() -> None:
    """CLI entry point: reduce raw MB5 snapshots and/or render figures.

    Modes:
      * --from-reduced FILE: load a reduced sweep JSON and render it into
        --out-dir.
      * --reduce-to FILE: collect the sweep (numpy only) and write it as JSON.
      * neither: collect the sweep and render it directly.

    --sweep-root and --tag are required unless --from-reduced is given.
    --reduce-to and --from-reduced are mutually exclusive.
    """
    p = argparse.ArgumentParser(
        description="MB5 aggregate. Two-stage: --reduce-to (numpy-only, runs on "
                    "a serving host) dumps a compact JSON; --from-reduced "
                    "(needs matplotlib) renders figures locally. Or run "
                    "directly (raw snapshots -> figures) when both the data "
                    "and matplotlib are local."
    )
    p.add_argument("--sweep-root", type=Path,
                   help="dir containing ${tag}_${config}_rep${N}/ subdirs")
    p.add_argument("--tag",
                   help="sweep tag prefixing every run dir, e.g. 20260527_164040")
    p.add_argument("--configs", default="8C 6P+2D 4P+4D 2P+6D",
                   help="space-separated config names")
    p.add_argument("--reps", type=int, default=3)
    p.add_argument("--out-dir", type=Path, default=Path("figs/mb5"))
    # Reducing and rendering-from-reduced are alternative stages; accepting
    # both would silently drop --reduce-to.
    stage = p.add_mutually_exclusive_group()
    stage.add_argument("--reduce-to", type=Path,
                       help="numpy-only: write reduced sweep JSON here and exit "
                            "(no plotting, no matplotlib needed)")
    stage.add_argument("--from-reduced", type=Path,
                       help="load a reduced sweep JSON (from --reduce-to) and "
                            "render figures into --out-dir")
    args = p.parse_args()

    if args.from_reduced:
        sweep = json.loads(args.from_reduced.read_text())
        render_all(sweep, args.out_dir)
        return

    if not (args.sweep_root and args.tag):
        p.error("--sweep-root and --tag are required unless --from-reduced is given")
    if args.reps < 1:
        p.error("--reps must be >= 1")

    configs = args.configs.split()
    if not configs:
        p.error("--configs must name at least one config")
    sweep = collect_sweep(args.sweep_root, args.tag, configs, args.reps)

    if args.reduce_to:
        args.reduce_to.parent.mkdir(parents=True, exist_ok=True)
        args.reduce_to.write_text(json.dumps(sweep))
        print(f"wrote reduced sweep -> {args.reduce_to}")
        return

    render_all(sweep, args.out_dir)


if __name__ == "__main__":
    main()
|