From c928c7db2309f0c94efd8188b824a24b82b3d21d Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Sat, 25 Apr 2026 17:29:15 +0000 Subject: [PATCH] Add transfer queue admission knobs --- src/agentic_pd_hybrid/benchmark.py | 14 +++++++ src/agentic_pd_hybrid/cli.py | 64 ++++++++++++++++++++++++++++++ src/agentic_pd_hybrid/replay.py | 24 +++++++++++ 3 files changed, 102 insertions(+) diff --git a/src/agentic_pd_hybrid/benchmark.py b/src/agentic_pd_hybrid/benchmark.py index 2c09296..978d62e 100644 --- a/src/agentic_pd_hybrid/benchmark.py +++ b/src/agentic_pd_hybrid/benchmark.py @@ -38,6 +38,8 @@ class BenchmarkConfig: kvcache_seed_only_multiturn_sessions: bool = False kvcache_prefill_backup_policy: str = "release-after-transfer" kvcache_seed_max_inflight_decode: int | None = 3 + kvcache_seed_max_decode_transfer_queue_reqs: int | None = None + kvcache_direct_max_decode_transfer_queue_reqs: int | None = None kvcache_prefill_priority_eviction: bool = False kvcache_prefill_direct_priority: int = -100 kvcache_prefill_normal_priority: int = 100 @@ -174,6 +176,12 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts: kvcache_seed_max_inflight_decode=( config.kvcache_seed_max_inflight_decode ), + kvcache_seed_max_decode_transfer_queue_reqs=( + config.kvcache_seed_max_decode_transfer_queue_reqs + ), + kvcache_direct_max_decode_transfer_queue_reqs=( + config.kvcache_direct_max_decode_transfer_queue_reqs + ), kvcache_prefill_priority_eviction=( config.kvcache_prefill_priority_eviction ), @@ -220,6 +228,12 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts: "kvcache_seed_max_inflight_decode": ( config.kvcache_seed_max_inflight_decode ), + "kvcache_seed_max_decode_transfer_queue_reqs": ( + config.kvcache_seed_max_decode_transfer_queue_reqs + ), + "kvcache_direct_max_decode_transfer_queue_reqs": ( + config.kvcache_direct_max_decode_transfer_queue_reqs + ), "kvcache_prefill_priority_eviction": ( config.kvcache_prefill_priority_eviction ), diff --git a/src/agentic_pd_hybrid/cli.py b/src/agentic_pd_hybrid/cli.py index 96629a4..90e87b8 100644 --- a/src/agentic_pd_hybrid/cli.py +++ b/src/agentic_pd_hybrid/cli.py @@ -197,6 +197,26 @@ def main() -> None: "Use a negative value to disable this filter." ), ) + replay.add_argument( + "--kvcache-seed-max-decode-transfer-queue-reqs", + type=int, + default=None, + help=( + "For kvcache-centric worker admission, skip seed/reseed when the " + "target D reports more transfer-queue requests than this value. " + "Use a negative value to disable this filter." + ), + ) + replay.add_argument( + "--kvcache-direct-max-decode-transfer-queue-reqs", + type=int, + default=None, + help=( + "For kvcache-centric worker admission, skip direct-to-D append when " + "the target D reports more transfer-queue requests than this value. " + "Use a negative value to disable this filter." + ), + ) replay.add_argument( "--kvcache-prefill-priority-eviction", action="store_true", @@ -389,6 +409,26 @@ def main() -> None: "Use a negative value to disable this filter." ), ) + benchmark.add_argument( + "--kvcache-seed-max-decode-transfer-queue-reqs", + type=int, + default=None, + help=( + "For kvcache-centric worker admission, skip seed/reseed when the " + "target D reports more transfer-queue requests than this value. " + "Use a negative value to disable this filter." + ), + ) + benchmark.add_argument( + "--kvcache-direct-max-decode-transfer-queue-reqs", + type=int, + default=None, + help=( + "For kvcache-centric worker admission, skip direct-to-D append when " + "the target D reports more transfer-queue requests than this value. " + "Use a negative value to disable this filter." + ), + ) benchmark.add_argument( "--kvcache-prefill-priority-eviction", action="store_true", @@ -456,6 +496,18 @@ def main() -> None: if args.kvcache_seed_max_inflight_decode < 0 else args.kvcache_seed_max_inflight_decode ), + kvcache_seed_max_decode_transfer_queue_reqs=( + None + if args.kvcache_seed_max_decode_transfer_queue_reqs is None + or args.kvcache_seed_max_decode_transfer_queue_reqs < 0 + else args.kvcache_seed_max_decode_transfer_queue_reqs + ), + kvcache_direct_max_decode_transfer_queue_reqs=( + None + if args.kvcache_direct_max_decode_transfer_queue_reqs is None + or args.kvcache_direct_max_decode_transfer_queue_reqs < 0 + else args.kvcache_direct_max_decode_transfer_queue_reqs + ), kvcache_prefill_priority_eviction=( args.kvcache_prefill_priority_eviction ), @@ -582,6 +634,18 @@ def main() -> None: if args.kvcache_seed_max_inflight_decode < 0 else args.kvcache_seed_max_inflight_decode ), + kvcache_seed_max_decode_transfer_queue_reqs=( + None + if args.kvcache_seed_max_decode_transfer_queue_reqs is None + or args.kvcache_seed_max_decode_transfer_queue_reqs < 0 + else args.kvcache_seed_max_decode_transfer_queue_reqs + ), + kvcache_direct_max_decode_transfer_queue_reqs=( + None + if args.kvcache_direct_max_decode_transfer_queue_reqs is None + or args.kvcache_direct_max_decode_transfer_queue_reqs < 0 + else args.kvcache_direct_max_decode_transfer_queue_reqs + ), kvcache_prefill_priority_eviction=( args.kvcache_prefill_priority_eviction ), diff --git a/src/agentic_pd_hybrid/replay.py b/src/agentic_pd_hybrid/replay.py index 7bf2d8a..801abfb 100644 --- a/src/agentic_pd_hybrid/replay.py +++ b/src/agentic_pd_hybrid/replay.py @@ -59,6 +59,8 @@ class ReplayConfig: "release-after-transfer" ) kvcache_seed_max_inflight_decode: int | None = 3 + kvcache_seed_max_decode_transfer_queue_reqs: int | None = None + kvcache_direct_max_decode_transfer_queue_reqs: int | None = None kvcache_prefill_priority_eviction: bool = False kvcache_prefill_direct_priority: int = -100 kvcache_prefill_normal_priority: int = 100 @@ -922,16 +924,29 @@ async def _fetch_decode_load_snapshot( def _decode_load_backpressure_reason( snapshot: DecodeLoadSnapshot | None, *, + config: ReplayConfig, routing_mode: Literal["direct", "seed"], ) -> str | None: if snapshot is None: return None if routing_mode == "direct": + if ( + config.kvcache_direct_max_decode_transfer_queue_reqs is not None + and snapshot.decode_transfer_queue_reqs + > config.kvcache_direct_max_decode_transfer_queue_reqs + ): + return "d-transfer-backpressure" if snapshot.decode_retracted_queue_reqs > 0 and snapshot.token_usage >= 0.99: return "d-retracted" if snapshot.token_usage >= 0.992: return "d-token-usage-critical" else: + if ( + config.kvcache_seed_max_decode_transfer_queue_reqs is not None + and snapshot.decode_transfer_queue_reqs + > config.kvcache_seed_max_decode_transfer_queue_reqs + ): + return "d-transfer-backpressure" if snapshot.decode_retracted_queue_reqs > 0: return "d-retracted" if snapshot.token_usage >= 0.985: @@ -949,6 +964,7 @@ def _is_decode_backpressure_reason(reason: str | None) -> bool: "d-retracted", "d-token-usage-critical", "d-prealloc-backpressure", + "d-transfer-backpressure", } @@ -1221,6 +1237,7 @@ async def _reserve_prefill_backup_capacity( async def _reserve_decode_session_capacity( *, client: httpx.AsyncClient, + config: ReplayConfig, request: TraceRequest, server_url: str, session: DirectSessionState, @@ -1233,6 +1250,7 @@ async def _reserve_decode_session_capacity( if admission_mode == "router": return await _reserve_decode_session_capacity_from_router_state( client=client, + config=config, request=request, server_url=server_url, session=session, @@ -1315,6 +1333,7 @@ async def _reserve_decode_session_capacity( residency.capacity_tokens[server_url] = load_snapshot.max_total_num_tokens backpressure_reason = _decode_load_backpressure_reason( load_snapshot, + config=config, routing_mode=routing_mode, ) if backpressure_reason is not None: @@ -1429,6 +1448,7 @@ async def _reserve_decode_session_capacity( async def _reserve_decode_session_capacity_from_router_state( *, client: httpx.AsyncClient, + config: ReplayConfig, request: TraceRequest, server_url: str, session: DirectSessionState, @@ -1836,6 +1856,7 @@ async def _execute_request( can_seed, reserved_tokens, _evicted, _p_backed, seed_reason = ( await _reserve_decode_session_capacity( client=client, + config=config, request=request, server_url=decode_url, session=decode_session, @@ -1911,6 +1932,7 @@ async def _execute_request( ) = ( await _reserve_decode_session_capacity( client=client, + config=config, request=request, server_url=decode_url, session=decode_session, @@ -1996,6 +2018,7 @@ async def _execute_request( ) = ( await _reserve_decode_session_capacity( client=client, + config=config, request=request, server_url=decode_url, session=decode_session, @@ -2084,6 +2107,7 @@ async def _execute_request( ) = ( await _reserve_decode_session_capacity( client=client, + config=config, request=request, server_url=decode_url, session=decode_session,