MB5 driver updates: PD-proxy + snapshot instrument + launcher tweaks

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-05-29 11:53:27 +08:00
parent bad512d3c5
commit ee5db0b321
4 changed files with 153 additions and 17 deletions

View File

@@ -194,24 +194,35 @@ MOONCAKE_PATCHES = [
]
# ---------- Patch 4: vLLM 0.18.1 PD-consumer metrics counter underflow ------
# In PromptTokenStats.update_from_output, local_cache_hit is computed as
# (num_cached_tokens + recomputed - num_external_computed_tokens). On a
# kv_consumer, a remote KV transfer can report more external-computed tokens
# than the scheduler's cached count (esp. on a KV-load failure for a large
# request), driving local_cache_hit negative. loggers.record() then calls
# Counter.inc() with that negative value and prometheus_client raises
# On a kv_consumer, a KV-load failure (Mooncake transfer returns -1 when the
# D-pool is full) makes vLLM emit an iteration-stats "correction" with NEGATIVE
# token deltas: PromptTokenStats fields (local_cache_hit, cached_tokens, ...)
# AND iteration_stats.{num_prompt_tokens, num_generation_tokens} all go below
# zero. Every Counter.inc() in loggers.record() then trips prometheus_client's
# "Counters can only be incremented by non-negative amounts.", which kills the
# EngineCore — turning one failed request into a total config collapse.
# We clamp the per-source counts to >= 0 right before they are recorded.
LOGGERS_ANCHOR = " pts = iteration_stats.prompt_token_stats\n"
#
# Clamp every field that feeds a Counter.inc() in record() to >= 0. We anchor
# right after the `if iteration_stats is None: return` guard so the clamp runs
# before the first inc() (the corrupted-requests / preempted / prompt-token
# counters at the top of the method).
LOGGERS_ANCHOR = " if iteration_stats is None:\n return\n"
LOGGERS_INSERT = (
f" {START_MARK}\n"
f" if pts.local_cache_hit < 0:\n"
f" pts.local_cache_hit = 0\n"
f" if pts.computed < 0:\n"
f" pts.computed = 0\n"
f" if pts.external_kv_transfer < 0:\n"
f" pts.external_kv_transfer = 0\n"
f" _mb5_pts = iteration_stats.prompt_token_stats\n"
f" for _mb5_o, _mb5_a in (\n"
f" (iteration_stats, 'num_prompt_tokens'),\n"
f" (iteration_stats, 'num_generation_tokens'),\n"
f" (iteration_stats, 'num_preempted_reqs'),\n"
f" (iteration_stats, 'num_corrupted_reqs'),\n"
f" (_mb5_pts, 'computed'),\n"
f" (_mb5_pts, 'local_cache_hit'),\n"
f" (_mb5_pts, 'external_kv_transfer'),\n"
f" (_mb5_pts, 'cached_tokens'),\n"
f" (_mb5_pts, 'recomputed_tokens'),\n"
f" ):\n"
f" if getattr(_mb5_o, _mb5_a, 0) < 0:\n"
f" setattr(_mb5_o, _mb5_a, 0)\n"
f" {END_MARK}\n"
)

View File

