Files
2026-04-21 15:44:47 +00:00

77 lines
2.4 KiB
Python

from __future__ import annotations
from pathlib import Path
import json
# Subdirectory name that preferred_details_dir() appends to an output directory.
DETAILS_DIR_NAME = "details"
# Subdirectory name that legacy_details_dir() appends; the resolve_* helpers
# consult it after the preferred directory.
LEGACY_DETAILS_DIR_NAME = "advanced"
# Summary file whose "schema_version" field is read by _details_dir_has_outputs().
DETAILS_SUMMARY_FILENAME = "details_summary.json"
# Alternate summary filename, tried by resolve_details_summary_path() after
# DETAILS_SUMMARY_FILENAME within each candidate directory.
LEGACY_DETAILS_SUMMARY_FILENAME = "advanced_summary.json"
def preferred_details_dir(output_dir: str | Path) -> Path:
    """Return the path of the ``DETAILS_DIR_NAME`` subdirectory of *output_dir*.

    The path is only constructed; nothing on disk is checked or created.
    """
    base_dir = Path(output_dir)
    return base_dir.joinpath(DETAILS_DIR_NAME)
def legacy_details_dir(output_dir: str | Path) -> Path:
    """Return the path of the ``LEGACY_DETAILS_DIR_NAME`` subdirectory of *output_dir*.

    The path is only constructed; nothing on disk is checked or created.
    """
    base_dir = Path(output_dir)
    return base_dir.joinpath(LEGACY_DETAILS_DIR_NAME)
def resolve_existing_details_dir(output_dir: str | Path) -> Path | None:
    """Pick the details directory under *output_dir* that is already on disk.

    Candidates are ranked in two passes, preferred directory before legacy
    directory in each:

    1. a directory for which ``_details_dir_has_outputs`` is true;
    2. a directory that merely exists.

    Returns:
        The first matching directory, or ``None`` when neither exists.
    """
    candidates = (
        preferred_details_dir(output_dir),
        legacy_details_dir(output_dir),
    )

    # Pass 1: a directory holding a full set of outputs beats anything else.
    for candidate in candidates:
        if _details_dir_has_outputs(candidate):
            return candidate

    # Pass 2: otherwise settle for whichever directory is present at all.
    for candidate in candidates:
        if candidate.exists():
            return candidate

    return None
def resolve_details_dir(output_dir: str | Path) -> Path:
    """Return the details directory to use for *output_dir*.

    Uses the directory chosen by ``resolve_existing_details_dir`` when there
    is one; otherwise falls back to the preferred path, which may not exist.
    """
    found = resolve_existing_details_dir(output_dir)
    return preferred_details_dir(output_dir) if found is None else found
def resolve_details_summary_path(output_dir: str | Path) -> Path | None:
    """Locate a summary file under *output_dir*.

    Both summary filenames are probed inside the preferred directory first,
    then inside the legacy directory; within a directory the current
    filename is tried before the legacy one.

    Returns:
        The first candidate path that exists, or ``None`` if none do.
    """
    directories = (
        preferred_details_dir(output_dir),
        legacy_details_dir(output_dir),
    )
    filenames = (DETAILS_SUMMARY_FILENAME, LEGACY_DETAILS_SUMMARY_FILENAME)

    # Lazily enumerate directory-major, filename-minor so probing stops at
    # the first hit.
    candidates = (
        directory / filename
        for directory in directories
        for filename in filenames
    )
    return next((candidate for candidate in candidates if candidate.exists()), None)
def details_outputs_exist(output_dir: str | Path) -> bool:
    """Report whether either details directory under *output_dir* has outputs.

    The preferred directory is checked first and the legacy directory is
    only inspected if the preferred one does not qualify.
    """
    candidate_dirs = (
        preferred_details_dir(output_dir),
        legacy_details_dir(output_dir),
    )
    return any(_details_dir_has_outputs(candidate) for candidate in candidate_dirs)
def _details_dir_has_outputs(details_dir: Path) -> bool:
    """Return True when *details_dir* holds a complete, current set of outputs.

    A directory qualifies only if all of the following hold:

    * the directory exists;
    * every required CSV file listed below exists in it;
    * ``DETAILS_SUMMARY_FILENAME`` exists in it and parses as a JSON object;
    * that object's ``"schema_version"`` coerces to an integer >= 3
      (a missing or falsy value counts as 0).

    This is a best-effort probe: an unreadable, malformed, or unexpectedly
    shaped summary yields ``False`` rather than raising.

    Args:
        details_dir: Candidate details directory to inspect.

    Returns:
        ``True`` if all checks pass, ``False`` otherwise.
    """
    if not details_dir.exists():
        return False

    required_names = (
        "request_metrics.csv",
        "theoretical_block_reuse_gaps.csv",
        "theoretical_block_lifetimes.csv",
        "theoretical_alive_block_timeline.csv",
        "session_bucket_boundary_miss.csv",
    )
    if not all((details_dir / name).exists() for name in required_names):
        return False

    summary_path = details_dir / DETAILS_SUMMARY_FILENAME
    if not summary_path.exists():
        return False

    try:
        payload = json.loads(summary_path.read_text(encoding="utf-8"))
    except Exception:
        # Deliberately broad: any failure to read or parse the summary just
        # means this directory does not count as having outputs.
        return False

    # Valid JSON need not be an object; a list/str/number has no ``.get``.
    if not isinstance(payload, dict):
        return False

    try:
        schema_version = int(payload.get("schema_version", 0) or 0)
    except (TypeError, ValueError, OverflowError):
        # Non-numeric version (e.g. "abc", a list, NaN/Infinity): treat the
        # summary as not meeting the minimum schema instead of crashing.
        return False

    return schema_version >= 3