diff --git a/src/aituner/cli.py b/src/aituner/cli.py index 05878b1..5323c61 100644 --- a/src/aituner/cli.py +++ b/src/aituner/cli.py @@ -6,7 +6,11 @@ import sys from pathlib import Path from .compare import run_compare -from .harness import build_harness_context, build_harness_stop_proposal +from .harness import ( + build_harness_context, + build_harness_guided_proposal, + build_harness_stop_proposal, +) from .job import append_job, build_trial_job from .llm import build_prompt, call_llm_for_proposal, load_capability_profile, parse_proposal_text from .spec import Proposal, SpecError, load_study_spec, to_jsonable @@ -179,13 +183,25 @@ def cmd_study_tune(args: argparse.Namespace) -> int: proposal_text = json.dumps(to_jsonable(stop_proposal), ensure_ascii=False) proposal_name = f"harness-stop-{state.next_trial_index:04d}" else: - if study.llm.endpoint is None: - raise SpecError( - "No proposal files provided, study.llm.endpoint is not configured, " - "and the harness stop guard did not fire." + guided_proposal = ( + build_harness_guided_proposal(harness_context) + if harness_context is not None + else None + ) + if guided_proposal is not None: + proposal_text = json.dumps( + to_jsonable(guided_proposal), + ensure_ascii=False, ) - proposal_text = call_llm_for_proposal(policy=study.llm, prompt=prompt) - proposal_name = f"proposal-{state.next_trial_index:04d}" + proposal_name = f"harness-proposal-{state.next_trial_index:04d}" + else: + if study.llm.endpoint is None: + raise SpecError( + "No proposal files provided, study.llm.endpoint is not configured, " + "and the harness stop guard did not fire." + ) + proposal_text = call_llm_for_proposal(policy=study.llm, prompt=prompt) + proposal_name = f"proposal-{state.next_trial_index:04d}" raw_proposal_path = store.study_root(study.study_id) / "proposals" / f"{proposal_name}.raw.txt" raw_proposal_path.write_text(proposal_text, encoding="utf-8") proposal = parse_proposal_text(proposal_text, study) @@ -212,10 +228,14 @@ def cmd_study_tune(args: argparse.Namespace) -> int: result = run_trial(trial_spec_path) state = store.ingest_trial_results(study.study_id) executed.append( - { - "trial_id": trial.trial_id, - "proposal_name": proposal_name, - "proposal_source": str(proposal_source) if proposal_source else "llm", + { + "trial_id": trial.trial_id, + "proposal_name": proposal_name, + "proposal_source": ( + "harness" + if proposal_name.startswith("harness-proposal-") + else str(proposal_source) if proposal_source else "llm" + ), "best_sampling_u": result.get("best_sampling_u"), "best_request_rate": result.get("best_request_rate"), "best_pass_rate": result.get("best_pass_rate"), diff --git a/src/aituner/harness.py b/src/aituner/harness.py index d6096b3..da20248 100644 --- a/src/aituner/harness.py +++ b/src/aituner/harness.py @@ -42,6 +42,7 @@ def build_harness_context( "recent_trial_diagnostics": recent_diagnostics, "convergence_guard": _convergence_guard(state, recent_diagnostics), "harness_stop": _harness_stop_decision(study, state, recent_diagnostics), + "harness_proposal": _harness_proposal_decision(study, state, recent_diagnostics), "knob_harnesses": _knob_harnesses(study, window_summary, recent_diagnostics), "proposal_rules": _proposal_rules(), } @@ -74,6 +75,39 @@ def build_harness_stop_proposal(context: dict[str, Any]) -> Proposal | None: ) +def build_harness_guided_proposal(context: dict[str, Any]) -> Proposal | None: + proposal = context.get("harness_proposal") + if not isinstance(proposal, dict) or not proposal.get("should_propose"): + return None + patch = proposal.get("config_patch") + if not isinstance(patch, dict): + return None + flag_patch = patch.get("flag_patch") + env_patch = patch.get("env_patch") + if not isinstance(flag_patch, dict) or not isinstance(env_patch, dict): + return None + reason = str(proposal.get("reason") or "harness_guided_probe") + diagnosis = str(proposal.get("diagnosis") or reason) + return Proposal( + observation=( + "Harness selected a deterministic first validation probe before " + f"requesting an LLM proposal: {reason}." + ), + diagnosis=diagnosis, + config_patch=ConfigPatch(env_patch=dict(env_patch), flag_patch=dict(flag_patch)), + expected_effects=[ + str(item) + for item in proposal.get("expected_effects", []) + if isinstance(item, str) + ], + why_not_previous_failures=( + "The proposal is based on the first completed baseline trial and does not " + "repeat a prior failed configuration." + ), + should_stop=False, + ) + + def render_harness_context(context: dict[str, Any]) -> str: return json.dumps(context, ensure_ascii=False, indent=2) @@ -511,6 +545,106 @@ def _harness_stop_decision( } +def _harness_proposal_decision( + study: StudySpec, + state: StudyState, + recent_diagnostics: list[dict[str, Any]], +) -> dict[str, Any]: + default = { + "should_propose": False, + "reason": "no_deterministic_harness_proposal", + "diagnosis": "Defer to the LLM proposal policy.", + "config_patch": {"env_patch": {}, "flag_patch": {}}, + "expected_effects": [], + } + if len(state.trials) != 1 or len(recent_diagnostics) != 1: + return default + baseline = recent_diagnostics[0] + if baseline.get("status") != "completed": + return default + if not isinstance(baseline.get("best_request_rate_per_gpu"), (int, float)): + return default + active_bottleneck = str(baseline.get("active_bottleneck") or "") + if active_bottleneck not in {"ttft_prefill", "decode_tpot"}: + return { + **default, + "reason": "baseline_bottleneck_does_not_require_tp_first_probe", + "diagnosis": f"Baseline bottleneck is {active_bottleneck or 'unknown'}.", + } + if "tensor-parallel-size" not in set(study.engine.tunable_flags): + return { + **default, + "reason": "tensor_parallel_size_not_tunable", + } + failed_signatures = { + _config_signature(item.get("config_patch") if isinstance(item, dict) else None) + for item in recent_diagnostics + if item.get("failure_stage") == "engine_launch" + } + base_flags = dict(study.engine.base_flags) + baseline_patch = baseline.get("config_patch") + if isinstance(baseline_patch, dict) and isinstance(baseline_patch.get("flag_patch"), dict): + base_flags.update(baseline_patch["flag_patch"]) + current_tp = _parse_int_like(base_flags.get("tensor-parallel-size", 1), default=1) + current_dp = _parse_int_like(base_flags.get("data-parallel-size", 1), default=1) + next_tp = _next_allowed_tp(study, current_tp=current_tp, current_dp=current_dp) + if next_tp is None: + return { + **default, + "reason": "no_legal_adjacent_tensor_parallel_probe", + } + flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp} + signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch}) + if signature in failed_signatures: + return { + **default, + "reason": "adjacent_tensor_parallel_probe_previously_failed", + } + return { + "should_propose": True, + "reason": "first_adjacent_tensor_parallel_probe_for_latency_bottleneck", + "diagnosis": ( + f"Baseline high-load probes indicate {active_bottleneck}; the generic " + "topology harness validates the adjacent legal TP increase before " + "runtime-only or DP/EP probes." + ), + "config_patch": {"env_patch": {}, "flag_patch": flag_patch}, + "expected_effects": [ + "reduce per-request latency pressure at higher offered load", + "validate the nearest TP topology before broader runtime search", + ], + "baseline_trial_id": baseline.get("trial_id"), + "active_bottleneck": active_bottleneck, + } + + +def _next_allowed_tp(study: StudySpec, *, current_tp: int, current_dp: int) -> int | None: + constraints = study.engine.topology_constraints + if constraints is not None and constraints.allowed_tensor_parallel_sizes: + candidates = sorted({int(item) for item in constraints.allowed_tensor_parallel_sizes}) + else: + candidates = [1, 2, 4, 8] + for candidate in candidates: + if candidate <= current_tp: + continue + tp_dp_product = candidate * current_dp + if tp_dp_product > study.hardware.gpu_count: + continue + if constraints is not None: + if ( + constraints.allowed_tp_dp_products + and tp_dp_product not in constraints.allowed_tp_dp_products + ): + continue + if ( + constraints.require_tp_dp_product_equals_gpu_count + and tp_dp_product != study.hardware.gpu_count + ): + continue + return candidate + return None + + def _search_high_saturation_guard( study: StudySpec, state: StudyState, @@ -971,3 +1105,32 @@ def _optional_float(value: Any) -> float | None: if isinstance(value, (int, float)): return float(value) return None + + +def _parse_int_like(value: Any, *, default: int) -> int: + if value is None: + return default + if isinstance(value, bool): + return default + if isinstance(value, int): + return value + if isinstance(value, float) and value.is_integer(): + return int(value) + if isinstance(value, str) and value.strip(): + try: + return int(value.strip()) + except ValueError: + return default + return default + + +def _config_signature(config_patch: Any) -> str: + if not isinstance(config_patch, dict): + config_patch = {} + env_patch = config_patch.get("env_patch") + flag_patch = config_patch.get("flag_patch") + payload = { + "env_patch": env_patch if isinstance(env_patch, dict) else {}, + "flag_patch": flag_patch if isinstance(flag_patch, dict) else {}, + } + return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":")) diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index 7bfa1ec..49463d3 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -13,7 +13,11 @@ from aituner.compare import load_compare_spec, run_compare from aituner.engine import build_launch_recipe from aituner.http_client import _auth_headers, _openai_url, _should_bypass_proxy from aituner.job import append_job, build_trial_job -from aituner.harness import build_harness_context, build_harness_stop_proposal +from aituner.harness import ( + build_harness_context, + build_harness_guided_proposal, + build_harness_stop_proposal, +) from aituner.llm import _extract_response_text, build_prompt, parse_proposal_text, validate_proposal from aituner.search import ThresholdProbe, binary_search_max_feasible from aituner.slo import RequestOutcome, evaluate_request, summarize_evaluations @@ -596,6 +600,75 @@ class CoreFlowTests(unittest.TestCase): self.assertIsNotNone(proposal) self.assertTrue(proposal.should_stop) + def test_harness_guided_first_tp_probe_for_latency_bottleneck(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "tunable_flags": ["tensor-parallel-size", "data-parallel-size"], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [1, 2, 4], + "allowed_data_parallel_sizes": [1, 2], + "allowed_tp_dp_products": [1, 2, 4], + }, + }, + ) + study = load_study_spec(study_path) + result_path = tmp_path / "trial-0001.json" + result_path.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.25, + "best_request_rate": 2.0, + "best_pass_rate": 1.0, + "probes": [ + { + "threshold": 0.5, + "feasible": False, + "payload": { + "request_count": 100, + "pass_rate": 0.6, + "request_rate": 4.0, + "early_stopped": True, + "early_stop_reason": "slo_pass_rate_unrecoverable", + "latency_summary": { + "failed_reason_counts": {"tpot_ms>50.0": 40}, + }, + }, + } + ], + } + ), + encoding="utf-8", + ) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_request_rate=2.0, + best_request_rate_per_gpu=2.0, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + best_request_rate=2.0, + best_request_rate_per_gpu=2.0, + result_path=str(result_path), + config_patch={"env_patch": {}, "flag_patch": {}}, + ) + ], + ) + context = build_harness_context( + study=study, + window_summary={"prompt_tokens_p95": 2048}, + state=state, + ) + proposal = build_harness_guided_proposal(context) + self.assertIsNotNone(proposal) + self.assertEqual(proposal.config_patch.flag_patch, {"tensor-parallel-size": 2}) + self.assertFalse(proposal.should_stop) + def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp)