Compare commits
2 Commits
improve/au
...
kvc-real-a
| Author | SHA1 | Date | |
|---|---|---|---|
| 7568e041ff | |||
| 4e8f943875 |
514
docs/REAL_ALI_KVC_EXPERIMENT_LOG_ZH.md
Normal file
@@ -0,0 +1,514 @@
# Real Ali KVC Experiment Log

**Branch**: `kvc-real-ali-iter-v1`, checked out from `kvc-debug-journey-v1-to-v4`.
**Date**: 2026-05-11/12.
**Environment**: single node with 8x NVIDIA H20, SGLang xPyD, model `/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct`.
**Real trace**: `/home/admin/cpfs/wjh/ali-trace/trace-qwen3-coder-formatted/041715-041717.jsonl`.

This log records the KVC pd-hybrid iterations on the real Ali workload. Conclusions hold only for the evidence collected so far; the `time-scale=10` smoke run and the KVC-friendly slice are not used as full-workload headline results.

## 1. Latest progress

Added fixed samples and a sweep pipeline for the real Ali trace:

- `scripts/prepare_real_ali_samples.py`: generates reproducible experiment samples from the real Ali trace, preserving the real input/output/hash_ids/timestamp; timestamp rebasing is optional.
- `scripts/sweep_real_ali_kvc.sh`: runs DP cache-aware, PD-disaggregation, KVC, and KVC+backpressure in sequence on the same prebuilt sample.
- `benchmark-live --use-trace-as-sample`: replays the given trace directly, so that strategies do not become incomparable through resampling.
- `replay-progress.jsonl` heartbeat: subsequent long runs write client-side progress every 30s without polling `/server_info`, to avoid perturbing the scheduler.
- `prepare_real_ali_samples.py --max-sampled-duration-s`: generates a capped sample for quick smoke runs; for iteration only, not for headline results.

Completed real Ali KVC-fit smoke run:

- Sample: `outputs/real-ali-kvc-iter/samples-balanced/ali-kvc-fit-smallappend.jsonl`
- 179 requests, 64 sessions, all multi-turn; 115 turn2+ requests, direct-eligible ratio 100%.
- `time-scale=10`, concurrency 32.
- DP cache-aware, PD-disaggregation, KVC no-backpressure, and KVC+backpressure have all completed.

## 2. Full Ali trace profile

`outputs/real-ali-kvc-iter/ali-full-profile.json` shows:

| Metric | Value |
|---|---:|
| requests | 763,727 |
| sessions | 555,905 |
| multi-turn sessions | 39,247 |
| turn2+ requests | 207,822 |
| turn2+ direct-eligible ratio | 82.95% |
| input p50 / p90 / p99 | 4,329 / 51,067 / 112,955 tokens |
| output p50 / p90 / p99 | 93 / 826 / 5,616 tokens |
| append p50 / p90 / p99 | 303 / 2,879 / 17,885 tokens |
| inter-turn gap p50 / p90 / p99 | 4.65s / 38.68s / 1,133s |

This profile shows that KVC has a real area of applicability: hash overlap and small appends are common among turn2+ requests. However, single-turn sessions dominate the full workload, so the KVC benefit is substantially diluted. Results must therefore be reported per slice, not only on the KVC-fit subset.

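The dilution argument can be made concrete with a short calculation. The sketch below only re-derives shares from the figures in the profile table; the variable names are illustrative and not taken from the profiling script.

```python
# Figures copied from the full-trace profile table.
requests = 763_727
sessions = 555_905
multi_turn_sessions = 39_247
turn2plus_requests = 207_822
direct_eligible_ratio = 0.8295  # turn2+ direct-eligible ratio

# In these figures every session contributes exactly one turn-1 request.
assert requests - sessions == turn2plus_requests

single_turn_session_share = 1 - multi_turn_sessions / sessions
turn2plus_request_share = turn2plus_requests / requests
# Upper bound on the share of all requests that could take the direct path.
direct_eligible_request_share = direct_eligible_ratio * turn2plus_request_share

print(f"single-turn sessions: {single_turn_session_share:.1%}")
print(f"turn2+ requests: {turn2plus_request_share:.1%}")
print(f"direct-eligible requests (upper bound): {direct_eligible_request_share:.1%}")
```

With these inputs, roughly 93% of sessions are single-turn and at most about 23% of all requests are direct-eligible, which is why a result on a 100% direct-eligible slice cannot be read as a full-workload result.
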
## 3. Samples run so far

### Continuous 15min cold-window session sample

Path: `outputs/real-ali-kvc-iter/samples-window-900s-600req/ali-window.jsonl`

- 600 requests, 439 sessions, 32 multi-turn sessions.
- Rebased duration: 886.544s, covering about 15min.
- turn2+ requests: 161; direct-eligible: 143; ratio 88.8%.
- input p50 / p90 / p99: 3,871 / 68,234 / 98,131 tokens.
- output p50 / p90 / p99: 85 / 712 / 5,195 tokens.
- append p50 / p90 / p99: 274 / 2,202 / 16,120 tokens.
- inter-turn gap p50 / p90 / p99: 4.656s / 19.376s / 63.575s.

This is the replacement validation sample for the 179-request KVC-fit smoke run. It splits the 900s window into 15 time buckets and selects, round-robin across buckets, whole sessions that start from their root inside the window, until 600 requests are reached. This keeps `load_trace()` from fragmenting real sessions because of missing parents, and spreads requests over the full 15min instead of taking only the first 600 requests of the window.

Important boundary: this is a **cold-window / new-session-only** sample, not a complete raw production window; it excludes ongoing sessions that were already active before the window started. It can therefore be used to validate stability under "600+ requests, 15min, real mixed load", but it cannot by itself represent the full Ali production window.

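The bucketed round-robin selection described for the continuous-window sample can be sketched as follows. This is a minimal illustration under stated assumptions, not the script's actual implementation: the function name, the `session_turns` input shape, and the tie-breaking order are hypothetical.

```python
from collections import defaultdict, deque


def select_sessions_round_robin(session_turns, window_s, buckets, target_requests):
    """Pick whole sessions, rotating across time buckets, until the target is met.

    session_turns maps a session id to the sorted timestamps (seconds, relative
    to the window start) of its turns. Only sessions whose first turn falls
    inside the window are candidates, which mirrors the cold-window boundary.
    """
    bucket_width = window_s / buckets
    queues = defaultdict(deque)
    # Earliest-starting sessions are queued first within each bucket.
    for session_id, timestamps in sorted(session_turns.items(), key=lambda kv: kv[1][0]):
        if 0 <= timestamps[0] < window_s:
            index = min(int(timestamps[0] // bucket_width), buckets - 1)
            queues[index].append(session_id)

    selected, total = [], 0
    while total < target_requests and any(queues.values()):
        for index in range(buckets):
            if total >= target_requests:
                break
            if queues[index]:
                session_id = queues[index].popleft()
                selected.append(session_id)
                total += len(session_turns[session_id])
    return selected, total
```

Because sessions are taken whole, the final count can exceed the target slightly, which is consistent with a target that is described as a lower bound on included requests.
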
### KVC-fit small append

Path: `outputs/real-ali-kvc-iter/samples-balanced/ali-kvc-fit-smallappend.jsonl`

- 179 requests, 64 sessions.
- input p50 / p90: 6,446 / 15,491 tokens.
- output p50 / p90: 112 / 1,159 tokens.
- append p50 / p90: 215 / 855 tokens.
- overlap ratio p50 / p90: 0.875 / 0.938.

This is a KVC-friendly slice, used to check the upper bound of the mechanism and whether the microbenchmark result carries over to real token/hash sequences.

### Representative-mt / early multi-turn balanced

Path: `outputs/real-ali-kvc-iter/samples-balanced/ali-representative-mt.jsonl`

- 460 requests, 64 sessions.
- input p50 / p90: 41,175 / 98,621 tokens.
- append p50 / p90 / p99: 272 / 1,979 / 13,900 tokens.

This sample is closer to real multi-turn pressure and will be used to check whether the system stays stable under large contexts and large resident KV. However, its current implementation is "take the earliest 64 multi-turn sessions after start_time", which is neither strictly random nor a stratified representative sample; an official headline needs stratified sampling by input/append/output/gap.

### Capped smoke samples

To keep a few real long gaps from wasting wall time in smoke runs, added:

- `outputs/real-ali-kvc-iter/samples-balanced-cap120s/ali-kvc-fit-smallappend.jsonl`: 177 requests, 64 sessions, duration 65.859s.
- `outputs/real-ali-kvc-iter/samples-balanced-cap120s/ali-representative-mt.jsonl`: 359 requests, 64 sessions, duration 117.366s.

These samples drop the two requests at timestamps 3613s and 5414s at the end of the original KVC-fit sample, so they are suitable only for quick engineering iteration; formal comparisons should still use the full samples or a real continuous window.

## 4. Current results

### 4.1 DP cache-aware vs KVC+backpressure, KVC-fit, time-scale=10

| Strategy | Requests | Errors | Trunc | E2E mean | E2E p50 | E2E p90 | E2E p99 | TTFT mean | TTFT p50 |
|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| 8-way DP cache-aware | 179 | 0 | 0 | 6.603s | 3.126s | 17.639s | 34.582s | 1.112s | 1.052s |
| KVC 2P6D + worker admission + backpressure | 179 | 0 | 0 | 4.443s | 2.076s | 13.288s | 21.202s | 0.700s | 0.154s |

Paired comparison (KVC - DP):

- overall E2E mean delta: -2.161s; p50 delta: -1.427s; 152/179 wins.
- turn2+ direct subset: mean delta -2.503s; p50 delta -1.508s; 103/115 wins.
- turn2+ TTFT mean delta: -0.930s; p50 delta -0.887s.

Execution paths:

- KVC turn1 seed: 64 requests.
- `kvcache-direct-to-d-session`: 115 requests.
- session reused: 115.
- actual KV transfer blocks: 623.

Structural logs:

- admission probes: 179, all `ok`.
- transfer queue depth: p50=0, p90=2, max=3.
- backpressure events: 0.

Interpretation: this run shows a positive signal for **KVC direct-to-D/session reuse** on the real Ali KVC-fit slice. It does not show that backpressure is effective, because backpressure was never triggered.

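The paired comparison can be reproduced with a few lines of Python. This is a sketch, assuming per-request latencies are available as dictionaries keyed by request id; it also assumes that "p50 delta" means the median of per-request deltas rather than the difference of the two p50 values, which is consistent with the reported numbers but is not confirmed by the log. The function name is illustrative.

```python
import statistics


def paired_delta(candidate, baseline):
    """Compare per-request latencies (seconds) keyed by request id.

    Only requests present on both sides are compared. A negative delta means
    the candidate was faster, and is counted as a win.
    """
    shared = sorted(set(candidate) & set(baseline))
    deltas = [candidate[rid] - baseline[rid] for rid in shared]
    return {
        "paired": len(deltas),
        "mean_delta": statistics.fmean(deltas),
        "p50_delta": statistics.median(deltas),
        "wins": sum(1 for d in deltas if d < 0),
    }
```

Restricting to shared request ids matters once one side has errors: a request without a latency on either side drops out of the comparison instead of biasing the mean. The function raises `statistics.StatisticsError` if no request is shared.
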
### 4.2 PD-disaggregation baseline, KVC-fit, time-scale=10

| Strategy | Requests | Errors | Trunc | E2E mean | E2E p50 | E2E p90 | E2E p99 | TTFT mean | TTFT p50 |
|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| PD-disaggregation 2P6D | 179 | 0 | 0 | 7.850s | 6.306s | 15.192s | 22.405s | 4.994s | 5.336s |

Paired comparison (PD - DP):

- overall E2E mean delta: +1.247s.
- p50 delta: +2.231s.
- 46/179 faster, 133/179 slower.

Interpretation: on this KVC-fit slice, plain PD-disaggregation is clearly weaker than 8-way DP cache-aware. It pays for P->D transfer and split scheduling without getting the bypass benefit of KVC direct-to-D.

### 4.3 KVC no-backpressure ablation, KVC-fit, time-scale=10

| Strategy | Requests | Errors | Trunc | E2E mean | E2E p50 | E2E p90 | E2E p99 | TTFT mean | TTFT p50 |
|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| KVC 2P6D worker admission, no backpressure | 179 | 0 | 0 | 4.404s | 1.936s | 13.200s | 21.326s | 0.604s | 0.139s |

Paired comparison:

- KVC no-BP vs DP: mean delta -2.200s, p50 delta -1.434s, 153/179 wins.
- KVC no-BP vs PD-disaggregation: mean delta -3.447s, p50 delta -3.514s, 163/179 wins.
- KVC no-BP vs KVC+BP: mean delta -0.039s, p50 delta -0.005s, 92/179 wins.

Structural analysis:

- direct-to-D rate: 64.25%.
- admission probes: 179, all `ok`.
- transfer queue depth: p50=0, p90=2, max=3.
- pause_ms all 0, backpressure events 0.

Interpretation: no-backpressure and +backpressure are nearly equivalent, which indicates that this slice puts no pressure on D. The improvement in this run comes from direct-to-D, not from backpressure.

### 4.4 Continuous 15min / 600-request window, time-scale=1

Sample: `outputs/real-ali-kvc-iter/samples-window-900s-600req/ali-window.jsonl`

Important boundary: this is a cold-window / new-session-only session sample, not a complete raw window. It covers about 15min with `missing_parent_count=0`, but excludes ongoing sessions that were already active before the window started.

Results:

| Strategy | Requests | Errors | Trunc | E2E mean | E2E p50 | E2E p90 | E2E p99 | TTFT mean | TTFT p50 | TTFT p90 |
|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| DP cache-aware 8-way | 600 | 1 | 0 | 13.942s | 5.222s | 29.299s | 151.183s | 6.162s | 1.746s | 19.176s |
| PD-disaggregation 2P6D | 600 | 1 | 0 | 40.886s | 40.018s | 84.681s | 113.460s | 38.545s | 37.782s | 81.852s |
| KVC 2P6D mem_fraction_static=0.82 | 600 | 53 | 0 | 12.386s | 4.225s | 37.998s | 78.234s | 10.078s | 2.674s | 27.774s |

Default KVC startup failures:

- Default KVC 2P6D hit OOM at startup twice on H20 and never reached replay.
- Logs show only about 526MB left when the decode/prefill workers started; the OOM occurred during model loading.
- `--load-format layered` does not support Qwen3-Coder-30B-A3B.
- With `--mem-fraction-static 0.82`, KVC starts and completes replay, but this reduces KV pool capacity, so this KVC run is a memory-constrained rerun.
- With `KVC_SEED_MIN_TURN_ID=2` + `mem_fraction_static=0.82`, the scheduler received SIGKILL during startup, presumably from the OS OOM killer, and never reached replay.

Paired comparison (computed only on the 547 paired requests that have a latency on both sides):

- KVC vs DP: mean delta -1.335s, p50 delta -0.055s, p90 delta +19.371s, 284 wins / 263 losses.
- KVC vs PD: mean delta -28.341s, p50 delta -25.687s, p90 delta +2.834s, 465 wins / 82 losses.

KVC structural data:

- execution modes: 388 `pd-router-turn1-seed`, 90 `kvcache-direct-to-d-session`, 67 `pd-router-fallback-large-append-session-cap`, 1 `pd-router-large-append-reseed`, 1 `pd-router-turn1-d-backpressure`, 53 `kvcache-centric` error rows.
- direct-to-D rate: 15.0%.
- direct-to-D distribution by session: 413/439 sessions have a 0-20% direct rate; only 6 sessions are in the 80-100% range.
- admission probes: 533; reason `ok` 531, `no-space` 2; queue depth p50=0, p90=2, max=5.
- pause hint was non-zero 20 times, but there was no backpressure event because this run was no-BP.

KVC error breakdown:

- 50 `ReadTimeout`.
- 2 `HTTPStatusError 400 Bad Request` on `open_session`.
- 1 context length error: the same `input_length=310521 > 262144` as in DP/PD.
- Errors are concentrated in turn1: 50 turn1, 3 turn2+.

Interpretation:

1. KVC is still clearly better than plain PD, which shows that plain P->D disaggregation is very costly on the real 600-request window.
2. Against DP, KVC shows only a small positive signal on mean/p50 of clean requests, while p90 gets worse and error_count rises from 1 for DP to 53.
3. Therefore, on this 600-request / 15min window, **KVC cannot be counted as a stable system-level improvement**. The main problem is not that the direct-to-D fast path is ineffective, but that its coverage is only 15%, and that turn1 seed / session admission / the memory-constrained KV pool introduce many timeouts.
4. This directly revises the conclusion of the 179-request KVC-fit smoke run: the small sample proves that a KVC-suitable slice exists; the 600-request mixed window proves that the current implementation cannot yet serve the real mixed workload stably.

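## 5. Has KVC improved over pd-colocation/pd-disaggregation?

Only the following qualified conclusions can be drawn at present:

1. **Versus PD-disaggregation: a clear improvement has been achieved.**
   PD-disaggregation p50 is 6.306s, KVC no-BP p50 is 1.936s, KVC+BP p50 is 2.076s; TTFT p50 is 5.336s vs 0.139s/0.154s. The gain comes mainly from turn2+ requests going straight to an existing D session, which avoids a full prefill on P and a P->D KV transfer on every turn.

2. **Versus the strong DP cache-aware baseline: an improvement on the KVC-fit slice.**
   KVC no-BP and KVC+BP both beat DP on overall mean/p50/p90/p99, with paired wins of 153/179 and 152/179 respectively. But this is a KVC-friendly, all-multi-turn slice with 100% direct-eligible turn2+ requests, and it does not represent the full Ali workload.

3. **Versus the full workload: not yet demonstrated.**
   In the full Ali trace, single-turn sessions are the majority, and long contexts and long-tail outputs are common. The KVC benefit is diluted by single-turn sessions, and D resident KV capacity and tail stability become stronger constraints.

4. **Versus the 600-request / 15min mixed window: no stable improvement yet.**
   KVC shows a positive signal on clean E2E mean/p50, but error_count=53/600 and the paired p90 delta against DP is worse. By the "E2E + error/truncation" criterion, this does not count as a systematic win.
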
## 6. Where the improvement comes from

Main gain chain:

1. The turn1 seed establishes a session on D.
2. If a turn2+ request has a small append and high hash overlap, it takes `kvcache-direct-to-d-session`.
3. direct-to-D avoids involving the P worker and skips the P->D KV transfer.
4. D runs a small prefill on the append suffix only and reuses the existing prefix KV directly.

This produces two observable gains:

- A large TTFT drop: on the turn2+ direct subset, TTFT mean falls from about 1.04s under DP to about 0.112s.
- An E2E drop: mean E2E on the direct subset falls by about 2.50s.

In addition, the cached_tokens statistic is markedly higher for KVC: mean cached tokens are 5,992 for KVC vs 228 for DP. This indicates that KVC does reuse large spans of real prefix KV.

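The gain chain in section 6 amounts to a per-request routing decision, which the following sketch illustrates. It is a simplification under stated assumptions: the thresholds `max_append_tokens` and `min_overlap_ratio` are hypothetical placeholders, not the values used in these runs, and the single `pd-router-fallback` label stands in for the several fallback and reseed modes that appear in the structural logs.

```python
def choose_execution_mode(
    turn_id,
    append_tokens,
    overlap_ratio,
    has_d_session,
    max_append_tokens=1024,   # hypothetical threshold
    min_overlap_ratio=0.8,    # hypothetical threshold
):
    """Return an execution-mode label for one request."""
    if turn_id == 1:
        # The first turn goes through P and seeds a session on D.
        return "pd-router-turn1-seed"
    if (
        has_d_session
        and append_tokens <= max_append_tokens
        and overlap_ratio >= min_overlap_ratio
    ):
        # Small append on top of a resident prefix: skip P and the P->D transfer.
        return "kvcache-direct-to-d-session"
    # Anything else falls back to the regular P->D path.
    return "pd-router-fallback"
```

For example, a turn-3 request with a 215-token append and 0.9 overlap on a resident session takes the direct path, while the same request with no resident session falls back.
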
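## 7. Problems encountered and fixes

### Problem 1: the generic sampler can be dominated by a single long session

Symptom: the real Ali session distribution has a pronounced long tail, and duration-oriented sampling easily picks unbalanced samples, which makes strategy comparisons non-repeatable or unrepresentative of multi-session contention.

Fix: added `scripts/prepare_real_ali_samples.py`, which generates a balanced sample with a session cap and a per-session turn cap while preserving the real token/hash/timestamp.

### Problem 2: resampling per strategy makes runs incomparable

Symptom: `benchmark-live` originally resampled according to its parameters, so different strategies could replay different requests.

Fix: added `--use-trace-as-sample`; all strategies copy and replay the same prebuilt sample. Only then is a paired comparison meaningful.

### Problem 3: no progress output during a long trace replay

Symptom: `request-metrics.jsonl` and the summary are written only after replay finishes, so under real pacing it is hard to tell a normal wait from a hang.

Fix: added the `replay-progress.jsonl` heartbeat, which writes submitted/completed/inflight/errors/execution_modes every 30s. It uses only client-local state and does not call `/server_info`.
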
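A client-side heartbeat of the kind described in Problem 3 can be sketched as below. This is an illustrative sketch, not the code in `replay.py`: the class name and field layout are assumptions, `completed` is assumed to include errored requests, and the per-mode `execution_modes` counter is left out for brevity.

```python
import json
import threading
import time
from pathlib import Path


class ReplayProgress:
    """Client-side counters flushed to a JSONL file on a fixed interval."""

    def __init__(self, path, interval_s=30.0):
        self.path = Path(path)
        self.interval_s = interval_s
        self.submitted = 0
        self.completed = 0
        self.errors = 0
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def record(self, *, submitted=0, completed=0, errors=0):
        with self._lock:
            self.submitted += submitted
            self.completed += completed
            self.errors += errors

    def snapshot(self):
        with self._lock:
            return {
                "ts": time.time(),
                "submitted": self.submitted,
                "completed": self.completed,
                "inflight": self.submitted - self.completed,
                "errors": self.errors,
            }

    def write_once(self):
        with self.path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(self.snapshot()) + "\n")

    def _run(self):
        # Event.wait returns False on timeout and True once stop() is called.
        while not self._stop.wait(self.interval_s):
            self.write_once()

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()
        self.write_once()  # final line so the file reflects the end state
```

The writer touches only in-process counters, so it adds no load on the serving stack, which is the property the fix relies on.
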
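### Problem 4: `/server_info` polling perturbs the scheduler

Symptom: in earlier profiling, 1Hz polling visibly changed the error count. If a real performance run polls the pool continuously, the measurement tool becomes a source of interference.

Fix: `scripts/sweep_real_ali_kvc.sh` disables pool polling by default. Capacity questions rely on structural logs and, when necessary, a separate profile run that is not mixed into headline performance runs.

### Problem 5: the backpressure smoke run never triggered backpressure

Symptom: in the KVC-fit smoke run the transfer queue max was only 3, every admission reason was `ok`, and pause_ms was 0 throughout.

Conclusion: this run cannot show that backpressure is effective; it can only show that direct-to-D is effective. A pressure sample with more sessions, larger resident KV, or higher concurrency is needed to validate backpressure specifically.

### Problem 6: environment differs from the old report

Symptom: the old document says H100, while the real environment for this round is H20; the model path is also under `/home/admin/cpfs/wjh/models/...`.

Handling: this log is recorded for H20. Cross-document comparisons look only at mechanism trends and do not treat absolute H100/H20 latencies as the same experiment.

### Problem 7: a continuous window may truncate session ancestry

Symptom: cutting the window directly by timestamp can leave requests whose parent turn lies outside the window. For KVC, this makes session reuse and the turn chain inconsistent with the real workload.

Handling: the current continuous window is only a candidate for improvement, not an official headline. An official window needs to keep warmup ancestors, or explicitly preserve the original session chain information.
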
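## 8. Current hypotheses if the full workload later performs poorly

The cause may not be a small implementation bug, but a combination of the scheme's area of applicability and resource constraints:

1. **Single-turn sessions dilute the gain**: single-turn sessions are the majority in the full Ali trace, so a KVC seed only adds cost without any turn2+ reuse.
2. **Long contexts crowd the D KV pool**: input p90 is 51K and p99 is 113K tokens, and the long tail of resident KV limits how many sessions D can keep at once.
3. **direct is not a free lunch**: turn1 seed, admission probes, and session lifecycle all carry extra cost, which is amortized only when later turns reuse the session sufficiently.
4. **D-side capacity and eviction remain the core risk**: the earlier SWE experiments already showed that session pinning plus capacity-blind D selection causes starvation; the early multi-turn balanced sample may reproduce this.
5. **Plain PD-disaggregation is weak**: if KVC frequently falls back to the plain PD path, the overall result is dragged down by P->D transfer and high TTFT.
6. **Insufficient H20 memory headroom changes the KVC conditions**: default KVC 2P6D hits OOM at startup, and `mem_fraction_static` has to be lowered to complete the 600-request run; this further shrinks the D KV pool and amplifies session-cap fallbacks and timeouts.
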
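## 9. Next validation steps, in order

1. Add a sticky/session-affinity baseline to separate the contribution of "sticking to the same D" from that of "KVC direct bypass".
2. Add KVC `seed-min-turn-id=2` or no-turn1-seed to check whether the turn1 seed cost is worth paying.
3. Run DP / PD / KVC no-BP / KVC+BP on the early multi-turn balanced sample to validate large-context, real multi-turn pressure.
4. Run a small fixed sample at `time-scale=1`, so that results do not hold only under compressed replay.
5. Build a continuous window that includes single-turn sessions and handles missing parent turns inside the window, then report results weighted by the full Ali distribution.
6. Run N>=3 reruns of the final candidate configuration and report variance; N=1 counts only as a smoke run.
7. For the 600-request window, run `seed-min-turn-id=2` first to reduce turn1 seeds for single-turn sessions; the goal is to bring the 53/600 errors down close to DP's 1/600 before discussing latency.
   - The first attempt did not reach replay and appears to have hit an OS OOM during startup; H20 startup GPU memory/system memory stability needs to be resolved first, or the worker count/model memory footprint reduced.
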
## 10. KVC error root cause and multi-turn-only validation preparation

The user pointed out that the 179-request run is not enough and asked for at least 15min / 600+ requests. The current formal root-cause analysis is based on
`outputs/real-ali-kvc-iter/runs/window900s-600req-ts1-kvc-mem082/kvcache-centric-kv-aware-worker-admission-20260511T093601Z`.

### 10.1 Why KVC has so many errors

This run has 600 requests, and KVC mem0.82 produced 53 errors:

- 50 `ReadTimeout`.
- 2 HTTP 400 on `/open_session`.
- 1 genuine context-overflow error: input 310,521 > model context 262,144.

By turn, 50/53 errors are in turn1. By structural admission, the vast majority of failed requests had already been judged `can_admit=true` by D-side admission in `structural/admission-events.jsonl`, so this is not simply `d-session-cap` or `no-space`. The main failure point is that after a turn1 seed enters the KVC seeded path, it times out during P/D streaming session bootstrap, P->D transfer, or router streaming. Because the mixed real window contains many single-turn sessions, these turn1 seeds bring no later reuse benefit for most sessions.

Conclusion: the main cause of the current KVC errors is **too many turn1 seeds for sessions that are single-turn or not known to be multi-turn**. This pushes a large number of new sessions into the KVC control plane and the seeded router path, which increases timeouts and leftover session lifecycle state. The direct-to-D fast path itself is not what fails.

### 10.2 Fixes and ablation switches already in place

Code and script fixes:

- `scripts/sweep_real_ali_kvc.sh` adds `KVC_SEED_ONLY_MULTITURN=1`, which passes `--kvcache-seed-only-multiturn-sessions`. This is an oracle ablation used to check whether "seed only sessions that will have later turns" eliminates the turn1 seed errors.
- `src/agentic_pd_hybrid/replay.py` adds a single close+retry on `/open_session` 400 and writes `structural/session-lifecycle.jsonl`. This is a lifecycle robustness fix aimed at the "already exists" 400 caused by sessions left on the server after a timeout; it does not change the routing policy.
- `scripts/prepare_real_ali_samples.py` adds `--window-min-turns` and `--window-output-name` to generate reproducible multi-turn-only window samples.

Verification:

- `uv run python -m py_compile scripts/prepare_real_ali_samples.py src/agentic_pd_hybrid/replay.py src/agentic_pd_hybrid/benchmark.py src/agentic_pd_hybrid/cli.py`
- `bash -n scripts/sweep_real_ali_kvc.sh`

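The close+retry handling described in 10.2 follows a common pattern, sketched below. This is not the code in `replay.py`: `SessionConflict`, the injected `open_session`/`close_session` callables, and the log record shape are all hypothetical stand-ins so that the control flow can be shown without an HTTP client.

```python
class SessionConflict(Exception):
    """Stands in for an HTTP 400 "already exists" response from /open_session."""


def open_session_with_retry(open_session, close_session, session_id, log):
    """Open a session; on a conflict, close the stale one and retry exactly once."""
    try:
        result = open_session(session_id)
        log.append({"session_id": session_id, "event": "open", "retried": False})
        return result
    except SessionConflict:
        log.append({"session_id": session_id, "event": "conflict"})
    # A stale server-side session (for example after a client timeout) is closed
    # before the single retry.
    close_session(session_id)
    result = open_session(session_id)  # a second conflict propagates to the caller
    log.append({"session_id": session_id, "event": "open", "retried": True})
    return result
```

Limiting the retry to one attempt keeps a persistent server-side fault visible as an error instead of hiding it behind repeated retries, and the lifecycle log records how often the recovery path was taken.
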
### 10.3 Generated multi-turn-only sample

Sample path:

`outputs/real-ali-kvc-iter/samples-window-900s-600req-multiturn/ali-window-multiturn.jsonl`

Generation command:

```bash
uv run python scripts/prepare_real_ali_samples.py \
  --trace /home/admin/cpfs/wjh/ali-trace/trace-qwen3-coder-formatted/041715-041717.jsonl \
  --output-root outputs/real-ali-kvc-iter/samples-window-900s-600req-multiturn \
  --window-duration-s 900 \
  --window-target-requests 600 \
  --window-buckets 15 \
  --window-min-turns 2 \
  --window-output-name ali-window-multiturn.jsonl \
  --profiles representative-mt \
  --max-sessions 64 \
  --max-turns-per-session 12
```

Sample profile:

- 626 requests, 107 sessions, all 107 of them multi-turn.
- sampled duration 889.341s.
- turn2+ = 519.
- direct-eligible turn2+ = 473 / 519 = 91.1%.
- missing parent = 0.
- input p50/p90/p99 = 26,846 / 91,596 / 123,898 tokens.

This case is a "multi-turn pressure slice with single-turn sessions filtered out". It cannot replace the full mixed workload, but it can answer the following question: if the workload really is dominated by multi-turn coding agent sessions, do KVC's direct-to-D coverage and stability come close to the microbenchmark?

### 10.4 Blocked on GPU resources

At the time of this entry, all 8 GPUs were occupied by another group of `vllm serve` processes, at about 82GB / 98GB each, on ports 51000-51007. These are not SGLang/benchmark processes from this repo, so no new performance run was started, to avoid mistaking a resource conflict for a KVC strategy failure.

Once the GPUs are released, run these two groups first:

```bash
# Mixed real window: check whether seed-only-multiturn brings the 53/600 errors down
TRACE=outputs/real-ali-kvc-iter/samples-window-900s-600req/ali-window.jsonl \
OUT_ROOT=outputs/real-ali-kvc-iter/runs/window900s-600req-ts1-kvc-seedonly-mt-mem082 \
RUNS="kvc" \
TIME_SCALE=1 \
CONCURRENCY=32 \
REQUEST_TIMEOUT_S=600 \
STACK_TIMEOUT_S=1800 \
EXTRA_SERVER_ARGS="--mem-fraction-static 0.82" \
KVC_SEED_ONLY_MULTITURN=1 \
bash scripts/sweep_real_ali_kvc.sh

# Multi-turn-only workload: DP vs KVC, to check whether the filtered workload reproduces the microbenchmark gain
TRACE=outputs/real-ali-kvc-iter/samples-window-900s-600req-multiturn/ali-window-multiturn.jsonl \
OUT_ROOT=outputs/real-ali-kvc-iter/runs/window900s-600req-multiturn-ts1-mem082 \
RUNS="dp kvc" \
TIME_SCALE=1 \
CONCURRENCY=32 \
REQUEST_TIMEOUT_S=600 \
STACK_TIMEOUT_S=1800 \
EXTRA_SERVER_ARGS="--mem-fraction-static 0.82" \
KVC_SEED_ONLY_MULTITURN=1 \
bash scripts/sweep_real_ali_kvc.sh
```

### 10.5 Multi-turn-only launch attempt blocked by GPU occupancy

The user asked to start the multi-turn-only `pd-disaggregation` vs `kvcache-centric` comparison. A pre-launch check found all 8 GPUs occupied by external `vllm serve` processes, at about 84GB / 98GB each, on ports 51000-51007. These processes do not belong to this repo's SGLang/benchmark runs.

SGLang was therefore not force-started this time. The remaining GPU memory is not enough to start either 2P6D or the 8-worker baseline, and forcing a run would only produce initialization OOM or unstable timeouts, which cannot be used to judge whether KVC pd-hybrid beats pd-disaggregation.

Multi-turn-only comparison command to run once resources are released:

```bash
TRACE=outputs/real-ali-kvc-iter/samples-window-900s-600req-multiturn/ali-window-multiturn.jsonl \
OUT_ROOT=outputs/real-ali-kvc-iter/runs/window900s-600req-multiturn-ts1-pd-vs-kvc-mem082 \
RUNS="pd kvc" \
TIME_SCALE=1 \
CONCURRENCY=32 \
REQUEST_TIMEOUT_S=600 \
STACK_TIMEOUT_S=1800 \
EXTRA_SERVER_ARGS="--mem-fraction-static 0.82" \
KVC_SEED_ONLY_MULTITURN=1 \
bash scripts/sweep_real_ali_kvc.sh
```

### 10.6 Multi-turn-only PD vs KVC: formal results

After resources were released, the multi-turn-only comparison was started and completed. Command:

```bash
TRACE=outputs/real-ali-kvc-iter/samples-window-900s-600req-multiturn/ali-window-multiturn.jsonl \
OUT_ROOT=outputs/real-ali-kvc-iter/runs/window900s-600req-multiturn-ts1-pd-vs-kvc-mem082 \
RUNS="pd kvc" \
TIME_SCALE=1 \
CONCURRENCY=32 \
REQUEST_TIMEOUT_S=600 \
STACK_TIMEOUT_S=1800 \
EXTRA_SERVER_ARGS="--mem-fraction-static 0.82" \
KVC_SEED_ONLY_MULTITURN=1 \
bash scripts/sweep_real_ali_kvc.sh
```

Run directories:

- PD: `outputs/real-ali-kvc-iter/runs/window900s-600req-multiturn-ts1-pd-vs-kvc-mem082/pd-disaggregation-kv-aware-20260512T030433Z`
- KVC: `outputs/real-ali-kvc-iter/runs/window900s-600req-multiturn-ts1-pd-vs-kvc-mem082/kvcache-centric-kv-aware-worker-admission-20260512T040444Z`

The sample is still 626 requests, 107 sessions, 889.341s, all multi-turn sessions.

| Strategy | Requests | Errors | Trunc | E2E mean | E2E p50 | E2E p90 | E2E p99 | TTFT mean | TTFT p50 | TTFT p90 |
|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| PD-disaggregation 2P6D | 626 | 0 | 0 | 97.013s | 70.243s | 214.309s | 308.406s | 94.506s | 69.048s | 212.528s |
| KVC 2P6D worker admission, no BP, seed-only-multiturn | 626 | 39 | 0 | 43.362s | 8.239s | 135.289s | 236.475s | 40.578s | 1.442s | 132.233s |

The paired comparison is computed only on the 587 requests where KVC succeeded and PD also has a latency:

- PD same-request E2E mean/p50/p90/p99: 97.457s / 70.514s / 214.095s / 309.362s.
- KVC same-request E2E mean/p50/p90/p99: 43.362s / 8.239s / 135.930s / 237.283s.
- mean E2E reduction: 55.5%.
- absolute mean improvement: 54.095s.
- wins/losses: 472 / 115.

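The headline figures of the paired comparison follow directly from the two same-request means. The snippet below is a small worked check using only numbers reported in this section; the variable names are illustrative.

```python
pd_mean_s = 97.457   # PD same-request E2E mean
kvc_mean_s = 43.362  # KVC same-request E2E mean
paired = 587
wins, losses = 472, 115

absolute_improvement_s = pd_mean_s - kvc_mean_s
relative_reduction = absolute_improvement_s / pd_mean_s
win_rate = wins / paired

# Every paired request is either a win or a loss in the reported counts.
assert wins + losses == paired

print(f"absolute improvement: {absolute_improvement_s:.3f}s")
print(f"mean E2E reduction: {relative_reduction:.1%}")
print(f"win rate: {win_rate:.1%}")
```

Note that these figures cover only the 587 successful pairs; the 39 KVC timeouts are excluded from the latency comparison and have to be weighed separately.
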
Breakdown by KVC execution mode:

| KVC mode | Count | KVC mean | PD same mean | Reduction |
|---|---:|---:|---:|---:|
| `kvcache-direct-to-d-session` | 286 | 2.255s | 92.944s | 97.6% |
| `pd-router-fallback-large-append-session-cap` | 169 | 88.869s | 113.614s | 21.8% |
| `pd-router-d-session-reseed` | 25 | 143.456s | 106.501s | -34.7% |
| `pd-router-large-append-reseed` | 19 | 47.631s | 88.981s | 46.5% |
| `pd-router-turn1-seed` | 78 | 55.974s | 73.050s | 23.4% |

Breakdown by turn depth:

- turn2+: 504 successful paired requests, KVC mean 40.791s vs PD mean 101.055s, reduction 59.6%.
- turn>=5: 299 successful paired requests, KVC mean 34.121s vs PD mean 104.697s, reduction 67.4%.
- turn>=10: 161 successful paired requests, KVC mean 39.027s vs PD mean 86.548s, reduction 54.9%.

KVC execution modes:

- `kvcache-direct-to-d-session`: 286.
- `pd-router-fallback-large-append-session-cap`: 169.
- `pd-router-turn1-seed`: 78.
- `pd-router-d-session-reseed`: 25.
- `pd-router-large-append-reseed`: 19.
- `pd-router-fallback-no-d-capacity`: 4.
- `pd-router-turn1-d-backpressure`: 5.
- `pd-router-d-session-reseed-after-eviction`: 1.
- error rows: 39, recorded as `kvcache-centric`.

The source of the KVC gain is clear: for the 286 direct-to-D requests, the same-request mean drops from 92.944s under PD to 2.255s, which essentially reproduces the core mechanism gain seen in the microbenchmark. These requests skip the P worker and the P->D KV transfer and process only the append suffix on an existing D session. Overall actual KV transfer blocks drop from 4436 for PD same-success to 3827 for KVC success; by the summary accounting, KVC total actual KV transfer blocks are 3827, below PD's 5276.

However, this run still cannot support a "stable, production-grade win" conclusion:

1. KVC still has 39/626 errors, an error rate of 6.23%, while PD has 0.
2. All 39 errors are client-side `ReadTimeout`, not server-side OOM/Traceback; no matching crash keywords were found in the server logs.
3. Error distribution: 24 in turn1, 15 in turn2+; by decode node, decode-0 15, decode-1 9, decode-3 7, decode-4 5, decode-5 3.
4. 8 `/open_session` 400 responses were caught by close+retry and written to `structural/session-lifecycle.jsonl`; none became an HTTP 400 error row.
5. The long-tail drain is pronounced: PD finished in about 60min and KVC in about 40min, both far beyond the 889s trace duration. At 900s KVC had completed 490/626 while PD had completed only 283/626, which shows better mid-run throughput for KVC, but the last few dozen large-append fallbacks still drag out the tail.
6. direct-to-D coverage is 286/626 = 45.7%, below the sample's static direct-eligible turn2+ ratio of 91.1%. The gap comes mainly from D session/residency capacity, the large-append session cap, and reseed/fallback.

Current assessment:

- Looking only at successful paired requests, KVC already shows a strong E2E improvement over PD-disaggregation on the multi-turn-only workload, and the improvement comes mainly from direct-to-D session reuse.
- Judged by system reliability, the current implementation does not yet pass, because a 6.23% timeout rate undercuts any "stable system" conclusion.
- The main reason for the gap between the real workload and the microbenchmark is not that the KVC fast path is ineffective, but insufficient fast-path coverage, D-side resident KV/session admission pressure, large-append fallback, and the timeout stability of the seeded/reseed paths.

450
scripts/prepare_real_ali_samples.py
Executable file
@@ -0,0 +1,450 @@
#!/usr/bin/env python3
"""Prepare balanced real-Ali trace samples for KVC experiments.

The generic sampler is duration-oriented and can be dominated by one long
session. This script keeps real request lengths/timestamps but caps turns per
session so live sweeps can compare policies on a repeatable multi-session
workload.
"""
from __future__ import annotations

import argparse
import json
import statistics
from collections import defaultdict
from dataclasses import asdict, dataclass
from pathlib import Path

from agentic_pd_hybrid.trace import TraceRequest, load_trace


@dataclass(frozen=True)
class SampleSummary:
    input_trace_path: str
    output_trace_path: str
    profile: str
    request_count: int
    session_count: int
    multi_turn_session_count: int
    turn2plus_count: int
    direct_eligible_turn2plus_count: int
    direct_eligible_turn2plus_ratio: float
    missing_parent_count: int
    max_sessions: int
    max_turns_per_session: int
    start_time_s: float
    end_time_s: float
    sampled_duration_s: float
    rebased_timestamps: bool
    input_tokens: dict[str, float] | None
    output_tokens: dict[str, float] | None
    append_tokens: dict[str, float] | None
    inter_turn_gap_s: dict[str, float] | None
    overlap_ratio: dict[str, float] | None


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--trace", type=Path, required=True)
    parser.add_argument("--output-root", type=Path, required=True)
    parser.add_argument("--max-sessions", type=int, default=64)
    parser.add_argument("--max-turns-per-session", type=int, default=12)
    parser.add_argument("--start-time-s", type=float, default=0.0)
    parser.add_argument(
        "--window-duration-s",
        type=float,
        default=None,
        help=(
            "If set, also write continuous-window samples that keep only requests "
            "inside [start-time, start-time + window-duration]."
        ),
    )
    parser.add_argument(
        "--window-target-requests",
        type=int,
        default=None,
        help=(
            "For continuous-window samples, select whole sessions across time "
            "buckets until at least this many requests are included. This keeps "
            "the window span while making live runs tractable."
        ),
    )
    parser.add_argument(
        "--window-buckets",
        type=int,
        default=15,
        help="Number of time buckets used with --window-target-requests.",
    )
    parser.add_argument(
        "--window-min-turns",
        type=int,
        default=1,
        help=(
            "Minimum number of in-window turns per selected session for "
            "continuous-window samples."
        ),
    )
    parser.add_argument(
        "--window-output-name",
        default="ali-window.jsonl",
        help="Output filename for the continuous-window sample.",
    )
    parser.add_argument(
        "--max-sampled-duration-s",
        type=float,
        default=None,
        help=(
            "For balanced profile samples, drop requests after the first selected "
            "timestamp plus this duration. Use only for quick smoke runs; headline "
            "runs should preserve the full sampled span."
        ),
    )
    parser.add_argument(
        "--profiles",
        nargs="+",
        default=["representative-mt", "kvc-fit-smallappend"],
        choices=["representative-mt", "kvc-fit-smallappend"],
    )
    parser.add_argument(
        "--no-rebase-timestamps",
        action="store_true",
        help="Keep original timestamps instead of shifting the sample to start at 0.",
    )
    args = parser.parse_args()

    requests = load_trace(args.trace)
    sessions: dict[str, list[TraceRequest]] = defaultdict(list)
    for request in requests:
        sessions[request.session_id].append(request)

    args.output_root.mkdir(parents=True, exist_ok=True)
    if args.window_duration_s is not None:
        if args.window_target_requests is None:
            selected = _select_window(
                requests=requests,
                start_time_s=args.start_time_s,
                window_duration_s=args.window_duration_s,
            )
            profile = "window"
        else:
            selected = _select_window_session_sample(
                sessions=sessions,
                start_time_s=args.start_time_s,
                window_duration_s=args.window_duration_s,
                target_requests=args.window_target_requests,
                bucket_count=args.window_buckets,
                min_turns=args.window_min_turns,
            )
            profile = (
                "window-session-sample"
                if args.window_min_turns <= 1
                else f"window-session-sample-min{args.window_min_turns}turns"
            )
        output_path = args.output_root / args.window_output_name
        summary = _write_sample(
            selected=selected,
            input_trace_path=args.trace,
            output_path=output_path,
            profile=profile,
            max_sessions=args.max_sessions,
            max_turns_per_session=args.max_turns_per_session,
            rebase_timestamps=not args.no_rebase_timestamps,
        )
        print(
            f"window: wrote {summary.request_count} requests from "
            f"{summary.session_count} sessions to {output_path}"
        )

    for profile in args.profiles:
        selected = _select_profile(
            sessions=sessions,
            profile=profile,
            start_time_s=args.start_time_s,
            max_sessions=args.max_sessions,
            max_turns_per_session=args.max_turns_per_session,
            max_sampled_duration_s=args.max_sampled_duration_s,
        )
        output_path = args.output_root / f"ali-{profile}.jsonl"
        summary = _write_sample(
            selected=selected,
            input_trace_path=args.trace,
            output_path=output_path,
            profile=profile,
            max_sessions=args.max_sessions,
            max_turns_per_session=args.max_turns_per_session,
            rebase_timestamps=not args.no_rebase_timestamps,
        )
        print(
            f"{profile}: wrote {summary.request_count} requests from "
            f"{summary.session_count} sessions to {output_path}"
        )


def _select_profile(
    *,
    sessions: dict[str, list[TraceRequest]],
    profile: str,
    start_time_s: float,
    max_sessions: int,
    max_turns_per_session: int,
    max_sampled_duration_s: float | None,
) -> list[TraceRequest]:
    # Keep only multi-turn sessions that start at or after start_time_s.
    eligible: list[list[TraceRequest]] = []
    for session_requests in sessions.values():
        ordered = _ordered(session_requests)
        if len(ordered) < 2:
            continue
        if ordered[0].timestamp_s < start_time_s:
            continue
        if profile == "kvc-fit-smallappend" and not _is_kvc_fit_smallappend(ordered):
            continue
        eligible.append(ordered[:max_turns_per_session])

    # Take the earliest-starting sessions; session_id breaks timestamp ties.
    eligible.sort(key=lambda items: (items[0].timestamp_s, items[0].session_id))
    selected_sessions = eligible[:max_sessions]
    selected = [request for items in selected_sessions for request in items]
    selected.sort(key=lambda request: (request.timestamp_s, request.chat_id))
    if selected and max_sampled_duration_s is not None:
        # Optional cap for smoke runs: drop requests past the duration limit.
        first_ts = selected[0].timestamp_s
        end_ts = first_ts + max_sampled_duration_s
        selected = [
            request for request in selected if request.timestamp_s <= end_ts
        ]
    return selected


def _select_window(
|
||||
*,
|
||||
requests: list[TraceRequest],
|
||||
start_time_s: float,
|
||||
window_duration_s: float,
|
||||
) -> list[TraceRequest]:
|
||||
end_time_s = start_time_s + window_duration_s
|
||||
selected = [
|
||||
request
|
||||
for request in requests
|
||||
if start_time_s <= request.timestamp_s <= end_time_s
|
||||
]
|
||||
selected.sort(key=lambda request: (request.timestamp_s, request.chat_id))
|
||||
return selected
|
||||
|
||||
|
||||
def _select_window_session_sample(
|
||||
*,
|
||||
sessions: dict[str, list[TraceRequest]],
|
||||
start_time_s: float,
|
||||
window_duration_s: float,
|
||||
target_requests: int,
|
||||
bucket_count: int,
|
||||
min_turns: int,
|
||||
) -> list[TraceRequest]:
|
||||
if target_requests <= 0:
|
||||
raise ValueError("--window-target-requests must be positive")
|
||||
if bucket_count <= 0:
|
||||
raise ValueError("--window-buckets must be positive")
|
||||
if min_turns <= 0:
|
||||
raise ValueError("--window-min-turns must be positive")
|
||||
|
||||
end_time_s = start_time_s + window_duration_s
|
||||
bucket_width_s = window_duration_s / bucket_count
|
||||
buckets: list[list[list[TraceRequest]]] = [[] for _ in range(bucket_count)]
|
||||
for session_requests in sessions.values():
|
||||
ordered = _ordered(session_requests)
|
||||
if not ordered:
|
||||
continue
|
||||
first = ordered[0]
|
||||
if first.timestamp_s < start_time_s or first.timestamp_s > end_time_s:
|
||||
continue
|
||||
in_window = [
|
||||
request
|
||||
for request in ordered
|
||||
if start_time_s <= request.timestamp_s <= end_time_s
|
||||
]
|
||||
if len(in_window) < min_turns:
|
||||
continue
|
||||
bucket_index = min(
|
||||
bucket_count - 1,
|
||||
int((first.timestamp_s - start_time_s) / bucket_width_s),
|
||||
)
|
||||
buckets[bucket_index].append(in_window)
|
||||
|
||||
for bucket in buckets:
|
||||
bucket.sort(key=lambda items: (items[0].timestamp_s, items[0].session_id))
|
||||
|
||||
selected_sessions: list[list[TraceRequest]] = []
|
||||
selected_count = 0
|
||||
positions = [0 for _ in range(bucket_count)]
|
||||
while selected_count < target_requests:
|
||||
progressed = False
|
||||
for index, bucket in enumerate(buckets):
|
||||
if positions[index] >= len(bucket):
|
||||
continue
|
||||
session_requests = bucket[positions[index]]
|
||||
positions[index] += 1
|
||||
selected_sessions.append(session_requests)
|
||||
selected_count += len(session_requests)
|
||||
progressed = True
|
||||
if selected_count >= target_requests:
|
||||
break
|
||||
if not progressed:
|
||||
break
|
||||
|
||||
selected = [request for items in selected_sessions for request in items]
|
||||
selected.sort(key=lambda request: (request.timestamp_s, request.chat_id))
|
||||
if len(selected) < target_requests:
|
||||
raise ValueError(
|
||||
f"window session sample selected only {len(selected)} requests; "
|
||||
f"target was {target_requests}"
|
||||
)
|
||||
return selected
|
||||
|
||||
|
||||
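The round-robin over time buckets in `_select_window_session_sample` can be illustrated on plain integers. This is only a sketch of the selection loop, not the script's code: `round_robin_take` and its arguments are hypothetical names, and each "session" is reduced to a list of request ids.

```python
def round_robin_take(buckets: list[list[list[int]]], target: int) -> list[list[int]]:
    """Take one session per bucket per pass until `target` requests are reached."""
    taken: list[list[int]] = []
    count = 0
    positions = [0] * len(buckets)
    while count < target:
        progressed = False
        for index, bucket in enumerate(buckets):
            if positions[index] >= len(bucket):
                continue  # this bucket has no sessions left
            session = bucket[positions[index]]
            positions[index] += 1
            taken.append(session)
            count += len(session)
            progressed = True
            if count >= target:
                break
        if not progressed:
            break  # all buckets are exhausted before the target was met
    return taken


# Three time buckets; the last one is empty.
buckets = [[[1, 2], [3]], [[4, 5, 6]], []]
picked = round_robin_take(buckets, target=5)
```

Because whole sessions are appended, the result can overshoot the target by up to one session's length; the loop never splits a session to hit the target exactly.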
def _is_kvc_fit_smallappend(session_requests: list[TraceRequest]) -> bool:
    initial = session_requests[0]
    if initial.input_length < 2048 or initial.input_length > 16000:
        return False
    for request in session_requests:
        if request.output_length > 2048:
            return False
    for previous, current in zip(session_requests, session_requests[1:], strict=False):
        append_tokens = current.input_length - (
            previous.input_length + previous.output_length
        )
        if append_tokens <= 0 or append_tokens > 2048:
            return False
        if _overlap_ratio(previous, current) < 0.75:
            return False
    return True

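The append-token rule used by `_is_kvc_fit_smallappend` is plain arithmetic on consecutive turns. The sketch below works through it with made-up numbers; `Turn`, `append_tokens`, and `is_small_append` are hypothetical stand-ins, and only the formula and the 2048 limit come from the script.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Turn:
    input_length: int
    output_length: int


def append_tokens(previous: Turn, current: Turn) -> int:
    # New tokens in this turn = current input minus everything the
    # previous turn already contributed (its input plus its output).
    return current.input_length - (previous.input_length + previous.output_length)


def is_small_append(previous: Turn, current: Turn, limit: int = 2048) -> bool:
    return 0 < append_tokens(previous, current) <= limit


first = Turn(input_length=4000, output_length=500)
second = Turn(input_length=4700, output_length=300)  # 4700 - 4500 = 200 new tokens
third = Turn(input_length=4900, output_length=100)   # 4900 - 5000 = -100
```

A non-positive value, as for `third`, means the turn's input is shorter than the previous turn's input plus output, so the session is rejected by the profile.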
def _write_sample(
    *,
    selected: list[TraceRequest],
    input_trace_path: Path,
    output_path: Path,
    profile: str,
    max_sessions: int,
    max_turns_per_session: int,
    rebase_timestamps: bool,
) -> SampleSummary:
    if not selected:
        raise ValueError(f"profile {profile!r} selected no requests")

    first_ts = selected[0].timestamp_s
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as handle:
        for request in selected:
            timestamp = request.timestamp_s - first_ts if rebase_timestamps else request.timestamp_s
            payload = {
                "chat_id": request.chat_id,
                "parent_chat_id": request.parent_chat_id,
                "timestamp": round(timestamp, 6),
                "input_length": request.input_length,
                "output_length": request.output_length,
                "type": request.request_type,
                "turn": request.turn_id,
                "hash_ids": list(request.hash_ids),
            }
            handle.write(json.dumps(payload, sort_keys=True) + "\n")

    sessions = defaultdict(list)
    for request in selected:
        sessions[request.session_id].append(request)

    selected_chat_ids = {request.chat_id for request in selected}
    missing_parent_count = sum(
        1
        for request in selected
        if request.parent_chat_id >= 0 and request.parent_chat_id not in selected_chat_ids
    )
    append_values: list[float] = []
    gap_values: list[float] = []
    overlap_values: list[float] = []
    direct_eligible_count = 0
    for session_requests in sessions.values():
        ordered = _ordered(session_requests)
        for previous, current in zip(ordered, ordered[1:], strict=False):
            append_tokens = current.input_length - (
                previous.input_length + previous.output_length
            )
            overlap_ratio = _overlap_ratio(previous, current)
            append_values.append(float(append_tokens))
            gap_values.append(float(current.timestamp_s - previous.timestamp_s))
            overlap_values.append(overlap_ratio)
            if append_tokens > 0 and append_tokens <= 2048 and overlap_ratio > 0:
                direct_eligible_count += 1

    turn2plus_count = sum(max(0, len(items) - 1) for items in sessions.values())

    start = min(request.timestamp_s for request in selected)
    end = max(request.timestamp_s for request in selected)
    summary = SampleSummary(
        input_trace_path=str(input_trace_path),
        output_trace_path=str(output_path),
        profile=profile,
        request_count=len(selected),
        session_count=len(sessions),
        multi_turn_session_count=sum(1 for items in sessions.values() if len(items) > 1),
        turn2plus_count=turn2plus_count,
        direct_eligible_turn2plus_count=direct_eligible_count,
        direct_eligible_turn2plus_ratio=(
            direct_eligible_count / turn2plus_count if turn2plus_count else 0.0
        ),
        missing_parent_count=missing_parent_count,
        max_sessions=max_sessions,
        max_turns_per_session=max_turns_per_session,
        start_time_s=0.0 if rebase_timestamps else start,
        end_time_s=end - start if rebase_timestamps else end,
        sampled_duration_s=end - start,
        rebased_timestamps=rebase_timestamps,
        input_tokens=_stats([float(request.input_length) for request in selected]),
        output_tokens=_stats([float(request.output_length) for request in selected]),
        append_tokens=_stats(append_values),
        inter_turn_gap_s=_stats(gap_values),
        overlap_ratio=_stats(overlap_values),
    )
    with output_path.with_suffix(output_path.suffix + ".summary.json").open(
        "w", encoding="utf-8"
    ) as handle:
        json.dump(asdict(summary), handle, indent=2, sort_keys=True)
    return summary


def _ordered(session_requests: list[TraceRequest]) -> list[TraceRequest]:
    return sorted(
        session_requests,
        key=lambda request: (request.timestamp_s, request.turn_id, request.chat_id),
    )


def _overlap_ratio(previous: TraceRequest, current: TraceRequest) -> float:
    if not current.hash_ids:
        return 0.0
    previous_blocks = set(previous.hash_ids)
    overlap = sum(1 for block in current.hash_ids if block in previous_blocks)
    return overlap / len(current.hash_ids)

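`_overlap_ratio` divides by the number of blocks in the current turn, so it is asymmetric: it measures how much of the current prompt was already present in the previous one. A minimal sketch on bare hash-id sequences follows; the function name `overlap_ratio` and the sample ids are illustrative assumptions.

```python
from collections.abc import Sequence


def overlap_ratio(previous_hash_ids: Sequence[int], current_hash_ids: Sequence[int]) -> float:
    """Fraction of the current turn's blocks that also appear in the previous turn."""
    if not current_hash_ids:
        return 0.0
    previous_blocks = set(previous_hash_ids)
    overlap = sum(1 for block in current_hash_ids if block in previous_blocks)
    return overlap / len(current_hash_ids)


grown = overlap_ratio([1, 2, 3], [1, 2, 3, 4])   # 3 of 4 current blocks are reused
shrunk = overlap_ratio([1, 2, 3, 4], [1, 2])     # every current block is reused
```

With these inputs `grown` sits exactly on the 0.75 threshold used by the `kvc-fit-smallappend` profile, which accepts it because the check rejects only ratios strictly below 0.75.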
def _stats(values: list[float]) -> dict[str, float] | None:
    if not values:
        return None
    ordered = sorted(values)
    return {
        "count": float(len(ordered)),
        "mean": statistics.fmean(ordered),
        "min": ordered[0],
        "p50": _percentile(ordered, 0.50),
        "p90": _percentile(ordered, 0.90),
        "p99": _percentile(ordered, 0.99),
        "max": ordered[-1],
    }


def _percentile(sorted_values: list[float], percentile: float) -> float:
    if len(sorted_values) == 1:
        return sorted_values[0]
    return sorted_values[round((len(sorted_values) - 1) * percentile)]

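`_percentile` picks a single element by rounding a fractional index, so it never interpolates. One consequence worth knowing when reading the summaries: Python's built-in `round` rounds halves to the nearest even integer, so the p50 of an even-length list can land on either neighbor. The sketch below reproduces the same indexing under the hypothetical name `nearest_rank` to show this.

```python
def nearest_rank(sorted_values: list[float], percentile: float) -> float:
    """Same indexing as the script's percentile helper: no interpolation."""
    if len(sorted_values) == 1:
        return sorted_values[0]
    return sorted_values[round((len(sorted_values) - 1) * percentile)]


# (4 - 1) * 0.5 = 1.5 rounds to index 2 (half-to-even), so the upper neighbor is chosen.
p50_of_four = nearest_rank([10.0, 20.0, 30.0, 40.0], 0.50)
# (2 - 1) * 0.5 = 0.5 rounds to index 0, so the lower neighbor is chosen.
p50_of_two = nearest_rank([10.0, 20.0], 0.50)
```

For the small samples used in smoke runs this can shift a reported p50 by one element relative to an interpolating median such as `statistics.median`.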
if __name__ == "__main__":
    main()
170
scripts/sweep_real_ali_kvc.sh
Executable file
@@ -0,0 +1,170 @@
#!/usr/bin/env bash
# Real Ali workload sweep for KVC pd-hybrid.
#
# This script expects a prebuilt sample trace and replays it exactly for every
# mechanism. It intentionally keeps pool polling disabled for performance runs.
set -euo pipefail

REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
cd "$REPO_ROOT"

MODEL=${MODEL:-/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}
TRACE=${TRACE:-outputs/real-ali-kvc-iter/samples-balanced/ali-kvc-fit-smallappend.jsonl}
OUT_ROOT=${OUT_ROOT:-outputs/real-ali-kvc-iter/runs}
TIME_SCALE=${TIME_SCALE:-1}
CONCURRENCY=${CONCURRENCY:-32}
REQUEST_TIMEOUT_S=${REQUEST_TIMEOUT_S:-300}
STACK_TIMEOUT_S=${STACK_TIMEOUT_S:-1200}
RUNS=${RUNS:-"dp kvc_bp"}
EXTRA_SERVER_ARGS=${EXTRA_SERVER_ARGS:-}
PREFILL_EXTRA_SERVER_ARGS=${PREFILL_EXTRA_SERVER_ARGS:-}
DECODE_EXTRA_SERVER_ARGS=${DECODE_EXTRA_SERVER_ARGS:-}
KVC_SEED_MIN_TURN_ID=${KVC_SEED_MIN_TURN_ID:-1}
KVC_SEED_ONLY_MULTITURN=${KVC_SEED_ONLY_MULTITURN:-0}

mkdir -p "$OUT_ROOT"
LOG="$OUT_ROOT/sweep.log"

log() {
  echo "[$(date '+%F %T')] $*" | tee -a "$LOG"
}

common_args=(
  --trace "$TRACE"
  --model-path "$MODEL"
  --output-root "$OUT_ROOT"
  --use-trace-as-sample
  --time-scale "$TIME_SCALE"
  --concurrency-limit "$CONCURRENCY"
  --timeout-s "$STACK_TIMEOUT_S"
  --request-timeout-s "$REQUEST_TIMEOUT_S"
)
if [[ -n "$EXTRA_SERVER_ARGS" ]]; then
  common_args+=(--extra-server-args "$EXTRA_SERVER_ARGS")
fi
if [[ -n "$PREFILL_EXTRA_SERVER_ARGS" ]]; then
  common_args+=(--prefill-extra-server-args "$PREFILL_EXTRA_SERVER_ARGS")
fi
if [[ -n "$DECODE_EXTRA_SERVER_ARGS" ]]; then
  common_args+=(--decode-extra-server-args "$DECODE_EXTRA_SERVER_ARGS")
fi

kvc_args=(
  "${common_args[@]}"
  --mechanism kvcache-centric
  --policy kv-aware
  --prefill-workers 2
  --decode-workers 6
  --prefill-tp-size 1
  --decode-tp-size 1
  --prefill-gpu-ids 0,1
  --decode-gpu-ids 2,3,4,5,6,7
  --transfer-backend mooncake
  --gpu-budget 8
  --kvcache-admission-mode worker
  --kvcache-seed-min-turn-id "$KVC_SEED_MIN_TURN_ID"
  --kvcache-seed-max-inflight-decode -1
  --kvcache-prefill-backup-policy release-after-transfer
  --kvcache-prefill-priority-eviction
)
if [[ "$KVC_SEED_ONLY_MULTITURN" == "1" ]]; then
  kvc_args+=(--kvcache-seed-only-multiturn-sessions)
fi

run_dp() {
  log "=== DP cache-aware baseline: 8 direct workers ==="
  uv run agentic-pd-hybrid benchmark-live \
    "${common_args[@]}" \
    --mechanism pd-colo \
    --policy kv-aware \
    --prefill-workers 0 \
    --decode-workers 0 \
    --direct-workers 8 \
    --direct-tp-size 1 \
    --direct-gpu-ids 0,1,2,3,4,5,6,7 \
    --gpu-budget 8
}

run_pd_disagg() {
  log "=== PD-disaggregation baseline: 2P6D ==="
  uv run agentic-pd-hybrid benchmark-live \
    "${common_args[@]}" \
    --mechanism pd-disaggregation \
    --policy kv-aware \
    --prefill-workers 2 \
    --decode-workers 6 \
    --prefill-tp-size 1 \
    --decode-tp-size 1 \
    --prefill-gpu-ids 0,1 \
    --decode-gpu-ids 2,3,4,5,6,7 \
    --transfer-backend mooncake \
    --gpu-budget 8
}

run_pd_sticky() {
  log "=== PD-disaggregation sticky baseline: 2P6D ==="
  uv run agentic-pd-hybrid benchmark-live \
    "${common_args[@]}" \
    --mechanism pd-disaggregation \
    --policy sticky \
    --prefill-workers 2 \
    --decode-workers 6 \
    --prefill-tp-size 1 \
    --decode-tp-size 1 \
    --prefill-gpu-ids 0,1 \
    --decode-gpu-ids 2,3,4,5,6,7 \
    --transfer-backend mooncake \
    --gpu-budget 8
}

run_kvc() {
  log "=== KVC baseline: 2P6D worker admission, no backpressure ==="
  uv run agentic-pd-hybrid benchmark-live "${kvc_args[@]}"
}

run_kvc_bp() {
  log "=== KVC candidate: 2P6D worker admission + backpressure ==="
  uv run agentic-pd-hybrid benchmark-live \
    "${kvc_args[@]}" \
    --enable-backpressure \
    --backpressure-max-pause-s 2.0
}

summarize_latest() {
  log "=== Latest summaries ==="
  find "$OUT_ROOT" -maxdepth 2 -name 'request-metrics.jsonl.summary.json' -print \
    | sort \
    | while read -r summary; do
        python - "$summary" <<'PY'
import json, sys
p=sys.argv[1]
d=json.load(open(p))
lat=d.get("latency_stats_s") or {}
tt=d.get("ttft_stats_s") or {}
em=d.get("execution_modes") or {}
print(p)
print("  reqs", d.get("request_count"), "errors", d.get("error_count"), "trunc", d.get("truncated_request_count"))
print("  lat mean/p50/p90/p99", lat.get("mean"), lat.get("p50"), lat.get("p90"), lat.get("p99"))
print("  ttft mean/p50/p90", tt.get("mean"), tt.get("p50"), tt.get("p90"))
print("  modes", em)
PY
      done | tee -a "$LOG"
}

log "Trace: $TRACE"
log "Model: $MODEL"
log "Runs: $RUNS | time-scale=$TIME_SCALE concurrency=$CONCURRENCY | kvc-seed-min-turn-id=$KVC_SEED_MIN_TURN_ID | kvc-seed-only-multiturn=$KVC_SEED_ONLY_MULTITURN"

for run in $RUNS; do
  case "$run" in
    dp) run_dp ;;
    pd) run_pd_disagg ;;
    pd_sticky) run_pd_sticky ;;
    kvc) run_kvc ;;
    kvc_bp) run_kvc_bp ;;
    *) log "Unknown run name: $run"; exit 2 ;;
  esac
done

summarize_latest
log "DONE"
@@ -3,13 +3,20 @@ from __future__ import annotations
 import asyncio
 import json
 import signal
+import shutil
+from collections import Counter
 from dataclasses import asdict, dataclass, replace
 from datetime import UTC, datetime
 from pathlib import Path

 from agentic_pd_hybrid.replay import ReplayConfig, replay_trace
-from agentic_pd_hybrid.sampling import SessionSampleConfig, sample_trace_sessions
+from agentic_pd_hybrid.sampling import (
+    SessionSampleConfig,
+    SessionSampleSummary,
+    sample_trace_sessions,
+)
 from agentic_pd_hybrid.stack import ManagedPdStack, launch_pd_stack
+from agentic_pd_hybrid.trace import load_trace
 from agentic_pd_hybrid.topology import SingleNodeTopology


@@ -47,12 +54,14 @@ class BenchmarkConfig:
     pool_poll_include_sessions: bool = True
     enable_backpressure: bool = False
     backpressure_max_pause_s: float = 2.0
+    progress_interval_s: float = 30.0
     sample_profile: str = "default"
     min_initial_input_tokens: int | None = None
     max_initial_input_tokens: int | None = None
     max_append_input_tokens: int | None = None
     max_output_tokens: int | None = None
     min_overlap_ratio: float | None = None
+    use_trace_as_sample: bool = False
     launch_stack: bool = True


@@ -94,22 +103,37 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts:
     )

     sampled_trace_path = run_dir / "sampled-trace.jsonl"
-    sample_summary = sample_trace_sessions(
-        SessionSampleConfig(
-            trace_path=config.trace_path,
-            output_path=sampled_trace_path,
-            target_duration_s=config.target_duration_s,
-            start_time_s=config.start_time_s,
+    if config.use_trace_as_sample:
+        shutil.copyfile(config.trace_path, sampled_trace_path)
+        sample_summary = _summarize_trace_sample(
+            input_trace_path=config.trace_path,
+            sampled_trace_path=sampled_trace_path,
+            profile=config.sample_profile,
             session_sample_rate=config.session_sample_rate,
             min_turns=config.min_turns,
-            profile=config.sample_profile,  # type: ignore[arg-type]
             min_initial_input_tokens=config.min_initial_input_tokens,
             max_initial_input_tokens=config.max_initial_input_tokens,
             max_append_input_tokens=config.max_append_input_tokens,
             max_output_tokens=config.max_output_tokens,
             min_overlap_ratio=config.min_overlap_ratio,
         )
-    )
+    else:
+        sample_summary = sample_trace_sessions(
+            SessionSampleConfig(
+                trace_path=config.trace_path,
+                output_path=sampled_trace_path,
+                target_duration_s=config.target_duration_s,
+                start_time_s=config.start_time_s,
+                session_sample_rate=config.session_sample_rate,
+                min_turns=config.min_turns,
+                profile=config.sample_profile,  # type: ignore[arg-type]
+                min_initial_input_tokens=config.min_initial_input_tokens,
+                max_initial_input_tokens=config.max_initial_input_tokens,
+                max_append_input_tokens=config.max_append_input_tokens,
+                max_output_tokens=config.max_output_tokens,
+                min_overlap_ratio=config.min_overlap_ratio,
+            )
+        )

     stack: ManagedPdStack | None = None
     previous_sigint = signal.getsignal(signal.SIGINT)
@@ -198,6 +222,7 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts:
             pool_poll_include_sessions=config.pool_poll_include_sessions,
             enable_backpressure=config.enable_backpressure,
             backpressure_max_pause_s=config.backpressure_max_pause_s,
+            progress_interval_s=config.progress_interval_s,
         )
         if config.request_timeout_s is not None:
             replay_config = replace(
@@ -258,12 +283,14 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts:
             "pool_poll_include_sessions": config.pool_poll_include_sessions,
             "enable_backpressure": config.enable_backpressure,
             "backpressure_max_pause_s": config.backpressure_max_pause_s,
+            "progress_interval_s": config.progress_interval_s,
             "sample_profile": config.sample_profile,
             "min_initial_input_tokens": config.min_initial_input_tokens,
             "max_initial_input_tokens": config.max_initial_input_tokens,
             "max_append_input_tokens": config.max_append_input_tokens,
             "max_output_tokens": config.max_output_tokens,
             "min_overlap_ratio": config.min_overlap_ratio,
+            "use_trace_as_sample": config.use_trace_as_sample,
             "sample_summary": asdict(sample_summary),
             "topology": {
                 "model_path": config.topology.model_path,
@@ -310,3 +337,44 @@ def _header_mode_for(policy_name: str) -> str:
     if policy_name == "kv-aware":
         return "target-worker"
     return "none"
+
+
+def _summarize_trace_sample(
+    *,
+    input_trace_path: Path,
+    sampled_trace_path: Path,
+    profile: str,
+    session_sample_rate: float,
+    min_turns: int,
+    min_initial_input_tokens: int | None,
+    max_initial_input_tokens: int | None,
+    max_append_input_tokens: int | None,
+    max_output_tokens: int | None,
+    min_overlap_ratio: float | None,
+) -> SessionSampleSummary:
+    requests = load_trace(sampled_trace_path)
+    if not requests:
+        raise ValueError(f"Trace sample is empty: {sampled_trace_path}")
+    session_turns = Counter(request.session_id for request in requests)
+    start_time_s = requests[0].timestamp_s
+    end_time_s = requests[-1].timestamp_s
+    return SessionSampleSummary(
+        input_trace_path=str(input_trace_path),
+        output_trace_path=str(sampled_trace_path),
+        request_count=len(requests),
+        session_count=len(session_turns),
+        multi_turn_session_count=sum(1 for turns in session_turns.values() if turns > 1),
+        start_time_s=start_time_s,
+        end_time_s=end_time_s,
+        sampled_duration_s=end_time_s - start_time_s,
+        session_sample_rate=session_sample_rate,
+        min_turns=min_turns,
+        profile=profile,
+        min_initial_input_tokens=min_initial_input_tokens,
+        max_initial_input_tokens=max_initial_input_tokens,
+        max_append_input_tokens=max_append_input_tokens,
+        max_output_tokens=max_output_tokens,
+        min_overlap_ratio=min_overlap_ratio,
+        mean_append_input_tokens=None,
+        mean_turn_overlap_ratio=None,
+    )
@@ -2,6 +2,7 @@ from __future__ import annotations

 import argparse
 import asyncio
+import shlex
 from pathlib import Path

 from agentic_pd_hybrid.benchmark import BenchmarkConfig, run_live_benchmark
@@ -260,6 +261,15 @@ def main() -> None:
         default=2.0,
         help="Cap on per-request backpressure sleep, regardless of D hint.",
     )
+    replay.add_argument(
+        "--progress-interval-s",
+        type=float,
+        default=30.0,
+        help=(
+            "Write client-side replay progress to <output_dir>/replay-progress.jsonl "
+            "every N seconds. 0 disables the heartbeat."
+        ),
+    )

     sample = subparsers.add_parser(
         "sample-sessions",
@@ -501,6 +511,15 @@ def main() -> None:
         default=2.0,
         help="Cap on per-request backpressure sleep, regardless of D hint.",
     )
+    benchmark.add_argument(
+        "--progress-interval-s",
+        type=float,
+        default=30.0,
+        help=(
+            "Write client-side replay progress to <run_dir>/replay-progress.jsonl "
+            "every N seconds. 0 disables the heartbeat."
+        ),
+    )
     benchmark.add_argument(
         "--sample-profile",
         choices=["default", "small-append"],
@@ -512,6 +531,14 @@ def main() -> None:
     benchmark.add_argument("--max-append-input-tokens", type=int, default=None)
     benchmark.add_argument("--max-output-tokens", type=int, default=None)
     benchmark.add_argument("--min-overlap-ratio", type=float, default=None)
+    benchmark.add_argument(
+        "--use-trace-as-sample",
+        action="store_true",
+        help=(
+            "Replay the provided --trace exactly instead of sampling sessions into "
+            "a new trace. Use this for prebuilt real-workload samples."
+        ),
+    )

     args = parser.parse_args()

@@ -586,6 +613,7 @@ def main() -> None:
             pool_poll_include_sessions=not args.pool_poll_no_sessions,
             enable_backpressure=args.enable_backpressure,
             backpressure_max_pause_s=args.backpressure_max_pause_s,
+            progress_interval_s=args.progress_interval_s,
         )
         results = asyncio.run(replay_trace(config))
         print(
@@ -732,12 +760,14 @@ def main() -> None:
                 pool_poll_include_sessions=not args.pool_poll_no_sessions,
                 enable_backpressure=args.enable_backpressure,
                 backpressure_max_pause_s=args.backpressure_max_pause_s,
+                progress_interval_s=args.progress_interval_s,
                 sample_profile=args.sample_profile,
                 min_initial_input_tokens=args.min_initial_input_tokens,
                 max_initial_input_tokens=args.max_initial_input_tokens,
                 max_append_input_tokens=args.max_append_input_tokens,
                 max_output_tokens=args.max_output_tokens,
                 min_overlap_ratio=args.min_overlap_ratio,
+                use_trace_as_sample=args.use_trace_as_sample,
                 launch_stack=True,
             )
         )
@@ -797,6 +827,26 @@ def _add_topology_arguments(parser: argparse.ArgumentParser) -> None:
         "--no-trust-remote-code",
         action="store_true",
     )
+    parser.add_argument(
+        "--extra-server-args",
+        default="",
+        help="Extra arguments appended to every sglang.launch_server command.",
+    )
+    parser.add_argument(
+        "--prefill-extra-server-args",
+        default="",
+        help="Extra arguments appended only to prefill launch_server commands.",
+    )
+    parser.add_argument(
+        "--decode-extra-server-args",
+        default="",
+        help="Extra arguments appended only to decode launch_server commands.",
+    )
+    parser.add_argument(
+        "--direct-extra-server-args",
+        default="",
+        help="Extra arguments appended only to direct launch_server commands.",
+    )


 def _topology_from_args(args: argparse.Namespace):
@@ -826,7 +876,13 @@ def _topology_from_args(args: argparse.Namespace):
         force_rdma=args.force_rdma,
         trust_remote_code=not args.no_trust_remote_code,
         ib_device=args.ib_device,
-        direct_extra_server_args=("--enable-streaming-session",),
+        extra_server_args=tuple(shlex.split(args.extra_server_args)),
+        prefill_extra_server_args=tuple(shlex.split(args.prefill_extra_server_args)),
+        decode_extra_server_args=tuple(shlex.split(args.decode_extra_server_args)),
+        direct_extra_server_args=(
+            "--enable-streaming-session",
+            *tuple(shlex.split(args.direct_extra_server_args)),
+        ),
     )


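The new `--*-extra-server-args` options each take one shell-style string that is split with `shlex.split` before being attached to the topology. The sketch below shows what that split produces. The helper `split_extra_args` and the `--example-*` flags are placeholders chosen for illustration, not real SGLang options; only `--enable-streaming-session` appears in the diff.

```python
import shlex


def split_extra_args(raw: str) -> tuple[str, ...]:
    """Split a shell-style argument string into a tuple of argv tokens."""
    return tuple(shlex.split(raw))


# Quoted values stay together as a single token.
extra = split_extra_args("--example-flag 0.8 --example-name 'two words'")

# The default empty string yields no tokens, so the direct workers keep only
# the flag that is always set.
direct = ("--enable-streaming-session", *split_extra_args(""))
```

Since the whole string travels as one CLI value, it has to be quoted once more on the calling side, which is what the sweep script does by passing `"$EXTRA_SERVER_ARGS"` as a single argument.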
@@ -107,6 +107,7 @@ class ReplayConfig:
     enable_backpressure: bool = False
     backpressure_max_pause_s: float = 2.0
     structural_log_dir: Path | None = None
+    progress_interval_s: float = 30.0


 @dataclass
@@ -174,6 +175,62 @@ class ExecutionResult:
     finish_reason: str | None = None


+@dataclass
+class ReplayProgress:
+    total_requests: int
+    output_path: Path
+    interval_s: float
+    start_time_s: float
+    submitted_count: int = 0
+    completed_count: int = 0
+    error_count: int = 0
+    truncated_count: int = 0
+    last_request_id: str | None = None
+    last_session_id: str | None = None
+    last_trace_timestamp_s: float | None = None
+    execution_modes: Counter[str] = field(default_factory=Counter)
+    lock: asyncio.Lock = field(default_factory=asyncio.Lock)
+
+    async def record_submitted(self, request: TraceRequest) -> None:
+        async with self.lock:
+            self.submitted_count += 1
+            self.last_request_id = request.request_id
+            self.last_session_id = request.session_id
+            self.last_trace_timestamp_s = request.timestamp_s
+
+    async def record_completed(self, row: RequestMetrics) -> None:
+        async with self.lock:
+            self.completed_count += 1
+            if row.error is not None:
+                self.error_count += 1
+            if _is_truncated(row):
+                self.truncated_count += 1
+            self.execution_modes[row.execution_mode] += 1
+            self.last_request_id = row.request_id
+            self.last_session_id = row.session_id
+            self.last_trace_timestamp_s = row.trace_timestamp_s
+
+    async def emit(self, phase: str) -> None:
+        async with self.lock:
+            event = {
+                "phase": phase,
+                "elapsed_s": round(time.perf_counter() - self.start_time_s, 3),
+                "total_requests": self.total_requests,
+                "submitted_count": self.submitted_count,
+                "completed_count": self.completed_count,
+                "inflight_count": self.submitted_count - self.completed_count,
+                "error_count": self.error_count,
+                "truncated_count": self.truncated_count,
+                "last_request_id": self.last_request_id,
+                "last_session_id": self.last_session_id,
+                "last_trace_timestamp_s": self.last_trace_timestamp_s,
+                "execution_modes": dict(sorted(self.execution_modes.items())),
+            }
+            self.output_path.parent.mkdir(parents=True, exist_ok=True)
+            with self.output_path.open("a", encoding="utf-8") as handle:
+                handle.write(json.dumps(event, sort_keys=True) + "\n")
+
+
 async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
     structural_dir = config.structural_log_dir
     if structural_dir is None and config.output_path is not None:
@@ -199,6 +256,23 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
     session_tail_tasks: dict[str, asyncio.Task[RequestMetrics]] = {}
     direct_sessions: dict[str, DirectSessionState] = {}
     direct_session_lock = asyncio.Lock()
+    progress = (
+        ReplayProgress(
+            total_requests=len(requests),
+            output_path=config.output_path.parent / "replay-progress.jsonl",
+            interval_s=config.progress_interval_s,
+            start_time_s=start_time,
+        )
+        if config.progress_interval_s > 0 and config.output_path is not None
+        else None
+    )
+    progress_stop = asyncio.Event()
+    progress_task: asyncio.Task[None] | None = None
+    if progress is not None:
+        await progress.emit("start")
+        progress_task = asyncio.create_task(
+            _progress_heartbeat(progress, progress_stop)
+        )
     async with httpx.AsyncClient(timeout=config.timeout_s, trust_env=False) as client:
         decode_residency = await _discover_decode_residency(
             client=client,
@@ -230,6 +304,8 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
             sleep_s = target_offset - (time.perf_counter() - start_time)
             if sleep_s > 0:
                 await asyncio.sleep(sleep_s)
+            if progress is not None:
+                await progress.record_submitted(request)
             tasks.append(
                 asyncio.create_task(
                     _run_request(
@@ -244,12 +320,15 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
                         direct_session_lock=direct_session_lock,
                         decode_residency=decode_residency,
                         depends_on=session_tail_tasks.get(request.session_id),
+                        progress=progress,
                     )
                 )
             )
             session_tail_tasks[request.session_id] = tasks[-1]

         results = await asyncio.gather(*tasks)
+        if progress is not None:
+            await progress.emit("requests-complete")
         if poll_task is not None:
             poll_task.cancel()
             try:
@@ -285,6 +364,14 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
|
||||
trace_path=config.trace_path,
|
||||
router_url=config.router_url,
|
||||
)
|
||||
if progress is not None:
|
||||
await progress.emit("final")
|
||||
progress_stop.set()
|
||||
if progress_task is not None:
|
||||
try:
|
||||
await progress_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
_structural_close()
|
||||
return results
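
Review note: the hunks above start a heartbeat task before the replay, then stop it by setting an `asyncio.Event` and awaiting the task, rather than cancelling it. The following is a minimal, self-contained sketch of that lifecycle. The names `heartbeat`, `demo_heartbeat`, and the in-memory `emit` callback are hypothetical stand-ins for illustration and are not the `ReplayProgress` API from this change.

```python
import asyncio


async def heartbeat(emit, stop_event: asyncio.Event, interval_s: float) -> None:
    """Call emit("heartbeat") every interval_s seconds until stop_event is set."""
    while not stop_event.is_set():
        try:
            # Returns as soon as the event is set; otherwise times out after interval_s.
            await asyncio.wait_for(stop_event.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            await emit("heartbeat")


async def demo_heartbeat() -> list:
    events = []

    async def emit(kind: str) -> None:
        # Hypothetical sink; the real code appends a JSON line to a file.
        events.append(kind)

    stop = asyncio.Event()
    await emit("start")
    task = asyncio.create_task(heartbeat(emit, stop, interval_s=0.05))
    await asyncio.sleep(0.18)  # stands in for the replay workload
    await emit("final")
    stop.set()
    await task  # the loop exits on its own once the event is set
    return events


if __name__ == "__main__":
    print(asyncio.run(demo_heartbeat()))
```

Because the task waits on the event with a timeout instead of sleeping, shutdown does not have to wait for a full interval to elapse.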
@@ -302,6 +389,7 @@ async def _run_request(
     direct_session_lock: asyncio.Lock,
     decode_residency: DecodeResidencyState,
     depends_on: asyncio.Task[RequestMetrics] | None,
+    progress: ReplayProgress | None = None,
 ) -> RequestMetrics:
     if depends_on is not None:
         await depends_on
@@ -351,7 +439,7 @@ async def _run_request(
     async with state_lock:
         state.finish(request, decision)

-    return RequestMetrics.from_decision(
+    row = RequestMetrics.from_decision(
         request,
         decision,
         mechanism_name=config.mechanism_name,
@@ -371,6 +459,29 @@ async def _run_request(
         requested_output_tokens=execution.requested_output_tokens,
         finish_reason=execution.finish_reason,
     )
+    if progress is not None:
+        await progress.record_completed(row)
+    return row
+
+
+async def _progress_heartbeat(
+    progress: ReplayProgress,
+    stop_event: asyncio.Event,
+) -> None:
+    while not stop_event.is_set():
+        try:
+            await asyncio.wait_for(stop_event.wait(), timeout=progress.interval_s)
+        except asyncio.TimeoutError:
+            await progress.emit("heartbeat")
+
+
+def _is_truncated(row: RequestMetrics) -> bool:
+    return (
+        row.actual_output_tokens is not None
+        and row.requested_output_tokens is not None
+        and row.requested_output_tokens > 1
+        and row.actual_output_tokens < row.requested_output_tokens * 0.5
+    )


 async def _invoke_router(
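
Review note: `_is_truncated` in the hunk above flags a row only when both token counts are known, more than one token was requested, and fewer than half of the requested tokens came back. A small sketch of how the boundary cases behave, assuming a hypothetical `Row` dataclass that carries only the two fields the predicate reads (the real `RequestMetrics` has more):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Row:
    """Hypothetical stand-in for RequestMetrics; only the fields used below."""
    actual_output_tokens: Optional[int]
    requested_output_tokens: Optional[int]


def is_truncated(row: Row) -> bool:
    # Same predicate as in the diff, applied to the stand-in type.
    return (
        row.actual_output_tokens is not None
        and row.requested_output_tokens is not None
        and row.requested_output_tokens > 1
        and row.actual_output_tokens < row.requested_output_tokens * 0.5
    )


rows = [
    Row(40, 100),    # under half of the request: truncated
    Row(50, 100),    # exactly half: not truncated (strict comparison)
    Row(0, 1),       # single-token requests are never flagged
    Row(None, 100),  # unknown actual count: not flagged
]
flags = [is_truncated(r) for r in rows]
print(flags)  # [True, False, False, False]
```

The strict `<` means a response that returns exactly half of the requested tokens is still counted as complete.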
@@ -643,16 +754,41 @@ async def _open_streaming_session(
         request.input_length * 16,
         (request.input_length + request.output_length) * 16,
     )
-    response = await client.post(
-        f"{server_url.rstrip('/')}/open_session",
-        json={
-            "capacity_of_str_len": capacity,
-            "session_id": session_id,
-            "streaming": True,
-        },
-        timeout=_ADMISSION_PROBE_TIMEOUT_S,
-    )
-    response.raise_for_status()
+    payload = {
+        "capacity_of_str_len": capacity,
+        "session_id": session_id,
+        "streaming": True,
+    }
+    url = f"{server_url.rstrip('/')}/open_session"
+    response = await client.post(url, json=payload, timeout=_ADMISSION_PROBE_TIMEOUT_S)
+    try:
+        response.raise_for_status()
+    except httpx.HTTPStatusError:
+        if response.status_code != 400:
+            raise
+        await _structural_emit(
+            "session-lifecycle.jsonl",
+            {
+                "event": "open-session-400-retry",
+                "server_url": server_url,
+                "session_id": session_id,
+                "request_id": request.request_id,
+                "turn_id": request.turn_id,
+                "response_text": response.text[:512],
+            },
+        )
+        await _close_streaming_session(
+            client=client,
+            server_url=server_url,
+            session_id=session_id,
+            allow_missing=True,
+        )
+        response = await client.post(
+            url,
+            json=payload,
+            timeout=_ADMISSION_PROBE_TIMEOUT_S,
+        )
+        response.raise_for_status()
     opened_session_id = response.json()
     if opened_session_id != session_id:
         raise ValueError(