diff --git a/docs/harness-ablation/prefill-scheduler-normalized-design-20260629.md b/docs/harness-ablation/prefill-scheduler-normalized-design-20260629.md
index 734599e..0b8cd81 100644
--- a/docs/harness-ablation/prefill-scheduler-normalized-design-20260629.md
+++ b/docs/harness-ablation/prefill-scheduler-normalized-design-20260629.md
@@ -45,6 +45,39 @@ target_mbt = sqrt(current_mbt * prompt_tokens_p95)
 This corresponds to taking half a step in log space. It is closer to scale-invariant than multiplying by a fixed
 0.5/1.5: as the prompt scale grows, the next MBT step grows with it.
 
+## Agent Loop
+
+The current harness loop can be formalized as:
+
+```text
+trial result
+  -> observation extractor
+  -> bottleneck classifier
+  -> candidate family selector
+  -> normalized candidate generator
+  -> scoring / coverage ranking
+  -> validator / no-repeat / stop guard
+  -> next trial
+```
+
+Each layer has a distinct responsibility:
+
+1. observation extractor only turns the trial result into comparable facts, such as
+   request_rate_per_gpu, pass_rate, failure reasons, and TTFT/TPOT percentiles.
+2. bottleneck classifier maps those facts onto mechanism bottlenecks such as `ttft_prefill`, `decode_tpot`, and
+   `admission_or_queueing`; it does not output config values directly.
+3. candidate family selector decides which system hypothesis to test, for example topology frontier,
+   prefill scheduler, admission pressure, or GPU memory headroom.
+4. normalized candidate generator is the only layer that maps mechanism variables to concrete engine flags.
+5. scoring / coverage ranking handles ordering: uncovered but mechanistically relevant dimensions should rank ahead of
+   fine-tuning along an already-known direction.
+6. validator uses the normalized full-config signature to prevent repeated tests, and uses the stop guard to
+   avoid stopping early while high-value falsification candidates remain.
+
+The core of the harness is therefore not "writing a good LLM prompt" but decomposing black-box search into a causally directed
+white-box falsification loop. The LLM may help generate or explain candidates, but every candidate must pass
+the harness's family, signature, scoring, and validator constraints.
+
 ## Implementation mapping
 
 Code entry points:
@@ -61,13 +94,19 @@ target_mbt = sqrt(current_mbt * prompt_tokens_p95)
   - Uses `current_mbt / prompt_scale` to decide the direction.
   - Takes a relative step via the geometric midpoint.
 - `_next_admission_pressure_step`
-  - Uses `trace.max_concurrency` as the admission scale rather than a fixed `max-num-seqs` table.
+  - Uses `max-num-seqs / trace.max_concurrency` as the normalized admission pressure.
+  - Raises when admission/queueing is the bottleneck and admission pressure is too low.
+  - Lowers when TTFT/prefill is the bottleneck and admission pressure is clearly above the trace concurrency scale.
 - `_prefill_scheduler_candidate_actions`
   - Emits the `prefill-scheduler-interaction` family.
   - `score_factors` explicitly records the current/target `prefill_quantum_ratio` so that later experiments are easier to interpret.
+  - `score_factors` also records the current/target admission pressure ratio, so the explanation is not limited to MBT.
   - When the scheduler dimension has not yet been covered by a materialized config, adds
     `uncovered_scheduler_dimension_bonus` so that, once topology is settled, this family ranks ahead of
     resource micro-tuning such as `gpu-memory-utilization`.
+  - When this family has produced a valid candidate, the old standalone `raise_mbt`,
+    `enable_chunked_prefill`, and `raise_mbt_and_max_num_seqs` act only as fallbacks and do not compete
+    for ranking as peer prefill runtime candidates.
 
 ## Why this is not a rule-based hack
 
@@ -84,8 +123,18 @@ target_mbt = sqrt(current_mbt * prompt_tokens_p95)
 - a proposal is a direction relative to the current incumbent, not a fixed answer;
 - the concrete value changes with the prompt scale and the current config;
 - validator/no-repeat still uses the normalized effective full-config signature;
+- the runtime gate and the formal topology frontier share the higher-TP frontier patch construction, so
+  the scheduler cannot jump ahead when DP is not at its base value;
 - short prompt, decode-only, and high prefix reuse workloads do not activate this family.
 
+This is not a proof of completeness, however. What can be claimed today is a set of stricter engineering properties:
+
+- no reference to a specific case identity;
+- known winners are not written into a candidate table;
+- every concrete proposal can be traced back to a normalized mechanism variable;
+- every trial can be interpreted as the falsification of one system hypothesis;
+- a failure leaves behind an auditable candidate sequence and score factors.
+
 ## Review conclusions
 
 ### Problems with the previous implementation
@@ -104,6 +153,39 @@ target_mbt = sqrt(current_mbt * prompt_tokens_p95)
    - `adjust_admission_pressure_with_chunked_prefill`
 3. Tests now verify the normalized direction and scale sensitivity instead of fixed absolute values.
 
+### Risks that still need attention in the current implementation
+
+1. `_PREFILL_QUANTUM_HEAD_OF_LINE_RATIO=1.0` and
+   `_PREFILL_QUANTUM_FRAGMENTATION_RATIO=0.5` are still mechanism thresholds, not theorems.
+   They must be validated with scaled-prompt and negative-workload experiments, not by case3 alone.
+2. `uncovered_scheduler_dimension_bonus` is a coverage ranking policy. Its justification is
+   "cover mechanism dimensions that are not yet materialized first, then fine-tune GMU", but the candidate
+   sequence must show that it does not jump ahead while the topology frontier is still uncovered.
+3. `block-size=16` is currently not part of this family. It must not be added as a fixed answer for case3;
+   if it is handled later, it needs a separately designed allocator/layout family derived from engine capability and
+   memory block waste observations, rather than being forced into the prefill scheduler family.
+4. The current implementation still keeps the old standalone `enable-chunked-prefill` and `raise_mbt` paths as
+   fallbacks. Once `prefill-scheduler-interaction` has produced a valid candidate, they must not compete for ranking
+   as peer prefill runtime candidates.
+
+### Fixes after the 2026-06-29 independent review
+
+The independent review identified three generalization risks that needed to be tightened immediately:
+
+1. The old standalone MBT/chunked candidates could still make the overall harness behave like a heuristic table.
+2. Admission pressure could only be raised; TTFT/prefill interference caused by an excessively high `max-num-seqs` was not handled.
+3. The runtime gate's topology-settled check and the formal topology frontier were not fully consistent when DP is not at its base value.
+
+Corresponding fixes:
+
+- When `prefill-scheduler-interaction` has a valid candidate, the old standalone MBT/chunked/joint
+  prefill-runtime candidates are demoted to fallbacks.
+- Admission pressure is now a normalized ratio and supports both the raise and lower directions:
+  `raise_admission_pressure_with_chunked_prefill` and
+  `lower_admission_pressure_with_chunked_prefill`.
+- Extracted `_higher_tp_frontier_patch` so that the runtime gate and
+  `_topology_frontier_status` use the same higher-TP signature.
+
 ### 2026-06-29 remote review feedback
 
 After the case3 bad-runtime rerun was started on dash1 with `36c301c`, trial-0003's
@@ -136,7 +218,22 @@ dimension has not yet been tested by a materialized config, prioritize testing the scheduler hy
 
 ```text
 PYTHONPATH=src python3 -m unittest discover -s tests
-151 tests OK
+156 tests OK
+```
+
+Focused local regression:
+
+```text
+PYTHONPATH=src python3 -m unittest \
+  tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_coverage_precedes_gmu_microtune \
+  tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_admission_pressure_only_uses_normalized_seq_cap \
+  tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_lowers_excess_admission_pressure \
+  tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_negative_applicability_matrix \
+  tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_does_not_preempt_open_topology_frontier \
+  tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_lowers_quantum_by_normalized_ratio \
+  tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_quantum_step_scales_with_prompt_length \
+  tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_not_active_for_short_prompt_workload
+8 tests OK
 ```
 
 ## Still needs validation on real hardware
@@ -159,3 +256,39 @@ PYTHONPATH=src python3 -m unittest discover -s tests
 - candidate family sequence;
 - whether the direction of `prefill_quantum_ratio_current -> target` is consistent with the bottleneck evidence;
 - whether any repeated normalized full-config signature appears.
+
+## Current dash1 real-hardware status
+
+Commit `bfd8579` is currently being validated:
+
+```text
+run = .aituner/badstart-prefill-scheduler-bfd8579-20260628T173102Z
+case = badstart-expanded-9accf25-20260626T184911Z-runtime_tp2_dp1_gmu070_mns8
+session = aituner-prefill-scheduler-case3-bfd8579
+```
+
+As of about 2026-06-29 01:53 UTC+8:
+
+- baseline trial-0001 has completed, with a best request_rate_per_gpu of about 2.025;
+- the trial-0002 TP4 topology frontier probe has completed, with a best request_rate_per_gpu of about 2.000,
+  which did not exceed the baseline;
+- the top action of candidate-set-0002 is the topology frontier, consistent with topology-before-runtime;
+- the top action of candidate-set-0003 has become `seed_chunked_prefill_quantum`:
+
+```text
+score = 0.69
+patch = enable-chunked-prefill=true, max-num-batched-tokens=8192
+ratio = prefill_quantum_ratio_target ~= 1.0536
+baseline = raise_gpu_memory_utilization score 0.64
+```
+
+This shows that `uncovered_scheduler_dimension_bonus` achieves its design goal: once the topology frontier is covered,
+the not-yet-materialized scheduler dimension is tested before GMU fine-tuning.
+
+trial-0003 has completed with a best request_rate_per_gpu of about 2.025, level with the baseline and without a
+performance gain. This result cannot support a claim that the scheduler seed is a winner, but it provides valuable
+falsification evidence: coverage priority changed the exploration order, and the specific `chunked + MBT ~= p95`
+hypothesis was tested and yielded no improvement. The system then moved on to candidate-set-0004 and started testing
+`gpu-memory-utilization=0.9`. trial-0004 likewise finished at about 2.025, not exceeding the baseline;
+the current old run has entered trial-0005 and continues with `gpu-memory-utilization=0.92`. What remains to be seen is
+whether the GMU climb stops and turns to admission pressure, topology/DP, or another family.
diff --git a/src/aituner/harness.py b/src/aituner/harness.py
index 1353853..0be6993 100644
--- a/src/aituner/harness.py
+++ b/src/aituner/harness.py
@@ -1280,30 +1280,31 @@ def _runtime_candidate_actions(
     # only justified once no untested TP increase remains. At an intermediate TP (e.g. TP2
     # while TP4 is still reachable and untried) a latency bottleneck must still be answered
     # by climbing TP, not a runtime tweak -- otherwise runtime tuning preempts the frontier.
- _next_tp = _next_allowed_tp(study, current_tp=cur_tp, current_dp=cur_dp) + higher_tp_patch = _higher_tp_frontier_patch( + study, + current_tp=cur_tp, + current_dp=cur_dp, + ) tp_frontier_open = ( - _next_tp is not None - and _effective_config_signature( - study, - {"env_patch": {}, "flag_patch": {"tensor-parallel-size": _next_tp}} - ) + higher_tp_patch is not None + and _effective_config_signature(study, {"env_patch": {}, "flag_patch": higher_tp_patch}) not in tested_signatures ) topology_settled = not tp_frontier_open - actions.extend( - _prefill_scheduler_candidate_actions( - study, - window_summary, - anchor_flags, - runtime_base_patch, - top_bottleneck, - bottleneck_hypotheses, - topology_settled=topology_settled, - seen_signatures=seen_signatures, - blocked_candidates=blocked_candidates, - ) + prefill_scheduler_actions = _prefill_scheduler_candidate_actions( + study, + window_summary, + anchor_flags, + runtime_base_patch, + top_bottleneck, + bottleneck_hypotheses, + topology_settled=topology_settled, + seen_signatures=seen_signatures, + blocked_candidates=blocked_candidates, ) + actions.extend(prefill_scheduler_actions) + prefill_scheduler_candidate_available = bool(prefill_scheduler_actions) if ( "max-num-batched-tokens" in tunable @@ -1312,6 +1313,7 @@ def _runtime_candidate_actions( and recent_diagnostics[-1].get("trial_id") == anchor.get("trial_id") and cur_tp > 1 and not bottleneck_hypotheses + and not prefill_scheduler_candidate_available ): current_mbt = _parse_int_like(anchor_flags.get("max-num-batched-tokens"), default=0) target_mbt = ( @@ -1361,7 +1363,7 @@ def _runtime_candidate_actions( ) seen_signatures.add(signature) - if "max-num-batched-tokens" in tunable: + if "max-num-batched-tokens" in tunable and not prefill_scheduler_candidate_available: current_mbt = _parse_int_like(anchor_flags.get("max-num-batched-tokens"), default=0) mbt_targets: list[tuple[str, int]] = [] if top_bottleneck == "ttft_prefill": @@ -1484,6 +1486,7 @@ def 
_runtime_candidate_actions( and "max-num-batched-tokens" in tunable and "max-num-seqs" in tunable and max_num_seqs_tested + and not prefill_scheduler_candidate_available ): current_mbt = _parse_int_like(anchor_flags.get("max-num-batched-tokens"), default=0) current_mns = _parse_int_like(anchor_flags.get("max-num-seqs"), default=0) @@ -1540,7 +1543,11 @@ def _runtime_candidate_actions( ) ) - if "enable-chunked-prefill" in tunable and top_bottleneck == "ttft_prefill": + if ( + "enable-chunked-prefill" in tunable + and top_bottleneck == "ttft_prefill" + and not prefill_scheduler_candidate_available + ): current = bool(anchor_flags.get("enable-chunked-prefill", False)) if not current: patch = {**runtime_base_patch, "enable-chunked-prefill": True} @@ -1706,14 +1713,15 @@ def _prefill_scheduler_candidate_actions( else None ) - if current_chunked and quantum_step["target"] is None and admission_step is None: + admission_target = admission_step["target"] if admission_step is not None else None + if current_chunked and quantum_step["target"] is None and admission_target is None: return [] patch = {**runtime_base_patch, "enable-chunked-prefill": True} if quantum_step["target"] is not None: patch["max-num-batched-tokens"] = quantum_step["target"] - if admission_step is not None: - patch["max-num-seqs"] = admission_step + if admission_target is not None: + patch["max-num-seqs"] = admission_target signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) action_id = _prefill_scheduler_action_id(quantum_step["direction"], admission_step) @@ -1736,12 +1744,12 @@ def _prefill_scheduler_candidate_actions( relief = 0.56 if quantum_step["direction"] == "lower" else 0.42 if quantum_step["direction"] == "seed": relief = 0.38 - if admission_step is not None: - relief += 0.06 + if admission_target is not None: + relief += 0.08 if admission_step and admission_step["direction"] == "lower" else 0.06 coverage_bonus = 0.0 if quantum_step["direction"] == "seed" or 
not current_chunked: coverage_bonus = 0.28 - elif quantum_step["target"] is not None or admission_step is not None: + elif quantum_step["target"] is not None or admission_target is not None: coverage_bonus = 0.14 information_gain = _information_gain(bottleneck_hypotheses, "runtime") score = relief * max(confidence, 0.35) + information_gain + 0.08 + coverage_bonus @@ -1759,7 +1767,14 @@ def _prefill_scheduler_candidate_actions( round(target_ratio, 4) if target_ratio is not None else None ), "admission_pressure_current": current_mns or None, - "admission_pressure_target": admission_step, + "admission_pressure_target": admission_target, + "admission_pressure_direction": admission_step["direction"] if admission_step else "hold", + "admission_pressure_ratio_current": ( + round(admission_step["current_ratio"], 4) if admission_step else None + ), + "admission_pressure_ratio_target": ( + round(admission_step["target_ratio"], 4) if admission_step else None + ), } actions = [ _runtime_action( @@ -1843,32 +1858,59 @@ def _next_admission_pressure_step( *, top_bottleneck: str, quantum_direction: str, -) -> int | None: +) -> dict[str, Any] | None: if current_mns <= 0: return None target_concurrency = max(int(study.trace.max_concurrency), 1) + current_ratio = current_mns / target_concurrency if top_bottleneck == "admission_or_queueing" and current_mns < target_concurrency: target = min(target_concurrency, int(current_mns * _ADMISSION_PRESSURE_STEP_UP)) - return _round_up_to_multiple(target, 8) + target = _round_up_to_multiple(target, 8) + return { + "direction": "raise", + "target": target, + "current_ratio": current_ratio, + "target_ratio": target / target_concurrency, + } if ( top_bottleneck == "ttft_prefill" and quantum_direction in {"hold", "raise"} and current_mns < target_concurrency ): target = min(target_concurrency, int(current_mns * _ADMISSION_PRESSURE_STEP_UP)) - return _round_up_to_multiple(target, 8) + target = _round_up_to_multiple(target, 8) + return { + 
"direction": "raise", + "target": target, + "current_ratio": current_ratio, + "target_ratio": target / target_concurrency, + } + if top_bottleneck == "ttft_prefill" and current_mns > target_concurrency: + target = min(current_mns - 8, _round_up_to_multiple(target_concurrency, 8)) + if target > 0 and target < current_mns: + return { + "direction": "lower", + "target": target, + "current_ratio": current_ratio, + "target_ratio": target / target_concurrency, + } return None -def _prefill_scheduler_action_id(quantum_direction: str, admission_target: int | None) -> str: +def _prefill_scheduler_action_id( + quantum_direction: str, + admission_step: dict[str, Any] | None, +) -> str: if quantum_direction == "lower": return "lower_prefill_quantum_with_chunked_prefill" if quantum_direction == "raise": return "raise_prefill_quantum_with_chunked_prefill" if quantum_direction == "seed": return "seed_chunked_prefill_quantum" - if admission_target is not None: - return "adjust_admission_pressure_with_chunked_prefill" + if admission_step is not None and admission_step["direction"] == "lower": + return "lower_admission_pressure_with_chunked_prefill" + if admission_step is not None and admission_step["direction"] == "raise": + return "raise_admission_pressure_with_chunked_prefill" return "enable_chunked_prefill_scheduler_mode" @@ -2393,8 +2435,12 @@ def _topology_frontier_status( flags = _effective_flags_for_item(study, best) current_tp = _parse_int_like(flags.get("tensor-parallel-size"), default=1) current_dp = _parse_int_like(flags.get("data-parallel-size"), default=1) - next_tp = _next_allowed_tp(study, current_tp=current_tp, current_dp=current_dp) - if next_tp is None: + flag_patch = _higher_tp_frontier_patch( + study, + current_tp=current_tp, + current_dp=current_dp, + ) + if flag_patch is None: return { **default, "reason": "no_legal_higher_tp_frontier", @@ -2402,11 +2448,8 @@ def _topology_frontier_status( "current_tp": current_tp, "current_dp": current_dp, } + next_tp = 
_parse_int_like(flag_patch.get("tensor-parallel-size"), default=current_tp) - flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp} - base_dp = _parse_int_like(study.engine.base_flags.get("data-parallel-size"), default=1) - if current_dp != base_dp: - flag_patch["data-parallel-size"] = current_dp signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": flag_patch}) if signature in _state_tested_signatures(study, state): return { @@ -2430,6 +2473,22 @@ def _topology_frontier_status( } +def _higher_tp_frontier_patch( + study: StudySpec, + *, + current_tp: int, + current_dp: int, +) -> dict[str, Any] | None: + next_tp = _next_allowed_tp(study, current_tp=current_tp, current_dp=current_dp) + if next_tp is None: + return None + flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp} + base_dp = _parse_int_like(study.engine.base_flags.get("data-parallel-size"), default=1) + if current_dp != base_dp: + flag_patch["data-parallel-size"] = current_dp + return flag_patch + + def _effective_flags_for_item(study: StudySpec, item: dict[str, Any]) -> dict[str, Any]: flags = dict(study.engine.base_flags) patch = item.get("config_patch") diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index 6308296..8d43878 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -3510,6 +3510,397 @@ class CoreFlowTests(unittest.TestCase): action["score_factors"]["uncovered_scheduler_dimension_bonus"], 0.0, ) + families = { + item["knob_family"] for item in context["experiment_plan"]["candidate_actions"] + } + self.assertNotIn("enable-chunked-prefill", families) + + def test_prefill_scheduler_admission_pressure_only_uses_normalized_seq_cap(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + trace_overrides={"max_concurrency": 64}, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 8, + 
"data-parallel-size": 1, + "max-num-batched-tokens": 8192, + "max-num-seqs": 8, + "enable-chunked-prefill": True, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "max-num-batched-tokens", + "max-num-seqs", + "enable-chunked-prefill", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [8], + "allowed_data_parallel_sizes": [1], + "allowed_tp_dp_products": [8], + }, + }, + ) + result_path = tmp_path / "trial-0001.json" + result_path.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.5, + "best_request_rate": 2.0, + "best_pass_rate": 0.5, + "probes": [ + { + "threshold": 0.5, + "feasible": False, + "payload": { + "request_rate": 2.0, + "pass_rate": 0.5, + "early_stop_reason": "slo_pass_rate_unrecoverable", + "latency_summary": {"failed_reason_counts": {}}, + }, + } + ], + } + ), + encoding="utf-8", + ) + study = load_study_spec(study_path) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_parallel_size=8, + best_request_rate=2.0, + best_request_rate_per_gpu=0.25, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=8, + best_request_rate=2.0, + best_request_rate_per_gpu=0.25, + result_path=str(result_path), + config_patch={"env_patch": {}, "flag_patch": {}}, + ) + ], + ) + + context = build_harness_context( + study=study, + window_summary={"prompt_tokens_p95": 8192, "prompt_tail_ratio_p95_p50": 4.0}, + state=state, + ) + + action = context["experiment_plan"]["next_action"] + flag_patch = action["config_patch"]["flag_patch"] + self.assertEqual(action["knob_family"], "prefill-scheduler-interaction") + self.assertEqual(action["action_id"], "raise_admission_pressure_with_chunked_prefill") + self.assertEqual(flag_patch["max-num-seqs"], 16) + self.assertNotIn("max-num-batched-tokens", flag_patch) + self.assertEqual(action["score_factors"]["admission_pressure_direction"], "raise") + self.assertLess( + 
action["score_factors"]["admission_pressure_ratio_current"], + action["score_factors"]["admission_pressure_ratio_target"], + ) + + def test_prefill_scheduler_lowers_excess_admission_pressure(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + trace_overrides={"max_concurrency": 64}, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 8, + "data-parallel-size": 1, + "max-num-batched-tokens": 8192, + "max-num-seqs": 128, + "enable-chunked-prefill": True, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "max-num-batched-tokens", + "max-num-seqs", + "enable-chunked-prefill", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [8], + "allowed_data_parallel_sizes": [1], + "allowed_tp_dp_products": [8], + }, + }, + ) + result_path = tmp_path / "trial-0001.json" + result_path.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.5, + "best_request_rate": 2.0, + "best_pass_rate": 0.95, + "probes": [ + { + "threshold": 0.5, + "feasible": True, + "payload": { + "request_rate": 2.0, + "pass_rate": 0.95, + "latency_summary": { + "failed_reason_counts": {"ttft_ms>4000.0": 24} + }, + }, + } + ], + } + ), + encoding="utf-8", + ) + study = load_study_spec(study_path) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_parallel_size=8, + best_request_rate=2.0, + best_request_rate_per_gpu=0.25, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=8, + best_request_rate=2.0, + best_request_rate_per_gpu=0.25, + result_path=str(result_path), + config_patch={"env_patch": {}, "flag_patch": {}}, + ) + ], + ) + + context = build_harness_context( + study=study, + window_summary={"prompt_tokens_p95": 8192, "prompt_tail_ratio_p95_p50": 4.0}, + state=state, + ) + + action = context["experiment_plan"]["next_action"] + 
flag_patch = action["config_patch"]["flag_patch"] + self.assertEqual(action["knob_family"], "prefill-scheduler-interaction") + self.assertEqual(action["action_id"], "lower_admission_pressure_with_chunked_prefill") + self.assertLess(flag_patch["max-num-seqs"], 128) + self.assertNotIn("max-num-batched-tokens", flag_patch) + self.assertEqual(action["score_factors"]["admission_pressure_direction"], "lower") + self.assertLess( + action["score_factors"]["admission_pressure_ratio_target"], + action["score_factors"]["admission_pressure_ratio_current"], + ) + + def test_prefill_scheduler_negative_applicability_matrix(self) -> None: + variants = [ + ( + {"request_mode": "decode_only"}, + {"prompt_tokens_p95": 8192, "prompt_tail_ratio_p95_p50": 4.0}, + ), + ( + {}, + { + "prompt_tokens_p95": 8192, + "prompt_tail_ratio_p95_p50": 4.0, + "prefix_cache": {"repeated_token_ratio_estimate": 0.75}, + }, + ), + ( + {}, + {"prompt_tokens_p95": 2048, "prompt_tail_ratio_p95_p50": 1.0}, + ), + ] + for trace_overrides, window_summary in variants: + with self.subTest(trace_overrides=trace_overrides, window_summary=window_summary): + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + trace_overrides=trace_overrides, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 8, + "data-parallel-size": 1, + "max-num-batched-tokens": 8192, + "max-num-seqs": 8, + "enable-chunked-prefill": True, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "max-num-batched-tokens", + "max-num-seqs", + "enable-chunked-prefill", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [8], + "allowed_data_parallel_sizes": [1], + "allowed_tp_dp_products": [8], + }, + }, + ) + result_path = tmp_path / "trial-0001.json" + result_path.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.5, + "best_request_rate": 2.0, + "best_pass_rate": 0.95, 
+ "probes": [ + { + "threshold": 0.5, + "feasible": True, + "payload": { + "request_rate": 2.0, + "pass_rate": 0.95, + "latency_summary": { + "failed_reason_counts": { + "ttft_ms>4000.0": 24 + } + }, + }, + } + ], + } + ), + encoding="utf-8", + ) + study = load_study_spec(study_path) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_parallel_size=8, + best_request_rate=2.0, + best_request_rate_per_gpu=0.25, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=8, + best_request_rate=2.0, + best_request_rate_per_gpu=0.25, + result_path=str(result_path), + config_patch={"env_patch": {}, "flag_patch": {}}, + ) + ], + ) + + context = build_harness_context( + study=study, + window_summary=window_summary, + state=state, + ) + families = { + item["knob_family"] + for item in context["experiment_plan"]["candidate_actions"] + } + self.assertNotIn("prefill-scheduler-interaction", families) + + def test_prefill_scheduler_does_not_preempt_open_topology_frontier(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 2, + "data-parallel-size": 1, + "max-num-batched-tokens": 8192, + "max-num-seqs": 8, + "enable-chunked-prefill": True, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "max-num-batched-tokens", + "max-num-seqs", + "enable-chunked-prefill", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [2, 4], + "allowed_data_parallel_sizes": [1, 2], + "allowed_tp_dp_products": [4, 8], + }, + }, + ) + result_path = tmp_path / "trial-0001.json" + result_path.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.5, + "best_request_rate": 2.0, + "best_pass_rate": 0.95, + "probes": [ + { + "threshold": 0.5, + "feasible": True, + "payload": { + "request_rate": 2.0, + 
"pass_rate": 0.95, + "latency_summary": { + "failed_reason_counts": {"ttft_ms>4000.0": 24} + }, + }, + } + ], + } + ), + encoding="utf-8", + ) + study = load_study_spec(study_path) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_parallel_size=4, + best_request_rate=2.0, + best_request_rate_per_gpu=0.5, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=4, + best_request_rate=2.0, + best_request_rate_per_gpu=0.5, + result_path=str(result_path), + config_patch={ + "env_patch": {}, + "flag_patch": {"data-parallel-size": 2}, + }, + ) + ], + ) + + context = build_harness_context( + study=study, + window_summary={"prompt_tokens_p95": 8192, "prompt_tail_ratio_p95_p50": 4.0}, + state=state, + ) + + action = context["experiment_plan"]["next_action"] + self.assertEqual(action["knob_family"], "topology") + self.assertEqual( + action["config_patch"]["flag_patch"], + {"tensor-parallel-size": 4, "data-parallel-size": 2}, + ) + families = { + item["knob_family"] for item in context["experiment_plan"]["candidate_actions"] + } + self.assertNotIn("prefill-scheduler-interaction", families) def test_prefill_scheduler_not_active_for_short_prompt_workload(self) -> None: with tempfile.TemporaryDirectory() as tmp: