Compare commits

...

7 Commits

Author SHA1 Message Date
adc4351e5d Report latency stats for infeasible baseline 2026-05-08 11:10:34 +08:00
eb137a0b62 Document TPOT40 baseline infeasible run 2026-05-08 02:57:03 +08:00
f212673f44 Stop tuning when baseline is infeasible 2026-05-08 01:07:36 +08:00
a7a5e9ad80 Make tune trial budget resumable 2026-05-07 17:18:06 +08:00
7263587cb6 clean: ci 2026-05-06 22:56:53 +08:00
d7df1ebdac Add open source project metadata
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
2026-05-06 21:18:21 +08:00
c1ff64381d Harden trial measurement accounting 2026-05-06 21:18:09 +08:00
20 changed files with 1016 additions and 24 deletions

23
CONTRIBUTING.md Normal file
View File

@@ -0,0 +1,23 @@
# Contributing
## Development Setup
```bash
python3 -m pip install -e .
PYTHONPATH=src python3 -m unittest discover -s tests -v
```
## Change Requirements
- Add or update tests for behavior changes.
- Keep experiment claims tied to reproducible artifacts: study spec, trial spec,
result JSON, probe history, and per-request probe details.
- Do not publish benchmark conclusions from bounded or time-compressed replays
without clearly labeling the replay controls.
- Keep example configs free of private credentials and prefer explicit,
reproducible endpoint settings.
## Commit Hygiene
Use small commits grouped by behavior: measurement integrity, orchestration
logic, documentation, or infrastructure.

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 AITuner contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

72
README.md Normal file
View File

@@ -0,0 +1,72 @@
# AITuner
AITuner is a small study orchestrator for OpenAI-compatible serving engines. It
replays trace windows, searches for the highest feasible offered load under
configured SLOs, and records enough trial context for LLM- or harness-guided
configuration proposals.
## Status
This repository is research tooling. Treat reported experiment numbers as valid
only when the matching study spec, trial artifacts, probe history, and
`probe_details.jsonl` files are available for audit.
## Install
```bash
python3 -m pip install -e .
```
## Test
The test suite uses the Python standard library `unittest` runner:
```bash
PYTHONPATH=src python3 -m unittest discover -s tests -v
```
If the package is installed in editable mode, `PYTHONPATH=src` is optional.
## Basic Workflow
Initialize a study:
```bash
aituner study init --spec configs/examples/study.example.json
```
Run a local tuning loop:
```bash
aituner study tune --spec configs/examples/study.example.json --max-trials 2
```
Run a compare:
```bash
aituner compare run --spec configs/examples/compare.example.json
```
Remote experiment notes for this checkout live in `AGENTS.md`. The default
remote host is `dash0`, and code should be synchronized through Git before
remote runs.
## Experiment Integrity
- Fixed-length replay requests are scored only when completion token usage is
verifiable and matches the trace expectation.
- Each trial writes aggregate probe history and per-request probe details.
- `request_rate_per_gpu` is the primary cross-topology metric:
`best_feasible_request_rate / (tensor_parallel_size * data_parallel_size)`.
- Compare reports include failed and no-feasible window counts; do not interpret
mean request rates without those counts.
- Bounded replays using `max_requests_per_probe`, `completion_tokens_override`,
or `replay_time_scale` are convergence tests for that bounded workload, not
production benchmarks.
## Configuration Notes
Example specs that use `llm.endpoint.provider=codex` resolve the endpoint from
the local Codex configuration unless `llm.endpoint.base_url` or
`AITUNER_CODEX_BASE_URL` is set. Public, reproducible examples should prefer an
explicit endpoint or omit the LLM endpoint and use proposal files.

19
SECURITY.md Normal file
View File

@@ -0,0 +1,19 @@
# Security
AITuner launches local or remote serving engines and may replay trace payloads.
Do not commit secrets, API keys, private trace content, or private model access
tokens.
## Reporting
Report security issues privately to the project maintainers. If this repository
is mirrored to a public forge, use that forge's private vulnerability reporting
flow when available.
## Operational Guidance
- Keep `.env` files local; `.env.example` documents expected variable names.
- Review generated trial artifacts before publishing them, because request
payloads may contain trace text.
- Treat remote execution configs as sensitive when they include internal host
names, paths, or scheduler details.

View File

@@ -60,7 +60,7 @@ The speedup comes from reducing wasted proposal families, not from changing the
- Engine relaunch after early stop is available as opt-in for faster smoke studies, but it is not the default because it can change warm-state comparability.
5. Search-high saturation stop
- If the incumbent's highest measured probe is feasible, has no SLO failures, and is within the configured binary-search resolution of `search.high`, the harness stops before asking the LLM for another proposal.
- If the incumbent's highest measured probe is feasible and is within the configured binary-search resolution of `search.high`, the harness stops before asking the LLM for another proposal. Individual request failures can be present when the aggregate probe still meets the configured pass-rate SLO.
- This is not a model-specific threshold. It means the workload search range, not the engine config, is currently the limiting measurement bound.
6. Deterministic first probes

View File

@@ -118,7 +118,7 @@ A second generic diagnosis bug was fixed: non-SLO bookkeeping counts such as `pr
The base-relative patch issue is now guarded in code, not only in the LLM prompt. When `StudyStore.materialize_trial` sees a runtime/env-only proposal after a non-base incumbent has been found, it inherits the incumbent topology patch into the trial spec unless the proposal explicitly provides a topology. This keeps same-topology runtime validation on the actual incumbent while preserving the ability to test the base topology by stating it explicitly.
Local verification: `PYTHONPATH=src python3 -m unittest discover -s tests` passed 68 tests.
Local verification at that commit: `PYTHONPATH=src python3 -m unittest discover -s tests` passed. The current repository suite has since grown; rerun the command rather than relying on this historical test count.
## Current Harness Judgment

View File

@@ -0,0 +1,131 @@
# Qwen27B Chat 0-8k TPOT 40ms Baseline Infeasible Run
Date: 2026-05-07
## Goal
Re-run the internal vLLM + Qwen3.5-27B chat 0-8k tuning comparison after adding a study-level guard:
- if the automatic baseline trial has no feasible probe;
- and the lowest sampled request rate still fails the SLO target pass rate;
- then AITuner stops the whole study and reports that the SLO is too tight for the current setup.
This prevents spending the remaining tuning budget on LLM or harness proposals when the baseline itself demonstrates that the workload/SLO is infeasible at the search floor.
## Implementation
Commit: `f212673 Stop tuning when baseline is infeasible`
Changed behavior:
- `study tune` now persists `tuning_stop_reason` and `tuning_stop_diagnosis` in `state.json`.
- `study tune` also persists `tuning_stop_details`, including the lowest sampled probe's TTFT/TPOT mean, p50, p95, and p99.
- After the automatic baseline trial is ingested, AITuner checks the worker result:
- `status == completed`
- `best_request_rate is None`
- at least one probe exists
- all probes are infeasible
- If true, AITuner stops before asking the LLM or harness for any proposal.
- Re-running the same study respects the persisted stop state and does not resume tuning.
Validation:
```bash
python3 -m compileall -q src tests
PYTHONPATH=src python3 -m unittest tests.test_core_flow
```
Local and `dash0` both passed.
## Setup
Host: `dash0`
Remote repo: `/home/admin/cpfs/wjh/aituner/aituner`
Base spec: `configs/examples/dash0_qwen27b_tight_slo_run4_0_8k.json`
Model: `/home/admin/resource/model/464482ce/qwen3.5-27b/256k-0223-internal`
Workload: chat, 0-8k input window
SLO:
- TTFT: existing step rule from the base spec
- TPOT: fixed `40ms`
- target pass rate: `0.95`
Search:
- Direct AITuner command: `python3 -m aituner.cli study tune ... --max-trials 12`
- No manual proposal/state edits during either run.
- Both variants used `CUDA_VISIBLE_DEVICES=0,1,2,4,5,6,7`; this was identical for both specs.
- The two specs were verified equal after normalizing only `study_id` and `llm.use_harness`.
Specs:
- no-harness: `.aituner-tight/specs/dash0-qwen27b-chat-0-8k-tpot40-gpu3skip-12iter-noharness-20260507.json`
- harness: `.aituner-tight/specs/dash0-qwen27b-chat-0-8k-tpot40-gpu3skip-12iter-harness-20260507.json`
## Commands
No harness:
```bash
PYTHONPATH=src python3 -m aituner.cli study tune \
--spec .aituner-tight/specs/dash0-qwen27b-chat-0-8k-tpot40-gpu3skip-12iter-noharness-20260507.json \
--store-root .aituner-tight \
--max-trials 12
```
Harness:
```bash
PYTHONPATH=src python3 -m aituner.cli study tune \
--spec .aituner-tight/specs/dash0-qwen27b-chat-0-8k-tpot40-gpu3skip-12iter-harness-20260507.json \
--store-root .aituner-tight \
--max-trials 12
```
## Results
Both runs stopped after the baseline trial. No LLM/harness proposal was evaluated because baseline had no feasible probe.
| Variant | Trials executed | Best request rate | Best request rate / GPU | Stop reason |
| --- | ---: | ---: | ---: | --- |
| no-harness | 1 | - | - | `baseline_all_infeasible` |
| harness | 1 | - | - | `baseline_all_infeasible` |
Baseline probe curve:
| sampling_u | request rate | pass rate | feasible | early stop reason |
| ---: | ---: | ---: | --- | --- |
| 0.03125 | 0.895 | 0.000000 | false | `slo_pass_rate_unrecoverable` |
| 0.015625 | 0.483333 | 0.137931 | false | `slo_pass_rate_unrecoverable` |
| 0.0078125 | 0.246667 | 0.236486 | false | `slo_pass_rate_unrecoverable` |
| 0.00390625 | 0.123333 | 0.189189 | false | `slo_pass_rate_unrecoverable` |
| 0.001953125 | 0.065000 | 0.205128 | false | `slo_pass_rate_unrecoverable` |
| 0.0009765625 | 0.035000 | 0.142857 | false | `slo_pass_rate_unrecoverable` |
Lowest request rate latency summary:
| Variant | request rate | pass rate | TTFT mean | TTFT p50 | TTFT p95 | TTFT p99 | TPOT mean | TPOT p50 | TPOT p95 | TPOT p99 |
| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |
| no-harness | 0.035000 | 0.142857 | 1288.953ms | 446.586ms | 3011.814ms | 3011.814ms | 12.661ms | 13.141ms | 15.097ms | 15.097ms |
| harness | 0.035000 | 0.142857 | 1268.090ms | 445.274ms | 2889.080ms | 2889.080ms | 12.658ms | 13.170ms | 15.102ms | 15.102ms |
This shows that the TPOT threshold of `40ms` is not the binding constraint at the lowest sampled rate. The observed TPOT p99 is about `15.1ms`; failures are driven by TTFT and by the unrecoverable-pass-rate early stop after too many requests have already failed or been skipped.
Final diagnosis written by AITuner:
```text
Baseline configuration has no feasible probe under the current SLO. Stopping tuning because even the lowest sampled request rate did not meet the target pass rate. lowest_sampled_request_rate=0.035 lowest_sampling_u=0.000976562 lowest_probe_pass_rate=0.142857 early_stop_reason=slo_pass_rate_unrecoverable
```
## Interpretation
This run does not measure harness acceleration. It proves that the TPOT 40ms setup is infeasible for the current baseline and search floor: even at `0.035` aggregate request rate, only `14.29%` of requests pass the SLO, far below the `95%` target.
The correct behavior is to stop the study early and report SLO infeasibility instead of spending the remaining 11 trial slots. Harness cannot accelerate convergence when there is no feasible baseline point and no incumbent for guided tuning.
For a Fig. 18-style convergence comparison, the next setup must first have at least one feasible baseline or feasible low-rate point under the same metric definitions.

View File

@@ -64,7 +64,7 @@ This run tests a stricter early-stop harness:
- the validation covered topology and runtime families, or accumulated at least three post-incumbent validation attempts.
- If the stop guard fires, `study tune` writes `harness-stop-XXXX` and exits without spending another GPU trial or asking the LLM for another proposal.
- A single-family all-infeasible plateau is not enough to stop deterministically. It only blocks repeating that family; the LLM must either justify a different family or later satisfy the validation/convergence stop rule.
- A search-high saturation guard stops immediately when the incumbent's highest measured probe is feasible, has no SLO failures, and is within the configured binary-search resolution of `search.high`. In that case the current study cannot measure a better config without increasing the workload search range, so more config proposals only waste tuning iterations.
- A search-high saturation guard stops immediately when the incumbent's highest measured probe is feasible and is within the configured binary-search resolution of `search.high`. A feasible probe may still contain individual SLO failures as long as it meets the configured pass-rate target. In that case the current study cannot measure a better config without increasing the workload search range, so more config proposals only waste tuning iterations.
This is a generic harness rule, not a testcase-specific threshold. It does not depend on qwen27b, qwen235b, qwen30b, a fixed TP/DP value, or a hardcoded SLO number.
@@ -76,7 +76,7 @@ Local test command:
PYTHONPATH=src python3 -m unittest tests.test_core_flow -q
```
Result: passed, 77 tests.
Result at the time of this note: passed. The current repository test count may be higher; use the command above as the source of truth.
The added coverage checks:

View File

@@ -0,0 +1,59 @@
# Repo Audit Repair Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Repair the audit findings that affect measurement integrity, state correctness, documentation accuracy, and open-source readiness.
**Architecture:** Keep changes localized to the existing stdlib-only Python package. Measurement validation lives at the HTTP/worker boundary, study state fixes remain in `StudyStore`, compare reporting gains explicit failed/no-feasible accounting, and project metadata/docs are added at repo root.
**Tech Stack:** Python 3.11+ stdlib, `unittest`, setuptools `pyproject.toml`.
---
### Task 1: Measurement Integrity
**Files:**
- Modify: `src/aituner/http_client.py`
- Modify: `src/aituner/slo.py`
- Modify: `src/aituner/worker.py`
- Test: `tests/test_core_flow.py`
- [ ] Write failing tests for completion token source/mismatch failures and persisted per-request probe details.
- [ ] Run the targeted tests and confirm they fail for the expected reason.
- [ ] Add token source metadata to streamed metrics and request outcomes.
- [ ] Fail requests when configured completion length cannot be verified from usage or differs from expected.
- [ ] Persist probe outcome details under each trial artifact directory.
- [ ] Run targeted tests and the full unittest suite.
### Task 2: State, Spec, And Compare Guards
**Files:**
- Modify: `src/aituner/spec.py`
- Modify: `src/aituner/store.py`
- Modify: `src/aituner/compare.py`
- Modify: `scripts/run_multi_compare.py`
- Test: `tests/test_core_flow.py`
- [ ] Write failing tests for state list isolation, invalid trace numeric bounds, and compare aggregate failure accounting.
- [ ] Run targeted tests and confirm expected failures.
- [ ] Deep-copy/replace trial lists when materializing trials.
- [ ] Validate positive trace controls in `TraceSpec.from_dict`.
- [ ] Report failed/no-feasible counts in compare aggregates without changing existing winner semantics.
- [ ] Run targeted tests and the full unittest suite.
### Task 3: Docs And Open-Source Readiness
**Files:**
- Create: `README.md`
- Create: `LICENSE`
- Create: `CONTRIBUTING.md`
- Create: `SECURITY.md`
- Modify: `pyproject.toml`
- Modify: selected docs under `docs/`
- [ ] Add concise repo usage, verification, and experiment integrity guidance.
- [ ] Add MIT license and contribution/security notes.
- [ ] Add project metadata and optional test extra.
- [ ] Update stale docs about high-stop behavior and current test count.
- [ ] Run JSON validation and full unittest suite.
- [ ] Commit changes in logical groups.

View File

@@ -6,8 +6,23 @@ build-backend = "setuptools.build_meta"
name = "aituner"
version = "0.1.0"
description = "AITuner study orchestrator for OpenAI-compatible serving engines"
readme = "README.md"
requires-python = ">=3.11"
license = {text = "MIT"}
authors = [{name = "AITuner contributors"}]
dependencies = []
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
[project.optional-dependencies]
test = []
[project.scripts]
aituner = "aituner.cli:main"

View File

@@ -372,6 +372,7 @@ def _aggregate(rows: list[dict[str, Any]], candidates: list[MultiCompareCandidat
if rates_per_gpu
else None,
"mean_pass_rate": (sum(pass_rates) / len(pass_rates)) if pass_rates else None,
**_candidate_result_counts(rows, name),
}
for row in rows:
wins[row["winner"]] = wins.get(row["winner"], 0) + 1
@@ -382,6 +383,26 @@ def _aggregate(rows: list[dict[str, Any]], candidates: list[MultiCompareCandidat
}
def _candidate_result_counts(rows: list[dict[str, Any]], name: str) -> dict[str, int]:
counts = {
"completed_window_count": 0,
"failed_window_count": 0,
"no_feasible_window_count": 0,
}
for row in rows:
result = row.get("candidates", {}).get(name)
if not isinstance(result, dict):
continue
status = str(result.get("status") or "")
if status == "completed":
counts["completed_window_count"] += 1
elif status == "failed":
counts["failed_window_count"] += 1
if not isinstance(result.get("best_request_rate_per_gpu"), (int, float)):
counts["no_feasible_window_count"] += 1
return counts
def _render_report(summary: dict[str, Any], candidates: list[MultiCompareCandidate]) -> str:
candidate_names = [item.name for item in candidates]
lines = [
@@ -413,6 +434,9 @@ def _render_report(summary: dict[str, Any], candidates: list[MultiCompareCandida
lines.append(
f"- `{name}` mean req/s=`{aggregate['mean_request_rate']}`, mean req/s/gpu=`{aggregate['mean_request_rate_per_gpu']}`, mean pass_rate=`{aggregate['mean_pass_rate']}`"
)
lines.append(
f" completed/failed/no-feasible windows=`{aggregate['completed_window_count']}`/`{aggregate['failed_window_count']}`/`{aggregate['no_feasible_window_count']}`"
)
header = ["Window", "Date"]
for name in candidate_names:
header.extend([f"{name} req/s", f"{name} req/s/gpu"])

View File

@@ -19,6 +19,88 @@ from .trace import load_trace_requests, summarize_window
from .worker import run_trial
def _is_empty_config_patch(proposal: Proposal) -> bool:
return not proposal.config_patch.env_patch and not proposal.config_patch.flag_patch
def _latency_percentiles(summary: object, metric: str) -> dict[str, float]:
if not isinstance(summary, dict):
return {}
payload = summary.get(metric)
if not isinstance(payload, dict):
return {}
selected: dict[str, float] = {}
for key in ("mean", "p50", "p95", "p99"):
value = payload.get(key)
if isinstance(value, (int, float)):
selected[key] = float(value)
return selected
def _format_latency_percentiles(metric: str, values: dict[str, float]) -> str:
if not values:
return ""
ordered = ", ".join(
f"{key}={values[key]:.3f}"
for key in ("mean", "p50", "p95", "p99")
if key in values
)
return f"{metric}({ordered})"
def _baseline_all_infeasible_stop(result: dict[str, object]) -> tuple[str, dict[str, object]] | None:
if result.get("status") != "completed":
return None
if isinstance(result.get("best_request_rate"), (int, float)):
return None
probes = result.get("probes")
if not isinstance(probes, list) or not probes:
return None
if any(isinstance(probe, dict) and probe.get("feasible") for probe in probes):
return None
diagnostics = result.get("all_infeasible_diagnostics")
if not isinstance(diagnostics, dict):
diagnostics = {}
lowest_rate = diagnostics.get("request_rate")
lowest_threshold = diagnostics.get("threshold")
pass_rate = diagnostics.get("pass_rate")
early_stop_reason = str(diagnostics.get("early_stop_reason") or "").strip()
latency_summary = diagnostics.get("latency_summary")
ttft = _latency_percentiles(latency_summary, "ttft_ms")
tpot = _latency_percentiles(latency_summary, "tpot_ms")
details: dict[str, object] = {
"lowest_sampled_request_rate": lowest_rate,
"lowest_sampling_u": lowest_threshold,
"lowest_probe_pass_rate": pass_rate,
"early_stop_reason": early_stop_reason,
"lowest_probe_latency_ms": {
"ttft": ttft,
"tpot": tpot,
},
"lowest_probe_latency_summary": latency_summary if isinstance(latency_summary, dict) else {},
}
pieces = [
"Baseline configuration has no feasible probe under the current SLO.",
"Stopping tuning because even the lowest sampled request rate did not meet the target pass rate.",
]
if isinstance(lowest_rate, (int, float)):
pieces.append(f"lowest_sampled_request_rate={float(lowest_rate):.6g}")
if isinstance(lowest_threshold, (int, float)):
pieces.append(f"lowest_sampling_u={float(lowest_threshold):.6g}")
if isinstance(pass_rate, (int, float)):
pieces.append(f"lowest_probe_pass_rate={float(pass_rate):.6g}")
if early_stop_reason:
pieces.append(f"early_stop_reason={early_stop_reason}")
for item in (
_format_latency_percentiles("lowest_probe_ttft_ms", ttft),
_format_latency_percentiles("lowest_probe_tpot_ms", tpot),
):
if item:
pieces.append(item)
return " ".join(pieces), details
def _study_source_path(study_root: Path) -> Path:
return Path((study_root / "study_spec.source").read_text(encoding="utf-8").strip())
@@ -126,6 +208,21 @@ def cmd_study_tune(args: argparse.Namespace) -> int:
executed: list[dict[str, object]] = []
for idx in range(max_trials):
state = store.load_state(study.study_id)
if state.tuning_stop_reason:
executed.append(
{
"trial_id": None,
"stopped": True,
"reason": state.tuning_stop_reason,
"diagnosis": state.tuning_stop_diagnosis,
"details": state.tuning_stop_details,
"state_best_trial_id": state.best_trial_id,
"state_best_request_rate": state.best_request_rate,
}
)
break
if state.next_trial_index > max_trials:
break
window, requests = load_trace_requests(study, study_spec_path=spec_path)
window_summary = summarize_window(requests, window)
harness_context = (
@@ -169,7 +266,10 @@ def cmd_study_tune(args: argparse.Namespace) -> int:
ensure_ascii=False,
)
elif proposal_files:
proposal_source = proposal_files[idx]
proposal_index = state.next_trial_index - 1
if proposal_index >= len(proposal_files):
break
proposal_source = proposal_files[proposal_index]
proposal_text = proposal_source.read_text(encoding="utf-8")
proposal_name = proposal_source.stem
else:
@@ -223,6 +323,13 @@ def cmd_study_tune(args: argparse.Namespace) -> int:
}
)
break
is_auto_baseline = (
not proposal_files
and not args.skip_baseline
and state.next_trial_index == 1
and not state.trials
and _is_empty_config_patch(proposal)
)
trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)
trial_spec_path = Path(trial.artifact_dir) / "trial_spec.json"
result = run_trial(trial_spec_path)
@@ -243,6 +350,26 @@ def cmd_study_tune(args: argparse.Namespace) -> int:
"state_best_request_rate": state.best_request_rate,
}
)
if is_auto_baseline:
stop = _baseline_all_infeasible_stop(result)
if stop is not None:
diagnosis, details = stop
state.tuning_stop_reason = "baseline_all_infeasible"
state.tuning_stop_diagnosis = diagnosis
state.tuning_stop_details = details
store.save_state(state)
executed.append(
{
"trial_id": None,
"stopped": True,
"reason": state.tuning_stop_reason,
"diagnosis": diagnosis,
"details": details,
"state_best_trial_id": state.best_trial_id,
"state_best_request_rate": state.best_request_rate,
}
)
break
final_state = store.load_state(study.study_id)
print(
@@ -252,6 +379,9 @@ def cmd_study_tune(args: argparse.Namespace) -> int:
"executed_trials": executed,
"best_trial_id": final_state.best_trial_id,
"best_request_rate": final_state.best_request_rate,
"tuning_stop_reason": final_state.tuning_stop_reason,
"tuning_stop_diagnosis": final_state.tuning_stop_diagnosis,
"tuning_stop_details": final_state.tuning_stop_details,
},
ensure_ascii=False,
)

View File

