Files
agentic-kvc/analysis/pd_sep_paper_section/scripts/plot_pd_matrix.py
Gahow Wang cd82b8c2a2 PD-sep matrix results: C2/C3/C4 figures + empirical mechanism refined
Captures 5 runs from the experiment matrix (combined-ca x3 seeds,
pdsep-4p4d seed1, pdsep-6p2d seed1) on traces/w600_r0.0015_st30.jsonl
with cuda graphs enabled. The headline:

  combined-ca:  TTFT p50 0.91s   success 99.5%
  pdsep-4p4d:   TTFT p50 62.8s   success 52%   (69x worse, half dropped)
  pdsep-6p2d:   TTFT p50 51.1s   success 68%   (56x worse, third dropped)

C2 (fig_c2): headline bars per config with error bars.
C3 (fig_c3): per-instance KV utilization time-series. Both PD-sep
  splits hit the memory wall, but the side differs by P:D ratio --
  4P+4D pins the P-side, 6P+2D pins both sides (D-side back-pressures
  P-side).
C4 (fig_c4): TTFT stacked breakdown. 99% of PD-sep TTFT is P-side
  prefill compute; D-side wait + first token is <=1.2s. The bottleneck
  is P-side prefill queueing, not D-side decode wait as the original
  analytical model assumed.

system_analysis.md gains a Layer 5b that reconciles the analytical
KV-wall model (which considered D-side only) with the empirical
finding that the wall hits whichever side has fewer GPUs, and
co-saturates both at extreme splits via D-side back-pressure.

