Add L-C-A workload profile metric and CLI profile commands

Implement the paper's 10-dimensional L-C-A workload feature vector
(RobustScaler-normalized, sim=exp(-||dz||)) in lca.py, and wire it into
`aituner profile window` / `aituner profile similarity`. Covered by tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-15 14:02:24 +08:00
parent 984eb1f325
commit 27d1c8fa92
3 changed files with 770 additions and 2 deletions

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import argparse
import json
import sys
from dataclasses import replace
from pathlib import Path
from .compare import run_compare
@@ -12,8 +13,20 @@ from .harness import (
build_harness_stop_proposal,
)
from .job import append_job, build_trial_job
from .lca import (
build_workload_profile,
resolve_length_mode,
similarity_report,
)
from .llm import build_prompt, call_llm_for_proposal, load_capability_profile, parse_proposal_text
from .spec import Proposal, SpecError, load_study_spec, to_jsonable
from .spec import (
Proposal,
SpecError,
StudySpec,
load_structured_file,
load_study_spec,
to_jsonable,
)
from .store import StudyStore
from .trace import load_trace_requests, summarize_window
from .worker import run_trial
@@ -422,6 +435,159 @@ def cmd_compare_run(args: argparse.Namespace) -> int:
return 0
def _resolve_profile_gpu_count(args: argparse.Namespace, study: StudySpec) -> int:
gpu_count = args.gpu_count
if gpu_count is None:
gpu_count = study.hardware.gpu_count
if gpu_count <= 0:
raise SpecError("--gpu-count must be > 0.")
return int(gpu_count)
def _load_profile_study_spec(spec_path: Path) -> StudySpec:
payload = dict(load_structured_file(spec_path))
llm_payload = dict(payload.get("llm") or {})
llm_payload.pop("endpoint", None)
payload["llm"] = llm_payload
return StudySpec.from_dict(payload)
def _profile_current_study_window(args: argparse.Namespace) -> dict[str, object]:
spec_path = Path(args.spec).resolve()
study = _load_profile_study_spec(spec_path)
mode = resolve_length_mode(
request_mode=study.trace.request_mode,
length_mode=args.length_mode,
)
window, requests = load_trace_requests(study, study_spec_path=spec_path)
profile = build_workload_profile(
requests,
window,
gpu_count=_resolve_profile_gpu_count(args, study),
length_mode=mode,
)
return {
"profile": profile.to_dict(),
"source": {
"study_spec_path": str(spec_path),
"window_id": study.trace.window_id,
},
}
def _resolve_windows_path_for_profile(study: StudySpec, *, study_spec_path: Path) -> Path:
path = Path(study.trace.windows_path)
if not path.is_absolute():
path = (study_spec_path.parent / path).resolve()
return path
def _load_profile_windows(
study: StudySpec,
*,
study_spec_path: Path,
) -> list[dict[str, object]]:
windows_path = _resolve_windows_path_for_profile(study, study_spec_path=study_spec_path)
payload = json.loads(windows_path.read_text(encoding="utf-8"))
raw_windows = payload.get("windows") if isinstance(payload, dict) else payload
if not isinstance(raw_windows, list):
raise SpecError(f"windows payload must contain a list: {windows_path}")
return [
{str(key): value for key, value in item.items()}
for item in raw_windows
if isinstance(item, dict)
]
def _selected_profile_windows(
args: argparse.Namespace,
study: StudySpec,
*,
study_spec_path: Path,
) -> list[dict[str, object]]:
windows = _load_profile_windows(study, study_spec_path=study_spec_path)
window_ids = set(args.window_id or [])
selected: list[dict[str, object]] = []
for item in windows:
window_id = str(item.get("window_id") or "").strip()
if not window_id:
continue
if window_ids and window_id not in window_ids:
continue
if not window_ids and not args.all:
if window_id != study.trace.window_id:
continue
trace_type = str(item.get("trace_type") or "").strip()
if args.trace_type and trace_type != args.trace_type:
continue
date_value = str(item.get("date") or "").strip()
if args.date_from and date_value and date_value < args.date_from:
continue
if args.date_to and date_value and date_value > args.date_to:
continue
if args.slot_token and str(item.get("slot_token") or "").strip() != args.slot_token:
continue
selected.append(item)
selected.sort(
key=lambda item: (
str(item.get("date") or ""),
str(item.get("slot_token") or ""),
str(item.get("window_id") or ""),
)
)
if args.limit is not None:
selected = selected[: args.limit]
if not selected:
raise SpecError("No trace windows selected for profile similarity.")
return selected
def cmd_profile_window(args: argparse.Namespace) -> int:
print(json.dumps(_profile_current_study_window(args), ensure_ascii=False, indent=2))
return 0
def cmd_profile_similarity(args: argparse.Namespace) -> int:
spec_path = Path(args.spec).resolve()
study = _load_profile_study_spec(spec_path)
mode = resolve_length_mode(
request_mode=study.trace.request_mode,
length_mode=args.length_mode,
)
gpu_count = _resolve_profile_gpu_count(args, study)
profiles = []
selected = _selected_profile_windows(args, study, study_spec_path=spec_path)
for item in selected:
window_id = str(item["window_id"])
window_study = replace(study, trace=replace(study.trace, window_id=window_id))
window, requests = load_trace_requests(window_study, study_spec_path=spec_path)
profiles.append(
build_workload_profile(
requests,
window,
gpu_count=gpu_count,
length_mode=mode,
)
)
print(
json.dumps(
{
"source": {
"study_spec_path": str(spec_path),
"selected_window_count": len(profiles),
"length_mode": mode,
"gpu_count": gpu_count,
},
"profiles": [profile.to_dict() for profile in profiles],
"similarity": similarity_report(profiles),
},
ensure_ascii=False,
indent=2,
)
)
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="AITuner CLI")
subparsers = parser.add_subparsers(dest="command", required=True)
@@ -490,6 +656,50 @@ def build_parser() -> argparse.ArgumentParser:
compare_run.add_argument("--output-root")
compare_run.set_defaults(func=cmd_compare_run)
profile = subparsers.add_parser("profile")
profile_sub = profile.add_subparsers(dest="profile_command", required=True)
profile_window = profile_sub.add_parser("window")
profile_window.add_argument("--spec", required=True)
profile_window.add_argument(
"--length-mode",
default="auto",
choices=["auto", "total", "input", "output"],
help="Token length basis for the L vector. auto uses output for decode_only and total otherwise.",
)
profile_window.add_argument(
"--gpu-count",
type=int,
help="GPU denominator for per-GPU arrival rate. Defaults to hardware.gpu_count.",
)
profile_window.set_defaults(func=cmd_profile_window)
profile_similarity = profile_sub.add_parser("similarity")
profile_similarity.add_argument("--spec", required=True)
profile_similarity.add_argument("--window-id", action="append")
profile_similarity.add_argument("--trace-type")
profile_similarity.add_argument("--date-from")
profile_similarity.add_argument("--date-to")
profile_similarity.add_argument("--slot-token")
profile_similarity.add_argument("--limit", type=int)
profile_similarity.add_argument(
"--all",
action="store_true",
help="Profile all windows selected by filters. Without this or --window-id, only the study window is used.",
)
profile_similarity.add_argument(
"--length-mode",
default="auto",
choices=["auto", "total", "input", "output"],
help="Token length basis for the L vector. auto uses output for decode_only and total otherwise.",
)
profile_similarity.add_argument(
"--gpu-count",
type=int,
help="GPU denominator for per-GPU arrival rate. Defaults to hardware.gpu_count.",
)
profile_similarity.set_defaults(func=cmd_profile_similarity)
return parser