@@ -382,6 +382,8 @@ def _aggregate_summary(rows: list[dict[str, Any]]) -> dict[str, Any]:
wins = {"baseline": 0, "tuned": 0, "tie": 0, "incomparable": 0}
for row in rows:
wins[row["delta"]["winner"]] += 1
baseline_counts = _candidate_result_counts(rows, "baseline")
tuned_counts = _candidate_result_counts(rows, "tuned")
return {
"window_count": len(rows),
"wins": wins,
@@ -389,9 +391,31 @@ def _aggregate_summary(rows: list[dict[str, Any]]) -> dict[str, Any]:
"tuned_mean_request_rate": _mean_or_none(tuned_rates),
"baseline_mean_request_rate_per_gpu": _mean_or_none(baseline_per_gpu),
"tuned_mean_request_rate_per_gpu": _mean_or_none(tuned_per_gpu),
"baseline_completed_window_count": baseline_counts["completed"],
"baseline_failed_window_count": baseline_counts["failed"],
"baseline_no_feasible_window_count": baseline_counts["no_feasible"],
"tuned_completed_window_count": tuned_counts["completed"],
"tuned_failed_window_count": tuned_counts["failed"],
"tuned_no_feasible_window_count": tuned_counts["no_feasible"],
}
def _candidate_result_counts(rows: list[dict[str, Any]], name: str) -> dict[str, int]:
counts = {"completed": 0, "failed": 0, "no_feasible": 0}
for row in rows:
result = row.get(name)
if not isinstance(result, dict):
continue
status = str(result.get("status") or "")
if status == "completed":
counts["completed"] += 1
elif status == "failed":
counts["failed"] += 1
if not isinstance(result.get("best_request_rate_per_gpu"), (int, float)):
counts["no_feasible"] += 1
return counts
def _mean_or_none(values: list[float]) -> float | None:
if not values:
return None
@@ -417,6 +441,8 @@ def _render_report(summary: dict[str, Any]) -> str:
f"- Tuned mean request rate: `{summary['aggregate']['tuned_mean_request_rate']}`",
f"- Baseline mean request rate per GPU: `{summary['aggregate']['baseline_mean_request_rate_per_gpu']}`",
f"- Tuned mean request rate per GPU: `{summary['aggregate']['tuned_mean_request_rate_per_gpu']}`",
f"- Baseline completed/failed/no-feasible windows: `{summary['aggregate']['baseline_completed_window_count']}`/`{summary['aggregate']['baseline_failed_window_count']}`/`{summary['aggregate']['baseline_no_feasible_window_count']}`",
f"- Tuned completed/failed/no-feasible windows: `{summary['aggregate']['tuned_completed_window_count']}`/`{summary['aggregate']['tuned_failed_window_count']}`/`{summary['aggregate']['tuned_no_feasible_window_count']}`",
"",
"## Per Window",
"",

View File

