Files
agentic-kvc/replayer/metrics.py
Gahow Wang 0ed1ce200e metrics: replace round-based percentile with linear interpolation (B5)
The previous implementation used round((n-1) * pct), which never interpolates:
on even-length arrays it returned a single middle element instead of the
midpoint, and Python's banker's rounding decides which one (e.g. p50 of
[1,2,3,4] returned 3 instead of 2.5, while p50 of [1,2] returned 1 instead
of 1.5). Summary JSONs were biased at p50 as a result. Match numpy.percentile's
default linear interpolation between the two adjacent sorted values.
2026-05-23 21:00:24 +08:00

112 lines
3.5 KiB
Python

"""Per-request metrics collection and summary reporting."""
from __future__ import annotations
import asyncio
import json
import statistics
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any
@dataclass(frozen=True)
class RequestMetrics:
    """Immutable record of what was measured for a single request.

    The first twelve fields have no default and must always be supplied;
    the last four default to ``None``. Instances are frozen, so a record
    cannot be modified after it has been created.
    """

    # --- identity -------------------------------------------------------
    request_id: str
    session_id: str
    turn_id: int
    trace_timestamp_s: float  # presumably seconds, per the `_s` suffix

    # --- request shape --------------------------------------------------
    input_length: int
    output_length: int
    request_type: str
    effective_input_length: int | None
    # Summed against input_length by write_summary_json to form the
    # prefix-cache hit ratio.
    cached_tokens: int

    # --- timings (presumably seconds, per the `_s` suffix) --------------
    # None-valued timings are left out of the summary statistics.
    latency_s: float | None
    ttft_s: float | None
    tpot_s: float | None

    # --- optional outcome details ---------------------------------------
    actual_output_tokens: int | None = None
    requested_output_tokens: int | None = None
    finish_reason: str | None = None
    # A non-None value marks the request as failed in write_summary_json.
    error: str | None = None
class IncrementalMetricSink:
"""Append each RequestMetrics to JSONL immediately (crash-safe)."""
def __init__(self, path: Path):
self.path = path
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("")
self._lock = asyncio.Lock()
self._fh = path.open("a", encoding="utf-8", buffering=1)
async def append(self, metric: RequestMetrics) -> None:
line = json.dumps(asdict(metric), sort_keys=True) + "\n"
async with self._lock:
self._fh.write(line)
self._fh.flush()
def close(self) -> None:
try:
self._fh.flush()
self._fh.close()
except Exception:
pass
def write_summary_json(path: Path, rows: list[RequestMetrics]) -> None:
    """Aggregate ``rows`` into one summary document and write it as JSON.

    Rows whose ``error`` is ``None`` count as successful. Only successful
    rows contribute to the timing statistics, the token totals and the cache
    figures; the request and error counts cover every row. The prefix-cache
    hit ratio is total cached tokens divided by total input tokens, or
    ``0.0`` when no input tokens were recorded.

    Args:
        path: Output file; missing parent directories are created and an
            existing file is overwritten.
        rows: Per-request metrics to summarise.
    """
    ok_rows = [row for row in rows if row.error is None]

    input_token_sum = sum(row.input_length for row in ok_rows)
    cached_token_sum = sum(row.cached_tokens for row in ok_rows)
    if input_token_sum > 0:
        hit_ratio = cached_token_sum / input_token_sum
    else:
        hit_ratio = 0.0

    latency_samples = [row.latency_s for row in ok_rows if row.latency_s is not None]
    ttft_samples = [row.ttft_s for row in ok_rows if row.ttft_s is not None]
    tpot_samples = [row.tpot_s for row in ok_rows if row.tpot_s is not None]
    cached_samples = [float(row.cached_tokens) for row in ok_rows]
    output_samples = [
        float(row.actual_output_tokens)
        for row in ok_rows
        if row.actual_output_tokens is not None
    ]
    rows_with_cache_hit = [row for row in ok_rows if row.cached_tokens > 0]

    summary: dict[str, Any] = {}
    summary["request_count"] = len(rows)
    summary["success_count"] = len(ok_rows)
    # Every row is either successful or errored, so the two counts add up.
    summary["error_count"] = len(rows) - len(ok_rows)
    summary["latency_stats_s"] = _stats(latency_samples)
    summary["ttft_stats_s"] = _stats(ttft_samples)
    summary["tpot_stats_s"] = _stats(tpot_samples)
    summary["cache_hit_request_count"] = len(rows_with_cache_hit)
    summary["total_input_tokens"] = input_token_sum
    summary["total_cached_tokens"] = cached_token_sum
    summary["prefix_cache_hit_ratio"] = hit_ratio
    summary["cached_tokens_stats"] = _stats(cached_samples)
    summary["actual_output_tokens_stats"] = _stats(output_samples)

    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as out:
        # sort_keys makes the on-disk key order independent of insertion order.
        json.dump(summary, out, indent=2, sort_keys=True)
def _stats(values: list[float | None]) -> dict[str, float] | None:
clean = [v for v in values if v is not None]
if not clean:
return None
clean.sort()
return {
"count": float(len(clean)),
"mean": statistics.fmean(clean),
"p50": _percentile(clean, 0.50),
"p90": _percentile(clean, 0.90),
"p99": _percentile(clean, 0.99),
}
def _percentile(sorted_vals: list[float], pct: float) -> float:
n = len(sorted_vals)
if n == 1:
return sorted_vals[0]
rank = pct * (n - 1)
lo = int(rank)
hi = min(lo + 1, n - 1)
frac = rank - lo
return sorted_vals[lo] * (1 - frac) + sorted_vals[hi] * frac