Layerwise KV transfer on Mooncake: PoC + microbench (worktree exploration)

Implements per-layer KV push during prefill (write mode) on vLLM's
MooncakeConnector, env-gated by MOONCAKE_LAYERWISE=1. 2-instance microbench
(mb7) shows correctness (KV lands, cached==prompt) and that the transfer is
hidden behind prefill compute: critical-path overhead drops from O(KV size)
(123/202/529ms for 8k/16k/32k) to a flat ~58ms (2-9x), with no prefill
slowdown, on idle instances. Caveats: idle-only, chunked-prefill disabled,
single concurrent transfer — see DESIGN.md.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-28 15:34:43 +08:00
parent e0d3b5150a
commit fec50fa45d
7 changed files with 3890 additions and 0 deletions

View File

@@ -0,0 +1,124 @@
# Layer-wise KV transfer on Mooncake — exploration
Goal: make vLLM's `MooncakeConnector` push KV **per-layer during prefill**
(write mode) instead of the current **post-hoc full-request transfer**, then
microbench correctness + whether it hides the transfer behind prefill compute
(the thing MoRIIO's write mode does on AMD; no NVIDIA connector ships it).
Everything here is isolated in worktree `worktree-mooncake-layerwise`. The
dash0 venv connector is backed up at `mooncake_connector.py.ORIG_BACKUP`;
revert = copy the backup back. Opt-in via env `MOONCAKE_LAYERWISE=1`, so with
the env unset the connector behaves exactly as upstream.
## Baseline flow (post-hoc, what we have)
1. Proxy: prefill on src (`do_remote_decode`, max_tokens=1) → **await done**
decode on dst (`do_remote_prefill`) which pulls.
2. dst `start_load_kv``receive_kv` sends ZMQ `MooncakeXferMetadata` (its block
addrs) to src bootstrap.
3. src `send_kv_to_decode`: waits `send_meta.ready` (set at `request_finished`,
i.e. **after full prefill**) → `_build_transfer_params` (all layers) →
`_send_blocks` (one big `batch_transfer_sync_write`) → FINISH response.
Measured: this full transfer is on the critical path, runs at ~3 GB/s under
load (vs ~10 GB/s idle), dominating migration TTFT.
## Layer-wise flow (write mode, this exploration)
Key idea: keep all RDMA + completion on the `sender_loop` thread (clean), but
issue **one `batch_transfer_sync_write` per layer**, each fired as soon as that
layer's KV is computed — so writes overlap the remaining prefill compute.
Signaling: `save_kv_layer(layer_name, ...)` (called by vLLM's attention hook
after each layer's forward, on the main worker thread) records "layer L
computed" and wakes the sender_loop. `send_kv_to_decode` loops L=0..N-1,
waits until L is computed, writes layer L's blocks, then sends FINISH.
### Edits to `mooncake_connector.py` (all gated by `_lw_enabled`)
1. **Worker `__init__`**: `_lw_enabled` (env), layer-name→position map,
`_lw_computed: dict[transfer_id,int]`, `_lw_active: set[transfer_id]`,
wake event, lock.
2. **`register_kv_caches`**: build `_lw_layer_pos[layer_name]` (0..N-1) and
`_lw_addr_idx[pos]` = indices into `kv_caches_base_addr` (×2 if
`split_k_and_v`).
3. **Scheduler `update_state_after_alloc`** (`do_remote_decode` branch): in
layer-wise mode capture `blocks.get_block_ids()[0]` and store non-empty in
`_reqs_need_send` so the worker learns local block_ids + sets `ready`
**before** prefill finishes.
4. **Worker `note_layer_computed(layer_name)`** (new) called from
`MooncakeConnector.save_kv_layer`: bump `_lw_computed[tid]` for active
producers, `call_soon_threadsafe(wake.set)`.
5. **Worker `send_kv_to_decode`**: in layer-wise mode, mark transfer active,
loop layers: await `_lw_computed[tid] >= L`, `_send_blocks` for layer L
only (subset of `_build_transfer_params`), then send FINISH.
6. **Worker `_build_layer_transfer_params`** (new): like
`_build_transfer_params` but only the addr indices for one layer position.
### Microbench requirements
- Disable chunked prefill (`--max-num-batched-tokens` ≥ prompt) so prefill is a
single forward and `save_kv_layer` fires once per layer in order.
- Dispatch the dst (`do_remote_prefill`) request **first/concurrently** so the
ZMQ handshake reaches src during prefill.
- Correctness: dst follow-up `cached_tokens == prompt_len` (KV landed),
identical to baseline.
- Perf: src prefill wall-clock (does layer-wise slow it?) and dst TTFT (does
transfer leave the critical path?), swept over KV size, vs baseline.
## Status
- [x] worktree + connector backup + design
- [x] modified connector (LAYERWISE.py, +193/-4 lines, env-gated)
- [x] correctness microbench (mb7_layerwise.py) + launcher (run_mb7.sh)
- [x] correctness run on dash0 — PASS (KV lands; cached == prompt)
- [x] perf run + verdict — POSITIVE (transfer hidden behind prefill)
## Results (2-instance, idle, chunked-prefill off, Qwen3-30B-A3B, 48 layers)
Metric: `overhead = total prefill_only` = the transfer cost left on the
critical path (TTFT). Baseline = post-hoc full pull (sequential).
| KV size | baseline overhead | **layerwise overhead** | reduction |
|--------:|------------------:|-----------------------:|----------:|
| 8192 (0.75 GiB) | 123 ms | **58 ms** | 2.1× |
| 16384 (1.5 GiB) | 202 ms | **58 ms** | 3.5× |
| 32768 (3.0 GiB) | 529 ms | **57 ms** | 9.3× |
Key signatures:
- **Layerwise overhead is ~constant (~58 ms)** regardless of KV size, while
baseline grows O(KV size). The 58 ms is handshake + last-layer tail + 1
decode; the bulk transfer is hidden behind prefill compute.
- **Prefill did NOT slow down**: layerwise `t_A` (575/1495/4440 ms) ==
`prefill_only` (574/1492/4440 ms). The concurrent RDMA was "free" on idle
GPUs — no measurable HBM contention with prefill compute here.
- Producer logs confirm the transfer itself took 0.39/0.55/4.37 s (grows with
size) yet ran *inside* the prefill window, so it left the critical path.
- **Correctness PASS**: B's follow-up cached == prompt for all sizes; the
48-layer / 96-base-addr (split K&V) per-layer addressing is correct.
## Caveats (why this is a proof-of-concept, not a verdict for production)
1. **Idle instances only.** Real migration happens between *busy* instances.
Under load both prefill and transfer slow; transfer (even at ~3 GB/s) is
still < prefill for big contexts so it should still hide, but receive-side
(B) and HBM contention during prefill are untested here. NEXT: rerun with
background load on both A and B.
2. **Chunked prefill disabled.** The monotonic layer counter assumes one
forward, layers in order. Production uses chunked prefill (multi-step),
which needs per-(chunk,layer) tracking not implemented.
3. **Single concurrent producer transfer.** Global counter; real migration is
concurrent. Would need per-transfer state.
4. **Microbench dispatch.** mb7 fires B then A with a 50 ms head start to get
the handshake to A before its forward. The real proxy path
(`_handle_combined_pd_sep_v2`) dispatches sequentially and would need the
write-mode (concurrent) restructure.
## Verdict
The mechanism **works and delivers the predicted benefit**: layer-wise push
turns migration's KV-transfer cost from O(KV size) on the critical path into a
near-constant tail, by overlapping it with prefill compute exactly what
MoRIIO's write mode does on AMD, now demonstrated on NVIDIA/Mooncake. Whether
it flips agentic *migration* to net-positive still depends on the busy-instance
behavior (caveat 1) and is the next experiment.

View File

@@ -0,0 +1,200 @@
#!/usr/bin/env python3
"""MB7: correctness + perf of layer-wise KV push vs post-hoc transfer.
Two 2-instance modes against A (src/producer) and B (dst/consumer):
baseline : prefill A (await) -> THEN B pulls (post-hoc full transfer).
T_total = T_prefill + T_xfer (sequential)
layerwise : dispatch B's remote-prefill (handshake) and A's prefill
CONCURRENTLY, so A pushes each layer as it computes it.
If overlap works, T_total ~= max(T_prefill, T_xfer) ~= T_prefill.
Reference: T_prefill_only = a plain prefill on A with no transfer.
Correctness: after the transfer, a plain follow-up to B on the same prompt
must report cached_tokens >= ~prompt_len (the KV actually landed on B).
The connector mode is selected by the launcher (run_mb7.sh): baseline uses the
stock connector; layerwise deploys mooncake_connector.LAYERWISE.py +
MOONCAKE_LAYERWISE=1. This script just drives the requests and measures.
Usage:
python mb7_layerwise.py --mode layerwise --sizes 8192,32768,65536 --repeats 3 \
--src-port 8000 --dst-port 8001 --src-bp 8998 --dst-bp 8999 --out mb7.json
"""
from __future__ import annotations
import argparse
import asyncio
import json
import statistics
import time
import uuid
from pathlib import Path
import httpx
MODEL = "/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct"
KV_PER_TOK = 98304
def synth_prompt(seed: int, n: int) -> list[int]:
import random
rng = random.Random(seed)
return [rng.randint(100, 150000) for _ in range(n)]
async def get_engine_id(client, host, bp):
r = await client.get(f"http://{host}:{bp}/query")
r.raise_for_status()
return r.json()["0"]["engine_id"]
async def completion(client, host, port, prompt, max_tokens, ktp=None):
payload = {
"model": MODEL, "prompt": prompt, "max_tokens": max_tokens,
"min_tokens": max_tokens if max_tokens == 1 else 1,
"temperature": 0.0, "stream": False,
}
if ktp:
payload["kv_transfer_params"] = ktp
t0 = time.perf_counter()
r = await client.post(f"http://{host}:{port}/v1/completions",
json=payload, timeout=600.0)
dt = time.perf_counter() - t0
r.raise_for_status()
return dt, r.json()
def cached_of(resp) -> int:
usage = resp.get("usage") or {}
det = usage.get("prompt_tokens_details") or {}
return det.get("cached_tokens", 0) or usage.get("cached_tokens", 0) or 0
async def prefill_only(client, host, port, prompt):
"""Reference: plain prefill cost on A, no transfer."""
dt, _ = await completion(client, host, port, prompt, max_tokens=1)
return dt
async def measure_baseline(client, A, B, src_eid, src_bp_addr, prompt, seed):
tid = uuid.uuid4().hex
t0 = time.perf_counter()
t_pf, _ = await completion(client, *A, prompt, 1,
ktp={"do_remote_decode": True, "transfer_id": tid})
t_xfer, _ = await completion(client, *B, prompt, 1,
ktp={"do_remote_prefill": True, "transfer_id": tid,
"remote_engine_id": src_eid,
"remote_bootstrap_addr": src_bp_addr})
t_total = time.perf_counter() - t0
# correctness: B follow-up should hit cache
_, fr = await completion(client, *B, prompt, 1)
return {"t_prefill_s": t_pf, "t_xfer_s": t_xfer, "t_total_s": t_total,
"cached": cached_of(fr)}
async def measure_layerwise(client, A, B, src_eid, src_bp_addr, prompt, seed):
"""Dispatch B handshake + A prefill concurrently => layer-wise overlap."""
tid = uuid.uuid4().hex
t0 = time.perf_counter()
async def run_B():
return await completion(client, *B, prompt, 1,
ktp={"do_remote_prefill": True, "transfer_id": tid,
"remote_engine_id": src_eid,
"remote_bootstrap_addr": src_bp_addr})
async def run_A():
# small head start for B's handshake to reach A before A's forward
await asyncio.sleep(0.05)
return await completion(client, *A, prompt, 1,
ktp={"do_remote_decode": True, "transfer_id": tid})
b_task = asyncio.create_task(run_B())
a_task = asyncio.create_task(run_A())
(t_b, _), (t_a, _) = await asyncio.gather(b_task, a_task)
t_total = time.perf_counter() - t0
_, fr = await completion(client, *B, prompt, 1)
return {"t_A_s": t_a, "t_B_s": t_b, "t_total_s": t_total,
"cached": cached_of(fr)}
async def main_async(a):
sizes = [int(s) for s in a.sizes.split(",")]
A = (a.src_host, a.src_port)
B = (a.dst_host, a.dst_port)
limits = httpx.Limits(max_connections=64, max_keepalive_connections=64)
async with httpx.AsyncClient(limits=limits, trust_env=False) as client:
src_eid = await get_engine_id(client, a.src_host, a.src_bp)
src_bp_addr = f"http://{a.src_host}:{a.src_bp}"
print(f"[mb7] mode={a.mode} src_eid={src_eid[:16]}...")
results = []
for sz in sizes:
for rep in range(a.repeats):
prompt = synth_prompt(sz * 100 + rep, sz)
# reference prefill-only cost (fresh prompt, different seed so no cache)
t_pf_only = await prefill_only(
client, *A, synth_prompt(sz * 100 + rep + 555, sz))
if a.mode == "baseline":
row = await measure_baseline(client, A, B, src_eid, src_bp_addr,
prompt, sz * 100 + rep)
else:
row = await measure_layerwise(client, A, B, src_eid, src_bp_addr,
prompt, sz * 100 + rep)
row.update({"mode": a.mode, "size": sz, "rep": rep,
"t_prefill_only_s": t_pf_only,
"kv_gib": sz * KV_PER_TOK / 2**30,
"correct": row["cached"] >= int(sz * 0.9)})
results.append(row)
extra = (f"xfer={row.get('t_xfer_s', 0)*1000:.0f}ms"
if a.mode == "baseline"
else f"tA={row.get('t_A_s',0)*1000:.0f}ms tB={row.get('t_B_s',0)*1000:.0f}ms")
print(f" sz={sz:>6} rep={rep} pf_only={t_pf_only*1000:6.0f}ms "
f"total={row['t_total_s']*1000:7.0f}ms {extra} "
f"cached={row['cached']}/{sz} correct={row['correct']}")
# summary
print(f"\n=== {a.mode} summary ===")
print(f"{'size':>7} {'n':>2} {'pf_only_ms':>11} {'total_ms':>9} "
f"{'overhead_ms':>12} {'correct':>8}")
summary = []
for sz in sizes:
rs = [r for r in results if r["size"] == sz]
if not rs:
continue
pf = statistics.median(r["t_prefill_only_s"] for r in rs) * 1000
tot = statistics.median(r["t_total_s"] for r in rs) * 1000
allok = all(r["correct"] for r in rs)
# overhead = total - prefill_only = the part NOT hidden behind prefill
overhead = tot - pf
summary.append({"size": sz, "n": len(rs), "pf_only_ms": pf,
"total_ms": tot, "overhead_ms": overhead,
"all_correct": allok})
print(f"{sz:>7} {len(rs):>2} {pf:>11.0f} {tot:>9.0f} {overhead:>12.0f} "
f"{str(allok):>8}")
Path(a.out).write_text(json.dumps(
{"mode": a.mode, "model": MODEL, "raw": results, "summary": summary}, indent=2))
print(f"\n[mb7] wrote {a.out}")
def main():
p = argparse.ArgumentParser()
p.add_argument("--mode", choices=["baseline", "layerwise"], required=True)
p.add_argument("--src-host", default="127.0.0.1")
p.add_argument("--dst-host", default="127.0.0.1")
p.add_argument("--src-port", type=int, default=8000)
p.add_argument("--dst-port", type=int, default=8001)
p.add_argument("--src-bp", type=int, default=8998)
p.add_argument("--dst-bp", type=int, default=8999)
p.add_argument("--sizes", default="8192,32768,65536")
p.add_argument("--repeats", type=int, default=3)
p.add_argument("--out", default="mb7_result.json")
args = p.parse_args()
asyncio.run(main_async(args))
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,140 @@
{
"mode": "baseline",
"model": "/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct",
"raw": [
{
"t_prefill_s": 0.5736213000018324,
"t_xfer_s": 0.36388630099827424,
"t_total_s": 0.9375749369974073,
"cached": 8176,
"mode": "baseline",
"size": 8192,
"rep": 0,
"t_prefill_only_s": 1.0551288530004967,
"kv_gib": 0.75,
"correct": true
},
{
"t_prefill_s": 0.5740011439993395,
"t_xfer_s": 0.12374231500143651,
"t_total_s": 0.6978207100000873,
"cached": 8176,
"mode": "baseline",
"size": 8192,
"rep": 1,
"t_prefill_only_s": 0.5743715360003989,
"kv_gib": 0.75,
"correct": true
},
{
"t_prefill_s": 0.5732713990000775,
"t_xfer_s": 0.10885842400239198,
"t_total_s": 0.6821924389987544,
"cached": 8176,
"mode": "baseline",
"size": 8192,
"rep": 2,
"t_prefill_only_s": 0.5745713680007611,
"kv_gib": 0.75,
"correct": true
},
{
"t_prefill_s": 1.4892208660021424,
"t_xfer_s": 0.2091717740004242,
"t_total_s": 1.6984740270017937,
"cached": 16368,
"mode": "baseline",
"size": 16384,
"rep": 0,
"t_prefill_only_s": 1.4990949730017746,
"kv_gib": 1.5,
"correct": true
},
{
"t_prefill_s": 1.4885207330007688,
"t_xfer_s": 0.2010940889995254,
"t_total_s": 1.6896768289989268,
"cached": 16368,
"mode": "baseline",
"size": 16384,
"rep": 1,
"t_prefill_only_s": 1.4898170189990196,
"kv_gib": 1.5,
"correct": true
},
{
"t_prefill_s": 1.4895933570005582,
"t_xfer_s": 0.2026357979993918,
"t_total_s": 1.6922962099997676,
"cached": 16368,
"mode": "baseline",
"size": 16384,
"rep": 2,
"t_prefill_only_s": 1.4907751430000644,
"kv_gib": 1.5,
"correct": true
},
{
"t_prefill_s": 4.438586502998078,
"t_xfer_s": 0.37847799000155646,
"t_total_s": 4.817142683001293,
"cached": 32752,
"mode": "baseline",
"size": 32768,
"rep": 0,
"t_prefill_only_s": 4.437922253000579,
"kv_gib": 3.0,
"correct": true
},
{
"t_prefill_s": 4.4350325649975275,
"t_xfer_s": 0.5313337980005599,
"t_total_s": 4.966431269000168,
"cached": 32752,
"mode": "baseline",
"size": 32768,
"rep": 1,
"t_prefill_only_s": 4.437473922000208,
"kv_gib": 3.0,
"correct": true
},
{
"t_prefill_s": 4.436279826000828,
"t_xfer_s": 0.6335160570015432,
"t_total_s": 5.069869226001174,
"cached": 32752,
"mode": "baseline",
"size": 32768,
"rep": 2,
"t_prefill_only_s": 4.440119222999783,
"kv_gib": 3.0,
"correct": true
}
],
"summary": [
{
"size": 8192,
"n": 3,
"pf_only_ms": 574.5713680007611,
"total_ms": 697.8207100000873,
"overhead_ms": 123.24934199932613,
"all_correct": true
},
{
"size": 16384,
"n": 3,
"pf_only_ms": 1490.7751430000644,
"total_ms": 1692.2962099997676,
"overhead_ms": 201.52106699970318,
"all_correct": true
},
{
"size": 32768,
"n": 3,
"pf_only_ms": 4437.922253000579,
"total_ms": 4966.431269000168,
"overhead_ms": 528.5090159995889,
"all_correct": true
}
]
}

View File

@@ -0,0 +1,140 @@
{
"mode": "layerwise",
"model": "/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct",
"raw": [
{
"t_A_s": 0.5749198459998297,
"t_B_s": 0.6508419569981925,
"t_total_s": 0.6509377910006151,
"cached": 8176,
"mode": "layerwise",
"size": 8192,
"rep": 0,
"t_prefill_only_s": 1.0447357020020718,
"kv_gib": 0.75,
"correct": true
},
{
"t_A_s": 0.574626908000937,
"t_B_s": 0.6306310719992325,
"t_total_s": 0.6307087300010608,
"cached": 8176,
"mode": "layerwise",
"size": 8192,
"rep": 1,
"t_prefill_only_s": 0.5731983850018878,
"kv_gib": 0.75,
"correct": true
},
{
"t_A_s": 0.5756587910000235,
"t_B_s": 0.6316753270002664,
"t_total_s": 0.6317471290021786,
"cached": 8176,
"mode": "layerwise",
"size": 8192,
"rep": 2,
"t_prefill_only_s": 0.5737888650000968,
"kv_gib": 0.75,
"correct": true
},
{
"t_A_s": 1.4953326409995498,
"t_B_s": 1.5502465710014803,
"t_total_s": 1.5503262860001996,
"cached": 16368,
"mode": "layerwise",
"size": 16384,
"rep": 0,
"t_prefill_only_s": 1.5000705940001353,
"kv_gib": 1.5,
"correct": true
},
{
"t_A_s": 1.493850356000621,
"t_B_s": 1.5505031290012994,
"t_total_s": 1.5505791659998067,
"cached": 16368,
"mode": "layerwise",
"size": 16384,
"rep": 1,
"t_prefill_only_s": 1.4924546469992492,
"kv_gib": 1.5,
"correct": true
},
{
"t_A_s": 1.4979969070009247,
"t_B_s": 1.554968774002191,
"t_total_s": 1.5551903560008213,
"cached": 16368,
"mode": "layerwise",
"size": 16384,
"rep": 2,
"t_prefill_only_s": 1.4914496510027675,
"kv_gib": 1.5,
"correct": true
},
{
"t_A_s": 4.4403588690001925,
"t_B_s": 4.496483378999983,
"t_total_s": 4.4965666819989565,
"cached": 32752,
"mode": "layerwise",
"size": 32768,
"rep": 0,
"t_prefill_only_s": 4.440080869000667,
"kv_gib": 3.0,
"correct": true
},
{
"t_A_s": 4.44209005599987,
"t_B_s": 4.499940814999718,
"t_total_s": 4.500021006002498,
"cached": 32752,
"mode": "layerwise",
"size": 32768,
"rep": 1,
"t_prefill_only_s": 4.440225810998527,
"kv_gib": 3.0,
"correct": true
},
{
"t_A_s": 4.437084657998639,
"t_B_s": 4.496842522999941,
"t_total_s": 4.496926485000586,
"cached": 32752,
"mode": "layerwise",
"size": 32768,
"rep": 2,
"t_prefill_only_s": 4.439449855002749,
"kv_gib": 3.0,
"correct": true
}
],
"summary": [
{
"size": 8192,
"n": 3,
"pf_only_ms": 573.7888650000968,
"total_ms": 631.7471290021786,
"overhead_ms": 57.958264002081705,
"all_correct": true
},
{
"size": 16384,
"n": 3,
"pf_only_ms": 1492.4546469992492,
"total_ms": 1550.5791659998067,
"overhead_ms": 58.124519000557484,
"all_correct": true
},
{
"size": 32768,
"n": 3,
"pf_only_ms": 4440.080869000667,
"total_ms": 4496.926485000586,
"overhead_ms": 56.845615999918664,
"all_correct": true
}
]
}

View File

@@ -0,0 +1,111 @@
#!/usr/bin/env bash
# MB7 launcher (runs on dash0). Two 2-instance modes selected by MODE env:
# MODE=baseline : restore stock connector, no layerwise env
# MODE=layerwise : deploy mooncake_connector.LAYERWISE.py + MOONCAKE_LAYERWISE=1
#
# Chunked prefill is DISABLED (max-num-batched-tokens >= max prompt) so the
# producer prefill is a single forward and save_kv_layer fires once per layer
# in order — the layer-wise counter assumes this.
#
# The connector is always restored from .ORIG_BACKUP on exit.
#
# Usage (on dash0):
# MODE=baseline bash run_mb7.sh
# MODE=layerwise bash run_mb7.sh
set -uo pipefail
MODE="${MODE:-baseline}"
PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}"
VENV="${VENV:-$PROJ_DIR/.venv}"
MODEL="${MODEL:-/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}"
GPUS=(${GPUS:-0 1})
SIZES="${SIZES:-8192,16384,32768}"
REPEATS="${REPEATS:-3}"
MAX_BATCHED="${MAX_BATCHED:-40960}" # >= max prompt => no chunked prefill
DATE="$(date +%Y%m%d_%H%M)"
OUTDIR="${OUTDIR:-$PROJ_DIR/outputs/mb7_${MODE}_${DATE}}"
PYTHON="$VENV/bin/python"
MC_FILE="$VENV/lib/python3.12/site-packages/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py"
LW_SRC="${LW_SRC:-/tmp/mooncake_connector.LAYERWISE.py}"
DRIVER="$PROJ_DIR/microbench/connector_tax/layerwise/mb7_layerwise.py"
mkdir -p "$OUTDIR/logs"
PORTS=(8000 8001); BPS=(8998 8999)
echo "=== MB7 ($MODE) ==="
echo "Out: $OUTDIR ; connector: $MC_FILE"
restore_connector() {
if [ -f "$MC_FILE.ORIG_BACKUP" ]; then
cp -f "$MC_FILE.ORIG_BACKUP" "$MC_FILE"
echo "[restore] connector reset to ORIG"
fi
}
cleanup() {
pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true
sleep 4
restore_connector
}
trap cleanup EXIT
pkill -9 -f "vllm serve" 2>/dev/null || true; sleep 3
# Deploy the connector for the chosen mode.
if [ "$MODE" = "layerwise" ]; then
if [ ! -f "$LW_SRC" ]; then echo "FATAL: $LW_SRC not found (scp it first)"; exit 1; fi
cp -f "$LW_SRC" "$MC_FILE"
"$PYTHON" -c "import ast; ast.parse(open('$MC_FILE').read()); print('[deploy] LAYERWISE connector AST OK')" || exit 1
LW_ENV="MOONCAKE_LAYERWISE=1"
else
restore_connector
LW_ENV=""
fi
echo "[launch] 2 instances (max-num-batched-tokens=$MAX_BATCHED, chunked-prefill off)"
i=0
for gpu in "${GPUS[@]:0:2}"; do
port=${PORTS[$i]}; bp=${BPS[$i]}; master=$((29700 + i))
env $LW_ENV \
PYTHONHASHSEED=42 VLLM_MOONCAKE_BOOTSTRAP_PORT=$bp \
CUDA_VISIBLE_DEVICES=$gpu MASTER_PORT=$master \
nohup "$VENV/bin/vllm" serve "$MODEL" \
--host 0.0.0.0 --port "$port" --tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --dtype auto \
--gpu-memory-utilization 0.9 --max-model-len 200000 \
--max-num-batched-tokens "$MAX_BATCHED" \
--kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
--enable-prompt-tokens-details \
> "$OUTDIR/logs/vllm_${i}_gpu${gpu}.log" 2>&1 &
disown; sleep 2; i=$((i + 1))
done
echo "[health] waiting ..."
for i in 0 1; do
port=${PORTS[$i]}; tries=0
while ! curl -sf "http://127.0.0.1:$port/health" >/dev/null 2>&1; do
tries=$((tries + 1)); [ $tries -gt 180 ] && { echo "FATAL inst_$i"; exit 1; }
sleep 2
done
echo " inst_$i ready"
done
for i in 0 1; do
bp=${BPS[$i]}; tries=0
while ! curl -sf "http://127.0.0.1:$bp/query" >/dev/null 2>&1; do
tries=$((tries+1)); [ $tries -gt 60 ] && { echo "WARN bp $bp"; break; }; sleep 2
done
done
echo "[run] mb7 --mode $MODE"
"$PYTHON" "$DRIVER" --mode "$MODE" \
--src-port "${PORTS[0]}" --dst-port "${PORTS[1]}" \
--src-bp "${BPS[0]}" --dst-bp "${BPS[1]}" \
--sizes "$SIZES" --repeats "$REPEATS" --out "$OUTDIR/mb7_result.json" \
2>&1 | tee "$OUTDIR/mb7_run.txt"
echo "[done] $OUTDIR"
# grep layerwise transfer logs from the producer (gpu0) for sanity
if [ "$MODE" = "layerwise" ]; then
echo "=== producer layerwise log lines ==="
grep -i "layerwise" "$OUTDIR/logs/vllm_0_gpu${GPUS[0]}.log" | tail -10 || true
fi