diff --git a/configs/examples/dash0_qwen27b_tight_slo_run4_0_8k.json b/configs/examples/dash0_qwen27b_tight_slo_run4_0_8k.json index 00c394b..3ba070e 100644 --- a/configs/examples/dash0_qwen27b_tight_slo_run4_0_8k.json +++ b/configs/examples/dash0_qwen27b_tight_slo_run4_0_8k.json @@ -1,7 +1,7 @@ { "study_id": "dash0-qwen27b-tight-slo-10min-run4-chat-0-8k", "hardware": { - "gpu_count": 4, + "gpu_count": 8, "gpu_model": "H20", "host_candidates": [ "dash0" @@ -26,7 +26,6 @@ "/home/admin/resource/model/464482ce/qwen3.5-27b/256k-0223-internal" ], "base_envs": { - "CUDA_VISIBLE_DEVICES": "4,5,6,7", "VLLM_DISABLE_COMPILE_CACHE": "1", "DS_LLM_IGNORE_WARMUP": "1", "DS_LLM_IGNORE_CHECK_WARMUP": "1", @@ -73,10 +72,7 @@ "mamba-cache-dtype": "float32", "skip-mm-profiling": true, "quantization": "fp8", - "tensor-parallel-size": 4, - "data-parallel-size": 1, - "expert-parallel-size": 1, - "max-num-seqs": 16, + "tensor-parallel-size": 1, "disable-log-requests": true }, "tunable_envs": [ @@ -100,7 +96,7 @@ "require_enable_expert_parallel_when_ep_gt_one": true, "validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter": true, "allowed_tp_dp_products": [1, 2, 4, 8], - "allowed_tensor_parallel_sizes": [1, 2, 4], + "allowed_tensor_parallel_sizes": [1, 2, 4, 8], "allowed_data_parallel_sizes": [1, 2, 4, 8], "allowed_expert_parallel_sizes": [1] }, diff --git a/src/aituner/llm.py b/src/aituner/llm.py index 6658a2e..f6f8f4b 100644 --- a/src/aituner/llm.py +++ b/src/aituner/llm.py @@ -192,8 +192,10 @@ def build_prompt( { "trial_id": trial.trial_id, "status": trial.status, + "parallel_size": trial.parallel_size, "best_sampling_u": trial.best_sampling_u, "best_request_rate": trial.best_request_rate, + "best_request_rate_per_gpu": trial.best_request_rate_per_gpu, "best_pass_rate": trial.best_pass_rate, "diagnosis": trial.diagnosis, "config_patch": trial.config_patch, @@ -224,9 +226,12 @@ def build_prompt( "study_id": study.study_id, "current_best": { "trial_id": state.best_trial_id, + "best_parallel_size": state.best_parallel_size, "best_sampling_u": state.best_sampling_u, "best_request_rate": state.best_request_rate, + "best_request_rate_per_gpu": state.best_request_rate_per_gpu, }, + "best_by_parallel_size": state.best_by_parallel_size, "hardware": { "gpu_count": study.hardware.gpu_count, "gpu_model": study.hardware.gpu_model, @@ -294,9 +299,10 @@ def build_prompt( "Parallel space candidates:", json.dumps(parallel_candidates, ensure_ascii=False, indent=2), "", - "The proposal must beat the current incumbent. Do not propose a config that is only likely to be feasible below the current best_sampling_u/request_rate.", - "The evaluator for a new trial will start searching from the current best feasible sampling_u and only look for improvements above it.", - "The proposal should improve the maximum feasible sampling_u under the 95%+ SLO target.", + "The primary cross-topology comparison metric is request_rate_per_gpu, not raw request_rate.", + "The proposal should beat the incumbent on request_rate_per_gpu under the 95%+ SLO target.", + "The evaluator uses the best feasible sampling_u from the same tp_dp_product group when it exists. For a different tp_dp_product group, it uses a scaled lower floor instead of reusing the global incumbent directly.", + "Do not assume a configuration with fewer GPUs must start from the global incumbent sampling_u.", ] return "\n".join(sections) diff --git a/src/aituner/spec.py b/src/aituner/spec.py index da5acb2..4d78160 100644 --- a/src/aituner/spec.py +++ b/src/aituner/spec.py @@ -681,8 +681,10 @@ class TrialSpec: class TrialSummary: trial_id: str status: str + parallel_size: int | None = None best_sampling_u: float | None = None best_request_rate: float | None = None + best_request_rate_per_gpu: float | None = None best_pass_rate: float | None = None result_path: str | None = None diagnosis: str = "" @@ -695,9 +697,12 @@ class TrialSummary: class StudyState: study_id: str best_trial_id: str | None = None + best_parallel_size: int | None = None best_sampling_u: float | None = None best_request_rate: float | None = None + best_request_rate_per_gpu: float | None = None next_trial_index: int = 1 + best_by_parallel_size: dict[str, dict[str, Any]] = field(default_factory=dict) trials: list[TrialSummary] = field(default_factory=list) diff --git a/src/aituner/store.py b/src/aituner/store.py index 5fc580e..bb9e49e 100644 --- a/src/aituner/store.py +++ b/src/aituner/store.py @@ -32,9 +32,16 @@ class StudyStore: return StudyState( study_id=str(payload["study_id"]), best_trial_id=payload.get("best_trial_id"), + best_parallel_size=payload.get("best_parallel_size"), best_sampling_u=payload.get("best_sampling_u"), best_request_rate=payload.get("best_request_rate"), + best_request_rate_per_gpu=payload.get("best_request_rate_per_gpu"), next_trial_index=int(payload.get("next_trial_index", 1)), + best_by_parallel_size={ + str(key): value + for key, value in (payload.get("best_by_parallel_size") or {}).items() + if isinstance(value, dict) + }, trials=trials, ) @@ -61,21 +68,15 @@ class StudyStore: trial_id = f"trial-{state.next_trial_index:04d}" trial_root = self.study_root(study.study_id) / "trials" / trial_id trial_root.mkdir(parents=True, exist_ok=True) + parallel_size = _parallel_size_for_proposal(study=study, proposal=proposal) + search_low = _derive_search_floor(study=study, state=state, parallel_size=parallel_size) spec = TrialSpec( study_id=study.study_id, trial_id=trial_id, config_patch=proposal.config_patch, search=replace( study.search, - low=min( - study.search.high, - max( - study.search.low, - float(state.best_sampling_u) - if isinstance(state.best_sampling_u, (int, float)) - else study.search.low, - ), - ), + low=search_low, ), study_spec_path=str((self.study_root(study.study_id) / "study_spec.source").resolve()), artifact_dir=str(trial_root), @@ -89,6 +90,7 @@ class StudyStore: TrialSummary( trial_id=trial_id, status="queued", + parallel_size=parallel_size, diagnosis=proposal.diagnosis, config_patch=to_jsonable(proposal.config_patch), ) @@ -97,12 +99,20 @@ class StudyStore: return spec, next_state def ingest_trial_results(self, study_id: str) -> StudyState: + study = self.load_study_snapshot(study_id) state = self.load_state(study_id) by_id = {item.trial_id: item for item in state.trials} trials_dir = self.study_root(study_id) / "trials" best_trial_id = state.best_trial_id + best_parallel_size = state.best_parallel_size best_sampling_u = state.best_sampling_u best_rate = state.best_request_rate + best_rate_per_gpu = state.best_request_rate_per_gpu + best_by_parallel_size = { + str(key): dict(value) + for key, value in (state.best_by_parallel_size or {}).items() + if isinstance(value, dict) + } for trial_dir in sorted(trials_dir.glob("trial-*")): result_path = trial_dir / "result.json" if not result_path.exists(): @@ -115,30 +125,140 @@ class StudyStore: state.trials.append(summary) by_id[trial_id] = summary summary.status = str(payload.get("status") or "completed") + if summary.parallel_size is None: + summary.parallel_size = _parallel_size_for_trial_id( + study=study, + study_root=self.study_root(study_id), + trial_id=trial_id, + ) summary.best_sampling_u = payload.get("best_sampling_u") summary.best_request_rate = payload.get("best_request_rate") + summary.best_request_rate_per_gpu = _request_rate_per_gpu( + summary.best_request_rate, + summary.parallel_size, + ) summary.best_pass_rate = payload.get("best_pass_rate") summary.result_path = str(result_path) summary.failure_reason = str(payload.get("failure_reason") or "").strip() summary.failure_stage = str(payload.get("failure_stage") or "").strip() if ( - isinstance(summary.best_request_rate, (int, float)) - and (best_rate is None or summary.best_request_rate > best_rate) + isinstance(summary.parallel_size, int) + and summary.parallel_size > 0 + and isinstance(summary.best_request_rate_per_gpu, (int, float)) ): - best_rate = float(summary.best_request_rate) + group_key = str(summary.parallel_size) + incumbent = best_by_parallel_size.get(group_key) + if incumbent is None or float(summary.best_request_rate_per_gpu) > float( + incumbent.get("best_request_rate_per_gpu") or float("-inf") + ): + best_by_parallel_size[group_key] = { + "trial_id": trial_id, + "parallel_size": summary.parallel_size, + "best_sampling_u": summary.best_sampling_u, + "best_request_rate": summary.best_request_rate, + "best_request_rate_per_gpu": summary.best_request_rate_per_gpu, + "best_pass_rate": summary.best_pass_rate, + } + if ( + isinstance(summary.best_request_rate_per_gpu, (int, float)) + and ( + best_rate_per_gpu is None + or float(summary.best_request_rate_per_gpu) > float(best_rate_per_gpu) + ) + ): + best_rate = ( + float(summary.best_request_rate) + if isinstance(summary.best_request_rate, (int, float)) + else None + ) + best_rate_per_gpu = float(summary.best_request_rate_per_gpu) best_sampling_u = ( float(summary.best_sampling_u) if isinstance(summary.best_sampling_u, (int, float)) else None ) best_trial_id = trial_id + best_parallel_size = summary.parallel_size state.best_sampling_u = best_sampling_u state.best_request_rate = best_rate + state.best_request_rate_per_gpu = best_rate_per_gpu state.best_trial_id = best_trial_id + state.best_parallel_size = best_parallel_size + state.best_by_parallel_size = best_by_parallel_size self.save_state(state) return state + def load_study_snapshot(self, study_id: str) -> StudySpec: + path = self.study_root(study_id) / "study_spec.snapshot.json" + payload = json.loads(path.read_text(encoding="utf-8")) + return StudySpec.from_dict(payload) + @staticmethod def write_json(path: Path, payload: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + + +def _parse_int_like(value: Any, *, default: int = 1) -> int: + if value is None: + return default + if isinstance(value, bool): + raise ValueError("Boolean values are not valid integer topology settings.") + if isinstance(value, int): + return value + if isinstance(value, float) and value.is_integer(): + return int(value) + if isinstance(value, str) and value.strip(): + return int(value.strip()) + raise ValueError(f"Unable to parse integer value from {value!r}.") + + +def _parallel_size_for_config(*, study: StudySpec, flag_patch: dict[str, Any]) -> int: + flags = dict(study.engine.base_flags) + flags.update(flag_patch) + tp = _parse_int_like(flags.get("tensor-parallel-size"), default=1) + dp = _parse_int_like(flags.get("data-parallel-size"), default=1) + return tp * dp + + +def _parallel_size_for_proposal(*, study: StudySpec, proposal: Proposal) -> int: + return _parallel_size_for_config(study=study, flag_patch=proposal.config_patch.flag_patch) + + +def _parallel_size_for_trial_id(*, study: StudySpec, study_root: Path, trial_id: str) -> int | None: + trial_spec_path = study_root / "trials" / trial_id / "trial_spec.json" + if not trial_spec_path.exists(): + return None + payload = json.loads(trial_spec_path.read_text(encoding="utf-8")) + config_patch = payload.get("config_patch") or {} + flag_patch = config_patch.get("flag_patch") if isinstance(config_patch, dict) else {} + if not isinstance(flag_patch, dict): + return None + return _parallel_size_for_config(study=study, flag_patch=flag_patch) + + +def _request_rate_per_gpu(best_request_rate: Any, parallel_size: int | None) -> float | None: + if not isinstance(best_request_rate, (int, float)) or not isinstance(parallel_size, int): + return None + if parallel_size <= 0: + return None + return float(best_request_rate) / float(parallel_size) + + +def _derive_search_floor(*, study: StudySpec, state: StudyState, parallel_size: int) -> float: + low = study.search.low + high = study.search.high + group_incumbent = (state.best_by_parallel_size or {}).get(str(parallel_size)) + if isinstance(group_incumbent, dict) and isinstance( + group_incumbent.get("best_sampling_u"), (int, float) + ): + candidate = float(group_incumbent["best_sampling_u"]) + elif ( + isinstance(state.best_sampling_u, (int, float)) + and isinstance(state.best_parallel_size, int) + and state.best_parallel_size > 0 + ): + candidate = float(state.best_sampling_u) * float(parallel_size) / float(state.best_parallel_size) + else: + candidate = low + return min(high, max(low, candidate)) diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index 770d8f0..1e7b856 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -1039,6 +1039,12 @@ class CoreFlowTests(unittest.TestCase): self.assertEqual(next_state.best_trial_id, trial.trial_id) self.assertEqual(next_state.best_sampling_u, 0.75) self.assertEqual(next_state.best_request_rate, 12.5) + self.assertEqual(next_state.best_parallel_size, 4) + self.assertEqual(next_state.best_request_rate_per_gpu, 3.125) + self.assertEqual( + next_state.best_by_parallel_size["4"]["best_request_rate_per_gpu"], + 3.125, + ) def test_materialize_trial_uses_incumbent_sampling_u_as_search_floor(self) -> None: with tempfile.TemporaryDirectory() as tmp: @@ -1050,8 +1056,10 @@ class CoreFlowTests(unittest.TestCase): state = StudyState( study_id=study.study_id, best_trial_id="trial-0001", + best_parallel_size=4, best_sampling_u=0.375, best_request_rate=3.0, + best_request_rate_per_gpu=0.75, next_trial_index=2, trials=[], ) @@ -1067,6 +1075,71 @@ class CoreFlowTests(unittest.TestCase): self.assertEqual(trial.search.low, 0.375) self.assertEqual(trial.search.high, 1.0) + def test_materialize_trial_uses_same_parallel_group_incumbent(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets(tmp_path) + study = load_study_spec(study_path) + store = StudyStore(tmp_path / ".aituner" / "studies") + store.init_study(spec_path=study_path, study=study) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_parallel_size=4, + best_sampling_u=0.375, + best_request_rate=3.0, + best_request_rate_per_gpu=0.75, + next_trial_index=2, + best_by_parallel_size={ + "2": { + "trial_id": "trial-0000", + "parallel_size": 2, + "best_sampling_u": 0.125, + "best_request_rate": 0.8, + "best_request_rate_per_gpu": 0.4, + } + }, + trials=[], + ) + proposal = Proposal.from_dict( + { + "observation": "Obs", + "diagnosis": "Diag", + "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 2}}, + "expected_effects": ["raise rate"], + } + ) + trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) + self.assertEqual(trial.search.low, 0.125) + + def test_materialize_trial_scales_search_floor_for_different_parallel_group(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets(tmp_path) + study = load_study_spec(study_path) + store = StudyStore(tmp_path / ".aituner" / "studies") + store.init_study(spec_path=study_path, study=study) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_parallel_size=4, + best_sampling_u=0.4, + best_request_rate=3.0, + best_request_rate_per_gpu=0.75, + next_trial_index=2, + trials=[], + ) + proposal = Proposal.from_dict( + { + "observation": "Obs", + "diagnosis": "Diag", + "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 2}}, + "expected_effects": ["raise rate"], + } + ) + trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) + self.assertEqual(trial.search.low, 0.2) + def test_ingest_trial_results_records_failure_reason(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) @@ -1137,6 +1210,66 @@ class CoreFlowTests(unittest.TestCase): next_state = store.ingest_trial_results(study.study_id) self.assertEqual(next_state.trials[0].failure_stage, "engine_launch") + def test_ingest_trial_results_prefers_higher_request_rate_per_gpu(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets(tmp_path) + study = load_study_spec(study_path) + store = StudyStore(tmp_path / ".aituner" / "studies") + store.init_study(spec_path=study_path, study=study) + state = store.load_state(study.study_id) + proposal_a = Proposal.from_dict( + { + "observation": "Obs", + "diagnosis": "Diag", + "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}}, + "expected_effects": ["raise rate"], + } + ) + trial_a, state = store.materialize_trial(study=study, state=state, proposal=proposal_a) + Path(trial_a.result_path).write_text( + json.dumps( + { + "study_id": study.study_id, + "trial_id": trial_a.trial_id, + "status": "completed", + "best_sampling_u": 0.5, + "best_request_rate": 4.0, + "best_pass_rate": 0.97, + } + ), + encoding="utf-8", + ) + proposal_b = Proposal.from_dict( + { + "observation": "Obs", + "diagnosis": "Diag", + "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 2}}, + "expected_effects": ["raise rate"], + } + ) + trial_b, _ = store.materialize_trial(study=study, state=state, proposal=proposal_b) + Path(trial_b.result_path).write_text( + json.dumps( + { + "study_id": study.study_id, + "trial_id": trial_b.trial_id, + "status": "completed", + "best_sampling_u": 0.4, + "best_request_rate": 3.0, + "best_pass_rate": 0.97, + } + ), + encoding="utf-8", + ) + next_state = store.ingest_trial_results(study.study_id) + self.assertEqual(next_state.best_trial_id, trial_b.trial_id) + self.assertEqual(next_state.best_parallel_size, 2) + self.assertEqual(next_state.best_request_rate, 3.0) + self.assertEqual(next_state.best_request_rate_per_gpu, 1.5) + self.assertEqual(next_state.best_by_parallel_size["4"]["best_request_rate_per_gpu"], 1.0) + self.assertEqual(next_state.best_by_parallel_size["2"]["best_request_rate_per_gpu"], 1.5) + def test_validate_proposal_rejects_invalid_tp_dp_product(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp)