Files
xtrain/crates/xtrain-distributed/tests/ddp_correctness.rs
Gahow Wang ad82e8bf92 dist: lengthen scaling bench so NCCL init amortizes
The 30-step bench charged one-time setup — NCCL init plus 4 model builds, present at world=4
but absent at world=1 — against the wall clock, understating steady-state scaling
(in-loop tok/s already showed ~53k at 4 GPUs). Bump to 150 steps.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 17:18:23 +08:00

246 lines
8.7 KiB
Rust

//! DDP acceptance (Phase T8). Gated to a GPU host: the correctness test skips with
//! fewer than 2 GPUs, the throughput test skips only when no GPU is visible.
//!
//! 1. **Correctness**: K steps single-GPU (world=1, global batch B) vs 2-rank DDP
//! (B/2 of the SAME data in the same order each) → loss trajectories match
//! within tight fp tolerance (it's just gradient averaging), and the two
//! ranks' parameters are identical after the run.
//! 2. **Throughput**: global tok/s at 1 / 2 / 4 GPUs (capped at the visible GPU count)
//! on the SAME per-GPU workload → near-linear scaling expected. Prints the table
//! (run with `--nocapture`); nothing is asserted.
#![cfg(not(no_cuda))]
use std::time::Instant;
use xtrain_cuda::device;
use xtrain_distributed::{DdpConfig, DdpContext, build_model, get_unique_id, launch, train_rank};
use xtrain_model::{Config, ids_tensor};
use xtrain_optim::GpuAdamW;
use xtrain_tensor::Device;
use xtrain_train::clip::clip_grad_norm_gpu;
use xtrain_train::data::Corpus;
use xtrain_train::schedule::LrSchedule;
// A self-contained synthetic corpus so the test needs no tokenizer/data files.
// Token i is (7*i + 3) mod vocab: deterministic, and always in [0, vocab).
// The modulo is taken in usize BEFORE narrowing to i32. Narrowing first would
// wrap once 7*i + 3 exceeds i32::MAX and produce negative token ids.
fn synth_corpus(vocab: usize, n_tokens: usize) -> Corpus {
    assert!(
        vocab > 0 && vocab <= i32::MAX as usize,
        "synth_corpus: vocab must be in 1..=i32::MAX, got {vocab}"
    );
    // Each id is < vocab <= i32::MAX, so the narrowing cast is lossless.
    let tokens: Vec<i32> = (0..n_tokens)
        .map(|i| ((i * 7 + 3) % vocab) as i32)
        .collect();
    Corpus {
        tokens,
        vocab_size: vocab,
    }
}
// Model config shared by both tests: `Config::tiny()` cut down to two layers,
// with its vocabulary sized to match the synthetic corpus.
fn test_config(vocab: usize) -> Config {
    let mut tiny = Config::tiny();
    tiny.n_layers = 2;
    tiny.vocab = vocab;
    tiny
}
// Single-GPU reference run: intended to mirror the DDP rank's per-step recipe,
// but with world=1 so one device (GPU 0) consumes the whole global batch.
// Returns the per-step mean loss over `batch_size` sequences, plus every
// parameter copied back to host memory after the final step.
fn run_single_gpu(cfg: Config, corpus: &Corpus, dcfg: &DdpConfig) -> (Vec<f32>, Vec<Vec<f32>>) {
    device::set_device(0).unwrap();
    let dev = Device::Cuda(0);
    let model = build_model(cfg, dev);
    let weights = model.params();
    let mut adamw = GpuAdamW::new(dcfg.weight_decay);
    // Sampling state seeded like the DDP run, so both consume the same sequences.
    let mut sample_state = dcfg.seed;
    // Gradients from each sequence's backward() accumulate. This factor is
    // handed to the clipper and also averages the logged loss. Presumably the
    // clipper uses it to turn the grad sum into a mean; confirm against
    // `clip_grad_norm_gpu`.
    let scale = 1.0 / dcfg.batch_size as f32;
    let mut trace = Vec::with_capacity(dcfg.steps);

    for step in 0..dcfg.steps {
        let step_lr = dcfg.schedule.lr(step);
        let mut total = 0.0f32;
        for _seq in 0..dcfg.batch_size {
            let (inp, tgt) = corpus.sample(dcfg.seq_len, &mut sample_state);
            let inp_ids = ids_tensor(&inp, dev);
            let tgt_ids = ids_tensor(&tgt, dev);
            let seq_loss = model.loss(&inp_ids, &tgt_ids);
            // Read the scalar loss before backprop (host copy of a 1-element tensor).
            total += seq_loss.value().to_device(Device::Cpu).as_slice::<f32>()[0];
            seq_loss.backward();
        }
        trace.push(total * scale);
        clip_grad_norm_gpu(&weights, dcfg.max_grad_norm, scale);
        adamw.step(step_lr, &weights);
        for w in weights.iter() {
            w.zero_grad();
        }
    }

    let host_params = weights
        .iter()
        .map(|w| w.value().to_device(Device::Cpu).as_slice::<f32>().to_vec())
        .collect();
    (trace, host_params)
}
// Correctness acceptance (needs >= 2 GPUs). A world=1 run over the global batch
// and a 2-rank DDP run over the same samples must agree on:
//   (a) the per-step loss trajectory (tight relative tolerance),
//   (b) final params across ranks (bit-identical), and
//   (c) final params vs the single-GPU run (looser fp tolerance).
// Before comparing anything, the results are checked for matching lengths and
// layouts and for finite values. `zip` stops at the shorter input and `f32::max`
// ignores NaN, so without these guards a short loss trace, a mismatched param
// layout or a NaN blow-up would make every check below pass vacuously.
#[test]
fn ddp_matches_single_gpu_and_params_consistent() {
    // Per-tensor element counts; two param sets are comparable only if these match.
    fn shapes(params: &[Vec<f32>]) -> Vec<usize> {
        params.iter().map(Vec::len).collect()
    }
    // True when no parameter element is NaN or +-inf.
    fn all_finite(params: &[Vec<f32>]) -> bool {
        params.iter().flatten().all(|x| x.is_finite())
    }

    let world = 2usize;
    if device::device_count().unwrap_or(0) < world as i32 {
        eprintln!("skip: need >= {world} GPUs");
        return;
    }
    let vocab = 64usize;
    let cfg = test_config(vocab);
    let corpus = synth_corpus(vocab, 4096);
    let steps = 20usize;
    let dcfg = DdpConfig {
        seq_len: 32,
        batch_size: 8, // global; 4 per rank with world=2
        steps,
        schedule: LrSchedule {
            max_lr: 3e-3,
            min_lr: 3e-4,
            warmup: 3,
            total: steps,
        },
        weight_decay: 0.1,
        max_grad_norm: 1.0,
        log_every: 1_000_000, // silence per-step logging in the test
        seed: 7,
    };
    // Single-GPU baseline (world=1) over the global batch.
    let (single_losses, single_params) = run_single_gpu(cfg, &corpus, &dcfg);
    // DDP over the SAME corpus/config, one thread per rank; each returns
    // (losses, params copied to host).
    let devices: Vec<u32> = (0..world as u32).collect();
    let id = get_unique_id();
    let results: Vec<(Vec<f32>, Vec<Vec<f32>>)> = std::thread::scope(|s| {
        let handles: Vec<_> = devices
            .iter()
            .enumerate()
            .map(|(rank, &dev)| {
                let dcfg = dcfg.clone();
                let corpus = &corpus;
                s.spawn(move || {
                    let ctx = DdpContext::init(rank, world, id, dev);
                    let device = Device::Cuda(dev);
                    let model = build_model(cfg, device);
                    let losses = train_rank(&ctx, &model, device, corpus, &dcfg);
                    let host = model
                        .params()
                        .iter()
                        .map(|p| p.value().to_device(Device::Cpu).as_slice::<f32>().to_vec())
                        .collect::<Vec<_>>();
                    (losses, host)
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    let (ddp_losses, ddp_p0) = &results[0];
    let (_, ddp_p1) = &results[1];

    // Guards: the comparisons below are only meaningful if these hold.
    assert_eq!(
        ddp_losses.len(),
        single_losses.len(),
        "DDP loss trace length differs from single-GPU"
    );
    assert!(
        single_losses.iter().chain(ddp_losses).all(|l| l.is_finite()),
        "non-finite loss: single={single_losses:?} ddp={ddp_losses:?}"
    );
    assert_eq!(shapes(ddp_p0), shapes(ddp_p1), "ranks disagree on param layout");
    assert_eq!(
        shapes(ddp_p0),
        shapes(&single_params),
        "DDP and single-GPU param layouts differ"
    );
    assert!(all_finite(ddp_p0) && all_finite(ddp_p1), "non-finite DDP params");
    assert!(all_finite(&single_params), "non-finite single-GPU params");

    // (a) DDP loss trajectory matches single-GPU within tight tolerance.
    let max_rel = single_losses
        .iter()
        .zip(ddp_losses)
        .map(|(s, d)| (s - d).abs() / s.abs().max(1e-6))
        .fold(0.0f32, f32::max);
    println!(
        "DDP vs single-GPU loss: single[last]={:.6} ddp[last]={:.6} max_rel={max_rel:.2e}",
        single_losses.last().unwrap(),
        ddp_losses.last().unwrap()
    );
    assert!(
        max_rel < 1e-3,
        "DDP loss trajectory diverged from single-GPU: max_rel {max_rel:.3e}"
    );

    // (b) Cross-rank parameter identity (same init + same averaged grad + same
    // optimizer state ⇒ identical params).
    let max_pdiff = ddp_p0
        .iter()
        .zip(ddp_p1)
        .flat_map(|(a, b)| a.iter().zip(b))
        .map(|(x, y)| (x - y).abs())
        .fold(0.0f32, f32::max);
    println!("cross-rank max |param diff| = {max_pdiff:.3e}");
    assert_eq!(max_pdiff, 0.0, "ranks' params drifted apart");

    // (c) DDP final params match single-GPU final params within fp tolerance.
    // This is looser than (a)/(b). DDP and single-GPU differ only in the gradient
    // SUMMATION ORDER: single-GPU sums B sequences in tape order, while DDP sums
    // per-rank shards and then NCCL-sums across ranks. fp addition isn't
    // associative, so that tiny per-step rounding compounds over the AdamW
    // steps. A few e-3 relative on individual params is expected and benign.
    // The loss-trajectory match (a, ~1e-7) and bit-identical cross-rank params
    // (b, ==0) are the load-bearing checks.
    let max_sdiff = ddp_p0
        .iter()
        .zip(&single_params)
        .flat_map(|(a, b)| a.iter().zip(b))
        .map(|(x, y)| (x - y).abs() / y.abs().max(1e-6))
        .fold(0.0f32, f32::max);
    println!("DDP vs single-GPU max rel |param diff| = {max_sdiff:.3e}");
    assert!(max_sdiff < 1e-2, "DDP params diverged from single-GPU");
}
// Throughput report: global tok/s at 1, 2 and 4 GPUs (capped at the visible
// count), with a fixed per-GPU workload so that ideal scaling is linear in the
// GPU count. Only prints a table (run with `--nocapture`); nothing is asserted.
// The stopwatch wraps the whole `launch`, including one-time NCCL setup and
// per-rank model builds. That cost grows with world size, so `steps` is kept
// large enough for it to amortize; otherwise the ratio understates
// steady-state scaling.
#[test]
fn ddp_throughput_scaling() {
    let gpu_count = device::device_count().unwrap_or(0) as usize;
    if gpu_count == 0 {
        eprintln!("skip: no GPU");
        return;
    }
    let per_gpu_batch = 8usize;
    let vocab = 256usize;
    let cfg = test_config(vocab);
    let corpus = synth_corpus(vocab, 8192);
    let steps = 150usize;
    let seq_len = 64usize;

    println!("\n=== DDP throughput scaling (per-GPU batch {per_gpu_batch}, seq {seq_len}) ===");
    println!("{:>6} | {:>14} | {:>8}", "GPUs", "tok/s (global)", "speedup");

    // Throughput of the first row. world=1 always comes first, because the guard
    // above guarantees gpu_count >= 1.
    let mut baseline: Option<f64> = None;
    for world in [1usize, 2, 4].iter().copied().filter(|&w| w <= gpu_count) {
        let devices: Vec<u32> = (0..world).map(|d| d as u32).collect();
        // The global batch grows with world so each rank's share stays fixed.
        let global_batch = per_gpu_batch * world;
        let dcfg = DdpConfig {
            seq_len,
            batch_size: global_batch,
            steps,
            schedule: LrSchedule {
                max_lr: 1e-3,
                min_lr: 1e-3,
                warmup: 1,
                total: steps,
            },
            weight_decay: 0.0,
            max_grad_norm: 1.0,
            log_every: 1_000_000,
            seed: 1,
        };
        let tokens = (steps * global_batch * seq_len) as f64;

        let started = Instant::now();
        let _ = launch(&devices, &corpus, &dcfg, move |device| build_model(cfg, device));
        let tok_per_sec = tokens / started.elapsed().as_secs_f64();

        let base = *baseline.get_or_insert(tok_per_sec);
        println!(
            "{:>6} | {:>14.0} | {:>7.2}x",
            world,
            tok_per_sec,
            tok_per_sec / base.max(1e-9)
        );
    }
}