Add advisory harness attribution and descriptor planner MVP

This commit is contained in:
2026-06-30 12:05:03 +08:00
parent 08429e5da8
commit adb5356c4b
11 changed files with 1066 additions and 9 deletions

View File

@@ -0,0 +1,37 @@
# Candidate Family Gap Review Log
本文档维护 LLM 在 `advisory` 模式下提出 harness candidate set 之外配置、且该配置带来性能提升时的人工 review 入口。
运行时系统不会自动修改 harness也不会把 LLM 的 out-of-set proposal 直接提升为规则。每条提升先写入 study artifact
```text
.aituner/<study>/candidate_family_gaps/<trial-id>.json
```
然后人工 review 决定是否需要修改:
- `KnobDescriptor`
- generic operator
- acquisition / step-size policy
- evidence estimator
## Gap 分类
| 类型 | 含义 | 默认处理 |
|---|---|---|
| `same_operator_new_step` | harness 已有同 knob / 同方向候选,但 LLM 给了更好的 step/value | 优先检查 trust-region、step-size、candidate budget 和 acquisition |
| `missing_operator` | visible candidate set 中没有同 knob 或同 mechanism 的候选 | 检查是否缺 generic operator 或 descriptor 映射 |
| `missing_descriptor` | knob 不在 adapter descriptor 中 | 需要 engine adapter review |
| `missing_mechanism` | 现有机制词表无法表达该 proposal 的作用 | 需要 design review |
| `llm_independent_discovery` | LLM 发现无法归入当前 harness abstraction 的新方向 | 只作为 observation不直接进入 harness |
## Review 原则
1. 不接受 case-specific 数值表,例如“这个 case 试 `max-num-seqs=24`”。
2. 若归类为 `same_operator_new_step`,只能修改通用 step policy例如 grow/shrink factor、local grid budget、bracket 触发条件。
3. 若归类为 `missing_descriptor`descriptor 只能表达 knob 语义、约束、search geometry 和 directional effects不能表达具体目标答案。
4. 任何被接受的 gap 都需要新增 synthetic test证明它不依赖 vLLM 常见取值或某个 bad-start case。
## Pending
当前 repo 内尚无已人工接受的 candidate family gap。实验产生的 JSON artifact 需要在这里补充 review 摘要后再进入代码设计。

View File

@@ -1,10 +1,12 @@
from __future__ import annotations
import argparse
import hashlib
import json
import sys
from dataclasses import replace
from pathlib import Path
from typing import Any
from .compare import run_compare
from .config_signature import (
@@ -25,6 +27,7 @@ from .lca import (
)
from .llm import build_prompt, call_llm_for_proposal, load_capability_profile, parse_proposal_text
from .spec import (
ConfigPatch,
Proposal,
SpecError,
StudySpec,
@@ -68,6 +71,302 @@ def _reject_repeated_effective_config(
)
def _effective_config_fingerprint(signature: str) -> str:
return hashlib.sha256(signature.encode("utf-8")).hexdigest()
def _proposal_effective_config_fingerprint(
*,
study: StudySpec,
state: StudyState,
proposal: Proposal,
) -> str | None:
if proposal.should_stop:
return None
signature = materialized_effective_config_signature(
study=study,
state=state,
proposal=proposal,
)
return _effective_config_fingerprint(signature)
def _visible_harness_candidates(
harness_context: dict[str, object] | None,
) -> list[dict[str, Any]]:
if not isinstance(harness_context, dict):
return []
experiment_plan = harness_context.get("experiment_plan")
if not isinstance(experiment_plan, dict):
return []
candidate_set = experiment_plan.get("candidate_set")
if not isinstance(candidate_set, dict):
return []
candidates = candidate_set.get("eligible_candidates")
if not isinstance(candidates, list):
return []
return [item for item in candidates if isinstance(item, dict)]
def _candidate_set_hash(harness_context: dict[str, object] | None) -> object:
if not isinstance(harness_context, dict):
return None
experiment_plan = harness_context.get("experiment_plan")
if not isinstance(experiment_plan, dict):
return None
candidate_set = experiment_plan.get("candidate_set")
if not isinstance(candidate_set, dict):
return None
return candidate_set.get("candidate_set_hash")
def _match_visible_harness_candidate(
*,
proposal_fingerprint: str | None,
harness_context: dict[str, object] | None,
) -> dict[str, Any] | None:
if proposal_fingerprint is None:
return None
for candidate in _visible_harness_candidates(harness_context):
if candidate.get("effective_config_fingerprint") == proposal_fingerprint:
return candidate
return None
def _proposal_source_label(
*,
proposal_name: str,
proposal_source: Path | None,
) -> str:
if proposal_name.startswith("baseline-"):
return "baseline"
if proposal_name.startswith("harness-stop-") or proposal_name.startswith("harness-proposal-"):
return "harness"
return str(proposal_source) if proposal_source else "llm"
def _classify_proposal_attribution(
*,
study: StudySpec,
state: StudyState,
proposal: Proposal,
proposal_name: str,
proposal_source: Path | None,
harness_context: dict[str, object] | None,
) -> dict[str, Any]:
source_label = _proposal_source_label(
proposal_name=proposal_name,
proposal_source=proposal_source,
)
fingerprint = _proposal_effective_config_fingerprint(
study=study,
state=state,
proposal=proposal,
)
matched = _match_visible_harness_candidate(
proposal_fingerprint=fingerprint,
harness_context=harness_context,
)
matched_candidate_id = matched.get("candidate_id") if matched else None
if proposal.should_stop:
origin = (
"harness_stop"
if proposal_name.startswith("harness-stop-")
else "proposal_file_stop"
if proposal_source is not None
else "llm_stop"
)
elif proposal_name.startswith("baseline-"):
origin = "baseline"
elif proposal_name.startswith("harness-proposal-"):
origin = "harness_top1"
elif proposal_source is not None:
origin = "proposal_file"
elif study.llm.use_harness and harness_context is not None:
origin = "llm_selected_harness_candidate" if matched else "llm_out_of_set"
else:
origin = "llm_no_harness"
return {
"schema_version": 1,
"proposal_name": proposal_name,
"proposal_source": source_label,
"proposal_origin": origin,
"harness_candidate_policy": study.llm.harness_candidate_policy,
"candidate_set_hash": _candidate_set_hash(harness_context),
"proposal_effective_config_fingerprint": fingerprint,
"matched_effective_config_signature": matched is not None,
"matched_candidate_id": matched_candidate_id,
"matched_candidate": _compact_candidate_for_attribution(matched),
}
def _compact_candidate_for_attribution(candidate: dict[str, Any] | None) -> dict[str, Any] | None:
if not candidate:
return None
return {
"candidate_id": candidate.get("candidate_id"),
"action_id": candidate.get("action_id"),
"knob_family": candidate.get("knob_family"),
"score": candidate.get("score"),
"config_patch": candidate.get("config_patch"),
}
def _validate_harness_candidate_policy(attribution: dict[str, Any]) -> None:
if (
attribution.get("harness_candidate_policy") == "strict"
and attribution.get("proposal_origin") == "llm_out_of_set"
):
raise SpecError(
"LLM proposal is outside the visible harness eligible candidate set while "
"llm.harness_candidate_policy=strict. Use an eligible harness candidate, "
"switch to advisory mode, or disable harness context."
)
def _config_patch_knob_keys(config_patch: object) -> set[str]:
if not isinstance(config_patch, dict):
return set()
keys: set[str] = set()
env_patch = config_patch.get("env_patch")
if isinstance(env_patch, dict):
keys.update(f"env:{key}" for key in env_patch)
flag_patch = config_patch.get("flag_patch")
if isinstance(flag_patch, dict):
keys.update(f"flag:{key}" for key in flag_patch)
return keys
def _proposal_knob_keys(proposal: Proposal) -> set[str]:
return {
*{f"env:{key}" for key in proposal.config_patch.env_patch},
*{f"flag:{key}" for key in proposal.config_patch.flag_patch},
}
def _nearest_visible_harness_candidates(
*,
proposal: Proposal,
harness_context: dict[str, object] | None,
limit: int = 3,
) -> list[dict[str, Any]]:
proposal_keys = _proposal_knob_keys(proposal)
scored: list[tuple[int, float, dict[str, Any]]] = []
for candidate in _visible_harness_candidates(harness_context):
candidate_keys = _config_patch_knob_keys(candidate.get("config_patch"))
overlap = len(proposal_keys & candidate_keys)
score = candidate.get("score")
candidate_score = float(score) if isinstance(score, (int, float)) else 0.0
scored.append((overlap, candidate_score, candidate))
scored.sort(key=lambda item: (item[0], item[1]), reverse=True)
return [_compact_candidate_for_attribution(item[2]) or {} for item in scored[:limit]]
def _candidate_family_gap_payload(
*,
study: StudySpec,
trial_id: str,
proposal_name: str,
proposal: Proposal,
attribution: dict[str, Any],
harness_context: dict[str, object] | None,
incumbent_rate_per_gpu: float,
result_rate_per_gpu: float,
) -> dict[str, Any]:
nearest = _nearest_visible_harness_candidates(
proposal=proposal,
harness_context=harness_context,
)
proposal_keys = _proposal_knob_keys(proposal)
has_same_knob_candidate = any(
proposal_keys
& _config_patch_knob_keys(candidate.get("config_patch") if isinstance(candidate, dict) else None)
for candidate in nearest
)
return {
"schema_version": 1,
"study_id": study.study_id,
"trial_id": trial_id,
"proposal_name": proposal_name,
"proposal_origin": attribution.get("proposal_origin"),
"gap_type": "same_operator_new_step" if has_same_knob_candidate else "missing_operator",
"review_status": "pending",
"incumbent_request_rate_per_gpu": incumbent_rate_per_gpu,
"result_request_rate_per_gpu": result_rate_per_gpu,
"absolute_gain": result_rate_per_gpu - incumbent_rate_per_gpu,
"relative_gain": (
(result_rate_per_gpu - incumbent_rate_per_gpu) / incumbent_rate_per_gpu
if incumbent_rate_per_gpu > 0
else None
),
"proposal_patch": {
"env_patch": dict(proposal.config_patch.env_patch),
"flag_patch": dict(proposal.config_patch.flag_patch),
},
"changed_knobs": sorted(proposal_keys),
"candidate_set_hash": attribution.get("candidate_set_hash"),
"nearest_harness_candidates": nearest,
"interpretation": (
"LLM changed a knob already present in the visible harness candidate set; "
"treat this as a step-size/acquisition gap until offline review accepts "
"a descriptor or operator change."
if has_same_knob_candidate
else "LLM changed knobs not represented by the visible harness candidates; "
"offline review should decide whether this is a missing operator, "
"descriptor, or mechanism."
),
}
def _result_request_rate_per_gpu(result: dict[str, object], parallel_size: int | None) -> float | None:
best_request_rate = result.get("best_request_rate")
if not isinstance(best_request_rate, (int, float)):
return None
if not isinstance(parallel_size, int) or parallel_size <= 0:
return None
return float(best_request_rate) / float(parallel_size)
def _parse_parallel_int(value: object, *, default: int = 1) -> int:
if value is None:
return default
if isinstance(value, bool):
raise SpecError("Boolean values are not valid topology settings.")
if isinstance(value, int):
return value
if isinstance(value, float) and value.is_integer():
return int(value)
if isinstance(value, str) and value.strip():
return int(value.strip())
raise SpecError(f"Unable to parse topology setting from {value!r}.")
def _parallel_size_for_config_patch(study: StudySpec, config_patch: object) -> int | None:
if not isinstance(config_patch, ConfigPatch):
return None
flags: dict[str, object] = dict(study.engine.base_flags)
flags.update(config_patch.flag_patch)
tp = _parse_parallel_int(flags.get("tensor-parallel-size"), default=1)
dp = _parse_parallel_int(flags.get("data-parallel-size"), default=1)
return tp * dp
def _is_candidate_family_gap(
*,
attribution: dict[str, Any],
incumbent_rate_per_gpu: float | None,
result_rate_per_gpu: float | None,
) -> bool:
if attribution.get("proposal_origin") != "llm_out_of_set":
return False
if not isinstance(incumbent_rate_per_gpu, (int, float)):
return False
if not isinstance(result_rate_per_gpu, (int, float)):
return False
min_gain = max(1e-6, float(incumbent_rate_per_gpu) * 0.01)
return float(result_rate_per_gpu) > float(incumbent_rate_per_gpu) + min_gain
def _harness_snapshot_payload(
*,
study: StudySpec,
@@ -418,8 +717,23 @@ def cmd_study_tune(args: argparse.Namespace) -> int:
proposal=proposal,
proposal_name=proposal_name,
)
proposal_attribution = _classify_proposal_attribution(
study=study,
state=state,
proposal=proposal,
proposal_name=proposal_name,
proposal_source=proposal_source,
harness_context=harness_context,
)
_validate_harness_candidate_policy(proposal_attribution)
store.write_proposal(study.study_id, proposal_name, proposal)
if proposal.should_stop:
proposal_attribution["stopped"] = True
store.write_proposal_attribution(
study.study_id,
proposal_name,
proposal_attribution,
)
is_harness_stop = proposal_name.startswith("harness-stop-")
is_llm_stop = not is_harness_stop and proposal_source is None
stop_authority = (
@@ -498,21 +812,55 @@ def cmd_study_tune(args: argparse.Namespace) -> int:
and not state.trials
and _is_empty_config_patch(proposal)
)
pre_trial_best_rate_per_gpu = state.best_request_rate_per_gpu
trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)
proposal_attribution["trial_id"] = trial.trial_id
store.write_proposal_attribution(
study.study_id,
proposal_name,
proposal_attribution,
)
trial_spec_path = Path(trial.artifact_dir) / "trial_spec.json"
result = run_trial(trial_spec_path)
state = store.ingest_trial_results(study.study_id)
trial_parallel_size = _parallel_size_for_config_patch(study, trial.config_patch)
result_rate_per_gpu = _result_request_rate_per_gpu(result, trial_parallel_size)
gap_path: Path | None = None
if _is_candidate_family_gap(
attribution=proposal_attribution,
incumbent_rate_per_gpu=pre_trial_best_rate_per_gpu,
result_rate_per_gpu=result_rate_per_gpu,
):
gap_payload = _candidate_family_gap_payload(
study=study,
trial_id=trial.trial_id,
proposal_name=proposal_name,
proposal=proposal,
attribution=proposal_attribution,
harness_context=harness_context,
incumbent_rate_per_gpu=float(pre_trial_best_rate_per_gpu),
result_rate_per_gpu=float(result_rate_per_gpu),
)
gap_path = store.write_candidate_family_gap(
study.study_id,
trial.trial_id,
gap_payload,
)
executed.append(
{
"trial_id": trial.trial_id,
"proposal_name": proposal_name,
"proposal_source": (
"harness"
if proposal_name.startswith("harness-proposal-")
else str(proposal_source) if proposal_source else "llm"
),
{
"trial_id": trial.trial_id,
"proposal_name": proposal_name,
"proposal_source": (
"harness"
if proposal_name.startswith("harness-proposal-")
else str(proposal_source) if proposal_source else "llm"
),
"proposal_origin": proposal_attribution.get("proposal_origin"),
"matched_candidate_id": proposal_attribution.get("matched_candidate_id"),
"candidate_family_gap_path": str(gap_path) if gap_path else None,
"best_sampling_u": result.get("best_sampling_u"),
"best_request_rate": result.get("best_request_rate"),
"best_request_rate_per_gpu": result_rate_per_gpu,
"best_pass_rate": result.get("best_pass_rate"),
"state_best_trial_id": state.best_trial_id,
"state_best_request_rate": state.best_request_rate,

View File

@@ -0,0 +1,2 @@
from __future__ import annotations

View File

@@ -0,0 +1,82 @@
from __future__ import annotations
from collections.abc import Iterable
from ..knob_descriptor import KnobConstraints, KnobDescriptor
def default_vllm_descriptors(*, tunable_flags: Iterable[str]) -> tuple[KnobDescriptor, ...]:
tunable = set(tunable_flags)
descriptors: list[KnobDescriptor] = []
if "max-num-seqs" in tunable:
descriptors.append(
KnobDescriptor(
name="max-num-seqs",
location="flag",
value_type="int",
mechanisms=("admission_capacity", "kv_memory_pressure"),
search_geometry="positive_capacity",
operators=("coordinate_line_search", "frontier_delta_projection"),
constraints=KnobConstraints(min_value=1, integer=True, multiple_of=8),
directional_effects={
"increase": ("admission_capacity",),
"decrease": ("kv_memory_pressure",),
},
risk_effects={
"increase": ("kv_memory_pressure", "decode_tail_latency"),
},
)
)
if "max-num-batched-tokens" in tunable:
descriptors.append(
KnobDescriptor(
name="max-num-batched-tokens",
location="flag",
value_type="int",
mechanisms=("prefill_scheduling", "decode_batching"),
search_geometry="positive_capacity",
operators=("coordinate_line_search", "frontier_delta_projection"),
constraints=KnobConstraints(min_value=1, integer=True, multiple_of=128),
directional_effects={
"increase": ("prefill_scheduling", "decode_batching"),
"decrease": ("prefill_tail_latency",),
},
risk_effects={
"increase": ("prefill_tail_latency", "kv_memory_pressure"),
},
)
)
if "gpu-memory-utilization" in tunable:
descriptors.append(
KnobDescriptor(
name="gpu-memory-utilization",
location="flag",
value_type="float",
mechanisms=("kv_memory_capacity", "launch_feasibility"),
search_geometry="bounded_fraction",
operators=("coordinate_line_search", "frontier_delta_projection"),
constraints=KnobConstraints(min_value=0.0, max_value=1.0),
directional_effects={
"increase": ("kv_memory_capacity",),
"decrease": ("launch_feasibility",),
},
risk_effects={
"increase": ("launch_feasibility",),
},
)
)
if "enable-chunked-prefill" in tunable:
descriptors.append(
KnobDescriptor(
name="enable-chunked-prefill",
location="flag",
value_type="bool",
mechanisms=("prefill_scheduling",),
search_geometry="toggle",
operators=("coordinate_line_search",),
directional_effects={
"toggle": ("prefill_scheduling",),
},
)
)
return tuple(descriptors)

View File

@@ -0,0 +1,40 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Literal, Mapping
KnobLocation = Literal["flag", "env"]
KnobValueType = Literal["int", "float", "bool", "enum", "str"]
SearchGeometry = Literal["linear", "positive_capacity", "bounded_fraction", "toggle"]
@dataclass(frozen=True)
class KnobConstraints:
min_value: float | None = None
max_value: float | None = None
integer: bool = False
multiple_of: int | None = None
@dataclass(frozen=True)
class KnobDescriptor:
"""Declarative serving-knob semantics used by generic planners.
The descriptor intentionally does not enumerate target values for continuous
or large integer knobs. It describes how a generic operator may perturb the
knob and which mechanism each direction is expected to affect.
"""
name: str
location: KnobLocation
value_type: KnobValueType
mechanisms: tuple[str, ...]
search_geometry: SearchGeometry
operators: tuple[str, ...]
constraints: KnobConstraints = field(default_factory=KnobConstraints)
directional_effects: Mapping[str, tuple[str, ...]] = field(default_factory=dict)
risk_effects: Mapping[str, tuple[str, ...]] = field(default_factory=dict)
def current_value(self, config: Mapping[str, Any]) -> Any:
return config.get(self.name)

View File

@@ -317,6 +317,11 @@ def build_prompt(
if parallel_candidates
else "If TP/DP/EP are not tunable, focus on the remaining launch-safe runtime knobs."
),
(
"Harness candidate policy is advisory: prefer a high-scoring harness candidate when it matches your diagnosis, but you may propose an out-of-set config when the harness candidate family appears to miss the right step; such proposals are audited as candidate-family gaps."
if study.llm.harness_candidate_policy == "advisory"
else "Harness candidate policy is strict: your config_patch must match one of the harness eligible candidates after effective-config materialization."
),
"",
"Study stack:",
json.dumps(

View File

@@ -0,0 +1,213 @@
from __future__ import annotations
import math
from dataclasses import dataclass, field
from typing import Any, Mapping
from .knob_descriptor import KnobDescriptor
@dataclass(frozen=True)
class CoordinateSearchPolicy:
initial_relative_step: float = 1.0
initial_fraction_step: float = 0.05
grow_factor: float = 1.5
shrink_factor: float = 0.5
min_score: float = 0.0
@dataclass(frozen=True)
class CoordinateOperatorState:
knob: str
direction: str
trust_radius: float | None = None
last_good_value: Any | None = None
last_bad_value: Any | None = None
tested_values: tuple[Any, ...] = ()
@dataclass(frozen=True)
class MechanismCandidate:
action_id: str
knob: str
mechanism: str
operator: str
direction: str
patch: dict[str, Any]
score: float
score_factors: dict[str, float]
evidence_refs: tuple[str, ...] = ()
def coordinate_line_search_candidates(
*,
current_config: Mapping[str, Any],
descriptors: tuple[KnobDescriptor, ...],
evidence_weights: Mapping[str, float],
states: Mapping[tuple[str, str], CoordinateOperatorState] | None = None,
policy: CoordinateSearchPolicy | None = None,
) -> tuple[MechanismCandidate, ...]:
policy = policy or CoordinateSearchPolicy()
states = states or {}
candidates: list[MechanismCandidate] = []
for descriptor in descriptors:
if "coordinate_line_search" not in descriptor.operators:
continue
direction, mechanism, evidence = _choose_direction(descriptor, evidence_weights)
if direction is None or mechanism is None:
continue
if evidence < policy.min_score:
continue
state = states.get((descriptor.name, direction))
current = descriptor.current_value(current_config)
target = _propose_value(
descriptor=descriptor,
current=current,
direction=direction,
state=state,
policy=policy,
)
if target is None or target == current:
continue
risk = _direction_risk(descriptor, direction, evidence_weights)
score = max(0.0, evidence - risk)
candidates.append(
MechanismCandidate(
action_id=(
f"coordinate_line_search:{descriptor.search_geometry}:"
f"{descriptor.name}:{direction}:{_stable_token(current)}->{_stable_token(target)}"
),
knob=descriptor.name,
mechanism=mechanism,
operator="coordinate_line_search",
direction=direction,
patch={descriptor.name: target},
score=round(score, 4),
score_factors={
"mechanism_evidence": round(evidence, 4),
"direction_risk": round(risk, 4),
},
evidence_refs=(mechanism,),
)
)
candidates.sort(key=lambda item: (item.score, item.action_id), reverse=True)
return tuple(candidates)
def _choose_direction(
descriptor: KnobDescriptor,
evidence_weights: Mapping[str, float],
) -> tuple[str | None, str | None, float]:
best_direction: str | None = None
best_mechanism: str | None = None
best_weight = 0.0
for direction, mechanisms in descriptor.directional_effects.items():
for mechanism in mechanisms:
weight = float(evidence_weights.get(mechanism, 0.0))
if weight > best_weight:
best_direction = direction
best_mechanism = mechanism
best_weight = weight
return best_direction, best_mechanism, best_weight
def _direction_risk(
descriptor: KnobDescriptor,
direction: str,
evidence_weights: Mapping[str, float],
) -> float:
risks = descriptor.risk_effects.get(direction, ())
if not risks:
return 0.0
return min(0.5, 0.2 * max(float(evidence_weights.get(item, 0.0)) for item in risks))
def _propose_value(
*,
descriptor: KnobDescriptor,
current: Any,
direction: str,
state: CoordinateOperatorState | None,
policy: CoordinateSearchPolicy,
) -> Any | None:
if descriptor.search_geometry == "toggle":
if not isinstance(current, bool):
current = bool(current)
return not current
if descriptor.search_geometry == "bounded_fraction":
value = _as_float(current)
if value is None:
return None
radius = (
state.trust_radius
if state is not None and state.trust_radius is not None
else policy.initial_fraction_step
)
if direction == "decrease":
target = value - radius
else:
target = value + radius
return _canonicalize_value(descriptor, target)
if descriptor.search_geometry == "linear":
value = _as_float(current)
if value is None:
return None
radius = (
state.trust_radius
if state is not None and state.trust_radius is not None
else 1.0
)
return _canonicalize_value(
descriptor,
value - radius if direction == "decrease" else value + radius,
)
if descriptor.search_geometry == "positive_capacity":
value = _as_float(current)
if value is None or value <= 0:
min_value = descriptor.constraints.min_value
return _canonicalize_value(descriptor, min_value if min_value is not None else 1)
radius = (
state.trust_radius
if state is not None and state.trust_radius is not None
else policy.initial_relative_step
)
factor = max(1.0 + radius, 1.01)
target = value / factor if direction == "decrease" else value * factor
return _canonicalize_value(descriptor, target)
raise ValueError(f"unsupported search geometry {descriptor.search_geometry!r}")
def _canonicalize_value(descriptor: KnobDescriptor, value: float | int) -> Any:
target = float(value)
if descriptor.constraints.min_value is not None:
target = max(target, float(descriptor.constraints.min_value))
if descriptor.constraints.max_value is not None:
target = min(target, float(descriptor.constraints.max_value))
if descriptor.constraints.integer or descriptor.value_type == "int":
integer_target = int(math.ceil(target))
multiple_of = descriptor.constraints.multiple_of
if multiple_of is not None and multiple_of > 1:
integer_target = int(math.ceil(integer_target / multiple_of) * multiple_of)
if descriptor.constraints.min_value is not None:
integer_target = max(integer_target, int(descriptor.constraints.min_value))
if descriptor.constraints.max_value is not None:
integer_target = min(integer_target, int(descriptor.constraints.max_value))
return integer_target
return round(target, 6)
def _as_float(value: Any) -> float | None:
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str) and value.strip():
try:
return float(value.strip())
except ValueError:
return None
return None
def _stable_token(value: Any) -> str:
return repr(value)

View File

@@ -732,6 +732,7 @@ class LLMPolicySpec:
system_prompt: str
max_history_trials: int
use_harness: bool = True
harness_candidate_policy: str = "advisory"
@classmethod
def from_dict(cls, data: Mapping[str, Any] | None) -> "LLMPolicySpec":
@@ -743,6 +744,13 @@ class LLMPolicySpec:
if payload.get("endpoint")
else None
)
harness_candidate_policy = str(
payload.get("harness_candidate_policy") or "advisory"
).strip()
if harness_candidate_policy not in {"advisory", "strict"}:
raise SpecError(
"llm.harness_candidate_policy must be one of: advisory, strict."
)
return cls(
endpoint=endpoint,
system_prompt=str(payload.get("system_prompt") or "").strip(),
@@ -754,6 +762,7 @@ class LLMPolicySpec:
if payload.get("use_harness") is not None
else True
),
harness_candidate_policy=harness_candidate_policy,
)

View File

@@ -27,7 +27,15 @@ class StudyStore:
def init_study(self, *, spec_path: Path, study: StudySpec) -> Path:
root = self.study_root(study.study_id)
for rel in ("prompts", "proposals", "trials", "results", "harness"):
for rel in (
"prompts",
"proposals",
"proposal_attributions",
"trials",
"results",
"harness",
"candidate_family_gaps",
):
(root / rel).mkdir(parents=True, exist_ok=True)
(root / "study_spec.source").write_text(str(spec_path.resolve()) + "\n", encoding="utf-8")
self.write_json(root / "study_spec.snapshot.json", to_jsonable(study))
@@ -80,6 +88,26 @@ class StudyStore:
self.write_json(path, payload)
return path
def write_proposal_attribution(
self,
study_id: str,
proposal_name: str,
payload: dict[str, Any],
) -> Path:
path = self.study_root(study_id) / "proposal_attributions" / f"{proposal_name}.json"
self.write_json(path, payload)
return path
def write_candidate_family_gap(
self,
study_id: str,
trial_id: str,
payload: dict[str, Any],
) -> Path:
path = self.study_root(study_id) / "candidate_family_gaps" / f"{trial_id}.json"
self.write_json(path, payload)
return path
def materialize_trial(
self,
*,

View File

@@ -7410,6 +7410,231 @@ class CoreFlowTests(unittest.TestCase):
(store.study_root(study.study_id) / "harness" / "candidate-set-0002.json").exists()
)
def test_cli_tune_records_advisory_llm_out_of_set_candidate_family_gap(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(tmp_path)
payload = json.loads(study_path.read_text(encoding="utf-8"))
payload["llm"]["endpoint"] = {
"provider": "custom",
"base_url": "http://llm.example/v1",
"wire_api": "chat.completions",
"model": "test-model",
"api_key_env": "OPENAI_API_KEY",
}
study_path.write_text(json.dumps(payload), encoding="utf-8")
study = load_study_spec(study_path)
store_root = tmp_path / "store"
store = StudyStore(store_root)
store.init_study(spec_path=study_path, study=study)
store.save_state(
StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_parallel_size=1,
best_sampling_u=0.25,
best_request_rate=1.0,
best_request_rate_per_gpu=1.0,
next_trial_index=2,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
parallel_size=1,
best_request_rate=1.0,
best_request_rate_per_gpu=1.0,
config_patch={
"env_patch": {},
"flag_patch": {"max-num-seqs": 8},
},
)
],
)
)
harness_context = {
"experiment_plan": {
"planner_version": "test",
"candidate_set": {
"candidate_set_hash": "candidate-set-test",
"eligible_candidates": [
{
"candidate_id": "cand-mns16",
"action_id": "coordinate_step:max-num-seqs:8->16",
"knob_family": "max-num-seqs",
"score": 0.8,
"effective_config_fingerprint": "not-the-llm-proposal",
"config_patch": {
"env_patch": {},
"flag_patch": {"max-num-seqs": 16},
},
}
],
"blocked_candidates": [],
},
"next_action": None,
}
}
llm_payload = json.dumps(
{
"observation": "Harness is in the right admission direction but too conservative.",
"diagnosis": "Try a larger same-operator admission step.",
"config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 24}},
"expected_effects": ["test whether admission capacity was underexplored"],
"why_not_previous_failures": "new value and no launch failure evidence",
"should_stop": False,
}
)
def fake_run_trial(trial_spec_path: Path) -> dict[str, object]:
trial_payload = json.loads(trial_spec_path.read_text(encoding="utf-8"))
trial_root = Path(trial_payload["artifact_dir"])
result = {
"study_id": trial_payload["study_id"],
"trial_id": trial_payload["trial_id"],
"status": "completed",
"best_sampling_u": 0.5,
"best_request_rate": 2.0,
"best_pass_rate": 1.0,
"best_request_count": 2,
"probes": [],
}
(trial_root / "result.json").write_text(json.dumps(result), encoding="utf-8")
return result
buffer = io.StringIO()
with mock.patch("aituner.cli.build_harness_context", return_value=harness_context):
with mock.patch("aituner.llm.build_harness_context", return_value=harness_context):
with mock.patch("aituner.cli.call_llm_for_proposal", return_value=llm_payload):
with mock.patch("aituner.cli.run_trial", side_effect=fake_run_trial):
with contextlib.redirect_stdout(buffer):
exit_code = cli_main(
[
"study",
"tune",
"--spec",
str(study_path),
"--store-root",
str(store_root),
"--skip-baseline",
"--max-trials",
"2",
"--proposal-policy",
"llm-first",
]
)
self.assertEqual(exit_code, 0)
summary = json.loads(buffer.getvalue())
executed = summary["executed_trials"]
self.assertEqual(executed[0]["proposal_origin"], "llm_out_of_set")
self.assertTrue(executed[0]["candidate_family_gap_path"])
attribution_path = (
store.study_root(study.study_id)
/ "proposal_attributions"
/ "proposal-0002.json"
)
attribution = json.loads(attribution_path.read_text(encoding="utf-8"))
self.assertEqual(attribution["proposal_origin"], "llm_out_of_set")
self.assertEqual(attribution["harness_candidate_policy"], "advisory")
gap_path = Path(executed[0]["candidate_family_gap_path"])
gap = json.loads(gap_path.read_text(encoding="utf-8"))
self.assertEqual(gap["gap_type"], "same_operator_new_step")
self.assertEqual(gap["review_status"], "pending")
self.assertEqual(gap["changed_knobs"], ["flag:max-num-seqs"])
self.assertEqual(gap["proposal_patch"]["flag_patch"]["max-num-seqs"], 24)
self.assertEqual(gap["nearest_harness_candidates"][0]["candidate_id"], "cand-mns16")
def test_cli_tune_strict_harness_policy_rejects_llm_out_of_set_proposal(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(tmp_path)
payload = json.loads(study_path.read_text(encoding="utf-8"))
payload["llm"]["harness_candidate_policy"] = "strict"
payload["llm"]["endpoint"] = {
"provider": "custom",
"base_url": "http://llm.example/v1",
"wire_api": "chat.completions",
"model": "test-model",
"api_key_env": "OPENAI_API_KEY",
}
study_path.write_text(json.dumps(payload), encoding="utf-8")
study = load_study_spec(study_path)
store_root = tmp_path / "store"
store = StudyStore(store_root)
store.init_study(spec_path=study_path, study=study)
store.save_state(
StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_parallel_size=1,
best_request_rate=1.0,
best_request_rate_per_gpu=1.0,
next_trial_index=2,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
parallel_size=1,
best_request_rate=1.0,
best_request_rate_per_gpu=1.0,
config_patch={"env_patch": {}, "flag_patch": {"max-num-seqs": 8}},
)
],
)
)
harness_context = {
"experiment_plan": {
"candidate_set": {
"candidate_set_hash": "candidate-set-test",
"eligible_candidates": [
{
"candidate_id": "cand-mns16",
"effective_config_fingerprint": "not-the-llm-proposal",
"config_patch": {
"env_patch": {},
"flag_patch": {"max-num-seqs": 16},
},
}
],
}
}
}
llm_payload = json.dumps(
{
"observation": "Try an out-of-set candidate.",
"diagnosis": "strict mode should reject this.",
"config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 24}},
"expected_effects": ["should not run"],
"why_not_previous_failures": "",
"should_stop": False,
}
)
stderr = io.StringIO()
with mock.patch("aituner.cli.build_harness_context", return_value=harness_context):
with mock.patch("aituner.llm.build_harness_context", return_value=harness_context):
with mock.patch("aituner.cli.call_llm_for_proposal", return_value=llm_payload):
with mock.patch("aituner.cli.run_trial") as run_trial_mock:
with contextlib.redirect_stderr(stderr):
exit_code = cli_main(
[
"study",
"tune",
"--spec",
str(study_path),
"--store-root",
str(store_root),
"--skip-baseline",
"--max-trials",
"2",
"--proposal-policy",
"llm-first",
]
)
self.assertEqual(exit_code, 2)
run_trial_mock.assert_not_called()
self.assertIn("llm.harness_candidate_policy=strict", stderr.getvalue())
def test_cli_tune_evaluates_baseline_before_llm_proposal(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)

View File

@@ -0,0 +1,68 @@
from __future__ import annotations
import unittest
from aituner.engine_adapters.vllm import default_vllm_descriptors
from aituner.knob_descriptor import KnobConstraints, KnobDescriptor
from aituner.mechanism_planner import coordinate_line_search_candidates
class MechanismPlannerTests(unittest.TestCase):
def test_coordinate_search_uses_mechanism_not_knob_name(self) -> None:
vllm_descriptor = default_vllm_descriptors(tunable_flags=("max-num-seqs",))[0]
sglang_descriptor = KnobDescriptor(
name="max-running-requests",
location="flag",
value_type="int",
mechanisms=("admission_capacity", "kv_memory_pressure"),
search_geometry="positive_capacity",
operators=("coordinate_line_search",),
constraints=KnobConstraints(min_value=1, integer=True, multiple_of=8),
directional_effects={
"increase": ("admission_capacity",),
"decrease": ("kv_memory_pressure",),
},
)
vllm_candidates = coordinate_line_search_candidates(
current_config={"max-num-seqs": 8},
descriptors=(vllm_descriptor,),
evidence_weights={"admission_capacity": 0.9},
)
sglang_candidates = coordinate_line_search_candidates(
current_config={"max-running-requests": 8},
descriptors=(sglang_descriptor,),
evidence_weights={"admission_capacity": 0.9},
)
self.assertEqual(vllm_candidates[0].patch, {"max-num-seqs": 16})
self.assertEqual(sglang_candidates[0].patch, {"max-running-requests": 16})
self.assertEqual(vllm_candidates[0].mechanism, "admission_capacity")
self.assertEqual(sglang_candidates[0].mechanism, "admission_capacity")
def test_positive_capacity_can_decrease_for_memory_pressure(self) -> None:
descriptor = default_vllm_descriptors(tunable_flags=("max-num-seqs",))[0]
candidates = coordinate_line_search_candidates(
current_config={"max-num-seqs": 64},
descriptors=(descriptor,),
evidence_weights={"kv_memory_pressure": 0.8},
)
self.assertEqual(candidates[0].direction, "decrease")
self.assertEqual(candidates[0].patch, {"max-num-seqs": 32})
def test_bounded_fraction_respects_constraints(self) -> None:
descriptor = default_vllm_descriptors(tunable_flags=("gpu-memory-utilization",))[0]
candidates = coordinate_line_search_candidates(
current_config={"gpu-memory-utilization": 0.98},
descriptors=(descriptor,),
evidence_weights={"kv_memory_capacity": 0.8},
)
self.assertEqual(candidates[0].patch, {"gpu-memory-utilization": 1.0})
if __name__ == "__main__":
unittest.main()