MB5 instrumentation: per-request KV-block snapshot from vLLM V1 scheduler
The §3.2 H1 (D-pool capacity wall) argument needs system-level evidence, not just headline latency. This patch lets us record, every ~100 ms, the exact composition of each vLLM instance's KV pool:
- total / free / used block counts
- for each RUNNING request: blocks held, computed tokens, prompt tokens
- for each WAITING request: prompt tokens, status

Hook: inside Scheduler.schedule(), right before the return. Per-request blocks come from coordinator.single_type_managers[*].req_to_blocks (vLLM 0.18.1's own per-request bookkeeping; no new tracking layer).

Throttled by the MB5_PERIOD_MS env var (default 100 ms = 10 Hz), so a 13-min trace replay produces ~8 k snapshots per instance instead of ~80 k unthrottled.

Output: $MB5_LOG_DIR/mb5_kv_snapshot_pid<pid>.jsonl (default MB5_LOG_DIR=/tmp). One file per EngineCore PID.

Apply/revert is idempotent, following the same pattern as instrument_mooncake.py. Markers: # MB5_INSTRUMENT_START / # MB5_INSTRUMENT_END. Validated on the dash1 venv: apply → py_compile ok → revert → py_compile ok.

With this in place we can build the stacked-area "KV pool composition over time" figure the user asked for: x = wall-clock, y = block count, colored bands = per-request portions. Comparing 8C colo vs 4P+4D on the same trace will directly show whether (and when) the D pool hits its ceiling — turning "PD-disagg is X× worse" into "PD-disagg is X× worse BECAUSE these specific requests at this specific time filled the pool and forced this queue depth".

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
241
microbench/fresh_setup/instrument_kv_snapshot.py
Normal file
241
microbench/fresh_setup/instrument_kv_snapshot.py
Normal file
@@ -0,0 +1,241 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Instrument vLLM V1 scheduler to emit per-request KV-block snapshots.
|
||||
|
||||
Snapshot is dumped from inside `Scheduler.schedule()` right before it
|
||||
returns the SchedulerOutput. Each snapshot captures, for the calling
|
||||
EngineCore instance:
|
||||
|
||||
    {t_unix, step,
     total_blocks, free_blocks, used_blocks,
     running: [{req_id, n_blocks, n_computed, n_prompt, n_tokens, status}, ...],
     waiting: [{req_id, n_prompt, status}, ...]}
|
||||
|
||||
We throttle to ~10 Hz (default; override via MB5_PERIOD_MS env var) so
|
||||
the file size stays manageable over a 13-min trace replay.
|
||||
|
||||
Output path: $MB5_LOG_DIR/mb5_kv_snapshot_pid<pid>.jsonl (default
|
||||
MB5_LOG_DIR=/tmp). One file per EngineCore PID — so for a multi-instance
|
||||
launch each instance gets its own log even on the same host.
|
||||
|
||||
Apply / revert marker: `# MB5_INSTRUMENT_START` / `# MB5_INSTRUMENT_END`.
|
||||
|
||||
Usage:
|
||||
python instrument_kv_snapshot.py --apply [--venv PATH]
|
||||
python instrument_kv_snapshot.py --revert [--venv PATH]
|
||||
python instrument_kv_snapshot.py --check [--venv PATH]
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
DEFAULT_VENV = Path("/home/admin/cpfs/wjh/agentic-kv-fresh/.venv")
|
||||
TARGET_REL = "lib/python3.12/site-packages/vllm/v1/core/sched/scheduler.py"
|
||||
|
||||
START_MARK = "# MB5_INSTRUMENT_START"
END_MARK = "# MB5_INSTRUMENT_END"

# ---------- Patch 1: header (helper + globals) -----------------
# Inserted immediately after the module-level logger assignment in
# scheduler.py (HEADER_ANCHOR below), so the helpers are defined before the
# Scheduler class body that calls `_mb5_snapshot`.
HEADER_ANCHOR = "logger = init_logger(__name__)\n"

# Source text injected into scheduler.py. This is an f-string: only
# START_MARK / END_MARK are interpolated here; every `{{` / `}}` below is a
# literal brace in the generated code, and `\\n` is a literal backslash-n.
#
# Design rules for the injected code:
#   * nothing in it may raise into Scheduler.schedule() or break importing
#     the scheduler module -- instrumentation failures are reported once on
#     stderr and otherwise ignored;
#   * the log file is opened lazily, under the lock, and named after the pid
#     observed at write time (not at import time);
#   * throttling uses a monotonic clock, while the recorded `t_unix` stays
#     wall-clock so snapshots can be aligned with other logs.
HEADER_INSERT = f"""

{START_MARK}
import json as _mb5_json
import os as _mb5_os
import sys as _mb5_sys
import threading as _mb5_threading
import time as _mb5_time
_MB5_LOG_DIR = _mb5_os.environ.get("MB5_LOG_DIR", "/tmp")
try:
    _MB5_PERIOD_S = float(_mb5_os.environ.get("MB5_PERIOD_MS", "100")) / 1000.0
except ValueError:
    # A malformed MB5_PERIOD_MS must not break importing the scheduler.
    _MB5_PERIOD_S = 0.1
try:
    _mb5_os.makedirs(_MB5_LOG_DIR, exist_ok=True)
except Exception:
    pass
_MB5_LOG_FILE = None
_MB5_LOG_PID = None
_MB5_LOG_LOCK = _mb5_threading.Lock()
_MB5_LAST_LOG = None
_MB5_STEP_COUNT = 0
_MB5_WARNED = False


def _mb5_write_event(d):
    \"\"\"Append one JSON line to this process's snapshot file; never raises.\"\"\"
    global _MB5_LOG_FILE, _MB5_LOG_PID, _MB5_WARNED
    try:
        line = _mb5_json.dumps(d) + "\\n"
        pid = _mb5_os.getpid()
        with _MB5_LOG_LOCK:
            # (Re)open under the lock, keyed on the current pid, so the file
            # name always matches the process that is actually writing.
            if _MB5_LOG_FILE is None or _MB5_LOG_PID != pid:
                _MB5_LOG_FILE = open(
                    _mb5_os.path.join(
                        _MB5_LOG_DIR, f"mb5_kv_snapshot_pid{{pid}}.jsonl"
                    ),
                    "a",
                    buffering=1,
                )
                _MB5_LOG_PID = pid
            _MB5_LOG_FILE.write(line)
    except Exception as exc:
        # Report the first failure only; the scheduler must keep running.
        if not _MB5_WARNED:
            _MB5_WARNED = True
            try:
                _mb5_sys.stderr.write(
                    f"[mb5-instr] snapshot write failed: {{exc!r}}\\n"
                )
            except Exception:
                pass


def _mb5_blocks_for_req(kvm, req_id):
    \"\"\"Sum block count across all single-type managers in the coordinator.\"\"\"
    total = 0
    try:
        managers = getattr(kvm.coordinator, "single_type_managers", None)
        if managers is None:
            return 0
        for mgr in managers:
            blocks = getattr(mgr, "req_to_blocks", None)
            if blocks is None:
                continue
            try:
                total += len(blocks.get(req_id, ()))
            except Exception:
                pass
    except Exception:
        pass
    return total


def _mb5_snapshot(scheduler):
    \"\"\"Record one throttled KV-pool snapshot for the given scheduler.\"\"\"
    global _MB5_LAST_LOG, _MB5_STEP_COUNT
    _MB5_STEP_COUNT += 1
    tick = _mb5_time.monotonic()
    if _MB5_LAST_LOG is not None and tick - _MB5_LAST_LOG < _MB5_PERIOD_S:
        return
    _MB5_LAST_LOG = tick
    now = _mb5_time.time()

    kvm = None
    total_blocks = -1
    free_blocks = -1
    try:
        kvm = scheduler.kv_cache_manager
        pool = kvm.block_pool
        total_blocks = pool.num_gpu_blocks
        free_blocks = pool.get_num_free_blocks()
    except Exception:
        total_blocks = -1
        free_blocks = -1

    running = []
    try:
        for r in scheduler.running:
            running.append({{
                "req_id": str(r.request_id),
                "n_blocks": _mb5_blocks_for_req(kvm, r.request_id),
                "n_computed": int(getattr(r, "num_computed_tokens", 0)),
                "n_prompt": int(getattr(r, "num_prompt_tokens", 0)),
                "n_tokens": int(getattr(r, "num_tokens", 0)),
                "status": str(getattr(r, "status", "")),
            }})
    except Exception:
        pass

    waiting = []
    try:
        # scheduler.waiting is a queue; copy it so iteration does not consume.
        for r in list(scheduler.waiting):
            waiting.append({{
                "req_id": str(r.request_id),
                "n_prompt": int(getattr(r, "num_prompt_tokens", 0)),
                "status": str(getattr(r, "status", "")),
            }})
    except Exception:
        pass

    _mb5_write_event({{
        "t_unix": now,
        "step": _MB5_STEP_COUNT,
        "total_blocks": total_blocks,
        "free_blocks": free_blocks,
        "used_blocks": (total_blocks - free_blocks) if total_blocks >= 0 else -1,
        "running": running,
        "waiting": waiting,
    }})
{END_MARK}
"""
|
||||
|
||||
# ---------- Patch 2: hook into schedule() return -----------------
# Scheduler.schedule() ends with `return scheduler_output`; the snapshot call
# is inserted right before it. A bare `return scheduler_output` may not be
# unique in the file, so the anchor also includes the line that FOLLOWS the
# schedule() body: the header of `def _agentic_emit_step_log(`.
#
# NOTE(review): `_agentic_emit_step_log` looks like it is added by another
# local instrumentation patch rather than being part of stock vLLM -- confirm.
# If so, that patch must be applied first, otherwise apply() stops with
# "anchor not found" for this patch.
#
# Leading whitespace inside these literals is significant: 8 spaces for
# statements in the method body, 4 spaces for the next method's `def`.

SCHED_RET_TARGET = """        return scheduler_output

    def _agentic_emit_step_log("""

SCHED_RET_REPLACE = f"""        {START_MARK}
        _mb5_snapshot(self)
        {END_MARK}
        return scheduler_output

    def _agentic_emit_step_log("""
|
||||
|
||||
|
||||
# Ordered (name, anchor, replacement) triples consumed by apply(): each
# anchor must be present in the target text and its first occurrence is
# swapped for the replacement. `name` is only used in error messages.
PATCHES = [
    ("header", HEADER_ANCHOR, HEADER_ANCHOR + HEADER_INSERT),
    ("schedule() return", SCHED_RET_TARGET, SCHED_RET_REPLACE),
]
|
||||
|
||||
|
||||
def find_target(venv_or_path: Path) -> Path:
    """Locate the vLLM V1 ``scheduler.py`` that should be patched.

    ``venv_or_path`` may be the scheduler file itself or a virtualenv root.
    The user-supplied location is tried first, then ``DEFAULT_VENV``. Inside
    a virtualenv the exact ``TARGET_REL`` path is preferred; if it is absent,
    any ``lib/python3*/`` directory holding the same ``site-packages`` tail
    is accepted, so venvs built with another interpreter version still work.

    Raises:
        FileNotFoundError: if no candidate location contains the file.
    """
    rel = Path(TARGET_REL)
    # TARGET_REL is "lib/pythonX.Y/<tail>"; keep <tail> for the version probe.
    tail = Path(*rel.parts[2:])
    for base in (venv_or_path, DEFAULT_VENV):
        if base.is_file():
            return base
        if not base.is_dir():
            continue
        exact = base / rel
        if exact.is_file():
            return exact
        # Sorted only to make the pick deterministic when several exist.
        for py_dir in sorted((base / "lib").glob("python3*")):
            candidate = py_dir / tail
            if candidate.is_file():
                return candidate
    raise FileNotFoundError(f"cannot find vllm V1 scheduler at {venv_or_path}")
|
||||
|
||||
|
||||
def is_patched(text: str) -> bool:
    """Report whether *text* already contains the MB5 start marker."""
    return text.find(START_MARK) >= 0
|
||||
|
||||
|
||||
def apply(target: Path) -> None:
    """Insert every entry of ``PATCHES`` into *target*, once.

    A file that already contains the start marker is left untouched, which
    makes repeated ``--apply`` runs harmless. The file is only rewritten
    after all anchors have been found, so a missing anchor never leaves a
    half-patched scheduler on disk.

    Raises:
        RuntimeError: if a patch anchor is not present in *target*.
    """
    # Python source is UTF-8 by default; do not depend on the locale encoding.
    text = target.read_text(encoding="utf-8")
    if is_patched(text):
        print(f"[mb5-instr] already patched: {target}")
        return
    new = text
    for name, src, dst in PATCHES:
        if src not in new:
            raise RuntimeError(
                f"patch {name!r}: anchor not found in {target}."
            )
        # Only the first occurrence of the anchor is replaced.
        new = new.replace(src, dst, 1)
    target.write_text(new, encoding="utf-8")
    print(f"[mb5-instr] applied {len(PATCHES)} patches -> {target}")
|
||||
|
||||
|
||||
def revert(target: Path) -> None:
    """Remove the MB5 instrumentation from *target*.

    Each entry of ``PATCHES`` is undone by the exact inverse replacement
    (replacement text -> anchor), so a file patched by ``apply`` is restored
    without touching any other part of it. Blank-line runs elsewhere in the
    file are deliberately NOT normalised: rewriting them would change lines
    this script never inserted.

    If marker blocks remain afterwards (for example the file was patched by a
    different revision of this script), they are stripped with a regex as a
    fallback; that path may leave blank lines behind where the header was.
    """
    # Python source is UTF-8 by default; do not depend on the locale encoding.
    text = target.read_text(encoding="utf-8")
    if not is_patched(text):
        print(f"[mb5-instr] not patched (nothing to revert): {target}")
        return
    new = text
    # Undo in reverse order of application.
    for _name, src, dst in reversed(PATCHES):
        new = new.replace(dst, src, 1)
    if is_patched(new):
        pat = re.compile(
            r"[ \t]*" + re.escape(START_MARK) + r".*?" + re.escape(END_MARK) + r"\n?",
            flags=re.DOTALL,
        )
        new = pat.sub("", new)
    target.write_text(new, encoding="utf-8")
    print(f"[mb5-instr] reverted: {target}")
|
||||
|
||||
|
||||
def main() -> None:
    """Command-line entry point: apply, revert, or check the instrumentation.

    Exactly one of ``--apply`` / ``--revert`` / ``--check`` must be given;
    argparse rejects a missing or conflicting mode before the target file is
    looked up. ``--venv`` accepts a virtualenv root or the scheduler file
    itself and defaults to ``DEFAULT_VENV``.
    """
    p = argparse.ArgumentParser()
    mode = p.add_mutually_exclusive_group(required=True)
    mode.add_argument("--apply", action="store_true")
    mode.add_argument("--revert", action="store_true")
    mode.add_argument("--check", action="store_true")
    p.add_argument("--venv", type=Path, default=DEFAULT_VENV)
    args = p.parse_args()
    target = find_target(args.venv)
    if args.apply:
        apply(target)
    elif args.revert:
        revert(target)
    else:
        # --check: the required group guarantees this is the remaining mode.
        text = target.read_text(encoding="utf-8")
        print(f"[mb5-instr] {'PATCHED' if is_patched(text) else 'CLEAN'}: {target}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user