RDMA overhead 2.0→0.1s (direct read is raw memory, not scheduler flow) +
retry on ConnectError to handle kv_both connection instability With RDMA_overhead=0.1s, offload triggers when C_s has just 700 tokens pending (0.1s queue), vs 38k tokens (5.4s) with the old 2.0s estimate. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -23,6 +23,9 @@ from contextlib import asynccontextmanager
|
||||
from dataclasses import dataclass
|
||||
|
||||
import httpx
|
||||
|
||||
MAX_STREAM_RETRIES = 3
|
||||
RETRY_DELAY_S = 0.5
|
||||
import uvicorn
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
from fastapi.responses import StreamingResponse
|
||||
@@ -44,7 +47,7 @@ class Settings:
|
||||
max_offload_inflight: int = 4 # global cap on concurrent P-role offloads
|
||||
cache_gate_ratio: float = 0.3 # min cache_hit/input ratio to allow offload
|
||||
prefill_throughput: float = 7000.0 # tokens/s per GPU (H20 measurement)
|
||||
rdma_overhead_s: float = 2.0 # RDMA transfer + decode-start overhead
|
||||
rdma_overhead_s: float = 0.1 # direct RDMA read overhead (raw memory read ~10-50ms)
|
||||
cache_capacity_blocks: int = 200000 # per-instance LRU cap on shadow cached_blocks
|
||||
|
||||
|
||||
@@ -443,17 +446,28 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h
|
||||
|
||||
async def generate():
|
||||
prefill_done = False
|
||||
try:
|
||||
async with inst.client.stream("POST", api, json=req_data, headers=headers) as resp:
|
||||
resp.raise_for_status()
|
||||
async for chunk in resp.aiter_bytes():
|
||||
if not prefill_done:
|
||||
inst.pending_prefill_tokens -= estimated_new
|
||||
inst.ongoing_decode_tokens += input_length
|
||||
breakdown["t_first_token"] = _time.monotonic()
|
||||
prefill_done = True
|
||||
yield chunk
|
||||
inst.record_prefix(token_ids)
|
||||
last_err = None
|
||||
for attempt in range(MAX_STREAM_RETRIES):
|
||||
try:
|
||||
async with inst.client.stream("POST", api, json=req_data, headers=headers) as resp:
|
||||
resp.raise_for_status()
|
||||
async for chunk in resp.aiter_bytes():
|
||||
if not prefill_done:
|
||||
inst.pending_prefill_tokens -= estimated_new
|
||||
inst.ongoing_decode_tokens += input_length
|
||||
breakdown["t_first_token"] = _time.monotonic()
|
||||
prefill_done = True
|
||||
yield chunk
|
||||
inst.record_prefix(token_ids)
|
||||
return
|
||||
except (httpx.ConnectError, httpx.RemoteProtocolError) as e:
|
||||
last_err = e
|
||||
if prefill_done:
|
||||
raise
|
||||
if attempt < MAX_STREAM_RETRIES - 1:
|
||||
await asyncio.sleep(RETRY_DELAY_S)
|
||||
if last_err:
|
||||
raise last_err
|
||||
finally:
|
||||
if not prefill_done:
|
||||
inst.pending_prefill_tokens -= estimated_new
|
||||
|
||||
Reference in New Issue
Block a user