Guide harness runtime refinement after TP
This commit is contained in:
@@ -42,7 +42,12 @@ def build_harness_context(
|
|||||||
"recent_trial_diagnostics": recent_diagnostics,
|
"recent_trial_diagnostics": recent_diagnostics,
|
||||||
"convergence_guard": _convergence_guard(state, recent_diagnostics),
|
"convergence_guard": _convergence_guard(state, recent_diagnostics),
|
||||||
"harness_stop": _harness_stop_decision(study, state, recent_diagnostics),
|
"harness_stop": _harness_stop_decision(study, state, recent_diagnostics),
|
||||||
"harness_proposal": _harness_proposal_decision(study, state, recent_diagnostics),
|
"harness_proposal": _harness_proposal_decision(
|
||||||
|
study,
|
||||||
|
window_summary,
|
||||||
|
state,
|
||||||
|
recent_diagnostics,
|
||||||
|
),
|
||||||
"knob_harnesses": _knob_harnesses(study, window_summary, recent_diagnostics),
|
"knob_harnesses": _knob_harnesses(study, window_summary, recent_diagnostics),
|
||||||
"proposal_rules": _proposal_rules(),
|
"proposal_rules": _proposal_rules(),
|
||||||
}
|
}
|
||||||
@@ -547,6 +552,7 @@ def _harness_stop_decision(
|
|||||||
|
|
||||||
def _harness_proposal_decision(
|
def _harness_proposal_decision(
|
||||||
study: StudySpec,
|
study: StudySpec,
|
||||||
|
window_summary: dict[str, Any],
|
||||||
state: StudyState,
|
state: StudyState,
|
||||||
recent_diagnostics: list[dict[str, Any]],
|
recent_diagnostics: list[dict[str, Any]],
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
@@ -557,9 +563,22 @@ def _harness_proposal_decision(
|
|||||||
"config_patch": {"env_patch": {}, "flag_patch": {}},
|
"config_patch": {"env_patch": {}, "flag_patch": {}},
|
||||||
"expected_effects": [],
|
"expected_effects": [],
|
||||||
}
|
}
|
||||||
|
tested_signatures = {
|
||||||
|
_config_signature(item.get("config_patch") if isinstance(item, dict) else None)
|
||||||
|
for item in recent_diagnostics
|
||||||
|
}
|
||||||
|
baseline = recent_diagnostics[0] if recent_diagnostics else {}
|
||||||
|
runtime_refinement = _runtime_refinement_proposal(
|
||||||
|
study,
|
||||||
|
window_summary,
|
||||||
|
state,
|
||||||
|
recent_diagnostics,
|
||||||
|
tested_signatures=tested_signatures,
|
||||||
|
)
|
||||||
|
if runtime_refinement["should_propose"]:
|
||||||
|
return runtime_refinement
|
||||||
if len(state.trials) != 1 or len(recent_diagnostics) != 1:
|
if len(state.trials) != 1 or len(recent_diagnostics) != 1:
|
||||||
return default
|
return default
|
||||||
baseline = recent_diagnostics[0]
|
|
||||||
if baseline.get("status") != "completed":
|
if baseline.get("status") != "completed":
|
||||||
return default
|
return default
|
||||||
if not isinstance(baseline.get("best_request_rate_per_gpu"), (int, float)):
|
if not isinstance(baseline.get("best_request_rate_per_gpu"), (int, float)):
|
||||||
@@ -576,11 +595,6 @@ def _harness_proposal_decision(
|
|||||||
**default,
|
**default,
|
||||||
"reason": "tensor_parallel_size_not_tunable",
|
"reason": "tensor_parallel_size_not_tunable",
|
||||||
}
|
}
|
||||||
failed_signatures = {
|
|
||||||
_config_signature(item.get("config_patch") if isinstance(item, dict) else None)
|
|
||||||
for item in recent_diagnostics
|
|
||||||
if item.get("failure_stage") == "engine_launch"
|
|
||||||
}
|
|
||||||
base_flags = dict(study.engine.base_flags)
|
base_flags = dict(study.engine.base_flags)
|
||||||
baseline_patch = baseline.get("config_patch")
|
baseline_patch = baseline.get("config_patch")
|
||||||
if isinstance(baseline_patch, dict) and isinstance(baseline_patch.get("flag_patch"), dict):
|
if isinstance(baseline_patch, dict) and isinstance(baseline_patch.get("flag_patch"), dict):
|
||||||
@@ -595,10 +609,10 @@ def _harness_proposal_decision(
|
|||||||
}
|
}
|
||||||
flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp}
|
flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp}
|
||||||
signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch})
|
signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch})
|
||||||
if signature in failed_signatures:
|
if signature in tested_signatures:
|
||||||
return {
|
return {
|
||||||
**default,
|
**default,
|
||||||
"reason": "adjacent_tensor_parallel_probe_previously_failed",
|
"reason": "adjacent_tensor_parallel_probe_already_tested",
|
||||||
}
|
}
|
||||||
return {
|
return {
|
||||||
"should_propose": True,
|
"should_propose": True,
|
||||||
@@ -618,6 +632,100 @@ def _harness_proposal_decision(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _runtime_refinement_proposal(
|
||||||
|
study: StudySpec,
|
||||||
|
window_summary: dict[str, Any],
|
||||||
|
state: StudyState,
|
||||||
|
recent_diagnostics: list[dict[str, Any]],
|
||||||
|
*,
|
||||||
|
tested_signatures: set[str],
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
default = {
|
||||||
|
"should_propose": False,
|
||||||
|
"reason": "runtime_refinement_not_applicable",
|
||||||
|
"diagnosis": "Runtime refinement does not apply yet.",
|
||||||
|
"config_patch": {"env_patch": {}, "flag_patch": {}},
|
||||||
|
"expected_effects": [],
|
||||||
|
}
|
||||||
|
if not state.best_trial_id or not recent_diagnostics:
|
||||||
|
return default
|
||||||
|
best = next(
|
||||||
|
(item for item in recent_diagnostics if item.get("trial_id") == state.best_trial_id),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
if not best or best.get("status") != "completed":
|
||||||
|
return default
|
||||||
|
if recent_diagnostics[-1].get("trial_id") != state.best_trial_id:
|
||||||
|
return default
|
||||||
|
best_patch = best.get("config_patch")
|
||||||
|
if not isinstance(best_patch, dict):
|
||||||
|
return default
|
||||||
|
best_flags = best_patch.get("flag_patch")
|
||||||
|
if not isinstance(best_flags, dict):
|
||||||
|
best_flags = {}
|
||||||
|
best_tp = _parse_int_like(best_flags.get("tensor-parallel-size"), default=1)
|
||||||
|
if best_tp <= 1:
|
||||||
|
return default
|
||||||
|
tunable = set(study.engine.tunable_flags)
|
||||||
|
flag_patch: dict[str, Any] = {"tensor-parallel-size": best_tp}
|
||||||
|
if "gpu-memory-utilization" in tunable:
|
||||||
|
flag_patch["gpu-memory-utilization"] = 0.95
|
||||||
|
if "enable-chunked-prefill" in tunable:
|
||||||
|
flag_patch["enable-chunked-prefill"] = True
|
||||||
|
if "max-num-batched-tokens" not in tunable:
|
||||||
|
return default
|
||||||
|
|
||||||
|
current_mbt = _parse_int_like(best_flags.get("max-num-batched-tokens"), default=0)
|
||||||
|
if current_mbt <= 0:
|
||||||
|
target_mbt = _initial_mbt_from_window(window_summary)
|
||||||
|
reason = "same_topology_runtime_seed_after_tp_incumbent"
|
||||||
|
else:
|
||||||
|
target_mbt = _next_mbt_step(current_mbt)
|
||||||
|
reason = "same_topology_mbt_growth_after_feasible_runtime_incumbent"
|
||||||
|
if target_mbt is None:
|
||||||
|
return {
|
||||||
|
**default,
|
||||||
|
"reason": "no_larger_mbt_step_available",
|
||||||
|
}
|
||||||
|
flag_patch["max-num-batched-tokens"] = target_mbt
|
||||||
|
signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch})
|
||||||
|
if signature in tested_signatures:
|
||||||
|
return {
|
||||||
|
**default,
|
||||||
|
"reason": "same_topology_runtime_probe_already_tested",
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
"should_propose": True,
|
||||||
|
"reason": reason,
|
||||||
|
"diagnosis": (
|
||||||
|
"A TP incumbent improved request_rate_per_gpu; refine batching on the "
|
||||||
|
"same topology before trying DP/EP or broad runtime changes."
|
||||||
|
),
|
||||||
|
"config_patch": {"env_patch": {}, "flag_patch": flag_patch},
|
||||||
|
"expected_effects": [
|
||||||
|
"preserve the incumbent topology",
|
||||||
|
"increase batching headroom while staying inside one runtime family",
|
||||||
|
],
|
||||||
|
"incumbent_trial_id": best.get("trial_id"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _initial_mbt_from_window(window_summary: dict[str, Any]) -> int:
|
||||||
|
prompt_p99 = _as_float(window_summary.get("prompt_tokens_p99"))
|
||||||
|
target = max(8192, int(prompt_p99 * 2.0))
|
||||||
|
return min(32768, _round_up_to_multiple(target, 1024))
|
||||||
|
|
||||||
|
|
||||||
|
def _next_mbt_step(current_mbt: int) -> int | None:
|
||||||
|
if current_mbt >= 32768:
|
||||||
|
return None
|
||||||
|
return min(32768, _round_up_to_multiple(int(current_mbt * 1.5), 1024))
|
||||||
|
|
||||||
|
|
||||||
|
def _round_up_to_multiple(value: int, multiple: int) -> int:
|
||||||
|
return ((max(value, 1) + multiple - 1) // multiple) * multiple
|
||||||
|
|
||||||
|
|
||||||
def _next_allowed_tp(study: StudySpec, *, current_tp: int, current_dp: int) -> int | None:
|
def _next_allowed_tp(study: StudySpec, *, current_tp: int, current_dp: int) -> int | None:
|
||||||
constraints = study.engine.topology_constraints
|
constraints = study.engine.topology_constraints
|
||||||
if constraints is not None and constraints.allowed_tensor_parallel_sizes:
|
if constraints is not None and constraints.allowed_tensor_parallel_sizes:
|
||||||
|
|||||||
@@ -669,6 +669,94 @@ class CoreFlowTests(unittest.TestCase):
|
|||||||
self.assertEqual(proposal.config_patch.flag_patch, {"tensor-parallel-size": 2})
|
self.assertEqual(proposal.config_patch.flag_patch, {"tensor-parallel-size": 2})
|
||||||
self.assertFalse(proposal.should_stop)
|
self.assertFalse(proposal.should_stop)
|
||||||
|
|
||||||
|
def test_harness_guided_runtime_seed_preserves_tp_incumbent(self) -> None:
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
tmp_path = Path(tmp)
|
||||||
|
study_path = _write_study_assets(
|
||||||
|
tmp_path,
|
||||||
|
engine_overrides={
|
||||||
|
"tunable_flags": [
|
||||||
|
"tensor-parallel-size",
|
||||||
|
"gpu-memory-utilization",
|
||||||
|
"enable-chunked-prefill",
|
||||||
|
"max-num-batched-tokens",
|
||||||
|
],
|
||||||
|
"topology_constraints": {
|
||||||
|
"allowed_tensor_parallel_sizes": [1, 2, 4],
|
||||||
|
"allowed_tp_dp_products": [1, 2, 4],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
study = load_study_spec(study_path)
|
||||||
|
result_path = tmp_path / "trial-0002.json"
|
||||||
|
result_path.write_text(
|
||||||
|
json.dumps(
|
||||||
|
{
|
||||||
|
"status": "completed",
|
||||||
|
"best_sampling_u": 0.75,
|
||||||
|
"best_request_rate": 6.0,
|
||||||
|
"best_pass_rate": 1.0,
|
||||||
|
"probes": [
|
||||||
|
{
|
||||||
|
"threshold": 0.75,
|
||||||
|
"feasible": True,
|
||||||
|
"payload": {
|
||||||
|
"request_count": 100,
|
||||||
|
"pass_rate": 1.0,
|
||||||
|
"request_rate": 6.0,
|
||||||
|
"early_stopped": False,
|
||||||
|
"early_stop_reason": "",
|
||||||
|
"latency_summary": {"failed_reason_counts": {}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
),
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
state = StudyState(
|
||||||
|
study_id=study.study_id,
|
||||||
|
best_trial_id="trial-0002",
|
||||||
|
best_request_rate=6.0,
|
||||||
|
best_request_rate_per_gpu=3.0,
|
||||||
|
trials=[
|
||||||
|
TrialSummary(
|
||||||
|
trial_id="trial-0001",
|
||||||
|
status="completed",
|
||||||
|
best_request_rate=2.0,
|
||||||
|
best_request_rate_per_gpu=2.0,
|
||||||
|
config_patch={"env_patch": {}, "flag_patch": {}},
|
||||||
|
),
|
||||||
|
TrialSummary(
|
||||||
|
trial_id="trial-0002",
|
||||||
|
status="completed",
|
||||||
|
best_request_rate=6.0,
|
||||||
|
best_request_rate_per_gpu=3.0,
|
||||||
|
result_path=str(result_path),
|
||||||
|
config_patch={
|
||||||
|
"env_patch": {},
|
||||||
|
"flag_patch": {"tensor-parallel-size": 2},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
context = build_harness_context(
|
||||||
|
study=study,
|
||||||
|
window_summary={"prompt_tokens_p99": 8100},
|
||||||
|
state=state,
|
||||||
|
)
|
||||||
|
proposal = build_harness_guided_proposal(context)
|
||||||
|
self.assertIsNotNone(proposal)
|
||||||
|
self.assertEqual(
|
||||||
|
proposal.config_patch.flag_patch,
|
||||||
|
{
|
||||||
|
"tensor-parallel-size": 2,
|
||||||
|
"gpu-memory-utilization": 0.95,
|
||||||
|
"enable-chunked-prefill": True,
|
||||||
|
"max-num-batched-tokens": 16384,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None:
|
def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None:
|
||||||
with tempfile.TemporaryDirectory() as tmp:
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
tmp_path = Path(tmp)
|
tmp_path = Path(tmp)
|
||||||
|
|||||||
Reference in New Issue
Block a user