Veto repeated materialized configs

This commit is contained in:
2026-06-26 22:15:47 +08:00
parent 825d3e03e9
commit 5080b50315
6 changed files with 383 additions and 66 deletions

View File

@@ -109,6 +109,9 @@ declarative intervention grammar + coverage-relative validator。
- normalized full-config signatureno-repeat 不能只看 patch signaturebase config 与
no-op patch 必须被识别为同一 full config`48911b6` 已实现并在 dash1 bad-start
validation 中通过;
- materialized effective signatureruntime-only proposal 必须先按真实执行路径继承
incumbent topology再做 no-repeat已加入 shared signature/canonicalization并在
CLI 进入 trial 前 hard-veto 重复 LLM/manual/harness proposal
- Failure invalidation 有保守 region predicate 和 retry/unblock 条件;
- grammar/policy/capability 都有 version 和 anti-overfitting static checks
- LLM/BO 只能选择合法 candidate不能绕过 validator。

View File

@@ -7,6 +7,10 @@ from dataclasses import replace
from pathlib import Path
from .compare import run_compare
from .config_signature import (
materialized_effective_config_signature,
tested_config_signature_index,
)
from .harness import (
build_harness_context,
build_harness_guided_proposal,
@@ -24,6 +28,7 @@ from .spec import (
Proposal,
SpecError,
StudySpec,
StudyState,
load_structured_file,
load_study_spec,
to_jsonable,
@@ -37,6 +42,32 @@ def _is_empty_config_patch(proposal: Proposal) -> bool:
return not proposal.config_patch.env_patch and not proposal.config_patch.flag_patch
def _reject_repeated_effective_config(
*,
study: StudySpec,
state: StudyState,
proposal: Proposal,
proposal_name: str,
) -> None:
if proposal.should_stop:
return
tested = tested_config_signature_index(study, state)
signature = materialized_effective_config_signature(
study=study,
state=state,
proposal=proposal,
)
matching_trials = tested.get(signature)
if not matching_trials:
return
raise SpecError(
f"Proposal {proposal_name} repeats an already tested effective full config "
"after materialization. "
f"matching_trial_ids={matching_trials}. "
"Choose a different eligible candidate or return should_stop=true."
)
def _latency_percentiles(summary: object, metric: str) -> dict[str, float]:
if not isinstance(summary, dict):
return {}
@@ -334,6 +365,12 @@ def cmd_study_tune(args: argparse.Namespace) -> int:
raw_proposal_path = store.study_root(study.study_id) / "proposals" / f"{proposal_name}.raw.txt"
raw_proposal_path.write_text(proposal_text, encoding="utf-8")
proposal = parse_proposal_text(proposal_text, study)
_reject_repeated_effective_config(
study=study,
state=state,
proposal=proposal,
proposal_name=proposal_name,
)
store.write_proposal(study.study_id, proposal_name, proposal)
if proposal.should_stop:
is_harness_stop = proposal_name.startswith("harness-stop-")

View File

@@ -0,0 +1,148 @@
from __future__ import annotations
import json
from dataclasses import replace
from typing import Any
from .spec import ConfigPatch, Proposal, StudySpec, StudyState
TOPOLOGY_FLAG_KEYS = {
"tensor-parallel-size",
"data-parallel-size",
"expert-parallel-size",
"enable-expert-parallel",
}
def normalized_config_patch(config_patch: Any) -> dict[str, dict[str, Any]]:
if isinstance(config_patch, ConfigPatch):
env_patch: Any = config_patch.env_patch
flag_patch: Any = config_patch.flag_patch
elif isinstance(config_patch, dict):
env_patch = config_patch.get("env_patch")
flag_patch = config_patch.get("flag_patch")
else:
env_patch = {}
flag_patch = {}
return {
"env_patch": _canonical_env_map(env_patch if isinstance(env_patch, dict) else {}),
"flag_patch": _canonical_flag_map(flag_patch if isinstance(flag_patch, dict) else {}),
}
def effective_config_signature(study: StudySpec, config_patch: Any) -> str:
patch = normalized_config_patch(config_patch)
payload = {
"env": _canonical_env_map({**study.engine.base_envs, **patch["env_patch"]}),
"flags": _canonical_flag_map({**study.engine.base_flags, **patch["flag_patch"]}),
}
return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
def materialize_proposal_for_execution(
*,
study: StudySpec,
state: StudyState,
proposal: Proposal,
) -> Proposal:
flag_patch = dict(proposal.config_patch.flag_patch)
env_patch = dict(proposal.config_patch.env_patch)
if not flag_patch and not env_patch:
return proposal
if TOPOLOGY_FLAG_KEYS.intersection(flag_patch):
return proposal
if not state.best_trial_id:
return proposal
incumbent = next(
(trial for trial in state.trials if trial.trial_id == state.best_trial_id),
None,
)
if incumbent is None or not isinstance(incumbent.config_patch, dict):
return proposal
incumbent_patch = incumbent.config_patch.get("flag_patch")
if not isinstance(incumbent_patch, dict):
return proposal
inherited_topology = {
key: value
for key, value in incumbent_patch.items()
if key in TOPOLOGY_FLAG_KEYS and study.engine.base_flags.get(key) != value
}
if not inherited_topology:
return proposal
merged_flag_patch = dict(inherited_topology)
merged_flag_patch.update(flag_patch)
return replace(
proposal,
config_patch=ConfigPatch(
env_patch=env_patch,
flag_patch=merged_flag_patch,
),
)
def materialized_effective_config_signature(
*,
study: StudySpec,
state: StudyState,
proposal: Proposal,
) -> str:
materialized = materialize_proposal_for_execution(
study=study,
state=state,
proposal=proposal,
)
return effective_config_signature(study, materialized.config_patch)
def tested_config_signature_index(study: StudySpec, state: StudyState) -> dict[str, list[str]]:
index: dict[str, list[str]] = {}
for trial in state.trials:
signature = effective_config_signature(study, trial.config_patch)
index.setdefault(signature, []).append(trial.trial_id)
return index
def _canonical_env_map(payload: dict[str, Any]) -> dict[str, str]:
return {str(key): str(value) for key, value in payload.items()}
def _canonical_flag_map(payload: dict[str, Any]) -> dict[str, Any]:
return {str(key): _canonical_flag_value(value) for key, value in payload.items()}
def _canonical_flag_value(value: Any) -> Any:
if value is None or isinstance(value, bool):
return value
if isinstance(value, int):
return value
if isinstance(value, float):
return int(value) if value.is_integer() else value
if isinstance(value, str):
return _canonical_string_flag_value(value)
if isinstance(value, list):
return [_canonical_flag_value(item) for item in value]
if isinstance(value, tuple):
return [_canonical_flag_value(item) for item in value]
if isinstance(value, dict):
return {str(key): _canonical_flag_value(item) for key, item in value.items()}
return str(value)
def _canonical_string_flag_value(value: str) -> Any:
stripped = value.strip()
if not stripped:
return stripped
try:
parsed_int = int(stripped, 10)
except ValueError:
pass
else:
return parsed_int
try:
parsed_float = float(stripped)
except ValueError:
return stripped
if parsed_float.is_integer():
return int(parsed_float)
return parsed_float

View File

@@ -5,6 +5,10 @@ import hashlib
from pathlib import Path
from typing import Any
from .config_signature import (
effective_config_signature as _shared_effective_config_signature,
normalized_config_patch as _shared_normalized_config_patch,
)
from .lca import EPSILON, WorkloadProfile
from .spec import ConfigPatch, Proposal, StudySpec, StudyState, TrialSummary
@@ -2203,7 +2207,6 @@ def _state_tested_signatures(study: StudySpec, state: StudyState) -> set[str]:
return {
_effective_config_signature(study, trial.config_patch)
for trial in state.trials
if isinstance(trial.config_patch, dict)
}
@@ -2872,20 +2875,8 @@ def _config_signature(config_patch: Any) -> str:
def _effective_config_signature(study: StudySpec, config_patch: Any) -> str:
patch = _normalized_config_patch(config_patch)
payload = {
"env": {**study.engine.base_envs, **patch["env_patch"]},
"flags": {**study.engine.base_flags, **patch["flag_patch"]},
}
return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
return _shared_effective_config_signature(study, config_patch)
def _normalized_config_patch(config_patch: Any) -> dict[str, dict[str, Any]]:
if not isinstance(config_patch, dict):
config_patch = {}
env_patch = config_patch.get("env_patch")
flag_patch = config_patch.get("flag_patch")
return {
"env_patch": env_patch if isinstance(env_patch, dict) else {},
"flag_patch": flag_patch if isinstance(flag_patch, dict) else {},
}
return _shared_normalized_config_patch(config_patch)

View File

@@ -5,8 +5,8 @@ from dataclasses import replace
from pathlib import Path
from typing import Any
from .config_signature import materialize_proposal_for_execution
from .spec import (
ConfigPatch,
Proposal,
SamplingSearchSpec,
StudySpec,
@@ -17,14 +17,6 @@ from .spec import (
)
_TOPOLOGY_FLAG_KEYS = {
"tensor-parallel-size",
"data-parallel-size",
"expert-parallel-size",
"enable-expert-parallel",
}
class StudyStore:
def __init__(self, root: Path | None = None):
base = root or Path(".aituner") / "studies"
@@ -85,7 +77,7 @@ class StudyStore:
state: StudyState,
proposal: Proposal,
) -> tuple[TrialSpec, StudyState]:
proposal = _inherit_incumbent_topology_for_runtime_patch(
proposal = materialize_proposal_for_execution(
study=study,
state=state,
proposal=proposal,
@@ -268,47 +260,6 @@ def _parallel_size_for_proposal(*, study: StudySpec, proposal: Proposal) -> int:
return _parallel_size_for_config(study=study, flag_patch=proposal.config_patch.flag_patch)
def _inherit_incumbent_topology_for_runtime_patch(
*,
study: StudySpec,
state: StudyState,
proposal: Proposal,
) -> Proposal:
flag_patch = dict(proposal.config_patch.flag_patch)
env_patch = dict(proposal.config_patch.env_patch)
if not flag_patch and not env_patch:
return proposal
if _TOPOLOGY_FLAG_KEYS.intersection(flag_patch):
return proposal
if not state.best_trial_id:
return proposal
incumbent = next(
(trial for trial in state.trials if trial.trial_id == state.best_trial_id),
None,
)
if incumbent is None or not isinstance(incumbent.config_patch, dict):
return proposal
incumbent_patch = incumbent.config_patch.get("flag_patch")
if not isinstance(incumbent_patch, dict):
return proposal
inherited_topology = {
key: value
for key, value in incumbent_patch.items()
if key in _TOPOLOGY_FLAG_KEYS and study.engine.base_flags.get(key) != value
}
if not inherited_topology:
return proposal
merged_flag_patch = dict(inherited_topology)
merged_flag_patch.update(flag_patch)
return replace(
proposal,
config_patch=ConfigPatch(
env_patch=env_patch,
flag_patch=merged_flag_patch,
),
)
def _parallel_size_for_trial_id(*, study: StudySpec, study_root: Path, trial_id: str) -> int | None:
trial_spec_path = study_root / "trials" / trial_id / "trial_spec.json"
if not trial_spec_path.exists():

View File

@@ -15,6 +15,7 @@ from unittest import mock
from aituner.cli import main as cli_main
from aituner.compare import _aggregate_summary, load_compare_spec, run_compare
from aituner.config_signature import materialized_effective_config_signature
from aituner.engine import build_launch_recipe
from aituner.http_client import (
HttpClientError,
@@ -382,14 +383,101 @@ class CoreFlowTests(unittest.TestCase):
study,
{"env_patch": {}, "flag_patch": {"tensor-parallel-size": 8}},
)
noop_tp_string = _effective_config_signature(
study,
{"env_patch": {}, "flag_patch": {"tensor-parallel-size": "8"}},
)
changed_tp = _effective_config_signature(
study,
{"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
)
self.assertEqual(baseline, noop_tp)
self.assertEqual(baseline, noop_tp_string)
self.assertNotEqual(baseline, changed_tp)
def test_materialized_signature_inherits_incumbent_topology(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 4,
"data-parallel-size": 2,
"max-num-seqs": 64,
},
"tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"max-num-seqs",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [1, 2, 4, 8],
"allowed_data_parallel_sizes": [1, 2, 4, 8],
"allowed_tp_dp_products": [1, 2, 4, 8],
},
},
)
study = load_study_spec(study_path)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0002",
best_parallel_size=8,
trials=[
TrialSummary(
trial_id="trial-0002",
status="completed",
parallel_size=8,
config_patch={
"env_patch": {},
"flag_patch": {
"tensor-parallel-size": 2,
"data-parallel-size": 4,
"max-num-seqs": 160,
},
},
)
],
)
runtime_only = Proposal.from_dict(
{
"observation": "Try the same runtime cap.",
"diagnosis": "This should materialize on incumbent topology.",
"config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 160}},
"expected_effects": ["no-op after topology inheritance"],
}
)
explicit = Proposal.from_dict(
{
"observation": "Explicit duplicate.",
"diagnosis": "Same effective execution config.",
"config_patch": {
"env_patch": {},
"flag_patch": {
"tensor-parallel-size": "2",
"data-parallel-size": "4",
"max-num-seqs": "160",
},
},
"expected_effects": ["same config"],
}
)
self.assertEqual(
materialized_effective_config_signature(
study=study,
state=state,
proposal=runtime_only,
),
materialized_effective_config_signature(
study=study,
state=state,
proposal=explicit,
),
)
def test_lca_workload_profile_uses_standard_10d_features(self) -> None:
window = WindowRecord(
window_id="w1",
@@ -6019,6 +6107,105 @@ class CoreFlowTests(unittest.TestCase):
self.assertTrue(honored)
self.assertEqual(honored[-1]["stop_authorized_by"], "llm_after_veto_budget")
def test_cli_tune_rejects_repeated_materialized_llm_config(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 4,
"data-parallel-size": 2,
"max-num-seqs": 64,
},
"tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"max-num-seqs",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [1, 2, 4, 8],
"allowed_data_parallel_sizes": [1, 2, 4, 8],
"allowed_tp_dp_products": [1, 2, 4, 8],
},
},
)
spec = json.loads(study_path.read_text(encoding="utf-8"))
spec["llm"]["use_harness"] = False
spec["llm"]["endpoint"] = {
"provider": "custom",
"base_url": "http://localhost:9/v1",
"model": "test-model",
"api_key_env": "AITUNER_TEST_KEY",
}
study_path.write_text(json.dumps(spec), encoding="utf-8")
study = load_study_spec(study_path)
store_root = tmp_path / "store"
store = StudyStore(store_root)
store.init_study(spec_path=study_path, study=study)
store.save_state(
StudyState(
study_id=study.study_id,
best_trial_id="trial-0002",
best_parallel_size=8,
best_sampling_u=0.125,
best_request_rate=3.0,
best_request_rate_per_gpu=0.375,
next_trial_index=3,
trials=[
TrialSummary(
trial_id="trial-0002",
status="completed",
parallel_size=8,
best_sampling_u=0.125,
best_request_rate=3.0,
best_request_rate_per_gpu=0.375,
config_patch={
"env_patch": {},
"flag_patch": {
"tensor-parallel-size": 2,
"data-parallel-size": 4,
"max-num-seqs": 160,
},
},
)
],
)
)
repeated_runtime_patch = json.dumps(
{
"observation": "Try the same runtime setting.",
"diagnosis": "This is duplicate after topology inheritance.",
"config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 160}},
"expected_effects": ["should be vetoed"],
"why_not_previous_failures": "",
"should_stop": False,
}
)
stderr = io.StringIO()
with mock.patch("aituner.cli.run_trial") as run_trial_mock, mock.patch(
"aituner.cli.call_llm_for_proposal", return_value=repeated_runtime_patch
), contextlib.redirect_stderr(stderr):
exit_code = cli_main(
[
"study",
"tune",
"--spec",
str(study_path),
"--store-root",
str(store_root),
"--skip-baseline",
"--max-trials",
"3",
]
)
self.assertEqual(exit_code, 2)
run_trial_mock.assert_not_called()
self.assertIn("repeats an already tested effective full config", stderr.getvalue())
self.assertIn("trial-0002", stderr.getvalue())
def test_cli_tune_uses_harness_stop_before_llm(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)