Initial commit

trace_formatter/formatting.py (new file, 849 lines)

@@ -0,0 +1,849 @@
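"""Format raw model-serving trace dumps into a sorted, sessionized JSONL trace.

The pipeline, as implemented below: discover ``.jsonl`` / ``.jsonl.zst``
inputs, normalize each raw record into a unified row schema, externally sort
rows by request-ready time, assign logical session ids during the merge, and
optionally export a release-ready trace whose prompts are reduced to
token-block hash ids.
"""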
from __future__ import annotations

import hashlib
import heapq
import json
import os
import shutil
import subprocess
import sys
import tempfile
from array import array
from concurrent.futures import ProcessPoolExecutor, as_completed
from contextlib import contextmanager
from dataclasses import asdict
from pathlib import Path
from typing import Iterator, TextIO

from trace_analyzer.helpers import parse_jsonish, safe_int
from tokenizers import Tokenizer
from tqdm.auto import tqdm
from trace_model_meta import infer_model_family_from_request_model, resolve_tokenizer_path

from . import SCHEMA_VERSION
from .raw_parser import get_raw_adapter
from .sessionization import (
    LogicalSessionizer,
    build_message_fingerprints,
    build_sequence_hashes,
    decode_prefix_hashes,
    decode_roles,
    encode_prefix_hashes,
    encode_roles,
    extract_user_id,
)
from .time_windows import infer_time_offset_ms, infer_time_window, parse_time_to_ms


def _is_supported_trace_file(path: Path) -> bool:
    return path.name.endswith(".jsonl") or path.name.endswith(".jsonl.zst")


def derive_trace_name(input_path: str | Path) -> str:
    resolved = Path(input_path)
    if resolved.is_dir():
        return resolved.name
    name = resolved.name
    if name.endswith(".jsonl.zst"):
        return name[: -len(".jsonl.zst")]
    if name.endswith(".jsonl"):
        return name[: -len(".jsonl")]
    return resolved.stem


def default_formatted_name(input_path: str | Path) -> str:
    base_name = derive_trace_name(input_path)
    return base_name if base_name.endswith("-formatted") else f"{base_name}-formatted"


def derive_output_label(input_path: str | Path, *, time_window=None) -> str:
    if time_window is not None and getattr(time_window, "label", None):
        return str(time_window.label)
    return derive_trace_name(input_path)


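# Discovery prefers the compressed variant when both `foo.jsonl` and
# `foo.jsonl.zst` exist for the same stem, and returns files in sorted stem
# order. A hypothetical layout, to illustrate the expected selection:
#
#   traces/
#     2024-01-01.jsonl
#     2024-01-01.jsonl.zst   -> selected (.zst wins for a shared stem)
#     2024-01-02.jsonl       -> selected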
def discover_source_files(input_dir: str | Path) -> list[Path]:
    root = Path(input_dir)
    if not root.exists():
        raise FileNotFoundError(f"Input path does not exist: {root}")

    if root.is_file():
        if not _is_supported_trace_file(root):
            raise FileNotFoundError(f"Input file must be .jsonl or .jsonl.zst: {root}")
        return [root]

    preferred: dict[str, Path] = {}
    for path in sorted(root.iterdir()):
        if not path.is_file():
            continue
        if path.name.endswith(".jsonl.zst"):
            stem = path.name[: -len(".jsonl.zst")]
            preferred[stem] = path
        elif path.name.endswith(".jsonl"):
            preferred.setdefault(path.stem, path)

    files = [preferred[key] for key in sorted(preferred)]
    if not files:
        raise FileNotFoundError(f"No .jsonl or .jsonl.zst files found under {root}")
    return files


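# `.zst` inputs are streamed through the external `zstdcat` binary rather than
# a Python zstandard binding, so the zstd tools must be installed on PATH. The
# finally block drains stderr and surfaces a non-zero exit code even when the
# caller stops reading early.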
@contextmanager
def open_trace_text(path: str | Path) -> Iterator[TextIO]:
    resolved = Path(path)
    if resolved.suffix == ".zst":
        proc = subprocess.Popen(
            ["zstdcat", str(resolved)],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
        )
        if proc.stdout is None:
            raise RuntimeError(f"Failed to stream {resolved}")
        try:
            yield proc.stdout
        finally:
            stdout = proc.stdout
            stdout.close()
            stderr = proc.stderr.read() if proc.stderr else ""
            return_code = proc.wait()
            if return_code != 0:
                raise RuntimeError(f"zstdcat failed for {resolved}: {stderr.strip()}")
        return

    with resolved.open("r", encoding="utf-8") as handle:
        yield handle


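# Reconciles a backend-reported timestamp with the wall clock under a presumed
# constant clock offset (wall clock minus backend clock). With the 10 minute
# tolerance below and, say, time_offset_ms=3_600_000 (one hour), the three
# outcomes are:
#
#   raw=T, wall=T+3_600_000  -> raw already on the backend clock: keep T
#   raw=T, wall=T            -> raw is wall-clock time: return T - 3_600_000
#   raw and wall inconsistent beyond tolerance -> trust raw as-is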
def _normalize_time_ms(*, raw_time_ms: int, wall_clock_ms: int, time_offset_ms: int) -> int:
    if not raw_time_ms:
        return wall_clock_ms - time_offset_ms if wall_clock_ms and time_offset_ms else wall_clock_ms
    if not wall_clock_ms or not time_offset_ms:
        return raw_time_ms

    delta_ms = wall_clock_ms - raw_time_ms
    tolerance_ms = 10 * 60 * 1000
    if abs(delta_ms - time_offset_ms) <= tolerance_ms:
        return raw_time_ms
    if abs(delta_ms) <= tolerance_ms:
        return raw_time_ms - time_offset_ms
    return raw_time_ms


def _extract_sort_time_ms(raw: dict, attributes: dict, *, time_offset_ms: int = 0) -> int:
    wall_clock_ms = parse_time_to_ms(str(raw.get("time", ""))) if raw.get("time") else 0
    ready_ms = safe_int(attributes.get("x-dashscope-inner-requestreadytime"))
    if ready_ms:
        return _normalize_time_ms(raw_time_ms=ready_ms, wall_clock_ms=wall_clock_ms, time_offset_ms=time_offset_ms)
    if wall_clock_ms:
        return _normalize_time_ms(raw_time_ms=0, wall_clock_ms=wall_clock_ms, time_offset_ms=time_offset_ms)
    raw_epoch_seconds = safe_int(raw.get("__time__"))
    if raw_epoch_seconds:
        return raw_epoch_seconds * 1000
    return 0


def _extract_response_message(response_payload: dict) -> dict:
    output = response_payload.get("output", {}) if isinstance(response_payload, dict) else {}
    if not isinstance(output, dict):
        return {}
    choices = output.get("choices", [])
    if not isinstance(choices, list) or not choices:
        return {}
    first_choice = choices[0]
    if not isinstance(first_choice, dict):
        return {}
    message = first_choice.get("message", {})
    return message if isinstance(message, dict) else {}


def _extract_usage(response_payload: dict) -> dict:
    usage_payload = response_payload.get("usage", {}) if isinstance(response_payload, dict) else {}
    output_payload = response_payload.get("output", {}) if isinstance(response_payload, dict) else {}
    if (not isinstance(usage_payload, dict) or not usage_payload) and isinstance(output_payload, dict):
        usage_payload = output_payload.get("usage", {})
    if not isinstance(usage_payload, dict):
        usage_payload = {}
    output_details = parse_jsonish(usage_payload.get("output_tokens_details", {}))
    prompt_details = parse_jsonish(usage_payload.get("prompt_tokens_details", {}))
    return {
        "input_tokens": safe_int(usage_payload.get("input_tokens", usage_payload.get("prompt_tokens"))),
        "output_tokens": safe_int(usage_payload.get("output_tokens", usage_payload.get("completion_tokens"))),
        "total_tokens": safe_int(usage_payload.get("total_tokens")),
        "reasoning_tokens": safe_int(
            output_details.get("reasoning_tokens") if isinstance(output_details, dict) else 0
        ),
        "cached_tokens": safe_int(
            prompt_details.get("cached_tokens") if isinstance(prompt_details, dict) else 0
        ),
    }


def _extract_request_components(raw: dict) -> tuple[dict, dict | None, dict, dict, list]:
    request_params = parse_jsonish(raw.get("request_params", {}))
    response_params = parse_jsonish(raw.get("response_params", {}))
    request_header = request_params.get("header", {}) if isinstance(request_params, dict) else {}
    request_attributes = request_header.get("attributes", {}) if isinstance(request_header, dict) else {}
    request_payload = request_params.get("payload", {}) if isinstance(request_params, dict) else {}
    request_input = request_payload.get("input", {}) if isinstance(request_payload, dict) else {}
    messages = request_input.get("messages", [])
    return request_params, response_params, request_payload, request_attributes, messages


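# Builds the unified row. `session_id`, `parent_request_id`, `parent_chat_id`,
# `chat_id`, and `turn` are written as placeholders ("", "", -1, -1, 0) here
# and patched in later by _apply_session_assignment_to_row_json once the merge
# phase has assigned logical sessions; the placeholder values must stay in
# sync with the patch targets in _replace_json_field_once. `source_file` /
# `source_line` are accepted for parity with build_unified_row but are not
# embedded in the row.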
def _build_unified_row_from_components(
    raw: dict,
    *,
    request_params: dict,
    response_params: dict | None,
    request_payload: dict,
    request_attributes: dict,
    messages: list,
    source_file: str,
    source_line: int,
    time_offset_ms: int = 0,
) -> dict:
    adapter = get_raw_adapter(raw)
    request_parameters = request_payload.get("parameters", {}) if isinstance(request_payload, dict) else {}
    response_payload = response_params.get("payload", {}) if isinstance(response_params, dict) else {}
    response_header = response_params.get("header", {}) if isinstance(response_params, dict) else {}
    response_attributes = response_header.get("attributes", {}) if isinstance(response_header, dict) else {}

    sort_time_ms = _extract_sort_time_ms(raw, request_attributes, time_offset_ms=time_offset_ms)
    total_cost_time_ms = safe_int(raw.get("total_cost_time"))
    request_end_time_ms = sort_time_ms + total_cost_time_ms if sort_time_ms else total_cost_time_ms

    declared_tools = request_parameters.get("tools", [])
    canonical_prompt = adapter.build_canonical_prompt(request_payload)
    usage = _extract_usage(response_payload)
    message_events = [asdict(adapter.parse_message(message)) for message in messages if isinstance(message, dict)]
    tool_specs = [asdict(adapter.parse_tool(tool)) for tool in declared_tools if isinstance(tool, dict)]
    role_sequence = [event["role"] for event in message_events]
    user_id = extract_user_id(request_params)
    model_family = infer_model_family_from_request_model(raw.get("request_model")) or "glm5"
    raw_messages = [message for message in messages if isinstance(message, dict)]
    backend_first_request_time_ms = safe_int(response_attributes.get("x-ds-backend-first-request-time"))
    backend_first_response_time_ms = safe_int(response_attributes.get("x-ds-backend-first-response-time"))

    return {
        "schema_version": SCHEMA_VERSION,
        "sort_time_ms": sort_time_ms,
        "meta": {
            "model_family": model_family,
            "request_id": str(raw.get("request_id", "")),
            "session_id": "",
            "raw_session_id": str(raw.get("session_id", "")),
            "user_id": user_id,
            "parent_request_id": "",
            "parent_chat_id": -1,
            "chat_id": -1,
            "turn": 0,
            "request_model": str(raw.get("request_model", "")),
            "time": str(raw.get("time", "")),
            "status_code": str(raw.get("status_code", "")),
            "status_name": str(raw.get("status_name", "")),
            "request_ready_time_ms": sort_time_ms,
            "request_end_time_ms": request_end_time_ms,
            "total_cost_time_ms": total_cost_time_ms,
            "backend_first_request_time_ms": backend_first_request_time_ms,
            "backend_first_response_time_ms": backend_first_response_time_ms,
        },
        "role_sequence": role_sequence,
        "message_events": message_events,
        "declared_tools": tool_specs,
        "usage": usage,
        "canonical_prompt": canonical_prompt,
        "response_message": _extract_response_message(response_payload),
        "raw_messages": raw_messages,
    }


def _has_empty_response_params(raw: dict, response_params) -> bool:
    raw_value = raw.get("response_params")
    if raw_value is None:
        return True
    if isinstance(raw_value, str) and raw_value.strip().lower() in {"", "none", "null"}:
        return True
    return response_params is None or (isinstance(response_params, dict) and not response_params)


def build_unified_row(raw: dict, *, source_file: str, source_line: int, time_offset_ms: int = 0) -> dict:
    request_params, response_params, request_payload, request_attributes, messages = _extract_request_components(raw)
    return _build_unified_row_from_components(
        raw,
        request_params=request_params,
        response_params=response_params,
        request_payload=request_payload,
        request_attributes=request_attributes,
        messages=messages,
        source_file=source_file,
        source_line=source_line,
        time_offset_ms=time_offset_ms,
    )


def _build_unified_row_payload(
    raw: dict,
    *,
    source_file: str,
    source_line: int,
    time_offset_ms: int = 0,
) -> tuple[int, str, str, str, str, str]:
    request_params, response_params, request_payload, request_attributes, messages = _extract_request_components(raw)
    return _build_unified_row_payload_from_components(
        raw,
        request_params=request_params,
        response_params=response_params,
        request_payload=request_payload,
        request_attributes=request_attributes,
        messages=messages,
        source_file=source_file,
        source_line=source_line,
        time_offset_ms=time_offset_ms,
    )


def _build_unified_row_payload_from_components(
    raw: dict,
    *,
    request_params: dict,
    response_params: dict | None,
    request_payload: dict,
    request_attributes: dict,
    messages: list,
    source_file: str,
    source_line: int,
    time_offset_ms: int = 0,
) -> tuple[int, str, str, str, str, str]:
    normalized_messages = [message for message in messages if isinstance(message, dict)]
    row = _build_unified_row_from_components(
        raw,
        request_params=request_params,
        response_params=response_params,
        request_payload=request_payload,
        request_attributes=request_attributes,
        messages=messages,
        source_file=source_file,
        source_line=source_line,
        time_offset_ms=time_offset_ms,
    )
    message_fingerprints = build_message_fingerprints(normalized_messages)
    return (
        safe_int(row.get("sort_time_ms")),
        str(row["meta"].get("user_id", "")),
        str(row["meta"].get("request_id", "")),
        encode_prefix_hashes(build_sequence_hashes(message_fingerprints)),
        encode_roles([str(message.get("role", "")) for message in normalized_messages]),
        json.dumps(row, ensure_ascii=False, separators=(",", ":")),
    )


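# External-sort spill format: one row per line, tab-separated as
#   sort_time_ms \t seq \t user_id \t request_id \t sequence_hashes \t roles \t row_json
# Each chunk is sorted by (sort_time_ms, seq) before it is written, so the
# merge phase can k-way merge chunks with heapq.merge on the same key. This
# assumes user_id, request_id, and the encoded hash/role strings never contain
# tab characters.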
def _write_chunk(rows: list[tuple[int, int, str, str, str, str, str]], tmp_dir: Path, chunk_index: int) -> Path:
    rows.sort(key=lambda item: (item[0], item[1]))
    path = tmp_dir / f"chunk_{chunk_index:05d}.jsonl"
    with path.open("w", encoding="utf-8") as handle:
        for sort_time_ms, seq, user_id, request_id, sequence_hashes, roles, row_json in rows:
            handle.write(f"{sort_time_ms}\t{seq}\t{user_id}\t{request_id}\t{sequence_hashes}\t{roles}\t{row_json}\n")
    return path


def _iter_chunk_rows(path: Path) -> Iterator[tuple[int, int, str, str, str, str, str]]:
    with path.open("r", encoding="utf-8") as handle:
        for line in handle:
            sort_text, seq_text, user_id, request_id, sequence_hashes, roles, row_json = line.rstrip("\n").split("\t", 6)
            yield int(sort_text), int(seq_text), user_id, request_id, sequence_hashes, roles, row_json


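# Session fields are patched into the already-serialized row JSON via targeted
# string replacement rather than a parse/re-serialize round trip, which keeps
# the merge phase cheap. This relies on json.dumps above using
# separators=(",", ":") (no spaces) and on the placeholder fields sitting in
# the leading "meta" object, ahead of any free-form message content, so the
# first occurrence of each target is the intended field.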
def _replace_json_field_once(row_json: str, *, key: str, value_text: str) -> str:
    target = f'"{key}":""'
    if target in row_json:
        return row_json.replace(target, f'"{key}":{value_text}', 1)
    numeric_target = None
    if key in {"parent_chat_id", "chat_id"}:
        numeric_target = f'"{key}":-1'
    elif key == "turn":
        numeric_target = f'"{key}":0'
    if numeric_target and numeric_target in row_json:
        return row_json.replace(numeric_target, f'"{key}":{value_text}', 1)
    raise ValueError(f"Unable to patch {key} in formatted row json")


def _apply_session_assignment_to_row_json(row_json: str, assignment) -> str:
    patched = _replace_json_field_once(
        row_json,
        key="session_id",
        value_text=json.dumps(assignment.session_id, ensure_ascii=False),
    )
    patched = _replace_json_field_once(
        patched,
        key="parent_request_id",
        value_text=json.dumps(assignment.parent_request_id, ensure_ascii=False),
    )
    patched = _replace_json_field_once(
        patched,
        key="parent_chat_id",
        value_text=str(assignment.parent_chat_id),
    )
    patched = _replace_json_field_once(
        patched,
        key="chat_id",
        value_text=str(assignment.chat_id),
    )
    return _replace_json_field_once(
        patched,
        key="turn",
        value_text=str(assignment.turn),
    )


class _TeeStream:
    def __init__(self, *streams):
        self._streams = [stream for stream in streams if stream is not None]

    def write(self, data):
        for stream in self._streams:
            stream.write(data)
        return len(data)

    def flush(self):
        for stream in self._streams:
            stream.flush()


@contextmanager
def _open_progress_stream(log_file: str | Path | None):
    if log_file is None:
        yield sys.stderr
        return

    path = Path(log_file)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        yield _TeeStream(sys.stderr, handle)


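# Hashes one block of token ids into a 16-byte BLAKE2b digest. The block
# length is mixed into the digest so a short trailing block hashes differently
# from a full block with the same prefix. array("I", ...) serializes ids as
# native unsigned ints, which assumes token ids fit in that width and that
# digests are compared only on the machine that produced them (native byte
# order, not portable).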
def _block_digest(block: list[int]) -> bytes:
    digest = hashlib.blake2b(digest_size=16)
    digest.update(len(block).to_bytes(4, "little", signed=False))
    digest.update(array("I", block).tobytes())
    return digest.digest()


def _load_release_tokenizer(model_family: str) -> Tokenizer:
    resolved = Path(resolve_tokenizer_path(model_family=model_family))
    tokenizer_file = resolved / "tokenizer.json" if resolved.is_dir() else resolved
    return Tokenizer.from_file(str(tokenizer_file))


def _infer_window_start_ms_from_raw_rows(raw_path: Path) -> int:
    with raw_path.open("r", encoding="utf-8") as handle:
        for line in handle:
            stripped = line.strip()
            if not stripped:
                continue
            row = json.loads(stripped)
            meta = row.get("meta", {}) if isinstance(row.get("meta", {}), dict) else {}
            return safe_int(meta.get("request_ready_time_ms", row.get("sort_time_ms")))
    return 0


def _compute_release_row_core(row: dict, *, base_ms: int) -> dict:
    meta = row.get("meta", {}) if isinstance(row.get("meta", {}), dict) else {}
    ready_ms = safe_int(meta.get("request_ready_time_ms", row.get("sort_time_ms")))
    timestamp_ms = ready_ms - base_ms if ready_ms and base_ms else 0
    return {
        "chat_id": safe_int(meta.get("chat_id")),
        "parent_chat_id": safe_int(meta.get("parent_chat_id", -1), default=-1),
        "timestamp": round(timestamp_ms / 1000.0, 3),
        "input_length": safe_int(row.get("usage", {}).get("input_tokens")),
        "output_length": safe_int(row.get("usage", {}).get("output_tokens")),
        "type": "coder",
        "turn": safe_int(meta.get("turn")),
    }


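# Splits the raw file into at most `jobs` byte ranges for parallel sharding.
# Each candidate boundary is advanced to the next newline (seek + readline),
# so every segment starts at a line boundary and no JSONL row is split across
# workers. For example, a 1 GiB file with jobs=4 yields boundaries near the
# 256 MiB marks, snapped forward to the following "\n".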
def _compute_release_segments(path: Path, jobs: int) -> list[tuple[int, int, int]]:
    total_size = path.stat().st_size
    if total_size <= 0:
        return [(0, 0, 0)]

    shard_count = max(1, min(jobs, total_size))
    boundaries = [0]
    with path.open("rb") as handle:
        for index in range(1, shard_count):
            target = total_size * index // shard_count
            handle.seek(target)
            handle.readline()
            boundary = handle.tell()
            if boundary > boundaries[-1]:
                boundaries.append(boundary)
    if boundaries[-1] != total_size:
        boundaries.append(total_size)

    segments: list[tuple[int, int, int]] = []
    for index, (start, end) in enumerate(zip(boundaries, boundaries[1:])):
        if end > start:
            segments.append((index, start, end))
    return segments or [(0, 0, total_size)]


def _build_release_shard(
    *,
    raw_input_path: str,
    shard_index: int,
    start_offset: int,
    end_offset: int,
    shard_output_path: str,
    block_size: int,
    base_ms: int,
) -> dict:
    input_path = Path(raw_input_path)
    output_path = Path(shard_output_path)
    tokenizer_cache: dict[str, Tokenizer] = {}
    row_count = 0

    with input_path.open("rb") as source, output_path.open("w", encoding="utf-8") as destination:
        source.seek(start_offset)
        while source.tell() < end_offset:
            line_bytes = source.readline()
            if not line_bytes:
                break
            stripped = line_bytes.strip()
            if not stripped:
                continue

            row = json.loads(stripped)
            meta = row.get("meta", {}) if isinstance(row.get("meta", {}), dict) else {}
            model_family = str(meta.get("model_family", "") or "glm5")
            tokenizer = tokenizer_cache.get(model_family)
            if tokenizer is None:
                tokenizer = _load_release_tokenizer(model_family)
                tokenizer_cache[model_family] = tokenizer

            token_ids = tokenizer.encode(str(row.get("canonical_prompt", ""))).ids
            digest_hexes = []
            for index in range(0, len(token_ids), block_size):
                block = token_ids[index:index + block_size]
                digest_hexes.append(_block_digest(block).hex())

            core = _compute_release_row_core(row, base_ms=base_ms)
            destination.write(json.dumps(core, ensure_ascii=False, separators=(",", ":")))
            destination.write("\t")
            destination.write(",".join(digest_hexes))
            destination.write("\n")
            row_count += 1

    return {
        "shard_index": shard_index,
        "shard_output_path": str(output_path),
        "row_count": row_count,
        "size_bytes": output_path.stat().st_size if output_path.exists() else 0,
    }


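# Release export runs in two phases: worker processes tokenize each row's
# canonical prompt and emit `core_json \t digest,digest,...` shard files, then
# a single-threaded finalize pass concatenates shards in shard order and
# rewrites each hex digest as a small integer id assigned in first-seen order.
# Doing the digest -> id mapping only in the merge keeps ids globally unique
# and dense without any cross-process coordination.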
def export_release_ready_trace(
    *,
    raw_input_path: str | Path,
    release_output_path: str | Path,
    window_start_ms: int | None = None,
    block_size: int = 512,
    jobs: int | None = None,
    tmp_dir: str | Path | None = None,
    show_progress: bool = False,
    progress_stream=None,
    log_file: str | Path | None = None,
) -> dict:
    if progress_stream is None:
        with _open_progress_stream(log_file) as owned_progress_stream:
            return export_release_ready_trace(
                raw_input_path=raw_input_path,
                release_output_path=release_output_path,
                window_start_ms=window_start_ms,
                block_size=block_size,
                jobs=jobs,
                tmp_dir=tmp_dir,
                show_progress=show_progress,
                progress_stream=owned_progress_stream,
            )

    input_path = Path(raw_input_path)
    release_destination = Path(release_output_path)
    release_destination.parent.mkdir(parents=True, exist_ok=True)

    requested_jobs = jobs if jobs is not None else min(os.cpu_count() or 1, 16)
    shard_jobs = max(1, requested_jobs)
    base_ms = window_start_ms or _infer_window_start_ms_from_raw_rows(input_path)
    segments = _compute_release_segments(input_path, shard_jobs)
    next_block_id = 0
    block_ids_by_digest: dict[str, int] = {}
    row_count = 0

    with tempfile.TemporaryDirectory(dir=tmp_dir) as temp_root:
        shard_root = Path(temp_root) / "release-shards"
        shard_root.mkdir(parents=True, exist_ok=True)
        shard_specs = [
            {
                "raw_input_path": str(input_path),
                "shard_index": shard_index,
                "start_offset": start_offset,
                "end_offset": end_offset,
                "shard_output_path": str(shard_root / f"shard_{shard_index:05d}.jsonl"),
                "block_size": block_size,
                "base_ms": base_ms,
            }
            for shard_index, start_offset, end_offset in segments
        ]

        shard_progress = tqdm(
            total=len(shard_specs),
            desc="Build release shards",
            unit="shard",
            dynamic_ncols=True,
            file=progress_stream or sys.stderr,
            disable=not show_progress,
        )
        shard_results: list[dict] = []
        try:
            if len(shard_specs) == 1:
                shard_results.append(_build_release_shard(**shard_specs[0]))
                if show_progress:
                    shard_progress.update(1)
            else:
                with ProcessPoolExecutor(max_workers=len(shard_specs)) as executor:
                    futures = [executor.submit(_build_release_shard, **spec) for spec in shard_specs]
                    for future in as_completed(futures):
                        shard_results.append(future.result())
                        if show_progress:
                            shard_progress.update(1)
        finally:
            if show_progress:
                shard_progress.close()

        shard_results.sort(key=lambda item: item["shard_index"])
        shard_paths = [Path(item["shard_output_path"]) for item in shard_results]
        finalize_progress = tqdm(
            total=sum(item["size_bytes"] for item in shard_results),
            desc="Finalize release trace",
            unit="B",
            unit_scale=True,
            dynamic_ncols=True,
            file=progress_stream or sys.stderr,
            disable=not show_progress,
        )
        try:
            with release_destination.open("w", encoding="utf-8") as destination:
                for shard_path in shard_paths:
                    with shard_path.open("r", encoding="utf-8") as source:
                        for line in source:
                            stripped = line.rstrip("\n")
                            if not stripped:
                                if show_progress:
                                    finalize_progress.update(len(line.encode("utf-8")))
                                continue
                            core_json, _, digest_text = stripped.partition("\t")
                            release_row = json.loads(core_json)
                            hash_ids = []
                            if digest_text:
                                for digest_hex in digest_text.split(","):
                                    if not digest_hex:
                                        continue
                                    block_id = block_ids_by_digest.get(digest_hex)
                                    if block_id is None:
                                        block_id = next_block_id
                                        next_block_id += 1
                                        block_ids_by_digest[digest_hex] = block_id
                                    hash_ids.append(block_id)
                            release_row["hash_ids"] = hash_ids
                            destination.write(json.dumps(release_row, ensure_ascii=False))
                            destination.write("\n")
                            row_count += 1
                            if show_progress:
                                finalize_progress.update(len(line.encode("utf-8")))
                                finalize_progress.set_postfix(rows=row_count, unique_blocks=next_block_id)
        finally:
            if show_progress:
                finalize_progress.close()

    return {
        "release_output_path": str(release_destination),
        "release_row_count": row_count,
        "release_unique_block_count": next_block_id,
        "release_shard_count": len(segments),
    }


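# Top-level formatting entry point: a classic external merge sort. The scan
# phase filters empty / out-of-window rows and spills keyed rows to sorted
# chunk files of roughly `chunk_bytes` each; the merge phase streams the
# chunks back through heapq.merge in (sort_time_ms, global_seq) order, assigns
# logical sessions via LogicalSessionizer, and patches the assignments into
# each row before writing the final JSONL.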
def format_and_sort_trace(
    *,
    input_dir: str | Path,
    output_path: str | Path,
    tmp_dir: str | Path | None = None,
    chunk_bytes: int = 128 * 1024 * 1024,
    start_time: str | None = None,
    end_time: str | None = None,
    truncate_to_window: bool = True,
    show_progress: bool = False,
    log_file: str | Path | None = None,
) -> dict:
    source_files = discover_source_files(input_dir)
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    time_offset_ms = infer_time_offset_ms(source_files[0]) if source_files else 0
    time_window = infer_time_window(source_files, start_time=start_time, end_time=end_time) if truncate_to_window else None
    total_input_bytes = sum(path.stat().st_size for path in source_files if path.suffix != ".zst")
    has_zst = any(path.suffix == ".zst" for path in source_files)

    with _open_progress_stream(log_file) as progress_stream, tempfile.TemporaryDirectory(dir=tmp_dir) as temp_root:
        temp_raw_destination = Path(temp_root) / "formatted-raw.tmp.jsonl"
        chunk_root = Path(temp_root)
        chunk_paths: list[Path] = []
        chunk_rows: list[tuple[int, int, str, str, str, str, str]] = []
        chunk_size_bytes = 0
        total_rows = 0
        global_seq = 0
        min_sort_time_ms: int | None = None
        max_sort_time_ms: int | None = None
        user_scoped_rows = 0
        truncated_rows = 0
        filtered_rows = 0
        filtered_empty_messages_rows = 0
        filtered_empty_response_rows = 0

        scan_progress = tqdm(
            total=None if has_zst else total_input_bytes,
            desc="Scan raw trace",
            unit="B" if not has_zst else "line",
            unit_scale=not has_zst,
            dynamic_ncols=True,
            file=progress_stream,
            disable=not show_progress,
        )
        try:
            for source_file in source_files:
                with open_trace_text(source_file) as handle:
                    for source_line, line in enumerate(handle, start=1):
                        stripped = line.strip()
                        if not stripped:
                            if show_progress:
                                scan_progress.update(1 if has_zst else len(line.encode("utf-8")))
                            continue
                        raw = json.loads(stripped)
                        request_params, response_params, request_payload, attributes, messages = _extract_request_components(raw)
                        empty_messages = not isinstance(messages, list) or len(messages) == 0
                        empty_response_params = _has_empty_response_params(raw, response_params)
                        if empty_messages or empty_response_params:
                            filtered_rows += 1
                            if empty_messages:
                                filtered_empty_messages_rows += 1
                            if empty_response_params:
                                filtered_empty_response_rows += 1
                            if show_progress:
                                scan_progress.update(1 if has_zst else len(line.encode("utf-8")))
                                scan_progress.set_postfix(
                                    kept=total_rows,
                                    filtered=filtered_rows,
                                    truncated=truncated_rows,
                                    chunks=len(chunk_paths),
                                )
                            continue
                        if time_window is not None:
                            ready_time_ms = _extract_sort_time_ms(raw, attributes, time_offset_ms=time_offset_ms)
                            if ready_time_ms and (ready_time_ms < time_window.start_ms or ready_time_ms >= time_window.end_ms):
                                truncated_rows += 1
                                if show_progress:
                                    scan_progress.update(1 if has_zst else len(line.encode("utf-8")))
                                    scan_progress.set_postfix(
                                        kept=total_rows,
                                        filtered=filtered_rows,
                                        truncated=truncated_rows,
                                        chunks=len(chunk_paths),
                                    )
                                continue
                        sort_time_ms, user_id, request_id, sequence_hashes, roles, row_json = _build_unified_row_payload_from_components(
                            raw,
                            request_params=request_params,
                            response_params=response_params,
                            request_payload=request_payload,
                            request_attributes=attributes,
                            messages=messages,
                            source_file=source_file.name,
                            source_line=source_line,
                            time_offset_ms=time_offset_ms,
                        )
                        chunk_rows.append((sort_time_ms, global_seq, user_id, request_id, sequence_hashes, roles, row_json))
                        chunk_size_bytes += (
                            len(row_json.encode("utf-8"))
                            + len(user_id.encode("utf-8"))
                            + len(request_id.encode("utf-8"))
                            + len(sequence_hashes.encode("utf-8"))
                            + len(roles.encode("utf-8"))
                            + 64
                        )
                        total_rows += 1
                        global_seq += 1
                        min_sort_time_ms = sort_time_ms if min_sort_time_ms is None else min(min_sort_time_ms, sort_time_ms)
                        max_sort_time_ms = sort_time_ms if max_sort_time_ms is None else max(max_sort_time_ms, sort_time_ms)
                        if user_id:
                            user_scoped_rows += 1

                        if chunk_size_bytes >= chunk_bytes:
                            chunk_paths.append(_write_chunk(chunk_rows, chunk_root, len(chunk_paths)))
                            chunk_rows = []
                            chunk_size_bytes = 0

                        if show_progress:
                            scan_progress.update(1 if has_zst else len(line.encode("utf-8")))
                            scan_progress.set_postfix(
                                kept=total_rows,
                                filtered=filtered_rows,
                                truncated=truncated_rows,
                                chunks=len(chunk_paths),
                            )
        finally:
            if show_progress:
                scan_progress.close()

        if chunk_rows:
            chunk_paths.append(_write_chunk(chunk_rows, chunk_root, len(chunk_paths)))

        iterators = [_iter_chunk_rows(path) for path in chunk_paths]
        sessionizer = LogicalSessionizer()
        merge_progress = tqdm(
            total=total_rows,
            desc="Merge formatted trace",
            unit="row",
            dynamic_ncols=True,
            file=progress_stream,
            disable=not show_progress,
        )
        try:
            with temp_raw_destination.open("w", encoding="utf-8") as output_handle:
                for _, _, user_id, request_id, sequence_hashes, roles, row_json in heapq.merge(
                    *iterators, key=lambda item: (item[0], item[1])
                ):
                    assignment = sessionizer.assign_precomputed(
                        user_id=user_id,
                        request_id=request_id,
                        sequence_hashes=decode_prefix_hashes(sequence_hashes),
                        roles=decode_roles(roles),
                    )
                    output_handle.write(_apply_session_assignment_to_row_json(row_json, assignment))
                    output_handle.write("\n")
                    if show_progress:
                        merge_progress.update(1)
                        merge_progress.set_postfix(rows=total_rows, chunks=len(chunk_paths))
        finally:
            if show_progress:
                merge_progress.close()

        shutil.move(str(temp_raw_destination), str(destination))

    return {
        "output_path": str(destination),
        "row_count": total_rows,
        "source_file_count": len(source_files),
        "chunk_count": len(chunk_paths),
        "min_sort_time_ms": min_sort_time_ms or 0,
        "max_sort_time_ms": max_sort_time_ms or 0,
        "rows_with_user_id": user_scoped_rows,
        "truncated_row_count": truncated_rows,
        "filtered_row_count": filtered_rows,
        "filtered_empty_messages_row_count": filtered_empty_messages_rows,
        "filtered_empty_response_params_row_count": filtered_empty_response_rows,
        "window_start_ms": time_window.start_ms if time_window is not None else None,
        "window_end_ms": time_window.end_ms if time_window is not None else None,
    }