Add harness candidate set audit

This commit is contained in:
2026-06-26 22:02:09 +08:00
parent 42f75553a6
commit 825d3e03e9
3 changed files with 245 additions and 1 deletions

View File

@@ -95,6 +95,10 @@ declarative intervention grammar + coverage-relative validator。
最低验收: 最低验收:
- CandidateSet 完整枚举并持久化 snapshot - CandidateSet 完整枚举并持久化 snapshot
- CandidateSet v1 先限定为当前 harness generator 实际构造出的 concrete candidates,
  不 claim 全 Cartesian knob space 枚举;`candidate_set_hash`、eligible/blocked
  records 和 blocked reason summary 已在 harness context 中实现,独立 sidecar JSON
  persistence 是下一片;
- `harness_priority` 与 backend ranking 分离; - `harness_priority` 与 backend ranking 分离;
- CoverageUnit 结构化stop 不能只依赖 exact signature - CoverageUnit 结构化stop 不能只依赖 exact signature
- `search_high_saturated_by_incumbent` 不能绕过 CandidateSet coverage`req/s/GPU` - `search_high_saturated_by_incumbent` 不能绕过 CandidateSet coverage`req/s/GPU`

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import json import json
import hashlib
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -1061,7 +1062,7 @@ def _experiment_plan(
for item in recent_diagnostics for item in recent_diagnostics
} }
tested_signatures.update(_state_tested_signatures(study, state)) tested_signatures.update(_state_tested_signatures(study, state))
candidates = _candidate_actions( candidate_set = _candidate_action_set(
study, study,
window_summary, window_summary,
state, state,
@@ -1070,12 +1071,18 @@ def _experiment_plan(
bottleneck_hypotheses, bottleneck_hypotheses,
tested_signatures=tested_signatures, tested_signatures=tested_signatures,
) )
candidates = list(candidate_set["eligible_candidates"])
candidates.sort(key=lambda item: _as_float(item.get("score")), reverse=True) candidates.sort(key=lambda item: _as_float(item.get("score")), reverse=True)
next_action = candidates[0] if candidates else None next_action = candidates[0] if candidates else None
return { return {
"planner_version": "profile-driven-v1", "planner_version": "profile-driven-v1",
"bottleneck_hypotheses": bottleneck_hypotheses, "bottleneck_hypotheses": bottleneck_hypotheses,
"candidate_actions": candidates[:8], "candidate_actions": candidates[:8],
"candidate_set": {
**candidate_set,
"eligible_candidates": candidates[:8],
"blocked_candidates": candidate_set["blocked_candidates"][:16],
},
"next_action": next_action, "next_action": next_action,
"stop_ready": next_action is None or _as_float(next_action.get("score")) < 0.35, "stop_ready": next_action is None or _as_float(next_action.get("score")) < 0.35,
"stop_rationale": ( "stop_rationale": (
@@ -1086,6 +1093,46 @@ def _experiment_plan(
} }
def _candidate_action_set(
    study: StudySpec,
    window_summary: dict[str, Any],
    state: StudyState,
    recent_diagnostics: list[dict[str, Any]],
    trial_profiles: list[dict[str, Any]],
    bottleneck_hypotheses: list[dict[str, Any]],
    *,
    tested_signatures: set[str],
) -> dict[str, Any]:
    """Build the ``candidate-set-v1`` audit record for one planning step.

    Runs the candidate generator once, collecting both the candidates it
    returns (eligible) and the ones it rejects (blocked), then annotates,
    orders and hashes them.

    Args:
        study, window_summary, state, recent_diagnostics, trial_profiles,
        bottleneck_hypotheses: Forwarded unchanged to ``_candidate_actions``.
        tested_signatures: Effective-config signatures that were already
            tried; forwarded to the generator and reported as a count.

    Returns:
        A dict with the set version, the signature semantics note, the
        candidate-set hash, counts, the per-reason blocked summary, and the
        full (untruncated) eligible and blocked candidate lists.
    """
    # The generator appends rejected candidates to this list in place.
    blocked_candidates: list[dict[str, Any]] = []
    eligible_candidates = _candidate_actions(
        study,
        window_summary,
        state,
        recent_diagnostics,
        trial_profiles,
        bottleneck_hypotheses,
        tested_signatures=tested_signatures,
        blocked_candidates=blocked_candidates,
    )
    # Adds status / candidate_id / effective_config_fingerprint in place.
    _annotate_candidate_signatures(study, eligible_candidates)
    # Highest score first; the hash below is computed over this ordering.
    eligible_candidates.sort(
        key=lambda item: _as_float(item.get("score")),
        reverse=True,
    )
    return {
        "version": "candidate-set-v1",
        "signature_semantics": (
            "effective_config_fingerprint = sha256(normalized effective full-config JSON)"
        ),
        "candidate_set_hash": _candidate_set_hash(eligible_candidates, blocked_candidates),
        "tested_signature_count": len(tested_signatures),
        "eligible_count": len(eligible_candidates),
        "blocked_count": len(blocked_candidates),
        "blocked_reason_summary": _blocked_reason_summary(blocked_candidates),
        "eligible_candidates": eligible_candidates,
        "blocked_candidates": blocked_candidates,
    }
def _candidate_actions( def _candidate_actions(
study: StudySpec, study: StudySpec,
window_summary: dict[str, Any], window_summary: dict[str, Any],
@@ -1095,6 +1142,7 @@ def _candidate_actions(
bottleneck_hypotheses: list[dict[str, Any]], bottleneck_hypotheses: list[dict[str, Any]],
*, *,
tested_signatures: set[str], tested_signatures: set[str],
blocked_candidates: list[dict[str, Any]],
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
if not recent_diagnostics: if not recent_diagnostics:
return [] return []
@@ -1114,6 +1162,7 @@ def _candidate_actions(
top_bottleneck, top_bottleneck,
bottleneck_hypotheses, bottleneck_hypotheses,
tested_signatures, tested_signatures,
blocked_candidates,
) )
) )
candidates.extend( candidates.extend(
@@ -1125,6 +1174,7 @@ def _candidate_actions(
bottleneck_hypotheses, bottleneck_hypotheses,
recent_diagnostics, recent_diagnostics,
tested_signatures, tested_signatures,
blocked_candidates,
) )
) )
return candidates return candidates
@@ -1152,6 +1202,7 @@ def _topology_candidate_actions(
top_bottleneck: str, top_bottleneck: str,
bottleneck_hypotheses: list[dict[str, Any]], bottleneck_hypotheses: list[dict[str, Any]],
tested_signatures: set[str], tested_signatures: set[str],
blocked_candidates: list[dict[str, Any]],
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
if not ({"tensor-parallel-size", "data-parallel-size"} & set(study.engine.tunable_flags)): if not ({"tensor-parallel-size", "data-parallel-size"} & set(study.engine.tunable_flags)):
return [] return []
@@ -1191,6 +1242,15 @@ def _topology_candidate_actions(
patch = _topology_patch(study, point) patch = _topology_patch(study, point)
signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature in tested_signatures: if signature in tested_signatures:
blocked_candidates.append(
_blocked_candidate(
action_id=_topology_action_id(current_tp, current_dp, point),
knob_family="topology",
config_patch={"env_patch": {}, "flag_patch": patch},
blocked_reason="blocked_noop_or_repeat_effective_full_config",
effective_config_signature=signature,
)
)
continue continue
score, factors = _score_topology_candidate( score, factors = _score_topology_candidate(
top_bottleneck, top_bottleneck,
@@ -1210,6 +1270,17 @@ def _topology_candidate_actions(
score = max(score, 0.74) score = max(score, 0.74)
factors["bad_start_topology_bracket"] = 0.74 factors["bad_start_topology_bracket"] = 0.74
if score <= 0: if score <= 0:
blocked_candidates.append(
_blocked_candidate(
action_id=_topology_action_id(current_tp, current_dp, point),
knob_family="topology",
config_patch={"env_patch": {}, "flag_patch": patch},
blocked_reason="blocked_non_positive_candidate_score",
effective_config_signature=signature,
score=round(score, 4),
score_factors=factors,
)
)
continue continue
action_id = _topology_action_id(current_tp, current_dp, point) action_id = _topology_action_id(current_tp, current_dp, point)
actions.append( actions.append(
@@ -1244,6 +1315,7 @@ def _runtime_candidate_actions(
bottleneck_hypotheses: list[dict[str, Any]], bottleneck_hypotheses: list[dict[str, Any]],
recent_diagnostics: list[dict[str, Any]], recent_diagnostics: list[dict[str, Any]],
tested_signatures: set[str], tested_signatures: set[str],
blocked_candidates: list[dict[str, Any]],
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
tunable = set(study.engine.tunable_flags) tunable = set(study.engine.tunable_flags)
anchor_flags = _effective_flags_for_item(study, anchor) anchor_flags = _effective_flags_for_item(study, anchor)
@@ -1285,6 +1357,15 @@ def _runtime_candidate_actions(
patch = {**runtime_base_patch, "max-num-batched-tokens": target} patch = {**runtime_base_patch, "max-num-batched-tokens": target}
signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature in tested_signatures: if signature in tested_signatures:
blocked_candidates.append(
_blocked_candidate(
action_id=action_id,
knob_family="max-num-batched-tokens",
config_patch={"env_patch": {}, "flag_patch": patch},
blocked_reason="blocked_noop_or_repeat_effective_full_config",
effective_config_signature=signature,
)
)
continue continue
relief = 0.24 if top_bottleneck == "ttft_prefill" else 0.14 relief = 0.24 if top_bottleneck == "ttft_prefill" else 0.14
actions.append( actions.append(
@@ -1341,6 +1422,15 @@ def _runtime_candidate_actions(
patch = {**runtime_base_patch, "max-num-seqs": target} patch = {**runtime_base_patch, "max-num-seqs": target}
signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature in tested_signatures: if signature in tested_signatures:
blocked_candidates.append(
_blocked_candidate(
action_id=action_id,
knob_family="max-num-seqs",
config_patch={"env_patch": {}, "flag_patch": patch},
blocked_reason="blocked_noop_or_repeat_effective_full_config",
effective_config_signature=signature,
)
)
continue continue
if top_bottleneck in {"decode_tpot", "admission_or_queueing"}: if top_bottleneck in {"decode_tpot", "admission_or_queueing"}:
relief = 0.25 relief = 0.25
@@ -1415,6 +1505,16 @@ def _runtime_candidate_actions(
], ],
) )
) )
else:
blocked_candidates.append(
_blocked_candidate(
action_id="raise_mbt_and_max_num_seqs",
knob_family="prefill-runtime-interaction",
config_patch={"env_patch": {}, "flag_patch": patch},
blocked_reason="blocked_noop_or_repeat_effective_full_config",
effective_config_signature=signature,
)
)
if "enable-chunked-prefill" in tunable and top_bottleneck == "ttft_prefill": if "enable-chunked-prefill" in tunable and top_bottleneck == "ttft_prefill":
current = bool(anchor_flags.get("enable-chunked-prefill", False)) current = bool(anchor_flags.get("enable-chunked-prefill", False))
@@ -1438,6 +1538,16 @@ def _runtime_candidate_actions(
], ],
) )
) )
else:
blocked_candidates.append(
_blocked_candidate(
action_id="enable_chunked_prefill",
knob_family="enable-chunked-prefill",
config_patch={"env_patch": {}, "flag_patch": patch},
blocked_reason="blocked_noop_or_repeat_effective_full_config",
effective_config_signature=signature,
)
)
if ( if (
"gpu-memory-utilization" in tunable "gpu-memory-utilization" in tunable
@@ -1469,6 +1579,16 @@ def _runtime_candidate_actions(
], ],
) )
) )
else:
blocked_candidates.append(
_blocked_candidate(
action_id="raise_gpu_memory_utilization",
knob_family="gpu-memory-utilization",
config_patch={"env_patch": {}, "flag_patch": patch},
blocked_reason="blocked_noop_or_repeat_effective_full_config",
effective_config_signature=signature,
)
)
return actions return actions
@@ -1540,6 +1660,98 @@ def _runtime_action(
} }
def _annotate_candidate_signatures(
    study: StudySpec,
    candidates: list[dict[str, Any]],
) -> None:
    """Stamp each eligible candidate with its identity fields, in place.

    For every dict in ``candidates`` this sets ``status`` to ``"eligible"``
    and adds ``candidate_id`` and ``effective_config_fingerprint``, both
    derived from the candidate's ``config_patch``. Entries that are not
    dicts are left untouched.

    Args:
        study: Study spec passed through to ``_effective_config_signature``.
        candidates: Candidate records to annotate; mutated in place.
    """
    for candidate in candidates:
        if not isinstance(candidate, dict):
            continue
        patch = candidate.get("config_patch")
        if not isinstance(patch, dict):
            # A missing or malformed patch is treated as an empty patch.
            patch = {"env_patch": {}, "flag_patch": {}}
        fingerprint = _effective_config_fingerprint(
            _effective_config_signature(study, patch)
        )
        candidate["status"] = "eligible"
        candidate["candidate_id"] = _candidate_id(candidate.get("action_id"), fingerprint)
        candidate["effective_config_fingerprint"] = fingerprint
def _blocked_candidate(
    *,
    action_id: str,
    knob_family: str,
    config_patch: dict[str, Any],
    blocked_reason: str,
    effective_config_signature: str,
    score: float | None = None,
    score_factors: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the audit record for a candidate the generator rejected.

    Args:
        action_id: Identifier of the action that would have been proposed.
        knob_family: Knob group the action belongs to.
        config_patch: Patch the action would have applied; stored in
            normalized form via ``_normalized_config_patch``.
        blocked_reason: Reason code, stored both as ``blocked_reason`` and
            as the single entry of ``blocked_reasons``.
        effective_config_signature: Signature string that is hashed into
            the record's fingerprint.
        score: Optional score; the key is omitted when ``None``.
        score_factors: Optional score breakdown; the key is omitted when
            ``None``.

    Returns:
        A dict with ``status`` set to ``"blocked"`` and the fields above.
    """
    fingerprint = _effective_config_fingerprint(effective_config_signature)
    record: dict[str, Any] = {
        "candidate_id": _candidate_id(action_id, fingerprint),
        "action_id": action_id,
        "knob_family": knob_family,
        "status": "blocked",
        "blocked_reason": blocked_reason,
        "blocked_reasons": [{"code": blocked_reason}],
        "config_patch": _normalized_config_patch(config_patch),
        "effective_config_fingerprint": fingerprint,
    }
    # Optional fields are only present when the caller supplied them.
    if score is not None:
        record["score"] = score
    if score_factors is not None:
        record["score_factors"] = score_factors
    return record
def _candidate_id(action_id: Any, fingerprint: str) -> str:
    """Return ``"<action>:<first 12 chars of fingerprint>"``.

    Args:
        action_id: Action identifier; converted with ``str`` and stripped.
            A falsy or whitespace-only value falls back to ``"candidate"``.
        fingerprint: Fingerprint string; only its first 12 characters are
            used.
    """
    action = str(action_id or "candidate").strip() or "candidate"
    return f"{action}:{fingerprint[:12]}"
def _effective_config_fingerprint(effective_config_signature: str) -> str:
    """Return the SHA-256 hex digest of the UTF-8 encoded signature string."""
    encoded = effective_config_signature.encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()
def _blocked_reason_summary(blocked_candidates: list[dict[str, Any]]) -> dict[str, int]:
    """Count blocked candidates per ``blocked_reason``.

    A missing or falsy reason is counted under ``"unknown"``. Keys appear
    in order of first occurrence.

    Args:
        blocked_candidates: Blocked candidate records.

    Returns:
        Mapping of reason code to number of candidates with that reason.
    """
    counts: dict[str, int] = {}
    for record in blocked_candidates:
        reason = str(record.get("blocked_reason") or "unknown")
        counts[reason] = counts.get(reason, 0) + 1
    return counts
def _candidate_set_hash(
    eligible_candidates: list[dict[str, Any]],
    blocked_candidates: list[dict[str, Any]],
) -> str:
    """Return a SHA-256 hex digest identifying the candidate set.

    Each candidate is reduced to a fixed set of identity fields (status,
    ids, knob family, fingerprint, blocked reason, score); other keys do
    not affect the hash. Eligible records come first, then blocked ones,
    each in the order given, so the result is sensitive to list order.

    Args:
        eligible_candidates: Eligible candidate records.
        blocked_candidates: Blocked candidate records.

    Returns:
        Hex digest of the compact, key-sorted JSON encoding of the records.
    """
    records: list[dict[str, Any]] = []
    for status, group in (
        ("eligible", eligible_candidates),
        ("blocked", blocked_candidates),
    ):
        for item in group:
            records.append(
                {
                    "status": status,
                    "candidate_id": item.get("candidate_id"),
                    "action_id": item.get("action_id"),
                    "knob_family": item.get("knob_family"),
                    "effective_config_fingerprint": item.get("effective_config_fingerprint"),
                    "blocked_reason": item.get("blocked_reason"),
                    "score": item.get("score"),
                }
            )
    # Sorted keys and compact separators make the encoding canonical.
    payload = json.dumps(
        records,
        ensure_ascii=False,
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def _anchor_has_topology_patch(anchor: dict[str, Any]) -> bool: def _anchor_has_topology_patch(anchor: dict[str, Any]) -> bool:
patch = anchor.get("config_patch") patch = anchor.get("config_patch")
if not isinstance(patch, dict): if not isinstance(patch, dict):

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import json import json
import hashlib
import contextlib import contextlib
import io import io
import math import math
@@ -1803,6 +1804,33 @@ class CoreFlowTests(unittest.TestCase):
window_summary={"prompt_tokens_p95": 2048}, window_summary={"prompt_tokens_p95": 2048},
state=state, state=state,
) )
candidate_set = context["experiment_plan"]["candidate_set"]
self.assertEqual(candidate_set["version"], "candidate-set-v1")
self.assertIn("candidate_set_hash", candidate_set)
self.assertGreaterEqual(
candidate_set["blocked_reason_summary"].get(
"blocked_noop_or_repeat_effective_full_config",
0,
),
1,
)
baseline_fingerprint = hashlib.sha256(
_effective_config_signature(
study,
{"env_patch": {}, "flag_patch": {}},
).encode("utf-8")
).hexdigest()
blocked_baseline_equivalent = [
item
for item in candidate_set["blocked_candidates"]
if item.get("effective_config_fingerprint") == baseline_fingerprint
]
self.assertTrue(blocked_baseline_equivalent)
self.assertEqual(
blocked_baseline_equivalent[0]["blocked_reason"],
"blocked_noop_or_repeat_effective_full_config",
)
self.assertIn("effective_config_fingerprint", blocked_baseline_equivalent[0])
actions = context["experiment_plan"]["candidate_actions"] actions = context["experiment_plan"]["candidate_actions"]
self.assertFalse( self.assertFalse(
any( any(