From 5b26c345f4b6678ee1abfd2222ffe854f3880fb4 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Thu, 28 May 2026 20:21:12 +0800 Subject: [PATCH] P2: all routing policies read real state via eff_ accessors + ablation harness InstanceState.eff_{num_requests,pending_prefill,ongoing_decode,ongoing_tokens} = max(shadow, real) when feed fresh (fixes 30s-stale under-count, keeps in-flight RaceFix), plus real-only r_max_prefill_remaining / r_kv_used_frac. Wired into load_only, lmetric, sticky, unified(_kv_both), unified_v3, and snapshot logging. Feed off => identical to before. run_v3_trace.sh gains ES=1 toggle (always deploys enhanced proxy); run_ablation_es.sh runs each config ES0-vs-ES1 to test whether real state changes policy performance/ranking. All unit-tested without GPU. Co-Authored-By: Claude Opus 4.7 --- .../layerwise/P2_ENGINE_STATE.md | 29 +++++- .../layerwise/cache_aware_proxy.WRITEMODE.py | 97 ++++++++++++++----- .../layerwise/run_ablation_es.sh | 42 ++++++++ .../connector_tax/layerwise/run_v3_trace.sh | 26 ++++- 4 files changed, 161 insertions(+), 33 deletions(-) create mode 100755 microbench/connector_tax/layerwise/run_ablation_es.sh diff --git a/microbench/connector_tax/layerwise/P2_ENGINE_STATE.md b/microbench/connector_tax/layerwise/P2_ENGINE_STATE.md index e92a91c..9ad51bc 100644 --- a/microbench/connector_tax/layerwise/P2_ENGINE_STATE.md +++ b/microbench/connector_tax/layerwise/P2_ENGINE_STATE.md @@ -52,10 +52,31 @@ dash0, so file backend is the working default). 3. Proxy: `EXTRA_PROXY_ARGS="--engine-state-uri file:///dev/shm/agentic_engine_state ..."`. 4. Revert the patch + `rm -rf /dev/shm/agentic_engine_state` after. +## ALL policies now read the real state (update) +`InstanceState` exposes effective accessors used by **every** picker: +`eff_num_requests / eff_pending_prefill / eff_ongoing_decode / +eff_ongoing_tokens` = `max(shadow, real)` when the feed is fresh (real fixes +the 30s-stale under-count; shadow's atomic pre-await reservation still covers +the in-flight window, preserving the RaceFix), plus real-only +`r_max_prefill_remaining / r_kv_used_frac`. Wired into: `load_only`, `lmetric`, +`sticky`, `pick_instance` (legacy), `pick_instance_unified_hybrid` +(unified / unified_kv_both), `pick_instance_unified_v3` (gate + Mechanism B), +and `snapshot_workers` (logged scores now match the decision + real fields). +Feed off ⇒ `real_state is None` ⇒ accessors return shadow ⇒ byte-identical to +before. (legacy `unified_v2` left on shadow — retired, not in the ablation.) + +## Ablation (when GPU free) +`run_v3_trace.sh` gains `ES=1` (apply engine-state patch + feed + proxy flag) +and always deploys the enhanced proxy (dormant when feed/write-mode off). +`run_ablation_es.sh` runs each config twice (ES=0 vs ES=1) so the only +difference is the state source. Default decisive set (4 runs): champion +`unified+A+B` and `unified_v3+A+B+layerwise`, each ES0/ES1. Extend CONFIGS for +`lmetric` / `unified_kv_both` / `load_only`. Compares per-policy TTFT +(overall + migrated) and whether the **ranking** changes with ground-truth +state. + ## Status / scope -- Built + unit-tested; NOT yet run against live engines (GPU busy). -- Scoped to **migration target selection** (the P2 ask). The same real-load - signal could also de-stale the base `pick_instance_unified_hybrid` LMetric - fallback (the 8007-hotspot class from UNIFIED_ABLATION) — follow-up. +- Built + unit-tested (snapshot, round-trip, target scorer, eff_ accessors, + end-to-end publish→read→select); NOT yet run against live engines (GPU busy). - TP=1 only (one EngineCore/instance → one publisher/engine_id). TP>1 needs per-rank ids. diff --git a/microbench/connector_tax/layerwise/cache_aware_proxy.WRITEMODE.py b/microbench/connector_tax/layerwise/cache_aware_proxy.WRITEMODE.py index 054e8da..9103a50 100644 --- a/microbench/connector_tax/layerwise/cache_aware_proxy.WRITEMODE.py +++ b/microbench/connector_tax/layerwise/cache_aware_proxy.WRITEMODE.py @@ -217,6 +217,50 @@ class InstanceState: # when the feed is disabled/stale. Set by _engine_state_poll_loop. self.real_state: dict | None = None + # ---- effective-load accessors (P2): prefer REAL engine state when the + # feed is fresh, else the proxy shadow counter. We take max(shadow, real) + # for load so we never under-count: REAL fixes the 30s-stale under-count, + # while the shadow's atomic pre-await reservation still covers the + # in-flight window (preserving the RaceFix against concurrent picks). + def eff_num_requests(self) -> float: + rs = self.real_state + if rs is not None: + return max(self.num_requests, + rs.get("num_running", 0) + rs.get("num_waiting", 0)) + return self.num_requests + + def eff_pending_prefill(self) -> float: + rs = self.real_state + if rs is not None: + return max(self.pending_prefill_tokens, + rs.get("pending_prefill_tokens", 0)) + return self.pending_prefill_tokens + + def eff_ongoing_decode(self) -> float: + rs = self.real_state + if rs is not None: + return max(self.ongoing_decode_tokens, + rs.get("ongoing_decode_tokens", 0)) + return self.ongoing_decode_tokens + + def eff_ongoing_tokens(self) -> float: + rs = self.real_state + if rs is not None: + return max(self.ongoing_tokens, + rs.get("pending_prefill_tokens", 0) + + rs.get("ongoing_decode_tokens", 0)) + return self.ongoing_tokens + + def r_max_prefill_remaining(self) -> int: + rs = self.real_state + return int(rs.get("max_prefill_remaining", 0)) if rs is not None else 0 + + def r_kv_used_frac(self) -> float: + rs = self.real_state + if rs is not None: + return float(rs.get("gpu_kv_used_frac", 0.0) or 0.0) + return 0.0 + def estimate_cache_hit(self, token_ids: list[int] | None) -> int: if not token_ids or len(token_ids) < BLOCK_SIZE: return 0 @@ -277,11 +321,19 @@ def snapshot_workers( "cached_blocks": len(inst.cached_blocks), "cache_hit": cache_hit, "new_prefill": new_prefill, - "score_linear": (inst.ongoing_tokens + # scores reflect the ACTUAL decision basis (eff = real-or-shadow). + "score_linear": (inst.eff_ongoing_tokens() + _p_offload_penalty(inst) - CACHE_HIT_ALPHA * cache_hit), - "score_lmetric": (inst.pending_prefill_tokens + new_prefill) - * inst.num_requests, + "score_lmetric": (inst.eff_pending_prefill() + new_prefill) + * inst.eff_num_requests(), + # P2: real-state fields when the feed is fresh (None otherwise). + "real_num_requests": (inst.eff_num_requests() + if inst.real_state is not None else None), + "real_max_prefill_remaining": (inst.r_max_prefill_remaining() + if inst.real_state is not None else None), + "real_kv_used_frac": (inst.r_kv_used_frac() + if inst.real_state is not None else None), }) return snap @@ -297,20 +349,20 @@ def pick_instance(instances: list[InstanceState], token_ids: list[int] | None, Instances doing P-role offloads get a large penalty to steer WARM/MEDIUM traffic away. """ - avg_load = max(sum(i.ongoing_tokens for i in instances) / len(instances), 1.0) + avg_load = max(sum(i.eff_ongoing_tokens() for i in instances) / len(instances), 1.0) if session_id and session_id in affinity: idx = affinity[session_id] if idx < len(instances): inst = instances[idx] - if (inst.ongoing_tokens <= avg_load * SETTINGS.overload_factor + if (inst.eff_ongoing_tokens() <= avg_load * SETTINGS.overload_factor and inst.active_p_offloads == 0): return inst, idx best_idx, best_score = 0, float("inf") for i, inst in enumerate(instances): cache_hit = inst.estimate_cache_hit(token_ids) - score = (inst.ongoing_tokens + _p_offload_penalty(inst) + score = (inst.eff_ongoing_tokens() + _p_offload_penalty(inst) - CACHE_HIT_ALPHA * cache_hit) if score < best_score: best_score = score @@ -334,7 +386,7 @@ def pick_instance_load_only( isolate the locality contribution of cache-aware policies. """ best_idx = min(range(len(instances)), - key=lambda i: instances[i].num_requests) + key=lambda i: instances[i].eff_num_requests()) return instances[best_idx], best_idx @@ -357,7 +409,7 @@ def pick_instance_sticky( if idx < len(instances): return instances[idx], idx best_idx = min(range(len(instances)), - key=lambda i: instances[i].num_requests) + key=lambda i: instances[i].eff_num_requests()) if session_id: affinity[session_id] = best_idx return instances[best_idx], best_idx @@ -378,8 +430,8 @@ def pick_instance_lmetric(instances: list[InstanceState], token_ids: list[int] | for i, inst in enumerate(instances): cache_hit = inst.estimate_cache_hit(token_ids) new_prefill = max(0, input_length - cache_hit) - p_tokens = inst.pending_prefill_tokens + new_prefill - bs = inst.num_requests + p_tokens = inst.eff_pending_prefill() + new_prefill + bs = inst.eff_num_requests() score = p_tokens * bs if score < best_score: best_score = score @@ -416,7 +468,7 @@ def pick_instance_unified_hybrid( """ global _unified_fallback_rr_counter n = len(instances) - avg_reqs = max(sum(i.num_requests for i in instances) / n, 1.0) + avg_reqs = max(sum(i.eff_num_requests() for i in instances) / n, 1.0) decision: dict = { "decision": "lmetric_fallback", @@ -439,9 +491,9 @@ def pick_instance_unified_hybrid( decision["affinity_idx"] = a_idx decision["affinity_cache_hit"] = a_hit decision["affinity_cache_ratio"] = a_ratio - decision["affinity_num_requests"] = a_inst.num_requests + decision["affinity_num_requests"] = a_inst.eff_num_requests() if (a_ratio > 0.5 - and a_inst.num_requests <= avg_reqs * SETTINGS.overload_factor): + and a_inst.eff_num_requests() <= avg_reqs * SETTINGS.overload_factor): decision["decision"] = "affinity" decision["chosen_idx"] = a_idx return a_inst, a_idx, decision @@ -460,9 +512,10 @@ def pick_instance_unified_hybrid( for i, inst in enumerate(instances): cache_hit = inst.estimate_cache_hit(token_ids) new_prefill = max(0, input_length - cache_hit) - p_tokens = inst.pending_prefill_tokens + new_prefill - decode_pen = SETTINGS.lmetric_decode_weight * inst.ongoing_decode_tokens - bs = inst.num_requests + # P2: effective (real-or-shadow) load signals. + p_tokens = inst.eff_pending_prefill() + new_prefill + decode_pen = SETTINGS.lmetric_decode_weight * inst.eff_ongoing_decode() + bs = inst.eff_num_requests() score = (p_tokens + decode_pen) * max(bs, 1) keys.append((score, new_prefill, bs, i)) @@ -669,10 +722,10 @@ def pick_instance_unified_v3( # Gate 2: affinity host must be busy with concurrent decodes — that's # what migrating decode-traffic-away buys us. If the host is idle # there's no point. - if prefill_host.ongoing_decode_tokens < SETTINGS.v3_min_prefill_decode_busy: + if prefill_host.eff_ongoing_decode() < SETTINGS.v3_min_prefill_decode_busy: decision["v3_reason"] = ( f"prefill_host_not_busy " - f"(ongoing_decode_tokens={prefill_host.ongoing_decode_tokens} < " + f"(ongoing_decode_tokens={prefill_host.eff_ongoing_decode()} < " f"{SETTINGS.v3_min_prefill_decode_busy})" ) return prefill_host, prefill_idx, decision, None @@ -683,12 +736,8 @@ def pick_instance_unified_v3( cutoff = now_mono - SETTINGS.v3_recent_mig_window_s def _real_load(inst): - # P2: prefer REAL engine state (running+waiting) over the proxy's - # 30s-stale shadow num_requests, when the engine-state feed is fresh. - rs = getattr(inst, "real_state", None) - if rs is not None: - return rs.get("num_running", 0) + rs.get("num_waiting", 0) - return inst.num_requests + # P2: effective (real-or-shadow) request load; see eff_num_requests. + return inst.eff_num_requests() def effective_load(inst): # Drop expired entries lazily. diff --git a/microbench/connector_tax/layerwise/run_ablation_es.sh b/microbench/connector_tax/layerwise/run_ablation_es.sh new file mode 100755 index 0000000..9737815 --- /dev/null +++ b/microbench/connector_tax/layerwise/run_ablation_es.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# Ablation: does the REAL engine-state feed (P2) change each policy's +# performance and ranking vs the stale-shadow baseline? +# +# Each config is run twice (ES=0 shadow-only, ES=1 real-state feed) so the +# ONLY difference is the state source. Sequential, ~47 min each. +# +# Default = the 4 decisive runs (champion + migration, with/without feed). +# Extend CONFIGS for the full sweep (lmetric / unified_kv_both / load_only). +set -uo pipefail +PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}" +R="$PROJ_DIR/microbench/connector_tax/layerwise/run_v3_trace.sh" +AB="--overload-factor 1.3 --lmetric-decode-weight 0.01" +LOGD=/tmp/dst_break_logs; mkdir -p "$LOGD" + +# CONFIG format: "TAG|POLICY|MODE|AB?|ES" +CONFIGS=( + "unified_AB_es0|unified|baseline|AB|0" + "unified_AB_es1|unified|baseline|AB|1" + "v3_AB_lw_es0|unified_v3|layerwise|AB|0" + "v3_AB_lw_es1|unified_v3|layerwise|AB|1" + # --- extend for the full sweep --- + # "lmetric_es0|lmetric|baseline|noAB|0" + # "lmetric_es1|lmetric|baseline|noAB|1" + # "ukvboth_AB_es0|unified_kv_both|baseline|AB|0" + # "ukvboth_AB_es1|unified_kv_both|baseline|AB|1" +) + +for cfg in "${CONFIGS[@]}"; do + IFS='|' read -r tag policy mode ab es <<< "$cfg" + ab_flags=""; [ "$ab" = "AB" ] && ab_flags="$AB" + echo "########## $tag (policy=$policy mode=$mode ab=$ab es=$es) ##########" + TAG="$tag" POLICY="$policy" MODE="$mode" AB_FLAGS="$ab_flags" ES="$es" \ + bash "$R" 2>&1 | tee "$LOGD/abl_${tag}.log" | tail -6 +done + +echo "########## ABLATION DONE — summary ##########" +for cfg in "${CONFIGS[@]}"; do + IFS='|' read -r tag _ _ _ _ <<< "$cfg" + echo "=== $tag ===" + grep -E "requests:|TTFT|migrations:" "$LOGD/abl_${tag}.log" 2>/dev/null || true +done diff --git a/microbench/connector_tax/layerwise/run_v3_trace.sh b/microbench/connector_tax/layerwise/run_v3_trace.sh index 19f09dd..7a4aeae 100755 --- a/microbench/connector_tax/layerwise/run_v3_trace.sh +++ b/microbench/connector_tax/layerwise/run_v3_trace.sh @@ -26,6 +26,9 @@ MC_FILE="$VLLM_ROOT/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_co PROXY_FILE="$PROJ_DIR/scripts/cache_aware_proxy.py" LW_CONN="${LW_CONN:-/tmp/mooncake_connector.LAYERWISE.py}" WM_PROXY="${WM_PROXY:-/tmp/cache_aware_proxy.WRITEMODE.py}" +ES_INSTR="$PROJ_DIR/microbench/connector_tax/layerwise/instrument_engine_state.py" +ES="${ES:-0}" # 1 = enable real engine-state feed (P2) +ES_DIR="/dev/shm/agentic_engine_state_${TAG}" mkdir -p "$OUTROOT" cfg_dir="$OUTROOT/unified_v3"; mkdir -p "$cfg_dir" @@ -38,7 +41,9 @@ restore() { cp -f "$MC_FILE.ORIG_BACKUP" "$MC_FILE" cp -f "$PROXY_FILE.ORIG_BACKUP" "$PROXY_FILE" "$PYTHON" "$DR_FIX" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true - echo "[restore] connector+proxy reset to ORIG, DR-fix reverted" + "$PYTHON" "$ES_INSTR" --revert --venv "$VENV" 2>/dev/null || true + rm -rf "$ES_DIR" 2>/dev/null || true + echo "[restore] connector+proxy reset to ORIG, DR-fix + ES-patch reverted" } cleanup() { pkill -9 -f cache_aware_proxy 2>/dev/null || true @@ -51,21 +56,32 @@ trap cleanup EXIT pkill -9 -f "vllm serve" 2>/dev/null || true; sleep 3 restore # start from clean -echo "=== v3 trace ($MODE) -> $OUTROOT ===" +echo "=== v3 trace (mode=$MODE es=$ES tag=$TAG) -> $OUTROOT ===" +# Always deploy the enhanced proxy (write-mode + engine-state, both env/flag +# gated; with feed off + write-mode off it behaves identically to stock). +cp -f "$WM_PROXY" "$PROXY_FILE" if [ "$MODE" = "layerwise" ]; then cp -f "$LW_CONN" "$MC_FILE" - cp -f "$WM_PROXY" "$PROXY_FILE" - "$PYTHON" -c "import ast; ast.parse(open('$MC_FILE').read()); ast.parse(open('$PROXY_FILE').read()); print('[deploy] LAYERWISE conn + WRITEMODE proxy AST OK')" || exit 1 export MOONCAKE_LAYERWISE=1 export EAR_WRITE_MODE=1 fi +"$PYTHON" -c "import ast; ast.parse(open('$MC_FILE').read()); ast.parse(open('$PROXY_FILE').read()); print('[deploy] proxy + connector AST OK')" || exit 1 + +PROXY_ES_ARG="" +if [ "$ES" = "1" ]; then + echo "[ES] apply engine-state patch + enable feed at $ES_DIR" + "$PYTHON" "$ES_INSTR" --apply --venv "$VENV" + mkdir -p "$ES_DIR" + export AGENTIC_ENGINE_STATE_URI="file://$ES_DIR" + PROXY_ES_ARG="--engine-state-uri file://$ES_DIR" +fi echo "[DR-fix] apply" "$PYTHON" "$DR_FIX" --apply --vllm-root "$VLLM_ROOT" export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1 echo "[run] $POLICY AB=[$AB_FLAGS] (MOONCAKE_LAYERWISE=${MOONCAKE_LAYERWISE:-0} EAR_WRITE_MODE=${EAR_WRITE_MODE:-0})" -EXTRA_PROXY_ARGS="$AB_FLAGS" bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "$POLICY" "$TRACE" "$cfg_dir" \ +EXTRA_PROXY_ARGS="$AB_FLAGS $PROXY_ES_ARG" bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "$POLICY" "$TRACE" "$cfg_dir" \ 2>&1 | tee "$cfg_dir/orchestrator.log" | tail -20 pkill -9 -f cache_aware_proxy 2>/dev/null || true