test: process-per-GPU DDP correctness (ddp_proc.rs)

Self-launching test: worker mode (XTRAIN_RANK set) trains on synthetic corpus
and dumps loss+params; launcher mode runs single-GPU baseline + thread-per-GPU
launch + spawns 2 worker processes, then asserts (a) proc loss == single-GPU
<1e-3, (b) cross-rank params <1e-6 (KI-5 ULP), (c) proc loss == thread-per-GPU
<1e-3. Run with --test-threads=1 (distributed harness property).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-18 17:48:52 +08:00
parent a188c8a277
commit 4abb17383a

View File

@@ -0,0 +1,280 @@
//! Process-per-GPU DDP acceptance (Phase T17). Gated to a GPU host; skips when
//! fewer than 2 GPUs. Run with `--test-threads=1` (distributed tests deadlock if
//! they contend for the same GPUs in parallel — known harness property).
//!
//! Self-launching: the test binary detects WORKER mode via `XTRAIN_RANK` (set by
//! `launch_processes`). In worker mode it runs `run_worker` on a synthetic corpus
//! and dumps its per-step loss trace + final params to a per-rank file; in normal
//! mode it is the launcher — it runs the single-GPU baseline, spawns N worker
//! processes (re-execing itself), reads their dumps back, and asserts:
//! (a) multi-process loss matches single-GPU within `<1e-3`,
//! (b) cross-rank params agree within `<1e-6` (KI-5 ULP tolerance),
//! (c) multi-process loss matches the thread-per-GPU `launch` path within `<1e-3`.
#![cfg(not(no_cuda))]
use std::io::Write;
use std::path::Path;
use xtrain_cuda::device;
use xtrain_distributed::proc::{launch_processes, rank_dump_path, worker_env};
use xtrain_distributed::{DdpConfig, DdpContext, build_model, train_rank};
use xtrain_model::{Config, batched_ids_tensor};
use xtrain_optim::GpuAdamW;
use xtrain_tensor::Device;
use xtrain_train::clip::clip_grad_norm_gpu;
use xtrain_train::data::Corpus;
use xtrain_train::schedule::LrSchedule;
// ── Shared fixture (identical on launcher + every worker, so they agree) ──────
const VOCAB: usize = 64;
const STEPS: usize = 20;
fn synth_corpus() -> Corpus {
let tokens: Vec<i32> = (0..4096)
.map(|i| (i * 7 + 3) as i32 % VOCAB as i32)
.collect();
Corpus {
tokens,
vocab_size: VOCAB,
}
}
fn test_config() -> Config {
let mut cfg = Config::tiny();
cfg.vocab = VOCAB;
cfg.n_layers = 2;
cfg
}
fn dcfg(batch_size: usize) -> DdpConfig {
DdpConfig {
seq_len: 32,
batch_size,
accum_steps: 1,
steps: STEPS,
schedule: LrSchedule {
max_lr: 3e-3,
min_lr: 3e-4,
warmup: 3,
total: STEPS,
},
weight_decay: 0.1,
max_grad_norm: 1.0,
log_every: 1_000_000,
seed: 7,
eval_every: 0,
eval_batches: 0,
ckpt_path: None,
}
}
// The dump dir is passed launcher→worker via this env key (separate from the
// XTRAIN_* keys the launcher sets); workers write `rank{N}.dump` there.
const ENV_DUMP_DIR: &str = "XTRAIN_TEST_DUMP_DIR";
const GLOBAL_BATCH: usize = 8;
// ── Worker entry: runs when this test binary is re-execed by launch_processes ─
fn run_as_worker_if_needed() {
let Some(env) = worker_env() else { return };
let dump_dir = std::env::var(ENV_DUMP_DIR).expect("dump dir env");
// This is the worker body `run_worker` performs in production (init ctx →
// build deterministic model → train_rank). We train ONCE inline so we can dump
// both the loss trace AND the final params for the launcher to check; the
// production `run_worker` wrapper is exercised by `bin/train_ddp_mp` on dash5.
let ctx = DdpContext::init(env.rank, env.world, env.id, env.local_rank);
let device = Device::Cuda(env.local_rank);
let model = build_model(test_config(), device);
let res = train_rank(
&ctx,
&model,
device,
&synth_corpus(),
None,
&dcfg(GLOBAL_BATCH),
);
let params: Vec<Vec<f32>> = model
.params()
.iter()
.map(|p| p.value().to_device(Device::Cpu).as_slice::<f32>().to_vec())
.collect();
write_dump(&dump_dir, env.rank, &res.losses, &params);
std::process::exit(0);
}
fn write_dump(dir: &str, rank: usize, losses: &[f32], params: &[Vec<f32>]) {
let path = rank_dump_path(Path::new(dir), rank);
let mut f = std::fs::File::create(&path).expect("create dump");
// Line 1: losses (space-separated). Following lines: one param tensor each.
let loss_line: Vec<String> = losses.iter().map(|x| format!("{x:.8e}")).collect();
writeln!(f, "{}", loss_line.join(" ")).unwrap();
for p in params {
let line: Vec<String> = p.iter().map(|x| format!("{x:.8e}")).collect();
writeln!(f, "{}", line.join(" ")).unwrap();
}
}
fn read_dump(dir: &str, rank: usize) -> (Vec<f32>, Vec<Vec<f32>>) {
let path = rank_dump_path(Path::new(dir), rank);
let text = std::fs::read_to_string(&path).expect("read dump");
let mut lines = text.lines();
let losses: Vec<f32> = lines
.next()
.unwrap()
.split_whitespace()
.map(|s| s.parse().unwrap())
.collect();
let params: Vec<Vec<f32>> = lines
.map(|l| l.split_whitespace().map(|s| s.parse().unwrap()).collect())
.collect();
(losses, params)
}
// ── Single-GPU baseline (same loop as the DDP rank, world=1) ──────────────────
fn run_single_gpu(cfg: Config, corpus: &Corpus, d: &DdpConfig) -> (Vec<f32>, Vec<Vec<f32>>) {
device::set_device(0).unwrap();
let device = Device::Cuda(0);
let model = build_model(cfg, device);
let params = model.params();
let mut opt = GpuAdamW::new(d.weight_decay);
let mut rng = d.seed;
let mut losses = Vec::new();
for step in 0..d.steps {
let lr = d.schedule.lr(step);
let mut inputs = Vec::with_capacity(d.batch_size);
let mut targets_v = Vec::with_capacity(d.batch_size);
for _ in 0..d.batch_size {
let (input, target) = corpus.sample(d.seq_len, &mut rng);
inputs.push(input);
targets_v.push(target);
}
let ids = batched_ids_tensor(&inputs, device);
let targets = batched_ids_tensor(&targets_v, device);
let loss = model.loss_batched(&ids, &targets, d.batch_size);
losses.push(loss.value().to_device(Device::Cpu).as_slice::<f32>()[0]);
loss.backward();
clip_grad_norm_gpu(&params, d.max_grad_norm, 1.0);
opt.step(lr, &params);
for p in &params {
p.zero_grad();
}
}
let host = params
.iter()
.map(|p| p.value().to_device(Device::Cpu).as_slice::<f32>().to_vec())
.collect();
(losses, host)
}
// ── The test (launcher mode) ──────────────────────────────────────────────────
#[test]
fn proc_per_gpu_matches_single_gpu_and_thread_path() {
// If this process was spawned as a worker, do the worker job and exit before
// the test framework runs anything else.
run_as_worker_if_needed();
let world = 2usize;
if device::device_count().unwrap_or(0) < world as i32 {
eprintln!("skip: need >= {world} GPUs");
return;
}
let cfg = test_config();
let corpus = synth_corpus();
let d = dcfg(GLOBAL_BATCH);
// (1) Single-GPU baseline over the global batch.
let (single_losses, single_params) = run_single_gpu(cfg, &corpus, &d);
// (2) Thread-per-GPU path (T8 `launch`) — the regression baseline to match.
let thread_results =
xtrain_distributed::launch(&[0u32, 1u32], &corpus, None, &d, move |device| {
build_model(cfg, device)
});
let thread_losses = &thread_results[0].losses;
// (3) Process-per-GPU: spawn 2 worker processes (re-exec of this test binary),
// each dumps its loss trace + final params to a temp dir.
let dump_dir = std::env::temp_dir().join(format!("xtrain_t17_{}", std::process::id()));
std::fs::create_dir_all(&dump_dir).unwrap();
// SAFETY: single-threaded test (forced by --test-threads=1) sets this env
// before spawning workers; no concurrent env access.
unsafe {
std::env::set_var(ENV_DUMP_DIR, &dump_dir);
}
// Re-exec the test binary but run ONLY this test, single-threaded, so the
// worker process does the worker job and exits without touching other tests.
let worker_args = [
"--exact".to_string(),
"proc_per_gpu_matches_single_gpu_and_thread_path".to_string(),
"--test-threads=1".to_string(),
"--nocapture".to_string(),
];
launch_processes(world, &worker_args).expect("worker processes failed");
let (proc_losses0, proc_p0) = read_dump(dump_dir.to_str().unwrap(), 0);
let (_proc_losses1, proc_p1) = read_dump(dump_dir.to_str().unwrap(), 1);
// (a) process-per-GPU loss matches single-GPU.
let max_rel_single = max_rel(&single_losses, &proc_losses0);
println!(
"proc-per-GPU vs single-GPU loss: single[last]={:.6} proc[last]={:.6} max_rel={max_rel_single:.2e}",
single_losses.last().unwrap(),
proc_losses0.last().unwrap()
);
assert!(
max_rel_single < 1e-3,
"proc-per-GPU loss diverged from single-GPU: {max_rel_single:.3e}"
);
// (c) process-per-GPU loss matches the thread-per-GPU path.
let max_rel_thread = max_rel(thread_losses, &proc_losses0);
println!(
"proc-per-GPU vs thread-per-GPU loss: thread[last]={:.6} proc[last]={:.6} max_rel={max_rel_thread:.2e}",
thread_losses.last().unwrap(),
proc_losses0.last().unwrap()
);
assert!(
max_rel_thread < 1e-3,
"proc-per-GPU loss diverged from thread-per-GPU: {max_rel_thread:.3e}"
);
// (b) cross-rank parameter agreement (KI-5 ULP tolerance).
let mut max_pdiff = 0.0f32;
for (a, b) in proc_p0.iter().zip(&proc_p1) {
for (x, y) in a.iter().zip(b) {
max_pdiff = max_pdiff.max((x - y).abs());
}
}
println!("proc-per-GPU cross-rank max |param diff| = {max_pdiff:.3e}");
assert!(
max_pdiff < 1e-6,
"ranks' params drifted apart: {max_pdiff:.3e}"
);
// Bonus sanity: proc-per-GPU final params vs single-GPU within fp tolerance.
let mut max_sdiff = 0.0f32;
for (a, b) in proc_p0.iter().zip(&single_params) {
for (x, y) in a.iter().zip(b) {
max_sdiff = max_sdiff.max((x - y).abs() / y.abs().max(1e-6));
}
}
println!("proc-per-GPU vs single-GPU max rel |param diff| = {max_sdiff:.3e}");
assert!(
max_sdiff < 1e-2,
"proc-per-GPU params diverged from single-GPU"
);
let _ = std::fs::remove_dir_all(&dump_dir);
}
fn max_rel(a: &[f32], b: &[f32]) -> f32 {
a.iter()
.zip(b)
.map(|(s, d)| (s - d).abs() / s.abs().max(1e-6))
.fold(0.0f32, f32::max)
}