Files
2026-04-21 15:44:47 +00:00

201 lines
7.5 KiB
Python

from __future__ import annotations
import argparse
import json
import os
from pathlib import Path
from .formatting import derive_output_label, discover_source_files, export_release_ready_trace, format_and_sort_trace
from .time_windows import infer_time_window
def _default_output_root(input_path: str | Path) -> Path:
resolved = Path(input_path)
if resolved.is_dir():
return resolved.parent / f"{resolved.name}-formatted"
if resolved.parent.name.startswith("trace-"):
return resolved.parent.parent / f"{resolved.parent.name}-formatted"
return resolved.parent / f"{resolved.stem}-formatted"
def _resolve_raw_output_path(args: argparse.Namespace) -> Path:
    """Work out where the formatted ``*-raw`` jsonl should be written.

    Args:
        args: Parsed ``format`` arguments. ``output`` is always read; when it
            is empty, ``output_root``, ``input``, ``start_time``, ``end_time``
            and ``no_truncate_to_window`` are read as well.

    Returns:
        The explicit ``--output`` path (renamed to ``<stem>-raw.jsonl`` unless
        its stem already ends in ``-raw``), or otherwise
        ``<output root>/<label>-raw.jsonl`` with the label derived from the
        input and the inferred time window.
    """
    if args.output:
        requested = Path(args.output)
        if requested.stem.endswith("-raw"):
            return requested
        return requested.with_name(f"{requested.stem}-raw.jsonl")

    if args.output_root:
        destination_dir = Path(args.output_root)
    else:
        destination_dir = _default_output_root(args.input)

    shards = discover_source_files(args.input)

    truncating = not args.no_truncate_to_window
    window = None
    if (args.start_time and args.end_time) or truncating:
        # With truncation disabled the explicit bounds are withheld, so the
        # window is inferred from the source files alone.
        window = infer_time_window(
            shards,
            start_time=args.start_time if truncating else None,
            end_time=args.end_time if truncating else None,
        )

    name = derive_output_label(args.input, time_window=window)
    return destination_dir / f"{name}-raw.jsonl"
def _resolve_release_output_path(args: argparse.Namespace) -> Path:
    """Work out where the release jsonl should be written.

    Args:
        args: Parsed ``build-release`` arguments; ``output`` and ``input`` are
            read.

    Returns:
        * With ``--output``: that path, except that a stem ending in ``-raw``
          has the suffix stripped and the file renamed to ``<stem>.jsonl``.
        * Without ``--output`` and an input stem ending in ``-raw``: the
          sibling ``<stem without -raw>.jsonl``.
        * Otherwise: the input path with its suffix replaced by ``.jsonl``.
          If that would be the input file itself (a ``.jsonl`` input with no
          ``-raw`` suffix), a sibling ``<stem>-release.jsonl`` is returned so
          the default output can never point at the file being read.
    """
    raw_marker = "-raw"

    if args.output:
        explicit = Path(args.output)
        if explicit.stem.endswith(raw_marker):
            return explicit.with_name(f"{explicit.stem[:-len(raw_marker)]}.jsonl")
        return explicit

    input_path = Path(args.input)
    if input_path.stem.endswith(raw_marker):
        return input_path.with_name(f"{input_path.stem[:-len(raw_marker)]}.jsonl")

    candidate = input_path.with_suffix(".jsonl")
    if candidate == input_path:
        # with_suffix() is a no-op for "name.jsonl"; writing the release there
        # would overwrite the input, so pick a distinct sibling name instead.
        return input_path.with_name(f"{input_path.stem}-release.jsonl")
    return candidate
def build_parser() -> argparse.ArgumentParser:
    """Create the CLI parser with its ``format`` and ``build-release`` subcommands.

    The chosen subcommand is stored on the namespace as ``command`` and is
    required. Options are declared as ordered ``(flag, keyword arguments)``
    tables and registered in that order, so ``--help`` output follows the
    table order.

    Returns:
        The fully configured top-level parser.
    """
    # Both worker-count options share this default: CPU count, capped at 16.
    worker_default = min(os.cpu_count() or 1, 16)
    log_file_help = "Optional log file path. When set, progress bars are mirrored to this file."

    root = argparse.ArgumentParser(description="Format raw trace shards into one time-sorted trace jsonl.")
    commands = root.add_subparsers(dest="command", required=True)

    format_cmd = commands.add_parser(
        "format",
        help="Format a raw trace directory or one .jsonl/.jsonl.zst file into one unified *-raw jsonl.",
    )
    format_options = [
        ("input", {"help": "Raw trace directory or one .jsonl/.jsonl.zst file."}),
        (
            "--output",
            {
                "default": None,
                "help": "Explicit raw output jsonl path. Defaults to a sibling trace-*-formatted/<label>-raw.jsonl path.",
            },
        ),
        (
            "--output-root",
            {
                "default": None,
                "help": "Base directory used when --output is omitted. Defaults to a sibling trace-*-formatted directory.",
            },
        ),
        ("--tmp-dir", {"default": None}),
        ("--chunk-bytes", {"type": int, "default": 128 * 1024 * 1024}),
        ("--log-file", {"default": None, "help": log_file_help}),
        (
            "--no-progress",
            {"action": "store_true", "help": "Disable progress bars during formatting."},
        ),
        (
            "--start-time",
            {
                "default": None,
                "help": "Explicit UTC+8 start time for ready-time truncation, e.g. '2026-04-17 15:00:00.000'.",
            },
        ),
        (
            "--end-time",
            {
                "default": None,
                "help": "Explicit UTC+8 end time for ready-time truncation, e.g. '2026-04-17 17:00:00.000'.",
            },
        ),
        (
            "--no-truncate-to-window",
            {
                "action": "store_true",
                "help": "Disable ready-time window truncation inferred from shard names or --start-time/--end-time.",
            },
        ),
        (
            "--build-release",
            {
                "action": "store_true",
                "help": "Also build the open-source-ready release jsonl after formatting the raw output.",
            },
        ),
        (
            "--release-output",
            {
                "default": None,
                "help": "Explicit release jsonl path used only with --build-release.",
            },
        ),
        (
            "--release-jobs",
            {
                "type": int,
                "default": worker_default,
                "help": "Worker processes used by release building when --build-release is enabled.",
            },
        ),
    ]
    for flag, settings in format_options:
        format_cmd.add_argument(flag, **settings)

    release_cmd = commands.add_parser(
        "build-release",
        help="Build the open-source-ready release jsonl from one formatted *-raw trace.",
    )
    release_options = [
        ("input", {"help": "Path to the formatted *-raw jsonl."}),
        (
            "--output",
            {
                "default": None,
                "help": "Explicit release output jsonl path. Defaults to the sibling path without the -raw suffix.",
            },
        ),
        ("--tmp-dir", {"default": None}),
        ("--block-size", {"type": int, "default": 512}),
        (
            "--jobs",
            {
                "type": int,
                "default": worker_default,
                "help": "Worker processes used for release tokenization shards.",
            },
        ),
        ("--log-file", {"default": None, "help": log_file_help}),
        (
            "--no-progress",
            {"action": "store_true", "help": "Disable progress bars during release building."},
        ),
    ]
    for flag, settings in release_options:
        release_cmd.add_argument(flag, **settings)

    return root
def main(argv: list[str] | None = None) -> int:
    """Run the selected subcommand and print a JSON summary to stdout.

    Args:
        argv: Arguments to parse; ``None`` is passed through to
            ``ArgumentParser.parse_args`` unchanged.

    Returns:
        ``0`` once the subcommand has finished.

    Raises:
        ValueError: If the parsed command is neither ``format`` nor
            ``build-release``.
    """
    args = build_parser().parse_args(argv)
    command = args.command

    if command == "build-release":
        release_target = _resolve_release_output_path(args)
        release_stats = export_release_ready_trace(
            raw_input_path=args.input,
            release_output_path=release_target,
            tmp_dir=args.tmp_dir,
            block_size=args.block_size,
            jobs=args.jobs,
            show_progress=not args.no_progress,
            log_file=args.log_file,
        )
        summary = {
            "input_path": str(Path(args.input)),
            "formatted_name": release_target.stem,
            **release_stats,
        }
        print(json.dumps(summary, ensure_ascii=False, indent=2))
        return 0

    if command != "format":
        raise ValueError(f"Unsupported command: {args.command}")

    raw_target = _resolve_raw_output_path(args)
    format_stats = format_and_sort_trace(
        input_dir=args.input,
        output_path=raw_target,
        tmp_dir=args.tmp_dir,
        chunk_bytes=args.chunk_bytes,
        start_time=args.start_time,
        end_time=args.end_time,
        truncate_to_window=not args.no_truncate_to_window,
        show_progress=not args.no_progress,
        log_file=args.log_file,
    )
    summary = {
        "input_path": str(Path(args.input)),
        "formatted_name": raw_target.stem,
        **format_stats,
    }

    if args.build_release:
        if args.release_output:
            release_target = Path(args.release_output)
        else:
            # Default release name: the raw file's stem minus its last four
            # characters (the "-raw" marker), beside the raw output.
            release_target = raw_target.with_name(f"{raw_target.stem[:-4]}.jsonl")
        release_stats = export_release_ready_trace(
            raw_input_path=raw_target,
            release_output_path=release_target,
            tmp_dir=args.tmp_dir,
            jobs=args.release_jobs,
            show_progress=not args.no_progress,
            log_file=args.log_file,
        )
        # Release keys are merged last, so they win over formatting keys of
        # the same name.
        summary.update(release_stats)

    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
# Script entry point: run the CLI and surface its return value as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)