A3: vLLM scheduler patch for step-level JSONL log

When AGENTIC_STEP_LOG_PATH is set, the scheduler emits one JSONL line
per scheduler step with t_unix, worker_id, prefill/decode token
counts, n_running/n_waiting, preempted ids, and per-request phase
labels. No-op when the env var is unset, so production engines are
not impacted. bench.sh now threads AGENTIC_STEP_LOG_DIR through to
each per-engine launch so step logs end up at engine_${i}.jsonl.

Required by Batch 2 (PD-colo interference index) and Batch 5
(same-worker overlap attribution); engine /metrics polling cannot
provide per-step granularity.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 16:19:11 +08:00
parent fe556b5d98
commit 5816aad731
3 changed files with 175 additions and 0 deletions

View File

@@ -161,12 +161,24 @@ launch_instances() {
use_mooncake=true
fi
# Optional: when AGENTIC_STEP_LOG_DIR is exported, point each engine at its
# own JSONL file so the patched scheduler emits per-step records.
local step_log_dir="${AGENTIC_STEP_LOG_DIR:-}"
if [ -n "$step_log_dir" ]; then
mkdir -p "$step_log_dir"
fi
for i in $(seq 0 $((N_INSTANCES - 1))); do
local port=$((BASE_PORT + i))
local master=$((29500 + i))
local logfile="$OUTDIR/vllm_inst_${i}.log"
local step_env=""
if [ -n "$step_log_dir" ]; then
step_env="AGENTIC_STEP_LOG_PATH=$step_log_dir/engine_${i}.jsonl AGENTIC_WORKER_ID=engine_${i}"
fi
if [ "$use_mooncake" = "true" ]; then
env $step_env \
PYTHONHASHSEED=42 \
VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998 + i)) \
MASTER_PORT=$master \
@@ -180,6 +192,7 @@ launch_instances() {
$vllm_extra_args \
> "$logfile" 2>&1 &
else
env $step_env \
MASTER_PORT=$master \
CUDA_VISIBLE_DEVICES=$i \
$VLLM serve "$MODEL" \

View File

@@ -0,0 +1,64 @@
"""Tests for A3 vLLM scheduler patch: per-step JSONL log hooks.
The vendored vLLM at third_party/vllm/vllm/v1/core/sched/scheduler.py must
contain the agentic-kv hooks. We test by inspecting the file on disk because
importing vllm.Scheduler requires the full GPU runtime — those imports are
expensive and not portable to CI. Hooks are anchor strings we control.
"""
from __future__ import annotations
import py_compile
from pathlib import Path
SCHEDULER_PATH = (
Path(__file__).resolve().parent.parent
/ "third_party" / "vllm" / "vllm" / "v1" / "core" / "sched" / "scheduler.py"
)
def test_scheduler_file_exists():
assert SCHEDULER_PATH.exists(), f"missing {SCHEDULER_PATH}"
def test_scheduler_syntactically_valid():
py_compile.compile(str(SCHEDULER_PATH), doraise=True)
def test_scheduler_has_agentic_step_log_init_hook():
src = SCHEDULER_PATH.read_text()
assert "AGENTIC_STEP_LOG_PATH" in src
assert "AGENTIC_WORKER_ID" in src
assert "self._agentic_step_log_fh" in src
assert "self._agentic_worker_id" in src
def test_scheduler_has_step_emit_helper():
src = SCHEDULER_PATH.read_text()
assert "def _agentic_emit_step_log(" in src
assert "prefill_tokens" in src
assert "decode_tokens" in src
assert "n_running_total" in src
assert "n_waiting" in src
assert "per_req" in src
def test_scheduler_emit_is_invoked_in_schedule_return_path():
"""The emit call must sit between _update_after_schedule and return."""
src = SCHEDULER_PATH.read_text()
assert "self._agentic_emit_step_log(" in src
update_idx = src.index("self._update_after_schedule(scheduler_output)")
emit_idx = src.index("self._agentic_emit_step_log(")
return_idx = src.index("return scheduler_output", update_idx)
assert update_idx < emit_idx < return_idx, (
"emit hook must be after update_after_schedule and before return"
)
def test_bench_script_threads_step_log_env():
bench = (
Path(__file__).resolve().parent.parent / "scripts" / "bench.sh"
).read_text()
assert "AGENTIC_STEP_LOG_DIR" in bench
assert "AGENTIC_STEP_LOG_PATH" in bench
assert "AGENTIC_WORKER_ID" in bench

View File

@@ -287,6 +287,29 @@ class Scheduler(SchedulerInterface):
self._pause_state: PauseState = PauseState.UNPAUSED
# agentic-kv: optional step-level JSONL logger for B2 interference
# analysis. Activated by env vars; no-op otherwise.
self._agentic_step_log_fh = None
self._agentic_step_id = 0
self._agentic_worker_id = None
import os as _os
_step_path = _os.environ.get("AGENTIC_STEP_LOG_PATH")
if _step_path:
try:
_os.makedirs(_os.path.dirname(_step_path) or ".", exist_ok=True)
self._agentic_step_log_fh = open(_step_path, "a", buffering=1)
self._agentic_worker_id = _os.environ.get(
"AGENTIC_WORKER_ID",
f"dp{self.parallel_config.data_parallel_rank}",
)
logger.info(
"agentic-kv step log enabled: path=%s worker_id=%s",
_step_path, self._agentic_worker_id,
)
except Exception as _exc:
logger.warning("agentic-kv step log disabled (%r)", _exc)
self._agentic_step_log_fh = None
def _mamba_block_aligned_split(
self,
request: Request,
@@ -926,8 +949,83 @@ class Scheduler(SchedulerInterface):
with record_function_or_nullcontext("schedule: update_after_schedule"):
self._update_after_schedule(scheduler_output)
if self._agentic_step_log_fh is not None:
self._agentic_emit_step_log(
scheduled_timestamp=scheduled_timestamp,
num_scheduled_tokens=num_scheduled_tokens,
total_num_scheduled_tokens=total_num_scheduled_tokens,
scheduled_new_reqs=scheduled_new_reqs,
scheduled_resumed_reqs=scheduled_resumed_reqs,
scheduled_running_reqs=scheduled_running_reqs,
preempted_reqs=preempted_reqs,
)
return scheduler_output
def _agentic_emit_step_log(
self,
scheduled_timestamp: float,
num_scheduled_tokens: dict[str, int],
total_num_scheduled_tokens: int,
scheduled_new_reqs: list[Request],
scheduled_resumed_reqs: list[Request],
scheduled_running_reqs: list[Request],
preempted_reqs: list[Request],
) -> None:
"""Emit one JSONL line per scheduler step for agentic-kv B2 analysis.
Cheap when enabled (a few dozen dict lookups + one write). When the
env var AGENTIC_STEP_LOG_PATH is unset the caller does not invoke
this method at all.
"""
import json as _json
import time as _time
new_ids = {r.request_id for r in scheduled_new_reqs}
resumed_ids = {r.request_id for r in scheduled_resumed_reqs}
running_ids = {r.request_id for r in scheduled_running_reqs}
per_req: list[dict[str, Any]] = []
prefill_tokens = 0
decode_tokens = 0
for rid, n in num_scheduled_tokens.items():
is_new = rid in new_ids
is_resumed = rid in resumed_ids
# Heuristic: any step touching a new/resumed request is prefill;
# otherwise per-step tokens >1 is chunked prefill, ==1 is decode.
if is_new or is_resumed or n > 1:
prefill_tokens += n
phase = "prefill"
else:
decode_tokens += n
phase = "decode"
per_req.append({
"rid": rid, "n": n, "phase": phase,
"is_new": is_new, "is_resumed": is_resumed,
})
record = {
"t_unix": _time.time(),
"t_monotonic": scheduled_timestamp,
"step_id": self._agentic_step_id,
"worker_id": self._agentic_worker_id,
"total_scheduled_tokens": total_num_scheduled_tokens,
"prefill_tokens": prefill_tokens,
"decode_tokens": decode_tokens,
"n_new": len(scheduled_new_reqs),
"n_resumed": len(scheduled_resumed_reqs),
"n_running_scheduled": len(scheduled_running_reqs),
"n_running_total": len(self.running),
"n_waiting": len(self.waiting),
"n_preempted": len(preempted_reqs),
"preempted_ids": [r.request_id for r in preempted_reqs],
"per_req": per_req,
}
try:
self._agentic_step_log_fh.write(_json.dumps(record) + "\n")
except Exception as _exc:
logger.warning("agentic-kv step log write failed (%r)", _exc)
self._agentic_step_log_fh = None
self._agentic_step_id += 1
def _preempt_request(self, request: Request, timestamp: float) -> None:
"""Preempt a request and put it back to the waiting queue.