406
src/aituner/lca.py Normal file
View File

@@ -0,0 +1,406 @@
from __future__ import annotations
import json
import math
import statistics
from dataclasses import dataclass
from typing import Any, Sequence
from .trace import TraceRequest, WindowRecord
EPSILON = 1e-9
FEATURE_NAMES = [
"L.log_mean_length",
"L.log_p95_over_mean_length",
"L.cv_length",
"C.log_mean_hit_length",
"C.log_p95_over_mean_hit_length",
"C.cv_hit_length",
"C.hit_rate",
"A.log_request_rate_per_gpu",
"A.cv_interarrival",
"A.log_fano_1s",
]
FAMILY_SLICES = {
"L": slice(0, 3),
"C": slice(3, 7),
"A": slice(7, 10),
}
LENGTH_MODES = {"total", "input", "output"}
@dataclass(frozen=True)
class WorkloadProfile:
window_id: str
trace_type: str
request_count: int
duration_s: float
gpu_count: int
length_mode: str
feature_names: list[str]
vector: list[float]
stats: dict[str, Any]
def to_dict(self) -> dict[str, Any]:
return {
"window_id": self.window_id,
"trace_type": self.trace_type,
"request_count": self.request_count,
"duration_s": self.duration_s,
"gpu_count": self.gpu_count,
"length_mode": self.length_mode,
"feature_names": self.feature_names,
"vector": self.vector,
"stats": self.stats,
}
@dataclass(frozen=True)
class RobustScale:
feature_names: list[str]
center: list[float]
scale: list[float]
def transform(self, vector: Sequence[float]) -> list[float]:
return [
(float(value) - self.center[idx]) / self.scale[idx]
for idx, value in enumerate(vector)
]
def to_dict(self) -> dict[str, Any]:
return {
"feature_names": self.feature_names,
"center": self.center,
"scale": self.scale,
}
def resolve_length_mode(*, request_mode: str | None = None, length_mode: str = "auto") -> str:
normalized = str(length_mode or "auto").strip().lower()
if normalized == "auto":
return (
"output"
if str(request_mode or "").strip().lower() == "decode_only"
else "total"
)
if normalized not in LENGTH_MODES:
raise ValueError(
"length_mode must be one of: auto, total, input, output."
)
return normalized
def build_workload_profile(
requests: list[TraceRequest],
window: WindowRecord,
*,
gpu_count: int,
length_mode: str = "total",
) -> WorkloadProfile:
if gpu_count <= 0:
raise ValueError("gpu_count must be > 0.")
if length_mode not in LENGTH_MODES:
raise ValueError(f"Unsupported length_mode: {length_mode}")
duration_s = _duration_s(requests, window)
input_lengths = [float(item.prompt_tokens_hint or 0) for item in requests]
output_lengths = [float(item.completion_tokens_hint or 0) for item in requests]
profile_lengths = [
_profile_length(input_len, output_len, length_mode=length_mode)
for input_len, output_len in zip(input_lengths, output_lengths)
]
hit_lengths, cache_stats = _ideal_cache_hit_lengths(
requests,
input_lengths=input_lengths,
block_size=_block_size(window),
)
arrival_stats = _arrival_stats(requests, duration_s=duration_s, gpu_count=gpu_count)
length_stats = _series_stats(profile_lengths)
hit_stats = _series_stats(hit_lengths)
total_profile_length = sum(profile_lengths)
total_input_length = sum(input_lengths)
total_hit_length = sum(hit_lengths)
feature_hit_rate = (
float(total_hit_length / max(total_profile_length, EPSILON))
if total_profile_length > 0
else 0.0
)
input_hit_rate = (
float(total_hit_length / max(total_input_length, EPSILON))
if total_input_length > 0
else 0.0
)
vector = [
math.log1p(length_stats["mean"]),
math.log1p(length_stats["p95"] / max(length_stats["mean"], EPSILON)),
length_stats["cv"],
math.log1p(hit_stats["mean"]),
math.log1p(hit_stats["p95"] / max(hit_stats["mean"], EPSILON)),
hit_stats["cv"],
feature_hit_rate,
math.log1p(arrival_stats["request_rate_per_gpu"]),
arrival_stats["interarrival_cv"],
math.log1p(arrival_stats["fano_1s"]),
]
return WorkloadProfile(
window_id=window.window_id,
trace_type=window.trace_type,
request_count=len(requests),
duration_s=duration_s,
gpu_count=int(gpu_count),
length_mode=length_mode,
feature_names=list(FEATURE_NAMES),
vector=[float(item) for item in vector],
stats={
"length": {
**length_stats,
"mode": length_mode,
"total": total_profile_length,
"input_total": total_input_length,
"output_total": sum(output_lengths),
},
"cache": {
**hit_stats,
**cache_stats,
"total_hit_length": total_hit_length,
"hit_rate": feature_hit_rate,
"input_hit_rate": input_hit_rate,
},
"arrival": arrival_stats,
},
)
def fit_robust_scale(profiles: Sequence[WorkloadProfile]) -> RobustScale:
if not profiles:
raise ValueError("At least one profile is required to fit a robust scale.")
centers: list[float] = []
scales: list[float] = []
for idx in range(len(FEATURE_NAMES)):
values = [float(profile.vector[idx]) for profile in profiles]
median = _percentile(values, 50.0)
iqr = _percentile(values, 75.0) - _percentile(values, 25.0)
centers.append(float(median))
scales.append(float(iqr if abs(iqr) > EPSILON else 1.0))
return RobustScale(feature_names=list(FEATURE_NAMES), center=centers, scale=scales)
def profile_similarity(
left: WorkloadProfile,
right: WorkloadProfile,
*,
scale: RobustScale | None = None,
) -> float:
scaler = scale or fit_robust_scale([left, right])
z_left = scaler.transform(left.vector)
z_right = scaler.transform(right.vector)
return _similarity_from_z(z_left, z_right)
def similarity_report(profiles: Sequence[WorkloadProfile]) -> dict[str, Any]:
if not profiles:
raise ValueError("At least one profile is required.")
scale = fit_robust_scale(profiles)
transformed = [scale.transform(profile.vector) for profile in profiles]
rows: list[dict[str, Any]] = []
matrix: list[list[float]] = []
for i, left in enumerate(profiles):
row_values: list[float] = []
for j, right in enumerate(profiles):
sim = _similarity_from_z(transformed[i], transformed[j])
row_values.append(sim)
rows.append(
{
"left": left.window_id,
"right": right.window_id,
"similarity": sim,
"family_similarity": _family_similarity(transformed[i], transformed[j]),
}
)
matrix.append(row_values)
return {
"feature_names": list(FEATURE_NAMES),
"scaler": scale.to_dict(),
"windows": [profile.window_id for profile in profiles],
"matrix": matrix,
"pairs": rows,
}
def dumps_profile(profile: WorkloadProfile) -> str:
return json.dumps(profile.to_dict(), ensure_ascii=False, indent=2) + "\n"
def _duration_s(requests: list[TraceRequest], window: WindowRecord) -> float:
duration = max(float(window.window_end) - float(window.window_start), 0.0)
if duration > 0:
return duration
if len(requests) >= 2:
return max(0.0, float(requests[-1].arrival_s) - float(requests[0].arrival_s))
return 0.0
def _profile_length(input_length: float, output_length: float, *, length_mode: str) -> float:
if length_mode == "input":
return max(input_length, 0.0)
if length_mode == "output":
return max(output_length, 0.0)
return max(input_length, 0.0) + max(output_length, 0.0)
def _block_size(window: WindowRecord) -> int:
value = window.source_payload.get("block_size")
if isinstance(value, bool):
return 1
if isinstance(value, (int, float)) and value > 0:
return int(value)
if isinstance(value, str) and value.strip():
try:
parsed = int(value)
except ValueError:
return 1
return parsed if parsed > 0 else 1
return 1
def _ideal_cache_hit_lengths(
requests: list[TraceRequest],
*,
input_lengths: list[float],
block_size: int,
) -> tuple[list[float], dict[str, Any]]:
seen_hashes: set[Any] = set()
hit_lengths: list[float] = []
total_blocks = 0
repeated_blocks = 0
rows_with_hash_ids = 0
for request, input_length in zip(requests, input_lengths):
hash_ids = request.metadata.get("hash_ids")
if not isinstance(hash_ids, list):
hit_lengths.append(0.0)
continue
rows_with_hash_ids += 1
repeated_for_request = 0
for hash_id in hash_ids:
total_blocks += 1
if hash_id in seen_hashes:
repeated_blocks += 1
repeated_for_request += 1
else:
seen_hashes.add(hash_id)
hit_lengths.append(float(min(max(input_length, 0.0), repeated_for_request * block_size)))
return hit_lengths, {
"block_size": block_size,
"rows_with_hash_ids": rows_with_hash_ids,
"total_blocks": total_blocks,
"repeated_blocks": repeated_blocks,
"repeated_block_ratio": (
float(repeated_blocks / total_blocks) if total_blocks else 0.0
),
}
def _arrival_stats(
requests: list[TraceRequest],
*,
duration_s: float,
gpu_count: int,
) -> dict[str, Any]:
arrivals = [float(item.arrival_s) for item in requests]
interarrivals = [
max(0.0, arrivals[idx] - arrivals[idx - 1])
for idx in range(1, len(arrivals))
]
per_second_counts = _per_second_counts(arrivals, duration_s=duration_s)
qps = float(len(requests) / duration_s) if duration_s > 0 else 0.0
return {
"request_rate": qps,
"request_rate_per_gpu": float(qps / gpu_count) if gpu_count > 0 else 0.0,
"interarrival_cv": _cv(interarrivals),
"fano_1s": _fano(per_second_counts),
"one_second_count_mean": statistics.fmean(per_second_counts)
if per_second_counts
else 0.0,
"one_second_count_variance": statistics.pvariance(per_second_counts)
if len(per_second_counts) >= 2
else 0.0,
"one_second_bin_count": len(per_second_counts),
}
def _per_second_counts(arrivals: list[float], *, duration_s: float) -> list[float]:
if duration_s <= 0:
return [float(len(arrivals))] if arrivals else []
bin_count = max(1, int(math.ceil(duration_s)))
counts = [0.0 for _ in range(bin_count)]
for arrival in arrivals:
if arrival < 0:
continue
idx = int(math.floor(arrival))
if 0 <= idx < bin_count:
counts[idx] += 1.0
return counts
def _series_stats(values: list[float]) -> dict[str, float]:
return {
"count": float(len(values)),
"mean": statistics.fmean(values) if values else 0.0,
"p50": _percentile(values, 50.0),
"p95": _percentile(values, 95.0),
"cv": _cv(values),
}
def _cv(values: list[float]) -> float:
if not values:
return 0.0
mean = statistics.fmean(values)
if abs(mean) <= EPSILON:
return 0.0
return float(statistics.pstdev(values) / mean) if len(values) >= 2 else 0.0
def _fano(values: list[float]) -> float:
if not values:
return 0.0
mean = statistics.fmean(values)
if abs(mean) <= EPSILON:
return 0.0
return float(statistics.pvariance(values) / mean) if len(values) >= 2 else 0.0
def _percentile(values: Sequence[float], p: float) -> float:
if not values:
return 0.0
ordered = sorted(float(item) for item in values)
if len(ordered) == 1:
return ordered[0]
rank = (p / 100.0) * (len(ordered) - 1)
lower = int(math.floor(rank))
upper = int(math.ceil(rank))
if lower == upper:
return ordered[lower]
weight = rank - lower
return float(ordered[lower] * (1.0 - weight) + ordered[upper] * weight)
def _similarity_from_z(left: Sequence[float], right: Sequence[float]) -> float:
distance = math.sqrt(
sum((float(lval) - float(rval)) ** 2 for lval, rval in zip(left, right))
)
return float(math.exp(-distance))
def _family_similarity(left: Sequence[float], right: Sequence[float]) -> dict[str, float]:
result: dict[str, float] = {}
for family, family_slice in FAMILY_SLICES.items():
result[family] = _similarity_from_z(left[family_slice], right[family_slice])
return result

