diff --git a/crates/xtrain-distributed/tests/ddp_proc.rs b/crates/xtrain-distributed/tests/ddp_proc.rs new file mode 100644 index 0000000..9950569 --- /dev/null +++ b/crates/xtrain-distributed/tests/ddp_proc.rs @@ -0,0 +1,280 @@ +//! Process-per-GPU DDP acceptance (Phase T17). Gated to a GPU host; skips when +//! fewer than 2 GPUs. Run with `--test-threads=1` (distributed tests deadlock if +//! they contend for the same GPUs in parallel — known harness property). +//! +//! Self-launching: the test binary detects WORKER mode via `XTRAIN_RANK` (set by +//! `launch_processes`). In worker mode it runs `run_worker` on a synthetic corpus +//! and dumps its per-step loss trace + final params to a per-rank file; in normal +//! mode it is the launcher — it runs the single-GPU baseline, spawns N worker +//! processes (re-execing itself), reads their dumps back, and asserts: +//! (a) multi-process loss matches single-GPU within `<1e-3`, +//! (b) cross-rank params agree within `<1e-6` (KI-5 ULP tolerance), +//! (c) multi-process loss matches the thread-per-GPU `launch` path within `<1e-3`. + +#![cfg(not(no_cuda))] + +use std::io::Write; +use std::path::Path; + +use xtrain_cuda::device; +use xtrain_distributed::proc::{launch_processes, rank_dump_path, worker_env}; +use xtrain_distributed::{DdpConfig, DdpContext, build_model, train_rank}; +use xtrain_model::{Config, batched_ids_tensor}; +use xtrain_optim::GpuAdamW; +use xtrain_tensor::Device; +use xtrain_train::clip::clip_grad_norm_gpu; +use xtrain_train::data::Corpus; +use xtrain_train::schedule::LrSchedule; + +// ── Shared fixture (identical on launcher + every worker, so they agree) ────── + +const VOCAB: usize = 64; +const STEPS: usize = 20; + +fn synth_corpus() -> Corpus { + let tokens: Vec = (0..4096) + .map(|i| (i * 7 + 3) as i32 % VOCAB as i32) + .collect(); + Corpus { + tokens, + vocab_size: VOCAB, + } +} + +fn test_config() -> Config { + let mut cfg = Config::tiny(); + cfg.vocab = VOCAB; + cfg.n_layers = 2; + cfg +} + +fn dcfg(batch_size: usize) -> DdpConfig { + DdpConfig { + seq_len: 32, + batch_size, + accum_steps: 1, + steps: STEPS, + schedule: LrSchedule { + max_lr: 3e-3, + min_lr: 3e-4, + warmup: 3, + total: STEPS, + }, + weight_decay: 0.1, + max_grad_norm: 1.0, + log_every: 1_000_000, + seed: 7, + eval_every: 0, + eval_batches: 0, + ckpt_path: None, + } +} + +// The dump dir is passed launcher→worker via this env key (separate from the +// XTRAIN_* keys the launcher sets); workers write `rank{N}.dump` there. +const ENV_DUMP_DIR: &str = "XTRAIN_TEST_DUMP_DIR"; +const GLOBAL_BATCH: usize = 8; + +// ── Worker entry: runs when this test binary is re-execed by launch_processes ─ + +fn run_as_worker_if_needed() { + let Some(env) = worker_env() else { return }; + let dump_dir = std::env::var(ENV_DUMP_DIR).expect("dump dir env"); + // This is the worker body `run_worker` performs in production (init ctx → + // build deterministic model → train_rank). We train ONCE inline so we can dump + // both the loss trace AND the final params for the launcher to check; the + // production `run_worker` wrapper is exercised by `bin/train_ddp_mp` on dash5. + let ctx = DdpContext::init(env.rank, env.world, env.id, env.local_rank); + let device = Device::Cuda(env.local_rank); + let model = build_model(test_config(), device); + let res = train_rank( + &ctx, + &model, + device, + &synth_corpus(), + None, + &dcfg(GLOBAL_BATCH), + ); + let params: Vec> = model + .params() + .iter() + .map(|p| p.value().to_device(Device::Cpu).as_slice::().to_vec()) + .collect(); + write_dump(&dump_dir, env.rank, &res.losses, ¶ms); + std::process::exit(0); +} + +fn write_dump(dir: &str, rank: usize, losses: &[f32], params: &[Vec]) { + let path = rank_dump_path(Path::new(dir), rank); + let mut f = std::fs::File::create(&path).expect("create dump"); + // Line 1: losses (space-separated). Following lines: one param tensor each. + let loss_line: Vec = losses.iter().map(|x| format!("{x:.8e}")).collect(); + writeln!(f, "{}", loss_line.join(" ")).unwrap(); + for p in params { + let line: Vec = p.iter().map(|x| format!("{x:.8e}")).collect(); + writeln!(f, "{}", line.join(" ")).unwrap(); + } +} + +fn read_dump(dir: &str, rank: usize) -> (Vec, Vec>) { + let path = rank_dump_path(Path::new(dir), rank); + let text = std::fs::read_to_string(&path).expect("read dump"); + let mut lines = text.lines(); + let losses: Vec = lines + .next() + .unwrap() + .split_whitespace() + .map(|s| s.parse().unwrap()) + .collect(); + let params: Vec> = lines + .map(|l| l.split_whitespace().map(|s| s.parse().unwrap()).collect()) + .collect(); + (losses, params) +} + +// ── Single-GPU baseline (same loop as the DDP rank, world=1) ────────────────── + +fn run_single_gpu(cfg: Config, corpus: &Corpus, d: &DdpConfig) -> (Vec, Vec>) { + device::set_device(0).unwrap(); + let device = Device::Cuda(0); + let model = build_model(cfg, device); + let params = model.params(); + let mut opt = GpuAdamW::new(d.weight_decay); + let mut rng = d.seed; + let mut losses = Vec::new(); + for step in 0..d.steps { + let lr = d.schedule.lr(step); + let mut inputs = Vec::with_capacity(d.batch_size); + let mut targets_v = Vec::with_capacity(d.batch_size); + for _ in 0..d.batch_size { + let (input, target) = corpus.sample(d.seq_len, &mut rng); + inputs.push(input); + targets_v.push(target); + } + let ids = batched_ids_tensor(&inputs, device); + let targets = batched_ids_tensor(&targets_v, device); + let loss = model.loss_batched(&ids, &targets, d.batch_size); + losses.push(loss.value().to_device(Device::Cpu).as_slice::()[0]); + loss.backward(); + clip_grad_norm_gpu(¶ms, d.max_grad_norm, 1.0); + opt.step(lr, ¶ms); + for p in ¶ms { + p.zero_grad(); + } + } + let host = params + .iter() + .map(|p| p.value().to_device(Device::Cpu).as_slice::().to_vec()) + .collect(); + (losses, host) +} + +// ── The test (launcher mode) ────────────────────────────────────────────────── + +#[test] +fn proc_per_gpu_matches_single_gpu_and_thread_path() { + // If this process was spawned as a worker, do the worker job and exit before + // the test framework runs anything else. + run_as_worker_if_needed(); + + let world = 2usize; + if device::device_count().unwrap_or(0) < world as i32 { + eprintln!("skip: need >= {world} GPUs"); + return; + } + + let cfg = test_config(); + let corpus = synth_corpus(); + let d = dcfg(GLOBAL_BATCH); + + // (1) Single-GPU baseline over the global batch. + let (single_losses, single_params) = run_single_gpu(cfg, &corpus, &d); + + // (2) Thread-per-GPU path (T8 `launch`) — the regression baseline to match. + let thread_results = + xtrain_distributed::launch(&[0u32, 1u32], &corpus, None, &d, move |device| { + build_model(cfg, device) + }); + let thread_losses = &thread_results[0].losses; + + // (3) Process-per-GPU: spawn 2 worker processes (re-exec of this test binary), + // each dumps its loss trace + final params to a temp dir. + let dump_dir = std::env::temp_dir().join(format!("xtrain_t17_{}", std::process::id())); + std::fs::create_dir_all(&dump_dir).unwrap(); + // SAFETY: single-threaded test (forced by --test-threads=1) sets this env + // before spawning workers; no concurrent env access. + unsafe { + std::env::set_var(ENV_DUMP_DIR, &dump_dir); + } + // Re-exec the test binary but run ONLY this test, single-threaded, so the + // worker process does the worker job and exits without touching other tests. + let worker_args = [ + "--exact".to_string(), + "proc_per_gpu_matches_single_gpu_and_thread_path".to_string(), + "--test-threads=1".to_string(), + "--nocapture".to_string(), + ]; + launch_processes(world, &worker_args).expect("worker processes failed"); + + let (proc_losses0, proc_p0) = read_dump(dump_dir.to_str().unwrap(), 0); + let (_proc_losses1, proc_p1) = read_dump(dump_dir.to_str().unwrap(), 1); + + // (a) process-per-GPU loss matches single-GPU. + let max_rel_single = max_rel(&single_losses, &proc_losses0); + println!( + "proc-per-GPU vs single-GPU loss: single[last]={:.6} proc[last]={:.6} max_rel={max_rel_single:.2e}", + single_losses.last().unwrap(), + proc_losses0.last().unwrap() + ); + assert!( + max_rel_single < 1e-3, + "proc-per-GPU loss diverged from single-GPU: {max_rel_single:.3e}" + ); + + // (c) process-per-GPU loss matches the thread-per-GPU path. + let max_rel_thread = max_rel(thread_losses, &proc_losses0); + println!( + "proc-per-GPU vs thread-per-GPU loss: thread[last]={:.6} proc[last]={:.6} max_rel={max_rel_thread:.2e}", + thread_losses.last().unwrap(), + proc_losses0.last().unwrap() + ); + assert!( + max_rel_thread < 1e-3, + "proc-per-GPU loss diverged from thread-per-GPU: {max_rel_thread:.3e}" + ); + + // (b) cross-rank parameter agreement (KI-5 ULP tolerance). + let mut max_pdiff = 0.0f32; + for (a, b) in proc_p0.iter().zip(&proc_p1) { + for (x, y) in a.iter().zip(b) { + max_pdiff = max_pdiff.max((x - y).abs()); + } + } + println!("proc-per-GPU cross-rank max |param diff| = {max_pdiff:.3e}"); + assert!( + max_pdiff < 1e-6, + "ranks' params drifted apart: {max_pdiff:.3e}" + ); + + // Bonus sanity: proc-per-GPU final params vs single-GPU within fp tolerance. + let mut max_sdiff = 0.0f32; + for (a, b) in proc_p0.iter().zip(&single_params) { + for (x, y) in a.iter().zip(b) { + max_sdiff = max_sdiff.max((x - y).abs() / y.abs().max(1e-6)); + } + } + println!("proc-per-GPU vs single-GPU max rel |param diff| = {max_sdiff:.3e}"); + assert!( + max_sdiff < 1e-2, + "proc-per-GPU params diverged from single-GPU" + ); + + let _ = std::fs::remove_dir_all(&dump_dir); +} + +fn max_rel(a: &[f32], b: &[f32]) -> f32 { + a.iter() + .zip(b) + .map(|(s, d)| (s - d).abs() / s.abs().max(1e-6)) + .fold(0.0f32, f32::max) +}