diff --git a/docs/harness-tuning-progress.md b/docs/harness-tuning-progress.md
new file mode 100644
index 0000000..fb499cf
--- /dev/null
+++ b/docs/harness-tuning-progress.md
@@ -0,0 +1,59 @@
+# Harness-Guided AITuner Progress
+
+## Goal
+
+Improve AITuner convergence for the `dash0` internal vLLM + Qwen3.5-27B 0-8k chat study. In the prior 12-iteration run, the tuner could still propose worse configs after it had found good ones. The new harness should make config proposals bottleneck-directed and stop spending GPU trials once no adjacent harness-guided probe is justified.
+
+## Paper Alignment
+
+- Prompt structure now includes an explicit `[Harnesses]` section aligned with paper Figure 12.
+- The harness uses the paper's L-C-A workload model:
+  - L: prompt length percentiles and tail ratio.
+  - C: prefix/KV-cache reuse estimated from repeated `hash_ids` blocks when available.
+  - A: request rate, 1-second QPS burst ratio, and interarrival CV.
+- Knob rules follow the paper's Figure 13 style:
+  - map the active bottleneck to a knob family;
+  - probe adjacent legal choices;
+  - enforce guard conditions to avoid harmful side effects;
+  - prefer stopping over weak exploratory proposals after convergence.
+
+## Local Implementation Log
+
+- Added `src/aituner/harness.py`.
+  - Builds structured harness context for prompt injection.
+  - Adds TP, max-num-seqs, max-num-batched-tokens, chunked-prefill, and memory-utilization harnesses when those knobs are tunable.
+  - Extracts compact recent trial diagnostics from result JSON files.
+  - Adds a convergence guard based on recent completed trial performance.
+- Extended `src/aituner/trace.py`.
+  - `summarize_window` now reports L-C-A features.
+  - `TraceRequest` now carries optional metadata for `hash_ids`, turn, parent chat id, and trace type.
+- Extended `src/aituner/llm.py`.
+  - Prompt now includes tested config signatures and the structured harness section.
+  - Prompt schema now asks for `should_stop`.
+- Extended `src/aituner/spec.py`.
+  - `Proposal` accepts an optional `should_stop`.
+- Extended `src/aituner/cli.py`.
+  - `study tune` honors `should_stop=true` by recording the proposal and not launching another GPU trial.
+- Extended `tests/test_core_flow.py`.
+  - Prompt includes harness context.
+  - Trace summary includes new L-C-A fields.
+  - Proposal parsing accepts `should_stop`.
+  - CLI does not launch a trial for a stop proposal.
+
+## Local Verification
+
+- `python3 -m compileall -q src tests`: passed.
+- `PYTHONPATH=src python3 -m unittest tests.test_core_flow`: passed, 59 tests.
+- `pytest -q` and `python3 -m pytest -q`: not runnable locally because `pytest` is not installed.
+
+## Remote Experiment Log
+
+Pending. Next steps:
+
+1. Commit and push the harness implementation.
+2. Pull on `dash0` in `/home/admin/cpfs/wjh/aituner/aituner`.
+3. Start a real harness-guided Qwen3.5-27B 0-8k chat tuning run from `configs/examples/dash0_qwen27b_tight_slo_run4_0_8k.json`.
+4. Compare the first few iterations against the prior 12-iteration behavior:
+   - best request rate per GPU should improve or reach the known good region in fewer trials;
+   - proposals should follow the active bottleneck harness;
+   - if the incumbent has converged, the LLM should emit `should_stop=true` instead of proposing a weak exploratory config.
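The L-C-A features in the doc above map directly onto the new fields reported by `summarize_window`. A minimal standalone sketch of how those window features can be derived is below; the function name `lca_features`, its simplified inputs, and the nearest-rank percentile and burst estimates are illustrative only and are not the in-tree implementation.

```python
import math
import statistics


def lca_features(arrivals_s, prompt_tokens, hash_id_lists):
    """Illustrative L-C-A summary for one trace window (assumes >= 2 sorted arrivals)."""
    # L: prompt-length distribution and tail ratio (rough nearest-rank percentiles).
    ordered = sorted(prompt_tokens)
    p50 = float(ordered[int(0.50 * (len(ordered) - 1))])
    p95 = float(ordered[int(0.95 * (len(ordered) - 1))])

    # C: repeated prefix-block ratio across the window, a proxy for KV-cache reuse.
    seen, repeated, total = set(), 0, 0
    for hash_ids in hash_id_lists:
        for hash_id in hash_ids or []:
            total += 1
            if hash_id in seen:
                repeated += 1
            else:
                seen.add(hash_id)

    # A: mean request rate, 1-second burstiness, and interarrival coefficient of variation.
    duration = arrivals_s[-1] - arrivals_s[0]
    qps = len(arrivals_s) / duration if duration > 0 else 0.0
    per_second = {}
    for t in arrivals_s:
        per_second[math.floor(t)] = per_second.get(math.floor(t), 0) + 1
    counts = sorted(per_second.values())
    qps_1s_p95 = float(counts[int(0.95 * (len(counts) - 1))])
    gaps = [b - a for a, b in zip(arrivals_s, arrivals_s[1:])]
    mean_gap = statistics.fmean(gaps) if gaps else 0.0
    cv = statistics.pstdev(gaps) / mean_gap if mean_gap > 0 else 0.0

    return {
        "L": {"prompt_tokens_p50": p50, "prompt_tokens_p95": p95,
              "tail_ratio_p95_p50": p95 / max(p50, 1.0)},
        "C": {"repeated_block_ratio": repeated / total if total else 0.0},
        "A": {"request_rate": qps, "qps_1s_p95": qps_1s_p95,
              "burst_ratio_p95_to_mean": qps_1s_p95 / qps if qps > 0 else 0.0,
              "interarrival_cv": cv},
    }
```

These are the same kinds of quantities the harness uses to classify a window into length, cache-reuse, and arrival regimes (`_length_regime`, `_cache_regime`, `_arrival_regime` below).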
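The convergence guard reduces to a small rule over recent completed trials. A standalone sketch follows; the name `convergence_guard`, its argument names, and the 2% `tolerance` default are chosen here for illustration, while the in-tree version is `_convergence_guard` in `src/aituner/harness.py` and additionally reports the incumbent config.

```python
def convergence_guard(completed_rates_per_gpu, incumbent_rate_per_gpu,
                      window=3, tolerance=0.02):
    """Sketch: return (should_stop, reason); stop when the last `window` completed
    trials all land within `tolerance` of the incumbent request_rate_per_gpu."""
    if incumbent_rate_per_gpu is None or len(completed_rates_per_gpu) < window:
        return False, "insufficient_completed_trials"
    recent = completed_rates_per_gpu[-window:]
    if all(rate >= incumbent_rate_per_gpu * (1.0 - tolerance) for rate in recent):
        return True, "recent_completed_trials_are_within_tolerance_of_incumbent"
    return False, "recent_trials_still_show_material_variation"


# Example: three near-incumbent completed trials in a row -> the guard fires, and
# the LLM is expected to answer with should_stop=true instead of a weak exploratory probe.
print(convergence_guard([4.95, 4.98, 5.00], incumbent_rate_per_gpu=5.00))
# (True, 'recent_completed_trials_are_within_tolerance_of_incumbent')
```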
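When the guard fires and no harness justifies another adjacent probe, the prompt asks the LLM for a stop proposal. The shape below mirrors the fixture used in `test_cli_tune_honors_should_stop_proposal`; `study tune` records such a proposal and exits without launching another GPU trial.

```python
# Stop-proposal shape expected by parse_proposal_text (mirrors the test fixture).
stop_proposal = {
    "observation": "incumbent converged",
    "diagnosis": "no adjacent harness probe is justified",
    "config_patch": {"env_patch": {}, "flag_patch": {}},
    "expected_effects": ["stop without spending another GPU trial"],
    "why_not_previous_failures": "not applicable",
    "should_stop": True,
}
```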
diff --git a/src/aituner/cli.py b/src/aituner/cli.py index 9f3c83b..e0fddaa 100644 --- a/src/aituner/cli.py +++ b/src/aituner/cli.py @@ -146,6 +146,19 @@ def cmd_study_tune(args: argparse.Namespace) -> int: raw_proposal_path.write_text(proposal_text, encoding="utf-8") proposal = parse_proposal_text(proposal_text, study) store.write_proposal(study.study_id, proposal_name, proposal) + if proposal.should_stop: + executed.append( + { + "trial_id": None, + "proposal_name": proposal_name, + "proposal_source": str(proposal_source) if proposal_source else "llm", + "stopped": True, + "diagnosis": proposal.diagnosis, + "state_best_trial_id": state.best_trial_id, + "state_best_request_rate": state.best_request_rate, + } + ) + break trial, _ = store.materialize_trial(study=study, state=state, proposal=proposal) trial_spec_path = Path(trial.artifact_dir) / "trial_spec.json" result = run_trial(trial_spec_path) diff --git a/src/aituner/harness.py b/src/aituner/harness.py new file mode 100644 index 0000000..5d725eb --- /dev/null +++ b/src/aituner/harness.py @@ -0,0 +1,390 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from .spec import StudySpec, StudyState, TrialSummary + + +def build_harness_context( + *, + study: StudySpec, + window_summary: dict[str, Any], + state: StudyState, +) -> dict[str, Any]: + recent_diagnostics = _recent_trial_diagnostics(state) + return { + "paper_alignment": { + "goal": "Use workload-feature-to-knob harnesses to reduce wasted trials and avoid regressing after a good configuration is found.", + "feature_model": "L-C-A: request lengths, inter-request KV-cache reuse, and arrival dynamics.", + "trial_policy": "Map the active bottleneck to one knob family, apply guard conditions, and stop when the incumbent has converged.", + }, + "workload_lca_profile": _workload_lca_profile(window_summary), + "recent_trial_diagnostics": recent_diagnostics, + "convergence_guard": _convergence_guard(state, recent_diagnostics), + "knob_harnesses": _knob_harnesses(study, window_summary, recent_diagnostics), + "proposal_rules": _proposal_rules(), + } + + +def render_harness_context(context: dict[str, Any]) -> str: + return json.dumps(context, ensure_ascii=False, indent=2) + + +def _workload_lca_profile(window_summary: dict[str, Any]) -> dict[str, Any]: + prefix_cache = window_summary.get("prefix_cache") + if not isinstance(prefix_cache, dict): + prefix_cache = {} + prompt_p50 = _as_float(window_summary.get("prompt_tokens_p50")) + prompt_p95 = _as_float(window_summary.get("prompt_tokens_p95")) + prompt_p99 = _as_float(window_summary.get("prompt_tokens_p99")) + tail_ratio = _as_float(window_summary.get("prompt_tail_ratio_p95_p50")) + repeated_token_ratio = _as_float(prefix_cache.get("repeated_token_ratio_estimate")) + burst_ratio = _as_float(window_summary.get("arrival_burst_ratio_p95_to_mean")) + interarrival_cv = _as_float(window_summary.get("arrival_interarrival_cv")) + return { + "L_request_lengths": { + "prompt_tokens_p50": prompt_p50, + "prompt_tokens_p95": prompt_p95, + "prompt_tokens_p99": prompt_p99, + "tail_ratio_p95_p50": tail_ratio, + "regime": _length_regime(prompt_p95, tail_ratio), + }, + "C_prefix_cache": { + "repeated_token_ratio_estimate": repeated_token_ratio, + "repeated_block_ratio": _as_float(prefix_cache.get("repeated_block_ratio")), + "multi_turn_request_ratio": _as_float(prefix_cache.get("multi_turn_request_ratio")), + "rows_with_hash_ids": int(prefix_cache.get("rows_with_hash_ids") or 0), + "regime": 
_cache_regime(repeated_token_ratio), + }, + "A_arrivals": { + "request_rate": _as_float(window_summary.get("request_rate")), + "qps_1s_p50": _as_float(window_summary.get("arrival_qps_1s_p50")), + "qps_1s_p95": _as_float(window_summary.get("arrival_qps_1s_p95")), + "burst_ratio_p95_to_mean": burst_ratio, + "interarrival_cv": interarrival_cv, + "regime": _arrival_regime(burst_ratio, interarrival_cv), + }, + } + + +def _knob_harnesses( + study: StudySpec, + window_summary: dict[str, Any], + recent_diagnostics: list[dict[str, Any]], +) -> list[dict[str, Any]]: + tunable = set(study.engine.tunable_flags) + latest = recent_diagnostics[-1] if recent_diagnostics else {} + active_bottleneck = latest.get("active_bottleneck") or _workload_default_bottleneck( + window_summary + ) + harnesses: list[dict[str, Any]] = [] + if "tensor-parallel-size" in tunable: + harnesses.append( + { + "knob_family": "tensor-parallel-size", + "use_when": [ + "TTFT failures dominate, especially on long prompt windows.", + "The L profile has a heavy tail and prefill service time is the likely bottleneck.", + ], + "procedure": [ + "Probe only adjacent legal TP choices around the incumbent topology.", + "Prefer TP up when it lowers long-prefill latency and the projected request rate remains left of the high-TP queueing knee.", + "Prefer TP down or keep moderate TP when communication overhead or concurrency loss becomes visible.", + ], + "guards": [ + "Do not jump across multiple TP values without a launch-safe reason.", + "Do not raise TP for a short-prompt/cache-heavy window if TTFT is already passing and TPOT or queueing is the active bottleneck.", + "Keep TP/DP/EP inside topology_constraints.", + ], + "active_now": active_bottleneck == "ttft_prefill", + } + ) + if "max-num-seqs" in tunable: + harnesses.append( + { + "knob_family": "max-num-seqs", + "use_when": [ + "Prefix-cache reuse is high, requests are short-to-moderate after cache hits, and queueing/admission is limiting throughput.", + "TTFT is mostly passing but offered load stalls below the target.", + ], + "procedure": [ + "Increase max-num-seqs one step at a time to exploit cache-created parallelism.", + "Decrease it if p95 TTFT worsens, prefill queueing appears, or memory pressure causes launch/runtime failures.", + ], + "guards": [ + "Avoid large max-num-seqs increases on low-cache or heavy-tail windows.", + "Do not combine a max-num-seqs jump with a TP jump unless the history clearly isolates both bottlenecks.", + ], + "active_now": active_bottleneck == "admission_or_queueing", + } + ) + if "max-num-batched-tokens" in tunable: + harnesses.append( + { + "knob_family": "max-num-batched-tokens", + "use_when": [ + "Prefill batching is too small for the L profile or TTFT is hurt by excessive chunking overhead.", + "GPU work appears fragmented and the incumbent is stable but under-utilized.", + ], + "procedure": [ + "Raise MBT for long prompts when memory headroom and SLO permit.", + "Lower MBT if long requests monopolize batches and short-request TTFT regresses.", + ], + "guards": [ + "Keep MBT changes within a conservative trust region.", + "Do not raise MBT after OOM or launch failures involving memory-related knobs.", + ], + "active_now": active_bottleneck == "ttft_prefill", + } + ) + if "enable-chunked-prefill" in tunable: + harnesses.append( + { + "knob_family": "enable-chunked-prefill", + "use_when": [ + "The L profile has a long tail and long prefills block shorter requests.", + ], + "procedure": [ + "Keep chunked prefill enabled for heavy-tail chat windows 
unless history shows chunking overhead dominates.", + ], + "guards": [ + "Do not disable chunked prefill on a heavy-tail workload without direct evidence from a nearby trial.", + ], + "active_now": False, + } + ) + if "gpu-memory-utilization" in tunable: + harnesses.append( + { + "knob_family": "gpu-memory-utilization", + "use_when": [ + "The engine launches cleanly but memory headroom limits batching.", + ], + "procedure": [ + "Make small adjustments only after topology and batching knobs are stable.", + ], + "guards": [ + "Treat launch OOM as hard negative evidence and back off immediately.", + ], + "active_now": False, + } + ) + return harnesses + + +def _recent_trial_diagnostics(state: StudyState) -> list[dict[str, Any]]: + diagnostics: list[dict[str, Any]] = [] + for trial in state.trials[-stateful_history_limit() :]: + item: dict[str, Any] = { + "trial_id": trial.trial_id, + "status": trial.status, + "parallel_size": trial.parallel_size, + "best_request_rate": trial.best_request_rate, + "best_request_rate_per_gpu": trial.best_request_rate_per_gpu, + "best_pass_rate": trial.best_pass_rate, + "diagnosis": trial.diagnosis, + "config_patch": trial.config_patch, + "failure_stage": trial.failure_stage, + "failure_reason": trial.failure_reason, + } + result = _load_result(trial) + if result: + probes = result.get("probes") + if isinstance(probes, list) and probes: + best_probe = _best_feasible_probe(probes) + last_probe = probes[-1] if isinstance(probes[-1], dict) else None + item["probe_summary"] = { + "best_feasible_probe": _compact_probe(best_probe), + "last_probe": _compact_probe(last_probe), + } + item["active_bottleneck"] = _active_bottleneck(best_probe or last_probe) + elif result.get("all_infeasible_diagnostics"): + diag = result["all_infeasible_diagnostics"] + item["probe_summary"] = {"all_infeasible": diag} + item["active_bottleneck"] = _active_bottleneck({"payload": diag}) + else: + item["active_bottleneck"] = _failure_bottleneck(trial) + diagnostics.append(item) + return diagnostics + + +def stateful_history_limit() -> int: + return 8 + + +def _load_result(trial: TrialSummary) -> dict[str, Any] | None: + if not trial.result_path: + return None + path = Path(trial.result_path) + if not path.exists(): + return None + try: + payload = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return None + return payload if isinstance(payload, dict) else None + + +def _best_feasible_probe(probes: list[Any]) -> dict[str, Any] | None: + best: dict[str, Any] | None = None + for probe in probes: + if not isinstance(probe, dict) or not probe.get("feasible"): + continue + payload = probe.get("payload") + if not isinstance(payload, dict): + continue + if best is None: + best = probe + continue + best_payload = best.get("payload") if isinstance(best.get("payload"), dict) else {} + if _as_float(payload.get("request_rate")) > _as_float(best_payload.get("request_rate")): + best = probe + return best + + +def _compact_probe(probe: dict[str, Any] | None) -> dict[str, Any] | None: + if not isinstance(probe, dict): + return None + payload = probe.get("payload") + if not isinstance(payload, dict): + payload = probe + return { + "threshold": probe.get("threshold", payload.get("threshold")), + "feasible": probe.get("feasible", payload.get("feasible")), + "request_count": payload.get("request_count"), + "pass_rate": payload.get("pass_rate"), + "request_rate": payload.get("request_rate"), + "early_stopped": payload.get("early_stopped"), + "early_stop_reason": 
payload.get("early_stop_reason"), + "latency_summary": payload.get("latency_summary"), + } + + +def _active_bottleneck(probe: dict[str, Any] | None) -> str: + compact = _compact_probe(probe) + if not compact: + return "unknown" + early_stop_reason = str(compact.get("early_stop_reason") or "") + if early_stop_reason.startswith("arrival_lag_s>") or early_stop_reason == "slo_pass_rate_unrecoverable": + return "admission_or_queueing" + latency_summary = compact.get("latency_summary") + if not isinstance(latency_summary, dict): + return "unknown" + failed = latency_summary.get("failed_reason_counts") + if not isinstance(failed, dict) or not failed: + return "none_obvious" + ttft_count = sum(int(v) for k, v in failed.items() if str(k).startswith("ttft")) + tpot_count = sum(int(v) for k, v in failed.items() if str(k).startswith("tpot")) + request_failed_count = sum( + int(v) + for k, v in failed.items() + if not str(k).startswith("ttft") and not str(k).startswith("tpot") + ) + if ttft_count >= max(tpot_count, request_failed_count): + return "ttft_prefill" + if tpot_count >= max(ttft_count, request_failed_count): + return "decode_tpot" + return "admission_or_queueing" + + +def _failure_bottleneck(trial: TrialSummary) -> str: + if trial.failure_stage == "engine_launch": + return "launch_or_memory" + if trial.failure_reason: + return "runtime_failure" + return "unknown" + + +def _convergence_guard( + state: StudyState, + recent_diagnostics: list[dict[str, Any]], +) -> dict[str, Any]: + completed = [ + item + for item in recent_diagnostics + if item.get("status") == "completed" + and isinstance(item.get("best_request_rate_per_gpu"), (int, float)) + ] + should_stop = False + reason = "insufficient_completed_trials" + if len(completed) >= 3 and state.best_request_rate_per_gpu is not None: + recent_rates = [float(item["best_request_rate_per_gpu"]) for item in completed[-3:]] + best = float(state.best_request_rate_per_gpu) + near_best = [rate >= best * 0.98 for rate in recent_rates] + if all(near_best): + should_stop = True + reason = "last_three_completed_trials_are_within_2_percent_of_incumbent" + else: + reason = "recent_trials_still_show_material_variation" + elif state.best_request_rate_per_gpu is not None: + reason = "need_more_evidence_before_stop" + return { + "should_stop_if_no_harness_can_justify_a_new_adjacent_probe": should_stop, + "reason": reason, + "incumbent": { + "trial_id": state.best_trial_id, + "parallel_size": state.best_parallel_size, + "best_sampling_u": state.best_sampling_u, + "best_request_rate": state.best_request_rate, + "best_request_rate_per_gpu": state.best_request_rate_per_gpu, + }, + } + + +def _proposal_rules() -> list[str]: + return [ + "First decide the active bottleneck from recent_trial_diagnostics.", + "Pick at most one primary knob family from knob_harnesses unless the history proves a coupled change is needed.", + "Use adjacent legal values around the incumbent; avoid broad exploratory jumps.", + "If a proposed config is likely to reduce request_rate_per_gpu under the active guard, set should_stop=true instead of exploring.", + "Never repeat an already tested config signature.", + "Return should_stop=true when the convergence guard fires and no active harness justifies another adjacent probe.", + ] + + +def _workload_default_bottleneck(window_summary: dict[str, Any]) -> str: + tail_ratio = _as_float(window_summary.get("prompt_tail_ratio_p95_p50")) + prompt_p95 = _as_float(window_summary.get("prompt_tokens_p95")) + prefix_cache = window_summary.get("prefix_cache") 
+ cache_ratio = 0.0 + if isinstance(prefix_cache, dict): + cache_ratio = _as_float(prefix_cache.get("repeated_token_ratio_estimate")) + if prompt_p95 >= 4096 or tail_ratio >= 2.0: + return "ttft_prefill" + if cache_ratio >= 0.25: + return "admission_or_queueing" + return "unknown" + + +def _length_regime(prompt_p95: float, tail_ratio: float) -> str: + if prompt_p95 >= 8192 or tail_ratio >= 4.0: + return "heavy_tail_long_prefill" + if prompt_p95 >= 4096 or tail_ratio >= 2.0: + return "moderate_tail_prefill_sensitive" + return "short_or_moderate" + + +def _cache_regime(repeated_token_ratio: float) -> str: + if repeated_token_ratio >= 0.5: + return "high_prefix_reuse" + if repeated_token_ratio >= 0.2: + return "moderate_prefix_reuse" + return "low_prefix_reuse" + + +def _arrival_regime(burst_ratio: float, interarrival_cv: float) -> str: + if burst_ratio >= 3.0 or interarrival_cv >= 2.0: + return "bursty" + if burst_ratio >= 1.5 or interarrival_cv >= 1.0: + return "moderately_bursty" + return "smooth" + + +def _as_float(value: Any) -> float: + if isinstance(value, bool): + return 0.0 + if isinstance(value, (int, float)): + return float(value) + return 0.0 diff --git a/src/aituner/llm.py b/src/aituner/llm.py index f27be86..6411972 100644 --- a/src/aituner/llm.py +++ b/src/aituner/llm.py @@ -4,6 +4,7 @@ import json from pathlib import Path from typing import Any +from .harness import build_harness_context, render_harness_context from .http_client import chat_completion, stream_text_completion from .spec import LLMPolicySpec, Proposal, SpecError, StudySpec, StudyState @@ -212,9 +213,10 @@ def build_prompt( parallel_candidates = _enumerate_parallel_candidates(study) sections = [ "You are tuning an OpenAI-compatible serving engine.", - "Return exactly one JSON object with keys: observation, diagnosis, config_patch, expected_effects, why_not_previous_failures.", + "Return exactly one JSON object with keys: observation, diagnosis, config_patch, expected_effects, why_not_previous_failures, should_stop.", "config_patch must contain env_patch and flag_patch.", "expected_effects must be a JSON array of short strings, not an object.", + "should_stop must be a boolean. Use true only when the harness convergence guard says another adjacent probe is not justified.", "Only use allowed tunable env keys and allowed tunable flag keys.", "Do not wrap the JSON in markdown fences or any extra text.", "Do not repeat a config that previously failed to launch unless the new patch explicitly removes the failing knob.", @@ -305,15 +307,48 @@ def build_prompt( "Parallel space candidates:", json.dumps(parallel_candidates, ensure_ascii=False, indent=2), "", + "Tested config signatures:", + json.dumps(_tested_config_signatures(state), ensure_ascii=False, indent=2), + "", + "Harnesses:", + render_harness_context( + build_harness_context( + study=study, + window_summary=window_summary, + state=state, + ) + ), + "", "The primary cross-topology comparison metric is request_rate_per_gpu, not raw request_rate.", "The proposal should beat the incumbent on request_rate_per_gpu under the 95%+ SLO target.", "The evaluator uses the best feasible sampling_u from the same tp_dp_product group when it exists.", "If a tp_dp_product group has no history yet, the evaluator starts from the study's original search.low and runs a full binary search for that group.", "Do not assume a configuration with fewer GPUs should inherit the global incumbent sampling_u.", + "Follow the active harness. 
Prefer stop over a weak exploratory proposal once a good incumbent has converged.", ] return "\n".join(sections) +def _tested_config_signatures(state: StudyState) -> list[dict[str, Any]]: + signatures: list[dict[str, Any]] = [] + seen: set[str] = set() + for trial in state.trials: + config_patch = trial.config_patch or {} + signature = json.dumps(config_patch, sort_keys=True, ensure_ascii=False) + if signature in seen: + continue + seen.add(signature) + signatures.append( + { + "trial_id": trial.trial_id, + "status": trial.status, + "best_request_rate_per_gpu": trial.best_request_rate_per_gpu, + "config_patch": config_patch, + } + ) + return signatures + + def load_capability_profile(study: StudySpec, *, study_spec_path: Path) -> dict[str, Any] | None: if not study.capability_profile_path: return None diff --git a/src/aituner/spec.py b/src/aituner/spec.py index 24393ab..1f0199e 100644 --- a/src/aituner/spec.py +++ b/src/aituner/spec.py @@ -642,6 +642,7 @@ class Proposal: config_patch: ConfigPatch expected_effects: list[str] why_not_previous_failures: str = "" + should_stop: bool = False @classmethod def from_dict(cls, data: Mapping[str, Any]) -> "Proposal": @@ -671,6 +672,11 @@ class Proposal: ), expected_effects=expected_effects_value, why_not_previous_failures=str(data.get("why_not_previous_failures") or "").strip(), + should_stop=( + _require_bool(data.get("should_stop"), context="proposal.should_stop") + if data.get("should_stop") is not None + else False + ), ) diff --git a/src/aituner/trace.py b/src/aituner/trace.py index 7d03a80..cabb3a5 100644 --- a/src/aituner/trace.py +++ b/src/aituner/trace.py @@ -2,7 +2,8 @@ from __future__ import annotations import json import math -from dataclasses import dataclass +import statistics +from dataclasses import dataclass, field from pathlib import Path from typing import Any, Mapping @@ -39,6 +40,7 @@ class TraceRequest: body: dict[str, Any] prompt_tokens_hint: int | None completion_tokens_hint: int | None + metadata: dict[str, Any] = field(default_factory=dict) def resolve_window_record(study: StudySpec, *, study_spec_path: Path) -> WindowRecord: @@ -223,6 +225,12 @@ def load_trace_requests(study: StudySpec, *, study_spec_path: Path) -> tuple[Win body=body, prompt_tokens_hint=prompt_tokens_hint, completion_tokens_hint=completion_tokens, + metadata={ + "hash_ids": row.get("hash_ids") if isinstance(row.get("hash_ids"), list) else None, + "turn": row.get("turn"), + "parent_chat_id": row.get("parent_chat_id"), + "type": row.get("type"), + }, ) ) requests.sort(key=lambda item: item.arrival_s) @@ -241,6 +249,23 @@ def summarize_window(requests: list[TraceRequest], window: WindowRecord) -> dict requests[-1].arrival_s - requests[0].arrival_s if len(requests) >= 2 else 0.0 ) qps = (len(requests) / duration) if duration > 0 else 0.0 + interarrivals = [ + max(0.0, requests[idx].arrival_s - requests[idx - 1].arrival_s) + for idx in range(1, len(requests)) + ] + mean_interarrival = statistics.fmean(interarrivals) if interarrivals else 0.0 + stdev_interarrival = statistics.pstdev(interarrivals) if len(interarrivals) >= 2 else 0.0 + interarrival_cv = ( + float(stdev_interarrival / mean_interarrival) if mean_interarrival > 0 else 0.0 + ) + one_second_bins: dict[int, int] = {} + for request in requests: + bin_id = int(math.floor(request.arrival_s)) + one_second_bins[bin_id] = one_second_bins.get(bin_id, 0) + 1 + one_second_counts = [float(value) for value in one_second_bins.values()] + cache_summary = _cache_summary(requests, window) + p50_prompt = 
_percentile(prompt_tokens, 50.0) + p95_prompt = _percentile(prompt_tokens, 95.0) return { "window_id": window.window_id, "trace_path": str(window.trace_path), @@ -248,10 +273,66 @@ def summarize_window(requests: list[TraceRequest], window: WindowRecord) -> dict "request_count": len(requests), "duration_s": duration, "request_rate": qps, - "prompt_tokens_p50": _percentile(prompt_tokens, 50.0), - "prompt_tokens_p95": _percentile(prompt_tokens, 95.0), + "prompt_tokens_p50": p50_prompt, + "prompt_tokens_p95": p95_prompt, + "prompt_tokens_p99": _percentile(prompt_tokens, 99.0), + "prompt_tail_ratio_p95_p50": ( + float(p95_prompt / max(p50_prompt, 1.0)) if prompt_tokens else 0.0 + ), "completion_tokens_p50": _percentile(completion_tokens, 50.0), "completion_tokens_p95": _percentile(completion_tokens, 95.0), + "arrival_interarrival_cv": interarrival_cv, + "arrival_qps_1s_p50": _percentile(one_second_counts, 50.0), + "arrival_qps_1s_p95": _percentile(one_second_counts, 95.0), + "arrival_burst_ratio_p95_to_mean": ( + float(_percentile(one_second_counts, 95.0) / max(qps, 1e-9)) + if one_second_counts and qps > 0 + else 0.0 + ), + "prefix_cache": cache_summary, + } + + +def _cache_summary(requests: list[TraceRequest], window: WindowRecord) -> dict[str, Any]: + block_size = int(window.source_payload.get("block_size") or 1) + seen_hashes: set[Any] = set() + repeated_blocks = 0 + total_blocks = 0 + repeated_token_estimate = 0 + total_token_estimate = 0 + multi_turn_count = 0 + rows_with_hashes = 0 + for request in requests: + prompt_tokens = int(request.prompt_tokens_hint or 0) + total_token_estimate += prompt_tokens + turn = request.metadata.get("turn") + if isinstance(turn, (int, float)) and turn > 1: + multi_turn_count += 1 + hash_ids = request.metadata.get("hash_ids") + if not isinstance(hash_ids, list): + continue + rows_with_hashes += 1 + request_repeated_blocks = 0 + for hash_id in hash_ids: + total_blocks += 1 + if hash_id in seen_hashes: + repeated_blocks += 1 + request_repeated_blocks += 1 + else: + seen_hashes.add(hash_id) + repeated_token_estimate += min(prompt_tokens, request_repeated_blocks * block_size) + return { + "block_size": block_size, + "rows_with_hash_ids": rows_with_hashes, + "multi_turn_request_ratio": ( + float(multi_turn_count / len(requests)) if requests else 0.0 + ), + "repeated_block_ratio": float(repeated_blocks / total_blocks) if total_blocks else 0.0, + "repeated_token_ratio_estimate": ( + float(repeated_token_estimate / total_token_estimate) + if total_token_estimate + else 0.0 + ), } diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py index 7d2dff9..f9801bc 100644 --- a/tests/test_core_flow.py +++ b/tests/test_core_flow.py @@ -218,6 +218,9 @@ class CoreFlowTests(unittest.TestCase): self.assertIn("study-1", prompt) self.assertIn('"current_best"', prompt) self.assertIn("queueing_knee_by_bucket", prompt) + self.assertIn("Harnesses:", prompt) + self.assertIn("workload_lca_profile", prompt) + self.assertIn("knob_harnesses", prompt) self.assertTrue(study_root.exists()) def test_trace_input_length_filter_keeps_only_matching_rows(self) -> None: @@ -239,6 +242,8 @@ class CoreFlowTests(unittest.TestCase): self.assertEqual([item.prompt_tokens_hint for item in requests], [1000, 5000]) self.assertEqual(summary["request_count"], 2) self.assertEqual(summary["prompt_tokens_p95"], 5000.0) + self.assertIn("prefix_cache", summary) + self.assertIn("arrival_burst_ratio_p95_to_mean", summary) prompt = build_prompt( study=study, window_summary=summary, @@ -1728,6 +1733,44 @@ class 
CoreFlowTests(unittest.TestCase): self.assertEqual(state.best_request_rate, 2.0) self.assertEqual(state.next_trial_index, 3) + def test_cli_tune_honors_should_stop_proposal(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + study_path = _write_study_assets(tmp_path) + proposal_path = tmp_path / "stop.json" + proposal_path.write_text( + json.dumps( + { + "observation": "incumbent converged", + "diagnosis": "no adjacent harness probe is justified", + "config_patch": {"env_patch": {}, "flag_patch": {}}, + "expected_effects": ["stop without spending another GPU trial"], + "why_not_previous_failures": "not applicable", + "should_stop": True, + } + ), + encoding="utf-8", + ) + store_root = tmp_path / "store" + with mock.patch("aituner.cli.run_trial") as run_trial_mock: + exit_code = cli_main( + [ + "study", + "tune", + "--spec", + str(study_path), + "--store-root", + str(store_root), + "--proposal-file", + str(proposal_path), + ] + ) + self.assertEqual(exit_code, 0) + run_trial_mock.assert_not_called() + store = StudyStore(store_root) + state = store.load_state("study-1") + self.assertEqual(state.next_trial_index, 1) + def test_load_compare_spec_requires_window_selection(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) @@ -1993,6 +2036,18 @@ class CoreFlowTests(unittest.TestCase): ["throughput: higher", "ttft: lower"], ) + def test_proposal_accepts_should_stop(self) -> None: + proposal = Proposal.from_dict( + { + "observation": "obs", + "diagnosis": "converged", + "config_patch": {"env_patch": {}, "flag_patch": {}}, + "expected_effects": ["avoid wasting another GPU trial"], + "should_stop": True, + } + ) + self.assertTrue(proposal.should_stop) + def test_parse_proposal_text_accepts_wrapped_json(self) -> None: with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp)