Three independent bugs were blocking PD-disagg smoke; each fix is
isolated so the next PD experiment doesn't re-hit them.
1. mb5_launch.sh
- stop_all() also kills mb5_pd_proxy.py (our vendored copy),
not just the upstream filename, and asserts ports 8000-8007 +
PROXY_PORT are free before launching — stale proxies were
silently passing the readiness check.
- Proxy readiness now uses a generic "any HTTP response" probe:
mooncake_connector_proxy only exposes /v1/completions, so a 404
from /v1/models is expected and still means the proxy is up.
2. mb5_pd_proxy.py (vendored from third_party so deploy.sh ships it)
- Force min_tokens=1 on the prefill leg. The proxy caps
max_tokens to 1 for prefill, so clients that set
min_tokens == max_tokens (our replayer does) would otherwise
end up with min_tokens > max_tokens and fail vLLM's
min_tokens <= max_tokens check.
3. instrument_kv_snapshot.py
- Adds a second patch target: initialize
MooncakeConnectorWorker.bootstrap_server = None in __init__.
vLLM 0.18.1 only sets it under the is_kv_producer branch, so
kv_consumer hits AttributeError as soon as the first remote
prefill request lands.
- apply/revert refactored to iterate over (path, patches) pairs.
plot_kv_pool_timeline.py also handles snapshot files that never
captured a running request (would otherwise IndexError on an empty
stackplot input).
Smoke: 4P+4D × 20 reqs → 20/20 success, mean 3.9s, p99 17s, 8 PIDs
all writing snapshots (601 total), well above the 8C baseline.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
165 lines
5.8 KiB
Python
165 lines
5.8 KiB
Python
#!/usr/bin/env python3
|
||
"""Plot per-instance KV-pool composition over time from MB5 snapshots.

Input: a directory of mb5_kv_snapshot_pid<PID>.jsonl files (one per
EngineCore PID) — typically MB5_LOG_DIR set during a run.

For each snapshot file, builds a stacked-area chart:
  x axis        = wall-clock time since first snapshot
  y axis        = KV block count (total pool size = ceiling line)
  stacked bands = per-request block usage; each request gets one
                  band colored by a hash-based palette so the eye
                  can track individual requests across the run.

Also overlays:
  - free_blocks (the unfilled white area between the top of the stack
    and the pool-total line)
  - 90% capacity line (red dashed)
  - waiting-queue depth (separate small subplot beneath)
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import hashlib
|
||
import json
|
||
from collections import defaultdict
|
||
from pathlib import Path
|
||
|
||
import matplotlib
|
||
matplotlib.use("Agg")
|
||
import matplotlib.pyplot as plt
|
||
import numpy as np
|
||
|
||
|
||
def load_snapshots(path: Path) -> list[dict]:
    """Read one JSONL snapshot file and return its snapshot records.

    Args:
        path: File with one JSON object per line.

    Returns:
        The parsed JSON objects, in file order. Blank lines, lines that are
        not valid JSON, and lines whose JSON value is not an object are all
        skipped.
    """
    snapshots: list[dict] = []
    # JSON text is UTF-8; do not depend on the platform's locale encoding.
    with path.open(encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                # Best-effort load: one unparsable (e.g. truncated) line must
                # not discard the rest of the file.
                continue
            # A fragment such as `42` or `null` is valid JSON but not a
            # snapshot; letting it through would crash later on s["t_unix"].
            if isinstance(record, dict):
                snapshots.append(record)
    return snapshots
|
||
|
||
|
||
def req_color(req_id: str) -> str:
    """Map a request id to a stable ``#rrggbb`` hex color.

    The color is the first six hex digits of the MD5 digest of the id, so
    the same request id always yields the same color.
    """
    digest = hashlib.md5(req_id.encode()).hexdigest()
    return "#" + digest[:6]
|
||
|
||
|
||
def _draw_pool_limits(ax, total_blocks) -> None:
    """Draw the pool-total and 90%-capacity lines and fix the y range."""
    ax.axhline(total_blocks, color="#444", lw=1.5, ls="-",
               label=f"pool total = {total_blocks} blocks")
    ax.axhline(total_blocks * 0.9, color="#c44e52", lw=1.2, ls="--", alpha=0.7,
               label="90% capacity")
    ax.set_ylim(0, total_blocks * 1.05)


def _save_and_close(fig, out: Path) -> None:
    """Write *fig* to *out*, creating parent dirs; always close the figure."""
    try:
        out.parent.mkdir(parents=True, exist_ok=True)
        fig.tight_layout()
        fig.savefig(out, dpi=120)
    finally:
        # Close even when saving fails so the figure is not left registered
        # with pyplot across the per-PID loop.
        plt.close(fig)


def plot_one_instance(snaps: list[dict], out: Path, title: str) -> None:
    """Render the KV-pool timeline of one instance to a PNG.

    Args:
        snaps: Snapshot dicts for a single PID. Each must carry ``t_unix``;
            the earliest one must carry ``total_blocks``. ``running`` (a list
            of dicts with ``req_id`` and optional ``n_blocks``) and
            ``waiting`` are optional and may be null.
        out: Destination image path; parent directories are created.
        title: Figure title.

    Returns:
        None. Does nothing when *snaps* is empty.

    Raises:
        KeyError: If a required snapshot key is missing. ``used_blocks`` is
            required only when no snapshot lists a running request.
    """
    if not snaps:
        return

    snaps = sorted(snaps, key=lambda s: s["t_unix"])
    t0 = snaps[0]["t_unix"]
    times = [s["t_unix"] - t0 for s in snaps]
    total_blocks = snaps[0]["total_blocks"]

    # Treat a missing or null "running" field as "nothing running".
    running_per_snap = [s.get("running") or [] for s in snaps]

    # One matrix row per request, in first-seen order. The snapshots are
    # already time-sorted, so dict insertion order is arrival order and the
    # band order follows arrival without a separate sort.
    req_to_row: dict[str, int] = {}
    for running in running_per_snap:
        for r in running:
            req_to_row.setdefault(r["req_id"], len(req_to_row))

    if not req_to_row:
        # No request was ever sampled as running on this instance; plot a
        # used_blocks line instead of the stackplot (which can't handle
        # empty input).
        used = [s["used_blocks"] for s in snaps]
        fig, ax1 = plt.subplots(figsize=(13, 4))
        ax1.plot(times, used, color="#888", lw=1.5,
                 label="used_blocks (no running reqs sampled)")
        _draw_pool_limits(ax1, total_blocks)
        ax1.set_ylabel("KV blocks")
        ax1.set_xlabel("wall-clock since first snapshot (s)")
        ax1.set_title(title + " [no per-request data; instance idle?]")
        ax1.legend(loc="upper right", fontsize=9)
        ax1.grid(True, alpha=0.3)
        _save_and_close(fig, out)
        print(f"wrote {out} (n_snapshots={len(snaps)}, 0 running reqs ever)")
        return

    # Build the request × time block-count matrix; cells stay 0 wherever a
    # request is not listed as running in that snapshot.
    matrix = np.zeros((len(req_to_row), len(times)), dtype=np.int64)
    for j, running in enumerate(running_per_snap):
        for r in running:
            matrix[req_to_row[r["req_id"]], j] = r.get("n_blocks", 0)

    colors = [req_color(rid) for rid in req_to_row]
    wait_lens = [len(s.get("waiting") or []) for s in snaps]

    fig, (ax1, ax2) = plt.subplots(
        2, 1, figsize=(13, 6),
        sharex=True,
        gridspec_kw={"height_ratios": [4, 1]},
    )

    # Top panel: per-request stacked block usage under the pool ceiling.
    ax1.stackplot(times, matrix, colors=colors, linewidth=0)
    _draw_pool_limits(ax1, total_blocks)
    ax1.set_ylabel("KV blocks (per-request stacked)")
    ax1.set_title(title)
    ax1.legend(loc="upper right", fontsize=9, framealpha=0.95)
    ax1.grid(True, alpha=0.3)

    # Bottom panel: waiting queue depth.
    ax2.fill_between(times, 0, wait_lens, color="#c44e52", alpha=0.55,
                     label="waiting requests")
    ax2.set_ylabel("queue\ndepth")
    ax2.set_xlabel("wall-clock since first snapshot (s)")
    ax2.set_ylim(0, max(max(wait_lens, default=0), 1) * 1.2 + 1)
    ax2.grid(True, alpha=0.3)
    ax2.legend(loc="upper right", fontsize=8)

    _save_and_close(fig, out)
    print(f"wrote {out} (n_snapshots={len(snaps)}, n_requests={len(req_to_row)})")
|
||
|
||
|
||
def main() -> None:
    """CLI entry point: render one PNG per snapshot file in --snapshot-dir.

    Each ``mb5_kv_snapshot_pid*.jsonl`` file becomes
    ``<out-dir>/<label>_pid<PID>.png``. Files that yield no snapshots are
    reported and skipped.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--snapshot-dir", type=Path, required=True,
        help="dir containing mb5_kv_snapshot_pid*.jsonl files",
    )
    parser.add_argument("--out-dir", type=Path, required=True)
    parser.add_argument(
        "--label", default="",
        help="prefix for output filenames + figure title",
    )
    args = parser.parse_args()

    snapshot_files = sorted(args.snapshot_dir.glob("mb5_kv_snapshot_pid*.jsonl"))
    if not snapshot_files:
        print(f"[plot] no snapshot files in {args.snapshot_dir}")
        return

    for snapshot_file in snapshot_files:
        # The PID is whatever remains of the stem once the fixed prefix is removed.
        pid = snapshot_file.stem.replace("mb5_kv_snapshot_pid", "")
        snaps = load_snapshots(snapshot_file)
        if not snaps:
            print(f"[plot] {snapshot_file.name}: empty")
            continue
        png_path = args.out_dir / f"{args.label}_pid{pid}.png"
        figure_title = f"{args.label} pid={pid} (n_snap={len(snaps)})"
        plot_one_instance(snaps, png_path, figure_title)
|
||
|
||
|
||
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|