From 50f72d88754153ede65090b558f82601e4db34ac Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Wed, 27 May 2026 20:26:54 +0800 Subject: [PATCH] MB2 inter-node scaffolding: per-host single-instance launcher + client host args Adds the pieces needed to run the producer on dash1 and the consumer on dash2 with the same shared cpfs venv: start_vllm_single.sh INSTANCE / GPU / PORT / BP / MASTER / ROLE env vars; brings up ONE vLLM instance + applies the mooncake instrumentation patch (idempotent since the venv is cpfs-shared, so the first invocation applies and the second is a no-op). Per-instance MB2_LOG_DIR keeps producer/consumer events separate even though both directories live on the same cpfs path visible to both hosts. mb2_kv_transfer.py New --src-host / --dst-host args. Defaults stay 127.0.0.1 for backward-compat with the intra-node sweep. /v1/completions URLs and /query URLs now use the supplied hosts. remote_bootstrap_addr is built as http://: so the consumer's do_remote_prefill request carries a routable address. Co-Authored-By: Claude Opus 4.7 --- microbench/fresh_setup/mb2_kv_transfer.py | 40 ++++++--- microbench/fresh_setup/start_vllm_single.sh | 99 +++++++++++++++++++++ 2 files changed, 125 insertions(+), 14 deletions(-) create mode 100755 microbench/fresh_setup/start_vllm_single.sh diff --git a/microbench/fresh_setup/mb2_kv_transfer.py b/microbench/fresh_setup/mb2_kv_transfer.py index 05871a1..f70a81d 100755 --- a/microbench/fresh_setup/mb2_kv_transfer.py +++ b/microbench/fresh_setup/mb2_kv_transfer.py @@ -29,10 +29,10 @@ import httpx MODEL_PATH = "/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct" -async def get_engine_id(client: httpx.AsyncClient, bootstrap_port: int) -> str: +async def get_engine_id(client: httpx.AsyncClient, host: str, bootstrap_port: int) -> str: """The /query endpoint lives on the mooncake bootstrap port, not the vLLM HTTP serving port.""" - r = await client.get(f"http://127.0.0.1:{bootstrap_port}/query") + r = await client.get(f"http://{host}:{bootstrap_port}/query") r.raise_for_status() data = r.json() return data["0"]["engine_id"] @@ -40,6 +40,7 @@ async def get_engine_id(client: httpx.AsyncClient, bootstrap_port: int) -> str: async def completion( client: httpx.AsyncClient, + host: str, port: int, prompt_token_ids: list[int], max_tokens: int, @@ -57,7 +58,7 @@ async def completion( payload["kv_transfer_params"] = kv_transfer_params t0 = time.perf_counter() r = await client.post( - f"http://127.0.0.1:{port}/v1/completions", + f"http://{host}:{port}/v1/completions", json=payload, timeout=600.0, ) elapsed_s = time.perf_counter() - t0 @@ -74,7 +75,8 @@ def synth_prompt(rng_seed: int, n_tokens: int) -> list[int]: async def measure_one( client: httpx.AsyncClient, - src_port: int, dst_port: int, + src_host: str, src_port: int, + dst_host: str, dst_port: int, src_eid: str, dst_eid: str, src_bootstrap_addr: str, input_tokens: int, @@ -95,7 +97,7 @@ async def measure_one( transfer_id = uuid.uuid4().hex t_step1_client = time.time() t_prefill_s, prefill_resp = await completion( - client, src_port, prompt, max_tokens=1, + client, src_host, src_port, prompt, max_tokens=1, kv_transfer_params={ "do_remote_decode": True, "transfer_id": transfer_id, @@ -103,7 +105,7 @@ async def measure_one( ) t_step2_client = time.time() t_transfer_s, pull_resp = await completion( - client, dst_port, prompt, max_tokens=1, + client, dst_host, dst_port, prompt, max_tokens=1, kv_transfer_params={ "do_remote_prefill": True, "transfer_id": transfer_id, @@ -117,7 +119,7 @@ async def measure_one( t_followup_s = None if not skip_verify: t_followup_s, follow_resp = await completion( - client, dst_port, prompt, max_tokens=1, + client, dst_host, dst_port, prompt, max_tokens=1, ) usage = (follow_resp.get("usage") or {}) details = usage.get("prompt_tokens_details") or {} @@ -148,28 +150,31 @@ async def main_async(args: argparse.Namespace) -> None: sizes_str = args.sizes sizes = [int(s) for s in sizes_str.split(",")] repeats = args.repeats + src_host, dst_host = args.src_host, args.dst_host src_port, dst_port = args.src_port, args.dst_port src_bp, dst_bp = args.src_bp, args.dst_bp limits = httpx.Limits(max_connections=10, max_keepalive_connections=10) async with httpx.AsyncClient(limits=limits, trust_env=False) as client: - src_eid = await get_engine_id(client, src_bp) - # In strict PD-disagg only the producer binds /query; consumers don't. + src_eid = await get_engine_id(client, src_host, src_bp) try: - dst_eid = await get_engine_id(client, dst_bp) + dst_eid = await get_engine_id(client, dst_host, dst_bp) except Exception as exc: - print(f"[mb2] dst bootstrap ({dst_bp}) unreachable ({exc.__class__.__name__}); " + print(f"[mb2] dst bootstrap ({dst_host}:{dst_bp}) unreachable ({exc.__class__.__name__}); " f"running in strict-PD mode (dst is kv_consumer, no bootstrap).") dst_eid = None - print(f"[mb2] src_eid={src_eid[:16]}... dst_eid=" + print(f"[mb2] src={src_host}:{src_port} bp={src_bp} eid={src_eid[:16]}...") + print(f"[mb2] dst={dst_host}:{dst_port} bp={dst_bp} eid=" f"{(dst_eid[:16] + '...') if dst_eid else 'N/A (kv_consumer)'}") - src_bootstrap_addr = f"http://127.0.0.1:{src_bp}" + src_bootstrap_addr = f"http://{src_host}:{src_bp}" results = [] for sz in sizes: for r in range(repeats): row = await measure_one( - client, src_port, dst_port, src_eid, dst_eid, + client, + src_host, src_port, dst_host, dst_port, + src_eid, dst_eid, src_bootstrap_addr=src_bootstrap_addr, input_tokens=sz, rng_seed=sz * 1000 + r, skip_verify=args.skip_verify, @@ -201,7 +206,9 @@ async def main_async(args: argparse.Namespace) -> None: out = { "model": MODEL_PATH, "kv_bytes_per_token": 98304, + "src_host": src_host, "src_port": src_port, + "dst_host": dst_host, "dst_port": dst_port, "config_label": args.label, "raw": results, @@ -221,6 +228,11 @@ async def main_async(args: argparse.Namespace) -> None: def main() -> None: p = argparse.ArgumentParser() + p.add_argument("--src-host", default="127.0.0.1", + help="hostname/IP of the producer (A); use a routable " + "address for inter-node tests") + p.add_argument("--dst-host", default="127.0.0.1", + help="hostname/IP of the consumer (B)") p.add_argument("--src-port", type=int, default=8000, help="vLLM HTTP port on the producer side (A)") p.add_argument("--dst-port", type=int, default=8001, diff --git a/microbench/fresh_setup/start_vllm_single.sh b/microbench/fresh_setup/start_vllm_single.sh new file mode 100755 index 0000000..f254cff --- /dev/null +++ b/microbench/fresh_setup/start_vllm_single.sh @@ -0,0 +1,99 @@ +#!/usr/bin/env bash +# Start ONE vLLM instance with Mooncake kv_connector, for inter-node MB2. +# Run separately on each host (dash1, dash2) before kicking off the bench. +# +# Usage on each host: +# INSTANCE=A GPU=0 PORT=8000 BP=8998 MASTER=29500 ROLE=kv_both \ +# bash microbench/fresh_setup/start_vllm_single.sh start +# bash microbench/fresh_setup/start_vllm_single.sh status +# bash microbench/fresh_setup/start_vllm_single.sh stop +# +# All hosts share cpfs, so the venv at FRESH_ROOT/.venv is single-installed +# and the instrumentation patch is global. Per-instance logs go under +# FRESH_ROOT/mb2_transfer_logs/{INSTANCE}/ which is visible from any host. + +set -eo pipefail + +FRESH_ROOT="/home/admin/cpfs/wjh/agentic-kv-fresh" +VENV="${FRESH_ROOT}/.venv" +MODEL="${MODEL:-/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}" +LOGS_DIR="${LOGS_DIR:-${FRESH_ROOT}/mb2_logs}" +MB2_LOG_ROOT="${FRESH_ROOT}/mb2_transfer_logs" + +INSTANCE="${INSTANCE:-A}" +GPU="${GPU:-0}" +PORT="${PORT:-8000}" +BP="${BP:-8998}" +MASTER="${MASTER:-29500}" +ROLE="${ROLE:-kv_both}" + +mkdir -p "${LOGS_DIR}" "${MB2_LOG_ROOT}/${INSTANCE}" + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +INSTRUMENT="${SCRIPT_DIR}/instrument_mooncake.py" + +stop_local() { + pkill -9 -f "vllm serve.*--port ${PORT} " 2>/dev/null || true + pkill -9 -f "EngineCore" 2>/dev/null || true + sleep 2 +} + +case "${1:-start}" in + stop) + stop_local + # Patch revert is only safe to do when no other instance is using + # the venv — for a shared cpfs venv we leave it applied until all + # instances are stopped. Do it manually with: + # python instrument_mooncake.py --revert + exit 0;; + status) + if curl -sf "http://127.0.0.1:${PORT}/health" >/dev/null 2>&1; then + echo "port ${PORT}: UP" + else + echo "port ${PORT}: DOWN" + fi + exit 0;; + start) ;; + *) echo "Unknown command: $1"; exit 1;; +esac + +stop_local + +source "${VENV}/bin/activate" + +# Apply instrumentation on first launch (it's idempotent / safe to re-apply). +if [[ -f "${INSTRUMENT}" ]]; then + python "${INSTRUMENT}" --apply --venv "${VENV}" 2>&1 || true +fi + +cfg='{"kv_connector":"MooncakeConnector","kv_role":"'${ROLE}'"}' +echo "[mb2-single] launching ${INSTANCE}: gpu=${GPU} port=${PORT} bp=${BP} role=${ROLE}" + +PYTHONHASHSEED=42 \ +VLLM_MOONCAKE_BOOTSTRAP_PORT="${BP}" \ +CUDA_VISIBLE_DEVICES="${GPU}" \ +MASTER_PORT="${MASTER}" \ +MB2_LOG_DIR="${MB2_LOG_ROOT}/${INSTANCE}" \ +nohup vllm serve "${MODEL}" \ + --host 0.0.0.0 --port "${PORT}" \ + --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching \ + --dtype auto --gpu-memory-utilization 0.9 \ + --max-model-len 200000 \ + --kv-transfer-config "${cfg}" \ + --enable-prompt-tokens-details \ + > "${LOGS_DIR}/vllm_${INSTANCE}_$(hostname -s)_gpu${GPU}.log" 2>&1 & +disown + +echo "[mb2-single] waiting for /health on port ${PORT}..." +tries=0 +while ! curl -sf "http://127.0.0.1:${PORT}/health" >/dev/null 2>&1; do + tries=$((tries+1)) + if [ ${tries} -gt 180 ]; then + echo "[mb2-single] FATAL port ${PORT} did not come up in 6 min" + tail -40 "${LOGS_DIR}/vllm_${INSTANCE}_"*"_gpu${GPU}.log" || true + exit 1 + fi + sleep 2 +done +echo "[mb2-single] ${INSTANCE} UP on $(hostname -s):${PORT} (bp ${BP}, gpu ${GPU})"