Add topology-aware tuning constraints

2026-04-09 21:07:51 +08:00
parent 7371d6635c
commit ef78fe7eb5
6 changed files with 506 additions and 2 deletions

View File

@@ -141,11 +141,24 @@
"CUDA_DEVICE_MAX_CONNECTIONS" "CUDA_DEVICE_MAX_CONNECTIONS"
], ],
"tunable_flags": [ "tunable_flags": [
"tensor-parallel-size",
"data-parallel-size",
"expert-parallel-size",
"gpu-memory-utilization", "gpu-memory-utilization",
"max-num-batched-tokens", "max-num-batched-tokens",
"max-num-seqs", "max-num-seqs",
"block-size" "block-size"
], ],
"topology_constraints": {
"require_tp_dp_product_equals_gpu_count": true,
"require_ep_size_leq_tp_dp_product": true,
"require_ep_size_divides_tp_dp_product": true,
"require_enable_expert_parallel_when_ep_gt_one": true,
"validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter": true,
"allowed_tensor_parallel_sizes": [1, 2, 4, 8],
"allowed_data_parallel_sizes": [1, 2, 4, 8],
"allowed_expert_parallel_sizes": [1, 2, 4, 8]
},
"python_executable": "python3" "python_executable": "python3"
}, },
"trace": { "trace": {

View File

@@ -8,6 +8,96 @@ from .http_client import chat_completion, stream_text_completion
from .spec import LLMPolicySpec, Proposal, SpecError, StudySpec, StudyState


def _parse_bool_like(value: Any, *, context: str) -> bool:
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        normalized = value.strip().lower()
        if normalized in {"1", "true", "yes", "on"}:
            return True
        if normalized in {"0", "false", "no", "off"}:
            return False
    raise SpecError(f"{context} must be a boolean-like value.")


def _parse_int_like(value: Any, *, context: str) -> int:
    if isinstance(value, bool):
        raise SpecError(f"{context} must be an integer.")
    if isinstance(value, int):
        return value
    if isinstance(value, float) and value.is_integer():
        return int(value)
    if isinstance(value, str):
        stripped = value.strip()
        if stripped:
            try:
                return int(stripped)
            except ValueError as exc:
                raise SpecError(f"{context} must be an integer.") from exc
    raise SpecError(f"{context} must be an integer.")


def _effective_engine_envs(study: StudySpec, proposal: Proposal | None = None) -> dict[str, Any]:
    envs = dict(study.engine.base_envs)
    if proposal is not None:
        envs.update(proposal.config_patch.env_patch)
    return envs


def _effective_engine_flags(study: StudySpec, proposal: Proposal | None = None) -> dict[str, Any]:
    flags = dict(study.engine.base_flags)
    if proposal is not None:
        flags.update(proposal.config_patch.flag_patch)
    return flags


def _effective_topology(study: StudySpec, proposal: Proposal | None = None) -> dict[str, Any]:
    envs = _effective_engine_envs(study, proposal)
    flags = _effective_engine_flags(study, proposal)
    tp = _parse_int_like(flags.get("tensor-parallel-size", 1), context="tensor-parallel-size")
    dp = _parse_int_like(flags.get("data-parallel-size", 1), context="data-parallel-size")
    raw_enable_ep = flags.get("enable-expert-parallel", False)
    enable_ep = _parse_bool_like(raw_enable_ep, context="enable-expert-parallel")
    raw_ep = flags.get("expert-parallel-size")
    ep = _parse_int_like(raw_ep, context="expert-parallel-size") if raw_ep is not None else None
    effective_ep = ep if ep is not None else (tp * dp if enable_ep else 1)
    return {
        "tensor_parallel_size": tp,
        "data_parallel_size": dp,
        "tp_dp_product": tp * dp,
        "enable_expert_parallel": enable_ep,
        "expert_parallel_size": ep,
        "effective_expert_parallel_size": effective_ep,
        "vllm_tpep_reduce_scatter": _parse_bool_like(
            envs.get("VLLM_TPEP_REDUCE_SCATTER", "0"),
            context="VLLM_TPEP_REDUCE_SCATTER",
        ),
    }


def _launch_failure_history(state: StudyState) -> list[dict[str, Any]]:
    failures: list[dict[str, Any]] = []
    for trial in state.trials:
        if trial.failure_stage != "engine_launch" and "engine_process_exited_before_ready" not in (
            trial.failure_reason or ""
        ):
            continue
        config_patch = trial.config_patch or {}
        env_patch = config_patch.get("env_patch", {}) if isinstance(config_patch, dict) else {}
        flag_patch = config_patch.get("flag_patch", {}) if isinstance(config_patch, dict) else {}
        failures.append(
            {
                "trial_id": trial.trial_id,
                "failure_reason": trial.failure_reason,
                "failure_stage": trial.failure_stage or "engine_launch",
                "implicated_env_keys": sorted(env_patch),
                "implicated_flag_keys": sorted(flag_patch),
                "config_patch": config_patch,
            }
        )
    return failures


def build_prompt(
    *,
    study: StudySpec,
@@ -36,8 +126,10 @@ def build_prompt(
"diagnosis": trial.diagnosis, "diagnosis": trial.diagnosis,
"config_patch": trial.config_patch, "config_patch": trial.config_patch,
"failure_reason": trial.failure_reason, "failure_reason": trial.failure_reason,
"failure_stage": trial.failure_stage,
} }
) )
launch_failures = _launch_failure_history(state)
sections = [ sections = [
"You are tuning an OpenAI-compatible serving engine.", "You are tuning an OpenAI-compatible serving engine.",
"Return exactly one JSON object with keys: observation, diagnosis, config_patch, expected_effects, why_not_previous_failures.", "Return exactly one JSON object with keys: observation, diagnosis, config_patch, expected_effects, why_not_previous_failures.",
@@ -46,6 +138,7 @@ def build_prompt(
"Only use allowed tunable env keys and allowed tunable flag keys.", "Only use allowed tunable env keys and allowed tunable flag keys.",
"Do not wrap the JSON in markdown fences or any extra text.", "Do not wrap the JSON in markdown fences or any extra text.",
"Do not repeat a config that previously failed to launch unless the new patch explicitly removes the failing knob.", "Do not repeat a config that previously failed to launch unless the new patch explicitly removes the failing knob.",
"Treat previous engine launch failures as hard negative evidence. If you touch TP/DP/EP, keep the proposal inside the topology constraints exactly.",
"", "",
"Study stack:", "Study stack:",
json.dumps( json.dumps(
@@ -83,6 +176,12 @@ def build_prompt(
"base_envs": study.engine.base_envs, "base_envs": study.engine.base_envs,
"allowed_flag_keys": study.engine.tunable_flags, "allowed_flag_keys": study.engine.tunable_flags,
"allowed_env_keys": study.engine.tunable_envs, "allowed_env_keys": study.engine.tunable_envs,
"topology_constraints": (
study.engine.topology_constraints.__dict__
if study.engine.topology_constraints is not None
else None
),
"effective_topology": _effective_topology(study),
}, },
}, },
ensure_ascii=False, ensure_ascii=False,
@@ -111,6 +210,9 @@ def build_prompt(
"Trial history:", "Trial history:",
json.dumps(history, ensure_ascii=False, indent=2), json.dumps(history, ensure_ascii=False, indent=2),
"", "",
"Known launch failures:",
json.dumps(launch_failures, ensure_ascii=False, indent=2),
"",
"The proposal must beat the current incumbent. Do not propose a config that is only likely to be feasible below the current best_sampling_u/request_rate.", "The proposal must beat the current incumbent. Do not propose a config that is only likely to be feasible below the current best_sampling_u/request_rate.",
"The evaluator for a new trial will start searching from the current best feasible sampling_u and only look for improvements above it.", "The evaluator for a new trial will start searching from the current best feasible sampling_u and only look for improvements above it.",
"The proposal should improve the maximum feasible sampling_u under the 95%+ SLO target.", "The proposal should improve the maximum feasible sampling_u under the 95%+ SLO target.",
@@ -136,6 +238,101 @@ def validate_proposal(proposal: Proposal, study: StudySpec) -> Proposal:
raise SpecError(f"Proposal uses unsupported env keys: {', '.join(unknown_envs)}") raise SpecError(f"Proposal uses unsupported env keys: {', '.join(unknown_envs)}")
if unknown_flags: if unknown_flags:
raise SpecError(f"Proposal uses unsupported flag keys: {', '.join(unknown_flags)}") raise SpecError(f"Proposal uses unsupported flag keys: {', '.join(unknown_flags)}")
topology = _effective_topology(study, proposal)
tp = topology["tensor_parallel_size"]
dp = topology["data_parallel_size"]
effective_ep = topology["effective_expert_parallel_size"]
if tp < 1 or dp < 1:
raise SpecError("Proposal violates topology constraints: TP/DP must be >= 1.")
constraints = study.engine.topology_constraints
if constraints is None:
return proposal
if (
constraints.allowed_tensor_parallel_sizes
and tp not in constraints.allowed_tensor_parallel_sizes
):
raise SpecError(
"Proposal violates topology constraints: "
f"tensor-parallel-size={tp} not in {constraints.allowed_tensor_parallel_sizes}."
)
if constraints.allowed_data_parallel_sizes and dp not in constraints.allowed_data_parallel_sizes:
raise SpecError(
"Proposal violates topology constraints: "
f"data-parallel-size={dp} not in {constraints.allowed_data_parallel_sizes}."
)
if (
constraints.allowed_expert_parallel_sizes
and effective_ep not in constraints.allowed_expert_parallel_sizes
):
raise SpecError(
"Proposal violates topology constraints: "
f"expert-parallel-size={effective_ep} not in {constraints.allowed_expert_parallel_sizes}."
)
tp_dp_product = topology["tp_dp_product"]
if tp_dp_product > study.hardware.gpu_count:
raise SpecError(
"Proposal violates topology constraints: "
f"tensor-parallel-size * data-parallel-size = {tp_dp_product} exceeds "
f"hardware.gpu_count={study.hardware.gpu_count}."
)
if (
constraints.require_tp_dp_product_equals_gpu_count
and tp_dp_product != study.hardware.gpu_count
):
raise SpecError(
"Proposal violates topology constraints: "
f"tensor-parallel-size * data-parallel-size must equal hardware.gpu_count="
f"{study.hardware.gpu_count}, got {tp_dp_product}."
)
if (
constraints.require_enable_expert_parallel_when_ep_gt_one
and effective_ep > 1
and not topology["enable_expert_parallel"]
):
raise SpecError(
"Proposal violates topology constraints: expert-parallel-size > 1 requires "
"enable-expert-parallel=true."
)
if (
constraints.require_ep_size_leq_tp_dp_product
and effective_ep > tp_dp_product
):
raise SpecError(
"Proposal violates topology constraints: "
f"expert-parallel-size={effective_ep} must be <= tensor-parallel-size * "
f"data-parallel-size={tp_dp_product}."
)
if (
constraints.require_ep_size_divides_tp_dp_product
and tp_dp_product % effective_ep != 0
):
raise SpecError(
"Proposal violates topology constraints: "
f"expert-parallel-size={effective_ep} must divide tensor-parallel-size * "
f"data-parallel-size={tp_dp_product}."
)
if (
constraints.validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter
and topology["vllm_tpep_reduce_scatter"]
and topology["enable_expert_parallel"]
and effective_ep > 1
and tp > 1
):
flags = _effective_engine_flags(study, proposal)
graph_sizes = flags.get("cuda-graph-sizes")
if isinstance(graph_sizes, list):
invalid_sizes = [
_parse_int_like(item, context="cuda-graph-sizes")
for item in graph_sizes
if _parse_int_like(item, context="cuda-graph-sizes") % tp != 0
]
if invalid_sizes:
raise SpecError(
"Proposal violates topology constraints: "
f"cuda-graph-sizes must be divisible by tensor-parallel-size={tp} "
f"when VLLM_TPEP_REDUCE_SCATTER=1 and expert parallel is enabled. "
f"First invalid sizes: {invalid_sizes[:8]}"
)
return proposal return proposal
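
A minimal standalone sketch (not the project's code) of the EP-defaulting rule and the product/divisibility checks that validate_proposal enforces above; gpu_count, tp, and dp here are hypothetical values for an assumed 8-GPU host:

    def effective_ep(tp: int, dp: int, enable_ep: bool, ep: int | None) -> int:
        # expert-parallel-size falls back to tp * dp when expert parallel is enabled.
        return ep if ep is not None else (tp * dp if enable_ep else 1)

    gpu_count = 8  # assumption for this example
    tp, dp = 2, 4
    ep = effective_ep(tp, dp, enable_ep=True, ep=None)  # -> 8
    assert tp * dp == gpu_count          # require_tp_dp_product_equals_gpu_count
    assert ep <= tp * dp                 # require_ep_size_leq_tp_dp_product
    assert (tp * dp) % ep == 0           # require_ep_size_divides_tp_dp_product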

View File

@@ -63,6 +63,17 @@ def _coerce_str_list(value: Any, *, context: str) -> list[str]:
    return result


def _coerce_int_list(value: Any, *, context: str) -> list[int]:
    if value is None:
        return []
    if not isinstance(value, list):
        raise SpecError(f"{context} must be a list.")
    result: list[int] = []
    for item in value:
        result.append(_require_int(item, context=context))
    return result


def _resolve_codex_endpoint() -> tuple[str, str, str | None]:
    override = os.environ.get("AITUNER_CODEX_BASE_URL", "").strip()
    if override:
@@ -158,6 +169,7 @@ class EngineLaunchSpec:
    base_flags: dict[str, Any]
    tunable_envs: list[str]
    tunable_flags: list[str]
    topology_constraints: "TopologyConstraintSpec | None" = None
    python_executable: str = "python3"

    @property
@@ -192,10 +204,72 @@ class EngineLaunchSpec:
            tunable_flags=_coerce_str_list(
                data.get("tunable_flags"), context="engine.tunable_flags"
            ),
            topology_constraints=(
                TopologyConstraintSpec.from_dict(
                    _require_mapping(
                        data.get("topology_constraints"),
                        context="engine.topology_constraints",
                    )
                )
                if data.get("topology_constraints") is not None
                else None
            ),
            python_executable=str(data.get("python_executable") or "python3").strip(),
        )


@dataclass(frozen=True)
class TopologyConstraintSpec:
    require_tp_dp_product_equals_gpu_count: bool = False
    require_ep_size_leq_tp_dp_product: bool = False
    require_ep_size_divides_tp_dp_product: bool = False
    require_enable_expert_parallel_when_ep_gt_one: bool = True
    validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter: bool = True
    allowed_tensor_parallel_sizes: list[int] = field(default_factory=list)
    allowed_data_parallel_sizes: list[int] = field(default_factory=list)
    allowed_expert_parallel_sizes: list[int] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: Mapping[str, Any]) -> "TopologyConstraintSpec":
        return cls(
            require_tp_dp_product_equals_gpu_count=_require_bool(
                data.get("require_tp_dp_product_equals_gpu_count", False),
                context="engine.topology_constraints.require_tp_dp_product_equals_gpu_count",
            ),
            require_ep_size_leq_tp_dp_product=_require_bool(
                data.get("require_ep_size_leq_tp_dp_product", False),
                context="engine.topology_constraints.require_ep_size_leq_tp_dp_product",
            ),
            require_ep_size_divides_tp_dp_product=_require_bool(
                data.get("require_ep_size_divides_tp_dp_product", False),
                context="engine.topology_constraints.require_ep_size_divides_tp_dp_product",
            ),
            require_enable_expert_parallel_when_ep_gt_one=_require_bool(
                data.get("require_enable_expert_parallel_when_ep_gt_one", True),
                context="engine.topology_constraints.require_enable_expert_parallel_when_ep_gt_one",
            ),
            validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter=_require_bool(
                data.get(
                    "validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter",
                    True,
                ),
                context="engine.topology_constraints.validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter",
            ),
            allowed_tensor_parallel_sizes=_coerce_int_list(
                data.get("allowed_tensor_parallel_sizes"),
                context="engine.topology_constraints.allowed_tensor_parallel_sizes",
            ),
            allowed_data_parallel_sizes=_coerce_int_list(
                data.get("allowed_data_parallel_sizes"),
                context="engine.topology_constraints.allowed_data_parallel_sizes",
            ),
            allowed_expert_parallel_sizes=_coerce_int_list(
                data.get("allowed_expert_parallel_sizes"),
                context="engine.topology_constraints.allowed_expert_parallel_sizes",
            ),
        )


@dataclass(frozen=True)
class InputLengthFilterSpec:
    min_input_tokens: int | None = None
@@ -601,6 +675,7 @@ class TrialSummary:
diagnosis: str = "" diagnosis: str = ""
config_patch: dict[str, Any] | None = None config_patch: dict[str, Any] | None = None
failure_reason: str = "" failure_reason: str = ""
failure_stage: str = ""
@dataclass @dataclass
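
As a quick illustration (values invented; only the field names come from the code above), the topology_constraints block from the study config maps onto TopologyConstraintSpec via from_dict, with unset keys keeping their dataclass defaults:

    constraints = TopologyConstraintSpec.from_dict(
        {
            "require_tp_dp_product_equals_gpu_count": True,
            "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
        }
    )
    assert constraints.require_tp_dp_product_equals_gpu_count is True
    assert constraints.require_ep_size_leq_tp_dp_product is False  # default kept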

View File

@@ -120,6 +120,7 @@ class StudyStore:
            summary.best_pass_rate = payload.get("best_pass_rate")
            summary.result_path = str(result_path)
            summary.failure_reason = str(payload.get("failure_reason") or "").strip()
            summary.failure_stage = str(payload.get("failure_stage") or "").strip()
            if (
                isinstance(summary.best_request_rate, (int, float))
                and (best_rate is None or summary.best_request_rate > best_rate)

View File

@@ -17,7 +17,7 @@ from .engine import build_launch_recipe
from .http_client import HttpClientError, stream_chat_completion, wait_for_server
from .search import ThresholdProbe, binary_search_max_feasible
from .slo import RequestOutcome, evaluate_request, summarize_evaluations
from .spec import ConfigPatch, SamplingSearchSpec, TrialSpec, load_study_spec, to_jsonable
from .trace import TraceRequest, load_trace_requests, select_requests_for_threshold
@@ -321,6 +321,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
        start_new_session=True,
    )
    probe_history: list[dict[str, Any]] = []
    failure_stage = "engine_launch"
    try:
        _wait_for_server_or_exit(
            process,
@@ -328,6 +329,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
            healthcheck_path=recipe.healthcheck_path,
            ready_timeout_s=recipe.ready_timeout_s,
        )
        failure_stage = "probe_search"

        def evaluator(threshold: float) -> ThresholdProbe[ProbePayload]:
            selected = select_requests_for_threshold(requests, threshold=threshold)
@@ -404,6 +406,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
"study_id": trial.study_id, "study_id": trial.study_id,
"trial_id": trial.trial_id, "trial_id": trial.trial_id,
"status": "completed", "status": "completed",
"config_patch": to_jsonable(trial.config_patch),
"best_sampling_u": search.best_threshold if best is not None else None, "best_sampling_u": search.best_threshold if best is not None else None,
"best_request_rate": best.request_rate if best is not None else None, "best_request_rate": best.request_rate if best is not None else None,
"best_pass_rate": best.pass_rate if best is not None else None, "best_pass_rate": best.pass_rate if best is not None else None,
@@ -442,10 +445,12 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
"study_id": trial.study_id, "study_id": trial.study_id,
"trial_id": trial.trial_id, "trial_id": trial.trial_id,
"status": "failed", "status": "failed",
"config_patch": to_jsonable(trial.config_patch),
"best_sampling_u": None, "best_sampling_u": None,
"best_request_rate": None, "best_request_rate": None,
"best_pass_rate": None, "best_pass_rate": None,
"best_request_count": None, "best_request_count": None,
"failure_stage": failure_stage,
"failure_reason": str(exc), "failure_reason": str(exc),
"probes": probe_history, "probes": probe_history,
} }
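
For reference, a sketch of the failed-trial payload this path now writes; the keys match the code above, while the identifiers and values below are made up. ingest_trial_results copies failure_stage into TrialSummary, and _launch_failure_history then surfaces it to the prompt:

    # Hypothetical failed-trial result; concrete values are illustrative only.
    failed_result = {
        "study_id": "study-demo",
        "trial_id": "trial-0003",
        "status": "failed",
        "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 3}},
        "failure_stage": "engine_launch",
        "failure_reason": "engine_process_exited_before_ready exit_code=1",
        "probes": [],
    }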

View File

@@ -12,7 +12,7 @@ from aituner.cli import main as cli_main
from aituner.engine import build_launch_recipe
from aituner.http_client import _auth_headers, _openai_url, _should_bypass_proxy
from aituner.job import append_job, build_trial_job
from aituner.llm import _extract_response_text, build_prompt, parse_proposal_text, validate_proposal
from aituner.search import ThresholdProbe, binary_search_max_feasible
from aituner.slo import RequestOutcome, evaluate_request, summarize_evaluations
from aituner.spec import (
@@ -40,6 +40,7 @@ def _write_study_assets(
    *,
    trace_overrides: dict[str, object] | None = None,
    slo_overrides: dict[str, object] | None = None,
    engine_overrides: dict[str, object] | None = None,
) -> Path:
    trace_dir = tmp_path / "trace_windows" / "traces"
    trace_dir.mkdir(parents=True)
@@ -155,6 +156,8 @@ def _write_study_assets(
    }
    if slo_overrides:
        study_payload["slo"].update(slo_overrides)
    if engine_overrides:
        study_payload["engine"].update(engine_overrides)
    study_path.write_text(json.dumps(study_payload), encoding="utf-8")
    return study_path
@@ -404,6 +407,40 @@ class CoreFlowTests(unittest.TestCase):
self.assertIn('"status": "failed"', prompt) self.assertIn('"status": "failed"', prompt)
self.assertIn('"failure_reason": "engine_process_exited_before_ready exit_code=1"', prompt) self.assertIn('"failure_reason": "engine_process_exited_before_ready exit_code=1"', prompt)
self.assertIn('"VLLM_ATTENTION_BACKEND": "FLASHINFER"', prompt) self.assertIn('"VLLM_ATTENTION_BACKEND": "FLASHINFER"', prompt)
self.assertIn("Known launch failures:", prompt)

    def test_prompt_includes_failure_stage_for_launch_failures(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(tmp_path)
            study = load_study_spec(study_path)
            window, requests = load_trace_requests(study, study_spec_path=study_path)
            prompt = build_prompt(
                study=study,
                window_summary=summarize_window(requests, window),
                state=StudyState(
                    study_id=study.study_id,
                    trials=[
                        TrialSummary(
                            trial_id="trial-0002",
                            status="failed",
                            diagnosis="bad topology",
                            config_patch={
                                "env_patch": {},
                                "flag_patch": {
                                    "tensor-parallel-size": 3,
                                    "data-parallel-size": 3,
                                },
                            },
                            failure_stage="engine_launch",
                            failure_reason="engine_process_exited_before_ready exit_code=1",
                        )
                    ],
                ),
                capability_profile=None,
            )
            self.assertIn('"failure_stage": "engine_launch"', prompt)
            self.assertIn('"implicated_flag_keys"', prompt)

    def test_parse_proposal_text_repairs_truncated_json(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
@@ -1003,6 +1040,182 @@ class CoreFlowTests(unittest.TestCase):
                next_state.trials[0].failure_reason,
                "engine_process_exited_before_ready exit_code=1",
            )
            self.assertEqual(next_state.trials[0].failure_stage, "")

    def test_ingest_trial_results_records_failure_stage(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(tmp_path)
            study = load_study_spec(study_path)
            store = StudyStore(tmp_path / ".aituner" / "studies")
            store.init_study(spec_path=study_path, study=study)
            state = store.load_state(study.study_id)
            proposal = Proposal.from_dict(
                {
                    "observation": "Obs",
                    "diagnosis": "Diag",
                    "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
                    "expected_effects": ["raise rate"],
                }
            )
            trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)
            Path(trial.result_path).write_text(
                json.dumps(
                    {
                        "study_id": study.study_id,
                        "trial_id": trial.trial_id,
                        "status": "failed",
                        "failure_stage": "engine_launch",
                        "failure_reason": "engine_process_exited_before_ready exit_code=1",
                        "probes": [],
                    }
                ),
                encoding="utf-8",
            )
            next_state = store.ingest_trial_results(study.study_id)
            self.assertEqual(next_state.trials[0].failure_stage, "engine_launch")

    def test_validate_proposal_rejects_invalid_tp_dp_product(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(
                tmp_path,
                engine_overrides={
                    "base_flags": {
                        "host": "127.0.0.1",
                        "port": 8000,
                        "enable-expert-parallel": True,
                        "tensor-parallel-size": 4,
                        "data-parallel-size": 2,
                        "expert-parallel-size": 8,
                    },
                    "tunable_flags": [
                        "tensor-parallel-size",
                        "data-parallel-size",
                        "expert-parallel-size",
                    ],
                    "topology_constraints": {
                        "require_tp_dp_product_equals_gpu_count": True,
                        "require_ep_size_leq_tp_dp_product": True,
                        "require_ep_size_divides_tp_dp_product": True,
                        "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
                        "allowed_data_parallel_sizes": [1, 2, 4, 8],
                        "allowed_expert_parallel_sizes": [1, 2, 4, 8],
                    },
                },
            )
            study = load_study_spec(study_path)
            proposal = Proposal.from_dict(
                {
                    "observation": "Obs",
                    "diagnosis": "Bad topology",
                    "config_patch": {
                        "env_patch": {},
                        "flag_patch": {
                            "tensor-parallel-size": 2,
                            "data-parallel-size": 2,
                            "expert-parallel-size": 4,
                        },
                    },
                    "expected_effects": ["raise throughput"],
                }
            )
            with self.assertRaisesRegex(SpecError, "must equal hardware.gpu_count"):
                validate_proposal(proposal, study)

    def test_validate_proposal_rejects_invalid_ep_divisibility(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(
                tmp_path,
                engine_overrides={
                    "base_flags": {
                        "host": "127.0.0.1",
                        "port": 8000,
                        "enable-expert-parallel": True,
                        "tensor-parallel-size": 4,
                        "data-parallel-size": 2,
                        "expert-parallel-size": 8,
                    },
                    "tunable_flags": [
                        "tensor-parallel-size",
                        "data-parallel-size",
                        "expert-parallel-size",
                    ],
                    "topology_constraints": {
                        "require_tp_dp_product_equals_gpu_count": True,
                        "require_ep_size_leq_tp_dp_product": True,
                        "require_ep_size_divides_tp_dp_product": True,
                        "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
                        "allowed_data_parallel_sizes": [1, 2, 4, 8],
                        "allowed_expert_parallel_sizes": [1, 2, 4, 8],
                    },
                },
            )
            study = load_study_spec(study_path)
            proposal = Proposal.from_dict(
                {
                    "observation": "Obs",
                    "diagnosis": "Bad EP",
                    "config_patch": {
                        "env_patch": {},
                        "flag_patch": {
                            "expert-parallel-size": 3,
                        },
                    },
                    "expected_effects": ["raise throughput"],
                }
            )
            with self.assertRaisesRegex(SpecError, "expert-parallel-size=3"):
                validate_proposal(proposal, study)

    def test_validate_proposal_accepts_valid_tp_dp_ep_combo(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(
                tmp_path,
                engine_overrides={
                    "base_flags": {
                        "host": "127.0.0.1",
                        "port": 8000,
                        "enable-expert-parallel": True,
                        "tensor-parallel-size": 4,
                        "data-parallel-size": 2,
                        "expert-parallel-size": 8,
                    },
                    "tunable_flags": [
                        "tensor-parallel-size",
                        "data-parallel-size",
                        "expert-parallel-size",
                    ],
                    "topology_constraints": {
                        "require_tp_dp_product_equals_gpu_count": True,
                        "require_ep_size_leq_tp_dp_product": True,
                        "require_ep_size_divides_tp_dp_product": True,
                        "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
                        "allowed_data_parallel_sizes": [1, 2, 4, 8],
                        "allowed_expert_parallel_sizes": [1, 2, 4, 8],
                    },
                },
            )
            study = load_study_spec(study_path)
            proposal = Proposal.from_dict(
                {
                    "observation": "Obs",
                    "diagnosis": "Valid topology",
                    "config_patch": {
                        "env_patch": {},
                        "flag_patch": {
                            "tensor-parallel-size": 2,
                            "data-parallel-size": 4,
                            "expert-parallel-size": 4,
                        },
                    },
                    "expected_effects": ["raise throughput"],
                }
            )
            validated = validate_proposal(proposal, study)
            self.assertEqual(validated.config_patch.flag_patch["tensor-parallel-size"], 2)

    def test_cli_tune_runs_multiple_manual_proposals(self) -> None:
        with tempfile.TemporaryDirectory() as tmp: