# ali-trace-tools/tests/test_ali_trace_pipeline.py
import json
import shutil
import subprocess
import tempfile
import tomllib
import unittest
from datetime import datetime, timezone
from pathlib import Path
from unittest import mock

from trace_analyzer.cli import main as analyzer_main
from trace_analyzer.parser import load_records
from trace_formatter.cli import main as formatter_main
from trace_formatter.formatting import (
    build_unified_row,
    discover_source_files,
    export_release_ready_trace,
    format_and_sort_trace,
)
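

# End-to-end tests for the ali-trace pipeline: synthetic raw gateway rows are
# formatted and sorted into unified JSONL rows, optionally exported as a
# release-ready trace, and finally analyzed. The fixture layouts below are
# reverse-engineered for testing and only need to match what the parsers
# accept; they are not a specification of the production log schema.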


def utc_ms(value: str) -> int:
    """Epoch milliseconds for a wall-clock string interpreted as UTC."""
    return int(datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=timezone.utc).timestamp() * 1000)


def wall_clock_ms_with_offset(value: str, offset_hours: int) -> int:
    """Epoch milliseconds for a wall-clock string read in a UTC+offset_hours zone."""
    return utc_ms(value) - offset_hours * 60 * 60 * 1000
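

# Builds one synthetic raw log row for a GLM-family request. "ready_ms" feeds
# the "x-dashscope-inner-requestreadytime" header, which the formatter uses as
# its sort key; "time_text" is the (possibly timezone-shifted) wall-clock
# string; "tool_role" appends an assistant/tool exchange to the message list.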
def make_raw_row(
    request_id: str,
    ready_ms: int,
    tool_role: bool = False,
    *,
    raw_session_id: str = "sess-1",
    user_id: str = "user-1",
    messages: list[dict] | None = None,
    time_text: str = "2026-04-17 15:00:00.000",
) -> dict:
    if messages is None:
        messages = [{"role": "user", "content": "hello"}]
    if tool_role:
        messages.extend(
            [
                {"role": "assistant", "content": "calling"},
                {"role": "tool", "content": "tool-output"},
            ]
        )
    return {
        "__time__": str(ready_ms // 1000),
        "request_id": request_id,
        "session_id": raw_session_id,
        "request_model": "glm-5",
        "time": time_text,
        "status_code": "1000",
        "status_name": "ok",
        "total_cost_time": "250",
        "request_params": json.dumps(
            {
                "header": {
                    "attributes": {
                        "x-dashscope-inner-requestreadytime": str(ready_ms),
                        "user_id": user_id,
                    }
                },
                "payload": {
                    "input": {"messages": messages},
                    "parameters": {
                        "tools": [{"type": "function", "function": {"name": "read_file"}}],
                    },
                },
            }
        ),
        "response_params": json.dumps(
            {
                "header": {
                    "attributes": {
                        "x-ds-backend-first-request-time": str(ready_ms - 100),
                        "x-ds-backend-first-response-time": str(ready_ms + 150),
                    }
                },
                "payload": {
                    "output": {
                        "choices": [
                            {
                                "message": {
                                    "role": "assistant",
                                    "content": "done",
                                }
                            }
                        ]
                    },
                    "usage": {
                        "input_tokens": 20,
                        "output_tokens": 5,
                        "total_tokens": 25,
                        "output_tokens_details": {"reasoning_tokens": 1},
                        "prompt_tokens_details": {"cached_tokens": 10},
                    },
                },
            }
        ),
    }
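

# Qwen-flavoured variant of the fixture above. It differs from the GLM row in
# the ways the tests below rely on: streaming "delta" choices instead of
# "message", usage nested under payload.output with prompt_tokens /
# completion_tokens naming, and an HTTP-style "200"/"OK" status pair.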
def make_qwen_raw_row(
    request_id: str,
    ready_ms: int,
    *,
    raw_session_id: str = "qwen-sess-1",
    user_id: str = "qwen-user-1",
    time_text: str = "2026-04-19 15:00:00.000",
) -> dict:
    return {
        "__time__": str(ready_ms // 1000),
        "request_id": request_id,
        "session_id": raw_session_id,
        "request_model": "qwen3-coder-plus-2025-09-23",
        "time": time_text,
        "status_code": "200",
        "status_name": "OK",
        "total_cost_time": "800",
        "request_params": json.dumps(
            {
                "header": {
                    "attributes": {
                        "x-dashscope-inner-requestreadytime": str(ready_ms),
                        "user_id": user_id,
                    }
                },
                "payload": {
                    "input": {
                        "messages": [
                            {"role": "system", "content": "You are Qwen."},
                            {"role": "user", "content": "Write a function."},
                        ]
                    },
                    "parameters": {
                        "tools": [
                            {
                                "type": "function",
                                "function": {
                                    "name": "run_command",
                                    "parameters": {
                                        "type": "object",
                                        "properties": {"cmd": {"type": "string"}},
                                    },
                                },
                            }
                        ]
                    },
                },
            },
            ensure_ascii=False,
        ),
        "response_params": json.dumps(
            {
                "header": {
                    "attributes": {
                        "x-ds-backend-first-request-time": str(ready_ms - 200),
                        "x-ds-backend-first-response-time": str(ready_ms + 300),
                    }
                },
                "payload": {
                    "output": {
                        "choices": [
                            {
                                "delta": {"role": "assistant", "content": "Sure."},
                                "finish_reason": "stop",
                            }
                        ],
                        "usage": {
                            "prompt_tokens": 128,
                            "completion_tokens": 16,
                            "total_tokens": 144,
                            "prompt_tokens_details": {"cached_tokens": 32},
                        },
                    }
                },
            },
            ensure_ascii=False,
        ),
    }


class AliTracePipelineTest(unittest.TestCase):
    def test_build_unified_row_keeps_analysis_fields(self):
        ready_ms = utc_ms("2026-04-17 15:00:00.321")
        row = build_unified_row(make_raw_row("req-1", ready_ms), source_file="a.jsonl", source_line=3)
        self.assertEqual(row["meta"]["request_id"], "req-1")
        self.assertEqual(row["sort_time_ms"], ready_ms)
        self.assertEqual(row["usage"]["cached_tokens"], 10)
        self.assertEqual(row["meta"]["raw_session_id"], "sess-1")
        self.assertEqual(row["meta"]["user_id"], "user-1")
        self.assertEqual(row["meta"]["model_family"], "glm5")
        self.assertEqual(row["meta"]["backend_first_request_time_ms"], ready_ms - 100)
        self.assertEqual(row["meta"]["backend_first_response_time_ms"], ready_ms + 150)
        self.assertEqual(row["meta"]["total_cost_time_ms"], 250)
        self.assertEqual(row["declared_tools"][0]["name"], "read_file")
        self.assertEqual(row["message_events"][0]["role"], "user")
        self.assertEqual(row["raw_messages"][0]["content"], "hello")
        self.assertIn("[gMASK]<sop>", row["canonical_prompt"])

    def test_build_unified_row_supports_qwen_raw_trace_defaults(self):
        ready_ms = utc_ms("2026-04-19 15:00:00.321")
        row = build_unified_row(make_qwen_raw_row("qwen-req-1", ready_ms), source_file="qwen.jsonl", source_line=5)
        self.assertEqual(row["meta"]["model_family"], "qwen3-coder")
        self.assertEqual(row["usage"]["input_tokens"], 128)
        self.assertEqual(row["usage"]["output_tokens"], 16)
        self.assertEqual(row["usage"]["cached_tokens"], 32)
        self.assertEqual(row["meta"]["backend_first_request_time_ms"], ready_ms - 200)
        self.assertEqual(row["meta"]["backend_first_response_time_ms"], ready_ms + 300)
        self.assertEqual(row["declared_tools"][0]["name"], "run_command")
        self.assertIn("<|im_start|>system", row["canonical_prompt"])

    def test_format_and_sort_trace_outputs_time_sorted_unified_rows(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            input_dir = root / "raw"
            input_dir.mkdir()
            output_path = root / "formatted.jsonl"
            rows_by_file = {
                "0417-1530-1600.jsonl": [
                    make_raw_row(
                        "req-late",
                        utc_ms("2026-04-17 15:00:03.000"),
                        time_text="2026-04-17 15:30:30.000",
                    ),
                    make_raw_row(
                        "req-middle",
                        utc_ms("2026-04-17 15:00:02.000"),
                        tool_role=True,
                        time_text="2026-04-17 15:30:20.000",
                    ),
                ],
                "0417-1500-1530.jsonl": [
                    make_raw_row(
                        "req-first",
                        utc_ms("2026-04-17 15:00:01.000"),
                        time_text="2026-04-17 15:00:10.000",
                    ),
                ],
            }
            for filename, rows in rows_by_file.items():
                with (input_dir / filename).open("w", encoding="utf-8") as handle:
                    for row in rows:
                        handle.write(json.dumps(row) + "\n")
            discovered = discover_source_files(input_dir)
            self.assertEqual([path.name for path in discovered], sorted(rows_by_file))
            stats = format_and_sort_trace(input_dir=input_dir, output_path=output_path, chunk_bytes=512)
            self.assertEqual(stats["row_count"], 3)
            records = load_records(output_path)
            self.assertEqual([record.meta.request_id for record in records], ["req-first", "req-middle", "req-late"])
            self.assertEqual(records[1].usage.cached_tokens, 10)
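
    # Session reconstruction: the raw session_id is treated as unreliable.
    # As the next four tests encode, the formatter groups requests into
    # logical sessions by matching each request's message history against a
    # prefix of an earlier request from the same user, assigning session_id,
    # turn, and parent_request_id accordingly.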

    def test_format_and_sort_trace_reconstructs_logical_sessions_from_message_history(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            input_dir = root / "raw"
            input_dir.mkdir()
            output_path = root / "formatted.jsonl"
            rows = [
                make_raw_row(
                    "req-root-a",
                    utc_ms("2026-04-17 15:00:01.000"),
                    raw_session_id="111",
                    user_id="user-a",
                    messages=[{"role": "user", "content": "hello"}],
                ),
                make_raw_row(
                    "req-root-b",
                    utc_ms("2026-04-17 15:00:01.500"),
                    raw_session_id="111",
                    user_id="user-b",
                    messages=[{"role": "user", "content": "hello"}],
                ),
                make_raw_row(
                    "req-turn-2-a",
                    utc_ms("2026-04-17 15:00:02.000"),
                    raw_session_id="999",
                    user_id="user-a",
                    messages=[
                        {"role": "user", "content": "hello"},
                        {"role": "assistant", "content": "hi"},
                        {"role": "user", "content": "continue"},
                    ],
                ),
            ]
            with (input_dir / "0417-1500-1530.jsonl").open("w", encoding="utf-8") as handle:
                for row in rows:
                    handle.write(json.dumps(row) + "\n")
            format_and_sort_trace(input_dir=input_dir, output_path=output_path, chunk_bytes=256)
            formatted_rows = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()]
            self.assertEqual(formatted_rows[0]["meta"]["turn"], 1)
            self.assertEqual(formatted_rows[1]["meta"]["turn"], 1)
            self.assertEqual(formatted_rows[2]["meta"]["turn"], 2)
            self.assertNotEqual(formatted_rows[0]["meta"]["session_id"], formatted_rows[1]["meta"]["session_id"])
            self.assertEqual(formatted_rows[0]["meta"]["session_id"], formatted_rows[2]["meta"]["session_id"])
            self.assertEqual(formatted_rows[2]["meta"]["parent_request_id"], "req-root-a")

    def test_format_and_sort_trace_does_not_merge_sessions_on_shared_system_prompt_only(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            input_dir = root / "raw"
            input_dir.mkdir()
            output_path = root / "formatted.jsonl"
            rows = [
                make_raw_row(
                    "req-a",
                    utc_ms("2026-04-17 15:00:01.000"),
                    user_id="user-a",
                    messages=[
                        {"role": "system", "content": "shared system prompt"},
                        {"role": "user", "content": "question a"},
                    ],
                ),
                make_raw_row(
                    "req-b",
                    utc_ms("2026-04-17 15:00:02.000"),
                    user_id="user-a",
                    messages=[
                        {"role": "system", "content": "shared system prompt"},
                        {"role": "user", "content": "question b"},
                    ],
                ),
            ]
            with (input_dir / "0417-1500-1530.jsonl").open("w", encoding="utf-8") as handle:
                for row in rows:
                    handle.write(json.dumps(row) + "\n")
            format_and_sort_trace(input_dir=input_dir, output_path=output_path, chunk_bytes=256)
            formatted_rows = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()]
            self.assertEqual([row["meta"]["turn"] for row in formatted_rows], [1, 1])
            self.assertNotEqual(formatted_rows[0]["meta"]["session_id"], formatted_rows[1]["meta"]["session_id"])
            self.assertEqual([row["meta"]["parent_request_id"] for row in formatted_rows], ["", ""])

    def test_format_and_sort_trace_reconstructs_sessions_when_child_drops_parent_suffix(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            input_dir = root / "raw"
            input_dir.mkdir()
            output_path = root / "formatted.jsonl"
            rows = [
                make_raw_row(
                    "req-root",
                    utc_ms("2026-04-17 15:00:01.000"),
                    raw_session_id="111",
                    user_id="user-a",
                    messages=[
                        {"role": "user", "content": "hello"},
                        {"role": "assistant", "content": "draft answer"},
                        {"role": "user", "content": "continue"},
                        {"role": "assistant", "content": "long hidden reasoning that may be evicted"},
                    ],
                ),
                make_raw_row(
                    "req-turn-2",
                    utc_ms("2026-04-17 15:00:02.000"),
                    raw_session_id="999",
                    user_id="user-a",
                    messages=[
                        {"role": "user", "content": "hello"},
                        {"role": "assistant", "content": "draft answer"},
                        {"role": "user", "content": "continue"},
                    ],
                ),
            ]
            with (input_dir / "0417-1500-1530.jsonl").open("w", encoding="utf-8") as handle:
                for row in rows:
                    handle.write(json.dumps(row) + "\n")
            format_and_sort_trace(input_dir=input_dir, output_path=output_path, chunk_bytes=256)
            formatted_rows = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()]
            self.assertEqual(formatted_rows[0]["meta"]["turn"], 1)
            self.assertEqual(formatted_rows[1]["meta"]["turn"], 2)
            self.assertEqual(formatted_rows[0]["meta"]["session_id"], formatted_rows[1]["meta"]["session_id"])
            self.assertEqual(formatted_rows[1]["meta"]["parent_request_id"], "req-root")

    def test_format_and_sort_trace_reconstructs_sessions_when_last_non_user_messages_change(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            input_dir = root / "raw"
            input_dir.mkdir()
            output_path = root / "formatted.jsonl"
            rows = [
                make_raw_row(
                    "req-root",
                    utc_ms("2026-04-17 15:00:01.000"),
                    user_id="user-a",
                    messages=[
                        {"role": "user", "content": "hello"},
                        {"role": "assistant", "content": "draft answer"},
                    ],
                ),
                make_raw_row(
                    "req-regenerated",
                    utc_ms("2026-04-17 15:00:02.000"),
                    user_id="user-a",
                    messages=[
                        {"role": "user", "content": "hello"},
                        {"role": "assistant", "content": "regenerated answer"},
                    ],
                ),
            ]
            with (input_dir / "0417-1500-1530.jsonl").open("w", encoding="utf-8") as handle:
                for row in rows:
                    handle.write(json.dumps(row) + "\n")
            format_and_sort_trace(input_dir=input_dir, output_path=output_path, chunk_bytes=256)
            formatted_rows = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()]
            self.assertEqual([row["meta"]["turn"] for row in formatted_rows], [1, 2])
            self.assertEqual(formatted_rows[0]["meta"]["session_id"], formatted_rows[1]["meta"]["session_id"])
            self.assertEqual(formatted_rows[1]["meta"]["parent_request_id"], "req-root")
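
    # Window truncation: each source file name encodes a capture window
    # (e.g. 0417-1500-1530 is April 17, 15:00-15:30), and rows whose ready
    # time falls outside the inferred window are dropped and counted in
    # stats["truncated_row_count"].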

    def test_format_and_sort_trace_truncates_requests_before_inferred_window_start_by_ready_time(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            input_dir = root / "raw"
            input_dir.mkdir()
            output_path = root / "formatted.jsonl"
            rows = [
                make_raw_row(
                    "req-too-early",
                    utc_ms("2026-04-17 14:59:59.900"),
                    time_text="2026-04-17 15:00:00.100",
                ),
                make_raw_row(
                    "req-kept",
                    utc_ms("2026-04-17 15:00:00.100"),
                    time_text="2026-04-17 15:00:00.100",
                ),
            ]
            with (input_dir / "0417-1500-1530.jsonl").open("w", encoding="utf-8") as handle:
                for row in rows:
                    handle.write(json.dumps(row) + "\n")
            stats = format_and_sort_trace(input_dir=input_dir, output_path=output_path, chunk_bytes=256)
            self.assertEqual(stats["row_count"], 1)
            self.assertEqual(stats["truncated_row_count"], 1)
            formatted_rows = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()]
            self.assertEqual([row["meta"]["request_id"] for row in formatted_rows], ["req-kept"])

    def test_format_and_sort_trace_filters_empty_messages_and_empty_response_params(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            input_dir = root / "raw"
            input_dir.mkdir()
            output_path = root / "formatted.jsonl"
            empty_messages_row = make_raw_row("req-empty-messages", utc_ms("2026-04-17 15:00:01.000"))
            empty_messages_request = json.loads(empty_messages_row["request_params"])
            empty_messages_request["payload"]["input"]["messages"] = []
            empty_messages_row["request_params"] = json.dumps(empty_messages_request)
            empty_response_row = make_raw_row("req-empty-response", utc_ms("2026-04-17 15:00:02.000"))
            empty_response_row["response_params"] = None
            kept_row = make_raw_row("req-kept", utc_ms("2026-04-17 15:00:03.000"))
            with (input_dir / "0417-1500-1530.jsonl").open("w", encoding="utf-8") as handle:
                for row in [empty_messages_row, empty_response_row, kept_row]:
                    handle.write(json.dumps(row) + "\n")
            stats = format_and_sort_trace(input_dir=input_dir, output_path=output_path, chunk_bytes=256)
            self.assertEqual(stats["row_count"], 1)
            self.assertEqual(stats["filtered_row_count"], 2)
            self.assertEqual(stats["filtered_empty_messages_row_count"], 1)
            self.assertEqual(stats["filtered_empty_response_params_row_count"], 1)
            formatted_rows = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()]
            self.assertEqual([row["meta"]["request_id"] for row in formatted_rows], ["req-kept"])
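
    # "\ud83c" is a lone UTF-16 surrogate: JSON can escape it, but it cannot
    # be encoded as UTF-8. The formatter is expected to replace such code
    # points with U+FFFD (the Unicode replacement character) everywhere they
    # appear, including inside the request_id.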

    def test_format_and_sort_trace_normalizes_invalid_surrogates_before_chunking(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            input_dir = root / "raw"
            input_dir.mkdir()
            output_path = root / "formatted.jsonl"
            row = make_raw_row(
                "req-\ud83c",
                utc_ms("2026-04-17 15:00:03.000"),
                messages=[{"role": "user", "content": "bad \ud83c content"}],
            )
            with (input_dir / "0417-1500-1530.jsonl").open("w", encoding="utf-8") as handle:
                handle.write(json.dumps(row) + "\n")
            stats = format_and_sort_trace(input_dir=input_dir, output_path=output_path, chunk_bytes=256)
            self.assertEqual(stats["row_count"], 1)
            formatted_rows = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()]
            self.assertEqual(formatted_rows[0]["meta"]["request_id"], "req-\uFFFD")
            self.assertEqual(formatted_rows[0]["message_events"][0]["text_len"], len("bad \uFFFD content"))
            self.assertEqual(formatted_rows[0]["raw_messages"][0]["content"], "bad \uFFFD content")
            self.assertIn("\uFFFD", formatted_rows[0]["canonical_prompt"])

    def test_format_and_sort_trace_normalizes_nonstandard_glm_tool_call_shapes(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            input_dir = root / "raw"
            input_dir.mkdir()
            output_path = root / "formatted.jsonl"
            row = make_raw_row(
                "req-tool-call-shape",
                utc_ms("2026-04-17 15:00:04.000"),
                messages=[
                    {"role": "user", "content": "hello"},
                    {
                        "role": "assistant",
                        "content": "calling tool",
                        "tool_calls": {
                            "id": "call-1",
                            "type": "function",
                            "arguments": "{\"path\":\"/tmp/a.txt\"}",
                        },
                    },
                ],
            )
            request_params = json.loads(row["request_params"])
            request_params["payload"]["parameters"]["tools"] = {
                "type": "function",
                "name": "read_file",
                "parameters": {"type": "object", "properties": {"path": {"type": "string"}}},
            }
            row["request_params"] = json.dumps(request_params)
            with (input_dir / "0417-1500-1530.jsonl").open("w", encoding="utf-8") as handle:
                handle.write(json.dumps(row) + "\n")
            stats = format_and_sort_trace(input_dir=input_dir, output_path=output_path, chunk_bytes=256)
            self.assertEqual(stats["row_count"], 1)
            formatted_rows = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()]
            self.assertIn("<tools>", formatted_rows[0]["canonical_prompt"])
            self.assertIn("\"name\": \"read_file\"", formatted_rows[0]["canonical_prompt"])
            self.assertIn("<tool_call>", formatted_rows[0]["canonical_prompt"])
            self.assertIn("<arg_key>path</arg_key>", formatted_rows[0]["canonical_prompt"])

    def test_trace_formatter_cli_formats_one_raw_jsonl_file(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            raw_path = root / "trace-glm5.jsonl"
            output_root = root / "outputs" / "formatted"
            with raw_path.open("w", encoding="utf-8") as handle:
                handle.write(json.dumps(make_raw_row("req-1", utc_ms("2026-04-17 15:00:01.000"))) + "\n")
                handle.write(json.dumps(make_raw_row("req-2", utc_ms("2026-04-17 15:00:02.000"))) + "\n")
            exit_code = formatter_main(
                ["format", str(raw_path), "--output-root", str(output_root), "--chunk-bytes", "256"]
            )
            self.assertEqual(exit_code, 0)
            raw_formatted_path = output_root / "trace-glm5-raw.jsonl"
            self.assertTrue(raw_formatted_path.exists())
            records = load_records(raw_formatted_path)
            self.assertEqual([record.meta.request_id for record in records], ["req-1", "req-2"])

    def test_trace_formatter_cli_defaults_to_trace_formatted_dir_for_trace_file(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            trace_dir = root / "trace-demo"
            trace_dir.mkdir()
            raw_path = trace_dir / "0417-1500-1530.jsonl"
            with raw_path.open("w", encoding="utf-8") as handle:
                handle.write(json.dumps(make_raw_row("req-1", utc_ms("2026-04-17 15:00:01.000"))) + "\n")
            exit_code = formatter_main(["format", str(raw_path)])
            self.assertEqual(exit_code, 0)
            raw_formatted_path = root / "trace-demo-formatted" / "041715-041715-raw.jsonl"
            self.assertTrue(raw_formatted_path.exists())
            records = load_records(raw_formatted_path)
            self.assertEqual([record.meta.request_id for record in records], ["req-1"])

    def test_trace_formatter_cli_build_release_from_raw_in_second_stage(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            raw_path = root / "trace-glm5.jsonl"
            output_root = root / "outputs" / "formatted"
            with raw_path.open("w", encoding="utf-8") as handle:
                handle.write(json.dumps(make_raw_row("req-1", utc_ms("2026-04-17 15:00:01.000"))) + "\n")
                handle.write(json.dumps(make_raw_row("req-2", utc_ms("2026-04-17 15:00:02.000"))) + "\n")
            format_exit_code = formatter_main(
                ["format", str(raw_path), "--output-root", str(output_root), "--chunk-bytes", "256"]
            )
            self.assertEqual(format_exit_code, 0)
            raw_formatted_path = output_root / "trace-glm5-raw.jsonl"
            release_formatted_path = output_root / "trace-glm5.jsonl"
            release_exit_code = formatter_main(
                ["build-release", str(raw_formatted_path), "--jobs", "2", "--block-size", "8"]
            )
            self.assertEqual(release_exit_code, 0)
            self.assertTrue(release_formatted_path.exists())
            release_rows = [json.loads(line) for line in release_formatted_path.read_text(encoding="utf-8").splitlines()]
            self.assertEqual(len(release_rows), 2)
            self.assertEqual(
                sorted(release_rows[0]),
                ["chat_id", "hash_ids", "input_length", "output_length", "parent_chat_id", "timestamp", "turn", "type"],
            )
            self.assertEqual([row["chat_id"] for row in release_rows], [0, 1])

    def test_trace_formatter_cli_can_write_progress_log(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            raw_path = root / "trace-glm5.jsonl"
            output_root = root / "formatted"
            log_path = root / "tmp" / "format.log"
            with raw_path.open("w", encoding="utf-8") as handle:
                handle.write(json.dumps(make_raw_row("req-1", utc_ms("2026-04-17 15:00:01.000"))) + "\n")
            exit_code = formatter_main(
                [
                    "format",
                    str(raw_path),
                    "--output-root",
                    str(output_root),
                    "--chunk-bytes",
                    "256",
                    "--log-file",
                    str(log_path),
                ]
            )
            self.assertEqual(exit_code, 0)
            self.assertTrue(log_path.exists())
            self.assertIn("Scan raw trace", log_path.read_text(encoding="utf-8"))

    def test_format_and_sort_trace_defaults_temp_dir_to_output_parent(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            input_dir = root / "raw"
            input_dir.mkdir()
            output_dir = root / "formatted"
            output_path = output_dir / "formatted.jsonl"
            with (input_dir / "0417-1500-1530.jsonl").open("w", encoding="utf-8") as handle:
                handle.write(json.dumps(make_raw_row("req-1", utc_ms("2026-04-17 15:00:01.000"))) + "\n")
            captured = {}
            real_temporary_directory = tempfile.TemporaryDirectory

            def recording_temporary_directory(*args, **kwargs):
                captured["dir"] = kwargs.get("dir")
                return real_temporary_directory(*args, **kwargs)

            with mock.patch(
                "trace_formatter.formatting.tempfile.TemporaryDirectory",
                side_effect=recording_temporary_directory,
            ):
                format_and_sort_trace(input_dir=input_dir, output_path=output_path, chunk_bytes=256)
            self.assertEqual(Path(captured["dir"]), output_dir)

    def test_format_and_sort_trace_supports_zstd_input_during_time_inference(self):
        if shutil.which("zstd") is None or shutil.which("zstdcat") is None:
            self.skipTest("zstd/zstdcat are required for .jsonl.zst formatter smoke test")
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            raw_path = root / "0417-1500-1530.jsonl"
            zst_path = root / "0417-1500-1530.jsonl.zst"
            output_path = root / "formatted" / "trace-raw.jsonl"
            raw_path.write_text(
                json.dumps(
                    make_raw_row(
                        "req-zst",
                        utc_ms("2026-04-17 15:00:01.000"),
                        time_text="2026-04-17 15:00:01.000",
                    )
                )
                + "\n",
                encoding="utf-8",
            )
            subprocess.run(["zstd", "-q", str(raw_path), "-o", str(zst_path)], check=True)
            stats = format_and_sort_trace(input_dir=zst_path, output_path=output_path, chunk_bytes=256)
            self.assertEqual(stats["row_count"], 1)
            formatted_rows = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()]
            self.assertEqual([row["meta"]["request_id"] for row in formatted_rows], ["req-zst"])

    def test_export_release_ready_trace_defaults_temp_dir_to_output_parent(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            raw_input_path = root / "trace-raw.jsonl"
            release_output_dir = root / "release"
            release_output_path = release_output_dir / "trace.jsonl"
            raw_row = {
                "schema_version": "2026.04.21",
                "sort_time_ms": utc_ms("2026-04-17 15:00:01.000"),
                "meta": {
                    "model_family": "glm5",
                    "request_ready_time_ms": utc_ms("2026-04-17 15:00:01.000"),
                    "chat_id": 0,
                    "parent_chat_id": -1,
                    "turn": 1,
                },
                "canonical_prompt": "hello",
                "usage": {"input_tokens": 1, "output_tokens": 1},
            }
            raw_input_path.write_text(json.dumps(raw_row) + "\n", encoding="utf-8")
            captured = {}
            real_temporary_directory = tempfile.TemporaryDirectory

            def recording_temporary_directory(*args, **kwargs):
                captured["dir"] = kwargs.get("dir")
                return real_temporary_directory(*args, **kwargs)

            with mock.patch(
                "trace_formatter.formatting.tempfile.TemporaryDirectory",
                side_effect=recording_temporary_directory,
            ):
                export_release_ready_trace(
                    raw_input_path=raw_input_path,
                    release_output_path=release_output_path,
                    jobs=1,
                    block_size=8,
                )
            self.assertEqual(Path(captured["dir"]), release_output_dir)
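
    # Here every wall-clock string is produced by a UTC+8 clock, so the ready
    # times sit 8 hours behind a naive UTC reading of the same text. The
    # window inferred from the file name must live in the same (ready-time)
    # scale for the boundary rows to be kept and the out-of-window rows to be
    # truncated.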

    def test_format_and_sort_trace_infers_window_in_ready_time_scale_when_wall_clock_has_timezone_offset(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            input_dir = root / "raw"
            input_dir.mkdir()
            output_path = root / "formatted.jsonl"
            rows = [
                make_raw_row(
                    "req-too-early",
                    wall_clock_ms_with_offset("2026-04-19 14:59:59.900", 8),
                    time_text="2026-04-19 14:59:59.900",
                ),
                make_raw_row(
                    "req-kept-start",
                    wall_clock_ms_with_offset("2026-04-19 15:00:00.100", 8),
                    time_text="2026-04-19 15:00:00.100",
                ),
                make_raw_row(
                    "req-kept-end",
                    wall_clock_ms_with_offset("2026-04-19 16:59:59.900", 8),
                    time_text="2026-04-19 16:59:59.900",
                ),
                make_raw_row(
                    "req-too-late",
                    wall_clock_ms_with_offset("2026-04-19 17:00:00.000", 8),
                    time_text="2026-04-19 17:00:00.000",
                ),
            ]
            with (input_dir / "0419-1500-1700.jsonl").open("w", encoding="utf-8") as handle:
                for row in rows:
                    handle.write(json.dumps(row) + "\n")
            stats = format_and_sort_trace(input_dir=input_dir, output_path=output_path, chunk_bytes=256)
            self.assertEqual(stats["row_count"], 2)
            self.assertEqual(stats["truncated_row_count"], 2)
            formatted_rows = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()]
            self.assertEqual([row["meta"]["request_id"] for row in formatted_rows], ["req-kept-start", "req-kept-end"])
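
    # The two rows below disagree on scale: one ready time was produced by a
    # UTC+8 wall clock, the other by reading the same style of text as UTC,
    # leaving them roughly 8 hours apart on the epoch axis despite
    # near-identical wall-clock strings. The formatter is expected to detect
    # the outlier scale and truncate the anomalous row rather than mis-sort it.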

    def test_format_and_sort_trace_normalizes_mixed_ready_time_scales_before_sorting(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            input_dir = root / "raw"
            input_dir.mkdir()
            output_path = root / "formatted.jsonl"
            standard_row = make_raw_row(
                "req-standard",
                wall_clock_ms_with_offset("2026-04-19 15:01:00.000", 8),
                time_text="2026-04-19 15:01:00.000",
            )
            anomalous_row = make_raw_row(
                "req-anomalous",
                utc_ms("2026-04-19 15:00:10.000"),
                time_text="2026-04-19 15:00:10.000",
            )
            with (input_dir / "0419-1500-1700.jsonl").open("w", encoding="utf-8") as handle:
                handle.write(json.dumps(standard_row) + "\n")
                handle.write(json.dumps(anomalous_row) + "\n")
            stats = format_and_sort_trace(input_dir=input_dir, output_path=output_path, chunk_bytes=256)
            self.assertEqual(stats["row_count"], 1)
            self.assertEqual(stats["truncated_row_count"], 1)
            formatted_rows = [json.loads(line) for line in output_path.read_text(encoding="utf-8").splitlines()]
            self.assertEqual([row["meta"]["request_id"] for row in formatted_rows], ["req-standard"])
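
    # Full pipeline smoke test: format -> build-release -> analyze, asserting
    # the on-disk layout of reports, detail CSVs, and figures. The second
    # analyze run with identical inputs must leave st_mtime_ns untouched,
    # i.e. cached outputs are not rewritten.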

    def test_trace_analyzer_analyze_writes_reports_and_figures_under_outputs(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            raw_dir = root / "trace-glm5"
            raw_dir.mkdir()
            formatted_root = root / "outputs" / "formatted"
            analysis_root = root / "outputs" / "analysis"
            raw_path = raw_dir / "0417-1500-1530.jsonl"
            with raw_path.open("w", encoding="utf-8") as handle:
                handle.write(
                    json.dumps(
                        make_raw_row(
                            "req-1",
                            utc_ms("2026-04-17 15:00:01.000"),
                            user_id="user-1",
                        )
                    )
                    + "\n"
                )
                handle.write(
                    json.dumps(
                        make_raw_row(
                            "req-2",
                            utc_ms("2026-04-17 15:00:02.000"),
                            user_id="user-1",
                            messages=[
                                {"role": "user", "content": "hello"},
                                {"role": "assistant", "content": "hi"},
                                {"role": "user", "content": "continue"},
                            ],
                        )
                    )
                    + "\n"
                )
            formatter_exit_code = formatter_main(["format", str(raw_dir), "--output-root", str(formatted_root)])
            self.assertEqual(formatter_exit_code, 0)
            formatted_path = formatted_root / "041715-041715-raw.jsonl"
            release_exit_code = formatter_main(["build-release", str(formatted_path), "--jobs", "1", "--block-size", "8"])
            self.assertEqual(release_exit_code, 0)
            analyzer_exit_code = analyzer_main(
                [
                    "analyze",
                    str(formatted_path),
                    "--output-root",
                    str(analysis_root),
                    "--segment-mode",
                    "bytes",
                    "--block-size",
                    "8",
                ]
            )
            self.assertEqual(analyzer_exit_code, 0)
            analysis_dir = analysis_root / "041715-041715"
            self.assertFalse((analysis_dir / "normalized.jsonl").exists())
            self.assertTrue((analysis_dir / "features.csv").exists())
            self.assertTrue((analysis_dir / "summary.json").exists())
            self.assertTrue((analysis_dir / "report.md").exists())
            self.assertTrue((analysis_dir / "details" / "details_summary.json").exists())
            self.assertFalse(any((analysis_dir / "details").glob("*.sqlite*")))
            self.assertTrue((analysis_dir / "details" / "request_metrics.csv").exists())
            self.assertTrue((analysis_dir / "details" / "theoretical_block_reuse_gaps.csv").exists())
            self.assertTrue((analysis_dir / "details" / "session_bucket_boundary_miss.csv").exists())
            self.assertTrue((analysis_dir / "details" / "theoretical_alive_block_timeline.csv").exists())
            self.assertTrue((analysis_dir / "figures" / "01_input_output_length_cdf.png").exists())
            self.assertTrue((analysis_dir / "figures" / "02_session_turns_cdf.png").exists())
            self.assertTrue((analysis_dir / "figures" / "13_session_cross_bucket_kvcache_miss.png").exists())
            self.assertTrue((analysis_dir / "figures" / "manifest.json").exists())
            self.assertFalse((root / "figs").exists())
            features_mtime_ns = (analysis_dir / "features.csv").stat().st_mtime_ns
            details_summary_mtime_ns = (analysis_dir / "details" / "details_summary.json").stat().st_mtime_ns
            analyzer_exit_code = analyzer_main(
                [
                    "analyze",
                    str(formatted_path),
                    "--output-root",
                    str(analysis_root),
                    "--segment-mode",
                    "bytes",
                    "--block-size",
                    "8",
                ]
            )
            self.assertEqual(analyzer_exit_code, 0)
            self.assertEqual((analysis_dir / "features.csv").stat().st_mtime_ns, features_mtime_ns)
            self.assertEqual(
                (analysis_dir / "details" / "details_summary.json").stat().st_mtime_ns,
                details_summary_mtime_ns,
            )
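
    # Conversely, re-analyzing under the same --dataset-name with a different
    # input must invalidate the cache: the old request ids disappear from
    # features.csv and details_summary.json points at the new release file.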

    def test_trace_analyzer_rebuilds_when_same_output_gets_different_input(self):
        with tempfile.TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            formatted_root = root / "outputs" / "formatted"
            analysis_root = root / "outputs" / "analysis"

            def build_trace(label: str, request_id: str, ready: str) -> Path:
                raw_dir = root / label
                raw_dir.mkdir()
                raw_path = raw_dir / "0417-1500-1530.jsonl"
                raw_path.write_text(
                    json.dumps(make_raw_row(request_id, utc_ms(ready), user_id=f"user-{request_id}")) + "\n",
                    encoding="utf-8",
                )
                self.assertEqual(formatter_main(["format", str(raw_dir), "--output-root", str(formatted_root)]), 0)
                formatted_path = formatted_root / "041715-041715-raw.jsonl"
                renamed_path = formatted_root / f"{label}-raw.jsonl"
                formatted_path.replace(renamed_path)
                self.assertEqual(formatter_main(["build-release", str(renamed_path), "--jobs", "1", "--block-size", "8"]), 0)
                return renamed_path

            first_path = build_trace("first", "req-first", "2026-04-17 15:00:01.000")
            second_path = build_trace("second", "req-second", "2026-04-17 15:00:02.000")
            common_args = [
                "--output-root",
                str(analysis_root),
                "--dataset-name",
                "same-dataset",
                "--segment-mode",
                "bytes",
                "--block-size",
                "8",
            ]
            self.assertEqual(analyzer_main(["analyze", str(first_path), *common_args]), 0)
            analysis_dir = analysis_root / "same-dataset"
            self.assertIn("req-first", (analysis_dir / "features.csv").read_text(encoding="utf-8"))
            self.assertEqual(analyzer_main(["analyze", str(second_path), *common_args]), 0)
            features_text = (analysis_dir / "features.csv").read_text(encoding="utf-8")
            self.assertIn("req-second", features_text)
            self.assertNotIn("req-first", features_text)
            details_summary = json.loads((analysis_dir / "details" / "details_summary.json").read_text(encoding="utf-8"))
            self.assertTrue(str(details_summary["release_path"]).endswith("second.jsonl"))
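
    # Note: this test reads pyproject.toml relative to the current working
    # directory, so it assumes the suite is run from the repository root.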

    def test_pyproject_includes_trace_model_meta_package_data(self):
        pyproject = tomllib.loads(Path("pyproject.toml").read_text(encoding="utf-8"))
        package_data = pyproject["tool"]["setuptools"]["package-data"]
        self.assertIn("trace_model_meta", package_data)
        self.assertIn("**/*.json", package_data["trace_model_meta"])
        self.assertIn("**/*.jinja", package_data["trace_model_meta"])