From 48911b658bbf052d70d952d1cdf55ad6b50ba7a5 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Fri, 26 Jun 2026 21:28:10 +0800 Subject: [PATCH] Use normalized full config signatures --- src/aituner/harness.py | 73 ++++++++++------ src/aituner/llm.py | 11 +-- tests/test_core_flow.py | 178 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 233 insertions(+), 29 deletions(-) diff --git a/src/aituner/harness.py b/src/aituner/harness.py index 8c4973a..88f2eb4 100644 --- a/src/aituner/harness.py +++ b/src/aituner/harness.py @@ -902,16 +902,19 @@ def _harness_proposal_decision( "expected_effects": [], } tested_signatures = { - _config_signature(item.get("config_patch") if isinstance(item, dict) else None) + _effective_config_signature( + study, + item.get("config_patch") if isinstance(item, dict) else None, + ) for item in recent_diagnostics } - tested_signatures.update(_state_tested_signatures(state)) + tested_signatures.update(_state_tested_signatures(study, state)) if experiment_plan is not None: next_action = experiment_plan.get("next_action") if isinstance(next_action, dict) and _as_float(next_action.get("score")) >= 0.35: patch = next_action.get("config_patch") if isinstance(patch, dict): - signature = _config_signature(patch) + signature = _effective_config_signature(study, patch) if signature not in tested_signatures: return { "should_propose": True, @@ -976,7 +979,7 @@ def _harness_proposal_decision( "reason": "no_legal_adjacent_tensor_parallel_probe", } flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp} - signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch}) + signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": flag_patch}) if signature in tested_signatures: return { **default, @@ -1021,7 +1024,7 @@ def _topology_frontier_proposal( flag_patch = frontier.get("flag_patch") if not isinstance(flag_patch, dict): return {**default, "reason": "topology_frontier_patch_missing"} - signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch}) + signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": flag_patch}) if signature in tested_signatures: return {**default, "reason": "topology_frontier_already_tested"} return { @@ -1051,10 +1054,13 @@ def _experiment_plan( bottleneck_hypotheses: list[dict[str, Any]], ) -> dict[str, Any]: tested_signatures = { - _config_signature(item.get("config_patch") if isinstance(item, dict) else None) + _effective_config_signature( + study, + item.get("config_patch") if isinstance(item, dict) else None, + ) for item in recent_diagnostics } - tested_signatures.update(_state_tested_signatures(state)) + tested_signatures.update(_state_tested_signatures(study, state)) candidates = _candidate_actions( study, window_summary, @@ -1183,7 +1189,7 @@ def _topology_candidate_actions( if point["tensor-parallel-size"] == current_tp and point["data-parallel-size"] == current_dp: continue patch = _topology_patch(study, point) - signature = _config_signature({"env_patch": {}, "flag_patch": patch}) + signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) if signature in tested_signatures: continue score, factors = _score_topology_candidate( @@ -1254,7 +1260,8 @@ def _runtime_candidate_actions( _next_tp = _next_allowed_tp(study, current_tp=cur_tp, current_dp=cur_dp) tp_frontier_open = ( _next_tp is not None - and _config_signature( + and _effective_config_signature( + study, {"env_patch": {}, "flag_patch": {"tensor-parallel-size": _next_tp}} ) not in tested_signatures @@ -1276,7 +1283,7 @@ def _runtime_candidate_actions( mbt_targets.append(("lower_mbt", max(8192, current_mbt // 2))) for action_id, target in mbt_targets: patch = {**runtime_base_patch, "max-num-batched-tokens": target} - signature = _config_signature({"env_patch": {}, "flag_patch": patch}) + signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) if signature in tested_signatures: continue relief = 0.24 if top_bottleneck == "ttft_prefill" else 0.14 @@ -1332,7 +1339,7 @@ def _runtime_candidate_actions( mns_targets.append(("raise_max_num_seqs", target)) for action_id, target in mns_targets: patch = {**runtime_base_patch, "max-num-seqs": target} - signature = _config_signature({"env_patch": {}, "flag_patch": patch}) + signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) if signature in tested_signatures: continue if top_bottleneck in {"decode_tpot", "admission_or_queueing"}: @@ -1388,7 +1395,7 @@ def _runtime_candidate_actions( "max-num-batched-tokens": mbt_target, "max-num-seqs": mns_target, } - signature = _config_signature({"env_patch": {}, "flag_patch": patch}) + signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) if signature not in tested_signatures: actions.append( _runtime_action( @@ -1413,7 +1420,7 @@ def _runtime_candidate_actions( current = bool(anchor_flags.get("enable-chunked-prefill", False)) if not current: patch = {**runtime_base_patch, "enable-chunked-prefill": True} - signature = _config_signature({"env_patch": {}, "flag_patch": patch}) + signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) if signature not in tested_signatures: actions.append( _runtime_action( @@ -1444,7 +1451,7 @@ def _runtime_candidate_actions( ) if target is not None: patch = {**runtime_base_patch, "gpu-memory-utilization": target} - signature = _config_signature({"env_patch": {}, "flag_patch": patch}) + signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) if signature not in tested_signatures: actions.append( _runtime_action( @@ -1557,8 +1564,9 @@ def _has_unmeasured_higher_tp_candidate( or point["tensor-parallel-size"] <= current_tp ): continue - signature = _config_signature( - {"env_patch": {}, "flag_patch": _topology_patch(study, point)} + signature = _effective_config_signature( + study, + {"env_patch": {}, "flag_patch": _topology_patch(study, point)}, ) if signature not in tested_signatures: return True @@ -1770,7 +1778,7 @@ def _parallel_size_can_vary(study: StudySpec) -> bool: normalized = _normalized_topology_flags(flags) if any(normalized.get(key) != point.get(key) for key in point): continue - signatures.add(_config_signature({"env_patch": {}, "flag_patch": patch})) + signatures.add(_effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})) return len(signatures) > 1 @@ -1948,8 +1956,8 @@ def _topology_frontier_status( base_dp = _parse_int_like(study.engine.base_flags.get("data-parallel-size"), default=1) if current_dp != base_dp: flag_patch["data-parallel-size"] = current_dp - signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch}) - if signature in _state_tested_signatures(state): + signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": flag_patch}) + if signature in _state_tested_signatures(study, state): return { **default, "reason": "higher_tp_frontier_already_tested", @@ -1979,9 +1987,9 @@ def _effective_flags_for_item(study: StudySpec, item: dict[str, Any]) -> dict[st return flags -def _state_tested_signatures(state: StudyState) -> set[str]: +def _state_tested_signatures(study: StudySpec, state: StudyState) -> set[str]: return { - _config_signature(trial.config_patch) + _effective_config_signature(study, trial.config_patch) for trial in state.trials if isinstance(trial.config_patch, dict) } @@ -2045,7 +2053,7 @@ def _runtime_refinement_proposal( "reason": "no_larger_mbt_step_available", } flag_patch["max-num-batched-tokens"] = target_mbt - signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch}) + signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": flag_patch}) if signature in tested_signatures: return { **default, @@ -2643,12 +2651,29 @@ def _parse_float_like(value: Any, *, default: float) -> float: def _config_signature(config_patch: Any) -> str: + return json.dumps( + _normalized_config_patch(config_patch), + ensure_ascii=False, + sort_keys=True, + separators=(",", ":"), + ) + + +def _effective_config_signature(study: StudySpec, config_patch: Any) -> str: + patch = _normalized_config_patch(config_patch) + payload = { + "env": {**study.engine.base_envs, **patch["env_patch"]}, + "flags": {**study.engine.base_flags, **patch["flag_patch"]}, + } + return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":")) + + +def _normalized_config_patch(config_patch: Any) -> dict[str, dict[str, Any]]: if not isinstance(config_patch, dict): config_patch = {} env_patch = config_patch.get("env_patch") flag_patch = config_patch.get("flag_patch") - payload = { + return { "env_patch": env_patch if isinstance(env_patch, dict) else {}, "flag_patch": flag_patch if isinstance(flag_patch, dict) else {}, } - return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":")) diff --git a/src/aituner/llm.py b/src/aituner/llm.py index 1a4dc9c..125be40 100644 --- a/src/aituner/llm.py +++ b/src/aituner/llm.py @@ -5,7 +5,7 @@ import time from pathlib import Path from typing import TYPE_CHECKING, Any -from .harness import build_harness_context, render_harness_context +from .harness import _effective_config_signature, build_harness_context, render_harness_context from .http_client import chat_completion, stream_text_completion from .spec import LLMPolicySpec, Proposal, SpecError, StudySpec, StudyState @@ -306,7 +306,7 @@ def build_prompt( json.dumps(launch_failures, ensure_ascii=False, indent=2), "", "Tested config signatures:", - json.dumps(_tested_config_signatures(state), ensure_ascii=False, indent=2), + json.dumps(_tested_config_signatures(study, state), ensure_ascii=False, indent=2), ] return "\n".join(sections) @@ -402,7 +402,7 @@ def build_prompt( json.dumps(parallel_candidates, ensure_ascii=False, indent=2), "", "Tested config signatures:", - json.dumps(_tested_config_signatures(state), ensure_ascii=False, indent=2), + json.dumps(_tested_config_signatures(study, state), ensure_ascii=False, indent=2), ] sections.extend( [ @@ -435,12 +435,12 @@ def build_prompt( return "\n".join(sections) -def _tested_config_signatures(state: StudyState) -> list[dict[str, Any]]: +def _tested_config_signatures(study: StudySpec, state: StudyState) -> list[dict[str, Any]]: signatures: list[dict[str, Any]] = [] seen: set[str] = set() for trial in state.trials: config_patch = trial.config_patch or {} - signature = json.dumps(config_patch, sort_keys=True, ensure_ascii=False) + signature = _effective_config_signature(study, config_patch) if signature in seen: continue seen.add(signature) @@ -449,6 +449,7 @@ def _tested_config_signatures(state: StudyState) -> list[dict[str, Any]]: "trial_id": trial.trial_id, "status": trial.status, "best_request_rate_per_gpu": trial.best_request_rate_per_gpu, + "effective_config_signature": signature, "config_patch": config_patch, } ) diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index a72eeb6..2f8a330 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -25,6 +25,7 @@ from aituner.http_client import ( ) from aituner.job import append_job, build_trial_job from aituner.harness import ( + _effective_config_signature, build_harness_context, build_harness_guided_proposal, build_harness_stop_proposal, @@ -358,6 +359,36 @@ class CoreFlowTests(unittest.TestCase): "search_high_lowered_to_trace_ceiling", ) + def test_effective_config_signature_treats_noop_patch_as_baseline(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + study_path = _write_study_assets( + Path(tmp), + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 8, + "data-parallel-size": 1, + "gpu-memory-utilization": 0.5, + "max-num-seqs": 8, + }, + }, + ) + study = load_study_spec(study_path) + + baseline = _effective_config_signature(study, {"env_patch": {}, "flag_patch": {}}) + noop_tp = _effective_config_signature( + study, + {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 8}}, + ) + changed_tp = _effective_config_signature( + study, + {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}}, + ) + + self.assertEqual(baseline, noop_tp) + self.assertNotEqual(baseline, changed_tp) + def test_lca_workload_profile_uses_standard_10d_features(self) -> None: window = WindowRecord( window_id="w1", @@ -1639,6 +1670,153 @@ class CoreFlowTests(unittest.TestCase): "search_high_saturation_requires_parallel_size_evidence", ) + def test_harness_does_not_repropose_noop_topology_equivalent_to_baseline( + self, + ) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 8, + "data-parallel-size": 1, + "gpu-memory-utilization": 0.5, + "max-num-seqs": 8, + }, + "tunable_flags": ["tensor-parallel-size", "max-num-seqs"], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [1, 2, 4, 8], + "allowed_tp_dp_products": [1, 2, 4, 8], + }, + }, + ) + study = load_study_spec(study_path) + trial1_result = tmp_path / "trial-0001.json" + trial1_result.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.935616858887, + "best_request_rate": 8.0, + "best_pass_rate": 1.0, + "probes": [ + { + "threshold": 0.935616858887, + "feasible": True, + "payload": { + "request_count": 480, + "pass_rate": 1.0, + "request_rate": 8.0, + "early_stopped": False, + "early_stop_reason": "", + "latency_summary": {"failed_reason_counts": {}}, + }, + } + ], + } + ), + encoding="utf-8", + ) + trial2_result = tmp_path / "trial-0002.json" + trial2_result.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.810867944369, + "best_request_rate": 6.95, + "best_pass_rate": 0.9784, + "probes": [ + { + "threshold": 0.873242401628, + "feasible": False, + "payload": { + "request_count": 450, + "pass_rate": 0.7844, + "request_rate": 7.5, + "early_stopped": True, + "early_stop_reason": "slo_pass_rate_unrecoverable", + "latency_summary": { + "failed_reason_counts": { + "ttft_ms>2000.0": 42, + "slo_pass_rate_unrecoverable": 49, + } + }, + }, + }, + { + "threshold": 0.810867944369, + "feasible": True, + "payload": { + "request_count": 417, + "pass_rate": 0.9784, + "request_rate": 6.95, + "early_stopped": False, + "early_stop_reason": "", + "latency_summary": { + "failed_reason_counts": {"ttft_ms>2000.0": 9} + }, + }, + }, + ], + } + ), + encoding="utf-8", + ) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0002", + best_parallel_size=4, + best_sampling_u=0.810867944369, + best_request_rate=6.95, + best_request_rate_per_gpu=1.7375, + next_trial_index=3, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=8, + best_request_rate=8.0, + best_request_rate_per_gpu=1.0, + result_path=str(trial1_result), + config_patch={"env_patch": {}, "flag_patch": {}}, + ), + TrialSummary( + trial_id="trial-0002", + status="completed", + parallel_size=4, + best_request_rate=6.95, + best_request_rate_per_gpu=1.7375, + result_path=str(trial2_result), + config_patch={ + "env_patch": {}, + "flag_patch": {"tensor-parallel-size": 4}, + }, + ), + ], + ) + + context = build_harness_context( + study=study, + window_summary={"prompt_tokens_p95": 2048}, + state=state, + ) + actions = context["experiment_plan"]["candidate_actions"] + self.assertFalse( + any( + action.get("config_patch", {}).get("flag_patch") + == {"tensor-parallel-size": 8} + for action in actions + ) + ) + proposal = build_harness_guided_proposal(context) + self.assertTrue( + proposal is None + or proposal.config_patch.flag_patch != {"tensor-parallel-size": 8} + ) + def test_harness_guided_first_tp_probe_for_latency_bottleneck(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp)