# Phase 13: HTTP API + Streaming (Design Document, Milestone ③)
## Goal

Provide an OpenAI-compatible HTTP API so that xserv can act as a serving backend that any OpenAI SDK can call.
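## Division of Responsibilities

| Component | Responsibility |
|---|---|
| Phase 12 (Scheduler/Engine) | Model inference, request scheduling, and the token-generation loop |
| Phase 13 (HTTP API) | Parse the HTTP request → convert it to the internal format → submit it to the engine → receive tokens from a channel → encode them as the HTTP response |

Phase 13 does not care how the model performs inference; it is responsible only for the HTTP protocol layer.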
## Tech Stack

- HTTP framework: axum 0.8
- Async runtime: tokio
- Serialization: serde_json
- Channels: `std::sync::mpsc::sync_channel` for requests (API → Engine), `tokio::sync::mpsc` for tokens (Engine → API)
## API Endpoints

```text
GET  /health               → "ok"
GET  /v1/models            → {"data": [{"id": "qwen3-8b", ...}]}
POST /v1/chat/completions  → JSON response (non-streaming)
POST /v1/chat/completions  → SSE stream (streaming, TODO)
```
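axum routes on method and path only, so both response modes of `/v1/chat/completions` arrive at one handler, which has to branch on the deserialized `stream` field. The hand-written dispatch below makes that mapping explicit. It is an illustration, not xserv code: `Route`, `classify`, and the 404/405 split are assumptions, and in the real server axum's `Router` does this matching.

```rust
/// Hypothetical classification of the endpoints listed above.
#[derive(Debug, PartialEq)]
enum Route {
    Health,
    Models,
    ChatCompletion,       // POST with "stream": false (or absent)
    ChatCompletionStream, // POST with "stream": true
    MethodNotAllowed,     // known path, wrong method
    NotFound,
}

/// `stream` is the already-parsed "stream" field of the JSON body; the path
/// alone cannot distinguish the two chat-completion modes.
fn classify(method: &str, path: &str, stream: bool) -> Route {
    match (method, path) {
        ("GET", "/health") => Route::Health,
        ("GET", "/v1/models") => Route::Models,
        ("POST", "/v1/chat/completions") if stream => Route::ChatCompletionStream,
        ("POST", "/v1/chat/completions") => Route::ChatCompletion,
        (_, "/health") | (_, "/v1/models") | (_, "/v1/chat/completions") => {
            Route::MethodNotAllowed
        }
        _ => Route::NotFound,
    }
}
```

The practical consequence is that streaming and non-streaming share deserialization, prompt building, and tokenization, and diverge only at the response-encoding step.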
## Architecture

```text
Client
  │ HTTP POST /v1/chat/completions
  ▼
┌────────────────────────────────┐
│ axum handler                   │
│ 1. Deserialize ChatRequest     │
│ 2. Build prompt text           │
│ 3. Tokenize (Mutex<Tokenizer>) │
│ 4. Create mpsc channel         │
│ 5. Submit GenerateRequest      │
│ 6. await tokens from rx        │
│ 7. Build JSON response         │
└────────────────────────────────┘
  │ GenerateRequest via SyncSender
  ▼
┌────────────────────────────────┐
│ Engine thread (Phase 12)       │
│ - recv() request               │
│ - model.forward_gpu_cache()    │
│ - blocking_send() tokens       │
└────────────────────────────────┘
```
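The data flow in the diagram can be sketched with the standard library alone. This is a minimal sketch under stated assumptions, not xserv's code: the `GenerateRequest` fields, `spawn_engine`, and `handle` are hypothetical; a fake counter replaces `model.forward_gpu_cache()`; and a std `mpsc::channel` stands in for the per-request `tokio::sync::mpsc` channel so that it runs without tokio. The request path mirrors the design described in this document: a `sync_channel(1)` whose `SyncSender` sits behind a `Mutex` in a shared `Arc<AppState>`.

```rust
use std::sync::mpsc::{self, Receiver, Sender, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Internal request passed from the HTTP layer to the engine (fields are assumptions).
struct GenerateRequest {
    prompt_ids: Vec<u32>,
    max_tokens: usize,
    // Per-request token channel. The real design uses tokio::sync::mpsc here
    // (engine: blocking_send, handler: recv().await); std mpsc stands in.
    token_tx: Sender<u32>,
}

/// Shared state: the request sender is wrapped in a Mutex.
struct AppState {
    engine_tx: Mutex<SyncSender<GenerateRequest>>,
}

/// Start the engine loop on a dedicated OS thread, outside any async runtime.
fn spawn_engine() -> Arc<AppState> {
    let (tx, rx): (SyncSender<GenerateRequest>, Receiver<GenerateRequest>) =
        mpsc::sync_channel(1);
    thread::spawn(move || {
        // recv() fails once every SyncSender is dropped, which ends the loop.
        while let Ok(req) = rx.recv() {
            for i in 0..req.max_tokens {
                // Fake "next token": last prompt id + step. Real code runs the model.
                let next = req.prompt_ids.last().copied().unwrap_or(0) + i as u32 + 1;
                if req.token_tx.send(next).is_err() {
                    break; // the handler went away
                }
            }
            // `req` (and its token_tx) is dropped here, closing the handler's stream.
        }
    });
    Arc::new(AppState { engine_tx: Mutex::new(tx) })
}

/// Handler steps 4-6: create a channel, submit the request, collect all tokens.
fn handle(state: &AppState, prompt_ids: Vec<u32>, max_tokens: usize) -> Vec<u32> {
    let (token_tx, token_rx) = mpsc::channel();
    let req = GenerateRequest { prompt_ids, max_tokens, token_tx };
    // The lock guard is a temporary, released at the end of this statement.
    state.engine_tx.lock().unwrap().send(req).expect("engine thread stopped");
    // Collect-all-then-respond: iteration ends when the engine drops its Sender.
    token_rx.iter().collect()
}
```

Because the engine loop serves one request at a time, concurrent handlers queue behind the single-slot `sync_channel(1)`: at most one request waits in the buffer, and any further `send` blocks until the engine takes it with `recv()`.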
## OpenAI-Compatible Format

### Request

```json
{
  "model": "qwen3-8b",
  "messages": [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "Hello"}
  ],
  "max_tokens": 256,
  "stream": false
}
```
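Handler step 2 ("Build prompt text") flattens `messages` into a single string before tokenization. A minimal sketch, assuming the ChatML-style markers (`<|im_start|>`, `<|im_end|>`) used by Qwen chat models; the authoritative template is the one shipped with the model's tokenizer configuration, and Qwen3's template adds details (such as thinking-mode handling) that are omitted here. `Message` and `build_prompt` are hypothetical names.

```rust
/// One entry of the OpenAI "messages" array.
struct Message<'a> {
    role: &'a str,
    content: &'a str,
}

/// Render messages in ChatML style, then open an assistant turn for generation.
/// Assumption: Qwen-family ChatML markers; verify against the model's own template.
fn build_prompt(messages: &[Message]) -> String {
    let mut prompt = String::new();
    for m in messages {
        prompt.push_str("<|im_start|>");
        prompt.push_str(m.role);
        prompt.push('\n');
        prompt.push_str(m.content);
        prompt.push_str("<|im_end|>\n");
    }
    // Generation prompt: the model continues from here as the assistant.
    prompt.push_str("<|im_start|>assistant\n");
    prompt
}
```

Because every message is rendered in order, the same loop covers multi-turn conversations: earlier `assistant` turns are replayed into the prompt ahead of the open assistant turn at the end.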
### Response (non-streaming)

```json
{
  "id": "chatcmpl-xxx",
  "object": "chat.completion",
  "created": 1234567890,
  "model": "qwen3-8b",
  "choices": [{
    "index": 0,
    "message": {"role": "assistant", "content": "Hi there!"},
    "finish_reason": "stop"
  }],
  "usage": {"prompt_tokens": 5, "completion_tokens": 3, "total_tokens": 8}
}
```
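Only a few fields of this response are computed: `usage.total_tokens` is `prompt_tokens + completion_tokens`, and `finish_reason` follows OpenAI's convention of `"stop"` when the model ends the turn itself and `"length"` when `max_tokens` cuts it off. The server serializes with serde_json; this sketch formats the JSON by hand only so that it runs with std alone, and it assumes generation ends either at EOS or at `max_tokens`. `Usage`, `escape_json`, and `completion_json` are hypothetical names.

```rust
struct Usage {
    prompt_tokens: usize,
    completion_tokens: usize,
}

impl Usage {
    fn total_tokens(&self) -> usize {
        self.prompt_tokens + self.completion_tokens
    }
}

/// Minimal JSON string escaping (quotes, backslashes, control characters).
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Build the non-streaming body. `hit_eos` is false when max_tokens stopped generation.
fn completion_json(id: &str, created: u64, model: &str, content: &str, hit_eos: bool, usage: &Usage) -> String {
    let finish_reason = if hit_eos { "stop" } else { "length" };
    let mut json = format!(
        r#"{{"id":"{}","object":"chat.completion","created":{},"model":"{}","#,
        escape_json(id), created, escape_json(model)
    );
    json.push_str(&format!(
        r#""choices":[{{"index":0,"message":{{"role":"assistant","content":"{}"}},"finish_reason":"{}"}}],"#,
        escape_json(content), finish_reason
    ));
    json.push_str(&format!(
        r#""usage":{{"prompt_tokens":{},"completion_tokens":{},"total_tokens":{}}}}}"#,
        usage.prompt_tokens, usage.completion_tokens, usage.total_tokens()
    ));
    json
}
```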
### SSE Streaming (TODO)

```text
data: {"choices":[{"delta":{"content":"Hi"}}]}

data: {"choices":[{"delta":{},"finish_reason":"stop"}]}

data: [DONE]
```
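On the wire, each SSE event is a `data: ` line terminated by a blank line, and OpenAI-style streams end with the `data: [DONE]` sentinel. The sketch renders the abbreviated chunks above as raw text; `sse_event` and `sse_body` are hypothetical helpers, and the text pieces are assumed to be already JSON-escaped. In axum, `axum::response::sse::Sse` wrapping a `Stream` of `Event` values produces this framing, so the server would emit the same payloads through that type rather than by concatenating strings.

```rust
/// One SSE event carrying a payload: "data: <payload>\n\n".
fn sse_event(data: &str) -> String {
    format!("data: {}\n\n", data)
}

/// Render a full stream: one delta chunk per text piece, a final chunk with
/// finish_reason, then the "[DONE]" sentinel. Chunks are abbreviated like the
/// example above (no id/model/created fields). Pieces must already be JSON-escaped.
fn sse_body(pieces: &[&str], finish_reason: &str) -> String {
    let mut body = String::new();
    for p in pieces {
        let chunk = format!(r#"{{"choices":[{{"delta":{{"content":"{}"}}}}]}}"#, p);
        body.push_str(&sse_event(&chunk));
    }
    let last = format!(r#"{{"choices":[{{"delta":{{}},"finish_reason":"{}"}}]}}"#, finish_reason);
    body.push_str(&sse_event(&last));
    body.push_str(&sse_event("[DONE]"));
    body
}
```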
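## Current Implementation Status

- `/health`: health check
- `/v1/models`: model list
- `/v1/chat/completions` (non-streaming): JSON response
- `/v1/chat/completions` (streaming): SSE (TODO)
- Complete `usage` statistics (token counts)
- Error handling (400 for bad requests, etc.)
- Chat template for multi-turn conversations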
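For the error-handling item, one possible shape of the 400 path is sketched below. Every rule is a hypothetical assumption rather than xserv behavior: an empty `messages` array, a role outside `system`/`user`/`assistant`, or a `max_tokens` of zero or above a hypothetical `context_limit` is rejected. `ChatRequest` here is a simplified stand-in for the deserialized body, not the real type.

```rust
/// Simplified stand-in for the deserialized request body.
struct ChatRequest<'a> {
    messages: Vec<(&'a str, &'a str)>, // (role, content)
    max_tokens: Option<u32>,
}

/// Hypothetical checks; every failure maps to HTTP 400 Bad Request.
fn validate(req: &ChatRequest, context_limit: u32) -> Result<(), (u16, String)> {
    if req.messages.is_empty() {
        return Err((400, "messages must not be empty".to_string()));
    }
    for (i, (role, _)) in req.messages.iter().enumerate() {
        // Assumption: the chat template only understands these three roles.
        if !matches!(*role, "system" | "user" | "assistant") {
            return Err((400, format!("messages[{}]: unsupported role '{}'", i, role)));
        }
    }
    match req.max_tokens {
        Some(0) => Err((400, "max_tokens must be at least 1".to_string())),
        Some(n) if n > context_limit => {
            Err((400, format!("max_tokens {} exceeds limit {}", n, context_limit)))
        }
        _ => Ok(()),
    }
}
```

In axum, a `(StatusCode::BAD_REQUEST, message)` tuple implements `IntoResponse`, so the `Err` branch maps directly onto a 400 response.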
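## Key Design Decisions

- **Extension vs. State**: use `axum::Extension<Arc<AppState>>` rather than `Router::with_state`, because `SyncSender` is not `Sync` (it has to be wrapped in a `Mutex`).
- **Engine on a dedicated thread**: synchronous GPU operations block the calling thread, so the engine cannot run inside the tokio runtime, where it would stall the async worker threads.
- **`tokio::sync::mpsc` for token transport**: the engine (a std thread) calls `blocking_send()`, and the API (async) calls `.recv().await`. This carries tokens across the sync/async boundary.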
## Test Plan

- `curl /health` → `"ok"`
- `curl /v1/models` → JSON model list
- `curl /v1/chat/completions` → JSON with generated text
- Python OpenAI SDK compatibility test
- SSE streaming test
- Multi-turn conversation test
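## Takeaways

- **axum 0.8's `Handler` trait is strict about `Send`**: the future returned by an `async fn` handler must be `Send`. `std::sync::MutexGuard` is not `Send`, so the guard must not live across an `.await` point (end it with a scope or an explicit `drop`).
- **`std::sync::mpsc::SyncSender` is not `Sync`**: it cannot be placed directly in an `Arc<T>` and shared by multiple async tasks. Solutions: `Mutex<SyncSender>`, or switch to `tokio::sync::mpsc::Sender` (which is `Sync`).
- **Non-streaming is simpler; get it working before adding SSE**: SSE streaming involves the `Stream` trait, lifetime issues, and complicated type inference. Get the end-to-end path working with collect-all-then-respond first, and treat streaming as an incremental improvement.
- **Engine load time is ~20 s (Qwen3-8B)**: after the server starts, it should accept requests only once the engine is ready; otherwise requests hang on the channel send. Currently this is handled implicitly by the backpressure of `sync_channel(1)`.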
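The first takeaway can be checked without axum. In this sketch `Tokenizer` is a toy byte-level stand-in, `handler` is not a real axum handler, and `assert_send` only mimics axum's `Send` bound on handler futures; `block_on` is a minimal executor added so the example runs without tokio. All of these names are hypothetical.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Toy tokenizer kept behind a std Mutex (stand-in for the real one).
struct Tokenizer;
impl Tokenizer {
    fn encode(&self, text: &str) -> Vec<u32> {
        text.bytes().map(u32::from).collect() // one "token" per byte
    }
}

/// Compile-time check mirroring axum's requirement that handler futures are Send.
fn assert_send<F: Future + Send>(f: F) -> F {
    f
}

async fn handler(tok: Arc<Mutex<Tokenizer>>, text: String) -> usize {
    // The guard lives only inside this block, so it is dropped before `.await`.
    let ids = {
        let guard = tok.lock().unwrap();
        guard.encode(&text)
    };
    std::future::ready(()).await; // stand-in for awaiting the engine
    ids.len()
}

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: poll until the future completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut: Pin<Box<F>> = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        // Nothing registers for wake-ups here, so yield and poll again.
        std::thread::yield_now();
    }
}
```

Moving `std::future::ready(()).await` inside the block, while `guard` is still alive, makes the future `!Send`, and the `assert_send(handler(...))` call then fails to compile. When a lock genuinely has to be held across an await, `tokio::sync::Mutex`, whose guard is designed to be held across `.await`, is the usual alternative.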