Add harness guided first topology probe
This commit is contained in:
@@ -6,7 +6,11 @@ import sys
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from .compare import run_compare
|
from .compare import run_compare
|
||||||
from .harness import build_harness_context, build_harness_stop_proposal
|
from .harness import (
|
||||||
|
build_harness_context,
|
||||||
|
build_harness_guided_proposal,
|
||||||
|
build_harness_stop_proposal,
|
||||||
|
)
|
||||||
from .job import append_job, build_trial_job
|
from .job import append_job, build_trial_job
|
||||||
from .llm import build_prompt, call_llm_for_proposal, load_capability_profile, parse_proposal_text
|
from .llm import build_prompt, call_llm_for_proposal, load_capability_profile, parse_proposal_text
|
||||||
from .spec import Proposal, SpecError, load_study_spec, to_jsonable
|
from .spec import Proposal, SpecError, load_study_spec, to_jsonable
|
||||||
@@ -179,13 +183,25 @@ def cmd_study_tune(args: argparse.Namespace) -> int:
|
|||||||
proposal_text = json.dumps(to_jsonable(stop_proposal), ensure_ascii=False)
|
proposal_text = json.dumps(to_jsonable(stop_proposal), ensure_ascii=False)
|
||||||
proposal_name = f"harness-stop-{state.next_trial_index:04d}"
|
proposal_name = f"harness-stop-{state.next_trial_index:04d}"
|
||||||
else:
|
else:
|
||||||
if study.llm.endpoint is None:
|
guided_proposal = (
|
||||||
raise SpecError(
|
build_harness_guided_proposal(harness_context)
|
||||||
"No proposal files provided, study.llm.endpoint is not configured, "
|
if harness_context is not None
|
||||||
"and the harness stop guard did not fire."
|
else None
|
||||||
|
)
|
||||||
|
if guided_proposal is not None:
|
||||||
|
proposal_text = json.dumps(
|
||||||
|
to_jsonable(guided_proposal),
|
||||||
|
ensure_ascii=False,
|
||||||
)
|
)
|
||||||
proposal_text = call_llm_for_proposal(policy=study.llm, prompt=prompt)
|
proposal_name = f"harness-proposal-{state.next_trial_index:04d}"
|
||||||
proposal_name = f"proposal-{state.next_trial_index:04d}"
|
else:
|
||||||
|
if study.llm.endpoint is None:
|
||||||
|
raise SpecError(
|
||||||
|
"No proposal files provided, study.llm.endpoint is not configured, "
|
||||||
|
"and the harness stop guard did not fire."
|
||||||
|
)
|
||||||
|
proposal_text = call_llm_for_proposal(policy=study.llm, prompt=prompt)
|
||||||
|
proposal_name = f"proposal-{state.next_trial_index:04d}"
|
||||||
raw_proposal_path = store.study_root(study.study_id) / "proposals" / f"{proposal_name}.raw.txt"
|
raw_proposal_path = store.study_root(study.study_id) / "proposals" / f"{proposal_name}.raw.txt"
|
||||||
raw_proposal_path.write_text(proposal_text, encoding="utf-8")
|
raw_proposal_path.write_text(proposal_text, encoding="utf-8")
|
||||||
proposal = parse_proposal_text(proposal_text, study)
|
proposal = parse_proposal_text(proposal_text, study)
|
||||||
@@ -212,10 +228,14 @@ def cmd_study_tune(args: argparse.Namespace) -> int:
|
|||||||
result = run_trial(trial_spec_path)
|
result = run_trial(trial_spec_path)
|
||||||
state = store.ingest_trial_results(study.study_id)
|
state = store.ingest_trial_results(study.study_id)
|
||||||
executed.append(
|
executed.append(
|
||||||
{
|
{
|
||||||
"trial_id": trial.trial_id,
|
"trial_id": trial.trial_id,
|
||||||
"proposal_name": proposal_name,
|
"proposal_name": proposal_name,
|
||||||
"proposal_source": str(proposal_source) if proposal_source else "llm",
|
"proposal_source": (
|
||||||
|
"harness"
|
||||||
|
if proposal_name.startswith("harness-proposal-")
|
||||||
|
else str(proposal_source) if proposal_source else "llm"
|
||||||
|
),
|
||||||
"best_sampling_u": result.get("best_sampling_u"),
|
"best_sampling_u": result.get("best_sampling_u"),
|
||||||
"best_request_rate": result.get("best_request_rate"),
|
"best_request_rate": result.get("best_request_rate"),
|
||||||
"best_pass_rate": result.get("best_pass_rate"),
|
"best_pass_rate": result.get("best_pass_rate"),
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ def build_harness_context(
|
|||||||
"recent_trial_diagnostics": recent_diagnostics,
|
"recent_trial_diagnostics": recent_diagnostics,
|
||||||
"convergence_guard": _convergence_guard(state, recent_diagnostics),
|
"convergence_guard": _convergence_guard(state, recent_diagnostics),
|
||||||
"harness_stop": _harness_stop_decision(study, state, recent_diagnostics),
|
"harness_stop": _harness_stop_decision(study, state, recent_diagnostics),
|
||||||
|
"harness_proposal": _harness_proposal_decision(study, state, recent_diagnostics),
|
||||||
"knob_harnesses": _knob_harnesses(study, window_summary, recent_diagnostics),
|
"knob_harnesses": _knob_harnesses(study, window_summary, recent_diagnostics),
|
||||||
"proposal_rules": _proposal_rules(),
|
"proposal_rules": _proposal_rules(),
|
||||||
}
|
}
|
||||||
@@ -74,6 +75,39 @@ def build_harness_stop_proposal(context: dict[str, Any]) -> Proposal | None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def build_harness_guided_proposal(context: dict[str, Any]) -> Proposal | None:
|
||||||
|
proposal = context.get("harness_proposal")
|
||||||
|
if not isinstance(proposal, dict) or not proposal.get("should_propose"):
|
||||||
|
return None
|
||||||
|
patch = proposal.get("config_patch")
|
||||||
|
if not isinstance(patch, dict):
|
||||||
|
return None
|
||||||
|
flag_patch = patch.get("flag_patch")
|
||||||
|
env_patch = patch.get("env_patch")
|
||||||
|
if not isinstance(flag_patch, dict) or not isinstance(env_patch, dict):
|
||||||
|
return None
|
||||||
|
reason = str(proposal.get("reason") or "harness_guided_probe")
|
||||||
|
diagnosis = str(proposal.get("diagnosis") or reason)
|
||||||
|
return Proposal(
|
||||||
|
observation=(
|
||||||
|
"Harness selected a deterministic first validation probe before "
|
||||||
|
f"requesting an LLM proposal: {reason}."
|
||||||
|
),
|
||||||
|
diagnosis=diagnosis,
|
||||||
|
config_patch=ConfigPatch(env_patch=dict(env_patch), flag_patch=dict(flag_patch)),
|
||||||
|
expected_effects=[
|
||||||
|
str(item)
|
||||||
|
for item in proposal.get("expected_effects", [])
|
||||||
|
if isinstance(item, str)
|
||||||
|
],
|
||||||
|
why_not_previous_failures=(
|
||||||
|
"The proposal is based on the first completed baseline trial and does not "
|
||||||
|
"repeat a prior failed configuration."
|
||||||
|
),
|
||||||
|
should_stop=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def render_harness_context(context: dict[str, Any]) -> str:
|
def render_harness_context(context: dict[str, Any]) -> str:
|
||||||
return json.dumps(context, ensure_ascii=False, indent=2)
|
return json.dumps(context, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
@@ -511,6 +545,106 @@ def _harness_stop_decision(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _harness_proposal_decision(
|
||||||
|
study: StudySpec,
|
||||||
|
state: StudyState,
|
||||||
|
recent_diagnostics: list[dict[str, Any]],
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
default = {
|
||||||
|
"should_propose": False,
|
||||||
|
"reason": "no_deterministic_harness_proposal",
|
||||||
|
"diagnosis": "Defer to the LLM proposal policy.",
|
||||||
|
"config_patch": {"env_patch": {}, "flag_patch": {}},
|
||||||
|
"expected_effects": [],
|
||||||
|
}
|
||||||
|
if len(state.trials) != 1 or len(recent_diagnostics) != 1:
|
||||||
|
return default
|
||||||
|
baseline = recent_diagnostics[0]
|
||||||
|
if baseline.get("status") != "completed":
|
||||||
|
return default
|
||||||
|
if not isinstance(baseline.get("best_request_rate_per_gpu"), (int, float)):
|
||||||
|
return default
|
||||||
|
active_bottleneck = str(baseline.get("active_bottleneck") or "")
|
||||||
|
if active_bottleneck not in {"ttft_prefill", "decode_tpot"}:
|
||||||
|
return {
|
||||||
|
**default,
|
||||||
|
"reason": "baseline_bottleneck_does_not_require_tp_first_probe",
|
||||||
|
"diagnosis": f"Baseline bottleneck is {active_bottleneck or 'unknown'}.",
|
||||||
|
}
|
||||||
|
if "tensor-parallel-size" not in set(study.engine.tunable_flags):
|
||||||
|
return {
|
||||||
|
**default,
|
||||||
|
"reason": "tensor_parallel_size_not_tunable",
|
||||||
|
}
|
||||||
|
failed_signatures = {
|
||||||
|
_config_signature(item.get("config_patch") if isinstance(item, dict) else None)
|
||||||
|
for item in recent_diagnostics
|
||||||
|
if item.get("failure_stage") == "engine_launch"
|
||||||
|
}
|
||||||
|
base_flags = dict(study.engine.base_flags)
|
||||||
|
baseline_patch = baseline.get("config_patch")
|
||||||
|
if isinstance(baseline_patch, dict) and isinstance(baseline_patch.get("flag_patch"), dict):
|
||||||
|
base_flags.update(baseline_patch["flag_patch"])
|
||||||
|
current_tp = _parse_int_like(base_flags.get("tensor-parallel-size", 1), default=1)
|
||||||
|
current_dp = _parse_int_like(base_flags.get("data-parallel-size", 1), default=1)
|
||||||
|
next_tp = _next_allowed_tp(study, current_tp=current_tp, current_dp=current_dp)
|
||||||
|
if next_tp is None:
|
||||||
|
return {
|
||||||
|
**default,
|
||||||
|
"reason": "no_legal_adjacent_tensor_parallel_probe",
|
||||||
|
}
|
||||||
|
flag_patch: dict[str, Any] = {"tensor-parallel-size": next_tp}
|
||||||
|
signature = _config_signature({"env_patch": {}, "flag_patch": flag_patch})
|
||||||
|
if signature in failed_signatures:
|
||||||
|
return {
|
||||||
|
**default,
|
||||||
|
"reason": "adjacent_tensor_parallel_probe_previously_failed",
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
"should_propose": True,
|
||||||
|
"reason": "first_adjacent_tensor_parallel_probe_for_latency_bottleneck",
|
||||||
|
"diagnosis": (
|
||||||
|
f"Baseline high-load probes indicate {active_bottleneck}; the generic "
|
||||||
|
"topology harness validates the adjacent legal TP increase before "
|
||||||
|
"runtime-only or DP/EP probes."
|
||||||
|
),
|
||||||
|
"config_patch": {"env_patch": {}, "flag_patch": flag_patch},
|
||||||
|
"expected_effects": [
|
||||||
|
"reduce per-request latency pressure at higher offered load",
|
||||||
|
"validate the nearest TP topology before broader runtime search",
|
||||||
|
],
|
||||||
|
"baseline_trial_id": baseline.get("trial_id"),
|
||||||
|
"active_bottleneck": active_bottleneck,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _next_allowed_tp(study: StudySpec, *, current_tp: int, current_dp: int) -> int | None:
|
||||||
|
constraints = study.engine.topology_constraints
|
||||||
|
if constraints is not None and constraints.allowed_tensor_parallel_sizes:
|
||||||
|
candidates = sorted({int(item) for item in constraints.allowed_tensor_parallel_sizes})
|
||||||
|
else:
|
||||||
|
candidates = [1, 2, 4, 8]
|
||||||
|
for candidate in candidates:
|
||||||
|
if candidate <= current_tp:
|
||||||
|
continue
|
||||||
|
tp_dp_product = candidate * current_dp
|
||||||
|
if tp_dp_product > study.hardware.gpu_count:
|
||||||
|
continue
|
||||||
|
if constraints is not None:
|
||||||
|
if (
|
||||||
|
constraints.allowed_tp_dp_products
|
||||||
|
and tp_dp_product not in constraints.allowed_tp_dp_products
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
if (
|
||||||
|
constraints.require_tp_dp_product_equals_gpu_count
|
||||||
|
and tp_dp_product != study.hardware.gpu_count
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
return candidate
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _search_high_saturation_guard(
|
def _search_high_saturation_guard(
|
||||||
study: StudySpec,
|
study: StudySpec,
|
||||||
state: StudyState,
|
state: StudyState,
|
||||||
@@ -971,3 +1105,32 @@ def _optional_float(value: Any) -> float | None:
|
|||||||
if isinstance(value, (int, float)):
|
if isinstance(value, (int, float)):
|
||||||
return float(value)
|
return float(value)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_int_like(value: Any, *, default: int) -> int:
|
||||||
|
if value is None:
|
||||||
|
return default
|
||||||
|
if isinstance(value, bool):
|
||||||
|
return default
|
||||||
|
if isinstance(value, int):
|
||||||
|
return value
|
||||||
|
if isinstance(value, float) and value.is_integer():
|
||||||
|
return int(value)
|
||||||
|
if isinstance(value, str) and value.strip():
|
||||||
|
try:
|
||||||
|
return int(value.strip())
|
||||||
|
except ValueError:
|
||||||
|
return default
|
||||||
|
return default
|
||||||
|
|
||||||
|
|
||||||
|
def _config_signature(config_patch: Any) -> str:
|
||||||
|
if not isinstance(config_patch, dict):
|
||||||
|
config_patch = {}
|
||||||
|
env_patch = config_patch.get("env_patch")
|
||||||
|
flag_patch = config_patch.get("flag_patch")
|
||||||
|
payload = {
|
||||||
|
"env_patch": env_patch if isinstance(env_patch, dict) else {},
|
||||||
|
"flag_patch": flag_patch if isinstance(flag_patch, dict) else {},
|
||||||
|
}
|
||||||
|
return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
|
||||||
|
|||||||
@@ -13,7 +13,11 @@ from aituner.compare import load_compare_spec, run_compare
|
|||||||
from aituner.engine import build_launch_recipe
|
from aituner.engine import build_launch_recipe
|
||||||
from aituner.http_client import _auth_headers, _openai_url, _should_bypass_proxy
|
from aituner.http_client import _auth_headers, _openai_url, _should_bypass_proxy
|
||||||
from aituner.job import append_job, build_trial_job
|
from aituner.job import append_job, build_trial_job
|
||||||
from aituner.harness import build_harness_context, build_harness_stop_proposal
|
from aituner.harness import (
|
||||||
|
build_harness_context,
|
||||||
|
build_harness_guided_proposal,
|
||||||
|
build_harness_stop_proposal,
|
||||||
|
)
|
||||||
from aituner.llm import _extract_response_text, build_prompt, parse_proposal_text, validate_proposal
|
from aituner.llm import _extract_response_text, build_prompt, parse_proposal_text, validate_proposal
|
||||||
from aituner.search import ThresholdProbe, binary_search_max_feasible
|
from aituner.search import ThresholdProbe, binary_search_max_feasible
|
||||||
from aituner.slo import RequestOutcome, evaluate_request, summarize_evaluations
|
from aituner.slo import RequestOutcome, evaluate_request, summarize_evaluations
|
||||||
@@ -596,6 +600,75 @@ class CoreFlowTests(unittest.TestCase):
|
|||||||
self.assertIsNotNone(proposal)
|
self.assertIsNotNone(proposal)
|
||||||
self.assertTrue(proposal.should_stop)
|
self.assertTrue(proposal.should_stop)
|
||||||
|
|
||||||
|
def test_harness_guided_first_tp_probe_for_latency_bottleneck(self) -> None:
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
tmp_path = Path(tmp)
|
||||||
|
study_path = _write_study_assets(
|
||||||
|
tmp_path,
|
||||||
|
engine_overrides={
|
||||||
|
"tunable_flags": ["tensor-parallel-size", "data-parallel-size"],
|
||||||
|
"topology_constraints": {
|
||||||
|
"allowed_tensor_parallel_sizes": [1, 2, 4],
|
||||||
|
"allowed_data_parallel_sizes": [1, 2],
|
||||||
|
"allowed_tp_dp_products": [1, 2, 4],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
study = load_study_spec(study_path)
|
||||||
|
result_path = tmp_path / "trial-0001.json"
|
||||||
|
result_path.write_text(
|
||||||
|
json.dumps(
|
||||||
|
{
|
||||||
|
"status": "completed",
|
||||||
|
"best_sampling_u": 0.25,
|
||||||
|
"best_request_rate": 2.0,
|
||||||
|
"best_pass_rate": 1.0,
|
||||||
|
"probes": [
|
||||||
|
{
|
||||||
|
"threshold": 0.5,
|
||||||
|
"feasible": False,
|
||||||
|
"payload": {
|
||||||
|
"request_count": 100,
|
||||||
|
"pass_rate": 0.6,
|
||||||
|
"request_rate": 4.0,
|
||||||
|
"early_stopped": True,
|
||||||
|
"early_stop_reason": "slo_pass_rate_unrecoverable",
|
||||||
|
"latency_summary": {
|
||||||
|
"failed_reason_counts": {"tpot_ms>50.0": 40},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
),
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
state = StudyState(
|
||||||
|
study_id=study.study_id,
|
||||||
|
best_trial_id="trial-0001",
|
||||||
|
best_request_rate=2.0,
|
||||||
|
best_request_rate_per_gpu=2.0,
|
||||||
|
trials=[
|
||||||
|
TrialSummary(
|
||||||
|
trial_id="trial-0001",
|
||||||
|
status="completed",
|
||||||
|
best_request_rate=2.0,
|
||||||
|
best_request_rate_per_gpu=2.0,
|
||||||
|
result_path=str(result_path),
|
||||||
|
config_patch={"env_patch": {}, "flag_patch": {}},
|
||||||
|
)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
context = build_harness_context(
|
||||||
|
study=study,
|
||||||
|
window_summary={"prompt_tokens_p95": 2048},
|
||||||
|
state=state,
|
||||||
|
)
|
||||||
|
proposal = build_harness_guided_proposal(context)
|
||||||
|
self.assertIsNotNone(proposal)
|
||||||
|
self.assertEqual(proposal.config_patch.flag_patch, {"tensor-parallel-size": 2})
|
||||||
|
self.assertFalse(proposal.should_stop)
|
||||||
|
|
||||||
def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None:
|
def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None:
|
||||||
with tempfile.TemporaryDirectory() as tmp:
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
tmp_path = Path(tmp)
|
tmp_path = Path(tmp)
|
||||||
|
|||||||
Reference in New Issue
Block a user