From 0622e23817f267ea3caf0acf1bf63a43a55a4332 Mon Sep 17 00:00:00 2001
From: Gahow Wang
Date: Wed, 6 May 2026 02:46:07 +0800
Subject: [PATCH] Guide harness runtime refinement after TP

---
 src/aituner/harness.py  | 126 +++++++++++++++++++++++++++++++++++++---
 tests/test_core_flow.py |  88 ++++++++++++++++++++++++++++
 2 files changed, 205 insertions(+), 9 deletions(-)

diff --git a/src/aituner/harness.py b/src/aituner/harness.py
index da20248..fa8626d 100644
--- a/src/aituner/harness.py
+++ b/src/aituner/harness.py
@@ -42,7 +42,12 @@ def build_harness_context(
         "recent_trial_diagnostics": recent_diagnostics,
         "convergence_guard": _convergence_guard(state, recent_diagnostics),
         "harness_stop": _harness_stop_decision(study, state, recent_diagnostics),
-        "harness_proposal": _harness_proposal_decision(study, state, recent_diagnostics),
+        "harness_proposal": _harness_proposal_decision(
+            study,
+            window_summary,
+            state,
+            recent_diagnostics,
+        ),
         "knob_harnesses": _knob_harnesses(study, window_summary, recent_diagnostics),
         "proposal_rules": _proposal_rules(),
     }
@@ -547,6 +552,7 @@ def _harness_stop_decision(
 
 def _harness_proposal_decision(
     study: StudySpec,
+    window_summary: dict[str, Any],
     state: StudyState,
     recent_diagnostics: list[dict[str, Any]],
 ) -> dict[str, Any]:
@@ -557,9 +563,22 @@
         "config_patch": {"env_patch": {}, "flag_patch": {}},
         "expected_effects": [],
     }
+    tested_signatures = {
+        _config_signature(item.get("config_patch") if isinstance(item, dict) else None)
+        for item in recent_diagnostics
+    }
+    baseline = recent_diagnostics[0] if recent_diagnostics else {}
+    runtime_refinement = _runtime_refinement_proposal(
+        study,
+        window_summary,
+        state,
+        recent_diagnostics,
+        tested_signatures=tested_signatures,
+    )
+    if runtime_refinement["should_propose"]:
+        return runtime_refinement
     if len(state.trials) != 1 or len(recent_diagnostics) != 1:
         return default
-    baseline = recent_diagnostics[0]
     if baseline.get("status") != "completed":
         return default
     if not isinstance(baseline.get("best_request_rate_per_gpu"), (int, float)):
@@ -576,11 +595,6 @@
             **default,
             "reason": "tensor_parallel_size_not_tunable",
         }
-    failed_signatures = {
-        _config_signature(item.get("config_patch") if isinstance(item, dict) else None)
-        for item in recent_diagnostics
-        if item.get("failure_stage") == "engine_launch"
-    }
     base_flags = dict(study.engine.base_flags)
     baseline_patch = baseline.get("config_patch")
     if isinstance(baseline_patch, dict) and isinstance(baseline_patch.get("flag_patch"), dict):
@@ -595,10 +609,10 @@
     }
     flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp}
     signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch})
-    if signature in failed_signatures:
+    if signature in tested_signatures:
         return {
             **default,
-            "reason": "adjacent_tensor_parallel_probe_previously_failed",
+            "reason": "adjacent_tensor_parallel_probe_already_tested",
         }
     return {
         "should_propose": True,
@@ -618,6 +632,100 @@
     }
 
 
+def _runtime_refinement_proposal(
+    study: StudySpec,
+    window_summary: dict[str, Any],
+    state: StudyState,
+    recent_diagnostics: list[dict[str, Any]],
+    *,
+    tested_signatures: set[str],
+) -> dict[str, Any]:
+    default = {
+        "should_propose": False,
+        "reason": "runtime_refinement_not_applicable",
+        "diagnosis": "Runtime refinement does not apply yet.",
+        "config_patch": {"env_patch": {}, "flag_patch": {}},
+        "expected_effects": [],
+    }
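+    # Applies only when the newest diagnostic is the completed incumbent
+    # (state.best_trial_id) and that incumbent runs with TP > 1.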
+    if not state.best_trial_id or not recent_diagnostics:
+        return default
+    best = next(
+        (item for item in recent_diagnostics if item.get("trial_id") == state.best_trial_id),
+        None,
+    )
+    if not best or best.get("status") != "completed":
+        return default
+    if recent_diagnostics[-1].get("trial_id") != state.best_trial_id:
+        return default
+    best_patch = best.get("config_patch")
+    if not isinstance(best_patch, dict):
+        return default
+    best_flags = best_patch.get("flag_patch")
+    if not isinstance(best_flags, dict):
+        best_flags = {}
+    best_tp = _parse_int_like(best_flags.get("tensor-parallel-size"), default=1)
+    if best_tp <= 1:
+        return default
+    tunable = set(study.engine.tunable_flags)
+    flag_patch: dict[str, Any] = {"tensor-parallel-size": best_tp}
+    if "gpu-memory-utilization" in tunable:
+        flag_patch["gpu-memory-utilization"] = 0.95
+    if "enable-chunked-prefill" in tunable:
+        flag_patch["enable-chunked-prefill"] = True
+    if "max-num-batched-tokens" not in tunable:
+        return default
+
+    current_mbt = _parse_int_like(best_flags.get("max-num-batched-tokens"), default=0)
+    if current_mbt <= 0:
+        target_mbt = _initial_mbt_from_window(window_summary)
+        reason = "same_topology_runtime_seed_after_tp_incumbent"
+    else:
+        target_mbt = _next_mbt_step(current_mbt)
+        reason = "same_topology_mbt_growth_after_feasible_runtime_incumbent"
+    if target_mbt is None:
+        return {
+            **default,
+            "reason": "no_larger_mbt_step_available",
+        }
+    flag_patch["max-num-batched-tokens"] = target_mbt
+    signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch})
+    if signature in tested_signatures:
+        return {
+            **default,
+            "reason": "same_topology_runtime_probe_already_tested",
+        }
+    return {
+        "should_propose": True,
+        "reason": reason,
+        "diagnosis": (
+            "A TP incumbent improved request_rate_per_gpu; refine batching on the "
+            "same topology before trying DP/EP or broad runtime changes."
+        ),
+        "config_patch": {"env_patch": {}, "flag_patch": flag_patch},
+        "expected_effects": [
+            "preserve the incumbent topology",
+            "increase batching headroom while staying inside one runtime family",
+        ],
+        "incumbent_trial_id": best.get("trial_id"),
+    }
+
+
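+# Heuristic max-num-batched-tokens schedule: seed at max(8192, 2 * p99 prompt
+# tokens), grow by 1.5x per step, round each value up to a multiple of 1024,
+# and cap at 32768.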
+def _initial_mbt_from_window(window_summary: dict[str, Any]) -> int:
+    prompt_p99 = _as_float(window_summary.get("prompt_tokens_p99"))
+    target = max(8192, int(prompt_p99 * 2.0))
+    return min(32768, _round_up_to_multiple(target, 1024))
+
+
+def _next_mbt_step(current_mbt: int) -> int | None:
+    if current_mbt >= 32768:
+        return None
+    return min(32768, _round_up_to_multiple(int(current_mbt * 1.5), 1024))
+
+
+def _round_up_to_multiple(value: int, multiple: int) -> int:
+    return ((max(value, 1) + multiple - 1) // multiple) * multiple
+
+
 def _next_allowed_tp(study: StudySpec, *, current_tp: int, current_dp: int) -> int | None:
     constraints = study.engine.topology_constraints
     if constraints is not None and constraints.allowed_tensor_parallel_sizes:
diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py
index 49463d3..ce96403 100644
--- a/tests/test_core_flow.py
+++ b/tests/test_core_flow.py
@@ -669,6 +669,94 @@ class CoreFlowTests(unittest.TestCase):
         self.assertEqual(proposal.config_patch.flag_patch, {"tensor-parallel-size": 2})
         self.assertFalse(proposal.should_stop)
 
+    def test_harness_guided_runtime_seed_preserves_tp_incumbent(self) -> None:
+        with tempfile.TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            study_path = _write_study_assets(
+                tmp_path,
+                engine_overrides={
+                    "tunable_flags": [
+                        "tensor-parallel-size",
+                        "gpu-memory-utilization",
+                        "enable-chunked-prefill",
+                        "max-num-batched-tokens",
+                    ],
+                    "topology_constraints": {
+                        "allowed_tensor_parallel_sizes": [1, 2, 4],
+                        "allowed_tp_dp_products": [1, 2, 4],
+                    },
+                },
+            )
+            study = load_study_spec(study_path)
+            result_path = tmp_path / "trial-0002.json"
+            result_path.write_text(
+                json.dumps(
+                    {
+                        "status": "completed",
+                        "best_sampling_u": 0.75,
+                        "best_request_rate": 6.0,
+                        "best_pass_rate": 1.0,
+                        "probes": [
+                            {
+                                "threshold": 0.75,
+                                "feasible": True,
+                                "payload": {
+                                    "request_count": 100,
+                                    "pass_rate": 1.0,
+                                    "request_rate": 6.0,
+                                    "early_stopped": False,
+                                    "early_stop_reason": "",
+                                    "latency_summary": {"failed_reason_counts": {}},
+                                },
+                            }
+                        ],
+                    }
+                ),
+                encoding="utf-8",
+            )
+            state = StudyState(
+                study_id=study.study_id,
+                best_trial_id="trial-0002",
+                best_request_rate=6.0,
+                best_request_rate_per_gpu=3.0,
+                trials=[
+                    TrialSummary(
+                        trial_id="trial-0001",
+                        status="completed",
+                        best_request_rate=2.0,
+                        best_request_rate_per_gpu=2.0,
+                        config_patch={"env_patch": {}, "flag_patch": {}},
+                    ),
+                    TrialSummary(
+                        trial_id="trial-0002",
+                        status="completed",
+                        best_request_rate=6.0,
+                        best_request_rate_per_gpu=3.0,
+                        result_path=str(result_path),
+                        config_patch={
+                            "env_patch": {},
+                            "flag_patch": {"tensor-parallel-size": 2},
+                        },
+                    ),
+                ],
+            )
+            context = build_harness_context(
+                study=study,
+                window_summary={"prompt_tokens_p99": 8100},
+                state=state,
+            )
+            proposal = build_harness_guided_proposal(context)
+            self.assertIsNotNone(proposal)
+            self.assertEqual(
+                proposal.config_patch.flag_patch,
+                {
+                    "tensor-parallel-size": 2,
+                    "gpu-memory-utilization": 0.95,
+                    "enable-chunked-prefill": True,
+                    "max-num-batched-tokens": 16384,
+                },
+            )
+
     def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None:
         with tempfile.TemporaryDirectory() as tmp:
             tmp_path = Path(tmp)