Systematic study of prefill-decode disaggregation for agentic LLM workloads using a production GLM-5.1 coder trace (2.1M requests, 71B input tokens).

Key findings:
- Cache-aware routing improves TPOT p90 by 15% and APC from 20.8% to 44.7% without PD separation, matching PD-Sep's decode isolation benefit
- PD separation adds +72% TTFT overhead (KV transfer) with no TPOT gain when using the same cache-aware scheduler
- Prefill remains compute-bound even at 95% KV cache reuse (AI >1000x vs decode AI <2), but absolute FLOPs drop 71% from cache hits
- For agentic MoE workloads, cache-aware routing > PD separation

Infrastructure:
- Trace sampler preserving session structure + hash_ids for prefix sharing
- Async trace replayer with streaming TTFT/TPOT/E2E measurement
- Unified cache-aware + token-level load-balanced global scheduler proxy supporting both PD-colocated and PD-disaggregated (Mooncake/RDMA) modes
- vLLM 0.18.1 scheduler patch for KV transfer abort race condition
- Roofline analysis tool for prefill/decode compute characterization

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
108 lines
3.4 KiB
Python
108 lines
3.4 KiB
Python
"""Per-request metrics collection and summary reporting."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import statistics
|
|
from dataclasses import asdict, dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
@dataclass(frozen=True)
class RequestMetrics:
    """Immutable per-request measurement record.

    The first twelve fields are required and positional order matters for
    callers that construct the record positionally; the remaining four
    default to ``None``. Instances are frozen, so a record cannot be
    modified after it has been created.
    """

    # --- identity of the request ---
    request_id: str
    session_id: str
    turn_id: int
    trace_timestamp_s: float  # presumably seconds (``_s`` suffix) -- confirm with producer

    # --- request shape ---
    input_length: int
    output_length: int
    request_type: str
    effective_input_length: int | None
    cached_tokens: int

    # --- timings; ``None`` when no value is available ---
    latency_s: float | None
    ttft_s: float | None
    tpot_s: float | None

    # --- outcome; all optional ---
    actual_output_tokens: int | None = None
    requested_output_tokens: int | None = None
    finish_reason: str | None = None
    error: str | None = None
|
|
|
|
|
|
class IncrementalMetricSink:
|
|
"""Append each RequestMetrics to JSONL immediately (crash-safe)."""
|
|
|
|
def __init__(self, path: Path):
|
|
self.path = path
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text("")
|
|
self._lock = asyncio.Lock()
|
|
self._fh = path.open("a", encoding="utf-8", buffering=1)
|
|
|
|
async def append(self, metric: RequestMetrics) -> None:
|
|
line = json.dumps(asdict(metric), sort_keys=True) + "\n"
|
|
async with self._lock:
|
|
self._fh.write(line)
|
|
self._fh.flush()
|
|
|
|
def close(self) -> None:
|
|
try:
|
|
self._fh.flush()
|
|
self._fh.close()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def write_summary_json(path: Path, rows: list[RequestMetrics]) -> None:
    """Aggregate ``rows`` and write the result to ``path`` as indented JSON.

    A row counts as successful when its ``error`` is ``None``. Counts cover
    all rows; every latency, token and cache statistic is computed over the
    successful rows only. The parent directory of ``path`` is created when
    missing and an existing file is overwritten.

    Args:
        path: Destination of the JSON summary.
        rows: Per-request records to summarize.
    """
    ok_rows = [row for row in rows if row.error is None]

    # Timing samples, skipping rows where the measurement is absent.
    latency_samples = [row.latency_s for row in ok_rows if row.latency_s is not None]
    ttft_samples = [row.ttft_s for row in ok_rows if row.ttft_s is not None]
    tpot_samples = [row.tpot_s for row in ok_rows if row.tpot_s is not None]

    # Token totals and the number of requests that hit the cache at all.
    input_tokens = sum(row.input_length for row in ok_rows)
    cached_token_total = sum(row.cached_tokens for row in ok_rows)
    cache_hit_requests = sum(1 for row in ok_rows if row.cached_tokens > 0)

    # Guard the ratio against a run with no successful input tokens.
    if input_tokens > 0:
        hit_ratio = cached_token_total / input_tokens
    else:
        hit_ratio = 0.0

    cached_samples = [float(row.cached_tokens) for row in ok_rows]
    output_samples = [
        float(row.actual_output_tokens)
        for row in ok_rows
        if row.actual_output_tokens is not None
    ]

    summary: dict[str, Any] = {
        "request_count": len(rows),
        "success_count": len(ok_rows),
        "error_count": len(rows) - len(ok_rows),
        "latency_stats_s": _stats(latency_samples),
        "ttft_stats_s": _stats(ttft_samples),
        "tpot_stats_s": _stats(tpot_samples),
        "cache_hit_request_count": cache_hit_requests,
        "total_input_tokens": input_tokens,
        "total_cached_tokens": cached_token_total,
        "prefix_cache_hit_ratio": hit_ratio,
        "cached_tokens_stats": _stats(cached_samples),
        "actual_output_tokens_stats": _stats(output_samples),
    }

    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as out:
        json.dump(summary, out, indent=2, sort_keys=True)
|
|
|
|
|
|
def _stats(values: list[float | None]) -> dict[str, float] | None:
|
|
clean = [v for v in values if v is not None]
|
|
if not clean:
|
|
return None
|
|
clean.sort()
|
|
return {
|
|
"count": float(len(clean)),
|
|
"mean": statistics.fmean(clean),
|
|
"p50": _percentile(clean, 0.50),
|
|
"p90": _percentile(clean, 0.90),
|
|
"p99": _percentile(clean, 0.99),
|
|
}
|
|
|
|
|
|
def _percentile(sorted_vals: list[float], pct: float) -> float:
|
|
if len(sorted_vals) == 1:
|
|
return sorted_vals[0]
|
|
idx = round((len(sorted_vals) - 1) * pct)
|
|
return sorted_vals[idx]
|