From c9d350b37217294000af3ee41dbcdfc9481bc08a Mon Sep 17 00:00:00 2001 From: kzlin Date: Tue, 28 Apr 2026 21:10:41 +0800 Subject: [PATCH] docs: KVC v1-v4 debug journey + raise session soft_cap to 16 Document the iterative debugging from v1 (broken KVC) through v4 (routing fixed + session cap raised), with code-level analysis of the two main bugs encountered: 1. v2 root cause (mis-diagnosed previously as `allow_local_prefill`): `--policy default` for KVC mechanism caused replay's round-robin policy and the PD router's round-robin to diverge, sending requests with `session_params` to a D worker that did not have the session open. Resulted in 56-61% truncation with finish_reason "session id X does not exist". Fix: use `--policy kv-aware` (sweep_tp1_v3_kvaware.sh) so replay emits `x-smg-target-worker` and PD router uses consistent_hashing. 2. v3 new bottleneck: `pd-router-fallback-large-append-session-cap` dominated 52-65% of requests. Root cause was hardcoded `min(4, ...)` in `_decode_session_soft_cap`. With 7 D workers x 4 sessions = 28 slots for 52 trace sessions, ~24 sessions starved permanently (bimodal direct-to-D rate of 0% or 99%). Fix: raise the cap to 16 (replay.py). Also includes the v3 finding that direct-to-d-session path P50=0.495s and TTFT P50=0.043s already beats the 8-way DP baseline (0.65s/0.093s) - the KVC core mechanism works when fallback paths are avoided. Files: - docs/KVC_DEBUG_JOURNEY_V1_TO_V4.md: full journey + code location index - docs/SWEBENCH_EXPERIMENT_{PROGRESS,RESULTS}.md: prior session notes - scripts/sweep_tp1_v{2,3,4}*.sh: experiment driver scripts - src/agentic_pd_hybrid/replay.py: cap 4 -> 16, audit fields - src/agentic_pd_hybrid/pd_router.py: strip session_params from prefill - src/agentic_pd_hybrid/metrics.py: truncated_request_count Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/KVC_DEBUG_JOURNEY_V1_TO_V4.md | 216 ++++++++++++++++++ docs/SWEBENCH_EXPERIMENT_PROGRESS.md | 95 ++++++++ docs/SWEBENCH_EXPERIMENT_RESULTS.md | 121 ++++++++++ scripts/convert_audit_to_trace.py | 110 +++++++++ scripts/run_all_experiments.sh | 73 ++++++ scripts/run_exp_a_pd_disagg.sh | 24 ++ scripts/run_exp_b1_dp_colo_rr.sh | 23 ++ scripts/run_exp_b2_dp_colo_cache_aware.sh | 23 ++ scripts/run_exp_b_pd_colo.sh | 24 ++ scripts/run_exp_c_kvcache_centric.sh | 28 +++ scripts/smoke_test.sh | 30 +++ scripts/sweep_kvc_qwen3_30b.sh | 60 +++++ scripts/sweep_tp1_configs.sh | 133 +++++++++++ scripts/sweep_tp1_v2_fixed.sh | 131 +++++++++++ scripts/sweep_tp1_v3_kvaware.sh | 108 +++++++++ scripts/sweep_tp1_v4_cap16.sh | 108 +++++++++ src/agentic_pd_hybrid/benchmark.py | 9 +- src/agentic_pd_hybrid/cli.py | 9 +- src/agentic_pd_hybrid/launcher.py | 70 ++++-- src/agentic_pd_hybrid/metrics.py | 20 ++ src/agentic_pd_hybrid/pd_router.py | 193 ++++++++++++++-- src/agentic_pd_hybrid/replay.py | 118 +++++++--- src/agentic_pd_hybrid/stack.py | 5 + .../mooncake_transfer_engine.py | 3 +- 24 files changed, 1672 insertions(+), 62 deletions(-) create mode 100644 docs/KVC_DEBUG_JOURNEY_V1_TO_V4.md create mode 100644 docs/SWEBENCH_EXPERIMENT_PROGRESS.md create mode 100644 docs/SWEBENCH_EXPERIMENT_RESULTS.md create mode 100644 scripts/convert_audit_to_trace.py create mode 100755 scripts/run_all_experiments.sh create mode 100755 scripts/run_exp_a_pd_disagg.sh create mode 100755 scripts/run_exp_b1_dp_colo_rr.sh create mode 100755 scripts/run_exp_b2_dp_colo_cache_aware.sh create mode 100755 scripts/run_exp_b_pd_colo.sh create mode 100755 scripts/run_exp_c_kvcache_centric.sh create mode 100755 scripts/smoke_test.sh create mode 100755 scripts/sweep_kvc_qwen3_30b.sh create mode 100755 scripts/sweep_tp1_configs.sh create mode 100755 scripts/sweep_tp1_v2_fixed.sh create mode 100755 scripts/sweep_tp1_v3_kvaware.sh create mode 100755 scripts/sweep_tp1_v4_cap16.sh diff --git a/docs/KVC_DEBUG_JOURNEY_V1_TO_V4.md b/docs/KVC_DEBUG_JOURNEY_V1_TO_V4.md new file mode 100644 index 0000000..ff19e0c --- /dev/null +++ b/docs/KVC_DEBUG_JOURNEY_V1_TO_V4.md @@ -0,0 +1,216 @@ +# KVC 实验踩坑记录与代码 Bug 分析(v1 → v4) + +记录从 v1 到 v4 KVC 实验的踩坑过程、错误诊断、以及最终定位的代码 bug。 +模型: Qwen3-30B-A3B (TP1),硬件: 单节点 8×H100 80GB。 +Trace: `qwen35-swebench-50sess.jsonl`(4449 请求,52 sessions)。 + +## TL;DR + +| 版本 | 关键变化 | 截断率 | direct-to-D 占比 | P50 | 主要瓶颈 | +|------|----------|:---:|:---:|:---:|----------| +| v1 (smoke / 早期) | mechanism 跑通 | - | - | - | - | +| v2 | KVC + `--policy default` | **56.8% / 61.4%** | <0.1% | 0.08s* | Routing 错位(默认策略) | +| v3 | KVC + `--policy kv-aware` | **0.9%** | 30-42% | 1.5-1.8s | session-cap fallback (52-65%) | +| v4 | v3 + soft_cap 4→16 | (待数据) | (待数据) | (待数据) | (待数据) | + +`*` v2 的 P50 是假数字——超过半数请求只生成 1 个 token 就被 abort。 + +## v2 踩坑:Default policy 与 KVC 机制根本不兼容 + +### 表象 + +`scripts/sweep_tp1_v2_fixed.sh` 跑出来: +- Exp1(8-way DP,baseline):4449/4449 成功,P50=0.65s,error=0 +- Exp2(1P7D KVC):**2524 truncated (56.8%)**,18 errors,P50=0.08s* (假) +- Exp3(2P6D KVC):**2733 truncated (61.4%)**,17 errors,P50=0.08s* (假) + +每个截断请求 `actual_output_tokens=1`,`finish_reason="abort: session id X does not exist"`。 + +### 错误的早期诊断 + +之前 `RESULTS_SUMMARY.md` 把锅扣在 SGLang 的 `--disaggregation-decode-allow-local-prefill` flag 上,认为是 D worker 在有 `bootstrap_room` 时仍然做了 local prefill。这个诊断**完全错误**——查 `scheduler.py:1975-1980` 的 `_should_allow_local_prefill_on_decode`: + +```python +def _should_allow_local_prefill_on_decode(self, req: Req) -> bool: + return ( + self.disaggregation_mode == DisaggregationMode.DECODE + and self.server_args.disaggregation_decode_allow_local_prefill + and req.bootstrap_room is None # ← 有 bootstrap_room 不会走 local prefill + ) +``` + +KVC reseed 路径的请求都带 `bootstrap_room`,根本不会触发 local prefill。 + +### 实际根因:Replay 与 PD Router 的 round-robin 错位 + +实验脚本里 KVC 用 `--policy default`,而 baseline 用 `--policy kv-aware`。 +看 `benchmark.py:287-300` 这两者的差别巨大: + +```python +def _decode_policy_for(policy_name: str) -> str: + if policy_name == "sticky": return "manual" + if policy_name == "kv-aware": return "consistent_hashing" + return "round_robin" # default + +def _header_mode_for(policy_name: str) -> str: + if policy_name == "sticky": return "routing-key" + if policy_name == "kv-aware": return "target-worker" + return "none" # default +``` + +`default` policy + KVC 机制下: +1. Replay policy(`policies.py:DefaultPolicy`)round-robin 选一个 D,比如 D-3 +2. Replay 在 D-3 上 `open_session(session_id=X)`(`replay.py:1722-1731`) +3. Replay 通过 PD Router 发请求(带 `session_params`),但 `header_mode=none`,**不发任何 routing header** +4. PD Router (`pd_router.py:_select_decode_index`) 看到 `decode_policy=round_robin`,用**自己独立的计数器**round-robin,发到了 D-5 +5. D-5 的 scheduler 看到 `session_params` 里有 session_id,但自己的 `session_controller` 里没这个 session(session 在 D-3 上)→ abort with `"Invalid request: session id X does not exist"` (`scheduler.py:1824-1836`) + +两个独立的 round-robin 计数器只要一次错位(任何并发或 direct-to-D 绕过 router 的请求都会引起)就永远对不上。 + +### 为什么 turn 0 不出问题? + +Turn 0 走 `_invoke_plain_router`(`replay.py:1894`),不带 `session_params`,作为普通 PD disagg 请求处理,发到任何 D 都行。Turn 1+ 才开始走带 session_params 的 KVC 路径,撞上路由错位。 + +### 数据特征验证(per-session pattern) + +``` +session 11360 (58 turns): pattern = .TTTTT.TTTTTTT.TTTTTT... ← turn 0 OK,1+ 全 T +session 18720 (87 turns): pattern = .TTTTTTTTTTTTTTTTTT... +``` + +每个 D worker 收到了全部 52 个 session 的请求(理想情况下应该是 ~7-8 个/D,因为 round-robin 把 session 完全打散)。 + +### 修复 + +唯一正确的修复是把 KVC 的 policy 从 `default` 改成 `kv-aware`: + +```diff +- --policy default ++ --policy kv-aware +``` + +`KvAwarePolicy` (`policies.py:146-187`) 做两件事: +1. 用 `_overlap_blocks` + `sticky_bonus` 给每个 D 打分,session 自然粘在同一个 D(**session 亲和性**) +2. `header_mode=target-worker`,发 `x-smg-target-worker` header +3. PD Router 用 `consistent_hashing` 模式,看到 header 就直接用,不再 round-robin + +## v3 改 kv-aware policy 后:路由对了,但新瓶颈出现 + +`scripts/sweep_tp1_v3_kvaware.sh` 把所有 KVC 实验改成 `--policy kv-aware`,结果: + +| 指标 | v2 1P7D (default) | **v3 1P7D (kv-aware)** | v3 2P6D | 8-way DP baseline | +|------|:---:|:---:|:---:|:---:| +| 截断 | 56.8% | **0.9%** | 0.9% | 1.5% | +| Errors | 18 | 363 (8.2%) | 9 | 0 | +| Mean | 4.74s | 4.88s | 3.58s | 1.43s | +| P50 | 0.08s* (假) | 1.75s | 1.52s | 0.65s | +| P90 | 12.14s | 12.67s | 9.23s | 3.61s | +| TTFT P50 | - | 0.36s | 0.33s | 0.09s | + +✅ **截断从 56.8% 降到 0.9%,路由问题彻底解决**。 +❌ 但 P50 仍然是 baseline 的 2-3 倍。 + +### Direct-to-D 路径表现优秀(KVC 该有的样子) + +按 execution_mode 拆开看: + +| 路径 | Exp1 1P7D 占比 | Exp1 1P7D P50 | Exp1 1P7D TTFT P50 | +|------|:---:|:---:|:---:| +| `kvcache-direct-to-d-session` ✨ | 42.0% | **0.495s** | **0.043s** | +| `pd-router-fallback-large-append-session-cap` 🔥 | **52.6%** | 5.6s | 3.7s | + +Direct-to-D 路径下: +- P50 = 0.495s(**比 baseline 0.65s 快 25%**) +- TTFT P50 = 0.043s(**比 baseline 0.093s 快 2 倍**) +- KV transfer = 0(无 P 介入,纯 D 上 append-prefill) + +这才是 KVC 真正的价值。但只有 30-42% 请求走到这条路。 + +### 新瓶颈:session-cap fallback 占了 52-65% + +`pd-router-fallback-large-append-session-cap` 占 1P7D 的 52.6%、2P6D 的 65.4%。这条路径意味着 router 想开新 session 在 D 上,但 admission 拒绝了("d-session-cap"),只好回退到 plain router(P 全量 prefill + 传给 D,无 session 复用)。 + +### Bimodal session 分布(starvation) + +| Session | Total turns | Direct-to-D | Session-cap fallback | +|---------|:---:|:---:|:---:| +| 22080 | 129 | **98%** | 0% | +| 3840 | 118 | **97%** | 0% | +| 70560 | 150 | **0%** | **99%** | +| 39360 | 148 | **0%** | **99%** | +| 61600 | 117 | **0%** | **99%** | + +要么完全幸运,要么完全饿死——典型的双峰分布。 + +### 根因:硬编码 cap=4 + +看 `replay.py:_decode_session_soft_cap` 原始代码: + +```python +def _decode_session_soft_cap(...) -> int: + target_tokens = max(1, _estimate_session_resident_tokens(request)) + usable_capacity_tokens = _usable_capacity_tokens(residency, server_url) + ... + if usable_capacity_tokens <= 0: + return 4 + return max(1, min(4, usable_capacity_tokens // target_tokens)) + # ^^^ 硬编码上限 4 +``` + +7 个 D × 每个 D 最多 4 个 session = **28 个 session slot 总容量**。Trace 有 52 个 session → 24 个 session 永远抢不到 slot。 + +启动期 race condition 决定了哪些 session 是"幸运儿"——前 28 个挤进来的 session 的所有后续 turn 都走 direct-to-D(快);剩下 24 个 session 永远走 session-cap fallback(慢)。 + +## v4 改进:把硬 cap 从 4 提到 16 + +`replay.py:_decode_session_soft_cap` 一行修改: + +```diff +- if usable_capacity_tokens <= 0: +- return 4 +- return max(1, min(4, usable_capacity_tokens // target_tokens)) ++ if usable_capacity_tokens <= 0: ++ return 16 ++ return max(1, min(16, usable_capacity_tokens // target_tokens)) +``` + +7 D × 16 = 112 个 slot,远超 52 个 session 需求。预期 session-cap fallback 占比降到 <10%,整体 P50 向 direct-to-D 的 0.46s 收敛。 + +实际数据见 `outputs/qwen3-30b-tp1-v4-cap16/`。 + +## 后续可以考虑的更深方案:让 D 自己决定 admission + +v4 的硬 cap 抬高只是把数字调大,实际容量管理还是 replay 自己估算。代码里 `replay.py:_decode_session_soft_cap` 用 `target_tokens = input + output`(基于当前请求的 size)估算每个 session footprint,但: +- agentic context 越攒越长,target_tokens 动态增长,cap 会随之缩小 +- 多个并发请求查询时 replay 视图会过期 +- replay 自己写了 LRU eviction 逻辑(`_reserve_decode_session_capacity_from_router_state`),与 SGLang 内部的 `maybe_trim_decode_session_cache` 重复且永远滞后 + +SGLang 已经提供 `/session_cache/admit_direct_append` 端点(`scheduler.py:3497`),D worker 自己回答能不能 admit,并且查询时**主动调用 LRU eviction**。但这个端点只在 direct-to-D 路径用,seed/reseed 路径用的是 replay 自己估算的 soft_cap。 + +理想方案是扩展端点支持 `seed_new` / `reseed` 模式,replay 完全交给 D 决策——但这需要 SGLang 侧 patch + replay 重构(~200 行),工程量比 v4 大得多。 + +## 关键文件与代码位置索引 + +| 现象 | 代码位置 | +|------|----------| +| Replay policy round-robin | `policies.py:63-67` `RoutingState.next_decode_worker_id` | +| KV-aware policy(session 亲和) | `policies.py:146-187` `KvAwarePolicy.select` | +| PD router decode 选择 | `pd_router.py:51-74` `_select_decode_index` | +| Header 构建 | `replay.py:2407-2424` `_build_headers` | +| Policy → router config 映射 | `benchmark.py:287-300` `_decode_policy_for/_header_mode_for` | +| Session admission 软 cap | `replay.py:889-905` `_decode_session_soft_cap` | +| 已有的 D 侧 admission 端点 | `scheduler.py:3497-3580` `admit_direct_append` | +| Session 在 D 上找不到的报错 | `scheduler.py:1824-1836` | +| `_should_allow_local_prefill_on_decode` | `scheduler.py:1975-1980` | +| Reseed 流程入口 | `replay.py:1665-1809` `_invoke_kvcache_seeded_router` | +| Direct-to-D 流程 | `replay.py:2351-2398` `_invoke_decode_session_direct` | + +## 经验教训 + +1. **policy 和 mechanism 是两个正交维度**——`--policy default` 不是"无脑默认值",它真的是 round-robin 无 session 亲和性。KVC 机制必须配 session 亲和的 policy。 + +2. **不要无脑相信前一个 agent 的 RESULTS_SUMMARY**——v2 的诊断("local prefill bug")和实际 finish_reason("session id does not exist")完全对不上。任何错误诊断必须用 finish_reason、execution_mode 这些原始字段交叉验证。 + +3. **bimodal 分布是 starvation 的强信号**——v3 数据里某些 session 100% 走快路径、某些 100% 走慢路径,几乎肯定是某种"先到先得"的资源竞争。看到这种模式立刻去找硬编码 cap 或全局共享资源。 + +4. **测量要看分组而非整体均值**——v3 整体 P50=1.5s 看似比 baseline 慢,但拆开看 direct-to-D 子集 P50=0.495s 已经反超 baseline。整体均值被 fallback 路径拖累,但 KVC 的核心价值是真实存在的。 diff --git a/docs/SWEBENCH_EXPERIMENT_PROGRESS.md b/docs/SWEBENCH_EXPERIMENT_PROGRESS.md new file mode 100644 index 0000000..92c840c --- /dev/null +++ b/docs/SWEBENCH_EXPERIMENT_PROGRESS.md @@ -0,0 +1,95 @@ +# SWE-Bench PD Hybrid Experiment Progress + +## 实验目标 + +在单节点 8xH100 上复现 agentic-pd-hybrid 三种 serving mechanism,对比 Qwen3.5-35B-A3B 在 SWE-Bench 500 instance agentic trajectory 上的性能。 + +## 硬件环境 + +- 8x H100 80GB (NVLink 互联, 2 NUMA nodes: GPU 0-3 / GPU 4-7) +- 无 RDMA/IB 设备 +- Transfer backend: **mooncake TCP** (nixl UCX 因 pip 包缺少 CUDA 支持导致 segfault,已放弃) + +## 实验矩阵 + +| 实验 | Mechanism | Workers | GPU 分配 | Router | Policy | +|------|-----------|---------|----------|--------|--------| +| A | pd-disaggregation | 1P + 1D (TP4 each) | P: 0-3, D: 4-7 | Yes | default | +| B | pd-colo | 2 direct (TP4 each) | D0: 0-3, D1: 4-7 | No | default | +| C | kvcache-centric | 1P + 1D (TP4 each) | P: 0-3, D: 4-7 | Yes | default | + +## 测试负载 + +- 源数据: `simm-swe-bench/outputs/20260416-205833-hicache-qwen35-verified-0-500/audit.jsonl` +- 39,417 lines (turns), 497 unique instances (sessions) +- 每个 instance 8-150 turns (均值 79.3) +- 转换为 agentic-pd-hybrid trace 格式: `outputs/qwen35-swebench-500.jsonl` + +## 关键发现 + +### Transfer Backend 选择 + +- **nixl (UCX)**: pip 安装的 nixl_cu12 包自带的 UCX 库没有 CUDA 支持,导致 GPU memory registration 时 segfault。系统 UCX (/opt/hpcx/ucx) 有 CUDA 支持但因 RPATH 无法被 NIXL 使用。 +- **mooncake (TCP)**: 可用。需要两处修改: + 1. `third_party/sglang/.../mooncake_transfer_engine.py`: 从环境变量 `MOONCAKE_PROTOCOL` 读取协议,而非硬编码 `"rdma"` + 2. `src/agentic_pd_hybrid/stack.py`: 当 `transfer_backend == "mooncake"` 且非 `force_rdma` 时,自动设置 `MOONCAKE_PROTOCOL=tcp` + +### 代码修改记录 + +1. **`third_party/sglang/python/sglang/srt/distributed/device_communicators/mooncake_transfer_engine.py`** + - 将 `"rdma"` 硬编码改为 `os.environ.get("MOONCAKE_PROTOCOL", "rdma")` + +2. **`src/agentic_pd_hybrid/stack.py`** + - 在 `_build_process_env()` 中添加: mooncake 非 force_rdma 时默认设置 `MOONCAKE_PROTOCOL=tcp` + +3. **`scripts/convert_audit_to_trace.py`** (新建) + - 将 sibench audit.jsonl 转换为 agentic-pd-hybrid trace 格式 + +## 实验进度 + +- [x] Step 0: 环境准备 (uv sync, nixl/mooncake 安装) +- [x] Step 1: Trace 格式转换 (39,417 lines 验证通过) +- [x] Step 2: Smoke test (pd-disaggregation, mooncake TCP, 100 requests) — **通过** + - 100/100 requests, 0 errors + - Mean latency: 1.53s, P50: 0.77s, P90: 2.82s + - TTFT: mean 0.49s, P50 0.29s; TPOT: mean 4.7ms + - 91/100 cache hits +- [x] Step 3a: 实验 A 全量尝试 (39K reqs, 497 sessions) — **中止** + - Run dir: `outputs/swebench-exps/pd-disaggregation-default-20260426T171113Z` (无metrics,被kill) + - 前 90% 完成 ~80min (~8-10 req/s), 但尾部 D 侧 KV cache 98% 饱和 + - 497 并发 session 争抢 D 侧 token 空间, mamba 80-93 sessions 无法 drain + - **教训**: 1P+1D (TP4) 无法支撑 497 并发 session, 需减少 session 数量或降低 concurrency +- [x] Step 3b: 实验 A — pd-disaggregation (52 sessions, 4449 reqs, concurrency=32) — **完成** + - Run dir: `outputs/swebench-exps/pd-disaggregation-default-20260426T202540Z` + - Trace: `outputs/qwen35-swebench-50sess.jsonl` (10% sample, 52 sessions) + - **结果**: 4449/4449 成功, 0 errors + - Latency: mean=1.66s, P50=0.97s, P90=3.64s, P99=7.68s + - TTFT: mean=0.45s, P50=0.34s, P90=0.88s + - TPOT: mean=5.2ms, P50=5.2ms + - Cache hit: 4199/4449 (94.4%) +- [x] Step 4: 实验 B — pd-colo — **失败: SGLang bug** + - Run dir: `outputs/swebench-exps/pd-colo-default-20260426T210129Z` + - **Bug**: `--disaggregation-mode null` (colocation) 下 Qwen3.5-35B-A3B 模型触发 token_to_kv_pool_allocator 内存泄漏 + - 错误: `ValueError: token_to_kv_pool_allocator memory leak detected!` + - 两个 direct worker 在处理 ~5 个请求后均 crash (Scheduler exception) + - **结论**: 当前 vendored SGLang v0.5.10 不支持 Qwen3.5-35B-A3B 的 colocation 模式 +- [x] Step 5: 实验 C — kvcache-centric — **完成 (高错误率)** + - Run dir: `outputs/swebench-exps/kvcache-centric-default-worker-admission-20260426T210800Z` + - 4390/4449 errors (98.7%) — admission control 过于保守 + - 59 成功请求: mean latency 1.24s (比 pd-disagg 快 25%), TTFT 0.18s (快 60%) + - 详细分析见 `docs/SWEBENCH_EXPERIMENT_RESULTS.md` +- [x] Step 6: 结果对比分析 — **完成** + - 完整报告: `docs/SWEBENCH_EXPERIMENT_RESULTS.md` + +## 启动脚本 + +- `scripts/run_exp_a_pd_disagg.sh` — 实验 A +- `scripts/run_exp_b_pd_colo.sh` — 实验 B +- `scripts/run_exp_c_kvcache_centric.sh` — 实验 C +- `scripts/convert_audit_to_trace.py` — Trace 转换 + +## 已知风险 + +1. Qwen3.5-35B-A3B TP4 可用 mem ~12GB/GPU (after model + CUDA graph),长 session (150 turns) 可能 OOM +2. mooncake TCP loopback 延迟远低于真实跨机,结果偏乐观 +3. 原始 trace 时间跨度 ~6000s,全量回放非常耗时 diff --git a/docs/SWEBENCH_EXPERIMENT_RESULTS.md b/docs/SWEBENCH_EXPERIMENT_RESULTS.md new file mode 100644 index 0000000..1a8733c --- /dev/null +++ b/docs/SWEBENCH_EXPERIMENT_RESULTS.md @@ -0,0 +1,121 @@ +# SWE-Bench PD Hybrid Experiment Results + +## 实验配置 + +- **模型**: Qwen3.5-35B-A3B (MoE, 35B total / 3B active), TP4 +- **硬件**: 8x H100 80GB, NVLink, 单节点 +- **Transfer backend**: mooncake TCP (loopback) +- **Trace**: 52 sessions, 4,449 requests (10% sample of SWE-Bench 500 instances) +- **时间压缩**: time-scale=10, concurrency-limit=32 + +## 结果汇总 + +### Experiment A: pd-disaggregation (baseline) + +| Metric | Value | +|--------|-------| +| Run dir | `pd-disaggregation-default-20260426T202540Z` | +| Requests | 4,449 / 4,449 (100%) | +| Errors | 0 | +| **Mean Latency** | **1.662s** | +| P50 Latency | 0.973s | +| P90 Latency | 3.644s | +| P99 Latency | 7.676s | +| Mean TTFT | 0.445s | +| P50 TTFT | 0.340s | +| P90 TTFT | 0.880s | +| Mean TPOT | 5.20ms | +| Cache Hit Rate | 94.4% (4199/4449) | +| Mean Cached Tokens | 27,794 | +| KV Transfer Blocks | 105,235 | + +### Experiment B: pd-colo (colocation) — FAILED + +| Metric | Value | +|--------|-------| +| Run dir | `pd-colo-default-20260426T210129Z` | +| Status | **CRASHED** | +| Error | `token_to_kv_pool_allocator memory leak detected!` | +| Root Cause | SGLang v0.5.10 `--disaggregation-mode null` 与 Qwen3.5-35B-A3B (Mamba/GDN hybrid) 不兼容 | +| Requests | ~10 / 4,449 (0.2%) | + +**结论**: 当前 vendored SGLang 不支持此模型的 colocation 模式。需要修复 token_to_kv_pool_allocator 中 Mamba 模型的内存管理。 + +### Experiment C: kvcache-centric (session-aware PD) + +| Metric | Value | +|--------|-------| +| Run dir | `kvcache-centric-default-worker-admission-20260426T210800Z` | +| Requests | 4,449 total | +| **Errors** | **4,390 (98.7%)** | +| Successful | 59 (1.3%) | +| Mean Latency (success) | 1.238s | +| P50 Latency (success) | 0.484s | +| P90 Latency (success) | 2.550s | +| Mean TTFT (success) | 0.179s | +| P50 TTFT (success) | 0.081s | +| Mean TPOT (success) | 4.70ms | +| Direct-to-D Sessions | 56 | +| KV Transfer (actual) | 196 blocks (vs 105,235 planned) | + +**Execution Mode 分布**: +- `kvcache-centric` (failed): 4,390 +- `kvcache-direct-to-d-session` (success): 56 +- `pd-router-*` variants: 3 + +## 关键分析 + +### 1. pd-disaggregation (A) — 稳定可靠 + +- 100% 成功率,0 错误 +- Mean latency 1.66s 合理 (包含 P→D KV transfer 开销) +- 94.4% cache hit 说明 prefix cache 在 P 侧工作良好 +- KV transfer 105K blocks = 主要开销来源 +- **适合生产使用** + +### 2. pd-colo (B) — 不可用 + +- Qwen3.5-35B-A3B 的 Mamba/GDN hybrid 架构在 `disaggregation-mode null` 下触发内存泄漏 +- 这是 SGLang 的 bug,不是 agentic-pd-hybrid 的问题 +- **需要 SGLang 修复后重新测试** + +### 3. kvcache-centric (C) — Admission 过于保守 + +- 98.7% 错误率说明 admission control 拒绝了几乎所有请求 +- `kvcache-seed-min-turn-id=2` 过滤了 turn 1 的 seed(正确行为) +- 但绝大多数 turn 2+ 请求也走 `kvcache-centric` 模式后失败 +- 可能原因: + - Worker admission 查询发现 D 侧没有对应 session 的 KV cache(因为 turn 1 没有 seed) + - D 侧 transfer queue 积压导致 admission 拒绝 +- 成功的 56 个 `direct-to-d-session` 请求表现优异: TTFT 0.08s (P50), 比 pd-disagg 的 0.34s 快 4x +- **需要调优 admission 参数,或使用 `kvcache-seed-min-turn-id=1` 允许 turn 1 seed** + +### 4. kvcache-centric 成功请求 vs pd-disaggregation 对比 + +| Metric | pd-disagg (A) | kvcache-centric (C, success only) | Delta | +|--------|:---:|:---:|:---:| +| Mean Latency | 1.662s | 1.238s | **-25.5%** | +| P50 Latency | 0.973s | 0.484s | **-50.3%** | +| Mean TTFT | 0.445s | 0.179s | **-59.8%** | +| P50 TTFT | 0.340s | 0.081s | **-76.2%** | +| Mean TPOT | 5.20ms | 4.70ms | -9.6% | +| Actual KV Transfer | 105,235 blk | 196 blk | **-99.8%** | + +**当 kvcache-centric 成功时,性能提升显著:** +- TTFT 降低 60-76% (D 侧直接 append,无需 P→D transfer) +- 端到端 latency 降低 25-50% +- KV transfer 减少 99.8% + +## 后续建议 + +1. **修复 pd-colo**: 提交 SGLang issue 关于 Mamba/GDN 模型在 disaggregation-mode null 下的内存泄漏 +2. **调优 kvcache-centric admission**: + - 尝试 `--kvcache-seed-min-turn-id 1` 允许 turn 1 seed + - 放宽 `--kvcache-seed-max-decode-transfer-queue-reqs` 阈值 + - 使用 `--kvcache-admission-mode router` (shadow state, 不在 critical path) +3. **增加 D 侧内存**: 调整 `--mem-fraction-static` 给 KV cache 更多空间 +4. **多 P/D 配置**: 测试 2P2D (TP2) 配置以增加并行度 + +## 实验日期 + +2026-04-27 diff --git a/scripts/convert_audit_to_trace.py b/scripts/convert_audit_to_trace.py new file mode 100644 index 0000000..6f2da6a --- /dev/null +++ b/scripts/convert_audit_to_trace.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +"""Convert sibench audit.jsonl to agentic-pd-hybrid trace format. + +Source format (sibench audit.jsonl): + {"instance_id": "...", "ts": float, "messages": [...], + "audit": {"prompt_tokens": int, "completion_tokens": int, ...}} + +Target format (agentic-pd-hybrid trace JSONL): + {"chat_id": int, "parent_chat_id": int, "timestamp": float, + "turn": int, "input_length": int, "output_length": int, + "type": str, "hash_ids": [int, ...]} +""" + +import json +import sys +from collections import defaultdict +from pathlib import Path + +BLOCK_TOKEN_BUDGET = 24 # tokens per block, matching trace.py default + + +def convert(src: Path, dst: Path) -> None: + # Group lines by instance_id, preserving order within each instance + instances: dict[str, list[dict]] = defaultdict(list) + with src.open() as f: + for line in f: + line = line.strip() + if not line: + continue + rec = json.loads(line) + instances[rec["instance_id"]].append(rec) + + # Sort each instance's turns by timestamp + for iid in instances: + instances[iid].sort(key=lambda r: r["ts"]) + + # Assign stable chat_id bases: each instance gets a block of IDs + # Max turns across all instances determines the spacing + max_turns = max(len(turns) for turns in instances.values()) + spacing = max_turns + 10 # extra headroom + + total_written = 0 + with dst.open("w") as out: + for inst_idx, (iid, turns) in enumerate(instances.items()): + base_chat_id = (inst_idx + 1) * spacing # start from spacing to avoid 0 + # Track cumulative hash_ids for prefix cache simulation + cumulative_hash_ids: list[int] = [] + global_block_counter = inst_idx * 100_000 # unique block namespace per instance + + for turn_idx, rec in enumerate(turns): + audit = rec.get("audit", {}) + input_length = audit.get("prompt_tokens", 0) + output_length = audit.get("completion_tokens", 0) + + if input_length <= 0: + # Fallback: estimate from message content + total_chars = sum(len(m.get("content", "")) for m in rec.get("messages", [])) + input_length = max(1, total_chars // 4) + if output_length <= 0: + output_length = 128 # reasonable default + + chat_id = base_chat_id + turn_idx + if turn_idx == 0: + parent_chat_id = -1 + else: + parent_chat_id = base_chat_id + turn_idx - 1 + + # Build hash_ids: for turn 0, generate blocks for full input + # For turn N>0, keep previous blocks and add new ones for the delta + if turn_idx == 0: + num_blocks = input_length // BLOCK_TOKEN_BUDGET + cumulative_hash_ids = list( + range(global_block_counter, global_block_counter + num_blocks) + ) + global_block_counter += num_blocks + else: + # The new input is the full prompt (cumulative), so the delta + # is the new tokens beyond what was in the previous turn's prompt + prev_input = audit.get("prompt_tokens", 0) + prev_rec_audit = turns[turn_idx - 1].get("audit", {}) + prev_input_length = prev_rec_audit.get("prompt_tokens", 0) + delta = max(0, prev_input - prev_input_length) if prev_input_length > 0 else 0 + new_blocks = delta // BLOCK_TOKEN_BUDGET + new_ids = list( + range(global_block_counter, global_block_counter + new_blocks) + ) + global_block_counter += new_blocks + cumulative_hash_ids = cumulative_hash_ids + new_ids + + trace_line = { + "chat_id": chat_id, + "parent_chat_id": parent_chat_id, + "timestamp": rec["ts"], + "turn": turn_idx, + "input_length": input_length, + "output_length": output_length, + "type": "chat", + "hash_ids": cumulative_hash_ids, + } + out.write(json.dumps(trace_line, separators=(",", ":")) + "\n") + total_written += 1 + + print(f"Converted {total_written} lines from {len(instances)} instances -> {dst}") + + +if __name__ == "__main__": + if len(sys.argv) != 3: + print(f"Usage: {sys.argv[0]} ") + sys.exit(1) + convert(Path(sys.argv[1]), Path(sys.argv[2])) diff --git a/scripts/run_all_experiments.sh b/scripts/run_all_experiments.sh new file mode 100755 index 0000000..cd9fcd7 --- /dev/null +++ b/scripts/run_all_experiments.sh @@ -0,0 +1,73 @@ +#!/bin/bash +# Run all 3 PD hybrid experiments sequentially +# Uses 52 sessions / 4,449 requests (10% sample of 497 sessions) +# Each experiment takes ~30-40 min +set -euo pipefail +cd "$(dirname "$0")/.." + +TRACE="outputs/qwen35-swebench-50sess.jsonl" +MODEL="/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B" +OUTPUT="outputs/swebench-exps" + +echo "=== Experiment A: pd-disaggregation ===" +uv run agentic-pd-hybrid benchmark-live \ + --trace "$TRACE" \ + --output-root "$OUTPUT" \ + --mechanism pd-disaggregation \ + --policy default \ + --model-path "$MODEL" \ + --prefill-workers 1 --decode-workers 1 \ + --prefill-tp-size 4 --decode-tp-size 4 \ + --prefill-gpu-ids 0,1,2,3 --decode-gpu-ids 4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 + +echo "=== Experiment B: pd-colo ===" +uv run agentic-pd-hybrid benchmark-live \ + --trace "$TRACE" \ + --output-root "$OUTPUT" \ + --mechanism pd-colo \ + --policy default \ + --model-path "$MODEL" \ + --prefill-workers 0 --decode-workers 0 \ + --direct-workers 2 --direct-tp-size 4 \ + --direct-gpu-ids 0,1,2,3,4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 + +echo "=== Experiment C: kvcache-centric ===" +uv run agentic-pd-hybrid benchmark-live \ + --trace "$TRACE" \ + --output-root "$OUTPUT" \ + --mechanism kvcache-centric \ + --policy default \ + --model-path "$MODEL" \ + --prefill-workers 1 --decode-workers 1 \ + --prefill-tp-size 4 --decode-tp-size 4 \ + --prefill-gpu-ids 0,1,2,3 --decode-gpu-ids 4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 \ + --kvcache-admission-mode worker \ + --kvcache-seed-min-turn-id 2 \ + --kvcache-prefill-backup-policy release-after-transfer \ + --kvcache-prefill-priority-eviction + +echo "=== All experiments complete ===" diff --git a/scripts/run_exp_a_pd_disagg.sh b/scripts/run_exp_a_pd_disagg.sh new file mode 100755 index 0000000..963b587 --- /dev/null +++ b/scripts/run_exp_a_pd_disagg.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Experiment A: pd-disaggregation baseline +# 1P(GPU 0-3) + 1D(GPU 4-7), TP4, mooncake TCP +# Full 39K trace from SWE-Bench 500 instances +set -euo pipefail +cd "$(dirname "$0")/.." + +uv run agentic-pd-hybrid benchmark-live \ + --trace outputs/qwen35-swebench-500.jsonl \ + --output-root outputs/swebench-exps \ + --mechanism pd-disaggregation \ + --policy default \ + --model-path /mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B \ + --prefill-workers 1 --decode-workers 1 \ + --prefill-tp-size 4 --decode-tp-size 4 \ + --prefill-gpu-ids 0,1,2,3 --decode-gpu-ids 4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 64 \ + --timeout-s 900 \ + --request-timeout-s 300 diff --git a/scripts/run_exp_b1_dp_colo_rr.sh b/scripts/run_exp_b1_dp_colo_rr.sh new file mode 100755 index 0000000..b058135 --- /dev/null +++ b/scripts/run_exp_b1_dp_colo_rr.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# Experiment B1: Naive DP colocation — round-robin policy +# 2 direct workers (GPU 0-3, 4-7), TP4, DP router with round-robin +# No disaggregation — each worker does prefill+decode locally +set -euo pipefail +cd "$(dirname "$0")/.." + +uv run agentic-pd-hybrid benchmark-live \ + --trace outputs/qwen35-swebench-50sess.jsonl \ + --output-root outputs/swebench-exps \ + --mechanism pd-colo \ + --policy default \ + --model-path /mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B \ + --prefill-workers 0 --decode-workers 0 \ + --direct-workers 2 --direct-tp-size 4 \ + --direct-gpu-ids 0,1,2,3,4,5,6,7 \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 diff --git a/scripts/run_exp_b2_dp_colo_cache_aware.sh b/scripts/run_exp_b2_dp_colo_cache_aware.sh new file mode 100755 index 0000000..090e462 --- /dev/null +++ b/scripts/run_exp_b2_dp_colo_cache_aware.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# Experiment B2: Naive DP colocation — cache-aware (kv-aware) policy +# 2 direct workers (GPU 0-3, 4-7), TP4, DP router with consistent-hashing +# Replay kv-aware policy picks the worker with most prefix overlap +set -euo pipefail +cd "$(dirname "$0")/.." + +uv run agentic-pd-hybrid benchmark-live \ + --trace outputs/qwen35-swebench-50sess.jsonl \ + --output-root outputs/swebench-exps \ + --mechanism pd-colo \ + --policy kv-aware \ + --model-path /mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B \ + --prefill-workers 0 --decode-workers 0 \ + --direct-workers 2 --direct-tp-size 4 \ + --direct-gpu-ids 0,1,2,3,4,5,6,7 \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 diff --git a/scripts/run_exp_b_pd_colo.sh b/scripts/run_exp_b_pd_colo.sh new file mode 100755 index 0000000..9a61165 --- /dev/null +++ b/scripts/run_exp_b_pd_colo.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Experiment B: pd-colo (direct/colocation) +# 2 direct workers (GPU 0-3, 4-7), TP4, no router +# Full 39K trace from SWE-Bench 500 instances +set -euo pipefail +cd "$(dirname "$0")/.." + +uv run agentic-pd-hybrid benchmark-live \ + --trace outputs/qwen35-swebench-500.jsonl \ + --output-root outputs/swebench-exps \ + --mechanism pd-colo \ + --policy default \ + --model-path /mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B \ + --prefill-workers 0 --decode-workers 0 \ + --direct-workers 2 --direct-tp-size 4 \ + --direct-gpu-ids 0,1,2,3,4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 64 \ + --timeout-s 900 \ + --request-timeout-s 300 diff --git a/scripts/run_exp_c_kvcache_centric.sh b/scripts/run_exp_c_kvcache_centric.sh new file mode 100755 index 0000000..03724cf --- /dev/null +++ b/scripts/run_exp_c_kvcache_centric.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# Experiment C: kvcache-centric (session-aware PD) +# 1P(GPU 0-3) + 1D(GPU 4-7), TP4, mooncake TCP +# Full 39K trace from SWE-Bench 500 instances +set -euo pipefail +cd "$(dirname "$0")/.." + +uv run agentic-pd-hybrid benchmark-live \ + --trace outputs/qwen35-swebench-500.jsonl \ + --output-root outputs/swebench-exps \ + --mechanism kvcache-centric \ + --policy default \ + --model-path /mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B \ + --prefill-workers 1 --decode-workers 1 \ + --prefill-tp-size 4 --decode-tp-size 4 \ + --prefill-gpu-ids 0,1,2,3 --decode-gpu-ids 4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 64 \ + --timeout-s 900 \ + --request-timeout-s 300 \ + --kvcache-admission-mode worker \ + --kvcache-seed-min-turn-id 2 \ + --kvcache-prefill-backup-policy release-after-transfer \ + --kvcache-prefill-priority-eviction diff --git a/scripts/smoke_test.sh b/scripts/smoke_test.sh new file mode 100755 index 0000000..c0b3c08 --- /dev/null +++ b/scripts/smoke_test.sh @@ -0,0 +1,30 @@ +#!/bin/bash +# Smoke test: pd-disaggregation with mooncake TCP, 100 requests +set -euo pipefail +cd "$(dirname "$0")/.." + +# Sample a small trace for smoke testing +uv run agentic-pd-hybrid sample-sessions \ + --trace outputs/qwen35-swebench-500.jsonl \ + --output outputs/qwen35-smoke-3sess.jsonl \ + --session-sample-rate 0.02 \ + --min-turns 5 \ + --target-duration-s 300 \ + --max-requests 100 + +# Run smoke test +uv run agentic-pd-hybrid benchmark-live \ + --trace outputs/qwen35-smoke-3sess.jsonl \ + --output-root outputs/smoke \ + --mechanism pd-disaggregation \ + --policy default \ + --model-path /mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B \ + --prefill-workers 1 --decode-workers 1 \ + --prefill-tp-size 4 --decode-tp-size 4 \ + --prefill-gpu-ids 0,1,2,3 --decode-gpu-ids 4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 diff --git a/scripts/sweep_kvc_qwen3_30b.sh b/scripts/sweep_kvc_qwen3_30b.sh new file mode 100755 index 0000000..748accc --- /dev/null +++ b/scripts/sweep_kvc_qwen3_30b.sh @@ -0,0 +1,60 @@ +#!/bin/bash +# KVC admission control parameter sweep on Qwen3-30B +# 5 experiments, ~35 min each, ~3 hours total +set -euo pipefail +cd "$(dirname "$0")/.." + +MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507 +TRACE=outputs/qwen35-swebench-50sess.jsonl +OUTPUT=outputs/qwen3-30b-exps +VENV_PYTHON=.venv/bin/python + +run_kvc() { + local label=$1 + local inflight=$2 + local min_turn=$3 + + echo "=== [$label] inflight=$inflight min_turn=$min_turn === $(date)" + PYTHONPATH=src:third_party/sglang/python \ + $VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \ + --trace $TRACE \ + --output-root $OUTPUT \ + --mechanism kvcache-centric \ + --policy default \ + --model-path $MODEL \ + --prefill-workers 1 --decode-workers 1 \ + --prefill-tp-size 4 --decode-tp-size 4 \ + --prefill-gpu-ids 0,1,2,3 --decode-gpu-ids 4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 \ + --kvcache-admission-mode worker \ + --kvcache-seed-min-turn-id $min_turn \ + --kvcache-seed-max-inflight-decode $inflight \ + --kvcache-prefill-backup-policy release-after-transfer \ + --kvcache-prefill-priority-eviction + echo "=== [$label] DONE === $(date)" + echo "" +} + +# C1: inflight=8, min-turn=2 +run_kvc "C1" 8 2 + +# C2: inflight=16, min-turn=2 +run_kvc "C2" 16 2 + +# C3: inflight=-1 (disabled), min-turn=2 +run_kvc "C3" -1 2 + +# C4: inflight=8, min-turn=1 +run_kvc "C4" 8 1 + +# C5: inflight=-1 (disabled), min-turn=1 +run_kvc "C5" -1 1 + +echo "=== ALL SWEEP EXPERIMENTS DONE === $(date)" diff --git a/scripts/sweep_tp1_configs.sh b/scripts/sweep_tp1_configs.sh new file mode 100755 index 0000000..d0eb605 --- /dev/null +++ b/scripts/sweep_tp1_configs.sh @@ -0,0 +1,133 @@ +#!/bin/bash +# TP1 configuration sweep: 8-way DP, 1P7D KVC, 2P6D KVC +# Qwen3-30B-A3B TP=1, single GPU per worker +# Most aggressive KVC admission: inflight=-1 (off), seed-min-turn=1 +set -euo pipefail +cd "$(dirname "$0")/.." + +MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507 +TRACE=outputs/qwen35-swebench-50sess.jsonl +OUTPUT=outputs/qwen3-30b-tp1-exps +VENV_PYTHON=.venv/bin/python +RESULTS_FILE=$OUTPUT/sweep_results.txt + +mkdir -p $OUTPUT + +log() { + echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a $RESULTS_FILE +} + +save_result() { + local label=$1 + local run_dir=$2 + log "=== $label COMPLETED ===" + if [ -f "$run_dir/request-metrics.jsonl.summary.json" ]; then + log "Summary:" + cat "$run_dir/request-metrics.jsonl.summary.json" >> $RESULTS_FILE + echo "" >> $RESULTS_FILE + # Also copy summary to a named file for easy access + cp "$run_dir/request-metrics.jsonl.summary.json" "$OUTPUT/${label}_summary.json" + log "Saved to $OUTPUT/${label}_summary.json" + else + log "WARNING: No summary file found in $run_dir" + fi +} + +log "Starting TP1 configuration sweep" +log "Model: $MODEL" +log "Trace: $TRACE (4449 requests, 52 sessions)" + +######################################## +# Experiment 1: 8-way DP cache-aware +######################################## +log "" +log "=== [EXP1] 8-way DP cache-aware (8 direct × TP1) ===" +PYTHONPATH=src:third_party/sglang/python \ +$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \ + --trace $TRACE \ + --output-root $OUTPUT \ + --mechanism pd-colo \ + --policy kv-aware \ + --model-path $MODEL \ + --prefill-workers 0 --decode-workers 0 \ + --direct-workers 8 --direct-tp-size 1 \ + --direct-gpu-ids 0,1,2,3,4,5,6,7 \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 + +# Find latest run dir for this experiment +EXP1_DIR=$(ls -td $OUTPUT/pd-colo-kv-aware-*/ 2>/dev/null | head -1) +save_result "exp1_8way_dp_cache_aware" "$EXP1_DIR" + +######################################## +# Experiment 2: 1P + 7D KVC (most aggressive) +######################################## +log "" +log "=== [EXP2] 1P7D KVC (inflight=off, min-turn=1) ===" +PYTHONPATH=src:third_party/sglang/python \ +$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \ + --trace $TRACE \ + --output-root $OUTPUT \ + --mechanism kvcache-centric \ + --policy default \ + --model-path $MODEL \ + --prefill-workers 1 --decode-workers 7 \ + --prefill-tp-size 1 --decode-tp-size 1 \ + --prefill-gpu-ids 0 --decode-gpu-ids 1,2,3,4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 \ + --kvcache-admission-mode worker \ + --kvcache-seed-min-turn-id 1 \ + --kvcache-seed-max-inflight-decode -1 \ + --kvcache-prefill-backup-policy release-after-transfer \ + --kvcache-prefill-priority-eviction + +EXP2_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1) +save_result "exp2_1p7d_kvc_aggressive" "$EXP2_DIR" + +######################################## +# Experiment 3: 2P + 6D KVC (most aggressive) +######################################## +log "" +log "=== [EXP3] 2P6D KVC (inflight=off, min-turn=1) ===" +PYTHONPATH=src:third_party/sglang/python \ +$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \ + --trace $TRACE \ + --output-root $OUTPUT \ + --mechanism kvcache-centric \ + --policy default \ + --model-path $MODEL \ + --prefill-workers 2 --decode-workers 6 \ + --prefill-tp-size 1 --decode-tp-size 1 \ + --prefill-gpu-ids 0,1 --decode-gpu-ids 2,3,4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 \ + --kvcache-admission-mode worker \ + --kvcache-seed-min-turn-id 1 \ + --kvcache-seed-max-inflight-decode -1 \ + --kvcache-prefill-backup-policy release-after-transfer \ + --kvcache-prefill-priority-eviction + +EXP3_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1) +save_result "exp3_2p6d_kvc_aggressive" "$EXP3_DIR" + +######################################## +log "" +log "=== ALL TP1 SWEEP EXPERIMENTS DONE ===" diff --git a/scripts/sweep_tp1_v2_fixed.sh b/scripts/sweep_tp1_v2_fixed.sh new file mode 100755 index 0000000..38c730e --- /dev/null +++ b/scripts/sweep_tp1_v2_fixed.sh @@ -0,0 +1,131 @@ +#!/bin/bash +# TP1 configuration sweep v2 — after session_params fix + audit fields +# Qwen3-30B-A3B TP=1, single GPU per worker +set -euo pipefail +cd "$(dirname "$0")/.." + +MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507 +TRACE=outputs/qwen35-swebench-50sess.jsonl +OUTPUT=outputs/qwen3-30b-tp1-v2-fixed +VENV_PYTHON=.venv/bin/python +RESULTS_FILE=$OUTPUT/sweep_results.txt + +mkdir -p $OUTPUT + +log() { + echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a $RESULTS_FILE +} + +save_result() { + local label=$1 + local run_dir=$2 + log "=== $label COMPLETED ===" + if [ -f "$run_dir/request-metrics.jsonl.summary.json" ]; then + log "Summary:" + cat "$run_dir/request-metrics.jsonl.summary.json" >> $RESULTS_FILE + echo "" >> $RESULTS_FILE + cp "$run_dir/request-metrics.jsonl.summary.json" "$OUTPUT/${label}_summary.json" + cp "$run_dir/request-metrics.jsonl" "$OUTPUT/${label}_metrics.jsonl" + log "Saved to $OUTPUT/${label}_summary.json + ${label}_metrics.jsonl" + else + log "WARNING: No summary file found in $run_dir" + fi +} + +log "Starting TP1 v2 sweep (session_params fix + audit fields)" +log "Model: $MODEL" +log "Trace: $TRACE (4449 requests, 52 sessions)" + +######################################## +# Experiment 1: 8-way DP cache-aware +######################################## +log "" +log "=== [EXP1] 8-way DP cache-aware (8 direct × TP1) ===" +PYTHONPATH=src:third_party/sglang/python \ +$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \ + --trace $TRACE \ + --output-root $OUTPUT \ + --mechanism pd-colo \ + --policy kv-aware \ + --model-path $MODEL \ + --prefill-workers 0 --decode-workers 0 \ + --direct-workers 8 --direct-tp-size 1 \ + --direct-gpu-ids 0,1,2,3,4,5,6,7 \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 + +EXP1_DIR=$(ls -td $OUTPUT/pd-colo-kv-aware-*/ 2>/dev/null | head -1) +save_result "exp1_8way_dp_cache_aware" "$EXP1_DIR" + +######################################## +# Experiment 2: 1P + 7D KVC (aggressive) +######################################## +log "" +log "=== [EXP2] 1P7D KVC (inflight=off, min-turn=1) ===" +PYTHONPATH=src:third_party/sglang/python \ +$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \ + --trace $TRACE \ + --output-root $OUTPUT \ + --mechanism kvcache-centric \ + --policy default \ + --model-path $MODEL \ + --prefill-workers 1 --decode-workers 7 \ + --prefill-tp-size 1 --decode-tp-size 1 \ + --prefill-gpu-ids 0 --decode-gpu-ids 1,2,3,4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 \ + --kvcache-admission-mode worker \ + --kvcache-seed-min-turn-id 1 \ + --kvcache-seed-max-inflight-decode -1 \ + --kvcache-prefill-backup-policy release-after-transfer \ + --kvcache-prefill-priority-eviction + +EXP2_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1) +save_result "exp2_1p7d_kvc_aggressive" "$EXP2_DIR" + +######################################## +# Experiment 3: 2P + 6D KVC (aggressive) +######################################## +log "" +log "=== [EXP3] 2P6D KVC (inflight=off, min-turn=1) ===" +PYTHONPATH=src:third_party/sglang/python \ +$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \ + --trace $TRACE \ + --output-root $OUTPUT \ + --mechanism kvcache-centric \ + --policy default \ + --model-path $MODEL \ + --prefill-workers 2 --decode-workers 6 \ + --prefill-tp-size 1 --decode-tp-size 1 \ + --prefill-gpu-ids 0,1 --decode-gpu-ids 2,3,4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 \ + --kvcache-admission-mode worker \ + --kvcache-seed-min-turn-id 1 \ + --kvcache-seed-max-inflight-decode -1 \ + --kvcache-prefill-backup-policy release-after-transfer \ + --kvcache-prefill-priority-eviction + +EXP3_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1) +save_result "exp3_2p6d_kvc_aggressive" "$EXP3_DIR" + +######################################## +log "" +log "=== ALL TP1 V2 SWEEP EXPERIMENTS DONE ===" diff --git a/scripts/sweep_tp1_v3_kvaware.sh b/scripts/sweep_tp1_v3_kvaware.sh new file mode 100755 index 0000000..5375eb7 --- /dev/null +++ b/scripts/sweep_tp1_v3_kvaware.sh @@ -0,0 +1,108 @@ +#!/bin/bash +# TP1 v3 sweep — KVC with kv-aware policy (fix routing mismatch) +# v2 used --policy default for KVC experiments, causing session routing +# mismatch: replay round-robin ≠ router round-robin → "session not found". +# v3 uses --policy kv-aware for KVC to ensure session affinity. +set -euo pipefail +cd "$(dirname "$0")/.." + +MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507 +TRACE=outputs/qwen35-swebench-50sess.jsonl +OUTPUT=outputs/qwen3-30b-tp1-v3-kvaware +VENV_PYTHON=.venv/bin/python +RESULTS_FILE=$OUTPUT/sweep_results.txt + +mkdir -p $OUTPUT + +log() { + echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a $RESULTS_FILE +} + +save_result() { + local label=$1 + local run_dir=$2 + log "=== $label COMPLETED ===" + if [ -f "$run_dir/request-metrics.jsonl.summary.json" ]; then + log "Summary:" + cat "$run_dir/request-metrics.jsonl.summary.json" >> $RESULTS_FILE + echo "" >> $RESULTS_FILE + cp "$run_dir/request-metrics.jsonl.summary.json" "$OUTPUT/${label}_summary.json" + cp "$run_dir/request-metrics.jsonl" "$OUTPUT/${label}_metrics.jsonl" + log "Saved to $OUTPUT/${label}_summary.json + ${label}_metrics.jsonl" + else + log "WARNING: No summary file found in $run_dir" + fi +} + +log "Starting TP1 v3 sweep (KVC with kv-aware policy)" +log "Model: $MODEL" +log "Trace: $TRACE (4449 requests, 52 sessions)" +log "Key change: --policy kv-aware for KVC (was --policy default in v2)" + +######################################## +# Experiment 1: 1P + 7D KVC kv-aware +######################################## +log "" +log "=== [EXP1] 1P7D KVC kv-aware ===" +PYTHONPATH=src:third_party/sglang/python \ +$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \ + --trace $TRACE \ + --output-root $OUTPUT \ + --mechanism kvcache-centric \ + --policy kv-aware \ + --model-path $MODEL \ + --prefill-workers 1 --decode-workers 7 \ + --prefill-tp-size 1 --decode-tp-size 1 \ + --prefill-gpu-ids 0 --decode-gpu-ids 1,2,3,4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 \ + --kvcache-admission-mode worker \ + --kvcache-seed-min-turn-id 1 \ + --kvcache-seed-max-inflight-decode -1 \ + --kvcache-prefill-backup-policy release-after-transfer \ + --kvcache-prefill-priority-eviction + +EXP1_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1) +save_result "exp1_1p7d_kvc_kvaware" "$EXP1_DIR" + +######################################## +# Experiment 2: 2P + 6D KVC kv-aware +######################################## +log "" +log "=== [EXP2] 2P6D KVC kv-aware ===" +PYTHONPATH=src:third_party/sglang/python \ +$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \ + --trace $TRACE \ + --output-root $OUTPUT \ + --mechanism kvcache-centric \ + --policy kv-aware \ + --model-path $MODEL \ + --prefill-workers 2 --decode-workers 6 \ + --prefill-tp-size 1 --decode-tp-size 1 \ + --prefill-gpu-ids 0,1 --decode-gpu-ids 2,3,4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 \ + --kvcache-admission-mode worker \ + --kvcache-seed-min-turn-id 1 \ + --kvcache-seed-max-inflight-decode -1 \ + --kvcache-prefill-backup-policy release-after-transfer \ + --kvcache-prefill-priority-eviction + +EXP2_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1) +save_result "exp2_2p6d_kvc_kvaware" "$EXP2_DIR" + +######################################## +log "" +log "=== ALL TP1 V3 SWEEP EXPERIMENTS DONE ===" diff --git a/scripts/sweep_tp1_v4_cap16.sh b/scripts/sweep_tp1_v4_cap16.sh new file mode 100755 index 0000000..7e107a5 --- /dev/null +++ b/scripts/sweep_tp1_v4_cap16.sh @@ -0,0 +1,108 @@ +#!/bin/bash +# TP1 v4 sweep — KVC with kv-aware policy + soft_cap raised from 4 to 16 +# v3 (kv-aware) fixed routing but session-cap fallback still dominated 52-65% +# of requests. Hardcoded min(4, ...) in _decode_session_soft_cap was the +# bottleneck — only 4*7=28 session slots for 52 trace sessions. +# v4 raises the cap to 16 (4*7=28 -> 16*7=112 slots). +set -euo pipefail +cd "$(dirname "$0")/.." + +MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507 +TRACE=outputs/qwen35-swebench-50sess.jsonl +OUTPUT=outputs/qwen3-30b-tp1-v4-cap16 +VENV_PYTHON=.venv/bin/python +RESULTS_FILE=$OUTPUT/sweep_results.txt + +mkdir -p $OUTPUT + +log() { + echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a $RESULTS_FILE +} + +save_result() { + local label=$1 + local run_dir=$2 + log "=== $label COMPLETED ===" + if [ -f "$run_dir/request-metrics.jsonl.summary.json" ]; then + log "Summary:" + cat "$run_dir/request-metrics.jsonl.summary.json" >> $RESULTS_FILE + echo "" >> $RESULTS_FILE + cp "$run_dir/request-metrics.jsonl.summary.json" "$OUTPUT/${label}_summary.json" + cp "$run_dir/request-metrics.jsonl" "$OUTPUT/${label}_metrics.jsonl" + log "Saved to $OUTPUT/${label}_summary.json + ${label}_metrics.jsonl" + else + log "WARNING: No summary file found in $run_dir" + fi +} + +log "Starting TP1 v4 sweep (KVC kv-aware, session soft_cap raised 4->16)" +log "Model: $MODEL" +log "Trace: $TRACE (4449 requests, 52 sessions)" +log "Key change: _decode_session_soft_cap now min(16, ...) instead of min(4, ...)" + +######################################## +# Experiment 1: 1P + 7D KVC kv-aware (cap=16) +######################################## +log "" +log "=== [EXP1] 1P7D KVC kv-aware cap=16 ===" +PYTHONPATH=src:third_party/sglang/python \ +$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \ + --trace $TRACE \ + --output-root $OUTPUT \ + --mechanism kvcache-centric \ + --policy kv-aware \ + --model-path $MODEL \ + --prefill-workers 1 --decode-workers 7 \ + --prefill-tp-size 1 --decode-tp-size 1 \ + --prefill-gpu-ids 0 --decode-gpu-ids 1,2,3,4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 \ + --kvcache-admission-mode worker \ + --kvcache-seed-min-turn-id 1 \ + --kvcache-seed-max-inflight-decode -1 \ + --kvcache-prefill-backup-policy release-after-transfer \ + --kvcache-prefill-priority-eviction + +EXP1_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1) +save_result "exp1_1p7d_kvc_cap16" "$EXP1_DIR" + +######################################## +# Experiment 2: 2P + 6D KVC kv-aware (cap=16) +######################################## +log "" +log "=== [EXP2] 2P6D KVC kv-aware cap=16 ===" +PYTHONPATH=src:third_party/sglang/python \ +$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \ + --trace $TRACE \ + --output-root $OUTPUT \ + --mechanism kvcache-centric \ + --policy kv-aware \ + --model-path $MODEL \ + --prefill-workers 2 --decode-workers 6 \ + --prefill-tp-size 1 --decode-tp-size 1 \ + --prefill-gpu-ids 0,1 --decode-gpu-ids 2,3,4,5,6,7 \ + --transfer-backend mooncake \ + --gpu-budget 8 \ + --time-scale 10 \ + --session-sample-rate 1.0 \ + --target-duration-s 100000 \ + --concurrency-limit 32 \ + --timeout-s 900 \ + --request-timeout-s 300 \ + --kvcache-admission-mode worker \ + --kvcache-seed-min-turn-id 1 \ + --kvcache-seed-max-inflight-decode -1 \ + --kvcache-prefill-backup-policy release-after-transfer \ + --kvcache-prefill-priority-eviction + +EXP2_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1) +save_result "exp2_2p6d_kvc_cap16" "$EXP2_DIR" + +log "" +log "=== ALL TP1 V4 SWEEP EXPERIMENTS DONE ===" diff --git a/src/agentic_pd_hybrid/benchmark.py b/src/agentic_pd_hybrid/benchmark.py index 978d62e..24e9006 100644 --- a/src/agentic_pd_hybrid/benchmark.py +++ b/src/agentic_pd_hybrid/benchmark.py @@ -119,6 +119,8 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts: try: signal.signal(signal.SIGINT, _handle_termination) signal.signal(signal.SIGTERM, _handle_termination) + _mechanisms_with_router = {"pd-disaggregation", "kvcache-centric", "pd-colo"} + _naive_dp = config.mechanism_name == "pd-colo" if config.launch_stack: stack = launch_pd_stack( topology=topology, @@ -132,18 +134,19 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts: else config.timeout_s ), include_router=( - config.mechanism_name in {"pd-disaggregation", "kvcache-centric"} + config.mechanism_name in _mechanisms_with_router ), + naive_dp=_naive_dp, ) router_url = ( stack.router_url - if config.mechanism_name in {"pd-disaggregation", "kvcache-centric"} + if config.mechanism_name in _mechanisms_with_router else None ) else: router_url = ( topology.router_url - if config.mechanism_name in {"pd-disaggregation", "kvcache-centric"} + if config.mechanism_name in _mechanisms_with_router else None ) diff --git a/src/agentic_pd_hybrid/cli.py b/src/agentic_pd_hybrid/cli.py index 90e87b8..8221040 100644 --- a/src/agentic_pd_hybrid/cli.py +++ b/src/agentic_pd_hybrid/cli.py @@ -455,11 +455,18 @@ def main() -> None: if args.command == "print-launch": topology = _topology_from_args(args) + has_pd = bool(topology.prefill_workers and topology.decode_workers) + has_direct_only = bool( + topology.direct_workers + and not topology.prefill_workers + and not topology.decode_workers + ) plan = build_launch_plan( topology, prefill_policy=args.prefill_policy, decode_policy=args.decode_policy, - include_router=bool(topology.prefill_workers and topology.decode_workers), + include_router=has_pd or has_direct_only, + naive_dp=has_direct_only, ) print(plan.render()) return diff --git a/src/agentic_pd_hybrid/launcher.py b/src/agentic_pd_hybrid/launcher.py index e246c28..23a08c6 100644 --- a/src/agentic_pd_hybrid/launcher.py +++ b/src/agentic_pd_hybrid/launcher.py @@ -34,7 +34,24 @@ def build_launch_plan( decode_policy: str = "manual", include_router: bool = True, router_request_timeout_s: float | None = None, + naive_dp: bool = False, ) -> LaunchPlan: + router_command: tuple[str, ...] | None = None + if include_router: + if topology.prefill_workers and topology.decode_workers: + router_command = _build_router_command( + topology, + prefill_policy=prefill_policy, + decode_policy=decode_policy, + request_timeout_s=router_request_timeout_s, + ) + elif naive_dp and topology.direct_workers: + router_command = _build_dp_router_command( + topology, + backend_policy=decode_policy, + request_timeout_s=router_request_timeout_s, + ) + return LaunchPlan( prefill_commands=tuple( _build_server_command(topology, worker) for worker in topology.prefill_workers @@ -43,24 +60,17 @@ def build_launch_plan( _build_server_command(topology, worker) for worker in topology.decode_workers ), direct_commands=tuple( - _build_server_command(topology, worker) for worker in topology.direct_workers - ), - router_command=( - _build_router_command( - topology, - prefill_policy=prefill_policy, - decode_policy=decode_policy, - request_timeout_s=router_request_timeout_s, - ) - if include_router and topology.prefill_workers and topology.decode_workers - else None + _build_server_command(topology, worker, naive_dp=naive_dp) + for worker in topology.direct_workers ), + router_command=router_command, ) def _build_server_command( topology: SingleNodeTopology, worker: WorkerSpec, + naive_dp: bool = False, ) -> tuple[str, ...]: command = [ sys.executable, @@ -76,11 +86,15 @@ def _build_server_command( str(worker.port), "--base-gpu-id", str(worker.gpu_id), - "--disaggregation-mode", - _disaggregation_mode_for(worker), - "--disaggregation-transfer-backend", - topology.transfer_backend, ] + # Naive DP direct workers: no disaggregation flags at all + if not (naive_dp and worker.role == "direct"): + command.extend([ + "--disaggregation-mode", + _disaggregation_mode_for(worker), + "--disaggregation-transfer-backend", + topology.transfer_backend, + ]) if worker.tp_size > 1: command.extend(["--tp-size", str(worker.tp_size)]) if topology.trust_remote_code: @@ -135,6 +149,32 @@ def _build_router_command( return tuple(command) +def _build_dp_router_command( + topology: SingleNodeTopology, + *, + backend_policy: str, + request_timeout_s: float | None, +) -> tuple[str, ...]: + command: list[str] = [ + sys.executable, + "-B", + "-u", + "-m", + "agentic_pd_hybrid.pd_router", + "--host", + topology.router_host, + "--port", + str(topology.router_port), + "--backend-policy", + backend_policy, + ] + if request_timeout_s is not None: + command.extend(["--request-timeout-s", str(request_timeout_s)]) + for worker in topology.direct_workers: + command.extend(["--backend", worker.url]) + return tuple(command) + + def _render_named_command(name: str, command: tuple[str, ...]) -> str: return f"# {name}\n" + " ".join(shlex.quote(part) for part in command) diff --git a/src/agentic_pd_hybrid/metrics.py b/src/agentic_pd_hybrid/metrics.py index d813df1..b7d9bba 100644 --- a/src/agentic_pd_hybrid/metrics.py +++ b/src/agentic_pd_hybrid/metrics.py @@ -43,6 +43,9 @@ class RequestMetrics: ttft_s: float | None tpot_s: float | None error: str | None = None + actual_output_tokens: int | None = None + requested_output_tokens: int | None = None + finish_reason: str | None = None @classmethod def from_decision( @@ -63,6 +66,9 @@ class RequestMetrics: prefill_request_priority: int | None = None, decode_request_priority: int | None = None, error: str | None = None, + actual_output_tokens: int | None = None, + requested_output_tokens: int | None = None, + finish_reason: str | None = None, ) -> "RequestMetrics": return cls( request_id=request.request_id, @@ -95,6 +101,9 @@ class RequestMetrics: ttft_s=ttft_s, tpot_s=tpot_s, error=error, + actual_output_tokens=actual_output_tokens, + requested_output_tokens=requested_output_tokens, + finish_reason=finish_reason, ) @@ -158,6 +167,17 @@ def write_summary_json( str(key): value for key, value in sorted(decode_priorities.items()) }, "error_count": sum(1 for row in rows if row.error is not None), + "truncated_request_count": sum( + 1 + for row in rows + if row.actual_output_tokens is not None + and row.requested_output_tokens is not None + and row.requested_output_tokens > 1 + and row.actual_output_tokens < row.requested_output_tokens * 0.5 + ), + "actual_output_tokens_stats": _stats( + [float(row.actual_output_tokens) for row in rows if row.actual_output_tokens is not None] + ), } path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as handle: diff --git a/src/agentic_pd_hybrid/pd_router.py b/src/agentic_pd_hybrid/pd_router.py index 72f3b83..17d0905 100644 --- a/src/agentic_pd_hybrid/pd_router.py +++ b/src/agentic_pd_hybrid/pd_router.py @@ -74,8 +74,58 @@ class RouterState: return idx +@dataclass +class DpRouterConfig: + host: str + port: int + backend_urls: list[str] + backend_policy: str = "round_robin" + request_timeout_s: float = 1800.0 + + +class DpRouterState: + """DP (data-parallel) router: forward each request to exactly one backend.""" + + def __init__(self, config: DpRouterConfig): + if not config.backend_urls: + raise ValueError("At least one backend worker is required") + self.config = config + self.cursor = 0 + self.sticky_map: dict[str, int] = {} + + def select_backend(self, headers: dict[str, str]) -> str: + idx = self._select_index(headers) + return self.config.backend_urls[idx] + + def _select_index(self, headers: dict[str, str]) -> int: + target_worker = headers.get("x-smg-target-worker") + routing_key = headers.get("x-smg-routing-key") + + if ( + self.config.backend_policy == "consistent_hashing" + and target_worker is not None + ): + idx = int(target_worker) + if 0 <= idx < len(self.config.backend_urls): + return idx + + if self.config.backend_policy == "manual" and routing_key: + cached = self.sticky_map.get(routing_key) + if cached is not None: + return cached + idx = self.cursor % len(self.config.backend_urls) + self.cursor += 1 + self.sticky_map[routing_key] = idx + return idx + + idx = self.cursor % len(self.config.backend_urls) + self.cursor += 1 + return idx + + app = FastAPI() router_state: RouterState | None = None +dp_state: DpRouterState | None = None @app.get("/health") @@ -85,6 +135,16 @@ async def health() -> Response: @app.get("/health_generate") async def health_generate() -> Response: + if dp_state is not None: + async with aiohttp.ClientSession() as session: + tasks = [ + session.get(f"{url}/health_generate") + for url in dp_state.config.backend_urls + ] + for response in asyncio.as_completed(tasks): + async with await response: + pass + return Response(status_code=200) state = _require_state() async with aiohttp.ClientSession() as session: tasks = [] @@ -101,6 +161,11 @@ async def health_generate() -> Response: @app.get("/v1/models") async def models() -> ORJSONResponse: + if dp_state is not None: + async with aiohttp.ClientSession() as session: + async with session.get(f"{dp_state.config.backend_urls[0]}/v1/models") as resp: + payload = await resp.json() + return ORJSONResponse(payload, status_code=resp.status) state = _require_state() async with aiohttp.ClientSession() as session: async with session.get(f"{state.config.prefill_urls[0][0]}/v1/models") as response: @@ -147,6 +212,15 @@ async def _forward_to_backend( headers: dict[str, str], endpoint_name: str, ) -> Response: + # DP mode: forward to a single backend + if dp_state is not None: + return await _forward_to_dp_backend( + request_data=request_data, + headers=headers, + endpoint_name=endpoint_name, + ) + + # PD mode: coordinate prefill + decode state = _require_state() prefill_server, bootstrap_port, decode_server = state.select_pair(headers) prefill_request, decode_request = _build_backend_requests( @@ -186,6 +260,63 @@ async def _forward_to_backend( ) +async def _forward_to_dp_backend( + *, + request_data: dict, + headers: dict[str, str], + endpoint_name: str, +) -> Response: + assert dp_state is not None + backend_server = dp_state.select_backend(headers) + cleaned = _strip_internal_fields(request_data) + timeout_s = dp_state.config.request_timeout_s + + if request_data.get("stream", False): + return StreamingResponse( + _stream_dp_generate( + request_data=cleaned, + backend_server=backend_server, + endpoint_name=endpoint_name, + timeout_s=timeout_s, + ), + media_type="text/event-stream", + ) + + async with aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=timeout_s) + ) as session: + async with session.post( + f"{backend_server}/{endpoint_name}", json=cleaned + ) as response: + body = await response.read() + return Response( + content=body, + status_code=response.status, + media_type=response.content_type, + ) + + +async def _stream_dp_generate( + *, + request_data: dict, + backend_server: str, + endpoint_name: str, + timeout_s: float, +) -> AsyncIterator[bytes]: + async with aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=timeout_s) + ) as session: + async with session.post( + f"{backend_server}/{endpoint_name}", json=request_data + ) as response: + if response.status != HTTPStatus.OK: + payload = await response.read() + yield payload + return + async for chunk in response.content.iter_chunked(_STREAM_CHUNK_SIZE): + yield chunk + + async def _stream_generate( *, prefill_request: dict, @@ -241,6 +372,12 @@ def _build_backend_requests( prefill_request.update(bootstrap_payload) decode_request.update(bootstrap_payload) + # session_params is only meaningful for the decode worker (streaming session + # KV reuse). Sending it to the prefill worker causes the D side to + # short-circuit with local-prefill on already-open sessions, returning + # truncated responses while P's KV transfer gets aborted. + prefill_request.pop("session_params", None) + if prefill_priority is not None: prefill_request["priority"] = int(prefill_priority) if decode_priority is not None: @@ -262,7 +399,7 @@ def _require_state() -> RouterState: def main() -> None: - parser = argparse.ArgumentParser(description="Minimal local PD router") + parser = argparse.ArgumentParser(description="Minimal local PD / DP router") parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=8000) parser.add_argument( @@ -270,30 +407,58 @@ def main() -> None: nargs=2, metavar=("URL", "BOOTSTRAP_PORT"), action="append", - required=True, + default=None, ) parser.add_argument( "--decode", action="append", - required=True, + default=None, ) parser.add_argument("--prefill-policy", default="round_robin") parser.add_argument("--decode-policy", default="manual") + parser.add_argument( + "--backend", + action="append", + default=None, + help="Backend URL for DP (data-parallel) mode. Repeat for each worker.", + ) + parser.add_argument( + "--backend-policy", + default="round_robin", + help="Routing policy for DP mode: round_robin, manual, consistent_hashing.", + ) parser.add_argument("--request-timeout-s", type=float, default=1800.0) args = parser.parse_args() - global router_state - router_state = RouterState( - RouterConfig( - host=args.host, - port=args.port, - prefill_urls=[(url, int(port)) for url, port in args.prefill], - decode_urls=list(args.decode), - prefill_policy=args.prefill_policy, - decode_policy=args.decode_policy, - request_timeout_s=args.request_timeout_s, + global router_state, dp_state + + if args.backend: + # DP mode: simple forward to one of N backends + dp_state = DpRouterState( + DpRouterConfig( + host=args.host, + port=args.port, + backend_urls=list(args.backend), + backend_policy=args.backend_policy, + request_timeout_s=args.request_timeout_s, + ) ) - ) + elif args.prefill and args.decode: + # PD mode: prefill/decode coordination + router_state = RouterState( + RouterConfig( + host=args.host, + port=args.port, + prefill_urls=[(url, int(port)) for url, port in args.prefill], + decode_urls=list(args.decode), + prefill_policy=args.prefill_policy, + decode_policy=args.decode_policy, + request_timeout_s=args.request_timeout_s, + ) + ) + else: + parser.error("Either --backend (DP mode) or both --prefill and --decode (PD mode) are required") + uvicorn.run(app, host=args.host, port=args.port, log_level="info") diff --git a/src/agentic_pd_hybrid/replay.py b/src/agentic_pd_hybrid/replay.py index 801abfb..db1297e 100644 --- a/src/agentic_pd_hybrid/replay.py +++ b/src/agentic_pd_hybrid/replay.py @@ -124,6 +124,9 @@ class ExecutionResult: prefill_request_priority: int | None = None decode_request_priority: int | None = None error: str | None = None + actual_output_tokens: int | None = None + requested_output_tokens: int | None = None + finish_reason: str | None = None async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]: @@ -274,6 +277,9 @@ async def _run_request( prefill_request_priority=execution.prefill_request_priority, decode_request_priority=execution.decode_request_priority, error=execution.error, + actual_output_tokens=execution.actual_output_tokens, + requested_output_tokens=execution.requested_output_tokens, + finish_reason=execution.finish_reason, ) @@ -286,7 +292,7 @@ async def _invoke_router( session_id: str | None = None, prefill_request_priority: int | None = None, decode_request_priority: int | None = None, -) -> tuple[float, float | None, float | None, int]: +) -> GenerateResult: headers = _build_headers( request=request, header_mode=config.header_mode, @@ -414,6 +420,18 @@ async def _invoke_chat_completion( return latency_s, ttft_s, tpot_s, cached_tokens +@dataclass(frozen=True) +class GenerateResult: + latency_s: float + ttft_s: float | None + tpot_s: float | None + cached_tokens: int + actual_output_tokens: int + requested_output_tokens: int + finish_reason: str | None + server_meta_info: dict | None + + async def _invoke_generate( *, client: httpx.AsyncClient, @@ -423,12 +441,16 @@ async def _invoke_generate( timeout_s: float, stream_idle_timeout_s: float | None, stream: bool, -) -> tuple[float, float | None, float | None, int]: +) -> GenerateResult: start = time.perf_counter() ttft_s: float | None = None cached_tokens = 0 sampling_params = payload.get("sampling_params", {}) - generated_tokens = int(sampling_params.get("max_new_tokens", 1)) + requested_output_tokens = int(sampling_params.get("max_new_tokens", 1)) + actual_token_count = 0 + finish_reason: str | None = None + last_meta_info: dict | None = None + if stream: async with client.stream( "POST", @@ -452,8 +474,19 @@ async def _invoke_generate( if isinstance(error, dict): raise ValueError(error.get("message", json.dumps(error))) cached_tokens = max(cached_tokens, _extract_generate_cached_tokens(parsed)) - if _contains_generate_token(parsed) and ttft_s is None: - ttft_s = time.perf_counter() - start + if _contains_generate_token(parsed): + actual_token_count += 1 + if ttft_s is None: + ttft_s = time.perf_counter() - start + meta_info = parsed.get("meta_info") + if isinstance(meta_info, dict): + last_meta_info = meta_info + completion_tokens = int(meta_info.get("completion_tokens", 0)) + if completion_tokens > actual_token_count: + actual_token_count = completion_tokens + fr = meta_info.get("finish_reason") + if fr is not None: + finish_reason = str(fr) if _is_generate_terminal_chunk(parsed): break else: @@ -469,15 +502,33 @@ async def _invoke_generate( if isinstance(error, dict): raise ValueError(error.get("message", json.dumps(error))) cached_tokens = _extract_generate_cached_tokens(parsed) + meta_info = parsed.get("meta_info") + if isinstance(meta_info, dict): + last_meta_info = meta_info + actual_token_count = int(meta_info.get("completion_tokens", 0)) + finish_reason = meta_info.get("finish_reason") latency_s = time.perf_counter() - start - if stream and ttft_s is None and generated_tokens > 0: + if stream and ttft_s is None and requested_output_tokens > 0: raise RuntimeError("generate stream ended before producing any token") + + # Use actual token count for TPOT (not requested count) + effective_tokens = max(1, actual_token_count) if actual_token_count > 0 else max(1, requested_output_tokens) if ttft_s is None: tpot_s = None else: - tpot_s = max(0.0, latency_s - ttft_s) / max(1, generated_tokens) - return latency_s, ttft_s, tpot_s, cached_tokens + tpot_s = max(0.0, latency_s - ttft_s) / effective_tokens + + return GenerateResult( + latency_s=latency_s, + ttft_s=ttft_s, + tpot_s=tpot_s, + cached_tokens=cached_tokens, + actual_output_tokens=actual_token_count, + requested_output_tokens=requested_output_tokens, + finish_reason=finish_reason, + server_meta_info=last_meta_info, + ) async def _open_streaming_session( @@ -850,8 +901,8 @@ def _decode_session_soft_cap( - residency.headroom_tokens.get(server_url, 0), ) if usable_capacity_tokens <= 0: - return 4 - return max(1, min(4, usable_capacity_tokens // target_tokens)) + return 16 + return max(1, min(16, usable_capacity_tokens // target_tokens)) def _should_admit_new_decode_session( @@ -1587,7 +1638,7 @@ async def _invoke_plain_router( config=config, direct_to_d_predicted=False, ) - latency_s, ttft_s, tpot_s, cached_tokens = await _invoke_router( + gen = await _invoke_router( client=client, request=request, config=config, @@ -1598,13 +1649,16 @@ async def _invoke_plain_router( execution_mode=execution_mode, actual_kv_transfer_blocks=decision.kv_transfer_blocks, effective_input_length=request.input_length, - cached_tokens=cached_tokens, + cached_tokens=gen.cached_tokens, prefill_request_priority=prefill_priority, session_reused=False, session_reset=False, - latency_s=latency_s, - ttft_s=ttft_s, - tpot_s=tpot_s, + latency_s=gen.latency_s, + ttft_s=gen.ttft_s, + tpot_s=gen.tpot_s, + actual_output_tokens=gen.actual_output_tokens, + requested_output_tokens=gen.requested_output_tokens, + finish_reason=gen.finish_reason, ) @@ -1676,7 +1730,7 @@ async def _invoke_kvcache_seeded_router( decode_session.opened = True decode_session_newly_opened = True decode_session.active_requests += 1 - latency_s, ttft_s, tpot_s, cached_tokens = await _invoke_router( + gen = await _invoke_router( client=client, request=request, config=config, @@ -1742,13 +1796,16 @@ async def _invoke_kvcache_seeded_router( execution_mode=execution_mode, actual_kv_transfer_blocks=decision.kv_transfer_blocks, effective_input_length=request.input_length, - cached_tokens=cached_tokens, + cached_tokens=gen.cached_tokens, prefill_request_priority=prefill_priority, session_reused=False, session_reset=False, - latency_s=latency_s, - ttft_s=ttft_s, - tpot_s=tpot_s, + latency_s=gen.latency_s, + ttft_s=gen.ttft_s, + tpot_s=gen.tpot_s, + actual_output_tokens=gen.actual_output_tokens, + requested_output_tokens=gen.requested_output_tokens, + finish_reason=gen.finish_reason, ) @@ -1774,14 +1831,16 @@ async def _execute_request( ) if config.mechanism_name == "pd-colo": - return await _invoke_direct( + if not config.router_url: + raise ValueError("router_url is required for pd-colo replay") + result = await _invoke_plain_router( client=client, request=request, config=config, decision=decision, - direct_sessions=direct_sessions, - direct_session_lock=direct_session_lock, + execution_mode="dp-colo-router", ) + return replace(result, actual_kv_transfer_blocks=0) if config.mechanism_name == "kvcache-centric": if not config.router_url: @@ -2238,7 +2297,7 @@ async def _invoke_session_direct( session.active_requests += 1 try: - latency_s, ttft_s, tpot_s, cached_tokens = await _invoke_generate( + gen = await _invoke_generate( client=client, base_url=session.server_url, headers={"x-request-id": request.request_id}, @@ -2277,12 +2336,15 @@ async def _invoke_session_direct( execution_mode=execution_mode, actual_kv_transfer_blocks=0, effective_input_length=len(input_ids), - cached_tokens=cached_tokens, + cached_tokens=gen.cached_tokens, session_reused=session_reused, session_reset=session_reset, - latency_s=latency_s, - ttft_s=ttft_s, - tpot_s=tpot_s, + latency_s=gen.latency_s, + ttft_s=gen.ttft_s, + tpot_s=gen.tpot_s, + actual_output_tokens=gen.actual_output_tokens, + requested_output_tokens=gen.requested_output_tokens, + finish_reason=gen.finish_reason, ) diff --git a/src/agentic_pd_hybrid/stack.py b/src/agentic_pd_hybrid/stack.py index 1f2a2be..33e07db 100644 --- a/src/agentic_pd_hybrid/stack.py +++ b/src/agentic_pd_hybrid/stack.py @@ -66,6 +66,7 @@ def launch_pd_stack( timeout_s: float = 1200.0, router_request_timeout_s: float | None = None, include_router: bool = True, + naive_dp: bool = False, ) -> ManagedPdStack: run_dir.mkdir(parents=True, exist_ok=True) logs_dir = run_dir / "logs" @@ -77,6 +78,7 @@ def launch_pd_stack( decode_policy=decode_policy, include_router=include_router, router_request_timeout_s=router_request_timeout_s, + naive_dp=naive_dp, ) prefill_processes = [ @@ -195,6 +197,9 @@ def _build_process_env(topology: SingleNodeTopology) -> dict[str, str]: env["MC_MS_AUTO_DISC"] = "0" if topology.ib_device: env["MOONCAKE_DEVICE"] = topology.ib_device + elif topology.transfer_backend == "mooncake": + # Default to TCP when RDMA is not forced (e.g. loopback on same node) + env.setdefault("MOONCAKE_PROTOCOL", "tcp") repo_root = Path(__file__).resolve().parents[2] python_paths = [ diff --git a/third_party/sglang/python/sglang/srt/distributed/device_communicators/mooncake_transfer_engine.py b/third_party/sglang/python/sglang/srt/distributed/device_communicators/mooncake_transfer_engine.py index ba20176..e4a44da 100644 --- a/third_party/sglang/python/sglang/srt/distributed/device_communicators/mooncake_transfer_engine.py +++ b/third_party/sglang/python/sglang/srt/distributed/device_communicators/mooncake_transfer_engine.py @@ -189,10 +189,11 @@ class MooncakeTransferEngine: device_name if device_name is not None else "", ) else: + protocol = os.environ.get("MOONCAKE_PROTOCOL", "rdma") ret_value = self.engine.initialize( hostname, "P2PHANDSHAKE", - "rdma", + protocol, device_name if device_name is not None else "", ) if ret_value != 0: