Files
agentic-kvc/analysis/characterization/joined_analysis.py
Gahow Wang 0e82612100 Fix B3 analysis bugs from subagent audit (median + percentile + sweep)
Three fixes from the B3 audit:

1) joined_analysis.hotspot_index used sorted[n//2] as median, which
   returns the ~60th percentile for n=8 (even-length). Systematically
   under-states the hotspot index. Recomputed values:
       lmetric   2.238 -> 2.253  (+0.7%)
       load_only 1.140 -> 1.294  (+13.5%)
       sticky    2.349 -> 2.728  (+16.1%)
       unified   3.350 -> 3.667  (+9.5%)
       capped    1.937 -> 2.020  (+4.3%)
   Qualitative ranking preserved; "capped only modestly reduces hotspot"
   story holds with ~10% drop instead of the previously reported 13%.
   Added test_hotspot_index_uses_true_median_for_even_n to lock in the
   fix.

2) b3_analyze.sh's pct() helper used floor-indexed percentile
   sorted[int(p*(n-1))], inconsistent with metrics._percentile and
   joined_analysis._percentile which both use linear interpolation.
   Now matches.

3) b3_sweep.sh's capped step called run_policy "capped", but the
   proxy's argparse has no "capped" choice, so the hot-sweep variant
   would have crashed on this step. The actual capped data was
   produced via b3_isolated_policy.sh with --policy lmetric. Replace
   the broken inline call with an explicit launch_proxy lmetric +
   inline replayer block so the sweep script matches the data path
   it documents.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-26 01:08:37 +08:00

597 lines
22 KiB
Python

"""A5: joined-record analysis from instrumented runs.
Inputs (all optional; functions degrade gracefully when missing):
- replayer metrics.jsonl with A1 fields
(t_dispatch_unix, t_first_token_unix, t_finish_unix, proxy_request_id,
endpoint_url, trace_hash_ids)
- proxy breakdown.json with A2 fields
(session_id, candidate_scores, chosen_score_*, t_first_token_unix,
t_done_unix, t_decision_unix)
- proxy worker_state.jsonl with A2 schema (one row per route decision)
- vLLM scheduler engine_state JSONLs from A3
(one per engine, env AGENTIC_STEP_LOG_PATH)
Outputs under <out_dir>/:
- joined.jsonl — per-request join across all sources
- reuse_decomposition.json
- interference_index.json
- hotspot_index.json
- failure_label.jsonl
- failure_breakdown.json — per-label counts of the slow requests
- window_summary.json — when run_meta.json (from SRR loadgen) is present
"""
from __future__ import annotations
import argparse
import json
import math
import statistics
from collections import defaultdict
from pathlib import Path
from typing import Any, Iterable
JsonDict = dict[str, Any]
# ---------- I/O ---------------------------------------------------------
def load_jsonl(path: Path) -> list[JsonDict]:
    """Read a JSON-Lines file into a list of JSON objects.

    Loading is best-effort: a missing file yields ``[]``, and blank lines,
    lines that are not valid JSON, and lines whose JSON value is not an
    object are skipped rather than raising.

    Args:
        path: Location of the ``.jsonl`` file.

    Returns:
        The decoded objects, in file order.
    """
    if not path.exists():
        return []
    rows: list[JsonDict] = []
    # Iterate the file handle instead of ``read_text().splitlines()``:
    # ``str.splitlines`` also breaks on U+2028/U+2029/VT/FF, which may occur
    # unescaped inside JSON strings and would cut a valid row in half.
    with path.open(encoding="utf-8") as fh:
        for raw_line in fh:
            text = raw_line.strip()
            if not text:
                continue
            try:
                obj = json.loads(text)
            except json.JSONDecodeError:
                continue
            # Callers use ``.get`` on every row, so only objects are usable.
            if isinstance(obj, dict):
                rows.append(obj)
    return rows
def load_json(path: Path) -> Any:
    """Parse a whole file as one JSON document.

    Returns ``None`` when the file does not exist or contains only
    whitespace; malformed JSON propagates ``json.JSONDecodeError``.
    """
    if path.exists():
        payload = path.read_text(encoding="utf-8").strip()
        if payload:
            return json.loads(payload)
    return None
def write_json(path: Path, data: Any) -> None:
    """Serialize *data* to *path* as indented, key-sorted JSON.

    Parent directories are created when missing. The file is written as
    UTF-8, matching the encoding the loaders and ``write_jsonl`` in this
    module use, rather than the platform's locale default.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data, indent=2, sort_keys=True), encoding="utf-8")
def write_jsonl(path: Path, rows: Iterable[JsonDict]) -> None:
    """Write *rows* to *path*, one key-sorted JSON object per line.

    Parent directories are created when missing; an existing file is
    overwritten.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as sink:
        sink.writelines(
            json.dumps(record, sort_keys=True) + "\n" for record in rows
        )
# ---------- Joining -----------------------------------------------------
def build_joined_records(
    metrics: list[JsonDict],
    breakdown: list[JsonDict],
    worker_state: list[JsonDict],
) -> list[JsonDict]:
    """Join metrics + breakdown + worker_state by request id.

    Returns one row per metrics record (the load-generator's view of truth).
    The proxy-side rows are looked up by the metrics record's ``request_id``
    and, when that finds nothing, by its ``proxy_request_id``.

    Missing sources leave the corresponding columns as None, so every row
    carries the same set of keys.

    Args:
        metrics: Replayer records; each yields exactly one output row.
        breakdown: Proxy breakdown records, indexed by ``request_id``.
        worker_state: Proxy worker-state records, indexed by ``request_id``.

    Returns:
        The joined rows, in the order of *metrics*.
    """
    bk_by_id = {str(r.get("request_id")): r for r in breakdown if r.get("request_id")}
    ws_by_id = {str(r.get("request_id")): r for r in worker_state if r.get("request_id")}

    # Columns copied verbatim from the metrics record.
    metric_columns = (
        "session_id", "turn_id", "trace_timestamp_s", "input_length",
        "output_length", "cached_tokens", "actual_output_tokens",
        "latency_s", "ttft_s", "tpot_s", "t_dispatch_unix",
        "t_first_token_unix", "t_finish_unix", "endpoint_url",
    )
    # (joined column, breakdown field) pairs taken from the proxy record.
    proxy_columns = (
        ("policy", "policy"),
        ("route_class", "route_class"),
        ("routed_to", "routed_to"),
        ("chosen_idx", "chosen_idx"),
        ("chosen_score_linear", "chosen_score_linear"),
        ("chosen_score_lmetric", "chosen_score_lmetric"),
        ("estimated_new_tokens", "estimated_new_tokens"),
        ("cache_hit_proxy", "cache_hit"),
        ("proxy_t_decision_unix", "t_decision_unix"),
        ("proxy_t_first_token_unix", "t_first_token_unix"),
        ("proxy_t_done_unix", "t_done_unix"),
    )

    def _lookup(index: dict[str, JsonDict], keys: list[str]) -> JsonDict | None:
        """Return the first indexed record matching any of *keys*."""
        for key in keys:
            hit = index.get(key)
            if hit is not None:
                return hit
        return None

    joined: list[JsonDict] = []
    for m in metrics:
        rid = str(m.get("request_id") or m.get("proxy_request_id") or "")
        # The replayer's own request_id may differ from the id the proxy
        # logged; retry with proxy_request_id before giving up on the join.
        keys = [rid]
        proxy_rid = m.get("proxy_request_id")
        if proxy_rid and str(proxy_rid) != rid:
            keys.append(str(proxy_rid))
        bk = _lookup(bk_by_id, keys) or {}
        ws = _lookup(ws_by_id, keys)

        row: JsonDict = {"request_id": rid}
        row.update((col, m.get(col)) for col in metric_columns)
        row["trace_hash_ids"] = m.get("trace_hash_ids") or []
        row["error"] = m.get("error")
        # An empty ``bk`` makes every proxy column None instead of absent.
        row.update((col, bk.get(field)) for col, field in proxy_columns)
        row["worker_state_at_decision"] = ws.get("workers") if ws else None
        joined.append(row)
    return joined
# ---------- Reuse decomposition (real) ----------------------------------
def reuse_decomposition(records: list[JsonDict], block_size: int = 16) -> JsonDict:
"""Real intra/cross/shared decomposition keyed on (session, hash blocks)."""
if not records:
return {"status": "unavailable", "reason": "no joined records"}
# block first-seen index: hash_id -> (session_id, first_seen_seq)
first_seen: dict[int, tuple[str, int]] = {}
block_sessions: dict[int, set[str]] = defaultdict(set)
seq = 0
for r in sorted(records, key=lambda x: x.get("t_dispatch_unix") or 0.0):
sid = str(r.get("session_id"))
for h in r.get("trace_hash_ids") or []:
h_int = int(h)
if h_int not in first_seen:
first_seen[h_int] = (sid, seq)
block_sessions[h_int].add(sid)
seq += 1
total_cached = 0
intra = cross = shared = unclassified = 0
for r in records:
cached = r.get("cached_tokens") or 0
if not cached:
continue
total_cached += cached
sid = str(r.get("session_id"))
hashes = [int(h) for h in (r.get("trace_hash_ids") or [])]
if not hashes:
unclassified += cached
continue
# Approximate: classify the cached tokens by the first non-current
# owner of any hash block we've seen before.
block_tokens = max(cached // max(len(hashes), 1), 1)
for h in hashes:
first_sid, _ = first_seen.get(h, (None, None))
if first_sid is None:
unclassified += min(block_tokens, cached)
elif first_sid == sid:
intra += min(block_tokens, cached)
elif len(block_sessions[h]) >= 8:
shared += min(block_tokens, cached)
else:
cross += min(block_tokens, cached)
cached -= block_tokens
if cached <= 0:
break
return {
"status": "supported",
"total_cached_tokens": total_cached,
"intra_session_tokens": intra,
"cross_session_tokens": cross,
"shared_prefix_tokens": shared,
"unclassified_tokens": unclassified,
"fractions": _fractions(intra, cross, shared, unclassified),
"shared_prefix_min_sessions": 8,
}
def _fractions(*parts: int) -> JsonDict:
total = sum(parts)
if total == 0:
return {"intra": 0.0, "cross": 0.0, "shared": 0.0, "unclassified": 0.0}
labels = ["intra", "cross", "shared", "unclassified"]
return {label: parts[i] / total for i, label in enumerate(labels)}
# ---------- Interference index (B2) -------------------------------------
def interference_index(
    joined: list[JsonDict],
    engine_state_by_worker: dict[str, list[JsonDict]],
    worker_map: dict[str, str] | None = None,
) -> JsonDict:
    """Compare TPOT p90 of requests with and without same-worker prefill.

    A completed request counts as "overlap" when some scheduler step of its
    worker, timestamped within [t_first_token_unix, t_finish_unix], reports
    prefill_tokens and lists a prefill-phase entry belonging to a different
    request. The index is p90(TPOT | overlap) / p90(TPOT | clean).

    Requests are skipped when their worker has no engine steps, when any of
    the two timestamps or tpot_s is missing, or when they errored.

    Returns:
        ``status: unavailable`` when either input is empty; otherwise the
        two sample counts, both p90 values and the index, with ``status``
        ``supported`` if the index could be computed and ``partial`` if not.
    """
    if not (joined and engine_state_by_worker):
        return {"status": "unavailable",
                "reason": "missing joined records or engine state"}

    def _has_foreign_prefill(step: JsonDict, request_id: Any) -> bool:
        """True if *step* prefills for a request other than *request_id*."""
        if not step.get("prefill_tokens"):
            return False
        # The request's own prefill entries are not interference; vLLM
        # decorates the rid, so equality goes through _vllm_rid_matches.
        return any(
            entry.get("phase") == "prefill"
            and not _vllm_rid_matches(entry.get("rid"), request_id)
            for entry in step.get("per_req", []) or []
        )

    overlapped: list[float] = []
    undisturbed: list[float] = []
    for rec in joined:
        request_id = rec.get("request_id")
        # routed_to is the vLLM worker URL; endpoint_url (the proxy URL) is
        # only the fallback.
        worker_id = _resolve_worker(
            rec.get("routed_to") or rec.get("endpoint_url"),
            worker_map,
        )
        worker_steps = engine_state_by_worker.get(worker_id)
        if not worker_steps:
            continue
        decode_start = rec.get("t_first_token_unix")
        decode_end = rec.get("t_finish_unix")
        tpot = rec.get("tpot_s")
        if (decode_start is None or decode_end is None or tpot is None
                or rec.get("error")):
            continue

        disturbed = False
        for step in worker_steps:
            stamp = step.get("t_unix")
            if stamp is None or stamp < decode_start or stamp > decode_end:
                continue
            if _has_foreign_prefill(step, request_id):
                disturbed = True
                break

        if disturbed:
            overlapped.append(float(tpot))
        else:
            undisturbed.append(float(tpot))

    p90_overlap = _percentile(overlapped, 0.90) if overlapped else None
    p90_clean = _percentile(undisturbed, 0.90) if undisturbed else None
    ratio = None
    if p90_overlap is not None and p90_clean and p90_clean > 0:
        ratio = p90_overlap / p90_clean
    return {
        "status": "partial" if ratio is None else "supported",
        "n_overlap_requests": len(overlapped),
        "n_clean_requests": len(undisturbed),
        "tpot_p90_overlap_s": p90_overlap,
        "tpot_p90_clean_s": p90_clean,
        "interference_index": ratio,
    }
def _normalize_worker(url_or_id: str | None) -> str | None:
"""Best-effort: map URLs like http://h:8000 to engine_0 by base-port 8000.
Use `_resolve_worker(url, worker_map)` instead when you have an
explicit URL→worker_id map (e.g. from bench.sh).
"""
if not url_or_id:
return None
if url_or_id.startswith("engine_"):
return url_or_id
try:
port = int(url_or_id.rsplit(":", 1)[1].split("/")[0])
return f"engine_{port - 8000}"
except (ValueError, IndexError):
return url_or_id
def _resolve_worker(
url_or_id: str | None,
worker_map: dict[str, str] | None,
) -> str | None:
if not url_or_id:
return None
if worker_map and url_or_id in worker_map:
return worker_map[url_or_id]
return _normalize_worker(url_or_id)
def _vllm_rid_matches(vllm_rid: Any, proxy_rid: Any) -> bool:
"""vLLM internally rewrites rid as `cmpl-<proxy_id>-<i>-<hash>`."""
if vllm_rid is None or proxy_rid is None:
return False
if vllm_rid == proxy_rid:
return True
s = str(vllm_rid)
p = str(proxy_rid)
return s.startswith(f"cmpl-{p}-") or s.startswith(f"chatcmpl-{p}-")
# ---------- Hotspot index (B3) ------------------------------------------
def hotspot_index(joined: list[JsonDict]) -> JsonDict:
    """Ratio of the worst worker's TTFT p90 to the median worker's TTFT p90.

    Requests are grouped by the raw ``routed_to`` URL (falling back to the
    proxy ``endpoint_url``); errored requests and requests with no worker
    are ignored. Raw TTFT stands in for queue delay. The index needs at
    least two workers and a positive median, otherwise it is None and the
    status is ``partial``.

    Returns:
        ``{"status": "unavailable"}`` for empty input; otherwise the
        per-worker TTFT and latency p90 maps plus the index.
    """
    if not joined:
        return {"status": "unavailable"}

    ttft_samples: dict[str, list[float]] = defaultdict(list)
    latency_samples: dict[str, list[float]] = defaultdict(list)
    for rec in joined:
        if rec.get("error"):
            continue
        # routed_to is the vLLM worker URL; endpoint_url is the proxy URL.
        worker_url = rec.get("routed_to") or rec.get("endpoint_url")
        if not worker_url:
            continue
        for field, sink in (("ttft_s", ttft_samples),
                            ("latency_s", latency_samples)):
            value = rec.get(field)
            if value is not None:
                sink[worker_url].append(float(value))

    ttft_p90: dict[str, float] = {
        url: _percentile(samples, 0.90)
        for url, samples in ttft_samples.items() if samples
    }
    latency_p90: dict[str, float] = {
        url: _percentile(samples, 0.90)
        for url, samples in latency_samples.items() if samples
    }

    ranked = sorted(ttft_p90.values())
    ratio = None
    if len(ranked) >= 2:
        # statistics.median averages the two middle values for even counts,
        # which a bare sorted[n // 2] lookup would not.
        middle = statistics.median(ranked)
        if middle > 0:
            ratio = max(ranked) / middle
    return {
        "status": "partial" if ratio is None else "supported",
        "per_worker_ttft_p90_s": ttft_p90,
        "per_worker_latency_p90_s": latency_p90,
        "hotspot_index_ttft_p90": ratio,
    }
# ---------- Failure label (B5) ------------------------------------------
# Fallback service-level objectives: label_slow_requests uses this dict when
# the caller passes no `slo`.
# NOTE(review): the `_s` suffix suggests the values are seconds — confirm.
DEFAULT_SLO = {
    # Multiplied by slow_ttft_factor to obtain the slow-request TTFT cutoff.
    "ttft_p90_s": 2.0,
    # Not read by any code visible in this file.
    "tpot_p90_s": 0.15,
}
def label_slow_requests(
    joined: list[JsonDict],
    engine_state_by_worker: dict[str, list[JsonDict]],
    slo: JsonDict | None = None,
    slow_ttft_factor: float = 2.0,
    worker_map: dict[str, str] | None = None,
    hot_worker_factor: float = 1.2,
) -> list[JsonDict]:
    """Attach a failure label to every slow, non-errored request.

    A request is slow when its TTFT exceeds ``slo["ttft_p90_s"] *
    slow_ttft_factor``. A worker is "hot" when its TTFT p90 exceeds the
    global TTFT p90 times *hot_worker_factor*. Both percentiles are computed
    over non-errored requests only, the same population that is labelled
    here and that ``hotspot_index`` uses.

    Args:
        joined: Joined request rows.
        engine_state_by_worker: Scheduler steps per worker id, forwarded to
            ``_assign_label``.
        slo: SLO overrides; a missing dict or a missing ``ttft_p90_s`` key
            falls back to ``DEFAULT_SLO``.
        slow_ttft_factor: Multiplier on the TTFT SLO defining "slow".
        worker_map: Optional URL -> worker id map, forwarded to
            ``_assign_label``.
        hot_worker_factor: Multiplier on the global p90 defining "hot".

    Returns:
        One dict per slow request with identifying fields and its ``label``.
    """
    slo = slo or DEFAULT_SLO
    # Tolerate partial overrides instead of raising KeyError.
    ttft_slo = slo["ttft_p90_s"] if "ttft_p90_s" in slo else DEFAULT_SLO["ttft_p90_s"]
    ttft_threshold = float(ttft_slo) * slow_ttft_factor

    # Per-worker TTFT samples used to flag hot workers. The key is the raw
    # routed_to value because _assign_label looks workers up the same way.
    by_worker_ttft: dict[str, list[float]] = defaultdict(list)
    for r in joined:
        if r.get("error") or r.get("ttft_s") is None:
            continue
        by_worker_ttft[r.get("routed_to") or ""].append(float(r["ttft_s"]))
    worker_p90 = {w: _percentile(v, 0.90) for w, v in by_worker_ttft.items() if v}
    all_ttft = [v for vs in by_worker_ttft.values() for v in vs]
    global_p90 = _percentile(all_ttft, 0.90) if all_ttft else None
    hot_workers = {
        w for w, p in worker_p90.items()
        if global_p90 and p > global_p90 * hot_worker_factor
    }

    labels: list[JsonDict] = []
    for r in joined:
        ttft = r.get("ttft_s")
        if ttft is None or r.get("error"):
            continue
        if ttft <= ttft_threshold:
            continue
        label = _assign_label(r, hot_workers, engine_state_by_worker,
                              worker_map)
        labels.append({
            "request_id": r.get("request_id"),
            "session_id": r.get("session_id"),
            "routed_to": r.get("routed_to"),
            "ttft_s": ttft,
            "latency_s": r.get("latency_s"),
            "input_length": r.get("input_length"),
            "cached_tokens": r.get("cached_tokens"),
            "estimated_new_tokens": r.get("estimated_new_tokens"),
            "label": label,
        })
    return labels
def _assign_label(
    r: JsonDict,
    hot_workers: set[str],
    engine_state_by_worker: dict[str, list[JsonDict]],
    worker_map: dict[str, str] | None = None,
) -> str:
    """Pick the first matching failure label for one slow request.

    Checks run in this order:

    1. ``same_worker_prefill_overlap`` — a scheduler step of the request's
       worker, within [t_first_token_unix, t_finish_unix], lists a
       prefill-phase entry of another request;
    2. ``hot_worker_queue`` — the raw ``routed_to`` is in *hot_workers*;
    3. ``cache_miss_large_append`` — estimated new tokens are at least half
       of the input length;
    4. ``high_kv_occupancy`` — the chosen worker's decision-time snapshot
       reports more than 50,000 ``cached_blocks``;
    5. ``unknown`` otherwise.
    """
    worker_id = _resolve_worker(
        r.get("routed_to") or r.get("endpoint_url"),
        worker_map,
    )
    request_id = r.get("request_id")
    worker_steps = engine_state_by_worker.get(worker_id, [])
    window_start = r.get("t_first_token_unix")
    window_end = r.get("t_finish_unix")

    if worker_steps and window_start and window_end:
        for step in worker_steps:
            stamp = step.get("t_unix")
            if stamp is None or stamp < window_start or stamp > window_end:
                continue
            foreign_prefill = any(
                entry.get("phase") == "prefill"
                and not _vllm_rid_matches(entry.get("rid"), request_id)
                for entry in step.get("per_req", []) or []
            )
            if foreign_prefill:
                return "same_worker_prefill_overlap"

    if (r.get("routed_to") or "") in hot_workers:
        return "hot_worker_queue"

    new_tokens = r.get("estimated_new_tokens") or 0
    prompt_tokens = r.get("input_length") or 0
    if new_tokens and prompt_tokens and new_tokens >= 0.5 * prompt_tokens:
        return "cache_miss_large_append"

    snapshot = r.get("worker_state_at_decision") or []
    chosen = r.get("chosen_idx")
    if snapshot and isinstance(chosen, int) and 0 <= chosen < len(snapshot):
        blocks = snapshot[chosen].get("cached_blocks", 0)
        if blocks and blocks > 50_000:
            return "high_kv_occupancy"
    return "unknown"
# ---------- Window summary (B4) -----------------------------------------
def window_summary(joined: list[JsonDict], run_meta: JsonDict | None) -> JsonDict:
    """Summarize request counts and latency percentiles per run window.

    Requests are bucketed by ``t_dispatch_unix``: ``warmup`` before
    ``warmup_end_unix``, ``steady`` before ``steady_end_unix``, ``drain``
    afterwards. Requests without a dispatch time are left out.

    Args:
        joined: Joined request rows.
        run_meta: Run metadata providing ``warmup_end_unix`` and
            ``steady_end_unix``.

    Returns:
        ``{"run_meta": ..., "windows": {...}}`` with per-window attempted /
        completed / errored counts and p50/p90 of TTFT, TPOT and end-to-end
        latency (None where a window has no samples). When *run_meta* is
        missing, or does not provide numeric window boundaries, a
        ``status: unavailable`` dict is returned instead of raising.
    """
    if not run_meta:
        return {"status": "unavailable", "reason": "no run_meta"}
    try:
        warmup_end = float(run_meta["warmup_end_unix"])
        steady_end = float(run_meta["steady_end_unix"])
    except (KeyError, TypeError, ValueError):
        # Degrade like the other analyses instead of aborting the run on a
        # metadata file of a different shape.
        return {
            "status": "unavailable",
            "reason": "run_meta lacks numeric warmup_end_unix/steady_end_unix",
        }

    buckets: dict[str, list[JsonDict]] = {"warmup": [], "steady": [], "drain": []}
    for r in joined:
        t = r.get("t_dispatch_unix")
        if t is None:
            continue
        if t < warmup_end:
            buckets["warmup"].append(r)
        elif t < steady_end:
            buckets["steady"].append(r)
        else:
            buckets["drain"].append(r)

    # (output key prefix, joined-row field) for each summarized metric.
    metric_fields = (("ttft", "ttft_s"), ("tpot", "tpot_s"), ("e2e", "latency_s"))
    out: JsonDict = {"run_meta": run_meta, "windows": {}}
    for name, rows in buckets.items():
        errs = sum(1 for r in rows if r.get("error"))
        summary: JsonDict = {
            "attempted": len(rows),
            "completed": len(rows) - errs,
            "errored": errs,
        }
        for prefix, field in metric_fields:
            samples = [r[field] for r in rows if r.get(field) is not None]
            summary[f"{prefix}_p50_s"] = _percentile(samples, 0.50) if samples else None
            summary[f"{prefix}_p90_s"] = _percentile(samples, 0.90) if samples else None
        out["windows"][name] = summary
    return out
# ---------- helpers -----------------------------------------------------
def _percentile(values: list[float], pct: float) -> float | None:
if not values:
return None
sorted_vals = sorted(values)
if len(sorted_vals) == 1:
return sorted_vals[0]
rank = pct * (len(sorted_vals) - 1)
lo = int(rank)
hi = min(lo + 1, len(sorted_vals) - 1)
frac = rank - lo
return sorted_vals[lo] * (1 - frac) + sorted_vals[hi] * frac
def load_engine_state(dir_path: Path) -> dict[str, list[JsonDict]]:
    """Collect the ``engine_*.jsonl`` step logs found in *dir_path*.

    Returns a dict keyed by file stem (``engine_0``, ...) whose values are
    that file's rows ordered by ``t_unix`` (rows without one sort as 0.0).
    A missing path or a non-directory yields an empty dict.
    """
    if not (dir_path.exists() and dir_path.is_dir()):
        return {}
    return {
        log_file.stem: sorted(
            load_jsonl(log_file),
            key=lambda step: step.get("t_unix") or 0.0,
        )
        for log_file in sorted(dir_path.glob("engine_*.jsonl"))
    }
# ---------- CLI ---------------------------------------------------------
def main(argv: list[str] | None = None) -> None:
    """CLI entry point: join the inputs and write every analysis output.

    Writes joined.jsonl, reuse_decomposition.json, interference_index.json,
    hotspot_index.json, failure_label.jsonl, failure_breakdown.json and
    window_summary.json under ``--out-dir``.

    Args:
        argv: Argument list; None lets argparse read ``sys.argv``.
    """
    p = argparse.ArgumentParser(description="A5 joined analysis")
    p.add_argument("--metrics", type=Path, required=True)
    p.add_argument("--breakdown", type=Path, default=None)
    p.add_argument("--worker-state", type=Path, default=None)
    p.add_argument("--engine-state-dir", type=Path, default=None,
                   help="Directory containing engine_*.jsonl from A3 patch")
    p.add_argument("--run-meta", type=Path, default=None,
                   help="run_meta or window_summary.json from SRR loadgen")
    p.add_argument("--out-dir", type=Path, required=True)
    p.add_argument("--slow-ttft-factor", type=float, default=2.0)
    p.add_argument("--worker-map", type=str, default=None,
                   help="Comma-separated URL=worker_id pairs, e.g. "
                        "http://h:9100=engine_0,http://h:9101=engine_1")
    args = p.parse_args(argv)

    worker_map: dict[str, str] | None = None
    if args.worker_map:
        worker_map = {}
        for entry in args.worker_map.split(","):
            url, _, wid = entry.strip().partition("=")
            if url and wid:
                worker_map[url] = wid

    def _load_records(path: Path | None) -> list[JsonDict]:
        """Load proxy records from a JSON document or a ``.jsonl`` file."""
        if path is None:
            return []
        try:
            raw = load_json(path)
        except json.JSONDecodeError:
            # A multi-row JSONL file (e.g. worker_state.jsonl) is not one
            # JSON document; read it line by line. Other files keep
            # failing loudly so real corruption is not hidden.
            if path.suffix != ".jsonl":
                raise
            return load_jsonl(path)
        if isinstance(raw, dict):
            # Either a {"records": [...]} wrapper or a single record.
            raw = raw.get("records", [raw])
        return list(raw or [])

    metrics = load_jsonl(args.metrics)
    breakdown = _load_records(args.breakdown)
    worker_state = _load_records(args.worker_state)
    engine_state = (
        load_engine_state(args.engine_state_dir) if args.engine_state_dir else {}
    )
    run_meta = load_json(args.run_meta) if args.run_meta else None

    joined = build_joined_records(metrics, breakdown, worker_state)
    args.out_dir.mkdir(parents=True, exist_ok=True)
    write_jsonl(args.out_dir / "joined.jsonl", joined)
    write_json(args.out_dir / "reuse_decomposition.json",
               reuse_decomposition(joined))
    write_json(args.out_dir / "interference_index.json",
               interference_index(joined, engine_state, worker_map))
    write_json(args.out_dir / "hotspot_index.json",
               hotspot_index(joined))
    labels = label_slow_requests(joined, engine_state,
                                 slow_ttft_factor=args.slow_ttft_factor,
                                 worker_map=worker_map)
    write_jsonl(args.out_dir / "failure_label.jsonl", labels)
    counts: dict[str, int] = defaultdict(int)
    for labelled in labels:
        counts[labelled["label"]] += 1
    write_json(args.out_dir / "failure_breakdown.json",
               {"counts": dict(counts), "n_slow": len(labels)})
    write_json(args.out_dir / "window_summary.json",
               window_summary(joined, run_meta))


if __name__ == "__main__":
    main()