feat(kvc): honor admission backpressure hints + structural event logging

Replay-side changes paired with the SGLang admission hint:

- DecodeResidencyState gains pause_until_s; admission probe parses
  recommended_pause_ms and updates the per-D pause window.
- _wait_for_decode_pause is invoked at request entry points
  (_invoke_router, _invoke_session_direct) so requests stall before
  hitting a saturated D instead of timing out via mooncake.
- New CLI flags: --enable-backpressure (default off, baseline preserved),
  --backpressure-max-pause-s (cap on per-request sleep, default 2s).

Structural instrumentation written under <run_dir>/structural/:
- admission-events.jsonl: every admission probe (rtt_s, queue_depth,
  recommended_pause_ms, available_tokens_after, evicted_session_count)
- backpressure-events.jsonl: every actual pause sleep
- session-d-binding.jsonl: per-request policy decision

Used to validate the structural claims documented separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
kzlin
2026-05-06 21:29:46 +08:00
parent ca4b64c79a
commit c47adaf8e3
3 changed files with 239 additions and 14 deletions

View File

@@ -45,6 +45,8 @@ class BenchmarkConfig:
kvcache_prefill_normal_priority: int = 100
pool_poll_interval_s: float = 0.0
pool_poll_include_sessions: bool = True
enable_backpressure: bool = False
backpressure_max_pause_s: float = 2.0
sample_profile: str = "default"
min_initial_input_tokens: int | None = None
max_initial_input_tokens: int | None = None
@@ -194,6 +196,8 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts:
kvcache_prefill_normal_priority=config.kvcache_prefill_normal_priority,
pool_poll_interval_s=config.pool_poll_interval_s,
pool_poll_include_sessions=config.pool_poll_include_sessions,
enable_backpressure=config.enable_backpressure,
backpressure_max_pause_s=config.backpressure_max_pause_s,
)
if config.request_timeout_s is not None:
replay_config = replace(
@@ -252,6 +256,8 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts:
),
"pool_poll_interval_s": config.pool_poll_interval_s,
"pool_poll_include_sessions": config.pool_poll_include_sessions,
"enable_backpressure": config.enable_backpressure,
"backpressure_max_pause_s": config.backpressure_max_pause_s,
"sample_profile": config.sample_profile,
"min_initial_input_tokens": config.min_initial_input_tokens,
"max_initial_input_tokens": config.max_initial_input_tokens,

View File

@@ -245,6 +245,21 @@ def main() -> None:
"Disable per-session detail in the pool timeseries (smaller files)."
),
)
replay.add_argument(
"--enable-backpressure",
action="store_true",
help=(
"Honor recommended_pause_ms hints from D's admission endpoint. "
"When set, replay sleeps before issuing requests to a saturated D. "
"Default off — preserves baseline behavior."
),
)
replay.add_argument(
"--backpressure-max-pause-s",
type=float,
default=2.0,
help="Cap on per-request backpressure sleep, regardless of D hint.",
)
sample = subparsers.add_parser(
"sample-sessions",
@@ -473,6 +488,19 @@ def main() -> None:
"Disable per-session detail in the pool timeseries (smaller files)."
),
)
benchmark.add_argument(
"--enable-backpressure",
action="store_true",
help=(
"Honor recommended_pause_ms hints from D's admission endpoint."
),
)
benchmark.add_argument(
"--backpressure-max-pause-s",
type=float,
default=2.0,
help="Cap on per-request backpressure sleep, regardless of D hint.",
)
benchmark.add_argument(
"--sample-profile",
choices=["default", "small-append"],
@@ -556,6 +584,8 @@ def main() -> None:
kvcache_prefill_normal_priority=args.kvcache_prefill_normal_priority,
pool_poll_interval_s=args.pool_poll_interval_s,
pool_poll_include_sessions=not args.pool_poll_no_sessions,
enable_backpressure=args.enable_backpressure,
backpressure_max_pause_s=args.backpressure_max_pause_s,
)
results = asyncio.run(replay_trace(config))
print(
@@ -700,6 +730,8 @@ def main() -> None:
),
pool_poll_interval_s=args.pool_poll_interval_s,
pool_poll_include_sessions=not args.pool_poll_no_sessions,
enable_backpressure=args.enable_backpressure,
backpressure_max_pause_s=args.backpressure_max_pause_s,
sample_profile=args.sample_profile,
min_initial_input_tokens=args.min_initial_input_tokens,
max_initial_input_tokens=args.max_initial_input_tokens,

View File

@@ -31,6 +31,44 @@ KvCachePrefillBackupPolicy = Literal["release-after-transfer", "capacity-backup"
_ADMISSION_PROBE_TIMEOUT_S = 2.0
# --- Structural event logging (admission probes, backpressure pauses, ---
# --- session-D bindings). Module-level state keeps call-site diff small. ---
# Directory that receives the structural JSONL files; while it is None,
# _structural_emit returns without writing anything.
_STRUCTURAL_LOG_DIR: Path | None = None
# Held by _structural_emit around the handle lookup/open and the write + flush.
_STRUCTURAL_LOG_LOCK = asyncio.Lock()
# Cache of open append-mode handles keyed by log filename; filled lazily by
# _structural_emit and emptied by _structural_close.
_STRUCTURAL_LOG_FILES: dict[str, Any] = {}
# time.perf_counter() reading captured by _structural_init; the "t" field of
# every emitted event is an offset from this value.
_STRUCTURAL_RUN_START_S: float = 0.0
def _structural_init(log_dir: Path | None) -> None:
    """Point structural logging at *log_dir* and restart the run clock.

    Args:
        log_dir: Directory for the structural JSONL files. It is created
            (including parents) when missing. ``None` disables emission.

    Any handles still cached from an earlier initialisation are closed first.
    Without that, a run that never reached its ``_structural_close()`` call
    would leave handles behind that ``_structural_emit`` keeps reusing, so
    events of the new run would land in the previous run's files.
    """
    global _STRUCTURAL_LOG_DIR, _STRUCTURAL_RUN_START_S
    # Drop stale handles before switching directory (see docstring).
    _structural_close()
    _STRUCTURAL_LOG_DIR = log_dir
    _STRUCTURAL_RUN_START_S = time.perf_counter()
    if log_dir is not None:
        log_dir.mkdir(parents=True, exist_ok=True)
def _structural_close() -> None:
    """Close every cached structural log handle and empty the cache.

    Closing is best effort: a handle whose ``close()`` raises is skipped so
    the remaining handles are still closed.
    """
    open_handles = list(_STRUCTURAL_LOG_FILES.values())
    _STRUCTURAL_LOG_FILES.clear()
    for log_file in open_handles:
        try:
            log_file.close()
        except Exception:
            # Deliberately ignored: teardown must not fail the run.
            continue
async def _structural_emit(filename: str, event: dict[str, Any]) -> None:
    """Append *event* as one JSON line to ``<structural dir>/<filename>``.

    Does nothing while no structural log directory is configured. The
    written record carries a ``"t"`` key holding the seconds elapsed since
    ``_structural_init`` (rounded to 4 decimals); a ``"t"`` key already
    present in *event* takes precedence. The file is opened in append mode
    on first use, cached for later calls, and flushed after every record.

    Args:
        filename: Name of the JSONL file inside the structural log directory.
        event: JSON-serialisable payload for this record.
    """
    if _STRUCTURAL_LOG_DIR is None:
        return
    elapsed_s = round(time.perf_counter() - _STRUCTURAL_RUN_START_S, 4)
    record = {"t": elapsed_s}
    record.update(event)
    async with _STRUCTURAL_LOG_LOCK:
        sink = _STRUCTURAL_LOG_FILES.get(filename)
        if sink is None:
            sink = (_STRUCTURAL_LOG_DIR / filename).open("a", encoding="utf-8")
            _STRUCTURAL_LOG_FILES[filename] = sink
        line = json.dumps(record, sort_keys=True) + "\n"
        sink.write(line)
        sink.flush()
@dataclass(frozen=True)
class ReplayConfig:
trace_path: Path
@@ -66,6 +104,9 @@ class ReplayConfig:
kvcache_prefill_normal_priority: int = 100
pool_poll_interval_s: float = 0.0
pool_poll_include_sessions: bool = True
enable_backpressure: bool = False
backpressure_max_pause_s: float = 2.0
structural_log_dir: Path | None = None
@dataclass
@@ -97,6 +138,8 @@ class DecodeResidencyState:
prefill_reserved_tokens_by_server: dict[str, int] = field(default_factory=dict)
decode_evictions_prefill_backed: int = 0
decode_evictions_without_prefill_backup: int = 0
# Backpressure: per-D timestamp until which new requests should pause.
pause_until_s: dict[str, float] = field(default_factory=dict)
@dataclass(frozen=True)
@@ -132,6 +175,10 @@ class ExecutionResult:
async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
structural_dir = config.structural_log_dir
if structural_dir is None and config.output_path is not None:
structural_dir = config.output_path.parent / "structural"
_structural_init(structural_dir)
requests = load_trace(config.trace_path, request_limit=config.request_limit)
if config.kvcache_seed_only_multiturn_sessions:
session_turns = Counter(request.session_id for request in requests)
@@ -238,6 +285,7 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
trace_path=config.trace_path,
router_url=config.router_url,
)
_structural_close()
return results
@@ -261,6 +309,21 @@ async def _run_request(
async with state_lock:
decision = policy.select(request, topology=config.topology, state=state)
await _structural_emit(
"session-d-binding.jsonl",
{
"session_id": request.session_id,
"request_id": request.request_id,
"turn_id": request.turn_id,
"decode_worker_index": decision.decode_worker_index,
"decode_worker_id": decision.decode_worker_id,
"prefill_worker_id": decision.prefill_worker_id,
"observed_overlap_blocks": decision.observed_overlap_blocks,
"kv_transfer_blocks": decision.kv_transfer_blocks,
"inflight_decode_load": decision.inflight_decode_load,
},
)
try:
execution = await _execute_request(
client=client,
@@ -319,7 +382,17 @@ async def _invoke_router(
session_id: str | None = None,
prefill_request_priority: int | None = None,
decode_request_priority: int | None = None,
decode_residency: "DecodeResidencyState | None" = None,
) -> GenerateResult:
if decode_residency is not None and config.enable_backpressure:
decode_url = config.topology.decode_workers[decode_worker_index].url
await _wait_for_decode_pause(
config=config,
residency=decode_residency,
server_url=decode_url,
request_id=request.request_id,
session_id=session_id,
)
headers = _build_headers(
request=request,
header_mode=config.header_mode,
@@ -812,7 +885,12 @@ async def _query_decode_direct_admission(
uncached_input_tokens: int,
output_tokens: int,
mode: str = "direct_append",
config: "ReplayConfig | None" = None,
residency: "DecodeResidencyState | None" = None,
request_id: str | None = None,
turn_id: int | None = None,
) -> dict[str, Any]:
started = time.perf_counter()
try:
response = await client.post(
f"{server_url.rstrip('/')}/session_cache/admit_direct_append",
@@ -826,20 +904,97 @@ async def _query_decode_direct_admission(
)
response.raise_for_status()
payload = response.json()
if isinstance(payload, dict):
return payload
except Exception:
pass
return {
"can_admit": False,
"resident": False,
"reason": "admission-query-failed",
"required_tokens": 0,
"available_tokens_before": 0,
"available_tokens_after": 0,
"evicted_session_count": 0,
"freed_tokens": 0,
}
if not isinstance(payload, dict):
payload = None
except Exception as exc:
payload = None
_last_exc_msg = type(exc).__name__
else:
_last_exc_msg = None
if payload is None:
payload = {
"can_admit": False,
"resident": False,
"reason": "admission-query-failed",
"required_tokens": 0,
"available_tokens_before": 0,
"available_tokens_after": 0,
"evicted_session_count": 0,
"freed_tokens": 0,
}
rtt_s = time.perf_counter() - started
pause_ms = int(payload.get("recommended_pause_ms", 0) or 0)
# Update per-D pause window when backpressure is enabled.
if (
config is not None
and residency is not None
and config.enable_backpressure
and pause_ms > 0
):
max_pause_s = max(0.0, config.backpressure_max_pause_s)
applied_pause_s = min(pause_ms / 1000.0, max_pause_s)
new_until = time.perf_counter() + applied_pause_s
prev = residency.pause_until_s.get(server_url, 0.0)
if new_until > prev:
residency.pause_until_s[server_url] = new_until
# Always emit admission event for analysis (even if backpressure disabled).
await _structural_emit(
"admission-events.jsonl",
{
"server_url": server_url,
"session_id": session_id,
"request_id": request_id,
"turn_id": turn_id,
"mode": mode,
"rtt_s": round(rtt_s, 4),
"can_admit": bool(payload.get("can_admit")),
"resident": bool(payload.get("resident")),
"reason": payload.get("reason"),
"queue_depth": int(payload.get("decode_transfer_queue_reqs", 0) or 0),
"retracted_depth": int(payload.get("decode_retracted_queue_reqs", 0) or 0),
"available_tokens_after": int(payload.get("available_tokens_after", 0) or 0),
"token_usage": float(payload.get("token_usage", 0.0) or 0.0),
"evicted_session_count": int(payload.get("evicted_session_count", 0) or 0),
"recommended_pause_ms": pause_ms,
"uncached_input_tokens": int(uncached_input_tokens),
"output_tokens": int(output_tokens),
},
)
return payload
async def _wait_for_decode_pause(
*,
config: "ReplayConfig",
residency: "DecodeResidencyState",
server_url: str,
request_id: str | None = None,
session_id: str | None = None,
) -> None:
if not config.enable_backpressure:
return
until = residency.pause_until_s.get(server_url, 0.0)
if until <= 0:
return
now = time.perf_counter()
if now >= until:
return
sleep_s = min(until - now, config.backpressure_max_pause_s)
await _structural_emit(
"backpressure-events.jsonl",
{
"server_url": server_url,
"session_id": session_id,
"request_id": request_id,
"sleep_s": round(sleep_s, 4),
"until_offset_s": round(until - _STRUCTURAL_RUN_START_S, 4),
},
)
await asyncio.sleep(sleep_s)
async def _discover_decode_residency(
@@ -1500,6 +1655,10 @@ async def _reserve_decode_session_capacity(
uncached_input_tokens=max(0, request.input_length - current_tokens),
output_tokens=request.output_length,
mode="direct_append",
config=config,
residency=residency,
request_id=request.request_id,
turn_id=request.turn_id,
)
if not bool(admission.get("resident")):
return False, 0, 0, 0, str(admission.get("reason") or "d-session-not-resident")
@@ -1535,6 +1694,10 @@ async def _reserve_decode_session_capacity(
uncached_input_tokens=max(0, request.input_length - current_tokens),
output_tokens=request.output_length,
mode="seed",
config=config,
residency=residency,
request_id=request.request_id,
turn_id=request.turn_id,
)
seed_reason = seed_admission.get("reason")
if seed_reason != "admission-query-failed":
@@ -1837,6 +2000,7 @@ async def _invoke_plain_router(
config: ReplayConfig,
decision,
execution_mode: str,
decode_residency: "DecodeResidencyState | None" = None,
) -> ExecutionResult:
prefill_priority = _prefill_priority_for_router_request(
config=config,
@@ -1848,6 +2012,7 @@ async def _invoke_plain_router(
config=config,
decode_worker_index=decision.decode_worker_index,
prefill_request_priority=prefill_priority,
decode_residency=decode_residency,
)
return ExecutionResult(
execution_mode=execution_mode,
@@ -1941,6 +2106,7 @@ async def _invoke_kvcache_seeded_router(
decode_worker_index=decision.decode_worker_index,
session_id=request.session_id,
prefill_request_priority=prefill_priority,
decode_residency=decode_residency,
)
except Exception:
async with direct_session_lock:
@@ -2032,6 +2198,7 @@ async def _execute_request(
config=config,
decision=decision,
execution_mode="pd-disaggregation-router",
decode_residency=decode_residency,
)
if config.mechanism_name == "pd-colo":
@@ -2043,6 +2210,7 @@ async def _execute_request(
config=config,
decision=decision,
execution_mode="dp-colo-router",
decode_residency=decode_residency,
)
return replace(result, actual_kv_transfer_blocks=0)
@@ -2101,6 +2269,7 @@ async def _execute_request(
config=config,
decision=decision,
execution_mode=f"pd-router-turn1-{seed_filter_reason}",
decode_residency=decode_residency,
)
async with direct_session_lock:
admit_new_decode_session = _should_admit_new_decode_session(
@@ -2138,6 +2307,7 @@ async def _execute_request(
config=config,
decision=decision,
execution_mode="pd-router-turn1-session-cap",
decode_residency=decode_residency,
)
if can_seed:
return await _invoke_kvcache_seeded_router(
@@ -2163,6 +2333,7 @@ async def _execute_request(
if seed_reason is not None and seed_reason != "d-no-space"
else "pd-router-turn1-no-d-capacity"
),
decode_residency=decode_residency,
)
if (
@@ -2234,6 +2405,7 @@ async def _execute_request(
config=config,
decision=decision,
execution_mode="pd-router-fallback-stale-d-session",
decode_residency=decode_residency,
)
if _is_decode_backpressure_reason(direct_reason):
return await _invoke_plain_router(
@@ -2242,6 +2414,7 @@ async def _execute_request(
config=config,
decision=decision,
execution_mode="pd-router-fallback-d-backpressure",
decode_residency=decode_residency,
)
seed_filter_reason = _seed_filter_reason(
@@ -2256,6 +2429,7 @@ async def _execute_request(
config=config,
decision=decision,
execution_mode=f"pd-router-fallback-{seed_filter_reason}",
decode_residency=decode_residency,
)
async with direct_session_lock:
admit_new_decode_session = _should_admit_new_decode_session(
@@ -2301,6 +2475,7 @@ async def _execute_request(
config=config,
decision=decision,
execution_mode="pd-router-fallback-session-cap",
decode_residency=decode_residency,
)
if can_seed:
return await _invoke_kvcache_seeded_router(
@@ -2332,6 +2507,7 @@ async def _execute_request(
if _is_decode_backpressure_reason(seed_reason)
else "pd-router-fallback-no-d-capacity"
),
decode_residency=decode_residency,
)
seed_filter_reason = _seed_filter_reason(
@@ -2346,6 +2522,7 @@ async def _execute_request(
config=config,
decision=decision,
execution_mode=f"pd-router-fallback-large-append-{seed_filter_reason}",
decode_residency=decode_residency,
)
async with direct_session_lock:
admit_new_decode_session = _should_admit_new_decode_session(
@@ -2390,6 +2567,7 @@ async def _execute_request(
config=config,
decision=decision,
execution_mode="pd-router-fallback-large-append-session-cap",
decode_residency=decode_residency,
)
if can_seed:
return await _invoke_kvcache_seeded_router(
@@ -2421,6 +2599,7 @@ async def _execute_request(
if _is_decode_backpressure_reason(seed_reason)
else "pd-router-fallback-large-append"
),
decode_residency=decode_residency,
)
raise ValueError(f"Unsupported mechanism: {config.mechanism_name}")
@@ -2466,6 +2645,14 @@ async def _invoke_session_direct(
reserved_tokens: int = 0,
direct_session_lock: asyncio.Lock | None = None,
) -> ExecutionResult:
if decode_residency is not None and config.enable_backpressure:
await _wait_for_decode_pause(
config=config,
residency=decode_residency,
server_url=session.server_url,
request_id=request.request_id,
session_id=session.session_id,
)
_prompt, effective_input_length, session_reused, session_reset = _build_direct_prompt(
request=request,
session=session,