Add declarative harness prototype

This commit is contained in:
2026-06-26 18:07:02 +08:00
parent 4075c7abf0
commit 384cb58f1f
5 changed files with 752 additions and 1 deletions

View File

@@ -83,7 +83,7 @@ kernel、KV cache、通信和排队的闭式性能模型。更稳妥也更强的
| C5. AITuner 找到 near-optimal region而不是只找到一个可行 config | Qwen30B 有解释性信号 | [Qwen30B SLO robustness](harness-ablation/qwen30b-slo-robustness-20260624.md) | 选 1-2 个 case 做局部 grid 或专家配置对照 |
| C6. AITuner 能随 SLO tightness 移动到合适 frontier | Qwen30B 已完成 | [Qwen30B SLO robustness](harness-ablation/qwen30b-slo-robustness-20260624.md) | 再选一个非同质 case 做 SLO sweep同时画 SLO tightness -> frontier/regime transition |
| C7. Engine adapter 让 intervention grammar 可迁移到其他 serving engine | 设计上可行,暂不作为主实验 claim | `EngineLaunchSpec` / launch recipe / tunable schema | vLLM 主线完成后,再做 SGLang adapter 和一个低成本验证 case |
| C8. Harness 对坏初始点有恢复能力,不只依赖可信 base config | 当前 rule-based fix 只能作为 prototype 信号,不能作为最终 claim | [Declarative intervention harness design](harness-ablation/declarative-intervention-harness-design-20260626.md), [No-LLM harness mechanism](harness-ablation/no-llm-harness-mechanism-20260625.md) | 重构为 grammar/operator 后跑 random/adversarial start distribution |
| C8. Harness 对坏初始点有恢复能力,不只依赖可信 base config | 当前发现反例,不能 claim | [Declarative intervention harness design](harness-ablation/declarative-intervention-harness-design-20260626.md), [Bad-start stop counterexample](harness-ablation/bad-start-stop-counterexample-20260626.md), [No-LLM harness mechanism](harness-ablation/no-llm-harness-mechanism-20260625.md) | 重构为 grammar/operator + coverage-relative stop 后跑 random/adversarial start distribution |
## 最高优先级实验
@@ -97,6 +97,8 @@ declarative intervention grammar + coverage-relative validator。
- CandidateSet 完整枚举并持久化 snapshot
- `harness_priority` 与 backend ranking 分离;
- CoverageUnit 结构化stop 不能只依赖 exact signature
- `search_high_saturated_by_incumbent` 不能绕过 CandidateSet coverage`req/s/GPU`
目标,未覆盖 topology/resource-efficiency contrast 时必须继续;
- Failure invalidation 有保守 region predicate 和 retry/unblock 条件;
- grammar/policy/capability 都有 version 和 anti-overfitting static checks
- LLM/BO 只能选择合法 candidate不能绕过 validator。

View File

@@ -0,0 +1,176 @@
# Bad-start stop counterexample - 2026-06-26
本文记录一次有意构造的 adversarial bad-start 测试。它的目的不是证明 harness 已经
robust而是攻击当前实现确认它是否会从明显不合理的初始配置中恢复。
结论:
```text
当前 production/prototype harness 还不能支持 bad-start robustness claim。
它会在高 GPU、高 TP 的坏起点上被 search_high_saturated_by_incumbent 提前 stop
没有测试 topology/resource-efficiency contrast。
```
这不是一个需要补 `TP=8 -> TP=4` 特例规则的问题。它暴露的是更基础的 stop authority
问题measurement saturation 不能绕过 coverage-relative candidate set。
## 实验设置
机器:`dash1`8x H20。
目标:从一个故意不合理的初始配置开始:
```text
tensor-parallel-size = 8
data-parallel-size = 1
gpu-memory-utilization = 0.5
max-num-seqs = 8
LLM endpoint disabled
```
期望行为:
- harness 不应只因为 baseline feasible 就停止;
- 它至少应生成 topology/resource-efficiency contrast candidate
-`req/s/GPU` 目标8 GPU incumbent 需要被低 GPU 或邻域 topology probe 验证。
## Run A: 低 search.high
第一轮保留原始 `search.high=0.125`
结果:
```text
trial-0001 completed
harness-stop-0002
tuning_stop_reason = harness_stop
validator reason = search_high_saturated_by_incumbent
best request_rate = 1.0333 total
best request_rate_per_gpu = 0.1292
pass_rate = 1.0
```
解释:这个 run 的 offered-load ceiling 太低baseline 很容易 saturate `search.high`
因此它不能区分“配置真的足够好”和“测量上限太低”。
## Run B: corrected high search ceiling
第二轮把 `search.high` 提到 `1.0`,保留同一个 bad-start 配置,`max_trials=3`
远端产物:
```text
session = adv_badcase_corr_casea_20260626T095356Z
store = /home/admin/cpfs/wjh/aituner/aituner/.aituner/adversarial-badcase-corrected-casea-20260626T095356Z
spec = /home/admin/cpfs/wjh/aituner/aituner/.aituner-run-configs/adversarial-badcase-corrected-casea-20260626T095356Z/casea-combined-bad-highsearch.json
log = /home/admin/cpfs/wjh/aituner/aituner/.aituner/adversarial-badcase-corrected-casea-20260626T095356Z.log
```
结果仍然是在 baseline 后 stop
```text
trial-0001 completed
harness-stop-0002
no harness-proposal-0002.json
tuning_stop_reason = harness_stop
validator reason = search_high_saturated_by_incumbent
best sampling_u = 0.9375
best request_rate = 8.033333333333333
best request_rate_per_gpu = 1.0041666666666667
pass_rate = 1.0
```
Probe trace
| sampling_u | request_rate | feasible |
| --- | ---: | --- |
| 0.5 | 4.6000 | true |
| 0.75 | 6.5167 | true |
| 0.875 | 7.5000 | true |
| 0.9375 | 8.0333 | true |
它触发 stop 的原因是当前 guard 计算:
```text
binary_probe_resolution = max(tolerance, (high - low) / 2**max_probes)
= 0.0625
threshold_gap_to_high = 1.0 - 0.9375
= 0.0625
```
因此当前实现认为 incumbent 已经 saturate `search.high`
## 为什么这是反例
当前 objective 是 SLO-constrained `req/s/GPU`,不是固定 8 GPU 的 total throughput。
一个 8-GPU incumbent saturate offered-load ceiling并不能证明
- 低 TP / 低 GPU 配置没有更高 `req/s/GPU`
- 当前 topology 是资源效率最优;
- runtime knobs 已经进入合适 trust region
- no-LLM harness 能从 bad start 中恢复。
所以这个 stop 是 unsound 的,至少相对于 bad-start robustness claim 是 unsound。
更形式化地说:
```text
search_high_saturated_by_incumbent
does not imply
incumbent_validated(topology/resource-efficiency)
```
当目标包含 resource efficiency并且 parallel-size/topology 仍然 tunable 时,
`search_high_saturated_by_incumbent` 只能作为 measurement evidence不能单独作为 stop
authority。
## 对新 harness 设计的约束
这个反例直接约束 declarative harness
1. Stop 前必须生成并持久化完整 `CandidateSet`
2. Stop proof 必须引用 `candidate_set_hash`
3. 如果存在未覆盖的 high-priority topology/resource-efficiency candidatevalidator
必须返回 `eligible_candidates_remain`,即使 incumbent saturate `search.high`
4. `search.high` saturation 只能更新 measurement coverage不能替代
`incumbent_validated`
5.`req/s/GPU` objectiverequired coverage 必须包含至少一个 topology 或
resource-efficiency contrast除非 StudySpec 明确固定 GPU budget 和 topology。
这也说明当前 repair 方向不能是:
```text
if tp == 8 and gmu == 0.5: try tp = 4
```
正确方向应该是:
```text
ordered topology lattice + resource-efficiency objective
-> candidate set includes lower/redistributed topology contrast
-> stop is blocked until that coverage unit is measured or invalidated
```
## 当前 verdict
当前 production harness
```text
prototype, not yet fundamental
```
新的 declarative prototype
```text
promising substrate, but not production-proven
```
它已经把 `CandidateSet``CoverageUnit`、failure region 和 coverage-relative stop 的最小
接口跑通,但还没接入真实 tuning loop也还没证明 bad-start distribution 的收敛。
因此接下来的 P0 gate 是:
```text
先实现 coverage-relative stop authority再重跑 bad-start distribution。
```

View File

@@ -46,6 +46,28 @@ priority 中,仍然可能只是“换皮的 rule-based harness”。因此
下面的设计已经把这些 major revisions 纳入硬性要求。
## 2026-06-26 adversarial status
我们已经用 `TP=8, gmu=0.5, max-num-seqs=8` 的 bad-start case 攻击当前 production
harness。结果显示当前 stop guard 会在 baseline 后触发
`search_high_saturated_by_incumbent`,没有生成 topology/resource-efficiency contrast。
这证明当前 implementation 还不是最终 contribution。
详细反例见
[Bad-start stop counterexample](bad-start-stop-counterexample-20260626.md)。
该反例给本设计增加一个硬约束:
```text
search_high_saturated_by_incumbent may be measurement evidence,
but it cannot bypass candidate-set coverage when topology/resource efficiency
remains tunable.
```
因此新的 CoverageValidator 必须先证明没有未覆盖的 high-priority candidate才能授权
stop。对 `req/s/GPU` objective未覆盖的 topology/resource-efficiency contrast 必须阻止
stop除非 StudySpec 明确固定 topology/GPU budget。
## 当前问题
当前 `src/aituner/harness.py` 已经具备了一些正确的抽象词汇observation、

View File

@@ -0,0 +1,395 @@
from __future__ import annotations
"""Experimental declarative harness substrate.
This module intentionally stays separate from the production harness while the
coverage-relative design is being validated. It models a small, typed subset of
the proposed intervention grammar: axes, generic operators, complete candidate
sets, failure regions, and stop reports.
"""
import hashlib
import json
from dataclasses import dataclass
from typing import Any, Literal, Mapping, Sequence
AxisKind = Literal["ordered_lattice", "bounded_numeric"]
OperatorKind = Literal["bracket", "step_up", "step_down", "jump_to_floor", "local_climb"]
RegionRelation = Literal["eq", "ge", "le"]
@dataclass(frozen=True)
class AxisSpec:
name: str
kind: AxisKind
values: tuple[Any, ...] = ()
floor: float | None = None
ceiling: float | None = None
step: float | None = None
def validate(self) -> None:
if not self.name:
raise ValueError("axis name must be non-empty")
if self.kind == "ordered_lattice":
if not self.values:
raise ValueError(f"ordered lattice axis {self.name!r} needs values")
if len(set(_stable_token(value) for value in self.values)) != len(self.values):
raise ValueError(f"ordered lattice axis {self.name!r} has duplicate values")
return
if self.floor is None or self.ceiling is None:
raise ValueError(f"bounded numeric axis {self.name!r} needs floor and ceiling")
if self.floor > self.ceiling:
raise ValueError(f"bounded numeric axis {self.name!r} has floor above ceiling")
if self.step is None or self.step <= 0:
raise ValueError(f"bounded numeric axis {self.name!r} needs a positive step")
@dataclass(frozen=True)
class OperatorSpec:
name: str
axis: str
kind: OperatorKind
harness_priority: float = 0.0
@dataclass(frozen=True)
class CoverageUnit:
axis: str
operator: str
target: Any
@property
def unit_id(self) -> str:
return coverage_unit_id(self.axis, self.operator, self.target)
@dataclass(frozen=True)
class CandidateAction:
action_id: str
operator: str
axis: str
patch: Mapping[str, Any]
harness_priority: float
planner_score: float | None = None
backend_score: float | None = None
coverage_units: tuple[CoverageUnit, ...] = ()
source_value: Any = None
target_value: Any = None
@property
def signature(self) -> str:
return config_signature(self.patch)
@dataclass(frozen=True)
class BlockedCandidate:
candidate: CandidateAction
reason: str
@dataclass(frozen=True)
class FailureRegion:
axis: str
relation: RegionRelation
value: Any
reason: str = "prior_failure"
def contains(self, candidate: CandidateAction) -> bool:
if candidate.axis != self.axis:
return False
target = candidate.target_value
if self.relation == "eq":
return target == self.value
if self.relation == "ge":
return target >= self.value
if self.relation == "le":
return target <= self.value
raise ValueError(f"unknown region relation {self.relation!r}")
@dataclass(frozen=True)
class CoverageState:
tested_signatures: frozenset[str] = frozenset()
covered_unit_ids: frozenset[str] = frozenset()
failed_regions: tuple[FailureRegion, ...] = ()
@dataclass(frozen=True)
class HarnessPolicy:
operators: tuple[OperatorSpec, ...]
no_repeat: bool = True
required_coverage_unit_ids: frozenset[str] = frozenset()
@dataclass(frozen=True)
class CandidateSet:
eligible: tuple[CandidateAction, ...]
blocked: tuple[BlockedCandidate, ...]
candidate_set_hash: str
@dataclass(frozen=True)
class StopReport:
should_stop: bool
reason: str
candidate_set_hash: str
uncovered_unit_ids: tuple[str, ...] = ()
eligible_count: int = 0
blocked_count: int = 0
def config_signature(patch: Mapping[str, Any]) -> str:
return json.dumps(dict(patch), sort_keys=True, separators=(",", ":"), ensure_ascii=False)
def coverage_unit_id(axis: str, operator: str, target: Any) -> str:
target_text = json.dumps(target, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
return f"{axis}:{operator}:{target_text}"
def ordered_lattice_failure_region(
axis: AxisSpec,
failed_value: Any,
*,
direction: Literal["up", "down", "exact"],
reason: str = "prior_failure",
) -> FailureRegion:
axis.validate()
if axis.kind != "ordered_lattice":
raise ValueError("ordered_lattice_failure_region requires an ordered lattice axis")
if failed_value not in axis.values:
raise ValueError(f"{failed_value!r} is not in lattice axis {axis.name!r}")
if direction == "up":
return FailureRegion(axis=axis.name, relation="ge", value=failed_value, reason=reason)
if direction == "down":
return FailureRegion(axis=axis.name, relation="le", value=failed_value, reason=reason)
return FailureRegion(axis=axis.name, relation="eq", value=failed_value, reason=reason)
def enumerate_candidate_set(
state: Mapping[str, Any],
axes: Sequence[AxisSpec],
policy: HarnessPolicy,
coverage_state: CoverageState | None = None,
) -> CandidateSet:
coverage_state = coverage_state or CoverageState()
axis_by_name = {axis.name: axis for axis in axes}
for axis in axes:
axis.validate()
eligible: list[CandidateAction] = []
blocked: list[BlockedCandidate] = []
for operator in sorted(
policy.operators,
key=lambda item: (item.axis, item.name, item.kind),
):
axis = axis_by_name.get(operator.axis)
if axis is None:
raise ValueError(f"operator {operator.name!r} references unknown axis {operator.axis!r}")
generated, generated_blocked = _generate_operator_actions(state, axis, operator)
blocked.extend(generated_blocked)
for candidate in generated:
reason = _blocking_reason(candidate, policy, coverage_state)
if reason is None:
eligible.append(candidate)
else:
blocked.append(BlockedCandidate(candidate=candidate, reason=reason))
eligible_tuple = tuple(sorted(eligible, key=_candidate_sort_key))
blocked_tuple = tuple(
sorted(blocked, key=lambda item: (_candidate_sort_key(item.candidate), item.reason))
)
return CandidateSet(
eligible=eligible_tuple,
blocked=blocked_tuple,
candidate_set_hash=_candidate_set_hash(eligible_tuple, blocked_tuple),
)
def validate_coverage_stop(
candidate_set: CandidateSet,
policy: HarnessPolicy,
coverage_state: CoverageState,
) -> StopReport:
uncovered = tuple(sorted(policy.required_coverage_unit_ids - coverage_state.covered_unit_ids))
if uncovered:
return StopReport(
should_stop=False,
reason="coverage_units_missing",
candidate_set_hash=candidate_set.candidate_set_hash,
uncovered_unit_ids=uncovered,
eligible_count=len(candidate_set.eligible),
blocked_count=len(candidate_set.blocked),
)
if candidate_set.eligible:
return StopReport(
should_stop=False,
reason="eligible_candidates_remain",
candidate_set_hash=candidate_set.candidate_set_hash,
eligible_count=len(candidate_set.eligible),
blocked_count=len(candidate_set.blocked),
)
return StopReport(
should_stop=True,
reason="coverage_complete_no_eligible_candidates",
candidate_set_hash=candidate_set.candidate_set_hash,
eligible_count=0,
blocked_count=len(candidate_set.blocked),
)
def _generate_operator_actions(
state: Mapping[str, Any],
axis: AxisSpec,
operator: OperatorSpec,
) -> tuple[list[CandidateAction], list[BlockedCandidate]]:
if axis.kind == "ordered_lattice":
return _ordered_lattice_actions(state, axis, operator)
return _bounded_numeric_actions(state, axis, operator)
def _ordered_lattice_actions(
state: Mapping[str, Any],
axis: AxisSpec,
operator: OperatorSpec,
) -> tuple[list[CandidateAction], list[BlockedCandidate]]:
if operator.kind not in {"bracket", "step_up", "step_down"}:
raise ValueError(
f"operator {operator.name!r} is not valid for ordered lattice axis {axis.name!r}"
)
current = state.get(axis.name)
if current not in axis.values:
raise ValueError(f"state value {current!r} is not in lattice axis {axis.name!r}")
index = axis.values.index(current)
if operator.kind == "bracket":
targets = [value for value in axis.values if value != current]
return ([_candidate(axis, operator, current, target) for target in targets], [])
if operator.kind == "step_up":
if index == len(axis.values) - 1:
return (
[],
[_boundary_block(axis, operator, current, "ordered_lattice_upper_boundary")],
)
return ([_candidate(axis, operator, current, axis.values[index + 1])], [])
if index == 0:
return (
[],
[_boundary_block(axis, operator, current, "ordered_lattice_lower_boundary")],
)
return ([_candidate(axis, operator, current, axis.values[index - 1])], [])
def _bounded_numeric_actions(
state: Mapping[str, Any],
axis: AxisSpec,
operator: OperatorSpec,
) -> tuple[list[CandidateAction], list[BlockedCandidate]]:
if operator.kind not in {"jump_to_floor", "local_climb"}:
raise ValueError(
f"operator {operator.name!r} is not valid for bounded numeric axis {axis.name!r}"
)
current = _as_float(state.get(axis.name), axis=axis.name)
assert axis.floor is not None
assert axis.ceiling is not None
assert axis.step is not None
if operator.kind == "jump_to_floor":
if current < axis.floor:
return ([_candidate(axis, operator, current, axis.floor)], [])
return ([], [_boundary_block(axis, operator, current, "numeric_at_or_above_floor")])
if current < axis.floor:
return ([], [_boundary_block(axis, operator, current, "numeric_below_floor")])
if current >= axis.ceiling:
return ([], [_boundary_block(axis, operator, current, "numeric_upper_boundary")])
target = min(axis.ceiling, current + axis.step)
return ([_candidate(axis, operator, current, target)], [])
def _candidate(axis: AxisSpec, operator: OperatorSpec, source: Any, target: Any) -> CandidateAction:
coverage = CoverageUnit(axis=axis.name, operator=operator.kind, target=target)
return CandidateAction(
action_id=f"{operator.name}:{axis.name}:{_stable_token(source)}->{_stable_token(target)}",
operator=operator.name,
axis=axis.name,
patch={axis.name: target},
harness_priority=operator.harness_priority,
coverage_units=(coverage,),
source_value=source,
target_value=target,
)
def _boundary_block(axis: AxisSpec, operator: OperatorSpec, current: Any, reason: str) -> BlockedCandidate:
candidate = CandidateAction(
action_id=f"{operator.name}:{axis.name}:{_stable_token(current)}->boundary",
operator=operator.name,
axis=axis.name,
patch={axis.name: current},
harness_priority=operator.harness_priority,
coverage_units=(),
source_value=current,
target_value=current,
)
return BlockedCandidate(candidate=candidate, reason=reason)
def _blocking_reason(
candidate: CandidateAction,
policy: HarnessPolicy,
coverage_state: CoverageState,
) -> str | None:
if policy.no_repeat and candidate.signature in coverage_state.tested_signatures:
return "no_repeat: signature already tested"
for region in coverage_state.failed_regions:
if region.contains(candidate):
return f"failure_region:{region.axis}:{region.relation}:{_stable_token(region.value)}:{region.reason}"
return None
def _candidate_set_hash(
eligible: tuple[CandidateAction, ...],
blocked: tuple[BlockedCandidate, ...],
) -> str:
payload = {
"eligible": [_candidate_payload(candidate) for candidate in eligible],
"blocked": [
{"candidate": _candidate_payload(item.candidate), "reason": item.reason}
for item in blocked
],
}
encoded = json.dumps(
payload,
sort_keys=True,
separators=(",", ":"),
ensure_ascii=False,
).encode("utf-8")
return hashlib.sha256(encoded).hexdigest()
def _candidate_payload(candidate: CandidateAction) -> dict[str, Any]:
return {
"action_id": candidate.action_id,
"axis": candidate.axis,
"operator": candidate.operator,
"patch": dict(candidate.patch),
"harness_priority": candidate.harness_priority,
"planner_score": candidate.planner_score,
"backend_score": candidate.backend_score,
"coverage_unit_ids": [unit.unit_id for unit in candidate.coverage_units],
"source_value": candidate.source_value,
"target_value": candidate.target_value,
}
def _candidate_sort_key(candidate: CandidateAction) -> tuple[float, str, str]:
return (-candidate.harness_priority, candidate.axis, candidate.action_id)
def _stable_token(value: Any) -> str:
return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
def _as_float(value: Any, *, axis: str) -> float:
if isinstance(value, bool) or not isinstance(value, (int, float)):
raise ValueError(f"state value for numeric axis {axis!r} must be numeric")
return float(value)

View File

@@ -0,0 +1,156 @@
from __future__ import annotations
import unittest
from aituner.declarative_harness import (
AxisSpec,
CoverageState,
HarnessPolicy,
OperatorSpec,
config_signature,
coverage_unit_id,
enumerate_candidate_set,
ordered_lattice_failure_region,
validate_coverage_stop,
)
class DeclarativeHarnessTests(unittest.TestCase):
def test_same_state_grammar_policy_candidate_set_is_deterministic(self) -> None:
axes = (
AxisSpec(name="tp", kind="ordered_lattice", values=(1, 2, 4)),
AxisSpec(name="gmu", kind="bounded_numeric", floor=0.7, ceiling=0.95, step=0.05),
)
policy = HarnessPolicy(
operators=(
OperatorSpec(name="runtime_climb", axis="gmu", kind="local_climb", harness_priority=1),
OperatorSpec(name="topology_bracket", axis="tp", kind="bracket", harness_priority=5),
OperatorSpec(name="runtime_floor", axis="gmu", kind="jump_to_floor", harness_priority=2),
)
)
first = enumerate_candidate_set({"tp": 2, "gmu": 0.8}, axes, policy)
second = enumerate_candidate_set({"gmu": 0.8, "tp": 2}, axes, policy)
self.assertEqual(first.candidate_set_hash, second.candidate_set_hash)
self.assertEqual(
[candidate.action_id for candidate in first.eligible],
[candidate.action_id for candidate in second.eligible],
)
self.assertEqual(
[blocked.reason for blocked in first.blocked],
[blocked.reason for blocked in second.blocked],
)
self.assertTrue(all(candidate.planner_score is None for candidate in first.eligible))
self.assertTrue(all(candidate.backend_score is None for candidate in first.eligible))
def test_toy_lattice_bracket_enumerates_all_other_lattice_points(self) -> None:
axis = AxisSpec(name="tp", kind="ordered_lattice", values=(1, 2, 4, 8))
policy = HarnessPolicy(
operators=(OperatorSpec(name="topology_bracket", axis="tp", kind="bracket"),)
)
candidate_set = enumerate_candidate_set({"tp": 2}, (axis,), policy)
self.assertEqual({candidate.target_value for candidate in candidate_set.eligible}, {1, 4, 8})
self.assertEqual(candidate_set.blocked, ())
def test_no_repeat_blocks_exact_candidate_signature_and_records_reason(self) -> None:
axis = AxisSpec(name="tp", kind="ordered_lattice", values=(1, 2, 4))
policy = HarnessPolicy(operators=(OperatorSpec(name="step", axis="tp", kind="step_up"),))
tested = CoverageState(tested_signatures=frozenset({config_signature({"tp": 4})}))
candidate_set = enumerate_candidate_set({"tp": 2}, (axis,), policy, tested)
self.assertEqual(candidate_set.eligible, ())
self.assertEqual(len(candidate_set.blocked), 1)
self.assertEqual(candidate_set.blocked[0].candidate.target_value, 4)
self.assertEqual(candidate_set.blocked[0].reason, "no_repeat: signature already tested")
def test_ordered_lattice_upper_boundary_uses_axis_values_not_hard_coded_tp8(self) -> None:
for values in ((1, 3, 9), (2, 5, 10, 20)):
with self.subTest(values=values):
axis = AxisSpec(name="parallel_size", kind="ordered_lattice", values=values)
policy = HarnessPolicy(
operators=(OperatorSpec(name="step", axis=axis.name, kind="step_up"),)
)
candidate_set = enumerate_candidate_set({axis.name: values[-1]}, (axis,), policy)
self.assertEqual(candidate_set.eligible, ())
self.assertEqual(len(candidate_set.blocked), 1)
self.assertEqual(candidate_set.blocked[0].reason, "ordered_lattice_upper_boundary")
self.assertEqual(candidate_set.blocked[0].candidate.source_value, values[-1])
def test_bounded_numeric_jump_to_floor_uses_declared_floor_not_fixed_gmu_values(self) -> None:
for current, floor, ceiling in ((0.2, 0.6, 0.95), (0.77, 0.83, 0.91)):
with self.subTest(current=current, floor=floor, ceiling=ceiling):
axis = AxisSpec(
name="memory_fraction",
kind="bounded_numeric",
floor=floor,
ceiling=ceiling,
step=0.02,
)
policy = HarnessPolicy(
operators=(OperatorSpec(name="floor", axis="memory_fraction", kind="jump_to_floor"),)
)
candidate_set = enumerate_candidate_set({"memory_fraction": current}, (axis,), policy)
self.assertEqual(len(candidate_set.eligible), 1)
self.assertEqual(candidate_set.eligible[0].target_value, floor)
self.assertEqual(candidate_set.eligible[0].patch, {"memory_fraction": floor})
def test_coverage_stop_does_not_treat_signature_tested_as_coverage(self) -> None:
axis = AxisSpec(name="tp", kind="ordered_lattice", values=(1, 2))
required_unit = coverage_unit_id("tp", "step_up", 2)
policy = HarnessPolicy(
operators=(OperatorSpec(name="step", axis="tp", kind="step_up"),),
required_coverage_unit_ids=frozenset({required_unit}),
)
candidate = enumerate_candidate_set({"tp": 1}, (axis,), policy).eligible[0]
coverage_state = CoverageState(tested_signatures=frozenset({candidate.signature}))
candidate_set = enumerate_candidate_set({"tp": 1}, (axis,), policy, coverage_state)
stop = validate_coverage_stop(candidate_set, policy, coverage_state)
self.assertEqual(candidate_set.eligible, ())
self.assertEqual(stop.candidate_set_hash, candidate_set.candidate_set_hash)
self.assertFalse(stop.should_stop)
self.assertEqual(stop.reason, "coverage_units_missing")
self.assertEqual(stop.uncovered_unit_ids, (required_unit,))
def test_failure_invalidation_uses_conservative_region_not_exact_signature_only(self) -> None:
axis = AxisSpec(name="tp", kind="ordered_lattice", values=(1, 2, 4, 8))
policy = HarnessPolicy(
operators=(OperatorSpec(name="topology_bracket", axis="tp", kind="bracket"),)
)
exact_only = CoverageState(tested_signatures=frozenset({config_signature({"tp": 4})}))
exact_set = enumerate_candidate_set({"tp": 1}, (axis,), policy, exact_only)
self.assertEqual({candidate.target_value for candidate in exact_set.eligible}, {2, 8})
region = ordered_lattice_failure_region(
axis,
4,
direction="up",
reason="launch_failure_at_or_above_parallel_size",
)
regional_set = enumerate_candidate_set(
{"tp": 1},
(axis,),
policy,
CoverageState(failed_regions=(region,)),
)
self.assertEqual({candidate.target_value for candidate in regional_set.eligible}, {2})
blocked_targets = {blocked.candidate.target_value for blocked in regional_set.blocked}
self.assertTrue({4, 8}.issubset(blocked_targets))
self.assertTrue(
all("failure_region:tp:ge:4" in blocked.reason for blocked in regional_set.blocked)
)
if __name__ == "__main__":
unittest.main()