Stop after strong incumbent harness gains

This commit is contained in:
2026-04-26 01:29:05 +08:00
parent a53445868e
commit 29d0548e06
2 changed files with 110 additions and 1 deletion

View File

@@ -135,6 +135,7 @@ def _knob_harnesses(
"guards": [
"Keep MBT changes within a conservative trust region.",
"Do not raise MBT after OOM or launch failures involving memory-related knobs.",
"Do not raise MBT when the incumbent MBT already covers prompt p99 unless same-topology history proves prefill fragmentation is the bottleneck.",
],
"active_now": active_bottleneck == "ttft_prefill",
}
@@ -308,6 +309,7 @@ def _convergence_guard(
recent_diagnostics: list[dict[str, Any]],
) -> dict[str, Any]:
infeasible_progress = _infeasible_progress_guard(recent_diagnostics)
strong_incumbent = _strong_incumbent_guard(state, recent_diagnostics)
completed = [
item
for item in recent_diagnostics
@@ -329,12 +331,21 @@ def _convergence_guard(
reason = "need_more_evidence_before_stop"
if not should_stop and infeasible_progress["plateau_detected"]:
reason = str(infeasible_progress["reason"])
if (
not should_stop
and not infeasible_progress["plateau_detected"]
and strong_incumbent["guard_active"]
):
reason = str(strong_incumbent["reason"])
return {
"should_stop_if_no_harness_can_justify_a_new_adjacent_probe": (
should_stop or bool(infeasible_progress["stop_if_next_probe_repeats_family"])
should_stop
or bool(infeasible_progress["stop_if_next_probe_repeats_family"])
or bool(strong_incumbent["guard_active"])
),
"reason": reason,
"infeasible_progress": infeasible_progress,
"strong_incumbent": strong_incumbent,
"incumbent": {
"trial_id": state.best_trial_id,
"parallel_size": state.best_parallel_size,
@@ -345,6 +356,51 @@ def _convergence_guard(
}
def _strong_incumbent_guard(
state: StudyState,
recent_diagnostics: list[dict[str, Any]],
) -> dict[str, Any]:
default = {
"guard_active": False,
"reason": "no_strong_incumbent_yet",
"baseline_trial_id": None,
"baseline_request_rate_per_gpu": None,
"incumbent_gain_vs_baseline": None,
}
if state.best_trial_id is None or state.best_request_rate_per_gpu is None:
return default
completed = [
item
for item in recent_diagnostics
if item.get("status") == "completed"
and isinstance(item.get("best_request_rate_per_gpu"), (int, float))
]
if len(completed) < 2:
return default
baseline = completed[0]
baseline_rate = float(baseline["best_request_rate_per_gpu"])
incumbent_rate = float(state.best_request_rate_per_gpu)
if baseline_rate <= 0:
return default
gain = incumbent_rate / baseline_rate
latest = recent_diagnostics[-1] if recent_diagnostics else {}
if state.best_trial_id == latest.get("trial_id") and gain >= 3.0:
return {
"guard_active": True,
"reason": "incumbent_exceeds_baseline_by_3x_and_latest_trial_is_best",
"baseline_trial_id": baseline.get("trial_id"),
"baseline_request_rate_per_gpu": baseline_rate,
"incumbent_gain_vs_baseline": gain,
}
return {
**default,
"baseline_trial_id": baseline.get("trial_id"),
"baseline_request_rate_per_gpu": baseline_rate,
"incumbent_gain_vs_baseline": gain,
"reason": "need_more_evidence_before_strong_incumbent_stop",
}
def _infeasible_progress_guard(recent_diagnostics: list[dict[str, Any]]) -> dict[str, Any]:
points = [
point
@@ -474,6 +530,7 @@ def _proposal_rules() -> list[str]:
"First decide the active bottleneck from recent_trial_diagnostics.",
"Pick at most one primary knob family from knob_harnesses unless the history proves a coupled change is needed.",
"Use adjacent legal values around the incumbent; avoid broad exploratory jumps.",
"When strong_incumbent.guard_active is true, do not propose runtime-only tweaks unless the relevant harness guard is positively satisfied by same-topology evidence.",
"If infeasible_progress blocks the last primary knob family, do not continue that family; switch families with direct bottleneck evidence or set should_stop=true.",
"If a proposed config is likely to reduce request_rate_per_gpu under the active guard, set should_stop=true instead of exploring.",
"Never repeat an already tested config signature.",

View File

@@ -365,6 +365,58 @@ class CoreFlowTests(unittest.TestCase):
]
)
def test_harness_strong_incumbent_guard_after_large_gain(self) -> None:
    """Strong-incumbent guard fires when the latest completed trial is the
    study best and beats the first trial's per-GPU rate by at least 3x,
    and the overall convergence guard then recommends stopping."""
    with tempfile.TemporaryDirectory() as scratch:
        study = load_study_spec(_write_study_assets(Path(scratch)))
        baseline_trial = TrialSummary(
            trial_id="trial-0001",
            status="completed",
            parallel_size=1,
            best_request_rate=0.035,
            best_request_rate_per_gpu=0.035,
            config_patch={"env_patch": {}, "flag_patch": {}},
        )
        # 0.21 / 0.035 = 6x gain, well past the 3x guard threshold.
        incumbent_trial = TrialSummary(
            trial_id="trial-0002",
            status="completed",
            parallel_size=2,
            best_request_rate=0.42,
            best_request_rate_per_gpu=0.21,
            config_patch={
                "env_patch": {},
                "flag_patch": {
                    "tensor-parallel-size": 2,
                    "data-parallel-size": 1,
                },
            },
        )
        state = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0002",
            best_request_rate_per_gpu=0.21,
            trials=[baseline_trial, incumbent_trial],
        )
        context = build_harness_context(
            study=study,
            window_summary={
                "prompt_tokens_p95": 7628,
                "prompt_tokens_p99": 8102,
                "prompt_tail_ratio_p95_p50": 3.83,
            },
            state=state,
        )
        guard = context["convergence_guard"]["strong_incumbent"]
        self.assertTrue(guard["guard_active"])
        self.assertGreaterEqual(guard["incumbent_gain_vs_baseline"], 3.0)
        should_stop = context["convergence_guard"][
            "should_stop_if_no_harness_can_justify_a_new_adjacent_probe"
        ]
        self.assertTrue(should_stop)
def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)