Fix topology-aware incumbents for qwen27b tuning

This commit is contained in:
2026-04-11 00:32:41 +08:00
parent 06d4c380b3
commit a4d54442db
5 changed files with 282 additions and 22 deletions

View File

@@ -1,7 +1,7 @@
{
"study_id": "dash0-qwen27b-tight-slo-10min-run4-chat-0-8k",
"hardware": {
"gpu_count": 4,
"gpu_count": 8,
"gpu_model": "H20",
"host_candidates": [
"dash0"
@@ -26,7 +26,6 @@
"/home/admin/resource/model/464482ce/qwen3.5-27b/256k-0223-internal"
],
"base_envs": {
"CUDA_VISIBLE_DEVICES": "4,5,6,7",
"VLLM_DISABLE_COMPILE_CACHE": "1",
"DS_LLM_IGNORE_WARMUP": "1",
"DS_LLM_IGNORE_CHECK_WARMUP": "1",
@@ -73,10 +72,7 @@
"mamba-cache-dtype": "float32",
"skip-mm-profiling": true,
"quantization": "fp8",
"tensor-parallel-size": 4,
"data-parallel-size": 1,
"expert-parallel-size": 1,
"max-num-seqs": 16,
"tensor-parallel-size": 1,
"disable-log-requests": true
},
"tunable_envs": [
@@ -100,7 +96,7 @@
"require_enable_expert_parallel_when_ep_gt_one": true,
"validate_cuda_graph_sizes_divisible_by_tp_when_tp_ep_reduce_scatter": true,
"allowed_tp_dp_products": [1, 2, 4, 8],
"allowed_tensor_parallel_sizes": [1, 2, 4],
"allowed_tensor_parallel_sizes": [1, 2, 4, 8],
"allowed_data_parallel_sizes": [1, 2, 4, 8],
"allowed_expert_parallel_sizes": [1]
},

View File

@@ -192,8 +192,10 @@ def build_prompt(
{
"trial_id": trial.trial_id,
"status": trial.status,
"parallel_size": trial.parallel_size,
"best_sampling_u": trial.best_sampling_u,
"best_request_rate": trial.best_request_rate,
"best_request_rate_per_gpu": trial.best_request_rate_per_gpu,
"best_pass_rate": trial.best_pass_rate,
"diagnosis": trial.diagnosis,
"config_patch": trial.config_patch,
@@ -224,9 +226,12 @@ def build_prompt(
"study_id": study.study_id,
"current_best": {
"trial_id": state.best_trial_id,
"best_parallel_size": state.best_parallel_size,
"best_sampling_u": state.best_sampling_u,
"best_request_rate": state.best_request_rate,
"best_request_rate_per_gpu": state.best_request_rate_per_gpu,
},
"best_by_parallel_size": state.best_by_parallel_size,
"hardware": {
"gpu_count": study.hardware.gpu_count,
"gpu_model": study.hardware.gpu_model,
@@ -294,9 +299,10 @@ def build_prompt(
"Parallel space candidates:",
json.dumps(parallel_candidates, ensure_ascii=False, indent=2),
"",
"The proposal must beat the current incumbent. Do not propose a config that is only likely to be feasible below the current best_sampling_u/request_rate.",
"The evaluator for a new trial will start searching from the current best feasible sampling_u and only look for improvements above it.",
"The proposal should improve the maximum feasible sampling_u under the 95%+ SLO target.",
"The primary cross-topology comparison metric is request_rate_per_gpu, not raw request_rate.",
"The proposal should beat the incumbent on request_rate_per_gpu under the 95%+ SLO target.",
"The evaluator uses the best feasible sampling_u from the same tp_dp_product group when it exists. For a different tp_dp_product group, it uses a scaled lower floor instead of reusing the global incumbent directly.",
"Do not assume a configuration with fewer GPUs must start from the global incumbent sampling_u.",
]
return "\n".join(sections)

View File

@@ -681,8 +681,10 @@ class TrialSpec:
class TrialSummary:
trial_id: str
status: str
parallel_size: int | None = None
best_sampling_u: float | None = None
best_request_rate: float | None = None
best_request_rate_per_gpu: float | None = None
best_pass_rate: float | None = None
result_path: str | None = None
diagnosis: str = ""
@@ -695,9 +697,12 @@ class TrialSummary:
class StudyState:
study_id: str
best_trial_id: str | None = None
best_parallel_size: int | None = None
best_sampling_u: float | None = None
best_request_rate: float | None = None
best_request_rate_per_gpu: float | None = None
next_trial_index: int = 1
best_by_parallel_size: dict[str, dict[str, Any]] = field(default_factory=dict)
trials: list[TrialSummary] = field(default_factory=list)

View File

@@ -32,9 +32,16 @@ class StudyStore:
return StudyState(
study_id=str(payload["study_id"]),
best_trial_id=payload.get("best_trial_id"),
best_parallel_size=payload.get("best_parallel_size"),
best_sampling_u=payload.get("best_sampling_u"),
best_request_rate=payload.get("best_request_rate"),
best_request_rate_per_gpu=payload.get("best_request_rate_per_gpu"),
next_trial_index=int(payload.get("next_trial_index", 1)),
best_by_parallel_size={
str(key): value
for key, value in (payload.get("best_by_parallel_size") or {}).items()
if isinstance(value, dict)
},
trials=trials,
)
@@ -61,21 +68,15 @@ class StudyStore:
trial_id = f"trial-{state.next_trial_index:04d}"
trial_root = self.study_root(study.study_id) / "trials" / trial_id
trial_root.mkdir(parents=True, exist_ok=True)
parallel_size = _parallel_size_for_proposal(study=study, proposal=proposal)
search_low = _derive_search_floor(study=study, state=state, parallel_size=parallel_size)
spec = TrialSpec(
study_id=study.study_id,
trial_id=trial_id,
config_patch=proposal.config_patch,
search=replace(
study.search,
low=min(
study.search.high,
max(
study.search.low,
float(state.best_sampling_u)
if isinstance(state.best_sampling_u, (int, float))
else study.search.low,
),
),
low=search_low,
),
study_spec_path=str((self.study_root(study.study_id) / "study_spec.source").resolve()),
artifact_dir=str(trial_root),
@@ -89,6 +90,7 @@ class StudyStore:
TrialSummary(
trial_id=trial_id,
status="queued",
parallel_size=parallel_size,
diagnosis=proposal.diagnosis,
config_patch=to_jsonable(proposal.config_patch),
)
@@ -97,12 +99,20 @@ class StudyStore:
return spec, next_state
def ingest_trial_results(self, study_id: str) -> StudyState:
study = self.load_study_snapshot(study_id)
state = self.load_state(study_id)
by_id = {item.trial_id: item for item in state.trials}
trials_dir = self.study_root(study_id) / "trials"
best_trial_id = state.best_trial_id
best_parallel_size = state.best_parallel_size
best_sampling_u = state.best_sampling_u
best_rate = state.best_request_rate
best_rate_per_gpu = state.best_request_rate_per_gpu
best_by_parallel_size = {
str(key): dict(value)
for key, value in (state.best_by_parallel_size or {}).items()
if isinstance(value, dict)
}
for trial_dir in sorted(trials_dir.glob("trial-*")):
result_path = trial_dir / "result.json"
if not result_path.exists():
@@ -115,30 +125,140 @@ class StudyStore:
state.trials.append(summary)
by_id[trial_id] = summary
summary.status = str(payload.get("status") or "completed")
if summary.parallel_size is None:
summary.parallel_size = _parallel_size_for_trial_id(
study=study,
study_root=self.study_root(study_id),
trial_id=trial_id,
)
summary.best_sampling_u = payload.get("best_sampling_u")
summary.best_request_rate = payload.get("best_request_rate")
summary.best_request_rate_per_gpu = _request_rate_per_gpu(
summary.best_request_rate,
summary.parallel_size,
)
summary.best_pass_rate = payload.get("best_pass_rate")
summary.result_path = str(result_path)
summary.failure_reason = str(payload.get("failure_reason") or "").strip()
summary.failure_stage = str(payload.get("failure_stage") or "").strip()
if (
isinstance(summary.best_request_rate, (int, float))
and (best_rate is None or summary.best_request_rate > best_rate)
isinstance(summary.parallel_size, int)
and summary.parallel_size > 0
and isinstance(summary.best_request_rate_per_gpu, (int, float))
):
best_rate = float(summary.best_request_rate)
group_key = str(summary.parallel_size)
incumbent = best_by_parallel_size.get(group_key)
if incumbent is None or float(summary.best_request_rate_per_gpu) > float(
incumbent.get("best_request_rate_per_gpu") or float("-inf")
):
best_by_parallel_size[group_key] = {
"trial_id": trial_id,
"parallel_size": summary.parallel_size,
"best_sampling_u": summary.best_sampling_u,
"best_request_rate": summary.best_request_rate,
"best_request_rate_per_gpu": summary.best_request_rate_per_gpu,
"best_pass_rate": summary.best_pass_rate,
}
if (
isinstance(summary.best_request_rate_per_gpu, (int, float))
and (
best_rate_per_gpu is None
or float(summary.best_request_rate_per_gpu) > float(best_rate_per_gpu)
)
):
best_rate = (
float(summary.best_request_rate)
if isinstance(summary.best_request_rate, (int, float))
else None
)
best_rate_per_gpu = float(summary.best_request_rate_per_gpu)
best_sampling_u = (
float(summary.best_sampling_u)
if isinstance(summary.best_sampling_u, (int, float))
else None
)
best_trial_id = trial_id
best_parallel_size = summary.parallel_size
state.best_sampling_u = best_sampling_u
state.best_request_rate = best_rate
state.best_request_rate_per_gpu = best_rate_per_gpu
state.best_trial_id = best_trial_id
state.best_parallel_size = best_parallel_size
state.best_by_parallel_size = best_by_parallel_size
self.save_state(state)
return state
def load_study_snapshot(self, study_id: str) -> StudySpec:
    """Load the frozen study-spec snapshot captured when the study was initialized."""
    snapshot_path = self.study_root(study_id) / "study_spec.snapshot.json"
    raw = snapshot_path.read_text(encoding="utf-8")
    return StudySpec.from_dict(json.loads(raw))
@staticmethod
def write_json(path: Path, payload: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
def _parse_int_like(value: Any, *, default: int = 1) -> int:
if value is None:
return default
if isinstance(value, bool):
raise ValueError("Boolean values are not valid integer topology settings.")
if isinstance(value, int):
return value
if isinstance(value, float) and value.is_integer():
return int(value)
if isinstance(value, str) and value.strip():
return int(value.strip())
raise ValueError(f"Unable to parse integer value from {value!r}.")
def _parallel_size_for_config(*, study: StudySpec, flag_patch: dict[str, Any]) -> int:
    """Return the GPU count (tp * dp) implied by the study's base flags overlaid with *flag_patch*."""
    merged = {**study.engine.base_flags, **flag_patch}
    tensor_parallel = _parse_int_like(merged.get("tensor-parallel-size"), default=1)
    data_parallel = _parse_int_like(merged.get("data-parallel-size"), default=1)
    return tensor_parallel * data_parallel
def _parallel_size_for_proposal(*, study: StudySpec, proposal: Proposal) -> int:
    """GPU count (tp * dp) that would result from applying the proposal's flag patch."""
    patch = proposal.config_patch.flag_patch
    return _parallel_size_for_config(study=study, flag_patch=patch)
def _parallel_size_for_trial_id(*, study: StudySpec, study_root: Path, trial_id: str) -> int | None:
    """Recover the tp * dp parallel size for an already-materialized trial.

    Reads ``trials/<trial_id>/trial_spec.json`` under *study_root* and overlays
    its recorded flag patch on the study's base flags.

    Returns None when the spec file is absent or its config patch is malformed
    (not a dict).  A missing or null ``flag_patch`` is treated as "no flag
    overrides" rather than as malformed — consistent with how a missing
    ``config_patch`` is handled — so the study's base topology is still
    reported instead of dropping the trial from per-GPU accounting.
    """
    trial_spec_path = study_root / "trials" / trial_id / "trial_spec.json"
    if not trial_spec_path.exists():
        return None
    payload = json.loads(trial_spec_path.read_text(encoding="utf-8"))
    config_patch = payload.get("config_patch") or {}
    if not isinstance(config_patch, dict):
        return None
    # Missing/None flag_patch means "no overrides"; only a non-dict value is malformed.
    flag_patch = config_patch.get("flag_patch") or {}
    if not isinstance(flag_patch, dict):
        return None
    return _parallel_size_for_config(study=study, flag_patch=flag_patch)
def _request_rate_per_gpu(best_request_rate: Any, parallel_size: int | None) -> float | None:
if not isinstance(best_request_rate, (int, float)) or not isinstance(parallel_size, int):
return None
if parallel_size <= 0:
return None
return float(best_request_rate) / float(parallel_size)
def _derive_search_floor(*, study: StudySpec, state: StudyState, parallel_size: int) -> float:
low = study.search.low
high = study.search.high
group_incumbent = (state.best_by_parallel_size or {}).get(str(parallel_size))
if isinstance(group_incumbent, dict) and isinstance(
group_incumbent.get("best_sampling_u"), (int, float)
):
candidate = float(group_incumbent["best_sampling_u"])
elif (
isinstance(state.best_sampling_u, (int, float))
and isinstance(state.best_parallel_size, int)
and state.best_parallel_size > 0
):
candidate = float(state.best_sampling_u) * float(parallel_size) / float(state.best_parallel_size)
else:
candidate = low
return min(high, max(low, candidate))

View File

@@ -1039,6 +1039,12 @@ class CoreFlowTests(unittest.TestCase):
self.assertEqual(next_state.best_trial_id, trial.trial_id)
self.assertEqual(next_state.best_sampling_u, 0.75)
self.assertEqual(next_state.best_request_rate, 12.5)
self.assertEqual(next_state.best_parallel_size, 4)
self.assertEqual(next_state.best_request_rate_per_gpu, 3.125)
self.assertEqual(
next_state.best_by_parallel_size["4"]["best_request_rate_per_gpu"],
3.125,
)
def test_materialize_trial_uses_incumbent_sampling_u_as_search_floor(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
@@ -1050,8 +1056,10 @@ class CoreFlowTests(unittest.TestCase):
state = StudyState(
study_id=study.study_id,
best_trial_id="trial-0001",
best_parallel_size=4,
best_sampling_u=0.375,
best_request_rate=3.0,
best_request_rate_per_gpu=0.75,
next_trial_index=2,
trials=[],
)
@@ -1067,6 +1075,71 @@ class CoreFlowTests(unittest.TestCase):
self.assertEqual(trial.search.low, 0.375)
self.assertEqual(trial.search.high, 1.0)
def test_materialize_trial_uses_same_parallel_group_incumbent(self) -> None:
    """A new trial in an existing tp*dp group starts from that group's own floor."""
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp)
        spec_path = _write_study_assets(workdir)
        study = load_study_spec(spec_path)
        store = StudyStore(workdir / ".aituner" / "studies")
        store.init_study(spec_path=spec_path, study=study)
        # Global incumbent ran at 4 GPUs; the tp=2 group has its own record.
        state = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0001",
            best_parallel_size=4,
            best_sampling_u=0.375,
            best_request_rate=3.0,
            best_request_rate_per_gpu=0.75,
            next_trial_index=2,
            best_by_parallel_size={
                "2": {
                    "trial_id": "trial-0000",
                    "parallel_size": 2,
                    "best_sampling_u": 0.125,
                    "best_request_rate": 0.8,
                    "best_request_rate_per_gpu": 0.4,
                }
            },
            trials=[],
        )
        proposal = Proposal.from_dict(
            {
                "observation": "Obs",
                "diagnosis": "Diag",
                "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 2}},
                "expected_effects": ["raise rate"],
            }
        )
        trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)
        # The tp=2 group's incumbent wins over any scaled global floor.
        self.assertEqual(trial.search.low, 0.125)
def test_materialize_trial_scales_search_floor_for_different_parallel_group(self) -> None:
    """Without a same-group record, the global floor is scaled by the GPU ratio."""
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp)
        spec_path = _write_study_assets(workdir)
        study = load_study_spec(spec_path)
        store = StudyStore(workdir / ".aituner" / "studies")
        store.init_study(spec_path=spec_path, study=study)
        # Global incumbent: u=0.4 on 4 GPUs.  Proposing tp=2 should
        # start from 0.4 * 2 / 4 == 0.2 rather than the raw 0.4.
        state = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0001",
            best_parallel_size=4,
            best_sampling_u=0.4,
            best_request_rate=3.0,
            best_request_rate_per_gpu=0.75,
            next_trial_index=2,
            trials=[],
        )
        proposal = Proposal.from_dict(
            {
                "observation": "Obs",
                "diagnosis": "Diag",
                "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 2}},
                "expected_effects": ["raise rate"],
            }
        )
        trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)
        self.assertEqual(trial.search.low, 0.2)
def test_ingest_trial_results_records_failure_reason(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
@@ -1137,6 +1210,66 @@ class CoreFlowTests(unittest.TestCase):
next_state = store.ingest_trial_results(study.study_id)
self.assertEqual(next_state.trials[0].failure_stage, "engine_launch")
def test_ingest_trial_results_prefers_higher_request_rate_per_gpu(self) -> None:
    """The global incumbent is picked by per-GPU rate, not raw request rate."""
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp)
        spec_path = _write_study_assets(workdir)
        study = load_study_spec(spec_path)
        store = StudyStore(workdir / ".aituner" / "studies")
        store.init_study(spec_path=spec_path, study=study)
        state = store.load_state(study.study_id)

        def _propose(tp: int) -> Proposal:
            # The two proposals differ only in topology.
            return Proposal.from_dict(
                {
                    "observation": "Obs",
                    "diagnosis": "Diag",
                    "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": tp}},
                    "expected_effects": ["raise rate"],
                }
            )

        def _finish(trial, sampling_u: float, rate: float) -> None:
            # Simulate the evaluator writing a completed result payload.
            Path(trial.result_path).write_text(
                json.dumps(
                    {
                        "study_id": study.study_id,
                        "trial_id": trial.trial_id,
                        "status": "completed",
                        "best_sampling_u": sampling_u,
                        "best_request_rate": rate,
                        "best_pass_rate": 0.97,
                    }
                ),
                encoding="utf-8",
            )

        trial_a, state = store.materialize_trial(study=study, state=state, proposal=_propose(4))
        _finish(trial_a, 0.5, 4.0)
        trial_b, _ = store.materialize_trial(study=study, state=state, proposal=_propose(2))
        _finish(trial_b, 0.4, 3.0)

        next_state = store.ingest_trial_results(study.study_id)
        # 3.0 rps on 2 GPUs (1.5/GPU) beats 4.0 rps on 4 GPUs (1.0/GPU).
        self.assertEqual(next_state.best_trial_id, trial_b.trial_id)
        self.assertEqual(next_state.best_parallel_size, 2)
        self.assertEqual(next_state.best_request_rate, 3.0)
        self.assertEqual(next_state.best_request_rate_per_gpu, 1.5)
        self.assertEqual(next_state.best_by_parallel_size["4"]["best_request_rate_per_gpu"], 1.0)
        self.assertEqual(next_state.best_by_parallel_size["2"]["best_request_rate_per_gpu"], 1.5)
def test_validate_proposal_rejects_invalid_tp_dp_product(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)