diff --git a/docs/harness-ablation/profile-driven-harness-implementation-20260512.md b/docs/harness-ablation/profile-driven-harness-implementation-20260512.md
new file mode 100644
index 0000000..f85b9e4
--- /dev/null
+++ b/docs/harness-ablation/profile-driven-harness-implementation-20260512.md
@@ -0,0 +1,73 @@
+# Profile-Driven Harness Implementation Log
+
+Date: 2026-05-12
+
+## Goal
+
+The harness should accelerate AITuner as a general tuning system, not as a collection of case-specific rules. The current implementation moves the harness toward a performance-engineering loop:
+
+1. extract a compact profile from each measured trial;
+2. rank bottleneck hypotheses from workload and probe evidence;
+3. generate generic candidate actions from a knob-effect model;
+4. score candidates by expected bottleneck relief, information gain, launch safety, and regression risk;
+5. block early stop while a high-value untested candidate remains.
+
+This is intended to apply across qwen3.5-27b chat, qwen3-235b prefill-only, qwen3-235b decode-only, and different SLOs without encoding model names, SLO constants, or known winning configs.
+
+## Code Changes
+
+- `src/aituner/harness.py`
+  - Added `trial_profiles` to normalize trial topology, performance, probe failures, latency quantiles, and launch failure evidence.
+  - Added `bottleneck_hypotheses`, a ranked list instead of a single active bottleneck label.
+  - Added `candidate_actions`, generated from topology and runtime knob families.
+  - Added `experiment_plan`, which selects the next highest-scoring candidate or declares stop readiness.
+  - Updated harness proposal generation to prefer the profile-driven next action before falling back to the legacy deterministic proposal code.
+  - Updated harness stop logic so convergence/validation stop is blocked while the planner still has a high-value untested candidate.
+
+- `tests/test_core_flow.py`
+  - Added coverage that a strong TP=2 incumbent with TTFT pressure still selects an unmeasured TP=4 topology candidate.
+  - Added coverage that decode-only TPOT pressure at max TP can prefer lowering `max-num-seqs` instead of blindly lowering TP.
+
+## Current Scoring Model
+
+The candidate score is intentionally generic:
+
+```text
+score = expected_bottleneck_relief * bottleneck_confidence
+      + information_gain
+      + launch_safety
+      - regression_risk
+```
+
+Examples:
+
+- TTFT/prefill bottleneck: candidates that raise TP or increase prefill batching receive a relief score.
+- Decode TPOT bottleneck: raising TP is useful if a higher legal TP exists; if the incumbent is already at high TP, lowering decode concurrency can become the higher-value candidate.
+- Admission/queueing bottleneck: candidates that add DP replicas or raise safe concurrency receive a relief score.
+
+The scores are not tied to qwen3.5-27b/qwen3-235b or to a fixed TPOT/TTFT threshold. They are tied to the measured bottleneck class and the legal tunable space. A small illustrative sketch of this rule appears in the Scoring Sketch section below.
+
+## Verification
+
+Local:
+
+```bash
+python3 -m compileall -q src tests
+PYTHONPATH=src python3 -m unittest tests.test_core_flow
+```
+
+Result: 93 tests passed.
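+
+## Scoring Sketch
+
+A minimal, illustrative restatement of the scoring rule above in plain Python. The `Candidate` container and the example numbers are assumptions for this sketch only; the actual constants and dict-shaped candidates live in `_score_topology_candidate` and `_runtime_action` in `src/aituner/harness.py` (the topology scorer additionally floors confidence at 0.35).
+
+```python
+from dataclasses import dataclass
+
+
+@dataclass
+class Candidate:
+    # Illustrative container; the harness itself passes plain dicts around.
+    expected_bottleneck_relief: float
+    bottleneck_confidence: float
+    information_gain: float
+    launch_safety: float
+    regression_risk: float
+
+
+def candidate_score(candidate: Candidate) -> float:
+    # Relief counts in proportion to confidence in the bottleneck diagnosis;
+    # information gain and launch safety are additive bonuses, and regression
+    # risk is a flat penalty.
+    return (
+        candidate.expected_bottleneck_relief * candidate.bottleneck_confidence
+        + candidate.information_gain
+        + candidate.launch_safety
+        - candidate.regression_risk
+    )
+
+
+# Example: a TP frontier probe under a TTFT/prefill hypothesis (made-up values).
+print(candidate_score(Candidate(0.42, 0.6, 0.2, 0.04, 0.12)))  # 0.372
+```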
+ +## Next Experiment + +Run the same qwen3.5-27b chat 0-8k setup as the current ablation baseline: + +- workload: chat, input length 0-8k +- SLO: TTFT p95 <= 4000ms, TPOT p95 <= 25ms, target pass rate 0.95 +- search: full range, `inherit_incumbent_floor=false` +- budget: 12 total tuning iterations +- LLM model: `gpt-5.4` +- variant: harness enabled with profile-driven planner + +The no-harness min-prompt baseline is already available and only needs to be reused for comparison unless the setup changes. + diff --git a/src/aituner/harness.py b/src/aituner/harness.py index 321c43b..0d11480 100644 --- a/src/aituner/harness.py +++ b/src/aituner/harness.py @@ -32,21 +32,45 @@ def build_harness_context( state: StudyState, ) -> dict[str, Any]: recent_diagnostics = _recent_trial_diagnostics(state) + trial_profiles = _trial_profiles(study, recent_diagnostics) + bottleneck_hypotheses = _rank_bottleneck_hypotheses( + study, + window_summary, + trial_profiles, + ) + experiment_plan = _experiment_plan( + study, + window_summary, + state, + recent_diagnostics, + trial_profiles, + bottleneck_hypotheses, + ) return { "paper_alignment": { "goal": "Use workload-feature-to-knob harnesses to reduce wasted trials and avoid regressing after a good configuration is found.", "feature_model": "L-C-A: request lengths, inter-request KV-cache reuse, and arrival dynamics.", - "trial_policy": "Map the active bottleneck to one knob family, apply guard conditions, and stop when the incumbent has converged.", + "trial_policy": "Profile measured trials, rank bottleneck hypotheses, score generic candidate actions, and stop only when no useful measured hypothesis remains.", }, "workload_lca_profile": _workload_lca_profile(window_summary), "recent_trial_diagnostics": recent_diagnostics, + "trial_profiles": trial_profiles, + "bottleneck_hypotheses": bottleneck_hypotheses, + "candidate_actions": experiment_plan["candidate_actions"], + "experiment_plan": experiment_plan, "convergence_guard": _convergence_guard(state, recent_diagnostics), - "harness_stop": _harness_stop_decision(study, state, recent_diagnostics), + "harness_stop": _harness_stop_decision( + study, + state, + recent_diagnostics, + experiment_plan=experiment_plan, + ), "harness_proposal": _harness_proposal_decision( study, window_summary, state, recent_diagnostics, + experiment_plan=experiment_plan, ), "knob_harnesses": _knob_harnesses(study, window_summary, recent_diagnostics), "proposal_rules": _proposal_rules(), @@ -348,6 +372,164 @@ def _recent_trial_diagnostics(state: StudyState) -> list[dict[str, Any]]: return diagnostics +def _trial_profiles( + study: StudySpec, + recent_diagnostics: list[dict[str, Any]], +) -> list[dict[str, Any]]: + profiles: list[dict[str, Any]] = [] + for item in recent_diagnostics: + flags = _effective_flags_for_item(study, item) + probe_summary = item.get("probe_summary") + best_probe = None + last_probe = None + all_infeasible = None + if isinstance(probe_summary, dict): + best_probe = probe_summary.get("best_feasible_probe") + last_probe = probe_summary.get("last_probe") + all_infeasible = probe_summary.get("all_infeasible") + limiting_probe = ( + last_probe + if isinstance(last_probe, dict) + else all_infeasible + if isinstance(all_infeasible, dict) + else best_probe + if isinstance(best_probe, dict) + else None + ) + latency = limiting_probe.get("latency_summary") if isinstance(limiting_probe, dict) else {} + if not isinstance(latency, dict): + latency = {} + failed_counts = latency.get("failed_reason_counts") + if not 
isinstance(failed_counts, dict): + failed_counts = {} + profile = { + "trial_id": item.get("trial_id"), + "status": item.get("status"), + "config_patch": item.get("config_patch") if isinstance(item.get("config_patch"), dict) else {}, + "topology": { + "tensor_parallel_size": _parse_int_like( + flags.get("tensor-parallel-size"), + default=1, + ), + "data_parallel_size": _parse_int_like( + flags.get("data-parallel-size"), + default=1, + ), + "expert_parallel_size": _parse_int_like( + flags.get("expert-parallel-size"), + default=1, + ), + "enable_expert_parallel": bool(flags.get("enable-expert-parallel", False)), + }, + "performance": { + "best_request_rate": item.get("best_request_rate"), + "best_request_rate_per_gpu": item.get("best_request_rate_per_gpu"), + "best_pass_rate": item.get("best_pass_rate"), + }, + "probe_profile": { + "best_feasible_probe": best_probe, + "limiting_probe": limiting_probe, + "active_bottleneck": item.get("active_bottleneck"), + "failed_reason_counts": failed_counts, + "latency_quantiles": { + "ttft_ms": latency.get("ttft_ms") if isinstance(latency.get("ttft_ms"), dict) else {}, + "tpot_ms": latency.get("tpot_ms") if isinstance(latency.get("tpot_ms"), dict) else {}, + }, + }, + "failure_profile": { + "failure_stage": item.get("failure_stage"), + "failure_reason": item.get("failure_reason"), + }, + } + profiles.append(profile) + return profiles + + +def _rank_bottleneck_hypotheses( + study: StudySpec, + window_summary: dict[str, Any], + trial_profiles: list[dict[str, Any]], +) -> list[dict[str, Any]]: + scores = { + "ttft_prefill": 0.0, + "decode_tpot": 0.0, + "admission_or_queueing": 0.0, + "launch_or_memory": 0.0, + } + evidence: dict[str, list[str]] = {name: [] for name in scores} + + default = _workload_default_bottleneck(study, window_summary) + if default in scores: + scores[default] += 0.18 + evidence[default].append(f"workload default bottleneck is {default}") + + if study.trace.request_mode == "decode_only" and study.slo.tpot_rule is not None: + scores["decode_tpot"] += 0.22 + evidence["decode_tpot"].append("decode_only study with configured TPOT SLO") + if study.slo.ttft_rule is not None: + prompt_p95 = _as_float(window_summary.get("prompt_tokens_p95")) + if prompt_p95 >= 4096: + scores["ttft_prefill"] += 0.14 + evidence["ttft_prefill"].append("long prompt p95 makes TTFT/prefill plausible") + + for profile in trial_profiles[-stateful_history_limit() :]: + active = str(profile.get("probe_profile", {}).get("active_bottleneck") or "") + if active in scores: + weight = 0.34 if profile is trial_profiles[-1] else 0.18 + scores[active] += weight + evidence[active].append( + f"{profile.get('trial_id')} probe diagnosis is {active}" + ) + failed = profile.get("probe_profile", {}).get("failed_reason_counts") + if not isinstance(failed, dict): + failed = {} + ttft_count = sum(int(v) for k, v in failed.items() if str(k).startswith("ttft")) + tpot_count = sum(int(v) for k, v in failed.items() if str(k).startswith("tpot")) + elapsed_count = sum( + int(v) + for k, v in failed.items() + if str(k).startswith("probe_elapsed_s>") + or str(k).startswith("arrival_lag_s>") + ) + total = max(ttft_count + tpot_count + elapsed_count, 1) + if ttft_count: + scores["ttft_prefill"] += min(0.24, 0.24 * ttft_count / total) + evidence["ttft_prefill"].append( + f"{profile.get('trial_id')} TTFT failures={ttft_count}" + ) + if tpot_count: + scores["decode_tpot"] += min(0.24, 0.24 * tpot_count / total) + evidence["decode_tpot"].append( + f"{profile.get('trial_id')} TPOT 
failures={tpot_count}" + ) + if elapsed_count: + scores["admission_or_queueing"] += min(0.18, 0.18 * elapsed_count / total) + evidence["admission_or_queueing"].append( + f"{profile.get('trial_id')} queue/elapsed failures={elapsed_count}" + ) + failure_stage = str(profile.get("failure_profile", {}).get("failure_stage") or "") + failure_reason = str(profile.get("failure_profile", {}).get("failure_reason") or "") + if failure_stage == "engine_launch" or "out of memory" in failure_reason.lower(): + scores["launch_or_memory"] += 0.4 + evidence["launch_or_memory"].append( + f"{profile.get('trial_id')} launch or memory failure" + ) + + ranked = [] + for name, score in scores.items(): + if score <= 0: + continue + ranked.append( + { + "name": name, + "confidence": min(0.99, round(score, 4)), + "evidence": evidence[name][:6], + } + ) + ranked.sort(key=lambda item: item["confidence"], reverse=True) + return ranked + + def stateful_history_limit() -> int: return 8 @@ -513,6 +695,8 @@ def _harness_stop_decision( study: StudySpec, state: StudyState, recent_diagnostics: list[dict[str, Any]], + *, + experiment_plan: dict[str, Any] | None = None, ) -> dict[str, Any]: high_saturation = _search_high_saturation_guard(study, state, recent_diagnostics) if high_saturation["saturated"]: @@ -528,6 +712,17 @@ def _harness_stop_decision( "reason": "topology_frontier_requires_probe", "evidence": topology_frontier, } + if experiment_plan is not None and experiment_plan.get("next_action"): + action = experiment_plan["next_action"] + if isinstance(action, dict) and _as_float(action.get("score")) >= 0.35: + return { + "should_stop": False, + "reason": "experiment_plan_has_high_value_candidate", + "evidence": { + "summary": "The profile-driven planner still has a useful measured hypothesis to test.", + "next_action": action, + }, + } guard = _convergence_guard(state, recent_diagnostics) if guard["deterministic_stop"]: return { @@ -562,6 +757,8 @@ def _harness_proposal_decision( window_summary: dict[str, Any], state: StudyState, recent_diagnostics: list[dict[str, Any]], + *, + experiment_plan: dict[str, Any] | None = None, ) -> dict[str, Any]: default = { "should_propose": False, @@ -575,6 +772,26 @@ def _harness_proposal_decision( for item in recent_diagnostics } tested_signatures.update(_state_tested_signatures(state)) + if experiment_plan is not None: + next_action = experiment_plan.get("next_action") + if isinstance(next_action, dict) and _as_float(next_action.get("score")) >= 0.35: + patch = next_action.get("config_patch") + if isinstance(patch, dict): + signature = _config_signature(patch) + if signature not in tested_signatures: + return { + "should_propose": True, + "reason": str(next_action.get("action_id") or "profile_driven_candidate"), + "diagnosis": str(next_action.get("hypothesis") or "Profile-driven harness candidate."), + "config_patch": patch, + "expected_effects": [ + str(item) + for item in next_action.get("expected_effects", []) + if isinstance(item, str) + ], + "candidate_score": next_action.get("score"), + "bottleneck_hypotheses": experiment_plan.get("bottleneck_hypotheses", []), + } baseline = recent_diagnostics[0] if recent_diagnostics else {} topology_frontier = _topology_frontier_proposal( study, @@ -691,6 +908,526 @@ def _topology_frontier_proposal( } +def _experiment_plan( + study: StudySpec, + window_summary: dict[str, Any], + state: StudyState, + recent_diagnostics: list[dict[str, Any]], + trial_profiles: list[dict[str, Any]], + bottleneck_hypotheses: list[dict[str, Any]], +) -> dict[str, 
Any]: + tested_signatures = { + _config_signature(item.get("config_patch") if isinstance(item, dict) else None) + for item in recent_diagnostics + } + tested_signatures.update(_state_tested_signatures(state)) + candidates = _candidate_actions( + study, + window_summary, + state, + recent_diagnostics, + trial_profiles, + bottleneck_hypotheses, + tested_signatures=tested_signatures, + ) + candidates.sort(key=lambda item: _as_float(item.get("score")), reverse=True) + next_action = candidates[0] if candidates else None + return { + "planner_version": "profile-driven-v1", + "bottleneck_hypotheses": bottleneck_hypotheses, + "candidate_actions": candidates[:8], + "next_action": next_action, + "stop_ready": next_action is None or _as_float(next_action.get("score")) < 0.35, + "stop_rationale": ( + "no untested high-value candidate remains" + if not candidates or _as_float(candidates[0].get("score")) < 0.35 + else "continue with the highest-scoring measured hypothesis" + ), + } + + +def _candidate_actions( + study: StudySpec, + window_summary: dict[str, Any], + state: StudyState, + recent_diagnostics: list[dict[str, Any]], + trial_profiles: list[dict[str, Any]], + bottleneck_hypotheses: list[dict[str, Any]], + *, + tested_signatures: set[str], +) -> list[dict[str, Any]]: + if not recent_diagnostics: + return [] + anchor = _anchor_profile(study, state, recent_diagnostics, trial_profiles) + if anchor is None: + return [] + top_bottleneck = ( + str(bottleneck_hypotheses[0]["name"]) + if bottleneck_hypotheses + else str(anchor.get("probe_profile", {}).get("active_bottleneck") or "") + ) + candidates: list[dict[str, Any]] = [] + candidates.extend( + _topology_candidate_actions( + study, + anchor, + top_bottleneck, + bottleneck_hypotheses, + tested_signatures, + ) + ) + candidates.extend( + _runtime_candidate_actions( + study, + window_summary, + anchor, + top_bottleneck, + bottleneck_hypotheses, + tested_signatures, + ) + ) + return candidates + + +def _anchor_profile( + study: StudySpec, + state: StudyState, + recent_diagnostics: list[dict[str, Any]], + trial_profiles: list[dict[str, Any]], +) -> dict[str, Any] | None: + if state.best_trial_id: + for profile in reversed(trial_profiles): + if profile.get("trial_id") == state.best_trial_id: + return profile + for profile in reversed(trial_profiles): + if profile.get("status") == "completed": + return profile + return trial_profiles[-1] if trial_profiles else None + + +def _topology_candidate_actions( + study: StudySpec, + anchor: dict[str, Any], + top_bottleneck: str, + bottleneck_hypotheses: list[dict[str, Any]], + tested_signatures: set[str], +) -> list[dict[str, Any]]: + if not ({"tensor-parallel-size", "data-parallel-size"} & set(study.engine.tunable_flags)): + return [] + anchor_flags = _effective_flags_for_item(study, anchor) + current_tp = _parse_int_like(anchor_flags.get("tensor-parallel-size"), default=1) + current_dp = _parse_int_like(anchor_flags.get("data-parallel-size"), default=1) + current_ep = _parse_int_like(anchor_flags.get("expert-parallel-size"), default=1) + current_enable_ep = bool(anchor_flags.get("enable-expert-parallel", False)) + legal = _legal_topology_points( + study, + current_tp=current_tp, + current_dp=current_dp, + current_ep=current_ep, + current_enable_ep=current_enable_ep, + ) + actions: list[dict[str, Any]] = [] + for point in legal: + if point["tensor-parallel-size"] == current_tp and point["data-parallel-size"] == current_dp: + continue + patch = _topology_patch(study, point) + signature = 
_config_signature({"env_patch": {}, "flag_patch": patch}) + if signature in tested_signatures: + continue + score, factors = _score_topology_candidate( + top_bottleneck, + bottleneck_hypotheses, + current_tp=current_tp, + current_dp=current_dp, + candidate_tp=point["tensor-parallel-size"], + candidate_dp=point["data-parallel-size"], + ) + if score <= 0: + continue + action_id = _topology_action_id(current_tp, current_dp, point) + actions.append( + { + "action_id": action_id, + "knob_family": "topology", + "score": round(score, 4), + "score_factors": factors, + "config_patch": {"env_patch": {}, "flag_patch": patch}, + "hypothesis": _topology_hypothesis( + top_bottleneck, + current_tp=current_tp, + current_dp=current_dp, + candidate_tp=point["tensor-parallel-size"], + candidate_dp=point["data-parallel-size"], + ), + "expected_effects": [ + "measure whether topology changes relieve the ranked bottleneck", + "compare request_rate_per_gpu under the configured SLO, not raw throughput alone", + "reject this hypothesis if latency improves but per-GPU throughput regresses materially", + ], + } + ) + return actions + + +def _runtime_candidate_actions( + study: StudySpec, + window_summary: dict[str, Any], + anchor: dict[str, Any], + top_bottleneck: str, + bottleneck_hypotheses: list[dict[str, Any]], + tested_signatures: set[str], +) -> list[dict[str, Any]]: + tunable = set(study.engine.tunable_flags) + anchor_flags = _effective_flags_for_item(study, anchor) + topology_patch = _preserve_topology_patch(study, anchor_flags) + actions: list[dict[str, Any]] = [] + + if "max-num-batched-tokens" in tunable: + current_mbt = _parse_int_like(anchor_flags.get("max-num-batched-tokens"), default=0) + mbt_targets: list[tuple[str, int]] = [] + if top_bottleneck == "ttft_prefill": + target = ( + _initial_mbt_from_window(window_summary) + if current_mbt <= 0 + else _next_mbt_step(current_mbt) + ) + if target is not None: + mbt_targets.append(("raise_mbt", target)) + elif top_bottleneck == "decode_tpot" and current_mbt > 8192: + mbt_targets.append(("lower_mbt", max(8192, current_mbt // 2))) + for action_id, target in mbt_targets: + patch = {**topology_patch, "max-num-batched-tokens": target} + signature = _config_signature({"env_patch": {}, "flag_patch": patch}) + if signature in tested_signatures: + continue + relief = 0.24 if top_bottleneck == "ttft_prefill" else 0.14 + actions.append( + _runtime_action( + action_id=action_id, + knob_family="max-num-batched-tokens", + score=relief + _information_gain(bottleneck_hypotheses, "runtime"), + patch=patch, + hypothesis=( + "Adjust max-num-batched-tokens to test whether batching, not topology, " + "is limiting the active latency objective." 
+ ), + expected_effects=[ + "change prefill/decode batching pressure on the incumbent topology", + "confirm if the latency knee moves without requiring another topology change", + ], + ) + ) + + if "max-num-seqs" in tunable: + current_mns = _parse_int_like(anchor_flags.get("max-num-seqs"), default=0) + mns_targets: list[tuple[str, int]] = [] + if top_bottleneck == "admission_or_queueing": + target = max(8, int(current_mns * 1.5)) if current_mns > 0 else 64 + mns_targets.append(("raise_max_num_seqs", _round_up_to_multiple(target, 8))) + elif top_bottleneck == "decode_tpot" and current_mns > 8: + mns_targets.append(("lower_max_num_seqs", max(8, current_mns // 2))) + for action_id, target in mns_targets: + patch = {**topology_patch, "max-num-seqs": target} + signature = _config_signature({"env_patch": {}, "flag_patch": patch}) + if signature in tested_signatures: + continue + relief = 0.25 if top_bottleneck in {"decode_tpot", "admission_or_queueing"} else 0.08 + actions.append( + _runtime_action( + action_id=action_id, + knob_family="max-num-seqs", + score=relief + _information_gain(bottleneck_hypotheses, "runtime"), + patch=patch, + hypothesis=( + "Adjust max-num-seqs to test whether concurrency pressure is the " + "limiting factor under the configured SLO." + ), + expected_effects=[ + "change decode/admission concurrency on the incumbent topology", + "confirm if TPOT or queueing pressure is caused by sequence concurrency", + ], + ) + ) + + if "enable-chunked-prefill" in tunable and top_bottleneck == "ttft_prefill": + current = bool(anchor_flags.get("enable-chunked-prefill", False)) + if not current: + patch = {**topology_patch, "enable-chunked-prefill": True} + signature = _config_signature({"env_patch": {}, "flag_patch": patch}) + if signature not in tested_signatures: + actions.append( + _runtime_action( + action_id="enable_chunked_prefill", + knob_family="enable-chunked-prefill", + score=0.2 + _information_gain(bottleneck_hypotheses, "runtime"), + patch=patch, + hypothesis=( + "Enable chunked prefill to test whether long-prefill head-of-line " + "blocking is driving TTFT failures." 
+ ), + expected_effects=[ + "reduce long-prefill interference for mixed-length chat windows", + "reject if chunking overhead worsens request_rate_per_gpu", + ], + ) + ) + return actions + + +def _runtime_action( + *, + action_id: str, + knob_family: str, + score: float, + patch: dict[str, Any], + hypothesis: str, + expected_effects: list[str], +) -> dict[str, Any]: + return { + "action_id": action_id, + "knob_family": knob_family, + "score": round(score, 4), + "score_factors": { + "expected_bottleneck_relief": round(max(score - 0.1, 0.0), 4), + "information_gain": 0.1, + "launch_safety": 0.05, + "regression_risk": 0.05, + }, + "config_patch": {"env_patch": {}, "flag_patch": patch}, + "hypothesis": hypothesis, + "expected_effects": expected_effects, + } + + +def _legal_topology_points( + study: StudySpec, + *, + current_tp: int, + current_dp: int, + current_ep: int, + current_enable_ep: bool, +) -> list[dict[str, Any]]: + constraints = study.engine.topology_constraints + tunable = set(study.engine.tunable_flags) + if constraints is not None and constraints.allowed_tensor_parallel_sizes: + tp_values = sorted(set(constraints.allowed_tensor_parallel_sizes)) + elif "tensor-parallel-size" in tunable: + tp_values = [value for value in [1, 2, 4, 8] if value <= study.hardware.gpu_count] + else: + tp_values = [current_tp] + + if constraints is not None and constraints.allowed_data_parallel_sizes: + dp_values = sorted(set(constraints.allowed_data_parallel_sizes)) + elif "data-parallel-size" in tunable: + dp_values = [value for value in [1, 2, 4, 8] if value <= study.hardware.gpu_count] + else: + dp_values = [current_dp] + + if constraints is not None and constraints.allowed_expert_parallel_sizes: + ep_values = sorted(set(constraints.allowed_expert_parallel_sizes)) + elif "expert-parallel-size" in tunable: + ep_values = sorted({1, current_ep}) + else: + ep_values = [current_ep] + + points: list[dict[str, Any]] = [] + for tp in tp_values: + for dp in dp_values: + tp_dp_product = tp * dp + if constraints is not None: + if ( + constraints.allowed_tp_dp_products + and tp_dp_product not in constraints.allowed_tp_dp_products + ): + continue + if ( + constraints.require_tp_dp_product_equals_gpu_count + and tp_dp_product != study.hardware.gpu_count + ): + continue + elif tp_dp_product > study.hardware.gpu_count: + continue + if constraints is not None and not constraints.allowed_tp_dp_products: + if tp_dp_product > study.hardware.gpu_count: + continue + for ep in ep_values: + enable_ep = current_enable_ep or ep > 1 + if constraints is not None: + if constraints.allowed_expert_parallel_sizes and ep not in constraints.allowed_expert_parallel_sizes: + continue + if constraints.require_ep_size_leq_tp_dp_product and ep > tp_dp_product: + continue + if constraints.require_ep_size_divides_tp_dp_product and tp_dp_product % ep != 0: + continue + if ( + constraints.require_enable_expert_parallel_when_ep_gt_one + and ep > 1 + and not enable_ep + ): + continue + points.append( + { + "tensor-parallel-size": tp, + "data-parallel-size": dp, + "expert-parallel-size": ep, + "enable-expert-parallel": enable_ep, + } + ) + return points + + +def _topology_patch(study: StudySpec, point: dict[str, Any]) -> dict[str, Any]: + patch: dict[str, Any] = {} + tunable = set(study.engine.tunable_flags) + base = _normalized_topology_flags(study.engine.base_flags) + for key in ( + "tensor-parallel-size", + "data-parallel-size", + "expert-parallel-size", + "enable-expert-parallel", + ): + if key not in tunable: + continue + if key in point 
and point[key] != base.get(key): + patch[key] = point[key] + return patch + + +def _preserve_topology_patch(study: StudySpec, flags: dict[str, Any]) -> dict[str, Any]: + patch: dict[str, Any] = {} + tunable = set(study.engine.tunable_flags) + base = _normalized_topology_flags(study.engine.base_flags) + normalized = _normalized_topology_flags(flags) + for key in ( + "tensor-parallel-size", + "data-parallel-size", + "expert-parallel-size", + "enable-expert-parallel", + ): + if key not in tunable or key not in normalized: + continue + if normalized.get(key) != base.get(key): + patch[key] = normalized[key] + return patch + + +def _normalized_topology_flags(flags: dict[str, Any]) -> dict[str, Any]: + return { + "tensor-parallel-size": _parse_int_like( + flags.get("tensor-parallel-size"), + default=1, + ), + "data-parallel-size": _parse_int_like( + flags.get("data-parallel-size"), + default=1, + ), + "expert-parallel-size": _parse_int_like( + flags.get("expert-parallel-size"), + default=1, + ), + "enable-expert-parallel": bool(flags.get("enable-expert-parallel", False)), + } + + +def _score_topology_candidate( + top_bottleneck: str, + bottleneck_hypotheses: list[dict[str, Any]], + *, + current_tp: int, + current_dp: int, + candidate_tp: int, + candidate_dp: int, +) -> tuple[float, dict[str, float]]: + tp_delta = candidate_tp - current_tp + dp_delta = candidate_dp - current_dp + confidence = _hypothesis_confidence(bottleneck_hypotheses, top_bottleneck) + relief = 0.0 + if top_bottleneck == "ttft_prefill": + relief = 0.42 if tp_delta > 0 else 0.05 + elif top_bottleneck == "decode_tpot": + relief = 0.34 if tp_delta > 0 else 0.02 + elif top_bottleneck == "admission_or_queueing": + relief = 0.34 if dp_delta > 0 else 0.08 + else: + relief = 0.04 + info_gain = 0.2 if abs(tp_delta) + abs(dp_delta) > 0 else 0.0 + launch_safety = 0.08 if candidate_tp * candidate_dp <= max(current_tp * current_dp, 1) else 0.04 + distance = abs(_log2_ratio(candidate_tp, current_tp)) + abs(_log2_ratio(candidate_dp, current_dp)) + regression_risk = min(0.28, 0.06 * distance) + score = relief * max(confidence, 0.35) + info_gain + launch_safety - regression_risk + return score, { + "expected_bottleneck_relief": round(relief, 4), + "bottleneck_confidence": round(confidence, 4), + "information_gain": round(info_gain, 4), + "launch_safety": round(launch_safety, 4), + "regression_risk": round(regression_risk, 4), + } + + +def _information_gain(bottleneck_hypotheses: list[dict[str, Any]], family: str) -> float: + if not bottleneck_hypotheses: + return 0.08 + top_confidence = _as_float(bottleneck_hypotheses[0].get("confidence")) + uncertainty = max(0.0, 1.0 - top_confidence) + return 0.08 + min(0.12, uncertainty * 0.12) + + +def _hypothesis_confidence( + bottleneck_hypotheses: list[dict[str, Any]], + name: str, +) -> float: + for item in bottleneck_hypotheses: + if item.get("name") == name: + return _as_float(item.get("confidence")) + return 0.0 + + +def _topology_action_id( + current_tp: int, + current_dp: int, + point: dict[str, Any], +) -> str: + candidate_tp = int(point["tensor-parallel-size"]) + candidate_dp = int(point["data-parallel-size"]) + if candidate_tp > current_tp: + return "topology_frontier_probe_for_slo_pressure" + if candidate_dp > current_dp: + return "increase_data_parallel_probe" + if candidate_tp < current_tp: + return "decrease_tensor_parallel_probe" + return "redistribute_topology_probe" + + +def _topology_hypothesis( + top_bottleneck: str, + *, + current_tp: int, + current_dp: int, + candidate_tp: int, + 
candidate_dp: int, +) -> str: + return ( + f"Ranked bottleneck is {top_bottleneck}. Test topology " + f"TP={candidate_tp}, DP={candidate_dp} against incumbent TP={current_tp}, " + f"DP={current_dp}; this distinguishes compute-latency relief from " + "replica/admission effects under the configured SLO." + ) + + +def _log2_ratio(new: int, old: int) -> float: + if new <= 0 or old <= 0: + return 0.0 + ratio = new / old + steps = 0.0 + while ratio >= 2.0: + steps += 1.0 + ratio /= 2.0 + while ratio <= 0.5: + steps += 1.0 + ratio *= 2.0 + return steps + + def _topology_frontier_status( study: StudySpec, state: StudyState, diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index 3ebac99..627033b 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -920,6 +920,170 @@ class CoreFlowTests(unittest.TestCase): "topology_frontier_probe_for_slo_pressure", ) + def test_profile_driven_planner_scores_unmeasured_tp_frontier(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + engine_overrides={ + "tunable_flags": [ + "tensor-parallel-size", + "max-num-batched-tokens", + "enable-chunked-prefill", + ], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [1, 2, 4], + "allowed_tp_dp_products": [1, 2, 4], + }, + }, + ) + result_path = tmp_path / "trial-0002.json" + result_path.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.5, + "best_request_rate": 2.0, + "best_pass_rate": 0.96, + "probes": [ + { + "threshold": 0.75, + "feasible": False, + "payload": { + "request_count": 100, + "pass_rate": 0.6, + "request_rate": 3.0, + "early_stop_reason": "slo_pass_rate_unrecoverable", + "latency_summary": { + "failed_reason_counts": {"ttft_ms>4000.0": 35} + }, + }, + } + ], + } + ), + encoding="utf-8", + ) + study = load_study_spec(study_path) + context = build_harness_context( + study=study, + window_summary={"prompt_tokens_p95": 7628, "prompt_tail_ratio_p95_p50": 3.8}, + state=StudyState( + study_id=study.study_id, + best_trial_id="trial-0002", + best_request_rate=2.0, + best_request_rate_per_gpu=1.0, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + best_request_rate=0.5, + best_request_rate_per_gpu=0.5, + config_patch={"env_patch": {}, "flag_patch": {}}, + ), + TrialSummary( + trial_id="trial-0002", + status="completed", + best_request_rate=2.0, + best_request_rate_per_gpu=1.0, + result_path=str(result_path), + config_patch={ + "env_patch": {}, + "flag_patch": {"tensor-parallel-size": 2}, + }, + ), + ], + ), + ) + plan = context["experiment_plan"] + self.assertEqual(plan["planner_version"], "profile-driven-v1") + self.assertEqual(plan["next_action"]["knob_family"], "topology") + self.assertEqual( + plan["next_action"]["config_patch"]["flag_patch"], + {"tensor-parallel-size": 4}, + ) + self.assertIn("ttft_prefill", context["bottleneck_hypotheses"][0]["name"]) + self.assertFalse(context["harness_stop"]["should_stop"]) + + def test_profile_driven_planner_prefers_decode_concurrency_relief(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets( + tmp_path, + trace_overrides={"request_mode": "decode_only"}, + slo_overrides={ + "ttft_rule": None, + "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 20}, + }, + engine_overrides={ + "base_flags": { + "host": "127.0.0.1", + "port": 8000, + "tensor-parallel-size": 4, + "max-num-seqs": 64, + }, + "tunable_flags": ["tensor-parallel-size", 
"max-num-seqs"], + "topology_constraints": { + "allowed_tensor_parallel_sizes": [1, 2, 4], + "allowed_tp_dp_products": [1, 2, 4], + }, + }, + ) + result_path = tmp_path / "trial-0001.json" + result_path.write_text( + json.dumps( + { + "status": "completed", + "best_sampling_u": 0.25, + "best_request_rate": 1.0, + "best_pass_rate": 0.97, + "probes": [ + { + "threshold": 0.5, + "feasible": False, + "payload": { + "request_count": 100, + "pass_rate": 0.5, + "request_rate": 2.0, + "early_stop_reason": "slo_pass_rate_unrecoverable", + "latency_summary": { + "failed_reason_counts": {"tpot_ms>20.0": 50} + }, + }, + } + ], + } + ), + encoding="utf-8", + ) + study = load_study_spec(study_path) + context = build_harness_context( + study=study, + window_summary={}, + state=StudyState( + study_id=study.study_id, + best_trial_id="trial-0001", + best_request_rate=1.0, + best_request_rate_per_gpu=0.25, + trials=[ + TrialSummary( + trial_id="trial-0001", + status="completed", + best_request_rate=1.0, + best_request_rate_per_gpu=0.25, + result_path=str(result_path), + config_patch={"env_patch": {}, "flag_patch": {}}, + ) + ], + ), + ) + plan = context["experiment_plan"] + self.assertEqual(plan["next_action"]["knob_family"], "max-num-seqs") + self.assertEqual( + plan["next_action"]["config_patch"]["flag_patch"], + {"max-num-seqs": 32}, + ) + def test_harness_stop_blocked_until_slo_driven_topology_frontier_is_measured(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp)