# Files
#
# 548 lines
# 20 KiB
# Python
import argparse
import json
from pathlib import Path
from tqdm.auto import tqdm
from .figures import render_figures
from .features import compute_features
from .layout import details_outputs_exist
from .parser import default_output_dir, infer_analysis_dataset_name, load_records, path_looks_like_release_trace
from .preparation import stream_prepare
from .report import write_features, write_normalized, write_report
from .reporting import write_reports
from .resume_advanced import collect_existing_detail_paths, run_advanced_from_existing
from .study import parse_input_length_bucket_thresholds, run_study
# File written into an analysis output directory recording the inputs/options
# that produced it; compared on later runs to decide whether outputs are reusable.
ANALYSIS_PROVENANCE_FILENAME = "analysis_provenance.json"
# Bump when the provenance payload layout changes; a mismatch disables reuse.
ANALYSIS_PROVENANCE_SCHEMA_VERSION = 1
def _add_release_input_arg(parser):
    """Add the --release-input override shared by `analyze` and `resume-details`."""
    parser.add_argument(
        "--release-input",
        default=None,
        help="Path to the formatter-generated release .jsonl with hash_ids. Defaults to the sibling file without the `-raw` suffix.",
    )


def _add_input_length_buckets_arg(parser):
    """Add the --input-length-buckets option shared by analyze/study/resume-details."""
    parser.add_argument(
        "--input-length-buckets",
        default=None,
        help="Semicolon-separated input-length bucket thresholds in tokens, such as `32768;87040;131072` or `32Ki;85Ki;128Ki`.",
    )


def _add_detail_analysis_args(parser):
    """Add the theoretical-cache/tokenizer options shared by analyze/study/resume-details."""
    parser.add_argument(
        "--block-size",
        type=int,
        default=256,
        help="Block size for theoretical cache analysis.",
    )
    parser.add_argument(
        "--segment-mode",
        default="tokenizer",
        choices=["bytes", "tokenizer"],
        help="How to segment prompts for theoretical cache analysis.",
    )
    parser.add_argument(
        "--tokenizer-path",
        default=None,
        help="Local path or model id for tokenizer mode. Defaults to the local resolved tokenizer path.",
    )
    parser.add_argument(
        "--tokenizer-batch-size",
        type=int,
        default=64,
        help="Batch size used by tokenizer-based theoretical cache analysis.",
    )
    parser.add_argument(
        "--model-family",
        default="auto",
        help="Model family for tokenizer/chat-template metadata. Defaults to auto-detect.",
    )
    parser.add_argument(
        "--model-meta-dir",
        default=None,
        help="Override the base directory that contains model_meta/<provider>/<model>/.",
    )


def build_parser():
    """Build the CLI parser with the analyze/parse/features/report/study/resume-details subcommands.

    Option groups shared by several subcommands (release input, detail-analysis
    options, input-length buckets) are added through helpers so they cannot
    drift apart; option ordering matches the historical help output.
    """
    parser = argparse.ArgumentParser(description="Analyze coding-agent trace patterns.")
    subparsers = parser.add_subparsers(dest="command", required=True)

    analyze_parser = subparsers.add_parser(
        "analyze",
        help="Run the full analysis workflow from one formatter-generated *-raw.jsonl trace.",
    )
    analyze_parser.add_argument("input", help="Path to the formatter-generated *-raw.jsonl trace.")
    _add_release_input_arg(analyze_parser)
    analyze_parser.add_argument(
        "--dataset-name",
        default=None,
        help="Dataset name used for output paths and figure titles. Defaults to the formatted trace stem.",
    )
    analyze_parser.add_argument(
        "--output-dir",
        default=None,
        help="Explicit analysis output directory. Defaults to outputs/analysis/<dataset>/",
    )
    analyze_parser.add_argument("--output-root", default="outputs/analysis")
    analyze_parser.add_argument(
        "--figure-dir",
        default=None,
        help="Explicit figure directory. Defaults to <output-dir>/figures/.",
    )
    _add_detail_analysis_args(analyze_parser)
    _add_input_length_buckets_arg(analyze_parser)

    parse_parser = subparsers.add_parser("parse", help="Normalize a formatter-generated *-raw.jsonl trace.")
    _add_common_args(parse_parser)
    parse_parser.add_argument(
        "--format",
        default="jsonl",
        choices=["jsonl", "csv", "parquet"],
        help="Normalized output format.",
    )

    features_parser = subparsers.add_parser("features", help="Extract request-level features.")
    _add_common_args(features_parser)

    report_parser = subparsers.add_parser("report", help="Generate markdown and json summary reports.")
    _add_common_args(report_parser)
    report_parser.add_argument(
        "--normalized-format",
        default="jsonl",
        choices=["jsonl", "csv", "parquet"],
        help="Also emit normalized records in this format.",
    )

    study_parser = subparsers.add_parser(
        "study",
        help="Generate data tables and CDF plots for lengths, cache reuse, and tool timing.",
    )
    _add_common_args(study_parser)
    study_parser.add_argument(
        "--normalized-format",
        default="jsonl",
        choices=["jsonl", "csv", "parquet"],
        help="Normalized output format.",
    )
    _add_detail_analysis_args(study_parser)
    _add_input_length_buckets_arg(study_parser)

    resume_parser = subparsers.add_parser(
        "resume-details",
        aliases=["resume-advanced"],
        help="Reuse existing source trace (*-raw.jsonl or legacy normalized.jsonl) + features.csv and compute only detailed analysis outputs.",
    )
    resume_parser.add_argument("input", help="Path to formatter-generated *-raw.jsonl")
    resume_parser.add_argument("features", help="Path to existing features.csv")
    _add_release_input_arg(resume_parser)
    resume_parser.add_argument(
        "--output-dir",
        required=True,
        help="Existing output directory to receive detailed analysis outputs.",
    )
    _add_detail_analysis_args(resume_parser)
    resume_parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Only process the first N source/features rows. Useful for throughput benchmarking.",
    )
    _add_input_length_buckets_arg(resume_parser)
    return parser


def _add_common_args(parser):
    """Add the input/limit/output-dir arguments shared by parse/features/report/study."""
    parser.add_argument("input", help="Path to the formatter-generated *-raw.jsonl trace.")
    parser.add_argument("--limit", type=int, default=None, help="Limit number of input lines.")
    parser.add_argument(
        "--output-dir",
        default=None,
        help="Output directory. Defaults to outputs/analysis/<input_stem>/",
    )
def resolve_output_dir(input_path, output_dir):
    """Return *output_dir* as a Path when given, else the default derived from *input_path*."""
    if output_dir:
        return Path(output_dir)
    return default_output_dir(input_path)
def _normalize_dataset_name(name: str) -> str:
text = str(name)
return text[:-4] if text.endswith("-raw") else text
def _resolve_analysis_output_dir(args):
dataset_name = args.dataset_name or _normalize_dataset_name(infer_analysis_dataset_name(args.input))
output_dir = Path(args.output_dir) if args.output_dir else Path(args.output_root) / dataset_name
figure_dir = Path(args.figure_dir) if args.figure_dir else output_dir / "figures"
return dataset_name, output_dir, figure_dir
def _resolve_release_input_path(raw_input: str, release_input: str | None) -> Path:
if release_input:
return Path(release_input)
raw_path = Path(raw_input)
name = raw_path.name
if name.endswith("-raw.jsonl"):
candidate = raw_path.with_name(name[:-len("-raw.jsonl")] + ".jsonl")
else:
raise ValueError("Expected a formatter-generated *-raw.jsonl input, or pass --release-input explicitly.")
return candidate
def _resolve_existing_release_input_path(raw_input: str, release_input: str | None) -> Path | None:
    """Like :func:`_resolve_release_input_path`, but return None unless the
    candidate actually looks like a release trace on disk."""
    candidate = _resolve_release_input_path(raw_input, release_input)
    return candidate if path_looks_like_release_trace(candidate) else None
def _existing_base_outputs(output_dir):
features = output_dir / "features.csv"
report = output_dir / "report.md"
if features.exists():
return {
"features": features,
"report": report if report.exists() else None,
}
return None
def _existing_detail_outputs(output_dir):
    """Return paths of existing detailed-analysis outputs, or None when the
    details are missing or incomplete."""
    if details_outputs_exist(output_dir):
        return collect_existing_detail_paths(output_dir)
    return None
def _file_fingerprint(path: str | Path) -> dict:
resolved = Path(path).resolve()
stat = resolved.stat()
return {
"path": str(resolved),
"size": stat.st_size,
"mtime_ns": stat.st_mtime_ns,
}
def _path_option(value: str | None) -> str | None:
if value is None:
return None
path = Path(value)
return str(path.resolve()) if path.exists() else str(value)
def _expected_analysis_provenance(args, release_input_path: Path, input_length_bucket_thresholds: list[int]) -> dict:
    """Build the provenance record expected for the current analyze invocation.

    Compared against the stored record to decide whether prior outputs can be
    reused without recomputation.
    """
    options = {
        "block_size": args.block_size,
        "segment_mode": args.segment_mode,
        "tokenizer_path": _path_option(args.tokenizer_path),
        "tokenizer_batch_size": args.tokenizer_batch_size,
        "model_family": args.model_family,
        "model_meta_dir": _path_option(args.model_meta_dir),
        "input_length_buckets": input_length_bucket_thresholds,
    }
    return {
        "schema_version": ANALYSIS_PROVENANCE_SCHEMA_VERSION,
        "raw_input": _file_fingerprint(args.input),
        "release_input": _file_fingerprint(release_input_path),
        "options": options,
    }
def _analysis_provenance_path(output_dir: Path) -> Path:
    """Location of the provenance file inside an analysis output directory."""
    return output_dir.joinpath(ANALYSIS_PROVENANCE_FILENAME)
def _load_analysis_provenance(output_dir: Path) -> dict | None:
    """Read the stored provenance record for *output_dir*.

    Returns None when the file is missing, unreadable, malformed JSON, or not
    a JSON object — all of which simply disable output reuse.
    """
    provenance_path = _analysis_provenance_path(output_dir)
    if not provenance_path.exists():
        return None
    try:
        data = json.loads(provenance_path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None
    if isinstance(data, dict):
        return data
    return None
def _analysis_provenance_matches(output_dir: Path, expected: dict) -> bool:
    """True when the stored provenance record equals *expected* exactly."""
    stored = _load_analysis_provenance(output_dir)
    return stored == expected
def _write_analysis_provenance(output_dir: Path, provenance: dict) -> None:
    """Persist *provenance* atomically (write to a .tmp sibling, then rename)."""
    output_dir.mkdir(parents=True, exist_ok=True)
    destination = _analysis_provenance_path(output_dir)
    temp_path = destination.with_suffix(destination.suffix + ".tmp")
    payload = json.dumps(provenance, ensure_ascii=False, indent=2, sort_keys=True)
    temp_path.write_text(payload + "\n", encoding="utf-8")
    temp_path.replace(destination)
def _stage_message(progress, step: int, total_steps: int, message: str) -> None:
    """Announce a pipeline stage above the progress bar, then advance the bar.

    ``tqdm.write`` keeps the banner from clobbering the live bar; the postfix
    shows the current stage next to it.
    """
    banner = f"Stage {step}/{total_steps}: {message}"
    tqdm.write(banner)
    progress.update(1)
    progress.set_postfix(current=message)
def _require_release_input(args) -> Path:
    """Resolve the release trace for *args.input*, raising with remediation guidance if absent."""
    release_input_path = _resolve_existing_release_input_path(args.input, args.release_input)
    if release_input_path is None:
        raise FileNotFoundError(
            f"Release trace not found for raw trace {args.input}. "
            "Run `python -m trace_formatter build-release <raw-trace>` first, or pass --release-input."
        )
    return release_input_path


def _run_analyze(args) -> int:
    """Full analyze workflow: prepare features, detailed analysis, reports, figures.

    Prior outputs are reused only when the stored provenance record matches the
    current inputs and options exactly. The provenance file is written last so
    a partially-completed run is never mistaken for reusable output.
    """
    dataset_name, output_dir, figure_dir = _resolve_analysis_output_dir(args)
    input_length_bucket_thresholds = parse_input_length_bucket_thresholds(args.input_length_buckets)
    release_input_path = _require_release_input(args)
    expected_provenance = _expected_analysis_provenance(args, release_input_path, input_length_bucket_thresholds)
    reuse_allowed = _analysis_provenance_matches(output_dir, expected_provenance)
    total_steps = 4
    progress = tqdm(
        total=total_steps,
        desc="Analyze trace",
        unit="stage",
        dynamic_ncols=True,
    )
    try:
        # Stage 1: features.csv (reused when provenance allows).
        reusable_base = _existing_base_outputs(output_dir) if reuse_allowed else None
        if reusable_base:
            _stage_message(progress, 1, total_steps, "reuse existing features.csv")
            prepare_result = {
                "features_path": str(reusable_base["features"]),
                "reused": True,
            }
        else:
            _stage_message(progress, 1, total_steps, "prepare features.csv")
            prepare_result = stream_prepare(args.input, output_dir, show_progress=True)
        # Stage 2: detailed analysis outputs (reused when provenance allows).
        reusable_details = _existing_detail_outputs(output_dir) if reuse_allowed else None
        if reusable_details:
            _stage_message(progress, 2, total_steps, "reuse existing details/")
            advanced_paths = reusable_details
        else:
            _stage_message(
                progress,
                2,
                total_steps,
                "detailed analysis: request metrics, tool/session stats, kvcache stats",
            )
            advanced_paths = run_advanced_from_existing(
                args.input,
                release_input_path,
                prepare_result["features_path"],
                output_dir,
                input_length_bucket_thresholds=input_length_bucket_thresholds,
                show_progress=True,
            )
        # Stage 3: summary/report artifacts.
        _stage_message(progress, 3, total_steps, "reporting: summary.json, report.md, analysis_snapshot.json")
        report_result = write_reports(
            features_path=prepare_result["features_path"],
            output_dir=output_dir,
            pipeline_summary={
                "dataset_name": dataset_name,
                "formatted_path": str(Path(args.input)),
                "release_path": str(release_input_path),
                **{key: str(value) for key, value in advanced_paths.items()},
            },
        )
        # Stage 4: figures.
        _stage_message(
            progress,
            4,
            total_steps,
            "figures: 13 approved request/session/tool/kvcache plots",
        )
        figure_result = render_figures(
            analysis_dir=output_dir,
            fig_dir=figure_dir,
            dataset_title=dataset_name,
            show_progress=True,
        )
        # Written last: presence of matching provenance implies complete outputs.
        _write_analysis_provenance(output_dir, expected_provenance)
    finally:
        progress.close()
    print(
        json.dumps(
            {
                "dataset_name": dataset_name,
                "formatted_path": str(Path(args.input)),
                "output_dir": str(output_dir),
                "prepare": prepare_result,
                "details": {key: str(value) for key, value in advanced_paths.items()},
                "report": report_result,
                "figures": figure_result,
                "release_path": str(release_input_path),
            },
            ensure_ascii=False,
            indent=2,
        )
    )
    return 0


def _run_resume_details(args) -> int:
    """resume-details / resume-advanced: recompute only detailed analysis outputs
    from an existing source trace and features.csv."""
    input_length_bucket_thresholds = parse_input_length_bucket_thresholds(args.input_length_buckets)
    release_input_path = _require_release_input(args)
    paths = run_advanced_from_existing(
        args.input,
        release_input_path,
        args.features,
        args.output_dir,
        input_length_bucket_thresholds=input_length_bucket_thresholds,
        show_progress=True,
        limit=args.limit,
    )
    for path in paths.values():
        print(path)
    return 0


def _run_simple_command(args) -> int:
    """Handle the parse/features/report/study commands that share `_add_common_args`."""
    output_dir = resolve_output_dir(args.input, args.output_dir)
    # Study fast path: when base outputs already exist (and no --limit is set),
    # skip reloading records and only fill in missing detail outputs.
    if args.command == "study" and args.limit is None:
        input_length_bucket_thresholds = parse_input_length_bucket_thresholds(args.input_length_buckets)
        reusable = _existing_base_outputs(output_dir)
        if reusable:
            release_input_path = _resolve_existing_release_input_path(args.input, None)
            if release_input_path is not None:
                paths = _existing_detail_outputs(output_dir)
                if paths is None:
                    paths = run_advanced_from_existing(
                        args.input,
                        release_input_path,
                        reusable["features"],
                        output_dir,
                        input_length_bucket_thresholds=input_length_bucket_thresholds,
                        show_progress=True,
                    )
                for path in paths.values():
                    print(path)
                return 0
    show_progress = args.command == "study"
    records = load_records(
        args.input,
        limit=args.limit,
        show_progress=show_progress,
        progress_desc="Load trace",
    )
    if args.command == "parse":
        path = write_normalized(records, output_dir, output_format=args.format)
        print(path)
        return 0
    features = compute_features(records)
    if args.command == "features":
        path = write_features(features, output_dir)
        print(path)
        return 0
    if args.command == "study":
        input_length_bucket_thresholds = parse_input_length_bucket_thresholds(args.input_length_buckets)
        paths = run_study(
            records,
            output_dir,
            normalized_format=args.normalized_format,
            source_path=args.input,
            block_size=args.block_size,
            segment_mode=args.segment_mode,
            tokenizer_path=args.tokenizer_path,
            model_family=args.model_family,
            model_meta_dir=args.model_meta_dir,
            input_length_bucket_thresholds=input_length_bucket_thresholds,
            show_progress=show_progress,
            tokenizer_batch_size=args.tokenizer_batch_size,
        )
        for path in paths.values():
            print(path)
        return 0
    # Remaining command: report — emit normalized records, features, and reports.
    normalized_path = write_normalized(records, output_dir, output_format=args.normalized_format)
    features_path = write_features(features, output_dir)
    summary_path, report_path = write_report(records, features, output_dir)
    print(normalized_path)
    print(features_path)
    print(summary_path)
    print(report_path)
    return 0


def main(argv=None):
    """CLI entry point: parse *argv* and dispatch to the per-command runner.

    Returns the process exit code (0 on success).
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    if args.command == "analyze":
        return _run_analyze(args)
    if args.command in {"resume-details", "resume-advanced"}:
        return _run_resume_details(args)
    return _run_simple_command(args)
if __name__ == "__main__":
    # Propagate main()'s integer return value as the process exit status.
    raise SystemExit(main())