Add SLO-driven topology frontier harness guard

2026-05-12 21:00:49 +08:00
parent e1125475ae
commit 2d03b1cd4c
2 changed files with 340 additions and 0 deletions

View File

@@ -521,6 +521,13 @@ def _harness_stop_decision(
"reason": high_saturation["reason"],
"evidence": high_saturation,
}
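    # A legal, still-unmeasured higher-TP topology blocks any stop verdict:
    # the harness must probe that frontier before trusting convergence.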
topology_frontier = _topology_frontier_status(study, state, recent_diagnostics)
if topology_frontier["frontier_open"]:
return {
"should_stop": False,
"reason": "topology_frontier_requires_probe",
"evidence": topology_frontier,
}
guard = _convergence_guard(state, recent_diagnostics)
if guard["deterministic_stop"]:
return {
@@ -567,7 +574,17 @@ def _harness_proposal_decision(
_config_signature(item.get("config_patch") if isinstance(item, dict) else None)
for item in recent_diagnostics
}
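    # Fold in signatures from the whole study history, not just the recent
    # diagnostics window, so a frontier config is never proposed twice.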
tested_signatures.update(_state_tested_signatures(state))
baseline = recent_diagnostics[0] if recent_diagnostics else {}
topology_frontier = _topology_frontier_proposal(
study,
window_summary,
state,
recent_diagnostics,
tested_signatures=tested_signatures,
)
if topology_frontier["should_propose"]:
return topology_frontier
runtime_refinement = _runtime_refinement_proposal(
study,
window_summary,
@@ -632,6 +649,146 @@ def _harness_proposal_decision(
    }


def _topology_frontier_proposal(
study: StudySpec,
window_summary: dict[str, Any],
state: StudyState,
recent_diagnostics: list[dict[str, Any]],
*,
tested_signatures: set[str],
) -> dict[str, Any]:
default = {
"should_propose": False,
"reason": "topology_frontier_not_applicable",
"diagnosis": "No untested topology frontier is justified by the current SLO/workload profile.",
"config_patch": {"env_patch": {}, "flag_patch": {}},
"expected_effects": [],
}
frontier = _topology_frontier_status(study, state, recent_diagnostics, window_summary)
if not frontier["frontier_open"]:
return {**default, "reason": frontier["reason"]}
flag_patch = frontier.get("flag_patch")
if not isinstance(flag_patch, dict):
return {**default, "reason": "topology_frontier_patch_missing"}
signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch})
if signature in tested_signatures:
return {**default, "reason": "topology_frontier_already_tested"}
return {
"should_propose": True,
"reason": "topology_frontier_probe_for_slo_pressure",
"diagnosis": (
"The current SLO/workload profile still shows latency pressure and a higher "
"legal TP frontier has not been measured. Validate that topology before "
"declaring the incumbent converged or spending more trials on local runtime knobs."
),
"config_patch": {"env_patch": {}, "flag_patch": flag_patch},
"expected_effects": [
"test whether extra model parallelism improves the configured TTFT/TPOT pass rate",
"cover the next legal topology frontier before local runtime refinement",
"avoid overfitting the harness to the first strong incumbent",
],
"frontier": frontier,
    }


def _topology_frontier_status(
study: StudySpec,
state: StudyState,
recent_diagnostics: list[dict[str, Any]],
window_summary: dict[str, Any] | None = None,
) -> dict[str, Any]:
default = {
"frontier_open": False,
"reason": "topology_frontier_not_needed",
"current_trial_id": state.best_trial_id,
"active_bottleneck": None,
"current_tp": None,
"current_dp": None,
"next_tp": None,
"flag_patch": None,
}
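    # Structural gates, cheapest first: TP must be tunable, and a completed
    # incumbent with a feasible rate must exist in the recent history.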
if "tensor-parallel-size" not in set(study.engine.tunable_flags):
return {**default, "reason": "tensor_parallel_size_not_tunable"}
if not state.best_trial_id:
return {**default, "reason": "no_incumbent_for_topology_frontier"}
best = next(
(item for item in recent_diagnostics if item.get("trial_id") == state.best_trial_id),
None,
)
if best is None:
return {**default, "reason": "incumbent_not_in_recent_harness_history"}
if best.get("status") != "completed":
return {**default, "reason": "incumbent_not_completed"}
if not isinstance(best.get("best_request_rate_per_gpu"), (int, float)):
return {**default, "reason": "incumbent_has_no_feasible_rate"}
active_bottleneck = str(best.get("active_bottleneck") or "")
if active_bottleneck in {"unknown", "none_obvious", ""} and recent_diagnostics:
active_bottleneck = str(recent_diagnostics[-1].get("active_bottleneck") or "")
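    # Only latency-side bottlenecks (prefill TTFT or decode TPOT) justify
    # spending more GPUs per replica on a higher-TP probe.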
if active_bottleneck not in {"ttft_prefill", "decode_tpot"}:
return {
**default,
"reason": "active_bottleneck_does_not_require_tp_frontier",
"active_bottleneck": active_bottleneck,
}
flags = _effective_flags_for_item(study, best)
current_tp = _parse_int_like(flags.get("tensor-parallel-size"), default=1)
current_dp = _parse_int_like(flags.get("data-parallel-size"), default=1)
next_tp = _next_allowed_tp(study, current_tp=current_tp, current_dp=current_dp)
if next_tp is None:
return {
**default,
"reason": "no_legal_higher_tp_frontier",
"active_bottleneck": active_bottleneck,
"current_tp": current_tp,
"current_dp": current_dp,
}
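    # Keep the probe patch minimal: carry data-parallel-size only when the
    # incumbent already deviates from the engine's base flags.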
flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp}
base_dp = _parse_int_like(study.engine.base_flags.get("data-parallel-size"), default=1)
if current_dp != base_dp:
flag_patch["data-parallel-size"] = current_dp
signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch})
if signature in _state_tested_signatures(state):
return {
**default,
"reason": "higher_tp_frontier_already_tested",
"active_bottleneck": active_bottleneck,
"current_tp": current_tp,
"current_dp": current_dp,
"next_tp": next_tp,
"flag_patch": flag_patch,
}
return {
"frontier_open": True,
"reason": "higher_tp_frontier_unmeasured_under_slo_pressure",
"current_trial_id": state.best_trial_id,
"active_bottleneck": active_bottleneck,
"current_tp": current_tp,
"current_dp": current_dp,
"next_tp": next_tp,
"flag_patch": flag_patch,
    }


def _effective_flags_for_item(study: StudySpec, item: dict[str, Any]) -> dict[str, Any]:
flags = dict(study.engine.base_flags)
patch = item.get("config_patch")
if isinstance(patch, dict) and isinstance(patch.get("flag_patch"), dict):
flags.update(patch["flag_patch"])
    return flags


def _state_tested_signatures(state: StudyState) -> set[str]:
return {
_config_signature(trial.config_patch)
for trial in state.trials
if isinstance(trial.config_patch, dict)
    }


def _runtime_refinement_proposal(
study: StudySpec,
window_summary: dict[str, Any],

View File

@@ -826,6 +826,189 @@ class CoreFlowTests(unittest.TestCase):
},
            )

    def test_harness_validates_unmeasured_tp_frontier_before_runtime_refinement(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"tunable_flags": [
"tensor-parallel-size",
"max-num-batched-tokens",
"enable-chunked-prefill",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [1, 2, 4],
"allowed_tp_dp_products": [1, 2, 4],
},
},
)
study = load_study_spec(study_path)
result_path = tmp_path / "trial-0002.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.5,
"best_request_rate": 2.0,
"best_pass_rate": 0.96,
"probes": [
{
"threshold": 0.5,
"feasible": True,
"payload": {
"request_count": 100,
"pass_rate": 0.96,
"request_rate": 2.0,
"latency_summary": {"failed_reason_counts": {}},
},
},
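                            # Infeasible probe at the higher threshold: the tpot_ms
                            # failure counts mark the incumbent as decode-bound.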
{
"threshold": 0.75,
"feasible": False,
"payload": {
"request_count": 100,
"pass_rate": 0.6,
"request_rate": 3.0,
"early_stop_reason": "slo_pass_rate_unrecoverable",
"latency_summary": {
"failed_reason_counts": {"tpot_ms>25.0": 40}
},
},
},
],
}
),
encoding="utf-8",
)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0002",
best_request_rate=2.0,
best_request_rate_per_gpu=1.0,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
best_request_rate=0.5,
best_request_rate_per_gpu=0.5,
config_patch={"env_patch": {}, "flag_patch": {}},
),
TrialSummary(
trial_id="trial-0002",
status="completed",
best_request_rate=2.0,
best_request_rate_per_gpu=1.0,
result_path=str(result_path),
config_patch={
"env_patch": {},
"flag_patch": {"tensor-parallel-size": 2},
},
),
],
)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 7628, "prompt_tail_ratio_p95_p50": 3.8},
state=state,
)
proposal = build_harness_guided_proposal(context)
self.assertIsNotNone(proposal)
self.assertEqual(proposal.config_patch.flag_patch, {"tensor-parallel-size": 4})
self.assertEqual(
context["harness_proposal"]["reason"],
"topology_frontier_probe_for_slo_pressure",
            )

    def test_harness_stop_blocked_until_slo_driven_topology_frontier_is_measured(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"tunable_flags": ["tensor-parallel-size", "max-num-seqs"],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [1, 2, 4],
"allowed_tp_dp_products": [1, 2, 4],
},
},
)
study = load_study_spec(study_path)
result_path = tmp_path / "trial-0002.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.5,
"best_request_rate": 2.0,
"best_pass_rate": 0.96,
"probes": [
{
"threshold": 0.75,
"feasible": False,
"payload": {
"request_count": 100,
"pass_rate": 0.6,
"request_rate": 3.0,
"early_stop_reason": "slo_pass_rate_unrecoverable",
"latency_summary": {
"failed_reason_counts": {"tpot_ms>25.0": 40}
},
},
}
],
}
),
encoding="utf-8",
)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0002",
best_request_rate=2.0,
best_request_rate_per_gpu=1.0,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
best_request_rate=0.5,
best_request_rate_per_gpu=0.5,
config_patch={"env_patch": {}, "flag_patch": {}},
),
TrialSummary(
trial_id="trial-0002",
status="completed",
best_request_rate=2.0,
best_request_rate_per_gpu=1.0,
result_path=str(result_path),
config_patch={
"env_patch": {},
"flag_patch": {"tensor-parallel-size": 2},
},
),
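                    # Two near-identical runtime-knob trials mimic a convergence
                    # plateau that would otherwise allow a deterministic stop.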
TrialSummary(
trial_id="trial-0003",
status="completed",
best_request_rate=1.98,
best_request_rate_per_gpu=0.99,
config_patch={"env_patch": {}, "flag_patch": {"max-num-seqs": 8}},
),
TrialSummary(
trial_id="trial-0004",
status="completed",
best_request_rate=1.98,
best_request_rate_per_gpu=0.99,
config_patch={"env_patch": {}, "flag_patch": {"max-num-seqs": 16}},
),
],
)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 7628, "prompt_tail_ratio_p95_p50": 3.8},
state=state,
)
self.assertFalse(context["harness_stop"]["should_stop"])
            self.assertEqual(context["harness_stop"]["reason"], "topology_frontier_requires_probe")

    def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)