"""Unit tests for the ``aituner`` package (study specs, trace windows, prompts, harness guards, LLM endpoints)."""
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest import mock
|
|
|
|
from aituner.cli import main as cli_main
|
|
from aituner.compare import load_compare_spec, run_compare
|
|
from aituner.engine import build_launch_recipe
|
|
from aituner.http_client import _auth_headers, _openai_url, _should_bypass_proxy
|
|
from aituner.job import append_job, build_trial_job
|
|
from aituner.harness import build_harness_context, build_harness_stop_proposal
|
|
from aituner.llm import _extract_response_text, build_prompt, parse_proposal_text, validate_proposal
|
|
from aituner.search import ThresholdProbe, binary_search_max_feasible
|
|
from aituner.slo import RequestOutcome, evaluate_request, summarize_evaluations
|
|
from aituner.spec import (
|
|
ConfigPatch,
|
|
LLMEndpointSpec,
|
|
Proposal,
|
|
SpecError,
|
|
StudyState,
|
|
TrialSummary,
|
|
load_study_spec,
|
|
)
|
|
from aituner.store import StudyStore
|
|
from aituner.trace import load_trace_requests, summarize_window
|
|
from aituner.worker import (
|
|
_latency_summary,
|
|
_replay_requests,
|
|
_terminate_process_tree,
|
|
_wait_for_server_or_exit,
|
|
)
|
|
from aituner.trace import TraceRequest
|
|
|
|
|
|
# Parent of the directory that contains this test module (resolved, absolute).
REPO_ROOT: Path = Path(__file__).resolve().parents[1]
|
|
|
|
|
|
def _write_study_assets(
    tmp_path: Path,
    *,
    trace_overrides: dict[str, object] | None = None,
    slo_overrides: dict[str, object] | None = None,
    engine_overrides: dict[str, object] | None = None,
    model_overrides: dict[str, object] | None = None,
    llm_overrides: dict[str, object] | None = None,
) -> Path:
    """Write a small, self-consistent study fixture under ``tmp_path``.

    Files created:
      * ``trace_windows/traces/chat_w1.jsonl``: three chat requests r1..r3 with
        ``input_length`` 1000 / 5000 / 20000 and ``sampling_u`` 0.1 / 0.5 / 0.9;
      * ``trace_windows/windows.json``: a single window ``chat_w1`` with
        ``window_start`` 0.0 and ``window_end`` 10.0;
      * ``capability.json``: a tiny ``prefill_service_by_bucket`` profile;
      * ``study.json``: the study spec referencing all of the above.

    Each ``*_overrides`` mapping is shallow-merged with ``dict.update`` into the
    matching section of the study spec before it is written. An override key
    therefore replaces that key's whole value. Falsy overrides (``None`` or
    ``{}``) are ignored.

    Args:
        tmp_path: Directory to populate.
        trace_overrides: Merged into the ``trace`` section.
        slo_overrides: Merged into the ``slo`` section.
        engine_overrides: Merged into the ``engine`` section.
        model_overrides: Merged into the ``model`` section.
        llm_overrides: Merged into the ``llm`` section.

    Returns:
        Path of the written ``study.json``.

    Raises:
        FileExistsError: If ``tmp_path / "trace_windows" / "traces"`` already
            exists, so the helper cannot be called twice on the same directory.
    """
    trace_dir = tmp_path / "trace_windows" / "traces"
    trace_dir.mkdir(parents=True)
    trace_path = trace_dir / "chat_w1.jsonl"
    rows = [
        {
            "request_id": "r1",
            "timestamp": 0.0,
            "sampling_u": 0.10,
            "messages": [{"role": "user", "content": "hello"}],
            "input_length": 1000,
            "output_length": 16,
        },
        {
            "request_id": "r2",
            "timestamp": 1.0,
            "sampling_u": 0.50,
            "messages": [{"role": "user", "content": "world"}],
            "input_length": 5000,
            "output_length": 32,
        },
        {
            "request_id": "r3",
            "timestamp": 2.0,
            "sampling_u": 0.90,
            "messages": [{"role": "user", "content": "!"}],
            "input_length": 20000,
            "output_length": 64,
        },
    ]
    with trace_path.open("w", encoding="utf-8") as handle:
        for row in rows:
            handle.write(json.dumps(row) + "\n")

    windows_path = tmp_path / "trace_windows" / "windows.json"
    windows_payload = {
        "u_field": "sampling_u",
        "windows": [
            {
                "window_id": "chat_w1",
                "trace_type": "chat",
                "trace_file": "traces/chat_w1.jsonl",
                "window_start": 0.0,
                "window_end": 10.0,
            }
        ],
    }
    windows_path.write_text(json.dumps(windows_payload), encoding="utf-8")

    capability_path = tmp_path / "capability.json"
    capability_path.write_text(
        json.dumps({"prefill_service_by_bucket": {"4k": {"tp4_ms": 320, "tp8_ms": 240}}}),
        encoding="utf-8",
    )

    study_path = tmp_path / "study.json"
    trace_payload: dict[str, object] = {
        "windows_path": str(windows_path),
        "window_id": "chat_w1",
        "u_field": "sampling_u",
        "timestamp_field": "timestamp",
        "max_concurrency": 4,
    }
    if trace_overrides:
        trace_payload.update(trace_overrides)

    study_payload = {
        "study_id": "study-1",
        "hardware": {"gpu_count": 8, "gpu_model": "H20", "host_candidates": ["dash0"]},
        "model": {
            "model_id": "qwen",
            "served_model_name": "Qwen/Qwen3-30B-A3B-Instruct-2507",
        },
        "engine": {
            "engine_name": "vllm",
            "engine_version": "0.1",
            "exec_path": "/usr/local/bin/vllm",
            "cwd": str(tmp_path),
            "host": "127.0.0.1",
            "port": 8000,
            "healthcheck_path": "/v1/models",
            "ready_timeout_s": 30,
            "request_timeout_s": 30,
            "launch_args": ["serve", "/models/qwen"],
            "base_envs": {"BASE_ENV": "1"},
            "base_flags": {"host": "127.0.0.1", "port": 8000},
            "tunable_envs": ["VLLM_ATTENTION_BACKEND"],
            "tunable_flags": ["tensor-parallel-size", "max-num-seqs"],
            "python_executable": "python3",
        },
        "trace": trace_payload,
        "slo": {
            "target_pass_rate": 0.95,
            "ttft_rule": {
                "kind": "step_ms",
                "buckets": [
                    {"max_input_tokens": 4096, "threshold_ms": 2000},
                    {"max_input_tokens": 16384, "threshold_ms": 5000},
                    {"threshold_ms": 9000},
                ],
            },
            "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 120},
        },
        "search": {
            "low": 0.0,
            "high": 1.0,
            "tolerance": 0.01,
            "max_probes": 8,
            "sample_seed": 20260325,
        },
        "llm": {"system_prompt": "Tune it.", "max_history_trials": 8},
        "capability_profile_path": str(capability_path),
    }
    # Shallow-merge per-section overrides. The sections are disjoint, so the
    # order of these merges does not matter.
    for section, overrides in (
        ("slo", slo_overrides),
        ("engine", engine_overrides),
        ("model", model_overrides),
        ("llm", llm_overrides),
    ):
        if overrides:
            study_payload[section].update(overrides)
    study_path.write_text(json.dumps(study_payload), encoding="utf-8")
    return study_path
|
|
|
|
|
|
def _write_compare_assets(
    tmp_path: Path,
    *,
    study_path: Path,
    window_ids: list[str] | None = None,
    window_selector: dict[str, object] | None = None,
    baseline: dict[str, object] | None = None,
    tuned: dict[str, object] | None = None,
) -> Path:
    """Write ``compare.json`` into ``tmp_path`` and return its path.

    The baseline arm defaults to an empty config patch. The tuned arm defaults
    to a ``tensor-parallel-size: 2`` flag patch. Any falsy value passed for
    ``baseline`` or ``tuned`` (``None`` or ``{}``) selects the default.
    ``window_ids`` and ``window_selector`` are written only when not ``None``.
    """
    default_baseline = {"config_patch": {"env_patch": {}, "flag_patch": {}}}
    default_tuned = {
        "config_patch": {
            "env_patch": {},
            "flag_patch": {"tensor-parallel-size": 2},
        }
    }
    payload: dict[str, object] = {
        "compare_id": "compare-1",
        "study_spec_path": str(study_path),
        "baseline": baseline if baseline else default_baseline,
        "tuned": tuned if tuned else default_tuned,
    }
    optional_fields = (("window_ids", window_ids), ("window_selector", window_selector))
    payload.update({key: value for key, value in optional_fields if value is not None})
    target = tmp_path / "compare.json"
    target.write_text(json.dumps(payload), encoding="utf-8")
    return target
|
|
|
|
|
|
class CoreFlowTests(unittest.TestCase):
|
|
def test_trace_and_prompt_flow(self) -> None:
    """Init a study, summarize its trace window, and check the rendered prompt.

    ``request_rate`` is a derived float (presumably request count divided by
    the window duration), so it is compared with ``assertAlmostEqual``. Exact
    ``==`` would break on harmless changes in the arithmetic order.
    """
    with tempfile.TemporaryDirectory() as tmp:
        tmp_path = Path(tmp)
        study_path = _write_study_assets(tmp_path)
        study = load_study_spec(study_path)
        store = StudyStore(tmp_path / ".aituner" / "studies")
        study_root = store.init_study(spec_path=study_path, study=study)
        state = store.load_state(study.study_id)

        window, requests = load_trace_requests(study, study_spec_path=study_path)
        summary = summarize_window(requests, window)
        self.assertEqual(summary["request_count"], 3)
        self.assertAlmostEqual(summary["request_rate"], 0.3)

        prompt = build_prompt(
            study=study,
            window_summary=summary,
            state=state,
            capability_profile={"queueing_knee_by_bucket": {"4k": 1000}},
        )
        self.assertIn("allowed_flag_keys", prompt)
        self.assertIn("study-1", prompt)
        self.assertIn('"current_best"', prompt)
        self.assertIn("queueing_knee_by_bucket", prompt)
        self.assertIn("Harnesses:", prompt)
        self.assertIn("workload_lca_profile", prompt)
        self.assertIn("knob_harnesses", prompt)
        self.assertTrue(study_root.exists())
|
|
|
|
def test_harness_uses_latency_failures_before_generic_unrecoverable(self) -> None:
    """A TTFT-heavy failure mix is diagnosed as ``ttft_prefill``.

    The only probe carries the generic ``slo_pass_rate_unrecoverable``
    early-stop reason alongside concrete latency failure counts. The expected
    diagnosis comes from the latency counts, not the generic reason.
    """
    failed_reasons = {
        "ttft_ms>5000.0": 70,
        "tpot_ms>50.0": 5,
        "probe_elapsed_s>240.0": 100,
    }
    probe_payload = {
        "request_count": 100,
        "pass_rate": 0.3,
        "request_rate": 1.0,
        "early_stopped": True,
        "early_stop_reason": "slo_pass_rate_unrecoverable",
        "latency_summary": {
            "failed_reason_counts": failed_reasons,
            "ttft_ms": {"p95": 6500.0, "p99": 7200.0},
        },
    }
    trial_result = {
        "status": "completed",
        "probes": [{"threshold": 0.25, "feasible": False, "payload": probe_payload}],
    }
    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        study = load_study_spec(_write_study_assets(root))
        result_file = root / "trial-result.json"
        result_file.write_text(json.dumps(trial_result), encoding="utf-8")
        only_trial = TrialSummary(
            trial_id="trial-0001",
            status="completed",
            result_path=str(result_file),
            config_patch={"env_patch": {}, "flag_patch": {}},
        )
        context = build_harness_context(
            study=study,
            window_summary={"prompt_tokens_p95": 5000, "prompt_tail_ratio_p95_p50": 3.0},
            state=StudyState(study_id=study.study_id, trials=[only_trial]),
        )
        first_diagnostic = context["recent_trial_diagnostics"][0]
        self.assertEqual(first_diagnostic["active_bottleneck"], "ttft_prefill")
|
|
|
|
def test_harness_blocks_repeating_infeasible_plateau_family(self) -> None:
    """Two all-infeasible trials varying only DP, with a flat pass rate, trip the plateau guard.

    The guard should name ``data-parallel-size`` as the blocked family and ask
    for a justified adjacent probe. No deterministic stop is issued.
    """
    engine_overrides = {
        "tunable_flags": [
            "tensor-parallel-size",
            "data-parallel-size",
            "expert-parallel-size",
        ],
        "topology_constraints": {
            "allowed_tensor_parallel_sizes": [1, 2, 4],
            "allowed_data_parallel_sizes": [1, 2, 4, 8],
            "allowed_expert_parallel_sizes": [1],
            "allowed_tp_dp_products": [1, 2, 4, 8],
        },
    }
    # (data-parallel-size, pass_rate, ttft p95) for trial-0003 and trial-0004.
    plateau_runs = [(4, 0.345, 3818.4), (8, 0.345, 3823.4)]

    def _result_payload(pass_rate: float, ttft_p95: float) -> dict[str, object]:
        # Shape of a completed trial whose every probe was infeasible.
        return {
            "status": "completed",
            "best_request_rate": None,
            "all_infeasible_diagnostics": {
                "threshold": 0.0078125,
                "request_count": 148,
                "request_rate": 0.22,
                "pass_rate": pass_rate,
                "early_stopped": True,
                "early_stop_reason": "elapsed",
                "latency_summary": {
                    "failed_reason_counts": {"ttft_ms>5000.0": 97},
                    "ttft_ms": {"p95": ttft_p95, "p99": 5800.0},
                },
            },
        }

    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        study = load_study_spec(_write_study_assets(root, engine_overrides=engine_overrides))
        trials: list[TrialSummary] = []
        for index, (dp_size, pass_rate, ttft_p95) in enumerate(plateau_runs, start=3):
            trial_id = f"trial-{index:04d}"
            result_file = root / f"{trial_id}.json"
            result_file.write_text(
                json.dumps(_result_payload(pass_rate, ttft_p95)),
                encoding="utf-8",
            )
            trials.append(
                TrialSummary(
                    trial_id=trial_id,
                    status="completed",
                    result_path=str(result_file),
                    config_patch={
                        "env_patch": {},
                        "flag_patch": {
                            "tensor-parallel-size": 1,
                            "data-parallel-size": dp_size,
                            "expert-parallel-size": 1,
                        },
                    },
                )
            )
        context = build_harness_context(
            study=study,
            window_summary={"prompt_tokens_p95": 7628, "prompt_tail_ratio_p95_p50": 3.83},
            state=StudyState(study_id=study.study_id, trials=trials),
        )
        convergence = context["convergence_guard"]
        plateau = convergence["infeasible_progress"]
        self.assertTrue(plateau["plateau_detected"])
        self.assertTrue(plateau["stop_if_next_probe_repeats_family"])
        self.assertEqual(plateau["blocked_primary_family"], "data-parallel-size")
        self.assertTrue(
            convergence["should_stop_if_no_harness_can_justify_a_new_adjacent_probe"]
        )
        self.assertFalse(convergence["deterministic_stop"])
        self.assertFalse(context["harness_stop"]["should_stop"])
        self.assertIsNone(build_harness_stop_proposal(context))
|
|
|
|
def test_harness_strong_incumbent_guard_after_large_gain(self) -> None:
    """A large per-GPU gain (0.035 -> 0.21) activates the strong-incumbent guard.

    Instead of stopping, the guard should require validation probes around the
    incumbent.
    """
    with tempfile.TemporaryDirectory() as scratch:
        study = load_study_spec(_write_study_assets(Path(scratch)))
        baseline_trial = TrialSummary(
            trial_id="trial-0001",
            status="completed",
            parallel_size=1,
            best_request_rate=0.035,
            best_request_rate_per_gpu=0.035,
            config_patch={"env_patch": {}, "flag_patch": {}},
        )
        incumbent_trial = TrialSummary(
            trial_id="trial-0002",
            status="completed",
            parallel_size=2,
            best_request_rate=0.42,
            best_request_rate_per_gpu=0.21,
            config_patch={
                "env_patch": {},
                "flag_patch": {"tensor-parallel-size": 2, "data-parallel-size": 1},
            },
        )
        state = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0002",
            best_request_rate_per_gpu=0.21,
            trials=[baseline_trial, incumbent_trial],
        )
        window_summary = {
            "prompt_tokens_p95": 7628,
            "prompt_tokens_p99": 8102,
            "prompt_tail_ratio_p95_p50": 3.83,
        }
        context = build_harness_context(study=study, window_summary=window_summary, state=state)
        convergence = context["convergence_guard"]
        incumbent_guard = convergence["strong_incumbent"]
        self.assertTrue(incumbent_guard["guard_active"])
        self.assertGreaterEqual(incumbent_guard["incumbent_gain_vs_baseline"], 3.0)
        self.assertFalse(
            convergence["should_stop_if_no_harness_can_justify_a_new_adjacent_probe"]
        )
        self.assertEqual(convergence["reason"], "strong_incumbent_requires_validation_probes")
        self.assertIn("validate", incumbent_guard["recommended_next_action"])
|
|
|
|
def test_harness_stop_after_post_incumbent_validation_is_exhausted(self) -> None:
    """Once validation probes after the incumbent are spent, the harness proposes a stop."""

    def _trial(trial_id: str, flag_patch: dict[str, object], **rates: float) -> TrialSummary:
        # Every trial here ran on 8 GPUs and completed; only rates/flags differ.
        return TrialSummary(
            trial_id=trial_id,
            status="completed",
            parallel_size=8,
            config_patch={"env_patch": {}, "flag_patch": flag_patch},
            **rates,
        )

    with tempfile.TemporaryDirectory() as scratch:
        study = load_study_spec(_write_study_assets(Path(scratch)))
        history = [
            _trial("trial-0001", {}, best_request_rate=0.8, best_request_rate_per_gpu=0.1),
            _trial(
                "trial-0002",
                {"tensor-parallel-size": 2, "data-parallel-size": 4},
                best_request_rate=2.4,
                best_request_rate_per_gpu=0.3,
            ),
            _trial("trial-0003", {"tensor-parallel-size": 1, "data-parallel-size": 8}),
            _trial("trial-0004", {"max-num-seqs": 160}),
        ]
        state = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0002",
            best_parallel_size=8,
            best_sampling_u=0.02,
            best_request_rate=2.4,
            best_request_rate_per_gpu=0.3,
            trials=history,
        )
        context = build_harness_context(
            study=study,
            window_summary={"prompt_tokens_p95": 2048},
            state=state,
        )
        stop_decision = context["harness_stop"]
        self.assertTrue(stop_decision["should_stop"])
        self.assertEqual(stop_decision["reason"], "post_incumbent_validation_exhausted")
        stop_proposal = build_harness_stop_proposal(context)
        self.assertIsNotNone(stop_proposal)
        self.assertTrue(stop_proposal.should_stop)
|
|
|
|
def test_harness_does_not_stop_immediately_after_strong_incumbent(self) -> None:
    """Right after a strong incumbent appears, with no validation yet, the harness keeps going."""

    def _trial(trial_id: str, flag_patch: dict[str, object], rate: float, rate_per_gpu: float) -> TrialSummary:
        # Completed 8-GPU trial with the given throughput and flag patch.
        return TrialSummary(
            trial_id=trial_id,
            status="completed",
            parallel_size=8,
            best_request_rate=rate,
            best_request_rate_per_gpu=rate_per_gpu,
            config_patch={"env_patch": {}, "flag_patch": flag_patch},
        )

    with tempfile.TemporaryDirectory() as scratch:
        study = load_study_spec(_write_study_assets(Path(scratch)))
        history = [
            _trial("trial-0001", {}, 0.8, 0.1),
            _trial("trial-0002", {"tensor-parallel-size": 2, "data-parallel-size": 4}, 2.4, 0.3),
        ]
        state = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0002",
            best_parallel_size=8,
            best_request_rate=2.4,
            best_request_rate_per_gpu=0.3,
            trials=history,
        )
        context = build_harness_context(
            study=study,
            window_summary={"prompt_tokens_p95": 2048},
            state=state,
        )
        self.assertFalse(context["harness_stop"]["should_stop"])
        self.assertIsNone(build_harness_stop_proposal(context))
|
|
|
|
def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None:
    """An ``input_length_filter`` of [0, 8192] drops the 20000-token request.

    The filter must also be echoed into the prompt.
    """
    length_filter = {"min_input_tokens": 0, "max_input_tokens": 8192}
    with tempfile.TemporaryDirectory() as scratch:
        spec_path = _write_study_assets(
            Path(scratch),
            trace_overrides={"input_length_filter": length_filter},
        )
        study = load_study_spec(spec_path)
        window, kept = load_trace_requests(study, study_spec_path=spec_path)
        summary = summarize_window(kept, window)

        self.assertEqual(len(kept), 2)
        self.assertEqual([request.prompt_tokens_hint for request in kept], [1000, 5000])
        self.assertEqual(summary["request_count"], 2)
        self.assertEqual(summary["prompt_tokens_p95"], 5000.0)
        self.assertIn("prefix_cache", summary)
        self.assertIn("arrival_burst_ratio_p95_to_mean", summary)

        rendered = build_prompt(
            study=study,
            window_summary=summary,
            state=StudyState(study_id=study.study_id),
            capability_profile=None,
        )
        self.assertIn('"input_length_filter"', rendered)
        self.assertIn('"max_input_tokens": 8192', rendered)
|
|
|
|
def test_trace_input_length_filter_rejects_invalid_bounds(self) -> None:
    """``min_input_tokens`` above ``max_input_tokens`` is a spec error."""
    inverted_bounds = {"min_input_tokens": 8193, "max_input_tokens": 8192}
    with tempfile.TemporaryDirectory() as scratch:
        spec_path = _write_study_assets(
            Path(scratch),
            trace_overrides={"input_length_filter": inverted_bounds},
        )
        with self.assertRaisesRegex(SpecError, "min_input_tokens must be <="):
            load_study_spec(spec_path)
|
|
|
|
def test_decode_only_mode_is_loaded_and_prompt_mentions_it(self) -> None:
    """``request_mode=decode_only`` loads, restarts after early stop by default, and shows in the prompt."""
    decode_slo = {
        "ttft_rule": None,
        "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 20},
    }
    with tempfile.TemporaryDirectory() as scratch:
        spec_path = _write_study_assets(
            Path(scratch),
            trace_overrides={"request_mode": "decode_only"},
            slo_overrides=decode_slo,
        )
        study = load_study_spec(spec_path)
        trace_spec = study.trace
        self.assertEqual(trace_spec.request_mode, "decode_only")
        self.assertTrue(trace_spec.restart_engine_after_early_stop)

        window, requests = load_trace_requests(study, study_spec_path=spec_path)
        rendered = build_prompt(
            study=study,
            window_summary=summarize_window(requests, window),
            state=StudyState(study_id=study.study_id),
            capability_profile=None,
        )
        for expected in (
            '"request_mode": "decode_only"',
            '"restart_engine_after_early_stop": true',
            "There is no TTFT SLO for this study.",
            "decode-only",
        ):
            self.assertIn(expected, rendered)
|
|
|
|
def test_decode_only_restart_after_early_stop_can_be_disabled(self) -> None:
    """An explicit ``restart_engine_after_early_stop: false`` wins over the decode-only default."""
    overrides = {
        "request_mode": "decode_only",
        "restart_engine_after_early_stop": False,
    }
    with tempfile.TemporaryDirectory() as scratch:
        study = load_study_spec(_write_study_assets(Path(scratch), trace_overrides=overrides))
        self.assertFalse(study.trace.restart_engine_after_early_stop)
|
|
|
|
def test_chat_mode_does_not_restart_after_early_stop_by_default(self) -> None:
    """Without overrides the trace is in ``chat`` mode and does not restart after early stop."""
    with tempfile.TemporaryDirectory() as scratch:
        trace_spec = load_study_spec(_write_study_assets(Path(scratch))).trace
        self.assertEqual(trace_spec.request_mode, "chat")
        self.assertFalse(trace_spec.restart_engine_after_early_stop)
|
|
|
|
def test_decode_only_harness_defaults_to_decode_tpot(self) -> None:
    """A fresh decode-only study activates the parallelism and batching knob harnesses."""
    decode_slo = {
        "ttft_rule": None,
        "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 20},
    }
    engine_overrides = {
        "tunable_flags": [
            "tensor-parallel-size",
            "data-parallel-size",
            "max-num-seqs",
            "max-num-batched-tokens",
        ],
        "topology_constraints": {
            "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
            "allowed_data_parallel_sizes": [1, 2, 4, 8],
            "allowed_tp_dp_products": [8],
            "require_tp_dp_product_equals_gpu_count": True,
        },
    }
    with tempfile.TemporaryDirectory() as scratch:
        spec_path = _write_study_assets(
            Path(scratch),
            trace_overrides={"request_mode": "decode_only"},
            slo_overrides=decode_slo,
            engine_overrides=engine_overrides,
        )
        study = load_study_spec(spec_path)
        window, requests = load_trace_requests(study, study_spec_path=spec_path)
        context = build_harness_context(
            study=study,
            window_summary=summarize_window(requests, window),
            state=StudyState(study_id=study.study_id),
        )
        active_families = set()
        for knob in context["knob_harnesses"]:
            if knob["active_now"]:
                active_families.add(knob["knob_family"])
        for family in (
            "tensor-parallel-size",
            "data-parallel-size",
            "max-num-seqs",
            "max-num-batched-tokens",
        ):
            self.assertIn(family, active_families)
        rules_text = "\n".join(context["proposal_rules"])
        self.assertIn("For decode_only studies, ignore TTFT", rules_text)
        self.assertIn("config_patch is applied to the study base config", rules_text)
|
|
|
|
def test_prompt_can_disable_harness_for_ablation(self) -> None:
    """``llm.use_harness=false`` strips harness hints from the prompt."""
    with tempfile.TemporaryDirectory() as scratch:
        spec_path = _write_study_assets(Path(scratch))
        spec_data = json.loads(spec_path.read_text(encoding="utf-8"))
        spec_data["llm"]["use_harness"] = False
        spec_path.write_text(json.dumps(spec_data), encoding="utf-8")

        study = load_study_spec(spec_path)
        window, requests = load_trace_requests(study, study_spec_path=spec_path)
        rendered = build_prompt(
            study=study,
            window_summary=summarize_window(requests, window),
            state=StudyState(study_id=study.study_id),
            capability_profile=None,
        )
        self.assertFalse(study.llm.use_harness)
        self.assertIn("Disabled by llm.use_harness=false", rendered)
        self.assertNotIn('"paper_alignment"', rendered)
        self.assertIn("without harness hints", rendered)
|
|
|
|
def test_harness_uses_prior_infeasible_probe_for_active_bottleneck(self) -> None:
    """The bottleneck comes from the infeasible probe (TPOT), not the feasible elapsed-time stop."""
    decode_slo = {
        "ttft_rule": None,
        "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 20},
    }
    engine_overrides = {
        "tunable_flags": [
            "tensor-parallel-size",
            "data-parallel-size",
            "max-num-seqs",
        ]
    }
    infeasible_probe = {
        "threshold": 0.1,
        "feasible": False,
        "payload": {
            "request_rate": 2.0,
            "pass_rate": 0.1,
            "early_stop_reason": "slo_pass_rate_unrecoverable",
            "latency_summary": {"failed_reason_counts": {"tpot_ms>20.0": 20}},
        },
    }
    feasible_probe = {
        "threshold": 0.01,
        "feasible": True,
        "payload": {
            "request_rate": 1.0,
            "pass_rate": 1.0,
            "early_stop_reason": "probe_elapsed_s>1200.0",
            "latency_summary": {"failed_reason_counts": {"probe_elapsed_s>1200.0": 1}},
        },
    }
    trial_result = {
        "status": "completed",
        "best_request_rate": 1.0,
        "best_pass_rate": 1.0,
        "probes": [infeasible_probe, feasible_probe],
    }
    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        spec_path = _write_study_assets(
            root,
            trace_overrides={"request_mode": "decode_only"},
            slo_overrides=decode_slo,
            engine_overrides=engine_overrides,
        )
        result_file = root / "trial-0001-result.json"
        result_file.write_text(json.dumps(trial_result), encoding="utf-8")
        study = load_study_spec(spec_path)
        prior = TrialSummary(
            trial_id="trial-0001",
            status="completed",
            result_path=str(result_file),
        )
        context = build_harness_context(
            study=study,
            window_summary={},
            state=StudyState(study_id=study.study_id, trials=[prior]),
        )
        self.assertEqual(
            context["recent_trial_diagnostics"][0]["active_bottleneck"],
            "decode_tpot",
        )
        active_families = set()
        for knob in context["knob_harnesses"]:
            if knob["active_now"]:
                active_families.add(knob["knob_family"])
        for family in ("data-parallel-size", "max-num-seqs"):
            self.assertIn(family, active_families)
|
|
|
|
def test_load_study_spec_rejects_mismatched_served_model_name(self) -> None:
    """``model.served_model_name`` must agree with ``engine.base_flags["served-model-name"]``."""
    engine_overrides = {
        "base_flags": {
            "host": "127.0.0.1",
            "port": 8000,
            "served-model-name": "engine-name",
        }
    }
    with tempfile.TemporaryDirectory() as scratch:
        spec_path = _write_study_assets(Path(scratch), engine_overrides=engine_overrides)
        spec_data = json.loads(spec_path.read_text(encoding="utf-8"))
        spec_data["model"]["served_model_name"] = "trace-name"
        spec_path.write_text(json.dumps(spec_data), encoding="utf-8")
        with self.assertRaisesRegex(SpecError, "must match engine.base_flags"):
            load_study_spec(spec_path)
|
|
|
|
def test_bailian_endpoint_defaults(self) -> None:
    """The ``bailian`` provider fills in the DashScope base URL and API-key env var."""
    spec = LLMEndpointSpec.from_dict({"provider": "bailian", "model": "qwen-plus"})
    expected = {
        "provider": "bailian",
        "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
        "api_key_env": "DASHSCOPE_API_KEY",
    }
    for attribute, value in expected.items():
        self.assertEqual(getattr(spec, attribute), value)
|
|
|
|
def test_codex_endpoint_resolves_base_url_from_codex_config(self) -> None:
    """The ``codex`` provider reads provider, URL, wire API and effort from ``~/.codex/config.toml``."""
    config_lines = [
        'model_provider = "ipads"',
        'model_reasoning_effort = "high"',
        "",
        "[model_providers.ipads]",
        'base_url = "http://codex.example/v1"',
        'wire_api = "responses"',
    ]
    with tempfile.TemporaryDirectory() as scratch:
        home = Path(scratch)
        codex_home = home / ".codex"
        codex_home.mkdir(parents=True)
        (codex_home / "config.toml").write_text("\n".join(config_lines), encoding="utf-8")
        # HOME is pointed at the temp dir so the spec sees only this config.
        with mock.patch.dict(os.environ, {"HOME": str(home)}, clear=True):
            spec = LLMEndpointSpec.from_dict({"provider": "codex", "model": "gpt-5.4"})
            self.assertEqual(spec.provider, "codex")
            self.assertEqual(spec.base_url, "http://codex.example/v1")
            self.assertEqual(spec.wire_api, "responses")
            self.assertFalse(spec.stream)
            self.assertEqual(spec.reasoning_effort, "high")
            self.assertEqual(spec.api_key_env, "OPENAI_API_KEY")
|
|
|
|
def test_codex_stream_forces_chat_completions_wire_api(self) -> None:
    """Requesting ``stream`` on codex overrides a configured ``responses`` wire API."""
    config_lines = [
        'model_provider = "ipads"',
        "",
        "[model_providers.ipads]",
        'base_url = "http://codex.example/v1"',
        'wire_api = "responses"',
    ]
    with tempfile.TemporaryDirectory() as scratch:
        home = Path(scratch)
        codex_home = home / ".codex"
        codex_home.mkdir(parents=True)
        (codex_home / "config.toml").write_text("\n".join(config_lines), encoding="utf-8")
        with mock.patch.dict(os.environ, {"HOME": str(home)}, clear=True):
            spec = LLMEndpointSpec.from_dict(
                {"provider": "codex", "model": "gpt-5.4", "stream": True}
            )
            self.assertTrue(spec.stream)
            self.assertEqual(spec.wire_api, "chat.completions")
|
|
|
|
def test_endpoint_stream_flag(self) -> None:
    """A custom endpoint keeps an explicit ``stream: true``."""
    raw_endpoint = {
        "provider": "custom",
        "base_url": "http://example/v1",
        "wire_api": "chat.completions",
        "stream": True,
        "model": "x",
        "api_key_env": "OPENAI_API_KEY",
    }
    self.assertTrue(LLMEndpointSpec.from_dict(raw_endpoint).stream)
|
|
|
|
def test_extract_response_text_supports_responses_api_output(self) -> None:
    """Text is pulled from ``output[].content[]`` ``output_text`` items."""
    expected_text = '{"diagnosis":"ok"}'
    message_item = {
        "type": "message",
        "content": [{"type": "output_text", "text": expected_text}],
    }
    self.assertEqual(_extract_response_text({"output": [message_item]}), expected_text)
|
|
|
|
def test_auth_headers_load_bailian_key_from_dotenv(self) -> None:
    """With an empty environment, the DashScope key is read from ``./.env``."""
    with tempfile.TemporaryDirectory() as scratch:
        workdir = Path(scratch)
        (workdir / ".env").write_text('DASHSCOPE_API_KEY="dash-key"\n', encoding="utf-8")
        with mock.patch.dict(os.environ, {}, clear=True), mock.patch(
            "pathlib.Path.cwd", return_value=workdir
        ):
            auth = _auth_headers("DASHSCOPE_API_KEY", "bailian")
            self.assertEqual(auth["Authorization"], "Bearer dash-key")
|
|
|
|
def test_auth_headers_load_codex_auth_and_proxy(self) -> None:
    """Codex auth reads the key from ``auth.json`` and exports proxies from ``[network]``."""
    proxy_url = "http://proxy.example:3128"
    network_config = "\n".join(
        [
            "[network]",
            'http_proxy = "http://proxy.example:3128"',
            'https_proxy = "http://proxy.example:3128"',
        ]
    )
    with tempfile.TemporaryDirectory() as scratch:
        home = Path(scratch)
        codex_home = home / ".codex"
        codex_home.mkdir(parents=True)
        (codex_home / "config.toml").write_text(network_config, encoding="utf-8")
        (codex_home / "auth.json").write_text(
            json.dumps({"OPENAI_API_KEY": "sk-codex-test"}),
            encoding="utf-8",
        )
        # patch.dict restores os.environ on exit, so the proxy checks stay inside.
        with mock.patch.dict(os.environ, {"HOME": str(home)}, clear=True), mock.patch(
            "pathlib.Path.cwd", return_value=home
        ):
            auth = _auth_headers("OPENAI_API_KEY", "codex")
            self.assertEqual(os.environ["http_proxy"], proxy_url)
            self.assertEqual(os.environ["HTTP_PROXY"], proxy_url)
            self.assertEqual(auth["Authorization"], "Bearer sk-codex-test")
|
|
|
|
def test_prompt_includes_failed_trial_context(self) -> None:
    """A failed trial's status, failure reason and env patch all appear in the prompt."""
    launch_error = "engine_process_exited_before_ready exit_code=1"
    with tempfile.TemporaryDirectory() as scratch:
        spec_path = _write_study_assets(Path(scratch))
        study = load_study_spec(spec_path)
        window, requests = load_trace_requests(study, study_spec_path=spec_path)
        failed_trial = TrialSummary(
            trial_id="trial-0001",
            status="failed",
            diagnosis="flashinfer looked promising",
            config_patch={
                "env_patch": {"VLLM_ATTENTION_BACKEND": "FLASHINFER"},
                "flag_patch": {"tensor-parallel-size": 4},
            },
            failure_reason=launch_error,
        )
        rendered = build_prompt(
            study=study,
            window_summary=summarize_window(requests, window),
            state=StudyState(study_id=study.study_id, trials=[failed_trial]),
            capability_profile=None,
        )
        for expected in (
            '"status": "failed"',
            '"failure_reason": "engine_process_exited_before_ready exit_code=1"',
            '"VLLM_ATTENTION_BACKEND": "FLASHINFER"',
            "Known launch failures:",
        ):
            self.assertIn(expected, rendered)
|
|
|
|
def test_prompt_includes_failure_stage_for_launch_failures(self) -> None:
    """Launch-stage failures expose ``failure_stage`` and the implicated flag keys."""
    with tempfile.TemporaryDirectory() as scratch:
        spec_path = _write_study_assets(Path(scratch))
        study = load_study_spec(spec_path)
        window, requests = load_trace_requests(study, study_spec_path=spec_path)
        bad_topology_trial = TrialSummary(
            trial_id="trial-0002",
            status="failed",
            diagnosis="bad topology",
            config_patch={
                "env_patch": {},
                "flag_patch": {"tensor-parallel-size": 3, "data-parallel-size": 3},
            },
            failure_stage="engine_launch",
            failure_reason="engine_process_exited_before_ready exit_code=1",
        )
        rendered = build_prompt(
            study=study,
            window_summary=summarize_window(requests, window),
            state=StudyState(study_id=study.study_id, trials=[bad_topology_trial]),
            capability_profile=None,
        )
        self.assertIn('"failure_stage": "engine_launch"', rendered)
        self.assertIn('"implicated_flag_keys"', rendered)
|
|
|
|
def test_prompt_prioritizes_parallel_space_when_tp_dp_ep_are_tunable(self) -> None:
    """With TP, DP and EP all tunable, the prompt leads with legal parallel-topology candidates."""
    base_flags = {
        "host": "127.0.0.1",
        "port": 8000,
        "enable-expert-parallel": True,
        "tensor-parallel-size": 4,
        "data-parallel-size": 2,
        "expert-parallel-size": 8,
    }
    topology = {
        "require_tp_dp_product_equals_gpu_count": True,
        "require_ep_size_leq_tp_dp_product": True,
        "require_ep_size_divides_tp_dp_product": True,
        "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
        "allowed_data_parallel_sizes": [1, 2, 4, 8],
        "allowed_expert_parallel_sizes": [1, 2, 4, 8],
    }
    engine_overrides = {
        "base_flags": base_flags,
        "tunable_envs": [],
        "tunable_flags": [
            "tensor-parallel-size",
            "data-parallel-size",
            "expert-parallel-size",
            "max-num-seqs",
        ],
        "topology_constraints": topology,
    }
    with tempfile.TemporaryDirectory() as scratch:
        spec_path = _write_study_assets(Path(scratch), engine_overrides=engine_overrides)
        study = load_study_spec(spec_path)
        window, requests = load_trace_requests(study, study_spec_path=spec_path)
        rendered = build_prompt(
            study=study,
            window_summary=summarize_window(requests, window),
            state=StudyState(study_id=study.study_id),
            capability_profile=None,
        )
        for expected in (
            "Prioritize exploring legal topology changes in parallel space",
            "Parallel space candidates:",
            '"tensor_parallel_size": 2',
        ):
            self.assertIn(expected, rendered)
|
|
|
|
def test_parse_proposal_text_repairs_truncated_json(self) -> None:
    """A proposal reply whose top-level JSON object is never closed still parses."""
    # The outer object is deliberately left without its closing brace.
    truncated_reply = """
    {
      "observation": "obs",
      "diagnosis": "diag",
      "config_patch": {
        "env_patch": {},
        "flag_patch": {
          "max-num-seqs": 24
        }
      },
      "expected_effects": [
        "faster batching"
      ],
      "why_not_previous_failures": "none"
    """
    with tempfile.TemporaryDirectory() as tmp:
        study = load_study_spec(_write_study_assets(Path(tmp)))
        parsed = parse_proposal_text(truncated_reply, study)

    self.assertEqual(parsed.diagnosis, "diag")
    self.assertEqual(parsed.config_patch.flag_patch["max-num-seqs"], 24)
|
|
|
|
def test_length_only_trace_rows_are_synthesized(self) -> None:
    """A trace row with only lengths (no messages) gets a synthetic prompt.

    The synthetic prompt is capped at ``synthetic_prompt_cap_tokens`` (8 here,
    although ``input_length`` is 32). Both min and max output tokens are pinned
    to ``output_length``.
    """
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        windows_dir = root / "trace_windows"
        traces_dir = windows_dir / "traces"
        traces_dir.mkdir(parents=True)

        length_only_row = {
            "timestamp": 0.0,
            "sampling_u": 0.1,
            "input_length": 32,
            "output_length": 16,
        }
        (traces_dir / "chat_len_only.jsonl").write_text(
            json.dumps(length_only_row) + "\n", encoding="utf-8"
        )

        windows_file = windows_dir / "windows.json"
        windows_doc = {
            "windows": [
                {
                    "window_id": "w1",
                    "trace_type": "chat",
                    "trace_file": "traces/chat_len_only.jsonl",
                    "window_start": 0.0,
                    "window_end": 10.0,
                }
            ]
        }
        windows_file.write_text(json.dumps(windows_doc), encoding="utf-8")

        spec_doc = {
            "study_id": "study-len-only",
            "hardware": {"gpu_count": 1},
            "model": {"model_id": "m1", "served_model_name": "dummy-model"},
            "engine": {
                "engine_name": "vllm",
                "exec_path": "/usr/local/bin/vllm",
                "host": "127.0.0.1",
                "port": 8000,
                "ready_timeout_s": 10,
                "request_timeout_s": 10,
                "healthcheck_path": "/v1/models",
                "launch_args": [],
                "base_envs": {},
                "base_flags": {},
                "tunable_envs": [],
                "tunable_flags": [],
            },
            "trace": {
                "windows_path": str(windows_file),
                "window_id": "w1",
                "max_concurrency": 1,
                "synthetic_prompt_cap_tokens": 8,
            },
            "slo": {"target_pass_rate": 0.95},
            "search": {"low": 0.0, "high": 1.0, "tolerance": 0.1, "max_probes": 2, "sample_seed": 1},
            "llm": {"system_prompt": "", "max_history_trials": 1},
        }
        spec_file = root / "study.json"
        spec_file.write_text(json.dumps(spec_doc), encoding="utf-8")

        study = load_study_spec(spec_file)
        _, requests = load_trace_requests(study, study_spec_path=spec_file)

    self.assertEqual(len(requests), 1)
    body = requests[0].body
    self.assertEqual(body["messages"][0]["content"].count("token"), 8)
    self.assertEqual(body["min_tokens"], 16)
    self.assertEqual(body["max_tokens"], 16)
|
|
|
|
def test_slo_evaluation_step_and_fixed_rules(self) -> None:
    """Under the default study SLO, a 1000 ms TTFT request passes and a 6000 ms one fails."""

    def successful_outcome(request_id, ttft_ms, prompt_tokens):
        # Only TTFT and prompt size differ between the two requests.
        return RequestOutcome(
            request_id=request_id,
            success=True,
            ttft_ms=ttft_ms,
            tpot_ms=100,
            prompt_tokens=prompt_tokens,
            completion_tokens=16,
        )

    with tempfile.TemporaryDirectory() as tmp:
        study = load_study_spec(_write_study_assets(Path(tmp)))
        outcomes = [
            successful_outcome("r1", 1000, 1000),
            successful_outcome("r2", 6000, 5000),
        ]
        evaluations, summary = summarize_evaluations(outcomes, study.slo)

    self.assertTrue(evaluations[0].passed)
    self.assertFalse(evaluations[1].passed)
    self.assertEqual(summary["slo_pass_rate"], 0.5)
|
|
|
|
def test_trace_completion_tokens_override_forces_min_and_max_tokens(self) -> None:
    """``completion_tokens_override`` pins each request's hint and min/max tokens to the override."""
    with tempfile.TemporaryDirectory() as tmp:
        spec_file = _write_study_assets(
            Path(tmp), trace_overrides={"completion_tokens_override": 1}
        )
        study = load_study_spec(spec_file)
        _, requests = load_trace_requests(study, study_spec_path=spec_file)

    self.assertEqual(len(requests), 3)
    for request in requests:
        self.assertEqual(request.completion_tokens_hint, 1)
    for index in (0, 2):
        for token_key in ("min_tokens", "max_tokens"):
            self.assertEqual(requests[index].body[token_key], 1)
|
|
|
|
def test_build_prompt_mentions_completion_tokens_override(self) -> None:
    """The LLM prompt shows the override setting and the resulting min/max token pin."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        spec_file = _write_study_assets(
            root,
            trace_overrides={"completion_tokens_override": 1},
            slo_overrides={"tpot_rule": None},
        )
        study = load_study_spec(spec_file)
        store = StudyStore(root / ".aituner")
        store.init_study(spec_path=spec_file, study=study)
        current_state = store.load_state(study.study_id)
        window, requests = load_trace_requests(study, study_spec_path=spec_file)
        window_summary = summarize_window(requests, window)
        prompt = build_prompt(
            study=study,
            window_summary=window_summary,
            state=current_state,
            capability_profile=None,
        )

    self.assertIn('"completion_tokens_override": 1', prompt)
    self.assertIn("min_tokens=max_tokens=1", prompt)
|
|
|
|
def test_slo_evaluation_supports_tpot_only_95_percent_target(self) -> None:
    """With no TTFT rule and a fixed 20 ms TPOT rule, TPOT alone decides pass/fail.

    A 50% pass rate must be reported as infeasible.
    """
    tpot_only_slo = {
        "ttft_rule": None,
        "tpot_rule": {"kind": "fixed_ms", "threshold_ms": 20},
    }
    with tempfile.TemporaryDirectory() as tmp:
        study = load_study_spec(
            _write_study_assets(Path(tmp), slo_overrides=tpot_only_slo)
        )
        # r1 has a low TTFT-irrelevant TPOT of 10 ms; r2 is just over the 20 ms threshold.
        fast = RequestOutcome(
            request_id="r1",
            success=True,
            ttft_ms=3000,
            tpot_ms=10,
            prompt_tokens=1000,
            completion_tokens=16,
        )
        slow = RequestOutcome(
            request_id="r2",
            success=True,
            ttft_ms=9000,
            tpot_ms=21,
            prompt_tokens=5000,
            completion_tokens=16,
        )
        evaluations, summary = summarize_evaluations([fast, slow], study.slo)

    self.assertEqual([evaluation.passed for evaluation in evaluations], [True, False])
    self.assertEqual(summary["slo_pass_rate"], 0.5)
    self.assertFalse(summary["feasible"])
|
|
|
|
def test_build_launch_recipe_serializes_list_flags_once(self) -> None:
    """A list-valued flag becomes a single ``--flag`` followed by each element as a string."""
    with tempfile.TemporaryDirectory() as tmp:
        study = load_study_spec(_write_study_assets(Path(tmp)))
        patch = ConfigPatch(flag_patch={"cuda-graph-sizes": [1, 2, 4]})
        argv = build_launch_recipe(study.engine, patch).argv

    flag = "--cuda-graph-sizes"
    self.assertIn(flag, argv)
    first_value = argv.index(flag) + 1
    self.assertEqual(argv[first_value : first_value + 3], ["1", "2", "4"])
    self.assertEqual(argv.count(flag), 1)
|
|
|
|
def test_prepare_trace_windows_materializes_repo_local_assets(self) -> None:
    """Run ``scripts/prepare_trace_windows.py`` end to end on a tiny legacy chat trace.

    Setup:
    - An empty ``.jsonl``/``_prompt.jsonl`` pair is created for each listed
      legacy interval.
    - The ``031109-031111`` pair is then given two rows.

    Expected result in window ``chat_w20260311_1000``: only the ``c2`` row
    (timestamp 3605.0). Its timestamp becomes 5.0 and its prompt comes from the
    matching ``_prompt.jsonl`` row.

    The script runs under ``sys.executable``, not a hard-coded ``python3``, so
    it uses the same interpreter and virtualenv as the test suite. Its stderr
    is reported if it fails.
    """
    import sys  # local import: the module header does not import sys

    legacy_stems = [
        "qwen_chat_blksz_64_031109-031111",
        "qwen_chat_blksz_64_031121-031123",
        "qwen_chat_blksz_64_031209-031211",
        "qwen_chat_blksz_64_031221-031223",
        "qwen_chat_blksz_64_031309-031311",
        "qwen_chat_blksz_64_031321-031323",
        "qwen_chat_blksz_64_031409-031411",
        "qwen_chat_blksz_64_031421-031423",
        "qwen_chat_blksz_64_031509-031511",
        "qwen_chat_blksz_64_031521-031523",
        "qwen_chat_blksz_64_031609-031611",
        "qwen_chat_blksz_64_031621-031623",
        "qwen_chat_blksz_64_031709-031711",
        "qwen_chat_blksz_64_031721-031723",
    ]
    with tempfile.TemporaryDirectory() as tmp:
        tmp_path = Path(tmp)
        legacy_source = tmp_path / "legacy"
        thinking_source = tmp_path / "thinking"
        legacy_source.mkdir()
        thinking_source.mkdir()

        for stem in legacy_stems:
            for suffix in (".jsonl", "_prompt.jsonl"):
                (legacy_source / f"{stem}{suffix}").write_text("", encoding="utf-8")

        peak_trace = legacy_source / "qwen_chat_blksz_64_031109-031111.jsonl"
        peak_prompt = legacy_source / "qwen_chat_blksz_64_031109-031111_prompt.jsonl"
        trace_rows = [
            {"chat_id": "c1", "turn": 1, "timestamp": 3599.0, "input_length": 10, "output_length": 3},
            {"chat_id": "c2", "turn": 2, "timestamp": 3605.0, "input_length": 20, "output_length": 7},
        ]
        prompt_rows = [
            {"chat_id": "c1", "turn": 1, "prompt": "ignore me"},
            {"chat_id": "c2", "turn": 2, "prompt": "real prompt"},
        ]
        peak_trace.write_text(
            "\n".join(json.dumps(row) for row in trace_rows) + "\n", encoding="utf-8"
        )
        peak_prompt.write_text(
            "\n".join(json.dumps(row) for row in prompt_rows) + "\n", encoding="utf-8"
        )

        output_root = tmp_path / "trace_windows"
        proc = subprocess.run(
            [
                sys.executable,
                "scripts/prepare_trace_windows.py",
                "--legacy-source",
                str(legacy_source),
                "--thinking-source",
                str(thinking_source),
                "--output-root",
                str(output_root),
                "--workloads",
                "chat",
                "--overwrite",
            ],
            cwd=str(REPO_ROOT),
            capture_output=True,
            text=True,
        )
        # Fail with the script's own diagnostics rather than a bare CalledProcessError.
        self.assertEqual(proc.returncode, 0, msg=proc.stderr)

        windows_payload = json.loads((output_root / "windows.json").read_text(encoding="utf-8"))
        windows = {item["window_id"]: item for item in windows_payload["windows"]}
        self.assertIn("chat_w20260311_1000", windows)
        peak_window = windows["chat_w20260311_1000"]
        self.assertEqual(peak_window["num_requests"], 1)

        trace_path = output_root / peak_window["trace_file"]
        rows = [json.loads(line) for line in trace_path.read_text(encoding="utf-8").splitlines()]
        self.assertEqual(len(rows), 1)
        self.assertEqual(rows[0]["prompt"], "real prompt")
        self.assertEqual(rows[0]["timestamp"], 5.0)
        self.assertEqual(rows[0]["output_length"], 7)
        self.assertIsInstance(rows[0]["sampling_u"], float)
|
|
|
|
def test_prepare_trace_windows_preserves_existing_files_on_failure(self) -> None:
    """A failing ``prepare_trace_windows.py`` run must not clobber existing outputs.

    Only two legacy chat intervals are provided; each ``.jsonl`` and
    ``_prompt.jsonl`` file gets the same single row. The test expects the
    script to exit non-zero. A pre-existing trace file must survive unchanged,
    and no ``*.tmp.*`` staging files may be left in ``traces/``.

    The script runs under ``sys.executable``, not a hard-coded ``python3``.
    With a different interpreter that lacks the project's dependencies, the
    script would also exit non-zero. This test would then pass for the wrong
    reason.
    """
    import sys  # local import: the module header does not import sys

    with tempfile.TemporaryDirectory() as tmp:
        tmp_path = Path(tmp)
        legacy_source = tmp_path / "legacy"
        thinking_source = tmp_path / "thinking"
        output_root = tmp_path / "trace_windows"
        traces_dir = output_root / "traces"
        legacy_source.mkdir()
        thinking_source.mkdir()
        traces_dir.mkdir(parents=True)

        row_text = (
            json.dumps(
                {
                    "chat_id": "c1",
                    "turn": 1,
                    "timestamp": 3605.0,
                    "input_length": 20,
                    "output_length": 7,
                    "prompt": "prompt",
                }
            )
            + "\n"
        )
        for stem in ("qwen_chat_blksz_64_031109-031111", "qwen_chat_blksz_64_031121-031123"):
            for suffix in (".jsonl", "_prompt.jsonl"):
                (legacy_source / f"{stem}{suffix}").write_text(row_text, encoding="utf-8")

        sentinel = traces_dir / "chat_w20260311_1000.jsonl"
        sentinel.write_text("sentinel\n", encoding="utf-8")

        proc = subprocess.run(
            [
                sys.executable,
                "scripts/prepare_trace_windows.py",
                "--legacy-source",
                str(legacy_source),
                "--thinking-source",
                str(thinking_source),
                "--output-root",
                str(output_root),
                "--workloads",
                "chat",
                "--overwrite",
            ],
            cwd=str(REPO_ROOT),
            capture_output=True,
            text=True,
        )

        self.assertNotEqual(proc.returncode, 0)
        self.assertEqual(sentinel.read_text(encoding="utf-8"), "sentinel\n")
        self.assertEqual(sorted(path.name for path in traces_dir.glob("*.tmp.*")), [])
|
|
|
|
def test_binary_search_max_feasible(self) -> None:
    """Against a step function feasible up to 0.625, the search ends within [0.5, 0.625]."""
    cutoff = 0.625

    def probe(threshold: float) -> ThresholdProbe:
        return ThresholdProbe(
            threshold=threshold,
            feasible=threshold <= cutoff,
            payload={"threshold": threshold},
        )

    outcome = binary_search_max_feasible(
        low=0.0,
        high=1.0,
        tolerance=0.01,
        max_probes=8,
        evaluator=probe,
    )
    self.assertLessEqual(outcome.best_threshold, cutoff)
    self.assertGreaterEqual(outcome.best_threshold, 0.5)
    self.assertIsNotNone(outcome.best_feasible_payload)
|
|
|
|
def test_binary_search_continues_below_tolerance_when_all_infeasible(self) -> None:
    """When nothing is feasible, the search keeps halving toward ``low`` until ``max_probes``.

    It does not stop once the interval drops under ``tolerance``.
    """
    probed_thresholds = []

    def always_infeasible(threshold: float) -> ThresholdProbe:
        probed_thresholds.append(threshold)
        return ThresholdProbe(
            threshold=threshold,
            feasible=False,
            payload={"threshold": threshold},
        )

    result = binary_search_max_feasible(
        low=0.0,
        high=1.0,
        tolerance=0.1,
        max_probes=6,
        evaluator=always_infeasible,
    )

    self.assertIsNone(result.best_feasible_payload)
    self.assertEqual(len(result.probes), 6)
    # Successive midpoints 0.5, 0.25, ..., 0.015625 (exact powers of two).
    expected_thresholds = [0.5 / 2**step for step in range(6)]
    self.assertEqual(probed_thresholds, expected_thresholds)
|
|
|
|
def test_trace_max_requests_uses_window_wide_downsample(self) -> None:
    """``max_requests_per_probe`` thins the whole window instead of taking a prefix."""
    trace_rows = [
        {
            "request_id": f"r{idx}",
            "timestamp": float(idx),
            "sampling_u": idx / 10.0,
            "messages": [{"role": "user", "content": f"hello-{idx}"}],
            "input_length": 10 + idx,
            "output_length": 5,
        }
        for idx in range(10)
    ]
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        windows_dir = root / "trace_windows"
        traces_dir = windows_dir / "traces"
        traces_dir.mkdir(parents=True)
        (traces_dir / "chat_many.jsonl").write_text(
            "".join(json.dumps(row) + "\n" for row in trace_rows), encoding="utf-8"
        )

        windows_file = windows_dir / "windows.json"
        windows_file.write_text(
            json.dumps(
                {
                    "windows": [
                        {
                            "window_id": "w1",
                            "trace_type": "chat",
                            "trace_file": "traces/chat_many.jsonl",
                            "window_start": 0.0,
                            "window_end": 10.0,
                        }
                    ]
                }
            ),
            encoding="utf-8",
        )

        spec_doc = {
            "study_id": "study-downsample",
            "hardware": {"gpu_count": 1},
            "model": {"model_id": "m1", "served_model_name": "dummy-model"},
            "engine": {
                "engine_name": "vllm",
                "exec_path": "/usr/local/bin/vllm",
                "host": "127.0.0.1",
                "port": 8000,
                "ready_timeout_s": 10,
                "request_timeout_s": 10,
                "healthcheck_path": "/v1/models",
                "launch_args": [],
                "base_envs": {},
                "base_flags": {},
                "tunable_envs": [],
                "tunable_flags": [],
            },
            "trace": {
                "windows_path": str(windows_file),
                "window_id": "w1",
                "max_concurrency": 1,
                "max_requests_per_probe": 4,
            },
            "slo": {"target_pass_rate": 0.95},
            "search": {"low": 0.0, "high": 1.0, "tolerance": 0.1, "max_probes": 2, "sample_seed": 1},
            "llm": {"system_prompt": "", "max_history_trials": 1},
        }
        spec_file = root / "study.json"
        spec_file.write_text(json.dumps(spec_doc), encoding="utf-8")

        study = load_study_spec(spec_file)
        _, requests = load_trace_requests(study, study_spec_path=spec_file)

    kept_ids = [request.row_id for request in requests]
    self.assertEqual(kept_ids, ["r0", "r2", "r5", "r7"])
|
|
|
|
def test_trace_replay_time_scale_scales_arrivals_and_window(self) -> None:
    """``replay_time_scale`` multiplies both the window end and request arrival times.

    With a scale of 0.1:
    - a 100 s window is expected to end at 10 s;
    - a request at t=10 s is expected to arrive at 1 s.

    Both results are float products of 0.1, which is not exactly representable.
    They are therefore compared with ``assertAlmostEqual`` rather than exact
    equality.
    """
    with tempfile.TemporaryDirectory() as tmp:
        tmp_path = Path(tmp)
        windows_dir = tmp_path / "trace_windows"
        trace_dir = windows_dir / "traces"
        trace_dir.mkdir(parents=True)
        (trace_dir / "chat_scale.jsonl").write_text(
            json.dumps(
                {
                    "request_id": "r1",
                    "timestamp": 10.0,
                    "sampling_u": 0.25,
                    "messages": [{"role": "user", "content": "hello"}],
                    "input_length": 16,
                    "output_length": 4,
                }
            )
            + "\n",
            encoding="utf-8",
        )

        windows_path = windows_dir / "windows.json"
        windows_path.write_text(
            json.dumps(
                {
                    "windows": [
                        {
                            "window_id": "w1",
                            "trace_type": "chat",
                            "trace_file": "traces/chat_scale.jsonl",
                            "window_start": 0.0,
                            "window_end": 100.0,
                        }
                    ]
                }
            ),
            encoding="utf-8",
        )

        study_path = tmp_path / "study.json"
        study_path.write_text(
            json.dumps(
                {
                    "study_id": "study-scale",
                    "hardware": {"gpu_count": 1},
                    "model": {"model_id": "m1", "served_model_name": "dummy-model"},
                    "engine": {
                        "engine_name": "vllm",
                        "exec_path": "/usr/local/bin/vllm",
                        "host": "127.0.0.1",
                        "port": 8000,
                        "ready_timeout_s": 10,
                        "request_timeout_s": 10,
                        "healthcheck_path": "/v1/models",
                        "launch_args": [],
                        "base_envs": {},
                        "base_flags": {},
                        "tunable_envs": [],
                        "tunable_flags": [],
                    },
                    "trace": {
                        "windows_path": str(windows_path),
                        "window_id": "w1",
                        "max_concurrency": 1,
                        "replay_time_scale": 0.1,
                    },
                    "slo": {"target_pass_rate": 0.95},
                    "search": {"low": 0.0, "high": 1.0, "tolerance": 0.1, "max_probes": 2, "sample_seed": 1},
                    "llm": {"system_prompt": "", "max_history_trials": 1},
                }
            ),
            encoding="utf-8",
        )

        study = load_study_spec(study_path)
        window, requests = load_trace_requests(study, study_spec_path=study_path)

    self.assertAlmostEqual(window.window_end, 10.0)
    self.assertAlmostEqual(requests[0].arrival_s, 1.0)
|
|
|
|
def test_proposal_validation_and_job_emission(self) -> None:
    """A parsed LLM proposal becomes a trial, and that trial's job is appended to ``jobs.toml``."""
    llm_reply = {
        "observation": "Current TTFT fails before TPOT.",
        "diagnosis": "Prefill pressure dominates.",
        "config_patch": {
            "env_patch": {"VLLM_ATTENTION_BACKEND": "FLASHINFER"},
            "flag_patch": {"tensor-parallel-size": 4, "max-num-seqs": 64},
        },
        "expected_effects": ["lower TTFT", "raise feasible sampling_u"],
        "why_not_previous_failures": "Avoids changing unsupported envs.",
    }
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        spec_file = _write_study_assets(root)
        study = load_study_spec(spec_file)
        store = StudyStore(root / ".aituner" / "studies")
        store.init_study(spec_path=spec_file, study=study)
        state = store.load_state(study.study_id)

        proposal = parse_proposal_text(json.dumps(llm_reply), study)
        trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)

        jobs_file = root / "jobs.toml"
        append_job(jobs_file, build_trial_job(study=study, trial=trial, repo_root=root))
        rendered = jobs_file.read_text(encoding="utf-8")

    for snippet in (
        'name = "study-1-trial-0001"',
        'command = "python3 -m aituner.cli worker run-trial',
        'PYTHONPATH = "src"',
    ):
        self.assertIn(snippet, rendered)
|
|
|
|
def test_ingest_trial_results_updates_best(self) -> None:
    """Ingesting a completed result makes the trial the incumbent and derives its per-GPU rate."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        spec_file = _write_study_assets(root)
        study = load_study_spec(spec_file)
        store = StudyStore(root / ".aituner" / "studies")
        store.init_study(spec_path=spec_file, study=study)
        initial_state = store.load_state(study.study_id)
        proposal = Proposal.from_dict(
            {
                "observation": "Obs",
                "diagnosis": "Diag",
                "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
                "expected_effects": ["raise rate"],
            }
        )
        trial, _ = store.materialize_trial(study=study, state=initial_state, proposal=proposal)

        completed_result = {
            "study_id": study.study_id,
            "trial_id": trial.trial_id,
            "status": "completed",
            "best_sampling_u": 0.75,
            "best_request_rate": 12.5,
            "best_pass_rate": 0.97,
        }
        Path(trial.result_path).write_text(json.dumps(completed_result), encoding="utf-8")
        ingested = store.ingest_trial_results(study.study_id)

    self.assertEqual(ingested.best_trial_id, trial.trial_id)
    self.assertEqual(ingested.best_sampling_u, 0.75)
    self.assertEqual(ingested.best_request_rate, 12.5)
    self.assertEqual(ingested.best_parallel_size, 4)
    # 12.5 req/s at parallel size 4 -> 3.125 req/s per GPU.
    self.assertEqual(ingested.best_request_rate_per_gpu, 3.125)
    self.assertEqual(
        ingested.best_by_parallel_size["4"]["best_request_rate_per_gpu"],
        3.125,
    )
|
|
|
|
def test_materialize_trial_uses_incumbent_sampling_u_as_search_floor(self) -> None:
    """A trial in the incumbent's parallel group (size 4) searches from the incumbent's sampling_u.

    The upper bound stays at 1.0.
    """
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        spec_file = _write_study_assets(root)
        study = load_study_spec(spec_file)
        store = StudyStore(root / ".aituner" / "studies")
        store.init_study(spec_path=spec_file, study=study)

        group_four_best = {
            "trial_id": "trial-0001",
            "parallel_size": 4,
            "best_sampling_u": 0.375,
            "best_request_rate": 3.0,
            "best_request_rate_per_gpu": 0.75,
        }
        incumbent_state = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0001",
            best_parallel_size=4,
            best_sampling_u=0.375,
            best_request_rate=3.0,
            best_request_rate_per_gpu=0.75,
            next_trial_index=2,
            best_by_parallel_size={"4": group_four_best},
            trials=[],
        )
        same_group_proposal = Proposal.from_dict(
            {
                "observation": "Obs",
                "diagnosis": "Diag",
                "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
                "expected_effects": ["raise rate"],
            }
        )
        trial, _ = store.materialize_trial(
            study=study, state=incumbent_state, proposal=same_group_proposal
        )

    self.assertEqual(trial.search.low, 0.375)
    self.assertEqual(trial.search.high, 1.0)
|
|
|
|
def test_materialize_trial_uses_same_parallel_group_incumbent(self) -> None:
    """The search floor comes from the best trial of the proposal's own parallel group (size 2).

    The global incumbent at size 4 is not used.
    """
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        spec_file = _write_study_assets(root)
        study = load_study_spec(spec_file)
        store = StudyStore(root / ".aituner" / "studies")
        store.init_study(spec_path=spec_file, study=study)

        group_two_best = {
            "trial_id": "trial-0000",
            "parallel_size": 2,
            "best_sampling_u": 0.125,
            "best_request_rate": 0.8,
            "best_request_rate_per_gpu": 0.4,
        }
        state_with_groups = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0001",
            best_parallel_size=4,
            best_sampling_u=0.375,
            best_request_rate=3.0,
            best_request_rate_per_gpu=0.75,
            next_trial_index=2,
            best_by_parallel_size={"2": group_two_best},
            trials=[],
        )
        tp2_proposal = Proposal.from_dict(
            {
                "observation": "Obs",
                "diagnosis": "Diag",
                "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 2}},
                "expected_effects": ["raise rate"],
            }
        )
        trial, _ = store.materialize_trial(
            study=study, state=state_with_groups, proposal=tp2_proposal
        )

    self.assertEqual(trial.search.low, 0.125)
|
|
|
|
def test_materialize_trial_resets_search_floor_for_new_parallel_group(self) -> None:
    """With no recorded best for parallel size 2, the floor falls back to the spec's ``search.low``."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        spec_file = _write_study_assets(root)
        study = load_study_spec(spec_file)
        store = StudyStore(root / ".aituner" / "studies")
        store.init_study(spec_path=spec_file, study=study)

        # The only incumbent is at parallel size 4; nothing is known about size 2.
        size_four_only = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0001",
            best_parallel_size=4,
            best_sampling_u=0.4,
            best_request_rate=3.0,
            best_request_rate_per_gpu=0.75,
            next_trial_index=2,
            trials=[],
        )
        new_group_proposal = Proposal.from_dict(
            {
                "observation": "Obs",
                "diagnosis": "Diag",
                "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 2}},
                "expected_effects": ["raise rate"],
            }
        )
        trial, _ = store.materialize_trial(
            study=study, state=size_four_only, proposal=new_group_proposal
        )

        self.assertEqual(trial.search.low, study.search.low)
|
|
|
|
def test_materialize_trial_inherits_incumbent_topology_for_runtime_patch(self) -> None:
    """A runtime-only proposal inherits the incumbent trial's TP/DP layout.

    The incumbent (trial-0002) ran with TP=2/DP=4/EP=8. A proposal that only
    sets ``max-num-seqs`` is expected to:
    - materialize as TP=2/DP=4 plus the new value (``expert-parallel-size`` is
      expected not to be carried over);
    - start its search at the incumbent's sampling_u;
    - be recorded in state the same way.
    """
    engine_overrides = {
        "base_flags": {
            "host": "127.0.0.1",
            "port": 8000,
            "enable-expert-parallel": True,
            "tensor-parallel-size": 4,
            "data-parallel-size": 2,
            "expert-parallel-size": 8,
        },
        "tunable_flags": [
            "tensor-parallel-size",
            "data-parallel-size",
            "expert-parallel-size",
            "max-num-seqs",
        ],
        "topology_constraints": {
            "require_tp_dp_product_equals_gpu_count": True,
            "require_ep_size_leq_tp_dp_product": True,
            "require_ep_size_divides_tp_dp_product": True,
            "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
            "allowed_data_parallel_sizes": [1, 2, 4, 8],
            "allowed_expert_parallel_sizes": [1, 2, 4, 8],
        },
    }
    expected_flags = {
        "tensor-parallel-size": 2,
        "data-parallel-size": 4,
        "max-num-seqs": 160,
    }
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        spec_file = _write_study_assets(root, engine_overrides=engine_overrides)
        study = load_study_spec(spec_file)
        store = StudyStore(root / ".aituner" / "studies")
        store.init_study(spec_path=spec_file, study=study)

        incumbent_summary = TrialSummary(
            trial_id="trial-0002",
            status="completed",
            parallel_size=8,
            best_sampling_u=0.125,
            best_request_rate=3.0,
            best_request_rate_per_gpu=0.375,
            config_patch={
                "env_patch": {},
                "flag_patch": {
                    "tensor-parallel-size": 2,
                    "data-parallel-size": 4,
                    "expert-parallel-size": 8,
                },
            },
        )
        incumbent_state = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0002",
            best_parallel_size=8,
            best_sampling_u=0.125,
            best_request_rate=3.0,
            best_request_rate_per_gpu=0.375,
            next_trial_index=3,
            best_by_parallel_size={
                "8": {
                    "trial_id": "trial-0002",
                    "parallel_size": 8,
                    "best_sampling_u": 0.125,
                    "best_request_rate": 3.0,
                    "best_request_rate_per_gpu": 0.375,
                }
            },
            trials=[incumbent_summary],
        )
        runtime_only = Proposal.from_dict(
            {
                "observation": "Validate runtime headroom around the incumbent.",
                "diagnosis": "Try lower concurrency on the current best topology.",
                "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 160}},
                "expected_effects": ["validate incumbent runtime headroom"],
            }
        )

        trial, next_state = store.materialize_trial(
            study=study, state=incumbent_state, proposal=runtime_only
        )

    self.assertEqual(trial.config_patch.flag_patch, expected_flags)
    self.assertEqual(trial.search.low, 0.125)
    self.assertEqual(next_state.trials[-1].config_patch["flag_patch"], expected_flags)
|
|
|
|
def test_materialize_trial_keeps_explicit_topology_runtime_patch(self) -> None:
    """An explicitly stated topology wins over the incumbent's.

    The proposal names TP=4/DP=2 while the incumbent ran TP=2/DP=4, so the
    proposal's values must not be overwritten by the incumbent's.
    """
    engine_overrides = {
        "base_flags": {
            "host": "127.0.0.1",
            "port": 8000,
            "enable-expert-parallel": True,
            "tensor-parallel-size": 4,
            "data-parallel-size": 2,
            "expert-parallel-size": 8,
        },
        "tunable_flags": [
            "tensor-parallel-size",
            "data-parallel-size",
            "expert-parallel-size",
            "max-num-seqs",
        ],
        "topology_constraints": {
            "require_tp_dp_product_equals_gpu_count": True,
            "require_ep_size_leq_tp_dp_product": True,
            "require_ep_size_divides_tp_dp_product": True,
            "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
            "allowed_data_parallel_sizes": [1, 2, 4, 8],
            "allowed_expert_parallel_sizes": [1, 2, 4, 8],
        },
    }
    explicit_flags = {
        "tensor-parallel-size": 4,
        "data-parallel-size": 2,
        "max-num-seqs": 160,
    }
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        spec_file = _write_study_assets(root, engine_overrides=engine_overrides)
        study = load_study_spec(spec_file)
        store = StudyStore(root / ".aituner" / "studies")
        store.init_study(spec_path=spec_file, study=study)

        previous_best = TrialSummary(
            trial_id="trial-0002",
            status="completed",
            config_patch={
                "env_patch": {},
                "flag_patch": {
                    "tensor-parallel-size": 2,
                    "data-parallel-size": 4,
                },
            },
        )
        state = StudyState(
            study_id=study.study_id,
            best_trial_id="trial-0002",
            next_trial_index=3,
            trials=[previous_best],
        )
        # Hand the proposal its own copy so the expectation below stays independent.
        proposal = Proposal.from_dict(
            {
                "observation": "Validate base topology runtime.",
                "diagnosis": "Explicitly keep base topology and adjust concurrency.",
                "config_patch": {"env_patch": {}, "flag_patch": dict(explicit_flags)},
                "expected_effects": ["test base topology runtime headroom"],
            }
        )

        trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)

    self.assertEqual(trial.config_patch.flag_patch, explicit_flags)