@@ -240,6 +240,8 @@ class StreamMetrics:
ttft_ms: float | None
tpot_ms: float | None
completion_tokens: int | None
completion_tokens_source: str = "usage"
streamed_chunk_count: int = 0
def stream_chat_completion(
@@ -260,6 +262,7 @@ def stream_chat_completion(
last_token_at: float | None = None
chunk_token_count = 0
completion_tokens: int | None = None
completion_tokens_source = "none"
try:
with _urlopen(request, timeout=timeout_s) as response:
for raw in _iter_sse_lines(response):
@@ -273,6 +276,7 @@ def stream_chat_completion(
comp = usage.get("completion_tokens")
if isinstance(comp, int) and comp >= 0:
completion_tokens = comp
completion_tokens_source = "usage"
choices = payload.get("choices")
if not isinstance(choices, list) or not choices:
continue
@@ -290,7 +294,10 @@ def stream_chat_completion(
detail = exc.read().decode("utf-8", errors="replace")
raise HttpClientError(f"stream_chat_completion failed: {exc.code} {detail}") from exc
ttft_ms = None if first_token_at is None else (first_token_at - start) * 1000.0
used_tokens = completion_tokens if completion_tokens is not None else chunk_token_count
if completion_tokens is None and chunk_token_count > 0:
completion_tokens = chunk_token_count
completion_tokens_source = "stream_chunks"
used_tokens = completion_tokens
if (
first_token_at is None
or last_token_at is None
@@ -304,6 +311,8 @@ def stream_chat_completion(
ttft_ms=ttft_ms,
tpot_ms=tpot_ms,
completion_tokens=used_tokens if used_tokens > 0 else None,
completion_tokens_source=completion_tokens_source,
streamed_chunk_count=chunk_token_count,
)

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Any
@@ -604,7 +605,8 @@ def call_llm_for_proposal(
if policy.endpoint is None:
raise RuntimeError("study.llm.endpoint is not configured")
last_error: Exception | None = None
for attempt in range(2):
max_attempts = 4
for attempt in range(max_attempts):
try:
if policy.endpoint.stream:
text = stream_text_completion(
@@ -636,6 +638,7 @@ def call_llm_for_proposal(
last_error = RuntimeError("LLM response content is empty")
except Exception as exc: # noqa: BLE001
last_error = exc
if attempt == 0:
if attempt < max_attempts - 1:
time.sleep(min(30.0, 2.0 * (2**attempt)))
continue
raise RuntimeError(f"LLM proposal failed after retry: {last_error}") from last_error

View File

@@ -15,6 +15,7 @@ class RequestOutcome:
prompt_tokens: int | None
completion_tokens: int | None
error: str = ""
completion_tokens_source: str = ""
@dataclass(frozen=True)

View File

@@ -354,6 +354,33 @@ class TraceSpec:
)
if completion_tokens_override < 0:
raise SpecError("trace.completion_tokens_override must be >= 0.")
max_requests_value = (
_require_int(max_requests, context="trace.max_requests_per_probe")
if max_requests is not None
else None
)
if max_requests_value is not None and max_requests_value <= 0:
raise SpecError("trace.max_requests_per_probe must be > 0.")
synthetic_prompt_cap_value = (
_require_int(
synthetic_prompt_cap,
context="trace.synthetic_prompt_cap_tokens",
)
if synthetic_prompt_cap is not None
else None
)
if synthetic_prompt_cap_value is not None and synthetic_prompt_cap_value < 0:
raise SpecError("trace.synthetic_prompt_cap_tokens must be >= 0.")
replay_time_scale = _require_float(
data.get("replay_time_scale", 1.0), context="trace.replay_time_scale"
)
if replay_time_scale <= 0:
raise SpecError("trace.replay_time_scale must be > 0.")
max_concurrency = _require_int(
data.get("max_concurrency", 64), context="trace.max_concurrency"
)
if max_concurrency <= 0:
raise SpecError("trace.max_concurrency must be > 0.")
return cls(
windows_path=_require_str(data.get("windows_path"), context="trace.windows_path"),
window_id=_require_str(data.get("window_id"), context="trace.window_id"),
@@ -364,9 +391,7 @@ class TraceSpec:
completion_tokens_override=completion_tokens_override,
u_field=str(data.get("u_field") or "sampling_u").strip(),
timestamp_field=str(data.get("timestamp_field") or "timestamp").strip(),
max_concurrency=_require_int(
data.get("max_concurrency", 64), context="trace.max_concurrency"
),
max_concurrency=max_concurrency,
input_length_filter=(
InputLengthFilterSpec.from_dict(
_require_mapping(
@@ -378,13 +403,9 @@ class TraceSpec:
if data.get("input_length_filter") is not None
else None
),
max_requests_per_probe=int(max_requests) if max_requests is not None else None,
synthetic_prompt_cap_tokens=(
int(synthetic_prompt_cap) if synthetic_prompt_cap is not None else None
),
replay_time_scale=_require_float(
data.get("replay_time_scale", 1.0), context="trace.replay_time_scale"
),
max_requests_per_probe=max_requests_value,
synthetic_prompt_cap_tokens=synthetic_prompt_cap_value,
replay_time_scale=replay_time_scale,
early_stop_max_lag_s=(
_require_float(
data.get("early_stop_max_lag_s"), context="trace.early_stop_max_lag_s"
@@ -743,6 +764,9 @@ class StudyState:
best_request_rate: float | None = None
best_request_rate_per_gpu: float | None = None
next_trial_index: int = 1
tuning_stop_reason: str = ""
tuning_stop_diagnosis: str = ""
tuning_stop_details: dict[str, Any] = field(default_factory=dict)
best_by_parallel_size: dict[str, dict[str, Any]] = field(default_factory=dict)
trials: list[TrialSummary] = field(default_factory=list)

View File

@@ -45,6 +45,9 @@ class StudyStore:
best_request_rate=payload.get("best_request_rate"),
best_request_rate_per_gpu=payload.get("best_request_rate_per_gpu"),
next_trial_index=int(payload.get("next_trial_index", 1)),
tuning_stop_reason=str(payload.get("tuning_stop_reason") or ""),
tuning_stop_diagnosis=str(payload.get("tuning_stop_diagnosis") or ""),
tuning_stop_details=dict(payload.get("tuning_stop_details") or {}),
best_by_parallel_size={
str(key): value
for key, value in (payload.get("best_by_parallel_size") or {}).items()
@@ -98,8 +101,7 @@ class StudyStore:
result_path=str(trial_root / "result.json"),
)
self.write_json(trial_root / "trial_spec.json", to_jsonable(spec))
next_state = replace(state, next_trial_index=state.next_trial_index + 1)
next_state.trials.append(
next_trial = (
TrialSummary(
trial_id=trial_id,
status="queued",
@@ -108,6 +110,11 @@ class StudyStore:
config_patch=to_jsonable(proposal.config_patch),
)
)
next_state = replace(
state,
next_trial_index=state.next_trial_index + 1,
trials=[*state.trials, next_trial],
)
self.save_state(next_state)
return spec, next_state

View File

@@ -105,13 +105,49 @@ def _run_one_request(
) -> RequestOutcome:
try:
metrics = stream_chat_completion(base_url=base_url, body=request.body, timeout_s=timeout_s)
expected_completion_tokens = request.completion_tokens_hint
actual_completion_tokens = metrics.completion_tokens
completion_tokens_source = getattr(metrics, "completion_tokens_source", "")
if expected_completion_tokens is not None:
if completion_tokens_source != "usage":
return RequestOutcome(
request_id=request.row_id,
success=False,
ttft_ms=metrics.ttft_ms,
tpot_ms=metrics.tpot_ms,
prompt_tokens=request.prompt_tokens_hint,
completion_tokens=actual_completion_tokens,
error=(
"completion_tokens_unverified "
f"source={completion_tokens_source or 'unknown'} "
f"expected={expected_completion_tokens} "
f"actual={actual_completion_tokens}"
),
completion_tokens_source=completion_tokens_source,
)
if actual_completion_tokens != expected_completion_tokens:
return RequestOutcome(
request_id=request.row_id,
success=False,
ttft_ms=metrics.ttft_ms,
tpot_ms=metrics.tpot_ms,
prompt_tokens=request.prompt_tokens_hint,
completion_tokens=actual_completion_tokens,
error=(
"completion_tokens_mismatch "
f"expected={expected_completion_tokens} "
f"actual={actual_completion_tokens}"
),
completion_tokens_source=completion_tokens_source,
)
return RequestOutcome(
request_id=request.row_id,
success=True,
ttft_ms=metrics.ttft_ms,
tpot_ms=metrics.tpot_ms,
prompt_tokens=request.prompt_tokens_hint,
completion_tokens=metrics.completion_tokens or request.completion_tokens_hint,
completion_tokens=actual_completion_tokens or request.completion_tokens_hint,
completion_tokens_source=completion_tokens_source,
)
except HttpClientError as exc:
return RequestOutcome(
@@ -125,6 +161,53 @@ def _run_one_request(
)
def _probe_outcome_details(
*,
threshold: float,
selected: list[TraceRequest],
outcomes: list[RequestOutcome],
evaluations: list[Any],
early_stopped: bool,
early_stop_reason: str,
) -> dict[str, Any]:
selected_by_id = {request.row_id: request for request in selected}
return {
"threshold": threshold,
"early_stopped": early_stopped,
"early_stop_reason": early_stop_reason,
"outcomes": [
{
"request_id": outcome.request_id,
"sampling_u": (
selected_by_id[outcome.request_id].sampling_u
if outcome.request_id in selected_by_id
else None
),
"arrival_s": (
selected_by_id[outcome.request_id].arrival_s
if outcome.request_id in selected_by_id
else None
),
"success": outcome.success,
"ttft_ms": outcome.ttft_ms,
"tpot_ms": outcome.tpot_ms,
"prompt_tokens": outcome.prompt_tokens,
"expected_completion_tokens": (
selected_by_id[outcome.request_id].completion_tokens_hint
if outcome.request_id in selected_by_id
else None
),
"completion_tokens": outcome.completion_tokens,
"completion_tokens_source": outcome.completion_tokens_source,
"error": outcome.error,
"evaluation": evaluation.passed,
"reasons": evaluation.reasons,
}
for outcome, evaluation in zip(outcomes, evaluations)
],
}
def _replay_requests(
requests: list[TraceRequest],
*,
@@ -340,6 +423,9 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
artifact_dir = Path(trial.artifact_dir)
artifact_dir.mkdir(parents=True, exist_ok=True)
engine_log_path = Path(trial.engine_log_path)
probe_details_path = artifact_dir / "probe_details.jsonl"
if probe_details_path.exists():
probe_details_path.unlink()
with engine_log_path.open("w", encoding="utf-8") as engine_log:
def launch_process() -> subprocess.Popen[str]:
return subprocess.Popen( # noqa: S603
@@ -380,6 +466,18 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
drain_inflight_on_early_stop=not restart_after_early_stop,
)
evaluations, summary = summarize_evaluations(outcomes, study.slo)
probe_details = _probe_outcome_details(
threshold=threshold,
selected=selected,
outcomes=outcomes,
evaluations=evaluations,
early_stopped=early_stopped,
early_stop_reason=early_stop_reason,
)
with probe_details_path.open("a", encoding="utf-8") as details_handle:
details_handle.write(
json.dumps(probe_details, ensure_ascii=False) + "\n"
)
request_rate = (
len(selected) / max(window.window_end - window.window_start, 1e-9)
if selected
@@ -406,6 +504,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
"tpot_ms": outcome.tpot_ms,
"prompt_tokens": outcome.prompt_tokens,
"completion_tokens": outcome.completion_tokens,
"completion_tokens_source": outcome.completion_tokens_source,
"evaluation": evaluation.passed,
"reasons": evaluation.reasons,
}

View File

@@ -9,9 +9,9 @@ from pathlib import Path
from unittest import mock
from aituner.cli import main as cli_main
from aituner.compare import load_compare_spec, run_compare
from aituner.compare import _aggregate_summary, load_compare_spec, run_compare
from aituner.engine import build_launch_recipe
from aituner.http_client import _auth_headers, _openai_url, _should_bypass_proxy
from aituner.http_client import StreamMetrics, _auth_headers, _openai_url, _should_bypass_proxy
from aituner.job import append_job, build_trial_job
from aituner.harness import (
build_harness_context,
@@ -34,9 +34,11 @@ from aituner.store import StudyStore
from aituner.trace import load_trace_requests, summarize_window
from aituner.worker import (
_latency_summary,
_run_one_request,
_replay_requests,
_terminate_process_tree,
_wait_for_server_or_exit,
run_trial,
)
from aituner.trace import TraceRequest
@@ -863,6 +865,24 @@ class CoreFlowTests(unittest.TestCase):
with self.assertRaisesRegex(SpecError, "min_input_tokens must be <="):
load_study_spec(study_path)
def test_trace_rejects_non_positive_max_requests_per_probe(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
study_path = _write_study_assets(
Path(tmp),
trace_overrides={"max_requests_per_probe": 0},
)
with self.assertRaisesRegex(SpecError, "max_requests_per_probe must be > 0"):
load_study_spec(study_path)
def test_trace_rejects_invalid_replay_time_scale(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
study_path = _write_study_assets(
Path(tmp),
trace_overrides={"replay_time_scale": 0.0},
)
with self.assertRaisesRegex(SpecError, "replay_time_scale must be > 0"):
load_study_spec(study_path)
def test_decode_only_mode_is_loaded_and_prompt_mentions_it(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
@@ -1456,6 +1476,34 @@ class CoreFlowTests(unittest.TestCase):
self.assertEqual(requests[2].body["min_tokens"], 1)
self.assertEqual(requests[2].body["max_tokens"], 1)
def test_run_one_request_fails_fixed_length_completion_mismatch(self) -> None:
request = TraceRequest(
row_id="r1",
arrival_s=0.0,
sampling_u=0.1,
body={"model": "m", "messages": [{"role": "user", "content": "x"}]},
prompt_tokens_hint=8,
completion_tokens_hint=2,
)
with mock.patch(
"aituner.worker.stream_chat_completion",
return_value=StreamMetrics(
ttft_ms=10.0,
tpot_ms=5.0,
completion_tokens=1,
),
):
outcome = _run_one_request(
request,
base_url="http://127.0.0.1:8000",
timeout_s=1.0,
)
self.assertFalse(outcome.success)
self.assertEqual(outcome.error, "completion_tokens_mismatch expected=2 actual=1")
self.assertEqual(outcome.completion_tokens, 1)
def test_build_prompt_mentions_completion_tokens_override(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
study_path = _write_study_assets(
@@ -1950,6 +1998,86 @@ class CoreFlowTests(unittest.TestCase):
3.125,
)
def test_run_trial_persists_probe_request_details(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(tmp_path)
payload = json.loads(study_path.read_text(encoding="utf-8"))
payload["search"]["max_probes"] = 1
study_path.write_text(json.dumps(payload), encoding="utf-8")
study = load_study_spec(study_path)
store = StudyStore(tmp_path / ".aituner" / "studies")
store.init_study(spec_path=study_path, study=study)
state = store.load_state(study.study_id)
proposal = Proposal.from_dict(
{
"observation": "baseline",
"diagnosis": "baseline",
"config_patch": {"env_patch": {}, "flag_patch": {}},
"expected_effects": ["measure"],
}
)
trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)
def fake_replay(requests, **kwargs):
return (
[
RequestOutcome(
request_id=request.row_id,
success=True,
ttft_ms=10.0,
tpot_ms=5.0,
prompt_tokens=request.prompt_tokens_hint,
completion_tokens=request.completion_tokens_hint,
)
for request in requests
],
False,
"",
)
process = mock.Mock()
process.poll.return_value = 0
with mock.patch("aituner.worker.subprocess.Popen", return_value=process):
with mock.patch("aituner.worker._wait_for_server_or_exit", return_value=None):
with mock.patch("aituner.worker._terminate_process_tree", return_value=None):
with mock.patch("aituner.worker._replay_requests", side_effect=fake_replay):
result = run_trial(Path(trial.artifact_dir) / "trial_spec.json")
self.assertEqual(result["status"], "completed")
details_path = Path(trial.artifact_dir) / "probe_details.jsonl"
self.assertTrue(details_path.exists())
rows = [
json.loads(line)
for line in details_path.read_text(encoding="utf-8").splitlines()
]
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0]["threshold"], 0.5)
self.assertEqual(rows[0]["outcomes"][0]["request_id"], "r1")
self.assertEqual(rows[0]["outcomes"][0]["sampling_u"], 0.1)
def test_materialize_trial_does_not_mutate_input_state_trials(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(tmp_path)
study = load_study_spec(study_path)
store = StudyStore(tmp_path / ".aituner" / "studies")
store.init_study(spec_path=study_path, study=study)
state = store.load_state(study.study_id)
proposal = Proposal.from_dict(
{
"observation": "baseline",
"diagnosis": "baseline",
"config_patch": {"env_patch": {}, "flag_patch": {}},
"expected_effects": ["measure"],
}
)
_, next_state = store.materialize_trial(study=study, state=state, proposal=proposal)
self.assertEqual(state.trials, [])
self.assertEqual(len(next_state.trials), 1)
def test_materialize_trial_uses_incumbent_sampling_u_as_search_floor(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
@@ -2791,7 +2919,7 @@ class CoreFlowTests(unittest.TestCase):
"--store-root",
str(store_root),
"--max-trials",
"1",
"5",
]
)
@@ -2869,6 +2997,169 @@ class CoreFlowTests(unittest.TestCase):
self.assertEqual(state.trials[0].config_patch, {"env_patch": {}, "flag_patch": {}})
self.assertEqual(state.trials[1].config_patch["flag_patch"], {"max-num-seqs": 64})
def test_cli_tune_stops_when_baseline_is_all_infeasible(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(tmp_path)
payload = json.loads(study_path.read_text(encoding="utf-8"))
payload["llm"]["endpoint"] = {
"provider": "custom",
"base_url": "http://llm.example/v1",
"wire_api": "chat.completions",
"model": "test-model",
"api_key_env": "OPENAI_API_KEY",
}
study_path.write_text(json.dumps(payload), encoding="utf-8")
store_root = tmp_path / "store"
def fake_run_trial(trial_spec_path: Path) -> dict[str, object]:
payload = json.loads(trial_spec_path.read_text(encoding="utf-8"))
trial_root = Path(payload["artifact_dir"])
result = {
"study_id": payload["study_id"],
"trial_id": payload["trial_id"],
"status": "completed",
"best_sampling_u": None,
"best_request_rate": None,
"best_pass_rate": None,
"best_request_count": None,
"probes": [
{
"threshold": 0.5,
"feasible": False,
"payload": {"pass_rate": 0.0, "request_rate": 2.0},
},
{
"threshold": 0.25,
"feasible": False,
"payload": {"pass_rate": 0.5, "request_rate": 1.0},
},
],
"all_infeasible_diagnostics": {
"threshold": 0.25,
"request_rate": 1.0,
"pass_rate": 0.5,
"early_stop_reason": "slo_pass_rate_unrecoverable",
"latency_summary": {
"ttft_ms": {
"count": 2,
"mean": 1200.0,
"p50": 1100.0,
"p95": 1900.0,
"p99": 1980.0,
},
"tpot_ms": {
"count": 2,
"mean": 35.0,
"p50": 32.0,
"p95": 48.0,
"p99": 49.0,
},
},
},
}
(trial_root / "result.json").write_text(json.dumps(result), encoding="utf-8")
return result
with mock.patch("aituner.cli.run_trial", side_effect=fake_run_trial):
with mock.patch("aituner.cli.call_llm_for_proposal") as llm_mock:
exit_code = cli_main(
[
"study",
"tune",
"--spec",
str(study_path),
"--store-root",
str(store_root),
"--max-trials",
"3",
]
)
self.assertEqual(exit_code, 0)
llm_mock.assert_not_called()
store = StudyStore(store_root)
state = store.load_state("study-1")
self.assertEqual(state.next_trial_index, 2)
self.assertEqual(len(state.trials), 1)
self.assertEqual(state.tuning_stop_reason, "baseline_all_infeasible")
self.assertIn("lowest_sampled_request_rate=1", state.tuning_stop_diagnosis)
self.assertIn("lowest_probe_ttft_ms", state.tuning_stop_diagnosis)
self.assertEqual(
state.tuning_stop_details["lowest_probe_latency_ms"]["ttft"]["p95"],
1900.0,
)
self.assertEqual(
state.tuning_stop_details["lowest_probe_latency_ms"]["tpot"]["p99"],
49.0,
)
with mock.patch("aituner.cli.run_trial") as run_trial_mock:
with mock.patch("aituner.cli.call_llm_for_proposal") as llm_mock:
exit_code = cli_main(
[
"study",
"tune",
"--spec",
str(study_path),
"--store-root",
str(store_root),
"--max-trials",
"3",
]
)
self.assertEqual(exit_code, 0)
run_trial_mock.assert_not_called()
llm_mock.assert_not_called()
def test_cli_tune_max_trials_is_total_budget_on_resume(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(tmp_path)
payload = json.loads(study_path.read_text(encoding="utf-8"))
payload["llm"]["endpoint"] = {
"provider": "custom",
"base_url": "http://llm.example/v1",
"wire_api": "chat.completions",
"model": "test-model",
"api_key_env": "OPENAI_API_KEY",
}
study_path.write_text(json.dumps(payload), encoding="utf-8")
store_root = tmp_path / "store"
study = load_study_spec(study_path)
store = StudyStore(store_root)
store.init_study(spec_path=study_path, study=study)
state = StudyState(
study_id=study.study_id,
next_trial_index=3,
trials=[
TrialSummary(trial_id="trial-0001", status="completed"),
TrialSummary(trial_id="trial-0002", status="completed"),
],
)
store.save_state(state)
with mock.patch("aituner.cli.call_llm_for_proposal") as llm_mock:
with mock.patch("aituner.cli.run_trial") as run_trial_mock:
exit_code = cli_main(
[
"study",
"tune",
"--spec",
str(study_path),
"--store-root",
str(store_root),
"--max-trials",
"2",
]
)
self.assertEqual(exit_code, 0)
llm_mock.assert_not_called()
run_trial_mock.assert_not_called()
self.assertEqual(store.load_state(study.study_id).next_trial_index, 3)
def test_load_compare_spec_requires_window_selection(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
@@ -2969,6 +3260,44 @@ class CoreFlowTests(unittest.TestCase):
self.assertTrue((tmp_path / ".compare" / "summary.json").exists())
self.assertTrue((tmp_path / ".compare" / "report.md").exists())
def test_compare_aggregate_counts_failed_and_no_feasible_windows(self) -> None:
summary = _aggregate_summary(
[
{
"baseline": {
"status": "completed",
"best_request_rate": 1.0,
"best_request_rate_per_gpu": 1.0,
},
"tuned": {
"status": "completed",
"best_request_rate": None,
"best_request_rate_per_gpu": None,
},
"delta": {"winner": "baseline"},
},
{
"baseline": {
"status": "failed",
"best_request_rate": None,
"best_request_rate_per_gpu": None,
},
"tuned": {
"status": "completed",
"best_request_rate": 2.0,
"best_request_rate_per_gpu": 2.0,
},
"delta": {"winner": "tuned"},
},
]
)
self.assertEqual(summary["baseline_completed_window_count"], 1)
self.assertEqual(summary["baseline_failed_window_count"], 1)
self.assertEqual(summary["baseline_no_feasible_window_count"], 1)
self.assertEqual(summary["tuned_completed_window_count"], 2)
self.assertEqual(summary["tuned_failed_window_count"], 0)
self.assertEqual(summary["tuned_no_feasible_window_count"], 1)
def test_run_compare_resolves_trial_ref_candidate(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)