#!/usr/bin/env python3
"""Aggregate ReplayServe Frontier run directories into CSV and Markdown.

Every directory below the suite directory that contains a
``run_manifest.json`` is treated as one run. Per-run artifacts that are
missing or unreadable are tolerated: the affected summary columns are left
empty instead of aborting the whole aggregation.
"""

from __future__ import annotations

import argparse
import csv
import json
import sys
from pathlib import Path
from typing import Any

REPLAYSERVE_ROOT = Path(__file__).resolve().parents[1]

FIELDNAMES = [
    "suite_id",
    "sim",
    "fixture",
    "config_id",
    "status",
    "exit_code",
    "runtime_seconds",
    "frontier_mode",
    "frontier_head",
    "frontier_dirty",
    "attn_tp",
    "attn_dp",
    "moe_tp",
    "moe_ep",
    "batch_size_cap",
    "max_tokens_in_batch",
    "block_size",
    "enable_prefix_caching",
    "enable_chunked_prefill",
    "long_prefill_token_threshold",
    "frontier_block_hit_ratio",
    "replayserve_token_hit_ratio",
    "cache_metrics_available",
    "cache_metrics_unavailable_reason",
    "cache_metric_rows_complete",
    "cache_metric_rows_total",
    "cache_metric_rows_missing",
    "completion_is_complete",
    "missing_latency_request_ids",
    "preemption_events",
    "preempted_requests",
    "ttft_mean_ms",
    "ttft_p50_ms",
    "ttft_p95_ms",
    "tpot_mean_ms",
    "tpot_p50_ms",
    "tpot_p95_ms",
    "e2e_mean_ms",
    "e2e_p50_ms",
    "e2e_p95_ms",
    "requests_per_second",
    "tokens_per_second",
    "decode_tokens_per_second",
    "completed_requests",
    "total_requests",
    "run_dir",
]


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments (suite directory and optional output paths)."""
    parser = argparse.ArgumentParser(description="Aggregate ReplayServe run outputs.")
    parser.add_argument("suite_dir", type=Path, help="Run suite directory.")
    parser.add_argument(
        "--output-csv",
        type=Path,
        help="Output CSV path. Defaults to SUITE_DIR/summary.csv.",
    )
    parser.add_argument(
        "--output-md",
        type=Path,
        help="Output Markdown path. Defaults to SUITE_DIR/summary.md.",
    )
    return parser.parse_args()


def load_json(path: Path) -> dict[str, Any]:
    """Load a JSON object from *path*.

    Returns an empty dict when the file does not exist, cannot be decoded as
    UTF-8 JSON, or holds a non-object top-level value. Undecodable files are
    reported on stderr so that a truncated artifact left by a crashed run
    does not abort the whole aggregation.
    """
    if not path.is_file():
        return {}
    try:
        with path.open("r", encoding="utf-8") as handle:
            data = json.load(handle)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        print(f"warning: could not parse {path}: {exc}", file=sys.stderr)
        return {}
    return data if isinstance(data, dict) else {}


def read_int(path: Path) -> int | None:
    """Read an integer from a text file; None if missing or not an integer."""
    try:
        return int(path.read_text(encoding="utf-8").strip())
    except (FileNotFoundError, ValueError):
        return None


def read_number(path: Path) -> int | float | None:
    """Read an int (preferred) or float from a text file.

    Returns None when the file is missing or its content is not numeric.
    """
    try:
        text = path.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        return None
    try:
        return int(text)
    except ValueError:
        pass
    try:
        return float(text)
    except ValueError:
        return None


def nested(data: dict[str, Any], *keys: str) -> Any:
    """Follow *keys* through nested dicts; None as soon as a level is not a dict."""
    value: Any = data
    for key in keys:
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    return value


def fmt(value: Any) -> str:
    """Render a summary value as text: None -> "", bools lowercase, floats to 8 sig. digits."""
    if value is None:
        return ""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, float):
        return f"{value:.8g}"
    return str(value)


def _as_dict(value: Any) -> dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_list(value: Any) -> list[Any]:
    """Return *value* if it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _resolve_artifact(run_dir: Path, raw: Any) -> Path | None:
    """Locate an artifact path recorded inside a run's JSON output.

    The recorded path is used as-is when it exists or is absolute; otherwise a
    relative path is resolved against *run_dir* so the script does not depend
    on the current working directory. Non-string or empty values yield None.
    """
    if not isinstance(raw, str) or not raw:
        return None
    path = Path(raw)
    if path.is_absolute() or path.exists():
        return path
    return run_dir / path


def _frontier_dirty(frontier: dict[str, Any]) -> bool | None:
    """Whether the recorded Frontier working tree had local changes.

    Returns None (unknown) when no ``status_short`` was recorded, rather than
    claiming the tree was clean.
    """
    status_short = frontier.get("status_short")
    if status_short is None:
        return None
    if isinstance(status_short, str):
        return bool(status_short.strip())
    return bool(status_short)


def _derive_status(
    status_json: dict[str, Any], exit_code: Any, completion: dict[str, Any]
) -> str:
    """Pick the run status: explicit status, else pass/fail from the exit code.

    A run whose completion record says it is not complete is downgraded to
    "incomplete", except that a failed run keeps "fail" so the failure is not
    hidden.
    """
    status = status_json.get("status") or ("pass" if exit_code == 0 else "fail")
    if completion and not completion.get("is_complete", True) and status != "fail":
        status = "incomplete"
    return status


def summarize_run(run_dir: Path) -> dict[str, Any]:
    """Build one summary row (keys = FIELDNAMES) for a single run directory.

    Reads ``run_manifest.json``, ``run_status.json`` and
    ``postprocess_summary.json`` from *run_dir*. It also reads the
    system-metrics JSON that the postprocess summary points to.
    ``exit_code.txt`` and ``runtime_seconds.txt`` are fallbacks when
    ``run_status.json`` lacks those values. Missing sections produce None
    columns.
    """
    manifest = load_json(run_dir / "run_manifest.json")
    status_json = load_json(run_dir / "run_status.json")
    post = load_json(run_dir / "postprocess_summary.json")
    metrics_path = _resolve_artifact(run_dir, post.get("system_metrics"))
    system_metrics = load_json(metrics_path) if metrics_path is not None else {}

    knobs = _as_dict(manifest.get("knobs"))
    frontier = _as_dict(manifest.get("frontier"))
    prefix = _as_dict(post.get("prefix_cache_postprocess"))
    frontier_block = _as_dict(prefix.get("frontier_block_level"))
    token_weighted = _as_dict(prefix.get("replayserve_token_weighted"))
    missing_rows = _as_list(prefix.get("rows_with_missing_cache_metrics"))
    preemption = _as_dict(post.get("preemption_statistics"))
    completion = _as_dict(post.get("completion"))
    simulation = _as_dict(system_metrics.get("simulation_metadata"))
    throughput = _as_dict(system_metrics.get("throughput_metrics"))

    exit_code = status_json.get("exit_code")
    if exit_code is None:
        exit_code = read_int(run_dir / "exit_code.txt")
    runtime = status_json.get("runtime_seconds")
    if runtime is None:
        # Runtimes are typically fractional; an int-only parse would drop them.
        runtime = read_number(run_dir / "runtime_seconds.txt")
    status = _derive_status(status_json, exit_code, completion)
    missing_latency_ids = _as_list(completion.get("missing_latency_request_ids"))

    return {
        "suite_id": manifest.get("suite_id"),
        "sim": manifest.get("sim"),
        "fixture": manifest.get("fixture"),
        "config_id": manifest.get("config_id"),
        "status": status,
        "exit_code": exit_code,
        "runtime_seconds": runtime,
        "frontier_mode": frontier.get("mode"),
        "frontier_head": frontier.get("head"),
        "frontier_dirty": _frontier_dirty(frontier),
        "attn_tp": knobs.get("attn_tensor_parallel_size"),
        "attn_dp": knobs.get("attn_data_parallel_size"),
        "moe_tp": knobs.get("moe_tensor_parallel_size"),
        "moe_ep": knobs.get("moe_expert_parallel_size"),
        "batch_size_cap": knobs.get("batch_size_cap"),
        "max_tokens_in_batch": knobs.get("max_tokens_in_batch"),
        "block_size": knobs.get("block_size"),
        "enable_prefix_caching": knobs.get("enable_prefix_caching"),
        "enable_chunked_prefill": knobs.get("enable_chunked_prefill"),
        "long_prefill_token_threshold": knobs.get("long_prefill_token_threshold"),
        "frontier_block_hit_ratio": frontier_block.get("hit_ratio"),
        "replayserve_token_hit_ratio": token_weighted.get("hit_ratio"),
        "cache_metrics_available": prefix.get("available"),
        "cache_metrics_unavailable_reason": prefix.get("reason"),
        "cache_metric_rows_complete": prefix.get("completed_request_rows"),
        "cache_metric_rows_total": prefix.get("total_request_metric_rows"),
        "cache_metric_rows_missing": len(missing_rows),
        "completion_is_complete": completion.get("is_complete"),
        "missing_latency_request_ids": ",".join(str(value) for value in missing_latency_ids),
        "preemption_events": preemption.get("total_preemption_events"),
        "preempted_requests": preemption.get("total_preempted_requests"),
        "ttft_mean_ms": nested(system_metrics, "ttft_statistics", "mean"),
        "ttft_p50_ms": nested(system_metrics, "ttft_statistics", "p50"),
        "ttft_p95_ms": nested(system_metrics, "ttft_statistics", "p95"),
        "tpot_mean_ms": nested(system_metrics, "tpot_statistics", "mean"),
        "tpot_p50_ms": nested(system_metrics, "tpot_statistics", "p50"),
        "tpot_p95_ms": nested(system_metrics, "tpot_statistics", "p95"),
        "e2e_mean_ms": nested(system_metrics, "request_e2e_time_statistics", "mean"),
        "e2e_p50_ms": nested(system_metrics, "request_e2e_time_statistics", "p50"),
        "e2e_p95_ms": nested(system_metrics, "request_e2e_time_statistics", "p95"),
        "requests_per_second": throughput.get("requests_per_second"),
        "tokens_per_second": throughput.get("tokens_per_second"),
        "decode_tokens_per_second": throughput.get("decode_tokens_per_second"),
        "completed_requests": simulation.get("completed_requests"),
        "total_requests": simulation.get("total_requests"),
        "run_dir": str(run_dir),
    }


def write_csv(path: Path, rows: list[dict[str, Any]]) -> None:
    """Write *rows* as CSV with the FIELDNAMES columns, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=FIELDNAMES)
        writer.writeheader()
        for row in rows:
            writer.writerow({key: fmt(row.get(key)) for key in FIELDNAMES})


def _md_cell(value: Any) -> str:
    """Format *value* for a Markdown table cell (no raw pipes or line breaks)."""
    return " ".join(fmt(value).splitlines()).replace("|", "\\|")


def write_markdown(path: Path, rows: list[dict[str, Any]], suite_dir: Path) -> None:
    """Write a Markdown summary table of the key columns, creating parent dirs."""
    columns = [
        "config_id",
        "fixture",
        "status",
        "runtime_seconds",
        "enable_prefix_caching",
        "enable_chunked_prefill",
        "frontier_block_hit_ratio",
        "replayserve_token_hit_ratio",
        "cache_metric_rows_missing",
        "completion_is_complete",
        "preemption_events",
        "ttft_mean_ms",
        "tpot_mean_ms",
        "e2e_mean_ms",
        "tokens_per_second",
    ]
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(f"# Sweep Summary: {suite_dir.name}\n\n")
        handle.write(f"- Suite dir: `{suite_dir}`\n")
        handle.write(f"- Runs: `{len(rows)}`\n\n")
        handle.write("| " + " | ".join(columns) + " |\n")
        handle.write("|" + "|".join(["---"] * len(columns)) + "|\n")
        for row in rows:
            # Escape cell text so values containing "|" or newlines keep the table intact.
            handle.write("| " + " | ".join(_md_cell(row.get(col)) for col in columns) + " |\n")
        handle.write("\n")
        handle.write(
            "Latency and throughput values are Frontier smoke outputs from the "
            "configured predictor/profile mode. RS3 tiny smoke uses dummy execution "
            "time, so these are harness plumbing checks, not performance claims.\n"
        )


def main() -> int:
    """Aggregate every run under the suite directory and write both summaries.

    Returns the process exit status: 0 on success, 2 when the suite directory
    does not exist.
    """
    args = parse_args()
    suite_dir = args.suite_dir.resolve()
    if not suite_dir.is_dir():
        # Fail loudly: silently writing an empty summary would hide a mistyped path.
        print(f"error: suite directory not found: {suite_dir}", file=sys.stderr)
        return 2
    run_dirs = sorted(path.parent for path in suite_dir.glob("**/run_manifest.json"))
    rows = [summarize_run(path) for path in run_dirs]
    output_csv = args.output_csv or (suite_dir / "summary.csv")
    output_md = args.output_md or (suite_dir / "summary.md")
    write_csv(output_csv, rows)
    write_markdown(output_md, rows, suite_dir)
    print(f"wrote {output_csv}")
    print(f"wrote {output_md}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())