Fig18 substrate: real output_length + criterion-A time_scale + Stop-A drain deadline
Replace the out=128 / scale=0.5 ablation substrate with a paper-faithful one: - Use the trace's real output_length (drop completion_tokens_override=128). The 0-8k chat window has p50=531 / p99=2436 / max=35168 output tokens, so decode (TPOT) becomes the dominant bottleneck instead of an artificial 128-token cap. - replay_time_scale=0.8775, chosen by criterion-A: binary-search the smallest scale whose A-family L-C-A similarity to the real (scale=1.0) arrivals stays >= tau (0.90). The old scale=0.5 had sim_A=0.56, distorting the arrival axis far below the tau bar used everywhere else. New calibrator: scripts/calibrate_time_scale.py. - Per-probe Stop-A-consistent drain deadline (worker._probe_drain_deadline): the wall-clock a *feasible* config needs to drain the LCA-admitted set (last_arrival + worst-case TTFT + p99_out * TPOT budget + margin). With real outputs decode dominates wall-clock, so the old fixed 320s cap would truncate the Stop-A offered window mid-decode. early_stop_max_elapsed_s (1000s) is now a hard ceiling; the per-probe deadline governs. The lag cap still cuts overload. 12-iter paired driver (both arms on dash1, removes the dash0/dash1 host confound): scripts/run_ablation_pair_d1.sh. 115 tests pass. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -130,9 +130,9 @@
|
|||||||
"min_input_tokens": 0,
|
"min_input_tokens": 0,
|
||||||
"max_input_tokens": 8192
|
"max_input_tokens": 8192
|
||||||
},
|
},
|
||||||
"replay_time_scale": 0.5,
|
"replay_time_scale": 0.8775,
|
||||||
"early_stop_max_lag_s": 45.0,
|
"early_stop_max_lag_s": 45.0,
|
||||||
"early_stop_max_elapsed_s": 320.0,
|
"early_stop_max_elapsed_s": 1000.0,
|
||||||
"adaptive_stop": {
|
"adaptive_stop": {
|
||||||
"enabled": true,
|
"enabled": true,
|
||||||
"tau": 0.9,
|
"tau": 0.9,
|
||||||
@@ -141,8 +141,7 @@
|
|||||||
"max_checks": 20,
|
"max_checks": 20,
|
||||||
"min_fraction": 0.1,
|
"min_fraction": 0.1,
|
||||||
"boundary_delta": 0.02
|
"boundary_delta": 0.02
|
||||||
},
|
}
|
||||||
"completion_tokens_override": 128
|
|
||||||
},
|
},
|
||||||
"slo": {
|
"slo": {
|
||||||
"target_pass_rate": 0.95,
|
"target_pass_rate": 0.95,
|
||||||
|
|||||||
@@ -130,9 +130,9 @@
|
|||||||
"min_input_tokens": 0,
|
"min_input_tokens": 0,
|
||||||
"max_input_tokens": 8192
|
"max_input_tokens": 8192
|
||||||
},
|
},
|
||||||
"replay_time_scale": 0.5,
|
"replay_time_scale": 0.8775,
|
||||||
"early_stop_max_lag_s": 45.0,
|
"early_stop_max_lag_s": 45.0,
|
||||||
"early_stop_max_elapsed_s": 320.0,
|
"early_stop_max_elapsed_s": 1000.0,
|
||||||
"adaptive_stop": {
|
"adaptive_stop": {
|
||||||
"enabled": true,
|
"enabled": true,
|
||||||
"tau": 0.9,
|
"tau": 0.9,
|
||||||
@@ -141,8 +141,7 @@
|
|||||||
"max_checks": 20,
|
"max_checks": 20,
|
||||||
"min_fraction": 0.1,
|
"min_fraction": 0.1,
|
||||||
"boundary_delta": 0.02
|
"boundary_delta": 0.02
|
||||||
},
|
}
|
||||||
"completion_tokens_override": 128
|
|
||||||
},
|
},
|
||||||
"slo": {
|
"slo": {
|
||||||
"target_pass_rate": 0.95,
|
"target_pass_rate": 0.95,
|
||||||
|
|||||||
99
scripts/calibrate_time_scale.py
Normal file
99
scripts/calibrate_time_scale.py
Normal file
@@ -0,0 +1,99 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Criterion-A time_scale calibration.
|
||||||
|
|
||||||
|
Binary-search the smallest replay_time_scale whose A-family L-C-A similarity to the
|
||||||
|
real (scale=1.0) arrival process stays >= tau. Uniform time scaling distorts only
|
||||||
|
the A axis (rate + fano; interarrival CV is scale-invariant), so this bounds the
|
||||||
|
arrival-axis distortion introduced by compression using the same similarity metric
|
||||||
|
Stop-A uses. Pure trace metadata -> deterministic, no GPU needed.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
PYTHONPATH=src python3 scripts/calibrate_time_scale.py \
|
||||||
|
--trace trace_windows/traces/chat_w20260311_1000.jsonl \
|
||||||
|
--gpu-count 8 --min-input 0 --max-input 8192 --tau 0.9
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from aituner.lca import _family_similarity, build_workload_profile
|
||||||
|
from aituner.trace import TraceRequest, WindowRecord
|
||||||
|
|
||||||
|
|
||||||
|
def load_rows(path: Path, lo: int, hi: int) -> list[dict]:
|
||||||
|
with path.open(encoding="utf-8") as fh:
|
||||||
|
rows = [json.loads(l) for l in fh if l.strip()]
|
||||||
|
return [r for r in rows if lo <= int(r["input_length"]) <= hi]
|
||||||
|
|
||||||
|
|
||||||
|
def build_requests(rows: list[dict]) -> tuple[list[TraceRequest], float, float]:
|
||||||
|
reqs = []
|
||||||
|
for i, r in enumerate(rows):
|
||||||
|
reqs.append(
|
||||||
|
TraceRequest(
|
||||||
|
row_id=str(r.get("chat_id", i)),
|
||||||
|
arrival_s=float(r["timestamp"]),
|
||||||
|
sampling_u=float(r.get("sampling_u", 0.0)),
|
||||||
|
body={},
|
||||||
|
prompt_tokens_hint=int(r["input_length"]),
|
||||||
|
completion_tokens_hint=int(r["output_length"]),
|
||||||
|
metadata={"hash_ids": r.get("hash_ids") if isinstance(r.get("hash_ids"), list) else None},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
amin = min(x.arrival_s for x in reqs)
|
||||||
|
amax = max(x.arrival_s for x in reqs)
|
||||||
|
return reqs, amin, amax
|
||||||
|
|
||||||
|
|
||||||
|
def profile_at(reqs, amin, amax, gpu_count, scale):
|
||||||
|
rs = [
|
||||||
|
TraceRequest(
|
||||||
|
x.row_id, (x.arrival_s - amin) * scale, x.sampling_u, x.body,
|
||||||
|
x.prompt_tokens_hint, x.completion_tokens_hint, x.metadata,
|
||||||
|
)
|
||||||
|
for x in reqs
|
||||||
|
]
|
||||||
|
span = (amax - amin) * scale
|
||||||
|
w = WindowRecord(
|
||||||
|
window_id="w", trace_path="", trace_type="chat",
|
||||||
|
window_start=0.0, window_end=span, source_payload={"block_size": 64},
|
||||||
|
)
|
||||||
|
return build_workload_profile(rs, w, gpu_count=gpu_count, length_mode="total")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
ap = argparse.ArgumentParser()
|
||||||
|
ap.add_argument("--trace", type=Path, required=True)
|
||||||
|
ap.add_argument("--gpu-count", type=int, default=8)
|
||||||
|
ap.add_argument("--min-input", type=int, default=0)
|
||||||
|
ap.add_argument("--max-input", type=int, default=8192)
|
||||||
|
ap.add_argument("--tau", type=float, default=0.9)
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
rows = load_rows(args.trace, args.min_input, args.max_input)
|
||||||
|
reqs, amin, amax = build_requests(rows)
|
||||||
|
print(f"n={len(reqs)} raw arrival span={amax - amin:.1f}s")
|
||||||
|
base = profile_at(reqs, amin, amax, args.gpu_count, 1.0)
|
||||||
|
print(f"{'scale':>6} {'simA':>7} {'rate/gpu':>9} {'fano':>8} {'span_s':>8}")
|
||||||
|
for s in (1.0, 0.95, 0.9, 0.85, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2):
|
||||||
|
p = profile_at(reqs, amin, amax, args.gpu_count, s)
|
||||||
|
a = _family_similarity(base.vector, p.vector)["A"]
|
||||||
|
print(f"{s:6.2f} {a:7.3f} {math.expm1(p.vector[7]):9.3f} {math.expm1(p.vector[9]):8.2f} {(amax-amin)*s:8.1f}")
|
||||||
|
|
||||||
|
lo, hi = 0.05, 1.0
|
||||||
|
for _ in range(40):
|
||||||
|
mid = (lo + hi) / 2
|
||||||
|
a = _family_similarity(base.vector, profile_at(reqs, amin, amax, args.gpu_count, mid).vector)["A"]
|
||||||
|
if a >= args.tau:
|
||||||
|
hi = mid
|
||||||
|
else:
|
||||||
|
lo = mid
|
||||||
|
print(f"\nsmallest scale with simA>={args.tau}: {hi:.4f} (arrival span {(amax-amin)*hi:.0f}s)")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
raise SystemExit(main())
|
||||||
27
scripts/run_ablation_pair_d1.sh
Normal file
27
scripts/run_ablation_pair_d1.sh
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
# 12-iteration harness-vs-naive ablation, both arms on dash1 (clean paired run,
|
||||||
|
# no host confound). Substrate: real output_length (no completion override),
|
||||||
|
# replay_time_scale=0.8775 (criterion-A, sim_A>=0.90), Stop-A on (LCA offered
|
||||||
|
# window), per-probe Stop-A-consistent drain deadline. Harness stops early; naive
|
||||||
|
# runs the full budget. Run from the repo root on dash1.
|
||||||
|
set -u
|
||||||
|
export OPENAI_API_KEY=$(python3 -c 'import json,pathlib;print(json.load(open(pathlib.Path.home()/".codex/auth.json"))["OPENAI_API_KEY"])')
|
||||||
|
# codex config.toml points at a dash0-local proxy (127.0.0.1:11235); on dash1 the
|
||||||
|
# LLM endpoint is reachable directly, so force a direct connection.
|
||||||
|
export http_proxy= https_proxy= all_proxy= HTTP_PROXY= HTTPS_PROXY= ALL_PROXY= no_proxy='*'
|
||||||
|
mkdir -p .aituner
|
||||||
|
rm -rf .aituner/abl12-harness .aituner/abl12-naive .aituner/ABLATION12_DONE
|
||||||
|
|
||||||
|
echo "=== harness ON (12-iter) start $(date -Is) ==="
|
||||||
|
PYTHONPATH=src python3 -m aituner.cli study tune \
|
||||||
|
--spec configs/examples/dash0_qwen27b_ablation_harness_on.json \
|
||||||
|
--store-root .aituner/abl12-harness --max-trials 12 --skip-baseline > .aituner/abl12-harness.log 2>&1
|
||||||
|
echo "=== harness ON (12-iter) done $(date -Is) ==="
|
||||||
|
|
||||||
|
echo "=== naive OFF (12-iter) start $(date -Is) ==="
|
||||||
|
PYTHONPATH=src python3 -m aituner.cli study tune \
|
||||||
|
--spec configs/examples/dash0_qwen27b_ablation_naive_off.json \
|
||||||
|
--store-root .aituner/abl12-naive --max-trials 12 --skip-baseline > .aituner/abl12-naive.log 2>&1
|
||||||
|
echo "=== naive OFF (12-iter) done $(date -Is) ==="
|
||||||
|
|
||||||
|
touch .aituner/ABLATION12_DONE
|
||||||
@@ -18,7 +18,7 @@ from .engine import build_launch_recipe
|
|||||||
from .http_client import HttpClientError, stream_chat_completion, wait_for_server
|
from .http_client import HttpClientError, stream_chat_completion, wait_for_server
|
||||||
from .lca import find_convergence_prefix, resolve_length_mode
|
from .lca import find_convergence_prefix, resolve_length_mode
|
||||||
from .search import ThresholdProbe, binary_search_max_feasible
|
from .search import ThresholdProbe, binary_search_max_feasible
|
||||||
from .slo import RequestOutcome, evaluate_request, summarize_evaluations
|
from .slo import RequestOutcome, _rule_threshold_ms, evaluate_request, summarize_evaluations
|
||||||
from .spec import ConfigPatch, SamplingSearchSpec, TrialSpec, load_study_spec, to_jsonable
|
from .spec import ConfigPatch, SamplingSearchSpec, TrialSpec, load_study_spec, to_jsonable
|
||||||
from .trace import TraceRequest, load_trace_requests, select_requests_for_threshold
|
from .trace import TraceRequest, load_trace_requests, select_requests_for_threshold
|
||||||
|
|
||||||
@@ -254,6 +254,34 @@ def _ignore_sigterm_if_main() -> None:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _probe_drain_deadline(
|
||||||
|
reqs: list[TraceRequest], slo: Any, *, ceiling: float | None
|
||||||
|
) -> float | None:
|
||||||
|
"""Stop-A-consistent per-probe drain deadline (wall-clock seconds).
|
||||||
|
|
||||||
|
The deadline is the time a *feasible* config needs to drain the admitted set:
|
||||||
|
the last admitted arrival plus the worst-case TTFT budget plus the p99 output
|
||||||
|
length times the TPOT budget. A config that cannot finish by this deadline is
|
||||||
|
genuinely SLO-infeasible, so the clock never pre-empts the LCA-matched offered
|
||||||
|
window (Stop-A) -- it only fails the unfit. ``ceiling`` is a hard safety cap.
|
||||||
|
"""
|
||||||
|
if not reqs or slo.tpot_rule is None:
|
||||||
|
return ceiling
|
||||||
|
last_arrival = max(float(r.arrival_s or 0.0) for r in reqs)
|
||||||
|
inputs = sorted(int(r.prompt_tokens_hint or 0) for r in reqs)
|
||||||
|
outputs = sorted(int(r.completion_tokens_hint or 0) for r in reqs)
|
||||||
|
|
||||||
|
def _p99(xs: list[int]) -> int:
|
||||||
|
return xs[min(len(xs) - 1, int(0.99 * len(xs)))] if xs else 0
|
||||||
|
|
||||||
|
p99_in, p99_out = _p99(inputs), _p99(outputs)
|
||||||
|
tpot_ms = _rule_threshold_ms(slo.tpot_rule, p99_in)
|
||||||
|
ttft_ms = _rule_threshold_ms(slo.ttft_rule, p99_in) if slo.ttft_rule is not None else 0.0
|
||||||
|
margin_s = 30.0
|
||||||
|
deadline = last_arrival + (ttft_ms + p99_out * tpot_ms) / 1000.0 + margin_s
|
||||||
|
return min(float(ceiling), deadline) if ceiling else deadline
|
||||||
|
|
||||||
|
|
||||||
def _adaptive_replay_set(
|
def _adaptive_replay_set(
|
||||||
selected: list[TraceRequest],
|
selected: list[TraceRequest],
|
||||||
*,
|
*,
|
||||||
@@ -640,7 +668,9 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
|
|||||||
max_concurrency=study.trace.max_concurrency,
|
max_concurrency=study.trace.max_concurrency,
|
||||||
target_pass_rate=study.slo.target_pass_rate,
|
target_pass_rate=study.slo.target_pass_rate,
|
||||||
max_lag_s=study.trace.early_stop_max_lag_s,
|
max_lag_s=study.trace.early_stop_max_lag_s,
|
||||||
max_elapsed_s=study.trace.early_stop_max_elapsed_s,
|
max_elapsed_s=_probe_drain_deadline(
|
||||||
|
reqs, study.slo, ceiling=study.trace.early_stop_max_elapsed_s
|
||||||
|
),
|
||||||
evaluate_outcome=lambda outcome: evaluate_request(outcome, study.slo),
|
evaluate_outcome=lambda outcome: evaluate_request(outcome, study.slo),
|
||||||
drain_inflight_on_early_stop=not restart_after_early_stop,
|
drain_inflight_on_early_stop=not restart_after_early_stop,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -55,6 +55,7 @@ from aituner.store import StudyStore
|
|||||||
from aituner.trace import load_trace_requests, summarize_window
|
from aituner.trace import load_trace_requests, summarize_window
|
||||||
from aituner.worker import (
|
from aituner.worker import (
|
||||||
_adaptive_replay_set,
|
_adaptive_replay_set,
|
||||||
|
_probe_drain_deadline,
|
||||||
_install_sigterm_as_keyboardinterrupt,
|
_install_sigterm_as_keyboardinterrupt,
|
||||||
_restore_sigterm,
|
_restore_sigterm,
|
||||||
_should_extend_on_boundary,
|
_should_extend_on_boundary,
|
||||||
@@ -535,6 +536,38 @@ class CoreFlowTests(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_probe_drain_deadline_tracks_admitted_set_and_caps_at_ceiling(self) -> None:
|
||||||
|
slo = SloSpec.from_dict(
|
||||||
|
{
|
||||||
|
"target_pass_rate": 0.95,
|
||||||
|
"ttft_rule": {"kind": "linear_ms", "intercept_ms": 4000, "per_token_ms": 0.125},
|
||||||
|
"tpot_rule": {"kind": "fixed_ms", "threshold_ms": 50},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
def req(arrival_s: float, in_tok: int, out_tok: int) -> TraceRequest:
|
||||||
|
return TraceRequest(
|
||||||
|
row_id="r",
|
||||||
|
arrival_s=arrival_s,
|
||||||
|
sampling_u=0.1,
|
||||||
|
body={},
|
||||||
|
prompt_tokens_hint=in_tok,
|
||||||
|
completion_tokens_hint=out_tok,
|
||||||
|
metadata={},
|
||||||
|
)
|
||||||
|
|
||||||
|
# 100 requests, last arrival 500s, p99 in=8000 / out=2000.
|
||||||
|
reqs = [req(float(i * 5), 8000, 2000) for i in range(100)]
|
||||||
|
# deadline = last_arrival + (ttft_ms + p99_out*tpot_ms)/1000 + margin
|
||||||
|
# = 495 + (5000 + 2000*50)/1000 + 30 = 495 + 105 + 30 = 630
|
||||||
|
self.assertAlmostEqual(
|
||||||
|
_probe_drain_deadline(reqs, slo, ceiling=1000.0), 630.0, places=3
|
||||||
|
)
|
||||||
|
# Ceiling caps a deadline that would otherwise exceed it.
|
||||||
|
self.assertEqual(_probe_drain_deadline(reqs, slo, ceiling=400.0), 400.0)
|
||||||
|
# No requests or no TPOT rule -> fall back to the ceiling.
|
||||||
|
self.assertEqual(_probe_drain_deadline([], slo, ceiling=400.0), 400.0)
|
||||||
|
|
||||||
def test_linear_ms_ttft_rule_scales_with_input_length(self) -> None:
|
def test_linear_ms_ttft_rule_scales_with_input_length(self) -> None:
|
||||||
slo = SloSpec.from_dict(
|
slo = SloSpec.from_dict(
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user