Integrate descriptor runtime candidates into harness

This commit is contained in:
2026-06-30 14:10:19 +08:00
parent adb5356c4b
commit 1b8f5a3af1
5 changed files with 400 additions and 31 deletions

View File

@@ -55,7 +55,7 @@ def default_vllm_descriptors(*, tunable_flags: Iterable[str]) -> tuple[KnobDescr
mechanisms=("kv_memory_capacity", "launch_feasibility"),
search_geometry="bounded_fraction",
operators=("coordinate_line_search", "frontier_delta_projection"),
constraints=KnobConstraints(min_value=0.0, max_value=1.0),
constraints=KnobConstraints(min_value=0.0, max_value=0.97),
directional_effects={
"increase": ("kv_memory_capacity",),
"decrease": ("launch_feasibility",),

View File

@@ -9,7 +9,12 @@ from .config_signature import (
effective_config_signature as _shared_effective_config_signature,
normalized_config_patch as _shared_normalized_config_patch,
)
from .engine_adapters.vllm import default_vllm_descriptors
from .lca import EPSILON, WorkloadProfile
from .mechanism_planner import (
CoordinateSearchPolicy,
coordinate_line_search_candidates,
)
from .spec import ConfigPatch, Proposal, StudySpec, StudyState, TrialSummary
@@ -1644,9 +1649,226 @@ def _runtime_candidate_actions(
effective_config_signature=signature,
)
)
production_covered_knobs = _candidate_action_runtime_knobs(actions)
actions.extend(
_descriptor_runtime_candidate_actions(
study,
window_summary,
anchor_flags,
runtime_base_patch,
top_bottleneck,
bottleneck_hypotheses,
recent_diagnostics,
production_covered_knobs=production_covered_knobs,
topology_settled=topology_settled,
seen_signatures=seen_signatures,
blocked_candidates=blocked_candidates,
)
)
return actions
def _descriptor_runtime_candidate_actions(
    study: StudySpec,
    window_summary: dict[str, Any],
    anchor_flags: dict[str, Any],
    runtime_base_patch: dict[str, Any],
    top_bottleneck: str,
    bottleneck_hypotheses: list[dict[str, Any]],
    recent_diagnostics: list[dict[str, Any]],
    *,
    production_covered_knobs: set[str],
    topology_settled: bool,
    seen_signatures: set[str],
    blocked_candidates: list[dict[str, Any]],
    max_actions: int = 8,
) -> list[dict[str, Any]]:
    """Build runtime candidate actions from the engine's knob descriptors.

    Descriptors whose knob has a non-``None`` value in ``anchor_flags`` are fed
    to ``coordinate_line_search_candidates``. Each returned candidate is then
    either recorded in ``blocked_candidates`` or turned into a runtime action.

    A candidate is blocked, in this order of precedence, when:

    * its knob is in ``production_covered_knobs``;
    * ``recent_diagnostics`` already contains two or more probes of its knob;
    * its effective config signature is already in ``seen_signatures``.

    Args:
        study: Study whose engine selects the descriptor set.
        window_summary: Workload window summary forwarded to the evidence
            weighting.
        anchor_flags: Current flag values used as the search origin.
        runtime_base_patch: Flag patch every candidate patch is layered on.
        top_bottleneck: Label of the leading bottleneck hypothesis.
        bottleneck_hypotheses: Hypotheses used for confidence and information
            gain.
        recent_diagnostics: Recent diagnostic entries inspected for prior
            probes of the same knob.
        production_covered_knobs: Knobs that already have a production
            candidate.
        topology_settled: Selects the higher score cap (0.52 instead of 0.45).
        seen_signatures: Mutated in place; receives the signature of every
            action that is actually returned.
        blocked_candidates: Mutated in place; receives one entry per blocked
            candidate.
        max_actions: Maximum number of actions returned (default 8). Values
            of zero or less yield no actions.

    Returns:
        At most ``max_actions`` runtime actions, in the order the planner
        ranked the underlying candidates.
    """
    descriptors = _engine_knob_descriptors(study)
    if not descriptors:
        return []
    evidence_weights = _mechanism_evidence_weights(
        study,
        window_summary,
        top_bottleneck,
        bottleneck_hypotheses,
    )
    candidates = coordinate_line_search_candidates(
        current_config=anchor_flags,
        descriptors=tuple(
            descriptor
            for descriptor in descriptors
            if anchor_flags.get(descriptor.name) is not None
        ),
        evidence_weights=evidence_weights,
        policy=CoordinateSearchPolicy(
            initial_relative_step=1.0,
            initial_fraction_step=0.1,
            step_multipliers=(1.0, 2.0, 4.0),
            min_score=0.25,
        ),
    )
    if not candidates:
        return []

    # Neither value depends on the candidate, so compute them once.
    # NOTE(review): assumes _information_gain is deterministic for fixed args.
    information_gain = _information_gain(bottleneck_hypotheses, "runtime")
    score_cap = 0.52 if topology_settled else 0.45

    actions: list[dict[str, Any]] = []
    for candidate in candidates:
        patch = {**runtime_base_patch, **candidate.patch}
        signature = _effective_config_signature(study, {"env_patch": {}, "flag_patch": patch})
        knob_family = f"descriptor:{candidate.mechanism}"

        if candidate.knob in production_covered_knobs:
            blocked_reason = "blocked_descriptor_knob_has_production_candidate"
        elif _recent_runtime_knob_probe_count(candidate.knob, recent_diagnostics) >= 2:
            blocked_reason = "blocked_descriptor_knob_already_covered_by_recent_probes"
        elif signature in seen_signatures:
            blocked_reason = "blocked_noop_or_repeat_effective_full_config"
        else:
            blocked_reason = None

        if blocked_reason is not None:
            blocked_candidates.append(
                _blocked_candidate(
                    action_id=candidate.action_id,
                    knob_family=knob_family,
                    config_patch={"env_patch": {}, "flag_patch": patch},
                    blocked_reason=blocked_reason,
                    effective_config_signature=signature,
                    score=candidate.score,
                    score_factors=candidate.score_factors,
                )
            )
            continue

        if len(actions) >= max_actions:
            # Over the cap: keep scanning so later blocked candidates are still
            # reported, but do not mark a signature as seen for an action that
            # is never returned to the caller.
            continue

        score = min(
            score_cap,
            0.22 + candidate.score * 0.65 + information_gain * 0.5,
        )
        actions.append(
            _runtime_action(
                action_id=candidate.action_id,
                knob_family=knob_family,
                score=score,
                score_factors={
                    **candidate.score_factors,
                    "planner_score": candidate.score,
                    "information_gain": round(information_gain, 4),
                    "regression_risk": round(
                        candidate.score_factors.get("direction_risk", 0.0)
                        + candidate.score_factors.get("step_risk", 0.0),
                        4,
                    ),
                },
                patch=patch,
                hypothesis=(
                    "Descriptor-driven coordinate line search over a serving mechanism "
                    f"({candidate.mechanism}) selected {candidate.knob} "
                    f"{candidate.direction} without using a knob-specific candidate rule."
                ),
                expected_effects=[
                    "test one declarative knob mechanism while preserving the anchor topology",
                    "treat a positive result as transferable mechanism evidence, not a fixed recipe",
                    "reject or shrink this direction if the measured per-GPU SLO objective does not improve",
                ],
            )
        )
        seen_signatures.add(signature)
    return actions
def _candidate_action_runtime_knobs(actions: list[dict[str, Any]]) -> set[str]:
knobs: set[str] = set()
for action in actions:
patch = action.get("config_patch")
if not isinstance(patch, dict):
continue
flag_patch = patch.get("flag_patch")
if not isinstance(flag_patch, dict):
continue
knobs.update(set(flag_patch) & _RUNTIME_KEYS)
return knobs
def _recent_runtime_knob_probe_count(
knob: str,
recent_diagnostics: list[dict[str, Any]],
) -> int:
count = 0
for item in recent_diagnostics:
patch = item.get("config_patch")
if not isinstance(patch, dict):
continue
flag_patch = patch.get("flag_patch")
if isinstance(flag_patch, dict) and knob in flag_patch:
count += 1
return count
def _engine_knob_descriptors(study: StudySpec):
    """Return the knob descriptors for the study's engine.

    Only the engine named ``"vllm"`` (compared case-insensitively) has
    descriptors here; every other engine name, including a missing one,
    yields an empty tuple.
    """
    if str(study.engine.engine_name or "").lower() != "vllm":
        return ()
    return default_vllm_descriptors(tunable_flags=study.engine.tunable_flags)
def _mechanism_evidence_weights(
    study: StudySpec,
    window_summary: dict[str, Any],
    top_bottleneck: str,
    bottleneck_hypotheses: list[dict[str, Any]],
) -> dict[str, float]:
    """Map mechanism names to evidence weights for the current bottleneck.

    For a recognised bottleneck each mechanism weight is a fixed coefficient
    multiplied by the hypothesis confidence (0.35 when the reported confidence
    is not positive). An unrecognised bottleneck gets two fixed, unscaled
    weights. Two fixed floors are applied afterwards: ``prefill_scheduling``
    when the prompt length regime is not ``"short_or_moderate"`` and
    ``decode_batching`` when the trace request mode is ``"decode_only"``.
    Whenever a mechanism is assigned more than once, the largest value rounded
    to four decimals is kept.
    """
    # Coefficients scaled by confidence, keyed by bottleneck label.
    confidence_scaled = {
        "ttft_prefill": (
            ("prefill_scheduling", 0.78),
            ("prefill_tail_latency", 0.58),
            ("admission_capacity", 0.64),
            ("kv_memory_capacity", 0.58),
        ),
        "admission_or_queueing": (
            ("admission_capacity", 0.82),
            ("kv_memory_capacity", 0.62),
            ("decode_batching", 0.48),
            ("prefill_scheduling", 0.36),
        ),
        "decode_tpot": (
            ("kv_memory_pressure", 0.74),
            ("kv_memory_capacity", 0.56),
            ("decode_batching", 0.48),
        ),
        "launch_or_memory": (
            ("launch_feasibility", 0.82),
            ("kv_memory_pressure", 0.56),
        ),
    }

    confidence = _hypothesis_confidence(bottleneck_hypotheses, top_bottleneck)
    if confidence <= 0:
        confidence = 0.35

    weights: dict[str, float] = {}

    def raise_weight(mechanism: str, weight: float) -> None:
        previous = weights.get(mechanism, 0.0)
        weights[mechanism] = max(previous, round(weight, 4))

    coefficients = confidence_scaled.get(top_bottleneck)
    if coefficients is None:
        # Unknown bottleneck: low fixed priors, independent of confidence.
        raise_weight("admission_capacity", 0.32)
        raise_weight("kv_memory_capacity", 0.28)
    else:
        for mechanism, coefficient in coefficients:
            raise_weight(mechanism, coefficient * confidence)

    prompt_p95 = _as_float(window_summary.get("prompt_tokens_p95"))
    tail_ratio = _as_float(window_summary.get("prompt_tail_ratio_p95_p50"))
    regime = _length_regime(prompt_p95, tail_ratio)
    if regime != "short_or_moderate":
        raise_weight("prefill_scheduling", 0.36)
    if study.trace.request_mode == "decode_only":
        raise_weight("decode_batching", 0.36)
    return weights
def _frontier_delta_projection_actions(
study: StudySpec,
state: StudyState,

View File

@@ -11,6 +11,7 @@ from .knob_descriptor import KnobDescriptor
class CoordinateSearchPolicy:
initial_relative_step: float = 1.0
initial_fraction_step: float = 0.05
step_multipliers: tuple[float, ...] = (1.0,)
grow_factor: float = 1.5
shrink_factor: float = 0.5
min_score: float = 0.0
@@ -60,36 +61,44 @@ def coordinate_line_search_candidates(
continue
state = states.get((descriptor.name, direction))
current = descriptor.current_value(current_config)
target = _propose_value(
descriptor=descriptor,
current=current,
direction=direction,
state=state,
policy=policy,
multipliers = (
(1.0,) if descriptor.search_geometry == "toggle" else policy.step_multipliers
)
if target is None or target == current:
continue
risk = _direction_risk(descriptor, direction, evidence_weights)
score = max(0.0, evidence - risk)
candidates.append(
MechanismCandidate(
action_id=(
f"coordinate_line_search:{descriptor.search_geometry}:"
f"{descriptor.name}:{direction}:{_stable_token(current)}->{_stable_token(target)}"
),
knob=descriptor.name,
mechanism=mechanism,
operator="coordinate_line_search",
for multiplier in multipliers:
target = _propose_value(
descriptor=descriptor,
current=current,
direction=direction,
patch={descriptor.name: target},
score=round(score, 4),
score_factors={
"mechanism_evidence": round(evidence, 4),
"direction_risk": round(risk, 4),
},
evidence_refs=(mechanism,),
state=state,
policy=policy,
step_multiplier=multiplier,
)
if target is None or target == current:
continue
risk = _direction_risk(descriptor, direction, evidence_weights)
step_risk = _step_risk(multiplier)
score = max(0.0, evidence - risk - step_risk)
candidates.append(
MechanismCandidate(
action_id=(
f"coordinate_line_search:{descriptor.search_geometry}:"
f"{descriptor.name}:{direction}:step={_stable_token(multiplier)}:"
f"{_stable_token(current)}->{_stable_token(target)}"
),
knob=descriptor.name,
mechanism=mechanism,
operator="coordinate_line_search",
direction=direction,
patch={descriptor.name: target},
score=round(score, 4),
score_factors={
"mechanism_evidence": round(evidence, 4),
"direction_risk": round(risk, 4),
"step_risk": round(step_risk, 4),
},
evidence_refs=(mechanism,),
)
)
)
candidates.sort(key=lambda item: (item.score, item.action_id), reverse=True)
return tuple(candidates)
@@ -122,6 +131,10 @@ def _direction_risk(
return min(0.5, 0.2 * max(float(evidence_weights.get(item, 0.0)) for item in risks))
def _step_risk(multiplier: float) -> float:
return min(0.2, max(0.0, float(multiplier) - 1.0) * 0.04)
def _propose_value(
*,
descriptor: KnobDescriptor,
@@ -129,6 +142,7 @@ def _propose_value(
direction: str,
state: CoordinateOperatorState | None,
policy: CoordinateSearchPolicy,
step_multiplier: float,
) -> Any | None:
if descriptor.search_geometry == "toggle":
if not isinstance(current, bool):
@@ -143,6 +157,7 @@ def _propose_value(
if state is not None and state.trust_radius is not None
else policy.initial_fraction_step
)
radius *= max(float(step_multiplier), 0.0)
if direction == "decrease":
target = value - radius
else:
@@ -157,6 +172,7 @@ def _propose_value(
if state is not None and state.trust_radius is not None
else 1.0
)
radius *= max(float(step_multiplier), 0.0)
return _canonicalize_value(
descriptor,
value - radius if direction == "decrease" else value + radius,
@@ -171,6 +187,7 @@ def _propose_value(
if state is not None and state.trust_radius is not None
else policy.initial_relative_step
)
radius *= max(float(step_multiplier), 0.0)
factor = max(1.0 + radius, 1.01)
target = value / factor if direction == "decrease" else value * factor
return _canonicalize_value(descriptor, target)

View File

@@ -2594,6 +2594,119 @@ class CoreFlowTests(unittest.TestCase):
)
self.assertNotIn("tensor-parallel-size", proposal.config_patch.flag_patch)
def test_descriptor_candidates_expose_bad_runtime_recovery_without_preempting_topology(
    self,
) -> None:
    """Descriptor candidates are listed while topology remains the next action.

    The study starts from ``gpu-memory-utilization`` 0.5 and ``max-num-seqs``
    8 with one completed trial whose second probe failed on TTFT. The plan's
    next action must belong to the topology family, and the descriptor
    candidates must include a patch with ``max-num-seqs`` 24 and one with
    ``gpu-memory-utilization`` 0.9.
    """
    base_flags = {
        "host": "127.0.0.1",
        "port": 8000,
        "tensor-parallel-size": 2,
        "data-parallel-size": 1,
        "gpu-memory-utilization": 0.5,
        "max-num-seqs": 8,
    }
    tunable_flags = [
        "tensor-parallel-size",
        "data-parallel-size",
        "gpu-memory-utilization",
        "max-num-seqs",
    ]
    topology_constraints = {
        "allowed_tensor_parallel_sizes": [2, 4, 8],
        "allowed_data_parallel_sizes": [1],
        "allowed_tp_dp_products": [2, 4, 8],
    }
    feasible_probe = {
        "threshold": 0.05,
        "feasible": True,
        "payload": {
            "request_rate": 3.4667,
            "pass_rate": 0.9663,
            "latency_summary": {"failed_reason_counts": {}},
        },
    }
    infeasible_probe = {
        "threshold": 0.08,
        "feasible": False,
        "payload": {
            "request_rate": 4.0,
            "pass_rate": 0.5,
            "early_stop_reason": "slo_pass_rate_unrecoverable",
            "latency_summary": {
                "failed_reason_counts": {"ttft_ms>4000.0": 120}
            },
        },
    }
    trial_result = {
        "status": "completed",
        "best_sampling_u": 0.05,
        "best_request_rate": 3.4667,
        "best_pass_rate": 0.9663,
        "probes": [feasible_probe, infeasible_probe],
    }

    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        study_path = _write_study_assets(
            root,
            slo_overrides={
                "ttft_rule": {"kind": "fixed_ms", "threshold_ms": 4000},
                "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 50},
            },
            engine_overrides={
                "base_flags": base_flags,
                "tunable_flags": tunable_flags,
                "topology_constraints": topology_constraints,
            },
        )
        study = load_study_spec(study_path)

        result_path = root / "trial-0001.json"
        result_path.write_text(json.dumps(trial_result), encoding="utf-8")

        completed_trial = TrialSummary(
            trial_id="trial-0001",
            status="completed",
            parallel_size=2,
            best_request_rate=3.4667,
            best_request_rate_per_gpu=1.73335,
            result_path=str(result_path),
            config_patch={"env_patch": {}, "flag_patch": {}},
        )
        state = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0001",
            best_request_rate=3.4667,
            best_request_rate_per_gpu=1.73335,
            trials=[completed_trial],
        )

        context = build_harness_context(
            study=study,
            window_summary={
                "prompt_tokens_p95": 6500,
                "prompt_tail_ratio_p95_p50": 3.0,
            },
            state=state,
        )

    plan = context["experiment_plan"]
    self.assertEqual(plan["next_action"]["knob_family"], "topology")

    descriptor_patches = []
    for action in plan["candidate_actions"]:
        if str(action["knob_family"]).startswith("descriptor:"):
            descriptor_patches.append(action["config_patch"]["flag_patch"])

    self.assertIn(
        24,
        [flags.get("max-num-seqs") for flags in descriptor_patches],
    )
    self.assertIn(
        0.9,
        [flags.get("gpu-memory-utilization") for flags in descriptor_patches],
    )
def test_harness_stops_gpu_mem_util_climb_after_tied_same_topology_probe(self) -> None:
"""A same-topology gpu-memory-utilization probe must improve per-GPU rate before
the hill-climb continues; launch success alone is not evidence to keep climbing."""

View File

@@ -4,7 +4,10 @@ import unittest
from aituner.engine_adapters.vllm import default_vllm_descriptors
from aituner.knob_descriptor import KnobConstraints, KnobDescriptor
from aituner.mechanism_planner import coordinate_line_search_candidates
from aituner.mechanism_planner import (
CoordinateSearchPolicy,
coordinate_line_search_candidates,
)
class MechanismPlannerTests(unittest.TestCase):
@@ -56,12 +59,26 @@ class MechanismPlannerTests(unittest.TestCase):
descriptor = default_vllm_descriptors(tunable_flags=("gpu-memory-utilization",))[0]
candidates = coordinate_line_search_candidates(
current_config={"gpu-memory-utilization": 0.98},
current_config={"gpu-memory-utilization": 0.96},
descriptors=(descriptor,),
evidence_weights={"kv_memory_capacity": 0.8},
)
self.assertEqual(candidates[0].patch, {"gpu-memory-utilization": 1.0})
self.assertEqual(candidates[0].patch, {"gpu-memory-utilization": 0.97})
def test_coordinate_search_can_emit_larger_same_operator_steps(self) -> None:
    """A policy with step multipliers 1.0 and 2.0 proposes both step sizes.

    Starting from ``max-num-seqs`` 8, the candidate patches must include both
    16 and 24.
    """
    seqs_descriptor = default_vllm_descriptors(tunable_flags=("max-num-seqs",))[0]
    two_step_policy = CoordinateSearchPolicy(step_multipliers=(1.0, 2.0))

    proposed = [
        candidate.patch
        for candidate in coordinate_line_search_candidates(
            current_config={"max-num-seqs": 8},
            descriptors=(seqs_descriptor,),
            evidence_weights={"admission_capacity": 0.9},
            policy=two_step_policy,
        )
    ]

    for expected_value in (16, 24):
        self.assertIn({"max-num-seqs": expected_value}, proposed)
if __name__ == "__main__":