Files
agentic-kvc/analysis/pd_sep_paper_section/scripts/plot_workload.py
Gahow Wang d71a111099 Paper section: PD-sep scaffold + drop --enforce-eager from launch scripts
Adds analysis/pd_sep_paper_section/ as the home for the "PD separation is
net negative under agentic workloads" paper section: plot scripts for C1
(workload characterization), C6 (roofline), and C7 (routing-vs-PD-sep
lever); the already-rendered C6/C7 PDFs; and a README mapping candidate
claims to required figures, plus open re-run items.

Removes --enforce-eager from bench.sh and all active launch scripts so that
CUDA graphs are captured. The prior methodology suppressed one of PD-sep's
structural advantages (fixed-shape decode on the D-node). Legacy scripts
under scripts/legacy/ are intentionally left untouched as historical
records.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 11:24:16 +08:00

218 lines
7.7 KiB
Python

"""C1: workload characterization figures.
Generates two figures from the sampled trace:
fig_c1a_io_cdf.pdf -- input / output token CDF (two panels)
fig_c1b_reuse.pdf -- KV-block reuse decomposition
Run on dash0 where the trace lives and matplotlib is installed.
Usage:
.venv/bin/python scripts/plot_workload.py \
--trace traces/w600_r0.0015_st30.jsonl \
--outdir analysis/figures
"""
import argparse
import json
import sys
from collections import Counter
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
# Tokens per KV-cache block. In the reuse decomposition (C1b) every entry of
# a request's ``hash_ids`` is counted as BLOCK_SIZE tokens.
# NOTE(review): should match the block size used when the trace's hash_ids
# were generated -- confirm against the trace producer.
BLOCK_SIZE = 512
def load_trace(path):
    """Load a JSONL request trace and return its records sorted by arrival time.

    Each non-blank line of ``path`` is parsed as one JSON object. Records are
    ordered by ``float(record["timestamp"])``. The sort is stable, so equal
    timestamps keep file order. The float conversion accepts timestamps stored
    either as JSON numbers or as numeric strings.

    Args:
        path: Path (``str`` or ``pathlib.Path``) to the ``.jsonl`` trace file.

    Returns:
        list[dict]: The parsed records in ascending timestamp order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record has no ``timestamp`` field.
    """
    # A context manager closes the file deterministically. JSON text is UTF-8,
    # so don't depend on the locale's default encoding. Blank lines (e.g. a
    # trailing empty line) would make json.loads raise, so they are skipped.
    with open(path, encoding="utf-8") as fh:
        rows = [json.loads(line) for line in fh if line.strip()]
    rows.sort(key=lambda r: float(r["timestamp"]))
    return rows
def percentile_markers(arr, qs=(0.5, 0.9, 0.99)):
    """Map each requested quantile to its value in ``arr``.

    Args:
        arr: Array-like of numbers; converted with ``np.asarray``.
        qs: Quantiles in [0, 1] to evaluate (default: p50 / p90 / p99).

    Returns:
        dict[float, float]: ``{q: value}`` in the iteration order of ``qs``,
        each value computed by ``np.quantile`` and cast to a Python float.
    """
    values = np.asarray(arr)
    markers = {}
    for quantile in qs:
        markers[quantile] = float(np.quantile(values, quantile))
    return markers
def plot_io_cdf(rows, out_path):
    """Plot empirical CDFs of per-request input and output token counts (C1a).

    The left panel shows ``input_length`` on a log-scale x-axis; the right
    panel shows ``output_length`` on a linear x-axis. Each panel marks the
    p50 / p90 / p99 (from ``percentile_markers``) with dotted vertical lines
    and text labels. Rows whose length is not positive are dropped from that
    panel. The figure title reports the aggregate input/output token ratio
    and ``len(rows)``.

    Args:
        rows: Trace records carrying ``input_length`` and ``output_length``.
        out_path: File the figure is saved to (format inferred from suffix).

    Side effects:
        Writes ``out_path`` and prints the percentile summary and I/O ratio
        to stdout. A panel with no positive values is drawn empty with a
        "no data" note rather than raising from ``np.quantile`` on an empty
        array.
    """
    inputs = np.array([r["input_length"] for r in rows if r["input_length"] > 0])
    outputs = np.array([r["output_length"] for r in rows if r["output_length"] > 0])
    fig, axes = plt.subplots(1, 2, figsize=(8.5, 3.2))
    summary = {}  # series name -> {quantile: value}; reused for the stdout report
    for ax, name, data, label, log in [
        (axes[0], "input", inputs, "input tokens (log scale)", True),
        (axes[1], "output", outputs, "output tokens", False),
    ]:
        if data.size:
            sorted_d = np.sort(data)
            cdf = np.arange(1, sorted_d.size + 1) / sorted_d.size
            ax.plot(sorted_d, cdf, color="#1f77b4", lw=1.6)
            if log:
                ax.set_xscale("log")
        ax.set_xlabel(label)
        ax.set_ylabel("CDF")
        ax.set_ylim(0, 1.02)
        ax.grid(True, alpha=0.3)
        if not data.size:
            ax.text(0.5, 0.5, "no data", transform=ax.transAxes,
                    ha="center", va="center", color="#444")
            continue
        pcts = percentile_markers(data)
        summary[name] = pcts
        for q, v in pcts.items():
            ax.axvline(v, color="#888", ls=":", lw=0.8)
            ax.annotate(
                # ":g" yields "p50"/"p90"/"p99" for the default quantiles but,
                # unlike int(q*100), does not truncate e.g. 0.29 to "p28".
                f"p{q * 100:g}={int(v):,}",
                xy=(v, q),
                xytext=(4, -8),
                textcoords="offset points",
                fontsize=8,
                color="#444",
            )
    # Zero-length rows contribute nothing to either sum; max(..., 1) only
    # guards against dividing by zero when there are no outputs at all.
    io_ratio = inputs.sum() / max(outputs.sum(), 1)
    fig.suptitle(
        f"Agentic workload I/O: aggregate ratio = {io_ratio:.1f}x "
        f"(N={len(rows)} requests, sampled from GLM-5.1)",
        fontsize=10,
    )
    fig.tight_layout(rect=(0, 0, 1, 0.94))
    fig.savefig(out_path, bbox_inches="tight")
    plt.close(fig)
    print(f"[C1a] wrote {out_path}")
    for name in ("input", "output"):
        pcts = summary.get(name)
        if pcts is None:
            print(f" {name} (no positive values)")
        else:
            print(f" {name} p50={int(pcts[0.5]):,} "
                  f"p90={int(pcts[0.9]):,} "
                  f"p99={int(pcts[0.99]):,}")
    print(f" aggregate I/O ratio = {io_ratio:.2f}x")
def reuse_decomposition(rows, block_size=None):
    """Split every KV-block token occurrence into four reuse categories.

    Requests are walked in the order given (``load_trace`` returns them
    sorted by timestamp). Each entry of a request's ``hash_ids`` is one block
    of ``block_size`` tokens. Each occurrence of a block is classified as
    follows:

    - first occurrence anywhere in the trace -> deferred (resolved below);
    - the current session has already emitted it -> intra-session reuse;
    - only other sessions have emitted it -> cross-session reuse, and the
      current session is then recorded as having seen it.

    After the pass, each block's first occurrence is resolved by its total
    occurrence count. A count of 1 means unique (never reused); a count
    above 1 means the first emission of a block that is reused later. The
    four categories therefore sum to ``block_size`` times the total number
    of hash_id occurrences in the trace.

    Session ids (resolution mirrors analyze_cache_hit.py):

    - A non-null ``session_id`` field is used as-is (stringified).
    - Otherwise, a root request (``parent_chat_id < 0``) starts a session
      named after its own ``chat_id``.
    - A child request inherits its parent's session. If the parent has not
      been seen yet, the child uses the parent's ``chat_id`` as the session
      name.

    A JSON-null ``session_id`` falls back to this derivation. Stringifying
    it instead would merge all such requests into one fake session "None"
    and inflate intra-session reuse.

    Args:
        rows: Trace records with ``chat_id`` and ``parent_chat_id``, and
            optionally ``session_id`` and ``hash_ids``.
        block_size: Tokens per block. ``None`` (the default) means the
            module-level ``BLOCK_SIZE``.

    Returns:
        dict[str, int]: Token counts keyed by
        ``intra_session_reuse_tokens``, ``cross_session_reuse_tokens``,
        ``first_emission_will_reuse_tokens`` and ``unique_no_reuse_tokens``.
    """
    if block_size is None:
        block_size = BLOCK_SIZE
    chat_to_session = {}  # chat_id -> resolved session id
    sessions_seen = {}  # hash_id -> set of session ids that have emitted it
    occurrences = Counter()  # hash_id -> total occurrences across the trace
    intra = 0
    cross = 0
    for r in rows:
        cid = int(r["chat_id"])
        pid = int(r["parent_chat_id"])
        sid = r.get("session_id")
        if sid is None:
            sid = str(cid) if pid < 0 else chat_to_session.get(pid, str(pid))
        sid = str(sid)
        chat_to_session[cid] = sid
        for hid in r.get("hash_ids") or []:
            occurrences[hid] += 1
            seen = sessions_seen.get(hid)
            if seen is None:
                # First emission anywhere; classified after the pass.
                sessions_seen[hid] = {sid}
            elif sid in seen:
                intra += block_size
            else:
                cross += block_size
                seen.add(sid)
    unique_blocks = sum(1 for count in occurrences.values() if count == 1)
    reused_blocks = len(occurrences) - unique_blocks
    return {
        "intra_session_reuse_tokens": intra,
        "cross_session_reuse_tokens": cross,
        "first_emission_will_reuse_tokens": reused_blocks * block_size,
        "unique_no_reuse_tokens": unique_blocks * block_size,
    }
def plot_reuse(rows, out_path):
    """Plot the KV-block reuse decomposition as one 100%-stacked bar (C1b).

    Segments, left to right, are: intra-session reuse, cross-session reuse,
    first emission of a block that is reused later, and blocks never reused.
    The categories come from ``reuse_decomposition``. Segments wider than
    2.5% of the bar get an inline label with their percentage, and a legend
    is placed below the axis.

    Args:
        rows: Trace records, as accepted by ``reuse_decomposition``.
        out_path: File the figure is saved to (format inferred from suffix).

    Side effects:
        Writes ``out_path`` and prints each category's share and token count.
        If the trace has no hash_ids at all (total of zero), it prints a
        notice and returns without writing a figure instead of raising
        ZeroDivisionError.
    """
    d = reuse_decomposition(rows)
    total = sum(d.values())
    if total == 0:
        print(f"[C1b] no KV blocks (hash_ids) in trace; skipped {out_path}")
        return
    parts = [
        ("intra-session reuse", d["intra_session_reuse_tokens"], "#2ca02c"),
        ("cross-session reuse", d["cross_session_reuse_tokens"], "#1f77b4"),
        ("first emission (reused later)", d["first_emission_will_reuse_tokens"], "#ff7f0e"),
        ("unique (never reused)", d["unique_no_reuse_tokens"], "#d62728"),
    ]
    fig, ax = plt.subplots(figsize=(8.5, 1.9))
    left = 0
    for label, val, color in parts:
        frac = val / total
        ax.barh(0, frac, left=left, color=color, edgecolor="white", height=0.6, label=label)
        if frac > 0.025:  # slivers are too narrow to hold inline text
            ax.text(left + frac / 2, 0,
                    f"{label}\n{frac*100:.1f}%",
                    ha="center", va="center", fontsize=8.5, color="white")
        left += frac
    ax.set_xlim(0, 1)
    ax.set_yticks([])
    # Derive the block size from the constant the decomposition actually used.
    ax.set_xlabel(f"share of total cacheable tokens (block-aligned, {BLOCK_SIZE} tok blocks)")
    ax.set_title("Where do prefix cache hits come from? "
                 f"(N={len(rows)} requests, sampled trace)")
    ax.legend(loc="upper center", bbox_to_anchor=(0.5, -0.45), ncol=4, fontsize=8, frameon=False)
    for spine in ("top", "right", "left"):
        ax.spines[spine].set_visible(False)
    fig.tight_layout()
    fig.savefig(out_path, bbox_inches="tight")
    plt.close(fig)
    print(f"[C1b] wrote {out_path}")
    for label, val, _ in parts:
        print(f" {label:40s} {val/total*100:5.1f}% ({val:>12,} tokens)")
def main(argv=None):
    """Command-line entry point: render the C1a and C1b figures for one trace.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. ``None``
            (the default) uses the real command line; pass a list to drive
            the script programmatically or from tests.

    Exits via ``sys.exit`` with a "trace not found" message if ``--trace``
    does not exist. The check happens before ``--outdir`` is created.
    """
    ap = argparse.ArgumentParser(description="C1 workload characterization figures.")
    ap.add_argument("--trace", default="traces/w600_r0.0015_st30.jsonl",
                    help="JSONL request trace (default: %(default)s)")
    ap.add_argument("--outdir", default="analysis/pd_sep_paper_section/figures",
                    help="output directory for the PDFs, created if missing "
                         "(default: %(default)s)")
    args = ap.parse_args(argv)
    trace = Path(args.trace)
    outdir = Path(args.outdir)
    # Validate the input before touching the filesystem so a mistyped
    # --trace does not leave an empty output directory behind.
    if not trace.exists():
        sys.exit(f"trace not found: {trace}")
    outdir.mkdir(parents=True, exist_ok=True)
    rows = load_trace(trace)
    print(f"loaded {len(rows)} requests from {trace}")
    plot_io_cdf(rows, outdir / "fig_c1a_io_cdf.pdf")
    plot_reuse(rows, outdir / "fig_c1b_reuse.pdf")


if __name__ == "__main__":
    main()