From c6b7c3471b537588177ef5b7886ed4f12f55a2d8 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Mon, 25 May 2026 17:54:24 +0800 Subject: [PATCH] B3: load_only + sticky policies, capped-trace builder, sweep driver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three additions land together because B3's whole point is comparing LMetric against meaningful controls. - scripts/cache_aware_proxy.py: two new --policy values. - load_only: pure min(num_requests) routing, no cache or affinity. The B3 control that strips locality so the LMetric-vs-load gap is legible. - sticky: first turn goes to min-load, subsequent turns ALWAYS return to the same instance, even under saturation. The B3 control that maxes out locality so the hot-spot cost is legible. - scripts/build_capped_trace.py: per-session turn cap (default 8). Generates the session-mass-equalized variant the TODO calls for so that hot-spot index can be re-measured with the heavy-tail removed. - scripts/b3_sweep.sh: orchestrates the 5-cell sweep. - GPU_INDICES makes it easy to skip a dead GPU. - EXTRA_VLLM_ARGS defaults to --enable-prompt-tokens-details so usage.prompt_tokens_details.cached_tokens is populated. vLLM 0.18.1 omits the field by default and breaks the reuse-decomp pipeline; the smoke run surfaced this. - Trap kills EngineCore by name in addition to "vllm serve" — the parent dies first but the child holds GPU memory. Was the root cause of the 89 GB ghost on GPU 0 earlier today. - Proxy readiness is a polling loop, not a fixed sleep. 
Co-Authored-By: Claude Opus 4.7 --- scripts/b3_sweep.sh | 177 ++++++++++++++++++++++++++++++++++ scripts/build_capped_trace.py | 80 +++++++++++++++ scripts/cache_aware_proxy.py | 54 ++++++++++- tests/test_proxy_pick.py | 42 ++++++++ 4 files changed, 352 insertions(+), 1 deletion(-) create mode 100755 scripts/b3_sweep.sh create mode 100644 scripts/build_capped_trace.py diff --git a/scripts/b3_sweep.sh b/scripts/b3_sweep.sh new file mode 100755 index 0000000..6e02702 --- /dev/null +++ b/scripts/b3_sweep.sh @@ -0,0 +1,177 @@ +#!/usr/bin/env bash +# B3 routing sweep: 5 policies on 8x TP1 instances with full instrumentation. +# +# Policies: +# lmetric — cache-aware P_tokens × BS routing (main baseline) +# load_only — pure min-num_requests (B3 control: no cache) +# sticky — hard session affinity (B3 control: perfect locality) +# unified — hybrid affinity + LMetric fallback +# capped — lmetric on a per-session turn-capped trace +# +# Each policy run produces metrics.jsonl + breakdown.json + worker_state.json +# + run_window.json (start/end unix timestamps so the analyzer can slice the +# shared engine_*.jsonl by time). + +set -euo pipefail + +ROOT="${ROOT:-/home/admin/cpfs/wjh/agentic-kv}" +VENV="$ROOT/.venv/bin" +MODEL="${MODEL:-/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}" +TRACE="${TRACE:-$ROOT/traces/w600_r0.0015_st30.jsonl}" +OUTDIR="${OUTDIR:-$ROOT/outputs/b3_sweep_$(date +%Y%m%d_%H%M%S)}" +PROXY_PORT="${PROXY_PORT:-9300}" +BASE_PORT="${BASE_PORT:-8000}" +# Space-separated list of GPU indices to use, one vLLM instance per index. +# Override via GPU_INDICES="1 2 3 4 5 6 7" when GPU 0 holds ghost memory. 
+GPU_INDICES="${GPU_INDICES:-0 1 2 3 4 5 6 7}" +POLICIES="${POLICIES:-lmetric load_only sticky unified}" +MAX_TURNS_CAP="${MAX_TURNS_CAP:-8}" +EXTRA_VLLM_ARGS="${EXTRA_VLLM_ARGS:---enable-prompt-tokens-details}" + +# Derive N_INSTANCES from GPU_INDICES +N_INSTANCES=$(echo $GPU_INDICES | wc -w) + +mkdir -p "$OUTDIR/engine_state" "$OUTDIR/logs" +echo "[b3_sweep] OUTDIR=$OUTDIR" + +cleanup() { + pkill -9 -f "vllm serve" 2>/dev/null || true + # vLLM spawns an EngineCore child whose process name is + # "VLLM::EngineCor" — pkill -f "vllm serve" misses it and leaves + # the GPU memory locked by a dead-but-tracked-by-driver context. + pkill -9 -f "EngineCore" 2>/dev/null || true + pkill -9 -f cache_aware_proxy 2>/dev/null || true + sleep 3 +} +trap cleanup EXIT + +# 1) Launch one vLLM per GPU index in GPU_INDICES; each emits engine_.jsonl +launch_vllm() { + echo "[b3_sweep] launching $N_INSTANCES vLLM instances on GPUs $GPU_INDICES ..." + local i=0 + for gpu in $GPU_INDICES; do + local port=$((BASE_PORT + i)) + local master=$((29500 + i)) + local log="$OUTDIR/logs/vllm_inst_${i}_gpu${gpu}.log" + AGENTIC_STEP_LOG_PATH="$OUTDIR/engine_state/engine_${i}.jsonl" \ + AGENTIC_WORKER_ID="engine_${i}" \ + CUDA_VISIBLE_DEVICES=$gpu \ + MASTER_PORT=$master \ + nohup "$VENV/vllm" serve "$MODEL" \ + --host 0.0.0.0 --port "$port" \ + --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching \ + --dtype auto --gpu-memory-utilization 0.9 \ + --max-model-len 200000 \ + $EXTRA_VLLM_ARGS \ + > "$log" 2>&1 & + disown + sleep 2 + i=$((i + 1)) + done + + echo "[b3_sweep] waiting for vLLM health ..." + for i in $(seq 0 $((N_INSTANCES - 1))); do + local port=$((BASE_PORT + i)) + local tries=0 + while ! 
curl -sf "http://127.0.0.1:$port/health" >/dev/null 2>&1; do + tries=$((tries + 1)) + if [ $tries -gt 90 ]; then + echo "[b3_sweep] FATAL: inst_$i (port $port) not healthy after 180s" + exit 1 + fi + sleep 2 + done + echo " inst_$i ready" + done +} + +launch_proxy() { + local policy="$1" + local logfile="$2" + local combined_args="" + for i in $(seq 0 $((N_INSTANCES - 1))); do + combined_args="$combined_args http://127.0.0.1:$((BASE_PORT + i))" + done + nohup "$VENV/python" "$ROOT/scripts/cache_aware_proxy.py" \ + --port "$PROXY_PORT" \ + --combined $combined_args \ + --policy "$policy" \ + > "$logfile" 2>&1 & + disown + local tries=0 + until curl -sf "http://127.0.0.1:$PROXY_PORT/stats" >/dev/null 2>&1; do + tries=$((tries + 1)) + if [ $tries -gt 30 ]; then + echo "[b3_sweep] FATAL: proxy did not come up in 60s" + tail -30 "$logfile" + exit 1 + fi + sleep 2 + done +} + +run_policy() { + local policy="$1" + local trace="$2" + local rundir="$OUTDIR/$policy" + mkdir -p "$rundir" + echo "[b3_sweep] === policy=$policy trace=$(basename "$trace") ===" + + pkill -9 -f cache_aware_proxy 2>/dev/null || true + sleep 2 + launch_proxy "${3:-$policy}" "$rundir/proxy.log" + + local t_start + t_start=$(date +%s.%N) + echo "{\"policy\": \"$policy\", \"trace\": \"$trace\", \"t_start_unix\": $t_start}" \ + > "$rundir/run_window.json.partial" + + PYTHONPATH="$ROOT" "$VENV/python" -m replayer \ + --trace "$trace" \ + --output "$rundir/metrics.jsonl" \ + --endpoint "http://127.0.0.1:$PROXY_PORT" \ + --model "$MODEL" \ + 2>&1 | tee "$rundir/replayer.log" | tail -3 + + local t_end + t_end=$(date +%s.%N) + python3 - "$rundir" "$policy" "$trace" "$t_start" "$t_end" <<'PY' +import json, sys +rundir, policy, trace, t_start, t_end = sys.argv[1:] +with open(f"{rundir}/run_window.json", "w") as f: + json.dump({ + "policy": policy, "trace": trace, + "t_start_unix": float(t_start), + "t_end_unix": float(t_end), + }, f, indent=2) +PY + rm -f "$rundir/run_window.json.partial" + + curl -s 
"http://127.0.0.1:$PROXY_PORT/breakdown" > "$rundir/breakdown.json" + curl -s "http://127.0.0.1:$PROXY_PORT/worker_state" > "$rundir/worker_state.json" + curl -s "http://127.0.0.1:$PROXY_PORT/stats" > "$rundir/stats.json" + echo "[b3_sweep] $policy done: $(wc -l < "$rundir/metrics.jsonl") metric rows" +} + +# 2) Run each policy +launch_vllm + +for policy in $POLICIES; do + run_policy "$policy" "$TRACE" +done + +# 3) Capped variant: "capped" only labels the output dir; the proxy has no such --policy, so arg 3 routes with lmetric +echo "[b3_sweep] building capped trace (max_turns=$MAX_TURNS_CAP) ..." +CAPPED_TRACE="$OUTDIR/capped/trace.jsonl" +mkdir -p "$OUTDIR/capped" +"$VENV/python" "$ROOT/scripts/build_capped_trace.py" \ + --input "$TRACE" \ + --output "$CAPPED_TRACE" \ + --max-turns "$MAX_TURNS_CAP" | tee "$OUTDIR/capped/build.log" +run_policy "capped" "$CAPPED_TRACE" lmetric + +# 4) Snapshot final engine state file sizes for the analyzer +ls -l "$OUTDIR/engine_state/" > "$OUTDIR/engine_state_files.txt" + +echo "[b3_sweep] sweep complete. OUTDIR=$OUTDIR" diff --git a/scripts/build_capped_trace.py b/scripts/build_capped_trace.py new file mode 100644 index 0000000..94665da --- /dev/null +++ b/scripts/build_capped_trace.py @@ -0,0 +1,80 @@ +"""Cap per-session turn count to isolate session-mass effects in B3. + +Input trace is grouped by session_id (or reconstructed from +parent_chat_id chains). Sessions with more than --max-turns turns are +truncated to keep only their first N turns in trace order. The output +preserves the original line ordering (timestamp order). 
+""" + +from __future__ import annotations + +import argparse +import json +from collections import defaultdict +from pathlib import Path + + +def _resolve_session_id(row: dict, chat_to_session: dict[int, str]) -> str: + if "session_id" in row: + return str(row["session_id"]) + cid = int(row["chat_id"]) + pcid = int(row["parent_chat_id"]) + if pcid < 0: + sid = str(cid) + else: + sid = chat_to_session.get(pcid, str(pcid)) + chat_to_session[cid] = sid + return sid + + +def main() -> None: + p = argparse.ArgumentParser(description="Cap per-session turn count") + p.add_argument("--input", type=Path, required=True) + p.add_argument("--output", type=Path, required=True) + p.add_argument("--max-turns", type=int, default=8, + help="Keep at most N earliest turns per session") + args = p.parse_args() + + chat_to_session: dict[int, str] = {} + kept: dict[str, int] = defaultdict(int) + rows: list[tuple[str, dict]] = [] + with args.input.open("r", encoding="utf-8") as fh: + for line in fh: + line = line.strip() + if not line: + continue + row = json.loads(line) + sid = _resolve_session_id(row, chat_to_session) + row["session_id"] = sid + rows.append((sid, row)) + + in_n = len(rows) + sessions = len({sid for sid, _ in rows}) + + # Pick each session's first N turns, then emit by timestamp; ties or missing timestamps keep input order. + by_turn = sorted(enumerate(rows), key=lambda x: (x[1][0], x[1][1].get("turn", 0))) + picked: list[int] = [] + for pos, (sid, _row) in by_turn: + if kept[sid] < args.max_turns: + kept[sid] += 1 + picked.append(pos) + capped_rows = [rows[i][1] for i in sorted(picked, key=lambda i: (rows[i][1].get("timestamp", 0.0), i))] + + args.output.parent.mkdir(parents=True, exist_ok=True) + with args.output.open("w", encoding="utf-8") as fh: + for r in capped_rows: + fh.write(json.dumps(r) + "\n") + + print(f"input rows: {in_n}, sessions: {sessions}") + print(f"capped rows: {len(capped_rows)} (max_turns={args.max_turns})") + dropped = in_n - len(capped_rows) + print(f"dropped: {dropped} ({100 * dropped / max(in_n, 1):.1f}%)") + if capped_rows: + from collections import Counter + turns_dist = Counter(kept[s] for s in 
kept) + top = sorted(turns_dist.items())[:6] + print(f"turns/session (capped) sample: {top}") + + +if __name__ == "__main__": + main() diff --git a/scripts/cache_aware_proxy.py b/scripts/cache_aware_proxy.py index 171de6d..314c445 100644 --- a/scripts/cache_aware_proxy.py +++ b/scripts/cache_aware_proxy.py @@ -178,6 +178,48 @@ def pick_instance(instances: list[InstanceState], token_ids: list[int] | None, return instances[best_idx], best_idx +def pick_instance_load_only( + instances: list[InstanceState], + token_ids: list[int] | None, + session_id: str | None, + input_length: int, + affinity: dict[str, int], +) -> tuple[InstanceState, int]: + """Pure load balancing: pick instance with fewest in-flight requests. + + Ignores cache hits and session affinity. Used as a B3 control to + isolate the locality contribution of cache-aware policies. + """ + best_idx = min(range(len(instances)), + key=lambda i: instances[i].num_requests) + return instances[best_idx], best_idx + + +def pick_instance_sticky( + instances: list[InstanceState], + token_ids: list[int] | None, + session_id: str | None, + input_length: int, + affinity: dict[str, int], +) -> tuple[InstanceState, int]: + """Hard session affinity: once assigned, never break. + + First turn of a session picks the instance with the lowest + num_requests; subsequent turns always return to the same instance + regardless of load. Used as a B3 control to isolate the hot-spot + cost of perfect locality. 
+ """ + if session_id and session_id in affinity: + idx = affinity[session_id] + if idx < len(instances): + return instances[idx], idx + best_idx = min(range(len(instances)), + key=lambda i: instances[i].num_requests) + if session_id: + affinity[session_id] = best_idx + return instances[best_idx], best_idx + + def pick_instance_lmetric(instances: list[InstanceState], token_ids: list[int] | None, session_id: str | None, input_length: int, affinity: dict[str, int]) -> tuple[InstanceState, int]: @@ -585,6 +627,14 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h chosen, best_idx = pick_instance_lmetric( combined_instances, token_ids, session_id, input_length, session_affinity_combined) + elif policy == "load_only": + chosen, best_idx = pick_instance_load_only( + combined_instances, token_ids, session_id, input_length, + session_affinity_combined) + elif policy == "sticky": + chosen, best_idx = pick_instance_sticky( + combined_instances, token_ids, session_id, input_length, + session_affinity_combined) elif policy == "unified": chosen, best_idx, decision = pick_instance_unified_hybrid( combined_instances, token_ids, session_id, input_length, @@ -799,8 +849,10 @@ def parse_args(): p.add_argument("--bootstrap-ports", type=str, default="", help="Comma-separated bootstrap ports for combined instances (for offload mode)") p.add_argument("--policy", type=str, default="linear", - choices=["linear", "lmetric", "unified"], + choices=["linear", "lmetric", "load_only", "sticky", "unified"], help="Routing policy: linear (cache-aware), lmetric (P_tokens × BS), " + "load_only (B3 control: pure min-num_requests), " + "sticky (B3 control: hard session affinity), " "or unified (hybrid affinity + LMetric fallback)") p.add_argument("--overload-factor", type=float, default=2.0, help="Break session affinity when instance load > factor * avg") diff --git a/tests/test_proxy_pick.py b/tests/test_proxy_pick.py index e076753..319558b 100644 --- 
a/tests/test_proxy_pick.py +++ b/tests/test_proxy_pick.py @@ -301,6 +301,48 @@ def test_settings_has_runtime_knobs(proxy): s.cache_gate_ratio = saved +def test_pick_instance_load_only_picks_min_num_requests(proxy): + insts = [_make_inst(proxy, "http://a"), _make_inst(proxy, "http://b"), + _make_inst(proxy, "http://c")] + insts[0].num_requests = 5 + insts[1].num_requests = 2 + insts[2].num_requests = 8 + chosen, idx = proxy.pick_instance_load_only(insts, None, "sess1", 1000, {}) + assert idx == 1 and chosen is insts[1] + + +def test_pick_instance_load_only_ignores_cache_hits(proxy): + insts = [_make_inst(proxy, "http://a"), _make_inst(proxy, "http://b")] + block_size = proxy.BLOCK_SIZE + prefix = [123] * (block_size * 4) + insts[0].record_prefix(prefix) + insts[0].num_requests = 10 + insts[1].num_requests = 0 + chosen, idx = proxy.pick_instance_load_only(insts, prefix, None, + len(prefix), {}) + assert idx == 1, "load_only must ignore cache hit on inst[0]" + + +def test_pick_instance_sticky_first_turn_picks_min_load(proxy): + insts = [_make_inst(proxy, "http://a"), _make_inst(proxy, "http://b")] + insts[0].num_requests = 10 + insts[1].num_requests = 2 + affinity = {} + chosen, idx = proxy.pick_instance_sticky(insts, None, "sess1", 1000, affinity) + assert idx == 1 + assert affinity == {"sess1": 1} + + +def test_pick_instance_sticky_subsequent_never_breaks(proxy): + """Once assigned, sticky must never re-route even under massive overload.""" + insts = [_make_inst(proxy, "http://a"), _make_inst(proxy, "http://b")] + affinity = {"sess1": 0} + insts[0].num_requests = 1_000_000 + insts[1].num_requests = 0 + chosen, idx = proxy.pick_instance_sticky(insts, None, "sess1", 1000, affinity) + assert idx == 0, "sticky must stay even when pinned instance is saturated" + + def test_p_offload_penalty_uses_settings_heavy_threshold(proxy): """M2: tweaking SETTINGS.heavy_threshold changes the P-offload penalty.""" inst = proxy.InstanceState("http://x")