"""A5: joined-record analysis from instrumented runs. Inputs (all optional; functions degrade gracefully when missing): - replayer metrics.jsonl with A1 fields (t_dispatch_unix, t_first_token_unix, t_finish_unix, proxy_request_id, endpoint_url, trace_hash_ids) - proxy breakdown.json with A2 fields (session_id, candidate_scores, chosen_score_*, t_first_token_unix, t_done_unix, t_decision_unix) - proxy worker_state.jsonl with A2 schema (one row per route decision) - vLLM scheduler engine_state JSONLs from A3 (one per engine, env AGENTIC_STEP_LOG_PATH) Outputs under /: - joined.jsonl — per-request join across all sources - reuse_decomposition.json - interference_index.json - hotspot_index.json - failure_label.jsonl - window_summary.json — when run_meta.json (from SRR loadgen) is present """ from __future__ import annotations import argparse import json import math import statistics from collections import defaultdict from pathlib import Path from typing import Any, Iterable JsonDict = dict[str, Any] # ---------- I/O --------------------------------------------------------- def load_jsonl(path: Path) -> list[JsonDict]: if not path.exists(): return [] out: list[JsonDict] = [] for line in path.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line: continue try: out.append(json.loads(line)) except json.JSONDecodeError: continue return out def load_json(path: Path) -> Any: if not path.exists(): return None text = path.read_text(encoding="utf-8").strip() if not text: return None return json.loads(text) def write_json(path: Path, data: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(data, indent=2, sort_keys=True)) def write_jsonl(path: Path, rows: Iterable[JsonDict]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as fh: for row in rows: fh.write(json.dumps(row, sort_keys=True) + "\n") # ---------- Joining ----------------------------------------------------- def build_joined_records( metrics: list[JsonDict], breakdown: list[JsonDict], worker_state: list[JsonDict], ) -> list[JsonDict]: """Join metrics + breakdown + worker_state by request_id. Returns one row per metrics record (the load-generator's view of truth). Missing sources leave the corresponding columns as None. """ bk_by_id = {str(r.get("request_id")): r for r in breakdown if r.get("request_id")} ws_by_id = {str(r.get("request_id")): r for r in worker_state if r.get("request_id")} joined: list[JsonDict] = [] for m in metrics: rid = str(m.get("request_id") or m.get("proxy_request_id") or "") bk = bk_by_id.get(rid) ws = ws_by_id.get(rid) row: JsonDict = { "request_id": rid, "session_id": m.get("session_id"), "turn_id": m.get("turn_id"), "trace_timestamp_s": m.get("trace_timestamp_s"), "input_length": m.get("input_length"), "output_length": m.get("output_length"), "cached_tokens": m.get("cached_tokens"), "actual_output_tokens": m.get("actual_output_tokens"), "latency_s": m.get("latency_s"), "ttft_s": m.get("ttft_s"), "tpot_s": m.get("tpot_s"), "t_dispatch_unix": m.get("t_dispatch_unix"), "t_first_token_unix": m.get("t_first_token_unix"), "t_finish_unix": m.get("t_finish_unix"), "endpoint_url": m.get("endpoint_url"), "trace_hash_ids": m.get("trace_hash_ids") or [], "error": m.get("error"), } if bk: row["policy"] = bk.get("policy") row["route_class"] = bk.get("route_class") row["routed_to"] = bk.get("routed_to") row["chosen_idx"] = bk.get("chosen_idx") row["chosen_score_linear"] = bk.get("chosen_score_linear") row["chosen_score_lmetric"] = bk.get("chosen_score_lmetric") row["estimated_new_tokens"] = bk.get("estimated_new_tokens") row["cache_hit_proxy"] = bk.get("cache_hit") row["proxy_t_decision_unix"] = bk.get("t_decision_unix") row["proxy_t_first_token_unix"] = bk.get("t_first_token_unix") row["proxy_t_done_unix"] = bk.get("t_done_unix") if ws: row["worker_state_at_decision"] = ws.get("workers") joined.append(row) return joined # ---------- Reuse decomposition (real) ---------------------------------- def reuse_decomposition(records: list[JsonDict], block_size: int = 16) -> JsonDict: """Real intra/cross/shared decomposition keyed on (session, hash blocks).""" if not records: return {"status": "unavailable", "reason": "no joined records"} # block first-seen index: hash_id -> (session_id, first_seen_seq) first_seen: dict[int, tuple[str, int]] = {} block_sessions: dict[int, set[str]] = defaultdict(set) seq = 0 for r in sorted(records, key=lambda x: x.get("t_dispatch_unix") or 0.0): sid = str(r.get("session_id")) for h in r.get("trace_hash_ids") or []: h_int = int(h) if h_int not in first_seen: first_seen[h_int] = (sid, seq) block_sessions[h_int].add(sid) seq += 1 total_cached = 0 intra = cross = shared = unclassified = 0 for r in records: cached = r.get("cached_tokens") or 0 if not cached: continue total_cached += cached sid = str(r.get("session_id")) hashes = [int(h) for h in (r.get("trace_hash_ids") or [])] if not hashes: unclassified += cached continue # Approximate: classify the cached tokens by the first non-current # owner of any hash block we've seen before. block_tokens = max(cached // max(len(hashes), 1), 1) for h in hashes: first_sid, _ = first_seen.get(h, (None, None)) if first_sid is None: unclassified += min(block_tokens, cached) elif first_sid == sid: intra += min(block_tokens, cached) elif len(block_sessions[h]) >= 8: shared += min(block_tokens, cached) else: cross += min(block_tokens, cached) cached -= block_tokens if cached <= 0: break return { "status": "supported", "total_cached_tokens": total_cached, "intra_session_tokens": intra, "cross_session_tokens": cross, "shared_prefix_tokens": shared, "unclassified_tokens": unclassified, "fractions": _fractions(intra, cross, shared, unclassified), "shared_prefix_min_sessions": 8, } def _fractions(*parts: int) -> JsonDict: total = sum(parts) if total == 0: return {"intra": 0.0, "cross": 0.0, "shared": 0.0, "unclassified": 0.0} labels = ["intra", "cross", "shared", "unclassified"] return {label: parts[i] / total for i, label in enumerate(labels)} # ---------- Interference index (B2) ------------------------------------- def interference_index( joined: list[JsonDict], engine_state_by_worker: dict[str, list[JsonDict]], worker_map: dict[str, str] | None = None, ) -> JsonDict: """Label each completed request's decode period as overlap / no-overlap. A request 'overlaps same-worker prefill' if any scheduler step on the chosen worker between (t_first_token_unix, t_finish_unix) had prefill_tokens > 0 from a request other than this one. """ if not joined or not engine_state_by_worker: return {"status": "unavailable", "reason": "missing joined records or engine state"} tpot_overlap: list[float] = [] tpot_clean: list[float] = [] for r in joined: rid = r.get("request_id") # routed_to is the vLLM worker URL; endpoint_url is the proxy URL. # For worker-id matching we want routed_to. worker = _resolve_worker( r.get("routed_to") or r.get("endpoint_url"), worker_map, ) steps = engine_state_by_worker.get(worker) if not steps: continue t0 = r.get("t_first_token_unix") t1 = r.get("t_finish_unix") tpot = r.get("tpot_s") if t0 is None or t1 is None or tpot is None or r.get("error"): continue overlap = False for s in steps: t = s.get("t_unix") if t is None or t < t0 or t > t1: continue if not s.get("prefill_tokens"): continue # If the only prefill belongs to *this* request, that's still # this request's own prefill warming up, not interference. other_prefill = False for pr in s.get("per_req", []) or []: if pr.get("phase") != "prefill": continue # vLLM rewrites rid as `cmpl---`; strip # the prefix and the trailing suffix so equality to our # proxy request_id works. if _vllm_rid_matches(pr.get("rid"), rid): continue other_prefill = True break if other_prefill: overlap = True break (tpot_overlap if overlap else tpot_clean).append(float(tpot)) p90_overlap = _percentile(tpot_overlap, 0.90) if tpot_overlap else None p90_clean = _percentile(tpot_clean, 0.90) if tpot_clean else None idx = None if p90_overlap is not None and p90_clean and p90_clean > 0: idx = p90_overlap / p90_clean return { "status": "supported" if idx is not None else "partial", "n_overlap_requests": len(tpot_overlap), "n_clean_requests": len(tpot_clean), "tpot_p90_overlap_s": p90_overlap, "tpot_p90_clean_s": p90_clean, "interference_index": idx, } def _normalize_worker(url_or_id: str | None) -> str | None: """Best-effort: map URLs like http://h:8000 to engine_0 by base-port 8000. Use `_resolve_worker(url, worker_map)` instead when you have an explicit URL→worker_id map (e.g. from bench.sh). """ if not url_or_id: return None if url_or_id.startswith("engine_"): return url_or_id try: port = int(url_or_id.rsplit(":", 1)[1].split("/")[0]) return f"engine_{port - 8000}" except (ValueError, IndexError): return url_or_id def _resolve_worker( url_or_id: str | None, worker_map: dict[str, str] | None, ) -> str | None: if not url_or_id: return None if worker_map and url_or_id in worker_map: return worker_map[url_or_id] return _normalize_worker(url_or_id) def _vllm_rid_matches(vllm_rid: Any, proxy_rid: Any) -> bool: """vLLM internally rewrites rid as `cmpl---`.""" if vllm_rid is None or proxy_rid is None: return False if vllm_rid == proxy_rid: return True s = str(vllm_rid) p = str(proxy_rid) return s.startswith(f"cmpl-{p}-") or s.startswith(f"chatcmpl-{p}-") # ---------- Hotspot index (B3) ------------------------------------------ def hotspot_index(joined: list[JsonDict]) -> JsonDict: """max/median per-worker queue-delay p90 across completed requests. Worker key is the raw `routed_to` URL (or proxy `endpoint_url` fallback), so per-worker rows match the user's mental model. """ if not joined: return {"status": "unavailable"} by_worker_queue: dict[str, list[float]] = defaultdict(list) by_worker_latency: dict[str, list[float]] = defaultdict(list) for r in joined: if r.get("error"): continue # routed_to is the vLLM worker URL; endpoint_url is the proxy URL. worker = r.get("routed_to") or r.get("endpoint_url") if not worker: continue # queue delay proxy: (t_first_token - t_dispatch) - prefill estimate # is fragile; use raw TTFT as the queue-stressing signal. ttft = r.get("ttft_s") lat = r.get("latency_s") if ttft is not None: by_worker_queue[worker].append(float(ttft)) if lat is not None: by_worker_latency[worker].append(float(lat)) worker_p90_q: dict[str, float] = { w: _percentile(v, 0.90) for w, v in by_worker_queue.items() if v } worker_p90_lat: dict[str, float] = { w: _percentile(v, 0.90) for w, v in by_worker_latency.items() if v } p90s_q = sorted(worker_p90_q.values()) idx = None if len(p90s_q) >= 2: # True median: average of two middle values for even-length lists. # Previously used sorted[n//2] which returns the ~60th percentile # for n=8 and systematically under-states hotspot_index. median = statistics.median(p90s_q) if median > 0: idx = max(p90s_q) / median return { "status": "supported" if idx is not None else "partial", "per_worker_ttft_p90_s": worker_p90_q, "per_worker_latency_p90_s": worker_p90_lat, "hotspot_index_ttft_p90": idx, } # ---------- Failure label (B5) ------------------------------------------ DEFAULT_SLO = { "ttft_p90_s": 2.0, "tpot_p90_s": 0.15, } def label_slow_requests( joined: list[JsonDict], engine_state_by_worker: dict[str, list[JsonDict]], slo: JsonDict | None = None, slow_ttft_factor: float = 2.0, worker_map: dict[str, str] | None = None, ) -> list[JsonDict]: slo = slo or DEFAULT_SLO ttft_threshold = float(slo["ttft_p90_s"]) * slow_ttft_factor # Per-worker queue p90 to flag hot workers by_worker_ttft: dict[str, list[float]] = defaultdict(list) for r in joined: if r.get("ttft_s") is not None: by_worker_ttft[r.get("routed_to") or ""].append(float(r["ttft_s"])) worker_p90 = {w: _percentile(v, 0.90) for w, v in by_worker_ttft.items() if v} global_p90 = _percentile( [v for vs in by_worker_ttft.values() for v in vs], 0.90, ) if by_worker_ttft else None hot_workers = {w for w, p in worker_p90.items() if global_p90 and p > global_p90 * 1.2} labels: list[JsonDict] = [] for r in joined: ttft = r.get("ttft_s") if ttft is None or r.get("error"): continue if ttft <= ttft_threshold: continue label = _assign_label(r, hot_workers, engine_state_by_worker, worker_map) labels.append({ "request_id": r.get("request_id"), "session_id": r.get("session_id"), "routed_to": r.get("routed_to"), "ttft_s": ttft, "latency_s": r.get("latency_s"), "input_length": r.get("input_length"), "cached_tokens": r.get("cached_tokens"), "estimated_new_tokens": r.get("estimated_new_tokens"), "label": label, }) return labels def _assign_label( r: JsonDict, hot_workers: set[str], engine_state_by_worker: dict[str, list[JsonDict]], worker_map: dict[str, str] | None = None, ) -> str: worker = _resolve_worker( r.get("routed_to") or r.get("endpoint_url"), worker_map, ) rid = r.get("request_id") steps = engine_state_by_worker.get(worker, []) t0 = r.get("t_first_token_unix") t1 = r.get("t_finish_unix") if steps and t0 and t1: for s in steps: t = s.get("t_unix") if t is None or t < t0 or t > t1: continue for pr in s.get("per_req", []) or []: if pr.get("phase") != "prefill": continue if _vllm_rid_matches(pr.get("rid"), rid): continue return "same_worker_prefill_overlap" if (r.get("routed_to") or "") in hot_workers: return "hot_worker_queue" est = r.get("estimated_new_tokens") or 0 inp = r.get("input_length") or 0 if est and inp and est >= 0.5 * inp: return "cache_miss_large_append" snap = r.get("worker_state_at_decision") or [] if snap: chosen_idx = r.get("chosen_idx") if isinstance(chosen_idx, int) and 0 <= chosen_idx < len(snap): cached = snap[chosen_idx].get("cached_blocks", 0) if cached and cached > 50_000: return "high_kv_occupancy" return "unknown" # ---------- Window summary (B4) ----------------------------------------- def window_summary(joined: list[JsonDict], run_meta: JsonDict | None) -> JsonDict: if not run_meta: return {"status": "unavailable", "reason": "no run_meta"} warmup_end = float(run_meta["warmup_end_unix"]) steady_end = float(run_meta["steady_end_unix"]) buckets: dict[str, list[JsonDict]] = {"warmup": [], "steady": [], "drain": []} for r in joined: t = r.get("t_dispatch_unix") if t is None: continue if t < warmup_end: buckets["warmup"].append(r) elif t < steady_end: buckets["steady"].append(r) else: buckets["drain"].append(r) out: JsonDict = {"run_meta": run_meta, "windows": {}} for name, rows in buckets.items(): ttft = [r["ttft_s"] for r in rows if r.get("ttft_s") is not None] tpot = [r["tpot_s"] for r in rows if r.get("tpot_s") is not None] e2e = [r["latency_s"] for r in rows if r.get("latency_s") is not None] errs = sum(1 for r in rows if r.get("error")) out["windows"][name] = { "attempted": len(rows), "completed": len(rows) - errs, "errored": errs, "ttft_p50_s": _percentile(ttft, 0.50) if ttft else None, "ttft_p90_s": _percentile(ttft, 0.90) if ttft else None, "tpot_p50_s": _percentile(tpot, 0.50) if tpot else None, "tpot_p90_s": _percentile(tpot, 0.90) if tpot else None, "e2e_p50_s": _percentile(e2e, 0.50) if e2e else None, "e2e_p90_s": _percentile(e2e, 0.90) if e2e else None, } return out # ---------- helpers ----------------------------------------------------- def _percentile(values: list[float], pct: float) -> float | None: if not values: return None sorted_vals = sorted(values) if len(sorted_vals) == 1: return sorted_vals[0] rank = pct * (len(sorted_vals) - 1) lo = int(rank) hi = min(lo + 1, len(sorted_vals) - 1) frac = rank - lo return sorted_vals[lo] * (1 - frac) + sorted_vals[hi] * frac def load_engine_state(dir_path: Path) -> dict[str, list[JsonDict]]: """Load all engine_*.jsonl files from a directory; key by worker id.""" if not dir_path.exists() or not dir_path.is_dir(): return {} by_worker: dict[str, list[JsonDict]] = {} for p in sorted(dir_path.glob("engine_*.jsonl")): worker_id = p.stem # 'engine_0', etc. rows = load_jsonl(p) # Sort steps by time for binary-search-friendly access. rows.sort(key=lambda r: r.get("t_unix") or 0.0) by_worker[worker_id] = rows return by_worker # ---------- CLI --------------------------------------------------------- def main(argv: list[str] | None = None) -> None: p = argparse.ArgumentParser(description="A5 joined analysis") p.add_argument("--metrics", type=Path, required=True) p.add_argument("--breakdown", type=Path, default=None) p.add_argument("--worker-state", type=Path, default=None) p.add_argument("--engine-state-dir", type=Path, default=None, help="Directory containing engine_*.jsonl from A3 patch") p.add_argument("--run-meta", type=Path, default=None, help="run_meta or window_summary.json from SRR loadgen") p.add_argument("--out-dir", type=Path, required=True) p.add_argument("--slow-ttft-factor", type=float, default=2.0) p.add_argument("--worker-map", type=str, default=None, help="Comma-separated URL=worker_id pairs, e.g. " "http://h:9100=engine_0,http://h:9101=engine_1") args = p.parse_args(argv) worker_map: dict[str, str] | None = None if args.worker_map: worker_map = {} for entry in args.worker_map.split(","): url, _, wid = entry.strip().partition("=") if url and wid: worker_map[url] = wid metrics = load_jsonl(args.metrics) breakdown_raw = load_json(args.breakdown) if args.breakdown else [] if isinstance(breakdown_raw, dict): breakdown_raw = breakdown_raw.get("records", [breakdown_raw]) breakdown = list(breakdown_raw or []) worker_state_raw = load_json(args.worker_state) if args.worker_state else [] if isinstance(worker_state_raw, dict): worker_state_raw = worker_state_raw.get("records", [worker_state_raw]) worker_state = list(worker_state_raw or []) engine_state = ( load_engine_state(args.engine_state_dir) if args.engine_state_dir else {} ) run_meta = load_json(args.run_meta) if args.run_meta else None joined = build_joined_records(metrics, breakdown, worker_state) args.out_dir.mkdir(parents=True, exist_ok=True) write_jsonl(args.out_dir / "joined.jsonl", joined) write_json(args.out_dir / "reuse_decomposition.json", reuse_decomposition(joined)) write_json(args.out_dir / "interference_index.json", interference_index(joined, engine_state, worker_map)) write_json(args.out_dir / "hotspot_index.json", hotspot_index(joined)) labels = label_slow_requests(joined, engine_state, slow_ttft_factor=args.slow_ttft_factor, worker_map=worker_map) write_jsonl(args.out_dir / "failure_label.jsonl", labels) counts: dict[str, int] = defaultdict(int) for L in labels: counts[L["label"]] += 1 write_json(args.out_dir / "failure_breakdown.json", {"counts": dict(counts), "n_slow": len(labels)}) write_json(args.out_dir / "window_summary.json", window_summary(joined, run_meta)) if __name__ == "__main__": main()