From 74374226183261ca6989a799ed81ba828af8105d Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Wed, 27 May 2026 17:47:04 +0800 Subject: [PATCH] MB2 scaffolding: launch script for vLLM pair + KV-transfer-time client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two new files prepare measurement of T_transfer(KV_size, network_path), the gap §3.2's PD-disagg cost argument has had since day one. microbench/fresh_setup/start_vllm_pair.sh start | status | stop two vLLM 0.18.1 instances on local GPUs (A, B) with --kv-transfer-config '{"kv_connector":"MooncakeConnector", "kv_role":"kv_both"}' running off the fresh venv (vanilla wheel + vanilla mooncake 0.3.11, NOT the dash0 patched build). GPU IDs and ports are env-overridable so the same script drives the intra-node pair (GPU_A=0 GPU_B=1 on one host) and the inter-node pair (GPU_A=0 on dash1, GPU_B=0 on dash2 — launched per host separately). microbench/fresh_setup/mb2_kv_transfer.py Three-step measurement borrowed from connector_tax/.../smoke_test_ migrate_cache.py: 1. do_remote_decode on A (compute & cache KV; max_tokens=1) 2. do_remote_prefill on B (pull KV from A — this is the timed step) 3. plain completion on B (sanity check: cached_tokens ≈ prompt len) Sweeps input_tokens ∈ {512, 1k, 2k, 4k, 8k, 16k, 32k, 64k} with 5 repeats each; reports mean / p50 / p90 transfer time and a per-size raw log. Per-token KV is 98304 B (Qwen3-Coder-30B-A3B), so the upper end ≈ 6 GiB transfers — within the p99 11.5 GiB range from §2 but below it (the model's max_model_len 200000 caps the absolute upper). What we will NOT learn from this design: - Bandwidth saturation when the system is loaded (single-request bench) - vLLM-internal scheduling overhead vs pure transfer (the timed step folds them together — but for the §3.2 argument that's the right "what does PD-disagg actually pay" number) Intentionally not committed yet: an orchestrator that loops over intra-/inter-node configs. We start manual on dash1 intra-node to verify the measurement is sane before scaling out. Co-Authored-By: Claude Opus 4.7 --- microbench/fresh_setup/mb2_kv_transfer.py | 204 ++++++++++++++++++++++ microbench/fresh_setup/start_vllm_pair.sh | 105 +++++++++++ 2 files changed, 309 insertions(+) create mode 100755 microbench/fresh_setup/mb2_kv_transfer.py create mode 100755 microbench/fresh_setup/start_vllm_pair.sh diff --git a/microbench/fresh_setup/mb2_kv_transfer.py b/microbench/fresh_setup/mb2_kv_transfer.py new file mode 100755 index 0000000..fa44cfd --- /dev/null +++ b/microbench/fresh_setup/mb2_kv_transfer.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python3 +"""MB2: measure KV transfer time between two vLLM instances over Mooncake. + +Pattern (adapted from microbench/connector_tax/cache_sweep/smoke_test_migrate_cache.py): + + 1. Prefill on A: do_remote_decode with max_tokens=1 (A computes & caches KV) + 2. Pull to B: do_remote_prefill on B with kv_transfer_params from step 1 + (this is the operation that performs the KV transfer) + 3. Verify: send a follow-up to B; cached_tokens should equal the + prompt length (confirms the KV landed on B) + +We time step 2 — that gives us E2E "transfer + B's prefill check" latency. +By sweeping input_length we trace T_transfer(KV_size). + +The follow-up step gives us a sanity check (correctness) but isn't timed. +""" +from __future__ import annotations + +import argparse +import asyncio +import json +import statistics +import time +import uuid +from pathlib import Path + +import httpx + +MODEL_PATH = "/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct" + + +async def get_engine_id(client: httpx.AsyncClient, port: int) -> str: + r = await client.get(f"http://127.0.0.1:{port}/query") + r.raise_for_status() + data = r.json() + return data["0"]["engine_id"] + + +async def completion( + client: httpx.AsyncClient, + port: int, + prompt_token_ids: list[int], + max_tokens: int, + kv_transfer_params: dict | None = None, +) -> tuple[float, dict]: + payload = { + "model": MODEL_PATH, + "prompt": prompt_token_ids, + "max_tokens": max_tokens, + "min_tokens": max_tokens if max_tokens == 1 else 1, + "temperature": 0.0, + "stream": False, + } + if kv_transfer_params: + payload["kv_transfer_params"] = kv_transfer_params + t0 = time.perf_counter() + r = await client.post( + f"http://127.0.0.1:{port}/v1/completions", + json=payload, timeout=600.0, + ) + elapsed_s = time.perf_counter() - t0 + r.raise_for_status() + return elapsed_s, r.json() + + +def synth_prompt(rng_seed: int, n_tokens: int) -> list[int]: + """Deterministic token-id sequence, far enough from special tokens.""" + import random + rng = random.Random(rng_seed) + return [rng.randint(100, 150000) for _ in range(n_tokens)] + + +async def measure_one( + client: httpx.AsyncClient, + src_port: int, dst_port: int, + src_eid: str, dst_eid: str, + input_tokens: int, + rng_seed: int, +) -> dict: + prompt = synth_prompt(rng_seed, input_tokens) + session = uuid.uuid4().hex + # Step 1: prefill on A. max_tokens=1 ensures KV is cached but no real decode work. + t_prefill_s, prefill_resp = await completion( + client, src_port, prompt, max_tokens=1, + kv_transfer_params={ + "do_remote_decode": True, + "remote_block_ids": None, + "remote_engine_id": src_eid, + "remote_host": "127.0.0.1", + "remote_port": src_port, + }, + ) + src_kvp = prefill_resp.get("kv_transfer_params") or {} + # Step 2: pull from A to B (the transfer step we time) + t_transfer_s, pull_resp = await completion( + client, dst_port, prompt, max_tokens=1, + kv_transfer_params={ + "do_remote_prefill": True, + "remote_block_ids": src_kvp.get("remote_block_ids"), + "remote_engine_id": src_eid, + "remote_host": "127.0.0.1", + "remote_port": src_kvp.get("remote_port", src_port), + }, + ) + # Step 3: follow-up, no kv_transfer_params — should hit B's cache fully + t_followup_s, follow_resp = await completion( + client, dst_port, prompt, max_tokens=1, + ) + usage = (follow_resp.get("usage") or {}) + details = usage.get("prompt_tokens_details") or {} + cached_followup = details.get("cached_tokens", 0) or usage.get("cached_tokens", 0) + + return { + "input_tokens": input_tokens, + "session": session, + "t_prefill_s": t_prefill_s, + "t_transfer_s": t_transfer_s, + "t_followup_s": t_followup_s, + "cached_followup": cached_followup, + "ok": cached_followup >= input_tokens * 0.9, # ≥90 % cached = transfer succeeded + } + + +async def main_async(args: argparse.Namespace) -> None: + sizes_str = args.sizes + sizes = [int(s) for s in sizes_str.split(",")] + repeats = args.repeats + src_port, dst_port = args.src_port, args.dst_port + + limits = httpx.Limits(max_connections=10, max_keepalive_connections=10) + async with httpx.AsyncClient(limits=limits, trust_env=False) as client: + src_eid = await get_engine_id(client, src_port) + dst_eid = await get_engine_id(client, dst_port) + print(f"[mb2] src_eid={src_eid[:16]}... dst_eid={dst_eid[:16]}...") + + results = [] + for sz in sizes: + for r in range(repeats): + row = await measure_one( + client, src_port, dst_port, src_eid, dst_eid, + input_tokens=sz, rng_seed=sz * 1000 + r, + ) + print(f" size={sz:>6} rep={r} " + f"transfer={row['t_transfer_s']*1000:7.1f}ms " + f"followup_cached={row['cached_followup']}/{sz} " + f"ok={row['ok']}") + results.append(row) + + # Summarise per-size + summary = [] + for sz in sizes: + ts = [r["t_transfer_s"] for r in results if r["input_tokens"] == sz and r["ok"]] + if not ts: + continue + summary.append({ + "input_tokens": sz, + "n_ok": len(ts), + "transfer_s_mean": statistics.mean(ts), + "transfer_s_p50": statistics.median(ts), + "transfer_s_p90": statistics.quantiles(ts, n=10)[-1] if len(ts) >= 10 else max(ts), + "transfer_s_min": min(ts), + "transfer_s_max": max(ts), + }) + + out = { + "model": MODEL_PATH, + "kv_bytes_per_token": 98304, + "src_port": src_port, + "dst_port": dst_port, + "config_label": args.label, + "raw": results, + "summary": summary, + } + Path(args.out).write_text(json.dumps(out, indent=2)) + print(f"[mb2] wrote {args.out}") + for s in summary: + sz = s["input_tokens"] + kv_mib = sz * 98304 / 1024 / 1024 + print(f" {sz:>6} tok ({kv_mib:>7.1f} MiB KV): " + f"mean {s['transfer_s_mean']*1000:7.1f} ms · " + f"p50 {s['transfer_s_p50']*1000:7.1f} · " + f"p90 {s['transfer_s_p90']*1000:7.1f} " + f"(n_ok={s['n_ok']})") + + +def main() -> None: + p = argparse.ArgumentParser() + p.add_argument("--src-port", type=int, default=8000) + p.add_argument("--dst-port", type=int, default=8001) + p.add_argument( + "--sizes", + default="512,1024,2048,4096,8192,16384,32768,65536", + help="Comma-separated input_token sizes to sweep", + ) + p.add_argument("--repeats", type=int, default=5) + p.add_argument("--label", default="intra-node", + help="Label written into the output (e.g. intra-node / inter-node)") + p.add_argument("--out", default="mb2_result.json") + args = p.parse_args() + asyncio.run(main_async(args)) + + +if __name__ == "__main__": + main() diff --git a/microbench/fresh_setup/start_vllm_pair.sh b/microbench/fresh_setup/start_vllm_pair.sh new file mode 100755 index 0000000..d69a215 --- /dev/null +++ b/microbench/fresh_setup/start_vllm_pair.sh @@ -0,0 +1,105 @@ +#!/usr/bin/env bash +# Start 2 vLLM instances with Mooncake kv_connector (kv_both) for MB2. +# +# Default config: both on local GPU 0 and 1 (intra-node A/B test). +# Override via GPU_A / GPU_B / HOST_A / HOST_B env vars. +# +# This uses the FRESH venv at /home/admin/cpfs/wjh/agentic-kv-fresh/.venv +# (vanilla vllm 0.18.1 + vanilla mooncake-transfer-engine 0.3.11), NOT +# the dash0 patched build. +# +# Usage: +# GPU_A=0 GPU_B=1 bash microbench/fresh_setup/start_vllm_pair.sh +# bash microbench/fresh_setup/start_vllm_pair.sh status +# bash microbench/fresh_setup/start_vllm_pair.sh stop + +set -eo pipefail + +FRESH_ROOT="/home/admin/cpfs/wjh/agentic-kv-fresh" +VENV="${FRESH_ROOT}/.venv" +MODEL="${MODEL:-/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}" +LOGS_DIR="${LOGS_DIR:-${FRESH_ROOT}/mb2_logs}" +mkdir -p "${LOGS_DIR}" + +GPU_A="${GPU_A:-0}" +GPU_B="${GPU_B:-1}" +PORT_A=8000 +PORT_B=8001 +BP_A=8998 +BP_B=8999 +MASTER_A=29500 +MASTER_B=29501 + +stop_all() { + pkill -9 -f "vllm serve" 2>/dev/null || true + pkill -9 -f "EngineCore" 2>/dev/null || true + sleep 2 +} + +case "${1:-start}" in + stop) + stop_all + exit 0 + ;; + status) + for p in "${PORT_A}" "${PORT_B}"; do + if curl -sf "http://127.0.0.1:${p}/health" >/dev/null 2>&1; then + echo "port ${p}: UP" + else + echo "port ${p}: DOWN" + fi + done + exit 0 + ;; + start) + ;; + *) + echo "Unknown command: $1"; exit 1;; +esac + +stop_all + +source "${VENV}/bin/activate" + +launch() { + local idx="$1" gpu="$2" port="$3" bp="$4" master="$5" + echo "[mb2] launching instance ${idx} on GPU ${gpu}, port ${port}, bp ${bp}" + PYTHONHASHSEED=42 \ + VLLM_MOONCAKE_BOOTSTRAP_PORT="${bp}" \ + CUDA_VISIBLE_DEVICES="${gpu}" \ + MASTER_PORT="${master}" \ + nohup vllm serve "${MODEL}" \ + --host 0.0.0.0 --port "${port}" \ + --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching \ + --dtype auto --gpu-memory-utilization 0.9 \ + --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + --enable-prompt-tokens-details \ + > "${LOGS_DIR}/vllm_${idx}_gpu${gpu}.log" 2>&1 & + disown +} + +launch A "${GPU_A}" "${PORT_A}" "${BP_A}" "${MASTER_A}" +sleep 3 +launch B "${GPU_B}" "${PORT_B}" "${BP_B}" "${MASTER_B}" + +echo "[mb2] waiting for both /health endpoints..." +for port in "${PORT_A}" "${PORT_B}"; do + tries=0 + while ! curl -sf "http://127.0.0.1:${port}/health" >/dev/null 2>&1; do + tries=$((tries+1)) + if [ ${tries} -gt 180 ]; then + echo "[mb2] FATAL port ${port} did not come up in 6 min" + tail -40 "${LOGS_DIR}/vllm_"*"_gpu"*".log" || true + exit 1 + fi + sleep 2 + done + echo " port=${port} ready" +done + +echo "[mb2] both instances UP" +echo " A: 127.0.0.1:${PORT_A} (GPU ${GPU_A}, bp ${BP_A})" +echo " B: 127.0.0.1:${PORT_B} (GPU ${GPU_B}, bp ${BP_B})" +echo " logs: ${LOGS_DIR}"