|
|
|
|
def test_ingest_trial_results_records_failure_reason(self) -> None:
    """A failed result's ``failure_reason`` is copied into state.

    When the result carries no ``failure_stage``, the stage is recorded as "".
    """
    reason = "engine_process_exited_before_ready exit_code=1"
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        spec_file = _write_study_assets(root)
        study = load_study_spec(spec_file)
        store = StudyStore(root / ".aituner" / "studies")
        store.init_study(spec_path=spec_file, study=study)
        state = store.load_state(study.study_id)
        proposal = Proposal.from_dict(
            {
                "observation": "Obs",
                "diagnosis": "Diag",
                "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
                "expected_effects": ["raise rate"],
            }
        )
        trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)

        failed_result = {
            "study_id": study.study_id,
            "trial_id": trial.trial_id,
            "status": "failed",
            "failure_reason": reason,
            "probes": [],
        }
        Path(trial.result_path).write_text(json.dumps(failed_result), encoding="utf-8")
        ingested = store.ingest_trial_results(study.study_id)

    recorded = ingested.trials[0]
    self.assertEqual(recorded.status, "failed")
    self.assertEqual(recorded.failure_reason, reason)
    self.assertEqual(recorded.failure_stage, "")
|
|
|
|
def test_ingest_trial_results_records_failure_stage(self) -> None:
    """A failed result's ``failure_stage`` is copied into the ingested trial summary."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        spec_file = _write_study_assets(root)
        study = load_study_spec(spec_file)
        store = StudyStore(root / ".aituner" / "studies")
        store.init_study(spec_path=spec_file, study=study)
        state = store.load_state(study.study_id)
        proposal = Proposal.from_dict(
            {
                "observation": "Obs",
                "diagnosis": "Diag",
                "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
                "expected_effects": ["raise rate"],
            }
        )
        trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)

        failed_result = {
            "study_id": study.study_id,
            "trial_id": trial.trial_id,
            "status": "failed",
            "failure_stage": "engine_launch",
            "failure_reason": "engine_process_exited_before_ready exit_code=1",
            "probes": [],
        }
        Path(trial.result_path).write_text(json.dumps(failed_result), encoding="utf-8")
        ingested = store.ingest_trial_results(study.study_id)

    self.assertEqual(ingested.trials[0].failure_stage, "engine_launch")
|
|
|
|
def test_ingest_trial_results_prefers_higher_request_rate_per_gpu(self) -> None:
    """The incumbent is chosen by request rate per GPU, not by raw request rate.

    TP=4 at 4.0 req/s (1.0 per GPU) loses to TP=2 at 3.0 req/s (1.5 per GPU).
    Both parallel groups remain tracked in ``best_by_parallel_size``.
    """

    def proposal_with_tp(tp_size):
        return Proposal.from_dict(
            {
                "observation": "Obs",
                "diagnosis": "Diag",
                "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": tp_size}},
                "expected_effects": ["raise rate"],
            }
        )

    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        spec_file = _write_study_assets(root)
        study = load_study_spec(spec_file)
        store = StudyStore(root / ".aituner" / "studies")
        store.init_study(spec_path=spec_file, study=study)
        state = store.load_state(study.study_id)

        def record_completed(trial, sampling_u, request_rate):
            Path(trial.result_path).write_text(
                json.dumps(
                    {
                        "study_id": study.study_id,
                        "trial_id": trial.trial_id,
                        "status": "completed",
                        "best_sampling_u": sampling_u,
                        "best_request_rate": request_rate,
                        "best_pass_rate": 0.97,
                    }
                ),
                encoding="utf-8",
            )

        # The second trial is materialized from the state returned by the first.
        wide_trial, state = store.materialize_trial(
            study=study, state=state, proposal=proposal_with_tp(4)
        )
        record_completed(wide_trial, 0.5, 4.0)
        narrow_trial, _ = store.materialize_trial(
            study=study, state=state, proposal=proposal_with_tp(2)
        )
        record_completed(narrow_trial, 0.4, 3.0)

        ingested = store.ingest_trial_results(study.study_id)

    self.assertEqual(ingested.best_trial_id, narrow_trial.trial_id)
    self.assertEqual(ingested.best_parallel_size, 2)
    self.assertEqual(ingested.best_request_rate, 3.0)
    self.assertEqual(ingested.best_request_rate_per_gpu, 1.5)
    per_group = ingested.best_by_parallel_size
    self.assertEqual(per_group["4"]["best_request_rate_per_gpu"], 1.0)
    self.assertEqual(per_group["2"]["best_request_rate_per_gpu"], 1.5)
|
|
|
|
def test_validate_proposal_rejects_invalid_tp_dp_product(self) -> None:
    """Reject a TP x DP product that does not match hardware.gpu_count."""
    topology_constraints = {
        "require_tp_dp_product_equals_gpu_count": True,
        "require_ep_size_leq_tp_dp_product": True,
        "require_ep_size_divides_tp_dp_product": True,
        "allowed_tensor_parallel_sizes": [1, 2, 4, 8],
        "allowed_data_parallel_sizes": [1, 2, 4, 8],
        "allowed_expert_parallel_sizes": [1, 2, 4, 8],
    }
    base_flags = {
        "host": "127.0.0.1",
        "port": 8000,
        "enable-expert-parallel": True,
        "tensor-parallel-size": 4,
        "data-parallel-size": 2,
        "expert-parallel-size": 8,
    }
    engine_overrides = {
        "base_flags": base_flags,
        "tunable_flags": ["tensor-parallel-size", "data-parallel-size", "expert-parallel-size"],
        "topology_constraints": topology_constraints,
    }
    # TP=2 x DP=2 yields a product of 4; with the equality constraint enabled
    # validation must fail and name hardware.gpu_count in the error.
    bad_topology = {
        "observation": "Obs",
        "diagnosis": "Bad topology",
        "config_patch": {
            "env_patch": {},
            "flag_patch": {
                "tensor-parallel-size": 2,
                "data-parallel-size": 2,
                "expert-parallel-size": 4,
            },
        },
        "expected_effects": ["raise throughput"],
    }
    with tempfile.TemporaryDirectory() as scratch:
        spec_path = _write_study_assets(Path(scratch), engine_overrides=engine_overrides)
        study = load_study_spec(spec_path)
        proposal = Proposal.from_dict(bad_topology)
        with self.assertRaisesRegex(SpecError, "must equal hardware.gpu_count"):
            validate_proposal(proposal, study)
|
|
|
|
def test_validate_proposal_rejects_invalid_ep_divisibility(self) -> None:
    """Reject an expert-parallel size that violates the EP topology rules."""
    allowed = [1, 2, 4, 8]
    engine_overrides = {
        "base_flags": {
            "host": "127.0.0.1",
            "port": 8000,
            "enable-expert-parallel": True,
            "tensor-parallel-size": 4,
            "data-parallel-size": 2,
            "expert-parallel-size": 8,
        },
        "tunable_flags": ["tensor-parallel-size", "data-parallel-size", "expert-parallel-size"],
        "topology_constraints": {
            "require_tp_dp_product_equals_gpu_count": True,
            "require_ep_size_leq_tp_dp_product": True,
            "require_ep_size_divides_tp_dp_product": True,
            "allowed_tensor_parallel_sizes": list(allowed),
            "allowed_data_parallel_sizes": list(allowed),
            "allowed_expert_parallel_sizes": list(allowed),
        },
    }
    # Only EP is patched: 3 is outside the allowed EP sizes and does not divide
    # the base 4 x 2 product. The test only pins that the error names the value.
    ep_patch = {"expert-parallel-size": 3}
    raw_proposal = {
        "observation": "Obs",
        "diagnosis": "Bad EP",
        "config_patch": {"env_patch": {}, "flag_patch": ep_patch},
        "expected_effects": ["raise throughput"],
    }
    with tempfile.TemporaryDirectory() as scratch:
        study = load_study_spec(
            _write_study_assets(Path(scratch), engine_overrides=engine_overrides)
        )
        with self.assertRaisesRegex(SpecError, "expert-parallel-size=3"):
            validate_proposal(Proposal.from_dict(raw_proposal), study)
|
|
|
|
def test_validate_proposal_accepts_valid_tp_dp_ep_combo(self) -> None:
    """Accept a TP/DP/EP combination that satisfies every topology rule."""
    sizes = [1, 2, 4, 8]
    constraints = {
        "require_tp_dp_product_equals_gpu_count": True,
        "require_ep_size_leq_tp_dp_product": True,
        "require_ep_size_divides_tp_dp_product": True,
        "allowed_tensor_parallel_sizes": list(sizes),
        "allowed_data_parallel_sizes": list(sizes),
        "allowed_expert_parallel_sizes": list(sizes),
    }
    flags = {
        "host": "127.0.0.1",
        "port": 8000,
        "enable-expert-parallel": True,
        "tensor-parallel-size": 4,
        "data-parallel-size": 2,
        "expert-parallel-size": 8,
    }
    tunables = ["tensor-parallel-size", "data-parallel-size", "expert-parallel-size"]
    # TP=2 x DP=4 keeps the product at 8 (same as the base 4 x 2) and EP=4 divides it.
    good_patch = {
        "tensor-parallel-size": 2,
        "data-parallel-size": 4,
        "expert-parallel-size": 4,
    }
    with tempfile.TemporaryDirectory() as scratch:
        spec_path = _write_study_assets(
            Path(scratch),
            engine_overrides={
                "base_flags": flags,
                "tunable_flags": tunables,
                "topology_constraints": constraints,
            },
        )
        study = load_study_spec(spec_path)
        candidate = Proposal.from_dict(
            {
                "observation": "Obs",
                "diagnosis": "Valid topology",
                "config_patch": {"env_patch": {}, "flag_patch": good_patch},
                "expected_effects": ["raise throughput"],
            }
        )
        accepted = validate_proposal(candidate, study)
        self.assertEqual(accepted.config_patch.flag_patch["tensor-parallel-size"], 2)
|
|
|
|
def test_validate_proposal_accepts_allowed_tp_dp_product_above_gpu_count(self) -> None:
    """Without the equality rule, any product in allowed_tp_dp_products passes."""
    power_of_two = [1, 2, 4, 8]
    engine_overrides = {
        "base_flags": {
            "host": "127.0.0.1",
            "port": 8000,
            "enable-expert-parallel": False,
            "tensor-parallel-size": 4,
            "data-parallel-size": 1,
            "expert-parallel-size": 1,
        },
        "tunable_flags": ["tensor-parallel-size", "data-parallel-size", "expert-parallel-size"],
        "topology_constraints": {
            # Equality with the GPU count is switched off; the explicit product
            # allow-list governs instead.
            "require_tp_dp_product_equals_gpu_count": False,
            "require_ep_size_leq_tp_dp_product": True,
            "require_ep_size_divides_tp_dp_product": True,
            "allowed_tp_dp_products": list(power_of_two),
            "allowed_tensor_parallel_sizes": list(power_of_two),
            "allowed_data_parallel_sizes": list(power_of_two),
            "allowed_expert_parallel_sizes": [1],
        },
    }
    # TP=4 x DP=2 = 8, which is in allowed_tp_dp_products.
    larger_topology = {
        "tensor-parallel-size": 4,
        "data-parallel-size": 2,
        "expert-parallel-size": 1,
    }
    with tempfile.TemporaryDirectory() as scratch:
        study = load_study_spec(
            _write_study_assets(Path(scratch), engine_overrides=engine_overrides)
        )
        proposal = Proposal.from_dict(
            {
                "observation": "Obs",
                "diagnosis": "Allow product 8",
                "config_patch": {"env_patch": {}, "flag_patch": larger_topology},
                "expected_effects": ["explore larger topology"],
            }
        )
        accepted = validate_proposal(proposal, study)
        self.assertEqual(accepted.config_patch.flag_patch["data-parallel-size"], 2)
|
|
|
|
def test_validate_proposal_rejects_tp_dp_product_outside_allowed_set(self) -> None:
    """Reject a TP x DP product missing from allowed_tp_dp_products."""
    # TP and DP individually accept 3, so only the product check can fail.
    per_axis_sizes = [1, 2, 3, 4, 8]
    topology_constraints = {
        "require_tp_dp_product_equals_gpu_count": False,
        "require_ep_size_leq_tp_dp_product": True,
        "require_ep_size_divides_tp_dp_product": True,
        "allowed_tp_dp_products": [1, 2, 4, 8],
        "allowed_tensor_parallel_sizes": list(per_axis_sizes),
        "allowed_data_parallel_sizes": list(per_axis_sizes),
        "allowed_expert_parallel_sizes": [1],
    }
    base_flags = {
        "host": "127.0.0.1",
        "port": 8000,
        "enable-expert-parallel": False,
        "tensor-parallel-size": 4,
        "data-parallel-size": 1,
        "expert-parallel-size": 1,
    }
    # TP=3 x DP=2 = 6, which is not in [1, 2, 4, 8].
    invalid = {
        "observation": "Obs",
        "diagnosis": "Invalid product",
        "config_patch": {
            "env_patch": {},
            "flag_patch": {
                "tensor-parallel-size": 3,
                "data-parallel-size": 2,
                "expert-parallel-size": 1,
            },
        },
        "expected_effects": ["explore invalid topology"],
    }
    with tempfile.TemporaryDirectory() as scratch:
        spec_path = _write_study_assets(
            Path(scratch),
            engine_overrides={
                "base_flags": base_flags,
                "tunable_flags": [
                    "tensor-parallel-size",
                    "data-parallel-size",
                    "expert-parallel-size",
                ],
                "topology_constraints": topology_constraints,
            },
        )
        study = load_study_spec(spec_path)
        proposal = Proposal.from_dict(invalid)
        with self.assertRaisesRegex(SpecError, r"not in \[1, 2, 4, 8\]"):
            validate_proposal(proposal, study)
|
|
|
|
def test_cli_tune_runs_multiple_manual_proposals(self) -> None:
    """Each --proposal-file runs as its own trial; the faster trial becomes best."""
    manual_payloads = [
        {
            "observation": "trial one",
            "diagnosis": "conservative",
            "config_patch": {"env_patch": {}, "flag_patch": {"tensor-parallel-size": 4}},
            "expected_effects": ["stable"],
            "why_not_previous_failures": "",
        },
        {
            "observation": "trial two",
            "diagnosis": "more batching",
            "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 64}},
            "expected_effects": ["higher throughput"],
            "why_not_previous_failures": "",
        },
    ]

    def fake_run_trial(trial_spec_path: Path) -> dict[str, object]:
        spec = json.loads(trial_spec_path.read_text(encoding="utf-8"))
        trial_id = str(spec["trial_id"])
        # The first trial is deliberately slower than the second.
        best_rate, best_u = (1.0, 0.5) if trial_id.endswith("0001") else (2.0, 0.75)
        result = {
            "study_id": spec["study_id"],
            "trial_id": trial_id,
            "status": "completed",
            "best_sampling_u": best_u,
            "best_request_rate": best_rate,
            "best_pass_rate": 1.0,
            "best_request_count": 2,
            "probes": [],
        }
        result_file = Path(spec["artifact_dir"]) / "result.json"
        result_file.write_text(json.dumps(result), encoding="utf-8")
        return result

    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        study_path = _write_study_assets(root)
        store_root = root / "store"
        argv = ["study", "tune", "--spec", str(study_path), "--store-root", str(store_root)]
        for index, payload in enumerate(manual_payloads, start=1):
            proposal_file = root / f"proposal-{index}.json"
            proposal_file.write_text(json.dumps(payload), encoding="utf-8")
            argv += ["--proposal-file", str(proposal_file)]

        with mock.patch("aituner.cli.run_trial", side_effect=fake_run_trial):
            exit_code = cli_main(argv)
        self.assertEqual(exit_code, 0)

        state = StudyStore(store_root).load_state("study-1")
        self.assertEqual(state.best_trial_id, "trial-0002")
        self.assertEqual(state.best_sampling_u, 0.75)
        self.assertEqual(state.best_request_rate, 2.0)
        self.assertEqual(state.next_trial_index, 3)
|
|
|
|
def test_cli_tune_honors_should_stop_proposal(self) -> None:
    """A manual proposal with should_stop=True ends tuning without running a trial."""
    stop_payload = {
        "observation": "incumbent converged",
        "diagnosis": "no adjacent harness probe is justified",
        "config_patch": {"env_patch": {}, "flag_patch": {}},
        "expected_effects": ["stop without spending another GPU trial"],
        "why_not_previous_failures": "not applicable",
        "should_stop": True,
    }
    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        study_path = _write_study_assets(root)
        stop_file = root / "stop.json"
        stop_file.write_text(json.dumps(stop_payload), encoding="utf-8")
        store_root = root / "store"
        argv = [
            "study", "tune",
            "--spec", str(study_path),
            "--store-root", str(store_root),
            "--proposal-file", str(stop_file),
        ]
        with mock.patch("aituner.cli.run_trial") as run_trial_mock:
            exit_code = cli_main(argv)

        self.assertEqual(exit_code, 0)
        run_trial_mock.assert_not_called()
        # The trial counter must still be at its initial value.
        reloaded = StudyStore(store_root).load_state("study-1")
        self.assertEqual(reloaded.next_trial_index, 1)
|
|
|
|
def test_cli_tune_uses_harness_stop_before_llm(self) -> None:
    """A seeded history lets the harness emit a stop proposal without the LLM.

    The store is pre-populated with four completed trials (incumbent
    trial-0002). One ``study tune --max-trials 1`` pass must neither call the
    LLM nor run a trial, and must persist ``proposals/harness-stop-0005.json``
    with ``should_stop`` set.
    """

    def completed(trial_id: str, flag_patch: dict[str, object], **rates: float) -> TrialSummary:
        # Every seeded trial has status="completed" and parallel_size=8.
        return TrialSummary(
            trial_id=trial_id,
            status="completed",
            parallel_size=8,
            config_patch={"env_patch": {}, "flag_patch": flag_patch},
            **rates,
        )

    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        study_path = _write_study_assets(root)
        study = load_study_spec(study_path)
        store_root = root / "store"
        store = StudyStore(store_root)
        store.init_study(spec_path=study_path, study=study)

        history = [
            completed("trial-0001", {}, best_request_rate=0.8, best_request_rate_per_gpu=0.1),
            completed(
                "trial-0002",
                {"tensor-parallel-size": 2, "data-parallel-size": 4},
                best_request_rate=2.4,
                best_request_rate_per_gpu=0.3,
            ),
            completed("trial-0003", {"tensor-parallel-size": 1, "data-parallel-size": 8}),
            completed("trial-0004", {"max-num-seqs": 160}),
        ]
        store.save_state(
            StudyState(
                study_id=study.study_id,
                best_trial_id="trial-0002",
                best_parallel_size=8,
                best_sampling_u=0.02,
                best_request_rate=2.4,
                best_request_rate_per_gpu=0.3,
                next_trial_index=5,
                trials=history,
            )
        )

        argv = [
            "study", "tune",
            "--spec", str(study_path),
            "--store-root", str(store_root),
            "--max-trials", "1",
        ]
        llm_patch = mock.patch("aituner.cli.call_llm_for_proposal")
        trial_patch = mock.patch("aituner.cli.run_trial")
        with llm_patch as llm_mock, trial_patch as run_trial_mock:
            exit_code = cli_main(argv)

        self.assertEqual(exit_code, 0)
        llm_mock.assert_not_called()
        run_trial_mock.assert_not_called()

        stop_file = store.study_root(study.study_id) / "proposals" / "harness-stop-0005.json"
        self.assertTrue(stop_file.exists())
        stored = json.loads(stop_file.read_text(encoding="utf-8"))
        self.assertTrue(stored["should_stop"])
|
|
|
|
def test_cli_tune_evaluates_baseline_before_llm_proposal(self) -> None:
    """Without proposal files, trial 1 is the unpatched baseline and trial 2 is the LLM's."""
    llm_reply = json.dumps(
        {
            "observation": "baseline done",
            "diagnosis": "try more batching",
            "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 64}},
            "expected_effects": ["higher throughput"],
            "why_not_previous_failures": "",
            "should_stop": False,
        }
    )

    def fake_run_trial(trial_spec_path: Path) -> dict[str, object]:
        spec = json.loads(trial_spec_path.read_text(encoding="utf-8"))
        result = {
            "study_id": spec["study_id"],
            "trial_id": spec["trial_id"],
            "status": "completed",
            "best_sampling_u": 0.25,
            "best_request_rate": 1.0,
            "best_pass_rate": 1.0,
            "best_request_count": 2,
            "probes": [],
        }
        Path(spec["artifact_dir"]).joinpath("result.json").write_text(
            json.dumps(result), encoding="utf-8"
        )
        return result

    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        study_path = _write_study_assets(root)
        # Point the study at a (mocked) chat-completions LLM endpoint.
        study_payload = json.loads(study_path.read_text(encoding="utf-8"))
        study_payload["llm"]["endpoint"] = {
            "provider": "custom",
            "base_url": "http://llm.example/v1",
            "wire_api": "chat.completions",
            "model": "test-model",
            "api_key_env": "OPENAI_API_KEY",
        }
        study_path.write_text(json.dumps(study_payload), encoding="utf-8")
        store_root = root / "store"

        trial_patch = mock.patch("aituner.cli.run_trial", side_effect=fake_run_trial)
        llm_patch = mock.patch("aituner.cli.call_llm_for_proposal", return_value=llm_reply)
        with trial_patch, llm_patch:
            exit_code = cli_main(
                [
                    "study", "tune",
                    "--spec", str(study_path),
                    "--store-root", str(store_root),
                    "--max-trials", "2",
                ]
            )

        self.assertEqual(exit_code, 0)
        state = StudyStore(store_root).load_state("study-1")
        self.assertEqual(state.next_trial_index, 3)
        baseline, tuned = state.trials[0], state.trials[1]
        self.assertEqual(baseline.config_patch, {"env_patch": {}, "flag_patch": {}})
        self.assertEqual(tuned.config_patch["flag_patch"], {"max-num-seqs": 64})
|
|
|
|
def test_load_compare_spec_requires_window_selection(self) -> None:
    """A compare spec naming neither window_ids nor window_selector is rejected."""
    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        study_path = _write_study_assets(root)
        spec_without_windows = {
            "compare_id": "compare-1",
            "study_spec_path": str(study_path),
            "baseline": {"config_patch": {"env_patch": {}, "flag_patch": {}}},
            "tuned": {"config_patch": {"env_patch": {}, "flag_patch": {}}},
        }
        compare_file = root / "compare.json"
        compare_file.write_text(json.dumps(spec_without_windows), encoding="utf-8")

        with self.assertRaisesRegex(SpecError, "window_ids or window_selector"):
            load_compare_spec(compare_file)
|
|
|
|
def test_run_compare_outputs_summary_and_report(self) -> None:
    """Compare two windows where tuned wins both; summary.json and report.md are written."""
    # Fake throughput keyed by (window_id, candidate trial_id).
    rate_by_run = {
        ("chat_w1", "baseline"): 1.0,
        ("chat_w1", "tuned"): 3.0,
        ("chat_w2", "baseline"): 3.0,
        ("chat_w2", "tuned"): 7.0,
    }

    def fake_run_trial(trial_spec_path: Path) -> dict[str, object]:
        trial_payload = json.loads(trial_spec_path.read_text(encoding="utf-8"))
        # The trial's study_spec_path file contains the path of the actual
        # per-window study spec; follow it to learn which window is running.
        pointer = Path(trial_payload["study_spec_path"]).read_text(encoding="utf-8").strip()
        window_spec = json.loads(Path(pointer).read_text(encoding="utf-8"))
        run_key = (window_spec["trace"]["window_id"], trial_payload["trial_id"])
        result = {
            "study_id": trial_payload["study_id"],
            "trial_id": trial_payload["trial_id"],
            "status": "completed",
            "best_sampling_u": 0.5,
            "best_request_rate": rate_by_run[run_key],
            "best_pass_rate": 1.0,
            "best_request_count": 2,
            "probes": [],
        }
        Path(trial_payload["result_path"]).write_text(json.dumps(result), encoding="utf-8")
        return result

    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        study_path = _write_study_assets(root)

        extra_request = {
            "request_id": "r4",
            "timestamp": 0.0,
            "sampling_u": 0.2,
            "messages": [{"role": "user", "content": "extra"}],
            "input_length": 3000,
            "output_length": 32,
        }
        second_trace = root / "trace_windows" / "traces" / "chat_w2.jsonl"
        second_trace.write_text(json.dumps(extra_request) + "\n", encoding="utf-8")

        windows_path = root / "trace_windows" / "windows.json"
        windows_payload = json.loads(windows_path.read_text(encoding="utf-8"))
        windows = windows_payload["windows"]
        windows.append(
            {
                "window_id": "chat_w2",
                "trace_type": "chat",
                "trace_file": "traces/chat_w2.jsonl",
                "window_start": 0.0,
                "window_end": 10.0,
                "date": "2026-03-12",
                "slot_token": "1000",
                "slot_label": "10:00-10:10",
            }
        )
        windows[0].update(date="2026-03-11", slot_token="1000", slot_label="10:00-10:10")
        windows_path.write_text(json.dumps(windows_payload), encoding="utf-8")

        compare_path = _write_compare_assets(
            root,
            study_path=study_path,
            window_ids=["chat_w1", "chat_w2"],
        )
        output_root = root / ".compare"
        with mock.patch("aituner.compare.run_trial", side_effect=fake_run_trial):
            summary = run_compare(compare_path, output_root=output_root)

        self.assertEqual(len(summary["windows"]), 2)
        self.assertEqual(summary["aggregate"]["wins"]["tuned"], 2)
        for artifact in ("summary.json", "report.md"):
            self.assertTrue((output_root / artifact).exists())
|
|
|
|
def test_run_compare_resolves_trial_ref_candidate(self) -> None:
    """A trial_ref baseline takes its config_patch from a prior study's trial_spec.json."""

    def fake_run_trial(trial_spec_path: Path) -> dict[str, object]:
        trial_payload = json.loads(trial_spec_path.read_text(encoding="utf-8"))
        patch = trial_payload["config_patch"] or {}
        flags = patch.get("flag_patch") or {}
        # Only the referenced prior trial sets data-parallel-size=2.
        best_rate = 2.0
        if flags.get("data-parallel-size") == 2:
            best_rate = 5.0
        result = {
            "study_id": trial_payload["study_id"],
            "trial_id": trial_payload["trial_id"],
            "status": "completed",
            "best_sampling_u": 0.5,
            "best_request_rate": best_rate,
            "best_pass_rate": 1.0,
            "best_request_count": 2,
            "probes": [],
        }
        Path(trial_payload["result_path"]).write_text(json.dumps(result), encoding="utf-8")
        return result

    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        study_path = _write_study_assets(root)
        prior_root = root / "prior-study"
        trial_dir = prior_root / "trials" / "trial-0002"
        trial_dir.mkdir(parents=True)

        trial_spec = dict(
            study_id="prior-study",
            trial_id="trial-0002",
            config_patch={"env_patch": {}, "flag_patch": {"data-parallel-size": 2}},
            search=dict(low=0.0, high=1.0, tolerance=0.01, max_probes=8, sample_seed=20260325),
            study_spec_path=str(study_path),
            artifact_dir=str(trial_dir),
            probe_log_path=str(trial_dir / "probe_history.json"),
            engine_log_path=str(trial_dir / "engine.log"),
            result_path=str(trial_dir / "result.json"),
        )
        trial_dir.joinpath("trial_spec.json").write_text(json.dumps(trial_spec), encoding="utf-8")

        trial_ref = {"study_root": str(prior_root), "trial_id": "trial-0002"}
        compare_path = _write_compare_assets(
            root,
            study_path=study_path,
            window_ids=["chat_w1"],
            baseline={"trial_ref": trial_ref},
        )
        with mock.patch("aituner.compare.run_trial", side_effect=fake_run_trial):
            summary = run_compare(compare_path, output_root=root / ".compare")

        self.assertEqual(summary["baseline_source"]["kind"], "trial_ref")
        baseline_flags = summary["windows"][0]["baseline"]["config_patch"]["flag_patch"]
        self.assertEqual(baseline_flags["data-parallel-size"], 2)
|
|
|
|
def test_run_compare_window_selector_filters_windows(self) -> None:
    """window_selector keeps only windows matching both trace_type and date_prefix."""
    extra_windows = (("chat_w2", "chat"), ("thinking_w3", "thinking"))

    def fake_run_trial(trial_spec_path: Path) -> dict[str, object]:
        trial_payload = json.loads(trial_spec_path.read_text(encoding="utf-8"))
        result = {
            "study_id": trial_payload["study_id"],
            "trial_id": trial_payload["trial_id"],
            "status": "completed",
            "best_sampling_u": 0.5,
            "best_request_rate": 1.0,
            "best_pass_rate": 1.0,
            "best_request_count": 2,
            "probes": [],
        }
        Path(trial_payload["result_path"]).write_text(json.dumps(result), encoding="utf-8")
        return result

    with tempfile.TemporaryDirectory() as scratch:
        root = Path(scratch)
        study_path = _write_study_assets(root)
        traces = root / "trace_windows" / "traces"
        for window_id, _trace_type in extra_windows:
            file_name = f"{window_id}.jsonl"
            row = {
                "request_id": file_name,
                "timestamp": 0.0,
                "sampling_u": 0.2,
                "messages": [{"role": "user", "content": file_name}],
                "input_length": 3000,
                "output_length": 32,
            }
            (traces / file_name).write_text(json.dumps(row) + "\n", encoding="utf-8")

        windows_path = root / "trace_windows" / "windows.json"
        windows_payload = json.loads(windows_path.read_text(encoding="utf-8"))
        # The pre-existing window is dated 2026-03-11, so the date prefix excludes it.
        windows_payload["windows"][0].update(date="2026-03-11", slot_token="1000")
        for window_id, trace_type in extra_windows:
            windows_payload["windows"].append(
                {
                    "window_id": window_id,
                    "trace_type": trace_type,
                    "trace_file": f"traces/{window_id}.jsonl",
                    "window_start": 0.0,
                    "window_end": 10.0,
                    "date": "2026-03-12",
                    "slot_token": "1000",
                }
            )
        windows_path.write_text(json.dumps(windows_payload), encoding="utf-8")

        compare_path = _write_compare_assets(
            root,
            study_path=study_path,
            window_selector={"trace_type": "chat", "date_prefix": "2026-03-12"},
        )
        with mock.patch("aituner.compare.run_trial", side_effect=fake_run_trial):
            summary = run_compare(compare_path, output_root=root / ".compare")

        selected = [row["window_id"] for row in summary["windows"]]
        self.assertEqual(selected, ["chat_w2"])
