From 9422d43737b47492dbed6217d0edfc392040cc74 Mon Sep 17 00:00:00 2001
From: Gahow Wang
Date: Fri, 10 Apr 2026 10:25:41 +0800
Subject: [PATCH] Prioritize topology exploration in decode tuning

---
 ..._qwen235b_decode_thinking_run2_tpot40.json |  5 +-
 src/aituner/llm.py                            | 73 +++++++++++++++++++
 tests/test_core_flow.py                       | 43 +++++++++++
 3 files changed, 117 insertions(+), 4 deletions(-)

diff --git a/configs/examples/dash0_qwen235b_decode_thinking_run2_tpot40.json b/configs/examples/dash0_qwen235b_decode_thinking_run2_tpot40.json
index 7b92e04..d06faec 100644
--- a/configs/examples/dash0_qwen235b_decode_thinking_run2_tpot40.json
+++ b/configs/examples/dash0_qwen235b_decode_thinking_run2_tpot40.json
@@ -135,10 +135,7 @@
     "disable-log-requests": true
   },
   "tunable_envs": [
-    "VLLM_ENABLE_TORCH_COMPILE",
-    "VLLM_ENABLE_TBO_OPT",
-    "VLLM_USE_FLASHINFER_SAMPLER",
-    "CUDA_DEVICE_MAX_CONNECTIONS"
+    "VLLM_ENABLE_TORCH_COMPILE"
   ],
   "tunable_flags": [
     "tensor-parallel-size",
diff --git a/src/aituner/llm.py b/src/aituner/llm.py
index 840217a..028bad1 100644
--- a/src/aituner/llm.py
+++ b/src/aituner/llm.py
@@ -98,6 +98,70 @@ def _launch_failure_history(state: StudyState) -> list[dict[str, Any]]:
     return failures


+def _parallel_space_tunable(study: StudySpec) -> bool:
+    tunable_flags = set(study.engine.tunable_flags)
+    return {
+        "tensor-parallel-size",
+        "data-parallel-size",
+        "expert-parallel-size",
+    }.issubset(tunable_flags)
+
+
+def _enumerate_parallel_candidates(study: StudySpec) -> list[dict[str, int | bool]]:
+    constraints = study.engine.topology_constraints
+    if constraints is None or not _parallel_space_tunable(study):
+        return []
+    base_flags = _effective_engine_flags(study)
+    base_enable_ep = _parse_bool_like(
+        base_flags.get("enable-expert-parallel", False),
+        context="enable-expert-parallel",
+    )
+    current = _effective_topology(study)
+    tp_values = constraints.allowed_tensor_parallel_sizes or [current["tensor_parallel_size"]]
+    dp_values = constraints.allowed_data_parallel_sizes or [current["data_parallel_size"]]
+    ep_values = constraints.allowed_expert_parallel_sizes or [current["effective_expert_parallel_size"]]
+    candidates: list[dict[str, int | bool]] = []
+    for tp in sorted(set(tp_values)):
+        for dp in sorted(set(dp_values)):
+            tp_dp_product = tp * dp
+            if tp_dp_product > study.hardware.gpu_count:
+                continue
+            if (
+                constraints.require_tp_dp_product_equals_gpu_count
+                and tp_dp_product != study.hardware.gpu_count
+            ):
+                continue
+            for ep in sorted(set(ep_values)):
+                if constraints.require_ep_size_leq_tp_dp_product and ep > tp_dp_product:
+                    continue
+                if (
+                    constraints.require_ep_size_divides_tp_dp_product
+                    and tp_dp_product % ep != 0
+                ):
+                    continue
+                enable_ep = base_enable_ep or ep > 1
+                if (
+                    constraints.require_enable_expert_parallel_when_ep_gt_one
+                    and ep > 1
+                    and not enable_ep
+                ):
+                    continue
+                candidate = {
+                    "tensor_parallel_size": tp,
+                    "data_parallel_size": dp,
+                    "expert_parallel_size": ep,
+                    "enable_expert_parallel": enable_ep,
+                }
+                if (
+                    candidate["tensor_parallel_size"] == current["tensor_parallel_size"]
+                    and candidate["data_parallel_size"] == current["data_parallel_size"]
+                    and candidate["expert_parallel_size"] == current["effective_expert_parallel_size"]
+                ):
+                    continue
+                candidates.append(candidate)
+    return candidates
+
+
 def build_prompt(
     *,
     study: StudySpec,
@@ -130,6 +194,7 @@ def build_prompt(
         }
     )
     launch_failures = _launch_failure_history(state)
+    parallel_candidates = _enumerate_parallel_candidates(study)
     sections = [
         "You are tuning an OpenAI-compatible serving engine.",
         "Return exactly one JSON object with keys: observation, diagnosis, config_patch, expected_effects, why_not_previous_failures.",
@@ -139,6 +204,11 @@
         "Do not wrap the JSON in markdown fences or any extra text.",
         "Do not repeat a config that previously failed to launch unless the new patch explicitly removes the failing knob.",
         "Treat previous engine launch failures as hard negative evidence. If you touch TP/DP/EP, keep the proposal inside the topology constraints exactly.",
+        (
+            "TP/DP/EP are part of the tunable space for this study. Prioritize exploring legal topology changes in parallel space before runtime-only knobs unless recent history already proves a topology variant is worse or fails to launch."
+            if parallel_candidates
+            else "If TP/DP/EP are not tunable, focus on the remaining launch-safe runtime knobs."
+        ),
         "",
         "Study stack:",
         json.dumps(
@@ -213,6 +283,9 @@
         "Known launch failures:",
         json.dumps(launch_failures, ensure_ascii=False, indent=2),
         "",
+        "Parallel space candidates:",
+        json.dumps(parallel_candidates, ensure_ascii=False, indent=2),
+        "",
         "The proposal must beat the current incumbent. Do not propose a config that is only likely to be feasible below the current best_sampling_u/request_rate.",
         "The evaluator for a new trial will start searching from the current best feasible sampling_u and only look for improvements above it.",
         "The proposal should improve the maximum feasible sampling_u under the 95%+ SLO target.",
diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py
index d6fe7fe..3eb0ac4 100644
--- a/tests/test_core_flow.py
+++ b/tests/test_core_flow.py
@@ -461,6 +461,49 @@ class CoreFlowTests(unittest.TestCase):
         self.assertIn('"failure_stage": "engine_launch"', prompt)
         self.assertIn('"implicated_flag_keys"', prompt)

+    def test_prompt_prioritizes_parallel_space_when_tp_dp_ep_are_tunable(self) -> None:
+        with tempfile.TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            study_path = _write_study_assets(
+                tmp_path,
+                engine_overrides={
+                    "base_flags": {
+                        "host": "127.0.0.1",
+                        "port": 8000,
+                        "enable-expert-parallel": True,
+                        "tensor-parallel-size": 4,
+                        "data-parallel-size": 2,
+                        "expert-parallel-size": 8,
+                    },
+                    "tunable_envs": [],
+                    "tunable_flags": [
+                        "tensor-parallel-size",
+                        "data-parallel-size",
+                        "expert-parallel-size",
+                        "max-num-seqs",
+                    ],
+                    "topology_constraints": {
+                        "require_tp_dp_product_equals_gpu_count": True,
+                        "require_ep_size_leq_tp_dp_product": True,
+                        "require_ep_size_divides_tp_dp_product": True,
+                        "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
+                        "allowed_data_parallel_sizes": [1, 2, 4, 8],
+                        "allowed_expert_parallel_sizes": [1, 2, 4, 8],
+                    },
+                },
+            )
+            study = load_study_spec(study_path)
+            window, requests = load_trace_requests(study, study_spec_path=study_path)
+            prompt = build_prompt(
+                study=study,
+                window_summary=summarize_window(requests, window),
+                state=StudyState(study_id=study.study_id),
+                capability_profile=None,
+            )
+            self.assertIn("Prioritize exploring legal topology changes in parallel space", prompt)
+            self.assertIn("Parallel space candidates:", prompt)
+            self.assertIn('"tensor_parallel_size": 2', prompt)
+
     def test_parse_proposal_text_repairs_truncated_json(self) -> None:
         with tempfile.TemporaryDirectory() as tmp:
             tmp_path = Path(tmp)
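
Note: for reference, the candidate list that gets serialized into the prompt
as "Parallel space candidates:" can be reproduced outside the tuner. The
sketch below mirrors the enumeration rule from _enumerate_parallel_candidates
for the topology in the new test; GPU_COUNT = 8 is an assumption, since the
_write_study_assets fixture is not part of this diff.

    # Standalone sketch, not part of the patch. Assumes hardware.gpu_count == 8
    # and the constraints/base flags from the new test case.
    import itertools
    import json

    GPU_COUNT = 8           # assumed; the fixture value is not in this diff
    ALLOWED = [1, 2, 4, 8]  # allowed_* sizes from the test constraints
    CURRENT = (4, 2, 8)     # (tp, dp, ep) from the test's base_flags

    candidates = []
    for tp, dp, ep in itertools.product(ALLOWED, ALLOWED, ALLOWED):
        tp_dp = tp * dp
        if tp_dp != GPU_COUNT:       # require_tp_dp_product_equals_gpu_count
            continue
        if ep > tp_dp:               # require_ep_size_leq_tp_dp_product
            continue
        if tp_dp % ep != 0:          # require_ep_size_divides_tp_dp_product
            continue
        if (tp, dp, ep) == CURRENT:  # the incumbent topology is excluded
            continue
        candidates.append({
            "tensor_parallel_size": tp,
            "data_parallel_size": dp,
            "expert_parallel_size": ep,
            # enable-expert-parallel is True in the test's base flags, and
            # _enumerate_parallel_candidates forces it on whenever ep > 1
            "enable_expert_parallel": True,
        })

    print(json.dumps(candidates, indent=2))

Assuming eight GPUs, the legal (tp, dp) pairs are (1, 8), (2, 4), (4, 2),
and (8, 1); with four EP values each and the incumbent (4, 2, 8) removed,
15 candidates reach the prompt, including the "tensor_parallel_size": 2
entries the new test asserts on.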