745 lines
30 KiB
Python
745 lines
30 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import subprocess
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest import mock
|
|
|
|
from aituner.cli import main as cli_main
|
|
from aituner.http_client import _openai_url
|
|
from aituner.job import append_job, build_trial_job
|
|
from aituner.llm import build_prompt, parse_proposal_text
|
|
from aituner.search import ThresholdProbe, binary_search_max_feasible
|
|
from aituner.slo import RequestOutcome, summarize_evaluations
|
|
from aituner.spec import Proposal, load_study_spec
|
|
from aituner.store import StudyStore
|
|
from aituner.trace import load_trace_requests, summarize_window
|
|
from aituner.worker import _replay_requests
|
|
from aituner.trace import TraceRequest
|
|
|
|
|
|
def _write_study_assets(tmp_path: Path) -> Path:
|
|
trace_dir = tmp_path / "trace_windows" / "traces"
|
|
trace_dir.mkdir(parents=True)
|
|
trace_path = trace_dir / "chat_w1.jsonl"
|
|
rows = [
|
|
{
|
|
"request_id": "r1",
|
|
"timestamp": 0.0,
|
|
"sampling_u": 0.10,
|
|
"messages": [{"role": "user", "content": "hello"}],
|
|
"input_length": 1000,
|
|
"output_length": 16
|
|
},
|
|
{
|
|
"request_id": "r2",
|
|
"timestamp": 1.0,
|
|
"sampling_u": 0.50,
|
|
"messages": [{"role": "user", "content": "world"}],
|
|
"input_length": 5000,
|
|
"output_length": 32
|
|
},
|
|
{
|
|
"request_id": "r3",
|
|
"timestamp": 2.0,
|
|
"sampling_u": 0.90,
|
|
"messages": [{"role": "user", "content": "!"}],
|
|
"input_length": 20000,
|
|
"output_length": 64
|
|
}
|
|
]
|
|
with trace_path.open("w", encoding="utf-8") as handle:
|
|
for row in rows:
|
|
handle.write(json.dumps(row) + "\n")
|
|
|
|
windows_path = tmp_path / "trace_windows" / "windows.json"
|
|
windows_payload = {
|
|
"u_field": "sampling_u",
|
|
"windows": [
|
|
{
|
|
"window_id": "chat_w1",
|
|
"trace_type": "chat",
|
|
"trace_file": "traces/chat_w1.jsonl",
|
|
"window_start": 0.0,
|
|
"window_end": 10.0
|
|
}
|
|
]
|
|
}
|
|
windows_path.write_text(json.dumps(windows_payload), encoding="utf-8")
|
|
|
|
capability_path = tmp_path / "capability.json"
|
|
capability_path.write_text(
|
|
json.dumps({"prefill_service_by_bucket": {"4k": {"tp4_ms": 320, "tp8_ms": 240}}}),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
study_path = tmp_path / "study.json"
|
|
study_payload = {
|
|
"study_id": "study-1",
|
|
"hardware": {"gpu_count": 8, "gpu_model": "H20", "host_candidates": ["dash0"]},
|
|
"model": {
|
|
"model_id": "qwen",
|
|
"served_model_name": "Qwen/Qwen3-30B-A3B-Instruct-2507"
|
|
},
|
|
"engine": {
|
|
"engine_name": "vllm",
|
|
"engine_version": "0.1",
|
|
"exec_path": "/usr/local/bin/vllm",
|
|
"cwd": str(tmp_path),
|
|
"host": "127.0.0.1",
|
|
"port": 8000,
|
|
"healthcheck_path": "/v1/models",
|
|
"ready_timeout_s": 30,
|
|
"request_timeout_s": 30,
|
|
"launch_args": ["serve", "/models/qwen"],
|
|
"base_envs": {"BASE_ENV": "1"},
|
|
"base_flags": {"host": "127.0.0.1", "port": 8000},
|
|
"tunable_envs": ["VLLM_ATTENTION_BACKEND"],
|
|
"tunable_flags": ["tensor-parallel-size", "max-num-seqs"],
|
|
"python_executable": "python3"
|
|
},
|
|
"trace": {
|
|
"windows_path": str(windows_path),
|
|
"window_id": "chat_w1",
|
|
"u_field": "sampling_u",
|
|
"timestamp_field": "timestamp",
|
|
"max_concurrency": 4
|
|
},
|
|
"slo": {
|
|
"target_pass_rate": 0.95,
|
|
"ttft_rule": {
|
|
"kind": "step_ms",
|
|
"buckets": [
|
|
{"max_input_tokens": 4096, "threshold_ms": 2000},
|
|
{"max_input_tokens": 16384, "threshold_ms": 5000},
|
|
{"threshold_ms": 9000}
|
|
]
|
|
},
|
|
"tpot_rule": {"kind": "fixed_ms", "threshold_ms": 120}
|
|
},
|
|
"search": {
|
|
"low": 0.0,
|
|
"high": 1.0,
|
|
"tolerance": 0.01,
|
|
"max_probes": 8,
|
|
"sample_seed": 20260325
|
|
},
|
|
"llm": {"system_prompt": "Tune it.", "max_history_trials": 8},
|
|
"capability_profile_path": str(capability_path)
|
|
}
|
|
study_path.write_text(json.dumps(study_payload), encoding="utf-8")
|
|
return study_path
|
|
|
|
|
|
class CoreFlowTests(unittest.TestCase):
    """Core-flow tests for aituner, each run against fixtures in a temporary directory."""

    # Trace-window preparation script, relative to the repository root.
    _PREPARE_SCRIPT = "scripts/prepare_trace_windows.py"
    # Checkout location this test used to hard-code; kept only as a last-resort fallback.
    _LEGACY_REPO_ROOT = "/home/gahow/phd/aituner"

    @classmethod
    def _locate_repo_root(cls) -> Path:
        """Return the nearest ancestor of this file that contains the prepare script.

        Falls back to the historical hard-coded checkout path when no ancestor
        contains ``scripts/prepare_trace_windows.py``.
        """
        for candidate in Path(__file__).resolve().parents:
            if (candidate / cls._PREPARE_SCRIPT).is_file():
                return candidate
        return Path(cls._LEGACY_REPO_ROOT)

    @staticmethod
    def _write_minimal_study(
        tmp_path: Path,
        *,
        study_id: str,
        trace_name: str,
        rows: list[dict[str, object]],
        window_end: float,
        trace_options: dict[str, object],
    ) -> Path:
        """Write a single-window, single-GPU study fixture and return its ``study.json`` path.

        Args:
            tmp_path: Directory that receives the fixture files.
            study_id: Value stored under ``study_id`` in the study spec.
            trace_name: File name of the JSONL trace inside ``trace_windows/traces``.
            rows: Trace rows, written one JSON object per line.
            window_end: ``window_end`` of the only window (``window_start`` is 0.0).
            trace_options: Extra keys merged into the study's ``trace`` section.
        """
        trace_dir = tmp_path / "trace_windows" / "traces"
        trace_dir.mkdir(parents=True)
        (trace_dir / trace_name).write_text(
            "".join(json.dumps(row) + "\n" for row in rows),
            encoding="utf-8",
        )

        windows_path = tmp_path / "trace_windows" / "windows.json"
        windows_path.write_text(
            json.dumps(
                {
                    "windows": [
                        {
                            "window_id": "w1",
                            "trace_type": "chat",
                            "trace_file": f"traces/{trace_name}",
                            "window_start": 0.0,
                            "window_end": window_end,
                        }
                    ]
                }
            ),
            encoding="utf-8",
        )

        study_path = tmp_path / "study.json"
        study_path.write_text(
            json.dumps(
                {
                    "study_id": study_id,
                    "hardware": {"gpu_count": 1},
                    "model": {"model_id": "m1", "served_model_name": "dummy-model"},
                    "engine": {
                        "engine_name": "vllm",
                        "exec_path": "/usr/local/bin/vllm",
                        "host": "127.0.0.1",
                        "port": 8000,
                        "ready_timeout_s": 10,
                        "request_timeout_s": 10,
                        "healthcheck_path": "/v1/models",
                        "launch_args": [],
                        "base_envs": {},
                        "base_flags": {},
                        "tunable_envs": [],
                        "tunable_flags": [],
                    },
                    "trace": {
                        "windows_path": str(windows_path),
                        "window_id": "w1",
                        "max_concurrency": 1,
                        **trace_options,
                    },
                    "slo": {"target_pass_rate": 0.95},
                    "search": {
                        "low": 0.0,
                        "high": 1.0,
                        "tolerance": 0.1,
                        "max_probes": 2,
                        "sample_seed": 1,
                    },
                    "llm": {"system_prompt": "", "max_history_trials": 1},
                }
            ),
            encoding="utf-8",
        )
        return study_path

    def test_trace_and_prompt_flow(self) -> None:
        """Load the fixture study, summarize its trace window, and build an LLM prompt."""
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(tmp_path)
            study = load_study_spec(study_path)
            store = StudyStore(tmp_path / ".aituner" / "studies")
            study_root = store.init_study(spec_path=study_path, study=study)
            state = store.load_state(study.study_id)

            window, requests = load_trace_requests(study, study_spec_path=study_path)
            summary = summarize_window(requests, window)
            # The fixture holds three requests in a 10-second window.
            self.assertEqual(summary["request_count"], 3)
            self.assertEqual(summary["request_rate"], 0.3)

            prompt = build_prompt(
                study=study,
                window_summary=summary,
                state=state,
                capability_profile={"queueing_knee_by_bucket": {"4k": 1000}},
            )
            self.assertIn("allowed_flag_keys", prompt)
            self.assertIn("study-1", prompt)
            self.assertIn("queueing_knee_by_bucket", prompt)
            self.assertTrue(study_root.exists())

    def test_length_only_trace_rows_are_synthesized(self) -> None:
        """A row with lengths but no messages yields a synthesized, token-capped prompt."""
        with tempfile.TemporaryDirectory() as tmp:
            study_path = self._write_minimal_study(
                Path(tmp),
                study_id="study-len-only",
                trace_name="chat_len_only.jsonl",
                rows=[
                    {
                        "timestamp": 0.0,
                        "sampling_u": 0.1,
                        "input_length": 32,
                        "output_length": 16,
                    }
                ],
                window_end=10.0,
                trace_options={"synthetic_prompt_cap_tokens": 8},
            )
            study = load_study_spec(study_path)
            _, requests = load_trace_requests(study, study_spec_path=study_path)
            self.assertEqual(len(requests), 1)
            message = requests[0].body["messages"][0]["content"]
            # input_length is 32, but the cap of 8 bounds the synthesized prompt.
            self.assertEqual(message.count("token"), 8)
            self.assertEqual(requests[0].body["min_tokens"], 16)
            self.assertEqual(requests[0].body["max_tokens"], 16)

    def test_slo_evaluation_step_and_fixed_rules(self) -> None:
        """Step TTFT buckets and the fixed TPOT limit decide pass/fail per request."""
        with tempfile.TemporaryDirectory() as tmp:
            study = load_study_spec(_write_study_assets(Path(tmp)))
            outcomes = [
                # 1000 prompt tokens -> 2000 ms TTFT bucket; 1000 ms passes.
                RequestOutcome(
                    request_id="r1",
                    success=True,
                    ttft_ms=1000,
                    tpot_ms=100,
                    prompt_tokens=1000,
                    completion_tokens=16,
                ),
                # 5000 prompt tokens -> 5000 ms TTFT bucket; 6000 ms fails.
                RequestOutcome(
                    request_id="r2",
                    success=True,
                    ttft_ms=6000,
                    tpot_ms=100,
                    prompt_tokens=5000,
                    completion_tokens=16,
                ),
            ]
            evaluations, summary = summarize_evaluations(outcomes, study.slo)
            self.assertTrue(evaluations[0].passed)
            self.assertFalse(evaluations[1].passed)
            self.assertEqual(summary["slo_pass_rate"], 0.5)

    def test_prepare_trace_windows_materializes_repo_local_assets(self) -> None:
        """Run ``scripts/prepare_trace_windows.py`` on stub sources and check its output."""
        import sys  # local import: needed only to re-use the running interpreter

        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            legacy_source = tmp_path / "legacy"
            thinking_source = tmp_path / "thinking"
            legacy_source.mkdir()
            thinking_source.mkdir()

            # Create every source file as an empty placeholder first.
            for filename in [
                "qwen_chat_blksz_64_031109-031111",
                "qwen_chat_blksz_64_031121-031123",
                "qwen_chat_blksz_64_031209-031211",
                "qwen_chat_blksz_64_031221-031223",
                "qwen_chat_blksz_64_031309-031311",
                "qwen_chat_blksz_64_031321-031323",
                "qwen_chat_blksz_64_031609-031611",
                "qwen_chat_blksz_64_031621-031623",
                "qwen_chat_blksz_64_031709-031711",
                "qwen_chat_blksz_64_031721-031723",
            ]:
                for suffix in [".jsonl", "_prompt.jsonl"]:
                    path = legacy_source / f"{filename}{suffix}"
                    path.write_text("", encoding="utf-8")

            # Then give one trace/prompt pair real content (two turns each).
            peak_trace = legacy_source / "qwen_chat_blksz_64_031109-031111.jsonl"
            peak_prompt = legacy_source / "qwen_chat_blksz_64_031109-031111_prompt.jsonl"
            peak_trace.write_text(
                "\n".join(
                    [
                        json.dumps(
                            {
                                "chat_id": "c1",
                                "turn": 1,
                                "timestamp": 3599.0,
                                "input_length": 10,
                                "output_length": 3,
                            }
                        ),
                        json.dumps(
                            {
                                "chat_id": "c2",
                                "turn": 2,
                                "timestamp": 3605.0,
                                "input_length": 20,
                                "output_length": 7,
                            }
                        ),
                    ]
                )
                + "\n",
                encoding="utf-8",
            )
            peak_prompt.write_text(
                "\n".join(
                    [
                        json.dumps({"chat_id": "c1", "turn": 1, "prompt": "ignore me"}),
                        json.dumps({"chat_id": "c2", "turn": 2, "prompt": "real prompt"}),
                    ]
                )
                + "\n",
                encoding="utf-8",
            )

            output_root = tmp_path / "trace_windows"
            # Resolve the checkout from this file's location instead of relying on
            # one developer's absolute path, and run the script with the same
            # interpreter that is executing the tests.
            subprocess.run(
                [
                    sys.executable or "python3",
                    self._PREPARE_SCRIPT,
                    "--legacy-source",
                    str(legacy_source),
                    "--thinking-source",
                    str(thinking_source),
                    "--output-root",
                    str(output_root),
                    "--workloads",
                    "chat",
                    "--overwrite",
                ],
                check=True,
                cwd=str(self._locate_repo_root()),
            )

            windows_payload = json.loads(
                (output_root / "windows.json").read_text(encoding="utf-8")
            )
            windows = {item["window_id"]: item for item in windows_payload["windows"]}
            self.assertIn("chat_w20260311_1000", windows)
            self.assertEqual(windows["chat_w20260311_1000"]["num_requests"], 1)

            trace_path = output_root / windows["chat_w20260311_1000"]["trace_file"]
            rows = [
                json.loads(line)
                for line in trace_path.read_text(encoding="utf-8").splitlines()
            ]
            # Only the row stamped 3605.0 is expected, re-based to 5.0.
            self.assertEqual(len(rows), 1)
            self.assertEqual(rows[0]["prompt"], "real prompt")
            self.assertEqual(rows[0]["timestamp"], 5.0)
            self.assertEqual(rows[0]["output_length"], 7)
            self.assertIsInstance(rows[0]["sampling_u"], float)

    def test_binary_search_max_feasible(self) -> None:
        """The search lands in [0.5, 0.625] when feasibility flips at 0.625."""
        result = binary_search_max_feasible(
            low=0.0,
            high=1.0,
            tolerance=0.01,
            max_probes=8,
            evaluator=lambda threshold: ThresholdProbe(
                threshold=threshold,
                feasible=threshold <= 0.625,
                payload={"threshold": threshold},
            ),
        )
        self.assertLessEqual(result.best_threshold, 0.625)
        self.assertGreaterEqual(result.best_threshold, 0.5)
        self.assertIsNotNone(result.best_feasible_payload)

    def test_trace_max_requests_uses_window_wide_downsample(self) -> None:
        """``max_requests_per_probe`` keeps rows spread across the window, not a prefix."""
        with tempfile.TemporaryDirectory() as tmp:
            study_path = self._write_minimal_study(
                Path(tmp),
                study_id="study-downsample",
                trace_name="chat_many.jsonl",
                rows=[
                    {
                        "request_id": f"r{idx}",
                        "timestamp": float(idx),
                        "sampling_u": idx / 10.0,
                        "messages": [{"role": "user", "content": f"hello-{idx}"}],
                        "input_length": 10 + idx,
                        "output_length": 5,
                    }
                    for idx in range(10)
                ],
                window_end=10.0,
                trace_options={"max_requests_per_probe": 4},
            )
            study = load_study_spec(study_path)
            _, requests = load_trace_requests(study, study_spec_path=study_path)
            self.assertEqual([item.row_id for item in requests], ["r0", "r2", "r5", "r7"])

    def test_trace_replay_time_scale_scales_arrivals_and_window(self) -> None:
        """``replay_time_scale`` of 0.1 shrinks both the window end and arrival times."""
        with tempfile.TemporaryDirectory() as tmp:
            study_path = self._write_minimal_study(
                Path(tmp),
                study_id="study-scale",
                trace_name="chat_scale.jsonl",
                rows=[
                    {
                        "request_id": "r1",
                        "timestamp": 10.0,
                        "sampling_u": 0.25,
                        "messages": [{"role": "user", "content": "hello"}],
                        "input_length": 16,
                        "output_length": 4,
                    }
                ],
                window_end=100.0,
                trace_options={"replay_time_scale": 0.1},
            )
            study = load_study_spec(study_path)
            window, requests = load_trace_requests(study, study_spec_path=study_path)
            self.assertEqual(window.window_end, 10.0)
            self.assertEqual(requests[0].arrival_s, 1.0)

    def test_proposal_validation_and_job_emission(self) -> None:
        """A parsed proposal becomes a trial whose job entry is appended to ``jobs.toml``."""
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(tmp_path)
            study = load_study_spec(study_path)
            store = StudyStore(tmp_path / ".aituner" / "studies")
            store.init_study(spec_path=study_path, study=study)
            state = store.load_state(study.study_id)

            proposal_text = json.dumps(
                {
                    "observation": "Current TTFT fails before TPOT.",
                    "diagnosis": "Prefill pressure dominates.",
                    "config_patch": {
                        "env_patch": {"VLLM_ATTENTION_BACKEND": "FLASHINFER"},
                        "flag_patch": {"tensor-parallel-size": 4, "max-num-seqs": 64},
                    },
                    "expected_effects": ["lower TTFT", "raise feasible sampling_u"],
                    "why_not_previous_failures": "Avoids changing unsupported envs.",
                }
            )
            proposal = parse_proposal_text(proposal_text, study)
            trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)

            job = build_trial_job(study=study, trial=trial, repo_root=tmp_path)
            jobs_path = tmp_path / "jobs.toml"
            append_job(jobs_path, job)
            rendered = jobs_path.read_text(encoding="utf-8")
            self.assertIn('name = "study-1-trial-0001"', rendered)
            self.assertIn('command = "python3 -m aituner.cli worker run-trial', rendered)
            self.assertIn('PYTHONPATH = "src"', rendered)

    def test_ingest_trial_results_updates_best(self) -> None:
        """Ingesting a completed trial result records it as the study's best trial."""
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(tmp_path)
            study = load_study_spec(study_path)
            store = StudyStore(tmp_path / ".aituner" / "studies")
            store.init_study(spec_path=study_path, study=study)
            state = store.load_state(study.study_id)
            proposal = Proposal.from_dict(
                {
                    "observation": "Obs",
                    "diagnosis": "Diag",
                    "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
                    "expected_effects": ["raise rate"],
                }
            )
            trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)
            # Hand-write the result file that a worker run would otherwise produce.
            Path(trial.result_path).write_text(
                json.dumps(
                    {
                        "study_id": study.study_id,
                        "trial_id": trial.trial_id,
                        "status": "completed",
                        "best_sampling_u": 0.75,
                        "best_request_rate": 12.5,
                        "best_pass_rate": 0.97,
                    }
                ),
                encoding="utf-8",
            )
            next_state = store.ingest_trial_results(study.study_id)
            self.assertEqual(next_state.best_trial_id, trial.trial_id)
            self.assertEqual(next_state.best_request_rate, 12.5)

    def test_cli_tune_runs_multiple_manual_proposals(self) -> None:
        """``study tune`` with two proposal files runs two trials and keeps the better one."""
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            study_path = _write_study_assets(tmp_path)
            proposal1 = tmp_path / "proposal-1.json"
            proposal2 = tmp_path / "proposal-2.json"
            proposal1.write_text(
                json.dumps(
                    {
                        "observation": "trial one",
                        "diagnosis": "conservative",
                        "config_patch": {
                            "env_patch": {},
                            "flag_patch": {"tensor-parallel-size": 4},
                        },
                        "expected_effects": ["stable"],
                        "why_not_previous_failures": "",
                    }
                ),
                encoding="utf-8",
            )
            proposal2.write_text(
                json.dumps(
                    {
                        "observation": "trial two",
                        "diagnosis": "more batching",
                        "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 64}},
                        "expected_effects": ["higher throughput"],
                        "why_not_previous_failures": "",
                    }
                ),
                encoding="utf-8",
            )
            store_root = tmp_path / "store"

            def fake_run_trial(trial_spec_path: Path) -> dict[str, object]:
                """Stand in for the real trial runner: the second trial reports a higher rate."""
                payload = json.loads(trial_spec_path.read_text(encoding="utf-8"))
                trial_id = str(payload["trial_id"])
                trial_root = Path(payload["artifact_dir"])
                if trial_id.endswith("0001"):
                    best_rate = 1.0
                    best_u = 0.5
                else:
                    best_rate = 2.0
                    best_u = 0.75
                result = {
                    "study_id": payload["study_id"],
                    "trial_id": trial_id,
                    "status": "completed",
                    "best_sampling_u": best_u,
                    "best_request_rate": best_rate,
                    "best_pass_rate": 1.0,
                    "best_request_count": 2,
                    "probes": [],
                }
                (trial_root / "result.json").write_text(json.dumps(result), encoding="utf-8")
                return result

            with mock.patch("aituner.cli.run_trial", side_effect=fake_run_trial):
                exit_code = cli_main(
                    [
                        "study",
                        "tune",
                        "--spec",
                        str(study_path),
                        "--store-root",
                        str(store_root),
                        "--proposal-file",
                        str(proposal1),
                        "--proposal-file",
                        str(proposal2),
                    ]
                )
            self.assertEqual(exit_code, 0)
            store = StudyStore(store_root)
            state = store.load_state("study-1")
            self.assertEqual(state.best_trial_id, "trial-0002")
            self.assertEqual(state.best_request_rate, 2.0)
            self.assertEqual(state.next_trial_index, 3)

    def test_proposal_expected_effects_accepts_string(self) -> None:
        """A bare string for ``expected_effects`` is normalized to a one-item list."""
        proposal = Proposal.from_dict(
            {
                "observation": "obs",
                "diagnosis": "diag",
                "config_patch": {"env_patch": {}, "flag_patch": {}},
                "expected_effects": "higher throughput",
            }
        )
        self.assertEqual(proposal.expected_effects, ["higher throughput"])

    def test_replay_requests_early_stops_when_slo_is_unrecoverable(self) -> None:
        """Replay stops early after one failure, yet still reports all three requests."""
        requests = [
            TraceRequest(
                row_id=f"r{i}",
                arrival_s=0.0,
                sampling_u=0.1 * i,
                body={"model": "m", "messages": [{"role": "user", "content": "x"}]},
                prompt_tokens_hint=8,
                completion_tokens_hint=4,
            )
            for i in range(3)
        ]

        # Only one canned outcome is supplied for the patched request runner.
        outcomes = [
            RequestOutcome(
                request_id="r0",
                success=False,
                ttft_ms=None,
                tpot_ms=None,
                prompt_tokens=8,
                completion_tokens=4,
                error="request_failed",
            )
        ]

        def fake_run_one_request(*args, **kwargs):
            return outcomes.pop(0)

        def fake_evaluate(outcome: RequestOutcome):
            # Minimal stand-in exposing only the ``passed`` attribute.
            return type("Eval", (), {"passed": outcome.success})()

        with mock.patch("aituner.worker._run_one_request", side_effect=fake_run_one_request):
            replayed, early_stopped, reason = _replay_requests(
                requests,
                base_url="http://127.0.0.1:8000",
                timeout_s=1.0,
                max_concurrency=1,
                target_pass_rate=0.95,
                max_lag_s=None,
                max_elapsed_s=None,
                evaluate_outcome=fake_evaluate,
            )
        self.assertTrue(early_stopped)
        self.assertEqual(reason, "slo_pass_rate_unrecoverable")
        self.assertEqual(len(replayed), 3)
        self.assertEqual(replayed[1].error, "slo_pass_rate_unrecoverable")

    def test_openai_url_avoids_double_v1(self) -> None:
        """A base URL with or without a trailing ``/v1`` yields a single ``/v1`` segment."""
        self.assertEqual(
            _openai_url("http://example.com", "/v1/chat/completions"),
            "http://example.com/v1/chat/completions",
        )
        self.assertEqual(
            _openai_url("http://example.com/v1", "/v1/chat/completions"),
            "http://example.com/v1/chat/completions",
        )
|
|
|
|
|
|
# Allow running this module directly as a script, not only through a test runner.
if __name__ == "__main__":
    unittest.main()
|