diff --git a/pyproject.toml b/pyproject.toml index 82d4764..220c56d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,10 @@ trace-formatter = "trace_formatter.cli:main" trace-analyzer = "trace_analyzer.cli:main" [tool.setuptools.packages.find] -include = ["trace_analyzer", "trace_formatter", "trace_model_meta"] +include = ["trace_analyzer", "trace_formatter", "trace_model_meta*"] + +[tool.setuptools.package-data] +trace_model_meta = ["**/*.json", "**/*.jinja", "**/*.py"] [dependency-groups] dev = [ diff --git a/tests/test_ali_trace_pipeline.py b/tests/test_ali_trace_pipeline.py index 766f991..e104514 100644 --- a/tests/test_ali_trace_pipeline.py +++ b/tests/test_ali_trace_pipeline.py @@ -1,5 +1,8 @@ import json +import shutil +import subprocess import tempfile +import tomllib import unittest from datetime import datetime, timezone from pathlib import Path @@ -655,6 +658,33 @@ class AliTracePipelineTest(unittest.TestCase): self.assertEqual(Path(captured["dir"]), output_dir) + def test_format_and_sort_trace_supports_zstd_input_during_time_inference(self): + if shutil.which("zstd") is None or shutil.which("zstdcat") is None: + self.skipTest("zstd/zstdcat are required for .jsonl.zst formatter smoke test") + with tempfile.TemporaryDirectory() as temp_dir: + root = Path(temp_dir) + raw_path = root / "0417-1500-1530.jsonl" + zst_path = root / "0417-1500-1530.jsonl.zst" + output_path = root / "formatted" / "trace-raw.jsonl" + raw_path.write_text( + json.dumps( + make_raw_row( + "req-zst", + utc_ms("2026-04-17 15:00:01.000"), + time_text="2026-04-17 15:00:01.000", + ) + ) + + "\n", + encoding="utf-8", + ) + subprocess.run(["zstd", "-q", str(raw_path), "-o", str(zst_path)], check=True) + + stats = format_and_sort_trace(input_dir=zst_path, output_path=output_path, chunk_bytes=256) + + self.assertEqual(stats["row_count"], 1) + formatted_rows = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()] + self.assertEqual([row["meta"]["request_id"] for row in formatted_rows], ["req-zst"]) + def test_export_release_ready_trace_defaults_temp_dir_to_output_parent(self): with tempfile.TemporaryDirectory() as temp_dir: root = Path(temp_dir) @@ -855,3 +885,57 @@ class AliTracePipelineTest(unittest.TestCase): (analysis_dir / "details" / "details_summary.json").stat().st_mtime_ns, details_summary_mtime_ns, ) + + def test_trace_analyzer_rebuilds_when_same_output_gets_different_input(self): + with tempfile.TemporaryDirectory() as temp_dir: + root = Path(temp_dir) + formatted_root = root / "outputs" / "formatted" + analysis_root = root / "outputs" / "analysis" + + def build_trace(label: str, request_id: str, ready: str) -> Path: + raw_dir = root / label + raw_dir.mkdir() + raw_path = raw_dir / "0417-1500-1530.jsonl" + raw_path.write_text( + json.dumps(make_raw_row(request_id, utc_ms(ready), user_id=f"user-{request_id}")) + "\n", + encoding="utf-8", + ) + self.assertEqual(formatter_main(["format", str(raw_dir), "--output-root", str(formatted_root)]), 0) + formatted_path = formatted_root / "041715-041715-raw.jsonl" + renamed_path = formatted_root / f"{label}-raw.jsonl" + formatted_path.replace(renamed_path) + self.assertEqual(formatter_main(["build-release", str(renamed_path), "--jobs", "1", "--block-size", "8"]), 0) + return renamed_path + + first_path = build_trace("first", "req-first", "2026-04-17 15:00:01.000") + second_path = build_trace("second", "req-second", "2026-04-17 15:00:02.000") + + common_args = [ + "--output-root", + str(analysis_root), + "--dataset-name", + "same-dataset", + "--segment-mode", + "bytes", + "--block-size", + "8", + ] + self.assertEqual(analyzer_main(["analyze", str(first_path), *common_args]), 0) + analysis_dir = analysis_root / "same-dataset" + self.assertIn("req-first", (analysis_dir / "features.csv").read_text(encoding="utf-8")) + + self.assertEqual(analyzer_main(["analyze", str(second_path), *common_args]), 0) + + features_text = (analysis_dir / "features.csv").read_text(encoding="utf-8") + self.assertIn("req-second", features_text) + self.assertNotIn("req-first", features_text) + details_summary = json.loads((analysis_dir / "details" / "details_summary.json").read_text(encoding="utf-8")) + self.assertTrue(str(details_summary["release_path"]).endswith("second.jsonl")) + + def test_pyproject_includes_trace_model_meta_package_data(self): + pyproject = tomllib.loads(Path("pyproject.toml").read_text(encoding="utf-8")) + package_data = pyproject["tool"]["setuptools"]["package-data"] + + self.assertIn("trace_model_meta", package_data) + self.assertIn("**/*.json", package_data["trace_model_meta"]) + self.assertIn("**/*.jinja", package_data["trace_model_meta"]) diff --git a/trace_analyzer/cli.py b/trace_analyzer/cli.py index 34d85de..e389c45 100644 --- a/trace_analyzer/cli.py +++ b/trace_analyzer/cli.py @@ -14,6 +14,9 @@ from .reporting import write_reports from .resume_advanced import collect_existing_detail_paths, run_advanced_from_existing from .study import parse_input_length_bucket_thresholds, run_study +ANALYSIS_PROVENANCE_FILENAME = "analysis_provenance.json" +ANALYSIS_PROVENANCE_SCHEMA_VERSION = 1 + def build_parser(): parser = argparse.ArgumentParser(description="Analyze coding-agent trace patterns.") @@ -282,6 +285,70 @@ def _existing_detail_outputs(output_dir): return collect_existing_detail_paths(output_dir) +def _file_fingerprint(path: str | Path) -> dict: + resolved = Path(path).resolve() + stat = resolved.stat() + return { + "path": str(resolved), + "size": stat.st_size, + "mtime_ns": stat.st_mtime_ns, + } + + +def _path_option(value: str | None) -> str | None: + if value is None: + return None + path = Path(value) + return str(path.resolve()) if path.exists() else str(value) + + +def _expected_analysis_provenance(args, release_input_path: Path, input_length_bucket_thresholds: list[int]) -> dict: + return { + "schema_version": ANALYSIS_PROVENANCE_SCHEMA_VERSION, + "raw_input": _file_fingerprint(args.input), + "release_input": _file_fingerprint(release_input_path), + "options": { + "block_size": args.block_size, + "segment_mode": args.segment_mode, + "tokenizer_path": _path_option(args.tokenizer_path), + "tokenizer_batch_size": args.tokenizer_batch_size, + "model_family": args.model_family, + "model_meta_dir": _path_option(args.model_meta_dir), + "input_length_buckets": input_length_bucket_thresholds, + }, + } + + +def _analysis_provenance_path(output_dir: Path) -> Path: + return output_dir / ANALYSIS_PROVENANCE_FILENAME + + +def _load_analysis_provenance(output_dir: Path) -> dict | None: + path = _analysis_provenance_path(output_dir) + if not path.exists(): + return None + try: + with path.open("r", encoding="utf-8") as handle: + data = json.load(handle) + except (OSError, json.JSONDecodeError): + return None + return data if isinstance(data, dict) else None + + +def _analysis_provenance_matches(output_dir: Path, expected: dict) -> bool: + return _load_analysis_provenance(output_dir) == expected + + +def _write_analysis_provenance(output_dir: Path, provenance: dict) -> None: + output_dir.mkdir(parents=True, exist_ok=True) + destination = _analysis_provenance_path(output_dir) + temp_path = destination.with_suffix(destination.suffix + ".tmp") + with temp_path.open("w", encoding="utf-8") as handle: + json.dump(provenance, handle, ensure_ascii=False, indent=2, sort_keys=True) + handle.write("\n") + temp_path.replace(destination) + + def _stage_message(progress, step: int, total_steps: int, message: str) -> None: tqdm.write(f"Stage {step}/{total_steps}: {message}") progress.update(1) @@ -301,6 +368,8 @@ def main(argv=None): f"Release trace not found for raw trace {args.input}. " "Run `python -m trace_formatter build-release ` first, or pass --release-input." ) + expected_provenance = _expected_analysis_provenance(args, release_input_path, input_length_bucket_thresholds) + reuse_allowed = _analysis_provenance_matches(output_dir, expected_provenance) total_steps = 4 progress = tqdm( total=total_steps, @@ -310,7 +379,7 @@ def main(argv=None): ) try: prepare_result = None - reusable_base = _existing_base_outputs(output_dir) + reusable_base = _existing_base_outputs(output_dir) if reuse_allowed else None if reusable_base: _stage_message(progress, 1, total_steps, "reuse existing features.csv") prepare_result = { @@ -320,7 +389,7 @@ def main(argv=None): else: _stage_message(progress, 1, total_steps, "prepare features.csv") prepare_result = stream_prepare(args.input, output_dir, show_progress=True) - reusable_details = _existing_detail_outputs(output_dir) + reusable_details = _existing_detail_outputs(output_dir) if reuse_allowed else None if reusable_details: _stage_message(progress, 2, total_steps, "reuse existing details/") advanced_paths = reusable_details @@ -362,6 +431,7 @@ def main(argv=None): dataset_title=dataset_name, show_progress=True, ) + _write_analysis_provenance(output_dir, expected_provenance) finally: progress.close() print( diff --git a/trace_formatter/formatting.py b/trace_formatter/formatting.py index 80706b3..e15f46c 100644 --- a/trace_formatter/formatting.py +++ b/trace_formatter/formatting.py @@ -6,14 +6,13 @@ import json import os import shutil import sys -import subprocess import tempfile from array import array from concurrent.futures import ProcessPoolExecutor, as_completed from contextlib import contextmanager, nullcontext from dataclasses import asdict from pathlib import Path -from typing import Iterator, TextIO +from typing import Iterator from trace_analyzer.helpers import normalize_unicode_text, parse_jsonish, safe_int from tokenizers import Tokenizer @@ -32,6 +31,7 @@ from .sessionization import ( encode_roles, extract_user_id, ) +from .trace_io import open_trace_text from .time_windows import infer_time_offset_ms, infer_time_window, parse_time_to_ms @@ -88,34 +88,6 @@ def discover_source_files(input_dir: str | Path) -> list[Path]: return files -@contextmanager -def open_trace_text(path: str | Path) -> Iterator[TextIO]: - resolved = Path(path) - if resolved.suffix == ".zst": - proc = subprocess.Popen( - ["zstdcat", str(resolved)], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - encoding="utf-8", - ) - if proc.stdout is None: - raise RuntimeError(f"Failed to stream {resolved}") - try: - yield proc.stdout - finally: - stdout = proc.stdout - stdout.close() - stderr = proc.stderr.read() if proc.stderr else "" - return_code = proc.wait() - if return_code != 0: - raise RuntimeError(f"zstdcat failed for {resolved}: {stderr.strip()}") - return - - with resolved.open("r", encoding="utf-8") as handle: - yield handle - - def _normalize_time_ms(*, raw_time_ms: int, wall_clock_ms: int, time_offset_ms: int) -> int: if not raw_time_ms: return wall_clock_ms - time_offset_ms if wall_clock_ms and time_offset_ms else wall_clock_ms diff --git a/trace_formatter/time_windows.py b/trace_formatter/time_windows.py index 081366d..c65eba6 100644 --- a/trace_formatter/time_windows.py +++ b/trace_formatter/time_windows.py @@ -8,6 +8,8 @@ from pathlib import Path from trace_analyzer.helpers import parse_jsonish, safe_int +from .trace_io import open_trace_text + WINDOW_RE = re.compile(r"(?P\d{4})-(?P\d{4})-(?P\d{4})$") UTC_PLUS_8 = timezone(timedelta(hours=8)) @@ -32,8 +34,17 @@ def parse_time_to_ms(value: str) -> int: raise ValueError(f"Unsupported timestamp format: {value!r}") +def _trace_window_name(path: Path) -> str: + name = path.name + if name.endswith(".jsonl.zst"): + return name[: -len(".jsonl.zst")] + if name.endswith(".jsonl"): + return name[: -len(".jsonl")] + return path.stem + + def _read_first_timestamp(path: Path) -> str: - with path.open("r", encoding="utf-8") as handle: + with open_trace_text(path) as handle: for line in handle: stripped = line.strip() if not stripped: @@ -46,7 +57,7 @@ def _read_first_timestamp(path: Path) -> str: def _read_first_timestamp_and_ready_ms(path: Path) -> tuple[str, int]: - with path.open("r", encoding="utf-8") as handle: + with open_trace_text(path) as handle: for line in handle: stripped = line.strip() if not stripped: @@ -90,8 +101,8 @@ def infer_time_window( if not source_files: return None - first_match = WINDOW_RE.match(source_files[0].stem) - last_match = WINDOW_RE.match(source_files[-1].stem) + first_match = WINDOW_RE.match(_trace_window_name(source_files[0])) + last_match = WINDOW_RE.match(_trace_window_name(source_files[-1])) if first_match is None or last_match is None: return None diff --git a/trace_formatter/trace_io.py b/trace_formatter/trace_io.py new file mode 100644 index 0000000..6db3dc2 --- /dev/null +++ b/trace_formatter/trace_io.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import subprocess +from contextlib import contextmanager +from pathlib import Path +from typing import Iterator, TextIO + + +@contextmanager +def open_trace_text(path: str | Path) -> Iterator[TextIO]: + resolved = Path(path) + if resolved.suffix == ".zst": + proc = subprocess.Popen( + ["zstdcat", str(resolved)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + encoding="utf-8", + ) + if proc.stdout is None: + raise RuntimeError(f"Failed to stream {resolved}") + try: + yield proc.stdout + finally: + proc.stdout.close() + stderr = proc.stderr.read() if proc.stderr else "" + return_code = proc.wait() + if return_code != 0: + raise RuntimeError(f"zstdcat failed for {resolved}: {stderr.strip()}") + return + + with resolved.open("r", encoding="utf-8") as handle: + yield handle