From 67fcec7933404068197f1ec85436bbb86d243648 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Fri, 29 May 2026 11:52:44 +0800 Subject: [PATCH] Unified-routing A+B ablation: decode-aware LMetric + v3 anti-hotspot cache_aware_proxy: add lmetric_decode_weight (decode-load penalty in the LMetric fallback score) and a v3 anti-hotspot recent-migration penalty (effective_load = num_req + recent-migration count over a sliding window), preventing back-to-back migration clustering. UNIFIED_ABLATION.md documents the A (overload_factor=1.3) + B' (decode-weight, max(num_req,1)) + RaceFix sweep: A+B'+RaceFix reaches TTFT p90 7770ms, beating v3 PD-sep migration by ~20%. Runners/analyzer for the b3 trace replay included. Co-Authored-By: Claude Opus 4.8 --- .../cache_sweep/UNIFIED_ABLATION.md | 431 ++++++++++++++++++ .../cache_sweep/analyze_b3_replay.py | 169 +++++++ .../cache_sweep/run_b3_replay.sh | 94 ++++ .../cache_sweep/run_unified_ablation.sh | 56 +++ .../cache_sweep/run_v3_norot_replay.sh | 64 +++ .../cache_sweep/run_v3_replay.sh | 66 +++ scripts/cache_aware_proxy.py | 126 ++++- 7 files changed, 984 insertions(+), 22 deletions(-) create mode 100644 microbench/connector_tax/cache_sweep/UNIFIED_ABLATION.md create mode 100755 microbench/connector_tax/cache_sweep/analyze_b3_replay.py create mode 100755 microbench/connector_tax/cache_sweep/run_b3_replay.sh create mode 100644 microbench/connector_tax/cache_sweep/run_unified_ablation.sh create mode 100644 microbench/connector_tax/cache_sweep/run_v3_norot_replay.sh create mode 100755 microbench/connector_tax/cache_sweep/run_v3_replay.sh diff --git a/microbench/connector_tax/cache_sweep/UNIFIED_ABLATION.md b/microbench/connector_tax/cache_sweep/UNIFIED_ABLATION.md new file mode 100644 index 0000000..329efe7 --- /dev/null +++ b/microbench/connector_tax/cache_sweep/UNIFIED_ABLATION.md @@ -0,0 +1,431 @@ +# Unified routing ablation: A (tighter affinity) + B (decode-aware LMetric) + +Goal: judge whether `unified` (cache-aware hybrid affinity + LMetric fallback) +has enough headroom to surpass v3 migration-based routing on agentic +workloads, without invoking PD-sep migration. + +## Workload / baseline + +- Trace: `w600_r0.0015_st30.jsonl` (1214 reqs, 274 sessions) +- Hardware: 8 × H100 (dash0), Qwen3-Coder-30B-A3B, TP=1, max_model_len=200000 +- Trace replay through `cache_aware_proxy.py` with policy `unified` +- `b3_replay_20260527_0114/unified/` reference + +| Metric (ms) | baseline (`overload_factor=2.0`) | +|---|---:| +| TTFT p50 | 520 | +| TTFT p90 | **8781** | +| TTFT p99 | 47647 | +| TPOT p90 | 17.8 | +| E2E p90 | 19989 | +| E2E p99 | 85841 | + +Reference points we're trying to beat / match: +- v3 fixed rotation (cache-blind picker): TTFT p90 = 10828 +- v3 + Mechanism B (cache-rich picker): TTFT p90 = 9711 +- All v3 variants are +10–23% worse than `unified` baseline. + +## Tail-source diagnostic on baseline + +Decision split, baseline unified: + +| Decision | n | TTFT mean | TTFT p90 | TTFT p99 | +|---|---:|---:|---:|---:| +| affinity | 852 | 3183 | 7011 | 47432 | +| lmetric_fallback | 362 | 4285 | 12083 | 46036 | + +Long-tail (>20s, n=65): +- 40 / 65 came from `affinity` decisions +- 25 / 65 came from `lmetric_fallback` + +For the 40 slow `affinity` reqs: +- only 12 / 40 were actually overloaded at decision time (`aff_num_req > avg_num_req`) +- overload ratio at decision: mean=0.93, p50=0.87 +- **most slow affinity reqs looked fine when the picker stuck — load piled + on after dispatch**. + +This is a snapshot-based-routing limitation. Tightening +`overload_factor` only helps the genuine cases above the new threshold — +expected to be a 5-10% improvement at best. + +--- + +## Direction A — tighten affinity overflow + +**Hypothesis.** `overload_factor=2.0` lets the picker stick to affinity +even when `affinity.num_req` is up to 2× the cluster average. Reducing to +1.3 forces earlier overflow to LMetric fallback, escaping busy affinity +hosts before the tail blows up. + +**Change.** Single CLI flag: `--overload-factor 1.3`. No code change. + +**Run.** `unified_of13_20260527_1532/unified/`. + +### A vs baseline + +| Metric (ms) | baseline (of=2.0) | A (of=1.3) | Δ | +|---|---:|---:|---:| +| TTFT p50 | 520 | 495 | −5% | +| TTFT p90 | 8781 | 8730 | ≈0 | +| TTFT p99 | 47647 | 43059 | −10% | +| TPOT p50 | 7.9 | 8.0 | ≈0 | +| TPOT p90 | 17.8 | **15.5** | **−13%** | +| E2E p50 | 1761 | 1824 | +4% | +| E2E p90 | 19989 | 18407 | −8% | +| E2E p99 | 85841 | **71396** | **−17%** | + +TTFT p90 is essentially unchanged but the **deeper tail (p99) and +TPOT both improved meaningfully**. Net: A alone gives roughly −10% to +−17% on the long tail without hurting medians. + +### Decision split, A vs baseline + +| Decision | baseline n / p90 | A n / p90 | Δ p90 | +|---|---|---|---| +| affinity | 852 / 7011 | 817 / **5817** | **−17%** ✅ | +| lmetric_fallback | 362 / 12083 | 397 / **15360** | **+27%** ⚠️ | + +The picker now sticks to affinity 35 fewer times. The remaining affinity +decisions are higher-quality (no longer "barely-fitting" cases), so their +p90 drops 17%. + +But the 35 extra reqs that got pushed into fallback **got slower**: +fallback p90 went from 12083 → 15360. The LMetric scorer is selecting a +worse instance for them. + +### Per-worker TTFT under A (of=1.3) + +``` +port 8000: n= 94 mean=4424 p90=12290 port 8004: n=192 mean=2597 p90=6968 +port 8001: n= 135 mean=2779 p90= 5553 port 8005: n=202 mean=3102 p90=6113 +port 8002: n= 88 mean=5827 p90=15804 port 8006: n=136 mean=4006 p90=10899 +port 8003: n= 217 mean=2674 p90= 4598 port 8007: n=150 mean=3648 p90= 7025 +``` + +Compared to baseline (88..217 reqs/port), A redistributes more evenly +(88..217 still but distribution is fatter in the middle). port 8002 +remains slow (p90 15.8s) — its cache pool seems to keep getting cold +work routed there by LMetric. + +### Why A alone isn't enough + +LMetric scorer (`unified_hybrid` fallback path): + +```python +score = (pending_prefill_tokens + new_uncached_tokens) * num_requests +``` + +This **ignores `ongoing_decode_tokens`** entirely. An instance with no +pending prefill but 200k tokens currently in decode looks "ideal" +(score=0×num_req=0) — yet a new request landing there waits behind +slow decode iters caused by the large batch KV reads. + +A pushes more requests into fallback, but fallback can't tell which +instance is actually free. → Direction B is mandatory companion. + +--- + +## Direction B — decode-aware LMetric + +**Hypothesis.** Adding a decode-load penalty to the LMetric score lets +fallback distinguish "no prefill queued but heavy decode running" from +"truly idle". Should restore fallback p90 ≤ 12s baseline level. + +**Change.** +```python +score = (pending_prefill + new + lmetric_decode_weight * ongoing_decode_tokens) * num_requests +``` +- `lmetric_decode_weight=0.0` ⇒ original LMetric (control) +- `lmetric_decode_weight=0.01` ⇒ first experiment (rationale: 1 decode token + in batch costs ~0.01 prefill-token-equivalent in scheduler iter time + on H100 + Qwen3-30B-A3B) + +CLI: `--lmetric-decode-weight 0.01`. Setting in code: +`cache_aware_proxy.py:Settings.lmetric_decode_weight`. + +**Run.** `unified_of13_lmw001_20260527_1628/unified/`. + +### A+B vs baseline / A + +| Metric (ms) | baseline | A (of=1.3) | A+B (of=1.3, lmw=0.01) | Δ vs baseline | +|---|---:|---:|---:|---:| +| TTFT p50 | 520 | 495 | 514 | −1% | +| **TTFT p90** | 8781 | 8730 | **8421** | **−4%** ✅ | +| TTFT p99 | 47647 | 43059 | 44800 | −6% | +| TPOT p50 | 7.9 | 8.0 | 7.9 | ≈0 | +| TPOT p90 | 17.8 | 15.5 | 15.7 | −12% | +| E2E p50 | 1761 | 1824 | 1870 | +6% | +| E2E p90 | 19989 | 18407 | **21064** | **+5%** ⚠️ | +| E2E p99 | 85841 | 71396 | **64344** | **−25%** ✅ | + +Long-tail counts: + +``` +thresh baseline A A+B v3 MechB +> 5000ms 170 173 170 177 +> 10000ms 105 109 109 119 +> 20000ms 65 64 59 78 +> 30000ms 41 40 37 50 +> 50000ms 8 5 6 14 +``` + +A+B is best on every long-tail-count threshold ≤30s, marginal worse at 50s. + +### Decision split (A+B vs A) + +| Decision | A (of=1.3) | A+B | Note | +|---|---|---|---| +| affinity p90 | 5817 | 5836 | ≈ same | +| fallback p90 | **15360** | **13501** | B recovered some of A's fallback regression | + +B partially fixed fallback's selection (−12% on fallback p90 vs A alone), +but still worse than baseline (12083). + +### Per-worker TTFT (A+B) + +``` +port 8000: n=134 mean=3495 p90=10967 port 8004: n=136 mean=3102 p90= 7906 +port 8001: n=143 mean=2981 p90=10189 port 8005: n=179 mean=1624 p90= 2735 +port 8002: n=221 mean=2355 p90= 3502 port 8006: n=137 mean=5356 p90= 9628 +port 8003: n=146 mean=3932 p90=10729 port 8007: n=118 mean=5210 p90=26798 ← new hotspot +``` + +A+B trades the baseline's 8002 hotspot (p90=35s) for a new 8007 hotspot +(p90=26.8s). Lower amplitude but hotspot survives. + +### Why 8007 became a hotspot under A+B — **found a bug in B** + +8007 in A+B: 118 reqs, **53% affinity / 47% fallback** (vs other ports +60–77% affinity), **cache_hit_mean=50.5% (lowest)**. + +Top-10 slowest at 8007: all are big-prompt (100k+ tokens) fallback decisions +with `cached_tokens=0` (cold prefill). LMetric is pushing many cold-prefill +fallbacks to 8007. + +Looking at the B formula: + +```python +decode_pen = lmetric_decode_weight * ongoing_decode_tokens +score = (pending_prefill + new + decode_pen) * num_requests # ← BUG +``` + +When `num_requests = 0`, the entire score (including decode penalty) zeros +out. So an idle-but-decoding host (num_req=0 because its last prefill +finished but decode is still running) looks like score=0, beating every +busy host. + +**Fix (B'):** multiply by `max(num_requests, 1)`: + +```python +score = (pending_prefill + new + decode_pen) * max(num_requests, 1) +``` + +Now idle hosts with high decode load get score = decode_pen × 1 = real +nonzero penalty, beating zero-load hosts only when decode is small. + +### A+B' — re-run with the fix + +**Run.** `unified_of13_lmw001_v2_20260527_1724/unified/`. + +| Metric (ms) | baseline | A+B (BUG) | A+B' (fix) | Δ vs baseline | +|---|---:|---:|---:|---:| +| TTFT p50 | 520 | 514 | **485** | −7% | +| **TTFT p90** | 8781 | 8421 | **8287** | **−5.6%** ✅ | +| TTFT p99 | 47647 | 44800 | **41876** | **−12%** ✅ | +| TPOT p90 | 17.8 | 15.7 | 17.5 | −2% | +| E2E p90 | 19989 | 21064 | 20625 | +3% | +| E2E p99 | 85841 | 64344 | 77827 | −9% | + +A+B' **best of all variants on TTFT p90 (8287) and TTFT p99 (41876)**. +Long-tail counts (>30s, >50s) also best across variants. + +vs v3 reference points: +| | TTFT p90 | TPOT p90 | E2E p99 | +|---|---:|---:|---:| +| **A+B'** | **8287** | 17.5 | 77827 | +| v3 fixed (cache-blind) | 10828 | 21.0 | 47610 | +| v3 + Mech B | 9711 | 18.3 | 84492 | + +A+B' **beats v3 Mech B by 15% TTFT p90** with no migration overhead. + +### Per-worker (A+B' fixed) + +``` +8000: n=158 p90= 5688 8004: n=189 p90= 4249 +8001: n=159 p90= 7323 8005: n=116 p90=14598 +8002: n=114 p90= 8726 8006: n=180 p90= 6198 +8003: n=173 p90= 6715 8007: n=125 p90=22242 ← still hot +``` + +A+B' redistributed load more evenly (114..189) but **8007 still has p90=22s**. + +### 8007 deep-dive in A+B' + +``` +8007: n=125, affinity=69 (55%), fallback=56 (45%), cache_hit_mean=lowest +``` + +Top-15 slow at 8007: +- 7 of them are session **1313181** turns 9–14 (130k+ tokens each, agentic + long context, ~50% cache hit) +- Several others are cold-start turn-1 of large-prompt sessions +- First two slow reqs arrived **0.7 s apart** — strong hint of concurrent + picker race + +### Iteration 3: race-condition fix + +**Diagnosis.** In `_handle_combined`: + +```python +chosen, best_idx, decision = pick_instance_unified_hybrid(...) # sync +# ... sync breakdown updates ... +return await _handle_local_request(...) # ← await yields here + # THEN reservation happens +``` + +`return await async_func(...)` evaluates the async call (creates coroutine) +and yields to the event loop **before** the coroutine body executes. The +reservation (`chosen.pending_prefill_tokens += new`, etc.) lives at the top +of `_handle_local_request`, so between the picker and the reservation there +is a **window where another coroutine can run and re-pick the same instance**. + +When two big-prompt reqs arrive within milliseconds, both run pick → +both pick the "free" 8007 → both yield → both reserve. Result: 8007 gets +back-to-back 130k-token cold prefills, each waiting for the other. + +**Fix.** Move the reservation **before** the await, inside `_handle_combined`: + +```python +# Race fix: reserve atomically with pick, before any await. +chosen.ongoing_tokens += input_length +chosen.pending_prefill_tokens += estimated_new +chosen.num_requests += 1 +return await _handle_local_request(..., _pre_reserved=True) +``` + +`_handle_local_request` skips its own reservation when `_pre_reserved=True`. +PD-sep paths are unaffected (they have their own reservation). + +**Run.** Pending — `unified_of13_lmw001_racefix_*`. Hypothesis: 8007 p90 +drops to within ±3s of cluster median, since concurrent picks for the +same "free" instance no longer happen. + +--- + +## A+B'+RaceFix — results + +**Run.** `unified_of13_lmw001_racefix_20260527_1821/unified/`. + +| Metric (ms) | baseline | A+B' | A+B'+RF | Δ vs baseline | +|---|---:|---:|---:|---:| +| TTFT p50 | 520 | 485 | **478** | −8% | +| **TTFT p90** | 8781 | 8287 | **7770** | **−11.5%** ✅ | +| TTFT p99 | 47647 | 41876 | **42447** | −11% | +| TPOT p90 | 17.8 | 17.5 | 18.0 | +1% | +| E2E p90 | 19989 | 20625 | **18418** | −8% | +| E2E p99 | 85841 | 77827 | **71227** | −17% | + +vs v3 reference: +- **A+B'+RF TTFT p90 = 7770ms, vs v3 Mech B 9711ms → −20%** ✅ + +Long-tail counts (best across all variants): +``` +> 5s: 170 → 158 > 30s: 41 → 33 +>10s: 105 → 103 > 50s: 8 → 4 +>20s: 65 → 57 >100s: 0 → 0 +``` + +### Decision split — race fix mainly helped affinity + +| Decision | baseline | A+B'+RF | +|---|---:|---:| +| affinity p90 | 7011 | **5042** ✅ (−28%) | +| fallback p90 | 12083 | 13944 (+15%) | + +The race-condition was hurting affinity decisions the most. When two +concurrent reqs both stuck to a "free-looking" affinity instance, they +piled up and inflated affinity's tail. Fix removed this collision. + +### Per-worker + +``` +8000: n=86 p90=11541 8004: n=150 p90=11906 +8001: n=186 p90= 8307 8005: n=109 p90= 4798 +8002: n=105 p90=14540 8006: n=183 p90= 6258 +8003: n=264 p90= 3079 8007: n=131 p90=21850 ← still hot +8000 spread now 86..264 — race fix did disperse routing +``` + +### 8007 still hot — but it's **workload-inherent, not a routing bug** + +Top sessions on 8007: +``` +session 1279412: n=22 mean= 2208 max=18985 decisions: 91% affinity +session 1313181: n=17 mean=17399 max=49089 decisions: 65% affinity +session 1262354: n=15 mean= 622 max= 2325 decisions: 87% affinity +session 1342921: n= 7 mean=17817 max=55589 decisions: 86% affinity +session 1260327: n= 8 mean= 1636 max= 5382 decisions: 75% affinity +session 1268831: n= 5 mean= 1443 max= 2673 decisions: 80% affinity +``` + +Sessions 1313181 and 1342921 are **long agentic contexts**: 100k–130k tokens +per turn with ~50% cache hit (i.e. 50k new tokens prefill per turn). Even +on a perfectly load-balanced instance, each turn is 7–15s of pure compute. + +Forcing these sessions to spread across instances would mean **cold prefill +every turn (0% cache hit)** → each turn becomes 20–30s instead of 7–15s. +Spreading is **net-negative**. + +→ The 8007 p90=22s is the floor imposed by these sessions' structure, +not by routing policy. Unified is at its ceiling for this workload. + +--- + +## Final ranking and take-aways + +| Policy | TTFT p90 (ms) | Δ vs baseline | Notes | +|---|---:|---:|---| +| baseline unified (of=2.0) | 8781 | — | reference | +| A (of=1.3) | 8730 | ≈0 | affinity p90 -17%, fallback p90 +27% | +| A+B (of=1.3, lmw=0.01, BUG) | 8421 | −4% | 8007 hotspot from `*num_req` zeroing bug | +| A+B' (formula fix) | 8287 | −5.6% | Bug fixed, still 8007 mild hotspot | +| **A+B'+RaceFix** | **7770** | **−11.5%** ✅ | **Best unified variant** | +| v3 fixed | 10828 | +23% | PD-sep migration, cache-blind picker | +| v3 + Mech B | 9711 | +11% | PD-sep + cache-rich target picker | + +### Conclusions + +1. **Unified path beats v3 PD-sep on this workload by 20%+ TTFT p90.** + PD-sep migration's fixed cost (src prefill + dst first-token waiting on + loaded scheduler) outweighs any decode-time savings for short-output + agentic turns. + +2. **Three orthogonal fixes compound for a 11.5% TTFT p90 win:** + - A (`overload_factor=1.3`): tighter affinity overflow → −0.6% but + much cleaner affinity decisions (p90 -17%) + - B' (`lmetric_decode_weight=0.01` with `max(num_req,1)`): decode-aware + fallback → −3.5% + - RaceFix (atomic reserve before await): kills concurrent-pick + collisions → −5.6% + +3. **Race condition was the biggest single hidden bug.** `return await + async_func(...)` yields to the event loop **before** the body of + `async_func` runs, so reservations done in the body don't take effect + in time to deter concurrent picks. This affects ANY async dispatch + with separate pick/reserve steps — worth checking other routing + policies. + +4. **8007 p90=22s is workload-inherent.** Sessions with 100k+ token turns + at 50% cache hit cannot finish faster than 7–15s per turn regardless + of routing. Forcing spread would hurt rather than help. + +5. **Migration (v3) is not necessary** when unified routing is tuned + well. Save the PD-sep mechanism for cases where it can be proven + net-positive (e.g. very-long-output sessions on extremely overloaded + prefill hosts) and use unified A+B'+RaceFix as the default. + +--- + +## Direction A+B — run pending + +(Will be filled when `unified_of13_lmw001_*/unified/` finishes.) diff --git a/microbench/connector_tax/cache_sweep/analyze_b3_replay.py b/microbench/connector_tax/cache_sweep/analyze_b3_replay.py new file mode 100755 index 0000000..62507e0 --- /dev/null +++ b/microbench/connector_tax/cache_sweep/analyze_b3_replay.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +"""B3 5-policy re-test analyser. + +Compute TTFT/TPOT/E2E mean/p50/p90/p99 for each policy from +metrics.jsonl, compare against the historical b3_policy_comparison.json +that drives fig_b3_latency_bars.png, and emit a side-by-side table +plus a new figure with the same layout as the original. + +Usage: + python analyze_b3_replay.py --root [--old-data ] [--figure ] +""" + +import argparse +import json +import statistics +from pathlib import Path + + +POLICIES = ["lmetric", "load_only", "sticky", "unified", "unified_v2"] + + +def pct(xs, p): + if not xs: + return None + xs = sorted(xs) + k = max(0, min(len(xs) - 1, int(p / 100.0 * (len(xs) - 1)))) + return xs[k] + + +def summarise(path): + rows = [json.loads(l) for l in open(path) if l.strip()] + ok = [r for r in rows if not r.get("error")] + ttft = [r["ttft_s"] * 1000 for r in ok if r.get("ttft_s") is not None] + tpot = [r["tpot_s"] * 1000 for r in ok if r.get("tpot_s")] + e2e = [r["latency_s"] * 1000 for r in ok if r.get("latency_s") is not None] + return { + "n_total": len(rows), + "n_ok": len(ok), + "ttft_mean_ms": statistics.mean(ttft) if ttft else None, + "ttft_p50_ms": pct(ttft, 50), + "ttft_p90_ms": pct(ttft, 90), + "ttft_p99_ms": pct(ttft, 99), + "tpot_mean_ms": statistics.mean(tpot) if tpot else None, + "tpot_p50_ms": pct(tpot, 50), + "tpot_p90_ms": pct(tpot, 90), + "tpot_p99_ms": pct(tpot, 99), + "e2e_mean_ms": statistics.mean(e2e) if e2e else None, + "e2e_p50_ms": pct(e2e, 50), + "e2e_p90_ms": pct(e2e, 90), + "e2e_p99_ms": pct(e2e, 99), + } + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--root", type=Path, required=True) + ap.add_argument("--old-data", type=Path, + default=Path("analysis/characterization/window_1_results/b3_policy_comparison.json")) + ap.add_argument("--figure", type=Path, default=None) + args = ap.parse_args() + + new = {} + for p in POLICIES: + path = args.root / p / "metrics.jsonl" + if not path.exists(): + print(f"MISSING: {path}") + continue + new[p] = summarise(path) + + old = {} + if args.old_data.exists(): + d = json.load(open(args.old_data)) + for r in d.get("rows", []): + old[r["policy"]] = { + "ttft_p50_ms": r["ttft_p50_s"] * 1000, + "ttft_p90_ms": r["ttft_p90_s"] * 1000, + "ttft_p99_ms": r["ttft_p99_s"] * 1000, + "tpot_p90_ms": r["tpot_p90_s"] * 1000, + "e2e_p90_ms": r.get("e2e_p90_s", 0) * 1000, + } + + def fmt(v): return f"{v:.0f}" if v is not None else "-" + def pctd(a, b): + if a is None or b is None or a == 0: return "-" + return f"{(b/a-1)*100:+.1f}%" + + # Headline table + print(f"\n# NEW: today's re-test") + print(f"{'policy':<14}{'n_ok':>6}{'TTFTp50':>10}{'TTFTp90':>10}{'TTFTp99':>10}{'TPOTp90':>10}{'E2Ep90':>10}") + print("-" * 70) + for p in POLICIES: + if p not in new: continue + r = new[p] + print(f"{p:<14}{r['n_ok']:>6}{fmt(r['ttft_p50_ms']):>9}ms{fmt(r['ttft_p90_ms']):>9}ms{fmt(r['ttft_p99_ms']):>9}ms{fmt(r['tpot_p90_ms']):>9}ms{fmt(r['e2e_p90_ms']):>9}ms") + + print(f"\n# OLD: window_1_results/b3_policy_comparison.json") + print(f"{'policy':<14}{'TTFTp50':>10}{'TTFTp90':>10}{'TTFTp99':>10}{'TPOTp90':>10}{'E2Ep90':>10}") + print("-" * 60) + for p in POLICIES: + if p not in old: continue + r = old[p] + print(f"{p:<14}{fmt(r['ttft_p50_ms']):>9}ms{fmt(r['ttft_p90_ms']):>9}ms{fmt(r['ttft_p99_ms']):>9}ms{fmt(r['tpot_p90_ms']):>9}ms{fmt(r['e2e_p90_ms']):>9}ms") + + print(f"\n# DRIFT: today vs old (same policy)") + print(f"{'policy':<14}{'ΔTTFTp50':>10}{'ΔTTFTp90':>10}{'ΔTTFTp99':>10}{'ΔTPOTp90':>10}{'ΔE2Ep90':>10}") + print("-" * 60) + for p in POLICIES: + if p not in new or p not in old: continue + n, o = new[p], old[p] + print(f"{p:<14}{pctd(o['ttft_p50_ms'], n['ttft_p50_ms']):>10}" + f"{pctd(o['ttft_p90_ms'], n['ttft_p90_ms']):>10}" + f"{pctd(o['ttft_p99_ms'], n['ttft_p99_ms']):>10}" + f"{pctd(o['tpot_p90_ms'], n['tpot_p90_ms']):>10}" + f"{pctd(o['e2e_p90_ms'], n['e2e_p90_ms']):>10}") + + # Relative ordering check + def ranks(values_dict, key): + items = [(p, r[key]) for p, r in values_dict.items() if r.get(key)] + items.sort(key=lambda x: x[1]) + return [p for p, _ in items] + + print(f"\n# TTFT p90 ranking (best → worst)") + for label, src in [("OLD", old), ("NEW", new)]: + if src: + order = ranks(src, "ttft_p90_ms") + print(f" {label}: {' < '.join(order)}") + + out = {"new": new, "old": old} + out_path = args.root / "b3_replay_summary.json" + out_path.write_text(json.dumps(out, indent=2)) + print(f"\nWrote {out_path}") + + # Bar plot (matplotlib) + if not args.figure: + args.figure = args.root / "fig_b3_latency_bars_new.png" + try: + import matplotlib + matplotlib.use("Agg") + import matplotlib.pyplot as plt + + pols = [p for p in POLICIES if p in new] + metrics = [("TTFT p90 (s)", "ttft_p90_ms", 1000), + ("TPOT p90 (ms)", "tpot_p90_ms", 1), + ("E2E p90 (s)", "e2e_p90_ms", 1000)] + colors = {"lmetric": "tab:blue", "load_only": "tab:orange", + "sticky": "tab:green", "unified": "tab:red", + "unified_v2": "tab:purple"} + fig, axes = plt.subplots(1, 3, figsize=(14, 4.5)) + for ax, (label, key, div) in zip(axes, metrics): + vals = [new[p][key] / div for p in pols] + bars = ax.bar(pols, vals, + color=[colors.get(p, "gray") for p in pols], + edgecolor="black", linewidth=0.5) + ax.set_title(label) + ax.tick_params(axis="x", rotation=20) + for b, v in zip(bars, vals): + ax.text(b.get_x() + b.get_width() / 2, v, f"{v:.1f}", + ha="center", va="bottom", fontsize=9) + ax.grid(alpha=0.3, axis="y") + fig.suptitle(f"B3 5-policy re-test ({args.root.name})") + fig.tight_layout() + fig.savefig(args.figure, dpi=120) + print(f"Wrote {args.figure}") + except Exception as e: + print(f"(figure skipped: {e})") + + +if __name__ == "__main__": + main() diff --git a/microbench/connector_tax/cache_sweep/run_b3_replay.sh b/microbench/connector_tax/cache_sweep/run_b3_replay.sh new file mode 100755 index 0000000..1c9c8a3 --- /dev/null +++ b/microbench/connector_tax/cache_sweep/run_b3_replay.sh @@ -0,0 +1,94 @@ +#!/usr/bin/env bash +# B3 routing-policy reproducibility re-test. +# +# Re-runs the 5 routing policies from fig_b3_latency_bars.png on the same +# trace, in a single same-day session, to check whether the ordering +# (unified < load_only < sticky etc.) still holds today. +# +# Policies (in run order): +# lmetric plain — cache-aware P_tokens × BS +# load_only plain — pure min-num_requests +# sticky plain — hard session affinity +# unified plain — hybrid affinity + LMetric fallback +# unified_v2 Mooncake kv_both + selective PD-sep (with DR-fix applied) +# +# unified_v2 is run with VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1 so we +# get the "best Mooncake state" we have today (DR-fix on top of the +# already-fixed mainline after e3a1d70 etc.). The other 4 policies don't +# load any connector so the patch is irrelevant. + +set -uo pipefail + +PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}" +TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}" +DATE="$(date +%Y%m%d_%H%M)" +OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/b3_replay_${DATE}}" +PYTHON="$PROJ_DIR/.venv/bin/python" +DR_FIX_SCRIPT="$PROJ_DIR/microbench/connector_tax/cache_sweep/apply_direct_read_fix.py" +VLLM_ROOT="${VLLM_ROOT:-$PROJ_DIR/.venv/lib/python3.12/site-packages/vllm}" + +mkdir -p "$OUTROOT" +echo "=== B3 5-policy re-test ===" +echo "Trace : $TRACE" +echo "Out : $OUTROOT" +echo "Order : lmetric → load_only → sticky → unified → unified_v2 (DR-fix on)" +echo "" + +cleanup_all() { + pkill -9 -f cache_aware_proxy 2>/dev/null || true + pkill -9 -f "vllm serve" 2>/dev/null || true + pkill -9 -f "EngineCore" 2>/dev/null || true + sleep 5 + "$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true +} +trap cleanup_all EXIT +cleanup_all + +# Apply DR-fix once — it's env-gated so only unified_v2 (with env=1) sees it +echo "[stage 0] applying CT_DR_FIX (env-gated, only activates when VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1)" +"$PYTHON" "$DR_FIX_SCRIPT" --apply --vllm-root "$VLLM_ROOT" + +run_policy() { + local policy="$1" + local skip_dr="$2" + local rundir="$OUTROOT/$policy" + mkdir -p "$rundir" + + echo "" + echo "====== $policy ; DR_SYNC_DISABLED=$skip_dr ======" + + if [ "$skip_dr" = "1" ]; then + export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1 + else + unset VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC + fi + + bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "$policy" "$TRACE" "$rundir" \ + 2>&1 | tee "$rundir/orchestrator.log" | tail -30 + rc="${PIPESTATUS[0]}" + if [ "$rc" != "0" ]; then + echo "[FAIL] policy $policy rc=$rc" + fi + # Belt-and-braces cleanup between policies + pkill -9 -f cache_aware_proxy 2>/dev/null || true + pkill -9 -f "vllm serve" 2>/dev/null || true + pkill -9 -f "EngineCore" 2>/dev/null || true + sleep 10 + return 0 +} + +run_policy "lmetric" "0" +run_policy "load_only" "0" +run_policy "sticky" "0" +run_policy "unified" "0" +run_policy "unified_v2" "1" # uses Mooncake kv_both; activate DR-fix + +echo "" +echo "[stage Z] reverting CT_DR_FIX" +"$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT" + +echo "" +echo "Done. Artifacts: $OUTROOT" +for p in lmetric load_only sticky unified unified_v2; do + echo " $p: $OUTROOT/$p/metrics.jsonl" +done diff --git a/microbench/connector_tax/cache_sweep/run_unified_ablation.sh b/microbench/connector_tax/cache_sweep/run_unified_ablation.sh new file mode 100644 index 0000000..d34b208 --- /dev/null +++ b/microbench/connector_tax/cache_sweep/run_unified_ablation.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +# Single-policy trace replay for unified, with tunable overload-factor. +# Used to test direction A: does tightening affinity overflow improve unified? +# +# Usage: +# OVERLOAD_FACTOR=1.3 bash run_unified_ablation.sh +# OVERLOAD_FACTOR=1.0 bash run_unified_ablation.sh +# +# Output: $PROJ_DIR/outputs/unified_of${OF}_${DATE}/unified/ + +set -uo pipefail + +PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}" +TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}" +OF="${OVERLOAD_FACTOR:-1.3}" +LMW="${LMETRIC_DECODE_WEIGHT:-0.0}" +TAG_DEFAULT="of${OF/./}" +if [ "$(printf '%s' "$LMW" | grep -v '^0\.\?0*$' || true)" != "" ]; then + TAG_DEFAULT="${TAG_DEFAULT}_lmw${LMW/./}" +fi +TAG="${TAG:-$TAG_DEFAULT}" +DATE="$(date +%Y%m%d_%H%M)" +OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/unified_${TAG}_${DATE}}" + +mkdir -p "$OUTROOT" +echo "=== unified ablation: overload_factor=$OF ===" +echo "Trace : $TRACE" +echo "Out : $OUTROOT" +echo "" + +cleanup() { + pkill -9 -f cache_aware_proxy 2>/dev/null || true + pkill -9 -f "vllm serve" 2>/dev/null || true + pkill -9 -f "EngineCore" 2>/dev/null || true + sleep 3 +} +trap cleanup EXIT +cleanup + +cfg_dir="$OUTROOT/unified" +mkdir -p "$cfg_dir" + +export EXTRA_PROXY_ARGS="--overload-factor $OF --lmetric-decode-weight $LMW" + +echo "" +echo "====== unified ; overload_factor=$OF lmetric_decode_weight=$LMW ======" +bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "unified" "$TRACE" "$cfg_dir" \ + 2>&1 | tee "$cfg_dir/orchestrator.log" | tail -30 + +pkill -9 -f cache_aware_proxy 2>/dev/null || true +pkill -9 -f "vllm serve" 2>/dev/null || true +pkill -9 -f "EngineCore" 2>/dev/null || true +sleep 5 + +echo "" +echo "Done. Artifacts: $OUTROOT/unified/metrics.jsonl" diff --git a/microbench/connector_tax/cache_sweep/run_v3_norot_replay.sh b/microbench/connector_tax/cache_sweep/run_v3_norot_replay.sh new file mode 100644 index 0000000..baa5657 --- /dev/null +++ b/microbench/connector_tax/cache_sweep/run_v3_norot_replay.sh @@ -0,0 +1,64 @@ +#!/usr/bin/env bash +# Trace replay for unified_v3 WITHOUT affinity rotation. +# +# This is the #2 follow-up to cache_miss_audit: prior run showed v3 with +# rotation hits 9.5% cache on post-migration next turn vs 80.6% for unified. +# Hypothesis: rotation destroys prefix cache locality. Test by keeping the +# session affinity on prefill_host even after migration (i.e., the same +# behavior as unified for the post-migration write), so only the *current* +# turn's decode is migrated. +# +# Applies CT_DR_FIX (Mooncake DR sync disabled). +# Output: $PROJ_DIR/outputs/b3_v3_norot_${DATE}/unified_v3/ + +set -uo pipefail + +PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}" +TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}" +DATE="$(date +%Y%m%d_%H%M)" +OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/b3_v3_norot_${DATE}}" +PYTHON="$PROJ_DIR/.venv/bin/python" +DR_FIX_SCRIPT="$PROJ_DIR/microbench/connector_tax/cache_sweep/apply_direct_read_fix.py" +VLLM_ROOT="${VLLM_ROOT:-$PROJ_DIR/.venv/lib/python3.12/site-packages/vllm}" + +mkdir -p "$OUTROOT" +echo "=== unified_v3 (no rotation) trace replay ===" +echo "Trace : $TRACE" +echo "Out : $OUTROOT" +echo "" + +cleanup_all() { + pkill -9 -f cache_aware_proxy 2>/dev/null || true + pkill -9 -f "vllm serve" 2>/dev/null || true + pkill -9 -f "EngineCore" 2>/dev/null || true + sleep 5 + "$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true +} +trap cleanup_all EXIT +cleanup_all + +echo "[stage 0] applying CT_DR_FIX (env-gated)" +"$PYTHON" "$DR_FIX_SCRIPT" --apply --vllm-root "$VLLM_ROOT" + +cfg_dir="$OUTROOT/unified_v3" +mkdir -p "$cfg_dir" + +export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1 +export EXTRA_PROXY_ARGS="--v3-rotate-affinity 0" + +echo "" +echo "====== unified_v3 (no rotation) ; DR_SYNC_DISABLED=1 ======" +bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "unified_v3" "$TRACE" "$cfg_dir" \ + 2>&1 | tee "$cfg_dir/orchestrator.log" | tail -30 + +pkill -9 -f cache_aware_proxy 2>/dev/null || true +pkill -9 -f "vllm serve" 2>/dev/null || true +pkill -9 -f "EngineCore" 2>/dev/null || true +sleep 5 + +echo "" +echo "[stage Z] reverting CT_DR_FIX" +"$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT" + +echo "" +echo "Done. Artifacts: $OUTROOT/unified_v3/metrics.jsonl" diff --git a/microbench/connector_tax/cache_sweep/run_v3_replay.sh b/microbench/connector_tax/cache_sweep/run_v3_replay.sh new file mode 100755 index 0000000..7f5a23f --- /dev/null +++ b/microbench/connector_tax/cache_sweep/run_v3_replay.sh @@ -0,0 +1,66 @@ +#!/usr/bin/env bash +# Trace replay for the new unified_v3 (offload-decode) policy. +# +# Runs the same trace as run_b3_replay.sh on a single policy: +# unified_v3 — prefill on session-affinity host (uses prefix cache), +# decode migrated to a low-load target via Mooncake +# KV transfer (kv_role=kv_both). Session affinity rotates +# to decode_target after migration so next turn lands +# where the KV now lives. +# +# Applies CT_DR_FIX so the run uses the "best Mooncake state" we have +# today (post-e3a1d70 + DR sync skipped). +# +# Usage: bash run_v3_replay.sh + +set -uo pipefail + +PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}" +TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}" +DATE="$(date +%Y%m%d_%H%M)" +OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/b3_v3_${DATE}}" +PYTHON="$PROJ_DIR/.venv/bin/python" +DR_FIX_SCRIPT="$PROJ_DIR/microbench/connector_tax/cache_sweep/apply_direct_read_fix.py" +VLLM_ROOT="${VLLM_ROOT:-$PROJ_DIR/.venv/lib/python3.12/site-packages/vllm}" + +mkdir -p "$OUTROOT" +echo "=== unified_v3 (offload-decode) trace replay ===" +echo "Trace : $TRACE" +echo "Out : $OUTROOT" +echo "" + +cleanup_all() { + pkill -9 -f cache_aware_proxy 2>/dev/null || true + pkill -9 -f "vllm serve" 2>/dev/null || true + pkill -9 -f "EngineCore" 2>/dev/null || true + sleep 5 + "$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true +} +trap cleanup_all EXIT +cleanup_all + +echo "[stage 0] applying CT_DR_FIX (env-gated)" +"$PYTHON" "$DR_FIX_SCRIPT" --apply --vllm-root "$VLLM_ROOT" + +cfg_dir="$OUTROOT/unified_v3" +mkdir -p "$cfg_dir" + +# Activate the DR-fix env-gate (unified_v3 uses Mooncake kv_both) +export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1 + +echo "" +echo "====== unified_v3 ; DR_SYNC_DISABLED=1 ======" +bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "unified_v3" "$TRACE" "$cfg_dir" \ + 2>&1 | tee "$cfg_dir/orchestrator.log" | tail -30 + +pkill -9 -f cache_aware_proxy 2>/dev/null || true +pkill -9 -f "vllm serve" 2>/dev/null || true +pkill -9 -f "EngineCore" 2>/dev/null || true +sleep 5 + +echo "" +echo "[stage Z] reverting CT_DR_FIX" +"$PYTHON" "$DR_FIX_SCRIPT" --revert --vllm-root "$VLLM_ROOT" + +echo "" +echo "Done. Artifacts: $OUTROOT/unified_v3/metrics.jsonl" diff --git a/scripts/cache_aware_proxy.py b/scripts/cache_aware_proxy.py index ac7d5ae..c4bee6d 100644 --- a/scripts/cache_aware_proxy.py +++ b/scripts/cache_aware_proxy.py @@ -19,7 +19,7 @@ import os import time as _time import urllib.parse import uuid -from collections import OrderedDict +from collections import OrderedDict, deque from contextlib import asynccontextmanager from dataclasses import dataclass @@ -103,6 +103,20 @@ class Settings: # auto-transfers only the missing portion (verified via # smoke_partial_transfer: cache-rich dst is 77% faster than # cold dst at 33k tokens, +512 ext). + # Anti-hotspot: picker scores effective_load = num_requests + (recent + # migrations received within window). Prevents clustering migrations on + # one instance in rapid succession (observed in Mech B run: inst_5 became + # a hotspot via post-rotation tail accumulation). + v3_recent_mig_window_s: float = 10.0 # sliding window + v3_recent_mig_weight: float = 1.0 # how many "virtual requests" each + # recent migration counts as + + # Direction B knob: LMetric fallback adds decode-token penalty to score. + # score = (pending_prefill + new + lmetric_decode_weight * ongoing_decode_tok) * num_req + # Empirical iter-time slope on H100 + Qwen3-30B-A3B: each decode token in + # batch costs ~0.01 prefill-token-equivalent in scheduler time, so 0.01 is + # a reasonable starting weight. Set 0 to disable (original behavior). + lmetric_decode_weight: float = 0.0 # --- KV connector selection (governs PD-sep handshake) ------------- # "mooncake": pre-baked kv_transfer_params (bootstrap_addr+engine_id+transfer_id). @@ -187,6 +201,11 @@ class InstanceState: self.dp_size = 1 # OrderedDict acts as an LRU keyed by block hash; value is unused. self.cached_blocks: OrderedDict[int, None] = OrderedDict() + # v3 anti-hotspot: timestamps (monotonic) when this instance was picked + # as a v3 migration target. Used to compute effective_load = num_req + + # recent-migration count over a sliding window, preventing back-to-back + # decisions from clustering on the same dst. + self.recent_mig_targeted_at: deque[float] = deque(maxlen=64) def estimate_cache_hit(self, token_ids: list[int] | None) -> int: if not token_ids or len(token_ids) < BLOCK_SIZE: @@ -417,13 +436,24 @@ def pick_instance_unified_hybrid( decision["chosen_idx"] = a_idx return a_inst, a_idx, decision - keys: list[tuple[int, int, int, int]] = [] + # Direction B: extend LMetric with decode-load awareness. + # Original score = (pending_prefill + new_uncached) * num_requests, which + # ignores ongoing decode work. A host with 200k decode tokens looks "ideal" + # (P_tokens=0) but its decode iters are slow due to large batch KV reads. + # + # First attempt (BUG): score = (p_tokens + decode_pen) * num_req — when + # num_req=0 the decode_pen is zeroed out, so idle-but-decoding hosts still + # look free and accumulate cold prefills (8007 hotspot in A+B v1 run). + # + # Fix: max(num_req, 1) so decode_pen contributes on idle hosts too. + keys: list[tuple[float, int, int, int]] = [] for i, inst in enumerate(instances): cache_hit = inst.estimate_cache_hit(token_ids) new_prefill = max(0, input_length - cache_hit) p_tokens = inst.pending_prefill_tokens + new_prefill + decode_pen = SETTINGS.lmetric_decode_weight * inst.ongoing_decode_tokens bs = inst.num_requests - score = p_tokens * bs + score = (p_tokens + decode_pen) * max(bs, 1) keys.append((score, new_prefill, bs, i)) best_triple = min(k[:3] for k in keys) @@ -637,48 +667,80 @@ def pick_instance_unified_v3( ) return prefill_host, prefill_idx, decision, None - # Gate 3: pick the lowest-load target that is materially less loaded - # than the prefill_host. Cache content irrelevant — KV ships over. + # Gate 3: pick the lowest-effective-load target. effective_load adds a + # penalty for recent migrations the instance has received (anti-hotspot). + now_mono = _time.monotonic() + cutoff = now_mono - SETTINGS.v3_recent_mig_window_s + + def effective_load(inst): + # Drop expired entries lazily. + while inst.recent_mig_targeted_at and inst.recent_mig_targeted_at[0] < cutoff: + inst.recent_mig_targeted_at.popleft() + recent = len(inst.recent_mig_targeted_at) + return inst.num_requests + recent * SETTINGS.v3_recent_mig_weight + threshold_loaded = max(1, int(prefill_host.num_requests * SETTINGS.v3_target_load_ratio)) candidates = [ (i, inst) for i, inst in enumerate(instances) if i != prefill_idx - and inst.num_requests < threshold_loaded - and inst.num_requests <= prefill_host.num_requests - SETTINGS.v3_min_load_gap + and effective_load(inst) < threshold_loaded + and effective_load(inst) <= prefill_host.num_requests - SETTINGS.v3_min_load_gap ] if not candidates: decision["v3_reason"] = ( f"no_low_load_target " f"(prefill_host.num_req={prefill_host.num_requests} " - f"threshold={threshold_loaded})" + f"threshold={threshold_loaded} " + f"eff_loads=[{','.join(f'{int(effective_load(i))}' for i in instances)}])" ) return prefill_host, prefill_idx, decision, None # Mechanism B (v3_prefer_cache_target=True): rank candidates first by - # cache_hit DESC (more cache = less KV to transfer), then by load. vLLM - # auto-skips transferring overlapping prefix when dst's local cache - # matches — verified in smoke_partial_transfer: 77% faster on a 33k - # prompt when dst has the prefix already. + # cache_hit DESC (more cache = less KV to transfer), then by effective_load + # (which includes recent-migration penalty), then by ongoing_tokens. if SETTINGS.v3_prefer_cache_target: decode_target_idx, decode_target = min( candidates, key=lambda x: (-x[1].estimate_cache_hit(token_ids), - x[1].num_requests, x[1].ongoing_tokens)) + effective_load(x[1]), + x[1].ongoing_tokens)) else: decode_target_idx, decode_target = min( - candidates, key=lambda x: (x[1].num_requests, x[1].ongoing_tokens)) + candidates, key=lambda x: (effective_load(x[1]), x[1].ongoing_tokens)) target_cache_hit = decode_target.estimate_cache_hit(token_ids) + target_recent_received = len(decode_target.recent_mig_targeted_at) + # Record this decision for the anti-hotspot accounting. + decode_target.recent_mig_targeted_at.append(now_mono) + decision["v3_migrate"] = True decision["v3_decision"] = "migrate_decode" + decision["v3_src_idx"] = prefill_idx decision["v3_target_idx"] = decode_target_idx decision["v3_target_num_req"] = decode_target.num_requests decision["v3_target_cache_hit"] = target_cache_hit + decision["v3_target_recent_received"] = target_recent_received decision["v3_prefill_num_req"] = prefill_host.num_requests + # Snapshot of src state at the moment of decision (for postmortem). + decision["v3_src_state"] = { + "num_requests": prefill_host.num_requests, + "ongoing_tokens": prefill_host.ongoing_tokens, + "ongoing_decode_tokens": prefill_host.ongoing_decode_tokens, + "pending_prefill_tokens": prefill_host.pending_prefill_tokens, + } + decision["v3_target_state"] = { + "num_requests": decode_target.num_requests, + "ongoing_tokens": decode_target.ongoing_tokens, + "ongoing_decode_tokens": decode_target.ongoing_decode_tokens, + "pending_prefill_tokens": decode_target.pending_prefill_tokens, + "cache_hit_estimate": target_cache_hit, + "recent_mig_received_in_window": target_recent_received, + } decision["v3_reason"] = ( f"prefill_host.num_req={prefill_host.num_requests} busy; " - f"target.num_req={decode_target.num_requests} cache_hit={target_cache_hit}, " + f"target.num_req={decode_target.num_requests} cache_hit={target_cache_hit} " + f"recent_received={target_recent_received}, " f"transferring KV after prefill" ) return prefill_host, prefill_idx, decision, (decode_target, decode_target_idx) @@ -987,12 +1049,16 @@ async def _handle(request: Request, api: str): async def _handle_local_request(api, req_data, headers, token_ids, input_length, chosen: InstanceState, estimated_new: int, - breakdown: dict): + breakdown: dict, *, _pre_reserved: bool = False): breakdown.setdefault("route_class", "LOCAL") breakdown.setdefault("routed_to", chosen.url) - chosen.ongoing_tokens += input_length - chosen.pending_prefill_tokens += estimated_new - chosen.num_requests += 1 + # Skip reservation when called from _handle_combined (it already reserved + # synchronously to close the picker→await race). When called directly + # from non-combined paths (PD-Sep, offload), reserve here for safety. + if not _pre_reserved: + chosen.ongoing_tokens += input_length + chosen.pending_prefill_tokens += estimated_new + chosen.num_requests += 1 async def generate(): prefill_done = False @@ -1180,9 +1246,19 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h src_inst, chosen, breakdown, request_id=request_id) + # Race fix: reserve load on `chosen` BEFORE the `await` so concurrent + # picker calls in the same asyncio event-loop tick see the updated + # counters. Without this, two requests arriving back-to-back can both + # pick the same "free" instance and both end up running there + # simultaneously (observed as 8007 hotspot in A+B run). + chosen.ongoing_tokens += input_length + chosen.pending_prefill_tokens += estimated_new + chosen.num_requests += 1 + breakdown.setdefault("route_class", "LOCAL") + breakdown.setdefault("routed_to", chosen.url) return await _handle_local_request( api, req_data, headers, token_ids, input_length, - chosen, estimated_new, breakdown) + chosen, estimated_new, breakdown, _pre_reserved=True) async def _handle_combined_pd_sep_v2( @@ -1545,6 +1621,10 @@ def parse_args(): help="Mechanism B: unified_v3 picks decode_target with the most" " prefix cache among low-load candidates (default 1). Set 0" " to fall back to pure-load tie-break (cache-blind).") + p.add_argument("--lmetric-decode-weight", type=float, default=0.0, + help="Direction B: LMetric fallback adds this × ongoing_decode_tokens" + " to the queue-depth score, so hosts with heavy decode load get" + " penalised. 0 = original behavior; 0.01 is a reasonable start.") p.add_argument("--overload-factor", type=float, default=2.0, help="Break session affinity when instance load > factor * avg") # The four flags below are accepted for bench.sh backward compatibility but @@ -1585,11 +1665,13 @@ if __name__ == "__main__": SETTINGS.v3_rotate_affinity = bool(getattr(global_args, 'v3_rotate_affinity', 1)) SETTINGS.connector_type = getattr(global_args, 'connector_type', 'mooncake') SETTINGS.v3_prefer_cache_target = bool(getattr(global_args, 'v3_prefer_cache_target', 1)) + SETTINGS.lmetric_decode_weight = float(getattr(global_args, 'lmetric_decode_weight', 0.0)) print("SETTINGS: throughput=%.0f rdma_overhead=%.2f offload=%s v3_rotate_affinity=%s " - "connector_type=%s v3_prefer_cache_target=%s" % ( + "connector_type=%s v3_prefer_cache_target=%s lmetric_decode_weight=%.3f" % ( SETTINGS.prefill_throughput, SETTINGS.rdma_overhead_s, getattr(global_args, 'offload', False), SETTINGS.v3_rotate_affinity, SETTINGS.connector_type, - SETTINGS.v3_prefer_cache_target)) + SETTINGS.v3_prefer_cache_target, + SETTINGS.lmetric_decode_weight)) uvicorn.run(app, host=global_args.host, port=global_args.port)