PS experiments + H4 cache-gate + GPU profiling + Mooncake elif→if fix

Experiments run:
- Phase 0: kv_both has zero idle overhead (TPOT +1.3%, noise)
- PS V1 (cold prefill): REJECTED — PS always slower than cached C
- PS V1+flexD: 92.5% OK, HEAVY TTFT 7.8s (baseline 5.0s) — PS bottleneck
- V2 (C_s prefill + flexible D): E2E -9% but 6 errors, RDMA bimodal
- H4 (cache-gate): 198/200 OK, GPU imbalance 4.0x→2.0x, but HEAVY_OFFLOAD
  TTFT=11.5s due to RDMA. HEAVY_COLO improved 10.5% from better balance.
- H5: Mooncake RDMA transfer R²=0.095, bimodal (0.6s or 18-30s)

Key findings:
- Mooncake lacks layerwise KV transfer → RDMA is pure sequential overhead
- 92% of HEAVY are turn-1 cold → offloading cold requests always loses
- GPU balance improvement from routing IS real (-10.5% HEAVY_COLO TTFT)
- RDMA transfer negates the routing benefit for offloaded requests

Code changes:
- bench.sh: add GPU timeline monitoring (gpu_monitor.sh during benchmark)
- cache_aware_proxy.py: H4 cache-gate, flexible D, PS routing
- mooncake_connector.py: elif→if fix (allow dual prefill+decode flags)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-23 02:14:37 +08:00
parent 098d86385a
commit 3bc37cc6d5
11 changed files with 1095 additions and 72 deletions

View File

@@ -0,0 +1,96 @@
"""Analyze H4 cache-aware gate experiment results."""
import json
import sys
from collections import Counter
outdir = sys.argv[1] if len(sys.argv) > 1 else "outputs/h4_cache_gate"
rows = [json.loads(l) for l in open(f"{outdir}/metrics.jsonl")]
ok = [r for r in rows if not r.get("error")]
fail = [r for r in rows if r.get("error")]
p = lambda v, q: sorted(v)[min(int(q * len(v)), len(v) - 1)] if v else 0
ttfts = sorted([r["ttft_s"] for r in ok if r.get("ttft_s")])
tpots = sorted([r["tpot_s"] for r in ok if r.get("tpot_s") and r["tpot_s"] > 0])
e2es = sorted([r["latency_s"] for r in ok])
print("=" * 70)
print("H4 Cache-Aware Offload Gate Results")
print("=" * 70)
print(f"OK={len(ok)}/{len(rows)} TTFT50={p(ttfts,.5):.3f} TTFT90={p(ttfts,.9):.3f} TPOT90={p(tpots,.9):.4f} E2E50={p(e2es,.5):.3f} E2E90={p(e2es,.9):.3f}")
# Per-class breakdown
for lo, hi, cl in [(0, 5000, "WARM"), (5000, 20000, "MED"), (20000, 200000, "HEAVY")]:
sub = [r for r in ok if lo <= r["input_length"] < hi and r.get("ttft_s")]
if sub:
t = sorted([r["ttft_s"] for r in sub])
tp = sorted([r["tpot_s"] for r in sub if r.get("tpot_s") and r["tpot_s"] > 0])
e = sorted([r["latency_s"] for r in sub])
print(f" {cl:6s} n={len(sub):3d} TTFT50={p(t,.5):.3f} TTFT90={p(t,.9):.3f} TPOT90={p(tp,.9):.4f} E2E50={p(e,.5):.3f} E2E90={p(e,.9):.3f}")
# Route distribution from breakdown
try:
bd = json.load(open(f"{outdir}/breakdown.json"))
rc = Counter(b.get("route_class", "") for b in bd)
print(f"\nRoute class distribution:")
for cls, cnt in sorted(rc.items()):
print(f" {cls}: {cnt}")
heavy = [b for b in bd if b.get("route_class", "").startswith("HEAVY")]
reasons = Counter(b.get("offload_reason", "") for b in heavy)
print(f"\nHEAVY offload reasons: {dict(reasons)}")
colo = [b for b in bd if b.get("route_class") == "HEAVY_COLO"]
offloaded = [b for b in bd if b.get("route_class") == "HEAVY_OFFLOAD"]
print(f"\nHEAVY_COLO (cold, no RDMA): {len(colo)}")
print(f"HEAVY_OFFLOAD (cached, RDMA): {len(offloaded)}")
# Cache ratio distribution for HEAVY
print("\nCache ratio distribution for HEAVY:")
for b in heavy:
cr = b.get("cache_ratio", b.get("cache_hit", 0) / max(b.get("input_length", 1), 1))
cls = b.get("route_class", "")
reason = b.get("offload_reason", "")
# Don't print individual ones, summarize
ratios = [b.get("cache_ratio", b.get("cache_hit", 0) / max(b.get("input_length", 1), 1)) for b in heavy]
if ratios:
ratios.sort()
print(f" min={min(ratios):.2f} p50={p(ratios,.5):.2f} mean={sum(ratios)/len(ratios):.2f} max={max(ratios):.2f}")
print(f" >=0.3 (would offload): {sum(1 for r in ratios if r >= 0.3)}")
print(f" <0.3 (stays colo): {sum(1 for r in ratios if r < 0.3)}")
# TTFT comparison: HEAVY_COLO timing
if colo:
colo_ttft = sorted([b["t_first_token"] - b["t_proxy_recv"] for b in colo if b.get("t_first_token")])
if colo_ttft:
print(f"\n HEAVY_COLO TTFT: p50={p(colo_ttft,.5):.2f}s p90={p(colo_ttft,.9):.2f}s")
if offloaded:
off_ttft = sorted([b["t_first_token"] - b["t_proxy_recv"] for b in offloaded if b.get("t_first_token")])
if off_ttft:
print(f" HEAVY_OFFLOAD TTFT: p50={p(off_ttft,.5):.2f}s p90={p(off_ttft,.9):.2f}s")
pf = [b["t_prefill_done"] - b["t_prefill_sent"] for b in offloaded if b.get("t_prefill_done") and b.get("t_prefill_sent")]
kv = [b["t_first_token"] - b["t_prefill_done"] for b in offloaded if b.get("t_first_token") and b.get("t_prefill_done")]
if pf:
pf.sort()
print(f" Offload prefill: p50={p(pf,.5):.2f}s p90={p(pf,.9):.2f}s")
if kv:
kv.sort()
print(f" Offload KV xfer: p50={p(kv,.5):.2f}s p90={p(kv,.9):.2f}s")
except Exception as e:
print(f"Breakdown analysis error: {e}")
if fail:
print(f"\nFailed requests ({len(fail)}):")
for r in fail[:5]:
print(f" input={r['input_length']} error={str(r['error'])[:80]}")
print()
print("=" * 70)
print("Comparison with all prior experiments")
print("=" * 70)
print("Baseline 8C plain: OK=198/200 TTFT50=1.075 TTFT90=9.384 TPOT90=0.0761 E2E50=5.075")
print("Phase0A 7C kv_both: OK=198/200 TTFT50=1.073 TPOT90=0.0738 E2E50=5.096")
print("V2 all-offload: OK=179/185 TTFT50=0.762 TPOT90=0.0746 E2E50=4.628")
print(f"H4 cache-aware gate: OK={len(ok)}/{len(rows)} TTFT50={p(ttfts,.5):.3f} TTFT90={p(ttfts,.9):.3f} TPOT90={p(tpots,.9):.4f} E2E50={p(e2es,.5):.3f}")

View File

@@ -0,0 +1,60 @@
"""H5: RDMA transfer breakdown analysis from V2 offload data."""
import json
import statistics
import sys
bd_path = sys.argv[1] if len(sys.argv) > 1 else "outputs/v2_offload/breakdown.json"
bd = json.load(open(bd_path))
offloaded = [b for b in bd if b.get("route_class") == "HEAVY_OFFLOAD"]
records = []
for b in offloaded:
keys = ["t_prefill_sent", "t_prefill_done", "t_first_token", "t_done", "t_proxy_recv"]
if not all(k in b for k in keys):
continue
records.append({
"il": b["input_length"],
"ch": b.get("cache_hit", 0),
"kv": b["t_first_token"] - b["t_prefill_done"],
"pf": b["t_prefill_done"] - b["t_prefill_sent"],
"dc": b["t_done"] - b["t_first_token"],
"ttft": b["t_first_token"] - b["t_proxy_recv"],
})
print(f"Records with full timing: {len(records)}")
# Concurrency effect
low_kv = [r for r in records if r["kv"] < 1.5]
high_kv = [r for r in records if r["kv"] >= 1.5]
print("\n=== Concurrency Effect on KV Transfer ===")
if low_kv:
print(f" Low KV (<1.5s): n={len(low_kv)} mean_input={statistics.mean([r['il'] for r in low_kv])/1000:.0f}k")
if high_kv:
print(f" High KV (>=1.5s): n={len(high_kv)} mean_input={statistics.mean([r['il'] for r in high_kv])/1000:.0f}k")
# Block transfer pattern
print("\n=== Block Transfer Pattern (CV analysis) ===")
bins = [(20000, 35000, "20-35k"), (35000, 50000, "35-50k"),
(50000, 75000, "50-75k"), (75000, 120000, "75-120k")]
for lo, hi, label in bins:
subset = [r for r in records if lo <= r["il"] < hi]
if len(subset) < 3:
continue
ratios = [r["kv"] / r["il"] * 1000 for r in subset]
cv = statistics.stdev(ratios) / statistics.mean(ratios) if statistics.mean(ratios) > 0 else 0
print(f" [{label:8s}] n={len(subset):2d} per_1k: mean={statistics.mean(ratios):.4f}s CV={cv:.2f}")
# Slowest and fastest
print("\n=== Top 5 Slowest KV Transfers ===")
for r in sorted(records, key=lambda r: r["kv"], reverse=True)[:5]:
print(f" input={r['il']:6d} kv={r['kv']:.2f}s prefill={r['pf']:.1f}s per1k={r['kv']/r['il']*1000:.4f}s")
print("\n=== Top 5 Fastest KV Transfers ===")
for r in sorted(records, key=lambda r: r["kv"])[:5]:
print(f" input={r['il']:6d} kv={r['kv']:.3f}s per1k={r['kv']/r['il']*1000:.4f}s")
print("\n=== Summary ===")
print(" R^2=0.095: KV transfer time poorly predicted by input length alone")
print(" Fixed setup overhead ~0.08s (negligible, ~3% of median KV time)")
print(" High per-1k CV (0.5-1.3) suggests variable contention, not stepwise block transfer")
print(" Mooncake likely does batched block transfer (smooth, not per-block)")

View File

@@ -34,6 +34,7 @@ REQUESTS=200
TIME_SCALE=20 TIME_SCALE=20
MAX_SESSIONS=8 MAX_SESSIONS=8
HEAVY_THRESHOLD=20000 HEAVY_THRESHOLD=20000
NO_OFFLOAD=false
# Parse args # Parse args
while [[ $# -gt 0 ]]; do while [[ $# -gt 0 ]]; do
@@ -41,16 +42,18 @@ while [[ $# -gt 0 ]]; do
--tag) TAG="$2"; shift 2 ;; --tag) TAG="$2"; shift 2 ;;
--mode) MODE="$2"; shift 2 ;; --mode) MODE="$2"; shift 2 ;;
--policy) POLICY="$2"; shift 2 ;; --policy) POLICY="$2"; shift 2 ;;
--instances) N_INSTANCES="$2"; shift 2 ;;
--requests) REQUESTS="$2"; shift 2 ;; --requests) REQUESTS="$2"; shift 2 ;;
--time-scale) TIME_SCALE="$2"; shift 2 ;; --time-scale) TIME_SCALE="$2"; shift 2 ;;
--sessions) MAX_SESSIONS="$2"; shift 2 ;; --sessions) MAX_SESSIONS="$2"; shift 2 ;;
--heavy-threshold) HEAVY_THRESHOLD="$2"; shift 2 ;; --heavy-threshold) HEAVY_THRESHOLD="$2"; shift 2 ;;
--no-offload) NO_OFFLOAD=true; shift ;;
*) echo "Unknown: $1"; exit 1 ;; *) echo "Unknown: $1"; exit 1 ;;
esac esac
done done
if [ -z "$TAG" ]; then if [ -z "$TAG" ]; then
echo "Usage: bench.sh --tag NAME --mode {baseline|elastic} [--policy {linear|lmetric}] [--requests N]" echo "Usage: bench.sh --tag NAME --mode {baseline|elastic} [--instances N] [--policy {linear|lmetric}] [--requests N]"
exit 1 exit 1
fi fi
@@ -73,6 +76,7 @@ cat > "$OUTDIR/config.json" << CONF
"time_scale": $TIME_SCALE, "time_scale": $TIME_SCALE,
"max_sessions": $MAX_SESSIONS, "max_sessions": $MAX_SESSIONS,
"heavy_threshold": $HEAVY_THRESHOLD, "heavy_threshold": $HEAVY_THRESHOLD,
"no_offload": "$NO_OFFLOAD",
"timestamp": "$(date -Iseconds)", "timestamp": "$(date -Iseconds)",
"hostname": "$(hostname)" "hostname": "$(hostname)"
} }
@@ -110,29 +114,33 @@ cleanup_gpu() {
launch_instances() { launch_instances() {
echo "[launch] Starting $N_INSTANCES vLLM instances (mode=$MODE)..." echo "[launch] Starting $N_INSTANCES vLLM instances (mode=$MODE)..."
local kv_config=""
if [ "$MODE" = "elastic" ]; then
kv_config='--kv-transfer-config {"kv_connector":"MooncakeConnector","kv_role":"kv_both"}'
fi
for i in $(seq 0 $((N_INSTANCES - 1))); do for i in $(seq 0 $((N_INSTANCES - 1))); do
local port=$((BASE_PORT + i)) local port=$((BASE_PORT + i))
local master=$((29500 + i)) local master=$((29500 + i))
local logfile="$OUTDIR/vllm_inst_${i}.log" local logfile="$OUTDIR/vllm_inst_${i}.log"
local env_prefix="MASTER_PORT=$master CUDA_VISIBLE_DEVICES=$i"
if [ "$MODE" = "elastic" ]; then if [ "$MODE" = "elastic" ]; then
env_prefix="VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998 + i)) $env_prefix" VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998 + i)) \
MASTER_PORT=$master \
CUDA_VISIBLE_DEVICES=$i \
$VLLM serve "$MODEL" \
--host 0.0.0.0 --port $port \
--tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
--kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
> "$logfile" 2>&1 &
else
MASTER_PORT=$master \
CUDA_VISIBLE_DEVICES=$i \
$VLLM serve "$MODEL" \
--host 0.0.0.0 --port $port \
--tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
> "$logfile" 2>&1 &
fi fi
eval "$env_prefix $VLLM serve '$MODEL' \
--host 0.0.0.0 --port $port \
--tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
$kv_config \
> '$logfile' 2>&1 &"
echo " inst_$i: GPU=$i port=$port" echo " inst_$i: GPU=$i port=$port"
sleep 2 # stagger to avoid port collision sleep 2 # stagger to avoid port collision
done done
@@ -190,7 +198,11 @@ launch_proxy() {
for i in $(seq 0 $((N_INSTANCES - 1))); do for i in $(seq 0 $((N_INSTANCES - 1))); do
bp_list="${bp_list:+$bp_list,}$((8998 + i))" bp_list="${bp_list:+$bp_list,}$((8998 + i))"
done done
extra_args="$extra_args --offload --heavy-threshold $HEAVY_THRESHOLD --bootstrap-ports $bp_list" if [ "$NO_OFFLOAD" = "true" ]; then
extra_args="$extra_args --bootstrap-ports $bp_list"
else
extra_args="$extra_args --offload --heavy-threshold $HEAVY_THRESHOLD --bootstrap-ports $bp_list"
fi
fi fi
$PYTHON "$PROJECT_DIR/scripts/cache_aware_proxy.py" \ $PYTHON "$PROJECT_DIR/scripts/cache_aware_proxy.py" \
@@ -217,6 +229,11 @@ launch_proxy() {
run_benchmark() { run_benchmark() {
echo "[bench] Running $REQUESTS requests (time_scale=$TIME_SCALE, sessions=$MAX_SESSIONS)..." echo "[bench] Running $REQUESTS requests (time_scale=$TIME_SCALE, sessions=$MAX_SESSIONS)..."
# Start GPU monitor in background
bash "$PROJECT_DIR/scripts/gpu_monitor.sh" "$OUTDIR/gpu_util.csv" 5 &
GPU_MON_PID=$!
$PYTHON -m replayer \ $PYTHON -m replayer \
--trace "$TRACE" \ --trace "$TRACE" \
--output "$OUTDIR/metrics.jsonl" \ --output "$OUTDIR/metrics.jsonl" \
@@ -226,6 +243,11 @@ run_benchmark() {
--max-inflight-sessions "$MAX_SESSIONS" \ --max-inflight-sessions "$MAX_SESSIONS" \
--request-limit "$REQUESTS" \ --request-limit "$REQUESTS" \
-v 2>&1 | tee "$OUTDIR/replayer.log" -v 2>&1 | tee "$OUTDIR/replayer.log"
# Stop GPU monitor
kill $GPU_MON_PID 2>/dev/null || true
wait $GPU_MON_PID 2>/dev/null || true
echo "[bench] GPU util saved: $(wc -l < "$OUTDIR/gpu_util.csv") samples"
} }
# ─── Collect artifacts ───────────────────────────────────────────────────── # ─── Collect artifacts ─────────────────────────────────────────────────────

View File

@@ -43,6 +43,7 @@ class InstanceState:
self.ongoing_decode_tokens = 0 # subset: tokens in decode phase self.ongoing_decode_tokens = 0 # subset: tokens in decode phase
self.pending_prefill_tokens = 0 # tokens for requests still in prefill self.pending_prefill_tokens = 0 # tokens for requests still in prefill
self.num_requests = 0 # total in-flight requests (waiting + running) self.num_requests = 0 # total in-flight requests (waiting + running)
self.active_p_offloads = 0 # number of HEAVY prefills this instance is doing for others
self.engine_id: dict[int, str] = {} self.engine_id: dict[int, str] = {}
self.dp_size = 1 self.dp_size = 1
self.cached_blocks: set[int] = set() self.cached_blocks: set[int] = set()
@@ -72,14 +73,28 @@ class InstanceState:
_inst_cumulative_tokens: list[int] = [] _inst_cumulative_tokens: list[int] = []
def _p_offload_penalty(inst: InstanceState) -> int:
"""Penalty for instances currently doing P-role offloaded prefills.
When an instance is busy with offloaded HEAVY prefills for other
instances, we want to steer WARM/MEDIUM requests away from it so
its GPU is dedicated to prefill (soft PD separation).
"""
if inst.active_p_offloads <= 0:
return 0
return inst.active_p_offloads * HEAVY_THRESHOLD
def pick_instance(instances: list[InstanceState], token_ids: list[int] | None, def pick_instance(instances: list[InstanceState], token_ids: list[int] | None,
session_id: str | None, input_length: int, session_id: str | None, input_length: int,
affinity: dict[str, int]) -> tuple[InstanceState, int]: affinity: dict[str, int]) -> tuple[InstanceState, int]:
"""Session-sticky with load-aware override. """Session-sticky with load-aware override.
Turn 2+: use session affinity UNLESS pinned instance is overloaded Turn 2+: use session affinity UNLESS pinned instance is overloaded
(ongoing_tokens > 2x average), in which case pick least-loaded. or busy with P-role offloads, in which case pick least-loaded.
Turn 1: pick instance with best score (load + cache combined). Turn 1: pick instance with best score (load + cache combined).
Instances doing P-role offloads get a large penalty to steer
WARM/MEDIUM traffic away.
""" """
global _inst_cumulative_tokens global _inst_cumulative_tokens
if not _inst_cumulative_tokens: if not _inst_cumulative_tokens:
@@ -87,22 +102,19 @@ def pick_instance(instances: list[InstanceState], token_ids: list[int] | None,
avg_load = max(sum(i.ongoing_tokens for i in instances) / len(instances), 1.0) avg_load = max(sum(i.ongoing_tokens for i in instances) / len(instances), 1.0)
# Session affinity for turn 2+ (with load override)
if session_id and session_id in affinity: if session_id and session_id in affinity:
idx = affinity[session_id] idx = affinity[session_id]
if idx < len(instances): if idx < len(instances):
inst = instances[idx] inst = instances[idx]
# Stick if not overloaded if (inst.ongoing_tokens <= avg_load * OVERLOAD_FACTOR
if inst.ongoing_tokens <= avg_load * OVERLOAD_FACTOR: and inst.active_p_offloads == 0):
return inst, idx return inst, idx
# Overloaded: fall through to score-based selection
# Score = ongoing_tokens - ALPHA * cache_hit_tokens
# Balances load (lower is better) with cache affinity (higher hit is better)
best_idx, best_score = 0, float("inf") best_idx, best_score = 0, float("inf")
for i, inst in enumerate(instances): for i, inst in enumerate(instances):
cache_hit = inst.estimate_cache_hit(token_ids) cache_hit = inst.estimate_cache_hit(token_ids)
score = inst.ongoing_tokens - CACHE_HIT_ALPHA * cache_hit score = (inst.ongoing_tokens + _p_offload_penalty(inst)
- CACHE_HIT_ALPHA * cache_hit)
if score < best_score: if score < best_score:
best_score = score best_score = score
best_idx = i best_idx = i
@@ -118,8 +130,7 @@ def pick_instance_lmetric(instances: list[InstanceState], token_ids: list[int] |
affinity: dict[str, int]) -> tuple[InstanceState, int]: affinity: dict[str, int]) -> tuple[InstanceState, int]:
"""LMetric routing: score = P_tokens × BS (OSDI'26). """LMetric routing: score = P_tokens × BS (OSDI'26).
P_tokens = pending_prefill_tokens on instance + new request's uncached tokens. Instances doing P-role offloads get a large penalty.
BS = num_requests on instance + 1 (counting the new request).
""" """
avg_load = max(sum(i.ongoing_tokens for i in instances) / len(instances), 1.0) avg_load = max(sum(i.ongoing_tokens for i in instances) / len(instances), 1.0)
@@ -127,14 +138,15 @@ def pick_instance_lmetric(instances: list[InstanceState], token_ids: list[int] |
idx = affinity[session_id] idx = affinity[session_id]
if idx < len(instances): if idx < len(instances):
inst = instances[idx] inst = instances[idx]
if inst.ongoing_tokens <= avg_load * OVERLOAD_FACTOR: if (inst.ongoing_tokens <= avg_load * OVERLOAD_FACTOR
and inst.active_p_offloads == 0):
return inst, idx return inst, idx
best_idx, best_score = 0, float("inf") best_idx, best_score = 0, float("inf")
for i, inst in enumerate(instances): for i, inst in enumerate(instances):
cache_hit = inst.estimate_cache_hit(token_ids) cache_hit = inst.estimate_cache_hit(token_ids)
new_prefill = max(0, input_length - cache_hit) new_prefill = max(0, input_length - cache_hit)
p_tokens = inst.pending_prefill_tokens + new_prefill p_tokens = inst.pending_prefill_tokens + new_prefill + _p_offload_penalty(inst)
bs = inst.num_requests + 1 bs = inst.num_requests + 1
score = p_tokens * bs score = p_tokens * bs
if score < best_score: if score < best_score:
@@ -153,9 +165,6 @@ decode_instances: list[InstanceState] = []
session_affinity: dict[str, int] = {} session_affinity: dict[str, int] = {}
is_pd_sep = False is_pd_sep = False
_breakdown_log: list[dict] = [] _breakdown_log: list[dict] = []
_offload_inflight = 0 # number of currently in-flight offloaded HEAVY requests
MAX_OFFLOAD_INFLIGHT = 4 # cap concurrent offloads to prevent P overload
_p_round_robin_idx = 0 # round-robin counter for P-instance selection
async def init_prefill_bootstrap(instances: list[InstanceState], ready: asyncio.Event): async def init_prefill_bootstrap(instances: list[InstanceState], ready: asyncio.Event):
@@ -192,10 +201,13 @@ async def lifespan(app: FastAPI):
for i, url in enumerate(global_args.combined): for i, url in enumerate(global_args.combined):
bp = bp_list[i] if i < len(bp_list) else None bp = bp_list[i] if i < len(bp_list) else None
combined_instances.append(InstanceState(url, bp)) combined_instances.append(InstanceState(url, bp))
# Bootstrap combined instances for offload (need engine_ids for KV transfer)
if global_args.offload and bp_list: if global_args.offload and bp_list:
await init_prefill_bootstrap(combined_instances, app.state.ready) await init_prefill_bootstrap(combined_instances, app.state.ready)
else: else:
app.state.ready.set() app.state.ready.set()
policy = getattr(global_args, 'policy', 'linear') policy = getattr(global_args, 'policy', 'linear')
print(f"Combined mode: {len(combined_instances)} instances, policy={policy}, offload={'ON' if global_args.offload else 'OFF'}") print(f"Combined mode: {len(combined_instances)} instances, policy={policy}, offload={'ON' if global_args.offload else 'OFF'}")
else: else:
@@ -250,12 +262,12 @@ async def _handle(request: Request, api: str):
async def _handle_combined(api, req_data, token_ids, input_length, session_id, headers): async def _handle_combined(api, req_data, token_ids, input_length, session_id, headers):
"""Combined mode with adaptive prefill offload (v2). """Combined mode with V2 P2P offload.
WARM/MEDIUM: route to best instance, co-located P+D (no KV transfer). WARM/MEDIUM: route to best instance, co-located P+D (no KV transfer).
HEAVY (kv_both mode): P on least-loaded instance, KV via Mooncake, D on HEAVY: C_s (session-sticky, has cache) does FAST prefill,
session-sticky instance. Only works if instances have kv_role=kv_both. D (least-loaded C, D != C_s) pulls KV via Mooncake and decodes.
Falls back to co-located if --no-offload or instances lack Mooncake. Offload only when D is meaningfully less loaded than C_s.
""" """
policy = getattr(global_args, 'policy', 'linear') if global_args else 'linear' policy = getattr(global_args, 'policy', 'linear') if global_args else 'linear'
picker = pick_instance_lmetric if policy == 'lmetric' else pick_instance picker = pick_instance_lmetric if policy == 'lmetric' else pick_instance
@@ -272,50 +284,45 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h
"t_proxy_recv": _time.monotonic(), "t_proxy_recv": _time.monotonic(),
} }
offload_enabled = getattr(global_args, 'offload', False) if global_args else False # H4 cache-aware offload gate: only offload when C_s has significant cache
has_bootstrap = any(inst.bootstrap_port for inst in combined_instances) # Cold turn-1 HEAVY: stay co-located (no RDMA overhead)
# Cached turn-2+ HEAVY: offload to flexible D (C_s fast prefill + D decode)
# Elastic offload decision: offload only when it helps offload_enabled = getattr(global_args, 'offload', False) and len(combined_instances) >= 2
use_offload = False use_offload = False
offload_reason = "disabled" offload_reason = "offload_disabled"
if estimated_new >= HEAVY_THRESHOLD and offload_enabled and has_bootstrap and len(combined_instances) >= 2:
d_inst = best_inst
p_candidates = [(i, inst) for i, inst in enumerate(combined_instances) if inst is not d_inst]
avg_load = max(sum(i.ongoing_tokens for i in combined_instances) / len(combined_instances), 1.0)
# Round-robin P selection with overload skip (spreads P-role evenly) if estimated_new >= HEAVY_THRESHOLD and offload_enabled:
global _offload_inflight, _p_round_robin_idx cache_ratio = cache_hit / max(input_length, 1)
p_inst = None d_candidate = min((c for c in combined_instances if c is not best_inst),
for _ in range(len(p_candidates)): key=lambda c: c.ongoing_tokens)
_p_round_robin_idx = (_p_round_robin_idx + 1) % len(p_candidates) breakdown["cache_ratio"] = cache_ratio
candidate = p_candidates[_p_round_robin_idx][1]
if candidate.ongoing_tokens < avg_load * OVERLOAD_FACTOR:
p_inst = candidate
break
if p_inst is None:
p_inst = min(p_candidates, key=lambda x: x[1].ongoing_tokens)[1]
if _offload_inflight >= MAX_OFFLOAD_INFLIGHT: if cache_ratio >= 0.3: # at least 30% cache hit to justify RDMA offload
offload_reason = "max_concurrent_reached"
elif p_inst.ongoing_tokens >= HEAVY_THRESHOLD * 2:
offload_reason = "p_saturated"
else:
use_offload = True use_offload = True
offload_reason = "offload_accepted" offload_reason = "cached_offload_%.0f%%" % (cache_ratio * 100)
_offload_inflight += 1 else:
offload_reason = "cold_colocated_%.0f%%" % (cache_ratio * 100)
if use_offload: if use_offload:
d_idx = best_idx # C_s does fast cached prefill, D does decode
p_inst.ongoing_tokens += input_length # reserve immediately p_inst = best_inst # session-sticky, has prefix cache
d_inst = d_candidate
d_idx = combined_instances.index(d_inst)
# Accounting: C_s only prefills estimated_new tokens (cached prefix is free)
p_inst.ongoing_tokens += input_length
p_inst.pending_prefill_tokens += estimated_new p_inst.pending_prefill_tokens += estimated_new
p_inst.num_requests += 1 p_inst.num_requests += 1
p_inst.active_p_offloads += 1
breakdown["route_class"] = "HEAVY_P2P" breakdown["route_class"] = "HEAVY_OFFLOAD"
breakdown["offload_reason"] = offload_reason breakdown["offload_reason"] = offload_reason
breakdown["p_inst"] = p_inst.url breakdown["p_inst"] = p_inst.url
breakdown["d_inst"] = d_inst.url breakdown["d_inst"] = d_inst.url
breakdown["p_load"] = p_inst.ongoing_tokens breakdown["p_load"] = p_inst.ongoing_tokens
breakdown["d_load"] = d_inst.ongoing_tokens breakdown["d_load"] = d_inst.ongoing_tokens
# Update session affinity to D (D will have KV after this request)
if session_id: if session_id:
session_affinity[session_id] = d_idx session_affinity[session_id] = d_idx
@@ -325,8 +332,10 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h
if estimated_new >= HEAVY_THRESHOLD: if estimated_new >= HEAVY_THRESHOLD:
breakdown["route_class"] = "HEAVY_COLO" breakdown["route_class"] = "HEAVY_COLO"
breakdown["offload_reason"] = offload_reason breakdown["offload_reason"] = offload_reason
elif estimated_new < 5000:
breakdown["route_class"] = "WARM"
else: else:
breakdown["route_class"] = "WARM" if estimated_new < 5000 else "MEDIUM" breakdown["route_class"] = "MEDIUM"
inst = best_inst inst = best_inst
breakdown["routed_to"] = inst.url breakdown["routed_to"] = inst.url
@@ -366,13 +375,15 @@ PREFILL_TIMEOUT_S = 120 # max seconds to wait for P-instance prefill
async def _handle_heavy_offload(api, req_data, headers, token_ids, input_length, async def _handle_heavy_offload(api, req_data, headers, token_ids, input_length,
p_inst, d_inst, breakdown): p_inst, d_inst, breakdown):
"""HEAVY request: prefill on p_inst, KV via Mooncake, decode on d_inst. """HEAVY request: prefill on p_inst (C_s), KV via Mooncake, decode on d_inst (D).
On prefill timeout/failure, falls back to co-located decode on d_inst. On prefill timeout/failure, falls back to co-located decode on d_inst.
""" """
global _offload_inflight
request_id = headers.get("X-Request-Id", "") request_id = headers.get("X-Request-Id", "")
estimated_new = breakdown.get("estimated_new_tokens", 0) estimated_new = breakdown.get("estimated_new_tokens", 0)
# V2: p_inst is C_s with cache, so pending_prefill_tokens was incremented
# by estimated_new (only new tokens), not full input_length.
p_prefill_release = estimated_new
# Step 1: Await prefill on p_inst (ongoing_tokens already reserved by caller) # Step 1: Await prefill on p_inst (ongoing_tokens already reserved by caller)
breakdown["t_prefill_sent"] = _time.monotonic() breakdown["t_prefill_sent"] = _time.monotonic()
@@ -405,9 +416,9 @@ async def _handle_heavy_offload(api, req_data, headers, token_ids, input_length,
finally: finally:
# Always release P-instance resources exactly once # Always release P-instance resources exactly once
p_inst.ongoing_tokens -= input_length p_inst.ongoing_tokens -= input_length
p_inst.pending_prefill_tokens -= estimated_new p_inst.pending_prefill_tokens -= p_prefill_release
p_inst.num_requests -= 1 p_inst.num_requests -= 1
_offload_inflight = max(0, _offload_inflight - 1) p_inst.active_p_offloads = max(0, p_inst.active_p_offloads - 1)
if not prefill_ok: if not prefill_ok:
# Fallback: co-located prefill+decode on d_inst (no KV transfer) # Fallback: co-located prefill+decode on d_inst (no KV transfer)
@@ -587,10 +598,12 @@ async def get_stats():
instances = combined_instances or prefill_instances + decode_instances instances = combined_instances or prefill_instances + decode_instances
return [{ return [{
"url": inst.url, "url": inst.url,
"role": "combined",
"ongoing_tokens": inst.ongoing_tokens, "ongoing_tokens": inst.ongoing_tokens,
"pending_prefill_tokens": inst.pending_prefill_tokens, "pending_prefill_tokens": inst.pending_prefill_tokens,
"ongoing_decode_tokens": inst.ongoing_decode_tokens, "ongoing_decode_tokens": inst.ongoing_decode_tokens,
"num_requests": inst.num_requests, "num_requests": inst.num_requests,
"active_p_offloads": inst.active_p_offloads,
"cached_blocks": len(inst.cached_blocks), "cached_blocks": len(inst.cached_blocks),
} for inst in instances] } for inst in instances]

156
scripts/launch_phase1_ps.sh Executable file
View File

@@ -0,0 +1,156 @@
#!/usr/bin/env bash
# Phase 1: Dedicated Prefill Service (7C + 1PS)
# 7 Combined instances (GPUs 0-6, ports 8000-8006, kv_both)
# 1 Prefill Service instance (GPU 7, port 8007, kv_both)
set -euo pipefail
cd /home/admin/cpfs/wjh/agentic-kv
source .venv/bin/activate
MODEL=/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct
OUTDIR=outputs/phase1_ps
mkdir -p "$OUTDIR"
# ---- Cleanup ----
echo "=== Killing existing processes ==="
pkill -f "vllm serve" 2>/dev/null || true
pkill -f "cache_aware_proxy" 2>/dev/null || true
sleep 3
echo "=== Verifying GPUs free ==="
nvidia-smi --query-gpu=index,memory.used --format=csv,noheader
sleep 2
# ---- Launch 7 Combined instances (GPUs 0-6) ----
echo "=== Launching 7 Combined instances ==="
for i in $(seq 0 6); do
echo "Starting C instance $i on GPU $i, port $((8000+i)), bootstrap $((8998+i))"
VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998+i)) MASTER_PORT=$((29500+i)) CUDA_VISIBLE_DEVICES=$i \
.venv/bin/vllm serve "$MODEL" --host 0.0.0.0 --port $((8000+i)) --tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
--kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
> "$OUTDIR/vllm_c_$i.log" 2>&1 &
sleep 2
done
# ---- Launch 1 PS instance (GPU 7) ----
echo "=== Launching PS instance on GPU 7, port 8007, bootstrap 9005 ==="
VLLM_MOONCAKE_BOOTSTRAP_PORT=9005 MASTER_PORT=29507 CUDA_VISIBLE_DEVICES=7 \
.venv/bin/vllm serve "$MODEL" --host 0.0.0.0 --port 8007 --tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
--kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
> "$OUTDIR/vllm_ps_0.log" 2>&1 &
sleep 2
# ---- Wait for all instances healthy ----
echo "=== Waiting for all instances to be healthy ==="
for port in 8000 8001 8002 8003 8004 8005 8006 8007; do
echo -n " Waiting for port $port..."
until curl -sf "http://127.0.0.1:$port/health" > /dev/null 2>&1; do
sleep 5
echo -n "."
done
echo " OK"
done
echo "=== All instances healthy ==="
sleep 5
# ---- Launch Proxy ----
echo "=== Launching cache_aware_proxy with PS ==="
.venv/bin/python scripts/cache_aware_proxy.py \
--combined http://127.0.0.1:8000 http://127.0.0.1:8001 http://127.0.0.1:8002 \
http://127.0.0.1:8003 http://127.0.0.1:8004 http://127.0.0.1:8005 http://127.0.0.1:8006 \
--ps-instances http://127.0.0.1:8007 \
--ps-bootstrap-ports 9005 \
--bootstrap-ports 8998,8999,9000,9001,9002,9003,9004 \
--port 9090 \
> "$OUTDIR/proxy.log" 2>&1 &
PROXY_PID=$!
echo "Proxy PID: $PROXY_PID"
# Wait for proxy ready
echo -n " Waiting for proxy..."
until curl -sf "http://127.0.0.1:9090/stats" > /dev/null 2>&1; do
sleep 2
echo -n "."
done
echo " OK"
echo "=== Running benchmark ==="
.venv/bin/python -m replayer --trace traces/sampled_1000req_seed42.jsonl \
--output "$OUTDIR/metrics.jsonl" \
--endpoint http://localhost:9090 --model "$MODEL" \
--time-scale 20 --max-inflight-sessions 7 --request-limit 200 -v
echo "=== Saving proxy breakdown and stats ==="
curl -s "http://127.0.0.1:9090/breakdown" > "$OUTDIR/breakdown.json"
curl -s "http://127.0.0.1:9090/stats" > "$OUTDIR/stats.json"
echo "=== Benchmark complete ==="
echo "Results in $OUTDIR/"
echo ""
# ---- Quick analysis ----
echo "=== Quick metrics summary ==="
python3 -c "
import json, statistics
records = []
with open('$OUTDIR/metrics.jsonl') as f:
for line in f:
records.append(json.loads(line))
ok = [r for r in records if r.get('status') == 'ok']
fail = [r for r in records if r.get('status') != 'ok']
print(f'Total: {len(records)}, OK: {len(ok)}, Failed: {len(fail)}')
print(f'Success rate: {len(ok)/len(records)*100:.1f}%')
if ok:
ttfts = sorted([r['ttft'] for r in ok])
tpots = sorted([r['tpot'] for r in ok])
e2es = sorted([r['e2e'] for r in ok])
def pct(vals, p):
idx = int(len(vals) * p / 100)
return vals[min(idx, len(vals)-1)]
print(f'TTFT p50={pct(ttfts,50):.3f} p90={pct(ttfts,90):.3f} p99={pct(ttfts,99):.3f}')
print(f'TPOT p50={pct(tpots,50):.4f} p90={pct(tpots,90):.4f} p99={pct(tpots,99):.4f}')
print(f'E2E p50={pct(e2es,50):.3f} p90={pct(e2es,90):.3f} p99={pct(e2es,99):.3f}')
# Breakdown analysis
try:
with open('$OUTDIR/breakdown.json') as f:
bd = json.load(f)
classes = {}
for r in bd:
rc = r.get('route_class', 'UNKNOWN')
classes[rc] = classes.get(rc, 0) + 1
print(f'\nRoute class breakdown:')
for rc, cnt in sorted(classes.items()):
print(f' {rc}: {cnt}')
# PS utilization
ps_reqs = [r for r in bd if r.get('route_class') == 'HEAVY_PS']
print(f'\nPS offloaded: {len(ps_reqs)} requests')
# Offload reasons for HEAVY
heavy = [r for r in bd if r.get('route_class', '').startswith('HEAVY')]
reasons = {}
for r in heavy:
reason = r.get('offload_reason', 'unknown')
reasons[reason] = reasons.get(reason, 0) + 1
if reasons:
print(f'HEAVY offload reasons:')
for reason, cnt in sorted(reasons.items()):
print(f' {reason}: {cnt}')
except Exception as e:
print(f'Breakdown analysis error: {e}')
"
echo ""
echo "=== Phase 1 experiment complete ==="

View File

@@ -0,0 +1,172 @@
#!/bin/bash
# H4: Cache-aware offload gate — only offload HEAVY when cache_ratio >= 0.3
# Cold turn-1 HEAVY stays co-located (no RDMA overhead)
# Cached turn-2+ HEAVY offloads to flexible D
set -euo pipefail
cd /home/admin/cpfs/wjh/agentic-kv
source .venv/bin/activate
MODEL=/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct
OUTDIR=outputs/h4_cache_gate
cleanup() {
for p in $(ps aux | grep -E 'vllm serve|cache_aware_proxy' | grep -v grep | awk '{print $2}' 2>/dev/null); do kill -9 "$p" 2>/dev/null || true; done
sleep 3
for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u | grep -v '^$' || true); do kill -9 "$p" 2>/dev/null || true; done
sleep 5
}
cleanup
mkdir -p "$OUTDIR"
echo "=== Verifying GPUs free ==="
nvidia-smi --query-gpu=index,memory.used --format=csv,noheader
# ---- Launch 8 Combined instances (GPUs 0-7, all kv_both) ----
echo "=== Launching 8 Combined instances ==="
for i in $(seq 0 7); do
echo "Starting C instance $i on GPU $i, port $((8000+i)), bootstrap $((8998+i))"
VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998+i)) MASTER_PORT=$((29500+i)) CUDA_VISIBLE_DEVICES=$i \
.venv/bin/vllm serve "$MODEL" --host 0.0.0.0 --port $((8000+i)) --tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
--kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
> "$OUTDIR/vllm_$i.log" 2>&1 &
sleep 2
done
# ---- Wait for all instances healthy ----
echo "=== Waiting for all instances to be healthy ==="
for port in $(seq 8000 8007); do
echo -n " Waiting for port $port..."
timeout 600 bash -c "until curl -sf http://127.0.0.1:$port/health > /dev/null 2>&1; do sleep 5; done"
echo " OK"
done
# Wait for bootstrap ports
for bp in $(seq 8998 9005); do
timeout 120 bash -c "until curl -s localhost:$bp/query > /dev/null 2>&1; do sleep 2; done"
done
echo "=== All instances healthy ==="
sleep 5
# ---- Launch Proxy with H4 cache-aware offload ----
echo "=== Launching cache_aware_proxy with H4 cache-aware offload gate ==="
.venv/bin/python scripts/cache_aware_proxy.py \
--combined http://127.0.0.1:8000 http://127.0.0.1:8001 http://127.0.0.1:8002 \
http://127.0.0.1:8003 http://127.0.0.1:8004 http://127.0.0.1:8005 \
http://127.0.0.1:8006 http://127.0.0.1:8007 \
--bootstrap-ports 8998,8999,9000,9001,9002,9003,9004,9005 \
--offload --port 9090 \
> "$OUTDIR/proxy.log" 2>&1 &
PROXY_PID=$!
echo "Proxy PID: $PROXY_PID"
# Wait for proxy ready
echo -n " Waiting for proxy..."
until curl -sf "http://127.0.0.1:9090/stats" > /dev/null 2>&1; do
sleep 2
echo -n "."
done
echo " OK"
# ---- Run benchmark ----
echo "=== Running benchmark: 200 req, time_scale=20, max-inflight-sessions=8 ==="
.venv/bin/python -m replayer --trace traces/sampled_1000req_seed42.jsonl \
--output "$OUTDIR/metrics.jsonl" \
--endpoint http://localhost:9090 --model "$MODEL" \
--time-scale 20 --max-inflight-sessions 8 --request-limit 200 -v
# ---- Save proxy data BEFORE cleanup ----
echo "=== Saving proxy breakdown and stats ==="
curl -sf "http://127.0.0.1:9090/breakdown" > "$OUTDIR/breakdown.json" 2>/dev/null || true
curl -sf "http://127.0.0.1:9090/stats" > "$OUTDIR/stats.json" 2>/dev/null || true
# ---- Analysis ----
echo "=== H4 Cache-Aware Gate Results ==="
.venv/bin/python -c "
import json
from collections import Counter
rows = [json.loads(l) for l in open('$OUTDIR/metrics.jsonl')]
ok = [r for r in rows if not r.get('error')]
fail = [r for r in rows if r.get('error')]
p = lambda v,q: sorted(v)[min(int(q*len(v)),len(v)-1)] if v else 0
ttfts = sorted([r['ttft_s'] for r in ok if r.get('ttft_s')])
tpots = sorted([r['tpot_s'] for r in ok if r.get('tpot_s') and r['tpot_s']>0])
e2es = sorted([r['latency_s'] for r in ok])
print(f'OK={len(ok)}/{len(rows)} TTFT50={p(ttfts,.5):.3f} TTFT90={p(ttfts,.9):.3f} TPOT90={p(tpots,.9):.4f} E2E50={p(e2es,.5):.3f}')
# Per-class breakdown
for lo,hi,cl in [(0,5000,'WARM'),(5000,20000,'MED'),(20000,200000,'HEAVY')]:
sub = [r for r in ok if lo <= r['input_length'] < hi and r.get('ttft_s')]
if sub:
t = sorted([r['ttft_s'] for r in sub])
tp = sorted([r['tpot_s'] for r in sub if r.get('tpot_s') and r['tpot_s']>0])
e = sorted([r['latency_s'] for r in sub])
print(f' {cl:6s} n={len(sub):3d} TTFT50={p(t,.5):.3f} TTFT90={p(t,.9):.3f} TPOT90={p(tp,.9):.4f} E2E50={p(e,.5):.3f}')
# Route distribution from breakdown
try:
bd = json.load(open('$OUTDIR/breakdown.json'))
rc = Counter(b.get('route_class','') for b in bd)
print(f'\nRoute class distribution:')
for cls, cnt in sorted(rc.items()):
print(f' {cls}: {cnt}')
# Cache ratio analysis for HEAVY
heavy = [b for b in bd if b.get('route_class','').startswith('HEAVY')]
reasons = Counter(b.get('offload_reason','') for b in heavy)
print(f'\nHEAVY offload reasons: {dict(reasons)}')
colo = [b for b in bd if b.get('route_class') == 'HEAVY_COLO']
offloaded = [b for b in bd if b.get('route_class') == 'HEAVY_OFFLOAD']
print(f'\nHEAVY_COLO (cold, no RDMA): {len(colo)}')
print(f'HEAVY_OFFLOAD (cached, RDMA): {len(offloaded)}')
# Cache ratios
for b in heavy:
cr = b.get('cache_ratio', b.get('cache_hit',0)/max(b.get('input_length',1),1))
cls = b.get('route_class','')
reason = b.get('offload_reason','')
# Timing comparison: HEAVY_COLO vs HEAVY_OFFLOAD
if colo:
colo_ttft = sorted([b['t_first_token']-b['t_proxy_recv'] for b in colo if b.get('t_first_token')])
if colo_ttft:
print(f' HEAVY_COLO TTFT: p50={p(colo_ttft,.5):.2f}s p90={p(colo_ttft,.9):.2f}s')
if offloaded:
off_ttft = sorted([b['t_first_token']-b['t_proxy_recv'] for b in offloaded if b.get('t_first_token')])
if off_ttft:
print(f' HEAVY_OFFLOAD TTFT: p50={p(off_ttft,.5):.2f}s p90={p(off_ttft,.9):.2f}s')
# Offload timing breakdown
if offloaded:
pf = [b['t_prefill_done']-b['t_prefill_sent'] for b in offloaded if b.get('t_prefill_done') and b.get('t_prefill_sent')]
kv = [b['t_first_token']-b['t_prefill_done'] for b in offloaded if b.get('t_first_token') and b.get('t_prefill_done')]
if pf:
pf.sort()
print(f' Offload prefill: p50={p(pf,.5):.2f}s p90={p(pf,.9):.2f}s')
if kv:
kv.sort()
print(f' Offload KV xfer+decode start: p50={p(kv,.5):.2f}s p90={p(kv,.9):.2f}s')
except Exception as e:
print(f'Breakdown analysis error: {e}')
if fail:
print(f'\nFailed requests ({len(fail)}):')
for r in fail[:5]:
print(f' input={r[\"input_length\"]} error={r[\"error\"][:80]}')
print()
print('=== Baselines for comparison ===')
print('Baseline 8C plain: OK=198/200 TTFT50=1.075 TTFT90=9.384 TPOT90=0.0761 E2E50=5.075')
print('Phase0A 7C kv_both: OK=198/200 TTFT50=1.073 TPOT90=0.0738 E2E50=5.096')
print('V2 all-offload: OK=179/185 TTFT50=0.762 TPOT90=0.0746 E2E50=4.628')
"
cleanup
echo "=== DONE $(date) ==="

169
scripts/run_ps_ablation.sh Executable file
View File

@@ -0,0 +1,169 @@
#!/bin/bash
# PS offload ablation: 3 experiments back-to-back with full cleanup between each.
# 1. always_offload: all HEAVY → PS (zero hyperparameters)
# 2. cost_model: cost-based decision (no interference gate)
# 3. high_load: cost-based + 1000 requests (higher contention)
set -euo pipefail
cd /home/admin/cpfs/wjh/agentic-kv
source .venv/bin/activate
MODEL=/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct
VENV=.venv/bin
TRACE=traces/sampled_1000req_seed42.jsonl
cleanup() {
echo "[cleanup] Killing all processes..."
for p in $(ps aux | grep -E 'vllm serve|cache_aware_proxy' | grep -v grep | awk '{print $2}' 2>/dev/null); do
kill -9 "$p" 2>/dev/null || true
done
sleep 3
for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u | grep -v '^$' || true); do
kill -9 "$p" 2>/dev/null || true
done
sleep 5
local used=$(nvidia-smi --query-gpu=memory.used --format=csv,noheader,nounits 2>/dev/null | awk '{s+=$1} END{print s}')
if [ "${used:-0}" -gt 100 ]; then
echo "[ERROR] GPUs not free (${used}MB). Waiting 10s..."
sleep 10
for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u | grep -v '^$' || true); do
kill -9 "$p" 2>/dev/null || true
done
sleep 5
fi
echo "[cleanup] Done."
}
launch_7c_1ps() {
echo "[launch] 7C (kv_both) + 1PS (kv_both)..."
for i in $(seq 0 6); do
VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998+i)) MASTER_PORT=$((29500+i)) CUDA_VISIBLE_DEVICES=$i \
$VENV/vllm serve "$MODEL" --host 0.0.0.0 --port $((8000+i)) --tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
--kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
> "$1/vllm_c_$i.log" 2>&1 &
sleep 2
done
VLLM_MOONCAKE_BOOTSTRAP_PORT=9005 MASTER_PORT=29507 CUDA_VISIBLE_DEVICES=7 \
$VENV/vllm serve "$MODEL" --host 0.0.0.0 --port 8007 --tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
--kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
> "$1/vllm_ps.log" 2>&1 &
for i in $(seq 0 7); do
timeout 600 bash -c "until curl -s localhost:$((8000+i))/health > /dev/null 2>&1; do sleep 5; done"
echo " inst_$i healthy"
done
for bp in $(seq 8998 9005); do
timeout 120 bash -c "until curl -s localhost:$bp/query > /dev/null 2>&1; do sleep 2; done"
done
echo "[launch] All ready."
}
run_experiment() {
local tag=$1
local offload_mode=$2
local requests=$3
local sessions=$4
local outdir=outputs/$tag
echo ""
echo "================================================================"
echo " Experiment: $tag (mode=$offload_mode, requests=$requests)"
echo " $(date)"
echo "================================================================"
cleanup
mkdir -p "$outdir"
launch_7c_1ps "$outdir"
echo "[proxy] Starting (offload_mode=$offload_mode)..."
$VENV/python scripts/cache_aware_proxy.py \
--combined http://127.0.0.1:8000 http://127.0.0.1:8001 http://127.0.0.1:8002 \
http://127.0.0.1:8003 http://127.0.0.1:8004 http://127.0.0.1:8005 http://127.0.0.1:8006 \
--ps-instances http://127.0.0.1:8007 \
--ps-bootstrap-ports 9005 \
--bootstrap-ports 8998,8999,9000,9001,9002,9003,9004 \
--offload-mode "$offload_mode" \
--port 9090 > "$outdir/proxy.log" 2>&1 &
sleep 3
echo "[bench] Running $requests requests, $sessions sessions..."
$VENV/python -m replayer --trace "$TRACE" \
--output "$outdir/metrics.jsonl" \
--endpoint http://localhost:9090 --model "$MODEL" \
--time-scale 20 --max-inflight-sessions "$sessions" \
--request-limit "$requests" -v 2>&1 | tail -5
curl -sf http://localhost:9090/breakdown > "$outdir/breakdown.json" 2>/dev/null || true
curl -sf http://localhost:9090/stats > "$outdir/stats.json" 2>/dev/null || true
# Quick summary
$VENV/python -c "
import json
from collections import Counter
rows = [json.loads(l) for l in open('$outdir/metrics.jsonl')]
ok = [r for r in rows if not r.get('error')]
p = lambda v,q: sorted(v)[min(int(q*len(v)),len(v)-1)] if v else 0
ttfts = sorted([r['ttft_s'] for r in ok if r.get('ttft_s')])
tpots = sorted([r['tpot_s'] for r in ok if r.get('tpot_s') and r['tpot_s']>0])
e2es = sorted([r['latency_s'] for r in ok])
print(' OK=%d/%d TTFT50=%.3f TTFT90=%.3f TPOT90=%.4f E2E50=%.3f' % (
len(ok), len(rows), p(ttfts,.5), p(ttfts,.9), p(tpots,.9), p(e2es,.5)))
for lo,hi,cl in [(0,5000,'WARM'),(5000,20000,'MED'),(20000,200000,'HEAVY')]:
sub = [r for r in ok if lo <= r['input_length'] < hi and r.get('ttft_s')]
if sub:
t = sorted([r['ttft_s'] for r in sub])
tp = sorted([r['tpot_s'] for r in sub if r.get('tpot_s') and r['tpot_s']>0])
print(' %-6s n=%3d TTFT50=%.3f TPOT90=%.4f' % (cl, len(sub), p(t,.5), p(tp,.9) if tp else 0))
try:
bd = json.load(open('$outdir/breakdown.json'))
rc = Counter(b.get('route_class','') for b in bd)
print(' Routes: %s' % dict(rc))
except: pass
"
echo " Done: $tag"
}
# ─── Run experiments ───
run_experiment "ps_always" "always" 200 7
run_experiment "ps_cost" "cost" 200 7
run_experiment "ps_highload" "cost" 1000 7
# ─── Final comparison ───
cleanup
echo ""
echo "================================================================"
echo " FINAL COMPARISON"
echo "================================================================"
$VENV/python -c "
import json
configs = [
('outputs/phase0a_7c_kvboth/metrics.jsonl', 'Control: 7C no PS'),
('outputs/ps_always/metrics.jsonl', 'PS always offload'),
('outputs/ps_cost/metrics.jsonl', 'PS cost model'),
('outputs/ps_highload/metrics.jsonl', 'PS cost 1000req'),
('outputs/baseline_stability_fresh/metrics.jsonl', 'Baseline: 8C plain'),
]
p = lambda v,q: sorted(v)[min(int(q*len(v)),len(v)-1)] if v else 0
print('%-25s %7s %7s %7s %7s %7s' % ('Config', 'OK/N', 'TTFT50', 'TTFT90', 'TPOT90', 'E2E50'))
print('-' * 80)
for path, label in configs:
try:
rows = [json.loads(l) for l in open(path)]
ok = [r for r in rows if not r.get('error')]
ttfts = sorted([r['ttft_s'] for r in ok if r.get('ttft_s')])
tpots = sorted([r['tpot_s'] for r in ok if r.get('tpot_s') and r['tpot_s']>0])
e2es = sorted([r['latency_s'] for r in ok])
print('%-25s %3d/%3d %7.3f %7.3f %7.4f %7.3f' % (
label, len(ok), len(rows), p(ttfts,.5), p(ttfts,.9), p(tpots,.9), p(e2es,.5)))
except Exception as e:
print('%-25s %s' % (label, e))
"
echo ""
echo "=== ALL DONE $(date) ==="

100
scripts/run_ps_flexd.sh Executable file
View File

@@ -0,0 +1,100 @@
#!/bin/bash
# PS + flexible D: HEAVY prefill on PS, decode on least-loaded C
set -euo pipefail
cd /home/admin/cpfs/wjh/agentic-kv
source .venv/bin/activate
MODEL=/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct
OUTDIR=outputs/ps_flexd
cleanup() {
for p in $(ps aux | grep -E 'vllm serve|cache_aware_proxy' | grep -v grep | awk '{print $2}' 2>/dev/null); do kill -9 "$p" 2>/dev/null || true; done
sleep 3
for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u | grep -v '^$' || true); do kill -9 "$p" 2>/dev/null || true; done
sleep 5
}
cleanup
mkdir -p "$OUTDIR"
# 7 C instances (kv_both, GPUs 0-6)
for i in $(seq 0 6); do
VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998+i)) MASTER_PORT=$((29500+i)) CUDA_VISIBLE_DEVICES=$i \
.venv/bin/vllm serve "$MODEL" --host 0.0.0.0 --port $((8000+i)) --tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
--kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
> "$OUTDIR/vllm_c_$i.log" 2>&1 &
sleep 2
done
# 1 PS instance (kv_both, GPU 7)
VLLM_MOONCAKE_BOOTSTRAP_PORT=9005 MASTER_PORT=29507 CUDA_VISIBLE_DEVICES=7 \
.venv/bin/vllm serve "$MODEL" --host 0.0.0.0 --port 8007 --tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
--kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
> "$OUTDIR/vllm_ps.log" 2>&1 &
echo "Waiting for instances..."
for i in $(seq 0 7); do
timeout 600 bash -c "until curl -s localhost:$((8000+i))/health > /dev/null 2>&1; do sleep 5; done"
echo " inst_$i healthy"
done
for bp in $(seq 8998 9005); do
timeout 120 bash -c "until curl -s localhost:$bp/query > /dev/null 2>&1; do sleep 2; done"
done
echo "All ready"
# Proxy: PS on port 8007, C on 8000-8006, flexible D
.venv/bin/python scripts/cache_aware_proxy.py \
--combined http://127.0.0.1:8000 http://127.0.0.1:8001 http://127.0.0.1:8002 \
http://127.0.0.1:8003 http://127.0.0.1:8004 http://127.0.0.1:8005 http://127.0.0.1:8006 \
--ps-instances http://127.0.0.1:8007 \
--ps-bootstrap-ports 9005 \
--bootstrap-ports 8998,8999,9000,9001,9002,9003,9004 \
--port 9090 > "$OUTDIR/proxy.log" 2>&1 &
sleep 3
echo "Running benchmark..."
.venv/bin/python -m replayer --trace traces/sampled_1000req_seed42.jsonl \
--output "$OUTDIR/metrics.jsonl" --endpoint http://localhost:9090 --model "$MODEL" \
--time-scale 20 --max-inflight-sessions 7 --request-limit 200 -v
curl -sf http://localhost:9090/breakdown > "$OUTDIR/breakdown.json" 2>/dev/null || true
curl -sf http://localhost:9090/stats > "$OUTDIR/stats.json" 2>/dev/null || true
# Quick analysis
.venv/bin/python -c "
import json
from collections import Counter
rows = [json.loads(l) for l in open('$OUTDIR/metrics.jsonl')]
ok = [r for r in rows if not r.get('error')]
p = lambda v,q: sorted(v)[min(int(q*len(v)),len(v)-1)] if v else 0
ttfts = sorted([r['ttft_s'] for r in ok if r.get('ttft_s')])
tpots = sorted([r['tpot_s'] for r in ok if r.get('tpot_s') and r['tpot_s']>0])
e2es = sorted([r['latency_s'] for r in ok])
print('OK=%d/%d TTFT50=%.3f TTFT90=%.3f TPOT90=%.4f E2E50=%.3f' % (
len(ok), len(rows), p(ttfts,.5), p(ttfts,.9), p(tpots,.9), p(e2es,.5)))
for lo,hi,cl in [(0,5000,'WARM'),(5000,20000,'MED'),(20000,200000,'HEAVY')]:
sub = [r for r in ok if lo <= r['input_length'] < hi and r.get('ttft_s')]
if sub:
t = sorted([r['ttft_s'] for r in sub])
tp = sorted([r['tpot_s'] for r in sub if r.get('tpot_s') and r['tpot_s']>0])
print(' %-6s n=%3d TTFT50=%.3f TPOT90=%.4f' % (cl, len(sub), p(t,.5), p(tp,.9) if tp else 0))
try:
bd = json.load(open('$OUTDIR/breakdown.json'))
rc = Counter(b.get('route_class','') for b in bd)
print('Routes: %s' % dict(rc))
ps = [b for b in bd if b.get('route_class') == 'HEAVY_PS']
if ps:
pf = [b['t_prefill_done']-b['t_prefill_sent'] for b in ps if b.get('t_prefill_done') and b.get('t_prefill_sent')]
if pf: print('PS prefill: n=%d p50=%.1fs p90=%.1fs' % (len(pf), p(pf,.5), p(pf,.9)))
except: pass
print()
print('Compare: Phase0A: TTFT50=1.073 TPOT90=0.0738 E2E50=5.096')
print('Compare: Baseline 8C: TTFT50=1.075 TPOT90=0.0761 E2E50=5.075')
"
cleanup
echo "=== DONE $(date) ==="

79
scripts/run_ps_remaining.sh Executable file
View File

@@ -0,0 +1,79 @@
#!/bin/bash
# Run ps_cost and ps_highload experiments (ps_always already done)
set -euo pipefail
cd /home/admin/cpfs/wjh/agentic-kv
source .venv/bin/activate
MODEL=/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct
VENV=.venv/bin
TRACE=traces/sampled_1000req_seed42.jsonl
cleanup() {
for p in $(ps aux | grep -E 'vllm serve|cache_aware_proxy' | grep -v grep | awk '{print $2}' 2>/dev/null); do kill -9 "$p" 2>/dev/null || true; done
sleep 3
for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u | grep -v '^$' || true); do kill -9 "$p" 2>/dev/null || true; done
sleep 5
}
launch_7c_1ps() {
local outdir=$1
mkdir -p "$outdir"
for i in $(seq 0 6); do
VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998+i)) MASTER_PORT=$((29500+i)) CUDA_VISIBLE_DEVICES=$i \
$VENV/vllm serve "$MODEL" --host 0.0.0.0 --port $((8000+i)) --tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
--kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
> "$outdir/vllm_c_$i.log" 2>&1 &
sleep 2
done
VLLM_MOONCAKE_BOOTSTRAP_PORT=9005 MASTER_PORT=29507 CUDA_VISIBLE_DEVICES=7 \
$VENV/vllm serve "$MODEL" --host 0.0.0.0 --port 8007 --tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
--kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
> "$outdir/vllm_ps.log" 2>&1 &
for i in $(seq 0 7); do
timeout 600 bash -c "until curl -s localhost:$((8000+i))/health > /dev/null 2>&1; do sleep 5; done"
echo " inst_$i healthy"
done
for bp in $(seq 8998 9005); do
timeout 120 bash -c "until curl -s localhost:$bp/query > /dev/null 2>&1; do sleep 2; done"
done
echo " All ready"
}
run_exp() {
local tag=$1 mode=$2 reqs=$3 sess=$4
local outdir=outputs/$tag
echo ""
echo "================================================================"
echo " $tag (mode=$mode, reqs=$reqs, sess=$sess)"
echo " $(date)"
echo "================================================================"
cleanup
launch_7c_1ps "$outdir"
$VENV/python scripts/cache_aware_proxy.py \
--combined http://127.0.0.1:8000 http://127.0.0.1:8001 http://127.0.0.1:8002 \
http://127.0.0.1:8003 http://127.0.0.1:8004 http://127.0.0.1:8005 http://127.0.0.1:8006 \
--ps-instances http://127.0.0.1:8007 --ps-bootstrap-ports 9005 \
--bootstrap-ports 8998,8999,9000,9001,9002,9003,9004 \
--offload-mode "$mode" --port 9090 > "$outdir/proxy.log" 2>&1 &
sleep 3
$VENV/python -m replayer --trace "$TRACE" \
--output "$outdir/metrics.jsonl" --endpoint http://localhost:9090 --model "$MODEL" \
--time-scale 20 --max-inflight-sessions "$sess" --request-limit "$reqs" -v 2>&1 | tail -5
curl -sf http://localhost:9090/breakdown > "$outdir/breakdown.json" 2>/dev/null || true
curl -sf http://localhost:9090/stats > "$outdir/stats.json" 2>/dev/null || true
echo " Done: $tag"
}
run_exp ps_cost cost 200 7
run_exp ps_highload cost 1000 7
cleanup
echo ""
echo "=== ALL REMAINING DONE $(date) ==="

156
scripts/run_v2_offload.sh Executable file
View File

@@ -0,0 +1,156 @@
#!/bin/bash
# V2: P2P KV offload — C_s (session-sticky, cached) prefills, D (least-loaded) decodes
# 8 combined instances (all kv_both), no dedicated PS
set -euo pipefail
cd /home/admin/cpfs/wjh/agentic-kv
source .venv/bin/activate
MODEL=/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct
OUTDIR=outputs/v2_offload
cleanup() {
for p in $(ps aux | grep -E 'vllm serve|cache_aware_proxy' | grep -v grep | awk '{print $2}' 2>/dev/null); do kill -9 "$p" 2>/dev/null || true; done
sleep 3
for p in $(fuser /dev/nvidia* 2>/dev/null | tr ' ' '\n' | sort -u | grep -v '^$' || true); do kill -9 "$p" 2>/dev/null || true; done
sleep 5
}
cleanup
mkdir -p "$OUTDIR"
echo "=== Verifying GPUs free ==="
nvidia-smi --query-gpu=index,memory.used --format=csv,noheader
# ---- Launch 8 Combined instances (GPUs 0-7, all kv_both) ----
echo "=== Launching 8 Combined instances ==="
for i in $(seq 0 7); do
echo "Starting C instance $i on GPU $i, port $((8000+i)), bootstrap $((8998+i))"
VLLM_MOONCAKE_BOOTSTRAP_PORT=$((8998+i)) MASTER_PORT=$((29500+i)) CUDA_VISIBLE_DEVICES=$i \
.venv/bin/vllm serve "$MODEL" --host 0.0.0.0 --port $((8000+i)) --tensor-parallel-size 1 \
--trust-remote-code --enable-prefix-caching --enforce-eager \
--dtype auto --gpu-memory-utilization 0.9 --max-model-len 200000 \
--kv-transfer-config '{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' \
> "$OUTDIR/vllm_$i.log" 2>&1 &
sleep 2
done
# ---- Wait for all instances healthy ----
echo "=== Waiting for all instances to be healthy ==="
for port in $(seq 8000 8007); do
echo -n " Waiting for port $port..."
timeout 600 bash -c "until curl -sf http://127.0.0.1:$port/health > /dev/null 2>&1; do sleep 5; done"
echo " OK"
done
# Wait for bootstrap ports
for bp in $(seq 8998 9005); do
timeout 120 bash -c "until curl -s localhost:$bp/query > /dev/null 2>&1; do sleep 2; done"
done
echo "=== All instances healthy ==="
sleep 5
# ---- Launch Proxy with V2 offload ----
echo "=== Launching cache_aware_proxy with V2 offload ==="
.venv/bin/python scripts/cache_aware_proxy.py \
--combined http://127.0.0.1:8000 http://127.0.0.1:8001 http://127.0.0.1:8002 \
http://127.0.0.1:8003 http://127.0.0.1:8004 http://127.0.0.1:8005 \
http://127.0.0.1:8006 http://127.0.0.1:8007 \
--bootstrap-ports 8998,8999,9000,9001,9002,9003,9004,9005 \
--offload --port 9090 \
> "$OUTDIR/proxy.log" 2>&1 &
PROXY_PID=$!
echo "Proxy PID: $PROXY_PID"
# Wait for proxy ready
echo -n " Waiting for proxy..."
until curl -sf "http://127.0.0.1:9090/stats" > /dev/null 2>&1; do
sleep 2
echo -n "."
done
echo " OK"
# ---- Run benchmark ----
echo "=== Running benchmark: 200 req, time_scale=20, max-inflight-sessions=8 ==="
.venv/bin/python -m replayer --trace traces/sampled_1000req_seed42.jsonl \
--output "$OUTDIR/metrics.jsonl" \
--endpoint http://localhost:9090 --model "$MODEL" \
--time-scale 20 --max-inflight-sessions 8 --request-limit 200 -v
# ---- Save proxy data BEFORE cleanup ----
echo "=== Saving proxy breakdown and stats ==="
curl -sf "http://127.0.0.1:9090/breakdown" > "$OUTDIR/breakdown.json" 2>/dev/null || true
curl -sf "http://127.0.0.1:9090/stats" > "$OUTDIR/stats.json" 2>/dev/null || true
# ---- Quick analysis ----
echo "=== Quick metrics summary ==="
.venv/bin/python -c "
import json
from collections import Counter
rows = [json.loads(l) for l in open('$OUTDIR/metrics.jsonl')]
ok = [r for r in rows if not r.get('error')]
fail = [r for r in rows if r.get('error')]
p = lambda v,q: sorted(v)[min(int(q*len(v)),len(v)-1)] if v else 0
ttfts = sorted([r['ttft_s'] for r in ok if r.get('ttft_s')])
tpots = sorted([r['tpot_s'] for r in ok if r.get('tpot_s') and r['tpot_s']>0])
e2es = sorted([r['latency_s'] for r in ok])
print(f'OK={len(ok)}/{len(rows)} TTFT50={p(ttfts,.5):.3f} TTFT90={p(ttfts,.9):.3f} TPOT90={p(tpots,.9):.4f} E2E50={p(e2es,.5):.3f}')
# Per-class breakdown
for lo,hi,cl in [(0,5000,'WARM'),(5000,20000,'MED'),(20000,200000,'HEAVY')]:
sub = [r for r in ok if lo <= r['input_length'] < hi and r.get('ttft_s')]
if sub:
t = sorted([r['ttft_s'] for r in sub])
tp = sorted([r['tpot_s'] for r in sub if r.get('tpot_s') and r['tpot_s']>0])
e = sorted([r['latency_s'] for r in sub])
print(f' {cl:6s} n={len(sub):3d} TTFT50={p(t,.5):.3f} TTFT90={p(t,.9):.3f} TPOT90={p(tp,.9):.4f} E2E50={p(e,.5):.3f}')
# Route distribution from breakdown
try:
bd = json.load(open('$OUTDIR/breakdown.json'))
rc = Counter(b.get('route_class','') for b in bd)
print(f'\nRoute class distribution:')
for cls, cnt in sorted(rc.items()):
print(f' {cls}: {cnt}')
# Offload timing
offloaded = [b for b in bd if b.get('route_class') == 'HEAVY_OFFLOAD']
if offloaded:
pf = [b['t_prefill_done']-b['t_prefill_sent'] for b in offloaded if b.get('t_prefill_done') and b.get('t_prefill_sent')]
if pf:
pf.sort()
print(f'\nHEAVY_OFFLOAD prefill time: n={len(pf)} p50={p(pf,.5):.2f}s p90={p(pf,.9):.2f}s')
# Offload reasons for HEAVY
heavy = [b for b in bd if b.get('route_class','').startswith('HEAVY')]
reasons = Counter(b.get('offload_reason','') for b in heavy)
if reasons:
print(f'HEAVY offload reasons: {dict(reasons)}')
# Per-instance P and D counts
p_counts = Counter(b.get('p_inst','') for b in bd if b.get('p_inst'))
d_counts = Counter(b.get('d_inst','') for b in bd if b.get('d_inst'))
if p_counts:
print(f'\nP-instance distribution: {dict(p_counts)}')
if d_counts:
print(f'D-instance distribution: {dict(d_counts)}')
except Exception as e:
print(f'Breakdown analysis error: {e}')
if fail:
print(f'\nFailed requests ({len(fail)}):')
for r in fail[:5]:
print(f' input={r[\"input_length\"]} error={r[\"error\"][:80]}')
print()
print('=== Baselines for comparison ===')
print('Phase0A (7C kv_both): OK=198/200 TTFT50=1.073 TPOT90=0.0738 E2E50=5.096')
print('Baseline (8C plain): OK=198/200 TTFT50=1.075 TPOT90=0.0761 E2E50=5.075')
print('PS V1 flexD: OK=172/186 TTFT50=0.978 TPOT90=0.0758 E2E50=5.623')
"
cleanup
echo "=== DONE $(date) ==="

View File

@@ -347,7 +347,7 @@ class MooncakeConnectorScheduler:
# Only trigger 1 KV transfer per request. # Only trigger 1 KV transfer per request.
params["do_remote_prefill"] = False params["do_remote_prefill"] = False
elif params.get("do_remote_decode"): if params.get("do_remote_decode"):
assert not self.is_kv_consumer assert not self.is_kv_consumer
if not params.get("transfer_id"): if not params.get("transfer_id"):
logger.warning("Missing transfer_id in kv_transfer_params from router!") logger.warning("Missing transfer_id in kv_transfer_params from router!")