Files
xtrain/docs/07-distributed.md
Gahow Wang 0131f05b26 docs: Phase T8 — distributed data parallel
Design doc for the NCCL DDP path: comm bootstrap (rank-0 UniqueId + grouped
CommInitRank), thread-per-GPU launch model (Var is !Send), all-reduce-then-
local-step scheme (in-place fp32 AllReduce on .grad() + /world, each rank steps
its own GpuAdamW), why params stay consistent (NCCL bit-identical reduce + same
init/state), batch sharding math vs single-GPU, verification plan + scaling
table. Lists TP/PP/ZeRO/bf16-comm as out-of-scope follow-ups.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 17:15:49 +08:00

9.5 KiB
Raw Permalink Blame History

Phase T8: Distributed Data Parallel (DDP) — Design Document

Goal

T7 把单卡训练榨到 ~8500 tok/s(梯度常驻 deviceAdamW + grad-norm 都在 device 侧)。 T8 的目标是把训练 scale 到 dash5 的多张 RTX 5090:用 NCCL 数据并行——每个 rank 跑 global batch 的一个分片,反向后把每个参数的梯度在显存里 all-reduce 求和、按 world size 取均值,然后各 rank 用 自己那份(已同步的)优化器跑 GpuAdamW.step。同 init + 同优化器超参/状态 ⇒ 参数永远逐位一致, 无需再同步权重。

验收(两条都要):

  1. 正确性DDP 必须对住单卡。单卡跑 K 步 global batch BDDP 用 2 rank 各跑同一批数据同序的 B/2 → loss 轨迹与单卡在紧容差内吻合(纯梯度平均,理论上近乎一致);且一步后断言两个 rank 的参数完全相同。
  2. 吞吐:固定每卡 workload测 1/2/4 卡的 tok/s → 近线性 scaling给表。

本 Phase 范围内只做 DDP。tensor/pipeline 并行、sharded optimizer 是可选 bonus不做model 太小, 当前学习目标是把 DDP 这条最朴素也最核心的多卡链路吃透;切分式并行留待 model 放大)。导出回流 xserv 是 T9。

Module Layout

crates/xtrain-distributed/                  # 新 crate镜像 xserv-distributed
├── build.rs                                # nvcc 探测 → no_cuda cfg有 nvcc 时链 -lnccl -lcudart同 xserv build.rs
├── Cargo.toml                              # 依赖 cuda/tensor/autodiff/model/optim/train
├── src/ffi.rs                              # NCCL FFIGetUniqueId/CommInitRank/AllReduce/CommDestroy/Group{Start,End}
├── src/lib.rs                              # DdpContextcomm bootstrap + all_reduce_average_grads+ get_unique_id
├── src/ddp.rs                              # DDP 训练 steptrain_rank+ thread-per-GPU launcher + 确定性 build_model
└── src/bin/train_ddp.rs                    # 多 rank 启动器 / 吞吐 driverCUDA_VISIBLE_DEVICES 选卡)
crates/xtrain-distributed/tests/
└── ddp_correctness.rs                      # 2 卡 vs 单卡 loss 对拍 + 跨 rank 参数一致 + 1/2/4 卡吞吐表
Cargo.toml                                  # workspace members += xtrain-distributed
docs/07-distributed.md                      # 本文

整个 crate 用 #![cfg(not(no_cuda))] 在 crate 根门控:本地(无 nvcccrate 编译为空,不引用任何 NCCL 符号, cargo check 直接过dash5 上 not(no_cuda) 全量编译并链接 NCCL。bin 另带一个 #[cfg(no_cuda)] stub main。

Key Design Decisions

① NCCL comm bootstraprank 0 发 UniqueId

NCCL 的握手:rank 0 调 ncclGetUniqueId 生成一个 128 字节不透明 id带外分发给所有 rank每个 rank 用 ncclCommInitRank(comm, world, id, rank) 加入通信组。ncclUniqueId 按 NCCL ABI 按值传递128 字节 struct FFI 里 #[repr(C)] struct NcclUniqueId { internal: [c_char;128] }ncclCommInitRankcommid 参数直接传值。

并发 init 必须用 ncclGroupStart()/ncclGroupEnd() 包住,否则多个线程同时 CommInitRank 会互相等待握手而死锁—— group 让它们 rendezvous。这套与 xserv-distributed 的 TpContext::init 完全一致。

② Launch modelthread-per-GPU与 xserv 一致)

单进程、每卡一个 OS 线程,线程启动先 cudaSetDevice(dev) 绑定本卡。选这个模型的硬约束xtrain 的 autograd VarRc<RefCell<…>>不是 Send,整张计算图不能跨线程。所以每个 rank 线程在闭包内本地 build_model 自己的 Var——跨线程边界的只有 UniqueIdCopy)、标量 config、和 &Corpus(只读共享,Sync)。

launch()std::thread::scoperank 0 发 id → 每线程 DdpContext::init → 本地建模 → train_rank → join。 DdpContext 持有 comm(裸指针),unsafe impl Send 因为它被恰好一个 rank 线程独占。

为什么不用多进程torchrun 式):单机多卡,单进程多线程改动最小、无 IPCUniqueId 跨线程就是 move 一个 Copy struct跨进程才需要写文件/env 分发。多进程留待真正跨节点。

③ all-reduce-then-local-step梯度在显存里平均各 rank 各自 step

T7 起梯度常驻 deviceVar::grad() 是 device tensor。DDP 的通信因此极简——直接对每个参数的 .grad() device buffer 做 in-place AllReduce,零 host 往返:

// DdpContext::all_reduce_average_grads(params)
for p in params { if let Some(g) = p.grad() {
    ncclAllReduce(g.data_ptr(), g.data_ptr(), g.numel(), NCCL_FLOAT32, NCCL_SUM, comm, null_stream);
}}
for p in params { if let Some(g) = p.grad() {
    launch_scale_inplace_f32(g.data_ptr(), 1.0/world, g.numel(), null_stream);  // 复用 T7 的 kernel
}}

AllReduce 用 fp32 + Sum(梯度就是 fp32发在 null/legacy stream 上——xtrain 每个 kernel 都在 null stream std::ptr::null_mut()launch所以 AllReduce 自然排在产出梯度的 backward kernel 之后、消费它的 scale/optimizer kernel 之前,无需额外 barrier。平均直接复用 T7 的 launch_scale_inplace_f32grad-clip 用的同一个 kernel

之后每个 rank 各自跑 GpuAdamW.step——不再同步权重。

④ 为什么参数保持一致

三个充分条件:(a) 同 init:所有 rank含单卡 baseline用同一个确定性 build_model(同 LCG 种子),起点逐位相同; (b) 同梯度NCCL AllReduce 对参与的所有 rank 返回逐位相同的归约结果(这是 NCCL 的保证平均、clip-norm device reduction确定性、AdamW kernel 都是确定性的 → 每步喂给优化器的梯度逐位相同;(c) 同优化器状态 每个 rank 各自维护 m/v但因为起点相同、每步输入相同、超参相同状态演化也相同。

⇒ 各 rank 参数逐位一致(测试里断言 max|p0-p1| == 0.0),不需要任何权重再同步。

对比单卡DDP 与单卡的最终参数只在 fp 容差内一致(<1e-3 rel不逐位——因为求和顺序不同 (单卡 tape 顺序 SUM B 个DDP 各 rank 先 SUM 分片再 NCCL 跨 rank SUMfp 加法不结合。跨 rank 才逐位。

⑤ Batch sharding对住单卡的均值

单卡 loopbatch_size=B 个序列各 forward+backwardtape SUMΣ_B,再 clip(pre_scale=1/B) → batch 均值。

DDP 严格对齐(保证 all-reduce 后的和与单卡逐序列一致):每个 rank 推进同一个 RNG、抽出整批 B 个序列,但只对 分到自己的那些(i % world == rank)跑 forward+backward。各 rank 的并集 == 单卡那一批同序,于是:

rank 本地:  Σ_local分片内 b = B/world 个的和tape SUM
AllReduce:  Σ_global = Σ_ranks Σ_local        = 单卡的 Σ_B只是求和顺序不同
/world:     Σ_global / world
clip pre_scale = 1/b_local = world/B:  Σ_global/world · world/B = Σ_global / B  ← 与单卡 clip(1/B) 同一个量

「按 world 取均值」放在通信原语里(语义清晰),剩下的 1/b_local 交给本就存在的 clip 前置缩放完成—— 最终喂给 AdamW 的就是 global batch 均值梯度与单卡完全对齐。loss 日志同理:本地 loss 和跨 rank AllReduce 后 除以 B得 global 均值(每个 rank 拿到相同值)。

验证方法

测试 tests/ddp_correctness.rs#[cfg(not(no_cuda))]<2 卡自动 skip用合成语料无需 tokenizer/数据文件):

正确性:ddp_matches_single_gpu_and_params_consistent

同一 config + 合成语料,跑 20 步:

  • (a) loss 对拍:单卡 baselineworld=1global batch 8vs 2-rank DDP各 4→ 整条 loss 轨迹 max_rel < 1e-3
  • (b) 跨 rank 参数一致:断言 max|p_rank0 - p_rank1| == 0.0逐位NCCL 保证 + 确定性 step
  • (c) DDP vs 单卡参数max rel|Δp| < 1e-3fp 求和顺序差,不逐位)。

吞吐:ddp_throughput_scaling

固定每卡 batch=8batch 随 world 放大,每 rank 成本不变),测 world ∈ {1,2,4} 的 global tok/s打印

  GPUs | tok/s (global) | speedup
     1 |          ...   |   1.00x
     2 |          ...   |   ~2x
     4 |          ...   |   ~4x

近线性即过tiny model latency-bound + 跨卡通信开销会让大 world 略低于理想)。

dash5 实跑

export PATH=/usr/local/cuda/bin:/opt/wjh/.cargo/bin:$PATH
# 选空闲卡dash5 共享,先看 nvidia-smi
CUDA_VISIBLE_DEVICES=<idle GPUs> \
  cargo test -p xtrain-distributed --release -- --nocapture --test-threads=1
# 真训练 / 吞吐 driver
CUDA_VISIBLE_DEVICES=0,1 cargo run -p xtrain-distributed --release --bin train_ddp -- \
  100 64 16 /opt/wjh/models/gpt2/tokenizer.json data/tinystories-valid-3mb.txt

实测数字回填见 xtrain.md T8 note / commit。

不做(本 Phase 范围外,记为 follow-up

  • Tensor / pipeline 并行要切权重、layer 内/跨 layer 通信,对 tiny model 收益小、改动大 → model 放大再做。
  • Sharded optimizerZeRO:每卡只存 1/world 的优化器状态。当前 model 优化器状态很小,不是瓶颈。
  • bf16 AllReduce / 通信压缩fp32 已够,且会动数值正确性(与 T7 同理由延后)。
  • 多进程 / 跨节点 bootstrap:单机多卡用线程模型足够;跨节点再上文件/env 分发 UniqueId。