diff --git a/src/aituner/harness.py b/src/aituner/harness.py index 663acec..5392a9d 100644 --- a/src/aituner/harness.py +++ b/src/aituner/harness.py @@ -1319,6 +1319,57 @@ def _runtime_candidate_actions( ) ) + if ( + top_bottleneck == "ttft_prefill" + and topology_settled + and "max-num-batched-tokens" in tunable + and "max-num-seqs" in tunable + and max_num_seqs_tested + ): + current_mbt = _parse_int_like(anchor_flags.get("max-num-batched-tokens"), default=0) + current_mns = _parse_int_like(anchor_flags.get("max-num-seqs"), default=0) + if current_mbt > 0: + window_target = _initial_mbt_from_window(window_summary) + step_target = _next_mbt_step(current_mbt) or current_mbt + mbt_target = min( + 32768, + max( + step_target, + min(window_target, _round_up_to_multiple(current_mbt * 2, 1024)), + ), + ) + else: + mbt_target = _initial_mbt_from_window(window_summary) + mns_target = _round_up_to_multiple( + max(16, int(current_mns * 1.5)) if current_mns > 0 else 64, 8 + ) + if mbt_target > 0 and (mbt_target != current_mbt or mns_target != current_mns): + patch = { + **runtime_base_patch, + "max-num-batched-tokens": mbt_target, + "max-num-seqs": mns_target, + } + signature = _config_signature({"env_patch": {}, "flag_patch": patch}) + if signature not in tested_signatures: + actions.append( + _runtime_action( + action_id="raise_mbt_and_max_num_seqs", + knob_family="prefill-runtime-interaction", + score=0.38 + + _information_gain(bottleneck_hypotheses, "runtime"), + patch=patch, + hypothesis=( + "Jointly raise max-num-batched-tokens and max-num-seqs to test " + "whether prefill batching headroom and admission concurrency only " + "help when adjusted together." + ), + expected_effects=[ + "preserve the incumbent topology while changing coupled prefill runtime limits", + "confirm whether separate MBT or sequence-cap probes masked an interaction", + ], + ) + ) + if "enable-chunked-prefill" in tunable and top_bottleneck == "ttft_prefill": current = bool(anchor_flags.get("enable-chunked-prefill", False)) if not current: diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index 823b443..1c22a3d 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -2203,6 +2203,137 @@ class CoreFlowTests(unittest.TestCase): self.assertEqual(action["config_patch"]["flag_patch"]["max-num-seqs"], 96) self.assertEqual(action["config_patch"]["flag_patch"]["tensor-parallel-size"], 8) + def test_prefill_sequence_probe_followed_by_joint_runtime_probe(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 4, + "data-parallel-size": 1, + "max-num-batched-tokens": 8192, + "max-num-seqs": 64, + "enable-chunked-prefill": True, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "max-num-batched-tokens", + "max-num-seqs", + "enable-chunked-prefill", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [4, 8], + "allowed_data_parallel_sizes": [1, 2], + "allowed_tp_dp_products": [4, 8], + }, + }, + ) + + def write_result(name: str) -> Path: + path = tmp_path / f"{name}.json" + payload = { + "status": "completed", + "best_sampling_u": 0.091796875, + "best_request_rate": 2.303, + "best_pass_rate": 0.951, + "probes": [ + { + "threshold": 0.09375, + "feasible": True, + "payload": { + "request_rate": 2.303, + "pass_rate": 0.951, + "latency_summary": { + "failed_reason_counts": {"ttft_ms>4000.0": 32} + }, + }, + } + ], + } + path.write_text(json.dumps(payload), encoding="utf-8") + return path + + study = load_study_spec(study_path) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_parallel_size=8, + best_sampling_u=0.091796875, + best_request_rate=2.303, + best_request_rate_per_gpu=0.288, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=8, + best_request_rate=2.303, + best_request_rate_per_gpu=0.288, + best_pass_rate=0.952, + result_path=str(write_result("trial-0001")), + config_patch={ + "env_patch": {}, + "flag_patch": { + "tensor-parallel-size": 8, + "data-parallel-size": 1, + }, + }, + ), + TrialSummary( + trial_id="trial-0002", + status="completed", + parallel_size=8, + best_request_rate=2.303, + best_request_rate_per_gpu=0.288, + best_pass_rate=0.950, + result_path=str(write_result("trial-0002")), + config_patch={ + "env_patch": {}, + "flag_patch": { + "tensor-parallel-size": 8, + "max-num-seqs": 96, + }, + }, + ), + TrialSummary( + trial_id="trial-0003", + status="completed", + parallel_size=8, + best_request_rate=2.303, + best_request_rate_per_gpu=0.288, + best_pass_rate=0.950, + result_path=str(write_result("trial-0003")), + config_patch={ + "env_patch": {}, + "flag_patch": { + "tensor-parallel-size": 8, + "data-parallel-size": 1, + "max-num-batched-tokens": 12288, + }, + }, + ), + ], + ) + context = build_harness_context( + study=study, + window_summary={"prompt_tokens_p95": 24000, "prompt_tokens_p99": 32000}, + state=state, + ) + self.assertFalse(context["harness_stop"]["should_stop"]) + self.assertEqual( + context["harness_stop"]["reason"], + "experiment_plan_has_high_value_candidate", + ) + action = context["experiment_plan"]["next_action"] + flag_patch = action["config_patch"]["flag_patch"] + self.assertEqual(action["knob_family"], "prefill-runtime-interaction") + self.assertEqual(flag_patch["tensor-parallel-size"], 8) + self.assertEqual(flag_patch["max-num-batched-tokens"], 16384) + self.assertEqual(flag_patch["max-num-seqs"], 96) + def test_slo_unrecoverable_does_not_mask_latency_bottleneck(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp)