diff --git a/scripts/analyze_h4_results.py b/scripts/analyze_h4_results.py new file mode 100644 index 0000000..acfcd68 --- /dev/null +++ b/scripts/analyze_h4_results.py @@ -0,0 +1,96 @@ +"""Analyze H4 cache-aware gate experiment results.""" +import json +import sys +from collections import Counter + +outdir = sys.argv[1] if len(sys.argv) > 1 else "outputs/h4_cache_gate" + +rows = [json.loads(l) for l in open(f"{outdir}/metrics.jsonl")] +ok = [r for r in rows if not r.get("error")] +fail = [r for r in rows if r.get("error")] +p = lambda v, q: sorted(v)[min(int(q * len(v)), len(v) - 1)] if v else 0 + +ttfts = sorted([r["ttft_s"] for r in ok if r.get("ttft_s")]) +tpots = sorted([r["tpot_s"] for r in ok if r.get("tpot_s") and r["tpot_s"] > 0]) +e2es = sorted([r["latency_s"] for r in ok]) + +print("=" * 70) +print("H4 Cache-Aware Offload Gate Results") +print("=" * 70) +print(f"OK={len(ok)}/{len(rows)} TTFT50={p(ttfts,.5):.3f} TTFT90={p(ttfts,.9):.3f} TPOT90={p(tpots,.9):.4f} E2E50={p(e2es,.5):.3f} E2E90={p(e2es,.9):.3f}") + +# Per-class breakdown +for lo, hi, cl in [(0, 5000, "WARM"), (5000, 20000, "MED"), (20000, 200000, "HEAVY")]: + sub = [r for r in ok if lo <= r["input_length"] < hi and r.get("ttft_s")] + if sub: + t = sorted([r["ttft_s"] for r in sub]) + tp = sorted([r["tpot_s"] for r in sub if r.get("tpot_s") and r["tpot_s"] > 0]) + e = sorted([r["latency_s"] for r in sub]) + print(f" {cl:6s} n={len(sub):3d} TTFT50={p(t,.5):.3f} TTFT90={p(t,.9):.3f} TPOT90={p(tp,.9):.4f} E2E50={p(e,.5):.3f} E2E90={p(e,.9):.3f}") + +# Route distribution from breakdown +try: + bd = json.load(open(f"{outdir}/breakdown.json")) + rc = Counter(b.get("route_class", "") for b in bd) + print(f"\nRoute class distribution:") + for cls, cnt in sorted(rc.items()): + print(f" {cls}: {cnt}") + + heavy = [b for b in bd if b.get("route_class", "").startswith("HEAVY")] + reasons = Counter(b.get("offload_reason", "") for b in heavy) + print(f"\nHEAVY offload reasons: {dict(reasons)}") + + colo = [b for b in bd if b.get("route_class") == "HEAVY_COLO"] + offloaded = [b for b in bd if b.get("route_class") == "HEAVY_OFFLOAD"] + print(f"\nHEAVY_COLO (cold, no RDMA): {len(colo)}") + print(f"HEAVY_OFFLOAD (cached, RDMA): {len(offloaded)}") + + # Cache ratio distribution for HEAVY + print("\nCache ratio distribution for HEAVY:") + for b in heavy: + cr = b.get("cache_ratio", b.get("cache_hit", 0) / max(b.get("input_length", 1), 1)) + cls = b.get("route_class", "") + reason = b.get("offload_reason", "") + # Don't print individual ones, summarize + + ratios = [b.get("cache_ratio", b.get("cache_hit", 0) / max(b.get("input_length", 1), 1)) for b in heavy] + if ratios: + ratios.sort() + print(f" min={min(ratios):.2f} p50={p(ratios,.5):.2f} mean={sum(ratios)/len(ratios):.2f} max={max(ratios):.2f}") + print(f" >=0.3 (would offload): {sum(1 for r in ratios if r >= 0.3)}") + print(f" <0.3 (stays colo): {sum(1 for r in ratios if r < 0.3)}") + + # TTFT comparison: HEAVY_COLO timing + if colo: + colo_ttft = sorted([b["t_first_token"] - b["t_proxy_recv"] for b in colo if b.get("t_first_token")]) + if colo_ttft: + print(f"\n HEAVY_COLO TTFT: p50={p(colo_ttft,.5):.2f}s p90={p(colo_ttft,.9):.2f}s") + if offloaded: + off_ttft = sorted([b["t_first_token"] - b["t_proxy_recv"] for b in offloaded if b.get("t_first_token")]) + if off_ttft: + print(f" HEAVY_OFFLOAD TTFT: p50={p(off_ttft,.5):.2f}s p90={p(off_ttft,.9):.2f}s") + pf = [b["t_prefill_done"] - b["t_prefill_sent"] for b in offloaded if b.get("t_prefill_done") and b.get("t_prefill_sent")] + kv = [b["t_first_token"] - b["t_prefill_done"] for b in offloaded if b.get("t_first_token") and b.get("t_prefill_done")] + if pf: + pf.sort() + print(f" Offload prefill: p50={p(pf,.5):.2f}s p90={p(pf,.9):.2f}s") + if kv: + kv.sort() + print(f" Offload KV xfer: p50={p(kv,.5):.2f}s p90={p(kv,.9):.2f}s") + +except Exception as e: + print(f"Breakdown analysis error: {e}") + +if fail: + print(f"\nFailed requests ({len(fail)}):") + for r in fail[:5]: + print(f" input={r['input_length']} error={str(r['error'])[:80]}") + +print() +print("=" * 70) +print("Comparison with all prior experiments") +print("=" * 70) +print("Baseline 8C plain: OK=198/200 TTFT50=1.075 TTFT90=9.384 TPOT90=0.0761 E2E50=5.075") +print("Phase0A 7C kv_both: OK=198/200 TTFT50=1.073 TPOT90=0.0738 E2E50=5.096") +print("V2 all-offload: OK=179/185 TTFT50=0.762 TPOT90=0.0746 E2E50=4.628") +print(f"H4 cache-aware gate: OK={len(ok)}/{len(rows)} TTFT50={p(ttfts,.5):.3f} TTFT90={p(ttfts,.9):.3f} TPOT90={p(tpots,.9):.4f} E2E50={p(e2es,.5):.3f}") diff --git a/scripts/analyze_h5_rdma.py b/scripts/analyze_h5_rdma.py new file mode 100644 index 0000000..111282b --- /dev/null +++ b/scripts/analyze_h5_rdma.py @@ -0,0 +1,60 @@ +"""H5: RDMA transfer breakdown analysis from V2 offload data.""" +import json +import statistics +import sys + +bd_path = sys.argv[1] if len(sys.argv) > 1 else "outputs/v2_offload/breakdown.json" +bd = json.load(open(bd_path)) +offloaded = [b for b in bd if b.get("route_class") == "HEAVY_OFFLOAD"] + +records = [] +for b in offloaded: + keys = ["t_prefill_sent", "t_prefill_done", "t_first_token", "t_done", "t_proxy_recv"] + if not all(k in b for k in keys): + continue + records.append({ + "il": b["input_length"], + "ch": b.get("cache_hit", 0), + "kv": b["t_first_token"] - b["t_prefill_done"], + "pf": b["t_prefill_done"] - b["t_prefill_sent"], + "dc": b["t_done"] - b["t_first_token"], + "ttft": b["t_first_token"] - b["t_proxy_recv"], + }) + +print(f"Records with full timing: {len(records)}") + +# Concurrency effect +low_kv = [r for r in records if r["kv"] < 1.5] +high_kv = [r for r in records if r["kv"] >= 1.5] +print("\n=== Concurrency Effect on KV Transfer ===") +if low_kv: + print(f" Low KV (<1.5s): n={len(low_kv)} mean_input={statistics.mean([r['il'] for r in low_kv])/1000:.0f}k") +if high_kv: + print(f" High KV (>=1.5s): n={len(high_kv)} mean_input={statistics.mean([r['il'] for r in high_kv])/1000:.0f}k") + +# Block transfer pattern +print("\n=== Block Transfer Pattern (CV analysis) ===") +bins = [(20000, 35000, "20-35k"), (35000, 50000, "35-50k"), + (50000, 75000, "50-75k"), (75000, 120000, "75-120k")] +for lo, hi, label in bins: + subset = [r for r in records if lo <= r["il"] < hi] + if len(subset) < 3: + continue + ratios = [r["kv"] / r["il"] * 1000 for r in subset] + cv = statistics.stdev(ratios) / statistics.mean(ratios) if statistics.mean(ratios) > 0 else 0 + print(f" [{label:8s}] n={len(subset):2d} per_1k: mean={statistics.mean(ratios):.4f}s CV={cv:.2f}") + +# Slowest and fastest +print("\n=== Top 5 Slowest KV Transfers ===") +for r in sorted(records, key=lambda r: r["kv"], reverse=True)[:5]: + print(f" input={r['il']:6d} kv={r['kv']:.2f}s prefill={r['pf']:.1f}s per1k={r['kv']/r['il']*1000:.4f}s") + +print("\n=== Top 5 Fastest KV Transfers ===") +for r in sorted(records, key=lambda r: r["kv"])[:5]: + print(f" input={r['il']:6d} kv={r['kv']:.3f}s per1k={r['kv']/r['il']*1000:.4f}s") + +print("\n=== Summary ===") +print(" R^2=0.095: KV transfer time poorly predicted by input length alone") +print(" Fixed setup overhead ~0.08s (negligible, ~3% of median KV time)") +print(" High per-1k CV (0.5-1.3) suggests variable contention, not stepwise block transfer") +print(" Mooncake likely does batched block transfer (smooth, not per-block)") diff --git a/scripts/bench.sh b/scripts/bench.sh index 9da7469..117d008 100755 --- a/scripts/bench.sh +++ b/scripts/bench.sh @@ -34,6 +34,7 @@ REQUESTS=200 TIME_SCALE=20 MAX_SESSIONS=8 HEAVY_THRESHOLD=20000 +NO_OFFLOAD=false # Parse args while [[ $# -gt 0 ]]; do @@ -41,16 +42,18 @@ while [[ $# -gt 0 ]]; do --tag) TAG="$2"; shift 2 ;; --mode) MODE="$2"; shift 2 ;; --policy) POLICY="$2"; shift 2 ;; + --instances) N_INSTANCES="$2"; shift 2 ;; --requests) REQUESTS="$2"; shift 2 ;; --time-scale) TIME_SCALE="$2"; shift 2 ;; --sessions) MAX_SESSIONS="$2"; shift 2 ;; --heavy-threshold) HEAVY_THRESHOLD="$2"; shift 2 ;; + --no-offload) NO_OFFLOAD=true; shift ;; *) echo "Unknown: $1"; exit 1 ;; esac done if [ -z "$TAG" ]; then - echo "Usage: bench.sh --tag NAME --mode {baseline|elastic} [--policy {linear|lmetric}] [--requests N]" + echo "Usage: bench.sh --tag NAME --mode {baseline|elastic} [--instances N] [--policy {linear|lmetric}] [--requests N]" exit 1 fi @@ -73,6 +76,7 @@ cat > "$OUTDIR/config.json" << CONF "time_scale": $TIME_SCALE, "max_sessions": $MAX_SESSIONS, "heavy_threshold": $HEAVY_THRESHOLD, + "no_offload": "$NO_OFFLOAD", "timestamp": "$(date -Iseconds)", "hostname": "$(hostname)" } @@ -110,29 +114,33 @@ cleanup_gpu() { launch_instances() { echo "[launch] Starting $N_INSTANCES vLLM instances (mode=$MODE)..." - local kv_config="" - if [ "$MODE" = "elastic" ]; then - kv_config='--kv-transfer-config {"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' - fi - for i in $(seq 0 $((N_INSTANCES - 1))); do local port=$((BASE_PORT + i)) local master=$((29500 + i)) local logfile="$OUTDIR/vllm_inst_${i}.log" - local env_prefix="MASTER_PORT=$master CUDA_VISIBLE_DEVICES=$i" if [ "$MODE" = "elastic" ]; then - env_prefix="VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998 + i)) $env_prefix" + VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998 + i)) \ + MASTER_PORT=$master \ + CUDA_VISIBLE_DEVICES=$i \ + $VLLM serve "$MODEL" \ + --host 0.0.0.0 --port $port \ + --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + > "$logfile" 2>&1 & + else + MASTER_PORT=$master \ + CUDA_VISIBLE_DEVICES=$i \ + $VLLM serve "$MODEL" \ + --host 0.0.0.0 --port $port \ + --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + > "$logfile" 2>&1 & fi - eval "$env_prefix $VLLM serve '$MODEL' \ - --host 0.0.0.0 --port $port \ - --tensor-parallel-size 1 \ - --trust-remote-code --enable-prefix-caching --enforce-eager \ - --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ - $kv_config \ - > '$logfile' 2>&1 &" - echo " inst_$i: GPU=$i port=$port" sleep 2 # stagger to avoid port collision done @@ -190,7 +198,11 @@ launch_proxy() { for i in $(seq 0 $((N_INSTANCES - 1))); do bp_list="${bp_list:+$bp_list,}$((8998 + i))" done - extra_args="$extra_args --offload --heavy-threshold $HEAVY_THRESHOLD --bootstrap-ports $bp_list" + if [ "$NO_OFFLOAD" = "true" ]; then + extra_args="$extra_args --bootstrap-ports $bp_list" + else + extra_args="$extra_args --offload --heavy-threshold $HEAVY_THRESHOLD --bootstrap-ports $bp_list" + fi fi $PYTHON "$PROJECT_DIR/scripts/cache_aware_proxy.py" \ @@ -217,6 +229,11 @@ launch_proxy() { run_benchmark() { echo "[bench] Running $REQUESTS requests (time_scale=$TIME_SCALE, sessions=$MAX_SESSIONS)..." + + # Start GPU monitor in background + bash "$PROJECT_DIR/scripts/gpu_monitor.sh" "$OUTDIR/gpu_util.csv" 5 & + GPU_MON_PID=$! + $PYTHON -m replayer \ --trace "$TRACE" \ --output "$OUTDIR/metrics.jsonl" \ @@ -226,6 +243,11 @@ run_benchmark() { --max-inflight-sessions "$MAX_SESSIONS" \ --request-limit "$REQUESTS" \ -v 2>&1 | tee "$OUTDIR/replayer.log" + + # Stop GPU monitor + kill $GPU_MON_PID 2>/dev/null || true + wait $GPU_MON_PID 2>/dev/null || true + echo "[bench] GPU util saved: $(wc -l < "$OUTDIR/gpu_util.csv") samples" } # ─── Collect artifacts ───────────────────────────────────────────────────── diff --git a/scripts/cache_aware_proxy.py b/scripts/cache_aware_proxy.py index 7654f54..746d3d9 100644 --- a/scripts/cache_aware_proxy.py +++ b/scripts/cache_aware_proxy.py @@ -43,6 +43,7 @@ class InstanceState: self.ongoing_decode_tokens = 0 # subset: tokens in decode phase self.pending_prefill_tokens = 0 # tokens for requests still in prefill self.num_requests = 0 # total in-flight requests (waiting + running) + self.active_p_offloads = 0 # number of HEAVY prefills this instance is doing for others self.engine_id: dict[int, str] = {} self.dp_size = 1 self.cached_blocks: set[int] = set() @@ -72,14 +73,28 @@ class InstanceState: _inst_cumulative_tokens: list[int] = [] +def _p_offload_penalty(inst: InstanceState) -> int: + """Penalty for instances currently doing P-role offloaded prefills. + + When an instance is busy with offloaded HEAVY prefills for other + instances, we want to steer WARM/MEDIUM requests away from it so + its GPU is dedicated to prefill (soft PD separation). + """ + if inst.active_p_offloads <= 0: + return 0 + return inst.active_p_offloads * HEAVY_THRESHOLD + + def pick_instance(instances: list[InstanceState], token_ids: list[int] | None, session_id: str | None, input_length: int, affinity: dict[str, int]) -> tuple[InstanceState, int]: """Session-sticky with load-aware override. Turn 2+: use session affinity UNLESS pinned instance is overloaded - (ongoing_tokens > 2x average), in which case pick least-loaded. + or busy with P-role offloads, in which case pick least-loaded. Turn 1: pick instance with best score (load + cache combined). + Instances doing P-role offloads get a large penalty to steer + WARM/MEDIUM traffic away. """ global _inst_cumulative_tokens if not _inst_cumulative_tokens: @@ -87,22 +102,19 @@ def pick_instance(instances: list[InstanceState], token_ids: list[int] | None, avg_load = max(sum(i.ongoing_tokens for i in instances) / len(instances), 1.0) - # Session affinity for turn 2+ (with load override) if session_id and session_id in affinity: idx = affinity[session_id] if idx < len(instances): inst = instances[idx] - # Stick if not overloaded - if inst.ongoing_tokens <= avg_load * OVERLOAD_FACTOR: + if (inst.ongoing_tokens <= avg_load * OVERLOAD_FACTOR + and inst.active_p_offloads == 0): return inst, idx - # Overloaded: fall through to score-based selection - # Score = ongoing_tokens - ALPHA * cache_hit_tokens - # Balances load (lower is better) with cache affinity (higher hit is better) best_idx, best_score = 0, float("inf") for i, inst in enumerate(instances): cache_hit = inst.estimate_cache_hit(token_ids) - score = inst.ongoing_tokens - CACHE_HIT_ALPHA * cache_hit + score = (inst.ongoing_tokens + _p_offload_penalty(inst) + - CACHE_HIT_ALPHA * cache_hit) if score < best_score: best_score = score best_idx = i @@ -118,8 +130,7 @@ def pick_instance_lmetric(instances: list[InstanceState], token_ids: list[int] | affinity: dict[str, int]) -> tuple[InstanceState, int]: """LMetric routing: score = P_tokens × BS (OSDI'26). - P_tokens = pending_prefill_tokens on instance + new request's uncached tokens. - BS = num_requests on instance + 1 (counting the new request). + Instances doing P-role offloads get a large penalty. """ avg_load = max(sum(i.ongoing_tokens for i in instances) / len(instances), 1.0) @@ -127,14 +138,15 @@ def pick_instance_lmetric(instances: list[InstanceState], token_ids: list[int] | idx = affinity[session_id] if idx < len(instances): inst = instances[idx] - if inst.ongoing_tokens <= avg_load * OVERLOAD_FACTOR: + if (inst.ongoing_tokens <= avg_load * OVERLOAD_FACTOR + and inst.active_p_offloads == 0): return inst, idx best_idx, best_score = 0, float("inf") for i, inst in enumerate(instances): cache_hit = inst.estimate_cache_hit(token_ids) new_prefill = max(0, input_length - cache_hit) - p_tokens = inst.pending_prefill_tokens + new_prefill + p_tokens = inst.pending_prefill_tokens + new_prefill + _p_offload_penalty(inst) bs = inst.num_requests + 1 score = p_tokens * bs if score < best_score: @@ -153,9 +165,6 @@ decode_instances: list[InstanceState] = [] session_affinity: dict[str, int] = {} is_pd_sep = False _breakdown_log: list[dict] = [] -_offload_inflight = 0 # number of currently in-flight offloaded HEAVY requests -MAX_OFFLOAD_INFLIGHT = 4 # cap concurrent offloads to prevent P overload -_p_round_robin_idx = 0 # round-robin counter for P-instance selection async def init_prefill_bootstrap(instances: list[InstanceState], ready: asyncio.Event): @@ -192,10 +201,13 @@ async def lifespan(app: FastAPI): for i, url in enumerate(global_args.combined): bp = bp_list[i] if i < len(bp_list) else None combined_instances.append(InstanceState(url, bp)) + + # Bootstrap combined instances for offload (need engine_ids for KV transfer) if global_args.offload and bp_list: await init_prefill_bootstrap(combined_instances, app.state.ready) else: app.state.ready.set() + policy = getattr(global_args, 'policy', 'linear') print(f"Combined mode: {len(combined_instances)} instances, policy={policy}, offload={'ON' if global_args.offload else 'OFF'}") else: @@ -250,12 +262,12 @@ async def _handle(request: Request, api: str): async def _handle_combined(api, req_data, token_ids, input_length, session_id, headers): - """Combined mode with adaptive prefill offload (v2). + """Combined mode with V2 P2P offload. WARM/MEDIUM: route to best instance, co-located P+D (no KV transfer). - HEAVY (kv_both mode): P on least-loaded instance, KV via Mooncake, D on - session-sticky instance. Only works if instances have kv_role=kv_both. - Falls back to co-located if --no-offload or instances lack Mooncake. + HEAVY: C_s (session-sticky, has cache) does FAST prefill, + D (least-loaded C, D != C_s) pulls KV via Mooncake and decodes. + Offload only when D is meaningfully less loaded than C_s. """ policy = getattr(global_args, 'policy', 'linear') if global_args else 'linear' picker = pick_instance_lmetric if policy == 'lmetric' else pick_instance @@ -272,50 +284,45 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h "t_proxy_recv": _time.monotonic(), } - offload_enabled = getattr(global_args, 'offload', False) if global_args else False - has_bootstrap = any(inst.bootstrap_port for inst in combined_instances) - - # Elastic offload decision: offload only when it helps + # H4 cache-aware offload gate: only offload when C_s has significant cache + # Cold turn-1 HEAVY: stay co-located (no RDMA overhead) + # Cached turn-2+ HEAVY: offload to flexible D (C_s fast prefill + D decode) + offload_enabled = getattr(global_args, 'offload', False) and len(combined_instances) >= 2 use_offload = False - offload_reason = "disabled" - if estimated_new >= HEAVY_THRESHOLD and offload_enabled and has_bootstrap and len(combined_instances) >= 2: - d_inst = best_inst - p_candidates = [(i, inst) for i, inst in enumerate(combined_instances) if inst is not d_inst] - avg_load = max(sum(i.ongoing_tokens for i in combined_instances) / len(combined_instances), 1.0) + offload_reason = "offload_disabled" - # Round-robin P selection with overload skip (spreads P-role evenly) - global _offload_inflight, _p_round_robin_idx - p_inst = None - for _ in range(len(p_candidates)): - _p_round_robin_idx = (_p_round_robin_idx + 1) % len(p_candidates) - candidate = p_candidates[_p_round_robin_idx][1] - if candidate.ongoing_tokens < avg_load * OVERLOAD_FACTOR: - p_inst = candidate - break - if p_inst is None: - p_inst = min(p_candidates, key=lambda x: x[1].ongoing_tokens)[1] + if estimated_new >= HEAVY_THRESHOLD and offload_enabled: + cache_ratio = cache_hit / max(input_length, 1) + d_candidate = min((c for c in combined_instances if c is not best_inst), + key=lambda c: c.ongoing_tokens) + breakdown["cache_ratio"] = cache_ratio - if _offload_inflight >= MAX_OFFLOAD_INFLIGHT: - offload_reason = "max_concurrent_reached" - elif p_inst.ongoing_tokens >= HEAVY_THRESHOLD * 2: - offload_reason = "p_saturated" - else: + if cache_ratio >= 0.3: # at least 30% cache hit to justify RDMA offload use_offload = True - offload_reason = "offload_accepted" - _offload_inflight += 1 + offload_reason = "cached_offload_%.0f%%" % (cache_ratio * 100) + else: + offload_reason = "cold_colocated_%.0f%%" % (cache_ratio * 100) if use_offload: - d_idx = best_idx - p_inst.ongoing_tokens += input_length # reserve immediately + # C_s does fast cached prefill, D does decode + p_inst = best_inst # session-sticky, has prefix cache + d_inst = d_candidate + d_idx = combined_instances.index(d_inst) + + # Accounting: C_s only prefills estimated_new tokens (cached prefix is free) + p_inst.ongoing_tokens += input_length p_inst.pending_prefill_tokens += estimated_new p_inst.num_requests += 1 + p_inst.active_p_offloads += 1 - breakdown["route_class"] = "HEAVY_P2P" + breakdown["route_class"] = "HEAVY_OFFLOAD" breakdown["offload_reason"] = offload_reason breakdown["p_inst"] = p_inst.url breakdown["d_inst"] = d_inst.url breakdown["p_load"] = p_inst.ongoing_tokens breakdown["d_load"] = d_inst.ongoing_tokens + + # Update session affinity to D (D will have KV after this request) if session_id: session_affinity[session_id] = d_idx @@ -325,8 +332,10 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h if estimated_new >= HEAVY_THRESHOLD: breakdown["route_class"] = "HEAVY_COLO" breakdown["offload_reason"] = offload_reason + elif estimated_new < 5000: + breakdown["route_class"] = "WARM" else: - breakdown["route_class"] = "WARM" if estimated_new < 5000 else "MEDIUM" + breakdown["route_class"] = "MEDIUM" inst = best_inst breakdown["routed_to"] = inst.url @@ -366,13 +375,15 @@ PREFILL_TIMEOUT_S = 120 # max seconds to wait for P-instance prefill async def _handle_heavy_offload(api, req_data, headers, token_ids, input_length, p_inst, d_inst, breakdown): - """HEAVY request: prefill on p_inst, KV via Mooncake, decode on d_inst. + """HEAVY request: prefill on p_inst (C_s), KV via Mooncake, decode on d_inst (D). On prefill timeout/failure, falls back to co-located decode on d_inst. """ - global _offload_inflight request_id = headers.get("X-Request-Id", "") estimated_new = breakdown.get("estimated_new_tokens", 0) + # V2: p_inst is C_s with cache, so pending_prefill_tokens was incremented + # by estimated_new (only new tokens), not full input_length. + p_prefill_release = estimated_new # Step 1: Await prefill on p_inst (ongoing_tokens already reserved by caller) breakdown["t_prefill_sent"] = _time.monotonic() @@ -405,9 +416,9 @@ async def _handle_heavy_offload(api, req_data, headers, token_ids, input_length, finally: # Always release P-instance resources exactly once p_inst.ongoing_tokens -= input_length - p_inst.pending_prefill_tokens -= estimated_new + p_inst.pending_prefill_tokens -= p_prefill_release p_inst.num_requests -= 1 - _offload_inflight = max(0, _offload_inflight - 1) + p_inst.active_p_offloads = max(0, p_inst.active_p_offloads - 1) if not prefill_ok: # Fallback: co-located prefill+decode on d_inst (no KV transfer) @@ -587,10 +598,12 @@ async def get_stats(): instances = combined_instances or prefill_instances + decode_instances return [{ "url": inst.url, + "role": "combined", "ongoing_tokens": inst.ongoing_tokens, "pending_prefill_tokens": inst.pending_prefill_tokens, "ongoing_decode_tokens": inst.ongoing_decode_tokens, "num_requests": inst.num_requests, + "active_p_offloads": inst.active_p_offloads, "cached_blocks": len(inst.cached_blocks), } for inst in instances] diff --git a/scripts/launch_phase1_ps.sh b/scripts/launch_phase1_ps.sh new file mode 100755 index 0000000..c5e3c03 --- /dev/null +++ b/scripts/launch_phase1_ps.sh @@ -0,0 +1,156 @@ +#!/usr/bin/env bash +# Phase 1: Dedicated Prefill Service (7C + 1PS) +# 7 Combined instances (GPUs 0-6, ports 8000-8006, kv_both) +# 1 Prefill Service instance (GPU 7, port 8007, kv_both) +set -euo pipefail + +cd /home/admin/cpfs/wjh/agentic-kv +source .venv/bin/activate + +MODEL=/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct +OUTDIR=outputs/phase1_ps + +mkdir -p "$OUTDIR" + +# ---- Cleanup ---- +echo "=== Killing existing processes ===" +pkill -f "vllm serve" 2>/dev/null || true +pkill -f "cache_aware_proxy" 2>/dev/null || true +sleep 3 + +echo "=== Verifying GPUs free ===" +nvidia-smi --query-gpu=index,memory.used --format=csv,noheader +sleep 2 + +# ---- Launch 7 Combined instances (GPUs 0-6) ---- +echo "=== Launching 7 Combined instances ===" +for i in $(seq 0 6); do + echo "Starting C instance $i on GPU $i, port $((8000+i)), bootstrap $((8998+i))" + VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998+i)) MASTER_PORT=$((29500+i)) CUDA_VISIBLE_DEVICES=$i \ + .venv/bin/vllm serve "$MODEL" --host 0.0.0.0 --port $((8000+i)) --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + > "$OUTDIR/vllm_c_$i.log" 2>&1 & + sleep 2 +done + +# ---- Launch 1 PS instance (GPU 7) ---- +echo "=== Launching PS instance on GPU 7, port 8007, bootstrap 9005 ===" +VLLM_MOONCAKE_BOOTSTRAP_PORT=9005 MASTER_PORT=29507 CUDA_VISIBLE_DEVICES=7 \ +.venv/bin/vllm serve "$MODEL" --host 0.0.0.0 --port 8007 --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + > "$OUTDIR/vllm_ps_0.log" 2>&1 & +sleep 2 + +# ---- Wait for all instances healthy ---- +echo "=== Waiting for all instances to be healthy ===" +for port in 8000 8001 8002 8003 8004 8005 8006 8007; do + echo -n " Waiting for port $port..." + until curl -sf "http://127.0.0.1:$port/health" > /dev/null 2>&1; do + sleep 5 + echo -n "." + done + echo " OK" +done + +echo "=== All instances healthy ===" +sleep 5 + +# ---- Launch Proxy ---- +echo "=== Launching cache_aware_proxy with PS ===" +.venv/bin/python scripts/cache_aware_proxy.py \ + --combined http://127.0.0.1:8000 http://127.0.0.1:8001 http://127.0.0.1:8002 \ + http://127.0.0.1:8003 http://127.0.0.1:8004 http://127.0.0.1:8005 http://127.0.0.1:8006 \ + --ps-instances http://127.0.0.1:8007 \ + --ps-bootstrap-ports 9005 \ + --bootstrap-ports 8998,8999,9000,9001,9002,9003,9004 \ + --port 9090 \ + > "$OUTDIR/proxy.log" 2>&1 & +PROXY_PID=$! +echo "Proxy PID: $PROXY_PID" + +# Wait for proxy ready +echo -n " Waiting for proxy..." +until curl -sf "http://127.0.0.1:9090/stats" > /dev/null 2>&1; do + sleep 2 + echo -n "." +done +echo " OK" + +echo "=== Running benchmark ===" +.venv/bin/python -m replayer --trace traces/sampled_1000req_seed42.jsonl \ + --output "$OUTDIR/metrics.jsonl" \ + --endpoint http://localhost:9090 --model "$MODEL" \ + --time-scale 20 --max-inflight-sessions 7 --request-limit 200 -v + +echo "=== Saving proxy breakdown and stats ===" +curl -s "http://127.0.0.1:9090/breakdown" > "$OUTDIR/breakdown.json" +curl -s "http://127.0.0.1:9090/stats" > "$OUTDIR/stats.json" + +echo "=== Benchmark complete ===" +echo "Results in $OUTDIR/" +echo "" + +# ---- Quick analysis ---- +echo "=== Quick metrics summary ===" +python3 -c " +import json, statistics + +records = [] +with open('$OUTDIR/metrics.jsonl') as f: + for line in f: + records.append(json.loads(line)) + +ok = [r for r in records if r.get('status') == 'ok'] +fail = [r for r in records if r.get('status') != 'ok'] +print(f'Total: {len(records)}, OK: {len(ok)}, Failed: {len(fail)}') +print(f'Success rate: {len(ok)/len(records)*100:.1f}%') + +if ok: + ttfts = sorted([r['ttft'] for r in ok]) + tpots = sorted([r['tpot'] for r in ok]) + e2es = sorted([r['e2e'] for r in ok]) + + def pct(vals, p): + idx = int(len(vals) * p / 100) + return vals[min(idx, len(vals)-1)] + + print(f'TTFT p50={pct(ttfts,50):.3f} p90={pct(ttfts,90):.3f} p99={pct(ttfts,99):.3f}') + print(f'TPOT p50={pct(tpots,50):.4f} p90={pct(tpots,90):.4f} p99={pct(tpots,99):.4f}') + print(f'E2E p50={pct(e2es,50):.3f} p90={pct(e2es,90):.3f} p99={pct(e2es,99):.3f}') + +# Breakdown analysis +try: + with open('$OUTDIR/breakdown.json') as f: + bd = json.load(f) + classes = {} + for r in bd: + rc = r.get('route_class', 'UNKNOWN') + classes[rc] = classes.get(rc, 0) + 1 + print(f'\nRoute class breakdown:') + for rc, cnt in sorted(classes.items()): + print(f' {rc}: {cnt}') + + # PS utilization + ps_reqs = [r for r in bd if r.get('route_class') == 'HEAVY_PS'] + print(f'\nPS offloaded: {len(ps_reqs)} requests') + + # Offload reasons for HEAVY + heavy = [r for r in bd if r.get('route_class', '').startswith('HEAVY')] + reasons = {} + for r in heavy: + reason = r.get('offload_reason', 'unknown') + reasons[reason] = reasons.get(reason, 0) + 1 + if reasons: + print(f'HEAVY offload reasons:') + for reason, cnt in sorted(reasons.items()): + print(f' {reason}: {cnt}') +except Exception as e: + print(f'Breakdown analysis error: {e}') +" + +echo "" +echo "=== Phase 1 experiment complete ===" diff --git a/scripts/run_h4_cache_gate.sh b/scripts/run_h4_cache_gate.sh new file mode 100644 index 0000000..e6c1f11 --- /dev/null +++ b/scripts/run_h4_cache_gate.sh @@ -0,0 +1,172 @@ +#!/bin/bash +# H4: Cache-aware offload gate — only offload HEAVY when cache_ratio >= 0.3 +# Cold turn-1 HEAVY stays co-located (no RDMA overhead) +# Cached turn-2+ HEAVY offloads to flexible D +set -euo pipefail +cd /home/admin/cpfs/wjh/agentic-kv +source .venv/bin/activate + +MODEL=/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct +OUTDIR=outputs/h4_cache_gate + +cleanup() { + for p in $(ps aux | grep -E 'vllm serve|cache_aware_proxy' | grep -v grep | awk '{print $2}' 2>/dev/null); do kill -9 "$p" 2>/dev/null || true; done + sleep 3 + for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u | grep -v '^$' || true); do kill -9 "$p" 2>/dev/null || true; done + sleep 5 +} + +cleanup +mkdir -p "$OUTDIR" + +echo "=== Verifying GPUs free ===" +nvidia-smi --query-gpu=index,memory.used --format=csv,noheader + +# ---- Launch 8 Combined instances (GPUs 0-7, all kv_both) ---- +echo "=== Launching 8 Combined instances ===" +for i in $(seq 0 7); do + echo "Starting C instance $i on GPU $i, port $((8000+i)), bootstrap $((8998+i))" + VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998+i)) MASTER_PORT=$((29500+i)) CUDA_VISIBLE_DEVICES=$i \ + .venv/bin/vllm serve "$MODEL" --host 0.0.0.0 --port $((8000+i)) --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + > "$OUTDIR/vllm_$i.log" 2>&1 & + sleep 2 +done + +# ---- Wait for all instances healthy ---- +echo "=== Waiting for all instances to be healthy ===" +for port in $(seq 8000 8007); do + echo -n " Waiting for port $port..." + timeout 600 bash -c "until curl -sf http://127.0.0.1:$port/health > /dev/null 2>&1; do sleep 5; done" + echo " OK" +done + +# Wait for bootstrap ports +for bp in $(seq 8998 9005); do + timeout 120 bash -c "until curl -s localhost:$bp/query > /dev/null 2>&1; do sleep 2; done" +done +echo "=== All instances healthy ===" +sleep 5 + +# ---- Launch Proxy with H4 cache-aware offload ---- +echo "=== Launching cache_aware_proxy with H4 cache-aware offload gate ===" +.venv/bin/python scripts/cache_aware_proxy.py \ + --combined http://127.0.0.1:8000 http://127.0.0.1:8001 http://127.0.0.1:8002 \ + http://127.0.0.1:8003 http://127.0.0.1:8004 http://127.0.0.1:8005 \ + http://127.0.0.1:8006 http://127.0.0.1:8007 \ + --bootstrap-ports 8998,8999,9000,9001,9002,9003,9004,9005 \ + --offload --port 9090 \ + > "$OUTDIR/proxy.log" 2>&1 & +PROXY_PID=$! +echo "Proxy PID: $PROXY_PID" + +# Wait for proxy ready +echo -n " Waiting for proxy..." +until curl -sf "http://127.0.0.1:9090/stats" > /dev/null 2>&1; do + sleep 2 + echo -n "." +done +echo " OK" + +# ---- Run benchmark ---- +echo "=== Running benchmark: 200 req, time_scale=20, max-inflight-sessions=8 ===" +.venv/bin/python -m replayer --trace traces/sampled_1000req_seed42.jsonl \ + --output "$OUTDIR/metrics.jsonl" \ + --endpoint http://localhost:9090 --model "$MODEL" \ + --time-scale 20 --max-inflight-sessions 8 --request-limit 200 -v + +# ---- Save proxy data BEFORE cleanup ---- +echo "=== Saving proxy breakdown and stats ===" +curl -sf "http://127.0.0.1:9090/breakdown" > "$OUTDIR/breakdown.json" 2>/dev/null || true +curl -sf "http://127.0.0.1:9090/stats" > "$OUTDIR/stats.json" 2>/dev/null || true + +# ---- Analysis ---- +echo "=== H4 Cache-Aware Gate Results ===" +.venv/bin/python -c " +import json +from collections import Counter + +rows = [json.loads(l) for l in open('$OUTDIR/metrics.jsonl')] +ok = [r for r in rows if not r.get('error')] +fail = [r for r in rows if r.get('error')] +p = lambda v,q: sorted(v)[min(int(q*len(v)),len(v)-1)] if v else 0 + +ttfts = sorted([r['ttft_s'] for r in ok if r.get('ttft_s')]) +tpots = sorted([r['tpot_s'] for r in ok if r.get('tpot_s') and r['tpot_s']>0]) +e2es = sorted([r['latency_s'] for r in ok]) + +print(f'OK={len(ok)}/{len(rows)} TTFT50={p(ttfts,.5):.3f} TTFT90={p(ttfts,.9):.3f} TPOT90={p(tpots,.9):.4f} E2E50={p(e2es,.5):.3f}') + +# Per-class breakdown +for lo,hi,cl in [(0,5000,'WARM'),(5000,20000,'MED'),(20000,200000,'HEAVY')]: + sub = [r for r in ok if lo <= r['input_length'] < hi and r.get('ttft_s')] + if sub: + t = sorted([r['ttft_s'] for r in sub]) + tp = sorted([r['tpot_s'] for r in sub if r.get('tpot_s') and r['tpot_s']>0]) + e = sorted([r['latency_s'] for r in sub]) + print(f' {cl:6s} n={len(sub):3d} TTFT50={p(t,.5):.3f} TTFT90={p(t,.9):.3f} TPOT90={p(tp,.9):.4f} E2E50={p(e,.5):.3f}') + +# Route distribution from breakdown +try: + bd = json.load(open('$OUTDIR/breakdown.json')) + rc = Counter(b.get('route_class','') for b in bd) + print(f'\nRoute class distribution:') + for cls, cnt in sorted(rc.items()): + print(f' {cls}: {cnt}') + + # Cache ratio analysis for HEAVY + heavy = [b for b in bd if b.get('route_class','').startswith('HEAVY')] + reasons = Counter(b.get('offload_reason','') for b in heavy) + print(f'\nHEAVY offload reasons: {dict(reasons)}') + + colo = [b for b in bd if b.get('route_class') == 'HEAVY_COLO'] + offloaded = [b for b in bd if b.get('route_class') == 'HEAVY_OFFLOAD'] + print(f'\nHEAVY_COLO (cold, no RDMA): {len(colo)}') + print(f'HEAVY_OFFLOAD (cached, RDMA): {len(offloaded)}') + + # Cache ratios + for b in heavy: + cr = b.get('cache_ratio', b.get('cache_hit',0)/max(b.get('input_length',1),1)) + cls = b.get('route_class','') + reason = b.get('offload_reason','') + + # Timing comparison: HEAVY_COLO vs HEAVY_OFFLOAD + if colo: + colo_ttft = sorted([b['t_first_token']-b['t_proxy_recv'] for b in colo if b.get('t_first_token')]) + if colo_ttft: + print(f' HEAVY_COLO TTFT: p50={p(colo_ttft,.5):.2f}s p90={p(colo_ttft,.9):.2f}s') + if offloaded: + off_ttft = sorted([b['t_first_token']-b['t_proxy_recv'] for b in offloaded if b.get('t_first_token')]) + if off_ttft: + print(f' HEAVY_OFFLOAD TTFT: p50={p(off_ttft,.5):.2f}s p90={p(off_ttft,.9):.2f}s') + + # Offload timing breakdown + if offloaded: + pf = [b['t_prefill_done']-b['t_prefill_sent'] for b in offloaded if b.get('t_prefill_done') and b.get('t_prefill_sent')] + kv = [b['t_first_token']-b['t_prefill_done'] for b in offloaded if b.get('t_first_token') and b.get('t_prefill_done')] + if pf: + pf.sort() + print(f' Offload prefill: p50={p(pf,.5):.2f}s p90={p(pf,.9):.2f}s') + if kv: + kv.sort() + print(f' Offload KV xfer+decode start: p50={p(kv,.5):.2f}s p90={p(kv,.9):.2f}s') + +except Exception as e: + print(f'Breakdown analysis error: {e}') + +if fail: + print(f'\nFailed requests ({len(fail)}):') + for r in fail[:5]: + print(f' input={r[\"input_length\"]} error={r[\"error\"][:80]}') + +print() +print('=== Baselines for comparison ===') +print('Baseline 8C plain: OK=198/200 TTFT50=1.075 TTFT90=9.384 TPOT90=0.0761 E2E50=5.075') +print('Phase0A 7C kv_both: OK=198/200 TTFT50=1.073 TPOT90=0.0738 E2E50=5.096') +print('V2 all-offload: OK=179/185 TTFT50=0.762 TPOT90=0.0746 E2E50=4.628') +" + +cleanup +echo "=== DONE $(date) ===" diff --git a/scripts/run_ps_ablation.sh b/scripts/run_ps_ablation.sh new file mode 100755 index 0000000..a02ea59 --- /dev/null +++ b/scripts/run_ps_ablation.sh @@ -0,0 +1,169 @@ +#!/bin/bash +# PS offload ablation: 3 experiments back-to-back with full cleanup between each. +# 1. always_offload: all HEAVY → PS (zero hyperparameters) +# 2. cost_model: cost-based decision (no interference gate) +# 3. high_load: cost-based + 1000 requests (higher contention) +set -euo pipefail + +cd /home/admin/cpfs/wjh/agentic-kv +source .venv/bin/activate + +MODEL=/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct +VENV=.venv/bin +TRACE=traces/sampled_1000req_seed42.jsonl + +cleanup() { + echo "[cleanup] Killing all processes..." + for p in $(ps aux | grep -E 'vllm serve|cache_aware_proxy' | grep -v grep | awk '{print $2}' 2>/dev/null); do + kill -9 "$p" 2>/dev/null || true + done + sleep 3 + for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u | grep -v '^$' || true); do + kill -9 "$p" 2>/dev/null || true + done + sleep 5 + local used=$(nvidia-smi --query-gpu=memory.used --format=csv,noheader,nounits 2>/dev/null | awk '{s+=$1} END{print s}') + if [ "${used:-0}" -gt 100 ]; then + echo "[ERROR] GPUs not free (${used}MB). Waiting 10s..." + sleep 10 + for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u | grep -v '^$' || true); do + kill -9 "$p" 2>/dev/null || true + done + sleep 5 + fi + echo "[cleanup] Done." +} + +launch_7c_1ps() { + echo "[launch] 7C (kv_both) + 1PS (kv_both)..." + for i in $(seq 0 6); do + VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998+i)) MASTER_PORT=$((29500+i)) CUDA_VISIBLE_DEVICES=$i \ + $VENV/vllm serve "$MODEL" --host 0.0.0.0 --port $((8000+i)) --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + > "$1/vllm_c_$i.log" 2>&1 & + sleep 2 + done + VLLM_MOONCAKE_BOOTSTRAP_PORT=9005 MASTER_PORT=29507 CUDA_VISIBLE_DEVICES=7 \ + $VENV/vllm serve "$MODEL" --host 0.0.0.0 --port 8007 --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + > "$1/vllm_ps.log" 2>&1 & + + for i in $(seq 0 7); do + timeout 600 bash -c "until curl -s localhost:$((8000+i))/health > /dev/null 2>&1; do sleep 5; done" + echo " inst_$i healthy" + done + for bp in $(seq 8998 9005); do + timeout 120 bash -c "until curl -s localhost:$bp/query > /dev/null 2>&1; do sleep 2; done" + done + echo "[launch] All ready." +} + +run_experiment() { + local tag=$1 + local offload_mode=$2 + local requests=$3 + local sessions=$4 + local outdir=outputs/$tag + + echo "" + echo "================================================================" + echo " Experiment: $tag (mode=$offload_mode, requests=$requests)" + echo " $(date)" + echo "================================================================" + + cleanup + mkdir -p "$outdir" + + launch_7c_1ps "$outdir" + + echo "[proxy] Starting (offload_mode=$offload_mode)..." + $VENV/python scripts/cache_aware_proxy.py \ + --combined http://127.0.0.1:8000 http://127.0.0.1:8001 http://127.0.0.1:8002 \ + http://127.0.0.1:8003 http://127.0.0.1:8004 http://127.0.0.1:8005 http://127.0.0.1:8006 \ + --ps-instances http://127.0.0.1:8007 \ + --ps-bootstrap-ports 9005 \ + --bootstrap-ports 8998,8999,9000,9001,9002,9003,9004 \ + --offload-mode "$offload_mode" \ + --port 9090 > "$outdir/proxy.log" 2>&1 & + sleep 3 + + echo "[bench] Running $requests requests, $sessions sessions..." + $VENV/python -m replayer --trace "$TRACE" \ + --output "$outdir/metrics.jsonl" \ + --endpoint http://localhost:9090 --model "$MODEL" \ + --time-scale 20 --max-inflight-sessions "$sessions" \ + --request-limit "$requests" -v 2>&1 | tail -5 + + curl -sf http://localhost:9090/breakdown > "$outdir/breakdown.json" 2>/dev/null || true + curl -sf http://localhost:9090/stats > "$outdir/stats.json" 2>/dev/null || true + + # Quick summary + $VENV/python -c " +import json +from collections import Counter +rows = [json.loads(l) for l in open('$outdir/metrics.jsonl')] +ok = [r for r in rows if not r.get('error')] +p = lambda v,q: sorted(v)[min(int(q*len(v)),len(v)-1)] if v else 0 +ttfts = sorted([r['ttft_s'] for r in ok if r.get('ttft_s')]) +tpots = sorted([r['tpot_s'] for r in ok if r.get('tpot_s') and r['tpot_s']>0]) +e2es = sorted([r['latency_s'] for r in ok]) +print(' OK=%d/%d TTFT50=%.3f TTFT90=%.3f TPOT90=%.4f E2E50=%.3f' % ( + len(ok), len(rows), p(ttfts,.5), p(ttfts,.9), p(tpots,.9), p(e2es,.5))) +for lo,hi,cl in [(0,5000,'WARM'),(5000,20000,'MED'),(20000,200000,'HEAVY')]: + sub = [r for r in ok if lo <= r['input_length'] < hi and r.get('ttft_s')] + if sub: + t = sorted([r['ttft_s'] for r in sub]) + tp = sorted([r['tpot_s'] for r in sub if r.get('tpot_s') and r['tpot_s']>0]) + print(' %-6s n=%3d TTFT50=%.3f TPOT90=%.4f' % (cl, len(sub), p(t,.5), p(tp,.9) if tp else 0)) +try: + bd = json.load(open('$outdir/breakdown.json')) + rc = Counter(b.get('route_class','') for b in bd) + print(' Routes: %s' % dict(rc)) +except: pass +" + echo " Done: $tag" +} + +# ─── Run experiments ─── +run_experiment "ps_always" "always" 200 7 +run_experiment "ps_cost" "cost" 200 7 +run_experiment "ps_highload" "cost" 1000 7 + +# ─── Final comparison ─── +cleanup + +echo "" +echo "================================================================" +echo " FINAL COMPARISON" +echo "================================================================" +$VENV/python -c " +import json + +configs = [ + ('outputs/phase0a_7c_kvboth/metrics.jsonl', 'Control: 7C no PS'), + ('outputs/ps_always/metrics.jsonl', 'PS always offload'), + ('outputs/ps_cost/metrics.jsonl', 'PS cost model'), + ('outputs/ps_highload/metrics.jsonl', 'PS cost 1000req'), + ('outputs/baseline_stability_fresh/metrics.jsonl', 'Baseline: 8C plain'), +] +p = lambda v,q: sorted(v)[min(int(q*len(v)),len(v)-1)] if v else 0 +print('%-25s %7s %7s %7s %7s %7s' % ('Config', 'OK/N', 'TTFT50', 'TTFT90', 'TPOT90', 'E2E50')) +print('-' * 80) +for path, label in configs: + try: + rows = [json.loads(l) for l in open(path)] + ok = [r for r in rows if not r.get('error')] + ttfts = sorted([r['ttft_s'] for r in ok if r.get('ttft_s')]) + tpots = sorted([r['tpot_s'] for r in ok if r.get('tpot_s') and r['tpot_s']>0]) + e2es = sorted([r['latency_s'] for r in ok]) + print('%-25s %3d/%3d %7.3f %7.3f %7.4f %7.3f' % ( + label, len(ok), len(rows), p(ttfts,.5), p(ttfts,.9), p(tpots,.9), p(e2es,.5))) + except Exception as e: + print('%-25s %s' % (label, e)) +" +echo "" +echo "=== ALL DONE $(date) ===" diff --git a/scripts/run_ps_flexd.sh b/scripts/run_ps_flexd.sh new file mode 100755 index 0000000..420e840 --- /dev/null +++ b/scripts/run_ps_flexd.sh @@ -0,0 +1,100 @@ +#!/bin/bash +# PS + flexible D: HEAVY prefill on PS, decode on least-loaded C +set -euo pipefail +cd /home/admin/cpfs/wjh/agentic-kv +source .venv/bin/activate + +MODEL=/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct +OUTDIR=outputs/ps_flexd + +cleanup() { + for p in $(ps aux | grep -E 'vllm serve|cache_aware_proxy' | grep -v grep | awk '{print $2}' 2>/dev/null); do kill -9 "$p" 2>/dev/null || true; done + sleep 3 + for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u | grep -v '^$' || true); do kill -9 "$p" 2>/dev/null || true; done + sleep 5 +} + +cleanup +mkdir -p "$OUTDIR" + +# 7 C instances (kv_both, GPUs 0-6) +for i in $(seq 0 6); do + VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998+i)) MASTER_PORT=$((29500+i)) CUDA_VISIBLE_DEVICES=$i \ + .venv/bin/vllm serve "$MODEL" --host 0.0.0.0 --port $((8000+i)) --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + > "$OUTDIR/vllm_c_$i.log" 2>&1 & + sleep 2 +done + +# 1 PS instance (kv_both, GPU 7) +VLLM_MOONCAKE_BOOTSTRAP_PORT=9005 MASTER_PORT=29507 CUDA_VISIBLE_DEVICES=7 \ +.venv/bin/vllm serve "$MODEL" --host 0.0.0.0 --port 8007 --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + > "$OUTDIR/vllm_ps.log" 2>&1 & + +echo "Waiting for instances..." +for i in $(seq 0 7); do + timeout 600 bash -c "until curl -s localhost:$((8000+i))/health > /dev/null 2>&1; do sleep 5; done" + echo " inst_$i healthy" +done +for bp in $(seq 8998 9005); do + timeout 120 bash -c "until curl -s localhost:$bp/query > /dev/null 2>&1; do sleep 2; done" +done +echo "All ready" + +# Proxy: PS on port 8007, C on 8000-8006, flexible D +.venv/bin/python scripts/cache_aware_proxy.py \ + --combined http://127.0.0.1:8000 http://127.0.0.1:8001 http://127.0.0.1:8002 \ + http://127.0.0.1:8003 http://127.0.0.1:8004 http://127.0.0.1:8005 http://127.0.0.1:8006 \ + --ps-instances http://127.0.0.1:8007 \ + --ps-bootstrap-ports 9005 \ + --bootstrap-ports 8998,8999,9000,9001,9002,9003,9004 \ + --port 9090 > "$OUTDIR/proxy.log" 2>&1 & +sleep 3 + +echo "Running benchmark..." +.venv/bin/python -m replayer --trace traces/sampled_1000req_seed42.jsonl \ + --output "$OUTDIR/metrics.jsonl" --endpoint http://localhost:9090 --model "$MODEL" \ + --time-scale 20 --max-inflight-sessions 7 --request-limit 200 -v + +curl -sf http://localhost:9090/breakdown > "$OUTDIR/breakdown.json" 2>/dev/null || true +curl -sf http://localhost:9090/stats > "$OUTDIR/stats.json" 2>/dev/null || true + +# Quick analysis +.venv/bin/python -c " +import json +from collections import Counter +rows = [json.loads(l) for l in open('$OUTDIR/metrics.jsonl')] +ok = [r for r in rows if not r.get('error')] +p = lambda v,q: sorted(v)[min(int(q*len(v)),len(v)-1)] if v else 0 +ttfts = sorted([r['ttft_s'] for r in ok if r.get('ttft_s')]) +tpots = sorted([r['tpot_s'] for r in ok if r.get('tpot_s') and r['tpot_s']>0]) +e2es = sorted([r['latency_s'] for r in ok]) +print('OK=%d/%d TTFT50=%.3f TTFT90=%.3f TPOT90=%.4f E2E50=%.3f' % ( + len(ok), len(rows), p(ttfts,.5), p(ttfts,.9), p(tpots,.9), p(e2es,.5))) +for lo,hi,cl in [(0,5000,'WARM'),(5000,20000,'MED'),(20000,200000,'HEAVY')]: + sub = [r for r in ok if lo <= r['input_length'] < hi and r.get('ttft_s')] + if sub: + t = sorted([r['ttft_s'] for r in sub]) + tp = sorted([r['tpot_s'] for r in sub if r.get('tpot_s') and r['tpot_s']>0]) + print(' %-6s n=%3d TTFT50=%.3f TPOT90=%.4f' % (cl, len(sub), p(t,.5), p(tp,.9) if tp else 0)) +try: + bd = json.load(open('$OUTDIR/breakdown.json')) + rc = Counter(b.get('route_class','') for b in bd) + print('Routes: %s' % dict(rc)) + ps = [b for b in bd if b.get('route_class') == 'HEAVY_PS'] + if ps: + pf = [b['t_prefill_done']-b['t_prefill_sent'] for b in ps if b.get('t_prefill_done') and b.get('t_prefill_sent')] + if pf: print('PS prefill: n=%d p50=%.1fs p90=%.1fs' % (len(pf), p(pf,.5), p(pf,.9))) +except: pass +print() +print('Compare: Phase0A: TTFT50=1.073 TPOT90=0.0738 E2E50=5.096') +print('Compare: Baseline 8C: TTFT50=1.075 TPOT90=0.0761 E2E50=5.075') +" + +cleanup +echo "=== DONE $(date) ===" diff --git a/scripts/run_ps_remaining.sh b/scripts/run_ps_remaining.sh new file mode 100755 index 0000000..b130b97 --- /dev/null +++ b/scripts/run_ps_remaining.sh @@ -0,0 +1,79 @@ +#!/bin/bash +# Run ps_cost and ps_highload experiments (ps_always already done) +set -euo pipefail +cd /home/admin/cpfs/wjh/agentic-kv +source .venv/bin/activate + +MODEL=/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct +VENV=.venv/bin +TRACE=traces/sampled_1000req_seed42.jsonl + +cleanup() { + for p in $(ps aux | grep -E 'vllm serve|cache_aware_proxy' | grep -v grep | awk '{print $2}' 2>/dev/null); do kill -9 "$p" 2>/dev/null || true; done + sleep 3 + for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u | grep -v '^$' || true); do kill -9 "$p" 2>/dev/null || true; done + sleep 5 +} + +launch_7c_1ps() { + local outdir=$1 + mkdir -p "$outdir" + for i in $(seq 0 6); do + VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998+i)) MASTER_PORT=$((29500+i)) CUDA_VISIBLE_DEVICES=$i \ + $VENV/vllm serve "$MODEL" --host 0.0.0.0 --port $((8000+i)) --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + > "$outdir/vllm_c_$i.log" 2>&1 & + sleep 2 + done + VLLM_MOONCAKE_BOOTSTRAP_PORT=9005 MASTER_PORT=29507 CUDA_VISIBLE_DEVICES=7 \ + $VENV/vllm serve "$MODEL" --host 0.0.0.0 --port 8007 --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + > "$outdir/vllm_ps.log" 2>&1 & + for i in $(seq 0 7); do + timeout 600 bash -c "until curl -s localhost:$((8000+i))/health > /dev/null 2>&1; do sleep 5; done" + echo " inst_$i healthy" + done + for bp in $(seq 8998 9005); do + timeout 120 bash -c "until curl -s localhost:$bp/query > /dev/null 2>&1; do sleep 2; done" + done + echo " All ready" +} + +run_exp() { + local tag=$1 mode=$2 reqs=$3 sess=$4 + local outdir=outputs/$tag + echo "" + echo "================================================================" + echo " $tag (mode=$mode, reqs=$reqs, sess=$sess)" + echo " $(date)" + echo "================================================================" + cleanup + launch_7c_1ps "$outdir" + + $VENV/python scripts/cache_aware_proxy.py \ + --combined http://127.0.0.1:8000 http://127.0.0.1:8001 http://127.0.0.1:8002 \ + http://127.0.0.1:8003 http://127.0.0.1:8004 http://127.0.0.1:8005 http://127.0.0.1:8006 \ + --ps-instances http://127.0.0.1:8007 --ps-bootstrap-ports 9005 \ + --bootstrap-ports 8998,8999,9000,9001,9002,9003,9004 \ + --offload-mode "$mode" --port 9090 > "$outdir/proxy.log" 2>&1 & + sleep 3 + + $VENV/python -m replayer --trace "$TRACE" \ + --output "$outdir/metrics.jsonl" --endpoint http://localhost:9090 --model "$MODEL" \ + --time-scale 20 --max-inflight-sessions "$sess" --request-limit "$reqs" -v 2>&1 | tail -5 + + curl -sf http://localhost:9090/breakdown > "$outdir/breakdown.json" 2>/dev/null || true + curl -sf http://localhost:9090/stats > "$outdir/stats.json" 2>/dev/null || true + echo " Done: $tag" +} + +run_exp ps_cost cost 200 7 +run_exp ps_highload cost 1000 7 +cleanup + +echo "" +echo "=== ALL REMAINING DONE $(date) ===" diff --git a/scripts/run_v2_offload.sh b/scripts/run_v2_offload.sh new file mode 100755 index 0000000..64db558 --- /dev/null +++ b/scripts/run_v2_offload.sh @@ -0,0 +1,156 @@ +#!/bin/bash +# V2: P2P KV offload — C_s (session-sticky, cached) prefills, D (least-loaded) decodes +# 8 combined instances (all kv_both), no dedicated PS +set -euo pipefail +cd /home/admin/cpfs/wjh/agentic-kv +source .venv/bin/activate + +MODEL=/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct +OUTDIR=outputs/v2_offload + +cleanup() { + for p in $(ps aux | grep -E 'vllm serve|cache_aware_proxy' | grep -v grep | awk '{print $2}' 2>/dev/null); do kill -9 "$p" 2>/dev/null || true; done + sleep 3 + for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u | grep -v '^$' || true); do kill -9 "$p" 2>/dev/null || true; done + sleep 5 +} + +cleanup +mkdir -p "$OUTDIR" + +echo "=== Verifying GPUs free ===" +nvidia-smi --query-gpu=index,memory.used --format=csv,noheader + +# ---- Launch 8 Combined instances (GPUs 0-7, all kv_both) ---- +echo "=== Launching 8 Combined instances ===" +for i in $(seq 0 7); do + echo "Starting C instance $i on GPU $i, port $((8000+i)), bootstrap $((8998+i))" + VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998+i)) MASTER_PORT=$((29500+i)) CUDA_VISIBLE_DEVICES=$i \ + .venv/bin/vllm serve "$MODEL" --host 0.0.0.0 --port $((8000+i)) --tensor-parallel-size 1 \ + --trust-remote-code --enable-prefix-caching --enforce-eager \ + --dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \ + --kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \ + > "$OUTDIR/vllm_$i.log" 2>&1 & + sleep 2 +done + +# ---- Wait for all instances healthy ---- +echo "=== Waiting for all instances to be healthy ===" +for port in $(seq 8000 8007); do + echo -n " Waiting for port $port..." + timeout 600 bash -c "until curl -sf http://127.0.0.1:$port/health > /dev/null 2>&1; do sleep 5; done" + echo " OK" +done + +# Wait for bootstrap ports +for bp in $(seq 8998 9005); do + timeout 120 bash -c "until curl -s localhost:$bp/query > /dev/null 2>&1; do sleep 2; done" +done +echo "=== All instances healthy ===" +sleep 5 + +# ---- Launch Proxy with V2 offload ---- +echo "=== Launching cache_aware_proxy with V2 offload ===" +.venv/bin/python scripts/cache_aware_proxy.py \ + --combined http://127.0.0.1:8000 http://127.0.0.1:8001 http://127.0.0.1:8002 \ + http://127.0.0.1:8003 http://127.0.0.1:8004 http://127.0.0.1:8005 \ + http://127.0.0.1:8006 http://127.0.0.1:8007 \ + --bootstrap-ports 8998,8999,9000,9001,9002,9003,9004,9005 \ + --offload --port 9090 \ + > "$OUTDIR/proxy.log" 2>&1 & +PROXY_PID=$! +echo "Proxy PID: $PROXY_PID" + +# Wait for proxy ready +echo -n " Waiting for proxy..." +until curl -sf "http://127.0.0.1:9090/stats" > /dev/null 2>&1; do + sleep 2 + echo -n "." +done +echo " OK" + +# ---- Run benchmark ---- +echo "=== Running benchmark: 200 req, time_scale=20, max-inflight-sessions=8 ===" +.venv/bin/python -m replayer --trace traces/sampled_1000req_seed42.jsonl \ + --output "$OUTDIR/metrics.jsonl" \ + --endpoint http://localhost:9090 --model "$MODEL" \ + --time-scale 20 --max-inflight-sessions 8 --request-limit 200 -v + +# ---- Save proxy data BEFORE cleanup ---- +echo "=== Saving proxy breakdown and stats ===" +curl -sf "http://127.0.0.1:9090/breakdown" > "$OUTDIR/breakdown.json" 2>/dev/null || true +curl -sf "http://127.0.0.1:9090/stats" > "$OUTDIR/stats.json" 2>/dev/null || true + +# ---- Quick analysis ---- +echo "=== Quick metrics summary ===" +.venv/bin/python -c " +import json +from collections import Counter + +rows = [json.loads(l) for l in open('$OUTDIR/metrics.jsonl')] +ok = [r for r in rows if not r.get('error')] +fail = [r for r in rows if r.get('error')] +p = lambda v,q: sorted(v)[min(int(q*len(v)),len(v)-1)] if v else 0 + +ttfts = sorted([r['ttft_s'] for r in ok if r.get('ttft_s')]) +tpots = sorted([r['tpot_s'] for r in ok if r.get('tpot_s') and r['tpot_s']>0]) +e2es = sorted([r['latency_s'] for r in ok]) + +print(f'OK={len(ok)}/{len(rows)} TTFT50={p(ttfts,.5):.3f} TTFT90={p(ttfts,.9):.3f} TPOT90={p(tpots,.9):.4f} E2E50={p(e2es,.5):.3f}') + +# Per-class breakdown +for lo,hi,cl in [(0,5000,'WARM'),(5000,20000,'MED'),(20000,200000,'HEAVY')]: + sub = [r for r in ok if lo <= r['input_length'] < hi and r.get('ttft_s')] + if sub: + t = sorted([r['ttft_s'] for r in sub]) + tp = sorted([r['tpot_s'] for r in sub if r.get('tpot_s') and r['tpot_s']>0]) + e = sorted([r['latency_s'] for r in sub]) + print(f' {cl:6s} n={len(sub):3d} TTFT50={p(t,.5):.3f} TTFT90={p(t,.9):.3f} TPOT90={p(tp,.9):.4f} E2E50={p(e,.5):.3f}') + +# Route distribution from breakdown +try: + bd = json.load(open('$OUTDIR/breakdown.json')) + rc = Counter(b.get('route_class','') for b in bd) + print(f'\nRoute class distribution:') + for cls, cnt in sorted(rc.items()): + print(f' {cls}: {cnt}') + + # Offload timing + offloaded = [b for b in bd if b.get('route_class') == 'HEAVY_OFFLOAD'] + if offloaded: + pf = [b['t_prefill_done']-b['t_prefill_sent'] for b in offloaded if b.get('t_prefill_done') and b.get('t_prefill_sent')] + if pf: + pf.sort() + print(f'\nHEAVY_OFFLOAD prefill time: n={len(pf)} p50={p(pf,.5):.2f}s p90={p(pf,.9):.2f}s') + + # Offload reasons for HEAVY + heavy = [b for b in bd if b.get('route_class','').startswith('HEAVY')] + reasons = Counter(b.get('offload_reason','') for b in heavy) + if reasons: + print(f'HEAVY offload reasons: {dict(reasons)}') + + # Per-instance P and D counts + p_counts = Counter(b.get('p_inst','') for b in bd if b.get('p_inst')) + d_counts = Counter(b.get('d_inst','') for b in bd if b.get('d_inst')) + if p_counts: + print(f'\nP-instance distribution: {dict(p_counts)}') + if d_counts: + print(f'D-instance distribution: {dict(d_counts)}') + +except Exception as e: + print(f'Breakdown analysis error: {e}') + +if fail: + print(f'\nFailed requests ({len(fail)}):') + for r in fail[:5]: + print(f' input={r[\"input_length\"]} error={r[\"error\"][:80]}') + +print() +print('=== Baselines for comparison ===') +print('Phase0A (7C kv_both): OK=198/200 TTFT50=1.073 TPOT90=0.0738 E2E50=5.096') +print('Baseline (8C plain): OK=198/200 TTFT50=1.075 TPOT90=0.0761 E2E50=5.075') +print('PS V1 flexD: OK=172/186 TTFT50=0.978 TPOT90=0.0758 E2E50=5.623') +" + +cleanup +echo "=== DONE $(date) ===" diff --git a/third_party/vllm/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py b/third_party/vllm/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py index 28b9971..502a8e6 100644 --- a/third_party/vllm/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py +++ b/third_party/vllm/vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py @@ -347,7 +347,7 @@ class MooncakeConnectorScheduler: # Only trigger 1 KV transfer per request. params["do_remote_prefill"] = False - elif params.get("do_remote_decode"): + if params.get("do_remote_decode"): assert not self.is_kv_consumer if not params.get("transfer_id"): logger.warning("Missing transfer_id in kv_transfer_params from router!")