"""Unit tests exercising the core aituner flow against on-disk fixtures.

Covers trace loading and prompt building, synthesis of length-only trace
rows, SLO evaluation, threshold binary search, proposal parsing with job
emission, and ingestion of trial results.
"""

from __future__ import annotations

import json
import tempfile
import unittest
from pathlib import Path

from aituner.job import append_job, build_trial_job
from aituner.llm import build_prompt, parse_proposal_text
from aituner.search import ThresholdProbe, binary_search_max_feasible
from aituner.slo import RequestOutcome, summarize_evaluations
from aituner.spec import Proposal, load_study_spec
from aituner.store import StudyStore
from aituner.trace import load_trace_requests, summarize_window


def _write_study_assets(tmp_path: Path) -> Path:
    """Create the fixture files for study ``study-1`` under *tmp_path*.

    Writes a three-request JSONL trace, the windows index that points at it,
    a capability profile, and the study spec itself.

    Args:
        tmp_path: Existing directory that receives the fixture files.

    Returns:
        Path of the written ``study.json``.
    """
    windows_root = tmp_path / "trace_windows"
    traces_root = windows_root / "traces"
    traces_root.mkdir(parents=True)

    # Columns: request_id, timestamp, sampling_u, user content,
    # input_length, output_length.
    request_table = [
        ("r1", 0.0, 0.10, "hello", 1000, 16),
        ("r2", 1.0, 0.50, "world", 5000, 32),
        ("r3", 2.0, 0.90, "!", 20000, 64),
    ]
    trace_rows = [
        {
            "request_id": request_id,
            "timestamp": timestamp,
            "sampling_u": sampling_u,
            "messages": [{"role": "user", "content": content}],
            "input_length": input_length,
            "output_length": output_length,
        }
        for (
            request_id,
            timestamp,
            sampling_u,
            content,
            input_length,
            output_length,
        ) in request_table
    ]
    # One JSON document per line (JSONL).
    with (traces_root / "chat_w1.jsonl").open("w", encoding="utf-8") as sink:
        sink.writelines(json.dumps(entry) + "\n" for entry in trace_rows)

    windows_path = windows_root / "windows.json"
    window_entry = {
        "window_id": "chat_w1",
        "trace_type": "chat",
        "trace_file": "traces/chat_w1.jsonl",
        "window_start": 0.0,
        "window_end": 10.0,
    }
    windows_path.write_text(
        json.dumps({"u_field": "sampling_u", "windows": [window_entry]}),
        encoding="utf-8",
    )

    capability_path = tmp_path / "capability.json"
    capability_doc = {
        "prefill_service_by_bucket": {"4k": {"tp4_ms": 320, "tp8_ms": 240}},
    }
    capability_path.write_text(json.dumps(capability_doc), encoding="utf-8")

    hardware_section = {
        "gpu_count": 8,
        "gpu_model": "H20",
        "host_candidates": ["dash0"],
    }
    model_section = {
        "model_id": "qwen",
        "served_model_name": "Qwen/Qwen3-30B-A3B-Instruct-2507",
    }
    engine_section = {
        "engine_name": "vllm",
        "engine_version": "0.1",
        "exec_path": "/usr/local/bin/vllm",
        "cwd": str(tmp_path),
        "host": "127.0.0.1",
        "port": 8000,
        "healthcheck_path": "/v1/models",
        "ready_timeout_s": 30,
        "request_timeout_s": 30,
        "launch_args": ["serve", "/models/qwen"],
        "base_envs": {"BASE_ENV": "1"},
        "base_flags": {"host": "127.0.0.1", "port": 8000},
        "tunable_envs": ["VLLM_ATTENTION_BACKEND"],
        "tunable_flags": ["tensor-parallel-size", "max-num-seqs"],
        "python_executable": "python3",
    }
    trace_section = {
        "windows_path": str(windows_path),
        "window_id": "chat_w1",
        "u_field": "sampling_u",
        "timestamp_field": "timestamp",
        "max_concurrency": 4,
    }
    slo_section = {
        "target_pass_rate": 0.95,
        "ttft_rule": {
            "kind": "step_ms",
            "buckets": [
                {"max_input_tokens": 4096, "threshold_ms": 2000},
                {"max_input_tokens": 16384, "threshold_ms": 5000},
                {"threshold_ms": 9000},
            ],
        },
        "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 120},
    }
    search_section = {
        "low": 0.0,
        "high": 1.0,
        "tolerance": 0.01,
        "max_probes": 8,
        "sample_seed": 20260325,
    }
    llm_section = {"system_prompt": "Tune it.", "max_history_trials": 8}

    study_doc = {
        "study_id": "study-1",
        "hardware": hardware_section,
        "model": model_section,
        "engine": engine_section,
        "trace": trace_section,
        "slo": slo_section,
        "search": search_section,
        "llm": llm_section,
        "capability_profile_path": str(capability_path),
    }
    study_path = tmp_path / "study.json"
    study_path.write_text(json.dumps(study_doc), encoding="utf-8")
    return study_path


def _bootstrap_study(tmp_path: Path):
    """Write the fixture study and open an initialised store for it.

    Args:
        tmp_path: Existing directory used for fixtures and the store root.

    Returns:
        Tuple ``(study_path, study, store, study_root, state)``, where
        ``study_root`` is the value returned by ``StudyStore.init_study`` and
        ``state`` is the value returned by ``StudyStore.load_state``.
    """
    study_path = _write_study_assets(tmp_path)
    study = load_study_spec(study_path)
    store = StudyStore(tmp_path / ".aituner" / "studies")
    study_root = store.init_study(spec_path=study_path, study=study)
    state = store.load_state(study.study_id)
    return study_path, study, store, study_root, state


class CoreFlowTests(unittest.TestCase):
    """Tests that drive the aituner building blocks through fixture files."""

    def test_trace_and_prompt_flow(self) -> None:
        """Load the fixture trace, summarise it and build an LLM prompt."""
        with tempfile.TemporaryDirectory() as scratch:
            study_path, study, _store, study_root, state = _bootstrap_study(
                Path(scratch)
            )

            window, requests = load_trace_requests(
                study, study_spec_path=study_path
            )
            summary = summarize_window(requests, window)
            self.assertEqual(summary["request_count"], 3)
            self.assertEqual(summary["request_rate"], 0.3)

            prompt = build_prompt(
                study=study,
                window_summary=summary,
                state=state,
                capability_profile={"queueing_knee_by_bucket": {"4k": 1000}},
            )
            for expected_fragment in (
                "allowed_flag_keys",
                "study-1",
                "queueing_knee_by_bucket",
            ):
                self.assertIn(expected_fragment, prompt)
            self.assertTrue(study_root.exists())

    def test_length_only_trace_rows_are_synthesized(self) -> None:
        """A trace row carrying only lengths still yields a request body."""
        with tempfile.TemporaryDirectory() as scratch:
            base_dir = Path(scratch)
            windows_root = base_dir / "trace_windows"
            traces_root = windows_root / "traces"
            traces_root.mkdir(parents=True)

            # The row has no "messages" and no "request_id" key.
            length_only_row = {
                "timestamp": 0.0,
                "sampling_u": 0.1,
                "input_length": 32,
                "output_length": 16,
            }
            (traces_root / "chat_len_only.jsonl").write_text(
                json.dumps(length_only_row) + "\n",
                encoding="utf-8",
            )

            windows_path = windows_root / "windows.json"
            windows_doc = {
                "windows": [
                    {
                        "window_id": "w1",
                        "trace_type": "chat",
                        "trace_file": "traces/chat_len_only.jsonl",
                        "window_start": 0.0,
                        "window_end": 10.0,
                    }
                ]
            }
            windows_path.write_text(json.dumps(windows_doc), encoding="utf-8")

            study_doc = {
                "study_id": "study-len-only",
                "hardware": {"gpu_count": 1},
                "model": {
                    "model_id": "m1",
                    "served_model_name": "dummy-model",
                },
                "engine": {
                    "engine_name": "vllm",
                    "exec_path": "/usr/local/bin/vllm",
                    "host": "127.0.0.1",
                    "port": 8000,
                    "ready_timeout_s": 10,
                    "request_timeout_s": 10,
                    "healthcheck_path": "/v1/models",
                    "launch_args": [],
                    "base_envs": {},
                    "base_flags": {},
                    "tunable_envs": [],
                    "tunable_flags": [],
                },
                "trace": {
                    "windows_path": str(windows_path),
                    "window_id": "w1",
                    "max_concurrency": 1,
                    "synthetic_prompt_cap_tokens": 8,
                },
                "slo": {"target_pass_rate": 0.95},
                "search": {
                    "low": 0.0,
                    "high": 1.0,
                    "tolerance": 0.1,
                    "max_probes": 2,
                    "sample_seed": 1,
                },
                "llm": {"system_prompt": "", "max_history_trials": 1},
            }
            study_path = base_dir / "study.json"
            study_path.write_text(json.dumps(study_doc), encoding="utf-8")

            study = load_study_spec(study_path)
            _, requests = load_trace_requests(study, study_spec_path=study_path)

            self.assertEqual(len(requests), 1)
            body = requests[0].body
            prompt_text = body["messages"][0]["content"]
            # Expected count equals the configured synthetic_prompt_cap_tokens.
            self.assertEqual(prompt_text.count("token"), 8)
            self.assertEqual(requests[0].body["min_tokens"], 16)
            self.assertEqual(requests[0].body["max_tokens"], 16)

    def test_slo_evaluation_step_and_fixed_rules(self) -> None:
        """Evaluate two outcomes against the fixture SLO: r1 passes, r2 fails."""
        with tempfile.TemporaryDirectory() as scratch:
            study = load_study_spec(_write_study_assets(Path(scratch)))

            # Columns: request_id, ttft_ms, prompt_tokens.
            # All other fields are shared between the two outcomes.
            outcome_table = [("r1", 1000, 1000), ("r2", 6000, 5000)]
            outcomes = [
                RequestOutcome(
                    request_id=request_id,
                    success=True,
                    ttft_ms=ttft_ms,
                    tpot_ms=100,
                    prompt_tokens=prompt_tokens,
                    completion_tokens=16,
                )
                for request_id, ttft_ms, prompt_tokens in outcome_table
            ]

            evaluations, summary = summarize_evaluations(outcomes, study.slo)
            first_eval, second_eval = evaluations[0], evaluations[1]
            self.assertTrue(first_eval.passed)
            self.assertFalse(second_eval.passed)
            self.assertEqual(summary["slo_pass_rate"], 0.5)
            self.assertFalse(summary["feasible"])

    def test_binary_search_max_feasible(self) -> None:
        """The best threshold must land in ``[0.5, 0.625]`` with a payload."""

        def probe(threshold):
            # Feasible exactly when the probed threshold is at most 0.625.
            return ThresholdProbe(
                threshold=threshold,
                feasible=threshold <= 0.625,
                payload={"threshold": threshold},
            )

        result = binary_search_max_feasible(
            low=0.0,
            high=1.0,
            tolerance=0.01,
            max_probes=8,
            evaluator=probe,
        )
        self.assertLessEqual(result.best_threshold, 0.625)
        self.assertGreaterEqual(result.best_threshold, 0.5)
        self.assertIsNotNone(result.best_feasible_payload)

    def test_proposal_validation_and_job_emission(self) -> None:
        """Parse a proposal, materialise a trial and append its job entry."""
        with tempfile.TemporaryDirectory() as scratch:
            base_dir = Path(scratch)
            _study_path, study, store, _study_root, state = _bootstrap_study(
                base_dir
            )

            proposal_doc = {
                "observation": "Current TTFT fails before TPOT.",
                "diagnosis": "Prefill pressure dominates.",
                "config_patch": {
                    "env_patch": {"VLLM_ATTENTION_BACKEND": "FLASHINFER"},
                    "flag_patch": {
                        "tensor-parallel-size": 4,
                        "max-num-seqs": 64,
                    },
                },
                "expected_effects": [
                    "lower TTFT",
                    "raise feasible sampling_u",
                ],
                "why_not_previous_failures": "Avoids changing unsupported envs.",
            }
            proposal = parse_proposal_text(json.dumps(proposal_doc), study)

            trial, _ = store.materialize_trial(
                study=study, state=state, proposal=proposal
            )
            job = build_trial_job(study=study, trial=trial, repo_root=base_dir)

            jobs_path = base_dir / "jobs.toml"
            append_job(jobs_path, job)
            rendered = jobs_path.read_text(encoding="utf-8")

            self.assertIn('name = "study-1-trial-0001"', rendered)
            self.assertIn(
                'command = "python3 -m aituner.cli worker run-trial',
                rendered,
            )
            self.assertIn('PYTHONPATH = "src"', rendered)

    def test_ingest_trial_results_updates_best(self) -> None:
        """Ingesting a completed trial result records it as the best trial."""
        with tempfile.TemporaryDirectory() as scratch:
            _study_path, study, store, _study_root, state = _bootstrap_study(
                Path(scratch)
            )

            proposal = Proposal.from_dict(
                {
                    "observation": "Obs",
                    "diagnosis": "Diag",
                    "config_patch": {
                        "env_patch": {},
                        "flag_patch": {"tensor-parallel-size": 4},
                    },
                    "expected_effects": ["raise rate"],
                }
            )
            trial, _ = store.materialize_trial(
                study=study, state=state, proposal=proposal
            )

            # Hand-write the result file at the path the trial points to.
            result_doc = {
                "study_id": study.study_id,
                "trial_id": trial.trial_id,
                "status": "completed",
                "best_sampling_u": 0.75,
                "best_request_rate": 12.5,
                "best_pass_rate": 0.97,
            }
            Path(trial.result_path).write_text(
                json.dumps(result_doc), encoding="utf-8"
            )

            next_state = store.ingest_trial_results(study.study_id)
            self.assertEqual(next_state.best_trial_id, trial.trial_id)
            self.assertEqual(next_state.best_request_rate, 12.5)


if __name__ == "__main__":
    unittest.main()