Add profile-driven harness planner

2026-05-12 21:28:44 +08:00
parent 63d6a111f4
commit 17e9681ca0
3 changed files with 976 additions and 2 deletions

View File

@@ -0,0 +1,73 @@
# Profile-Driven Harness Implementation Log
Date: 2026-05-12
## Goal
The harness should accelerate AITuner as a general tuning system, not as a collection of case-specific rules. This change moves the harness toward a performance-engineering loop:
1. extract a compact profile from each measured trial;
2. rank bottleneck hypotheses from workload and probe evidence;
3. generate generic candidate actions from a knob-effect model;
4. score candidates by expected bottleneck relief, information gain, launch safety, and regression risk;
5. block early stop while a high-value untested candidate remains.
This is intended to apply across the qwen3.5-27b chat, qwen3-235b prefill-only, and qwen3-235b decode-only workloads, and across different SLOs, without encoding model names, SLO constants, or known winning configs.
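A minimal sketch of steps 4 and 5 (candidate selection and the stop gate), assuming profiles, hypotheses, and scored candidates already exist. The `plan_next_experiment` name and dict shapes below are illustrative; the actual helpers added in this commit are `_experiment_plan` and `_candidate_actions`:
```python
# Illustrative sketch only; the real helpers in src/aituner/harness.py are
# _experiment_plan and _candidate_actions. Names and dict shapes are assumptions.
from typing import Any


def plan_next_experiment(
    candidates: list[dict[str, Any]],
    tested_signatures: set[str],
    min_useful_score: float = 0.35,
) -> dict[str, Any]:
    # Step 4: drop already-measured configs and rank the rest by score.
    untested = [
        c for c in candidates if c.get("signature") not in tested_signatures
    ]
    untested.sort(key=lambda c: c.get("score", 0.0), reverse=True)
    next_action = untested[0] if untested else None
    # Step 5: only declare stop readiness when no high-value candidate remains.
    stop_ready = next_action is None or next_action.get("score", 0.0) < min_useful_score
    return {"next_action": next_action, "stop_ready": stop_ready}
```
The 0.35 floor mirrors the score threshold the new stop and proposal logic use.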
## Code Changes
- `src/aituner/harness.py`
- Added `trial_profiles` to normalize trial topology, performance, probe failures, latency quantiles, and launch failure evidence.
- Added `bottleneck_hypotheses`, a ranked list instead of a single active bottleneck label.
- Added `candidate_actions`, generated from topology and runtime knob families.
  - Added `experiment_plan`, which selects the next high-score candidate or declares stop readiness (an illustrative shape is shown after this list).
- Updated harness proposal generation to prefer the profile-driven next action before falling back to legacy deterministic proposal code.
- Updated harness stop logic so convergence/validation stop is blocked when the planner still has a high-value untested candidate.
- `tests/test_core_flow.py`
- Added coverage that a strong TP=2 incumbent with TTFT pressure still selects an unmeasured TP=4 topology candidate.
- Added coverage that decode-only TPOT pressure at max TP can prefer lowering `max-num-seqs` instead of blindly lowering TP.
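For reference, a hypothetical `experiment_plan` context entry; the field names follow the implementation below, while the concrete values are made up:
```python
# Hypothetical experiment_plan context entry; field names follow the
# implementation in src/aituner/harness.py, the concrete values are made up.
next_action = {
    "action_id": "topology_frontier_probe_for_slo_pressure",
    "knob_family": "topology",
    "score": 0.4152,
    "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
}
experiment_plan = {
    "planner_version": "profile-driven-v1",
    "bottleneck_hypotheses": [
        {"name": "ttft_prefill", "confidence": 0.56, "evidence": ["trial-0002 TTFT failures=35"]},
    ],
    "candidate_actions": [next_action],
    "next_action": next_action,
    "stop_ready": False,
    "stop_rationale": "continue with the highest-scoring measured hypothesis",
}
```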
## Current Scoring Model
The candidate score is intentionally generic:
```text
score = expected_bottleneck_relief * bottleneck_confidence
+ information_gain
+ launch_safety
- regression_risk
```
Examples:
- TTFT/prefill bottleneck: candidates that increase TP or raise prefill batching receive relief credit.
- Decode TPOT bottleneck: increasing TP is useful while a higher legal TP exists; once TP is at the legal maximum, lowering decode concurrency can become the higher-value candidate.
- Admission/queueing bottleneck: candidates that add DP replicas or raise safe concurrency receive relief credit.
The scores are not tied to qwen3.5-27b/qwen3-235b or to fixed TTFT/TPOT thresholds; they are tied to the measured bottleneck class and the legal tunable space.
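A worked example using the weights `_score_topology_candidate` assigns in this commit, for a TP=2 → TP=4 probe under a TTFT/prefill hypothesis. The 0.56 confidence is illustrative, and note the implementation also floors the confidence term at 0.35:
```python
# Worked example of the topology candidate score, using the weights
# _score_topology_candidate assigns in this commit; 0.56 is an illustrative
# hypothesis confidence, not a measured value.
confidence = 0.56        # ranked ttft_prefill hypothesis confidence (illustrative)
relief = 0.42            # TP increase under a TTFT/prefill bottleneck
information_gain = 0.20  # topology actually changes, so the trial is informative
launch_safety = 0.04     # more GPUs per replica, so slightly lower launch safety
regression_risk = 0.06   # one doubling step away from the incumbent topology

score = relief * max(confidence, 0.35) + information_gain + launch_safety - regression_risk
print(round(score, 4))   # 0.4152, above the 0.35 gate used by the stop and proposal logic
```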
## Verification
Local:
```bash
python3 -m compileall -q src tests
PYTHONPATH=src python3 -m unittest tests.test_core_flow
```
Result: `93` tests passed.
## Next Experiment
Run the same qwen3.5-27b chat 0-8k setup as the current ablation baseline:
- workload: chat, input length 0-8k
- SLO: TTFT p95 <= 4000ms, TPOT p95 <= 25ms, target pass rate 0.95
- search: full range, `inherit_incumbent_floor=false`
- budget: 12 total tuning iterations
- LLM model: `gpt-5.4`
- variant: harness enabled with profile-driven planner
The no-harness min-prompt baseline is already available and can be reused for comparison unless the setup changes.

View File

@@ -32,21 +32,45 @@ def build_harness_context(
state: StudyState,
) -> dict[str, Any]:
recent_diagnostics = _recent_trial_diagnostics(state)
trial_profiles = _trial_profiles(study, recent_diagnostics)
bottleneck_hypotheses = _rank_bottleneck_hypotheses(
study,
window_summary,
trial_profiles,
)
experiment_plan = _experiment_plan(
study,
window_summary,
state,
recent_diagnostics,
trial_profiles,
bottleneck_hypotheses,
)
return {
"paper_alignment": {
"goal": "Use workload-feature-to-knob harnesses to reduce wasted trials and avoid regressing after a good configuration is found.",
"feature_model": "L-C-A: request lengths, inter-request KV-cache reuse, and arrival dynamics.",
"trial_policy": "Map the active bottleneck to one knob family, apply guard conditions, and stop when the incumbent has converged.",
"trial_policy": "Profile measured trials, rank bottleneck hypotheses, score generic candidate actions, and stop only when no useful measured hypothesis remains.",
},
"workload_lca_profile": _workload_lca_profile(window_summary),
"recent_trial_diagnostics": recent_diagnostics,
"trial_profiles": trial_profiles,
"bottleneck_hypotheses": bottleneck_hypotheses,
"candidate_actions": experiment_plan["candidate_actions"],
"experiment_plan": experiment_plan,
"convergence_guard": _convergence_guard(state, recent_diagnostics),
"harness_stop": _harness_stop_decision(study, state, recent_diagnostics),
"harness_stop": _harness_stop_decision(
study,
state,
recent_diagnostics,
experiment_plan=experiment_plan,
),
"harness_proposal": _harness_proposal_decision(
study,
window_summary,
state,
recent_diagnostics,
experiment_plan=experiment_plan,
),
"knob_harnesses": _knob_harnesses(study, window_summary, recent_diagnostics),
"proposal_rules": _proposal_rules(),
@@ -348,6 +372,164 @@ def _recent_trial_diagnostics(state: StudyState) -> list[dict[str, Any]]:
return diagnostics
def _trial_profiles(
study: StudySpec,
recent_diagnostics: list[dict[str, Any]],
) -> list[dict[str, Any]]:
profiles: list[dict[str, Any]] = []
for item in recent_diagnostics:
flags = _effective_flags_for_item(study, item)
probe_summary = item.get("probe_summary")
best_probe = None
last_probe = None
all_infeasible = None
if isinstance(probe_summary, dict):
best_probe = probe_summary.get("best_feasible_probe")
last_probe = probe_summary.get("last_probe")
all_infeasible = probe_summary.get("all_infeasible")
limiting_probe = (
last_probe
if isinstance(last_probe, dict)
else all_infeasible
if isinstance(all_infeasible, dict)
else best_probe
if isinstance(best_probe, dict)
else None
)
latency = limiting_probe.get("latency_summary") if isinstance(limiting_probe, dict) else {}
if not isinstance(latency, dict):
latency = {}
failed_counts = latency.get("failed_reason_counts")
if not isinstance(failed_counts, dict):
failed_counts = {}
profile = {
"trial_id": item.get("trial_id"),
"status": item.get("status"),
"config_patch": item.get("config_patch") if isinstance(item.get("config_patch"), dict) else {},
"topology": {
"tensor_parallel_size": _parse_int_like(
flags.get("tensor-parallel-size"),
default=1,
),
"data_parallel_size": _parse_int_like(
flags.get("data-parallel-size"),
default=1,
),
"expert_parallel_size": _parse_int_like(
flags.get("expert-parallel-size"),
default=1,
),
"enable_expert_parallel": bool(flags.get("enable-expert-parallel", False)),
},
"performance": {
"best_request_rate": item.get("best_request_rate"),
"best_request_rate_per_gpu": item.get("best_request_rate_per_gpu"),
"best_pass_rate": item.get("best_pass_rate"),
},
"probe_profile": {
"best_feasible_probe": best_probe,
"limiting_probe": limiting_probe,
"active_bottleneck": item.get("active_bottleneck"),
"failed_reason_counts": failed_counts,
"latency_quantiles": {
"ttft_ms": latency.get("ttft_ms") if isinstance(latency.get("ttft_ms"), dict) else {},
"tpot_ms": latency.get("tpot_ms") if isinstance(latency.get("tpot_ms"), dict) else {},
},
},
"failure_profile": {
"failure_stage": item.get("failure_stage"),
"failure_reason": item.get("failure_reason"),
},
}
profiles.append(profile)
return profiles
def _rank_bottleneck_hypotheses(
study: StudySpec,
window_summary: dict[str, Any],
trial_profiles: list[dict[str, Any]],
) -> list[dict[str, Any]]:
scores = {
"ttft_prefill": 0.0,
"decode_tpot": 0.0,
"admission_or_queueing": 0.0,
"launch_or_memory": 0.0,
}
evidence: dict[str, list[str]] = {name: [] for name in scores}
default = _workload_default_bottleneck(study, window_summary)
if default in scores:
scores[default] += 0.18
evidence[default].append(f"workload default bottleneck is {default}")
if study.trace.request_mode == "decode_only" and study.slo.tpot_rule is not None:
scores["decode_tpot"] += 0.22
evidence["decode_tpot"].append("decode_only study with configured TPOT SLO")
if study.slo.ttft_rule is not None:
prompt_p95 = _as_float(window_summary.get("prompt_tokens_p95"))
if prompt_p95 >= 4096:
scores["ttft_prefill"] += 0.14
evidence["ttft_prefill"].append("long prompt p95 makes TTFT/prefill plausible")
for profile in trial_profiles[-stateful_history_limit() :]:
active = str(profile.get("probe_profile", {}).get("active_bottleneck") or "")
if active in scores:
weight = 0.34 if profile is trial_profiles[-1] else 0.18
scores[active] += weight
evidence[active].append(
f"{profile.get('trial_id')} probe diagnosis is {active}"
)
failed = profile.get("probe_profile", {}).get("failed_reason_counts")
if not isinstance(failed, dict):
failed = {}
ttft_count = sum(int(v) for k, v in failed.items() if str(k).startswith("ttft"))
tpot_count = sum(int(v) for k, v in failed.items() if str(k).startswith("tpot"))
elapsed_count = sum(
int(v)
for k, v in failed.items()
if str(k).startswith("probe_elapsed_s>")
or str(k).startswith("arrival_lag_s>")
)
total = max(ttft_count + tpot_count + elapsed_count, 1)
if ttft_count:
scores["ttft_prefill"] += min(0.24, 0.24 * ttft_count / total)
evidence["ttft_prefill"].append(
f"{profile.get('trial_id')} TTFT failures={ttft_count}"
)
if tpot_count:
scores["decode_tpot"] += min(0.24, 0.24 * tpot_count / total)
evidence["decode_tpot"].append(
f"{profile.get('trial_id')} TPOT failures={tpot_count}"
)
if elapsed_count:
scores["admission_or_queueing"] += min(0.18, 0.18 * elapsed_count / total)
evidence["admission_or_queueing"].append(
f"{profile.get('trial_id')} queue/elapsed failures={elapsed_count}"
)
failure_stage = str(profile.get("failure_profile", {}).get("failure_stage") or "")
failure_reason = str(profile.get("failure_profile", {}).get("failure_reason") or "")
if failure_stage == "engine_launch" or "out of memory" in failure_reason.lower():
scores["launch_or_memory"] += 0.4
evidence["launch_or_memory"].append(
f"{profile.get('trial_id')} launch or memory failure"
)
ranked = []
for name, score in scores.items():
if score <= 0:
continue
ranked.append(
{
"name": name,
"confidence": min(0.99, round(score, 4)),
"evidence": evidence[name][:6],
}
)
ranked.sort(key=lambda item: item["confidence"], reverse=True)
return ranked
def stateful_history_limit() -> int:
return 8
@@ -513,6 +695,8 @@ def _harness_stop_decision(
study: StudySpec,
state: StudyState,
recent_diagnostics: list[dict[str, Any]],
*,
experiment_plan: dict[str, Any] | None = None,
) -> dict[str, Any]:
high_saturation = _search_high_saturation_guard(study, state, recent_diagnostics)
if high_saturation["saturated"]:
@@ -528,6 +712,17 @@ def _harness_stop_decision(
"reason": "topology_frontier_requires_probe",
"evidence": topology_frontier,
}
if experiment_plan is not None and experiment_plan.get("next_action"):
action = experiment_plan["next_action"]
if isinstance(action, dict) and _as_float(action.get("score")) >= 0.35:
return {
"should_stop": False,
"reason": "experiment_plan_has_high_value_candidate",
"evidence": {
"summary": "The profile-driven planner still has a useful measured hypothesis to test.",
"next_action": action,
},
}
guard = _convergence_guard(state, recent_diagnostics)
if guard["deterministic_stop"]:
return {
@@ -562,6 +757,8 @@ def _harness_proposal_decision(
window_summary: dict[str, Any],
state: StudyState,
recent_diagnostics: list[dict[str, Any]],
*,
experiment_plan: dict[str, Any] | None = None,
) -> dict[str, Any]:
default = {
"should_propose": False,
@@ -575,6 +772,26 @@ def _harness_proposal_decision(
for item in recent_diagnostics
}
tested_signatures.update(_state_tested_signatures(state))
if experiment_plan is not None:
next_action = experiment_plan.get("next_action")
if isinstance(next_action, dict) and _as_float(next_action.get("score")) >= 0.35:
patch = next_action.get("config_patch")
if isinstance(patch, dict):
signature = _config_signature(patch)
if signature not in tested_signatures:
return {
"should_propose": True,
"reason": str(next_action.get("action_id") or "profile_driven_candidate"),
"diagnosis": str(next_action.get("hypothesis") or "Profile-driven harness candidate."),
"config_patch": patch,
"expected_effects": [
str(item)
for item in next_action.get("expected_effects", [])
if isinstance(item, str)
],
"candidate_score": next_action.get("score"),
"bottleneck_hypotheses": experiment_plan.get("bottleneck_hypotheses", []),
}
baseline = recent_diagnostics[0] if recent_diagnostics else {}
topology_frontier = _topology_frontier_proposal(
study,
@@ -691,6 +908,526 @@ def _topology_frontier_proposal(
}
def _experiment_plan(
study: StudySpec,
window_summary: dict[str, Any],
state: StudyState,
recent_diagnostics: list[dict[str, Any]],
trial_profiles: list[dict[str, Any]],
bottleneck_hypotheses: list[dict[str, Any]],
) -> dict[str, Any]:
tested_signatures = {
_config_signature(item.get("config_patch") if isinstance(item, dict) else None)
for item in recent_diagnostics
}
tested_signatures.update(_state_tested_signatures(state))
candidates = _candidate_actions(
study,
window_summary,
state,
recent_diagnostics,
trial_profiles,
bottleneck_hypotheses,
tested_signatures=tested_signatures,
)
candidates.sort(key=lambda item: _as_float(item.get("score")), reverse=True)
next_action = candidates[0] if candidates else None
return {
"planner_version": "profile-driven-v1",
"bottleneck_hypotheses": bottleneck_hypotheses,
"candidate_actions": candidates[:8],
"next_action": next_action,
"stop_ready": next_action is None or _as_float(next_action.get("score")) < 0.35,
"stop_rationale": (
"no untested high-value candidate remains"
if not candidates or _as_float(candidates[0].get("score")) < 0.35
else "continue with the highest-scoring measured hypothesis"
),
}
def _candidate_actions(
study: StudySpec,
window_summary: dict[str, Any],
state: StudyState,
recent_diagnostics: list[dict[str, Any]],
trial_profiles: list[dict[str, Any]],
bottleneck_hypotheses: list[dict[str, Any]],
*,
tested_signatures: set[str],
) -> list[dict[str, Any]]:
if not recent_diagnostics:
return []
anchor = _anchor_profile(study, state, recent_diagnostics, trial_profiles)
if anchor is None:
return []
top_bottleneck = (
str(bottleneck_hypotheses[0]["name"])
if bottleneck_hypotheses
else str(anchor.get("probe_profile", {}).get("active_bottleneck") or "")
)
candidates: list[dict[str, Any]] = []
candidates.extend(
_topology_candidate_actions(
study,
anchor,
top_bottleneck,
bottleneck_hypotheses,
tested_signatures,
)
)
candidates.extend(
_runtime_candidate_actions(
study,
window_summary,
anchor,
top_bottleneck,
bottleneck_hypotheses,
tested_signatures,
)
)
return candidates
def _anchor_profile(
study: StudySpec,
state: StudyState,
recent_diagnostics: list[dict[str, Any]],
trial_profiles: list[dict[str, Any]],
) -> dict[str, Any] | None:
if state.best_trial_id:
for profile in reversed(trial_profiles):
if profile.get("trial_id") == state.best_trial_id:
return profile
for profile in reversed(trial_profiles):
if profile.get("status") == "completed":
return profile
return trial_profiles[-1] if trial_profiles else None
def _topology_candidate_actions(
study: StudySpec,
anchor: dict[str, Any],
top_bottleneck: str,
bottleneck_hypotheses: list[dict[str, Any]],
tested_signatures: set[str],
) -> list[dict[str, Any]]:
if not ({"tensor-parallel-size", "data-parallel-size"} & set(study.engine.tunable_flags)):
return []
anchor_flags = _effective_flags_for_item(study, anchor)
current_tp = _parse_int_like(anchor_flags.get("tensor-parallel-size"), default=1)
current_dp = _parse_int_like(anchor_flags.get("data-parallel-size"), default=1)
current_ep = _parse_int_like(anchor_flags.get("expert-parallel-size"), default=1)
current_enable_ep = bool(anchor_flags.get("enable-expert-parallel", False))
legal = _legal_topology_points(
study,
current_tp=current_tp,
current_dp=current_dp,
current_ep=current_ep,
current_enable_ep=current_enable_ep,
)
actions: list[dict[str, Any]] = []
for point in legal:
if point["tensor-parallel-size"] == current_tp and point["data-parallel-size"] == current_dp:
continue
patch = _topology_patch(study, point)
signature = _config_signature({"env_patch": {}, "flag_patch": patch})
if signature in tested_signatures:
continue
score, factors = _score_topology_candidate(
top_bottleneck,
bottleneck_hypotheses,
current_tp=current_tp,
current_dp=current_dp,
candidate_tp=point["tensor-parallel-size"],
candidate_dp=point["data-parallel-size"],
)
if score <= 0:
continue
action_id = _topology_action_id(current_tp, current_dp, point)
actions.append(
{
"action_id": action_id,
"knob_family": "topology",
"score": round(score, 4),
"score_factors": factors,
"config_patch": {"env_patch": {}, "flag_patch": patch},
"hypothesis": _topology_hypothesis(
top_bottleneck,
current_tp=current_tp,
current_dp=current_dp,
candidate_tp=point["tensor-parallel-size"],
candidate_dp=point["data-parallel-size"],
),
"expected_effects": [
"measure whether topology changes relieve the ranked bottleneck",
"compare request_rate_per_gpu under the configured SLO, not raw throughput alone",
"reject this hypothesis if latency improves but per-GPU throughput regresses materially",
],
}
)
return actions
def _runtime_candidate_actions(
study: StudySpec,
window_summary: dict[str, Any],
anchor: dict[str, Any],
top_bottleneck: str,
bottleneck_hypotheses: list[dict[str, Any]],
tested_signatures: set[str],
) -> list[dict[str, Any]]:
tunable = set(study.engine.tunable_flags)
anchor_flags = _effective_flags_for_item(study, anchor)
topology_patch = _preserve_topology_patch(study, anchor_flags)
actions: list[dict[str, Any]] = []
if "max-num-batched-tokens" in tunable:
current_mbt = _parse_int_like(anchor_flags.get("max-num-batched-tokens"), default=0)
mbt_targets: list[tuple[str, int]] = []
if top_bottleneck == "ttft_prefill":
target = (
_initial_mbt_from_window(window_summary)
if current_mbt <= 0
else _next_mbt_step(current_mbt)
)
if target is not None:
mbt_targets.append(("raise_mbt", target))
elif top_bottleneck == "decode_tpot" and current_mbt > 8192:
mbt_targets.append(("lower_mbt", max(8192, current_mbt // 2)))
for action_id, target in mbt_targets:
patch = {**topology_patch, "max-num-batched-tokens": target}
signature = _config_signature({"env_patch": {}, "flag_patch": patch})
if signature in tested_signatures:
continue
relief = 0.24 if top_bottleneck == "ttft_prefill" else 0.14
actions.append(
_runtime_action(
action_id=action_id,
knob_family="max-num-batched-tokens",
score=relief + _information_gain(bottleneck_hypotheses, "runtime"),
patch=patch,
hypothesis=(
"Adjust max-num-batched-tokens to test whether batching, not topology, "
"is limiting the active latency objective."
),
expected_effects=[
"change prefill/decode batching pressure on the incumbent topology",
"confirm if the latency knee moves without requiring another topology change",
],
)
)
if "max-num-seqs" in tunable:
current_mns = _parse_int_like(anchor_flags.get("max-num-seqs"), default=0)
mns_targets: list[tuple[str, int]] = []
if top_bottleneck == "admission_or_queueing":
target = max(8, int(current_mns * 1.5)) if current_mns > 0 else 64
mns_targets.append(("raise_max_num_seqs", _round_up_to_multiple(target, 8)))
elif top_bottleneck == "decode_tpot" and current_mns > 8:
mns_targets.append(("lower_max_num_seqs", max(8, current_mns // 2)))
for action_id, target in mns_targets:
patch = {**topology_patch, "max-num-seqs": target}
signature = _config_signature({"env_patch": {}, "flag_patch": patch})
if signature in tested_signatures:
continue
relief = 0.25 if top_bottleneck in {"decode_tpot", "admission_or_queueing"} else 0.08
actions.append(
_runtime_action(
action_id=action_id,
knob_family="max-num-seqs",
score=relief + _information_gain(bottleneck_hypotheses, "runtime"),
patch=patch,
hypothesis=(
"Adjust max-num-seqs to test whether concurrency pressure is the "
"limiting factor under the configured SLO."
),
expected_effects=[
"change decode/admission concurrency on the incumbent topology",
"confirm if TPOT or queueing pressure is caused by sequence concurrency",
],
)
)
if "enable-chunked-prefill" in tunable and top_bottleneck == "ttft_prefill":
current = bool(anchor_flags.get("enable-chunked-prefill", False))
if not current:
patch = {**topology_patch, "enable-chunked-prefill": True}
signature = _config_signature({"env_patch": {}, "flag_patch": patch})
if signature not in tested_signatures:
actions.append(
_runtime_action(
action_id="enable_chunked_prefill",
knob_family="enable-chunked-prefill",
score=0.2 + _information_gain(bottleneck_hypotheses, "runtime"),
patch=patch,
hypothesis=(
"Enable chunked prefill to test whether long-prefill head-of-line "
"blocking is driving TTFT failures."
),
expected_effects=[
"reduce long-prefill interference for mixed-length chat windows",
"reject if chunking overhead worsens request_rate_per_gpu",
],
)
)
return actions
def _runtime_action(
*,
action_id: str,
knob_family: str,
score: float,
patch: dict[str, Any],
hypothesis: str,
expected_effects: list[str],
) -> dict[str, Any]:
return {
"action_id": action_id,
"knob_family": knob_family,
"score": round(score, 4),
"score_factors": {
"expected_bottleneck_relief": round(max(score - 0.1, 0.0), 4),
"information_gain": 0.1,
"launch_safety": 0.05,
"regression_risk": 0.05,
},
"config_patch": {"env_patch": {}, "flag_patch": patch},
"hypothesis": hypothesis,
"expected_effects": expected_effects,
}
def _legal_topology_points(
study: StudySpec,
*,
current_tp: int,
current_dp: int,
current_ep: int,
current_enable_ep: bool,
) -> list[dict[str, Any]]:
constraints = study.engine.topology_constraints
tunable = set(study.engine.tunable_flags)
if constraints is not None and constraints.allowed_tensor_parallel_sizes:
tp_values = sorted(set(constraints.allowed_tensor_parallel_sizes))
elif "tensor-parallel-size" in tunable:
tp_values = [value for value in [1, 2, 4, 8] if value <= study.hardware.gpu_count]
else:
tp_values = [current_tp]
if constraints is not None and constraints.allowed_data_parallel_sizes:
dp_values = sorted(set(constraints.allowed_data_parallel_sizes))
elif "data-parallel-size" in tunable:
dp_values = [value for value in [1, 2, 4, 8] if value <= study.hardware.gpu_count]
else:
dp_values = [current_dp]
if constraints is not None and constraints.allowed_expert_parallel_sizes:
ep_values = sorted(set(constraints.allowed_expert_parallel_sizes))
elif "expert-parallel-size" in tunable:
ep_values = sorted({1, current_ep})
else:
ep_values = [current_ep]
points: list[dict[str, Any]] = []
for tp in tp_values:
for dp in dp_values:
tp_dp_product = tp * dp
if constraints is not None:
if (
constraints.allowed_tp_dp_products
and tp_dp_product not in constraints.allowed_tp_dp_products
):
continue
if (
constraints.require_tp_dp_product_equals_gpu_count
and tp_dp_product != study.hardware.gpu_count
):
continue
elif tp_dp_product > study.hardware.gpu_count:
continue
if constraints is not None and not constraints.allowed_tp_dp_products:
if tp_dp_product > study.hardware.gpu_count:
continue
for ep in ep_values:
enable_ep = current_enable_ep or ep > 1
if constraints is not None:
if constraints.allowed_expert_parallel_sizes and ep not in constraints.allowed_expert_parallel_sizes:
continue
if constraints.require_ep_size_leq_tp_dp_product and ep > tp_dp_product:
continue
if constraints.require_ep_size_divides_tp_dp_product and tp_dp_product % ep != 0:
continue
if (
constraints.require_enable_expert_parallel_when_ep_gt_one
and ep > 1
and not enable_ep
):
continue
points.append(
{
"tensor-parallel-size": tp,
"data-parallel-size": dp,
"expert-parallel-size": ep,
"enable-expert-parallel": enable_ep,
}
)
return points
def _topology_patch(study: StudySpec, point: dict[str, Any]) -> dict[str, Any]:
patch: dict[str, Any] = {}
tunable = set(study.engine.tunable_flags)
base = _normalized_topology_flags(study.engine.base_flags)
for key in (
"tensor-parallel-size",
"data-parallel-size",
"expert-parallel-size",
"enable-expert-parallel",
):
if key not in tunable:
continue
if key in point and point[key] != base.get(key):
patch[key] = point[key]
return patch
def _preserve_topology_patch(study: StudySpec, flags: dict[str, Any]) -> dict[str, Any]:
patch: dict[str, Any] = {}
tunable = set(study.engine.tunable_flags)
base = _normalized_topology_flags(study.engine.base_flags)
normalized = _normalized_topology_flags(flags)
for key in (
"tensor-parallel-size",
"data-parallel-size",
"expert-parallel-size",
"enable-expert-parallel",
):
if key not in tunable or key not in normalized:
continue
if normalized.get(key) != base.get(key):
patch[key] = normalized[key]
return patch
def _normalized_topology_flags(flags: dict[str, Any]) -> dict[str, Any]:
return {
"tensor-parallel-size": _parse_int_like(
flags.get("tensor-parallel-size"),
default=1,
),
"data-parallel-size": _parse_int_like(
flags.get("data-parallel-size"),
default=1,
),
"expert-parallel-size": _parse_int_like(
flags.get("expert-parallel-size"),
default=1,
),
"enable-expert-parallel": bool(flags.get("enable-expert-parallel", False)),
}
def _score_topology_candidate(
top_bottleneck: str,
bottleneck_hypotheses: list[dict[str, Any]],
*,
current_tp: int,
current_dp: int,
candidate_tp: int,
candidate_dp: int,
) -> tuple[float, dict[str, float]]:
tp_delta = candidate_tp - current_tp
dp_delta = candidate_dp - current_dp
confidence = _hypothesis_confidence(bottleneck_hypotheses, top_bottleneck)
relief = 0.0
if top_bottleneck == "ttft_prefill":
relief = 0.42 if tp_delta > 0 else 0.05
elif top_bottleneck == "decode_tpot":
relief = 0.34 if tp_delta > 0 else 0.02
elif top_bottleneck == "admission_or_queueing":
relief = 0.34 if dp_delta > 0 else 0.08
else:
relief = 0.04
info_gain = 0.2 if abs(tp_delta) + abs(dp_delta) > 0 else 0.0
launch_safety = 0.08 if candidate_tp * candidate_dp <= max(current_tp * current_dp, 1) else 0.04
distance = abs(_log2_ratio(candidate_tp, current_tp)) + abs(_log2_ratio(candidate_dp, current_dp))
regression_risk = min(0.28, 0.06 * distance)
score = relief * max(confidence, 0.35) + info_gain + launch_safety - regression_risk
return score, {
"expected_bottleneck_relief": round(relief, 4),
"bottleneck_confidence": round(confidence, 4),
"information_gain": round(info_gain, 4),
"launch_safety": round(launch_safety, 4),
"regression_risk": round(regression_risk, 4),
}
def _information_gain(bottleneck_hypotheses: list[dict[str, Any]], family: str) -> float:
if not bottleneck_hypotheses:
return 0.08
top_confidence = _as_float(bottleneck_hypotheses[0].get("confidence"))
uncertainty = max(0.0, 1.0 - top_confidence)
return 0.08 + min(0.12, uncertainty * 0.12)
def _hypothesis_confidence(
bottleneck_hypotheses: list[dict[str, Any]],
name: str,
) -> float:
for item in bottleneck_hypotheses:
if item.get("name") == name:
return _as_float(item.get("confidence"))
return 0.0
def _topology_action_id(
current_tp: int,
current_dp: int,
point: dict[str, Any],
) -> str:
candidate_tp = int(point["tensor-parallel-size"])
candidate_dp = int(point["data-parallel-size"])
if candidate_tp > current_tp:
return "topology_frontier_probe_for_slo_pressure"
if candidate_dp > current_dp:
return "increase_data_parallel_probe"
if candidate_tp < current_tp:
return "decrease_tensor_parallel_probe"
return "redistribute_topology_probe"
def _topology_hypothesis(
top_bottleneck: str,
*,
current_tp: int,
current_dp: int,
candidate_tp: int,
candidate_dp: int,
) -> str:
return (
f"Ranked bottleneck is {top_bottleneck}. Test topology "
f"TP={candidate_tp}, DP={candidate_dp} against incumbent TP={current_tp}, "
f"DP={current_dp}; this distinguishes compute-latency relief from "
"replica/admission effects under the configured SLO."
)
def _log2_ratio(new: int, old: int) -> float:
if new <= 0 or old <= 0:
return 0.0
ratio = new / old
steps = 0.0
while ratio >= 2.0:
steps += 1.0
ratio /= 2.0
while ratio <= 0.5:
steps += 1.0
ratio *= 2.0
return steps
def _topology_frontier_status(
study: StudySpec,
state: StudyState,

View File

@@ -920,6 +920,170 @@ class CoreFlowTests(unittest.TestCase):
"topology_frontier_probe_for_slo_pressure",
)
def test_profile_driven_planner_scores_unmeasured_tp_frontier(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
engine_overrides={
"tunable_flags": [
"tensor-parallel-size",
"max-num-batched-tokens",
"enable-chunked-prefill",
],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [1, 2, 4],
"allowed_tp_dp_products": [1, 2, 4],
},
},
)
result_path = tmp_path / "trial-0002.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.5,
"best_request_rate": 2.0,
"best_pass_rate": 0.96,
"probes": [
{
"threshold": 0.75,
"feasible": False,
"payload": {
"request_count": 100,
"pass_rate": 0.6,
"request_rate": 3.0,
"early_stop_reason": "slo_pass_rate_unrecoverable",
"latency_summary": {
"failed_reason_counts": {"ttft_ms>4000.0": 35}
},
},
}
],
}
),
encoding="utf-8",
)
study = load_study_spec(study_path)
context = build_harness_context(
study=study,
window_summary={"prompt_tokens_p95": 7628, "prompt_tail_ratio_p95_p50": 3.8},
state=StudyState(
study_id=study.study_id,
best_trial_id="trial-0002",
best_request_rate=2.0,
best_request_rate_per_gpu=1.0,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
best_request_rate=0.5,
best_request_rate_per_gpu=0.5,
config_patch={"env_patch": {}, "flag_patch": {}},
),
TrialSummary(
trial_id="trial-0002",
status="completed",
best_request_rate=2.0,
best_request_rate_per_gpu=1.0,
result_path=str(result_path),
config_patch={
"env_patch": {},
"flag_patch": {"tensor-parallel-size": 2},
},
),
],
),
)
plan = context["experiment_plan"]
self.assertEqual(plan["planner_version"], "profile-driven-v1")
self.assertEqual(plan["next_action"]["knob_family"], "topology")
self.assertEqual(
plan["next_action"]["config_patch"]["flag_patch"],
{"tensor-parallel-size": 4},
)
self.assertIn("ttft_prefill", context["bottleneck_hypotheses"][0]["name"])
self.assertFalse(context["harness_stop"]["should_stop"])
def test_profile_driven_planner_prefers_decode_concurrency_relief(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(
tmp_path,
trace_overrides={"request_mode": "decode_only"},
slo_overrides={
"ttft_rule": None,
"tpot_rule": {"kind": "fixed_ms", "threshold_ms": 20},
},
engine_overrides={
"base_flags": {
"host": "127.0.0.1",
"port": 8000,
"tensor-parallel-size": 4,
"max-num-seqs": 64,
},
"tunable_flags": ["tensor-parallel-size", "max-num-seqs"],
"topology_constraints": {
"allowed_tensor_parallel_sizes": [1, 2, 4],
"allowed_tp_dp_products": [1, 2, 4],
},
},
)
result_path = tmp_path / "trial-0001.json"
result_path.write_text(
json.dumps(
{
"status": "completed",
"best_sampling_u": 0.25,
"best_request_rate": 1.0,
"best_pass_rate": 0.97,
"probes": [
{
"threshold": 0.5,
"feasible": False,
"payload": {
"request_count": 100,
"pass_rate": 0.5,
"request_rate": 2.0,
"early_stop_reason": "slo_pass_rate_unrecoverable",
"latency_summary": {
"failed_reason_counts": {"tpot_ms>20.0": 50}
},
},
}
],
}
),
encoding="utf-8",
)
study = load_study_spec(study_path)
context = build_harness_context(
study=study,
window_summary={},
state=StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_request_rate=1.0,
best_request_rate_per_gpu=0.25,
trials=[
TrialSummary(
trial_id="trial-0001",
status="completed",
best_request_rate=1.0,
best_request_rate_per_gpu=0.25,
result_path=str(result_path),
config_patch={"env_patch": {}, "flag_patch": {}},
)
],
),
)
plan = context["experiment_plan"]
self.assertEqual(plan["next_action"]["knob_family"], "max-num-seqs")
self.assertEqual(
plan["next_action"]["config_patch"]["flag_patch"],
{"max-num-seqs": 32},
)
def test_harness_stop_blocked_until_slo_driven_topology_frontier_is_measured(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)