B3 post-run helpers: engine_state slicer + per-policy aggregator
scripts/slice_engine_state.py filters a shared engine_*.jsonl by a [t_start_unix, t_end_unix] window. Needed because the patched scheduler appends to one file per engine across the whole sweep; per-policy analysis requires the per-policy slice. scripts/b3_analyze.sh drives the slice + joined_analysis loop for every policy directory in a completed sweep, then aggregates one row per policy (latency percentiles, APC, interference_index, hotspot_index, reuse fractions, failure-cause counts) into b3_policy_comparison.json. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
111
scripts/b3_analyze.sh
Executable file
111
scripts/b3_analyze.sh
Executable file
@@ -0,0 +1,111 @@
|
||||
#!/usr/bin/env bash
|
||||
# Per-policy joined_analysis driver for a completed B3 sweep.
|
||||
#
|
||||
# For each policy directory under <SWEEP_DIR>:
|
||||
# - slice engine_state by run_window.json
|
||||
# - run joined_analysis.py to emit interference / hotspot / reuse
|
||||
# / failure breakdown
|
||||
# Then emit b3_policy_comparison.json aggregating one row per policy.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
ROOT="${ROOT:-/home/admin/cpfs/wjh/agentic-kv}"
|
||||
VENV="$ROOT/.venv/bin"
|
||||
SWEEP_DIR="${1:?usage: $0 <sweep_dir>}"
|
||||
|
||||
WORKER_MAP="http://127.0.0.1:8000=engine_0,http://127.0.0.1:8001=engine_1,http://127.0.0.1:8002=engine_2,http://127.0.0.1:8003=engine_3,http://127.0.0.1:8004=engine_4,http://127.0.0.1:8005=engine_5,http://127.0.0.1:8006=engine_6,http://127.0.0.1:8007=engine_7"
|
||||
|
||||
for policy_dir in "$SWEEP_DIR"/*/; do
|
||||
policy=$(basename "$policy_dir")
|
||||
case "$policy" in
|
||||
engine_state|logs|capped) ;;
|
||||
esac
|
||||
if [ ! -f "$policy_dir/run_window.json" ]; then
|
||||
continue
|
||||
fi
|
||||
echo "=== $policy ==="
|
||||
|
||||
PYTHONPATH="$ROOT" "$VENV/python" \
|
||||
"$ROOT/scripts/slice_engine_state.py" \
|
||||
--input-dir "$SWEEP_DIR/engine_state" \
|
||||
--output-dir "$policy_dir/engine_state" \
|
||||
--window "$policy_dir/run_window.json"
|
||||
|
||||
PYTHONPATH="$ROOT" "$VENV/python" \
|
||||
"$ROOT/analysis/characterization/joined_analysis.py" \
|
||||
--metrics "$policy_dir/metrics.jsonl" \
|
||||
--breakdown "$policy_dir/breakdown.json" \
|
||||
--worker-state "$policy_dir/worker_state.json" \
|
||||
--engine-state-dir "$policy_dir/engine_state" \
|
||||
--worker-map "$WORKER_MAP" \
|
||||
--out-dir "$policy_dir/joined"
|
||||
done
|
||||
|
||||
# Also handle capped/ which is nested
|
||||
if [ -f "$SWEEP_DIR/capped/run_window.json" ]; then
|
||||
echo "=== capped ==="
|
||||
PYTHONPATH="$ROOT" "$VENV/python" \
|
||||
"$ROOT/scripts/slice_engine_state.py" \
|
||||
--input-dir "$SWEEP_DIR/engine_state" \
|
||||
--output-dir "$SWEEP_DIR/capped/engine_state" \
|
||||
--window "$SWEEP_DIR/capped/run_window.json"
|
||||
PYTHONPATH="$ROOT" "$VENV/python" \
|
||||
"$ROOT/analysis/characterization/joined_analysis.py" \
|
||||
--metrics "$SWEEP_DIR/capped/metrics.jsonl" \
|
||||
--breakdown "$SWEEP_DIR/capped/breakdown.json" \
|
||||
--worker-state "$SWEEP_DIR/capped/worker_state.json" \
|
||||
--engine-state-dir "$SWEEP_DIR/capped/engine_state" \
|
||||
--worker-map "$WORKER_MAP" \
|
||||
--out-dir "$SWEEP_DIR/capped/joined"
|
||||
fi
|
||||
|
||||
# Aggregate per-policy summary
|
||||
"$VENV/python" - <<PY
|
||||
import json, os, statistics
|
||||
from pathlib import Path
|
||||
|
||||
sweep = Path("$SWEEP_DIR")
|
||||
def pct(vals, p):
|
||||
if not vals: return None
|
||||
vs = sorted(vals)
|
||||
return vs[int(p * (len(vs)-1))]
|
||||
|
||||
rows = []
|
||||
for sub in sorted(sweep.iterdir()):
|
||||
rw = sub / "run_window.json"
|
||||
jd = sub / "joined"
|
||||
if not rw.exists() or not jd.exists():
|
||||
continue
|
||||
policy = sub.name
|
||||
metrics = [json.loads(l) for l in (sub / "metrics.jsonl").open()]
|
||||
ok = [r for r in metrics if r.get("error") is None]
|
||||
ttfts = [r["ttft_s"] for r in ok if r.get("ttft_s") is not None]
|
||||
tpots = [r["tpot_s"] for r in ok if r.get("tpot_s") is not None]
|
||||
e2es = [r["latency_s"] for r in ok if r.get("latency_s") is not None]
|
||||
total_input = sum(r.get("input_length", 0) for r in ok)
|
||||
total_cached = sum(r.get("cached_tokens", 0) for r in ok)
|
||||
interf = json.loads((jd / "interference_index.json").read_text())
|
||||
hot = json.loads((jd / "hotspot_index.json").read_text())
|
||||
reuse = json.loads((jd / "reuse_decomposition.json").read_text())
|
||||
fail = json.loads((jd / "failure_breakdown.json").read_text())
|
||||
rows.append({
|
||||
"policy": policy,
|
||||
"n_ok": len(ok), "n_total": len(metrics),
|
||||
"ttft_p50_s": pct(ttfts, 0.5), "ttft_p90_s": pct(ttfts, 0.9),
|
||||
"ttft_p99_s": pct(ttfts, 0.99),
|
||||
"tpot_p50_s": pct(tpots, 0.5), "tpot_p90_s": pct(tpots, 0.9),
|
||||
"tpot_p99_s": pct(tpots, 0.99),
|
||||
"e2e_p50_s": pct(e2es, 0.5), "e2e_p90_s": pct(e2es, 0.9),
|
||||
"e2e_p99_s": pct(e2es, 0.99),
|
||||
"apc_ratio": total_cached / max(total_input, 1),
|
||||
"interference_index": interf.get("interference_index"),
|
||||
"hotspot_index_ttft_p90": hot.get("hotspot_index_ttft_p90"),
|
||||
"reuse_intra_frac": reuse.get("fractions", {}).get("intra"),
|
||||
"reuse_cross_frac": reuse.get("fractions", {}).get("cross"),
|
||||
"n_slow": fail.get("n_slow"),
|
||||
"failure_counts": fail.get("counts"),
|
||||
})
|
||||
out = sweep / "b3_policy_comparison.json"
|
||||
out.write_text(json.dumps({"rows": rows}, indent=2))
|
||||
print(json.dumps(rows, indent=2))
|
||||
PY
|
||||
46
scripts/slice_engine_state.py
Normal file
46
scripts/slice_engine_state.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""Slice shared engine_*.jsonl by a [t_start_unix, t_end_unix] window.
|
||||
|
||||
Used between B3 policy runs and the analyzer so each policy gets
|
||||
its own per-window engine_state directory.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def main() -> None:
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument("--input-dir", type=Path, required=True)
|
||||
p.add_argument("--output-dir", type=Path, required=True)
|
||||
p.add_argument("--window", type=Path, required=True,
|
||||
help="run_window.json with t_start_unix / t_end_unix")
|
||||
args = p.parse_args()
|
||||
|
||||
window = json.loads(args.window.read_text())
|
||||
ts = float(window["t_start_unix"])
|
||||
te = float(window["t_end_unix"])
|
||||
|
||||
args.output_dir.mkdir(parents=True, exist_ok=True)
|
||||
print(f"window {ts:.3f} .. {te:.3f}")
|
||||
for src in sorted(args.input_dir.glob("engine_*.jsonl")):
|
||||
n_in = n_out = 0
|
||||
dst = args.output_dir / src.name
|
||||
with src.open() as fi, dst.open("w") as fo:
|
||||
for line in fi:
|
||||
n_in += 1
|
||||
try:
|
||||
r = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
t = r.get("t_unix", 0)
|
||||
if ts <= t <= te:
|
||||
fo.write(line)
|
||||
n_out += 1
|
||||
print(f" {src.name}: {n_out}/{n_in}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user