Add normalized prefill scheduler harness

This commit is contained in:
2026-06-29 01:12:19 +08:00
parent 7ad439730e
commit 36c301c128
3 changed files with 650 additions and 9 deletions

View File

@@ -3220,9 +3220,277 @@ class CoreFlowTests(unittest.TestCase):
"experiment_plan_has_high_value_candidate",
)
action = context["experiment_plan"]["next_action"]
self.assertEqual(action["knob_family"], "max-num-seqs")
self.assertEqual(action["config_patch"]["flag_patch"]["max-num-seqs"], 96)
self.assertEqual(action["config_patch"]["flag_patch"]["tensor-parallel-size"], 8)
self.assertEqual(action["knob_family"], "prefill-scheduler-interaction")
self.assertEqual(action["action_id"], "raise_prefill_quantum_with_chunked_prefill")
flag_patch = action["config_patch"]["flag_patch"]
self.assertEqual(flag_patch["tensor-parallel-size"], 8)
self.assertGreater(flag_patch["max-num-batched-tokens"], 8192)
def test_prefill_scheduler_lowers_quantum_by_normalized_ratio(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 8,
"data-parallel-size": 1,
"max-num-batched-tokens": 32768,
"max-num-seqs": 8,
"enable-chunked-prefill": True,
},
"tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"max-num-batched-tokens",
"max-num-seqs",
"enable-chunked-prefill",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [8],
"allowed_data_parallel_sizes": [1],
"allowed_tp_dp_products": [8],
},
},
)
result_path = tmp_path / "trial-0001.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.5,
"best_request_rate": 2.0,
"best_pass_rate": 0.95,
"probes": [
{
"threshold": 0.5,
"feasible": True,
"payload": {
"request_rate": 2.0,
"pass_rate": 0.95,
"latency_summary": {
"failed_reason_counts": {"ttft_ms>4000.0": 24}
},
},
}
],
}
),
encoding="utf-8",
)
study = load_study_spec(study_path)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
result_path=str(result_path),
config_patch={"env_patch": {}, "flag_patch": {}},
)
],
)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 8192, "prompt_tail_ratio_p95_p50": 4.0},
state=state,
)
action = context["experiment_plan"]["next_action"]
flag_patch = action["config_patch"]["flag_patch"]
self.assertEqual(action["knob_family"], "prefill-scheduler-interaction")
self.assertEqual(action["action_id"], "lower_prefill_quantum_with_chunked_prefill")
self.assertLess(flag_patch["max-num-batched-tokens"], 32768)
factors = action["score_factors"]
self.assertLess(
factors["prefill_quantum_ratio_target"],
factors["prefill_quantum_ratio_current"],
)
def test_prefill_scheduler_quantum_step_scales_with_prompt_length(self) -> None:
targets: list[int] = []
for prompt_p95 in (8192, 16384):
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 8,
"data-parallel-size": 1,
"max-num-batched-tokens": 32768,
"max-num-seqs": 8,
"enable-chunked-prefill": True,
},
"tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"max-num-batched-tokens",
"max-num-seqs",
"enable-chunked-prefill",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [8],
"allowed_data_parallel_sizes": [1],
"allowed_tp_dp_products": [8],
},
},
)
result_path = tmp_path / "trial-0001.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.5,
"best_request_rate": 2.0,
"best_pass_rate": 0.95,
"probes": [
{
"threshold": 0.5,
"feasible": True,
"payload": {
"request_rate": 2.0,
"pass_rate": 0.95,
"latency_summary": {
"failed_reason_counts": {"ttft_ms>4000.0": 24}
},
},
}
],
}
),
encoding="utf-8",
)
study = load_study_spec(study_path)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
result_path=str(result_path),
config_patch={"env_patch": {}, "flag_patch": {}},
)
],
)
context = build_harness_context(
study=study,
window_summary={
"prompt_tokens_p95": prompt_p95,
"prompt_tail_ratio_p95_p50": 4.0,
},
state=state,
)
action = context["experiment_plan"]["next_action"]
self.assertEqual(action["knob_family"], "prefill-scheduler-interaction")
targets.append(action["config_patch"]["flag_patch"]["max-num-batched-tokens"])
self.assertGreater(targets[1], targets[0])
def test_prefill_scheduler_not_active_for_short_prompt_workload(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 8,
"data-parallel-size": 1,
"max-num-batched-tokens": 32768,
"max-num-seqs": 8,
"enable-chunked-prefill": True,
},
"tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"max-num-batched-tokens",
"max-num-seqs",
"enable-chunked-prefill",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [8],
"allowed_data_parallel_sizes": [1],
"allowed_tp_dp_products": [8],
},
},
)
result_path = tmp_path / "trial-0001.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.5,
"best_request_rate": 2.0,
"best_pass_rate": 0.95,
"probes": [
{
"threshold": 0.5,
"feasible": True,
"payload": {
"request_rate": 2.0,
"pass_rate": 0.95,
"latency_summary": {
"failed_reason_counts": {"ttft_ms>4000.0": 24}
},
},
}
],
}
),
encoding="utf-8",
)
study = load_study_spec(study_path)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
result_path=str(result_path),
config_patch={"env_patch": {}, "flag_patch": {}},
)
],
)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 2048, "prompt_tail_ratio_p95_p50": 1.0},
state=state,
)
families = {
item["knob_family"] for item in context["experiment_plan"]["candidate_actions"]
}
self.assertNotIn("prefill-scheduler-interaction", families)
def test_prefill_sequence_probe_followed_by_joint_runtime_probe(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
@@ -3350,10 +3618,11 @@ class CoreFlowTests(unittest.TestCase):
)
action = context["experiment_plan"]["next_action"]
flag_patch = action["config_patch"]["flag_patch"]
self.assertEqual(action["knob_family"], "prefill-runtime-interaction")
self.assertEqual(action["knob_family"], "prefill-scheduler-interaction")
self.assertEqual(action["action_id"], "raise_prefill_quantum_with_chunked_prefill")
self.assertEqual(flag_patch["tensor-parallel-size"], 8)
self.assertEqual(flag_patch["max-num-batched-tokens"], 16384)
self.assertEqual(flag_patch["max-num-seqs"], 96)
self.assertGreater(flag_patch["max-num-batched-tokens"], 8192)
self.assertLess(flag_patch["max-num-batched-tokens"], 24000)
def test_slo_unrecoverable_does_not_mask_latency_bottleneck(self) -> None:
with tempfile.TemporaryDirectory() as tmp: