Unified-routing A+B ablation: decode-aware LMetric + v3 anti-hotspot

cache_aware_proxy: add lmetric_decode_weight (decode-load penalty in the
LMetric fallback score) and a v3 anti-hotspot recent-migration penalty
(effective_load = num_req + recent-migration count over a sliding window),
preventing back-to-back migration clustering. UNIFIED_ABLATION.md documents
the A (overload_factor=1.3) + B' (decode-weight, max(num_req,1)) + RaceFix
sweep: A+B'+RaceFix reaches TTFT p90 7770ms, beating v3 PD-sep migration by
~20%. Runners/analyzer for the b3 trace replay included.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-05-29 11:52:44 +08:00
parent a2f2645fda
commit 67fcec7933
7 changed files with 984 additions and 22 deletions

View File

@@ -0,0 +1,431 @@
# Unified routing ablation: A (tighter affinity) + B (decode-aware LMetric)
Goal: judge whether `unified` (cache-aware hybrid affinity + LMetric fallback)
has enough headroom to surpass v3 migration-based routing on agentic
workloads, without invoking PD-sep migration.
## Workload / baseline
- Trace: `w600_r0.0015_st30.jsonl` (1214 reqs, 274 sessions)
- Hardware: 8 × H100 (dash0), Qwen3-Coder-30B-A3B, TP=1, max_model_len=200000
- Trace replay through `cache_aware_proxy.py` with policy `unified`
- `b3_replay_20260527_0114/unified/` reference
| Metric (ms) | baseline (`overload_factor=2.0`) |
|---|---:|
| TTFT p50 | 520 |
| TTFT p90 | **8781** |
| TTFT p99 | 47647 |
| TPOT p90 | 17.8 |
| E2E p90 | 19989 |
| E2E p99 | 85841 |
Reference points we're trying to beat / match:
- v3 fixed rotation (cache-blind picker): TTFT p90 = 10828
- v3 + Mechanism B (cache-rich picker): TTFT p90 = 9711
- All v3 variants are +1023% worse than `unified` baseline.
## Tail-source diagnostic on baseline
Decision split, baseline unified:
| Decision | n | TTFT mean | TTFT p90 | TTFT p99 |
|---|---:|---:|---:|---:|
| affinity | 852 | 3183 | 7011 | 47432 |
| lmetric_fallback | 362 | 4285 | 12083 | 46036 |
Long-tail (>20s, n=65):
- 40 / 65 came from `affinity` decisions
- 25 / 65 came from `lmetric_fallback`
For the 40 slow `affinity` reqs:
- only 12 / 40 were actually overloaded at decision time (`aff_num_req > avg_num_req`)
- overload ratio at decision: mean=0.93, p50=0.87
- **most slow affinity reqs looked fine when the picker stuck — load piled
on after dispatch**.
This is a snapshot-based-routing limitation. Tightening
`overload_factor` only helps the genuine cases above the new threshold —
expected to be a 5-10% improvement at best.
---
## Direction A — tighten affinity overflow
**Hypothesis.** `overload_factor=2.0` lets the picker stick to affinity
even when `affinity.num_req` is up to 2× the cluster average. Reducing to
1.3 forces earlier overflow to LMetric fallback, escaping busy affinity
hosts before the tail blows up.
**Change.** Single CLI flag: `--overload-factor 1.3`. No code change.
**Run.** `unified_of13_20260527_1532/unified/`.
### A vs baseline
| Metric (ms) | baseline (of=2.0) | A (of=1.3) | Δ |
|---|---:|---:|---:|
| TTFT p50 | 520 | 495 | 5% |
| TTFT p90 | 8781 | 8730 | ≈0 |
| TTFT p99 | 47647 | 43059 | 10% |
| TPOT p50 | 7.9 | 8.0 | ≈0 |
| TPOT p90 | 17.8 | **15.5** | **13%** |
| E2E p50 | 1761 | 1824 | +4% |
| E2E p90 | 19989 | 18407 | 8% |
| E2E p99 | 85841 | **71396** | **17%** |
TTFT p90 is essentially unchanged but the **deeper tail (p99) and
TPOT both improved meaningfully**. Net: A alone gives roughly 10% to
17% on the long tail without hurting medians.
### Decision split, A vs baseline
| Decision | baseline n / p90 | A n / p90 | Δ p90 |
|---|---|---|---|
| affinity | 852 / 7011 | 817 / **5817** | **17%** ✅ |
| lmetric_fallback | 362 / 12083 | 397 / **15360** | **+27%** ⚠️ |
The picker now sticks to affinity 35 fewer times. The remaining affinity
decisions are higher-quality (no longer "barely-fitting" cases), so their
p90 drops 17%.
But the 35 extra reqs that got pushed into fallback **got slower**:
fallback p90 went from 12083 → 15360. The LMetric scorer is selecting a
worse instance for them.
### Per-worker TTFT under A (of=1.3)
```
port 8000: n= 94 mean=4424 p90=12290 port 8004: n=192 mean=2597 p90=6968
port 8001: n= 135 mean=2779 p90= 5553 port 8005: n=202 mean=3102 p90=6113
port 8002: n= 88 mean=5827 p90=15804 port 8006: n=136 mean=4006 p90=10899
port 8003: n= 217 mean=2674 p90= 4598 port 8007: n=150 mean=3648 p90= 7025
```
Compared to baseline (88..217 reqs/port), A redistributes more evenly
(88..217 still but distribution is fatter in the middle). port 8002
remains slow (p90 15.8s) — its cache pool seems to keep getting cold
work routed there by LMetric.
### Why A alone isn't enough
LMetric scorer (`unified_hybrid` fallback path):
```python
score = (pending_prefill_tokens + new_uncached_tokens) * num_requests
```
This **ignores `ongoing_decode_tokens`** entirely. An instance with no
pending prefill but 200k tokens currently in decode looks "ideal"
(score=0×num_req=0) — yet a new request landing there waits behind
slow decode iters caused by the large batch KV reads.
A pushes more requests into fallback, but fallback can't tell which
instance is actually free. → Direction B is mandatory companion.
---
## Direction B — decode-aware LMetric
**Hypothesis.** Adding a decode-load penalty to the LMetric score lets
fallback distinguish "no prefill queued but heavy decode running" from
"truly idle". Should restore fallback p90 ≤ 12s baseline level.
**Change.**
```python
score = (pending_prefill + new + lmetric_decode_weight * ongoing_decode_tokens) * num_requests
```
- `lmetric_decode_weight=0.0` ⇒ original LMetric (control)
- `lmetric_decode_weight=0.01` ⇒ first experiment (rationale: 1 decode token
in batch costs ~0.01 prefill-token-equivalent in scheduler iter time
on H100 + Qwen3-30B-A3B)
CLI: `--lmetric-decode-weight 0.01`. Setting in code:
`cache_aware_proxy.py:Settings.lmetric_decode_weight`.
**Run.** `unified_of13_lmw001_20260527_1628/unified/`.
### A+B vs baseline / A
| Metric (ms) | baseline | A (of=1.3) | A+B (of=1.3, lmw=0.01) | Δ vs baseline |
|---|---:|---:|---:|---:|
| TTFT p50 | 520 | 495 | 514 | 1% |
| **TTFT p90** | 8781 | 8730 | **8421** | **4%** ✅ |
| TTFT p99 | 47647 | 43059 | 44800 | 6% |
| TPOT p50 | 7.9 | 8.0 | 7.9 | ≈0 |
| TPOT p90 | 17.8 | 15.5 | 15.7 | 12% |
| E2E p50 | 1761 | 1824 | 1870 | +6% |
| E2E p90 | 19989 | 18407 | **21064** | **+5%** ⚠️ |
| E2E p99 | 85841 | 71396 | **64344** | **25%** ✅ |
Long-tail counts:
```
thresh baseline A A+B v3 MechB
> 5000ms 170 173 170 177
> 10000ms 105 109 109 119
> 20000ms 65 64 59 78
> 30000ms 41 40 37 50
> 50000ms 8 5 6 14
```
A+B is best on every long-tail-count threshold ≤30s, marginal worse at 50s.
### Decision split (A+B vs A)
| Decision | A (of=1.3) | A+B | Note |
|---|---|---|---|
| affinity p90 | 5817 | 5836 | ≈ same |
| fallback p90 | **15360** | **13501** | B recovered some of A's fallback regression |
B partially fixed fallback's selection (12% on fallback p90 vs A alone),
but still worse than baseline (12083).
### Per-worker TTFT (A+B)
```
port 8000: n=134 mean=3495 p90=10967 port 8004: n=136 mean=3102 p90= 7906
port 8001: n=143 mean=2981 p90=10189 port 8005: n=179 mean=1624 p90= 2735
port 8002: n=221 mean=2355 p90= 3502 port 8006: n=137 mean=5356 p90= 9628
port 8003: n=146 mean=3932 p90=10729 port 8007: n=118 mean=5210 p90=26798 ← new hotspot
```
A+B trades the baseline's 8002 hotspot (p90=35s) for a new 8007 hotspot
(p90=26.8s). Lower amplitude but hotspot survives.
### Why 8007 became a hotspot under A+B — **found a bug in B**
8007 in A+B: 118 reqs, **53% affinity / 47% fallback** (vs other ports
6077% affinity), **cache_hit_mean=50.5% (lowest)**.
Top-10 slowest at 8007: all are big-prompt (100k+ tokens) fallback decisions
with `cached_tokens=0` (cold prefill). LMetric is pushing many cold-prefill
fallbacks to 8007.
Looking at the B formula:
```python
decode_pen = lmetric_decode_weight * ongoing_decode_tokens
score = (pending_prefill + new + decode_pen) * num_requests # ← BUG
```
When `num_requests = 0`, the entire score (including decode penalty) zeros
out. So an idle-but-decoding host (num_req=0 because its last prefill
finished but decode is still running) looks like score=0, beating every
busy host.
**Fix (B'):** multiply by `max(num_requests, 1)`:
```python
score = (pending_prefill + new + decode_pen) * max(num_requests, 1)
```
Now idle hosts with high decode load get score = decode_pen × 1 = real
nonzero penalty, beating zero-load hosts only when decode is small.
### A+B' — re-run with the fix
**Run.** `unified_of13_lmw001_v2_20260527_1724/unified/`.
| Metric (ms) | baseline | A+B (BUG) | A+B' (fix) | Δ vs baseline |
|---|---:|---:|---:|---:|
| TTFT p50 | 520 | 514 | **485** | 7% |
| **TTFT p90** | 8781 | 8421 | **8287** | **5.6%** ✅ |
| TTFT p99 | 47647 | 44800 | **41876** | **12%** ✅ |
| TPOT p90 | 17.8 | 15.7 | 17.5 | 2% |
| E2E p90 | 19989 | 21064 | 20625 | +3% |
| E2E p99 | 85841 | 64344 | 77827 | 9% |
A+B' **best of all variants on TTFT p90 (8287) and TTFT p99 (41876)**.
Long-tail counts (>30s, >50s) also best across variants.
vs v3 reference points:
| | TTFT p90 | TPOT p90 | E2E p99 |
|---|---:|---:|---:|
| **A+B'** | **8287** | 17.5 | 77827 |
| v3 fixed (cache-blind) | 10828 | 21.0 | 47610 |
| v3 + Mech B | 9711 | 18.3 | 84492 |
A+B' **beats v3 Mech B by 15% TTFT p90** with no migration overhead.
### Per-worker (A+B' fixed)
```
8000: n=158 p90= 5688 8004: n=189 p90= 4249
8001: n=159 p90= 7323 8005: n=116 p90=14598
8002: n=114 p90= 8726 8006: n=180 p90= 6198
8003: n=173 p90= 6715 8007: n=125 p90=22242 ← still hot
```
A+B' redistributed load more evenly (114..189) but **8007 still has p90=22s**.
### 8007 deep-dive in A+B'
```
8007: n=125, affinity=69 (55%), fallback=56 (45%), cache_hit_mean=lowest
```
Top-15 slow at 8007:
- 7 of them are session **1313181** turns 914 (130k+ tokens each, agentic
long context, ~50% cache hit)
- Several others are cold-start turn-1 of large-prompt sessions
- First two slow reqs arrived **0.7 s apart** — strong hint of concurrent
picker race
### Iteration 3: race-condition fix
**Diagnosis.** In `_handle_combined`:
```python
chosen, best_idx, decision = pick_instance_unified_hybrid(...) # sync
# ... sync breakdown updates ...
return await _handle_local_request(...) # ← await yields here
# THEN reservation happens
```
`return await async_func(...)` evaluates the async call (creates coroutine)
and yields to the event loop **before** the coroutine body executes. The
reservation (`chosen.pending_prefill_tokens += new`, etc.) lives at the top
of `_handle_local_request`, so between the picker and the reservation there
is a **window where another coroutine can run and re-pick the same instance**.
When two big-prompt reqs arrive within milliseconds, both run pick →
both pick the "free" 8007 → both yield → both reserve. Result: 8007 gets
back-to-back 130k-token cold prefills, each waiting for the other.
**Fix.** Move the reservation **before** the await, inside `_handle_combined`:
```python
# Race fix: reserve atomically with pick, before any await.
chosen.ongoing_tokens += input_length
chosen.pending_prefill_tokens += estimated_new
chosen.num_requests += 1
return await _handle_local_request(..., _pre_reserved=True)
```
`_handle_local_request` skips its own reservation when `_pre_reserved=True`.
PD-sep paths are unaffected (they have their own reservation).
**Run.** Pending — `unified_of13_lmw001_racefix_*`. Hypothesis: 8007 p90
drops to within ±3s of cluster median, since concurrent picks for the
same "free" instance no longer happen.
---
## A+B'+RaceFix — results
**Run.** `unified_of13_lmw001_racefix_20260527_1821/unified/`.
| Metric (ms) | baseline | A+B' | A+B'+RF | Δ vs baseline |
|---|---:|---:|---:|---:|
| TTFT p50 | 520 | 485 | **478** | 8% |
| **TTFT p90** | 8781 | 8287 | **7770** | **11.5%** ✅ |
| TTFT p99 | 47647 | 41876 | **42447** | 11% |
| TPOT p90 | 17.8 | 17.5 | 18.0 | +1% |
| E2E p90 | 19989 | 20625 | **18418** | 8% |
| E2E p99 | 85841 | 77827 | **71227** | 17% |
vs v3 reference:
- **A+B'+RF TTFT p90 = 7770ms, vs v3 Mech B 9711ms → 20%** ✅
Long-tail counts (best across all variants):
```
> 5s: 170 → 158 > 30s: 41 → 33
>10s: 105 → 103 > 50s: 8 → 4
>20s: 65 → 57 >100s: 0 → 0
```
### Decision split — race fix mainly helped affinity
| Decision | baseline | A+B'+RF |
|---|---:|---:|
| affinity p90 | 7011 | **5042** ✅ (28%) |
| fallback p90 | 12083 | 13944 (+15%) |
The race-condition was hurting affinity decisions the most. When two
concurrent reqs both stuck to a "free-looking" affinity instance, they
piled up and inflated affinity's tail. Fix removed this collision.
### Per-worker
```
8000: n=86 p90=11541 8004: n=150 p90=11906
8001: n=186 p90= 8307 8005: n=109 p90= 4798
8002: n=105 p90=14540 8006: n=183 p90= 6258
8003: n=264 p90= 3079 8007: n=131 p90=21850 ← still hot
8000 spread now 86..264 — race fix did disperse routing
```
### 8007 still hot — but it's **workload-inherent, not a routing bug**
Top sessions on 8007:
```
session 1279412: n=22 mean= 2208 max=18985 decisions: 91% affinity
session 1313181: n=17 mean=17399 max=49089 decisions: 65% affinity
session 1262354: n=15 mean= 622 max= 2325 decisions: 87% affinity
session 1342921: n= 7 mean=17817 max=55589 decisions: 86% affinity
session 1260327: n= 8 mean= 1636 max= 5382 decisions: 75% affinity
session 1268831: n= 5 mean= 1443 max= 2673 decisions: 80% affinity
```
Sessions 1313181 and 1342921 are **long agentic contexts**: 100k130k tokens
per turn with ~50% cache hit (i.e. 50k new tokens prefill per turn). Even
on a perfectly load-balanced instance, each turn is 715s of pure compute.
Forcing these sessions to spread across instances would mean **cold prefill
every turn (0% cache hit)** → each turn becomes 2030s instead of 715s.
Spreading is **net-negative**.
→ The 8007 p90=22s is the floor imposed by these sessions' structure,
not by routing policy. Unified is at its ceiling for this workload.
---
## Final ranking and take-aways
| Policy | TTFT p90 (ms) | Δ vs baseline | Notes |
|---|---:|---:|---|
| baseline unified (of=2.0) | 8781 | — | reference |
| A (of=1.3) | 8730 | ≈0 | affinity p90 -17%, fallback p90 +27% |
| A+B (of=1.3, lmw=0.01, BUG) | 8421 | 4% | 8007 hotspot from `*num_req` zeroing bug |
| A+B' (formula fix) | 8287 | 5.6% | Bug fixed, still 8007 mild hotspot |
| **A+B'+RaceFix** | **7770** | **11.5%** ✅ | **Best unified variant** |
| v3 fixed | 10828 | +23% | PD-sep migration, cache-blind picker |
| v3 + Mech B | 9711 | +11% | PD-sep + cache-rich target picker |
### Conclusions
1. **Unified path beats v3 PD-sep on this workload by 20%+ TTFT p90.**
PD-sep migration's fixed cost (src prefill + dst first-token waiting on
loaded scheduler) outweighs any decode-time savings for short-output
agentic turns.
2. **Three orthogonal fixes compound for a 11.5% TTFT p90 win:**
- A (`overload_factor=1.3`): tighter affinity overflow → 0.6% but
much cleaner affinity decisions (p90 -17%)
- B' (`lmetric_decode_weight=0.01` with `max(num_req,1)`): decode-aware
fallback → 3.5%
- RaceFix (atomic reserve before await): kills concurrent-pick
collisions → 5.6%
3. **Race condition was the biggest single hidden bug.** `return await
async_func(...)` yields to the event loop **before** the body of
`async_func` runs, so reservations done in the body don't take effect
in time to deter concurrent picks. This affects ANY async dispatch
with separate pick/reserve steps — worth checking other routing
policies.
4. **8007 p90=22s is workload-inherent.** Sessions with 100k+ token turns
at 50% cache hit cannot finish faster than 715s per turn regardless
of routing. Forcing spread would hurt rather than help.
5. **Migration (v3) is not necessary** when unified routing is tuned
well. Save the PD-sep mechanism for cases where it can be proven
net-positive (e.g. very-long-output sessions on extremely overloaded
prefill hosts) and use unified A+B'+RaceFix as the default.
---
## Direction A+B — run pending
(Will be filled when `unified_of13_lmw001_*/unified/` finishes.)

View File

@@ -0,0 +1,169 @@
#!/usr/bin/env python3
"""B3 5-policy re-test analyser.
Compute TTFT/TPOT/E2E mean/p50/p90/p99 for each policy from
metrics.jsonl, compare against the historical b3_policy_comparison.json
that drives fig_b3_latency_bars.png, and emit a side-by-side table
plus a new figure with the same layout as the original.
Usage:
python analyze_b3_replay.py --root <outroot> [--old-data <path>] [--figure <path>]
"""
import argparse
import json
import statistics
from pathlib import Path
POLICIES = ["lmetric", "load_only", "sticky", "unified", "unified_v2"]
def pct(xs, p):
if not xs:
return None
xs = sorted(xs)
k = max(0, min(len(xs) - 1, int(p / 100.0 * (len(xs) - 1))))
return xs[k]
def summarise(path):
rows = [json.loads(l) for l in open(path) if l.strip()]
ok = [r for r in rows if not r.get("error")]
ttft = [r["ttft_s"] * 1000 for r in ok if r.get("ttft_s") is not None]
tpot = [r["tpot_s"] * 1000 for r in ok if r.get("tpot_s")]
e2e = [r["latency_s"] * 1000 for r in ok if r.get("latency_s") is not None]
return {
"n_total": len(rows),
"n_ok": len(ok),
"ttft_mean_ms": statistics.mean(ttft) if ttft else None,
"ttft_p50_ms": pct(ttft, 50),
"ttft_p90_ms": pct(ttft, 90),
"ttft_p99_ms": pct(ttft, 99),
"tpot_mean_ms": statistics.mean(tpot) if tpot else None,
"tpot_p50_ms": pct(tpot, 50),
"tpot_p90_ms": pct(tpot, 90),
"tpot_p99_ms": pct(tpot, 99),
"e2e_mean_ms": statistics.mean(e2e) if e2e else None,
"e2e_p50_ms": pct(e2e, 50),
"e2e_p90_ms": pct(e2e, 90),
"e2e_p99_ms": pct(e2e, 99),
}
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--root", type=Path, required=True)
ap.add_argument("--old-data", type=Path,
default=Path("analysis/characterization/window_1_results/b3_policy_comparison.json"))
ap.add_argument("--figure", type=Path, default=None)
args = ap.parse_args()
new = {}
for p in POLICIES:
path = args.root / p / "metrics.jsonl"
if not path.exists():
print(f"MISSING: {path}")
continue
new[p] = summarise(path)
old = {}
if args.old_data.exists():
d = json.load(open(args.old_data))
for r in d.get("rows", []):
old[r["policy"]] = {
"ttft_p50_ms": r["ttft_p50_s"] * 1000,
"ttft_p90_ms": r["ttft_p90_s"] * 1000,
"ttft_p99_ms": r["ttft_p99_s"] * 1000,
"tpot_p90_ms": r["tpot_p90_s"] * 1000,
"e2e_p90_ms": r.get("e2e_p90_s", 0) * 1000,
}
def fmt(v): return f"{v:.0f}" if v is not None else "-"
def pctd(a, b):
if a is None or b is None or a == 0: return "-"
return f"{(b/a-1)*100:+.1f}%"
# Headline table
print(f"\n# NEW: today's re-test")
print(f"{'policy':<14}{'n_ok':>6}{'TTFTp50':>10}{'TTFTp90':>10}{'TTFTp99':>10}{'TPOTp90':>10}{'E2Ep90':>10}")
print("-" * 70)
for p in POLICIES:
if p not in new: continue
r = new[p]
print(f"{p:<14}{r['n_ok']:>6}{fmt(r['ttft_p50_ms']):>9}ms{fmt(r['ttft_p90_ms']):>9}ms{fmt(r['ttft_p99_ms']):>9}ms{fmt(r['tpot_p90_ms']):>9}ms{fmt(r['e2e_p90_ms']):>9}ms")
print(f"\n# OLD: window_1_results/b3_policy_comparison.json")
print(f"{'policy':<14}{'TTFTp50':>10}{'TTFTp90':>10}{'TTFTp99':>10}{'TPOTp90':>10}{'E2Ep90':>10}")
print("-" * 60)
for p in POLICIES:
if p not in old: continue
r = old[p]
print(f"{p:<14}{fmt(r['ttft_p50_ms']):>9}ms{fmt(r['ttft_p90_ms']):>9}ms{fmt(r['ttft_p99_ms']):>9}ms{fmt(r['tpot_p90_ms']):>9}ms{fmt(r['e2e_p90_ms']):>9}ms")
print(f"\n# DRIFT: today vs old (same policy)")
print(f"{'policy':<14}{'ΔTTFTp50':>10}{'ΔTTFTp90':>10}{'ΔTTFTp99':>10}{'ΔTPOTp90':>10}{'ΔE2Ep90':>10}")
print("-" * 60)
for p in POLICIES:
if p not in new or p not in old: continue
n, o = new[p], old[p]
print(f"{p:<14}{pctd(o['ttft_p50_ms'], n['ttft_p50_ms']):>10}"
f"{pctd(o['ttft_p90_ms'], n['ttft_p90_ms']):>10}"
f"{pctd(o['ttft_p99_ms'], n['ttft_p99_ms']):>10}"
f"{pctd(o['tpot_p90_ms'], n['tpot_p90_ms']):>10}"
f"{pctd(o['e2e_p90_ms'], n['e2e_p90_ms']):>10}")
# Relative ordering check
def ranks(values_dict, key):
items = [(p, r[key]) for p, r in values_dict.items() if r.get(key)]
items.sort(key=lambda x: x[1])
return [p for p, _ in items]
print(f"\n# TTFT p90 ranking (best → worst)")
for label, src in [("OLD", old), ("NEW", new)]:
if src:
order = ranks(src, "ttft_p90_ms")
print(f" {label}: {' < '.join(order)}")
out = {"new": new, "old": old}
out_path = args.root / "b3_replay_summary.json"
out_path.write_text(json.dumps(out, indent=2))
print(f"\nWrote {out_path}")
# Bar plot (matplotlib)
if not args.figure:
args.figure = args.root / "fig_b3_latency_bars_new.png"
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
pols = [p for p in POLICIES if p in new]
metrics = [("TTFT p90 (s)", "ttft_p90_ms", 1000),
("TPOT p90 (ms)", "tpot_p90_ms", 1),
("E2E p90 (s)", "e2e_p90_ms", 1000)]
colors = {"lmetric": "tab:blue", "load_only": "tab:orange",
"sticky": "tab:green", "unified": "tab:red",
"unified_v2": "tab:purple"}
fig, axes = plt.subplots(1, 3, figsize=(14, 4.5))
for ax, (label, key, div) in zip(axes, metrics):
vals = [new[p][key] / div for p in pols]
bars = ax.bar(pols, vals,
color=[colors.get(p, "gray") for p in pols],
edgecolor="black", linewidth=0.5)
ax.set_title(label)
ax.tick_params(axis="x", rotation=20)
for b, v in zip(bars, vals):
ax.text(b.get_x() + b.get_width() / 2, v, f"{v:.1f}",
ha="center", va="bottom", fontsize=9)
ax.grid(alpha=0.3, axis="y")
fig.suptitle(f"B3 5-policy re-test ({args.root.name})")
fig.tight_layout()
fig.savefig(args.figure, dpi=120)
print(f"Wrote {args.figure}")
except Exception as e:
print(f"(figure skipped: {e})")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,94 @@
#!/usr/bin/env bash
# B3 routing-policy reproducibility re-test.
#
# Re-runs the 5 routing policies from fig_b3_latency_bars.png on the same
# trace, in a single same-day session, to check whether the ordering
# (unified < load_only < sticky etc.) still holds today.
#
# Policies (in run order):
# lmetric plain — cache-aware P_tokens × BS
# load_only plain — pure min-num_requests
# sticky plain — hard session affinity
# unified plain — hybrid affinity + LMetric fallback
# unified_v2 Mooncake kv_both + selective PD-sep (with DR-fix applied)
#
# unified_v2 is run with VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1 so we
# get the "best Mooncake state" we have today (DR-fix on top of the
# already-fixed mainline after e3a1d70 etc.). The other 4 policies don't
# load any connector so the patch is irrelevant.
set -uo pipefail
PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}"
TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}"
DATE="$(date +%Y%m%d_%H%M)"
OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/b3_replay_${DATE}}"
PYTHON="$PROJ_DIR/.venv/bin/python"
DR_FIX_SCRIPT="$PROJ_DIR/microbench/connector_tax/cache_sweep/apply_direct_read_fix.py"
VLLM_ROOT="${VLLM_ROOT:-$PROJ_DIR/.venv/lib/python3.12/site-packages/vllm}"
mkdir -p "$OUTROOT"
echo "=== B3 5-policy re-test ==="
echo "Trace : $TRACE"
echo "Out : $OUTROOT"
echo "Order : lmetric → load_only → sticky → unified → unified_v2 (DR-fix on)"
echo ""
cleanup_all() {
pkill -9 -f cache_aware_proxy 2>/dev/null || true
pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true
sleep 5
"$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true
}
trap cleanup_all EXIT
cleanup_all
# Apply DR-fix once — it's env-gated so only unified_v2 (with env=1) sees it
echo "[stage 0] applying CT_DR_FIX (env-gated, only activates when VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1)"
"$PYTHON" "$DR_FIX_SCRIPT" --apply --vllm-root "$VLLM_ROOT"
run_policy() {
local policy="$1"
local skip_dr="$2"
local rundir="$OUTROOT/$policy"
mkdir -p "$rundir"
echo ""
echo "====== $policy ; DR_SYNC_DISABLED=$skip_dr ======"
if [ "$skip_dr" = "1" ]; then
export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1
else
unset VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC
fi
bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "$policy" "$TRACE" "$rundir" \
2>&1 | tee "$rundir/orchestrator.log" | tail -30
rc="${PIPESTATUS[0]}"
if [ "$rc" != "0" ]; then
echo "[FAIL] policy $policy rc=$rc"
fi
# Belt-and-braces cleanup between policies
pkill -9 -f cache_aware_proxy 2>/dev/null || true
pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true
sleep 10
return 0
}
run_policy "lmetric" "0"
run_policy "load_only" "0"
run_policy "sticky" "0"
run_policy "unified" "0"
run_policy "unified_v2" "1" # uses Mooncake kv_both; activate DR-fix
echo ""
echo "[stage Z] reverting CT_DR_FIX"
"$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT"
echo ""
echo "Done. Artifacts: $OUTROOT"
for p in lmetric load_only sticky unified unified_v2; do
echo " $p: $OUTROOT/$p/metrics.jsonl"
done

View File

@@ -0,0 +1,56 @@
#!/usr/bin/env bash
# Single-policy trace replay for unified, with tunable overload-factor.
# Used to test direction A: does tightening affinity overflow improve unified?
#
# Usage:
# OVERLOAD_FACTOR=1.3 bash run_unified_ablation.sh
# OVERLOAD_FACTOR=1.0 bash run_unified_ablation.sh
#
# Output: $PROJ_DIR/outputs/unified_of${OF}_${DATE}/unified/
set -uo pipefail
PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}"
TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}"
OF="${OVERLOAD_FACTOR:-1.3}"
LMW="${LMETRIC_DECODE_WEIGHT:-0.0}"
TAG_DEFAULT="of${OF/./}"
if [ "$(printf '%s' "$LMW" | grep -v '^0\.\?0*$' || true)" != "" ]; then
TAG_DEFAULT="${TAG_DEFAULT}_lmw${LMW/./}"
fi
TAG="${TAG:-$TAG_DEFAULT}"
DATE="$(date +%Y%m%d_%H%M)"
OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/unified_${TAG}_${DATE}}"
mkdir -p "$OUTROOT"
echo "=== unified ablation: overload_factor=$OF ==="
echo "Trace : $TRACE"
echo "Out : $OUTROOT"
echo ""
cleanup() {
pkill -9 -f cache_aware_proxy 2>/dev/null || true
pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true
sleep 3
}
trap cleanup EXIT
cleanup
cfg_dir="$OUTROOT/unified"
mkdir -p "$cfg_dir"
export EXTRA_PROXY_ARGS="--overload-factor $OF --lmetric-decode-weight $LMW"
echo ""
echo "====== unified ; overload_factor=$OF lmetric_decode_weight=$LMW ======"
bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "unified" "$TRACE" "$cfg_dir" \
2>&1 | tee "$cfg_dir/orchestrator.log" | tail -30
pkill -9 -f cache_aware_proxy 2>/dev/null || true
pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true
sleep 5
echo ""
echo "Done. Artifacts: $OUTROOT/unified/metrics.jsonl"

