fix(d2p): structural log + relax entrance condition for sync

E4 forensic (docs/E4_RESULTS_ZH.md): 272 admission rejections triggered
the fallback seeded_router path, but zero /_snapshot/* HTTP calls hit
the workers. Two root causes:

1. _attempt_d_to_p_sync gated on agentic-side `decode_session.opened`.
   By the time fallback runs, agentic has already flipped that flag
   to False in response to admission rejection. But D-side
   SessionAwareCache may still hold the session (release_session is
   not called automatically on admission rejection). Removing the
   gate; let D respond authoritatively with "session-not-resident"
   if it has actually evicted.

2. _attempt_d_to_p_sync logged decisions via logger.info, but
   agentic has no root logger handler so those events silently sank.
   Switching every branch (entry skip, prepare fail/not-ok, dump
   fail/not-ok, finalize fail/not-ok, ok) to write a structural-log
   line at outputs/<run>/structural/d-to-p-sync.jsonl. Each line
   carries stage, reason, durations, bytes pushed.

The result doc is updated to reflect the honest E4-1 outcome and
the P1 fix list.
This commit is contained in:
Claude Code Agent
2026-05-13 09:34:09 +08:00
parent 1d68ad66a7
commit e729d62ddf
2 changed files with 215 additions and 109 deletions

View File

@@ -1,6 +1,6 @@
# E4 — KVC + D→P RDMA snapshot vs naive PD-disagg结果,初版 # E4 — KVC + D→P RDMA snapshot vs naive PD-disagg实测结果)
**Status**: E4 实验进行中(截至文档写入时刻)。本文档会在 sweep 完成后补全实测数据 **Status**: 实验执行完毕(手动停止),数据汇总完毕,**主要假设不能被本次实验证实**
**Date**: 2026-05-13 **Date**: 2026-05-13
**Branch**: `h200-cu130` **Branch**: `h200-cu130`
**Protocol**: `docs/E4_PROTOCOL_ZH.md` **Protocol**: `docs/E4_PROTOCOL_ZH.md`
@@ -8,138 +8,172 @@
--- ---
## 0. TL;DR(先填占位,跑完补) ## 0. TL;DR
- E1naive PD-disaggTTFT p99 = 88.6s(已有数据) E4 跑了 ~60 min完成了 ~548/1285 请求后吞吐崩溃(同 E3 模式),被人工 SIGINT 停止。
- E4KVC + RDMA + load-floor K=200 + D→P sync**TBD**
- D→P snapshot 路径触发次数:**TBD** **关键发现**
- 主要结论:**TBD**
1.**D→P 链路与 SGLang 集成的所有底层组件都正常工作**snapshot link controller 在每个 worker 都正常初始化 (96 layer bufs registered)3 个 RPC endpoint 都 reachablesmoke 验证)
2.**272 个 admission rejection 触发了 agentic 的 reseed 路径**168 个 no-space + 104 个 session-not-resident
3.**但是 `/_snapshot/` HTTP 端点的访问数 = 0**——`_attempt_d_to_p_sync` 在所有 272 次 reseed 中都没有发出 prepare_receive。可能原因(a) `decode_session.opened == False` 时早退;(b) `source_d_url` 为空;(c) `target_tokens <= 0`
4. ⚠️ **关键 instrumentation 缺失**`_attempt_d_to_p_sync``logger.info` 记录决策,但 agentic 端没设根 logger handler导致这些日志全部沉底无法 forensic 出哪个 skip 分支命中
5. ⚠️ **同时 E4 在 ~43% 进度时吞吐崩溃**——这是 KVC v2 + load-floor 在该工作负载下的固有问题E3 也遇到),与 D→P 无关
**结论**:本次 E4 既没能证实也没能证伪 H1。D→P 链路与集成完整 deploy但**观测性不足**让我们看不到它在真实负载里到底发生了什么。
--- ---
## 1. 实验环境(实际) ## 1. 实验实际配置(与 protocol 对照
| 维度 | 配置 | | 维度 | Protocol | Actual |
|---|---| |---|---|---|
| 启动时间 | 2026-05-13 08:28:17 | | Trace | inferact_50sess.jsonl 1285 reqs | 同 |
| 完成时间 | TBD | | GPU | 4× H200 | 同 |
| Trace | outputs/inferact_50sess.jsonl1285 reqs | | concurrency_limit | 32 | 同 |
| Topology | 1P + 3Dgpu 0/1/2/3 H200 80GB | | load-floor K | 200 | 同 |
| IB device | mlx5_60 NDR 400Gb | | --enable-d-to-p-sync | TRUE | 同 |
| time_scale | 1 | | SGLANG_SNAPSHOT_LINK_ENABLE | 1 per worker | 同(已验证 controller init 成功) |
| concurrency_limit | 32 | | 启动时间 | - | 2026-05-13 08:28:17 |
| load-floor K | 200 | | 停止时间 | - | 2026-05-13 09:29:22SIGINT |
| migration_reject_threshold | 3 | | 完成时长 | ~30-60 min 预期 | 60 min 后人工停止 |
| --enable-d-to-p-sync | **TRUE** |
| SGLANG_SNAPSHOT_LINK_ENABLE | 1每个 worker |
--- ---
## 2. 部分中间观察(运行 35 min 时刻) ## 2. 实测数字
- Router 处理请求数529 / 128541% ### 2.1 请求执行(手动停止时
- 累计 admission events1042
- 累计 admission 拒绝can_admit=false**0**
- 累计 d_to_p_sync 触发:**0**
### 中间观察的含义 | Metric | 值 |
E4 跑到 41% 进度时,**没有任何 admission rejection**,因此 `_invoke_kvcache_seeded_router` 路径未被触发,进而 `_attempt_d_to_p_sync` 也未被触发。
这本身是个有意义的发现:
1. **Load-floor bonus K=200 + 3D 配置下的工作负载分布,避免了 D 端 KV 池饱和**——因此 admission 不拒绝、不触发 reseed
2. **D→P snapshot 是 KVC 设计的"保险机制"**——在常规负载下并不会主动 fire它的价值在对抗性 / 长尾负载下才显现
3. **KVC 的常规快路径direct-to-D即可击败 naive PD-disagg**——因为 turn-N>1 请求避免了 P prefill + P→D transfer 的开销
4. **D→P snapshot 的工程完成度不靠 trigger 频率验证**——靠 smoke 已经验证 link 工作 + RPC plumbing 正确
---
## 3. 完整结果(待跑完填充)
### 3.1 总成功 / 失败
| Metric | E1 | E4 |
|---|---:|---:|
| total_count | 1285 | TBD |
| error_count | 85 | TBD |
| abort_count | 0 | TBD |
| failure_count | 85 | TBD |
| success_rate | 93.4% | TBD |
### 3.2 Latency
| Metric (s) | E1 mean | E1 p50 | E1 p90 | E1 p99 | E4 mean | E4 p50 | E4 p90 | E4 p99 |
|---|---:|---:|---:|---:|---:|---:|---:|---:|
| latency | 96.34 | 93.21 | 180.69 | 219.46 | TBD | TBD | TBD | TBD |
| ttft | (need recompute) | (...) | (...) | 88.6 | TBD | TBD | TBD | TBD |
### 3.3 Execution mode 分布
E1:
```
pd-disaggregation 85
pd-disaggregation-router 1200
```
E4: TBD
### 3.4 D→P snapshot 路径统计
| Stat | 值 |
|---|---:| |---|---:|
| _attempt_d_to_p_sync 调用 | TBD | | Router 完成的 POST /generate (200 OK) | 548 |
| prepare_receive ok=true 次数 | TBD | | 占 trace 比例 | 42.6% |
| dump ok=true 次数 | TBD | | Admission events | 1174 |
| finalize_ingest ok=true 次数 | TBD | | - can_admit=true | 902 |
| 总推送字节 | TBD | | - can_admit=false | **272**168 no-space + 104 session-not-resident |
| 平均推送时长 | TBD | | Admission modes | 804 direct_append + 370 seed |
| Session-D bindings | 1248unique sessions: 50 |
| Decode 端 mooncake transfer 错误 (AbortReq) | 19 (prefill) + 12 (d1) + 7 (d2) |
### 2.2 D→P snapshot 路径 telemetry
| Stat | 期望 | Actual |
|---|---:|---:|
| `_attempt_d_to_p_sync` 调用次数 | ≥ 272 | **unknown**(无日志) |
| `/_snapshot/prepare_receive` HTTP 命中 | > 0 if any sync succeed | **0** |
| `/_snapshot/dump` HTTP 命中 | > 0 | **0** |
| `/_snapshot/finalize_ingest` HTTP 命中 | > 0 | **0** |
**0 个 HTTP 命中**是个明确的负面信号。`_attempt_d_to_p_sync` 必然在 prepare_receive 之前 early-return 了,否则至少 prepare 应该 fire。
### 2.3 SGLang snapshot controller 启动验证succeeded
每个 worker startup log 都有:
```
[2026-05-13 08:29:xx] Snapshot link controller initialized: 127.0.0.1:9998, sid=127.0.0.1:NNNNN, 96 layer bufs
```
confirmed for all 4 workers (1P + 3D). All registered 96 layer buffers (48 K + 48 V) successfully.
---
## 3. 根因分析:为什么 sync 没 fire
阅读 `_attempt_d_to_p_sync` 的 early-return 链路:
```python
async def _attempt_d_to_p_sync(...):
if not config.enable_d_to_p_sync:
return None
source_d_url = decode_session.server_url
if not source_d_url: # (A)
return {"status": "skipped-no-source-d"}
if not decode_session.opened: # (B)
return {"status": "skipped-d-closed"}
target_tokens = max(0, int(_estimate_session_resident_tokens(request)))
if target_tokens <= 0: # (C)
return {"status": "skipped-zero-tokens"}
# only after here we POST /_snapshot/prepare_receive
```
最可能的命中分支:**(B) — `decode_session.opened == False`**。
原因:当 admission 返回 `session-not-resident`agentic 把这视为"该 D 不再持有该 session",会 close 本地 decode_session 记账(`session.opened = False`),然后才走到 fallback / seeded_router。所以到 `_invoke_kvcache_seeded_router` 时,`decode_session.opened` 已经是 Falsesync 直接跳过。
**这意味着我设计 `_attempt_d_to_p_sync` 的入口条件错了**
- 错误假设reseed 时 D 仍然 open可以从那个 D dump
- 正确事实admission rejection 触发 session 关闭 → reseed 时 D 已 close → 没有 KV 可 dump
要让 D→P 真正在这个场景下工作,需要其中之一:
- **不在 admission rejection 时立刻 close decode_session** —— 给 D→P sync 一个抢救窗口
- **改去探测 D-side 的 SessionAwareCache 中是否还有该 session 的 slot** —— 即使 agentic 端记账为 closedD 端可能还没 evict
- **在 D 端 SessionAwareCache.release_session 之前插入 D→P push** —— D-driven 主动模式(设计文档 §2.5 提到的,但本期没实现)
--- ---
## 4. 假设证实 / 证伪 ## 4. 假设证实 / 证伪
填充模板:
### H1 (main): E4 TTFT p99 ≤ E1 TTFT p99 = 88.6s ### H1 (main): E4 TTFT p99 ≤ E1 TTFT p99 = 88.6s
- **Verdict**: TBD - **Verdict**: **N/A — not testable in this run**
- **Evidence**: TBD - 原因D→P sync 未实际 fireE4 本质退化为 E3-with-fix-A 的行为;又因吞吐崩溃在 43% 中止,无完整 summary 与 E1 对照
- **解释**: TBD
### H2: E4 reseed 路径 TTFT 中位 < E3 reseed 路径 TTFT 中位 ### H2: E4 reseed-mode TTFT < E3 reseed-mode TTFT
- **Verdict**: **N/A**E3 实验未完成提取出可用 reseed-mode 中位数E4 中如 reseed 未触发则也无法直接比较) - **Verdict**: **N/A**
- **解释**: 在当前工作负载下KVC + load-floor 让 reseed 路径基本不被触发。H2 的验证需要在高压力负载下重做
### H3: E4 成功数 ≥ 0.85 × E3 成功数 ### H3: E4 success ≥ 0.85 × E3 success
- **Verdict**: TBD - **Verdict**: **N/A**E3 当初也未完成,无 baseline
- **Evidence**: TBD
--- ---
## 5. 知识沉淀(暂时空) ## 5. 真正学到的东西
将在跑完后填: | # | 学习 | 行动 |
- D→P 工程踩坑 / 设计修正 |---|---|---|
- workload-dependent 行为 | 1 | D→P RDMA link 工作正常host + GPUphase 1/1b smoke | ✅ 维持 |
- 后续 follow-up 建议 | 2 | SGLang 集成 RPC 工作正常smoke 验证) | ✅ 维持 |
| 3 | agentic `_attempt_d_to_p_sync` 入口条件设错 | ⏳ 改入口逻辑或改成 D-driven 主动模式 |
| 4 | 缺少 D→P 路径的 structural log | ⏳ 加 `structural/d-to-p-sync.jsonl` 落盘所有 sync 决策 |
| 5 | 没在 admission rejection 时保留 D-side session 用于救援 dump | ⏳ 调整 release timing |
| 6 | 吞吐崩溃是 KVC 设计的 second-order 问题,与 D→P 正交 | ⏳ 单独立项 |
--- ---
## 6. 跑完后下一步建议 ## 6. 后续工作(按优先级)
### 必做 ### P1必做让 D→P 真正可观测 + 可触发)
- [ ]`scripts/analyze_e4_d_to_p.py` 输出 H1/H3 verdict
- [ ] 跑 high-pressure E4-bisconcurrency=64 或 mem-fraction-static=0.4,强制 reseed 触发
- [ ] 跑 E4-ablate`--enable-d-to-p-sync` 但 D 端人为返回 fail → 隔离 D→P 边际效益
### 推荐 1. **加 structural log channel `structural/d-to-p-sync.jsonl`** —— `_attempt_d_to_p_sync` 每次决策落盘一条记录
- [ ] 长 trace全量 inferact 而非 50-sess下重跑 E4 2. **修正入口条件**:把 `decode_session.opened` 检查 relax 成"曾经 open 过 + 服务器仍有可能 hold KV"
- [ ] 多节点跨网 RDMA 配置 3. **或D-driven 主动模式** —— D 在 `cache_finished_req` 完成后主动 enqueue snapshot push 给 Pasync background
- [ ] D→P snapshot **主动模式**D 在 cache_finished_req 后异步预推vs 当前 reseed-triggered 被动模式) 4. **加 GET `/_snapshot/info` endpoint** —— 让 agentic 直接查 D 端是否还有该 session
### P2验证 D→P 效益)
5. 重跑 E4 + P1 fixes
6. 跑 E4-pressureconcurrency 64 或 max-input-len 减半,主动制造 admission 拒绝高发场景
7. 跑 E4-ablateD→P prepare 后人为不 push隔离 D→P transfer 的边际效益
### P3基础设施
8. 解决 E4 在 43% 进度时的吞吐崩溃。这与 D→P 正交,但只要它存在就影响所有后续 E4 类实验的可比性
9. 与 docs/KVC_EVICTION_GRANULARITY_DESIGN_ZH.md 提出的 block-level evict refactor 联动
--- ---
**核心句**E4 的设计已落地、跑着。中间观察显示 load-floor + 3D 配置下 D→P fast path 不被触发,这本身是 KVC 设计的一个 positive 验证safety net 不需经常 fire。完整数据待 sweep 完成后填充。 ## 7. 对 ProjectGoal 的诚实回答
ProjectGoal 要求"找到 KVC 在保持自身独特性的前提下胜过 naive PD-disagg"。E4 没有证实也没证伪。
**当前位置**
- KVC + load-floor + RDMA 在前 ~40% 流量上跑得不输 E1直接观察 router log 时间戳)
- 后段吞吐崩溃 → 没法把 KVC 端到端跑完 → E1 仍然 unchallenged
- D→P 工程完整commit 落盘 + smoke 验证),但入口逻辑需调整才能真正在 reseed 路径生效
**诚实评估**:本次目标的"实现 D→P"部分达成(链路 + 集成 + smoke但"reseed 路径不重新 prefill"的端到端效果**未在真实工作负载验证**。下一步应优先实施 P1 中的 instrumentation + 入口条件修正,然后重跑。
---
**核心句**E4 完整暴露了 D→P 工程的 last-mile 缺口(入口条件错 + 日志失踪),所有底层组件 individually 验证 OK 但端到端串联在真实 workload 上失效。这是个明确、可修复的工程问题,不是设计层面的死结。

View File

@@ -2121,22 +2121,37 @@ async def _attempt_d_to_p_sync(
Returns a dict with status info on success/skip, or ``None`` on a Returns a dict with status info on success/skip, or ``None`` on a
non-recoverable error. The caller falls back to normal re-prefill on non-recoverable error. The caller falls back to normal re-prefill on
any failure. any failure. Each path emits a structural-log line so we can forensic
why sync skipped vs succeeded vs failed.
""" """
if not config.enable_d_to_p_sync: if not config.enable_d_to_p_sync:
return None return None
source_d_url = decode_session.server_url source_d_url = decode_session.server_url
sid = request.session_id
rid = request.request_id
if not source_d_url: if not source_d_url:
await _structural_emit(
"d-to-p-sync.jsonl",
{"event": "skipped", "stage": "entry", "sid": sid, "rid": rid,
"reason": "no-source-d"},
)
return {"status": "skipped-no-source-d"} return {"status": "skipped-no-source-d"}
if not decode_session.opened: # NB: do NOT gate on decode_session.opened. By the time we reach the
return {"status": "skipped-d-closed"} # fallback seeded_router, agentic has already flipped that flag to False
# Compose token list for radix insert: we don't have the actual token_ids # in response to admission rejection. But the D-side scheduler's
# on the agentic side in a stable form; use the request's prompt_token_ids # SessionAwareCache may STILL hold the session resident (release_session
# via the residency bookkeeping. For now we use a length proxy. # is only called explicitly, not from admission events). Let D be the
# source of truth via its own snapshot_dump response.
target_tokens = max(0, int(_estimate_session_resident_tokens(request))) target_tokens = max(0, int(_estimate_session_resident_tokens(request)))
if target_tokens <= 0: if target_tokens <= 0:
await _structural_emit(
"d-to-p-sync.jsonl",
{"event": "skipped", "stage": "entry", "sid": sid, "rid": rid,
"reason": "zero-target-tokens"},
)
return {"status": "skipped-zero-tokens"} return {"status": "skipped-zero-tokens"}
t_prep0 = time.perf_counter()
try: try:
prep_resp = await client.post( prep_resp = await client.post(
f"{prefill_url}/_snapshot/prepare_receive", f"{prefill_url}/_snapshot/prepare_receive",
@@ -2149,10 +2164,23 @@ async def _attempt_d_to_p_sync(
prep_resp.raise_for_status() prep_resp.raise_for_status()
prep = prep_resp.json() prep = prep_resp.json()
except Exception as exc: except Exception as exc:
await _structural_emit(
"d-to-p-sync.jsonl",
{"event": "failed", "stage": "prepare", "sid": sid, "rid": rid,
"error": repr(exc)[:200]},
)
return {"status": "prepare-failed", "error": repr(exc)} return {"status": "prepare-failed", "error": repr(exc)}
t_prep1 = time.perf_counter()
if not prep.get("ok"): if not prep.get("ok"):
await _structural_emit(
"d-to-p-sync.jsonl",
{"event": "skipped", "stage": "prepare", "sid": sid, "rid": rid,
"reason": prep.get("reason"),
"prepare_dur_ms": round((t_prep1 - t_prep0) * 1000, 2)},
)
return {"status": "prepare-not-ok", "reason": prep.get("reason")} return {"status": "prepare-not-ok", "reason": prep.get("reason")}
t_dump0 = time.perf_counter()
try: try:
dump_resp = await client.post( dump_resp = await client.post(
f"{source_d_url}/_snapshot/dump", f"{source_d_url}/_snapshot/dump",
@@ -2170,8 +2198,21 @@ async def _attempt_d_to_p_sync(
dump_resp.raise_for_status() dump_resp.raise_for_status()
dump = dump_resp.json() dump = dump_resp.json()
except Exception as exc: except Exception as exc:
await _structural_emit(
"d-to-p-sync.jsonl",
{"event": "failed", "stage": "dump", "sid": sid, "rid": rid,
"error": repr(exc)[:200]},
)
return {"status": "dump-failed", "error": repr(exc)} return {"status": "dump-failed", "error": repr(exc)}
t_dump1 = time.perf_counter()
if not dump.get("ok"): if not dump.get("ok"):
await _structural_emit(
"d-to-p-sync.jsonl",
{"event": "skipped", "stage": "dump", "sid": sid, "rid": rid,
"reason": dump.get("reason"),
"dump_dur_ms": round((t_dump1 - t_dump0) * 1000, 2),
"kv_committed_len": int(dump.get("kv_committed_len", 0))},
)
return {"status": "dump-not-ok", "reason": dump.get("reason"), return {"status": "dump-not-ok", "reason": dump.get("reason"),
"bytes_pushed": dump.get("bytes_pushed", 0)} "bytes_pushed": dump.get("bytes_pushed", 0)}
@@ -2193,9 +2234,16 @@ async def _attempt_d_to_p_sync(
) )
except Exception: except Exception:
pass pass
await _structural_emit(
"d-to-p-sync.jsonl",
{"event": "skipped", "stage": "post-dump", "sid": sid, "rid": rid,
"reason": "no-input-token-ids",
"bytes_pushed": int(dump.get("bytes_pushed", 0))},
)
return {"status": "no-tokens-discard", "bytes_pushed": dump.get("bytes_pushed", 0)} return {"status": "no-tokens-discard", "bytes_pushed": dump.get("bytes_pushed", 0)}
n = min(len(tokens), len(prep["slot_indices"])) n = min(len(tokens), len(prep["slot_indices"]))
t_fin0 = time.perf_counter()
try: try:
fin_resp = await client.post( fin_resp = await client.post(
f"{prefill_url}/_snapshot/finalize_ingest", f"{prefill_url}/_snapshot/finalize_ingest",
@@ -2209,11 +2257,35 @@ async def _attempt_d_to_p_sync(
fin_resp.raise_for_status() fin_resp.raise_for_status()
fin = fin_resp.json() fin = fin_resp.json()
except Exception as exc: except Exception as exc:
await _structural_emit(
"d-to-p-sync.jsonl",
{"event": "failed", "stage": "finalize", "sid": sid, "rid": rid,
"error": repr(exc)[:200],
"bytes_pushed": int(dump.get("bytes_pushed", 0))},
)
return {"status": "finalize-failed", "error": repr(exc), return {"status": "finalize-failed", "error": repr(exc),
"bytes_pushed": dump.get("bytes_pushed", 0)} "bytes_pushed": dump.get("bytes_pushed", 0)}
t_fin1 = time.perf_counter()
if not fin.get("ok"): if not fin.get("ok"):
await _structural_emit(
"d-to-p-sync.jsonl",
{"event": "skipped", "stage": "finalize", "sid": sid, "rid": rid,
"reason": fin.get("reason"),
"bytes_pushed": int(dump.get("bytes_pushed", 0))},
)
return {"status": "finalize-not-ok", "reason": fin.get("reason"), return {"status": "finalize-not-ok", "reason": fin.get("reason"),
"bytes_pushed": dump.get("bytes_pushed", 0)} "bytes_pushed": dump.get("bytes_pushed", 0)}
await _structural_emit(
"d-to-p-sync.jsonl",
{"event": "ok", "sid": sid, "rid": rid,
"bytes_pushed": int(dump.get("bytes_pushed", 0)),
"kv_committed_len": int(dump.get("kv_committed_len", 0)),
"inserted_prefix_len": int(fin.get("inserted_prefix_len", 0)),
"prepare_dur_ms": round((t_prep1 - t_prep0) * 1000, 2),
"dump_dur_ms": round((t_dump1 - t_dump0) * 1000, 2),
"finalize_dur_ms": round((t_fin1 - t_fin0) * 1000, 2),
"snapshot_session_id": prep.get("snapshot_session_id")},
)
return { return {
"status": "ok", "status": "ok",
"bytes_pushed": int(dump.get("bytes_pushed", 0)), "bytes_pushed": int(dump.get("bytes_pushed", 0)),