Add Stop-A SLO-boundary guard

When a truncated probe's measured pass-rate lands within trace.adaptive_stop.
boundary_delta of the SLO target, re-measure on the full window and use that
verdict. Offered-L-C-A convergence cannot see engine-state drift in the window
tail, so a near-knee truncated verdict is untrustworthy (validated: prefix 0.96
vs full 0.946 at threshold 0.08594). The guard fires only on feasibility-knee
probes, so non-boundary probes keep the Stop-A saving. Default delta=0.02.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-15 16:25:24 +08:00
parent 9f52812753
commit dfc823f972
3 changed files with 114 additions and 11 deletions

View File

@@ -335,6 +335,7 @@ class AdaptiveStopSpec:
stable_checks: int = 3
max_checks: int = 20
min_fraction: float = 0.1
boundary_delta: float = 0.02
@classmethod
def from_dict(cls, data: Any) -> "AdaptiveStopSpec":
@@ -357,9 +358,14 @@ class AdaptiveStopSpec:
min_fraction = _require_float(
m.get("min_fraction", 0.1), context="trace.adaptive_stop.min_fraction"
)
boundary_delta = _require_float(
m.get("boundary_delta", 0.02), context="trace.adaptive_stop.boundary_delta"
)
for name, value in (("tau", tau), ("tau_c", tau_c), ("min_fraction", min_fraction)):
if not 0.0 < value <= 1.0:
raise SpecError(f"trace.adaptive_stop.{name} must be in (0, 1].")
if not 0.0 <= boundary_delta < 1.0:
raise SpecError("trace.adaptive_stop.boundary_delta must be in [0, 1).")
if stable_checks <= 0 or max_checks <= 0:
raise SpecError(
"trace.adaptive_stop.stable_checks and max_checks must be > 0."
@@ -376,6 +382,7 @@ class AdaptiveStopSpec:
stable_checks=stable_checks,
max_checks=max_checks,
min_fraction=min_fraction,
boundary_delta=boundary_delta,
)

View File

@@ -249,6 +249,29 @@ def _adaptive_replay_set(
return replay, certificate
def _should_extend_on_boundary(
*,
pass_rate: float,
target_pass_rate: float,
certificate: dict[str, Any] | None,
truncated: bool,
boundary_delta: float,
) -> bool:
"""SLO-boundary guard: re-measure on the full window when a truncated probe
lands within +/- boundary_delta of the SLO target.
Offered-L-C-A convergence cannot see engine-state drift in the window's tail,
so a near-boundary truncated verdict is untrustworthy. This fires only on
probes sitting on the feasibility knee, so non-boundary probes keep the Stop-A
time saving.
"""
if certificate is None or not certificate.get("converged"):
return False
if not truncated or boundary_delta <= 0:
return False
return abs(float(pass_rate) - float(target_pass_rate)) <= float(boundary_delta)
def _best_feasible_probe_record(probe_history: list[dict[str, Any]]) -> dict[str, Any] | None:
feasible = [
item
@@ -563,18 +586,36 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
selected, study=study, window=window
)
restart_after_early_stop = study.trace.restart_engine_after_early_stop
outcomes, early_stopped, early_stop_reason = _replay_requests(
replay_set,
base_url=recipe.base_url,
timeout_s=recipe.request_timeout_s,
max_concurrency=study.trace.max_concurrency,
target_pass_rate=study.slo.target_pass_rate,
max_lag_s=study.trace.early_stop_max_lag_s,
max_elapsed_s=study.trace.early_stop_max_elapsed_s,
evaluate_outcome=lambda outcome: evaluate_request(outcome, study.slo),
drain_inflight_on_early_stop=not restart_after_early_stop,
)
def _run(reqs: list[TraceRequest]) -> tuple[list[RequestOutcome], bool, str]:
return _replay_requests(
reqs,
base_url=recipe.base_url,
timeout_s=recipe.request_timeout_s,
max_concurrency=study.trace.max_concurrency,
target_pass_rate=study.slo.target_pass_rate,
max_lag_s=study.trace.early_stop_max_lag_s,
max_elapsed_s=study.trace.early_stop_max_elapsed_s,
evaluate_outcome=lambda outcome: evaluate_request(outcome, study.slo),
drain_inflight_on_early_stop=not restart_after_early_stop,
)
outcomes, early_stopped, early_stop_reason = _run(replay_set)
evaluations, summary = summarize_evaluations(outcomes, study.slo)
if _should_extend_on_boundary(
pass_rate=summary["slo_pass_rate"],
target_pass_rate=study.slo.target_pass_rate,
certificate=adaptive_stop_certificate,
truncated=len(replay_set) < len(selected),
boundary_delta=study.trace.adaptive_stop.boundary_delta,
):
# On the feasibility knee the truncated verdict is untrustworthy;
# re-measure the full window and use that result.
replay_set = selected
outcomes, early_stopped, early_stop_reason = _run(selected)
evaluations, summary = summarize_evaluations(outcomes, study.slo)
if adaptive_stop_certificate is not None:
adaptive_stop_certificate["boundary_extended"] = True
probe_details = _probe_outcome_details(
threshold=threshold,
selected=replay_set,

View File

@@ -53,6 +53,7 @@ from aituner.store import StudyStore
from aituner.trace import load_trace_requests, summarize_window
from aituner.worker import (
_adaptive_replay_set,
_should_extend_on_boundary,
_best_feasible_probe_record,
_latency_summary,
_run_one_request,
@@ -476,6 +477,60 @@ class CoreFlowTests(unittest.TestCase):
self.assertIsNone(no_cert)
self.assertEqual(len(passthrough), len(requests))
def test_boundary_guard_extends_only_near_the_slo_knee(self) -> None:
converged = {"converged": True}
# Truncated, converged, pass-rate on the knee -> re-measure full.
self.assertTrue(
_should_extend_on_boundary(
pass_rate=0.961, target_pass_rate=0.95, certificate=converged,
truncated=True, boundary_delta=0.02,
)
)
self.assertTrue(
_should_extend_on_boundary(
pass_rate=0.946, target_pass_rate=0.95, certificate=converged,
truncated=True, boundary_delta=0.02,
)
)
# Clearly feasible / clearly infeasible -> trust the truncated verdict.
self.assertFalse(
_should_extend_on_boundary(
pass_rate=0.99, target_pass_rate=0.95, certificate=converged,
truncated=True, boundary_delta=0.02,
)
)
self.assertFalse(
_should_extend_on_boundary(
pass_rate=0.50, target_pass_rate=0.95, certificate=converged,
truncated=True, boundary_delta=0.02,
)
)
# Not truncated, not converged, guard disabled, or no certificate -> no extend.
self.assertFalse(
_should_extend_on_boundary(
pass_rate=0.95, target_pass_rate=0.95, certificate=converged,
truncated=False, boundary_delta=0.02,
)
)
self.assertFalse(
_should_extend_on_boundary(
pass_rate=0.95, target_pass_rate=0.95, certificate={"converged": False},
truncated=True, boundary_delta=0.02,
)
)
self.assertFalse(
_should_extend_on_boundary(
pass_rate=0.95, target_pass_rate=0.95, certificate=converged,
truncated=True, boundary_delta=0.0,
)
)
self.assertFalse(
_should_extend_on_boundary(
pass_rate=0.95, target_pass_rate=0.95, certificate=None,
truncated=True, boundary_delta=0.02,
)
)
def test_lca_similarity_matrix_separates_different_profiles(self) -> None:
window = WindowRecord(
window_id="base",