server: harmony termination via is_eos + TP repetition penalty

Use tokenizer.is_eos() (multi-eos) for generation termination in both PP
and TP engines instead of a single eos id, so gpt-oss stops on <|return|>
/<|call|>/<|endoftext|>.

In the TP engine, optionally apply a repetition penalty on the greedy
decode path (XSERV_REP_PENALTY>1 over XSERV_REP_WINDOW recent tokens; off
by default) to break greedy repetition loops.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-05-31 00:56:33 +08:00
parent 99b212e6c1
commit 3c9d5e260e
2 changed files with 33 additions and 15 deletions

View File

@@ -187,7 +187,6 @@ pub fn run_pp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece
let mut sc = build_stage(model_dir, &config, 0, world, 0, max_seq_len, id);
eprintln!("[pp-engine] ready (pp={world}, max_seq_len={max_seq_len})");
let eos = tokenizer.eos_token_id();
let n_workers = world - 1;
let next_peer = 1usize;
let broadcast = |txs: &[mpsc::Sender<PpCommand>], cmd: PpCommand| {
@@ -220,10 +219,10 @@ pub fn run_pp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece
let mut decode_buf: Vec<u8> = Vec::new();
let mut generated = 1usize;
emit_text(&tokenizer, &req, next, eos, &mut decode_buf);
emit_text(&tokenizer, &req, next, &mut decode_buf);
let finish = loop {
if eos == Some(next) {
if tokenizer.is_eos(next) {
break "stop";
}
if generated >= req.max_tokens {
@@ -235,7 +234,7 @@ pub fn run_pp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece
send_hidden(&sc, &x, next_peer);
next = token_rx.recv().expect("decode token");
generated += 1;
emit_text(&tokenizer, &req, next, eos, &mut decode_buf);
emit_text(&tokenizer, &req, next, &mut decode_buf);
};
let tail = tokenizer.flush_decode_stream(&mut decode_buf);
@@ -253,8 +252,8 @@ pub fn run_pp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece
}
/// Stream a token's decoded text to the client (EOS contributes no text).
fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, eos: Option<u32>, buf: &mut Vec<u8>) {
if eos == Some(token_id) {
fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec<u8>) {
if tokenizer.is_eos(token_id) {
return;
}
let text = tokenizer.decode_token_stream(token_id, buf);

View File

@@ -19,7 +19,7 @@ use std::thread;
use xserv_distributed::{TpContext, UniqueId};
use xserv_model::loader;
use xserv_model::{sample, GptOss, ModelConfig, PagedKVCache, Qwen3, BLOCK_SIZE};
use xserv_model::{sample, sample_greedy_penalized, GptOss, ModelConfig, PagedKVCache, Qwen3, BLOCK_SIZE};
use xserv_tensor::{DType, Device, Tensor};
use xserv_tokenizer::Tokenizer;
@@ -149,7 +149,23 @@ pub fn run_tp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece
let mut rc = build_rank(model_dir, &config, 0, world, 0, max_seq_len, Some(tp));
eprintln!("[tp-engine] ready (tp={world}, max_seq_len={max_seq_len})");
let eos = tokenizer.eos_token_id();
// Optional repetition penalty to break greedy repetition loops (reasoning
// models loop under pure greedy when numerics diverge from the reference).
// Off by default; XSERV_REP_PENALTY>1 enables it over the last
// XSERV_REP_WINDOW generated tokens. Applied only on the greedy path.
let rep_penalty: f32 = std::env::var("XSERV_REP_PENALTY").ok()
.and_then(|s| s.parse().ok()).unwrap_or(1.0);
let rep_window: usize = std::env::var("XSERV_REP_WINDOW").ok()
.and_then(|s| s.parse().ok()).unwrap_or(128);
let pick = |logits: &Tensor, sp: &xserv_model::SamplingParams, history: &[u32]| -> u32 {
if rep_penalty > 1.0 && sp.temperature == 0.0 {
let start = history.len().saturating_sub(rep_window);
sample_greedy_penalized(logits, &history[start..], rep_penalty)
} else {
sample(logits, sp)
}
};
let n_workers = world - 1;
let broadcast = |txs: &[mpsc::Sender<TpCommand>], cmd: TpCommand| {
for t in txs {
@@ -172,14 +188,16 @@ pub fn run_tp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece
broadcast(&cmd_txs, TpCommand::Prefill { tokens: req.prompt_tokens.clone(), slot });
let logits = rc.model.forward_prefill_paged(&req.prompt_tokens, slot, &mut rc.cache);
wait_acks(&ack_rx);
let mut next = sample(&logits, &req.sampling);
let mut gen_ids: Vec<u32> = Vec::new();
let mut next = pick(&logits, &req.sampling, &gen_ids);
gen_ids.push(next);
let mut decode_buf: Vec<u8> = Vec::new();
let mut generated = 1usize;
emit_text(&tokenizer, &req, next, eos, &mut decode_buf);
emit_text(&tokenizer, &req, next, &mut decode_buf);
let finish = loop {
if eos == Some(next) {
if tokenizer.is_eos(next) {
break "stop";
}
if generated >= req.max_tokens {
@@ -189,9 +207,10 @@ pub fn run_tp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece
broadcast(&cmd_txs, TpCommand::Decode { tokens: vec![next], positions: vec![pos], slots: vec![slot] });
let logits = rc.model.forward_decode_paged(&[next], &[pos], &[slot], &mut rc.cache);
wait_acks(&ack_rx);
next = sample(&logits, &req.sampling);
next = pick(&logits, &req.sampling, &gen_ids);
gen_ids.push(next);
generated += 1;
emit_text(&tokenizer, &req, next, eos, &mut decode_buf);
emit_text(&tokenizer, &req, next, &mut decode_buf);
};
let tail = tokenizer.flush_decode_stream(&mut decode_buf);
@@ -209,8 +228,8 @@ pub fn run_tp(model_dir: &Path, world: usize, max_seq_len: usize, rx: mpsc::Rece
}
/// Stream a token's decoded text to the client (EOS contributes no text).
fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, eos: Option<u32>, buf: &mut Vec<u8>) {
if eos == Some(token_id) {
fn emit_text(tokenizer: &Tokenizer, req: &GenerateRequest, token_id: u32, buf: &mut Vec<u8>) {
if tokenizer.is_eos(token_id) {
return;
}
let text = tokenizer.decode_token_stream(token_id, buf);