Use normalized full config signatures

This commit is contained in:
2026-06-26 21:28:10 +08:00
parent 7f50b8b8ea
commit 48911b658b
3 changed files with 233 additions and 29 deletions

View File

@@ -902,16 +902,19 @@ def _harness_proposal_decision(
"expected_effects": [], "expected_effects": [],
} }
tested_signatures = { tested_signatures = {
_config_signature(item.get("config_patch") if isinstance(item, dict) else None) _effective_config_signature(
study,
item.get("config_patch") if isinstance(item, dict) else None,
)
for item in recent_diagnostics for item in recent_diagnostics
} }
tested_signatures.update(_state_tested_signatures(state)) tested_signatures.update(_state_tested_signatures(study, state))
if experiment_plan is not None: if experiment_plan is not None:
next_action = experiment_plan.get("next_action") next_action = experiment_plan.get("next_action")
if isinstance(next_action, dict) and _as_float(next_action.get("score")) >= 0.35: if isinstance(next_action, dict) and _as_float(next_action.get("score")) >= 0.35:
patch = next_action.get("config_patch") patch = next_action.get("config_patch")
if isinstance(patch, dict): if isinstance(patch, dict):
signature = _config_signature(patch) signature = _effective_config_signature(study, patch)
if signature not in tested_signatures: if signature not in tested_signatures:
return { return {
"should_propose": True, "should_propose": True,
@@ -976,7 +979,7 @@ def _harness_proposal_decision(
"reason": "no_legal_adjacent_tensor_parallel_probe", "reason": "no_legal_adjacent_tensor_parallel_probe",
} }
flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp} flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp}
signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": flag_patch})
if signature in tested_signatures: if signature in tested_signatures:
return { return {
**default, **default,
@@ -1021,7 +1024,7 @@ def _topology_frontier_proposal(
flag_patch = frontier.get("flag_patch") flag_patch = frontier.get("flag_patch")
if not isinstance(flag_patch, dict): if not isinstance(flag_patch, dict):
return {**default, "reason": "topology_frontier_patch_missing"} return {**default, "reason": "topology_frontier_patch_missing"}
signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": flag_patch})
if signature in tested_signatures: if signature in tested_signatures:
return {**default, "reason": "topology_frontier_already_tested"} return {**default, "reason": "topology_frontier_already_tested"}
return { return {
@@ -1051,10 +1054,13 @@ def _experiment_plan(
bottleneck_hypotheses: list[dict[str, Any]], bottleneck_hypotheses: list[dict[str, Any]],
) -> dict[str, Any]: ) -> dict[str, Any]:
tested_signatures = { tested_signatures = {
_config_signature(item.get("config_patch") if isinstance(item, dict) else None) _effective_config_signature(
study,
item.get("config_patch") if isinstance(item, dict) else None,
)
for item in recent_diagnostics for item in recent_diagnostics
} }
tested_signatures.update(_state_tested_signatures(state)) tested_signatures.update(_state_tested_signatures(study, state))
candidates = _candidate_actions( candidates = _candidate_actions(
study, study,
window_summary, window_summary,
@@ -1183,7 +1189,7 @@ def _topology_candidate_actions(
if point["tensor-parallel-size"] == current_tp and point["data-parallel-size"] == current_dp: if point["tensor-parallel-size"] == current_tp and point["data-parallel-size"] == current_dp:
continue continue
patch = _topology_patch(study, point) patch = _topology_patch(study, point)
signature = _config_signature({"env_patch": {}, "flag_patch": patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature in tested_signatures: if signature in tested_signatures:
continue continue
score, factors = _score_topology_candidate( score, factors = _score_topology_candidate(
@@ -1254,7 +1260,8 @@ def _runtime_candidate_actions(
_next_tp = _next_allowed_tp(study, current_tp=cur_tp, current_dp=cur_dp) _next_tp = _next_allowed_tp(study, current_tp=cur_tp, current_dp=cur_dp)
tp_frontier_open = ( tp_frontier_open = (
_next_tp is not None _next_tp is not None
and _config_signature( and _effective_config_signature(
study,
{"env_patch": {}, "flag_patch": {"tensor-parallel-size": _next_tp}} {"env_patch": {}, "flag_patch": {"tensor-parallel-size": _next_tp}}
) )
not in tested_signatures not in tested_signatures
@@ -1276,7 +1283,7 @@ def _runtime_candidate_actions(
mbt_targets.append(("lower_mbt", max(8192, current_mbt // 2))) mbt_targets.append(("lower_mbt", max(8192, current_mbt // 2)))
for action_id, target in mbt_targets: for action_id, target in mbt_targets:
patch = {**runtime_base_patch, "max-num-batched-tokens": target} patch = {**runtime_base_patch, "max-num-batched-tokens": target}
signature = _config_signature({"env_patch": {}, "flag_patch": patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature in tested_signatures: if signature in tested_signatures:
continue continue
relief = 0.24 if top_bottleneck == "ttft_prefill" else 0.14 relief = 0.24 if top_bottleneck == "ttft_prefill" else 0.14
@@ -1332,7 +1339,7 @@ def _runtime_candidate_actions(
mns_targets.append(("raise_max_num_seqs", target)) mns_targets.append(("raise_max_num_seqs", target))
for action_id, target in mns_targets: for action_id, target in mns_targets:
patch = {**runtime_base_patch, "max-num-seqs": target} patch = {**runtime_base_patch, "max-num-seqs": target}
signature = _config_signature({"env_patch": {}, "flag_patch": patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature in tested_signatures: if signature in tested_signatures:
continue continue
if top_bottleneck in {"decode_tpot", "admission_or_queueing"}: if top_bottleneck in {"decode_tpot", "admission_or_queueing"}:
@@ -1388,7 +1395,7 @@ def _runtime_candidate_actions(
"max-num-batched-tokens": mbt_target, "max-num-batched-tokens": mbt_target,
"max-num-seqs": mns_target, "max-num-seqs": mns_target,
} }
signature = _config_signature({"env_patch": {}, "flag_patch": patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature not in tested_signatures: if signature not in tested_signatures:
actions.append( actions.append(
_runtime_action( _runtime_action(
@@ -1413,7 +1420,7 @@ def _runtime_candidate_actions(
current = bool(anchor_flags.get("enable-chunked-prefill", False)) current = bool(anchor_flags.get("enable-chunked-prefill", False))
if not current: if not current:
patch = {**runtime_base_patch, "enable-chunked-prefill": True} patch = {**runtime_base_patch, "enable-chunked-prefill": True}
signature = _config_signature({"env_patch": {}, "flag_patch": patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature not in tested_signatures: if signature not in tested_signatures:
actions.append( actions.append(
_runtime_action( _runtime_action(
@@ -1444,7 +1451,7 @@ def _runtime_candidate_actions(
) )
if target is not None: if target is not None:
patch = {**runtime_base_patch, "gpu-memory-utilization": target} patch = {**runtime_base_patch, "gpu-memory-utilization": target}
signature = _config_signature({"env_patch": {}, "flag_patch": patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature not in tested_signatures: if signature not in tested_signatures:
actions.append( actions.append(
_runtime_action( _runtime_action(
@@ -1557,8 +1564,9 @@ def _has_unmeasured_higher_tp_candidate(
or point["tensor-parallel-size"] <= current_tp or point["tensor-parallel-size"] <= current_tp
): ):
continue continue
signature = _config_signature( signature = _effective_config_signature(
{"env_patch": {}, "flag_patch": _topology_patch(study, point)} study,
{"env_patch": {}, "flag_patch": _topology_patch(study, point)},
) )
if signature not in tested_signatures: if signature not in tested_signatures:
return True return True
@@ -1770,7 +1778,7 @@ def _parallel_size_can_vary(study: StudySpec) -> bool:
normalized = _normalized_topology_flags(flags) normalized = _normalized_topology_flags(flags)
if any(normalized.get(key) != point.get(key) for key in point): if any(normalized.get(key) != point.get(key) for key in point):
continue continue
signatures.add(_config_signature({"env_patch": {}, "flag_patch": patch})) signatures.add(_effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}))
return len(signatures) > 1 return len(signatures) > 1
@@ -1948,8 +1956,8 @@ def _topology_frontier_status(
base_dp = _parse_int_like(study.engine.base_flags.get("data-parallel-size"), default=1) base_dp = _parse_int_like(study.engine.base_flags.get("data-parallel-size"), default=1)
if current_dp != base_dp: if current_dp != base_dp:
flag_patch["data-parallel-size"] = current_dp flag_patch["data-parallel-size"] = current_dp
signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": flag_patch})
if signature in _state_tested_signatures(state): if signature in _state_tested_signatures(study, state):
return { return {
**default, **default,
"reason": "higher_tp_frontier_already_tested", "reason": "higher_tp_frontier_already_tested",
@@ -1979,9 +1987,9 @@ def _effective_flags_for_item(study: StudySpec, item: dict[str, Any]) -> dict[st
return flags return flags
def _state_tested_signatures(state: StudyState) -> set[str]: def _state_tested_signatures(study: StudySpec, state: StudyState) -> set[str]:
return { return {
_config_signature(trial.config_patch) _effective_config_signature(study, trial.config_patch)
for trial in state.trials for trial in state.trials
if isinstance(trial.config_patch, dict) if isinstance(trial.config_patch, dict)
} }
@@ -2045,7 +2053,7 @@ def _runtime_refinement_proposal(
"reason": "no_larger_mbt_step_available", "reason": "no_larger_mbt_step_available",
} }
flag_patch["max-num-batched-tokens"] = target_mbt flag_patch["max-num-batched-tokens"] = target_mbt
signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch}) signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": flag_patch})
if signature in tested_signatures: if signature in tested_signatures:
return { return {
**default, **default,
@@ -2643,12 +2651,29 @@ def _parse_float_like(value: Any, *, default: float) -> float:
def _config_signature(config_patch: Any) -> str: def _config_signature(config_patch: Any) -> str:
return json.dumps(
_normalized_config_patch(config_patch),
ensure_ascii=False,
sort_keys=True,
separators=(",", ":"),
)
def _effective_config_signature(study: StudySpec, config_patch: Any) -> str:
patch = _normalized_config_patch(config_patch)
payload = {
"env": {**study.engine.base_envs, **patch["env_patch"]},
"flags": {**study.engine.base_flags, **patch["flag_patch"]},
}
return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
def _normalized_config_patch(config_patch: Any) -> dict[str, dict[str, Any]]:
if not isinstance(config_patch, dict): if not isinstance(config_patch, dict):
config_patch = {} config_patch = {}
env_patch = config_patch.get("env_patch") env_patch = config_patch.get("env_patch")
flag_patch = config_patch.get("flag_patch") flag_patch = config_patch.get("flag_patch")
payload = { return {
"env_patch": env_patch if isinstance(env_patch, dict) else {}, "env_patch": env_patch if isinstance(env_patch, dict) else {},
"flag_patch": flag_patch if isinstance(flag_patch, dict) else {}, "flag_patch": flag_patch if isinstance(flag_patch, dict) else {},
} }
return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))

View File

@@ -5,7 +5,7 @@ import time
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
from .harness import build_harness_context, render_harness_context from .harness import _effective_config_signature, build_harness_context, render_harness_context
from .http_client import chat_completion, stream_text_completion from .http_client import chat_completion, stream_text_completion
from .spec import LLMPolicySpec, Proposal, SpecError, StudySpec, StudyState from .spec import LLMPolicySpec, Proposal, SpecError, StudySpec, StudyState
@@ -306,7 +306,7 @@ def build_prompt(
json.dumps(launch_failures, ensure_ascii=False, indent=2), json.dumps(launch_failures, ensure_ascii=False, indent=2),
"", "",
"Tested config signatures:", "Tested config signatures:",
json.dumps(_tested_config_signatures(state), ensure_ascii=False, indent=2), json.dumps(_tested_config_signatures(study, state), ensure_ascii=False, indent=2),
] ]
return "\n".join(sections) return "\n".join(sections)
@@ -402,7 +402,7 @@ def build_prompt(
json.dumps(parallel_candidates, ensure_ascii=False, indent=2), json.dumps(parallel_candidates, ensure_ascii=False, indent=2),
"", "",
"Tested config signatures:", "Tested config signatures:",
json.dumps(_tested_config_signatures(state), ensure_ascii=False, indent=2), json.dumps(_tested_config_signatures(study, state), ensure_ascii=False, indent=2),
] ]
sections.extend( sections.extend(
[ [
@@ -435,12 +435,12 @@ def build_prompt(
return "\n".join(sections) return "\n".join(sections)
def _tested_config_signatures(state: StudyState) -> list[dict[str, Any]]: def _tested_config_signatures(study: StudySpec, state: StudyState) -> list[dict[str, Any]]:
signatures: list[dict[str, Any]] = [] signatures: list[dict[str, Any]] = []
seen: set[str] = set() seen: set[str] = set()
for trial in state.trials: for trial in state.trials:
config_patch = trial.config_patch or {} config_patch = trial.config_patch or {}
signature = json.dumps(config_patch, sort_keys=True, ensure_ascii=False) signature = _effective_config_signature(study, config_patch)
if signature in seen: if signature in seen:
continue continue
seen.add(signature) seen.add(signature)
@@ -449,6 +449,7 @@ def _tested_config_signatures(state: StudyState) -> list[dict[str, Any]]:
"trial_id": trial.trial_id, "trial_id": trial.trial_id,
"status": trial.status, "status": trial.status,
"best_request_rate_per_gpu": trial.best_request_rate_per_gpu, "best_request_rate_per_gpu": trial.best_request_rate_per_gpu,
"effective_config_signature": signature,
"config_patch": config_patch, "config_patch": config_patch,
} }
) )

View File

@@ -25,6 +25,7 @@ from aituner.http_client import (
) )
from aituner.job import append_job, build_trial_job from aituner.job import append_job, build_trial_job
from aituner.harness import ( from aituner.harness import (
_effective_config_signature,
build_harness_context, build_harness_context,
build_harness_guided_proposal, build_harness_guided_proposal,
build_harness_stop_proposal, build_harness_stop_proposal,
@@ -358,6 +359,36 @@ class CoreFlowTests(unittest.TestCase):
"search_high_lowered_to_trace_ceiling", "search_high_lowered_to_trace_ceiling",
) )
def test_effective_config_signature_treats_noop_patch_as_baseline(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
study_path = _write_study_assets(
Path(tmp),
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 8,
"data-parallel-size": 1,
"gpu-memory-utilization": 0.5,
"max-num-seqs": 8,
},
},
)
study = load_study_spec(study_path)
baseline = _effective_config_signature(study, {"env_patch": {}, "flag_patch": {}})
noop_tp = _effective_config_signature(
study,
{"env_patch": {}, "flag_patch": {"tensor-parallel-size": 8}},
)
changed_tp = _effective_config_signature(
study,
{"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
)
self.assertEqual(baseline, noop_tp)
self.assertNotEqual(baseline, changed_tp)
def test_lca_workload_profile_uses_standard_10d_features(self) -> None: def test_lca_workload_profile_uses_standard_10d_features(self) -> None:
window = WindowRecord( window = WindowRecord(
window_id="w1", window_id="w1",
@@ -1639,6 +1670,153 @@ class CoreFlowTests(unittest.TestCase):
"search_high_saturation_requires_parallel_size_evidence", "search_high_saturation_requires_parallel_size_evidence",
) )
def test_harness_does_not_repropose_noop_topology_equivalent_to_baseline(
self,
) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 8,
"data-parallel-size": 1,
"gpu-memory-utilization": 0.5,
"max-num-seqs": 8,
},
"tunable_flags": ["tensor-parallel-size", "max-num-seqs"],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [1, 2, 4, 8],
"allowed_tp_dp_products": [1, 2, 4, 8],
},
},
)
study = load_study_spec(study_path)
trial1_result = tmp_path / "trial-0001.json"
trial1_result.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.935616858887,
"best_request_rate": 8.0,
"best_pass_rate": 1.0,
"probes": [
{
"threshold": 0.935616858887,
"feasible": True,
"payload": {
"request_count": 480,
"pass_rate": 1.0,
"request_rate": 8.0,
"early_stopped": False,
"early_stop_reason": "",
"latency_summary": {"failed_reason_counts": {}},
},
}
],
}
),
encoding="utf-8",
)
trial2_result = tmp_path / "trial-0002.json"
trial2_result.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.810867944369,
"best_request_rate": 6.95,
"best_pass_rate": 0.9784,
"probes": [
{
"threshold": 0.873242401628,
"feasible": False,
"payload": {
"request_count": 450,
"pass_rate": 0.7844,
"request_rate": 7.5,
"early_stopped": True,
"early_stop_reason": "slo_pass_rate_unrecoverable",
"latency_summary": {
"failed_reason_counts": {
"ttft_ms>2000.0": 42,
"slo_pass_rate_unrecoverable": 49,
}
},
},
},
{
"threshold": 0.810867944369,
"feasible": True,
"payload": {
"request_count": 417,
"pass_rate": 0.9784,
"request_rate": 6.95,
"early_stopped": False,
"early_stop_reason": "",
"latency_summary": {
"failed_reason_counts": {"ttft_ms>2000.0": 9}
},
},
},
],
}
),
encoding="utf-8",
)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0002",
best_parallel_size=4,
best_sampling_u=0.810867944369,
best_request_rate=6.95,
best_request_rate_per_gpu=1.7375,
next_trial_index=3,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
parallel_size=8,
best_request_rate=8.0,
best_request_rate_per_gpu=1.0,
result_path=str(trial1_result),
config_patch={"env_patch": {}, "flag_patch": {}},
),
TrialSummary(
trial_id="trial-0002",
status="completed",
parallel_size=4,
best_request_rate=6.95,
best_request_rate_per_gpu=1.7375,
result_path=str(trial2_result),
config_patch={
"env_patch": {},
"flag_patch": {"tensor-parallel-size": 4},
},
),
],
)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 2048},
state=state,
)
actions = context["experiment_plan"]["candidate_actions"]
self.assertFalse(
any(
action.get("config_patch", {}).get("flag_patch")
== {"tensor-parallel-size": 8}
for action in actions
)
)
proposal = build_harness_guided_proposal(context)
self.assertTrue(
proposal is None
or proposal.config_patch.flag_patch != {"tensor-parallel-size": 8}
)
def test_harness_guided_first_tp_probe_for_latency_bottleneck(self) -> None: def test_harness_guided_first_tp_probe_for_latency_bottleneck(self) -> None:
with tempfile.TemporaryDirectory() as tmp: with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp) tmp_path = Path(tmp)