Prevent prefill convergence stop before seq probe

This commit is contained in:
2026-06-22 14:43:55 +08:00
parent 4607711bb5
commit fd94ab9f3b
2 changed files with 173 additions and 5 deletions

View File

@@ -1103,6 +1103,7 @@ def _candidate_actions(
anchor,
top_bottleneck,
bottleneck_hypotheses,
recent_diagnostics,
tested_signatures,
)
)
@@ -1196,6 +1197,7 @@ def _runtime_candidate_actions(
anchor: dict[str, Any],
top_bottleneck: str,
bottleneck_hypotheses: list[dict[str, Any]],
recent_diagnostics: list[dict[str, Any]],
tested_signatures: set[str],
) -> list[dict[str, Any]]:
tunable = set(study.engine.tunable_flags)
@@ -1258,6 +1260,14 @@ def _runtime_candidate_actions(
if "max-num-seqs" in tunable:
current_mns = _parse_int_like(anchor_flags.get("max-num-seqs"), default=0)
max_num_seqs_tested = any(
"max-num-seqs" in (
((item.get("config_patch") or {}).get("flag_patch") or {})
if isinstance(item.get("config_patch"), dict)
else {}
)
for item in recent_diagnostics
)
mns_targets: list[tuple[str, int]] = []
if top_bottleneck == "admission_or_queueing":
target = max(8, int(current_mns * 1.5)) if current_mns > 0 else 64
@@ -1273,12 +1283,25 @@ def _runtime_candidate_actions(
max(16, int(current_mns * 1.5)) if current_mns > 0 else 48, 8
)
mns_targets.append(("raise_max_num_seqs", raise_target))
elif top_bottleneck == "ttft_prefill" and topology_settled and not max_num_seqs_tested:
# Prefill-heavy TTFT can still be admission/concurrency limited after TP and
# max-num-batched-tokens probes settle. Try a modest same-topology seq cap
# increase before letting convergence guards declare the incumbent final.
target = _round_up_to_multiple(
max(16, int(current_mns * 1.5)) if current_mns > 0 else 64, 8
)
mns_targets.append(("raise_max_num_seqs", target))
for action_id, target in mns_targets:
patch = {**runtime_base_patch, "max-num-seqs": target}
signature = _config_signature({"env_patch": {}, "flag_patch": patch})
if signature in tested_signatures:
continue
relief = 0.25 if top_bottleneck in {"decode_tpot", "admission_or_queueing"} else 0.08
if top_bottleneck in {"decode_tpot", "admission_or_queueing"}:
relief = 0.25
elif top_bottleneck == "ttft_prefill":
relief = 0.3
else:
relief = 0.08
actions.append(
_runtime_action(
action_id=action_id,
@@ -1286,12 +1309,12 @@ def _runtime_candidate_actions(
score=relief + _information_gain(bottleneck_hypotheses, "runtime"),
patch=patch,
hypothesis=(
"Adjust max-num-seqs to test whether concurrency pressure is the "
"limiting factor under the configured SLO."
"Adjust max-num-seqs to test whether concurrency/admission pressure "
"is the limiting factor under the configured SLO."
),
expected_effects=[
"change decode/admission concurrency on the incumbent topology",
"confirm if TPOT or queueing pressure is caused by sequence concurrency",
"change prefill/decode admission concurrency on the incumbent topology",
"confirm if latency or queueing pressure is caused by sequence concurrency",
],
)
)

View File

@@ -2058,6 +2058,151 @@ class CoreFlowTests(unittest.TestCase):
{"max-num-seqs": 32},
)
def test_prefill_convergence_stop_waits_for_sequence_concurrency_probe(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 4,
"data-parallel-size": 1,
"max-num-batched-tokens": 8192,
"max-num-seqs": 64,
"enable-chunked-prefill": True,
},
"tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"max-num-batched-tokens",
"max-num-seqs",
"enable-chunked-prefill",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [4, 8],
"allowed_data_parallel_sizes": [1, 2],
"allowed_tp_dp_products": [4, 8],
},
},
)
def write_result(name: str, best_rate: float | None, pass_rate: float) -> Path:
path = tmp_path / f"{name}.json"
payload = {
"status": "completed",
"best_sampling_u": 0.091796875 if best_rate is not None else None,
"best_request_rate": best_rate,
"best_pass_rate": pass_rate if best_rate is not None else None,
"probes": [
{
"threshold": 0.09375,
"feasible": best_rate is not None,
"payload": {
"request_rate": best_rate,
"pass_rate": pass_rate,
"early_stop_reason": (
"" if best_rate is not None else "slo_pass_rate_unrecoverable"
),
"latency_summary": {
"failed_reason_counts": {"ttft_ms>4000.0": 32}
},
},
}
],
}
path.write_text(json.dumps(payload), encoding="utf-8")
return path
study = load_study_spec(study_path)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_parallel_size=8,
best_sampling_u=0.091796875,
best_request_rate=2.303,
best_request_rate_per_gpu=0.288,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
parallel_size=8,
best_request_rate=2.303,
best_request_rate_per_gpu=0.288,
best_pass_rate=0.952,
result_path=str(write_result("trial-0001", 2.303, 0.952)),
config_patch={
"env_patch": {},
"flag_patch": {
"tensor-parallel-size": 8,
"data-parallel-size": 1,
},
},
),
TrialSummary(
trial_id="trial-0002",
status="completed",
parallel_size=8,
best_request_rate=2.303,
best_request_rate_per_gpu=0.288,
best_pass_rate=0.953,
result_path=str(write_result("trial-0002", 2.303, 0.953)),
config_patch={
"env_patch": {},
"flag_patch": {
"tensor-parallel-size": 8,
"max-num-batched-tokens": 32768,
},
},
),
TrialSummary(
trial_id="trial-0003",
status="completed",
parallel_size=8,
result_path=str(write_result("trial-0003", None, 0.0)),
config_patch={
"env_patch": {},
"flag_patch": {
"tensor-parallel-size": 4,
"data-parallel-size": 2,
},
},
),
TrialSummary(
trial_id="trial-0004",
status="completed",
parallel_size=8,
best_request_rate=2.303,
best_request_rate_per_gpu=0.288,
best_pass_rate=0.954,
result_path=str(write_result("trial-0004", 2.303, 0.954)),
config_patch={
"env_patch": {},
"flag_patch": {
"tensor-parallel-size": 8,
"data-parallel-size": 1,
"max-num-batched-tokens": 12288,
},
},
),
],
)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 24000, "prompt_tokens_p99": 32000},
state=state,
)
self.assertFalse(context["harness_stop"]["should_stop"])
self.assertEqual(
context["harness_stop"]["reason"],
"experiment_plan_has_high_value_candidate",
)
action = context["experiment_plan"]["next_action"]
self.assertEqual(action["knob_family"], "max-num-seqs")
self.assertEqual(action["config_patch"]["flag_patch"]["max-num-seqs"], 96)
self.assertEqual(action["config_patch"]["flag_patch"]["tensor-parallel-size"], 8)
def test_slo_unrecoverable_does_not_mask_latency_bottleneck(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)