diff --git a/configs/examples/dash0_smoke_proposals/aggressive.json b/configs/examples/dash0_smoke_proposals/aggressive.json
new file mode 100644
index 0000000..5a7a816
--- /dev/null
+++ b/configs/examples/dash0_smoke_proposals/aggressive.json
@@ -0,0 +1,19 @@
+{
+  "observation": "Push batching further after validating the balanced layout.",
+  "diagnosis": "If TTFT remains under control, a larger admission window should maximize achieved request rate.",
+  "config_patch": {
+    "env_patch": {},
+    "flag_patch": {
+      "tensor-parallel-size": 4,
+      "max-num-seqs": 24,
+      "max-num-batched-tokens": 98304,
+      "gpu-memory-utilization": 0.92,
+      "block-size": 64
+    }
+  },
+  "expected_effects": [
+    "Highest throughput among the smoke candidates if memory is sufficient",
+    "More pressure on TTFT, so the binary search should locate the safe threshold"
+  ],
+  "why_not_previous_failures": "Keeps tp=4 and block-size stable while only expanding batching and memory utilization."
+}
diff --git a/configs/examples/dash0_smoke_proposals/balanced.json b/configs/examples/dash0_smoke_proposals/balanced.json
new file mode 100644
index 0000000..04b38a3
--- /dev/null
+++ b/configs/examples/dash0_smoke_proposals/balanced.json
@@ -0,0 +1,19 @@
+{
+  "observation": "Increase batching once tp=4 is fixed.",
+  "diagnosis": "Throughput should improve if the engine can admit more concurrent prefills without violating TTFT.",
+  "config_patch": {
+    "env_patch": {},
+    "flag_patch": {
+      "tensor-parallel-size": 4,
+      "max-num-seqs": 16,
+      "max-num-batched-tokens": 65536,
+      "gpu-memory-utilization": 0.9,
+      "block-size": 64
+    }
+  },
+  "expected_effects": [
+    "Higher feasible sampling_u than the conservative baseline",
+    "Better token throughput if memory headroom is sufficient"
+  ],
+  "why_not_previous_failures": "Raises batching in a controlled step instead of jumping directly to the most aggressive setting."
+}
diff --git a/configs/examples/dash0_smoke_proposals/conservative.json b/configs/examples/dash0_smoke_proposals/conservative.json
new file mode 100644
index 0000000..0ad7c28
--- /dev/null
+++ b/configs/examples/dash0_smoke_proposals/conservative.json
@@ -0,0 +1,19 @@
+{
+  "observation": "Start from a safe tp=4 layout and conservative batching.",
+  "diagnosis": "The first pass should verify multi-GPU launch and avoid queueing collapse from over-batching.",
+  "config_patch": {
+    "env_patch": {},
+    "flag_patch": {
+      "tensor-parallel-size": 4,
+      "max-num-seqs": 8,
+      "max-num-batched-tokens": 32768,
+      "gpu-memory-utilization": 0.85,
+      "block-size": 64
+    }
+  },
+  "expected_effects": [
+    "Stable startup on 4x H20",
+    "Low risk of OOM during the first binary-search probes"
+  ],
+  "why_not_previous_failures": "This is the initial baseline proposal."
+}
diff --git a/configs/examples/dash0_smoke_study.json b/configs/examples/dash0_smoke_study.json
new file mode 100644
index 0000000..968ef9c
--- /dev/null
+++ b/configs/examples/dash0_smoke_study.json
@@ -0,0 +1,83 @@
+{
+  "study_id": "dash0-qwen30b-chat-smoke",
+  "hardware": {
+    "gpu_count": 4,
+    "gpu_model": "H20",
+    "host_candidates": ["dash0"]
+  },
+  "model": {
+    "model_id": "qwen3-30b-a3b",
+    "served_model_name": "qwen3-30b-smoke"
+  },
+  "engine": {
+    "engine_name": "vllm",
+    "engine_version": "0.13.0rc2.dev2111+gb44b43f43.d20260309",
+    "exec_path": "/usr/local/bin/vllm",
+    "cwd": "/home/admin/cpfs/wjh/aituner/aituner",
+    "host": "127.0.0.1",
+    "port": 18080,
+    "healthcheck_path": "/v1/models",
+    "ready_timeout_s": 900,
+    "request_timeout_s": 900,
+    "launch_args": [
+      "serve",
+      "/home/admin/resource/model/464482ce.qwen3-30b-a3b/1m-instruct-0726-fp4"
+    ],
+    "base_envs": {
+      "CUDA_VISIBLE_DEVICES": "0,1,2,3"
+    },
+    "base_flags": {
+      "host": "127.0.0.1",
+      "port": 18080,
+      "served-model-name": "qwen3-30b-smoke",
+      "max-model-len": 65536,
+      "disable-log-requests": true,
+      "trust-remote-code": true
+    },
+    "tunable_envs": [
+      "VLLM_ATTENTION_BACKEND"
+    ],
+    "tunable_flags": [
+      "tensor-parallel-size",
+      "max-num-seqs",
+      "max-num-batched-tokens",
+      "gpu-memory-utilization",
+      "block-size"
+    ],
+    "python_executable": "python3"
+  },
+  "trace": {
+    "windows_path": "/home/admin/cpfs/wjh/aituner/aituner/trace_windows/windows.json",
+    "window_id": "chat_w20260311_1000",
+    "u_field": "sampling_u",
+    "timestamp_field": "timestamp",
+    "max_concurrency": 2,
+    "max_requests_per_probe": 24
+  },
+  "slo": {
+    "target_pass_rate": 0.95,
+    "ttft_rule": {
+      "kind": "step_ms",
+      "buckets": [
+        {"max_input_tokens": 4096, "threshold_ms": 15000},
+        {"max_input_tokens": 16384, "threshold_ms": 30000},
+        {"threshold_ms": 45000}
+      ]
+    },
+    "tpot_rule": {
+      "kind": "fixed_ms",
+      "threshold_ms": 1500
+    }
+  },
+  "search": {
+    "low": 0.0,
+    "high": 1.0,
+    "tolerance": 0.1,
+    "max_probes": 4,
+    "sample_seed": 20260325
+  },
+  "llm": {
+    "system_prompt": "Propose a single engine config patch that increases the maximum feasible sampling_u under the SLO target.",
+    "max_history_trials": 8
+  }
+}
diff --git a/configs/examples/trace_windows/traces/chat_w_example_peak_0001.jsonl b/configs/examples/trace_windows/traces/chat_w_example_0001.jsonl
similarity index 100%
rename from configs/examples/trace_windows/traces/chat_w_example_peak_0001.jsonl
rename to configs/examples/trace_windows/traces/chat_w_example_0001.jsonl
diff --git a/src/aituner/cli.py b/src/aituner/cli.py
index bb4831d..ad59ce7 100644
--- a/src/aituner/cli.py
+++ b/src/aituner/cli.py
@@ -105,6 +105,76 @@ def cmd_study_ingest(args: argparse.Namespace) -> int:
     return 0
 
 
+def cmd_study_tune(args: argparse.Namespace) -> int:
+    spec_path = Path(args.spec).resolve()
+    study = load_study_spec(spec_path)
+    store = StudyStore(Path(args.store_root) if args.store_root else None)
+    study_root = store.init_study(spec_path=spec_path, study=study)
+    capability_profile = load_capability_profile(study, study_spec_path=spec_path)
+    proposal_files = [Path(item).resolve() for item in (args.proposal_file or [])]
+    max_trials = args.max_trials or (len(proposal_files) if proposal_files else 1)
+    if max_trials <= 0:
+        raise SpecError("max_trials must be positive")
+    if proposal_files and max_trials > len(proposal_files):
+        max_trials = len(proposal_files)
+    if not proposal_files and study.llm.endpoint is None:
+        raise SpecError("No proposal files provided and study.llm.endpoint is not configured")
configured") + + executed: list[dict[str, object]] = [] + for idx in range(max_trials): + state = store.load_state(study.study_id) + window, requests = load_trace_requests(study, study_spec_path=spec_path) + prompt = build_prompt( + study=study, + window_summary=summarize_window(requests, window), + state=state, + capability_profile=capability_profile, + ) + prompt_name = f"prompt-{state.next_trial_index:04d}" + store.write_prompt(study.study_id, prompt_name, prompt) + + if proposal_files: + proposal_source = proposal_files[idx] + proposal_text = proposal_source.read_text(encoding="utf-8") + proposal_name = proposal_source.stem + else: + proposal_source = None + proposal_text = call_llm_for_proposal(policy=study.llm, prompt=prompt) + proposal_name = f"proposal-{state.next_trial_index:04d}" + proposal = parse_proposal_text(proposal_text, study) + store.write_proposal(study.study_id, proposal_name, proposal) + trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) + trial_spec_path = Path(trial.artifact_dir) / "trial_spec.json" + result = run_trial(trial_spec_path) + state = store.ingest_trial_results(study.study_id) + executed.append( + { + "trial_id": trial.trial_id, + "proposal_name": proposal_name, + "proposal_source": str(proposal_source) if proposal_source else "llm", + "best_sampling_u": result.get("best_sampling_u"), + "best_request_rate": result.get("best_request_rate"), + "best_pass_rate": result.get("best_pass_rate"), + "state_best_trial_id": state.best_trial_id, + "state_best_request_rate": state.best_request_rate, + } + ) + + final_state = store.load_state(study.study_id) + print( + json.dumps( + { + "study_root": str(study_root), + "executed_trials": executed, + "best_trial_id": final_state.best_trial_id, + "best_request_rate": final_state.best_request_rate, + }, + ensure_ascii=False, + ) + ) + return 0 + + def cmd_worker_run_trial(args: argparse.Namespace) -> int: result = run_trial(Path(args.trial_spec).resolve()) print(json.dumps(result)) @@ -154,6 +224,13 @@ def build_parser() -> argparse.ArgumentParser: ingest.add_argument("--store-root") ingest.set_defaults(func=cmd_study_ingest) + tune = study_sub.add_parser("tune") + tune.add_argument("--spec", required=True) + tune.add_argument("--store-root") + tune.add_argument("--proposal-file", action="append") + tune.add_argument("--max-trials", type=int) + tune.set_defaults(func=cmd_study_tune) + worker = subparsers.add_parser("worker") worker_sub = worker.add_subparsers(dest="worker_command", required=True) run = worker_sub.add_parser("run-trial") diff --git a/src/aituner/trace.py b/src/aituner/trace.py index 0a711a0..398972f 100644 --- a/src/aituner/trace.py +++ b/src/aituner/trace.py @@ -121,6 +121,17 @@ def _coerce_prompt_tokens(row: Mapping[str, Any]) -> int | None: return None +def _downsample_requests( + requests: list[TraceRequest], *, limit: int +) -> list[TraceRequest]: + if limit <= 0: + return [] + if len(requests) <= limit: + return requests + indexes = sorted({(i * len(requests)) // limit for i in range(limit)}) + return [requests[idx] for idx in indexes] + + def load_trace_requests(study: StudySpec, *, study_spec_path: Path) -> tuple[WindowRecord, list[TraceRequest]]: window = resolve_window_record(study, study_spec_path=study_spec_path) requests: list[TraceRequest] = [] @@ -179,7 +190,10 @@ def load_trace_requests(study: StudySpec, *, study_spec_path: Path) -> tuple[Win ) requests.sort(key=lambda item: item.arrival_s) if study.trace.max_requests_per_probe is not None: - requests = 
diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py
index fa58e29..d614b8c 100644
--- a/tests/test_core_flow.py
+++ b/tests/test_core_flow.py
@@ -5,7 +5,9 @@ import subprocess
 import tempfile
 import unittest
 from pathlib import Path
+from unittest import mock
 
+from aituner.cli import main as cli_main
 from aituner.job import append_job, build_trial_job
 from aituner.llm import build_prompt, parse_proposal_text
 from aituner.search import ThresholdProbe, binary_search_max_feasible
@@ -371,6 +373,82 @@ class CoreFlowTests(unittest.TestCase):
         self.assertGreaterEqual(result.best_threshold, 0.5)
         self.assertIsNotNone(result.best_feasible_payload)
 
+    def test_trace_max_requests_uses_window_wide_downsample(self) -> None:
+        with tempfile.TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            trace_dir = tmp_path / "trace_windows" / "traces"
+            trace_dir.mkdir(parents=True)
+            trace_path = trace_dir / "chat_many.jsonl"
+            with trace_path.open("w", encoding="utf-8") as handle:
+                for idx in range(10):
+                    handle.write(
+                        json.dumps(
+                            {
+                                "request_id": f"r{idx}",
+                                "timestamp": float(idx),
+                                "sampling_u": idx / 10.0,
+                                "messages": [{"role": "user", "content": f"hello-{idx}"}],
+                                "input_length": 10 + idx,
+                                "output_length": 5,
+                            }
+                        )
+                        + "\n"
+                    )
+            windows_path = tmp_path / "trace_windows" / "windows.json"
+            windows_path.write_text(
+                json.dumps(
+                    {
+                        "windows": [
+                            {
+                                "window_id": "w1",
+                                "trace_type": "chat",
+                                "trace_file": "traces/chat_many.jsonl",
+                                "window_start": 0.0,
+                                "window_end": 10.0,
+                            }
+                        ]
+                    }
+                ),
+                encoding="utf-8",
+            )
+            study_path = tmp_path / "study.json"
+            study_path.write_text(
+                json.dumps(
+                    {
+                        "study_id": "study-downsample",
+                        "hardware": {"gpu_count": 1},
+                        "model": {"model_id": "m1", "served_model_name": "dummy-model"},
+                        "engine": {
+                            "engine_name": "vllm",
+                            "exec_path": "/usr/local/bin/vllm",
+                            "host": "127.0.0.1",
+                            "port": 8000,
+                            "ready_timeout_s": 10,
+                            "request_timeout_s": 10,
+                            "healthcheck_path": "/v1/models",
+                            "launch_args": [],
+                            "base_envs": {},
+                            "base_flags": {},
+                            "tunable_envs": [],
+                            "tunable_flags": [],
+                        },
+                        "trace": {
+                            "windows_path": str(windows_path),
+                            "window_id": "w1",
+                            "max_concurrency": 1,
+                            "max_requests_per_probe": 4,
+                        },
+                        "slo": {"target_pass_rate": 0.95},
+                        "search": {"low": 0.0, "high": 1.0, "tolerance": 0.1, "max_probes": 2, "sample_seed": 1},
+                        "llm": {"system_prompt": "", "max_history_trials": 1},
+                    }
+                ),
+                encoding="utf-8",
+            )
+            study = load_study_spec(study_path)
+            _, requests = load_trace_requests(study, study_spec_path=study_path)
+            self.assertEqual([item.row_id for item in requests], ["r0", "r2", "r5", "r7"])
+
     def test_proposal_validation_and_job_emission(self) -> None:
         with tempfile.TemporaryDirectory() as tmp:
             tmp_path = Path(tmp)
@@ -437,6 +515,83 @@ class CoreFlowTests(unittest.TestCase):
         self.assertEqual(next_state.best_trial_id, trial.trial_id)
         self.assertEqual(next_state.best_request_rate, 12.5)
 
+    def test_cli_tune_runs_multiple_manual_proposals(self) -> None:
+        with tempfile.TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            study_path = _write_study_assets(tmp_path)
+            proposal1 = tmp_path / "proposal-1.json"
+            proposal2 = tmp_path / "proposal-2.json"
+            proposal1.write_text(
+                json.dumps(
+                    {
+                        "observation": "trial one",
+                        "diagnosis": "conservative",
+                        "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
"expected_effects": ["stable"], + "why_not_previous_failures": "", + } + ), + encoding="utf-8", + ) + proposal2.write_text( + json.dumps( + { + "observation": "trial two", + "diagnosis": "more batching", + "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 64}}, + "expected_effects": ["higher throughput"], + "why_not_previous_failures": "", + } + ), + encoding="utf-8", + ) + store_root = tmp_path / "store" + + def fake_run_trial(trial_spec_path: Path) -> dict[str, object]: + payload = json.loads(trial_spec_path.read_text(encoding="utf-8")) + trial_id = str(payload["trial_id"]) + trial_root = Path(payload["artifact_dir"]) + if trial_id.endswith("0001"): + best_rate = 1.0 + best_u = 0.5 + else: + best_rate = 2.0 + best_u = 0.75 + result = { + "study_id": payload["study_id"], + "trial_id": trial_id, + "status": "completed", + "best_sampling_u": best_u, + "best_request_rate": best_rate, + "best_pass_rate": 1.0, + "best_request_count": 2, + "probes": [], + } + (trial_root / "result.json").write_text(json.dumps(result), encoding="utf-8") + return result + + with mock.patch("aituner.cli.run_trial", side_effect=fake_run_trial): + exit_code = cli_main( + [ + "study", + "tune", + "--spec", + str(study_path), + "--store-root", + str(store_root), + "--proposal-file", + str(proposal1), + "--proposal-file", + str(proposal2), + ] + ) + self.assertEqual(exit_code, 0) + store = StudyStore(store_root) + state = store.load_state("study-1") + self.assertEqual(state.best_trial_id, "trial-0002") + self.assertEqual(state.best_request_rate, 2.0) + self.assertEqual(state.next_trial_index, 3) + if __name__ == "__main__": unittest.main()