MB2 scaffolding: launch script for vLLM pair + KV-transfer-time client
Two new files prepare measurement of T_transfer(KV_size, network_path),
the gap §3.2's PD-disagg cost argument has had since day one.
microbench/fresh_setup/start_vllm_pair.sh
start | status | stop two vLLM 0.18.1 instances on local GPUs (A, B)
with --kv-transfer-config '{"kv_connector":"MooncakeConnector",
"kv_role":"kv_both"}' running off the fresh venv (vanilla wheel +
vanilla mooncake 0.3.11, NOT the dash0 patched build). GPU IDs and
ports are env-overridable so the same script drives the intra-node
pair (GPU_A=0 GPU_B=1 on one host) and the inter-node pair (GPU_A=0
on dash1, GPU_B=0 on dash2 — launched per host separately).
microbench/fresh_setup/mb2_kv_transfer.py
Three-step measurement borrowed from connector_tax/.../smoke_test_
migrate_cache.py:
1. do_remote_decode on A (compute & cache KV; max_tokens=1)
2. do_remote_prefill on B (pull KV from A — this is the timed step)
3. plain completion on B (sanity check: cached_tokens ≈ prompt len)
Sweeps input_tokens ∈ {512, 1k, 2k, 4k, 8k, 16k, 32k, 64k} with 5
repeats each; reports mean / p50 / p90 transfer time and a per-size
raw log. Per-token KV is 98304 B (Qwen3-Coder-30B-A3B), so the largest
sweep point (64k tokens) transfers ≈ 6 GiB — inside the range observed in
§2 but below its p99 of 11.5 GiB (the model's max_model_len of 200000
caps the absolute upper bound).
What we will NOT learn from this design:
- Bandwidth saturation when the system is loaded (single-request bench)
- vLLM-internal scheduling overhead vs pure transfer (the timed step
folds them together — but for the §3.2 argument that's the right
"what does PD-disagg actually pay" number)
Intentionally not committed yet: an orchestrator that loops over
intra-/inter-node configs. We start manual on dash1 intra-node to
verify the measurement is sane before scaling out.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
204
microbench/fresh_setup/mb2_kv_transfer.py
Executable file
204
microbench/fresh_setup/mb2_kv_transfer.py
Executable file
@@ -0,0 +1,204 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""MB2: measure KV transfer time between two vLLM instances over Mooncake.
|
||||||
|
|
||||||
|
Pattern (adapted from microbench/connector_tax/cache_sweep/smoke_test_migrate_cache.py):
|
||||||
|
|
||||||
|
1. Prefill on A: do_remote_decode with max_tokens=1 (A computes & caches KV)
|
||||||
|
2. Pull to B: do_remote_prefill on B with kv_transfer_params from step 1
|
||||||
|
(this is the operation that performs the KV transfer)
|
||||||
|
3. Verify: send a follow-up to B; cached_tokens should equal the
|
||||||
|
prompt length (confirms the KV landed on B)
|
||||||
|
|
||||||
|
We time step 2 — that gives us E2E "transfer + B's prefill check" latency.
|
||||||
|
By sweeping input_length we trace T_transfer(KV_size).
|
||||||
|
|
||||||
|
The follow-up step gives us a sanity check (correctness) but isn't timed.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import statistics
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
# Local checkpoint path. It is sent as the "model" field of every completion
# request and echoed into the result JSON.
MODEL_PATH = "/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct"
|
||||||
|
|
||||||
|
|
||||||
|
async def get_engine_id(
    client: httpx.AsyncClient, port: int, host: str = "127.0.0.1"
) -> str:
    """Return the engine id an instance reports on its ``/query`` endpoint.

    Args:
        client: Shared async HTTP client.
        port: Port on which ``/query`` is requested.
        host: Host to contact. Defaults to loopback, which is what the
            callers in this file use.

    Returns:
        The ``engine_id`` value found under the top-level ``"0"`` entry of
        the JSON response.

    Raises:
        httpx.HTTPStatusError: The endpoint answered with an error status.
        KeyError: The response does not have the expected
            ``{"0": {"engine_id": ...}}`` shape.

    NOTE(review): the callers pass the same port they use for
    ``/v1/completions``. Confirm ``/query`` is really served there and not
    on the separate Mooncake bootstrap port.
    """
    r = await client.get(f"http://{host}:{port}/query")
    r.raise_for_status()
    data = r.json()
    try:
        # NOTE(review): "0" is presumably a rank / worker index -- confirm
        # against whatever serves /query.
        return data["0"]["engine_id"]
    except (KeyError, TypeError) as err:
        # Surface the payload: a bare KeyError('0') says nothing about what
        # the server actually returned.
        raise KeyError(
            f"unexpected /query payload from {host}:{port}: {data!r}"
        ) from err
|
||||||
|
|
||||||
|
|
||||||
|
async def completion(
    client: httpx.AsyncClient,
    port: int,
    prompt_token_ids: list[int],
    max_tokens: int,
    kv_transfer_params: dict | None = None,
    host: str = "127.0.0.1",
) -> tuple[float, dict]:
    """POST one non-streaming ``/v1/completions`` request and time it.

    Args:
        client: Shared async HTTP client.
        port: Port of the target instance.
        prompt_token_ids: Prompt given as token ids (sent as ``"prompt"``).
        max_tokens: Upper bound on generated tokens.
        kv_transfer_params: Optional connector parameters. They are attached
            to the payload only when the dict is non-empty.
        host: Host of the target instance. Defaults to loopback.

    Returns:
        ``(elapsed_s, response_json)``. ``elapsed_s`` is the wall-clock time
        of the HTTP round trip only; status checking and JSON decoding are
        outside the timed window.

    Raises:
        httpx.HTTPStatusError: The server answered with an error status.
    """
    payload = {
        "model": MODEL_PATH,
        "prompt": prompt_token_ids,
        "max_tokens": max_tokens,
        # The previous expression `max_tokens if max_tokens == 1 else 1`
        # evaluated to 1 on both branches; spell the constant out.
        "min_tokens": 1,
        "temperature": 0.0,
        "stream": False,
    }
    if kv_transfer_params:
        payload["kv_transfer_params"] = kv_transfer_params

    t0 = time.perf_counter()
    r = await client.post(
        f"http://{host}:{port}/v1/completions",
        json=payload,
        timeout=600.0,
    )
    # Stop the clock before validating so error handling never skews timings.
    elapsed_s = time.perf_counter() - t0
    r.raise_for_status()
    return elapsed_s, r.json()
|
||||||
|
|
||||||
|
|
||||||
|
def synth_prompt(
    rng_seed: int,
    n_tokens: int,
    min_id: int = 100,
    max_id: int = 150000,
) -> list[int]:
    """Build a deterministic pseudo-random sequence of token ids.

    Args:
        rng_seed: Seed for a private ``random.Random``; the same seed always
            yields the same sequence.
        n_tokens: Number of ids to generate. Zero or a negative value gives
            an empty list.
        min_id: Smallest id that may be drawn (inclusive).
        max_id: Largest id that may be drawn (inclusive).

    Returns:
        ``n_tokens`` ids drawn uniformly from ``[min_id, max_id]``.

    The defaults keep the ids away from the low end of the vocabulary; they
    are presumably chosen to avoid the target model's special tokens, so
    override them when benchmarking a model with a different vocabulary.
    """
    # Function-scope import: the module does not import `random` at the top.
    import random

    rng = random.Random(rng_seed)
    return [rng.randint(min_id, max_id) for _ in range(n_tokens)]
