diff --git a/microbench/connector_tax/cache_sweep/MIGRATION_TRANSFER_COST.md b/microbench/connector_tax/cache_sweep/MIGRATION_TRANSFER_COST.md new file mode 100644 index 0000000..bd43cff --- /dev/null +++ b/microbench/connector_tax/cache_sweep/MIGRATION_TRANSFER_COST.md @@ -0,0 +1,178 @@ +# Why KV-transfer is slow during migration under real load + +**Question.** EAR's unified+A+B routing beats migration (v3) on agentic +workloads. We wanted to know whether *layerwise* KV transfer would shrink +migration's overhead enough to make it viable. Investigating that led to a +sharper question: **in a real (loaded) cluster, when we migrate, the KV +transfer is already slow — the effective bandwidth is far below the +~10 GB/s wire rate. Why?** + +This doc answers that with instrumented measurements. + +**TL;DR.** Migration fires precisely when instances are *busy* (that's the +trigger). But on a busy instance, KV transfer runs at **~3 GB/s instead of +~10 GB/s**, because: + +1. **The RDMA write itself slows ~2× under compute load** — GPU-direct RDMA + (`batch_transfer_sync_write`) contends with the running attention/MLP + kernels for **HBM and PCIe bandwidth**. (idle 7.6 GB/s → busy 4.0 GB/s) +2. **The connector's Python control plane gets GIL-starved** — mooncake's + ZMQ handshake + transfer orchestration run on asyncio threads inside the + engine process; when the engine's main thread is doing a long forward + pass (e.g. a 100k-token prefill), those threads stall for *seconds*. + +Both are **inherent to upstream vLLM 0.18.1 + mooncake** (reproduced on a +clean fresh venv; the transfer path is byte-identical to upstream — our +patches did not cause this), and both get **worse**, not better, with +layerwise transfer. So the bandwidth gap is not a layerwise problem; it is a +*transfer-on-a-busy-GPU* problem. + +--- + +## 1. Evidence chain + +Three independent measurements, all on dash0 (8×H100, Qwen3-Coder-30B-A3B, +TP=1), Mooncake `kv_both`. + +### 1a. Instrumented v3 trace replay — where does migration time go? + +Run `outputs/b3_v3_fullbreak_20260528_0338/`. Instruments: +`instrument_dst_migration.py` (dst scheduler lifecycle) + +`instrument_mooncake.py` (connector internals: `send_blocks` RDMA, +`receive_kv` window, `ready_wait`). + +25 migrations fired over the trace. Dst-side migration overhead +(`T_kv_pull` = scheduler marks `WAITING_FOR_REMOTE_KVS` → `finished_recving`): + +| component | share | what it is | +|---|---:|---| +| RDMA-actual (`batch_transfer_sync_write`) | **55%** (55.2 s) | the real RDMA write | +| dst control-plane gap | **45%** (45.4 s) | scheduler↔receiver_loop dispatch + completion propagation | +| `ready_wait` (src KV not committed) | 0% | 25/25 already committed — **ruled out** | + +- Pure RDMA aggregate rate: **2.03 GB/s** (vs MB2 idle 9.7 GB/s). +- RDMA rate **collapses with transfer size**: <3 GiB → 4–9.5 GB/s, + >5 GiB → 0.9–2.6 GB/s. +- The control-plane gap is **bimodal**: median 0.04 s, but a handful of + requests stall ~10 s. Those are small-KV transfers (0.18 s of actual RDMA) + whose `T_kv_pull` is 8–11 s — i.e. the dst's `receiver_loop` thread was + GIL-starved for ~10 s while the engine did a big forward pass. + +> Earlier (pre-instrumentation) we wrongly attributed ~90% of migration +> overhead to "dst scheduler queueing" by estimating transfer at clean wire +> speed. With real instrumentation, dst *scheduler admission* is ~0 +> (`T_admission_post_kv` = 0.003 s); the time is the transfer phase (RDMA + +> connector control plane), both degraded by instance busy-ness. + +### 1b. MB6 controlled microbench — does busy-ness cause it? + +`microbench/fresh_setup/mb6_transfer_under_load.py` + `run_mb6.sh`: 2 +instances, transfer a fixed-size KV (prefill on A → migrate to B) while +holding *N* background decode streams on both. Sweep N. + +Effective transfer bandwidth (65k-token KV ≈ 6 GiB), main venv: + +| background load | 65k transfer | eff bandwidth | +|---|---:|---:| +| **0 (idle)** | 747 ms | **8.76 GB/s** | +| 8 (4/instance) | 2423 ms | 4.53 GB/s | +| **24 (12/instance)** | 2015 ms | **3.33 GB/s** | + +Monotonic degradation with load. **The busy level (3.3 GB/s) matches the +v3 trace's 3.3 GB/s median exactly** — because agentic instances run +~10+ concurrent requests, i.e. the bg=24 regime. + +Decomposing the 65k transfer into RDMA-actual vs control-plane: + +| bg | RDMA rate | control-plane share | +|---|---:|---:| +| 0 (idle) | 7.56 GB/s | 13% | +| 8 | 4.07 GB/s | 11% | +| 24 (busy) | 3.97 GB/s | 15% | + +In the clean microbench the **RDMA write itself is the dominant degrading +term** (7.6 → 4.0 GB/s). The ~10 s control-plane stalls seen in the trace +(1a) don't reproduce here because steady decode forward passes are short; +they require the long (100k-token) prefills that the real trace has. + +### 1c. Fresh-venv comparison — is it our patch? + +Same MB6 sweep on `agentic-kv-fresh/.venv` (clean upstream-style 0.18.1): + +| bg | 65k eff (fresh) | 65k eff (main/patched) | +|---|---:|---:| +| 0 | 8.73 GB/s | 8.76 GB/s | +| 8 | 4.52 GB/s | 4.53 GB/s | +| 24 | 3.27 GB/s | 3.33 GB/s | + +**Identical within noise.** Plus a static check: the v3 transfer path +(`send_kv_to_decode`, `_send_blocks`/`batch_transfer_sync_write`, +`_build_transfer_params`) is **byte-identical** to pristine upstream 0.18.1 +(commit `445e491`); `receive_kv_from_single_worker` differs only by a 4-line +error branch. Our mooncake commits (`a7df84b` direct-read, +`ea51497` partial-prefill, `e3a1d70` read→push) only touch a *separate* +`direct_read` path that v3 does **not** use (v3 requests carry no +`direct_read` flag → normal push path). + +→ **The slowdown is upstream/hardware-inherent, not introduced by us.** + +--- + +## 2. Root cause + +Migration in agentic serving transfers KV **between instances that are +concurrently busy with compute** — by construction, since v3 migrates *away +from* a busy host. On a busy instance: + +- **HBM/PCIe contention (the dominant, irreducible part).** Mooncake's + transfer is GPU-direct RDMA: the NIC DMAs KV straight out of / into GPU + HBM. While the GPU runs attention+MLP kernels, those kernels saturate HBM + bandwidth, so the NIC's RDMA gets a smaller slice. Effective transfer + bandwidth roughly halves (7.6 → 4.0 GB/s at our load), and degrades + further for large multi-segment transfers. +- **Control-plane GIL starvation (secondary, bursty).** The connector runs + its ZMQ handshake + `send_kv_to_decode`/`receive_kv` orchestration on + asyncio threads (`sender_loop`/`receiver_loop`) *inside the engine + process*. A long forward pass (100k-token prefill) holds the GIL for + seconds, stalling those threads → multi-second dispatch gaps even when the + actual transfer is 0.2 s. + +MB2 measured 9.7 GB/s precisely because both endpoints were **idle**. The +real-workload gap is the difference between "idle benchmark" and "transfer +while the GPU is doing the day job." + +--- + +## 3. Implication: layerwise is the wrong lever; migration's tax is largely irreducible + +| lever | effect on the gap | +|---|---| +| **Model-level layerwise transfer** (push each layer's KV during prefill) | **Worse.** Prefill is the most HBM-intensive phase, so per-layer transfers contend *harder* for HBM (Cause 1); and they multiply the control-plane round-trips (Cause 2). | +| **Control-plane fix** (move mooncake orchestration off the GIL-contended threads / separate process) | Addresses only the bursty ~10 s stalls (~15% in the clean case, up to ~45% of the trace tail). Does **not** touch the HBM-contention half. | +| **Reduce bytes** (cache-aware target so less KV moves) | Helps linearly; v3 Mechanism B already does some. Orthogonal. | +| **Migrate to/from idle instances** | Would restore ~10 GB/s — but defeats the purpose (we migrate *because* the host is busy). | + +The dominant cost (RDMA contending with compute for HBM on busy instances) +is a **hardware reality**, not a software bug we can patch away, and not +something layerwise improves. This reinforces +[UNIFIED_ABLATION.md](UNIFIED_ABLATION.md): the unified no-migration path +(A+B'+RaceFix) remains the right default; migration's transfer tax is +structural on a loaded agentic cluster. + +--- + +## 4. Repro / artifacts + +- Instrumented v3 breakdown: `outputs/b3_v3_fullbreak_20260528_0338/unified_v3/` + (`transfer_decomp.txt`, `dst_migration_breakdown.{csv,png}`, + `transfer_rootcause.png`) +- MB6 main: `outputs/mb6_agentic-kv_20260528_0552/mb6_result.json` +- MB6 fresh: `outputs/mb6_fresh_20260528_0559/mb6_result.json` +- Instruments: `microbench/fresh_setup/instrument_dst_migration.py`, + `microbench/fresh_setup/instrument_mooncake.py` +- Microbench: `microbench/fresh_setup/mb6_transfer_under_load.py` + + `run_mb6.sh` (`VENV=… bash run_mb6.sh`) +- Analyzers: `analyze_dst_migration.py`, `analyze_transfer_decomp.py` + +All instruments apply/revert cleanly via `--apply`/`--revert`; both venvs +were restored after the runs. diff --git a/microbench/connector_tax/cache_sweep/analyze_dst_migration.py b/microbench/connector_tax/cache_sweep/analyze_dst_migration.py new file mode 100755 index 0000000..b1e528d --- /dev/null +++ b/microbench/connector_tax/cache_sweep/analyze_dst_migration.py @@ -0,0 +1,333 @@ +#!/usr/bin/env python3 +"""Analyze dst-side migration breakdown for unified_v3 runs. + +Joins the proxy `breakdown.json` (per-request route + phase timestamps) +with the dst engine per-PID logs written by +`instrument_dst_migration.py` (`dm_mig_pid.jsonl`), to attribute +each migration's dst-side wall-clock into: + + T_relay proxy decode-sent → dst arrival + T_admission_pre_kv dst arrival → status=WAITING_FOR_REMOTE_KVS + (waiting in dst's scheduler queue before KV pull + is even initiated) + T_kv_pull WAITING_FOR_REMOTE_KVS → finished_recving + (the actual RDMA transfer + connector ack) + T_admission_post_kv finished_recving → first time in self.running + (KV ready, waiting for batch slot) + T_first_iter first scheduled → first generated token + (one decode-iter compute + sampler latency) + +Layerwise transfer can at best eliminate T_kv_pull. Everything else is +queueing or compute that layerwise does not touch. + +Usage: + python analyze_dst_migration.py \ + --proxy-breakdown /breakdown.json \ + --dst-log-dir + [--output /dst_migration_breakdown.csv] + [--plot /dst_migration_breakdown.png] +""" +from __future__ import annotations + +import argparse +import json +import math +import os +import re +import statistics +import sys +from pathlib import Path + + +def _core_req_id(rid: str) -> str: + """Normalize a vLLM engine req_id back to the proxy's request_id. + + vLLM wraps the proxy id `S:T:U:N` as `cmpl-S:T:U:N--`. + Strip the `cmpl-` prefix and the trailing `--` suffix so + it joins against the proxy `breakdown.json` request_id. + """ + if not rid: + return rid + s = rid + if s.startswith("cmpl-"): + s = s[len("cmpl-"):] + m = re.match(r"^(.*)-\d+-[0-9a-fA-F]+$", s) + if m: + s = m.group(1) + return s + + +def _pct(vals: list[float], q: float) -> float: + if not vals: + return float("nan") + vs = sorted(vals) + i = max(0, min(len(vs) - 1, int(math.ceil(q * len(vs))) - 1)) + return vs[i] + + +def _summary(name: str, vals: list[float]) -> dict: + if not vals: + return {"name": name, "n": 0} + return { + "name": name, + "n": len(vals), + "mean_s": statistics.mean(vals), + "p50_s": _pct(vals, 0.5), + "p90_s": _pct(vals, 0.9), + "p99_s": _pct(vals, 0.99), + "max_s": max(vals), + "sum_s": sum(vals), + } + + +def load_dst_log(dst_log_dir: Path) -> dict[str, dict]: + by_req: dict[str, dict] = {} + found_files = sorted(dst_log_dir.glob("dm_mig_pid*.jsonl")) + print(f"[analyze] dst log files: {len(found_files)} under {dst_log_dir}") + for f in found_files: + with f.open() as fh: + for line in fh: + try: + rec = json.loads(line) + except Exception: + continue + rid = rec.get("req_id") + if not rid: + continue + key = _core_req_id(rid) + rec["_raw_req_id"] = rid + # If a req shows up twice (shouldn't, but be safe), prefer the + # one with t_first_token_unix populated. + prev = by_req.get(key) + if prev is None or ( + rec.get("t_first_token_unix") and + not prev.get("t_first_token_unix") + ): + by_req[key] = rec + print(f"[analyze] unique dst records: {len(by_req)}") + return by_req + + +def load_proxy_breakdown(path: Path) -> list[dict]: + with path.open() as fh: + data = json.load(fh) + assert isinstance(data, list), f"unexpected breakdown.json shape: {type(data)}" + return data + + +def decompose(proxy_recs: list[dict], dst_by_req: dict[str, dict]) -> list[dict]: + """Build per-migration breakdown rows by joining proxy + dst by req_id.""" + rows: list[dict] = [] + migrations = [x for x in proxy_recs if x.get("route_class") == "PD_SEP_V2"] + print(f"[analyze] proxy migrations: {len(migrations)} " + f"(of {len(proxy_recs)} total requests)") + + miss_in_dst = 0 + missing_phases = 0 + for p in migrations: + rid = p.get("request_id") + dst = dst_by_req.get(rid) + if dst is None: + miss_in_dst += 1 + continue + if dst.get("t_first_token_unix") is None: + missing_phases += 1 + # still include the row but mark phases as NaN downstream + t_decode_sent = p.get("t_decode_sent_unix") + t_first_tok = p.get("t_first_token_unix") + t_arrival = dst.get("t_arrival_unix") + t_wait_kvs = dst.get("t_wait_for_kvs_unix") + t_kv_done = dst.get("t_kv_recv_done_unix") + t_first_sched = dst.get("t_first_scheduled_unix") + t_first_tok_dst = dst.get("t_first_token_unix") + + def _diff(a, b): + if a is None or b is None: + return None + return float(a) - float(b) + + rows.append({ + "request_id": rid, + "session_id": p.get("session_id"), + "input_length": p.get("input_length"), + "v3_new_local": p.get("v3_new_local"), + "v3_target_idx": p.get("v3_target_idx") or p.get("v3_decode_target_idx"), + "arrival_n_running": (dst.get("arrival_state") or {}).get("n_running"), + "arrival_n_waiting": (dst.get("arrival_state") or {}).get("n_waiting"), + "arrival_pending_prefill_tok": (dst.get("arrival_state") or {}).get("pending_prefill_tok"), + "arrival_n_waiting_for_kvs": (dst.get("arrival_state") or {}).get("n_waiting_for_kvs"), + # Phase durations (seconds) + "T_proxy_total_dst_first_token_s": _diff(t_first_tok, t_decode_sent), + "T_relay_s": _diff(t_arrival, t_decode_sent), + "T_admission_pre_kv_s": _diff(t_wait_kvs, t_arrival), + "T_kv_pull_s": _diff(t_kv_done, t_wait_kvs), + "T_admission_post_kv_s": _diff(t_first_sched, t_kv_done), + "T_first_iter_s": _diff(t_first_tok_dst, t_first_sched), + # Raw timestamps for debugging + "t_decode_sent_unix": t_decode_sent, + "t_dst_arrival_unix": t_arrival, + "t_dst_wait_for_kvs_unix": t_wait_kvs, + "t_dst_kv_recv_done_unix": t_kv_done, + "t_dst_first_scheduled_unix": t_first_sched, + "t_dst_first_token_unix": t_first_tok_dst, + "t_proxy_first_token_unix": t_first_tok, + }) + + print(f"[analyze] missing in dst log: {miss_in_dst}") + print(f"[analyze] dst record incomplete (no t_first_token): {missing_phases}") + return rows + + +def emit_summary(rows: list[dict]) -> None: + if not rows: + print("[analyze] no rows — nothing to summarize.") + return + + phase_keys = [ + "T_proxy_total_dst_first_token_s", + "T_relay_s", + "T_admission_pre_kv_s", + "T_kv_pull_s", + "T_admission_post_kv_s", + "T_first_iter_s", + ] + + print() + print("=" * 88) + print(f"Migration dst-side phase breakdown (n_migrations={len(rows)})") + print("=" * 88) + print(f"{'phase':<36} {'n':>4} {'mean(s)':>9} {'p50':>8} {'p90':>8} " + f"{'p99':>8} {'max':>8} {'sum(s)':>9}") + print("-" * 88) + for k in phase_keys: + vals = [r[k] for r in rows if r.get(k) is not None] + if not vals: + print(f"{k:<36} {'n/a':>4}") + continue + s = _summary(k, vals) + print(f"{k:<36} {s['n']:>4} {s['mean_s']:>9.3f} {s['p50_s']:>8.3f} " + f"{s['p90_s']:>8.3f} {s['p99_s']:>8.3f} {s['max_s']:>8.3f} " + f"{s['sum_s']:>9.2f}") + + print() + print("Aggregate attribution (sum across all migrations):") + sums = {} + for k in ("T_relay_s", "T_admission_pre_kv_s", "T_kv_pull_s", + "T_admission_post_kv_s", "T_first_iter_s"): + sums[k] = sum(r[k] for r in rows if r.get(k) is not None) + total = sum(sums.values()) + total_proxy = sum(r["T_proxy_total_dst_first_token_s"] for r in rows + if r.get("T_proxy_total_dst_first_token_s") is not None) + print(f" decomposed sum : {total:>8.2f} s") + print(f" proxy total sum : {total_proxy:>8.2f} s " + f"(should be ~equal; gap = uninstrumented)") + if total > 0: + for k, v in sums.items(): + print(f" {k:<28} {v:>8.2f} s ({v/total*100:5.1f} %)") + + # Headline: "How much could layerwise save?" + layerwise_addressable = sums.get("T_kv_pull_s", 0.0) + queue_residual = sum(v for k, v in sums.items() if k != "T_kv_pull_s") + print() + print("Layerwise-addressable vs queue-residual:") + print(f" T_kv_pull_s (addressable by layerwise) : {layerwise_addressable:>8.2f} s " + f"({layerwise_addressable / total * 100 if total else 0:5.1f} %)") + print(f" everything else (queue/admission/iter) : {queue_residual:>8.2f} s " + f"({queue_residual / total * 100 if total else 0:5.1f} %)") + + +def write_csv(rows: list[dict], path: Path) -> None: + import csv + if not rows: + path.write_text("") + return + fields = list(rows[0].keys()) + with path.open("w", newline="") as fh: + w = csv.DictWriter(fh, fieldnames=fields) + w.writeheader() + w.writerows(rows) + print(f"[analyze] wrote CSV: {path} (n={len(rows)})") + + +def maybe_plot(rows: list[dict], out_path: Path) -> None: + try: + import matplotlib + matplotlib.use("Agg") + import matplotlib.pyplot as plt + except Exception as e: + print(f"[analyze] matplotlib unavailable ({e}); skipping plot.") + return + + if not rows: + return + + rows_sorted = sorted( + rows, + key=lambda r: r.get("T_proxy_total_dst_first_token_s") or 0.0, + ) + n = len(rows_sorted) + idx = list(range(n)) + + def col(k): + return [(r.get(k) or 0.0) for r in rows_sorted] + + relay = col("T_relay_s") + pre = col("T_admission_pre_kv_s") + pull = col("T_kv_pull_s") + post = col("T_admission_post_kv_s") + first_iter = col("T_first_iter_s") + + fig, ax = plt.subplots(figsize=(11, 5)) + bot = [0.0] * n + for vals, label, color in [ + (relay, "HTTP relay", "#cccccc"), + (pre, "admission pre-KV", "#f4a261"), + (pull, "KV pull (layerwise-addressable)", "#e76f51"), + (post, "admission post-KV", "#2a9d8f"), + (first_iter, "first decode iter", "#264653"), + ]: + ax.bar(idx, vals, bottom=bot, color=color, label=label, width=0.85) + bot = [b + v for b, v in zip(bot, vals)] + + ax.set_xticks(idx) + ax.set_xticklabels([str(i + 1) for i in idx], rotation=0, fontsize=8) + ax.set_xlabel("Migrated request (sorted by total dst wait, ascending)") + ax.set_ylabel("Time (s)") + ax.set_title("Per-migration dst-side phase breakdown (v3 unified_v3 run)") + ax.legend(loc="upper left", fontsize=9) + ax.grid(axis="y", linestyle=":", alpha=0.5) + fig.tight_layout() + fig.savefig(out_path, dpi=120) + plt.close(fig) + print(f"[analyze] wrote plot: {out_path}") + + +def main() -> None: + p = argparse.ArgumentParser() + p.add_argument("--proxy-breakdown", type=Path, required=True) + p.add_argument("--dst-log-dir", type=Path, required=True) + p.add_argument("--output", type=Path, default=None, + help="CSV path (default: /dst_migration_breakdown.csv)") + p.add_argument("--plot", type=Path, default=None, + help="PNG path (default: /dst_migration_breakdown.png)") + args = p.parse_args() + + if not args.proxy_breakdown.is_file(): + sys.exit(f"missing proxy breakdown: {args.proxy_breakdown}") + if not args.dst_log_dir.is_dir(): + sys.exit(f"missing dst log dir: {args.dst_log_dir}") + + run_dir = args.proxy_breakdown.parent + out_csv = args.output or (run_dir / "dst_migration_breakdown.csv") + out_png = args.plot or (run_dir / "dst_migration_breakdown.png") + + proxy_recs = load_proxy_breakdown(args.proxy_breakdown) + dst_by_req = load_dst_log(args.dst_log_dir) + rows = decompose(proxy_recs, dst_by_req) + emit_summary(rows) + write_csv(rows, out_csv) + maybe_plot(rows, out_png) + + +if __name__ == "__main__": + main() diff --git a/microbench/connector_tax/cache_sweep/analyze_migration_log.py b/microbench/connector_tax/cache_sweep/analyze_migration_log.py new file mode 100644 index 0000000..db7ec3f --- /dev/null +++ b/microbench/connector_tax/cache_sweep/analyze_migration_log.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +"""Per-migration log + per-instance summary for a v3 trace replay. + +Reads /breakdown.json and /metrics.jsonl and emits: + 1. A row per migration showing src→dst, per-side state snapshots, and + the resulting TTFT. + 2. Histograms: migrations received per inst, sent per inst, all + (src→dst) pairs. + 3. Post-rotation tail: how many turns of migrated sessions ended up on + each inst (downstream impact of rotation). + 4. Anti-hotspot signal: recent_mig_received_in_window at decision time. + +Run any v3 replay through this to spot pathological clustering of +migrations on the same dst within a short window. + +Usage: + python analyze_migration_log.py +where contains breakdown.json + metrics.jsonl (i.e. the proxy's +per-policy output folder, e.g. .../b3_v3_20260527_1344/unified_v3). +""" +import json +import sys +from collections import Counter, defaultdict +from pathlib import Path + + +def main(run_dir: Path) -> None: + bd = json.load(open(run_dir / "breakdown.json")) + m = {json.loads(l)["request_id"]: json.loads(l) + for l in open(run_dir / "metrics.jsonl")} + + mig = [e for e in bd if e.get("v3_migrate")] + mig.sort(key=lambda x: x.get("t_decision_unix", 0)) + + print(f"=== {len(mig)} migrations in {run_dir.name} ===\n") + cols = ( + "#", "t_rel", "session", "turn", + "src", "dst", "src_nreq", "src_dec_tok", + "dst_nreq", "dst_cache", "dst_recent_recv", + "inlen", "self_ttft_ms", + ) + print(" " + " ".join(f"{c:>13}" for c in cols)) + print("-" * (15 * len(cols))) + + t0 = mig[0]["t_decision_unix"] if mig else 0 + for i, e in enumerate(mig): + rid = e["request_id"] + src_idx = e.get("v3_src_idx", e["chosen_idx"]) + dst_idx = e.get("v3_target_idx", -1) + src_state = e.get("v3_src_state") or {} + dst_state = e.get("v3_target_state") or {} + cands = {c["idx"]: c for c in e.get("candidate_scores", [])} + # Fall back to candidate_scores if dedicated v3_*_state fields aren't present. + src_nreq = src_state.get("num_requests", cands.get(src_idx, {}).get("num_requests", "-")) + src_dec_tok = src_state.get("ongoing_decode_tokens", + cands.get(src_idx, {}).get("ongoing_decode_tokens", "-")) + dst_nreq = dst_state.get("num_requests", cands.get(dst_idx, {}).get("num_requests", "-")) + dst_cache = e.get("v3_target_cache_hit", dst_state.get("cache_hit_estimate", 0)) + dst_recent = e.get("v3_target_recent_received", + dst_state.get("recent_mig_received_in_window", "-")) + inlen = e.get("input_length") or m.get(rid, {}).get("input_length", 0) + ttft = m.get(rid, {}).get("ttft_s") or 0 + t_rel = e["t_decision_unix"] - t0 + turn = m.get(rid, {}).get("turn_id", "?") + print( + f" {i+1:>13} {t_rel:>13.1f} {e['session_id']:>13} {turn:>13} " + f"{src_idx:>13} {dst_idx:>13} {src_nreq:>13} {src_dec_tok:>13} " + f"{dst_nreq:>13} {dst_cache:>13} {dst_recent:>13} " + f"{inlen:>13} {ttft*1000:>13.0f}" + ) + + # Aggregate counts + print("\n=== Migrations TO each instance ===") + to_count = Counter(e.get("v3_target_idx", -1) for e in mig) + for idx in range(8): + print(f" inst_{idx}: {to_count.get(idx, 0)} migrations received") + + print("\n=== Migrations FROM each instance ===") + from_count = Counter(e.get("v3_src_idx", e["chosen_idx"]) for e in mig) + for idx in range(8): + print(f" inst_{idx}: {from_count.get(idx, 0)} migrations sent") + + print("\n=== Migration pairs (src→dst, count) ===") + pair_count = Counter( + (e.get("v3_src_idx", e["chosen_idx"]), e.get("v3_target_idx", -1)) + for e in mig + ) + for (s, d), n in sorted(pair_count.items(), key=lambda x: -x[1]): + print(f" {s} → {d}: {n}") + + print("\n=== Sessions migrating multiple times ===") + sess_mig = defaultdict(list) + for e in mig: + sess_mig[e["session_id"]].append( + (e.get("t_decision_unix", 0), + e.get("v3_src_idx", e["chosen_idx"]), + e.get("v3_target_idx", -1)) + ) + multi = {s: ev for s, ev in sess_mig.items() if len(ev) > 1} + if not multi: + print(" (none)") + for sess, events in sorted(multi.items()): + chain = " → ".join(f"{s}->{d}" for _, s, d in sorted(events)) + print(f" session {sess}: {chain}") + + # Recent-received hotspot signal — non-zero values mean the picker + # accepted a target that recently got another migration. + print("\n=== Anti-hotspot signal: dst.recent_mig_received_in_window ===") + rec = [e.get("v3_target_recent_received", 0) for e in mig] + if rec: + nonzero = [v for v in rec if v] + print(f" total migrations: {len(rec)}, " + f"with recent_received > 0: {len(nonzero)}, " + f"max recent_received: {max(rec)}") + + # Post-rotation tail: turns of migrated sessions after their LAST mig + print("\n=== Post-rotation tail per inst (turns of migrated sessions after last mig) ===") + tail = Counter() + for sess, events in sess_mig.items(): + final_dst = sorted(events)[-1][2] + last_t = max(t for t, _, _ in events) + sess_turns = [mm for rid, mm in m.items() if mm["session_id"] == sess] + tail[final_dst] += sum(1 for mm in sess_turns + if mm.get("t_dispatch_unix", 0) > last_t) + for idx in range(8): + print(f" inst_{idx}: {tail.get(idx, 0)} tail turns") + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("usage: analyze_migration_log.py ", file=sys.stderr) + sys.exit(1) + main(Path(sys.argv[1])) diff --git a/microbench/connector_tax/cache_sweep/analyze_transfer_decomp.py b/microbench/connector_tax/cache_sweep/analyze_transfer_decomp.py new file mode 100755 index 0000000..2a56f23 --- /dev/null +++ b/microbench/connector_tax/cache_sweep/analyze_transfer_decomp.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +"""Decompose migration KV-transfer time into RDMA-actual vs control-plane. + +Joins three logs from an instrumented unified_v3 run: + + proxy breakdown.json — per-request route + phase timestamps + dst_mig_log/dm_mig_pid*.jsonl — dst lifecycle (instrument_dst_migration.py) + gives T_kv_pull = wait_for_kvs -> recv_done + mooncake xfer/mb2_transfer_pid*.jsonl — connector internals + (instrument_mooncake.py): + send_blocks : pure RDMA (total_bytes, duration_s) [producer] + receive_kv_enter/finish: consumer-observed transfer window [consumer] + ready_wait : producer wait for src KV commit [producer] + send_kv_to_decode_enter: producer received the pull request [producer] + +Decisive question: of the 87% dst-side overhead that is T_kv_pull, how +much is the actual RDMA write (`send_blocks`) vs control-plane +(handshake / ready-wait / GIL starvation on the busy src)? + + - send_blocks bandwidth ~ wire (10 GB/s) AND << T_kv_pull + => loss is control-plane; layerwise (which only moves WHEN the + RDMA fires) will NOT fix it. + - send_blocks bandwidth << wire + => the RDMA write itself is slow (NIC / src-side servicing); + characterize with a load microbench next. + +Usage: + python analyze_transfer_decomp.py \ + --proxy-breakdown /unified_v3/breakdown.json \ + --dst-log-dir /dst_mig_log \ + --xfer-log-dir /xfer_log +""" +from __future__ import annotations + +import argparse +import json +import math +import re +import statistics +import sys +from pathlib import Path + + +def _core_req_id(rid: str) -> str: + if not rid: + return rid + s = rid + if s.startswith("cmpl-"): + s = s[len("cmpl-"):] + m = re.match(r"^(.*)-\d+-[0-9a-fA-F]+$", s) + if m: + s = m.group(1) + return s + + +def _pct(vals, q): + if not vals: + return float("nan") + vs = sorted(vals) + i = max(0, min(len(vs) - 1, int(math.ceil(q * len(vs))) - 1)) + return vs[i] + + +def _stat_line(name, vals, unit="s"): + if not vals: + print(f"{name:<34} n=0") + return + print(f"{name:<34} n={len(vals):>3} mean={statistics.mean(vals):>8.3f} " + f"p50={_pct(vals,0.5):>8.3f} p90={_pct(vals,0.9):>8.3f} " + f"max={max(vals):>8.3f} sum={sum(vals):>8.2f} {unit}") + + +def load_events(xfer_dir: Path): + files = sorted(xfer_dir.glob("mb2_transfer_pid*.jsonl")) + print(f"[xfer] log files: {len(files)} under {xfer_dir}") + send_blocks, recv_enter, recv_finish, ready_wait, send_enter = [], [], [], [], [] + for f in files: + pid = f.stem.replace("mb2_transfer_pid", "") + with f.open() as fh: + for line in fh: + try: + e = json.loads(line) + except Exception: + continue + e["_pid"] = pid + ev = e.get("event") + if ev == "send_blocks": + send_blocks.append(e) + elif ev == "receive_kv_enter": + recv_enter.append(e) + elif ev == "receive_kv_finish": + recv_finish.append(e) + elif ev == "ready_wait": + ready_wait.append(e) + elif ev == "send_kv_to_decode_enter": + send_enter.append(e) + print(f"[xfer] events: send_blocks={len(send_blocks)} " + f"recv_enter={len(recv_enter)} recv_finish={len(recv_finish)} " + f"ready_wait={len(ready_wait)} send_enter={len(send_enter)}") + return send_blocks, recv_enter, recv_finish, ready_wait, send_enter + + +def main(): + p = argparse.ArgumentParser() + p.add_argument("--proxy-breakdown", type=Path, required=True) + p.add_argument("--dst-log-dir", type=Path, required=True) + p.add_argument("--xfer-log-dir", type=Path, required=True) + args = p.parse_args() + + for pth in (args.proxy_breakdown, args.dst_log_dir, args.xfer_log_dir): + if not pth.exists(): + sys.exit(f"missing: {pth}") + + proxy = json.load(args.proxy_breakdown.open()) + migrations = [x for x in proxy if x.get("route_class") == "PD_SEP_V2"] + mig_ids = {x.get("request_id") for x in migrations} + print(f"[proxy] migrations: {len(migrations)} / {len(proxy)} total") + + # dst lifecycle: T_kv_pull per migration (core req id) + dst_pull = {} + for f in sorted(args.dst_log_dir.glob("dm_mig_pid*.jsonl")): + for line in f.open(): + try: + r = json.loads(line) + except Exception: + continue + tw = r.get("t_wait_for_kvs_unix") + td = r.get("t_kv_recv_done_unix") + if tw and td: + dst_pull[_core_req_id(r.get("req_id"))] = td - tw + + sb, re_enter, re_finish, rw, se = load_events(args.xfer_log_dir) + + # ---- 1. Pure RDMA bandwidth from send_blocks (the decisive number) ---- + print("\n" + "=" * 90) + print("1. PURE RDMA WRITE rate (`send_blocks` = batch_transfer_sync_write)") + print("=" * 90) + bws, durs, bytes_l = [], [], [] + for e in sb: + b = e.get("total_bytes", 0) + d = e.get("duration_s", 0) + if d and d > 0 and b > 0: + bws.append(b / 1e9 / d) + durs.append(d) + bytes_l.append(b) + if bws: + tot_b = sum(bytes_l) + tot_d = sum(durs) + print(f" send_blocks calls: {len(bws)}") + print(f" total bytes moved : {tot_b/2**30:.2f} GiB") + print(f" total RDMA time : {tot_d:.2f} s") + print(f" AGGREGATE rate : {tot_b/1e9/tot_d:.2f} GB/s " + f"(MB2 idle-src steady-state = ~9.7-10 GB/s)") + _stat_line(" per-call rate (GB/s)", bws, unit="GB/s") + _stat_line(" per-call duration", durs) + # bandwidth vs size — small ops are latency-bound + print("\n rate vs transfer size:") + pairs = sorted(zip(bytes_l, bws)) + for b, w in pairs: + bar = "#" * int(min(40, w * 4)) + print(f" {b/2**20:>8.1f} MiB {w:>6.2f} GB/s {bar}") + else: + print(" no send_blocks events with positive duration") + + # ---- 2. Producer ready-wait (src KV commit) ---- + print("\n" + "=" * 90) + print("2. PRODUCER ready-wait (src KV not yet committed when pull arrived)") + print("=" * 90) + rw_vals = [e.get("ready_wait_s", 0) for e in rw if e.get("ready_wait_s") is not None] + already = sum(1 for e in rw if e.get("ready_already_set")) + _stat_line(" ready_wait", rw_vals) + print(f" ready_already_set at entry: {already}/{len(rw)} " + f"(if most are True, src commit is not the bottleneck)") + + # ---- 3. Consumer-observed receive_kv window ---- + print("\n" + "=" * 90) + print("3. CONSUMER receive_kv window (enter->FINISH, ~most of T_kv_pull)") + print("=" * 90) + rf_vals = [e.get("duration_s", 0) for e in re_finish if e.get("duration_s")] + _stat_line(" receive_kv duration", rf_vals) + + # ---- 4. Per-migration join: T_kv_pull vs receive_kv vs ready_wait ---- + print("\n" + "=" * 90) + print("4. PER-MIGRATION join (T_kv_pull from dst vs connector internals)") + print("=" * 90) + # index connector events by core req id + rf_by_req = {} + for e in re_finish: + for rid in e.get("req_ids", []): + rf_by_req[_core_req_id(rid)] = e.get("duration_s") + rw_by_req = {} + for e in rw: + rw_by_req[_core_req_id(e.get("d_req_id", ""))] = e.get("ready_wait_s") + + joined = 0 + sum_pull = sum_recv = sum_rw = 0.0 + rows = [] + for m in migrations: + core = m.get("request_id") + pull = dst_pull.get(core) + recv = rf_by_req.get(core) + rwv = rw_by_req.get(core) + if pull is None and recv is None: + continue + joined += 1 + if pull: sum_pull += pull + if recv: sum_recv += recv + if rwv: sum_rw += rwv + rows.append((core, m.get("input_length"), m.get("v3_target_cache_hit"), + pull, recv, rwv)) + print(f" joined migrations: {joined}") + print(f" Σ T_kv_pull (dst) = {sum_pull:8.2f} s") + print(f" Σ receive_kv (consumer) = {sum_recv:8.2f} s") + print(f" Σ ready_wait (producer) = {sum_rw:8.2f} s") + # The RDMA share: best-effort total send_blocks time + sum_rdma = sum(durs) if durs else 0.0 + print(f" Σ send_blocks RDMA = {sum_rdma:8.2f} s (all transfers, " + f"not just migrations)") + if sum_pull > 0: + print(f"\n RDMA-actual / T_kv_pull ≈ {sum_rdma/sum_pull*100:5.1f} %") + print(f" ready-wait / T_kv_pull ≈ {sum_rw/sum_pull*100:5.1f} %") + resid = sum_pull - sum_rdma - sum_rw + print(f" control-plane residual ≈ {resid/sum_pull*100:5.1f} % " + f"(handshake / ZMQ / GIL starvation)") + + print("\n per-migration detail:") + print(f" {'req_id':<22} {'in_len':>7} {'dst_hit':>8} {'kv_pull':>8} " + f"{'recv_kv':>8} {'rdy_wait':>8}") + for core, il, hit, pull, recv, rwv in sorted( + rows, key=lambda r: -(r[3] or 0)): + def s(v): return f"{v:.2f}" if v is not None else " --" + print(f" {core:<22} {str(il):>7} {str(hit):>8} {s(pull):>8} " + f"{s(recv):>8} {s(rwv):>8}") + + +if __name__ == "__main__": + main() diff --git a/microbench/connector_tax/cache_sweep/run_v3_dst_breakdown.sh b/microbench/connector_tax/cache_sweep/run_v3_dst_breakdown.sh new file mode 100755 index 0000000..405502f --- /dev/null +++ b/microbench/connector_tax/cache_sweep/run_v3_dst_breakdown.sh @@ -0,0 +1,90 @@ +#!/usr/bin/env bash +# v3 trace replay with dst-side migration breakdown instrumentation. +# +# Same trace + DR_FIX as `run_v3_replay.sh`, plus: +# - instrument_dst_migration.py applied to vLLM scheduler +# - DM_LOG_DIR exported to all 8 vLLM instances so per-PID +# dst-migration logs land in /dst_mig_log/ +# - analyze_dst_migration.py runs on completion to print the +# T_kv_pull vs queue-residual decomposition +# +# Usage: bash run_v3_dst_breakdown.sh + +set -uo pipefail + +PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}" +TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}" +DATE="$(date +%Y%m%d_%H%M)" +OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/b3_v3_dstbreak_${DATE}}" +PYTHON="$PROJ_DIR/.venv/bin/python" +VLLM_ROOT="${VLLM_ROOT:-$PROJ_DIR/.venv/lib/python3.12/site-packages/vllm}" +DR_FIX_SCRIPT="$PROJ_DIR/microbench/connector_tax/cache_sweep/apply_direct_read_fix.py" +DM_INSTRUMENT="$PROJ_DIR/microbench/fresh_setup/instrument_dst_migration.py" +ANALYZE="$PROJ_DIR/microbench/connector_tax/cache_sweep/analyze_dst_migration.py" + +mkdir -p "$OUTROOT" +DST_LOG_DIR="$OUTROOT/dst_mig_log" +mkdir -p "$DST_LOG_DIR" + +echo "=== unified_v3 + dst-side migration breakdown ===" +echo "Trace : $TRACE" +echo "Out : $OUTROOT" +echo "DST logs : $DST_LOG_DIR" +echo "" + +cleanup_all() { + pkill -9 -f cache_aware_proxy 2>/dev/null || true + pkill -9 -f "vllm serve" 2>/dev/null || true + pkill -9 -f "EngineCore" 2>/dev/null || true + sleep 5 + "$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true + "$PYTHON" "$DM_INSTRUMENT" --revert --venv "$PROJ_DIR/.venv" 2>/dev/null || true +} +trap cleanup_all EXIT +cleanup_all + +echo "[stage 0a] applying CT_DR_FIX (env-gated)" +"$PYTHON" "$DR_FIX_SCRIPT" --apply --vllm-root "$VLLM_ROOT" + +echo "[stage 0b] applying DST migration instrumentation" +"$PYTHON" "$DM_INSTRUMENT" --apply --venv "$PROJ_DIR/.venv" +"$PYTHON" "$DM_INSTRUMENT" --check --venv "$PROJ_DIR/.venv" + +cfg_dir="$OUTROOT/unified_v3" +mkdir -p "$cfg_dir" + +# Activate DR-fix env gate (consistent with run_v3_replay.sh) +export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1 +# Export DM_LOG_DIR — every vLLM EngineCore inherits this env and writes +# its own dm_mig_pid.jsonl into it. +export DM_LOG_DIR="$DST_LOG_DIR" + +echo "" +echo "====== unified_v3 ; DR_SYNC_DISABLED=1 ; DM_LOG_DIR=$DST_LOG_DIR ======" +bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "unified_v3" "$TRACE" "$cfg_dir" \ + 2>&1 | tee "$cfg_dir/orchestrator.log" | tail -30 + +pkill -9 -f cache_aware_proxy 2>/dev/null || true +pkill -9 -f "vllm serve" 2>/dev/null || true +pkill -9 -f "EngineCore" 2>/dev/null || true +sleep 5 + +echo "" +echo "[stage Z] reverting DR_FIX + DM instrument" +"$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT" +"$PYTHON" "$DM_INSTRUMENT" --revert --venv "$PROJ_DIR/.venv" + +echo "" +echo "[stage analyze] dst-side migration breakdown" +"$PYTHON" "$ANALYZE" \ + --proxy-breakdown "$cfg_dir/breakdown.json" \ + --dst-log-dir "$DST_LOG_DIR" \ + --output "$cfg_dir/dst_migration_breakdown.csv" \ + --plot "$cfg_dir/dst_migration_breakdown.png" \ + 2>&1 | tee "$cfg_dir/dst_migration_breakdown.txt" + +echo "" +echo "Done." +echo " proxy breakdown : $cfg_dir/breakdown.json" +echo " dst per-PID log : $DST_LOG_DIR/" +echo " decomposition : $cfg_dir/dst_migration_breakdown.{csv,png,txt}" diff --git a/microbench/connector_tax/cache_sweep/run_v3_full_breakdown.sh b/microbench/connector_tax/cache_sweep/run_v3_full_breakdown.sh new file mode 100755 index 0000000..855a0fc --- /dev/null +++ b/microbench/connector_tax/cache_sweep/run_v3_full_breakdown.sh @@ -0,0 +1,102 @@ +#!/usr/bin/env bash +# v3 trace replay with FULL migration instrumentation: +# - instrument_dst_migration.py : dst lifecycle -> T_kv_pull +# - instrument_mooncake.py : connector internals (send_blocks RDMA, +# receive_kv window, ready_wait) +# Goal: decompose the 87% T_kv_pull into RDMA-actual vs control-plane to +# explain why effective bandwidth is far below the ~10 GB/s wire rate. +# +# Usage: bash run_v3_full_breakdown.sh + +set -uo pipefail + +PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}" +TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}" +DATE="$(date +%Y%m%d_%H%M)" +OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/b3_v3_fullbreak_${DATE}}" +PYTHON="$PROJ_DIR/.venv/bin/python" +VENV="$PROJ_DIR/.venv" +VLLM_ROOT="${VLLM_ROOT:-$VENV/lib/python3.12/site-packages/vllm}" +DR_FIX="$PROJ_DIR/microbench/connector_tax/cache_sweep/apply_direct_read_fix.py" +DM_INSTR="$PROJ_DIR/microbench/fresh_setup/instrument_dst_migration.py" +MC_INSTR="$PROJ_DIR/microbench/fresh_setup/instrument_mooncake.py" +ANALYZE_DST="$PROJ_DIR/microbench/connector_tax/cache_sweep/analyze_dst_migration.py" +ANALYZE_XFER="$PROJ_DIR/microbench/connector_tax/cache_sweep/analyze_transfer_decomp.py" + +mkdir -p "$OUTROOT" +DST_LOG_DIR="$OUTROOT/dst_mig_log" +XFER_LOG_DIR="$OUTROOT/xfer_log" +mkdir -p "$DST_LOG_DIR" "$XFER_LOG_DIR" + +echo "=== unified_v3 + FULL migration breakdown ===" +echo "Out : $OUTROOT" +echo "DST logs : $DST_LOG_DIR" +echo "XFER logs: $XFER_LOG_DIR" +echo "" + +cleanup_all() { + pkill -9 -f cache_aware_proxy 2>/dev/null || true + pkill -9 -f "vllm serve" 2>/dev/null || true + pkill -9 -f "EngineCore" 2>/dev/null || true + sleep 5 + "$PYTHON" "$DR_FIX" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true + "$PYTHON" "$DM_INSTR" --revert --venv "$VENV" 2>/dev/null || true + "$PYTHON" "$MC_INSTR" --revert --venv "$VLLM_ROOT/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py" 2>/dev/null || true +} +trap cleanup_all EXIT +cleanup_all + +echo "[0a] DR_FIX" +"$PYTHON" "$DR_FIX" --apply --vllm-root "$VLLM_ROOT" +echo "[0b] DST migration instrument" +"$PYTHON" "$DM_INSTR" --apply --venv "$VENV" +echo "[0c] Mooncake transfer instrument" +"$PYTHON" "$MC_INSTR" --apply --venv "$VLLM_ROOT/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py" +"$PYTHON" "$DM_INSTR" --check --venv "$VENV" +"$PYTHON" "$MC_INSTR" --check --venv "$VLLM_ROOT/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py" + +cfg_dir="$OUTROOT/unified_v3" +mkdir -p "$cfg_dir" + +export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1 +export DM_LOG_DIR="$DST_LOG_DIR" +export MB2_LOG_DIR="$XFER_LOG_DIR" + +echo "" +echo "====== unified_v3 ; DM_LOG_DIR + MB2_LOG_DIR set ======" +bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "unified_v3" "$TRACE" "$cfg_dir" \ + 2>&1 | tee "$cfg_dir/orchestrator.log" | tail -25 + +pkill -9 -f cache_aware_proxy 2>/dev/null || true +pkill -9 -f "vllm serve" 2>/dev/null || true +pkill -9 -f "EngineCore" 2>/dev/null || true +sleep 5 + +echo "" +echo "[Z] revert all instruments" +"$PYTHON" "$DR_FIX" --revert --vllm-root "$VLLM_ROOT" +"$PYTHON" "$DM_INSTR" --revert --venv "$VENV" +"$PYTHON" "$MC_INSTR" --revert --venv "$VLLM_ROOT/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py" + +echo "" +echo "[analyze 1] dst-side T_kv_pull breakdown" +"$PYTHON" "$ANALYZE_DST" \ + --proxy-breakdown "$cfg_dir/breakdown.json" \ + --dst-log-dir "$DST_LOG_DIR" \ + --output "$cfg_dir/dst_migration_breakdown.csv" \ + --plot "$cfg_dir/dst_migration_breakdown.png" \ + 2>&1 | tee "$cfg_dir/dst_migration_breakdown.txt" || echo "(dst analyze failed)" + +echo "" +echo "[analyze 2] transfer decomposition: RDMA-actual vs control-plane" +"$PYTHON" "$ANALYZE_XFER" \ + --proxy-breakdown "$cfg_dir/breakdown.json" \ + --dst-log-dir "$DST_LOG_DIR" \ + --xfer-log-dir "$XFER_LOG_DIR" \ + 2>&1 | tee "$cfg_dir/transfer_decomp.txt" || echo "(xfer analyze failed)" + +echo "" +echo "Done. Artifacts in $cfg_dir/" +echo " dst_migration_breakdown.{csv,png,txt}" +echo " transfer_decomp.txt" +echo " raw: $DST_LOG_DIR/ $XFER_LOG_DIR/" diff --git a/microbench/fresh_setup/analyze_goodput.py b/microbench/fresh_setup/analyze_goodput.py new file mode 100644 index 0000000..f037ce9 --- /dev/null +++ b/microbench/fresh_setup/analyze_goodput.py @@ -0,0 +1,198 @@ +"""SLO-goodput analyzer + PD_advantage for the PD-disagg crossover study. + +Reads per-arm replayer output (replay_metrics.jsonl) and computes, per arm: + - completion rate (error-free fraction) + - raw TTFT / TPOT / E2E percentiles (over successes — reported for context + only; NEVER the verdict metric, since failing arms have a small success set) + - SLO-goodput: fraction of OFFERED requests that are error-free AND meet a + (TTFT, TPOT) SLO. This is the verdict metric. + +The two arms must replay the IDENTICAL trace (same seed), so they are paired +request-for-request. PD_advantage = goodput(arm) / goodput(baseline); y=1 is +the crossover line — PD_advantage >= 1 means PD-disagg wins. + +Goodput is computed over a grid of SLO thresholds so the conclusion does not +hinge on one arbitrary cutoff. + +Usage: + python analyze_goodput.py \ + --arm 8C-proxy .../8C-proxy/replay_metrics.jsonl \ + --arm 4P+4D .../4P+4D/replay_metrics.jsonl \ + --baseline 8C-proxy \ + --ttft-slo 0.5 1 2 5 --tpot-slo 0.05 0.1 0.2 +""" + +from __future__ import annotations + +import argparse +import json +import statistics +from pathlib import Path + + +def load_metrics(path: Path) -> list[dict]: + rows = [] + with path.open("r", encoding="utf-8") as fh: + for line in fh: + line = line.strip() + if line: + rows.append(json.loads(line)) + return rows + + +def percentile(sorted_vals: list[float], pct: float) -> float: + n = len(sorted_vals) + if n == 0: + return float("nan") + if n == 1: + return sorted_vals[0] + rank = pct * (n - 1) + lo = int(rank) + hi = min(lo + 1, n - 1) + frac = rank - lo + return sorted_vals[lo] * (1 - frac) + sorted_vals[hi] * frac + + +def pstats(vals: list[float]) -> dict: + clean = sorted(v for v in vals if v is not None) + if not clean: + return {"count": 0} + return { + "count": len(clean), + "mean": statistics.fmean(clean), + "p50": percentile(clean, 0.50), + "p90": percentile(clean, 0.90), + "p99": percentile(clean, 0.99), + } + + +def offered_window_s(rows: list[dict]) -> float: + ts = [r.get("trace_timestamp_s") for r in rows if r.get("trace_timestamp_s") is not None] + if len(ts) < 2: + return 0.0 + return max(ts) - min(ts) + + +def meets_slo(r: dict, ttft_slo: float, tpot_slo: float) -> bool: + if r.get("error") is not None: + return False + ttft = r.get("ttft_s") + tpot = r.get("tpot_s") + if ttft is None: + return False + if ttft > ttft_slo: + return False + # tpot=0 happens only for single-token outputs; treat as meeting any SLO. + if tpot is not None and tpot > tpot_slo: + return False + return True + + +def load_summary(jsonl_path: Path) -> dict: + """Read the sibling replay_metrics.summary.json (wall-clock, amplification).""" + sp = jsonl_path.with_suffix(".summary.json") + if sp.exists(): + try: + return json.loads(sp.read_text()) + except Exception: + return {} + return {} + + +def summarize_arm(name: str, jsonl_path: Path, rows: list[dict]) -> dict: + n = len(rows) + ok = [r for r in rows if r.get("error") is None] + window = offered_window_s(rows) + summ = load_summary(jsonl_path) + return { + "name": name, + "n_offered": n, + "n_success": len(ok), + "completion_rate": len(ok) / n if n else 0.0, + "offered_window_s": window, + "offered_qps": n / window if window > 0 else 0.0, + # Throughput: how much longer than the offered window it took to drain. + # ~1.0 = keeps up; >1 = falling behind (the cleanest PD-collapse signal). + "wall_clock_s": summ.get("wall_clock_s"), + "amplification": summ.get("amplification"), + "ttft": pstats([r.get("ttft_s") for r in ok]), + "tpot": pstats([r.get("tpot_s") for r in ok]), + "e2e": pstats([r.get("latency_s") for r in ok]), + "_rows": rows, + } + + +def main() -> None: + p = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("--arm", nargs=2, action="append", metavar=("NAME", "PATH"), + required=True, help="arm name + replay_metrics.jsonl path (repeatable)") + p.add_argument("--baseline", required=True, help="arm name to use as PD_advantage denominator") + p.add_argument("--ttft-slo", nargs="+", type=float, default=[0.5, 1.0, 2.0, 5.0]) + p.add_argument("--tpot-slo", nargs="+", type=float, default=[0.05, 0.1, 0.2]) + p.add_argument("--out-json", type=Path, default=None) + args = p.parse_args() + + arms = {} + for name, path in args.arm: + arms[name] = summarize_arm(name, Path(path), load_metrics(Path(path))) + + if args.baseline not in arms: + raise SystemExit(f"baseline {args.baseline!r} not among arms {list(arms)}") + + # ---- per-arm overview ------------------------------------------------ + print("=" * 78) + print("PER-ARM OVERVIEW (latency stats over successes only — context, not verdict)") + print("=" * 78) + hdr = f"{'arm':<12}{'offered':>8}{'compl%':>8}{'ampl':>6}{'oQPS':>7}" \ + f"{'TTFTp50':>9}{'TTFTp90':>9}{'TPOTp50':>9}{'TPOTp99':>9}{'E2Ep90':>9}" + print(hdr) + for name, a in arms.items(): + t, tp, e = a["ttft"], a["tpot"], a["e2e"] + ampl = a.get("amplification") + ampl_s = f"{ampl:>6.2f}" if isinstance(ampl, (int, float)) else f"{'--':>6}" + print(f"{name:<12}{a['n_offered']:>8}{100*a['completion_rate']:>7.1f}%" + f"{ampl_s}{a['offered_qps']:>7.2f}" + f"{t.get('p50', float('nan')):>9.2f}{t.get('p90', float('nan')):>9.2f}" + f"{1000*tp.get('p50', float('nan')):>8.0f}m{1000*tp.get('p99', float('nan')):>8.0f}m" + f"{e.get('p90', float('nan')):>9.2f}") + + # ---- SLO-goodput grid + PD_advantage -------------------------------- + base = arms[args.baseline] + grid = [] + print() + print("=" * 78) + print(f"SLO-GOODPUT (attainment = error-free AND TTFT<=slo AND TPOT<=slo)") + print(f"PD_advantage = attainment(arm) / attainment(baseline={args.baseline}); " + f">=1 means arm wins") + print("=" * 78) + for ttft_slo in args.ttft_slo: + for tpot_slo in args.tpot_slo: + row = {"ttft_slo_s": ttft_slo, "tpot_slo_s": tpot_slo, "arms": {}} + base_n = sum(1 for r in base["_rows"] if meets_slo(r, ttft_slo, tpot_slo)) + base_att = base_n / base["n_offered"] if base["n_offered"] else 0.0 + line = f"TTFT<={ttft_slo:>4}s TPOT<={int(1000*tpot_slo):>4}ms | " + cells = [] + for name, a in arms.items(): + n_slo = sum(1 for r in a["_rows"] if meets_slo(r, ttft_slo, tpot_slo)) + att = n_slo / a["n_offered"] if a["n_offered"] else 0.0 + adv = (att / base_att) if base_att > 0 else float("nan") + row["arms"][name] = {"attainment": att, "pd_advantage": adv, "n_slo": n_slo} + tag = "" if name == args.baseline else f" adv={adv:.2f}" + cells.append(f"{name}={100*att:>5.1f}%{tag}") + print(line + " ".join(cells)) + grid.append(row) + + if args.out_json: + out = { + "baseline": args.baseline, + "arms": {n: {k: v for k, v in a.items() if k != "_rows"} + for n, a in arms.items()}, + "slo_grid": grid, + } + args.out_json.write_text(json.dumps(out, indent=2)) + print(f"\nwrote {args.out_json}") + + +if __name__ == "__main__": + main() diff --git a/microbench/fresh_setup/instrument_dst_migration.py b/microbench/fresh_setup/instrument_dst_migration.py new file mode 100755 index 0000000..78c4fdf --- /dev/null +++ b/microbench/fresh_setup/instrument_dst_migration.py @@ -0,0 +1,360 @@ +#!/usr/bin/env python3 +"""Instrument vLLM V1 scheduler to dump per-request DST-side migration timeline. + +For each request that arrives at the engine with `kv_transfer_params` +containing `do_remote_prefill=True` (i.e., the decode-target of an +EAR v3 migration), record: + + t_arrival_unix — Scheduler.add_request() entry + t_wait_for_kvs_unix — status set to WAITING_FOR_REMOTE_KVS (KV pull start) + t_kv_recv_done_unix — req_id added to finished_recving_kv_req_ids + t_first_scheduled_unix — first time req appears in self.running after KV done + t_first_token_unix — first new_token_ids appended in update_from_output + arrival_state — {n_running, n_waiting, pending_prefill_tok, + n_waiting_for_kvs} + +We complement the proxy `breakdown.json` (t_decode_sent_unix / +t_first_token_unix) to attribute the migration's dst-side wait into: + HTTP relay + admission_pre_kv + KV pull + admission_post_kv + first_iter + +One JSONL per EngineCore PID at $DM_LOG_DIR/dm_mig_pid.jsonl +(default DM_LOG_DIR=/tmp). Records are flushed when t_first_token is +reached or when the request is aborted/finished. + +Co-exists with MB5 KV snapshot patches (different START/END markers). + +Usage: + python instrument_dst_migration.py --apply [--venv PATH] + python instrument_dst_migration.py --revert [--venv PATH] + python instrument_dst_migration.py --check [--venv PATH] +""" +from __future__ import annotations + +import argparse +import re +from pathlib import Path + +DEFAULT_VENV = Path("/home/admin/cpfs/wjh/agentic-kv/.venv") +TARGET_REL = "lib/python3.12/site-packages/vllm/v1/core/sched/scheduler.py" + +START_MARK = "# DM_INSTRUMENT_START" +END_MARK = "# DM_INSTRUMENT_END" + +# ---------- Patch 1: module-level header (helpers + globals) ----------------- +# Anchor: the very first `class Scheduler(SchedulerInterface):` line. We insert +# the entire helper block immediately before that, so MB5's prior block (if +# present) is preserved and our block lives in module scope. The anchor must +# stay outside our own START/END markers so revert() can re-find it. +HEADER_ANCHOR = "class Scheduler(SchedulerInterface):" + +HEADER_INSERT = f"""{START_MARK} +import json as _dm_json +import os as _dm_os +import threading as _dm_threading +import time as _dm_time +_DM_LOG_DIR = _dm_os.environ.get("DM_LOG_DIR", "/tmp") +try: + _dm_os.makedirs(_DM_LOG_DIR, exist_ok=True) +except Exception: + pass +_DM_LOG_PATH = _dm_os.path.join( + _DM_LOG_DIR, f"dm_mig_pid{{_dm_os.getpid()}}.jsonl" +) +_DM_LOG_FILE = None +_DM_LOG_LOCK = _dm_threading.Lock() +# req_id -> in-flight record. We pop and flush when t_first_token lands or on +# finish/abort. +_DM_DATA: dict = {{}} + + +def _dm_write_event(d: dict) -> None: + global _DM_LOG_FILE + if _DM_LOG_FILE is None: + _DM_LOG_FILE = open(_DM_LOG_PATH, "a", buffering=1) + with _DM_LOG_LOCK: + _DM_LOG_FILE.write(_dm_json.dumps(d) + "\\n") + + +def _dm_is_migrated(request) -> bool: + ktp = getattr(request, "kv_transfer_params", None) + if not isinstance(ktp, dict): + return False + return bool(ktp.get("do_remote_prefill")) + + +def _dm_snapshot_arrival(scheduler) -> dict: + try: + n_running = len(scheduler.running) + except Exception: + n_running = -1 + try: + n_waiting_main = len(scheduler.waiting) + except Exception: + n_waiting_main = -1 + try: + n_skipped = len(scheduler.skipped_waiting) + except Exception: + n_skipped = 0 + pending_tok = 0 + n_kv = 0 + try: + from vllm.v1.request import RequestStatus as _RS + for r in list(scheduler.waiting): + try: + if getattr(r, "status", None) == _RS.WAITING_FOR_REMOTE_KVS: + n_kv += 1 + npr = int(getattr(r, "num_prompt_tokens", 0)) + nct = int(getattr(r, "num_computed_tokens", 0)) + pending_tok += max(0, npr - nct) + except Exception: + pass + for r in list(scheduler.skipped_waiting): + try: + if getattr(r, "status", None) == _RS.WAITING_FOR_REMOTE_KVS: + n_kv += 1 + except Exception: + pass + except Exception: + pass + return {{ + "n_running": int(n_running), + "n_waiting": int(n_waiting_main + n_skipped), + "pending_prefill_tok": int(pending_tok), + "n_waiting_for_kvs": int(n_kv), + }} + + +def _dm_emit_and_drop(req_id: str, reason: str = "first_token") -> None: + rec = _DM_DATA.pop(req_id, None) + if rec is None: + return + rec["flush_reason"] = reason + rec["t_flush_unix"] = _dm_time.time() + _dm_write_event(rec) + + +{END_MARK} + + +""" + +# ---------- Patch 2: add_request() hook -------------------------------------- +# Right after self.requests[request.request_id] = request (line ~1927) and the +# if self.log_stats: block. Anchor includes the QUEUED record_event line so it +# is uniquely matchable. +ADD_REQUEST_ANCHOR = """ self._enqueue_waiting_request(request) + self.requests[request.request_id] = request + if self.log_stats: + request.record_event(EngineCoreEventType.QUEUED) +""" + +ADD_REQUEST_REPLACE = f""" self._enqueue_waiting_request(request) + self.requests[request.request_id] = request + if self.log_stats: + request.record_event(EngineCoreEventType.QUEUED) + {START_MARK} + try: + if _dm_is_migrated(request): + _DM_DATA[request.request_id] = {{ + "req_id": str(request.request_id), + "is_migrated": True, + "n_prompt_tokens": int(getattr(request, "num_prompt_tokens", 0)), + "t_arrival_unix": _dm_time.time(), + "t_wait_for_kvs_unix": None, + "t_kv_recv_done_unix": None, + "t_first_scheduled_unix": None, + "t_first_token_unix": None, + "arrival_state": _dm_snapshot_arrival(self), + }} + except Exception: + pass + {END_MARK} +""" + +# ---------- Patch 3: WAITING_FOR_REMOTE_KVS transition ----------------------- +WAIT_KV_ANCHOR = """ request.status = RequestStatus.WAITING_FOR_REMOTE_KVS + step_skipped_waiting.prepend_request(request) +""" + +WAIT_KV_REPLACE = f""" request.status = RequestStatus.WAITING_FOR_REMOTE_KVS + {START_MARK} + try: + _rec = _DM_DATA.get(request.request_id) + if _rec is not None and _rec["t_wait_for_kvs_unix"] is None: + _rec["t_wait_for_kvs_unix"] = _dm_time.time() + except Exception: + pass + {END_MARK} + step_skipped_waiting.prepend_request(request) +""" + +# ---------- Patch 4: finished_recving signal --------------------------------- +FINISHED_RECV_ANCHOR = """ if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS: + self.finished_recving_kv_req_ids.add(req_id) + elif RequestStatus.is_finished(req.status): +""" + +FINISHED_RECV_REPLACE = f""" if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS: + self.finished_recving_kv_req_ids.add(req_id) + {START_MARK} + try: + _rec = _DM_DATA.get(req_id) + if _rec is not None and _rec["t_kv_recv_done_unix"] is None: + _rec["t_kv_recv_done_unix"] = _dm_time.time() + except Exception: + pass + {END_MARK} + elif RequestStatus.is_finished(req.status): +""" + +# ---------- Patch 5: first scheduled (sweep at end of schedule()) ------------ +# Co-exists with the MB5 snapshot inserted at the same location. +SCHED_END_ANCHOR = """ # MB5_INSTRUMENT_START + _mb5_snapshot(self) + # MB5_INSTRUMENT_END + return scheduler_output +""" + +SCHED_END_REPLACE = f""" # MB5_INSTRUMENT_START + _mb5_snapshot(self) + # MB5_INSTRUMENT_END + {START_MARK} + try: + if _DM_DATA: + _now_dm = _dm_time.time() + for _r in self.running: + _rec = _DM_DATA.get(_r.request_id) + if _rec is not None and _rec["t_first_scheduled_unix"] is None: + _rec["t_first_scheduled_unix"] = _now_dm + except Exception: + pass + {END_MARK} + return scheduler_output +""" + +# ---------- Patch 6: first new token in update_from_output ------------------- +FIRST_TOK_ANCHOR = """ # Check for stop and update request status. + if new_token_ids: + new_token_ids, stopped = self._update_request_with_output( + request, new_token_ids + ) +""" + +FIRST_TOK_REPLACE = f""" # Check for stop and update request status. + if new_token_ids: + {START_MARK} + try: + _rec = _DM_DATA.get(request.request_id) + if _rec is not None and _rec["t_first_token_unix"] is None: + _rec["t_first_token_unix"] = _dm_time.time() + _dm_emit_and_drop(request.request_id, reason="first_token") + except Exception: + pass + {END_MARK} + new_token_ids, stopped = self._update_request_with_output( + request, new_token_ids + ) +""" + +# ---------- Patch 7: abort/finish — flush partial record --------------------- +FINISH_ANCHOR = """ request.status = finished_status + self._free_request(request, delay_free_blocks=delay_free_blocks) +""" + +FINISH_REPLACE = f""" request.status = finished_status + {START_MARK} + try: + if request.request_id in _DM_DATA: + _dm_emit_and_drop(request.request_id, reason="finish_or_abort") + except Exception: + pass + {END_MARK} + self._free_request(request, delay_free_blocks=delay_free_blocks) +""" + +PATCHES = [ + ("header", HEADER_ANCHOR, HEADER_INSERT + HEADER_ANCHOR), + ("add_request", ADD_REQUEST_ANCHOR, ADD_REQUEST_REPLACE), + ("wait_for_kvs", WAIT_KV_ANCHOR, WAIT_KV_REPLACE), + ("finished_recving", FINISHED_RECV_ANCHOR, FINISHED_RECV_REPLACE), + ("first_scheduled", SCHED_END_ANCHOR, SCHED_END_REPLACE), + ("first_token", FIRST_TOK_ANCHOR, FIRST_TOK_REPLACE), + ("finish_flush", FINISH_ANCHOR, FINISH_REPLACE), +] + + +def find_target(venv_or_path: Path) -> Path: + candidates = [venv_or_path / TARGET_REL, DEFAULT_VENV / TARGET_REL] + for c in candidates: + if c.is_file(): + return c + raise FileNotFoundError(f"cannot find {TARGET_REL} under {venv_or_path}") + + +def is_patched(text: str) -> bool: + return START_MARK in text + + +def apply(target: Path) -> None: + text = target.read_text() + if is_patched(text): + print(f"[dm-instr] already patched: {target}") + return + new = text + for name, src, dst in PATCHES: + if src not in new: + raise RuntimeError( + f"patch {name!r}: anchor not found in {target}. " + f"Anchor head: {src.splitlines()[0]!r}" + ) + new = new.replace(src, dst, 1) + target.write_text(new) + print(f"[dm-instr] applied {len(PATCHES)} patches -> {target}") + + +def revert(target: Path) -> None: + text = target.read_text() + if not is_patched(text): + print(f"[dm-instr] not patched (nothing to revert): {target}") + return + # Strip our DM_* block, including the trailing newline that + # terminated the END_MARK line. We do NOT collapse other blank-line + # runs (MB5_* whitespace and original spacing between methods are + # preserved). + pat = re.compile( + r"[ \t]*" + re.escape(START_MARK) + r".*?" + re.escape(END_MARK) + r"\n", + flags=re.DOTALL, + ) + new = pat.sub("", text) + # The header insert added a leading "# DM_INSTRUMENT_START\n" with + # two trailing blank lines and the anchor; revert removed the block + # plus its trailing newline, leaving one extra blank line before the + # class — harmless. We additionally collapse the very narrow case of + # "\n\n\nclass Scheduler" -> "\n\nclass Scheduler" so revert is + # byte-identical for that anchor. + new = re.sub(r"\n{3,}class Scheduler\(", "\n\nclass Scheduler(", new) + target.write_text(new) + print(f"[dm-instr] reverted: {target}") + + +def main() -> None: + p = argparse.ArgumentParser() + p.add_argument("--apply", action="store_true") + p.add_argument("--revert", action="store_true") + p.add_argument("--check", action="store_true") + p.add_argument("--venv", type=Path, default=DEFAULT_VENV) + args = p.parse_args() + target = find_target(args.venv) + if args.apply: + apply(target) + elif args.revert: + revert(target) + elif args.check: + state = "PATCHED" if is_patched(target.read_text()) else "CLEAN" + print(f"[dm-instr] {state}: {target}") + else: + p.error("specify --apply / --revert / --check") + + +if __name__ == "__main__": + main() diff --git a/microbench/fresh_setup/instrument_mooncake.py b/microbench/fresh_setup/instrument_mooncake.py index f3622ad..730bf8b 100644 --- a/microbench/fresh_setup/instrument_mooncake.py +++ b/microbench/fresh_setup/instrument_mooncake.py @@ -151,11 +151,65 @@ RECV_FINISH_REPLACE = f""" if response.status == MooncakeXfer {END_MARK} break""" +# ---- Patch 5: send_kv_to_decode entry (P-side, producer receives pull req) ---- + +SEND_ENTRY_TARGET = """ async def send_kv_to_decode( + self, identity: bytes, sock: zmq.asyncio.Socket, meta: MooncakeXferMetadata + ): + pending_reqs: dict[ReqId, SendBlockMeta] = {}""" + +SEND_ENTRY_REPLACE = f""" async def send_kv_to_decode( + self, identity: bytes, sock: zmq.asyncio.Socket, meta: MooncakeXferMetadata + ): + pending_reqs: dict[ReqId, SendBlockMeta] = {{}} + {START_MARK} + try: + _mb2_log_event({{"event": "send_kv_to_decode_enter", + "d_req_ids": [str(r) for r in meta.req_blocks], + "t_start_unix": _mb2_time.time(), + "tp_rank": getattr(self, "tp_rank", -1)}}) + except Exception: + pass + {END_MARK}""" + +# ---- Patch 6: wait_and_ret ready-wait timing (P-side, src KV commit wait) ---- + +READY_WAIT_TARGET = """ async def wait_and_ret( + d_req_id: ReqId, send_meta: SendBlockMeta + ) -> tuple[ReqId, SendBlockMeta]: + await send_meta.ready.wait() + return d_req_id, send_meta""" + +READY_WAIT_REPLACE = f""" async def wait_and_ret( + d_req_id: ReqId, send_meta: SendBlockMeta + ) -> tuple[ReqId, SendBlockMeta]: + {START_MARK} + _mb2_rw_start = _mb2_time.perf_counter() + _mb2_rw_start_unix = _mb2_time.time() + _mb2_rw_already = send_meta.ready.is_set() + {END_MARK} + await send_meta.ready.wait() + {START_MARK} + try: + _mb2_log_event({{"event": "ready_wait", + "d_req_id": str(d_req_id), + "transfer_id": str(getattr(send_meta, "transfer_id", "")), + "ready_already_set": bool(_mb2_rw_already), + "ready_wait_s": _mb2_time.perf_counter() - _mb2_rw_start, + "t_start_unix": _mb2_rw_start_unix, + "tp_rank": getattr(self, "tp_rank", -1)}}) + except Exception: + pass + {END_MARK} + return d_req_id, send_meta""" + PATCHES = [ ("header", HEADER_ANCHOR, HEADER_ANCHOR + HEADER_INSERT), ("_send_blocks", SEND_TARGET, SEND_REPLACE), ("receive_kv (entry)", RECV_ENTRY_TARGET, RECV_ENTRY_REPLACE), ("receive_kv (FINISH)", RECV_FINISH_TARGET, RECV_FINISH_REPLACE), + ("send_kv (entry)", SEND_ENTRY_TARGET, SEND_ENTRY_REPLACE), + ("ready_wait", READY_WAIT_TARGET, READY_WAIT_REPLACE), ] diff --git a/microbench/fresh_setup/mb6_transfer_under_load.py b/microbench/fresh_setup/mb6_transfer_under_load.py new file mode 100755 index 0000000..1778ea5 --- /dev/null +++ b/microbench/fresh_setup/mb6_transfer_under_load.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python3 +"""MB6: KV-transfer bandwidth vs instance busy-ness. + +Confirms the causal hypothesis from the v3 breakdown: the migration +transfer runs far below wire speed because it happens between instances +that are concurrently busy with compute (GIL-starved control plane + +HBM/NIC contention), NOT because of a wire/NIC limit. + +Method (reuses the MB2 transfer primitive): + prefill on A (do_remote_decode, max_tokens=1) -> migrate to B + (do_remote_prefill). Time step 2 = the KV transfer. + +For each background-load level B in --bg-loads, we hold B concurrent +long-decode streams on BOTH instances to keep them busy, then run +--repeats measured transfers per size. With the MB2 mooncake instrument +applied (MB2_LOG_DIR set), the analyzer can split the e2e transfer into +RDMA-actual (`send_blocks`) vs control-plane. + +Expected: bg=0 reproduces MB2 (~10 GB/s); higher bg degrades toward the +~2-3 GB/s seen in the v3 trace. + +Usage: + python mb6_transfer_under_load.py \ + --src-port 8000 --dst-port 8001 --src-bp 8998 --dst-bp 8999 \ + --sizes 16384,65536 --bg-loads 0,8,24 --repeats 4 \ + --out mb6_result.json +""" +from __future__ import annotations + +import argparse +import asyncio +import json +import statistics +import time +import uuid +from pathlib import Path + +import httpx + +MODEL_PATH = "/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct" +KV_PER_TOK = 98304 # Qwen3-30B-A3B est bytes/token + + +def synth_prompt(seed: int, n: int) -> list[int]: + import random + rng = random.Random(seed) + return [rng.randint(100, 150000) for _ in range(n)] + + +async def get_engine_id(client, host, bp): + r = await client.get(f"http://{host}:{bp}/query") + r.raise_for_status() + return r.json()["0"]["engine_id"] + + +async def completion(client, host, port, prompt, max_tokens, ktp=None, stream=False): + payload = { + "model": MODEL_PATH, "prompt": prompt, "max_tokens": max_tokens, + "min_tokens": max_tokens if max_tokens == 1 else 1, + "temperature": 0.0, "stream": stream, + } + if ktp: + payload["kv_transfer_params"] = ktp + t0 = time.perf_counter() + if stream: + # consume the stream to keep the instance decoding + async with client.stream("POST", f"http://{host}:{port}/v1/completions", + json=payload, timeout=600.0) as r: + r.raise_for_status() + async for _ in r.aiter_bytes(): + pass + return time.perf_counter() - t0, {} + r = await client.post(f"http://{host}:{port}/v1/completions", + json=payload, timeout=600.0) + elapsed = time.perf_counter() - t0 + r.raise_for_status() + return elapsed, r.json() + + +async def num_running(client, host, port) -> int: + """Read vLLM running-request gauge from /metrics.""" + try: + r = await client.get(f"http://{host}:{port}/metrics", timeout=5.0) + for line in r.text.splitlines(): + if line.startswith("vllm:num_requests_running"): + return int(float(line.split()[-1])) + except Exception: + pass + return -1 + + +class BackgroundLoad: + """Maintain N concurrent long-decode streams on a set of (host,port).""" + def __init__(self, client, endpoints, concurrency, prompt_tokens=2000, + out_tokens=6000): + self.client = client + self.endpoints = endpoints + self.concurrency = concurrency + self.prompt_tokens = prompt_tokens + self.out_tokens = out_tokens + self._stop = asyncio.Event() + self._tasks: list[asyncio.Task] = [] + + async def _worker(self, idx): + host, port = self.endpoints[idx % len(self.endpoints)] + seed = 900000 + idx + while not self._stop.is_set(): + prompt = synth_prompt(seed, self.prompt_tokens) + seed += 1 + try: + await completion(self.client, host, port, prompt, + max_tokens=self.out_tokens, stream=True) + except Exception: + await asyncio.sleep(0.5) + + def start(self): + self._tasks = [asyncio.create_task(self._worker(i)) + for i in range(self.concurrency)] + + async def stop(self): + self._stop.set() + for t in self._tasks: + t.cancel() + await asyncio.gather(*self._tasks, return_exceptions=True) + self._tasks = [] + + +async def measure_transfer(client, src_host, src_port, dst_host, dst_port, + src_eid, src_bootstrap_addr, input_tokens, seed): + prompt = synth_prompt(seed, input_tokens) + transfer_id = uuid.uuid4().hex + # step 1: prefill on A + await completion(client, src_host, src_port, prompt, max_tokens=1, + ktp={"do_remote_decode": True, "transfer_id": transfer_id}) + # step 2: migrate to B (this is the timed transfer) + t_start_unix = time.time() + t_xfer, _ = await completion( + client, dst_host, dst_port, prompt, max_tokens=1, + ktp={"do_remote_prefill": True, "transfer_id": transfer_id, + "remote_engine_id": src_eid, + "remote_bootstrap_addr": src_bootstrap_addr}) + return { + "input_tokens": input_tokens, + "t_transfer_s": t_xfer, + "t_step2_start_unix": t_start_unix, + "t_step2_end_unix": time.time(), + "kv_bytes": input_tokens * KV_PER_TOK, + "eff_gbps": input_tokens * KV_PER_TOK / 1e9 / t_xfer if t_xfer > 0 else 0, + } + + +async def main_async(a): + sizes = [int(s) for s in a.sizes.split(",")] + bg_loads = [int(s) for s in a.bg_loads.split(",")] + src_host, dst_host = a.src_host, a.dst_host + limits = httpx.Limits(max_connections=256, max_keepalive_connections=256) + async with httpx.AsyncClient(limits=limits, trust_env=False) as client: + src_eid = await get_engine_id(client, src_host, a.src_bp) + src_bootstrap_addr = f"http://{src_host}:{a.src_bp}" + print(f"[mb6] src eid={src_eid[:16]}... endpoints A={src_host}:{a.src_port} " + f"B={dst_host}:{a.dst_port}") + + endpoints = [(src_host, a.src_port), (dst_host, a.dst_port)] + results = [] + for bg in bg_loads: + loader = None + if bg > 0: + loader = BackgroundLoad(client, endpoints, bg, + prompt_tokens=a.bg_prompt, + out_tokens=a.bg_out) + loader.start() + # wait for instances to actually be busy + print(f"[mb6] bg={bg}: ramping background load ...") + for _ in range(40): + await asyncio.sleep(1.0) + na = await num_running(client, src_host, a.src_port) + nb = await num_running(client, dst_host, a.dst_port) + if na >= 1 and nb >= 1: + print(f"[mb6] bg={bg}: busy (A running={na} B running={nb})") + break + else: + print(f"[mb6] bg=0: idle baseline") + # ensure idle + await asyncio.sleep(2.0) + + for sz in sizes: + for rep in range(a.repeats): + na = await num_running(client, src_host, a.src_port) + nb = await num_running(client, dst_host, a.dst_port) + row = await measure_transfer( + client, src_host, a.src_port, dst_host, a.dst_port, + src_eid, src_bootstrap_addr, sz, seed=sz * 100 + rep + bg * 7) + row["bg_load"] = bg + row["A_running_at_measure"] = na + row["B_running_at_measure"] = nb + results.append(row) + kv_mib = sz * KV_PER_TOK / 2**20 + print(f" bg={bg:>3} size={sz:>6} ({kv_mib:6.0f}MiB) rep={rep} " + f"A_run={na:>2} B_run={nb:>2} " + f"transfer={row['t_transfer_s']*1000:7.0f}ms " + f"eff={row['eff_gbps']:5.2f}GB/s") + + if loader: + await loader.stop() + # let the instances drain before next bg level + print(f"[mb6] bg={bg}: draining ...") + for _ in range(60): + await asyncio.sleep(1.0) + na = await num_running(client, src_host, a.src_port) + nb = await num_running(client, dst_host, a.dst_port) + if na <= 0 and nb <= 0: + break + + # summary per (bg, size) + print("\n=== summary: effective transfer bandwidth vs background load ===") + print(f"{'bg':>4} {'size':>7} {'n':>3} {'xfer_p50_ms':>12} {'eff_p50_GBps':>13} " + f"{'eff_mean':>9}") + summary = [] + for bg in bg_loads: + for sz in sizes: + rs = [r for r in results if r["bg_load"] == bg and r["input_tokens"] == sz] + if not rs: + continue + xfer = sorted(r["t_transfer_s"] for r in rs) + eff = sorted(r["eff_gbps"] for r in rs) + p50x = xfer[len(xfer) // 2] + p50e = eff[len(eff) // 2] + meane = statistics.mean(eff) + summary.append({"bg": bg, "size": sz, "n": len(rs), + "xfer_p50_ms": p50x * 1000, + "eff_p50_gbps": p50e, "eff_mean_gbps": meane}) + print(f"{bg:>4} {sz:>7} {len(rs):>3} {p50x*1000:>12.0f} " + f"{p50e:>13.2f} {meane:>9.2f}") + + Path(a.out).write_text(json.dumps( + {"model": MODEL_PATH, "kv_bytes_per_token": KV_PER_TOK, + "label": a.label, "raw": results, "summary": summary}, indent=2)) + print(f"\n[mb6] wrote {a.out}") + + +def main(): + p = argparse.ArgumentParser() + p.add_argument("--src-host", default="127.0.0.1") + p.add_argument("--dst-host", default="127.0.0.1") + p.add_argument("--src-port", type=int, default=8000) + p.add_argument("--dst-port", type=int, default=8001) + p.add_argument("--src-bp", type=int, default=8998) + p.add_argument("--dst-bp", type=int, default=8999) + p.add_argument("--sizes", default="16384,65536") + p.add_argument("--bg-loads", default="0,8,24") + p.add_argument("--repeats", type=int, default=4) + p.add_argument("--bg-prompt", type=int, default=2000) + p.add_argument("--bg-out", type=int, default=6000) + p.add_argument("--label", default="main-venv") + p.add_argument("--out", default="mb6_result.json") + args = p.parse_args() + asyncio.run(main_async(args)) + + +if __name__ == "__main__": + main() diff --git a/microbench/fresh_setup/run_mb6.sh b/microbench/fresh_setup/run_mb6.sh new file mode 100755 index 0000000..cd7acad --- /dev/null +++ b/microbench/fresh_setup/run_mb6.sh @@ -0,0 +1,109 @@ +#!/usr/bin/env bash +# MB6 launcher: 2 vLLM instances (kv_both, Mooncake) + transfer-under-load +# sweep. Parameterized by VENV so it runs on either the patched main venv +# or the fresh upstream venv, to test whether the bandwidth degradation is +# our patch or inherent to upstream mooncake. +# +# Usage: +# VENV=/home/admin/cpfs/wjh/agentic-kv/.venv bash run_mb6.sh # main +# VENV=/home/admin/cpfs/wjh/agentic-kv-fresh/.venv bash run_mb6.sh # fresh + +set -uo pipefail + +PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}" +VENV="${VENV:-$PROJ_DIR/.venv}" +LABEL="${LABEL:-$(basename $(dirname $VENV))}" +MODEL="${MODEL:-/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}" +GPUS="${GPUS:-0 1}" +SIZES="${SIZES:-16384,65536}" +BG_LOADS="${BG_LOADS:-0,8,24}" +REPEATS="${REPEATS:-4}" +DATE="$(date +%Y%m%d_%H%M)" +OUTDIR="${OUTDIR:-$PROJ_DIR/outputs/mb6_${LABEL}_${DATE}}" +PYTHON="$VENV/bin/python" +MC_INSTR="$PROJ_DIR/microbench/fresh_setup/instrument_mooncake.py" +DRIVER="$PROJ_DIR/microbench/fresh_setup/mb6_transfer_under_load.py" +MC_FILE="$VENV/lib/python3.12/site-packages/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py" + +mkdir -p "$OUTDIR/logs" +XFER_LOG_DIR="$OUTDIR/xfer_log"; mkdir -p "$XFER_LOG_DIR" + +echo "=== MB6 transfer-under-load ($LABEL) ===" +echo "VENV : $VENV" +echo "Out : $OUTDIR" +echo "" + +PORTS=(8000 8001); BPS=(8998 8999) +gpu_arr=($GPUS) + +cleanup() { + pkill -9 -f "vllm serve" 2>/dev/null || true + pkill -9 -f "EngineCore" 2>/dev/null || true + sleep 4 + "$PYTHON" "$MC_INSTR" --venv "$MC_FILE" --revert 2>/dev/null || true +} +trap cleanup EXIT +cleanup + +echo "[0] apply MB2 mooncake instrument to $LABEL venv" +"$PYTHON" "$MC_INSTR" --venv "$MC_FILE" --apply +"$PYTHON" "$MC_INSTR" --venv "$MC_FILE" --check + +echo "[1] launch 2 instances" +i=0 +for gpu in ${gpu_arr[@]:0:2}; do + port=${PORTS[$i]}; bp=${BPS[$i]}; master=$((29600 + i)) + PYTHONHASHSEED=42 \ + VLLM_MOONCAKE_BOOTSTRAP_PORT=$bp \ + MB2_LOG_DIR="$XFER_LOG_DIR" \ + CUDA_VISIBLE_DEVICES=$gpu \ + MASTER_PORT=$master \ + nohup "$VENV/bin/vllm" serve "$MODEL" \ + --host 0.0.0.0 --port "$port" \ + --tensor-parallel-size 1 --trust-remote-code --enable-prefix-caching \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + --enable-prompt-tokens-details \ + > "$OUTDIR/logs/vllm_${i}_gpu${gpu}.log" 2>&1 & + disown + sleep 2 + i=$((i + 1)) +done + +echo "[2] wait for health" +for i in 0 1; do + port=${PORTS[$i]}; tries=0 + while ! curl -sf "http://127.0.0.1:$port/health" >/dev/null 2>&1; do + tries=$((tries + 1)) + if [ $tries -gt 180 ]; then echo "FATAL inst_$i not healthy"; exit 1; fi + sleep 2 + done + echo " inst_$i ready" +done +# bootstrap /query reachable? +for i in 0 1; do + bp=${BPS[$i]}; tries=0 + while ! curl -sf "http://127.0.0.1:$bp/query" >/dev/null 2>&1; do + tries=$((tries + 1)) + if [ $tries -gt 60 ]; then echo "WARN bootstrap $bp not ready"; break; fi + sleep 2 + done +done + +echo "[3] run MB6 driver" +"$PYTHON" "$DRIVER" \ + --src-port "${PORTS[0]}" --dst-port "${PORTS[1]}" \ + --src-bp "${BPS[0]}" --dst-bp "${BPS[1]}" \ + --sizes "$SIZES" --bg-loads "$BG_LOADS" --repeats "$REPEATS" \ + --label "$LABEL" --out "$OUTDIR/mb6_result.json" \ + 2>&1 | tee "$OUTDIR/mb6_run.txt" + +echo "[4] teardown + revert" +pkill -9 -f "vllm serve" 2>/dev/null || true +pkill -9 -f "EngineCore" 2>/dev/null || true +sleep 4 +"$PYTHON" "$MC_INSTR" --venv "$MC_FILE" --revert + +echo "" +echo "Done. Artifacts in $OUTDIR/" +echo " mb6_result.json mb6_run.txt xfer_log/ logs/"