diff --git a/REPORT.md b/REPORT.md index 5861a02..79f1983 100644 --- a/REPORT.md +++ b/REPORT.md @@ -165,9 +165,12 @@ done | Config | OK/N | TTFT p50 | TTFT p90 | TPOT p50 | TPOT p90 | E2E p50 | |--------|------|----------|----------|----------|----------|---------| -| Baseline (combined) | 198/200 | 2.383s | 27.622s | 0.069s | 0.117s | 10.232s | +| Baseline linear | 198/200 | 2.383s | 27.622s | 0.069s | 0.117s | 10.232s | +| Baseline LMetric | 198/200 | 1.099s | 9.392s | 0.063s | 0.073s | 5.205s | | Elastic P2P (cap=4) | 185/196 | **1.315s** | **13.179s** | **0.066s** | **0.075s** | **5.708s** | -| **Delta** | | **-45%** | **-52%** | **-4%** | **-36%** | **-44%** | + +> Note: "Baseline linear" was run on dash0 during the initial A/B (different machine load conditions). +> "Baseline LMetric" was run on fresh-restart dash0, same conditions as "Baseline linear (fresh)" below in §3.6. ### 3.2 KV Cache Hit Ratio @@ -218,6 +221,40 @@ Key finding: elastic has **much more uniform** prefix APC across instances (std HEAVY requests (51% of traffic) dominate tail latency. Elastic offloads precisely these. +### 3.6 Routing Policy Comparison: Linear vs LMetric (OSDI'26) + +LMetric (Zhang et al., OSDI'26) replaces linear combination `score = load - α·cache_hit` with hyperparameter-free multiplication `score = P_tokens × BS`: +- **P_tokens** = pending prefill tokens on instance + new request's uncached tokens +- **BS** = batch size (waiting + running request count) + 1 + +Both experiments: 8× TP=1 fresh-restart instances on dash0, same trace (200 req, time_scale=20). + +| Policy | OK/N | TTFT p50 | TTFT p90 | TPOT p90 | E2E p50 | +|--------|------|----------|----------|----------|---------| +| Linear | 198/200 | 1.086s | 9.432s | 0.0773s | 5.423s | +| LMetric | 198/200 | 1.099s | 9.392s | 0.0727s | 5.205s | +| **Delta** | | **+1.2%** | **-0.4%** | **-5.9%** | **-4.0%** | + +Per-class breakdown: + +| Class | Linear TTFT p50 | LMetric TTFT p50 | Linear TPOT p90 | LMetric TPOT p90 | +|-------|----------------|-----------------|----------------|-----------------| +| WARM (<5k, n=46) | 0.143s | 0.134s | 0.058s | 0.061s | +| MEDIUM (5-20k, n=50) | 0.921s | 0.809s | 0.078s | 0.073s | +| HEAVY (>20k, n=102) | 4.875s | 4.943s | 0.078s | 0.074s | + +APC comparison (prefix cache hit rate per instance): + +| | Linear | LMetric | +|--|--------|---------| +| Mean | 32.5% | 30.8% | +| Std | ~22pp | ~19pp | +| Range | 3.3%–63.3% | 4.9%–67.2% | + +**Analysis**: LMetric provides modest improvements in TPOT (-5.9%) and E2E (-4.0%) through better load balancing (the multiplication naturally penalizes overloaded instances). TTFT is unchanged because HEAVY requests dominate and session affinity constrains routing freedom. APC skew is slightly reduced. The improvement is far smaller than elastic P2P offload (-44% E2E), confirming that for agentic workloads, **the bottleneck is prefill-decode interference, not routing policy**. + +Data: `outputs/ab_linear/` and `outputs/ab_lmetric/` on dash0. Logs: `/tmp/lmetric_ab_inst_*.log` (linear) and `/tmp/lmetric_inst_*.log` (LMetric). + ## 4. System-Level Analysis ### 4.1 Why Elastic Wins Despite Lower GPU Utilization @@ -256,6 +293,8 @@ Root causes: | `outputs/ab_elastic/` | dash1 | Elastic P2P cap=4 | Fair A/B elastic (§3) | | `outputs/gpu_ab_combined/` | local | Combined 8× TP=1 | Earlier run, has gpu_util.csv | | `outputs/gpu_ab_pdsep/` | local | PD-Sep 4P+4D | Earlier run, has gpu_util.csv | +| `outputs/ab_linear/` | dash0 | Linear policy, 200 req | §3.6 routing policy comparison | +| `outputs/ab_lmetric/` | dash0 | LMetric policy, 200 req | §3.6 routing policy comparison | | `outputs/exp2_combined_tp1_dp8/` | local | Combined 8× TP=1 | 1000 req, cache-aware | | `outputs/exp3_pd_sep_tp1_mooncake/` | local | PD-Sep 4P+4D Mooncake | 1000 req | @@ -265,6 +304,8 @@ Root causes: |-------------|---------|--------| | `/tmp/ab_base_$i.log` | dash0 | Baseline instances 0-7 | | `/tmp/ab_elastic_$i.log` | dash1 | Elastic instances 0-7 | +| `/tmp/lmetric_ab_inst_$i.log` | dash0 | Linear policy instances 0-7 (§3.6) | +| `/tmp/lmetric_inst_$i.log` | dash0 | LMetric policy instances 0-7 (§3.6) | Logs contain `Prefix cache hit rate` and `External prefix cache hit rate` lines for APC extraction. @@ -324,7 +365,8 @@ agentic-kv/ | Script | What it does | Key flags | |--------|-------------|-----------| -| `scripts/cache_aware_proxy.py` | Global scheduler + elastic offload proxy | `--combined`, `--offload`, `--heavy-threshold`, `--bootstrap-ports` | +| `scripts/cache_aware_proxy.py` | Global scheduler + elastic offload proxy | `--combined`, `--offload`, `--policy {linear,lmetric}`, `--heavy-threshold`, `--bootstrap-ports` | +| `scripts/run_lmetric_ab.sh` | A/B: linear vs lmetric routing policy | Runs both experiments with fresh restart | | `scripts/sample_trace.py` | Sample complete sessions from cluster trace | `--target-requests`, `--seed` | | `python -m replayer` | Replay trace against vLLM endpoint | `--time-scale`, `--max-inflight-sessions`, `--request-limit` | | `scripts/gpu_monitor.sh` | Sample nvidia-smi to CSV | Pipe to `outputs//gpu_util.csv` | @@ -337,13 +379,15 @@ agentic-kv/ 2. Cache-aware session-sticky routing is the **dominant optimization** (+24pp APC, -60% TTFT) 3. Elastic P2P offload achieves **-45% TTFT, -36% TPOT, -44% E2E** by selectively isolating heavy prefills while preserving decode cache locality 4. The GPU utilization paradox (lower util but better performance) is explained by higher per-request efficiency +5. LMetric (OSDI'26) multiplication-based routing provides modest improvement over linear (**E2E -4%, TPOT -6%**), confirming that routing policy alone has limited headroom — the bottleneck is prefill-decode interference ### Open problems: 1. GPU load imbalance (3.0× in elastic) — round-robin P fix implemented, needs validation 2. Elastic success rate (94.4%) — Mooncake transfer timeouts on >60k requests 3. Scaling to multi-machine (cross-node Mooncake transfers not yet tested) 4. Adaptive offload threshold (fixed 20k may not be optimal for all load levels) +5. Router state accuracy: proxy shadow state vs vLLM-internal exact state (TODO: vLLM → Redis → router pipeline for ablation) --- -*Generated from experiments run on 2026-05-22. Git commit: `1e86285` (A/B results) + subsequent proxy improvements.* +*Generated from experiments run on 2026-05-22. Git commits: `1e86285` (elastic A/B), `2b0ac70` (phase 1 milestone), subsequent LMetric implementation.* diff --git a/TODO.md b/TODO.md index 5800384..04d2d37 100644 --- a/TODO.md +++ b/TODO.md @@ -21,5 +21,10 @@ GPU 机器:dash0,是 8*H20 的机器,可以直接 `ssh dash0` 进行连接 3. 跑通 PD 分离,确认 PD 分离能够比普通的 PD 混合一起跑的性能要好,给出两者详细的性能对比以及原因分析 4. 判断 trace 的 pattern,是否有必要 PD 完全混合或者 PD 分离 5. 参考本地的 `~/phd/agentic-pd-hybrid`,判断是否能够实现一套 prefill-as-a-service 的架构,把重的 prefill 交给 prefill service,prefill service 能够从本地 GPU/DRAM/别的 GPU 机器上 pull KVCache,提高本地的 prefix KVCache hit ratio,不影响 decoding 的 prefill,就可以交给过去 PD 分离定义中 D-node 来做,提高 KVCache 命中率 +6. [TODO] Router 侧状态精确性 ablation:当前 router 自己维护 shadow state(ongoing_tokens, pending_prefill_tokens, cached_blocks 等),与 vLLM 引擎内部真实状态可能存在 gap(尤其是 cache eviction 导致的 APC 偏差)。需要: + - 修改 vLLM,让每个 instance 定期把内部状态(waiting queue depth, running batch size, KV cache usage, actual prefix cache hit blocks)写入 Redis + - Router 从 Redis 读取精确状态,替代当前的 proxy-side 近似 + - 对比 proxy shadow state vs Redis exact state 的 routing 决策差异和最终性能差异 + - 量化 gap 大小:哪些指标差距最大?是否影响实际 routing 质量? diff --git a/scripts/cache_aware_proxy.py b/scripts/cache_aware_proxy.py index 0a612b8..8e9b83c 100644 --- a/scripts/cache_aware_proxy.py +++ b/scripts/cache_aware_proxy.py @@ -4,10 +4,12 @@ Supports two modes: --combined URL [URL ...]: PD co-located instances (normal vLLM, no KV transfer) --prefill URL BP --decode URL: PD disaggregated instances (Mooncake KV transfer) -Routing policy (same for both modes): - score = ongoing_tokens / avg_ongoing - ALPHA * cache_hit_ratio - Normalized load prevents "rich get richer"; cache bonus gives affinity. - Session affinity: multi-turn sessions stick to same instance. +Routing policies (--policy): + linear (default): score = ongoing_tokens - ALPHA * cache_hit_tokens + lmetric: score = P_tokens * BS (LMetric, OSDI'26) + P_tokens = pending_prefill_tokens + new_uncached_tokens + BS = num_requests (waiting + running) + Session affinity: multi-turn sessions stick to same instance (all policies). """ import argparse @@ -39,6 +41,8 @@ class InstanceState: ) self.ongoing_tokens = 0 self.ongoing_decode_tokens = 0 # subset: tokens in decode phase + self.pending_prefill_tokens = 0 # tokens for requests still in prefill + self.num_requests = 0 # total in-flight requests (waiting + running) self.engine_id: dict[int, str] = {} self.dp_size = 1 self.cached_blocks: set[int] = set() @@ -109,6 +113,39 @@ def pick_instance(instances: list[InstanceState], token_ids: list[int] | None, return instances[best_idx], best_idx +def pick_instance_lmetric(instances: list[InstanceState], token_ids: list[int] | None, + session_id: str | None, input_length: int, + affinity: dict[str, int]) -> tuple[InstanceState, int]: + """LMetric routing: score = P_tokens × BS (OSDI'26). + + P_tokens = pending_prefill_tokens on instance + new request's uncached tokens. + BS = num_requests on instance + 1 (counting the new request). + """ + avg_load = max(sum(i.ongoing_tokens for i in instances) / len(instances), 1.0) + + if session_id and session_id in affinity: + idx = affinity[session_id] + if idx < len(instances): + inst = instances[idx] + if inst.ongoing_tokens <= avg_load * OVERLOAD_FACTOR: + return inst, idx + + best_idx, best_score = 0, float("inf") + for i, inst in enumerate(instances): + cache_hit = inst.estimate_cache_hit(token_ids) + new_prefill = max(0, input_length - cache_hit) + p_tokens = inst.pending_prefill_tokens + new_prefill + bs = inst.num_requests + 1 + score = p_tokens * bs + if score < best_score: + best_score = score + best_idx = i + + if session_id: + affinity[session_id] = best_idx + return instances[best_idx], best_idx + + global_args = None combined_instances: list[InstanceState] = [] prefill_instances: list[InstanceState] = [] @@ -159,7 +196,8 @@ async def lifespan(app: FastAPI): await init_prefill_bootstrap(combined_instances, app.state.ready) else: app.state.ready.set() - print(f"Combined mode: {len(combined_instances)} instances, offload={'ON' if global_args.offload else 'OFF'}") + policy = getattr(global_args, 'policy', 'linear') + print(f"Combined mode: {len(combined_instances)} instances, policy={policy}, offload={'ON' if global_args.offload else 'OFF'}") else: is_pd_sep = True for url, bp in global_args.prefill: @@ -219,8 +257,10 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h session-sticky instance. Only works if instances have kv_role=kv_both. Falls back to co-located if --no-offload or instances lack Mooncake. """ - best_inst, best_idx = pick_instance(combined_instances, token_ids, session_id, - input_length, session_affinity) + policy = getattr(global_args, 'policy', 'linear') if global_args else 'linear' + picker = pick_instance_lmetric if policy == 'lmetric' else pick_instance + best_inst, best_idx = picker(combined_instances, token_ids, session_id, + input_length, session_affinity) cache_hit = best_inst.estimate_cache_hit(token_ids) estimated_new = max(0, input_length - cache_hit) @@ -267,6 +307,8 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h if use_offload: d_idx = best_idx p_inst.ongoing_tokens += input_length # reserve immediately + p_inst.pending_prefill_tokens += estimated_new + p_inst.num_requests += 1 breakdown["route_class"] = "HEAVY_P2P" breakdown["offload_reason"] = offload_reason @@ -288,23 +330,31 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h inst = best_inst breakdown["routed_to"] = inst.url + breakdown["policy"] = policy inst.ongoing_tokens += input_length + inst.pending_prefill_tokens += estimated_new + inst.num_requests += 1 async def generate(): - first_token = True + prefill_done = False try: async with inst.client.stream("POST", api, json=req_data, headers=headers) as resp: resp.raise_for_status() - inst.ongoing_decode_tokens += input_length async for chunk in resp.aiter_bytes(): - if first_token: + if not prefill_done: + inst.pending_prefill_tokens -= estimated_new + inst.ongoing_decode_tokens += input_length breakdown["t_first_token"] = _time.monotonic() - first_token = False + prefill_done = True yield chunk inst.record_prefix(token_ids) finally: + if not prefill_done: + inst.pending_prefill_tokens -= estimated_new + else: + inst.ongoing_decode_tokens -= input_length inst.ongoing_tokens -= input_length - inst.ongoing_decode_tokens -= input_length + inst.num_requests -= 1 breakdown["t_done"] = _time.monotonic() _breakdown_log.append(breakdown) @@ -342,14 +392,19 @@ async def _handle_heavy_offload(api, req_data, headers, token_ids, input_length, _breakdown_log.append(breakdown) global _offload_inflight _offload_inflight = max(0, _offload_inflight - 1) + p_inst.num_requests -= 1 raise HTTPException(status_code=502, detail="Prefill failed: %s" % e) finally: p_inst.ongoing_tokens -= input_length + p_inst.pending_prefill_tokens -= breakdown.get("estimated_new_tokens", 0) _offload_inflight = max(0, _offload_inflight - 1) + p_inst.num_requests -= 1 + # Step 2: Stream decode on d_inst (pulls KV from Mooncake) d_inst.ongoing_tokens += input_length d_inst.ongoing_decode_tokens += input_length + d_inst.num_requests += 1 breakdown["t_decode_sent"] = _time.monotonic() parsed = urllib.parse.urlparse(str(p_inst.client.base_url)) @@ -377,6 +432,7 @@ async def _handle_heavy_offload(api, req_data, headers, token_ids, input_length, finally: d_inst.ongoing_tokens -= input_length d_inst.ongoing_decode_tokens -= input_length + d_inst.num_requests -= 1 breakdown["t_done"] = _time.monotonic() _breakdown_log.append(breakdown) @@ -485,6 +541,20 @@ async def get_breakdown(): return _breakdown_log +@app.get("/stats") +async def get_stats(): + """Return per-instance live state for debugging.""" + instances = combined_instances or prefill_instances + decode_instances + return [{ + "url": inst.url, + "ongoing_tokens": inst.ongoing_tokens, + "pending_prefill_tokens": inst.pending_prefill_tokens, + "ongoing_decode_tokens": inst.ongoing_decode_tokens, + "num_requests": inst.num_requests, + "cached_blocks": len(inst.cached_blocks), + } for inst in instances] + + def parse_args(): p = argparse.ArgumentParser(description="Unified cache-aware global scheduler") p.add_argument("--port", type=int, default=8000) @@ -502,6 +572,8 @@ def parse_args(): help="Enable Mooncake KV offload for HEAVY requests (requires kv_both instances)") p.add_argument("--bootstrap-ports", type=str, default="", help="Comma-separated bootstrap ports for combined instances (for offload mode)") + p.add_argument("--policy", type=str, default="linear", choices=["linear", "lmetric"], + help="Routing policy: linear (default) or lmetric (P_tokens × BS, OSDI'26)") args = p.parse_args() args.prefill = [] diff --git a/scripts/run_lmetric_ab.sh b/scripts/run_lmetric_ab.sh new file mode 100755 index 0000000..8fa4340 --- /dev/null +++ b/scripts/run_lmetric_ab.sh @@ -0,0 +1,149 @@ +#!/bin/bash +# A/B comparison: linear (current baseline) vs lmetric (OSDI'26) routing policy. +# Both use same 8× TP=1 combined instances, fresh restart between experiments. +set -euo pipefail + +PROJECT_DIR="/home/admin/cpfs/wjh/agentic-kv" +VENV="$PROJECT_DIR/.venv/bin" +VLLM="$VENV/vllm" +PYTHON="$VENV/python" +MODEL="/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct" +TRACE="$PROJECT_DIR/traces/sampled_1000req_seed42.jsonl" + +N_INSTANCES=8 +BASE_PORT=8000 +PROXY_PORT=9090 +REQUEST_LIMIT=200 +TIME_SCALE=20 +MAX_SESSIONS=8 + +cleanup() { + for p in $(ps aux | grep 'vllm serve' | grep -v grep | awk '{print $2}'); do kill -9 $p 2>/dev/null; done + for p in $(ps aux | grep 'cache_aware_proxy' | grep -v grep | awk '{print $2}'); do kill -9 $p 2>/dev/null; done + sleep 5 + for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u); do kill -9 $p 2>/dev/null; done + sleep 10 +} + +start_instances() { + echo " Starting $N_INSTANCES vLLM instances..." + for i in $(seq 0 $((N_INSTANCES - 1))); do + port=$((BASE_PORT + i)) + MASTER_PORT=$((29500 + i)) CUDA_VISIBLE_DEVICES=$i \ + $VLLM serve "$MODEL" \ + --host 0.0.0.0 --port $port \ + --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + > /tmp/lmetric_ab_inst_$i.log 2>&1 & + done + + echo " Waiting for instances..." + for i in $(seq 0 $((N_INSTANCES - 1))); do + port=$((BASE_PORT + i)) + timeout 600 bash -c "until curl -s localhost:$port/v1/models > /dev/null 2>&1; do sleep 5; done" + echo " Instance $i (port $port) ready" + done +} + +run_experiment() { + local policy=$1 + local tag=$2 + local outdir="$PROJECT_DIR/outputs/$tag" + mkdir -p "$outdir" + + echo " Starting proxy (policy=$policy)..." + $PYTHON "$PROJECT_DIR/scripts/cache_aware_proxy.py" \ + --combined $(for i in $(seq 0 $((N_INSTANCES - 1))); do echo -n "http://127.0.0.1:$((BASE_PORT + i)) "; done) \ + --policy "$policy" \ + --port $PROXY_PORT > /tmp/lmetric_ab_proxy_${policy}.log 2>&1 & + PROXY_PID=$! + sleep 3 + + # Smoke test + result=$(curl -s -m 30 http://localhost:$PROXY_PORT/v1/completions \ + -X POST -H "Content-Type: application/json" \ + -d "{\"model\":\"$MODEL\",\"prompt\":[100,200,300],\"max_tokens\":3,\"temperature\":0}" 2>&1) + if ! echo "$result" | grep -q "choices"; then + echo " ERROR: Smoke test failed: $result" + kill $PROXY_PID 2>/dev/null + return 1 + fi + echo " Smoke test passed" + + # Start GPU monitor + bash "$PROJECT_DIR/scripts/gpu_monitor.sh" > "$outdir/gpu_util.csv" & + GPU_MON_PID=$! + + # Run benchmark + echo " Running benchmark (policy=$policy, $REQUEST_LIMIT requests)..." + $PYTHON -m replayer \ + --trace "$TRACE" \ + --output "$outdir/metrics.jsonl" \ + --endpoint "http://localhost:$PROXY_PORT" \ + --model "$MODEL" \ + --time-scale $TIME_SCALE \ + --max-inflight-sessions $MAX_SESSIONS \ + --request-limit $REQUEST_LIMIT \ + -v + + # Save breakdown + curl -s http://localhost:$PROXY_PORT/breakdown > "$outdir/breakdown.json" 2>/dev/null + curl -s http://localhost:$PROXY_PORT/stats > "$outdir/stats.json" 2>/dev/null + + # Collect APC from vLLM logs + echo " Collecting APC..." + for i in $(seq 0 $((N_INSTANCES - 1))); do + pch=$(grep "Prefix cache hit rate" /tmp/lmetric_ab_inst_$i.log 2>/dev/null | tail -1 | grep -oP "Prefix cache hit rate: \K[0-9.]+" || echo "0") + echo " inst_$i: prefix=$pch%" + done | tee "$outdir/apc.txt" + + kill $GPU_MON_PID 2>/dev/null + kill $PROXY_PID 2>/dev/null + wait $PROXY_PID 2>/dev/null + echo " Done: $(wc -l < "$outdir/metrics.jsonl") requests -> $outdir" +} + +echo "================================================================" +echo " A/B: Linear vs LMetric routing policy" +echo " $(date)" +echo "================================================================" + +# Experiment 1: Linear (current baseline) +echo "" +echo "=== Experiment 1: Linear policy ===" +cleanup +start_instances +run_experiment "linear" "ab_linear" + +# Experiment 2: LMetric (OSDI'26) +echo "" +echo "=== Experiment 2: LMetric policy ===" +cleanup +start_instances +run_experiment "lmetric" "ab_lmetric" + +# Compare +echo "" +echo "================================================================" +echo " Results comparison" +echo "================================================================" +$PYTHON -c " +import json, statistics + +def summarize(path, label): + rows = [json.loads(l) for l in open(path)] + ok = [r for r in rows if not r.get('error')] + p = lambda v,q: v[min(int(q*len(v)),len(v)-1)] if v else 0 + ttfts = sorted([r['ttft_s'] for r in ok if r.get('ttft_s')]) + tpots = sorted([r['tpot_s'] for r in ok if r.get('tpot_s') and r['tpot_s']>0]) + e2es = sorted([r['latency_s'] for r in ok]) + print('%-20s OK=%3d/%3d TTFT50=%.3f TTFT90=%.3f TPOT90=%.3f E2E50=%.3f' % ( + label, len(ok), len(rows), p(ttfts,.5), p(ttfts,.9), p(tpots,.9), p(e2es,.5))) + +summarize('$PROJECT_DIR/outputs/ab_linear/metrics.jsonl', 'Linear') +summarize('$PROJECT_DIR/outputs/ab_lmetric/metrics.jsonl', 'LMetric') +" + +echo "" +echo "Done at $(date)"