Fix decode harness partial probe handling

This commit is contained in:
2026-05-16 14:18:07 +08:00
parent f18765b235
commit 5a879a8592
3 changed files with 180 additions and 1 deletions

View File

@@ -1038,6 +1038,7 @@ def _topology_candidate_actions(
score, factors = _score_topology_candidate(
top_bottleneck,
bottleneck_hypotheses,
request_mode=study.trace.request_mode,
current_tp=current_tp,
current_dp=current_dp,
candidate_tp=point["tensor-parallel-size"],
@@ -1225,7 +1226,13 @@ def _legal_topology_points(
else:
dp_values = [current_dp]
if constraints is not None and constraints.allowed_expert_parallel_sizes:
if (
study.trace.request_mode == "decode_only"
and current_enable_ep
and current_ep > 1
):
ep_values = [current_ep]
elif constraints is not None and constraints.allowed_expert_parallel_sizes:
ep_values = sorted(set(constraints.allowed_expert_parallel_sizes))
elif "expert-parallel-size" in tunable:
ep_values = sorted({1, current_ep})
@@ -1349,6 +1356,7 @@ def _score_topology_candidate(
top_bottleneck: str,
bottleneck_hypotheses: list[dict[str, Any]],
*,
request_mode: str,
current_tp: int,
current_dp: int,
candidate_tp: int,
@@ -1360,6 +1368,15 @@ def _score_topology_candidate(
relief = 0.0
if top_bottleneck == "ttft_prefill":
relief = 0.42 if tp_delta > 0 else 0.05
elif top_bottleneck == "decode_tpot" and request_mode == "decode_only":
if dp_delta > 0 and candidate_tp <= current_tp:
relief = 0.44
elif dp_delta > 0:
relief = 0.24
elif tp_delta > 0 and candidate_dp < current_dp:
relief = 0.03
else:
relief = 0.08
elif top_bottleneck == "decode_tpot":
relief = 0.34 if tp_delta > 0 else 0.02
elif top_bottleneck == "admission_or_queueing":
@@ -1485,6 +1502,12 @@ def _topology_frontier_status(
"reason": "active_bottleneck_does_not_require_tp_frontier",
"active_bottleneck": active_bottleneck,
}
if active_bottleneck == "decode_tpot" and study.trace.request_mode == "decode_only":
return {
**default,
"reason": "decode_tpot_uses_topology_redistribution_not_higher_tp_frontier",
"active_bottleneck": active_bottleneck,
}
flags = _effective_flags_for_item(study, best)
current_tp = _parse_int_like(flags.get("tensor-parallel-size"), default=1)

View File

@@ -209,6 +209,17 @@ def _probe_outcome_details(
}
def _best_feasible_probe_record(probe_history: list[dict[str, Any]]) -> dict[str, Any] | None:
feasible = [
item
for item in probe_history
if item.get("feasible") and isinstance(item.get("request_rate"), (int, float))
]
if not feasible:
return None
return max(feasible, key=lambda item: float(item["request_rate"]))
def _replay_requests(
requests: list[TraceRequest],
*,
@@ -633,6 +644,26 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
StudyStore.write_json(Path(trial.result_path), result)
return result
except Exception as exc: # noqa: BLE001
partial_best = _best_feasible_probe_record(probe_history)
if partial_best is not None:
result = {
"study_id": trial.study_id,
"trial_id": trial.trial_id,
"status": "completed",
"config_patch": to_jsonable(trial.config_patch),
"best_source": "partial_probe_before_failure",
"best_sampling_u": partial_best.get("threshold"),
"best_request_rate": partial_best.get("request_rate"),
"best_pass_rate": partial_best.get("pass_rate"),
"best_request_count": partial_best.get("request_count"),
"completed_with_probe_failure": True,
"failure_stage": failure_stage,
"failure_reason": str(exc),
"failure_traceback": traceback.format_exc(),
"probes": probe_history,
}
StudyStore.write_json(Path(trial.result_path), result)
return result
result = {
"study_id": trial.study_id,
"trial_id": trial.trial_id,

View File

@@ -39,6 +39,7 @@ from aituner.spec import (
from aituner.store import StudyStore
from aituner.trace import load_trace_requests, summarize_window
from aituner.worker import (
_best_feasible_probe_record,
_latency_summary,
_run_one_request,
_replay_requests,
@@ -1518,6 +1519,103 @@ class CoreFlowTests(unittest.TestCase):
"\n".join(context["proposal_rules"]),
)
def test_decode_topology_planner_prefers_dp_redistribution_and_preserves_ep(self) -> None:
    # Baseline engine: TP=4 x DP=2 with expert parallelism enabled at EP=8.
    baseline_flags = {
        "host": "127.0.0.1",
        "port": 8000,
        "enable-expert-parallel": True,
        "tensor-parallel-size": 4,
        "data-parallel-size": 2,
        "expert-parallel-size": 8,
        "max-num-seqs": 192,
    }
    tunable_flag_names = [
        "tensor-parallel-size",
        "data-parallel-size",
        "expert-parallel-size",
        "max-num-seqs",
    ]
    topology_rules = {
        "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
        "allowed_data_parallel_sizes": [1, 2, 4, 8],
        "allowed_expert_parallel_sizes": [1, 2, 4, 8],
        "require_tp_dp_product_equals_gpu_count": True,
        "require_ep_size_leq_tp_dp_product": True,
        "require_ep_size_divides_tp_dp_product": True,
        "require_enable_expert_parallel_when_ep_gt_one": True,
    }
    # Recorded result of the prior trial: a single infeasible probe whose
    # failures are all TPOT violations against the 40 ms rule.
    infeasible_probe = {
        "threshold": 0.04,
        "feasible": False,
        "payload": {
            "request_rate": 0.72,
            "pass_rate": 0.3,
            "early_stop_reason": "slo_pass_rate_unrecoverable",
            "latency_summary": {
                "failed_reason_counts": {"tpot_ms>40.0": 80}
            },
        },
    }
    recorded_result = {
        "status": "completed",
        "best_request_rate": 0.47,
        "best_pass_rate": 0.98,
        "probes": [infeasible_probe],
    }
    # The expected patch changes TP and DP only; it carries no
    # expert-parallel-size entry.
    expected_flag_patch = {"tensor-parallel-size": 2, "data-parallel-size": 4}

    with tempfile.TemporaryDirectory() as scratch_dir:
        root = Path(scratch_dir)
        study_path = _write_study_assets(
            root,
            trace_overrides={"request_mode": "decode_only"},
            slo_overrides={
                "ttft_rule": None,
                "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 40},
            },
            engine_overrides={
                "base_flags": baseline_flags,
                "tunable_flags": tunable_flag_names,
                "topology_constraints": topology_rules,
            },
        )
        result_path = root / "trial-0001-result.json"
        result_path.write_text(json.dumps(recorded_result), encoding="utf-8")

        study = load_study_spec(study_path)
        prior_trial = TrialSummary(
            trial_id="trial-0001",
            status="completed",
            best_request_rate=0.47,
            best_request_rate_per_gpu=0.05875,
            best_pass_rate=0.98,
            result_path=str(result_path),
            config_patch={"env_patch": {}, "flag_patch": {}},
        )
        state = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0001",
            best_request_rate=0.47,
            best_request_rate_per_gpu=0.05875,
            trials=[prior_trial],
        )
        context = build_harness_context(
            study=study,
            window_summary={},
            state=state,
        )

        # The planned next action must be a topology move to TP=2 x DP=4.
        next_action = context["experiment_plan"]["next_action"]
        self.assertEqual(next_action["knob_family"], "topology")
        self.assertEqual(
            next_action["config_patch"]["flag_patch"],
            expected_flag_patch,
        )

        # The harness-guided proposal must carry the same flag patch.
        proposal = build_harness_guided_proposal(context)
        self.assertIsNotNone(proposal)
        self.assertEqual(proposal.config_patch.flag_patch, expected_flag_patch)
def test_prompt_can_disable_harness_for_ablation(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
@@ -1625,6 +1723,33 @@ class CoreFlowTests(unittest.TestCase):
self.assertIn("data-parallel-size", active)
self.assertIn("max-num-seqs", active)
def test_best_feasible_probe_record_keeps_partial_probe_evidence(self) -> None:
    # One infeasible probe with the highest rate, followed by two feasible
    # probes; the faster feasible probe is the expected winner.
    infeasible_probe = {
        "threshold": 0.03125,
        "request_rate": 0.72,
        "pass_rate": 0.3,
        "feasible": False,
    }
    slower_feasible_probe = {
        "threshold": 0.015625,
        "request_rate": 0.3533,
        "pass_rate": 0.99,
        "feasible": True,
    }
    faster_feasible_probe = {
        "threshold": 0.017578125,
        "request_rate": 0.3833,
        "pass_rate": 0.995,
        "feasible": True,
    }
    probe_history = [infeasible_probe, slower_feasible_probe, faster_feasible_probe]

    selected = _best_feasible_probe_record(probe_history)

    self.assertIsNotNone(selected)
    self.assertEqual(selected["threshold"], 0.017578125)
    self.assertEqual(selected["request_rate"], 0.3833)
def test_load_study_spec_rejects_mismatched_served_model_name(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)