Unify harness L-C-A on the canonical lca.WorkloadProfile

Phase 0 of the two-stop work. The prompt block labeled `workload_lca_profile`
previously re-derived L-C-A from summarize_window's ad-hoc percentiles, diverging
from the paper's 10-dim RobustScaler vector implemented in lca.py. Make that block
authoritative: build_harness_context now accepts an optional workload_profile and
renders the canonical 10-dim vector + per-family stats when present, falling back
to the legacy rendering only when no profile is supplied (direct unit-test calls).

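Real call sites (study prompt/llm-propose/tune, run_baseline_then_llm) build the
profile via lca.build_study_workload_profile and pass it through build_prompt;
study tune also passes it to its direct build_harness_context call. The heuristic
regime classifiers elsewhere in the harness keep reading window_summary; that is
the heuristic layer, distinct from the similarity metric. Inside the canonical
block, the regime labels reuse those classifiers but are fed from the profile's
stats.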

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-15 14:12:17 +08:00
parent 8b4116fad0
commit 6f8e3c95c1
6 changed files with 126 additions and 4 deletions

View File

@@ -10,6 +10,7 @@ from aituner.llm import (
load_capability_profile, load_capability_profile,
parse_proposal_text, parse_proposal_text,
) )
from aituner.lca import build_study_workload_profile
from aituner.spec import load_study_spec from aituner.spec import load_study_spec
from aituner.store import StudyStore from aituner.store import StudyStore
from aituner.trace import load_trace_requests, summarize_window from aituner.trace import load_trace_requests, summarize_window
@@ -89,6 +90,7 @@ def main() -> int:
window_summary=summarize_window(requests, window), window_summary=summarize_window(requests, window),
state=state, state=state,
capability_profile=capability_profile, capability_profile=capability_profile,
workload_profile=build_study_workload_profile(study, requests, window),
) )
prompt_name = f"prompt-{state.next_trial_index:04d}" prompt_name = f"prompt-{state.next_trial_index:04d}"
store.write_prompt(study.study_id, prompt_name, prompt) store.write_prompt(study.study_id, prompt_name, prompt)

View File

@@ -14,6 +14,7 @@ from .harness import (
) )
from .job import append_job, build_trial_job from .job import append_job, build_trial_job
from .lca import ( from .lca import (
build_study_workload_profile,
build_workload_profile, build_workload_profile,
resolve_length_mode, resolve_length_mode,
similarity_report, similarity_report,
@@ -140,6 +141,7 @@ def cmd_study_prompt(args: argparse.Namespace) -> int:
window_summary=summarize_window(requests, window), window_summary=summarize_window(requests, window),
state=state, state=state,
capability_profile=capability_profile, capability_profile=capability_profile,
workload_profile=build_study_workload_profile(study, requests, window),
) )
prompt_name = args.prompt_name or f"prompt-{state.next_trial_index:04d}" prompt_name = args.prompt_name or f"prompt-{state.next_trial_index:04d}"
path = store.write_prompt(study.study_id, prompt_name, prompt) path = store.write_prompt(study.study_id, prompt_name, prompt)
@@ -160,6 +162,7 @@ def cmd_study_llm_propose(args: argparse.Namespace) -> int:
window_summary=summarize_window(requests, window), window_summary=summarize_window(requests, window),
state=state, state=state,
capability_profile=capability_profile, capability_profile=capability_profile,
workload_profile=build_study_workload_profile(study, requests, window),
) )
proposal_text = call_llm_for_proposal( proposal_text = call_llm_for_proposal(
policy=study.llm, policy=study.llm,
@@ -242,11 +245,13 @@ def cmd_study_tune(args: argparse.Namespace) -> int:
break break
window, requests = load_trace_requests(study, study_spec_path=spec_path) window, requests = load_trace_requests(study, study_spec_path=spec_path)
window_summary = summarize_window(requests, window) window_summary = summarize_window(requests, window)
workload_profile = build_study_workload_profile(study, requests, window)
harness_context = ( harness_context = (
build_harness_context( build_harness_context(
study=study, study=study,
window_summary=window_summary, window_summary=window_summary,
state=state, state=state,
workload_profile=workload_profile,
) )
if study.llm.use_harness if study.llm.use_harness
else None else None
@@ -256,6 +261,7 @@ def cmd_study_tune(args: argparse.Namespace) -> int:
window_summary=window_summary, window_summary=window_summary,
state=state, state=state,
capability_profile=capability_profile, capability_profile=capability_profile,
workload_profile=workload_profile,
) )
prompt_name = f"prompt-{state.next_trial_index:04d}" prompt_name = f"prompt-{state.next_trial_index:04d}"
store.write_prompt(study.study_id, prompt_name, prompt) store.write_prompt(study.study_id, prompt_name, prompt)

View File

@@ -4,6 +4,7 @@ import json
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from .lca import EPSILON, WorkloadProfile
from .spec import ConfigPatch, Proposal, StudySpec, StudyState, TrialSummary from .spec import ConfigPatch, Proposal, StudySpec, StudyState, TrialSummary
@@ -30,6 +31,7 @@ def build_harness_context(
study: StudySpec, study: StudySpec,
window_summary: dict[str, Any], window_summary: dict[str, Any],
state: StudyState, state: StudyState,
workload_profile: WorkloadProfile | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
recent_diagnostics = _recent_trial_diagnostics(state) recent_diagnostics = _recent_trial_diagnostics(state)
trial_profiles = _trial_profiles(study, recent_diagnostics) trial_profiles = _trial_profiles(study, recent_diagnostics)
@@ -52,7 +54,7 @@ def build_harness_context(
"feature_model": "L-C-A: request lengths, inter-request KV-cache reuse, and arrival dynamics.", "feature_model": "L-C-A: request lengths, inter-request KV-cache reuse, and arrival dynamics.",
"trial_policy": "Profile measured trials, rank bottleneck hypotheses, score generic candidate actions, and stop only when no useful measured hypothesis remains.", "trial_policy": "Profile measured trials, rank bottleneck hypotheses, score generic candidate actions, and stop only when no useful measured hypothesis remains.",
}, },
"workload_lca_profile": _workload_lca_profile(window_summary), "workload_lca_profile": _workload_lca_profile(window_summary, workload_profile),
"recent_trial_diagnostics": recent_diagnostics, "recent_trial_diagnostics": recent_diagnostics,
"trial_profiles": trial_profiles, "trial_profiles": trial_profiles,
"bottleneck_hypotheses": bottleneck_hypotheses, "bottleneck_hypotheses": bottleneck_hypotheses,
@@ -141,7 +143,12 @@ def render_harness_context(context: dict[str, Any]) -> str:
return json.dumps(context, ensure_ascii=False, indent=2) return json.dumps(context, ensure_ascii=False, indent=2)
def _workload_lca_profile(window_summary: dict[str, Any]) -> dict[str, Any]: def _workload_lca_profile(
window_summary: dict[str, Any],
workload_profile: WorkloadProfile | None = None,
) -> dict[str, Any]:
if workload_profile is not None:
return _canonical_lca_profile(workload_profile)
prefix_cache = window_summary.get("prefix_cache") prefix_cache = window_summary.get("prefix_cache")
if not isinstance(prefix_cache, dict): if not isinstance(prefix_cache, dict):
prefix_cache = {} prefix_cache = {}
@@ -178,6 +185,54 @@ def _workload_lca_profile(window_summary: dict[str, Any]) -> dict[str, Any]:
} }
def _canonical_lca_profile(profile: WorkloadProfile) -> dict[str, Any]:
    """Render the prompt's L-C-A block from a canonical ``WorkloadProfile``.

    The length mode, feature names and vector are copied from the profile
    object, so the prompt carries the same values the profile holds rather
    than an ad-hoc re-derivation. Per-family numbers are read from
    ``profile.stats`` under the ``length``, ``cache`` and ``arrival`` keys;
    a family that is missing or is not a dict is treated as empty. Regime
    labels reuse the heuristic classifiers, fed from those canonical stats.
    """
    raw_stats = profile.stats
    if not isinstance(raw_stats, dict):
        raw_stats = {}

    def _family(name: str) -> dict[str, Any]:
        # Absent or malformed families degrade to an empty mapping.
        section = raw_stats.get(name)
        return section if isinstance(section, dict) else {}

    length_stats = _family("length")
    cache_stats = _family("cache")
    arrival_stats = _family("arrival")

    p95 = _as_float(length_stats.get("p95"))
    p50 = _as_float(length_stats.get("p50"))
    if p95:
        # EPSILON keeps the ratio finite when the median is zero.
        p95_over_p50 = float(p95 / max(p50, EPSILON))
    else:
        p95_over_p50 = 0.0
    input_hit_rate = _as_float(cache_stats.get("input_hit_rate"))
    fano = _as_float(arrival_stats.get("fano_1s"))
    gap_cv = _as_float(arrival_stats.get("interarrival_cv"))

    # Built incrementally so key order (and therefore the rendered JSON)
    # is fixed: header fields first, then the L, C and A families.
    block: dict[str, Any] = {
        "metric": "paper L-C-A (10-dim, RobustScaler-normalized) from lca.WorkloadProfile",
        "length_mode": profile.length_mode,
        "feature_names": list(profile.feature_names),
        "vector": list(profile.vector),
    }
    block["L_request_lengths"] = {
        "mean": _as_float(length_stats.get("mean")),
        "p50": p50,
        "p95": p95,
        "cv": _as_float(length_stats.get("cv")),
        "tail_ratio_p95_p50": p95_over_p50,
        "regime": _length_regime(p95, p95_over_p50),
    }
    block["C_prefix_cache"] = {
        "hit_rate": _as_float(cache_stats.get("hit_rate")),
        "input_hit_rate": input_hit_rate,
        "repeated_block_ratio": _as_float(cache_stats.get("repeated_block_ratio")),
        "rows_with_hash_ids": int(cache_stats.get("rows_with_hash_ids") or 0),
        "regime": _cache_regime(input_hit_rate),
    }
    block["A_arrivals"] = {
        "request_rate": _as_float(arrival_stats.get("request_rate")),
        "request_rate_per_gpu": _as_float(arrival_stats.get("request_rate_per_gpu")),
        "interarrival_cv": gap_cv,
        "fano_1s": fano,
        "regime": _arrival_regime(fano, gap_cv),
    }
    return block
def _knob_harnesses( def _knob_harnesses(
study: StudySpec, study: StudySpec,
window_summary: dict[str, Any], window_summary: dict[str, Any],

View File

@@ -4,10 +4,13 @@ import json
import math import math
import statistics import statistics
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Sequence from typing import TYPE_CHECKING, Any, Sequence
from .trace import TraceRequest, WindowRecord from .trace import TraceRequest, WindowRecord
if TYPE_CHECKING:
from .spec import StudySpec
EPSILON = 1e-9 EPSILON = 1e-9
@@ -178,6 +181,28 @@ def build_workload_profile(
) )
def build_study_workload_profile(
    study: "StudySpec",
    requests: list[TraceRequest],
    window: WindowRecord,
) -> WorkloadProfile:
    """Build the canonical L-C-A profile for a study's loaded window.

    The length mode is resolved with ``length_mode="auto"`` from the study's
    trace request mode, and the GPU count comes from the study's hardware
    section; the profile itself is produced by ``build_workload_profile``.

    Intended as the single source of truth for the paper's 10-dimensional
    L-C-A feature vector used by the harness prompt and (later) by Stop-A.
    """
    resolved_length_mode = resolve_length_mode(
        request_mode=study.trace.request_mode, length_mode="auto"
    )
    gpu_count = study.hardware.gpu_count
    return build_workload_profile(
        requests, window, gpu_count=gpu_count, length_mode=resolved_length_mode
    )
def fit_robust_scale(profiles: Sequence[WorkloadProfile]) -> RobustScale: def fit_robust_scale(profiles: Sequence[WorkloadProfile]) -> RobustScale:
if not profiles: if not profiles:
raise ValueError("At least one profile is required to fit a robust scale.") raise ValueError("At least one profile is required to fit a robust scale.")

View File

@@ -3,12 +3,15 @@ from __future__ import annotations
import json import json
import time import time
from pathlib import Path from pathlib import Path
from typing import Any from typing import TYPE_CHECKING, Any
from .harness import build_harness_context, render_harness_context from .harness import build_harness_context, render_harness_context
from .http_client import chat_completion, stream_text_completion from .http_client import chat_completion, stream_text_completion
from .spec import LLMPolicySpec, Proposal, SpecError, StudySpec, StudyState from .spec import LLMPolicySpec, Proposal, SpecError, StudySpec, StudyState
if TYPE_CHECKING:
from .lca import WorkloadProfile
def _parse_bool_like(value: Any, *, context: str) -> bool: def _parse_bool_like(value: Any, *, context: str) -> bool:
if isinstance(value, bool): if isinstance(value, bool):
@@ -178,6 +181,7 @@ def build_prompt(
window_summary: dict[str, Any], window_summary: dict[str, Any],
state: StudyState, state: StudyState,
capability_profile: dict[str, Any] | None, capability_profile: dict[str, Any] | None,
workload_profile: "WorkloadProfile | None" = None,
) -> str: ) -> str:
objective_notes: list[str] = [] objective_notes: list[str] = []
if study.trace.request_mode == "decode_only": if study.trace.request_mode == "decode_only":
@@ -409,6 +413,7 @@ def build_prompt(
study=study, study=study,
window_summary=window_summary, window_summary=window_summary,
state=state, state=state,
workload_profile=workload_profile,
) )
), ),
"", "",

View File

@@ -28,6 +28,7 @@ from aituner.harness import (
build_harness_stop_proposal, build_harness_stop_proposal,
) )
from aituner.lca import ( from aituner.lca import (
build_study_workload_profile,
build_workload_profile, build_workload_profile,
profile_similarity, profile_similarity,
resolve_length_mode, resolve_length_mode,
@@ -298,6 +299,34 @@ class CoreFlowTests(unittest.TestCase):
self.assertAlmostEqual(profile.stats["arrival"]["fano_1s"], 0.5) self.assertAlmostEqual(profile.stats["arrival"]["fano_1s"], 0.5)
self.assertEqual(resolve_length_mode(request_mode="decode_only"), "output") self.assertEqual(resolve_length_mode(request_mode="decode_only"), "output")
def test_harness_context_uses_canonical_lca_vector(self) -> None:
    """The L-C-A block mirrors a supplied profile and is legacy without one."""
    with tempfile.TemporaryDirectory() as workdir:
        spec_path = _write_study_assets(Path(workdir))
        study = load_study_spec(spec_path)
        window, requests = load_trace_requests(study, study_spec_path=spec_path)
        canonical = build_study_workload_profile(study, requests, window)
        empty_state = StudyState(study_id=study.study_id, trials=[])
        window_summary = summarize_window(requests, window)
        # Both renderings share every argument except the optional profile.
        shared_kwargs = {
            "study": study,
            "window_summary": window_summary,
            "state": empty_state,
        }

        with_profile = build_harness_context(
            **shared_kwargs, workload_profile=canonical
        )["workload_lca_profile"]
        # The labeled L-C-A block is the canonical 10-dim metric, not ad-hoc.
        self.assertEqual(with_profile["vector"], canonical.vector)
        self.assertEqual(len(with_profile["vector"]), 10)
        self.assertIn("RobustScaler", with_profile["metric"])

        # Without a profile it falls back to the legacy ad-hoc rendering.
        without_profile = build_harness_context(**shared_kwargs)["workload_lca_profile"]
        self.assertNotIn("vector", without_profile)
def test_lca_similarity_matrix_separates_different_profiles(self) -> None: def test_lca_similarity_matrix_separates_different_profiles(self) -> None:
window = WindowRecord( window = WindowRecord(
window_id="base", window_id="base",