Harden prefill scheduler harness

This commit is contained in:
2026-06-29 01:54:02 +08:00
parent bfd85793f3
commit ee101a7c24
3 changed files with 624 additions and 41 deletions

View File

@@ -45,6 +45,39 @@ target_mbt = sqrt(current_mbt * prompt_tokens_p95)
这对应在 log space 走半步。它比固定乘以 0.5/1.5 更接近 scale-invariantprompt scale 变大时,下一步 MBT 也会变大。
## Agent Loop
当前 harness 的 loop 可以形式化为:
```text
trial result
-> observation extractor
-> bottleneck classifier
-> candidate family selector
-> normalized candidate generator
-> scoring / coverage ranking
-> validator / no-repeat / stop guard
-> next trial
```
每一层承担不同责任:
1. observation extractor 只把 trial result 转成可比较的事实,例如
request_rate_per_gpu、pass_rate、失败原因、TTFT/TPOT 分位数。
2. bottleneck classifier 把事实归入 `ttft_prefill``decode_tpot`
`admission_or_queueing` 等机制瓶颈,不直接输出配置值。
3. candidate family selector 决定要验证哪个系统假设,例如 topology frontier、
prefill scheduler、admission pressure 或 GPU memory headroom。
4. normalized candidate generator 才把机制变量映射成具体 engine flag。
5. scoring / coverage ranking 负责排序:未覆盖但机制上相关的维度应优先于
已知方向上的微调。
6. validator 使用 normalized full-config signature 防止重复测试,并用 stop guard
避免在仍有高价值 falsification candidate 时过早停止。
因此harness 的核心不是“把 LLM prompt 写好”,而是把黑盒搜索拆成带因果方向的
white-box falsification loop。LLM 可以参与生成候选或解释候选,但候选必须通过
harness 的 family、signature、scoring 和 validator 约束。
## 实现映射
代码入口:
@@ -61,13 +94,19 @@ target_mbt = sqrt(current_mbt * prompt_tokens_p95)
- 使用 `current_mbt / prompt_scale` 判断方向。
- 通过几何中点做相对 step。
- `_next_admission_pressure_step`
- 使用 `trace.max_concurrency` 作为 admission scale不使用固定 `max-num-seqs`
- 使用 `max-num-seqs / trace.max_concurrency` 作为 normalized admission pressure
- 当 admission/queueing 受限且 admission pressure 过低时 raise。
- 当 TTFT/prefill 受限且 admission pressure 明显高于 trace concurrency scale 时 lower。
- `_prefill_scheduler_candidate_actions`
- 输出 `prefill-scheduler-interaction` family。
- `score_factors` 显式记录 current/target `prefill_quantum_ratio`,方便后续实验解释。
- `score_factors` 同时记录 current/target admission pressure ratio避免只解释 MBT。
- 当 scheduler dimension 还没有被 materialized config 覆盖时,加入
`uncovered_scheduler_dimension_bonus`,让该 family 在 topology settled 后优先于
`gpu-memory-utilization` 这类 resource micro-tuning。
- 当该 family 已生成有效候选时,旧的 standalone `raise_mbt`
`enable_chunked_prefill``raise_mbt_and_max_num_seqs` 只作为 fallback不作为同级
prefill runtime 候选抢排序。
## 为什么不是 rule-based hack
@@ -84,8 +123,18 @@ target_mbt = sqrt(current_mbt * prompt_tokens_p95)
- proposal 是相对当前 incumbent 的 direction不是固定答案
- concrete value 随 prompt scale 和 current config 改变;
- validator/no-repeat 仍使用 normalized effective full-config signature
- runtime gate 和正式 topology frontier 共用 higher-TP frontier patch 构造,避免
DP 非 base 时 scheduler 抢跑;
- short prompt、decode-only、high prefix reuse 不激活该 family。
但这不是完备性证明。当前能 claim 的是更严格的工程性质:
- 不引用特定 case identity
- 不把已知 winner 写成候选表;
- 每个 concrete proposal 都能追溯到一个 normalized mechanism variable
- 每次 trial 都能被解释成对一个系统假设的 falsification
- 失败时会留下可审计的 candidate sequence 和 score factors。
## Review 结论
### 之前实现的问题
@@ -104,6 +153,39 @@ target_mbt = sqrt(current_mbt * prompt_tokens_p95)
- `adjust_admission_pressure_with_chunked_prefill`
3. 测试改为验证 normalized direction 和 scale sensitivity而不是固定 absolute value。
### 当前实现仍需警惕的风险
1. `_PREFILL_QUANTUM_HEAD_OF_LINE_RATIO=1.0`
`_PREFILL_QUANTUM_FRAGMENTATION_RATIO=0.5` 仍是机制阈值,不是定理。
它们必须通过 scaled prompt / negative workload 实验验证,而不能只靠 case3。
2. `uncovered_scheduler_dimension_bonus` 是 coverage 排序策略。它的合理性来自
“先覆盖未 materialized 的机制维度,再做 GMU 微调”,但必须通过 candidate
sequence 证明它不会在 topology frontier 未覆盖时抢跑。
3. `block-size=16` 目前没有被纳入这个 family。不能把它作为 case3 固定答案加入;
如果后续要处理,需要单独设计 allocator/layout family从 engine capability 和
memory block waste observation 推导,而不是在 prefill scheduler family 里硬塞。
4. 现有实现仍保留旧的 standalone `enable-chunked-prefill``raise_mbt` 路径作为
fallback。它们不能在 `prefill-scheduler-interaction` 已生成有效候选时作为同级
prefill runtime 候选抢排序。
### 2026-06-29 独立 review 后的修正
独立 review 指出了三个需要立即收紧的泛化风险:
1. 旧 standalone MBT/chunked 候选仍可能让整体 harness 表现得像 heuristic table。
2. admission pressure 只有 raise没有处理 `max-num-seqs` 过高导致 TTFT/prefill 干扰。
3. runtime gate 的 topology-settled 判断和正式 topology frontier 在 DP 非 base 时不完全一致。
对应修正:
-`prefill-scheduler-interaction` 有有效候选时,旧的 standalone MBT/chunked/joint
prefill-runtime 候选降为 fallback。
- admission pressure 改为 normalized ratio并支持 raise/lower 两个方向:
`raise_admission_pressure_with_chunked_prefill`
`lower_admission_pressure_with_chunked_prefill`
- 抽出 `_higher_tp_frontier_patch`,让 runtime gate 与
`_topology_frontier_status` 使用同一套 higher-TP signature。
### 2026-06-29 远端 review feedback
在 dash1 用 `36c301c` 启动 case3 bad-runtime 重跑后trial-0003 的
@@ -136,7 +218,22 @@ dimension 还没有被 materialized config 测过时,优先测试 scheduler hy
```text
PYTHONPATH=src python3 -m unittest discover -s tests
151 tests OK
156 tests OK
```
本地重点回归:
```text
PYTHONPATH=src python3 -m unittest \
tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_coverage_precedes_gmu_microtune \
tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_admission_pressure_only_uses_normalized_seq_cap \
tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_lowers_excess_admission_pressure \
tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_negative_applicability_matrix \
tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_does_not_preempt_open_topology_frontier \
tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_lowers_quantum_by_normalized_ratio \
tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_quantum_step_scales_with_prompt_length \
tests.test_core_flow.CoreFlowTests.test_prefill_scheduler_not_active_for_short_prompt_workload
8 tests OK
```
## 还需要真机实验验证
@@ -159,3 +256,39 @@ PYTHONPATH=src python3 -m unittest discover -s tests
- candidate family sequence
- `prefill_quantum_ratio_current -> target` 的方向是否与 bottleneck evidence 一致;
- 是否出现 repeated normalized full-config signature。
## 当前 dash1 真机状态
当前正在验证提交 `bfd8579`
```text
run = .aituner/badstart-prefill-scheduler-bfd8579-20260628T173102Z
case = badstart-expanded-9accf25-20260626T184911Z-runtime_tp2_dp1_gmu070_mns8
session = aituner-prefill-scheduler-case3-bfd8579
```
截至 2026-06-29 01:53 UTC+8 左右:
- baseline trial-0001 已完成best request_rate_per_gpu 约为 2.025
- trial-0002 TP4 topology frontier probe 已完成best request_rate_per_gpu 约为 2.000
没有超过 baseline
- candidate-set-0002 的 top action 是 topology frontier符合 topology-before-runtime
- candidate-set-0003 的 top action 已变为 `seed_chunked_prefill_quantum`
```text
score = 0.69
patch = enable-chunked-prefill=true, max-num-batched-tokens=8192
ratio = prefill_quantum_ratio_target ~= 1.0536
baseline = raise_gpu_memory_utilization score 0.64
```
这说明 `uncovered_scheduler_dimension_bonus` 达到了设计目的topology frontier 覆盖后,
未 materialized 的 scheduler dimension 会先于 GMU 微调被验证。
trial-0003 已完成best request_rate_per_gpu 约为 2.025,和 baseline 持平,没有形成
性能提升。这个结果不能 claim scheduler seed 是 winner但它提供了有价值的
falsification evidencecoverage priority 改变了探索顺序,具体 `chunked + MBT ~= p95`
hypothesis 被验证后没有改进。系统随后进入 candidate-set-0004开始测试
`gpu-memory-utilization=0.9`。trial-0004 同样完成在约 2.025,没有超过 baseline
当前旧 run 已进入 trial-0005继续测试 `gpu-memory-utilization=0.92`。后续需要观察
GMU climb 是否会停下并转向 admission pressure、topology/DP 或其他 family。

