#!/usr/bin/env python3
"""Per-producer KV-pool occupancy: round-robin vs session-affinity.

Evidence for the §6.3 producer hot-pinning claim. Under session-affinity P
routing, heavy multi-turn sessions concentrate onto individual producers, so
one producer's KV pool runs hot while the others idle. Round-robin spreads the
load, keeping producers balanced.

Two-stage (same pattern as aggregate_mb5.py) so the numpy-only reduce can run
on a serving host over multi-GB snapshot dirs:

    # on the host with the data (numpy only):
    python plot_producer_hotspot.py --reduce \\
        --snapshot-dir .../rr_4P4D_..._4P+4D/kv_snapshots \\
        --label "round-robin 4P+4D" --out rr_prod.json

    # locally (matplotlib):
    python plot_producer_hotspot.py --plot --rr rr_prod.json \\
        --session session_prod.json --out figs/mb5/mb5_producer_hotspot.png
"""
from __future__ import annotations

import argparse
import json
from pathlib import Path

import numpy as np

from aggregate_mb5 import load_snapshots_for_run, load_pid_roles, cluster_timeline


def reduce_run(snapshot_dir: Path, label: str) -> dict:
    """Per-producer KV-pool fraction timeline on a shared time grid.

    Args:
        snapshot_dir: Directory of KV snapshots for one run. Role information
            is read from the sibling ``vllm_logs`` directory
            (``snapshot_dir.parent / "vllm_logs"``).
        label: Free-form run name; copied verbatim into the result and later
            used as the subplot title.

    Returns:
        ``{"label": label, "producers": [...]}`` where each producer entry is
        ``{"pid", "idx", "times", "frac"}`` with ``times``/``frac`` as plain
        lists (JSON-serialisable). ``producers`` is empty when the run has no
        snapshots.
    """
    snaps = load_snapshots_for_run(snapshot_dir)
    roles = load_pid_roles(snapshot_dir.parent / "vllm_logs")
    if not snaps:
        return {"label": label, "producers": []}

    # NOTE(review): assumes load_snapshots_for_run returns snapshots ordered
    # by "t_unix" -- confirm against aggregate_mb5.
    t0 = snaps[0]["t_unix"]
    t_end = snaps[-1]["t_unix"]
    # One bin per unit of t_unix (presumably seconds, matching the plot's
    # x-axis label), inclusive of both ends; never fewer than one bin.
    n_bins = max(1, int(np.ceil(t_end - t0)) + 1)

    # Only processes whose role is "P" (producer) are reported; sorting the
    # pids makes the P0, P1, ... indices deterministic.
    prod_pids = sorted(pid for pid, role in roles.items() if role == "P")

    producers = []
    for i, pid in enumerate(prod_pids):
        # The same t0 / n_bins are passed for every producer so all timelines
        # share one grid. Only the 1st and 3rd return values are used here.
        times, _, frac, _, _ = cluster_timeline(
            snaps, keep_pids={pid}, t0=t0, n_bins=n_bins
        )
        producers.append({
            "pid": pid,
            "idx": i,
            "times": times.tolist(),
            "frac": frac.tolist(),
        })
    return {"label": label, "producers": producers}


def _steady_band(frac: np.ndarray) -> float:
    """Return the median of *frac* over its middle 80% of samples.

    With at least 10 samples the first and last 10% (by index) are dropped
    before taking the median; shorter inputs use the plain median, and an
    empty input yields ``0.0``.
    """
    n = len(frac)
    if n >= 10:
        return float(np.median(frac[int(n * 0.1):int(n * 0.9)]))
    return float(np.median(frac)) if n else 0.0


def plot(rr: dict, session: dict, out: Path) -> None:
    """Render the two runs side by side and save the figure to *out*.

    Args:
        rr: Reduced round-robin run, in the shape produced by ``reduce_run``.
        session: Reduced session-affinity run, same shape.
        out: Output image path; parent directories are created as needed.
    """
    # Imported lazily so the --reduce stage works on hosts without matplotlib.
    import matplotlib
    matplotlib.use("Agg")  # headless backend: file output only
    import matplotlib.pyplot as plt

    fig, axes = plt.subplots(1, 2, figsize=(15, 5), sharey=True)
    palette = ["#4c72b0", "#dd8452", "#55a868", "#c44e52",
               "#8172b3", "#937860", "#da8bc3", "#8c8c8c"]

    for ax, run in zip(axes, (rr, session)):
        prods = run["producers"]
        steadies = []
        for p in prods:
            t = np.asarray(p["times"])
            frac = np.asarray(p["frac"])
            # Stored values are fractions; the axis is in percent.
            ax.plot(t, frac * 100, lw=1.3,
                    color=palette[p["idx"] % len(palette)],
                    label=f"P{p['idx']} (pid {p['pid']})")
            steadies.append(_steady_band(frac) * 100)

        # imbalance metric: spread across producers (max - min of steady band)
        if steadies:
            spread = max(steadies) - min(steadies)
            mean = np.mean(steadies)
            # Coefficient of variation; guarded against an all-zero run.
            cv = (np.std(steadies) / mean) if mean else 0
            sub = (f"per-producer steady KV: "
                   f"min={min(steadies):.0f}% max={max(steadies):.0f}% "
                   f"spread={spread:.0f}pp CV={cv:.2f}")
        else:
            sub = "no producer data"

        ax.set_title(f"{run['label']}\n{sub}", fontsize=10)
        ax.set_xlabel("wall-clock since first snapshot (s)")
        ax.set_ylim(0, 105)
        ax.grid(True, alpha=0.3)
        # Skip the legend when nothing was plotted: matplotlib otherwise
        # warns that there are no labelled artists to show.
        if prods:
            ax.legend(loc="upper right", fontsize=8)

    axes[0].set_ylabel("per-producer KV pool utilization (%)")
    fig.suptitle(
        "Producer hot-pinning: round-robin spreads prefill load; "
        "session-affinity concentrates it",
        fontsize=12,
    )
    fig.tight_layout()
    out.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out, dpi=120)
    plt.close(fig)
    print(f"wrote {out}")


def main() -> None:
    """Command-line entry point: run either the reduce or the plot stage.

    ``--reduce`` takes precedence if both ``--reduce`` and ``--plot`` are
    given. Missing stage-specific arguments are reported through
    ``ArgumentParser.error`` (exits with a usage message).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--reduce", action="store_true")
    parser.add_argument("--plot", action="store_true")
    parser.add_argument("--snapshot-dir", type=Path)
    parser.add_argument("--label", default="")
    parser.add_argument("--out", type=Path, required=True)
    parser.add_argument("--rr", type=Path,
                        help="reduced round-robin JSON (for --plot)")
    parser.add_argument("--session", type=Path,
                        help="reduced session JSON (for --plot)")
    args = parser.parse_args()

    if args.reduce:
        if not args.snapshot_dir:
            parser.error("--reduce needs --snapshot-dir")
        data = reduce_run(args.snapshot_dir, args.label)
        args.out.parent.mkdir(parents=True, exist_ok=True)
        args.out.write_text(json.dumps(data))
        n = len(data["producers"])
        print(f"wrote {args.out} ({n} producers)")
    elif args.plot:
        if not (args.rr and args.session):
            parser.error("--plot needs --rr and --session")
        plot(json.loads(args.rr.read_text()),
             json.loads(args.session.read_text()),
             args.out)
    else:
        parser.error("specify --reduce or --plot")


if __name__ == "__main__":
    main()