MIGRATION_TRANSFER_COST.md: under real load, migration KV transfer runs at ~3 GB/s vs ~10 GB/s idle. Decomposed (instruments + MB6 microbench) into ~55% RDMA-actual (HBM/PCIe contention with running kernels: 7.6->4.0 GB/s) + ~45% control-plane GIL starvation during long prefills. Reproduced on a fresh upstream venv (byte-identical transfer path) -> upstream/hardware inherent, not our patch. Layerwise is the wrong lever; the tax is structural on a loaded agentic cluster. Includes mb6_transfer_under_load + run_mb6, instrument_dst_migration/mooncake, and the dst/transfer decomposition analyzers. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
285 lines
10 KiB
Python
285 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""Instrument vLLM's shipped MooncakeConnector for per-stage MB2 timing.
|
|
|
|
vLLM 0.18.1 ships its OWN MooncakeConnector at
|
|
vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py
|
|
which is the one --kv-transfer-config '{"kv_connector":"MooncakeConnector"}'
|
|
loads. The mooncake-package's mooncake_connector_v1.py is a separate
|
|
implementation and not used by vLLM 0.18.1.
|
|
|
|
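We add structured JSONL events at four sites. The P-side also logs
send_kv_to_decode entry (event=send_kv_to_decode_enter) and the wait_and_ret
ready-wait (event=ready_wait); the two main sites are: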
|
|
|
|
_send_blocks (P-side, the actual batch_transfer_sync_write):
|
|
{event=send_blocks, remote_session, total_bytes, duration_s,
|
|
t_start_unix, ret, tp_rank, t_log_unix}
|
|
|
|
receive_kv_from_single_worker (D-side, async ZMQ-driven pull):
|
|
{event=receive_kv_enter / receive_kv_finish, worker_addr,
|
|
req_ids, duration_s, t_start_unix, tp_rank, t_log_unix}
|
|
|
|
Logs land in ``$MB2_LOG_DIR/mb2_transfer_pid<pid>.jsonl`` (default
|
|
MB2_LOG_DIR=/tmp). One file per PID so multi-worker vllm doesn't fight.
|
|
|
|
Apply/revert markers: ``# MB2_INSTRUMENT_START/END``.
|
|
|
|
Usage:
|
|
python instrument_mooncake.py --apply [--venv PATH]
|
|
python instrument_mooncake.py --revert [--venv PATH]
|
|
python instrument_mooncake.py --check [--venv PATH]
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import re
|
|
from pathlib import Path
|
|
|
|
# Virtualenv that is patched when --venv is not given on the command line.
DEFAULT_VENV = Path("/home/admin/cpfs/wjh/agentic-kv-fresh/.venv")

# Location of the connector module relative to a virtualenv root.
TARGET_REL = (
    "lib/python3.12/site-packages/"
    "vllm/distributed/kv_transfer/kv_connector/v1/mooncake/"
    "mooncake_connector.py"
)
|
|
|
|
# Every inserted region is fenced by these two comment lines; is_patched()
# looks for START_MARK and revert() uses both to locate instrumentation.
START_MARK = "# MB2_INSTRUMENT_START"
END_MARK = "# MB2_INSTRUMENT_END"

# The logging prelude below is inserted directly after the first occurrence
# of this line in the connector module.
HEADER_ANCHOR = "import time\n"

# Module-level prelude injected into the connector. It defines
# ``_mb2_log_event(d)``, which appends ``d`` as one JSON line to
# ``$MB2_LOG_DIR/mb2_transfer_pid<pid>.jsonl`` (default directory /tmp).
#
# Design notes for the generated code:
#   * The log file is opened lazily *inside* the lock, so concurrent first
#     calls cannot both open it.
#   * The pid is read at write time and the file is re-opened when it
#     changes, so a process forked after import still gets its own file.
#   * The whole helper is best-effort: an unwritable directory or an
#     unserialisable payload drops the event instead of raising into the
#     KV-transfer path that calls it.
# Doubled braces and the doubled backslash are f-string escapes; they become
# single braces / ``\n`` in the emitted source.
HEADER_INSERT = f"""

{START_MARK}
import json as _mb2_json
import os as _mb2_os
import threading as _mb2_threading
import time as _mb2_time
_MB2_LOG_DIR = _mb2_os.environ.get("MB2_LOG_DIR", "/tmp")
_MB2_LOG_PATH = None
_MB2_LOG_FILE = None
_MB2_LOG_PID = None
_MB2_LOG_LOCK = _mb2_threading.Lock()
def _mb2_log_event(d):
    global _MB2_LOG_FILE, _MB2_LOG_PATH, _MB2_LOG_PID
    try:
        d.setdefault("t_log_unix", _mb2_time.time())
        # Serialise before touching the file so a bad payload never leaves
        # a partial line behind.
        line = _mb2_json.dumps(d) + "\\n"
        with _MB2_LOG_LOCK:
            pid = _mb2_os.getpid()
            if _MB2_LOG_FILE is None or _MB2_LOG_PID != pid:
                _mb2_os.makedirs(_MB2_LOG_DIR, exist_ok=True)
                _MB2_LOG_PATH = _mb2_os.path.join(
                    _MB2_LOG_DIR, f"mb2_transfer_pid{{pid}}.jsonl"
                )
                _MB2_LOG_FILE = open(_MB2_LOG_PATH, "a", buffering=1)
                _MB2_LOG_PID = pid
            _MB2_LOG_FILE.write(line)
    except Exception:
        # Instrumentation must never break the instrumented code path.
        pass
{END_MARK}
"""
|
|
|
|
# ---- Patch 2: _send_blocks ----
|
|
|
|
SEND_TARGET = """ def _send_blocks(
|
|
self,
|
|
remote_session: str,
|
|
src_ptrs: list[int],
|
|
dst_ptrs: list[int],
|
|
lengths: list[int],
|
|
) -> int:
|
|
start_time = time.perf_counter()
|
|
ret_value = self.engine.batch_transfer_sync_write(
|
|
remote_session, src_ptrs, dst_ptrs, lengths
|
|
)"""
|
|
|
|
SEND_REPLACE = f""" def _send_blocks(
|
|
self,
|
|
remote_session: str,
|
|
src_ptrs: list[int],
|
|
dst_ptrs: list[int],
|
|
lengths: list[int],
|
|
) -> int:
|
|
{START_MARK}
|
|
_mb2_total_bytes = sum(lengths)
|
|
_mb2_t_start_unix = time.time()
|
|
{END_MARK}
|
|
start_time = time.perf_counter()
|
|
ret_value = self.engine.batch_transfer_sync_write(
|
|
remote_session, src_ptrs, dst_ptrs, lengths
|
|
)
|
|
{START_MARK}
|
|
_mb2_log_event({{"event": "send_blocks",
|
|
"remote_session": remote_session,
|
|
"total_bytes": _mb2_total_bytes,
|
|
"duration_s": time.perf_counter() - start_time,
|
|
"t_start_unix": _mb2_t_start_unix,
|
|
"ret": ret_value,
|
|
"tp_rank": getattr(self, "tp_rank", -1)}})
|
|
{END_MARK}"""
|
|
|
|
# ---- Patch 3: receive_kv_from_single_worker entry ----
|
|
|
|
RECV_ENTRY_TARGET = """ async def receive_kv_from_single_worker(
|
|
self,
|
|
worker_addr: str,
|
|
pull_metas: dict[ReqId, PullReqMeta],
|
|
):
|
|
req_ids = set(pull_metas)"""
|
|
|
|
RECV_ENTRY_REPLACE = f""" async def receive_kv_from_single_worker(
|
|
self,
|
|
worker_addr: str,
|
|
pull_metas: dict[ReqId, PullReqMeta],
|
|
):
|
|
req_ids = set(pull_metas)
|
|
{START_MARK}
|
|
_mb2_recv_t_start = time.perf_counter()
|
|
_mb2_recv_t_start_unix = time.time()
|
|
_mb2_log_event({{"event": "receive_kv_enter",
|
|
"worker_addr": worker_addr,
|
|
"req_ids": [str(r) for r in req_ids],
|
|
"t_start_unix": _mb2_recv_t_start_unix,
|
|
"tp_rank": getattr(self, "tp_rank", -1)}})
|
|
{END_MARK}"""
|
|
|
|
# ---- Patch 4: receive_kv_from_single_worker FINISH ----
|
|
|
|
RECV_FINISH_TARGET = """ if response.status == MooncakeXferResponseStatus.FINISH:
|
|
break"""
|
|
|
|
RECV_FINISH_REPLACE = f""" if response.status == MooncakeXferResponseStatus.FINISH:
|
|
{START_MARK}
|
|
_mb2_log_event({{"event": "receive_kv_finish",
|
|
"worker_addr": worker_addr,
|
|
"req_ids": [str(r) for r in req_ids],
|
|
"duration_s": time.perf_counter() - _mb2_recv_t_start,
|
|
"t_start_unix": _mb2_recv_t_start_unix,
|
|
"tp_rank": getattr(self, "tp_rank", -1)}})
|
|
{END_MARK}
|
|
break"""
|
|
|
|
# ---- Patch 5: send_kv_to_decode entry (P-side, producer receives pull req) ----
|
|
|
|
SEND_ENTRY_TARGET = """ async def send_kv_to_decode(
|
|
self, identity: bytes, sock: zmq.asyncio.Socket, meta: MooncakeXferMetadata
|
|
):
|
|
pending_reqs: dict[ReqId, SendBlockMeta] = {}"""
|
|
|
|
SEND_ENTRY_REPLACE = f""" async def send_kv_to_decode(
|
|
self, identity: bytes, sock: zmq.asyncio.Socket, meta: MooncakeXferMetadata
|
|
):
|
|
pending_reqs: dict[ReqId, SendBlockMeta] = {{}}
|
|
{START_MARK}
|
|
try:
|
|
_mb2_log_event({{"event": "send_kv_to_decode_enter",
|
|
"d_req_ids": [str(r) for r in meta.req_blocks],
|
|
"t_start_unix": _mb2_time.time(),
|
|
"tp_rank": getattr(self, "tp_rank", -1)}})
|
|
except Exception:
|
|
pass
|
|
{END_MARK}"""
|
|
|
|
# ---- Patch 6: wait_and_ret ready-wait timing (P-side, src KV commit wait) ----
|
|
|
|
READY_WAIT_TARGET = """ async def wait_and_ret(
|
|
d_req_id: ReqId, send_meta: SendBlockMeta
|
|
) -> tuple[ReqId, SendBlockMeta]:
|
|
await send_meta.ready.wait()
|
|
return d_req_id, send_meta"""
|
|
|
|
READY_WAIT_REPLACE = f""" async def wait_and_ret(
|
|
d_req_id: ReqId, send_meta: SendBlockMeta
|
|
) -> tuple[ReqId, SendBlockMeta]:
|
|
{START_MARK}
|
|
_mb2_rw_start = _mb2_time.perf_counter()
|
|
_mb2_rw_start_unix = _mb2_time.time()
|
|
_mb2_rw_already = send_meta.ready.is_set()
|
|
{END_MARK}
|
|
await send_meta.ready.wait()
|
|
{START_MARK}
|
|
try:
|
|
_mb2_log_event({{"event": "ready_wait",
|
|
"d_req_id": str(d_req_id),
|
|
"transfer_id": str(getattr(send_meta, "transfer_id", "")),
|
|
"ready_already_set": bool(_mb2_rw_already),
|
|
"ready_wait_s": _mb2_time.perf_counter() - _mb2_rw_start,
|
|
"t_start_unix": _mb2_rw_start_unix,
|
|
"tp_rank": getattr(self, "tp_rank", -1)}})
|
|
except Exception:
|
|
pass
|
|
{END_MARK}
|
|
return d_req_id, send_meta"""
|
|
|
|
# Ordered (label, anchor text, replacement text) triples. apply() walks the
# list in order and substitutes the first occurrence of each anchor.
PATCHES = [
    ("header", HEADER_ANCHOR, f"{HEADER_ANCHOR}{HEADER_INSERT}"),
    ("_send_blocks", SEND_TARGET, SEND_REPLACE),
    ("receive_kv (entry)", RECV_ENTRY_TARGET, RECV_ENTRY_REPLACE),
    ("receive_kv (FINISH)", RECV_FINISH_TARGET, RECV_FINISH_REPLACE),
    ("send_kv (entry)", SEND_ENTRY_TARGET, SEND_ENTRY_REPLACE),
    ("ready_wait", READY_WAIT_TARGET, READY_WAIT_REPLACE),
]
|
|
|
|
|
|
def find_target(venv_or_path: Path) -> Path:
    """Locate the connector source file to operate on.

    ``venv_or_path`` may be the connector file itself or a virtualenv root
    (in which case ``TARGET_REL`` is appended). If it resolves to nothing,
    the connector inside ``DEFAULT_VENV`` is tried as a fallback.

    Raises:
        FileNotFoundError: when neither location yields an existing file.
    """
    fallback = DEFAULT_VENV / TARGET_REL
    for candidate in (venv_or_path, fallback):
        # A directory is treated as a venv root; anything else as the file.
        resolved = candidate / TARGET_REL if candidate.is_dir() else candidate
        if resolved.is_file():
            return resolved
    raise FileNotFoundError(f"cannot find vllm MooncakeConnector at {venv_or_path}")
|
|
|
|
|
|
def is_patched(text: str) -> bool:
    """Report whether ``text`` already contains MB2 instrumentation.

    The check is a plain substring search for ``START_MARK``.
    """
    has_marker = START_MARK in text
    return has_marker
|
|
|
|
|
|
def apply(target: Path) -> None:
    """Insert every entry of ``PATCHES`` into ``target`` in place.

    Does nothing (beyond printing a notice) when the file already carries
    the start marker. The file is only written after *all* anchors were
    found, so a failed run leaves ``target`` untouched.

    Raises:
        RuntimeError: if a patch anchor is missing from the file.
    """
    # Python source is UTF-8 by default; do not depend on the locale encoding.
    source = target.read_text(encoding="utf-8")
    if is_patched(source):
        print(f"[mb2-instr] already patched: {target}")
        return

    patched = source
    for name, src, dst in PATCHES:
        if src not in patched:
            raise RuntimeError(
                f"patch {name!r}: anchor not found in {target}. "
                "File may have been refactored."
            )
        # Only the first occurrence of each anchor is instrumented.
        patched = patched.replace(src, dst, 1)

    target.write_text(patched, encoding="utf-8")
    print(f"[mb2-instr] applied {len(PATCHES)} patches -> {target}")
|
|
|
|
|
|
def revert(target: Path) -> None:
    """Remove MB2 instrumentation from ``target`` in place.

    Each entry of ``PATCHES`` is undone by the exact inverse substitution
    (replacement text -> anchor text), which restores the pre-patch bytes.
    No whitespace outside the instrumented regions is touched; in
    particular, runs of blank lines elsewhere in the file are left alone.

    If markers remain afterwards (the file was instrumented with templates
    that differ from the current ones), the marker-delimited regions are
    stripped as a fallback and a warning is printed, since blank lines
    around such regions may then differ from the original file.
    """
    # Python source is UTF-8 by default; do not depend on the locale encoding.
    text = target.read_text(encoding="utf-8")
    if not is_patched(text):
        print(f"[mb2-instr] not patched (nothing to revert): {target}")
        return

    # Undo in reverse application order so each step sees the text exactly
    # as apply() left it after the corresponding forward step.
    restored = text
    for _name, src, dst in reversed(PATCHES):
        restored = restored.replace(dst, src, 1)

    if is_patched(restored):
        marked_region = re.compile(
            r"[ \t]*" + re.escape(START_MARK) + r".*?" + re.escape(END_MARK) + r"\n?",
            flags=re.DOTALL,
        )
        restored = marked_region.sub("", restored)
        print(
            "[mb2-instr] warning: some instrumentation did not match the "
            "current templates and was stripped by marker; blank lines may "
            f"differ from the original: {target}"
        )

    target.write_text(restored, encoding="utf-8")
    print(f"[mb2-instr] reverted: {target}")
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: apply, revert, or check the instrumentation.

    Exactly one of ``--apply``, ``--revert`` or ``--check`` must be given;
    argparse rejects a missing or conflicting action before the target file
    is looked up. ``--venv`` accepts a virtualenv root or the connector file
    itself and defaults to ``DEFAULT_VENV``.
    """
    parser = argparse.ArgumentParser()
    action = parser.add_mutually_exclusive_group(required=True)
    action.add_argument(
        "--apply", action="store_true", help="insert the instrumentation"
    )
    action.add_argument(
        "--revert", action="store_true", help="remove the instrumentation"
    )
    action.add_argument(
        "--check", action="store_true", help="report PATCHED or CLEAN"
    )
    parser.add_argument(
        "--venv",
        type=Path,
        default=DEFAULT_VENV,
        help="virtualenv root or path to mooncake_connector.py",
    )
    args = parser.parse_args()

    target = find_target(args.venv)
    if args.apply:
        apply(target)
    elif args.revert:
        revert(target)
    else:
        # The group is required, so the only remaining action is --check.
        text = target.read_text(encoding="utf-8")
        print(f"[mb2-instr] {'PATCHED' if is_patched(text) else 'CLEAN'}: {target}")


if __name__ == "__main__":
    main()
|