diff --git a/docs/aituner-roadmap.md b/docs/aituner-roadmap.md index 11071e2..c625939 100644 --- a/docs/aituner-roadmap.md +++ b/docs/aituner-roadmap.md @@ -109,6 +109,9 @@ declarative intervention grammar + coverage-relative validator。 - normalized full-config signature:no-repeat 不能只看 patch signature;base config 与 no-op patch 必须被识别为同一 full config;`48911b6` 已实现并在 dash1 bad-start validation 中通过; +- materialized effective signature:runtime-only proposal 必须先按真实执行路径继承 + incumbent topology,再做 no-repeat;已加入 shared signature/canonicalization,并在 + CLI 进入 trial 前 hard-veto 重复 LLM/manual/harness proposal; - Failure invalidation 有保守 region predicate 和 retry/unblock 条件; - grammar/policy/capability 都有 version 和 anti-overfitting static checks; - LLM/BO 只能选择合法 candidate,不能绕过 validator。 diff --git a/src/aituner/cli.py b/src/aituner/cli.py index 6dec945..8874441 100644 --- a/src/aituner/cli.py +++ b/src/aituner/cli.py @@ -7,6 +7,10 @@ from dataclasses import replace from pathlib import Path from .compare import run_compare +from .config_signature import ( + materialized_effective_config_signature, + tested_config_signature_index, +) from .harness import ( build_harness_context, build_harness_guided_proposal, @@ -24,6 +28,7 @@ from .spec import ( Proposal, SpecError, StudySpec, + StudyState, load_structured_file, load_study_spec, to_jsonable, @@ -37,6 +42,32 @@ def _is_empty_config_patch(proposal: Proposal) -> bool: return not proposal.config_patch.env_patch and not proposal.config_patch.flag_patch +def _reject_repeated_effective_config( + *, + study: StudySpec, + state: StudyState, + proposal: Proposal, + proposal_name: str, +) -> None: + if proposal.should_stop: + return + tested = tested_config_signature_index(study, state) + signature = materialized_effective_config_signature( + study=study, + state=state, + proposal=proposal, + ) + matching_trials = tested.get(signature) + if not matching_trials: + return + raise SpecError( + f"Proposal {proposal_name} repeats an already tested effective full config " + "after materialization. " + f"matching_trial_ids={matching_trials}. " + "Choose a different eligible candidate or return should_stop=true." + ) + + def _latency_percentiles(summary: object, metric: str) -> dict[str, float]: if not isinstance(summary, dict): return {} @@ -334,6 +365,12 @@ def cmd_study_tune(args: argparse.Namespace) -> int: raw_proposal_path = store.study_root(study.study_id) / "proposals" / f"{proposal_name}.raw.txt" raw_proposal_path.write_text(proposal_text, encoding="utf-8") proposal = parse_proposal_text(proposal_text, study) + _reject_repeated_effective_config( + study=study, + state=state, + proposal=proposal, + proposal_name=proposal_name, + ) store.write_proposal(study.study_id, proposal_name, proposal) if proposal.should_stop: is_harness_stop = proposal_name.startswith("harness-stop-") diff --git a/src/aituner/config_signature.py b/src/aituner/config_signature.py new file mode 100644 index 0000000..2f8598f --- /dev/null +++ b/src/aituner/config_signature.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +import json +from dataclasses import replace +from typing import Any + +from .spec import ConfigPatch, Proposal, StudySpec, StudyState + + +TOPOLOGY_FLAG_KEYS = { + "tensor-parallel-size", + "data-parallel-size", + "expert-parallel-size", + "enable-expert-parallel", +} + + +def normalized_config_patch(config_patch: Any) -> dict[str, dict[str, Any]]: + if isinstance(config_patch, ConfigPatch): + env_patch: Any = config_patch.env_patch + flag_patch: Any = config_patch.flag_patch + elif isinstance(config_patch, dict): + env_patch = config_patch.get("env_patch") + flag_patch = config_patch.get("flag_patch") + else: + env_patch = {} + flag_patch = {} + return { + "env_patch": _canonical_env_map(env_patch if isinstance(env_patch, dict) else {}), + "flag_patch": _canonical_flag_map(flag_patch if isinstance(flag_patch, dict) else {}), + } + + +def effective_config_signature(study: StudySpec, config_patch: Any) -> str: + patch = normalized_config_patch(config_patch) + payload = { + "env": _canonical_env_map({**study.engine.base_envs, **patch["env_patch"]}), + "flags": _canonical_flag_map({**study.engine.base_flags, **patch["flag_patch"]}), + } + return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":")) + + +def materialize_proposal_for_execution( + *, + study: StudySpec, + state: StudyState, + proposal: Proposal, +) -> Proposal: + flag_patch = dict(proposal.config_patch.flag_patch) + env_patch = dict(proposal.config_patch.env_patch) + if not flag_patch and not env_patch: + return proposal + if TOPOLOGY_FLAG_KEYS.intersection(flag_patch): + return proposal + if not state.best_trial_id: + return proposal + incumbent = next( + (trial for trial in state.trials if trial.trial_id == state.best_trial_id), + None, + ) + if incumbent is None or not isinstance(incumbent.config_patch, dict): + return proposal + incumbent_patch = incumbent.config_patch.get("flag_patch") + if not isinstance(incumbent_patch, dict): + return proposal + inherited_topology = { + key: value + for key, value in incumbent_patch.items() + if key in TOPOLOGY_FLAG_KEYS and study.engine.base_flags.get(key) != value + } + if not inherited_topology: + return proposal + merged_flag_patch = dict(inherited_topology) + merged_flag_patch.update(flag_patch) + return replace( + proposal, + config_patch=ConfigPatch( + env_patch=env_patch, + flag_patch=merged_flag_patch, + ), + ) + + +def materialized_effective_config_signature( + *, + study: StudySpec, + state: StudyState, + proposal: Proposal, +) -> str: + materialized = materialize_proposal_for_execution( + study=study, + state=state, + proposal=proposal, + ) + return effective_config_signature(study, materialized.config_patch) + + +def tested_config_signature_index(study: StudySpec, state: StudyState) -> dict[str, list[str]]: + index: dict[str, list[str]] = {} + for trial in state.trials: + signature = effective_config_signature(study, trial.config_patch) + index.setdefault(signature, []).append(trial.trial_id) + return index + + +def _canonical_env_map(payload: dict[str, Any]) -> dict[str, str]: + return {str(key): str(value) for key, value in payload.items()} + + +def _canonical_flag_map(payload: dict[str, Any]) -> dict[str, Any]: + return {str(key): _canonical_flag_value(value) for key, value in payload.items()} + + +def _canonical_flag_value(value: Any) -> Any: + if value is None or isinstance(value, bool): + return value + if isinstance(value, int): + return value + if isinstance(value, float): + return int(value) if value.is_integer() else value + if isinstance(value, str): + return _canonical_string_flag_value(value) + if isinstance(value, list): + return [_canonical_flag_value(item) for item in value] + if isinstance(value, tuple): + return [_canonical_flag_value(item) for item in value] + if isinstance(value, dict): + return {str(key): _canonical_flag_value(item) for key, item in value.items()} + return str(value) + + +def _canonical_string_flag_value(value: str) -> Any: + stripped = value.strip() + if not stripped: + return stripped + try: + parsed_int = int(stripped, 10) + except ValueError: + pass + else: + return parsed_int + try: + parsed_float = float(stripped) + except ValueError: + return stripped + if parsed_float.is_integer(): + return int(parsed_float) + return parsed_float diff --git a/src/aituner/harness.py b/src/aituner/harness.py index f175afa..4f7770a 100644 --- a/src/aituner/harness.py +++ b/src/aituner/harness.py @@ -5,6 +5,10 @@ import hashlib from pathlib import Path from typing import Any +from .config_signature import ( + effective_config_signature as _shared_effective_config_signature, + normalized_config_patch as _shared_normalized_config_patch, +) from .lca import EPSILON, WorkloadProfile from .spec import ConfigPatch, Proposal, StudySpec, StudyState, TrialSummary @@ -2203,7 +2207,6 @@ def _state_tested_signatures(study: StudySpec, state: StudyState) -> set[str]: return { _effective_config_signature(study, trial.config_patch) for trial in state.trials - if isinstance(trial.config_patch, dict) } @@ -2872,20 +2875,8 @@ def _config_signature(config_patch: Any) -> str: def _effective_config_signature(study: StudySpec, config_patch: Any) -> str: - patch = _normalized_config_patch(config_patch) - payload = { - "env": {**study.engine.base_envs, **patch["env_patch"]}, - "flags": {**study.engine.base_flags, **patch["flag_patch"]}, - } - return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":")) + return _shared_effective_config_signature(study, config_patch) def _normalized_config_patch(config_patch: Any) -> dict[str, dict[str, Any]]: - if not isinstance(config_patch, dict): - config_patch = {} - env_patch = config_patch.get("env_patch") - flag_patch = config_patch.get("flag_patch") - return { - "env_patch": env_patch if isinstance(env_patch, dict) else {}, - "flag_patch": flag_patch if isinstance(flag_patch, dict) else {}, - } + return _shared_normalized_config_patch(config_patch) diff --git a/src/aituner/store.py b/src/aituner/store.py index 1c9283d..1c970b6 100644 --- a/src/aituner/store.py +++ b/src/aituner/store.py @@ -5,8 +5,8 @@ from dataclasses import replace from pathlib import Path from typing import Any +from .config_signature import materialize_proposal_for_execution from .spec import ( - ConfigPatch, Proposal, SamplingSearchSpec, StudySpec, @@ -17,14 +17,6 @@ from .spec import ( ) -_TOPOLOGY_FLAG_KEYS = { - "tensor-parallel-size", - "data-parallel-size", - "expert-parallel-size", - "enable-expert-parallel", -} - - class StudyStore: def __init__(self, root: Path | None = None): base = root or Path(".aituner") / "studies" @@ -85,7 +77,7 @@ class StudyStore: state: StudyState, proposal: Proposal, ) -> tuple[TrialSpec, StudyState]: - proposal = _inherit_incumbent_topology_for_runtime_patch( + proposal = materialize_proposal_for_execution( study=study, state=state, proposal=proposal, @@ -268,47 +260,6 @@ def _parallel_size_for_proposal(*, study: StudySpec, proposal: Proposal) -> int: return _parallel_size_for_config(study=study, flag_patch=proposal.config_patch.flag_patch) -def _inherit_incumbent_topology_for_runtime_patch( - *, - study: StudySpec, - state: StudyState, - proposal: Proposal, -) -> Proposal: - flag_patch = dict(proposal.config_patch.flag_patch) - env_patch = dict(proposal.config_patch.env_patch) - if not flag_patch and not env_patch: - return proposal - if _TOPOLOGY_FLAG_KEYS.intersection(flag_patch): - return proposal - if not state.best_trial_id: - return proposal - incumbent = next( - (trial for trial in state.trials if trial.trial_id == state.best_trial_id), - None, - ) - if incumbent is None or not isinstance(incumbent.config_patch, dict): - return proposal - incumbent_patch = incumbent.config_patch.get("flag_patch") - if not isinstance(incumbent_patch, dict): - return proposal - inherited_topology = { - key: value - for key, value in incumbent_patch.items() - if key in _TOPOLOGY_FLAG_KEYS and study.engine.base_flags.get(key) != value - } - if not inherited_topology: - return proposal - merged_flag_patch = dict(inherited_topology) - merged_flag_patch.update(flag_patch) - return replace( - proposal, - config_patch=ConfigPatch( - env_patch=env_patch, - flag_patch=merged_flag_patch, - ), - ) - - def _parallel_size_for_trial_id(*, study: StudySpec, study_root: Path, trial_id: str) -> int | None: trial_spec_path = study_root / "trials" / trial_id / "trial_spec.json" if not trial_spec_path.exists(): diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index 49e0ac2..9b8f064 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -15,6 +15,7 @@ from unittest import mock from aituner.cli import main as cli_main from aituner.compare import _aggregate_summary, load_compare_spec, run_compare +from aituner.config_signature import materialized_effective_config_signature from aituner.engine import build_launch_recipe from aituner.http_client import ( HttpClientError, @@ -382,14 +383,101 @@ class CoreFlowTests(unittest.TestCase): study, {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 8}}, ) + noop_tp_string = _effective_config_signature( + study, + {"env_patch": {}, "flag_patch": {"tensor-parallel-size": "8"}}, + ) changed_tp = _effective_config_signature( study, {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}}, ) self.assertEqual(baseline, noop_tp) + self.assertEqual(baseline, noop_tp_string) self.assertNotEqual(baseline, changed_tp) + def test_materialized_signature_inherits_incumbent_topology(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 4, + "data-parallel-size": 2, + "max-num-seqs": 64, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "max-num-seqs", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [1, 2, 4, 8], + "allowed_data_parallel_sizes": [1, 2, 4, 8], + "allowed_tp_dp_products": [1, 2, 4, 8], + }, + }, + ) + study = load_study_spec(study_path) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0002", + best_parallel_size=8, + trials=[ + TrialSummary( + trial_id="trial-0002", + status="completed", + parallel_size=8, + config_patch={ + "env_patch": {}, + "flag_patch": { + "tensor-parallel-size": 2, + "data-parallel-size": 4, + "max-num-seqs": 160, + }, + }, + ) + ], + ) + runtime_only = Proposal.from_dict( + { + "observation": "Try the same runtime cap.", + "diagnosis": "This should materialize on incumbent topology.", + "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 160}}, + "expected_effects": ["no-op after topology inheritance"], + } + ) + explicit = Proposal.from_dict( + { + "observation": "Explicit duplicate.", + "diagnosis": "Same effective execution config.", + "config_patch": { + "env_patch": {}, + "flag_patch": { + "tensor-parallel-size": "2", + "data-parallel-size": "4", + "max-num-seqs": "160", + }, + }, + "expected_effects": ["same config"], + } + ) + self.assertEqual( + materialized_effective_config_signature( + study=study, + state=state, + proposal=runtime_only, + ), + materialized_effective_config_signature( + study=study, + state=state, + proposal=explicit, + ), + ) + def test_lca_workload_profile_uses_standard_10d_features(self) -> None: window = WindowRecord( window_id="w1", @@ -6019,6 +6107,105 @@ class CoreFlowTests(unittest.TestCase): self.assertTrue(honored) self.assertEqual(honored[-1]["stop_authorized_by"], "llm_after_veto_budget") + def test_cli_tune_rejects_repeated_materialized_llm_config(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 4, + "data-parallel-size": 2, + "max-num-seqs": 64, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "max-num-seqs", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [1, 2, 4, 8], + "allowed_data_parallel_sizes": [1, 2, 4, 8], + "allowed_tp_dp_products": [1, 2, 4, 8], + }, + }, + ) + spec = json.loads(study_path.read_text(encoding="utf-8")) + spec["llm"]["use_harness"] = False + spec["llm"]["endpoint"] = { + "provider": "custom", + "base_url": "http://localhost:9/v1", + "model": "test-model", + "api_key_env": "AITUNER_TEST_KEY", + } + study_path.write_text(json.dumps(spec), encoding="utf-8") + study = load_study_spec(study_path) + store_root = tmp_path / "store" + store = StudyStore(store_root) + store.init_study(spec_path=study_path, study=study) + store.save_state( + StudyState( + study_id=study.study_id, + best_trial_id="trial-0002", + best_parallel_size=8, + best_sampling_u=0.125, + best_request_rate=3.0, + best_request_rate_per_gpu=0.375, + next_trial_index=3, + trials=[ + TrialSummary( + trial_id="trial-0002", + status="completed", + parallel_size=8, + best_sampling_u=0.125, + best_request_rate=3.0, + best_request_rate_per_gpu=0.375, + config_patch={ + "env_patch": {}, + "flag_patch": { + "tensor-parallel-size": 2, + "data-parallel-size": 4, + "max-num-seqs": 160, + }, + }, + ) + ], + ) + ) + repeated_runtime_patch = json.dumps( + { + "observation": "Try the same runtime setting.", + "diagnosis": "This is duplicate after topology inheritance.", + "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 160}}, + "expected_effects": ["should be vetoed"], + "why_not_previous_failures": "", + "should_stop": False, + } + ) + stderr = io.StringIO() + with mock.patch("aituner.cli.run_trial") as run_trial_mock, mock.patch( + "aituner.cli.call_llm_for_proposal", return_value=repeated_runtime_patch + ), contextlib.redirect_stderr(stderr): + exit_code = cli_main( + [ + "study", + "tune", + "--spec", + str(study_path), + "--store-root", + str(store_root), + "--skip-baseline", + "--max-trials", + "3", + ] + ) + self.assertEqual(exit_code, 2) + run_trial_mock.assert_not_called() + self.assertIn("repeats an already tested effective full config", stderr.getvalue()) + self.assertIn("trial-0002", stderr.getvalue()) + def test_cli_tune_uses_harness_stop_before_llm(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp)