# Phase T8: Distributed Data Parallel (DDP) — Design Document ## Goal T7 把单卡训练榨到 **~8500 tok/s**(梯度常驻 device,AdamW + grad-norm 都在 device 侧)。 T8 的目标是**把训练 scale 到 dash5 的多张 RTX 5090**:用 **NCCL 数据并行**——每个 rank 跑 global batch 的一个分片,反向后把每个参数的梯度在显存里 **all-reduce 求和、按 world size 取均值**,然后各 rank 用 **自己**那份(已同步的)优化器跑 `GpuAdamW.step`。同 init + 同优化器超参/状态 ⇒ 参数永远逐位一致, 无需再同步权重。 验收(两条都要): 1. **正确性**:DDP 必须对住单卡。单卡跑 K 步 global batch B,DDP 用 2 rank 各跑同一批数据同序的 B/2 → loss 轨迹与单卡在紧容差内吻合(纯梯度平均,理论上近乎一致);且一步后断言两个 rank 的参数完全相同。 2. **吞吐**:固定每卡 workload,测 1/2/4 卡的 tok/s → 近线性 scaling,给表。 **本 Phase 范围内只做 DDP**。tensor/pipeline 并行、sharded optimizer 是可选 bonus,**不做**(model 太小, 当前学习目标是把 DDP 这条最朴素也最核心的多卡链路吃透;切分式并行留待 model 放大)。导出回流 xserv 是 T9。 ## Module Layout ``` crates/xtrain-distributed/ # 新 crate(镜像 xserv-distributed) ├── build.rs # nvcc 探测 → no_cuda cfg;有 nvcc 时链 -lnccl -lcudart(同 xserv build.rs) ├── Cargo.toml # 依赖 cuda/tensor/autodiff/model/optim/train ├── src/ffi.rs # NCCL FFI:GetUniqueId/CommInitRank/AllReduce/CommDestroy/Group{Start,End} ├── src/lib.rs # DdpContext(comm bootstrap + all_reduce_average_grads)+ get_unique_id ├── src/ddp.rs # DDP 训练 step(train_rank)+ thread-per-GPU launcher + 确定性 build_model └── src/bin/train_ddp.rs # 多 rank 启动器 / 吞吐 driver(CUDA_VISIBLE_DEVICES 选卡) crates/xtrain-distributed/tests/ └── ddp_correctness.rs # 2 卡 vs 单卡 loss 对拍 + 跨 rank 参数一致 + 1/2/4 卡吞吐表 Cargo.toml # workspace members += xtrain-distributed docs/07-distributed.md # 本文 ``` 整个 crate 用 `#![cfg(not(no_cuda))]` 在 crate 根门控:本地(无 nvcc)crate 编译为空,不引用任何 NCCL 符号, `cargo check` 直接过;dash5 上 `not(no_cuda)` 全量编译并链接 NCCL。bin 另带一个 `#[cfg(no_cuda)]` stub main。 ## Key Design Decisions ### ① NCCL comm bootstrap(rank 0 发 UniqueId) NCCL 的握手:**rank 0 调 `ncclGetUniqueId`** 生成一个 128 字节不透明 id,**带外**分发给所有 rank,每个 rank 用 `ncclCommInitRank(comm, world, id, rank)` 加入通信组。`ncclUniqueId` 按 NCCL ABI **按值传递**(128 字节 struct), FFI 里 `#[repr(C)] struct NcclUniqueId { internal: [c_char;128] }`,`ncclCommInitRank` 的 `commid` 参数直接传值。 并发 init 必须用 `ncclGroupStart()/ncclGroupEnd()` 包住,否则多个线程同时 `CommInitRank` 会互相等待握手而死锁—— group 让它们 rendezvous。这套与 xserv-distributed 的 `TpContext::init` 完全一致。 ### ② Launch model:thread-per-GPU(与 xserv 一致) **单进程、每卡一个 OS 线程**,线程启动先 `cudaSetDevice(dev)` 绑定本卡。选这个模型的硬约束:xtrain 的 autograd `Var` 是 `Rc>`,**不是 `Send`**,整张计算图不能跨线程。所以**每个 rank 线程在闭包内本地 `build_model` 自己的 `Var` 图**——跨线程边界的只有 `UniqueId`(`Copy`)、标量 config、和 `&Corpus`(只读共享,`Sync`)。 `launch()` 用 `std::thread::scope`:rank 0 发 id → 每线程 `DdpContext::init` → 本地建模 → `train_rank` → join。 `DdpContext` 持有 `comm`(裸指针),`unsafe impl Send` 因为它被恰好一个 rank 线程独占。 为什么不用多进程(torchrun 式):单机多卡,单进程多线程改动最小、无 IPC;UniqueId 跨线程就是 move 一个 `Copy` struct,跨进程才需要写文件/env 分发。多进程留待真正跨节点。 ### ③ all-reduce-then-local-step:梯度在显存里平均,各 rank 各自 step T7 起**梯度常驻 device**(`Var::grad()` 是 device tensor)。DDP 的通信因此极简——**直接对每个参数的 `.grad()` device buffer 做 in-place AllReduce**,零 host 往返: ```rust // DdpContext::all_reduce_average_grads(params) for p in params { if let Some(g) = p.grad() { ncclAllReduce(g.data_ptr(), g.data_ptr(), g.numel(), NCCL_FLOAT32, NCCL_SUM, comm, null_stream); }} for p in params { if let Some(g) = p.grad() { launch_scale_inplace_f32(g.data_ptr(), 1.0/world, g.numel(), null_stream); // 复用 T7 的 kernel }} ``` AllReduce 用 **fp32 + Sum**(梯度就是 fp32),发在 **null/legacy stream** 上——xtrain 每个 kernel 都在 null stream (`std::ptr::null_mut()`)launch,所以 AllReduce 自然排在产出梯度的 backward kernel 之后、消费它的 scale/optimizer kernel 之前,**无需额外 barrier**。平均直接复用 T7 的 `launch_scale_inplace_f32`(grad-clip 用的同一个 kernel)。 之后**每个 rank 各自跑 `GpuAdamW.step`**——不再同步权重。 ### ④ 为什么参数保持一致 三个充分条件:(a) **同 init**:所有 rank(含单卡 baseline)用同一个确定性 `build_model`(同 LCG 种子),起点逐位相同; (b) **同梯度**:NCCL AllReduce 对参与的所有 rank **返回逐位相同的归约结果**(这是 NCCL 的保证),平均、clip-norm (device reduction,确定性)、AdamW kernel 都是确定性的 → 每步喂给优化器的梯度逐位相同;(c) **同优化器状态**: 每个 rank 各自维护 m/v,但因为起点相同、每步输入相同、超参相同,状态演化也相同。 ⇒ 各 rank 参数**逐位一致**(测试里断言 `max|p0-p1| == 0.0`),不需要任何权重再同步。 > 对比单卡:DDP 与单卡的最终参数只在 **fp 容差**内一致(`<1e-3` rel),不逐位——因为求和顺序不同 > (单卡 tape 顺序 SUM B 个;DDP 各 rank 先 SUM 分片再 NCCL 跨 rank SUM),fp 加法不结合。跨 rank 才逐位。 ### ⑤ Batch sharding:对住单卡的均值 单卡 loop:`batch_size=B` 个序列各 forward+backward,tape **SUM** 出 `Σ_B`,再 `clip(pre_scale=1/B)` → batch 均值。 DDP 严格对齐(保证 all-reduce 后的和与单卡逐序列一致):**每个 rank 推进同一个 RNG、抽出整批 B 个序列,但只对 分到自己的那些(`i % world == rank`)跑 forward+backward**。各 rank 的并集 == 单卡那一批同序,于是: ``` rank 本地: Σ_local(分片内 b = B/world 个的和,tape SUM) AllReduce: Σ_global = Σ_ranks Σ_local (= 单卡的 Σ_B,只是求和顺序不同) /world: Σ_global / world clip pre_scale = 1/b_local = world/B: Σ_global/world · world/B = Σ_global / B ← 与单卡 clip(1/B) 同一个量 ``` 「按 world 取均值」放在通信原语里(语义清晰),剩下的 `1/b_local` 交给本就存在的 clip 前置缩放完成—— 最终喂给 AdamW 的就是 **global batch 均值梯度**,与单卡完全对齐。loss 日志同理:本地 loss 和跨 rank AllReduce 后 除以 B,得 global 均值(每个 rank 拿到相同值)。 ## 验证方法 测试 `tests/ddp_correctness.rs`(`#[cfg(not(no_cuda))]`,<2 卡自动 skip),用合成语料(无需 tokenizer/数据文件): ### 正确性:`ddp_matches_single_gpu_and_params_consistent` 同一 config + 合成语料,跑 20 步: - **(a) loss 对拍**:单卡 baseline(world=1,global batch 8)vs 2-rank DDP(各 4)→ 整条 loss 轨迹 `max_rel < 1e-3`。 - **(b) 跨 rank 参数一致**:断言 `max|p_rank0 - p_rank1| == 0.0`(逐位,NCCL 保证 + 确定性 step)。 - **(c) DDP vs 单卡参数**:`max rel|Δp| < 1e-3`(fp 求和顺序差,不逐位)。 ### 吞吐:`ddp_throughput_scaling` 固定**每卡 batch=8**(batch 随 world 放大,每 rank 成本不变),测 world ∈ {1,2,4} 的 global tok/s,打印: ``` GPUs | tok/s (global) | speedup 1 | ... | 1.00x 2 | ... | ~2x 4 | ... | ~4x ``` 近线性即过(tiny model latency-bound + 跨卡通信开销会让大 world 略低于理想)。 ### dash5 实跑 ```bash export PATH=/usr/local/cuda/bin:/opt/wjh/.cargo/bin:$PATH # 选空闲卡(dash5 共享,先看 nvidia-smi): CUDA_VISIBLE_DEVICES= \ cargo test -p xtrain-distributed --release -- --nocapture --test-threads=1 # 真训练 / 吞吐 driver: CUDA_VISIBLE_DEVICES=0,1 cargo run -p xtrain-distributed --release --bin train_ddp -- \ 100 64 16 /opt/wjh/models/gpt2/tokenizer.json data/tinystories-valid-3mb.txt ``` 实测数字回填见 xtrain.md T8 note / commit。 ## 不做(本 Phase 范围外,记为 follow-up) - **Tensor / pipeline 并行**:要切权重、layer 内/跨 layer 通信,对 tiny model 收益小、改动大 → model 放大再做。 - **Sharded optimizer(ZeRO)**:每卡只存 1/world 的优化器状态。当前 model 优化器状态很小,不是瓶颈。 - **bf16 AllReduce / 通信压缩**:fp32 已够,且会动数值正确性(与 T7 同理由延后)。 - **多进程 / 跨节点 bootstrap**:单机多卡用线程模型足够;跨节点再上文件/env 分发 UniqueId。