def _strong_incumbent_guard(
    state: StudyState,
    recent_diagnostics: list[dict[str, Any]],
) -> dict[str, Any]:
    """Report whether the incumbent already dominates the baseline trial.

    The baseline is the first entry of ``recent_diagnostics`` whose
    ``status`` is ``"completed"`` and whose ``best_request_rate_per_gpu`` is
    a finite, non-boolean number.  The guard is active only when at least two
    such entries exist, the last diagnostic (whatever its status) belongs to
    the incumbent trial, and the incumbent's per-GPU request rate is at least
    3x the baseline's.

    Args:
        state: Study state; only ``best_trial_id`` and
            ``best_request_rate_per_gpu`` are read.
        recent_diagnostics: Per-trial diagnostic dicts.  The first usable
            completed entry is treated as the baseline and the last entry as
            the latest trial (presumably oldest-to-newest order; confirm
            against the caller).

    Returns:
        A dict with the keys ``guard_active``, ``reason``,
        ``baseline_trial_id``, ``baseline_request_rate_per_gpu`` and
        ``incumbent_gain_vs_baseline``.  The last three are ``None`` whenever
        no gain could be computed.
    """
    # Function-scope import keeps the block self-contained.
    import math

    # Minimum incumbent/baseline ratio; mirrored by the "3x" in the reason.
    gain_threshold = 3.0

    def _is_usable_rate(value: Any) -> bool:
        # bool is a subclass of int, and NaN/inf are floats; none of them is
        # a meaningful request rate, so they must not become the baseline.
        return (
            isinstance(value, (int, float))
            and not isinstance(value, bool)
            and math.isfinite(value)
        )

    default = {
        "guard_active": False,
        "reason": "no_strong_incumbent_yet",
        "baseline_trial_id": None,
        "baseline_request_rate_per_gpu": None,
        "incumbent_gain_vs_baseline": None,
    }
    if state.best_trial_id is None or state.best_request_rate_per_gpu is None:
        return default

    completed = [
        item
        for item in recent_diagnostics
        if item.get("status") == "completed"
        and _is_usable_rate(item.get("best_request_rate_per_gpu"))
    ]
    if len(completed) < 2:
        return default

    baseline = completed[0]
    baseline_rate = float(baseline["best_request_rate_per_gpu"])
    incumbent_rate = float(state.best_request_rate_per_gpu)
    # A non-positive baseline or a non-finite incumbent gives no usable ratio.
    if baseline_rate <= 0 or not math.isfinite(incumbent_rate):
        return default

    gain = incumbent_rate / baseline_rate
    evidence = {
        "baseline_trial_id": baseline.get("trial_id"),
        "baseline_request_rate_per_gpu": baseline_rate,
        "incumbent_gain_vs_baseline": gain,
    }

    # recent_diagnostics is non-empty here: it holds >= 2 completed entries.
    latest = recent_diagnostics[-1]
    if state.best_trial_id == latest.get("trial_id") and gain >= gain_threshold:
        return {
            "guard_active": True,
            "reason": "incumbent_exceeds_baseline_by_3x_and_latest_trial_is_best",
            **evidence,
        }
    return {
        **default,
        **evidence,
        "reason": "need_more_evidence_before_strong_incumbent_stop",
    }
def test_harness_strong_incumbent_guard_after_large_gain(self) -> None:
    """The strong-incumbent guard fires after a large per-GPU gain.

    Two completed trials are recorded: a baseline at 0.035 req/s/GPU and an
    incumbent (also the latest trial) at 0.21 req/s/GPU.  The harness context
    must then report an active ``strong_incumbent`` guard with a gain of at
    least 3.0 and set the convergence stop flag.
    """
    with tempfile.TemporaryDirectory() as workdir:
        spec = load_study_spec(_write_study_assets(Path(workdir)))

        baseline_trial = TrialSummary(
            trial_id="trial-0001",
            status="completed",
            parallel_size=1,
            best_request_rate=0.035,
            best_request_rate_per_gpu=0.035,
            config_patch={"env_patch": {}, "flag_patch": {}},
        )
        incumbent_flags = {
            "tensor-parallel-size": 2,
            "data-parallel-size": 1,
        }
        incumbent_trial = TrialSummary(
            trial_id="trial-0002",
            status="completed",
            parallel_size=2,
            best_request_rate=0.42,
            best_request_rate_per_gpu=0.21,
            config_patch={"env_patch": {}, "flag_patch": incumbent_flags},
        )
        study_state = StudyState(
            study_id=spec.study_id,
            best_trial_id="trial-0002",
            best_request_rate_per_gpu=0.21,
            trials=[baseline_trial, incumbent_trial],
        )

        prompt_window = {
            "prompt_tokens_p95": 7628,
            "prompt_tokens_p99": 8102,
            "prompt_tail_ratio_p95_p50": 3.83,
        }
        harness_context = build_harness_context(
            study=spec,
            window_summary=prompt_window,
            state=study_state,
        )

        convergence = harness_context["convergence_guard"]
        strong = convergence["strong_incumbent"]
        self.assertTrue(strong["guard_active"])
        self.assertGreaterEqual(strong["incumbent_gain_vs_baseline"], 3.0)
        # The stop flag must follow the active guard.
        stop_key = "should_stop_if_no_harness_can_justify_a_new_adjacent_probe"
        self.assertTrue(convergence[stop_key])