View File

@@ -1,6 +1,8 @@
from __future__ import annotations
import json
import io
import math
import os
import signal
import subprocess
@@ -25,6 +27,12 @@ from aituner.harness import (
build_harness_guided_proposal,
build_harness_stop_proposal,
)
from aituner.lca import (
build_workload_profile,
profile_similarity,
resolve_length_mode,
similarity_report,
)
from aituner.llm import _extract_response_text, build_prompt, parse_proposal_text, validate_proposal
from aituner.search import ThresholdProbe, binary_search_max_feasible
from aituner.slo import RequestOutcome, evaluate_request, summarize_evaluations
@@ -48,7 +56,7 @@ from aituner.worker import (
_wait_for_server_or_exit,
run_trial,
)
from aituner.trace import TraceRequest
from aituner.trace import TraceRequest, WindowRecord
REPO_ROOT = Path(__file__).resolve().parents[1]
@@ -241,6 +249,150 @@ class CoreFlowTests(unittest.TestCase):
self.assertIn("knob_harnesses", prompt)
self.assertTrue(study_root.exists())
def test_lca_workload_profile_uses_standard_10d_features(self) -> None:
window = WindowRecord(
window_id="w1",
trace_path=Path("trace.jsonl"),
trace_type="chat",
window_start=0.0,
window_end=4.0,
source_payload={"block_size": 64},
)
requests = [
TraceRequest(
row_id="r1",
arrival_s=0.0,
sampling_u=1.0,
body={},
prompt_tokens_hint=100,
completion_tokens_hint=10,
metadata={"hash_ids": [1, 2]},
),
TraceRequest(
row_id="r2",
arrival_s=1.0,
sampling_u=1.0,
body={},
prompt_tokens_hint=100,
completion_tokens_hint=20,
metadata={"hash_ids": [1, 3]},
),
]
profile = build_workload_profile(
requests,
window,
gpu_count=2,
length_mode="total",
)
self.assertEqual(len(profile.feature_names), 10)
self.assertEqual(len(profile.vector), 10)
self.assertEqual(profile.feature_names[0], "L.log_mean_length")
self.assertAlmostEqual(profile.stats["cache"]["total_hit_length"], 64.0)
self.assertAlmostEqual(profile.stats["cache"]["hit_rate"], 64.0 / 230.0)
self.assertAlmostEqual(profile.stats["cache"]["input_hit_rate"], 64.0 / 200.0)
self.assertAlmostEqual(profile.vector[3], math.log1p(32.0))
self.assertAlmostEqual(profile.vector[5], 1.0)
self.assertAlmostEqual(profile.stats["arrival"]["request_rate_per_gpu"], 0.25)
self.assertAlmostEqual(profile.stats["arrival"]["fano_1s"], 0.5)
self.assertEqual(resolve_length_mode(request_mode="decode_only"), "output")
def test_lca_similarity_matrix_separates_different_profiles(self) -> None:
window = WindowRecord(
window_id="base",
trace_path=Path("trace.jsonl"),
trace_type="chat",
window_start=0.0,
window_end=4.0,
source_payload={"block_size": 64},
)
def make_profile(window_id: str, input_tokens: int, *, arrival_gap: float) -> object:
reqs = [
TraceRequest(
row_id=f"{window_id}-1",
arrival_s=0.0,
sampling_u=1.0,
body={},
prompt_tokens_hint=input_tokens,
completion_tokens_hint=16,
metadata={"hash_ids": [window_id, 1]},
),
TraceRequest(
row_id=f"{window_id}-2",
arrival_s=arrival_gap,
sampling_u=1.0,
body={},
prompt_tokens_hint=input_tokens,
completion_tokens_hint=16,
metadata={"hash_ids": [window_id, 1, 2]},
),
]
return build_workload_profile(
reqs,
WindowRecord(
window_id=window_id,
trace_path=window.trace_path,
trace_type=window.trace_type,
window_start=window.window_start,
window_end=window.window_end,
source_payload=window.source_payload,
),
gpu_count=1,
length_mode="total",
)
p1 = make_profile("same-a", 100, arrival_gap=1.0)
p2 = make_profile("same-b", 100, arrival_gap=1.0)
p3 = make_profile("different", 10000, arrival_gap=0.1)
report = similarity_report([p1, p2, p3])
self.assertAlmostEqual(profile_similarity(p1, p2), 1.0)
self.assertGreater(report["matrix"][0][1], report["matrix"][0][2])
self.assertIn("L", report["pairs"][2]["family_similarity"])
def test_cli_profile_window_outputs_lca_profile(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(tmp_path)
stdout = io.StringIO()
with mock.patch("sys.stdout", stdout):
rc = cli_main(
[
"profile",
"window",
"--spec",
str(study_path),
"--gpu-count",
"8",
]
)
self.assertEqual(rc, 0)
payload = json.loads(stdout.getvalue())
self.assertEqual(payload["profile"]["window_id"], "chat_w1")
self.assertEqual(len(payload["profile"]["vector"]), 10)
self.assertEqual(payload["profile"]["gpu_count"], 8)
def test_cli_profile_window_does_not_resolve_llm_endpoint(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
study_path = _write_study_assets(tmp_path)
payload = json.loads(study_path.read_text(encoding="utf-8"))
payload["llm"]["endpoint"] = {
"provider": "codex",
"model": "gpt-5.4",
}
study_path.write_text(json.dumps(payload), encoding="utf-8")
stdout = io.StringIO()
with mock.patch("sys.stdout", stdout):
rc = cli_main(["profile", "window", "--spec", str(study_path)])
self.assertEqual(rc, 0)
self.assertEqual(json.loads(stdout.getvalue())["profile"]["window_id"], "chat_w1")
def test_harness_uses_latency_failures_before_generic_unrecoverable(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)