P2: all routing policies read real state via eff_ accessors + ablation harness
InstanceState.eff_{num_requests,pending_prefill,ongoing_decode,ongoing_tokens}
= max(shadow, real) when feed fresh (fixes 30s-stale under-count, keeps
in-flight RaceFix), plus real-only r_max_prefill_remaining / r_kv_used_frac.
Wired into load_only, lmetric, sticky, unified(_kv_both), unified_v3, and
snapshot logging. Feed off => identical to before. run_v3_trace.sh gains ES=1
toggle (always deploys enhanced proxy); run_ablation_es.sh runs each config
ES0-vs-ES1 to test whether real state changes policy performance/ranking.
All unit-tested without GPU.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -52,10 +52,31 @@ dash0, so file backend is the working default).
|
||||
3. Proxy: `EXTRA_PROXY_ARGS="--engine-state-uri file:///dev/shm/agentic_engine_state ..."`.
|
||||
4. Revert the patch + `rm -rf /dev/shm/agentic_engine_state` after.
|
||||
|
||||
## ALL policies now read the real state (update)
|
||||
`InstanceState` exposes effective accessors used by **every** picker:
|
||||
`eff_num_requests / eff_pending_prefill / eff_ongoing_decode /
|
||||
eff_ongoing_tokens` = `max(shadow, real)` when the feed is fresh (real fixes
|
||||
the 30s-stale under-count; shadow's atomic pre-await reservation still covers
|
||||
the in-flight window, preserving the RaceFix), plus real-only
|
||||
`r_max_prefill_remaining / r_kv_used_frac`. Wired into: `load_only`, `lmetric`,
|
||||
`sticky`, `pick_instance` (legacy), `pick_instance_unified_hybrid`
|
||||
(unified / unified_kv_both), `pick_instance_unified_v3` (gate + Mechanism B),
|
||||
and `snapshot_workers` (logged scores now match the decision + real fields).
|
||||
Feed off ⇒ `real_state is None` ⇒ accessors return shadow ⇒ byte-identical to
|
||||
before. (legacy `unified_v2` left on shadow — retired, not in the ablation.)
|
||||
|
||||
## Ablation (when GPU free)
|
||||
`run_v3_trace.sh` gains `ES=1` (apply engine-state patch + feed + proxy flag)
|
||||
and always deploys the enhanced proxy (dormant when feed/write-mode off).
|
||||
`run_ablation_es.sh` runs each config twice (ES=0 vs ES=1) so the only
|
||||
difference is the state source. Default decisive set (4 runs): champion
|
||||
`unified+A+B` and `unified_v3+A+B+layerwise`, each ES0/ES1. Extend CONFIGS for
|
||||
`lmetric` / `unified_kv_both` / `load_only`. Compares per-policy TTFT
|
||||
(overall + migrated) and whether the **ranking** changes with ground-truth
|
||||
state.
|
||||
|
||||
## Status / scope
|
||||
- Built + unit-tested; NOT yet run against live engines (GPU busy).
|
||||
- Scoped to **migration target selection** (the P2 ask). The same real-load
|
||||
signal could also de-stale the base `pick_instance_unified_hybrid` LMetric
|
||||
fallback (the 8007-hotspot class from UNIFIED_ABLATION) — follow-up.
|
||||
- Built + unit-tested (snapshot, round-trip, target scorer, eff_ accessors,
|
||||
end-to-end publish→read→select); NOT yet run against live engines (GPU busy).
|
||||
- TP=1 only (one EngineCore/instance → one publisher/engine_id). TP>1 needs
|
||||
per-rank ids.
|
||||
|
||||
@@ -217,6 +217,50 @@ class InstanceState:
|
||||
# when the feed is disabled/stale. Set by _engine_state_poll_loop.
|
||||
self.real_state: dict | None = None
|
||||
|
||||
# ---- effective-load accessors (P2): prefer REAL engine state when the
|
||||
# feed is fresh, else the proxy shadow counter. We take max(shadow, real)
|
||||
# for load so we never under-count: REAL fixes the 30s-stale under-count,
|
||||
# while the shadow's atomic pre-await reservation still covers the
|
||||
# in-flight window (preserving the RaceFix against concurrent picks).
|
||||
def eff_num_requests(self) -> float:
|
||||
rs = self.real_state
|
||||
if rs is not None:
|
||||
return max(self.num_requests,
|
||||
rs.get("num_running", 0) + rs.get("num_waiting", 0))
|
||||
return self.num_requests
|
||||
|
||||
def eff_pending_prefill(self) -> float:
|
||||
rs = self.real_state
|
||||
if rs is not None:
|
||||
return max(self.pending_prefill_tokens,
|
||||
rs.get("pending_prefill_tokens", 0))
|
||||
return self.pending_prefill_tokens
|
||||
|
||||
def eff_ongoing_decode(self) -> float:
|
||||
rs = self.real_state
|
||||
if rs is not None:
|
||||
return max(self.ongoing_decode_tokens,
|
||||
rs.get("ongoing_decode_tokens", 0))
|
||||
return self.ongoing_decode_tokens
|
||||
|
||||
def eff_ongoing_tokens(self) -> float:
|
||||
rs = self.real_state
|
||||
if rs is not None:
|
||||
return max(self.ongoing_tokens,
|
||||
rs.get("pending_prefill_tokens", 0)
|
||||
+ rs.get("ongoing_decode_tokens", 0))
|
||||
return self.ongoing_tokens
|
||||
|
||||
def r_max_prefill_remaining(self) -> int:
|
||||
rs = self.real_state
|
||||
return int(rs.get("max_prefill_remaining", 0)) if rs is not None else 0
|
||||
|
||||
def r_kv_used_frac(self) -> float:
|
||||
rs = self.real_state
|
||||
if rs is not None:
|
||||
return float(rs.get("gpu_kv_used_frac", 0.0) or 0.0)
|
||||
return 0.0
|
||||
|
||||
def estimate_cache_hit(self, token_ids: list[int] | None) -> int:
|
||||
if not token_ids or len(token_ids) < BLOCK_SIZE:
|
||||
return 0
|
||||
@@ -277,11 +321,19 @@ def snapshot_workers(
|
||||
"cached_blocks": len(inst.cached_blocks),
|
||||
"cache_hit": cache_hit,
|
||||
"new_prefill": new_prefill,
|
||||
"score_linear": (inst.ongoing_tokens
|
||||
# scores reflect the ACTUAL decision basis (eff = real-or-shadow).
|
||||
"score_linear": (inst.eff_ongoing_tokens()
|
||||
+ _p_offload_penalty(inst)
|
||||
- CACHE_HIT_ALPHA * cache_hit),
|
||||
"score_lmetric": (inst.pending_prefill_tokens + new_prefill)
|
||||
* inst.num_requests,
|
||||
"score_lmetric": (inst.eff_pending_prefill() + new_prefill)
|
||||
* inst.eff_num_requests(),
|
||||
# P2: real-state fields when the feed is fresh (None otherwise).
|
||||
"real_num_requests": (inst.eff_num_requests()
|
||||
if inst.real_state is not None else None),
|
||||
"real_max_prefill_remaining": (inst.r_max_prefill_remaining()
|
||||
if inst.real_state is not None else None),
|
||||
"real_kv_used_frac": (inst.r_kv_used_frac()
|
||||
if inst.real_state is not None else None),
|
||||
})
|
||||
return snap
|
||||
|
||||
@@ -297,20 +349,20 @@ def pick_instance(instances: list[InstanceState], token_ids: list[int] | None,
|
||||
Instances doing P-role offloads get a large penalty to steer
|
||||
WARM/MEDIUM traffic away.
|
||||
"""
|
||||
avg_load = max(sum(i.ongoing_tokens for i in instances) / len(instances), 1.0)
|
||||
avg_load = max(sum(i.eff_ongoing_tokens() for i in instances) / len(instances), 1.0)
|
||||
|
||||
if session_id and session_id in affinity:
|
||||
idx = affinity[session_id]
|
||||
if idx < len(instances):
|
||||
inst = instances[idx]
|
||||
if (inst.ongoing_tokens <= avg_load * SETTINGS.overload_factor
|
||||
if (inst.eff_ongoing_tokens() <= avg_load * SETTINGS.overload_factor
|
||||
and inst.active_p_offloads == 0):
|
||||
return inst, idx
|
||||
|
||||
best_idx, best_score = 0, float("inf")
|
||||
for i, inst in enumerate(instances):
|
||||
cache_hit = inst.estimate_cache_hit(token_ids)
|
||||
score = (inst.ongoing_tokens + _p_offload_penalty(inst)
|
||||
score = (inst.eff_ongoing_tokens() + _p_offload_penalty(inst)
|
||||
- CACHE_HIT_ALPHA * cache_hit)
|
||||
if score < best_score:
|
||||
best_score = score
|
||||
@@ -334,7 +386,7 @@ def pick_instance_load_only(
|
||||
isolate the locality contribution of cache-aware policies.
|
||||
"""
|
||||
best_idx = min(range(len(instances)),
|
||||
key=lambda i: instances[i].num_requests)
|
||||
key=lambda i: instances[i].eff_num_requests())
|
||||
return instances[best_idx], best_idx
|
||||
|
||||
|
||||
@@ -357,7 +409,7 @@ def pick_instance_sticky(
|
||||
if idx < len(instances):
|
||||
return instances[idx], idx
|
||||
best_idx = min(range(len(instances)),
|
||||
key=lambda i: instances[i].num_requests)
|
||||
key=lambda i: instances[i].eff_num_requests())
|
||||
if session_id:
|
||||
affinity[session_id] = best_idx
|
||||
return instances[best_idx], best_idx
|
||||
@@ -378,8 +430,8 @@ def pick_instance_lmetric(instances: list[InstanceState], token_ids: list[int] |
|
||||
for i, inst in enumerate(instances):
|
||||
cache_hit = inst.estimate_cache_hit(token_ids)
|
||||
new_prefill = max(0, input_length - cache_hit)
|
||||
p_tokens = inst.pending_prefill_tokens + new_prefill
|
||||
bs = inst.num_requests
|
||||
p_tokens = inst.eff_pending_prefill() + new_prefill
|
||||
bs = inst.eff_num_requests()
|
||||
score = p_tokens * bs
|
||||
if score < best_score:
|
||||
best_score = score
|
||||
@@ -416,7 +468,7 @@ def pick_instance_unified_hybrid(
|
||||
"""
|
||||
global _unified_fallback_rr_counter
|
||||
n = len(instances)
|
||||
avg_reqs = max(sum(i.num_requests for i in instances) / n, 1.0)
|
||||
avg_reqs = max(sum(i.eff_num_requests() for i in instances) / n, 1.0)
|
||||
|
||||
decision: dict = {
|
||||
"decision": "lmetric_fallback",
|
||||
@@ -439,9 +491,9 @@ def pick_instance_unified_hybrid(
|
||||
decision["affinity_idx"] = a_idx
|
||||
decision["affinity_cache_hit"] = a_hit
|
||||
decision["affinity_cache_ratio"] = a_ratio
|
||||
decision["affinity_num_requests"] = a_inst.num_requests
|
||||
decision["affinity_num_requests"] = a_inst.eff_num_requests()
|
||||
if (a_ratio > 0.5
|
||||
and a_inst.num_requests <= avg_reqs * SETTINGS.overload_factor):
|
||||
and a_inst.eff_num_requests() <= avg_reqs * SETTINGS.overload_factor):
|
||||
decision["decision"] = "affinity"
|
||||
decision["chosen_idx"] = a_idx
|
||||
return a_inst, a_idx, decision
|
||||
@@ -460,9 +512,10 @@ def pick_instance_unified_hybrid(
|
||||
for i, inst in enumerate(instances):
|
||||
cache_hit = inst.estimate_cache_hit(token_ids)
|
||||
new_prefill = max(0, input_length - cache_hit)
|
||||
p_tokens = inst.pending_prefill_tokens + new_prefill
|
||||
decode_pen = SETTINGS.lmetric_decode_weight * inst.ongoing_decode_tokens
|
||||
bs = inst.num_requests
|
||||
# P2: effective (real-or-shadow) load signals.
|
||||
p_tokens = inst.eff_pending_prefill() + new_prefill
|
||||
decode_pen = SETTINGS.lmetric_decode_weight * inst.eff_ongoing_decode()
|
||||
bs = inst.eff_num_requests()
|
||||
score = (p_tokens + decode_pen) * max(bs, 1)
|
||||
keys.append((score, new_prefill, bs, i))
|
||||
|
||||
@@ -669,10 +722,10 @@ def pick_instance_unified_v3(
|
||||
# Gate 2: affinity host must be busy with concurrent decodes — that's
|
||||
# what migrating decode-traffic-away buys us. If the host is idle
|
||||
# there's no point.
|
||||
if prefill_host.ongoing_decode_tokens < SETTINGS.v3_min_prefill_decode_busy:
|
||||
if prefill_host.eff_ongoing_decode() < SETTINGS.v3_min_prefill_decode_busy:
|
||||
decision["v3_reason"] = (
|
||||
f"prefill_host_not_busy "
|
||||
f"(ongoing_decode_tokens={prefill_host.ongoing_decode_tokens} < "
|
||||
f"(ongoing_decode_tokens={prefill_host.eff_ongoing_decode()} < "
|
||||
f"{SETTINGS.v3_min_prefill_decode_busy})"
|
||||
)
|
||||
return prefill_host, prefill_idx, decision, None
|
||||
@@ -683,12 +736,8 @@ def pick_instance_unified_v3(
|
||||
cutoff = now_mono - SETTINGS.v3_recent_mig_window_s
|
||||
|
||||
def _real_load(inst):
|
||||
# P2: prefer REAL engine state (running+waiting) over the proxy's
|
||||
# 30s-stale shadow num_requests, when the engine-state feed is fresh.
|
||||
rs = getattr(inst, "real_state", None)
|
||||
if rs is not None:
|
||||
return rs.get("num_running", 0) + rs.get("num_waiting", 0)
|
||||
return inst.num_requests
|
||||
# P2: effective (real-or-shadow) request load; see eff_num_requests.
|
||||
return inst.eff_num_requests()
|
||||
|
||||
def effective_load(inst):
|
||||
# Drop expired entries lazily.
|
||||
|
||||
42
microbench/connector_tax/layerwise/run_ablation_es.sh
Executable file
42
microbench/connector_tax/layerwise/run_ablation_es.sh
Executable file
@@ -0,0 +1,42 @@
|
||||
#!/usr/bin/env bash
|
||||
# Ablation: does the REAL engine-state feed (P2) change each policy's
|
||||
# performance and ranking vs the stale-shadow baseline?
|
||||
#
|
||||
# Each config is run twice (ES=0 shadow-only, ES=1 real-state feed) so the
|
||||
# ONLY difference is the state source. Sequential, ~47 min each.
|
||||
#
|
||||
# Default = the 4 decisive runs (champion + migration, with/without feed).
|
||||
# Extend CONFIGS for the full sweep (lmetric / unified_kv_both / load_only).
|
||||
set -uo pipefail
|
||||
PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}"
|
||||
R="$PROJ_DIR/microbench/connector_tax/layerwise/run_v3_trace.sh"
|
||||
AB="--overload-factor 1.3 --lmetric-decode-weight 0.01"
|
||||
LOGD=/tmp/dst_break_logs; mkdir -p "$LOGD"
|
||||
|
||||
# CONFIG format: "TAG|POLICY|MODE|AB?|ES"
|
||||
CONFIGS=(
|
||||
"unified_AB_es0|unified|baseline|AB|0"
|
||||
"unified_AB_es1|unified|baseline|AB|1"
|
||||
"v3_AB_lw_es0|unified_v3|layerwise|AB|0"
|
||||
"v3_AB_lw_es1|unified_v3|layerwise|AB|1"
|
||||
# --- extend for the full sweep ---
|
||||
# "lmetric_es0|lmetric|baseline|noAB|0"
|
||||
# "lmetric_es1|lmetric|baseline|noAB|1"
|
||||
# "ukvboth_AB_es0|unified_kv_both|baseline|AB|0"
|
||||
# "ukvboth_AB_es1|unified_kv_both|baseline|AB|1"
|
||||
)
|
||||
|
||||
for cfg in "${CONFIGS[@]}"; do
|
||||
IFS='|' read -r tag policy mode ab es <<< "$cfg"
|
||||
ab_flags=""; [ "$ab" = "AB" ] && ab_flags="$AB"
|
||||
echo "########## $tag (policy=$policy mode=$mode ab=$ab es=$es) ##########"
|
||||
TAG="$tag" POLICY="$policy" MODE="$mode" AB_FLAGS="$ab_flags" ES="$es" \
|
||||
bash "$R" 2>&1 | tee "$LOGD/abl_${tag}.log" | tail -6
|
||||
done
|
||||
|
||||
echo "########## ABLATION DONE — summary ##########"
|
||||
for cfg in "${CONFIGS[@]}"; do
|
||||
IFS='|' read -r tag _ _ _ _ <<< "$cfg"
|
||||
echo "=== $tag ==="
|
||||
grep -E "requests:|TTFT|migrations:" "$LOGD/abl_${tag}.log" 2>/dev/null || true
|
||||
done
|
||||
@@ -26,6 +26,9 @@ MC_FILE="$VLLM_ROOT/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_co
|
||||
PROXY_FILE="$PROJ_DIR/scripts/cache_aware_proxy.py"
|
||||
LW_CONN="${LW_CONN:-/tmp/mooncake_connector.LAYERWISE.py}"
|
||||
WM_PROXY="${WM_PROXY:-/tmp/cache_aware_proxy.WRITEMODE.py}"
|
||||
ES_INSTR="$PROJ_DIR/microbench/connector_tax/layerwise/instrument_engine_state.py"
|
||||
ES="${ES:-0}" # 1 = enable real engine-state feed (P2)
|
||||
ES_DIR="/dev/shm/agentic_engine_state_${TAG}"
|
||||
|
||||
mkdir -p "$OUTROOT"
|
||||
cfg_dir="$OUTROOT/unified_v3"; mkdir -p "$cfg_dir"
|
||||
@@ -38,7 +41,9 @@ restore() {
|
||||
cp -f "$MC_FILE.ORIG_BACKUP" "$MC_FILE"
|
||||
cp -f "$PROXY_FILE.ORIG_BACKUP" "$PROXY_FILE"
|
||||
"$PYTHON" "$DR_FIX" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true
|
||||
echo "[restore] connector+proxy reset to ORIG, DR-fix reverted"
|
||||
"$PYTHON" "$ES_INSTR" --revert --venv "$VENV" 2>/dev/null || true
|
||||
rm -rf "$ES_DIR" 2>/dev/null || true
|
||||
echo "[restore] connector+proxy reset to ORIG, DR-fix + ES-patch reverted"
|
||||
}
|
||||
cleanup() {
|
||||
pkill -9 -f cache_aware_proxy 2>/dev/null || true
|
||||
@@ -51,21 +56,32 @@ trap cleanup EXIT
|
||||
pkill -9 -f "vllm serve" 2>/dev/null || true; sleep 3
|
||||
restore # start from clean
|
||||
|
||||
echo "=== v3 trace ($MODE) -> $OUTROOT ==="
|
||||
echo "=== v3 trace (mode=$MODE es=$ES tag=$TAG) -> $OUTROOT ==="
|
||||
# Always deploy the enhanced proxy (write-mode + engine-state, both env/flag
|
||||
# gated; with feed off + write-mode off it behaves identically to stock).
|
||||
cp -f "$WM_PROXY" "$PROXY_FILE"
|
||||
if [ "$MODE" = "layerwise" ]; then
|
||||
cp -f "$LW_CONN" "$MC_FILE"
|
||||
cp -f "$WM_PROXY" "$PROXY_FILE"
|
||||
"$PYTHON" -c "import ast; ast.parse(open('$MC_FILE').read()); ast.parse(open('$PROXY_FILE').read()); print('[deploy] LAYERWISE conn + WRITEMODE proxy AST OK')" || exit 1
|
||||
export MOONCAKE_LAYERWISE=1
|
||||
export EAR_WRITE_MODE=1
|
||||
fi
|
||||
"$PYTHON" -c "import ast; ast.parse(open('$MC_FILE').read()); ast.parse(open('$PROXY_FILE').read()); print('[deploy] proxy + connector AST OK')" || exit 1
|
||||
|
||||
PROXY_ES_ARG=""
|
||||
if [ "$ES" = "1" ]; then
|
||||
echo "[ES] apply engine-state patch + enable feed at $ES_DIR"
|
||||
"$PYTHON" "$ES_INSTR" --apply --venv "$VENV"
|
||||
mkdir -p "$ES_DIR"
|
||||
export AGENTIC_ENGINE_STATE_URI="file://$ES_DIR"
|
||||
PROXY_ES_ARG="--engine-state-uri file://$ES_DIR"
|
||||
fi
|
||||
|
||||
echo "[DR-fix] apply"
|
||||
"$PYTHON" "$DR_FIX" --apply --vllm-root "$VLLM_ROOT"
|
||||
export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1
|
||||
|
||||
echo "[run] $POLICY AB=[$AB_FLAGS] (MOONCAKE_LAYERWISE=${MOONCAKE_LAYERWISE:-0} EAR_WRITE_MODE=${EAR_WRITE_MODE:-0})"
|
||||
EXTRA_PROXY_ARGS="$AB_FLAGS" bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "$POLICY" "$TRACE" "$cfg_dir" \
|
||||
EXTRA_PROXY_ARGS="$AB_FLAGS $PROXY_ES_ARG" bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "$POLICY" "$TRACE" "$cfg_dir" \
|
||||
2>&1 | tee "$cfg_dir/orchestrator.log" | tail -20
|
||||
|
||||
pkill -9 -f cache_aware_proxy 2>/dev/null || true
|
||||
|
||||
Reference in New Issue
Block a user