2 Commits

Author SHA1 Message Date
0314b4f3ac server: non-blocking stream send — stop one slow client stalling the batch
All three engines emitted tokens with blocking_send on the single
decode/coordinator OS thread. A streaming client that drains slower than
generation fills its 64-slot channel, and blocking_send then blocks the whole
thread: under continuous batching one slow consumer stalls every other running
sequence (and in the serial TP/PP path it blocks admission of the next request
too). The whole point of continuous batching is defeated.

Fix: switch to try_send. engine.rs sets a client_stalled flag on Full/Closed,
reaped by is_finished() next iteration; tp_engine/pp_engine emit_text returns
bool and the decode loop breaks with finish_reason "error". When the
sequence/request is dropped its sender drops too, closing the channel so the
client receive loop ends rather than hanging. A slow client now only loses its
own sequence, never the batch.

Verified on dash5: gpt-oss FP8 TP=1 streaming via tp_engine still streams
correctly (SSE chunks, coherent content, no hang); bench-gpt-oss TP=2 5.9ms
TPOT unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-01 12:37:32 +08:00
cfbd64d206 cuda: fix int32 overflow in MoE dense kernels; surface launch errors in release
The dense MoE kernels (moe_replicate, moe_bias_add_3d, moe_weighted_sum)
computed total / expert_stride / element indices in int32. gpt-oss prefill
runs the whole prompt through the dense path unchunked (SPARSE_MAX_TOKENS=8),
so local_experts*num_tokens*hidden (and batch*num_tokens*dim, and
local_id*expert_stride) overflow int32 at ~3.6k-23k prefill tokens
(TP-dependent) — well inside the supported context window. The launch then
fails silently because CUDA_CHECK_LAST_ERROR was ((void)0) under NDEBUG, so
the bias / weighted-sum simply never runs and the forward pass is corrupted
with no error reported.

Fix: switch the three kernels and their launchers to long long, mirroring the
(long long) indexing already used in moe_sparse.cu. Also make
CUDA_CHECK_LAST_ERROR always-on — cudaGetLastError does not sync, so the
per-launch host cost is negligible, and a silent launch failure is exactly
the class of bug this one was.

Verified on dash5 (RTX 5090): a direct kernel test at 2.21B elements (>2^31)
for both moe_replicate and moe_bias_add_3d produces correct results with no
launch error; bench-gpt-oss TP=2 holds at 5.9ms TPOT, output unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-01 12:37:21 +08:00
5 changed files with 95 additions and 45 deletions

View File

@@ -38,6 +38,9 @@ struct Sequence {
seq_slot: Option<usize>,
sender: tokio::sync::mpsc::Sender<GenerateEvent>,
prefilled: bool,
/// Set when a `try_send` failed (client too slow or gone). The scheduler
/// reaps the sequence next iteration instead of blocking the decode thread.
client_stalled: bool,
eos_token_id: Option<u32>,
decode_buffer: Vec<u8>,
created_at: Instant,
@@ -370,6 +373,7 @@ impl Engine {
seq_slot: None,
sender: req.sender,
prefilled: false,
client_stalled: false,
eos_token_id: self.tokenizer.eos_token_id(),
decode_buffer: Vec::new(),
created_at: Instant::now(),
@@ -392,7 +396,7 @@ fn emit_token(tokenizer: &Tokenizer, seq: &mut Sequence, token_id: u32) {
if tokenizer.eos_token_id() == Some(token_id) {
let tail = tokenizer.flush_decode_stream(&mut seq.decode_buffer);
send_token_if_nonempty(seq, tail);
let _ = seq.sender.blocking_send(GenerateEvent::Done {
try_send_event(seq, GenerateEvent::Done {
finish_reason: "stop".to_string(),
});
return;
@@ -403,7 +407,7 @@ fn emit_token(tokenizer: &Tokenizer, seq: &mut Sequence, token_id: u32) {
let tail = tokenizer.flush_decode_stream(&mut seq.decode_buffer);
send_token_if_nonempty(seq, text);
send_token_if_nonempty(seq, tail);
let _ = seq.sender.blocking_send(GenerateEvent::Done {
try_send_event(seq, GenerateEvent::Done {
finish_reason: "length".to_string(),
});
} else {
@@ -411,14 +415,34 @@ fn emit_token(tokenizer: &Tokenizer, seq: &mut Sequence, token_id: u32) {
}
}
fn send_token_if_nonempty(seq: &Sequence, text: String) {
fn send_token_if_nonempty(seq: &mut Sequence, text: String) {
if !text.is_empty() {
let id = *seq.generated_tokens.last().unwrap_or(&0);
let _ = seq.sender.blocking_send(GenerateEvent::Token { id, text });
try_send_event(seq, GenerateEvent::Token { id, text });
}
}
/// Send an event without blocking the shared decode thread. If the client is
/// too slow (channel full) or gone (closed), flag the sequence for eviction
/// instead of blocking — one slow consumer must never stall the whole
/// continuous-batching loop. When the sequence is reaped its `sender` drops,
/// closing the channel so the client's receive loop ends rather than hanging.
fn try_send_event(seq: &mut Sequence, event: GenerateEvent) {
if let Err(err) = seq.sender.try_send(event) {
seq.client_stalled = true;
if let tokio::sync::mpsc::error::TrySendError::Full(_) = err {
eprintln!(
"[scheduler] seq {}: client too slow (stream channel full), evicting",
seq.id
);
}
}
}
fn is_finished(seq: &Sequence) -> bool {
if seq.client_stalled {
return true;
}
if seq.generated_tokens.is_empty() {
return false;
}

View File

@@ -268,9 +268,12 @@ pub fn run_pp(
let mut decode_buf: Vec<u8> = Vec::new();
let mut generated = 1usize;
emit_text(&tokenizer, &req, next, &mut decode_buf);
let mut stalled = !emit_text(&tokenizer, &req, next, &mut decode_buf);
let finish = loop {
if stalled {
break "error";
}
if tokenizer.is_eos(next) {
break "stop";
}
@@ -289,17 +292,17 @@ pub fn run_pp(
send_hidden(&sc, &x, next_peer);
next = token_rx.recv().expect("decode token");
generated += 1;
emit_text(&tokenizer, &req, next, &mut decode_buf);
stalled = !emit_text(&tokenizer, &req, next, &mut decode_buf);
};
let tail = tokenizer.flush_decode_stream(&mut decode_buf);
if !tail.is_empty() {
let _ = req.sender.blocking_send(GenerateEvent::Token {
let _ = req.sender.try_send(GenerateEvent::Token {
id: next,
text: tail,
});
}
let _ = req.sender.blocking_send(GenerateEvent::Done {
let _ = req.sender.try_send(GenerateEvent::Done {
finish_reason: finish.to_string(),
});
@@ -312,14 +315,19 @@ pub fn run_pp(
}
/// Stream a token's decoded text to the client (EOS contributes no text).
fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec<u8>) {
/// Returns false if the send would block (client too slow) or the client is
/// gone — the caller stops generating so the coordinator thread is free to
/// admit the next request instead of blocking on one slow consumer.
fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec<u8>) -> bool {
if tokenizer.is_eos(token_id) {
return;
return true;
}
let text = tokenizer.decode_token_stream(token_id, buf);
if !text.is_empty() {
let _ = req
return req
.sender
.blocking_send(GenerateEvent::Token { id: token_id, text });
.try_send(GenerateEvent::Token { id: token_id, text })
.is_ok();
}
true
}

View File

@@ -294,9 +294,12 @@ pub fn run_tp(
let mut decode_buf: Vec<u8> = Vec::new();
let mut generated = 1usize;
emit_text(&tokenizer, &req, next, &mut decode_buf);
let mut stalled = !emit_text(&tokenizer, &req, next, &mut decode_buf);
let finish = loop {
if stalled {
break "error";
}
if tokenizer.is_eos(next) {
break "stop";
}
@@ -317,17 +320,17 @@ pub fn run_tp(
next = pick(&logits, &req.sampling, &gen_ids);
gen_ids.push(next);
generated += 1;
emit_text(&tokenizer, &req, next, &mut decode_buf);
stalled = !emit_text(&tokenizer, &req, next, &mut decode_buf);
};
let tail = tokenizer.flush_decode_stream(&mut decode_buf);
if !tail.is_empty() {
let _ = req.sender.blocking_send(GenerateEvent::Token {
let _ = req.sender.try_send(GenerateEvent::Token {
id: next,
text: tail,
});
}
let _ = req.sender.blocking_send(GenerateEvent::Done {
let _ = req.sender.try_send(GenerateEvent::Done {
finish_reason: finish.to_string(),
});
@@ -340,14 +343,19 @@ pub fn run_tp(
}
/// Stream a token's decoded text to the client (EOS contributes no text).
fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec<u8>) {
/// Returns false if the send would block (client too slow) or the client is
/// gone — the caller stops generating so the serial coordinator thread is free
/// to admit the next request instead of blocking on one slow consumer.
fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec<u8>) -> bool {
if tokenizer.is_eos(token_id) {
return;
return true;
}
let text = tokenizer.decode_token_stream(token_id, buf);
if !text.is_empty() {
let _ = req
return req
.sender
.blocking_send(GenerateEvent::Token { id: token_id, text });
.try_send(GenerateEvent::Token { id: token_id, text })
.is_ok();
}
true
}

View File

@@ -49,10 +49,12 @@ __device__ __forceinline__ float block_reduce_max(float val) {
return val;
}
// --- Launch error checking (debug builds only) ---
#ifdef NDEBUG
#define CUDA_CHECK_LAST_ERROR() ((void)0)
#else
// --- Launch error checking ---
// Always on, including release builds. A launch with an invalid config
// (e.g. 32-bit overflow in grid/index math) is otherwise silent and produces
// garbage with no clue — the MoE int32-overflow bug was found exactly because
// release swallowed the launch failure. `cudaGetLastError()` does not
// synchronize the stream, so the per-launch host cost is negligible.
#include <cstdio>
#define CUDA_CHECK_LAST_ERROR() do { \
cudaError_t err = cudaGetLastError(); \
@@ -61,4 +63,3 @@ __device__ __forceinline__ float block_reduce_max(float val) {
__FILE__, __LINE__, cudaGetErrorString(err)); \
} \
} while(0)
#endif

View File

@@ -89,13 +89,17 @@ __global__ void moe_replicate_bf16_kernel(
__nv_bfloat16* __restrict__ x_rep,
int num_tokens, int hidden, int local_experts
) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
int total = local_experts * num_tokens * hidden;
// 64-bit index: local_experts * num_tokens * hidden overflows int32 at
// ~2.3k prefill tokens (gpt-oss TP=1, 32 experts), which is inside the
// supported context window. A 32-bit `total` silently wraps, the launch
// fails, and (in release) the error is invisible — see common.cuh.
long long idx = (long long)blockIdx.x * blockDim.x + threadIdx.x;
long long total = (long long)local_experts * num_tokens * hidden;
if (idx >= total) return;
int remainder = idx % (num_tokens * hidden);
// x_rep[expert, token, dim] = x[token, dim]
x_rep[idx] = x[remainder];
long long row_stride = (long long)num_tokens * hidden;
x_rep[idx] = x[idx % row_stride];
}
// ============================================================
@@ -112,13 +116,16 @@ __global__ void moe_bias_add_3d_bf16_kernel(
const __nv_bfloat16* __restrict__ bias,
int batch, int num_tokens, int dim
) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
int total = batch * num_tokens * dim;
// 64-bit index: batch * num_tokens * dim overflows int32 at ~3.6k prefill
// tokens (gpt-oss TP=1, 32 experts, 2*intermediate dim) — see moe_replicate.
long long idx = (long long)blockIdx.x * blockDim.x + threadIdx.x;
long long total = (long long)batch * num_tokens * dim;
if (idx >= total) return;
int b = idx / (num_tokens * dim);
int d = idx % dim;
float v = __bfloat162float(x[idx]) + __bfloat162float(bias[b * dim + d]);
long long td = (long long)num_tokens * dim;
int b = (int)(idx / td); // < batch (small)
int d = (int)(idx % dim); // < dim
float v = __bfloat162float(x[idx]) + __bfloat162float(bias[(long long)b * dim + d]);
x[idx] = __float2bfloat16(v);
}
@@ -151,14 +158,16 @@ __global__ void moe_weighted_sum_bf16_kernel(
int num_tokens, int hidden, int top_k,
int expert_start, int local_experts
) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
int total = num_tokens * hidden;
// 64-bit index: `local_id * expert_stride` overflows int32 for long prefills
// (expert_stride = num_tokens * hidden), reading the wrong expert element.
long long idx = (long long)blockIdx.x * blockDim.x + threadIdx.x;
long long total = (long long)num_tokens * hidden;
if (idx >= total) return;
int token = idx / hidden;
int dim = idx % hidden;
long long token = idx / hidden;
int dim = (int)(idx % hidden);
int expert_stride = num_tokens * hidden; // stride between experts in expert_out
long long expert_stride = (long long)num_tokens * hidden; // stride between experts in expert_out
float sum = 0.0f;
for (int k = 0; k < top_k; k++) {
@@ -196,9 +205,9 @@ void launch_moe_replicate_bf16(
int num_tokens, int hidden, int local_experts,
void* stream
) {
int total = local_experts * num_tokens * hidden;
long long total = (long long)local_experts * num_tokens * hidden;
int block = 256;
int grid = (total + block - 1) / block;
int grid = (int)((total + block - 1) / block);
moe_replicate_bf16_kernel<<<grid, block, 0, (cudaStream_t)stream>>>(
(const __nv_bfloat16*)x, (__nv_bfloat16*)x_rep,
num_tokens, hidden, local_experts
@@ -211,9 +220,9 @@ void launch_moe_bias_add_3d_bf16(
int batch, int num_tokens, int dim,
void* stream
) {
int total = batch * num_tokens * dim;
long long total = (long long)batch * num_tokens * dim;
int block = 256;
int grid = (total + block - 1) / block;
int grid = (int)((total + block - 1) / block);
moe_bias_add_3d_bf16_kernel<<<grid, block, 0, (cudaStream_t)stream>>>(
(__nv_bfloat16*)x, (const __nv_bfloat16*)bias,
batch, num_tokens, dim
@@ -229,9 +238,9 @@ void launch_moe_weighted_sum_bf16(
int expert_start, int local_experts,
void* stream
) {
int total = num_tokens * hidden;
long long total = (long long)num_tokens * hidden;
int block = 256;
int grid = (total + block - 1) / block;
int grid = (int)((total + block - 1) / block);
moe_weighted_sum_bf16_kernel<<<grid, block, 0, (cudaStream_t)stream>>>(
(const __nv_bfloat16*)expert_out,
(const int*)topk_ids, (const float*)topk_weights,