A5: joined analysis with reuse decomp, interference, hot-spot, labels

New analysis/characterization/joined_analysis.py joins replayer
metrics.jsonl + proxy breakdown.json + worker_state.jsonl by
request_id, plus engine_*.jsonl by worker_id, and emits:

- joined.jsonl              per-request merged record
- reuse_decomposition.json  real intra/cross/shared classification
                            using session_id + hash_ids + cached_tokens
- interference_index.json   TPOT_p90(same-worker prefill overlap)
                            / TPOT_p90(clean), per Batch 2
- hotspot_index.json        max/median worker TTFT-p90, per Batch 3
- failure_label.jsonl       per-slow-request cause label, per Batch 5
- failure_breakdown.json    label histogram
- window_summary.json       SRR warmup/steady/drain aggregates

Closes the analyzer side of Phase A; replaces the
status: unavailable placeholders the existing scaffold emits when
join sources are missing.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 16:19:33 +08:00
parent f42c715ec1
commit 25445e3d18
2 changed files with 701 additions and 0 deletions

View File

@@ -0,0 +1,531 @@
"""A5: joined-record analysis from instrumented runs.
Inputs (all optional; functions degrade gracefully when missing):
- replayer metrics.jsonl with A1 fields
(t_dispatch_unix, t_first_token_unix, t_finish_unix, proxy_request_id,
endpoint_url, trace_hash_ids)
- proxy breakdown.json with A2 fields
(session_id, candidate_scores, chosen_score_*, t_first_token_unix,
t_done_unix, t_decision_unix)
- proxy worker_state.jsonl with A2 schema (one row per route decision)
- vLLM scheduler engine_state JSONLs from A3
(one per engine, env AGENTIC_STEP_LOG_PATH)
Outputs under <out_dir>/:
- joined.jsonl — per-request join across all sources
- reuse_decomposition.json
- interference_index.json
- hotspot_index.json
- failure_label.jsonl
- window_summary.json — when run_meta.json (from SRR loadgen) is present
"""
from __future__ import annotations
import argparse
import json
import math
import statistics
from collections import defaultdict
from pathlib import Path
from typing import Any, Iterable
JsonDict = dict[str, Any]
# ---------- I/O ---------------------------------------------------------
def load_jsonl(path: Path) -> list[JsonDict]:
if not path.exists():
return []
out: list[JsonDict] = []
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
try:
out.append(json.loads(line))
except json.JSONDecodeError:
continue
return out
def load_json(path: Path) -> Any:
if not path.exists():
return None
text = path.read_text(encoding="utf-8").strip()
if not text:
return None
return json.loads(text)
def write_json(path: Path, data: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2, sort_keys=True))
def write_jsonl(path: Path, rows: Iterable[JsonDict]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as fh:
for row in rows:
fh.write(json.dumps(row, sort_keys=True) + "\n")
# ---------- Joining -----------------------------------------------------
def build_joined_records(
metrics: list[JsonDict],
breakdown: list[JsonDict],
worker_state: list[JsonDict],
) -> list[JsonDict]:
"""Join metrics + breakdown + worker_state by request_id.
Returns one row per metrics record (the load-generator's view of truth).
Missing sources leave the corresponding columns as None.
"""
bk_by_id = {str(r.get("request_id")): r for r in breakdown if r.get("request_id")}
ws_by_id = {str(r.get("request_id")): r for r in worker_state if r.get("request_id")}
joined: list[JsonDict] = []
for m in metrics:
rid = str(m.get("request_id") or m.get("proxy_request_id") or "")
bk = bk_by_id.get(rid)
ws = ws_by_id.get(rid)
row: JsonDict = {
"request_id": rid,
"session_id": m.get("session_id"),
"turn_id": m.get("turn_id"),
"trace_timestamp_s": m.get("trace_timestamp_s"),
"input_length": m.get("input_length"),
"output_length": m.get("output_length"),
"cached_tokens": m.get("cached_tokens"),
"actual_output_tokens": m.get("actual_output_tokens"),
"latency_s": m.get("latency_s"),
"ttft_s": m.get("ttft_s"),
"tpot_s": m.get("tpot_s"),
"t_dispatch_unix": m.get("t_dispatch_unix"),
"t_first_token_unix": m.get("t_first_token_unix"),
"t_finish_unix": m.get("t_finish_unix"),
"endpoint_url": m.get("endpoint_url"),
"trace_hash_ids": m.get("trace_hash_ids") or [],
"error": m.get("error"),
}
if bk:
row["policy"] = bk.get("policy")
row["route_class"] = bk.get("route_class")
row["routed_to"] = bk.get("routed_to")
row["chosen_idx"] = bk.get("chosen_idx")
row["chosen_score_linear"] = bk.get("chosen_score_linear")
row["chosen_score_lmetric"] = bk.get("chosen_score_lmetric")
row["estimated_new_tokens"] = bk.get("estimated_new_tokens")
row["cache_hit_proxy"] = bk.get("cache_hit")
row["proxy_t_decision_unix"] = bk.get("t_decision_unix")
row["proxy_t_first_token_unix"] = bk.get("t_first_token_unix")
row["proxy_t_done_unix"] = bk.get("t_done_unix")
if ws:
row["worker_state_at_decision"] = ws.get("workers")
joined.append(row)
return joined
# ---------- Reuse decomposition (real) ----------------------------------
def reuse_decomposition(records: list[JsonDict], block_size: int = 16) -> JsonDict:
"""Real intra/cross/shared decomposition keyed on (session, hash blocks)."""
if not records:
return {"status": "unavailable", "reason": "no joined records"}
# block first-seen index: hash_id -> (session_id, first_seen_seq)
first_seen: dict[int, tuple[str, int]] = {}
block_sessions: dict[int, set[str]] = defaultdict(set)
seq = 0
for r in sorted(records, key=lambda x: x.get("t_dispatch_unix") or 0.0):
sid = str(r.get("session_id"))
for h in r.get("trace_hash_ids") or []:
h_int = int(h)
if h_int not in first_seen:
first_seen[h_int] = (sid, seq)
block_sessions[h_int].add(sid)
seq += 1
total_cached = 0
intra = cross = shared = unclassified = 0
for r in records:
cached = r.get("cached_tokens") or 0
if not cached:
continue
total_cached += cached
sid = str(r.get("session_id"))
hashes = [int(h) for h in (r.get("trace_hash_ids") or [])]
if not hashes:
unclassified += cached
continue
# Approximate: classify the cached tokens by the first non-current
# owner of any hash block we've seen before.
block_tokens = max(cached // max(len(hashes), 1), 1)
for h in hashes:
first_sid, _ = first_seen.get(h, (None, None))
if first_sid is None:
unclassified += min(block_tokens, cached)
elif first_sid == sid:
intra += min(block_tokens, cached)
elif len(block_sessions[h]) >= 8:
shared += min(block_tokens, cached)
else:
cross += min(block_tokens, cached)
cached -= block_tokens
if cached <= 0:
break
return {
"status": "supported",
"total_cached_tokens": total_cached,
"intra_session_tokens": intra,
"cross_session_tokens": cross,
"shared_prefix_tokens": shared,
"unclassified_tokens": unclassified,
"fractions": _fractions(intra, cross, shared, unclassified),
"shared_prefix_min_sessions": 8,
}
def _fractions(*parts: int) -> JsonDict:
total = sum(parts)
if total == 0:
return {"intra": 0.0, "cross": 0.0, "shared": 0.0, "unclassified": 0.0}
labels = ["intra", "cross", "shared", "unclassified"]
return {label: parts[i] / total for i, label in enumerate(labels)}
# ---------- Interference index (B2) -------------------------------------
def interference_index(
joined: list[JsonDict],
engine_state_by_worker: dict[str, list[JsonDict]],
) -> JsonDict:
"""Label each completed request's decode period as overlap / no-overlap.
A request 'overlaps same-worker prefill' if any scheduler step on the
chosen worker between (t_first_token_unix, t_finish_unix) had
prefill_tokens > 0 from a request other than this one.
"""
if not joined or not engine_state_by_worker:
return {"status": "unavailable",
"reason": "missing joined records or engine state"}
tpot_overlap: list[float] = []
tpot_clean: list[float] = []
for r in joined:
rid = r.get("request_id")
worker = _normalize_worker(r.get("endpoint_url") or r.get("routed_to"))
steps = engine_state_by_worker.get(worker)
if not steps:
continue
t0 = r.get("t_first_token_unix")
t1 = r.get("t_finish_unix")
tpot = r.get("tpot_s")
if t0 is None or t1 is None or tpot is None or r.get("error"):
continue
overlap = False
for s in steps:
t = s.get("t_unix")
if t is None or t < t0 or t > t1:
continue
if not s.get("prefill_tokens"):
continue
# If the only prefill belongs to *this* request, that's still
# this request's own prefill warming up, not interference.
other_prefill = False
for pr in s.get("per_req", []) or []:
if pr.get("phase") == "prefill" and pr.get("rid") != rid:
other_prefill = True
break
if other_prefill:
overlap = True
break
(tpot_overlap if overlap else tpot_clean).append(float(tpot))
p90_overlap = _percentile(tpot_overlap, 0.90) if tpot_overlap else None
p90_clean = _percentile(tpot_clean, 0.90) if tpot_clean else None
idx = None
if p90_overlap is not None and p90_clean and p90_clean > 0:
idx = p90_overlap / p90_clean
return {
"status": "supported" if idx is not None else "partial",
"n_overlap_requests": len(tpot_overlap),
"n_clean_requests": len(tpot_clean),
"tpot_p90_overlap_s": p90_overlap,
"tpot_p90_clean_s": p90_clean,
"interference_index": idx,
}
def _normalize_worker(url_or_id: str | None) -> str | None:
"""Map endpoint URLs to AGENTIC_WORKER_ID convention engine_{i}."""
if not url_or_id:
return None
if url_or_id.startswith("engine_"):
return url_or_id
# Endpoint URLs look like http://host:8000; map by port offset against 8000.
try:
port = int(url_or_id.rsplit(":", 1)[1].split("/")[0])
return f"engine_{port - 8000}"
except (ValueError, IndexError):
return url_or_id
# ---------- Hotspot index (B3) ------------------------------------------
def hotspot_index(joined: list[JsonDict]) -> JsonDict:
"""max/median per-worker queue-delay p90 across completed requests."""
if not joined:
return {"status": "unavailable"}
by_worker_queue: dict[str, list[float]] = defaultdict(list)
by_worker_latency: dict[str, list[float]] = defaultdict(list)
for r in joined:
if r.get("error"):
continue
worker = r.get("routed_to") or r.get("endpoint_url")
if not worker:
continue
# queue delay proxy: (t_first_token - t_dispatch) - prefill estimate
# is fragile; use raw TTFT as the queue-stressing signal.
ttft = r.get("ttft_s")
lat = r.get("latency_s")
if ttft is not None:
by_worker_queue[worker].append(float(ttft))
if lat is not None:
by_worker_latency[worker].append(float(lat))
worker_p90_q: dict[str, float] = {
w: _percentile(v, 0.90) for w, v in by_worker_queue.items() if v
}
worker_p90_lat: dict[str, float] = {
w: _percentile(v, 0.90) for w, v in by_worker_latency.items() if v
}
p90s_q = sorted(worker_p90_q.values())
idx = None
if len(p90s_q) >= 2:
median = p90s_q[len(p90s_q) // 2]
if median > 0:
idx = max(p90s_q) / median
return {
"status": "supported" if idx is not None else "partial",
"per_worker_ttft_p90_s": worker_p90_q,
"per_worker_latency_p90_s": worker_p90_lat,
"hotspot_index_ttft_p90": idx,
}
# ---------- Failure label (B5) ------------------------------------------
DEFAULT_SLO = {
"ttft_p90_s": 2.0,
"tpot_p90_s": 0.15,
}
def label_slow_requests(
joined: list[JsonDict],
engine_state_by_worker: dict[str, list[JsonDict]],
slo: JsonDict | None = None,
slow_ttft_factor: float = 2.0,
) -> list[JsonDict]:
slo = slo or DEFAULT_SLO
ttft_threshold = float(slo["ttft_p90_s"]) * slow_ttft_factor
# Per-worker queue p90 to flag hot workers
by_worker_ttft: dict[str, list[float]] = defaultdict(list)
for r in joined:
if r.get("ttft_s") is not None:
by_worker_ttft[r.get("routed_to") or ""].append(float(r["ttft_s"]))
worker_p90 = {w: _percentile(v, 0.90) for w, v in by_worker_ttft.items() if v}
global_p90 = _percentile(
[v for vs in by_worker_ttft.values() for v in vs], 0.90,
) if by_worker_ttft else None
hot_workers = {w for w, p in worker_p90.items()
if global_p90 and p > global_p90 * 1.2}
labels: list[JsonDict] = []
for r in joined:
ttft = r.get("ttft_s")
if ttft is None or r.get("error"):
continue
if ttft <= ttft_threshold:
continue
label = _assign_label(r, hot_workers, engine_state_by_worker)
labels.append({
"request_id": r.get("request_id"),
"session_id": r.get("session_id"),
"routed_to": r.get("routed_to"),
"ttft_s": ttft,
"latency_s": r.get("latency_s"),
"input_length": r.get("input_length"),
"cached_tokens": r.get("cached_tokens"),
"estimated_new_tokens": r.get("estimated_new_tokens"),
"label": label,
})
return labels
def _assign_label(
r: JsonDict,
hot_workers: set[str],
engine_state_by_worker: dict[str, list[JsonDict]],
) -> str:
worker = _normalize_worker(r.get("endpoint_url") or r.get("routed_to"))
rid = r.get("request_id")
steps = engine_state_by_worker.get(worker, [])
t0 = r.get("t_first_token_unix")
t1 = r.get("t_finish_unix")
if steps and t0 and t1:
for s in steps:
t = s.get("t_unix")
if t is None or t < t0 or t > t1:
continue
for pr in s.get("per_req", []) or []:
if pr.get("phase") == "prefill" and pr.get("rid") != rid:
return "same_worker_prefill_overlap"
if (r.get("routed_to") or "") in hot_workers:
return "hot_worker_queue"
est = r.get("estimated_new_tokens") or 0
inp = r.get("input_length") or 0
if est and inp and est >= 0.5 * inp:
return "cache_miss_large_append"
snap = r.get("worker_state_at_decision") or []
if snap:
chosen_idx = r.get("chosen_idx")
if isinstance(chosen_idx, int) and 0 <= chosen_idx < len(snap):
cached = snap[chosen_idx].get("cached_blocks", 0)
if cached and cached > 50_000:
return "high_kv_occupancy"
return "unknown"
# ---------- Window summary (B4) -----------------------------------------
def window_summary(joined: list[JsonDict], run_meta: JsonDict | None) -> JsonDict:
if not run_meta:
return {"status": "unavailable", "reason": "no run_meta"}
warmup_end = float(run_meta["warmup_end_unix"])
steady_end = float(run_meta["steady_end_unix"])
buckets: dict[str, list[JsonDict]] = {"warmup": [], "steady": [], "drain": []}
for r in joined:
t = r.get("t_dispatch_unix")
if t is None:
continue
if t < warmup_end:
buckets["warmup"].append(r)
elif t < steady_end:
buckets["steady"].append(r)
else:
buckets["drain"].append(r)
out: JsonDict = {"run_meta": run_meta, "windows": {}}
for name, rows in buckets.items():
ttft = [r["ttft_s"] for r in rows if r.get("ttft_s") is not None]
tpot = [r["tpot_s"] for r in rows if r.get("tpot_s") is not None]
e2e = [r["latency_s"] for r in rows if r.get("latency_s") is not None]
errs = sum(1 for r in rows if r.get("error"))
out["windows"][name] = {
"attempted": len(rows),
"completed": len(rows) - errs,
"errored": errs,
"ttft_p50_s": _percentile(ttft, 0.50) if ttft else None,
"ttft_p90_s": _percentile(ttft, 0.90) if ttft else None,
"tpot_p50_s": _percentile(tpot, 0.50) if tpot else None,
"tpot_p90_s": _percentile(tpot, 0.90) if tpot else None,
"e2e_p50_s": _percentile(e2e, 0.50) if e2e else None,
"e2e_p90_s": _percentile(e2e, 0.90) if e2e else None,
}
return out
# ---------- helpers -----------------------------------------------------
def _percentile(values: list[float], pct: float) -> float | None:
if not values:
return None
sorted_vals = sorted(values)
if len(sorted_vals) == 1:
return sorted_vals[0]
rank = pct * (len(sorted_vals) - 1)
lo = int(rank)
hi = min(lo + 1, len(sorted_vals) - 1)
frac = rank - lo
return sorted_vals[lo] * (1 - frac) + sorted_vals[hi] * frac
def load_engine_state(dir_path: Path) -> dict[str, list[JsonDict]]:
"""Load all engine_*.jsonl files from a directory; key by worker id."""
if not dir_path.exists() or not dir_path.is_dir():
return {}
by_worker: dict[str, list[JsonDict]] = {}
for p in sorted(dir_path.glob("engine_*.jsonl")):
worker_id = p.stem # 'engine_0', etc.
rows = load_jsonl(p)
# Sort steps by time for binary-search-friendly access.
rows.sort(key=lambda r: r.get("t_unix") or 0.0)
by_worker[worker_id] = rows
return by_worker
# ---------- CLI ---------------------------------------------------------
def main(argv: list[str] | None = None) -> None:
p = argparse.ArgumentParser(description="A5 joined analysis")
p.add_argument("--metrics", type=Path, required=True)
p.add_argument("--breakdown", type=Path, default=None)
p.add_argument("--worker-state", type=Path, default=None)
p.add_argument("--engine-state-dir", type=Path, default=None,
help="Directory containing engine_*.jsonl from A3 patch")
p.add_argument("--run-meta", type=Path, default=None,
help="run_meta or window_summary.json from SRR loadgen")
p.add_argument("--out-dir", type=Path, required=True)
p.add_argument("--slow-ttft-factor", type=float, default=2.0)
args = p.parse_args(argv)
metrics = load_jsonl(args.metrics)
breakdown_raw = load_json(args.breakdown) if args.breakdown else []
if isinstance(breakdown_raw, dict):
breakdown_raw = breakdown_raw.get("records", [breakdown_raw])
breakdown = list(breakdown_raw or [])
worker_state_raw = load_json(args.worker_state) if args.worker_state else []
if isinstance(worker_state_raw, dict):
worker_state_raw = worker_state_raw.get("records", [worker_state_raw])
worker_state = list(worker_state_raw or [])
engine_state = (
load_engine_state(args.engine_state_dir) if args.engine_state_dir else {}
)
run_meta = load_json(args.run_meta) if args.run_meta else None
joined = build_joined_records(metrics, breakdown, worker_state)
args.out_dir.mkdir(parents=True, exist_ok=True)
write_jsonl(args.out_dir / "joined.jsonl", joined)
write_json(args.out_dir / "reuse_decomposition.json",
reuse_decomposition(joined))
write_json(args.out_dir / "interference_index.json",
interference_index(joined, engine_state))
write_json(args.out_dir / "hotspot_index.json",
hotspot_index(joined))
labels = label_slow_requests(joined, engine_state,
slow_ttft_factor=args.slow_ttft_factor)
write_jsonl(args.out_dir / "failure_label.jsonl", labels)
counts: dict[str, int] = defaultdict(int)
for L in labels:
counts[L["label"]] += 1
write_json(args.out_dir / "failure_breakdown.json",
{"counts": dict(counts), "n_slow": len(labels)})
write_json(args.out_dir / "window_summary.json",
window_summary(joined, run_meta))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,170 @@
"""Tests for A5 joined analysis: join + indices + labels."""
from __future__ import annotations
from analysis.characterization.joined_analysis import (
build_joined_records,
hotspot_index,
interference_index,
label_slow_requests,
reuse_decomposition,
window_summary,
_normalize_worker,
_percentile,
)
def _mk_metric(rid, **kw):
base = {
"request_id": rid, "session_id": "s1", "turn_id": 0,
"trace_timestamp_s": 1.0, "input_length": 1000, "output_length": 50,
"cached_tokens": 0, "actual_output_tokens": 50,
"latency_s": 1.0, "ttft_s": 0.5, "tpot_s": 0.04,
"t_dispatch_unix": 1000.0, "t_first_token_unix": 1000.5,
"t_finish_unix": 1001.0, "endpoint_url": "http://h:8000",
"trace_hash_ids": [], "error": None,
}
base.update(kw)
return base
def test_build_joined_records_merges_by_request_id():
metrics = [_mk_metric("r1"), _mk_metric("r2")]
breakdown = [{"request_id": "r1", "policy": "lmetric", "chosen_idx": 3,
"estimated_new_tokens": 500, "routed_to": "http://h:8000"}]
worker_state = [{"request_id": "r2", "workers": [{"idx": 0, "url": "x"}]}]
joined = build_joined_records(metrics, breakdown, worker_state)
assert len(joined) == 2
j_by_id = {r["request_id"]: r for r in joined}
assert j_by_id["r1"]["policy"] == "lmetric"
assert j_by_id["r1"]["chosen_idx"] == 3
assert j_by_id["r1"]["estimated_new_tokens"] == 500
assert j_by_id["r2"]["worker_state_at_decision"][0]["url"] == "x"
assert j_by_id["r2"].get("policy") is None # no breakdown for r2
def test_reuse_decomposition_classifies_intra_and_cross():
records = [
_mk_metric("r1", session_id="A", trace_hash_ids=[11],
cached_tokens=0, t_dispatch_unix=1.0),
_mk_metric("r2", session_id="A", trace_hash_ids=[11],
cached_tokens=100, t_dispatch_unix=2.0),
_mk_metric("r3", session_id="B", trace_hash_ids=[11],
cached_tokens=100, t_dispatch_unix=3.0),
]
out = reuse_decomposition(records)
assert out["status"] == "supported"
assert out["intra_session_tokens"] > 0
assert out["cross_session_tokens"] > 0
fr = out["fractions"]
assert abs(sum(fr.values()) - 1.0) < 1e-9
def test_normalize_worker_maps_port_to_engine_id():
assert _normalize_worker("http://node:8000") == "engine_0"
assert _normalize_worker("http://node:8005/foo") == "engine_5"
assert _normalize_worker("engine_3") == "engine_3"
assert _normalize_worker(None) is None
def test_interference_index_marks_overlap_when_other_request_prefilling():
metrics = [
_mk_metric("decode_target", endpoint_url="http://h:8000",
t_first_token_unix=10.0, t_finish_unix=11.0,
tpot_s=0.10),
_mk_metric("decode_clean", endpoint_url="http://h:8001",
t_first_token_unix=20.0, t_finish_unix=21.0,
tpot_s=0.04),
]
joined = build_joined_records(metrics, [], [])
engine_state = {
"engine_0": [
{"t_unix": 10.5, "prefill_tokens": 8000,
"per_req": [{"rid": "other", "phase": "prefill"}]},
],
"engine_1": [
{"t_unix": 20.5, "prefill_tokens": 0,
"per_req": [{"rid": "decode_clean", "phase": "decode"}]},
],
}
out = interference_index(joined, engine_state)
assert out["status"] == "supported"
assert out["n_overlap_requests"] == 1
assert out["n_clean_requests"] == 1
assert out["interference_index"] is not None
# Overlap p90 = 0.10; clean p90 = 0.04; ratio > 2
assert out["interference_index"] > 2.0
def test_hotspot_index_max_over_median_p90():
"""One hot worker with TTFT 10x the others should drive a >1 index."""
rows = []
for i in range(3):
for _ in range(10):
rows.append({
"request_id": f"x{i}", "routed_to": f"http://h:800{i}",
"endpoint_url": f"http://h:800{i}",
"ttft_s": 0.5 if i < 2 else 5.0, "latency_s": 1.0,
"error": None,
})
out = hotspot_index(rows)
assert out["status"] == "supported"
assert out["hotspot_index_ttft_p90"] > 5.0
def test_label_slow_requests_flags_overlap_and_hot_worker():
metrics = [
_mk_metric("slow_overlap", ttft_s=10.0,
t_first_token_unix=10.0, t_finish_unix=11.0),
_mk_metric("slow_no_overlap", ttft_s=10.0,
endpoint_url="http://h:8005",
t_first_token_unix=20.0, t_finish_unix=21.0),
_mk_metric("fast", ttft_s=0.5,
t_first_token_unix=15.0, t_finish_unix=16.0),
]
metrics[0]["routed_to"] = "http://h:8000"
metrics[1]["routed_to"] = "http://h:8005"
metrics[2]["routed_to"] = "http://h:8000"
bk = [
{"request_id": "slow_overlap", "routed_to": "http://h:8000"},
{"request_id": "slow_no_overlap", "routed_to": "http://h:8005"},
{"request_id": "fast", "routed_to": "http://h:8000"},
]
joined = build_joined_records(metrics, bk, [])
engine_state = {
"engine_0": [{"t_unix": 10.5, "prefill_tokens": 5000,
"per_req": [{"rid": "other", "phase": "prefill"}]}],
}
labels = label_slow_requests(joined, engine_state, slow_ttft_factor=2.0)
by_id = {L["request_id"]: L["label"] for L in labels}
assert by_id.get("slow_overlap") == "same_worker_prefill_overlap"
assert "fast" not in by_id
assert "slow_no_overlap" in by_id
def test_window_summary_buckets_by_dispatch_unix():
run_meta = {
"run_start_unix": 1000.0,
"warmup_end_unix": 1010.0,
"steady_end_unix": 1030.0,
"drain_end_unix": 1040.0,
}
joined = [
_mk_metric("w", t_dispatch_unix=1005.0, ttft_s=0.5, latency_s=1.0,
tpot_s=0.04),
_mk_metric("s", t_dispatch_unix=1020.0, ttft_s=0.6, latency_s=1.5,
tpot_s=0.05),
_mk_metric("d", t_dispatch_unix=1035.0, ttft_s=0.7, latency_s=2.0,
tpot_s=0.06),
]
out = window_summary(joined, run_meta)
assert out["windows"]["warmup"]["attempted"] == 1
assert out["windows"]["steady"]["attempted"] == 1
assert out["windows"]["drain"]["attempted"] == 1
assert out["windows"]["steady"]["ttft_p90_s"] is not None
def test_percentile_helper_handles_singleton():
assert _percentile([5.0], 0.99) == 5.0
assert _percentile([], 0.50) is None