2 Commits

Author SHA1 Message Date
7568e041ff docs(kvc): record real Ali KVC experiment results 2026-05-12 05:28:06 +00:00
4e8f943875 feat(kvc): add real Ali replay workflow 2026-05-12 05:28:00 +00:00
6 changed files with 1415 additions and 21 deletions

View File

@@ -0,0 +1,514 @@
# Real Ali KVC 实验日志
**分支**`kvc-real-ali-iter-v1`,从 `kvc-debug-journey-v1-to-v4` checkout 出来。
**日期**2026-05-11/12。
**环境**:单机 8x NVIDIA H20SGLang xPyD模型 `/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct`
**真实 trace**`/home/admin/cpfs/wjh/ali-trace/trace-qwen3-coder-formatted/041715-041717.jsonl`
本日志记录真实 Ali workload 上的 KVC pd-hybrid 迭代。结论只按当前证据成立;`time-scale=10` smoke 和 KVC-friendly slice 不作为 full workload headline。
## 1. 当前最新进展
已新增真实 Ali trace 的固定样本和 sweep 管线:
- `scripts/prepare_real_ali_samples.py`:从真实 Ali trace 生成可复现实验样本,保留真实 input/output/hash_ids/timestamp可选择 rebase timestamp。
- `scripts/sweep_real_ali_kvc.sh`:对同一 prebuilt sample 依次跑 DP cache-aware、PD-disaggregation、KVC、KVC+backpressure。
- `benchmark-live --use-trace-as-sample`:直接 replay 指定 trace避免不同策略重新采样导致不可比。
- `replay-progress.jsonl` heartbeat后续长跑会每 30s 写客户端侧进度,不轮询 `/server_info`,避免扰动 scheduler。
- `prepare_real_ali_samples.py --max-sampled-duration-s`:为快速 smoke 生成 capped sample只用于迭代不用于 headline。
已经完成的真实 Ali KVC-fit smoke
- 样本:`outputs/real-ali-kvc-iter/samples-balanced/ali-kvc-fit-smallappend.jsonl`
- 179 requests64 sessions全部 multi-turnturn2+ 共 115 个direct-eligible ratio 100%。
- `time-scale=10`concurrency 32。
- DP cache-aware、PD-disaggregation、KVC no-backpressure、KVC+backpressure 均已完成。
## 2. 全量 Ali trace 画像
`outputs/real-ali-kvc-iter/ali-full-profile.json` 显示:
| 指标 | 数值 |
|---|---:|
| requests | 763,727 |
| sessions | 555,905 |
| multi-turn sessions | 39,247 |
| turn2+ requests | 207,822 |
| turn2+ direct-eligible ratio | 82.95% |
| input p50 / p90 / p99 | 4,329 / 51,067 / 112,955 tokens |
| output p50 / p90 / p99 | 93 / 826 / 5,616 tokens |
| append p50 / p90 / p99 | 303 / 2,879 / 17,885 tokens |
| inter-turn gap p50 / p90 / p99 | 4.65s / 38.68s / 1,133s |
这个 profile 说明 KVC 有真实适用面turn2+ 的 hash overlap 和小 append 很常见。但 full workload 里 single-turn session 极多KVC 收益会被显著稀释;因此必须分 slice 报告,不能只报 KVC-fit 子集。
## 3. 已跑样本
### Continuous 15min cold-window session sample
路径:`outputs/real-ali-kvc-iter/samples-window-900s-600req/ali-window.jsonl`
- 600 requests439 sessions32 multi-turn sessions。
- rebased duration886.544s,覆盖约 15min。
- turn2+ requests161direct-eligible143ratio 88.8%。
- input p50 / p90 / p993,871 / 68,234 / 98,131。
- output p50 / p90 / p9985 / 712 / 5,195。
- append p50 / p90 / p99274 / 2,202 / 16,120。
- inter-turn gap p50 / p90 / p994.656s / 19.376s / 63.575s。
这是对 179-request KVC-fit smoke 的替代验证样本。它按 900s 窗口分成 15 个时间桶,轮转选择窗口内从 root 开始的整 session直到达到 600 requests。这样避免 parent 缺失导致 `load_trace()` 把真实 session 切碎,也让请求覆盖整个 15min而不是只取窗口开头 600 条。
重要边界:它是 **cold-window / new-session-only** sample不是完整 raw production window它排除了窗口开始前已经活跃的 ongoing sessions。因此可以用于“600+ 请求、15min、真实混合负载”的稳定性验证但不能单独代表全量 Ali production window。
### KVC-fit small append
路径:`outputs/real-ali-kvc-iter/samples-balanced/ali-kvc-fit-smallappend.jsonl`
- 179 requests64 sessions。
- input p50 / p906,446 / 15,491。
- output p50 / p90112 / 1,159。
- append p50 / p90215 / 855。
- overlap ratio p50 / p900.875 / 0.938。
这是 KVC-friendly slice用来验证机制上限和 microbenchmark 是否能迁移到真实 token/hash 序列。
### Representative-mt / early multi-turn balanced
路径:`outputs/real-ali-kvc-iter/samples-balanced/ali-representative-mt.jsonl`
- 460 requests64 sessions。
- input p50 / p9041,175 / 98,621。
- append p50 / p90 / p99272 / 1,979 / 13,900。
这个样本更接近真实 multi-turn 压力,后续用于验证大上下文、大 resident KV 下是否仍能稳定。但它当前实现是“从 start_time 后取最早 64 个 multi-turn session”不是严格随机或分层 representative正式 headline 需要按 input/append/output/gap 分层抽样。
### Capped smoke samples
为避免少数真实长 gap 让 smoke 浪费大量 wall time新增
- `outputs/real-ali-kvc-iter/samples-balanced-cap120s/ali-kvc-fit-smallappend.jsonl`177 requests64 sessionsduration 65.859s。
- `outputs/real-ali-kvc-iter/samples-balanced-cap120s/ali-representative-mt.jsonl`359 requests64 sessionsduration 117.366s。
这些样本去掉了 KVC-fit 原样本末尾 timestamp 3613s 和 5414s 的两个请求,因此只能用于快速工程迭代;正式对比仍应使用完整样本或真实连续窗口。
## 4. 当前结果
### 4.1 DP cache-aware vs KVC+backpressure, KVC-fit, time-scale=10
| 策略 | Requests | Errors | Trunc | E2E mean | E2E p50 | E2E p90 | E2E p99 | TTFT mean | TTFT p50 |
|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| 8-way DP cache-aware | 179 | 0 | 0 | 6.603s | 3.126s | 17.639s | 34.582s | 1.112s | 1.052s |
| KVC 2P6D + worker admission + backpressure | 179 | 0 | 0 | 4.443s | 2.076s | 13.288s | 21.202s | 0.700s | 0.154s |
Paired comparisonKVC - DP
- overall E2E mean delta-2.161sp50 delta-1.427s152/179 wins。
- turn2+ direct 子集mean delta -2.503sp50 delta -1.508s103/115 wins。
- turn2+ TTFT mean delta-0.930sp50 delta -0.887s。
执行路径:
- KVC turn1 seed64 requests。
- `kvcache-direct-to-d-session`115 requests。
- session reused115。
- actual KV transfer blocks623。
结构日志:
- admission probes179全为 `ok`
- transfer queue depthp50=0p90=2max=3。
- backpressure event0。
解释:这轮证明的是 **KVC direct-to-D/session reuse** 在真实 Ali KVC-fit slice 上有正信号;不是证明 backpressure 有效,因为没有触发 backpressure。
### 4.2 PD-disaggregation baseline, KVC-fit, time-scale=10
| 策略 | Requests | Errors | Trunc | E2E mean | E2E p50 | E2E p90 | E2E p99 | TTFT mean | TTFT p50 |
|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| PD-disaggregation 2P6D | 179 | 0 | 0 | 7.850s | 6.306s | 15.192s | 22.405s | 4.994s | 5.336s |
Paired comparisonPD - DP
- overall E2E mean delta+1.247s。
- p50 delta+2.231s。
- 46/179 faster133/179 slower。
解释:在这个 KVC-fit slice 上,普通 PD-disaggregation 明显弱于 8-way DP cache-aware。它付出了 P->D transfer 和拆分调度成本,却没有 KVC direct-to-D 的 bypass 收益。
### 4.3 KVC no-backpressure 消融, KVC-fit, time-scale=10
| 策略 | Requests | Errors | Trunc | E2E mean | E2E p50 | E2E p90 | E2E p99 | TTFT mean | TTFT p50 |
|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| KVC 2P6D worker admission, no backpressure | 179 | 0 | 0 | 4.404s | 1.936s | 13.200s | 21.326s | 0.604s | 0.139s |
Paired comparison
- KVC no-BP vs DPmean delta -2.200sp50 delta -1.434s153/179 wins。
- KVC no-BP vs PD-disaggregationmean delta -3.447sp50 delta -3.514s163/179 wins。
- KVC no-BP vs KVC+BPmean delta -0.039sp50 delta -0.005s92/179 wins。
结构分析:
- direct-to-D rate64.25%。
- admission probes179全为 `ok`
- transfer queue depthp50=0p90=2max=3。
- pause_ms 全 0backpressure event 0。
解释no-backpressure 与 +backpressure 几乎等价,说明本 slice 没有 D 压力;本轮提升来自 direct-to-D不来自反压。
### 4.4 Continuous 15min / 600-request window, time-scale=1
样本:`outputs/real-ali-kvc-iter/samples-window-900s-600req/ali-window.jsonl`
重要边界:这是 cold-window / new-session-only session sample不是完整 raw window。它覆盖约 15min`missing_parent_count=0`,但排除了窗口开始前已活跃的 ongoing sessions。
运行结果:
| 策略 | Requests | Errors | Trunc | E2E mean | E2E p50 | E2E p90 | E2E p99 | TTFT mean | TTFT p50 | TTFT p90 |
|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| DP cache-aware 8-way | 600 | 1 | 0 | 13.942s | 5.222s | 29.299s | 151.183s | 6.162s | 1.746s | 19.176s |
| PD-disaggregation 2P6D | 600 | 1 | 0 | 40.886s | 40.018s | 84.681s | 113.460s | 38.545s | 37.782s | 81.852s |
| KVC 2P6D mem_fraction_static=0.82 | 600 | 53 | 0 | 12.386s | 4.225s | 37.998s | 78.234s | 10.078s | 2.674s | 27.774s |
KVC 默认启动失败:
- 默认 KVC 2P6D 在 H20 上两次启动 OOM均未进入 replay。
- 日志显示 decode/prefill worker 启动时只剩约 526MB模型加载阶段 OOM。
- `--load-format layered` 不支持 Qwen3-Coder-30B-A3B。
- 使用 `--mem-fraction-static 0.82` 后 KVC 能启动并完成 replay但这降低了 KV pool 容量,因此这轮 KVC 是 memory-constrained rerun。
- 尝试 `KVC_SEED_MIN_TURN_ID=2` + `mem_fraction_static=0.82` 时,启动阶段 scheduler 被 SIGKILL疑似 OS OOM killer未进入 replay。
Paired comparison只在两边都有 latency 的 547 个 paired request 上计算):
- KVC vs DPmean delta -1.335sp50 delta -0.055sp90 delta +19.371s284 wins / 263 losses。
- KVC vs PDmean delta -28.341sp50 delta -25.687sp90 delta +2.834s465 wins / 82 losses。
KVC 结构数据:
- execution modes388 `pd-router-turn1-seed`90 `kvcache-direct-to-d-session`67 `pd-router-fallback-large-append-session-cap`1 `pd-router-large-append-reseed`1 `pd-router-turn1-d-backpressure`53 `kvcache-centric` error rows。
- direct-to-D rate15.0%。
- direct-to-D session 分布413/439 sessions 在 0-20% direct rate只有 6 sessions 在 80-100%。
- admission probes533reason `ok` 531`no-space` 2queue depth p50=0p90=2max=5。
- pause hint 非零 20 次,但没有 backpressure event因为本轮 no-BP。
KVC error breakdown
- 50 `ReadTimeout`
- 2 `HTTPStatusError 400 Bad Request` on `open_session`
- 1 context length error同 DP/PD 的 `input_length=310521 > 262144`
- 错误主要集中在 turn150 turn13 turn2+。
解释:
1. KVC 相对普通 PD 仍明显更好,说明普通 P->D disaggregation 在真实 600-request 窗口上成本很高。
2. KVC 相对 DP 只在 clean request 的 mean/p50 上有小幅正信号,但 p90 变差,而且 error_count 从 DP 的 1 增到 53。
3. 因此在这个 600-request / 15min window 上,**KVC 不能算稳定提升系统**。主要问题不是 direct-to-D 快路径无效,而是该快路径覆盖率只有 15%,并且 turn1 seed / session admission / memory-constrained KV pool 引入大量 timeout。
4. 这直接修正 179-request KVC-fit smoke 的结论:小样本证明 KVC 适用 slice 存在600-request mixed window 证明当前实现还不能稳定服务真实混合 workload。
## 5. 是否已经相对 pd-colocation/pd-disaggregation 取得提升
当前只能下这个限定结论:
1. **相对 PD-disaggregation已经取得清晰提升。**
PD-disaggregation p50 6.306sKVC no-BP p50 1.936sKVC+BP p50 2.076sTTFT p50 5.336s vs 0.139s/0.154s。收益主要来自 turn2+ 直接打到已有 D session避免每轮 P 全量 prefill 和 P->D KV transfer。
2. **相对强 DP cache-aware在 KVC-fit slice 上有提升。**
KVC no-BP 和 KVC+BP overall mean/p50/p90/p99 都优于 DP并且 paired wins 分别是 153/179 和 152/179。但这是 KVC-friendly、全 multi-turn、turn2+ 100% direct-eligible 的 slice不代表 full Ali workload。
3. **相对 full workload尚未证明。**
全量 Ali 里 single-turn 占多数,且长上下文和长尾 output 较多。KVC 的收益面会被 single-turn 稀释D resident KV 容量和 tail 稳定性会成为更强约束。
4. **相对 600-request / 15min mixed window尚未取得稳定提升。**
KVC clean E2E mean/p50 有正信号,但 error_count=53/600p90 paired delta 相对 DP 变差。按“E2E + error/truncation”标准这不能算系统性胜出。
## 6. 提升来自哪里
主要收益链路:
1. turn1 seed 在 D 上建立 session。
2. turn2+ 若 append 小、hash overlap 高,直接走 `kvcache-direct-to-d-session`
3. direct-to-D 避免 P worker 参与,不走 P->D KV transfer。
4. D 只对 append suffix 做少量 prefill已有前缀 KV 直接复用。
这带来两个可观测收益:
- TTFT 大幅下降turn2+ direct 子集 TTFT mean 从 DP 的约 1.04s 降到约 0.112s。
- E2E 下降direct 子集 mean E2E 降低约 2.50s。
另外KVC 的 cached_tokens 统计显著更高KVC mean cached tokens 5,992DP mean 228。这说明它确实复用了大段真实前缀 KV。
## 7. 遇到的问题与修复
### 问题 1通用 sampler 会被单个长 session 主导
现象:真实 Ali session 分布长尾明显duration-oriented 采样容易选出不均衡样本,导致策略比较不可重复或不代表多 session 竞争。
修复:新增 `scripts/prepare_real_ali_samples.py`,按 session 上限和每 session turn 上限生成 balanced sample并保留真实 token/hash/timestamp。
### 问题 2不同策略重新采样导致不可比
现象:`benchmark-live` 原本会按参数重新采样,不同策略可能 replay 不同请求。
修复:新增 `--use-trace-as-sample`,所有策略 copy 并 replay 同一个 prebuilt sample后续 paired comparison 才有意义。
### 问题 3长 trace replay 中途没有进度
现象:`request-metrics.jsonl` 和 summary 只在 replay 结束后写出,跑真实 pacing 时很难判断是正常等待还是卡住。
修复:新增 `replay-progress.jsonl` heartbeat每 30s 写 submitted/completed/inflight/errors/execution_modes。它只使用客户端本地状态不访问 `/server_info`
### 问题 4`/server_info` polling 会扰动 scheduler
现象:旧 profiling 里 1Hz polling 曾明显改变错误数。真实 performance run 如果持续 poll pool会把测量工具变成干扰源。
修复:`scripts/sweep_real_ali_kvc.sh` 默认关闭 pool polling。容量类问题依赖结构日志和必要时单独 profile run不混入 headline performance run。
### 问题 5backpressure smoke 没有触发 backpressure
现象KVC-fit smoke 中 transfer queue max 只有 3所有 admission reason 都是 `ok`pause_ms 全 0。
结论:这轮不能证明 backpressure 有效,只能证明 direct-to-D 有效。需要更高 session 数、更大 resident KV 或更强并发的压力样本专门验证 backpressure。
### 问题 6环境和旧报告不一致
现象:旧文档写的是 H100本轮真实环境是 H20模型路径也在 `/home/admin/cpfs/wjh/models/...`
处理:本日志按 H20 记录;跨文档比较时只看机制趋势,不把 H100/H20 的绝对 latency 混为同一实验。
### 问题 7continuous window 可能截断 session ancestry
现象:按 timestamp 直接截窗口可能留下 parent turn 在窗口外的请求。对 KVC 来说,这会让 session reuse/turn chain 与真实 workload 不一致。
处理:当前 continuous window 只作为待改进候选,不作为正式 headline。正式窗口需要保留 warmup ancestors或显式保留原始 session chain 信息。
## 8. 如果后续 full workload 效果不好,当前假设
可能不是实现小 bug而是方案适用面和资源约束共同导致
1. **single-turn 稀释收益**:全量 Ali session 中 single-turn 占多数KVC seed 只带来成本,没有 turn2+ reuse。
2. **长上下文挤占 D KV 池**input p90 51K、p99 113Kresident KV 长尾会限制 D 上可同时保留的 session。
3. **direct 不是免费 lunch**turn1 seed、admission probe、session lifecycle 都有额外成本;只有后续 turns 充分复用时才摊薄。
4. **D 端容量和 eviction 仍是核心风险**:旧 SWE 实验已经显示 session pinning + D 容量盲选会造成 starvationearly multi-turn balanced 样本可能复现。
5. **普通 PD-disaggregation 很弱**:如果 KVC fallback 频繁退回普通 PD 路径,整体会被 P->D transfer 和高 TTFT 拖垮。
6. **H20 显存余量不足会改变 KVC 条件**:默认 KVC 2P6D 启动 OOM必须降 `mem_fraction_static` 才能完成 600-request run这会进一步降低 D KV pool放大 session-cap 和 timeout。
## 9. 下一步验证顺序
1. 补 sticky/session-affinity baseline拆出“粘到同一个 D”和“KVC direct bypass”的贡献。
2. 补 KVC `seed-min-turn-id=2` 或 no-turn1-seed验证 turn1 seed 成本是否值得。
3. 在 early multi-turn balanced 样本上跑 DP / PD / KVC no-BP / KVC+BP验证大上下文真实 multi-turn 压力。
4. 选小固定样本跑 `time-scale=1`,避免只在压缩 replay 条件下成立。
5. 做包含 single-turn 的 continuous window并处理窗口内 parent turn 缺失问题,再按 full Ali 分布加权报告。
6. 对最终候选配置做 N>=3 rerun报告方差N=1 只作为 smoke。
7. 针对 600-request window 优先跑 `seed-min-turn-id=2`,减少 single-turn turn1 seed目标是先把 53/600 errors 降到接近 DP 的 1/600再讨论 latency。
- 当前第一次尝试未进入 replay启动阶段疑似 OS OOM需要先解决 H20 启动显存/系统内存稳定性,或者降低 worker 数/模型内存占用。
## 10. KVC error 根因与 multi-turn-only 验证准备
用户指出 179-request run 不够,并要求至少 15min / 600+ 请求;当前正式问题定位基于
`outputs/real-ali-kvc-iter/runs/window900s-600req-ts1-kvc-mem082/kvcache-centric-kv-aware-worker-admission-20260511T093601Z`
### 10.1 为什么 KVC 有大量 error
该 run 为 600 requestsKVC mem0.82 有 53 errors
- 50 个 `ReadTimeout`
- 2 个 `/open_session` HTTP 400。
- 1 个真实超上下文错误input 310,521 > model context 262,144。
按 turn 看50/53 errors 在 turn1。按 structural admission 看,绝大多数失败请求在
`structural/admission-events.jsonl` 中已经被 D 端 admission 判定 `can_admit=true`,所以这不是单纯的
`d-session-cap``no-space`。主要失败点是 turn1 seed 进入 KVC seeded path 后,在
P/D streaming session bootstrap、P->D transfer 或 router streaming 过程中超时;而混合真实窗口中 single-turn session 很多,
这些 turn1 seed 对大多数 session 没有后续复用收益。
结论:当前 KVC error 的主因是 **对 single-turn / 未知是否多轮的 session 做了过多 turn1 seed**,它把大量新 session 推进
KVC control-plane 和 seeded router 路径,增加超时和 session lifecycle 残留;不是 direct-to-D fast path 本身出错。
### 10.2 已做修复/消融开关
代码与脚本修复:
- `scripts/sweep_real_ali_kvc.sh` 新增 `KVC_SEED_ONLY_MULTITURN=1`,会传入
`--kvcache-seed-only-multiturn-sessions`。这是 oracle 消融,用来验证“只 seed 会有后续 turn 的 session”能否消除 turn1 seed 错误。
- `src/agentic_pd_hybrid/replay.py``/open_session` 400 增加 close+retry 一次,并写
`structural/session-lifecycle.jsonl`。这是 lifecycle 健壮性修复,目标是处理 timeout 后服务端残留 session 导致的
“already exists” 400不改变 routing policy。
- `scripts/prepare_real_ali_samples.py` 新增 `--window-min-turns``--window-output-name`,用于生成可复现的 multi-turn-only window 样本。
验证:
- `uv run python -m py_compile scripts/prepare_real_ali_samples.py src/agentic_pd_hybrid/replay.py src/agentic_pd_hybrid/benchmark.py src/agentic_pd_hybrid/cli.py`
- `bash -n scripts/sweep_real_ali_kvc.sh`
### 10.3 已生成 multi-turn-only 样本
样本路径:
`outputs/real-ali-kvc-iter/samples-window-900s-600req-multiturn/ali-window-multiturn.jsonl`
生成命令:
```bash
uv run python scripts/prepare_real_ali_samples.py \
--trace /home/admin/cpfs/wjh/ali-trace/trace-qwen3-coder-formatted/041715-041717.jsonl \
--output-root outputs/real-ali-kvc-iter/samples-window-900s-600req-multiturn \
--window-duration-s 900 \
--window-target-requests 600 \
--window-buckets 15 \
--window-min-turns 2 \
--window-output-name ali-window-multiturn.jsonl \
--profiles representative-mt \
--max-sessions 64 \
--max-turns-per-session 12
```
样本 profile
- 626 requests107 sessions107 个都是 multi-turn sessions。
- sampled duration 889.341s。
- turn2+ = 519。
- direct-eligible turn2+ = 473 / 519 = 91.1%。
- missing parent = 0。
- input p50/p90/p99 = 26,846 / 91,596 / 123,898 tokens。
这个 case 是“过滤掉 single-turn 的多轮压力切片”,不能替代 full mixed workload但可以回答
如果 workload 确实以多轮 coding agent session 为主KVC 的 direct-to-D 覆盖率和稳定性是否接近 microbenchmark。
### 10.4 GPU 资源阻塞
截至本次记录8 张 GPU 均被另一组 `vllm serve` 进程占用,每张约 82GB / 98GB端口为 51000-51007。
这些不是本 repo 的 SGLang/benchmark 进程,因此未启动新的性能 run避免把资源冲突误判为 KVC 策略失败。
GPU 释放后,优先跑两组:
```bash
# 混合真实窗口:验证 seed-only-multiturn 是否把 53/600 errors 降下来
TRACE=outputs/real-ali-kvc-iter/samples-window-900s-600req/ali-window.jsonl \
OUT_ROOT=outputs/real-ali-kvc-iter/runs/window900s-600req-ts1-kvc-seedonly-mt-mem082 \
RUNS="kvc" \
TIME_SCALE=1 \
CONCURRENCY=32 \
REQUEST_TIMEOUT_S=600 \
STACK_TIMEOUT_S=1800 \
EXTRA_SERVER_ARGS="--mem-fraction-static 0.82" \
KVC_SEED_ONLY_MULTITURN=1 \
bash scripts/sweep_real_ali_kvc.sh
# 多轮-only workloadDP vs KVC对照过滤 workload 是否能复现 microbenchmark 收益
TRACE=outputs/real-ali-kvc-iter/samples-window-900s-600req-multiturn/ali-window-multiturn.jsonl \
OUT_ROOT=outputs/real-ali-kvc-iter/runs/window900s-600req-multiturn-ts1-mem082 \
RUNS="dp kvc" \
TIME_SCALE=1 \
CONCURRENCY=32 \
REQUEST_TIMEOUT_S=600 \
STACK_TIMEOUT_S=1800 \
EXTRA_SERVER_ARGS="--mem-fraction-static 0.82" \
KVC_SEED_ONLY_MULTITURN=1 \
bash scripts/sweep_real_ali_kvc.sh
```
### 10.5 multi-turn-only 启动尝试被 GPU 占用阻塞
用户要求启动 multi-turn-only 的 `pd-disaggregation` vs `kvcache-centric` 对比。启动前检查发现 8 张 GPU 均被外部
`vllm serve` 进程占用,每张约 84GB / 98GB端口为 51000-51007。该进程不属于本 repo 的 SGLang/benchmark run。
因此本次没有强行启动 SGLang。原因是剩余显存不足以启动 2P6D 或 8-worker 对照,强行运行只会得到初始化 OOM 或不稳定超时,
不能用于判断 KVC pd-hybrid 是否优于 pd-disaggregation。
资源释放后要运行的 multi-turn-only 对比命令:
```bash
TRACE=outputs/real-ali-kvc-iter/samples-window-900s-600req-multiturn/ali-window-multiturn.jsonl \
OUT_ROOT=outputs/real-ali-kvc-iter/runs/window900s-600req-multiturn-ts1-pd-vs-kvc-mem082 \
RUNS="pd kvc" \
TIME_SCALE=1 \
CONCURRENCY=32 \
REQUEST_TIMEOUT_S=600 \
STACK_TIMEOUT_S=1800 \
EXTRA_SERVER_ARGS="--mem-fraction-static 0.82" \
KVC_SEED_ONLY_MULTITURN=1 \
bash scripts/sweep_real_ali_kvc.sh
```
### 10.6 multi-turn-only PD vs KVC 正式结果
资源释放后已启动并完成 multi-turn-only 对比。运行命令:
```bash
TRACE=outputs/real-ali-kvc-iter/samples-window-900s-600req-multiturn/ali-window-multiturn.jsonl \
OUT_ROOT=outputs/real-ali-kvc-iter/runs/window900s-600req-multiturn-ts1-pd-vs-kvc-mem082 \
RUNS="pd kvc" \
TIME_SCALE=1 \
CONCURRENCY=32 \
REQUEST_TIMEOUT_S=600 \
STACK_TIMEOUT_S=1800 \
EXTRA_SERVER_ARGS="--mem-fraction-static 0.82" \
KVC_SEED_ONLY_MULTITURN=1 \
bash scripts/sweep_real_ali_kvc.sh
```
Run 目录:
- PD`outputs/real-ali-kvc-iter/runs/window900s-600req-multiturn-ts1-pd-vs-kvc-mem082/pd-disaggregation-kv-aware-20260512T030433Z`
- KVC`outputs/real-ali-kvc-iter/runs/window900s-600req-multiturn-ts1-pd-vs-kvc-mem082/kvcache-centric-kv-aware-worker-admission-20260512T040444Z`
样本仍是 626 requests、107 sessions、889.341s,全部为 multi-turn session。
| 策略 | Requests | Errors | Trunc | E2E mean | E2E p50 | E2E p90 | E2E p99 | TTFT mean | TTFT p50 | TTFT p90 |
|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| PD-disaggregation 2P6D | 626 | 0 | 0 | 97.013s | 70.243s | 214.309s | 308.406s | 94.506s | 69.048s | 212.528s |
| KVC 2P6D worker admission, no BP, seed-only-multiturn | 626 | 39 | 0 | 43.362s | 8.239s | 135.289s | 236.475s | 40.578s | 1.442s | 132.233s |
Paired comparison 只在 KVC 成功且 PD 也有 latency 的 587 个 request 上计算:
- PD same-request E2E mean/p50/p90/p9997.457s / 70.514s / 214.095s / 309.362s。
- KVC same-request E2E mean/p50/p90/p9943.362s / 8.239s / 135.930s / 237.283s。
- mean E2E reduction55.5%。
- absolute mean improvement54.095s。
- wins/losses472 / 115。
按 KVC execution mode 拆分:
| KVC mode | Count | KVC mean | PD same mean | Reduction |
|---|---:|---:|---:|---:|
| `kvcache-direct-to-d-session` | 286 | 2.255s | 92.944s | 97.6% |
| `pd-router-fallback-large-append-session-cap` | 169 | 88.869s | 113.614s | 21.8% |
| `pd-router-d-session-reseed` | 25 | 143.456s | 106.501s | -34.7% |
| `pd-router-large-append-reseed` | 19 | 47.631s | 88.981s | 46.5% |
| `pd-router-turn1-seed` | 78 | 55.974s | 73.050s | 23.4% |
按 turn 深度拆分:
- turn2+504 successful paired requestsKVC mean 40.791s vs PD mean 101.055sreduction 59.6%。
- turn>=5299 successful paired requestsKVC mean 34.121s vs PD mean 104.697sreduction 67.4%。
- turn>=10161 successful paired requestsKVC mean 39.027s vs PD mean 86.548sreduction 54.9%。
KVC execution modes
- `kvcache-direct-to-d-session`286。
- `pd-router-fallback-large-append-session-cap`169。
- `pd-router-turn1-seed`78。
- `pd-router-d-session-reseed`25。
- `pd-router-large-append-reseed`19。
- `pd-router-fallback-no-d-capacity`4。
- `pd-router-turn1-d-backpressure`5。
- `pd-router-d-session-reseed-after-eviction`1。
- error rows39记录为 `kvcache-centric`
KVC 的收益来源非常清楚286 个 direct-to-D request 的 same-request mean 从 PD 的 92.944s 降到 2.255s,基本复现了 microbenchmark 的核心机制收益。它跳过 P worker 和 P->D KV transfer只在已有 D session 上处理 append suffix。总体 actual KV transfer blocks 从 PD same-success 的 4436 降到 KVC success 的 3827summary 口径下 KVC total actual KV transfer blocks 为 3827低于 PD 的 5276。
但这轮仍不能作为“稳定生产级胜出”结论:
1. KVC 仍有 39/626 errorserror rate 6.23%PD 为 0。
2. 39 个错误全部是客户端 `ReadTimeout`,不是服务端 OOM/Traceback服务端日志未发现对应崩溃关键字。
3. 错误分布24 个 turn115 个 turn2+;按 decode 节点分布为 decode-0 15、decode-1 9、decode-3 7、decode-4 5、decode-5 3。
4. 8 次 `/open_session` 400 已被 close+retry 兜住,并写入 `structural/session-lifecycle.jsonl`,没有形成 HTTP 400 error row。
5. 长尾 drain 明显PD 约 60min 完成KVC 约 40min 完成;二者都远超 889s trace duration。KVC 在 900s 时已完成 490/626而 PD 只完成 283/626说明 KVC 中段吞吐更好,但最后几十个 large-append fallback 仍然拖尾。
6. direct-to-D 覆盖率为 286/626 = 45.7%,低于样本静态 direct-eligible turn2+ ratio 91.1%。缺口主要来自 D session/residency capacity、large append session cap、reseed/fallback。
当前判断:
- 如果只看 successful paired requestmulti-turn-only workload 上 KVC 相对 PD-disaggregation 已经有很强 E2E 提升,且提升主要来自 direct-to-D session reuse。
- 如果按系统可靠性看,当前实现还不合格,因为 6.23% timeout 会抵消“稳定系统”的结论。
- 真实 workload 与 microbenchmark 差距的主要原因不是 KVC fast path 无效,而是 fast path 覆盖率不足、D 侧 resident KV/session admission 压力、large append fallback、以及 seeded/reseed path 的 timeout 稳定性。

