From 1b8f5a3af18383fa5794145657c5d34e44fff0d3 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Tue, 30 Jun 2026 14:10:19 +0800 Subject: [PATCH] Integrate descriptor runtime candidates into harness --- src/aituner/engine_adapters/vllm.py | 2 +- src/aituner/harness.py | 222 ++++++++++++++++++++++++++++ src/aituner/mechanism_planner.py | 71 +++++---- tests/test_core_flow.py | 113 ++++++++++++++ tests/test_mechanism_planner.py | 23 ++- 5 files changed, 400 insertions(+), 31 deletions(-) diff --git a/src/aituner/engine_adapters/vllm.py b/src/aituner/engine_adapters/vllm.py index 4947da6..6e83b00 100644 --- a/src/aituner/engine_adapters/vllm.py +++ b/src/aituner/engine_adapters/vllm.py @@ -55,7 +55,7 @@ def default_vllm_descriptors(*, tunable_flags: Iterable[str]) -> tuple[KnobDescr mechanisms=("kv_memory_capacity", "launch_feasibility"), search_geometry="bounded_fraction", operators=("coordinate_line_search", "frontier_delta_projection"), - constraints=KnobConstraints(min_value=0.0, max_value=1.0), + constraints=KnobConstraints(min_value=0.0, max_value=0.97), directional_effects={ "increase": ("kv_memory_capacity",), "decrease": ("launch_feasibility",), diff --git a/src/aituner/harness.py b/src/aituner/harness.py index cb97f7b..3b65fde 100644 --- a/src/aituner/harness.py +++ b/src/aituner/harness.py @@ -9,7 +9,12 @@ from .config_signature import ( effective_config_signature as _shared_effective_config_signature, normalized_config_patch as _shared_normalized_config_patch, ) +from .engine_adapters.vllm import default_vllm_descriptors from .lca import EPSILON, WorkloadProfile +from .mechanism_planner import ( + CoordinateSearchPolicy, + coordinate_line_search_candidates, +) from .spec import ConfigPatch, Proposal, StudySpec, StudyState, TrialSummary @@ -1644,9 +1649,226 @@ def _runtime_candidate_actions( effective_config_signature=signature, ) ) + production_covered_knobs = _candidate_action_runtime_knobs(actions) + actions.extend( + _descriptor_runtime_candidate_actions( + study, + window_summary, + anchor_flags, + runtime_base_patch, + top_bottleneck, + bottleneck_hypotheses, + recent_diagnostics, + production_covered_knobs=production_covered_knobs, + topology_settled=topology_settled, + seen_signatures=seen_signatures, + blocked_candidates=blocked_candidates, + ) + ) return actions +def _descriptor_runtime_candidate_actions( + study: StudySpec, + window_summary: dict[str, Any], + anchor_flags: dict[str, Any], + runtime_base_patch: dict[str, Any], + top_bottleneck: str, + bottleneck_hypotheses: list[dict[str, Any]], + recent_diagnostics: list[dict[str, Any]], + *, + production_covered_knobs: set[str], + topology_settled: bool, + seen_signatures: set[str], + blocked_candidates: list[dict[str, Any]], +) -> list[dict[str, Any]]: + descriptors = _engine_knob_descriptors(study) + if not descriptors: + return [] + evidence_weights = _mechanism_evidence_weights( + study, + window_summary, + top_bottleneck, + bottleneck_hypotheses, + ) + candidates = coordinate_line_search_candidates( + current_config=anchor_flags, + descriptors=tuple( + descriptor + for descriptor in descriptors + if descriptor.name in anchor_flags and anchor_flags.get(descriptor.name) is not None + ), + evidence_weights=evidence_weights, + policy=CoordinateSearchPolicy( + initial_relative_step=1.0, + initial_fraction_step=0.1, + step_multipliers=(1.0, 2.0, 4.0), + min_score=0.25, + ), + ) + actions: list[dict[str, Any]] = [] + for candidate in candidates: + patch = {**runtime_base_patch, **candidate.patch} + signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) + if candidate.knob in production_covered_knobs: + blocked_candidates.append( + _blocked_candidate( + action_id=candidate.action_id, + knob_family=f"descriptor:{candidate.mechanism}", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_descriptor_knob_has_production_candidate", + effective_config_signature=signature, + score=candidate.score, + score_factors=candidate.score_factors, + ) + ) + continue + if _recent_runtime_knob_probe_count(candidate.knob, recent_diagnostics) >= 2: + blocked_candidates.append( + _blocked_candidate( + action_id=candidate.action_id, + knob_family=f"descriptor:{candidate.mechanism}", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_descriptor_knob_already_covered_by_recent_probes", + effective_config_signature=signature, + score=candidate.score, + score_factors=candidate.score_factors, + ) + ) + continue + if signature in seen_signatures: + blocked_candidates.append( + _blocked_candidate( + action_id=candidate.action_id, + knob_family=f"descriptor:{candidate.mechanism}", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_noop_or_repeat_effective_full_config", + effective_config_signature=signature, + score=candidate.score, + score_factors=candidate.score_factors, + ) + ) + continue + score_cap = 0.52 if topology_settled else 0.45 + score = min( + score_cap, + 0.22 + + candidate.score * 0.65 + + _information_gain(bottleneck_hypotheses, "runtime") * 0.5, + ) + actions.append( + _runtime_action( + action_id=candidate.action_id, + knob_family=f"descriptor:{candidate.mechanism}", + score=score, + score_factors={ + **candidate.score_factors, + "planner_score": candidate.score, + "information_gain": round( + _information_gain(bottleneck_hypotheses, "runtime"), 4 + ), + "regression_risk": round( + candidate.score_factors.get("direction_risk", 0.0) + + candidate.score_factors.get("step_risk", 0.0), + 4, + ), + }, + patch=patch, + hypothesis=( + "Descriptor-driven coordinate line search over a serving mechanism " + f"({candidate.mechanism}) selected {candidate.knob} " + f"{candidate.direction} without using a knob-specific candidate rule." + ), + expected_effects=[ + "test one declarative knob mechanism while preserving the anchor topology", + "treat a positive result as transferable mechanism evidence, not a fixed recipe", + "reject or shrink this direction if the measured per-GPU SLO objective does not improve", + ], + ) + ) + seen_signatures.add(signature) + return actions[:8] + + +def _candidate_action_runtime_knobs(actions: list[dict[str, Any]]) -> set[str]: + knobs: set[str] = set() + for action in actions: + patch = action.get("config_patch") + if not isinstance(patch, dict): + continue + flag_patch = patch.get("flag_patch") + if not isinstance(flag_patch, dict): + continue + knobs.update(set(flag_patch) & _RUNTIME_KEYS) + return knobs + + +def _recent_runtime_knob_probe_count( + knob: str, + recent_diagnostics: list[dict[str, Any]], +) -> int: + count = 0 + for item in recent_diagnostics: + patch = item.get("config_patch") + if not isinstance(patch, dict): + continue + flag_patch = patch.get("flag_patch") + if isinstance(flag_patch, dict) and knob in flag_patch: + count += 1 + return count + + +def _engine_knob_descriptors(study: StudySpec): + engine_name = str(study.engine.engine_name or "").lower() + if engine_name == "vllm": + return default_vllm_descriptors(tunable_flags=study.engine.tunable_flags) + return () + + +def _mechanism_evidence_weights( + study: StudySpec, + window_summary: dict[str, Any], + top_bottleneck: str, + bottleneck_hypotheses: list[dict[str, Any]], +) -> dict[str, float]: + confidence = _hypothesis_confidence(bottleneck_hypotheses, top_bottleneck) + if confidence <= 0: + confidence = 0.35 + weights: dict[str, float] = {} + + def add(mechanism: str, weight: float) -> None: + weights[mechanism] = max(weights.get(mechanism, 0.0), round(weight, 4)) + + if top_bottleneck == "ttft_prefill": + add("prefill_scheduling", 0.78 * confidence) + add("prefill_tail_latency", 0.58 * confidence) + add("admission_capacity", 0.64 * confidence) + add("kv_memory_capacity", 0.58 * confidence) + elif top_bottleneck == "admission_or_queueing": + add("admission_capacity", 0.82 * confidence) + add("kv_memory_capacity", 0.62 * confidence) + add("decode_batching", 0.48 * confidence) + add("prefill_scheduling", 0.36 * confidence) + elif top_bottleneck == "decode_tpot": + add("kv_memory_pressure", 0.74 * confidence) + add("kv_memory_capacity", 0.56 * confidence) + add("decode_batching", 0.48 * confidence) + elif top_bottleneck == "launch_or_memory": + add("launch_feasibility", 0.82 * confidence) + add("kv_memory_pressure", 0.56 * confidence) + else: + add("admission_capacity", 0.32) + add("kv_memory_capacity", 0.28) + + prompt_p95 = _as_float(window_summary.get("prompt_tokens_p95")) + tail_ratio = _as_float(window_summary.get("prompt_tail_ratio_p95_p50")) + if _length_regime(prompt_p95, tail_ratio) != "short_or_moderate": + add("prefill_scheduling", 0.36) + if study.trace.request_mode == "decode_only": + add("decode_batching", 0.36) + return weights + + def _frontier_delta_projection_actions( study: StudySpec, state: StudyState, diff --git a/src/aituner/mechanism_planner.py b/src/aituner/mechanism_planner.py index a9a4125..3e1427c 100644 --- a/src/aituner/mechanism_planner.py +++ b/src/aituner/mechanism_planner.py @@ -11,6 +11,7 @@ from .knob_descriptor import KnobDescriptor class CoordinateSearchPolicy: initial_relative_step: float = 1.0 initial_fraction_step: float = 0.05 + step_multipliers: tuple[float, ...] = (1.0,) grow_factor: float = 1.5 shrink_factor: float = 0.5 min_score: float = 0.0 @@ -60,36 +61,44 @@ def coordinate_line_search_candidates( continue state = states.get((descriptor.name, direction)) current = descriptor.current_value(current_config) - target = _propose_value( - descriptor=descriptor, - current=current, - direction=direction, - state=state, - policy=policy, + multipliers = ( + (1.0,) if descriptor.search_geometry == "toggle" else policy.step_multipliers ) - if target is None or target == current: - continue - risk = _direction_risk(descriptor, direction, evidence_weights) - score = max(0.0, evidence - risk) - candidates.append( - MechanismCandidate( - action_id=( - f"coordinate_line_search:{descriptor.search_geometry}:" - f"{descriptor.name}:{direction}:{_stable_token(current)}->{_stable_token(target)}" - ), - knob=descriptor.name, - mechanism=mechanism, - operator="coordinate_line_search", + for multiplier in multipliers: + target = _propose_value( + descriptor=descriptor, + current=current, direction=direction, - patch={descriptor.name: target}, - score=round(score, 4), - score_factors={ - "mechanism_evidence": round(evidence, 4), - "direction_risk": round(risk, 4), - }, - evidence_refs=(mechanism,), + state=state, + policy=policy, + step_multiplier=multiplier, + ) + if target is None or target == current: + continue + risk = _direction_risk(descriptor, direction, evidence_weights) + step_risk = _step_risk(multiplier) + score = max(0.0, evidence - risk - step_risk) + candidates.append( + MechanismCandidate( + action_id=( + f"coordinate_line_search:{descriptor.search_geometry}:" + f"{descriptor.name}:{direction}:step={_stable_token(multiplier)}:" + f"{_stable_token(current)}->{_stable_token(target)}" + ), + knob=descriptor.name, + mechanism=mechanism, + operator="coordinate_line_search", + direction=direction, + patch={descriptor.name: target}, + score=round(score, 4), + score_factors={ + "mechanism_evidence": round(evidence, 4), + "direction_risk": round(risk, 4), + "step_risk": round(step_risk, 4), + }, + evidence_refs=(mechanism,), + ) ) - ) candidates.sort(key=lambda item: (item.score, item.action_id), reverse=True) return tuple(candidates) @@ -122,6 +131,10 @@ def _direction_risk( return min(0.5, 0.2 * max(float(evidence_weights.get(item, 0.0)) for item in risks)) +def _step_risk(multiplier: float) -> float: + return min(0.2, max(0.0, float(multiplier) - 1.0) * 0.04) + + def _propose_value( *, descriptor: KnobDescriptor, @@ -129,6 +142,7 @@ def _propose_value( direction: str, state: CoordinateOperatorState | None, policy: CoordinateSearchPolicy, + step_multiplier: float, ) -> Any | None: if descriptor.search_geometry == "toggle": if not isinstance(current, bool): @@ -143,6 +157,7 @@ def _propose_value( if state is not None and state.trust_radius is not None else policy.initial_fraction_step ) + radius *= max(float(step_multiplier), 0.0) if direction == "decrease": target = value - radius else: @@ -157,6 +172,7 @@ def _propose_value( if state is not None and state.trust_radius is not None else 1.0 ) + radius *= max(float(step_multiplier), 0.0) return _canonicalize_value( descriptor, value - radius if direction == "decrease" else value + radius, @@ -171,6 +187,7 @@ def _propose_value( if state is not None and state.trust_radius is not None else policy.initial_relative_step ) + radius *= max(float(step_multiplier), 0.0) factor = max(1.0 + radius, 1.01) target = value / factor if direction == "decrease" else value * factor return _canonicalize_value(descriptor, target) diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index f75a96e..540397b 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -2594,6 +2594,119 @@ class CoreFlowTests(unittest.TestCase): ) self.assertNotIn("tensor-parallel-size", proposal.config_patch.flag_patch) + def test_descriptor_candidates_expose_bad_runtime_recovery_without_preempting_topology( + self, + ) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + slo_overrides={ + "ttft_rule": {"kind": "fixed_ms", "threshold_ms": 4000}, + "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 50}, + }, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 2, + "data-parallel-size": 1, + "gpu-memory-utilization": 0.5, + "max-num-seqs": 8, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "gpu-memory-utilization", + "max-num-seqs", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [2, 4, 8], + "allowed_data_parallel_sizes": [1], + "allowed_tp_dp_products": [2, 4, 8], + }, + }, + ) + study = load_study_spec(study_path) + result_path = tmp_path / "trial-0001.json" + result_path.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.05, + "best_request_rate": 3.4667, + "best_pass_rate": 0.9663, + "probes": [ + { + "threshold": 0.05, + "feasible": True, + "payload": { + "request_rate": 3.4667, + "pass_rate": 0.9663, + "latency_summary": {"failed_reason_counts": {}}, + }, + }, + { + "threshold": 0.08, + "feasible": False, + "payload": { + "request_rate": 4.0, + "pass_rate": 0.5, + "early_stop_reason": "slo_pass_rate_unrecoverable", + "latency_summary": { + "failed_reason_counts": {"ttft_ms>4000.0": 120} + }, + }, + }, + ], + } + ), + encoding="utf-8", + ) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_request_rate=3.4667, + best_request_rate_per_gpu=1.73335, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=2, + best_request_rate=3.4667, + best_request_rate_per_gpu=1.73335, + result_path=str(result_path), + config_patch={"env_patch": {}, "flag_patch": {}}, + ) + ], + ) + + context = build_harness_context( + study=study, + window_summary={ + "prompt_tokens_p95": 6500, + "prompt_tail_ratio_p95_p50": 3.0, + }, + state=state, + ) + + next_action = context["experiment_plan"]["next_action"] + self.assertEqual(next_action["knob_family"], "topology") + descriptor_patches = [ + action["config_patch"]["flag_patch"] + for action in context["experiment_plan"]["candidate_actions"] + if str(action["knob_family"]).startswith("descriptor:") + ] + self.assertTrue( + any(patch.get("max-num-seqs") == 24 for patch in descriptor_patches) + ) + self.assertTrue( + any( + patch.get("gpu-memory-utilization") == 0.9 + for patch in descriptor_patches + ) + ) + def test_harness_stops_gpu_mem_util_climb_after_tied_same_topology_probe(self) -> None: """A same-topology gpu-memory-utilization probe must improve per-GPU rate before the hill-climb continues; launch success alone is not evidence to keep climbing.""" diff --git a/tests/test_mechanism_planner.py b/tests/test_mechanism_planner.py index 4fee15c..08cf9a2 100644 --- a/tests/test_mechanism_planner.py +++ b/tests/test_mechanism_planner.py @@ -4,7 +4,10 @@ import unittest from aituner.engine_adapters.vllm import default_vllm_descriptors from aituner.knob_descriptor import KnobConstraints, KnobDescriptor -from aituner.mechanism_planner import coordinate_line_search_candidates +from aituner.mechanism_planner import ( + CoordinateSearchPolicy, + coordinate_line_search_candidates, +) class MechanismPlannerTests(unittest.TestCase): @@ -56,12 +59,26 @@ class MechanismPlannerTests(unittest.TestCase): descriptor = default_vllm_descriptors(tunable_flags=("gpu-memory-utilization",))[0] candidates = coordinate_line_search_candidates( - current_config={"gpu-memory-utilization": 0.98}, + current_config={"gpu-memory-utilization": 0.96}, descriptors=(descriptor,), evidence_weights={"kv_memory_capacity": 0.8}, ) - self.assertEqual(candidates[0].patch, {"gpu-memory-utilization": 1.0}) + self.assertEqual(candidates[0].patch, {"gpu-memory-utilization": 0.97}) + + def test_coordinate_search_can_emit_larger_same_operator_steps(self) -> None: + descriptor = default_vllm_descriptors(tunable_flags=("max-num-seqs",))[0] + + candidates = coordinate_line_search_candidates( + current_config={"max-num-seqs": 8}, + descriptors=(descriptor,), + evidence_weights={"admission_capacity": 0.9}, + policy=CoordinateSearchPolicy(step_multipliers=(1.0, 2.0)), + ) + + patches = [candidate.patch for candidate in candidates] + self.assertIn({"max-num-seqs": 16}, patches) + self.assertIn({"max-num-seqs": 24}, patches) if __name__ == "__main__":