Compare commits

...

2 Commits

- d7df1ebdac Add open source project metadata (2026-05-06 21:18:21 +08:00)
  Some checks failed: CI / test (3.11) (push) and CI / test (3.12) (push) have been cancelled.
- c1ff64381d Harden trial measurement accounting (2026-05-06 21:18:09 +08:00)
18 changed files with 604 additions and 20 deletions

.github/workflows/ci.yml (new file, +25)

@@ -0,0 +1,25 @@
name: CI
on:
  pull_request:
  push:
    branches:
      - main
jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version:
          - "3.11"
          - "3.12"
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install
        run: python -m pip install -e .
      - name: Test
        run: python -m unittest discover -s tests -v

CONTRIBUTING.md (new file, +23)

@@ -0,0 +1,23 @@
# Contributing
## Development Setup
```bash
python3 -m pip install -e .
PYTHONPATH=src python3 -m unittest discover -s tests -v
```
## Change Requirements
- Add or update tests for behavior changes.
- Keep experiment claims tied to reproducible artifacts: study spec, trial spec,
result JSON, probe history, and per-request probe details.
- Do not publish benchmark conclusions from bounded or time-compressed replays
without clearly labeling the replay controls.
- Keep example configs free of private credentials and prefer explicit,
reproducible endpoint settings.
## Commit Hygiene
Use small commits grouped by behavior: measurement integrity, orchestration
logic, documentation, or infrastructure.

LICENSE (new file, +21)

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 AITuner contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

README.md (new file, +72)

@@ -0,0 +1,72 @@
# AITuner
AITuner is a small study orchestrator for OpenAI-compatible serving engines. It
replays trace windows, searches for the highest feasible offered load under
configured SLOs, and records enough trial context for LLM- or harness-guided
configuration proposals.
## Status
This repository is research tooling. Treat reported experiment numbers as valid
only when the matching study spec, trial artifacts, probe history, and
`probe_details.jsonl` files are available for audit.
## Install
```bash
python3 -m pip install -e .
```
## Test
The test suite uses the Python standard library `unittest` runner:
```bash
PYTHONPATH=src python3 -m unittest discover -s tests -v
```
If the package is installed in editable mode, `PYTHONPATH=src` is optional.
## Basic Workflow
Initialize a study:
```bash
aituner study init --spec configs/examples/study.example.json
```
Run a local tuning loop:
```bash
aituner study tune --spec configs/examples/study.example.json --max-trials 2
```
Run a compare:
```bash
aituner compare run --spec configs/examples/compare.example.json
```
Remote experiment notes for this checkout live in `AGENTS.md`. The default
remote host is `dash0`, and code should be synchronized through Git before
remote runs.
## Experiment Integrity
- Fixed-length replay requests are scored only when completion token usage is
verifiable and matches the trace expectation.
- Each trial writes aggregate probe history and per-request probe details.
- `request_rate_per_gpu` is the primary cross-topology metric:
`best_feasible_request_rate / (tensor_parallel_size * data_parallel_size)`.
- Compare reports include failed and no-feasible window counts; do not interpret
mean request rates without those counts.
- Bounded replays using `max_requests_per_probe`, `completion_tokens_override`,
or `replay_time_scale` are convergence tests for that bounded workload, not
production benchmarks.
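The per-GPU normalization in the list above is simple enough to state as code. A minimal sketch with a hypothetical helper name (the real implementation lives inside the aituner package):

```python
def request_rate_per_gpu(
    best_feasible_request_rate: float,
    tensor_parallel_size: int,
    data_parallel_size: int,
) -> float:
    """Normalize the best feasible offered load by the total GPU count.

    Illustrative only: mirrors the README formula
    best_feasible_request_rate / (tensor_parallel_size * data_parallel_size).
    """
    gpu_count = tensor_parallel_size * data_parallel_size
    if gpu_count <= 0:
        raise ValueError("parallel sizes must be positive")
    return best_feasible_request_rate / gpu_count


# Example: 12 req/s sustained on a TP=4, DP=2 deployment -> 1.5 req/s/GPU.
print(request_rate_per_gpu(12.0, 4, 2))
```

Because the metric divides by the GPU product, it lets a TP=8 and a TP=4/DP=2 topology be compared on equal hardware footing.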
## Configuration Notes
Example specs that use `llm.endpoint.provider=codex` resolve the endpoint from
the local Codex configuration unless `llm.endpoint.base_url` or
`AITUNER_CODEX_BASE_URL` is set. Public, reproducible examples should prefer an
explicit endpoint or omit the LLM endpoint and use proposal files.

SECURITY.md (new file, +19)

@@ -0,0 +1,19 @@
# Security
AITuner launches local or remote serving engines and may replay trace payloads.
Do not commit secrets, API keys, private trace content, or private model access
tokens.
## Reporting
Report security issues privately to the project maintainers. If this repository
is mirrored to a public forge, use that forge's private vulnerability reporting
flow when available.
## Operational Guidance
- Keep `.env` files local; `.env.example` documents expected variable names.
- Review generated trial artifacts before publishing them, because request
payloads may contain trace text.
- Treat remote execution configs as sensitive when they include internal host
names, paths, or scheduler details.


@@ -60,7 +60,7 @@ The speedup comes from reducing wasted proposal families, not from changing the
- Engine relaunch after early stop is available as opt-in for faster smoke studies, but it is not the default because it can change warm-state comparability.
5. Search-high saturation stop
-- If the incumbent's highest measured probe is feasible, has no SLO failures, and is within the configured binary-search resolution of `search.high`, the harness stops before asking the LLM for another proposal.
+- If the incumbent's highest measured probe is feasible and is within the configured binary-search resolution of `search.high`, the harness stops before asking the LLM for another proposal. Individual request failures can be present when the aggregate probe still meets the configured pass-rate SLO.
- This is not a model-specific threshold. It means the workload search range, not the engine config, is currently the limiting measurement bound.
6. Deterministic first probes


@@ -118,7 +118,7 @@ A second generic diagnosis bug was fixed: non-SLO bookkeeping counts such as `pr
The base-relative patch issue is now guarded in code, not only in the LLM prompt. When `StudyStore.materialize_trial` sees a runtime/env-only proposal after a non-base incumbent has been found, it inherits the incumbent topology patch into the trial spec unless the proposal explicitly provides a topology. This keeps same-topology runtime validation on the actual incumbent while preserving the ability to test the base topology by stating it explicitly.
-Local verification: `PYTHONPATH=src python3 -m unittest discover -s tests` passed 68 tests.
+Local verification at that commit: `PYTHONPATH=src python3 -m unittest discover -s tests` passed. The current repository suite has since grown; rerun the command rather than relying on this historical test count.
## Current Harness Judgment


@@ -64,7 +64,7 @@ This run tests a stricter early-stop harness:
- the validation covered topology and runtime families, or accumulated at least three post-incumbent validation attempts.
- If the stop guard fires, `study tune` writes `harness-stop-XXXX` and exits without spending another GPU trial or asking the LLM for another proposal.
- A single-family all-infeasible plateau is not enough to stop deterministically. It only blocks repeating that family; the LLM must either justify a different family or later satisfy the validation/convergence stop rule.
-- A search-high saturation guard stops immediately when the incumbent's highest measured probe is feasible, has no SLO failures, and is within the configured binary-search resolution of `search.high`. In that case the current study cannot measure a better config without increasing the workload search range, so more config proposals only waste tuning iterations.
+- A search-high saturation guard stops immediately when the incumbent's highest measured probe is feasible and is within the configured binary-search resolution of `search.high`. A feasible probe may still contain individual SLO failures as long as it meets the configured pass-rate target. In that case the current study cannot measure a better config without increasing the workload search range, so more config proposals only waste tuning iterations.
This is a generic harness rule, not a testcase-specific threshold. It does not depend on qwen27b, qwen235b, qwen30b, a fixed TP/DP value, or a hardcoded SLO number.
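As a rough illustration of the saturation guard described above, the stop predicate might look like the sketch below. The names are hypothetical, not the actual harness API; pass-rate SLO evaluation is assumed to happen upstream, so only a feasible probe's rate would reach this check:

```python
def hits_search_high_saturation(
    highest_feasible_rate: float,
    search_high: float,
    resolution: float,
) -> bool:
    """Stop proposing configs when the best feasible probe is within one
    binary-search resolution step of the top of the search range.

    Hypothetical sketch of the generic rule: once this returns True, a
    better config cannot be distinguished without raising search.high.
    """
    return (search_high - highest_feasible_rate) <= resolution
```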
@@ -76,7 +76,7 @@ Local test command:
PYTHONPATH=src python3 -m unittest tests.test_core_flow -q
```
-Result: passed, 77 tests.
+Result at the time of this note: passed. The current repository test count may be higher; use the command above as the source of truth.
The added coverage checks:


@@ -0,0 +1,59 @@
# Repo Audit Repair Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Repair the audit findings that affect measurement integrity, state correctness, documentation accuracy, and open-source readiness.
**Architecture:** Keep changes localized to the existing stdlib-only Python package. Measurement validation lives at the HTTP/worker boundary, study state fixes remain in `StudyStore`, compare reporting gains explicit failed/no-feasible accounting, and project metadata/docs are added at repo root.
**Tech Stack:** Python 3.11+ stdlib, `unittest`, setuptools `pyproject.toml`.
---
### Task 1: Measurement Integrity
**Files:**
- Modify: `src/aituner/http_client.py`
- Modify: `src/aituner/slo.py`
- Modify: `src/aituner/worker.py`
- Test: `tests/test_core_flow.py`
- [ ] Write failing tests for completion token source/mismatch failures and persisted per-request probe details.
- [ ] Run the targeted tests and confirm they fail for the expected reason.
- [ ] Add token source metadata to streamed metrics and request outcomes.
- [ ] Fail requests when configured completion length cannot be verified from usage or differs from expected.
- [ ] Persist probe outcome details under each trial artifact directory.
- [ ] Run targeted tests and the full unittest suite.
### Task 2: State, Spec, And Compare Guards
**Files:**
- Modify: `src/aituner/spec.py`
- Modify: `src/aituner/store.py`
- Modify: `src/aituner/compare.py`
- Modify: `scripts/run_multi_compare.py`
- Test: `tests/test_core_flow.py`
- [ ] Write failing tests for state list isolation, invalid trace numeric bounds, and compare aggregate failure accounting.
- [ ] Run targeted tests and confirm expected failures.
- [ ] Deep-copy/replace trial lists when materializing trials.
- [ ] Validate positive trace controls in `TraceSpec.from_dict`.
- [ ] Report failed/no-feasible counts in compare aggregates without changing existing winner semantics.
- [ ] Run targeted tests and the full unittest suite.
### Task 3: Docs And Open-Source Readiness
**Files:**
- Create: `README.md`
- Create: `LICENSE`
- Create: `CONTRIBUTING.md`
- Create: `SECURITY.md`
- Modify: `pyproject.toml`
- Modify: selected docs under `docs/`
- [ ] Add concise repo usage, verification, and experiment integrity guidance.
- [ ] Add MIT license and contribution/security notes.
- [ ] Add project metadata and optional test extra.
- [ ] Update stale docs about high-stop behavior and current test count.
- [ ] Run JSON validation and full unittest suite.
- [ ] Commit changes in logical groups.


@@ -6,8 +6,23 @@ build-backend = "setuptools.build_meta"
name = "aituner"
version = "0.1.0"
description = "AITuner study orchestrator for OpenAI-compatible serving engines"
readme = "README.md"
requires-python = ">=3.11"
license = {text = "MIT"}
authors = [{name = "AITuner contributors"}]
dependencies = []
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
[project.optional-dependencies]
test = []
[project.scripts]
aituner = "aituner.cli:main"


@@ -372,6 +372,7 @@ def _aggregate(rows: list[dict[str, Any]], candidates: list[MultiCompareCandidat
if rates_per_gpu
else None,
"mean_pass_rate": (sum(pass_rates) / len(pass_rates)) if pass_rates else None,
**_candidate_result_counts(rows, name),
}
for row in rows:
wins[row["winner"]] = wins.get(row["winner"], 0) + 1
@@ -382,6 +383,26 @@
}
def _candidate_result_counts(rows: list[dict[str, Any]], name: str) -> dict[str, int]:
counts = {
"completed_window_count": 0,
"failed_window_count": 0,
"no_feasible_window_count": 0,
}
for row in rows:
result = row.get("candidates", {}).get(name)
if not isinstance(result, dict):
continue
status = str(result.get("status") or "")
if status == "completed":
counts["completed_window_count"] += 1
elif status == "failed":
counts["failed_window_count"] += 1
if not isinstance(result.get("best_request_rate_per_gpu"), (int, float)):
counts["no_feasible_window_count"] += 1
return counts
def _render_report(summary: dict[str, Any], candidates: list[MultiCompareCandidate]) -> str:
candidate_names = [item.name for item in candidates]
lines = [
@@ -413,6 +434,9 @@ def _render_report(summary: dict[str, Any], candidates: list[MultiCompareCandida
lines.append(
f"- `{name}` mean req/s=`{aggregate['mean_request_rate']}`, mean req/s/gpu=`{aggregate['mean_request_rate_per_gpu']}`, mean pass_rate=`{aggregate['mean_pass_rate']}`"
)
lines.append(
f" completed/failed/no-feasible windows=`{aggregate['completed_window_count']}`/`{aggregate['failed_window_count']}`/`{aggregate['no_feasible_window_count']}`"
)
header = ["Window", "Date"]
for name in candidate_names:
header.extend([f"{name} req/s", f"{name} req/s/gpu"])


@@ -382,6 +382,8 @@ def _aggregate_summary(rows: list[dict[str, Any]]) -> dict[str, Any]:
wins = {"baseline": 0, "tuned": 0, "tie": 0, "incomparable": 0}
for row in rows:
wins[row["delta"]["winner"]] += 1
baseline_counts = _candidate_result_counts(rows, "baseline")
tuned_counts = _candidate_result_counts(rows, "tuned")
return {
"window_count": len(rows),
"wins": wins,
@@ -389,9 +391,31 @@ def _aggregate_summary(rows: list[dict[str, Any]]) -> dict[str, Any]:
"tuned_mean_request_rate": _mean_or_none(tuned_rates),
"baseline_mean_request_rate_per_gpu": _mean_or_none(baseline_per_gpu),
"tuned_mean_request_rate_per_gpu": _mean_or_none(tuned_per_gpu),
"baseline_completed_window_count": baseline_counts["completed"],
"baseline_failed_window_count": baseline_counts["failed"],
"baseline_no_feasible_window_count": baseline_counts["no_feasible"],
"tuned_completed_window_count": tuned_counts["completed"],
"tuned_failed_window_count": tuned_counts["failed"],
"tuned_no_feasible_window_count": tuned_counts["no_feasible"],
}
def _candidate_result_counts(rows: list[dict[str, Any]], name: str) -> dict[str, int]:
counts = {"completed": 0, "failed": 0, "no_feasible": 0}
for row in rows:
result = row.get(name)
if not isinstance(result, dict):
continue
status = str(result.get("status") or "")
if status == "completed":
counts["completed"] += 1
elif status == "failed":
counts["failed"] += 1
if not isinstance(result.get("best_request_rate_per_gpu"), (int, float)):
counts["no_feasible"] += 1
return counts
def _mean_or_none(values: list[float]) -> float | None:
if not values:
return None
@@ -417,6 +441,8 @@ def _render_report(summary: dict[str, Any]) -> str:
f"- Tuned mean request rate: `{summary['aggregate']['tuned_mean_request_rate']}`",
f"- Baseline mean request rate per GPU: `{summary['aggregate']['baseline_mean_request_rate_per_gpu']}`",
f"- Tuned mean request rate per GPU: `{summary['aggregate']['tuned_mean_request_rate_per_gpu']}`",
f"- Baseline completed/failed/no-feasible windows: `{summary['aggregate']['baseline_completed_window_count']}`/`{summary['aggregate']['baseline_failed_window_count']}`/`{summary['aggregate']['baseline_no_feasible_window_count']}`",
f"- Tuned completed/failed/no-feasible windows: `{summary['aggregate']['tuned_completed_window_count']}`/`{summary['aggregate']['tuned_failed_window_count']}`/`{summary['aggregate']['tuned_no_feasible_window_count']}`",
"",
"## Per Window",
"",


@@ -240,6 +240,8 @@ class StreamMetrics:
ttft_ms: float | None
tpot_ms: float | None
completion_tokens: int | None
completion_tokens_source: str = "usage"
streamed_chunk_count: int = 0
def stream_chat_completion(
@@ -260,6 +262,7 @@ def stream_chat_completion(
last_token_at: float | None = None
chunk_token_count = 0
completion_tokens: int | None = None
completion_tokens_source = "none"
try:
with _urlopen(request, timeout=timeout_s) as response:
for raw in _iter_sse_lines(response):
@@ -273,6 +276,7 @@ def stream_chat_completion(
comp = usage.get("completion_tokens")
if isinstance(comp, int) and comp >= 0:
completion_tokens = comp
completion_tokens_source = "usage"
choices = payload.get("choices")
if not isinstance(choices, list) or not choices:
continue
@@ -290,7 +294,10 @@ def stream_chat_completion(
detail = exc.read().decode("utf-8", errors="replace")
raise HttpClientError(f"stream_chat_completion failed: {exc.code} {detail}") from exc
ttft_ms = None if first_token_at is None else (first_token_at - start) * 1000.0
-used_tokens = completion_tokens if completion_tokens is not None else chunk_token_count
+if completion_tokens is None and chunk_token_count > 0:
completion_tokens = chunk_token_count
completion_tokens_source = "stream_chunks"
used_tokens = completion_tokens
if (
first_token_at is None
or last_token_at is None
@@ -304,6 +311,8 @@ def stream_chat_completion(
ttft_ms=ttft_ms,
tpot_ms=tpot_ms,
completion_tokens=used_tokens if used_tokens > 0 else None,
completion_tokens_source=completion_tokens_source,
streamed_chunk_count=chunk_token_count,
)


@@ -15,6 +15,7 @@ class RequestOutcome:
prompt_tokens: int | None
completion_tokens: int | None
error: str = ""
completion_tokens_source: str = ""
@dataclass(frozen=True)


@@ -354,6 +354,33 @@ class TraceSpec:
)
if completion_tokens_override < 0:
raise SpecError("trace.completion_tokens_override must be >= 0.")
max_requests_value = (
_require_int(max_requests, context="trace.max_requests_per_probe")
if max_requests is not None
else None
)
if max_requests_value is not None and max_requests_value <= 0:
raise SpecError("trace.max_requests_per_probe must be > 0.")
synthetic_prompt_cap_value = (
_require_int(
synthetic_prompt_cap,
context="trace.synthetic_prompt_cap_tokens",
)
if synthetic_prompt_cap is not None
else None
)
if synthetic_prompt_cap_value is not None and synthetic_prompt_cap_value < 0:
raise SpecError("trace.synthetic_prompt_cap_tokens must be >= 0.")
replay_time_scale = _require_float(
data.get("replay_time_scale", 1.0), context="trace.replay_time_scale"
)
if replay_time_scale <= 0:
raise SpecError("trace.replay_time_scale must be > 0.")
max_concurrency = _require_int(
data.get("max_concurrency", 64), context="trace.max_concurrency"
)
if max_concurrency <= 0:
raise SpecError("trace.max_concurrency must be > 0.")
return cls(
windows_path=_require_str(data.get("windows_path"), context="trace.windows_path"),
window_id=_require_str(data.get("window_id"), context="trace.window_id"),
@@ -364,9 +391,7 @@ class TraceSpec:
completion_tokens_override=completion_tokens_override,
u_field=str(data.get("u_field") or "sampling_u").strip(),
timestamp_field=str(data.get("timestamp_field") or "timestamp").strip(),
-max_concurrency=_require_int(
-data.get("max_concurrency", 64), context="trace.max_concurrency"
-),
+max_concurrency=max_concurrency,
input_length_filter=(
InputLengthFilterSpec.from_dict(
_require_mapping(
@@ -378,13 +403,9 @@ class TraceSpec:
if data.get("input_length_filter") is not None
else None
),
-max_requests_per_probe=int(max_requests) if max_requests is not None else None,
-synthetic_prompt_cap_tokens=(
-int(synthetic_prompt_cap) if synthetic_prompt_cap is not None else None
-),
-replay_time_scale=_require_float(
-data.get("replay_time_scale", 1.0), context="trace.replay_time_scale"
-),
+max_requests_per_probe=max_requests_value,
+synthetic_prompt_cap_tokens=synthetic_prompt_cap_value,
+replay_time_scale=replay_time_scale,
early_stop_max_lag_s=(
_require_float(
data.get("early_stop_max_lag_s"), context="trace.early_stop_max_lag_s"


@@ -98,8 +98,7 @@ class StudyStore:
result_path=str(trial_root / "result.json"),
)
self.write_json(trial_root / "trial_spec.json", to_jsonable(spec))
-next_state = replace(state, next_trial_index=state.next_trial_index + 1)
-next_state.trials.append(
+next_trial = (
TrialSummary(
trial_id=trial_id,
status="queued",
@@ -108,6 +107,11 @@ class StudyStore:
config_patch=to_jsonable(proposal.config_patch),
)
)
next_state = replace(
state,
next_trial_index=state.next_trial_index + 1,
trials=[*state.trials, next_trial],
)
self.save_state(next_state)
return spec, next_state


@@ -105,13 +105,49 @@ def _run_one_request(
) -> RequestOutcome:
try:
metrics = stream_chat_completion(base_url=base_url, body=request.body, timeout_s=timeout_s)
expected_completion_tokens = request.completion_tokens_hint
actual_completion_tokens = metrics.completion_tokens
completion_tokens_source = getattr(metrics, "completion_tokens_source", "")
if expected_completion_tokens is not None:
if completion_tokens_source != "usage":
return RequestOutcome(
request_id=request.row_id,
success=False,
ttft_ms=metrics.ttft_ms,
tpot_ms=metrics.tpot_ms,
prompt_tokens=request.prompt_tokens_hint,
completion_tokens=actual_completion_tokens,
error=(
"completion_tokens_unverified "
f"source={completion_tokens_source or 'unknown'} "
f"expected={expected_completion_tokens} "
f"actual={actual_completion_tokens}"
),
completion_tokens_source=completion_tokens_source,
)
if actual_completion_tokens != expected_completion_tokens:
return RequestOutcome(
request_id=request.row_id,
success=False,
ttft_ms=metrics.ttft_ms,
tpot_ms=metrics.tpot_ms,
prompt_tokens=request.prompt_tokens_hint,
completion_tokens=actual_completion_tokens,
error=(
"completion_tokens_mismatch "
f"expected={expected_completion_tokens} "
f"actual={actual_completion_tokens}"
),
completion_tokens_source=completion_tokens_source,
)
return RequestOutcome(
request_id=request.row_id,
success=True,
ttft_ms=metrics.ttft_ms,
tpot_ms=metrics.tpot_ms,
prompt_tokens=request.prompt_tokens_hint,
-completion_tokens=metrics.completion_tokens or request.completion_tokens_hint,
+completion_tokens=actual_completion_tokens or request.completion_tokens_hint,
completion_tokens_source=completion_tokens_source,
)
except HttpClientError as exc:
return RequestOutcome(
@@ -125,6 +161,53 @@ def _run_one_request(
)
def _probe_outcome_details(
*,
threshold: float,
selected: list[TraceRequest],
outcomes: list[RequestOutcome],
evaluations: list[Any],
early_stopped: bool,
early_stop_reason: str,
) -> dict[str, Any]:
selected_by_id = {request.row_id: request for request in selected}
return {
"threshold": threshold,
"early_stopped": early_stopped,
"early_stop_reason": early_stop_reason,
"outcomes": [
{
"request_id": outcome.request_id,
"sampling_u": (
selected_by_id[outcome.request_id].sampling_u
if outcome.request_id in selected_by_id
else None
),
"arrival_s": (
selected_by_id[outcome.request_id].arrival_s
if outcome.request_id in selected_by_id
else None
),
"success": outcome.success,
"ttft_ms": outcome.ttft_ms,
"tpot_ms": outcome.tpot_ms,
"prompt_tokens": outcome.prompt_tokens,
"expected_completion_tokens": (
selected_by_id[outcome.request_id].completion_tokens_hint
if outcome.request_id in selected_by_id
else None
),
"completion_tokens": outcome.completion_tokens,
"completion_tokens_source": outcome.completion_tokens_source,
"error": outcome.error,
"evaluation": evaluation.passed,
"reasons": evaluation.reasons,
}
for outcome, evaluation in zip(outcomes, evaluations)
],
}
def _replay_requests(
requests: list[TraceRequest],
*,
@@ -340,6 +423,9 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
artifact_dir = Path(trial.artifact_dir)
artifact_dir.mkdir(parents=True, exist_ok=True)
engine_log_path = Path(trial.engine_log_path)
probe_details_path = artifact_dir / "probe_details.jsonl"
if probe_details_path.exists():
probe_details_path.unlink()
with engine_log_path.open("w", encoding="utf-8") as engine_log:
def launch_process() -> subprocess.Popen[str]:
return subprocess.Popen( # noqa: S603
@@ -380,6 +466,18 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
drain_inflight_on_early_stop=not restart_after_early_stop,
)
evaluations, summary = summarize_evaluations(outcomes, study.slo)
probe_details = _probe_outcome_details(
threshold=threshold,
selected=selected,
outcomes=outcomes,
evaluations=evaluations,
early_stopped=early_stopped,
early_stop_reason=early_stop_reason,
)
with probe_details_path.open("a", encoding="utf-8") as details_handle:
details_handle.write(
json.dumps(probe_details, ensure_ascii=False) + "\n"
)
request_rate = (
len(selected) / max(window.window_end - window.window_start, 1e-9)
if selected
@@ -406,6 +504,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
"tpot_ms": outcome.tpot_ms,
"prompt_tokens": outcome.prompt_tokens,
"completion_tokens": outcome.completion_tokens,
"completion_tokens_source": outcome.completion_tokens_source,
"evaluation": evaluation.passed,
"reasons": evaluation.reasons,
}


@@ -9,9 +9,9 @@ from pathlib import Path
from unittest import mock
from aituner.cli import main as cli_main
from aituner.compare import _aggregate_summary, load_compare_spec, run_compare
from aituner.engine import build_launch_recipe
from aituner.http_client import StreamMetrics, _auth_headers, _openai_url, _should_bypass_proxy
from aituner.job import append_job, build_trial_job
from aituner.harness import (
build_harness_context,
@@ -34,9 +34,11 @@ from aituner.store import StudyStore
from aituner.trace import load_trace_requests, summarize_window
from aituner.worker import (
_latency_summary,
_run_one_request,
_replay_requests,
_terminate_process_tree,
_wait_for_server_or_exit,
run_trial,
)
from aituner.trace import TraceRequest
@@ -863,6 +865,24 @@ class CoreFlowTests(unittest.TestCase):
with self.assertRaisesRegex(SpecError, "min_input_tokens must be <="):
load_study_spec(study_path)
def test_trace_rejects_non_positive_max_requests_per_probe(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
study_path = _write_study_assets(
Path(tmp),
trace_overrides={"max_requests_per_probe": 0},
)
with self.assertRaisesRegex(SpecError, "max_requests_per_probe must be > 0"):
load_study_spec(study_path)
def test_trace_rejects_invalid_replay_time_scale(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
study_path = _write_study_assets(
Path(tmp),
trace_overrides={"replay_time_scale": 0.0},
)
with self.assertRaisesRegex(SpecError, "replay_time_scale must be > 0"):
load_study_spec(study_path)
def test_decode_only_mode_is_loaded_and_prompt_mentions_it(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
@@ -1456,6 +1476,34 @@ class CoreFlowTests(unittest.TestCase):
self.assertEqual(requests[2].body["min_tokens"], 1) self.assertEqual(requests[2].body["min_tokens"], 1)
self.assertEqual(requests[2].body["max_tokens"], 1) self.assertEqual(requests[2].body["max_tokens"], 1)
def test_run_one_request_fails_fixed_length_completion_mismatch(self) -> None:
request = TraceRequest(
row_id="r1",
arrival_s=0.0,
sampling_u=0.1,
body={"model": "m", "messages": [{"role": "user", "content": "x"}]},
prompt_tokens_hint=8,
completion_tokens_hint=2,
)
with mock.patch(
"aituner.worker.stream_chat_completion",
return_value=StreamMetrics(
ttft_ms=10.0,
tpot_ms=5.0,
completion_tokens=1,
),
):
outcome = _run_one_request(
request,
base_url="http://127.0.0.1:8000",
timeout_s=1.0,
)
self.assertFalse(outcome.success)
self.assertEqual(outcome.error, "completion_tokens_mismatch expected=2 actual=1")
self.assertEqual(outcome.completion_tokens, 1)
def test_build_prompt_mentions_completion_tokens_override(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
study_path = _write_study_assets(
@@ -1950,6 +1998,86 @@ class CoreFlowTests(unittest.TestCase):
3.125,
)
def test_run_trial_persists_probe_request_details(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(tmp_path)
payload = json.loads(study_path.read_text(encoding="utf-8"))
payload["search"]["max_probes"] = 1
study_path.write_text(json.dumps(payload), encoding="utf-8")
study = load_study_spec(study_path)
store = StudyStore(tmp_path / ".aituner" / "studies")
store.init_study(spec_path=study_path, study=study)
state = store.load_state(study.study_id)
proposal = Proposal.from_dict(
{
"observation": "baseline",
"diagnosis": "baseline",
"config_patch": {"env_patch": {}, "flag_patch": {}},
"expected_effects": ["measure"],
}
)
trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal)
def fake_replay(requests, **kwargs):
return (
[
RequestOutcome(
request_id=request.row_id,
success=True,
ttft_ms=10.0,
tpot_ms=5.0,
prompt_tokens=request.prompt_tokens_hint,
completion_tokens=request.completion_tokens_hint,
)
for request in requests
],
False,
"",
)
process = mock.Mock()
process.poll.return_value = 0
with mock.patch("aituner.worker.subprocess.Popen", return_value=process):
with mock.patch("aituner.worker._wait_for_server_or_exit", return_value=None):
with mock.patch("aituner.worker._terminate_process_tree", return_value=None):
with mock.patch("aituner.worker._replay_requests", side_effect=fake_replay):
result = run_trial(Path(trial.artifact_dir) / "trial_spec.json")
self.assertEqual(result["status"], "completed")
details_path = Path(trial.artifact_dir) / "probe_details.jsonl"
self.assertTrue(details_path.exists())
rows = [
json.loads(line)
for line in details_path.read_text(encoding="utf-8").splitlines()
]
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0]["threshold"], 0.5)
self.assertEqual(rows[0]["outcomes"][0]["request_id"], "r1")
self.assertEqual(rows[0]["outcomes"][0]["sampling_u"], 0.1)
def test_materialize_trial_does_not_mutate_input_state_trials(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(tmp_path)
study = load_study_spec(study_path)
store = StudyStore(tmp_path / ".aituner" / "studies")
store.init_study(spec_path=study_path, study=study)
state = store.load_state(study.study_id)
proposal = Proposal.from_dict(
{
"observation": "baseline",
"diagnosis": "baseline",
"config_patch": {"env_patch": {}, "flag_patch": {}},
"expected_effects": ["measure"],
}
)
_, next_state = store.materialize_trial(study=study, state=state, proposal=proposal)
self.assertEqual(state.trials, [])
self.assertEqual(len(next_state.trials), 1)
def test_materialize_trial_uses_incumbent_sampling_u_as_search_floor(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
@@ -2969,6 +3097,44 @@ class CoreFlowTests(unittest.TestCase):
self.assertTrue((tmp_path / ".compare" / "summary.json").exists()) self.assertTrue((tmp_path / ".compare" / "summary.json").exists())
self.assertTrue((tmp_path / ".compare" / "report.md").exists()) self.assertTrue((tmp_path / ".compare" / "report.md").exists())
def test_compare_aggregate_counts_failed_and_no_feasible_windows(self) -> None:
summary = _aggregate_summary(
[
{
"baseline": {
"status": "completed",
"best_request_rate": 1.0,
"best_request_rate_per_gpu": 1.0,
},
"tuned": {
"status": "completed",
"best_request_rate": None,
"best_request_rate_per_gpu": None,
},
"delta": {"winner": "baseline"},
},
{
"baseline": {
"status": "failed",
"best_request_rate": None,
"best_request_rate_per_gpu": None,
},
"tuned": {
"status": "completed",
"best_request_rate": 2.0,
"best_request_rate_per_gpu": 2.0,
},
"delta": {"winner": "tuned"},
},
]
)
self.assertEqual(summary["baseline_completed_window_count"], 1)
self.assertEqual(summary["baseline_failed_window_count"], 1)
self.assertEqual(summary["baseline_no_feasible_window_count"], 1)
self.assertEqual(summary["tuned_completed_window_count"], 2)
self.assertEqual(summary["tuned_failed_window_count"], 0)
self.assertEqual(summary["tuned_no_feasible_window_count"], 1)
def test_run_compare_resolves_trial_ref_candidate(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)