MB5 PD reuse-centric ablation: tooling, data, Fig 1-3

Three-axis controlled ablation of PD-colo vs PD-disagg on synthetic regular
traces (closed-loop, controlled reuse via REPLAY_NO_REALIZED_PREFIX) on the
clean stack (e13391e gated off).

  Axis 1 (Fig 1) -- reuse 6%->94% at N=8, in8192/out256
  Axis 2 (Fig 2) -- shape in2048/out2048 -> in32768/out64 at N=8, reuse~70%
  Axis 3 (Fig 3) -- concurrency N=8/16/32/64 at reuse~71%, in8192/out256

Findings:
  * APC parity colo=PD at every reuse (5.5/22/44/66/77/82%) -- contamination
    fix validated.
  * PD edge erodes 1.57x->1.10x with reuse; prefill GPUs strand 26%->9%.
  * Shape: PD-best peaks mid-sweep (1.34x at in8192/out512); wrong PD ratio
    catastrophic at prefill extreme (in32768/out64 pd2 = 378/400, p99 432s).
  * Concurrency: PD wins N<=32 (1.23-1.29x), TIPS at N=64 -- pd2/pd4
    crater (APC 71%->1.4%, TPS -30%) while colo scales cleanly.

Infrastructure:
  * replayer: --max-inflight-sessions, --inter-turn-think, --no-realized-prefix
    (env-defaulted via REPLAY_MAX_INFLIGHT, REPLAY_INTER_TURN_THINK_S,
    REPLAY_NO_REALIZED_PREFIX).
  * mb5_run.sh: writes bench_config.json + gpu_util.csv + run_window.json +
    instance_apc.txt + metrics.jsonl for bench_report/fig_agg ingest.
  * fig_agg.py: per-arm GPU role split + producer-side APC; --json mode.
  * gpu_util_report.py: companion per-GPU util report from gpu_util.csv.
  * partial_summary.py: stats from in-flight replay_metrics.jsonl
    (works before metrics.summary.json exists).

Data: analysis/mb5_pd_ablation/fig{1,2,3}.json (24 + 20 + 16 rows).
Figures: figs/mb5_pd_ablation/fig{1_reuse,2_shape,3_concurrency}_axis.png.
This commit is contained in:
2026-05-31 20:14:46 +08:00
parent a2111b6e18
commit fafc44da79
12 changed files with 389 additions and 9 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary file not shown.

After

Width:  |  Height:  |  Size: 122 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 146 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 158 KiB

View File

@@ -0,0 +1,140 @@
"""Aggregate a set of MB5 run dirs into one comparison table.
Pulls the three core metrics the analysis cares about, per run:
- E2E latency (from replay_metrics.summary.json: latency_stats_s)
- TPS (output tokens / wall_clock_s)
- GPU util by workers (gpu_util.csv over run_window, split prefill/decode by role)
plus honest reuse (producer-side APC from instance_apc.txt) and TTFT/TPOT for logs.
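Arm + GPU role split + producer APC ports are inferred from the dir name.
Legacy tags are listed below; the newer driver tags (8C-proxy, 6P+2D, 4P+4D,
2P+6D) map to the same layouts:
*_colo_* -> 8 kv_both ; apc ports 8000-8007 (all keep prefix)
*_pd6_* -> 6P+2D P0-5/D6-7 ; apc 8000-8005
*_pd_* -> 4P+4D P0-3/D4-7 ; apc 8000-8003 (note: legacy runs use "pd", not "pd4"; both are accepted)
*_pd2_* -> 2P+6D P0-1/D2-7 ; apc 8000-8001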
Usage: fig_agg.py [--json] <run_dir> [<run_dir> ...]
"""
from __future__ import annotations
import csv
import json
import re
import statistics
import sys
from pathlib import Path
def arm_of(name: str):
    """Infer the arm label, GPU role split and APC ports from a run-dir name.

    Returns a 4-tuple ``(arm, prefill_gpus, decode_gpus, apc_ports)``.  For the
    colocated arm all 8 GPUs are listed on the prefill side and the decode list
    is empty; a name that matches no known tag yields arm ``"?"`` with that same
    all-GPU layout.
    """
    def layout(arm, n_prefill):
        # The first n_prefill GPUs (and ports from 8000) are the prefill side;
        # the remaining GPUs up to index 7 are the decode side.
        return (arm, list(range(n_prefill)), list(range(n_prefill, 8)),
                list(range(8000, 8000 + n_prefill)))

    def has_tag(tag):
        # Legacy tags are "_"-delimited. Accept the tag as the last component
        # too, for every arm (previously only "_colo" was accepted there).
        return f"_{tag}_" in name or name.endswith(f"_{tag}")

    # New driver naming (run_conc.sh / run_reuse_fixed.sh): "..._<CONFIG>_rep<r>".
    if "8C-proxy" in name:
        return layout("colo", 8)
    for arm, n_prefill in (("6P+2D", 6), ("2P+6D", 2), ("4P+4D", 4)):
        if arm in name:
            return layout(arm, n_prefill)

    # Legacy naming (original May-30 corrected runs).
    if has_tag("colo"):
        return layout("colo", 8)
    if has_tag("pd6"):
        return layout("6P+2D", 6)
    if has_tag("pd2"):
        return layout("2P+6D", 2)
    if has_tag("pd4") or has_tag("pd"):
        return layout("4P+4D", 4)
    return layout("?", 8)
def util_split(run: Path, pgpus, dgpus):
    """Mean GPU utilisation of the prefill-side and decode-side GPUs of one run.

    Reads ``gpu_util.csv`` (columns ``timestamp``, ``gpu``, ``util_pct``) from
    *run* and, when ``run_window.json`` is present, keeps only samples whose
    timestamp lies within ``[t_start_unix, t_end_unix]``.  A bound missing from
    the window file leaves that side open instead of raising.

    Returns ``(prefill_mean, decode_mean)``; either is ``None`` when no sample
    exists for that group, and both are ``None`` when the CSV is absent.
    """
    win = {}
    wp = run / "run_window.json"
    if wp.exists():
        with open(wp) as fh:
            win = json.load(fh)
    t0, t1 = win.get("t_start_unix"), win.get("t_end_unix")
    csvp = run / "gpu_util.csv"
    if not csvp.exists():
        return None, None
    by = {}
    with open(csvp, newline="") as fh:
        for row in csv.DictReader(fh):
            try:
                ts = float(row["timestamp"])
                g = int(row["gpu"])
                u = float(row["util_pct"])
            except (ValueError, KeyError, TypeError):
                # Header repeats, torn rows, and short rows (DictReader fills
                # missing fields with None) are skipped.
                continue
            if t0 is not None and ts < t0:
                continue
            if t1 is not None and ts > t1:
                continue
            by.setdefault(g, []).append(u)
    pm = [v for g in pgpus for v in by.get(g, [])]
    dm = [v for g in dgpus for v in by.get(g, [])]
    return (statistics.fmean(pm) if pm else None,
            statistics.fmean(dm) if dm else None)
def apc(run: Path, ports):
    """Aggregate prefix-cache hit ratio over the given instance ports.

    Parses ``key=value`` tokens on each line of ``instance_apc.txt`` in *run*
    and sums ``queries`` and ``hits`` for lines whose ``port`` is in *ports*.
    Lines whose port or counters cannot be parsed as numbers are skipped.

    Returns ``hits / queries``, or ``None`` when the file is absent or the
    selected ports recorded no queries.
    """
    f = run / "instance_apc.txt"
    if not f.exists():
        return None
    wanted = set(ports)
    q = h = 0.0
    with open(f) as fh:
        for line in fh:
            m = dict(re.findall(r"(\w+)=(\S+)", line))
            try:
                p = int(m.get("port", -1))
                # Parse both counters before accumulating so a malformed line
                # can never contribute only one of them.
                line_q = float(m.get("queries", 0))
                line_h = float(m.get("hits", 0))
            except ValueError:
                continue
            if p in wanted:
                q += line_q
                h += line_h
    return (h / q) if q else None
def main():
    """Print one comparison row per run dir named on the command line.

    Every argument is a run directory, except ``--json`` (accepted anywhere),
    which switches the output from the fixed-width table to a JSON list of row
    dicts.  Run dirs without ``replay_metrics.summary.json`` are skipped and
    noted on stderr, so stdout stays a clean table or JSON document.
    """
    args = sys.argv[1:]
    as_json = "--json" in args
    args = [a for a in args if a != "--json"]
    rows = []
    for d in args:
        run = Path(d)
        sp = run / "replay_metrics.summary.json"
        if not sp.exists():
            print(f"{run.name}: replay_metrics.summary.json absent, skipped",
                  file=sys.stderr)
            continue
        with open(sp) as fh:
            s = json.load(fh)
        arm, pg, dg, ports = arm_of(run.name)
        # "or {}" also covers a section that is present but null.
        lat = s.get("latency_stats_s") or {}
        ttft = s.get("ttft_stats_s") or {}
        tpot = s.get("tpot_stats_s") or {}
        # A missing or zero wall clock falls back to 1.0 so the division below
        # is always defined.
        wall = s.get("wall_clock_s") or 1.0
        out = s.get("actual_output_tokens_stats") or {}
        n = s.get("success_count", 0)
        req = s.get("request_count", 0)
        # Total output tokens are reconstructed as count * mean.
        tot_out = (out.get("count") or 0) * (out.get("mean") or 0)
        tps = tot_out / wall
        pu, du = util_split(run, pg, dg)
        a = apc(run, ports)
        rows.append({
            "name": run.name, "arm": arm, "n": n, "req": req,
            "e2e_p50": lat.get("p50"), "e2e_p90": lat.get("p90"), "e2e_p99": lat.get("p99"),
            "e2e_mean": lat.get("mean"),
            "ttft_p90": ttft.get("p90"), "tpot_p99": tpot.get("p99"),
            "tps": tps, "wall": wall, "pu": pu, "du": du, "apc": a,
        })
    if as_json:
        print(json.dumps(rows))
        return

    def f(x, w=7, p=1):
        # Right-aligned fixed-point cell; anything non-numeric renders as "-".
        return f"{x:>{w}.{p}f}" if isinstance(x, (int, float)) else f"{'-':>{w}}"

    def scaled(x, k):
        # Keep a missing value missing, so it prints as "-" and not as 0.0.
        return x * k if isinstance(x, (int, float)) else None

    hdr = (f"{'run':<34}{'arm':>7}{'ok/req':>9}{'E2Ep50':>8}{'E2Ep90':>8}{'E2Ep99':>8}"
           f"{'TPS':>8}{'Putil':>7}{'Dutil':>7}{'APC%':>7}{'TTFTp90':>9}{'TPOTp99ms':>10}")
    print(hdr)
    print("-" * len(hdr))
    for r in sorted(rows, key=lambda r: r["name"]):
        print(f"{r['name']:<34}{r['arm']:>7}{str(r['n'])+'/'+str(r['req']):>9}"
              f"{f(r['e2e_p50'])}{f(r['e2e_p90'])}{f(r['e2e_p99'])}"
              f"{f(r['tps'],8,1)}{f(r['pu'])}{f(r['du'])}"
              f"{f(scaled(r['apc'], 100))}{f(r['ttft_p90'],9,2)}"
              f"{f(scaled(r['tpot_p99'], 1000),10,1)}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,71 @@
"""Per-GPU utilization report from gpu_util.csv (companion to bench_report.py).
bench_report's per-worker GPU util needs request routing (breakdown.json), which
the MB5 proxy doesn't log. But worker == GPU by index, and the prefill/decode role
split is fixed by config, so per-GPU util from gpu_util.csv directly answers
"GPU utils by workers" — and for PD it exposes the key signal: are the prefill-side
GPUs saturated while the decode-side idles (or vice versa, or stalled at ~0)?
Usage:
gpu_util_report.py <run_dir> [--prefill-gpus 0,1,2,3 --decode-gpus 4,5,6,7]
"""
from __future__ import annotations
import argparse
import csv
import json
import statistics
from pathlib import Path
def pct(xs, p):
    """Return the p-th percentile of *xs*, picked by rounded rank.

    The rank is ``round(p / 100 * (len - 1))`` into the sorted values, clamped
    to the valid index range.  Returns ``None`` for empty input.
    """
    ordered = sorted(xs)
    if not ordered:
        return None
    top = len(ordered) - 1
    rank = int(round(p / 100 * top))
    if rank < 0:
        rank = 0
    elif rank > top:
        rank = top
    return ordered[rank]
def main():
    """Print a per-GPU utilisation table for one run directory.

    Reads ``gpu_util.csv`` (columns ``timestamp``, ``gpu``, ``util_pct``,
    ``mem_used_mb``) and, when ``run_window.json`` exists, keeps only samples
    inside ``[t_start_unix, t_end_unix]``; a bound missing from the window file
    leaves that side open.  With ``--prefill-gpus`` / ``--decode-gpus``
    (comma-separated GPU indices) it also prints one aggregate line per role.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("run_dir", type=Path)
    ap.add_argument("--prefill-gpus", default="")
    ap.add_argument("--decode-gpus", default="")
    a = ap.parse_args()
    win = {}
    wp = a.run_dir / "run_window.json"
    if wp.exists():
        with open(wp) as fh:
            win = json.load(fh)
    t0, t1 = win.get("t_start_unix"), win.get("t_end_unix")
    csvp = a.run_dir / "gpu_util.csv"
    if not csvp.exists():
        print(f"{a.run_dir.name}: gpu_util.csv absent")
        return
    by_gpu = {}
    with open(csvp, newline="") as fh:
        for row in csv.DictReader(fh):
            try:
                ts = float(row["timestamp"])
                g = int(row["gpu"])
                u = float(row["util_pct"])
                m = float(row["mem_used_mb"])
            except (ValueError, KeyError, TypeError):
                # Torn rows and short rows (DictReader fills missing fields
                # with None) are skipped.
                continue
            if t0 is not None and ts < t0:
                continue
            if t1 is not None and ts > t1:
                continue
            slot = by_gpu.setdefault(g, {"u": [], "m": []})
            slot["u"].append(u)
            slot["m"].append(m)
    print(f"=== {a.run_dir.name}: per-GPU util over replay window ({sum(len(d['u']) for d in by_gpu.values())} samples) ===")
    print(f"{'gpu':>4}{'util_mean':>11}{'util_p90':>10}{'util_max':>10}{'mem_max_GB':>12}")
    for g in sorted(by_gpu):
        u, m = by_gpu[g]["u"], by_gpu[g]["m"]
        # mem_used_mb is divided by 1024 for the GB column.
        print(f"{g:>4}{statistics.fmean(u):>11.1f}{pct(u,90):>10.1f}{max(u):>10.1f}{max(m)/1024:>12.1f}")

    def agg(gpus, label):
        # Pool every sample of the listed GPUs; blank items are ignored so a
        # trailing comma or stray space does not crash int().
        gpus = [int(x) for x in gpus.split(",") if x.strip()]
        us = [v for g in gpus for v in by_gpu.get(g, {}).get("u", [])]
        if us:
            print(f" {label:<14} gpus={gpus} util mean={statistics.fmean(us):.1f}% p90={pct(us,90):.1f}% max={max(us):.1f}%")

    if a.prefill_gpus:
        agg(a.prefill_gpus, "prefill-side")
    if a.decode_gpus:
        agg(a.decode_gpus, "decode-side")


if __name__ == "__main__":
    main()

View File

@@ -69,6 +69,13 @@ run_one() {
source "${VENV}/bin/activate"
local replay_out="${rundir}/replay_metrics.jsonl"
mkdir -p "$(dirname "${replay_out}")"
# bench_report.py inputs: worker->gpu map (worker i == gpu i for every config;
# for PD, workers 0-3 are producers on gpu0-3, 4-7 consumers on gpu4-7).
printf '{"base_port":8000,"n_instances":8,"gpu_indices":[0,1,2,3,4,5,6,7]}\n' \
> "${rundir}/bench_config.json"
# per-GPU utilization timeseries over the replay window (2s sampling)
bash "${SCRIPT_DIR}/gpu_monitor.sh" "${rundir}/gpu_util.csv" 2 >/dev/null 2>&1 &
local GPU_MON=$!
local t0
t0=$(date +%s.%N)
if ! PYTHONPATH="${FRESH_ROOT}" python -m replayer \
@@ -82,6 +89,7 @@ run_one() {
t1=$(date +%s.%N)
local wall=$(python -c "print(${t1} - ${t0})")
echo "[mb5-run] REPLAY FAILED after ${wall} s; see ${OUT_ROOT}/${config}_rep${rep}_replay.log"
kill "${GPU_MON}" 2>/dev/null || true
bash "${LAUNCH}" stop > /dev/null 2>&1 || true
return 1
fi
@@ -91,6 +99,9 @@ run_one() {
wall_clock_s=$(python -c "print(${t1} - ${t0})")
echo "[mb5-run] replay done in ${wall_clock_s}s"
echo "${wall_clock_s}" > "${rundir}/wall_clock_s.txt"
kill "${GPU_MON}" 2>/dev/null || true
printf '{"t_start_unix":%s,"t_end_unix":%s}\n' "${t0}" "${t1}" > "${rundir}/run_window.json"
cp -f "${replay_out}" "${rundir}/metrics.jsonl" # bench_report.py expects metrics.jsonl
# Per-instance prefix-cache counters, scraped from each backend BEFORE
# teardown. For PD this is the only honest reuse signal: producer ports

View File

@@ -0,0 +1,98 @@
"""Compute a per-run summary directly from replay_metrics.jsonl (for partial / in-flight runs).
Used when the replayer hasn't completed (so replay_metrics.summary.json doesn't exist
yet) but enough records have streamed to disk to read out the per-arm result.
Also accepts a finished run's directory and prints the same one-line summary for
apples-to-apples comparison.
"""
from __future__ import annotations
import json
import re
import statistics
import sys
from pathlib import Path
def stats(xs):
    """Summarise a sample as count, mean and p50/p90/p99.

    p50 is the element at index ``n // 2`` of the sorted values; p90 and p99
    are the elements at ``int(q * (n - 1))``.  Returns ``None`` for an empty
    sample.
    """
    ordered = sorted(xs)
    count = len(ordered)
    if not count:
        return None

    def at(frac):
        # Floor of the fractional rank over the last valid index.
        return ordered[int(frac * (count - 1))]

    summary = {"n": count, "mean": statistics.fmean(ordered)}
    summary["p50"] = ordered[count // 2]
    summary["p90"] = at(0.9)
    summary["p99"] = at(0.99)
    return summary
def apc(run: Path, producer_ports):
    """Aggregate prefix-cache hit ratio over the producer ports.

    Parses ``key=value`` tokens on each line of ``instance_apc.txt`` in *run*
    and sums ``queries`` and ``hits`` for lines whose ``port`` is in
    *producer_ports*.  Lines whose port or counters cannot be parsed as numbers
    are skipped.

    Returns ``hits / queries``, or ``None`` when the file is absent or the
    selected ports recorded no queries.
    """
    f = run / "instance_apc.txt"
    if not f.exists():
        return None
    wanted = set(producer_ports)
    q = h = 0.0
    with open(f) as fh:
        for line in fh:
            m = dict(re.findall(r"(\w+)=(\S+)", line))
            try:
                p = int(m.get("port", -1))
                # Parse both counters before accumulating so a malformed line
                # can never contribute only one of them.
                line_q = float(m.get("queries", 0))
                line_h = float(m.get("hits", 0))
            except ValueError:
                continue
            if p in wanted:
                q += line_q
                h += line_h
    return (h / q) if q else None
def _load_records(path):
    """Parse a JSONL file into a list of dicts, skipping unusable lines."""
    recs = []
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                # An in-flight file can end in a half-written line.
                continue
            if isinstance(rec, dict):
                recs.append(rec)
    return recs


def _producer_ports(name):
    """Producer-side APC ports for a run-dir name (new driver or legacy tags)."""
    # New driver tags are checked first, then the legacy "_<tag>_" form;
    # anything else keeps the 4P+4D default.
    if "8C-proxy" in name:
        n_prod = 8
    elif "6P+2D" in name:
        n_prod = 6
    elif "2P+6D" in name:
        n_prod = 2
    elif "4P+4D" in name:
        n_prod = 4
    elif "_colo_" in name:
        n_prod = 8
    elif "_pd6_" in name:
        n_prod = 6
    elif "_pd2_" in name:
        n_prod = 2
    else:
        n_prod = 4
    return list(range(8000, 8000 + n_prod))


def _present(recs, key):
    """Values of *key* across *recs*, ignoring records where it is missing or None."""
    return [r[key] for r in recs if r.get(key) is not None]


def main():
    """Print a summary for each run directory given on the command line.

    Records come from the first of ``replay_metrics.partial.jsonl``,
    ``replay_metrics.jsonl`` or ``metrics.jsonl`` found in the run dir, so the
    summary works while a replay is still running.  Only records whose
    ``error`` is None count as successes.
    """
    # NOTE(review): 1214 is the hard-coded request count used for the progress
    # percentage -- presumably the size of the trace in use; confirm.
    target = 1214
    for d in sys.argv[1:]:
        run = Path(d)
        # Prefer the partial/live record files; fall back to metrics.jsonl.
        rec_path = None
        for fn in ("replay_metrics.partial.jsonl", "replay_metrics.jsonl", "metrics.jsonl"):
            p = run / fn
            if p.exists():
                rec_path = p
                break
        if rec_path is None:
            print(f"{run.name}: no records")
            continue
        recs = _load_records(rec_path)
        oks = [r for r in recs if r.get("error") is None]
        lat = stats(_present(oks, "latency_s"))
        ttft = stats(_present(oks, "ttft_s"))
        tpot = stats(_present(oks, "tpot_s"))
        out = sum(r.get("actual_output_tokens", r.get("output_length", 0)) for r in oks)
        ts = _present(oks, "t_dispatch_unix")
        tf = _present(oks, "t_finish_unix")
        # Throughput window: first successful dispatch to last successful finish.
        span = max(tf) - min(ts) if ts and tf else 0
        tps = out / span if span else 0
        a = apc(run, _producer_ports(run.name))
        print(f"{run.name}")
        print(f" n_ok={len(oks)}/{len(recs)}"
              + (f" (target=1214 -> {len(oks)*100/target:.1f}%)" if len(recs) < target else ""))
        if lat:
            print(f" E2E mean={lat['mean']:.2f} p50={lat['p50']:.2f} p90={lat['p90']:.2f} p99={lat['p99']:.2f}")
        if ttft:
            print(f" TTFT mean={ttft['mean']:.2f} p50={ttft['p50']:.2f} p90={ttft['p90']:.2f} p99={ttft['p99']:.2f}")
        if tpot:
            print(f" TPOT mean={tpot['mean']*1000:.1f}ms p90={tpot['p90']*1000:.1f}ms p99={tpot['p99']*1000:.1f}ms")
        print(f" output_tokens={out:.0f} span={span:.0f}s TPS={tps:.0f}")
        if a is not None:
            print(f" producer APC={a*100:.1f}%")


if __name__ == "__main__":
    main()

View File

@@ -30,12 +30,23 @@ def main() -> None:
default=float(_env_think) if _env_think else None,
help="Closed-loop think-time (s) after each turn completes; "
"ignore absolute trace schedule. Env: REPLAY_INTER_TURN_THINK_S")
p.add_argument("--no-realized-prefix",
action="store_true",
default=bool(os.environ.get("REPLAY_NO_REALIZED_PREFIX")),
help="Controlled-reuse mode: prompt = hash-built tokens only "
"(reuse set by hash_ids). Env: REPLAY_NO_REALIZED_PREFIX")
p.add_argument("--dispatch-mode", choices=["tracets", "thinktime"],
default=os.environ.get("REPLAY_DISPATCH_MODE", "tracets"),
help="tracets (Mode 1): absolute trace ts = max(prev_finished, ts). "
"thinktime (Mode 2): turn-k at prev_finished + "
"time_to_parent_chat. Env: REPLAY_DISPATCH_MODE")
p.add_argument("--request-timeout", type=float, default=600.0)
_env_maxdur = os.environ.get("REPLAY_MAX_DURATION")
p.add_argument("--max-duration", type=float,
default=float(_env_maxdur) if _env_maxdur else None,
help="Overall wall-clock deadline (s): cancel in-flight + write "
"summary (un-run turns counted as failures) to bound a "
"collapsed config's drain. Env: REPLAY_MAX_DURATION")
p.add_argument("--request-limit", type=int, default=None,
help="Limit number of requests to replay")
p.add_argument("-v", "--verbose", action="store_true")
@@ -56,7 +67,9 @@ def main() -> None:
request_limit=args.request_limit,
max_inflight_sessions=args.max_inflight_sessions,
inter_turn_think_s=args.inter_turn_think,
no_realized_prefix=args.no_realized_prefix,
dispatch_mode=args.dispatch_mode,
max_duration_s=args.max_duration,
)
results = asyncio.run(replay_trace(config))

View File

@@ -66,6 +66,13 @@ class ReplayConfig:
# max_inflight_sessions=N this is a stable N-user closed-loop (no open-loop
# runaway), so it removes the "immediate retrigger under load" artifact.
inter_turn_think_s: float | None = None
# Controlled-reuse mode: skip _apply_realized_prefix so each turn's prompt is
# exactly the hash-built tokens. Then prefix-cache reuse is governed solely by
# the generated hash_ids (shared prefix blocks hit, fresh delta blocks miss) —
# required for the reuse-fraction sweep, where realized-prefix would otherwise
# force every fixed-length turn to ≈ the prior turn (≈100% reuse regardless).
# Keep OFF (realized-prefix ON) for the real agentic trace.
no_realized_prefix: bool = False
# Dispatch timing for intra-session turns:
# "tracets" (Mode 1): fire at absolute trace timestamp -> effectively
# max(prev_finished, trace_ts); collapses think-time to 0 when
@@ -73,6 +80,25 @@ class ReplayConfig:
# "thinktime" (Mode 2): turn-1 at trace arrival; turn-k at
# prev_finished + time_to_parent_chat (real production gap).
dispatch_mode: str = "tracets"
# Overall wall-clock deadline for the whole replay (seconds). When exceeded,
# stop awaiting in-flight sessions, cancel them, and write the summary over
# whatever completed — un-run turns are counted as failures so completion%
# stays honest (request_count == full trace). None = no deadline (default,
# original behavior unchanged). Used to bound the slow drain of a collapsed
# config in a sweep. Env: REPLAY_MAX_DURATION.
max_duration_s: float | None = None
def _skipped_metric() -> "RequestMetrics":
    """Build the placeholder failure row for a turn cut off by max_duration.

    The row carries a non-None ``error`` and no latency/ttft/tpot values: it is
    meant to count toward request/error totals while staying out of the
    success-only percentiles.
    """
    marker = "deadline_skipped"
    fields = dict(
        request_id=marker,
        session_id="",
        turn_id=-1,
        trace_timestamp_s=0.0,
        input_length=0,
        output_length=0,
        request_type="skipped",
        effective_input_length=None,
        cached_tokens=0,
        latency_s=None,
        ttft_s=None,
        tpot_s=None,
        error=marker,
    )
    return RequestMetrics(**fields)
def _build_prompt_token_ids(req: TraceRequest) -> list[int]:
@@ -318,10 +344,9 @@ async def _run_session(
if elapsed < target_wall:
await asyncio.sleep(target_wall - elapsed)
token_ids = _apply_realized_prefix(
_build_prompt_token_ids(req),
realized_context,
)
token_ids = _build_prompt_token_ids(req)
if not config.no_realized_prefix:
token_ids = _apply_realized_prefix(token_ids, realized_context)
result = await _dispatch_request(
client=client, config=config, req=req,
prompt_token_ids=token_ids, sem=request_sem,
@@ -410,25 +435,44 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
trust_env=False,
limits=limits,
) as client:
states = [_SessionState(session_id=sid, turns=turns)
for sid, turns in sessions]
tasks = [
asyncio.create_task(_run_session(
state=_SessionState(session_id=sid, turns=turns),
config=config, client=client,
state=st, config=config, client=client,
request_sem=request_sem,
earliest_ts=earliest_ts, sweep_start=sweep_start,
sink=sink,
session_sem=session_sem,
))
for sid, turns in sessions
for st in states
]
all_results = await asyncio.gather(*tasks)
if config.max_duration_s and config.max_duration_s > 0:
_done, pending = await asyncio.wait(
tasks, timeout=config.max_duration_s)
if pending:
logger.warning(
"max_duration %.0fs reached: cancelling %d in-flight "
"session(s); un-run turns counted as failures",
config.max_duration_s, len(pending))
for t in pending:
t.cancel()
await asyncio.gather(*pending, return_exceptions=True)
else:
await asyncio.gather(*tasks)
finally:
sink.close()
sweep_elapsed = time.perf_counter() - sweep_start
post_metrics = await _snapshot_prefix_cache_metrics(config.endpoint_url)
flat = [m for group in all_results for m in group]
# Build from the session states (identical to the gather return in the
# uncapped path) so partially-completed (cancelled) sessions still contribute
# their finished turns; pad un-run turns as failures so request_count == trace.
flat = [m for st in states for m in st.metrics]
missing = n_requests - len(flat)
if missing > 0:
flat.extend(_skipped_metric() for _ in range(missing))
summary_path = config.output_path.with_suffix(".summary.json")
write_summary_json(summary_path, flat)