View File

@@ -1280,19 +1280,19 @@ def _runtime_candidate_actions(
# only justified once no untested TP increase remains. At an intermediate TP (e.g. TP2
# while TP4 is still reachable and untried) a latency bottleneck must still be answered
# by climbing TP, not a runtime tweak -- otherwise runtime tuning preempts the frontier.
_next_tp = _next_allowed_tp(study, current_tp=cur_tp, current_dp=cur_dp)
tp_frontier_open = (
_next_tp is not None
and _effective_config_signature(
higher_tp_patch = _higher_tp_frontier_patch(
study,
{"env_patch": {}, "flag_patch": {"tensor-parallel-size": _next_tp}}
current_tp=cur_tp,
current_dp=cur_dp,
)
tp_frontier_open = (
higher_tp_patch is not None
and _effective_config_signature(study, {"env_patch": {}, "flag_patch": higher_tp_patch})
not in tested_signatures
)
topology_settled = not tp_frontier_open
actions.extend(
_prefill_scheduler_candidate_actions(
prefill_scheduler_actions = _prefill_scheduler_candidate_actions(
study,
window_summary,
anchor_flags,
@@ -1303,7 +1303,8 @@ def _runtime_candidate_actions(
seen_signatures=seen_signatures,
blocked_candidates=blocked_candidates,
)
)
actions.extend(prefill_scheduler_actions)
prefill_scheduler_candidate_available = bool(prefill_scheduler_actions)
if (
"max-num-batched-tokens" in tunable
@@ -1312,6 +1313,7 @@ def _runtime_candidate_actions(
and recent_diagnostics[-1].get("trial_id") == anchor.get("trial_id")
and cur_tp > 1
and not bottleneck_hypotheses
and not prefill_scheduler_candidate_available
):
current_mbt = _parse_int_like(anchor_flags.get("max-num-batched-tokens"), default=0)
target_mbt = (
@@ -1361,7 +1363,7 @@ def _runtime_candidate_actions(
)
seen_signatures.add(signature)
if "max-num-batched-tokens" in tunable:
if "max-num-batched-tokens" in tunable and not prefill_scheduler_candidate_available:
current_mbt = _parse_int_like(anchor_flags.get("max-num-batched-tokens"), default=0)
mbt_targets: list[tuple[str, int]] = []
if top_bottleneck == "ttft_prefill":
@@ -1484,6 +1486,7 @@ def _runtime_candidate_actions(
and "max-num-batched-tokens" in tunable
and "max-num-seqs" in tunable
and max_num_seqs_tested
and not prefill_scheduler_candidate_available
):
current_mbt = _parse_int_like(anchor_flags.get("max-num-batched-tokens"), default=0)
current_mns = _parse_int_like(anchor_flags.get("max-num-seqs"), default=0)
@@ -1540,7 +1543,11 @@ def _runtime_candidate_actions(
)
)
if "enable-chunked-prefill" in tunable and top_bottleneck == "ttft_prefill":
if (
"enable-chunked-prefill" in tunable
and top_bottleneck == "ttft_prefill"
and not prefill_scheduler_candidate_available
):
current = bool(anchor_flags.get("enable-chunked-prefill", False))
if not current:
patch = {**runtime_base_patch, "enable-chunked-prefill": True}
@@ -1706,14 +1713,15 @@ def _prefill_scheduler_candidate_actions(
else None
)
if current_chunked and quantum_step["target"] is None and admission_step is None:
admission_target = admission_step["target"] if admission_step is not None else None
if current_chunked and quantum_step["target"] is None and admission_target is None:
return []
patch = {**runtime_base_patch, "enable-chunked-prefill": True}
if quantum_step["target"] is not None:
patch["max-num-batched-tokens"] = quantum_step["target"]
if admission_step is not None:
patch["max-num-seqs"] = admission_step
if admission_target is not None:
patch["max-num-seqs"] = admission_target
signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
action_id = _prefill_scheduler_action_id(quantum_step["direction"], admission_step)
@@ -1736,12 +1744,12 @@ def _prefill_scheduler_candidate_actions(
relief = 0.56 if quantum_step["direction"] == "lower" else 0.42
if quantum_step["direction"] == "seed":
relief = 0.38
if admission_step is not None:
relief += 0.06
if admission_target is not None:
relief += 0.08 if admission_step and admission_step["direction"] == "lower" else 0.06
coverage_bonus = 0.0
if quantum_step["direction"] == "seed" or not current_chunked:
coverage_bonus = 0.28
elif quantum_step["target"] is not None or admission_step is not None:
elif quantum_step["target"] is not None or admission_target is not None:
coverage_bonus = 0.14
information_gain = _information_gain(bottleneck_hypotheses, "runtime")
score = relief * max(confidence, 0.35) + information_gain + 0.08 + coverage_bonus
@@ -1759,7 +1767,14 @@ def _prefill_scheduler_candidate_actions(
round(target_ratio, 4) if target_ratio is not None else None
),
"admission_pressure_current": current_mns or None,
"admission_pressure_target": admission_step,
"admission_pressure_target": admission_target,
"admission_pressure_direction": admission_step["direction"] if admission_step else "hold",
"admission_pressure_ratio_current": (
round(admission_step["current_ratio"], 4) if admission_step else None
),
"admission_pressure_ratio_target": (
round(admission_step["target_ratio"], 4) if admission_step else None
),
}
actions = [
_runtime_action(
@@ -1843,32 +1858,59 @@ def _next_admission_pressure_step(
*,
top_bottleneck: str,
quantum_direction: str,
) -> int | None:
) -> dict[str, Any] | None:
if current_mns <= 0:
return None
target_concurrency = max(int(study.trace.max_concurrency), 1)
current_ratio = current_mns / target_concurrency
if top_bottleneck == "admission_or_queueing" and current_mns < target_concurrency:
target = min(target_concurrency, int(current_mns * _ADMISSION_PRESSURE_STEP_UP))
return _round_up_to_multiple(target, 8)
target = _round_up_to_multiple(target, 8)
return {
"direction": "raise",
"target": target,
"current_ratio": current_ratio,
"target_ratio": target / target_concurrency,
}
if (
top_bottleneck == "ttft_prefill"
and quantum_direction in {"hold", "raise"}
and current_mns < target_concurrency
):
target = min(target_concurrency, int(current_mns * _ADMISSION_PRESSURE_STEP_UP))
return _round_up_to_multiple(target, 8)
target = _round_up_to_multiple(target, 8)
return {
"direction": "raise",
"target": target,
"current_ratio": current_ratio,
"target_ratio": target / target_concurrency,
}
if top_bottleneck == "ttft_prefill" and current_mns > target_concurrency:
target = min(current_mns - 8, _round_up_to_multiple(target_concurrency, 8))
if target > 0 and target < current_mns:
return {
"direction": "lower",
"target": target,
"current_ratio": current_ratio,
"target_ratio": target / target_concurrency,
}
return None
def _prefill_scheduler_action_id(quantum_direction: str, admission_target: int | None) -> str:
def _prefill_scheduler_action_id(
quantum_direction: str,
admission_step: dict[str, Any] | None,
) -> str:
if quantum_direction == "lower":
return "lower_prefill_quantum_with_chunked_prefill"
if quantum_direction == "raise":
return "raise_prefill_quantum_with_chunked_prefill"
if quantum_direction == "seed":
return "seed_chunked_prefill_quantum"
if admission_target is not None:
return "adjust_admission_pressure_with_chunked_prefill"
if admission_step is not None and admission_step["direction"] == "lower":
return "lower_admission_pressure_with_chunked_prefill"
if admission_step is not None and admission_step["direction"] == "raise":
return "raise_admission_pressure_with_chunked_prefill"
return "enable_chunked_prefill_scheduler_mode"
@@ -2393,8 +2435,12 @@ def _topology_frontier_status(
flags = _effective_flags_for_item(study, best)
current_tp = _parse_int_like(flags.get("tensor-parallel-size"), default=1)
current_dp = _parse_int_like(flags.get("data-parallel-size"), default=1)
next_tp = _next_allowed_tp(study, current_tp=current_tp, current_dp=current_dp)
if next_tp is None:
flag_patch = _higher_tp_frontier_patch(
study,
current_tp=current_tp,
current_dp=current_dp,
)
if flag_patch is None:
return {
**default,
"reason": "no_legal_higher_tp_frontier",
@@ -2402,11 +2448,8 @@ def _topology_frontier_status(
"current_tp": current_tp,
"current_dp": current_dp,
}
next_tp = _parse_int_like(flag_patch.get("tensor-parallel-size"), default=current_tp)
flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp}
base_dp = _parse_int_like(study.engine.base_flags.get("data-parallel-size"), default=1)
if current_dp != base_dp:
flag_patch["data-parallel-size"] = current_dp
signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": flag_patch})
if signature in _state_tested_signatures(study, state):
return {
@@ -2430,6 +2473,22 @@ def _topology_frontier_status(
}
def _higher_tp_frontier_patch(
study: StudySpec,
*,
current_tp: int,
current_dp: int,
) -> dict[str, Any] | None:
next_tp = _next_allowed_tp(study, current_tp=current_tp, current_dp=current_dp)
if next_tp is None:
return None
flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp}
base_dp = _parse_int_like(study.engine.base_flags.get("data-parallel-size"), default=1)
if current_dp != base_dp:
flag_patch["data-parallel-size"] = current_dp
return flag_patch
def _effective_flags_for_item(study: StudySpec, item: dict[str, Any]) -> dict[str, Any]:
flags = dict(study.engine.base_flags)
patch = item.get("config_patch")

View File

@@ -3510,6 +3510,397 @@ class CoreFlowTests(unittest.TestCase):
action["score_factors"]["uncovered_scheduler_dimension_bonus"],
0.0,
)
families = {
item["knob_family"] for item in context["experiment_plan"]["candidate_actions"]
}
self.assertNotIn("enable-chunked-prefill", families)
def test_prefill_scheduler_admission_pressure_only_uses_normalized_seq_cap(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
trace_overrides={"max_concurrency": 64},
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 8,
"data-parallel-size": 1,
"max-num-batched-tokens": 8192,
"max-num-seqs": 8,
"enable-chunked-prefill": True,
},
"tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"max-num-batched-tokens",
"max-num-seqs",
"enable-chunked-prefill",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [8],
"allowed_data_parallel_sizes": [1],
"allowed_tp_dp_products": [8],
},
},
)
result_path = tmp_path / "trial-0001.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.5,
"best_request_rate": 2.0,
"best_pass_rate": 0.5,
"probes": [
{
"threshold": 0.5,
"feasible": False,
"payload": {
"request_rate": 2.0,
"pass_rate": 0.5,
"early_stop_reason": "slo_pass_rate_unrecoverable",
"latency_summary": {"failed_reason_counts": {}},
},
}
],
}
),
encoding="utf-8",
)
study = load_study_spec(study_path)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
result_path=str(result_path),
config_patch={"env_patch": {}, "flag_patch": {}},
)
],
)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 8192, "prompt_tail_ratio_p95_p50": 4.0},
state=state,
)
action = context["experiment_plan"]["next_action"]
flag_patch = action["config_patch"]["flag_patch"]
self.assertEqual(action["knob_family"], "prefill-scheduler-interaction")
self.assertEqual(action["action_id"], "raise_admission_pressure_with_chunked_prefill")
self.assertEqual(flag_patch["max-num-seqs"], 16)
self.assertNotIn("max-num-batched-tokens", flag_patch)
self.assertEqual(action["score_factors"]["admission_pressure_direction"], "raise")
self.assertLess(
action["score_factors"]["admission_pressure_ratio_current"],
action["score_factors"]["admission_pressure_ratio_target"],
)
def test_prefill_scheduler_lowers_excess_admission_pressure(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
trace_overrides={"max_concurrency": 64},
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 8,
"data-parallel-size": 1,
"max-num-batched-tokens": 8192,
"max-num-seqs": 128,
"enable-chunked-prefill": True,
},
"tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"max-num-batched-tokens",
"max-num-seqs",
"enable-chunked-prefill",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [8],
"allowed_data_parallel_sizes": [1],
"allowed_tp_dp_products": [8],
},
},
)
result_path = tmp_path / "trial-0001.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.5,
"best_request_rate": 2.0,
"best_pass_rate": 0.95,
"probes": [
{
"threshold": 0.5,
"feasible": True,
"payload": {
"request_rate": 2.0,
"pass_rate": 0.95,
"latency_summary": {
"failed_reason_counts": {"ttft_ms>4000.0": 24}
},
},
}
],
}
),
encoding="utf-8",
)
study = load_study_spec(study_path)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
result_path=str(result_path),
config_patch={"env_patch": {}, "flag_patch": {}},
)
],
)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 8192, "prompt_tail_ratio_p95_p50": 4.0},
state=state,
)
action = context["experiment_plan"]["next_action"]
flag_patch = action["config_patch"]["flag_patch"]
self.assertEqual(action["knob_family"], "prefill-scheduler-interaction")
self.assertEqual(action["action_id"], "lower_admission_pressure_with_chunked_prefill")
self.assertLess(flag_patch["max-num-seqs"], 128)
self.assertNotIn("max-num-batched-tokens", flag_patch)
self.assertEqual(action["score_factors"]["admission_pressure_direction"], "lower")
self.assertLess(
action["score_factors"]["admission_pressure_ratio_target"],
action["score_factors"]["admission_pressure_ratio_current"],
)
def test_prefill_scheduler_negative_applicability_matrix(self) -> None:
variants = [
(
{"request_mode": "decode_only"},
{"prompt_tokens_p95": 8192, "prompt_tail_ratio_p95_p50": 4.0},
),
(
{},
{
"prompt_tokens_p95": 8192,
"prompt_tail_ratio_p95_p50": 4.0,
"prefix_cache": {"repeated_token_ratio_estimate": 0.75},
},
),
(
{},
{"prompt_tokens_p95": 2048, "prompt_tail_ratio_p95_p50": 1.0},
),
]
for trace_overrides, window_summary in variants:
with self.subTest(trace_overrides=trace_overrides, window_summary=window_summary):
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
trace_overrides=trace_overrides,
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 8,
"data-parallel-size": 1,
"max-num-batched-tokens": 8192,
"max-num-seqs": 8,
"enable-chunked-prefill": True,
},
"tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"max-num-batched-tokens",
"max-num-seqs",
"enable-chunked-prefill",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [8],
"allowed_data_parallel_sizes": [1],
"allowed_tp_dp_products": [8],
},
},
)
result_path = tmp_path / "trial-0001.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.5,
"best_request_rate": 2.0,
"best_pass_rate": 0.95,
"probes": [
{
"threshold": 0.5,
"feasible": True,
"payload": {
"request_rate": 2.0,
"pass_rate": 0.95,
"latency_summary": {
"failed_reason_counts": {
"ttft_ms>4000.0": 24
}
},
},
}
],
}
),
encoding="utf-8",
)
study = load_study_spec(study_path)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
parallel_size=8,
best_request_rate=2.0,
best_request_rate_per_gpu=0.25,
result_path=str(result_path),
config_patch={"env_patch": {}, "flag_patch": {}},
)
],
)
context = build_harness_context(
study=study,
window_summary=window_summary,
state=state,
)
families = {
item["knob_family"]
for item in context["experiment_plan"]["candidate_actions"]
}
self.assertNotIn("prefill-scheduler-interaction", families)
def test_prefill_scheduler_does_not_preempt_open_topology_frontier(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 2,
"data-parallel-size": 1,
"max-num-batched-tokens": 8192,
"max-num-seqs": 8,
"enable-chunked-prefill": True,
},
"tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"max-num-batched-tokens",
"max-num-seqs",
"enable-chunked-prefill",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [2, 4],
"allowed_data_parallel_sizes": [1, 2],
"allowed_tp_dp_products": [4, 8],
},
},
)
result_path = tmp_path / "trial-0001.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.5,
"best_request_rate": 2.0,
"best_pass_rate": 0.95,
"probes": [
{
"threshold": 0.5,
"feasible": True,
"payload": {
"request_rate": 2.0,
"pass_rate": 0.95,
"latency_summary": {
"failed_reason_counts": {"ttft_ms>4000.0": 24}
},
},
}
],
}
),
encoding="utf-8",
)
study = load_study_spec(study_path)
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_parallel_size=4,
best_request_rate=2.0,
best_request_rate_per_gpu=0.5,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
parallel_size=4,
best_request_rate=2.0,
best_request_rate_per_gpu=0.5,
result_path=str(result_path),
config_patch={
"env_patch": {},
"flag_patch": {"data-parallel-size": 2},
},
)
],
)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 8192, "prompt_tail_ratio_p95_p50": 4.0},
state=state,
)
action = context["experiment_plan"]["next_action"]
self.assertEqual(action["knob_family"], "topology")
self.assertEqual(
action["config_patch"]["flag_patch"],
{"tensor-parallel-size": 4, "data-parallel-size": 2},
)
families = {
item["knob_family"] for item in context["experiment_plan"]["candidate_actions"]
}
self.assertNotIn("prefill-scheduler-interaction", families)
def test_prefill_scheduler_not_active_for_short_prompt_workload(self) -> None:
with tempfile.TemporaryDirectory() as tmp: