From 27d1c8fa920748d8da35c22342d28a681de6216c Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Mon, 15 Jun 2026 14:02:24 +0800 Subject: [PATCH] Add L-C-A workload profile metric and CLI profile commands Implement the paper's 10-dimensional L-C-A workload feature vector (RobustScaler-normalized, sim=exp(-||dz||)) in lca.py, and wire it into `aituner profile window` / `aituner profile similarity`. Covered by tests. Co-Authored-By: Claude Opus 4.8 --- src/aituner/cli.py | 212 ++++++++++++++++++++- src/aituner/lca.py | 406 ++++++++++++++++++++++++++++++++++++++++ tests/test_core_flow.py | 154 ++++++++++++++- 3 files changed, 770 insertions(+), 2 deletions(-) create mode 100644 src/aituner/lca.py diff --git a/src/aituner/cli.py b/src/aituner/cli.py index cc9fb7c..e24b99f 100644 --- a/src/aituner/cli.py +++ b/src/aituner/cli.py @@ -3,6 +3,7 @@ from __future__ import annotations import argparse import json import sys +from dataclasses import replace from pathlib import Path from .compare import run_compare @@ -12,8 +13,20 @@ from .harness import ( build_harness_stop_proposal, ) from .job import append_job, build_trial_job +from .lca import ( + build_workload_profile, + resolve_length_mode, + similarity_report, +) from .llm import build_prompt, call_llm_for_proposal, load_capability_profile, parse_proposal_text -from .spec import Proposal, SpecError, load_study_spec, to_jsonable +from .spec import ( + Proposal, + SpecError, + StudySpec, + load_structured_file, + load_study_spec, + to_jsonable, +) from .store import StudyStore from .trace import load_trace_requests, summarize_window from .worker import run_trial @@ -422,6 +435,159 @@ def cmd_compare_run(args: argparse.Namespace) -> int: return 0 +def _resolve_profile_gpu_count(args: argparse.Namespace, study: StudySpec) -> int: + gpu_count = args.gpu_count + if gpu_count is None: + gpu_count = study.hardware.gpu_count + if gpu_count <= 0: + raise SpecError("--gpu-count must be > 0.") + return int(gpu_count) + + +def 
_load_profile_study_spec(spec_path: Path) -> StudySpec: + payload = dict(load_structured_file(spec_path)) + llm_payload = dict(payload.get("llm") or {}) + llm_payload.pop("endpoint", None) + payload["llm"] = llm_payload + return StudySpec.from_dict(payload) + + +def _profile_current_study_window(args: argparse.Namespace) -> dict[str, object]: + spec_path = Path(args.spec).resolve() + study = _load_profile_study_spec(spec_path) + mode = resolve_length_mode( + request_mode=study.trace.request_mode, + length_mode=args.length_mode, + ) + window, requests = load_trace_requests(study, study_spec_path=spec_path) + profile = build_workload_profile( + requests, + window, + gpu_count=_resolve_profile_gpu_count(args, study), + length_mode=mode, + ) + return { + "profile": profile.to_dict(), + "source": { + "study_spec_path": str(spec_path), + "window_id": study.trace.window_id, + }, + } + + +def _resolve_windows_path_for_profile(study: StudySpec, *, study_spec_path: Path) -> Path: + path = Path(study.trace.windows_path) + if not path.is_absolute(): + path = (study_spec_path.parent / path).resolve() + return path + + +def _load_profile_windows( + study: StudySpec, + *, + study_spec_path: Path, +) -> list[dict[str, object]]: + windows_path = _resolve_windows_path_for_profile(study, study_spec_path=study_spec_path) + payload = json.loads(windows_path.read_text(encoding="utf-8")) + raw_windows = payload.get("windows") if isinstance(payload, dict) else payload + if not isinstance(raw_windows, list): + raise SpecError(f"windows payload must contain a list: {windows_path}") + return [ + {str(key): value for key, value in item.items()} + for item in raw_windows + if isinstance(item, dict) + ] + + +def _selected_profile_windows( + args: argparse.Namespace, + study: StudySpec, + *, + study_spec_path: Path, +) -> list[dict[str, object]]: + windows = _load_profile_windows(study, study_spec_path=study_spec_path) + window_ids = set(args.window_id or []) + selected: list[dict[str, object]] 
def cmd_profile_similarity(args: argparse.Namespace) -> int:
    """Print L-C-A profiles and their pairwise similarity for selected windows.

    Loads the study named by ``args.spec``, profiles every window chosen by
    ``_selected_profile_windows`` using one shared length mode and GPU count,
    and prints a JSON document with ``source``, ``profiles`` and
    ``similarity`` keys.

    Args:
        args: Parsed CLI namespace for ``profile similarity``.

    Returns:
        ``0`` once the report has been printed.
    """
    spec_path = Path(args.spec).resolve()
    study = _load_profile_study_spec(spec_path)
    mode = resolve_length_mode(
        request_mode=study.trace.request_mode,
        length_mode=args.length_mode,
    )
    gpu_count = _resolve_profile_gpu_count(args, study)

    def profile_window(window_id: str):
        # Point a copy of the study at this window; the loaded spec stays untouched.
        retargeted = replace(study, trace=replace(study.trace, window_id=window_id))
        window, requests = load_trace_requests(retargeted, study_spec_path=spec_path)
        return build_workload_profile(
            requests,
            window,
            gpu_count=gpu_count,
            length_mode=mode,
        )

    chosen = _selected_profile_windows(args, study, study_spec_path=spec_path)
    profiles = [profile_window(str(entry["window_id"])) for entry in chosen]
    report = {
        "source": {
            "study_spec_path": str(spec_path),
            "selected_window_count": len(profiles),
            "length_mode": mode,
            "gpu_count": gpu_count,
        },
        "profiles": [profile.to_dict() for profile in profiles],
        "similarity": similarity_report(profiles),
    }
    print(json.dumps(report, ensure_ascii=False, indent=2))
    return 0
@dataclass(frozen=True)
class WorkloadProfile:
    """Immutable record of one trace window's workload feature vector."""

    # Identity of the profiled window.
    window_id: str
    trace_type: str
    # Size of the sample the features were computed from.
    request_count: int
    duration_s: float
    # Settings the profile was built with.
    gpu_count: int
    length_mode: str
    # Feature labels and values; presumably index-aligned with each other.
    feature_names: list[str]
    vector: list[float]
    # Supporting statistics kept alongside the vector.
    stats: dict[str, Any]

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a plain dict in declaration order.

        The values are the same objects the profile holds, not copies.
        """
        keys = (
            "window_id",
            "trace_type",
            "request_count",
            "duration_s",
            "gpu_count",
            "length_mode",
            "feature_names",
            "vector",
            "stats",
        )
        return {key: getattr(self, key) for key in keys}
def resolve_length_mode(*, request_mode: str | None = None, length_mode: str = "auto") -> str:
    """Turn a requested length mode into a concrete one.

    ``length_mode`` is stripped and lower-cased; a falsy value counts as
    ``"auto"``. ``"auto"`` resolves to ``"output"`` when ``request_mode``
    normalizes to ``"decode_only"`` and to ``"total"`` otherwise. Any other
    value must be a member of ``LENGTH_MODES`` and is returned normalized.

    Raises:
        ValueError: If the normalized mode is neither ``"auto"`` nor a
            member of ``LENGTH_MODES``.
    """
    requested = str(length_mode or "auto").strip().lower()
    if requested != "auto":
        if requested in LENGTH_MODES:
            return requested
        raise ValueError(
            "length_mode must be one of: auto, total, input, output."
        )
    decode_only = str(request_mode or "").strip().lower() == "decode_only"
    if decode_only:
        return "output"
    return "total"
def profile_similarity(
    left: WorkloadProfile,
    right: WorkloadProfile,
    *,
    scale: RobustScale | None = None,
) -> float:
    """Return the similarity ``exp(-distance)`` between two workload profiles.

    Args:
        left: First profile.
        right: Second profile.
        scale: Scaler fitted on a reference population of profiles. When
            given, both vectors are transformed with it before the distance
            is taken. When omitted, the raw feature vectors are compared.

    Returns:
        The value produced by ``_similarity_from_z`` for the two vectors;
        identical vectors give ``1.0``.
    """
    if scale is None:
        # Do not fit a scaler on just these two profiles. With two samples
        # the interquartile range is half their gap, so every feature that
        # differs would scale to a difference of exactly 2 and the score
        # would only count differing features, ignoring by how much.
        return _similarity_from_z(left.vector, right.vector)
    return _similarity_from_z(
        scale.transform(left.vector),
        scale.transform(right.vector),
    )
+ return int(value) + if isinstance(value, str) and value.strip(): + try: + parsed = int(value) + except ValueError: + return 1 + return parsed if parsed > 0 else 1 + return 1 + + +def _ideal_cache_hit_lengths( + requests: list[TraceRequest], + *, + input_lengths: list[float], + block_size: int, +) -> tuple[list[float], dict[str, Any]]: + seen_hashes: set[Any] = set() + hit_lengths: list[float] = [] + total_blocks = 0 + repeated_blocks = 0 + rows_with_hash_ids = 0 + for request, input_length in zip(requests, input_lengths): + hash_ids = request.metadata.get("hash_ids") + if not isinstance(hash_ids, list): + hit_lengths.append(0.0) + continue + rows_with_hash_ids += 1 + repeated_for_request = 0 + for hash_id in hash_ids: + total_blocks += 1 + if hash_id in seen_hashes: + repeated_blocks += 1 + repeated_for_request += 1 + else: + seen_hashes.add(hash_id) + hit_lengths.append(float(min(max(input_length, 0.0), repeated_for_request * block_size))) + return hit_lengths, { + "block_size": block_size, + "rows_with_hash_ids": rows_with_hash_ids, + "total_blocks": total_blocks, + "repeated_blocks": repeated_blocks, + "repeated_block_ratio": ( + float(repeated_blocks / total_blocks) if total_blocks else 0.0 + ), + } + + +def _arrival_stats( + requests: list[TraceRequest], + *, + duration_s: float, + gpu_count: int, +) -> dict[str, Any]: + arrivals = [float(item.arrival_s) for item in requests] + interarrivals = [ + max(0.0, arrivals[idx] - arrivals[idx - 1]) + for idx in range(1, len(arrivals)) + ] + per_second_counts = _per_second_counts(arrivals, duration_s=duration_s) + qps = float(len(requests) / duration_s) if duration_s > 0 else 0.0 + return { + "request_rate": qps, + "request_rate_per_gpu": float(qps / gpu_count) if gpu_count > 0 else 0.0, + "interarrival_cv": _cv(interarrivals), + "fano_1s": _fano(per_second_counts), + "one_second_count_mean": statistics.fmean(per_second_counts) + if per_second_counts + else 0.0, + "one_second_count_variance": 
statistics.pvariance(per_second_counts) + if len(per_second_counts) >= 2 + else 0.0, + "one_second_bin_count": len(per_second_counts), + } + + +def _per_second_counts(arrivals: list[float], *, duration_s: float) -> list[float]: + if duration_s <= 0: + return [float(len(arrivals))] if arrivals else [] + bin_count = max(1, int(math.ceil(duration_s))) + counts = [0.0 for _ in range(bin_count)] + for arrival in arrivals: + if arrival < 0: + continue + idx = int(math.floor(arrival)) + if 0 <= idx < bin_count: + counts[idx] += 1.0 + return counts + + +def _series_stats(values: list[float]) -> dict[str, float]: + return { + "count": float(len(values)), + "mean": statistics.fmean(values) if values else 0.0, + "p50": _percentile(values, 50.0), + "p95": _percentile(values, 95.0), + "cv": _cv(values), + } + + +def _cv(values: list[float]) -> float: + if not values: + return 0.0 + mean = statistics.fmean(values) + if abs(mean) <= EPSILON: + return 0.0 + return float(statistics.pstdev(values) / mean) if len(values) >= 2 else 0.0 + + +def _fano(values: list[float]) -> float: + if not values: + return 0.0 + mean = statistics.fmean(values) + if abs(mean) <= EPSILON: + return 0.0 + return float(statistics.pvariance(values) / mean) if len(values) >= 2 else 0.0 + + +def _percentile(values: Sequence[float], p: float) -> float: + if not values: + return 0.0 + ordered = sorted(float(item) for item in values) + if len(ordered) == 1: + return ordered[0] + rank = (p / 100.0) * (len(ordered) - 1) + lower = int(math.floor(rank)) + upper = int(math.ceil(rank)) + if lower == upper: + return ordered[lower] + weight = rank - lower + return float(ordered[lower] * (1.0 - weight) + ordered[upper] * weight) + + +def _similarity_from_z(left: Sequence[float], right: Sequence[float]) -> float: + distance = math.sqrt( + sum((float(lval) - float(rval)) ** 2 for lval, rval in zip(left, right)) + ) + return float(math.exp(-distance)) + + +def _family_similarity(left: Sequence[float], right: 
def test_lca_workload_profile_uses_standard_10d_features(self) -> None:
    """A two-request window yields the 10-feature vector and expected stats."""

    def make_request(row_id: str, arrival_s: float, completion_tokens: int, hash_ids: list) -> TraceRequest:
        # Both requests share the 100-token prompt; only these fields vary.
        return TraceRequest(
            row_id=row_id,
            arrival_s=arrival_s,
            sampling_u=1.0,
            body={},
            prompt_tokens_hint=100,
            completion_tokens_hint=completion_tokens,
            metadata={"hash_ids": hash_ids},
        )

    requests = [
        make_request("r1", 0.0, 10, [1, 2]),
        make_request("r2", 1.0, 20, [1, 3]),
    ]
    window = WindowRecord(
        window_id="w1",
        trace_path=Path("trace.jsonl"),
        trace_type="chat",
        window_start=0.0,
        window_end=4.0,
        source_payload={"block_size": 64},
    )

    profile = build_workload_profile(
        requests,
        window,
        gpu_count=2,
        length_mode="total",
    )
    cache_stats = profile.stats["cache"]
    arrival_stats = profile.stats["arrival"]

    self.assertEqual(len(profile.feature_names), 10)
    self.assertEqual(len(profile.vector), 10)
    self.assertEqual(profile.feature_names[0], "L.log_mean_length")
    # Only block 1 repeats, so the second request gets one 64-token hit.
    self.assertAlmostEqual(cache_stats["total_hit_length"], 64.0)
    self.assertAlmostEqual(cache_stats["hit_rate"], 64.0 / 230.0)
    self.assertAlmostEqual(cache_stats["input_hit_rate"], 64.0 / 200.0)
    self.assertAlmostEqual(profile.vector[3], math.log1p(32.0))
    self.assertAlmostEqual(profile.vector[5], 1.0)
    self.assertAlmostEqual(arrival_stats["request_rate_per_gpu"], 0.25)
    self.assertAlmostEqual(arrival_stats["fano_1s"], 0.5)
    self.assertEqual(resolve_length_mode(request_mode="decode_only"), "output")
def test_cli_profile_window_outputs_lca_profile(self) -> None:
    """``profile window`` prints a JSON profile honoring ``--gpu-count``."""
    captured = io.StringIO()
    with tempfile.TemporaryDirectory() as tmp:
        study_path = _write_study_assets(Path(tmp))
        argv = [
            "profile",
            "window",
            "--spec",
            str(study_path),
            "--gpu-count",
            "8",
        ]
        with mock.patch("sys.stdout", captured):
            rc = cli_main(argv)

    self.assertEqual(rc, 0)
    profile = json.loads(captured.getvalue())["profile"]
    self.assertEqual(profile["window_id"], "chat_w1")
    self.assertEqual(len(profile["vector"]), 10)
    self.assertEqual(profile["gpu_count"], 8)