Compare commits
2 Commits
531cd3fe08
...
0314b4f3ac
| Author | SHA1 | Date | |
|---|---|---|---|
| 0314b4f3ac | |||
| cfbd64d206 |
@@ -38,6 +38,9 @@ struct Sequence {
|
||||
seq_slot: Option<usize>,
|
||||
sender: tokio::sync::mpsc::Sender<GenerateEvent>,
|
||||
prefilled: bool,
|
||||
/// Set when a `try_send` failed (client too slow or gone). The scheduler
|
||||
/// reaps the sequence next iteration instead of blocking the decode thread.
|
||||
client_stalled: bool,
|
||||
eos_token_id: Option<u32>,
|
||||
decode_buffer: Vec<u8>,
|
||||
created_at: Instant,
|
||||
@@ -370,6 +373,7 @@ impl Engine {
|
||||
seq_slot: None,
|
||||
sender: req.sender,
|
||||
prefilled: false,
|
||||
client_stalled: false,
|
||||
eos_token_id: self.tokenizer.eos_token_id(),
|
||||
decode_buffer: Vec::new(),
|
||||
created_at: Instant::now(),
|
||||
@@ -392,7 +396,7 @@ fn emit_token(tokenizer: &Tokenizer, seq: &mut Sequence, token_id: u32) {
|
||||
if tokenizer.eos_token_id() == Some(token_id) {
|
||||
let tail = tokenizer.flush_decode_stream(&mut seq.decode_buffer);
|
||||
send_token_if_nonempty(seq, tail);
|
||||
let _ = seq.sender.blocking_send(GenerateEvent::Done {
|
||||
try_send_event(seq, GenerateEvent::Done {
|
||||
finish_reason: "stop".to_string(),
|
||||
});
|
||||
return;
|
||||
@@ -403,7 +407,7 @@ fn emit_token(tokenizer: &Tokenizer, seq: &mut Sequence, token_id: u32) {
|
||||
let tail = tokenizer.flush_decode_stream(&mut seq.decode_buffer);
|
||||
send_token_if_nonempty(seq, text);
|
||||
send_token_if_nonempty(seq, tail);
|
||||
let _ = seq.sender.blocking_send(GenerateEvent::Done {
|
||||
try_send_event(seq, GenerateEvent::Done {
|
||||
finish_reason: "length".to_string(),
|
||||
});
|
||||
} else {
|
||||
@@ -411,14 +415,34 @@ fn emit_token(tokenizer: &Tokenizer, seq: &mut Sequence, token_id: u32) {
|
||||
}
|
||||
}
|
||||
|
||||
fn send_token_if_nonempty(seq: &Sequence, text: String) {
|
||||
fn send_token_if_nonempty(seq: &mut Sequence, text: String) {
|
||||
if !text.is_empty() {
|
||||
let id = *seq.generated_tokens.last().unwrap_or(&0);
|
||||
let _ = seq.sender.blocking_send(GenerateEvent::Token { id, text });
|
||||
try_send_event(seq, GenerateEvent::Token { id, text });
|
||||
}
|
||||
}
|
||||
|
||||
/// Send an event without blocking the shared decode thread. If the client is
|
||||
/// too slow (channel full) or gone (closed), flag the sequence for eviction
|
||||
/// instead of blocking — one slow consumer must never stall the whole
|
||||
/// continuous-batching loop. When the sequence is reaped its `sender` drops,
|
||||
/// closing the channel so the client's receive loop ends rather than hanging.
|
||||
fn try_send_event(seq: &mut Sequence, event: GenerateEvent) {
|
||||
if let Err(err) = seq.sender.try_send(event) {
|
||||
seq.client_stalled = true;
|
||||
if let tokio::sync::mpsc::error::TrySendError::Full(_) = err {
|
||||
eprintln!(
|
||||
"[scheduler] seq {}: client too slow (stream channel full), evicting",
|
||||
seq.id
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn is_finished(seq: &Sequence) -> bool {
|
||||
if seq.client_stalled {
|
||||
return true;
|
||||
}
|
||||
if seq.generated_tokens.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -268,9 +268,12 @@ pub fn run_pp(
|
||||
|
||||
let mut decode_buf: Vec<u8> = Vec::new();
|
||||
let mut generated = 1usize;
|
||||
emit_text(&tokenizer, &req, next, &mut decode_buf);
|
||||
let mut stalled = !emit_text(&tokenizer, &req, next, &mut decode_buf);
|
||||
|
||||
let finish = loop {
|
||||
if stalled {
|
||||
break "error";
|
||||
}
|
||||
if tokenizer.is_eos(next) {
|
||||
break "stop";
|
||||
}
|
||||
@@ -289,17 +292,17 @@ pub fn run_pp(
|
||||
send_hidden(&sc, &x, next_peer);
|
||||
next = token_rx.recv().expect("decode token");
|
||||
generated += 1;
|
||||
emit_text(&tokenizer, &req, next, &mut decode_buf);
|
||||
stalled = !emit_text(&tokenizer, &req, next, &mut decode_buf);
|
||||
};
|
||||
|
||||
let tail = tokenizer.flush_decode_stream(&mut decode_buf);
|
||||
if !tail.is_empty() {
|
||||
let _ = req.sender.blocking_send(GenerateEvent::Token {
|
||||
let _ = req.sender.try_send(GenerateEvent::Token {
|
||||
id: next,
|
||||
text: tail,
|
||||
});
|
||||
}
|
||||
let _ = req.sender.blocking_send(GenerateEvent::Done {
|
||||
let _ = req.sender.try_send(GenerateEvent::Done {
|
||||
finish_reason: finish.to_string(),
|
||||
});
|
||||
|
||||
@@ -312,14 +315,19 @@ pub fn run_pp(
|
||||
}
|
||||
|
||||
/// Stream a token's decoded text to the client (EOS contributes no text).
|
||||
fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec<u8>) {
|
||||
/// Returns false if the send would block (client too slow) or the client is
|
||||
/// gone — the caller stops generating so the coordinator thread is free to
|
||||
/// admit the next request instead of blocking on one slow consumer.
|
||||
fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec<u8>) -> bool {
|
||||
if tokenizer.is_eos(token_id) {
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
let text = tokenizer.decode_token_stream(token_id, buf);
|
||||
if !text.is_empty() {
|
||||
let _ = req
|
||||
return req
|
||||
.sender
|
||||
.blocking_send(GenerateEvent::Token { id: token_id, text });
|
||||
.try_send(GenerateEvent::Token { id: token_id, text })
|
||||
.is_ok();
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
@@ -294,9 +294,12 @@ pub fn run_tp(
|
||||
|
||||
let mut decode_buf: Vec<u8> = Vec::new();
|
||||
let mut generated = 1usize;
|
||||
emit_text(&tokenizer, &req, next, &mut decode_buf);
|
||||
let mut stalled = !emit_text(&tokenizer, &req, next, &mut decode_buf);
|
||||
|
||||
let finish = loop {
|
||||
if stalled {
|
||||
break "error";
|
||||
}
|
||||
if tokenizer.is_eos(next) {
|
||||
break "stop";
|
||||
}
|
||||
@@ -317,17 +320,17 @@ pub fn run_tp(
|
||||
next = pick(&logits, &req.sampling, &gen_ids);
|
||||
gen_ids.push(next);
|
||||
generated += 1;
|
||||
emit_text(&tokenizer, &req, next, &mut decode_buf);
|
||||
stalled = !emit_text(&tokenizer, &req, next, &mut decode_buf);
|
||||
};
|
||||
|
||||
let tail = tokenizer.flush_decode_stream(&mut decode_buf);
|
||||
if !tail.is_empty() {
|
||||
let _ = req.sender.blocking_send(GenerateEvent::Token {
|
||||
let _ = req.sender.try_send(GenerateEvent::Token {
|
||||
id: next,
|
||||
text: tail,
|
||||
});
|
||||
}
|
||||
let _ = req.sender.blocking_send(GenerateEvent::Done {
|
||||
let _ = req.sender.try_send(GenerateEvent::Done {
|
||||
finish_reason: finish.to_string(),
|
||||
});
|
||||
|
||||
@@ -340,14 +343,19 @@ pub fn run_tp(
|
||||
}
|
||||
|
||||
/// Stream a token's decoded text to the client (EOS contributes no text).
|
||||
fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec<u8>) {
|
||||
/// Returns false if the send would block (client too slow) or the client is
|
||||
/// gone — the caller stops generating so the serial coordinator thread is free
|
||||
/// to admit the next request instead of blocking on one slow consumer.
|
||||
fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec<u8>) -> bool {
|
||||
if tokenizer.is_eos(token_id) {
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
let text = tokenizer.decode_token_stream(token_id, buf);
|
||||
if !text.is_empty() {
|
||||
let _ = req
|
||||
return req
|
||||
.sender
|
||||
.blocking_send(GenerateEvent::Token { id: token_id, text });
|
||||
.try_send(GenerateEvent::Token { id: token_id, text })
|
||||
.is_ok();
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
@@ -49,10 +49,12 @@ __device__ __forceinline__ float block_reduce_max(float val) {
|
||||
return val;
|
||||
}
|
||||
|
||||
// --- Launch error checking (debug builds only) ---
|
||||
#ifdef NDEBUG
|
||||
#define CUDA_CHECK_LAST_ERROR() ((void)0)
|
||||
#else
|
||||
// --- Launch error checking ---
|
||||
// Always on, including release builds. A launch with an invalid config
|
||||
// (e.g. 32-bit overflow in grid/index math) is otherwise silent and produces
|
||||
// garbage with no clue — the MoE int32-overflow bug was found exactly because
|
||||
// release swallowed the launch failure. `cudaGetLastError()` does not
|
||||
// synchronize the stream, so the per-launch host cost is negligible.
|
||||
#include <cstdio>
|
||||
#define CUDA_CHECK_LAST_ERROR() do { \
|
||||
cudaError_t err = cudaGetLastError(); \
|
||||
@@ -61,4 +63,3 @@ __device__ __forceinline__ float block_reduce_max(float val) {
|
||||
__FILE__, __LINE__, cudaGetErrorString(err)); \
|
||||
} \
|
||||
} while(0)
|
||||
#endif
|
||||
|
||||
@@ -89,13 +89,17 @@ __global__ void moe_replicate_bf16_kernel(
|
||||
__nv_bfloat16* __restrict__ x_rep,
|
||||
int num_tokens, int hidden, int local_experts
|
||||
) {
|
||||
int idx = blockIdx.x * blockDim.x + threadIdx.x;
|
||||
int total = local_experts * num_tokens * hidden;
|
||||
// 64-bit index: local_experts * num_tokens * hidden overflows int32 at
|
||||
// ~2.3k prefill tokens (gpt-oss TP=1, 32 experts), which is inside the
|
||||
// supported context window. A 32-bit `total` silently wraps, the launch
|
||||
// fails, and (in release) the error is invisible — see common.cuh.
|
||||
long long idx = (long long)blockIdx.x * blockDim.x + threadIdx.x;
|
||||
long long total = (long long)local_experts * num_tokens * hidden;
|
||||
if (idx >= total) return;
|
||||
|
||||
int remainder = idx % (num_tokens * hidden);
|
||||
// x_rep[expert, token, dim] = x[token, dim]
|
||||
x_rep[idx] = x[remainder];
|
||||
long long row_stride = (long long)num_tokens * hidden;
|
||||
x_rep[idx] = x[idx % row_stride];
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
@@ -112,13 +116,16 @@ __global__ void moe_bias_add_3d_bf16_kernel(
|
||||
const __nv_bfloat16* __restrict__ bias,
|
||||
int batch, int num_tokens, int dim
|
||||
) {
|
||||
int idx = blockIdx.x * blockDim.x + threadIdx.x;
|
||||
int total = batch * num_tokens * dim;
|
||||
// 64-bit index: batch * num_tokens * dim overflows int32 at ~3.6k prefill
|
||||
// tokens (gpt-oss TP=1, 32 experts, 2*intermediate dim) — see moe_replicate.
|
||||
long long idx = (long long)blockIdx.x * blockDim.x + threadIdx.x;
|
||||
long long total = (long long)batch * num_tokens * dim;
|
||||
if (idx >= total) return;
|
||||
|
||||
int b = idx / (num_tokens * dim);
|
||||
int d = idx % dim;
|
||||
float v = __bfloat162float(x[idx]) + __bfloat162float(bias[b * dim + d]);
|
||||
long long td = (long long)num_tokens * dim;
|
||||
int b = (int)(idx / td); // < batch (small)
|
||||
int d = (int)(idx % dim); // < dim
|
||||
float v = __bfloat162float(x[idx]) + __bfloat162float(bias[(long long)b * dim + d]);
|
||||
x[idx] = __float2bfloat16(v);
|
||||
}
|
||||
|
||||
@@ -151,14 +158,16 @@ __global__ void moe_weighted_sum_bf16_kernel(
|
||||
int num_tokens, int hidden, int top_k,
|
||||
int expert_start, int local_experts
|
||||
) {
|
||||
int idx = blockIdx.x * blockDim.x + threadIdx.x;
|
||||
int total = num_tokens * hidden;
|
||||
// 64-bit index: `local_id * expert_stride` overflows int32 for long prefills
|
||||
// (expert_stride = num_tokens * hidden), reading the wrong expert element.
|
||||
long long idx = (long long)blockIdx.x * blockDim.x + threadIdx.x;
|
||||
long long total = (long long)num_tokens * hidden;
|
||||
if (idx >= total) return;
|
||||
|
||||
int token = idx / hidden;
|
||||
int dim = idx % hidden;
|
||||
long long token = idx / hidden;
|
||||
int dim = (int)(idx % hidden);
|
||||
|
||||
int expert_stride = num_tokens * hidden; // stride between experts in expert_out
|
||||
long long expert_stride = (long long)num_tokens * hidden; // stride between experts in expert_out
|
||||
|
||||
float sum = 0.0f;
|
||||
for (int k = 0; k < top_k; k++) {
|
||||
@@ -196,9 +205,9 @@ void launch_moe_replicate_bf16(
|
||||
int num_tokens, int hidden, int local_experts,
|
||||
void* stream
|
||||
) {
|
||||
int total = local_experts * num_tokens * hidden;
|
||||
long long total = (long long)local_experts * num_tokens * hidden;
|
||||
int block = 256;
|
||||
int grid = (total + block - 1) / block;
|
||||
int grid = (int)((total + block - 1) / block);
|
||||
moe_replicate_bf16_kernel<<<grid, block, 0, (cudaStream_t)stream>>>(
|
||||
(const __nv_bfloat16*)x, (__nv_bfloat16*)x_rep,
|
||||
num_tokens, hidden, local_experts
|
||||
@@ -211,9 +220,9 @@ void launch_moe_bias_add_3d_bf16(
|
||||
int batch, int num_tokens, int dim,
|
||||
void* stream
|
||||
) {
|
||||
int total = batch * num_tokens * dim;
|
||||
long long total = (long long)batch * num_tokens * dim;
|
||||
int block = 256;
|
||||
int grid = (total + block - 1) / block;
|
||||
int grid = (int)((total + block - 1) / block);
|
||||
moe_bias_add_3d_bf16_kernel<<<grid, block, 0, (cudaStream_t)stream>>>(
|
||||
(__nv_bfloat16*)x, (const __nv_bfloat16*)bias,
|
||||
batch, num_tokens, dim
|
||||
@@ -229,9 +238,9 @@ void launch_moe_weighted_sum_bf16(
|
||||
int expert_start, int local_experts,
|
||||
void* stream
|
||||
) {
|
||||
int total = num_tokens * hidden;
|
||||
long long total = (long long)num_tokens * hidden;
|
||||
int block = 256;
|
||||
int grid = (total + block - 1) / block;
|
||||
int grid = (int)((total + block - 1) / block);
|
||||
moe_weighted_sum_bf16_kernel<<<grid, block, 0, (cudaStream_t)stream>>>(
|
||||
(const __nv_bfloat16*)expert_out,
|
||||
(const int*)topk_ids, (const float*)topk_weights,
|
||||
|
||||
Reference in New Issue
Block a user