plot_pd_matrix.py ingests outputs/pd_matrix/* into all four figures.
bench.sh gained AGENTIC_STEP_LOG_DIR hooks for future runs (set during
this work but not used by the current matrix's data).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 16:23:52 +08:00

491 lines
18 KiB
Python

"""Render C2/C3/C4/C5 from outputs/pd_matrix/.
Reads each completed run in outputs/pd_matrix/<config>_<mode>_seed<N>/ and
produces:
C2: figures/fig_c2_pdsep_vs_combined.pdf
Bar chart with mean ± stderr (over seeds) for
TTFT p50, TTFT p90, TPOT p90, E2E p50.
Bars per config (combined-ca / pdsep-4p4d / pdsep-6p2d),
grouped by cuda-graph mode if eager runs present.
C3: figures/fig_c3_kv_timeseries.pdf
Per-instance GPU KV cache usage time-series mined from
vllm_inst_*.log "Engine 000: ... GPU KV cache usage: X%" lines.
One panel per config; D-instances in PD-sep configs highlighted.
Memory-wall threshold (90%) drawn.
C4: figures/fig_c4_ttft_stacked.pdf
Stacked TTFT bar per config showing per-stage time:
Combined: just TTFT (single segment, no stage decomposition).
PD-sep: proxy_recv -> prefill_sent (queue on P)
prefill_sent -> prefill_done (prefill compute on P)
prefill_done -> decode_sent (proxy hop)
decode_sent -> first_token (KV pull + decode wait on D)
C5: figures/fig_c5_cudagraph_ablation.pdf (only if eager runs exist)
Cuda-graph on vs off, per config. Captures the "PD-sep would benefit
from D-only graphs" claim quantitatively.
Usage:
.venv/bin/python analysis/pd_sep_paper_section/scripts/plot_pd_matrix.py
"""
import argparse
import json
import re
import statistics
from dataclasses import dataclass, field
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
# Canonical left-to-right ordering of configs in the figures. The plotting
# functions iterate this list and keep only configs that have loaded runs.
CONFIG_ORDER = ["combined-ca", "combined-rr", "pdsep-4p4d", "pdsep-6p2d"]
# Bar colour (hex RGB) per config name. plot_c2 looks colours up with a
# "#444" fallback for names missing from this table.
CONFIG_COLOR = {
"combined-ca": "#2ca02c",
"combined-rr": "#7f7f7f",
"pdsep-4p4d": "#ff7f0e",
"pdsep-6p2d": "#d62728",
}
# Engine stats log line: the timestamp and the usage metrics sit on the same
# line. Example (wrapped here for readability; the pattern needs it on one line):
# (APIServer pid=...) INFO 05-22 18:29:55 [loggers.py:259] Engine 000:
# Avg prompt throughput: ... Running: N reqs, Waiting: M reqs,
# GPU KV cache usage: Z%, Prefix cache hit rate: P%
# Capture groups:
#   1 = "MM-DD HH:MM:SS" timestamp (the log carries no year)
#   2 = number of running requests
#   3 = number of waiting requests
#   4 = GPU KV cache usage, in percent
# Only lines tagged "Engine 000" match.
KV_LOG_RE = re.compile(
r"INFO (\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*Engine 000: "
r".*Running: (\d+) reqs, Waiting: (\d+) reqs, "
r"GPU KV cache usage: ([\d.]+)%"
)
@dataclass
class Run:
    """One completed benchmark run, as loaded from a single run directory.

    Field order is part of the constructor interface and must not change.
    """

    # Run directory name, e.g. "combined-ca_cudagraph_seed1".
    tag: str
    # Config part of the tag, e.g. "combined-ca" or "pdsep-4p4d".
    config: str
    mode: str  # cudagraph | eager
    seed: int
    # Parsed contents of the run's metrics.summary.json.
    summary: dict
    # Parsed contents of breakdown.json; empty when the file is absent.
    breakdown: list = field(default_factory=list)
    kv_series: dict = field(default_factory=dict)  # inst_idx -> [(t_sec, usage%), ...]

    @property
    def is_pdsep(self) -> bool:
        """True when the config name starts with "pdsep"."""
        prefix = "pdsep"
        return self.config.startswith(prefix)

    @property
    def is_combined(self) -> bool:
        """True when the config name starts with "combined"."""
        prefix = "combined"
        return self.config.startswith(prefix)
def parse_tag(name: str) -> tuple[str, str, int] | None:
"""combined-ca_cudagraph_seed1 -> ("combined-ca", "cudagraph", 1)"""
m = re.match(r"(combined-ca|combined-rr|pdsep-4p4d|pdsep-6p2d)_(cudagraph|eager)_seed(\d+)", name)
if not m:
return None
return m.group(1), m.group(2), int(m.group(3))
def mine_kv_series(log_path: Path, start_epoch: float | None = None) -> list[tuple[float, float]]:
    """Return (seconds_since_first_log, kv_usage_percent) pairs.

    Scans ``log_path`` for engine stats lines matching ``KV_LOG_RE`` and emits
    one sample per match, in file order. Time is measured relative to the
    first matching line of this file, so the first sample has ``t == 0``.

    The log timestamps carry no year ("MM-DD HH:MM:SS"), so an absolute epoch
    cannot be recovered. They are parsed against a fixed leap reference year,
    which keeps real calendar month lengths: a log that crosses a month
    boundary yields true elapsed seconds instead of the phantom days a
    "31 days per month" approximation would insert. A backwards jump of more
    than half a year is interpreted as a Dec -> Jan rollover.
    Caveat: because the reference year is a leap year, a log spanning
    Feb 28 -> Mar 1 of a non-leap year is off by one day.

    Args:
        log_path: Engine log file to scan; undecodable bytes are ignored.
        start_epoch: Currently unused; accepted so existing call signatures
            keep working.

    Returns:
        List of ``(elapsed_seconds, usage_percent)`` tuples; empty when no
        line matches.
    """
    # Function-scope import keeps this block self-contained.
    from datetime import datetime

    ref_year = 2000  # leap year, so a "02-29" timestamp still parses
    year_seconds = 366 * 86400  # length of the reference year

    samples: list[tuple[float, float]] = []
    first_stamp = None
    for line in log_path.read_text(errors="ignore").splitlines():
        m = KV_LOG_RE.search(line)
        if not m:
            continue
        try:
            stamp = datetime.strptime(f"{ref_year}-{m.group(1)}", "%Y-%m-%d %H:%M:%S")
            # Group 4 is `[\d.]+`, which also matches junk such as "." or
            # "1.2.3"; skip such lines instead of aborting the whole scan.
            usage = float(m.group(4))
        except ValueError:
            continue
        if first_stamp is None:
            first_stamp = stamp
        elapsed = (stamp - first_stamp).total_seconds()
        if elapsed < -year_seconds / 2:
            # The calendar wrapped from December into January.
            elapsed += year_seconds
        samples.append((elapsed, usage))
    return samples
def load_runs(matrix_dir: Path) -> list[Run]:
    """Load every completed run found directly under ``matrix_dir``.

    A sub-directory counts as a completed run when its name is accepted by
    ``parse_tag`` and it contains ``metrics.summary.json``; anything else is
    skipped (for example runs that are still in flight or that failed).
    ``breakdown.json`` is optional and defaults to an empty list. Each
    ``vllm_inst_<idx>.log`` is mined for a KV-usage series keyed by ``idx``.

    Returns the runs in sorted directory-name order, or an empty list when
    ``matrix_dir`` does not exist.
    """
    if not matrix_dir.exists():
        return []

    loaded: list[Run] = []
    for entry in sorted(matrix_dir.iterdir()):
        parsed = parse_tag(entry.name) if entry.is_dir() else None
        if parsed is None:
            continue

        summary_file = entry / "metrics.summary.json"
        if not summary_file.exists():
            continue  # in-flight or failed
        summary = json.loads(summary_file.read_text())

        breakdown_file = entry / "breakdown.json"
        if breakdown_file.exists():
            breakdown = json.loads(breakdown_file.read_text())
        else:
            breakdown = []

        # Instance index is taken from the log file name.
        series_by_inst: dict = {}
        for log_file in sorted(entry.glob("vllm_inst_*.log")):
            hit = re.match(r"vllm_inst_(\d+)\.log", log_file.name)
            if hit:
                series_by_inst[int(hit.group(1))] = mine_kv_series(log_file)

        config, mode, seed = parsed
        loaded.append(
            Run(
                tag=entry.name,
                config=config,
                mode=mode,
                seed=seed,
                summary=summary,
                breakdown=breakdown,
                kv_series=series_by_inst,
            )
        )
    return loaded
# ---------- C2: headline bars with error bars ----------
# Metrics shown in the C2 headline figure, one subplot per entry.
# Each entry is (label, family, percentile):
#   label      - subplot title, also the key under which aggregate_seeds
#                stores the (mean, stderr) pair for this metric
#   family     - key into a run's summary dict
#   percentile - key inside that family's stats dict
C2_METRICS = [
("TTFT p50 (s)", "ttft_stats_s", "p50"),
("TTFT p90 (s)", "ttft_stats_s", "p90"),
("TPOT p90 (s)", "tpot_stats_s", "p90"),
("E2E p50 (s)", "latency_stats_s", "p50"),
]
def aggregate_seeds(runs: list[Run]) -> dict:
    """Aggregate each C2 metric across seeds, per ``(config, mode)`` group.

    Returns a dict keyed by ``(config, mode)``. Each value maps ``"n_seeds"``
    to the number of runs in the group and every ``C2_METRICS`` label to a
    ``(mean, stderr)`` tuple:

    * two or more samples -> sample mean and stdev / sqrt(n)
    * exactly one sample  -> that value with stderr 0.0
    * no samples          -> (nan, 0.0)

    Runs whose summary lacks a metric simply contribute no sample to it.
    """
    by_key: dict[tuple[str, str], list[Run]] = {}
    for run in runs:
        by_key.setdefault((run.config, run.mode), []).append(run)

    result: dict[tuple[str, str], dict] = {}
    for key, members in by_key.items():
        stats: dict = {"n_seeds": len(members)}
        for label, family, percentile in C2_METRICS:
            raw = (m.summary.get(family, {}).get(percentile) for m in members)
            samples = [float(v) for v in raw if v is not None]
            if len(samples) >= 2:
                sem = statistics.stdev(samples) / (len(samples) ** 0.5)
                stats[label] = (statistics.mean(samples), sem)
            elif samples:
                stats[label] = (samples[0], 0.0)
            else:
                stats[label] = (float("nan"), 0.0)
        result[key] = stats
    return result
def plot_c2(runs: list[Run], out_path: Path):
    """Render the C2 headline figure: one bar subplot per ``C2_METRICS`` entry.

    Bars show the across-seed mean with stderr error bars (from
    ``aggregate_seeds``), one bar per config in ``CONFIG_ORDER`` order. When
    more than one mode is present, bars are grouped per config and the
    non-cudagraph mode is hatched; a figure-level legend is added in that case.

    Args:
        runs: Loaded runs; the function prints a notice and returns when
            there is nothing to aggregate.
        out_path: Destination file for the figure.
    """
    agg = aggregate_seeds(runs)
    if not agg:
        print("[C2] no runs available; skipped")
        return
    modes_present = sorted({k[1] for k in agg})
    configs_present = [c for c in CONFIG_ORDER if any(k[0] == c for k in agg)]

    def seeds_for(cfg: str) -> int:
        # A config may have runs in only some modes (e.g. eager only), so
        # indexing agg with modes_present[0] unconditionally can raise
        # KeyError. Use the first mode that actually exists for this config;
        # every entry of configs_present has at least one.
        return next(
            agg[(cfg, mode)]["n_seeds"]
            for mode in modes_present
            if (cfg, mode) in agg
        )

    fig, axes = plt.subplots(1, len(C2_METRICS), figsize=(3.0 * len(C2_METRICS), 3.6))
    if len(C2_METRICS) == 1:
        axes = [axes]  # plt.subplots returns a bare Axes for a 1x1 grid
    bar_w = 0.8 / max(1, len(modes_present))
    for ax, (label, _, _) in zip(axes, C2_METRICS):
        for mi, mode in enumerate(modes_present):
            xs, ys, errs, colors = [], [], [], []
            for ci, cfg in enumerate(configs_present):
                if (cfg, mode) not in agg:
                    continue
                mean, sem = agg[(cfg, mode)][label]
                # Centre the group of per-mode bars on the config's tick.
                xs.append(ci + (mi - (len(modes_present) - 1) / 2) * bar_w)
                ys.append(mean)
                errs.append(sem)
                colors.append(CONFIG_COLOR.get(cfg, "#444"))
            ax.bar(xs, ys, width=bar_w * 0.9, color=colors, yerr=errs,
                   capsize=3, edgecolor="black", linewidth=0.5,
                   label=mode,
                   hatch=("" if mode == "cudagraph" else "//"))
        ax.set_xticks(range(len(configs_present)))
        labels_with_n = [f"{c}\n(N={seeds_for(c)})" for c in configs_present]
        ax.set_xticklabels(labels_with_n, fontsize=8.5)
        ax.set_title(label, fontsize=10)
        ax.grid(True, axis="y", alpha=0.25)
        ax.set_ylim(bottom=0)
    # Legend proxies: grey swatches that differ only by hatch, one per mode.
    handles = []
    for mode in modes_present:
        handles.append(plt.Rectangle((0, 0), 1, 1, fc="#aaa",
                                     hatch=("" if mode == "cudagraph" else "//"),
                                     edgecolor="black"))
    if len(modes_present) > 1:
        fig.legend(handles, modes_present, loc="upper right", fontsize=9,
                   bbox_to_anchor=(0.99, 0.99))
    fig.suptitle(
        "PD-sep vs Combined headline latency "
        "(trace=w600_r0.0015_st30, 850 reqs; per-config N shown under labels)",
        fontsize=10, y=1.02,
    )
    fig.tight_layout()
    fig.savefig(out_path, bbox_inches="tight")
    plt.close(fig)
    print(f"[C2] wrote {out_path}")
    for key, v in sorted(agg.items()):
        print(f" {key[0]:13s} {key[1]:10s} n={v['n_seeds']} "
              + " ".join(f"{lbl.split(' ')[0].lower()}={v[lbl][0]:.3f}" for lbl, _, _ in C2_METRICS))
# ---------- C3: KV cache utilization time-series ----------
def plot_c3(runs: list[Run], out_path: Path):
    """Render the C3 figure: per-instance GPU KV cache usage over time.

    Only cudagraph runs with seed 1 that have KV series are shown, one panel
    per run, ordered by ``CONFIG_ORDER`` (unknown configs last). Within a
    panel every instance is one line: blue for combined configs, orange for
    instances with index below the config's P count, red for the rest (D).
    A dotted line marks the 90% "memory wall" and the panel title reports the
    lowest and highest per-instance peak.
    """

    def order_key(run):
        return CONFIG_ORDER.index(run.config) if run.config in CONFIG_ORDER else 99

    def peak_per_instance(run):
        # Instances with an empty series count as a peak of 0.
        return [max(u for _, u in s) if s else 0 for s in run.kv_series.values()]

    # Show seed=1 cudagraph runs only, one panel per config, all instances overlaid.
    selected = sorted(
        (r for r in runs if r.mode == "cudagraph" and r.seed == 1 and r.kv_series),
        key=order_key,
    )
    if not selected:
        print("[C3] no kv timeseries data; skipped")
        return

    n_panels = len(selected)
    fig, axes = plt.subplots(1, n_panels, figsize=(4.2 * n_panels, 3.6), sharey=True)
    if n_panels == 1:
        axes = [axes]

    for ax, run in zip(axes, selected):
        n_p, n_d = pd_split(run.config)
        # Roles ("P" covers combined too, "D") that already own a legend entry,
        # so each role is labelled once per panel.
        legend_given = set()
        for inst_idx in sorted(run.kv_series):
            series = run.kv_series[inst_idx]
            if not series:
                continue
            if run.is_combined:
                role, text = "P", "combined GPU"
                look = {"color": "#1f77b4", "lw": 0.9, "zorder": 2}
            elif inst_idx < n_p:  # P-instance in pdsep
                role, text = "P", f"P (inst 0..{n_p-1})"
                look = {"color": "#ff7f0e", "lw": 1.4, "zorder": 3}
            else:  # D-instance; highest zorder so it is drawn on top
                role, text = "D", f"D (inst {n_p}..{n_p+n_d-1})"
                look = {"color": "#d62728", "lw": 1.4, "zorder": 4}
            label = None if role in legend_given else text
            legend_given.add(role)
            times = [t for t, _ in series]
            usage = [u for _, u in series]
            ax.plot(times, usage, alpha=0.85, label=label, **look)

        ax.axhline(90, color="#888", ls=":", lw=1)
        ax.text(ax.get_xlim()[1] * 0.98, 92, "memory wall (90%)",
                fontsize=8, color="#666", ha="right")
        # peak summary in title
        peaks = peak_per_instance(run)
        peak_summary = f"peaks {min(peaks):.0f}..{max(peaks):.0f}%"
        ax.set_title(f"{run.config}\n{peak_summary}", fontsize=10)
        ax.set_xlabel("time since first engine log (s)")
        ax.set_ylim(0, 105)
        ax.grid(True, alpha=0.25)
        ax.legend(loc="lower right", fontsize=8, framealpha=0.9)

    axes[0].set_ylabel("GPU KV cache usage (%)")
    fig.suptitle(
        "KV cache utilization: PD-sep concentrates pressure on whichever side has fewer GPUs",
        fontsize=10, y=1.02,
    )
    fig.tight_layout()
    fig.savefig(out_path, bbox_inches="tight")
    plt.close(fig)
    print(f"[C3] wrote {out_path}")
    for run in selected:
        peaks = peak_per_instance(run)
        print(f" {run.config:13s} peak KV per inst: {[f'{p:.0f}%' for p in peaks]}")
def pd_split(config: str) -> tuple[int, int]:
    """Return ``(N_P, N_D)`` for ``config``.

    The two PD-sep configs map to their prefill/decode instance counts; every
    other name is treated as combined and yields ``(8, 0)`` (all instances do
    both).
    """
    known_splits = {
        "pdsep-4p4d": (4, 4),
        "pdsep-6p2d": (6, 2),
    }
    return known_splits.get(config, (8, 0))
# ---------- C4: TTFT stacked breakdown ----------
def stages_for_record(rec: dict, is_pdsep: bool) -> dict | None:
t0 = rec.get("t_proxy_recv")
t_ft = rec.get("t_first_token")
if t0 is None or t_ft is None:
return None
if not is_pdsep:
return {"TTFT (combined)": t_ft - t0}
t_ps = rec.get("t_prefill_sent")
t_pd = rec.get("t_prefill_done")
t_ds = rec.get("t_decode_sent")
if any(x is None for x in (t_ps, t_pd, t_ds)):
return None
return {
"proxy->P queue": max(0.0, t_ps - t0),
"P prefill compute": max(0.0, t_pd - t_ps),
"P->D handoff": max(0.0, t_ds - t_pd),
"D wait + first tok": max(0.0, t_ft - t_ds),
}
def plot_c4(runs: list[Run], out_path: Path):
    """Render the C4 figure: stacked TTFT bar per config.

    All cudagraph runs that carry breakdown records are pooled per config
    (across seeds); each stage's height is the median of that stage over the
    pooled records. Combined configs get a single-segment bar so they can be
    compared directly against PD-sep, even though they have no stage
    decomposition.

    NOTE(review): the "TTFT p50" annotation above each bar is the sum of the
    per-stage medians, which is not in general the median of per-request
    TTFT -- confirm this is the intended headline number.
    """
    # Pool all cudagraph seeds that have breakdown data, then compute per-stage p50.
    by_config: dict[str, list[Run]] = {}
    for run in runs:
        if run.mode == "cudagraph" and run.breakdown:
            by_config.setdefault(run.config, []).append(run)
    if not by_config:
        print("[C4] no breakdown data; skipped")
        return

    bars = []
    for config in CONFIG_ORDER:
        members = by_config.get(config)
        if members is None:
            continue
        pdsep = config.startswith("pdsep")
        samples: dict[str, list[float]] = {}
        for run in members:
            for rec in run.breakdown:
                stages = stages_for_record(rec, pdsep)
                if not stages:
                    continue
                for name, seconds in stages.items():
                    samples.setdefault(name, []).append(seconds)
        medians = {name: float(np.median(vals)) for name, vals in samples.items() if vals}
        bars.append((config, medians, len(members)))

    fig, ax = plt.subplots(figsize=(7.0, 4.2))
    width = 0.55
    # Insertion order doubles as the bottom-to-top stacking order.
    stage_colors = {
        "TTFT (combined)": "#2ca02c",
        "proxy->P queue": "#1f77b4",
        "P prefill compute": "#9467bd",
        "P->D handoff": "#8c564b",
        "D wait + first tok": "#d62728",
    }
    legend_seen: set[str] = set()
    for i, (config, stages, n_seeds) in enumerate(bars):
        bottom = 0.0
        for stage, color in stage_colors.items():
            if stage not in stages:
                continue
            val = stages[stage]
            # Only the first bar segment of each stage contributes a legend entry.
            label = None if stage in legend_seen else stage
            legend_seen.add(stage)
            ax.bar(i, val, bottom=bottom, width=width, color=color,
                   edgecolor="white", linewidth=0.5, label=label)
            if val > 1.0:
                # Segments of at most 1s are left without an inline value label.
                ax.text(i, bottom + val / 2, f"{val:.2f}s",
                        ha="center", va="center", fontsize=9, color="white",
                        fontweight="bold")
            bottom += val
        ax.text(i, bottom + 1.5, f"TTFT p50\n{bottom:.2f}s",
                ha="center", va="bottom", fontsize=9, color="#222",
                fontweight="bold")

    ax.set_xticks(list(range(len(bars))))
    ax.set_xticklabels([f"{b[0]}\n(N={b[2]})" for b in bars],
                       rotation=0, ha="center", fontsize=9)
    ax.set_ylabel("TTFT p50 (s, stacked)")
    ax.set_title(
        "TTFT decomposition: PD-sep's TTFT is dominated by P-side prefill queueing",
        fontsize=10,
    )
    if legend_seen:
        ax.legend(loc="upper left", fontsize=8, frameon=False)
    ax.grid(True, axis="y", alpha=0.25)
    fig.tight_layout()
    fig.savefig(out_path, bbox_inches="tight")
    plt.close(fig)
    print(f"[C4] wrote {out_path}")
    for config, stages, n_seeds in bars:
        tot = sum(stages.values())
        print(f" {config:13s} (N={n_seeds}) TTFT p50 = {tot:.3f}s "
              + " ".join(f"{k}={v:.3f}" for k, v in stages.items()))
# ---------- C5: cuda-graph ablation 2x2 ----------
def plot_c5(runs: list[Run], out_path: Path):
    """Render the C5 figure: eager vs cudagraph bars per config.

    Skipped (with a printed notice) unless at least one eager run is loaded.
    One subplot per metric (TTFT p50, TPOT p90); each config gets an eager bar
    and a cudagraph bar with stderr error bars from ``aggregate_seeds``. A
    missing (config, mode) combination is plotted as NaN with zero error.
    """
    if all(r.mode != "eager" for r in runs):
        print("[C5] skipped (no --with-eager runs)")
        return

    agg = aggregate_seeds(runs)
    configs_present = [c for c in CONFIG_ORDER if any(k[0] == c for k in agg)]
    # Labels must match the C2_METRICS labels, since agg is keyed by them.
    metrics = [("TTFT p50 (s)", "ttft_stats_s", "p50"),
               ("TPOT p90 (s)", "tpot_stats_s", "p90")]
    n_metrics = len(metrics)
    fig, axes = plt.subplots(1, n_metrics, figsize=(3.5 * n_metrics, 3.6))
    if n_metrics == 1:
        axes = [axes]

    missing = (float("nan"), 0.0)
    for ax, (label, _, _) in zip(axes, metrics):
        xs = np.arange(len(configs_present))
        w = 0.38
        # Eager sits left of the tick, cudagraph right.
        for mode, offset in (("eager", -w / 2), ("cudagraph", w / 2)):
            ys, errs = [], []
            for cfg in configs_present:
                k = (cfg, mode)
                mean, sem = agg[k][label] if k in agg else missing
                ys.append(mean)
                errs.append(sem)
            ax.bar(xs + offset, ys, w, yerr=errs, capsize=3,
                   label=mode, edgecolor="black", linewidth=0.5)
        ax.set_xticks(xs)
        ax.set_xticklabels(configs_present, rotation=20, ha="right", fontsize=8.5)
        ax.set_title(label, fontsize=10)
        ax.legend(fontsize=8)
        ax.grid(True, axis="y", alpha=0.25)

    fig.suptitle("Cuda-graph ablation: PD-sep's D-only graphs are the structural advantage that --enforce-eager suppressed",
                 fontsize=10, y=1.02)
    fig.tight_layout()
    fig.savefig(out_path, bbox_inches="tight")
    plt.close(fig)
    print(f"[C5] wrote {out_path}")
# ---------- entrypoint ----------
def main():
    """Command-line entry point: load the run matrix and render C2..C5.

    ``--matrix-dir`` is the directory holding one sub-directory per run and
    ``--outdir`` receives the figures (created if missing). When no completed
    run is found, nothing is plotted.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--matrix-dir", default="outputs/pd_matrix")
    parser.add_argument("--outdir", default="analysis/pd_sep_paper_section/figures")
    opts = parser.parse_args()

    matrix_dir = Path(opts.matrix_dir)
    outdir = Path(opts.outdir)
    outdir.mkdir(parents=True, exist_ok=True)

    runs = load_runs(matrix_dir)
    print(f"loaded {len(runs)} completed runs from {matrix_dir}")
    for run in runs:
        print(f" - {run.tag}")
    if not runs:
        print("no runs yet; nothing to plot.")
        return

    # Figures are rendered in this fixed order; each plotter decides on its
    # own whether it has enough data or should skip.
    figure_plan = (
        (plot_c2, "fig_c2_pdsep_vs_combined.pdf"),
        (plot_c3, "fig_c3_kv_timeseries.pdf"),
        (plot_c4, "fig_c4_ttft_stacked.pdf"),
        (plot_c5, "fig_c5_cudagraph_ablation.pdf"),
    )
    for render, filename in figure_plan:
        render(runs, outdir / filename)


if __name__ == "__main__":
    main()