from __future__ import annotations

import json
import os
import subprocess
import tempfile
import unittest
from pathlib import Path
from unittest import mock

from aituner.cli import main as cli_main
from aituner.compare import load_compare_spec, run_compare
from aituner.engine import build_launch_recipe
from aituner.http_client import _auth_headers, _openai_url, _should_bypass_proxy
from aituner.job import append_job, build_trial_job
from aituner.harness import build_harness_context
from aituner.llm import _extract_response_text, build_prompt, parse_proposal_text, validate_proposal
from aituner.search import ThresholdProbe, binary_search_max_feasible
from aituner.slo import RequestOutcome, evaluate_request, summarize_evaluations
from aituner.spec import (
    ConfigPatch,
    LLMEndpointSpec,
    Proposal,
    SpecError,
    StudyState,
    TrialSummary,
    load_study_spec,
)
from aituner.store import StudyStore
from aituner.trace import TraceRequest, load_trace_requests, summarize_window
from aituner.worker import (
    _latency_summary,
    _replay_requests,
    _terminate_process_tree,
    _wait_for_server_or_exit,
)

REPO_ROOT = Path(__file__).resolve().parents[1]


# Shared fixture helper: writes a minimal trace window, capability profile, and study spec
# under tmp_path and returns the path to the study spec.
def _write_study_assets(
    tmp_path: Path,
    *,
    trace_overrides: dict[str, object] | None = None,
    slo_overrides: dict[str, object] | None = None,
    engine_overrides: dict[str, object] | None = None,
) -> Path:
    trace_dir = tmp_path / "trace_windows" / "traces"
    trace_dir.mkdir(parents=True)
    trace_path = trace_dir / "chat_w1.jsonl"
    rows = [
        {
            "request_id": "r1",
            "timestamp": 0.0,
            "sampling_u": 0.10,
            "messages": [{"role": "user", "content": "hello"}],
            "input_length": 1000,
            "output_length": 16,
        },
        {
            "request_id": "r2",
            "timestamp": 1.0,
            "sampling_u": 0.50,
            "messages": [{"role": "user", "content": "world"}],
            "input_length": 5000,
            "output_length": 32,
        },
        {
            "request_id": "r3",
            "timestamp": 2.0,
            "sampling_u": 0.90,
            "messages": [{"role": "user", "content": "!"}],
            "input_length": 20000,
            "output_length": 64,
        },
    ]
    with trace_path.open("w", encoding="utf-8") as handle:
        for row in rows:
            handle.write(json.dumps(row) + "\n")
    windows_path = tmp_path / "trace_windows" / "windows.json"
    windows_payload = {
        "u_field": "sampling_u",
        "windows": [
            {
                "window_id": "chat_w1",
                "trace_type": "chat",
                "trace_file": "traces/chat_w1.jsonl",
                "window_start": 0.0,
                "window_end": 10.0,
            }
        ],
    }
    windows_path.write_text(json.dumps(windows_payload), encoding="utf-8")
    capability_path = tmp_path / "capability.json"
    capability_path.write_text(
        json.dumps({"prefill_service_by_bucket": {"4k": {"tp4_ms": 320, "tp8_ms": 240}}}),
        encoding="utf-8",
    )
    study_path = tmp_path / "study.json"
    trace_payload: dict[str, object] = {
        "windows_path": str(windows_path),
        "window_id": "chat_w1",
        "u_field": "sampling_u",
        "timestamp_field": "timestamp",
        "max_concurrency": 4,
    }
    if trace_overrides:
        trace_payload.update(trace_overrides)
    study_payload = {
        "study_id": "study-1",
        "hardware": {"gpu_count": 8, "gpu_model": "H20", "host_candidates": ["dash0"]},
        "model": {
            "model_id": "qwen",
            "served_model_name": "Qwen/Qwen3-30B-A3B-Instruct-2507",
        },
        "engine": {
            "engine_name": "vllm",
            "engine_version": "0.1",
            "exec_path": "/usr/local/bin/vllm",
            "cwd": str(tmp_path),
            "host": "127.0.0.1",
            "port": 8000,
            "healthcheck_path": "/v1/models",
            "ready_timeout_s": 30,
            "request_timeout_s": 30,
            "launch_args": ["serve", "/models/qwen"],
            "base_envs": {"BASE_ENV": "1"},
            "base_flags": {"host": "127.0.0.1", "port": 8000},
            "tunable_envs": ["VLLM_ATTENTION_BACKEND"],
            "tunable_flags": ["tensor-parallel-size", "max-num-seqs"],
            "python_executable": "python3",
        },
        "trace": trace_payload,
        "slo": {
            "target_pass_rate": 0.95,
            "ttft_rule": {
                "kind": "step_ms",
                "buckets": [
                    {"max_input_tokens": 4096, "threshold_ms": 2000},
                    {"max_input_tokens": 16384, "threshold_ms": 5000},
                    {"threshold_ms": 9000},
                ],
            },
            "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 120},
        },
        "search": {
            "low": 0.0,
            "high": 1.0,
            "tolerance": 0.01,
            "max_probes": 8,
            "sample_seed": 20260325,
        },
        "llm": {"system_prompt": "Tune it.", "max_history_trials": 8},
        "capability_profile_path": str(capability_path),
    }
    if slo_overrides:
        study_payload["slo"].update(slo_overrides)
    if engine_overrides:
        study_payload["engine"].update(engine_overrides)
    study_path.write_text(json.dumps(study_payload), encoding="utf-8")
    return study_path


def _write_compare_assets(
    tmp_path: Path,
    *,
    study_path: Path,
    window_ids: list[str] | None = None,
    window_selector: dict[str, object] | None = None,
    baseline: dict[str, object] | None = None,
    tuned: dict[str, object] | None = None,
) -> Path:
    compare_path = tmp_path / "compare.json"
    payload: dict[str, object] = {
        "compare_id": "compare-1",
        "study_spec_path": str(study_path),
        "baseline": baseline or {"config_patch": {"env_patch": {}, "flag_patch": {}}},
        "tuned": tuned
        or {
            "config_patch": {
                "env_patch": {},
                "flag_patch": {"tensor-parallel-size": 2},
            }
        },
    }
    if window_ids is not None:
        payload["window_ids"] = window_ids
    if window_selector is not None:
        payload["window_selector"] = window_selector
    compare_path.write_text(json.dumps(payload), encoding="utf-8")
    return compare_path


class CoreFlowTests(unittest.TestCase):
    def test_trace_and_prompt_flow(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(tmp_path)
            study = load_study_spec(study_path)
            store = StudyStore(tmp_path / ".aituner" / "studies")
            study_root = store.init_study(spec_path=study_path, study=study)
            state = store.load_state(study.study_id)
            window, requests = load_trace_requests(study, study_spec_path=study_path)
            summary = summarize_window(requests, window)
            self.assertEqual(summary["request_count"], 3)
            self.assertEqual(summary["request_rate"], 0.3)
            prompt = build_prompt(
                study=study,
                window_summary=summary,
                state=state,
                capability_profile={"queueing_knee_by_bucket": {"4k": 1000}},
            )
            self.assertIn("allowed_flag_keys", prompt)
            self.assertIn("study-1", prompt)
            self.assertIn('"current_best"', prompt)
            self.assertIn("queueing_knee_by_bucket", prompt)
            self.assertIn("Harnesses:", prompt)
            self.assertIn("workload_lca_profile", prompt)
            self.assertIn("knob_harnesses", prompt)
            self.assertTrue(study_root.exists())

    def test_harness_uses_latency_failures_before_generic_unrecoverable(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(tmp_path)
            study = load_study_spec(study_path)
            result_path = tmp_path / "trial-result.json"
            result_path.write_text(
                json.dumps(
                    {
                        "status": "completed",
                        "probes": [
                            {
                                "threshold": 0.25,
                                "feasible": False,
                                "payload": {
                                    "request_count": 100,
                                    "pass_rate": 0.3,
                                    "request_rate": 1.0,
                                    "early_stopped": True,
                                    "early_stop_reason": "slo_pass_rate_unrecoverable",
                                    "latency_summary": {
                                        "failed_reason_counts": {
                                            "ttft_ms>5000.0": 70,
                                            "tpot_ms>50.0": 5,
                                            "probe_elapsed_s>240.0": 100,
                                        },
                                        "ttft_ms": {"p95": 6500.0, "p99": 7200.0},
                                    },
                                },
                            }
                        ],
                    }
                ),
                encoding="utf-8",
            )
            state = StudyState(
                study_id=study.study_id,
                trials=[
                    TrialSummary(
                        trial_id="trial-0001",
                        status="completed",
                        result_path=str(result_path),
                        config_patch={"env_patch": {}, "flag_patch": {}},
                    )
                ],
            )
            context = build_harness_context(
                study=study,
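                # The failed_reason_counts above are dominated by TTFT threshold violations,
                # so the harness is expected to report "ttft_prefill" as the active bottleneck
                # (asserted below) rather than the generic unrecoverable early-stop reason.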
                window_summary={
                    "prompt_tokens_p95": 5000,
                    "prompt_tail_ratio_p95_p50": 3.0,
                },
                state=state,
            )
            self.assertEqual(
                context["recent_trial_diagnostics"][0]["active_bottleneck"],
                "ttft_prefill",
            )

    def test_harness_blocks_repeating_infeasible_plateau_family(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(
                tmp_path,
                engine_overrides={
                    "tunable_flags": [
                        "tensor-parallel-size",
                        "data-parallel-size",
                        "expert-parallel-size",
                    ],
                    "topology_constraints": {
                        "allowed_tensor_parallel_sizes": [1, 2, 4],
                        "allowed_data_parallel_sizes": [1, 2, 4, 8],
                        "allowed_expert_parallel_sizes": [1],
                        "allowed_tp_dp_products": [1, 2, 4, 8],
                    },
                },
            )
            study = load_study_spec(study_path)
            trial_summaries = []
            for index, (dp, pass_rate, p95) in enumerate(
                [(4, 0.345, 3818.4), (8, 0.345, 3823.4)], start=3
            ):
                result_path = tmp_path / f"trial-{index:04d}.json"
                result_path.write_text(
                    json.dumps(
                        {
                            "status": "completed",
                            "best_request_rate": None,
                            "all_infeasible_diagnostics": {
                                "threshold": 0.0078125,
                                "request_count": 148,
                                "request_rate": 0.22,
                                "pass_rate": pass_rate,
                                "early_stopped": True,
                                "early_stop_reason": "elapsed",
                                "latency_summary": {
                                    "failed_reason_counts": {"ttft_ms>5000.0": 97},
                                    "ttft_ms": {"p95": p95, "p99": 5800.0},
                                },
                            },
                        }
                    ),
                    encoding="utf-8",
                )
                trial_summaries.append(
                    TrialSummary(
                        trial_id=f"trial-{index:04d}",
                        status="completed",
                        result_path=str(result_path),
                        config_patch={
                            "env_patch": {},
                            "flag_patch": {
                                "tensor-parallel-size": 1,
                                "data-parallel-size": dp,
                                "expert-parallel-size": 1,
                            },
                        },
                    )
                )
            context = build_harness_context(
                study=study,
                window_summary={
                    "prompt_tokens_p95": 7628,
                    "prompt_tail_ratio_p95_p50": 3.83,
                },
                state=StudyState(study_id=study.study_id, trials=trial_summaries),
            )
            guard = context["convergence_guard"]["infeasible_progress"]
            self.assertTrue(guard["plateau_detected"])
            self.assertTrue(guard["stop_if_next_probe_repeats_family"])
            self.assertEqual(guard["blocked_primary_family"], "data-parallel-size")
            self.assertTrue(
                context["convergence_guard"][
                    "should_stop_if_no_harness_can_justify_a_new_adjacent_probe"
                ]
            )

    def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(
                tmp_path,
                trace_overrides={
                    "input_length_filter": {
                        "min_input_tokens": 0,
                        "max_input_tokens": 8192,
                    }
                },
            )
            study = load_study_spec(study_path)
            window, requests = load_trace_requests(study, study_spec_path=study_path)
            summary = summarize_window(requests, window)
            self.assertEqual(len(requests), 2)
            self.assertEqual([item.prompt_tokens_hint for item in requests], [1000, 5000])
            self.assertEqual(summary["request_count"], 2)
            self.assertEqual(summary["prompt_tokens_p95"], 5000.0)
            self.assertIn("prefix_cache", summary)
            self.assertIn("arrival_burst_ratio_p95_to_mean", summary)
            prompt = build_prompt(
                study=study,
                window_summary=summary,
                state=StudyState(study_id=study.study_id),
                capability_profile=None,
            )
            self.assertIn('"input_length_filter"', prompt)
            self.assertIn('"max_input_tokens": 8192', prompt)

    def test_trace_input_length_filter_rejects_invalid_bounds(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(
                tmp_path,
                trace_overrides={
                    "input_length_filter": {
                        "min_input_tokens": 8193,
                        "max_input_tokens": 8192,
                    }
                },
            )
            with self.assertRaisesRegex(SpecError, "min_input_tokens must be <="):
                load_study_spec(study_path)

    def test_decode_only_mode_is_loaded_and_prompt_mentions_it(self) -> None:
        with
tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets( tmp_path, trace_overrides={"request_mode": "decode_only"}, slo_overrides={ "ttft_rule": None, "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 20}, }, ) study = load_study_spec(study_path) self.assertEqual(study.trace.request_mode, "decode_only") window, requests = load_trace_requests(study, study_spec_path=study_path) prompt = build_prompt( study=study, window_summary=summarize_window(requests, window), state=StudyState(study_id=study.study_id), capability_profile=None, ) self.assertIn('"request_mode": "decode_only"', prompt) self.assertIn("There is no TTFT SLO for this study.", prompt) self.assertIn("decode-only", prompt) def test_load_study_spec_rejects_mismatched_served_model_name(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets( tmp_path, engine_overrides={ "base_flags": { "host": "127.0.0.1", "port": 8000, "served-model-name": "engine-name", } }, ) payload = json.loads(study_path.read_text(encoding="utf-8")) payload["model"]["served_model_name"] = "trace-name" study_path.write_text(json.dumps(payload), encoding="utf-8") with self.assertRaisesRegex(SpecError, "must match engine.base_flags"): load_study_spec(study_path) def test_bailian_endpoint_defaults(self) -> None: endpoint = LLMEndpointSpec.from_dict({"provider": "bailian", "model": "qwen-plus"}) self.assertEqual(endpoint.provider, "bailian") self.assertEqual( endpoint.base_url, "https://dashscope.aliyuncs.com/compatible-mode/v1" ) self.assertEqual(endpoint.api_key_env, "DASHSCOPE_API_KEY") def test_codex_endpoint_resolves_base_url_from_codex_config(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) codex_dir = tmp_path / ".codex" codex_dir.mkdir(parents=True) (codex_dir / "config.toml").write_text( '\n'.join( [ 'model_provider = "ipads"', 'model_reasoning_effort = "high"', "", "[model_providers.ipads]", 'base_url = "http://codex.example/v1"', 'wire_api = "responses"', ] ), encoding="utf-8", ) with mock.patch.dict(os.environ, {"HOME": str(tmp_path)}, clear=True): endpoint = LLMEndpointSpec.from_dict({"provider": "codex", "model": "gpt-5.4"}) self.assertEqual(endpoint.provider, "codex") self.assertEqual(endpoint.base_url, "http://codex.example/v1") self.assertEqual(endpoint.wire_api, "responses") self.assertFalse(endpoint.stream) self.assertEqual(endpoint.reasoning_effort, "high") self.assertEqual(endpoint.api_key_env, "OPENAI_API_KEY") def test_codex_stream_forces_chat_completions_wire_api(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) codex_dir = tmp_path / ".codex" codex_dir.mkdir(parents=True) (codex_dir / "config.toml").write_text( '\n'.join( [ 'model_provider = "ipads"', "", "[model_providers.ipads]", 'base_url = "http://codex.example/v1"', 'wire_api = "responses"', ] ), encoding="utf-8", ) with mock.patch.dict(os.environ, {"HOME": str(tmp_path)}, clear=True): endpoint = LLMEndpointSpec.from_dict( {"provider": "codex", "model": "gpt-5.4", "stream": True} ) self.assertTrue(endpoint.stream) self.assertEqual(endpoint.wire_api, "chat.completions") def test_endpoint_stream_flag(self) -> None: endpoint = LLMEndpointSpec.from_dict( { "provider": "custom", "base_url": "http://example/v1", "wire_api": "chat.completions", "stream": True, "model": "x", "api_key_env": "OPENAI_API_KEY", } ) self.assertTrue(endpoint.stream) def test_extract_response_text_supports_responses_api_output(self) -> None: text = 
_extract_response_text( { "output": [ { "type": "message", "content": [ {"type": "output_text", "text": '{"diagnosis":"ok"}'} ], } ] } ) self.assertEqual(text, '{"diagnosis":"ok"}') def test_auth_headers_load_bailian_key_from_dotenv(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) (tmp_path / ".env").write_text('DASHSCOPE_API_KEY="dash-key"\n', encoding="utf-8") with mock.patch.dict(os.environ, {}, clear=True): with mock.patch("pathlib.Path.cwd", return_value=tmp_path): headers = _auth_headers("DASHSCOPE_API_KEY", "bailian") self.assertEqual(headers["Authorization"], "Bearer dash-key") def test_auth_headers_load_codex_auth_and_proxy(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) codex_dir = tmp_path / ".codex" codex_dir.mkdir(parents=True) (codex_dir / "config.toml").write_text( '\n'.join( [ "[network]", 'http_proxy = "http://proxy.example:3128"', 'https_proxy = "http://proxy.example:3128"', ] ), encoding="utf-8", ) (codex_dir / "auth.json").write_text( json.dumps({"OPENAI_API_KEY": "sk-codex-test"}), encoding="utf-8", ) with mock.patch.dict(os.environ, {"HOME": str(tmp_path)}, clear=True): with mock.patch("pathlib.Path.cwd", return_value=tmp_path): headers = _auth_headers("OPENAI_API_KEY", "codex") self.assertEqual(os.environ["http_proxy"], "http://proxy.example:3128") self.assertEqual(os.environ["HTTP_PROXY"], "http://proxy.example:3128") self.assertEqual(headers["Authorization"], "Bearer sk-codex-test") def test_prompt_includes_failed_trial_context(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) study = load_study_spec(study_path) window, requests = load_trace_requests(study, study_spec_path=study_path) prompt = build_prompt( study=study, window_summary=summarize_window(requests, window), state=StudyState( study_id=study.study_id, trials=[ TrialSummary( trial_id="trial-0001", status="failed", diagnosis="flashinfer looked promising", config_patch={ "env_patch": {"VLLM_ATTENTION_BACKEND": "FLASHINFER"}, "flag_patch": {"tensor-parallel-size": 4}, }, failure_reason="engine_process_exited_before_ready exit_code=1", ) ], ), capability_profile=None, ) self.assertIn('"status": "failed"', prompt) self.assertIn('"failure_reason": "engine_process_exited_before_ready exit_code=1"', prompt) self.assertIn('"VLLM_ATTENTION_BACKEND": "FLASHINFER"', prompt) self.assertIn("Known launch failures:", prompt) def test_prompt_includes_failure_stage_for_launch_failures(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) study = load_study_spec(study_path) window, requests = load_trace_requests(study, study_spec_path=study_path) prompt = build_prompt( study=study, window_summary=summarize_window(requests, window), state=StudyState( study_id=study.study_id, trials=[ TrialSummary( trial_id="trial-0002", status="failed", diagnosis="bad topology", config_patch={ "env_patch": {}, "flag_patch": { "tensor-parallel-size": 3, "data-parallel-size": 3, }, }, failure_stage="engine_launch", failure_reason="engine_process_exited_before_ready exit_code=1", ) ], ), capability_profile=None, ) self.assertIn('"failure_stage": "engine_launch"', prompt) self.assertIn('"implicated_flag_keys"', prompt) def test_prompt_prioritizes_parallel_space_when_tp_dp_ep_are_tunable(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets( tmp_path, engine_overrides={ "base_flags": { 
"host": "127.0.0.1", "port": 8000, "enable-expert-parallel": True, "tensor-parallel-size": 4, "data-parallel-size": 2, "expert-parallel-size": 8, }, "tunable_envs": [], "tunable_flags": [ "tensor-parallel-size", "data-parallel-size", "expert-parallel-size", "max-num-seqs", ], "topology_constraints": { "require_tp_dp_product_equals_gpu_count": True, "require_ep_size_leq_tp_dp_product": True, "require_ep_size_divides_tp_dp_product": True, "allowed_tensor_parallel_sizes": [1, 2, 4, 8], "allowed_data_parallel_sizes": [1, 2, 4, 8], "allowed_expert_parallel_sizes": [1, 2, 4, 8], }, }, ) study = load_study_spec(study_path) window, requests = load_trace_requests(study, study_spec_path=study_path) prompt = build_prompt( study=study, window_summary=summarize_window(requests, window), state=StudyState(study_id=study.study_id), capability_profile=None, ) self.assertIn("Prioritize exploring legal topology changes in parallel space", prompt) self.assertIn("Parallel space candidates:", prompt) self.assertIn('"tensor_parallel_size": 2', prompt) def test_parse_proposal_text_repairs_truncated_json(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study = load_study_spec(_write_study_assets(tmp_path)) proposal = parse_proposal_text( """ { "observation": "obs", "diagnosis": "diag", "config_patch": { "env_patch": {}, "flag_patch": { "max-num-seqs": 24 } }, "expected_effects": [ "faster batching" ], "why_not_previous_failures": "none" """, study, ) self.assertEqual(proposal.diagnosis, "diag") self.assertEqual(proposal.config_patch.flag_patch["max-num-seqs"], 24) def test_length_only_trace_rows_are_synthesized(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) trace_dir = tmp_path / "trace_windows" / "traces" trace_dir.mkdir(parents=True) trace_path = trace_dir / "chat_len_only.jsonl" with trace_path.open("w", encoding="utf-8") as handle: handle.write( json.dumps( { "timestamp": 0.0, "sampling_u": 0.1, "input_length": 32, "output_length": 16 } ) + "\n" ) windows_path = tmp_path / "trace_windows" / "windows.json" windows_path.write_text( json.dumps( { "windows": [ { "window_id": "w1", "trace_type": "chat", "trace_file": "traces/chat_len_only.jsonl", "window_start": 0.0, "window_end": 10.0 } ] } ), encoding="utf-8", ) study_path = tmp_path / "study.json" study_path.write_text( json.dumps( { "study_id": "study-len-only", "hardware": {"gpu_count": 1}, "model": { "model_id": "m1", "served_model_name": "dummy-model" }, "engine": { "engine_name": "vllm", "exec_path": "/usr/local/bin/vllm", "host": "127.0.0.1", "port": 8000, "ready_timeout_s": 10, "request_timeout_s": 10, "healthcheck_path": "/v1/models", "launch_args": [], "base_envs": {}, "base_flags": {}, "tunable_envs": [], "tunable_flags": [] }, "trace": { "windows_path": str(windows_path), "window_id": "w1", "max_concurrency": 1, "synthetic_prompt_cap_tokens": 8 }, "slo": {"target_pass_rate": 0.95}, "search": {"low": 0.0, "high": 1.0, "tolerance": 0.1, "max_probes": 2, "sample_seed": 1}, "llm": {"system_prompt": "", "max_history_trials": 1} } ), encoding="utf-8", ) study = load_study_spec(study_path) _, requests = load_trace_requests(study, study_spec_path=study_path) self.assertEqual(len(requests), 1) message = requests[0].body["messages"][0]["content"] self.assertEqual(message.count("token"), 8) self.assertEqual(requests[0].body["min_tokens"], 16) self.assertEqual(requests[0].body["max_tokens"], 16) def test_slo_evaluation_step_and_fixed_rules(self) -> None: with tempfile.TemporaryDirectory() as tmp: study 
= load_study_spec(_write_study_assets(Path(tmp))) outcomes = [ RequestOutcome( request_id="r1", success=True, ttft_ms=1000, tpot_ms=100, prompt_tokens=1000, completion_tokens=16, ), RequestOutcome( request_id="r2", success=True, ttft_ms=6000, tpot_ms=100, prompt_tokens=5000, completion_tokens=16, ), ] evaluations, summary = summarize_evaluations(outcomes, study.slo) self.assertTrue(evaluations[0].passed) self.assertFalse(evaluations[1].passed) self.assertEqual(summary["slo_pass_rate"], 0.5) def test_trace_completion_tokens_override_forces_min_and_max_tokens(self) -> None: with tempfile.TemporaryDirectory() as tmp: study_path = _write_study_assets( Path(tmp), trace_overrides={"completion_tokens_override": 1}, ) study = load_study_spec(study_path) _, requests = load_trace_requests(study, study_spec_path=study_path) self.assertEqual(len(requests), 3) self.assertEqual(requests[0].completion_tokens_hint, 1) self.assertEqual(requests[1].completion_tokens_hint, 1) self.assertEqual(requests[2].completion_tokens_hint, 1) self.assertEqual(requests[0].body["min_tokens"], 1) self.assertEqual(requests[0].body["max_tokens"], 1) self.assertEqual(requests[2].body["min_tokens"], 1) self.assertEqual(requests[2].body["max_tokens"], 1) def test_build_prompt_mentions_completion_tokens_override(self) -> None: with tempfile.TemporaryDirectory() as tmp: study_path = _write_study_assets( Path(tmp), trace_overrides={"completion_tokens_override": 1}, slo_overrides={"tpot_rule": None}, ) study = load_study_spec(study_path) store = StudyStore(Path(tmp) / ".aituner") store.init_study(spec_path=study_path, study=study) state = store.load_state(study.study_id) window, requests = load_trace_requests(study, study_spec_path=study_path) prompt = build_prompt( study=study, window_summary=summarize_window(requests, window), state=state, capability_profile=None, ) self.assertIn('"completion_tokens_override": 1', prompt) self.assertIn("min_tokens=max_tokens=1", prompt) def test_slo_evaluation_supports_tpot_only_95_percent_target(self) -> None: with tempfile.TemporaryDirectory() as tmp: study = load_study_spec( _write_study_assets( Path(tmp), slo_overrides={ "ttft_rule": None, "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 20}, }, ) ) outcomes = [ RequestOutcome( request_id="r1", success=True, ttft_ms=3000, tpot_ms=10, prompt_tokens=1000, completion_tokens=16, ), RequestOutcome( request_id="r2", success=True, ttft_ms=9000, tpot_ms=21, prompt_tokens=5000, completion_tokens=16, ), ] evaluations, summary = summarize_evaluations(outcomes, study.slo) self.assertEqual([item.passed for item in evaluations], [True, False]) self.assertEqual(summary["slo_pass_rate"], 0.5) self.assertFalse(summary["feasible"]) def test_build_launch_recipe_serializes_list_flags_once(self) -> None: with tempfile.TemporaryDirectory() as tmp: study = load_study_spec(_write_study_assets(Path(tmp))) recipe = build_launch_recipe( study.engine, ConfigPatch( flag_patch={ "cuda-graph-sizes": [1, 2, 4], } ), ) self.assertIn("--cuda-graph-sizes", recipe.argv) flag_index = recipe.argv.index("--cuda-graph-sizes") self.assertEqual(recipe.argv[flag_index + 1 : flag_index + 4], ["1", "2", "4"]) self.assertEqual(recipe.argv.count("--cuda-graph-sizes"), 1) def test_prepare_trace_windows_materializes_repo_local_assets(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) legacy_source = tmp_path / "legacy" thinking_source = tmp_path / "thinking" legacy_source.mkdir() thinking_source.mkdir() for filename in [ "qwen_chat_blksz_64_031109-031111", 
"qwen_chat_blksz_64_031121-031123", "qwen_chat_blksz_64_031209-031211", "qwen_chat_blksz_64_031221-031223", "qwen_chat_blksz_64_031309-031311", "qwen_chat_blksz_64_031321-031323", "qwen_chat_blksz_64_031409-031411", "qwen_chat_blksz_64_031421-031423", "qwen_chat_blksz_64_031509-031511", "qwen_chat_blksz_64_031521-031523", "qwen_chat_blksz_64_031609-031611", "qwen_chat_blksz_64_031621-031623", "qwen_chat_blksz_64_031709-031711", "qwen_chat_blksz_64_031721-031723", ]: for suffix in [".jsonl", "_prompt.jsonl"]: path = legacy_source / f"{filename}{suffix}" path.write_text("", encoding="utf-8") peak_trace = legacy_source / "qwen_chat_blksz_64_031109-031111.jsonl" peak_prompt = legacy_source / "qwen_chat_blksz_64_031109-031111_prompt.jsonl" peak_trace.write_text( "\n".join( [ json.dumps( { "chat_id": "c1", "turn": 1, "timestamp": 3599.0, "input_length": 10, "output_length": 3, } ), json.dumps( { "chat_id": "c2", "turn": 2, "timestamp": 3605.0, "input_length": 20, "output_length": 7, } ), ] ) + "\n", encoding="utf-8", ) peak_prompt.write_text( "\n".join( [ json.dumps({"chat_id": "c1", "turn": 1, "prompt": "ignore me"}), json.dumps({"chat_id": "c2", "turn": 2, "prompt": "real prompt"}), ] ) + "\n", encoding="utf-8", ) output_root = tmp_path / "trace_windows" subprocess.run( [ "python3", "scripts/prepare_trace_windows.py", "--legacy-source", str(legacy_source), "--thinking-source", str(thinking_source), "--output-root", str(output_root), "--workloads", "chat", "--overwrite", ], check=True, cwd=str(REPO_ROOT), ) windows_payload = json.loads((output_root / "windows.json").read_text(encoding="utf-8")) windows = {item["window_id"]: item for item in windows_payload["windows"]} self.assertIn("chat_w20260311_1000", windows) self.assertEqual(windows["chat_w20260311_1000"]["num_requests"], 1) trace_path = output_root / windows["chat_w20260311_1000"]["trace_file"] rows = [json.loads(line) for line in trace_path.read_text(encoding="utf-8").splitlines()] self.assertEqual(len(rows), 1) self.assertEqual(rows[0]["prompt"], "real prompt") self.assertEqual(rows[0]["timestamp"], 5.0) self.assertEqual(rows[0]["output_length"], 7) self.assertIsInstance(rows[0]["sampling_u"], float) def test_prepare_trace_windows_preserves_existing_files_on_failure(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) legacy_source = tmp_path / "legacy" thinking_source = tmp_path / "thinking" output_root = tmp_path / "trace_windows" traces_dir = output_root / "traces" legacy_source.mkdir() thinking_source.mkdir() traces_dir.mkdir(parents=True) for filename in [ "qwen_chat_blksz_64_031109-031111", "qwen_chat_blksz_64_031121-031123", ]: for suffix in [".jsonl", "_prompt.jsonl"]: path = legacy_source / f"{filename}{suffix}" path.write_text( json.dumps( { "chat_id": "c1", "turn": 1, "timestamp": 3605.0, "input_length": 20, "output_length": 7, "prompt": "prompt", } ) + "\n", encoding="utf-8", ) sentinel = traces_dir / "chat_w20260311_1000.jsonl" sentinel.write_text("sentinel\n", encoding="utf-8") proc = subprocess.run( [ "python3", "scripts/prepare_trace_windows.py", "--legacy-source", str(legacy_source), "--thinking-source", str(thinking_source), "--output-root", str(output_root), "--workloads", "chat", "--overwrite", ], cwd=str(REPO_ROOT), capture_output=True, text=True, ) self.assertNotEqual(proc.returncode, 0) self.assertEqual(sentinel.read_text(encoding="utf-8"), "sentinel\n") self.assertEqual(sorted(path.name for path in traces_dir.glob("*.tmp.*")), []) def test_binary_search_max_feasible(self) -> None: result 
= binary_search_max_feasible( low=0.0, high=1.0, tolerance=0.01, max_probes=8, evaluator=lambda threshold: ThresholdProbe( threshold=threshold, feasible=threshold <= 0.625, payload={"threshold": threshold}, ), ) self.assertLessEqual(result.best_threshold, 0.625) self.assertGreaterEqual(result.best_threshold, 0.5) self.assertIsNotNone(result.best_feasible_payload) def test_binary_search_continues_below_tolerance_when_all_infeasible(self) -> None: seen = [] def evaluator(threshold): seen.append(threshold) return ThresholdProbe( threshold=threshold, feasible=False, payload={"threshold": threshold}, ) result = binary_search_max_feasible( low=0.0, high=1.0, tolerance=0.1, max_probes=6, evaluator=evaluator, ) self.assertIsNone(result.best_feasible_payload) self.assertEqual(len(result.probes), 6) self.assertEqual( seen, [0.5, 0.25, 0.125, 0.0625, 0.03125, 0.015625], ) def test_trace_max_requests_uses_window_wide_downsample(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) trace_dir = tmp_path / "trace_windows" / "traces" trace_dir.mkdir(parents=True) trace_path = trace_dir / "chat_many.jsonl" with trace_path.open("w", encoding="utf-8") as handle: for idx in range(10): handle.write( json.dumps( { "request_id": f"r{idx}", "timestamp": float(idx), "sampling_u": idx / 10.0, "messages": [{"role": "user", "content": f"hello-{idx}"}], "input_length": 10 + idx, "output_length": 5, } ) + "\n" ) windows_path = tmp_path / "trace_windows" / "windows.json" windows_path.write_text( json.dumps( { "windows": [ { "window_id": "w1", "trace_type": "chat", "trace_file": "traces/chat_many.jsonl", "window_start": 0.0, "window_end": 10.0, } ] } ), encoding="utf-8", ) study_path = tmp_path / "study.json" study_path.write_text( json.dumps( { "study_id": "study-downsample", "hardware": {"gpu_count": 1}, "model": {"model_id": "m1", "served_model_name": "dummy-model"}, "engine": { "engine_name": "vllm", "exec_path": "/usr/local/bin/vllm", "host": "127.0.0.1", "port": 8000, "ready_timeout_s": 10, "request_timeout_s": 10, "healthcheck_path": "/v1/models", "launch_args": [], "base_envs": {}, "base_flags": {}, "tunable_envs": [], "tunable_flags": [], }, "trace": { "windows_path": str(windows_path), "window_id": "w1", "max_concurrency": 1, "max_requests_per_probe": 4, }, "slo": {"target_pass_rate": 0.95}, "search": {"low": 0.0, "high": 1.0, "tolerance": 0.1, "max_probes": 2, "sample_seed": 1}, "llm": {"system_prompt": "", "max_history_trials": 1}, } ), encoding="utf-8", ) study = load_study_spec(study_path) _, requests = load_trace_requests(study, study_spec_path=study_path) self.assertEqual([item.row_id for item in requests], ["r0", "r2", "r5", "r7"]) def test_trace_replay_time_scale_scales_arrivals_and_window(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) trace_dir = tmp_path / "trace_windows" / "traces" trace_dir.mkdir(parents=True) trace_path = trace_dir / "chat_scale.jsonl" trace_path.write_text( json.dumps( { "request_id": "r1", "timestamp": 10.0, "sampling_u": 0.25, "messages": [{"role": "user", "content": "hello"}], "input_length": 16, "output_length": 4, } ) + "\n", encoding="utf-8", ) windows_path = tmp_path / "trace_windows" / "windows.json" windows_path.write_text( json.dumps( { "windows": [ { "window_id": "w1", "trace_type": "chat", "trace_file": "traces/chat_scale.jsonl", "window_start": 0.0, "window_end": 100.0, } ] } ), encoding="utf-8", ) study_path = tmp_path / "study.json" study_path.write_text( json.dumps( { "study_id": "study-scale", "hardware": 
{"gpu_count": 1}, "model": {"model_id": "m1", "served_model_name": "dummy-model"}, "engine": { "engine_name": "vllm", "exec_path": "/usr/local/bin/vllm", "host": "127.0.0.1", "port": 8000, "ready_timeout_s": 10, "request_timeout_s": 10, "healthcheck_path": "/v1/models", "launch_args": [], "base_envs": {}, "base_flags": {}, "tunable_envs": [], "tunable_flags": [], }, "trace": { "windows_path": str(windows_path), "window_id": "w1", "max_concurrency": 1, "replay_time_scale": 0.1, }, "slo": {"target_pass_rate": 0.95}, "search": {"low": 0.0, "high": 1.0, "tolerance": 0.1, "max_probes": 2, "sample_seed": 1}, "llm": {"system_prompt": "", "max_history_trials": 1}, } ), encoding="utf-8", ) study = load_study_spec(study_path) window, requests = load_trace_requests(study, study_spec_path=study_path) self.assertEqual(window.window_end, 10.0) self.assertEqual(requests[0].arrival_s, 1.0) def test_proposal_validation_and_job_emission(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) study = load_study_spec(study_path) store = StudyStore(tmp_path / ".aituner" / "studies") store.init_study(spec_path=study_path, study=study) state = store.load_state(study.study_id) proposal_text = json.dumps( { "observation": "Current TTFT fails before TPOT.", "diagnosis": "Prefill pressure dominates.", "config_patch": { "env_patch": {"VLLM_ATTENTION_BACKEND": "FLASHINFER"}, "flag_patch": {"tensor-parallel-size": 4, "max-num-seqs": 64} }, "expected_effects": ["lower TTFT", "raise feasible sampling_u"], "why_not_previous_failures": "Avoids changing unsupported envs." } ) proposal = parse_proposal_text(proposal_text, study) trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) job = build_trial_job(study=study, trial=trial, repo_root=tmp_path) jobs_path = tmp_path / "jobs.toml" append_job(jobs_path, job) rendered = jobs_path.read_text(encoding="utf-8") self.assertIn('name = "study-1-trial-0001"', rendered) self.assertIn('command = "python3 -m aituner.cli worker run-trial', rendered) self.assertIn('PYTHONPATH = "src"', rendered) def test_ingest_trial_results_updates_best(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) study = load_study_spec(study_path) store = StudyStore(tmp_path / ".aituner" / "studies") store.init_study(spec_path=study_path, study=study) state = store.load_state(study.study_id) proposal = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Diag", "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}}, "expected_effects": ["raise rate"] } ) trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) Path(trial.result_path).write_text( json.dumps( { "study_id": study.study_id, "trial_id": trial.trial_id, "status": "completed", "best_sampling_u": 0.75, "best_request_rate": 12.5, "best_pass_rate": 0.97 } ), encoding="utf-8", ) next_state = store.ingest_trial_results(study.study_id) self.assertEqual(next_state.best_trial_id, trial.trial_id) self.assertEqual(next_state.best_sampling_u, 0.75) self.assertEqual(next_state.best_request_rate, 12.5) self.assertEqual(next_state.best_parallel_size, 4) self.assertEqual(next_state.best_request_rate_per_gpu, 3.125) self.assertEqual( next_state.best_by_parallel_size["4"]["best_request_rate_per_gpu"], 3.125, ) def test_materialize_trial_uses_incumbent_sampling_u_as_search_floor(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) 
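            # Seed the state with an incumbent best at tensor-parallel-size=4 so that the next
            # trial in the same parallel group starts its binary search from the incumbent's
            # best_sampling_u (0.375) rather than from study.search.low.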
study_path = _write_study_assets(tmp_path) study = load_study_spec(study_path) store = StudyStore(tmp_path / ".aituner" / "studies") store.init_study(spec_path=study_path, study=study) state = StudyState( study_id=study.study_id, best_trial_id="trial-0001", best_parallel_size=4, best_sampling_u=0.375, best_request_rate=3.0, best_request_rate_per_gpu=0.75, next_trial_index=2, best_by_parallel_size={ "4": { "trial_id": "trial-0001", "parallel_size": 4, "best_sampling_u": 0.375, "best_request_rate": 3.0, "best_request_rate_per_gpu": 0.75, } }, trials=[], ) proposal = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Diag", "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}}, "expected_effects": ["raise rate"], } ) trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) self.assertEqual(trial.search.low, 0.375) self.assertEqual(trial.search.high, 1.0) def test_materialize_trial_uses_same_parallel_group_incumbent(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) study = load_study_spec(study_path) store = StudyStore(tmp_path / ".aituner" / "studies") store.init_study(spec_path=study_path, study=study) state = StudyState( study_id=study.study_id, best_trial_id="trial-0001", best_parallel_size=4, best_sampling_u=0.375, best_request_rate=3.0, best_request_rate_per_gpu=0.75, next_trial_index=2, best_by_parallel_size={ "2": { "trial_id": "trial-0000", "parallel_size": 2, "best_sampling_u": 0.125, "best_request_rate": 0.8, "best_request_rate_per_gpu": 0.4, } }, trials=[], ) proposal = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Diag", "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 2}}, "expected_effects": ["raise rate"], } ) trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) self.assertEqual(trial.search.low, 0.125) def test_materialize_trial_resets_search_floor_for_new_parallel_group(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) study = load_study_spec(study_path) store = StudyStore(tmp_path / ".aituner" / "studies") store.init_study(spec_path=study_path, study=study) state = StudyState( study_id=study.study_id, best_trial_id="trial-0001", best_parallel_size=4, best_sampling_u=0.4, best_request_rate=3.0, best_request_rate_per_gpu=0.75, next_trial_index=2, trials=[], ) proposal = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Diag", "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 2}}, "expected_effects": ["raise rate"], } ) trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) self.assertEqual(trial.search.low, study.search.low) def test_ingest_trial_results_records_failure_reason(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) study = load_study_spec(study_path) store = StudyStore(tmp_path / ".aituner" / "studies") store.init_study(spec_path=study_path, study=study) state = store.load_state(study.study_id) proposal = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Diag", "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}}, "expected_effects": ["raise rate"] } ) trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) Path(trial.result_path).write_text( json.dumps( { "study_id": study.study_id, "trial_id": trial.trial_id, "status": "failed", 
"failure_reason": "engine_process_exited_before_ready exit_code=1", "probes": [] } ), encoding="utf-8", ) next_state = store.ingest_trial_results(study.study_id) self.assertEqual(next_state.trials[0].status, "failed") self.assertEqual( next_state.trials[0].failure_reason, "engine_process_exited_before_ready exit_code=1", ) self.assertEqual(next_state.trials[0].failure_stage, "") def test_ingest_trial_results_records_failure_stage(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) study = load_study_spec(study_path) store = StudyStore(tmp_path / ".aituner" / "studies") store.init_study(spec_path=study_path, study=study) state = store.load_state(study.study_id) proposal = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Diag", "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}}, "expected_effects": ["raise rate"] } ) trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) Path(trial.result_path).write_text( json.dumps( { "study_id": study.study_id, "trial_id": trial.trial_id, "status": "failed", "failure_stage": "engine_launch", "failure_reason": "engine_process_exited_before_ready exit_code=1", "probes": [] } ), encoding="utf-8", ) next_state = store.ingest_trial_results(study.study_id) self.assertEqual(next_state.trials[0].failure_stage, "engine_launch") def test_ingest_trial_results_prefers_higher_request_rate_per_gpu(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) study = load_study_spec(study_path) store = StudyStore(tmp_path / ".aituner" / "studies") store.init_study(spec_path=study_path, study=study) state = store.load_state(study.study_id) proposal_a = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Diag", "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}}, "expected_effects": ["raise rate"], } ) trial_a, state = store.materialize_trial(study=study, state=state, proposal=proposal_a) Path(trial_a.result_path).write_text( json.dumps( { "study_id": study.study_id, "trial_id": trial_a.trial_id, "status": "completed", "best_sampling_u": 0.5, "best_request_rate": 4.0, "best_pass_rate": 0.97, } ), encoding="utf-8", ) proposal_b = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Diag", "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 2}}, "expected_effects": ["raise rate"], } ) trial_b, _ = store.materialize_trial(study=study, state=state, proposal=proposal_b) Path(trial_b.result_path).write_text( json.dumps( { "study_id": study.study_id, "trial_id": trial_b.trial_id, "status": "completed", "best_sampling_u": 0.4, "best_request_rate": 3.0, "best_pass_rate": 0.97, } ), encoding="utf-8", ) next_state = store.ingest_trial_results(study.study_id) self.assertEqual(next_state.best_trial_id, trial_b.trial_id) self.assertEqual(next_state.best_parallel_size, 2) self.assertEqual(next_state.best_request_rate, 3.0) self.assertEqual(next_state.best_request_rate_per_gpu, 1.5) self.assertEqual(next_state.best_by_parallel_size["4"]["best_request_rate_per_gpu"], 1.0) self.assertEqual(next_state.best_by_parallel_size["2"]["best_request_rate_per_gpu"], 1.5) def test_validate_proposal_rejects_invalid_tp_dp_product(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets( tmp_path, engine_overrides={ "base_flags": { "host": "127.0.0.1", "port": 8000, "enable-expert-parallel": True, 
"tensor-parallel-size": 4, "data-parallel-size": 2, "expert-parallel-size": 8, }, "tunable_flags": [ "tensor-parallel-size", "data-parallel-size", "expert-parallel-size", ], "topology_constraints": { "require_tp_dp_product_equals_gpu_count": True, "require_ep_size_leq_tp_dp_product": True, "require_ep_size_divides_tp_dp_product": True, "allowed_tensor_parallel_sizes": [1, 2, 4, 8], "allowed_data_parallel_sizes": [1, 2, 4, 8], "allowed_expert_parallel_sizes": [1, 2, 4, 8], }, }, ) study = load_study_spec(study_path) proposal = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Bad topology", "config_patch": { "env_patch": {}, "flag_patch": { "tensor-parallel-size": 2, "data-parallel-size": 2, "expert-parallel-size": 4, }, }, "expected_effects": ["raise throughput"], } ) with self.assertRaisesRegex(SpecError, "must equal hardware.gpu_count"): validate_proposal(proposal, study) def test_validate_proposal_rejects_invalid_ep_divisibility(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets( tmp_path, engine_overrides={ "base_flags": { "host": "127.0.0.1", "port": 8000, "enable-expert-parallel": True, "tensor-parallel-size": 4, "data-parallel-size": 2, "expert-parallel-size": 8, }, "tunable_flags": [ "tensor-parallel-size", "data-parallel-size", "expert-parallel-size", ], "topology_constraints": { "require_tp_dp_product_equals_gpu_count": True, "require_ep_size_leq_tp_dp_product": True, "require_ep_size_divides_tp_dp_product": True, "allowed_tensor_parallel_sizes": [1, 2, 4, 8], "allowed_data_parallel_sizes": [1, 2, 4, 8], "allowed_expert_parallel_sizes": [1, 2, 4, 8], }, }, ) study = load_study_spec(study_path) proposal = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Bad EP", "config_patch": { "env_patch": {}, "flag_patch": { "expert-parallel-size": 3, }, }, "expected_effects": ["raise throughput"], } ) with self.assertRaisesRegex(SpecError, "expert-parallel-size=3"): validate_proposal(proposal, study) def test_validate_proposal_accepts_valid_tp_dp_ep_combo(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets( tmp_path, engine_overrides={ "base_flags": { "host": "127.0.0.1", "port": 8000, "enable-expert-parallel": True, "tensor-parallel-size": 4, "data-parallel-size": 2, "expert-parallel-size": 8, }, "tunable_flags": [ "tensor-parallel-size", "data-parallel-size", "expert-parallel-size", ], "topology_constraints": { "require_tp_dp_product_equals_gpu_count": True, "require_ep_size_leq_tp_dp_product": True, "require_ep_size_divides_tp_dp_product": True, "allowed_tensor_parallel_sizes": [1, 2, 4, 8], "allowed_data_parallel_sizes": [1, 2, 4, 8], "allowed_expert_parallel_sizes": [1, 2, 4, 8], }, }, ) study = load_study_spec(study_path) proposal = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Valid topology", "config_patch": { "env_patch": {}, "flag_patch": { "tensor-parallel-size": 2, "data-parallel-size": 4, "expert-parallel-size": 4, }, }, "expected_effects": ["raise throughput"], } ) validated = validate_proposal(proposal, study) self.assertEqual(validated.config_patch.flag_patch["tensor-parallel-size"], 2) def test_validate_proposal_accepts_allowed_tp_dp_product_above_gpu_count(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets( tmp_path, engine_overrides={ "base_flags": { "host": "127.0.0.1", "port": 8000, "enable-expert-parallel": False, "tensor-parallel-size": 4, "data-parallel-size": 
1, "expert-parallel-size": 1, }, "tunable_flags": [ "tensor-parallel-size", "data-parallel-size", "expert-parallel-size", ], "topology_constraints": { "require_tp_dp_product_equals_gpu_count": False, "require_ep_size_leq_tp_dp_product": True, "require_ep_size_divides_tp_dp_product": True, "allowed_tp_dp_products": [1, 2, 4, 8], "allowed_tensor_parallel_sizes": [1, 2, 4, 8], "allowed_data_parallel_sizes": [1, 2, 4, 8], "allowed_expert_parallel_sizes": [1], }, }, ) study = load_study_spec(study_path) proposal = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Allow product 8", "config_patch": { "env_patch": {}, "flag_patch": { "tensor-parallel-size": 4, "data-parallel-size": 2, "expert-parallel-size": 1, }, }, "expected_effects": ["explore larger topology"], } ) validated = validate_proposal(proposal, study) self.assertEqual(validated.config_patch.flag_patch["data-parallel-size"], 2) def test_validate_proposal_rejects_tp_dp_product_outside_allowed_set(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets( tmp_path, engine_overrides={ "base_flags": { "host": "127.0.0.1", "port": 8000, "enable-expert-parallel": False, "tensor-parallel-size": 4, "data-parallel-size": 1, "expert-parallel-size": 1, }, "tunable_flags": [ "tensor-parallel-size", "data-parallel-size", "expert-parallel-size", ], "topology_constraints": { "require_tp_dp_product_equals_gpu_count": False, "require_ep_size_leq_tp_dp_product": True, "require_ep_size_divides_tp_dp_product": True, "allowed_tp_dp_products": [1, 2, 4, 8], "allowed_tensor_parallel_sizes": [1, 2, 3, 4, 8], "allowed_data_parallel_sizes": [1, 2, 3, 4, 8], "allowed_expert_parallel_sizes": [1], }, }, ) study = load_study_spec(study_path) proposal = Proposal.from_dict( { "observation": "Obs", "diagnosis": "Invalid product", "config_patch": { "env_patch": {}, "flag_patch": { "tensor-parallel-size": 3, "data-parallel-size": 2, "expert-parallel-size": 1, }, }, "expected_effects": ["explore invalid topology"], } ) with self.assertRaisesRegex(SpecError, "not in \\[1, 2, 4, 8\\]"): validate_proposal(proposal, study) def test_cli_tune_runs_multiple_manual_proposals(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) proposal1 = tmp_path / "proposal-1.json" proposal2 = tmp_path / "proposal-2.json" proposal1.write_text( json.dumps( { "observation": "trial one", "diagnosis": "conservative", "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}}, "expected_effects": ["stable"], "why_not_previous_failures": "", } ), encoding="utf-8", ) proposal2.write_text( json.dumps( { "observation": "trial two", "diagnosis": "more batching", "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 64}}, "expected_effects": ["higher throughput"], "why_not_previous_failures": "", } ), encoding="utf-8", ) store_root = tmp_path / "store" def fake_run_trial(trial_spec_path: Path) -> dict[str, object]: payload = json.loads(trial_spec_path.read_text(encoding="utf-8")) trial_id = str(payload["trial_id"]) trial_root = Path(payload["artifact_dir"]) if trial_id.endswith("0001"): best_rate = 1.0 best_u = 0.5 else: best_rate = 2.0 best_u = 0.75 result = { "study_id": payload["study_id"], "trial_id": trial_id, "status": "completed", "best_sampling_u": best_u, "best_request_rate": best_rate, "best_pass_rate": 1.0, "best_request_count": 2, "probes": [], } (trial_root / "result.json").write_text(json.dumps(result), encoding="utf-8") return 
result with mock.patch("aituner.cli.run_trial", side_effect=fake_run_trial): exit_code = cli_main( [ "study", "tune", "--spec", str(study_path), "--store-root", str(store_root), "--proposal-file", str(proposal1), "--proposal-file", str(proposal2), ] ) self.assertEqual(exit_code, 0) store = StudyStore(store_root) state = store.load_state("study-1") self.assertEqual(state.best_trial_id, "trial-0002") self.assertEqual(state.best_sampling_u, 0.75) self.assertEqual(state.best_request_rate, 2.0) self.assertEqual(state.next_trial_index, 3) def test_cli_tune_honors_should_stop_proposal(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) proposal_path = tmp_path / "stop.json" proposal_path.write_text( json.dumps( { "observation": "incumbent converged", "diagnosis": "no adjacent harness probe is justified", "config_patch": {"env_patch": {}, "flag_patch": {}}, "expected_effects": ["stop without spending another GPU trial"], "why_not_previous_failures": "not applicable", "should_stop": True, } ), encoding="utf-8", ) store_root = tmp_path / "store" with mock.patch("aituner.cli.run_trial") as run_trial_mock: exit_code = cli_main( [ "study", "tune", "--spec", str(study_path), "--store-root", str(store_root), "--proposal-file", str(proposal_path), ] ) self.assertEqual(exit_code, 0) run_trial_mock.assert_not_called() store = StudyStore(store_root) state = store.load_state("study-1") self.assertEqual(state.next_trial_index, 1) def test_cli_tune_evaluates_baseline_before_llm_proposal(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) payload = json.loads(study_path.read_text(encoding="utf-8")) payload["llm"]["endpoint"] = { "provider": "custom", "base_url": "http://llm.example/v1", "wire_api": "chat.completions", "model": "test-model", "api_key_env": "OPENAI_API_KEY", } study_path.write_text(json.dumps(payload), encoding="utf-8") store_root = tmp_path / "store" def fake_run_trial(trial_spec_path: Path) -> dict[str, object]: payload = json.loads(trial_spec_path.read_text(encoding="utf-8")) trial_root = Path(payload["artifact_dir"]) result = { "study_id": payload["study_id"], "trial_id": payload["trial_id"], "status": "completed", "best_sampling_u": 0.25, "best_request_rate": 1.0, "best_pass_rate": 1.0, "best_request_count": 2, "probes": [], } (trial_root / "result.json").write_text(json.dumps(result), encoding="utf-8") return result llm_payload = json.dumps( { "observation": "baseline done", "diagnosis": "try more batching", "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 64}}, "expected_effects": ["higher throughput"], "why_not_previous_failures": "", "should_stop": False, } ) with mock.patch("aituner.cli.run_trial", side_effect=fake_run_trial): with mock.patch("aituner.cli.call_llm_for_proposal", return_value=llm_payload): exit_code = cli_main( [ "study", "tune", "--spec", str(study_path), "--store-root", str(store_root), "--max-trials", "2", ] ) self.assertEqual(exit_code, 0) store = StudyStore(store_root) state = store.load_state("study-1") self.assertEqual(state.next_trial_index, 3) self.assertEqual(state.trials[0].config_patch, {"env_patch": {}, "flag_patch": {}}) self.assertEqual(state.trials[1].config_patch["flag_patch"], {"max-num-seqs": 64}) def test_load_compare_spec_requires_window_selection(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) compare_path = tmp_path / 
"compare.json" compare_path.write_text( json.dumps( { "compare_id": "compare-1", "study_spec_path": str(study_path), "baseline": {"config_patch": {"env_patch": {}, "flag_patch": {}}}, "tuned": {"config_patch": {"env_patch": {}, "flag_patch": {}}}, } ), encoding="utf-8", ) with self.assertRaisesRegex(SpecError, "window_ids or window_selector"): load_compare_spec(compare_path) def test_run_compare_outputs_summary_and_report(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) trace_dir = tmp_path / "trace_windows" / "traces" trace_path = trace_dir / "chat_w2.jsonl" trace_path.write_text( json.dumps( { "request_id": "r4", "timestamp": 0.0, "sampling_u": 0.2, "messages": [{"role": "user", "content": "extra"}], "input_length": 3000, "output_length": 32, } ) + "\n", encoding="utf-8", ) windows_path = tmp_path / "trace_windows" / "windows.json" windows_payload = json.loads(windows_path.read_text(encoding="utf-8")) windows_payload["windows"].append( { "window_id": "chat_w2", "trace_type": "chat", "trace_file": "traces/chat_w2.jsonl", "window_start": 0.0, "window_end": 10.0, "date": "2026-03-12", "slot_token": "1000", "slot_label": "10:00-10:10", } ) windows_payload["windows"][0]["date"] = "2026-03-11" windows_payload["windows"][0]["slot_token"] = "1000" windows_payload["windows"][0]["slot_label"] = "10:00-10:10" windows_path.write_text(json.dumps(windows_payload), encoding="utf-8") compare_path = _write_compare_assets( tmp_path, study_path=study_path, window_ids=["chat_w1", "chat_w2"], ) def fake_run_trial(trial_spec_path: Path) -> dict[str, object]: trial_payload = json.loads(trial_spec_path.read_text(encoding="utf-8")) source_path = Path(trial_payload["study_spec_path"]) actual_spec_path = Path(source_path.read_text(encoding="utf-8").strip()) study_payload = json.loads(actual_spec_path.read_text(encoding="utf-8")) window_id = study_payload["trace"]["window_id"] trial_id = trial_payload["trial_id"] rate_map = { ("chat_w1", "baseline"): 1.0, ("chat_w1", "tuned"): 3.0, ("chat_w2", "baseline"): 3.0, ("chat_w2", "tuned"): 7.0, } best_rate = rate_map[(window_id, trial_id)] result = { "study_id": trial_payload["study_id"], "trial_id": trial_id, "status": "completed", "best_sampling_u": 0.5, "best_request_rate": best_rate, "best_pass_rate": 1.0, "best_request_count": 2, "probes": [], } Path(trial_payload["result_path"]).write_text( json.dumps(result), encoding="utf-8", ) return result with mock.patch("aituner.compare.run_trial", side_effect=fake_run_trial): summary = run_compare(compare_path, output_root=tmp_path / ".compare") self.assertEqual(len(summary["windows"]), 2) self.assertEqual(summary["aggregate"]["wins"]["tuned"], 2) self.assertTrue((tmp_path / ".compare" / "summary.json").exists()) self.assertTrue((tmp_path / ".compare" / "report.md").exists()) def test_run_compare_resolves_trial_ref_candidate(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) prior_root = tmp_path / "prior-study" trial_dir = prior_root / "trials" / "trial-0002" trial_dir.mkdir(parents=True) trial_spec = { "study_id": "prior-study", "trial_id": "trial-0002", "config_patch": { "env_patch": {}, "flag_patch": {"data-parallel-size": 2}, }, "search": { "low": 0.0, "high": 1.0, "tolerance": 0.01, "max_probes": 8, "sample_seed": 20260325, }, "study_spec_path": str(study_path), "artifact_dir": str(trial_dir), "probe_log_path": str(trial_dir / "probe_history.json"), "engine_log_path": 
str(trial_dir / "engine.log"), "result_path": str(trial_dir / "result.json"), } (trial_dir / "trial_spec.json").write_text(json.dumps(trial_spec), encoding="utf-8") compare_path = _write_compare_assets( tmp_path, study_path=study_path, window_ids=["chat_w1"], baseline={ "trial_ref": { "study_root": str(prior_root), "trial_id": "trial-0002", } }, ) def fake_run_trial(trial_spec_path: Path) -> dict[str, object]: trial_payload = json.loads(trial_spec_path.read_text(encoding="utf-8")) flags = (trial_payload["config_patch"] or {}).get("flag_patch") or {} best_rate = 5.0 if flags.get("data-parallel-size") == 2 else 2.0 result = { "study_id": trial_payload["study_id"], "trial_id": trial_payload["trial_id"], "status": "completed", "best_sampling_u": 0.5, "best_request_rate": best_rate, "best_pass_rate": 1.0, "best_request_count": 2, "probes": [], } Path(trial_payload["result_path"]).write_text(json.dumps(result), encoding="utf-8") return result with mock.patch("aituner.compare.run_trial", side_effect=fake_run_trial): summary = run_compare(compare_path, output_root=tmp_path / ".compare") self.assertEqual(summary["baseline_source"]["kind"], "trial_ref") self.assertEqual( summary["windows"][0]["baseline"]["config_patch"]["flag_patch"]["data-parallel-size"], 2, ) def test_run_compare_window_selector_filters_windows(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) trace_dir = tmp_path / "trace_windows" / "traces" for name in ("chat_w2.jsonl", "thinking_w3.jsonl"): (trace_dir / name).write_text( json.dumps( { "request_id": name, "timestamp": 0.0, "sampling_u": 0.2, "messages": [{"role": "user", "content": name}], "input_length": 3000, "output_length": 32, } ) + "\n", encoding="utf-8", ) windows_path = tmp_path / "trace_windows" / "windows.json" windows_payload = json.loads(windows_path.read_text(encoding="utf-8")) windows_payload["windows"][0]["date"] = "2026-03-11" windows_payload["windows"][0]["slot_token"] = "1000" windows_payload["windows"].append( { "window_id": "chat_w2", "trace_type": "chat", "trace_file": "traces/chat_w2.jsonl", "window_start": 0.0, "window_end": 10.0, "date": "2026-03-12", "slot_token": "1000", } ) windows_payload["windows"].append( { "window_id": "thinking_w3", "trace_type": "thinking", "trace_file": "traces/thinking_w3.jsonl", "window_start": 0.0, "window_end": 10.0, "date": "2026-03-12", "slot_token": "1000", } ) windows_path.write_text(json.dumps(windows_payload), encoding="utf-8") compare_path = _write_compare_assets( tmp_path, study_path=study_path, window_selector={"trace_type": "chat", "date_prefix": "2026-03-12"}, ) def fake_run_trial(trial_spec_path: Path) -> dict[str, object]: trial_payload = json.loads(trial_spec_path.read_text(encoding="utf-8")) result = { "study_id": trial_payload["study_id"], "trial_id": trial_payload["trial_id"], "status": "completed", "best_sampling_u": 0.5, "best_request_rate": 1.0, "best_pass_rate": 1.0, "best_request_count": 2, "probes": [], } Path(trial_payload["result_path"]).write_text(json.dumps(result), encoding="utf-8") return result with mock.patch("aituner.compare.run_trial", side_effect=fake_run_trial): summary = run_compare(compare_path, output_root=tmp_path / ".compare") self.assertEqual([row["window_id"] for row in summary["windows"]], ["chat_w2"]) def test_proposal_expected_effects_accepts_string(self) -> None: proposal = Proposal.from_dict( { "observation": "obs", "diagnosis": "diag", "config_patch": {"env_patch": {}, "flag_patch": {}}, "expected_effects": 
"higher throughput", } ) self.assertEqual(proposal.expected_effects, ["higher throughput"]) def test_proposal_expected_effects_accepts_object(self) -> None: proposal = Proposal.from_dict( { "observation": "obs", "diagnosis": "diag", "config_patch": {"env_patch": {}, "flag_patch": {}}, "expected_effects": { "throughput": "higher", "ttft": "lower", }, } ) self.assertEqual( proposal.expected_effects, ["throughput: higher", "ttft: lower"], ) def test_proposal_accepts_should_stop(self) -> None: proposal = Proposal.from_dict( { "observation": "obs", "diagnosis": "converged", "config_patch": {"env_patch": {}, "flag_patch": {}}, "expected_effects": ["avoid wasting another GPU trial"], "should_stop": True, } ) self.assertTrue(proposal.should_stop) def test_parse_proposal_text_accepts_wrapped_json(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) study_path = _write_study_assets(tmp_path) study = load_study_spec(study_path) proposal = parse_proposal_text( """Here is the proposal: ```json {"observation":"obs","diagnosis":"diag","config_patch":{"env_patch":{},"flag_patch":{"max-num-seqs":32}},"expected_effects":["higher throughput"],"why_not_previous_failures":"keeps supported knobs"} ```""", study, ) self.assertEqual(proposal.config_patch.flag_patch["max-num-seqs"], 32) def test_replay_requests_early_stops_when_slo_is_unrecoverable(self) -> None: requests = [ TraceRequest( row_id=f"r{i}", arrival_s=0.0, sampling_u=0.1 * i, body={"model": "m", "messages": [{"role": "user", "content": "x"}]}, prompt_tokens_hint=8, completion_tokens_hint=4, ) for i in range(3) ] outcomes = [ RequestOutcome( request_id="r0", success=False, ttft_ms=None, tpot_ms=None, prompt_tokens=8, completion_tokens=4, error="request_failed", ) ] def fake_run_one_request(*args, **kwargs): return outcomes.pop(0) def fake_evaluate(outcome: RequestOutcome): return type("Eval", (), {"passed": outcome.success})() with mock.patch("aituner.worker._run_one_request", side_effect=fake_run_one_request): replayed, early_stopped, reason = _replay_requests( requests, base_url="http://127.0.0.1:8000", timeout_s=1.0, max_concurrency=1, target_pass_rate=0.95, max_lag_s=None, max_elapsed_s=None, evaluate_outcome=fake_evaluate, ) self.assertTrue(early_stopped) self.assertEqual(reason, "slo_pass_rate_unrecoverable") self.assertEqual(len(replayed), 3) self.assertEqual(replayed[1].error, "slo_pass_rate_unrecoverable") def test_replay_requests_does_not_wait_for_inflight_after_early_stop(self) -> None: requests = [ TraceRequest( row_id="r0", arrival_s=0.0, sampling_u=0.1, body={"model": "m", "messages": [{"role": "user", "content": "x"}]}, prompt_tokens_hint=8, completion_tokens_hint=4, ), TraceRequest( row_id="r1", arrival_s=0.0, sampling_u=0.2, body={"model": "m", "messages": [{"role": "user", "content": "y"}]}, prompt_tokens_hint=8, completion_tokens_hint=4, ), ] class FakeFuture: def __init__(self, outcome=None, *, should_fail_if_waited=False): self._outcome = outcome self._should_fail_if_waited = should_fail_if_waited def result(self, timeout=None): if self._should_fail_if_waited: raise AssertionError("in-flight future should not be awaited after early stop") return self._outcome def cancel(self): return True done_future = FakeFuture( RequestOutcome( request_id="r0", success=False, ttft_ms=None, tpot_ms=None, prompt_tokens=8, completion_tokens=4, error="request_failed", ) ) inflight_future = FakeFuture(should_fail_if_waited=True) submitted = [] class FakeExecutor: def __init__(self, max_workers): self.max_workers = 
            def submit(self, fn, request, **kwargs):
                submitted.append(request.row_id)
                if request.row_id == "r0":
                    return done_future
                return inflight_future

            def shutdown(self, wait=False, cancel_futures=True):
                return None

        def fake_wait(futures, timeout=None, return_when=None):
            self.assertEqual(len(futures), 2)
            return {done_future}, {inflight_future}

        def fake_evaluate(outcome: RequestOutcome):
            return type("Eval", (), {"passed": outcome.success})()

        with mock.patch("aituner.worker.ThreadPoolExecutor", FakeExecutor):
            with mock.patch("aituner.worker.wait", side_effect=fake_wait):
                replayed, early_stopped, reason = _replay_requests(
                    requests,
                    base_url="http://127.0.0.1:8000",
                    timeout_s=30.0,
                    max_concurrency=2,
                    target_pass_rate=0.95,
                    max_lag_s=None,
                    max_elapsed_s=None,
                    evaluate_outcome=fake_evaluate,
                    drain_inflight_on_early_stop=False,
                )

        self.assertEqual(submitted, ["r0", "r1"])
        self.assertTrue(early_stopped)
        self.assertEqual(reason, "slo_pass_rate_unrecoverable")
        self.assertEqual(len(replayed), 2)
        self.assertEqual(replayed[1].error, "slo_pass_rate_unrecoverable")

    def test_replay_requests_respects_max_elapsed_while_waiting_for_inflight(self) -> None:
        requests = [
            TraceRequest(
                row_id="r0",
                arrival_s=0.0,
                sampling_u=0.1,
                body={"model": "m", "messages": [{"role": "user", "content": "x"}]},
                prompt_tokens_hint=8,
                completion_tokens_hint=4,
            )
        ]

        class FakeFuture:
            def result(self, timeout=None):
                raise AssertionError("future should not be awaited after elapsed early stop")

            def cancel(self):
                return True

        submitted = []

        class FakeExecutor:
            def __init__(self, max_workers):
                self.max_workers = max_workers

            def submit(self, fn, request, **kwargs):
                submitted.append(request.row_id)
                return FakeFuture()

            def shutdown(self, wait=False, cancel_futures=True):
                return None

        wait_timeouts: list[float] = []

        # Pretend the submitted request never completes within a wait() call,
        # so the replay loop must give up based on elapsed time.
        def fake_wait(futures, timeout=None, return_when=None):
            wait_timeouts.append(timeout)
            return set(), set(futures)

        def fake_evaluate(outcome: RequestOutcome):
            return type("Eval", (), {"passed": outcome.success})()

        # Scripted clock readings: the final value (1.2) pushes elapsed time past max_elapsed_s=1.0.
        monotonic_values = iter([0.0, 0.0, 0.4, 1.2])
        with mock.patch("aituner.worker.ThreadPoolExecutor", FakeExecutor):
            with mock.patch("aituner.worker.wait", side_effect=fake_wait):
                with mock.patch("aituner.worker.time.monotonic", side_effect=lambda: next(monotonic_values)):
                    replayed, early_stopped, reason = _replay_requests(
                        requests,
                        base_url="http://127.0.0.1:8000",
                        timeout_s=30.0,
                        max_concurrency=1,
                        target_pass_rate=0.95,
                        max_lag_s=None,
                        max_elapsed_s=1.0,
                        evaluate_outcome=fake_evaluate,
                        drain_inflight_on_early_stop=False,
                    )

        self.assertEqual(submitted, ["r0"])
        self.assertTrue(early_stopped)
        self.assertEqual(reason, "probe_elapsed_s>1.0")
        self.assertEqual(len(replayed), 1)
        self.assertEqual(replayed[0].error, "probe_elapsed_s>1.0")
        self.assertTrue(wait_timeouts)
        self.assertLessEqual(wait_timeouts[0], 0.5)

    def test_latency_summary_reports_quantiles_and_slo(self) -> None:
        study = load_study_spec(_write_study_assets(Path(tempfile.mkdtemp())))
        outcomes = [
            RequestOutcome(
                request_id="r1",
                success=True,
                ttft_ms=100.0,
                tpot_ms=10.0,
                prompt_tokens=100,
                completion_tokens=10,
            ),
            RequestOutcome(
                request_id="r2",
                success=True,
                ttft_ms=200.0,
                tpot_ms=20.0,
                prompt_tokens=5000,
                completion_tokens=10,
            ),
        ]
        evaluations = [evaluate_request(item, study.slo) for item in outcomes]
        summary = _latency_summary(outcomes=outcomes, evaluations=evaluations, study=study)
        self.assertEqual(summary["observed_request_count"], 2)
        self.assertEqual(summary["request_mode"], "chat")
        self.assertEqual(summary["ttft_ms"]["mean"], 150.0)
self.assertEqual(summary["ttft_ms"]["p50"], 100.0) self.assertEqual(summary["ttft_ms"]["p99"], 200.0) self.assertEqual(summary["tpot_ms"]["mean"], 15.0) self.assertEqual(summary["slo"]["target_pass_rate"], 0.95) def test_wait_for_server_or_exit_fails_fast_when_process_exits(self) -> None: process = mock.Mock() process.poll.return_value = 17 with self.assertRaisesRegex(RuntimeError, "engine_process_exited_before_ready exit_code=17"): _wait_for_server_or_exit( process, base_url="http://127.0.0.1:8000", healthcheck_path="/v1/models", ready_timeout_s=10.0, ) def test_terminate_process_tree_kills_process_group(self) -> None: process = mock.Mock() process.pid = 1234 process.poll.side_effect = [None, None, 0] process.wait.return_value = 0 with mock.patch("aituner.worker.os.getpgid", return_value=1234): with mock.patch("aituner.worker.os.killpg") as mock_killpg: _terminate_process_tree(process, timeout_s=1.0) mock_killpg.assert_called_once() self.assertEqual(mock_killpg.call_args[0][0], 1234) def test_openai_url_avoids_double_v1(self) -> None: self.assertEqual( _openai_url("http://example.com", "/v1/chat/completions"), "http://example.com/v1/chat/completions", ) self.assertEqual( _openai_url("http://example.com/v1", "/v1/chat/completions"), "http://example.com/v1/chat/completions", ) def test_loopback_urls_bypass_proxy(self) -> None: self.assertTrue(_should_bypass_proxy("http://127.0.0.1:8000/v1/models")) self.assertTrue(_should_bypass_proxy("http://localhost:8000/health")) self.assertFalse(_should_bypass_proxy("http://example.com/v1/models")) if __name__ == "__main__": unittest.main()