diff --git a/docs/harness-tuning-progress.md b/docs/harness-tuning-progress.md index bc76205..c8de297 100644 --- a/docs/harness-tuning-progress.md +++ b/docs/harness-tuning-progress.md @@ -24,6 +24,8 @@ Improve AITuner convergence for the `dash0` internal vLLM + Qwen3.5-27B 0-8k cha - Adds TP, max-num-seqs, max-num-batched-tokens, chunked-prefill, and memory-utilization harnesses when those knobs are tunable. - Extracts compact recent trial diagnostics from result JSON files. - Adds a convergence guard based on recent completed trial performance. + - Adds an infeasible-progress guard: when recent all-infeasible trials at the same sampling threshold stop improving pass rate and p95 TTFT after changing one knob family, the next proposal must switch primary family or stop. + - Classifies `slo_pass_rate_unrecoverable` by latency failure counts first, so TTFT-heavy failures stay aligned to prefill/TP or batching harnesses instead of being treated as generic queueing. - Extended `src/aituner/trace.py`. - `summarize_window` now reports L-C-A features. - `TraceRequest` now carries optional metadata for `hash_ids`, turn, parent chat id, and trace type. @@ -43,7 +45,7 @@ Improve AITuner convergence for the `dash0` internal vLLM + Qwen3.5-27B 0-8k cha ## Local Verification - `python3 -m compileall -q src tests`: passed. -- `PYTHONPATH=src python3 -m unittest tests.test_core_flow`: passed, 59 tests. +- `PYTHONPATH=src python3 -m unittest tests.test_core_flow`: passed, 62 tests. - `pytest -q` and `python3 -m pytest -q`: not runnable locally because `pytest` is not installed. ## Remote Experiment Log @@ -82,10 +84,39 @@ Improve AITuner convergence for the `dash0` internal vLLM + Qwen3.5-27B 0-8k cha - This is not aligned with the paper's agentic loop, which evaluates the initial configuration first and then searches from measured feedback. - Action: update `study tune` so LLM-driven studies automatically materialize a baseline empty-patch trial first, unless `--skip-baseline` is passed. This should reduce early bad proposals because the first LLM edit will see real baseline bottleneck diagnostics and an incumbent request_rate_per_gpu. +### 2026-04-25 17:20-18:30 CST + +- r3 started with baseline-first enabled, but the full 0-8k run was too slow for fast iteration with raw chat completions. Stopped it before using it as a convergence signal. +- A fast validation using `max_requests_per_probe=160` was invalid: the trace is downsampled before threshold selection, so lower thresholds can end up with `request_count=0`. Do not use that result for performance claims. +- Prefill smoke v1 used `completion_tokens_override=1` but kept the TPOT SLO. That made TPOT missing failures dominate, so it was useful only for checking control flow, not for performance. + +### 2026-04-25 18:30-20:10 CST + +- Prefill smoke v2 used real dash0 internal vLLM, Qwen3.5-27B, the real 0-8k prompt distribution and arrivals, `completion_tokens_override=1`, and `tpot_rule=null`. +- Trial 0001 baseline TP1/DP1: + - sampling `0.0078125`: pass rate 0.270, mean TTFT 2033.9 ms, p95 TTFT 5656.7 ms, p99 TTFT 6832.8 ms. +- Trial 0002 TP1/DP2: + - sampling `0.0078125`: pass rate 0.277, mean TTFT 1766.9 ms, p95 TTFT 4215.3 ms, p99 TTFT 5801.7 ms. +- Trial 0003 TP1/DP4: + - sampling `0.0078125`: pass rate 0.345, mean TTFT 1668.9 ms, p95 TTFT 3818.4 ms, p99 TTFT 5804.9 ms. +- Trial 0004 TP1/DP8: + - sampling `0.0078125`: pass rate 0.345, mean TTFT 1675.7 ms, p95 TTFT 3823.4 ms. +- Interpretation: + - The harness improved directionality: after the measured baseline, proposals followed a consistent scale-out path and avoided random runtime-knob churn. + - The smoke result improved p95 TTFT by about 32% versus baseline at the low sampling threshold and improved pass rate from 0.270 to 0.345 within 3-4 trials. + - It did not reach the 95% pass-rate SLO in this smoke setting, so this is not a full proof of convergence to a good production config. + - DP8 did not improve over DP4, which exposed a gap: when every trial is infeasible, the prior convergence guard had no feasible incumbent and could not detect plateau. + +### 2026-04-25 20:10 CST + +- Added the all-infeasible plateau guard described above. +- Added unit coverage for: + - TTFT failure classification under `slo_pass_rate_unrecoverable`; + - blocking a repeat of the DP family after DP4 and DP8 show no material improvement at the same sampling threshold. +- Current status: the harness now has the mechanism needed to avoid continuing the exact DP-only direction seen in the smoke v2 plateau. The next real experiment should either switch to a bottleneck-justified mixed TP/DP candidate or return `should_stop=true`. + Remaining next steps: -1. Start a real harness-guided Qwen3.5-27B 0-8k chat tuning run from `configs/examples/dash0_qwen27b_tight_slo_run4_0_8k.json`. -2. Compare the first few iterations against the prior 12-iteration behavior: - - best request rate per GPU should improve or reach the known good region in fewer trials; - - proposals should follow the active bottleneck harness; - - if the incumbent has converged, the LLM should emit `should_stop=true` instead of proposing a weak exploratory config. +1. Push/pull the plateau-guard commit to `dash0`. +2. Re-run the remote unit suite. +3. Start the next real tuning run only after deciding whether to spend a full multi-hour run on the production SLO or a shorter prefill-only confirmation of the new plateau guard. diff --git a/src/aituner/harness.py b/src/aituner/harness.py index 5d725eb..60714fe 100644 --- a/src/aituner/harness.py +++ b/src/aituner/harness.py @@ -267,13 +267,17 @@ def _active_bottleneck(probe: dict[str, Any] | None) -> str: if not compact: return "unknown" early_stop_reason = str(compact.get("early_stop_reason") or "") - if early_stop_reason.startswith("arrival_lag_s>") or early_stop_reason == "slo_pass_rate_unrecoverable": + if early_stop_reason.startswith("arrival_lag_s>"): return "admission_or_queueing" latency_summary = compact.get("latency_summary") if not isinstance(latency_summary, dict): + if early_stop_reason == "slo_pass_rate_unrecoverable": + return "admission_or_queueing" return "unknown" failed = latency_summary.get("failed_reason_counts") if not isinstance(failed, dict) or not failed: + if early_stop_reason == "slo_pass_rate_unrecoverable": + return "admission_or_queueing" return "none_obvious" ttft_count = sum(int(v) for k, v in failed.items() if str(k).startswith("ttft")) tpot_count = sum(int(v) for k, v in failed.items() if str(k).startswith("tpot")) @@ -301,6 +305,7 @@ def _convergence_guard( state: StudyState, recent_diagnostics: list[dict[str, Any]], ) -> dict[str, Any]: + infeasible_progress = _infeasible_progress_guard(recent_diagnostics) completed = [ item for item in recent_diagnostics @@ -320,9 +325,14 @@ def _convergence_guard( reason = "recent_trials_still_show_material_variation" elif state.best_request_rate_per_gpu is not None: reason = "need_more_evidence_before_stop" + if not should_stop and infeasible_progress["plateau_detected"]: + reason = str(infeasible_progress["reason"]) return { - "should_stop_if_no_harness_can_justify_a_new_adjacent_probe": should_stop, + "should_stop_if_no_harness_can_justify_a_new_adjacent_probe": ( + should_stop or bool(infeasible_progress["stop_if_next_probe_repeats_family"]) + ), "reason": reason, + "infeasible_progress": infeasible_progress, "incumbent": { "trial_id": state.best_trial_id, "parallel_size": state.best_parallel_size, @@ -333,11 +343,136 @@ def _convergence_guard( } +def _infeasible_progress_guard(recent_diagnostics: list[dict[str, Any]]) -> dict[str, Any]: + points = [ + point + for item in recent_diagnostics + if (point := _infeasible_progress_point(item)) is not None + ] + default = { + "plateau_detected": False, + "stop_if_next_probe_repeats_family": False, + "reason": "insufficient_infeasible_history", + "blocked_primary_family": None, + "recommended_next_action": "continue_adjacent_probe_if_a_harness_justifies_it", + "comparison": None, + } + if len(points) < 2: + return default + + prev, latest = points[-2], points[-1] + if not _same_threshold(prev["threshold"], latest["threshold"]): + return { + **default, + "reason": "recent_infeasible_trials_used_different_sampling_thresholds", + } + family = _changed_primary_family(prev["config_patch"], latest["config_patch"]) + pass_rate_delta = latest["pass_rate"] - prev["pass_rate"] + p95_delta_ratio = _relative_delta(latest["ttft_p95_ms"], prev["ttft_p95_ms"]) + p99_delta_ratio = _relative_delta(latest["ttft_p99_ms"], prev["ttft_p99_ms"]) + plateau = ( + family is not None + and abs(pass_rate_delta) <= 0.01 + and p95_delta_ratio is not None + and p95_delta_ratio >= -0.02 + ) + comparison = { + "previous": prev, + "latest": latest, + "changed_primary_family": family, + "pass_rate_delta": pass_rate_delta, + "ttft_p95_delta_ratio": p95_delta_ratio, + "ttft_p99_delta_ratio": p99_delta_ratio, + } + if not plateau: + return { + **default, + "reason": "recent_infeasible_trials_still_show_material_progress", + "comparison": comparison, + } + return { + "plateau_detected": True, + "stop_if_next_probe_repeats_family": True, + "reason": f"{family}_plateau_on_infeasible_trials", + "blocked_primary_family": family, + "recommended_next_action": ( + "switch_primary_family_with_a_specific_bottleneck_rationale_or_return_should_stop" + ), + "comparison": comparison, + } + + +def _infeasible_progress_point(item: dict[str, Any]) -> dict[str, Any] | None: + if item.get("status") != "completed" or item.get("best_request_rate") is not None: + return None + probe_summary = item.get("probe_summary") + if not isinstance(probe_summary, dict): + return None + probe = probe_summary.get("all_infeasible") or probe_summary.get("last_probe") + compact = _compact_probe(probe if isinstance(probe, dict) else None) + if not compact or compact.get("feasible"): + return None + latency_summary = compact.get("latency_summary") + if not isinstance(latency_summary, dict): + latency_summary = {} + ttft = latency_summary.get("ttft_ms") + if not isinstance(ttft, dict): + ttft = {} + return { + "trial_id": item.get("trial_id"), + "threshold": _as_float(compact.get("threshold")), + "request_rate": _as_float(compact.get("request_rate")), + "pass_rate": _as_float(compact.get("pass_rate")), + "ttft_p95_ms": _optional_float(ttft.get("p95")), + "ttft_p99_ms": _optional_float(ttft.get("p99")), + "active_bottleneck": item.get("active_bottleneck"), + "config_patch": item.get("config_patch") if isinstance(item.get("config_patch"), dict) else {}, + } + + +def _changed_primary_family( + previous_patch: dict[str, Any], + latest_patch: dict[str, Any], +) -> str | None: + previous_flags = previous_patch.get("flag_patch") if isinstance(previous_patch, dict) else {} + latest_flags = latest_patch.get("flag_patch") if isinstance(latest_patch, dict) else {} + if not isinstance(previous_flags, dict) or not isinstance(latest_flags, dict): + return None + changed: list[str] = [] + for key, family in ( + ("tensor-parallel-size", "tensor-parallel-size"), + ("data-parallel-size", "data-parallel-size"), + ("expert-parallel-size", "expert-parallel-size"), + ("max-num-seqs", "max-num-seqs"), + ("max-num-batched-tokens", "max-num-batched-tokens"), + ("gpu-memory-utilization", "gpu-memory-utilization"), + ("enable-chunked-prefill", "enable-chunked-prefill"), + ): + if previous_flags.get(key) != latest_flags.get(key): + changed.append(family) + if len(changed) == 1: + return changed[0] + if changed: + return "mixed:" + ",".join(changed) + return None + + +def _same_threshold(left: float, right: float) -> bool: + return abs(left - right) <= max(abs(left), abs(right), 1.0) * 1e-6 + + +def _relative_delta(new: float | None, old: float | None) -> float | None: + if new is None or old is None or old <= 0: + return None + return (new - old) / old + + def _proposal_rules() -> list[str]: return [ "First decide the active bottleneck from recent_trial_diagnostics.", "Pick at most one primary knob family from knob_harnesses unless the history proves a coupled change is needed.", "Use adjacent legal values around the incumbent; avoid broad exploratory jumps.", + "If infeasible_progress blocks the last primary knob family, do not continue that family; switch families with direct bottleneck evidence or set should_stop=true.", "If a proposed config is likely to reduce request_rate_per_gpu under the active guard, set should_stop=true instead of exploring.", "Never repeat an already tested config signature.", "Return should_stop=true when the convergence guard fires and no active harness justifies another adjacent probe.", @@ -388,3 +523,11 @@ def _as_float(value: Any) -> float: if isinstance(value, (int, float)): return float(value) return 0.0 + + +def _optional_float(value: Any) -> float | None: + if isinstance(value, bool): + return None + if isinstance(value, (int, float)): + return float(value) + return None diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index 351efd3..b056f0f 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -13,6 +13,7 @@ from aituner.compare import load_compare_spec, run_compare from aituner.engine import build_launch_recipe from aituner.http_client import _auth_headers, _openai_url, _should_bypass_proxy from aituner.job import append_job, build_trial_job +from aituner.harness import build_harness_context from aituner.llm import _extract_response_text, build_prompt, parse_proposal_text, validate_proposal from aituner.search import ThresholdProbe, binary_search_max_feasible from aituner.slo import RequestOutcome, evaluate_request, summarize_evaluations @@ -226,6 +227,143 @@ class CoreFlowTests(unittest.TestCase): self.assertIn("knob_harnesses", prompt) self.assertTrue(study_root.exists()) + def test_harness_uses_latency_failures_before_generic_unrecoverable(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets(tmp_path) + study = load_study_spec(study_path) + result_path = tmp_path / "trial-result.json" + result_path.write_text( + json.dumps( + { + "status": "completed", + "probes": [ + { + "threshold": 0.25, + "feasible": False, + "payload": { + "request_count": 100, + "pass_rate": 0.3, + "request_rate": 1.0, + "early_stopped": True, + "early_stop_reason": "slo_pass_rate_unrecoverable", + "latency_summary": { + "failed_reason_counts": { + "ttft_ms>5000.0": 70, + "tpot_ms>50.0": 5, + }, + "ttft_ms": {"p95": 6500.0, "p99": 7200.0}, + }, + }, + } + ], + } + ), + encoding="utf-8", + ) + state = StudyState( + study_id=study.study_id, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + result_path=str(result_path), + config_patch={"env_patch": {}, "flag_patch": {}}, + ) + ], + ) + context = build_harness_context( + study=study, + window_summary={ + "prompt_tokens_p95": 5000, + "prompt_tail_ratio_p95_p50": 3.0, + }, + state=state, + ) + self.assertEqual( + context["recent_trial_diagnostics"][0]["active_bottleneck"], + "ttft_prefill", + ) + + def test_harness_blocks_repeating_infeasible_plateau_family(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "expert-parallel-size", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [1, 2, 4], + "allowed_data_parallel_sizes": [1, 2, 4, 8], + "allowed_expert_parallel_sizes": [1], + "allowed_tp_dp_products": [1, 2, 4, 8], + }, + }, + ) + study = load_study_spec(study_path) + trial_summaries = [] + for index, (dp, pass_rate, p95) in enumerate( + [(4, 0.345, 3818.4), (8, 0.345, 3823.4)], start=3 + ): + result_path = tmp_path / f"trial-{index:04d}.json" + result_path.write_text( + json.dumps( + { + "status": "completed", + "best_request_rate": None, + "all_infeasible_diagnostics": { + "threshold": 0.0078125, + "request_count": 148, + "request_rate": 0.22, + "pass_rate": pass_rate, + "early_stopped": True, + "early_stop_reason": "elapsed", + "latency_summary": { + "failed_reason_counts": {"ttft_ms>5000.0": 97}, + "ttft_ms": {"p95": p95, "p99": 5800.0}, + }, + }, + } + ), + encoding="utf-8", + ) + trial_summaries.append( + TrialSummary( + trial_id=f"trial-{index:04d}", + status="completed", + result_path=str(result_path), + config_patch={ + "env_patch": {}, + "flag_patch": { + "tensor-parallel-size": 1, + "data-parallel-size": dp, + "expert-parallel-size": 1, + }, + }, + ) + ) + context = build_harness_context( + study=study, + window_summary={ + "prompt_tokens_p95": 7628, + "prompt_tail_ratio_p95_p50": 3.83, + }, + state=StudyState(study_id=study.study_id, trials=trial_summaries), + ) + guard = context["convergence_guard"]["infeasible_progress"] + self.assertTrue(guard["plateau_detected"]) + self.assertTrue(guard["stop_if_next_probe_repeats_family"]) + self.assertEqual(guard["blocked_primary_family"], "data-parallel-size") + self.assertTrue( + context["convergence_guard"][ + "should_stop_if_no_harness_can_justify_a_new_adjacent_probe" + ] + ) + def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp)