Critical:
- cache_aware_proxy: _handle_pd_sep leaked p_inst.num_requests (never decremented) and never managed d_inst.num_requests; fix media_type from application/json to text/event-stream for the SSE stream
High:
- b3_sweep/b3_isolated_policy/b3_analyze: replace hardcoded /home/admin/cpfs/wjh/ ROOT with script-relative $(dirname "$0")/..
- b3_analyze: replace hardcoded 8-port WORKER_MAP with dynamic generation from BASE_PORT and N_INSTANCES
Medium:
- analyze_breakdown: warn on stderr when records are skipped (was silent)
- deploy_vllm_patches: fail fast on SSH/SCP errors instead of continuing with an empty VENV_SITE
- pyproject.toml: declare fastapi and uvicorn as runtime dependencies
- launch_elastic_p2p: kill EngineCore and proxy in the trap handler to prevent GPU memory leaks on exit
127 lines
4.6 KiB
Bash
Executable File
127 lines
4.6 KiB
Bash
Executable File
#!/usr/bin/env bash
# Per-policy joined_analysis driver for a completed B3 sweep.
#
# Usage: <this script> <sweep_dir>
#
# For each policy directory under <sweep_dir> that contains run_window.json
# (the shared "engine_state" and "logs" directories are skipped):
#   - reuse the policy's own engine_state/ when it holds a non-empty
#     engine_*.jsonl file; otherwise slice <sweep_dir>/engine_state down to
#     the policy's run_window.json
#   - run joined_analysis.py to emit interference / hotspot / reuse /
#     failure breakdowns into <policy_dir>/joined
# Then write <sweep_dir>/b3_policy_comparison.json with one row per policy.
#
# Environment overrides:
#   ROOT         repo root (default: parent of this script's directory);
#                Python is taken from $ROOT/.venv/bin
#   BASE_PORT    port mapped to engine_0 (default: 8000)
#   N_INSTANCES  number of engines; engine_i maps to BASE_PORT+i (default: 8)

set -euo pipefail
# Repo root: overridable via $ROOT, otherwise the parent of this script's dir.
ROOT="${ROOT:-$(cd "$(dirname "$0")/.." && pwd)}"
VENV="$ROOT/.venv/bin"
SWEEP_DIR="${1:?usage: $0 <sweep_dir>}"

# Fail early with a clear message. Without these checks a bad sweep dir only
# surfaces as a Python traceback from the final aggregation step, and a
# missing venv only when the first Python step is launched.
if [[ ! -d "$SWEEP_DIR" ]]; then
  printf 'error: sweep dir is not a directory: %s\n' "$SWEEP_DIR" >&2
  exit 1
fi
if [[ ! -x "$VENV/python" ]]; then
  printf 'error: %s is missing or not executable (check ROOT)\n' \
    "$VENV/python" >&2
  exit 1
fi
BASE_PORT="${BASE_PORT:-8000}"
N_INSTANCES="${N_INSTANCES:-8}"

