Add frontier delta projection harness candidates

This commit is contained in:
2026-06-29 16:15:06 +08:00
parent 6c84dc91d7
commit 8dd9ada194
2 changed files with 553 additions and 0 deletions

View File

@@ -42,6 +42,9 @@ _GMU_SAFE_CEILING = 0.97
_PREFILL_QUANTUM_HEAD_OF_LINE_RATIO = 1.0
_PREFILL_QUANTUM_FRAGMENTATION_RATIO = 0.5
_ADMISSION_PRESSURE_STEP_UP = 1.5
# Frontier-delta projection thresholds, all expressed in request rate per GPU.
# A gain only counts when it reaches max(MIN_ABS_GAIN, reference_rate * MIN_REL_GAIN):
# _positive_runtime_delta_records applies this against the baseline rate, and
# _frontier_delta_projection_actions applies it against the target anchor rate.
_FRONTIER_DELTA_MIN_ABS_GAIN = 0.02
_FRONTIER_DELTA_MIN_REL_GAIN = 0.03
# A projection is blocked when (target rate + measured gain) falls below this
# fraction of the best completed per-GPU rate.
_FRONTIER_DELTA_PROJECTED_INCUMBENT_FLOOR = 0.98
def build_harness_context(
@@ -1115,6 +1118,16 @@ def _candidate_actions(
blocked_candidates,
)
)
candidates.extend(
_frontier_delta_projection_actions(
study,
trial_profiles,
top_bottleneck,
bottleneck_hypotheses,
tested_signatures,
blocked_candidates,
)
)
candidates.extend(
_runtime_candidate_actions(
study,
@@ -1626,6 +1639,376 @@ def _runtime_candidate_actions(
return actions
def _frontier_delta_projection_actions(
    study: StudySpec,
    trial_profiles: list[dict[str, Any]],
    top_bottleneck: str,
    bottleneck_hypotheses: list[dict[str, Any]],
    tested_signatures: set[str],
    blocked_candidates: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Project measured runtime-only gains onto other Pareto frontier anchors.

    For every positive runtime delta returned by
    ``_positive_runtime_delta_records`` and every frontier anchor whose
    topology differs from the delta's source, build a candidate that keeps the
    anchor's topology/runtime flags and overlays the measured delta.

    Args:
        study: Study specification; ``study.engine.tunable_flags`` limits which
            runtime keys may be patched.
        trial_profiles: Profiles of previously run trials.
        top_bottleneck: Active bottleneck label, recorded in the score factors.
        bottleneck_hypotheses: Hypotheses forwarded to the scoring helper.
        tested_signatures: Effective-config signatures that were already tried.
            The set itself is not mutated; a local copy tracks new candidates.
        blocked_candidates: Output list; every rejected projection is appended
            here together with its blocking reason.

    Returns:
        At most eight runtime actions, ordered by descending score.
    """
    tunable_flags = set(study.engine.tunable_flags)
    if not (tunable_flags & _RUNTIME_KEYS):
        return []
    anchors = _pareto_frontier_anchor_profiles(study, trial_profiles)
    if len(anchors) < 2:
        # A projection needs a source topology and at least one other anchor.
        return []
    deltas = _positive_runtime_delta_records(study, trial_profiles)
    if not deltas:
        return []

    knob_family = "frontier-delta-projection"

    def _block(
        action_id: str,
        flag_patch: dict[str, Any],
        signature: str,
        reason: str,
        **extra: Any,
    ) -> None:
        # Single place that records a rejected projection.
        blocked_candidates.append(
            _blocked_candidate(
                action_id=action_id,
                knob_family=knob_family,
                config_patch={"env_patch": {}, "flag_patch": flag_patch},
                blocked_reason=reason,
                effective_config_signature=signature,
                **extra,
            )
        )

    actions: list[dict[str, Any]] = []
    # Copy so that candidates emitted here also de-duplicate each other
    # without mutating the caller's set.
    seen_signatures = set(tested_signatures)
    incumbent_rate = max(
        (
            _profile_request_rate_per_gpu(profile)
            for profile in trial_profiles
            if profile.get("status") == "completed"
        ),
        default=0.0,
    )
    for delta in deltas:
        source_topology_key = delta["source_topology_key"]
        source_parallel = int(delta["source_parallel_size"])
        source_rate = _as_float(delta["source_rate_per_gpu"])
        source_gain = _as_float(delta["gain"])
        source_baseline_rate = _as_float(delta["baseline_rate_per_gpu"])
        for target in anchors:
            target_flags = _effective_flags_for_item(study, target)
            if _topology_key(target_flags) == source_topology_key:
                # The delta was measured on this topology; nothing to project.
                continue
            target_parallel = _topology_parallel_size(target_flags)
            target_rate = _profile_request_rate_per_gpu(target)
            # Keep only tunable keys whose value would actually change on the target.
            runtime_delta = {
                key: value
                for key, value in delta["runtime_delta"].items()
                if key in tunable_flags and target_flags.get(key) != value
            }
            action_id = (
                f"project_runtime_delta_from_{delta['source_trial_id']}"
                f"_to_{target.get('trial_id')}"
            )
            preserved_patch = {
                **_preserve_topology_patch(study, target_flags),
                **_preserve_runtime_patch(study, target_flags),
            }
            if not runtime_delta:
                # The target already runs with the measured values.
                _block(
                    action_id,
                    preserved_patch,
                    _effective_config_signature(
                        study,
                        {"env_patch": {}, "flag_patch": preserved_patch},
                    ),
                    "blocked_noop_or_repeat_effective_full_config",
                )
                continue
            patch = {**preserved_patch, **runtime_delta}
            signature = _effective_config_signature(
                study,
                {"env_patch": {}, "flag_patch": patch},
            )
            required_margin = max(
                _FRONTIER_DELTA_MIN_ABS_GAIN,
                target_rate * _FRONTIER_DELTA_MIN_REL_GAIN,
            )
            if source_rate + EPSILON < target_rate + required_margin:
                _block(
                    action_id,
                    patch,
                    signature,
                    "blocked_delta_source_not_above_target_anchor",
                )
                continue
            if (
                incumbent_rate > 0
                and target_rate + source_gain + EPSILON
                < incumbent_rate * _FRONTIER_DELTA_PROJECTED_INCUMBENT_FLOOR
            ):
                _block(
                    action_id,
                    patch,
                    signature,
                    "blocked_projected_gain_below_incumbent_floor",
                )
                continue
            if signature in seen_signatures:
                _block(
                    action_id,
                    patch,
                    signature,
                    "blocked_noop_or_repeat_effective_full_config",
                )
                continue
            score, factors = _score_frontier_delta_projection(
                top_bottleneck=top_bottleneck,
                source_gain=source_gain,
                source_baseline_rate=source_baseline_rate,
                source_rate=source_rate,
                source_parallel=source_parallel,
                target_rate=target_rate,
                target_parallel=target_parallel,
                bottleneck_hypotheses=bottleneck_hypotheses,
            )
            if score <= 0:
                _block(
                    action_id,
                    patch,
                    signature,
                    "blocked_non_positive_candidate_score",
                    score=round(score, 4),
                    score_factors=factors,
                )
                continue
            actions.append(
                _runtime_action(
                    action_id=action_id,
                    knob_family=knob_family,
                    score=score,
                    score_factors=factors,
                    patch=patch,
                    hypothesis=(
                        "A measured runtime-only delta improved one topology anchor; "
                        "apply the same delta to another Pareto anchor to test whether "
                        "the mechanism transfers without inheriting the source topology."
                    ),
                    expected_effects=[
                        "preserve the target anchor topology and only apply measured runtime changes",
                        "separate runtime-mechanism gain from topology-resource tradeoffs",
                        "reject if the projected delta fails to improve request_rate_per_gpu",
                    ],
                )
            )
            seen_signatures.add(signature)
    actions.sort(key=lambda item: _as_float(item.get("score")), reverse=True)
    # Cap the number of projections handed back to the planner.
    return actions[:8]
def _pareto_frontier_anchor_profiles(
    study: StudySpec,
    trial_profiles: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Return the best completed profile per topology that is not dominated.

    First the best completed profile (positive per-GPU rate) is chosen for each
    topology key, comparing per-GPU rate and then the raw
    ``performance.best_request_rate``. An anchor is then dropped when another
    anchor uses no more parallel size and has no lower per-GPU rate (within
    ``EPSILON``), while being strictly better on at least one of the two.

    Args:
        study: Study specification used to resolve each profile's effective flags.
        trial_profiles: Profiles of previously run trials.

    Returns:
        Surviving anchors sorted by ascending parallel size, then descending
        per-GPU rate, then trial id.
    """
    best_by_topology: dict[tuple[int, int, int, bool], dict[str, Any]] = {}
    for profile in trial_profiles:
        if profile.get("status") != "completed":
            continue
        rate = _profile_request_rate_per_gpu(profile)
        if rate <= 0:
            continue
        topology_key = _topology_key(_effective_flags_for_item(study, profile))
        incumbent = best_by_topology.get(topology_key)
        if incumbent is None:
            best_by_topology[topology_key] = profile
            continue
        challenger_rank = (
            rate,
            _as_float(profile.get("performance", {}).get("best_request_rate")),
        )
        incumbent_rank = (
            _profile_request_rate_per_gpu(incumbent),
            _as_float(incumbent.get("performance", {}).get("best_request_rate")),
        )
        if challenger_rank > incumbent_rank:
            best_by_topology[topology_key] = profile

    # Resolve rate and parallel size once per anchor instead of once per
    # pairwise comparison and again in the sort key.
    measured = [
        (
            profile,
            _profile_request_rate_per_gpu(profile),
            _topology_parallel_size(_effective_flags_for_item(study, profile)),
        )
        for profile in best_by_topology.values()
    ]

    def _is_dominated(index: int) -> bool:
        _, rate, parallel = measured[index]
        for other_index, (_, other_rate, other_parallel) in enumerate(measured):
            if other_index == index:
                continue
            if (
                other_parallel <= parallel
                and other_rate + EPSILON >= rate
                and (other_parallel < parallel or other_rate > rate + EPSILON)
            ):
                return True
        return False

    frontier = [
        entry for index, entry in enumerate(measured) if not _is_dominated(index)
    ]
    frontier.sort(
        key=lambda entry: (
            entry[2],
            -entry[1],
            str(entry[0].get("trial_id") or ""),
        )
    )
    return [profile for profile, _, _ in frontier]
def _positive_runtime_delta_records(
    study: StudySpec,
    trial_profiles: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Collect runtime-flag changes that measurably improved a topology.

    A completed trial qualifies as a source when its own ``flag_patch`` touches
    at least one tunable runtime key. It is compared with every other completed
    trial on the same topology; among the baselines it beats by at least
    ``max(_FRONTIER_DELTA_MIN_ABS_GAIN, baseline_rate * _FRONTIER_DELTA_MIN_REL_GAIN)``,
    the one with the highest per-GPU rate is kept (first one wins ties).

    Args:
        study: Study specification providing the tunable flags.
        trial_profiles: Profiles of previously run trials.

    Returns:
        Up to eight records, ordered by descending gain and then descending
        source rate.
    """
    usable = [
        item
        for item in trial_profiles
        if item.get("status") == "completed"
        and _profile_request_rate_per_gpu(item) > 0
    ]
    found: list[dict[str, Any]] = []
    for source in usable:
        config_patch = source.get("config_patch")
        if not isinstance(config_patch, dict):
            continue
        flag_patch = config_patch.get("flag_patch")
        if not isinstance(flag_patch, dict):
            continue
        if not (set(flag_patch) & _RUNTIME_KEYS & set(study.engine.tunable_flags)):
            # This trial did not change any tunable runtime flag itself.
            continue

        source_flags = _effective_flags_for_item(study, source)
        topology = _topology_key(source_flags)
        source_rate = _profile_request_rate_per_gpu(source)

        chosen: dict[str, Any] | None = None
        chosen_delta: dict[str, Any] = {}
        chosen_gain = 0.0
        for other in usable:
            if other is source:
                continue
            other_flags = _effective_flags_for_item(study, other)
            if _topology_key(other_flags) != topology:
                continue
            other_rate = _profile_request_rate_per_gpu(other)
            delta = _runtime_delta_patch(
                study,
                baseline_flags=other_flags,
                source_flags=source_flags,
                source_flag_patch=flag_patch,
            )
            if not delta:
                continue
            improvement = source_rate - other_rate
            threshold = max(
                _FRONTIER_DELTA_MIN_ABS_GAIN,
                other_rate * _FRONTIER_DELTA_MIN_REL_GAIN,
            )
            if improvement + EPSILON < threshold:
                continue
            # Prefer the strongest baseline that the source still beats.
            if chosen is not None and not (
                other_rate > _profile_request_rate_per_gpu(chosen)
            ):
                continue
            chosen, chosen_delta, chosen_gain = other, delta, improvement

        if chosen is None:
            continue
        found.append(
            {
                "source_trial_id": source.get("trial_id"),
                "baseline_trial_id": chosen.get("trial_id"),
                "source_topology_key": topology,
                "source_parallel_size": _topology_parallel_size(source_flags),
                "runtime_delta": chosen_delta,
                "gain": chosen_gain,
                "source_rate_per_gpu": source_rate,
                "baseline_rate_per_gpu": _profile_request_rate_per_gpu(chosen),
            }
        )

    def _rank(record: dict[str, Any]) -> tuple[float, float]:
        return (
            _as_float(record.get("gain")),
            _as_float(record.get("source_rate_per_gpu")),
        )

    found.sort(key=_rank, reverse=True)
    return found[:8]
def _runtime_delta_patch(
    study: StudySpec,
    *,
    baseline_flags: dict[str, Any],
    source_flags: dict[str, Any],
    source_flag_patch: dict[str, Any],
) -> dict[str, Any]:
    """Return the runtime flags on which the source differs from the baseline.

    Only keys that are runtime keys, tunable in the study, and present in the
    source's own flag patch are considered. Values come from the source's
    effective flags; keys are visited in sorted order.
    """
    eligible_keys = sorted(
        _RUNTIME_KEYS & set(study.engine.tunable_flags) & set(source_flag_patch)
    )
    return {
        name: source_flags.get(name)
        for name in eligible_keys
        if baseline_flags.get(name) != source_flags.get(name)
    }
def _score_frontier_delta_projection(
    *,
    top_bottleneck: str,
    source_gain: float,
    source_baseline_rate: float,
    source_rate: float,
    source_parallel: int,
    target_rate: float,
    target_parallel: int,
    bottleneck_hypotheses: list[dict[str, Any]],
) -> tuple[float, dict[str, Any]]:
    """Score the projection of a measured runtime delta onto a target anchor.

    The score is a 0.42 base plus four terms: evidence from the measured
    relative gain (capped at 0.24), a resource-transfer term that favours
    targets with a smaller parallel size, an anchor-quality term (capped at
    0.12) and the runtime information gain.

    Returns:
        ``(score, factors)``, where ``factors`` holds the rounded inputs and
        terms. ``regression_risk`` is reported as a constant 0.05 and is not
        subtracted from the score here.
    """
    gain_ratio = source_gain / max(source_baseline_rate, EPSILON)
    evidence = min(0.24, max(0.0, gain_ratio) * 0.8)

    # Reward moving to fewer GPUs, stay mildly positive on equal size,
    # penalise moving to a larger parallel size.
    if target_parallel > source_parallel:
        transfer = -0.04
    elif target_parallel < source_parallel:
        transfer = 0.16
    else:
        transfer = 0.06

    rate_ratio = max(0.0, target_rate) / max(source_rate, target_rate, EPSILON)
    quality = min(0.12, rate_ratio * 0.12)
    info_gain = _information_gain(bottleneck_hypotheses, "runtime")

    total = 0.42 + evidence + transfer + quality + info_gain
    breakdown = {
        "active_bottleneck": top_bottleneck,
        "measured_delta_gain_per_gpu": round(source_gain, 4),
        "measured_delta_relative_gain": round(gain_ratio, 4),
        "measured_delta_evidence": round(evidence, 4),
        "source_parallel_size": source_parallel,
        "target_parallel_size": target_parallel,
        "resource_transfer": round(transfer, 4),
        "target_anchor_rate_per_gpu": round(target_rate, 4),
        "source_delta_rate_per_gpu": round(source_rate, 4),
        "anchor_quality": round(quality, 4),
        "information_gain": round(info_gain, 4),
        "regression_risk": 0.05,
    }
    return total, breakdown
def _next_gpu_memory_utilization_target(
study: StudySpec,
anchor_flags: dict[str, Any],
@@ -2253,6 +2636,25 @@ def _normalized_topology_flags(flags: dict[str, Any]) -> dict[str, Any]:
}
def _topology_key(flags: dict[str, Any]) -> tuple[int, int, int, bool]:
    """Return a hashable key identifying the topology described by ``flags``.

    The key is (tensor-parallel size, data-parallel size, expert-parallel size,
    expert-parallel enabled), read from the normalized topology flags.
    """
    topology = _normalized_topology_flags(flags)
    tensor_parallel = int(topology["tensor-parallel-size"])
    data_parallel = int(topology["data-parallel-size"])
    expert_parallel = int(topology["expert-parallel-size"])
    expert_enabled = bool(topology["enable-expert-parallel"])
    return (tensor_parallel, data_parallel, expert_parallel, expert_enabled)
def _topology_parallel_size(flags: dict[str, Any]) -> int:
    """Return tensor-parallel size times data-parallel size, never below 1."""
    topology = _normalized_topology_flags(flags)
    tensor_parallel = int(topology["tensor-parallel-size"])
    data_parallel = int(topology["data-parallel-size"])
    return max(1, tensor_parallel * data_parallel)
def _effective_gpu_count(study: StudySpec) -> int:
visible = str(study.engine.base_envs.get("CUDA_VISIBLE_DEVICES") or "").strip()
if not visible:

View File

@@ -2720,6 +2720,157 @@ class CoreFlowTests(unittest.TestCase):
],
)
def test_harness_projects_measured_runtime_delta_to_other_frontier_anchor(self) -> None:
    """A runtime gain measured on one topology is projected onto another anchor.

    Trial 5 (TP=4, gpu-memory-utilization 0.9) beats the other TP=4 trials, so
    the next action must apply that runtime delta to a different Pareto anchor
    before the harness keeps micro-tuning the source topology.
    """
    slo_overrides = {
        "ttft_rule": {"kind": "fixed_ms", "threshold_ms": 4000},
        "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 50},
    }
    engine_overrides = {
        "base_flags": {
            "host": "127.0.0.1",
            "port": 8000,
            "tensor-parallel-size": 2,
            "data-parallel-size": 1,
            "gpu-memory-utilization": 0.5,
            "max-num-seqs": 8,
        },
        "tunable_flags": [
            "tensor-parallel-size",
            "data-parallel-size",
            "gpu-memory-utilization",
            "max-num-seqs",
        ],
        "topology_constraints": {
            "allowed_tensor_parallel_sizes": [2, 4, 8],
            "allowed_data_parallel_sizes": [1],
            "allowed_tp_dp_products": [2, 4, 8],
        },
    }
    # Last feasible probe followed by the first probe that breaks the TTFT SLO.
    passing_probe = {
        "threshold": 0.1,
        "feasible": True,
        "payload": {
            "request_count": 300,
            "pass_rate": 0.96,
            "request_rate": 8.0,
            "latency_summary": {"failed_reason_counts": {}},
        },
    }
    failing_probe = {
        "threshold": 0.12,
        "feasible": False,
        "payload": {
            "request_count": 300,
            "pass_rate": 0.6,
            "request_rate": 9.0,
            "early_stop_reason": "slo_pass_rate_unrecoverable",
            "latency_summary": {
                "failed_reason_counts": {"ttft_ms>4000.0": 100}
            },
        },
    }
    latest_result = {
        "status": "completed",
        "best_sampling_u": 0.1,
        "best_request_rate": 8.0,
        "best_pass_rate": 0.96,
        "probes": [passing_probe, failing_probe],
    }

    def completed_trial(
        trial_id: str,
        parallel_size: int,
        rate: float,
        rate_per_gpu: float,
        flag_patch: dict,
        **extra,
    ):
        return TrialSummary(
            trial_id=trial_id,
            status="completed",
            parallel_size=parallel_size,
            best_request_rate=rate,
            best_request_rate_per_gpu=rate_per_gpu,
            config_patch={"env_patch": {}, "flag_patch": flag_patch},
            **extra,
        )

    with tempfile.TemporaryDirectory() as tmp:
        tmp_path = Path(tmp)
        study_path = _write_study_assets(
            tmp_path,
            slo_overrides=slo_overrides,
            engine_overrides=engine_overrides,
        )
        study = load_study_spec(study_path)
        latest_result_path = tmp_path / "trial-0005.json"
        latest_result_path.write_text(json.dumps(latest_result), encoding="utf-8")

        trials = [
            completed_trial("trial-0001", 2, 3.466, 1.733, {}),
            completed_trial(
                "trial-0002", 4, 6.95, 1.7375, {"tensor-parallel-size": 4}
            ),
            completed_trial(
                "trial-0003", 8, 8.0, 1.0, {"tensor-parallel-size": 8}
            ),
            completed_trial(
                "trial-0004",
                4,
                6.95,
                1.7375,
                {"tensor-parallel-size": 4, "max-num-seqs": 16},
            ),
            completed_trial(
                "trial-0005",
                4,
                8.0,
                2.0,
                {"tensor-parallel-size": 4, "gpu-memory-utilization": 0.9},
                result_path=str(latest_result_path),
            ),
        ]
        state = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0005",
            best_request_rate=8.0,
            best_request_rate_per_gpu=2.0,
            trials=trials,
        )

        context = build_harness_context(
            study=study,
            window_summary={"prompt_tokens_p95": 6500},
            state=state,
        )

        next_action = context["experiment_plan"]["next_action"]
        self.assertEqual(next_action["knob_family"], "frontier-delta-projection")
        self.assertEqual(
            next_action["config_patch"]["flag_patch"],
            {"gpu-memory-utilization": 0.9},
        )
        self.assertIsNone(build_harness_stop_proposal(context))
def test_harness_validates_unmeasured_tp_frontier_before_runtime_refinement(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)