B3: load_only + sticky policies, capped-trace builder, sweep driver

Three additions land together because B3's whole point is comparing
LMetric against meaningful controls.

- scripts/cache_aware_proxy.py: two new --policy values.
  - load_only: pure min(num_requests) routing, no cache or affinity.
    The B3 control that strips locality so the LMetric-vs-load gap is
    legible.
  - sticky: first turn goes to min-load, subsequent turns ALWAYS
    return to the same instance, even under saturation. The B3
    control that maxes out locality so the hot-spot cost is legible.
- scripts/build_capped_trace.py: per-session turn cap (default 8).
  Generates the session-mass-equalized variant the TODO calls for so
  that hot-spot index can be re-measured with the heavy-tail removed.
- scripts/b3_sweep.sh: orchestrates the 5-cell sweep.
  - GPU_INDICES makes it easy to skip a dead GPU.
  - EXTRA_VLLM_ARGS defaults to --enable-prompt-tokens-details so
    usage.prompt_tokens_details.cached_tokens is populated. vLLM
    0.18.1 omits the field by default and breaks the reuse-decomp
    pipeline; the smoke run surfaced this.
  - Trap kills EngineCore by name in addition to "vllm serve" — the
    parent dies first but the child holds GPU memory. Was the root
    cause of the 89 GB ghost on GPU 0 earlier today.
  - Proxy readiness is a polling loop, not a fixed sleep.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 17:54:24 +08:00
parent 763355b825
commit c6b7c3471b
4 changed files with 352 additions and 1 deletions

177
scripts/b3_sweep.sh Executable file
View File

@@ -0,0 +1,177 @@
#!/usr/bin/env bash
# B3 routing sweep: 5 policies on 8x TP1 instances with full instrumentation.
#
# Policies:
# lmetric — cache-aware P_tokens × BS routing (main baseline)
# load_only — pure min-num_requests (B3 control: no cache)
# sticky — hard session affinity (B3 control: perfect locality)
# unified — hybrid affinity + LMetric fallback
# capped — lmetric on a per-session turn-capped trace
#
# Each policy run produces metrics.jsonl + breakdown.json + worker_state.json
# + run_window.json (start/end unix timestamps so the analyzer can slice the
# shared engine_*.jsonl by time).
set -euo pipefail
ROOT="${ROOT:-/home/admin/cpfs/wjh/agentic-kv}"
VENV="$ROOT/.venv/bin"
MODEL="${MODEL:-/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}"
TRACE="${TRACE:-$ROOT/traces/w600_r0.0015_st30.jsonl}"
OUTDIR="${OUTDIR:-$ROOT/outputs/b3_sweep_$(date +%Y%m%d_%H%M%S)}"
PROXY_PORT="${PROXY_PORT:-9300}"
BASE_PORT="${BASE_PORT:-8000}"
# Space-separated list of GPU indices to use, one vLLM instance per index.
# Override via GPU_INDICES="1 2 3 4 5 6 7" when GPU 0 holds ghost memory.
GPU_INDICES="${GPU_INDICES:-0 1 2 3 4 5 6 7}"
POLICIES="${POLICIES:-lmetric load_only sticky unified}"
MAX_TURNS_CAP="${MAX_TURNS_CAP:-8}"
EXTRA_VLLM_ARGS="${EXTRA_VLLM_ARGS:---enable-prompt-tokens-details}"
# Derive N_INSTANCES from GPU_INDICES
N_INSTANCES=$(echo $GPU_INDICES | wc -w)
mkdir -p "$OUTDIR/engine_state" "$OUTDIR/logs"
echo "[b3_sweep] OUTDIR=$OUTDIR"
cleanup() {
pkill -9 -f "vllm serve" 2>/dev/null || true
# vLLM spawns an EngineCore child whose process name is
# "VLLM::EngineCor" — pkill -f "vllm serve" misses it and leaves
# the GPU memory locked by a dead-but-tracked-by-driver context.
pkill -9 -f "EngineCore" 2>/dev/null || true
pkill -9 -f cache_aware_proxy 2>/dev/null || true
sleep 3
}
trap cleanup EXIT
# 1) Launch one vLLM per GPU index in GPU_INDICES; each emits engine_<i>.jsonl
launch_vllm() {
echo "[b3_sweep] launching $N_INSTANCES vLLM instances on GPUs $GPU_INDICES ..."
local i=0
for gpu in $GPU_INDICES; do
local port=$((BASE_PORT + i))
local master=$((29500 + i))
local log="$OUTDIR/logs/vllm_inst_${i}_gpu${gpu}.log"
AGENTIC_STEP_LOG_PATH="$OUTDIR/engine_state/engine_${i}.jsonl" \
AGENTIC_WORKER_ID="engine_${i}" \
CUDA_VISIBLE_DEVICES=$gpu \
MASTER_PORT=$master \
nohup "$VENV/vllm" serve "$MODEL" \
--host 0.0.0.0 --port "$port" \
--tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching \
--dtype auto --gpu-memory-utilization 0.9 \
--max-model-len 200000 \
$EXTRA_VLLM_ARGS \
> "$log" 2>&1 &
disown
sleep 2
i=$((i + 1))
done
echo "[b3_sweep] waiting for vLLM health ..."
for i in $(seq 0 $((N_INSTANCES - 1))); do
local port=$((BASE_PORT + i))
local tries=0
while ! curl -sf "http://127.0.0.1:$port/health" >/dev/null 2>&1; do
tries=$((tries + 1))
if [ $tries -gt 90 ]; then
echo "[b3_sweep] FATAL: inst_$i (port $port) not healthy after 180s"
exit 1
fi
sleep 2
done
echo " inst_$i ready"
done
}
launch_proxy() {
local policy="$1"
local logfile="$2"
local combined_args=""
for i in $(seq 0 $((N_INSTANCES - 1))); do
combined_args="$combined_args http://127.0.0.1:$((BASE_PORT + i))"
done
nohup "$VENV/python" "$ROOT/scripts/cache_aware_proxy.py" \
--port "$PROXY_PORT" \
--combined $combined_args \
--policy "$policy" \
> "$logfile" 2>&1 &
disown
local tries=0
until curl -sf "http://127.0.0.1:$PROXY_PORT/stats" >/dev/null 2>&1; do
tries=$((tries + 1))
if [ $tries -gt 30 ]; then
echo "[b3_sweep] FATAL: proxy did not come up in 60s"
tail -30 "$logfile"
exit 1
fi
sleep 2
done
}
run_policy() {
local policy="$1"
local trace="$2"
local rundir="$OUTDIR/$policy"
mkdir -p "$rundir"
echo "[b3_sweep] === policy=$policy trace=$(basename "$trace") ==="
pkill -9 -f cache_aware_proxy 2>/dev/null || true
sleep 2
launch_proxy "$policy" "$rundir/proxy.log"
local t_start
t_start=$(date +%s.%N)
echo "{\"policy\": \"$policy\", \"trace\": \"$trace\", \"t_start_unix\": $t_start}" \
> "$rundir/run_window.json.partial"
PYTHONPATH="$ROOT" "$VENV/python" -m replayer \
--trace "$trace" \
--output "$rundir/metrics.jsonl" \
--endpoint "http://127.0.0.1:$PROXY_PORT" \
--model "$MODEL" \
2>&1 | tee "$rundir/replayer.log" | tail -3
local t_end
t_end=$(date +%s.%N)
python3 - "$rundir" "$policy" "$trace" "$t_start" "$t_end" <<'PY'
import json, sys
rundir, policy, trace, t_start, t_end = sys.argv[1:]
with open(f"{rundir}/run_window.json", "w") as f:
json.dump({
"policy": policy, "trace": trace,
"t_start_unix": float(t_start),
"t_end_unix": float(t_end),
}, f, indent=2)
PY
rm -f "$rundir/run_window.json.partial"
curl -s "http://127.0.0.1:$PROXY_PORT/breakdown" > "$rundir/breakdown.json"
curl -s "http://127.0.0.1:$PROXY_PORT/worker_state" > "$rundir/worker_state.json"
curl -s "http://127.0.0.1:$PROXY_PORT/stats" > "$rundir/stats.json"
echo "[b3_sweep] $policy done: $(wc -l < "$rundir/metrics.jsonl") metric rows"
}
# 2) Run each policy
launch_vllm
for policy in $POLICIES; do
run_policy "$policy" "$TRACE"
done
# 3) Capped variant on lmetric
echo "[b3_sweep] building capped trace (max_turns=$MAX_TURNS_CAP) ..."
CAPPED_TRACE="$OUTDIR/capped/trace.jsonl"
mkdir -p "$OUTDIR/capped"
"$VENV/python" "$ROOT/scripts/build_capped_trace.py" \
--input "$TRACE" \
--output "$CAPPED_TRACE" \
--max-turns "$MAX_TURNS_CAP" | tee "$OUTDIR/capped/build.log"
run_policy "capped" "$CAPPED_TRACE"
# 4) Snapshot final engine state file sizes for the analyzer
ls -l "$OUTDIR/engine_state/" > "$OUTDIR/engine_state_files.txt"
echo "[b3_sweep] sweep complete. OUTDIR=$OUTDIR"

View File

@@ -0,0 +1,80 @@
"""Cap per-session turn count to isolate session-mass effects in B3.
Input trace is grouped by session_id (or reconstructed from
parent_chat_id chains). Sessions with more than --max-turns turns are
truncated to keep only their first N turns in trace order. The output
preserves the original line ordering (timestamp order).
"""
from __future__ import annotations
import argparse
import json
from collections import defaultdict
from pathlib import Path
def _resolve_session_id(row: dict, chat_to_session: dict[int, str]) -> str:
if "session_id" in row:
return str(row["session_id"])
cid = int(row["chat_id"])
pcid = int(row["parent_chat_id"])
if pcid < 0:
sid = str(cid)
else:
sid = chat_to_session.get(pcid, str(pcid))
chat_to_session[cid] = sid
return sid
def main() -> None:
p = argparse.ArgumentParser(description="Cap per-session turn count")
p.add_argument("--input", type=Path, required=True)
p.add_argument("--output", type=Path, required=True)
p.add_argument("--max-turns", type=int, default=8,
help="Keep at most N earliest turns per session")
args = p.parse_args()
chat_to_session: dict[int, str] = {}
kept: dict[str, int] = defaultdict(int)
rows: list[tuple[str, dict]] = []
with args.input.open("r", encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if not line:
continue
row = json.loads(line)
sid = _resolve_session_id(row, chat_to_session)
row["session_id"] = sid
rows.append((sid, row))
in_n = len(rows)
sessions = len({sid for sid, _ in rows})
rows.sort(key=lambda x: (x[1]["session_id"], x[1].get("turn", 0)))
capped_rows: list[dict] = []
for sid, row in rows:
if kept[sid] >= args.max_turns:
continue
kept[sid] += 1
capped_rows.append(row)
capped_rows.sort(key=lambda r: r.get("timestamp", 0.0))
args.output.parent.mkdir(parents=True, exist_ok=True)
with args.output.open("w", encoding="utf-8") as fh:
for r in capped_rows:
fh.write(json.dumps(r) + "\n")
print(f"input rows: {in_n}, sessions: {sessions}")
print(f"capped rows: {len(capped_rows)} (max_turns={args.max_turns})")
dropped = in_n - len(capped_rows)
print(f"dropped: {dropped} ({100 * dropped / max(in_n, 1):.1f}%)")
if capped_rows:
from collections import Counter
turns_dist = Counter(kept[s] for s in kept)
top = sorted(turns_dist.items())[:6]
print(f"turns/session (capped) sample: {top}")
if __name__ == "__main__":
main()

View File

@@ -178,6 +178,48 @@ def pick_instance(instances: list[InstanceState], token_ids: list[int] | None,
return instances[best_idx], best_idx
def pick_instance_load_only(
instances: list[InstanceState],
token_ids: list[int] | None,
session_id: str | None,
input_length: int,
affinity: dict[str, int],
) -> tuple[InstanceState, int]:
"""Pure load balancing: pick instance with fewest in-flight requests.
Ignores cache hits and session affinity. Used as a B3 control to
isolate the locality contribution of cache-aware policies.
"""
best_idx = min(range(len(instances)),
key=lambda i: instances[i].num_requests)
return instances[best_idx], best_idx
def pick_instance_sticky(
instances: list[InstanceState],
token_ids: list[int] | None,
session_id: str | None,
input_length: int,
affinity: dict[str, int],
) -> tuple[InstanceState, int]:
"""Hard session affinity: once assigned, never break.
First turn of a session picks the instance with the lowest
num_requests; subsequent turns always return to the same instance
regardless of load. Used as a B3 control to isolate the hot-spot
cost of perfect locality.
"""
if session_id and session_id in affinity:
idx = affinity[session_id]
if idx < len(instances):
return instances[idx], idx
best_idx = min(range(len(instances)),
key=lambda i: instances[i].num_requests)
if session_id:
affinity[session_id] = best_idx
return instances[best_idx], best_idx
def pick_instance_lmetric(instances: list[InstanceState], token_ids: list[int] | None,
session_id: str | None, input_length: int,
affinity: dict[str, int]) -> tuple[InstanceState, int]:
@@ -585,6 +627,14 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h
chosen, best_idx = pick_instance_lmetric(
combined_instances, token_ids, session_id, input_length,
session_affinity_combined)
elif policy == "load_only":
chosen, best_idx = pick_instance_load_only(
combined_instances, token_ids, session_id, input_length,
session_affinity_combined)
elif policy == "sticky":
chosen, best_idx = pick_instance_sticky(
combined_instances, token_ids, session_id, input_length,
session_affinity_combined)
elif policy == "unified":
chosen, best_idx, decision = pick_instance_unified_hybrid(
combined_instances, token_ids, session_id, input_length,
@@ -799,8 +849,10 @@ def parse_args():
p.add_argument("--bootstrap-ports", type=str, default="",
help="Comma-separated bootstrap ports for combined instances (for offload mode)")
p.add_argument("--policy", type=str, default="linear",
choices=["linear", "lmetric", "unified"],
choices=["linear", "lmetric", "load_only", "sticky", "unified"],
help="Routing policy: linear (cache-aware), lmetric (P_tokens × BS), "
"load_only (B3 control: pure min-num_requests), "
"sticky (B3 control: hard session affinity), "
"or unified (hybrid affinity + LMetric fallback)")
p.add_argument("--overload-factor", type=float, default=2.0,
help="Break session affinity when instance load > factor * avg")

View File

@@ -301,6 +301,48 @@ def test_settings_has_runtime_knobs(proxy):
s.cache_gate_ratio = saved
def test_pick_instance_load_only_picks_min_num_requests(proxy):
insts = [_make_inst(proxy, "http://a"), _make_inst(proxy, "http://b"),
_make_inst(proxy, "http://c")]
insts[0].num_requests = 5
insts[1].num_requests = 2
insts[2].num_requests = 8
chosen, idx = proxy.pick_instance_load_only(insts, None, "sess1", 1000, {})
assert idx == 1 and chosen is insts[1]
def test_pick_instance_load_only_ignores_cache_hits(proxy):
insts = [_make_inst(proxy, "http://a"), _make_inst(proxy, "http://b")]
block_size = proxy.BLOCK_SIZE
prefix = [123] * (block_size * 4)
insts[0].record_prefix(prefix)
insts[0].num_requests = 10
insts[1].num_requests = 0
chosen, idx = proxy.pick_instance_load_only(insts, prefix, None,
len(prefix), {})
assert idx == 1, "load_only must ignore cache hit on inst[0]"
def test_pick_instance_sticky_first_turn_picks_min_load(proxy):
insts = [_make_inst(proxy, "http://a"), _make_inst(proxy, "http://b")]
insts[0].num_requests = 10
insts[1].num_requests = 2
affinity = {}
chosen, idx = proxy.pick_instance_sticky(insts, None, "sess1", 1000, affinity)
assert idx == 1
assert affinity == {"sess1": 1}
def test_pick_instance_sticky_subsequent_never_breaks(proxy):
"""Once assigned, sticky must never re-route even under massive overload."""
insts = [_make_inst(proxy, "http://a"), _make_inst(proxy, "http://b")]
affinity = {"sess1": 0}
insts[0].num_requests = 1_000_000
insts[1].num_requests = 0
chosen, idx = proxy.pick_instance_sticky(insts, None, "sess1", 1000, affinity)
assert idx == 0, "sticky must stay even when pinned instance is saturated"
def test_p_offload_penalty_uses_settings_heavy_threshold(proxy):
"""M2: tweaking SETTINGS.heavy_threshold changes the P-offload penalty."""
inst = proxy.InstanceState("http://x")