Files
agentic-pd-hybrid/scripts/analysis/stratified.py
Gahow Wang 4021f27ee2 feat(analysis): stratified latency / TTFT reporter
Implements docs/EVALUATION_PROTOCOL_ZH.md §1.3 (M3 fix):
headline numbers must be accompanied by stratified
breakdowns so reviewers can see which slice the gains
come from.

The script reads one or more request-metrics.jsonl files
and buckets rows along four orthogonal dimensions:
  - turn_id        : {1, 2-5, 6-20, 21+}
  - input_length   : {<=8K, 8K-64K, >64K}
  - overlap_ratio  : {<=0.3, 0.3-0.7, >0.7}
  - append_tokens  : {<=128, 128-1K, 1K-8K, >8K}

Per bucket: n, n_ok, err_pct, latency/ttft mean+p50+p90+p99.
Output is markdown by default; pass --json for machine-readable output.

stdlib only — no pandas/numpy. Verified on a synthetic
5-row jsonl (turn=1 with one error correctly reports
33.3% err% on the bucket).

Why this script and not pandas:
  - the existing scripts/analysis/* are stdlib-only, and
    this script stays consistent with them
  - reviewers can run it on the artifact without
    pip-installing anything beyond pytest
  - speed irrelevant; runs in <1s on the largest existing
    sweep (4449 rows)

Usage shown in EVALUATION_PROTOCOL_ZH §3.
2026-05-12 23:57:13 +08:00

228 lines
7.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""Stratified latency / TTFT reporter for paper-quality evaluation.
Implements docs/EVALUATION_PROTOCOL_ZH.md §1.3 (M3 fix): every headline
number must be accompanied by a stratified breakdown so reviewers can
see which slice the gains come from.
Buckets the request rows from one or more metrics.jsonl files along:
- turn_id : {1, 2-5, 6-20, 21+}
- input_length : {<=8K, 8K-64K, >64K}
- overlap_ratio : {<=0.3, 0.3-0.7, >0.7}
- append_tokens : {<=128, 128-1K, 1K-8K, >8K}, computed as max(0, input_length - observed_overlap_blocks * BLOCK_SIZE)
For each bucket, reports:
- n (total rows in bucket)
- n_ok (rows with no error and latency_s set)
- latency_s mean / p50 / p90 / p99
- ttft_s mean / p50 / p90 / p99
- err_pct (1 - n_ok/n)
Usage:
scripts/analysis/stratified.py outputs/<run>/request-metrics.jsonl \
[outputs/<other-run>/request-metrics.jsonl ...]
scripts/analysis/stratified.py --dim turn_id outputs/<run>/request-metrics.jsonl
scripts/analysis/stratified.py --json outputs/<run>/request-metrics.jsonl > strat.json
stdlib only — no pandas/numpy. Runs without GPU and without SGLang.
"""
from __future__ import annotations
import argparse
import json
import math
import sys
from collections import defaultdict
from pathlib import Path
from typing import Iterable
# Multiplier applied to a row's ``observed_overlap_blocks`` to get a token count.
BLOCK_SIZE = 24  # SGLang radix block, matches docs/KVC_ROUTER_ALGORITHM.md §2

# Each bucket table is an ordered list of ``(label, (lo, hi))`` pairs.  Both
# bounds are inclusive and ``_bucket_for`` returns the first match, so a value
# sitting exactly on a shared edge (e.g. overlap 0.3) lands in the earlier
# bucket.  ``1_000_000_000`` serves as the open-ended upper bound.

TURN_BUCKETS: list[tuple[str, tuple[int, int]]] = [
    ("turn=1", (1, 1)),
    ("turn=2-5", (2, 5)),
    ("turn=6-20", (6, 20)),
    ("turn=21+", (21, 1_000_000_000)),
]

INPUT_BUCKETS: list[tuple[str, tuple[int, int]]] = [
    ("input<=8K", (0, 8_192)),
    ("input=8K-64K", (8_193, 65_536)),
    ("input>64K", (65_537, 1_000_000_000)),
]

OVERLAP_BUCKETS: list[tuple[str, tuple[float, float]]] = [
    ("overlap<=0.3", (0.0, 0.3)),
    ("overlap=0.3-0.7", (0.3, 0.7)),
    # Upper bound sits slightly above 1.0 so a ratio of exactly 1.0 matches.
    ("overlap>0.7", (0.7, 1.0001)),
]

APPEND_BUCKETS: list[tuple[str, tuple[int, int]]] = [
    ("append<=128", (0, 128)),
    ("append=128-1K", (129, 1_024)),
    ("append=1K-8K", (1_025, 8_192)),
    ("append>8K", (8_193, 1_000_000_000)),
]

# Dimension name -> bucket table.  Insertion order is the order in which the
# dimensions are reported when every dimension is requested.
DIM_BUCKETS: dict[str, list[tuple[str, tuple]]] = dict(
    turn_id=TURN_BUCKETS,
    input_length=INPUT_BUCKETS,
    overlap_ratio=OVERLAP_BUCKETS,
    append_tokens=APPEND_BUCKETS,
)
def _quantile(values: list[float], q: float) -> float:
"""Linear-interpolation quantile, stdlib only."""
if not values:
return float("nan")
s = sorted(values)
if len(s) == 1:
return s[0]
pos = (len(s) - 1) * q
lo = math.floor(pos)
hi = math.ceil(pos)
if lo == hi:
return s[lo]
return s[lo] + (s[hi] - s[lo]) * (pos - lo)
def _stats(values: list[float]) -> dict[str, float]:
if not values:
return {"mean": float("nan"), "p50": float("nan"), "p90": float("nan"), "p99": float("nan")}
return {
"mean": sum(values) / len(values),
"p50": _quantile(values, 0.50),
"p90": _quantile(values, 0.90),
"p99": _quantile(values, 0.99),
}
def _bucket_for(value: float | int, buckets: list[tuple[str, tuple]]) -> str:
for label, (lo, hi) in buckets:
if lo <= value <= hi:
return label
return "OOB"
def _classify(row: dict, dim: str) -> str:
    """Return the bucket label of ``row`` along the dimension ``dim``.

    Fields are read from ``row`` as integers.  A field that is absent *or*
    explicitly ``None`` (JSON ``null``) counts as 0 instead of raising, so
    partially populated rows can still be classified.

    Derived dimensions:
      - ``overlap_ratio``: ``observed_overlap_blocks * BLOCK_SIZE`` divided by
        ``input_length`` (floored at 1 to avoid division by zero), capped at 1.0.
      - ``append_tokens``: ``input_length`` minus
        ``observed_overlap_blocks * BLOCK_SIZE``, floored at 0.

    Args:
        row: One decoded metrics record.
        dim: One of ``"turn_id"``, ``"input_length"``, ``"overlap_ratio"``,
            ``"append_tokens"``.

    Returns:
        The bucket label, or ``"OOB"`` when the value fits no bucket.

    Raises:
        ValueError: If ``dim`` is not a known dimension.
    """

    def _int_field(key: str) -> int:
        # ``row.get(key, 0)`` would hand None to int() when the key is present
        # with a null value; treat null exactly like a missing key.
        raw = row.get(key)
        return 0 if raw is None else int(raw)

    if dim == "turn_id":
        return _bucket_for(_int_field("turn_id"), TURN_BUCKETS)
    if dim == "input_length":
        return _bucket_for(_int_field("input_length"), INPUT_BUCKETS)
    if dim in ("overlap_ratio", "append_tokens"):
        input_length = _int_field("input_length")
        cached_tokens = _int_field("observed_overlap_blocks") * BLOCK_SIZE
        if dim == "overlap_ratio":
            ratio = min(1.0, cached_tokens / max(1, input_length))
            return _bucket_for(ratio, OVERLAP_BUCKETS)
        return _bucket_for(max(0, input_length - cached_tokens), APPEND_BUCKETS)
    raise ValueError(f"Unknown dim: {dim}")
def load_rows(paths: Iterable[Path]) -> list[dict]:
    """Load every JSON record from the given JSON-lines files.

    Files are read as UTF-8 regardless of the platform's default encoding.
    Lines that are empty after stripping whitespace are skipped; each remaining
    line is decoded with ``json.loads``.  Records from all files are returned
    in one list, in file order and then line order.

    Args:
        paths: Files to read, one JSON document per line.

    Returns:
        The decoded records (an empty list when ``paths`` is empty).

    Raises:
        OSError: If a file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.  The
            message is prefixed with ``<path>:<line number>`` so the offending
            line can be located when several files are loaded at once.
    """
    rows: list[dict] = []
    for path in paths:
        with path.open(encoding="utf-8") as handle:
            for line_number, raw_line in enumerate(handle, start=1):
                text = raw_line.strip()
                if not text:
                    continue
                try:
                    rows.append(json.loads(text))
                except json.JSONDecodeError as err:
                    # Same exception type as before, with file/line context.
                    raise json.JSONDecodeError(
                        f"{path}:{line_number}: {err.msg}", err.doc, err.pos
                    ) from err
    return rows
def stratify(rows: list[dict], dim: str) -> dict[str, dict]:
    """Group ``rows`` into the buckets of ``dim`` and summarise each bucket.

    Every bucket declared in ``DIM_BUCKETS[dim]`` appears in the result, in
    declaration order, even when it is empty.  Rows that match no declared
    bucket (label ``"OOB"``) are reported under an extra trailing ``"OOB"``
    entry instead of being dropped, so the per-bucket ``n`` values always add
    up to ``len(rows)``.  The ``"OOB"`` entry is present only when at least
    one such row exists.

    A row counts as "ok" when its ``error`` field is missing or ``None`` and
    its ``latency_s`` field is present and not ``None``.  Latency statistics
    use all ok rows; TTFT statistics use the ok rows whose ``ttft_s`` is not
    ``None``.

    Args:
        rows: Decoded metrics records.
        dim: Dimension name (a key of ``DIM_BUCKETS``).

    Returns:
        Mapping of bucket label to a dict with keys ``n``, ``n_ok``,
        ``err_pct`` (fraction of rows that are not ok, 0.0 for an empty
        bucket), ``latency_s`` and ``ttft_s`` (each the output of ``_stats``).
    """
    by_bucket: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        by_bucket[_classify(row, dim)].append(row)

    labels = [label for label, _ in DIM_BUCKETS[dim]]
    # Keys of by_bucket exist only for labels that received at least one row,
    # so this appends the out-of-bounds label only when it is non-empty.
    labels.extend(label for label in by_bucket if label not in labels)

    output: dict[str, dict] = {}
    for label in labels:
        bucket_rows = by_bucket.get(label, [])
        ok_rows = [
            r
            for r in bucket_rows
            if r.get("error") is None and r.get("latency_s") is not None
        ]
        n = len(bucket_rows)
        n_ok = len(ok_rows)
        output[label] = {
            "n": n,
            "n_ok": n_ok,
            "err_pct": (n - n_ok) / n if n else 0.0,
            "latency_s": _stats([float(r["latency_s"]) for r in ok_rows]),
            "ttft_s": _stats(
                [float(r["ttft_s"]) for r in ok_rows if r.get("ttft_s") is not None]
            ),
        }
    return output
def render_table(name: str, stats: dict[str, dict]) -> str:
    """Render the per-bucket statistics of one dimension as a markdown table.

    Rows follow the bucket order declared in ``DIM_BUCKETS[name]``.  Any
    additional entries found in ``stats`` (for example an ``"OOB"`` bucket for
    rows outside every declared range) are appended afterwards in their
    existing order rather than being silently left out of the table.

    Args:
        name: Dimension name (a key of ``DIM_BUCKETS``); also used in the
            section heading.
        stats: Mapping of bucket label to a dict with keys ``n``, ``n_ok``,
            ``err_pct``, ``latency_s`` and ``ttft_s``; the last two are dicts
            with keys ``mean``, ``p50``, ``p90`` and ``p99``.

    Returns:
        The heading, header row, alignment row and one row per bucket, joined
        with newlines (no trailing newline).

    Raises:
        KeyError: If ``name`` is unknown or ``stats`` lacks a declared bucket.
    """
    declared = [label for label, _ in DIM_BUCKETS[name]]
    extras = [label for label in stats if label not in declared]
    stat_keys = ("mean", "p50", "p90", "p99")

    lines = [
        f"## stratified by {name}",
        "",
        "| bucket | n | n_ok | err% | lat mean | lat p50 | lat p90 | lat p99 | ttft mean | ttft p50 | ttft p90 | ttft p99 |",
        "|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|",
    ]
    for label in declared + extras:
        entry = stats[label]
        latency = entry["latency_s"]
        ttft = entry["ttft_s"]
        cells = [
            f"{label}",
            f"{entry['n']}",
            f"{entry['n_ok']}",
            f"{entry['err_pct']:.1%}",
            *(f"{latency[key]:.3f}" for key in stat_keys),
            *(f"{ttft[key]:.3f}" for key in stat_keys),
        ]
        lines.append("| " + " | ".join(cells) + " |")
    return "\n".join(lines)
def _json_safe(value):
    """Return a copy of ``value`` with non-finite floats replaced by ``None``.

    Dicts and lists/tuples are walked recursively (tuples come back as lists,
    which is how JSON would encode them anyway); every other value is
    returned unchanged.
    """
    if isinstance(value, float) and not math.isfinite(value):
        return None
    if isinstance(value, dict):
        return {key: _json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_json_safe(item) for item in value]
    return value


def main() -> None:
    """Command-line entry point: load metrics, stratify, print the report.

    Reads every path given on the command line, stratifies the combined rows
    along the requested dimension (all dimensions by default) and writes the
    report to stdout as markdown tables, or as JSON when ``--json`` is given.
    Exits with status 1 and a message on stderr when no rows were loaded.

    In JSON mode, statistics of empty buckets (NaN) are emitted as ``null``.
    """
    # __doc__ is None when docstrings are stripped (python -OO).
    parser = argparse.ArgumentParser(description=(__doc__ or "").split("\n\n")[0])
    parser.add_argument("metrics_paths", nargs="+", type=Path)
    parser.add_argument(
        "--dim",
        choices=list(DIM_BUCKETS.keys()) + ["all"],
        default="all",
        help="stratification dimension (default: all four)",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="emit JSON instead of markdown tables",
    )
    args = parser.parse_args()

    rows = load_rows(args.metrics_paths)
    if not rows:
        print("no rows loaded", file=sys.stderr)
        sys.exit(1)

    dims = list(DIM_BUCKETS.keys()) if args.dim == "all" else [args.dim]
    result = {dim: stratify(rows, dim) for dim in dims}

    if args.json:
        # json.dump's ``default=`` hook is only consulted for objects the
        # encoder cannot serialise; floats are handled natively, so NaN would
        # be written as the non-standard token ``NaN``.  Sanitise the data
        # first and let allow_nan=False guarantee strict JSON output.
        json.dump(_json_safe(result), sys.stdout, indent=2, allow_nan=False)
        sys.stdout.write("\n")
        return

    header_paths = ", ".join(str(p) for p in args.metrics_paths)
    print(f"# stratified report ({len(rows)} rows from {header_paths})\n")
    for dim in dims:
        print(render_table(dim, result[dim]))
        print()


if __name__ == "__main__":
    main()