diff --git a/scripts/bench.sh b/scripts/bench.sh
index 57f8df4..856a553 100755
--- a/scripts/bench.sh
+++ b/scripts/bench.sh
@@ -161,12 +161,30 @@ launch_instances() {
         use_mooncake=true
     fi
 
+    # Optional: when AGENTIC_STEP_LOG_DIR is exported, point each engine at its
+    # own JSONL file so the patched scheduler emits per-step records.
+    local step_log_dir="${AGENTIC_STEP_LOG_DIR:-}"
+    if [ -n "$step_log_dir" ]; then
+        mkdir -p "$step_log_dir"
+    fi
+
     for i in $(seq 0 $((N_INSTANCES - 1))); do
         local port=$((BASE_PORT + i))
         local master=$((29500 + i))
         local logfile="$OUTDIR/vllm_inst_${i}.log"
+        # Per-engine log path. Stays empty when step logging is off, so the
+        # ${step_log_path:+...} expansions below add no arguments to env.
+        local step_log_path=""
+        if [ -n "$step_log_dir" ]; then
+            step_log_path="$step_log_dir/engine_${i}.jsonl"
+        fi
 
         if [ "$use_mooncake" = "true" ]; then
+            # The inner quotes keep each assignment a single word even when
+            # the log directory contains spaces or glob characters.
+            env \
+                ${step_log_path:+"AGENTIC_STEP_LOG_PATH=$step_log_path"} \
+                ${step_log_path:+"AGENTIC_WORKER_ID=engine_${i}"} \
             PYTHONHASHSEED=42 \
             VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998 + i)) \
             MASTER_PORT=$master \
