Engine-state ablation: full sweep harness + results
Real-time engine state is NOT the routing lever. Across 6 policies × es0/es1, real state reshuffles 44-76% of decisions but never beats the champion (unified+A+B, p90 7.62s). The effect's SIGN is set by reactivity: one-shot placement (sticky) HELPS -26%; per-request affinity-dominated is a wash; per-request pure-load (lmetric +17%, load_only +27%) HURTS via herding (stale shadow was a dampener). Feed verified fresh (median 25ms, <=92ms during prefills). Prior shadow-state results stand. ES_ABLATION_RESULTS.md has the table + mechanism; run_full_ablation.sh / fresh_sampler.py / cmp_es.py are the harness. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
77
microbench/connector_tax/layerwise/ES_ABLATION_RESULTS.md
Normal file
77
microbench/connector_tax/layerwise/ES_ABLATION_RESULTS.md
Normal file
@@ -0,0 +1,77 @@
|
||||
# Engine-state ablation: real-time state vs router shadow counters
|
||||
|
||||
**Question.** The router (`cache_aware_proxy`) routes on **shadow counters** it
|
||||
maintains itself (incremented at dispatch, reconciled to vLLM `/metrics` only
|
||||
every 30 s → stale). Does feeding it **real** per-engine state (running/waiting,
|
||||
KV-used, pending-prefill, `max_prefill_remaining`) change routing decisions,
|
||||
performance, or the policy ranking?
|
||||
|
||||
**Setup.** dash1, 8×H20 (TP=1), Qwen3-Coder-30B-A3B, trace
|
||||
`w600_r0.0015_st30.jsonl` (1214 reqs / 274 sessions). Each policy run as a
|
||||
matched pair: `es0` (shadow only) vs `es1` (real-state feed via
|
||||
`file:///dev/shm/...`, published ~50 ms by a scheduler daemon thread, read by
|
||||
the proxy via `eff_* = max(shadow, real)`). Only the state source differs.
|
||||
Driver: `run_full_ablation.sh`; per-cell freshness via `fresh_sampler.py`;
|
||||
comparison via `cmp_es.py`.
|
||||
|
||||
## Result — real-time state is NOT the routing lever
|
||||
|
||||
It reshuffles 44–76% of routing decisions but **never beats the champion**;
|
||||
the cache-affinity champion (`unified+A+B`, es0 p90 **7.62 s**) stays best.
|
||||
|
||||
| Policy | how it uses load | inst/session | reroute % | TTFT p90 es0→es1 | mean es0→es1 | verdict |
|
||||
|---|---|--:|--:|--:|--:|---|
|
||||
| `sticky` | **once at session birth, then pinned** | 1.00 | 44.5% | 13.42 → **9.95 (−26%)** | 4.13→3.65 | **HELPS** |
|
||||
| `unified+A+B` | per-req, affinity-dominated | 1.22 | 76.4% | 7.62 → 7.76 (+1.8%) | 3.20→3.24 | wash |
|
||||
| `v3_AB_lw` | per-req, affinity-dom + migration | ~1.2 | 71.7% | 9.35 → 9.49 (+1.5%) | 3.34→3.58 | wash* |
|
||||
| `unified_kv_both` | per-req, affinity-dom (same picker) | ~1.2 | 73.6% | 6.45 → 9.28 (+44%) | 3.07→3.49 | worse† |
|
||||
| `lmetric` | per-req, load×batch | 2.04 | 73.4% | 15.63 → 18.23 (+16.6%) | 5.18→5.80 | HURTS |
|
||||
| `load_only` | per-req, pure load | 2.22 | 72.7% | 21.79 → 27.69 (+27%) | 6.65→8.42 | HURTS |
|
||||
|
||||
\* v3 real-state migration targeting backfired: migrations 26→32, migrated-req
|
||||
mean TTFT 11.99→18.45 s (+54%). Real state does not rescue migration.
|
||||
† same picker as `unified`; the 1.8%-vs-44% spread is run-variance (single
|
||||
pairs) in which reshuffled routes hit hotspots — sign is consistently ≥ neutral.
|
||||
|
||||
## Mechanism — the sign is set by reactivity, not "affinity vs not"
|
||||
|
||||
- **One-shot placement (`sticky`) → HELPS.** `pick_instance_sticky` is *not* a
|
||||
stateless hash: the first turn picks `min(eff_num_requests())` (load), then
|
||||
`affinity[session]` pins it for all later turns. State enters at exactly one
|
||||
decision per session; real load → better placement that compounds across the
|
||||
session, locality preserved, no per-request oscillation.
|
||||
- **Per-request, affinity-dominated (`unified`/`v3`/`kv_both`) → wash-to-worse.**
|
||||
The hybrid picker mostly obeys affinity; only the ~12% fallback fraction
|
||||
consults load. Net 0…+44%, never helps.
|
||||
- **Per-request, pure load (`lmetric`/`load_only`) → HURTS, monotonic in
|
||||
load-purity.** Routing on *instantaneous* load induces **herding** (everyone
|
||||
piles onto whatever momentarily looks idle → transient overload → tail
|
||||
inflation); the stale shadow counter was inadvertently a **dampener**.
|
||||
|
||||
## Why the result is trustworthy (not a stale-feed artifact)
|
||||
The feed was fresh on every es1 cell: age median **25 ms**, **≤92 ms even
|
||||
during 100k-token prefills**, <0.5 % of samples >2 s stale (and those not
|
||||
during prefills → reader drops them → shadow fallback). The feared GIL-
|
||||
starvation of the publisher during big prefills did **not** materialize.
|
||||
|
||||
## Implications
|
||||
1. Don't invest in real-time state for per-request routing — it never wins and
|
||||
degrades load-driven policies up to +27 %.
|
||||
2. The cache-affinity champion is robust to state source; A+B+RaceFix already
|
||||
handled the staleness that mattered.
|
||||
3. **Design insight:** the only place ground-truth state helps is **one-shot
|
||||
session placement** (decide well once on real load, then commit) — not
|
||||
per-request load polling.
|
||||
4. All prior shadow-state results stand; the router's approximate state was
|
||||
never the bottleneck. Workload skew + affinity discipline are.
|
||||
|
||||
## Reproduce
|
||||
```bash
|
||||
# per-cell: same proxy, ES=0 (shadow) vs ES=1 (real); see run_v3_trace.sh
|
||||
MODE=baseline POLICY=unified AB_FLAGS="--overload-factor 1.3 --lmetric-decode-weight 0.01" \
|
||||
ES=1 TAG=unified_AB_es1 bash run_v3_trace.sh
|
||||
# full sweep (waits for the champion es1 marker, then runs the rest):
|
||||
bash run_full_ablation.sh
|
||||
# compare a pair:
|
||||
python cmp_es.py <es0_dir>/unified_v3 <es1_dir>/unified_v3 abl_<tag>_es1.freshness.jsonl
|
||||
```
|
||||
76
microbench/connector_tax/layerwise/cmp_es.py
Normal file
76
microbench/connector_tax/layerwise/cmp_es.py
Normal file
@@ -0,0 +1,76 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Compare an es0 (shadow) vs es1 (real-state) run: TTFT, decision split,
|
||||
routing flips, load distribution. Optional 3rd arg = es1 freshness jsonl."""
|
||||
import json, sys, statistics, os
|
||||
|
||||
def load(d):
|
||||
ms = {}
|
||||
for l in open(os.path.join(d, "metrics.jsonl")):
|
||||
m = json.loads(l); ms[m["request_id"]] = m
|
||||
bd = {x["request_id"]: x for x in json.load(open(os.path.join(d, "breakdown.json")))}
|
||||
return ms, bd
|
||||
|
||||
def pct(xs, q):
|
||||
xs = sorted(xs)
|
||||
return xs[min(len(xs) - 1, int(q * len(xs)))] if xs else 0
|
||||
|
||||
def chosen(x):
|
||||
return x.get("routed_to", x.get("chosen_idx"))
|
||||
|
||||
d0, d1 = sys.argv[1], sys.argv[2]
|
||||
m0, b0 = load(d0); m1, b1 = load(d1)
|
||||
|
||||
def ttfts(ms):
|
||||
return [m["ttft_s"] for m in ms.values() if not m.get("error") and m.get("ttft_s") is not None]
|
||||
|
||||
print("=== overall TTFT ===")
|
||||
for tag, ms in [("es0/shadow", m0), ("es1/real ", m1)]:
|
||||
t = ttfts(ms)
|
||||
print(f"{tag}: {len(t)}/{len(ms)} ok p50={pct(t,.5):.2f} p90={pct(t,.9):.2f} "
|
||||
f"p99={pct(t,.99):.2f} max={max(t):.2f} mean={statistics.mean(t):.2f}")
|
||||
|
||||
def byclass(ms, bd):
|
||||
cls = {}
|
||||
for rid, m in ms.items():
|
||||
if m.get("error") or m.get("ttft_s") is None: continue
|
||||
dec = bd.get(rid, {}).get("decision", "?")
|
||||
cls.setdefault(dec, []).append(m["ttft_s"])
|
||||
return cls
|
||||
|
||||
print("\n=== decision split (n / p90 / p99) ===")
|
||||
for tag, ms, bd in [("es0", m0, b0), ("es1", m1, b1)]:
|
||||
print(f" [{tag}]")
|
||||
for dec, ts in sorted(byclass(ms, bd).items()):
|
||||
print(f" {dec:18s} n={len(ts):4d} p90={pct(ts,.9):7.2f} p99={pct(ts,.99):7.2f}")
|
||||
|
||||
common = set(b0) & set(b1)
|
||||
flips = [r for r in common if chosen(b0[r]) != chosen(b1[r])]
|
||||
decflip = [r for r in common if b0[r].get("decision") != b1[r].get("decision")]
|
||||
print(f"\n=== routing changes (common reqs={len(common)}) ===")
|
||||
print(f" instance flips : {len(flips)} ({100*len(flips)/max(1,len(common)):.1f}%)")
|
||||
print(f" decision-type flips: {len(decflip)} ({100*len(decflip)/max(1,len(common)):.1f}%)")
|
||||
|
||||
# TTFT of flipped vs non-flipped (es1 side)
|
||||
fl_t = [m1[r]["ttft_s"] for r in flips if not m1[r].get("error") and m1[r].get("ttft_s") is not None]
|
||||
if fl_t:
|
||||
print(f" flipped reqs es1 TTFT: p50={pct(fl_t,.5):.2f} p90={pct(fl_t,.9):.2f} mean={statistics.mean(fl_t):.2f}")
|
||||
|
||||
def dist(bd):
|
||||
d = {}
|
||||
for x in bd.values():
|
||||
d[chosen(x)] = d.get(chosen(x), 0) + 1
|
||||
return dict(sorted(d.items(), key=lambda kv: str(kv[0])))
|
||||
print("\n=== per-instance request count ===")
|
||||
print(" es0:", dist(b0))
|
||||
print(" es1:", dist(b1))
|
||||
|
||||
if len(sys.argv) > 3 and os.path.exists(sys.argv[3]):
|
||||
rows = [json.loads(l) for l in open(sys.argv[3]) if l.strip()]
|
||||
ages = [r["age_s"] for r in rows]
|
||||
busy = [r["age_s"] for r in rows if (r.get("num_prefilling") or 0) > 0]
|
||||
print(f"\n=== es1 feed freshness (full run, n={len(ages)}) ===")
|
||||
if ages:
|
||||
print(f" age_s med={statistics.median(ages):.3f} p90={pct(ages,.9):.3f} max={max(ages):.3f} "
|
||||
f"stale>2s={sum(1 for a in ages if a>2)}")
|
||||
if busy:
|
||||
print(f" during-prefill n={len(busy)} med={statistics.median(busy):.3f} max={max(busy):.3f}")
|
||||
30
microbench/connector_tax/layerwise/fresh_sampler.py
Normal file
30
microbench/connector_tax/layerwise/fresh_sampler.py
Normal file
@@ -0,0 +1,30 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Sample engine-state feed freshness during the es1 run.
|
||||
Writes one jsonl record per engine per tick: age_s = now - state.ts.
|
||||
Stops when the DONE marker appears (run finished + /dev/shm wiped) or after 90min.
|
||||
"""
|
||||
import json, os, time, sys, glob
|
||||
|
||||
esdir, outpath, donemarker = sys.argv[1], sys.argv[2], sys.argv[3]
|
||||
deadline = time.time() + 90 * 60
|
||||
with open(outpath, "a") as out:
|
||||
while not os.path.exists(donemarker) and time.time() < deadline:
|
||||
now = time.time()
|
||||
if os.path.isdir(esdir):
|
||||
for f in sorted(glob.glob(os.path.join(esdir, "engine_*.json"))):
|
||||
try:
|
||||
s = json.load(open(f))
|
||||
rec = {
|
||||
"now": round(now, 3),
|
||||
"engine": os.path.basename(f)[:-5],
|
||||
"age_s": round(now - s.get("ts", 0), 4),
|
||||
"num_running": s.get("num_running"),
|
||||
"num_prefilling": s.get("num_prefilling"),
|
||||
"max_prefill_remaining": s.get("max_prefill_remaining"),
|
||||
"kv_used": s.get("gpu_kv_used_frac"),
|
||||
}
|
||||
out.write(json.dumps(rec) + "\n")
|
||||
except Exception:
|
||||
pass
|
||||
out.flush()
|
||||
time.sleep(5)
|
||||
65
microbench/connector_tax/layerwise/run_full_ablation.sh
Normal file
65
microbench/connector_tax/layerwise/run_full_ablation.sh
Normal file
@@ -0,0 +1,65 @@
|
||||
#!/usr/bin/env bash
|
||||
# Full engine-state ablation sweep on dash1 (sequential; shared-venv patch
|
||||
# can't be parallelized). Waits for the in-flight champion es1 to finish, then
|
||||
# runs the remaining policies as matched es0/es1 pairs. Each es1 cell gets a
|
||||
# freshness sampler (age_s = now - state.ts) written to cpfs.
|
||||
#
|
||||
# Champion: es0 already done (v3trace_unified_AB_es0_20260528_1633),
|
||||
# es1 in flight (launch_es1.sh) -> reused, not re-run here.
|
||||
set -uo pipefail
|
||||
PROJ=/home/admin/cpfs/wjh/agentic-kv
|
||||
LWDIR=$PROJ/microbench/connector_tax/layerwise
|
||||
R=$LWDIR/run_v3_trace.sh
|
||||
SAMPLER=$LWDIR/fresh_sampler.py
|
||||
PY=$PROJ/.venv/bin/python
|
||||
AB="--overload-factor 1.3 --lmetric-decode-weight 0.01"
|
||||
PROG=$PROJ/outputs/abl_full.progress
|
||||
MASTERDONE=$PROJ/outputs/abl_full.done
|
||||
rm -f "$MASTERDONE"
|
||||
|
||||
echo "[driver] $(date) waiting for champion es1 (abl_unified_AB_es1.done) ..." >> "$PROG"
|
||||
while [ ! -f "$PROJ/outputs/abl_unified_AB_es1.done" ]; do sleep 30; done
|
||||
echo "[driver] $(date) champion es1 done -> starting sweep" >> "$PROG"
|
||||
|
||||
# TAG | POLICY | MODE | AB(yes=AB) | ES
|
||||
CONFIGS=(
|
||||
"lmetric_es0|lmetric|baseline||0"
|
||||
"lmetric_es1|lmetric|baseline||1"
|
||||
"load_only_es0|load_only|baseline||0"
|
||||
"load_only_es1|load_only|baseline||1"
|
||||
"sticky_es0|sticky|baseline||0"
|
||||
"sticky_es1|sticky|baseline||1"
|
||||
"v3_AB_lw_es0|unified_v3|layerwise|AB|0"
|
||||
"v3_AB_lw_es1|unified_v3|layerwise|AB|1"
|
||||
"ukvboth_AB_es0|unified_kv_both|baseline|AB|0"
|
||||
"ukvboth_AB_es1|unified_kv_both|baseline|AB|1"
|
||||
)
|
||||
|
||||
for cfg in "${CONFIGS[@]}"; do
|
||||
IFS='|' read -r tag policy mode ab es <<< "$cfg"
|
||||
abf=""; [ "$ab" = "AB" ] && abf="$AB"
|
||||
esdir=/dev/shm/agentic_engine_state_${tag}
|
||||
fresh=$PROJ/outputs/abl_${tag}.freshness.jsonl
|
||||
rl=$PROJ/outputs/abl_${tag}.runlog
|
||||
celldone=$PROJ/outputs/abl_${tag}.celldone
|
||||
rm -f "$fresh" "$celldone"
|
||||
echo "[driver] $(date) START $tag (policy=$policy mode=$mode ab=$ab es=$es)" >> "$PROG"
|
||||
|
||||
sp=""
|
||||
if [ "$es" = "1" ]; then
|
||||
nohup "$PY" "$SAMPLER" "$esdir" "$fresh" "$celldone" >/dev/null 2>&1 &
|
||||
sp=$!
|
||||
fi
|
||||
|
||||
TAG="$tag" POLICY="$policy" MODE="$mode" AB_FLAGS="$abf" ES="$es" \
|
||||
bash "$R" > "$rl" 2>&1
|
||||
ec=$?
|
||||
|
||||
echo "done $ec $(date)" > "$celldone"
|
||||
[ -n "$sp" ] && { kill "$sp" 2>/dev/null || true; }
|
||||
echo "[driver] $(date) END $tag exit=$ec" >> "$PROG"
|
||||
sleep 10
|
||||
done
|
||||
|
||||
echo "ALL DONE $(date)" > "$MASTERDONE"
|
||||
echo "[driver] $(date) SWEEP COMPLETE" >> "$PROG"
|
||||
Reference in New Issue
Block a user