feat(kvc): Option D - delegate seed/reseed admission to D worker

v4 (cap=16) still saw 35% session-cap fallback because the local
soft_cap, min(16, usable / target), evaluates to 1-2 when target
(input + output) is 50-100K tokens, as it is for agentic workloads.
The cap was hit not because D was full but because replay's heuristic
underestimated D's capacity.
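
To make the collapse concrete, here is a minimal sketch of the cap
arithmetic. The function name, the floor division, and the 150K
usable-token figure are illustrative assumptions; only the
min(16, usable / target) shape and the 50-100K target range come from
the v4 observations.

```python
def soft_cap(usable_tokens: int, target_tokens: int, hard_cap: int = 16) -> int:
    """Illustrative local soft cap: min(hard_cap, usable / target)."""
    if target_tokens <= 0:
        return hard_cap
    return min(hard_cap, usable_tokens // target_tokens)


# Hypothetical D worker with 150K usable KV tokens (assumed figure).
usable = 150_000
small = soft_cap(usable, 2_000)    # short target: the hard cap of 16 wins
large = soft_cap(usable, 75_000)   # agentic target: cap collapses to 2
huge = soft_cap(usable, 100_000)   # agentic target: cap collapses to 1
```

With targets this large the hard cap of 16 never comes into play; the
quotient alone decides, which is why raising the cap did not help.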

This change makes worker admission_mode authoritative for ALL paths:

SGLang side:
- io_struct.py: DirectAppendAdmissionReqInput gains a `mode` field
  ("direct_append" | "seed", default "direct_append" preserves prior
  behavior).
- scheduler.py:admit_direct_append: when mode == "seed", skip the
  resident-on-D requirement and run the same capacity check + LRU
  eviction (maybe_trim_decode_session_cache) that direct_append uses.
  This lets D atomically decide if a new session can be admitted based
  on actual token_to_kv_pool_allocator state.
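
The two modes can be sketched with a toy pool. This is not SGLang's
scheduler: ToyDecodePool, its fields, and the response keys other than
can_admit and resident are hypothetical, and the toy reserves tokens on
success as a simplification. It only illustrates the rule described
above: direct_append requires residency, seed skips that check, and both
share the same capacity check and LRU eviction.

```python
from collections import OrderedDict


class ToyDecodePool:
    """Toy stand-in for a D worker's KV pool (hypothetical, for illustration)."""

    def __init__(self, capacity_tokens: int) -> None:
        self.capacity_tokens = capacity_tokens
        # session_id -> resident tokens, least recently used first.
        self.sessions = OrderedDict()

    def available(self) -> int:
        return self.capacity_tokens - sum(self.sessions.values())

    def admit(self, session_id: str, required_tokens: int,
              mode: str = "direct_append") -> dict:
        resident = session_id in self.sessions
        if not resident and mode != "seed":
            # direct_append only extends a session already on this worker.
            return {"can_admit": False, "resident": False, "evicted": []}
        evicted = []
        # Evict in LRU order until the request fits, never evicting the
        # session that is being extended.
        for victim in list(self.sessions):
            if self.available() >= required_tokens:
                break
            if victim == session_id:
                continue
            del self.sessions[victim]
            evicted.append(victim)
        can_admit = self.available() >= required_tokens
        if can_admit:
            self.sessions[session_id] = (
                self.sessions.get(session_id, 0) + required_tokens
            )
            self.sessions.move_to_end(session_id)  # mark most recently used
        return {"can_admit": can_admit, "resident": resident, "evicted": evicted}


pool = ToyDecodePool(capacity_tokens=100)
pool.sessions["a"] = 60
pool.sessions["b"] = 30
rejected = pool.admit("c", 50, mode="direct_append")  # "c" is not resident
seeded = pool.admit("c", 50, mode="seed")             # evicts "a", the LRU entry
```

Because the check and the eviction happen in one call on the worker,
the caller never acts on a capacity figure that another admission has
already invalidated.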

Replay side (replay.py):
- _query_decode_direct_admission gains a `mode` parameter.
- _reserve_decode_session_capacity: in worker admission_mode, the
  seed/reseed branch now queries D with mode="seed" and trusts the
  result, instead of estimating capacity from the residency snapshot.
- _should_admit_new_decode_session: in worker mode, skip the local
  soft_cap pre-check and let D decide. Same-D session fast-path is
  preserved.
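
A minimal sketch of the pre-check ordering described above. The function
name, its flattened parameters, and the open_sessions < soft_cap
comparison are assumptions for illustration; the real
_should_admit_new_decode_session takes session state objects.

```python
def should_admit_new_session(
    open_sessions: int,
    soft_cap: int,
    admission_mode: str = "router",
    same_worker_fast_path: bool = False,
) -> bool:
    """Illustrative gate: fast path first, then worker deferral, then soft cap."""
    if same_worker_fast_path:
        # Session already lives on this D: always allowed, in either mode.
        return True
    if admission_mode == "worker":
        # Worker mode: no local estimate, D decides at admission time.
        return True
    # Router mode: assumed form of the local soft-cap comparison.
    return open_sessions < soft_cap


router_decision = should_admit_new_session(5, soft_cap=2, admission_mode="router")
worker_decision = should_admit_new_session(5, soft_cap=2, admission_mode="worker")
fast_path_decision = should_admit_new_session(
    5, soft_cap=2, admission_mode="router", same_worker_fast_path=True
)
```

Returning True in worker mode does not admit the session by itself; it
only lets the request reach D's seed-mode admission check.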

Effects:
- Local hardcoded cap of 16 is bypassed under worker mode; D's real
  KV pool size is the only constraint.
- LRU eviction runs in D's process atomically with admission, so
  starvation (the v3 bimodal "lucky vs starved sessions" pattern)
  should resolve.

scripts/sweep_tp1_v5_optD.sh added to run the same 1P7D / 2P6D
configs as v4 with the new admission path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
kzlin
2026-04-28 23:40:03 +08:00
parent 74194e660a
commit 6e5ed8da80
4 changed files with 173 additions and 2 deletions

scripts/sweep_tp1_v5_optD.sh Executable file

@@ -0,0 +1,114 @@
#!/bin/bash
# TP1 v5 sweep — Option D: D-side admission for seed/reseed.
#
# v4 (cap=16) still saw 35% session-cap fallback because the local soft_cap
# evaluates min(16, usable_capacity_tokens / target_tokens) and target_tokens
# (= input + output) is 50-100K in agentic workloads, giving cap = 1-2.
#
# v5 makes worker admission_mode authoritative for ALL admission decisions
# (direct_append AND seed/reseed). Replay calls D's
# /session_cache/admit_direct_append with mode={direct_append|seed} and
# defers to D's KV pool availability + LRU eviction. Replay's local
# _decode_session_soft_cap is bypassed entirely under worker mode.
set -euo pipefail
cd "$(dirname "$0")/.."
MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507
TRACE=outputs/qwen35-swebench-50sess.jsonl
OUTPUT=outputs/qwen3-30b-tp1-v5-optD
VENV_PYTHON=.venv/bin/python
RESULTS_FILE=$OUTPUT/sweep_results.txt
mkdir -p "$OUTPUT"

# Timestamped logging to stdout and to the results file.
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$RESULTS_FILE"
}

# Append a run's summary to the results file and copy its metrics into $OUTPUT.
save_result() {
    local label=$1
    local run_dir=$2
    log "=== $label COMPLETED ==="
    if [ -f "$run_dir/request-metrics.jsonl.summary.json" ]; then
        log "Summary:"
        cat "$run_dir/request-metrics.jsonl.summary.json" >> "$RESULTS_FILE"
        echo "" >> "$RESULTS_FILE"
        cp "$run_dir/request-metrics.jsonl.summary.json" "$OUTPUT/${label}_summary.json"
        cp "$run_dir/request-metrics.jsonl" "$OUTPUT/${label}_metrics.jsonl"
        log "Saved to $OUTPUT/${label}_summary.json + ${label}_metrics.jsonl"
    else
        log "WARNING: No summary file found in $run_dir"
    fi
}
log "Starting TP1 v5 sweep (Option D: D-side seed admission)"
log "Model: $MODEL"
log "Trace: $TRACE (4449 requests, 52 sessions)"
log "Key change: worker admission_mode now drives seed/reseed via D's admit endpoint"
########################################
# Experiment 1: 1P + 7D KVC kv-aware Option D
########################################
log ""
log "=== [EXP1] 1P7D KVC kv-aware Option D ==="
PYTHONPATH=src:third_party/sglang/python \
    $VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
    --trace "$TRACE" \
    --output-root "$OUTPUT" \
    --mechanism kvcache-centric \
    --policy kv-aware \
    --model-path "$MODEL" \
    --prefill-workers 1 --decode-workers 7 \
    --prefill-tp-size 1 --decode-tp-size 1 \
    --prefill-gpu-ids 0 --decode-gpu-ids 1,2,3,4,5,6,7 \
    --transfer-backend mooncake \
    --gpu-budget 8 \
    --time-scale 10 \
    --session-sample-rate 1.0 \
    --target-duration-s 100000 \
    --concurrency-limit 32 \
    --timeout-s 900 \
    --request-timeout-s 300 \
    --kvcache-admission-mode worker \
    --kvcache-seed-min-turn-id 1 \
    --kvcache-seed-max-inflight-decode -1 \
    --kvcache-prefill-backup-policy release-after-transfer \
    --kvcache-prefill-priority-eviction
# Under pipefail an unmatched glob makes ls fail and would abort the script
# before save_result can log its warning, so tolerate that failure here.
EXP1_DIR=$(ls -td "$OUTPUT"/kvcache-centric-*/ 2>/dev/null | head -1 || true)
save_result "exp1_1p7d_kvc_optD" "$EXP1_DIR"
########################################
# Experiment 2: 2P + 6D KVC kv-aware Option D
########################################
log ""
log "=== [EXP2] 2P6D KVC kv-aware Option D ==="
PYTHONPATH=src:third_party/sglang/python \
    $VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
    --trace "$TRACE" \
    --output-root "$OUTPUT" \
    --mechanism kvcache-centric \
    --policy kv-aware \
    --model-path "$MODEL" \
    --prefill-workers 2 --decode-workers 6 \
    --prefill-tp-size 1 --decode-tp-size 1 \
    --prefill-gpu-ids 0,1 --decode-gpu-ids 2,3,4,5,6,7 \
    --transfer-backend mooncake \
    --gpu-budget 8 \
    --time-scale 10 \
    --session-sample-rate 1.0 \
    --target-duration-s 100000 \
    --concurrency-limit 32 \
    --timeout-s 900 \
    --request-timeout-s 300 \
    --kvcache-admission-mode worker \
    --kvcache-seed-min-turn-id 1 \
    --kvcache-seed-max-inflight-decode -1 \
    --kvcache-prefill-backup-policy release-after-transfer \
    --kvcache-prefill-priority-eviction
# Newest run directory; the glob also matches EXP1's directory, so rely on
# mtime ordering. Tolerate an unmatched glob as in EXP1.
EXP2_DIR=$(ls -td "$OUTPUT"/kvcache-centric-*/ 2>/dev/null | head -1 || true)
save_result "exp2_2p6d_kvc_optD" "$EXP2_DIR"
log ""
log "=== ALL TP1 V5 SWEEP EXPERIMENTS DONE ==="

replay.py

@@ -651,6 +651,7 @@ async def _query_decode_direct_admission(
     session_id: str,
     uncached_input_tokens: int,
     output_tokens: int,
+    mode: str = "direct_append",
 ) -> dict[str, Any]:
     try:
         response = await client.post(
@@ -659,6 +660,7 @@ async def _query_decode_direct_admission(
                 "session_id": session_id,
                 "uncached_input_tokens": max(0, uncached_input_tokens),
                 "output_tokens": max(0, output_tokens),
+                "mode": mode,
             },
             timeout=_ADMISSION_PROBE_TIMEOUT_S,
         )
@@ -913,6 +915,7 @@ def _should_admit_new_decode_session(
     session: DirectSessionState,
     direct_sessions: dict[str, DirectSessionState],
     treat_as_fresh_session: bool,
+    admission_mode: KvCacheAdmissionMode = "router",
 ) -> bool:
     if (
         not treat_as_fresh_session
@@ -920,6 +923,11 @@ def _should_admit_new_decode_session(
         and session.server_url == server_url
     ):
         return True
+    if admission_mode == "worker":
+        # Defer the capacity decision to D's admit_direct_append (mode=seed),
+        # which checks real KV pool availability and runs LRU eviction. The
+        # local soft cap is router-mode only.
+        return True
     open_sessions = sum(
         1
         for candidate in direct_sessions.values()
@@ -1331,6 +1339,7 @@ async def _reserve_decode_session_capacity(
                 session_id=session.session_id,
                 uncached_input_tokens=max(0, request.input_length - current_tokens),
                 output_tokens=request.output_length,
+                mode="direct_append",
             )
             if not bool(admission.get("resident")):
                 return False, 0, 0, 0, str(admission.get("reason") or "d-session-not-resident")
@@ -1355,6 +1364,41 @@ async def _reserve_decode_session_capacity(
                 None,
             )

+        # Seed / reseed path: ask D itself via the seed-mode admission endpoint
+        # instead of estimating capacity from a stale router-state snapshot. D
+        # will run LRU eviction internally to make room. Falls through to the
+        # legacy router-state logic below if the endpoint is unavailable.
+        seed_admission = await _query_decode_direct_admission(
+            client=client,
+            server_url=server_url,
+            session_id=session.session_id,
+            uncached_input_tokens=max(0, request.input_length - current_tokens),
+            output_tokens=request.output_length,
+            mode="seed",
+        )
+        seed_reason = seed_admission.get("reason")
+        if seed_reason != "admission-query-failed":
+            if not bool(seed_admission.get("can_admit")):
+                return (
+                    False,
+                    0,
+                    int(seed_admission.get("evicted_session_count", 0) or 0),
+                    0,
+                    str(seed_reason or "d-no-space"),
+                )
+            reserved_tokens = int(
+                seed_admission.get("required_tokens", required_extra_tokens)
+                or required_extra_tokens
+            )
+            _add_reserved_tokens(residency, server_url, reserved_tokens)
+            return (
+                True,
+                reserved_tokens,
+                int(seed_admission.get("evicted_session_count", 0) or 0),
+                0,
+                None,
+            )
+
     session_cache, max_total_num_tokens, reserved_decode_tokens = (
         await _fetch_decode_server_state(
             client=client,
@@ -1906,6 +1950,7 @@ async def _execute_request(
                 session=decode_session,
                 direct_sessions=direct_sessions,
                 treat_as_fresh_session=True,
+                admission_mode=config.kvcache_admission_mode,
             )
             if not admit_new_decode_session:
                 can_seed = False
@@ -2060,6 +2105,7 @@ async def _execute_request(
                 session=decode_session,
                 direct_sessions=direct_sessions,
                 treat_as_fresh_session=True,
+                admission_mode=config.kvcache_admission_mode,
             )
             if not admit_new_decode_session:
                 can_seed = False
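
The seed-path handling added to _reserve_decode_session_capacity can be
read as a pure function over D's response. The sketch below is a
restatement for illustration, not code from this commit: the function
name and the Reservation alias are hypothetical, and the
_add_reserved_tokens side effect is left out. The fourth tuple slot is
always 0 on this path; its meaning is not shown in the hunk.

```python
from typing import Any, Optional, Tuple

# (admitted, reserved_tokens, evicted_session_count, <always 0 here>, reason)
Reservation = Tuple[bool, int, int, int, Optional[str]]


def interpret_seed_admission(
    seed_admission: dict[str, Any], required_extra_tokens: int
) -> Optional[Reservation]:
    """Map D's seed-mode response to a reservation tuple.

    Returns None when the probe itself failed, signalling that the caller
    should fall through to the legacy router-state estimate.
    """
    reason = seed_admission.get("reason")
    if reason == "admission-query-failed":
        return None
    evicted = int(seed_admission.get("evicted_session_count", 0) or 0)
    if not bool(seed_admission.get("can_admit")):
        return (False, 0, evicted, 0, str(reason or "d-no-space"))
    # Prefer D's own figure; fall back to the local estimate if it is
    # missing or zero.
    reserved = int(
        seed_admission.get("required_tokens", required_extra_tokens)
        or required_extra_tokens
    )
    return (True, reserved, evicted, 0, None)


probe_failed = interpret_seed_admission({"reason": "admission-query-failed"}, 100)
no_space = interpret_seed_admission({"can_admit": False, "reason": None}, 100)
admitted = interpret_seed_admission(
    {"can_admit": True, "required_tokens": 0, "evicted_session_count": 2}, 100
)
```

Only a failed probe falls back to the local estimate; an explicit
rejection from D is final for this attempt.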

io_struct.py

@@ -1602,6 +1602,9 @@ class DirectAppendAdmissionReqInput(BaseReq):
     session_id: str
     uncached_input_tokens: int
     output_tokens: int
+    # "direct_append": existing behavior; require session resident on this D
+    # "seed": new admission for session not yet resident; do capacity check + LRU eviction
+    mode: str = "direct_append"


 @dataclass

scheduler.py

@@ -3508,6 +3508,9 @@ class Scheduler(
                 reason="unsupported",
             )

+        mode = getattr(recv_req, "mode", "direct_append") or "direct_append"
+        is_seed = mode == "seed"
+
         session_cache_status = self.session_controller.get_streaming_session_cache_status(
             recv_req.session_id
         )
@@ -3515,7 +3518,9 @@ class Scheduler(
         resident = bool(
             isinstance(target_session, dict) and target_session.get("resident")
         )
-        if not resident:
+        if not resident and not is_seed:
+            # direct_append requires the session already resident on this D.
+            # For seed we skip this check and let capacity decide.
             return DirectAppendAdmissionReqOutput(
                 can_admit=False,
                 resident=False,
@@ -3543,10 +3548,13 @@ class Scheduler(
             0, recv_req.output_tokens
         )
         available_tokens_before = int(self.token_to_kv_pool_allocator.available_size())
+        # Don't evict the session itself when it's already resident; for seed
+        # of a fresh session there is nothing to exclude.
+        exclude_ids = {recv_req.session_id} if resident else set()
         trim_result = self.maybe_trim_decode_session_cache(
             required_tokens=required_tokens,
             force=available_tokens_before < required_tokens,
-            exclude_session_ids={recv_req.session_id},
+            exclude_session_ids=exclude_ids,
         )
         available_tokens_after = int(self.token_to_kv_pool_allocator.available_size())
         decode_retracted_queue_reqs = len(self.disagg_decode_prealloc_queue.retracted_queue)