From adb5356c4b3031e2f0caaf4efa678d9f1269942e Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Tue, 30 Jun 2026 12:05:03 +0800 Subject: [PATCH] Add advisory harness attribution and descriptor planner MVP --- .../candidate-family-gap-log.md | 37 ++ src/aituner/cli.py | 364 +++++++++++++++++- src/aituner/engine_adapters/__init__.py | 2 + src/aituner/engine_adapters/vllm.py | 82 ++++ src/aituner/knob_descriptor.py | 40 ++ src/aituner/llm.py | 5 + src/aituner/mechanism_planner.py | 213 ++++++++++ src/aituner/spec.py | 9 + src/aituner/store.py | 30 +- tests/test_core_flow.py | 225 +++++++++++ tests/test_mechanism_planner.py | 68 ++++ 11 files changed, 1066 insertions(+), 9 deletions(-) create mode 100644 docs/harness-ablation/candidate-family-gap-log.md create mode 100644 src/aituner/engine_adapters/__init__.py create mode 100644 src/aituner/engine_adapters/vllm.py create mode 100644 src/aituner/knob_descriptor.py create mode 100644 src/aituner/mechanism_planner.py create mode 100644 tests/test_mechanism_planner.py diff --git a/docs/harness-ablation/candidate-family-gap-log.md b/docs/harness-ablation/candidate-family-gap-log.md new file mode 100644 index 0000000..8258aea --- /dev/null +++ b/docs/harness-ablation/candidate-family-gap-log.md @@ -0,0 +1,37 @@ +# Candidate Family Gap Review Log + +本文档维护 LLM 在 `advisory` 模式下提出 harness candidate set 之外配置、且该配置带来性能提升时的人工 review 入口。 + +运行时系统不会自动修改 harness,也不会把 LLM 的 out-of-set proposal 直接提升为规则。每条提升先写入 study artifact: + +```text +.aituner//candidate_family_gaps/.json +``` + +然后人工 review 决定是否需要修改: + +- `KnobDescriptor` +- generic operator +- acquisition / step-size policy +- evidence estimator + +## Gap 分类 + +| 类型 | 含义 | 默认处理 | +|---|---|---| +| `same_operator_new_step` | harness 已有同 knob / 同方向候选,但 LLM 给了更好的 step/value | 优先检查 trust-region、step-size、candidate budget 和 acquisition | +| `missing_operator` | visible candidate set 中没有同 knob 或同 mechanism 的候选 | 检查是否缺 generic operator 或 descriptor 映射 | +| `missing_descriptor` | knob 不在 adapter descriptor 中 | 需要 engine adapter review | +| `missing_mechanism` | 现有机制词表无法表达该 proposal 的作用 | 需要 design review | +| `llm_independent_discovery` | LLM 发现无法归入当前 harness abstraction 的新方向 | 只作为 observation,不直接进入 harness | + +## Review 原则 + +1. 不接受 case-specific 数值表,例如“这个 case 试 `max-num-seqs=24`”。 +2. 若归类为 `same_operator_new_step`,只能修改通用 step policy,例如 grow/shrink factor、local grid budget、bracket 触发条件。 +3. 若归类为 `missing_descriptor`,descriptor 只能表达 knob 语义、约束、search geometry 和 directional effects,不能表达具体目标答案。 +4. 任何被接受的 gap 都需要新增 synthetic test,证明它不依赖 vLLM 常见取值或某个 bad-start case。 + +## Pending + +当前 repo 内尚无已人工接受的 candidate family gap。实验产生的 JSON artifact 需要在这里补充 review 摘要后再进入代码设计。 diff --git a/src/aituner/cli.py b/src/aituner/cli.py index 44c31f9..57605d0 100644 --- a/src/aituner/cli.py +++ b/src/aituner/cli.py @@ -1,10 +1,12 @@ from __future__ import annotations import argparse +import hashlib import json import sys from dataclasses import replace from pathlib import Path +from typing import Any from .compare import run_compare from .config_signature import ( @@ -25,6 +27,7 @@ from .lca import ( ) from .llm import build_prompt, call_llm_for_proposal, load_capability_profile, parse_proposal_text from .spec import ( + ConfigPatch, Proposal, SpecError, StudySpec, @@ -68,6 +71,302 @@ def _reject_repeated_effective_config( ) +def _effective_config_fingerprint(signature: str) -> str: + return hashlib.sha256(signature.encode("utf-8")).hexdigest() + + +def _proposal_effective_config_fingerprint( + *, + study: StudySpec, + state: StudyState, + proposal: Proposal, +) -> str | None: + if proposal.should_stop: + return None + signature = materialized_effective_config_signature( + study=study, + state=state, + proposal=proposal, + ) + return _effective_config_fingerprint(signature) + + +def _visible_harness_candidates( + harness_context: dict[str, object] | None, +) -> list[dict[str, Any]]: + if not isinstance(harness_context, dict): + return [] + experiment_plan = harness_context.get("experiment_plan") + if not isinstance(experiment_plan, dict): + return [] + candidate_set = experiment_plan.get("candidate_set") + if not isinstance(candidate_set, dict): + return [] + candidates = candidate_set.get("eligible_candidates") + if not isinstance(candidates, list): + return [] + return [item for item in candidates if isinstance(item, dict)] + + +def _candidate_set_hash(harness_context: dict[str, object] | None) -> object: + if not isinstance(harness_context, dict): + return None + experiment_plan = harness_context.get("experiment_plan") + if not isinstance(experiment_plan, dict): + return None + candidate_set = experiment_plan.get("candidate_set") + if not isinstance(candidate_set, dict): + return None + return candidate_set.get("candidate_set_hash") + + +def _match_visible_harness_candidate( + *, + proposal_fingerprint: str | None, + harness_context: dict[str, object] | None, +) -> dict[str, Any] | None: + if proposal_fingerprint is None: + return None + for candidate in _visible_harness_candidates(harness_context): + if candidate.get("effective_config_fingerprint") == proposal_fingerprint: + return candidate + return None + + +def _proposal_source_label( + *, + proposal_name: str, + proposal_source: Path | None, +) -> str: + if proposal_name.startswith("baseline-"): + return "baseline" + if proposal_name.startswith("harness-stop-") or proposal_name.startswith("harness-proposal-"): + return "harness" + return str(proposal_source) if proposal_source else "llm" + + +def _classify_proposal_attribution( + *, + study: StudySpec, + state: StudyState, + proposal: Proposal, + proposal_name: str, + proposal_source: Path | None, + harness_context: dict[str, object] | None, +) -> dict[str, Any]: + source_label = _proposal_source_label( + proposal_name=proposal_name, + proposal_source=proposal_source, + ) + fingerprint = _proposal_effective_config_fingerprint( + study=study, + state=state, + proposal=proposal, + ) + matched = _match_visible_harness_candidate( + proposal_fingerprint=fingerprint, + harness_context=harness_context, + ) + matched_candidate_id = matched.get("candidate_id") if matched else None + if proposal.should_stop: + origin = ( + "harness_stop" + if proposal_name.startswith("harness-stop-") + else "proposal_file_stop" + if proposal_source is not None + else "llm_stop" + ) + elif proposal_name.startswith("baseline-"): + origin = "baseline" + elif proposal_name.startswith("harness-proposal-"): + origin = "harness_top1" + elif proposal_source is not None: + origin = "proposal_file" + elif study.llm.use_harness and harness_context is not None: + origin = "llm_selected_harness_candidate" if matched else "llm_out_of_set" + else: + origin = "llm_no_harness" + return { + "schema_version": 1, + "proposal_name": proposal_name, + "proposal_source": source_label, + "proposal_origin": origin, + "harness_candidate_policy": study.llm.harness_candidate_policy, + "candidate_set_hash": _candidate_set_hash(harness_context), + "proposal_effective_config_fingerprint": fingerprint, + "matched_effective_config_signature": matched is not None, + "matched_candidate_id": matched_candidate_id, + "matched_candidate": _compact_candidate_for_attribution(matched), + } + + +def _compact_candidate_for_attribution(candidate: dict[str, Any] | None) -> dict[str, Any] | None: + if not candidate: + return None + return { + "candidate_id": candidate.get("candidate_id"), + "action_id": candidate.get("action_id"), + "knob_family": candidate.get("knob_family"), + "score": candidate.get("score"), + "config_patch": candidate.get("config_patch"), + } + + +def _validate_harness_candidate_policy(attribution: dict[str, Any]) -> None: + if ( + attribution.get("harness_candidate_policy") == "strict" + and attribution.get("proposal_origin") == "llm_out_of_set" + ): + raise SpecError( + "LLM proposal is outside the visible harness eligible candidate set while " + "llm.harness_candidate_policy=strict. Use an eligible harness candidate, " + "switch to advisory mode, or disable harness context." + ) + + +def _config_patch_knob_keys(config_patch: object) -> set[str]: + if not isinstance(config_patch, dict): + return set() + keys: set[str] = set() + env_patch = config_patch.get("env_patch") + if isinstance(env_patch, dict): + keys.update(f"env:{key}" for key in env_patch) + flag_patch = config_patch.get("flag_patch") + if isinstance(flag_patch, dict): + keys.update(f"flag:{key}" for key in flag_patch) + return keys + + +def _proposal_knob_keys(proposal: Proposal) -> set[str]: + return { + *{f"env:{key}" for key in proposal.config_patch.env_patch}, + *{f"flag:{key}" for key in proposal.config_patch.flag_patch}, + } + + +def _nearest_visible_harness_candidates( + *, + proposal: Proposal, + harness_context: dict[str, object] | None, + limit: int = 3, +) -> list[dict[str, Any]]: + proposal_keys = _proposal_knob_keys(proposal) + scored: list[tuple[int, float, dict[str, Any]]] = [] + for candidate in _visible_harness_candidates(harness_context): + candidate_keys = _config_patch_knob_keys(candidate.get("config_patch")) + overlap = len(proposal_keys & candidate_keys) + score = candidate.get("score") + candidate_score = float(score) if isinstance(score, (int, float)) else 0.0 + scored.append((overlap, candidate_score, candidate)) + scored.sort(key=lambda item: (item[0], item[1]), reverse=True) + return [_compact_candidate_for_attribution(item[2]) or {} for item in scored[:limit]] + + +def _candidate_family_gap_payload( + *, + study: StudySpec, + trial_id: str, + proposal_name: str, + proposal: Proposal, + attribution: dict[str, Any], + harness_context: dict[str, object] | None, + incumbent_rate_per_gpu: float, + result_rate_per_gpu: float, +) -> dict[str, Any]: + nearest = _nearest_visible_harness_candidates( + proposal=proposal, + harness_context=harness_context, + ) + proposal_keys = _proposal_knob_keys(proposal) + has_same_knob_candidate = any( + proposal_keys + & _config_patch_knob_keys(candidate.get("config_patch") if isinstance(candidate, dict) else None) + for candidate in nearest + ) + return { + "schema_version": 1, + "study_id": study.study_id, + "trial_id": trial_id, + "proposal_name": proposal_name, + "proposal_origin": attribution.get("proposal_origin"), + "gap_type": "same_operator_new_step" if has_same_knob_candidate else "missing_operator", + "review_status": "pending", + "incumbent_request_rate_per_gpu": incumbent_rate_per_gpu, + "result_request_rate_per_gpu": result_rate_per_gpu, + "absolute_gain": result_rate_per_gpu - incumbent_rate_per_gpu, + "relative_gain": ( + (result_rate_per_gpu - incumbent_rate_per_gpu) / incumbent_rate_per_gpu + if incumbent_rate_per_gpu > 0 + else None + ), + "proposal_patch": { + "env_patch": dict(proposal.config_patch.env_patch), + "flag_patch": dict(proposal.config_patch.flag_patch), + }, + "changed_knobs": sorted(proposal_keys), + "candidate_set_hash": attribution.get("candidate_set_hash"), + "nearest_harness_candidates": nearest, + "interpretation": ( + "LLM changed a knob already present in the visible harness candidate set; " + "treat this as a step-size/acquisition gap until offline review accepts " + "a descriptor or operator change." + if has_same_knob_candidate + else "LLM changed knobs not represented by the visible harness candidates; " + "offline review should decide whether this is a missing operator, " + "descriptor, or mechanism." + ), + } + + +def _result_request_rate_per_gpu(result: dict[str, object], parallel_size: int | None) -> float | None: + best_request_rate = result.get("best_request_rate") + if not isinstance(best_request_rate, (int, float)): + return None + if not isinstance(parallel_size, int) or parallel_size <= 0: + return None + return float(best_request_rate) / float(parallel_size) + + +def _parse_parallel_int(value: object, *, default: int = 1) -> int: + if value is None: + return default + if isinstance(value, bool): + raise SpecError("Boolean values are not valid topology settings.") + if isinstance(value, int): + return value + if isinstance(value, float) and value.is_integer(): + return int(value) + if isinstance(value, str) and value.strip(): + return int(value.strip()) + raise SpecError(f"Unable to parse topology setting from {value!r}.") + + +def _parallel_size_for_config_patch(study: StudySpec, config_patch: object) -> int | None: + if not isinstance(config_patch, ConfigPatch): + return None + flags: dict[str, object] = dict(study.engine.base_flags) + flags.update(config_patch.flag_patch) + tp = _parse_parallel_int(flags.get("tensor-parallel-size"), default=1) + dp = _parse_parallel_int(flags.get("data-parallel-size"), default=1) + return tp * dp + + +def _is_candidate_family_gap( + *, + attribution: dict[str, Any], + incumbent_rate_per_gpu: float | None, + result_rate_per_gpu: float | None, +) -> bool: + if attribution.get("proposal_origin") != "llm_out_of_set": + return False + if not isinstance(incumbent_rate_per_gpu, (int, float)): + return False + if not isinstance(result_rate_per_gpu, (int, float)): + return False + min_gain = max(1e-6, float(incumbent_rate_per_gpu) * 0.01) + return float(result_rate_per_gpu) > float(incumbent_rate_per_gpu) + min_gain + + def _harness_snapshot_payload( *, study: StudySpec, @@ -418,8 +717,23 @@ def cmd_study_tune(args: argparse.Namespace) -> int: proposal=proposal, proposal_name=proposal_name, ) + proposal_attribution = _classify_proposal_attribution( + study=study, + state=state, + proposal=proposal, + proposal_name=proposal_name, + proposal_source=proposal_source, + harness_context=harness_context, + ) + _validate_harness_candidate_policy(proposal_attribution) store.write_proposal(study.study_id, proposal_name, proposal) if proposal.should_stop: + proposal_attribution["stopped"] = True + store.write_proposal_attribution( + study.study_id, + proposal_name, + proposal_attribution, + ) is_harness_stop = proposal_name.startswith("harness-stop-") is_llm_stop = not is_harness_stop and proposal_source is None stop_authority = ( @@ -498,21 +812,55 @@ def cmd_study_tune(args: argparse.Namespace) -> int: and not state.trials and _is_empty_config_patch(proposal) ) + pre_trial_best_rate_per_gpu = state.best_request_rate_per_gpu trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) + proposal_attribution["trial_id"] = trial.trial_id + store.write_proposal_attribution( + study.study_id, + proposal_name, + proposal_attribution, + ) trial_spec_path = Path(trial.artifact_dir) / "trial_spec.json" result = run_trial(trial_spec_path) state = store.ingest_trial_results(study.study_id) + trial_parallel_size = _parallel_size_for_config_patch(study, trial.config_patch) + result_rate_per_gpu = _result_request_rate_per_gpu(result, trial_parallel_size) + gap_path: Path | None = None + if _is_candidate_family_gap( + attribution=proposal_attribution, + incumbent_rate_per_gpu=pre_trial_best_rate_per_gpu, + result_rate_per_gpu=result_rate_per_gpu, + ): + gap_payload = _candidate_family_gap_payload( + study=study, + trial_id=trial.trial_id, + proposal_name=proposal_name, + proposal=proposal, + attribution=proposal_attribution, + harness_context=harness_context, + incumbent_rate_per_gpu=float(pre_trial_best_rate_per_gpu), + result_rate_per_gpu=float(result_rate_per_gpu), + ) + gap_path = store.write_candidate_family_gap( + study.study_id, + trial.trial_id, + gap_payload, + ) executed.append( - { - "trial_id": trial.trial_id, - "proposal_name": proposal_name, - "proposal_source": ( - "harness" - if proposal_name.startswith("harness-proposal-") - else str(proposal_source) if proposal_source else "llm" - ), + { + "trial_id": trial.trial_id, + "proposal_name": proposal_name, + "proposal_source": ( + "harness" + if proposal_name.startswith("harness-proposal-") + else str(proposal_source) if proposal_source else "llm" + ), + "proposal_origin": proposal_attribution.get("proposal_origin"), + "matched_candidate_id": proposal_attribution.get("matched_candidate_id"), + "candidate_family_gap_path": str(gap_path) if gap_path else None, "best_sampling_u": result.get("best_sampling_u"), "best_request_rate": result.get("best_request_rate"), + "best_request_rate_per_gpu": result_rate_per_gpu, "best_pass_rate": result.get("best_pass_rate"), "state_best_trial_id": state.best_trial_id, "state_best_request_rate": state.best_request_rate, diff --git a/src/aituner/engine_adapters/__init__.py b/src/aituner/engine_adapters/__init__.py new file mode 100644 index 0000000..3ce55a6 --- /dev/null +++ b/src/aituner/engine_adapters/__init__.py @@ -0,0 +1,2 @@ +from __future__ import annotations + diff --git a/src/aituner/engine_adapters/vllm.py b/src/aituner/engine_adapters/vllm.py new file mode 100644 index 0000000..4947da6 --- /dev/null +++ b/src/aituner/engine_adapters/vllm.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from collections.abc import Iterable + +from ..knob_descriptor import KnobConstraints, KnobDescriptor + + +def default_vllm_descriptors(*, tunable_flags: Iterable[str]) -> tuple[KnobDescriptor, ...]: + tunable = set(tunable_flags) + descriptors: list[KnobDescriptor] = [] + if "max-num-seqs" in tunable: + descriptors.append( + KnobDescriptor( + name="max-num-seqs", + location="flag", + value_type="int", + mechanisms=("admission_capacity", "kv_memory_pressure"), + search_geometry="positive_capacity", + operators=("coordinate_line_search", "frontier_delta_projection"), + constraints=KnobConstraints(min_value=1, integer=True, multiple_of=8), + directional_effects={ + "increase": ("admission_capacity",), + "decrease": ("kv_memory_pressure",), + }, + risk_effects={ + "increase": ("kv_memory_pressure", "decode_tail_latency"), + }, + ) + ) + if "max-num-batched-tokens" in tunable: + descriptors.append( + KnobDescriptor( + name="max-num-batched-tokens", + location="flag", + value_type="int", + mechanisms=("prefill_scheduling", "decode_batching"), + search_geometry="positive_capacity", + operators=("coordinate_line_search", "frontier_delta_projection"), + constraints=KnobConstraints(min_value=1, integer=True, multiple_of=128), + directional_effects={ + "increase": ("prefill_scheduling", "decode_batching"), + "decrease": ("prefill_tail_latency",), + }, + risk_effects={ + "increase": ("prefill_tail_latency", "kv_memory_pressure"), + }, + ) + ) + if "gpu-memory-utilization" in tunable: + descriptors.append( + KnobDescriptor( + name="gpu-memory-utilization", + location="flag", + value_type="float", + mechanisms=("kv_memory_capacity", "launch_feasibility"), + search_geometry="bounded_fraction", + operators=("coordinate_line_search", "frontier_delta_projection"), + constraints=KnobConstraints(min_value=0.0, max_value=1.0), + directional_effects={ + "increase": ("kv_memory_capacity",), + "decrease": ("launch_feasibility",), + }, + risk_effects={ + "increase": ("launch_feasibility",), + }, + ) + ) + if "enable-chunked-prefill" in tunable: + descriptors.append( + KnobDescriptor( + name="enable-chunked-prefill", + location="flag", + value_type="bool", + mechanisms=("prefill_scheduling",), + search_geometry="toggle", + operators=("coordinate_line_search",), + directional_effects={ + "toggle": ("prefill_scheduling",), + }, + ) + ) + return tuple(descriptors) diff --git a/src/aituner/knob_descriptor.py b/src/aituner/knob_descriptor.py new file mode 100644 index 0000000..002f11e --- /dev/null +++ b/src/aituner/knob_descriptor.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Literal, Mapping + + +KnobLocation = Literal["flag", "env"] +KnobValueType = Literal["int", "float", "bool", "enum", "str"] +SearchGeometry = Literal["linear", "positive_capacity", "bounded_fraction", "toggle"] + + +@dataclass(frozen=True) +class KnobConstraints: + min_value: float | None = None + max_value: float | None = None + integer: bool = False + multiple_of: int | None = None + + +@dataclass(frozen=True) +class KnobDescriptor: + """Declarative serving-knob semantics used by generic planners. + + The descriptor intentionally does not enumerate target values for continuous + or large integer knobs. It describes how a generic operator may perturb the + knob and which mechanism each direction is expected to affect. + """ + + name: str + location: KnobLocation + value_type: KnobValueType + mechanisms: tuple[str, ...] + search_geometry: SearchGeometry + operators: tuple[str, ...] + constraints: KnobConstraints = field(default_factory=KnobConstraints) + directional_effects: Mapping[str, tuple[str, ...]] = field(default_factory=dict) + risk_effects: Mapping[str, tuple[str, ...]] = field(default_factory=dict) + + def current_value(self, config: Mapping[str, Any]) -> Any: + return config.get(self.name) diff --git a/src/aituner/llm.py b/src/aituner/llm.py index 125be40..a70c20c 100644 --- a/src/aituner/llm.py +++ b/src/aituner/llm.py @@ -317,6 +317,11 @@ def build_prompt( if parallel_candidates else "If TP/DP/EP are not tunable, focus on the remaining launch-safe runtime knobs." ), + ( + "Harness candidate policy is advisory: prefer a high-scoring harness candidate when it matches your diagnosis, but you may propose an out-of-set config when the harness candidate family appears to miss the right step; such proposals are audited as candidate-family gaps." + if study.llm.harness_candidate_policy == "advisory" + else "Harness candidate policy is strict: your config_patch must match one of the harness eligible candidates after effective-config materialization." + ), "", "Study stack:", json.dumps( diff --git a/src/aituner/mechanism_planner.py b/src/aituner/mechanism_planner.py new file mode 100644 index 0000000..a9a4125 --- /dev/null +++ b/src/aituner/mechanism_planner.py @@ -0,0 +1,213 @@ +from __future__ import annotations + +import math +from dataclasses import dataclass, field +from typing import Any, Mapping + +from .knob_descriptor import KnobDescriptor + + +@dataclass(frozen=True) +class CoordinateSearchPolicy: + initial_relative_step: float = 1.0 + initial_fraction_step: float = 0.05 + grow_factor: float = 1.5 + shrink_factor: float = 0.5 + min_score: float = 0.0 + + +@dataclass(frozen=True) +class CoordinateOperatorState: + knob: str + direction: str + trust_radius: float | None = None + last_good_value: Any | None = None + last_bad_value: Any | None = None + tested_values: tuple[Any, ...] = () + + +@dataclass(frozen=True) +class MechanismCandidate: + action_id: str + knob: str + mechanism: str + operator: str + direction: str + patch: dict[str, Any] + score: float + score_factors: dict[str, float] + evidence_refs: tuple[str, ...] = () + + +def coordinate_line_search_candidates( + *, + current_config: Mapping[str, Any], + descriptors: tuple[KnobDescriptor, ...], + evidence_weights: Mapping[str, float], + states: Mapping[tuple[str, str], CoordinateOperatorState] | None = None, + policy: CoordinateSearchPolicy | None = None, +) -> tuple[MechanismCandidate, ...]: + policy = policy or CoordinateSearchPolicy() + states = states or {} + candidates: list[MechanismCandidate] = [] + for descriptor in descriptors: + if "coordinate_line_search" not in descriptor.operators: + continue + direction, mechanism, evidence = _choose_direction(descriptor, evidence_weights) + if direction is None or mechanism is None: + continue + if evidence < policy.min_score: + continue + state = states.get((descriptor.name, direction)) + current = descriptor.current_value(current_config) + target = _propose_value( + descriptor=descriptor, + current=current, + direction=direction, + state=state, + policy=policy, + ) + if target is None or target == current: + continue + risk = _direction_risk(descriptor, direction, evidence_weights) + score = max(0.0, evidence - risk) + candidates.append( + MechanismCandidate( + action_id=( + f"coordinate_line_search:{descriptor.search_geometry}:" + f"{descriptor.name}:{direction}:{_stable_token(current)}->{_stable_token(target)}" + ), + knob=descriptor.name, + mechanism=mechanism, + operator="coordinate_line_search", + direction=direction, + patch={descriptor.name: target}, + score=round(score, 4), + score_factors={ + "mechanism_evidence": round(evidence, 4), + "direction_risk": round(risk, 4), + }, + evidence_refs=(mechanism,), + ) + ) + candidates.sort(key=lambda item: (item.score, item.action_id), reverse=True) + return tuple(candidates) + + +def _choose_direction( + descriptor: KnobDescriptor, + evidence_weights: Mapping[str, float], +) -> tuple[str | None, str | None, float]: + best_direction: str | None = None + best_mechanism: str | None = None + best_weight = 0.0 + for direction, mechanisms in descriptor.directional_effects.items(): + for mechanism in mechanisms: + weight = float(evidence_weights.get(mechanism, 0.0)) + if weight > best_weight: + best_direction = direction + best_mechanism = mechanism + best_weight = weight + return best_direction, best_mechanism, best_weight + + +def _direction_risk( + descriptor: KnobDescriptor, + direction: str, + evidence_weights: Mapping[str, float], +) -> float: + risks = descriptor.risk_effects.get(direction, ()) + if not risks: + return 0.0 + return min(0.5, 0.2 * max(float(evidence_weights.get(item, 0.0)) for item in risks)) + + +def _propose_value( + *, + descriptor: KnobDescriptor, + current: Any, + direction: str, + state: CoordinateOperatorState | None, + policy: CoordinateSearchPolicy, +) -> Any | None: + if descriptor.search_geometry == "toggle": + if not isinstance(current, bool): + current = bool(current) + return not current + if descriptor.search_geometry == "bounded_fraction": + value = _as_float(current) + if value is None: + return None + radius = ( + state.trust_radius + if state is not None and state.trust_radius is not None + else policy.initial_fraction_step + ) + if direction == "decrease": + target = value - radius + else: + target = value + radius + return _canonicalize_value(descriptor, target) + if descriptor.search_geometry == "linear": + value = _as_float(current) + if value is None: + return None + radius = ( + state.trust_radius + if state is not None and state.trust_radius is not None + else 1.0 + ) + return _canonicalize_value( + descriptor, + value - radius if direction == "decrease" else value + radius, + ) + if descriptor.search_geometry == "positive_capacity": + value = _as_float(current) + if value is None or value <= 0: + min_value = descriptor.constraints.min_value + return _canonicalize_value(descriptor, min_value if min_value is not None else 1) + radius = ( + state.trust_radius + if state is not None and state.trust_radius is not None + else policy.initial_relative_step + ) + factor = max(1.0 + radius, 1.01) + target = value / factor if direction == "decrease" else value * factor + return _canonicalize_value(descriptor, target) + raise ValueError(f"unsupported search geometry {descriptor.search_geometry!r}") + + +def _canonicalize_value(descriptor: KnobDescriptor, value: float | int) -> Any: + target = float(value) + if descriptor.constraints.min_value is not None: + target = max(target, float(descriptor.constraints.min_value)) + if descriptor.constraints.max_value is not None: + target = min(target, float(descriptor.constraints.max_value)) + if descriptor.constraints.integer or descriptor.value_type == "int": + integer_target = int(math.ceil(target)) + multiple_of = descriptor.constraints.multiple_of + if multiple_of is not None and multiple_of > 1: + integer_target = int(math.ceil(integer_target / multiple_of) * multiple_of) + if descriptor.constraints.min_value is not None: + integer_target = max(integer_target, int(descriptor.constraints.min_value)) + if descriptor.constraints.max_value is not None: + integer_target = min(integer_target, int(descriptor.constraints.max_value)) + return integer_target + return round(target, 6) + + +def _as_float(value: Any) -> float | None: + if isinstance(value, bool): + return None + if isinstance(value, (int, float)): + return float(value) + if isinstance(value, str) and value.strip(): + try: + return float(value.strip()) + except ValueError: + return None + return None + + +def _stable_token(value: Any) -> str: + return repr(value) diff --git a/src/aituner/spec.py b/src/aituner/spec.py index e9549b2..de5c661 100644 --- a/src/aituner/spec.py +++ b/src/aituner/spec.py @@ -732,6 +732,7 @@ class LLMPolicySpec: system_prompt: str max_history_trials: int use_harness: bool = True + harness_candidate_policy: str = "advisory" @classmethod def from_dict(cls, data: Mapping[str, Any] | None) -> "LLMPolicySpec": @@ -743,6 +744,13 @@ class LLMPolicySpec: if payload.get("endpoint") else None ) + harness_candidate_policy = str( + payload.get("harness_candidate_policy") or "advisory" + ).strip() + if harness_candidate_policy not in {"advisory", "strict"}: + raise SpecError( + "llm.harness_candidate_policy must be one of: advisory, strict." + ) return cls( endpoint=endpoint, system_prompt=str(payload.get("system_prompt") or "").strip(), @@ -754,6 +762,7 @@ class LLMPolicySpec: if payload.get("use_harness") is not None else True ), + harness_candidate_policy=harness_candidate_policy, ) diff --git a/src/aituner/store.py b/src/aituner/store.py index c0aae71..532654f 100644 --- a/src/aituner/store.py +++ b/src/aituner/store.py @@ -27,7 +27,15 @@ class StudyStore: def init_study(self, *, spec_path: Path, study: StudySpec) -> Path: root = self.study_root(study.study_id) - for rel in ("prompts", "proposals", "trials", "results", "harness"): + for rel in ( + "prompts", + "proposals", + "proposal_attributions", + "trials", + "results", + "harness", + "candidate_family_gaps", + ): (root / rel).mkdir(parents=True, exist_ok=True) (root / "study_spec.source").write_text(str(spec_path.resolve()) + "\n", encoding="utf-8") self.write_json(root / "study_spec.snapshot.json", to_jsonable(study)) @@ -80,6 +88,26 @@ class StudyStore: self.write_json(path, payload) return path + def write_proposal_attribution( + self, + study_id: str, + proposal_name: str, + payload: dict[str, Any], + ) -> Path: + path = self.study_root(study_id) / "proposal_attributions" / f"{proposal_name}.json" + self.write_json(path, payload) + return path + + def write_candidate_family_gap( + self, + study_id: str, + trial_id: str, + payload: dict[str, Any], + ) -> Path: + path = self.study_root(study_id) / "candidate_family_gaps" / f"{trial_id}.json" + self.write_json(path, payload) + return path + def materialize_trial( self, *, diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index e3f6d77..f75a96e 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -7410,6 +7410,231 @@ class CoreFlowTests(unittest.TestCase): (store.study_root(study.study_id) / "harness" / "candidate-set-0002.json").exists() ) + def test_cli_tune_records_advisory_llm_out_of_set_candidate_family_gap(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets(tmp_path) + payload = json.loads(study_path.read_text(encoding="utf-8")) + payload["llm"]["endpoint"] = { + "provider": "custom", + "base_url": "http://llm.example/v1", + "wire_api": "chat.completions", + "model": "test-model", + "api_key_env": "OPENAI_API_KEY", + } + study_path.write_text(json.dumps(payload), encoding="utf-8") + study = load_study_spec(study_path) + store_root = tmp_path / "store" + store = StudyStore(store_root) + store.init_study(spec_path=study_path, study=study) + store.save_state( + StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_parallel_size=1, + best_sampling_u=0.25, + best_request_rate=1.0, + best_request_rate_per_gpu=1.0, + next_trial_index=2, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=1, + best_request_rate=1.0, + best_request_rate_per_gpu=1.0, + config_patch={ + "env_patch": {}, + "flag_patch": {"max-num-seqs": 8}, + }, + ) + ], + ) + ) + harness_context = { + "experiment_plan": { + "planner_version": "test", + "candidate_set": { + "candidate_set_hash": "candidate-set-test", + "eligible_candidates": [ + { + "candidate_id": "cand-mns16", + "action_id": "coordinate_step:max-num-seqs:8->16", + "knob_family": "max-num-seqs", + "score": 0.8, + "effective_config_fingerprint": "not-the-llm-proposal", + "config_patch": { + "env_patch": {}, + "flag_patch": {"max-num-seqs": 16}, + }, + } + ], + "blocked_candidates": [], + }, + "next_action": None, + } + } + llm_payload = json.dumps( + { + "observation": "Harness is in the right admission direction but too conservative.", + "diagnosis": "Try a larger same-operator admission step.", + "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 24}}, + "expected_effects": ["test whether admission capacity was underexplored"], + "why_not_previous_failures": "new value and no launch failure evidence", + "should_stop": False, + } + ) + + def fake_run_trial(trial_spec_path: Path) -> dict[str, object]: + trial_payload = json.loads(trial_spec_path.read_text(encoding="utf-8")) + trial_root = Path(trial_payload["artifact_dir"]) + result = { + "study_id": trial_payload["study_id"], + "trial_id": trial_payload["trial_id"], + "status": "completed", + "best_sampling_u": 0.5, + "best_request_rate": 2.0, + "best_pass_rate": 1.0, + "best_request_count": 2, + "probes": [], + } + (trial_root / "result.json").write_text(json.dumps(result), encoding="utf-8") + return result + + buffer = io.StringIO() + with mock.patch("aituner.cli.build_harness_context", return_value=harness_context): + with mock.patch("aituner.llm.build_harness_context", return_value=harness_context): + with mock.patch("aituner.cli.call_llm_for_proposal", return_value=llm_payload): + with mock.patch("aituner.cli.run_trial", side_effect=fake_run_trial): + with contextlib.redirect_stdout(buffer): + exit_code = cli_main( + [ + "study", + "tune", + "--spec", + str(study_path), + "--store-root", + str(store_root), + "--skip-baseline", + "--max-trials", + "2", + "--proposal-policy", + "llm-first", + ] + ) + + self.assertEqual(exit_code, 0) + summary = json.loads(buffer.getvalue()) + executed = summary["executed_trials"] + self.assertEqual(executed[0]["proposal_origin"], "llm_out_of_set") + self.assertTrue(executed[0]["candidate_family_gap_path"]) + attribution_path = ( + store.study_root(study.study_id) + / "proposal_attributions" + / "proposal-0002.json" + ) + attribution = json.loads(attribution_path.read_text(encoding="utf-8")) + self.assertEqual(attribution["proposal_origin"], "llm_out_of_set") + self.assertEqual(attribution["harness_candidate_policy"], "advisory") + gap_path = Path(executed[0]["candidate_family_gap_path"]) + gap = json.loads(gap_path.read_text(encoding="utf-8")) + self.assertEqual(gap["gap_type"], "same_operator_new_step") + self.assertEqual(gap["review_status"], "pending") + self.assertEqual(gap["changed_knobs"], ["flag:max-num-seqs"]) + self.assertEqual(gap["proposal_patch"]["flag_patch"]["max-num-seqs"], 24) + self.assertEqual(gap["nearest_harness_candidates"][0]["candidate_id"], "cand-mns16") + + def test_cli_tune_strict_harness_policy_rejects_llm_out_of_set_proposal(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets(tmp_path) + payload = json.loads(study_path.read_text(encoding="utf-8")) + payload["llm"]["harness_candidate_policy"] = "strict" + payload["llm"]["endpoint"] = { + "provider": "custom", + "base_url": "http://llm.example/v1", + "wire_api": "chat.completions", + "model": "test-model", + "api_key_env": "OPENAI_API_KEY", + } + study_path.write_text(json.dumps(payload), encoding="utf-8") + study = load_study_spec(study_path) + store_root = tmp_path / "store" + store = StudyStore(store_root) + store.init_study(spec_path=study_path, study=study) + store.save_state( + StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_parallel_size=1, + best_request_rate=1.0, + best_request_rate_per_gpu=1.0, + next_trial_index=2, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + parallel_size=1, + best_request_rate=1.0, + best_request_rate_per_gpu=1.0, + config_patch={"env_patch": {}, "flag_patch": {"max-num-seqs": 8}}, + ) + ], + ) + ) + harness_context = { + "experiment_plan": { + "candidate_set": { + "candidate_set_hash": "candidate-set-test", + "eligible_candidates": [ + { + "candidate_id": "cand-mns16", + "effective_config_fingerprint": "not-the-llm-proposal", + "config_patch": { + "env_patch": {}, + "flag_patch": {"max-num-seqs": 16}, + }, + } + ], + } + } + } + llm_payload = json.dumps( + { + "observation": "Try an out-of-set candidate.", + "diagnosis": "strict mode should reject this.", + "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 24}}, + "expected_effects": ["should not run"], + "why_not_previous_failures": "", + "should_stop": False, + } + ) + stderr = io.StringIO() + with mock.patch("aituner.cli.build_harness_context", return_value=harness_context): + with mock.patch("aituner.llm.build_harness_context", return_value=harness_context): + with mock.patch("aituner.cli.call_llm_for_proposal", return_value=llm_payload): + with mock.patch("aituner.cli.run_trial") as run_trial_mock: + with contextlib.redirect_stderr(stderr): + exit_code = cli_main( + [ + "study", + "tune", + "--spec", + str(study_path), + "--store-root", + str(store_root), + "--skip-baseline", + "--max-trials", + "2", + "--proposal-policy", + "llm-first", + ] + ) + + self.assertEqual(exit_code, 2) + run_trial_mock.assert_not_called() + self.assertIn("llm.harness_candidate_policy=strict", stderr.getvalue()) + def test_cli_tune_evaluates_baseline_before_llm_proposal(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) diff --git a/tests/test_mechanism_planner.py b/tests/test_mechanism_planner.py new file mode 100644 index 0000000..4fee15c --- /dev/null +++ b/tests/test_mechanism_planner.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import unittest + +from aituner.engine_adapters.vllm import default_vllm_descriptors +from aituner.knob_descriptor import KnobConstraints, KnobDescriptor +from aituner.mechanism_planner import coordinate_line_search_candidates + + +class MechanismPlannerTests(unittest.TestCase): + def test_coordinate_search_uses_mechanism_not_knob_name(self) -> None: + vllm_descriptor = default_vllm_descriptors(tunable_flags=("max-num-seqs",))[0] + sglang_descriptor = KnobDescriptor( + name="max-running-requests", + location="flag", + value_type="int", + mechanisms=("admission_capacity", "kv_memory_pressure"), + search_geometry="positive_capacity", + operators=("coordinate_line_search",), + constraints=KnobConstraints(min_value=1, integer=True, multiple_of=8), + directional_effects={ + "increase": ("admission_capacity",), + "decrease": ("kv_memory_pressure",), + }, + ) + + vllm_candidates = coordinate_line_search_candidates( + current_config={"max-num-seqs": 8}, + descriptors=(vllm_descriptor,), + evidence_weights={"admission_capacity": 0.9}, + ) + sglang_candidates = coordinate_line_search_candidates( + current_config={"max-running-requests": 8}, + descriptors=(sglang_descriptor,), + evidence_weights={"admission_capacity": 0.9}, + ) + + self.assertEqual(vllm_candidates[0].patch, {"max-num-seqs": 16}) + self.assertEqual(sglang_candidates[0].patch, {"max-running-requests": 16}) + self.assertEqual(vllm_candidates[0].mechanism, "admission_capacity") + self.assertEqual(sglang_candidates[0].mechanism, "admission_capacity") + + def test_positive_capacity_can_decrease_for_memory_pressure(self) -> None: + descriptor = default_vllm_descriptors(tunable_flags=("max-num-seqs",))[0] + + candidates = coordinate_line_search_candidates( + current_config={"max-num-seqs": 64}, + descriptors=(descriptor,), + evidence_weights={"kv_memory_pressure": 0.8}, + ) + + self.assertEqual(candidates[0].direction, "decrease") + self.assertEqual(candidates[0].patch, {"max-num-seqs": 32}) + + def test_bounded_fraction_respects_constraints(self) -> None: + descriptor = default_vllm_descriptors(tunable_flags=("gpu-memory-utilization",))[0] + + candidates = coordinate_line_search_candidates( + current_config={"gpu-memory-utilization": 0.98}, + descriptors=(descriptor,), + evidence_weights={"kv_memory_capacity": 0.8}, + ) + + self.assertEqual(candidates[0].patch, {"gpu-memory-utilization": 1.0}) + + +if __name__ == "__main__": + unittest.main()