Guard generic topology search from introducing EP

This commit is contained in:
2026-06-24 15:21:22 +08:00
parent c245774d76
commit 8fa758797e
2 changed files with 108 additions and 0 deletions

View File

@@ -1979,6 +1979,109 @@ class CoreFlowTests(unittest.TestCase):
self.assertIn("ttft_prefill", context["bottleneck_hypotheses"][0]["name"])
self.assertFalse(context["harness_stop"]["should_stop"])
def test_profile_driven_topology_does_not_introduce_ep_for_ttft(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"base_flags": {"host": "127.0.0.1", "port": 8000},
"tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"expert-parallel-size",
"enable-expert-parallel",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [1, 2, 4],
"allowed_data_parallel_sizes": [1],
"allowed_expert_parallel_sizes": [1, 2],
"allowed_tp_dp_products": [1, 2, 4],
"require_ep_size_leq_tp_dp_product": True,
"require_ep_size_divides_tp_dp_product": True,
"require_enable_expert_parallel_when_ep_gt_one": True,
},
},
)
result_paths: list[Path] = []
for idx in range(1, 4):
result_path = tmp_path / f"trial-000{idx}.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.25,
"best_request_rate": 2.0,
"best_pass_rate": 1.0,
"probes": [
{
"threshold": 0.5,
"feasible": False,
"payload": {
"request_count": 100,
"pass_rate": 0.6,
"request_rate": 4.0,
"early_stop_reason": "slo_pass_rate_unrecoverable",
"latency_summary": {
"failed_reason_counts": {"ttft_ms>2000": 40}
},
},
}
],
}
),
encoding="utf-8",
)
result_paths.append(result_path)
study = load_study_spec(study_path)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 8192},
state=StudyState(
study_id=study.study_id,
best_trial_id="trial-0002",
best_request_rate=4.0,
best_request_rate_per_gpu=2.0,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
best_request_rate=2.0,
best_request_rate_per_gpu=2.0,
result_path=str(result_paths[0]),
config_patch={"env_patch": {}, "flag_patch": {}},
),
TrialSummary(
trial_id="trial-0002",
status="completed",
best_request_rate=4.0,
best_request_rate_per_gpu=2.0,
result_path=str(result_paths[1]),
config_patch={
"env_patch": {},
"flag_patch": {"tensor-parallel-size": 2},
},
),
TrialSummary(
trial_id="trial-0003",
status="completed",
best_request_rate=4.0,
best_request_rate_per_gpu=1.0,
result_path=str(result_paths[2]),
config_patch={
"env_patch": {},
"flag_patch": {"tensor-parallel-size": 4},
},
),
],
),
)
candidate_actions = context["experiment_plan"]["candidate_actions"]
for action in candidate_actions:
patch = action["config_patch"]["flag_patch"]
self.assertNotIn("enable-expert-parallel", patch)
self.assertNotIn("expert-parallel-size", patch)
def test_profile_driven_planner_prefers_decode_concurrency_relief(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)