Tighten topology and auto-high validation
This commit is contained in:
@@ -1755,31 +1755,23 @@ def _parallel_size_can_vary(study: StudySpec) -> bool:
|
|||||||
effective_gpu_count = _effective_gpu_count(study)
|
effective_gpu_count = _effective_gpu_count(study)
|
||||||
if effective_gpu_count <= 1:
|
if effective_gpu_count <= 1:
|
||||||
return False
|
return False
|
||||||
constraints = study.engine.topology_constraints
|
base = _normalized_topology_flags(study.engine.base_flags)
|
||||||
if constraints is not None and constraints.allowed_tp_dp_products:
|
legal = _legal_topology_points(
|
||||||
legal_products = {
|
study,
|
||||||
item for item in constraints.allowed_tp_dp_products if item <= effective_gpu_count
|
current_tp=int(base["tensor-parallel-size"]),
|
||||||
}
|
current_dp=int(base["data-parallel-size"]),
|
||||||
return len(legal_products) > 1
|
current_ep=int(base["expert-parallel-size"]),
|
||||||
if constraints is not None:
|
current_enable_ep=bool(base["enable-expert-parallel"]),
|
||||||
tp_values = (
|
)
|
||||||
constraints.allowed_tensor_parallel_sizes
|
signatures: set[str] = set()
|
||||||
if constraints.allowed_tensor_parallel_sizes
|
for point in legal:
|
||||||
else [1, 2, 4, 8]
|
patch = _topology_patch(study, point)
|
||||||
)
|
flags = {**study.engine.base_flags, **patch}
|
||||||
dp_values = (
|
normalized = _normalized_topology_flags(flags)
|
||||||
constraints.allowed_data_parallel_sizes
|
if any(normalized.get(key) != point.get(key) for key in point):
|
||||||
if constraints.allowed_data_parallel_sizes
|
continue
|
||||||
else [1]
|
signatures.add(_config_signature({"env_patch": {}, "flag_patch": patch}))
|
||||||
)
|
return len(signatures) > 1
|
||||||
products = {
|
|
||||||
int(tp) * int(dp)
|
|
||||||
for tp in tp_values
|
|
||||||
for dp in dp_values
|
|
||||||
if int(tp) > 0 and int(dp) > 0 and int(tp) * int(dp) <= effective_gpu_count
|
|
||||||
}
|
|
||||||
return len(products) > 1
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def _score_topology_candidate(
|
def _score_topology_candidate(
|
||||||
|
|||||||
@@ -381,6 +381,9 @@ def resolve_auto_high_search(
|
|||||||
return search, evidence
|
return search, evidence
|
||||||
ceiling = min(float(policy.max_sampling_u), 1.0, float(trace_max_sampling_u))
|
ceiling = min(float(policy.max_sampling_u), 1.0, float(trace_max_sampling_u))
|
||||||
evidence["effective_ceiling"] = ceiling
|
evidence["effective_ceiling"] = ceiling
|
||||||
|
if ceiling < float(search.low):
|
||||||
|
evidence["reason"] = "auto_high_ceiling_below_search_low"
|
||||||
|
return search, evidence
|
||||||
if abs(float(search.high) - ceiling) <= 1e-12:
|
if abs(float(search.high) - ceiling) <= 1e-12:
|
||||||
evidence["reason"] = "search_high_already_at_auto_high_ceiling"
|
evidence["reason"] = "search_high_already_at_auto_high_ceiling"
|
||||||
return search, evidence
|
return search, evidence
|
||||||
|
|||||||
@@ -309,6 +309,31 @@ class CoreFlowTests(unittest.TestCase):
|
|||||||
self.assertEqual(capped_by_trace.high, 0.7)
|
self.assertEqual(capped_by_trace.high, 0.7)
|
||||||
self.assertEqual(trace_evidence["effective_ceiling"], 0.7)
|
self.assertEqual(trace_evidence["effective_ceiling"], 0.7)
|
||||||
|
|
||||||
|
low_above_ceiling = study.search.__class__.from_dict(
|
||||||
|
{
|
||||||
|
"low": 0.9,
|
||||||
|
"high": 0.95,
|
||||||
|
"tolerance": study.search.tolerance,
|
||||||
|
"max_probes": study.search.max_probes,
|
||||||
|
"sample_seed": study.search.sample_seed,
|
||||||
|
"auto_high": {
|
||||||
|
"enabled": True,
|
||||||
|
"max_sampling_u": 0.8,
|
||||||
|
"require_human_confirmation_beyond_trace": True,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
unchanged, invalid_evidence = resolve_auto_high_search(
|
||||||
|
search=low_above_ceiling,
|
||||||
|
sampling_us=[0.1, 0.9],
|
||||||
|
)
|
||||||
|
self.assertEqual(unchanged.low, 0.9)
|
||||||
|
self.assertEqual(unchanged.high, 0.95)
|
||||||
|
self.assertEqual(
|
||||||
|
invalid_evidence["reason"],
|
||||||
|
"auto_high_ceiling_below_search_low",
|
||||||
|
)
|
||||||
|
|
||||||
high_search = study.search.__class__.from_dict(
|
high_search = study.search.__class__.from_dict(
|
||||||
{
|
{
|
||||||
"low": 0.0,
|
"low": 0.0,
|
||||||
@@ -1531,6 +1556,89 @@ class CoreFlowTests(unittest.TestCase):
|
|||||||
"search_high_saturation_requires_parallel_size_evidence",
|
"search_high_saturation_requires_parallel_size_evidence",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_harness_stop_blocks_high_saturation_for_fixed_product_tp_dp_redistribution(
|
||||||
|
self,
|
||||||
|
) -> None:
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
tmp_path = Path(tmp)
|
||||||
|
study_path = _write_study_assets(
|
||||||
|
tmp_path,
|
||||||
|
engine_overrides={
|
||||||
|
"base_flags": {
|
||||||
|
"host": "127.0.0.1",
|
||||||
|
"port": 8000,
|
||||||
|
"tensor-parallel-size": 8,
|
||||||
|
"data-parallel-size": 1,
|
||||||
|
},
|
||||||
|
"tunable_flags": ["tensor-parallel-size", "data-parallel-size"],
|
||||||
|
"topology_constraints": {
|
||||||
|
"allowed_tensor_parallel_sizes": [1, 2, 4, 8],
|
||||||
|
"allowed_data_parallel_sizes": [1, 2, 4, 8],
|
||||||
|
"allowed_tp_dp_products": [8],
|
||||||
|
"require_tp_dp_product_equals_gpu_count": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
study = load_study_spec(study_path)
|
||||||
|
result_path = tmp_path / "trial-0001.json"
|
||||||
|
result_path.write_text(
|
||||||
|
json.dumps(
|
||||||
|
{
|
||||||
|
"status": "completed",
|
||||||
|
"best_sampling_u": 0.99609375,
|
||||||
|
"best_request_rate": 8.0,
|
||||||
|
"best_pass_rate": 1.0,
|
||||||
|
"probes": [
|
||||||
|
{
|
||||||
|
"threshold": 0.99609375,
|
||||||
|
"feasible": True,
|
||||||
|
"payload": {
|
||||||
|
"request_count": 10,
|
||||||
|
"pass_rate": 1.0,
|
||||||
|
"request_rate": 8.0,
|
||||||
|
"early_stopped": False,
|
||||||
|
"early_stop_reason": "",
|
||||||
|
"latency_summary": {"failed_reason_counts": {}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
),
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
state = StudyState(
|
||||||
|
study_id=study.study_id,
|
||||||
|
best_trial_id="trial-0001",
|
||||||
|
best_request_rate=8.0,
|
||||||
|
best_request_rate_per_gpu=1.0,
|
||||||
|
trials=[
|
||||||
|
TrialSummary(
|
||||||
|
trial_id="trial-0001",
|
||||||
|
status="completed",
|
||||||
|
best_request_rate=8.0,
|
||||||
|
best_request_rate_per_gpu=1.0,
|
||||||
|
result_path=str(result_path),
|
||||||
|
config_patch={
|
||||||
|
"env_patch": {},
|
||||||
|
"flag_patch": {
|
||||||
|
"tensor-parallel-size": 8,
|
||||||
|
"data-parallel-size": 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
context = build_harness_context(
|
||||||
|
study=study,
|
||||||
|
window_summary={"prompt_tokens_p95": 2048},
|
||||||
|
state=state,
|
||||||
|
)
|
||||||
|
self.assertFalse(context["harness_stop"]["should_stop"])
|
||||||
|
self.assertEqual(
|
||||||
|
context["harness_stop"]["reason"],
|
||||||
|
"search_high_saturation_requires_parallel_size_evidence",
|
||||||
|
)
|
||||||
|
|
||||||
def test_harness_guided_first_tp_probe_for_latency_bottleneck(self) -> None:
|
def test_harness_guided_first_tp_probe_for_latency_bottleneck(self) -> None:
|
||||||
with tempfile.TemporaryDirectory() as tmp:
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
tmp_path = Path(tmp)
|
tmp_path = Path(tmp)
|
||||||
|
|||||||
Reference in New Issue
Block a user