diff --git a/src/aituner/harness.py b/src/aituner/harness.py index 641178e..921bbc9 100644 --- a/src/aituner/harness.py +++ b/src/aituner/harness.py @@ -42,6 +42,9 @@ _GMU_SAFE_CEILING = 0.97 _PREFILL_QUANTUM_HEAD_OF_LINE_RATIO = 1.0 _PREFILL_QUANTUM_FRAGMENTATION_RATIO = 0.5 _ADMISSION_PRESSURE_STEP_UP = 1.5 +_FRONTIER_DELTA_MIN_ABS_GAIN = 0.02 +_FRONTIER_DELTA_MIN_REL_GAIN = 0.03 +_FRONTIER_DELTA_PROJECTED_INCUMBENT_FLOOR = 0.98 def build_harness_context( @@ -1115,6 +1118,16 @@ def _candidate_actions( blocked_candidates, ) ) + candidates.extend( + _frontier_delta_projection_actions( + study, + trial_profiles, + top_bottleneck, + bottleneck_hypotheses, + tested_signatures, + blocked_candidates, + ) + ) candidates.extend( _runtime_candidate_actions( study, @@ -1626,6 +1639,376 @@ def _runtime_candidate_actions( return actions +def _frontier_delta_projection_actions( + study: StudySpec, + trial_profiles: list[dict[str, Any]], + top_bottleneck: str, + bottleneck_hypotheses: list[dict[str, Any]], + tested_signatures: set[str], + blocked_candidates: list[dict[str, Any]], +) -> list[dict[str, Any]]: + if not (set(study.engine.tunable_flags) & _RUNTIME_KEYS): + return [] + anchors = _pareto_frontier_anchor_profiles(study, trial_profiles) + if len(anchors) < 2: + return [] + deltas = _positive_runtime_delta_records(study, trial_profiles) + if not deltas: + return [] + + actions: list[dict[str, Any]] = [] + seen_signatures = set(tested_signatures) + incumbent_rate = max( + ( + _profile_request_rate_per_gpu(profile) + for profile in trial_profiles + if profile.get("status") == "completed" + ), + default=0.0, + ) + for delta in deltas: + source_topology_key = delta["source_topology_key"] + source_parallel = int(delta["source_parallel_size"]) + source_rate = _as_float(delta["source_rate_per_gpu"]) + source_gain = _as_float(delta["gain"]) + for target in anchors: + target_flags = _effective_flags_for_item(study, target) + target_topology_key = _topology_key(target_flags) + if target_topology_key == source_topology_key: + continue + target_parallel = _topology_parallel_size(target_flags) + target_rate = _profile_request_rate_per_gpu(target) + runtime_delta = { + key: value + for key, value in delta["runtime_delta"].items() + if key in study.engine.tunable_flags and target_flags.get(key) != value + } + action_id = ( + f"project_runtime_delta_from_{delta['source_trial_id']}" + f"_to_{target.get('trial_id')}" + ) + if not runtime_delta: + blocked_candidates.append( + _blocked_candidate( + action_id=action_id, + knob_family="frontier-delta-projection", + config_patch={ + "env_patch": {}, + "flag_patch": { + **_preserve_topology_patch(study, target_flags), + **_preserve_runtime_patch(study, target_flags), + }, + }, + blocked_reason="blocked_noop_or_repeat_effective_full_config", + effective_config_signature=_effective_config_signature( + study, + { + "env_patch": {}, + "flag_patch": { + **_preserve_topology_patch(study, target_flags), + **_preserve_runtime_patch(study, target_flags), + }, + }, + ), + ) + ) + continue + + patch = { + **_preserve_topology_patch(study, target_flags), + **_preserve_runtime_patch(study, target_flags), + **runtime_delta, + } + signature = _effective_config_signature( + study, + {"env_patch": {}, "flag_patch": patch}, + ) + if source_rate + EPSILON < target_rate + max( + _FRONTIER_DELTA_MIN_ABS_GAIN, + target_rate * _FRONTIER_DELTA_MIN_REL_GAIN, + ): + blocked_candidates.append( + _blocked_candidate( + action_id=action_id, + knob_family="frontier-delta-projection", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_delta_source_not_above_target_anchor", + effective_config_signature=signature, + ) + ) + continue + if ( + incumbent_rate > 0 + and target_rate + source_gain + EPSILON + < incumbent_rate * _FRONTIER_DELTA_PROJECTED_INCUMBENT_FLOOR + ): + blocked_candidates.append( + _blocked_candidate( + action_id=action_id, + knob_family="frontier-delta-projection", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_projected_gain_below_incumbent_floor", + effective_config_signature=signature, + ) + ) + continue + if signature in seen_signatures: + blocked_candidates.append( + _blocked_candidate( + action_id=action_id, + knob_family="frontier-delta-projection", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_noop_or_repeat_effective_full_config", + effective_config_signature=signature, + ) + ) + continue + + score, factors = _score_frontier_delta_projection( + top_bottleneck=top_bottleneck, + source_gain=delta["gain"], + source_baseline_rate=delta["baseline_rate_per_gpu"], + source_rate=delta["source_rate_per_gpu"], + source_parallel=source_parallel, + target_rate=target_rate, + target_parallel=target_parallel, + bottleneck_hypotheses=bottleneck_hypotheses, + ) + if score <= 0: + blocked_candidates.append( + _blocked_candidate( + action_id=action_id, + knob_family="frontier-delta-projection", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_non_positive_candidate_score", + effective_config_signature=signature, + score=round(score, 4), + score_factors=factors, + ) + ) + continue + + actions.append( + _runtime_action( + action_id=action_id, + knob_family="frontier-delta-projection", + score=score, + score_factors=factors, + patch=patch, + hypothesis=( + "A measured runtime-only delta improved one topology anchor; " + "apply the same delta to another Pareto anchor to test whether " + "the mechanism transfers without inheriting the source topology." + ), + expected_effects=[ + "preserve the target anchor topology and only apply measured runtime changes", + "separate runtime-mechanism gain from topology-resource tradeoffs", + "reject if the projected delta fails to improve request_rate_per_gpu", + ], + ) + ) + seen_signatures.add(signature) + + actions.sort(key=lambda item: _as_float(item.get("score")), reverse=True) + return actions[:8] + + +def _pareto_frontier_anchor_profiles( + study: StudySpec, + trial_profiles: list[dict[str, Any]], +) -> list[dict[str, Any]]: + best_by_topology: dict[tuple[int, int, int, bool], dict[str, Any]] = {} + for profile in trial_profiles: + if profile.get("status") != "completed": + continue + rate = _profile_request_rate_per_gpu(profile) + if rate <= 0: + continue + flags = _effective_flags_for_item(study, profile) + topology_key = _topology_key(flags) + incumbent = best_by_topology.get(topology_key) + if incumbent is None: + best_by_topology[topology_key] = profile + continue + incumbent_rate = _profile_request_rate_per_gpu(incumbent) + raw_rate = _as_float(profile.get("performance", {}).get("best_request_rate")) + incumbent_raw_rate = _as_float( + incumbent.get("performance", {}).get("best_request_rate") + ) + if (rate, raw_rate) > (incumbent_rate, incumbent_raw_rate): + best_by_topology[topology_key] = profile + + anchors = list(best_by_topology.values()) + frontier: list[dict[str, Any]] = [] + for candidate in anchors: + candidate_rate = _profile_request_rate_per_gpu(candidate) + candidate_parallel = _topology_parallel_size( + _effective_flags_for_item(study, candidate) + ) + dominated = False + for other in anchors: + if other is candidate: + continue + other_rate = _profile_request_rate_per_gpu(other) + other_parallel = _topology_parallel_size(_effective_flags_for_item(study, other)) + if ( + other_parallel <= candidate_parallel + and other_rate + EPSILON >= candidate_rate + and (other_parallel < candidate_parallel or other_rate > candidate_rate + EPSILON) + ): + dominated = True + break + if not dominated: + frontier.append(candidate) + + frontier.sort( + key=lambda item: ( + _topology_parallel_size(_effective_flags_for_item(study, item)), + -_profile_request_rate_per_gpu(item), + str(item.get("trial_id") or ""), + ) + ) + return frontier + + +def _positive_runtime_delta_records( + study: StudySpec, + trial_profiles: list[dict[str, Any]], +) -> list[dict[str, Any]]: + completed = [ + profile + for profile in trial_profiles + if profile.get("status") == "completed" + and _profile_request_rate_per_gpu(profile) > 0 + ] + records: list[dict[str, Any]] = [] + for source in completed: + source_patch = source.get("config_patch") + if not isinstance(source_patch, dict): + continue + source_flag_patch = source_patch.get("flag_patch") + if not isinstance(source_flag_patch, dict): + continue + source_runtime_keys = set(source_flag_patch) & _RUNTIME_KEYS & set(study.engine.tunable_flags) + if not source_runtime_keys: + continue + + source_flags = _effective_flags_for_item(study, source) + source_topology_key = _topology_key(source_flags) + source_rate = _profile_request_rate_per_gpu(source) + best_baseline: dict[str, Any] | None = None + best_baseline_delta: dict[str, Any] = {} + best_baseline_gain = 0.0 + for baseline in completed: + if baseline is source: + continue + baseline_flags = _effective_flags_for_item(study, baseline) + if _topology_key(baseline_flags) != source_topology_key: + continue + baseline_rate = _profile_request_rate_per_gpu(baseline) + runtime_delta = _runtime_delta_patch( + study, + baseline_flags=baseline_flags, + source_flags=source_flags, + source_flag_patch=source_flag_patch, + ) + if not runtime_delta: + continue + gain = source_rate - baseline_rate + min_gain = max( + _FRONTIER_DELTA_MIN_ABS_GAIN, + baseline_rate * _FRONTIER_DELTA_MIN_REL_GAIN, + ) + if gain + EPSILON < min_gain: + continue + if ( + best_baseline is None + or baseline_rate > _profile_request_rate_per_gpu(best_baseline) + ): + best_baseline = baseline + best_baseline_delta = runtime_delta + best_baseline_gain = gain + + if best_baseline is None: + continue + records.append( + { + "source_trial_id": source.get("trial_id"), + "baseline_trial_id": best_baseline.get("trial_id"), + "source_topology_key": source_topology_key, + "source_parallel_size": _topology_parallel_size(source_flags), + "runtime_delta": best_baseline_delta, + "gain": best_baseline_gain, + "source_rate_per_gpu": source_rate, + "baseline_rate_per_gpu": _profile_request_rate_per_gpu(best_baseline), + } + ) + + records.sort( + key=lambda item: ( + _as_float(item.get("gain")), + _as_float(item.get("source_rate_per_gpu")), + ), + reverse=True, + ) + return records[:8] + + +def _runtime_delta_patch( + study: StudySpec, + *, + baseline_flags: dict[str, Any], + source_flags: dict[str, Any], + source_flag_patch: dict[str, Any], +) -> dict[str, Any]: + tunable = set(study.engine.tunable_flags) + patch: dict[str, Any] = {} + for key in sorted(_RUNTIME_KEYS & tunable & set(source_flag_patch)): + if baseline_flags.get(key) == source_flags.get(key): + continue + patch[key] = source_flags.get(key) + return patch + + +def _score_frontier_delta_projection( + *, + top_bottleneck: str, + source_gain: float, + source_baseline_rate: float, + source_rate: float, + source_parallel: int, + target_rate: float, + target_parallel: int, + bottleneck_hypotheses: list[dict[str, Any]], +) -> tuple[float, dict[str, Any]]: + relative_gain = source_gain / max(source_baseline_rate, EPSILON) + measured_delta_evidence = min(0.24, max(0.0, relative_gain) * 0.8) + if target_parallel < source_parallel: + resource_transfer = 0.16 + elif target_parallel == source_parallel: + resource_transfer = 0.06 + else: + resource_transfer = -0.04 + anchor_quality = min(0.12, max(0.0, target_rate) / max(source_rate, target_rate, EPSILON) * 0.12) + information_gain = _information_gain(bottleneck_hypotheses, "runtime") + score = 0.42 + measured_delta_evidence + resource_transfer + anchor_quality + information_gain + factors = { + "active_bottleneck": top_bottleneck, + "measured_delta_gain_per_gpu": round(source_gain, 4), + "measured_delta_relative_gain": round(relative_gain, 4), + "measured_delta_evidence": round(measured_delta_evidence, 4), + "source_parallel_size": source_parallel, + "target_parallel_size": target_parallel, + "resource_transfer": round(resource_transfer, 4), + "target_anchor_rate_per_gpu": round(target_rate, 4), + "source_delta_rate_per_gpu": round(source_rate, 4), + "anchor_quality": round(anchor_quality, 4), + "information_gain": round(information_gain, 4), + "regression_risk": 0.05, + } + return score, factors + + def _next_gpu_memory_utilization_target( study: StudySpec, anchor_flags: dict[str, Any], @@ -2253,6 +2636,25 @@ def _normalized_topology_flags(flags: dict[str, Any]) -> dict[str, Any]: } +def _topology_key(flags: dict[str, Any]) -> tuple[int, int, int, bool]: + normalized = _normalized_topology_flags(flags) + return ( + int(normalized["tensor-parallel-size"]), + int(normalized["data-parallel-size"]), + int(normalized["expert-parallel-size"]), + bool(normalized["enable-expert-parallel"]), + ) + + +def _topology_parallel_size(flags: dict[str, Any]) -> int: + normalized = _normalized_topology_flags(flags) + return max( + 1, + int(normalized["tensor-parallel-size"]) + * int(normalized["data-parallel-size"]), + ) + + def _effective_gpu_count(study: StudySpec) -> int: visible = str(study.engine.base_envs.get("CUDA_VISIBLE_DEVICES") or "").strip() if not visible: diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index cba3af5..0e47d2b 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -2720,6 +2720,157 @@ class CoreFlowTests(unittest.TestCase): ], ) + def test_harness_projects_measured_runtime_delta_to_other_frontier_anchor(self) -> None: + """A runtime improvement found on one topology must be tested on other + Pareto anchors before the harness can keep micro-tuning the source topology.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + slo_overrides={ + "ttft_rule": {"kind": "fixed_ms", "threshold_ms": 4000}, + "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 50}, + }, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 2, + "data-parallel-size": 1, + "gpu-memory-utilization": 0.5, + "max-num-seqs": 8, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "gpu-memory-utilization", + "max-num-seqs", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [2, 4, 8], + "allowed_data_parallel_sizes": [1], + "allowed_tp_dp_products": [2, 4, 8], + }, + }, + ) + study = load_study_spec(study_path) + latest_result_path = tmp_path / "trial-0005.json" + latest_result_path.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.1, + "best_request_rate": 8.0, + "best_pass_rate": 0.96, + "probes": [ + { + "threshold": 0.1, + "feasible": True, + "payload": { + "request_count": 300, + "pass_rate": 0.96, + "request_rate": 8.0, + "latency_summary": {"failed_reason_counts": {}}, + }, + }, + { + "threshold": 0.12, + "feasible": False, + "payload": { + "request_count": 300, + "pass_rate": 0.6, + "request_rate": 9.0, + "early_stop_reason": "slo_pass_rate_unrecoverable", + "latency_summary": { + "failed_reason_counts": {"ttft_ms>4000.0": 100} + }, + }, + }, + ], + } + ), + encoding="utf-8", + ) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0005", + best_request_rate=8.0, + best_request_rate_per_gpu=2.0, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=2, + best_request_rate=3.466, + best_request_rate_per_gpu=1.733, + config_patch={"env_patch": {}, "flag_patch": {}}, + ), + TrialSummary( + trial_id="trial-0002", + status="completed", + parallel_size=4, + best_request_rate=6.95, + best_request_rate_per_gpu=1.7375, + config_patch={ + "env_patch": {}, + "flag_patch": {"tensor-parallel-size": 4}, + }, + ), + TrialSummary( + trial_id="trial-0003", + status="completed", + parallel_size=8, + best_request_rate=8.0, + best_request_rate_per_gpu=1.0, + config_patch={ + "env_patch": {}, + "flag_patch": {"tensor-parallel-size": 8}, + }, + ), + TrialSummary( + trial_id="trial-0004", + status="completed", + parallel_size=4, + best_request_rate=6.95, + best_request_rate_per_gpu=1.7375, + config_patch={ + "env_patch": {}, + "flag_patch": { + "tensor-parallel-size": 4, + "max-num-seqs": 16, + }, + }, + ), + TrialSummary( + trial_id="trial-0005", + status="completed", + parallel_size=4, + best_request_rate=8.0, + best_request_rate_per_gpu=2.0, + result_path=str(latest_result_path), + config_patch={ + "env_patch": {}, + "flag_patch": { + "tensor-parallel-size": 4, + "gpu-memory-utilization": 0.9, + }, + }, + ), + ], + ) + context = build_harness_context( + study=study, + window_summary={"prompt_tokens_p95": 6500}, + state=state, + ) + next_action = context["experiment_plan"]["next_action"] + self.assertEqual(next_action["knob_family"], "frontier-delta-projection") + self.assertEqual( + next_action["config_patch"]["flag_patch"], + {"gpu-memory-utilization": 0.9}, + ) + self.assertIsNone(build_harness_stop_proposal(context)) + def test_harness_validates_unmeasured_tp_frontier_before_runtime_refinement(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp)