Migration transfer-cost study: KV transfer is slow on busy GPUs

MIGRATION_TRANSFER_COST.md: under real load, migration KV transfer runs at
~3 GB/s vs ~10 GB/s idle. Decomposed (instruments + MB6 microbench) into
~55% RDMA-actual (HBM/PCIe contention with running kernels: 7.6->4.0 GB/s)
+ ~45% control-plane GIL starvation during long prefills. Reproduced on a
fresh upstream venv (byte-identical transfer path) -> upstream/hardware
inherent, not our patch. Layerwise is the wrong lever; the tax is structural
on a loaded agentic cluster. Includes mb6_transfer_under_load + run_mb6,
instrument_dst_migration/mooncake, and the dst/transfer decomposition analyzers.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-29 11:53:01 +08:00
parent 67fcec7933
commit 1262c9c22e
11 changed files with 2055 additions and 0 deletions


@@ -0,0 +1,178 @@
# Why KV-transfer is slow during migration under real load
**Question.** EAR's unified+A+B routing beats migration (v3) on agentic
workloads. We wanted to know whether *layerwise* KV transfer would shrink
migration's overhead enough to make it viable. Investigating that led to a
sharper question: **in a real (loaded) cluster, when we migrate, the KV
transfer is already slow — the effective bandwidth is far below the
~10 GB/s wire rate. Why?**
This doc answers that with instrumented measurements.
**TL;DR.** Migration fires precisely when instances are *busy* (that's the
trigger). But on a busy instance, KV transfer runs at **~3 GB/s instead of
~10 GB/s**, because:
1. **The RDMA write itself slows ~2× under compute load** — GPU-direct RDMA
(`batch_transfer_sync_write`) contends with the running attention/MLP
kernels for **HBM and PCIe bandwidth**. (idle 7.6 GB/s → busy 4.0 GB/s)
2. **The connector's Python control plane gets GIL-starved** — mooncake's
ZMQ handshake + transfer orchestration run on asyncio threads inside the
engine process; when the engine's main thread is doing a long forward
pass (e.g. a 100k-token prefill), those threads stall for *seconds*.
Both are **inherent to upstream vLLM 0.18.1 + mooncake** (reproduced on a
clean fresh venv; the transfer path is byte-identical to upstream — our
patches did not cause this), and both get **worse**, not better, with
layerwise transfer. So the bandwidth gap is not a layerwise problem; it is a
*transfer-on-a-busy-GPU* problem.
---
## 1. Evidence chain
Three independent measurements, all on dash0 (8×H100, Qwen3-Coder-30B-A3B,
TP=1), Mooncake `kv_both`.
### 1a. Instrumented v3 trace replay — where does migration time go?
Run `outputs/b3_v3_fullbreak_20260528_0338/`. Instruments:
`instrument_dst_migration.py` (dst scheduler lifecycle) +
`instrument_mooncake.py` (connector internals: `send_blocks` RDMA,
`receive_kv` window, `ready_wait`).
25 migrations fired over the trace. Dst-side migration overhead
(`T_kv_pull` = scheduler marks `WAITING_FOR_REMOTE_KVS` → `finished_recving`):
| component | share | what it is |
|---|---:|---|
| RDMA-actual (`batch_transfer_sync_write`) | **55%** (55.2 s) | the real RDMA write |
| dst control-plane gap | **45%** (45.4 s) | scheduler↔receiver_loop dispatch + completion propagation |
| `ready_wait` (src KV not committed) | 0% | 25/25 already committed — **ruled out** |
- Pure RDMA aggregate rate: **2.03 GB/s** (vs MB2 idle 9.7 GB/s).
- RDMA rate **collapses with transfer size**: <3 GiB → 4–9.5 GB/s,
>5 GiB → 0.9–2.6 GB/s.
- The control-plane gap is **bimodal**: median 0.04 s, but a handful of
requests stall ~10 s. Those are small-KV transfers (0.18 s of actual RDMA)
whose `T_kv_pull` is 8–11 s, i.e. the dst's `receiver_loop` thread was
GIL-starved for ~10 s while the engine did a big forward pass.
> Earlier (pre-instrumentation) we wrongly attributed ~90% of migration
> overhead to "dst scheduler queueing" by estimating transfer at clean wire
> speed. With real instrumentation, dst *scheduler admission* is ~0
> (`T_admission_post_kv` = 0.003 s); the time is the transfer phase (RDMA +
> connector control plane), both degraded by instance busy-ness.
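
The share column in the 1a table is just each component's summed seconds divided by the summed `T_kv_pull`. A minimal sketch of that arithmetic, assuming the two per-component totals quoted in the table and treating `ready_wait` as 0 s (the helper name `phase_shares` is illustrative and not part of the analyzers):

```python
def phase_shares(seconds_by_phase):
    """Return each phase's fraction of the summed phase time."""
    total = sum(seconds_by_phase.values())
    if total <= 0:
        raise ValueError("total phase time must be positive")
    return {name: secs / total for name, secs in seconds_by_phase.items()}


# Summed seconds over the 25 migrations, copied from the table above.
# ready_wait is taken as 0.0 s because all 25 requests were already committed.
t_kv_pull_seconds = {
    "rdma_actual": 55.2,
    "control_plane_gap": 45.4,
    "ready_wait": 0.0,
}

shares = phase_shares(t_kv_pull_seconds)
for name, frac in shares.items():
    print(f"{name:<18} {frac * 100:5.1f} %")
```

The same helper can be pointed at any other per-phase totals (for example the `sum(s)` column printed by `analyze_dst_migration.py`) to check that the percentages add up.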
### 1b. MB6 controlled microbench — does busy-ness cause it?
`microbench/fresh_setup/mb6_transfer_under_load.py` + `run_mb6.sh`: 2
instances, transfer a fixed-size KV (prefill on A → migrate to B) while
holding *N* background decode streams on both. Sweep N.
Effective transfer bandwidth (65k-token KV ≈ 6 GiB), main venv:
| background load | 65k transfer | eff bandwidth |
|---|---:|---:|
| **0 (idle)** | 747 ms | **8.76 GB/s** |
| 8 (4/instance) | 2423 ms | 4.53 GB/s |
| **24 (12/instance)** | 2015 ms | **3.33 GB/s** |
Monotonic degradation with load. **The busy level (3.3 GB/s) matches the
v3 trace's 3.3 GB/s median exactly** — because agentic instances run
~10+ concurrent requests, i.e. the bg=24 regime.
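
To make the degradation concrete, the table's bandwidth column can be turned into a slowdown factor relative to idle. This is a small sketch using only the three effective-bandwidth values above; the function names are illustrative, not taken from `mb6_transfer_under_load.py`:

```python
# Effective bandwidth (GB/s) for the 65k-token transfer, keyed by the number
# of background decode streams. Values copied from the table above.
eff_bw_gbps = {0: 8.76, 8: 4.53, 24: 3.33}


def slowdown_vs_idle(bw_by_load, idle_key=0):
    """Slowdown factor of each load level relative to the idle bandwidth."""
    idle = bw_by_load[idle_key]
    return {load: idle / bw for load, bw in bw_by_load.items()}


def degrades_monotonically(bw_by_load):
    """True if bandwidth strictly decreases as background load increases."""
    bws = [bw_by_load[load] for load in sorted(bw_by_load)]
    return all(a > b for a, b in zip(bws, bws[1:]))


slow = slowdown_vs_idle(eff_bw_gbps)
for load in sorted(slow):
    print(f"bg={load:>2}: {eff_bw_gbps[load]:.2f} GB/s, {slow[load]:.2f}x slower than idle")
print("monotonic:", degrades_monotonically(eff_bw_gbps))
```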
Decomposing the 65k transfer into RDMA-actual vs control-plane:
| bg | RDMA rate | control-plane share |
|---|---:|---:|
| 0 (idle) | 7.56 GB/s | 13% |
| 8 | 4.07 GB/s | 11% |
| 24 (busy) | 3.97 GB/s | 15% |
In the clean microbench the **RDMA write itself is the dominant degrading
term** (7.6 → 4.0 GB/s). The ~10 s control-plane stalls seen in the trace
(1a) don't reproduce here because steady decode forward passes are short;
they require the long (100k-token) prefills that the real trace has.
### 1c. Fresh-venv comparison — is it our patch?
Same MB6 sweep on `agentic-kv-fresh/.venv` (clean upstream-style 0.18.1):
| bg | 65k eff (fresh) | 65k eff (main/patched) |
|---|---:|---:|
| 0 | 8.73 GB/s | 8.76 GB/s |
| 8 | 4.52 GB/s | 4.53 GB/s |
| 24 | 3.27 GB/s | 3.33 GB/s |
**Identical within noise.** Plus a static check: the v3 transfer path
(`send_kv_to_decode`, `_send_blocks`/`batch_transfer_sync_write`,
`_build_transfer_params`) is **byte-identical** to pristine upstream 0.18.1
(commit `445e491`); `receive_kv_from_single_worker` differs only by a 4-line
error branch. Our mooncake commits (`a7df84b` direct-read,
`ea51497` partial-prefill, `e3a1d70` read→push) only touch a *separate*
`direct_read` path that v3 does **not** use (v3 requests carry no
`direct_read` flag → normal push path).
**The slowdown is upstream/hardware-inherent, not introduced by us.**
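
One way to run a static check of this kind is to hash the exact source text of each named function in both trees and compare digests. The sketch below is an assumption about how such a check could be done, not the procedure actually used for this study; it matches functions by bare name (so same-named methods in different classes would collide), and the two source strings are toy stand-ins rather than real connector code.

```python
import ast
import hashlib


def function_digests(source, names):
    """Map each requested function name to the SHA-256 of its exact source text."""
    digests = {}
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name in names:
            segment = ast.get_source_segment(source, node)
            digests[node.name] = hashlib.sha256(segment.encode("utf-8")).hexdigest()
    return digests


def differing_functions(source_a, source_b, names):
    """Names whose source differs between the two files, or is missing from either."""
    a = function_digests(source_a, names)
    b = function_digests(source_b, names)
    return sorted(n for n in names if n not in a or n not in b or a[n] != b[n])


# Toy stand-ins for an upstream file and a patched file (hypothetical content).
upstream_src = (
    "def send(x):\n"
    "    return x + 1\n"
    "\n"
    "def recv(x):\n"
    "    return x\n"
)
patched_src = (
    "def send(x):\n"
    "    return x + 1\n"
    "\n"
    "def recv(x):\n"
    "    if x is None:\n"
    "        raise ValueError('empty')\n"
    "    return x\n"
)

changed = differing_functions(upstream_src, patched_src, {"send", "recv"})
print("functions that differ:", changed)
```

Because the comparison is on source text, a comment-only change inside a function also counts as a difference, which is the strict sense of "byte-identical" used here.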
---
## 2. Root cause
Migration in agentic serving transfers KV **between instances that are
concurrently busy with compute** — by construction, since v3 migrates *away
from* a busy host. On a busy instance:
- **HBM/PCIe contention (the dominant, irreducible part).** Mooncake's
transfer is GPU-direct RDMA: the NIC DMAs KV straight out of / into GPU
HBM. While the GPU runs attention+MLP kernels, those kernels saturate HBM
bandwidth, so the NIC's RDMA gets a smaller slice. Effective transfer
bandwidth roughly halves (7.6 → 4.0 GB/s at our load), and degrades
further for large multi-segment transfers.
- **Control-plane GIL starvation (secondary, bursty).** The connector runs
its ZMQ handshake + `send_kv_to_decode`/`receive_kv` orchestration on
asyncio threads (`sender_loop`/`receiver_loop`) *inside the engine
process*. A long forward pass (100k-token prefill) holds the GIL for
seconds, stalling those threads → multi-second dispatch gaps even when the
actual transfer is 0.2 s.
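
The starvation mechanism can be illustrated in isolation with a toy program: a helper thread that only needs the GIL briefly cannot run while the main thread sits inside one long call that never releases it. This is a sketch under stated assumptions, not a model of the connector: it assumes a standard GIL-enabled CPython build, uses `sum(range(...))` as a stand-in for a long forward pass, and the helper name `max_heartbeat_gap` is invented for the example.

```python
import threading
import time


def max_heartbeat_gap(main_thread_work, period_s=0.001):
    """Run main_thread_work() on the calling thread while a helper thread ticks.

    Returns the longest interval observed between two consecutive ticks.
    """
    stop = threading.Event()
    gaps = []

    def heartbeat():
        last = time.perf_counter()
        while not stop.is_set():
            time.sleep(period_s)        # sleeping releases the GIL
            now = time.perf_counter()   # running again requires the GIL
            gaps.append(now - last)
            last = now

    helper = threading.Thread(target=heartbeat, daemon=True)
    helper.start()
    time.sleep(0.02)                    # let the helper start ticking
    main_thread_work()
    time.sleep(0.02)                    # let the helper record the final gap
    stop.set()
    helper.join()
    return max(gaps, default=0.0)


def idle_work():
    time.sleep(0.2)                     # main thread does not hold the GIL


def gil_holding_work():
    sum(range(30_000_000))              # one long C-level call that keeps the GIL


idle_gap = max_heartbeat_gap(idle_work)
busy_gap = max_heartbeat_gap(gil_holding_work)
print(f"max helper gap, idle main thread       : {idle_gap * 1000:.1f} ms")
print(f"max helper gap, GIL-holding main thread: {busy_gap * 1000:.1f} ms")
```

On a GIL build the second number is expected to be roughly the duration of the long call, which is the same shape as a sub-second RDMA transfer whose dispatch waits behind a multi-second forward pass.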
MB2 measured 9.7 GB/s precisely because both endpoints were **idle**. The
real-workload gap is the difference between "idle benchmark" and "transfer
while the GPU is doing the day job."
---
## 3. Implication: layerwise is the wrong lever; migration's tax is largely irreducible
| lever | effect on the gap |
|---|---|
| **Model-level layerwise transfer** (push each layer's KV during prefill) | **Worse.** Prefill is the most HBM-intensive phase, so per-layer transfers contend *harder* for HBM (Cause 1); and they multiply the control-plane round-trips (Cause 2). |
| **Control-plane fix** (move mooncake orchestration off the GIL-contended threads / separate process) | Addresses only the bursty ~10 s stalls (~15% in the clean case, up to ~45% of the trace tail). Does **not** touch the HBM-contention half. |
| **Reduce bytes** (cache-aware target so less KV moves) | Helps linearly; v3 Mechanism B already does some. Orthogonal. |
| **Migrate to/from idle instances** | Would restore ~10 GB/s — but defeats the purpose (we migrate *because* the host is busy). |
The dominant cost (RDMA contending with compute for HBM on busy instances)
is a **hardware reality**, not a software bug we can patch away, and not
something layerwise improves. This reinforces
[UNIFIED_ABLATION.md](UNIFIED_ABLATION.md): the unified no-migration path
(A+B'+RaceFix) remains the right default; migration's transfer tax is
structural on a loaded agentic cluster.
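
The "reduce bytes" and "idle vs busy" rows can be compared with a back-of-envelope model: transfer time is bytes to move divided by effective bandwidth. The sketch below is a simplification, not a measurement; it assumes the ~6 GiB KV size and the idle/busy effective bandwidths from section 1b, and `cache_hit_fraction` is a hypothetical knob standing in for a cache-aware target.

```python
GIB = 2**30


def transfer_seconds(kv_gib, bw_gbps, cache_hit_fraction=0.0):
    """Estimated transfer time when only the non-cached part of the KV moves."""
    if not 0.0 <= cache_hit_fraction <= 1.0:
        raise ValueError("cache_hit_fraction must be within [0, 1]")
    bytes_to_move = kv_gib * GIB * (1.0 - cache_hit_fraction)
    return bytes_to_move / (bw_gbps * 1e9)


IDLE_BW_GBPS = 8.76   # MB6, bg=0
BUSY_BW_GBPS = 3.33   # MB6, bg=24

idle_s = transfer_seconds(6, IDLE_BW_GBPS)
busy_s = transfer_seconds(6, BUSY_BW_GBPS)
busy_half_cached_s = transfer_seconds(6, BUSY_BW_GBPS, cache_hit_fraction=0.5)

print(f"idle, full KV       : {idle_s:.2f} s")
print(f"busy, full KV       : {busy_s:.2f} s")
print(f"busy, 50% cache hit : {busy_half_cached_s:.2f} s")
```

In this model halving the bytes halves the time, but even a 50% cache hit on a busy instance stays slower than a full transfer between idle ones, which is why byte reduction is described as orthogonal rather than a fix.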
---
## 4. Repro / artifacts
- Instrumented v3 breakdown: `outputs/b3_v3_fullbreak_20260528_0338/unified_v3/`
(`transfer_decomp.txt`, `dst_migration_breakdown.{csv,png}`,
`transfer_rootcause.png`)
- MB6 main: `outputs/mb6_agentic-kv_20260528_0552/mb6_result.json`
- MB6 fresh: `outputs/mb6_fresh_20260528_0559/mb6_result.json`
- Instruments: `microbench/fresh_setup/instrument_dst_migration.py`,
`microbench/fresh_setup/instrument_mooncake.py`
- Microbench: `microbench/fresh_setup/mb6_transfer_under_load.py` +
`run_mb6.sh` (`VENV=… bash run_mb6.sh`)
- Analyzers: `analyze_dst_migration.py`, `analyze_transfer_decomp.py`
All instruments apply/revert cleanly via `--apply`/`--revert`; both venvs
were restored after the runs.


@@ -0,0 +1,333 @@
#!/usr/bin/env python3
"""Analyze dst-side migration breakdown for unified_v3 runs.
Joins the proxy `breakdown.json` (per-request route + phase timestamps)
with the dst engine per-PID logs written by
`instrument_dst_migration.py` (`dm_mig_pid<pid>.jsonl`), to attribute
each migration's dst-side wall-clock into:
  T_relay              proxy decode-sent → dst arrival
  T_admission_pre_kv   dst arrival → status=WAITING_FOR_REMOTE_KVS
                       (waiting in dst's scheduler queue before KV pull
                       is even initiated)
  T_kv_pull            WAITING_FOR_REMOTE_KVS → finished_recving
                       (the actual RDMA transfer + connector ack)
  T_admission_post_kv  finished_recving → first time in self.running
                       (KV ready, waiting for batch slot)
  T_first_iter         first scheduled → first generated token
                       (one decode-iter compute + sampler latency)
Layerwise transfer can at best eliminate T_kv_pull. Everything else is
queueing or compute that layerwise does not touch.
Usage:
  python analyze_dst_migration.py \
      --proxy-breakdown <RUNDIR>/breakdown.json \
      --dst-log-dir <DST_LOG_DIR>
      [--output <RUNDIR>/dst_migration_breakdown.csv]
      [--plot <RUNDIR>/dst_migration_breakdown.png]
"""
from __future__ import annotations
import argparse
import json
import math
import os
import re
import statistics
import sys
from pathlib import Path
def _core_req_id(rid: str) -> str:
    """Normalize a vLLM engine req_id back to the proxy's request_id.
    vLLM wraps the proxy id `S:T:U:N` as `cmpl-S:T:U:N-<dp_rank>-<hex>`.
    Strip the `cmpl-` prefix and the trailing `-<digits>-<hex>` suffix so
    it joins against the proxy `breakdown.json` request_id.
    """
    if not rid:
        return rid
    s = rid
    if s.startswith("cmpl-"):
        s = s[len("cmpl-"):]
    m = re.match(r"^(.*)-\d+-[0-9a-fA-F]+$", s)
    if m:
        s = m.group(1)
    return s
def _pct(vals: list[float], q: float) -> float:
    # Nearest-rank percentile; NaN for an empty sample.
    if not vals:
        return float("nan")
    vs = sorted(vals)
    i = max(0, min(len(vs) - 1, int(math.ceil(q * len(vs))) - 1))
    return vs[i]
def _summary(name: str, vals: list[float]) -> dict:
    if not vals:
        return {"name": name, "n": 0}
    return {
        "name": name,
        "n": len(vals),
        "mean_s": statistics.mean(vals),
        "p50_s": _pct(vals, 0.5),
        "p90_s": _pct(vals, 0.9),
        "p99_s": _pct(vals, 0.99),
        "max_s": max(vals),
        "sum_s": sum(vals),
    }
def load_dst_log(dst_log_dir: Path) -> dict[str, dict]:
    by_req: dict[str, dict] = {}
    found_files = sorted(dst_log_dir.glob("dm_mig_pid*.jsonl"))
    print(f"[analyze] dst log files: {len(found_files)} under {dst_log_dir}")
    for f in found_files:
        with f.open() as fh:
            for line in fh:
                try:
                    rec = json.loads(line)
                except Exception:
                    continue
                rid = rec.get("req_id")
                if not rid:
                    continue
                key = _core_req_id(rid)
                rec["_raw_req_id"] = rid
                # If a req shows up twice (shouldn't, but be safe), prefer the
                # one with t_first_token_unix populated.
                prev = by_req.get(key)
                if prev is None or (
                    rec.get("t_first_token_unix") and
                    not prev.get("t_first_token_unix")
                ):
                    by_req[key] = rec
    print(f"[analyze] unique dst records: {len(by_req)}")
    return by_req
def load_proxy_breakdown(path: Path) -> list[dict]:
    with path.open() as fh:
        data = json.load(fh)
    assert isinstance(data, list), f"unexpected breakdown.json shape: {type(data)}"
    return data
def decompose(proxy_recs: list[dict], dst_by_req: dict[str, dict]) -> list[dict]:
    """Build per-migration breakdown rows by joining proxy + dst by req_id."""
    def _diff(a, b):
        # Duration a - b in seconds, or None if either timestamp is missing.
        if a is None or b is None:
            return None
        return float(a) - float(b)
    rows: list[dict] = []
    migrations = [x for x in proxy_recs if x.get("route_class") == "PD_SEP_V2"]
    print(f"[analyze] proxy migrations: {len(migrations)} "
          f"(of {len(proxy_recs)} total requests)")
    miss_in_dst = 0
    missing_phases = 0
    for p in migrations:
        rid = p.get("request_id")
        dst = dst_by_req.get(rid)
        if dst is None:
            miss_in_dst += 1
            continue
        if dst.get("t_first_token_unix") is None:
            missing_phases += 1
            # still include the row; phases with a missing timestamp stay None
        t_decode_sent = p.get("t_decode_sent_unix")
        t_first_tok = p.get("t_first_token_unix")
        t_arrival = dst.get("t_arrival_unix")
        t_wait_kvs = dst.get("t_wait_for_kvs_unix")
        t_kv_done = dst.get("t_kv_recv_done_unix")
        t_first_sched = dst.get("t_first_scheduled_unix")
        t_first_tok_dst = dst.get("t_first_token_unix")
        arrival = dst.get("arrival_state") or {}
        # Explicit None check: instance index 0 is falsy, so `or` would drop it.
        target_idx = p.get("v3_target_idx")
        if target_idx is None:
            target_idx = p.get("v3_decode_target_idx")
        rows.append({
            "request_id": rid,
            "session_id": p.get("session_id"),
            "input_length": p.get("input_length"),
            "v3_new_local": p.get("v3_new_local"),
            "v3_target_idx": target_idx,
            "arrival_n_running": arrival.get("n_running"),
            "arrival_n_waiting": arrival.get("n_waiting"),
            "arrival_pending_prefill_tok": arrival.get("pending_prefill_tok"),
            "arrival_n_waiting_for_kvs": arrival.get("n_waiting_for_kvs"),
            # Phase durations (seconds)
            "T_proxy_total_dst_first_token_s": _diff(t_first_tok, t_decode_sent),
            "T_relay_s": _diff(t_arrival, t_decode_sent),
            "T_admission_pre_kv_s": _diff(t_wait_kvs, t_arrival),
            "T_kv_pull_s": _diff(t_kv_done, t_wait_kvs),
            "T_admission_post_kv_s": _diff(t_first_sched, t_kv_done),
            "T_first_iter_s": _diff(t_first_tok_dst, t_first_sched),
            # Raw timestamps for debugging
            "t_decode_sent_unix": t_decode_sent,
            "t_dst_arrival_unix": t_arrival,
            "t_dst_wait_for_kvs_unix": t_wait_kvs,
            "t_dst_kv_recv_done_unix": t_kv_done,
            "t_dst_first_scheduled_unix": t_first_sched,
            "t_dst_first_token_unix": t_first_tok_dst,
            "t_proxy_first_token_unix": t_first_tok,
        })
    print(f"[analyze] missing in dst log: {miss_in_dst}")
    print(f"[analyze] dst record incomplete (no t_first_token): {missing_phases}")
    return rows
def emit_summary(rows: list[dict]) -> None:
    if not rows:
        print("[analyze] no rows — nothing to summarize.")
        return
    phase_keys = [
        "T_proxy_total_dst_first_token_s",
        "T_relay_s",
        "T_admission_pre_kv_s",
        "T_kv_pull_s",
        "T_admission_post_kv_s",
        "T_first_iter_s",
    ]
    print()
    print("=" * 88)
    print(f"Migration dst-side phase breakdown (n_migrations={len(rows)})")
    print("=" * 88)
    print(f"{'phase':<36} {'n':>4} {'mean(s)':>9} {'p50':>8} {'p90':>8} "
          f"{'p99':>8} {'max':>8} {'sum(s)':>9}")
    print("-" * 88)
    for k in phase_keys:
        vals = [r[k] for r in rows if r.get(k) is not None]
        if not vals:
            print(f"{k:<36} {'n/a':>4}")
            continue
        s = _summary(k, vals)
        print(f"{k:<36} {s['n']:>4} {s['mean_s']:>9.3f} {s['p50_s']:>8.3f} "
              f"{s['p90_s']:>8.3f} {s['p99_s']:>8.3f} {s['max_s']:>8.3f} "
              f"{s['sum_s']:>9.2f}")
    print()
    print("Aggregate attribution (sum across all migrations):")
    sums = {}
    for k in ("T_relay_s", "T_admission_pre_kv_s", "T_kv_pull_s",
              "T_admission_post_kv_s", "T_first_iter_s"):
        sums[k] = sum(r[k] for r in rows if r.get(k) is not None)
    total = sum(sums.values())
    total_proxy = sum(r["T_proxy_total_dst_first_token_s"] for r in rows
                      if r.get("T_proxy_total_dst_first_token_s") is not None)
    print(f" decomposed sum : {total:>8.2f} s")
    print(f" proxy total sum : {total_proxy:>8.2f} s "
          f"(should be ~equal; gap = uninstrumented)")
    if total > 0:
        for k, v in sums.items():
            print(f" {k:<28} {v:>8.2f} s ({v/total*100:5.1f} %)")
    # Headline: "How much could layerwise save?"
    layerwise_addressable = sums.get("T_kv_pull_s", 0.0)
    queue_residual = sum(v for k, v in sums.items() if k != "T_kv_pull_s")
    print()
    print("Layerwise-addressable vs queue-residual:")
    print(f" T_kv_pull_s (addressable by layerwise) : {layerwise_addressable:>8.2f} s "
          f"({layerwise_addressable / total * 100 if total else 0:5.1f} %)")
    print(f" everything else (queue/admission/iter) : {queue_residual:>8.2f} s "
          f"({queue_residual / total * 100 if total else 0:5.1f} %)")
def write_csv(rows: list[dict], path: Path) -> None:
    import csv
    if not rows:
        path.write_text("")
        return
    fields = list(rows[0].keys())
    with path.open("w", newline="") as fh:
        w = csv.DictWriter(fh, fieldnames=fields)
        w.writeheader()
        w.writerows(rows)
    print(f"[analyze] wrote CSV: {path} (n={len(rows)})")
def maybe_plot(rows: list[dict], out_path: Path) -> None:
    try:
        import matplotlib
        matplotlib.use("Agg")
        import matplotlib.pyplot as plt
    except Exception as e:
        print(f"[analyze] matplotlib unavailable ({e}); skipping plot.")
        return
    if not rows:
        return
    rows_sorted = sorted(
        rows,
        key=lambda r: r.get("T_proxy_total_dst_first_token_s") or 0.0,
    )
    n = len(rows_sorted)
    idx = list(range(n))
    def col(k):
        # Missing phases are drawn as zero-height segments.
        return [(r.get(k) or 0.0) for r in rows_sorted]
    relay = col("T_relay_s")
    pre = col("T_admission_pre_kv_s")
    pull = col("T_kv_pull_s")
    post = col("T_admission_post_kv_s")
    first_iter = col("T_first_iter_s")
    fig, ax = plt.subplots(figsize=(11, 5))
    bot = [0.0] * n
    for vals, label, color in [
        (relay, "HTTP relay", "#cccccc"),
        (pre, "admission pre-KV", "#f4a261"),
        (pull, "KV pull (layerwise-addressable)", "#e76f51"),
        (post, "admission post-KV", "#2a9d8f"),
        (first_iter, "first decode iter", "#264653"),
    ]:
        ax.bar(idx, vals, bottom=bot, color=color, label=label, width=0.85)
        bot = [b + v for b, v in zip(bot, vals)]
    ax.set_xticks(idx)
    ax.set_xticklabels([str(i + 1) for i in idx], rotation=0, fontsize=8)
    ax.set_xlabel("Migrated request (sorted by total dst wait, ascending)")
    ax.set_ylabel("Time (s)")
    ax.set_title("Per-migration dst-side phase breakdown (v3 unified_v3 run)")
    ax.legend(loc="upper left", fontsize=9)
    ax.grid(axis="y", linestyle=":", alpha=0.5)
    fig.tight_layout()
    fig.savefig(out_path, dpi=120)
    plt.close(fig)
    print(f"[analyze] wrote plot: {out_path}")
def main() -> None:
    p = argparse.ArgumentParser()
    p.add_argument("--proxy-breakdown", type=Path, required=True)
    p.add_argument("--dst-log-dir", type=Path, required=True)
    p.add_argument("--output", type=Path, default=None,
                   help="CSV path (default: <run>/dst_migration_breakdown.csv)")
    p.add_argument("--plot", type=Path, default=None,
                   help="PNG path (default: <run>/dst_migration_breakdown.png)")
    args = p.parse_args()
    if not args.proxy_breakdown.is_file():
        sys.exit(f"missing proxy breakdown: {args.proxy_breakdown}")
    if not args.dst_log_dir.is_dir():
        sys.exit(f"missing dst log dir: {args.dst_log_dir}")
    run_dir = args.proxy_breakdown.parent
    out_csv = args.output or (run_dir / "dst_migration_breakdown.csv")
    out_png = args.plot or (run_dir / "dst_migration_breakdown.png")
    proxy_recs = load_proxy_breakdown(args.proxy_breakdown)
    dst_by_req = load_dst_log(args.dst_log_dir)
    rows = decompose(proxy_recs, dst_by_req)
    emit_summary(rows)
    write_csv(rows, out_csv)
    maybe_plot(rows, out_png)
if __name__ == "__main__":
    main()


@@ -0,0 +1,133 @@
#!/usr/bin/env python3
"""Per-migration log + per-instance summary for a v3 trace replay.
Reads <run_dir>/breakdown.json and <run_dir>/metrics.jsonl and emits:
  1. A row per migration showing src→dst, per-side state snapshots, and
     the resulting TTFT.
  2. Histograms: migrations received per inst, sent per inst, all
     (src→dst) pairs.
  3. Post-rotation tail: how many turns of migrated sessions ended up on
     each inst (downstream impact of rotation).
  4. Anti-hotspot signal: recent_mig_received_in_window at decision time.
Run any v3 replay through this to spot pathological clustering of
migrations on the same dst within a short window.
Usage:
  python analyze_migration_log.py <RUN_DIR>
where <RUN_DIR> contains breakdown.json + metrics.jsonl (i.e. the proxy's
per-policy output folder, e.g. .../b3_v3_20260527_1344/unified_v3).
"""
import json
import sys
from collections import Counter, defaultdict
from pathlib import Path
def main(run_dir: Path) -> None:
    with open(run_dir / "breakdown.json") as fh:
        bd = json.load(fh)
    # Index metrics by request_id, parsing each JSONL line once.
    m = {}
    with open(run_dir / "metrics.jsonl") as fh:
        for line in fh:
            rec = json.loads(line)
            m[rec["request_id"]] = rec
    mig = [e for e in bd if e.get("v3_migrate")]
    mig.sort(key=lambda x: x.get("t_decision_unix", 0))
    print(f"=== {len(mig)} migrations in {run_dir.name} ===\n")
    cols = (
        "#", "t_rel", "session", "turn",
        "src", "dst", "src_nreq", "src_dec_tok",
        "dst_nreq", "dst_cache", "dst_recent_recv",
        "inlen", "self_ttft_ms",
    )
    print(" " + " ".join(f"{c:>13}" for c in cols))
    print("-" * (15 * len(cols)))
    t0 = mig[0]["t_decision_unix"] if mig else 0
    for i, e in enumerate(mig):
        rid = e["request_id"]
        src_idx = e.get("v3_src_idx", e["chosen_idx"])
        dst_idx = e.get("v3_target_idx", -1)
        src_state = e.get("v3_src_state") or {}
        dst_state = e.get("v3_target_state") or {}
        cands = {c["idx"]: c for c in e.get("candidate_scores", [])}
        # Fall back to candidate_scores if dedicated v3_*_state fields aren't present.
        src_nreq = src_state.get("num_requests", cands.get(src_idx, {}).get("num_requests", "-"))
        src_dec_tok = src_state.get("ongoing_decode_tokens",
                                    cands.get(src_idx, {}).get("ongoing_decode_tokens", "-"))
        dst_nreq = dst_state.get("num_requests", cands.get(dst_idx, {}).get("num_requests", "-"))
        dst_cache = e.get("v3_target_cache_hit", dst_state.get("cache_hit_estimate", 0))
        dst_recent = e.get("v3_target_recent_received",
                           dst_state.get("recent_mig_received_in_window", "-"))
        inlen = e.get("input_length") or m.get(rid, {}).get("input_length", 0)
        ttft = m.get(rid, {}).get("ttft_s") or 0
        t_rel = e["t_decision_unix"] - t0
        turn = m.get(rid, {}).get("turn_id", "?")
        print(
            f" {i+1:>13} {t_rel:>13.1f} {e['session_id']:>13} {turn:>13} "
            f"{src_idx:>13} {dst_idx:>13} {src_nreq:>13} {src_dec_tok:>13} "
            f"{dst_nreq:>13} {dst_cache:>13} {dst_recent:>13} "
            f"{inlen:>13} {ttft*1000:>13.0f}"
        )
    # Aggregate counts
    print("\n=== Migrations TO each instance ===")
    to_count = Counter(e.get("v3_target_idx", -1) for e in mig)
    for idx in range(8):
        print(f" inst_{idx}: {to_count.get(idx, 0)} migrations received")
    print("\n=== Migrations FROM each instance ===")
    from_count = Counter(e.get("v3_src_idx", e["chosen_idx"]) for e in mig)
    for idx in range(8):
        print(f" inst_{idx}: {from_count.get(idx, 0)} migrations sent")
    print("\n=== Migration pairs (src→dst, count) ===")
    pair_count = Counter(
        (e.get("v3_src_idx", e["chosen_idx"]), e.get("v3_target_idx", -1))
        for e in mig
    )
    for (s, d), n in sorted(pair_count.items(), key=lambda x: -x[1]):
        print(f" {s}→{d}: {n}")
    print("\n=== Sessions migrating multiple times ===")
    sess_mig = defaultdict(list)
    for e in mig:
        sess_mig[e["session_id"]].append(
            (e.get("t_decision_unix", 0),
             e.get("v3_src_idx", e["chosen_idx"]),
             e.get("v3_target_idx", -1))
        )
    multi = {s: ev for s, ev in sess_mig.items() if len(ev) > 1}
    if not multi:
        print(" (none)")
    for sess, events in sorted(multi.items()):
        # Separate hops so consecutive indices cannot run together (e.g. "0->11->2").
        chain = ", ".join(f"{s}->{d}" for _, s, d in sorted(events))
        print(f" session {sess}: {chain}")
    # Recent-received hotspot signal — non-zero values mean the picker
    # accepted a target that recently got another migration.
    print("\n=== Anti-hotspot signal: dst.recent_mig_received_in_window ===")
    rec = [e.get("v3_target_recent_received", 0) for e in mig]
    if rec:
        nonzero = [v for v in rec if v]
        print(f" total migrations: {len(rec)}, "
              f"with recent_received > 0: {len(nonzero)}, "
              f"max recent_received: {max(rec)}")
    # Post-rotation tail: turns of migrated sessions after their LAST mig
    print("\n=== Post-rotation tail per inst (turns of migrated sessions after last mig) ===")
    tail = Counter()
    for sess, events in sess_mig.items():
        final_dst = sorted(events)[-1][2]
        last_t = max(t for t, _, _ in events)
        sess_turns = [mm for rid, mm in m.items() if mm["session_id"] == sess]
        tail[final_dst] += sum(1 for mm in sess_turns
                               if mm.get("t_dispatch_unix", 0) > last_t)
    for idx in range(8):
        print(f" inst_{idx}: {tail.get(idx, 0)} tail turns")
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("usage: analyze_migration_log.py <run_dir>", file=sys.stderr)
        sys.exit(1)
    main(Path(sys.argv[1]))


@@ -0,0 +1,237 @@
#!/usr/bin/env python3
"""Decompose migration KV-transfer time into RDMA-actual vs control-plane.
Joins three logs from an instrumented unified_v3 run:
  proxy breakdown.json — per-request route + phase timestamps
  dst_mig_log/dm_mig_pid*.jsonl — dst lifecycle (instrument_dst_migration.py)
      gives T_kv_pull = wait_for_kvs -> recv_done
  mooncake xfer/mb2_transfer_pid*.jsonl — connector internals
      (instrument_mooncake.py):
      send_blocks             : pure RDMA (total_bytes, duration_s)   [producer]
      receive_kv_enter/finish : consumer-observed transfer window     [consumer]
      ready_wait              : producer wait for src KV commit       [producer]
      send_kv_to_decode_enter : producer received the pull request    [producer]
Decisive question: of the 87% dst-side overhead that is T_kv_pull, how
much is the actual RDMA write (`send_blocks`) vs control-plane
(handshake / ready-wait / GIL starvation on the busy src)?
  - send_blocks bandwidth ~ wire (10 GB/s) AND << T_kv_pull
      => loss is control-plane; layerwise (which only moves WHEN the
         RDMA fires) will NOT fix it.
  - send_blocks bandwidth << wire
      => the RDMA write itself is slow (NIC / src-side servicing);
         characterize with a load microbench next.
Usage:
  python analyze_transfer_decomp.py \
      --proxy-breakdown <RUN>/unified_v3/breakdown.json \
      --dst-log-dir <RUN>/dst_mig_log \
      --xfer-log-dir <RUN>/xfer_log
"""
from __future__ import annotations
import argparse
import json
import math
import re
import statistics
import sys
from pathlib import Path
def _core_req_id(rid: str) -> str:
    # Strip vLLM's `cmpl-` prefix and `-<digits>-<hex>` suffix to recover the proxy id.
    if not rid:
        return rid
    s = rid
    if s.startswith("cmpl-"):
        s = s[len("cmpl-"):]
    m = re.match(r"^(.*)-\d+-[0-9a-fA-F]+$", s)
    if m:
        s = m.group(1)
    return s
def _pct(vals, q):
    # Nearest-rank percentile; NaN for an empty sample.
    if not vals:
        return float("nan")
    vs = sorted(vals)
    i = max(0, min(len(vs) - 1, int(math.ceil(q * len(vs))) - 1))
    return vs[i]
def _stat_line(name, vals, unit="s"):
    if not vals:
        print(f"{name:<34} n=0")
        return
    print(f"{name:<34} n={len(vals):>3} mean={statistics.mean(vals):>8.3f} "
          f"p50={_pct(vals,0.5):>8.3f} p90={_pct(vals,0.9):>8.3f} "
          f"max={max(vals):>8.3f} sum={sum(vals):>8.2f} {unit}")
def load_events(xfer_dir: Path):
    files = sorted(xfer_dir.glob("mb2_transfer_pid*.jsonl"))
    print(f"[xfer] log files: {len(files)} under {xfer_dir}")
    send_blocks, recv_enter, recv_finish, ready_wait, send_enter = [], [], [], [], []
    for f in files:
        pid = f.stem.replace("mb2_transfer_pid", "")
        with f.open() as fh:
            for line in fh:
                try:
                    e = json.loads(line)
                except Exception:
                    continue
                e["_pid"] = pid
                ev = e.get("event")
                if ev == "send_blocks":
                    send_blocks.append(e)
                elif ev == "receive_kv_enter":
                    recv_enter.append(e)
                elif ev == "receive_kv_finish":
                    recv_finish.append(e)
                elif ev == "ready_wait":
                    ready_wait.append(e)
                elif ev == "send_kv_to_decode_enter":
                    send_enter.append(e)
    print(f"[xfer] events: send_blocks={len(send_blocks)} "
          f"recv_enter={len(recv_enter)} recv_finish={len(recv_finish)} "
          f"ready_wait={len(ready_wait)} send_enter={len(send_enter)}")
    return send_blocks, recv_enter, recv_finish, ready_wait, send_enter
def main():
    p = argparse.ArgumentParser()
    p.add_argument("--proxy-breakdown", type=Path, required=True)
    p.add_argument("--dst-log-dir", type=Path, required=True)
    p.add_argument("--xfer-log-dir", type=Path, required=True)
    args = p.parse_args()
    for pth in (args.proxy_breakdown, args.dst_log_dir, args.xfer_log_dir):
        if not pth.exists():
            sys.exit(f"missing: {pth}")
    with args.proxy_breakdown.open() as fh:
        proxy = json.load(fh)
    migrations = [x for x in proxy if x.get("route_class") == "PD_SEP_V2"]
    mig_ids = {x.get("request_id") for x in migrations}
    print(f"[proxy] migrations: {len(migrations)} / {len(proxy)} total")
    # dst lifecycle: T_kv_pull per migration (core req id)
    dst_pull = {}
    for f in sorted(args.dst_log_dir.glob("dm_mig_pid*.jsonl")):
        with f.open() as fh:
            for line in fh:
                try:
                    r = json.loads(line)
                except Exception:
                    continue
                tw = r.get("t_wait_for_kvs_unix")
                td = r.get("t_kv_recv_done_unix")
                if tw and td:
                    dst_pull[_core_req_id(r.get("req_id"))] = td - tw
    sb, re_enter, re_finish, rw, se = load_events(args.xfer_log_dir)
    # ---- 1. Pure RDMA bandwidth from send_blocks (the decisive number) ----
    print("\n" + "=" * 90)
    print("1. PURE RDMA WRITE rate (`send_blocks` = batch_transfer_sync_write)")
    print("=" * 90)
    bws, durs, bytes_l = [], [], []
    for e in sb:
        b = e.get("total_bytes", 0)
        d = e.get("duration_s", 0)
        if d and d > 0 and b > 0:
            bws.append(b / 1e9 / d)
            durs.append(d)
            bytes_l.append(b)
    if bws:
        tot_b = sum(bytes_l)
        tot_d = sum(durs)
        print(f" send_blocks calls: {len(bws)}")
        print(f" total bytes moved : {tot_b/2**30:.2f} GiB")
        print(f" total RDMA time : {tot_d:.2f} s")
        print(f" AGGREGATE rate : {tot_b/1e9/tot_d:.2f} GB/s "
              f"(MB2 idle-src steady-state = ~9.7-10 GB/s)")
        _stat_line(" per-call rate (GB/s)", bws, unit="GB/s")
        _stat_line(" per-call duration", durs)
        # bandwidth vs size — small ops are latency-bound
        print("\n rate vs transfer size:")
        pairs = sorted(zip(bytes_l, bws))
        for b, w in pairs:
            bar = "#" * int(min(40, w * 4))
            print(f" {b/2**20:>8.1f} MiB {w:>6.2f} GB/s {bar}")
    else:
        print(" no send_blocks events with positive duration")
    # ---- 2. Producer ready-wait (src KV commit) ----
    print("\n" + "=" * 90)
    print("2. PRODUCER ready-wait (src KV not yet committed when pull arrived)")
    print("=" * 90)
    rw_vals = [e.get("ready_wait_s", 0) for e in rw if e.get("ready_wait_s") is not None]
    already = sum(1 for e in rw if e.get("ready_already_set"))
    _stat_line(" ready_wait", rw_vals)
    print(f" ready_already_set at entry: {already}/{len(rw)} "
          f"(if most are True, src commit is not the bottleneck)")
    # ---- 3. Consumer-observed receive_kv window ----
    print("\n" + "=" * 90)
    print("3. CONSUMER receive_kv window (enter->FINISH, ~most of T_kv_pull)")
    print("=" * 90)
    rf_vals = [e.get("duration_s", 0) for e in re_finish if e.get("duration_s")]
    _stat_line(" receive_kv duration", rf_vals)
    # ---- 4. Per-migration join: T_kv_pull vs receive_kv vs ready_wait ----
    print("\n" + "=" * 90)
    print("4. PER-MIGRATION join (T_kv_pull from dst vs connector internals)")
    print("=" * 90)
    # index connector events by core req id
    rf_by_req = {}
    for e in re_finish:
        for rid in e.get("req_ids", []):
            rf_by_req[_core_req_id(rid)] = e.get("duration_s")
    rw_by_req = {}
    for e in rw:
        rw_by_req[_core_req_id(e.get("d_req_id", ""))] = e.get("ready_wait_s")
    joined = 0
    sum_pull = sum_recv = sum_rw = 0.0
    rows = []
    for m in migrations:
        core = m.get("request_id")
        pull = dst_pull.get(core)
        recv = rf_by_req.get(core)
        rwv = rw_by_req.get(core)
        if pull is None and recv is None:
            continue
        joined += 1
        if pull:
            sum_pull += pull
        if recv:
            sum_recv += recv
        if rwv:
            sum_rw += rwv
        rows.append((core, m.get("input_length"), m.get("v3_target_cache_hit"),
                     pull, recv, rwv))
    print(f" joined migrations: {joined}")
    print(f" Σ T_kv_pull (dst) = {sum_pull:8.2f} s")
    print(f" Σ receive_kv (consumer) = {sum_recv:8.2f} s")
    print(f" Σ ready_wait (producer) = {sum_rw:8.2f} s")
    # The RDMA share: best-effort total send_blocks time
    sum_rdma = sum(durs) if durs else 0.0
    print(f" Σ send_blocks RDMA = {sum_rdma:8.2f} s (all transfers, "
          f"not just migrations)")
    if sum_pull > 0:
        print(f"\n RDMA-actual / T_kv_pull ≈ {sum_rdma/sum_pull*100:5.1f} %")
        print(f" ready-wait / T_kv_pull ≈ {sum_rw/sum_pull*100:5.1f} %")
        resid = sum_pull - sum_rdma - sum_rw
        print(f" control-plane residual ≈ {resid/sum_pull*100:5.1f} % "
              f"(handshake / ZMQ / GIL starvation)")
    print("\n per-migration detail:")
    print(f" {'req_id':<22} {'in_len':>7} {'dst_hit':>8} {'kv_pull':>8} "
          f"{'recv_kv':>8} {'rdy_wait':>8}")
    def fmt(v):
        # Two-decimal seconds, or a placeholder when the value is missing.
        return f"{v:.2f}" if v is not None else " --"
    for core, il, hit, pull, recv, rwv in sorted(
            rows, key=lambda r: -(r[3] or 0)):
        print(f" {core:<22} {str(il):>7} {str(hit):>8} {fmt(pull):>8} "
              f"{fmt(recv):>8} {fmt(rwv):>8}")
if __name__ == "__main__":
main()
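The share arithmetic printed at the end of the analyzer above can be sketched in isolation. This is a minimal illustration, not the analyzer itself: `decompose_pull_time` is a hypothetical helper name and the totals are made-up seconds. Because the `send_blocks` total covers all transfers rather than only migrations, the RDMA share is an upper bound and the residual can come out too small or even negative.

```python
def decompose_pull_time(sum_pull: float, sum_rdma: float, sum_rw: float) -> dict:
    """Split total T_kv_pull into RDMA, ready-wait and residual shares (percent)."""
    if sum_pull <= 0:
        raise ValueError("sum_pull must be positive")
    # Residual = whatever is left after RDMA time and producer ready-wait.
    resid = sum_pull - sum_rdma - sum_rw
    return {
        "rdma_pct": 100.0 * sum_rdma / sum_pull,
        "ready_wait_pct": 100.0 * sum_rw / sum_pull,
        "residual_pct": 100.0 * resid / sum_pull,
    }


# Hypothetical totals in seconds, chosen only to keep the arithmetic easy to follow.
shares = decompose_pull_time(sum_pull=8.0, sum_rdma=4.0, sum_rw=2.0)
for name, pct in shares.items():
    print(f"{name:>15} = {pct:5.1f} %")
```

The three shares always sum to 100 %, so a negative residual is a signal that the RDMA total includes transfers outside the joined migrations.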


@@ -0,0 +1,90 @@
#!/usr/bin/env bash
# v3 trace replay with dst-side migration breakdown instrumentation.
#
# Same trace + DR_FIX as `run_v3_replay.sh`, plus:
# - instrument_dst_migration.py applied to vLLM scheduler
# - DM_LOG_DIR exported to all 8 vLLM instances so per-PID
# dst-migration logs land in <RUNDIR>/dst_mig_log/
# - analyze_dst_migration.py runs on completion to print the
# T_kv_pull vs queue-residual decomposition
#
# Usage: bash run_v3_dst_breakdown.sh
set -uo pipefail
PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}"
TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}"
DATE="$(date +%Y%m%d_%H%M)"
OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/b3_v3_dstbreak_${DATE}}"
PYTHON="$PROJ_DIR/.venv/bin/python"
VLLM_ROOT="${VLLM_ROOT:-$PROJ_DIR/.venv/lib/python3.12/site-packages/vllm}"
DR_FIX_SCRIPT="$PROJ_DIR/microbench/connector_tax/cache_sweep/apply_direct_read_fix.py"
DM_INSTRUMENT="$PROJ_DIR/microbench/fresh_setup/instrument_dst_migration.py"
ANALYZE="$PROJ_DIR/microbench/connector_tax/cache_sweep/analyze_dst_migration.py"
mkdir -p "$OUTROOT"
DST_LOG_DIR="$OUTROOT/dst_mig_log"
mkdir -p "$DST_LOG_DIR"
echo "=== unified_v3 + dst-side migration breakdown ==="
echo "Trace : $TRACE"
echo "Out : $OUTROOT"
echo "DST logs : $DST_LOG_DIR"
echo ""
cleanup_all() {
    pkill -9 -f cache_aware_proxy 2>/dev/null || true
    pkill -9 -f "vllm serve" 2>/dev/null || true
    pkill -9 -f "EngineCore" 2>/dev/null || true
    sleep 5
    "$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true
    "$PYTHON" "$DM_INSTRUMENT" --revert --venv "$PROJ_DIR/.venv" 2>/dev/null || true
}
trap cleanup_all EXIT
cleanup_all
echo "[stage 0a] applying CT_DR_FIX (env-gated)"
"$PYTHON" "$DR_FIX_SCRIPT" --apply --vllm-root "$VLLM_ROOT"
echo "[stage 0b] applying DST migration instrumentation"
"$PYTHON" "$DM_INSTRUMENT" --apply --venv "$PROJ_DIR/.venv"
"$PYTHON" "$DM_INSTRUMENT" --check --venv "$PROJ_DIR/.venv"
cfg_dir="$OUTROOT/unified_v3"
mkdir -p "$cfg_dir"
# Activate DR-fix env gate (consistent with run_v3_replay.sh)
export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1
# Export DM_LOG_DIR — every vLLM EngineCore inherits this env and writes
# its own dm_mig_pid<pid>.jsonl into it.
export DM_LOG_DIR="$DST_LOG_DIR"
echo ""
echo "====== unified_v3 ; DR_SYNC_DISABLED=1 ; DM_LOG_DIR=$DST_LOG_DIR ======"
bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "unified_v3" "$TRACE" "$cfg_dir" \
2>&1 | tee "$cfg_dir/orchestrator.log" | tail -30
pkill -9 -f cache_aware_proxy 2>/dev/null || true
pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true
sleep 5
echo ""
echo "[stage Z] reverting DR_FIX + DM instrument"
"$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT"
"$PYTHON" "$DM_INSTRUMENT" --revert --venv "$PROJ_DIR/.venv"
echo ""
echo "[stage analyze] dst-side migration breakdown"
"$PYTHON" "$ANALYZE" \
--proxy-breakdown "$cfg_dir/breakdown.json" \
--dst-log-dir "$DST_LOG_DIR" \
--output "$cfg_dir/dst_migration_breakdown.csv" \
--plot "$cfg_dir/dst_migration_breakdown.png" \
2>&1 | tee "$cfg_dir/dst_migration_breakdown.txt"
echo ""
echo "Done."
echo " proxy breakdown : $cfg_dir/breakdown.json"
echo " dst per-PID log : $DST_LOG_DIR/"
echo " decomposition : $cfg_dir/dst_migration_breakdown.{csv,png,txt}"
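For orientation, here is a rough sketch of how the per-PID logs written under `DM_LOG_DIR` might be read back. The field names come from the `instrument_dst_migration.py` docstring; treating T_kv_pull as `t_kv_recv_done_unix - t_wait_for_kvs_unix` is an assumption, since `analyze_dst_migration.py` is not shown here. The helper names and the demo records are hypothetical.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path


def load_dst_records(log_dir: Path) -> list[dict]:
    """Read every dm_mig_pid<pid>.jsonl under log_dir into one flat list."""
    records = []
    for path in sorted(log_dir.glob("dm_mig_pid*.jsonl")):
        with path.open("r", encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if line:
                    records.append(json.loads(line))
    return records


def kv_pull_seconds(rec: dict) -> float | None:
    """Assumed KV pull window: WAITING_FOR_REMOTE_KVS start -> receive done."""
    start = rec.get("t_wait_for_kvs_unix")
    done = rec.get("t_kv_recv_done_unix")
    if start is None or done is None:
        return None  # partial record, e.g. flushed on abort
    return done - start


# Demo with two synthetic records (all values are made up).
with tempfile.TemporaryDirectory() as tmp:
    log_dir = Path(tmp)
    (log_dir / "dm_mig_pid101.jsonl").write_text(
        json.dumps({"req_id": "a", "t_wait_for_kvs_unix": 10.0,
                    "t_kv_recv_done_unix": 12.5}) + "\n")
    (log_dir / "dm_mig_pid102.jsonl").write_text(
        json.dumps({"req_id": "b", "t_wait_for_kvs_unix": 20.0,
                    "t_kv_recv_done_unix": None}) + "\n")
    pulls = {r["req_id"]: kv_pull_seconds(r) for r in load_dst_records(log_dir)}

print(pulls)
```

Records flushed with `flush_reason="finish_or_abort"` may lack some timestamps, which is why the helper returns `None` instead of raising.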


@@ -0,0 +1,102 @@
#!/usr/bin/env bash
# v3 trace replay with FULL migration instrumentation:
# - instrument_dst_migration.py : dst lifecycle -> T_kv_pull
# - instrument_mooncake.py : connector internals (send_blocks RDMA,
# receive_kv window, ready_wait)
# Goal: decompose the 87% T_kv_pull into RDMA-actual vs control-plane to
# explain why effective bandwidth is far below the ~10 GB/s wire rate.
#
# Usage: bash run_v3_full_breakdown.sh
set -uo pipefail
PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}"
TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}"
DATE="$(date +%Y%m%d_%H%M)"
OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/b3_v3_fullbreak_${DATE}}"
PYTHON="$PROJ_DIR/.venv/bin/python"
VENV="$PROJ_DIR/.venv"
VLLM_ROOT="${VLLM_ROOT:-$VENV/lib/python3.12/site-packages/vllm}"
DR_FIX="$PROJ_DIR/microbench/connector_tax/cache_sweep/apply_direct_read_fix.py"
DM_INSTR="$PROJ_DIR/microbench/fresh_setup/instrument_dst_migration.py"
MC_INSTR="$PROJ_DIR/microbench/fresh_setup/instrument_mooncake.py"
ANALYZE_DST="$PROJ_DIR/microbench/connector_tax/cache_sweep/analyze_dst_migration.py"
ANALYZE_XFER="$PROJ_DIR/microbench/connector_tax/cache_sweep/analyze_transfer_decomp.py"
mkdir -p "$OUTROOT"
DST_LOG_DIR="$OUTROOT/dst_mig_log"
XFER_LOG_DIR="$OUTROOT/xfer_log"
mkdir -p "$DST_LOG_DIR" "$XFER_LOG_DIR"
echo "=== unified_v3 + FULL migration breakdown ==="
echo "Out : $OUTROOT"
echo "DST logs : $DST_LOG_DIR"
echo "XFER logs: $XFER_LOG_DIR"
echo ""
cleanup_all() {
    pkill -9 -f cache_aware_proxy 2>/dev/null || true
    pkill -9 -f "vllm serve" 2>/dev/null || true
    pkill -9 -f "EngineCore" 2>/dev/null || true
    sleep 5
    "$PYTHON" "$DR_FIX" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true
    "$PYTHON" "$DM_INSTR" --revert --venv "$VENV" 2>/dev/null || true
    "$PYTHON" "$MC_INSTR" --revert --venv "$VLLM_ROOT/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py" 2>/dev/null || true
}
trap cleanup_all EXIT
cleanup_all
echo "[0a] DR_FIX"
"$PYTHON" "$DR_FIX" --apply --vllm-root "$VLLM_ROOT"
echo "[0b] DST migration instrument"
"$PYTHON" "$DM_INSTR" --apply --venv "$VENV"
echo "[0c] Mooncake transfer instrument"
"$PYTHON" "$MC_INSTR" --apply --venv "$VLLM_ROOT/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py"
"$PYTHON" "$DM_INSTR" --check --venv "$VENV"
"$PYTHON" "$MC_INSTR" --check --venv "$VLLM_ROOT/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py"
cfg_dir="$OUTROOT/unified_v3"
mkdir -p "$cfg_dir"
export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1
export DM_LOG_DIR="$DST_LOG_DIR"
export MB2_LOG_DIR="$XFER_LOG_DIR"
echo ""
echo "====== unified_v3 ; DM_LOG_DIR + MB2_LOG_DIR set ======"
bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "unified_v3" "$TRACE" "$cfg_dir" \
2>&1 | tee "$cfg_dir/orchestrator.log" | tail -25
pkill -9 -f cache_aware_proxy 2>/dev/null || true
pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true
sleep 5
echo ""
echo "[Z] revert all instruments"
"$PYTHON" "$DR_FIX" --revert --vllm-root "$VLLM_ROOT"
"$PYTHON" "$DM_INSTR" --revert --venv "$VENV"
"$PYTHON" "$MC_INSTR" --revert --venv "$VLLM_ROOT/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py"
echo ""
echo "[analyze 1] dst-side T_kv_pull breakdown"
"$PYTHON" "$ANALYZE_DST" \
--proxy-breakdown "$cfg_dir/breakdown.json" \
--dst-log-dir "$DST_LOG_DIR" \
--output "$cfg_dir/dst_migration_breakdown.csv" \
--plot "$cfg_dir/dst_migration_breakdown.png" \
2>&1 | tee "$cfg_dir/dst_migration_breakdown.txt" || echo "(dst analyze failed)"
echo ""
echo "[analyze 2] transfer decomposition: RDMA-actual vs control-plane"
"$PYTHON" "$ANALYZE_XFER" \
--proxy-breakdown "$cfg_dir/breakdown.json" \
--dst-log-dir "$DST_LOG_DIR" \
--xfer-log-dir "$XFER_LOG_DIR" \
2>&1 | tee "$cfg_dir/transfer_decomp.txt" || echo "(xfer analyze failed)"
echo ""
echo "Done. Artifacts in $cfg_dir/"
echo " dst_migration_breakdown.{csv,png,txt}"
echo " transfer_decomp.txt"
echo " raw: $DST_LOG_DIR/ $XFER_LOG_DIR/"


@@ -0,0 +1,198 @@
"""SLO-goodput analyzer + PD_advantage for the PD-disagg crossover study.
Reads per-arm replayer output (replay_metrics.jsonl) and computes, per arm:
- completion rate (error-free fraction)
- raw TTFT / TPOT / E2E percentiles (over successes — reported for context
only; NEVER the verdict metric, since failing arms have a small success set)
- SLO-goodput: fraction of OFFERED requests that are error-free AND meet a
(TTFT, TPOT) SLO. This is the verdict metric.
The two arms must replay the IDENTICAL trace (same seed), so they are paired
request-for-request. PD_advantage = goodput(arm) / goodput(baseline); y=1 is
the crossover line — PD_advantage >= 1 means PD-disagg wins.
Goodput is computed over a grid of SLO thresholds so the conclusion does not
hinge on one arbitrary cutoff.
Usage:
python analyze_goodput.py \
--arm 8C-proxy .../8C-proxy/replay_metrics.jsonl \
--arm 4P+4D .../4P+4D/replay_metrics.jsonl \
--baseline 8C-proxy \
--ttft-slo 0.5 1 2 5 --tpot-slo 0.05 0.1 0.2
"""
from __future__ import annotations
import argparse
import json
import statistics
from pathlib import Path
def load_metrics(path: Path) -> list[dict]:
    rows = []
    with path.open("r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if line:
                rows.append(json.loads(line))
    return rows


def percentile(sorted_vals: list[float], pct: float) -> float:
    n = len(sorted_vals)
    if n == 0:
        return float("nan")
    if n == 1:
        return sorted_vals[0]
    rank = pct * (n - 1)
    lo = int(rank)
    hi = min(lo + 1, n - 1)
    frac = rank - lo
    return sorted_vals[lo] * (1 - frac) + sorted_vals[hi] * frac


def pstats(vals: list[float]) -> dict:
    clean = sorted(v for v in vals if v is not None)
    if not clean:
        return {"count": 0}
    return {
        "count": len(clean),
        "mean": statistics.fmean(clean),
        "p50": percentile(clean, 0.50),
        "p90": percentile(clean, 0.90),
        "p99": percentile(clean, 0.99),
    }


def offered_window_s(rows: list[dict]) -> float:
    ts = [r.get("trace_timestamp_s") for r in rows if r.get("trace_timestamp_s") is not None]
    if len(ts) < 2:
        return 0.0
    return max(ts) - min(ts)


def meets_slo(r: dict, ttft_slo: float, tpot_slo: float) -> bool:
    if r.get("error") is not None:
        return False
    ttft = r.get("ttft_s")
    tpot = r.get("tpot_s")
    if ttft is None:
        return False
    if ttft > ttft_slo:
        return False
    # A missing tpot (None) or tpot=0 (single-token outputs) is treated as
    # meeting any TPOT SLO; only a measured tpot above the threshold fails.
    if tpot is not None and tpot > tpot_slo:
        return False
    return True


def load_summary(jsonl_path: Path) -> dict:
    """Read the sibling replay_metrics.summary.json (wall-clock, amplification)."""
    sp = jsonl_path.with_suffix(".summary.json")
    if sp.exists():
        try:
            return json.loads(sp.read_text())
        except Exception:
            return {}
    return {}


def summarize_arm(name: str, jsonl_path: Path, rows: list[dict]) -> dict:
    n = len(rows)
    ok = [r for r in rows if r.get("error") is None]
    window = offered_window_s(rows)
    summ = load_summary(jsonl_path)
    return {
        "name": name,
        "n_offered": n,
        "n_success": len(ok),
        "completion_rate": len(ok) / n if n else 0.0,
        "offered_window_s": window,
        "offered_qps": n / window if window > 0 else 0.0,
        # Throughput: how much longer than the offered window it took to drain.
        # ~1.0 = keeps up; >1 = falling behind (the cleanest PD-collapse signal).
        "wall_clock_s": summ.get("wall_clock_s"),
        "amplification": summ.get("amplification"),
        "ttft": pstats([r.get("ttft_s") for r in ok]),
        "tpot": pstats([r.get("tpot_s") for r in ok]),
        "e2e": pstats([r.get("latency_s") for r in ok]),
        "_rows": rows,
    }
def main() -> None:
    p = argparse.ArgumentParser(description=__doc__,
                                formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("--arm", nargs=2, action="append", metavar=("NAME", "PATH"),
                   required=True, help="arm name + replay_metrics.jsonl path (repeatable)")
    p.add_argument("--baseline", required=True, help="arm name to use as PD_advantage denominator")
    p.add_argument("--ttft-slo", nargs="+", type=float, default=[0.5, 1.0, 2.0, 5.0])
    p.add_argument("--tpot-slo", nargs="+", type=float, default=[0.05, 0.1, 0.2])
    p.add_argument("--out-json", type=Path, default=None)
    args = p.parse_args()

    arms = {}
    for name, path in args.arm:
        arms[name] = summarize_arm(name, Path(path), load_metrics(Path(path)))
    if args.baseline not in arms:
        raise SystemExit(f"baseline {args.baseline!r} not among arms {list(arms)}")

    # ---- per-arm overview ------------------------------------------------
    print("=" * 78)
    print("PER-ARM OVERVIEW (latency stats over successes only — context, not verdict)")
    print("=" * 78)
    hdr = f"{'arm':<12}{'offered':>8}{'compl%':>8}{'ampl':>6}{'oQPS':>7}" \
          f"{'TTFTp50':>9}{'TTFTp90':>9}{'TPOTp50':>9}{'TPOTp99':>9}{'E2Ep90':>9}"
    print(hdr)
    for name, a in arms.items():
        t, tp, e = a["ttft"], a["tpot"], a["e2e"]
        ampl = a.get("amplification")
        ampl_s = f"{ampl:>6.2f}" if isinstance(ampl, (int, float)) else f"{'--':>6}"
        print(f"{name:<12}{a['n_offered']:>8}{100*a['completion_rate']:>7.1f}%"
              f"{ampl_s}{a['offered_qps']:>7.2f}"
              f"{t.get('p50', float('nan')):>9.2f}{t.get('p90', float('nan')):>9.2f}"
              f"{1000*tp.get('p50', float('nan')):>8.0f}m{1000*tp.get('p99', float('nan')):>8.0f}m"
              f"{e.get('p90', float('nan')):>9.2f}")

    # ---- SLO-goodput grid + PD_advantage --------------------------------
    base = arms[args.baseline]
    grid = []
    print()
    print("=" * 78)
    print("SLO-GOODPUT (attainment = error-free AND TTFT<=slo AND TPOT<=slo)")
    print(f"PD_advantage = attainment(arm) / attainment(baseline={args.baseline}); "
          f">=1 means arm wins")
    print("=" * 78)
    for ttft_slo in args.ttft_slo:
        for tpot_slo in args.tpot_slo:
            row = {"ttft_slo_s": ttft_slo, "tpot_slo_s": tpot_slo, "arms": {}}
            base_n = sum(1 for r in base["_rows"] if meets_slo(r, ttft_slo, tpot_slo))
            base_att = base_n / base["n_offered"] if base["n_offered"] else 0.0
            line = f"TTFT<={ttft_slo:>4}s TPOT<={int(1000*tpot_slo):>4}ms | "
            cells = []
            for name, a in arms.items():
                n_slo = sum(1 for r in a["_rows"] if meets_slo(r, ttft_slo, tpot_slo))
                att = n_slo / a["n_offered"] if a["n_offered"] else 0.0
                adv = (att / base_att) if base_att > 0 else float("nan")
                row["arms"][name] = {"attainment": att, "pd_advantage": adv, "n_slo": n_slo}
                tag = "" if name == args.baseline else f" adv={adv:.2f}"
                cells.append(f"{name}={100*att:>5.1f}%{tag}")
            print(line + " ".join(cells))
            grid.append(row)

    if args.out_json:
        out = {
            "baseline": args.baseline,
            "arms": {n: {k: v for k, v in a.items() if k != "_rows"}
                     for n, a in arms.items()},
            "slo_grid": grid,
        }
        args.out_json.write_text(json.dumps(out, indent=2))
        print(f"\nwrote {args.out_json}")


if __name__ == "__main__":
    main()
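A small worked example may help make the verdict metric concrete. This is a sketch under stated assumptions, not the analyzer's code path: `attainment` is a hypothetical standalone function mirroring the rule described in the module docstring (error-free AND within both SLOs, divided by OFFERED requests), and all rows are made up.

```python
from __future__ import annotations


def attainment(rows: list[dict], ttft_slo: float, tpot_slo: float) -> float:
    """Fraction of OFFERED requests that are error-free and within both SLOs."""
    if not rows:
        return 0.0
    good = 0
    for r in rows:
        if r.get("error") is not None or r.get("ttft_s") is None:
            continue
        if r["ttft_s"] > ttft_slo:
            continue
        tpot = r.get("tpot_s")
        if tpot is not None and tpot > tpot_slo:
            continue
        good += 1
    # Denominator is every offered request, so failures count against the arm.
    return good / len(rows)


# Made-up rows for two arms replaying the same 4-request trace.
baseline = [
    {"error": None, "ttft_s": 0.4, "tpot_s": 0.04},
    {"error": None, "ttft_s": 0.9, "tpot_s": 0.04},
    {"error": None, "ttft_s": 3.0, "tpot_s": 0.04},        # misses TTFT
    {"error": "timeout", "ttft_s": None, "tpot_s": None},  # failed request
]
candidate = [
    {"error": None, "ttft_s": 0.3, "tpot_s": 0.03},
    {"error": None, "ttft_s": 0.5, "tpot_s": 0.03},
    {"error": None, "ttft_s": 0.8, "tpot_s": 0.03},
    {"error": None, "ttft_s": 0.9, "tpot_s": 0.30},        # misses TPOT
]

base_att = attainment(baseline, ttft_slo=1.0, tpot_slo=0.05)
cand_att = attainment(candidate, ttft_slo=1.0, tpot_slo=0.05)
pd_advantage = cand_att / base_att if base_att > 0 else float("nan")
print(f"baseline={base_att:.2f} candidate={cand_att:.2f} adv={pd_advantage:.2f}")
```

Dividing by offered requests rather than successes is what keeps a failing arm from looking good on a small surviving subset.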


@@ -0,0 +1,360 @@
#!/usr/bin/env python3
"""Instrument vLLM V1 scheduler to dump per-request DST-side migration timeline.
For each request that arrives at the engine with `kv_transfer_params`
containing `do_remote_prefill=True` (i.e., the decode-target of an
EAR v3 migration), record:
t_arrival_unix — Scheduler.add_request() entry
t_wait_for_kvs_unix — status set to WAITING_FOR_REMOTE_KVS (KV pull start)
t_kv_recv_done_unix — req_id added to finished_recving_kv_req_ids
t_first_scheduled_unix — first time req appears in self.running after KV done
t_first_token_unix — first new_token_ids appended in update_from_output
arrival_state — {n_running, n_waiting, pending_prefill_tok,
n_waiting_for_kvs}
We complement the proxy `breakdown.json` (t_decode_sent_unix /
t_first_token_unix) to attribute the migration's dst-side wait into:
HTTP relay + admission_pre_kv + KV pull + admission_post_kv + first_iter
One JSONL per EngineCore PID at $DM_LOG_DIR/dm_mig_pid<pid>.jsonl
(default DM_LOG_DIR=/tmp). Records are flushed when t_first_token is
reached or when the request is aborted/finished.
Co-exists with MB5 KV snapshot patches (different START/END markers).
Usage:
python instrument_dst_migration.py --apply [--venv PATH]
python instrument_dst_migration.py --revert [--venv PATH]
python instrument_dst_migration.py --check [--venv PATH]
"""
from __future__ import annotations
import argparse
import re
from pathlib import Path
DEFAULT_VENV = Path("/home/admin/cpfs/wjh/agentic-kv/.venv")
TARGET_REL = "lib/python3.12/site-packages/vllm/v1/core/sched/scheduler.py"
START_MARK = "# DM_INSTRUMENT_START"
END_MARK = "# DM_INSTRUMENT_END"
# ---------- Patch 1: module-level header (helpers + globals) -----------------
# Anchor: the very first `class Scheduler(SchedulerInterface):` line. We insert
# the entire helper block immediately before that, so MB5's prior block (if
# present) is preserved and our block lives in module scope. The anchor must
# stay outside our own START/END markers so revert() can re-find it.
HEADER_ANCHOR = "class Scheduler(SchedulerInterface):"
HEADER_INSERT = f"""{START_MARK}
import json as _dm_json
import os as _dm_os
import threading as _dm_threading
import time as _dm_time
_DM_LOG_DIR = _dm_os.environ.get("DM_LOG_DIR", "/tmp")
try:
_dm_os.makedirs(_DM_LOG_DIR, exist_ok=True)
except Exception:
pass
_DM_LOG_PATH = _dm_os.path.join(
_DM_LOG_DIR, f"dm_mig_pid{{_dm_os.getpid()}}.jsonl"
)
_DM_LOG_FILE = None
_DM_LOG_LOCK = _dm_threading.Lock()
# req_id -> in-flight record. We pop and flush when t_first_token lands or on
# finish/abort.
_DM_DATA: dict = {{}}
def _dm_write_event(d: dict) -> None:
global _DM_LOG_FILE
if _DM_LOG_FILE is None:
_DM_LOG_FILE = open(_DM_LOG_PATH, "a", buffering=1)
with _DM_LOG_LOCK:
_DM_LOG_FILE.write(_dm_json.dumps(d) + "\\n")
def _dm_is_migrated(request) -> bool:
ktp = getattr(request, "kv_transfer_params", None)
if not isinstance(ktp, dict):
return False
return bool(ktp.get("do_remote_prefill"))
def _dm_snapshot_arrival(scheduler) -> dict:
try:
n_running = len(scheduler.running)
except Exception:
n_running = -1
try:
n_waiting_main = len(scheduler.waiting)
except Exception:
n_waiting_main = -1
try:
n_skipped = len(scheduler.skipped_waiting)
except Exception:
n_skipped = 0
pending_tok = 0
n_kv = 0
try:
from vllm.v1.request import RequestStatus as _RS
for r in list(scheduler.waiting):
try:
if getattr(r, "status", None) == _RS.WAITING_FOR_REMOTE_KVS:
n_kv += 1
npr = int(getattr(r, "num_prompt_tokens", 0))
nct = int(getattr(r, "num_computed_tokens", 0))
pending_tok += max(0, npr - nct)
except Exception:
pass
for r in list(scheduler.skipped_waiting):
try:
if getattr(r, "status", None) == _RS.WAITING_FOR_REMOTE_KVS:
n_kv += 1
except Exception:
pass
except Exception:
pass
return {{
"n_running": int(n_running),
"n_waiting": int(n_waiting_main + n_skipped),
"pending_prefill_tok": int(pending_tok),
"n_waiting_for_kvs": int(n_kv),
}}
def _dm_emit_and_drop(req_id: str, reason: str = "first_token") -> None:
rec = _DM_DATA.pop(req_id, None)
if rec is None:
return
rec["flush_reason"] = reason
rec["t_flush_unix"] = _dm_time.time()
_dm_write_event(rec)
{END_MARK}
"""
# ---------- Patch 2: add_request() hook --------------------------------------
# Right after self.requests[request.request_id] = request (line ~1927) and the
# if self.log_stats: block. Anchor includes the QUEUED record_event line so it
# is uniquely matchable.
ADD_REQUEST_ANCHOR = """ self._enqueue_waiting_request(request)
self.requests[request.request_id] = request
if self.log_stats:
request.record_event(EngineCoreEventType.QUEUED)
"""
ADD_REQUEST_REPLACE = f""" self._enqueue_waiting_request(request)
self.requests[request.request_id] = request
if self.log_stats:
request.record_event(EngineCoreEventType.QUEUED)
{START_MARK}
try:
if _dm_is_migrated(request):
_DM_DATA[request.request_id] = {{
"req_id": str(request.request_id),
"is_migrated": True,
"n_prompt_tokens": int(getattr(request, "num_prompt_tokens", 0)),
"t_arrival_unix": _dm_time.time(),
"t_wait_for_kvs_unix": None,
"t_kv_recv_done_unix": None,
"t_first_scheduled_unix": None,
"t_first_token_unix": None,
"arrival_state": _dm_snapshot_arrival(self),
}}
except Exception:
pass
{END_MARK}
"""
# ---------- Patch 3: WAITING_FOR_REMOTE_KVS transition -----------------------
WAIT_KV_ANCHOR = """ request.status = RequestStatus.WAITING_FOR_REMOTE_KVS
step_skipped_waiting.prepend_request(request)
"""
WAIT_KV_REPLACE = f""" request.status = RequestStatus.WAITING_FOR_REMOTE_KVS
{START_MARK}
try:
_rec = _DM_DATA.get(request.request_id)
if _rec is not None and _rec["t_wait_for_kvs_unix"] is None:
_rec["t_wait_for_kvs_unix"] = _dm_time.time()
except Exception:
pass
{END_MARK}
step_skipped_waiting.prepend_request(request)
"""
# ---------- Patch 4: finished_recving signal ---------------------------------
FINISHED_RECV_ANCHOR = """ if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
self.finished_recving_kv_req_ids.add(req_id)
elif RequestStatus.is_finished(req.status):
"""
FINISHED_RECV_REPLACE = f""" if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
self.finished_recving_kv_req_ids.add(req_id)
{START_MARK}
try:
_rec = _DM_DATA.get(req_id)
if _rec is not None and _rec["t_kv_recv_done_unix"] is None:
_rec["t_kv_recv_done_unix"] = _dm_time.time()
except Exception:
pass
{END_MARK}
elif RequestStatus.is_finished(req.status):
"""
# ---------- Patch 5: first scheduled (sweep at end of schedule()) ------------
# Co-exists with the MB5 snapshot inserted at the same location.
SCHED_END_ANCHOR = """ # MB5_INSTRUMENT_START
_mb5_snapshot(self)
# MB5_INSTRUMENT_END
return scheduler_output
"""
SCHED_END_REPLACE = f""" # MB5_INSTRUMENT_START
_mb5_snapshot(self)
# MB5_INSTRUMENT_END
{START_MARK}
try:
if _DM_DATA:
_now_dm = _dm_time.time()
for _r in self.running:
_rec = _DM_DATA.get(_r.request_id)
if _rec is not None and _rec["t_first_scheduled_unix"] is None:
_rec["t_first_scheduled_unix"] = _now_dm
except Exception:
pass
{END_MARK}
return scheduler_output
"""
# ---------- Patch 6: first new token in update_from_output -------------------
FIRST_TOK_ANCHOR = """ # Check for stop and update request status.
if new_token_ids:
new_token_ids, stopped = self._update_request_with_output(
request, new_token_ids
)
"""
FIRST_TOK_REPLACE = f""" # Check for stop and update request status.
if new_token_ids:
{START_MARK}
try:
_rec = _DM_DATA.get(request.request_id)
if _rec is not None and _rec["t_first_token_unix"] is None:
_rec["t_first_token_unix"] = _dm_time.time()
_dm_emit_and_drop(request.request_id, reason="first_token")
except Exception:
pass
{END_MARK}
new_token_ids, stopped = self._update_request_with_output(
request, new_token_ids
)
"""
# ---------- Patch 7: abort/finish — flush partial record ---------------------
FINISH_ANCHOR = """ request.status = finished_status
self._free_request(request, delay_free_blocks=delay_free_blocks)
"""
FINISH_REPLACE = f""" request.status = finished_status
{START_MARK}
try:
if request.request_id in _DM_DATA:
_dm_emit_and_drop(request.request_id, reason="finish_or_abort")
except Exception:
pass
{END_MARK}
self._free_request(request, delay_free_blocks=delay_free_blocks)
"""
PATCHES = [
    ("header", HEADER_ANCHOR, HEADER_INSERT + HEADER_ANCHOR),
    ("add_request", ADD_REQUEST_ANCHOR, ADD_REQUEST_REPLACE),
    ("wait_for_kvs", WAIT_KV_ANCHOR, WAIT_KV_REPLACE),
    ("finished_recving", FINISHED_RECV_ANCHOR, FINISHED_RECV_REPLACE),
    ("first_scheduled", SCHED_END_ANCHOR, SCHED_END_REPLACE),
    ("first_token", FIRST_TOK_ANCHOR, FIRST_TOK_REPLACE),
    ("finish_flush", FINISH_ANCHOR, FINISH_REPLACE),
]


def find_target(venv_or_path: Path) -> Path:
    candidates = [venv_or_path / TARGET_REL, DEFAULT_VENV / TARGET_REL]
    for c in candidates:
        if c.is_file():
            return c
    raise FileNotFoundError(f"cannot find {TARGET_REL} under {venv_or_path}")


def is_patched(text: str) -> bool:
    return START_MARK in text


def apply(target: Path) -> None:
    text = target.read_text()
    if is_patched(text):
        print(f"[dm-instr] already patched: {target}")
        return
    new = text
    for name, src, dst in PATCHES:
        if src not in new:
            raise RuntimeError(
                f"patch {name!r}: anchor not found in {target}. "
                f"Anchor head: {src.splitlines()[0]!r}"
            )
        new = new.replace(src, dst, 1)
    target.write_text(new)
    print(f"[dm-instr] applied {len(PATCHES)} patches -> {target}")


def revert(target: Path) -> None:
    text = target.read_text()
    if not is_patched(text):
        print(f"[dm-instr] not patched (nothing to revert): {target}")
        return
    # Strip our DM_* block, including the trailing newline that
    # terminated the END_MARK line. We do NOT collapse other blank-line
    # runs (MB5_* whitespace and original spacing between methods are
    # preserved).
    pat = re.compile(
        r"[ \t]*" + re.escape(START_MARK) + r".*?" + re.escape(END_MARK) + r"\n",
        flags=re.DOTALL,
    )
    new = pat.sub("", text)
    # The header insert added a leading "# DM_INSTRUMENT_START\n" with
    # two trailing blank lines and the anchor; revert removed the block
    # plus its trailing newline, leaving one extra blank line before the
    # class, which is harmless. We additionally collapse the very narrow
    # case of "\n\n\nclass Scheduler" -> "\n\nclass Scheduler" so revert is
    # byte-identical for that anchor.
    new = re.sub(r"\n{3,}class Scheduler\(", "\n\nclass Scheduler(", new)
    target.write_text(new)
    print(f"[dm-instr] reverted: {target}")


def main() -> None:
    p = argparse.ArgumentParser()
    p.add_argument("--apply", action="store_true")
    p.add_argument("--revert", action="store_true")
    p.add_argument("--check", action="store_true")
    p.add_argument("--venv", type=Path, default=DEFAULT_VENV)
    args = p.parse_args()
    target = find_target(args.venv)
    if args.apply:
        apply(target)
    elif args.revert:
        revert(target)
    elif args.check:
        state = "PATCHED" if is_patched(target.read_text()) else "CLEAN"
        print(f"[dm-instr] {state}: {target}")
    else:
        p.error("specify --apply / --revert / --check")


if __name__ == "__main__":
    main()
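The apply/revert mechanism above relies on two properties: each anchor stays outside the inserted START/END block, and the revert regex removes the block together with its leading indentation and trailing newline. A minimal in-memory sketch of that round trip follows; the marker names, the toy source text and the function names are hypothetical and stand in for the real scheduler patch.

```python
import re

START_MARK = "# DEMO_INSTRUMENT_START"
END_MARK = "# DEMO_INSTRUMENT_END"

ORIGINAL = "def f(x):\n    y = x + 1\n    return y\n"
# The anchor is kept verbatim after the inserted block so it can be found again.
ANCHOR = "    return y\n"
REPLACE = f"    {START_MARK}\n    print('y =', y)\n    {END_MARK}\n    return y\n"


def apply_patch(text: str) -> str:
    if START_MARK in text:
        return text  # already patched: applying twice is a no-op
    if ANCHOR not in text:
        raise RuntimeError("anchor not found")
    return text.replace(ANCHOR, REPLACE, 1)


def revert_patch(text: str) -> str:
    # Non-greedy match so each START..END pair is removed separately.
    pat = re.compile(
        r"[ \t]*" + re.escape(START_MARK) + r".*?" + re.escape(END_MARK) + r"\n",
        flags=re.DOTALL,
    )
    return pat.sub("", text)


patched = apply_patch(ORIGINAL)
restored = revert_patch(patched)
print("round trip is byte-identical:", restored == ORIGINAL)
```

A byte-identical round trip is worth checking whenever a new patch is added, since the instrumented file lives inside an installed package.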


@@ -151,11 +151,65 @@ RECV_FINISH_REPLACE = f""" if response.status == MooncakeXfer
{END_MARK}
break"""
# ---- Patch 5: send_kv_to_decode entry (P-side, producer receives pull req) ----
SEND_ENTRY_TARGET = """ async def send_kv_to_decode(
self, identity: bytes, sock: zmq.asyncio.Socket, meta: MooncakeXferMetadata
):
pending_reqs: dict[ReqId, SendBlockMeta] = {}"""
SEND_ENTRY_REPLACE = f""" async def send_kv_to_decode(
self, identity: bytes, sock: zmq.asyncio.Socket, meta: MooncakeXferMetadata
):
pending_reqs: dict[ReqId, SendBlockMeta] = {{}}
{START_MARK}
try:
_mb2_log_event({{"event": "send_kv_to_decode_enter",
"d_req_ids": [str(r) for r in meta.req_blocks],
"t_start_unix": _mb2_time.time(),
"tp_rank": getattr(self, "tp_rank", -1)}})
except Exception:
pass
{END_MARK}"""
# ---- Patch 6: wait_and_ret ready-wait timing (P-side, src KV commit wait) ----
READY_WAIT_TARGET = """ async def wait_and_ret(
d_req_id: ReqId, send_meta: SendBlockMeta
) -> tuple[ReqId, SendBlockMeta]:
await send_meta.ready.wait()
return d_req_id, send_meta"""
READY_WAIT_REPLACE = f""" async def wait_and_ret(
d_req_id: ReqId, send_meta: SendBlockMeta
) -> tuple[ReqId, SendBlockMeta]:
{START_MARK}
_mb2_rw_start = _mb2_time.perf_counter()
_mb2_rw_start_unix = _mb2_time.time()
_mb2_rw_already = send_meta.ready.is_set()
{END_MARK}
await send_meta.ready.wait()
{START_MARK}
try:
_mb2_log_event({{"event": "ready_wait",
"d_req_id": str(d_req_id),
"transfer_id": str(getattr(send_meta, "transfer_id", "")),
"ready_already_set": bool(_mb2_rw_already),
"ready_wait_s": _mb2_time.perf_counter() - _mb2_rw_start,
"t_start_unix": _mb2_rw_start_unix,
"tp_rank": getattr(self, "tp_rank", -1)}})
except Exception:
pass
{END_MARK}
return d_req_id, send_meta"""
PATCHES = [
    ("header", HEADER_ANCHOR, HEADER_ANCHOR + HEADER_INSERT),
    ("_send_blocks", SEND_TARGET, SEND_REPLACE),
    ("receive_kv (entry)", RECV_ENTRY_TARGET, RECV_ENTRY_REPLACE),
    ("receive_kv (FINISH)", RECV_FINISH_TARGET, RECV_FINISH_REPLACE),
    ("send_kv (entry)", SEND_ENTRY_TARGET, SEND_ENTRY_REPLACE),
    ("ready_wait", READY_WAIT_TARGET, READY_WAIT_REPLACE),
]
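The ready-wait patch above times an `await` on an `asyncio.Event` and records whether the event was already set on entry. The same measurement can be sketched standalone; `timed_ready_wait` and the 50 ms delayed producer are hypothetical stand-ins for `wait_and_ret` and the source-side KV commit.

```python
import asyncio
import time


async def timed_ready_wait(ready: asyncio.Event) -> dict:
    """Record whether `ready` was already set and how long the wait took."""
    already = ready.is_set()
    t0 = time.perf_counter()
    await ready.wait()
    return {"ready_already_set": already,
            "ready_wait_s": time.perf_counter() - t0}


async def demo() -> list:
    set_early = asyncio.Event()
    set_early.set()  # KV was committed before the pull request arrived
    set_late = asyncio.Event()
    # Hypothetical producer that commits KV 50 ms after the pull request arrives.
    asyncio.get_running_loop().call_later(0.05, set_late.set)
    return [await timed_ready_wait(set_early), await timed_ready_wait(set_late)]


results = asyncio.run(demo())
for r in results:
    print(r)
```

When most events report `ready_already_set=True`, the source-side commit is not what the consumer is waiting on, which matches how the analyzer interprets that counter.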


@@ -0,0 +1,261 @@
#!/usr/bin/env python3
"""MB6: KV-transfer bandwidth vs instance busy-ness.
Confirms the causal hypothesis from the v3 breakdown: the migration
transfer runs far below wire speed because it happens between instances
that are concurrently busy with compute (GIL-starved control plane +
HBM/NIC contention), NOT because of a wire/NIC limit.
Method (reuses the MB2 transfer primitive):
prefill on A (do_remote_decode, max_tokens=1) -> migrate to B
(do_remote_prefill). Time step 2 = the KV transfer.
For each background-load level B in --bg-loads, we hold B concurrent
long-decode streams on BOTH instances to keep them busy, then run
--repeats measured transfers per size. With the MB2 mooncake instrument
applied (MB2_LOG_DIR set), the analyzer can split the e2e transfer into
RDMA-actual (`send_blocks`) vs control-plane.
Expected: bg=0 reproduces MB2 (~10 GB/s); higher bg degrades toward the
~2-3 GB/s seen in the v3 trace.
Usage:
python mb6_transfer_under_load.py \
--src-port 8000 --dst-port 8001 --src-bp 8998 --dst-bp 8999 \
--sizes 16384,65536 --bg-loads 0,8,24 --repeats 4 \
--out mb6_result.json
"""
from __future__ import annotations
import argparse
import asyncio
import json
import statistics
import time
import uuid
from pathlib import Path
import httpx
MODEL_PATH = "/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct"
KV_PER_TOK = 98304 # Qwen3-30B-A3B est bytes/token
def synth_prompt(seed: int, n: int) -> list[int]:
    import random
    rng = random.Random(seed)
    return [rng.randint(100, 150000) for _ in range(n)]


async def get_engine_id(client, host, bp):
    r = await client.get(f"http://{host}:{bp}/query")
    r.raise_for_status()
    return r.json()["0"]["engine_id"]


async def completion(client, host, port, prompt, max_tokens, ktp=None, stream=False):
    payload = {
        "model": MODEL_PATH, "prompt": prompt, "max_tokens": max_tokens,
        "min_tokens": max_tokens if max_tokens == 1 else 1,
        "temperature": 0.0, "stream": stream,
    }
    if ktp:
        payload["kv_transfer_params"] = ktp
    t0 = time.perf_counter()
    if stream:
        # consume the stream to keep the instance decoding
        async with client.stream("POST", f"http://{host}:{port}/v1/completions",
                                 json=payload, timeout=600.0) as r:
            r.raise_for_status()
            async for _ in r.aiter_bytes():
                pass
        return time.perf_counter() - t0, {}
    r = await client.post(f"http://{host}:{port}/v1/completions",
                          json=payload, timeout=600.0)
    elapsed = time.perf_counter() - t0
    r.raise_for_status()
    return elapsed, r.json()


async def num_running(client, host, port) -> int:
    """Read vLLM running-request gauge from /metrics."""
    try:
        r = await client.get(f"http://{host}:{port}/metrics", timeout=5.0)
        for line in r.text.splitlines():
            if line.startswith("vllm:num_requests_running"):
                return int(float(line.split()[-1]))
    except Exception:
        pass
    return -1
class BackgroundLoad:
    """Maintain N concurrent long-decode streams on a set of (host,port)."""

    def __init__(self, client, endpoints, concurrency, prompt_tokens=2000,
                 out_tokens=6000):
        self.client = client
        self.endpoints = endpoints
        self.concurrency = concurrency
        self.prompt_tokens = prompt_tokens
        self.out_tokens = out_tokens
        self._stop = asyncio.Event()
        self._tasks: list[asyncio.Task] = []

    async def _worker(self, idx):
        host, port = self.endpoints[idx % len(self.endpoints)]
        seed = 900000 + idx
        while not self._stop.is_set():
            prompt = synth_prompt(seed, self.prompt_tokens)
            seed += 1
            try:
                await completion(self.client, host, port, prompt,
                                 max_tokens=self.out_tokens, stream=True)
            except Exception:
                await asyncio.sleep(0.5)

    def start(self):
        self._tasks = [asyncio.create_task(self._worker(i))
                       for i in range(self.concurrency)]

    async def stop(self):
        self._stop.set()
        for t in self._tasks:
            t.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks = []


async def measure_transfer(client, src_host, src_port, dst_host, dst_port,
                           src_eid, src_bootstrap_addr, input_tokens, seed):
    prompt = synth_prompt(seed, input_tokens)
    transfer_id = uuid.uuid4().hex
    # step 1: prefill on A
    await completion(client, src_host, src_port, prompt, max_tokens=1,
                     ktp={"do_remote_decode": True, "transfer_id": transfer_id})
    # step 2: migrate to B (this is the timed transfer)
    t_start_unix = time.time()
    t_xfer, _ = await completion(
        client, dst_host, dst_port, prompt, max_tokens=1,
        ktp={"do_remote_prefill": True, "transfer_id": transfer_id,
             "remote_engine_id": src_eid,
             "remote_bootstrap_addr": src_bootstrap_addr})
    return {
        "input_tokens": input_tokens,
        "t_transfer_s": t_xfer,
        "t_step2_start_unix": t_start_unix,
        "t_step2_end_unix": time.time(),
        "kv_bytes": input_tokens * KV_PER_TOK,
        "eff_gbps": input_tokens * KV_PER_TOK / 1e9 / t_xfer if t_xfer > 0 else 0,
    }


async def main_async(a):
    sizes = [int(s) for s in a.sizes.split(",")]
    bg_loads = [int(s) for s in a.bg_loads.split(",")]
    src_host, dst_host = a.src_host, a.dst_host
    limits = httpx.Limits(max_connections=256, max_keepalive_connections=256)
    async with httpx.AsyncClient(limits=limits, trust_env=False) as client:
        src_eid = await get_engine_id(client, src_host, a.src_bp)
        src_bootstrap_addr = f"http://{src_host}:{a.src_bp}"
        print(f"[mb6] src eid={src_eid[:16]}... endpoints A={src_host}:{a.src_port} "
              f"B={dst_host}:{a.dst_port}")
        endpoints = [(src_host, a.src_port), (dst_host, a.dst_port)]
        results = []
        for bg in bg_loads:
            loader = None
            if bg > 0:
                loader = BackgroundLoad(client, endpoints, bg,
                                        prompt_tokens=a.bg_prompt,
                                        out_tokens=a.bg_out)
                loader.start()
                # wait (up to 40 s) for both instances to actually be busy
                print(f"[mb6] bg={bg}: ramping background load ...")
                for _ in range(40):
                    await asyncio.sleep(1.0)
                    na = await num_running(client, src_host, a.src_port)
                    nb = await num_running(client, dst_host, a.dst_port)
                    if na >= 1 and nb >= 1:
                        print(f"[mb6] bg={bg}: busy (A running={na} B running={nb})")
                        break
                else:
                    # Ramp timed out: measure anyway, but make it visible in the log.
                    print(f"[mb6] bg={bg}: WARN not busy after 40s "
                          f"(A running={na} B running={nb}); measuring anyway")
            else:
                print(f"[mb6] bg={bg}: idle baseline")
                # ensure idle
                await asyncio.sleep(2.0)
            for sz in sizes:
                for rep in range(a.repeats):
                    # Sample the running counts just before each timed transfer.
                    na = await num_running(client, src_host, a.src_port)
                    nb = await num_running(client, dst_host, a.dst_port)
                    row = await measure_transfer(
                        client, src_host, a.src_port, dst_host, a.dst_port,
                        src_eid, src_bootstrap_addr, sz, seed=sz * 100 + rep + bg * 7)
                    row["bg_load"] = bg
                    row["A_running_at_measure"] = na
                    row["B_running_at_measure"] = nb
                    results.append(row)
                    kv_mib = sz * KV_PER_TOK / 2**20
                    print(f"  bg={bg:>3} size={sz:>6} ({kv_mib:6.0f}MiB) rep={rep} "
                          f"A_run={na:>2} B_run={nb:>2} "
                          f"transfer={row['t_transfer_s']*1000:7.0f}ms "
                          f"eff={row['eff_gbps']:5.2f}GB/s")
            if loader:
                await loader.stop()
                # let the instances drain (up to 60 s) before the next bg level
                print(f"[mb6] bg={bg}: draining ...")
                for _ in range(60):
                    await asyncio.sleep(1.0)
                    na = await num_running(client, src_host, a.src_port)
                    nb = await num_running(client, dst_host, a.dst_port)
                    if na <= 0 and nb <= 0:
                        break
                else:
                    print(f"[mb6] bg={bg}: WARN still running after 60s "
                          f"(A running={na} B running={nb}); continuing")
    # summary per (bg, size)
    print("\n=== summary: effective transfer bandwidth vs background load ===")
    print(f"{'bg':>4} {'size':>7} {'n':>3} {'xfer_p50_ms':>12} {'eff_p50_GBps':>13} "
          f"{'eff_mean':>9}")
    summary = []
    for bg in bg_loads:
        for sz in sizes:
            rs = [r for r in results if r["bg_load"] == bg and r["input_tokens"] == sz]
            if not rs:
                continue
            xfer = sorted(r["t_transfer_s"] for r in rs)
            eff = sorted(r["eff_gbps"] for r in rs)
            # p50 is taken at index n // 2, i.e. the upper median when n is even.
            p50x = xfer[len(xfer) // 2]
            p50e = eff[len(eff) // 2]
            meane = statistics.mean(eff)
            summary.append({"bg": bg, "size": sz, "n": len(rs),
                            "xfer_p50_ms": p50x * 1000,
                            "eff_p50_gbps": p50e, "eff_mean_gbps": meane})
            print(f"{bg:>4} {sz:>7} {len(rs):>3} {p50x*1000:>12.0f} "
                  f"{p50e:>13.2f} {meane:>9.2f}")
    Path(a.out).write_text(json.dumps(
        {"model": MODEL_PATH, "kv_bytes_per_token": KV_PER_TOK,
         "label": a.label, "raw": results, "summary": summary}, indent=2))
    print(f"\n[mb6] wrote {a.out}")


def main():
    p = argparse.ArgumentParser()
    p.add_argument("--src-host", default="127.0.0.1")
    p.add_argument("--dst-host", default="127.0.0.1")
    p.add_argument("--src-port", type=int, default=8000)
    p.add_argument("--dst-port", type=int, default=8001)
    p.add_argument("--src-bp", type=int, default=8998)
    p.add_argument("--dst-bp", type=int, default=8999)
    p.add_argument("--sizes", default="16384,65536")
    p.add_argument("--bg-loads", default="0,8,24")
    p.add_argument("--repeats", type=int, default=4)
    p.add_argument("--bg-prompt", type=int, default=2000)
    p.add_argument("--bg-out", type=int, default=6000)
    p.add_argument("--label", default="main-venv")
    p.add_argument("--out", default="mb6_result.json")
    args = p.parse_args()
    asyncio.run(main_async(args))


if __name__ == "__main__":
    main()

microbench/fresh_setup/run_mb6.sh Executable file

@@ -0,0 +1,109 @@
#!/usr/bin/env bash
# MB6 launcher: 2 vLLM instances (kv_both, Mooncake) + transfer-under-load
# sweep. Parameterized by VENV so it runs on either the patched main venv
# or the fresh upstream venv, to test whether the bandwidth degradation is
# caused by our patch or is inherent to upstream mooncake.
#
# Usage:
# VENV=/home/admin/cpfs/wjh/agentic-kv/.venv bash run_mb6.sh # main
# VENV=/home/admin/cpfs/wjh/agentic-kv-fresh/.venv bash run_mb6.sh # fresh
set -uo pipefail
PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}"
VENV="${VENV:-$PROJ_DIR/.venv}"
LABEL="${LABEL:-$(basename "$(dirname "$VENV")")}"
MODEL="${MODEL:-/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}"
GPUS="${GPUS:-0 1}"
SIZES="${SIZES:-16384,65536}"
BG_LOADS="${BG_LOADS:-0,8,24}"
REPEATS="${REPEATS:-4}"
DATE="$(date +%Y%m%d_%H%M)"
OUTDIR="${OUTDIR:-$PROJ_DIR/outputs/mb6_${LABEL}_${DATE}}"
PYTHON="$VENV/bin/python"
MC_INSTR="$PROJ_DIR/microbench/fresh_setup/instrument_mooncake.py"
DRIVER="$PROJ_DIR/microbench/fresh_setup/mb6_transfer_under_load.py"
MC_FILE="$VENV/lib/python3.12/site-packages/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py"
mkdir -p "$OUTDIR/logs"
XFER_LOG_DIR="$OUTDIR/xfer_log"; mkdir -p "$XFER_LOG_DIR"
echo "=== MB6 transfer-under-load ($LABEL) ==="
echo "VENV : $VENV"
echo "Out : $OUTDIR"
echo ""
PORTS=(8000 8001); BPS=(8998 8999)
gpu_arr=($GPUS)
cleanup() {
  pkill -9 -f "vllm serve" 2>/dev/null || true
  pkill -9 -f "EngineCore" 2>/dev/null || true
  sleep 4
  "$PYTHON" "$MC_INSTR" --venv "$MC_FILE" --revert 2>/dev/null || true
}
trap cleanup EXIT
cleanup
echo "[0] apply MB2 mooncake instrument to $LABEL venv"
"$PYTHON" "$MC_INSTR" --venv "$MC_FILE" --apply
"$PYTHON" "$MC_INSTR" --venv "$MC_FILE" --check
echo "[1] launch 2 instances"
i=0
for gpu in "${gpu_arr[@]:0:2}"; do
  port=${PORTS[$i]}; bp=${BPS[$i]}; master=$((29600 + i))
  PYTHONHASHSEED=42 \
  VLLM_MOONCAKE_BOOTSTRAP_PORT=$bp \
  MB2_LOG_DIR="$XFER_LOG_DIR" \
  CUDA_VISIBLE_DEVICES=$gpu \
  MASTER_PORT=$master \
  nohup "$VENV/bin/vllm" serve "$MODEL" \
    --host 0.0.0.0 --port "$port" \
    --tensor-parallel-size 1 --trust-remote-code --enable-prefix-caching \
    --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
    --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
    --enable-prompt-tokens-details \
    > "$OUTDIR/logs/vllm_${i}_gpu${gpu}.log" 2>&1 &
  disown
  sleep 2
  i=$((i + 1))
done
echo "[2] wait for health"
for i in 0 1; do
  port=${PORTS[$i]}; tries=0
  while ! curl -sf "http://127.0.0.1:$port/health" >/dev/null 2>&1; do
    tries=$((tries + 1))
    if [ "$tries" -gt 180 ]; then echo "FATAL inst_$i not healthy"; exit 1; fi
    sleep 2
  done
  echo "  inst_$i ready"
done
# bootstrap /query reachable?
for i in 0 1; do
  bp=${BPS[$i]}; tries=0
  while ! curl -sf "http://127.0.0.1:$bp/query" >/dev/null 2>&1; do
    tries=$((tries + 1))
    if [ "$tries" -gt 60 ]; then echo "WARN bootstrap $bp not ready"; break; fi
    sleep 2
  done
done
echo "[3] run MB6 driver"
"$PYTHON" "$DRIVER" \
  --src-port "${PORTS[0]}" --dst-port "${PORTS[1]}" \
  --src-bp "${BPS[0]}" --dst-bp "${BPS[1]}" \
  --sizes "$SIZES" --bg-loads "$BG_LOADS" --repeats "$REPEATS" \
  --label "$LABEL" --out "$OUTDIR/mb6_result.json" \
  2>&1 | tee "$OUTDIR/mb6_run.txt"
echo "[4] teardown + revert"
pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true
sleep 4
"$PYTHON" "$MC_INSTR" --venv "$MC_FILE" --revert
echo ""
echo "Done. Artifacts in $OUTDIR/"
echo " mb6_result.json mb6_run.txt xfer_log/ logs/"