Files
xtrain/docs/07-distributed.md
Gahow Wang 0131f05b26 docs: Phase T8 — distributed data parallel
Design doc for the NCCL DDP path: comm bootstrap (rank-0 UniqueId + grouped
CommInitRank), thread-per-GPU launch model (Var is !Send), all-reduce-then-
local-step scheme (in-place fp32 AllReduce on .grad() + /world, each rank steps
its own GpuAdamW), why params stay consistent (NCCL bit-identical reduce + same
init/state), batch sharding math vs single-GPU, verification plan + scaling
table. Lists TP/PP/ZeRO/bf16-comm as out-of-scope follow-ups.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 17:15:49 +08:00

158 lines
9.5 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Phase T8: Distributed Data Parallel (DDP) — Design Document
## Goal
T7 把单卡训练榨到 **~8500 tok/s**(梯度常驻 deviceAdamW + grad-norm 都在 device 侧)。
T8 的目标是**把训练 scale 到 dash5 的多张 RTX 5090**:用 **NCCL 数据并行**——每个 rank 跑 global batch
的一个分片,反向后把每个参数的梯度在显存里 **all-reduce 求和、按 world size 取均值**,然后各 rank 用
**自己**那份(已同步的)优化器跑 `GpuAdamW.step`。同 init + 同优化器超参/状态 ⇒ 参数永远逐位一致,
无需再同步权重。
验收(两条都要):
1. **正确性**DDP 必须对住单卡。单卡跑 K 步 global batch BDDP 用 2 rank 各跑同一批数据同序的 B/2 →
loss 轨迹与单卡在紧容差内吻合(纯梯度平均,理论上近乎一致);且一步后断言两个 rank 的参数完全相同。
2. **吞吐**:固定每卡 workload测 1/2/4 卡的 tok/s → 近线性 scaling给表。
**本 Phase 范围内只做 DDP**。tensor/pipeline 并行、sharded optimizer 是可选 bonus**不做**model 太小,
当前学习目标是把 DDP 这条最朴素也最核心的多卡链路吃透;切分式并行留待 model 放大)。导出回流 xserv 是 T9。
## Module Layout
```
crates/xtrain-distributed/ # 新 crate镜像 xserv-distributed
├── build.rs # nvcc 探测 → no_cuda cfg有 nvcc 时链 -lnccl -lcudart同 xserv build.rs
├── Cargo.toml # 依赖 cuda/tensor/autodiff/model/optim/train
├── src/ffi.rs # NCCL FFIGetUniqueId/CommInitRank/AllReduce/CommDestroy/Group{Start,End}
├── src/lib.rs # DdpContextcomm bootstrap + all_reduce_average_grads+ get_unique_id
├── src/ddp.rs # DDP 训练 steptrain_rank+ thread-per-GPU launcher + 确定性 build_model
└── src/bin/train_ddp.rs # 多 rank 启动器 / 吞吐 driverCUDA_VISIBLE_DEVICES 选卡)
crates/xtrain-distributed/tests/
└── ddp_correctness.rs # 2 卡 vs 单卡 loss 对拍 + 跨 rank 参数一致 + 1/2/4 卡吞吐表
Cargo.toml # workspace members += xtrain-distributed
docs/07-distributed.md # 本文
```
整个 crate 用 `#![cfg(not(no_cuda))]` 在 crate 根门控:本地(无 nvcccrate 编译为空,不引用任何 NCCL 符号,
`cargo check` 直接过dash5 上 `not(no_cuda)` 全量编译并链接 NCCL。bin 另带一个 `#[cfg(no_cuda)]` stub main。
## Key Design Decisions
### ① NCCL comm bootstraprank 0 发 UniqueId
NCCL 的握手:**rank 0 调 `ncclGetUniqueId`** 生成一个 128 字节不透明 id**带外**分发给所有 rank每个 rank
`ncclCommInitRank(comm, world, id, rank)` 加入通信组。`ncclUniqueId` 按 NCCL ABI **按值传递**128 字节 struct
FFI 里 `#[repr(C)] struct NcclUniqueId { internal: [c_char;128] }``ncclCommInitRank``commid` 参数直接传值。
并发 init 必须用 `ncclGroupStart()/ncclGroupEnd()` 包住,否则多个线程同时 `CommInitRank` 会互相等待握手而死锁——
group 让它们 rendezvous。这套与 xserv-distributed 的 `TpContext::init` 完全一致。
### ② Launch modelthread-per-GPU与 xserv 一致)
**单进程、每卡一个 OS 线程**,线程启动先 `cudaSetDevice(dev)` 绑定本卡。选这个模型的硬约束xtrain 的
autograd `Var``Rc<RefCell<…>>`**不是 `Send`**,整张计算图不能跨线程。所以**每个 rank 线程在闭包内本地
`build_model` 自己的 `Var` 图**——跨线程边界的只有 `UniqueId``Copy`)、标量 config、和 `&Corpus`(只读共享,`Sync`)。
`launch()``std::thread::scope`rank 0 发 id → 每线程 `DdpContext::init` → 本地建模 → `train_rank` → join。
`DdpContext` 持有 `comm`(裸指针),`unsafe impl Send` 因为它被恰好一个 rank 线程独占。
为什么不用多进程torchrun 式):单机多卡,单进程多线程改动最小、无 IPCUniqueId 跨线程就是 move 一个 `Copy`
struct跨进程才需要写文件/env 分发。多进程留待真正跨节点。
### ③ all-reduce-then-local-step梯度在显存里平均各 rank 各自 step
T7 起**梯度常驻 device**`Var::grad()` 是 device tensor。DDP 的通信因此极简——**直接对每个参数的 `.grad()`
device buffer 做 in-place AllReduce**,零 host 往返:
```rust
// DdpContext::all_reduce_average_grads(params)
for p in params { if let Some(g) = p.grad() {
ncclAllReduce(g.data_ptr(), g.data_ptr(), g.numel(), NCCL_FLOAT32, NCCL_SUM, comm, null_stream);
}}
for p in params { if let Some(g) = p.grad() {
launch_scale_inplace_f32(g.data_ptr(), 1.0/world, g.numel(), null_stream); // 复用 T7 的 kernel
}}
```
AllReduce 用 **fp32 + Sum**(梯度就是 fp32发在 **null/legacy stream** 上——xtrain 每个 kernel 都在 null stream
`std::ptr::null_mut()`launch所以 AllReduce 自然排在产出梯度的 backward kernel 之后、消费它的 scale/optimizer
kernel 之前,**无需额外 barrier**。平均直接复用 T7 的 `launch_scale_inplace_f32`grad-clip 用的同一个 kernel
之后**每个 rank 各自跑 `GpuAdamW.step`**——不再同步权重。
### ④ 为什么参数保持一致
三个充分条件:(a) **同 init**:所有 rank含单卡 baseline用同一个确定性 `build_model`(同 LCG 种子),起点逐位相同;
(b) **同梯度**NCCL AllReduce 对参与的所有 rank **返回逐位相同的归约结果**(这是 NCCL 的保证平均、clip-norm
device reduction确定性、AdamW kernel 都是确定性的 → 每步喂给优化器的梯度逐位相同;(c) **同优化器状态**
每个 rank 各自维护 m/v但因为起点相同、每步输入相同、超参相同状态演化也相同。
⇒ 各 rank 参数**逐位一致**(测试里断言 `max|p0-p1| == 0.0`),不需要任何权重再同步。
> 对比单卡DDP 与单卡的最终参数只在 **fp 容差**内一致(`<1e-3` rel不逐位——因为求和顺序不同
> (单卡 tape 顺序 SUM B 个DDP 各 rank 先 SUM 分片再 NCCL 跨 rank SUMfp 加法不结合。跨 rank 才逐位。
### ⑤ Batch sharding对住单卡的均值
单卡 loop`batch_size=B` 个序列各 forward+backwardtape **SUM**`Σ_B`,再 `clip(pre_scale=1/B)` → batch 均值。
DDP 严格对齐(保证 all-reduce 后的和与单卡逐序列一致):**每个 rank 推进同一个 RNG、抽出整批 B 个序列,但只对
分到自己的那些(`i % world == rank`)跑 forward+backward**。各 rank 的并集 == 单卡那一批同序,于是:
```
rank 本地: Σ_local分片内 b = B/world 个的和tape SUM
AllReduce: Σ_global = Σ_ranks Σ_local = 单卡的 Σ_B只是求和顺序不同
/world: Σ_global / world
clip pre_scale = 1/b_local = world/B: Σ_global/world · world/B = Σ_global / B ← 与单卡 clip(1/B) 同一个量
```
「按 world 取均值」放在通信原语里(语义清晰),剩下的 `1/b_local` 交给本就存在的 clip 前置缩放完成——
最终喂给 AdamW 的就是 **global batch 均值梯度**与单卡完全对齐。loss 日志同理:本地 loss 和跨 rank AllReduce 后
除以 B得 global 均值(每个 rank 拿到相同值)。
## 验证方法
测试 `tests/ddp_correctness.rs``#[cfg(not(no_cuda))]`<2 卡自动 skip用合成语料无需 tokenizer/数据文件
### 正确性:`ddp_matches_single_gpu_and_params_consistent`
同一 config + 合成语料 20
- **(a) loss 对拍**单卡 baselineworld=1global batch 8vs 2-rank DDP 4)→ 整条 loss 轨迹 `max_rel < 1e-3`
- **(b) rank 参数一致**断言 `max|p_rank0 - p_rank1| == 0.0`逐位NCCL 保证 + 确定性 step)。
- **(c) DDP vs 单卡参数**`max rel|Δp| < 1e-3`fp 求和顺序差不逐位)。
### 吞吐:`ddp_throughput_scaling`
固定**每卡 batch=8**batch world 放大 rank 成本不变 world {1,2,4} global tok/s打印
```
GPUs | tok/s (global) | speedup
1 | ... | 1.00x
2 | ... | ~2x
4 | ... | ~4x
```
近线性即过tiny model latency-bound + 跨卡通信开销会让大 world 略低于理想)。
### dash5 实跑
```bash
export PATH=/usr/local/cuda/bin:/opt/wjh/.cargo/bin:$PATH
# 选空闲卡dash5 共享,先看 nvidia-smi
CUDA_VISIBLE_DEVICES=<idle GPUs> \
cargo test -p xtrain-distributed --release -- --nocapture --test-threads=1
# 真训练 / 吞吐 driver
CUDA_VISIBLE_DEVICES=0,1 cargo run -p xtrain-distributed --release --bin train_ddp -- \
100 64 16 /opt/wjh/models/gpt2/tokenizer.json data/tinystories-valid-3mb.txt
```
实测数字回填见 xtrain.md T8 note / commit
## 不做(本 Phase 范围外,记为 follow-up
- **Tensor / pipeline 并行**要切权重layer / layer 通信 tiny model 收益小改动大 model 放大再做
- **Sharded optimizerZeRO**每卡只存 1/world 的优化器状态当前 model 优化器状态很小不是瓶颈
- **bf16 AllReduce / 通信压缩**fp32 已够且会动数值正确性 T7 同理由延后)。
- **多进程 / 跨节点 bootstrap**单机多卡用线程模型足够跨节点再上文件/env 分发 UniqueId