Prioritize topology exploration in decode tuning

2026-04-10 10:25:41 +08:00
parent d582a8ed1b
commit 9422d43737
3 changed files with 117 additions and 4 deletions

View File

@@ -135,10 +135,7 @@
       "disable-log-requests": true
     },
     "tunable_envs": [
-      "VLLM_ENABLE_TORCH_COMPILE",
-      "VLLM_ENABLE_TBO_OPT",
-      "VLLM_USE_FLASHINFER_SAMPLER",
-      "CUDA_DEVICE_MAX_CONNECTIONS"
+      "VLLM_ENABLE_TORCH_COMPILE"
     ],
     "tunable_flags": [
       "tensor-parallel-size",

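Note: the config change above narrows the tunable environment variables to VLLM_ENABLE_TORCH_COMPILE alone, while the three parallel-size flags stay in tunable_flags. That combination is what unlocks the new behavior, since topology exploration is only prioritized when all three flags remain tunable. A minimal sketch of that gating rule (the helper name here is illustrative; the real check is _parallel_space_tunable in the next file):

# Minimal sketch; mirrors the _parallel_space_tunable gate defined below.
PARALLEL_FLAGS = {"tensor-parallel-size", "data-parallel-size", "expert-parallel-size"}

def parallel_space_tunable(tunable_flags: list[str]) -> bool:
    # All three parallel-size flags must be tunable, or topology exploration is skipped.
    return PARALLEL_FLAGS.issubset(tunable_flags)

assert parallel_space_tunable(
    ["tensor-parallel-size", "data-parallel-size", "expert-parallel-size", "max-num-seqs"]
)
assert not parallel_space_tunable(["max-num-seqs"])  # falls back to runtime-only knobs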
View File

@@ -98,6 +98,70 @@ def _launch_failure_history(state: StudyState) -> list[dict[str, Any]]:
     return failures
+
+
+def _parallel_space_tunable(study: StudySpec) -> bool:
+    tunable_flags = set(study.engine.tunable_flags)
+    return {
+        "tensor-parallel-size",
+        "data-parallel-size",
+        "expert-parallel-size",
+    }.issubset(tunable_flags)
+
+
+def _enumerate_parallel_candidates(study: StudySpec) -> list[dict[str, int | bool]]:
+    constraints = study.engine.topology_constraints
+    if constraints is None or not _parallel_space_tunable(study):
+        return []
+    base_flags = _effective_engine_flags(study)
+    base_enable_ep = _parse_bool_like(
+        base_flags.get("enable-expert-parallel", False),
+        context="enable-expert-parallel",
+    )
+    current = _effective_topology(study)
+    tp_values = constraints.allowed_tensor_parallel_sizes or [current["tensor_parallel_size"]]
+    dp_values = constraints.allowed_data_parallel_sizes or [current["data_parallel_size"]]
+    ep_values = constraints.allowed_expert_parallel_sizes or [current["effective_expert_parallel_size"]]
+    candidates: list[dict[str, int | bool]] = []
+    for tp in sorted(set(tp_values)):
+        for dp in sorted(set(dp_values)):
+            tp_dp_product = tp * dp
+            if tp_dp_product > study.hardware.gpu_count:
+                continue
+            if (
+                constraints.require_tp_dp_product_equals_gpu_count
+                and tp_dp_product != study.hardware.gpu_count
+            ):
+                continue
+            for ep in sorted(set(ep_values)):
+                if constraints.require_ep_size_leq_tp_dp_product and ep > tp_dp_product:
+                    continue
+                if (
+                    constraints.require_ep_size_divides_tp_dp_product
+                    and tp_dp_product % ep != 0
+                ):
+                    continue
+                enable_ep = base_enable_ep or ep > 1
+                if (
+                    constraints.require_enable_expert_parallel_when_ep_gt_one
+                    and ep > 1
+                    and not enable_ep
+                ):
+                    continue
+                candidate = {
+                    "tensor_parallel_size": tp,
+                    "data_parallel_size": dp,
+                    "expert_parallel_size": ep,
+                    "enable_expert_parallel": enable_ep,
+                }
+                if (
+                    candidate["tensor_parallel_size"] == current["tensor_parallel_size"]
+                    and candidate["data_parallel_size"] == current["data_parallel_size"]
+                    and candidate["expert_parallel_size"] == current["effective_expert_parallel_size"]
+                ):
+                    continue
+                candidates.append(candidate)
+    return candidates
 
 
 def build_prompt(
     *,
     study: StudySpec,
@@ -130,6 +194,7 @@ def build_prompt(
         }
     )
     launch_failures = _launch_failure_history(state)
+    parallel_candidates = _enumerate_parallel_candidates(study)
     sections = [
         "You are tuning an OpenAI-compatible serving engine.",
         "Return exactly one JSON object with keys: observation, diagnosis, config_patch, expected_effects, why_not_previous_failures.",
@@ -139,6 +204,11 @@
         "Do not wrap the JSON in markdown fences or any extra text.",
         "Do not repeat a config that previously failed to launch unless the new patch explicitly removes the failing knob.",
         "Treat previous engine launch failures as hard negative evidence. If you touch TP/DP/EP, keep the proposal inside the topology constraints exactly.",
+        (
+            "TP/DP/EP are part of the tunable space for this study. Prioritize exploring legal topology changes in parallel space before runtime-only knobs unless recent history already proves a topology variant is worse or fails to launch."
+            if parallel_candidates
+            else "If TP/DP/EP are not tunable, focus on the remaining launch-safe runtime knobs."
+        ),
         "",
         "Study stack:",
         json.dumps(
@@ -213,6 +283,9 @@ def build_prompt(
         "Known launch failures:",
         json.dumps(launch_failures, ensure_ascii=False, indent=2),
         "",
+        "Parallel space candidates:",
+        json.dumps(parallel_candidates, ensure_ascii=False, indent=2),
+        "",
         "The proposal must beat the current incumbent. Do not propose a config that is only likely to be feasible below the current best_sampling_u/request_rate.",
         "The evaluator for a new trial will start searching from the current best feasible sampling_u and only look for improvements above it.",
         "The proposal should improve the maximum feasible sampling_u under the 95%+ SLO target.",

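To see what _enumerate_parallel_candidates actually feeds into the prompt, here is a standalone sketch of the same pruning loop under the constraints used in the test below. A gpu_count of 8 is an assumption of the sketch, and the enable_expert_parallel handling and the skip-the-incumbent check are elided:

# Standalone sketch of the pruning loop above, assuming gpu_count = 8 and the
# allowed sizes from the test below; enable_expert_parallel handling and the
# "skip the current topology" check are left out for brevity.
GPU_COUNT = 8
ALLOWED = [1, 2, 4, 8]

candidates = []
for tp in ALLOWED:
    for dp in ALLOWED:
        if tp * dp != GPU_COUNT:  # require_tp_dp_product_equals_gpu_count
            continue
        for ep in ALLOWED:
            if ep > tp * dp:  # require_ep_size_leq_tp_dp_product
                continue
            if (tp * dp) % ep != 0:  # require_ep_size_divides_tp_dp_product
                continue
            candidates.append({"tp": tp, "dp": dp, "ep": ep})

# (tp, dp) is limited to (1, 8), (2, 4), (4, 2), (8, 1), and every ep in
# 1/2/4/8 passes both EP checks, so 16 candidates survive. The real function
# also drops the incumbent (tp=4, dp=2, ep=8), which is why the test can
# assert that a '"tensor_parallel_size": 2' variant shows up in the prompt.
print(candidates)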
View File

@@ -461,6 +461,49 @@ class CoreFlowTests(unittest.TestCase):
         self.assertIn('"failure_stage": "engine_launch"', prompt)
         self.assertIn('"implicated_flag_keys"', prompt)
+
+    def test_prompt_prioritizes_parallel_space_when_tp_dp_ep_are_tunable(self) -> None:
+        with tempfile.TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            study_path = _write_study_assets(
+                tmp_path,
+                engine_overrides={
+                    "base_flags": {
+                        "host": "127.0.0.1",
+                        "port": 8000,
+                        "enable-expert-parallel": True,
+                        "tensor-parallel-size": 4,
+                        "data-parallel-size": 2,
+                        "expert-parallel-size": 8,
+                    },
+                    "tunable_envs": [],
+                    "tunable_flags": [
+                        "tensor-parallel-size",
+                        "data-parallel-size",
+                        "expert-parallel-size",
+                        "max-num-seqs",
+                    ],
+                    "topology_constraints": {
+                        "require_tp_dp_product_equals_gpu_count": True,
+                        "require_ep_size_leq_tp_dp_product": True,
+                        "require_ep_size_divides_tp_dp_product": True,
+                        "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
+                        "allowed_data_parallel_sizes": [1, 2, 4, 8],
+                        "allowed_expert_parallel_sizes": [1, 2, 4, 8],
+                    },
+                },
+            )
+            study = load_study_spec(study_path)
+            window, requests = load_trace_requests(study, study_spec_path=study_path)
+            prompt = build_prompt(
+                study=study,
+                window_summary=summarize_window(requests, window),
+                state=StudyState(study_id=study.study_id),
+                capability_profile=None,
+            )
+            self.assertIn("Prioritize exploring legal topology changes in parallel space", prompt)
+            self.assertIn("Parallel space candidates:", prompt)
+            self.assertIn('"tensor_parallel_size": 2', prompt)
 
     def test_parse_proposal_text_repairs_truncated_json(self) -> None:
         with tempfile.TemporaryDirectory() as tmp:
             tmp_path = Path(tmp)
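
For reference, the topology_constraints block the test exercises maps one-to-one onto the checks in _enumerate_parallel_candidates. An annotated sketch (field names and values are taken verbatim from the diff; the surrounding study-spec structure is assumed):

# Annotated copy of the constraint block from the test; each comment names the
# corresponding check in _enumerate_parallel_candidates.
topology_constraints = {
    # tp * dp must equal hardware.gpu_count exactly, not merely fit within it.
    "require_tp_dp_product_equals_gpu_count": True,
    # ep may not exceed the tp * dp world size...
    "require_ep_size_leq_tp_dp_product": True,
    # ...and must divide it evenly.
    "require_ep_size_divides_tp_dp_product": True,
    # The enumerator only draws tp/dp/ep values from these allow-lists; an
    # empty list falls back to the study's current topology.
    "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
    "allowed_data_parallel_sizes": [1, 2, 4, 8],
    "allowed_expert_parallel_sizes": [1, 2, 4, 8],
}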