Files
xtrain/crates/xtrain-distributed/tests/ddp_proc.rs
Gahow Wang 6465a2d5ce test: T21-for-proc — clear ENV_DROPOUT across tests to sever ordering coupling
libtest with --test-threads=1 (the documented invariant for this file's DDP
tests) runs tests alphabetically. The new
proc_per_gpu_dropout_is_live_and_p0_matches_no_dropout ('d') runs BEFORE
proc_per_gpu_matches_single_gpu_and_thread_path ('m'). It sets ENV_DROPOUT=0.2
via std::env::set_var; if left in place, the correctness test's spawned workers
would inherit it (Command inherits parent env by default) and build with
cfg.dropout=0.2 while its single-GPU baseline (run_single_gpu → test_config →
dropout=0) stays at 0 — GATE (a) `max_rel_single < 1e-3` would blow up by
orders of magnitude.

Two defenses:
- correctness test remove_var(ENV_DROPOUT) before spawn (belt): even if the
  dropout test forgot to clean up, this test starts from a clean env.
- dropout test remove_var(ENV_DROPOUT, ENV_DUMP_DIR) at exit (suspenders):
  keep the invariant "each test leaves the env as it found it" so any future
  test added after these two starts clean too.

Same --test-threads=1 SAFETY comment applies (no concurrent env access).
2026-07-01 14:09:42 +08:00

410 lines
16 KiB
Rust

//! Process-per-GPU DDP acceptance (Phase T17). Gated to a GPU host; skips when
//! fewer than 2 GPUs. Run with `--test-threads=1` (distributed tests deadlock if
//! they contend for the same GPUs in parallel — known harness property).
//!
//! Self-launching: the test binary detects WORKER mode via `XTRAIN_RANK` (set by
//! `launch_processes`). In worker mode it runs `run_worker` on a synthetic corpus
//! and dumps its per-step loss trace + final params to a per-rank file; in normal
//! mode it is the launcher — it runs the single-GPU baseline, spawns N worker
//! processes (re-execing itself), reads their dumps back, and asserts:
//! (a) multi-process loss matches single-GPU within `<1e-3`,
//! (b) cross-rank params agree within `<1e-6` (KI-5 ULP tolerance),
//! (c) multi-process loss matches the thread-per-GPU `launch` path within `<1e-3`.
//!
//! T21-for-proc regression `proc_per_gpu_dropout_is_live_and_p0_matches_no_dropout`
//! (below) additionally proves that `--dropout` propagates through the process-per-
//! GPU launcher — the analogue of the thread-per-GPU T21 fix. Pre-fix
//! `train_ddp_mp` had no `--dropout` flag, so `cfg.dropout` stayed 0 regardless of
//! what the user passed, silently disabling dropout under process-per-GPU. The
//! GATE B loss-trace signal (>1e-3 gap between p=0 and p=0.2) sits orders of
//! magnitude above the KI-5 cross-rank noise floor and catches that gap directly.
#![cfg(not(no_cuda))]
use std::io::Write;
use std::path::Path;
use xtrain_cuda::device;
use xtrain_distributed::proc::{launch_processes, rank_dump_path, worker_env};
use xtrain_distributed::{DdpConfig, DdpContext, build_model, train_rank};
use xtrain_model::{Config, batched_ids_tensor};
use xtrain_optim::GpuAdamW;
use xtrain_tensor::Device;
use xtrain_train::clip::clip_grad_norm_gpu;
use xtrain_train::data::Corpus;
use xtrain_train::schedule::LrSchedule;
// ── Shared fixture (identical on launcher + every worker, so they agree) ──────
const VOCAB: usize = 64;
const STEPS: usize = 20;
fn synth_corpus() -> Corpus {
let tokens: Vec<i32> = (0..4096)
.map(|i| (i * 7 + 3) as i32 % VOCAB as i32)
.collect();
Corpus {
tokens,
labels: None,
vocab_size: VOCAB,
}
}
fn test_config() -> Config {
let mut cfg = Config::tiny();
cfg.vocab = VOCAB;
cfg.n_layers = 2;
cfg
}
fn dcfg(batch_size: usize) -> DdpConfig {
DdpConfig {
seq_len: 32,
batch_size,
accum_steps: 1,
steps: STEPS,
schedule: LrSchedule {
max_lr: 3e-3,
min_lr: 3e-4,
warmup: 3,
total: STEPS,
},
weight_decay: 0.1,
max_grad_norm: 1.0,
log_every: 1_000_000,
seed: 7,
eval_every: 0,
eval_batches: 0,
ckpt_path: None,
}
}
// The dump dir is passed launcher→worker via this env key (separate from the
// XTRAIN_* keys the launcher sets); workers write `rank{N}.dump` there.
const ENV_DUMP_DIR: &str = "XTRAIN_TEST_DUMP_DIR";
// Optional launcher→worker channel for `cfg.dropout`. Absent = 0.0 = the existing
// correctness test's contract (no perturbation). The T21-for-proc regression test
// below sets it before each `launch_processes` call to prove the process-per-GPU
// path actually plumbs `--dropout` into every worker's model.
const ENV_DROPOUT: &str = "XTRAIN_TEST_DROPOUT";
const GLOBAL_BATCH: usize = 8;
fn worker_dropout() -> f32 {
std::env::var(ENV_DROPOUT)
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(0.0)
}
// ── Worker entry: runs when this test binary is re-execed by launch_processes ─
fn run_as_worker_if_needed() {
let Some(env) = worker_env() else { return };
let dump_dir = std::env::var(ENV_DUMP_DIR).expect("dump dir env");
// This is the worker body `run_worker` performs in production (init ctx →
// build deterministic model → train_rank). We train ONCE inline so we can dump
// both the loss trace AND the final params for the launcher to check; the
// production `run_worker` wrapper is exercised by `bin/train_ddp_mp` on dash5.
let ctx = DdpContext::init(env.rank, env.world, env.id, env.local_rank);
let device = Device::Cuda(env.local_rank);
// Mirrors bin/train_ddp_mp's `cfg.dropout = dropout` wiring — the T21-for-proc
// regression: if this line were missing (the pre-fix launcher's exact gap),
// `cfg.dropout` would stay 0 and the GATE B test below would find a bit-
// identical p=0 / p=0.2 loss trace and FAIL.
let mut cfg = test_config();
cfg.dropout = worker_dropout();
let model = build_model(cfg, device);
let res = train_rank(
&ctx,
&model,
device,
&synth_corpus(),
None,
&dcfg(GLOBAL_BATCH),
);
let params: Vec<Vec<f32>> = model
.params()
.iter()
.map(|p| p.value().to_device(Device::Cpu).as_slice::<f32>().to_vec())
.collect();
write_dump(&dump_dir, env.rank, &res.losses, &params);
std::process::exit(0);
}
fn write_dump(dir: &str, rank: usize, losses: &[f32], params: &[Vec<f32>]) {
let path = rank_dump_path(Path::new(dir), rank);
let mut f = std::fs::File::create(&path).expect("create dump");
// Line 1: losses (space-separated). Following lines: one param tensor each.
let loss_line: Vec<String> = losses.iter().map(|x| format!("{x:.8e}")).collect();
writeln!(f, "{}", loss_line.join(" ")).unwrap();
for p in params {
let line: Vec<String> = p.iter().map(|x| format!("{x:.8e}")).collect();
writeln!(f, "{}", line.join(" ")).unwrap();
}
}
fn read_dump(dir: &str, rank: usize) -> (Vec<f32>, Vec<Vec<f32>>) {
let path = rank_dump_path(Path::new(dir), rank);
let text = std::fs::read_to_string(&path).expect("read dump");
let mut lines = text.lines();
let losses: Vec<f32> = lines
.next()
.unwrap()
.split_whitespace()
.map(|s| s.parse().unwrap())
.collect();
let params: Vec<Vec<f32>> = lines
.map(|l| l.split_whitespace().map(|s| s.parse().unwrap()).collect())
.collect();
(losses, params)
}
// ── Single-GPU baseline (same loop as the DDP rank, world=1) ──────────────────
fn run_single_gpu(cfg: Config, corpus: &Corpus, d: &DdpConfig) -> (Vec<f32>, Vec<Vec<f32>>) {
device::set_device(0).unwrap();
let device = Device::Cuda(0);
let model = build_model(cfg, device);
let params = model.params();
let mut opt = GpuAdamW::new(d.weight_decay);
let mut rng = d.seed;
let mut losses = Vec::new();
for step in 0..d.steps {
let lr = d.schedule.lr(step);
let mut inputs = Vec::with_capacity(d.batch_size);
let mut targets_v = Vec::with_capacity(d.batch_size);
for _ in 0..d.batch_size {
let (input, target) = corpus.sample(d.seq_len, &mut rng);
inputs.push(input);
targets_v.push(target);
}
let ids = batched_ids_tensor(&inputs, device);
let targets = batched_ids_tensor(&targets_v, device);
let loss = model.loss_batched(&ids, &targets, d.batch_size);
losses.push(loss.value().to_device(Device::Cpu).as_slice::<f32>()[0]);
loss.backward();
clip_grad_norm_gpu(&params, d.max_grad_norm, 1.0);
opt.step(lr, &params);
for p in &params {
p.zero_grad();
}
}
let host = params
.iter()
.map(|p| p.value().to_device(Device::Cpu).as_slice::<f32>().to_vec())
.collect();
(losses, host)
}
// ── The test (launcher mode) ──────────────────────────────────────────────────
#[test]
fn proc_per_gpu_matches_single_gpu_and_thread_path() {
// If this process was spawned as a worker, do the worker job and exit before
// the test framework runs anything else.
run_as_worker_if_needed();
let world = 2usize;
if device::device_count().unwrap_or(0) < world as i32 {
eprintln!("skip: need >= {world} GPUs");
return;
}
let cfg = test_config();
let corpus = synth_corpus();
let d = dcfg(GLOBAL_BATCH);
// (1) Single-GPU baseline over the global batch.
let (single_losses, single_params) = run_single_gpu(cfg, &corpus, &d);
// (2) Thread-per-GPU path (T8 `launch`) — the regression baseline to match.
let thread_results =
xtrain_distributed::launch(&[0u32, 1u32], &corpus, None, &d, move |device| {
build_model(cfg, device)
});
let thread_losses = &thread_results[0].losses;
// (3) Process-per-GPU: spawn 2 worker processes (re-exec of this test binary),
// each dumps its loss trace + final params to a temp dir.
let dump_dir = std::env::temp_dir().join(format!("xtrain_t17_{}", std::process::id()));
std::fs::create_dir_all(&dump_dir).unwrap();
// SAFETY: single-threaded test (forced by --test-threads=1) sets this env
// before spawning workers; no concurrent env access. ENV_DROPOUT is cleared
// defensively — libtest orders `--test-threads=1` runs alphabetically, so the
// sibling `proc_per_gpu_dropout_is_live_...` test (starts with 'd') runs BEFORE
// this one (starts with 'm'). If it happened to leak `ENV_DROPOUT=0.2` in this
// process's env, the workers here would inherit it (Command inherits parent
// env by default) and build with dropout=0.2 while the single-GPU baseline
// (run_single_gpu → test_config → dropout=0) stays at 0 — GATE (a) would blow up.
// Explicit remove here severs that ordering coupling.
unsafe {
std::env::remove_var(ENV_DROPOUT);
std::env::set_var(ENV_DUMP_DIR, &dump_dir);
}
// Re-exec the test binary but run ONLY this test, single-threaded, so the
// worker process does the worker job and exits without touching other tests.
let worker_args = [
"--exact".to_string(),
"proc_per_gpu_matches_single_gpu_and_thread_path".to_string(),
"--test-threads=1".to_string(),
"--nocapture".to_string(),
];
launch_processes(world, &worker_args).expect("worker processes failed");
let (proc_losses0, proc_p0) = read_dump(dump_dir.to_str().unwrap(), 0);
let (_proc_losses1, proc_p1) = read_dump(dump_dir.to_str().unwrap(), 1);
// (a) process-per-GPU loss matches single-GPU.
let max_rel_single = max_rel(&single_losses, &proc_losses0);
println!(
"proc-per-GPU vs single-GPU loss: single[last]={:.6} proc[last]={:.6} max_rel={max_rel_single:.2e}",
single_losses.last().unwrap(),
proc_losses0.last().unwrap()
);
assert!(
max_rel_single < 1e-3,
"proc-per-GPU loss diverged from single-GPU: {max_rel_single:.3e}"
);
// (c) process-per-GPU loss matches the thread-per-GPU path.
let max_rel_thread = max_rel(thread_losses, &proc_losses0);
println!(
"proc-per-GPU vs thread-per-GPU loss: thread[last]={:.6} proc[last]={:.6} max_rel={max_rel_thread:.2e}",
thread_losses.last().unwrap(),
proc_losses0.last().unwrap()
);
assert!(
max_rel_thread < 1e-3,
"proc-per-GPU loss diverged from thread-per-GPU: {max_rel_thread:.3e}"
);
// (b) cross-rank parameter agreement (KI-5 ULP tolerance).
let mut max_pdiff = 0.0f32;
for (a, b) in proc_p0.iter().zip(&proc_p1) {
for (x, y) in a.iter().zip(b) {
max_pdiff = max_pdiff.max((x - y).abs());
}
}
println!("proc-per-GPU cross-rank max |param diff| = {max_pdiff:.3e}");
assert!(
max_pdiff < 1e-6,
"ranks' params drifted apart: {max_pdiff:.3e}"
);
// Bonus sanity: proc-per-GPU final params vs single-GPU within fp tolerance.
let mut max_sdiff = 0.0f32;
for (a, b) in proc_p0.iter().zip(&single_params) {
for (x, y) in a.iter().zip(b) {
max_sdiff = max_sdiff.max((x - y).abs() / y.abs().max(1e-6));
}
}
println!("proc-per-GPU vs single-GPU max rel |param diff| = {max_sdiff:.3e}");
assert!(
max_sdiff < 1e-2,
"proc-per-GPU params diverged from single-GPU"
);
let _ = std::fs::remove_dir_all(&dump_dir);
}
/// T21-for-proc regression: prove that `--dropout` actually reaches the model
/// under process-per-GPU. The pre-fix `bin/train_ddp_mp` had no `--dropout` flag
/// and never set `cfg.dropout`, so the launcher's worker built its model with
/// dropout stuck at 0 — silent identity, regardless of what the user passed. The
/// thread-per-GPU T21 fix caught the analogous gap; this test caps the same gap
/// on the proc-per-GPU path with the same GATE-B pattern (loss trajectory of a
/// p=0.2 run differs from p=0 by a large margin, well above the NCCL noise floor).
///
/// Both runs share the corpus, the initial params (via `build_model`'s deterministic
/// LCG), and every other config knob; the ONLY difference is `cfg.dropout`. If the
/// worker didn't plumb the env-provided dropout into `cfg.dropout` (the exact pre-
/// fix regression), both traces would be bit-identical and this test would FAIL.
/// The `>1e-3` threshold sits orders of magnitude above the KI-5 cross-rank ULP
/// noise floor (~1e-7 on this PCIe box), so it's a hard signal for "dropout is
/// active" rather than a noise measurement. Mirrors
/// `ddp_dropout_is_live_and_p0_bit_identical` in ddp_correctness.rs for T21's
/// thread-per-GPU fix.
#[test]
fn proc_per_gpu_dropout_is_live_and_p0_matches_no_dropout() {
run_as_worker_if_needed();
let world = 2usize;
if device::device_count().unwrap_or(0) < world as i32 {
eprintln!("skip: need >= {world} GPUs");
return;
}
let base_dump_dir = std::env::temp_dir().join(format!("xtrain_t21mp_{}", std::process::id()));
std::fs::create_dir_all(&base_dump_dir).unwrap();
let worker_args = [
"--exact".to_string(),
"proc_per_gpu_dropout_is_live_and_p0_matches_no_dropout".to_string(),
"--test-threads=1".to_string(),
"--nocapture".to_string(),
];
// Helper: launch `world` workers with a specific dropout prob (via env), read
// rank 0's loss trace, clean up. Uses a subdir per run so the two invocations
// do not clobber each other's dumps.
let mut launch_with_dropout = |p: f32, tag: &str| -> Vec<f32> {
let dump_dir = base_dump_dir.join(tag);
std::fs::create_dir_all(&dump_dir).unwrap();
// SAFETY: single-threaded test (forced by --test-threads=1); no concurrent env access.
unsafe {
std::env::set_var(ENV_DUMP_DIR, &dump_dir);
std::env::set_var(ENV_DROPOUT, format!("{p}"));
}
launch_processes(world, &worker_args).expect("worker processes failed");
let (losses, _) = read_dump(dump_dir.to_str().unwrap(), 0);
losses
};
let loss_p0 = launch_with_dropout(0.0, "p0");
let loss_p1 = launch_with_dropout(0.2, "p02");
// GATE B — dropout is LIVE under process-per-GPU with p>0. If the worker
// didn't set `cfg.dropout` (the pre-fix gap), the two traces would match to
// the ~1e-7 NCCL noise floor. Anything above ~1e-3 is unambiguous evidence
// that dropout masks are actually applied in every worker's forward.
let max_live_diff = loss_p0
.iter()
.zip(&loss_p1)
.map(|(a, b)| (a - b).abs())
.fold(0.0f32, f32::max);
println!(
"T21-proc GATE B (dropout live under proc-per-GPU): p0[last]={:.6} p0.2[last]={:.6} max |loss diff| = {max_live_diff:.3e}",
loss_p0.last().unwrap(),
loss_p1.last().unwrap()
);
assert!(
max_live_diff > 1e-3,
"p=0.2 proc-per-GPU loss matches p=0 — dropout NOT plumbed through the \
process-per-GPU launcher (cfg.dropout stayed 0 in the worker): max |loss diff| {max_live_diff:.3e}"
);
// No NaN/Inf in the p>0 run.
assert!(
loss_p1.iter().all(|l| l.is_finite()),
"p=0.2 proc-per-GPU loss has non-finite values"
);
// Clear the launcher→worker env keys so we don't leak state to anything that
// runs later in this process. `proc_per_gpu_matches_single_gpu_and_thread_path`
// clears ENV_DROPOUT defensively too, but keeping the invariant "each test
// leaves the env as it found it" costs nothing.
// SAFETY: single-threaded test (forced by --test-threads=1); no concurrent env access.
unsafe {
std::env::remove_var(ENV_DROPOUT);
std::env::remove_var(ENV_DUMP_DIR);
}
let _ = std::fs::remove_dir_all(&base_dump_dir);
}
fn max_rel(a: &[f32], b: &[f32]) -> f32 {
a.iter()
.zip(b)
.map(|(s, d)| (s - d).abs() / s.abs().max(1e-6))
.fold(0.0f32, f32::max)
}