|
||||||
|
|
||||||
|
|
||||||
|
async def measure_one(
    client: httpx.AsyncClient,
    src_port: int, dst_port: int,
    src_eid: str, dst_eid: str,
    input_tokens: int,
    rng_seed: int,
) -> dict:
    """Run one prefill -> pull -> verify cycle and return its timings.

    Args:
        client: Shared async HTTP client.
        src_port: Port of instance A (computes the KV).
        dst_port: Port of instance B (pulls the KV).
        src_eid: Engine id of A, sent as ``remote_engine_id`` in both steps.
        dst_eid: Engine id of B. Currently unused; kept so the signature
            stays symmetric for callers.
        input_tokens: Prompt length in tokens.
        rng_seed: Seed for the synthetic prompt.

    Returns:
        A dict with ``input_tokens``, ``session``, the three wall-clock
        timings (``t_prefill_s``, ``t_transfer_s``, ``t_followup_s``),
        ``cached_followup`` and the boolean ``ok``.

    Raises:
        httpx.HTTPStatusError: Any of the three requests failed.

    NOTE(review): ``ok`` only shows that B ended up holding the prompt's KV.
    If B silently recomputed the prefill itself in step 2, prefix caching
    would presumably make the follow-up look identical -- confirm through
    the server logs that a remote pull really happened.
    NOTE(review): ``remote_host`` is fixed to loopback, so as written this
    only covers two instances reachable on the local host.
    """
    prompt = synth_prompt(rng_seed, input_tokens)
    # Row identifier only; it is recorded in the output but not sent to vLLM.
    session = uuid.uuid4().hex

    # Step 1: prefill on A. max_tokens=1 caches the KV without real decoding.
    t_prefill_s, prefill_resp = await completion(
        client, src_port, prompt, max_tokens=1,
        kv_transfer_params={
            "do_remote_decode": True,
            "remote_block_ids": None,
            "remote_engine_id": src_eid,
            "remote_host": "127.0.0.1",
            "remote_port": src_port,
        },
    )
    # The response may omit kv_transfer_params; fall back to an empty dict.
    src_kvp = prefill_resp.get("kv_transfer_params") or {}

    # Step 2: pull from A to B -- this is the step whose latency we report.
    t_transfer_s, pull_resp = await completion(
        client, dst_port, prompt, max_tokens=1,
        kv_transfer_params={
            "do_remote_prefill": True,
            "remote_block_ids": src_kvp.get("remote_block_ids"),
            "remote_engine_id": src_eid,
            "remote_host": "127.0.0.1",
            "remote_port": src_kvp.get("remote_port", src_port),
        },
    )

    # Step 3: plain follow-up on B; it should be served from B's cache.
    t_followup_s, follow_resp = await completion(
        client, dst_port, prompt, max_tokens=1,
    )
    usage = follow_resp.get("usage") or {}
    details = usage.get("prompt_tokens_details") or {}
    # Either location may be missing or explicitly null; normalise to an int
    # so the threshold comparison below can never raise TypeError.
    cached_followup = (
        details.get("cached_tokens") or usage.get("cached_tokens") or 0
    )

    return {
        "input_tokens": input_tokens,
        "session": session,
        "t_prefill_s": t_prefill_s,
        "t_transfer_s": t_transfer_s,
        "t_followup_s": t_followup_s,
        "cached_followup": cached_followup,
        # >= 90 % of the prompt cached on B counts as a successful transfer.
        "ok": cached_followup >= input_tokens * 0.9,
    }
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_sizes(spec: str) -> list[int]:
    """Parse a comma-separated list of token counts, skipping blank entries."""
    return [int(tok) for tok in spec.split(",") if tok.strip()]


def _summarise_transfers(rows: list[dict], sizes: list[int]) -> list[dict]:
    """Aggregate the timed transfer step per input size over rows marked ok."""
    summary = []
    for sz in sizes:
        ts = [
            row["t_transfer_s"]
            for row in rows
            if row["input_tokens"] == sz and row["ok"]
        ]
        if not ts:
            continue
        summary.append({
            "input_tokens": sz,
            "n_ok": len(ts),
            "transfer_s_mean": statistics.mean(ts),
            "transfer_s_p50": statistics.median(ts),
            # With fewer than 10 samples a decile is not meaningful, so the
            # reported "p90" degrades to the maximum.
            "transfer_s_p90": (
                statistics.quantiles(ts, n=10)[-1] if len(ts) >= 10 else max(ts)
            ),
            "transfer_s_min": min(ts),
            "transfer_s_max": max(ts),
        })
    return summary


async def main_async(args: argparse.Namespace) -> None:
    """Run the size x repeat sweep, write the JSON report, print a summary.

    Args:
        args: Parsed options; reads ``sizes``, ``repeats``, ``src_port``,
            ``dst_port``, ``label`` and ``out``.

    Raises:
        Whatever a measurement raises (e.g. ``httpx.HTTPStatusError``). The
        exception still propagates, but any rows collected before it are
        written to ``args.out`` first, with ``"complete": false``, so a
        late failure no longer discards the earlier measurements. If not a
        single row was collected, an existing output file is left untouched.

    NOTE(review): prompt seeds depend only on (size, repeat), so a second
    run against instances that were not restarted replays identical
    prompts. With prefix caching enabled on the servers that presumably
    turns the timed pull into a cache hit -- restart the pair between runs
    or confirm otherwise.
    """
    kv_bytes_per_token = 98304
    sizes = _parse_sizes(args.sizes)
    repeats = args.repeats
    src_port, dst_port = args.src_port, args.dst_port

    results: list[dict] = []
    completed = False
    limits = httpx.Limits(max_connections=10, max_keepalive_connections=10)
    try:
        async with httpx.AsyncClient(limits=limits, trust_env=False) as client:
            src_eid = await get_engine_id(client, src_port)
            dst_eid = await get_engine_id(client, dst_port)
            print(f"[mb2] src_eid={src_eid[:16]}... dst_eid={dst_eid[:16]}...")

            for sz in sizes:
                for rep in range(repeats):
                    row = await measure_one(
                        client, src_port, dst_port, src_eid, dst_eid,
                        input_tokens=sz, rng_seed=sz * 1000 + rep,
                    )
                    print(f" size={sz:>6} rep={rep} "
                          f"transfer={row['t_transfer_s']*1000:7.1f}ms "
                          f"followup_cached={row['cached_followup']}/{sz} "
                          f"ok={row['ok']}")
                    results.append(row)
        completed = True
    finally:
        if completed or results:
            if not completed:
                print(f"[mb2] sweep aborted after {len(results)} "
                      f"measurement(s); writing partial results")
            summary = _summarise_transfers(results, sizes)
            out = {
                "model": MODEL_PATH,
                "kv_bytes_per_token": kv_bytes_per_token,
                "src_port": src_port,
                "dst_port": dst_port,
                "config_label": args.label,
                "complete": completed,
                "raw": results,
                "summary": summary,
            }
            Path(args.out).write_text(json.dumps(out, indent=2))
            print(f"[mb2] wrote {args.out}")
            for s in summary:
                sz = s["input_tokens"]
                kv_mib = sz * kv_bytes_per_token / 1024 / 1024
                print(f" {sz:>6} tok ({kv_mib:>7.1f} MiB KV): "
                      f"mean {s['transfer_s_mean']*1000:7.1f} ms · "
                      f"p50 {s['transfer_s_p50']*1000:7.1f} · "
                      f"p90 {s['transfer_s_p90']*1000:7.1f} "
                      f"(n_ok={s['n_ok']})")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
    """Parse the command-line options and drive the sweep to completion."""
    parser = argparse.ArgumentParser()

    # Ports of the two instances (A = source, B = destination).
    parser.add_argument("--src-port", default=8000, type=int)
    parser.add_argument("--dst-port", default=8001, type=int)

    # Shape of the sweep.
    parser.add_argument(
        "--sizes",
        help="Comma-separated input_token sizes to sweep",
        default="512,1024,2048,4096,8192,16384,32768,65536",
    )
    parser.add_argument("--repeats", default=5, type=int)

    # Output bookkeeping.
    parser.add_argument(
        "--label",
        help="Label written into the output (e.g. intra-node / inter-node)",
        default="intra-node",
    )
    parser.add_argument("--out", default="mb2_result.json")

    options = parser.parse_args()
    asyncio.run(main_async(options))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
105
microbench/fresh_setup/start_vllm_pair.sh
Executable file
105
microbench/fresh_setup/start_vllm_pair.sh
Executable file
@@ -0,0 +1,105 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
# Start 2 vLLM instances with Mooncake kv_connector (kv_both) for MB2.
|
||||||
|
#
|
||||||
|
# Default config: both on local GPU 0 and 1 (intra-node A/B test).
|
||||||
|
# Override the GPUs via the GPU_A / GPU_B env vars; MODEL and LOGS_DIR are
# env-overridable too. (HOST_A / HOST_B are not read anywhere in this script.)
|
||||||
|
#
|
||||||
|
# This uses the FRESH venv at /home/admin/cpfs/wjh/agentic-kv-fresh/.venv
|
||||||
|
# (vanilla vllm 0.18.1 + vanilla mooncake-transfer-engine 0.3.11), NOT
|
||||||
|
# the dash0 patched build.
|
||||||
|
#
|
||||||
|
# Usage:
|
||||||
|
# GPU_A=0 GPU_B=1 bash microbench/fresh_setup/start_vllm_pair.sh
|
||||||
|
# bash microbench/fresh_setup/start_vllm_pair.sh status
|
||||||
|
# bash microbench/fresh_setup/start_vllm_pair.sh stop
|
||||||
|
|
||||||
|
set -eo pipefail
|
||||||
|
|
||||||
|
# --- Configuration -----------------------------------------------------------
# Everything except FRESH_ROOT / VENV can be overridden from the environment,
# so the same script can drive different GPU / port layouts.
FRESH_ROOT="/home/admin/cpfs/wjh/agentic-kv-fresh"
VENV="${FRESH_ROOT}/.venv"
MODEL="${MODEL:-/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}"
LOGS_DIR="${LOGS_DIR:-${FRESH_ROOT}/mb2_logs}"
mkdir -p "${LOGS_DIR}"

# GPU index handed to each instance through CUDA_VISIBLE_DEVICES.
GPU_A="${GPU_A:-0}"
GPU_B="${GPU_B:-1}"
# HTTP ports (previously hard-coded, now env-overridable with the same defaults).
PORT_A="${PORT_A:-8000}"
PORT_B="${PORT_B:-8001}"
# Values exported to each instance as VLLM_MOONCAKE_BOOTSTRAP_PORT.
BP_A="${BP_A:-8998}"
BP_B="${BP_B:-8999}"
# Values exported to each instance as MASTER_PORT; they must differ so two
# instances on one host do not collide.
MASTER_A="${MASTER_A:-29500}"
MASTER_B="${MASTER_B:-29501}"
|
||||||
|
|
||||||
|
# Stop every vLLM server / engine-core process this user is allowed to signal.
# NOTE: the patterns match by command line, so this also stops vLLM processes
# that were not started by this script.
stop_all() {
  local pattern waited=0

  # Ask for a clean shutdown first so the processes get a chance to release
  # their resources before being force-killed.
  for pattern in "vllm serve" "EngineCore"; do
    pkill -TERM -f "${pattern}" 2>/dev/null || true
  done

  # Give them up to ~10 s to exit on their own.
  while [ "${waited}" -lt 10 ] && {
    pgrep -f "vllm serve" >/dev/null 2>&1 || pgrep -f "EngineCore" >/dev/null 2>&1
  }; do
    sleep 1
    waited=$((waited + 1))
  done

  # Whatever is still alive gets SIGKILL, exactly as before.
  for pattern in "vllm serve" "EngineCore"; do
    pkill -9 -f "${pattern}" 2>/dev/null || true
  done
  sleep 2
}
|
||||||
|
|
||||||
|
# Sub-command dispatch. `stop` and `status` finish here; `start` (the default)
# falls through to the launch sequence below.
case "${1:-start}" in
  stop)
    stop_all
    exit 0
    ;;
  status)
    for p in "${PORT_A}" "${PORT_B}"; do
      # --max-time keeps a wedged server from hanging the status check.
      if curl -sf --max-time 5 "http://127.0.0.1:${p}/health" >/dev/null 2>&1; then
        echo "port ${p}: UP"
      else
        echo "port ${p}: DOWN"
      fi
    done
    exit 0
    ;;
  start)
    ;;
  *)
    # Diagnostics belong on stderr; also tell the user what is accepted.
    echo "Unknown command: $1" >&2
    echo "Usage: $0 [start|status|stop]" >&2
    exit 1
    ;;
esac
|
||||||
|
|
||||||
|
# Begin from a clean slate: get rid of any vLLM processes left from a prior run.
stop_all

# Activate the fresh venv so that `vllm` resolves to its install.
. "${VENV}/bin/activate"
|
||||||
|
|
||||||
|
# launch <name> <gpu> <http_port> <bootstrap_port> <master_port>
# Start one detached `vllm serve` process pinned to a single GPU, with stdout
# and stderr redirected to a per-instance log file under LOGS_DIR.
launch() {
  local name="$1" device="$2" http_port="$3" bootstrap_port="$4" master_port="$5"
  local log_file="${LOGS_DIR}/vllm_${name}_gpu${device}.log"

  # Server flags, collected once so the command line below stays readable.
  local -a serve_args=(
    --host 0.0.0.0 --port "${http_port}"
    --tensor-parallel-size 1
    --trust-remote-code --enable-prefix-caching
    --dtype auto --gpu-memory-utilization 0.9
    --max-model-len 200000
    --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}'
    --enable-prompt-tokens-details
  )

  echo "[mb2] launching instance ${name} on GPU ${device}, port ${http_port}, bp ${bootstrap_port}"

  # Per-process environment: fixed hash seed, bootstrap port, GPU pinning and
  # a distinct MASTER_PORT per instance.
  PYTHONHASHSEED=42 \
  VLLM_MOONCAKE_BOOTSTRAP_PORT="${bootstrap_port}" \
  CUDA_VISIBLE_DEVICES="${device}" \
  MASTER_PORT="${master_port}" \
    nohup vllm serve "${MODEL}" "${serve_args[@]}" > "${log_file}" 2>&1 &

  # Detach the job so it survives this shell exiting.
  disown
}
|
||||||
|
|
||||||
|
# Bring up both instances; the short pause staggers their initialisation.
launch A "${GPU_A}" "${PORT_A}" "${BP_A}" "${MASTER_A}"
sleep 3
launch B "${GPU_B}" "${PORT_B}" "${BP_B}" "${MASTER_B}"

echo "[mb2] waiting for both /health endpoints..."
for port in "${PORT_A}" "${PORT_B}"; do
  # Wall-clock deadline (6 min per port) instead of counting tries, so a slow
  # curl cannot silently stretch the limit.
  deadline=$((SECONDS + 360))
  # --max-time stops curl from hanging on a socket that accepts but never answers.
  while ! curl -sf --max-time 5 "http://127.0.0.1:${port}/health" >/dev/null 2>&1; do
    if [ "${SECONDS}" -ge "${deadline}" ]; then
      echo "[mb2] FATAL port ${port} did not come up in 6 min" >&2
      # Tail only the two logs written by this run; a glob over LOGS_DIR
      # would also pick up stale logs from earlier GPU layouts.
      tail -40 "${LOGS_DIR}/vllm_A_gpu${GPU_A}.log" \
               "${LOGS_DIR}/vllm_B_gpu${GPU_B}.log" >&2 || true
      exit 1
    fi
    sleep 2
  done
  echo " port=${port} ready"
done

echo "[mb2] both instances UP"
echo " A: 127.0.0.1:${PORT_A} (GPU ${GPU_A}, bp ${BP_A})"
echo " B: 127.0.0.1:${PORT_B} (GPU ${GPU_B}, bp ${BP_B})"
echo " logs: ${LOGS_DIR}"
|
||||||
Reference in New Issue
Block a user