From 3c9d5e260e172be89cf715715e5e4b8059c2a0e8 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Sun, 31 May 2026 00:56:33 +0800 Subject: [PATCH] server: harmony termination via is_eos + TP repetition penalty Use tokenizer.is_eos() (multi-eos) for generation termination in both PP and TP engines instead of a single eos id, so gpt-oss stops on <|return|> /<|call|>/<|endoftext|>. In the TP engine, optionally apply a repetition penalty on the greedy decode path (XSERV_REP_PENALTY>1 over XSERV_REP_WINDOW recent tokens; off by default) to break greedy repetition loops. Co-Authored-By: Claude Opus 4.8 --- crates/xserv-server/src/pp_engine.rs | 11 ++++----- crates/xserv-server/src/tp_engine.rs | 37 +++++++++++++++++++++------- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/crates/xserv-server/src/pp_engine.rs b/crates/xserv-server/src/pp_engine.rs index 35685bd..9396fd3 100644 --- a/crates/xserv-server/src/pp_engine.rs +++ b/crates/xserv-server/src/pp_engine.rs @@ -187,7 +187,6 @@ pub fn run_pp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece let mut sc = build_stage(model_dir, &config, 0, world, 0, max_seq_len, id); eprintln!("[pp-engine] ready (pp={world}, max_seq_len={max_seq_len})"); - let eos = tokenizer.eos_token_id(); let n_workers = world - 1; let next_peer = 1usize; let broadcast = |txs: &[mpsc::Sender], cmd: PpCommand| { @@ -220,10 +219,10 @@ pub fn run_pp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece let mut decode_buf: Vec = Vec::new(); let mut generated = 1usize; - emit_text(&tokenizer, &req, next, eos, &mut decode_buf); + emit_text(&tokenizer, &req, next, &mut decode_buf); let finish = loop { - if eos == Some(next) { + if tokenizer.is_eos(next) { break "stop"; } if generated >= req.max_tokens { @@ -235,7 +234,7 @@ pub fn run_pp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece send_hidden(&sc, &x, next_peer); next = token_rx.recv().expect("decode token"); generated += 1; - emit_text(&tokenizer, &req, next, eos, &mut decode_buf); + emit_text(&tokenizer, &req, next, &mut decode_buf); }; let tail = tokenizer.flush_decode_stream(&mut decode_buf); @@ -253,8 +252,8 @@ pub fn run_pp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece } /// Stream a token's decoded text to the client (EOS contributes no text). -fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, eos: Option, buf: &mut Vec) { - if eos == Some(token_id) { +fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec) { + if tokenizer.is_eos(token_id) { return; } let text = tokenizer.decode_token_stream(token_id, buf); diff --git a/crates/xserv-server/src/tp_engine.rs b/crates/xserv-server/src/tp_engine.rs index 975a138..36602e6 100644 --- a/crates/xserv-server/src/tp_engine.rs +++ b/crates/xserv-server/src/tp_engine.rs @@ -19,7 +19,7 @@ use std::thread; use xserv_distributed::{TpContext, UniqueId}; use xserv_model::loader; -use xserv_model::{sample, GptOss, ModelConfig, PagedKVCache, Qwen3, BLOCK_SIZE}; +use xserv_model::{sample, sample_greedy_penalized, GptOss, ModelConfig, PagedKVCache, Qwen3, BLOCK_SIZE}; use xserv_tensor::{DType, Device, Tensor}; use xserv_tokenizer::Tokenizer; @@ -149,7 +149,23 @@ pub fn run_tp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece let mut rc = build_rank(model_dir, &config, 0, world, 0, max_seq_len, Some(tp)); eprintln!("[tp-engine] ready (tp={world}, max_seq_len={max_seq_len})"); - let eos = tokenizer.eos_token_id(); + // Optional repetition penalty to break greedy repetition loops (reasoning + // models loop under pure greedy when numerics diverge from the reference). + // Off by default; XSERV_REP_PENALTY>1 enables it over the last + // XSERV_REP_WINDOW generated tokens. Applied only on the greedy path. + let rep_penalty: f32 = std::env::var("XSERV_REP_PENALTY").ok() + .and_then(|s| s.parse().ok()).unwrap_or(1.0); + let rep_window: usize = std::env::var("XSERV_REP_WINDOW").ok() + .and_then(|s| s.parse().ok()).unwrap_or(128); + let pick = |logits: &Tensor, sp: &xserv_model::SamplingParams, history: &[u32]| -> u32 { + if rep_penalty > 1.0 && sp.temperature == 0.0 { + let start = history.len().saturating_sub(rep_window); + sample_greedy_penalized(logits, &history[start..], rep_penalty) + } else { + sample(logits, sp) + } + }; + let n_workers = world - 1; let broadcast = |txs: &[mpsc::Sender], cmd: TpCommand| { for t in txs { @@ -172,14 +188,16 @@ pub fn run_tp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece broadcast(&cmd_txs, TpCommand::Prefill { tokens: req.prompt_tokens.clone(), slot }); let logits = rc.model.forward_prefill_paged(&req.prompt_tokens, slot, &mut rc.cache); wait_acks(&ack_rx); - let mut next = sample(&logits, &req.sampling); + let mut gen_ids: Vec = Vec::new(); + let mut next = pick(&logits, &req.sampling, &gen_ids); + gen_ids.push(next); let mut decode_buf: Vec = Vec::new(); let mut generated = 1usize; - emit_text(&tokenizer, &req, next, eos, &mut decode_buf); + emit_text(&tokenizer, &req, next, &mut decode_buf); let finish = loop { - if eos == Some(next) { + if tokenizer.is_eos(next) { break "stop"; } if generated >= req.max_tokens { @@ -189,9 +207,10 @@ pub fn run_tp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece broadcast(&cmd_txs, TpCommand::Decode { tokens: vec![next], positions: vec![pos], slots: vec![slot] }); let logits = rc.model.forward_decode_paged(&[next], &[pos], &[slot], &mut rc.cache); wait_acks(&ack_rx); - next = sample(&logits, &req.sampling); + next = pick(&logits, &req.sampling, &gen_ids); + gen_ids.push(next); generated += 1; - emit_text(&tokenizer, &req, next, eos, &mut decode_buf); + emit_text(&tokenizer, &req, next, &mut decode_buf); }; let tail = tokenizer.flush_decode_stream(&mut decode_buf); @@ -209,8 +228,8 @@ pub fn run_tp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece } /// Stream a token's decoded text to the client (EOS contributes no text). -fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, eos: Option, buf: &mut Vec) { - if eos == Some(token_id) { +fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec) { + if tokenizer.is_eos(token_id) { return; } let text = tokenizer.decode_token_stream(token_id, buf);