Compare commits
5 Commits
2ff4573a31...71b0a1621f
| Author | SHA1 | Date |
|---|---|---|
| | 71b0a1621f | |
| | 4abb17383a | |
| | a188c8a277 | |
| | ffd548b80b | |
| | c470c627a7 | |
13
README.md
@@ -26,7 +26,7 @@ borrows, the rest hand-written CUDA + Rust:
 | `xtrain-model` | tiny **Qwen3-style** transformer (RoPE + RMSNorm + QK-norm + SwiGLU), batched forward |
 | `xtrain-optim` | hand-written **AdamW** (host + GPU kernels) |
 | `xtrain-train` | training loop, LR schedule, grad clip, checkpoint, BPE corpus + cache, samplers, safetensors export |
-| `xtrain-distributed` | **NCCL DDP** (thread-per-GPU, all-reduce) |
+| `xtrain-distributed` | **NCCL DDP** (thread-per-GPU + torchrun-style process-per-GPU, all-reduce) |
 
 Every op's backward is verified against **finite differences** and against **PyTorch**
 (forward + per-parameter grads, batch > 1). Trained weights export to HF-safetensors and
@@ -53,6 +53,7 @@ Each phase: design doc + implementation + tests + a scoped commit (see [`docs/`]
 | **T14** | **fused flash-attention** kernel (online softmax, no materialized N×N; opt-in `--flash`) | peak mem −16%@1k / −23%@2k seq; flash==composed (grads/PyTorch) |
 | **T15** | **grouped-query attention** (`num_kv_heads<num_heads`; `repeat_kv` broadcast feeds both SDPA paths; backward sums each kv head's group; `--kv-heads`) | repeat_kv grad-check + **group=1 bit-identical to MHA**; GQA flash==composed; PyTorch GQA B>1; **xserv closed loop with real `num_key_value_heads`** token-identical |
 | **T16** | **gradient accumulation** (`--accum-steps`; DDP all-reduces only at the boundary) | equiv to N× big batch (grad 3.8e-5); same effective-64 batch 27.7GB→7.2GB (−74%) |
+| **T17** | **process-per-GPU** DDP (torchrun-style: 1 worker process / CUDA context per GPU; launcher mints `ncclUniqueId` → hex env injection; `train_rank` reused unchanged; thread-per-GPU path kept) | proc==thread loss 1.5e-7, cross-rank 1.2e-7, xserv md5 identical · **measured no-op on throughput**: thread 5.27× vs proc 5.31×@8 (8 GPUs 95–99% util) → residual non-linearity is NCCL/PCIe, *not* CUDA-context serialization (falsifies the old KI-5 hypothesis) |
 | **T18** | **dropout** (hand counter-based device RNG + mask, inverted scaling, train/eval switch) | fixed-seed grad-check; **p=0 bit-identical**; recompute-safe |
 
 The four performance fixes (T10–T13) each removed a real bottleneck — see
@@ -64,8 +65,14 @@ num_heads` via a `repeat_kv` broadcast op whose backward sums each kv head's que
 group — feeding both SDPA paths unchanged, default MHA bit-identical);
 T16 = micro-batch gradient accumulation ([`docs/15-grad-accum.md`](docs/15-grad-accum.md)),
 which decouples the effective batch from activation memory (memory tracks the micro-batch,
-not N×); T18 = dropout ([`docs/17-dropout.md`](docs/17-dropout.md), hand counter-based
-device RNG + mask, inverted scaling, train/eval switch).
+not N×); T17 = torchrun-style process-per-GPU DDP
+([`docs/16-process-per-gpu.md`](docs/16-process-per-gpu.md), one process + CUDA context per
+GPU, launcher-minted `ncclUniqueId` via env injection, reusing the T8 training step
+unchanged) — which **measured** that, at this scale, separate contexts give no throughput
+gain over thread-per-GPU (the residual ~5.3×@8 is the NCCL/PCIe communication wall, not
+single-context serialization as the old KI-5 note speculated); T18 = dropout
+([`docs/17-dropout.md`](docs/17-dropout.md), hand counter-based device RNG + mask, inverted
+scaling, train/eval switch).
 
 ## The scaling study — v0 → v8
 
203
crates/xtrain-distributed/src/bin/train_ddp_mp.rs
Normal file
@@ -0,0 +1,203 @@
//! Process-per-GPU DDP launcher / worker (Phase T17, torchrun-style).
//!
//! ONE binary, two modes (it self-detects via `XTRAIN_RANK`):
//! - **launcher** (env unset): mints the NCCL `ncclUniqueId`, then spawns one
//!   WORKER process per visible GPU, re-execing this same binary with the same
//!   argv plus `XTRAIN_{RANK,WORLD,LOCAL_RANK,NCCL_ID}` env, and waits for them.
//! - **worker** (`XTRAIN_RANK` set): binds its GPU (→ its own CUDA context),
//!   inits NCCL with the launcher-supplied id, builds its model, runs
//!   `train_rank` — the T8 training step reused UNCHANGED.
//!
//! Versus `train_ddp` (thread-per-GPU, kept as the regression baseline) the ONLY
//! difference is the launch model + cross-process UniqueId bootstrap. CLI flags
//! are identical, so it doubles as the before→after throughput driver.
//!
//! Run on dash5 (pick idle GPUs — dash5 is shared):
//!   export PATH=/usr/local/cuda/bin:/opt/wjh/.cargo/bin:$PATH
//!   CUDA_VISIBLE_DEVICES=0,1,2,3 cargo run -p xtrain-distributed --release \
//!     --bin train_ddp_mp -- /opt/wjh/models/gpt2/tokenizer.json \
//!     data/tinystories-valid-3mb.txt \
//!     --dim 384 --heads 12 --head-dim 32 --layers 12 --ffn 1536 \
//!     --steps 200 --batch 128 --seq 256

#[cfg(no_cuda)]
fn main() {
    eprintln!("train_ddp_mp: built without CUDA (no_cuda); run on a GPU host (dash5).");
}

#[cfg(not(no_cuda))]
use std::path::PathBuf;

// A flag like `--dim 384`: scan argv for `name`, parse the following token.
#[cfg(not(no_cuda))]
fn flag<T: std::str::FromStr>(args: &[String], name: &str, default: T) -> T {
    args.iter()
        .position(|a| a == name)
        .and_then(|i| args.get(i + 1))
        .and_then(|s| s.parse().ok())
        .unwrap_or(default)
}

#[cfg(not(no_cuda))]
fn main() {
    use xtrain_cuda::device;
    use xtrain_distributed::DdpConfig;
    use xtrain_distributed::proc::{ModelOpts, launch_processes, run_worker, worker_env};
    use xtrain_model::Config;
    use xtrain_train::data::Corpus;
    use xtrain_train::schedule::LrSchedule;

    let args: Vec<String> = std::env::args().collect();

    // ── Launcher mode: no XTRAIN_RANK in env → spawn one worker per visible GPU.
    let env = worker_env();
    if env.is_none() {
        let count = device::device_count().expect("device_count");
        assert!(count > 0, "no CUDA device visible");
        let world = count as usize;
        // Forward the full argv (minus argv[0]) to each worker verbatim.
        let extra: Vec<String> = args[1..].to_vec();
        println!("DDP (process-per-GPU): launching {world} worker processes (one per visible GPU)");
        match launch_processes(world, &extra) {
            Ok(()) => {}
            Err(e) => {
                eprintln!("launcher: {e}");
                std::process::exit(1);
            }
        }
        return;
    }
    let env = env.unwrap();

    // ── Worker mode: build config from the forwarded argv, then train this rank.
    // First two non-flag positionals: tokenizer.json, corpus.txt.
    // NOTE: flag values (e.g. the `384` in `--dim 384`) also pass this filter, so
    // the two paths must come before any flags on the command line.
    let positionals: Vec<&String> = args[1..].iter().filter(|a| !a.starts_with("--")).collect();
    let tok_path = positionals
        .first()
        .map(|s| PathBuf::from(s.as_str()))
        .unwrap_or_else(|| PathBuf::from("/opt/wjh/models/gpt2/tokenizer.json"));
    let corpus_path = positionals
        .get(1)
        .map(|s| PathBuf::from(s.as_str()))
        .unwrap_or_else(|| PathBuf::from("data/tinystories-valid-3mb.txt"));

    // Architecture (scaling-ladder rung). Defaults = v0-baseline tiny config.
    let n_heads = flag(&args, "--heads", 2usize);
    let head_dim = flag(&args, "--head-dim", 16usize);
    let n_layers = flag(&args, "--layers", 4usize);
    let ffn = flag(&args, "--ffn", 64usize);
    let kv_heads = flag(&args, "--kv-heads", n_heads);
    let dim_flag = flag(&args, "--dim", 0usize);
    if dim_flag != 0 && dim_flag != n_heads * head_dim {
        eprintln!(
            "warning: --dim {dim_flag} != heads*head_dim {}; using {}",
            n_heads * head_dim,
            n_heads * head_dim
        );
    }

    // Optimization knobs (mirror train_ddp).
    let steps: usize = flag(&args, "--steps", 100);
    let batch: usize = flag(&args, "--batch", 16);
    let accum_steps: usize = flag(&args, "--accum-steps", 1).max(1);
    let seq_len: usize = flag(&args, "--seq", 64);
    let max_lr: f32 = flag(&args, "--max-lr", 3e-3);
    let min_lr: f32 = flag(&args, "--min-lr", max_lr * 0.1);
    let weight_decay: f32 = flag(&args, "--wd", 0.1);
    let max_grad_norm: f32 = flag(&args, "--clip", 1.0);
    let val_tokens: usize = flag(&args, "--val-tokens", 0);
    let eval_every: usize = flag(&args, "--eval-every", 0);
    let eval_batches: usize = flag(&args, "--eval-batches", 64);
    let opts = ModelOpts {
        bf16: args.iter().any(|a| a == "--bf16"),
        recompute: args.iter().any(|a| a == "--recompute"),
        flash: args.iter().any(|a| a == "--flash"),
    };
    let ckpt: Option<PathBuf> = args
        .iter()
        .position(|a| a == "--ckpt")
        .and_then(|i| args.get(i + 1))
        .map(PathBuf::from);

    assert_eq!(
        batch % env.world,
        0,
        "global batch {batch} not divisible by world {}",
        env.world
    );

    // Each worker loads the corpus independently (read-only u16 cache hit → cheap).
    let corpus = Corpus::load_cached(&tok_path, &corpus_path);
    let vocab = corpus.vocab_size;
    let (train_corpus, valid): (Corpus, Option<Corpus>) = if val_tokens > 0 {
        let (t, v) = corpus.split_tail(val_tokens);
        (t, Some(v))
    } else {
        (corpus, None)
    };

    let cfg = Config::from_arch(vocab, n_heads, head_dim, n_layers, ffn).with_kv_heads(kv_heads);

    if env.rank == 0 {
        println!(
            "model: dim {} layers {} heads {} kv_heads {} head_dim {} ffn {} → core {:.3}M params \
             (+ embed/lm {:.2}M = {:.2}M total) | world={} mode=process-per-GPU",
            cfg.dim,
            cfg.n_layers,
            cfg.n_heads,
            cfg.num_kv_heads,
            cfg.head_dim,
            cfg.ffn_hidden,
            cfg.core_params() as f32 / 1e6,
            (cfg.num_params() - cfg.core_params()) as f32 / 1e6,
            cfg.num_params() as f32 / 1e6,
            env.world,
        );
        if opts.bf16 {
            println!("bf16 mixed precision: ON (fp32 master weights)");
        }
        if opts.recompute {
            println!("activation recompute: ON (per-block gradient checkpointing)");
        }
        if opts.flash {
            println!("flash-attention: ON (fused SDPA kernel, no materialized scores)");
        }
    }

    let dcfg = DdpConfig {
        seq_len,
        batch_size: batch,
        accum_steps,
        steps,
        schedule: LrSchedule {
            max_lr,
            min_lr,
            warmup: (steps / 20).max(5),
            total: steps,
        },
        weight_decay,
        max_grad_norm,
        log_every: 50,
        seed: 42,
        eval_every,
        eval_batches,
        ckpt_path: ckpt.clone(),
    };

    let res = run_worker(&env, cfg, opts, &train_corpus, valid.as_ref(), &dcfg);

    if env.rank == 0 {
        let start = res.losses.first().copied().unwrap_or(0.0);
        let end = res.losses.last().copied().unwrap_or(0.0);
        println!("train loss: start {start:.4} → end {end:.4}");
        if let Some(best) = res.best_val {
            println!("best val loss: {best:.4}");
        }
        if let Some((s, v)) = res.evals.last() {
            println!("final val loss (step {s}): {v:.4}");
        }
        if let Some(path) = &ckpt {
            println!("best-val checkpoint → {}", path.display());
        }
    }
}
@@ -18,8 +18,13 @@
 
 pub mod ddp;
 pub mod ffi;
+pub mod proc;
 
 pub use ddp::{DdpConfig, DdpResult, build_model, launch, train_rank};
+pub use proc::{
+    ModelOpts, WorkerEnv, build_worker_model, hex_decode_unique_id, hex_encode_unique_id,
+    launch_processes, run_worker, worker_env,
+};
 
 use std::ffi::c_void;
 
200
crates/xtrain-distributed/src/proc.rs
Normal file
@@ -0,0 +1,200 @@
//! Process-per-GPU DDP launcher + worker (Phase T17, torchrun-style).
//!
//! T8's DDP is single-process, thread-per-GPU: N rank threads share ONE CUDA
//! primary context, so much of the driver work (kernel launch, cuBLAS handle,
//! stream queueing) serializes at the context level — the residual ~5×@8
//! non-linearity left after T11's allocator fix (see docs/10 / KI-5).
//!
//! Process-per-GPU gives each rank its OWN OS process and OWN CUDA context, so
//! those driver calls no longer queue in a shared context. Only the LAUNCH model
//! and the cross-process NCCL bootstrap change; the training step
//! (`train_rank` → grad all-reduce → local AdamW) and the consistency argument
//! are reused from T8 UNCHANGED.
//!
//! UniqueId rendezvous: the LAUNCHER (the common parent of every worker) mints
//! the `ncclUniqueId` once, hex-encodes it, and injects it into each worker's env
//! at spawn time. No shared file / TCP server / polling — the id is atomically
//! present before the child exists, so there is no "id not ready yet" race. This
//! is the simplest single-node mechanism (see docs/16).

use std::path::PathBuf;
use std::process::{Command, Stdio};

use xtrain_model::{Config, TinyTransformer};
use xtrain_tensor::{DType, Device};
use xtrain_train::data::Corpus;

use crate::ddp::{DdpConfig, DdpResult, build_model, train_rank};
use crate::ffi::NcclUniqueId;
use crate::{DdpContext, get_unique_id};

// Env keys the launcher sets on every spawned worker (torchrun-style: a worker
// detects its role by the presence of `XTRAIN_RANK`).
pub const ENV_RANK: &str = "XTRAIN_RANK";
pub const ENV_WORLD: &str = "XTRAIN_WORLD";
pub const ENV_LOCAL_RANK: &str = "XTRAIN_LOCAL_RANK";
pub const ENV_NCCL_ID: &str = "XTRAIN_NCCL_ID";

/// Hex-encode the 128-byte `ncclUniqueId` for env transport (128 B → 256 chars,
/// well under any env-var length limit). `c_char` is signed on this target, so
/// reinterpret the bytes as `u8` first.
pub fn hex_encode_unique_id(id: &NcclUniqueId) -> String {
    let mut s = String::with_capacity(256);
    for &b in &id.internal {
        s.push_str(&format!("{:02x}", b as u8));
    }
    s
}

/// Inverse of [`hex_encode_unique_id`]: parse 256 hex chars back into the
/// 128-byte opaque blob. Panics on malformed input (the launcher always writes a
/// well-formed value, so a bad value means a corrupted env).
pub fn hex_decode_unique_id(hex: &str) -> NcclUniqueId {
    assert_eq!(
        hex.len(),
        256,
        "NCCL id hex must be 256 chars, got {}",
        hex.len()
    );
    let mut id = NcclUniqueId::default();
    for (i, slot) in id.internal.iter_mut().enumerate() {
        let byte = u8::from_str_radix(&hex[i * 2..i * 2 + 2], 16).expect("NCCL id hex byte parse");
        *slot = byte as std::os::raw::c_char;
    }
    id
}

/// Spawn `world` worker processes (re-exec of the current binary with the same
/// argv), each pinned to one GPU via `XTRAIN_LOCAL_RANK`, and wait for all of
/// them. The launcher mints the `ncclUniqueId` and injects it (hex) into every
/// worker's env, so the cross-process NCCL bootstrap needs no shared file/TCP.
///
/// Returns `Ok(())` iff every worker exits 0; otherwise an error naming the first
/// failing rank (so the caller — `main` / a test — can propagate a non-zero exit).
/// `extra_args` is forwarded to each worker verbatim (so all training hyper-params
/// pass straight through); the workers inherit the launcher's env (incl.
/// `CUDA_VISIBLE_DEVICES`) plus the four `XTRAIN_*` keys.
pub fn launch_processes(world: usize, extra_args: &[String]) -> Result<(), String> {
    let exe = std::env::current_exe().map_err(|e| format!("current_exe: {e}"))?;
    let id = get_unique_id();
    let id_hex = hex_encode_unique_id(&id);

    let mut children = Vec::with_capacity(world);
    for rank in 0..world {
        let child = Command::new(&exe)
            .args(extra_args)
            .env(ENV_RANK, rank.to_string())
            .env(ENV_WORLD, world.to_string())
            // Single node: local rank == global rank == device ordinal within the
            // visible set. (Multi-node would split these; see docs/16 follow-up.)
            .env(ENV_LOCAL_RANK, rank.to_string())
            .env(ENV_NCCL_ID, &id_hex)
            // Workers inherit stdout/stderr so rank 0's training log surfaces.
            .stdout(Stdio::inherit())
            .stderr(Stdio::inherit())
            .spawn()
            .map_err(|e| format!("spawn worker rank {rank}: {e}"))?;
        children.push((rank, child));
    }

    let mut first_err: Option<String> = None;
    for (rank, mut child) in children {
        let status = child
            .wait()
            .map_err(|e| format!("wait worker rank {rank}: {e}"))?;
        if !status.success() && first_err.is_none() {
            first_err = Some(format!("worker rank {rank} exited with {status}"));
        }
    }
    match first_err {
        Some(e) => Err(e),
        None => Ok(()),
    }
}

/// The four `XTRAIN_*` values a worker reads from its env. Present iff this
/// process was spawned by [`launch_processes`].
pub struct WorkerEnv {
    pub rank: usize,
    pub world: usize,
    pub local_rank: u32,
    pub id: NcclUniqueId,
}

/// Read the worker env if this process is a spawned worker (i.e. `XTRAIN_RANK`
/// is set), else `None` (this process is the launcher).
pub fn worker_env() -> Option<WorkerEnv> {
    let rank: usize = std::env::var(ENV_RANK).ok()?.parse().ok()?;
    let world: usize = std::env::var(ENV_WORLD)
        .expect("XTRAIN_WORLD set with XTRAIN_RANK")
        .parse()
        .expect("XTRAIN_WORLD parse");
    let local_rank: u32 = std::env::var(ENV_LOCAL_RANK)
        .expect("XTRAIN_LOCAL_RANK set with XTRAIN_RANK")
        .parse()
        .expect("XTRAIN_LOCAL_RANK parse");
    let id_hex = std::env::var(ENV_NCCL_ID).expect("XTRAIN_NCCL_ID set with XTRAIN_RANK");
    let id = hex_decode_unique_id(&id_hex);
    Some(WorkerEnv {
        rank,
        world,
        local_rank,
        id,
    })
}

/// Per-worker model construction knobs (the opt-in feature flags the launcher
/// forwards). Mirrors the closure `train_ddp` passes to the thread-per-GPU
/// `launch`, but here it runs once in this worker's own process/context.
#[derive(Clone, Copy, Default)]
pub struct ModelOpts {
    pub bf16: bool,
    pub recompute: bool,
    pub flash: bool,
}

/// Run this worker: bind its GPU (→ its own CUDA context), init NCCL with the
/// launcher-supplied id, build its model with the deterministic init (same as
/// every rank + the single-GPU baseline), and run `train_rank`. Reuses the T8
/// training step verbatim — the only difference from thread-per-GPU is how this
/// rank was started and how it got the `UniqueId`.
///
/// `valid` is the held-out corpus for rank 0's periodic eval (pass `None` on
/// other ranks or when `dcfg.eval_every == 0`).
pub fn run_worker(
    env: &WorkerEnv,
    cfg: Config,
    opts: ModelOpts,
    corpus: &Corpus,
    valid: Option<&Corpus>,
    dcfg: &DdpConfig,
) -> DdpResult {
    // Binding the device here establishes this process's own CUDA primary context.
    let ctx = DdpContext::init(env.rank, env.world, env.id, env.local_rank);
    let device = Device::Cuda(env.local_rank);
    let model = build_worker_model(cfg, opts, device);
    let v = if env.rank == 0 { valid } else { None };
    train_rank(&ctx, &model, device, corpus, v, dcfg)
}

/// Build the worker's model with the deterministic `build_model` init + the
/// opt-in feature flags. Shared by `run_worker` and the test worker.
pub fn build_worker_model(cfg: Config, opts: ModelOpts, device: Device) -> TinyTransformer {
    let mut m = build_model(cfg, device);
    if opts.bf16 {
        m = m.with_compute_dtype(DType::BF16);
    }
    if opts.recompute {
        m = m.with_recompute(true);
    }
    if opts.flash {
        m = m.with_flash(true);
    }
    m
}

/// Convenience: the directory tests/bins can stash per-rank result dumps in
/// (a worker writes its loss/params there; the launching test reads them back).
pub fn rank_dump_path(dir: &std::path::Path, rank: usize) -> PathBuf {
    dir.join(format!("rank{rank}.dump"))
}
280
crates/xtrain-distributed/tests/ddp_proc.rs
Normal file
@@ -0,0 +1,280 @@
//! Process-per-GPU DDP acceptance (Phase T17). Gated to a GPU host; skips when
//! fewer than 2 GPUs. Run with `--test-threads=1` (distributed tests deadlock if
//! they contend for the same GPUs in parallel — known harness property).
//!
//! Self-launching: the test binary detects WORKER mode via `XTRAIN_RANK` (set by
//! `launch_processes`). In worker mode it runs `run_worker` on a synthetic corpus
//! and dumps its per-step loss trace + final params to a per-rank file; in normal
//! mode it is the launcher — it runs the single-GPU baseline, spawns N worker
//! processes (re-execing itself), reads their dumps back, and asserts:
//!   (a) multi-process loss matches single-GPU within `<1e-3`,
//!   (b) cross-rank params agree within `<1e-6` (KI-5 ULP tolerance),
//!   (c) multi-process loss matches the thread-per-GPU `launch` path within `<1e-3`.

#![cfg(not(no_cuda))]

use std::io::Write;
use std::path::Path;

use xtrain_cuda::device;
use xtrain_distributed::proc::{launch_processes, rank_dump_path, worker_env};
use xtrain_distributed::{DdpConfig, DdpContext, build_model, train_rank};
use xtrain_model::{Config, batched_ids_tensor};
use xtrain_optim::GpuAdamW;
use xtrain_tensor::Device;
use xtrain_train::clip::clip_grad_norm_gpu;
use xtrain_train::data::Corpus;
use xtrain_train::schedule::LrSchedule;

// ── Shared fixture (identical on launcher + every worker, so they agree) ──────

const VOCAB: usize = 64;
const STEPS: usize = 20;

fn synth_corpus() -> Corpus {
    let tokens: Vec<i32> = (0..4096)
        .map(|i| (i * 7 + 3) as i32 % VOCAB as i32)
        .collect();
    Corpus {
        tokens,
        vocab_size: VOCAB,
    }
}

fn test_config() -> Config {
    let mut cfg = Config::tiny();
    cfg.vocab = VOCAB;
    cfg.n_layers = 2;
    cfg
}

fn dcfg(batch_size: usize) -> DdpConfig {
    DdpConfig {
        seq_len: 32,
        batch_size,
        accum_steps: 1,
        steps: STEPS,
        schedule: LrSchedule {
            max_lr: 3e-3,
            min_lr: 3e-4,
            warmup: 3,
            total: STEPS,
        },
        weight_decay: 0.1,
        max_grad_norm: 1.0,
        log_every: 1_000_000,
        seed: 7,
        eval_every: 0,
        eval_batches: 0,
        ckpt_path: None,
    }
}

// The dump dir is passed launcher→worker via this env key (separate from the
// XTRAIN_* keys the launcher sets); workers write `rank{N}.dump` there.
const ENV_DUMP_DIR: &str = "XTRAIN_TEST_DUMP_DIR";
const GLOBAL_BATCH: usize = 8;

// ── Worker entry: runs when this test binary is re-execed by launch_processes ─

fn run_as_worker_if_needed() {
    let Some(env) = worker_env() else { return };
    let dump_dir = std::env::var(ENV_DUMP_DIR).expect("dump dir env");
    // This is the worker body `run_worker` performs in production (init ctx →
    // build deterministic model → train_rank). We train ONCE inline so we can dump
    // both the loss trace AND the final params for the launcher to check; the
    // production `run_worker` wrapper is exercised by `bin/train_ddp_mp` on dash5.
    let ctx = DdpContext::init(env.rank, env.world, env.id, env.local_rank);
    let device = Device::Cuda(env.local_rank);
    let model = build_model(test_config(), device);
    let res = train_rank(
        &ctx,
        &model,
        device,
        &synth_corpus(),
        None,
        &dcfg(GLOBAL_BATCH),
    );
    let params: Vec<Vec<f32>> = model
        .params()
        .iter()
        .map(|p| p.value().to_device(Device::Cpu).as_slice::<f32>().to_vec())
        .collect();
    write_dump(&dump_dir, env.rank, &res.losses, &params);
    std::process::exit(0);
}

fn write_dump(dir: &str, rank: usize, losses: &[f32], params: &[Vec<f32>]) {
    let path = rank_dump_path(Path::new(dir), rank);
    let mut f = std::fs::File::create(&path).expect("create dump");
    // Line 1: losses (space-separated). Following lines: one param tensor each.
    let loss_line: Vec<String> = losses.iter().map(|x| format!("{x:.8e}")).collect();
    writeln!(f, "{}", loss_line.join(" ")).unwrap();
    for p in params {
        let line: Vec<String> = p.iter().map(|x| format!("{x:.8e}")).collect();
        writeln!(f, "{}", line.join(" ")).unwrap();
    }
}

fn read_dump(dir: &str, rank: usize) -> (Vec<f32>, Vec<Vec<f32>>) {
    let path = rank_dump_path(Path::new(dir), rank);
    let text = std::fs::read_to_string(&path).expect("read dump");
    let mut lines = text.lines();
    let losses: Vec<f32> = lines
        .next()
        .unwrap()
        .split_whitespace()
        .map(|s| s.parse().unwrap())
        .collect();
    let params: Vec<Vec<f32>> = lines
        .map(|l| l.split_whitespace().map(|s| s.parse().unwrap()).collect())
        .collect();
    (losses, params)
}

// ── Single-GPU baseline (same loop as the DDP rank, world=1) ──────────────────

fn run_single_gpu(cfg: Config, corpus: &Corpus, d: &DdpConfig) -> (Vec<f32>, Vec<Vec<f32>>) {
    device::set_device(0).unwrap();
    let device = Device::Cuda(0);
    let model = build_model(cfg, device);
    let params = model.params();
    let mut opt = GpuAdamW::new(d.weight_decay);
    let mut rng = d.seed;
    let mut losses = Vec::new();
    for step in 0..d.steps {
        let lr = d.schedule.lr(step);
        let mut inputs = Vec::with_capacity(d.batch_size);
        let mut targets_v = Vec::with_capacity(d.batch_size);
        for _ in 0..d.batch_size {
            let (input, target) = corpus.sample(d.seq_len, &mut rng);
            inputs.push(input);
            targets_v.push(target);
        }
        let ids = batched_ids_tensor(&inputs, device);
        let targets = batched_ids_tensor(&targets_v, device);
        let loss = model.loss_batched(&ids, &targets, d.batch_size);
        losses.push(loss.value().to_device(Device::Cpu).as_slice::<f32>()[0]);
        loss.backward();
        clip_grad_norm_gpu(&params, d.max_grad_norm, 1.0);
        opt.step(lr, &params);
        for p in &params {
            p.zero_grad();
        }
    }
    let host = params
        .iter()
        .map(|p| p.value().to_device(Device::Cpu).as_slice::<f32>().to_vec())
        .collect();
    (losses, host)
}

// ── The test (launcher mode) ──────────────────────────────────────────────────

#[test]
fn proc_per_gpu_matches_single_gpu_and_thread_path() {
    // If this process was spawned as a worker, do the worker job and exit before
    // the test framework runs anything else.
    run_as_worker_if_needed();

    let world = 2usize;
    if device::device_count().unwrap_or(0) < world as i32 {
        eprintln!("skip: need >= {world} GPUs");
        return;
    }

    let cfg = test_config();
    let corpus = synth_corpus();
    let d = dcfg(GLOBAL_BATCH);

    // (1) Single-GPU baseline over the global batch.
    let (single_losses, single_params) = run_single_gpu(cfg, &corpus, &d);

    // (2) Thread-per-GPU path (T8 `launch`) — the regression baseline to match.
    let thread_results =
        xtrain_distributed::launch(&[0u32, 1u32], &corpus, None, &d, move |device| {
            build_model(cfg, device)
        });
    let thread_losses = &thread_results[0].losses;

    // (3) Process-per-GPU: spawn 2 worker processes (re-exec of this test binary),
    // each dumps its loss trace + final params to a temp dir.
    let dump_dir = std::env::temp_dir().join(format!("xtrain_t17_{}", std::process::id()));
    std::fs::create_dir_all(&dump_dir).unwrap();
    // SAFETY: single-threaded test (forced by --test-threads=1) sets this env
    // before spawning workers; no concurrent env access.
    unsafe {
        std::env::set_var(ENV_DUMP_DIR, &dump_dir);
    }
    // Re-exec the test binary but run ONLY this test, single-threaded, so the
    // worker process does the worker job and exits without touching other tests.
    let worker_args = [
        "--exact".to_string(),
        "proc_per_gpu_matches_single_gpu_and_thread_path".to_string(),
        "--test-threads=1".to_string(),
        "--nocapture".to_string(),
    ];
    launch_processes(world, &worker_args).expect("worker processes failed");

    let (proc_losses0, proc_p0) = read_dump(dump_dir.to_str().unwrap(), 0);
    let (_proc_losses1, proc_p1) = read_dump(dump_dir.to_str().unwrap(), 1);

    // (a) process-per-GPU loss matches single-GPU.
    let max_rel_single = max_rel(&single_losses, &proc_losses0);
    println!(
        "proc-per-GPU vs single-GPU loss: single[last]={:.6} proc[last]={:.6} max_rel={max_rel_single:.2e}",
        single_losses.last().unwrap(),
        proc_losses0.last().unwrap()
    );
    assert!(
        max_rel_single < 1e-3,
        "proc-per-GPU loss diverged from single-GPU: {max_rel_single:.3e}"
    );

    // (c) process-per-GPU loss matches the thread-per-GPU path.
    let max_rel_thread = max_rel(thread_losses, &proc_losses0);
    println!(
        "proc-per-GPU vs thread-per-GPU loss: thread[last]={:.6} proc[last]={:.6} max_rel={max_rel_thread:.2e}",
        thread_losses.last().unwrap(),
        proc_losses0.last().unwrap()
    );
    assert!(
        max_rel_thread < 1e-3,
        "proc-per-GPU loss diverged from thread-per-GPU: {max_rel_thread:.3e}"
    );

    // (b) cross-rank parameter agreement (KI-5 ULP tolerance).
    let mut max_pdiff = 0.0f32;
    for (a, b) in proc_p0.iter().zip(&proc_p1) {
        for (x, y) in a.iter().zip(b) {
            max_pdiff = max_pdiff.max((x - y).abs());
        }
    }
    println!("proc-per-GPU cross-rank max |param diff| = {max_pdiff:.3e}");
    assert!(
        max_pdiff < 1e-6,
        "ranks' params drifted apart: {max_pdiff:.3e}"
    );

    // Bonus sanity: proc-per-GPU final params vs single-GPU within fp tolerance.
    let mut max_sdiff = 0.0f32;
    for (a, b) in proc_p0.iter().zip(&single_params) {
        for (x, y) in a.iter().zip(b) {
            max_sdiff = max_sdiff.max((x - y).abs() / y.abs().max(1e-6));
        }
    }
    println!("proc-per-GPU vs single-GPU max rel |param diff| = {max_sdiff:.3e}");
    assert!(
        max_sdiff < 1e-2,
        "proc-per-GPU params diverged from single-GPU"
    );

    let _ = std::fs::remove_dir_all(&dump_dir);
}

fn max_rel(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b)
        .map(|(s, d)| (s - d).abs() / s.abs().max(1e-6))
        .fold(0.0f32, f32::max)
}
266
docs/16-process-per-gpu.md
Normal file
@@ -0,0 +1,266 @@
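# Phase T17: Process-per-GPU DDP (torchrun-style independent CUDA contexts): Design Document

## Goal

T8's DDP is **single-process, thread-per-GPU**: one process starts N OS threads, each thread binds one GPU with `cudaSetDevice` and runs its own rank's training inside the **same CUDA primary context**. After T11 removed the per-op `cudaMalloc` serialization, 8-GPU scaling recovered from ~1.3× to **~5×@8**, but **a residual non-linearity remains (5×@8 rather than ~8×)**. The T11 doc / KI-5 already named the suspected root cause: **N rank threads share one CUDA context, and many driver-level calls (kernel launch, cuBLAS handle, stream queueing) are serialized process-wide inside that single context**. The pool allocator removed only the largest item (malloc); the launch / cuBLAS serialization is still there.

T17's goal is **torchrun-style process-per-GPU**: each rank is an **independent OS process** holding its **own CUDA context**, so the ranks' driver calls no longer queue in the same context → this removes the residual serialization of thread-per-GPU and pushes 8-GPU scaling closer to linear. It is the **largest change** in Phase 2 (a structural rewrite of the launcher plus a cross-process NCCL bootstrap), which is why this doc comes first.

**Scope (decided by the user): process-per-GPU ONLY. ZeRO-1 / sharded optimizer is explicitly dropped**: at this scale the optimizer state is small and the gain is thin. This task changes only the **launch model and the NCCL bootstrap**; the **training step (grad all-reduce → local AdamW) is reused as-is, with zero changes**.

**The thread-per-GPU path is kept**: T8's `launch()` + `train_ddp` bin are not removed (regression protection, and gate ① requires the losses of the old and new paths to agree). process-per-GPU is added as a **parallel, new launcher**.

Acceptance (all hard gates green, honest correctness, no loosened tolerances):

1. Multi-process (world=2 / world=4) training loss **matches single-GPU** (within the existing DDP tolerance `<1e-3`) and also matches the old thread-per-GPU path;
2. Cross-rank parameter agreement (the repo's existing `<1e-6` convention);
3. **Measured 8-GPU linearity, before→after**: thread-per-GPU baseline (~5×@8) vs process-per-GPU @ {1,2,4,8}, with numbers;
4. Full regression suite green (including the xserv closed loop: md5 / token-identical); no regression on single-GPU or on the old thread-per-GPU path.
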
## What changes, what does not

```
                            thread-per-GPU (T8, kept)                  process-per-GPU (T17, new)
launch                      1 process × N threads                      1 launcher process → fork/exec N worker processes
CUDA context                N threads share 1 primary context          1 independent context per worker process
rank/world/device           closure capture + thread::scope            env: XTRAIN_RANK / XTRAIN_WORLD / XTRAIN_LOCAL_RANK
model build                 build_model in each thread closure (!Send) build_model in each process's main (naturally isolated)
NCCL UniqueId distribution  move a Copy struct into the thread closure launcher generates → hex-encoded into child env
NCCL comm init              DdpContext::init (unchanged)               DdpContext::init (unchanged)
─────────────────────────────────────────────────────────────────────────────────────
grad all-reduce             all_reduce_average_grads (unchanged)       ← same code, zero changes
local AdamW step            train_rank (unchanged)                     ← same code, zero changes
batch sharding              i % world == rank (unchanged)              ← same code, zero changes
param-consistency argument  same init + same grad + same opt           ← same argument
```

**Core insight**: T8 already wrote the training step in a **per-rank** shape that takes a `&DdpContext` (`train_rank`). The **only difference** between thread-per-GPU and process-per-GPU is **how each rank is started and how the UniqueId is handed to it**: the former moves it across threads, the latter passes it across processes through env. `train_rank` / `all_reduce_average_grads` / sharding / the consistency argument are **all reused as-is**. This is the payoff of decoupling the launch model from the training logic.

## Module Layout

```
crates/xtrain-distributed/src/
├── lib.rs              ← add pub mod proc; re-export hex_encode/decode_unique_id + run_worker entry
├── proc.rs             ← new: ① launcher (spawn N worker processes, inject rank/world/local_rank/uid via env)
│                              ② worker entry (read env → DdpContext::init → build_model → train_rank)
│                              ③ UniqueId hex encode/decode (carries 128 bytes across processes via env)
├── ddp.rs              ← unchanged (train_rank / build_model / DdpConfig reused)
├── lib.rs::DdpContext / all_reduce_average_grads / get_unique_id   ← unchanged
└── bin/
    ├── train_ddp.rs    ← unchanged (thread-per-GPU, kept)
    └── train_ddp_mp.rs ← new: combined multi-process launcher / worker entry point
crates/xtrain-distributed/tests/
└── ddp_proc.rs         ← new: spawn several processes for a few steps → loss vs single-GPU + cross-rank param agreement + (in passing) before/after throughput
docs/16-process-per-gpu.md  ← this document
```

`proc.rs` is gated by `#[cfg(not(no_cuda))]` throughout (the crate's existing convention); on a machine without nvcc the crate compiles to nothing and `cargo check` passes; on dash5 the full build links NCCL.

## Key Design Decisions

### ① Launch model: one binary, two modes (launcher / worker)

`train_ddp_mp` is a single executable that decides its own role from **whether an environment variable is present** (the same idea as torchrun injecting `LOCAL_RANK` into its workers):

- **launcher mode** (invoked directly by the user or a test; `XTRAIN_RANK` is absent from env):
  1. Determine `world` from `CUDA_VISIBLE_DEVICES` / `device_count()`;
  2. Call `get_unique_id()` to generate one `ncclUniqueId` (128 bytes) and **hex-encode** it into a string;
  3. `for rank in 0..world`: `Command::new(current_exe())`, **copying its entire argv** (hyperparameters and paths pass through), additionally setting env `XTRAIN_RANK=rank`, `XTRAIN_WORLD=world`, `XTRAIN_LOCAL_RANK=rank`, `XTRAIN_NCCL_ID=<hex>`, and spawning it as a child process;
  4. `wait()` on all children; if any exit code is non-zero → the launcher exits non-zero (so CI and the gates can see it).
- **worker mode** (spawned by the launcher; `XTRAIN_RANK` is present in env):
  1. Read `rank / world / local_rank / uid_hex` from env;
  2. `device::set_device(local_rank)` binds the GPU (the **per-process primary context** is created at this first CUDA call);
  3. Hex-decode the `NcclUniqueId`; `DdpContext::init(rank, world, id, local_rank)` (**reuses T8's init**);
  4. `build_model(cfg, device)` (**reuses T8's deterministic init** → same seed → bit-identical starting point across processes);
  5. `train_rank(&ctx, &model, …, &cfg)` (**reuses T8's training step, zero changes**);
  6. Exit code 0 (success) / non-zero (panic → the process dies and the launcher notices).
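
The role decision and env hand-off described above can be sketched as follows. This is a minimal illustration, not the code in `proc.rs`: `Role`, `detect_role`, and `worker_env` are hypothetical names, and the environment is passed in as a map so the logic can be exercised without spawning processes. Only the four `XTRAIN_*` variable names come from this document.

```rust
use std::collections::HashMap;

/// Role of the current process, decided only by the presence of XTRAIN_RANK.
#[derive(Debug, PartialEq)]
enum Role {
    Launcher,
    Worker { rank: usize, world: usize, local_rank: usize, uid_hex: String },
}

/// Read one required numeric variable from the (injected) environment map.
fn parse_usize(env: &HashMap<String, String>, key: &str) -> Result<usize, String> {
    env.get(key)
        .ok_or_else(|| format!("missing {key}"))?
        .parse::<usize>()
        .map_err(|e| format!("bad {key}: {e}"))
}

/// No XTRAIN_RANK means launcher; otherwise all four variables must be valid.
fn detect_role(env: &HashMap<String, String>) -> Result<Role, String> {
    if !env.contains_key("XTRAIN_RANK") {
        return Ok(Role::Launcher);
    }
    let rank = parse_usize(env, "XTRAIN_RANK")?;
    let world = parse_usize(env, "XTRAIN_WORLD")?;
    let local_rank = parse_usize(env, "XTRAIN_LOCAL_RANK")?;
    let uid_hex = env
        .get("XTRAIN_NCCL_ID")
        .cloned()
        .ok_or_else(|| "missing XTRAIN_NCCL_ID".to_string())?;
    if rank >= world {
        return Err(format!("rank {rank} out of range for world {world}"));
    }
    Ok(Role::Worker { rank, world, local_rank, uid_hex })
}

/// Variables the launcher would set on the child for `rank`
/// (single node, so local_rank == rank).
fn worker_env(rank: usize, world: usize, uid_hex: &str) -> Vec<(String, String)> {
    vec![
        ("XTRAIN_RANK".to_string(), rank.to_string()),
        ("XTRAIN_WORLD".to_string(), world.to_string()),
        ("XTRAIN_LOCAL_RANK".to_string(), rank.to_string()),
        ("XTRAIN_NCCL_ID".to_string(), uid_hex.to_string()),
    ]
}
```

In a real launcher the pairs from `worker_env` would presumably be passed to `std::process::Command::envs` on a command built from `std::env::current_exe()`, and a worker would build the map with `std::env::vars().collect()`.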
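
**Single-node `CUDA_VISIBLE_DEVICES` handling**: the set of visible devices seen by the launcher is `0..world`; every worker inherits the same `CUDA_VISIBLE_DEVICES` (env is passed through by default), and `local_rank` is used directly as the device ordinal within the visible set → `set_device(local_rank)`. This has the same semantics as thread-per-GPU's `devices = 0..count` and is sufficient for a single node. (True multi-node requires separating `LOCAL_RANK` from the global `RANK` plus slicing `CUDA_VISIBLE_DEVICES` per node; a single node does not need this, so it is recorded as a follow-up.)
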
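### ② Cross-process NCCL UniqueId distribution: generated by the launcher + injected via hex env (**simplest, race-free**)

This is the part of T17 that most needs careful thought. The candidate mechanisms (the task listed file / TCP / shared FS) are weighed one by one:

| Mechanism | How | Single-node trade-off |
|---|---|---|
| **Shared file** | The rank-0 worker writes `/tmp/xtrain.id`; the other workers **poll and read** | Must handle the "file not written yet" race (polling + retry + timeout), and workers must agree on who is rank 0 and when to clean up |
| **TCP rendezvous** | Start a small c10d-store-style server that hands out the id | Closest to torchrun, but requires a socket server/client, port selection, and a handshake protocol: overkill on a single node |
| **Launcher generates + env injection** ✅ | The **launcher** (not the rank-0 worker) calls `ncclGetUniqueId`, hex-encodes the result, and **writes it into each child's env at spawn time** | No file race, no polling, no TCP server, no cleanup: the env is ready before the child is born, and the child gets the id just by reading env |

**Env injection is chosen**, for an honest reason: on a single node the launcher is the **common parent** of all workers, env is the plainest parent→child out-of-band channel, and it is **fixed atomically at the moment the child is created**. There is inherently no "id not ready yet" race, so it is both simpler and more robust than file polling or a TCP handshake. The cost is that the launcher process must link NCCL to call `ncclGetUniqueId` (it already lives in the distributed crate, which links NCCL), which is acceptable.

> **Relation to "rank 0 generates"**: conceptually, torchrun has rank 0 put the id into the c10d store and the other ranks fetch it. Here the **launcher acts as coordinator** and generates it on rank 0's behalf. This is functionally equivalent (the id is just a one-shot handshake token; who generates it does not affect correctness as long as every rank receives the same one), but on a single node it saves another round of out-of-band synchronization between workers. **128 bytes → hex = 256 characters**, far below the environment-variable length limit, so passing it through env is safe.

`hex_encode`/`decode_unique_id` are two pure functions in `proc.rs` (`[c_char;128] ↔ 256-char hex`); a unit test can verify the roundtrip on the host (no GPU needed).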
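
A roundtrip of this kind might look like the sketch below. It is an illustration under assumptions, not the functions in `proc.rs`: it works on `[u8; 128]` rather than `[c_char; 128]` (the signedness of `c_char` is platform dependent), and the names `UID_BYTES` and `hex_decode` are hypothetical.

```rust
/// Size of an ncclUniqueId in bytes, as stated in this document.
const UID_BYTES: usize = 128;

/// Encode the id as 256 lowercase hex characters (two per byte).
fn hex_encode(bytes: &[u8; UID_BYTES]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(UID_BYTES * 2);
    for &b in bytes {
        out.push(DIGITS[(b >> 4) as usize] as char);
        out.push(DIGITS[(b & 0x0f) as usize] as char);
    }
    out
}

/// Decode 256 hex characters back into the 128-byte id.
/// Rejects wrong lengths and non-hex characters instead of panicking.
fn hex_decode(s: &str) -> Result<[u8; UID_BYTES], String> {
    let raw = s.as_bytes();
    if raw.len() != UID_BYTES * 2 {
        return Err(format!("expected {} hex chars, got {}", UID_BYTES * 2, raw.len()));
    }
    let nibble = |c: u8| -> Result<u8, String> {
        match c {
            b'0'..=b'9' => Ok(c - b'0'),
            b'a'..=b'f' => Ok(c - b'a' + 10),
            b'A'..=b'F' => Ok(c - b'A' + 10),
            _ => Err(format!("invalid hex byte 0x{c:02x}")),
        }
    };
    let mut out = [0u8; UID_BYTES];
    for (i, pair) in raw.chunks_exact(2).enumerate() {
        out[i] = (nibble(pair[0])? << 4) | nibble(pair[1])?;
    }
    Ok(out)
}
```

Validating the length up front means a truncated or corrupted `XTRAIN_NCCL_ID` fails in the worker with a clear message before any NCCL call is made.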
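
### ③ Independent CUDA contexts = removing the residual serialization (the intended payoff of T17)

The residual non-linearity of thread-per-GPU (KI-5 / T11 doc) is attributed to: **N rank threads share one CUDA primary context**, and many driver operations on that context (kernel-launch queue, cuBLAS handle, internal locks) are serialized at process / context level. The pool allocator removed malloc, the largest item, but the launch / cuBLAS serialization remains and shows up as ~5× instead of ~8× on 8 GPUs.

Under process-per-GPU, **each rank is an independent process → independent CUDA context → independent driver state**: the kernel launches / cuBLAS calls of different processes **do not queue in the same context**, so the residual serialization should (under this hypothesis) be removed structurally. This is exactly what gate ③ (before→after linearity) is meant to measure: if process-per-GPU pushes 8 GPUs clearly above ~5×, the hypothesis is confirmed. **Honesty principle**: if the gain is limited, report it as is (and state that the residual bottleneck lies in NCCL all-reduce / PCIe topology, which is a different layer and outside this task's scope).

> ⚠️ **This hypothesis was falsified by measurement**. See "Measured results · Gate ③" in this document: process-per-GPU and thread-per-GPU throughput are statistically indistinguishable (both ~5.3×@8), and all 8 GPUs sit at 95–99% util. The residual non-linearity is a communication/PCIe wall, not single-context serialization. The conclusion is settled and kept on record.
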
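### ④ Training step / consistency argument: reused from T8 as-is, zero changes

process-per-GPU does not touch any training math:

- **grad all-reduce**: `all_reduce_average_grads(params)` is unchanged. NCCL collectives across **processes** and across **threads** look identical to the caller (a comm is per rank and is independent of process vs thread).
- **batch sharding**: `i % world == rank` is unchanged. Every process advances an **RNG with the same seed**, draws the whole batch of `B_global` sequences, and computes only its own shard. The union over processes equals the single-GPU batch in the same order → the gradient sum after all-reduce matches single-GPU sequence by sequence.
- **Parameter consistency**: the same three sufficient conditions (T8 doc ④): (a) the same deterministic `build_model` (same LCG seed, which holds across processes as well); (b) NCCL all-reduce returns the same reduction on every rank (on this PCIe-only machine there is run-to-run jitter of a few ULP, so gate ② uses `<1e-6` rather than `==0`, consistent with the existing T11 convention); (c) the same optimizer hyperparameters and state evolution.
- **vs single-GPU**: the only difference is the **floating-point summation order** (single-GPU: the tape SUMs B sequences; DDP: each rank SUMs its shard, then NCCL SUMs across ranks) → `<1e-3` rel, not bit-exact. Against the thread-per-GPU path the numbers should be of the **same magnitude** (same sharding + same all-reduce).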
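
The sharding rule and the summation-order argument above can be made concrete with a small host-side sketch. The helper names `shard_indices` and `sharded_sum` are hypothetical, and plain `f32` values stand in for per-sequence gradient contributions; the real code reduces tensors through NCCL.

```rust
/// Indices of the global batch that `rank` owns under `i % world == rank`.
fn shard_indices(b_global: usize, world: usize, rank: usize) -> Vec<usize> {
    (0..b_global).filter(|i| i % world == rank).collect()
}

/// Sum in DDP order: each rank sums its own shard, then the per-rank
/// partial sums are added (standing in for the NCCL SUM across ranks).
fn sharded_sum(values: &[f32], world: usize) -> f32 {
    (0..world)
        .map(|rank| {
            shard_indices(values.len(), world, rank)
                .iter()
                .map(|&i| values[i])
                .sum::<f32>()
        })
        .sum()
}
```

The shards are disjoint and together cover `0..b_global`, so `sharded_sum` adds exactly the same terms as a sequential sum over the whole batch, only in a different order. That reordering is the sole source of the small floating-point gap against single-GPU.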
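
### ⑤ Process lifecycle / failure propagation / resource cleanup

- **Failure propagation**: worker panic → the process exits non-zero; the launcher's `wait()` collects every exit code, and if any is non-zero the launcher exits non-zero and prints which rank died. The NCCL comm's context is reclaimed by the OS when the process exits (`DdpContext::Drop` calls `ncclCommDestroy` on the normal exit path; on a crash the OS reclaims it as a fallback).
- **No cross-process barrier needed**: every worker runs its `cfg.steps` independently and exits on its own; an NCCL collective is itself a synchronization point (it returns only once all ranks have arrived), so the training loops stay aligned naturally.
- **Resource cleanup**: no temporary files (env injection, no `/tmp` id file); the ckpt is written by the rank-0 worker to the path given by `--ckpt`, same as thread-per-GPU; ckpts and processes used by tests are cleaned up when the test ends.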
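
The failure-propagation rule can be sketched as a pure function over the collected exit codes. This is an assumption-laden illustration rather than the launcher's actual code: `failed_ranks` and `launcher_exit_code` are hypothetical names, and each entry is taken to be what `std::process::ExitStatus::code()` returned for that rank's child.

```rust
/// `codes[rank]` is the child's `ExitStatus::code()`:
/// Some(0) = success, Some(n) = non-zero exit, None = no exit code
/// (for example, terminated by a signal on Unix).
fn failed_ranks(codes: &[Option<i32>]) -> Vec<usize> {
    codes
        .iter()
        .enumerate()
        .filter(|(_, c)| **c != Some(0))
        .map(|(rank, _)| rank)
        .collect()
}

/// Exit code the launcher should return: 0 only if every worker succeeded.
/// Also reports which ranks failed, as the design calls for.
fn launcher_exit_code(codes: &[Option<i32>]) -> i32 {
    let failed = failed_ranks(codes);
    if failed.is_empty() {
        0
    } else {
        eprintln!("worker rank(s) {failed:?} failed");
        1
    }
}
```

Treating a missing exit code as a failure keeps a worker that was killed by a signal from being mistaken for a clean exit.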
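
## Verification (all hard gates green, run on dash5)

### Gates ①②: correctness, `tests/ddp_proc.rs` (`#[cfg(not(no_cuda))]`, skipped with <2 GPUs)

The test itself acts as the launcher: it uses `Command` to spawn N worker processes (a worker is either a special mode of the same test binary or a reuse of `train_ddp_mp`), runs a fixed number of steps, has each worker dump its final loss / parameters to its own stdout / temp file, and the parent test process reads them back:

- **(a) loss vs single-GPU**: single-GPU baseline (the existing `run_single_gpu`) vs 2-process / 4-process DDP, with `max_rel < 1e-3` over the whole loss trajectory (the same tolerance as the thread-per-GPU test).
- **(b) cross-rank parameter agreement**: `max|p_i - p_j| < 1e-6` (the existing KI-5 convention).
- **(c) matches the thread-per-GPU path**: with the same config and seed, the process-per-GPU loss trajectory should be within `<1e-3` of the thread-per-GPU loss trajectory (the two differ only in process vs thread; sharding and all-reduce are the same).

> **Harness note**: distributed tests that run in parallel on shared GPUs contend for devices and deadlock → always use `--test-threads=1` (a known harness property, recorded in capstone/known-issues).
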
### Gate ③: linearity before→after, `train_ddp` (thread) vs `train_ddp_mp` (process) @ {1,2,4,8}

Fix **per-GPU batch 32 / seq 256 / dim384** (the same setup as the T11 KI-5 table, so the numbers compare directly) and measure steady-state tok/s for each:

```
        thread-per-GPU (T11 baseline)              process-per-GPU (T17)
world   tok/s(global)   speedup                    tok/s(global)   speedup
1       ~92K            1.00×                      ?               1.00×
2       ~147K           1.59×                      ?               ?
4       ~270K           2.92×                      ?               ?
8       ~461K           4.99×  ← residual non-linearity   ?        ?  ← target: closer to 8×
```

During the 8-GPU run, sample `nvidia-smi` to confirm the utilization of all 8 GPUs. **Resource discipline**: the linearity bench legitimately uses all 8 GPUs for a short time, but keep the runs **short** (a few dozen to one or two hundred steps per world size is enough to measure steady state) and clean up ckpts / intermediates afterwards.

### Gate ④: full regression suite (standard `--test-threads=1`)

autograd / structural / batched / bf16 / recompute / overfit / AdamW / existing DDP loss-match + cross-rank / flash / gqa / grad_accum / dropout, **+ the xserv closed loop** (export → md5 vs registry → token-identical). Single-GPU and the old thread-per-GPU path must not regress (process-per-GPU is an **added** path and the old path's code is untouched → no regression by construction, confirmed by the tests).

### Running on dash5

```bash
export PATH=/usr/local/cuda/bin:/opt/wjh/.cargo/bin:$PATH
# Correctness (multi-process):
CUDA_VISIBLE_DEVICES=0,1 cargo test -p xtrain-distributed --release --test ddp_proc -- --nocapture --test-threads=1
# Multi-process training / linearity driver (process-per-GPU launcher):
CUDA_VISIBLE_DEVICES=0,1,2,3 cargo run -p xtrain-distributed --release --bin train_ddp_mp -- \
  /opt/wjh/models/gpt2/tokenizer.json data/tinystories-valid-3mb.txt \
  --dim 384 --heads 12 --head-dim 32 --layers 12 --ffn 1536 --steps 200 --batch 128 --seq 256
```

The measured numbers are in the "Measured results" section of this document.

## Measured results (dash5, 8× RTX 5090, sm_120)

### Correctness (gates ①②④ all green)

- **Gate ① loss vs single-GPU / vs thread path** (`ddp_proc`, world=2, synthetic corpus, 20 steps): proc-per-GPU vs single-GPU `max_rel = 5.67e-7`; **proc-per-GPU vs thread-per-GPU `max_rel = 1.5e-7`** (the two paths agree to the same order of magnitude, as expected: they differ only in process vs thread, with the same sharding and all-reduce).
- **Gate ② cross-rank parameters**: `max|p0−p1| = 1.19e-7` (< 1e-6, the existing KI-5 ULP tolerance for PCIe NCCL run-to-run jitter).
- **Gate ④ full regression**: the whole workspace is green under `--test-threads=1` (autograd/structural/batched/bf16/recompute/overfit/AdamW/existing DDP/flash/gqa/grad_accum/dropout) + **xserv closed loop**: the v3 ckpt re-exported to safetensors with the T17 code is **md5 bit-identical to the registry, `b04fc9f9a0c9af04c47d9ca649aea12e`** (T17 touches no numerical path → identical by construction).

### Gate ③ linearity before→after: **the key finding of this task is that process-per-GPU is throughput-neutral at this scale**

Fixed per-GPU batch 32 / dim384 / seq256 / 150 steps (the same setup as the T11 KI-5 table), steady-state tok/s:

| world | thread-per-GPU (`train_ddp`), tok/s | speedup | process-per-GPU (`train_ddp_mp`), tok/s | speedup |
|---|---|---|---|---|
| 1 | 93257 | 1.00× | 92952 | 1.00× |
| 2 | 149747 | 1.61× | 148809 | 1.60× |
| 4 | 278276 | 2.98× | 273308 | 2.94× |
| 8 | **491360** | **5.27×** | **493128** | **5.31×** |

(world=8 was repeated twice to rule out noise: thread 493671/493292, proc 491102/494123. **The difference between the two paths is < 1%, within run-to-run noise.**)
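
The speedup column is simply each row's tok/s divided by the world=1 tok/s of the same path. As a hedged illustration of that arithmetic, the sketch below also derives parallel efficiency (speedup / world), which is not a column in the table; the function name `scaling` and the constants are hypothetical, while the tok/s values are copied from the table.

```rust
/// For each (world, tok/s) row, return (world, speedup, efficiency), where
/// speedup = tok/s divided by the world=1 tok/s and efficiency = speedup / world.
fn scaling(rows: &[(u32, f64)]) -> Vec<(u32, f64, f64)> {
    let base = rows
        .iter()
        .find(|(w, _)| *w == 1)
        .expect("need a world=1 row")
        .1;
    rows.iter()
        .map(|&(w, tok_s)| {
            let speedup = tok_s / base;
            (w, speedup, speedup / f64::from(w))
        })
        .collect()
}

// Steady-state tok/s taken from the table above.
const THREAD: [(u32, f64); 4] = [(1, 93257.0), (2, 149747.0), (4, 278276.0), (8, 491360.0)];
const PROC: [(u32, f64); 4] = [(1, 92952.0), (2, 148809.0), (4, 273308.0), (8, 493128.0)];
```

At world=8 both paths come out at roughly 66% parallel efficiency, so about a third of ideal linear throughput is missing on either path.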
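
→ **process-per-GPU and thread-per-GPU throughput are statistically indistinguishable (both ~5.3×@8)**. Design hypothesis ③ of this doc ("the residual 5×@8 comes from kernel-launch/cuBLAS serialization inside a single CUDA context, and giving each rank its own context via process-per-GPU removes it") is **falsified by measurement**. This is exactly the "honesty principle" branch reserved in ③.

**Root cause relocated (backed by measurement)**: during the proc-per-GPU world=8 run, `nvidia-smi` sampling shows **all 8 GPUs at 95–99% util** (~23GB per GPU). The GPUs are **already compute-bound and fully fed, not idling behind a serial bottleneck** (the serialization problem of KI-5, where only 1–2 of 8 GPUs were busy, was already cured by T11's caching allocator). With all 8 GPUs saturated yet only 5.3× achieved, the missing ~35% of throughput can only be going to **the per-step grad all-reduce plus the communication overhead of this machine's PCIe-only topology at 8 ranks**, that is, the "~7% all-reduce + 8-GPU PCIe headroom" layer that T11 already pointed out, amplified at 8 GPUs. Switching to independent contexts does not touch this layer, so throughput does not change.

**This is consistent with T11's own methodology**: T11 falsified "bucketed all-reduce" by measurement; T17 falsified "process-per-GPU removes the residual serialization" by measurement. Both times a hypothesis was overturned by profiling/measuring rather than by pushing the change through. **Conclusion**: at this scale (dim384–1024, single node, 8× PCIe RTX 5090) the residual non-linearity is a **communication/topology wall**, not the launch model. Getting closer to linear requires all-reduce overlap or a faster interconnect (NVLink), which is a separate line of work and **outside T17's scope**.

**Net value of T17 (honest accounting)**: ① the torchrun-style process-per-GPU path, a standard link in the training stack (independent processes + independent CUDA contexts + cross-process NCCL bootstrap), was learned and landed, so **the project's core goal of "learning the full training stack" is met**; ② **measurement pinned down the long-standing conjecture in the KI-5/T11 docs that "process-per-GPU is the fix for the residual non-linearity" as "no throughput gain at this scale"**, removing a misleading backlog item; ③ zero correctness regressions, numerically aligned with the thread path. **In throughput it is equivalent to thread-per-GPU**, so the default training path is **unchanged** (thread-per-GPU remains the path used by v1–v8), with process-per-GPU kept as a parallel, optional path and this diagnosis kept on record.
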
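## Not doing (out of scope for this task, recorded as follow-ups)

- **ZeRO-1 / sharded optimizer**: dropped by the user (at this scale the optimizer state is small and the gain is thin).
- **True multi-node bootstrap**: this task is single-node (env injection is enough); going across nodes requires a TCP rendezvous (c10d-store style) + separating `LOCAL_RANK` from `RANK` + slicing `CUDA_VISIBLE_DEVICES` per node → left as a follow-up.
- **NCCL communication compression / overlap with backward**: scoped out at design time for the same reason as in T8/T11 (all-reduce was not considered the main bottleneck). The gate ③ measurement now points at communication as the residual wall, but addressing it is a separate line of work, not T17.
- **Removing the thread-per-GPU path**: kept (regression baseline, and gate ① requires the two paths to agree).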
@@ -28,6 +28,7 @@
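| T15 | Model architecture | **True GQA** (`num_kv_heads<num_heads`: wk/wv project to `kv_dim`; a new `repeat_kv` broadcast op copies K/V `group=nh/num_kv` times and feeds the **unmodified** composed/flash SDPA paths; the grouping convention matches xserv's repeat_kv, `dst=kvh·group+r`); `repeat_kv` backward = **deterministic sum** over the group rows (no atomics) → gradients of several q heads flow into one kv head; `num_kv_heads` enters Config (default = nh → MHA), `--kv-heads` flag, export writes the real `num_key_value_heads` (Phase 2) | repeat_kv grad-check 2.1e-4 (group 3) + group-1 identity bit-exact; GQA flash==composed, fp32 grad 4.1e-5 / bf16 within band; **group 1 bit-identical to MHA** (regression protection); PyTorch GQA B>1 cross-check for composed/flash: loss 1.7e-8 / logits 2.3e-5 / 25 grads within rtol each; small GQA (8h/2kv) trained 600 steps, 10.9→3.15, coherent; **xserv closed loop with true GQA** (num_kv 2<8): 2/3 prompts token-identical, 1 diverges late at a BF16 drift point; default MHA export md5 bit-identical (b04fc9f9) |
| T16 | Algorithm/Infra | **Gradient accumulation** (N micro-steps: each micro-loss is scaled `×1/N` before backward, the tape SUM accumulates → one AdamW step + zero; `--accum-steps`); **DDP all-reduces only at the accumulation boundary** (intermediate micro-steps issue no NCCL call; `/world` and `1/N` are orthogonal); memory follows the micro batch, not the effective batch | Matches the equivalent big batch **to fp rounding** (loss rel 8.5e-8, grad rel 3.8e-5); `accum=1` bit-exact regression (0.00); DDP+accum vs single-GPU loss 5.7e-7 / cross-rank consistent; **memory stays flat**: at the same effective batch 64, big-batch 27.7GB → accum (4×16) **7.2GB (−74%)** (big-batch OOMs where accum fits); full regression + xserv closed-loop md5 identical |
| T18 | Algorithm | **dropout** (hand-written counter-based device RNG → Bernoulli mask; inverted 1/(1-p) scaling in training, identity in eval); new autodiff `dropout` op (fwd generates and applies the mask, bwd reuses the same mask), wired in at two places, residual and ffn; `--dropout` flag, default 0 | grad-check passes with a fixed seed; E[out]≈input and keep≈1-p; **p=0 bit-identical to no dropout**; gradients still bit-identical when combined with recompute (T13) (the counter-based seed reproduces the same mask on recompute); full regression + xserv closed loop green (dropout off for export/inference) |
| T17 | Infra | **process-per-GPU** (torchrun-style: `launch_processes` spawns one worker process per GPU = independent CUDA context; the launcher mints the `ncclUniqueId` once and **injects it hex-encoded into the child env**, with no shared FS/TCP and no race; the worker reads env → binds the device → `DdpContext::init` + `build_model` + `train_rank`, **all reused from T8 with zero changes**; new `train_ddp_mp` bin / `ddp_proc` test, **old thread-per-GPU path kept**); scope = process-per-GPU only (ZeRO-1 dropped by the user) (Phase 2) | Correctness all green: proc vs single-GPU loss 5.67e-7, **proc vs thread-per-GPU 1.5e-7**, cross-rank 1.19e-7 (<1e-6), full regression + xserv closed-loop md5 bit-identical `b04fc9f9`. **⚠️ Key finding (original hypothesis falsified by measurement): process-per-GPU is throughput-neutral at this scale**: thread vs proc @ {1,2,4,8} = {1.00/1.61/2.98/**5.27**}× vs {1.00/1.60/2.94/**5.31**}× (difference <1%, within noise); all 8 GPUs at 95–99% util ⇒ the residual ~5.3×@8 non-linearity is the **NCCL all-reduce + local PCIe topology wall**, **not** single-CUDA-context serialization (the conjecture in the KI-5/T11 docs is overturned; same methodology as T11 falsifying "bucketed all-reduce"). Net value = landing the standard torchrun-style path + closing a misleading backlog item by measurement; default training path unchanged |

---
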
@@ -55,7 +56,7 @@
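
- **Algorithm**: hand-written autograd (tape) + fan-out accumulation → AdamW/LR-sched/grad-clip → +QK-norm (Qwen3) → batched forward → bf16 mixed precision (fp32 master) → activation recompute (T13) → fused flash-attention (T14, online softmax + flash-style bwd) → gradient accumulation (T16, reuses the tape SUM; equivalent to a big batch while memory follows the micro batch) → dropout (T18, counter-based device RNG + inverted scaling, train/eval switch).
- **Model architecture**: fixed Qwen3-style; dim **32→256→384→512→768→1024** (v8 is the first move along the capacity axis, heads 24→32); core parameters **41K→226M** (total 3.26M→329M). +QK-norm (T9, Qwen3-compatible) → **true GQA (T15, `num_kv_heads<num_heads`, repeat_kv broadcast + within-group gradient sum; default = nh → MHA, bit-exact regression)**. The architecture now covers the modern LLM standard set (MHA/GQA/MQA along a single `num_kv_heads` axis); both SDPA paths (composed/flash) share the same broadcast, the export writes the real `num_key_value_heads`, and the xserv loop closes.
- **Infra**: single-GPU fp32 → cuBLAS/GPU-optim (T7) → NCCL DDP (T8) → batched forward (T10) → caching allocator (T11) → bf16 (T12) → activation recompute (T13, unlocks dim1024) → flash-attention (T14, no materialized N×N; attention memory savings grow with seq) → gradient accumulation (T16, DDP communicates only at the accumulation boundary; memory follows the micro batch, not the effective batch). Throughput **3.3K→217K tok/s** (dim768 bf16), dim1024 + recompute ~129K (recompute tax); MFU **0.4%→17%** (every improvement corresponds to one piece of perf infrastructure; see known-issues + the MFU analysis). T13/T14/T16 are three **memory levers** (recompute lowers the activation peak, flash avoids materializing the N×N attention scores, gradient accumulation decouples effective batch from activation memory) and can be stacked to enlarge the effective batch.
- **Infra**: single-GPU fp32 → cuBLAS/GPU-optim (T7) → NCCL DDP (T8) → batched forward (T10) → caching allocator (T11) → bf16 (T12) → activation recompute (T13, unlocks dim1024) → flash-attention (T14, no materialized N×N; attention memory savings grow with seq) → gradient accumulation (T16, DDP communicates only at the accumulation boundary; memory follows the micro batch, not the effective batch) → process-per-GPU (T17, torchrun-style independent processes/CUDA contexts, reusing T8's train_rank with zero changes). Throughput **3.3K→217K tok/s** (dim768 bf16), dim1024 + recompute ~129K (recompute tax); MFU **0.4%→17%** (every improvement corresponds to one piece of perf infrastructure; see known-issues + the MFU analysis). T13/T14/T16 are three **memory levers** (recompute lowers the activation peak, flash avoids materializing the N×N attention scores, gradient accumulation decouples effective batch from activation memory) and can be stacked to enlarge the effective batch. **T17 measured = a negative result on the books**: process-per-GPU is throughput-**neutral** at this scale (thread ~5.27× vs proc ~5.31×@8, difference <1%, within noise), all 8 GPUs at 95–99% util ⇒ the residual non-linearity is the NCCL/PCIe communication wall, **not** single-context serialization. This overturns, by measurement, the conjecture long carried in the KI-5/T11 docs that "process-per-GPU is the fix for the residual serialization" (same methodology as T11 falsifying "bucketed all-reduce").
- **Dataset**: 3MB TinyStories slice → full TinyStories (epoch 0.01→5.33, **to saturation**) → **v6 graduates to real web text from FineWeb-edu** (2.255B corpus, 1.02ep) → **v7 same subset for more epochs (1.45ep, near the ceiling) → v8 same subset with a larger model** (dim1024, 1.05ep). The tokenizer is gpt2 BPE throughout (reusing xserv-tokenizer; v6 deliberately keeps the tokenizer to isolate the "data source" variable, and KI-4 is left for a later version).
- **The qualitative change on the data axis, v5→v6**: v0–v5 all consumed synthetic children's stories (TinyStories: low entropy, controlled vocabulary), and v5 showed that a model of that size had saturated on it; v6 is the first version to switch to **real educational web text** (FineWeb-edu), a qualitative change in the kind of language: samples go from "can only write little stories" to "can write history / science / expository text".
- ⚠️ **Multiple epochs on the same subset also hit a ceiling (v6→v7)**: v6 had trained only 1.02ep on FineWeb and its val loss was still decreasing monotonically at the last step, which was read as "not fed enough yet"; v7 fed the **same 2.255B subset** up to 1.45ep (~1B more tokens), and FineWeb val dropped by only 0.05 (3.07→3.01), flattened after ~step 44000, and the samples showed no qualitative change ⇒ **this subset is already near its ceiling at dim768**. This is the **same kind of phenomenon** as v5's data-volume saturation on TinyStories: **re-feeding old data has thin marginal returns, whether it is v5's multiple epochs over the same corpus or v7's multiple epochs over the same subset**. What actually raised the ceiling was v6's switch to a broader new corpus: **the lever is "more diverse new tokens", not "reading the same data a few more times"**. To keep lowering val, **new FineWeb shards** (more diverse, non-repeating) must be added, not more epochs on the same subset.
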
@@ -66,5 +67,6 @@
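
## 4. Perf lever ledger (see [known-issues.md](known-issues.md))

- **Fixed**: KI-1 single-sequence launch-bound (T10) · KI-5 per-op cudaMalloc serialization (T11) · KI-2 bf16/OOM (T12) · KI-3 activation recompute (T13, unlocks dim1024, used by v8).
- **To do**: KI-4 large vocab table, small vocab · process-per-GPU (when higher multi-GPU linearity is needed).
- Two rounds of "profile first, then act" falsified wrong proposed fixes (KI-1 "increase the batch", KI-5 "bucketed all-reduce") and avoided pointless large changes: profile-first.
- **Closed by measurement (negative result)**: process-per-GPU (T17). It used to be listed in the KI-5/T11 docs as the proposed fix for the residual non-linearity; T17 measured it as **throughput-neutral** (thread ~5.27× vs proc ~5.31×@8, all 8 GPUs saturated), and the residual is the NCCL/PCIe communication wall, not context serialization → no longer a perf to-do; the path itself has landed and stays as an optional path.
- **To do**: KI-4 large vocab table, small vocab (an accepted modeling trade-off) · higher multi-GPU linearity → all-reduce overlap / NVLink interconnect (not a priority at this scale).
- **Three rounds of "profile/measure first, then act" falsified wrong proposed fixes** (KI-1 "increase the batch", KI-5 "bucketed all-reduce", T17 "process-per-GPU removes the residual serialization") and avoided pointless large changes: profile/measure-first.
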
@@ -13,6 +13,26 @@ _(KI-1 fixed in T10. KI-5 fixed in T11. KI-2 fixed in T12. **KI-3(激活重计
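
## Fixed

### process-per-GPU (torchrun-style independent CUDA contexts): `CLOSED / measured negative result` (T17)

- **Background**: after KI-5 (T11) removed the per-op `cudaMalloc` serialization, 8-GPU scaling recovered from ~1.3× to **~5×@8**, but a residual ~5×@8 imperfect linearity remained. The T11 doc / KI-5 "Residual" entry conjectured that the next step was **process-per-GPU** (an independent process + independent CUDA context per rank, torchrun-style), on the grounds that "N rank threads share a single CUDA primary context, so kernel-launch/cuBLAS are still serialized at context level". **T17 landed this torchrun-style path, measured it, and falsified the conjecture.**
- **Implementation ([docs/16-process-per-gpu.md](16-process-per-gpu.md))**: `xtrain-distributed` gains `proc.rs`. `launch_processes` spawns one worker process per GPU (re-exec of current_exe + `XTRAIN_{RANK,WORLD,LOCAL_RANK,NCCL_ID}` env); **the launcher mints the `ncclUniqueId` once and injects it hex-encoded into the child env** (no shared FS/TCP, no polling, no race: the id is atomically ready before the child is born); the worker reads env → binds the device (independent CUDA context) → `DdpContext::init` + `build_model` + `train_rank`, **all reused from T8 with zero changes**. New `train_ddp_mp` bin + `ddp_proc` test; **the old thread-per-GPU path is kept** (regression baseline). scope = process-per-GPU only (ZeRO-1 dropped by the user).
- **Correctness (all green, no regressions)**: proc vs single-GPU loss `5.67e-7`, **proc vs thread-per-GPU `1.5e-7`** (the two paths agree to the same order of magnitude), cross-rank `1.19e-7` (<1e-6); full regression suite green under `--test-threads=1` + **xserv closed loop, v3 re-export md5 bit-identical `b04fc9f9`** (T17 touches no numerical path).
- **Measured results (key; dash5 8× RTX 5090, dim384, per-rank batch32, seq256, steady-state)**:

| world | thread-per-GPU (`train_ddp`), tok/s | speedup | process-per-GPU (`train_ddp_mp`), tok/s | speedup |
|---|---|---|---|---|
| 1 | 93257 | 1.00× | 92952 | 1.00× |
| 2 | 149747 | 1.61× | 148809 | 1.60× |
| 4 | 278276 | 2.98× | 273308 | 2.94× |
| 8 | **491360** | **5.27×** | **493128** | **5.31×** |

(world=8 was repeated twice for each path: thread 493671/493292, proc 491102/494123. **The difference between the two paths is <1%, within noise.**)
- **Diagnosis (original conjecture falsified)**: during the process-per-GPU world=8 run, `nvidia-smi` sampling shows **all 8 GPUs at 95–99% util** (~23GB per GPU). The GPUs are **already compute-bound and fully fed, not idling behind a serial bottleneck** (KI-5's serialization problem, "only 1–2 of 8 busy", was already cured by the T11 allocator). All 8 GPUs saturated yet only 5.3× ⇒ the missing ~35% of throughput goes to **the per-step grad all-reduce + the communication overhead of this machine's PCIe-only topology at 8 ranks** (the "~7% all-reduce + PCIe headroom" layer T11 pointed out, amplified at 8 GPUs); switching to independent contexts does not touch this layer. **Conclusion: at this scale (dim384–1024, single node, 8× PCIe RTX 5090) the residual non-linearity is a communication/topology wall, not the launch model**. Getting closer to linear requires all-reduce overlap / an NVLink interconnect (not a priority at this scale).
- **Consistent methodology**: T11 falsified "bucketed all-reduce" by measurement, and T17 falsified "process-per-GPU removes the residual serialization" by measurement. Both times a hypothesis was overturned by measuring rather than by pushing the change through (profile/measure-first). **Net value**: the standard torchrun-style process-per-GPU path has landed (the project's core goal of "learning the full training stack") + this misleading backlog item is **closed by measurement**. **The default training path is unchanged** (thread-per-GPU); process-per-GPU stays on record as a parallel, optional path.
- **commit**: see the T17 commit chain (`distributed: process-per-GPU launcher + worker` / `distributed: train_ddp_mp bin` / `test: process-per-GPU DDP correctness` / design doc `docs: Phase T17 — process-per-GPU DDP design`).

---

### KI-3 · Activation recompute (gradient checkpointing): `FIXED` (T13)
- **Trigger (surfaced by v8)**: the capacity axis was scaled up to dim1024 (core ~210M+) to test whether the model is capacity-limited. The autograd tape saves every intermediate activation for backward, and activation memory grows linearly with dim: dim768 bf16 batch32 already takes 31.1GB (the T12 sweet spot), and **dim1024 batch32 OOMs again** (measured hitting 32100/32607MiB → `OutOfMemory`).
- **Design (per-block gradient checkpointing, opt-in, [docs/12-activation-recompute.md](12-activation-recompute.md))**: a new higher-order primitive `xtrain_autodiff::checkpoint(segment_fn, input, params)` (analogous to `torch.utils.checkpoint`). **Forward**: detach input/params into local leaves, run `segment_fn`, keep only the output value, and drop the local tape immediately → the activations inside the segment are freed (they do not stay on the outer tape); the checkpoint node has parents=[input, ..params]. **Backward**: rerun `segment_fn` from the saved input + the unchanged param values to rebuild the local tape, backpropagate with the upstream grad as seed (`Var::backward_seeded`, newly added because the segment output is not a scalar), push the recovered input/param gradients to the real parents, and drop the local tape → the recomputed activations are freed. Each transformer block's forward is wrapped with it (`--recompute` flag, off by default). Granularity = one block.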
@@ -68,7 +88,7 @@ _(KI-1 fixed in T10. KI-5 fixed in T11. KI-2 fixed in T12. **KI-3(激活重计
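→ **single-GPU 40226→92638 tok/s (~2.3×)**; **8 GPUs 49K→461K tok/s (9.4×)**, scaling recovered from a ~1.3× cap to **~5×@8**; 8-GPU `nvidia-smi` sampling shows **all 8 GPUs at 95–99% util** (under KI-5 only 1–2 of 8 were busy). The loss trajectory matches bit for bit (single-GPU 10.9026→4.8453, identical before/after).
- **Correctness (all green, no regressions)**: 15 op grad-checks, 5 structural, GEMM vs cuBLAS, batched==looped, overfit 27/27, AdamW GPU bit-exact + host vs torch, checkpoint bit-exact, DDP loss vs single-GPU **5.67e-7** + cross-rank diff 0.0 (loosened `<1e-6`), **xserv closed loop** (v3 ckpt re-exported to safetensors is md5 bit-identical to the registry + xserv loads and serves it, greedy "Once upon a time," matches).
- **In passing**: the cross-rank check in DDP `ddp_correctness` goes from `==0.0` to `<1e-6` (on this PCIe-only machine NCCL is not bit-reproducible across ranks from run to run; diff≤1.2e-7, a few ULP, is harmless, and the load-bearing gate is the loss match of 5.67e-7); `ddp_throughput_scaling` is extended to world=8.
- **Residual**: ~5×@8 imperfect linearity (grad all-reduce ~7% + 8-GPU PCIe/launch headroom), but the weak-scaling cliff is gone. If v4 wants higher linearity, the next step is **process-per-GPU** (an independent CUDA context per rank, torchrun-style).
- **Residual**: ~5×@8 imperfect linearity (grad all-reduce ~7% + 8-GPU PCIe/launch headroom), but the weak-scaling cliff is gone. The next step was once believed to be **process-per-GPU** (an independent CUDA context per rank, torchrun-style), but **T17 falsified that direction by measurement** (see the "process-per-GPU (T17)" entry): the residual is a **communication/PCIe wall**, not launch/cuBLAS serialization within a single CUDA context.
- **commit**: see the T11 commit chain (`cuda: device caching allocator` / the `perf: KI-5 …` commit with the before/after numbers).
- **The historical diagnosis is kept below** (the process of falsifying "bucketed all-reduce"):
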