diff --git a/docs/harness-ablation/prefill-scheduler-normalized-design-20260629.md b/docs/harness-ablation/prefill-scheduler-normalized-design-20260629.md new file mode 100644 index 0000000..8712c9f --- /dev/null +++ b/docs/harness-ablation/prefill-scheduler-normalized-design-20260629.md @@ -0,0 +1,141 @@ +# Prefill Scheduler Interaction Harness 设计与 Review + +日期:2026-06-29 + +## 背景 + +case3 的 ablation 结果显示,`gpt-5.5 no-harness` 找到了一个 runtime/scheduler 方向: + +```text +enable-chunked-prefill=true +max-num-batched-tokens 较低/中等 +max-num-seqs 适中 +block-size=16 +``` + +而当时 harness 主要做两类动作: + +- 单点打开 `enable-chunked-prefill`; +- 对 `max-num-batched-tokens` 做单调 raise。 + +这个 gap 不能用“把 8192/32 这组值加入 candidate grid”来修补。那会把 case3 的答案硬编码成更大的候选表,仍然是 rule-based overfitting。 + +## 设计原则 + +新增的设计不是一个 fixed value set,而是一个 normalized control dimension: + +```text +prefill_quantum_ratio = max-num-batched-tokens / prompt_tokens_p95 +admission_pressure = max-num-seqs relative to trace.max_concurrency +scheduler_mode = enable-chunked-prefill +``` + +因此,candidate generator 不直接说“试 8192”,而是说: + +- 如果 long-tail prefill + TTFT/prefill bottleneck,且当前 `prefill_quantum_ratio` 太大,则沿 log-ratio 方向降低 prefill quantum; +- 如果 prefill quantum 远小于 prompt scale,可能是过度切碎导致 overhead,则沿 log-ratio 方向提高 prefill quantum; +- 如果 admission/queueing 是瓶颈,则只按 relative step 调整 admission pressure; +- 所有 concrete flag value 都是最后一步从 normalized target 映射到 engine flag,并按 engine granularity round。 + +当前实现使用几何中点作为 trust-region step: + +```text +target_mbt = sqrt(current_mbt * prompt_tokens_p95) +``` + +这对应在 log space 走半步。它比固定乘以 0.5/1.5 更接近 scale-invariant:prompt scale 变大时,下一步 MBT 也会变大。 + +## 实现映射 + +代码入口: + +- `src/aituner/harness.py::_runtime_candidate_actions` + - 在 topology frontier settled 后调用新的 `_prefill_scheduler_candidate_actions`。 + - 仍保留 topology-before-runtime guard,runtime family 不抢未覆盖的 topology frontier。 + +新增逻辑: + +- `_prefill_scheduler_workload_applies` + - 只在非 decode-only、long-tail/moderate-tail prefill workload、非 
high-prefix-reuse 场景激活。 +- `_next_prefill_quantum_step` + - 使用 `current_mbt / prompt_scale` 判断方向。 + - 通过几何中点做相对 step。 +- `_next_admission_pressure_step` + - 使用 `trace.max_concurrency` 作为 admission scale,不使用固定 `max-num-seqs` 表。 +- `_prefill_scheduler_candidate_actions` + - 输出 `prefill-scheduler-interaction` family。 + - `score_factors` 显式记录 current/target `prefill_quantum_ratio`,方便后续实验解释。 + +## 为什么不是 rule-based hack + +禁止的实现形态: + +- 不允许引用 case3、具体 trace 名、模型名、机器名; +- 不允许出现 `if TP=2 and gmu=0.7 and mns=8 then MBT=8192`; +- 不允许把 case3 发现扩成 `{4096,8192,12288,16384} x {16,32,64}` 这种固定 grid; +- 不允许 bypass normalized full-config signature。 + +当前实现满足: + +- trigger 来自 L-C-A profile、bottleneck classifier、topology frontier、tunable flags; +- proposal 是相对当前 incumbent 的 direction,不是固定答案; +- concrete value 随 prompt scale 和 current config 改变; +- validator/no-repeat 仍使用 normalized effective full-config signature; +- short prompt、decode-only、high prefix reuse 不激活该 family。 + +## Review 结论 + +### 之前实现的问题 + +1. `enable-chunked-prefill` 是 standalone toggle,无法表达 scheduler interaction。 +2. TTFT/prefill bottleneck 下 MBT 主要单调 raise,无法发现“降低 prefill quantum 减少 head-of-line (HoL) blocking”。 +3. 旧测试断言了固定 `16384` 等值,容易把 harness 叙事拉回 heuristic table。 + +### 当前改动的效果 + +1. 引入 `prefill-scheduler-interaction` 作为新的 mechanistic family。 +2. candidate 的 action id 表达方向(若 chunked prefill 尚未开启且 quantum/admission 均无需调整,则回退为 `enable_chunked_prefill_scheduler_mode`): + - `lower_prefill_quantum_with_chunked_prefill` + - `raise_prefill_quantum_with_chunked_prefill` + - `seed_chunked_prefill_quantum` + - `adjust_admission_pressure_with_chunked_prefill` +3. 
测试改为验证 normalized direction 和 scale sensitivity,而不是固定 absolute value。 + +## 单元验证 + +新增/更新的测试覆盖: + +- long-tail TTFT 下,过大的 `prefill_quantum_ratio` 会下降; +- prompt length scale 变大时,下一步 MBT target 也变大; +- short prompt workload 不激活 prefill scheduler family; +- 原有 prefill stop guard 仍不允许在有 high-value candidate 时停止; +- normalized full-config no-repeat 语义不变。 + +本地全量测试: + +```text +PYTHONPATH=src python3 -m unittest discover -s tests +151 tests OK +``` + +## 还需要真机实验验证 + +下一步实验不应该只看 case3 是否复现,而要攻击这个 family 的边界: + +1. case3 bad runtime start: + - 目标:验证 LLM+harness / no-LLM harness 是否能从 bad runtime start 找到 chunked-prefill scheduler 方向。 +2. scaled prompt case: + - 目标:验证 proposal 不固定在同一个 MBT,而会随 `prompt_tokens_p95` 改变。 +3. short/decode negative case: + - 目标:验证该 family 不会在不适用 workload 上误触发。 +4. topology frontier case: + - 目标:验证 topology 未覆盖时 runtime scheduler 不抢跑。 + +核心指标: + +- best request_rate_per_gpu; +- time-to-best / trial-to-target; +- candidate family sequence; +- `prefill_quantum_ratio_current -> target` 的方向是否与 bottleneck evidence 一致; +- 是否出现 repeated normalized full-config signature。 + diff --git a/src/aituner/harness.py b/src/aituner/harness.py index 2b9a50b..fb4050c 100644 --- a/src/aituner/harness.py +++ b/src/aituner/harness.py @@ -39,6 +39,9 @@ _VALIDATION_TRIALS_WITHOUT_FAMILY_COVERAGE = 3 _GMU_STEP = 0.02 _GMU_NOMINAL_FLOOR = 0.9 _GMU_SAFE_CEILING = 0.97 +_PREFILL_QUANTUM_HEAD_OF_LINE_RATIO = 1.0 +_PREFILL_QUANTUM_FRAGMENTATION_RATIO = 0.5 +_ADMISSION_PRESSURE_STEP_UP = 1.5 def build_harness_context( @@ -355,19 +358,27 @@ def _knob_harnesses( } ) if "enable-chunked-prefill" in tunable: + prefill_scheduler_active = ( + active_bottleneck in {"ttft_prefill", "admission_or_queueing"} + and _prefill_scheduler_workload_applies(study, window_summary) + ) harnesses.append( { "knob_family": "enable-chunked-prefill", "use_when": [ "The L profile has a long tail and long prefills block shorter requests.", + "Treat chunked prefill as part of a scheduler interaction with 
prefill quantum and admission pressure, not as a standalone magic flag.", ], "procedure": [ - "Keep chunked prefill enabled for heavy-tail chat windows unless history shows chunking overhead dominates.", + "Enable chunked prefill when the measured bottleneck indicates prefill head-of-line blocking.", + "Move max-num-batched-tokens by relative trust-region steps in normalized prefill_quantum_ratio space.", + "Move max-num-seqs only as a relative admission-pressure correction, and preserve topology while testing this scheduler hypothesis.", ], "guards": [ "Do not disable chunked prefill on a heavy-tail workload without direct evidence from a nearby trial.", + "Do not use fixed absolute MBT/MNS tables; derive the next concrete flag values from the incumbent and workload scale.", ], - "active_now": False, + "active_now": prefill_scheduler_active, } ) if "expert-parallel-size" in tunable or "enable-expert-parallel" in tunable: @@ -1280,6 +1291,20 @@ def _runtime_candidate_actions( ) topology_settled = not tp_frontier_open + actions.extend( + _prefill_scheduler_candidate_actions( + study, + window_summary, + anchor_flags, + runtime_base_patch, + top_bottleneck, + bottleneck_hypotheses, + topology_settled=topology_settled, + seen_signatures=seen_signatures, + blocked_candidates=blocked_candidates, + ) + ) + if ( "max-num-batched-tokens" in tunable and _anchor_has_topology_patch(anchor) @@ -1636,11 +1661,216 @@ def _next_gpu_memory_utilization_target( return target +def _prefill_scheduler_candidate_actions( + study: StudySpec, + window_summary: dict[str, Any], + anchor_flags: dict[str, Any], + runtime_base_patch: dict[str, Any], + top_bottleneck: str, + bottleneck_hypotheses: list[dict[str, Any]], + *, + topology_settled: bool, + seen_signatures: set[str], + blocked_candidates: list[dict[str, Any]], +) -> list[dict[str, Any]]: + tunable = set(study.engine.tunable_flags) + if not topology_settled: + return [] + if top_bottleneck not in {"ttft_prefill", 
"admission_or_queueing"}: + return [] + if "enable-chunked-prefill" not in tunable or "max-num-batched-tokens" not in tunable: + return [] + if not _prefill_scheduler_workload_applies(study, window_summary): + return [] + + prompt_scale = _prefill_prompt_scale(window_summary) + if prompt_scale <= 0: + return [] + + current_mbt = _parse_int_like(anchor_flags.get("max-num-batched-tokens"), default=0) + current_mns = _parse_int_like(anchor_flags.get("max-num-seqs"), default=0) + current_chunked = bool(anchor_flags.get("enable-chunked-prefill", False)) + quantum_step = _next_prefill_quantum_step( + current_mbt, + prompt_scale, + top_bottleneck=top_bottleneck, + ) + admission_step = ( + _next_admission_pressure_step( + study, + current_mns, + top_bottleneck=top_bottleneck, + quantum_direction=quantum_step["direction"], + ) + if "max-num-seqs" in tunable + else None + ) + + if current_chunked and quantum_step["target"] is None and admission_step is None: + return [] + + patch = {**runtime_base_patch, "enable-chunked-prefill": True} + if quantum_step["target"] is not None: + patch["max-num-batched-tokens"] = quantum_step["target"] + if admission_step is not None: + patch["max-num-seqs"] = admission_step + + signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch}) + action_id = _prefill_scheduler_action_id(quantum_step["direction"], admission_step) + if signature in seen_signatures: + blocked_candidates.append( + _blocked_candidate( + action_id=action_id, + knob_family="prefill-scheduler-interaction", + config_patch={"env_patch": {}, "flag_patch": patch}, + blocked_reason="blocked_noop_or_repeat_effective_full_config", + effective_config_signature=signature, + ) + ) + return [] + + current_ratio = current_mbt / prompt_scale if current_mbt > 0 else None + target_mbt = quantum_step["target"] if quantum_step["target"] is not None else current_mbt + target_ratio = target_mbt / prompt_scale if target_mbt > 0 else None + confidence = 
_hypothesis_confidence(bottleneck_hypotheses, top_bottleneck) + relief = 0.56 if quantum_step["direction"] == "lower" else 0.42 + if quantum_step["direction"] == "seed": + relief = 0.38 + if admission_step is not None: + relief += 0.06 + score = relief * max(confidence, 0.35) + _information_gain(bottleneck_hypotheses, "runtime") + 0.08 + factors = { + "expected_bottleneck_relief": round(relief, 4), + "bottleneck_confidence": round(confidence, 4), + "information_gain": round(_information_gain(bottleneck_hypotheses, "runtime"), 4), + "launch_safety": 0.08, + "regression_risk": 0.06 if current_chunked else 0.1, + "prefill_quantum_ratio_current": ( + round(current_ratio, 4) if current_ratio is not None else None + ), + "prefill_quantum_ratio_target": ( + round(target_ratio, 4) if target_ratio is not None else None + ), + "admission_pressure_current": current_mns or None, + "admission_pressure_target": admission_step, + } + actions = [ + _runtime_action( + action_id=action_id, + knob_family="prefill-scheduler-interaction", + score=score, + score_factors=factors, + patch=patch, + hypothesis=( + "Test the prefill scheduler hypothesis in normalized control space: " + "chunked prefill changes the scheduler mode, max-num-batched-tokens " + "controls prefill_quantum_ratio, and max-num-seqs controls admission pressure." 
+ ), + expected_effects=[ + "preserve the settled topology while perturbing scheduler controls", + "reduce long-prefill head-of-line blocking when the prefill quantum is too large", + "reject this scheduler hypothesis if request_rate_per_gpu does not improve under the configured SLO", + ], + ) + ] + seen_signatures.add(signature) + return actions + + +def _prefill_scheduler_workload_applies( + study: StudySpec, + window_summary: dict[str, Any], +) -> bool: + if study.trace.request_mode == "decode_only": + return False + prompt_p95 = _as_float(window_summary.get("prompt_tokens_p95")) + prompt_p99 = _as_float(window_summary.get("prompt_tokens_p99")) + tail_ratio = _as_float(window_summary.get("prompt_tail_ratio_p95_p50")) + if prompt_p95 <= 0 and prompt_p99 > 0: + prompt_p95 = prompt_p99 + if _length_regime(prompt_p95, tail_ratio) == "short_or_moderate": + return False + prefix_cache = window_summary.get("prefix_cache") + cache_ratio = 0.0 + if isinstance(prefix_cache, dict): + cache_ratio = _as_float(prefix_cache.get("repeated_token_ratio_estimate")) + return _cache_regime(cache_ratio) != "high_prefix_reuse" + + +def _prefill_prompt_scale(window_summary: dict[str, Any]) -> float: + prompt_p95 = _as_float(window_summary.get("prompt_tokens_p95")) + prompt_p99 = _as_float(window_summary.get("prompt_tokens_p99")) + if prompt_p95 > 0: + return prompt_p95 + return prompt_p99 + + +def _next_prefill_quantum_step( + current_mbt: int, + prompt_scale: float, + *, + top_bottleneck: str, +) -> dict[str, Any]: + if current_mbt <= 0: + return { + "direction": "seed", + "target": _round_up_to_multiple(int(prompt_scale), 1024), + } + ratio = current_mbt / prompt_scale if prompt_scale > 0 else 0.0 + if top_bottleneck == "ttft_prefill" and ratio > _PREFILL_QUANTUM_HEAD_OF_LINE_RATIO: + target = int((current_mbt * prompt_scale) ** 0.5) + target = _round_up_to_multiple(target, 1024) + if target < current_mbt: + return {"direction": "lower", "target": target} + if ratio < 
_PREFILL_QUANTUM_FRAGMENTATION_RATIO: + target = int((current_mbt * prompt_scale) ** 0.5) + target = _round_up_to_multiple(target, 1024) + if target > current_mbt: + return {"direction": "raise", "target": target} + return {"direction": "hold", "target": None} + + +def _next_admission_pressure_step( + study: StudySpec, + current_mns: int, + *, + top_bottleneck: str, + quantum_direction: str, +) -> int | None: + if current_mns <= 0: + return None + target_concurrency = max(int(study.trace.max_concurrency), 1) + if top_bottleneck == "admission_or_queueing" and current_mns < target_concurrency: + target = min(target_concurrency, int(current_mns * _ADMISSION_PRESSURE_STEP_UP)) + return _round_up_to_multiple(target, 8) + if ( + top_bottleneck == "ttft_prefill" + and quantum_direction in {"hold", "raise"} + and current_mns < target_concurrency + ): + target = min(target_concurrency, int(current_mns * _ADMISSION_PRESSURE_STEP_UP)) + return _round_up_to_multiple(target, 8) + return None + + +def _prefill_scheduler_action_id(quantum_direction: str, admission_target: int | None) -> str: + if quantum_direction == "lower": + return "lower_prefill_quantum_with_chunked_prefill" + if quantum_direction == "raise": + return "raise_prefill_quantum_with_chunked_prefill" + if quantum_direction == "seed": + return "seed_chunked_prefill_quantum" + if admission_target is not None: + return "adjust_admission_pressure_with_chunked_prefill" + return "enable_chunked_prefill_scheduler_mode" + + def _runtime_action( *, action_id: str, knob_family: str, score: float, + score_factors: dict[str, Any] | None = None, patch: dict[str, Any], hypothesis: str, expected_effects: list[str], @@ -1649,7 +1879,8 @@ def _runtime_action( "action_id": action_id, "knob_family": knob_family, "score": round(score, 4), - "score_factors": { + "score_factors": score_factors + or { "expected_bottleneck_relief": round(max(score - 0.1, 0.0), 4), "information_gain": 0.1, "launch_safety": 0.05, diff --git 
a/tests/test_core_flow.py b/tests/test_core_flow.py index 6200578..c2fe5da 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -3220,9 +3220,277 @@ class CoreFlowTests(unittest.TestCase): "experiment_plan_has_high_value_candidate", ) action = context["experiment_plan"]["next_action"] - self.assertEqual(action["knob_family"], "max-num-seqs") - self.assertEqual(action["config_patch"]["flag_patch"]["max-num-seqs"], 96) - self.assertEqual(action["config_patch"]["flag_patch"]["tensor-parallel-size"], 8) + self.assertEqual(action["knob_family"], "prefill-scheduler-interaction") + self.assertEqual(action["action_id"], "raise_prefill_quantum_with_chunked_prefill") + flag_patch = action["config_patch"]["flag_patch"] + self.assertEqual(flag_patch["tensor-parallel-size"], 8) + self.assertGreater(flag_patch["max-num-batched-tokens"], 8192) + + def test_prefill_scheduler_lowers_quantum_by_normalized_ratio(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 8, + "data-parallel-size": 1, + "max-num-batched-tokens": 32768, + "max-num-seqs": 8, + "enable-chunked-prefill": True, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "max-num-batched-tokens", + "max-num-seqs", + "enable-chunked-prefill", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [8], + "allowed_data_parallel_sizes": [1], + "allowed_tp_dp_products": [8], + }, + }, + ) + result_path = tmp_path / "trial-0001.json" + result_path.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.5, + "best_request_rate": 2.0, + "best_pass_rate": 0.95, + "probes": [ + { + "threshold": 0.5, + "feasible": True, + "payload": { + "request_rate": 2.0, + "pass_rate": 0.95, + "latency_summary": { + "failed_reason_counts": {"ttft_ms>4000.0": 24} + }, + }, + } + ], + } + ), + 
encoding="utf-8", + ) + study = load_study_spec(study_path) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_parallel_size=8, + best_request_rate=2.0, + best_request_rate_per_gpu=0.25, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=8, + best_request_rate=2.0, + best_request_rate_per_gpu=0.25, + result_path=str(result_path), + config_patch={"env_patch": {}, "flag_patch": {}}, + ) + ], + ) + + context = build_harness_context( + study=study, + window_summary={"prompt_tokens_p95": 8192, "prompt_tail_ratio_p95_p50": 4.0}, + state=state, + ) + + action = context["experiment_plan"]["next_action"] + flag_patch = action["config_patch"]["flag_patch"] + self.assertEqual(action["knob_family"], "prefill-scheduler-interaction") + self.assertEqual(action["action_id"], "lower_prefill_quantum_with_chunked_prefill") + self.assertLess(flag_patch["max-num-batched-tokens"], 32768) + factors = action["score_factors"] + self.assertLess( + factors["prefill_quantum_ratio_target"], + factors["prefill_quantum_ratio_current"], + ) + + def test_prefill_scheduler_quantum_step_scales_with_prompt_length(self) -> None: + targets: list[int] = [] + for prompt_p95 in (8192, 16384): + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 8, + "data-parallel-size": 1, + "max-num-batched-tokens": 32768, + "max-num-seqs": 8, + "enable-chunked-prefill": True, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "max-num-batched-tokens", + "max-num-seqs", + "enable-chunked-prefill", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [8], + "allowed_data_parallel_sizes": [1], + "allowed_tp_dp_products": [8], + }, + }, + ) + result_path = tmp_path / "trial-0001.json" + result_path.write_text( + json.dumps( + { + "status": 
"completed", + "best_sampling_u": 0.5, + "best_request_rate": 2.0, + "best_pass_rate": 0.95, + "probes": [ + { + "threshold": 0.5, + "feasible": True, + "payload": { + "request_rate": 2.0, + "pass_rate": 0.95, + "latency_summary": { + "failed_reason_counts": {"ttft_ms>4000.0": 24} + }, + }, + } + ], + } + ), + encoding="utf-8", + ) + study = load_study_spec(study_path) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_parallel_size=8, + best_request_rate=2.0, + best_request_rate_per_gpu=0.25, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=8, + best_request_rate=2.0, + best_request_rate_per_gpu=0.25, + result_path=str(result_path), + config_patch={"env_patch": {}, "flag_patch": {}}, + ) + ], + ) + + context = build_harness_context( + study=study, + window_summary={ + "prompt_tokens_p95": prompt_p95, + "prompt_tail_ratio_p95_p50": 4.0, + }, + state=state, + ) + action = context["experiment_plan"]["next_action"] + self.assertEqual(action["knob_family"], "prefill-scheduler-interaction") + targets.append(action["config_patch"]["flag_patch"]["max-num-batched-tokens"]) + + self.assertGreater(targets[1], targets[0]) + + def test_prefill_scheduler_not_active_for_short_prompt_workload(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 8, + "data-parallel-size": 1, + "max-num-batched-tokens": 32768, + "max-num-seqs": 8, + "enable-chunked-prefill": True, + }, + "tunable_flags": [ + "tensor-parallel-size", + "data-parallel-size", + "max-num-batched-tokens", + "max-num-seqs", + "enable-chunked-prefill", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [8], + "allowed_data_parallel_sizes": [1], + "allowed_tp_dp_products": [8], + }, + }, + ) + result_path = tmp_path / "trial-0001.json" + 
result_path.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.5, + "best_request_rate": 2.0, + "best_pass_rate": 0.95, + "probes": [ + { + "threshold": 0.5, + "feasible": True, + "payload": { + "request_rate": 2.0, + "pass_rate": 0.95, + "latency_summary": { + "failed_reason_counts": {"ttft_ms>4000.0": 24} + }, + }, + } + ], + } + ), + encoding="utf-8", + ) + study = load_study_spec(study_path) + state = StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_parallel_size=8, + best_request_rate=2.0, + best_request_rate_per_gpu=0.25, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=8, + best_request_rate=2.0, + best_request_rate_per_gpu=0.25, + result_path=str(result_path), + config_patch={"env_patch": {}, "flag_patch": {}}, + ) + ], + ) + + context = build_harness_context( + study=study, + window_summary={"prompt_tokens_p95": 2048, "prompt_tail_ratio_p95_p50": 1.0}, + state=state, + ) + + families = { + item["knob_family"] for item in context["experiment_plan"]["candidate_actions"] + } + self.assertNotIn("prefill-scheduler-interaction", families) def test_prefill_sequence_probe_followed_by_joint_runtime_probe(self) -> None: with tempfile.TemporaryDirectory() as tmp: @@ -3350,10 +3618,11 @@ class CoreFlowTests(unittest.TestCase): ) action = context["experiment_plan"]["next_action"] flag_patch = action["config_patch"]["flag_patch"] - self.assertEqual(action["knob_family"], "prefill-runtime-interaction") + self.assertEqual(action["knob_family"], "prefill-scheduler-interaction") + self.assertEqual(action["action_id"], "raise_prefill_quantum_with_chunked_prefill") self.assertEqual(flag_patch["tensor-parallel-size"], 8) - self.assertEqual(flag_patch["max-num-batched-tokens"], 16384) - self.assertEqual(flag_patch["max-num-seqs"], 96) + self.assertGreater(flag_patch["max-num-batched-tokens"], 8192) + self.assertLess(flag_patch["max-num-batched-tokens"], 24000) def 
test_slo_unrecoverable_does_not_mask_latency_bottleneck(self) -> None: with tempfile.TemporaryDirectory() as tmp: