Add study tune loop and smoke configs

2026-04-04 22:29:59 +08:00
parent 7b7eaafd78
commit f192c741ed
8 changed files with 387 additions and 1 deletion

View File

@@ -0,0 +1,19 @@
{
  "observation": "Push batching further after validating the balanced layout.",
  "diagnosis": "If TTFT remains under control, a larger admission window should maximize achieved request rate.",
  "config_patch": {
    "env_patch": {},
    "flag_patch": {
      "tensor-parallel-size": 4,
      "max-num-seqs": 24,
      "max-num-batched-tokens": 98304,
      "gpu-memory-utilization": 0.92,
      "block-size": 64
    }
  },
  "expected_effects": [
    "Highest throughput among the smoke candidates if memory is sufficient",
    "More pressure on TTFT, so the binary search should locate the safe threshold"
  ],
  "why_not_previous_failures": "Keeps tp=4 and block-size stable while only expanding batching and memory utilization."
}

View File

@@ -0,0 +1,19 @@
{
  "observation": "Increase batching once tp=4 is fixed.",
  "diagnosis": "Throughput should improve if the engine can admit more concurrent prefills without violating TTFT.",
  "config_patch": {
    "env_patch": {},
    "flag_patch": {
      "tensor-parallel-size": 4,
      "max-num-seqs": 16,
      "max-num-batched-tokens": 65536,
      "gpu-memory-utilization": 0.9,
      "block-size": 64
    }
  },
  "expected_effects": [
    "Higher feasible sampling_u than the conservative baseline",
    "Better token throughput if memory headroom is sufficient"
  ],
  "why_not_previous_failures": "Raises batching in a controlled step instead of jumping directly to the most aggressive setting."
}

View File

@@ -0,0 +1,19 @@
{
  "observation": "Start from a safe tp=4 layout and conservative batching.",
  "diagnosis": "The first pass should verify multi-GPU launch and avoid queueing collapse from over-batching.",
  "config_patch": {
    "env_patch": {},
    "flag_patch": {
      "tensor-parallel-size": 4,
      "max-num-seqs": 8,
      "max-num-batched-tokens": 32768,
      "gpu-memory-utilization": 0.85,
      "block-size": 64
    }
  },
  "expected_effects": [
    "Stable startup on 4x H20",
    "Low risk of OOM during the first binary-search probes"
  ],
  "why_not_previous_failures": "This is the initial baseline proposal."
}
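The three proposal files above form a conservative-to-aggressive ladder over the same tp=4 layout. A minimal sketch of how a proposal's flag_patch could be overlaid onto the study's base_flags (defined in the smoke spec below) when a trial is materialized; the dict-merge rule and the rendering of boolean flags as bare switches are assumptions for illustration, not code from this commit:

# Assumed merge semantics: flag_patch keys override base_flags on conflict.
base_flags = {
    "host": "127.0.0.1",
    "port": 18080,
    "served-model-name": "qwen3-30b-smoke",
    "max-model-len": 65536,
    "disable-log-requests": True,
    "trust-remote-code": True,
}
flag_patch = {  # the conservative baseline proposal above
    "tensor-parallel-size": 4,
    "max-num-seqs": 8,
    "max-num-batched-tokens": 32768,
    "gpu-memory-utilization": 0.85,
    "block-size": 64,
}

effective_flags = {**base_flags, **flag_patch}
cli_args = ["serve", "/path/to/model"]  # placeholder for the spec's launch_args
for key, value in effective_flags.items():
    if value is True:
        cli_args.append(f"--{key}")  # assumed: boolean flags become bare switches
    else:
        cli_args.extend([f"--{key}", str(value)])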

View File

@@ -0,0 +1,83 @@
{
  "study_id": "dash0-qwen30b-chat-smoke",
  "hardware": {
    "gpu_count": 4,
    "gpu_model": "H20",
    "host_candidates": ["dash0"]
  },
  "model": {
    "model_id": "qwen3-30b-a3b",
    "served_model_name": "qwen3-30b-smoke"
  },
  "engine": {
    "engine_name": "vllm",
    "engine_version": "0.13.0rc2.dev2111+gb44b43f43.d20260309",
    "exec_path": "/usr/local/bin/vllm",
    "cwd": "/home/admin/cpfs/wjh/aituner/aituner",
    "host": "127.0.0.1",
    "port": 18080,
    "healthcheck_path": "/v1/models",
    "ready_timeout_s": 900,
    "request_timeout_s": 900,
    "launch_args": [
      "serve",
      "/home/admin/resource/model/464482ce.qwen3-30b-a3b/1m-instruct-0726-fp4"
    ],
    "base_envs": {
      "CUDA_VISIBLE_DEVICES": "0,1,2,3"
    },
    "base_flags": {
      "host": "127.0.0.1",
      "port": 18080,
      "served-model-name": "qwen3-30b-smoke",
      "max-model-len": 65536,
      "disable-log-requests": true,
      "trust-remote-code": true
    },
    "tunable_envs": [
      "VLLM_ATTENTION_BACKEND"
    ],
    "tunable_flags": [
      "tensor-parallel-size",
      "max-num-seqs",
      "max-num-batched-tokens",
      "gpu-memory-utilization",
      "block-size"
    ],
    "python_executable": "python3"
  },
  "trace": {
    "windows_path": "/home/admin/cpfs/wjh/aituner/aituner/trace_windows/windows.json",
    "window_id": "chat_w20260311_1000",
    "u_field": "sampling_u",
    "timestamp_field": "timestamp",
    "max_concurrency": 2,
    "max_requests_per_probe": 24
  },
  "slo": {
    "target_pass_rate": 0.95,
    "ttft_rule": {
      "kind": "step_ms",
      "buckets": [
        {"max_input_tokens": 4096, "threshold_ms": 15000},
        {"max_input_tokens": 16384, "threshold_ms": 30000},
        {"threshold_ms": 45000}
      ]
    },
    "tpot_rule": {
      "kind": "fixed_ms",
      "threshold_ms": 1500
    }
  },
  "search": {
    "low": 0.0,
    "high": 1.0,
    "tolerance": 0.1,
    "max_probes": 4,
    "sample_seed": 20260325
  },
  "llm": {
    "system_prompt": "Propose a single engine config patch that increases the maximum feasible sampling_u under the SLO target.",
    "max_history_trials": 8
  }
}
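A minimal sketch of how the step_ms TTFT rule above could be evaluated per request; the helper name and the first-matching-bucket semantics are assumptions for illustration, not code shipped in this commit:

def ttft_threshold_ms(input_tokens: int, buckets: list[dict]) -> float:
    # Buckets are scanned in order; a bucket without max_input_tokens acts
    # as the catch-all for the longest prompts.
    for bucket in buckets:
        bound = bucket.get("max_input_tokens")
        if bound is None or input_tokens <= bound:
            return float(bucket["threshold_ms"])
    raise ValueError("no matching TTFT bucket")

buckets = [
    {"max_input_tokens": 4096, "threshold_ms": 15000},
    {"max_input_tokens": 16384, "threshold_ms": 30000},
    {"threshold_ms": 45000},
]
assert ttft_threshold_ms(2048, buckets) == 15000     # short chat prompt
assert ttft_threshold_ms(120000, buckets) == 45000   # long-context catch-all

The tpot_rule is simpler: a fixed 1500 ms budget per output token regardless of input length.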

View File

@@ -105,6 +105,76 @@ def cmd_study_ingest(args: argparse.Namespace) -> int:
    return 0


def cmd_study_tune(args: argparse.Namespace) -> int:
    spec_path = Path(args.spec).resolve()
    study = load_study_spec(spec_path)
    store = StudyStore(Path(args.store_root) if args.store_root else None)
    study_root = store.init_study(spec_path=spec_path, study=study)
    capability_profile = load_capability_profile(study, study_spec_path=spec_path)
    proposal_files = [Path(item).resolve() for item in (args.proposal_file or [])]
    max_trials = args.max_trials or (len(proposal_files) if proposal_files else 1)
    if max_trials <= 0:
        raise SpecError("max_trials must be positive")
    if proposal_files and max_trials > len(proposal_files):
        max_trials = len(proposal_files)
    if not proposal_files and study.llm.endpoint is None:
        raise SpecError("No proposal files provided and study.llm.endpoint is not configured")
    executed: list[dict[str, object]] = []
    for idx in range(max_trials):
        state = store.load_state(study.study_id)
        window, requests = load_trace_requests(study, study_spec_path=spec_path)
        prompt = build_prompt(
            study=study,
            window_summary=summarize_window(requests, window),
            state=state,
            capability_profile=capability_profile,
        )
        prompt_name = f"prompt-{state.next_trial_index:04d}"
        store.write_prompt(study.study_id, prompt_name, prompt)
        if proposal_files:
            proposal_source = proposal_files[idx]
            proposal_text = proposal_source.read_text(encoding="utf-8")
            proposal_name = proposal_source.stem
        else:
            proposal_source = None
            proposal_text = call_llm_for_proposal(policy=study.llm, prompt=prompt)
            proposal_name = f"proposal-{state.next_trial_index:04d}"
        proposal = parse_proposal_text(proposal_text, study)
        store.write_proposal(study.study_id, proposal_name, proposal)
        trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)
        trial_spec_path = Path(trial.artifact_dir) / "trial_spec.json"
        result = run_trial(trial_spec_path)
        state = store.ingest_trial_results(study.study_id)
        executed.append(
            {
                "trial_id": trial.trial_id,
                "proposal_name": proposal_name,
                "proposal_source": str(proposal_source) if proposal_source else "llm",
                "best_sampling_u": result.get("best_sampling_u"),
                "best_request_rate": result.get("best_request_rate"),
                "best_pass_rate": result.get("best_pass_rate"),
                "state_best_trial_id": state.best_trial_id,
                "state_best_request_rate": state.best_request_rate,
            }
        )
    final_state = store.load_state(study.study_id)
    print(
        json.dumps(
            {
                "study_root": str(study_root),
                "executed_trials": executed,
                "best_trial_id": final_state.best_trial_id,
                "best_request_rate": final_state.best_request_rate,
            },
            ensure_ascii=False,
        )
    )
    return 0


def cmd_worker_run_trial(args: argparse.Namespace) -> int:
    result = run_trial(Path(args.trial_spec).resolve())
    print(json.dumps(result))
@@ -154,6 +224,13 @@ def build_parser() -> argparse.ArgumentParser:
    ingest.add_argument("--store-root")
    ingest.set_defaults(func=cmd_study_ingest)

    tune = study_sub.add_parser("tune")
    tune.add_argument("--spec", required=True)
    tune.add_argument("--store-root")
    tune.add_argument("--proposal-file", action="append")
    tune.add_argument("--max-trials", type=int)
    tune.set_defaults(func=cmd_study_tune)

    worker = subparsers.add_parser("worker")
    worker_sub = worker.add_subparsers(dest="worker_command", required=True)
    run = worker_sub.add_parser("run-trial")
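A usage sketch for the new subcommand, driven through cli_main the same way the tests do; the spec and proposal paths are placeholders:

from aituner.cli import main as cli_main

# Replays two hand-written proposals instead of calling the LLM endpoint.
exit_code = cli_main(
    [
        "study", "tune",
        "--spec", "configs/study.json",
        "--store-root", "store",
        "--proposal-file", "proposals/proposal-baseline.json",
        "--proposal-file", "proposals/proposal-balanced.json",
    ]
)
assert exit_code == 0

Without --max-trials, one trial is run per --proposal-file; with neither proposal files nor a configured llm.endpoint, cmd_study_tune raises SpecError.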

View File

@@ -121,6 +121,17 @@ def _coerce_prompt_tokens(row: Mapping[str, Any]) -> int | None:
    return None


def _downsample_requests(
    requests: list[TraceRequest], *, limit: int
) -> list[TraceRequest]:
    if limit <= 0:
        return []
    if len(requests) <= limit:
        return requests
    indexes = sorted({(i * len(requests)) // limit for i in range(limit)})
    return [requests[idx] for idx in indexes]


def load_trace_requests(study: StudySpec, *, study_spec_path: Path) -> tuple[WindowRecord, list[TraceRequest]]:
    window = resolve_window_record(study, study_spec_path=study_spec_path)
    requests: list[TraceRequest] = []
@@ -179,7 +190,10 @@ def load_trace_requests(study: StudySpec, *, study_spec_path: Path) -> tuple[Win
        )
    requests.sort(key=lambda item: item.arrival_s)
    if study.trace.max_requests_per_probe is not None:
-        requests = requests[: study.trace.max_requests_per_probe]
+        requests = _downsample_requests(
+            requests,
+            limit=study.trace.max_requests_per_probe,
+        )
    return window, requests
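A worked example of the downsampling arithmetic above: 10 requests reduced to a limit of 4 keeps evenly spaced arrivals across the whole window rather than just the first four, which is what the new test asserts:

# (i * len(requests)) // limit for i in 0..limit-1, deduplicated and sorted
limit, total = 4, 10
indexes = sorted({(i * total) // limit for i in range(limit)})
assert indexes == [0, 2, 5, 7]   # maps to request_ids r0, r2, r5, r7 in the test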

View File

@@ -5,7 +5,9 @@ import subprocess
import tempfile
import unittest
from pathlib import Path
from unittest import mock
from aituner.cli import main as cli_main
from aituner.job import append_job, build_trial_job
from aituner.llm import build_prompt, parse_proposal_text
from aituner.search import ThresholdProbe, binary_search_max_feasible
@@ -371,6 +373,82 @@ class CoreFlowTests(unittest.TestCase):
        self.assertGreaterEqual(result.best_threshold, 0.5)
        self.assertIsNotNone(result.best_feasible_payload)

    def test_trace_max_requests_uses_window_wide_downsample(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            trace_dir = tmp_path / "trace_windows" / "traces"
            trace_dir.mkdir(parents=True)
            trace_path = trace_dir / "chat_many.jsonl"
            with trace_path.open("w", encoding="utf-8") as handle:
                for idx in range(10):
                    handle.write(
                        json.dumps(
                            {
                                "request_id": f"r{idx}",
                                "timestamp": float(idx),
                                "sampling_u": idx / 10.0,
                                "messages": [{"role": "user", "content": f"hello-{idx}"}],
                                "input_length": 10 + idx,
                                "output_length": 5,
                            }
                        )
                        + "\n"
                    )
            windows_path = tmp_path / "trace_windows" / "windows.json"
            windows_path.write_text(
                json.dumps(
                    {
                        "windows": [
                            {
                                "window_id": "w1",
                                "trace_type": "chat",
                                "trace_file": "traces/chat_many.jsonl",
                                "window_start": 0.0,
                                "window_end": 10.0,
                            }
                        ]
                    }
                ),
                encoding="utf-8",
            )
            study_path = tmp_path / "study.json"
            study_path.write_text(
                json.dumps(
                    {
                        "study_id": "study-downsample",
                        "hardware": {"gpu_count": 1},
                        "model": {"model_id": "m1", "served_model_name": "dummy-model"},
                        "engine": {
                            "engine_name": "vllm",
                            "exec_path": "/usr/local/bin/vllm",
                            "host": "127.0.0.1",
                            "port": 8000,
                            "ready_timeout_s": 10,
                            "request_timeout_s": 10,
                            "healthcheck_path": "/v1/models",
                            "launch_args": [],
                            "base_envs": {},
                            "base_flags": {},
                            "tunable_envs": [],
                            "tunable_flags": [],
                        },
                        "trace": {
                            "windows_path": str(windows_path),
                            "window_id": "w1",
                            "max_concurrency": 1,
                            "max_requests_per_probe": 4,
                        },
                        "slo": {"target_pass_rate": 0.95},
                        "search": {"low": 0.0, "high": 1.0, "tolerance": 0.1, "max_probes": 2, "sample_seed": 1},
                        "llm": {"system_prompt": "", "max_history_trials": 1},
                    }
                ),
                encoding="utf-8",
            )
            study = load_study_spec(study_path)
            _, requests = load_trace_requests(study, study_spec_path=study_path)
            self.assertEqual([item.row_id for item in requests], ["r0", "r2", "r5", "r7"])

    def test_proposal_validation_and_job_emission(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
@@ -437,6 +515,83 @@ class CoreFlowTests(unittest.TestCase):
        self.assertEqual(next_state.best_trial_id, trial.trial_id)
        self.assertEqual(next_state.best_request_rate, 12.5)

    def test_cli_tune_runs_multiple_manual_proposals(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(tmp_path)
            proposal1 = tmp_path / "proposal-1.json"
            proposal2 = tmp_path / "proposal-2.json"
            proposal1.write_text(
                json.dumps(
                    {
                        "observation": "trial one",
                        "diagnosis": "conservative",
                        "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
                        "expected_effects": ["stable"],
                        "why_not_previous_failures": "",
                    }
                ),
                encoding="utf-8",
            )
            proposal2.write_text(
                json.dumps(
                    {
                        "observation": "trial two",
                        "diagnosis": "more batching",
                        "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 64}},
                        "expected_effects": ["higher throughput"],
                        "why_not_previous_failures": "",
                    }
                ),
                encoding="utf-8",
            )
            store_root = tmp_path / "store"

            def fake_run_trial(trial_spec_path: Path) -> dict[str, object]:
                payload = json.loads(trial_spec_path.read_text(encoding="utf-8"))
                trial_id = str(payload["trial_id"])
                trial_root = Path(payload["artifact_dir"])
                if trial_id.endswith("0001"):
                    best_rate = 1.0
                    best_u = 0.5
                else:
                    best_rate = 2.0
                    best_u = 0.75
                result = {
                    "study_id": payload["study_id"],
                    "trial_id": trial_id,
                    "status": "completed",
                    "best_sampling_u": best_u,
                    "best_request_rate": best_rate,
                    "best_pass_rate": 1.0,
                    "best_request_count": 2,
                    "probes": [],
                }
                (trial_root / "result.json").write_text(json.dumps(result), encoding="utf-8")
                return result

            with mock.patch("aituner.cli.run_trial", side_effect=fake_run_trial):
                exit_code = cli_main(
                    [
                        "study",
                        "tune",
                        "--spec",
                        str(study_path),
                        "--store-root",
                        str(store_root),
                        "--proposal-file",
                        str(proposal1),
                        "--proposal-file",
                        str(proposal2),
                    ]
                )
            self.assertEqual(exit_code, 0)
            store = StudyStore(store_root)
            state = store.load_state("study-1")
            self.assertEqual(state.best_trial_id, "trial-0002")
            self.assertEqual(state.best_request_rate, 2.0)
            self.assertEqual(state.next_trial_index, 3)


if __name__ == "__main__":
    unittest.main()