@@ -180,6 +198,9 @@ launch_instances() {
                 $vllm_extra_args \
             > "$logfile" 2>&1 &
         else
+            env \
+                ${step_log_path:+"AGENTIC_STEP_LOG_PATH=$step_log_path"} \
+                ${step_log_path:+"AGENTIC_WORKER_ID=engine_${i}"} \
             MASTER_PORT=$master \
             CUDA_VISIBLE_DEVICES=$i \
             $VLLM serve "$MODEL" \
diff --git a/tests/test_scheduler_step_log_patch.py b/tests/test_scheduler_step_log_patch.py
new file mode 100644
index 0000000..ea265a3
--- /dev/null
+++ b/tests/test_scheduler_step_log_patch.py
@@ -0,0 +1,76 @@
+"""Tests for A3 vLLM scheduler patch: per-step JSONL log hooks.
+
+The vendored vLLM at third_party/vllm/vllm/v1/core/sched/scheduler.py must
+contain the agentic-kv hooks. We test by inspecting the file on disk because
+importing vllm.Scheduler requires the full GPU runtime — those imports are
+expensive and not portable to CI. Hooks are anchor strings we control.
+"""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+REPO_ROOT = Path(__file__).resolve().parent.parent
+SCHEDULER_PATH = (
+    REPO_ROOT
+    / "third_party" / "vllm" / "vllm" / "v1" / "core" / "sched" / "scheduler.py"
+)
+BENCH_PATH = REPO_ROOT / "scripts" / "bench.sh"
+
+
+def _read(path: Path) -> str:
+    """Return the text of *path* decoded as UTF-8, whatever the CI locale."""
+    return path.read_text(encoding="utf-8")
+
+
+def test_scheduler_file_exists():
+    assert SCHEDULER_PATH.exists(), f"missing {SCHEDULER_PATH}"
+
+
+def test_scheduler_syntactically_valid():
+    # The built-in compile() checks syntax in memory; py_compile.compile()
+    # would additionally write a .pyc next to the vendored source.
+    # dont_inherit keeps this module's __future__ flags out of the check.
+    compile(
+        _read(SCHEDULER_PATH), str(SCHEDULER_PATH), "exec", dont_inherit=True
+    )
+
+
+def test_scheduler_has_agentic_step_log_init_hook():
+    src = _read(SCHEDULER_PATH)
+    assert "AGENTIC_STEP_LOG_PATH" in src
+    assert "AGENTIC_WORKER_ID" in src
+    assert "self._agentic_step_log_fh" in src
+    assert "self._agentic_worker_id" in src
+
+
+def test_scheduler_has_step_emit_helper():
+    src = _read(SCHEDULER_PATH)
+    assert "def _agentic_emit_step_log(" in src
+    assert "prefill_tokens" in src
+    assert "decode_tokens" in src
+    assert "n_running_total" in src
+    assert "n_waiting" in src
+    assert "per_req" in src
+
+
+def test_scheduler_emit_is_invoked_in_schedule_return_path():
+    """The emit call must sit between _update_after_schedule and return."""
+    src = _read(SCHEDULER_PATH)
+    update_anchor = "self._update_after_schedule(scheduler_output)"
+    assert update_anchor in src, "schedule() anchor not found"
+    update_idx = src.index(update_anchor)
+    # Search forward from the anchor; find() yields -1 on a miss, which makes
+    # the ordering assertion fail with its message instead of a ValueError.
+    emit_idx = src.find("self._agentic_emit_step_log(", update_idx)
+    return_idx = src.find("return scheduler_output", update_idx)
+    assert update_idx < emit_idx < return_idx, (
+        "emit hook must be after update_after_schedule and before return"
+    )
+
+
+def test_bench_script_threads_step_log_env():
+    bench = _read(BENCH_PATH)
+    assert "AGENTIC_STEP_LOG_DIR" in bench
+    assert "AGENTIC_STEP_LOG_PATH" in bench
+    assert "AGENTIC_WORKER_ID" in bench
diff --git a/third_party/vllm/vllm/v1/core/sched/scheduler.py b/third_party/vllm/vllm/v1/core/sched/scheduler.py
index 528fc40..6f931ad 100644
--- a/third_party/vllm/vllm/v1/core/sched/scheduler.py
+++ b/third_party/vllm/vllm/v1/core/sched/scheduler.py
@@ -287,6 +287,40 @@ class Scheduler(SchedulerInterface):
 
         self._pause_state: PauseState = PauseState.UNPAUSED
 
+        # agentic-kv: optional step-level JSONL logger for B2 interference
+        # analysis. Activated by env vars; no-op otherwise.
+        self._agentic_step_log_fh = None
+        self._agentic_step_id = 0
+        self._agentic_worker_id = None
+        import os as _os
+        _step_path = _os.environ.get("AGENTIC_STEP_LOG_PATH")
+        if _step_path:
+            _step_fh = None
+            try:
+                _os.makedirs(_os.path.dirname(_step_path) or ".", exist_ok=True)
+                # Line-buffered append; explicit UTF-8 so the output does not
+                # depend on the process locale.
+                _step_fh = open(
+                    _step_path, "a", buffering=1, encoding="utf-8"
+                )
+                # Resolve the fallback lazily so an explicit AGENTIC_WORKER_ID
+                # never touches parallel_config.
+                _worker_id = _os.environ.get("AGENTIC_WORKER_ID")
+                if _worker_id is None:
+                    _worker_id = f"dp{self.parallel_config.data_parallel_rank}"
+                logger.info(
+                    "agentic-kv step log enabled: path=%s worker_id=%s",
+                    _step_path, _worker_id,
+                )
+                # Publish only after every step above succeeded.
+                self._agentic_worker_id = _worker_id
+                self._agentic_step_log_fh = _step_fh
+            except Exception as _exc:
+                logger.warning("agentic-kv step log disabled (%r)", _exc)
+                # Do not leak the handle if setup failed after open().
+                if _step_fh is not None:
+                    _step_fh.close()
+
     def _mamba_block_aligned_split(
         self,
         request: Request,
@@ -926,8 +960,98 @@ class Scheduler(SchedulerInterface):
 
         with record_function_or_nullcontext("schedule: update_after_schedule"):
             self._update_after_schedule(scheduler_output)
+
+        if self._agentic_step_log_fh is not None:
+            self._agentic_emit_step_log(
+                scheduled_timestamp=scheduled_timestamp,
+                num_scheduled_tokens=num_scheduled_tokens,
+                total_num_scheduled_tokens=total_num_scheduled_tokens,
+                scheduled_new_reqs=scheduled_new_reqs,
+                scheduled_resumed_reqs=scheduled_resumed_reqs,
+                scheduled_running_reqs=scheduled_running_reqs,
+                preempted_reqs=preempted_reqs,
+            )
+
         return scheduler_output
 
+    def _agentic_emit_step_log(
+        self,
+        scheduled_timestamp: float,
+        num_scheduled_tokens: dict[str, int],
+        total_num_scheduled_tokens: int,
+        scheduled_new_reqs: list[Request],
+        scheduled_resumed_reqs: list[Request],
+        scheduled_running_reqs: list[Request],
+        preempted_reqs: list[Request],
+    ) -> None:
+        """Emit one JSONL line per scheduler step for agentic-kv B2 analysis.
+
+        Cheap when enabled (a few dozen dict lookups + one write). When the
+        env var AGENTIC_STEP_LOG_PATH is unset the caller does not invoke
+        this method at all.
+
+        Best-effort: if the write fails, the failure is logged, the file is
+        closed and step logging stays disabled afterwards.
+        """
+        fh = self._agentic_step_log_fh
+        if fh is None:
+            # Never enabled, or disabled by an earlier write failure.
+            return
+        import json as _json
+        import time as _time
+        new_ids = {r.request_id for r in scheduled_new_reqs}
+        resumed_ids = {r.request_id for r in scheduled_resumed_reqs}
+        per_req: list[dict[str, Any]] = []
+        prefill_tokens = 0
+        decode_tokens = 0
+        for rid, n in num_scheduled_tokens.items():
+            is_new = rid in new_ids
+            is_resumed = rid in resumed_ids
+            # Heuristic: any step touching a new/resumed request is prefill;
+            # otherwise per-step tokens >1 is chunked prefill, ==1 is decode.
+            # NOTE(review): a decode step that schedules more than one token
+            # for a request (n > 1) is counted as prefill here; confirm that
+            # cannot occur in the runs being analysed.
+            if is_new or is_resumed or n > 1:
+                prefill_tokens += n
+                phase = "prefill"
+            else:
+                decode_tokens += n
+                phase = "decode"
+            per_req.append({
+                "rid": rid, "n": n, "phase": phase,
+                "is_new": is_new, "is_resumed": is_resumed,
+            })
+        record = {
+            "t_unix": _time.time(),
+            "t_monotonic": scheduled_timestamp,
+            "step_id": self._agentic_step_id,
+            "worker_id": self._agentic_worker_id,
+            "total_scheduled_tokens": total_num_scheduled_tokens,
+            "prefill_tokens": prefill_tokens,
+            "decode_tokens": decode_tokens,
+            "n_new": len(scheduled_new_reqs),
+            "n_resumed": len(scheduled_resumed_reqs),
+            "n_running_scheduled": len(scheduled_running_reqs),
+            "n_running_total": len(self.running),
+            "n_waiting": len(self.waiting),
+            "n_preempted": len(preempted_reqs),
+            "preempted_ids": [r.request_id for r in preempted_reqs],
+            "per_req": per_req,
+        }
+        try:
+            fh.write(_json.dumps(record) + "\n")
+        except Exception as _exc:
+            logger.warning("agentic-kv step log write failed (%r)", _exc)
+            self._agentic_step_log_fh = None
+            try:
+                fh.close()
+            except Exception:
+                # close() retries the flush that just failed; logging is
+                # already off, so there is nothing further to do.
+                pass
+        self._agentic_step_id += 1
+
     def _preempt_request(self, request: Request, timestamp: float) -> None:
         """Preempt a request and put it back to the waiting queue.
 