# Patch summary (descriptive text before the first "diff --git" header).
#
# src/aituner/harness.py
#   * The gpu-memory-utilization runtime action is now also considered when
#     top_bottleneck is "ttft_prefill" (previously only "decode_tpot" and
#     "admission_or_queueing").
#   * The inline "anchor GMU + one step" computation is replaced by the new
#     helper _next_gpu_memory_utilization_target(study, anchor_flags,
#     recent_diagnostics). The helper:
#       - returns None when the anchor GMU is <= 0 or >= _GMU_SAFE_CEILING;
#       - scans recent_diagnostics for items whose flag_patch sets
#         "gpu-memory-utilization" and whose normalized topology flags equal
#         the anchor's, sorting their GMU values by status into "completed"
#         and "failed";
#       - climbs one _GMU_STEP from the highest completed GMU (the anchor's
#         own value is always included), capped at _GMU_SAFE_CEILING and
#         rounded to 4 decimals;
#       - returns None when the rounded target does not exceed the climb
#         origin, or when any failed GMU is <= target + EPSILON.
#   * The action's base score changes from 0.4 to 0.5, and the hypothesis /
#     expected_effects strings are reworded.
#
# tests/test_core_flow.py
#   * Adds test_harness_continues_gpu_mem_util_after_tied_same_topology_probe:
#     with best_trial_id "trial-0002" (tensor-parallel-size 2) and a completed
#     trial-0004 at the same tensor-parallel-size with
#     gpu-memory-utilization 0.92, the proposal's flag_patch is expected to be
#     {"tensor-parallel-size": 2, "gpu-memory-utilization": 0.94}.
#
# NOTE(review): line breaks and indentation below were restored from a
# whitespace-collapsed copy of this patch. Per-hunk line counts match the
# "@@" headers, but the exact indentation is inferred from Python structure;
# confirm against the repository before applying.
diff --git a/src/aituner/harness.py b/src/aituner/harness.py
index a8f50bf..2d7ac11 100644
--- a/src/aituner/harness.py
+++ b/src/aituner/harness.py
@@ -1396,36 +1396,75 @@ def _runtime_candidate_actions(
     if (
         "gpu-memory-utilization" in tunable
         and topology_settled
-        and top_bottleneck in {"decode_tpot", "admission_or_queueing"}
+        and top_bottleneck in {"decode_tpot", "admission_or_queueing", "ttft_prefill"}
     ):
-        current_gmu = _parse_float_like(
-            anchor_flags.get("gpu-memory-utilization"), default=0.9
+        target = _next_gpu_memory_utilization_target(
+            study,
+            anchor_flags,
+            recent_diagnostics,
         )
-        if 0.0 < current_gmu < _GMU_SAFE_CEILING:
-            target = round(min(_GMU_SAFE_CEILING, current_gmu + _GMU_STEP), 4)
-            if target > current_gmu:
-                patch = {**runtime_base_patch, "gpu-memory-utilization": target}
-                signature = _config_signature({"env_patch": {}, "flag_patch": patch})
-                if signature not in tested_signatures:
-                    actions.append(
-                        _runtime_action(
-                            action_id="raise_gpu_memory_utilization",
-                            knob_family="gpu-memory-utilization",
-                            score=0.4 + _information_gain(bottleneck_hypotheses, "runtime"),
-                            patch=patch,
-                            hypothesis=(
-                                "Raise gpu-memory-utilization to add KV-cache headroom so the "
-                                "decode-bound incumbent can sustain more concurrent decode."
-                            ),
-                            expected_effects=[
-                                "add KV-cache blocks for higher decode concurrency on the incumbent topology",
-                                "reject if the higher memory target regresses request_rate_per_gpu or fails to launch",
-                            ],
-                        )
+        if target is not None:
+            patch = {**runtime_base_patch, "gpu-memory-utilization": target}
+            signature = _config_signature({"env_patch": {}, "flag_patch": patch})
+            if signature not in tested_signatures:
+                actions.append(
+                    _runtime_action(
+                        action_id="raise_gpu_memory_utilization",
+                        knob_family="gpu-memory-utilization",
+                        score=0.5 + _information_gain(bottleneck_hypotheses, "runtime"),
+                        patch=patch,
+                        hypothesis=(
+                            "Raise gpu-memory-utilization on the settled incumbent topology "
+                            "to test whether extra KV-cache headroom moves the SLO frontier."
+                        ),
+                        expected_effects=[
+                            "add KV-cache blocks for higher concurrency on the incumbent topology",
+                            "reject if the higher memory target regresses request_rate_per_gpu or fails to launch",
+                        ],
                     )
+                )
     return actions
 
 
+def _next_gpu_memory_utilization_target(
+    study: StudySpec,
+    anchor_flags: dict[str, Any],
+    recent_diagnostics: list[dict[str, Any]],
+) -> float | None:
+    current_gmu = _parse_float_like(
+        anchor_flags.get("gpu-memory-utilization"), default=0.9
+    )
+    if current_gmu <= 0 or current_gmu >= _GMU_SAFE_CEILING:
+        return None
+    anchor_topology = _normalized_topology_flags(anchor_flags)
+    successful_gmus: list[float] = [current_gmu]
+    failed_gmus: list[float] = []
+    for item in recent_diagnostics:
+        patch = item.get("config_patch")
+        if not isinstance(patch, dict):
+            continue
+        flag_patch = patch.get("flag_patch")
+        if not isinstance(flag_patch, dict) or "gpu-memory-utilization" not in flag_patch:
+            continue
+        flags = _effective_flags_for_item(study, item)
+        if _normalized_topology_flags(flags) != anchor_topology:
+            continue
+        gmu = _parse_float_like(flag_patch.get("gpu-memory-utilization"), default=0.0)
+        if gmu <= 0:
+            continue
+        if item.get("status") == "completed":
+            successful_gmus.append(gmu)
+        elif item.get("status") == "failed":
+            failed_gmus.append(gmu)
+    climb_from = max(successful_gmus)
+    target = round(min(_GMU_SAFE_CEILING, climb_from + _GMU_STEP), 4)
+    if target <= climb_from:
+        return None
+    if any(failed <= target + EPSILON for failed in failed_gmus):
+        return None
+    return target
+
+
 def _runtime_action(
     *,
     action_id: str,
diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py
index 55b4c2b..4eaa5d9 100644
--- a/tests/test_core_flow.py
+++ b/tests/test_core_flow.py
@@ -1800,6 +1800,130 @@ class CoreFlowTests(unittest.TestCase):
             )
             self.assertNotIn("gpu-memory-utilization", proposal.config_patch.flag_patch)
 
+    def test_harness_continues_gpu_mem_util_after_tied_same_topology_probe(self) -> None:
+        """After adjacent topology validation, gpu-memory-utilization should hill-climb
+        on the incumbent topology even if an earlier gmu step tied the incumbent and
+        did not become state.best_trial_id."""
+        with tempfile.TemporaryDirectory() as tmp:
+            tmp_path = Path(tmp)
+            study_path = _write_study_assets(
+                tmp_path,
+                slo_overrides={
+                    "ttft_rule": {"kind": "fixed_ms", "threshold_ms": 4000},
+                    "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 50},
+                },
+                engine_overrides={
+                    "tunable_flags": [
+                        "tensor-parallel-size",
+                        "data-parallel-size",
+                        "gpu-memory-utilization",
+                    ],
+                    "topology_constraints": {
+                        "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
+                        "allowed_data_parallel_sizes": [1, 2],
+                        "allowed_tp_dp_products": [1, 2, 4, 8],
+                    },
+                },
+            )
+            study = load_study_spec(study_path)
+            result_path = tmp_path / "trial-0002.json"
+            result_path.write_text(
+                json.dumps(
+                    {
+                        "status": "completed",
+                        "best_sampling_u": 0.75,
+                        "best_request_rate": 6.5,
+                        "best_pass_rate": 1.0,
+                        "probes": [
+                            {
+                                "threshold": 0.75,
+                                "feasible": True,
+                                "payload": {
+                                    "request_count": 300,
+                                    "pass_rate": 1.0,
+                                    "request_rate": 6.5,
+                                    "latency_summary": {"failed_reason_counts": {}},
+                                },
+                            },
+                            {
+                                "threshold": 0.765625,
+                                "feasible": False,
+                                "payload": {
+                                    "request_count": 300,
+                                    "pass_rate": 0.6,
+                                    "request_rate": 6.7,
+                                    "early_stop_reason": "slo_pass_rate_unrecoverable",
+                                    "latency_summary": {
+                                        "failed_reason_counts": {"ttft_ms>4000.0": 80}
+                                    },
+                                },
+                            },
+                        ],
+                    }
+                ),
+                encoding="utf-8",
+            )
+            state = StudyState(
+                study_id=study.study_id,
+                best_trial_id="trial-0002",
+                best_request_rate=6.5,
+                best_request_rate_per_gpu=3.25,
+                trials=[
+                    TrialSummary(
+                        trial_id="trial-0001",
+                        status="completed",
+                        best_request_rate=2.2,
+                        best_request_rate_per_gpu=2.2,
+                        config_patch={"env_patch": {}, "flag_patch": {}},
+                    ),
+                    TrialSummary(
+                        trial_id="trial-0002",
+                        status="completed",
+                        best_request_rate=6.5,
+                        best_request_rate_per_gpu=3.25,
+                        result_path=str(result_path),
+                        config_patch={
+                            "env_patch": {},
+                            "flag_patch": {"tensor-parallel-size": 2},
+                        },
+                    ),
+                    TrialSummary(
+                        trial_id="trial-0003",
+                        status="completed",
+                        best_request_rate=8.4,
+                        best_request_rate_per_gpu=2.1,
+                        config_patch={
+                            "env_patch": {},
+                            "flag_patch": {"tensor-parallel-size": 4},
+                        },
+                    ),
+                    TrialSummary(
+                        trial_id="trial-0004",
+                        status="completed",
+                        best_request_rate=6.5,
+                        best_request_rate_per_gpu=3.25,
+                        config_patch={
+                            "env_patch": {},
+                            "flag_patch": {
+                                "tensor-parallel-size": 2,
+                                "gpu-memory-utilization": 0.92,
+                            },
+                        },
+                    ),
+                ],
+            )
+            context = build_harness_context(
+                study=study,
+                window_summary={"prompt_tokens_p95": 1500},
+                state=state,
+            )
+            proposal = build_harness_guided_proposal(context)
+            self.assertIsNotNone(proposal)
+            self.assertEqual(
+                proposal.config_patch.flag_patch,
+                {"tensor-parallel-size": 2, "gpu-memory-utilization": 0.94},
+            )
+
     def test_harness_validates_unmeasured_tp_frontier_before_runtime_refinement(self) -> None:
         with tempfile.TemporaryDirectory() as tmp:
             tmp_path = Path(tmp)