Three fixes from the B3 audit:
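1) joined_analysis.hotspot_index used sorted[n//2] as the median. For
even-length lists this picks the upper-middle element (the ~60th
percentile for n=8) instead of the mean of the two middle values, which
inflates the median and therefore systematically under-states the
hotspot index (max / median). Recomputed values: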
lmetric 2.238 -> 2.253 (+0.7%)
load_only 1.140 -> 1.294 (+13.5%)
sticky 2.349 -> 2.728 (+16.1%)
unified 3.350 -> 3.667 (+9.5%)
capped 1.937 -> 2.020 (+4.3%)
Qualitative ranking preserved; "capped only modestly reduces hotspot"
story holds with ~10% drop instead of the previously reported 13%.
Added test_hotspot_index_uses_true_median_for_even_n to lock in the
fix.
2) b3_analyze.sh's pct() helper used floor-indexed percentile
sorted[int(p*(n-1))], inconsistent with metrics._percentile and
joined_analysis._percentile which both use linear interpolation.
Now matches.
3) b3_sweep.sh's capped step called run_policy "capped", but the
proxy's argparse has no "capped" choice, so the hot-sweep variant
would have crashed on this step. The actual capped data was
produced via b3_isolated_policy.sh with --policy lmetric. Replace
the broken inline call with an explicit launch_proxy lmetric +
inline replayer block so the sweep script matches the data path
it documents.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
597 lines
22 KiB
Python
597 lines
22 KiB
Python
"""A5: joined-record analysis from instrumented runs.
|
|
|
|
Inputs (all optional; functions degrade gracefully when missing):
|
|
|
|
- replayer metrics.jsonl with A1 fields
|
|
(t_dispatch_unix, t_first_token_unix, t_finish_unix, proxy_request_id,
|
|
endpoint_url, trace_hash_ids)
|
|
- proxy breakdown.json with A2 fields
|
|
(session_id, candidate_scores, chosen_score_*, t_first_token_unix,
|
|
t_done_unix, t_decision_unix)
|
|
- proxy worker_state.jsonl with A2 schema (one row per route decision)
|
|
- vLLM scheduler engine_state JSONLs from A3
|
|
(one per engine, env AGENTIC_STEP_LOG_PATH)
|
|
|
|
Outputs under <out_dir>/:
|
|
|
|
- joined.jsonl — per-request join across all sources
|
|
- reuse_decomposition.json
|
|
- interference_index.json
|
|
- hotspot_index.json
|
|
- failure_label.jsonl
|
|
- window_summary.json — when run_meta.json (from SRR loadgen) is present
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import statistics
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
from typing import Any, Iterable
|
|
|
|
# Loosely-typed JSON object: string keys mapped to arbitrary decoded values.
JsonDict = dict[str, Any]
|
|
|
|
|
|
# ---------- I/O ---------------------------------------------------------
|
|
|
|
def load_jsonl(path: Path) -> list[JsonDict]:
    """Read a JSON-Lines file into a list of decoded rows.

    A missing file yields an empty list. Blank lines and lines that do not
    parse as JSON are skipped silently, so a partially written log still
    loads whatever rows are intact.
    """
    records: list[JsonDict] = []
    if not path.exists():
        return records
    raw_lines = path.read_text(encoding="utf-8").splitlines()
    for candidate in (raw.strip() for raw in raw_lines):
        if candidate:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                # Best effort: drop the malformed line and keep going.
                continue
            records.append(parsed)
    return records
|
|
|
|
|
|
def load_json(path: Path) -> Any:
    """Decode a whole-file JSON document.

    Returns None when the file does not exist or contains only whitespace.
    A non-empty file that is not valid JSON raises json.JSONDecodeError.
    """
    if path.exists():
        payload = path.read_text(encoding="utf-8").strip()
        if payload:
            return json.loads(payload)
    return None
|
|
|
|
|
|
def write_json(path: Path, data: Any) -> None:
    """Serialize *data* as pretty-printed JSON at *path*.

    Parent directories are created as needed. Keys are sorted and the
    output is indented by two spaces so the files diff cleanly. The file
    is written as UTF-8 explicitly, matching write_jsonl and the loaders,
    rather than relying on the platform default encoding.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(
        json.dumps(data, indent=2, sort_keys=True),
        encoding="utf-8",
    )
|
|
|
|
|
|
def write_jsonl(path: Path, rows: Iterable[JsonDict]) -> None:
    """Write *rows* to *path* as JSON Lines, one sorted-key object per line.

    Parent directories are created as needed; an existing file is
    overwritten.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as sink:
        sink.writelines(
            json.dumps(record, sort_keys=True) + "\n" for record in rows
        )
|
|
|
|
|
|
# ---------- Joining -----------------------------------------------------
|
|
|
|
def build_joined_records(
    metrics: list[JsonDict],
    breakdown: list[JsonDict],
    worker_state: list[JsonDict],
) -> list[JsonDict]:
    """Join metrics, proxy breakdown and worker-state rows on request id.

    One output row is produced per metrics record, in input order. The join
    key is the metrics row's ``request_id``, falling back to
    ``proxy_request_id``, stringified. Breakdown and worker-state rows
    without a truthy ``request_id`` are ignored.

    Metrics-derived columns are always present (None when the source field
    is missing). Proxy-derived columns and ``worker_state_at_decision`` are
    only added when a matching breakdown / worker-state row exists.
    """

    def _index_by_request_id(rows: list[JsonDict]) -> dict[str, JsonDict]:
        # Later rows win when the same request id appears more than once.
        return {
            str(entry.get("request_id")): entry
            for entry in rows
            if entry.get("request_id")
        }

    breakdown_by_id = _index_by_request_id(breakdown)
    state_by_id = _index_by_request_id(worker_state)

    # Fields copied verbatim from the metrics record.
    passthrough = (
        "session_id",
        "turn_id",
        "trace_timestamp_s",
        "input_length",
        "output_length",
        "cached_tokens",
        "actual_output_tokens",
        "latency_s",
        "ttft_s",
        "tpot_s",
        "t_dispatch_unix",
        "t_first_token_unix",
        "t_finish_unix",
        "endpoint_url",
    )
    # (output column, breakdown field) pairs; some are renamed so they do
    # not collide with the metrics-side columns of the same name.
    proxy_columns = (
        ("policy", "policy"),
        ("route_class", "route_class"),
        ("routed_to", "routed_to"),
        ("chosen_idx", "chosen_idx"),
        ("chosen_score_linear", "chosen_score_linear"),
        ("chosen_score_lmetric", "chosen_score_lmetric"),
        ("estimated_new_tokens", "estimated_new_tokens"),
        ("cache_hit_proxy", "cache_hit"),
        ("proxy_t_decision_unix", "t_decision_unix"),
        ("proxy_t_first_token_unix", "t_first_token_unix"),
        ("proxy_t_done_unix", "t_done_unix"),
    )

    combined: list[JsonDict] = []
    for record in metrics:
        key = str(record.get("request_id") or record.get("proxy_request_id") or "")
        row: JsonDict = {"request_id": key}
        for name in passthrough:
            row[name] = record.get(name)
        row["trace_hash_ids"] = record.get("trace_hash_ids") or []
        row["error"] = record.get("error")

        proxy_record = breakdown_by_id.get(key)
        if proxy_record:
            for column, source_field in proxy_columns:
                row[column] = proxy_record.get(source_field)

        state_record = state_by_id.get(key)
        if state_record:
            row["worker_state_at_decision"] = state_record.get("workers")

        combined.append(row)
    return combined
|
|
|
|
|
|
# ---------- Reuse decomposition (real) ----------------------------------
|
|
|
|
def reuse_decomposition(
    records: list[JsonDict],
    block_size: int = 16,
    shared_prefix_min_sessions: int = 8,
) -> JsonDict:
    """Approximate intra/cross/shared split of cached tokens by hash block.

    Each hash block is owned by the session of the earliest-dispatched
    record that carries it. A record's cached tokens are then spread evenly
    over its hash blocks and each share is attributed as:

    - intra: the block's first owner is the record's own session;
    - shared: otherwise, when the block was seen in at least
      ``shared_prefix_min_sessions`` distinct sessions;
    - cross: otherwise;
    - unclassified: the record has cached tokens but no hash ids.

    Args:
        records: Joined rows; reads ``session_id``, ``trace_hash_ids``,
            ``cached_tokens`` and ``t_dispatch_unix``.
        block_size: Accepted for interface compatibility; the current
            computation does not use it.
        shared_prefix_min_sessions: Distinct-session count from which a
            block counts as a shared prefix (default 8, as before).

    Returns:
        A dict with ``status``, the per-bucket token totals, their
        ``fractions`` and the threshold used; or an "unavailable" status
        when *records* is empty.

    Note:
        The per-block share uses integer division, so the remainder of
        ``cached_tokens / len(hashes)`` is not attributed to any bucket and
        the buckets may sum to slightly less than ``total_cached_tokens``.
    """
    if not records:
        return {"status": "unavailable", "reason": "no joined records"}

    # hash block -> session that first dispatched it, and all sessions seen.
    first_owner: dict[int, str] = {}
    sessions_per_block: dict[int, set[str]] = defaultdict(set)
    for rec in sorted(records, key=lambda x: x.get("t_dispatch_unix") or 0.0):
        sid = str(rec.get("session_id"))
        for raw_hash in rec.get("trace_hash_ids") or []:
            block_id = int(raw_hash)
            first_owner.setdefault(block_id, sid)
            sessions_per_block[block_id].add(sid)

    total_cached = 0
    intra = cross = shared = unclassified = 0
    for rec in records:
        remaining = rec.get("cached_tokens") or 0
        if not remaining:
            continue
        total_cached += remaining
        sid = str(rec.get("session_id"))
        hashes = [int(h) for h in (rec.get("trace_hash_ids") or [])]
        if not hashes:
            unclassified += remaining
            continue
        # Approximation: every block gets an equal (at least one-token)
        # share until the record's cached tokens are used up.
        per_block = max(remaining // max(len(hashes), 1), 1)
        for block_id in hashes:
            share = min(per_block, remaining)
            owner = first_owner.get(block_id)
            if owner is None:
                unclassified += share
            elif owner == sid:
                intra += share
            elif len(sessions_per_block[block_id]) >= shared_prefix_min_sessions:
                shared += share
            else:
                cross += share
            remaining -= per_block
            if remaining <= 0:
                break

    return {
        "status": "supported",
        "total_cached_tokens": total_cached,
        "intra_session_tokens": intra,
        "cross_session_tokens": cross,
        "shared_prefix_tokens": shared,
        "unclassified_tokens": unclassified,
        "fractions": _fractions(intra, cross, shared, unclassified),
        "shared_prefix_min_sessions": shared_prefix_min_sessions,
    }
|
|
|
|
|
|
def _fractions(*parts: int) -> JsonDict:
    """Convert the four bucket totals into shares of their sum.

    The positional order is intra, cross, shared, unclassified. When the
    totals sum to zero every share is reported as 0.0.
    """
    names = ("intra", "cross", "shared", "unclassified")
    denominator = sum(parts)
    if denominator == 0:
        return dict.fromkeys(names, 0.0)
    shares: JsonDict = {}
    for position, name in enumerate(names):
        shares[name] = parts[position] / denominator
    return shares
|
|
|
|
|
|
# ---------- Interference index (B2) -------------------------------------
|
|
|
|
def interference_index(
    joined: list[JsonDict],
    engine_state_by_worker: dict[str, list[JsonDict]],
    worker_map: dict[str, str] | None = None,
) -> JsonDict:
    """Compare decode TPOT with and without same-worker prefill overlap.

    A completed request counts as "overlap" when some scheduler step on its
    worker, timestamped within [t_first_token_unix, t_finish_unix], reports
    prefill_tokens and lists a prefill-phase request other than this one.
    The index is p90(TPOT | overlap) / p90(TPOT | clean).

    Requests are skipped when their worker has no engine state, when a
    timestamp or TPOT is missing, or when they errored. Status is
    "supported" when the index could be computed, "partial" when one of
    the groups was empty (or the clean p90 is not positive), and
    "unavailable" when either input is empty.
    """
    if not joined or not engine_state_by_worker:
        return {"status": "unavailable",
                "reason": "missing joined records or engine state"}

    def _foreign_prefill_in_window(step_rows, own_rid, start, end):
        """True if another request was prefilling during [start, end]."""
        for step in step_rows:
            stamp = step.get("t_unix")
            if stamp is None or stamp < start or stamp > end:
                continue
            if not step.get("prefill_tokens"):
                continue
            # A prefill entry belonging to this very request is its own
            # warm-up, not interference; vLLM decorates the id, hence the
            # dedicated matcher.
            if any(
                entry.get("phase") == "prefill"
                and not _vllm_rid_matches(entry.get("rid"), own_rid)
                for entry in step.get("per_req", []) or []
            ):
                return True
        return False

    overlapped_tpot: list[float] = []
    clean_tpot: list[float] = []
    for row in joined:
        # routed_to is the vLLM worker URL; endpoint_url (the proxy URL) is
        # only a fallback when no routing decision was joined in.
        worker_id = _resolve_worker(
            row.get("routed_to") or row.get("endpoint_url"),
            worker_map,
        )
        step_rows = engine_state_by_worker.get(worker_id)
        if not step_rows:
            continue
        start = row.get("t_first_token_unix")
        end = row.get("t_finish_unix")
        tpot = row.get("tpot_s")
        if start is None or end is None or tpot is None or row.get("error"):
            continue
        if _foreign_prefill_in_window(step_rows, row.get("request_id"), start, end):
            overlapped_tpot.append(float(tpot))
        else:
            clean_tpot.append(float(tpot))

    overlap_p90 = _percentile(overlapped_tpot, 0.90) if overlapped_tpot else None
    clean_p90 = _percentile(clean_tpot, 0.90) if clean_tpot else None
    ratio = None
    if overlap_p90 is not None and clean_p90 and clean_p90 > 0:
        ratio = overlap_p90 / clean_p90

    return {
        "status": "partial" if ratio is None else "supported",
        "n_overlap_requests": len(overlapped_tpot),
        "n_clean_requests": len(clean_tpot),
        "tpot_p90_overlap_s": overlap_p90,
        "tpot_p90_clean_s": clean_p90,
        "interference_index": ratio,
    }
|
|
|
|
|
|
def _normalize_worker(url_or_id: str | None) -> str | None:
    """Best-effort mapping of a worker URL to an ``engine_<n>`` id.

    The text after the last ``:`` (up to the first ``/``) is read as a port
    and mapped to ``engine_<port - 8000>``, e.g. http://h:8000 -> engine_0.
    Values already starting with ``engine_`` pass through, empty input
    gives None, and anything without a parseable port is returned
    unchanged.

    Prefer `_resolve_worker(url, worker_map)` when an explicit
    URL→worker_id map is available (e.g. from bench.sh).
    """
    if not url_or_id:
        return None
    if url_or_id.startswith("engine_"):
        return url_or_id
    _, colon, tail = url_or_id.rpartition(":")
    if not colon:
        return url_or_id
    try:
        port = int(tail.partition("/")[0])
    except ValueError:
        return url_or_id
    return f"engine_{port - 8000}"
|
|
|
|
|
|
def _resolve_worker(
    url_or_id: str | None,
    worker_map: dict[str, str] | None,
) -> str | None:
    """Translate a worker URL into a worker id.

    An exact match in *worker_map* wins; otherwise the port-based
    `_normalize_worker` heuristic is used. Empty input yields None.
    """
    if not url_or_id:
        return None
    has_override = bool(worker_map) and url_or_id in worker_map
    return worker_map[url_or_id] if has_override else _normalize_worker(url_or_id)
|
|
|
|
|
|
def _vllm_rid_matches(vllm_rid: Any, proxy_rid: Any) -> bool:
    """Tell whether a vLLM request id refers to the given proxy request id.

    vLLM internally rewrites rid as `cmpl-<proxy_id>-<i>-<hash>`, so besides
    plain equality a ``cmpl-<proxy_id>-`` or ``chatcmpl-<proxy_id>-`` prefix
    is accepted. A None on either side never matches.
    """
    if vllm_rid is None or proxy_rid is None:
        return False
    if vllm_rid == proxy_rid:
        return True
    wanted = str(proxy_rid)
    accepted_prefixes = (f"cmpl-{wanted}-", f"chatcmpl-{wanted}-")
    return str(vllm_rid).startswith(accepted_prefixes)
|
|
|
|
|
|
# ---------- Hotspot index (B3) ------------------------------------------
|
|
|
|
def hotspot_index(joined: list[JsonDict]) -> JsonDict:
    """Ratio of the worst per-worker TTFT p90 to the median per-worker p90.

    Errored requests and requests without a worker are ignored. The worker
    key is the raw `routed_to` URL, falling back to the proxy
    `endpoint_url`, so the per-worker tables are keyed by URL. Raw TTFT
    stands in for queue delay because subtracting a prefill estimate from
    (t_first_token - t_dispatch) is fragile.

    Status is "supported" when at least two workers have TTFT samples and
    their median p90 is positive, "partial" otherwise, and "unavailable"
    for empty input.
    """
    if not joined:
        return {"status": "unavailable"}

    ttft_samples: dict[str, list[float]] = defaultdict(list)
    latency_samples: dict[str, list[float]] = defaultdict(list)
    for row in joined:
        if row.get("error"):
            continue
        worker = row.get("routed_to") or row.get("endpoint_url")
        if not worker:
            continue
        first_token_delay = row.get("ttft_s")
        end_to_end = row.get("latency_s")
        if first_token_delay is not None:
            ttft_samples[worker].append(float(first_token_delay))
        if end_to_end is not None:
            latency_samples[worker].append(float(end_to_end))

    ttft_p90_by_worker: dict[str, float] = {
        worker: _percentile(samples, 0.90)
        for worker, samples in ttft_samples.items()
        if samples
    }
    latency_p90_by_worker: dict[str, float] = {
        worker: _percentile(samples, 0.90)
        for worker, samples in latency_samples.items()
        if samples
    }

    ranked = sorted(ttft_p90_by_worker.values())
    ratio = None
    if len(ranked) >= 2:
        # statistics.median averages the two middle values for even-length
        # input; picking sorted[n//2] instead would inflate the median and
        # under-state the index.
        middle = statistics.median(ranked)
        if middle > 0:
            ratio = max(ranked) / middle

    return {
        "status": "partial" if ratio is None else "supported",
        "per_worker_ttft_p90_s": ttft_p90_by_worker,
        "per_worker_latency_p90_s": latency_p90_by_worker,
        "hotspot_index_ttft_p90": ratio,
    }
|
|
|
|
|
|
# ---------- Failure label (B5) ------------------------------------------
|
|
|
|
# Fallback service-level objectives used by label_slow_requests when the
# caller does not pass an explicit `slo` mapping.
DEFAULT_SLO = dict(
    ttft_p90_s=2.0,
    tpot_p90_s=0.15,
)
|
|
|
|
|
|
def label_slow_requests(
    joined: list[JsonDict],
    engine_state_by_worker: dict[str, list[JsonDict]],
    slo: JsonDict | None = None,
    slow_ttft_factor: float = 2.0,
    worker_map: dict[str, str] | None = None,
) -> list[JsonDict]:
    """Attach a failure label to every slow, non-errored request.

    A request is slow when its TTFT exceeds ``slo["ttft_p90_s"] *
    slow_ttft_factor`` (DEFAULT_SLO is used when *slo* is empty or None).
    Workers whose TTFT p90 is more than 1.2x the p90 over all requests are
    treated as hot; the label itself is chosen by `_assign_label`.

    Returns one dict per slow request with identifying fields, the sizes
    involved and the assigned ``label``.
    """
    effective_slo = slo or DEFAULT_SLO
    slow_cutoff = float(effective_slo["ttft_p90_s"]) * slow_ttft_factor

    # Per-worker TTFT samples, keyed by routed_to ("" when unknown), used
    # to flag hot workers.
    ttft_by_worker: dict[str, list[float]] = defaultdict(list)
    for row in joined:
        if row.get("ttft_s") is not None:
            ttft_by_worker[row.get("routed_to") or ""].append(float(row["ttft_s"]))

    p90_by_worker = {
        worker: _percentile(samples, 0.90)
        for worker, samples in ttft_by_worker.items()
        if samples
    }
    overall_p90 = None
    if ttft_by_worker:
        pooled = [value for samples in ttft_by_worker.values() for value in samples]
        overall_p90 = _percentile(pooled, 0.90)
    hot_workers = {
        worker
        for worker, worker_p90 in p90_by_worker.items()
        if overall_p90 and worker_p90 > overall_p90 * 1.2
    }

    reported_fields = (
        "request_id",
        "session_id",
        "routed_to",
        "ttft_s",
        "latency_s",
        "input_length",
        "cached_tokens",
        "estimated_new_tokens",
    )
    labelled: list[JsonDict] = []
    for row in joined:
        observed = row.get("ttft_s")
        if observed is None or row.get("error") or observed <= slow_cutoff:
            continue
        verdict = _assign_label(row, hot_workers, engine_state_by_worker,
                                worker_map)
        entry: JsonDict = {name: row.get(name) for name in reported_fields}
        entry["label"] = verdict
        labelled.append(entry)
    return labelled
|
|
|
|
|
|
def _assign_label(
    r: JsonDict,
    hot_workers: set[str],
    engine_state_by_worker: dict[str, list[JsonDict]],
    worker_map: dict[str, str] | None = None,
) -> str:
    """Pick the first matching explanation for a slow request.

    Checks run in priority order:

    1. ``same_worker_prefill_overlap`` - a scheduler step on the request's
       worker, within [t_first_token_unix, t_finish_unix], lists a
       prefill-phase request other than this one.
    2. ``hot_worker_queue`` - the request's ``routed_to`` is a hot worker.
    3. ``cache_miss_large_append`` - estimated new tokens are at least half
       of the input length.
    4. ``high_kv_occupancy`` - the chosen worker's ``cached_blocks`` in the
       decision-time snapshot exceeds 50,000.

    Falls back to ``unknown``.
    """
    target = r.get("routed_to")
    worker_id = _resolve_worker(target or r.get("endpoint_url"), worker_map)
    own_rid = r.get("request_id")
    step_rows = engine_state_by_worker.get(worker_id, [])
    window_start = r.get("t_first_token_unix")
    window_end = r.get("t_finish_unix")

    if step_rows and window_start and window_end:
        for step in step_rows:
            stamp = step.get("t_unix")
            if stamp is None or stamp < window_start or stamp > window_end:
                continue
            # Prefill entries of this very request do not count.
            foreign_prefill = any(
                entry.get("phase") == "prefill"
                and not _vllm_rid_matches(entry.get("rid"), own_rid)
                for entry in step.get("per_req", []) or []
            )
            if foreign_prefill:
                return "same_worker_prefill_overlap"

    if (target or "") in hot_workers:
        return "hot_worker_queue"

    new_tokens = r.get("estimated_new_tokens") or 0
    prompt_tokens = r.get("input_length") or 0
    if new_tokens and prompt_tokens and new_tokens >= 0.5 * prompt_tokens:
        return "cache_miss_large_append"

    snapshot = r.get("worker_state_at_decision") or []
    if snapshot:
        position = r.get("chosen_idx")
        if isinstance(position, int) and 0 <= position < len(snapshot):
            occupancy = snapshot[position].get("cached_blocks", 0)
            if occupancy and occupancy > 50_000:
                return "high_kv_occupancy"

    return "unknown"
|
|
|
|
|
|
# ---------- Window summary (B4) -----------------------------------------
|
|
|
|
def window_summary(joined: list[JsonDict], run_meta: JsonDict | None) -> JsonDict:
    """Summarize requests per warmup / steady / drain window.

    Requests are bucketed by ``t_dispatch_unix``: before
    ``run_meta["warmup_end_unix"]`` is warmup, before
    ``run_meta["steady_end_unix"]`` is steady, everything later is drain.
    Requests without a dispatch timestamp are left out.

    Returns a dict with the original ``run_meta`` and, per window, the
    attempted / completed / errored counts plus p50 and p90 of TTFT, TPOT
    and end-to-end latency (None when the window has no samples).

    When *run_meta* is empty, or does not provide both boundaries as
    numbers, an "unavailable" status with a reason is returned instead of
    raising, in line with the other analyses in this module.
    """
    if not run_meta:
        return {"status": "unavailable", "reason": "no run_meta"}
    try:
        warmup_end = float(run_meta["warmup_end_unix"])
        steady_end = float(run_meta["steady_end_unix"])
    except (KeyError, TypeError, ValueError):
        # E.g. a file of a different shape was passed as --run-meta.
        return {
            "status": "unavailable",
            "reason": "run_meta lacks numeric warmup_end_unix/steady_end_unix",
        }

    buckets: dict[str, list[JsonDict]] = {"warmup": [], "steady": [], "drain": []}
    for row in joined:
        dispatched = row.get("t_dispatch_unix")
        if dispatched is None:
            continue
        if dispatched < warmup_end:
            buckets["warmup"].append(row)
        elif dispatched < steady_end:
            buckets["steady"].append(row)
        else:
            buckets["drain"].append(row)

    # (output prefix, source field) for each summarized metric.
    metric_fields = (("ttft", "ttft_s"), ("tpot", "tpot_s"), ("e2e", "latency_s"))

    out: JsonDict = {"run_meta": run_meta, "windows": {}}
    for name, rows in buckets.items():
        errored = sum(1 for row in rows if row.get("error"))
        summary: JsonDict = {
            "attempted": len(rows),
            "completed": len(rows) - errored,
            "errored": errored,
        }
        for prefix, source_field in metric_fields:
            samples = [
                row[source_field]
                for row in rows
                if row.get(source_field) is not None
            ]
            for tag, quantile in (("p50", 0.50), ("p90", 0.90)):
                summary[f"{prefix}_{tag}_s"] = (
                    _percentile(samples, quantile) if samples else None
                )
        out["windows"][name] = summary
    return out
|
|
|
|
|
|
# ---------- helpers -----------------------------------------------------
|
|
|
|
def _percentile(values: list[float], pct: float) -> float | None:
    """Linearly interpolated percentile of *values*.

    *pct* is a fraction (0.90 for p90). The fractional rank
    ``pct * (n - 1)`` is interpolated between its two neighbouring order
    statistics. Returns None for empty input and the sole element for a
    single-element list.
    """
    if not values:
        return None
    ordered = sorted(values)
    last = len(ordered) - 1
    if last == 0:
        return ordered[0]
    position = pct * last
    lower = int(position)
    upper = min(lower + 1, last)
    weight = position - lower
    return ordered[lower] * (1 - weight) + ordered[upper] * weight
|
|
|
|
|
|
def load_engine_state(dir_path: Path) -> dict[str, list[JsonDict]]:
    """Load every ``engine_*.jsonl`` in *dir_path*, keyed by file stem.

    The stem ('engine_0', ...) serves as the worker id. Each worker's steps
    are ordered by ``t_unix`` (missing timestamps sort as 0.0). A missing
    path, or one that is not a directory, yields an empty dict.
    """
    if not (dir_path.exists() and dir_path.is_dir()):
        return {}
    state: dict[str, list[JsonDict]] = {}
    for log_path in sorted(dir_path.glob("engine_*.jsonl")):
        # Time-ordered steps keep later window scans predictable.
        state[log_path.stem] = sorted(
            load_jsonl(log_path),
            key=lambda step: step.get("t_unix") or 0.0,
        )
    return state
|
|
|
|
|
|
# ---------- CLI ---------------------------------------------------------
|
|
|
|
def _load_records(path: Path | None) -> list[JsonDict]:
    """Load a list of record dicts from a JSON or JSON-Lines file.

    Accepts a JSON array, a JSON object with a ``records`` list, a single
    JSON object (wrapped into a one-element list), or a JSON-Lines file
    with one object per line. Returns [] when *path* is None or the file
    is missing or empty. A file that is neither valid JSON nor contains
    any JSON-Lines object re-raises the original json.JSONDecodeError.
    """
    if path is None:
        return []
    try:
        payload = load_json(path)
    except json.JSONDecodeError:
        # Multi-line JSONL is not a single JSON document; parse per line.
        rows = [row for row in load_jsonl(path) if isinstance(row, dict)]
        if not rows:
            raise
        return rows
    if isinstance(payload, dict):
        payload = payload.get("records", [payload])
    return list(payload or [])


def main(argv: list[str] | None = None) -> None:
    """CLI entry point: join the inputs and write every analysis output.

    Args:
        argv: Argument list to parse; None lets argparse read sys.argv.

    Writes joined.jsonl, reuse_decomposition.json, interference_index.json,
    hotspot_index.json, failure_label.jsonl, failure_breakdown.json and
    window_summary.json under ``--out-dir``.
    """
    parser = argparse.ArgumentParser(description="A5 joined analysis")
    parser.add_argument("--metrics", type=Path, required=True)
    parser.add_argument("--breakdown", type=Path, default=None)
    parser.add_argument("--worker-state", type=Path, default=None)
    parser.add_argument("--engine-state-dir", type=Path, default=None,
                        help="Directory containing engine_*.jsonl from A3 patch")
    parser.add_argument("--run-meta", type=Path, default=None,
                        help="run_meta or window_summary.json from SRR loadgen")
    parser.add_argument("--out-dir", type=Path, required=True)
    parser.add_argument("--slow-ttft-factor", type=float, default=2.0)
    parser.add_argument("--worker-map", type=str, default=None,
                        help="Comma-separated URL=worker_id pairs, e.g. "
                             "http://h:9100=engine_0,http://h:9101=engine_1")
    args = parser.parse_args(argv)

    worker_map: dict[str, str] | None = None
    if args.worker_map:
        worker_map = {}
        for entry in args.worker_map.split(","):
            # Entries missing either side of '=' are ignored.
            url, _, wid = entry.strip().partition("=")
            if url and wid:
                worker_map[url] = wid

    metrics = load_jsonl(args.metrics)
    breakdown = _load_records(args.breakdown)
    # worker_state is documented as JSONL (one row per route decision), so
    # it must not be parsed as a single JSON document only.
    worker_state = _load_records(args.worker_state)
    engine_state = (
        load_engine_state(args.engine_state_dir) if args.engine_state_dir else {}
    )
    run_meta = load_json(args.run_meta) if args.run_meta else None

    joined = build_joined_records(metrics, breakdown, worker_state)

    out_dir = args.out_dir
    out_dir.mkdir(parents=True, exist_ok=True)
    write_jsonl(out_dir / "joined.jsonl", joined)
    write_json(out_dir / "reuse_decomposition.json",
               reuse_decomposition(joined))
    write_json(out_dir / "interference_index.json",
               interference_index(joined, engine_state, worker_map))
    write_json(out_dir / "hotspot_index.json",
               hotspot_index(joined))

    labels = label_slow_requests(joined, engine_state,
                                 slow_ttft_factor=args.slow_ttft_factor,
                                 worker_map=worker_map)
    write_jsonl(out_dir / "failure_label.jsonl", labels)
    counts: dict[str, int] = defaultdict(int)
    for labelled in labels:
        counts[labelled["label"]] += 1
    write_json(out_dir / "failure_breakdown.json",
               {"counts": dict(counts), "n_slow": len(labels)})
    write_json(out_dir / "window_summary.json",
               window_summary(joined, run_meta))
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|