feat(kvc): Option D - delegate seed/reseed admission to D worker
v4 (cap=16) saw 35% session-cap fallback because the local soft_cap,
min(16, usable_capacity_tokens / target_tokens), evaluates to 1-2 for
large agentic inputs (target_tokens = input + output is 50-100K).
The cap was hit not because D was full but because replay's heuristic
underestimated capacity.
This change makes worker admission_mode authoritative for ALL paths:
SGLang side:
- io_struct.py: DirectAppendAdmissionReqInput gains a `mode` field
("direct_append" | "seed", default "direct_append" preserves prior
behavior).
- scheduler.py:admit_direct_append: when mode == "seed", skip the
resident-on-D requirement and run the same capacity check + LRU
eviction (maybe_trim_decode_session_cache) that direct_append uses.
This lets D atomically decide if a new session can be admitted based
on actual token_to_kv_pool_allocator state.
Replay side (replay.py):
- _query_decode_direct_admission gains a `mode` parameter.
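- _reserve_decode_session_capacity: in worker admission_mode, the
seed/reseed branch now queries D with mode="seed" and trusts the
result, instead of estimating capacity from the residency snapshot.
Only when the query itself fails (reason "admission-query-failed")
does it fall through to the legacy router-state logic.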
- _should_admit_new_decode_session: in worker mode, skip the local
soft_cap pre-check and let D decide. Same-D session fast-path is
preserved.
Effects:
- Local hardcoded cap of 16 is bypassed under worker mode; D's real
KV pool size is the only constraint.
- LRU eviction runs in D's process atomically with admission, so
starvation (the v3 bimodal "lucky vs starved sessions" pattern)
should resolve.
scripts/sweep_tp1_v5_optD.sh added to run the same 1P7D / 2P6D
configs as v4 with the new admission path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
114
scripts/sweep_tp1_v5_optD.sh
Executable file
114
scripts/sweep_tp1_v5_optD.sh
Executable file
@@ -0,0 +1,114 @@
|
||||
#!/bin/bash
# TP1 v5 sweep — Option D: D-side admission for seed/reseed.
#
# v4 (cap=16) still saw 35% session-cap fallback because the local soft_cap
# evaluates min(16, usable_capacity_tokens / target_tokens) and target_tokens
# (= input + output) is 50-100K in agentic workloads, giving cap = 1-2.
#
# v5 makes worker admission_mode authoritative for ALL admission decisions
# (direct_append AND seed/reseed). Replay calls D's
# /session_cache/admit_direct_append with mode={direct_append|seed} and
# defers to D's KV pool availability + LRU eviction. Replay's local
# _decode_session_soft_cap is bypassed entirely under worker mode.
set -euo pipefail

# Work from the parent of this script's directory so that the relative
# paths below (outputs/, .venv/, src/, third_party/) resolve.
cd "$(dirname "$0")/.."

MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507
TRACE=outputs/qwen35-swebench-50sess.jsonl
OUTPUT=outputs/qwen3-30b-tp1-v5-optD
VENV_PYTHON=.venv/bin/python
# Every log() line and every experiment summary is appended to this file.
RESULTS_FILE="$OUTPUT/sweep_results.txt"

# Quoted so the path survives word splitting if OUTPUT is ever changed to
# something containing whitespace or glob characters.
mkdir -p "$OUTPUT"
|
||||
|
||||
# log MESSAGE...
# Print a timestamped line to stdout and append the same line to
# $RESULTS_FILE. All arguments are joined with spaces ("$*").
log() {
    # RESULTS_FILE is quoted so a path with spaces or glob characters is
    # passed to tee as a single argument.
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$RESULTS_FILE"
}
|
||||
|
||||
# save_result LABEL RUN_DIR
# Record the outcome of one experiment.
#   LABEL    prefix for the copied artifacts under $OUTPUT
#   RUN_DIR  directory expected to contain request-metrics.jsonl and
#            request-metrics.jsonl.summary.json
# When the summary exists it is appended to $RESULTS_FILE and both files are
# copied to $OUTPUT/<LABEL>_summary.json and $OUTPUT/<LABEL>_metrics.jsonl.
# Otherwise a warning is logged and nothing is copied.
save_result() {
    local label=$1
    local run_dir=$2
    local summary="$run_dir/request-metrics.jsonl.summary.json"

    log "=== $label COMPLETED ==="

    # An empty RUN_DIR would make the check below probe the filesystem root;
    # report it explicitly instead.
    if [ -z "$run_dir" ]; then
        log "WARNING: No run directory found for $label"
        return 0
    fi

    if [ -f "$summary" ]; then
        log "Summary:"
        cat "$summary" >> "$RESULTS_FILE"
        echo "" >> "$RESULTS_FILE"
        cp "$summary" "$OUTPUT/${label}_summary.json"
        cp "$run_dir/request-metrics.jsonl" "$OUTPUT/${label}_metrics.jsonl"
        log "Saved to $OUTPUT/${label}_summary.json + ${label}_metrics.jsonl"
    else
        log "WARNING: No summary file found in $run_dir"
    fi
}
|
||||
|
||||
log "Starting TP1 v5 sweep (Option D: D-side seed admission)"
log "Model: $MODEL"
log "Trace: $TRACE (4449 requests, 52 sessions)"
log "Key change: worker admission_mode now drives seed/reseed via D's admit endpoint"

########################################
# Experiment 1: 1P + 7D KVC kv-aware Option D
########################################
log ""
log "=== [EXP1] 1P7D KVC kv-aware Option D ==="
PYTHONPATH=src:third_party/sglang/python \
"$VENV_PYTHON" -m agentic_pd_hybrid.cli benchmark-live \
    --trace "$TRACE" \
    --output-root "$OUTPUT" \
    --mechanism kvcache-centric \
    --policy kv-aware \
    --model-path "$MODEL" \
    --prefill-workers 1 --decode-workers 7 \
    --prefill-tp-size 1 --decode-tp-size 1 \
    --prefill-gpu-ids 0 --decode-gpu-ids 1,2,3,4,5,6,7 \
    --transfer-backend mooncake \
    --gpu-budget 8 \
    --time-scale 10 \
    --session-sample-rate 1.0 \
    --target-duration-s 100000 \
    --concurrency-limit 32 \
    --timeout-s 900 \
    --request-timeout-s 300 \
    --kvcache-admission-mode worker \
    --kvcache-seed-min-turn-id 1 \
    --kvcache-seed-max-inflight-decode -1 \
    --kvcache-prefill-backup-policy release-after-transfer \
    --kvcache-prefill-priority-eviction

# Pick the most recently modified run directory under $OUTPUT.
# `|| true` matters: with `set -euo pipefail`, an unmatched glob makes `ls`
# fail, the pipeline fail, and the assignment abort the whole script before
# save_result can log its "no summary" warning.
EXP1_DIR=$(ls -td "$OUTPUT"/kvcache-centric-*/ 2>/dev/null | head -1 || true)
save_result "exp1_1p7d_kvc_optD" "$EXP1_DIR"
|
||||
|
||||
########################################
# Experiment 2: 2P + 6D KVC kv-aware Option D
########################################
log ""
log "=== [EXP2] 2P6D KVC kv-aware Option D ==="
PYTHONPATH=src:third_party/sglang/python \
"$VENV_PYTHON" -m agentic_pd_hybrid.cli benchmark-live \
    --trace "$TRACE" \
    --output-root "$OUTPUT" \
    --mechanism kvcache-centric \
    --policy kv-aware \
    --model-path "$MODEL" \
    --prefill-workers 2 --decode-workers 6 \
    --prefill-tp-size 1 --decode-tp-size 1 \
    --prefill-gpu-ids 0,1 --decode-gpu-ids 2,3,4,5,6,7 \
    --transfer-backend mooncake \
    --gpu-budget 8 \
    --time-scale 10 \
    --session-sample-rate 1.0 \
    --target-duration-s 100000 \
    --concurrency-limit 32 \
    --timeout-s 900 \
    --request-timeout-s 300 \
    --kvcache-admission-mode worker \
    --kvcache-seed-min-turn-id 1 \
    --kvcache-seed-max-inflight-decode -1 \
    --kvcache-prefill-backup-policy release-after-transfer \
    --kvcache-prefill-priority-eviction

# Pick the most recently modified run directory under $OUTPUT.
# `|| true` keeps `set -euo pipefail` from aborting the script when the glob
# matches nothing, so save_result can still log its warning.
EXP2_DIR=$(ls -td "$OUTPUT"/kvcache-centric-*/ 2>/dev/null | head -1 || true)

# If the newest directory is still the one used for experiment 1, this run
# produced no new output; do not save experiment 1's results under the
# experiment 2 label.
if [ -n "$EXP2_DIR" ] && [ "$EXP2_DIR" = "${EXP1_DIR:-}" ]; then
    log "WARNING: EXP2 produced no new run directory; newest is still $EXP2_DIR"
    EXP2_DIR=""
fi
save_result "exp2_2p6d_kvc_optD" "$EXP2_DIR"

log ""
log "=== ALL TP1 V5 SWEEP EXPERIMENTS DONE ==="
|
||||
@@ -651,6 +651,7 @@ async def _query_decode_direct_admission(
|
||||
session_id: str,
|
||||
uncached_input_tokens: int,
|
||||
output_tokens: int,
|
||||
mode: str = "direct_append",
|
||||
) -> dict[str, Any]:
|
||||
try:
|
||||
response = await client.post(
|
||||
@@ -659,6 +660,7 @@ async def _query_decode_direct_admission(
|
||||
"session_id": session_id,
|
||||
"uncached_input_tokens": max(0, uncached_input_tokens),
|
||||
"output_tokens": max(0, output_tokens),
|
||||
"mode": mode,
|
||||
},
|
||||
timeout=_ADMISSION_PROBE_TIMEOUT_S,
|
||||
)
|
||||
@@ -913,6 +915,7 @@ def _should_admit_new_decode_session(
|
||||
session: DirectSessionState,
|
||||
direct_sessions: dict[str, DirectSessionState],
|
||||
treat_as_fresh_session: bool,
|
||||
admission_mode: KvCacheAdmissionMode = "router",
|
||||
) -> bool:
|
||||
if (
|
||||
not treat_as_fresh_session
|
||||
@@ -920,6 +923,11 @@ def _should_admit_new_decode_session(
|
||||
and session.server_url == server_url
|
||||
):
|
||||
return True
|
||||
if admission_mode == "worker":
|
||||
# Defer the capacity decision to D's admit_direct_append (mode=seed),
|
||||
# which checks real KV pool availability and runs LRU eviction. The
|
||||
# local soft cap is router-mode only.
|
||||
return True
|
||||
open_sessions = sum(
|
||||
1
|
||||
for candidate in direct_sessions.values()
|
||||
@@ -1331,6 +1339,7 @@ async def _reserve_decode_session_capacity(
|
||||
session_id=session.session_id,
|
||||
uncached_input_tokens=max(0, request.input_length - current_tokens),
|
||||
output_tokens=request.output_length,
|
||||
mode="direct_append",
|
||||
)
|
||||
if not bool(admission.get("resident")):
|
||||
return False, 0, 0, 0, str(admission.get("reason") or "d-session-not-resident")
|
||||
@@ -1355,6 +1364,41 @@ async def _reserve_decode_session_capacity(
|
||||
None,
|
||||
)
|
||||
|
||||
# Seed / reseed path: ask D itself via the seed-mode admission endpoint
|
||||
# instead of estimating capacity from a stale router-state snapshot. D
|
||||
# will run LRU eviction internally to make room. Falls through to the
|
||||
# legacy router-state logic below if the endpoint is unavailable.
|
||||
seed_admission = await _query_decode_direct_admission(
|
||||
client=client,
|
||||
server_url=server_url,
|
||||
session_id=session.session_id,
|
||||
uncached_input_tokens=max(0, request.input_length - current_tokens),
|
||||
output_tokens=request.output_length,
|
||||
mode="seed",
|
||||
)
|
||||
seed_reason = seed_admission.get("reason")
|
||||
if seed_reason != "admission-query-failed":
|
||||
if not bool(seed_admission.get("can_admit")):
|
||||
return (
|
||||
False,
|
||||
0,
|
||||
int(seed_admission.get("evicted_session_count", 0) or 0),
|
||||
0,
|
||||
str(seed_reason or "d-no-space"),
|
||||
)
|
||||
reserved_tokens = int(
|
||||
seed_admission.get("required_tokens", required_extra_tokens)
|
||||
or required_extra_tokens
|
||||
)
|
||||
_add_reserved_tokens(residency, server_url, reserved_tokens)
|
||||
return (
|
||||
True,
|
||||
reserved_tokens,
|
||||
int(seed_admission.get("evicted_session_count", 0) or 0),
|
||||
0,
|
||||
None,
|
||||
)
|
||||
|
||||
session_cache, max_total_num_tokens, reserved_decode_tokens = (
|
||||
await _fetch_decode_server_state(
|
||||
client=client,
|
||||
@@ -1906,6 +1950,7 @@ async def _execute_request(
|
||||
session=decode_session,
|
||||
direct_sessions=direct_sessions,
|
||||
treat_as_fresh_session=True,
|
||||
admission_mode=config.kvcache_admission_mode,
|
||||
)
|
||||
if not admit_new_decode_session:
|
||||
can_seed = False
|
||||
@@ -2060,6 +2105,7 @@ async def _execute_request(
|
||||
session=decode_session,
|
||||
direct_sessions=direct_sessions,
|
||||
treat_as_fresh_session=True,
|
||||
admission_mode=config.kvcache_admission_mode,
|
||||
)
|
||||
if not admit_new_decode_session:
|
||||
can_seed = False
|
||||
|
||||
@@ -1602,6 +1602,9 @@ class DirectAppendAdmissionReqInput(BaseReq):
|
||||
session_id: str
|
||||
uncached_input_tokens: int
|
||||
output_tokens: int
|
||||
# "direct_append": existing behavior — require session resident on this D
|
||||
# "seed": new admission for session not yet resident; do capacity check + LRU eviction
|
||||
mode: str = "direct_append"
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -3508,6 +3508,9 @@ class Scheduler(
|
||||
reason="unsupported",
|
||||
)
|
||||
|
||||
mode = getattr(recv_req, "mode", "direct_append") or "direct_append"
|
||||
is_seed = mode == "seed"
|
||||
|
||||
session_cache_status = self.session_controller.get_streaming_session_cache_status(
|
||||
recv_req.session_id
|
||||
)
|
||||
@@ -3515,7 +3518,9 @@ class Scheduler(
|
||||
resident = bool(
|
||||
isinstance(target_session, dict) and target_session.get("resident")
|
||||
)
|
||||
if not resident:
|
||||
if not resident and not is_seed:
|
||||
# direct_append requires the session already resident on this D.
|
||||
# For seed we skip this check and let capacity decide.
|
||||
return DirectAppendAdmissionReqOutput(
|
||||
can_admit=False,
|
||||
resident=False,
|
||||
@@ -3543,10 +3548,13 @@ class Scheduler(
|
||||
0, recv_req.output_tokens
|
||||
)
|
||||
available_tokens_before = int(self.token_to_kv_pool_allocator.available_size())
|
||||
# Don't evict the session itself when it's already resident; for seed
|
||||
# of a fresh session there is nothing to exclude.
|
||||
exclude_ids = {recv_req.session_id} if resident else set()
|
||||
trim_result = self.maybe_trim_decode_session_cache(
|
||||
required_tokens=required_tokens,
|
||||
force=available_tokens_before < required_tokens,
|
||||
exclude_session_ids={recv_req.session_id},
|
||||
exclude_session_ids=exclude_ids,
|
||||
)
|
||||
available_tokens_after = int(self.token_to_kv_pool_allocator.available_size())
|
||||
decode_retracted_queue_reqs = len(self.disagg_decode_prealloc_queue.retracted_queue)
|
||||
|
||||
Reference in New Issue
Block a user