Design doc for the NCCL DDP path: comm bootstrap (rank-0 UniqueId + grouped CommInitRank), thread-per-GPU launch model (Var is !Send), all-reduce-then- local-step scheme (in-place fp32 AllReduce on .grad() + /world, each rank steps its own GpuAdamW), why params stay consistent (NCCL bit-identical reduce + same init/state), batch sharding math vs single-GPU, verification plan + scaling table. Lists TP/PP/ZeRO/bf16-comm as out-of-scope follow-ups. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
9.5 KiB
Phase T8: Distributed Data Parallel (DDP) — Design Document
Goal
T7 把单卡训练榨到 ~8500 tok/s(梯度常驻 device,AdamW + grad-norm 都在 device 侧)。
T8 的目标是把训练 scale 到 dash5 的多张 RTX 5090:用 NCCL 数据并行——每个 rank 跑 global batch
的一个分片,反向后把每个参数的梯度在显存里 all-reduce 求和、按 world size 取均值,然后各 rank 用
自己那份(已同步的)优化器跑 GpuAdamW.step。同 init + 同优化器超参/状态 ⇒ 参数永远逐位一致,
无需再同步权重。
验收(两条都要):
- 正确性:DDP 必须对住单卡。单卡跑 K 步 global batch B,DDP 用 2 rank 各跑同一批数据同序的 B/2 → loss 轨迹与单卡在紧容差内吻合(纯梯度平均,理论上近乎一致);且一步后断言两个 rank 的参数完全相同。
- 吞吐:固定每卡 workload,测 1/2/4 卡的 tok/s → 近线性 scaling,给表。
本 Phase 范围内只做 DDP。tensor/pipeline 并行、sharded optimizer 是可选 bonus,不做(model 太小, 当前学习目标是把 DDP 这条最朴素也最核心的多卡链路吃透;切分式并行留待 model 放大)。导出回流 xserv 是 T9。
Module Layout
crates/xtrain-distributed/ # 新 crate(镜像 xserv-distributed)
├── build.rs # nvcc 探测 → no_cuda cfg;有 nvcc 时链 -lnccl -lcudart(同 xserv build.rs)
├── Cargo.toml # 依赖 cuda/tensor/autodiff/model/optim/train
├── src/ffi.rs # NCCL FFI:GetUniqueId/CommInitRank/AllReduce/CommDestroy/Group{Start,End}
├── src/lib.rs # DdpContext(comm bootstrap + all_reduce_average_grads)+ get_unique_id
├── src/ddp.rs # DDP 训练 step(train_rank)+ thread-per-GPU launcher + 确定性 build_model
└── src/bin/train_ddp.rs # 多 rank 启动器 / 吞吐 driver(CUDA_VISIBLE_DEVICES 选卡)
crates/xtrain-distributed/tests/
└── ddp_correctness.rs # 2 卡 vs 单卡 loss 对拍 + 跨 rank 参数一致 + 1/2/4 卡吞吐表
Cargo.toml # workspace members += xtrain-distributed
docs/07-distributed.md # 本文
整个 crate 用 #![cfg(not(no_cuda))] 在 crate 根门控:本地(无 nvcc)crate 编译为空,不引用任何 NCCL 符号,
cargo check 直接过;dash5 上 not(no_cuda) 全量编译并链接 NCCL。bin 另带一个 #[cfg(no_cuda)] stub main。
Key Design Decisions
① NCCL comm bootstrap(rank 0 发 UniqueId)
NCCL 的握手:rank 0 调 ncclGetUniqueId 生成一个 128 字节不透明 id,带外分发给所有 rank,每个 rank
用 ncclCommInitRank(comm, world, id, rank) 加入通信组。ncclUniqueId 按 NCCL ABI 按值传递(128 字节 struct),
FFI 里 #[repr(C)] struct NcclUniqueId { internal: [c_char;128] },ncclCommInitRank 的 commid 参数直接传值。
并发 init 必须用 ncclGroupStart()/ncclGroupEnd() 包住,否则多个线程同时 CommInitRank 会互相等待握手而死锁——
group 让它们 rendezvous。这套与 xserv-distributed 的 TpContext::init 完全一致。
② Launch model:thread-per-GPU(与 xserv 一致)
单进程、每卡一个 OS 线程,线程启动先 cudaSetDevice(dev) 绑定本卡。选这个模型的硬约束:xtrain 的
autograd Var 是 Rc<RefCell<…>>,不是 Send,整张计算图不能跨线程。所以每个 rank 线程在闭包内本地
build_model 自己的 Var 图——跨线程边界的只有 UniqueId(Copy)、标量 config、和 &Corpus(只读共享,Sync)。
launch() 用 std::thread::scope:rank 0 发 id → 每线程 DdpContext::init → 本地建模 → train_rank → join。
DdpContext 持有 comm(裸指针),unsafe impl Send 因为它被恰好一个 rank 线程独占。
为什么不用多进程(torchrun 式):单机多卡,单进程多线程改动最小、无 IPC;UniqueId 跨线程就是 move 一个 Copy
struct,跨进程才需要写文件/env 分发。多进程留待真正跨节点。
③ all-reduce-then-local-step:梯度在显存里平均,各 rank 各自 step
T7 起梯度常驻 device(Var::grad() 是 device tensor)。DDP 的通信因此极简——直接对每个参数的 .grad()
device buffer 做 in-place AllReduce,零 host 往返:
// DdpContext::all_reduce_average_grads(params)
for p in params { if let Some(g) = p.grad() {
ncclAllReduce(g.data_ptr(), g.data_ptr(), g.numel(), NCCL_FLOAT32, NCCL_SUM, comm, null_stream);
}}
for p in params { if let Some(g) = p.grad() {
launch_scale_inplace_f32(g.data_ptr(), 1.0/world, g.numel(), null_stream); // 复用 T7 的 kernel
}}
AllReduce 用 fp32 + Sum(梯度就是 fp32),发在 null/legacy stream 上——xtrain 每个 kernel 都在 null stream
(std::ptr::null_mut())launch,所以 AllReduce 自然排在产出梯度的 backward kernel 之后、消费它的 scale/optimizer
kernel 之前,无需额外 barrier。平均直接复用 T7 的 launch_scale_inplace_f32(grad-clip 用的同一个 kernel)。
之后每个 rank 各自跑 GpuAdamW.step——不再同步权重。
④ 为什么参数保持一致
三个充分条件:(a) 同 init:所有 rank(含单卡 baseline)用同一个确定性 build_model(同 LCG 种子),起点逐位相同;
(b) 同梯度:NCCL AllReduce 对参与的所有 rank 返回逐位相同的归约结果(这是 NCCL 的保证),平均、clip-norm
(device reduction,确定性)、AdamW kernel 都是确定性的 → 每步喂给优化器的梯度逐位相同;(c) 同优化器状态:
每个 rank 各自维护 m/v,但因为起点相同、每步输入相同、超参相同,状态演化也相同。
⇒ 各 rank 参数逐位一致(测试里断言 max|p0-p1| == 0.0),不需要任何权重再同步。
对比单卡:DDP 与单卡的最终参数只在 fp 容差内一致(
<1e-3rel),不逐位——因为求和顺序不同 (单卡 tape 顺序 SUM B 个;DDP 各 rank 先 SUM 分片再 NCCL 跨 rank SUM),fp 加法不结合。跨 rank 才逐位。
⑤ Batch sharding:对住单卡的均值
单卡 loop:batch_size=B 个序列各 forward+backward,tape SUM 出 Σ_B,再 clip(pre_scale=1/B) → batch 均值。
DDP 严格对齐(保证 all-reduce 后的和与单卡逐序列一致):每个 rank 推进同一个 RNG、抽出整批 B 个序列,但只对
分到自己的那些(i % world == rank)跑 forward+backward。各 rank 的并集 == 单卡那一批同序,于是:
rank 本地: Σ_local(分片内 b = B/world 个的和,tape SUM)
AllReduce: Σ_global = Σ_ranks Σ_local (= 单卡的 Σ_B,只是求和顺序不同)
/world: Σ_global / world
clip pre_scale = 1/b_local = world/B: Σ_global/world · world/B = Σ_global / B ← 与单卡 clip(1/B) 同一个量
「按 world 取均值」放在通信原语里(语义清晰),剩下的 1/b_local 交给本就存在的 clip 前置缩放完成——
最终喂给 AdamW 的就是 global batch 均值梯度,与单卡完全对齐。loss 日志同理:本地 loss 和跨 rank AllReduce 后
除以 B,得 global 均值(每个 rank 拿到相同值)。
验证方法
测试 tests/ddp_correctness.rs(#[cfg(not(no_cuda))],<2 卡自动 skip),用合成语料(无需 tokenizer/数据文件):
正确性:ddp_matches_single_gpu_and_params_consistent
同一 config + 合成语料,跑 20 步:
- (a) loss 对拍:单卡 baseline(world=1,global batch 8)vs 2-rank DDP(各 4)→ 整条 loss 轨迹
max_rel < 1e-3。 - (b) 跨 rank 参数一致:断言
max|p_rank0 - p_rank1| == 0.0(逐位,NCCL 保证 + 确定性 step)。 - (c) DDP vs 单卡参数:
max rel|Δp| < 1e-3(fp 求和顺序差,不逐位)。
吞吐:ddp_throughput_scaling
固定每卡 batch=8(batch 随 world 放大,每 rank 成本不变),测 world ∈ {1,2,4} 的 global tok/s,打印:
GPUs | tok/s (global) | speedup
1 | ... | 1.00x
2 | ... | ~2x
4 | ... | ~4x
近线性即过(tiny model latency-bound + 跨卡通信开销会让大 world 略低于理想)。
dash5 实跑
export PATH=/usr/local/cuda/bin:/opt/wjh/.cargo/bin:$PATH
# 选空闲卡(dash5 共享,先看 nvidia-smi):
CUDA_VISIBLE_DEVICES=<idle GPUs> \
cargo test -p xtrain-distributed --release -- --nocapture --test-threads=1
# 真训练 / 吞吐 driver:
CUDA_VISIBLE_DEVICES=0,1 cargo run -p xtrain-distributed --release --bin train_ddp -- \
100 64 16 /opt/wjh/models/gpt2/tokenizer.json data/tinystories-valid-3mb.txt
实测数字回填见 xtrain.md T8 note / commit。
不做(本 Phase 范围外,记为 follow-up)
- Tensor / pipeline 并行:要切权重、layer 内/跨 layer 通信,对 tiny model 收益小、改动大 → model 放大再做。
- Sharded optimizer(ZeRO):每卡只存 1/world 的优化器状态。当前 model 优化器状态很小,不是瓶颈。
- bf16 AllReduce / 通信压缩:fp32 已够,且会动数值正确性(与 T7 同理由延后)。
- 多进程 / 跨节点 bootstrap:单机多卡用线程模型足够;跨节点再上文件/env 分发 UniqueId。