# _build_worker_map BASE_PORT N_INSTANCES
# Print the --worker-map value for joined_analysis.py: comma-separated
# "http://127.0.0.1:<BASE_PORT+i>=engine_<i>" pairs for i = 0..N_INSTANCES-1.
# Both arguments must be decimal integers (leading zeros are read as base 10,
# not octal) and N_INSTANCES must be >= 1; otherwise an error goes to stderr
# and the function returns 1, which aborts the WORKER_MAP assignment below
# under `set -e`.
_build_worker_map() {
  local base_port=$1
  local n_instances=$2
  local -a parts=()
  local i

  if [[ ! "$base_port" =~ ^[0-9]+$ ]]; then
    printf 'error: BASE_PORT must be a non-negative integer, got %q\n' \
      "$base_port" >&2
    return 1
  fi
  # Also guarantees a non-empty array, which avoids the "unbound variable"
  # error older bash versions raise for empty arrays under `set -u`.
  if [[ ! "$n_instances" =~ ^[0-9]+$ ]] || (( 10#$n_instances < 1 )); then
    printf 'error: N_INSTANCES must be a positive integer, got %q\n' \
      "$n_instances" >&2
    return 1
  fi

  for ((i = 0; i < 10#$n_instances; i++)); do
    parts+=("http://127.0.0.1:$((10#$base_port + i))=engine_$i")
  done
  # "${parts[*]}" joins the elements with the first character of IFS.
  local IFS=,
  printf '%s\n' "${parts[*]}"
}

WORKER_MAP=$(_build_worker_map "$BASE_PORT" "$N_INSTANCES")
# _has_engine_data DIR
# Succeed (status 0) when DIR is an existing directory containing at least
# one non-empty file matching engine_*.jsonl. Fail (status 1) otherwise:
# when DIR is missing, when the glob matches nothing, or when every match
# is empty. Other *.jsonl files in DIR are ignored.
_has_engine_data() {
  local target_dir="$1"
  local candidate

  if [[ ! -d "$target_dir" ]]; then
    return 1
  fi

  for candidate in "$target_dir"/engine_*.jsonl; do
    if [[ -s "$candidate" ]]; then
      return 0
    fi
  done

  return 1
}
# Walk every sub-directory of the sweep. The shared top-level "engine_state"
# and "logs" directories, and any directory without run_window.json, are
# skipped.
for policy_dir in "$SWEEP_DIR"/*/; do
  policy=$(basename "$policy_dir")
  if [[ "$policy" == "engine_state" || "$policy" == "logs" ]]; then
    continue
  fi
  [[ -f "$policy_dir/run_window.json" ]] || continue

  echo "=== $policy ==="

  # Hot-sweep policies share $SWEEP_DIR/engine_state and need it cut down to
  # their own run window; isolated policies already wrote engine_state into
  # their own directory and are used as-is.
  if ! _has_engine_data "$policy_dir/engine_state"; then
    PYTHONPATH="$ROOT" "$VENV/python" "$ROOT/scripts/slice_engine_state.py" \
      --input-dir "$SWEEP_DIR/engine_state" \
      --output-dir "$policy_dir/engine_state" \
      --window "$policy_dir/run_window.json"
  else
    echo "  using policy-local engine_state ($(du -sh "$policy_dir/engine_state" | cut -f1))"
  fi

  # Join request metrics with engine state; results land in <policy>/joined.
  PYTHONPATH="$ROOT" "$VENV/python" "$ROOT/analysis/characterization/joined_analysis.py" \
    --metrics "$policy_dir/metrics.jsonl" \
    --breakdown "$policy_dir/breakdown.json" \
    --worker-state "$policy_dir/worker_state.json" \
    --engine-state-dir "$policy_dir/engine_state" \
    --worker-map "$WORKER_MAP" \
    --out-dir "$policy_dir/joined"
done
# Aggregate per-policy summary into <sweep_dir>/b3_policy_comparison.json.
# The sweep directory is passed to Python as argv[1], and the heredoc
# delimiter is quoted so the shell performs no expansion inside it. This
# keeps the path out of the Python source text, so a path containing `"`,
# `\` or `$` can no longer break or alter the script.
"$VENV/python" - "$SWEEP_DIR" <<'PY'
import json
import sys
from pathlib import Path

sweep = Path(sys.argv[1])


def pct(vals, p):
    """Linear-interpolated percentile of vals at fraction p; None if empty.

    Matches metrics._percentile. A floor-indexed sorted[int(p*(n-1))] would
    be inconsistent with how the same percentile is computed elsewhere.
    """
    if not vals:
        return None
    vs = sorted(vals)
    if len(vs) == 1:
        return vs[0]
    rank = p * (len(vs) - 1)
    lo = int(rank)
    hi = min(lo + 1, len(vs) - 1)
    frac = rank - lo
    return vs[lo] * (1 - frac) + vs[hi] * frac


def load_json(path):
    return json.loads(path.read_text())


def load_jsonl(path):
    """Parse a JSON-lines file, skipping blank lines (e.g. a trailing one)."""
    with path.open() as fh:
        return [json.loads(line) for line in fh if line.strip()]


rows = []
for sub in sorted(sweep.iterdir()):
    rw = sub / "run_window.json"
    jd = sub / "joined"
    # Skip entries without a run window or without joined_analysis output.
    if not rw.exists() or not jd.exists():
        continue
    policy = sub.name
    metrics = load_jsonl(sub / "metrics.jsonl")
    ok = [r for r in metrics if r.get("error") is None]
    ttfts = [r["ttft_s"] for r in ok if r.get("ttft_s") is not None]
    tpots = [r["tpot_s"] for r in ok if r.get("tpot_s") is not None]
    e2es = [r["latency_s"] for r in ok if r.get("latency_s") is not None]
    # `or 0` also covers records where the field is present but null, which
    # previously raised TypeError inside sum().
    total_input = sum(r.get("input_length") or 0 for r in ok)
    total_cached = sum(r.get("cached_tokens") or 0 for r in ok)
    interf = load_json(jd / "interference_index.json")
    hot = load_json(jd / "hotspot_index.json")
    reuse = load_json(jd / "reuse_decomposition.json")
    fail = load_json(jd / "failure_breakdown.json")
    rows.append({
        "policy": policy,
        "n_ok": len(ok), "n_total": len(metrics),
        "ttft_p50_s": pct(ttfts, 0.5), "ttft_p90_s": pct(ttfts, 0.9),
        "ttft_p99_s": pct(ttfts, 0.99),
        "tpot_p50_s": pct(tpots, 0.5), "tpot_p90_s": pct(tpots, 0.9),
        "tpot_p99_s": pct(tpots, 0.99),
        "e2e_p50_s": pct(e2es, 0.5), "e2e_p90_s": pct(e2es, 0.9),
        "e2e_p99_s": pct(e2es, 0.99),
        "apc_ratio": total_cached / max(total_input, 1),
        "interference_index": interf.get("interference_index"),
        "hotspot_index_ttft_p90": hot.get("hotspot_index_ttft_p90"),
        "reuse_intra_frac": reuse.get("fractions", {}).get("intra"),
        "reuse_cross_frac": reuse.get("fractions", {}).get("cross"),
        "n_slow": fail.get("n_slow"),
        "failure_counts": fail.get("counts"),
    })

out = sweep / "b3_policy_comparison.json"
out.write_text(json.dumps({"rows": rows}, indent=2))
print(json.dumps(rows, indent=2))
PY