|
|
|
|
def test_proposal_expected_effects_accepts_string(self) -> None:
    """A bare string for expected_effects is wrapped into a one-element list."""
    raw = dict(
        observation="obs",
        diagnosis="diag",
        config_patch={"env_patch": {}, "flag_patch": {}},
        expected_effects="higher throughput",
    )
    parsed = Proposal.from_dict(raw)
    self.assertEqual(parsed.expected_effects, ["higher throughput"])
|
|
|
|
def test_proposal_expected_effects_accepts_object(self) -> None:
    """A mapping for expected_effects is flattened into "key: value" strings."""
    effects = {"throughput": "higher", "ttft": "lower"}
    raw = dict(
        observation="obs",
        diagnosis="diag",
        config_patch={"env_patch": {}, "flag_patch": {}},
        expected_effects=effects,
    )
    parsed = Proposal.from_dict(raw)
    self.assertEqual(parsed.expected_effects, ["throughput: higher", "ttft: lower"])
|
|
|
|
def test_proposal_observation_accepts_object(self) -> None:
    """A structured observation is kept as a JSON-like string; other fields pass through."""
    structured_observation = {
        "incumbent_trial": "trial-0002",
        "boundary_signal": "tpot cliff",
    }
    raw = dict(
        observation=structured_observation,
        diagnosis="validate incumbent",
        config_patch={"env_patch": {}, "flag_patch": {"max-num-seqs": 160}},
        expected_effects=["more TPOT headroom"],
    )
    parsed = Proposal.from_dict(raw)
    self.assertIn('"incumbent_trial": "trial-0002"', parsed.observation)
    self.assertEqual(parsed.diagnosis, "validate incumbent")
|
|
|
|
def test_proposal_accepts_should_stop(self) -> None:
    """The optional should_stop flag is parsed onto the Proposal."""
    raw = dict(
        observation="obs",
        diagnosis="converged",
        config_patch={"env_patch": {}, "flag_patch": {}},
        expected_effects=["avoid wasting another GPU trial"],
        should_stop=True,
    )
    self.assertTrue(Proposal.from_dict(raw).should_stop)
|
|
|
|
def test_parse_proposal_text_accepts_wrapped_json(self) -> None:
    """A proposal inside prose plus a ```json fence is still extracted and parsed."""
    # Compact separators reproduce the exact single-line JSON the model would emit.
    proposal_json = json.dumps(
        {
            "observation": "obs",
            "diagnosis": "diag",
            "config_patch": {"env_patch": {}, "flag_patch": {"max-num-seqs": 32}},
            "expected_effects": ["higher throughput"],
            "why_not_previous_failures": "keeps supported knobs",
        },
        separators=(",", ":"),
    )
    wrapped_reply = "Here is the proposal:\n```json\n" + proposal_json + "\n```"
    with tempfile.TemporaryDirectory() as scratch:
        study = load_study_spec(_write_study_assets(Path(scratch)))
        parsed = parse_proposal_text(wrapped_reply, study)
        self.assertEqual(parsed.config_patch.flag_patch["max-num-seqs"], 32)
|
|
|
|
def test_replay_requests_early_stops_when_slo_is_unrecoverable(self) -> None:
    """One failure among three requests makes a 0.95 pass rate unreachable, so replay stops."""
    requests = []
    for i in range(3):
        requests.append(
            TraceRequest(
                row_id=f"r{i}",
                arrival_s=0.0,
                sampling_u=0.1 * i,
                body={"model": "m", "messages": [{"role": "user", "content": "x"}]},
                prompt_tokens_hint=8,
                completion_tokens_hint=4,
            )
        )

    # Only one canned outcome exists; a second call would raise IndexError.
    canned = [
        RequestOutcome(
            request_id="r0",
            success=False,
            ttft_ms=None,
            tpot_ms=None,
            prompt_tokens=8,
            completion_tokens=4,
            error="request_failed",
        )
    ]

    def fake_run_one_request(*_args, **_kwargs):
        return canned.pop(0)

    def fake_evaluate(outcome: RequestOutcome):
        verdict_type = type("Eval", (), {"passed": outcome.success})
        return verdict_type()

    stop_reason = "slo_pass_rate_unrecoverable"
    with mock.patch("aituner.worker._run_one_request", side_effect=fake_run_one_request):
        replayed, early_stopped, reason = _replay_requests(
            requests,
            base_url="http://127.0.0.1:8000",
            timeout_s=1.0,
            max_concurrency=1,
            target_pass_rate=0.95,
            max_lag_s=None,
            max_elapsed_s=None,
            evaluate_outcome=fake_evaluate,
        )
    self.assertTrue(early_stopped)
    self.assertEqual(reason, stop_reason)
    self.assertEqual(len(replayed), 3)
    self.assertEqual(replayed[1].error, stop_reason)
|
|
|
|
def test_replay_requests_does_not_wait_for_inflight_after_early_stop(self) -> None:
    """After an early stop the still-running request is abandoned, not awaited."""
    requests = [
        TraceRequest(
            row_id=row_id,
            arrival_s=0.0,
            sampling_u=sampling_u,
            body={"model": "m", "messages": [{"role": "user", "content": text}]},
            prompt_tokens_hint=8,
            completion_tokens_hint=4,
        )
        for row_id, sampling_u, text in (("r0", 0.1, "x"), ("r1", 0.2, "y"))
    ]

    class CompletedFuture:
        """Future for r0: already finished with a failed outcome."""

        def __init__(self, outcome):
            self._outcome = outcome

        def result(self, timeout=None):
            return self._outcome

        def cancel(self):
            return True

    class PendingFuture:
        """Future for r1: awaiting it after the early stop is a test failure."""

        def result(self, timeout=None):
            raise AssertionError("in-flight future should not be awaited after early stop")

        def cancel(self):
            return True

    done_future = CompletedFuture(
        RequestOutcome(
            request_id="r0",
            success=False,
            ttft_ms=None,
            tpot_ms=None,
            prompt_tokens=8,
            completion_tokens=4,
            error="request_failed",
        )
    )
    inflight_future = PendingFuture()
    submitted = []

    class FakeExecutor:
        def __init__(self, max_workers):
            self.max_workers = max_workers

        def submit(self, fn, request, **kwargs):
            submitted.append(request.row_id)
            return done_future if request.row_id == "r0" else inflight_future

        def shutdown(self, wait=False, cancel_futures=True):
            return None

    def fake_wait(futures, timeout=None, return_when=None):
        # Both requests are outstanding: report r0 finished and r1 still running.
        self.assertEqual(len(futures), 2)
        return {done_future}, {inflight_future}

    def fake_evaluate(outcome: RequestOutcome):
        verdict_type = type("Eval", (), {"passed": outcome.success})
        return verdict_type()

    executor_patch = mock.patch("aituner.worker.ThreadPoolExecutor", FakeExecutor)
    wait_patch = mock.patch("aituner.worker.wait", side_effect=fake_wait)
    with executor_patch, wait_patch:
        replayed, early_stopped, reason = _replay_requests(
            requests,
            base_url="http://127.0.0.1:8000",
            timeout_s=30.0,
            max_concurrency=2,
            target_pass_rate=0.95,
            max_lag_s=None,
            max_elapsed_s=None,
            evaluate_outcome=fake_evaluate,
            drain_inflight_on_early_stop=False,
        )

    self.assertEqual(submitted, ["r0", "r1"])
    self.assertTrue(early_stopped)
    self.assertEqual(reason, "slo_pass_rate_unrecoverable")
    self.assertEqual(len(replayed), 2)
    self.assertEqual(replayed[1].error, "slo_pass_rate_unrecoverable")
|
|
|
|
def test_replay_requests_respects_max_elapsed_while_waiting_for_inflight(self) -> None:
    """The elapsed-time budget bounds waits on in-flight work and triggers an early stop."""
    only_request = TraceRequest(
        row_id="r0",
        arrival_s=0.0,
        sampling_u=0.1,
        body={"model": "m", "messages": [{"role": "user", "content": "x"}]},
        prompt_tokens_hint=8,
        completion_tokens_hint=4,
    )

    class NeverAwaitedFuture:
        def result(self, timeout=None):
            raise AssertionError("future should not be awaited after elapsed early stop")

        def cancel(self):
            return True

    submitted = []

    class FakeExecutor:
        def __init__(self, max_workers):
            self.max_workers = max_workers

        def submit(self, fn, request, **kwargs):
            submitted.append(request.row_id)
            return NeverAwaitedFuture()

        def shutdown(self, wait=False, cancel_futures=True):
            return None

    wait_timeouts: list[float | None] = []

    def fake_wait(futures, timeout=None, return_when=None):
        # Record the requested timeout and report nothing as finished.
        wait_timeouts.append(timeout)
        return set(), set(futures)

    def fake_evaluate(outcome: RequestOutcome):
        verdict_type = type("Eval", (), {"passed": outcome.success})
        return verdict_type()

    # Scripted clock readings; the last one (1.2) exceeds max_elapsed_s=1.0.
    clock_readings = [0.0, 0.0, 0.4, 1.2]
    stop_reason = "probe_elapsed_s>1.0"

    with mock.patch("aituner.worker.ThreadPoolExecutor", FakeExecutor), mock.patch(
        "aituner.worker.wait", side_effect=fake_wait
    ), mock.patch("aituner.worker.time.monotonic", side_effect=clock_readings):
        replayed, early_stopped, reason = _replay_requests(
            [only_request],
            base_url="http://127.0.0.1:8000",
            timeout_s=30.0,
            max_concurrency=1,
            target_pass_rate=0.95,
            max_lag_s=None,
            max_elapsed_s=1.0,
            evaluate_outcome=fake_evaluate,
            drain_inflight_on_early_stop=False,
        )

    self.assertEqual(submitted, ["r0"])
    self.assertTrue(early_stopped)
    self.assertEqual(reason, stop_reason)
    self.assertEqual(len(replayed), 1)
    self.assertEqual(replayed[0].error, stop_reason)
    self.assertTrue(wait_timeouts)
    self.assertLessEqual(wait_timeouts[0], 0.5)
|
|
|
|
def test_latency_summary_reports_quantiles_and_slo(self) -> None:
    """_latency_summary reports TTFT/TPOT statistics and the study's SLO target.

    Two successful requests are summarized. The expected mean/p50/p99 values
    pin the quantile convention the summary uses (p50 of [100, 200] is 100).
    """
    # Use a self-cleaning temp dir like the other tests. The previous
    # tempfile.mkdtemp() call was never removed and leaked a directory of
    # study assets on every run.
    with tempfile.TemporaryDirectory() as tmp:
        study = load_study_spec(_write_study_assets(Path(tmp)))
        outcomes = [
            RequestOutcome(
                request_id="r1",
                success=True,
                ttft_ms=100.0,
                tpot_ms=10.0,
                prompt_tokens=100,
                completion_tokens=10,
            ),
            RequestOutcome(
                request_id="r2",
                success=True,
                ttft_ms=200.0,
                tpot_ms=20.0,
                prompt_tokens=5000,
                completion_tokens=10,
            ),
        ]
        evaluations = [evaluate_request(item, study.slo) for item in outcomes]
        summary = _latency_summary(outcomes=outcomes, evaluations=evaluations, study=study)

        self.assertEqual(summary["observed_request_count"], 2)
        self.assertEqual(summary["request_mode"], "chat")
        self.assertEqual(summary["ttft_ms"]["mean"], 150.0)
        self.assertEqual(summary["ttft_ms"]["p50"], 100.0)
        self.assertEqual(summary["ttft_ms"]["p99"], 200.0)
        self.assertEqual(summary["tpot_ms"]["mean"], 15.0)
        self.assertEqual(summary["slo"]["target_pass_rate"], 0.95)
|
|
|
|
def test_wait_for_server_or_exit_fails_fast_when_process_exits(self) -> None:
    """An engine that has already exited raises RuntimeError carrying its exit code."""
    exited_engine = mock.Mock(**{"poll.return_value": 17})
    expected_message = "engine_process_exited_before_ready exit_code=17"
    with self.assertRaisesRegex(RuntimeError, expected_message):
        _wait_for_server_or_exit(
            exited_engine,
            base_url="http://127.0.0.1:8000",
            healthcheck_path="/v1/models",
            ready_timeout_s=10.0,
        )
|
|
|
|
def test_terminate_process_tree_kills_process_group(self) -> None:
    """Termination signals the process group returned by os.getpgid exactly once."""
    child = mock.Mock(pid=1234)
    # poll(): still running twice, then exited with status 0.
    child.poll.side_effect = [None, None, 0]
    child.wait.return_value = 0
    getpgid_patch = mock.patch("aituner.worker.os.getpgid", return_value=1234)
    killpg_patch = mock.patch("aituner.worker.os.killpg")
    with getpgid_patch, killpg_patch as killpg_mock:
        _terminate_process_tree(child, timeout_s=1.0)
    killpg_mock.assert_called_once()
    signalled_group = killpg_mock.call_args[0][0]
    self.assertEqual(signalled_group, 1234)
|
|
|
|
def test_openai_url_avoids_double_v1(self) -> None:
    """/v1 is not duplicated, whether or not base_url already ends with it."""
    expected = "http://example.com/v1/chat/completions"
    for base_url in ("http://example.com", "http://example.com/v1"):
        self.assertEqual(_openai_url(base_url, "/v1/chat/completions"), expected)
|
|
|
|
def test_loopback_urls_bypass_proxy(self) -> None:
    """Loopback hosts skip the proxy; external hosts do not."""
    for loopback_url in ("http://127.0.0.1:8000/v1/models", "http://localhost:8000/health"):
        self.assertTrue(_should_bypass_proxy(loopback_url))
    self.assertFalse(_should_bypass_proxy("http://example.com/v1/models"))
|
|
|
|
|
|
if __name__ == "__main__":
    # Run this module's tests via the unittest runner when executed directly.
    unittest.main()
|