From 2d03b1cd4cb3e5743b22c2669d325da12c2a53a2 Mon Sep 17 00:00:00 2001
From: Gahow Wang
Date: Tue, 12 May 2026 21:00:49 +0800
Subject: [PATCH] Add SLO-driven topology frontier harness guard

---
 src/aituner/harness.py  | 157 ++++++++++++++++++++++++++++++++++
 tests/test_core_flow.py | 183 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 340 insertions(+)

diff --git a/src/aituner/harness.py b/src/aituner/harness.py
index ee274c7..321c43b 100644
--- a/src/aituner/harness.py
+++ b/src/aituner/harness.py
@@ -521,6 +521,13 @@ def _harness_stop_decision(
             "reason": high_saturation["reason"],
             "evidence": high_saturation,
         }
+    topology_frontier = _topology_frontier_status(study, state, recent_diagnostics)
+    if topology_frontier["frontier_open"]:
+        return {
+            "should_stop": False,
+            "reason": "topology_frontier_requires_probe",
+            "evidence": topology_frontier,
+        }
     guard = _convergence_guard(state, recent_diagnostics)
     if guard["deterministic_stop"]:
         return {
@@ -567,7 +574,17 @@ def _harness_proposal_decision(
         _config_signature(item.get("config_patch") if isinstance(item, dict) else None)
         for item in recent_diagnostics
     }
+    tested_signatures.update(_state_tested_signatures(state))
     baseline = recent_diagnostics[0] if recent_diagnostics else {}
+    topology_frontier = _topology_frontier_proposal(
+        study,
+        window_summary,
+        state,
+        recent_diagnostics,
+        tested_signatures=tested_signatures,
+    )
+    if topology_frontier["should_propose"]:
+        return topology_frontier
     runtime_refinement = _runtime_refinement_proposal(
         study,
         window_summary,
@@ -632,6 +649,146 @@ def _harness_proposal_decision(
     }
 
 
+def _topology_frontier_proposal(
+    study: StudySpec,
+    window_summary: dict[str, Any],
+    state: StudyState,
+    recent_diagnostics: list[dict[str, Any]],
+    *,
+    tested_signatures: set[str],
+) -> dict[str, Any]:
+    default = {
+        "should_propose": False,
+        "reason": "topology_frontier_not_applicable",
+        "diagnosis": "No untested topology frontier is justified by the current SLO/workload profile.",
+        "config_patch": {"env_patch": {}, "flag_patch": {}},
+        "expected_effects": [],
+    }
+    frontier = _topology_frontier_status(study, state, recent_diagnostics, window_summary)
+    if not frontier["frontier_open"]:
+        return {**default, "reason": frontier["reason"]}
+    flag_patch = frontier.get("flag_patch")
+    if not isinstance(flag_patch, dict):
+        return {**default, "reason": "topology_frontier_patch_missing"}
+    signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch})
+    if signature in tested_signatures:
+        return {**default, "reason": "topology_frontier_already_tested"}
+    return {
+        "should_propose": True,
+        "reason": "topology_frontier_probe_for_slo_pressure",
+        "diagnosis": (
+            "The current SLO/workload profile still shows latency pressure and a higher "
+            "legal TP frontier has not been measured. Validate that topology before "
+            "declaring the incumbent converged or spending more trials on local runtime knobs."
+        ),
+        "config_patch": {"env_patch": {}, "flag_patch": flag_patch},
+        "expected_effects": [
+            "test whether extra model parallelism improves the configured TTFT/TPOT pass rate",
+            "cover the next legal topology frontier before local runtime refinement",
+            "avoid overfitting the harness to the first strong incumbent",
+        ],
+        "frontier": frontier,
+    }
+
+
+def _topology_frontier_status(
+    study: StudySpec,
+    state: StudyState,
+    recent_diagnostics: list[dict[str, Any]],
+    window_summary: dict[str, Any] | None = None,
+) -> dict[str, Any]:
+    default = {
+        "frontier_open": False,
+        "reason": "topology_frontier_not_needed",
+        "current_trial_id": state.best_trial_id,
+        "active_bottleneck": None,
+        "current_tp": None,
+        "current_dp": None,
+        "next_tp": None,
+        "flag_patch": None,
+    }
+    if "tensor-parallel-size" not in set(study.engine.tunable_flags):
+        return {**default, "reason": "tensor_parallel_size_not_tunable"}
+    if not state.best_trial_id:
+        return {**default, "reason": "no_incumbent_for_topology_frontier"}
+
+    best = next(
+        (item for item in recent_diagnostics if item.get("trial_id") == state.best_trial_id),
+        None,
+    )
+    if best is None:
+        return {**default, "reason": "incumbent_not_in_recent_harness_history"}
+    if best.get("status") != "completed":
+        return {**default, "reason": "incumbent_not_completed"}
+    if not isinstance(best.get("best_request_rate_per_gpu"), (int, float)):
+        return {**default, "reason": "incumbent_has_no_feasible_rate"}
+
+    active_bottleneck = str(best.get("active_bottleneck") or "")
+    if active_bottleneck in {"unknown", "none_obvious", ""} and recent_diagnostics:
+        active_bottleneck = str(recent_diagnostics[-1].get("active_bottleneck") or "")
+    if active_bottleneck not in {"ttft_prefill", "decode_tpot"}:
+        return {
+            **default,
+            "reason": "active_bottleneck_does_not_require_tp_frontier",
+            "active_bottleneck": active_bottleneck,
+        }
+
+    flags = _effective_flags_for_item(study, best)
+    current_tp = _parse_int_like(flags.get("tensor-parallel-size"), default=1)
+    current_dp = _parse_int_like(flags.get("data-parallel-size"), default=1)
+    next_tp = _next_allowed_tp(study, current_tp=current_tp, current_dp=current_dp)
+    if next_tp is None:
+        return {
+            **default,
+            "reason": "no_legal_higher_tp_frontier",
+            "active_bottleneck": active_bottleneck,
+            "current_tp": current_tp,
+            "current_dp": current_dp,
+        }
+
+    flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp}
+    base_dp = _parse_int_like(study.engine.base_flags.get("data-parallel-size"), default=1)
+    if current_dp != base_dp:
+        flag_patch["data-parallel-size"] = current_dp
+    signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch})
+    if signature in _state_tested_signatures(state):
+        return {
+            **default,
+            "reason": "higher_tp_frontier_already_tested",
+            "active_bottleneck": active_bottleneck,
+            "current_tp": current_tp,
+            "current_dp": current_dp,
+            "next_tp": next_tp,
+            "flag_patch": flag_patch,
+        }
+    return {
+        "frontier_open": True,
+        "reason": "higher_tp_frontier_unmeasured_under_slo_pressure",
+        "current_trial_id": state.best_trial_id,
+        "active_bottleneck": active_bottleneck,
+        "current_tp": current_tp,
+        "current_dp": current_dp,
+        "next_tp": next_tp,
+        "flag_patch": flag_patch,
+    }
+
+
+def _effective_flags_for_item(study: StudySpec, item: dict[str, Any]) -> dict[str, Any]:
+    flags = dict(study.engine.base_flags)
+    patch = item.get("config_patch")
+    if isinstance(patch, dict) and isinstance(patch.get("flag_patch"), dict):
+        flags.update(patch["flag_patch"])
+    return flags
+
+
+def _state_tested_signatures(state: StudyState) -> set[str]:
+    return {
+        _config_signature(trial.config_patch)
+        for trial in state.trials
+        if isinstance(trial.config_patch, dict)
+    }
+
+
 def _runtime_refinement_proposal(
     study: StudySpec,
     window_summary: dict[str, Any],
diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py
index dbdfe54..3ebac99 100644
--- a/tests/test_core_flow.py
+++ b/tests/test_core_flow.py
@@ -826,6 +826,189 @@ class CoreFlowTests(unittest.TestCase):
             },
         )
 
+    def test_harness_validates_unmeasured_tp_frontier_before_runtime_refinement(self) -> None:
+        with tempfile.TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            study_path = _write_study_assets(
+                tmp_path,
+                engine_overrides={
+                    "tunable_flags": [
+                        "tensor-parallel-size",
+                        "max-num-batched-tokens",
+                        "enable-chunked-prefill",
+                    ],
+                    "topology_constraints": {
+                        "allowed_tensor_parallel_sizes": [1, 2, 4],
+                        "allowed_tp_dp_products": [1, 2, 4],
+                    },
+                },
+            )
+            study = load_study_spec(study_path)
+            result_path = tmp_path / "trial-0002.json"
+            result_path.write_text(
+                json.dumps(
+                    {
+                        "status": "completed",
+                        "best_sampling_u": 0.5,
+                        "best_request_rate": 2.0,
+                        "best_pass_rate": 0.96,
+                        "probes": [
+                            {
+                                "threshold": 0.5,
+                                "feasible": True,
+                                "payload": {
+                                    "request_count": 100,
+                                    "pass_rate": 0.96,
+                                    "request_rate": 2.0,
+                                    "latency_summary": {"failed_reason_counts": {}},
+                                },
+                            },
+                            {
+                                "threshold": 0.75,
+                                "feasible": False,
+                                "payload": {
+                                    "request_count": 100,
+                                    "pass_rate": 0.6,
+                                    "request_rate": 3.0,
+                                    "early_stop_reason": "slo_pass_rate_unrecoverable",
+                                    "latency_summary": {
+                                        "failed_reason_counts": {"tpot_ms>25.0": 40}
+                                    },
+                                },
+                            },
+                        ],
+                    }
+                ),
+                encoding="utf-8",
+            )
+            state = StudyState(
+                study_id=study.study_id,
+                best_trial_id="trial-0002",
+                best_request_rate=2.0,
+                best_request_rate_per_gpu=1.0,
+                trials=[
+                    TrialSummary(
+                        trial_id="trial-0001",
+                        status="completed",
+                        best_request_rate=0.5,
+                        best_request_rate_per_gpu=0.5,
+                        config_patch={"env_patch": {}, "flag_patch": {}},
+                    ),
+                    TrialSummary(
+                        trial_id="trial-0002",
+                        status="completed",
+                        best_request_rate=2.0,
+                        best_request_rate_per_gpu=1.0,
+                        result_path=str(result_path),
+                        config_patch={
+                            "env_patch": {},
+                            "flag_patch": {"tensor-parallel-size": 2},
+                        },
+                    ),
+                ],
+            )
+            context = build_harness_context(
+                study=study,
+                window_summary={"prompt_tokens_p95": 7628, "prompt_tail_ratio_p95_p50": 3.8},
+                state=state,
+            )
+            proposal = build_harness_guided_proposal(context)
+            self.assertIsNotNone(proposal)
+            self.assertEqual(proposal.config_patch.flag_patch, {"tensor-parallel-size": 4})
+            self.assertEqual(
+                context["harness_proposal"]["reason"],
+                "topology_frontier_probe_for_slo_pressure",
+            )
+
+    def test_harness_stop_blocked_until_slo_driven_topology_frontier_is_measured(self) -> None:
+        with tempfile.TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            study_path = _write_study_assets(
+                tmp_path,
+                engine_overrides={
+                    "tunable_flags": ["tensor-parallel-size", "max-num-seqs"],
+                    "topology_constraints": {
+                        "allowed_tensor_parallel_sizes": [1, 2, 4],
+                        "allowed_tp_dp_products": [1, 2, 4],
+                    },
+                },
+            )
+            study = load_study_spec(study_path)
+            result_path = tmp_path / "trial-0002.json"
+            result_path.write_text(
+                json.dumps(
+                    {
+                        "status": "completed",
+                        "best_sampling_u": 0.5,
+                        "best_request_rate": 2.0,
+                        "best_pass_rate": 0.96,
+                        "probes": [
+                            {
+                                "threshold": 0.75,
+                                "feasible": False,
+                                "payload": {
+                                    "request_count": 100,
+                                    "pass_rate": 0.6,
+                                    "request_rate": 3.0,
+                                    "early_stop_reason": "slo_pass_rate_unrecoverable",
+                                    "latency_summary": {
+                                        "failed_reason_counts": {"tpot_ms>25.0": 40}
+                                    },
+                                },
+                            }
+                        ],
+                    }
+                ),
+                encoding="utf-8",
+            )
+            state = StudyState(
+                study_id=study.study_id,
+                best_trial_id="trial-0002",
+                best_request_rate=2.0,
+                best_request_rate_per_gpu=1.0,
+                trials=[
+                    TrialSummary(
+                        trial_id="trial-0001",
+                        status="completed",
+                        best_request_rate=0.5,
+                        best_request_rate_per_gpu=0.5,
+                        config_patch={"env_patch": {}, "flag_patch": {}},
+                    ),
+                    TrialSummary(
+                        trial_id="trial-0002",
+                        status="completed",
+                        best_request_rate=2.0,
+                        best_request_rate_per_gpu=1.0,
+                        result_path=str(result_path),
+                        config_patch={
+                            "env_patch": {},
+                            "flag_patch": {"tensor-parallel-size": 2},
+                        },
+                    ),
+                    TrialSummary(
+                        trial_id="trial-0003",
+                        status="completed",
+                        best_request_rate=1.98,
+                        best_request_rate_per_gpu=0.99,
+                        config_patch={"env_patch": {}, "flag_patch": {"max-num-seqs": 8}},
+                    ),
+                    TrialSummary(
+                        trial_id="trial-0004",
+                        status="completed",
+                        best_request_rate=1.98,
+                        best_request_rate_per_gpu=0.99,
+                        config_patch={"env_patch": {}, "flag_patch": {"max-num-seqs": 16}},
+                    ),
+                ],
+            )
+            context = build_harness_context(
+                study=study,
+                window_summary={"prompt_tokens_p95": 7628, "prompt_tail_ratio_p95_p50": 3.8},
+                state=state,
+            )
+            self.assertFalse(context["harness_stop"]["should_stop"])
+            self.assertEqual(context["harness_stop"]["reason"], "topology_frontier_requires_probe")
+
     def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None:
         with tempfile.TemporaryDirectory() as tmp:
             tmp_path = Path(tmp)