Files
agentic-kvc/microbench/patches/apply_patches.py
Gahow Wang 72790ae6c1 PD-sep server-side profiling: vLLM patches + per-request breakdown
Instrumentation patches (microbench/patches/):
  - pd_profile.py: shared event emitter (VLLM_PD_PROFILE_LOG env var)
  - apply_patches.py: idempotent patch installer for mooncake_connector.py
    and scheduler.py, marks insertions with # PD_PROFILE_PATCH
  - analyze_events.py: joins per-process JSONL event logs by transfer_id
    into per-request phase durations

Seven events captured per request:
  D_get_num_matched → P_zmq_received → P_prefill_done →
  P_rdma_start → P_rdma_end → D_recv_complete → D_request_promoted

Driver fix (microbench/lifecycle/driver.py):
  seed_prefix_cache now sends via the proxy URL so P and D both cache
  the seeded prefix with matching block hashes. Previously seeding D
  directly produced different block hashes than the proxy-routed
  measurement requests, making incremental transfer impossible.

Real breakdown (fig_breakdown_real.png, server_breakdown.csv, n=93):
  prefill_compute  620 ms median (95% of overhead)
  rdma_transfer     42 ms median (~71 Gbps effective)
  other overhead    10 ms median (dispatch + params + signal + promote)

Mooncake transfer is NOT the bottleneck. Even with bulk RDMA the
transfer cost is <10% of prefill cost for Qwen3-30B-A3B on H20.
2026-05-26 13:59:09 +08:00

286 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Apply (or revert) PD-sep profiling instrumentation to vLLM.
Inserts time.perf_counter_ns() emit calls into mooncake_connector.py at:
- get_num_new_matched_tokens() [D: cache hit, delta]
- update_state_after_alloc() [D: blocks allocated]
- send_kv_to_decode() [P: zmq from D arrived]
- record_send_reqs() [P: prefill ready event set]
- _send_blocks() [P: RDMA start/end + bytes]
- process_pulling_result() [D: RDMA recv complete]
- request_finished() [common: request lifecycle end]
And into scheduler.py at:
- _update_waiting_for_remote_kv() [D: request promoted]
Marker comment "# PD_PROFILE_PATCH" is added so revert can locate inserts.
Usage:
python apply_patches.py [--apply | --revert] [--vllm-root PATH]
The patches are idempotent: --apply on already-patched code is a no-op.
"""
import argparse
import re
import sys
from pathlib import Path
# Comment tag placed on inserted code; reverting removes every line that contains it.
MARKER = "# PD_PROFILE_PATCH"
# Default value of --vllm-root: venv vLLM 0.18.1 on dash0.
# NOTE(review): the directory here is "agentic-kv"; confirm it matches the checkout name on the target host.
DEFAULT_VLLM_ROOT = Path.home() / "agentic-kv/.venv/lib/python3.12/site-packages/vllm"
def _find_block(text: str, signature: str) -> tuple[int, int] | None:
"""Find the start/end line numbers of a function or method definition."""
lines = text.splitlines()
for i, line in enumerate(lines):
if signature in line:
# find indent
stripped = line.lstrip()
indent = len(line) - len(stripped)
# find function end: next line with <= indent and not blank
for j in range(i + 1, len(lines)):
next_line = lines[j]
if next_line.strip() == "":
continue
next_indent = len(next_line) - len(next_line.lstrip())
if next_indent <= indent and not next_line.lstrip().startswith("#"):
return i, j
return i, len(lines)
return None
def _insert_after_line(text: str, line_no: int, snippet: str) -> str:
"""Insert snippet after line_no (1-indexed). snippet should not include trailing newline."""
lines = text.splitlines()
lines.insert(line_no, snippet)
return "\n".join(lines) + ("\n" if text.endswith("\n") else "")
def _already_patched(text: str) -> bool:
    """Report whether ``text`` contains the profiling marker comment anywhere."""
    return text.find(MARKER) >= 0
def _revert(text: str) -> str:
    """Return ``text`` without any line that contains the marker comment.

    Only lines that themselves contain the marker are dropped. The result
    ends with a newline only when ``text`` did.
    """
    kept = filter(lambda row: MARKER not in row, text.splitlines())
    body = "\n".join(kept)
    if text.endswith("\n"):
        return body + "\n"
    return body
# ─── Patch definitions ──────────────────────────────────────────────────────
def patch_mooncake_connector(text: str) -> str:
    """Return the source of mooncake_connector.py with profiling hooks inserted.

    Adds an import of the ``_pd_profile`` module (bound as ``_pdp``) after the
    last top-level import found in the first 80 lines, then inserts
    ``_pdp.emit(...)`` lines at the patch sites below. Every inserted line
    ends with ``MARKER`` so it can be located and removed again.

    Patch sites are matched regardless of how deeply they are indented, and
    each inserted line reuses the indentation of the statement it sits next
    to. A site whose pattern is not found is reported with a warning and left
    untouched; the remaining sites are still patched.

    Args:
        text: Current contents of mooncake_connector.py.

    Returns:
        The patched source, or ``text`` unchanged if it already contains
        ``MARKER``.
    """
    if _already_patched(text):
        print(" mooncake_connector.py already patched, skipping")
        return text

    def code_part(line: str) -> str:
        """Return the part of a line before any '#' comment."""
        return line.split("#", 1)[0]

    def substitute(source: str, site: str, pattern: str, build) -> str:
        """Apply one single-shot substitution; warn when the site is absent."""
        patched, hits = re.subn(
            pattern, build, source, count=1, flags=re.MULTILINE
        )
        if hits == 0:
            print(f" WARNING: patch site not found, skipped: {site}")
        return patched

    # 1. Add the profiling import after the last top-level import statement
    #    that starts within the first 80 lines.
    import_snippet = (
        "from vllm.distributed.kv_transfer.kv_connector.v1.mooncake "
        "import _pd_profile as _pdp " + MARKER
    )
    lines = text.splitlines()
    insert_at = 0
    for idx, line in enumerate(lines[:80]):
        if line.startswith(("import ", "from ")):
            insert_at = idx
    if lines:
        # A parenthesised or backslash-continued import spans several lines;
        # step past its continuation lines so the new import lands after the
        # whole statement rather than inside it.
        depth = code_part(lines[insert_at]).count("(") - code_part(
            lines[insert_at]
        ).count(")")
        while insert_at + 1 < len(lines) and (
            depth > 0 or lines[insert_at].rstrip().endswith("\\")
        ):
            insert_at += 1
            current = code_part(lines[insert_at])
            depth += current.count("(") - current.count(")")
    lines.insert(insert_at + 1, import_snippet)
    text = "\n".join(lines) + "\n"

    # Leading whitespace of a source line (any depth).
    indent = r"[ \t]*"
    # Zero-width probe capturing the indentation of the next non-blank line.
    body_indent = r"(?=(?:[ \t]*\n)*([ \t]+)\S)"

    # 2. get_num_new_matched_tokens (D side, scheduler): emit inside the
    #    "if count > 0:" branch, right before "return count, True".
    emit_matched = (
        "_pdp.emit('D_get_num_matched', "
        "req_id=request.request_id, role='kv_consumer', "
        "num_local_cached=num_computed_tokens, prompt_tokens=len(token_ids), "
        "remote_total=remote_total, delta_to_pull=count) " + MARKER
    )
    text = substitute(
        text,
        "get_num_new_matched_tokens",
        rf"(^{indent}if count > 0:\n)({indent})(return count, True\n)",
        lambda m: f"{m[1]}{m[2]}{emit_matched}\n{m[2]}{m[3]}",
    )

    # 3. update_state_after_alloc (D side): emit right before the
    #    "self._reqs_need_recv[request_id] = PullReqMeta(" assignment.
    emit_alloc = (
        "_pdp.emit('D_alloc_blocks', req_id=request_id, "
        "role='kv_consumer', num_local_blocks=len(local_block_ids), "
        "num_external_tokens=num_external_tokens) " + MARKER
    )
    text = substitute(
        text,
        "update_state_after_alloc",
        rf"^({indent})self\._reqs_need_recv\[request_id\] = PullReqMeta\(",
        lambda m: f"{m[1]}{emit_alloc}\n{m[0]}",
    )

    # 4. send_kv_to_decode entry (P side): ZMQ message received. The emit
    #    becomes the first statement of the function body.
    emit_zmq = (
        "_pdp.emit('P_zmq_received', role='kv_producer', "
        "num_reqs=len(meta.req_blocks), remote_host=meta.remote_hostname, "
        "transfer_ids=[tid for tid, _ in meta.req_blocks.values()]) " + MARKER
    )
    text = substitute(
        text,
        "send_kv_to_decode",
        rf"(^{indent}async def send_kv_to_decode\(\n"
        rf"{indent}self, identity: bytes, sock: zmq\.asyncio\.Socket, meta: MooncakeXferMetadata\n"
        rf"{indent}\):\n)" + body_indent,
        lambda m: f"{m[1]}{m[2]}{emit_zmq}\n",
    )

    # 5. _send_blocks (P side): bracket the
    #    self.engine.batch_transfer_sync_write(...) call with start/end emits.
    emit_rdma_start = (
        "_pdp.emit('P_rdma_start', role='kv_producer', "
        "num_ops=len(src_ptrs), bytes_total=sum(lengths), "
        "remote=str(remote_session)) " + MARKER
    )
    emit_rdma_end = (
        "_pdp.emit('P_rdma_end', role='kv_producer', "
        "num_ops=len(src_ptrs), bytes_total=sum(lengths), ret=ret_value) "
        + MARKER
    )
    text = substitute(
        text,
        "_send_blocks",
        rf"^({indent})start_time = time\.perf_counter\(\)\n"
        rf"{indent}ret_value = self\.engine\.batch_transfer_sync_write\(\n"
        rf"{indent}remote_session, src_ptrs, dst_ptrs, lengths\n"
        rf"{indent}\)",
        lambda m: f"{m[1]}{emit_rdma_start}\n{m[0]}\n{m[1]}{emit_rdma_end}",
    )

    # 6. process_pulling_result (D side): RDMA receive complete (success
    #    path), inside "if pull_meta.pull_tasks_count == 0:".
    emit_recv = (
        "_pdp.emit('D_recv_complete', req_id=pull_meta.d_req_id, "
        "transfer_id=pull_meta.transfer_id, role='kv_consumer') " + MARKER
    )
    text = substitute(
        text,
        "process_pulling_result",
        rf"(pull_meta\.pull_tasks_count -= 1\n"
        rf"{indent}if pull_meta\.pull_tasks_count == 0:\n)"
        rf"(({indent})self\.finished_recving_reqs\.add\(pull_meta\.d_req_id\))",
        lambda m: f"{m[1]}{m[3]}{emit_recv}\n{m[2]}",
    )

    # 7. request_finished (P side): prefill blocks are marked ready to send,
    #    right before
    #    "self._reqs_need_send[request.request_id] = (request, send_block_ids)".
    emit_prefill = (
        "_pdp.emit('P_prefill_done', req_id=request.request_id, "
        "transfer_id=params.get('transfer_id', ''), role='kv_producer', "
        "num_send_blocks=len(send_block_ids), "
        "num_prompt_tokens=request.num_prompt_tokens) " + MARKER
    )
    text = substitute(
        text,
        "request_finished",
        rf"(^{indent}if delay_free_blocks:\n)"
        rf"(({indent})self\._reqs_need_send\[request\.request_id\] = \(request, send_block_ids\))",
        lambda m: f"{m[1]}{m[3]}{emit_prefill}\n{m[2]}",
    )
    return text
def patch_scheduler(text: str) -> str:
    """Return the source of v1/core/sched/scheduler.py with the D-side hook.

    Adds a guarded import of ``_pd_profile`` (bound as ``_pdp``, with a no-op
    fallback class when the import fails) after the last top-level import
    found in the first 100 lines, then inserts a ``D_request_promoted`` emit
    as the first statement of ``_update_waiting_for_remote_kv``.

    Every inserted line, including each line of the import block, ends with
    ``MARKER``, so removing all marker lines restores the original file. The
    patch site is matched at any indentation; if it is not found a warning is
    printed and only the import block is added.

    Args:
        text: Current contents of scheduler.py.

    Returns:
        The patched source, or ``text`` unchanged if it already contains
        ``MARKER``.
    """
    if _already_patched(text):
        print(" scheduler.py already patched, skipping")
        return text

    def code_part(line: str) -> str:
        """Return the part of a line before any '#' comment."""
        return line.split("#", 1)[0]

    # Guarded import: fall back to a do-nothing ``_pdp`` when the profiling
    # module is unavailable. The marker goes on every line so that a
    # line-based revert removes the whole block, not just its last line.
    import_rows = (
        "try:",
        "    from vllm.distributed.kv_transfer.kv_connector.v1.mooncake import _pd_profile as _pdp",
        "except Exception:",
        "    class _pdp:  # fallback no-op",
        "        @staticmethod",
        "        def emit(*a, **kw): pass",
        "        @staticmethod",
        "        def enabled(): return False",
    )
    import_snippet = "\n".join(f"{row} {MARKER}" for row in import_rows)

    lines = text.splitlines()
    insert_at = 0
    for idx, line in enumerate(lines[:100]):
        if line.startswith(("import ", "from ")):
            insert_at = idx
    if lines:
        # Step past the continuation lines of a parenthesised or
        # backslash-continued import so the block lands after the statement.
        depth = code_part(lines[insert_at]).count("(") - code_part(
            lines[insert_at]
        ).count(")")
        while insert_at + 1 < len(lines) and (
            depth > 0 or lines[insert_at].rstrip().endswith("\\")
        ):
            insert_at += 1
            current = code_part(lines[insert_at])
            depth += current.count("(") - current.count(")")
    lines.insert(insert_at + 1, import_snippet)
    text = "\n".join(lines) + "\n"

    # Patch _update_waiting_for_remote_kv: match the exact `request: Request`
    # annotation (no quotes). The lookahead captures the indentation of the
    # first body line so the emit is inserted at body level.
    emit_promoted = (
        "_pdp.emit('D_request_promoted', req_id=request.request_id, "
        "role='kv_consumer', num_computed_tokens=request.num_computed_tokens) "
        + MARKER
    )
    text, hits = re.subn(
        r"(^[ \t]*def _update_waiting_for_remote_kv\(self, request: Request\) -> None:\n)"
        r"(?=(?:[ \t]*\n)*([ \t]+)\S)",
        lambda m: f"{m[1]}{m[2]}{emit_promoted}\n",
        text,
        count=1,
        flags=re.MULTILINE,
    )
    if hits == 0:
        print(
            " WARNING: patch site not found, skipped: "
            "_update_waiting_for_remote_kv"
        )
    return text
# ─── Driver ────────────────────────────────────────────────────────────────
def apply_to_file(path: Path, patcher) -> bool:
    """Run ``patcher`` over the file at ``path`` and write back any change.

    Args:
        path: File to patch.
        patcher: Callable taking the file's text and returning the new text.

    Returns:
        ``True`` if the file was rewritten; ``False`` if it does not exist or
        ``patcher`` returned the text unchanged.
    """
    if not path.exists():
        print(f" SKIP {path} (not found)")
        return False
    # Read and write as UTF-8 explicitly so the result does not depend on the
    # locale's default encoding.
    original = path.read_text(encoding="utf-8")
    patched = patcher(original)
    if patched == original:
        print(f" unchanged: {path}")
        return False
    path.write_text(patched, encoding="utf-8")
    n_marks = patched.count(MARKER)
    print(f" patched ({n_marks} marks): {path}")
    return True
def revert_file(path: Path) -> bool:
    """Strip marker lines from the file at ``path`` and write back any change.

    Args:
        path: File to revert.

    Returns:
        ``True`` if the file was rewritten; ``False`` if it does not exist or
        contained no marker lines.
    """
    if not path.exists():
        # Report the skip the same way apply_to_file does.
        print(f" SKIP {path} (not found)")
        return False
    # Read and write as UTF-8 explicitly so the result does not depend on the
    # locale's default encoding.
    original = path.read_text(encoding="utf-8")
    reverted = _revert(original)
    if reverted == original:
        print(f" no marks found: {path}")
        return False
    path.write_text(reverted, encoding="utf-8")
    print(f" reverted: {path}")
    return True
def install_profile_module(vllm_root: Path) -> None:
    """Copy pd_profile.py to mooncake/_pd_profile.py inside vLLM.

    The source is the ``pd_profile.py`` that sits next to this script. The
    destination directory is created if it is missing.

    Args:
        vllm_root: Root directory of the installed ``vllm`` package.

    Raises:
        FileNotFoundError: If ``pd_profile.py`` is not next to this script.
    """
    src = Path(__file__).parent / "pd_profile.py"
    dst = vllm_root / "distributed/kv_transfer/kv_connector/v1/mooncake/_pd_profile.py"
    # Read first so a missing source fails before anything is created, and
    # copy raw bytes so the file is reproduced exactly, independent of the
    # locale encoding and newline translation.
    payload = src.read_bytes()
    dst.parent.mkdir(parents=True, exist_ok=True)
    dst.write_bytes(payload)
    print(f" installed: {dst}")
def main():
    """Command-line entry point: parse the flags, then apply or revert.

    Exactly one of ``--apply`` / ``--revert`` must be given. ``--vllm-root``
    selects the vLLM package directory and defaults to ``DEFAULT_VLLM_ROOT``.
    Exits with status 1 when that directory does not exist.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--apply", action="store_true")
    parser.add_argument("--revert", action="store_true")
    parser.add_argument("--vllm-root", type=Path, default=DEFAULT_VLLM_ROOT)
    opts = parser.parse_args()

    # Both flags are booleans: equal values mean "neither" or "both".
    if opts.apply == opts.revert:
        parser.error("Specify exactly one of --apply or --revert")

    vllm_root = opts.vllm_root
    if not vllm_root.exists():
        print(f"ERROR: vLLM root not found: {vllm_root}")
        sys.exit(1)

    mooncake_dir = vllm_root / "distributed/kv_transfer/kv_connector/v1/mooncake"
    connector_file = mooncake_dir / "mooncake_connector.py"
    scheduler_file = vllm_root / "v1/core/sched/scheduler.py"

    if not opts.apply:
        print(f"Reverting PD-sep profile patches from {vllm_root}")
        revert_file(connector_file)
        revert_file(scheduler_file)
        # also remove the installed profiling module
        profile_module = mooncake_dir / "_pd_profile.py"
        if profile_module.exists():
            profile_module.unlink()
            print(f" removed: {profile_module}")
        return

    print(f"Applying PD-sep profile patches to {vllm_root}")
    install_profile_module(vllm_root)
    apply_to_file(connector_file, patch_mooncake_connector)
    apply_to_file(scheduler_file, patch_scheduler)


if __name__ == "__main__":
    main()