From 825d3e03e9016f9658d0c528e055951c3c974d64 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Fri, 26 Jun 2026 22:02:09 +0800 Subject: [PATCH] Add harness candidate set audit --- docs/aituner-roadmap.md | 4 + src/aituner/harness.py | 214 +++++++++++++++++++++++++++++++++++++++- tests/test_core_flow.py | 28 ++++++ 3 files changed, 245 insertions(+), 1 deletion(-) diff --git a/docs/aituner-roadmap.md b/docs/aituner-roadmap.md index 0dd0f00..11071e2 100644 --- a/docs/aituner-roadmap.md +++ b/docs/aituner-roadmap.md @@ -95,6 +95,10 @@ declarative intervention grammar + coverage-relative validator。 最低验收: - CandidateSet 完整枚举并持久化 snapshot; +- CandidateSet v1 先限定为当前 harness generator 实际构造出的 concrete candidates, + 不 claim 全 Cartesian knob space 枚举;`candidate_set_hash`、eligible/blocked + records 和 blocked reason summary 已在 harness context 中实现,独立 sidecar JSON + persistence 是下一片; - `harness_priority` 与 backend ranking 分离; - CoverageUnit 结构化,stop 不能只依赖 exact signature; - `search_high_saturated_by_incumbent` 不能绕过 CandidateSet coverage;对 `req/s/GPU` diff --git a/src/aituner/harness.py b/src/aituner/harness.py index 88f2eb4..f175afa 100644 --- a/src/aituner/harness.py +++ b/src/aituner/harness.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import hashlib from pathlib import Path from typing import Any @@ -1061,7 +1062,7 @@ def _experiment_plan( for item in recent_diagnostics } tested_signatures.update(_state_tested_signatures(study, state)) - candidates = _candidate_actions( + candidate_set = _candidate_action_set( study, window_summary, state, @@ -1070,12 +1071,18 @@ def _experiment_plan( bottleneck_hypotheses, tested_signatures=tested_signatures, ) + candidates = list(candidate_set["eligible_candidates"]) candidates.sort(key=lambda item: _as_float(item.get("score")), reverse=True) next_action = candidates[0] if candidates else None return { "planner_version": "profile-driven-v1", "bottleneck_hypotheses": bottleneck_hypotheses, "candidate_actions": candidates[:8], + "candidate_set": { + **candidate_set, + "eligible_candidates": candidates[:8], + "blocked_candidates": candidate_set["blocked_candidates"][:16], + }, "next_action": next_action, "stop_ready": next_action is None or _as_float(next_action.get("score")) < 0.35, "stop_rationale": ( @@ -1086,6 +1093,46 @@ def _experiment_plan( } +def _candidate_action_set( + study: StudySpec, + window_summary: dict[str, Any], + state: StudyState, + recent_diagnostics: list[dict[str, Any]], + trial_profiles: list[dict[str, Any]], + bottleneck_hypotheses: list[dict[str, Any]], + *, + tested_signatures: set[str], +) -> dict[str, Any]: + blocked_candidates: list[dict[str, Any]] = [] + eligible_candidates = _candidate_actions( + study, + window_summary, + state, + recent_diagnostics, + trial_profiles, + bottleneck_hypotheses, + tested_signatures=tested_signatures, + blocked_candidates=blocked_candidates, + ) + _annotate_candidate_signatures(study, eligible_candidates) + eligible_candidates.sort(key=lambda item: _as_float(item.get("score")), reverse=True) + blocked_summary = _blocked_reason_summary(blocked_candidates) + candidate_set_hash = _candidate_set_hash(eligible_candidates, blocked_candidates) + return { + "version": "candidate-set-v1", + "signature_semantics": ( + "effective_config_fingerprint = sha256(normalized effective full-config JSON)" + ), + "candidate_set_hash": candidate_set_hash, + "tested_signature_count": len(tested_signatures), + "eligible_count": len(eligible_candidates), + "blocked_count": len(blocked_candidates), + "blocked_reason_summary": blocked_summary, + "eligible_candidates": eligible_candidates, + "blocked_candidates": blocked_candidates, + } + + def _candidate_actions( study: StudySpec, window_summary: dict[str, Any], @@ -1095,6 +1142,7 @@ def _candidate_actions( bottleneck_hypotheses: list[dict[str, Any]], *, tested_signatures: set[str], + blocked_candidates: list[dict[str, Any]], ) -> list[dict[str, Any]]: if not recent_diagnostics: return [] @@ -1114,6 +1162,7 @@ def _candidate_actions( top_bottleneck, bottleneck_hypotheses, tested_signatures, + blocked_candidates, ) ) candidates.extend( @@ -1125,6 +1174,7 @@ def _candidate_actions( bottleneck_hypotheses, recent_diagnostics, tested_signatures, + blocked_candidates, ) ) return candidates @@ -1152,6 +1202,7 @@ def _topology_candidate_actions( top_bottleneck: str, bottleneck_hypotheses: list[dict[str, Any]], tested_signatures: set[str], + blocked_candidates: list[dict[str, Any]], ) -> list[dict[str, Any]]: if not ({"tensor-parallel-size", "data-parallel-size"} & set(study.engine.tunable_flags)): return [] @@ -1191,6 +1242,15 @@ def _topology_candidate_actions( patch = _topology_patch(study, point) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) if signature in tested_signatures: + blocked_candidates.append( + _blocked_candidate( + action_id=_topology_action_id(current_tp, current_dp, point), + knob_family="topology", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_noop_or_repeat_effective_full_config", + effective_config_signature=signature, + ) + ) continue score, factors = _score_topology_candidate( top_bottleneck, @@ -1210,6 +1270,17 @@ def _topology_candidate_actions( score = max(score, 0.74) factors["bad_start_topology_bracket"] = 0.74 if score <= 0: + blocked_candidates.append( + _blocked_candidate( + action_id=_topology_action_id(current_tp, current_dp, point), + knob_family="topology", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_non_positive_candidate_score", + effective_config_signature=signature, + score=round(score, 4), + score_factors=factors, + ) + ) continue action_id = _topology_action_id(current_tp, current_dp, point) actions.append( @@ -1244,6 +1315,7 @@ def _runtime_candidate_actions( bottleneck_hypotheses: list[dict[str, Any]], recent_diagnostics: list[dict[str, Any]], tested_signatures: set[str], + blocked_candidates: list[dict[str, Any]], ) -> list[dict[str, Any]]: tunable = set(study.engine.tunable_flags) anchor_flags = _effective_flags_for_item(study, anchor) @@ -1285,6 +1357,15 @@ def _runtime_candidate_actions( patch = {**runtime_base_patch, "max-num-batched-tokens": target} signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) if signature in tested_signatures: + blocked_candidates.append( + _blocked_candidate( + action_id=action_id, + knob_family="max-num-batched-tokens", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_noop_or_repeat_effective_full_config", + effective_config_signature=signature, + ) + ) continue relief = 0.24 if top_bottleneck == "ttft_prefill" else 0.14 actions.append( @@ -1341,6 +1422,15 @@ def _runtime_candidate_actions( patch = {**runtime_base_patch, "max-num-seqs": target} signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) if signature in tested_signatures: + blocked_candidates.append( + _blocked_candidate( + action_id=action_id, + knob_family="max-num-seqs", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_noop_or_repeat_effective_full_config", + effective_config_signature=signature, + ) + ) continue if top_bottleneck in {"decode_tpot", "admission_or_queueing"}: relief = 0.25 @@ -1415,6 +1505,16 @@ def _runtime_candidate_actions( ], ) ) + else: + blocked_candidates.append( + _blocked_candidate( + action_id="raise_mbt_and_max_num_seqs", + knob_family="prefill-runtime-interaction", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_noop_or_repeat_effective_full_config", + effective_config_signature=signature, + ) + ) if "enable-chunked-prefill" in tunable and top_bottleneck == "ttft_prefill": current = bool(anchor_flags.get("enable-chunked-prefill", False)) @@ -1438,6 +1538,16 @@ def _runtime_candidate_actions( ], ) ) + else: + blocked_candidates.append( + _blocked_candidate( + action_id="enable_chunked_prefill", + knob_family="enable-chunked-prefill", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_noop_or_repeat_effective_full_config", + effective_config_signature=signature, + ) + ) if ( "gpu-memory-utilization" in tunable @@ -1469,6 +1579,16 @@ def _runtime_candidate_actions( ], ) ) + else: + blocked_candidates.append( + _blocked_candidate( + action_id="raise_gpu_memory_utilization", + knob_family="gpu-memory-utilization", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_noop_or_repeat_effective_full_config", + effective_config_signature=signature, + ) + ) return actions @@ -1540,6 +1660,98 @@ def _runtime_action( } +def _annotate_candidate_signatures( + study: StudySpec, + candidates: list[dict[str, Any]], +) -> None: + for candidate in candidates: + if not isinstance(candidate, dict): + continue + patch = candidate.get("config_patch") + if not isinstance(patch, dict): + patch = {"env_patch": {}, "flag_patch": {}} + signature = _effective_config_signature(study, patch) + fingerprint = _effective_config_fingerprint(signature) + candidate["status"] = "eligible" + candidate["candidate_id"] = _candidate_id(candidate.get("action_id"), fingerprint) + candidate["effective_config_fingerprint"] = fingerprint + + +def _blocked_candidate( + *, + action_id: str, + knob_family: str, + config_patch: dict[str, Any], + blocked_reason: str, + effective_config_signature: str, + score: float | None = None, + score_factors: dict[str, Any] | None = None, +) -> dict[str, Any]: + fingerprint = _effective_config_fingerprint(effective_config_signature) + item: dict[str, Any] = { + "candidate_id": _candidate_id(action_id, fingerprint), + "action_id": action_id, + "knob_family": knob_family, + "status": "blocked", + "blocked_reason": blocked_reason, + "blocked_reasons": [{"code": blocked_reason}], + "config_patch": _normalized_config_patch(config_patch), + "effective_config_fingerprint": fingerprint, + } + if score is not None: + item["score"] = score + if score_factors is not None: + item["score_factors"] = score_factors + return item + + +def _candidate_id(action_id: Any, fingerprint: str) -> str: + action = str(action_id or "candidate").strip() or "candidate" + return f"{action}:{fingerprint[:12]}" + + +def _effective_config_fingerprint(effective_config_signature: str) -> str: + return hashlib.sha256(effective_config_signature.encode("utf-8")).hexdigest() + + +def _blocked_reason_summary(blocked_candidates: list[dict[str, Any]]) -> dict[str, int]: + summary: dict[str, int] = {} + for item in blocked_candidates: + reason = str(item.get("blocked_reason") or "unknown") + summary[reason] = summary.get(reason, 0) + 1 + return summary + + +def _candidate_set_hash( + eligible_candidates: list[dict[str, Any]], + blocked_candidates: list[dict[str, Any]], +) -> str: + records: list[dict[str, Any]] = [] + for status, candidates in ( + ("eligible", eligible_candidates), + ("blocked", blocked_candidates), + ): + for item in candidates: + records.append( + { + "status": status, + "candidate_id": item.get("candidate_id"), + "action_id": item.get("action_id"), + "knob_family": item.get("knob_family"), + "effective_config_fingerprint": item.get("effective_config_fingerprint"), + "blocked_reason": item.get("blocked_reason"), + "score": item.get("score"), + } + ) + payload = json.dumps( + records, + ensure_ascii=False, + sort_keys=True, + separators=(",", ":"), + ) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + def _anchor_has_topology_patch(anchor: dict[str, Any]) -> bool: patch = anchor.get("config_patch") if not isinstance(patch, dict): diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index 2f8a330..49e0ac2 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import hashlib import contextlib import io import math @@ -1803,6 +1804,33 @@ class CoreFlowTests(unittest.TestCase): window_summary={"prompt_tokens_p95": 2048}, state=state, ) + candidate_set = context["experiment_plan"]["candidate_set"] + self.assertEqual(candidate_set["version"], "candidate-set-v1") + self.assertIn("candidate_set_hash", candidate_set) + self.assertGreaterEqual( + candidate_set["blocked_reason_summary"].get( + "blocked_noop_or_repeat_effective_full_config", + 0, + ), + 1, + ) + baseline_fingerprint = hashlib.sha256( + _effective_config_signature( + study, + {"env_patch": {}, "flag_patch": {}}, + ).encode("utf-8") + ).hexdigest() + blocked_baseline_equivalent = [ + item + for item in candidate_set["blocked_candidates"] + if item.get("effective_config_fingerprint") == baseline_fingerprint + ] + self.assertTrue(blocked_baseline_equivalent) + self.assertEqual( + blocked_baseline_equivalent[0]["blocked_reason"], + "blocked_noop_or_repeat_effective_full_config", + ) + self.assertIn("effective_config_fingerprint", blocked_baseline_equivalent[0]) actions = context["experiment_plan"]["candidate_actions"] self.assertFalse( any(