Files
agentic-kvc/microbench/fresh_setup/instrument_mooncake.py
Gahow Wang 1262c9c22e Migration transfer-cost study: KV transfer is slow on busy GPUs
MIGRATION_TRANSFER_COST.md: under real load, migration KV transfer runs at
~3 GB/s vs ~10 GB/s idle. Decomposed (instruments + MB6 microbench) into
~55% RDMA-actual (HBM/PCIe contention with running kernels: 7.6->4.0 GB/s)
+ ~45% control-plane GIL starvation during long prefills. Reproduced on a
fresh upstream venv (byte-identical transfer path), so the slowdown is
inherent to upstream/hardware, not caused by our patch. Layerwise is the
wrong lever; the tax is structural
on a loaded agentic cluster. Includes mb6_transfer_under_load + run_mb6,
instrument_dst_migration/mooncake, and the dst/transfer decomposition analyzers.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-29 11:53:01 +08:00

285 lines
10 KiB
Python

#!/usr/bin/env python3
"""Instrument vLLM's shipped MooncakeConnector for per-stage MB2 timing.

vLLM 0.18.1 ships its OWN MooncakeConnector at
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py,
which is the one --kv-transfer-config '{"kv_connector":"MooncakeConnector"}'
loads. The mooncake package's mooncake_connector_v1.py is a separate
implementation and is not used by vLLM 0.18.1.

A small logging helper (``_mb2_log_event``) is inserted right after the
connector's ``import time`` line, and structured JSONL events are emitted at
five sites. Every event additionally carries ``t_log_unix``.

_send_blocks (P-side, wraps batch_transfer_sync_write):
    {event=send_blocks, remote_session, total_bytes, duration_s,
     t_start_unix, ret, tp_rank}
send_kv_to_decode (P-side, entry when the producer receives a pull request):
    {event=send_kv_to_decode_enter, d_req_ids, t_start_unix, tp_rank}
wait_and_ret (P-side, wait on ``send_meta.ready`` before sending):
    {event=ready_wait, d_req_id, transfer_id, ready_already_set,
     ready_wait_s, t_start_unix, tp_rank}
receive_kv_from_single_worker (D-side, async ZMQ-driven pull):
    {event=receive_kv_enter, worker_addr, req_ids, t_start_unix, tp_rank}
    {event=receive_kv_finish, worker_addr, req_ids, duration_s,
     t_start_unix, tp_rank}

Logs land in ``$MB2_LOG_DIR/mb2_transfer_pid<pid>.jsonl`` (default
MB2_LOG_DIR=/tmp). One file per PID so multi-worker vllm doesn't fight.

Every injected region is bracketed by ``# MB2_INSTRUMENT_START`` /
``# MB2_INSTRUMENT_END`` so it can be detected and reverted.

Usage:
    python instrument_mooncake.py --apply [--venv PATH]
    python instrument_mooncake.py --revert [--venv PATH]
    python instrument_mooncake.py --check [--venv PATH]
"""
from __future__ import annotations
import argparse
import re
from pathlib import Path
# Default for --venv.
DEFAULT_VENV = Path("/home/admin/cpfs/wjh/agentic-kv-fresh/.venv")
# Connector module path relative to a venv root (python3.12 layout).
TARGET_REL = (
    "lib/python3.12/site-packages/vllm/"
    "distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py"
)
# Every injected region is bracketed by these comment lines so that it can be
# detected (is_patched) and stripped again (revert).
START_MARK = "# MB2_INSTRUMENT_START"
END_MARK = "# MB2_INSTRUMENT_END"
# The logging helper is inserted right after the connector's first
# ``import time`` line.
HEADER_ANCHOR = "import time\n"
# Module-level logging helper injected into the connector (patch "header").
#
# ``_mb2_log_event`` is deliberately best-effort. Several call sites
# (_send_blocks, receive_kv entry/FINISH) are not wrapped in try/except, so
# any logging failure (unwritable MB2_LOG_DIR, a non-JSON value) would
# otherwise propagate into the KV-transfer path. To prevent that, it:
#   * serialises with ``default=str`` so unusual values are logged, not raised;
#   * opens the log file lazily *under the lock*, so two threads cannot both
#     open it;
#   * names the file after the PID at write time and reopens it when the PID
#     changes, so a process forked after import gets its own file instead of
#     appending to the importing parent's file;
#   * swallows errors, but warns once on stderr so a dead log is noticed.
HEADER_INSERT = f"""
{START_MARK}
import json as _mb2_json
import os as _mb2_os
import sys as _mb2_sys
import threading as _mb2_threading
import time as _mb2_time
_MB2_LOG_DIR = _mb2_os.environ.get("MB2_LOG_DIR", "/tmp")
_MB2_LOG_FILE = None
_MB2_LOG_PID = None
_MB2_LOG_WARNED = False
_MB2_LOG_LOCK = _mb2_threading.Lock()
def _mb2_log_event(d):
    global _MB2_LOG_FILE, _MB2_LOG_PID, _MB2_LOG_WARNED
    try:
        d.setdefault("t_log_unix", _mb2_time.time())
        line = _mb2_json.dumps(d, default=str) + "\\n"
        with _MB2_LOG_LOCK:
            pid = _mb2_os.getpid()
            if _MB2_LOG_FILE is None or _MB2_LOG_PID != pid:
                _mb2_os.makedirs(_MB2_LOG_DIR, exist_ok=True)
                _MB2_LOG_FILE = open(
                    _mb2_os.path.join(
                        _MB2_LOG_DIR, "mb2_transfer_pid%d.jsonl" % pid
                    ),
                    "a",
                    buffering=1,
                )
                _MB2_LOG_PID = pid
            _MB2_LOG_FILE.write(line)
    except Exception as exc:
        if not _MB2_LOG_WARNED:
            _MB2_LOG_WARNED = True
            print(
                "[mb2-instr] event logging failed: %r" % (exc,),
                file=_mb2_sys.stderr,
            )
{END_MARK}
"""
# ---- Patch 2: _send_blocks ----
# P-side hook around the synchronous batch_transfer_sync_write call.
# SEND_TARGET is the connector's original code. It must appear verbatim in the
# installed file; apply() raises RuntimeError if it is not found. Matching is a
# plain substring search, so whitespace inside these literals is significant.
# SEND_REPLACE re-emits the same code with a marker-delimited prologue (byte
# count and wall-clock start) and a ``send_blocks`` event after the write.
# duration_s reuses the connector's own ``start_time`` perf_counter stamp.
SEND_TARGET = """ def _send_blocks(
self,
remote_session: str,
src_ptrs: list[int],
dst_ptrs: list[int],
lengths: list[int],
) -> int:
start_time = time.perf_counter()
ret_value = self.engine.batch_transfer_sync_write(
remote_session, src_ptrs, dst_ptrs, lengths
)"""
SEND_REPLACE = f""" def _send_blocks(
self,
remote_session: str,
src_ptrs: list[int],
dst_ptrs: list[int],
lengths: list[int],
) -> int:
{START_MARK}
_mb2_total_bytes = sum(lengths)
_mb2_t_start_unix = time.time()
{END_MARK}
start_time = time.perf_counter()
ret_value = self.engine.batch_transfer_sync_write(
remote_session, src_ptrs, dst_ptrs, lengths
)
{START_MARK}
_mb2_log_event({{"event": "send_blocks",
"remote_session": remote_session,
"total_bytes": _mb2_total_bytes,
"duration_s": time.perf_counter() - start_time,
"t_start_unix": _mb2_t_start_unix,
"ret": ret_value,
"tp_rank": getattr(self, "tp_rank", -1)}})
{END_MARK}"""
# ---- Patch 3: receive_kv_from_single_worker entry ----
# D-side hook at the start of the async pull. It logs ``receive_kv_enter``
# and stores the entry time in the locals ``_mb2_recv_t_start`` (perf_counter)
# and ``_mb2_recv_t_start_unix`` (wall clock). Patch 4 reads these locals
# inside the same function to compute ``duration_s``. The ``{{...}}`` in the
# f-string renders as a literal dict in the injected code.
RECV_ENTRY_TARGET = """ async def receive_kv_from_single_worker(
self,
worker_addr: str,
pull_metas: dict[ReqId, PullReqMeta],
):
req_ids = set(pull_metas)"""
RECV_ENTRY_REPLACE = f""" async def receive_kv_from_single_worker(
self,
worker_addr: str,
pull_metas: dict[ReqId, PullReqMeta],
):
req_ids = set(pull_metas)
{START_MARK}
_mb2_recv_t_start = time.perf_counter()
_mb2_recv_t_start_unix = time.time()
_mb2_log_event({{"event": "receive_kv_enter",
"worker_addr": worker_addr,
"req_ids": [str(r) for r in req_ids],
"t_start_unix": _mb2_recv_t_start_unix,
"tp_rank": getattr(self, "tp_rank", -1)}})
{END_MARK}"""
# ---- Patch 4: receive_kv_from_single_worker FINISH ----
# D-side hook on the FINISH response, placed just before the loop ``break``.
# It emits ``receive_kv_finish`` using the ``_mb2_recv_t_start`` /
# ``_mb2_recv_t_start_unix`` locals that Patch 3 sets at function entry.
# apply() patches only the FIRST occurrence of this anchor (replace(..., 1)).
# NOTE(review): if the anchor also occurs earlier in another function, that
# site would receive the hook and those locals would be undefined there.
# Confirm the anchor is unique in the connector file. Unlike Patches 5 and 6,
# this call is not wrapped in try/except.
RECV_FINISH_TARGET = """ if response.status == MooncakeXferResponseStatus.FINISH:
break"""
RECV_FINISH_REPLACE = f""" if response.status == MooncakeXferResponseStatus.FINISH:
{START_MARK}
_mb2_log_event({{"event": "receive_kv_finish",
"worker_addr": worker_addr,
"req_ids": [str(r) for r in req_ids],
"duration_s": time.perf_counter() - _mb2_recv_t_start,
"t_start_unix": _mb2_recv_t_start_unix,
"tp_rank": getattr(self, "tp_rank", -1)}})
{END_MARK}
break"""
# ---- Patch 5: send_kv_to_decode entry (P-side, producer receives pull req) ----
# Logs ``send_kv_to_decode_enter`` with the decode-side request ids taken from
# ``meta.req_blocks``. The event is wrapped in try/except, so a logging error
# cannot affect the send. In the f-string, ``{{}}`` renders as ``{}``, so the
# reproduced ``pending_reqs`` line matches SEND_ENTRY_TARGET's empty-dict
# initializer. Timestamps use the header's ``_mb2_time`` alias.
SEND_ENTRY_TARGET = """ async def send_kv_to_decode(
self, identity: bytes, sock: zmq.asyncio.Socket, meta: MooncakeXferMetadata
):
pending_reqs: dict[ReqId, SendBlockMeta] = {}"""
SEND_ENTRY_REPLACE = f""" async def send_kv_to_decode(
self, identity: bytes, sock: zmq.asyncio.Socket, meta: MooncakeXferMetadata
):
pending_reqs: dict[ReqId, SendBlockMeta] = {{}}
{START_MARK}
try:
_mb2_log_event({{"event": "send_kv_to_decode_enter",
"d_req_ids": [str(r) for r in meta.req_blocks],
"t_start_unix": _mb2_time.time(),
"tp_rank": getattr(self, "tp_rank", -1)}})
except Exception:
pass
{END_MARK}"""
# ---- Patch 6: wait_and_ret ready-wait timing (P-side, src KV commit wait) ----
# Measures how long wait_and_ret blocks on ``send_meta.ready.wait()``
# (``ready_wait_s``). It also records whether the event was already set on
# entry (``ready_already_set``), which distinguishes "no wait needed" from a
# near-zero wait. ``self`` is not a parameter of wait_and_ret.
# NOTE(review): presumably ``self`` resolves from an enclosing method's scope.
# The event is inside try/except either way, so a NameError would only drop
# the log line.
READY_WAIT_TARGET = """ async def wait_and_ret(
d_req_id: ReqId, send_meta: SendBlockMeta
) -> tuple[ReqId, SendBlockMeta]:
await send_meta.ready.wait()
return d_req_id, send_meta"""
READY_WAIT_REPLACE = f""" async def wait_and_ret(
d_req_id: ReqId, send_meta: SendBlockMeta
) -> tuple[ReqId, SendBlockMeta]:
{START_MARK}
_mb2_rw_start = _mb2_time.perf_counter()
_mb2_rw_start_unix = _mb2_time.time()
_mb2_rw_already = send_meta.ready.is_set()
{END_MARK}
await send_meta.ready.wait()
{START_MARK}
try:
_mb2_log_event({{"event": "ready_wait",
"d_req_id": str(d_req_id),
"transfer_id": str(getattr(send_meta, "transfer_id", "")),
"ready_already_set": bool(_mb2_rw_already),
"ready_wait_s": _mb2_time.perf_counter() - _mb2_rw_start,
"t_start_unix": _mb2_rw_start_unix,
"tp_rank": getattr(self, "tp_rank", -1)}})
except Exception:
pass
{END_MARK}
return d_req_id, send_meta"""
# Ordered (name, anchor, replacement) triples consumed by apply(). Each
# anchor is replaced at its first occurrence only; the name is used in error
# messages.
PATCHES = [
    (
        "header",
        HEADER_ANCHOR,
        # Keep the anchor line itself and append the logging helper after it.
        HEADER_ANCHOR + HEADER_INSERT,
    ),
    (
        "_send_blocks",
        SEND_TARGET,
        SEND_REPLACE,
    ),
    (
        "receive_kv (entry)",
        RECV_ENTRY_TARGET,
        RECV_ENTRY_REPLACE,
    ),
    (
        "receive_kv (FINISH)",
        RECV_FINISH_TARGET,
        RECV_FINISH_REPLACE,
    ),
    (
        "send_kv (entry)",
        SEND_ENTRY_TARGET,
        SEND_ENTRY_REPLACE,
    ),
    (
        "ready_wait",
        READY_WAIT_TARGET,
        READY_WAIT_REPLACE,
    ),
]
def find_target(venv_or_path: Path) -> Path:
    """Locate the vLLM MooncakeConnector source file to patch.

    Args:
        venv_or_path: either the connector ``.py`` file itself or a venv root.
            For a venv root, ``TARGET_REL`` (python3.12 layout) is tried first,
            then any ``lib/python*/site-packages`` layout, so venvs built with
            another interpreter version also work.

    Returns:
        Path to ``mooncake_connector.py``.

    Raises:
        FileNotFoundError: nothing (or more than one candidate) was found under
            ``venv_or_path``. Only the requested location is searched. Falling
            back to ``DEFAULT_VENV`` would silently patch a different venv than
            the one the caller asked for.
    """
    if venv_or_path.is_file():
        return venv_or_path
    if venv_or_path.is_dir():
        direct = venv_or_path / TARGET_REL
        if direct.is_file():
            return direct
        # Version-independent tail: "vllm/distributed/.../mooncake_connector.py".
        pkg_rel = TARGET_REL.partition("site-packages/")[2]
        found = sorted(
            p
            for p in venv_or_path.glob(f"lib/python*/site-packages/{pkg_rel}")
            if p.is_file()
        )
        if len(found) == 1:
            return found[0]
        if found:
            listing = ", ".join(str(p) for p in found)
            raise FileNotFoundError(
                f"multiple vllm MooncakeConnector candidates under "
                f"{venv_or_path}: {listing}; pass the file path via --venv"
            )
    raise FileNotFoundError(f"cannot find vllm MooncakeConnector at {venv_or_path}")
def is_patched(text: str) -> bool:
    """Report whether *text* already contains an instrumentation start marker."""
    return text.find(START_MARK) != -1
def apply(target: Path) -> None:
    """Insert every patch from ``PATCHES`` into *target* with a single write.

    All anchors are checked before anything is written, so a refactored
    upstream file fails cleanly and is left untouched. An already-patched
    file (start marker present) is left alone.

    The file is read and written as UTF-8 (the encoding of Python source),
    independent of the process locale.

    Raises:
        RuntimeError: if any patch anchor is missing from *target*.
    """
    text = target.read_text(encoding="utf-8")
    if is_patched(text):
        print(f"[mb2-instr] already patched: {target}")
        return
    new = text
    for name, src, dst in PATCHES:
        hits = new.count(src)
        if hits == 0:
            raise RuntimeError(
                f"patch {name!r}: anchor not found in {target}. "
                "File may have been refactored."
            )
        if hits > 1:
            # Only the first occurrence is patched. Surface the ambiguity,
            # because the intended site might be a later match.
            print(
                f"[mb2-instr] warning: patch {name!r} anchor matches {hits} "
                "times; patching the first occurrence"
            )
        new = new.replace(src, dst, 1)
    target.write_text(new, encoding="utf-8")
    print(f"[mb2-instr] applied {len(PATCHES)} patches -> {target}")
def _undo_patches(text: str) -> str | None:
    """Invert ``PATCHES`` exactly; return None if that is not possible."""
    # Undo in reverse application order. Each replacement text is swapped back
    # for its anchor, which restores the pre-patch bytes.
    for _name, src, dst in reversed(PATCHES):
        if dst not in text:
            return None
        text = text.replace(dst, src, 1)
    return None if is_patched(text) else text


def revert(target: Path) -> None:
    """Remove the instrumentation from *target*.

    Normal path: undo ``PATCHES`` in reverse order. This restores the file
    byte-for-byte when it was patched with these same ``PATCHES``.

    Fallback (e.g. the file was patched by a different version of this tool):
    delete every START/END marker region. Only those regions are touched;
    blank-line layout elsewhere in the file is preserved. This is why there is
    no file-wide newline collapsing, which would reformat the whole module.
    """
    text = target.read_text(encoding="utf-8")
    if not is_patched(text):
        print(f"[mb2-instr] not patched (nothing to revert): {target}")
        return
    new = _undo_patches(text)
    if new is None:
        marker_region = re.compile(
            r"[ \t]*" + re.escape(START_MARK) + r".*?" + re.escape(END_MARK) + r"\n?",
            flags=re.DOTALL,
        )
        new = marker_region.sub("", text)
    target.write_text(new, encoding="utf-8")
    print(f"[mb2-instr] reverted: {target}")
def main() -> None:
    """Parse the CLI and run exactly one of --apply / --revert / --check."""
    p = argparse.ArgumentParser(
        description="Apply, revert, or check MB2 timing instrumentation in "
        "vLLM's MooncakeConnector."
    )
    # The actions are mutually exclusive, so e.g. ``--apply --revert`` is
    # rejected instead of silently running only one of them.
    mode = p.add_mutually_exclusive_group()
    mode.add_argument("--apply", action="store_true", help="insert the patches")
    mode.add_argument("--revert", action="store_true", help="remove the patches")
    mode.add_argument(
        "--check", action="store_true", help="report PATCHED or CLEAN"
    )
    p.add_argument(
        "--venv",
        type=Path,
        default=DEFAULT_VENV,
        help="venv root, or the connector .py file itself",
    )
    args = p.parse_args()
    # Validate the action before touching the filesystem, so a bare invocation
    # prints usage rather than a FileNotFoundError from find_target().
    if not (args.apply or args.revert or args.check):
        p.error("specify --apply / --revert / --check")
    target = find_target(args.venv)
    if args.apply:
        apply(target)
    elif args.revert:
        revert(target)
    else:
        text = target.read_text(encoding="utf-8")
        print(f"[mb2-instr] {'PATCHED' if is_patched(text) else 'CLEAN'}: {target}")


if __name__ == "__main__":
    main()