RequestMetrics gains absolute Unix timestamps (t_dispatch_unix, t_first_token_unix, t_finish_unix), the proxy_request_id, the chosen endpoint URL, and the trace hash_ids. The replayer sends X-Request-Id: <session_id>:<turn_id>:<chat_id>:<idx> so that proxy breakdown rows can be joined to metrics by an exact key. These changes are required by Batch 0 (online sequentiality proof) and the Batch 1 reuse decomposition; the existing metrics.jsonl could establish neither. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
118 lines
3.7 KiB
Python
118 lines
3.7 KiB
Python
"""Per-request metrics collection and summary reporting."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import statistics
|
|
from dataclasses import asdict, dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
@dataclass(frozen=True)
class RequestMetrics:
    """Immutable record of one request's measurements.

    ``frozen=True`` makes instances read-only after construction.
    ``IncrementalMetricSink.append`` turns an instance into one JSONL row via
    ``dataclasses.asdict``, so these field names are also the JSON keys.
    ``write_summary_json`` aggregates a list of these records and treats a
    record as successful iff ``error is None``.

    Field order is part of the positional-constructor interface. Fields with
    defaults must come after all required fields (a dataclass rule), so new
    optional fields belong at the end.
    """

    request_id: str

    session_id: str

    turn_id: int

    # NOTE(review): presumably the request's timestamp in the replayed trace,
    # in seconds (``_s`` suffix) — confirm against the replayer.
    trace_timestamp_s: float

    # Summed into ``total_input_tokens`` by write_summary_json, i.e. a token count.
    input_length: int

    # presumably the trace's output length in tokens — TODO confirm
    output_length: int

    request_type: str

    # NOTE(review): exact semantics not visible in this module; may be None.
    effective_input_length: int | None

    # Prefix-cache tokens. A value > 0 counts as a cache hit in the summary, and
    # the sum over successful rows / total input gives prefix_cache_hit_ratio.
    cached_tokens: int

    # Timing results; None when unavailable (write_summary_json skips None).
    # Presumably end-to-end latency, time-to-first-token and time-per-output-
    # token in seconds — TODO confirm against the code that fills them in.
    latency_s: float | None

    ttft_s: float | None

    tpot_s: float | None

    actual_output_tokens: int | None = None

    requested_output_tokens: int | None = None

    finish_reason: str | None = None

    # None on success; any other value marks the request as failed in the summary.
    error: str | None = None

    # Absolute Unix timestamps for dispatch, first token and finish.
    # NOTE(review): presumably seconds since the epoch (time.time()) — confirm.
    t_dispatch_unix: float | None = None

    t_first_token_unix: float | None = None

    t_finish_unix: float | None = None

    # NOTE(review): presumably the X-Request-Id sent through the proxy
    # ("<session_id>:<turn_id>:<chat_id>:<idx>"), used to join proxy breakdown
    # rows to these metrics — confirm with the replayer.
    proxy_request_id: str | None = None

    # URL of the endpoint chosen for this request.
    endpoint_url: str | None = None

    # Hash ids from the trace. A tuple keeps the shared default immutable;
    # json.dumps emits it as a JSON array.
    trace_hash_ids: tuple[int, ...] = ()
|
|
|
|
|
|
class IncrementalMetricSink:
    """Append each RequestMetrics to JSONL immediately (crash-safe).

    The target file is truncated when the sink is created. Each ``append``
    writes one JSON object per line and flushes it, so rows already written
    survive a crash of this process. The flush hands data to the OS; there
    is no fsync, so durability across a power loss is not guaranteed. Call
    ``close`` when finished.
    """

    def __init__(self, path: Path):
        """Create parent dirs, truncate (or create) *path*, open it for appending."""
        self.path = path
        path.parent.mkdir(parents=True, exist_ok=True)
        # Start every run from an empty file, then keep one append handle open.
        path.write_text("")
        self._lock = asyncio.Lock()
        # buffering=1 -> line-buffered text I/O; append() also flushes explicitly.
        self._fh = path.open("a", encoding="utf-8", buffering=1)

    async def append(self, metric: RequestMetrics) -> None:
        """Serialize *metric* as one sorted-key JSON line and flush it."""
        # Serialize outside the lock; only the write+flush pair is serialized.
        line = json.dumps(asdict(metric), sort_keys=True) + "\n"
        async with self._lock:
            self._fh.write(line)
            self._fh.flush()

    def close(self) -> None:
        """Flush and close the file handle; calling it again is a no-op.

        Best-effort: ``OSError`` from the final flush or close is suppressed
        so shutdown paths do not raise. The handle is closed even when the
        flush fails, which the previous version did not guarantee.
        """
        fh = self._fh
        if fh.closed:
            return
        try:
            fh.flush()
        except OSError:
            pass  # best-effort; the handle is still closed below
        finally:
            try:
                fh.close()
            except OSError:
                pass
|
|
|
|
|
|
def write_summary_json(path: Path, rows: list[RequestMetrics]) -> None:
    """Aggregate *rows* into run-level statistics and write them to *path*.

    A row counts as successful when its ``error`` is None. Only successful
    rows feed the latency/TTFT/TPOT, token and cache statistics, while
    ``request_count`` and ``error_count`` cover every row. Each ``*_stats``
    value is what ``_stats`` returns for that series, which is None when the
    series is empty.

    The JSON is first written to a sibling ``<name>.tmp`` file and then moved
    over *path* with ``Path.replace``. An interrupted write therefore cannot
    leave a truncated summary behind, and any previous summary stays intact
    until the new one is complete.
    """
    successful = [r for r in rows if r.error is None]
    latencies = [r.latency_s for r in successful if r.latency_s is not None]
    ttfts = [r.ttft_s for r in successful if r.ttft_s is not None]
    tpots = [r.tpot_s for r in successful if r.tpot_s is not None]

    total_input = sum(r.input_length for r in successful)
    total_cached = sum(r.cached_tokens for r in successful)

    summary: dict[str, Any] = {
        "request_count": len(rows),
        "success_count": len(successful),
        "error_count": sum(1 for r in rows if r.error is not None),
        "latency_stats_s": _stats(latencies),
        "ttft_stats_s": _stats(ttfts),
        "tpot_stats_s": _stats(tpots),
        "cache_hit_request_count": sum(1 for r in successful if r.cached_tokens > 0),
        "total_input_tokens": total_input,
        "total_cached_tokens": total_cached,
        # Guard the division: a run with no successful input reports 0.0.
        "prefix_cache_hit_ratio": total_cached / total_input if total_input > 0 else 0.0,
        "cached_tokens_stats": _stats([float(r.cached_tokens) for r in successful]),
        "actual_output_tokens_stats": _stats(
            [
                float(r.actual_output_tokens)
                for r in successful
                if r.actual_output_tokens is not None
            ]
        ),
    }

    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as fh:
            json.dump(summary, fh, indent=2, sort_keys=True)
        # Same directory => same filesystem, so the rename is atomic on POSIX.
        tmp_path.replace(path)
    except BaseException:
        # Don't leave a half-written temp file behind; re-raise the original error.
        tmp_path.unlink(missing_ok=True)
        raise
|
|
|
|
|
|
def _stats(values: list[float | None]) -> dict[str, float] | None:
    """Return count/mean/p50/p90/p99 for the non-None entries of *values*.

    ``None`` entries are dropped first. If nothing remains, the result is
    ``None`` rather than a dict. ``count`` is emitted as a float, matching the
    ``dict[str, float]`` return annotation. Percentiles are computed by
    ``_percentile`` over the ascending-sorted data.
    """
    ordered = sorted(v for v in values if v is not None)
    if not ordered:
        return None

    result: dict[str, float] = {
        "count": float(len(ordered)),
        "mean": statistics.fmean(ordered),
    }
    for label, fraction in (("p50", 0.50), ("p90", 0.90), ("p99", 0.99)):
        result[label] = _percentile(ordered, fraction)
    return result
|
|
|
|
|
|
def _percentile(sorted_vals: list[float], pct: float) -> float:
|
|
n = len(sorted_vals)
|
|
if n == 1:
|
|
return sorted_vals[0]
|
|
rank = pct * (n - 1)
|
|
lo = int(rank)
|
|
hi = min(lo + 1, n - 1)
|
|
frac = rank - lo
|
|
return sorted_vals[lo] * (1 - frac) + sorted_vals[hi] * frac
|