View File

@@ -0,0 +1,450 @@
#!/usr/bin/env python3
"""Prepare balanced real-Ali trace samples for KVC experiments.
The generic sampler is duration-oriented and can be dominated by one long
session. This script keeps real request lengths/timestamps but caps turns per
session so live sweeps can compare policies on a repeatable multi-session
workload.
"""
from __future__ import annotations
import argparse
import json
import statistics
from collections import defaultdict
from dataclasses import asdict, dataclass
from pathlib import Path
from agentic_pd_hybrid.trace import TraceRequest, load_trace
@dataclass(frozen=True)
class SampleSummary:
input_trace_path: str
output_trace_path: str
profile: str
request_count: int
session_count: int
multi_turn_session_count: int
turn2plus_count: int
direct_eligible_turn2plus_count: int
direct_eligible_turn2plus_ratio: float
missing_parent_count: int
max_sessions: int
max_turns_per_session: int
start_time_s: float
end_time_s: float
sampled_duration_s: float
rebased_timestamps: bool
input_tokens: dict[str, float] | None
output_tokens: dict[str, float] | None
append_tokens: dict[str, float] | None
inter_turn_gap_s: dict[str, float] | None
overlap_ratio: dict[str, float] | None
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--trace", type=Path, required=True)
parser.add_argument("--output-root", type=Path, required=True)
parser.add_argument("--max-sessions", type=int, default=64)
parser.add_argument("--max-turns-per-session", type=int, default=12)
parser.add_argument("--start-time-s", type=float, default=0.0)
parser.add_argument(
"--window-duration-s",
type=float,
default=None,
help=(
"If set, also write continuous-window samples that keep only requests "
"inside [start-time, start-time + window-duration]."
),
)
parser.add_argument(
"--window-target-requests",
type=int,
default=None,
help=(
"For continuous-window samples, select whole sessions across time "
"buckets until at least this many requests are included. This keeps "
"the window span while making live runs tractable."
),
)
parser.add_argument(
"--window-buckets",
type=int,
default=15,
help="Number of time buckets used with --window-target-requests.",
)
parser.add_argument(
"--window-min-turns",
type=int,
default=1,
help=(
"Minimum number of in-window turns per selected session for "
"continuous-window samples."
),
)
parser.add_argument(
"--window-output-name",
default="ali-window.jsonl",
help="Output filename for the continuous-window sample.",
)
parser.add_argument(
"--max-sampled-duration-s",
type=float,
default=None,
help=(
"For balanced profile samples, drop requests after the first selected "
"timestamp plus this duration. Use only for quick smoke runs; headline "
"runs should preserve the full sampled span."
),
)
parser.add_argument(
"--profiles",
nargs="+",
default=["representative-mt", "kvc-fit-smallappend"],
choices=["representative-mt", "kvc-fit-smallappend"],
)
parser.add_argument(
"--no-rebase-timestamps",
action="store_true",
help="Keep original timestamps instead of shifting the sample to start at 0.",
)
args = parser.parse_args()
requests = load_trace(args.trace)
sessions: dict[str, list[TraceRequest]] = defaultdict(list)
for request in requests:
sessions[request.session_id].append(request)
args.output_root.mkdir(parents=True, exist_ok=True)
if args.window_duration_s is not None:
if args.window_target_requests is None:
selected = _select_window(
requests=requests,
start_time_s=args.start_time_s,
window_duration_s=args.window_duration_s,
)
profile = "window"
else:
selected = _select_window_session_sample(
sessions=sessions,
start_time_s=args.start_time_s,
window_duration_s=args.window_duration_s,
target_requests=args.window_target_requests,
bucket_count=args.window_buckets,
min_turns=args.window_min_turns,
)
profile = (
"window-session-sample"
if args.window_min_turns <= 1
else f"window-session-sample-min{args.window_min_turns}turns"
)
output_path = args.output_root / args.window_output_name
summary = _write_sample(
selected=selected,
input_trace_path=args.trace,
output_path=output_path,
profile=profile,
max_sessions=args.max_sessions,
max_turns_per_session=args.max_turns_per_session,
rebase_timestamps=not args.no_rebase_timestamps,
)
print(
f"window: wrote {summary.request_count} requests from "
f"{summary.session_count} sessions to {output_path}"
)
for profile in args.profiles:
selected = _select_profile(
sessions=sessions,
profile=profile,
start_time_s=args.start_time_s,
max_sessions=args.max_sessions,
max_turns_per_session=args.max_turns_per_session,
max_sampled_duration_s=args.max_sampled_duration_s,
)
output_path = args.output_root / f"ali-{profile}.jsonl"
summary = _write_sample(
selected=selected,
input_trace_path=args.trace,
output_path=output_path,
profile=profile,
max_sessions=args.max_sessions,
max_turns_per_session=args.max_turns_per_session,
rebase_timestamps=not args.no_rebase_timestamps,
)
print(
f"{profile}: wrote {summary.request_count} requests from "
f"{summary.session_count} sessions to {output_path}"
)
def _select_profile(
*,
sessions: dict[str, list[TraceRequest]],
profile: str,
start_time_s: float,
max_sessions: int,
max_turns_per_session: int,
max_sampled_duration_s: float | None,
) -> list[TraceRequest]:
eligible: list[list[TraceRequest]] = []
for session_requests in sessions.values():
ordered = _ordered(session_requests)
if len(ordered) < 2:
continue
if ordered[0].timestamp_s < start_time_s:
continue
if profile == "kvc-fit-smallappend" and not _is_kvc_fit_smallappend(ordered):
continue
eligible.append(ordered[:max_turns_per_session])
eligible.sort(key=lambda items: (items[0].timestamp_s, items[0].session_id))
selected_sessions = eligible[:max_sessions]
selected = [request for items in selected_sessions for request in items]
selected.sort(key=lambda request: (request.timestamp_s, request.chat_id))
if selected and max_sampled_duration_s is not None:
first_ts = selected[0].timestamp_s
end_ts = first_ts + max_sampled_duration_s
selected = [
request for request in selected if request.timestamp_s <= end_ts
]
return selected
def _select_window(
*,
requests: list[TraceRequest],
start_time_s: float,
window_duration_s: float,
) -> list[TraceRequest]:
end_time_s = start_time_s + window_duration_s
selected = [
request
for request in requests
if start_time_s <= request.timestamp_s <= end_time_s
]
selected.sort(key=lambda request: (request.timestamp_s, request.chat_id))
return selected
def _select_window_session_sample(
*,
sessions: dict[str, list[TraceRequest]],
start_time_s: float,
window_duration_s: float,
target_requests: int,
bucket_count: int,
min_turns: int,
) -> list[TraceRequest]:
if target_requests <= 0:
raise ValueError("--window-target-requests must be positive")
if bucket_count <= 0:
raise ValueError("--window-buckets must be positive")
if min_turns <= 0:
raise ValueError("--window-min-turns must be positive")
end_time_s = start_time_s + window_duration_s
bucket_width_s = window_duration_s / bucket_count
buckets: list[list[list[TraceRequest]]] = [[] for _ in range(bucket_count)]
for session_requests in sessions.values():
ordered = _ordered(session_requests)
if not ordered:
continue
first = ordered[0]
if first.timestamp_s < start_time_s or first.timestamp_s > end_time_s:
continue
in_window = [
request
for request in ordered
if start_time_s <= request.timestamp_s <= end_time_s
]
if len(in_window) < min_turns:
continue
bucket_index = min(
bucket_count - 1,
int((first.timestamp_s - start_time_s) / bucket_width_s),
)
buckets[bucket_index].append(in_window)
for bucket in buckets:
bucket.sort(key=lambda items: (items[0].timestamp_s, items[0].session_id))
selected_sessions: list[list[TraceRequest]] = []
selected_count = 0
positions = [0 for _ in range(bucket_count)]
while selected_count < target_requests:
progressed = False
for index, bucket in enumerate(buckets):
if positions[index] >= len(bucket):
continue
session_requests = bucket[positions[index]]
positions[index] += 1
selected_sessions.append(session_requests)
selected_count += len(session_requests)
progressed = True
if selected_count >= target_requests:
break
if not progressed:
break
selected = [request for items in selected_sessions for request in items]
selected.sort(key=lambda request: (request.timestamp_s, request.chat_id))
if len(selected) < target_requests:
raise ValueError(
f"window session sample selected only {len(selected)} requests; "
f"target was {target_requests}"
)
return selected
def _is_kvc_fit_smallappend(session_requests: list[TraceRequest]) -> bool:
initial = session_requests[0]
if initial.input_length < 2048 or initial.input_length > 16000:
return False
for request in session_requests:
if request.output_length > 2048:
return False
for previous, current in zip(session_requests, session_requests[1:], strict=False):
append_tokens = current.input_length - (
previous.input_length + previous.output_length
)
if append_tokens <= 0 or append_tokens > 2048:
return False
if _overlap_ratio(previous, current) < 0.75:
return False
return True
def _write_sample(
*,
selected: list[TraceRequest],
input_trace_path: Path,
output_path: Path,
profile: str,
max_sessions: int,
max_turns_per_session: int,
rebase_timestamps: bool,
) -> SampleSummary:
if not selected:
raise ValueError(f"profile {profile!r} selected no requests")
first_ts = selected[0].timestamp_s
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8") as handle:
for request in selected:
timestamp = request.timestamp_s - first_ts if rebase_timestamps else request.timestamp_s
payload = {
"chat_id": request.chat_id,
"parent_chat_id": request.parent_chat_id,
"timestamp": round(timestamp, 6),
"input_length": request.input_length,
"output_length": request.output_length,
"type": request.request_type,
"turn": request.turn_id,
"hash_ids": list(request.hash_ids),
}
handle.write(json.dumps(payload, sort_keys=True) + "\n")
sessions = defaultdict(list)
for request in selected:
sessions[request.session_id].append(request)
selected_chat_ids = {request.chat_id for request in selected}
missing_parent_count = sum(
1
for request in selected
if request.parent_chat_id >= 0 and request.parent_chat_id not in selected_chat_ids
)
append_values: list[float] = []
gap_values: list[float] = []
overlap_values: list[float] = []
direct_eligible_count = 0
for session_requests in sessions.values():
ordered = _ordered(session_requests)
for previous, current in zip(ordered, ordered[1:], strict=False):
append_tokens = current.input_length - (
previous.input_length + previous.output_length
)
overlap_ratio = _overlap_ratio(previous, current)
append_values.append(float(append_tokens))
gap_values.append(float(current.timestamp_s - previous.timestamp_s))
overlap_values.append(overlap_ratio)
if append_tokens > 0 and append_tokens <= 2048 and overlap_ratio > 0:
direct_eligible_count += 1
turn2plus_count = sum(max(0, len(items) - 1) for items in sessions.values())
start = min(request.timestamp_s for request in selected)
end = max(request.timestamp_s for request in selected)
summary = SampleSummary(
input_trace_path=str(input_trace_path),
output_trace_path=str(output_path),
profile=profile,
request_count=len(selected),
session_count=len(sessions),
multi_turn_session_count=sum(1 for items in sessions.values() if len(items) > 1),
turn2plus_count=turn2plus_count,
direct_eligible_turn2plus_count=direct_eligible_count,
direct_eligible_turn2plus_ratio=(
direct_eligible_count / turn2plus_count if turn2plus_count else 0.0
),
missing_parent_count=missing_parent_count,
max_sessions=max_sessions,
max_turns_per_session=max_turns_per_session,
start_time_s=0.0 if rebase_timestamps else start,
end_time_s=end - start if rebase_timestamps else end,
sampled_duration_s=end - start,
rebased_timestamps=rebase_timestamps,
input_tokens=_stats([float(request.input_length) for request in selected]),
output_tokens=_stats([float(request.output_length) for request in selected]),
append_tokens=_stats(append_values),
inter_turn_gap_s=_stats(gap_values),
overlap_ratio=_stats(overlap_values),
)
with output_path.with_suffix(output_path.suffix + ".summary.json").open(
"w", encoding="utf-8"
) as handle:
json.dump(asdict(summary), handle, indent=2, sort_keys=True)
return summary
def _ordered(session_requests: list[TraceRequest]) -> list[TraceRequest]:
return sorted(
session_requests,
key=lambda request: (request.timestamp_s, request.turn_id, request.chat_id),
)
def _overlap_ratio(previous: TraceRequest, current: TraceRequest) -> float:
if not current.hash_ids:
return 0.0
previous_blocks = set(previous.hash_ids)
overlap = sum(1 for block in current.hash_ids if block in previous_blocks)
return overlap / len(current.hash_ids)
def _stats(values: list[float]) -> dict[str, float] | None:
if not values:
return None
ordered = sorted(values)
return {
"count": float(len(ordered)),
"mean": statistics.fmean(ordered),
"min": ordered[0],
"p50": _percentile(ordered, 0.50),
"p90": _percentile(ordered, 0.90),
"p99": _percentile(ordered, 0.99),
"max": ordered[-1],
}
def _percentile(sorted_values: list[float], percentile: float) -> float:
if len(sorted_values) == 1:
return sorted_values[0]
return sorted_values[round((len(sorted_values) - 1) * percentile)]
if __name__ == "__main__":
main()

170
scripts/sweep_real_ali_kvc.sh Executable file
View File

@@ -0,0 +1,170 @@
#!/usr/bin/env bash
# Real Ali workload sweep for KVC pd-hybrid.
#
# This script expects a prebuilt sample trace and replays it exactly for every
# mechanism. It intentionally keeps pool polling disabled for performance runs.
set -euo pipefail
REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
cd "$REPO_ROOT"
MODEL=${MODEL:-/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}
TRACE=${TRACE:-outputs/real-ali-kvc-iter/samples-balanced/ali-kvc-fit-smallappend.jsonl}
OUT_ROOT=${OUT_ROOT:-outputs/real-ali-kvc-iter/runs}
TIME_SCALE=${TIME_SCALE:-1}
CONCURRENCY=${CONCURRENCY:-32}
REQUEST_TIMEOUT_S=${REQUEST_TIMEOUT_S:-300}
STACK_TIMEOUT_S=${STACK_TIMEOUT_S:-1200}
RUNS=${RUNS:-"dp kvc_bp"}
EXTRA_SERVER_ARGS=${EXTRA_SERVER_ARGS:-}
PREFILL_EXTRA_SERVER_ARGS=${PREFILL_EXTRA_SERVER_ARGS:-}
DECODE_EXTRA_SERVER_ARGS=${DECODE_EXTRA_SERVER_ARGS:-}
KVC_SEED_MIN_TURN_ID=${KVC_SEED_MIN_TURN_ID:-1}
KVC_SEED_ONLY_MULTITURN=${KVC_SEED_ONLY_MULTITURN:-0}
mkdir -p "$OUT_ROOT"
LOG="$OUT_ROOT/sweep.log"
log() {
echo "[$(date '+%F %T')] $*" | tee -a "$LOG"
}
common_args=(
--trace "$TRACE"
--model-path "$MODEL"
--output-root "$OUT_ROOT"
--use-trace-as-sample
--time-scale "$TIME_SCALE"
--concurrency-limit "$CONCURRENCY"
--timeout-s "$STACK_TIMEOUT_S"
--request-timeout-s "$REQUEST_TIMEOUT_S"
)
if [[ -n "$EXTRA_SERVER_ARGS" ]]; then
common_args+=(--extra-server-args "$EXTRA_SERVER_ARGS")
fi
if [[ -n "$PREFILL_EXTRA_SERVER_ARGS" ]]; then
common_args+=(--prefill-extra-server-args "$PREFILL_EXTRA_SERVER_ARGS")
fi
if [[ -n "$DECODE_EXTRA_SERVER_ARGS" ]]; then
common_args+=(--decode-extra-server-args "$DECODE_EXTRA_SERVER_ARGS")
fi
kvc_args=(
"${common_args[@]}"
--mechanism kvcache-centric
--policy kv-aware
--prefill-workers 2
--decode-workers 6
--prefill-tp-size 1
--decode-tp-size 1
--prefill-gpu-ids 0,1
--decode-gpu-ids 2,3,4,5,6,7
--transfer-backend mooncake
--gpu-budget 8
--kvcache-admission-mode worker
--kvcache-seed-min-turn-id "$KVC_SEED_MIN_TURN_ID"
--kvcache-seed-max-inflight-decode -1
--kvcache-prefill-backup-policy release-after-transfer
--kvcache-prefill-priority-eviction
)
if [[ "$KVC_SEED_ONLY_MULTITURN" == "1" ]]; then
kvc_args+=(--kvcache-seed-only-multiturn-sessions)
fi
run_dp() {
log "=== DP cache-aware baseline: 8 direct workers ==="
uv run agentic-pd-hybrid benchmark-live \
"${common_args[@]}" \
--mechanism pd-colo \
--policy kv-aware \
--prefill-workers 0 \
--decode-workers 0 \
--direct-workers 8 \
--direct-tp-size 1 \
--direct-gpu-ids 0,1,2,3,4,5,6,7 \
--gpu-budget 8
}
run_pd_disagg() {
log "=== PD-disaggregation baseline: 2P6D ==="
uv run agentic-pd-hybrid benchmark-live \
"${common_args[@]}" \
--mechanism pd-disaggregation \
--policy kv-aware \
--prefill-workers 2 \
--decode-workers 6 \
--prefill-tp-size 1 \
--decode-tp-size 1 \
--prefill-gpu-ids 0,1 \
--decode-gpu-ids 2,3,4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8
}
run_pd_sticky() {
log "=== PD-disaggregation sticky baseline: 2P6D ==="
uv run agentic-pd-hybrid benchmark-live \
"${common_args[@]}" \
--mechanism pd-disaggregation \
--policy sticky \
--prefill-workers 2 \
--decode-workers 6 \
--prefill-tp-size 1 \
--decode-tp-size 1 \
--prefill-gpu-ids 0,1 \
--decode-gpu-ids 2,3,4,5,6,7 \
--transfer-backend mooncake \
--gpu-budget 8
}
run_kvc() {
log "=== KVC baseline: 2P6D worker admission, no backpressure ==="
uv run agentic-pd-hybrid benchmark-live "${kvc_args[@]}"
}
run_kvc_bp() {
log "=== KVC candidate: 2P6D worker admission + backpressure ==="
uv run agentic-pd-hybrid benchmark-live \
"${kvc_args[@]}" \
--enable-backpressure \
--backpressure-max-pause-s 2.0
}
summarize_latest() {
log "=== Latest summaries ==="
find "$OUT_ROOT" -maxdepth 2 -name 'request-metrics.jsonl.summary.json' -print \
| sort \
| while read -r summary; do
python - "$summary" <<'PY'
import json, sys
p=sys.argv[1]
d=json.load(open(p))
lat=d.get("latency_stats_s") or {}
tt=d.get("ttft_stats_s") or {}
em=d.get("execution_modes") or {}
print(p)
print(" reqs", d.get("request_count"), "errors", d.get("error_count"), "trunc", d.get("truncated_request_count"))
print(" lat mean/p50/p90/p99", lat.get("mean"), lat.get("p50"), lat.get("p90"), lat.get("p99"))
print(" ttft mean/p50/p90", tt.get("mean"), tt.get("p50"), tt.get("p90"))
print(" modes", em)
PY
done | tee -a "$LOG"
}
log "Trace: $TRACE"
log "Model: $MODEL"
log "Runs: $RUNS | time-scale=$TIME_SCALE concurrency=$CONCURRENCY | kvc-seed-min-turn-id=$KVC_SEED_MIN_TURN_ID | kvc-seed-only-multiturn=$KVC_SEED_ONLY_MULTITURN"
for run in $RUNS; do
case "$run" in
dp) run_dp ;;
pd) run_pd_disagg ;;
pd_sticky) run_pd_sticky ;;
kvc) run_kvc ;;
kvc_bp) run_kvc_bp ;;
*) log "Unknown run name: $run"; exit 2 ;;
esac
done
summarize_latest
log "DONE"

