proxy: remove dead state and broken fire-and-forget path (B1, D1)

B1: _inst_cumulative_tokens was written by pick_instance but never read
anywhere; delete the variable, global declaration, and per-call increment.
Load is already tracked via inst.ongoing_tokens.

D1: _send_prefill_async + the --fire-and-forget branch were unreachable
in practice (no launch/bench script enabled the flag) and broken even if
exercised: D-decode would fire before P registered the transfer_id,
guaranteeing a Mooncake 502. Collapse _handle_pd_sep to its synchronous
path and drop the CLI flag.
This commit is contained in:
2026-05-23 20:56:11 +08:00
parent fc445df0ad
commit 556f3011c6

View File

@@ -72,10 +72,6 @@ class InstanceState:
self.cached_blocks = set(list(self.cached_blocks)[-100000:])
# Cumulative token load per instance (for balanced session placement)
_inst_cumulative_tokens: list[int] = []
def _p_offload_penalty(inst: InstanceState) -> int:
"""Penalty for instances currently doing P-role offloaded prefills.
@@ -99,10 +95,6 @@ def pick_instance(instances: list[InstanceState], token_ids: list[int] | None,
Instances doing P-role offloads get a large penalty to steer
WARM/MEDIUM traffic away.
"""
global _inst_cumulative_tokens
if not _inst_cumulative_tokens:
_inst_cumulative_tokens = [0] * len(instances)
avg_load = max(sum(i.ongoing_tokens for i in instances) / len(instances), 1.0)
if session_id and session_id in affinity:
@@ -122,7 +114,6 @@ def pick_instance(instances: list[InstanceState], token_ids: list[int] | None,
best_score = score
best_idx = i
_inst_cumulative_tokens[best_idx] += input_length
if session_id:
affinity[session_id] = best_idx
return instances[best_idx], best_idx
@@ -524,22 +515,6 @@ async def _handle_heavy_offload(api, req_data, headers, token_ids, input_length,
return StreamingResponse(generate(), media_type="text/event-stream")
async def _send_prefill_async(p_inst, api, prefill_data, p_headers, token_ids,
input_length, breakdown):
"""Fire-and-forget prefill: send and don't block caller."""
try:
resp = await p_inst.client.post(api, json=prefill_data, headers=p_headers)
breakdown["t_prefill_done"] = _time.monotonic()
resp.raise_for_status()
await resp.aclose()
p_inst.record_prefix(token_ids)
except Exception:
breakdown["t_prefill_done"] = _time.monotonic()
breakdown["prefill_error"] = True
finally:
p_inst.ongoing_tokens -= input_length
async def _handle_pd_sep(api, req_data, request_id, token_ids, input_length,
session_id, headers):
"""PD-Sep mode with per-stage breakdown profiling."""
@@ -569,23 +544,19 @@ async def _handle_pd_sep(api, req_data, request_id, token_ids, input_length,
p_inst.ongoing_tokens += input_length
breakdown["t_prefill_sent"] = _time.monotonic()
if global_args.fire_and_forget:
asyncio.create_task(_send_prefill_async(
p_inst, api, prefill_data, p_headers, token_ids, input_length, breakdown))
else:
try:
resp = await p_inst.client.post(api, json=prefill_data, headers=p_headers)
breakdown["t_prefill_done"] = _time.monotonic()
resp.raise_for_status()
await resp.aclose()
p_inst.record_prefix(token_ids)
except Exception as e:
breakdown["t_prefill_done"] = _time.monotonic()
breakdown["prefill_error"] = True
_breakdown_log.append(breakdown)
raise HTTPException(status_code=502, detail=f"Prefill failed: {e}")
finally:
p_inst.ongoing_tokens -= input_length
try:
resp = await p_inst.client.post(api, json=prefill_data, headers=p_headers)
breakdown["t_prefill_done"] = _time.monotonic()
resp.raise_for_status()
await resp.aclose()
p_inst.record_prefix(token_ids)
except Exception as e:
breakdown["t_prefill_done"] = _time.monotonic()
breakdown["prefill_error"] = True
_breakdown_log.append(breakdown)
raise HTTPException(status_code=502, detail=f"Prefill failed: {e}")
finally:
p_inst.ongoing_tokens -= input_length
# Send decode
d_inst.ongoing_tokens += input_length
@@ -651,8 +622,6 @@ def parse_args():
help="PD-Sep prefill: URL [bootstrap_port]")
p.add_argument("--decode", nargs=1, action="append", dest="decode_raw",
help="PD-Sep decode: URL")
p.add_argument("--fire-and-forget", action="store_true",
help="Send prefill async, don't await before decode")
p.add_argument("--heavy-threshold", type=int, default=20000,
help="New tokens threshold for HEAVY classification (adaptive offload)")
p.add_argument("--offload", action="store_true",