When AGENTIC_STEP_LOG_PATH is set, the scheduler emits one JSONL line
per scheduler step with t_unix, worker_id, prefill/decode token
counts, n_running/n_waiting, preempted ids, and per-request phase
labels. No-op when the env var is unset, so production engines are
not impacted. bench.sh now threads AGENTIC_STEP_LOG_DIR through to
each per-engine launch so step logs end up at engine_${i}.jsonl.
Required by Batch 2 (PD-colo interference index) and Batch 5
(same-worker overlap attribution); engine /metrics polling cannot
provide per-step granularity.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
65 lines
2.1 KiB
Python
65 lines
2.1 KiB
Python
"""Tests for A3 vLLM scheduler patch: per-step JSONL log hooks.
|
|
|
|
The vendored vLLM at third_party/vllm/vllm/v1/core/sched/scheduler.py must
|
|
contain the agentic-kv hooks. We test by inspecting the file on disk because
|
|
importing vllm.Scheduler requires the full GPU runtime — those imports are
|
|
expensive and not portable to CI. Hooks are anchor strings we control.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import py_compile
|
|
from pathlib import Path
|
|
|
|
SCHEDULER_PATH = (
|
|
Path(__file__).resolve().parent.parent
|
|
/ "third_party" / "vllm" / "vllm" / "v1" / "core" / "sched" / "scheduler.py"
|
|
)
|
|
|
|
|
|
def test_scheduler_file_exists():
|
|
assert SCHEDULER_PATH.exists(), f"missing {SCHEDULER_PATH}"
|
|
|
|
|
|
def test_scheduler_syntactically_valid():
|
|
py_compile.compile(str(SCHEDULER_PATH), doraise=True)
|
|
|
|
|
|
def test_scheduler_has_agentic_step_log_init_hook():
|
|
src = SCHEDULER_PATH.read_text()
|
|
assert "AGENTIC_STEP_LOG_PATH" in src
|
|
assert "AGENTIC_WORKER_ID" in src
|
|
assert "self._agentic_step_log_fh" in src
|
|
assert "self._agentic_worker_id" in src
|
|
|
|
|
|
def test_scheduler_has_step_emit_helper():
|
|
src = SCHEDULER_PATH.read_text()
|
|
assert "def _agentic_emit_step_log(" in src
|
|
assert "prefill_tokens" in src
|
|
assert "decode_tokens" in src
|
|
assert "n_running_total" in src
|
|
assert "n_waiting" in src
|
|
assert "per_req" in src
|
|
|
|
|
|
def test_scheduler_emit_is_invoked_in_schedule_return_path():
|
|
"""The emit call must sit between _update_after_schedule and return."""
|
|
src = SCHEDULER_PATH.read_text()
|
|
assert "self._agentic_emit_step_log(" in src
|
|
update_idx = src.index("self._update_after_schedule(scheduler_output)")
|
|
emit_idx = src.index("self._agentic_emit_step_log(")
|
|
return_idx = src.index("return scheduler_output", update_idx)
|
|
assert update_idx < emit_idx < return_idx, (
|
|
"emit hook must be after update_after_schedule and before return"
|
|
)
|
|
|
|
|
|
def test_bench_script_threads_step_log_env():
|
|
bench = (
|
|
Path(__file__).resolve().parent.parent / "scripts" / "bench.sh"
|
|
).read_text()
|
|
assert "AGENTIC_STEP_LOG_DIR" in bench
|
|
assert "AGENTIC_STEP_LOG_PATH" in bench
|
|
assert "AGENTIC_WORKER_ID" in bench
|