View File

@@ -3,13 +3,20 @@ from __future__ import annotations
import asyncio
import json
import signal
import shutil
from collections import Counter
from dataclasses import asdict, dataclass, replace
from datetime import UTC, datetime
from pathlib import Path
from agentic_pd_hybrid.replay import ReplayConfig, replay_trace
from agentic_pd_hybrid.sampling import SessionSampleConfig, sample_trace_sessions
from agentic_pd_hybrid.sampling import (
SessionSampleConfig,
SessionSampleSummary,
sample_trace_sessions,
)
from agentic_pd_hybrid.stack import ManagedPdStack, launch_pd_stack
from agentic_pd_hybrid.trace import load_trace
from agentic_pd_hybrid.topology import SingleNodeTopology
@@ -47,12 +54,14 @@ class BenchmarkConfig:
pool_poll_include_sessions: bool = True
enable_backpressure: bool = False
backpressure_max_pause_s: float = 2.0
progress_interval_s: float = 30.0
sample_profile: str = "default"
min_initial_input_tokens: int | None = None
max_initial_input_tokens: int | None = None
max_append_input_tokens: int | None = None
max_output_tokens: int | None = None
min_overlap_ratio: float | None = None
use_trace_as_sample: bool = False
launch_stack: bool = True
@@ -94,22 +103,37 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts:
)
sampled_trace_path = run_dir / "sampled-trace.jsonl"
sample_summary = sample_trace_sessions(
SessionSampleConfig(
trace_path=config.trace_path,
output_path=sampled_trace_path,
target_duration_s=config.target_duration_s,
start_time_s=config.start_time_s,
if config.use_trace_as_sample:
shutil.copyfile(config.trace_path, sampled_trace_path)
sample_summary = _summarize_trace_sample(
input_trace_path=config.trace_path,
sampled_trace_path=sampled_trace_path,
profile=config.sample_profile,
session_sample_rate=config.session_sample_rate,
min_turns=config.min_turns,
profile=config.sample_profile, # type: ignore[arg-type]
min_initial_input_tokens=config.min_initial_input_tokens,
max_initial_input_tokens=config.max_initial_input_tokens,
max_append_input_tokens=config.max_append_input_tokens,
max_output_tokens=config.max_output_tokens,
min_overlap_ratio=config.min_overlap_ratio,
)
)
else:
sample_summary = sample_trace_sessions(
SessionSampleConfig(
trace_path=config.trace_path,
output_path=sampled_trace_path,
target_duration_s=config.target_duration_s,
start_time_s=config.start_time_s,
session_sample_rate=config.session_sample_rate,
min_turns=config.min_turns,
profile=config.sample_profile, # type: ignore[arg-type]
min_initial_input_tokens=config.min_initial_input_tokens,
max_initial_input_tokens=config.max_initial_input_tokens,
max_append_input_tokens=config.max_append_input_tokens,
max_output_tokens=config.max_output_tokens,
min_overlap_ratio=config.min_overlap_ratio,
)
)
stack: ManagedPdStack | None = None
previous_sigint = signal.getsignal(signal.SIGINT)
@@ -198,6 +222,7 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts:
pool_poll_include_sessions=config.pool_poll_include_sessions,
enable_backpressure=config.enable_backpressure,
backpressure_max_pause_s=config.backpressure_max_pause_s,
progress_interval_s=config.progress_interval_s,
)
if config.request_timeout_s is not None:
replay_config = replace(
@@ -258,12 +283,14 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts:
"pool_poll_include_sessions": config.pool_poll_include_sessions,
"enable_backpressure": config.enable_backpressure,
"backpressure_max_pause_s": config.backpressure_max_pause_s,
"progress_interval_s": config.progress_interval_s,
"sample_profile": config.sample_profile,
"min_initial_input_tokens": config.min_initial_input_tokens,
"max_initial_input_tokens": config.max_initial_input_tokens,
"max_append_input_tokens": config.max_append_input_tokens,
"max_output_tokens": config.max_output_tokens,
"min_overlap_ratio": config.min_overlap_ratio,
"use_trace_as_sample": config.use_trace_as_sample,
"sample_summary": asdict(sample_summary),
"topology": {
"model_path": config.topology.model_path,
@@ -310,3 +337,44 @@ def _header_mode_for(policy_name: str) -> str:
if policy_name == "kv-aware":
return "target-worker"
return "none"
def _summarize_trace_sample(
*,
input_trace_path: Path,
sampled_trace_path: Path,
profile: str,
session_sample_rate: float,
min_turns: int,
min_initial_input_tokens: int | None,
max_initial_input_tokens: int | None,
max_append_input_tokens: int | None,
max_output_tokens: int | None,
min_overlap_ratio: float | None,
) -> SessionSampleSummary:
requests = load_trace(sampled_trace_path)
if not requests:
raise ValueError(f"Trace sample is empty: {sampled_trace_path}")
session_turns = Counter(request.session_id for request in requests)
start_time_s = requests[0].timestamp_s
end_time_s = requests[-1].timestamp_s
return SessionSampleSummary(
input_trace_path=str(input_trace_path),
output_trace_path=str(sampled_trace_path),
request_count=len(requests),
session_count=len(session_turns),
multi_turn_session_count=sum(1 for turns in session_turns.values() if turns > 1),
start_time_s=start_time_s,
end_time_s=end_time_s,
sampled_duration_s=end_time_s - start_time_s,
session_sample_rate=session_sample_rate,
min_turns=min_turns,
profile=profile,
min_initial_input_tokens=min_initial_input_tokens,
max_initial_input_tokens=max_initial_input_tokens,
max_append_input_tokens=max_append_input_tokens,
max_output_tokens=max_output_tokens,
min_overlap_ratio=min_overlap_ratio,
mean_append_input_tokens=None,
mean_turn_overlap_ratio=None,
)

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import argparse
import asyncio
import shlex
from pathlib import Path
from agentic_pd_hybrid.benchmark import BenchmarkConfig, run_live_benchmark
@@ -260,6 +261,15 @@ def main() -> None:
default=2.0,
help="Cap on per-request backpressure sleep, regardless of D hint.",
)
replay.add_argument(
"--progress-interval-s",
type=float,
default=30.0,
help=(
"Write client-side replay progress to <output_dir>/replay-progress.jsonl "
"every N seconds. 0 disables the heartbeat."
),
)
sample = subparsers.add_parser(
"sample-sessions",
@@ -501,6 +511,15 @@ def main() -> None:
default=2.0,
help="Cap on per-request backpressure sleep, regardless of D hint.",
)
benchmark.add_argument(
"--progress-interval-s",
type=float,
default=30.0,
help=(
"Write client-side replay progress to <run_dir>/replay-progress.jsonl "
"every N seconds. 0 disables the heartbeat."
),
)
benchmark.add_argument(
"--sample-profile",
choices=["default", "small-append"],
@@ -512,6 +531,14 @@ def main() -> None:
benchmark.add_argument("--max-append-input-tokens", type=int, default=None)
benchmark.add_argument("--max-output-tokens", type=int, default=None)
benchmark.add_argument("--min-overlap-ratio", type=float, default=None)
benchmark.add_argument(
"--use-trace-as-sample",
action="store_true",
help=(
"Replay the provided --trace exactly instead of sampling sessions into "
"a new trace. Use this for prebuilt real-workload samples."
),
)
args = parser.parse_args()
@@ -586,6 +613,7 @@ def main() -> None:
pool_poll_include_sessions=not args.pool_poll_no_sessions,
enable_backpressure=args.enable_backpressure,
backpressure_max_pause_s=args.backpressure_max_pause_s,
progress_interval_s=args.progress_interval_s,
)
results = asyncio.run(replay_trace(config))
print(
@@ -732,12 +760,14 @@ def main() -> None:
pool_poll_include_sessions=not args.pool_poll_no_sessions,
enable_backpressure=args.enable_backpressure,
backpressure_max_pause_s=args.backpressure_max_pause_s,
progress_interval_s=args.progress_interval_s,
sample_profile=args.sample_profile,
min_initial_input_tokens=args.min_initial_input_tokens,
max_initial_input_tokens=args.max_initial_input_tokens,
max_append_input_tokens=args.max_append_input_tokens,
max_output_tokens=args.max_output_tokens,
min_overlap_ratio=args.min_overlap_ratio,
use_trace_as_sample=args.use_trace_as_sample,
launch_stack=True,
)
)
@@ -797,6 +827,26 @@ def _add_topology_arguments(parser: argparse.ArgumentParser) -> None:
"--no-trust-remote-code",
action="store_true",
)
parser.add_argument(
"--extra-server-args",
default="",
help="Extra arguments appended to every sglang.launch_server command.",
)
parser.add_argument(
"--prefill-extra-server-args",
default="",
help="Extra arguments appended only to prefill launch_server commands.",
)
parser.add_argument(
"--decode-extra-server-args",
default="",
help="Extra arguments appended only to decode launch_server commands.",
)
parser.add_argument(
"--direct-extra-server-args",
default="",
help="Extra arguments appended only to direct launch_server commands.",
)
def _topology_from_args(args: argparse.Namespace):
@@ -826,7 +876,13 @@ def _topology_from_args(args: argparse.Namespace):
force_rdma=args.force_rdma,
trust_remote_code=not args.no_trust_remote_code,
ib_device=args.ib_device,
direct_extra_server_args=("--enable-streaming-session",),
extra_server_args=tuple(shlex.split(args.extra_server_args)),
prefill_extra_server_args=tuple(shlex.split(args.prefill_extra_server_args)),
decode_extra_server_args=tuple(shlex.split(args.decode_extra_server_args)),
direct_extra_server_args=(
"--enable-streaming-session",
*tuple(shlex.split(args.direct_extra_server_args)),
),
)

View File

@@ -107,6 +107,7 @@ class ReplayConfig:
enable_backpressure: bool = False
backpressure_max_pause_s: float = 2.0
structural_log_dir: Path | None = None
progress_interval_s: float = 30.0
@dataclass
@@ -174,6 +175,62 @@ class ExecutionResult:
finish_reason: str | None = None
@dataclass
class ReplayProgress:
total_requests: int
output_path: Path
interval_s: float
start_time_s: float
submitted_count: int = 0
completed_count: int = 0
error_count: int = 0
truncated_count: int = 0
last_request_id: str | None = None
last_session_id: str | None = None
last_trace_timestamp_s: float | None = None
execution_modes: Counter[str] = field(default_factory=Counter)
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
async def record_submitted(self, request: TraceRequest) -> None:
async with self.lock:
self.submitted_count += 1
self.last_request_id = request.request_id
self.last_session_id = request.session_id
self.last_trace_timestamp_s = request.timestamp_s
async def record_completed(self, row: RequestMetrics) -> None:
async with self.lock:
self.completed_count += 1
if row.error is not None:
self.error_count += 1
if _is_truncated(row):
self.truncated_count += 1
self.execution_modes[row.execution_mode] += 1
self.last_request_id = row.request_id
self.last_session_id = row.session_id
self.last_trace_timestamp_s = row.trace_timestamp_s
async def emit(self, phase: str) -> None:
async with self.lock:
event = {
"phase": phase,
"elapsed_s": round(time.perf_counter() - self.start_time_s, 3),
"total_requests": self.total_requests,
"submitted_count": self.submitted_count,
"completed_count": self.completed_count,
"inflight_count": self.submitted_count - self.completed_count,
"error_count": self.error_count,
"truncated_count": self.truncated_count,
"last_request_id": self.last_request_id,
"last_session_id": self.last_session_id,
"last_trace_timestamp_s": self.last_trace_timestamp_s,
"execution_modes": dict(sorted(self.execution_modes.items())),
}
self.output_path.parent.mkdir(parents=True, exist_ok=True)
with self.output_path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(event, sort_keys=True) + "\n")
async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
structural_dir = config.structural_log_dir
if structural_dir is None and config.output_path is not None:
@@ -199,6 +256,23 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
session_tail_tasks: dict[str, asyncio.Task[RequestMetrics]] = {}
direct_sessions: dict[str, DirectSessionState] = {}
direct_session_lock = asyncio.Lock()
progress = (
ReplayProgress(
total_requests=len(requests),
output_path=config.output_path.parent / "replay-progress.jsonl",
interval_s=config.progress_interval_s,
start_time_s=start_time,
)
if config.progress_interval_s > 0
else None
)
progress_stop = asyncio.Event()
progress_task: asyncio.Task[None] | None = None
if progress is not None:
await progress.emit("start")
progress_task = asyncio.create_task(
_progress_heartbeat(progress, progress_stop)
)
async with httpx.AsyncClient(timeout=config.timeout_s, trust_env=False) as client:
decode_residency = await _discover_decode_residency(
client=client,
@@ -230,6 +304,8 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
sleep_s = target_offset - (time.perf_counter() - start_time)
if sleep_s > 0:
await asyncio.sleep(sleep_s)
if progress is not None:
await progress.record_submitted(request)
tasks.append(
asyncio.create_task(
_run_request(
@@ -244,12 +320,15 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
direct_session_lock=direct_session_lock,
decode_residency=decode_residency,
depends_on=session_tail_tasks.get(request.session_id),
progress=progress,
)
)
)
session_tail_tasks[request.session_id] = tasks[-1]
results = await asyncio.gather(*tasks)
if progress is not None:
await progress.emit("requests-complete")
if poll_task is not None:
poll_task.cancel()
try:
@@ -285,6 +364,14 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
trace_path=config.trace_path,
router_url=config.router_url,
)
if progress is not None:
await progress.emit("final")
progress_stop.set()
if progress_task is not None:
try:
await progress_task
except asyncio.CancelledError:
pass
_structural_close()
return results
@@ -302,6 +389,7 @@ async def _run_request(
direct_session_lock: asyncio.Lock,
decode_residency: DecodeResidencyState,
depends_on: asyncio.Task[RequestMetrics] | None,
progress: ReplayProgress | None = None,
) -> RequestMetrics:
if depends_on is not None:
await depends_on
@@ -351,7 +439,7 @@ async def _run_request(
async with state_lock:
state.finish(request, decision)
return RequestMetrics.from_decision(
row = RequestMetrics.from_decision(
request,
decision,
mechanism_name=config.mechanism_name,
@@ -371,6 +459,29 @@ async def _run_request(
requested_output_tokens=execution.requested_output_tokens,
finish_reason=execution.finish_reason,
)
if progress is not None:
await progress.record_completed(row)
return row
async def _progress_heartbeat(
progress: ReplayProgress,
stop_event: asyncio.Event,
) -> None:
while not stop_event.is_set():
try:
await asyncio.wait_for(stop_event.wait(), timeout=progress.interval_s)
except asyncio.TimeoutError:
await progress.emit("heartbeat")
def _is_truncated(row: RequestMetrics) -> bool:
return (
row.actual_output_tokens is not None
and row.requested_output_tokens is not None
and row.requested_output_tokens > 1
and row.actual_output_tokens < row.requested_output_tokens * 0.5
)
async def _invoke_router(
@@ -643,16 +754,41 @@ async def _open_streaming_session(
request.input_length * 16,
(request.input_length + request.output_length) * 16,
)
response = await client.post(
f"{server_url.rstrip('/')}/open_session",
json={
"capacity_of_str_len": capacity,
"session_id": session_id,
"streaming": True,
},
timeout=_ADMISSION_PROBE_TIMEOUT_S,
)
response.raise_for_status()
payload = {
"capacity_of_str_len": capacity,
"session_id": session_id,
"streaming": True,
}
url = f"{server_url.rstrip('/')}/open_session"
response = await client.post(url, json=payload, timeout=_ADMISSION_PROBE_TIMEOUT_S)
try:
response.raise_for_status()
except httpx.HTTPStatusError:
if response.status_code != 400:
raise
await _structural_emit(
"session-lifecycle.jsonl",
{
"event": "open-session-400-retry",
"server_url": server_url,
"session_id": session_id,
"request_id": request.request_id,
"turn_id": request.turn_id,
"response_text": response.text[:512],
},
)
await _close_streaming_session(
client=client,
server_url=server_url,
session_id=session_id,
allow_missing=True,
)
response = await client.post(
url,
json=payload,
timeout=_ADMISSION_PROBE_TIMEOUT_S,
)
response.raise_for_status()
opened_session_id = response.json()
if opened_session_id != session_id:
raise ValueError(