Root cause of the 6P+2D run-to-run collapse (rep1 100%, rep2 56%,
rep3 80%, session-routing 6.6%): not load-shedding, but a consumer
EngineCore crash.
Failure chain observed in the consumer logs:
1. D-pool fills to ~97% (decode-side capacity ceiling, the H1 story)
2. a large request's KV transfer fails: "Mooncake transfer engine
returned -1" (112k-token request, pool full)
3. scheduler fails the request (kv_load_failure_policy=fail)
4. PromptTokenStats.local_cache_hit, computed as num_cached + recomputed -
num_external_computed, goes NEGATIVE (the external-transfer count
exceeded the cached count)
5. loggers.record() calls Counter.inc(negative) -> prometheus raises
"Counters can only be incremented by non-negative amounts."
6. EngineCore dies -> every subsequent request fails (the cliff:
all successes in the first ~110s, zero after)
This turns ONE failed request into a total config collapse, and is
what made the round-robin 6P+2D reps look randomly variable.
Fix: clamp the three per-source prompt-token counts to >= 0 in
loggers.record() before they hit Counter.inc(). Pure insertion,
revertible via the existing sentinel mechanism. Lets a transfer
failure stay a single failed request instead of killing the engine,
so routing arms can be compared on equal footing.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
298 lines
9.8 KiB
Python
#!/usr/bin/env python3
"""Patch a vLLM V1 install in place with MB5 instrumentation and fixes.

Four sentinel-delimited patches are applied across three files of the
target virtualenv:

1. ``vllm/v1/core/sched/scheduler.py`` -- a module-level snapshot helper
   (header patch) plus a call to it right before ``Scheduler.schedule()``
   returns its SchedulerOutput. Each snapshot captures, for the calling
   EngineCore instance::

       {t_unix, step,
        total_blocks, free_blocks, used_blocks,
        running: [{req_id, n_blocks, n_computed, n_prompt, n_tokens,
                   status}, ...],
        waiting: [{req_id, n_prompt, status}, ...]}

   We throttle to ~10 Hz (default; override via MB5_PERIOD_MS env var) so
   the file size stays manageable over a 13-min trace replay.

   Output path: $MB5_LOG_DIR/mb5_kv_snapshot_pid<pid>.jsonl (default
   MB5_LOG_DIR=/tmp). One file per EngineCore PID -- so for a
   multi-instance launch each instance gets its own log even on the same
   host.

2. ``.../kv_connector/v1/mooncake/mooncake_connector.py`` -- initializes
   ``self.bootstrap_server = None`` so kv_consumer workers do not hit an
   AttributeError (vLLM 0.18.1).

3. ``vllm/v1/metrics/loggers.py`` -- clamps negative per-source
   prompt-token counts before they reach a Prometheus ``Counter.inc()``,
   which would otherwise raise and kill the EngineCore.

Apply / revert marker: `# MB5_INSTRUMENT_START` / `# MB5_INSTRUMENT_END`.
A file that contains the start marker is treated as already patched.

Usage:
    python instrument_kv_snapshot.py --apply [--venv PATH]
    python instrument_kv_snapshot.py --revert [--venv PATH]
    python instrument_kv_snapshot.py --check [--venv PATH]
"""

from __future__ import annotations

import argparse
import re
from pathlib import Path

# Default virtualenv to patch; override with --venv.
DEFAULT_VENV = Path("/home/admin/cpfs/wjh/agentic-kv-fresh/.venv")

# Target files, relative to the venv root. NOTE: the python3.12 segment is
# hard-coded, so a venv built with another interpreter version will not match.
TARGET_REL = "lib/python3.12/site-packages/vllm/v1/core/sched/scheduler.py"
MOONCAKE_REL = (
    "lib/python3.12/site-packages/vllm/distributed/kv_transfer/"
    "kv_connector/v1/mooncake/mooncake_connector.py"
)
LOGGERS_REL = "lib/python3.12/site-packages/vllm/v1/metrics/loggers.py"

# Sentinels wrapped around every inserted snippet; --revert deletes each
# START...END span (markers included).
START_MARK = "# MB5_INSTRUMENT_START"
END_MARK = "# MB5_INSTRUMENT_END"


# ---------- Patch 1: header (helper + globals) -----------------
# Inserted right after the module-level `logger = init_logger(__name__)`
# line of scheduler.py, which serves as a stable top-level anchor.
#
# The snippet is written to be best-effort: a failure to snapshot or to
# write the log must never propagate into Scheduler.schedule(), because an
# exception raised there takes down the EngineCore.
HEADER_ANCHOR = "logger = init_logger(__name__)\n"

HEADER_INSERT = f"""

{START_MARK}
import json as _mb5_json
import os as _mb5_os
import threading as _mb5_threading
import time as _mb5_time

_MB5_LOG_DIR = _mb5_os.environ.get("MB5_LOG_DIR", "/tmp")
try:
    _MB5_PERIOD_S = float(_mb5_os.environ.get("MB5_PERIOD_MS", "100")) / 1000.0
except ValueError:
    # A malformed MB5_PERIOD_MS must not break importing the scheduler.
    _MB5_PERIOD_S = 0.1
try:
    _mb5_os.makedirs(_MB5_LOG_DIR, exist_ok=True)
except Exception:
    pass
_MB5_LOG_PATH = _mb5_os.path.join(
    _MB5_LOG_DIR, f"mb5_kv_snapshot_pid{{_mb5_os.getpid()}}.jsonl"
)
_MB5_LOG_FILE = None
_MB5_LOG_LOCK = _mb5_threading.Lock()
_MB5_LAST_LOG = 0.0
_MB5_STEP_COUNT = 0


def _mb5_write_event(d):
    \"\"\"Append one JSON line to the per-PID log; never raises.\"\"\"
    global _MB5_LOG_FILE
    try:
        line = _mb5_json.dumps(d) + "\\n"
        with _MB5_LOG_LOCK:
            # Lazy open happens under the lock so two threads cannot both
            # open (and leak) a handle.
            if _MB5_LOG_FILE is None:
                _MB5_LOG_FILE = open(
                    _MB5_LOG_PATH, "a", buffering=1, encoding="utf-8"
                )
            _MB5_LOG_FILE.write(line)
    except Exception:
        # Dropping one snapshot is preferable to crashing the scheduler
        # step that called us.
        pass


def _mb5_blocks_for_req(kvm, req_id):
    \"\"\"Sum block count across all single-type managers in the coordinator.\"\"\"
    total = 0
    try:
        managers = getattr(kvm.coordinator, "single_type_managers", None)
        if managers is None:
            return 0
        for mgr in managers:
            blocks = getattr(mgr, "req_to_blocks", None)
            if blocks is None:
                continue
            try:
                total += len(blocks.get(req_id, ()))
            except Exception:
                pass
    except Exception:
        pass
    return total


def _mb5_snapshot(scheduler):
    \"\"\"Write one throttled KV-block snapshot for `scheduler`; never raises.\"\"\"
    global _MB5_LAST_LOG, _MB5_STEP_COUNT
    _MB5_STEP_COUNT += 1
    now = _mb5_time.time()
    if now - _MB5_LAST_LOG < _MB5_PERIOD_S:
        return
    _MB5_LAST_LOG = now

    # Bind kvm up front so a failed lookup degrades to -1 / 0 counts instead
    # of an unbound-name error inside the per-request loop.
    kvm = getattr(scheduler, "kv_cache_manager", None)
    try:
        pool = kvm.block_pool
        total_blocks = pool.num_gpu_blocks
        free_blocks = pool.get_num_free_blocks()
    except Exception:
        total_blocks = -1
        free_blocks = -1

    running = []
    try:
        for r in scheduler.running:
            running.append({{
                "req_id": str(r.request_id),
                "n_blocks": _mb5_blocks_for_req(kvm, r.request_id),
                "n_computed": int(getattr(r, "num_computed_tokens", 0)),
                "n_prompt": int(getattr(r, "num_prompt_tokens", 0)),
                "n_tokens": int(getattr(r, "num_tokens", 0)),
                "status": str(getattr(r, "status", "")),
            }})
    except Exception:
        pass

    waiting = []
    try:
        # self.waiting is a queue; iterate a copy without consuming it.
        for r in list(scheduler.waiting):
            waiting.append({{
                "req_id": str(r.request_id),
                "n_prompt": int(getattr(r, "num_prompt_tokens", 0)),
                "status": str(getattr(r, "status", "")),
            }})
    except Exception:
        pass

    _mb5_write_event({{
        "t_unix": now,
        "step": _MB5_STEP_COUNT,
        "total_blocks": total_blocks,
        "free_blocks": free_blocks,
        "used_blocks": (total_blocks - free_blocks) if total_blocks >= 0 else -1,
        "running": running,
        "waiting": waiting,
    }})
{END_MARK}
"""


# ---------- Patch 2: hook into schedule() return -----------------
# Scheduler.schedule() ends with `return scheduler_output`. To pin the
# insertion to the end of schedule() (and not some other `return`), the
# anchor also spans the following blank line and the header of the method
# that comes next in the target file, `_agentic_emit_step_log`.
# NOTE(review): that method does not look like stock vLLM -- presumably it
# comes from another local patch; if it is absent, --apply fails with
# "anchor not found".
#
# Whitespace must match scheduler.py exactly. Assumed: 8-space indent for
# the return inside the method body, one blank line, 4-space indent for the
# next `def` (standard 4-space indentation -- confirm against the target).
SCHED_RET_TARGET = (
    "        return scheduler_output\n"
    "\n"
    "    def _agentic_emit_step_log("
)

# The snapshot call goes immediately before the return; the rest is the
# unchanged anchor text, so target and replacement cannot drift apart.
SCHED_RET_REPLACE = (
    f"        {START_MARK}\n"
    "        _mb5_snapshot(self)\n"
    f"        {END_MARK}\n"
) + SCHED_RET_TARGET

SCHED_PATCHES = [
    ("header", HEADER_ANCHOR, HEADER_ANCHOR + HEADER_INSERT),
    ("schedule() return", SCHED_RET_TARGET, SCHED_RET_REPLACE),
]


# ---------- Patch 3: vLLM 0.18.1 kv_consumer AttributeError fix --------------
# In MooncakeConnectorWorker.__init__, `self.bootstrap_server` is only assigned
# inside the `is_kv_producer` branch (around line 615). For kv_consumer roles
# the attribute is never set, but later code paths (e.g. line ~1060) check
# `if self.bootstrap_server is not None:` and raise AttributeError. We
# initialize it to None right after the `self.reqs_need_send` assignment in
# __init__, which is expected to come before the role-conditional branch.
# NOTE(review): if that line ever moves below the producer branch, this insert
# would overwrite the producer's value -- re-check on vLLM upgrades.
#
# Whitespace must match the target file; 8-space indentation for statements
# in __init__ is assumed (standard 4-space indentation -- confirm).
MOONCAKE_ANCHOR = (
    "        self.reqs_need_send: dict[TransferId, SendBlockMeta] = {}\n"
)
MOONCAKE_INSERT = (
    f"        {START_MARK}\n"
    "        self.bootstrap_server = None  # vLLM 0.18.1 kv_consumer fix\n"
    f"        {END_MARK}\n"
)

MOONCAKE_PATCHES = [
    ("kv_consumer bootstrap_server init", MOONCAKE_ANCHOR,
     MOONCAKE_ANCHOR + MOONCAKE_INSERT),
]


# ---------- Patch 4: vLLM 0.18.1 PD-consumer metrics counter underflow ------
# In PromptTokenStats.update_from_output, local_cache_hit is computed as
# (num_cached_tokens + recomputed - num_external_computed_tokens). On a
# kv_consumer, a remote KV transfer can report more external-computed tokens
# than the scheduler's cached count (esp. on a KV-load failure for a large
# request), driving local_cache_hit negative. loggers.record() then calls
# Counter.inc() with that negative value and prometheus_client raises
# "Counters can only be incremented by non-negative amounts.", which kills the
# EngineCore -- turning one failed request into a total config collapse.
# We clamp the per-source counts to >= 0 right after `pts` is bound, before
# they are recorded.
#
# Each field is read with getattr(..., 0) so the guard itself cannot raise
# AttributeError (and kill the EngineCore the same way) if a vLLM version
# lacks one of these fields, or if `pts` is None.
#
# Whitespace must match loggers.py exactly; 8-space indentation of the
# record() body is assumed -- confirm against the target file.
# NOTE(review): apply_one() patches only the FIRST occurrence of the anchor;
# confirm that occurrence is the record() that feeds Counter.inc().
LOGGERS_ANCHOR = "        pts = iteration_stats.prompt_token_stats\n"
LOGGERS_INSERT = (
    f"        {START_MARK}\n"
    '        for _mb5_attr in ("local_cache_hit", "computed",\n'
    '                          "external_kv_transfer"):\n'
    "            if getattr(pts, _mb5_attr, 0) < 0:\n"
    "                setattr(pts, _mb5_attr, 0)\n"
    f"        {END_MARK}\n"
)

LOGGERS_PATCHES = [
    ("PD-consumer counter underflow clamp", LOGGERS_ANCHOR,
     LOGGERS_ANCHOR + LOGGERS_INSERT),
]


# Every (relative target path, patch list) pair handled by
# --apply / --revert / --check, in processing order.
PATCH_FILES = [
    # scheduler.py: snapshot helpers + schedule() hook
    (TARGET_REL, SCHED_PATCHES),
    # mooncake_connector.py: kv_consumer bootstrap_server init
    (MOONCAKE_REL, MOONCAKE_PATCHES),
    # loggers.py: Prometheus counter underflow clamp
    (LOGGERS_REL, LOGGERS_PATCHES),
]


def find_target(
    venv_or_path: Path, rel_path: str, fallback: Path | None = None
) -> Path:
    """Locate ``rel_path`` under ``venv_or_path``, else under a fallback venv.

    Args:
        venv_or_path: Venv root searched first (normally the ``--venv`` value).
        rel_path: Path of the file to patch, relative to the venv root.
        fallback: Venv root tried when the file is missing under
            ``venv_or_path``; ``None`` (the default) means ``DEFAULT_VENV``.

    Returns:
        The first candidate path that is an existing regular file.

    Raises:
        FileNotFoundError: if no candidate exists; the message lists every
            path that was tried.
    """
    if fallback is None:
        fallback = DEFAULT_VENV
    primary = venv_or_path / rel_path
    candidates = [primary]
    secondary = fallback / rel_path
    if secondary != primary:  # avoid probing the same path twice
        candidates.append(secondary)
    for c in candidates:
        if c.is_file():
            if c is not primary:
                # A silent fallback would patch a different venv than the
                # one requested on the command line; make it visible.
                print(f"[mb5-instr] WARNING: {primary} not found; "
                      f"using fallback {c}")
            return c
    tried = ", ".join(str(c) for c in candidates)
    raise FileNotFoundError(
        f"cannot find {rel_path} under {venv_or_path} (tried: {tried})"
    )


def is_patched(text: str) -> bool:
    """Report whether ``text`` already carries MB5 instrumentation.

    Only the start sentinel is looked for; its presence anywhere in the text
    counts as "patched".
    """
    marker_found = text.find(START_MARK) != -1
    return marker_found


def apply_one(target: Path, patches: list) -> None:
    """Apply every ``(name, src, dst)`` patch to ``target`` in a single write.

    Each patch replaces the first occurrence of its anchor ``src`` with
    ``dst``. All replacements happen in memory. The file is written only if
    every anchor was found and the patched text still compiles, so a failure
    never leaves a half-patched or syntactically broken file behind.
    A file that is already patched is left untouched.

    Args:
        target: File to patch.
        patches: ``(name, src, dst)`` tuples, applied in order.

    Raises:
        RuntimeError: if an anchor is missing or the patched text is not
            valid Python.
    """
    text = target.read_text(encoding="utf-8")
    if is_patched(text):
        print(f"[mb5-instr] already patched: {target}")
        return
    new = text
    for name, src, dst in patches:
        hits = new.count(src)
        if hits == 0:
            raise RuntimeError(
                f"patch {name!r}: anchor not found in {target}."
            )
        if hits > 1:
            # Only the first occurrence is patched; flag it so an ambiguous
            # anchor does not silently land in the wrong place.
            print(f"[mb5-instr] WARNING: patch {name!r}: anchor occurs "
                  f"{hits} times in {target}; patching the first only")
        new = new.replace(src, dst, 1)
    try:
        # Anchors and inserts are whitespace-sensitive; refuse to write a
        # file that would no longer import.
        compile(new, str(target), "exec")
    except SyntaxError as err:
        raise RuntimeError(
            f"patched {target} would not compile: {err}"
        ) from err
    target.write_text(new, encoding="utf-8")
    print(f"[mb5-instr] applied {len(patches)} patches -> {target}")


def revert_one(target: Path) -> None:
    """Remove every MB5 START...END sentinel span from ``target``.

    Each span is removed together with its trailing newline. A span that
    starts at column 0 (the module-level header patch, whose insert begins
    with two newlines) is also removed together with the two newlines
    immediately before it. Nothing else in the file is touched. The old
    whole-file blank-line collapse reformatted every double blank line in
    the target, so it is gone.
    """
    text = target.read_text(encoding="utf-8")
    if not is_patched(text):
        print(f"[mb5-instr] not patched (nothing to revert): {target}")
        return
    start = re.escape(START_MARK)
    pat = re.compile(
        # Optional "\n\n" only when the marker itself starts at column 0, so
        # blank lines before an indented (in-method) span are never eaten.
        r"(?:\n\n(?=" + start + r"))?"
        r"[ \t]*" + start + r".*?" + re.escape(END_MARK) + r"\n?",
        flags=re.DOTALL,
    )
    new = pat.sub("", text)
    target.write_text(new, encoding="utf-8")
    print(f"[mb5-instr] reverted: {target}")


def main() -> None:
    """CLI entry point: apply, revert, or check every patch in PATCH_FILES."""
    p = argparse.ArgumentParser(
        description="Apply / revert / check MB5 instrumentation in a vLLM venv."
    )
    mode = p.add_mutually_exclusive_group()
    mode.add_argument("--apply", action="store_true")
    mode.add_argument("--revert", action="store_true")
    mode.add_argument("--check", action="store_true")
    p.add_argument("--venv", type=Path, default=DEFAULT_VENV)
    args = p.parse_args()
    # Validate the mode before touching the filesystem (previously this ran
    # only after find_target, so a missing file masked the usage error).
    if not (args.apply or args.revert or args.check):
        p.error("specify --apply / --revert / --check")

    # Resolve every target up front so a missing file aborts the run before
    # any file has been modified.
    targets = [
        (find_target(args.venv, rel_path), patches)
        for rel_path, patches in PATCH_FILES
    ]
    for target, patches in targets:
        if args.apply:
            apply_one(target, patches)
        elif args.revert:
            revert_one(target)
        else:
            text = target.read_text(encoding="utf-8")
            state = "PATCHED" if is_patched(text) else "CLEAN"
            print(f"[mb5-instr] {state}: {target}")


if __name__ == "__main__":
    main()