From ef78fe7eb5c355f5e15145beeb97be0c4d806f03 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Thu, 9 Apr 2026 21:07:51 +0800 Subject: [PATCH] Add topology-aware tuning constraints --- ..._qwen235b_decode_thinking_run2_tpot40.json | 13 ++ src/aituner/llm.py | 197 ++++++++++++++++ src/aituner/spec.py | 75 ++++++ src/aituner/store.py | 1 + src/aituner/worker.py | 7 +- tests/test_core_flow.py | 215 +++++++++++++++++- 6 files changed, 506 insertions(+), 2 deletions(-) diff --git a/configs/examples/dash0_qwen235b_decode_thinking_run2_tpot40.json b/configs/examples/dash0_qwen235b_decode_thinking_run2_tpot40.json index 0b385c1..32619e2 100644 --- a/configs/examples/dash0_qwen235b_decode_thinking_run2_tpot40.json +++ b/configs/examples/dash0_qwen235b_decode_thinking_run2_tpot40.json @@ -141,11 +141,24 @@ "CUDA_DEVICE_MAX_CONNECTIONS" ], "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "expert-parallel-size", "gpu-memory-utilization", "max-num-batched-tokens", "max-num-seqs", "block-size" ], + "topology_constraints": { + "require_tp_dp_product_equals_gpu_count": true, + "require_ep_size_leq_tp_dp_product": true, + "require_ep_size_divides_tp_dp_product": true, + "require_enable_expert_parallel_when_ep_gt_one": true, + "validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter": true, + "allowed_tensor_parallel_sizes": [1, 2, 4, 8], + "allowed_data_parallel_sizes": [1, 2, 4, 8], + "allowed_expert_parallel_sizes": [1, 2, 4, 8] + }, "python_executable": "python3" }, "trace": { diff --git a/src/aituner/llm.py b/src/aituner/llm.py index 8911e25..840217a 100644 --- a/src/aituner/llm.py +++ b/src/aituner/llm.py @@ -8,6 +8,96 @@ from .http_client import chat_completion, stream_text_completion from .spec import LLMPolicySpec, Proposal, SpecError, StudySpec, StudyState +def _parse_bool_like(value: Any, *, context: str) -> bool: + if isinstance(value, bool): + return value + if isinstance(value, str): + normalized = value.strip().lower() + if normalized in {"1", "true", "yes", "on"}: + return True + if normalized in {"0", "false", "no", "off"}: + return False + raise SpecError(f"{context} must be a boolean-like value.") + + +def _parse_int_like(value: Any, *, context: str) -> int: + if isinstance(value, bool): + raise SpecError(f"{context} must be an integer.") + if isinstance(value, int): + return value + if isinstance(value, float) and value.is_integer(): + return int(value) + if isinstance(value, str): + stripped = value.strip() + if stripped: + try: + return int(stripped) + except ValueError as exc: + raise SpecError(f"{context} must be an integer.") from exc + raise SpecError(f"{context} must be an integer.") + + +def _effective_engine_envs(study: StudySpec, proposal: Proposal | None = None) -> dict[str, Any]: + envs = dict(study.engine.base_envs) + if proposal is not None: + envs.update(proposal.config_patch.env_patch) + return envs + + +def _effective_engine_flags(study: StudySpec, proposal: Proposal | None = None) -> dict[str, Any]: + flags = dict(study.engine.base_flags) + if proposal is not None: + flags.update(proposal.config_patch.flag_patch) + return flags + + +def _effective_topology(study: StudySpec, proposal: Proposal | None = None) -> dict[str, Any]: + envs = _effective_engine_envs(study, proposal) + flags = _effective_engine_flags(study, proposal) + tp = _parse_int_like(flags.get("tensor-parallel-size", 1), context="tensor-parallel-size") + dp = _parse_int_like(flags.get("data-parallel-size", 1), context="data-parallel-size") + raw_enable_ep = flags.get("enable-expert-parallel", False) + enable_ep = _parse_bool_like(raw_enable_ep, context="enable-expert-parallel") + raw_ep = flags.get("expert-parallel-size") + ep = _parse_int_like(raw_ep, context="expert-parallel-size") if raw_ep is not None else None + effective_ep = ep if ep is not None else (tp * dp if enable_ep else 1) + return { + "tensor_parallel_size": tp, + "data_parallel_size": dp, + "tp_dp_product": tp * dp, + "enable_expert_parallel": enable_ep, + "expert_parallel_size": ep, + "effective_expert_parallel_size": effective_ep, + "vllm_tpep_reduce_scatter": _parse_bool_like( + envs.get("VLLM_TPEP_REDUCE_SCATTER", "0"), + context="VLLM_TPEP_REDUCE_SCATTER", + ), + } + + +def _launch_failure_history(state: StudyState) -> list[dict[str, Any]]: + failures: list[dict[str, Any]] = [] + for trial in state.trials: + if trial.failure_stage != "engine_launch" and "engine_process_exited_before_ready" not in ( + trial.failure_reason or "" + ): + continue + config_patch = trial.config_patch or {} + env_patch = config_patch.get("env_patch", {}) if isinstance(config_patch, dict) else {} + flag_patch = config_patch.get("flag_patch", {}) if isinstance(config_patch, dict) else {} + failures.append( + { + "trial_id": trial.trial_id, + "failure_reason": trial.failure_reason, + "failure_stage": trial.failure_stage or "engine_launch", + "implicated_env_keys": sorted(env_patch), + "implicated_flag_keys": sorted(flag_patch), + "config_patch": config_patch, + } + ) + return failures + + def build_prompt( *, study: StudySpec, @@ -36,8 +126,10 @@ def build_prompt( "diagnosis": trial.diagnosis, "config_patch": trial.config_patch, "failure_reason": trial.failure_reason, + "failure_stage": trial.failure_stage, } ) + launch_failures = _launch_failure_history(state) sections = [ "You are tuning an OpenAI-compatible serving engine.", "Return exactly one JSON object with keys: observation, diagnosis, config_patch, expected_effects, why_not_previous_failures.", @@ -46,6 +138,7 @@ def build_prompt( "Only use allowed tunable env keys and allowed tunable flag keys.", "Do not wrap the JSON in markdown fences or any extra text.", "Do not repeat a config that previously failed to launch unless the new patch explicitly removes the failing knob.", + "Treat previous engine launch failures as hard negative evidence. If you touch TP/DP/EP, keep the proposal inside the topology constraints exactly.", "", "Study stack:", json.dumps( @@ -83,6 +176,12 @@ def build_prompt( "base_envs": study.engine.base_envs, "allowed_flag_keys": study.engine.tunable_flags, "allowed_env_keys": study.engine.tunable_envs, + "topology_constraints": ( + study.engine.topology_constraints.__dict__ + if study.engine.topology_constraints is not None + else None + ), + "effective_topology": _effective_topology(study), }, }, ensure_ascii=False, @@ -111,6 +210,9 @@ def build_prompt( "Trial history:", json.dumps(history, ensure_ascii=False, indent=2), "", + "Known launch failures:", + json.dumps(launch_failures, ensure_ascii=False, indent=2), + "", "The proposal must beat the current incumbent. Do not propose a config that is only likely to be feasible below the current best_sampling_u/request_rate.", "The evaluator for a new trial will start searching from the current best feasible sampling_u and only look for improvements above it.", "The proposal should improve the maximum feasible sampling_u under the 95%+ SLO target.", @@ -136,6 +238,101 @@ def validate_proposal(proposal: Proposal, study: StudySpec) -> Proposal: raise SpecError(f"Proposal uses unsupported env keys: {', '.join(unknown_envs)}") if unknown_flags: raise SpecError(f"Proposal uses unsupported flag keys: {', '.join(unknown_flags)}") + topology = _effective_topology(study, proposal) + tp = topology["tensor_parallel_size"] + dp = topology["data_parallel_size"] + effective_ep = topology["effective_expert_parallel_size"] + if tp < 1 or dp < 1: + raise SpecError("Proposal violates topology constraints: TP/DP must be >= 1.") + constraints = study.engine.topology_constraints + if constraints is None: + return proposal + if ( + constraints.allowed_tensor_parallel_sizes + and tp not in constraints.allowed_tensor_parallel_sizes + ): + raise SpecError( + "Proposal violates topology constraints: " + f"tensor-parallel-size={tp} not in {constraints.allowed_tensor_parallel_sizes}." + ) + if constraints.allowed_data_parallel_sizes and dp not in constraints.allowed_data_parallel_sizes: + raise SpecError( + "Proposal violates topology constraints: " + f"data-parallel-size={dp} not in {constraints.allowed_data_parallel_sizes}." + ) + if ( + constraints.allowed_expert_parallel_sizes + and effective_ep not in constraints.allowed_expert_parallel_sizes + ): + raise SpecError( + "Proposal violates topology constraints: " + f"expert-parallel-size={effective_ep} not in {constraints.allowed_expert_parallel_sizes}." + ) + tp_dp_product = topology["tp_dp_product"] + if tp_dp_product > study.hardware.gpu_count: + raise SpecError( + "Proposal violates topology constraints: " + f"tensor-parallel-size * data-parallel-size = {tp_dp_product} exceeds " + f"hardware.gpu_count={study.hardware.gpu_count}." + ) + if ( + constraints.require_tp_dp_product_equals_gpu_count + and tp_dp_product != study.hardware.gpu_count + ): + raise SpecError( + "Proposal violates topology constraints: " + f"tensor-parallel-size * data-parallel-size must equal hardware.gpu_count=" + f"{study.hardware.gpu_count}, got {tp_dp_product}." + ) + if ( + constraints.require_enable_expert_parallel_when_ep_gt_one + and effective_ep > 1 + and not topology["enable_expert_parallel"] + ): + raise SpecError( + "Proposal violates topology constraints: expert-parallel-size > 1 requires " + "enable-expert-parallel=true." + ) + if ( + constraints.require_ep_size_leq_tp_dp_product + and effective_ep > tp_dp_product + ): + raise SpecError( + "Proposal violates topology constraints: " + f"expert-parallel-size={effective_ep} must be <= tensor-parallel-size * " + f"data-parallel-size={tp_dp_product}." + ) + if ( + constraints.require_ep_size_divides_tp_dp_product + and tp_dp_product % effective_ep != 0 + ): + raise SpecError( + "Proposal violates topology constraints: " + f"expert-parallel-size={effective_ep} must divide tensor-parallel-size * " + f"data-parallel-size={tp_dp_product}." + ) + if ( + constraints.validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter + and topology["vllm_tpep_reduce_scatter"] + and topology["enable_expert_parallel"] + and effective_ep > 1 + and tp > 1 + ): + flags = _effective_engine_flags(study, proposal) + graph_sizes = flags.get("cuda-graph-sizes") + if isinstance(graph_sizes, list): + invalid_sizes = [ + _parse_int_like(item, context="cuda-graph-sizes") + for item in graph_sizes + if _parse_int_like(item, context="cuda-graph-sizes") % tp != 0 + ] + if invalid_sizes: + raise SpecError( + "Proposal violates topology constraints: " + f"cuda-graph-sizes must be divisible by tensor-parallel-size={tp} " + f"when VLLM_TPEP_REDUCE_SCATTER=1 and expert parallel is enabled. " + f"First invalid sizes: {invalid_sizes[:8]}" + ) return proposal diff --git a/src/aituner/spec.py b/src/aituner/spec.py index 281531e..d099f01 100644 --- a/src/aituner/spec.py +++ b/src/aituner/spec.py @@ -63,6 +63,17 @@ def _coerce_str_list(value: Any, *, context: str) -> list[str]: return result +def _coerce_int_list(value: Any, *, context: str) -> list[int]: + if value is None: + return [] + if not isinstance(value, list): + raise SpecError(f"{context} must be a list.") + result: list[int] = [] + for item in value: + result.append(_require_int(item, context=context)) + return result + + def _resolve_codex_endpoint() -> tuple[str, str, str | None]: override = os.environ.get("AITUNER_CODEX_BASE_URL", "").strip() if override: @@ -158,6 +169,7 @@ class EngineLaunchSpec: base_flags: dict[str, Any] tunable_envs: list[str] tunable_flags: list[str] + topology_constraints: "TopologyConstraintSpec | None" = None python_executable: str = "python3" @property @@ -192,10 +204,72 @@ class EngineLaunchSpec: tunable_flags=_coerce_str_list( data.get("tunable_flags"), context="engine.tunable_flags" ), + topology_constraints=( + TopologyConstraintSpec.from_dict( + _require_mapping( + data.get("topology_constraints"), + context="engine.topology_constraints", + ) + ) + if data.get("topology_constraints") is not None + else None + ), python_executable=str(data.get("python_executable") or "python3").strip(), ) +@dataclass(frozen=True) +class TopologyConstraintSpec: + require_tp_dp_product_equals_gpu_count: bool = False + require_ep_size_leq_tp_dp_product: bool = False + require_ep_size_divides_tp_dp_product: bool = False + require_enable_expert_parallel_when_ep_gt_one: bool = True + validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter: bool = True + allowed_tensor_parallel_sizes: list[int] = field(default_factory=list) + allowed_data_parallel_sizes: list[int] = field(default_factory=list) + allowed_expert_parallel_sizes: list[int] = field(default_factory=list) + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "TopologyConstraintSpec": + return cls( + require_tp_dp_product_equals_gpu_count=_require_bool( + data.get("require_tp_dp_product_equals_gpu_count", False), + context="engine.topology_constraints.require_tp_dp_product_equals_gpu_count", + ), + require_ep_size_leq_tp_dp_product=_require_bool( + data.get("require_ep_size_leq_tp_dp_product", False), + context="engine.topology_constraints.require_ep_size_leq_tp_dp_product", + ), + require_ep_size_divides_tp_dp_product=_require_bool( + data.get("require_ep_size_divides_tp_dp_product", False), + context="engine.topology_constraints.require_ep_size_divides_tp_dp_product", + ), + require_enable_expert_parallel_when_ep_gt_one=_require_bool( + data.get("require_enable_expert_parallel_when_ep_gt_one", True), + context="engine.topology_constraints.require_enable_expert_parallel_when_ep_gt_one", + ), + validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter=_require_bool( + data.get( + "validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter", + True, + ), + context="engine.topology_constraints.validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter", + ), + allowed_tensor_parallel_sizes=_coerce_int_list( + data.get("allowed_tensor_parallel_sizes"), + context="engine.topology_constraints.allowed_tensor_parallel_sizes", + ), + allowed_data_parallel_sizes=_coerce_int_list( + data.get("allowed_data_parallel_sizes"), + context="engine.topology_constraints.allowed_data_parallel_sizes", + ), + allowed_expert_parallel_sizes=_coerce_int_list( + data.get("allowed_expert_parallel_sizes"), + context="engine.topology_constraints.allowed_expert_parallel_sizes", + ), + ) + + @dataclass(frozen=True) class InputLengthFilterSpec: min_input_tokens: int | None = None @@ -601,6 +675,7 @@ class TrialSummary: diagnosis: str = "" config_patch: dict[str, Any] | None = None failure_reason: str = "" + failure_stage: str = "" @dataclass diff --git a/src/aituner/store.py b/src/aituner/store.py index f2983e7..5fc580e 100644 --- a/src/aituner/store.py +++ b/src/aituner/store.py @@ -120,6 +120,7 @@ class StudyStore: summary.best_pass_rate = payload.get("best_pass_rate") summary.result_path = str(result_path) summary.failure_reason = str(payload.get("failure_reason") or "").strip() + summary.failure_stage = str(payload.get("failure_stage") or "").strip() if ( isinstance(summary.best_request_rate, (int, float)) and (best_rate is None or summary.best_request_rate > best_rate) diff --git a/src/aituner/worker.py b/src/aituner/worker.py index 2d8e5c5..49c5916 100644 --- a/src/aituner/worker.py +++ b/src/aituner/worker.py @@ -17,7 +17,7 @@ from .engine import build_launch_recipe from .http_client import HttpClientError, stream_chat_completion, wait_for_server from .search import ThresholdProbe, binary_search_max_feasible from .slo import RequestOutcome, evaluate_request, summarize_evaluations -from .spec import ConfigPatch, SamplingSearchSpec, TrialSpec, load_study_spec +from .spec import ConfigPatch, SamplingSearchSpec, TrialSpec, load_study_spec, to_jsonable from .trace import TraceRequest, load_trace_requests, select_requests_for_threshold @@ -321,6 +321,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]: start_new_session=True, ) probe_history: list[dict[str, Any]] = [] + failure_stage = "engine_launch" try: _wait_for_server_or_exit( process, @@ -328,6 +329,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]: healthcheck_path=recipe.healthcheck_path, ready_timeout_s=recipe.ready_timeout_s, ) + failure_stage = "probe_search" def evaluator(threshold: float) -> ThresholdProbe[ProbePayload]: selected = select_requests_for_threshold(requests, threshold=threshold) @@ -404,6 +406,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]: "study_id": trial.study_id, "trial_id": trial.trial_id, "status": "completed", + "config_patch": to_jsonable(trial.config_patch), "best_sampling_u": search.best_threshold if best is not None else None, "best_request_rate": best.request_rate if best is not None else None, "best_pass_rate": best.pass_rate if best is not None else None, @@ -442,10 +445,12 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]: "study_id": trial.study_id, "trial_id": trial.trial_id, "status": "failed", + "config_patch": to_jsonable(trial.config_patch), "best_sampling_u": None, "best_request_rate": None, "best_pass_rate": None, "best_request_count": None, + "failure_stage": failure_stage, "failure_reason": str(exc), "probes": probe_history, } diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index 0f25178..532d581 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -12,7 +12,7 @@ from aituner.cli import main as cli_main from aituner.engine import build_launch_recipe from aituner.http_client import _auth_headers, _openai_url, _should_bypass_proxy from aituner.job import append_job, build_trial_job -from aituner.llm import _extract_response_text, build_prompt, parse_proposal_text +from aituner.llm import _extract_response_text, build_prompt, parse_proposal_text, validate_proposal from aituner.search import ThresholdProbe, binary_search_max_feasible from aituner.slo import RequestOutcome, evaluate_request, summarize_evaluations from aituner.spec import ( @@ -40,6 +40,7 @@ def _write_study_assets( *, trace_overrides: dict[str, object] | None = None, slo_overrides: dict[str, object] | None = None, + engine_overrides: dict[str, object] | None = None, ) -> Path: trace_dir = tmp_path / "trace_windows" / "traces" trace_dir.mkdir(parents=True) @@ -155,6 +156,8 @@ def _write_study_assets( } if slo_overrides: study_payload["slo"].update(slo_overrides) + if engine_overrides: + study_payload["engine"].update(engine_overrides) study_path.write_text(json.dumps(study_payload), encoding="utf-8") return study_path @@ -404,6 +407,40 @@ class CoreFlowTests(unittest.TestCase): self.assertIn('"status": "failed"', prompt) self.assertIn('"failure_reason": "engine_process_exited_before_ready exit_code=1"', prompt) self.assertIn('"VLLM_ATTENTION_BACKEND": "FLASHINFER"', prompt) + self.assertIn("Known launch failures:", prompt) + + def test_prompt_includes_failure_stage_for_launch_failures(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets(tmp_path) + study = load_study_spec(study_path) + window, requests = load_trace_requests(study, study_spec_path=study_path) + prompt = build_prompt( + study=study, + window_summary=summarize_window(requests, window), + state=StudyState( + study_id=study.study_id, + trials=[ + TrialSummary( + trial_id="trial-0002", + status="failed", + diagnosis="bad topology", + config_patch={ + "env_patch": {}, + "flag_patch": { + "tensor-parallel-size": 3, + "data-parallel-size": 3, + }, + }, + failure_stage="engine_launch", + failure_reason="engine_process_exited_before_ready exit_code=1", + ) + ], + ), + capability_profile=None, + ) + self.assertIn('"failure_stage": "engine_launch"', prompt) + self.assertIn('"implicated_flag_keys"', prompt) def test_parse_proposal_text_repairs_truncated_json(self) -> None: with tempfile.TemporaryDirectory() as tmp: @@ -1003,6 +1040,182 @@ class CoreFlowTests(unittest.TestCase): next_state.trials[0].failure_reason, "engine_process_exited_before_ready exit_code=1", ) + self.assertEqual(next_state.trials[0].failure_stage, "") + + def test_ingest_trial_results_records_failure_stage(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets(tmp_path) + study = load_study_spec(study_path) + store = StudyStore(tmp_path / ".aituner" / "studies") + store.init_study(spec_path=study_path, study=study) + state = store.load_state(study.study_id) + proposal = Proposal.from_dict( + { + "observation": "Obs", + "diagnosis": "Diag", + "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}}, + "expected_effects": ["raise rate"] + } + ) + trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) + Path(trial.result_path).write_text( + json.dumps( + { + "study_id": study.study_id, + "trial_id": trial.trial_id, + "status": "failed", + "failure_stage": "engine_launch", + "failure_reason": "engine_process_exited_before_ready exit_code=1", + "probes": [] + } + ), + encoding="utf-8", + ) + next_state = store.ingest_trial_results(study.study_id) + self.assertEqual(next_state.trials[0].failure_stage, "engine_launch") + + def test_validate_proposal_rejects_invalid_tp_dp_product(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "enable-expert-parallel": True, + "tensor-parallel-size": 4, + "data-parallel-size": 2, + "expert-parallel-size": 8, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "expert-parallel-size", + ], + "topology_constraints": { + "require_tp_dp_product_equals_gpu_count": True, + "require_ep_size_leq_tp_dp_product": True, + "require_ep_size_divides_tp_dp_product": True, + "allowed_tensor_parallel_sizes": [1, 2, 4, 8], + "allowed_data_parallel_sizes": [1, 2, 4, 8], + "allowed_expert_parallel_sizes": [1, 2, 4, 8], + }, + }, + ) + study = load_study_spec(study_path) + proposal = Proposal.from_dict( + { + "observation": "Obs", + "diagnosis": "Bad topology", + "config_patch": { + "env_patch": {}, + "flag_patch": { + "tensor-parallel-size": 2, + "data-parallel-size": 2, + "expert-parallel-size": 4, + }, + }, + "expected_effects": ["raise throughput"], + } + ) + with self.assertRaisesRegex(SpecError, "must equal hardware.gpu_count"): + validate_proposal(proposal, study) + + def test_validate_proposal_rejects_invalid_ep_divisibility(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "enable-expert-parallel": True, + "tensor-parallel-size": 4, + "data-parallel-size": 2, + "expert-parallel-size": 8, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "expert-parallel-size", + ], + "topology_constraints": { + "require_tp_dp_product_equals_gpu_count": True, + "require_ep_size_leq_tp_dp_product": True, + "require_ep_size_divides_tp_dp_product": True, + "allowed_tensor_parallel_sizes": [1, 2, 4, 8], + "allowed_data_parallel_sizes": [1, 2, 4, 8], + "allowed_expert_parallel_sizes": [1, 2, 4, 8], + }, + }, + ) + study = load_study_spec(study_path) + proposal = Proposal.from_dict( + { + "observation": "Obs", + "diagnosis": "Bad EP", + "config_patch": { + "env_patch": {}, + "flag_patch": { + "expert-parallel-size": 3, + }, + }, + "expected_effects": ["raise throughput"], + } + ) + with self.assertRaisesRegex(SpecError, "expert-parallel-size=3"): + validate_proposal(proposal, study) + + def test_validate_proposal_accepts_valid_tp_dp_ep_combo(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "enable-expert-parallel": True, + "tensor-parallel-size": 4, + "data-parallel-size": 2, + "expert-parallel-size": 8, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "expert-parallel-size", + ], + "topology_constraints": { + "require_tp_dp_product_equals_gpu_count": True, + "require_ep_size_leq_tp_dp_product": True, + "require_ep_size_divides_tp_dp_product": True, + "allowed_tensor_parallel_sizes": [1, 2, 4, 8], + "allowed_data_parallel_sizes": [1, 2, 4, 8], + "allowed_expert_parallel_sizes": [1, 2, 4, 8], + }, + }, + ) + study = load_study_spec(study_path) + proposal = Proposal.from_dict( + { + "observation": "Obs", + "diagnosis": "Valid topology", + "config_patch": { + "env_patch": {}, + "flag_patch": { + "tensor-parallel-size": 2, + "data-parallel-size": 4, + "expert-parallel-size": 4, + }, + }, + "expected_effects": ["raise throughput"], + } + ) + validated = validate_proposal(proposal, study) + self.assertEqual(validated.config_patch.flag_patch["tensor-parallel-size"], 2) def test_cli_tune_runs_multiple_manual_proposals(self) -> None: with tempfile.TemporaryDirectory() as tmp: