docs: KVC v1-v4 debug journey + raise session soft_cap to 16

Document the iterative debugging from v1 (broken KVC) through v4
(routing fixed + session cap raised), with code-level analysis of
the two main bugs encountered:

1. v2 root cause (mis-diagnosed previously as `allow_local_prefill`):
   `--policy default` for KVC mechanism caused replay's round-robin
   policy and the PD router's round-robin to diverge, sending requests
   with `session_params` to a D worker that did not have the session
   open. Resulted in 56-61% truncation with finish_reason
   "session id X does not exist".
   Fix: use `--policy kv-aware` (sweep_tp1_v3_kvaware.sh) so replay
   emits `x-smg-target-worker` and PD router uses consistent_hashing.

2. v3 new bottleneck: `pd-router-fallback-large-append-session-cap`
   dominated 52-65% of requests. Root cause was hardcoded
   `min(4, ...)` in `_decode_session_soft_cap`. With 7 D workers x 4
   sessions = 28 slots for 52 trace sessions, ~24 sessions starved
   permanently (bimodal direct-to-D rate of 0% or 99%).
   Fix: raise the cap to 16 (replay.py).

Also includes the v3 finding that the direct-to-d-session path (P50=0.495s,
TTFT P50=0.043s) already beats the 8-way DP baseline (0.65s/0.093s):
the KVC core mechanism works when fallback paths are avoided.

Files:
- docs/KVC_DEBUG_JOURNEY_V1_TO_V4.md: full journey + code location index
- docs/SWEBENCH_EXPERIMENT_{PROGRESS,RESULTS}.md: prior session notes
- scripts/sweep_tp1_v{2,3,4}*.sh: experiment driver scripts
- src/agentic_pd_hybrid/replay.py: cap 4 -> 16, audit fields
- src/agentic_pd_hybrid/pd_router.py: strip session_params from prefill
- src/agentic_pd_hybrid/metrics.py: truncated_request_count

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
kzlin
2026-04-28 21:10:41 +08:00
parent e9062b1d6e
commit c9d350b372
24 changed files with 1672 additions and 62 deletions


@@ -0,0 +1,216 @@
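# KVC Experiment Pitfalls and Code Bug Analysis (v1 → v4)
This document records the pitfalls hit across KVC experiments v1 through v4, the misdiagnoses along the way, and the code bugs that were finally pinned down.
Model: Qwen3-30B-A3B (TP1). Hardware: single node, 8×H100 80GB.
Trace: `qwen35-swebench-50sess.jsonl` (4449 requests, 52 sessions).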
## TL;DR
| Version | Key change | Truncation rate | Direct-to-D share | P50 | Main bottleneck |
|------|----------|:---:|:---:|:---:|----------|
| v1 (smoke / early) | Mechanism runs end to end | - | - | - | - |
| v2 | KVC + `--policy default` | **56.8% / 61.4%** | <0.1% | 0.08s* | Routing mismatch (default policy) |
| v3 | KVC + `--policy kv-aware` | **0.9%** | 30-42% | 1.5-1.8s | session-cap fallback (52-65%) |
| v4 | v3 + soft_cap 4 → 16 | (data pending) | (data pending) | (data pending) | (data pending) |

`*` The v2 P50 is not meaningful: more than half of the requests were aborted after generating a single token.
## v2 pitfall: the default policy is fundamentally incompatible with the KVC mechanism
### Symptoms
Results from `scripts/sweep_tp1_v2_fixed.sh`:
- Exp1 (8-way DP, baseline): 4449/4449 succeeded, P50=0.65s, errors=0
- Exp2 (1P7D KVC): **2524 truncated (56.8%)**, 18 errors, P50=0.08s* (not meaningful)
- Exp3 (2P6D KVC): **2733 truncated (61.4%)**, 17 errors, P50=0.08s* (not meaningful)

Every truncated request has `actual_output_tokens=1` and `finish_reason="abort: session id X does not exist"`.
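
The truncation figures can be recomputed from per-request records by matching this signature. The following is a minimal sketch, assuming each record is a dict that carries the `actual_output_tokens` and `finish_reason` fields quoted above; the helper names and the sample records are hypothetical, and the real counting logic in `metrics.py` may differ.

```python
def is_session_abort(record: dict) -> bool:
    """True for a request cut off after one token because its session was missing."""
    reason = record.get("finish_reason") or ""
    return (
        record.get("actual_output_tokens") == 1
        and "session id" in reason
        and "does not exist" in reason
    )


def truncation_rate(records: list) -> float:
    """Fraction of records that match the session-abort signature."""
    if not records:
        return 0.0
    return sum(is_session_abort(r) for r in records) / len(records)


# Hypothetical sample: two of the four requests match the signature.
sample = [
    {"actual_output_tokens": 1, "finish_reason": "abort: session id 11360 does not exist"},
    {"actual_output_tokens": 212, "finish_reason": "stop"},
    {"actual_output_tokens": 1, "finish_reason": "abort: session id 18720 does not exist"},
    {"actual_output_tokens": 1, "finish_reason": "stop"},  # short, but not an abort
]
print(f"truncation rate: {truncation_rate(sample):.1%}")
```

Matching on both fields avoids counting legitimately short completions as truncated.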
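### The earlier, incorrect diagnosis
`RESULTS_SUMMARY.md` previously blamed SGLang's `--disaggregation-decode-allow-local-prefill` flag, on the theory that a D worker still performed local prefill even when `bootstrap_room` was set. That diagnosis was **completely wrong**. See `_should_allow_local_prefill_on_decode` at `scheduler.py:1975-1980`: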
```python
def _should_allow_local_prefill_on_decode(self, req: Req) -> bool:
    return (
        self.disaggregation_mode == DisaggregationMode.DECODE
        and self.server_args.disaggregation_decode_allow_local_prefill
        and req.bootstrap_room is None  # ← requests with a bootstrap_room never take local prefill
    )
```
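Requests on the KVC reseed path always carry a `bootstrap_room`, so they can never trigger local prefill.
### Actual root cause: round-robin mismatch between Replay and the PD Router
In the experiment scripts, KVC runs used `--policy default` while the baseline used `--policy kv-aware`.
In `benchmark.py:287-300` the two differ substantially: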
```python
def _decode_policy_for(policy_name: str) -> str:
    if policy_name == "sticky": return "manual"
    if policy_name == "kv-aware": return "consistent_hashing"
    return "round_robin"  # default

def _header_mode_for(policy_name: str) -> str:
    if policy_name == "sticky": return "routing-key"
    if policy_name == "kv-aware": return "target-worker"
    return "none"  # default
```
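With the `default` policy and the KVC mechanism:
1. Replay's policy (`policies.py:DefaultPolicy`) picks a D by round-robin, say D-3.
2. Replay calls `open_session(session_id=X)` on D-3 (`replay.py:1722-1731`).
3. Replay sends the request through the PD Router with `session_params`, but with `header_mode=none` it **sends no routing header at all**.
4. The PD Router (`pd_router.py:_select_decode_index`) sees `decode_policy=round_robin` and round-robins with **its own independent counter**, so the request lands on D-5.
5. D-5's scheduler finds a session_id in `session_params`, but its own `session_controller` has no such session (the session lives on D-3), so it aborts with `"Invalid request: session id X does not exist"` (`scheduler.py:1824-1836`).

The two round-robin counters are independent. After a single misalignment (caused by any concurrency, or by any direct-to-D request that bypasses the router) they never line up again.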
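
The drift between the two counters can be reproduced with a toy model. This is a simplified sketch, not the replay or router code: it assumes one replay-side pick per request, a router that only advances on requests it actually sees, and hypothetical names throughout (`RoundRobin`, `count_mismatches`, `bypass_at`).

```python
from itertools import count


class RoundRobin:
    """Independent round-robin counter over n workers."""

    def __init__(self, n_workers: int) -> None:
        self.n_workers = n_workers
        self._ticks = count()

    def next(self) -> int:
        return next(self._ticks) % self.n_workers


def count_mismatches(n_workers: int, n_requests: int, bypass_at: set) -> int:
    """Count requests the router sends to a worker other than the one replay chose.

    Requests whose index is in bypass_at go direct to D: replay's counter
    advances for them, the router's counter does not.
    """
    replay_rr = RoundRobin(n_workers)
    router_rr = RoundRobin(n_workers)
    mismatches = 0
    for i in range(n_requests):
        session_worker = replay_rr.next()  # where replay opened the session
        if i in bypass_at:
            continue  # the router never sees this request
        routed_worker = router_rr.next()  # where the router sends the request
        if routed_worker != session_worker:
            mismatches += 1
    return mismatches


print(count_mismatches(7, 100, bypass_at=set()))  # counters stay in lockstep
print(count_mismatches(7, 100, bypass_at={10}))   # a single bypass shifts every later pick
```

In this model a single bypassed request is enough to misroute every request after it, which matches the "never line up again" behavior described above.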
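### Why does turn 0 not fail?
Turn 0 goes through `_invoke_plain_router` (`replay.py:1894`) without `session_params`, so it is handled as a plain PD-disaggregation request and any D will do. Only from turn 1 onward do requests take the KVC path with `session_params` and hit the routing mismatch.
### Data check (per-session pattern)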
```
session 11360 (58 turns): pattern = .TTTTT.TTTTTTT.TTTTTT...   ← turn 0 OK, turns 1+ mostly T
session 18720 (87 turns): pattern = .TTTTTTTTTTTTTTTTTT...
```
Each D worker received requests from all 52 sessions (ideally it would be ~7-8 sessions per D), because round-robin scatters every session across all workers.
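
Pattern strings like the ones above can be derived from per-request records. A minimal sketch, assuming each record exposes `session_id`, `turn` and a boolean `truncated` (all three field names are hypothetical, as is the sample data):

```python
from collections import defaultdict


def session_patterns(records: list) -> dict:
    """Build one pattern string per session: '.' = OK, 'T' = truncated."""
    by_session = defaultdict(list)
    for rec in records:
        by_session[rec["session_id"]].append((rec["turn"], rec["truncated"]))
    return {
        sid: "".join("T" if truncated else "." for _, truncated in sorted(turns))
        for sid, turns in by_session.items()
    }


# Hypothetical records, deliberately out of turn order.
sample_records = [
    {"session_id": "11360", "turn": 2, "truncated": True},
    {"session_id": "11360", "turn": 0, "truncated": False},
    {"session_id": "11360", "turn": 3, "truncated": False},
    {"session_id": "11360", "turn": 1, "truncated": True},
]
print(session_patterns(sample_records))
```

Sorting by turn before rendering keeps the string aligned with turn indices even when records arrive out of order.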
### Fix
The only correct fix is to change the KVC policy from `default` to `kv-aware`:
```diff
- --policy default
+ --policy kv-aware
```
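Switching to `KvAwarePolicy` (`policies.py:146-187`) has three effects:
1. `_overlap_blocks` + `sticky_bonus` score every D, so a session naturally sticks to the same D (**session affinity**).
2. `header_mode=target-worker` makes Replay send the `x-smg-target-worker` header.
3. The PD Router, in `consistent_hashing` mode, uses the header directly when present instead of doing its own round-robin.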
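
The scoring idea in step 1 can be illustrated as follows. This is a hypothetical sketch rather than the code in `policies.py`: the function names, the bonus value of 4, the tie-breaking rule and the header value format are all assumptions made for illustration.

```python
from typing import Optional


def overlap_blocks(request_blocks: list, cached_blocks: set) -> int:
    """Length of the request's leading run of blocks already cached on a worker."""
    n = 0
    for block in request_blocks:
        if block not in cached_blocks:
            break
        n += 1
    return n


def pick_decode_worker(
    request_blocks: list,
    worker_caches: dict,
    last_worker: Optional[str],
    sticky_bonus: int = 4,  # assumed value, for illustration only
) -> str:
    """Pick the worker with the highest prefix overlap plus a bonus for the previous worker."""

    def score(worker: str) -> int:
        bonus = sticky_bonus if worker == last_worker else 0
        return overlap_blocks(request_blocks, worker_caches[worker]) + bonus

    # Ties resolve to the lexicographically smallest worker name.
    return max(sorted(worker_caches), key=score)


caches = {"D-3": {0, 1, 2, 3}, "D-5": {0, 1}}
target = pick_decode_worker([0, 1, 2, 3, 4], caches, last_worker="D-3")
headers = {"x-smg-target-worker": target}  # header name from the text; value format assumed
print(headers)
```

Because the replay side names the target explicitly, the router no longer needs a counter of its own, which removes the source of the mismatch.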
## v3, after switching to the kv-aware policy: routing is fixed, but a new bottleneck appears
`scripts/sweep_tp1_v3_kvaware.sh` switches all KVC experiments to `--policy kv-aware`. Results:

| Metric | v2 1P7D (default) | **v3 1P7D (kv-aware)** | v3 2P6D | 8-way DP baseline |
|------|:---:|:---:|:---:|:---:|
| Truncation | 56.8% | **0.9%** | 0.9% | 1.5% |
| Errors | 18 | 363 (8.2%) | 9 | 0 |
| Mean | 4.74s | 4.88s | 3.58s | 1.43s |
| P50 | 0.08s* (not meaningful) | 1.75s | 1.52s | 0.65s |
| P90 | 12.14s | 12.67s | 9.23s | 3.61s |
| TTFT P50 | - | 0.36s | 0.33s | 0.09s |

**Truncation dropped from 56.8% to 0.9%; the routing problem is fully resolved.**
P50, however, is still 2-3× the baseline.
### The direct-to-D path performs well (what KVC is supposed to look like)
Broken down by execution_mode:

| Path | Exp1 1P7D share | Exp1 1P7D P50 | Exp1 1P7D TTFT P50 |
|------|:---:|:---:|:---:|
| `kvcache-direct-to-d-session` | 42.0% | **0.495s** | **0.043s** |
| `pd-router-fallback-large-append-session-cap` 🔥 | **52.6%** | 5.6s | 3.7s |

On the direct-to-D path:
- P50 = 0.495s, **about 24% lower than the baseline's 0.65s**
- TTFT P50 = 0.043s, **about 2× faster than the baseline's 0.093s**
- KV transfer = 0 (P is not involved; D does the append-prefill itself)

This is the real value of KVC, but only 30-42% of requests reach this path.
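### New bottleneck: session-cap fallback accounts for 52-65%
`pd-router-fallback-large-append-session-cap` accounts for 52.6% of requests on 1P7D and 65.4% on 2P6D. On this path the router wanted to open a new session, but D admission rejected it ("d-session-cap"), so the request fell back to the plain router: P does a full prefill and transfers the KV to D, with no session reuse.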
### Bimodal session distribution (starvation)

| Session | Total turns | Direct-to-D | Session-cap fallback |
|---------|:---:|:---:|:---:|
| 22080 | 129 | **98%** | 0% |
| 3840 | 118 | **97%** | 0% |
| 70560 | 150 | **0%** | **99%** |
| 39360 | 148 | **0%** | **99%** |
| 61600 | 117 | **0%** | **99%** |

Sessions are either fully lucky or fully starved: a textbook bimodal distribution.
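
A per-session breakdown like this table can be computed directly from request records. A minimal sketch, assuming each record carries `session_id` and `execution_mode` (field names are assumptions); the 5% and 95% thresholds and the `mixed` sample session are arbitrary illustrative choices.

```python
from collections import Counter

DIRECT_MODE = "kvcache-direct-to-d-session"
FALLBACK_MODE = "pd-router-fallback-large-append-session-cap"


def direct_share_by_session(records: list) -> dict:
    """Fraction of each session's requests that took the direct-to-D path."""
    totals = Counter()
    direct = Counter()
    for rec in records:
        totals[rec["session_id"]] += 1
        if rec["execution_mode"] == DIRECT_MODE:
            direct[rec["session_id"]] += 1
    return {sid: direct[sid] / totals[sid] for sid in totals}


def split_bimodal(shares: dict, low: float = 0.05, high: float = 0.95) -> tuple:
    """Return (starved, lucky) session ids at the two extremes of the distribution."""
    starved = sorted(sid for sid, share in shares.items() if share <= low)
    lucky = sorted(sid for sid, share in shares.items() if share >= high)
    return starved, lucky


# Hypothetical records: one lucky session, one starved session, one mixed session.
records = (
    [{"session_id": "22080", "execution_mode": DIRECT_MODE}] * 10
    + [{"session_id": "70560", "execution_mode": FALLBACK_MODE}] * 10
    + [{"session_id": "mixed", "execution_mode": DIRECT_MODE}] * 5
    + [{"session_id": "mixed", "execution_mode": FALLBACK_MODE}] * 5
)
shares = direct_share_by_session(records)
starved, lucky = split_bimodal(shares)
print("starved:", starved, "lucky:", lucky)
```

If nearly every session lands in one of the two extreme buckets, the workload is bimodal in the sense described above.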
### Root cause: hard-coded cap=4
Original code of `replay.py:_decode_session_soft_cap`:
```python
def _decode_session_soft_cap(...) -> int:
    target_tokens = max(1, _estimate_session_resident_tokens(request))
    usable_capacity_tokens = _usable_capacity_tokens(residency, server_url)
    ...
    if usable_capacity_tokens <= 0:
        return 4
    return max(1, min(4, usable_capacity_tokens // target_tokens))
    #                 ^^^ hard-coded upper bound of 4
```
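7 D workers × at most 4 sessions per D = **28 session slots in total**. The trace has 52 sessions, so 24 sessions can never get a slot.
A race at startup decides which sessions are the "lucky" ones: the first 28 sessions to get in send all their later turns direct-to-D, while the remaining 24 stay on the session-cap fallback forever.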
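
The slot arithmetic can be checked with a small first-come-first-served model. This sketch assumes that admitted sessions never release their slot and that slots are pooled across workers, which simplifies the real admission logic; `split_sessions` is a hypothetical helper.

```python
def split_sessions(n_workers: int, cap_per_worker: int, session_ids: list) -> tuple:
    """First-come-first-served admission: returns (admitted, starved)."""
    total_slots = n_workers * cap_per_worker
    return session_ids[:total_slots], session_ids[total_slots:]


sessions = list(range(52))  # 52 trace sessions, in arrival order
for cap in (4, 16):
    admitted, starved = split_sessions(7, cap, sessions)
    print(f"cap={cap}: slots={7 * cap}, admitted={len(admitted)}, starved={len(starved)}")
```

With a cap of 4 the model leaves 24 sessions without a slot; with a cap of 16 every session fits.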
## v4 improvement: raise the hard cap from 4 to 16
A two-constant change in `replay.py:_decode_session_soft_cap`:
```diff
-    if usable_capacity_tokens <= 0:
-        return 4
-    return max(1, min(4, usable_capacity_tokens // target_tokens))
+    if usable_capacity_tokens <= 0:
+        return 16
+    return max(1, min(16, usable_capacity_tokens // target_tokens))
```
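7 D × 16 = 112 slots, well above the 52 sessions required. The expectation is that the session-cap fallback share drops below 10% and the overall P50 converges toward the direct-to-D level (~0.5s).
For the actual data, see `outputs/qwen3-30b-tp1-v4-cap16/`.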
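## A deeper option for later: let D decide admission itself
The v4 change only makes the number bigger; capacity is still estimated by replay. In the code, `replay.py:_decode_session_soft_cap` estimates each session's footprint as `target_tokens = input + output`, based on the size of the current request:
- Agentic contexts keep growing, so target_tokens grows over time and the cap shrinks accordingly
- With multiple concurrent requests querying, replay's view goes stale
- replay implements its own LRU eviction logic (`_reserve_decode_session_capacity_from_router_state`), which duplicates SGLang's internal `maybe_trim_decode_session_cache` and always lags behind it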
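
The first point, that the cap shrinks as contexts grow, follows directly from the formula. The sketch below restates the quoted expression with the upper bound as a parameter; the standalone `soft_cap` function and the 400,000-token capacity are hypothetical values chosen only to show the trend.

```python
def soft_cap(usable_capacity_tokens: int, target_tokens: int, max_cap: int = 16) -> int:
    """Restatement of the quoted formula, with the upper bound as a parameter."""
    target_tokens = max(1, target_tokens)
    if usable_capacity_tokens <= 0:
        return max_cap
    return max(1, min(max_cap, usable_capacity_tokens // target_tokens))


usable = 400_000  # hypothetical usable KV capacity of one D worker, in tokens
for context_tokens in (10_000, 30_000, 60_000, 120_000):
    print(f"context={context_tokens:>7} tokens -> cap={soft_cap(usable, context_tokens)}")
```

Under these assumed numbers the effective cap falls below 16 once a session's footprint passes 25,000 tokens, so raising the constant helps only while per-session footprints stay small relative to capacity.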
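
SGLang already provides the `/session_cache/admit_direct_append` endpoint (`scheduler.py:3497`), where the D worker itself answers whether it can admit a request and **actively runs LRU eviction** while answering. But this endpoint is only used on the direct-to-D path; the seed/reseed path still uses replay's own soft_cap estimate.
The ideal design extends the endpoint with `seed_new` / `reseed` modes so that replay hands the decision entirely to D. That requires an SGLang patch plus a replay refactor (an estimated ~200 lines of changes), a much larger effort than v4.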
## Index of key files and code locations

| Symptom | Code location |
|------|----------|
| Replay policy round-robin | `policies.py:63-67` `RoutingState.next_decode_worker_id` |
| KV-aware policy (session affinity) | `policies.py:146-187` `KvAwarePolicy.select` |
| PD router decode selection | `pd_router.py:51-74` `_select_decode_index` |
| Header construction | `replay.py:2407-2424` `_build_headers` |
| Policy → router config mapping | `benchmark.py:287-300` `_decode_policy_for` / `_header_mode_for` |
| Session admission cap | `replay.py:889-905` `_decode_session_soft_cap` |
| Existing D admission endpoint | `scheduler.py:3497-3580` `admit_direct_append` |
| "Session not found on D" error | `scheduler.py:1824-1836` |
| `_should_allow_local_prefill_on_decode` | `scheduler.py:1975-1980` |
| Reseed flow entry point | `replay.py:1665-1809` `_invoke_kvcache_seeded_router` |
| Direct-to-D flow | `replay.py:2351-2398` `_invoke_decode_session_direct` |
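## Lessons learned
1. **Policy and mechanism are two orthogonal dimensions.** `--policy default` is not a harmless default: it really is round-robin, with no session affinity. The KVC mechanism must be paired with a session-affine policy.
2. **Do not blindly trust the previous agent's RESULTS_SUMMARY.** The v2 diagnosis ("local prefill bug") did not match the actual finish_reason ("session id does not exist") at all. Any diagnosis must be cross-checked against raw fields such as finish_reason and execution_mode.
3. **A bimodal distribution is a strong signal of starvation.** In the v3 data some sessions took the fast path nearly 100% of the time and others the slow path nearly 100% of the time, which almost certainly means first-come-first-served contention for some resource. When this pattern shows up, look for a hard-coded cap or a globally shared resource right away.
4. **Measure per group, not just the overall aggregate.** The overall v3 P50 of 1.5s looks worse than the baseline, but the direct-to-D subset has P50=0.495s, which already beats the baseline. The aggregate is dragged down by the fallback paths; the core value of KVC is real.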
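
Lesson 4 amounts to grouping latencies by `execution_mode` before taking percentiles. A minimal sketch, assuming records with `execution_mode` and `latency_s` fields (the field names and the sample latencies are hypothetical):

```python
from collections import defaultdict
from statistics import median


def p50_by_mode(records: list) -> dict:
    """Median latency per execution mode."""
    groups = defaultdict(list)
    for rec in records:
        groups[rec["execution_mode"]].append(rec["latency_s"])
    return {mode: median(values) for mode, values in groups.items()}


# Hypothetical latencies: a fast direct path and a slow fallback path.
latency_records = (
    [{"execution_mode": "kvcache-direct-to-d-session", "latency_s": v} for v in (0.4, 0.5, 0.6)]
    + [
        {"execution_mode": "pd-router-fallback-large-append-session-cap", "latency_s": v}
        for v in (5.0, 5.6, 6.2, 7.0)
    ]
)
overall_p50 = median(rec["latency_s"] for rec in latency_records)
print("overall P50:", overall_p50)
print("per-mode P50:", p50_by_mode(latency_records))
```

In this toy data the overall median sits inside the slow group even though the fast group is an order of magnitude quicker, which is the effect the lesson warns about.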


@@ -0,0 +1,95 @@
# SWE-Bench PD Hybrid Experiment Progress
## Experiment goal
Reproduce the three agentic-pd-hybrid serving mechanisms on a single 8xH100 node and compare the performance of Qwen3.5-35B-A3B on agentic trajectories from 500 SWE-Bench instances.
## Hardware environment
- 8x H100 80GB (NVLink interconnect, 2 NUMA nodes: GPU 0-3 / GPU 4-7)
- No RDMA/IB devices
- Transfer backend: **mooncake TCP** (nixl UCX was abandoned: the pip package lacks CUDA support and segfaults)
## Experiment matrix

| Experiment | Mechanism | Workers | GPU allocation | Router | Policy |
|------|-----------|---------|----------|--------|--------|
| A | pd-disaggregation | 1P + 1D (TP4 each) | P: 0-3, D: 4-7 | Yes | default |
| B | pd-colo | 2 direct (TP4 each) | D0: 0-3, D1: 4-7 | No | default |
| C | kvcache-centric | 1P + 1D (TP4 each) | P: 0-3, D: 4-7 | Yes | default |

## Workload
- Source data: `simm-swe-bench/outputs/20260416-205833-hicache-qwen35-verified-0-500/audit.jsonl`
- 39,417 lines (turns), 497 unique instances (sessions)
- 8-150 turns per instance (mean 79.3)
- Converted to the agentic-pd-hybrid trace format: `outputs/qwen35-swebench-500.jsonl`
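## Key findings
### Transfer backend selection
- **nixl (UCX)**: the UCX library bundled with the pip-installed nixl_cu12 package has no CUDA support, which causes a segfault during GPU memory registration. The system UCX (/opt/hpcx/ucx) has CUDA support, but NIXL cannot use it because of RPATH.
- **mooncake (TCP)**: works. Two changes are required:
  1. `third_party/sglang/.../mooncake_transfer_engine.py`: read the protocol from the `MOONCAKE_PROTOCOL` environment variable instead of hard-coding `"rdma"`
  2. `src/agentic_pd_hybrid/stack.py`: when `transfer_backend == "mooncake"` and `force_rdma` is not set, automatically set `MOONCAKE_PROTOCOL=tcp`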
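
The two changes fit together as a launcher-side default plus a worker-side lookup. The sketch below is illustrative only: `build_process_env` and `resolve_mooncake_protocol` are hypothetical stand-ins rather than the functions in `stack.py` or the SGLang file, and using `setdefault` (so that an explicitly exported value wins) is an assumption about the desired behavior.

```python
import os
from typing import Optional


def build_process_env(
    transfer_backend: str,
    force_rdma: bool,
    base_env: Optional[dict] = None,
) -> dict:
    """Launcher side: default mooncake to TCP unless RDMA is forced."""
    env = dict(os.environ if base_env is None else base_env)
    if transfer_backend == "mooncake" and not force_rdma:
        # setdefault keeps any MOONCAKE_PROTOCOL the caller already exported.
        env.setdefault("MOONCAKE_PROTOCOL", "tcp")
    return env


def resolve_mooncake_protocol(env: dict) -> str:
    """Worker side: read the protocol, falling back to the previously hard-coded value."""
    return env.get("MOONCAKE_PROTOCOL", "rdma")


print(resolve_mooncake_protocol(build_process_env("mooncake", force_rdma=False, base_env={})))
print(resolve_mooncake_protocol(build_process_env("mooncake", force_rdma=True, base_env={})))
```

Keeping `"rdma"` as the worker-side fallback preserves the old behavior for any launcher that does not set the variable.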
### Code change log
1. **`third_party/sglang/python/sglang/srt/distributed/device_communicators/mooncake_transfer_engine.py`**
   - Replaced the hard-coded `"rdma"` with `os.environ.get("MOONCAKE_PROTOCOL", "rdma")`
2. **`src/agentic_pd_hybrid/stack.py`**
   - In `_build_process_env()`: default to `MOONCAKE_PROTOCOL=tcp` for mooncake unless force_rdma is set
3. **`scripts/convert_audit_to_trace.py`** (new)
   - Converts sibench audit.jsonl into the agentic-pd-hybrid trace format
## Experiment progress
- [x] Step 0: Environment setup (uv sync, nixl/mooncake installation)
- [x] Step 1: Trace format conversion (39,417 lines verified)
- [x] Step 2: Smoke test (pd-disaggregation, mooncake TCP, 100 requests): **passed**
  - 100/100 requests, 0 errors
  - Mean latency: 1.53s, P50: 0.77s, P90: 2.82s
  - TTFT: mean 0.49s, P50 0.29s; TPOT: mean 4.7ms
  - 91/100 cache hits
- [x] Step 3a: Experiment A, full-trace attempt (39K reqs, 497 sessions): **aborted**
  - Run dir: `outputs/swebench-exps/pd-disaggregation-default-20260426T171113Z` (no metrics; the run was killed)
  - The first 90% completed in ~80min (~8-10 req/s), but in the tail the D-side KV cache was 98% saturated
  - 497 concurrent sessions competed for D-side token space; mamba: 80-93 sessions unable to drain
  - **Lesson**: 1P+1D (TP4) cannot sustain 497 concurrent sessions; reduce the session count or lower the concurrency
- [x] Step 3b: Experiment A, pd-disaggregation (52 sessions, 4449 reqs, concurrency=32): **done**
  - Run dir: `outputs/swebench-exps/pd-disaggregation-default-20260426T202540Z`
  - Trace: `outputs/qwen35-swebench-50sess.jsonl` (10% sample, 52 sessions)
  - **Result**: 4449/4449 succeeded, 0 errors
  - Latency: mean=1.66s, P50=0.97s, P90=3.64s, P99=7.68s
  - TTFT: mean=0.45s, P50=0.34s, P90=0.88s
  - TPOT: mean=5.2ms, P50=5.2ms
  - Cache hit: 4199/4449 (94.4%)
- [x] Step 4: Experiment B, pd-colo: **failed (SGLang bug)**
  - Run dir: `outputs/swebench-exps/pd-colo-default-20260426T210129Z`
  - **Bug**: under `--disaggregation-mode null` (colocation), the Qwen3.5-35B-A3B model triggers a token_to_kv_pool_allocator memory leak
  - Error: `ValueError: token_to_kv_pool_allocator memory leak detected!`
  - Both direct workers crashed (Scheduler exception) after handling ~5 requests each
  - **Conclusion**: the currently vendored SGLang v0.5.10 does not support colocation mode for Qwen3.5-35B-A3B
- [x] Step 5: Experiment C, kvcache-centric: **done (high error rate)**
  - Run dir: `outputs/swebench-exps/kvcache-centric-default-worker-admission-20260426T210800Z`
  - 4390/4449 errors (98.7%): admission control is too conservative
  - 59 successful requests: mean latency 1.24s (25% faster than pd-disagg), TTFT 0.18s (60% faster)
  - For the detailed analysis, see `docs/SWEBENCH_EXPERIMENT_RESULTS.md`
- [x] Step 6: Comparative analysis of results: **done**
  - Full report: `docs/SWEBENCH_EXPERIMENT_RESULTS.md`
## Launch scripts
- `scripts/run_exp_a_pd_disagg.sh`: Experiment A
- `scripts/run_exp_b_pd_colo.sh`: Experiment B
- `scripts/run_exp_c_kvcache_centric.sh`: Experiment C
- `scripts/convert_audit_to_trace.py`: trace conversion
## Known risks
1. Qwen3.5-35B-A3B at TP4 leaves ~12GB/GPU of usable memory (after model + CUDA graph); long sessions (150 turns) may OOM
2. mooncake TCP loopback latency is far lower than real cross-node latency, so results are optimistic
3. The original trace spans ~6000s, so a full replay is very time-consuming


@@ -0,0 +1,121 @@
# SWE-Bench PD Hybrid Experiment Results
## Experiment configuration
- **Model**: Qwen3.5-35B-A3B (MoE, 35B total / 3B active), TP4
- **Hardware**: 8x H100 80GB, NVLink, single node
- **Transfer backend**: mooncake TCP (loopback)
- **Trace**: 52 sessions, 4,449 requests (10% sample of SWE-Bench 500 instances)
- **Time compression**: time-scale=10, concurrency-limit=32
## Results summary
### Experiment A: pd-disaggregation (baseline)
| Metric | Value |
|--------|-------|
| Run dir | `pd-disaggregation-default-20260426T202540Z` |
| Requests | 4,449 / 4,449 (100%) |
| Errors | 0 |
| **Mean Latency** | **1.662s** |
| P50 Latency | 0.973s |
| P90 Latency | 3.644s |
| P99 Latency | 7.676s |
| Mean TTFT | 0.445s |
| P50 TTFT | 0.340s |
| P90 TTFT | 0.880s |
| Mean TPOT | 5.20ms |
| Cache Hit Rate | 94.4% (4199/4449) |
| Mean Cached Tokens | 27,794 |
| KV Transfer Blocks | 105,235 |
### Experiment B: pd-colo (colocation) — FAILED
| Metric | Value |
|--------|-------|
| Run dir | `pd-colo-default-20260426T210129Z` |
| Status | **CRASHED** |
| Error | `token_to_kv_pool_allocator memory leak detected!` |
| Root Cause | SGLang v0.5.10 `--disaggregation-mode null` is incompatible with Qwen3.5-35B-A3B (Mamba/GDN hybrid) |
| Requests | ~10 / 4,449 (0.2%) |

**Conclusion**: the currently vendored SGLang does not support colocation mode for this model. The memory management for Mamba models in token_to_kv_pool_allocator needs to be fixed.
### Experiment C: kvcache-centric (session-aware PD)
| Metric | Value |
|--------|-------|
| Run dir | `kvcache-centric-default-worker-admission-20260426T210800Z` |
| Requests | 4,449 total |
| **Errors** | **4,390 (98.7%)** |
| Successful | 59 (1.3%) |
| Mean Latency (success) | 1.238s |
| P50 Latency (success) | 0.484s |
| P90 Latency (success) | 2.550s |
| Mean TTFT (success) | 0.179s |
| P50 TTFT (success) | 0.081s |
| Mean TPOT (success) | 4.70ms |
| Direct-to-D Sessions | 56 |
| KV Transfer (actual) | 196 blocks (vs 105,235 planned) |

**Execution mode distribution**:
- `kvcache-centric` (failed): 4,390
- `kvcache-direct-to-d-session` (success): 56
- `pd-router-*` variants: 3
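## Key analysis
### 1. pd-disaggregation (A): stable and reliable
- 100% success rate, 0 errors
- Mean latency of 1.66s is reasonable (it includes the P→D KV transfer overhead)
- The 94.4% cache hit rate shows that the prefix cache works well on the P side
- KV transfer of 105K blocks is the main source of overhead
- **Suitable for production use**
### 2. pd-colo (B): unusable
- The Mamba/GDN hybrid architecture of Qwen3.5-35B-A3B triggers a memory leak under `disaggregation-mode null`
- This is an SGLang bug, not an agentic-pd-hybrid problem
- **Needs to be retested after SGLang is fixed**
### 3. kvcache-centric (C): admission too conservative
- The 98.7% error rate means admission control rejected almost every request
- `kvcache-seed-min-turn-id=2` filters out the turn 1 seed (expected behavior)
- But the vast majority of turn 2+ requests also failed after entering `kvcache-centric` mode
- Possible causes:
  - The worker admission query finds no KV cache for the session on the D side (because turn 1 was not seeded)
  - A backlog in the D-side transfer queue causes admission to reject
- The 56 successful `direct-to-d-session` requests performed very well: TTFT 0.08s (P50), about 4x faster than pd-disagg's 0.34s
- **Admission parameters need tuning, or use `kvcache-seed-min-turn-id=1` to allow a turn 1 seed**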
### 4. Successful kvcache-centric requests vs pd-disaggregation
| Metric | pd-disagg (A) | kvcache-centric (C, success only) | Delta |
|--------|:---:|:---:|:---:|
| Mean Latency | 1.662s | 1.238s | **-25.5%** |
| P50 Latency | 0.973s | 0.484s | **-50.3%** |
| Mean TTFT | 0.445s | 0.179s | **-59.8%** |
| P50 TTFT | 0.340s | 0.081s | **-76.2%** |
| Mean TPOT | 5.20ms | 4.70ms | -9.6% |
| Actual KV Transfer | 105,235 blk | 196 blk | **-99.8%** |

**When kvcache-centric succeeds, the improvement is substantial:**
- TTFT drops by 60-76% (D appends directly; no P→D transfer needed)
- End-to-end latency drops by 25-50%
- KV transfer drops by 99.8%
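
The Delta column in the comparison table is the relative change of the kvcache-centric value against the pd-disaggregation value. A short sketch that recomputes it from the table's own numbers (the helper name `pct_delta` is illustrative):

```python
def pct_delta(baseline: float, candidate: float) -> float:
    """Relative change of candidate versus baseline, in percent."""
    return (candidate - baseline) / baseline * 100.0


# (pd-disagg, kvcache-centric success-only) pairs taken from the comparison table.
rows = {
    "Mean Latency (s)": (1.662, 1.238),
    "P50 Latency (s)": (0.973, 0.484),
    "Mean TTFT (s)": (0.445, 0.179),
    "P50 TTFT (s)": (0.340, 0.081),
    "Mean TPOT (ms)": (5.20, 4.70),
    "KV Transfer (blocks)": (105_235, 196),
}
for name, (baseline, candidate) in rows.items():
    print(f"{name}: {pct_delta(baseline, candidate):+.1f}%")
```

Note that the kvcache-centric side covers only the 59 successful requests, so these deltas describe the success path rather than the run as a whole.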
## Recommended next steps
1. **Fix pd-colo**: file an SGLang issue about the memory leak for Mamba/GDN models under disaggregation-mode null
2. **Tune kvcache-centric admission**:
   - Try `--kvcache-seed-min-turn-id 1` to allow a turn 1 seed
   - Relax the `--kvcache-seed-max-decode-transfer-queue-reqs` threshold
   - Use `--kvcache-admission-mode router` (shadow state, off the critical path)
3. **Increase D-side memory**: adjust `--mem-fraction-static` to give the KV cache more room
4. **Multi-P/D configurations**: test a 2P2D (TP2) configuration to increase parallelism
## Experiment date
2026-04-27


@@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""Convert sibench audit.jsonl to agentic-pd-hybrid trace format.

Source format (sibench audit.jsonl):
    {"instance_id": "...", "ts": float, "messages": [...],
     "audit": {"prompt_tokens": int, "completion_tokens": int, ...}}

Target format (agentic-pd-hybrid trace JSONL):
    {"chat_id": int, "parent_chat_id": int, "timestamp": float,
     "turn": int, "input_length": int, "output_length": int,
     "type": str, "hash_ids": [int, ...]}
"""

import json
import sys
from collections import defaultdict
from pathlib import Path

BLOCK_TOKEN_BUDGET = 24  # tokens per block, matching trace.py default


def convert(src: Path, dst: Path) -> None:
    # Group lines by instance_id, preserving order within each instance
    instances: dict[str, list[dict]] = defaultdict(list)
    with src.open() as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            rec = json.loads(line)
            instances[rec["instance_id"]].append(rec)

    # Sort each instance's turns by timestamp
    for iid in instances:
        instances[iid].sort(key=lambda r: r["ts"])

    # Assign stable chat_id bases: each instance gets a block of IDs
    # Max turns across all instances determines the spacing
    # (default=0 keeps an empty input file from raising ValueError)
    max_turns = max((len(turns) for turns in instances.values()), default=0)
    spacing = max_turns + 10  # extra headroom

    total_written = 0
    with dst.open("w") as out:
        for inst_idx, (iid, turns) in enumerate(instances.items()):
            base_chat_id = (inst_idx + 1) * spacing  # start from spacing to avoid 0
            # Track cumulative hash_ids for prefix cache simulation
            cumulative_hash_ids: list[int] = []
            global_block_counter = inst_idx * 100_000  # unique block namespace per instance

            for turn_idx, rec in enumerate(turns):
                audit = rec.get("audit", {})
                input_length = audit.get("prompt_tokens", 0)
                output_length = audit.get("completion_tokens", 0)
                if input_length <= 0:
                    # Fallback: estimate from message content
                    total_chars = sum(len(m.get("content", "")) for m in rec.get("messages", []))
                    input_length = max(1, total_chars // 4)
                if output_length <= 0:
                    output_length = 128  # reasonable default

                chat_id = base_chat_id + turn_idx
                if turn_idx == 0:
                    parent_chat_id = -1
                else:
                    parent_chat_id = base_chat_id + turn_idx - 1

                # Build hash_ids: for turn 0, generate blocks for full input
                # For turn N>0, keep previous blocks and add new ones for the delta
                if turn_idx == 0:
                    num_blocks = input_length // BLOCK_TOKEN_BUDGET
                    cumulative_hash_ids = list(
                        range(global_block_counter, global_block_counter + num_blocks)
                    )
                    global_block_counter += num_blocks
                else:
                    # The prompt is cumulative, so the delta is the number of
                    # prompt tokens added since the previous turn's prompt
                    curr_prompt_tokens = audit.get("prompt_tokens", 0)
                    prev_audit = turns[turn_idx - 1].get("audit", {})
                    prev_prompt_tokens = prev_audit.get("prompt_tokens", 0)
                    delta = (
                        max(0, curr_prompt_tokens - prev_prompt_tokens)
                        if prev_prompt_tokens > 0
                        else 0
                    )
                    new_blocks = delta // BLOCK_TOKEN_BUDGET
                    new_ids = list(
                        range(global_block_counter, global_block_counter + new_blocks)
                    )
                    global_block_counter += new_blocks
                    cumulative_hash_ids = cumulative_hash_ids + new_ids

                trace_line = {
                    "chat_id": chat_id,
                    "parent_chat_id": parent_chat_id,
                    "timestamp": rec["ts"],
                    "turn": turn_idx,
                    "input_length": input_length,
                    "output_length": output_length,
                    "type": "chat",
                    "hash_ids": cumulative_hash_ids,
                }
                out.write(json.dumps(trace_line, separators=(",", ":")) + "\n")
                total_written += 1

    print(f"Converted {total_written} lines from {len(instances)} instances -> {dst}")


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(f"Usage: {sys.argv[0]} <input_audit.jsonl> <output_trace.jsonl>")
        sys.exit(1)
    convert(Path(sys.argv[1]), Path(sys.argv[2]))

scripts/run_all_experiments.sh Executable file

@@ -0,0 +1,73 @@
#!/bin/bash
# Run all 3 PD hybrid experiments sequentially
# Uses 52 sessions / 4,449 requests (10% sample of 497 sessions)
# Each experiment takes ~30-40 min
set -euo pipefail
cd "$(dirname "$0")/.."
TRACE="outputs/qwen35-swebench-50sess.jsonl"
MODEL="/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B"
OUTPUT="outputs/swebench-exps"
echo "=== Experiment A: pd-disaggregation ==="
uv run agentic-pd-hybrid benchmark-live \
--trace "$TRACE" \
--output-root "$OUTPUT" \
--mechanism pd-disaggregation \
--policy default \
--model-path "$MODEL" \
--prefill-workers 1 --decode-workers 1 \
--prefill-tp-size 4 --decode-tp-size 4 \
--prefill-gpu-ids 0,1,2,3 --decode-gpu-ids 4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300
echo "=== Experiment B: pd-colo ==="
uv run agentic-pd-hybrid benchmark-live \
--trace "$TRACE" \
--output-root "$OUTPUT" \
--mechanism pd-colo \
--policy default \
--model-path "$MODEL" \
--prefill-workers 0 --decode-workers 0 \
--direct-workers 2 --direct-tp-size 4 \
--direct-gpu-ids 0,1,2,3,4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300
echo "=== Experiment C: kvcache-centric ==="
uv run agentic-pd-hybrid benchmark-live \
--trace "$TRACE" \
--output-root "$OUTPUT" \
--mechanism kvcache-centric \
--policy default \
--model-path "$MODEL" \
--prefill-workers 1 --decode-workers 1 \
--prefill-tp-size 4 --decode-tp-size 4 \
--prefill-gpu-ids 0,1,2,3 --decode-gpu-ids 4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300 \
--kvcache-admission-mode worker \
--kvcache-seed-min-turn-id 2 \
--kvcache-prefill-backup-policy release-after-transfer \
--kvcache-prefill-priority-eviction
echo "=== All experiments complete ==="

scripts/run_exp_a_pd_disagg.sh Executable file

@@ -0,0 +1,24 @@
#!/bin/bash
# Experiment A: pd-disaggregation baseline
# 1P(GPU 0-3) + 1D(GPU 4-7), TP4, mooncake TCP
# Full 39K trace from SWE-Bench 500 instances
set -euo pipefail
cd "$(dirname "$0")/.."
uv run agentic-pd-hybrid benchmark-live \
--trace outputs/qwen35-swebench-500.jsonl \
--output-root outputs/swebench-exps \
--mechanism pd-disaggregation \
--policy default \
--model-path /mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B \
--prefill-workers 1 --decode-workers 1 \
--prefill-tp-size 4 --decode-tp-size 4 \
--prefill-gpu-ids 0,1,2,3 --decode-gpu-ids 4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 64 \
--timeout-s 900 \
--request-timeout-s 300


@@ -0,0 +1,23 @@
#!/bin/bash
# Experiment B1: Naive DP colocation — round-robin policy
# 2 direct workers (GPU 0-3, 4-7), TP4, DP router with round-robin
# No disaggregation — each worker does prefill+decode locally
set -euo pipefail
cd "$(dirname "$0")/.."
uv run agentic-pd-hybrid benchmark-live \
--trace outputs/qwen35-swebench-50sess.jsonl \
--output-root outputs/swebench-exps \
--mechanism pd-colo \
--policy default \
--model-path /mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B \
--prefill-workers 0 --decode-workers 0 \
--direct-workers 2 --direct-tp-size 4 \
--direct-gpu-ids 0,1,2,3,4,5,6,7 \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300


@@ -0,0 +1,23 @@
#!/bin/bash
# Experiment B2: Naive DP colocation — cache-aware (kv-aware) policy
# 2 direct workers (GPU 0-3, 4-7), TP4, DP router with consistent-hashing
# Replay kv-aware policy picks the worker with most prefix overlap
set -euo pipefail
cd "$(dirname "$0")/.."
uv run agentic-pd-hybrid benchmark-live \
--trace outputs/qwen35-swebench-50sess.jsonl \
--output-root outputs/swebench-exps \
--mechanism pd-colo \
--policy kv-aware \
--model-path /mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B \
--prefill-workers 0 --decode-workers 0 \
--direct-workers 2 --direct-tp-size 4 \
--direct-gpu-ids 0,1,2,3,4,5,6,7 \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300

scripts/run_exp_b_pd_colo.sh Executable file

@@ -0,0 +1,24 @@
#!/bin/bash
# Experiment B: pd-colo (direct/colocation)
# 2 direct workers (GPU 0-3, 4-7), TP4, no router
# Full 39K trace from SWE-Bench 500 instances
set -euo pipefail
cd "$(dirname "$0")/.."
uv run agentic-pd-hybrid benchmark-live \
--trace outputs/qwen35-swebench-500.jsonl \
--output-root outputs/swebench-exps \
--mechanism pd-colo \
--policy default \
--model-path /mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B \
--prefill-workers 0 --decode-workers 0 \
--direct-workers 2 --direct-tp-size 4 \
--direct-gpu-ids 0,1,2,3,4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 64 \
--timeout-s 900 \
--request-timeout-s 300


@@ -0,0 +1,28 @@
#!/bin/bash
# Experiment C: kvcache-centric (session-aware PD)
# 1P(GPU 0-3) + 1D(GPU 4-7), TP4, mooncake TCP
# Full 39K trace from SWE-Bench 500 instances
set -euo pipefail
cd "$(dirname "$0")/.."
uv run agentic-pd-hybrid benchmark-live \
--trace outputs/qwen35-swebench-500.jsonl \
--output-root outputs/swebench-exps \
--mechanism kvcache-centric \
--policy default \
--model-path /mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B \
--prefill-workers 1 --decode-workers 1 \
--prefill-tp-size 4 --decode-tp-size 4 \
--prefill-gpu-ids 0,1,2,3 --decode-gpu-ids 4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 64 \
--timeout-s 900 \
--request-timeout-s 300 \
--kvcache-admission-mode worker \
--kvcache-seed-min-turn-id 2 \
--kvcache-prefill-backup-policy release-after-transfer \
--kvcache-prefill-priority-eviction

scripts/smoke_test.sh Executable file

@@ -0,0 +1,30 @@
#!/bin/bash
# Smoke test: pd-disaggregation with mooncake TCP, 100 requests
set -euo pipefail
cd "$(dirname "$0")/.."
# Sample a small trace for smoke testing
uv run agentic-pd-hybrid sample-sessions \
--trace outputs/qwen35-swebench-500.jsonl \
--output outputs/qwen35-smoke-3sess.jsonl \
--session-sample-rate 0.02 \
--min-turns 5 \
--target-duration-s 300 \
--max-requests 100
# Run smoke test
uv run agentic-pd-hybrid benchmark-live \
--trace outputs/qwen35-smoke-3sess.jsonl \
--output-root outputs/smoke \
--mechanism pd-disaggregation \
--policy default \
--model-path /mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3.5-35B-A3B \
--prefill-workers 1 --decode-workers 1 \
--prefill-tp-size 4 --decode-tp-size 4 \
--prefill-gpu-ids 0,1,2,3 --decode-gpu-ids 4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300

scripts/sweep_kvc_qwen3_30b.sh Executable file

@@ -0,0 +1,60 @@
#!/bin/bash
# KVC admission control parameter sweep on Qwen3-30B
# 5 experiments, ~35 min each, ~3 hours total
set -euo pipefail
cd "$(dirname "$0")/.."
MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507
TRACE=outputs/qwen35-swebench-50sess.jsonl
OUTPUT=outputs/qwen3-30b-exps
VENV_PYTHON=.venv/bin/python
run_kvc() {
    local label=$1
    local inflight=$2
    local min_turn=$3
    echo "=== [$label] inflight=$inflight min_turn=$min_turn === $(date)"
    PYTHONPATH=src:third_party/sglang/python \
    $VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
        --trace $TRACE \
        --output-root $OUTPUT \
        --mechanism kvcache-centric \
        --policy default \
        --model-path $MODEL \
        --prefill-workers 1 --decode-workers 1 \
        --prefill-tp-size 4 --decode-tp-size 4 \
        --prefill-gpu-ids 0,1,2,3 --decode-gpu-ids 4,5,6,7 \
        --transfer-backend mooncake \
        --gpu-budget 8 \
        --time-scale 10 \
        --session-sample-rate 1.0 \
        --target-duration-s 100000 \
        --concurrency-limit 32 \
        --timeout-s 900 \
        --request-timeout-s 300 \
        --kvcache-admission-mode worker \
        --kvcache-seed-min-turn-id $min_turn \
        --kvcache-seed-max-inflight-decode $inflight \
        --kvcache-prefill-backup-policy release-after-transfer \
        --kvcache-prefill-priority-eviction
    echo "=== [$label] DONE === $(date)"
    echo ""
}
# C1: inflight=8, min-turn=2
run_kvc "C1" 8 2
# C2: inflight=16, min-turn=2
run_kvc "C2" 16 2
# C3: inflight=-1 (disabled), min-turn=2
run_kvc "C3" -1 2
# C4: inflight=8, min-turn=1
run_kvc "C4" 8 1
# C5: inflight=-1 (disabled), min-turn=1
run_kvc "C5" -1 1
echo "=== ALL SWEEP EXPERIMENTS DONE === $(date)"

scripts/sweep_tp1_configs.sh Executable file

@@ -0,0 +1,133 @@
#!/bin/bash
# TP1 configuration sweep: 8-way DP, 1P7D KVC, 2P6D KVC
# Qwen3-30B-A3B TP=1, single GPU per worker
# Most aggressive KVC admission: inflight=-1 (off), seed-min-turn=1
set -euo pipefail
cd "$(dirname "$0")/.."
MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507
TRACE=outputs/qwen35-swebench-50sess.jsonl
OUTPUT=outputs/qwen3-30b-tp1-exps
VENV_PYTHON=.venv/bin/python
RESULTS_FILE=$OUTPUT/sweep_results.txt
mkdir -p $OUTPUT
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a $RESULTS_FILE
}

save_result() {
    local label=$1
    local run_dir=$2
    log "=== $label COMPLETED ==="
    if [ -f "$run_dir/request-metrics.jsonl.summary.json" ]; then
        log "Summary:"
        cat "$run_dir/request-metrics.jsonl.summary.json" >> $RESULTS_FILE
        echo "" >> $RESULTS_FILE
        # Also copy summary to a named file for easy access
        cp "$run_dir/request-metrics.jsonl.summary.json" "$OUTPUT/${label}_summary.json"
        log "Saved to $OUTPUT/${label}_summary.json"
    else
        log "WARNING: No summary file found in $run_dir"
    fi
}
log "Starting TP1 configuration sweep"
log "Model: $MODEL"
log "Trace: $TRACE (4449 requests, 52 sessions)"
########################################
# Experiment 1: 8-way DP cache-aware
########################################
log ""
log "=== [EXP1] 8-way DP cache-aware (8 direct × TP1) ==="
PYTHONPATH=src:third_party/sglang/python \
$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
--trace $TRACE \
--output-root $OUTPUT \
--mechanism pd-colo \
--policy kv-aware \
--model-path $MODEL \
--prefill-workers 0 --decode-workers 0 \
--direct-workers 8 --direct-tp-size 1 \
--direct-gpu-ids 0,1,2,3,4,5,6,7 \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300
# Find latest run dir for this experiment
EXP1_DIR=$(ls -td $OUTPUT/pd-colo-kv-aware-*/ 2>/dev/null | head -1)
save_result "exp1_8way_dp_cache_aware" "$EXP1_DIR"
########################################
# Experiment 2: 1P + 7D KVC (most aggressive)
########################################
log ""
log "=== [EXP2] 1P7D KVC (inflight=off, min-turn=1) ==="
PYTHONPATH=src:third_party/sglang/python \
$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
--trace $TRACE \
--output-root $OUTPUT \
--mechanism kvcache-centric \
--policy default \
--model-path $MODEL \
--prefill-workers 1 --decode-workers 7 \
--prefill-tp-size 1 --decode-tp-size 1 \
--prefill-gpu-ids 0 --decode-gpu-ids 1,2,3,4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300 \
--kvcache-admission-mode worker \
--kvcache-seed-min-turn-id 1 \
--kvcache-seed-max-inflight-decode -1 \
--kvcache-prefill-backup-policy release-after-transfer \
--kvcache-prefill-priority-eviction
EXP2_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1)
save_result "exp2_1p7d_kvc_aggressive" "$EXP2_DIR"
########################################
# Experiment 3: 2P + 6D KVC (most aggressive)
########################################
log ""
log "=== [EXP3] 2P6D KVC (inflight=off, min-turn=1) ==="
PYTHONPATH=src:third_party/sglang/python \
$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
--trace $TRACE \
--output-root $OUTPUT \
--mechanism kvcache-centric \
--policy default \
--model-path $MODEL \
--prefill-workers 2 --decode-workers 6 \
--prefill-tp-size 1 --decode-tp-size 1 \
--prefill-gpu-ids 0,1 --decode-gpu-ids 2,3,4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300 \
--kvcache-admission-mode worker \
--kvcache-seed-min-turn-id 1 \
--kvcache-seed-max-inflight-decode -1 \
--kvcache-prefill-backup-policy release-after-transfer \
--kvcache-prefill-priority-eviction
EXP3_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1)
save_result "exp3_2p6d_kvc_aggressive" "$EXP3_DIR"
########################################
log ""
log "=== ALL TP1 SWEEP EXPERIMENTS DONE ==="

scripts/sweep_tp1_v2_fixed.sh (executable file, 131 lines added)

@@ -0,0 +1,131 @@
#!/bin/bash
# TP1 configuration sweep v2 — after session_params fix + audit fields
# Qwen3-30B-A3B TP=1, single GPU per worker
set -euo pipefail
cd "$(dirname "$0")/.."
MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507
TRACE=outputs/qwen35-swebench-50sess.jsonl
OUTPUT=outputs/qwen3-30b-tp1-v2-fixed
VENV_PYTHON=.venv/bin/python
RESULTS_FILE=$OUTPUT/sweep_results.txt
mkdir -p $OUTPUT
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a $RESULTS_FILE
}
save_result() {
local label=$1
local run_dir=$2
log "=== $label COMPLETED ==="
if [ -f "$run_dir/request-metrics.jsonl.summary.json" ]; then
log "Summary:"
cat "$run_dir/request-metrics.jsonl.summary.json" >> $RESULTS_FILE
echo "" >> $RESULTS_FILE
cp "$run_dir/request-metrics.jsonl.summary.json" "$OUTPUT/${label}_summary.json"
cp "$run_dir/request-metrics.jsonl" "$OUTPUT/${label}_metrics.jsonl"
log "Saved to $OUTPUT/${label}_summary.json + ${label}_metrics.jsonl"
else
log "WARNING: No summary file found in $run_dir"
fi
}
log "Starting TP1 v2 sweep (session_params fix + audit fields)"
log "Model: $MODEL"
log "Trace: $TRACE (4449 requests, 52 sessions)"
########################################
# Experiment 1: 8-way DP cache-aware
########################################
log ""
log "=== [EXP1] 8-way DP cache-aware (8 direct × TP1) ==="
PYTHONPATH=src:third_party/sglang/python \
$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
--trace $TRACE \
--output-root $OUTPUT \
--mechanism pd-colo \
--policy kv-aware \
--model-path $MODEL \
--prefill-workers 0 --decode-workers 0 \
--direct-workers 8 --direct-tp-size 1 \
--direct-gpu-ids 0,1,2,3,4,5,6,7 \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300
EXP1_DIR=$(ls -td $OUTPUT/pd-colo-kv-aware-*/ 2>/dev/null | head -1)
save_result "exp1_8way_dp_cache_aware" "$EXP1_DIR"
########################################
# Experiment 2: 1P + 7D KVC (aggressive)
########################################
log ""
log "=== [EXP2] 1P7D KVC (inflight=off, min-turn=1) ==="
PYTHONPATH=src:third_party/sglang/python \
$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
--trace $TRACE \
--output-root $OUTPUT \
--mechanism kvcache-centric \
--policy default \
--model-path $MODEL \
--prefill-workers 1 --decode-workers 7 \
--prefill-tp-size 1 --decode-tp-size 1 \
--prefill-gpu-ids 0 --decode-gpu-ids 1,2,3,4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300 \
--kvcache-admission-mode worker \
--kvcache-seed-min-turn-id 1 \
--kvcache-seed-max-inflight-decode -1 \
--kvcache-prefill-backup-policy release-after-transfer \
--kvcache-prefill-priority-eviction
EXP2_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1)
save_result "exp2_1p7d_kvc_aggressive" "$EXP2_DIR"
########################################
# Experiment 3: 2P + 6D KVC (aggressive)
########################################
log ""
log "=== [EXP3] 2P6D KVC (inflight=off, min-turn=1) ==="
PYTHONPATH=src:third_party/sglang/python \
$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
--trace $TRACE \
--output-root $OUTPUT \
--mechanism kvcache-centric \
--policy default \
--model-path $MODEL \
--prefill-workers 2 --decode-workers 6 \
--prefill-tp-size 1 --decode-tp-size 1 \
--prefill-gpu-ids 0,1 --decode-gpu-ids 2,3,4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300 \
--kvcache-admission-mode worker \
--kvcache-seed-min-turn-id 1 \
--kvcache-seed-max-inflight-decode -1 \
--kvcache-prefill-backup-policy release-after-transfer \
--kvcache-prefill-priority-eviction
EXP3_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1)
save_result "exp3_2p6d_kvc_aggressive" "$EXP3_DIR"
########################################
log ""
log "=== ALL TP1 V2 SWEEP EXPERIMENTS DONE ==="

scripts/sweep_tp1_v3_kvaware.sh (executable file, 108 lines added)

@@ -0,0 +1,108 @@
#!/bin/bash
# TP1 v3 sweep — KVC with kv-aware policy (fix routing mismatch)
# v2 used --policy default for KVC experiments, causing session routing
# mismatch: replay round-robin ≠ router round-robin → "session not found".
# v3 uses --policy kv-aware for KVC to ensure session affinity.
set -euo pipefail
cd "$(dirname "$0")/.."
MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507
TRACE=outputs/qwen35-swebench-50sess.jsonl
OUTPUT=outputs/qwen3-30b-tp1-v3-kvaware
VENV_PYTHON=.venv/bin/python
RESULTS_FILE=$OUTPUT/sweep_results.txt
mkdir -p $OUTPUT
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a $RESULTS_FILE
}
save_result() {
local label=$1
local run_dir=$2
log "=== $label COMPLETED ==="
if [ -f "$run_dir/request-metrics.jsonl.summary.json" ]; then
log "Summary:"
cat "$run_dir/request-metrics.jsonl.summary.json" >> $RESULTS_FILE
echo "" >> $RESULTS_FILE
cp "$run_dir/request-metrics.jsonl.summary.json" "$OUTPUT/${label}_summary.json"
cp "$run_dir/request-metrics.jsonl" "$OUTPUT/${label}_metrics.jsonl"
log "Saved to $OUTPUT/${label}_summary.json + ${label}_metrics.jsonl"
else
log "WARNING: No summary file found in $run_dir"
fi
}
log "Starting TP1 v3 sweep (KVC with kv-aware policy)"
log "Model: $MODEL"
log "Trace: $TRACE (4449 requests, 52 sessions)"
log "Key change: --policy kv-aware for KVC (was --policy default in v2)"
########################################
# Experiment 1: 1P + 7D KVC kv-aware
########################################
log ""
log "=== [EXP1] 1P7D KVC kv-aware ==="
PYTHONPATH=src:third_party/sglang/python \
$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
--trace $TRACE \
--output-root $OUTPUT \
--mechanism kvcache-centric \
--policy kv-aware \
--model-path $MODEL \
--prefill-workers 1 --decode-workers 7 \
--prefill-tp-size 1 --decode-tp-size 1 \
--prefill-gpu-ids 0 --decode-gpu-ids 1,2,3,4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300 \
--kvcache-admission-mode worker \
--kvcache-seed-min-turn-id 1 \
--kvcache-seed-max-inflight-decode -1 \
--kvcache-prefill-backup-policy release-after-transfer \
--kvcache-prefill-priority-eviction
EXP1_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1)
save_result "exp1_1p7d_kvc_kvaware" "$EXP1_DIR"
########################################
# Experiment 2: 2P + 6D KVC kv-aware
########################################
log ""
log "=== [EXP2] 2P6D KVC kv-aware ==="
PYTHONPATH=src:third_party/sglang/python \
$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
--trace $TRACE \
--output-root $OUTPUT \
--mechanism kvcache-centric \
--policy kv-aware \
--model-path $MODEL \
--prefill-workers 2 --decode-workers 6 \
--prefill-tp-size 1 --decode-tp-size 1 \
--prefill-gpu-ids 0,1 --decode-gpu-ids 2,3,4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300 \
--kvcache-admission-mode worker \
--kvcache-seed-min-turn-id 1 \
--kvcache-seed-max-inflight-decode -1 \
--kvcache-prefill-backup-policy release-after-transfer \
--kvcache-prefill-priority-eviction
EXP2_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1)
save_result "exp2_2p6d_kvc_kvaware" "$EXP2_DIR"
########################################
log ""
log "=== ALL TP1 V3 SWEEP EXPERIMENTS DONE ==="
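
The routing mismatch that the v3 script comments describe can be illustrated with a toy simulation. This is only a sketch under stated assumptions, not the project's code: `simulate_misroutes` and its parameters are hypothetical names, and it assumes the replay side gives each new session a home decode worker in round-robin order while the router, when it receives no target hint, advances its own round-robin cursor once per request.

```python
from __future__ import annotations


def simulate_misroutes(
    num_decode_workers: int,
    request_sessions: list[str],
    pin_target: bool,
) -> int:
    """Count requests that reach a decode worker other than the session's home.

    Hypothetical model: the replay side assigns a home worker per *session*,
    the router picks a worker per *request*.
    """
    replay_cursor = 0
    router_cursor = 0
    session_home: dict[str, int] = {}
    misroutes = 0

    for session_id in request_sessions:
        # Replay side: first request of a session opens it on the next worker.
        if session_id not in session_home:
            session_home[session_id] = replay_cursor % num_decode_workers
            replay_cursor += 1
        home = session_home[session_id]

        if pin_target:
            # Stand-in for honouring a target-worker hint such as
            # x-smg-target-worker: the router follows the replay's choice.
            routed = home
        else:
            # Independent round-robin: the cursor advances on every request.
            routed = router_cursor % num_decode_workers
            router_cursor += 1

        if routed != home:
            misroutes += 1
    return misroutes


if __name__ == "__main__":
    sequence = ["a", "b", "a", "b", "c", "a"]
    print("unpinned misroutes:", simulate_misroutes(3, sequence, pin_target=False))
    print("pinned misroutes:", simulate_misroutes(3, sequence, pin_target=True))
```

In this model the two cursors agree only until the first follow-up turn of an existing session; pinning the target removes the divergence entirely, which is the effect the switch to `--policy kv-aware` is meant to achieve.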

scripts/sweep_tp1_v4_cap16.sh (executable file, 108 lines added)

@@ -0,0 +1,108 @@
#!/bin/bash
# TP1 v4 sweep — KVC with kv-aware policy + soft_cap raised from 4 to 16
# v3 (kv-aware) fixed routing but session-cap fallback still dominated 52-65%
# of requests. Hardcoded min(4, ...) in _decode_session_soft_cap was the
# bottleneck — only 4*7=28 session slots for 52 trace sessions.
# v4 raises the cap to 16 (4*7=28 -> 16*7=112 slots).
set -euo pipefail
cd "$(dirname "$0")/.."
MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507
TRACE=outputs/qwen35-swebench-50sess.jsonl
OUTPUT=outputs/qwen3-30b-tp1-v4-cap16
VENV_PYTHON=.venv/bin/python
RESULTS_FILE=$OUTPUT/sweep_results.txt
mkdir -p $OUTPUT
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a $RESULTS_FILE
}
save_result() {
local label=$1
local run_dir=$2
log "=== $label COMPLETED ==="
if [ -f "$run_dir/request-metrics.jsonl.summary.json" ]; then
log "Summary:"
cat "$run_dir/request-metrics.jsonl.summary.json" >> $RESULTS_FILE
echo "" >> $RESULTS_FILE
cp "$run_dir/request-metrics.jsonl.summary.json" "$OUTPUT/${label}_summary.json"
cp "$run_dir/request-metrics.jsonl" "$OUTPUT/${label}_metrics.jsonl"
log "Saved to $OUTPUT/${label}_summary.json + ${label}_metrics.jsonl"
else
log "WARNING: No summary file found in $run_dir"
fi
}
log "Starting TP1 v4 sweep (KVC kv-aware, session soft_cap raised 4->16)"
log "Model: $MODEL"
log "Trace: $TRACE (4449 requests, 52 sessions)"
log "Key change: _decode_session_soft_cap now min(16, ...) instead of min(4, ...)"
########################################
# Experiment 1: 1P + 7D KVC kv-aware (cap=16)
########################################
log ""
log "=== [EXP1] 1P7D KVC kv-aware cap=16 ==="
PYTHONPATH=src:third_party/sglang/python \
$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
--trace $TRACE \
--output-root $OUTPUT \
--mechanism kvcache-centric \
--policy kv-aware \
--model-path $MODEL \
--prefill-workers 1 --decode-workers 7 \
--prefill-tp-size 1 --decode-tp-size 1 \
--prefill-gpu-ids 0 --decode-gpu-ids 1,2,3,4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300 \
--kvcache-admission-mode worker \
--kvcache-seed-min-turn-id 1 \
--kvcache-seed-max-inflight-decode -1 \
--kvcache-prefill-backup-policy release-after-transfer \
--kvcache-prefill-priority-eviction
EXP1_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1)
save_result "exp1_1p7d_kvc_cap16" "$EXP1_DIR"
########################################
# Experiment 2: 2P + 6D KVC kv-aware (cap=16)
########################################
log ""
log "=== [EXP2] 2P6D KVC kv-aware cap=16 ==="
PYTHONPATH=src:third_party/sglang/python \
$VENV_PYTHON -m agentic_pd_hybrid.cli benchmark-live \
--trace $TRACE \
--output-root $OUTPUT \
--mechanism kvcache-centric \
--policy kv-aware \
--model-path $MODEL \
--prefill-workers 2 --decode-workers 6 \
--prefill-tp-size 1 --decode-tp-size 1 \
--prefill-gpu-ids 0,1 --decode-gpu-ids 2,3,4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8 \
--time-scale 10 \
--session-sample-rate 1.0 \
--target-duration-s 100000 \
--concurrency-limit 32 \
--timeout-s 900 \
--request-timeout-s 300 \
--kvcache-admission-mode worker \
--kvcache-seed-min-turn-id 1 \
--kvcache-seed-max-inflight-decode -1 \
--kvcache-prefill-backup-policy release-after-transfer \
--kvcache-prefill-priority-eviction
EXP2_DIR=$(ls -td $OUTPUT/kvcache-centric-*/ 2>/dev/null | head -1)
save_result "exp2_2p6d_kvc_cap16" "$EXP2_DIR"
log ""
log "=== ALL TP1 V4 SWEEP EXPERIMENTS DONE ==="
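
The slot arithmetic in the v4 script header (4*7=28 versus 16*7=112 slots for 52 trace sessions) can be checked with a small sketch. The function names and the `hard_cap` parameter are hypothetical; the clamp mirrors the `max(1, min(cap, usable // target))` expression from the `_decode_session_soft_cap` hunk in this commit, and the token figures in the usage lines are illustrative values, not measurements.

```python
from __future__ import annotations


def decode_session_soft_cap(
    usable_capacity_tokens: int,
    target_tokens: int,
    hard_cap: int,
) -> int:
    """Per-worker session cap: capacity-derived, clamped to [1, hard_cap]."""
    if usable_capacity_tokens <= 0:
        # No capacity information: fall back to the hard cap.
        return hard_cap
    return max(1, min(hard_cap, usable_capacity_tokens // target_tokens))


def starved_sessions(
    decode_workers: int,
    per_worker_cap: int,
    trace_sessions: int,
) -> int:
    """Sessions that cannot get a decode slot when every worker is at its cap."""
    total_slots = decode_workers * per_worker_cap
    return max(0, trace_sessions - total_slots)


if __name__ == "__main__":
    # Illustrative capacity numbers (assumed, not taken from the experiments).
    for cap in (4, 16):
        per_worker = decode_session_soft_cap(1_000_000, 10_000, hard_cap=cap)
        print(cap, per_worker, starved_sessions(7, per_worker, 52))
```

With 7 decode workers, a cap of 4 leaves 24 of the 52 sessions without a slot, while a cap of 16 leaves none, provided the capacity-derived term does not bind first.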


@@ -119,6 +119,8 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts:
try:
signal.signal(signal.SIGINT, _handle_termination)
signal.signal(signal.SIGTERM, _handle_termination)
_mechanisms_with_router = {"pd-disaggregation", "kvcache-centric", "pd-colo"}
_naive_dp = config.mechanism_name == "pd-colo"
if config.launch_stack:
stack = launch_pd_stack(
topology=topology,
@@ -132,18 +134,19 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts:
else config.timeout_s
),
include_router=(
config.mechanism_name in {"pd-disaggregation", "kvcache-centric"}
config.mechanism_name in _mechanisms_with_router
),
naive_dp=_naive_dp,
)
router_url = (
stack.router_url
if config.mechanism_name in {"pd-disaggregation", "kvcache-centric"}
if config.mechanism_name in _mechanisms_with_router
else None
)
else:
router_url = (
topology.router_url
if config.mechanism_name in {"pd-disaggregation", "kvcache-centric"}
if config.mechanism_name in _mechanisms_with_router
else None
)


@@ -455,11 +455,18 @@ def main() -> None:
if args.command == "print-launch":
topology = _topology_from_args(args)
has_pd = bool(topology.prefill_workers and topology.decode_workers)
has_direct_only = bool(
topology.direct_workers
and not topology.prefill_workers
and not topology.decode_workers
)
plan = build_launch_plan(
topology,
prefill_policy=args.prefill_policy,
decode_policy=args.decode_policy,
include_router=bool(topology.prefill_workers and topology.decode_workers),
include_router=has_pd or has_direct_only,
naive_dp=has_direct_only,
)
print(plan.render())
return


@@ -34,7 +34,24 @@ def build_launch_plan(
decode_policy: str = "manual",
include_router: bool = True,
router_request_timeout_s: float | None = None,
naive_dp: bool = False,
) -> LaunchPlan:
router_command: tuple[str, ...] | None = None
if include_router:
if topology.prefill_workers and topology.decode_workers:
router_command = _build_router_command(
topology,
prefill_policy=prefill_policy,
decode_policy=decode_policy,
request_timeout_s=router_request_timeout_s,
)
elif naive_dp and topology.direct_workers:
router_command = _build_dp_router_command(
topology,
backend_policy=decode_policy,
request_timeout_s=router_request_timeout_s,
)
return LaunchPlan(
prefill_commands=tuple(
_build_server_command(topology, worker) for worker in topology.prefill_workers
@@ -43,24 +60,17 @@ def build_launch_plan(
_build_server_command(topology, worker) for worker in topology.decode_workers
),
direct_commands=tuple(
_build_server_command(topology, worker) for worker in topology.direct_workers
),
router_command=(
_build_router_command(
topology,
prefill_policy=prefill_policy,
decode_policy=decode_policy,
request_timeout_s=router_request_timeout_s,
)
if include_router and topology.prefill_workers and topology.decode_workers
else None
_build_server_command(topology, worker, naive_dp=naive_dp)
for worker in topology.direct_workers
),
router_command=router_command,
)
def _build_server_command(
topology: SingleNodeTopology,
worker: WorkerSpec,
naive_dp: bool = False,
) -> tuple[str, ...]:
command = [
sys.executable,
@@ -76,11 +86,15 @@ def _build_server_command(
str(worker.port),
"--base-gpu-id",
str(worker.gpu_id),
"--disaggregation-mode",
_disaggregation_mode_for(worker),
"--disaggregation-transfer-backend",
topology.transfer_backend,
]
# Naive DP direct workers: no disaggregation flags at all
if not (naive_dp and worker.role == "direct"):
command.extend([
"--disaggregation-mode",
_disaggregation_mode_for(worker),
"--disaggregation-transfer-backend",
topology.transfer_backend,
])
if worker.tp_size > 1:
command.extend(["--tp-size", str(worker.tp_size)])
if topology.trust_remote_code:
@@ -135,6 +149,32 @@ def _build_router_command(
return tuple(command)
def _build_dp_router_command(
    topology: SingleNodeTopology,
    *,
    backend_policy: str,
    request_timeout_s: float | None,
) -> tuple[str, ...]:
    command: list[str] = [
        sys.executable,
        "-B",
        "-u",
        "-m",
        "agentic_pd_hybrid.pd_router",
        "--host",
        topology.router_host,
        "--port",
        str(topology.router_port),
        "--backend-policy",
        backend_policy,
    ]
    if request_timeout_s is not None:
        command.extend(["--request-timeout-s", str(request_timeout_s)])
    for worker in topology.direct_workers:
        command.extend(["--backend", worker.url])
    return tuple(command)
def _render_named_command(name: str, command: tuple[str, ...]) -> str:
return f"# {name}\n" + " ".join(shlex.quote(part) for part in command)


@@ -43,6 +43,9 @@ class RequestMetrics:
ttft_s: float | None
tpot_s: float | None
error: str | None = None
actual_output_tokens: int | None = None
requested_output_tokens: int | None = None
finish_reason: str | None = None
@classmethod
def from_decision(
@@ -63,6 +66,9 @@ class RequestMetrics:
prefill_request_priority: int | None = None,
decode_request_priority: int | None = None,
error: str | None = None,
actual_output_tokens: int | None = None,
requested_output_tokens: int | None = None,
finish_reason: str | None = None,
) -> "RequestMetrics":
return cls(
request_id=request.request_id,
@@ -95,6 +101,9 @@ class RequestMetrics:
ttft_s=ttft_s,
tpot_s=tpot_s,
error=error,
actual_output_tokens=actual_output_tokens,
requested_output_tokens=requested_output_tokens,
finish_reason=finish_reason,
)
@@ -158,6 +167,17 @@ def write_summary_json(
str(key): value for key, value in sorted(decode_priorities.items())
},
"error_count": sum(1 for row in rows if row.error is not None),
"truncated_request_count": sum(
1
for row in rows
if row.actual_output_tokens is not None
and row.requested_output_tokens is not None
and row.requested_output_tokens > 1
and row.actual_output_tokens < row.requested_output_tokens * 0.5
),
"actual_output_tokens_stats": _stats(
[float(row.actual_output_tokens) for row in rows if row.actual_output_tokens is not None]
),
}
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
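
The `truncated_request_count` predicate added above can be read as a standalone function. The sketch below restates the same conditions for a single row; `is_truncated` is a hypothetical helper name that does not appear in the commit.

```python
from __future__ import annotations


def is_truncated(
    actual_output_tokens: int | None,
    requested_output_tokens: int | None,
) -> bool:
    """Return True when a request produced under half the tokens it asked for."""
    # Rows without token accounting are never counted as truncated.
    if actual_output_tokens is None or requested_output_tokens is None:
        return False
    # Single-token requests are excluded, matching the "> 1" guard.
    if requested_output_tokens <= 1:
        return False
    return actual_output_tokens < requested_output_tokens * 0.5


if __name__ == "__main__":
    rows = [(10, 100), (50, 100), (0, 1), (None, 100)]
    print(sum(is_truncated(actual, requested) for actual, requested in rows))
```

The 0.5 threshold means a response that stops exactly at half the requested length is not counted; only strictly shorter outputs are.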


@@ -74,8 +74,58 @@ class RouterState:
return idx
@dataclass
class DpRouterConfig:
    host: str
    port: int
    backend_urls: list[str]
    backend_policy: str = "round_robin"
    request_timeout_s: float = 1800.0


class DpRouterState:
    """DP (data-parallel) router: forward each request to exactly one backend."""

    def __init__(self, config: DpRouterConfig):
        if not config.backend_urls:
            raise ValueError("At least one backend worker is required")
        self.config = config
        self.cursor = 0
        self.sticky_map: dict[str, int] = {}

    def select_backend(self, headers: dict[str, str]) -> str:
        idx = self._select_index(headers)
        return self.config.backend_urls[idx]

    def _select_index(self, headers: dict[str, str]) -> int:
        target_worker = headers.get("x-smg-target-worker")
        routing_key = headers.get("x-smg-routing-key")
        if (
            self.config.backend_policy == "consistent_hashing"
            and target_worker is not None
        ):
            idx = int(target_worker)
            if 0 <= idx < len(self.config.backend_urls):
                return idx
        if self.config.backend_policy == "manual" and routing_key:
            cached = self.sticky_map.get(routing_key)
            if cached is not None:
                return cached
            idx = self.cursor % len(self.config.backend_urls)
            self.cursor += 1
            self.sticky_map[routing_key] = idx
            return idx
        idx = self.cursor % len(self.config.backend_urls)
        self.cursor += 1
        return idx
app = FastAPI()
router_state: RouterState | None = None
dp_state: DpRouterState | None = None
@app.get("/health")
@@ -85,6 +135,16 @@ async def health() -> Response:
@app.get("/health_generate")
async def health_generate() -> Response:
if dp_state is not None:
async with aiohttp.ClientSession() as session:
tasks = [
session.get(f"{url}/health_generate")
for url in dp_state.config.backend_urls
]
for response in asyncio.as_completed(tasks):
async with await response:
pass
return Response(status_code=200)
state = _require_state()
async with aiohttp.ClientSession() as session:
tasks = []
@@ -101,6 +161,11 @@ async def health_generate() -> Response:
@app.get("/v1/models")
async def models() -> ORJSONResponse:
if dp_state is not None:
async with aiohttp.ClientSession() as session:
async with session.get(f"{dp_state.config.backend_urls[0]}/v1/models") as resp:
payload = await resp.json()
return ORJSONResponse(payload, status_code=resp.status)
state = _require_state()
async with aiohttp.ClientSession() as session:
async with session.get(f"{state.config.prefill_urls[0][0]}/v1/models") as response:
@@ -147,6 +212,15 @@ async def _forward_to_backend(
headers: dict[str, str],
endpoint_name: str,
) -> Response:
# DP mode: forward to a single backend
if dp_state is not None:
return await _forward_to_dp_backend(
request_data=request_data,
headers=headers,
endpoint_name=endpoint_name,
)
# PD mode: coordinate prefill + decode
state = _require_state()
prefill_server, bootstrap_port, decode_server = state.select_pair(headers)
prefill_request, decode_request = _build_backend_requests(
@@ -186,6 +260,63 @@ async def _forward_to_backend(
)
async def _forward_to_dp_backend(
    *,
    request_data: dict,
    headers: dict[str, str],
    endpoint_name: str,
) -> Response:
    assert dp_state is not None
    backend_server = dp_state.select_backend(headers)
    cleaned = _strip_internal_fields(request_data)
    timeout_s = dp_state.config.request_timeout_s
    if request_data.get("stream", False):
        return StreamingResponse(
            _stream_dp_generate(
                request_data=cleaned,
                backend_server=backend_server,
                endpoint_name=endpoint_name,
                timeout_s=timeout_s,
            ),
            media_type="text/event-stream",
        )
    async with aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=timeout_s)
    ) as session:
        async with session.post(
            f"{backend_server}/{endpoint_name}", json=cleaned
        ) as response:
            body = await response.read()
            return Response(
                content=body,
                status_code=response.status,
                media_type=response.content_type,
            )


async def _stream_dp_generate(
    *,
    request_data: dict,
    backend_server: str,
    endpoint_name: str,
    timeout_s: float,
) -> AsyncIterator[bytes]:
    async with aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=timeout_s)
    ) as session:
        async with session.post(
            f"{backend_server}/{endpoint_name}", json=request_data
        ) as response:
            if response.status != HTTPStatus.OK:
                payload = await response.read()
                yield payload
                return
            async for chunk in response.content.iter_chunked(_STREAM_CHUNK_SIZE):
                yield chunk
async def _stream_generate(
*,
prefill_request: dict,
@@ -241,6 +372,12 @@ def _build_backend_requests(
prefill_request.update(bootstrap_payload)
decode_request.update(bootstrap_payload)
# session_params is only meaningful for the decode worker (streaming session
# KV reuse). Sending it to the prefill worker causes the D side to
# short-circuit with local-prefill on already-open sessions, returning
# truncated responses while P's KV transfer gets aborted.
prefill_request.pop("session_params", None)
if prefill_priority is not None:
prefill_request["priority"] = int(prefill_priority)
if decode_priority is not None:
@@ -262,7 +399,7 @@ def _require_state() -> RouterState:
def main() -> None:
parser = argparse.ArgumentParser(description="Minimal local PD router")
parser = argparse.ArgumentParser(description="Minimal local PD / DP router")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=8000)
parser.add_argument(
@@ -270,30 +407,58 @@ def main() -> None:
nargs=2,
metavar=("URL", "BOOTSTRAP_PORT"),
action="append",
required=True,
default=None,
)
parser.add_argument(
"--decode",
action="append",
required=True,
default=None,
)
parser.add_argument("--prefill-policy", default="round_robin")
parser.add_argument("--decode-policy", default="manual")
parser.add_argument(
"--backend",
action="append",
default=None,
help="Backend URL for DP (data-parallel) mode. Repeat for each worker.",
)
parser.add_argument(
"--backend-policy",
default="round_robin",
help="Routing policy for DP mode: round_robin, manual, consistent_hashing.",
)
parser.add_argument("--request-timeout-s", type=float, default=1800.0)
args = parser.parse_args()
global router_state
router_state = RouterState(
RouterConfig(
host=args.host,
port=args.port,
prefill_urls=[(url, int(port)) for url, port in args.prefill],
decode_urls=list(args.decode),
prefill_policy=args.prefill_policy,
decode_policy=args.decode_policy,
request_timeout_s=args.request_timeout_s,
global router_state, dp_state
if args.backend:
# DP mode: simple forward to one of N backends
dp_state = DpRouterState(
DpRouterConfig(
host=args.host,
port=args.port,
backend_urls=list(args.backend),
backend_policy=args.backend_policy,
request_timeout_s=args.request_timeout_s,
)
)
)
elif args.prefill and args.decode:
# PD mode: prefill/decode coordination
router_state = RouterState(
RouterConfig(
host=args.host,
port=args.port,
prefill_urls=[(url, int(port)) for url, port in args.prefill],
decode_urls=list(args.decode),
prefill_policy=args.prefill_policy,
decode_policy=args.decode_policy,
request_timeout_s=args.request_timeout_s,
)
)
else:
parser.error("Either --backend (DP mode) or both --prefill and --decode (PD mode) are required")
uvicorn.run(app, host=args.host, port=args.port, log_level="info")
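
The `session_params` handling added to `_build_backend_requests` can be shown in isolation. This is a minimal sketch assuming the prefill and decode requests start as copies of the same client payload; `split_pd_requests` is a hypothetical name, and the `bootstrap_room` key in the usage lines is a placeholder for whatever the real bootstrap payload contains.

```python
from __future__ import annotations

import copy


def split_pd_requests(
    request_data: dict,
    bootstrap_payload: dict,
) -> tuple[dict, dict]:
    """Build (prefill_request, decode_request) from one client payload."""
    prefill_request = copy.deepcopy(request_data)
    decode_request = copy.deepcopy(request_data)

    # Both sides receive the same bootstrap fields.
    prefill_request.update(bootstrap_payload)
    decode_request.update(bootstrap_payload)

    # session_params only matters to the decode worker, so it is removed
    # from the prefill request and kept on the decode request.
    prefill_request.pop("session_params", None)
    return prefill_request, decode_request


if __name__ == "__main__":
    payload = {
        "text": "hello",
        "session_params": {"id": "session-1"},  # illustrative shape only
    }
    prefill, decode = split_pd_requests(payload, {"bootstrap_room": 42})
    print("prefill keys:", sorted(prefill))
    print("decode keys:", sorted(decode))
```

Using `pop` with a default keeps the call safe for requests that carry no `session_params` at all, which matches the form used in the hunk.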


@@ -124,6 +124,9 @@ class ExecutionResult:
prefill_request_priority: int | None = None
decode_request_priority: int | None = None
error: str | None = None
actual_output_tokens: int | None = None
requested_output_tokens: int | None = None
finish_reason: str | None = None
async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
@@ -274,6 +277,9 @@ async def _run_request(
prefill_request_priority=execution.prefill_request_priority,
decode_request_priority=execution.decode_request_priority,
error=execution.error,
actual_output_tokens=execution.actual_output_tokens,
requested_output_tokens=execution.requested_output_tokens,
finish_reason=execution.finish_reason,
)
@@ -286,7 +292,7 @@ async def _invoke_router(
session_id: str | None = None,
prefill_request_priority: int | None = None,
decode_request_priority: int | None = None,
) -> tuple[float, float | None, float | None, int]:
) -> GenerateResult:
headers = _build_headers(
request=request,
header_mode=config.header_mode,
@@ -414,6 +420,18 @@ async def _invoke_chat_completion(
return latency_s, ttft_s, tpot_s, cached_tokens
@dataclass(frozen=True)
class GenerateResult:
    latency_s: float
    ttft_s: float | None
    tpot_s: float | None
    cached_tokens: int
    actual_output_tokens: int
    requested_output_tokens: int
    finish_reason: str | None
    server_meta_info: dict | None
async def _invoke_generate(
*,
client: httpx.AsyncClient,
@@ -423,12 +441,16 @@ async def _invoke_generate(
timeout_s: float,
stream_idle_timeout_s: float | None,
stream: bool,
) -> tuple[float, float | None, float | None, int]:
) -> GenerateResult:
start = time.perf_counter()
ttft_s: float | None = None
cached_tokens = 0
sampling_params = payload.get("sampling_params", {})
generated_tokens = int(sampling_params.get("max_new_tokens", 1))
requested_output_tokens = int(sampling_params.get("max_new_tokens", 1))
actual_token_count = 0
finish_reason: str | None = None
last_meta_info: dict | None = None
if stream:
async with client.stream(
"POST",
@@ -452,8 +474,19 @@ async def _invoke_generate(
if isinstance(error, dict):
raise ValueError(error.get("message", json.dumps(error)))
cached_tokens = max(cached_tokens, _extract_generate_cached_tokens(parsed))
if _contains_generate_token(parsed) and ttft_s is None:
ttft_s = time.perf_counter() - start
if _contains_generate_token(parsed):
actual_token_count += 1
if ttft_s is None:
ttft_s = time.perf_counter() - start
meta_info = parsed.get("meta_info")
if isinstance(meta_info, dict):
last_meta_info = meta_info
completion_tokens = int(meta_info.get("completion_tokens", 0))
if completion_tokens > actual_token_count:
actual_token_count = completion_tokens
fr = meta_info.get("finish_reason")
if fr is not None:
finish_reason = str(fr)
if _is_generate_terminal_chunk(parsed):
break
else:
@@ -469,15 +502,33 @@ async def _invoke_generate(
if isinstance(error, dict):
raise ValueError(error.get("message", json.dumps(error)))
cached_tokens = _extract_generate_cached_tokens(parsed)
meta_info = parsed.get("meta_info")
if isinstance(meta_info, dict):
last_meta_info = meta_info
actual_token_count = int(meta_info.get("completion_tokens", 0))
finish_reason = meta_info.get("finish_reason")
latency_s = time.perf_counter() - start
if stream and ttft_s is None and generated_tokens > 0:
if stream and ttft_s is None and requested_output_tokens > 0:
raise RuntimeError("generate stream ended before producing any token")
# Use the actual token count for TPOT; fall back to the requested count only when no tokens were counted.
effective_tokens = actual_token_count if actual_token_count > 0 else max(1, requested_output_tokens)
if ttft_s is None:
tpot_s = None
else:
tpot_s = max(0.0, latency_s - ttft_s) / max(1, generated_tokens)
return latency_s, ttft_s, tpot_s, cached_tokens
tpot_s = max(0.0, latency_s - ttft_s) / effective_tokens
return GenerateResult(
latency_s=latency_s,
ttft_s=ttft_s,
tpot_s=tpot_s,
cached_tokens=cached_tokens,
actual_output_tokens=actual_token_count,
requested_output_tokens=requested_output_tokens,
finish_reason=finish_reason,
server_meta_info=last_meta_info,
)
async def _open_streaming_session(
@@ -850,8 +901,8 @@ def _decode_session_soft_cap(
- residency.headroom_tokens.get(server_url, 0),
)
if usable_capacity_tokens <= 0:
return 4
return max(1, min(4, usable_capacity_tokens // target_tokens))
return 16
return max(1, min(16, usable_capacity_tokens // target_tokens))
def _should_admit_new_decode_session(
@@ -1587,7 +1638,7 @@ async def _invoke_plain_router(
config=config,
direct_to_d_predicted=False,
)
latency_s, ttft_s, tpot_s, cached_tokens = await _invoke_router(
gen = await _invoke_router(
client=client,
request=request,
config=config,
@@ -1598,13 +1649,16 @@ async def _invoke_plain_router(
execution_mode=execution_mode,
actual_kv_transfer_blocks=decision.kv_transfer_blocks,
effective_input_length=request.input_length,
cached_tokens=cached_tokens,
cached_tokens=gen.cached_tokens,
prefill_request_priority=prefill_priority,
session_reused=False,
session_reset=False,
latency_s=latency_s,
ttft_s=ttft_s,
tpot_s=tpot_s,
latency_s=gen.latency_s,
ttft_s=gen.ttft_s,
tpot_s=gen.tpot_s,
actual_output_tokens=gen.actual_output_tokens,
requested_output_tokens=gen.requested_output_tokens,
finish_reason=gen.finish_reason,
)
@@ -1676,7 +1730,7 @@ async def _invoke_kvcache_seeded_router(
decode_session.opened = True
decode_session_newly_opened = True
decode_session.active_requests += 1
latency_s, ttft_s, tpot_s, cached_tokens = await _invoke_router(
gen = await _invoke_router(
client=client,
request=request,
config=config,
@@ -1742,13 +1796,16 @@ async def _invoke_kvcache_seeded_router(
execution_mode=execution_mode,
actual_kv_transfer_blocks=decision.kv_transfer_blocks,
effective_input_length=request.input_length,
cached_tokens=cached_tokens,
cached_tokens=gen.cached_tokens,
prefill_request_priority=prefill_priority,
session_reused=False,
session_reset=False,
latency_s=latency_s,
ttft_s=ttft_s,
tpot_s=tpot_s,
latency_s=gen.latency_s,
ttft_s=gen.ttft_s,
tpot_s=gen.tpot_s,
actual_output_tokens=gen.actual_output_tokens,
requested_output_tokens=gen.requested_output_tokens,
finish_reason=gen.finish_reason,
)
@@ -1774,14 +1831,16 @@ async def _execute_request(
)
if config.mechanism_name == "pd-colo":
return await _invoke_direct(
if not config.router_url:
raise ValueError("router_url is required for pd-colo replay")
result = await _invoke_plain_router(
client=client,
request=request,
config=config,
decision=decision,
direct_sessions=direct_sessions,
direct_session_lock=direct_session_lock,
execution_mode="dp-colo-router",
)
return replace(result, actual_kv_transfer_blocks=0)
if config.mechanism_name == "kvcache-centric":
if not config.router_url:
@@ -2238,7 +2297,7 @@ async def _invoke_session_direct(
session.active_requests += 1
try:
latency_s, ttft_s, tpot_s, cached_tokens = await _invoke_generate(
gen = await _invoke_generate(
client=client,
base_url=session.server_url,
headers={"x-request-id": request.request_id},
@@ -2277,12 +2336,15 @@ async def _invoke_session_direct(
execution_mode=execution_mode,
actual_kv_transfer_blocks=0,
effective_input_length=len(input_ids),
cached_tokens=cached_tokens,
cached_tokens=gen.cached_tokens,
session_reused=session_reused,
session_reset=session_reset,
latency_s=latency_s,
ttft_s=ttft_s,
tpot_s=tpot_s,
latency_s=gen.latency_s,
ttft_s=gen.ttft_s,
tpot_s=gen.tpot_s,
actual_output_tokens=gen.actual_output_tokens,
requested_output_tokens=gen.requested_output_tokens,
finish_reason=gen.finish_reason,
)

@@ -66,6 +66,7 @@ def launch_pd_stack(
timeout_s: float = 1200.0,
router_request_timeout_s: float | None = None,
include_router: bool = True,
naive_dp: bool = False,
) -> ManagedPdStack:
run_dir.mkdir(parents=True, exist_ok=True)
logs_dir = run_dir / "logs"
@@ -77,6 +78,7 @@ def launch_pd_stack(
decode_policy=decode_policy,
include_router=include_router,
router_request_timeout_s=router_request_timeout_s,
naive_dp=naive_dp,
)
prefill_processes = [
@@ -195,6 +197,9 @@ def _build_process_env(topology: SingleNodeTopology) -> dict[str, str]:
env["MC_MS_AUTO_DISC"] = "0"
if topology.ib_device:
env["MOONCAKE_DEVICE"] = topology.ib_device
elif topology.transfer_backend == "mooncake":
# Default to TCP when RDMA is not forced (e.g. loopback on same node)
env.setdefault("MOONCAKE_PROTOCOL", "tcp")
repo_root = Path(__file__).resolve().parents[2]
python_paths = [

@@ -189,10 +189,11 @@ class MooncakeTransferEngine:
device_name if device_name is not None else "",
)
else:
protocol = os.environ.get("MOONCAKE_PROTOCOL", "rdma")
ret_value = self.engine.initialize(
hostname,
"P2PHANDSHAKE",
"rdma",
protocol,
device_name if device_name is not None else "",
)
if ret_value != 0: