MB2: working end-to-end intra-node KV transfer microbench

This commit closes the loop on the fresh-venv MB2 path. Three corrections
on top of the previous scaffold made the bench fire successfully on
dash1 GPU 0+1 with kv_both connector roles:

1. Re-target instrumentation patch to vLLM's shipped MooncakeConnector
   (vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py).
   The mooncake package's own mooncake_connector_v1.py is not the
   implementation vLLM 0.18.1 loads; the
   '{"kv_connector": "MooncakeConnector"}' config picks up the vLLM-shipped
   one. Patches go at _send_blocks (P-side) and receive_kv_from_single_worker
   (D-side, async, at both function entry and the FINISH branch).

2. /query lives on the mooncake bootstrap port, not the vLLM HTTP port.
   Add --src-bp / --dst-bp args; default 8998 / 8999.

3. kv_transfer_params schema for the vanilla connector:
     do_remote_decode  → {transfer_id}
     do_remote_prefill → {transfer_id, remote_engine_id, remote_bootstrap_addr}
   where remote_bootstrap_addr must include the http:// scheme. The dash0
   smoke_test_migrate_cache.py was written for the patched build, which
   used a different field-name set (remote_host, remote_port,
   remote_block_ids); those are rejected here.
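
The schema in item 3 can be sketched as two small builders. This is a minimal
sketch, not code from the bench: the helper names `decode_params` and
`prefill_params` and the example engine id are hypothetical; only the field
names and the http:// requirement come from the schema above.

```python
import uuid


def decode_params(transfer_id: str) -> dict:
    """kv_transfer_params for the prefill-side request (do_remote_decode)."""
    return {"do_remote_decode": True, "transfer_id": transfer_id}


def prefill_params(transfer_id: str, remote_engine_id: str,
                   bootstrap_port: int, host: str = "127.0.0.1") -> dict:
    """kv_transfer_params for the decode-side request (do_remote_prefill)."""
    return {
        "do_remote_prefill": True,
        "transfer_id": transfer_id,
        "remote_engine_id": remote_engine_id,
        # Per the schema note, the address must carry the http:// scheme.
        "remote_bootstrap_addr": f"http://{host}:{bootstrap_port}",
    }


# Both requests share one transfer_id so the two sides can be matched up.
transfer_id = uuid.uuid4().hex
step1 = decode_params(transfer_id)
step2 = prefill_params(transfer_id, remote_engine_id="engine-A",  # placeholder id
                       bootstrap_port=8998)
```

Note that the bootstrap port (8998 by default here), not the vLLM HTTP port,
goes into remote_bootstrap_addr, matching correction 2.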

Also discovered (and worked around): vLLM 0.18.1 with kv_role=kv_consumer
raises AttributeError on `self.bootstrap_server` because that attribute
is only assigned inside `if not self.is_kv_consumer`. We sidestep this by
running kv_both for the microbench: transfer mechanics are identical
(same batch_transfer_sync_write call), and the role gate only affects
which request types each instance accepts. For the §5 strict PD-disagg
baseline we'll need to either fix this bug or front the pair with a
role-aware proxy.
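
The failure mode above is the usual conditionally-assigned-attribute pattern.
The toy class below is a hypothetical stand-in, not vLLM's code: the class
name, the `shutdown` method and the `fixed` flag are invented for
illustration. It reproduces only the shape of the bug (attribute assigned
inside `if not self.is_kv_consumer`, then read unconditionally) and one
possible fix, which is to always define the attribute.

```python
class ToyConnector:
    """Hypothetical stand-in; mirrors only the shape of the reported bug."""

    def __init__(self, kv_role: str, fixed: bool = False):
        self.is_kv_consumer = kv_role == "kv_consumer"
        if fixed:
            # Possible fix: define the attribute on every code path.
            self.bootstrap_server = None
        if not self.is_kv_consumer:
            self.bootstrap_server = object()  # placeholder for a real server

    def shutdown(self) -> str:
        # Unconditional read: raises if the attribute was never assigned.
        if self.bootstrap_server is not None:
            return "stopped"
        return "nothing to stop"


def try_shutdown(kv_role: str, fixed: bool) -> str:
    """Return the shutdown result, or the AttributeError text if it fails."""
    try:
        return ToyConnector(kv_role, fixed).shutdown()
    except AttributeError as exc:
        return f"AttributeError: {exc}"
```

With this sketch, `try_shutdown("kv_consumer", fixed=False)` hits the
AttributeError, while kv_both and the fixed variant do not.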

Sanity smoke (3 sizes × 2 repeats, dash1 GPU 0+1, kv_both intra-node):
  input_tokens  KV-MiB  send_blocks_ms (P)  receive_kv_ms (D)  client_step2_ms
           512      48                5–23               7–33            18–91
          2048     192                  21                 23               37
          8192     768                  85                 88              110
=> intra-node bandwidth on the actual transfer is ~9 GB/s (768 MiB in
   85 ms P-side, 88 ms D-side), which is well below NVLink p2p; likely
   PCIe-staged. Worth verifying.
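
The bandwidth figure can be rechecked from the table. The snippet below is a
minimal sketch that uses only the single-valued rows (2048 and 8192 tokens)
and the P-side send_blocks time; the function name is hypothetical.

```python
MIB = 1024 ** 2

# (input_tokens, kv_mib, send_blocks_ms) from the single-valued table rows.
rows = [(2048, 192, 21.0), (8192, 768, 85.0)]


def bandwidth_gb_per_s(kv_mib: float, duration_ms: float) -> float:
    """Decimal GB/s for kv_mib MiB moved in duration_ms milliseconds."""
    return kv_mib * MIB / (duration_ms / 1000.0) / 1e9


for tokens, kv_mib, ms in rows:
    kib_per_token = kv_mib * 1024 / tokens
    print(f"{tokens:>5} tokens: {kib_per_token:.0f} KiB/token, "
          f"{bandwidth_gb_per_s(kv_mib, ms):.1f} GB/s")
```

Both rows give 96 KiB of KV per token and roughly 9.5 GB/s, so the ~9 GB/s
estimate does not depend on which of the two sizes is used.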

Next step (in flight): full sweep 512..128k tokens × 5 repeats with
the per-stage analyzer.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-27 18:53:25 +08:00
parent 622e0bc04c
commit 91673f1fb8
3 changed files with 112 additions and 78 deletions

@@ -1,26 +1,26 @@
#!/usr/bin/env python3
"""Instrument mooncake_connector_v1.py with per-stage timing for MB2.
"""Instrument vLLM's shipped MooncakeConnector for per-stage MB2 timing.
Adds structured JSONL logging at two key call sites so we can recover
the full breakdown (queueing / prefill / kv transfer / decode) when
combined with the client-side TTFT + finish timestamps:
vLLM 0.18.1 ships its OWN MooncakeConnector at
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py
which is the one --kv-transfer-config '{"kv_connector":"MooncakeConnector"}'
loads. The mooncake-package's mooncake_connector_v1.py is a separate
implementation and not used by vLLM 0.18.1.
send_blocks (P-side, the actual `batch_transfer_sync_write`):
{event=send_blocks, remote_session, total_bytes,
duration_s, t_start_unix, ret, tp_rank}
We add structured JSONL events at two key sites:
receive_kv (D-side, the ZMQ-driven pull request):
{event=receive_kv, path, local_req_ids, remote_req_ids,
duration_s, t_start_unix, tp_rank}
_send_blocks (P-side, the actual batch_transfer_sync_write):
{event=send_blocks, remote_session, total_bytes, duration_s,
t_start_unix, ret, tp_rank, t_log_unix}
Logs go to ``${MB2_LOG_DIR}/mb2_transfer_pid<pid>.jsonl`` (default
``MB2_LOG_DIR=/tmp``). One file per vLLM worker PID so concurrent send +
receive on the same machine don't fight for fsync ordering.
receive_kv_from_single_worker (D-side, async ZMQ-driven pull):
{event=receive_kv_enter / receive_kv_finish, worker_addr,
req_ids, duration_s, t_start_unix, tp_rank, t_log_unix}
Apply / revert pattern follows microbench/connector_tax/cache_sweep/
apply_direct_read_fix.py. All inserted code is bracketed with
``# MB2_INSTRUMENT_START`` / ``# MB2_INSTRUMENT_END`` so revert is a
single text scan.
Logs land in ``$MB2_LOG_DIR/mb2_transfer_pid<pid>.jsonl`` (default
MB2_LOG_DIR=/tmp). One file per PID so multi-worker vllm doesn't fight.
Apply/revert markers: ``# MB2_INSTRUMENT_START/END``.
Usage:
python instrument_mooncake.py --apply [--venv PATH]
@@ -34,11 +34,16 @@ import re
from pathlib import Path
DEFAULT_VENV = Path("/home/admin/cpfs/wjh/agentic-kv-fresh/.venv")
TARGET_REL = "lib/python3.12/site-packages/mooncake/mooncake_connector_v1.py"
TARGET_REL = (
"lib/python3.12/site-packages/vllm/"
"distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py"
)
START_MARK = "# MB2_INSTRUMENT_START"
END_MARK = "# MB2_INSTRUMENT_END"
HEADER_ANCHOR = "import time\n"
HEADER_INSERT = f"""
{START_MARK}
@@ -47,7 +52,13 @@ import os as _mb2_os
import threading as _mb2_threading
import time as _mb2_time
_MB2_LOG_DIR = _mb2_os.environ.get("MB2_LOG_DIR", "/tmp")
_MB2_LOG_PATH = _mb2_os.path.join(_MB2_LOG_DIR, f"mb2_transfer_pid{{_mb2_os.getpid()}}.jsonl")
try:
_mb2_os.makedirs(_MB2_LOG_DIR, exist_ok=True)
except Exception:
pass
_MB2_LOG_PATH = _mb2_os.path.join(
_MB2_LOG_DIR, f"mb2_transfer_pid{{_mb2_os.getpid()}}.jsonl"
)
_MB2_LOG_FILE = None
_MB2_LOG_LOCK = _mb2_threading.Lock()
def _mb2_log_event(d):
@@ -60,8 +71,7 @@ def _mb2_log_event(d):
{END_MARK}
"""
# Insert HEADER right after the `import time` near the top.
HEADER_ANCHOR = "import time\n"
# ---- Patch 2: _send_blocks ----
SEND_TARGET = """ def _send_blocks(
self,
@@ -97,54 +107,60 @@ SEND_REPLACE = f""" def _send_blocks(
"duration_s": time.perf_counter() - start_time,
"t_start_unix": _mb2_t_start_unix,
"ret": ret_value,
"tp_rank": self.tp_rank}})
"tp_rank": getattr(self, "tp_rank", -1)}})
{END_MARK}"""
RECV_TARGET = """ async def receive_kv(
self, path: str, req_blocks: list[tuple[str, str, list[int]]]
):
local_req_ids, remote_req_ids, block_ids = map(list, zip(*req_blocks))"""
# ---- Patch 3: receive_kv_from_single_worker entry ----
RECV_REPLACE = f""" async def receive_kv(
self, path: str, req_blocks: list[tuple[str, str, list[int]]]
RECV_ENTRY_TARGET = """ async def receive_kv_from_single_worker(
self,
worker_addr: str,
pull_metas: dict[ReqId, PullReqMeta],
):
local_req_ids, remote_req_ids, block_ids = map(list, zip(*req_blocks))
req_ids = set(pull_metas)"""
RECV_ENTRY_REPLACE = f""" async def receive_kv_from_single_worker(
self,
worker_addr: str,
pull_metas: dict[ReqId, PullReqMeta],
):
req_ids = set(pull_metas)
{START_MARK}
_mb2_t_recv_start = time.perf_counter()
_mb2_t_recv_start_unix = time.time()
_mb2_recv_t_start = time.perf_counter()
_mb2_recv_t_start_unix = time.time()
_mb2_log_event({{"event": "receive_kv_enter",
"worker_addr": worker_addr,
"req_ids": [str(r) for r in req_ids],
"t_start_unix": _mb2_recv_t_start_unix,
"tp_rank": getattr(self, "tp_rank", -1)}})
{END_MARK}"""
RECV_DONE_TARGET = """ self.finished_recving_reqs.update(local_req_ids)
# ---- Patch 4: receive_kv_from_single_worker FINISH ----
logger.debug(
"pulling kv_caches for %s finished (local requests: %s)",
remote_req_ids, local_req_ids)"""
RECV_FINISH_TARGET = """ if response.status == MooncakeXferResponseStatus.FINISH:
break"""
RECV_DONE_REPLACE = f""" {START_MARK}
_mb2_log_event({{"event": "receive_kv",
"path": path,
"local_req_ids": list(local_req_ids),
"remote_req_ids": list(remote_req_ids),
"duration_s": time.perf_counter() - _mb2_t_recv_start,
"t_start_unix": _mb2_t_recv_start_unix,
"tp_rank": self.tp_rank}})
{END_MARK}
self.finished_recving_reqs.update(local_req_ids)
logger.debug(
"pulling kv_caches for %s finished (local requests: %s)",
remote_req_ids, local_req_ids)"""
RECV_FINISH_REPLACE = f""" if response.status == MooncakeXferResponseStatus.FINISH:
{START_MARK}
_mb2_log_event({{"event": "receive_kv_finish",
"worker_addr": worker_addr,
"req_ids": [str(r) for r in req_ids],
"duration_s": time.perf_counter() - _mb2_recv_t_start,
"t_start_unix": _mb2_recv_t_start_unix,
"tp_rank": getattr(self, "tp_rank", -1)}})
{END_MARK}
break"""
PATCHES = [
("header", HEADER_ANCHOR, HEADER_ANCHOR + HEADER_INSERT),
("send_blocks", SEND_TARGET, SEND_REPLACE),
("receive_kv (entry)", RECV_TARGET, RECV_REPLACE),
("receive_kv (done)", RECV_DONE_TARGET, RECV_DONE_REPLACE),
("header", HEADER_ANCHOR, HEADER_ANCHOR + HEADER_INSERT),
("_send_blocks", SEND_TARGET, SEND_REPLACE),
("receive_kv (entry)", RECV_ENTRY_TARGET, RECV_ENTRY_REPLACE),
("receive_kv (FINISH)", RECV_FINISH_TARGET, RECV_FINISH_REPLACE),
]
def find_target(path: Path) -> Path:
candidates = [path, DEFAULT_VENV / TARGET_REL]
def find_target(venv_or_path: Path) -> Path:
candidates = [venv_or_path, DEFAULT_VENV / TARGET_REL]
for c in candidates:
if c.is_file():
return c
@@ -152,7 +168,7 @@ def find_target(path: Path) -> Path:
sub = c / TARGET_REL
if sub.is_file():
return sub
raise FileNotFoundError(f"cannot find mooncake_connector_v1.py near {path}")
raise FileNotFoundError(f"cannot find vllm MooncakeConnector at {venv_or_path}")
def is_patched(text: str) -> bool:
@@ -173,7 +189,7 @@ def apply(target: Path) -> None:
)
new = new.replace(src, dst, 1)
target.write_text(new)
print(f"[mb2-instr] applied 4 patches -> {target}")
print(f"[mb2-instr] applied {len(PATCHES)} patches -> {target}")
def revert(target: Path) -> None:
@@ -181,14 +197,11 @@ def revert(target: Path) -> None:
if not is_patched(text):
print(f"[mb2-instr] not patched (nothing to revert): {target}")
return
# Remove everything between START_MARK and END_MARK, including the markers
# and the surrounding whitespace they introduced.
pat = re.compile(
r"[ \t]*" + re.escape(START_MARK) + r".*?" + re.escape(END_MARK) + r"\n?",
flags=re.DOTALL,
)
new = pat.sub("", text)
# Header insert leaves a leading blank line — collapse runs of 3+ newlines.
new = re.sub(r"\n{3,}", "\n\n", new)
target.write_text(new)
print(f"[mb2-instr] reverted: {target}")
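
The revert logic above can be exercised on a toy string. This is a minimal
sketch under stated assumptions: the marker constants and both regexes are
copied from the script, while the function name `strip_instrumentation` and
the sample text are hypothetical.

```python
import re

START_MARK = "# MB2_INSTRUMENT_START"
END_MARK = "# MB2_INSTRUMENT_END"


def strip_instrumentation(text: str) -> str:
    """Drop every START..END block, including the markers' indentation."""
    pat = re.compile(
        r"[ \t]*" + re.escape(START_MARK) + r".*?" + re.escape(END_MARK) + r"\n?",
        flags=re.DOTALL,
    )
    stripped = pat.sub("", text)
    # Collapse runs of 3+ newlines left behind by the header insert.
    return re.sub(r"\n{3,}", "\n\n", stripped)


patched = (
    "def f():\n"
    "    x = 1\n"
    "    # MB2_INSTRUMENT_START\n"
    "    log(x)\n"
    "    # MB2_INSTRUMENT_END\n"
    "    return x\n"
)
original = "def f():\n    x = 1\n    return x\n"
restored = strip_instrumentation(patched)
```

The final newline collapse applies to the whole file, so a target that
already contained three or more consecutive newlines would not round-trip
byte-for-byte.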

@@ -29,8 +29,10 @@ import httpx
MODEL_PATH = "/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct"
async def get_engine_id(client: httpx.AsyncClient, port: int) -> str:
r = await client.get(f"http://127.0.0.1:{port}/query")
async def get_engine_id(client: httpx.AsyncClient, bootstrap_port: int) -> str:
"""The /query endpoint lives on the mooncake bootstrap port, not the
vLLM HTTP serving port."""
r = await client.get(f"http://127.0.0.1:{bootstrap_port}/query")
r.raise_for_status()
data = r.json()
return data["0"]["engine_id"]
@@ -74,36 +76,39 @@ async def measure_one(
client: httpx.AsyncClient,
src_port: int, dst_port: int,
src_eid: str, dst_eid: str,
src_bootstrap_addr: str,
input_tokens: int,
rng_seed: int,
skip_verify: bool = False,
) -> dict:
"""Three-step measurement (step 3 is sanity, optional for strict PD-disagg
where the dst is a kv_consumer-only instance that cannot serve a plain
request)."""
request).
vLLM-shipped MooncakeConnector kv_transfer_params schema
(vllm/distributed/.../v1/mooncake/mooncake_connector.py:385):
do_remote_decode : {transfer_id}
do_remote_prefill : {transfer_id, remote_engine_id, remote_bootstrap_addr}
"""
prompt = synth_prompt(rng_seed, input_tokens)
session = uuid.uuid4().hex
transfer_id = uuid.uuid4().hex
t_step1_client = time.time()
t_prefill_s, prefill_resp = await completion(
client, src_port, prompt, max_tokens=1,
kv_transfer_params={
"do_remote_decode": True,
"remote_block_ids": None,
"remote_engine_id": src_eid,
"remote_host": "127.0.0.1",
"remote_port": src_port,
"transfer_id": transfer_id,
},
)
src_kvp = prefill_resp.get("kv_transfer_params") or {}
t_step2_client = time.time()
t_transfer_s, pull_resp = await completion(
client, dst_port, prompt, max_tokens=1,
kv_transfer_params={
"do_remote_prefill": True,
"remote_block_ids": src_kvp.get("remote_block_ids"),
"transfer_id": transfer_id,
"remote_engine_id": src_eid,
"remote_host": "127.0.0.1",
"remote_port": src_kvp.get("remote_port", src_port),
"remote_bootstrap_addr": src_bootstrap_addr,
},
)
t_step2_end_client = time.time()
@@ -144,18 +149,28 @@ async def main_async(args: argparse.Namespace) -> None:
sizes = [int(s) for s in sizes_str.split(",")]
repeats = args.repeats
src_port, dst_port = args.src_port, args.dst_port
src_bp, dst_bp = args.src_bp, args.dst_bp
limits = httpx.Limits(max_connections=10, max_keepalive_connections=10)
async with httpx.AsyncClient(limits=limits, trust_env=False) as client:
src_eid = await get_engine_id(client, src_port)
dst_eid = await get_engine_id(client, dst_port)
print(f"[mb2] src_eid={src_eid[:16]}... dst_eid={dst_eid[:16]}...")
src_eid = await get_engine_id(client, src_bp)
# In strict PD-disagg only the producer binds /query; consumers don't.
try:
dst_eid = await get_engine_id(client, dst_bp)
except Exception as exc:
print(f"[mb2] dst bootstrap ({dst_bp}) unreachable ({exc.__class__.__name__}); "
f"running in strict-PD mode (dst is kv_consumer, no bootstrap).")
dst_eid = None
print(f"[mb2] src_eid={src_eid[:16]}... dst_eid="
f"{(dst_eid[:16] + '...') if dst_eid else 'N/A (kv_consumer)'}")
src_bootstrap_addr = f"http://127.0.0.1:{src_bp}"
results = []
for sz in sizes:
for r in range(repeats):
row = await measure_one(
client, src_port, dst_port, src_eid, dst_eid,
src_bootstrap_addr=src_bootstrap_addr,
input_tokens=sz, rng_seed=sz * 1000 + r,
skip_verify=args.skip_verify,
)
@@ -206,8 +221,14 @@ async def main_async(args: argparse.Namespace) -> None:
def main() -> None:
p = argparse.ArgumentParser()
p.add_argument("--src-port", type=int, default=8000)
p.add_argument("--dst-port", type=int, default=8001)
p.add_argument("--src-port", type=int, default=8000,
help="vLLM HTTP port on the producer side (A)")
p.add_argument("--dst-port", type=int, default=8001,
help="vLLM HTTP port on the consumer side (B)")
p.add_argument("--src-bp", type=int, default=8998,
help="Mooncake bootstrap port on A (serves /query)")
p.add_argument("--dst-bp", type=int, default=8999,
help="Mooncake bootstrap port on B (serves /query)")
p.add_argument(
"--sizes",
default="512,1024,2048,4096,8192,16384,32768,65536",

@@ -29,8 +29,8 @@ BP_A=8998
BP_B=8999
MASTER_A=29500
MASTER_B=29501
ROLE_A="${ROLE_A:-kv_producer}" # or kv_both
ROLE_B="${ROLE_B:-kv_consumer}" # or kv_both
ROLE_A="${ROLE_A:-kv_both}" # kv_both (works) or kv_producer (hits vllm 0.18.1 bootstrap_server bug on D-side counterpart)
ROLE_B="${ROLE_B:-kv_both}" # kv_both / kv_consumer
MB2_LOG_ROOT="${FRESH_ROOT}/mb2_transfer_logs"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"