Require harness proposals from candidate sets

This commit is contained in:
2026-06-27 01:03:30 +08:00
parent bef260f183
commit 9accf2575e

View File

@@ -144,8 +144,8 @@ def build_harness_guided_proposal(context: dict[str, Any]) -> Proposal | None:
diagnosis = str(proposal.get("diagnosis") or reason)
return Proposal(
observation=(
"Harness selected a deterministic first validation probe before "
f"requesting an LLM proposal: {reason}."
"Harness selected the highest-scoring deterministic candidate-set action: "
f"{reason}."
),
diagnosis=diagnosis,
config_patch=ConfigPatch(env_patch=dict(env_patch), flag_patch=dict(flag_patch)),
@@ -902,7 +902,10 @@ def _harness_proposal_decision(
default = {
"should_propose": False,
"reason": "no_deterministic_harness_proposal",
"diagnosis": "Defer to the LLM proposal policy.",
"diagnosis": (
"The profile-driven candidate set has no untested action above the "
"proposal threshold; defer to the LLM proposal policy."
),
"config_patch": {"env_patch": {}, "flag_patch": {}},
"expected_effects": [],
}
@@ -920,6 +923,7 @@ def _harness_proposal_decision(
patch = next_action.get("config_patch")
if isinstance(patch, dict):
signature = _effective_config_signature(study, patch)
candidate_fingerprint = next_action.get("effective_config_fingerprint")
if signature not in tested_signatures:
return {
"should_propose": True,
@@ -932,80 +936,11 @@ def _harness_proposal_decision(
if isinstance(item, str)
],
"candidate_score": next_action.get("score"),
"candidate_id": next_action.get("candidate_id"),
"effective_config_fingerprint": candidate_fingerprint,
"bottleneck_hypotheses": experiment_plan.get("bottleneck_hypotheses", []),
}
baseline = recent_diagnostics[0] if recent_diagnostics else {}
topology_frontier = _topology_frontier_proposal(
study,
window_summary,
state,
recent_diagnostics,
tested_signatures=tested_signatures,
)
if topology_frontier["should_propose"]:
return topology_frontier
runtime_refinement = _runtime_refinement_proposal(
study,
window_summary,
state,
recent_diagnostics,
tested_signatures=tested_signatures,
)
if runtime_refinement["should_propose"]:
return runtime_refinement
if len(state.trials) != 1 or len(recent_diagnostics) != 1:
return default
if baseline.get("status") != "completed":
return default
if not isinstance(baseline.get("best_request_rate_per_gpu"), (int, float)):
return default
active_bottleneck = str(baseline.get("active_bottleneck") or "")
if active_bottleneck not in {"ttft_prefill", "decode_tpot"}:
return {
**default,
"reason": "baseline_bottleneck_does_not_require_tp_first_probe",
"diagnosis": f"Baseline bottleneck is {active_bottleneck or 'unknown'}.",
}
if "tensor-parallel-size" not in set(study.engine.tunable_flags):
return {
**default,
"reason": "tensor_parallel_size_not_tunable",
}
base_flags = dict(study.engine.base_flags)
baseline_patch = baseline.get("config_patch")
if isinstance(baseline_patch, dict) and isinstance(baseline_patch.get("flag_patch"), dict):
base_flags.update(baseline_patch["flag_patch"])
current_tp = _parse_int_like(base_flags.get("tensor-parallel-size", 1), default=1)
current_dp = _parse_int_like(base_flags.get("data-parallel-size", 1), default=1)
next_tp = _next_allowed_tp(study, current_tp=current_tp, current_dp=current_dp)
if next_tp is None:
return {
**default,
"reason": "no_legal_adjacent_tensor_parallel_probe",
}
flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp}
signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": flag_patch})
if signature in tested_signatures:
return {
**default,
"reason": "adjacent_tensor_parallel_probe_already_tested",
}
return {
"should_propose": True,
"reason": "first_adjacent_tensor_parallel_probe_for_latency_bottleneck",
"diagnosis": (
f"Baseline high-load probes indicate {active_bottleneck}; the generic "
"topology harness validates the adjacent legal TP increase before "
"runtime-only or DP/EP probes."
),
"config_patch": {"env_patch": {}, "flag_patch": flag_patch},
"expected_effects": [
"reduce per-request latency pressure at higher offered load",
"validate the nearest TP topology before broader runtime search",
],
"baseline_trial_id": baseline.get("trial_id"),
"active_bottleneck": active_bottleneck,
}
return default
def _topology_frontier_proposal(
@@ -1326,6 +1261,7 @@ def _runtime_candidate_actions(
topology_patch = _preserve_topology_patch(study, anchor_flags)
runtime_base_patch = {**topology_patch, **_preserve_runtime_patch(study, anchor_flags)}
actions: list[dict[str, Any]] = []
seen_signatures = set(tested_signatures)
cur_tp = _parse_int_like(anchor_flags.get("tensor-parallel-size"), default=1)
cur_dp = _parse_int_like(anchor_flags.get("data-parallel-size"), default=1)
@@ -1344,6 +1280,62 @@ def _runtime_candidate_actions(
)
topology_settled = not tp_frontier_open
if (
"max-num-batched-tokens" in tunable
and _anchor_has_topology_patch(anchor)
and recent_diagnostics
and recent_diagnostics[-1].get("trial_id") == anchor.get("trial_id")
and cur_tp > 1
and not bottleneck_hypotheses
):
current_mbt = _parse_int_like(anchor_flags.get("max-num-batched-tokens"), default=0)
target_mbt = (
_initial_mbt_from_window(window_summary)
if current_mbt <= 0
else _next_mbt_step(current_mbt)
)
if target_mbt is not None:
action_id = (
"same_topology_runtime_seed_after_tp_incumbent"
if current_mbt <= 0
else "same_topology_mbt_growth_after_feasible_runtime_incumbent"
)
patch = {**runtime_base_patch, "max-num-batched-tokens": target_mbt}
if "enable-chunked-prefill" in tunable:
patch["enable-chunked-prefill"] = True
signature = _effective_config_signature(
study,
{"env_patch": {}, "flag_patch": patch},
)
if signature in seen_signatures:
blocked_candidates.append(
_blocked_candidate(
action_id=action_id,
knob_family="same-topology-runtime",
config_patch={"env_patch": {}, "flag_patch": patch},
blocked_reason="blocked_noop_or_repeat_effective_full_config",
effective_config_signature=signature,
)
)
else:
actions.append(
_runtime_action(
action_id=action_id,
knob_family="same-topology-runtime",
score=0.46 + _information_gain(bottleneck_hypotheses, "runtime"),
patch=patch,
hypothesis=(
"A measured TP incumbent improved request_rate_per_gpu; seed "
"same-topology batching controls before changing another topology axis."
),
expected_effects=[
"preserve the incumbent topology while increasing batching headroom",
"reject if the runtime seed fails to improve request_rate_per_gpu",
],
)
)
seen_signatures.add(signature)
if "max-num-batched-tokens" in tunable:
current_mbt = _parse_int_like(anchor_flags.get("max-num-batched-tokens"), default=0)
mbt_targets: list[tuple[str, int]] = []
@@ -1360,7 +1352,7 @@ def _runtime_candidate_actions(
for action_id, target in mbt_targets:
patch = {**runtime_base_patch, "max-num-batched-tokens": target}
signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature in tested_signatures:
if signature in seen_signatures:
blocked_candidates.append(
_blocked_candidate(
action_id=action_id,
@@ -1388,6 +1380,7 @@ def _runtime_candidate_actions(
],
)
)
seen_signatures.add(signature)
if "max-num-seqs" in tunable:
current_mns = _parse_int_like(anchor_flags.get("max-num-seqs"), default=0)
@@ -1425,7 +1418,7 @@ def _runtime_candidate_actions(
for action_id, target in mns_targets:
patch = {**runtime_base_patch, "max-num-seqs": target}
signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature in tested_signatures:
if signature in seen_signatures:
blocked_candidates.append(
_blocked_candidate(
action_id=action_id,
@@ -1458,6 +1451,7 @@ def _runtime_candidate_actions(
],
)
)
seen_signatures.add(signature)
if (
top_bottleneck == "ttft_prefill"
@@ -1490,7 +1484,7 @@ def _runtime_candidate_actions(
"max-num-seqs": mns_target,
}
signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature not in tested_signatures:
if signature not in seen_signatures:
actions.append(
_runtime_action(
action_id="raise_mbt_and_max_num_seqs",
@@ -1509,6 +1503,7 @@ def _runtime_candidate_actions(
],
)
)
seen_signatures.add(signature)
else:
blocked_candidates.append(
_blocked_candidate(
@@ -1525,7 +1520,7 @@ def _runtime_candidate_actions(
if not current:
patch = {**runtime_base_patch, "enable-chunked-prefill": True}
signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature not in tested_signatures:
if signature not in seen_signatures:
actions.append(
_runtime_action(
action_id="enable_chunked_prefill",
@@ -1542,6 +1537,7 @@ def _runtime_candidate_actions(
],
)
)
seen_signatures.add(signature)
else:
blocked_candidates.append(
_blocked_candidate(
@@ -1566,7 +1562,7 @@ def _runtime_candidate_actions(
if target is not None:
patch = {**runtime_base_patch, "gpu-memory-utilization": target}
signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
if signature not in tested_signatures:
if signature not in seen_signatures:
actions.append(
_runtime_action(
action_id="raise_gpu_memory_utilization",
@@ -1583,6 +1579,7 @@ def _runtime_candidate_actions(
],
)
)
seen_signatures.add(signature)
else:
blocked_candidates.append(
_blocked_candidate(
@@ -2024,7 +2021,7 @@ def _score_topology_candidate(
else:
relief = 0.08
elif top_bottleneck == "decode_tpot":
relief = 0.34 if tp_delta > 0 else 0.02
relief = 0.52 if tp_delta > 0 else 0.02
elif top_bottleneck == "admission_or_queueing":
relief = 0.34 if dp_delta > 0 else 0.08
else: