Continue gmu hill-climb after topology validation

This commit is contained in:
2026-06-24 19:09:35 +08:00
parent 8fa758797e
commit b075afe6f2
2 changed files with 187 additions and 24 deletions

View File

@@ -1396,36 +1396,75 @@ def _runtime_candidate_actions(
if (
"gpu-memory-utilization" in tunable
and topology_settled
and top_bottleneck in {"decode_tpot", "admission_or_queueing"}
and top_bottleneck in {"decode_tpot", "admission_or_queueing", "ttft_prefill"}
):
current_gmu = _parse_float_like(
anchor_flags.get("gpu-memory-utilization"), default=0.9
target = _next_gpu_memory_utilization_target(
study,
anchor_flags,
recent_diagnostics,
)
if 0.0 < current_gmu < _GMU_SAFE_CEILING:
target = round(min(_GMU_SAFE_CEILING, current_gmu + _GMU_STEP), 4)
if target > current_gmu:
patch = {**runtime_base_patch, "gpu-memory-utilization": target}
signature = _config_signature({"env_patch": {}, "flag_patch": patch})
if signature not in tested_signatures:
actions.append(
_runtime_action(
action_id="raise_gpu_memory_utilization",
knob_family="gpu-memory-utilization",
score=0.4 + _information_gain(bottleneck_hypotheses, "runtime"),
patch=patch,
hypothesis=(
"Raise gpu-memory-utilization to add KV-cache headroom so the "
"decode-bound incumbent can sustain more concurrent decode."
),
expected_effects=[
"add KV-cache blocks for higher decode concurrency on the incumbent topology",
"reject if the higher memory target regresses request_rate_per_gpu or fails to launch",
],
)
if target is not None:
patch = {**runtime_base_patch, "gpu-memory-utilization": target}
signature = _config_signature({"env_patch": {}, "flag_patch": patch})
if signature not in tested_signatures:
actions.append(
_runtime_action(
action_id="raise_gpu_memory_utilization",
knob_family="gpu-memory-utilization",
score=0.5 + _information_gain(bottleneck_hypotheses, "runtime"),
patch=patch,
hypothesis=(
"Raise gpu-memory-utilization on the settled incumbent topology "
"to test whether extra KV-cache headroom moves the SLO frontier."
),
expected_effects=[
"add KV-cache blocks for higher concurrency on the incumbent topology",
"reject if the higher memory target regresses request_rate_per_gpu or fails to launch",
],
)
)
return actions
def _next_gpu_memory_utilization_target(
study: StudySpec,
anchor_flags: dict[str, Any],
recent_diagnostics: list[dict[str, Any]],
) -> float | None:
current_gmu = _parse_float_like(
anchor_flags.get("gpu-memory-utilization"), default=0.9
)
if current_gmu <= 0 or current_gmu >= _GMU_SAFE_CEILING:
return None
anchor_topology = _normalized_topology_flags(anchor_flags)
successful_gmus: list[float] = [current_gmu]
failed_gmus: list[float] = []
for item in recent_diagnostics:
patch = item.get("config_patch")
if not isinstance(patch, dict):
continue
flag_patch = patch.get("flag_patch")
if not isinstance(flag_patch, dict) or "gpu-memory-utilization" not in flag_patch:
continue
flags = _effective_flags_for_item(study, item)
if _normalized_topology_flags(flags) != anchor_topology:
continue
gmu = _parse_float_like(flag_patch.get("gpu-memory-utilization"), default=0.0)
if gmu <= 0:
continue
if item.get("status") == "completed":
successful_gmus.append(gmu)
elif item.get("status") == "failed":
failed_gmus.append(gmu)
climb_from = max(successful_gmus)
target = round(min(_GMU_SAFE_CEILING, climb_from + _GMU_STEP), 4)
if target <= climb_from:
return None
if any(failed <= target + EPSILON for failed in failed_gmus):
return None
return target
def _runtime_action(
*,
action_id: str,

View File

@@ -1800,6 +1800,130 @@ class CoreFlowTests(unittest.TestCase):
)
self.assertNotIn("gpu-memory-utilization", proposal.config_patch.flag_patch)
def test_harness_continues_gpu_mem_util_after_tied_same_topology_probe(self) -> None:
"""After adjacent topology validation, gpu-memory-utilization should hill-climb
on the incumbent topology even if an earlier gmu step tied the incumbent and
did not become state.best_trial_id."""
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
slo_overrides={
"ttft_rule": {"kind": "fixed_ms", "threshold_ms": 4000},
"tpot_rule": {"kind": "fixed_ms", "threshold_ms": 50},
},
engine_overrides={
"tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"gpu-memory-utilization",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [1, 2, 4, 8],
"allowed_data_parallel_sizes": [1, 2],
"allowed_tp_dp_products": [1, 2, 4, 8],
},
},
)
study = load_study_spec(study_path)
result_path = tmp_path / "trial-0002.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.75,
"best_request_rate": 6.5,
"best_pass_rate": 1.0,
"probes": [
{
"threshold": 0.75,
"feasible": True,
"payload": {
"request_count": 300,
"pass_rate": 1.0,
"request_rate": 6.5,
"latency_summary": {"failed_reason_counts": {}},
},
},
{
"threshold": 0.765625,
"feasible": False,
"payload": {
"request_count": 300,
"pass_rate": 0.6,
"request_rate": 6.7,
"early_stop_reason": "slo_pass_rate_unrecoverable",
"latency_summary": {
"failed_reason_counts": {"ttft_ms>4000.0": 80}
},
},
},
],
}
),
encoding="utf-8",
)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0002",
best_request_rate=6.5,
best_request_rate_per_gpu=3.25,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
best_request_rate=2.2,
best_request_rate_per_gpu=2.2,
config_patch={"env_patch": {}, "flag_patch": {}},
),
TrialSummary(
trial_id="trial-0002",
status="completed",
best_request_rate=6.5,
best_request_rate_per_gpu=3.25,
result_path=str(result_path),
config_patch={
"env_patch": {},
"flag_patch": {"tensor-parallel-size": 2},
},
),
TrialSummary(
trial_id="trial-0003",
status="completed",
best_request_rate=8.4,
best_request_rate_per_gpu=2.1,
config_patch={
"env_patch": {},
"flag_patch": {"tensor-parallel-size": 4},
},
),
TrialSummary(
trial_id="trial-0004",
status="completed",
best_request_rate=6.5,
best_request_rate_per_gpu=3.25,
config_patch={
"env_patch": {},
"flag_patch": {
"tensor-parallel-size": 2,
"gpu-memory-utilization": 0.92,
},
},
),
],
)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 1500},
state=state,
)
proposal = build_harness_guided_proposal(context)
self.assertIsNotNone(proposal)
self.assertEqual(
proposal.config_patch.flag_patch,
{"tensor-parallel-size": 2, "gpu-memory-utilization": 0.94},
)
def test_harness_validates_unmeasured_tp_frontier_before_runtime_refinement(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)