MIGRATION_TRANSFER_COST.md: under real load, migration KV transfer runs at ~3 GB/s vs ~10 GB/s idle. Decomposed (instruments + MB6 microbench) into ~55% RDMA-actual (HBM/PCIe contention with running kernels: 7.6->4.0 GB/s) + ~45% control-plane GIL starvation during long prefills. Reproduced on a fresh upstream venv (byte-identical transfer path) -> upstream/hardware inherent, not our patch. Layerwise is the wrong lever; the tax is structural on a loaded agentic cluster. Includes mb6_transfer_under_load + run_mb6, instrument_dst_migration/mooncake, and the dst/transfer decomposition analyzers. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
361 lines
13 KiB
Python
Executable File
361 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Instrument vLLM V1 scheduler to dump per-request DST-side migration timeline.
|
|
|
|
For each request that arrives at the engine with `kv_transfer_params`
|
|
containing `do_remote_prefill=True` (i.e., the decode-target of an
|
|
EAR v3 migration), record:
|
|
|
|
t_arrival_unix — Scheduler.add_request() entry
|
|
t_wait_for_kvs_unix — status set to WAITING_FOR_REMOTE_KVS (KV pull start)
|
|
t_kv_recv_done_unix — req_id added to finished_recving_kv_req_ids
|
|
t_first_scheduled_unix — first time req appears in self.running after KV done
|
|
t_first_token_unix — first new_token_ids appended in update_from_output
|
|
arrival_state — {n_running, n_waiting, pending_prefill_tok,
|
|
n_waiting_for_kvs}
|
|
|
|
We complement the proxy `breakdown.json` (t_decode_sent_unix /
|
|
t_first_token_unix) to attribute the migration's dst-side wait into:
|
|
HTTP relay + admission_pre_kv + KV pull + admission_post_kv + first_iter
|
|
|
|
One JSONL per EngineCore PID at $DM_LOG_DIR/dm_mig_pid<pid>.jsonl
|
|
(default DM_LOG_DIR=/tmp). Records are flushed when t_first_token is
|
|
reached or when the request is aborted/finished.
|
|
|
|
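Co-exists with MB5 KV snapshot patches (different START/END markers) and depends
on them: the first_scheduled patch anchors on the MB5 snapshot block at the end
of schedule(), so MB5 must be applied first or --apply fails with an
anchor-not-found error.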
|
|
|
|
Usage:
|
|
python instrument_dst_migration.py --apply [--venv PATH]
|
|
python instrument_dst_migration.py --revert [--venv PATH]
|
|
python instrument_dst_migration.py --check [--venv PATH]
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import re
|
|
from pathlib import Path
|
|
|
|
DEFAULT_VENV = Path("/home/admin/cpfs/wjh/agentic-kv/.venv")
TARGET_REL = "lib/python3.12/site-packages/vllm/v1/core/sched/scheduler.py"

START_MARK = "# DM_INSTRUMENT_START"
END_MARK = "# DM_INSTRUMENT_END"

# ---------- Patch 1: module-level header (helpers + globals) -----------------
# Anchor: the very first `class Scheduler(SchedulerInterface):` line. We insert
# the entire helper block immediately before that, so MB5's prior block (if
# present) is preserved and our block lives in module scope. The anchor must
# stay outside our own START/END markers so revert() can re-find it.
HEADER_ANCHOR = "class Scheduler(SchedulerInterface):"

# Module-scope code injected into scheduler.py. It is an f-string: literal
# braces in the generated code are written as doubled braces, and "\\n"
# produces a "\n" escape in the generated source.
HEADER_INSERT = f"""{START_MARK}
import json as _dm_json
import os as _dm_os
import threading as _dm_threading
import time as _dm_time

_DM_LOG_DIR = _dm_os.environ.get("DM_LOG_DIR", "/tmp")
try:
    _dm_os.makedirs(_DM_LOG_DIR, exist_ok=True)
except Exception:
    pass


def _dm_log_path() -> str:
    # Resolved on every call (not frozen at import) so a process forked after
    # this module was imported still logs to its own dm_mig_pid<pid> file.
    return _dm_os.path.join(_DM_LOG_DIR, f"dm_mig_pid{{_dm_os.getpid()}}.jsonl")


# Path for the importing process; kept for compatibility. Writers call
# _dm_log_path() so the file always matches the writing process's PID.
_DM_LOG_PATH = _dm_log_path()
_DM_LOG_FILE = None
_DM_LOG_FILE_PID = None
_DM_LOG_LOCK = _dm_threading.Lock()
# req_id -> in-flight record. We pop and flush when t_first_token lands or on
# finish/abort.
_DM_DATA: dict = {{}}


def _dm_write_event(d: dict) -> None:
    # Append one JSON line to the per-PID log. The lazy open happens under the
    # lock so two threads racing on the first write cannot both open the file,
    # and it is redone if the PID changed since the file was opened.
    global _DM_LOG_FILE, _DM_LOG_FILE_PID
    line = _dm_json.dumps(d) + "\\n"
    with _DM_LOG_LOCK:
        pid = _dm_os.getpid()
        if _DM_LOG_FILE is None or _DM_LOG_FILE_PID != pid:
            _DM_LOG_FILE = open(_dm_log_path(), "a", buffering=1)
            _DM_LOG_FILE_PID = pid
        _DM_LOG_FILE.write(line)


def _dm_is_migrated(request) -> bool:
    # True only when request.kv_transfer_params is a dict whose
    # do_remote_prefill entry is truthy.
    ktp = getattr(request, "kv_transfer_params", None)
    if not isinstance(ktp, dict):
        return False
    return bool(ktp.get("do_remote_prefill"))


def _dm_snapshot_arrival(scheduler) -> dict:
    # Best-effort queue snapshot. n_running / n_waiting are -1 when the
    # corresponding queue cannot be read; pending_prefill_tok sums remaining
    # prompt tokens over scheduler.waiting only.
    try:
        n_running = len(scheduler.running)
    except Exception:
        n_running = -1
    try:
        n_waiting_main = len(scheduler.waiting)
    except Exception:
        n_waiting_main = -1
    try:
        n_skipped = len(scheduler.skipped_waiting)
    except Exception:
        n_skipped = 0
    pending_tok = 0
    n_kv = 0
    try:
        from vllm.v1.request import RequestStatus as _RS
        for r in list(scheduler.waiting):
            try:
                if getattr(r, "status", None) == _RS.WAITING_FOR_REMOTE_KVS:
                    n_kv += 1
                npr = int(getattr(r, "num_prompt_tokens", 0))
                nct = int(getattr(r, "num_computed_tokens", 0))
                pending_tok += max(0, npr - nct)
            except Exception:
                pass
        for r in list(scheduler.skipped_waiting):
            try:
                if getattr(r, "status", None) == _RS.WAITING_FOR_REMOTE_KVS:
                    n_kv += 1
            except Exception:
                pass
    except Exception:
        pass
    # Keep the -1 sentinel intact instead of folding it into a sum that would
    # look like a real queue length.
    n_waiting = n_waiting_main + n_skipped if n_waiting_main >= 0 else -1
    return {{
        "n_running": int(n_running),
        "n_waiting": int(n_waiting),
        "pending_prefill_tok": int(pending_tok),
        "n_waiting_for_kvs": int(n_kv),
    }}


def _dm_emit_and_drop(req_id: str, reason: str = "first_token") -> None:
    # Pop the in-flight record (no-op if absent), stamp why/when it was
    # flushed, and append it to the log.
    rec = _DM_DATA.pop(req_id, None)
    if rec is None:
        return
    rec["flush_reason"] = reason
    rec["t_flush_unix"] = _dm_time.time()
    _dm_write_event(rec)


{END_MARK}


"""
|
|
|
|
# ---------- Patch 2: add_request() hook --------------------------------------
# Right after self.requests[request.request_id] = request (line ~1927) and the
# if self.log_stats: block. Anchor includes the QUEUED record_event line so it
# is uniquely matchable.
#
# For requests accepted by _dm_is_migrated(), registers an in-flight record in
# _DM_DATA: t_arrival_unix, empty milestone slots, and an arrival_state
# snapshot. The snapshot is taken after _enqueue_waiting_request(request), so
# n_waiting presumably already counts this request -- confirm if that matters
# for the analysis.
#
# NOTE(review): anchor text, including its leading whitespace, must match the
# installed scheduler.py byte-for-byte -- verify the indentation against that
# file; apply() raises RuntimeError ("anchor not found") when it does not.
ADD_REQUEST_ANCHOR = """        self._enqueue_waiting_request(request)
        self.requests[request.request_id] = request
        if self.log_stats:
            request.record_event(EngineCoreEventType.QUEUED)
"""

ADD_REQUEST_REPLACE = f"""        self._enqueue_waiting_request(request)
        self.requests[request.request_id] = request
        if self.log_stats:
            request.record_event(EngineCoreEventType.QUEUED)
        {START_MARK}
        try:
            if _dm_is_migrated(request):
                _DM_DATA[request.request_id] = {{
                    "req_id": str(request.request_id),
                    "is_migrated": True,
                    "n_prompt_tokens": int(getattr(request, "num_prompt_tokens", 0)),
                    "t_arrival_unix": _dm_time.time(),
                    "t_wait_for_kvs_unix": None,
                    "t_kv_recv_done_unix": None,
                    "t_first_scheduled_unix": None,
                    "t_first_token_unix": None,
                    "arrival_state": _dm_snapshot_arrival(self),
                }}
        except Exception:
            pass
        {END_MARK}
"""
|
|
|
|
# ---------- Patch 3: WAITING_FOR_REMOTE_KVS transition -----------------------
# Stamps t_wait_for_kvs_unix on a tracked request right after its status is
# set to WAITING_FOR_REMOTE_KVS. The `is None` guard keeps the first such
# transition if it happens more than once.
#
# NOTE(review): anchor text, including its leading whitespace, must match the
# installed scheduler.py byte-for-byte -- verify the indentation against that
# file; apply() raises RuntimeError ("anchor not found") when it does not.
WAIT_KV_ANCHOR = """                    request.status = RequestStatus.WAITING_FOR_REMOTE_KVS
                    step_skipped_waiting.prepend_request(request)
"""

WAIT_KV_REPLACE = f"""                    request.status = RequestStatus.WAITING_FOR_REMOTE_KVS
                    {START_MARK}
                    try:
                        _rec = _DM_DATA.get(request.request_id)
                        if _rec is not None and _rec["t_wait_for_kvs_unix"] is None:
                            _rec["t_wait_for_kvs_unix"] = _dm_time.time()
                    except Exception:
                        pass
                    {END_MARK}
                    step_skipped_waiting.prepend_request(request)
"""
|
|
|
|
# ---------- Patch 4: finished_recving signal ---------------------------------
# Stamps t_kv_recv_done_unix when a request still in WAITING_FOR_REMOTE_KVS is
# added to self.finished_recving_kv_req_ids (first occurrence only).
#
# NOTE(review): anchor text, including its leading whitespace, must match the
# installed scheduler.py byte-for-byte -- verify the indentation against that
# file; apply() raises RuntimeError ("anchor not found") when it does not.
FINISHED_RECV_ANCHOR = """            if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
                self.finished_recving_kv_req_ids.add(req_id)
            elif RequestStatus.is_finished(req.status):
"""

FINISHED_RECV_REPLACE = f"""            if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
                self.finished_recving_kv_req_ids.add(req_id)
                {START_MARK}
                try:
                    _rec = _DM_DATA.get(req_id)
                    if _rec is not None and _rec["t_kv_recv_done_unix"] is None:
                        _rec["t_kv_recv_done_unix"] = _dm_time.time()
                except Exception:
                    pass
                {END_MARK}
            elif RequestStatus.is_finished(req.status):
"""
|
|
|
|
# ---------- Patch 5: first scheduled (sweep at end of schedule()) ------------
# Co-exists with the MB5 snapshot inserted at the same location -- and depends
# on it: the anchor below includes the MB5 block, so apply() fails with
# "anchor not found" unless MB5 has been applied first.
#
# At the end of schedule(), every tracked request found in self.running gets
# t_first_scheduled_unix (one shared timestamp per sweep; first sighting kept).
#
# NOTE(review): anchor text, including its leading whitespace, must match the
# installed scheduler.py byte-for-byte -- verify the indentation against that
# file; apply() raises RuntimeError ("anchor not found") when it does not.
SCHED_END_ANCHOR = """        # MB5_INSTRUMENT_START
        _mb5_snapshot(self)
        # MB5_INSTRUMENT_END
        return scheduler_output
"""

SCHED_END_REPLACE = f"""        # MB5_INSTRUMENT_START
        _mb5_snapshot(self)
        # MB5_INSTRUMENT_END
        {START_MARK}
        try:
            if _DM_DATA:
                _now_dm = _dm_time.time()
                for _r in self.running:
                    _rec = _DM_DATA.get(_r.request_id)
                    if _rec is not None and _rec["t_first_scheduled_unix"] is None:
                        _rec["t_first_scheduled_unix"] = _now_dm
        except Exception:
            pass
        {END_MARK}
        return scheduler_output
"""
|
|
|
|
# ---------- Patch 6: first new token in update_from_output -------------------
# On the first output step that yields new_token_ids for a tracked request,
# stamps t_first_token_unix and immediately flushes the record
# (flush_reason="first_token"), before _update_request_with_output runs.
#
# NOTE(review): anchor text, including its leading whitespace, must match the
# installed scheduler.py byte-for-byte -- verify the indentation against that
# file; apply() raises RuntimeError ("anchor not found") when it does not.
FIRST_TOK_ANCHOR = """            # Check for stop and update request status.
            if new_token_ids:
                new_token_ids, stopped = self._update_request_with_output(
                    request, new_token_ids
                )
"""

FIRST_TOK_REPLACE = f"""            # Check for stop and update request status.
            if new_token_ids:
                {START_MARK}
                try:
                    _rec = _DM_DATA.get(request.request_id)
                    if _rec is not None and _rec["t_first_token_unix"] is None:
                        _rec["t_first_token_unix"] = _dm_time.time()
                        _dm_emit_and_drop(request.request_id, reason="first_token")
                except Exception:
                    pass
                {END_MARK}
                new_token_ids, stopped = self._update_request_with_output(
                    request, new_token_ids
                )
"""
|
|
|
|
# ---------- Patch 7: abort/finish — flush partial record ---------------------
# If a request still has an in-flight record when this finish path sets its
# finished status, flush it with flush_reason="finish_or_abort" so partial
# timelines (e.g. aborted before the first token) are not lost.
#
# NOTE(review): anchor text, including its leading whitespace, must match the
# installed scheduler.py byte-for-byte -- verify the indentation against that
# file; apply() raises RuntimeError ("anchor not found") when it does not.
FINISH_ANCHOR = """            request.status = finished_status
            self._free_request(request, delay_free_blocks=delay_free_blocks)
"""

FINISH_REPLACE = f"""            request.status = finished_status
            {START_MARK}
            try:
                if request.request_id in _DM_DATA:
                    _dm_emit_and_drop(request.request_id, reason="finish_or_abort")
            except Exception:
                pass
            {END_MARK}
            self._free_request(request, delay_free_blocks=delay_free_blocks)
"""
|
|
|
|
# Ordered (name, anchor, replacement) triples consumed by apply(); the name is
# only used in error messages. The header replacement re-appends HEADER_ANCHOR
# after the inserted block so the class line is kept and stays outside the
# DM markers.
PATCHES = [
    # module-level helpers/globals, placed just before the Scheduler class
    ("header", HEADER_ANCHOR, HEADER_INSERT + HEADER_ANCHOR),
    # arrival record for migrated requests
    ("add_request", ADD_REQUEST_ANCHOR, ADD_REQUEST_REPLACE),
    # KV pull start
    ("wait_for_kvs", WAIT_KV_ANCHOR, WAIT_KV_REPLACE),
    # KV pull done
    ("finished_recving", FINISHED_RECV_ANCHOR, FINISHED_RECV_REPLACE),
    # first appearance in self.running (needs MB5 block present)
    ("first_scheduled", SCHED_END_ANCHOR, SCHED_END_REPLACE),
    # first generated token -> flush
    ("first_token", FIRST_TOK_ANCHOR, FIRST_TOK_REPLACE),
    # finish/abort -> flush partial record
    ("finish_flush", FINISH_ANCHOR, FINISH_REPLACE),
]
|
|
|
|
|
|
def find_target(venv_or_path: Path) -> Path:
    """Locate the vLLM V1 ``scheduler.py`` to patch.

    Args:
        venv_or_path: Either the scheduler source file itself, or a virtualenv
            root containing it under ``lib/python3*/site-packages``.

    Returns:
        Path to the scheduler source file.

    Raises:
        FileNotFoundError: if no scheduler.py is found under *venv_or_path*.
    """
    if venv_or_path.is_file():
        return venv_or_path
    exact = venv_or_path / TARGET_REL
    if exact.is_file():
        return exact
    # TARGET_REL pins python3.12; accept any python3.x lib dir in this venv.
    site_rel = TARGET_REL.split("/", 2)[2]
    for cand in sorted(venv_or_path.glob(f"lib/python3*/{site_rel}")):
        if cand.is_file():
            return cand
    # Deliberately no fallback to DEFAULT_VENV: silently patching a different
    # venv than the one requested is worse than failing loudly.
    raise FileNotFoundError(f"cannot find {TARGET_REL} under {venv_or_path}")
|
|
|
|
|
|
def is_patched(text: str) -> bool:
    """Report whether *text* already carries a DM_INSTRUMENT block.

    Only the START marker is looked for, so a file containing any DM block
    counts as patched.
    """
    return text.find(START_MARK) >= 0
|
|
|
|
|
|
def apply(target: Path) -> None:
    """Insert every DM instrumentation patch into *target*.

    Prints a message and returns if a DM block is already present. All
    anchors are spliced into an in-memory copy first, and the file is written
    once at the end, so a missing anchor leaves *target* untouched.

    Raises:
        RuntimeError: if any patch's anchor text is not found.
    """
    original = target.read_text()
    if is_patched(original):
        print(f"[dm-instr] already patched: {target}")
        return

    patched = original
    for patch_name, anchor, replacement in PATCHES:
        pos = patched.find(anchor)
        if pos == -1:
            head = anchor.splitlines()[0]
            raise RuntimeError(
                f"patch {patch_name!r}: anchor not found in {target}. "
                f"Anchor head: {head!r}"
            )
        # Splice the replacement over the first occurrence only.
        patched = patched[:pos] + replacement + patched[pos + len(anchor):]

    target.write_text(patched)
    print(f"[dm-instr] applied {len(PATCHES)} patches -> {target}")
|
|
|
|
|
|
def revert(target: Path) -> None:
    """Remove every DM_INSTRUMENT block from *target*.

    Each ``START_MARK ... END_MARK`` block is stripped together with the
    indentation before START_MARK and the newline ending the END_MARK line.
    Other blank-line runs (MB5_* whitespace, spacing between methods) are
    preserved. Prints a message and returns when *target* is not patched.
    """
    text = target.read_text()
    if not is_patched(text):
        print(f"[dm-instr] not patched (nothing to revert): {target}")
        return

    pat = re.compile(
        r"[ \t]*" + re.escape(START_MARK) + r".*?" + re.escape(END_MARK) + r"\n",
        flags=re.DOTALL,
    )

    # apply() inserted HEADER_INSERT directly before HEADER_ANCHOR, so
    # whatever HEADER_INSERT carries after END_MARK (its trailing blank lines)
    # now sits between the block and the class line.
    header_tail = HEADER_INSERT[HEADER_INSERT.rfind(END_MARK) + len(END_MARK):]
    header_present = (END_MARK + header_tail + HEADER_ANCHOR) in text

    new = pat.sub("", text)

    if header_present and header_tail.startswith("\n"):
        # pat consumed the first newline of header_tail; drop exactly the rest
        # so the class line gets back the spacing it had before apply(). A
        # fixed "collapse to N newlines" cannot know that original spacing.
        leftover = header_tail[1:]
        if leftover:
            new = new.replace(leftover + HEADER_ANCHOR, HEADER_ANCHOR, 1)

    target.write_text(new)
    print(f"[dm-instr] reverted: {target}")
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: --apply / --revert / --check the DM patches.

    Exactly one action is required; argparse rejects zero or several actions
    with a usage error before any filesystem lookup happens. ``--venv`` is
    passed to find_target() to locate the scheduler source.
    """
    p = argparse.ArgumentParser(
        description="Instrument vLLM V1 scheduler with DST-side migration timing hooks."
    )
    action = p.add_mutually_exclusive_group(required=True)
    action.add_argument("--apply", action="store_true", help="insert the DM instrumentation")
    action.add_argument("--revert", action="store_true", help="remove the DM instrumentation")
    action.add_argument("--check", action="store_true", help="report PATCHED or CLEAN")
    p.add_argument(
        "--venv",
        type=Path,
        default=DEFAULT_VENV,
        help="virtualenv containing vLLM (default: %(default)s)",
    )
    args = p.parse_args()

    target = find_target(args.venv)
    if args.apply:
        apply(target)
    elif args.revert:
        revert(target)
    else:
        state = "PATCHED" if is_patched(target.read_text()) else "CLEAN"
        print(f"[dm-instr] {state}: {target}")


if __name__ == "__main__":
    main()
|