Fix 4 elastic PS bugs: D accounting, offload cap, cache migration, prefix sync
Bug 1+5: D instance had no accounting during prefill phase (7-11s window). Router saw D as idle, routing extra traffic that caused KV allocation failures. Fix: reserve D's ongoing_tokens+num_requests at offload decision time. Bug 7: No cap on concurrent offloads despite REPORT claiming MAX_OFFLOAD=4. Fix: add MAX_OFFLOAD_INFLIGHT=4 check before offloading. Bug 6: Session affinity migrated to D but proxy cache estimator wasn't updated for D. Future turns scored D as cache-cold. Fix: call d_inst.record_prefix(token_ids) after successful decode. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -29,6 +29,7 @@ BLOCK_SIZE = 512
|
||||
CACHE_HIT_ALPHA = 1.0
|
||||
HEAVY_THRESHOLD = 20000 # default; overridden by --heavy-threshold
|
||||
OVERLOAD_FACTOR = 2.0 # default; overridden by --overload-factor
|
||||
MAX_OFFLOAD_INFLIGHT = 4 # cap concurrent P-role offloads
|
||||
|
||||
|
||||
class InstanceState:
|
||||
@@ -283,28 +284,33 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h
|
||||
|
||||
if estimated_new >= HEAVY_THRESHOLD and offload_enabled:
|
||||
cache_ratio = cache_hit / max(input_length, 1)
|
||||
current_offloads = sum(c.active_p_offloads for c in combined_instances)
|
||||
d_candidate = min((c for c in combined_instances if c is not best_inst),
|
||||
key=lambda c: c.ongoing_tokens)
|
||||
breakdown["cache_ratio"] = cache_ratio
|
||||
|
||||
if cache_ratio >= 0.3: # at least 30% cache hit to justify RDMA offload
|
||||
if current_offloads >= MAX_OFFLOAD_INFLIGHT:
|
||||
offload_reason = "cap_reached_%d" % current_offloads
|
||||
elif cache_ratio >= 0.3:
|
||||
use_offload = True
|
||||
offload_reason = "cached_offload_%.0f%%" % (cache_ratio * 100)
|
||||
else:
|
||||
offload_reason = "cold_colocated_%.0f%%" % (cache_ratio * 100)
|
||||
|
||||
if use_offload:
|
||||
# C_s does fast cached prefill, D does decode
|
||||
p_inst = best_inst # session-sticky, has prefix cache
|
||||
p_inst = best_inst
|
||||
d_inst = d_candidate
|
||||
d_idx = combined_instances.index(d_inst)
|
||||
|
||||
# Accounting: C_s only prefills estimated_new tokens (cached prefix is free)
|
||||
# Accounting: reserve both P and D immediately so router sees the load
|
||||
p_inst.ongoing_tokens += input_length
|
||||
p_inst.pending_prefill_tokens += estimated_new
|
||||
p_inst.num_requests += 1
|
||||
p_inst.active_p_offloads += 1
|
||||
|
||||
d_inst.ongoing_tokens += input_length
|
||||
d_inst.num_requests += 1
|
||||
|
||||
breakdown["route_class"] = "HEAVY_OFFLOAD"
|
||||
breakdown["offload_reason"] = offload_reason
|
||||
breakdown["p_inst"] = p_inst.url
|
||||
@@ -312,7 +318,6 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h
|
||||
breakdown["p_load"] = p_inst.ongoing_tokens
|
||||
breakdown["d_load"] = d_inst.ongoing_tokens
|
||||
|
||||
# Update session affinity to D (D will have KV after this request)
|
||||
if session_id:
|
||||
session_affinity[session_id] = d_idx
|
||||
|
||||
@@ -412,10 +417,9 @@ async def _handle_heavy_offload(api, req_data, headers, token_ids, input_length,
|
||||
|
||||
if not prefill_ok:
|
||||
# Fallback: co-located prefill+decode on d_inst (no KV transfer)
|
||||
# D already has ongoing_tokens and num_requests reserved by caller
|
||||
breakdown["route_class"] = "HEAVY_COLO_FALLBACK"
|
||||
d_inst.ongoing_tokens += input_length
|
||||
d_inst.pending_prefill_tokens += estimated_new
|
||||
d_inst.num_requests += 1
|
||||
|
||||
async def generate_fallback():
|
||||
prefill_done = False
|
||||
@@ -443,9 +447,8 @@ async def _handle_heavy_offload(api, req_data, headers, token_ids, input_length,
|
||||
return StreamingResponse(generate_fallback(), media_type="text/event-stream")
|
||||
|
||||
# Step 2: Stream decode on d_inst (pulls KV from Mooncake)
|
||||
d_inst.ongoing_tokens += input_length
|
||||
# D already has ongoing_tokens and num_requests reserved by caller
|
||||
d_inst.ongoing_decode_tokens += input_length
|
||||
d_inst.num_requests += 1
|
||||
breakdown["t_decode_sent"] = _time.monotonic()
|
||||
|
||||
parsed = urllib.parse.urlparse(str(p_inst.client.base_url))
|
||||
@@ -470,6 +473,7 @@ async def _handle_heavy_offload(api, req_data, headers, token_ids, input_length,
|
||||
breakdown["t_first_token"] = _time.monotonic()
|
||||
first_token = False
|
||||
yield chunk
|
||||
d_inst.record_prefix(token_ids)
|
||||
finally:
|
||||
d_inst.ongoing_tokens -= input_length
|
||||
d_inst.ongoing_decode_tokens -= input_length
|
||||
|
||||
Reference in New Issue
Block a user