"""End-to-end tests for trace_analyzer: parsing, feature extraction, cache
studies, and the ``report`` / ``study`` CLI entry points.

Fixtures are built as raw provider-style trace rows, passed through
``trace_formatter.format_and_sort_trace`` to produce the ``*-raw.jsonl``
shape that ``load_records`` expects.
"""

import csv
import json
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path

from trace_analyzer.features import compute_features
from trace_analyzer.parser import default_output_dir, infer_analysis_dataset_name, load_records
from trace_analyzer.report import build_summary
from trace_analyzer.study import (
    build_alive_block_timeline,
    build_input_length_bucket_defs,
    compute_theoretical_cache,
    parse_input_length_bucket_thresholds,
    summarize_cache_reuse_by_input_length_bucket,
    summarize_session_bucket_boundary_miss,
)
from trace_formatter.formatting import format_and_sort_trace


def make_record(
    request_id,
    session_id,
    messages,
    tools,
    usage,
    total_cost_time,
    status_code="1000",
    model="glm-5",
):
    """Build one raw GLM-style trace row.

    ``request_params`` / ``response_params`` are JSON-encoded strings, as in
    the real trace dumps; ``usage`` is embedded verbatim in the response
    payload and the backend first-request/response times are fixed at
    123/456 ms so tests can assert them.
    """
    return {
        "request_id": request_id,
        "session_id": session_id,
        "request_model": model,
        "time": "2026-04-09 09:00:00.000",
        "status_code": status_code,
        "status_name": "ok",
        "total_cost_time": str(total_cost_time),
        "request_params": json.dumps(
            {
                "payload": {
                    "input": {"messages": messages},
                    "parameters": {"tools": tools},
                }
            },
            ensure_ascii=False,
        ),
        "response_params": json.dumps(
            {
                "header": {
                    "attributes": {
                        "x-ds-backend-first-request-time": "123",
                        "x-ds-backend-first-response-time": "456",
                    }
                },
                "payload": {
                    "usage": usage,
                },
            },
            ensure_ascii=False,
        ),
    }


def make_qwen_record(request_id="qwen-1", session_id="sess-qwen-1", total_cost_time=900):
    """Build one raw Qwen-style trace row (qwen3-coder provider shape).

    Unlike :func:`make_record`, the request carries a
    ``x-dashscope-inner-requestreadytime`` header attribute, and usage lives
    under ``payload.output.usage`` with OpenAI-style token field names.
    """
    return {
        "request_id": request_id,
        "session_id": session_id,
        "request_model": "qwen3-coder-plus-2025-09-23",
        "time": "2026-04-19 15:00:00.000",
        "status_code": "200",
        "status_name": "OK",
        "total_cost_time": str(total_cost_time),
        "request_params": json.dumps(
            {
                "header": {
                    "attributes": {
                        "x-dashscope-inner-requestreadytime": "1776582000000",
                    }
                },
                "payload": {
                    "input": {
                        "messages": [
                            {"role": "system", "content": "You are Qwen."},
                            {"role": "user", "content": "List files"},
                        ]
                    },
                    "parameters": {
                        "tools": [
                            {
                                "type": "function",
                                "function": {
                                    "name": "run_command",
                                    "parameters": {
                                        "type": "object",
                                        "properties": {"cmd": {"type": "string"}},
                                    },
                                },
                            }
                        ]
                    },
                },
            },
            ensure_ascii=False,
        ),
        "response_params": json.dumps(
            {
                "header": {
                    "attributes": {
                        "x-ds-backend-first-request-time": "1776581999083",
                        "x-ds-backend-first-response-time": "1776581999918",
                    }
                },
                "payload": {
                    "output": {
                        "choices": [
                            {
                                "delta": {"role": "assistant", "content": "ls"},
                                "finish_reason": "stop",
                            }
                        ],
                        "usage": {
                            "prompt_tokens": 90,
                            "completion_tokens": 12,
                            "total_tokens": 102,
                            "prompt_tokens_details": {"cached_tokens": 24},
                        },
                    }
                },
            },
            ensure_ascii=False,
        ),
    }


class TraceAnalyzerTest(unittest.TestCase):
    def write_raw_fixture(self, rows):
        """Write *rows* as a JSONL file in a temp dir; return its path.

        The temp dir is cleaned up automatically at test teardown.
        """
        temp_dir = tempfile.TemporaryDirectory()
        path = Path(temp_dir.name) / "trace.jsonl"
        with open(path, "w", encoding="utf-8") as handle:
            for row in rows:
                handle.write(json.dumps(row, ensure_ascii=False) + "\n")
        self.addCleanup(temp_dir.cleanup)
        return path

    def format_fixture(self, rows):
        """Run the formatter over *rows* and return the ``*-raw.jsonl`` path.

        A small ``chunk_bytes`` exercises the formatter's chunking path even
        on tiny fixtures.
        """
        raw_path = self.write_raw_fixture(rows)
        formatted_path = raw_path.parent / "trace-raw.jsonl"
        format_and_sort_trace(
            input_dir=raw_path,
            output_path=formatted_path,
            chunk_bytes=256,
            truncate_to_window=False,
        )
        return formatted_path

    def test_infer_analysis_dataset_name_includes_model_slug_from_formatted_parent(self):
        """Dataset name combines the parent dir's model slug with the file stem."""
        path = Path("trace-qwen3-coder-formatted/041915-041917-raw.jsonl")
        self.assertEqual(
            infer_analysis_dataset_name(path),
            "qwen3-coder-041915-041917",
        )
        self.assertEqual(
            default_output_dir(path),
            Path("outputs/analysis/qwen3-coder-041915-041917"),
        )

    def test_load_records_parses_formatter_output(self):
        """Parser extracts meta, usage, tools, and the canonical GLM prompt."""
        path = self.format_fixture(
            [
                make_record(
                    "req-1",
                    "sess-1",
                    [
                        {
                            "role": "system",
                            "content": [
                                {
                                    "text": "sys",
                                    "cache_control": {"type": "ephemeral"},
                                }
                            ],
                        },
                        {"role": "user", "content": "hello"},
                        {"role": "assistant", "content": "working"},
                        {"role": "tool", "content": "tool-output"},
                    ],
                    [{"type": "function", "function": {"name": "read"}}],
                    {
                        "input_tokens": 100,
                        "output_tokens": 20,
                        "total_tokens": 120,
                        "output_tokens_details": {"reasoning_tokens": 7},
                        "prompt_tokens_details": {"cached_tokens": 60},
                    },
                    total_cost_time=500,
                )
            ]
        )
        records = load_records(path)
        self.assertEqual(len(records), 1)
        record = records[0]
        self.assertEqual(record.meta.request_id, "req-1")
        self.assertEqual(record.meta.line_number, 1)
        self.assertEqual(record.messages[0].has_cache_control, True)
        self.assertEqual(record.declared_tools[0].name, "read")
        self.assertEqual(record.usage.cached_tokens, 60)
        self.assertEqual(record.usage.reasoning_tokens, 7)
        self.assertEqual(record.meta.backend_first_request_time_ms, 123)
        self.assertEqual(record.meta.backend_first_response_time_ms, 456)
        self.assertEqual(record.meta.total_cost_time_ms, 500)
        self.assertEqual(record.raw_messages[0]["role"], "system")
        self.assertTrue(record.canonical_prompt)
        self.assertIn("[gMASK]", record.canonical_prompt)
        self.assertIn("<|assistant|>", record.canonical_prompt)
        # NOTE(review): this assertion's expected token was lost (assertIn("")
        # is vacuous — it always passes). Likely a GLM role marker such as the
        # observation/tool token; restore the intended literal. TODO confirm.
        self.assertIn("", record.canonical_prompt)

    def test_load_records_keeps_formatter_normalized_tool_calls_and_textual_tool_content(self):
        """Flattened tool_calls (name/arguments at top level) and list-shaped
        tool content both surface in the canonical prompt."""
        path = self.format_fixture(
            [
                make_record(
                    "req-tool-shape",
                    "sess-tool-shape",
                    [
                        {"role": "user", "content": "hello"},
                        {
                            "role": "assistant",
                            "content": "calling tool",
                            "tool_calls": [
                                {
                                    "id": "call-1",
                                    "type": "function",
                                    "name": "read_file",
                                    "arguments": "{\"path\": \"/tmp/a.txt\"}",
                                }
                            ],
                        },
                        {
                            "role": "tool",
                            "content": [
                                {
                                    "text": "tool-output",
                                    "cache_control": {"type": "ephemeral"},
                                }
                            ],
                        },
                    ],
                    [{"type": "function", "function": {"name": "read_file"}}],
                    {
                        "input_tokens": 50,
                        "output_tokens": 10,
                        "total_tokens": 60,
                        "prompt_tokens_details": {"cached_tokens": 0},
                    },
                    total_cost_time=100,
                )
            ]
        )
        record = load_records(path)[0]
        self.assertIn("read_file", record.canonical_prompt)
        self.assertIn("path", record.canonical_prompt)
        self.assertIn("tool-output", record.canonical_prompt)

    def test_load_records_supports_qwen_formatter_output(self):
        """Qwen rows map prompt/completion/cached token fields and render a
        ChatML-style canonical prompt."""
        path = self.format_fixture([make_qwen_record()])
        record = load_records(path)[0]
        self.assertEqual(record.meta.provider, "qwen3-coder")
        self.assertEqual(record.usage.input_tokens, 90)
        self.assertEqual(record.usage.output_tokens, 12)
        self.assertEqual(record.usage.cached_tokens, 24)
        self.assertIn("<|im_start|>system", record.canonical_prompt)
        self.assertIn("run_command", record.canonical_prompt)

    def test_qwen_formatter_serializes_assistant_tool_calls_with_model_tool_parser_shape(self):
        """Assistant tool_calls serialize in the Qwen3-Coder tool-parser XML
        shape (<tool_call>/<function=...>/<parameter=...>)."""
        row = make_qwen_record()
        request_params = json.loads(row["request_params"])
        request_params["payload"]["input"]["messages"].append(
            {
                "role": "assistant",
                "content": "calling tool",
                "tool_calls": [
                    {
                        "id": "call-1",
                        "type": "function",
                        "function": {
                            "name": "run_command",
                            "arguments": "{\"cmd\": \"ls -la\"}",
                        },
                    }
                ],
            }
        )
        row["request_params"] = json.dumps(request_params, ensure_ascii=False)
        path = self.format_fixture([row])
        record = load_records(path)[0]
        # These literals were empty in the checked-in file (assertIn("") is
        # always true); reconstructed from the Qwen3-Coder tool-parser format
        # the test name references — verify against the formatter's output.
        self.assertIn("<tool_call>", record.canonical_prompt)
        self.assertIn("<function=run_command>", record.canonical_prompt)
        self.assertIn("<parameter=cmd>", record.canonical_prompt)
        self.assertIn("ls -la", record.canonical_prompt)

    def test_load_records_rejects_provider_raw_trace(self):
        """Loading an unformatted provider dump fails with a clear error."""
        path = self.write_raw_fixture([make_qwen_record()])
        with self.assertRaisesRegex(ValueError, r"formatter-generated \*-raw\.jsonl"):
            load_records(path)

    def test_input_length_bucket_cache_reuse_summary(self):
        """Requests land in the default Ki-based buckets with per-bucket
        weighted actual/theoretical hit ratios."""
        summary, bucket_rows = summarize_cache_reuse_by_input_length_bucket(
            [
                {
                    "input_tokens": 100,
                    "cached_tokens": 25,
                    "cache_hit_ratio": 0.25,
                    "theoretical_prompt_unit_length": 120,
                    "theoretical_prefix_hit_units": 60,
                    "theoretical_prefix_hit_ratio": 0.5,
                },
                {
                    "input_tokens": 40000,
                    "cached_tokens": 20000,
                    "cache_hit_ratio": 0.5,
                    "theoretical_prompt_unit_length": 42000,
                    "theoretical_prefix_hit_units": 31500,
                    "theoretical_prefix_hit_ratio": 0.75,
                },
            ]
        )
        self.assertEqual(summary["request_count"], 2)
        bucket_by_name = {row["bucket"]: row for row in bucket_rows}
        self.assertEqual(bucket_by_name["0-32Ki"]["request_count"], 1)
        self.assertEqual(bucket_by_name["32-85Ki"]["request_count"], 1)
        self.assertAlmostEqual(bucket_by_name["0-32Ki"]["weighted_actual_cache_hit_ratio"], 0.25)
        self.assertAlmostEqual(bucket_by_name["32-85Ki"]["weighted_theoretical_cache_hit_ratio"], 0.75)

    def test_input_length_bucket_cache_reuse_summary_supports_custom_buckets(self):
        """Custom thresholds define the buckets and are echoed in the summary."""
        summary, bucket_rows = summarize_cache_reuse_by_input_length_bucket(
            [
                {
                    "input_tokens": 40,
                    "cached_tokens": 10,
                    "cache_hit_ratio": 0.25,
                    "theoretical_prompt_unit_length": 50,
                    "theoretical_prefix_hit_units": 20,
                    "theoretical_prefix_hit_ratio": 0.4,
                },
                {
                    "input_tokens": 60,
                    "cached_tokens": 30,
                    "cache_hit_ratio": 0.5,
                    "theoretical_prompt_unit_length": 70,
                    "theoretical_prefix_hit_units": 35,
                    "theoretical_prefix_hit_ratio": 0.5,
                },
            ],
            bucket_defs=build_input_length_bucket_defs([50]),
        )
        self.assertEqual(
            summary["bucket_definition"]["buckets"],
            [
                {
                    "bucket": "0-50",
                    "input_tokens_min_inclusive": 0,
                    "input_tokens_max_exclusive": 50,
                },
                {
                    "bucket": "50+",
                    "input_tokens_min_inclusive": 50,
                    "input_tokens_max_exclusive": None,
                },
            ],
        )
        bucket_by_name = {row["bucket"]: row for row in bucket_rows}
        self.assertEqual(bucket_by_name["0-50"]["request_count"], 1)
        self.assertEqual(bucket_by_name["50+"]["request_count"], 1)

    def test_input_length_bucket_cache_reuse_summary_tracks_bucketed_theoretical_upper_bound(self):
        """Bucketed theoretical ratios and boundary-loss ratio are reported
        alongside the unbucketed theoretical upper bound."""
        summary, bucket_rows = summarize_cache_reuse_by_input_length_bucket(
            [
                {
                    "input_tokens": 100,
                    "cached_tokens": 20,
                    "cache_hit_ratio": 0.2,
                    "theoretical_prompt_unit_length": 100,
                    "theoretical_prefix_hit_units": 60,
                    "theoretical_prefix_hit_ratio": 0.6,
                    "bucketed_theoretical_prefix_hit_units": 40,
                    "bucketed_theoretical_prefix_hit_ratio": 0.4,
                }
            ],
            bucket_defs=build_input_length_bucket_defs([200]),
        )
        self.assertEqual(summary["request_count"], 1)
        row = bucket_rows[0]
        self.assertEqual(row["bucket"], "0-200")
        self.assertAlmostEqual(row["weighted_theoretical_cache_hit_ratio"], 0.6)
        self.assertAlmostEqual(row["weighted_bucketed_theoretical_cache_hit_ratio"], 0.4)
        # boundary loss = theoretical (0.6) - bucketed theoretical (0.4)
        self.assertAlmostEqual(row["weighted_bucket_boundary_loss_ratio"], 0.2)
        self.assertAlmostEqual(row["bucketed_theoretical_reused_request_fraction"], 1.0)

    def test_session_bucket_boundary_miss_summary_counts_cross_bucket_shared_prefix_loss(self):
        """Cross-bucket edges and their shared-prefix-unit fraction are
        aggregated globally and per child bucket."""
        summary, bucket_rows = summarize_session_bucket_boundary_miss(
            [
                {
                    "child_bucket": "0-32Ki",
                    "child_input_tokens": 100,
                    "shared_prefix_units": 8,
                    "is_cross_bucket": 1,
                },
                {
                    "child_bucket": "0-32Ki",
                    "child_input_tokens": 120,
                    "shared_prefix_units": 4,
                    "is_cross_bucket": 0,
                },
            ],
            bucket_defs=build_input_length_bucket_defs(),
        )
        self.assertEqual(summary["edge_count"], 2)
        self.assertEqual(summary["cross_bucket_edge_count"], 1)
        self.assertAlmostEqual(summary["cross_bucket_shared_prefix_unit_fraction"], 8 / 12)
        bucket_by_name = {row["bucket"]: row for row in bucket_rows}
        self.assertEqual(bucket_by_name["0-32Ki"]["edge_count"], 2)
        self.assertAlmostEqual(
            bucket_by_name["0-32Ki"]["cross_bucket_shared_prefix_unit_fraction"],
            8 / 12,
        )

    def test_build_alive_block_timeline_counts_live_blocks_from_first_seen_to_last_reuse(self):
        """A block is alive from first_seen_ms through span_end_ms inclusive."""
        summary, rows = build_alive_block_timeline(
            [
                {"first_seen_ms": 10, "span_end_ms": 20},
                {"first_seen_ms": 15, "span_end_ms": 15},
            ]
        )
        self.assertEqual(summary["peak_alive_blocks"], 2)
        self.assertEqual(rows[0]["timestamp_ms"], 10)
        self.assertEqual(rows[0]["alive_block_count"], 1)
        row_by_ts = {row["timestamp_ms"]: row for row in rows}
        self.assertEqual(row_by_ts[15]["alive_block_count"], 2)
        self.assertEqual(row_by_ts[16]["alive_block_count"], 1)
        self.assertEqual(row_by_ts[21]["alive_block_count"], 0)

    def test_parse_input_length_bucket_thresholds_supports_ki_units(self):
        """'Ki' suffixes parse as multiples of 1024."""
        self.assertEqual(
            parse_input_length_bucket_thresholds("32Ki;85Ki;128Ki"),
            [32 * 1024, 85 * 1024, 128 * 1024],
        )

    def test_compute_features_detects_bursts_and_cache(self):
        """Tool-burst counters and cache pattern labels are derived per record."""
        path = self.format_fixture(
            [
                make_record(
                    "req-2",
                    "sess-2",
                    [
                        {"role": "user", "content": "u"},
                        {"role": "assistant", "content": "a"},
                        {"role": "tool", "content": "t1"},
                        {"role": "tool", "content": "t2"},
                        {"role": "tool", "content": "t3"},
                        {"role": "assistant", "content": "done"},
                    ],
                    [{"type": "function", "function": {"name": "exec"}}],
                    {
                        "input_tokens": 40000,
                        "output_tokens": 200,
                        "total_tokens": 40200,
                        "prompt_tokens_details": {"cached_tokens": 1000},
                    },
                    total_cost_time=9000,
                )
            ]
        )
        features = compute_features(load_records(path))
        feature = features[0]
        self.assertEqual(feature.assistant_to_tool_count, 1)
        self.assertEqual(feature.tool_to_tool_count, 2)
        self.assertEqual(feature.max_consecutive_tool_msgs, 3)
        # 1000 cached / 40000 input tokens
        self.assertAlmostEqual(feature.cache_hit_ratio, 0.025)
        self.assertIn("cache-cold", feature.pattern_labels)
        self.assertIn("long-context-no-cache", feature.pattern_labels)

    def test_compute_theoretical_cache_detects_prefix_reuse(self):
        """Two same-session requests sharing a prompt prefix yield a nonzero
        theoretical hit for the later one plus block/reuse-gap rows."""
        rows = [
            make_record(
                "req-a",
                "sess-a",
                [{"role": "user", "content": "prefix shared"}],
                [],
                {
                    "input_tokens": 10,
                    "output_tokens": 1,
                    "total_tokens": 11,
                    "prompt_tokens_details": {"cached_tokens": 0},
                },
                total_cost_time=10,
            ),
            make_record(
                "req-b",
                "sess-a",
                [{"role": "user", "content": "prefix shared and more"}],
                [],
                {
                    "input_tokens": 20,
                    "output_tokens": 1,
                    "total_tokens": 21,
                    "prompt_tokens_details": {"cached_tokens": 0},
                },
                total_cost_time=10,
            ),
        ]
        # Inject request-ready timestamps so req-a is ordered before req-b.
        first = json.loads(rows[0]["request_params"])
        second = json.loads(rows[1]["request_params"])
        first["header"] = {"attributes": {"x-dashscope-inner-requestreadytime": "1000"}}
        second["header"] = {"attributes": {"x-dashscope-inner-requestreadytime": "2000"}}
        rows[0]["request_params"] = json.dumps(first, ensure_ascii=False)
        rows[1]["request_params"] = json.dumps(second, ensure_ascii=False)
        theoretical = compute_theoretical_cache(
            load_records(self.format_fixture(rows)),
            block_size=8,
            segment_mode="bytes",
        )
        request_rows = {row["request_id"]: row for row in theoretical["request_rows"]}
        self.assertEqual(request_rows["req-a"]["theoretical_prefix_hit_ratio"], 0.0)
        self.assertGreater(request_rows["req-b"]["theoretical_prefix_hit_ratio"], 0.0)
        self.assertTrue(theoretical["reuse_gap_rows"])
        reused_blocks = [row for row in theoretical["block_rows"] if row["reuse_count"] > 0]
        self.assertTrue(reused_blocks)
        self.assertIn("last_reuse_ms", reused_blocks[0])
        self.assertIn("span_ms", reused_blocks[0])
        self.assertGreaterEqual(reused_blocks[0]["lifetime_ms"], 0)

    def test_report_cli_writes_outputs(self):
        """`python -m trace_analyzer report` writes summary.json, report.md,
        and features.csv to the output dir."""
        path = self.format_fixture(
            [
                make_record(
                    "req-3",
                    "sess-3",
                    [
                        {"role": "user", "content": "u"},
                        {"role": "assistant", "content": "a"},
                    ],
                    [],
                    {
                        "input_tokens": 20,
                        "output_tokens": 5,
                        "total_tokens": 25,
                        "prompt_tokens_details": {"cached_tokens": 0},
                    },
                    total_cost_time=30,
                ),
                make_record(
                    "req-4",
                    "sess-3",
                    [
                        {"role": "user", "content": "u"},
                        {"role": "assistant", "content": "a"},
                        {"role": "tool", "content": "t"},
                        {"role": "assistant", "content": "done"},
                    ],
                    [{"type": "function", "function": {"name": "read"}}],
                    {
                        "input_tokens": 200,
                        "output_tokens": 50,
                        "total_tokens": 250,
                        "prompt_tokens_details": {"cached_tokens": 150},
                    },
                    total_cost_time=300,
                ),
            ]
        )
        with tempfile.TemporaryDirectory() as temp_dir:
            completed = subprocess.run(
                [
                    sys.executable,
                    "-m",
                    "trace_analyzer",
                    "report",
                    str(path),
                    "--output-dir",
                    temp_dir,
                    "--limit",
                    "2",
                ],
                cwd=Path(__file__).resolve().parents[1],
                check=True,
                capture_output=True,
                text=True,
            )
            self.assertIn("report.md", completed.stdout)
            summary_path = Path(temp_dir) / "summary.json"
            report_path = Path(temp_dir) / "report.md"
            features_path = Path(temp_dir) / "features.csv"
            self.assertTrue(summary_path.exists())
            self.assertTrue(report_path.exists())
            self.assertTrue(features_path.exists())
            summary = json.loads(summary_path.read_text(encoding="utf-8"))
            self.assertIn("tool_patterns", summary)
            self.assertIn("cache_patterns", summary)
            with open(features_path, "r", encoding="utf-8") as handle:
                rows = list(csv.DictReader(handle))
            self.assertEqual(len(rows), 2)

    def test_study_cli_writes_advanced_outputs(self):
        """`python -m trace_analyzer study` writes the details/ artifacts and
        honors --input-length-buckets."""
        raw_rows = [
            make_record(
                "req-6",
                "sess-6",
                [{"role": "user", "content": "hello world"}],
                [{"type": "function", "function": {"name": "read"}}],
                {
                    "input_tokens": 40,
                    "output_tokens": 2,
                    "total_tokens": 42,
                    "prompt_tokens_details": {"cached_tokens": 10},
                },
                total_cost_time=100,
            ),
            make_record(
                "req-7",
                "sess-6",
                [
                    {"role": "assistant", "content": "a"},
                    {"role": "tool", "content": "result"},
                    {"role": "user", "content": "hello world again"},
                ],
                [{"type": "function", "function": {"name": "read"}}],
                {
                    "input_tokens": 60,
                    "output_tokens": 3,
                    "total_tokens": 63,
                    "prompt_tokens_details": {"cached_tokens": 20},
                },
                total_cost_time=150,
            ),
        ]
        path = self.format_fixture(raw_rows)
        with tempfile.TemporaryDirectory() as temp_dir:
            subprocess.run(
                [
                    sys.executable,
                    "-m",
                    "trace_analyzer",
                    "study",
                    str(path),
                    "--output-dir",
                    temp_dir,
                    "--block-size",
                    "8",
                    "--segment-mode",
                    "bytes",
                    "--input-length-buckets",
                    "50",
                ],
                cwd=Path(__file__).resolve().parents[1],
                check=True,
                capture_output=True,
                text=True,
            )
            self.assertTrue((Path(temp_dir) / "details" / "request_metrics.csv").exists())
            self.assertTrue((Path(temp_dir) / "details" / "cdf_lengths.png").exists())
            self.assertTrue((Path(temp_dir) / "details" / "tools_catalog.csv").exists())
            bucket_summary = json.loads(
                (Path(temp_dir) / "details" / "input_length_bucket_cache_reuse_summary.json").read_text(
                    encoding="utf-8"
                )
            )
            self.assertEqual(
                [row["bucket"] for row in bucket_summary["bucket_definition"]["buckets"]],
                ["0-50", "50+"],
            )

    def test_study_cli_reuses_existing_base_outputs(self):
        """Running `study` after `report` reuses the base outputs and appends
        a Study Outputs section to report.md."""
        raw_rows = [
            make_record(
                "req-8",
                "sess-8",
                [{"role": "user", "content": "prefix shared"}],
                [{"type": "function", "function": {"name": "read"}}],
                {
                    "input_tokens": 20,
                    "output_tokens": 2,
                    "total_tokens": 22,
                    "prompt_tokens_details": {"cached_tokens": 0},
                },
                total_cost_time=80,
            ),
            make_record(
                "req-9",
                "sess-8",
                [{"role": "user", "content": "prefix shared again"}],
                [{"type": "function", "function": {"name": "read"}}],
                {
                    "input_tokens": 30,
                    "output_tokens": 3,
                    "total_tokens": 33,
                    "prompt_tokens_details": {"cached_tokens": 10},
                },
                total_cost_time=120,
            ),
        ]
        path = self.format_fixture(raw_rows)
        with tempfile.TemporaryDirectory() as temp_dir:
            subprocess.run(
                [
                    sys.executable,
                    "-m",
                    "trace_analyzer",
                    "report",
                    str(path),
                    "--output-dir",
                    temp_dir,
                ],
                cwd=Path(__file__).resolve().parents[1],
                check=True,
                capture_output=True,
                text=True,
            )
            completed = subprocess.run(
                [
                    sys.executable,
                    "-m",
                    "trace_analyzer",
                    "study",
                    str(path),
                    "--output-dir",
                    temp_dir,
                    "--block-size",
                    "8",
                    "--segment-mode",
                    "bytes",
                ],
                cwd=Path(__file__).resolve().parents[1],
                check=True,
                capture_output=True,
                text=True,
            )
            self.assertIn("details_summary.json", completed.stdout)
            self.assertTrue((Path(temp_dir) / "details" / "progress.json").exists())
            report_text = (Path(temp_dir) / "report.md").read_text(encoding="utf-8")
            self.assertIn("Study Outputs", report_text)

    def test_build_summary_contains_expected_keys(self):
        """build_summary exposes the top-level sections the report relies on."""
        path = self.format_fixture(
            [
                make_record(
                    "req-5",
                    "sess-5",
                    [{"role": "user", "content": "u"}],
                    [],
                    {
                        "input_tokens": 10,
                        "output_tokens": 1,
                        "total_tokens": 11,
                        "prompt_tokens_details": {"cached_tokens": 0},
                    },
                    total_cost_time=5,
                )
            ]
        )
        records = load_records(path)
        features = compute_features(records)
        summary = build_summary(records, features)
        self.assertIn("record_count", summary)
        self.assertIn("tool_patterns", summary)
        self.assertIn("cache_patterns", summary)
        self.assertIn("anomalies", summary)


if __name__ == "__main__":
    unittest.main()