Improve harness incumbent follow-up search

This commit is contained in:
2026-06-20 05:37:15 +08:00
parent b3156a382a
commit 5257fbc1a2
3 changed files with 316 additions and 23 deletions

View File

@@ -998,6 +998,76 @@ class CoreFlowTests(unittest.TestCase):
self.assertIsNotNone(proposal)
self.assertTrue(proposal.should_stop)
def test_harness_stop_after_non_improving_feasible_validation_is_exhausted(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(tmp_path)
study = load_study_spec(study_path)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0002",
best_parallel_size=8,
best_sampling_u=0.02,
best_request_rate=2.4,
best_request_rate_per_gpu=0.3,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
parallel_size=8,
best_request_rate=0.8,
best_request_rate_per_gpu=0.1,
config_patch={"env_patch": {}, "flag_patch": {}},
),
TrialSummary(
trial_id="trial-0002",
status="completed",
parallel_size=8,
best_request_rate=2.4,
best_request_rate_per_gpu=0.3,
config_patch={
"env_patch": {},
"flag_patch": {
"tensor-parallel-size": 2,
"data-parallel-size": 4,
},
},
),
TrialSummary(
trial_id="trial-0003",
status="completed",
parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
config_patch={
"env_patch": {},
"flag_patch": {
"tensor-parallel-size": 1,
"data-parallel-size": 8,
},
},
),
TrialSummary(
trial_id="trial-0004",
status="completed",
parallel_size=8,
best_request_rate=2.1,
best_request_rate_per_gpu=0.2625,
config_patch={
"env_patch": {},
"flag_patch": {"max-num-seqs": 160},
},
),
],
)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 2048},
state=state,
)
self.assertTrue(context["harness_stop"]["should_stop"])
self.assertEqual(context["harness_stop"]["reason"], "post_incumbent_validation_exhausted")
def test_harness_does_not_stop_immediately_after_strong_incumbent(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
@@ -1318,6 +1388,100 @@ class CoreFlowTests(unittest.TestCase):
},
)
def test_harness_runtime_refinement_preserves_incumbent_runtime_knobs(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"tunable_flags": [
"tensor-parallel-size",
"gpu-memory-utilization",
"max-num-seqs",
"enable-chunked-prefill",
"max-num-batched-tokens",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [1, 2, 4],
"allowed_tp_dp_products": [1, 2, 4],
},
},
)
study = load_study_spec(study_path)
result_path = tmp_path / "trial-0002.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.098,
"best_request_rate": 3.3,
"best_pass_rate": 0.97,
"probes": [
{
"threshold": 0.098,
"feasible": True,
"payload": {
"request_count": 100,
"pass_rate": 0.97,
"request_rate": 3.3,
"early_stopped": False,
"early_stop_reason": "",
"latency_summary": {"failed_reason_counts": {}},
},
}
],
}
),
encoding="utf-8",
)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0002",
best_request_rate=3.3,
best_request_rate_per_gpu=0.825,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
best_request_rate=2.5,
best_request_rate_per_gpu=0.625,
config_patch={"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
),
TrialSummary(
trial_id="trial-0002",
status="completed",
best_request_rate=3.3,
best_request_rate_per_gpu=0.825,
result_path=str(result_path),
config_patch={
"env_patch": {},
"flag_patch": {
"tensor-parallel-size": 4,
"gpu-memory-utilization": 0.92,
"max-num-seqs": 48,
},
},
),
],
)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p99": 8100},
state=state,
)
proposal = build_harness_guided_proposal(context)
self.assertIsNotNone(proposal)
self.assertEqual(
proposal.config_patch.flag_patch,
{
"tensor-parallel-size": 4,
"gpu-memory-utilization": 0.92,
"max-num-seqs": 48,
"enable-chunked-prefill": True,
"max-num-batched-tokens": 16384,
},
)
def test_harness_raises_gpu_mem_util_on_settled_decode_bound_incumbent(self) -> None:
"""Regression for the coverage gap that let the naive baseline beat the harness:
a settled TP incumbent that is decode_tpot-bound must get a gpu-memory-utilization
@@ -3511,6 +3675,94 @@ class CoreFlowTests(unittest.TestCase):
[0.25, 0.375],
)
def test_run_trial_skips_fallback_below_incumbent_floor(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(tmp_path)
payload = json.loads(study_path.read_text(encoding="utf-8"))
payload["search"]["max_probes"] = 2
payload["search"]["inherit_incumbent_floor"] = True
study_path.write_text(json.dumps(payload), encoding="utf-8")
study = load_study_spec(study_path)
store = StudyStore(tmp_path / ".aituner" / "studies")
store.init_study(spec_path=study_path, study=study)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_parallel_size=1,
best_sampling_u=0.5,
best_request_rate=2.0,
best_request_rate_per_gpu=2.0,
next_trial_index=2,
best_by_parallel_size={
"1": {
"trial_id": "trial-0001",
"parallel_size": 1,
"best_sampling_u": 0.5,
"best_request_rate": 2.0,
"best_request_rate_per_gpu": 2.0,
}
},
trials=[],
)
proposal = Proposal.from_dict(
{
"observation": "runtime patch",
"diagnosis": "primary range all infeasible",
"config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 2}},
"expected_effects": ["measure"],
}
)
trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)
self.assertEqual(trial.search.low, 0.5)
self.assertTrue(trial.search.inherit_incumbent_floor)
def fake_replay(requests, **kwargs):
return (
[
RequestOutcome(
request_id=request.row_id,
success=True,
ttft_ms=10000.0,
tpot_ms=1000.0,
prompt_tokens=request.prompt_tokens_hint,
completion_tokens=request.completion_tokens_hint,
)
for request in requests
],
False,
"",
)
process = mock.Mock()
process.poll.return_value = 0
with mock.patch("aituner.worker.subprocess.Popen", return_value=process):
with mock.patch("aituner.worker._wait_for_server_or_exit", return_value=None):
with mock.patch("aituner.worker._terminate_process_tree", return_value=None):
with mock.patch("aituner.worker._replay_requests", side_effect=fake_replay):
result = run_trial(Path(trial.artifact_dir) / "trial_spec.json")
self.assertEqual(result["status"], "completed")
self.assertIsNone(result["best_request_rate"])
self.assertEqual(result["best_source"], "primary_search")
self.assertEqual(result["primary_search"]["low"], 0.5)
self.assertIsNone(result["primary_search"]["best_request_rate"])
self.assertEqual(
[probe["threshold"] for probe in result["primary_search"]["probes"]],
[0.75, 0.625],
)
self.assertEqual(result["lower_range_fallback"]["triggered"], False)
self.assertEqual(result["lower_range_fallback"]["skipped"], True)
self.assertEqual(result["lower_range_fallback"]["probes"], [])
self.assertEqual(
result["lower_range_fallback"]["reason"],
"primary_search_above_incumbent_floor_all_infeasible",
)
self.assertEqual(
result["all_infeasible_diagnostics"]["threshold"],
0.625,
)
def test_materialize_trial_does_not_mutate_input_state_trials(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)