@@ -24,7 +24,9 @@
set -eo pipefail
FRESH_ROOT="/home/admin/cpfs/wjh/agentic-kv-fresh"
VENV="${FRESH_ROOT}/.venv"
# MB5_VENV lets a second host use an isolated venv clone (e.g. .venv_dash0) so
# two boxes can run in parallel without racing on the shared cpfs venv patch.
VENV="${MB5_VENV:-${FRESH_ROOT}/.venv}"
MODEL="${MODEL:-/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
INSTRUMENT="${SCRIPT_DIR}/instrument_kv_snapshot.py"
@@ -86,14 +88,16 @@ case "${1:-start}" in
esac
# --- parse CONFIG into (prefill_gpus, decode_gpus) ----------------
USE_COLO_PROXY=0
case "${CONFIG}" in
8C) ROLES="combined"; P_GPUS=""; D_GPUS=""; COMBINED_GPUS="0,1,2,3,4,5,6,7" ;;
8C-proxy) ROLES="combined"; USE_COLO_PROXY=1; P_GPUS=""; D_GPUS=""; COMBINED_GPUS="0,1,2,3,4,5,6,7" ;;
6P+2D) ROLES="pd"; P_GPUS="0,1,2,3,4,5"; D_GPUS="6,7" ;;
5P+3D) ROLES="pd"; P_GPUS="0,1,2,3,4"; D_GPUS="5,6,7" ;;
4P+4D) ROLES="pd"; P_GPUS="0,1,2,3"; D_GPUS="4,5,6,7" ;;
3P+5D) ROLES="pd"; P_GPUS="0,1,2"; D_GPUS="3,4,5,6,7" ;;
2P+6D) ROLES="pd"; P_GPUS="0,1"; D_GPUS="2,3,4,5,6,7" ;;
*) echo "Unknown CONFIG=${CONFIG} (expected: 8C, 6P+2D, 5P+3D, 4P+4D, 3P+5D, 2P+6D)"; exit 1;;
*) echo "Unknown CONFIG=${CONFIG} (expected: 8C, 8C-proxy, 6P+2D, 5P+3D, 4P+4D, 3P+5D, 2P+6D)"; exit 1;;
esac
stop_all
@@ -137,6 +141,7 @@ launch_vllm() {
idx=0
proxy_args=()
colo_args=()
ENDPOINTS=""
case "${ROLES}" in
@@ -147,6 +152,7 @@ case "${ROLES}" in
bp=$((BASE_BP+idx))
launch_vllm "${idx}" "${gpu}" "${port}" "kv_both" "${bp}"
ENDPOINTS+="${ENDPOINTS:+,}http://127.0.0.1:${port}"
colo_args+=( --colo "http://127.0.0.1:${port}" )
idx=$((idx+1))
sleep 1
done
@@ -215,6 +221,25 @@ if [ "${ROLES}" = "pd" ]; then
ENDPOINTS="http://127.0.0.1:${PROXY_PORT}"
fi
if [ "${USE_COLO_PROXY}" = "1" ]; then
echo "[mb5] launching colo passthrough proxy on ${PROXY_PORT} (8 kv_both instances)"
nohup python "${PROXY_SRC}" "${colo_args[@]}" --port "${PROXY_PORT}" --host 0.0.0.0 \
> "${LOGS_DIR}/proxy.log" 2>&1 &
disown
tries=0
while ! curl -s -o /dev/null -w "%{http_code}" "http://127.0.0.1:${PROXY_PORT}/" 2>/dev/null | grep -qE "^[0-9]"; do
tries=$((tries+1))
if [ ${tries} -gt 60 ]; then
echo "[mb5] FATAL colo proxy did not come up in 2 min"
tail -40 "${LOGS_DIR}/proxy.log" || true
exit 1
fi
sleep 2
done
echo " colo proxy port=${PROXY_PORT} ready (HTTP responding)"
ENDPOINTS="http://127.0.0.1:${PROXY_PORT}"
fi
echo "[mb5] CONFIG=${CONFIG} RUN_LABEL=${RUN_LABEL} UP"
echo "ENDPOINTS=${ENDPOINTS}"
echo "RUN_ROOT=${RUN_ROOT}"

View File

@@ -72,8 +72,32 @@ async def lifespan(app: FastAPI):
# Startup: Initialize client pools for prefiller and decoder services
app.state.prefill_clients = []
app.state.decode_clients = []
app.state.colo_clients = []
app.state.ready = asyncio.Event()
# Colo (PD-combined) passthrough mode: no bootstrap handshake needed.
if global_args.colo:
for url in global_args.colo:
app.state.colo_clients.append({
"client": httpx.AsyncClient(
timeout=None,
base_url=url,
trust_env=False, # ignore http_proxy env: backends are localhost
limits=httpx.Limits(
max_connections=None,
max_keepalive_connections=None,
),
),
"url": url,
})
app.state.colo_iterator = itertools.cycle(range(len(app.state.colo_clients)))
app.state.ready.set()
print(f"Colo passthrough mode: {len(app.state.colo_clients)} kv_both clients.")
yield
for client_info in app.state.colo_clients:
await client_info["client"].aclose()
return
# Create prefill clients
for i, (url, bootstrap_port) in enumerate(global_args.prefill):
parsed_url = urllib.parse.urlparse(url)
@@ -169,9 +193,25 @@ def parse_args():
help="Decode server URL. Can be specified multiple times.",
)
# MB5: colocated (PD-combined) instances. When given, the proxy runs in
# "colo" mode — it round-robins /v1/completions to these kv_both instances
# with a plain streaming passthrough (no P->D split, no kv_transfer_params).
# This exists so the 8C baseline pays the SAME proxy hop as PD configs,
# removing the "8C bypasses the proxy" confound from the comparison.
parser.add_argument(
"--colo",
nargs=1,
action="append",
dest="colo_raw",
metavar=("URL",),
help="Colocated (kv_both) server URL. Can be specified multiple times. "
"Enables colo passthrough mode.",
)
args = parser.parse_args()
args.prefill = _parse_prefill_urls(args.prefill_raw)
args.decode = _parse_decode_urls(args.decode_raw)
args.colo = [u[0] for u in args.colo_raw] if args.colo_raw else []
return args
@@ -235,6 +275,14 @@ def _parse_decode_urls(decode_list):
# Decode side stays round-robin (load balance) regardless.
MB5_P_ROUTING = os.environ.get("MB5_P_ROUTING", "rr").lower()
# MB5: routing mode for the COLO (kv_both) passthrough proxy.
# "rr" — round-robin (loses session-local prefix cache)
# "session" — consistent hash on X-Session-Id, so all turns of a session land
# on the same kv_both instance and reuse its prefix cache. This is
# the cache-aware colo baseline (the fair strong baseline for the
# agentic reuse regime — D4).
MB5_COLO_ROUTING = os.environ.get("MB5_COLO_ROUTING", "rr").lower()
def get_prefill_by_session(app, session_id: str):
"""Pick a (prefill_client, dp_rank) deterministically from session_id.
@@ -340,7 +388,58 @@ async def stream_service_response(
yield chunk
async def stream_colo_response(
colo_client_info: dict, endpoint: str, req_data: dict, headers: dict
):
"""Plain streaming passthrough to one colocated (kv_both) instance.
The request body is forwarded unchanged (stream/min_tokens/stream_options
all preserved) so the replayer's streaming + usage parsing works exactly
as it does when it talks to a colo instance directly.
"""
async with colo_client_info["client"].stream(
"POST", endpoint, json=req_data, headers=headers
) as response:
response.raise_for_status()
async for chunk in response.aiter_bytes():
yield chunk
async def _handle_colo(api: str, request: Request):
if not app.state.ready.is_set():
raise HTTPException(status_code=503, detail="Service Unavailable")
req_data = await request.json()
request_id = request.headers.get("X-Request-Id") or str(uuid.uuid4())
headers = {"X-Request-Id": request_id}
session_id = request.headers.get("X-Session-Id")
if session_id:
headers["X-Session-Id"] = session_id
key = os.environ.get("OPENAI_API_KEY")
if key:
headers["Authorization"] = f"Bearer {key}"
if MB5_COLO_ROUTING == "session" and session_id:
# consistent hash -> same kv_both instance reuses its prefix cache
h = int(hashlib.md5(session_id.encode()).hexdigest()[:8], 16)
idx = h % len(app.state.colo_clients)
else:
idx = next(app.state.colo_iterator)
colo_client_info = app.state.colo_clients[idx]
async def generate_stream():
async for chunk in stream_colo_response(
colo_client_info, api, req_data, headers
):
yield chunk
return StreamingResponse(generate_stream(), media_type="text/event-stream")
async def _handle_completions(api: str, request: Request):
if getattr(global_args, "colo", None):
return await _handle_colo(api, request)
if not app.state.ready.is_set():
raise HTTPException(status_code=503, detail="Service Unavailable")

View File

@@ -17,7 +17,8 @@
set -eo pipefail
FRESH_ROOT="/home/admin/cpfs/wjh/agentic-kv-fresh"
VENV="${FRESH_ROOT}/.venv"
# MB5_VENV lets a second host use an isolated venv clone (see mb5_launch.sh).
VENV="${MB5_VENV:-${FRESH_ROOT}/.venv}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
LAUNCH="${SCRIPT_DIR}/mb5_launch.sh"
REPLAYER_DIR="${FRESH_ROOT}/replayer"