diff --git a/crates/xserv-server/src/engine.rs b/crates/xserv-server/src/engine.rs index 5a98d3f..60a2055 100644 --- a/crates/xserv-server/src/engine.rs +++ b/crates/xserv-server/src/engine.rs @@ -38,6 +38,9 @@ struct Sequence { seq_slot: Option, sender: tokio::sync::mpsc::Sender, prefilled: bool, + /// Set when a `try_send` failed (client too slow or gone). The scheduler + /// reaps the sequence next iteration instead of blocking the decode thread. + client_stalled: bool, eos_token_id: Option, decode_buffer: Vec, created_at: Instant, @@ -370,6 +373,7 @@ impl Engine { seq_slot: None, sender: req.sender, prefilled: false, + client_stalled: false, eos_token_id: self.tokenizer.eos_token_id(), decode_buffer: Vec::new(), created_at: Instant::now(), @@ -392,7 +396,7 @@ fn emit_token(tokenizer: &Tokenizer, seq: &mut Sequence, token_id: u32) { if tokenizer.eos_token_id() == Some(token_id) { let tail = tokenizer.flush_decode_stream(&mut seq.decode_buffer); send_token_if_nonempty(seq, tail); - let _ = seq.sender.blocking_send(GenerateEvent::Done { + try_send_event(seq, GenerateEvent::Done { finish_reason: "stop".to_string(), }); return; @@ -403,7 +407,7 @@ fn emit_token(tokenizer: &Tokenizer, seq: &mut Sequence, token_id: u32) { let tail = tokenizer.flush_decode_stream(&mut seq.decode_buffer); send_token_if_nonempty(seq, text); send_token_if_nonempty(seq, tail); - let _ = seq.sender.blocking_send(GenerateEvent::Done { + try_send_event(seq, GenerateEvent::Done { finish_reason: "length".to_string(), }); } else { @@ -411,14 +415,34 @@ fn emit_token(tokenizer: &Tokenizer, seq: &mut Sequence, token_id: u32) { } } -fn send_token_if_nonempty(seq: &Sequence, text: String) { +fn send_token_if_nonempty(seq: &mut Sequence, text: String) { if !text.is_empty() { let id = *seq.generated_tokens.last().unwrap_or(&0); - let _ = seq.sender.blocking_send(GenerateEvent::Token { id, text }); + try_send_event(seq, GenerateEvent::Token { id, text }); + } +} + +/// Send an event without blocking the shared decode thread. If the client is +/// too slow (channel full) or gone (closed), flag the sequence for eviction +/// instead of blocking — one slow consumer must never stall the whole +/// continuous-batching loop. When the sequence is reaped its `sender` drops, +/// closing the channel so the client's receive loop ends rather than hanging. +fn try_send_event(seq: &mut Sequence, event: GenerateEvent) { + if let Err(err) = seq.sender.try_send(event) { + seq.client_stalled = true; + if let tokio::sync::mpsc::error::TrySendError::Full(_) = err { + eprintln!( + "[scheduler] seq {}: client too slow (stream channel full), evicting", + seq.id + ); + } } } fn is_finished(seq: &Sequence) -> bool { + if seq.client_stalled { + return true; + } if seq.generated_tokens.is_empty() { return false; } diff --git a/crates/xserv-server/src/pp_engine.rs b/crates/xserv-server/src/pp_engine.rs index 0889165..05b98a0 100644 --- a/crates/xserv-server/src/pp_engine.rs +++ b/crates/xserv-server/src/pp_engine.rs @@ -268,9 +268,12 @@ pub fn run_pp( let mut decode_buf: Vec = Vec::new(); let mut generated = 1usize; - emit_text(&tokenizer, &req, next, &mut decode_buf); + let mut stalled = !emit_text(&tokenizer, &req, next, &mut decode_buf); let finish = loop { + if stalled { + break "error"; + } if tokenizer.is_eos(next) { break "stop"; } @@ -289,17 +292,17 @@ pub fn run_pp( send_hidden(&sc, &x, next_peer); next = token_rx.recv().expect("decode token"); generated += 1; - emit_text(&tokenizer, &req, next, &mut decode_buf); + stalled = !emit_text(&tokenizer, &req, next, &mut decode_buf); }; let tail = tokenizer.flush_decode_stream(&mut decode_buf); if !tail.is_empty() { - let _ = req.sender.blocking_send(GenerateEvent::Token { + let _ = req.sender.try_send(GenerateEvent::Token { id: next, text: tail, }); } - let _ = req.sender.blocking_send(GenerateEvent::Done { + let _ = req.sender.try_send(GenerateEvent::Done { finish_reason: finish.to_string(), }); @@ -312,14 +315,19 @@ pub fn run_pp( } /// Stream a token's decoded text to the client (EOS contributes no text). -fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec) { +/// Returns false if the send would block (client too slow) or the client is +/// gone — the caller stops generating so the coordinator thread is free to +/// admit the next request instead of blocking on one slow consumer. +fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec) -> bool { if tokenizer.is_eos(token_id) { - return; + return true; } let text = tokenizer.decode_token_stream(token_id, buf); if !text.is_empty() { - let _ = req + return req .sender - .blocking_send(GenerateEvent::Token { id: token_id, text }); + .try_send(GenerateEvent::Token { id: token_id, text }) + .is_ok(); } + true } diff --git a/crates/xserv-server/src/tp_engine.rs b/crates/xserv-server/src/tp_engine.rs index b4dad64..d8c3fb8 100644 --- a/crates/xserv-server/src/tp_engine.rs +++ b/crates/xserv-server/src/tp_engine.rs @@ -294,9 +294,12 @@ pub fn run_tp( let mut decode_buf: Vec = Vec::new(); let mut generated = 1usize; - emit_text(&tokenizer, &req, next, &mut decode_buf); + let mut stalled = !emit_text(&tokenizer, &req, next, &mut decode_buf); let finish = loop { + if stalled { + break "error"; + } if tokenizer.is_eos(next) { break "stop"; } @@ -317,17 +320,17 @@ pub fn run_tp( next = pick(&logits, &req.sampling, &gen_ids); gen_ids.push(next); generated += 1; - emit_text(&tokenizer, &req, next, &mut decode_buf); + stalled = !emit_text(&tokenizer, &req, next, &mut decode_buf); }; let tail = tokenizer.flush_decode_stream(&mut decode_buf); if !tail.is_empty() { - let _ = req.sender.blocking_send(GenerateEvent::Token { + let _ = req.sender.try_send(GenerateEvent::Token { id: next, text: tail, }); } - let _ = req.sender.blocking_send(GenerateEvent::Done { + let _ = req.sender.try_send(GenerateEvent::Done { finish_reason: finish.to_string(), }); @@ -340,14 +343,19 @@ pub fn run_tp( } /// Stream a token's decoded text to the client (EOS contributes no text). -fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec) { +/// Returns false if the send would block (client too slow) or the client is +/// gone — the caller stops generating so the serial coordinator thread is free +/// to admit the next request instead of blocking on one slow consumer. +fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec) -> bool { if tokenizer.is_eos(token_id) { - return; + return true; } let text = tokenizer.decode_token_stream(token_id, buf); if !text.is_empty() { - let _ = req + return req .sender - .blocking_send(GenerateEvent::Token { id: token_id, text }); + .try_send(GenerateEvent::Token { id: token_id, text }) + .is_ok(); } + true }