# Phase 12: Continuous Batching + Request Scheduler — Design Document ## Goal 实现 iteration-level 请求调度,支持多个请求并发生成 token。核心能力:同时发 N 个请求,N 个请求同时产出 token,新请求可以在 mid-generation 加入 batch。 ## 为什么需要 Continuous Batching **当前问题(串行)**: ``` 时间 → [req1 prefill][req1 decode x 100][req2 prefill][req2 decode x 50]... GPU利用: ████████████████████████████████████████████████████████████████████ req2 等了 100 个 token 的时间才开始 ``` **目标(continuous batching)**: ``` 时间 → [req1+req2 prefill][req1+req2 decode][req1 done, req3 加入][req2+req3 decode]... GPU利用: ████████████████████████████████████████████████████████████████████ req2 和 req1 同时推理,req3 在 req1 完成后立即加入 ``` ## 核心设计 ### 数据结构 ```rust pub struct Sequence { pub id: u64, pub prompt_tokens: Vec, pub generated_tokens: Vec, pub status: SeqStatus, pub max_tokens: usize, pub kv_cache: GpuKVCache, // 每个 seq 独立的 KV cache pub output_tx: mpsc::Sender, } pub enum SeqStatus { Waiting, // 在队列中等待被 admit Running, // 正在参与 batch forward Finished, // EOS 或 max_tokens 达到 } pub struct Scheduler { waiting: VecDeque, running: Vec, max_batch_size: usize, // 最大并发请求数 next_seq_id: u64, } ``` ### 调度循环(Engine 主循环) ```rust loop { // Step 1: 回收已完成的 sequence running.retain(|seq| seq.status != Finished); // Step 2: Admit 新请求(如果 running < max_batch_size) while running.len() < max_batch_size { if let Some(seq) = waiting.pop_front() { running.push(seq); } else { break; } } if running.is_empty() { // 没有任何工作,等待新请求 let new_req = request_rx.recv(); // blocking wait waiting.push_back(new_req); continue; } // Step 3: 分类 — 哪些需要 prefill,哪些需要 decode let to_prefill: 新加入的 seq(generated_tokens 为空) let to_decode: 已在运行的 seq // Step 4: 执行 for seq in to_prefill { // Prefill: 完整 prompt 一次 forward model.forward_gpu_cache(&seq.prompt_tokens, &mut seq.kv_cache); seq.status = Running; } // Decode: 每个 seq 独立做一步(当前不做 batch forward,留待优化) for seq in to_decode { let last_token = seq.last_generated_token(); let logits = model.forward_gpu_cache(&[last_token], &mut seq.kv_cache); let next = sample_greedy(&logits); seq.generated_tokens.push(next); // 发送 token 给客户端 seq.output_tx.blocking_send(Token { id: next, text: decode(next) }); // 检查完成 if next == eos || seq.generated_tokens.len() >= seq.max_tokens { seq.output_tx.blocking_send(Done); seq.status = Finished; } } // Step 5: 检查是否有新请求到达(non-blocking) while let Ok(new_req) = request_rx.try_recv() { waiting.push_back(new_req); } } ``` ### 关键设计决策 1. **每个 seq 独立 KV cache**:当前不做 batch forward(需要对齐 seq_len),而是每个 seq 独立调用 model.forward_gpu_cache。未来优化为 batched forward。 2. **Prefill 和 Decode 混合**:新加入的 seq 先 prefill(一次 forward),然后下一轮加入 decode batch。 3. **Non-blocking request receive**:decode 循环中用 `try_recv()` 检查新请求,不阻塞推理。 4. **max_batch_size**:受限于 GPU 显存(每个 seq 的 KV cache 占用)。Qwen3-8B 单卡 32GB,每个 seq 的 KV cache 约 256 tokens × 8 heads × 128 dim × 2(KV) × 2B = 1MB。可以并发 ~100 seq。实际受限于推理速度。 ## 与 Phase 13 (HTTP API) 的接口 ``` HTTP Handler Engine Thread │ │ │ ──── GenerateRequest ────────► │ │ (prompt_tokens, max_tokens, │ │ output_tx) │ │ │ │ ◄──── GenerateEvent (Token/Done) ──── │ │ (via tokio::sync::mpsc) │ │ │ ``` 多个 HTTP handler 可以同时提交请求。Engine 线程内部通过 Scheduler 管理并发。 ## 验收测试 必须通过以下测试才算 Phase 12 完成: 1. **并发 3 请求测试**:同时发 3 个请求,验证 3 个请求同时产出 token(不是串行等待) 2. **吞吐量测试**:并发请求的总 token 吞吐量应接近单请求(因为单个 seq 的 decode 是串行的) 3. **动态加入测试**:先发 1 个请求开始生成,过 2 秒再发第 2 个,验证第 2 个立即开始(不等第 1 个完成) 4. **正确性测试**:并发请求的输出内容应与单独跑每个请求一致 ## 实现计划 1. 重构 Engine:从 `while recv → generate` 改为 scheduler loop 2. 每个 Sequence 持有独立的 GpuKVCache 3. 调度循环实现 admit + prefill + decode + finish 4. HTTP API 侧改为 unbounded channel(允许多请求同时提交) 5. 编写并发测试脚本 ## 当前状态 **已实现: iteration-level scheduling**。多请求可以并发进入 batch (max_batch_size),新请求在 mid-generation 动态加入。Prefill 和 decode 阶段在每轮迭代内分离处理。 **未实现: batched GPU forward**。每个 seq 的 model forward 仍是串行调用 (per-seq forward_gpu_cache)。真正的 batched decode (多 seq 的 token 合并为一次 GPU forward) 需要 Flash Attention 的 variable-length attention 支持。Phase 14 实现了 FA2 kernel,为后续 batched forward 提供了基础。 **验证**: 8 个并发请求 (max_batch=4) 总 wall clock 22.5s,各请求延迟之和 135.0s,调度加速 6.0x。Server log 确认 `decode batch_size=4`。