fix(d2p): structural log + relax entrance condition for sync
E4 forensic (docs/E4_RESULTS_ZH.md): 272 admission rejections triggered the fallback seeded_router path, but zero /_snapshot/* HTTP calls hit the workers. Two root causes: 1. _attempt_d_to_p_sync gated on agentic-side `decode_session.opened`. By the time fallback runs, agentic has already flipped that flag to False in response to admission rejection. But D-side SessionAwareCache may still hold the session (release_session is not called automatically on admission rejection). Removing the gate; let D respond authoritatively with "session-not-resident" if it has actually evicted. 2. _attempt_d_to_p_sync logged decisions via logger.info, but agentic has no root logger handler so those events silently sank. Switching every branch (entry skip, prepare fail/not-ok, dump fail/not-ok, finalize fail/not-ok, ok) to write a structural-log line at outputs/<run>/structural/d-to-p-sync.jsonl. Each line carries stage, reason, durations, bytes pushed. The result doc is updated to reflect the honest E4-1 outcome and the P1 fix list.
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
# E4 — KVC + D→P RDMA snapshot vs naive PD-disagg(结果,初版)
|
# E4 — KVC + D→P RDMA snapshot vs naive PD-disagg(实测结果)
|
||||||
|
|
||||||
**Status**: E4 实验进行中(截至文档写入时刻)。本文档会在 sweep 完成后补全实测数据。
|
**Status**: 实验执行完毕(手动停止),数据汇总完毕,**主要假设不能被本次实验证实**。
|
||||||
**Date**: 2026-05-13
|
**Date**: 2026-05-13
|
||||||
**Branch**: `h200-cu130`
|
**Branch**: `h200-cu130`
|
||||||
**Protocol**: `docs/E4_PROTOCOL_ZH.md`
|
**Protocol**: `docs/E4_PROTOCOL_ZH.md`
|
||||||
@@ -8,138 +8,172 @@
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 0. TL;DR(先填占位,跑完补)
|
## 0. TL;DR
|
||||||
|
|
||||||
- E1(naive PD-disagg):TTFT p99 = 88.6s(已有数据)
|
E4 跑了 ~60 min,完成了 ~548/1285 请求后吞吐崩溃(同 E3 模式),被人工 SIGINT 停止。
|
||||||
- E4(KVC + RDMA + load-floor K=200 + D→P sync):**TBD**
|
|
||||||
- D→P snapshot 路径触发次数:**TBD**
|
**关键发现**:
|
||||||
- 主要结论:**TBD**
|
|
||||||
|
1. ✅ **D→P 链路与 SGLang 集成的所有底层组件都正常工作**:snapshot link controller 在每个 worker 都正常初始化 (96 layer bufs registered),3 个 RPC endpoint 都 reachable(smoke 验证)
|
||||||
|
2. ✅ **272 个 admission rejection 触发了 agentic 的 reseed 路径**(168 个 no-space + 104 个 session-not-resident)
|
||||||
|
3. ❌ **但是 `/_snapshot/` HTTP 端点的访问数 = 0**——`_attempt_d_to_p_sync` 在所有 272 次 reseed 中都没有发出 prepare_receive。可能原因:(a) `decode_session.opened == False` 时早退;(b) `source_d_url` 为空;(c) `target_tokens <= 0`
|
||||||
|
4. ⚠️ **关键 instrumentation 缺失**:`_attempt_d_to_p_sync` 用 `logger.info` 记录决策,但 agentic 端没设根 logger handler,导致这些日志全部沉底,无法 forensic 出哪个 skip 分支命中
|
||||||
|
5. ⚠️ **同时 E4 在 ~43% 进度时吞吐崩溃**——这是 KVC v2 + load-floor 在该工作负载下的固有问题(E3 也遇到),与 D→P 无关
|
||||||
|
|
||||||
|
**结论**:本次 E4 既没能证实也没能证伪 H1。D→P 链路与集成完整 deploy,但**观测性不足**让我们看不到它在真实负载里到底发生了什么。
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 1. 实验环境(实际)
|
## 1. 实验实际配置(与 protocol 对照)
|
||||||
|
|
||||||
| 维度 | 配置 |
|
| 维度 | Protocol | Actual |
|
||||||
|---|---|
|
|---|---|---|
|
||||||
| 启动时间 | 2026-05-13 08:28:17 |
|
| Trace | inferact_50sess.jsonl 1285 reqs | 同 |
|
||||||
| 完成时间 | TBD |
|
| GPU | 4× H200 | 同 |
|
||||||
| Trace | outputs/inferact_50sess.jsonl,1285 reqs |
|
| concurrency_limit | 32 | 同 |
|
||||||
| Topology | 1P + 3D,gpu 0/1/2/3 H200 80GB |
|
| load-floor K | 200 | 同 |
|
||||||
| IB device | mlx5_60 NDR 400Gb |
|
| --enable-d-to-p-sync | TRUE | 同 |
|
||||||
| time_scale | 1 |
|
| SGLANG_SNAPSHOT_LINK_ENABLE | 1 per worker | 同(已验证 controller init 成功) |
|
||||||
| concurrency_limit | 32 |
|
| 启动时间 | - | 2026-05-13 08:28:17 |
|
||||||
| load-floor K | 200 |
|
| 停止时间 | - | 2026-05-13 09:29:22(SIGINT) |
|
||||||
| migration_reject_threshold | 3 |
|
| 完成时长 | ~30-60 min 预期 | 60 min 后人工停止 |
|
||||||
| --enable-d-to-p-sync | **TRUE** |
|
|
||||||
| SGLANG_SNAPSHOT_LINK_ENABLE | 1(每个 worker) |
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 2. 部分中间观察(运行 35 min 时刻)
|
## 2. 实测数字
|
||||||
|
|
||||||
- Router 处理请求数:529 / 1285(41%)
|
### 2.1 请求执行(手动停止时)
|
||||||
- 累计 admission events:1042
|
|
||||||
- 累计 admission 拒绝(can_admit=false):**0**
|
|
||||||
- 累计 d_to_p_sync 触发:**0**
|
|
||||||
|
|
||||||
### 中间观察的含义
|
| Metric | 值 |
|
||||||
|
|
||||||
E4 跑到 41% 进度时,**没有任何 admission rejection**,因此 `_invoke_kvcache_seeded_router` 路径未被触发,进而 `_attempt_d_to_p_sync` 也未被触发。
|
|
||||||
|
|
||||||
这本身是个有意义的发现:
|
|
||||||
|
|
||||||
1. **Load-floor bonus K=200 + 3D 配置下的工作负载分布,避免了 D 端 KV 池饱和**——因此 admission 不拒绝、不触发 reseed
|
|
||||||
2. **D→P snapshot 是 KVC 设计的"保险机制"**——在常规负载下并不会主动 fire;它的价值在对抗性 / 长尾负载下才显现
|
|
||||||
3. **KVC 的常规快路径(direct-to-D)即可击败 naive PD-disagg**——因为 turn-N>1 请求避免了 P prefill + P→D transfer 的开销
|
|
||||||
4. **D→P snapshot 的工程完成度不靠 trigger 频率验证**——靠 smoke 已经验证 link 工作 + RPC plumbing 正确
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## 3. 完整结果(待跑完填充)
|
|
||||||
|
|
||||||
### 3.1 总成功 / 失败
|
|
||||||
|
|
||||||
| Metric | E1 | E4 |
|
|
||||||
|---|---:|---:|
|
|
||||||
| total_count | 1285 | TBD |
|
|
||||||
| error_count | 85 | TBD |
|
|
||||||
| abort_count | 0 | TBD |
|
|
||||||
| failure_count | 85 | TBD |
|
|
||||||
| success_rate | 93.4% | TBD |
|
|
||||||
|
|
||||||
### 3.2 Latency
|
|
||||||
|
|
||||||
| Metric (s) | E1 mean | E1 p50 | E1 p90 | E1 p99 | E4 mean | E4 p50 | E4 p90 | E4 p99 |
|
|
||||||
|---|---:|---:|---:|---:|---:|---:|---:|---:|
|
|
||||||
| latency | 96.34 | 93.21 | 180.69 | 219.46 | TBD | TBD | TBD | TBD |
|
|
||||||
| ttft | (need recompute) | (...) | (...) | 88.6 | TBD | TBD | TBD | TBD |
|
|
||||||
|
|
||||||
### 3.3 Execution mode 分布
|
|
||||||
|
|
||||||
E1:
|
|
||||||
```
|
|
||||||
pd-disaggregation 85
|
|
||||||
pd-disaggregation-router 1200
|
|
||||||
```
|
|
||||||
|
|
||||||
E4: TBD
|
|
||||||
|
|
||||||
### 3.4 D→P snapshot 路径统计
|
|
||||||
|
|
||||||
| Stat | 值 |
|
|
||||||
|---|---:|
|
|---|---:|
|
||||||
| _attempt_d_to_p_sync 调用 | TBD |
|
| Router 完成的 POST /generate (200 OK) | 548 |
|
||||||
| prepare_receive ok=true 次数 | TBD |
|
| 占 trace 比例 | 42.6% |
|
||||||
| dump ok=true 次数 | TBD |
|
| Admission events | 1174 |
|
||||||
| finalize_ingest ok=true 次数 | TBD |
|
| - can_admit=true | 902 |
|
||||||
| 总推送字节 | TBD |
|
| - can_admit=false | **272**(168 no-space + 104 session-not-resident) |
|
||||||
| 平均推送时长 | TBD |
|
| Admission modes | 804 direct_append + 370 seed |
|
||||||
|
| Session-D bindings | 1248(unique sessions: 50) |
|
||||||
|
| Decode 端 mooncake transfer 错误 (AbortReq) | 19 (prefill) + 12 (d1) + 7 (d2) |
|
||||||
|
|
||||||
|
### 2.2 D→P snapshot 路径 telemetry
|
||||||
|
|
||||||
|
| Stat | 期望 | Actual |
|
||||||
|
|---|---:|---:|
|
||||||
|
| `_attempt_d_to_p_sync` 调用次数 | ≥ 272 | **unknown**(无日志) |
|
||||||
|
| `/_snapshot/prepare_receive` HTTP 命中 | > 0 if any sync succeed | **0** |
|
||||||
|
| `/_snapshot/dump` HTTP 命中 | > 0 | **0** |
|
||||||
|
| `/_snapshot/finalize_ingest` HTTP 命中 | > 0 | **0** |
|
||||||
|
|
||||||
|
**0 个 HTTP 命中**是个明确的负面信号。`_attempt_d_to_p_sync` 必然在 prepare_receive 之前 early-return 了,否则至少 prepare 应该 fire。
|
||||||
|
|
||||||
|
### 2.3 SGLang snapshot controller 启动验证(succeeded)
|
||||||
|
|
||||||
|
每个 worker startup log 都有:
|
||||||
|
```
|
||||||
|
[2026-05-13 08:29:xx] Snapshot link controller initialized: 127.0.0.1:9998, sid=127.0.0.1:NNNNN, 96 layer bufs
|
||||||
|
```
|
||||||
|
|
||||||
|
confirmed for all 4 workers (1P + 3D). All registered 96 layer buffers (48 K + 48 V) successfully.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 3. 根因分析:为什么 sync 没 fire
|
||||||
|
|
||||||
|
阅读 `_attempt_d_to_p_sync` 的 early-return 链路:
|
||||||
|
|
||||||
|
```python
|
||||||
|
async def _attempt_d_to_p_sync(...):
|
||||||
|
if not config.enable_d_to_p_sync:
|
||||||
|
return None
|
||||||
|
source_d_url = decode_session.server_url
|
||||||
|
if not source_d_url: # (A)
|
||||||
|
return {"status": "skipped-no-source-d"}
|
||||||
|
if not decode_session.opened: # (B)
|
||||||
|
return {"status": "skipped-d-closed"}
|
||||||
|
target_tokens = max(0, int(_estimate_session_resident_tokens(request)))
|
||||||
|
if target_tokens <= 0: # (C)
|
||||||
|
return {"status": "skipped-zero-tokens"}
|
||||||
|
# only after here we POST /_snapshot/prepare_receive
|
||||||
|
```
|
||||||
|
|
||||||
|
最可能的命中分支:**(B) — `decode_session.opened == False`**。
|
||||||
|
|
||||||
|
原因:当 admission 返回 `session-not-resident`,agentic 把这视为"该 D 不再持有该 session",会 close 本地 decode_session 记账(`session.opened = False`),然后才走到 fallback / seeded_router。所以到 `_invoke_kvcache_seeded_router` 时,`decode_session.opened` 已经是 False,sync 直接跳过。
|
||||||
|
|
||||||
|
**这意味着我设计 `_attempt_d_to_p_sync` 的入口条件错了**:
|
||||||
|
- 错误假设:reseed 时 D 仍然 open,可以从那个 D dump
|
||||||
|
- 正确事实:admission rejection 触发 session 关闭 → reseed 时 D 已 close → 没有 KV 可 dump
|
||||||
|
|
||||||
|
要让 D→P 真正在这个场景下工作,需要其中之一:
|
||||||
|
- **不在 admission rejection 时立刻 close decode_session** —— 给 D→P sync 一个抢救窗口
|
||||||
|
- **改去探测 D-side 的 SessionAwareCache 中是否还有该 session 的 slot** —— 即使 agentic 端记账为 closed,D 端可能还没 evict
|
||||||
|
- **在 D 端 SessionAwareCache.release_session 之前插入 D→P push** —— D-driven 主动模式(设计文档 §2.5 提到的,但本期没实现)
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 4. 假设证实 / 证伪
|
## 4. 假设证实 / 证伪
|
||||||
|
|
||||||
填充模板:
|
|
||||||
|
|
||||||
### H1 (main): E4 TTFT p99 ≤ E1 TTFT p99 = 88.6s
|
### H1 (main): E4 TTFT p99 ≤ E1 TTFT p99 = 88.6s
|
||||||
|
|
||||||
- **Verdict**: TBD
|
- **Verdict**: **N/A — not testable in this run**
|
||||||
- **Evidence**: TBD
|
- 原因:D→P sync 未实际 fire,E4 本质退化为 E3-with-fix-A 的行为;又因吞吐崩溃在 43% 中止,无完整 summary 与 E1 对照
|
||||||
- **解释**: TBD
|
|
||||||
|
|
||||||
### H2: E4 reseed 路径 TTFT 中位 < E3 reseed 路径 TTFT 中位
|
### H2: E4 reseed-mode TTFT < E3 reseed-mode TTFT
|
||||||
|
|
||||||
- **Verdict**: **N/A**(E3 实验未完成提取出可用 reseed-mode 中位数;E4 中如 reseed 未触发则也无法直接比较)
|
- **Verdict**: **N/A**
|
||||||
- **解释**: 在当前工作负载下,KVC + load-floor 让 reseed 路径基本不被触发。H2 的验证需要在高压力负载下重做
|
|
||||||
|
|
||||||
### H3: E4 成功数 ≥ 0.85 × E3 成功数
|
### H3: E4 success ≥ 0.85 × E3 success
|
||||||
|
|
||||||
- **Verdict**: TBD
|
- **Verdict**: **N/A**(E3 当初也未完成,无 baseline)
|
||||||
- **Evidence**: TBD
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 5. 知识沉淀(暂时空)
|
## 5. 真正学到的东西
|
||||||
|
|
||||||
将在跑完后填:
|
| # | 学习 | 行动 |
|
||||||
- D→P 工程踩坑 / 设计修正
|
|---|---|---|
|
||||||
- workload-dependent 行为
|
| 1 | D→P RDMA link 工作正常(host + GPU,phase 1/1b smoke) | ✅ 维持 |
|
||||||
- 后续 follow-up 建议
|
| 2 | SGLang 集成 RPC 工作正常(smoke 验证) | ✅ 维持 |
|
||||||
|
| 3 | agentic `_attempt_d_to_p_sync` 入口条件设错 | ⏳ 改入口逻辑或改成 D-driven 主动模式 |
|
||||||
|
| 4 | 缺少 D→P 路径的 structural log | ⏳ 加 `structural/d-to-p-sync.jsonl` 落盘所有 sync 决策 |
|
||||||
|
| 5 | 没在 admission rejection 时保留 D-side session 用于救援 dump | ⏳ 调整 release timing |
|
||||||
|
| 6 | 吞吐崩溃是 KVC 设计的 second-order 问题,与 D→P 正交 | ⏳ 单独立项 |
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## 6. 跑完后下一步建议
|
## 6. 后续工作(按优先级)
|
||||||
|
|
||||||
### 必做
|
### P1(必做,让 D→P 真正可观测 + 可触发)
|
||||||
- [ ] 用 `scripts/analyze_e4_d_to_p.py` 输出 H1/H3 verdict
|
|
||||||
- [ ] 跑 high-pressure E4-bis:concurrency=64 或 mem-fraction-static=0.4,强制 reseed 触发
|
|
||||||
- [ ] 跑 E4-ablate:`--enable-d-to-p-sync` 但 D 端人为返回 fail → 隔离 D→P 边际效益
|
|
||||||
|
|
||||||
### 推荐
|
1. **加 structural log channel `structural/d-to-p-sync.jsonl`** —— `_attempt_d_to_p_sync` 每次决策落盘一条记录
|
||||||
- [ ] 长 trace(全量 inferact 而非 50-sess)下重跑 E4
|
2. **修正入口条件**:把 `decode_session.opened` 检查 relax 成"曾经 open 过 + 服务器仍有可能 hold KV"
|
||||||
- [ ] 多节点跨网 RDMA 配置
|
3. **或:D-driven 主动模式** —— D 在 `cache_finished_req` 完成后主动 enqueue snapshot push 给 P(async background)
|
||||||
- [ ] D→P snapshot **主动模式**:D 在 cache_finished_req 后异步预推(vs 当前 reseed-triggered 被动模式)
|
4. **加 GET `/_snapshot/info` endpoint** —— 让 agentic 直接查 D 端是否还有该 session
|
||||||
|
|
||||||
|
### P2(验证 D→P 效益)
|
||||||
|
|
||||||
|
5. 重跑 E4 + P1 fixes
|
||||||
|
6. 跑 E4-pressure:concurrency 64 或 max-input-len 减半,主动制造 admission 拒绝高发场景
|
||||||
|
7. 跑 E4-ablate:D→P prepare 后人为不 push,隔离 D→P transfer 的边际效益
|
||||||
|
|
||||||
|
### P3(基础设施)
|
||||||
|
|
||||||
|
8. 解决 E4 在 43% 进度时的吞吐崩溃。这与 D→P 正交,但只要它存在就影响所有后续 E4 类实验的可比性
|
||||||
|
9. 与 docs/KVC_EVICTION_GRANULARITY_DESIGN_ZH.md 提出的 block-level evict refactor 联动
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
**核心句**:E4 的设计已落地、跑着。中间观察显示 load-floor + 3D 配置下 D→P fast path 不被触发,这本身是 KVC 设计的一个 positive 验证(safety net 不需经常 fire)。完整数据待 sweep 完成后填充。
|
## 7. 对 ProjectGoal 的诚实回答
|
||||||
|
|
||||||
|
ProjectGoal 要求"找到 KVC 在保持自身独特性的前提下胜过 naive PD-disagg"。E4 没有证实也没证伪。
|
||||||
|
|
||||||
|
**当前位置**:
|
||||||
|
- KVC + load-floor + RDMA 在前 ~40% 流量上跑得不输 E1(直接观察 router log 时间戳)
|
||||||
|
- 后段吞吐崩溃 → 没法把 KVC 端到端跑完 → E1 仍然 unchallenged
|
||||||
|
- D→P 工程完整(commit 落盘 + smoke 验证),但入口逻辑需调整才能真正在 reseed 路径生效
|
||||||
|
|
||||||
|
**诚实评估**:本次目标的"实现 D→P"部分达成(链路 + 集成 + smoke),但"reseed 路径不重新 prefill"的端到端效果**未在真实工作负载验证**。下一步应优先实施 P1 中的 instrumentation + 入口条件修正,然后重跑。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**核心句**:E4 完整暴露了 D→P 工程的 last-mile 缺口(入口条件错 + 日志失踪),所有底层组件 individually 验证 OK 但端到端串联在真实 workload 上失效。这是个明确、可修复的工程问题,不是设计层面的死结。
|
||||||
|
|||||||
@@ -2121,22 +2121,37 @@ async def _attempt_d_to_p_sync(
|
|||||||
|
|
||||||
Returns a dict with status info on success/skip, or ``None`` on a
|
Returns a dict with status info on success/skip, or ``None`` on a
|
||||||
non-recoverable error. The caller falls back to normal re-prefill on
|
non-recoverable error. The caller falls back to normal re-prefill on
|
||||||
any failure.
|
any failure. Each path emits a structural-log line so we can forensic
|
||||||
|
why sync skipped vs succeeded vs failed.
|
||||||
"""
|
"""
|
||||||
if not config.enable_d_to_p_sync:
|
if not config.enable_d_to_p_sync:
|
||||||
return None
|
return None
|
||||||
source_d_url = decode_session.server_url
|
source_d_url = decode_session.server_url
|
||||||
|
sid = request.session_id
|
||||||
|
rid = request.request_id
|
||||||
if not source_d_url:
|
if not source_d_url:
|
||||||
|
await _structural_emit(
|
||||||
|
"d-to-p-sync.jsonl",
|
||||||
|
{"event": "skipped", "stage": "entry", "sid": sid, "rid": rid,
|
||||||
|
"reason": "no-source-d"},
|
||||||
|
)
|
||||||
return {"status": "skipped-no-source-d"}
|
return {"status": "skipped-no-source-d"}
|
||||||
if not decode_session.opened:
|
# NB: do NOT gate on decode_session.opened. By the time we reach the
|
||||||
return {"status": "skipped-d-closed"}
|
# fallback seeded_router, agentic has already flipped that flag to False
|
||||||
# Compose token list for radix insert: we don't have the actual token_ids
|
# in response to admission rejection. But the D-side scheduler's
|
||||||
# on the agentic side in a stable form; use the request's prompt_token_ids
|
# SessionAwareCache may STILL hold the session resident (release_session
|
||||||
# via the residency bookkeeping. For now we use a length proxy.
|
# is only called explicitly, not from admission events). Let D be the
|
||||||
|
# source of truth via its own snapshot_dump response.
|
||||||
target_tokens = max(0, int(_estimate_session_resident_tokens(request)))
|
target_tokens = max(0, int(_estimate_session_resident_tokens(request)))
|
||||||
if target_tokens <= 0:
|
if target_tokens <= 0:
|
||||||
|
await _structural_emit(
|
||||||
|
"d-to-p-sync.jsonl",
|
||||||
|
{"event": "skipped", "stage": "entry", "sid": sid, "rid": rid,
|
||||||
|
"reason": "zero-target-tokens"},
|
||||||
|
)
|
||||||
return {"status": "skipped-zero-tokens"}
|
return {"status": "skipped-zero-tokens"}
|
||||||
|
|
||||||
|
t_prep0 = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
prep_resp = await client.post(
|
prep_resp = await client.post(
|
||||||
f"{prefill_url}/_snapshot/prepare_receive",
|
f"{prefill_url}/_snapshot/prepare_receive",
|
||||||
@@ -2149,10 +2164,23 @@ async def _attempt_d_to_p_sync(
|
|||||||
prep_resp.raise_for_status()
|
prep_resp.raise_for_status()
|
||||||
prep = prep_resp.json()
|
prep = prep_resp.json()
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
await _structural_emit(
|
||||||
|
"d-to-p-sync.jsonl",
|
||||||
|
{"event": "failed", "stage": "prepare", "sid": sid, "rid": rid,
|
||||||
|
"error": repr(exc)[:200]},
|
||||||
|
)
|
||||||
return {"status": "prepare-failed", "error": repr(exc)}
|
return {"status": "prepare-failed", "error": repr(exc)}
|
||||||
|
t_prep1 = time.perf_counter()
|
||||||
if not prep.get("ok"):
|
if not prep.get("ok"):
|
||||||
|
await _structural_emit(
|
||||||
|
"d-to-p-sync.jsonl",
|
||||||
|
{"event": "skipped", "stage": "prepare", "sid": sid, "rid": rid,
|
||||||
|
"reason": prep.get("reason"),
|
||||||
|
"prepare_dur_ms": round((t_prep1 - t_prep0) * 1000, 2)},
|
||||||
|
)
|
||||||
return {"status": "prepare-not-ok", "reason": prep.get("reason")}
|
return {"status": "prepare-not-ok", "reason": prep.get("reason")}
|
||||||
|
|
||||||
|
t_dump0 = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
dump_resp = await client.post(
|
dump_resp = await client.post(
|
||||||
f"{source_d_url}/_snapshot/dump",
|
f"{source_d_url}/_snapshot/dump",
|
||||||
@@ -2170,8 +2198,21 @@ async def _attempt_d_to_p_sync(
|
|||||||
dump_resp.raise_for_status()
|
dump_resp.raise_for_status()
|
||||||
dump = dump_resp.json()
|
dump = dump_resp.json()
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
await _structural_emit(
|
||||||
|
"d-to-p-sync.jsonl",
|
||||||
|
{"event": "failed", "stage": "dump", "sid": sid, "rid": rid,
|
||||||
|
"error": repr(exc)[:200]},
|
||||||
|
)
|
||||||
return {"status": "dump-failed", "error": repr(exc)}
|
return {"status": "dump-failed", "error": repr(exc)}
|
||||||
|
t_dump1 = time.perf_counter()
|
||||||
if not dump.get("ok"):
|
if not dump.get("ok"):
|
||||||
|
await _structural_emit(
|
||||||
|
"d-to-p-sync.jsonl",
|
||||||
|
{"event": "skipped", "stage": "dump", "sid": sid, "rid": rid,
|
||||||
|
"reason": dump.get("reason"),
|
||||||
|
"dump_dur_ms": round((t_dump1 - t_dump0) * 1000, 2),
|
||||||
|
"kv_committed_len": int(dump.get("kv_committed_len", 0))},
|
||||||
|
)
|
||||||
return {"status": "dump-not-ok", "reason": dump.get("reason"),
|
return {"status": "dump-not-ok", "reason": dump.get("reason"),
|
||||||
"bytes_pushed": dump.get("bytes_pushed", 0)}
|
"bytes_pushed": dump.get("bytes_pushed", 0)}
|
||||||
|
|
||||||
@@ -2193,9 +2234,16 @@ async def _attempt_d_to_p_sync(
|
|||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
await _structural_emit(
|
||||||
|
"d-to-p-sync.jsonl",
|
||||||
|
{"event": "skipped", "stage": "post-dump", "sid": sid, "rid": rid,
|
||||||
|
"reason": "no-input-token-ids",
|
||||||
|
"bytes_pushed": int(dump.get("bytes_pushed", 0))},
|
||||||
|
)
|
||||||
return {"status": "no-tokens-discard", "bytes_pushed": dump.get("bytes_pushed", 0)}
|
return {"status": "no-tokens-discard", "bytes_pushed": dump.get("bytes_pushed", 0)}
|
||||||
|
|
||||||
n = min(len(tokens), len(prep["slot_indices"]))
|
n = min(len(tokens), len(prep["slot_indices"]))
|
||||||
|
t_fin0 = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
fin_resp = await client.post(
|
fin_resp = await client.post(
|
||||||
f"{prefill_url}/_snapshot/finalize_ingest",
|
f"{prefill_url}/_snapshot/finalize_ingest",
|
||||||
@@ -2209,11 +2257,35 @@ async def _attempt_d_to_p_sync(
|
|||||||
fin_resp.raise_for_status()
|
fin_resp.raise_for_status()
|
||||||
fin = fin_resp.json()
|
fin = fin_resp.json()
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
await _structural_emit(
|
||||||
|
"d-to-p-sync.jsonl",
|
||||||
|
{"event": "failed", "stage": "finalize", "sid": sid, "rid": rid,
|
||||||
|
"error": repr(exc)[:200],
|
||||||
|
"bytes_pushed": int(dump.get("bytes_pushed", 0))},
|
||||||
|
)
|
||||||
return {"status": "finalize-failed", "error": repr(exc),
|
return {"status": "finalize-failed", "error": repr(exc),
|
||||||
"bytes_pushed": dump.get("bytes_pushed", 0)}
|
"bytes_pushed": dump.get("bytes_pushed", 0)}
|
||||||
|
t_fin1 = time.perf_counter()
|
||||||
if not fin.get("ok"):
|
if not fin.get("ok"):
|
||||||
|
await _structural_emit(
|
||||||
|
"d-to-p-sync.jsonl",
|
||||||
|
{"event": "skipped", "stage": "finalize", "sid": sid, "rid": rid,
|
||||||
|
"reason": fin.get("reason"),
|
||||||
|
"bytes_pushed": int(dump.get("bytes_pushed", 0))},
|
||||||
|
)
|
||||||
return {"status": "finalize-not-ok", "reason": fin.get("reason"),
|
return {"status": "finalize-not-ok", "reason": fin.get("reason"),
|
||||||
"bytes_pushed": dump.get("bytes_pushed", 0)}
|
"bytes_pushed": dump.get("bytes_pushed", 0)}
|
||||||
|
await _structural_emit(
|
||||||
|
"d-to-p-sync.jsonl",
|
||||||
|
{"event": "ok", "sid": sid, "rid": rid,
|
||||||
|
"bytes_pushed": int(dump.get("bytes_pushed", 0)),
|
||||||
|
"kv_committed_len": int(dump.get("kv_committed_len", 0)),
|
||||||
|
"inserted_prefix_len": int(fin.get("inserted_prefix_len", 0)),
|
||||||
|
"prepare_dur_ms": round((t_prep1 - t_prep0) * 1000, 2),
|
||||||
|
"dump_dur_ms": round((t_dump1 - t_dump0) * 1000, 2),
|
||||||
|
"finalize_dur_ms": round((t_fin1 - t_fin0) * 1000, 2),
|
||||||
|
"snapshot_session_id": prep.get("snapshot_session_id")},
|
||||||
|
)
|
||||||
return {
|
return {
|
||||||
"status": "ok",
|
"status": "ok",
|
||||||
"bytes_pushed": int(dump.get("bytes_pushed", 0)),
|
"bytes_pushed": int(dump.get("bytes_pushed", 0)),
|
||||||
|
|||||||
Reference in New Issue
Block a user