Files
replaysim/tools/postprocess_frontier_smoke.py

455 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
"""Summarize a Frontier RS1 smoke run."""
from __future__ import annotations
import argparse
import csv
import json
import math
import re
import sys
from pathlib import Path
from typing import Any
# Per-request prefix-cache columns that request_metrics.csv must contain
# before compute_token_weighted_cache can derive any cache statistics.
CACHE_COLUMNS = {
    "request_prefix_cache_query_blocks",
    "request_prefix_cache_hit_blocks",
    "request_cached_prefill_tokens",
}
def parse_args() -> argparse.Namespace:
    """Parse the command line of the postprocess tool.

    Returns:
        A namespace with ``run_dir`` and ``fixture_dir``, both mandatory and
        both converted to ``pathlib.Path``.
    """
    cli = argparse.ArgumentParser(description="Postprocess Frontier smoke output.")
    # Both options share the same shape: required and parsed as a Path.
    for flag in ("--run-dir", "--fixture-dir"):
        cli.add_argument(flag, required=True, type=Path)
    return cli.parse_args()
def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return its top-level object.

    Raises:
        ValueError: if the top-level JSON value is not an object (this also
            covers ``json.JSONDecodeError`` for malformed documents, which is
            a ``ValueError`` subclass).
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    if isinstance(payload, dict):
        return payload
    raise ValueError(f"{path}: JSON value must be an object")
def load_jsonl(path: Path) -> list[dict[str, Any]]:
    """Load a JSON Lines file into a list of objects.

    Lines that are empty after stripping whitespace are skipped. Line numbers
    in error messages are 1-based and count every physical line, including
    the skipped blank ones.

    Args:
        path: UTF-8 encoded file with one JSON object per line.

    Returns:
        The decoded objects in file order.

    Raises:
        json.JSONDecodeError: if a line is not valid JSON. The message is
            prefixed with the file path and the JSONL line number.
        ValueError: if a line decodes to something other than an object.
    """
    rows: list[dict[str, Any]] = []
    with path.open("r", encoding="utf-8") as handle:
        for line_number, line in enumerate(handle, start=1):
            stripped = line.strip()
            if not stripped:
                continue
            try:
                row = json.loads(stripped)
            except json.JSONDecodeError as exc:
                # Each line is decoded on its own, so the decoder's position
                # is always relative to that single line ("line 1 ...").
                # Re-raise the same exception type with the file and the real
                # line number in front so the offending row can be found.
                raise json.JSONDecodeError(
                    f"{path}: line {line_number}: {exc.msg}",
                    exc.doc,
                    exc.pos,
                ) from exc
            if not isinstance(row, dict):
                raise ValueError(f"{path}: line {line_number}: expected object")
            rows.append(row)
    return rows
def load_csv(path: Path) -> tuple[list[str], list[dict[str, str]]]:
    """Read a UTF-8 CSV file with a header row.

    Returns:
        A ``(fieldnames, rows)`` pair: the header as a list (empty when the
        file has no header) and one dict per data row as produced by
        ``csv.DictReader``.
    """
    with path.open("r", encoding="utf-8", newline="") as handle:
        reader = csv.DictReader(handle)
        # Touching ``fieldnames`` first makes the reader consume the header.
        header = reader.fieldnames
        records = list(reader)
    column_names = list(header) if header else []
    return column_names, records
def find_metrics_dir(run_dir: Path) -> Path:
    """Locate the single metrics directory of a run.

    Searches ``run_dir/frontier_metrics`` recursively for
    ``system_metrics.json`` and returns the directory that contains it.

    Raises:
        ValueError: if zero or more than one ``system_metrics.json`` is found.
    """
    hits = sorted(run_dir.glob("frontier_metrics/**/system_metrics.json"))
    if len(hits) == 1:
        (only_hit,) = hits
        return only_hit.parent
    raise ValueError(
        f"{run_dir}: expected exactly one system_metrics.json under "
        f"frontier_metrics, found {len(hits)}"
    )
def read_text_if_exists(path: Path) -> str:
    """Return the UTF-8 text of *path*, or ``""`` when the path does not exist.

    Undecodable bytes are replaced rather than raising.
    """
    return path.read_text(encoding="utf-8", errors="replace") if path.exists() else ""
def parse_memory_state(log_text: str) -> dict[str, Any]:
    """Extract the last ``[MEMORY_STATE]`` record from log text.

    Returns:
        ``{"available": False}`` when no record is present. Otherwise a dict
        with ``available=True``, the integer fields ``total_blocks``,
        ``max_blocks_per_sequence`` and ``max_batch_size``, the raw string
        ``max_request_slots``, and a ``source`` note.
    """
    pattern = (
        r"\[MEMORY_STATE\]\s+total_blocks=(?P<total_blocks>\d+),\s+"
        r"max_blocks_per_sequence=(?P<max_blocks_per_sequence>\d+),\s+"
        r"max_request_slots=(?P<max_request_slots>[^,]+),\s+"
        r"max_batch_size=(?P<max_batch_size>\d+)"
    )
    # Walk all occurrences and keep only the final one.
    last_match = None
    for last_match in re.finditer(pattern, log_text):
        pass
    if last_match is None:
        return {"available": False}
    fields = last_match.groupdict()
    return {
        "available": True,
        "total_blocks": int(fields["total_blocks"]),
        "max_blocks_per_sequence": int(fields["max_blocks_per_sequence"]),
        # Captured with [^,]+ (not digits only), so it is kept as text.
        "max_request_slots": fields["max_request_slots"],
        "max_batch_size": int(fields["max_batch_size"]),
        "source": "last [MEMORY_STATE] log line",
    }
def extract_scheduler_config(config: dict[str, Any]) -> dict[str, Any]:
    """Return ``config["cluster_config"]["replica_scheduler_config"]``.

    An empty dict is returned when either level is missing or is not a dict.
    """
    cluster_section = config.get("cluster_config")
    if isinstance(cluster_section, dict):
        candidate = cluster_section.get("replica_scheduler_config")
        if isinstance(candidate, dict):
            return candidate
    return {}
def extract_replica_config(config: dict[str, Any]) -> dict[str, Any]:
    """Return ``config["cluster_config"]["replica_config"]``.

    An empty dict is returned when either level is missing or is not a dict.
    """
    cluster_section = config.get("cluster_config")
    if isinstance(cluster_section, dict):
        candidate = cluster_section.get("replica_config")
        if isinstance(candidate, dict):
            return candidate
    return {}
def compute_token_weighted_cache(
    request_metrics_path: Path,
    sidecar_path: Path,
) -> dict[str, Any]:
    """Aggregate prefix-cache hit statistics at block and token granularity.

    Each row of ``request_metrics.csv`` is joined with its sidecar record (by
    request id). Block totals come straight from the CSV; token totals weight
    the first ``hit_blocks`` entries of the sidecar's ``block_token_counts``.

    Args:
        request_metrics_path: CSV with a ``Request Id`` column and the columns
            named in ``CACHE_COLUMNS``.
        sidecar_path: JSONL file whose records carry ``request_id``,
            ``block_token_counts`` and ``input_length``.

    Returns:
        ``{"available": False, "reason": ...}`` when the CSV lacks a cache
        column or no row has complete cache metrics; otherwise a dict with
        ``available=True``, row counts, the skipped request ids, and the
        ``frontier_block_level`` / ``replayserve_token_weighted`` totals.

    Raises:
        ValueError: if a CSV row references a request id absent from the
            sidecar, if its query block count differs from the sidecar block
            count, or if its hit block count is negative or exceeds the query
            block count.
    """
    fieldnames, rows = load_csv(request_metrics_path)
    missing = sorted(CACHE_COLUMNS - set(fieldnames))
    if missing:
        return {
            "available": False,
            "reason": f"request_metrics.csv missing cache columns: {missing}",
        }
    sidecar_by_id = {int(row["request_id"]): row for row in load_jsonl(sidecar_path)}
    total_query_blocks = 0
    total_hit_blocks = 0
    total_query_tokens = 0
    total_hit_tokens = 0
    total_frontier_cached_tokens = 0
    completed_rows = 0
    rows_with_missing_cache_metrics: list[int] = []
    for row in rows:
        # Ids are parsed via float first so values such as "3.0" are accepted.
        request_id = int(float(row["Request Id"]))
        sidecar = sidecar_by_id.get(request_id)
        if sidecar is None:
            raise ValueError(f"request_metrics.csv contains unknown request id {request_id}")
        cache_values = [
            row["request_prefix_cache_query_blocks"],
            row["request_prefix_cache_hit_blocks"],
            row["request_cached_prefill_tokens"],
        ]
        # csv.DictReader fills cells that are absent from a short row with
        # None, so both None and "" mean "no metric recorded for this row".
        if any(value is None or value == "" for value in cache_values):
            rows_with_missing_cache_metrics.append(request_id)
            continue
        query_blocks, hit_blocks, cached_prefill_tokens = (
            int(float(value)) for value in cache_values
        )
        block_token_counts = [int(value) for value in sidecar["block_token_counts"]]
        input_length = int(sidecar["input_length"])
        if query_blocks != len(block_token_counts):
            raise ValueError(
                f"request {request_id}: query_blocks={query_blocks} does not match "
                f"sidecar blocks={len(block_token_counts)}"
            )
        if hit_blocks > query_blocks:
            raise ValueError(
                f"request {request_id}: hit_blocks={hit_blocks} > query_blocks={query_blocks}"
            )
        if hit_blocks < 0:
            # A negative count would turn the slice below into "all but the
            # last N blocks" and silently inflate the token-weighted hits.
            raise ValueError(f"request {request_id}: hit_blocks={hit_blocks} is negative")
        total_query_blocks += query_blocks
        total_hit_blocks += hit_blocks
        total_query_tokens += input_length
        total_hit_tokens += sum(block_token_counts[:hit_blocks])
        total_frontier_cached_tokens += cached_prefill_tokens
        completed_rows += 1
    if completed_rows == 0:
        return {
            "available": False,
            "reason": "no request rows had complete prefix-cache metrics",
            "rows_with_missing_cache_metrics": rows_with_missing_cache_metrics,
        }
    return {
        "available": True,
        "completed_request_rows": completed_rows,
        "total_request_metric_rows": len(rows),
        "rows_with_missing_cache_metrics": rows_with_missing_cache_metrics,
        "frontier_block_level": {
            "total_query_blocks": total_query_blocks,
            "total_hit_blocks": total_hit_blocks,
            "hit_ratio": (
                total_hit_blocks / total_query_blocks if total_query_blocks else 0.0
            ),
            "total_cached_prefill_tokens_frontier_whole_block": total_frontier_cached_tokens,
        },
        "replayserve_token_weighted": {
            "total_query_tokens": total_query_tokens,
            "total_hit_tokens": total_hit_tokens,
            "hit_ratio": (
                total_hit_tokens / total_query_tokens if total_query_tokens else 0.0
            ),
        },
        "semantics": (
            "Frontier reports whole-block hits; ReplayServe weights the first "
            "hit_blocks sidecar block_token_counts, so partial final blocks count "
            "by their true token length when they are hit."
        ),
    }
def compute_completion_summary(
    system_metrics: dict[str, Any],
    request_metrics_path: Path,
) -> dict[str, Any]:
    """Summarize whether every request of the run completed.

    Args:
        system_metrics: Parsed ``system_metrics.json``; its
            ``simulation_metadata`` object supplies ``total_requests`` and
            ``completed_requests`` when present.
        request_metrics_path: CSV of per-request metrics. When it has both
            ``Request Id`` and ``request_e2e_time`` columns, rows without a
            latency value are reported as missing.

    Returns:
        A dict with ``is_complete``, ``total_requests``,
        ``completed_requests``, ``request_metric_rows`` and
        ``missing_latency_request_ids``. ``is_complete`` is true only when
        there is at least one request, all of them are reported completed,
        and no row lacks a latency.
    """
    fieldnames, rows = load_csv(request_metrics_path)
    missing_latency_rows: list[int] = []
    if "Request Id" in fieldnames and "request_e2e_time" in fieldnames:
        # csv.DictReader yields None for cells absent from a short row, so a
        # falsy check is needed to treat both "" and None as "no latency".
        missing_latency_rows = [
            int(float(row["Request Id"]))
            for row in rows
            if not row.get("request_e2e_time")
        ]
    metadata = system_metrics.get("simulation_metadata")
    if not isinstance(metadata, dict):
        # A missing or null metadata section falls back to CSV-derived counts.
        metadata = {}
    # ``or`` makes an absent, null or zero total fall back to the row count.
    total_requests = int(metadata.get("total_requests") or len(rows))
    completed_requests = int(metadata.get("completed_requests") or 0)
    is_complete = (
        total_requests > 0
        and completed_requests == total_requests
        and not missing_latency_rows
    )
    return {
        "is_complete": is_complete,
        "total_requests": total_requests,
        "completed_requests": completed_requests,
        "request_metric_rows": len(rows),
        "missing_latency_request_ids": missing_latency_rows,
    }
def get_nested(data: dict[str, Any], *keys: str) -> Any:
    """Follow *keys* through nested dicts and return what is found.

    Returns ``None`` as soon as a non-dict is reached while keys remain, or
    when a key is absent. With no keys, *data* itself is returned.
    """
    node: Any = data
    for step in keys:
        if isinstance(node, dict):
            node = node.get(step)
        else:
            return None
    return node
def estimate_memory_planner_blocks(
    *,
    config: dict[str, Any],
    scheduler_config: dict[str, Any],
    model_weight_memory: dict[str, Any] | None,
) -> dict[str, Any]:
    """Estimate the KV-cache block count from configuration values.

    The memory budget is ``total_memory_gb * 1024**3 * gpu_memory_utilization``
    minus the parameter memory and the non-KV overhead; it is divided by the
    per-layer, per-block page size and by the number of layers.

    Args:
        config: Run configuration; the replica section is read from it.
        scheduler_config: Scheduler section supplying ``block_size`` and,
            optionally, ``gpu_memory_utilization`` and
            ``non_kv_cache_overhead_bytes``.
        model_weight_memory: Object carrying ``total_memory_bytes``, or None.

    Returns:
        ``{"available": False, "reason": ...}`` when the model/device/weight
        sections are not dicts or ``block_size`` is not positive; otherwise a
        dict with ``available=True``, ``total_blocks`` and every intermediate
        quantity used to derive it.

    Raises:
        KeyError: if a required configuration key is absent.
    """
    replica_config = extract_replica_config(config)
    model_config = replica_config.get("model_config")
    device_config = replica_config.get("device_config")
    required_sections = (model_config, device_config, model_weight_memory)
    if not all(isinstance(section, dict) for section in required_sections):
        return {"available": False, "reason": "missing model/device/weight config"}

    block_size = int(scheduler_config.get("block_size", 0))
    if block_size <= 0:
        return {"available": False, "reason": "missing positive block_size"}

    # --- memory budget ---------------------------------------------------
    total_memory_gb = float(device_config["total_memory_gb"])
    utilization = scheduler_config.get("gpu_memory_utilization")
    if utilization is None:
        # No explicit utilization: derive it from the replica memory margin.
        margin = float(replica_config.get("memory_margin_fraction", 0.1))
        utilization = 1.0 - margin
    utilization = float(utilization)
    weight_bytes = int(model_weight_memory["total_memory_bytes"])
    overhead_bytes = int(scheduler_config.get("non_kv_cache_overhead_bytes") or 0)
    budget_bytes = int(total_memory_gb * 1024**3 * utilization)
    kv_budget_bytes = budget_bytes - weight_bytes - overhead_bytes

    # --- model geometry --------------------------------------------------
    embedding_dim = int(model_config["embedding_dim"])
    num_q_heads = int(model_config["num_q_heads"])
    configured_head_dim = model_config.get("head_dim")
    head_dim = int(
        embedding_dim // num_q_heads
        if configured_head_dim is None
        else configured_head_dim
    )
    num_kv_heads = int(model_config["num_kv_heads"])
    attn_tp = int(replica_config["attn_tensor_parallel_size"])
    kv_heads_per_worker = math.ceil(num_kv_heads / attn_tp)
    num_layers = int(model_config["num_layers"])
    # NOTE(review): the two factors of 2 presumably stand for K+V and for
    # 2-byte elements; confirm against the Frontier MemoryPlanner.
    page_bytes = 2 * 2 * block_size * kv_heads_per_worker * head_dim

    if kv_budget_bytes > 0 and page_bytes > 0:
        block_count = int(kv_budget_bytes // page_bytes // num_layers)
    else:
        block_count = 0

    return {
        "available": True,
        "source": "ReplayServe fallback using Frontier MemoryPlanner.get_num_blocks formula",
        "total_blocks": block_count,
        "requested_memory_bytes": budget_bytes,
        "parameter_memory_per_device_bytes": weight_bytes,
        "non_kv_cache_overhead_bytes": overhead_bytes,
        "available_kv_cache_memory_bytes": kv_budget_bytes,
        "block_size": block_size,
        "num_layers": num_layers,
        "head_dim": head_dim,
        "num_kv_heads": num_kv_heads,
        "attn_tensor_parallel_size": attn_tp,
        "kv_heads_per_tensor_parallel_worker": kv_heads_per_worker,
        "page_size_bytes_per_layer_per_block": page_bytes,
        "gpu_memory_utilization": utilization,
        "total_memory_gb": total_memory_gb,
    }
# Keywords that flag a log line as related to memory/allocation pressure.
# NOTE(review): these are unanchored, case-insensitive substrings, so "oom"
# also matches words such as "room"; tighten if false positives matter.
_ALLOCATION_PRESSURE_PATTERN = re.compile(
    r"preempt|insufficient|cannot allocate|allocation pressure|oom",
    flags=re.IGNORECASE,
)


def _find_allocation_pressure_lines(log_text: str) -> list[str]:
    """Return the log lines that match an allocation-pressure keyword."""
    return [
        line
        for line in log_text.splitlines()
        if _ALLOCATION_PRESSURE_PATTERN.search(line)
    ]


def _write_markdown_summary(
    output_md: Path,
    summary: dict[str, Any],
    *,
    fixture_name: str,
) -> None:
    """Render the human-readable digest of *summary* to *output_md*."""
    completion = summary["completion"]
    cache = summary["prefix_cache_postprocess"]
    mem = summary["memory_planner"]

    lines = [
        f"# RS1 Frontier Smoke: {fixture_name}\n\n",
        f"- Metrics dir: `{summary['metrics_dir']}`\n",
        f"- Frontier system metrics: `{summary['system_metrics']}`\n",
        f"- Frontier request metrics: `{summary['request_metrics']}`\n",
        "- Completion: "
        f"`{completion['completed_requests']}/"
        f"{completion['total_requests']}`\n",
    ]
    missing_latency_rows = completion.get("missing_latency_request_ids") or []
    if missing_latency_rows:
        lines.append(f"- Missing latency request rows: `{missing_latency_rows}`\n")

    if cache.get("available"):
        frontier_ratio = cache["frontier_block_level"]["hit_ratio"]
        token_ratio = cache["replayserve_token_weighted"]["hit_ratio"]
        lines.append(f"- Frontier block-level prefix hit ratio: {frontier_ratio:.8f}\n")
        lines.append(f"- ReplayServe token-weighted prefix hit ratio: {token_ratio:.8f}\n")
        missing_cache_rows = cache.get("rows_with_missing_cache_metrics") or []
        if missing_cache_rows:
            lines.append(f"- Prefix-cache metric rows skipped: `{missing_cache_rows}`\n")
    else:
        lines.append(f"- Prefix cache postprocess unavailable: {cache.get('reason')}\n")

    derived = mem.get("derived", {})
    lines.append(f"- Memory planner mode: `{mem.get('mode')}`\n")
    lines.append(f"- GPU memory utilization: `{mem.get('gpu_memory_utilization')}`\n")
    lines.append(
        f"- Non-KV overhead bytes assumption: `{mem.get('non_kv_cache_overhead_bytes')}`\n"
    )
    if derived.get("available"):
        lines.append(f"- Derived KV blocks: `{derived.get('total_blocks')}`\n")
        lines.append(f"- Max batch size: `{derived.get('max_batch_size', 'n/a')}`\n")
    else:
        lines.append("- Derived KV blocks: not found in logs\n")

    # system_metrics.json may carry null (or another non-object) here; report
    # "None" rather than failing after the JSON summary was already written.
    preemption_statistics = summary["preemption_statistics"]
    preemptions = (
        preemption_statistics.get("total_preemption_events")
        if isinstance(preemption_statistics, dict)
        else None
    )
    lines.append(f"- Total preemption events: `{preemptions}`\n")
    lines.append(
        "- Allocation/preemption/OOM log lines: "
        f"`{summary['allocation_pressure_log_line_count']}`\n"
    )

    # Everything is rendered before the file is opened, so a rendering error
    # cannot leave a truncated markdown file behind.
    with output_md.open("w", encoding="utf-8") as handle:
        handle.writelines(lines)


def main() -> int:
    """Postprocess one Frontier smoke run and write its summaries.

    Reads ``system_metrics.json``, ``request_metrics.csv`` and ``config.json``
    from the run's metrics directory, ``sidecar.jsonl`` from the fixture
    directory, and the optional ``stdout.log`` / ``stderr.log``. Writes
    ``postprocess_summary.json`` and ``postprocess_summary.md`` into the run
    directory.

    Returns:
        0 on success; 1 when any step raises, after printing the error to
        stderr. Argument parsing happens outside that handler.
    """
    args = parse_args()
    try:
        run_dir = args.run_dir
        fixture_dir = args.fixture_dir
        metrics_dir = find_metrics_dir(run_dir)
        system_metrics_path = metrics_dir / "system_metrics.json"
        request_metrics_path = metrics_dir / "request_metrics.csv"
        config_path = metrics_dir / "config.json"
        sidecar_path = fixture_dir / "sidecar.jsonl"

        system_metrics = load_json(system_metrics_path)
        config = load_json(config_path)
        scheduler_config = extract_scheduler_config(config)
        log_text = (
            read_text_if_exists(run_dir / "stdout.log")
            + "\n"
            + read_text_if_exists(run_dir / "stderr.log")
        )

        memory_state = parse_memory_state(log_text)
        completion_summary = compute_completion_summary(
            system_metrics, request_metrics_path
        )
        cache_summary = compute_token_weighted_cache(request_metrics_path, sidecar_path)
        model_weight_memory = get_nested(system_metrics, "model_weight_memory", "MONOLITHIC")
        if not memory_state.get("available"):
            # No [MEMORY_STATE] line in the logs: fall back to an estimate
            # computed from the configuration.
            memory_state = estimate_memory_planner_blocks(
                config=config,
                scheduler_config=scheduler_config,
                model_weight_memory=model_weight_memory,
            )
        allocation_pressure_lines = _find_allocation_pressure_lines(log_text)

        summary = {
            "run_dir": str(run_dir),
            "fixture_dir": str(fixture_dir),
            "metrics_dir": str(metrics_dir),
            "system_metrics": str(system_metrics_path),
            "request_metrics": str(request_metrics_path),
            "config": str(config_path),
            "frontier_prefix_cache_statistics": system_metrics.get(
                "prefix_cache_statistics"
            ),
            "completion": completion_summary,
            "prefix_cache_postprocess": cache_summary,
            "memory_planner": {
                "mode": scheduler_config.get("num_blocks_mode"),
                "gpu_memory_utilization": scheduler_config.get(
                    "gpu_memory_utilization"
                ),
                "non_kv_cache_overhead_bytes": scheduler_config.get(
                    "non_kv_cache_overhead_bytes"
                ),
                "derived": memory_state,
                "model_weight_memory_monolithic": model_weight_memory,
                "assumption": (
                    "RS1 uses Frontier memory_planner with analytical parameter "
                    "memory and non_kv_cache_overhead_bytes=0 for plumbing smoke."
                ),
            },
            "preemption_statistics": system_metrics.get("preemption_statistics", {}),
            "allocation_pressure_log_line_count": len(allocation_pressure_lines),
            # Only the first 20 matching lines are kept in the summary.
            "allocation_pressure_log_excerpt": allocation_pressure_lines[:20],
        }

        output_json = run_dir / "postprocess_summary.json"
        with output_json.open("w", encoding="utf-8") as handle:
            json.dump(summary, handle, indent=2, sort_keys=True)
            handle.write("\n")
        _write_markdown_summary(
            run_dir / "postprocess_summary.md",
            summary,
            fixture_name=fixture_dir.name,
        )
    except Exception as exc:
        # Top-level boundary of the CLI: report the failure and signal it
        # through the exit status instead of a traceback.
        print(f"postprocess_frontier_smoke.py: error: {exc}", file=sys.stderr)
        return 1
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())