View File

@@ -0,0 +1,64 @@
#!/usr/bin/env bash
# Trace replay for unified_v3 WITHOUT affinity rotation.
#
# This is the #2 follow-up to cache_miss_audit: prior run showed v3 with
# rotation hits 9.5% cache on post-migration next turn vs 80.6% for unified.
# Hypothesis: rotation destroys prefix cache locality. Test by keeping the
# session affinity on prefill_host even after migration (i.e., the same
# behavior as unified for the post-migration write), so only the *current*
# turn's decode is migrated.
#
# Applies CT_DR_FIX (Mooncake DR sync disabled).
# Output: $PROJ_DIR/outputs/b3_v3_norot_${DATE}/unified_v3/
set -uo pipefail
PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}"
TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}"
DATE="$(date +%Y%m%d_%H%M)"
OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/b3_v3_norot_${DATE}}"
PYTHON="$PROJ_DIR/.venv/bin/python"
DR_FIX_SCRIPT="$PROJ_DIR/microbench/connector_tax/cache_sweep/apply_direct_read_fix.py"
VLLM_ROOT="${VLLM_ROOT:-$PROJ_DIR/.venv/lib/python3.12/site-packages/vllm}"
mkdir -p "$OUTROOT"
echo "=== unified_v3 (no rotation) trace replay ==="
echo "Trace : $TRACE"
echo "Out : $OUTROOT"
echo ""
cleanup_all() {
pkill -9 -f cache_aware_proxy 2>/dev/null || true
pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true
sleep 5
"$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true
}
trap cleanup_all EXIT
cleanup_all
echo "[stage 0] applying CT_DR_FIX (env-gated)"
"$PYTHON" "$DR_FIX_SCRIPT" --apply --vllm-root "$VLLM_ROOT"
cfg_dir="$OUTROOT/unified_v3"
mkdir -p "$cfg_dir"
export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1
export EXTRA_PROXY_ARGS="--v3-rotate-affinity 0"
echo ""
echo "====== unified_v3 (no rotation) ; DR_SYNC_DISABLED=1 ======"
bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "unified_v3" "$TRACE" "$cfg_dir" \
2>&1 | tee "$cfg_dir/orchestrator.log" | tail -30
pkill -9 -f cache_aware_proxy 2>/dev/null || true
pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true
sleep 5
echo ""
echo "[stage Z] reverting CT_DR_FIX"
"$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT"
echo ""
echo "Done. Artifacts: $OUTROOT/unified_v3/metrics.jsonl"

View File

@@ -0,0 +1,66 @@
#!/usr/bin/env bash
# Trace replay for the new unified_v3 (offload-decode) policy.
#
# Runs the same trace as run_b3_replay.sh on a single policy:
# unified_v3 — prefill on session-affinity host (uses prefix cache),
# decode migrated to a low-load target via Mooncake
# KV transfer (kv_role=kv_both). Session affinity rotates
# to decode_target after migration so next turn lands
# where the KV now lives.
#
# Applies CT_DR_FIX so the run uses the "best Mooncake state" we have
# today (post-e3a1d70 + DR sync skipped).
#
# Usage: bash run_v3_replay.sh
set -uo pipefail
PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}"
TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}"
DATE="$(date +%Y%m%d_%H%M)"
OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/b3_v3_${DATE}}"
PYTHON="$PROJ_DIR/.venv/bin/python"
DR_FIX_SCRIPT="$PROJ_DIR/microbench/connector_tax/cache_sweep/apply_direct_read_fix.py"
VLLM_ROOT="${VLLM_ROOT:-$PROJ_DIR/.venv/lib/python3.12/site-packages/vllm}"
mkdir -p "$OUTROOT"
echo "=== unified_v3 (offload-decode) trace replay ==="
echo "Trace : $TRACE"
echo "Out : $OUTROOT"
echo ""
cleanup_all() {
pkill -9 -f cache_aware_proxy 2>/dev/null || true
pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true
sleep 5
"$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true
}
trap cleanup_all EXIT
cleanup_all
echo "[stage 0] applying CT_DR_FIX (env-gated)"
"$PYTHON" "$DR_FIX_SCRIPT" --apply --vllm-root "$VLLM_ROOT"
cfg_dir="$OUTROOT/unified_v3"
mkdir -p "$cfg_dir"
# Activate the DR-fix env-gate (unified_v3 uses Mooncake kv_both)
export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1
echo ""
echo "====== unified_v3 ; DR_SYNC_DISABLED=1 ======"
bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "unified_v3" "$TRACE" "$cfg_dir" \
2>&1 | tee "$cfg_dir/orchestrator.log" | tail -30
pkill -9 -f cache_aware_proxy 2>/dev/null || true
pkill -9 -f "vllm serve" 2>/dev/null || true
pkill -9 -f "EngineCore" 2>/dev/null || true
sleep 5
echo ""
echo "[stage Z] reverting CT_DR_FIX"
"$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT"
echo ""
echo "Done. Artifacts: $OUTROOT/unified_v3/metrics.jsonl"

View File

@@ -19,7 +19,7 @@ import os
import time as _time import time as _time
import urllib.parse import urllib.parse
import uuid import uuid
from collections import OrderedDict from collections import OrderedDict, deque
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass from dataclasses import dataclass
@@ -103,6 +103,20 @@ class Settings:
# auto-transfers only the missing portion (verified via # auto-transfers only the missing portion (verified via
# smoke_partial_transfer: cache-rich dst is 77% faster than # smoke_partial_transfer: cache-rich dst is 77% faster than
# cold dst at 33k tokens, +512 ext). # cold dst at 33k tokens, +512 ext).
# Anti-hotspot: picker scores effective_load = num_requests + (recent
# migrations received within window). Prevents clustering migrations on
# one instance in rapid succession (observed in Mech B run: inst_5 became
# a hotspot via post-rotation tail accumulation).
v3_recent_mig_window_s: float = 10.0 # sliding window
v3_recent_mig_weight: float = 1.0 # how many "virtual requests" each
# recent migration counts as
# Direction B knob: LMetric fallback adds decode-token penalty to score.
# score = (pending_prefill + new + lmetric_decode_weight * ongoing_decode_tok) * num_req
# Empirical iter-time slope on H100 + Qwen3-30B-A3B: each decode token in
# batch costs ~0.01 prefill-token-equivalent in scheduler time, so 0.01 is
# a reasonable starting weight. Set 0 to disable (original behavior).
lmetric_decode_weight: float = 0.0
# --- KV connector selection (governs PD-sep handshake) ------------- # --- KV connector selection (governs PD-sep handshake) -------------
# "mooncake": pre-baked kv_transfer_params (bootstrap_addr+engine_id+transfer_id). # "mooncake": pre-baked kv_transfer_params (bootstrap_addr+engine_id+transfer_id).
@@ -187,6 +201,11 @@ class InstanceState:
self.dp_size = 1 self.dp_size = 1
# OrderedDict acts as an LRU keyed by block hash; value is unused. # OrderedDict acts as an LRU keyed by block hash; value is unused.
self.cached_blocks: OrderedDict[int, None] = OrderedDict() self.cached_blocks: OrderedDict[int, None] = OrderedDict()
# v3 anti-hotspot: timestamps (monotonic) when this instance was picked
# as a v3 migration target. Used to compute effective_load = num_req +
# recent-migration count over a sliding window, preventing back-to-back
# decisions from clustering on the same dst.
self.recent_mig_targeted_at: deque[float] = deque(maxlen=64)
def estimate_cache_hit(self, token_ids: list[int] | None) -> int: def estimate_cache_hit(self, token_ids: list[int] | None) -> int:
if not token_ids or len(token_ids) < BLOCK_SIZE: if not token_ids or len(token_ids) < BLOCK_SIZE:
@@ -417,13 +436,24 @@ def pick_instance_unified_hybrid(
decision["chosen_idx"] = a_idx decision["chosen_idx"] = a_idx
return a_inst, a_idx, decision return a_inst, a_idx, decision
keys: list[tuple[int, int, int, int]] = [] # Direction B: extend LMetric with decode-load awareness.
# Original score = (pending_prefill + new_uncached) * num_requests, which
# ignores ongoing decode work. A host with 200k decode tokens looks "ideal"
# (P_tokens=0) but its decode iters are slow due to large batch KV reads.
#
# First attempt (BUG): score = (p_tokens + decode_pen) * num_req — when
# num_req=0 the decode_pen is zeroed out, so idle-but-decoding hosts still
# look free and accumulate cold prefills (8007 hotspot in A+B v1 run).
#
# Fix: max(num_req, 1) so decode_pen contributes on idle hosts too.
keys: list[tuple[float, int, int, int]] = []
for i, inst in enumerate(instances): for i, inst in enumerate(instances):
cache_hit = inst.estimate_cache_hit(token_ids) cache_hit = inst.estimate_cache_hit(token_ids)
new_prefill = max(0, input_length - cache_hit) new_prefill = max(0, input_length - cache_hit)
p_tokens = inst.pending_prefill_tokens + new_prefill p_tokens = inst.pending_prefill_tokens + new_prefill
decode_pen = SETTINGS.lmetric_decode_weight * inst.ongoing_decode_tokens
bs = inst.num_requests bs = inst.num_requests
score = p_tokens * bs score = (p_tokens + decode_pen) * max(bs, 1)
keys.append((score, new_prefill, bs, i)) keys.append((score, new_prefill, bs, i))
best_triple = min(k[:3] for k in keys) best_triple = min(k[:3] for k in keys)
@@ -637,48 +667,80 @@ def pick_instance_unified_v3(
) )
return prefill_host, prefill_idx, decision, None return prefill_host, prefill_idx, decision, None
# Gate 3: pick the lowest-load target that is materially less loaded # Gate 3: pick the lowest-effective-load target. effective_load adds a
# than the prefill_host. Cache content irrelevant — KV ships over. # penalty for recent migrations the instance has received (anti-hotspot).
now_mono = _time.monotonic()
cutoff = now_mono - SETTINGS.v3_recent_mig_window_s
def effective_load(inst):
# Drop expired entries lazily.
while inst.recent_mig_targeted_at and inst.recent_mig_targeted_at[0] < cutoff:
inst.recent_mig_targeted_at.popleft()
recent = len(inst.recent_mig_targeted_at)
return inst.num_requests + recent * SETTINGS.v3_recent_mig_weight
threshold_loaded = max(1, threshold_loaded = max(1,
int(prefill_host.num_requests * SETTINGS.v3_target_load_ratio)) int(prefill_host.num_requests * SETTINGS.v3_target_load_ratio))
candidates = [ candidates = [
(i, inst) for i, inst in enumerate(instances) (i, inst) for i, inst in enumerate(instances)
if i != prefill_idx if i != prefill_idx
and inst.num_requests < threshold_loaded and effective_load(inst) < threshold_loaded
and inst.num_requests <= prefill_host.num_requests - SETTINGS.v3_min_load_gap and effective_load(inst) <= prefill_host.num_requests - SETTINGS.v3_min_load_gap
] ]
if not candidates: if not candidates:
decision["v3_reason"] = ( decision["v3_reason"] = (
f"no_low_load_target " f"no_low_load_target "
f"(prefill_host.num_req={prefill_host.num_requests} " f"(prefill_host.num_req={prefill_host.num_requests} "
f"threshold={threshold_loaded})" f"threshold={threshold_loaded} "
f"eff_loads=[{','.join(f'{int(effective_load(i))}' for i in instances)}])"
) )
return prefill_host, prefill_idx, decision, None return prefill_host, prefill_idx, decision, None
# Mechanism B (v3_prefer_cache_target=True): rank candidates first by # Mechanism B (v3_prefer_cache_target=True): rank candidates first by
# cache_hit DESC (more cache = less KV to transfer), then by load. vLLM # cache_hit DESC (more cache = less KV to transfer), then by effective_load
# auto-skips transferring overlapping prefix when dst's local cache # (which includes recent-migration penalty), then by ongoing_tokens.
# matches — verified in smoke_partial_transfer: 77% faster on a 33k
# prompt when dst has the prefix already.
if SETTINGS.v3_prefer_cache_target: if SETTINGS.v3_prefer_cache_target:
decode_target_idx, decode_target = min( decode_target_idx, decode_target = min(
candidates, candidates,
key=lambda x: (-x[1].estimate_cache_hit(token_ids), key=lambda x: (-x[1].estimate_cache_hit(token_ids),
x[1].num_requests, x[1].ongoing_tokens)) effective_load(x[1]),
x[1].ongoing_tokens))
else: else:
decode_target_idx, decode_target = min( decode_target_idx, decode_target = min(
candidates, key=lambda x: (x[1].num_requests, x[1].ongoing_tokens)) candidates, key=lambda x: (effective_load(x[1]), x[1].ongoing_tokens))
target_cache_hit = decode_target.estimate_cache_hit(token_ids) target_cache_hit = decode_target.estimate_cache_hit(token_ids)
target_recent_received = len(decode_target.recent_mig_targeted_at)
# Record this decision for the anti-hotspot accounting.
decode_target.recent_mig_targeted_at.append(now_mono)
decision["v3_migrate"] = True decision["v3_migrate"] = True
decision["v3_decision"] = "migrate_decode" decision["v3_decision"] = "migrate_decode"
decision["v3_src_idx"] = prefill_idx
decision["v3_target_idx"] = decode_target_idx decision["v3_target_idx"] = decode_target_idx
decision["v3_target_num_req"] = decode_target.num_requests decision["v3_target_num_req"] = decode_target.num_requests
decision["v3_target_cache_hit"] = target_cache_hit decision["v3_target_cache_hit"] = target_cache_hit
decision["v3_target_recent_received"] = target_recent_received
decision["v3_prefill_num_req"] = prefill_host.num_requests decision["v3_prefill_num_req"] = prefill_host.num_requests
# Snapshot of src state at the moment of decision (for postmortem).
decision["v3_src_state"] = {
"num_requests": prefill_host.num_requests,
"ongoing_tokens": prefill_host.ongoing_tokens,
"ongoing_decode_tokens": prefill_host.ongoing_decode_tokens,
"pending_prefill_tokens": prefill_host.pending_prefill_tokens,
}
decision["v3_target_state"] = {
"num_requests": decode_target.num_requests,
"ongoing_tokens": decode_target.ongoing_tokens,
"ongoing_decode_tokens": decode_target.ongoing_decode_tokens,
"pending_prefill_tokens": decode_target.pending_prefill_tokens,
"cache_hit_estimate": target_cache_hit,
"recent_mig_received_in_window": target_recent_received,
}
decision["v3_reason"] = ( decision["v3_reason"] = (
f"prefill_host.num_req={prefill_host.num_requests} busy; " f"prefill_host.num_req={prefill_host.num_requests} busy; "
f"target.num_req={decode_target.num_requests} cache_hit={target_cache_hit}, " f"target.num_req={decode_target.num_requests} cache_hit={target_cache_hit} "
f"recent_received={target_recent_received}, "
f"transferring KV after prefill" f"transferring KV after prefill"
) )
return prefill_host, prefill_idx, decision, (decode_target, decode_target_idx) return prefill_host, prefill_idx, decision, (decode_target, decode_target_idx)
@@ -987,12 +1049,16 @@ async def _handle(request: Request, api: str):
async def _handle_local_request(api, req_data, headers, token_ids, input_length, async def _handle_local_request(api, req_data, headers, token_ids, input_length,
chosen: InstanceState, estimated_new: int, chosen: InstanceState, estimated_new: int,
breakdown: dict): breakdown: dict, *, _pre_reserved: bool = False):
breakdown.setdefault("route_class", "LOCAL") breakdown.setdefault("route_class", "LOCAL")
breakdown.setdefault("routed_to", chosen.url) breakdown.setdefault("routed_to", chosen.url)
chosen.ongoing_tokens += input_length # Skip reservation when called from _handle_combined (it already reserved
chosen.pending_prefill_tokens += estimated_new # synchronously to close the picker→await race). When called directly
chosen.num_requests += 1 # from non-combined paths (PD-Sep, offload), reserve here for safety.
if not _pre_reserved:
chosen.ongoing_tokens += input_length
chosen.pending_prefill_tokens += estimated_new
chosen.num_requests += 1
async def generate(): async def generate():
prefill_done = False prefill_done = False
@@ -1180,9 +1246,19 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h
src_inst, chosen, breakdown, src_inst, chosen, breakdown,
request_id=request_id) request_id=request_id)
# Race fix: reserve load on `chosen` BEFORE the `await` so concurrent
# picker calls in the same asyncio event-loop tick see the updated
# counters. Without this, two requests arriving back-to-back can both
# pick the same "free" instance and both end up running there
# simultaneously (observed as 8007 hotspot in A+B run).
chosen.ongoing_tokens += input_length
chosen.pending_prefill_tokens += estimated_new
chosen.num_requests += 1
breakdown.setdefault("route_class", "LOCAL")
breakdown.setdefault("routed_to", chosen.url)
return await _handle_local_request( return await _handle_local_request(
api, req_data, headers, token_ids, input_length, api, req_data, headers, token_ids, input_length,
chosen, estimated_new, breakdown) chosen, estimated_new, breakdown, _pre_reserved=True)
async def _handle_combined_pd_sep_v2( async def _handle_combined_pd_sep_v2(
@@ -1545,6 +1621,10 @@ def parse_args():
help="Mechanism B: unified_v3 picks decode_target with the most" help="Mechanism B: unified_v3 picks decode_target with the most"
" prefix cache among low-load candidates (default 1). Set 0" " prefix cache among low-load candidates (default 1). Set 0"
" to fall back to pure-load tie-break (cache-blind).") " to fall back to pure-load tie-break (cache-blind).")
p.add_argument("--lmetric-decode-weight", type=float, default=0.0,
help="Direction B: LMetric fallback adds this × ongoing_decode_tokens"
" to the queue-depth score, so hosts with heavy decode load get"
" penalised. 0 = original behavior; 0.01 is a reasonable start.")
p.add_argument("--overload-factor", type=float, default=2.0, p.add_argument("--overload-factor", type=float, default=2.0,
help="Break session affinity when instance load > factor * avg") help="Break session affinity when instance load > factor * avg")
# The four flags below are accepted for bench.sh backward compatibility but # The four flags below are accepted for bench.sh backward compatibility but
@@ -1585,11 +1665,13 @@ if __name__ == "__main__":
SETTINGS.v3_rotate_affinity = bool(getattr(global_args, 'v3_rotate_affinity', 1)) SETTINGS.v3_rotate_affinity = bool(getattr(global_args, 'v3_rotate_affinity', 1))
SETTINGS.connector_type = getattr(global_args, 'connector_type', 'mooncake') SETTINGS.connector_type = getattr(global_args, 'connector_type', 'mooncake')
SETTINGS.v3_prefer_cache_target = bool(getattr(global_args, 'v3_prefer_cache_target', 1)) SETTINGS.v3_prefer_cache_target = bool(getattr(global_args, 'v3_prefer_cache_target', 1))
SETTINGS.lmetric_decode_weight = float(getattr(global_args, 'lmetric_decode_weight', 0.0))
print("SETTINGS: throughput=%.0f rdma_overhead=%.2f offload=%s v3_rotate_affinity=%s " print("SETTINGS: throughput=%.0f rdma_overhead=%.2f offload=%s v3_rotate_affinity=%s "
"connector_type=%s v3_prefer_cache_target=%s" % ( "connector_type=%s v3_prefer_cache_target=%s lmetric_decode_weight=%.3f" % (
SETTINGS.prefill_throughput, SETTINGS.rdma_overhead_s, SETTINGS.prefill_throughput, SETTINGS.rdma_overhead_s,
getattr(global_args, 'offload', False), getattr(global_args, 'offload', False),
SETTINGS.v3_rotate_affinity, SETTINGS.v3_rotate_affinity,
SETTINGS.connector_type, SETTINGS.connector_type,
SETTINGS.v3_prefer_cache_target)) SETTINGS.v3_prefer_cache_target,
SETTINGS.lmetric_decode_weight))
uvicorn.run(app, host=global_args.host, port=global_args.port) uvicorn.run(app, host=global_args.host, port=global_args.port)