From 92db1c43708cd99f6b994d0f4bb6e3e1ace66010 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Mon, 25 May 2026 18:51:33 +0800 Subject: [PATCH] B3 post-run helpers: engine_state slicer + per-policy aggregator scripts/slice_engine_state.py filters a shared engine_*.jsonl by a [t_start_unix, t_end_unix] window. Needed because the patched scheduler appends to one file per engine across the whole sweep; per-policy analysis requires the per-policy slice. scripts/b3_analyze.sh drives the slice + joined_analysis loop for every policy directory in a completed sweep, then aggregates one row per policy (latency percentiles, APC, interference_index, hotspot_index, reuse fractions, failure-cause counts) into b3_policy_comparison.json. Co-Authored-By: Claude Opus 4.7 --- scripts/b3_analyze.sh | 111 ++++++++++++++++++++++++++++++++++ scripts/slice_engine_state.py | 46 ++++++++++++++ 2 files changed, 157 insertions(+) create mode 100755 scripts/b3_analyze.sh create mode 100644 scripts/slice_engine_state.py diff --git a/scripts/b3_analyze.sh b/scripts/b3_analyze.sh new file mode 100755 index 0000000..b9a199f --- /dev/null +++ b/scripts/b3_analyze.sh @@ -0,0 +1,111 @@ +#!/usr/bin/env bash +# Per-policy joined_analysis driver for a completed B3 sweep. +# +# For each policy directory under <sweep_dir>: +# - slice engine_state by run_window.json +# - run joined_analysis.py to emit interference / hotspot / reuse +# / failure breakdown +# Then emit b3_policy_comparison.json aggregating one row per policy. 
+
+set -euo pipefail
+
+ROOT="${ROOT:-/home/admin/cpfs/wjh/agentic-kv}"
+VENV="$ROOT/.venv/bin"
+SWEEP_DIR="${1:?usage: $0 <sweep_dir>}"
+
+WORKER_MAP="http://127.0.0.1:8000=engine_0,http://127.0.0.1:8001=engine_1,http://127.0.0.1:8002=engine_2,http://127.0.0.1:8003=engine_3,http://127.0.0.1:8004=engine_4,http://127.0.0.1:8005=engine_5,http://127.0.0.1:8006=engine_6,http://127.0.0.1:8007=engine_7"
+
+for policy_dir in "$SWEEP_DIR"/*/; do
+  policy=$(basename "$policy_dir")
+  # Skip the shared directories. capped/ has its own pass after this loop,
+  # so without the `continue` it would be sliced and analyzed twice.
+  case "$policy" in
+    engine_state|logs|capped) continue ;;
+  esac
+  # A directory without a recorded run window cannot be sliced.
+  if [ ! -f "$policy_dir/run_window.json" ]; then
+    continue
+  fi
+  echo "=== $policy ==="
+
+  PYTHONPATH="$ROOT" "$VENV/python" \
+    "$ROOT/scripts/slice_engine_state.py" \
+    --input-dir "$SWEEP_DIR/engine_state" \
+    --output-dir "$policy_dir/engine_state" \
+    --window "$policy_dir/run_window.json"
+
+  PYTHONPATH="$ROOT" "$VENV/python" \
+    "$ROOT/analysis/characterization/joined_analysis.py" \
+    --metrics "$policy_dir/metrics.jsonl" \
+    --breakdown "$policy_dir/breakdown.json" \
+    --worker-state "$policy_dir/worker_state.json" \
+    --engine-state-dir "$policy_dir/engine_state" \
+    --worker-map "$WORKER_MAP" \
+    --out-dir "$policy_dir/joined"
+done
+
+# Also handle capped/ which is nested
+if [ -f "$SWEEP_DIR/capped/run_window.json" ]; then
+  echo "=== capped ==="
+  PYTHONPATH="$ROOT" "$VENV/python" \
+    "$ROOT/scripts/slice_engine_state.py" \
+    --input-dir "$SWEEP_DIR/engine_state" \
+    --output-dir "$SWEEP_DIR/capped/engine_state" \
+    --window "$SWEEP_DIR/capped/run_window.json"
+  PYTHONPATH="$ROOT" "$VENV/python" \
+    "$ROOT/analysis/characterization/joined_analysis.py" \
+    --metrics "$SWEEP_DIR/capped/metrics.jsonl" \
+    --breakdown "$SWEEP_DIR/capped/breakdown.json" \
+    --worker-state "$SWEEP_DIR/capped/worker_state.json" \
+    --engine-state-dir "$SWEEP_DIR/capped/engine_state" \
+    --worker-map "$WORKER_MAP" \
+    --out-dir "$SWEEP_DIR/capped/joined"
+fi
+
+# Aggregate per-policy summary
+# NOTE(review): this copy of the patch is truncated on the next line. The
+# aggregation heredoc, the diff header for scripts/slice_engine_state.py and
+# the start of its main() appear to be missing -- restore from the commit.
+"$VENV/python" - < None:
+    p = argparse.ArgumentParser()
+    p.add_argument("--input-dir", type=Path, required=True)
+    p.add_argument("--output-dir", type=Path, required=True)
+    p.add_argument("--window", type=Path, required=True,
+                   help="run_window.json with t_start_unix / t_end_unix")
+    args = p.parse_args()
+
+    # Slicing in place would open each source file for writing (truncating
+    # it) while it is still being read, destroying the sweep-wide log.
+    if args.output_dir.resolve() == args.input_dir.resolve():
+        p.error("--output-dir must differ from --input-dir")
+
+    window = json.loads(args.window.read_text(encoding="utf-8"))
+    ts = float(window["t_start_unix"])
+    te = float(window["t_end_unix"])
+
+    args.output_dir.mkdir(parents=True, exist_ok=True)
+    print(f"window {ts:.3f} .. {te:.3f}")
+    for src in sorted(args.input_dir.glob("engine_*.jsonl")):
+        # n_in: lines read, n_out: lines kept, n_bad: lines with no usable
+        # timestamp (invalid JSON, non-object JSON, non-numeric t_unix).
+        n_in = n_out = n_bad = 0
+        dst = args.output_dir / src.name
+        with src.open(encoding="utf-8") as fi, \
+                dst.open("w", encoding="utf-8") as fo:
+            for line in fi:
+                n_in += 1
+                try:
+                    r = json.loads(line)
+                except json.JSONDecodeError:
+                    n_bad += 1
+                    continue
+                # A line can be valid JSON without being an object; only
+                # objects can carry a t_unix field. A missing field still
+                # defaults to 0, as before.
+                t = r.get("t_unix", 0) if isinstance(r, dict) else None
+                if not isinstance(t, (int, float)):
+                    n_bad += 1
+                    continue
+                # Both window bounds are inclusive.
+                if ts <= t <= te:
+                    fo.write(line)
+                    n_out += 1
+        # Surface dropped lines instead of discarding them silently; the
+        # summary line is unchanged when nothing was dropped.
+        skipped = f" ({n_bad} unusable lines skipped)" if n_bad else ""
+        print(f" {src.name}: {n_out}/{n_in}{skipped}")
+
+
+if __name__ == "__main__":
+    main()