455 lines
18 KiB
Python
Executable File
455 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Summarize a Frontier RS1 smoke run."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import math
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
CACHE_COLUMNS = {
|
|
"request_cached_prefill_tokens",
|
|
"request_prefix_cache_query_blocks",
|
|
"request_prefix_cache_hit_blocks",
|
|
}
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Postprocess Frontier smoke output.")
|
|
parser.add_argument("--run-dir", required=True, type=Path)
|
|
parser.add_argument("--fixture-dir", required=True, type=Path)
|
|
return parser.parse_args()
|
|
|
|
|
|
def load_json(path: Path) -> dict[str, Any]:
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
data = json.load(handle)
|
|
if not isinstance(data, dict):
|
|
raise ValueError(f"{path}: JSON value must be an object")
|
|
return data
|
|
|
|
|
|
def load_jsonl(path: Path) -> list[dict[str, Any]]:
|
|
rows: list[dict[str, Any]] = []
|
|
with path.open("r", encoding="utf-8") as handle:
|
|
for line_number, line in enumerate(handle, start=1):
|
|
stripped = line.strip()
|
|
if not stripped:
|
|
continue
|
|
row = json.loads(stripped)
|
|
if not isinstance(row, dict):
|
|
raise ValueError(f"{path}: line {line_number}: expected object")
|
|
rows.append(row)
|
|
return rows
|
|
|
|
|
|
def load_csv(path: Path) -> tuple[list[str], list[dict[str, str]]]:
|
|
with path.open("r", encoding="utf-8", newline="") as handle:
|
|
reader = csv.DictReader(handle)
|
|
return list(reader.fieldnames or []), list(reader)
|
|
|
|
|
|
def find_metrics_dir(run_dir: Path) -> Path:
|
|
candidates = sorted(run_dir.glob("frontier_metrics/**/system_metrics.json"))
|
|
if len(candidates) != 1:
|
|
raise ValueError(
|
|
f"{run_dir}: expected exactly one system_metrics.json under "
|
|
f"frontier_metrics, found {len(candidates)}"
|
|
)
|
|
return candidates[0].parent
|
|
|
|
|
|
def read_text_if_exists(path: Path) -> str:
|
|
if not path.exists():
|
|
return ""
|
|
return path.read_text(encoding="utf-8", errors="replace")
|
|
|
|
|
|
def parse_memory_state(log_text: str) -> dict[str, Any]:
|
|
matches = re.findall(
|
|
r"\[MEMORY_STATE\]\s+total_blocks=(?P<total_blocks>\d+),\s+"
|
|
r"max_blocks_per_sequence=(?P<max_blocks_per_sequence>\d+),\s+"
|
|
r"max_request_slots=(?P<max_request_slots>[^,]+),\s+"
|
|
r"max_batch_size=(?P<max_batch_size>\d+)",
|
|
log_text,
|
|
)
|
|
if not matches:
|
|
return {"available": False}
|
|
total_blocks, max_blocks_per_sequence, max_request_slots, max_batch_size = matches[-1]
|
|
return {
|
|
"available": True,
|
|
"total_blocks": int(total_blocks),
|
|
"max_blocks_per_sequence": int(max_blocks_per_sequence),
|
|
"max_request_slots": max_request_slots,
|
|
"max_batch_size": int(max_batch_size),
|
|
"source": "last [MEMORY_STATE] log line",
|
|
}
|
|
|
|
|
|
def extract_scheduler_config(config: dict[str, Any]) -> dict[str, Any]:
|
|
cluster = config.get("cluster_config")
|
|
if not isinstance(cluster, dict):
|
|
return {}
|
|
scheduler = cluster.get("replica_scheduler_config")
|
|
return scheduler if isinstance(scheduler, dict) else {}
|
|
|
|
|
|
def extract_replica_config(config: dict[str, Any]) -> dict[str, Any]:
|
|
cluster = config.get("cluster_config")
|
|
if not isinstance(cluster, dict):
|
|
return {}
|
|
replica = cluster.get("replica_config")
|
|
return replica if isinstance(replica, dict) else {}
|
|
|
|
|
|
def compute_token_weighted_cache(
|
|
request_metrics_path: Path,
|
|
sidecar_path: Path,
|
|
) -> dict[str, Any]:
|
|
fieldnames, rows = load_csv(request_metrics_path)
|
|
missing = sorted(CACHE_COLUMNS - set(fieldnames))
|
|
if missing:
|
|
return {
|
|
"available": False,
|
|
"reason": f"request_metrics.csv missing cache columns: {missing}",
|
|
}
|
|
|
|
sidecar_by_id = {int(row["request_id"]): row for row in load_jsonl(sidecar_path)}
|
|
total_query_blocks = 0
|
|
total_hit_blocks = 0
|
|
total_query_tokens = 0
|
|
total_hit_tokens = 0
|
|
total_frontier_cached_tokens = 0
|
|
completed_rows = 0
|
|
rows_with_missing_cache_metrics: list[int] = []
|
|
|
|
for row in rows:
|
|
request_id = int(float(row["Request Id"]))
|
|
sidecar = sidecar_by_id.get(request_id)
|
|
if sidecar is None:
|
|
raise ValueError(f"request_metrics.csv contains unknown request id {request_id}")
|
|
|
|
cache_values = [
|
|
row["request_prefix_cache_query_blocks"],
|
|
row["request_prefix_cache_hit_blocks"],
|
|
row["request_cached_prefill_tokens"],
|
|
]
|
|
if any(value == "" for value in cache_values):
|
|
rows_with_missing_cache_metrics.append(request_id)
|
|
continue
|
|
|
|
query_blocks = int(float(cache_values[0]))
|
|
hit_blocks = int(float(cache_values[1]))
|
|
cached_prefill_tokens = int(float(cache_values[2]))
|
|
block_token_counts = [int(value) for value in sidecar["block_token_counts"]]
|
|
input_length = int(sidecar["input_length"])
|
|
|
|
if query_blocks != len(block_token_counts):
|
|
raise ValueError(
|
|
f"request {request_id}: query_blocks={query_blocks} does not match "
|
|
f"sidecar blocks={len(block_token_counts)}"
|
|
)
|
|
if hit_blocks > query_blocks:
|
|
raise ValueError(
|
|
f"request {request_id}: hit_blocks={hit_blocks} > query_blocks={query_blocks}"
|
|
)
|
|
|
|
total_query_blocks += query_blocks
|
|
total_hit_blocks += hit_blocks
|
|
total_query_tokens += input_length
|
|
total_hit_tokens += sum(block_token_counts[:hit_blocks])
|
|
total_frontier_cached_tokens += cached_prefill_tokens
|
|
completed_rows += 1
|
|
|
|
if completed_rows == 0:
|
|
return {
|
|
"available": False,
|
|
"reason": "no request rows had complete prefix-cache metrics",
|
|
"rows_with_missing_cache_metrics": rows_with_missing_cache_metrics,
|
|
}
|
|
|
|
return {
|
|
"available": True,
|
|
"completed_request_rows": completed_rows,
|
|
"total_request_metric_rows": len(rows),
|
|
"rows_with_missing_cache_metrics": rows_with_missing_cache_metrics,
|
|
"frontier_block_level": {
|
|
"total_query_blocks": total_query_blocks,
|
|
"total_hit_blocks": total_hit_blocks,
|
|
"hit_ratio": (
|
|
total_hit_blocks / total_query_blocks if total_query_blocks else 0.0
|
|
),
|
|
"total_cached_prefill_tokens_frontier_whole_block": total_frontier_cached_tokens,
|
|
},
|
|
"replayserve_token_weighted": {
|
|
"total_query_tokens": total_query_tokens,
|
|
"total_hit_tokens": total_hit_tokens,
|
|
"hit_ratio": (
|
|
total_hit_tokens / total_query_tokens if total_query_tokens else 0.0
|
|
),
|
|
},
|
|
"semantics": (
|
|
"Frontier reports whole-block hits; ReplayServe weights the first "
|
|
"hit_blocks sidecar block_token_counts, so partial final blocks count "
|
|
"by their true token length when they are hit."
|
|
),
|
|
}
|
|
|
|
|
|
def compute_completion_summary(
|
|
system_metrics: dict[str, Any],
|
|
request_metrics_path: Path,
|
|
) -> dict[str, Any]:
|
|
fieldnames, rows = load_csv(request_metrics_path)
|
|
missing_latency_rows: list[int] = []
|
|
if "Request Id" in fieldnames and "request_e2e_time" in fieldnames:
|
|
for row in rows:
|
|
if row.get("request_e2e_time", "") == "":
|
|
missing_latency_rows.append(int(float(row["Request Id"])))
|
|
|
|
metadata = system_metrics.get("simulation_metadata", {})
|
|
total_requests = int(metadata.get("total_requests") or len(rows))
|
|
completed_requests = int(metadata.get("completed_requests") or 0)
|
|
is_complete = (
|
|
total_requests > 0
|
|
and completed_requests == total_requests
|
|
and not missing_latency_rows
|
|
)
|
|
|
|
return {
|
|
"is_complete": is_complete,
|
|
"total_requests": total_requests,
|
|
"completed_requests": completed_requests,
|
|
"request_metric_rows": len(rows),
|
|
"missing_latency_request_ids": missing_latency_rows,
|
|
}
|
|
|
|
|
|
def get_nested(data: dict[str, Any], *keys: str) -> Any:
|
|
value: Any = data
|
|
for key in keys:
|
|
if not isinstance(value, dict):
|
|
return None
|
|
value = value.get(key)
|
|
return value
|
|
|
|
|
|
def estimate_memory_planner_blocks(
|
|
*,
|
|
config: dict[str, Any],
|
|
scheduler_config: dict[str, Any],
|
|
model_weight_memory: dict[str, Any] | None,
|
|
) -> dict[str, Any]:
|
|
replica_config = extract_replica_config(config)
|
|
model_config = replica_config.get("model_config")
|
|
device_config = replica_config.get("device_config")
|
|
if (
|
|
not isinstance(model_config, dict)
|
|
or not isinstance(device_config, dict)
|
|
or not isinstance(model_weight_memory, dict)
|
|
):
|
|
return {"available": False, "reason": "missing model/device/weight config"}
|
|
|
|
block_size = int(scheduler_config.get("block_size", 0))
|
|
if block_size <= 0:
|
|
return {"available": False, "reason": "missing positive block_size"}
|
|
|
|
total_memory_gb = float(device_config["total_memory_gb"])
|
|
gpu_memory_utilization = scheduler_config.get("gpu_memory_utilization")
|
|
if gpu_memory_utilization is None:
|
|
gpu_memory_utilization = 1.0 - float(replica_config.get("memory_margin_fraction", 0.1))
|
|
gpu_memory_utilization = float(gpu_memory_utilization)
|
|
|
|
parameter_memory_bytes = int(model_weight_memory["total_memory_bytes"])
|
|
overhead_bytes = int(scheduler_config.get("non_kv_cache_overhead_bytes") or 0)
|
|
requested_memory_bytes = int(total_memory_gb * 1024**3 * gpu_memory_utilization)
|
|
available_kv_cache_memory_bytes = (
|
|
requested_memory_bytes - parameter_memory_bytes - overhead_bytes
|
|
)
|
|
|
|
embedding_dim = int(model_config["embedding_dim"])
|
|
num_q_heads = int(model_config["num_q_heads"])
|
|
head_dim = model_config.get("head_dim")
|
|
if head_dim is None:
|
|
head_dim = embedding_dim // num_q_heads
|
|
head_dim = int(head_dim)
|
|
num_kv_heads = int(model_config["num_kv_heads"])
|
|
attn_tp = int(replica_config["attn_tensor_parallel_size"])
|
|
kv_heads_per_tensor_parallel_worker = math.ceil(num_kv_heads / attn_tp)
|
|
num_layers = int(model_config["num_layers"])
|
|
page_size_bytes_per_layer_per_block = (
|
|
2 * 2 * block_size * kv_heads_per_tensor_parallel_worker * head_dim
|
|
)
|
|
if available_kv_cache_memory_bytes <= 0 or page_size_bytes_per_layer_per_block <= 0:
|
|
derived_num_blocks = 0
|
|
else:
|
|
derived_num_blocks = int(
|
|
available_kv_cache_memory_bytes
|
|
// page_size_bytes_per_layer_per_block
|
|
// num_layers
|
|
)
|
|
|
|
return {
|
|
"available": True,
|
|
"source": "ReplayServe fallback using Frontier MemoryPlanner.get_num_blocks formula",
|
|
"total_blocks": derived_num_blocks,
|
|
"requested_memory_bytes": requested_memory_bytes,
|
|
"parameter_memory_per_device_bytes": parameter_memory_bytes,
|
|
"non_kv_cache_overhead_bytes": overhead_bytes,
|
|
"available_kv_cache_memory_bytes": available_kv_cache_memory_bytes,
|
|
"block_size": block_size,
|
|
"num_layers": num_layers,
|
|
"head_dim": head_dim,
|
|
"num_kv_heads": num_kv_heads,
|
|
"attn_tensor_parallel_size": attn_tp,
|
|
"kv_heads_per_tensor_parallel_worker": kv_heads_per_tensor_parallel_worker,
|
|
"page_size_bytes_per_layer_per_block": page_size_bytes_per_layer_per_block,
|
|
"gpu_memory_utilization": gpu_memory_utilization,
|
|
"total_memory_gb": total_memory_gb,
|
|
}
|
|
|
|
|
|
def main() -> int:
|
|
args = parse_args()
|
|
try:
|
|
run_dir = args.run_dir
|
|
fixture_dir = args.fixture_dir
|
|
metrics_dir = find_metrics_dir(run_dir)
|
|
system_metrics_path = metrics_dir / "system_metrics.json"
|
|
request_metrics_path = metrics_dir / "request_metrics.csv"
|
|
config_path = metrics_dir / "config.json"
|
|
sidecar_path = fixture_dir / "sidecar.jsonl"
|
|
|
|
system_metrics = load_json(system_metrics_path)
|
|
config = load_json(config_path)
|
|
scheduler_config = extract_scheduler_config(config)
|
|
log_text = (
|
|
read_text_if_exists(run_dir / "stdout.log")
|
|
+ "\n"
|
|
+ read_text_if_exists(run_dir / "stderr.log")
|
|
)
|
|
memory_state = parse_memory_state(log_text)
|
|
completion_summary = compute_completion_summary(
|
|
system_metrics, request_metrics_path
|
|
)
|
|
cache_summary = compute_token_weighted_cache(request_metrics_path, sidecar_path)
|
|
|
|
model_weight_memory = get_nested(system_metrics, "model_weight_memory", "MONOLITHIC")
|
|
if not memory_state.get("available"):
|
|
memory_state = estimate_memory_planner_blocks(
|
|
config=config,
|
|
scheduler_config=scheduler_config,
|
|
model_weight_memory=model_weight_memory,
|
|
)
|
|
preemption_statistics = system_metrics.get("preemption_statistics", {})
|
|
allocation_pressure_lines = [
|
|
line
|
|
for line in log_text.splitlines()
|
|
if re.search(
|
|
r"preempt|insufficient|cannot allocate|allocation pressure|oom",
|
|
line,
|
|
flags=re.IGNORECASE,
|
|
)
|
|
]
|
|
|
|
summary = {
|
|
"run_dir": str(run_dir),
|
|
"fixture_dir": str(fixture_dir),
|
|
"metrics_dir": str(metrics_dir),
|
|
"system_metrics": str(system_metrics_path),
|
|
"request_metrics": str(request_metrics_path),
|
|
"config": str(config_path),
|
|
"frontier_prefix_cache_statistics": system_metrics.get(
|
|
"prefix_cache_statistics"
|
|
),
|
|
"completion": completion_summary,
|
|
"prefix_cache_postprocess": cache_summary,
|
|
"memory_planner": {
|
|
"mode": scheduler_config.get("num_blocks_mode"),
|
|
"gpu_memory_utilization": scheduler_config.get(
|
|
"gpu_memory_utilization"
|
|
),
|
|
"non_kv_cache_overhead_bytes": scheduler_config.get(
|
|
"non_kv_cache_overhead_bytes"
|
|
),
|
|
"derived": memory_state,
|
|
"model_weight_memory_monolithic": model_weight_memory,
|
|
"assumption": (
|
|
"RS1 uses Frontier memory_planner with analytical parameter "
|
|
"memory and non_kv_cache_overhead_bytes=0 for plumbing smoke."
|
|
),
|
|
},
|
|
"preemption_statistics": preemption_statistics,
|
|
"allocation_pressure_log_line_count": len(allocation_pressure_lines),
|
|
"allocation_pressure_log_excerpt": allocation_pressure_lines[:20],
|
|
}
|
|
|
|
output_json = run_dir / "postprocess_summary.json"
|
|
output_md = run_dir / "postprocess_summary.md"
|
|
with output_json.open("w", encoding="utf-8") as handle:
|
|
json.dump(summary, handle, indent=2, sort_keys=True)
|
|
handle.write("\n")
|
|
|
|
cache = summary["prefix_cache_postprocess"]
|
|
mem = summary["memory_planner"]
|
|
with output_md.open("w", encoding="utf-8") as handle:
|
|
handle.write(f"# RS1 Frontier Smoke: {fixture_dir.name}\n\n")
|
|
handle.write(f"- Metrics dir: `{metrics_dir}`\n")
|
|
handle.write(f"- Frontier system metrics: `{system_metrics_path}`\n")
|
|
handle.write(f"- Frontier request metrics: `{request_metrics_path}`\n")
|
|
handle.write(
|
|
"- Completion: "
|
|
f"`{completion_summary['completed_requests']}/"
|
|
f"{completion_summary['total_requests']}`\n"
|
|
)
|
|
missing_latency_rows = completion_summary.get("missing_latency_request_ids") or []
|
|
if missing_latency_rows:
|
|
handle.write(
|
|
"- Missing latency request rows: "
|
|
f"`{missing_latency_rows}`\n"
|
|
)
|
|
if cache.get("available"):
|
|
frontier_ratio = cache["frontier_block_level"]["hit_ratio"]
|
|
token_ratio = cache["replayserve_token_weighted"]["hit_ratio"]
|
|
handle.write(f"- Frontier block-level prefix hit ratio: {frontier_ratio:.8f}\n")
|
|
handle.write(f"- ReplayServe token-weighted prefix hit ratio: {token_ratio:.8f}\n")
|
|
missing_cache_rows = cache.get("rows_with_missing_cache_metrics") or []
|
|
if missing_cache_rows:
|
|
handle.write(
|
|
"- Prefix-cache metric rows skipped: "
|
|
f"`{missing_cache_rows}`\n"
|
|
)
|
|
else:
|
|
handle.write(f"- Prefix cache postprocess unavailable: {cache.get('reason')}\n")
|
|
derived = mem.get("derived", {})
|
|
handle.write(f"- Memory planner mode: `{mem.get('mode')}`\n")
|
|
handle.write(f"- GPU memory utilization: `{mem.get('gpu_memory_utilization')}`\n")
|
|
handle.write(
|
|
f"- Non-KV overhead bytes assumption: `{mem.get('non_kv_cache_overhead_bytes')}`\n"
|
|
)
|
|
if derived.get("available"):
|
|
handle.write(f"- Derived KV blocks: `{derived.get('total_blocks')}`\n")
|
|
handle.write(f"- Max batch size: `{derived.get('max_batch_size', 'n/a')}`\n")
|
|
else:
|
|
handle.write("- Derived KV blocks: not found in logs\n")
|
|
preemptions = preemption_statistics.get("total_preemption_events")
|
|
handle.write(f"- Total preemption events: `{preemptions}`\n")
|
|
handle.write(
|
|
f"- Allocation/preemption/OOM log lines: `{len(allocation_pressure_lines)}`\n"
|
|
)
|
|
|
|
except Exception as exc:
|
|
print(f"postprocess_frontier_smoke.py: error: {exc}", file=sys.stderr)
|
|
return 1
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|