Compare commits


4 Commits

Author SHA1 Message Date
4abb17383a test: process-per-GPU DDP correctness (ddp_proc.rs)
Self-launching test: worker mode (XTRAIN_RANK set) trains on synthetic corpus
and dumps loss+params; launcher mode runs single-GPU baseline + thread-per-GPU
launch + spawns 2 worker processes, then asserts (a) proc loss == single-GPU
<1e-3, (b) cross-rank params <1e-6 (KI-5 ULP), (c) proc loss == thread-per-GPU
<1e-3. Run with --test-threads=1 (distributed harness property).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-18 17:48:52 +08:00
a188c8a277 distributed: train_ddp_mp bin (process-per-GPU launcher/worker)
Dual-mode binary self-detecting via XTRAIN_RANK: launcher spawns one worker
per visible GPU forwarding full argv; worker rebuilds config from argv and runs
run_worker. CLI flags identical to train_ddp (thread-per-GPU, kept), so it
doubles as the before->after throughput driver. thread-per-GPU path untouched.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-18 17:48:52 +08:00
ffd548b80b distributed: process-per-GPU launcher + worker (proc.rs)
torchrun-style process-per-GPU: launch_processes spawns one worker process per
GPU (re-exec current_exe with XTRAIN_{RANK,WORLD,LOCAL_RANK,NCCL_ID} env),
mints the ncclUniqueId once in the launcher and hex-injects it via env (no
shared FS/TCP, race-free). worker_env/run_worker read the env, bind the device
(own CUDA context), DdpContext::init + build_model + train_rank reused from T8
UNCHANGED. hex_encode/decode_unique_id are host-testable pure fns.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-18 17:48:43 +08:00
c470c627a7 docs: Phase T17 — process-per-GPU DDP design
torchrun-style: launcher spawns N worker processes, each with its own CUDA
context; cross-process ncclUniqueId distributed via launcher-minted hex env
injection (race-free, no shared FS / TCP); train_rank + grad all-reduce reused
unchanged. Keeps thread-per-GPU path as regression baseline. ZeRO-1 dropped
(user scope decision).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-18 17:44:38 +08:00
5 changed files with 905 additions and 0 deletions


@@ -0,0 +1,203 @@
//! Process-per-GPU DDP launcher / worker (Phase T17, torchrun-style).
//!
//! ONE binary, two modes (it self-detects via `XTRAIN_RANK`):
//! - **launcher** (env unset): mints the NCCL `ncclUniqueId`, then spawns one
//! WORKER process per visible GPU, re-execing this same binary with the same
//! argv plus `XTRAIN_{RANK,WORLD,LOCAL_RANK,NCCL_ID}` env, and waits for them.
//! - **worker** (`XTRAIN_RANK` set): binds its GPU (→ its own CUDA context),
//! inits NCCL with the launcher-supplied id, builds its model, runs
//! `train_rank` — the T8 training step reused UNCHANGED.
//!
//! Versus `train_ddp` (thread-per-GPU, kept as the regression baseline) the ONLY
//! difference is the launch model + cross-process UniqueId bootstrap. CLI flags
//! are identical, so it doubles as the before→after throughput driver.
//!
//! Run on dash5 (pick idle GPUs — dash5 is shared):
//! export PATH=/usr/local/cuda/bin:/opt/wjh/.cargo/bin:$PATH
//! CUDA_VISIBLE_DEVICES=0,1,2,3 cargo run -p xtrain-distributed --release \
//! --bin train_ddp_mp -- /opt/wjh/models/gpt2/tokenizer.json \
//! data/tinystories-valid-3mb.txt \
//! --dim 384 --heads 12 --head-dim 32 --layers 12 --ffn 1536 \
//! --steps 200 --batch 128 --seq 256
#[cfg(no_cuda)]
fn main() {
    eprintln!("train_ddp_mp: built without CUDA (no_cuda); run on a GPU host (dash5).");
}

#[cfg(not(no_cuda))]
use std::path::PathBuf;

// A flag like `--dim 384`: scan argv for `name`, parse the following token.
// Falls back to `default` when the flag is absent or its value fails to parse.
#[cfg(not(no_cuda))]
fn flag<T: std::str::FromStr>(args: &[String], name: &str, default: T) -> T {
    args.iter()
        .position(|a| a == name)
        .and_then(|i| args.get(i + 1))
        .and_then(|s| s.parse().ok())
        .unwrap_or(default)
}

#[cfg(not(no_cuda))]
fn main() {
    use xtrain_cuda::device;
    use xtrain_distributed::DdpConfig;
    use xtrain_distributed::proc::{ModelOpts, launch_processes, run_worker, worker_env};
    use xtrain_model::Config;
    use xtrain_train::data::Corpus;
    use xtrain_train::schedule::LrSchedule;

    let args: Vec<String> = std::env::args().collect();

    // ── Launcher mode: no XTRAIN_RANK in env → spawn one worker per visible GPU.
    let env = worker_env();
    if env.is_none() {
        let count = device::device_count().expect("device_count");
        assert!(count > 0, "no CUDA device visible");
        let world = count as usize;
        // Forward the full argv (minus argv[0]) to each worker verbatim.
        let extra: Vec<String> = args[1..].to_vec();
        println!("DDP (process-per-GPU): launching {world} worker processes (one per visible GPU)");
        match launch_processes(world, &extra) {
            Ok(()) => {}
            Err(e) => {
                eprintln!("launcher: {e}");
                std::process::exit(1);
            }
        }
        return;
    }
    let env = env.unwrap();

    // ── Worker mode: build config from the forwarded argv, then train this rank.
    // First two tokens that do not start with `--`: tokenizer.json, corpus.txt.
    // Flag values (e.g. the `384` in `--dim 384`) also pass this filter, so both
    // paths must come before any flags on the command line.
    let positionals: Vec<&String> = args[1..].iter().filter(|a| !a.starts_with("--")).collect();
    let tok_path = positionals
        .first()
        .map(|s| PathBuf::from(s.as_str()))
        .unwrap_or_else(|| PathBuf::from("/opt/wjh/models/gpt2/tokenizer.json"));
    let corpus_path = positionals
        .get(1)
        .map(|s| PathBuf::from(s.as_str()))
        .unwrap_or_else(|| PathBuf::from("data/tinystories-valid-3mb.txt"));

    // Architecture (scaling-ladder rung). Defaults = v0-baseline tiny config.
    let n_heads = flag(&args, "--heads", 2usize);
    let head_dim = flag(&args, "--head-dim", 16usize);
    let n_layers = flag(&args, "--layers", 4usize);
    let ffn = flag(&args, "--ffn", 64usize);
    let kv_heads = flag(&args, "--kv-heads", n_heads);
    // `--dim` is only cross-checked: the model width is always heads * head_dim.
    let dim_flag = flag(&args, "--dim", 0usize);
    if dim_flag != 0 && dim_flag != n_heads * head_dim {
        eprintln!(
            "warning: --dim {dim_flag} != heads*head_dim {}; using {}",
            n_heads * head_dim,
            n_heads * head_dim
        );
    }

    // Optimization knobs (mirror train_ddp).
    let steps: usize = flag(&args, "--steps", 100);
    let batch: usize = flag(&args, "--batch", 16);
    let accum_steps: usize = flag(&args, "--accum-steps", 1).max(1);
    let seq_len: usize = flag(&args, "--seq", 64);
    let max_lr: f32 = flag(&args, "--max-lr", 3e-3);
    let min_lr: f32 = flag(&args, "--min-lr", max_lr * 0.1);
    let weight_decay: f32 = flag(&args, "--wd", 0.1);
    let max_grad_norm: f32 = flag(&args, "--clip", 1.0);
    let val_tokens: usize = flag(&args, "--val-tokens", 0);
    let eval_every: usize = flag(&args, "--eval-every", 0);
    let eval_batches: usize = flag(&args, "--eval-batches", 64);
    let opts = ModelOpts {
        bf16: args.iter().any(|a| a == "--bf16"),
        recompute: args.iter().any(|a| a == "--recompute"),
        flash: args.iter().any(|a| a == "--flash"),
    };
    let ckpt: Option<PathBuf> = args
        .iter()
        .position(|a| a == "--ckpt")
        .and_then(|i| args.get(i + 1))
        .map(PathBuf::from);
    assert_eq!(
        batch % env.world,
        0,
        "global batch {batch} not divisible by world {}",
        env.world
    );

    // Each worker loads the corpus independently (read-only u16 cache hit → cheap).
    let corpus = Corpus::load_cached(&tok_path, &corpus_path);
    let vocab = corpus.vocab_size;
    let (train_corpus, valid): (Corpus, Option<Corpus>) = if val_tokens > 0 {
        let (t, v) = corpus.split_tail(val_tokens);
        (t, Some(v))
    } else {
        (corpus, None)
    };
    let cfg = Config::from_arch(vocab, n_heads, head_dim, n_layers, ffn).with_kv_heads(kv_heads);
    if env.rank == 0 {
        println!(
            "model: dim {} layers {} heads {} kv_heads {} head_dim {} ffn {} → core {:.3}M params \
             (+ embed/lm {:.2}M = {:.2}M total) | world={} mode=process-per-GPU",
            cfg.dim,
            cfg.n_layers,
            cfg.n_heads,
            cfg.num_kv_heads,
            cfg.head_dim,
            cfg.ffn_hidden,
            cfg.core_params() as f32 / 1e6,
            (cfg.num_params() - cfg.core_params()) as f32 / 1e6,
            cfg.num_params() as f32 / 1e6,
            env.world,
        );
        if opts.bf16 {
            println!("bf16 mixed precision: ON (fp32 master weights)");
        }
        if opts.recompute {
            println!("activation recompute: ON (per-block gradient checkpointing)");
        }
        if opts.flash {
            println!("flash-attention: ON (fused SDPA kernel, no materialized scores)");
        }
    }

    let dcfg = DdpConfig {
        seq_len,
        batch_size: batch,
        accum_steps,
        steps,
        schedule: LrSchedule {
            max_lr,
            min_lr,
            warmup: (steps / 20).max(5),
            total: steps,
        },
        weight_decay,
        max_grad_norm,
        log_every: 50,
        seed: 42,
        eval_every,
        eval_batches,
        ckpt_path: ckpt.clone(),
    };
    let res = run_worker(&env, cfg, opts, &train_corpus, valid.as_ref(), &dcfg);
    if env.rank == 0 {
        let start = res.losses.first().copied().unwrap_or(0.0);
        let end = res.losses.last().copied().unwrap_or(0.0);
        println!("train loss: start {start:.4} → end {end:.4}");
        if let Some(best) = res.best_val {
            println!("best val loss: {best:.4}");
        }
        if let Some((s, v)) = res.evals.last() {
            println!("final val loss (step {s}): {v:.4}");
        }
        if let Some(path) = &ckpt {
            println!("best-val checkpoint → {}", path.display());
        }
    }
}


@@ -18,8 +18,13 @@
pub mod ddp;
pub mod ffi;
pub mod proc;
pub use ddp::{DdpConfig, DdpResult, build_model, launch, train_rank};
pub use proc::{
    ModelOpts, WorkerEnv, build_worker_model, hex_decode_unique_id, hex_encode_unique_id,
    launch_processes, run_worker, worker_env,
};
use std::ffi::c_void;


@@ -0,0 +1,200 @@
//! Process-per-GPU DDP launcher + worker (Phase T17, torchrun-style).
//!
//! T8's DDP is single-process, thread-per-GPU: N rank threads share ONE CUDA
//! primary context, so much of the driver work (kernel launch, cuBLAS handle,
//! stream queueing) serializes at the context level — the residual ~5×@8
//! non-linearity left after T11's allocator fix (see docs/10 / KI-5).
//!
//! Process-per-GPU gives each rank its OWN OS process and OWN CUDA context, so
//! those driver calls no longer queue in a shared context. Only the LAUNCH model
//! and the cross-process NCCL bootstrap change; the training step
//! (`train_rank` → grad all-reduce → local AdamW) and the consistency argument
//! are reused from T8 UNCHANGED.
//!
//! UniqueId rendezvous: the LAUNCHER (the common parent of every worker) mints
//! the `ncclUniqueId` once, hex-encodes it, and injects it into each worker's env
//! at spawn time. No shared file / TCP server / polling — the id is atomically
//! present before the child exists, so there is no "id not ready yet" race. This
//! is the simplest single-node mechanism (see docs/16).
use std::path::PathBuf;
use std::process::{Command, Stdio};
use xtrain_model::{Config, TinyTransformer};
use xtrain_tensor::{DType, Device};
use xtrain_train::data::Corpus;
use crate::ddp::{DdpConfig, DdpResult, build_model, train_rank};
use crate::ffi::NcclUniqueId;
use crate::{DdpContext, get_unique_id};
// Env keys the launcher sets on every spawned worker (torchrun-style: a worker
// detects its role by the presence of `XTRAIN_RANK`).
pub const ENV_RANK: &str = "XTRAIN_RANK";
pub const ENV_WORLD: &str = "XTRAIN_WORLD";
pub const ENV_LOCAL_RANK: &str = "XTRAIN_LOCAL_RANK";
pub const ENV_NCCL_ID: &str = "XTRAIN_NCCL_ID";

/// Hex-encode the 128-byte `ncclUniqueId` for env transport (128 B → 256 chars,
/// well under any env-var length limit). `c_char` is signed on this target, so
/// reinterpret the bytes as `u8` first.
pub fn hex_encode_unique_id(id: &NcclUniqueId) -> String {
    let mut s = String::with_capacity(256);
    for &b in &id.internal {
        s.push_str(&format!("{:02x}", b as u8));
    }
    s
}

/// Inverse of [`hex_encode_unique_id`]: parse 256 hex chars back into the
/// 128-byte opaque blob. Panics on malformed input (the launcher always writes a
/// well-formed value, so a bad value means a corrupted env).
pub fn hex_decode_unique_id(hex: &str) -> NcclUniqueId {
    assert_eq!(
        hex.len(),
        256,
        "NCCL id hex must be 256 chars, got {}",
        hex.len()
    );
    let mut id = NcclUniqueId::default();
    for (i, slot) in id.internal.iter_mut().enumerate() {
        let byte = u8::from_str_radix(&hex[i * 2..i * 2 + 2], 16).expect("NCCL id hex byte parse");
        *slot = byte as std::os::raw::c_char;
    }
    id
}

/// Spawn `world` worker processes (re-exec of the current binary with the same
/// argv), each pinned to one GPU via `XTRAIN_LOCAL_RANK`, and wait for all of
/// them. The launcher mints the `ncclUniqueId` and injects it (hex) into every
/// worker's env, so the cross-process NCCL bootstrap needs no shared file/TCP.
///
/// Returns `Ok(())` iff every worker exits 0; otherwise an error naming the first
/// failing rank (so the caller — `main` / a test — can propagate a non-zero exit).
/// `extra_args` is forwarded to each worker verbatim (so all training hyper-params
/// pass straight through); the workers inherit the launcher's env (incl.
/// `CUDA_VISIBLE_DEVICES`) plus the four `XTRAIN_*` keys.
pub fn launch_processes(world: usize, extra_args: &[String]) -> Result<(), String> {
    let exe = std::env::current_exe().map_err(|e| format!("current_exe: {e}"))?;
    let id = get_unique_id();
    let id_hex = hex_encode_unique_id(&id);
    let mut children = Vec::with_capacity(world);
    for rank in 0..world {
        let child = Command::new(&exe)
            .args(extra_args)
            .env(ENV_RANK, rank.to_string())
            .env(ENV_WORLD, world.to_string())
            // Single node: local rank == global rank == device ordinal within the
            // visible set. (Multi-node would split these; see docs/16 follow-up.)
            .env(ENV_LOCAL_RANK, rank.to_string())
            .env(ENV_NCCL_ID, &id_hex)
            // Workers inherit stdout/stderr so rank 0's training log surfaces.
            .stdout(Stdio::inherit())
            .stderr(Stdio::inherit())
            .spawn()
            .map_err(|e| format!("spawn worker rank {rank}: {e}"))?;
        children.push((rank, child));
    }
    let mut first_err: Option<String> = None;
    for (rank, mut child) in children {
        let status = child
            .wait()
            .map_err(|e| format!("wait worker rank {rank}: {e}"))?;
        if !status.success() && first_err.is_none() {
            first_err = Some(format!("worker rank {rank} exited with {status}"));
        }
    }
    match first_err {
        Some(e) => Err(e),
        None => Ok(()),
    }
}

/// The four `XTRAIN_*` values a worker reads from its env. Present iff this
/// process was spawned by [`launch_processes`].
pub struct WorkerEnv {
    pub rank: usize,
    pub world: usize,
    pub local_rank: u32,
    pub id: NcclUniqueId,
}

/// Read the worker env if this process is a spawned worker (i.e. `XTRAIN_RANK`
/// is set), else `None` (this process is the launcher).
pub fn worker_env() -> Option<WorkerEnv> {
    let rank: usize = std::env::var(ENV_RANK).ok()?.parse().ok()?;
    let world: usize = std::env::var(ENV_WORLD)
        .expect("XTRAIN_WORLD set with XTRAIN_RANK")
        .parse()
        .expect("XTRAIN_WORLD parse");
    let local_rank: u32 = std::env::var(ENV_LOCAL_RANK)
        .expect("XTRAIN_LOCAL_RANK set with XTRAIN_RANK")
        .parse()
        .expect("XTRAIN_LOCAL_RANK parse");
    let id_hex = std::env::var(ENV_NCCL_ID).expect("XTRAIN_NCCL_ID set with XTRAIN_RANK");
    let id = hex_decode_unique_id(&id_hex);
    Some(WorkerEnv {
        rank,
        world,
        local_rank,
        id,
    })
}

/// Per-worker model construction knobs (the opt-in feature flags the launcher
/// forwards). Mirrors the closure `train_ddp` passes to the thread-per-GPU
/// `launch`, but here it runs once in this worker's own process/context.
#[derive(Clone, Copy, Default)]
pub struct ModelOpts {
    pub bf16: bool,
    pub recompute: bool,
    pub flash: bool,
}

/// Run this worker: bind its GPU (→ its own CUDA context), init NCCL with the
/// launcher-supplied id, build its model with the deterministic init (same as
/// every rank + the single-GPU baseline), and run `train_rank`. Reuses the T8
/// training step verbatim — the only difference from thread-per-GPU is how this
/// rank was started and how it got the `UniqueId`.
///
/// `valid` is the held-out corpus for rank 0's periodic eval (pass `None` when
/// `dcfg.eval_every == 0`). Ranks other than 0 ignore it, so callers may pass
/// the same value on every rank.
pub fn run_worker(
    env: &WorkerEnv,
    cfg: Config,
    opts: ModelOpts,
    corpus: &Corpus,
    valid: Option<&Corpus>,
    dcfg: &DdpConfig,
) -> DdpResult {
    // Binding the device here establishes this process's own CUDA primary context.
    let ctx = DdpContext::init(env.rank, env.world, env.id, env.local_rank);
    let device = Device::Cuda(env.local_rank);
    let model = build_worker_model(cfg, opts, device);
    let v = if env.rank == 0 { valid } else { None };
    train_rank(&ctx, &model, device, corpus, v, dcfg)
}

/// Build the worker's model with the deterministic `build_model` init + the
/// opt-in feature flags. Shared by `run_worker` and the test worker.
pub fn build_worker_model(cfg: Config, opts: ModelOpts, device: Device) -> TinyTransformer {
    let mut m = build_model(cfg, device);
    if opts.bf16 {
        m = m.with_compute_dtype(DType::BF16);
    }
    if opts.recompute {
        m = m.with_recompute(true);
    }
    if opts.flash {
        m = m.with_flash(true);
    }
    m
}

/// Convenience: path of the per-rank result dump file inside `dir`, i.e.
/// `dir/rank{rank}.dump` (a worker writes its loss/params there; the launching
/// test reads them back).
pub fn rank_dump_path(dir: &std::path::Path, rank: usize) -> PathBuf {
    dir.join(format!("rank{rank}.dump"))
}


@@ -0,0 +1,280 @@
//! Process-per-GPU DDP acceptance (Phase T17). Gated to a GPU host; skips when
//! fewer than 2 GPUs. Run with `--test-threads=1` (distributed tests deadlock if
//! they contend for the same GPUs in parallel — known harness property).
//!
//! Self-launching: the test binary detects WORKER mode via `XTRAIN_RANK` (set by
//! `launch_processes`). In worker mode it runs `run_worker` on a synthetic corpus
//! and dumps its per-step loss trace + final params to a per-rank file; in normal
//! mode it is the launcher — it runs the single-GPU baseline, spawns N worker
//! processes (re-execing itself), reads their dumps back, and asserts:
//! (a) multi-process loss matches single-GPU within `<1e-3`,
//! (b) cross-rank params agree within `<1e-6` (KI-5 ULP tolerance),
//! (c) multi-process loss matches the thread-per-GPU `launch` path within `<1e-3`.
#![cfg(not(no_cuda))]
use std::io::Write;
use std::path::Path;
use xtrain_cuda::device;
use xtrain_distributed::proc::{launch_processes, rank_dump_path, worker_env};
use xtrain_distributed::{DdpConfig, DdpContext, build_model, train_rank};
use xtrain_model::{Config, batched_ids_tensor};
use xtrain_optim::GpuAdamW;
use xtrain_tensor::Device;
use xtrain_train::clip::clip_grad_norm_gpu;
use xtrain_train::data::Corpus;
use xtrain_train::schedule::LrSchedule;

// ── Shared fixture (identical on launcher + every worker, so they agree) ──────
const VOCAB: usize = 64;
const STEPS: usize = 20;

fn synth_corpus() -> Corpus {
    let tokens: Vec<i32> = (0..4096)
        .map(|i| (i * 7 + 3) as i32 % VOCAB as i32)
        .collect();
    Corpus {
        tokens,
        vocab_size: VOCAB,
    }
}

fn test_config() -> Config {
    let mut cfg = Config::tiny();
    cfg.vocab = VOCAB;
    cfg.n_layers = 2;
    cfg
}

fn dcfg(batch_size: usize) -> DdpConfig {
    DdpConfig {
        seq_len: 32,
        batch_size,
        accum_steps: 1,
        steps: STEPS,
        schedule: LrSchedule {
            max_lr: 3e-3,
            min_lr: 3e-4,
            warmup: 3,
            total: STEPS,
        },
        weight_decay: 0.1,
        max_grad_norm: 1.0,
        log_every: 1_000_000,
        seed: 7,
        eval_every: 0,
        eval_batches: 0,
        ckpt_path: None,
    }
}

// The dump dir is passed launcher→worker via this env key (separate from the
// XTRAIN_* keys the launcher sets); workers write `rank{N}.dump` there.
const ENV_DUMP_DIR: &str = "XTRAIN_TEST_DUMP_DIR";
const GLOBAL_BATCH: usize = 8;

// ── Worker entry: runs when this test binary is re-execed by launch_processes ─
fn run_as_worker_if_needed() {
    let Some(env) = worker_env() else { return };
    let dump_dir = std::env::var(ENV_DUMP_DIR).expect("dump dir env");
    // This is the worker body `run_worker` performs in production (init ctx →
    // build deterministic model → train_rank). We train ONCE inline so we can dump
    // both the loss trace AND the final params for the launcher to check; the
    // production `run_worker` wrapper is exercised by `bin/train_ddp_mp` on dash5.
    let ctx = DdpContext::init(env.rank, env.world, env.id, env.local_rank);
    let device = Device::Cuda(env.local_rank);
    let model = build_model(test_config(), device);
    let res = train_rank(
        &ctx,
        &model,
        device,
        &synth_corpus(),
        None,
        &dcfg(GLOBAL_BATCH),
    );
    let params: Vec<Vec<f32>> = model
        .params()
        .iter()
        .map(|p| p.value().to_device(Device::Cpu).as_slice::<f32>().to_vec())
        .collect();
    write_dump(&dump_dir, env.rank, &res.losses, &params);
    std::process::exit(0);
}

fn write_dump(dir: &str, rank: usize, losses: &[f32], params: &[Vec<f32>]) {
    let path = rank_dump_path(Path::new(dir), rank);
    let mut f = std::fs::File::create(&path).expect("create dump");
    // Line 1: losses (space-separated). Following lines: one param tensor each.
    let loss_line: Vec<String> = losses.iter().map(|x| format!("{x:.8e}")).collect();
    writeln!(f, "{}", loss_line.join(" ")).unwrap();
    for p in params {
        let line: Vec<String> = p.iter().map(|x| format!("{x:.8e}")).collect();
        writeln!(f, "{}", line.join(" ")).unwrap();
    }
}

fn read_dump(dir: &str, rank: usize) -> (Vec<f32>, Vec<Vec<f32>>) {
    let path = rank_dump_path(Path::new(dir), rank);
    let text = std::fs::read_to_string(&path).expect("read dump");
    let mut lines = text.lines();
    let losses: Vec<f32> = lines
        .next()
        .unwrap()
        .split_whitespace()
        .map(|s| s.parse().unwrap())
        .collect();
    let params: Vec<Vec<f32>> = lines
        .map(|l| l.split_whitespace().map(|s| s.parse().unwrap()).collect())
        .collect();
    (losses, params)
}

// ── Single-GPU baseline (same loop as the DDP rank, world=1) ──────────────────
fn run_single_gpu(cfg: Config, corpus: &Corpus, d: &DdpConfig) -> (Vec<f32>, Vec<Vec<f32>>) {
    device::set_device(0).unwrap();
    let device = Device::Cuda(0);
    let model = build_model(cfg, device);
    let params = model.params();
    let mut opt = GpuAdamW::new(d.weight_decay);
    let mut rng = d.seed;
    let mut losses = Vec::new();
    for step in 0..d.steps {
        let lr = d.schedule.lr(step);
        let mut inputs = Vec::with_capacity(d.batch_size);
        let mut targets_v = Vec::with_capacity(d.batch_size);
        for _ in 0..d.batch_size {
            let (input, target) = corpus.sample(d.seq_len, &mut rng);
            inputs.push(input);
            targets_v.push(target);
        }
        let ids = batched_ids_tensor(&inputs, device);
        let targets = batched_ids_tensor(&targets_v, device);
        let loss = model.loss_batched(&ids, &targets, d.batch_size);
        losses.push(loss.value().to_device(Device::Cpu).as_slice::<f32>()[0]);
        loss.backward();
        clip_grad_norm_gpu(&params, d.max_grad_norm, 1.0);
        opt.step(lr, &params);
        for p in &params {
            p.zero_grad();
        }
    }
    let host = params
        .iter()
        .map(|p| p.value().to_device(Device::Cpu).as_slice::<f32>().to_vec())
        .collect();
    (losses, host)
}

// ── The test (launcher mode) ──────────────────────────────────────────────────
#[test]
fn proc_per_gpu_matches_single_gpu_and_thread_path() {
    // If this process was spawned as a worker, do the worker job and exit before
    // the test framework runs anything else.
    run_as_worker_if_needed();

    let world = 2usize;
    if device::device_count().unwrap_or(0) < world as i32 {
        eprintln!("skip: need >= {world} GPUs");
        return;
    }
    let cfg = test_config();
    let corpus = synth_corpus();
    let d = dcfg(GLOBAL_BATCH);

    // (1) Single-GPU baseline over the global batch.
    let (single_losses, single_params) = run_single_gpu(cfg, &corpus, &d);

    // (2) Thread-per-GPU path (T8 `launch`) — the regression baseline to match.
    let thread_results =
        xtrain_distributed::launch(&[0u32, 1u32], &corpus, None, &d, move |device| {
            build_model(cfg, device)
        });
    let thread_losses = &thread_results[0].losses;

    // (3) Process-per-GPU: spawn 2 worker processes (re-exec of this test binary),
    // each dumps its loss trace + final params to a temp dir.
    let dump_dir = std::env::temp_dir().join(format!("xtrain_t17_{}", std::process::id()));
    std::fs::create_dir_all(&dump_dir).unwrap();
    // SAFETY: single-threaded test (forced by --test-threads=1) sets this env
    // before spawning workers; no concurrent env access.
    unsafe {
        std::env::set_var(ENV_DUMP_DIR, &dump_dir);
    }
    // Re-exec the test binary but run ONLY this test, single-threaded, so the
    // worker process does the worker job and exits without touching other tests.
    let worker_args = [
        "--exact".to_string(),
        "proc_per_gpu_matches_single_gpu_and_thread_path".to_string(),
        "--test-threads=1".to_string(),
        "--nocapture".to_string(),
    ];
    launch_processes(world, &worker_args).expect("worker processes failed");
    let (proc_losses0, proc_p0) = read_dump(dump_dir.to_str().unwrap(), 0);
    let (_proc_losses1, proc_p1) = read_dump(dump_dir.to_str().unwrap(), 1);

    // (a) process-per-GPU loss matches single-GPU.
    let max_rel_single = max_rel(&single_losses, &proc_losses0);
    println!(
        "proc-per-GPU vs single-GPU loss: single[last]={:.6} proc[last]={:.6} max_rel={max_rel_single:.2e}",
        single_losses.last().unwrap(),
        proc_losses0.last().unwrap()
    );
    assert!(
        max_rel_single < 1e-3,
        "proc-per-GPU loss diverged from single-GPU: {max_rel_single:.3e}"
    );

    // (c) process-per-GPU loss matches the thread-per-GPU path.
    let max_rel_thread = max_rel(thread_losses, &proc_losses0);
    println!(
        "proc-per-GPU vs thread-per-GPU loss: thread[last]={:.6} proc[last]={:.6} max_rel={max_rel_thread:.2e}",
        thread_losses.last().unwrap(),
        proc_losses0.last().unwrap()
    );
    assert!(
        max_rel_thread < 1e-3,
        "proc-per-GPU loss diverged from thread-per-GPU: {max_rel_thread:.3e}"
    );

    // (b) cross-rank parameter agreement (KI-5 ULP tolerance).
    let mut max_pdiff = 0.0f32;
    for (a, b) in proc_p0.iter().zip(&proc_p1) {
        for (x, y) in a.iter().zip(b) {
            max_pdiff = max_pdiff.max((x - y).abs());
        }
    }
    println!("proc-per-GPU cross-rank max |param diff| = {max_pdiff:.3e}");
    assert!(
        max_pdiff < 1e-6,
        "ranks' params drifted apart: {max_pdiff:.3e}"
    );

    // Bonus sanity: proc-per-GPU final params vs single-GPU within fp tolerance.
    let mut max_sdiff = 0.0f32;
    for (a, b) in proc_p0.iter().zip(&single_params) {
        for (x, y) in a.iter().zip(b) {
            max_sdiff = max_sdiff.max((x - y).abs() / y.abs().max(1e-6));
        }
    }
    println!("proc-per-GPU vs single-GPU max rel |param diff| = {max_sdiff:.3e}");
    assert!(
        max_sdiff < 1e-2,
        "proc-per-GPU params diverged from single-GPU"
    );
    let _ = std::fs::remove_dir_all(&dump_dir);
}

// Largest relative difference between two loss traces, relative to `a`.
fn max_rel(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b)
        .map(|(s, d)| (s - d).abs() / s.abs().max(1e-6))
        .fold(0.0f32, f32::max)
}

docs/16-process-per-gpu.md Normal file

@@ -0,0 +1,217 @@
# Phase T17: Process-per-GPU DDP (torchrun-style, independent CUDA contexts): Design Document
## Goal
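T8's DDP is **single-process, thread-per-GPU**: one process starts N OS threads, and each thread binds one GPU with `cudaSetDevice` and runs its own rank's training inside **the same CUDA primary context**. After T11 removed the per-op `cudaMalloc` serialization, 8-GPU scaling recovered from ~1.3× to **~5×@8**, but **a residual non-linearity remains (5×@8 rather than ~8×)**. The root cause was already identified in the T11 doc / KI-5: **N rank threads share one CUDA context, and many driver-level calls (kernel launch, cuBLAS handle, stream queueing) serialize process-wide inside that single context**. The pool allocator removed only the largest item (malloc); the launch / cuBLAS serialization is still there.

T17's goal = **torchrun-style process-per-GPU**: each rank is an **independent OS process** holding its **own CUDA context**, so the ranks' driver calls no longer queue in one context → this removes the residual serialization of thread-per-GPU and pushes 8-GPU scaling closer to linear. It is the **largest change** in Phase 2 (a structural rewrite of the launcher plus a cross-process NCCL bootstrap), so this doc comes first.

**Scope (decided by the user): process-per-GPU ONLY. ZeRO-1 / sharded optimizer is explicitly dropped**: at this scale the optimizer state is small and the gain is thin. This task changes only **the launch model and the NCCL bootstrap**; **the training step (grad all-reduce → local AdamW) is reused as-is, with zero changes**.

**The thread-per-GPU path is kept**: T8's `launch()` and the `train_ddp` bin are not deleted (regression protection, and gate ① requires the losses of the new and old paths to align). process-per-GPU is added as a **parallel new launcher**.

Acceptance (all hard gates green; honest correctness, no loosened tolerances):

1. Multi-process (world=2 / world=4) training loss **matches single-GPU** (within the existing DDP tolerance `<1e-3`) and matches the old thread-per-GPU path;
2. Cross-rank parameter agreement (the repo's existing `<1e-6` convention);
3. **8-GPU linearity measured before→after**: thread-per-GPU baseline (~5×@8) vs process-per-GPU @ {1,2,4,8}, with numbers;
4. Full regression suite green (including the xserv closed loop: md5 / token-identical); single-GPU and the old thread-per-GPU path do not regress.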

## What Changes, What Stays

```
                         thread-per-GPU (T8, kept)                   process-per-GPU (T17, new)
Launch                   1 process × N threads                       1 launcher process → fork/exec N worker processes
CUDA context             N threads share 1 primary context           1 independent context per worker process
rank/world/device        closure capture + thread::scope             env: XTRAIN_RANK / XTRAIN_WORLD / XTRAIN_LOCAL_RANK
Model construction       build_model in each thread's closure (!Send) build_model in each process's main (naturally isolated)
NCCL UniqueId handoff    move a Copy struct into the thread closure  launcher generates → hex-encodes into the child's env
NCCL comm init           DdpContext::init (unchanged)                DdpContext::init (unchanged)
─────────────────────────────────────────────────────────────────────────────────────
grad all-reduce          all_reduce_average_grads (unchanged)        ← same code, zero changes
local AdamW step         train_rank (unchanged)                      ← same code, zero changes
batch sharding           i % world == rank (unchanged)               ← same code, zero changes
param consistency proof  same init + same grad + same opt (unchanged) ← same argument
```
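
**Core insight**: T8 already wrote the training step in a **per-rank** shape that takes a `&DdpContext` (`train_rank`). The **only difference** between thread-per-GPU and process-per-GPU is **how each rank is started and how the UniqueId is handed to it**: the former moves it across threads, the latter passes it across processes via env. `train_rank` / `all_reduce_average_grads` / sharding / the consistency argument are **all reused as-is**. This is the payoff of decoupling the launch model from the training logic.
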
## Module Layout
```
crates/xtrain-distributed/src/
├── lib.rs          ← adds pub mod proc; re-exports hex_encode/decode_unique_id + the run_worker entry
├── proc.rs         ← new: ① launcher (spawns N worker processes, env-injects rank/world/local_rank/uid)
│                          ② worker entry (read env → DdpContext::init → build_model → train_rank)
│                          ③ UniqueId hex encode/decode (128 bytes passed across processes via env)
├── ddp.rs          ← unchanged (train_rank / build_model / DdpConfig reused)
├── lib.rs::DdpContext / all_reduce_average_grads / get_unique_id ← unchanged
└── bin/
    ├── train_ddp.rs     ← unchanged (thread-per-GPU, kept)
    └── train_ddp_mp.rs  ← new: multi-process launcher / worker two-in-one entry point
crates/xtrain-distributed/tests/
└── ddp_proc.rs     ← new: spawn multiple processes for a few steps → loss vs single-GPU + cross-rank param agreement + (in passing) before/after throughput
docs/16-process-per-gpu.md ← this document
```

`proc.rs` is gated by `#[cfg(not(no_cuda))]` throughout (the crate's existing convention): on a local machine without nvcc the crate compiles to nothing and `cargo check` passes, while on dash5 it is compiled in full and linked against NCCL.

## Key Design Decisions
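
### ① Launch model: one binary, two modes (launcher / worker)

`train_ddp_mp` is a single executable that decides its own role by **whether an environment variable is present** (the same idea as torchrun injecting `LOCAL_RANK` into workers):

- **launcher mode** (invoked directly by a user / test; no `XTRAIN_RANK` in env):
  1. Derive `world` from `CUDA_VISIBLE_DEVICES` / `device_count()`;
  2. Call `get_unique_id()` to generate one `ncclUniqueId` (128 bytes) and **hex-encode** it into a string;
  3. `for rank in 0..world`: `Command::new(current_exe())`, **copying its own full argv** (hyperparameters / paths pass through), additionally setting env `XTRAIN_RANK=rank`, `XTRAIN_WORLD=world`, `XTRAIN_LOCAL_RANK=rank`, `XTRAIN_NCCL_ID=<hex>`, and spawning it as a child process;
  4. `wait()` on all children; any non-zero exit code → the launcher exits non-zero (visible to CI / gates).
- **worker mode** (spawned by the launcher; `XTRAIN_RANK` is present in env):
  1. Read `rank / world / local_rank / uid_hex` from env;
  2. `device::set_device(local_rank)` binds the GPU (**each process's own primary context** is created here, on its first CUDA call);
  3. Hex-decode the `NcclUniqueId`, then `DdpContext::init(rank, world, id, local_rank)` (**reuses T8's init**);
  4. `build_model(cfg, device)` (**reuses T8's deterministic init** → same seed → bit-identical starting point across processes);
  5. `train_rank(&ctx, &model, …, &cfg)` (**reuses T8's training step, zero changes**);
  6. Exit code 0 (success) / non-zero (panic → the process dies and the launcher notices).

**Single-node `CUDA_VISIBLE_DEVICES` handling**: the visible device set the launcher sees is exactly `0..world`; each worker inherits the same `CUDA_VISIBLE_DEVICES` (env is passed through by default), and `local_rank` is used directly as the device ordinal within the visible set → `set_device(local_rank)`. This matches thread-per-GPU's `devices = 0..count` semantics and is sufficient for a single node. (True multi-node would need to separate `LOCAL_RANK` from the global `RANK` and slice `CUDA_VISIBLE_DEVICES` per node; a single node does not need this, so it is recorded as follow-up.)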
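
The role self-detection described above can be sketched without touching the real process environment. This is a minimal illustration, not the code in `proc.rs`: `Role`, `detect_role`, and the injected `get` lookup are hypothetical names, and unlike `worker_env()` (which treats an unparsable `XTRAIN_RANK` as "launcher" and panics on the other keys) this sketch reports every malformed value as an error.

```rust
use std::collections::HashMap;

/// Which mode the dual-mode binary should run in (hypothetical type).
#[derive(Debug, PartialEq)]
enum Role {
    Launcher,
    Worker { rank: usize, world: usize, local_rank: u32 },
}

/// Decide the role from an env lookup. `get` stands in for
/// `|k| std::env::var(k).ok()` so the logic is testable on any host.
fn detect_role(get: impl Fn(&str) -> Option<String>) -> Result<Role, String> {
    // No XTRAIN_RANK: started by a user or a test, so this process launches.
    let Some(rank_str) = get("XTRAIN_RANK") else {
        return Ok(Role::Launcher);
    };
    let parse = |key: &str, val: Option<String>| -> Result<usize, String> {
        val.ok_or_else(|| format!("{key} is not set"))?
            .parse::<usize>()
            .map_err(|e| format!("{key}: {e}"))
    };
    let rank = parse("XTRAIN_RANK", Some(rank_str))?;
    let world = parse("XTRAIN_WORLD", get("XTRAIN_WORLD"))?;
    let local_rank = parse("XTRAIN_LOCAL_RANK", get("XTRAIN_LOCAL_RANK"))? as u32;
    if rank >= world {
        return Err(format!("rank {rank} out of range for world {world}"));
    }
    Ok(Role::Worker { rank, world, local_rank })
}

fn main() {
    // Launcher case: nothing set.
    let empty: HashMap<&str, &str> = HashMap::new();
    println!("{:?}", detect_role(|k| empty.get(k).map(|v| v.to_string())));

    // Worker case: the three keys a launcher would inject for rank 1 of 2.
    let env: HashMap<&str, &str> = [
        ("XTRAIN_RANK", "1"),
        ("XTRAIN_WORLD", "2"),
        ("XTRAIN_LOCAL_RANK", "1"),
    ]
    .into_iter()
    .collect();
    println!("{:?}", detect_role(|k| env.get(k).map(|v| v.to_string())));
}
```

Passing the lookup as a closure keeps the decision logic pure, which is the same property that lets the hex helpers be tested on a host without a GPU.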
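
### ② Cross-process NCCL UniqueId distribution: launcher-generated + hex-env injection (**simplest, race-free**)

This is the part of T17 that most needs to be thought through. The candidate mechanisms (the task listed file / TCP / shared FS), weighed one by one:

| Mechanism | How | Single-node trade-off |
|---|---|---|
| **Shared file** | The rank0-worker writes `/tmp/xtrain.id`; the other workers **poll-read** it | Must handle the "file not written yet" race (polling + retry + timeout), and the workers must agree on who is rank0 and when to clean up |
| **TCP rendezvous** | Start a small c10d-store-style server that hands out the id | Closest to torchrun, but needs a socket server/client, port selection, and a handshake protocol: overkill on a single node |
| **launcher-generated + env injection** ✅ | The **launcher** (not the rank0-worker) calls `ncclGetUniqueId`, hex-encodes the id, and **writes it into each child's env at spawn time** | No file race, no polling, no TCP server, no cleanup: the env is ready before the child is born, and the child gets the id by reading env |

**Env injection is chosen**, for an honest reason: on a single node the launcher is the **common parent** of all workers, env is the plainest parent→child out-of-band channel, and it is **fixed atomically at the moment the child is created**. There is inherently no "id not ready yet" race, and it is both simpler and more robust than file polling or a TCP handshake. The cost is that the launcher process must link NCCL to call `ncclGetUniqueId` (it already lives in the distributed crate, which links NCCL), which is acceptable.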
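
As a rough illustration of why env injection is race-free, the sketch below builds a worker command without spawning it and reads the injected keys back through `Command::get_envs`. The helper names `worker_command` and `env_of`, the executable name, and the placeholder id are assumptions for this example; only the four `XTRAIN_*` keys come from this document.

```rust
use std::ffi::OsStr;
use std::process::Command;

/// Build (but do not spawn) the command for one worker. On a single node the
/// local rank equals the global rank, as described above.
fn worker_command(exe: &str, rank: usize, world: usize, id_hex: &str) -> Command {
    let mut cmd = Command::new(exe);
    cmd.env("XTRAIN_RANK", rank.to_string())
        .env("XTRAIN_WORLD", world.to_string())
        .env("XTRAIN_LOCAL_RANK", rank.to_string())
        .env("XTRAIN_NCCL_ID", id_hex);
    cmd
}

/// Look up a value explicitly set on a not-yet-spawned command.
fn env_of(cmd: &Command, key: &str) -> Option<String> {
    cmd.get_envs()
        .find(|(k, _)| *k == OsStr::new(key))
        .and_then(|(_, v)| v)
        .map(|v| v.to_string_lossy().into_owned())
}

fn main() {
    // Placeholder 256-char string; a real launcher would hex-encode the NCCL id.
    let id_hex = "ab".repeat(128);
    for rank in 0..2 {
        let cmd = worker_command("train_ddp_mp", rank, 2, &id_hex);
        // Every key is already attached before any child process exists.
        println!(
            "rank {rank}: XTRAIN_RANK={:?} XTRAIN_WORLD={:?}",
            env_of(&cmd, "XTRAIN_RANK"),
            env_of(&cmd, "XTRAIN_WORLD")
        );
    }
}
```

Because the values are part of the `Command` before `spawn()` is called, a child can never observe a state in which the id is missing, which is the property the table above credits to this mechanism.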
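
> **Relation to "rank 0 generates"**: conceptually, torchrun has rank 0 put the id into the c10d store and the other ranks fetch it. Here the **launcher acts as the coordinator** and generates the id on rank 0's behalf. This is functionally equivalent (the id is just a one-shot handshake token; who generates it does not affect correctness as long as every rank gets the same one), but on a single node it saves "one more round of out-of-band sync between workers". **128 bytes → hex = 256 characters**, far below the environment-variable length limit, so env transport is safe.

`hex_encode_unique_id` / `hex_decode_unique_id` are two pure functions in `proc.rs` (`[c_char;128] ↔ 256-char hex`); a unit test can verify the roundtrip on the host (no GPU needed).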
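
A host-side roundtrip check along these lines could look as follows. This is a sketch under assumptions: `UniqueId` is a stand-in for the FFI `NcclUniqueId`, and the decoder returns a `Result` and rejects non-hex characters up front, whereas the function in `proc.rs` panics on malformed input.

```rust
use std::fmt::Write;
use std::os::raw::c_char;

/// Stand-in for the FFI `ncclUniqueId` (128 opaque bytes); hypothetical type.
#[derive(Clone, Copy, PartialEq, Debug)]
struct UniqueId {
    internal: [c_char; 128],
}

fn hex_encode(id: &UniqueId) -> String {
    let mut s = String::with_capacity(256);
    for &b in &id.internal {
        // c_char may be signed; go through u8 so every byte prints as 2 digits.
        write!(s, "{:02x}", b as u8).expect("writing to a String cannot fail");
    }
    s
}

fn hex_decode(hex: &str) -> Result<UniqueId, String> {
    if hex.len() != 256 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return Err(format!("expected 256 hex chars, got {} bytes", hex.len()));
    }
    let mut id = UniqueId { internal: [0; 128] };
    for (i, slot) in id.internal.iter_mut().enumerate() {
        let byte = u8::from_str_radix(&hex[2 * i..2 * i + 2], 16)
            .map_err(|e| format!("byte {i}: {e}"))?;
        *slot = byte as c_char;
    }
    Ok(id)
}

fn main() {
    // Fill with a pattern that includes bytes >= 0x80 (negative as signed c_char).
    let mut id = UniqueId { internal: [0; 128] };
    for (i, b) in id.internal.iter_mut().enumerate() {
        *b = (i * 2) as u8 as c_char;
    }
    let hex = hex_encode(&id);
    println!("encoded length: {}", hex.len());
    println!("roundtrip ok: {}", hex_decode(&hex) == Ok(id));
}
```

Checking `is_ascii_hexdigit` before slicing also guarantees the two-byte slices fall on character boundaries, so a corrupted value yields an error rather than a slicing panic.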
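
### ③ Independent CUDA contexts = removing the residual serialization (this is T17's payoff)

thread-per-GPU's residual non-linearity (KI-5 / T11 doc) comes from **N rank threads sharing one CUDA primary context**: many of the driver's operations on that context (kernel launch queue, cuBLAS handle, internal locks) serialize at process / context level. The pool allocator removed malloc, the largest item, but the launch / cuBLAS serialization remains, which shows up as ~5× rather than ~8× on 8 GPUs.

Under process-per-GPU, **each rank is an independent process → independent CUDA context → independent driver state**: the processes' kernel launch / cuBLAS calls **do not queue in a shared context**, so the residual serialization is removed structurally. This is exactly what gate ③ (before→after linearity) has to measure: if process-per-GPU pushes 8 GPUs from ~5× to clearly higher, the root-cause diagnosis is confirmed. **Honesty principle**: if the gain is limited, report it as is (and state that the residual bottleneck has moved to NCCL all-reduce / PCIe topology, which is another layer and outside this task's scope).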
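
### ④ Training step / consistency argument: reused from T8 as-is, zero changes

process-per-GPU touches none of the training math:

- **grad all-reduce**: `all_reduce_average_grads(params)` is unchanged to the letter. To the caller, an NCCL collective looks exactly the same across **processes** as across **threads** (the comm is per rank and independent of process/thread).
- **batch sharding**: `i % world == rank` is unchanged. Every process advances **an RNG with the same seed**, draws the full batch of `B_global` sequences, and computes only its own slice. The union over all processes == the single-GPU batch in the same order → the grad sum after all-reduce matches single-GPU sequence by sequence.
- **parameter consistency**: the same three sufficient conditions (T8 doc ④): (a) the same deterministic `build_model` (same LCG seed, which holds across processes as well); (b) NCCL all-reduce returns a bit-identical reduction on every rank (PCIe-only shows a few ULP of run-to-run jitter, so gate ② uses `<1e-6` rather than `==0`, consistent with T11's existing convention); (c) the same optimizer hyperparameters / state evolution.
- **vs single-GPU**: differs from single-GPU only in **fp summation order** (the single-GPU tape SUMs B items; under DDP each rank SUMs its shard first and NCCL then SUMs across ranks) → `<1e-3` rel, not bit-exact. Against the thread-per-GPU path the result should be **numerically of the same magnitude** (same sharding + same all-reduce).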
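
The sharding and consistency argument above can be illustrated with a toy model. This is a sketch only: a scalar parameter `w`, a quadratic loss, plain SGD instead of AdamW, and an in-process `all_reduce_average` standing in for NCCL are all assumptions made for the example, and the toy does not model the few-ULP jitter mentioned above.

```rust
/// Indices of the global batch owned by `rank`: i % world == rank.
fn shard(batch: usize, world: usize, rank: usize) -> Vec<usize> {
    (0..batch).filter(|i| i % world == rank).collect()
}

/// Gradient of mean(0.5 * (w - x)^2) over the chosen samples: mean(w - x).
fn mean_grad(w: f32, xs: &[f32], idx: &[usize]) -> f32 {
    idx.iter().map(|&i| w - xs[i]).sum::<f32>() / idx.len() as f32
}

/// Toy stand-in for an averaging all-reduce: every rank gets the same mean.
fn all_reduce_average(per_rank: &[f32]) -> f32 {
    per_rank.iter().sum::<f32>() / per_rank.len() as f32
}

/// Simulated DDP: returns the final parameter held by each rank.
fn train_ddp(xs: &[f32], world: usize, steps: usize, lr: f32) -> Vec<f32> {
    assert_eq!(xs.len() % world, 0, "global batch must divide evenly");
    let mut w = vec![0.0f32; world]; // same deterministic init on every rank
    for _ in 0..steps {
        // Each rank sees the whole batch but computes only its own shard.
        let grads: Vec<f32> = (0..world)
            .map(|r| mean_grad(w[r], xs, &shard(xs.len(), world, r)))
            .collect();
        let g = all_reduce_average(&grads);
        for wr in w.iter_mut() {
            *wr -= lr * g; // same grad + same optimizer on every rank
        }
    }
    w
}

/// Single-device baseline over the full batch.
fn train_single(xs: &[f32], steps: usize, lr: f32) -> f32 {
    let all: Vec<usize> = (0..xs.len()).collect();
    let mut w = 0.0f32;
    for _ in 0..steps {
        w -= lr * mean_grad(w, xs, &all);
    }
    w
}

fn main() {
    let xs: Vec<f32> = (0..8).map(|i| i as f32 * 0.37 + 0.1).collect();
    let ddp = train_ddp(&xs, 2, 20, 0.1);
    let single = train_single(&xs, 20, 0.1);
    let rel = (ddp[0] - single).abs() / single.abs();
    println!("rank0 {} rank1 {} single {} rel {:e}", ddp[0], ddp[1], single, rel);
}
```

With equal shard sizes the mean of per-shard means equals the global mean mathematically, so the ranks differ from the single-device run only through floating-point summation order, which is the reason the gate is a relative tolerance rather than bit equality.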
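
### ⑤ Process lifecycle / failure propagation / resource cleanup

- **Failure propagation**: worker panic → the process exits non-zero; the launcher's `wait()` collects all exit codes, and any non-zero code → the launcher exits non-zero and prints which rank died. The NCCL comm's context is reclaimed by the OS on process exit (`DdpContext::Drop` calls `ncclCommDestroy` on the normal exit path; on a crash the OS reclaims it as the fallback).
- **No cross-process barrier needed**: each worker runs its `cfg.steps` independently and exits naturally; the NCCL collective is itself a sync point (it returns only once all ranks have arrived), so the training loops are naturally aligned.
- **Resource cleanup**: no temp files (env injection, so no `/tmp` id file); the ckpt is written by the rank0-worker to the path given by `--ckpt`, the same as thread-per-GPU; ckpts / processes used by tests are cleaned up when the test ends.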
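
The failure-propagation rule can be sketched as a pure function over already-collected exit codes. `first_failure` and `launcher_exit_code` are hypothetical helpers for this example; the `Option<i32>` mirrors what `std::process::ExitStatus::code()` returns, where `None` means the process was terminated by a signal on Unix.

```rust
/// Exit code of one worker, or None if it was terminated by a signal.
type Exit = Option<i32>;

/// Given every worker's (rank, exit) pair, report the first failing rank.
/// All statuses are collected before this is called, so no child is left
/// unreaped when an early rank fails.
fn first_failure(exits: &[(usize, Exit)]) -> Result<(), String> {
    for &(rank, code) in exits {
        match code {
            Some(0) => {}
            Some(c) => return Err(format!("worker rank {rank} exited with code {c}")),
            None => return Err(format!("worker rank {rank} was terminated by a signal")),
        }
    }
    Ok(())
}

/// Map the launcher's result to its own process exit code.
fn launcher_exit_code(result: &Result<(), String>) -> i32 {
    if result.is_ok() { 0 } else { 1 }
}

fn main() {
    // A Rust panic ends the process with exit code 101.
    let exits = [(0, Some(0)), (1, Some(101)), (2, None)];
    let result = first_failure(&exits);
    if let Err(e) = &result {
        eprintln!("launcher: {e}");
    }
    println!("launcher exit code: {}", launcher_exit_code(&result));
}
```

One design point worth noting: `launch_processes` as shown in this changeset returns early through `?` if a spawn or a `wait()` itself fails, which would leave already-started workers unreaped; separating "collect every status" from "decide the outcome", as in this sketch, is one possible way to avoid that.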
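
## Verification (all hard gates green, real runs on dash5)

### Gates ①②: correctness, `tests/ddp_proc.rs` (`#[cfg(not(no_cuda))]`, skipped with <2 GPUs)

The test itself is the launcher: it uses `Command` to spawn N worker processes (worker = a special mode of the same test binary, or a reuse of `train_ddp_mp`) and runs a fixed number of steps; the workers dump their final loss / params to their own stdout / temp files, and the test's parent process reads them back:

- **(a) loss vs single-GPU**: single-GPU baseline (the existing `run_single_gpu`) vs 2-process / 4-process DDP; the whole loss trajectory satisfies `max_rel < 1e-3` (the same tolerance as the thread-per-GPU test).
- **(b) cross-rank parameter agreement**: `max|p_i - p_j| < 1e-6` (KI-5's existing convention).
- **(c) matches the thread-per-GPU path**: same config, same seed; the process-per-GPU loss trajectory vs the thread-per-GPU loss trajectory should be within `<1e-3` (the two differ only in process vs thread; sharding + all-reduce are the same).

> **Harness note**: distributed tests that run in parallel on shared GPUs contend for devices and deadlock → always use `--test-threads=1` (a known harness property, recorded in capstone/known-issues).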
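
Since gate (b) compares parameters at `1e-6`, the text dump between worker and launcher should not add rounding of its own. The sketch below, with hypothetical helper names `encode_line` / `decode_line`, checks that the `{:.8e}` format used in `tests/ddp_proc.rs` roundtrips `f32` values bit-for-bit; it relies on the fact that 9 significant decimal digits are enough to identify any `f32`.

```rust
/// Encode one tensor (or the loss trace) as a space-separated line.
/// `{:.8e}` prints 1 + 8 = 9 significant digits per value.
fn encode_line(xs: &[f32]) -> String {
    xs.iter()
        .map(|x| format!("{x:.8e}"))
        .collect::<Vec<_>>()
        .join(" ")
}

/// Parse a line produced by `encode_line` back into f32 values.
fn decode_line(line: &str) -> Vec<f32> {
    line.split_whitespace()
        .map(|s| s.parse::<f32>().expect("f32 parse"))
        .collect()
}

/// Largest absolute elementwise difference, as used for the cross-rank gate.
fn max_abs_diff(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b)
        .map(|(x, y)| (x - y).abs())
        .fold(0.0f32, f32::max)
}

fn main() {
    let xs = [0.1f32, 1.0 / 3.0, -2.5e-7, 123456.79, f32::MIN_POSITIVE, f32::MAX];
    let line = encode_line(&xs);
    let back = decode_line(&line);
    let exact = xs.iter().zip(&back).all(|(a, b)| a.to_bits() == b.to_bits());
    println!("{line}");
    println!("bit-exact roundtrip: {exact}, max |diff| = {:e}", max_abs_diff(&xs, &back));
}
```

A shorter format such as `{:.6e}` would lose low-order bits and could by itself introduce differences on the order of the `1e-6` gate for parameters of magnitude around 1.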
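
### Gate ③: linearity before→after, `train_ddp` (thread) vs `train_ddp_mp` (process) @ {1,2,4,8}

Fix **per-GPU batch 32 / seq 256 / dim 384** (the same conditions as the T11 KI-5 table, for direct comparison) and measure steady-state tok/s for each:

```
        thread-per-GPU (T11 baseline)                    process-per-GPU (T17)
world   tok/s(global)   speedup                          tok/s(global)   speedup
1       ~92K            1.00×                            ?               1.00×
2       ~147K           1.59×                            ?               ?
4       ~270K           2.92×                            ?               ?
8       ~461K           4.99×   ← residual non-linearity ?               ?   ← goal: closer to 8×
```

During the 8-GPU run, sample `nvidia-smi` to confirm utilization on all 8 GPUs. **Resource discipline**: the linearity bench legitimately uses all 8 GPUs for a short time, but keep the runs **short** (a few dozen to one or two hundred steps per world is enough to measure steady state) and clean up ckpts / intermediates afterwards.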
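
The speedup column, and the scaling efficiency it implies, are simple ratios against the world=1 throughput. The sketch below shows the arithmetic with made-up round numbers; the helper names and the sample throughputs are hypothetical, and they are not measurements from this project.

```rust
/// Speedup of a run relative to the world=1 throughput.
fn speedup(base_tok_s: f64, tok_s: f64) -> f64 {
    tok_s / base_tok_s
}

/// Scaling efficiency: speedup divided by the number of GPUs (1.0 = linear).
fn efficiency(world: usize, base_tok_s: f64, tok_s: f64) -> f64 {
    speedup(base_tok_s, tok_s) / world as f64
}

fn main() {
    // Hypothetical (world, tok/s) samples, for illustration only.
    let runs = [(1usize, 100.0f64), (2, 180.0), (4, 320.0), (8, 400.0)];
    let base = runs[0].1;
    println!("world  speedup  efficiency");
    for (world, tok_s) in runs {
        println!(
            "{world:<5}  {:>6.2}x  {:>9.0}%",
            speedup(base, tok_s),
            100.0 * efficiency(world, base, tok_s)
        );
    }
}
```

Reporting efficiency next to speedup makes the before→after comparison easier to read: a move from roughly 5× to a higher figure at world=8 appears directly as a higher percentage of linear scaling.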
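
### Gate ④: full regression suite (standard `--test-threads=1`)

autograd / structural / batched / bf16 / recompute / overfit / AdamW / existing DDP loss-match + cross-rank / flash / gqa / grad_accum / dropout, **plus the xserv closed loop** (export → md5 against the registry → token-identical). Single-GPU and the old thread-per-GPU path must not regress (process-per-GPU is an **added** path and the old path's code is untouched → no regression by construction, confirmed by the tests).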

### Real runs on dash5

```bash
export PATH=/usr/local/cuda/bin:/opt/wjh/.cargo/bin:$PATH
# Correctness (multi-process):
CUDA_VISIBLE_DEVICES=0,1 cargo test -p xtrain-distributed --release --test ddp_proc -- --nocapture --test-threads=1
# Multi-process training / linearity driver (process-per-GPU launcher):
CUDA_VISIBLE_DEVICES=0,1,2,3 cargo run -p xtrain-distributed --release --bin train_ddp_mp -- \
  /opt/wjh/models/gpt2/tokenizer.json data/tinystories-valid-3mb.txt \
  --dim 384 --heads 12 --head-dim 32 --layers 12 --ffn 1536 --steps 200 --batch 128 --seq 256
```

Measured numbers are filled in afterwards; see the xtrain.md T17 note / commit.
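
## Not Doing (outside this task's scope, recorded as follow-up)

- **ZeRO-1 / sharded optimizer**: dropped by the user (at this scale the optimizer state is small and the gain is thin).
- **True multi-node bootstrap**: this task is single-node (env injection is enough); going across nodes needs a TCP rendezvous (c10d-store style) + separating `LOCAL_RANK` from `RANK` + slicing `CUDA_VISIBLE_DEVICES` per node → left as follow-up.
- **NCCL communication compression / overlap with backward**: same reasoning as T8/T11; all-reduce is currently not the main bottleneck.
- **Deleting the thread-per-GPU path**: it is kept (regression baseline, and gate ① requires alignment with it).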