Fix proxy shadow drift: actively reconcile against vLLM /metrics
The proxy maintains shadow counters (num_requests, ongoing_tokens,
pending_prefill_tokens, ongoing_decode_tokens) used by every routing
picker. They are incremented in _handle_local_request and decremented
in the generator's finally block. When the StreamingResponse generator
never enters (client disconnect between proxy returning the response
and Starlette starting iteration, or Starlette failing before
iteration), the decrement never fires and the counter stays elevated
forever. Over a multi-hour run the shadow accumulates "phantom" load
on the affected instances and biases the router away from them.
Concrete observation that prompted the fix: during the unified_kv_both
B3 run, engine_0 sat at proxy num_requests=1 / ongoing_decode_tokens=80406
while vLLM's own /metrics reported num_running=0 num_waiting=0 and the
GPU sat at 0% utilization. Every routing decision after that point
believed engine_0 was busy with an 80k-token decode that did not exist.
Fix: extend _reconcile_loop to actively poll each instance's
/metrics every 30 s. If the proxy's num_requests has been higher than
vLLM's (running + waiting) for two consecutive cycles (~60 s of stable
drift), reduce the shadow to vLLM's truth. When vLLM is fully idle
(running=0, waiting=0), zero ongoing_tokens, ongoing_decode_tokens,
and pending_prefill_tokens as well.
Two-cycle persistence avoids correcting transient mismatches where
the proxy has just incremented for a new request that vLLM has not
scheduled yet. A single ~30 s blip is not large enough to corrupt
routing decisions; only persistent drift gets corrected.
The previous _reconcile_loop only clamped negatives. Phantom positives
are now caught and logged ("[reconcile] {url}: phantom drift ...").
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -624,21 +624,65 @@ async def init_prefill_bootstrap(instances: list[InstanceState], ready: asyncio.
|
||||
ready.set()
|
||||
|
||||
|
||||
async def _reconcile_loop():
|
||||
"""Periodic safety net for shadow state.
|
||||
async def _fetch_vllm_inflight(inst: "InstanceState") -> tuple[int, int] | None:
|
||||
"""Read vLLM's truth: (num_running, num_waiting). Returns None on failure."""
|
||||
try:
|
||||
resp = await asyncio.wait_for(inst.client.get("/metrics"), timeout=5.0)
|
||||
if resp.status_code != 200:
|
||||
return None
|
||||
text = resp.text
|
||||
except Exception:
|
||||
return None
|
||||
running = 0
|
||||
waiting = 0
|
||||
for line in text.splitlines():
|
||||
if line.startswith("vllm:num_requests_running"):
|
||||
try:
|
||||
running = int(float(line.split()[-1]))
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
elif line.startswith("vllm:num_requests_waiting"):
|
||||
try:
|
||||
waiting = int(float(line.split()[-1]))
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
return running, waiting
|
||||
|
||||
StreamingResponse generators decrement load counters in their finally
|
||||
block, but if a client disconnects before the body is consumed the
|
||||
generator is never entered and the decrement is lost. Clamp negative
|
||||
drift every minute so router scores stay sane. This does not replace
|
||||
proper exact-state syncing with vLLM (see TODO.md item 6).
|
||||
|
||||
async def _reconcile_loop():
|
||||
"""Periodic shadow-state reconciliation against vLLM /metrics truth.
|
||||
|
||||
The proxy maintains shadow counters (num_requests, ongoing_tokens,
|
||||
pending_prefill_tokens, ongoing_decode_tokens) by incrementing in
|
||||
`_handle_local_request` and decrementing in the generator's finally
|
||||
block. When the generator never enters (client disconnect between
|
||||
StreamingResponse construction and Starlette starting iteration, or
|
||||
Starlette failing before iteration), the decrement never fires and
|
||||
the counter stays elevated forever. Over a long run the shadow
|
||||
accumulates "phantom" load that biases routing decisions away from
|
||||
the affected instance.
|
||||
|
||||
Two-pass fix:
|
||||
|
||||
1. Clamp negatives (defensive; rare in practice).
|
||||
2. Sample vLLM's actual num_running + num_waiting via /metrics. If
|
||||
the proxy's num_requests has been *higher* than vLLM's truth for
|
||||
two consecutive cycles, reconcile downward to vLLM's count.
|
||||
Two-cycle persistence avoids correcting transient mismatches
|
||||
(e.g., proxy just incremented but vLLM hasn't scheduled the
|
||||
request yet).
|
||||
|
||||
Cycle period: 30 s. Two-cycle persistence threshold: 60 s of stable
|
||||
drift before correction.
|
||||
"""
|
||||
prev_phantom: dict[str, int] = {}
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(60)
|
||||
await asyncio.sleep(30)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
for inst in combined_instances + prefill_instances + decode_instances:
|
||||
# Pass 1: clamp negatives (cheap, always do).
|
||||
if inst.ongoing_tokens < 0:
|
||||
inst.ongoing_tokens = 0
|
||||
if inst.ongoing_decode_tokens < 0:
|
||||
@@ -650,6 +694,31 @@ async def _reconcile_loop():
|
||||
if inst.active_p_offloads < 0:
|
||||
inst.active_p_offloads = 0
|
||||
|
||||
# Pass 2: detect phantom positives by polling vLLM truth.
|
||||
metrics = await _fetch_vllm_inflight(inst)
|
||||
if metrics is None:
|
||||
continue
|
||||
running, waiting = metrics
|
||||
actual_inflight = running + waiting
|
||||
phantom = inst.num_requests - actual_inflight
|
||||
prev = prev_phantom.get(inst.url, 0)
|
||||
if phantom > 0 and prev > 0:
|
||||
# Drift held across two consecutive cycles (~60 s).
|
||||
# Reconcile shadow to vLLM's truth.
|
||||
old_num = inst.num_requests
|
||||
inst.num_requests = actual_inflight
|
||||
if actual_inflight == 0:
|
||||
# No requests in flight; zero all per-request counters.
|
||||
inst.ongoing_tokens = 0
|
||||
inst.ongoing_decode_tokens = 0
|
||||
inst.pending_prefill_tokens = 0
|
||||
print(
|
||||
f"[reconcile] {inst.url}: phantom drift "
|
||||
f"num_requests {old_num} -> {actual_inflight} "
|
||||
f"(vllm running={running} waiting={waiting})"
|
||||
)
|
||||
prev_phantom[inst.url] = phantom
|
||||
|
||||
|
||||
def _verify_vllm_patch():
|
||||
"""Startup self-check for patches/0001-fix-kv-transfer-abort-race.patch.
|
||||
|
||||
Reference in New Issue
Block a user