From 84c5d6bd80aa77886ae65d5644d4a20e175379de Mon Sep 17 00:00:00 2001
From: Gahow Wang
Date: Sun, 5 Apr 2026 01:44:38 +0800
Subject: [PATCH] Add deeper infeasible probe diagnostics

---
 .../dash0_manual_trial2_maxprobes6.json       | 97 +++++++++++++++++++
 .../dash0_manual_trial2_proposal.json         | 22 +++
 src/aituner/search.py                         |  2 +-
 src/aituner/worker.py                         | 68 +++++++++++++
 tests/test_core_flow.py                       | 63 +++++++++++-
 5 files changed, 249 insertions(+), 3 deletions(-)
 create mode 100644 configs/examples/dash0_manual_trial2_maxprobes6.json
 create mode 100644 configs/examples/dash0_manual_trial2_proposal.json

diff --git a/configs/examples/dash0_manual_trial2_maxprobes6.json b/configs/examples/dash0_manual_trial2_maxprobes6.json
new file mode 100644
index 0000000..c4b972f
--- /dev/null
+++ b/configs/examples/dash0_manual_trial2_maxprobes6.json
@@ -0,0 +1,97 @@
+{
+    "study_id": "dash0-qwen30b-chat-10min-manual-trial2-maxprobes6",
+    "hardware": {
+        "gpu_count": 4,
+        "gpu_model": "H20",
+        "host_candidates": [
+            "dash0"
+        ]
+    },
+    "model": {
+        "model_id": "qwen3-30b-a3b",
+        "served_model_name": "qwen3-30b-smoke"
+    },
+    "engine": {
+        "engine_name": "vllm",
+        "engine_version": "0.13.0rc2.dev2111+gb44b43f43.d20260309",
+        "exec_path": "/usr/local/bin/vllm",
+        "cwd": "/home/admin/cpfs/wjh/aituner/aituner",
+        "host": "127.0.0.1",
+        "port": 18081,
+        "healthcheck_path": "/v1/models",
+        "ready_timeout_s": 900,
+        "request_timeout_s": 900,
+        "launch_args": [
+            "serve",
+            "/home/admin/resource/model/464482ce.qwen3-30b-a3b/1m-instruct-0726-fp4"
+        ],
+        "base_envs": {
+            "CUDA_VISIBLE_DEVICES": "4,5,6,7",
+            "VLLM_FP8_USE_BLADNN": "1",
+            "VLLM_MOE_USE_BLADNN": "1"
+        },
+        "base_flags": {
+            "host": "127.0.0.1",
+            "port": 18081,
+            "served-model-name": "qwen3-30b-smoke",
+            "max-model-len": 65536,
+            "disable-log-requests": true,
+            "trust-remote-code": true
+        },
+        "tunable_envs": [
+            "VLLM_ATTENTION_BACKEND"
+        ],
+        "tunable_flags": [
+            "tensor-parallel-size",
+            "max-num-seqs",
+            "max-num-batched-tokens",
+            "gpu-memory-utilization",
+            "block-size"
+        ],
+        "python_executable": "python3"
+    },
+    "trace": {
+        "windows_path": "/home/admin/cpfs/wjh/aituner/aituner/trace_windows/windows.json",
+        "window_id": "chat_w20260311_1000",
+        "u_field": "sampling_u",
+        "timestamp_field": "timestamp",
+        "max_concurrency": 64,
+        "replay_time_scale": 1.0,
+        "early_stop_max_lag_s": 120.0,
+        "early_stop_max_elapsed_s": 900.0
+    },
+    "slo": {
+        "target_pass_rate": 0.95,
+        "ttft_rule": {
+            "kind": "step_ms",
+            "buckets": [
+                {
+                    "max_input_tokens": 4096,
+                    "threshold_ms": 15000
+                },
+                {
+                    "max_input_tokens": 16384,
+                    "threshold_ms": 30000
+                },
+                {
+                    "threshold_ms": 45000
+                }
+            ]
+        },
+        "tpot_rule": {
+            "kind": "fixed_ms",
+            "threshold_ms": 1500
+        }
+    },
+    "search": {
+        "low": 0.0,
+        "high": 1.0,
+        "tolerance": 0.1,
+        "max_probes": 6,
+        "sample_seed": 20260325
+    },
+    "llm": {
+        "system_prompt": "Propose a single engine config patch that increases the maximum feasible sampling_u under the SLO target.",
+        "max_history_trials": 8
+    }
+}
diff --git a/configs/examples/dash0_manual_trial2_proposal.json b/configs/examples/dash0_manual_trial2_proposal.json
new file mode 100644
index 0000000..9421c37
--- /dev/null
+++ b/configs/examples/dash0_manual_trial2_proposal.json
@@ -0,0 +1,22 @@
+{
+    "observation": "Long-context chat traffic is dominated by large prefills, so TTFT under the stepped SLO is the binding constraint. A launch-safe baseline should spread compute across all 4 GPUs while limiting concurrent long-prefill contention.",
+    "diagnosis": "The FLASHINFER attempt failed at startup, but the safer FLASH_ATTN family launched successfully. A conservative seq cap plus a moderate batched-token cap is a better baseline for diagnosing whether the SLO itself is too aggressive under this trace.",
+    "config_patch": {
+        "env_patch": {
+            "VLLM_ATTENTION_BACKEND": "FLASH_ATTN"
+        },
+        "flag_patch": {
+            "tensor-parallel-size": 4,
+            "max-num-seqs": 16,
+            "max-num-batched-tokens": 24576,
+            "gpu-memory-utilization": 0.94,
+            "block-size": 32
+        }
+    },
+    "expected_effects": [
+        "Stable 4-GPU launch without FLASHINFER warmup failure",
+        "Lower head-of-line blocking than larger sequence caps",
+        "More interpretable lower-bound throughput/SLO measurement"
+    ],
+    "why_not_previous_failures": "This proposal keeps the launch-safe FLASH_ATTN backend and the conservative batching limits that already avoided the earlier FLASHINFER startup failure."
+}
diff --git a/src/aituner/search.py b/src/aituner/search.py
index 5f8e5cb..7b1d2d1 100644
--- a/src/aituner/search.py
+++ b/src/aituner/search.py
@@ -36,7 +36,7 @@ def binary_search_max_feasible(
     cur_low = low
     cur_high = high
     for _ in range(max_probes):
-        if cur_high - cur_low <= tolerance:
+        if cur_high - cur_low <= tolerance and best_payload is not None:
             break
         threshold = round((cur_low + cur_high) / 2.0, 12)
         probe = cache.get(threshold)
diff --git a/src/aituner/worker.py b/src/aituner/worker.py
index a4df42c..2902d89 100644
--- a/src/aituner/worker.py
+++ b/src/aituner/worker.py
@@ -4,6 +4,7 @@ import json
 import math
 import os
 import signal
+import statistics
 import subprocess
 import threading
 import time
@@ -30,6 +31,55 @@ class ProbePayload:
     outcomes: list[dict[str, Any]]
     early_stopped: bool = False
     early_stop_reason: str = ""
+    latency_summary: dict[str, Any] | None = None
+
+
+def _percentile(values: list[float], p: float) -> float | None:
+    if not values:
+        return None
+    ordered = sorted(values)
+    idx = min(len(ordered) - 1, max(0, math.ceil((p / 100.0) * len(ordered)) - 1))
+    return float(ordered[idx])
+
+
+def _metric_summary(values: list[float]) -> dict[str, Any]:
+    return {
+        "count": len(values),
+        "mean": float(statistics.fmean(values)) if values else None,
+        "p50": _percentile(values, 50.0),
+        "p90": _percentile(values, 90.0),
+        "p95": _percentile(values, 95.0),
+        "p99": _percentile(values, 99.0),
+    }
+
+
+def _reason_counts(evaluations: list[Any]) -> dict[str, int]:
+    counts: dict[str, int] = {}
+    for evaluation in evaluations:
+        for reason in evaluation.reasons:
+            counts[reason] = counts.get(reason, 0) + 1
+    return counts
+
+
+def _latency_summary(
+    *,
+    outcomes: list[RequestOutcome],
+    evaluations: list[Any],
+    study: Any,
+) -> dict[str, Any]:
+    ttft_values = [float(item.ttft_ms) for item in outcomes if item.ttft_ms is not None]
+    tpot_values = [float(item.tpot_ms) for item in outcomes if item.tpot_ms is not None]
+    return {
+        "observed_request_count": len(outcomes),
+        "ttft_ms": _metric_summary(ttft_values),
+        "tpot_ms": _metric_summary(tpot_values),
+        "failed_reason_counts": _reason_counts(evaluations),
+        "slo": {
+            "target_pass_rate": study.slo.target_pass_rate,
+            "ttft_rule": study.slo.ttft_rule.__dict__ if study.slo.ttft_rule is not None else None,
+            "tpot_rule": study.slo.tpot_rule.__dict__ if study.slo.tpot_rule is not None else None,
+        },
+    }
 
 def _trial_spec_from_json(path: Path) -> TrialSpec:
     payload = json.loads(path.read_text(encoding="utf-8"))
@@ -299,6 +349,11 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
             feasible=bool(summary["feasible"]),
             early_stopped=early_stopped,
             early_stop_reason=early_stop_reason,
+            latency_summary=_latency_summary(
+                outcomes=outcomes,
+                evaluations=evaluations,
+                study=study,
+            ),
             outcomes=[
                 {
                     "request_id": outcome.request_id,
@@ -321,6 +376,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
             "feasible": payload.feasible,
             "early_stopped": payload.early_stopped,
             "early_stop_reason": payload.early_stop_reason,
+            "latency_summary": payload.latency_summary,
         }
         probe_history.append(probe_record)
         StudyStore.write_json(Path(trial.probe_log_path), probe_history)
@@ -356,11 +412,23 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
                         "request_rate": probe.payload.request_rate,
                         "early_stopped": probe.payload.early_stopped,
                         "early_stop_reason": probe.payload.early_stop_reason,
+                        "latency_summary": probe.payload.latency_summary,
                     },
                 }
                 for probe in search.probes
            ],
        }
+        if best is None and search.probes:
+            last_probe = search.probes[-1]
+            result["all_infeasible_diagnostics"] = {
+                "threshold": last_probe.threshold,
+                "request_count": last_probe.payload.request_count,
+                "request_rate": last_probe.payload.request_rate,
+                "pass_rate": last_probe.payload.pass_rate,
+                "early_stopped": last_probe.payload.early_stopped,
+                "early_stop_reason": last_probe.payload.early_stop_reason,
+                "latency_summary": last_probe.payload.latency_summary,
+            }
         StudyStore.write_json(Path(trial.result_path), result)
         return result
     except Exception as exc:  # noqa: BLE001
diff --git a/tests/test_core_flow.py b/tests/test_core_flow.py
index f2ee9ea..4d24d6b 100644
--- a/tests/test_core_flow.py
+++ b/tests/test_core_flow.py
@@ -12,11 +12,16 @@ from aituner.http_client import _openai_url, _should_bypass_proxy
 from aituner.job import append_job, build_trial_job
 from aituner.llm import build_prompt, parse_proposal_text
 from aituner.search import ThresholdProbe, binary_search_max_feasible
-from aituner.slo import RequestOutcome, summarize_evaluations
+from aituner.slo import RequestOutcome, evaluate_request, summarize_evaluations
 from aituner.spec import Proposal, StudyState, TrialSummary, load_study_spec
 from aituner.store import StudyStore
 from aituner.trace import load_trace_requests, summarize_window
-from aituner.worker import _replay_requests, _terminate_process_tree, _wait_for_server_or_exit
+from aituner.worker import (
+    _latency_summary,
+    _replay_requests,
+    _terminate_process_tree,
+    _wait_for_server_or_exit,
+)
 from aituner.trace import TraceRequest
 
 
@@ -406,6 +411,31 @@ class CoreFlowTests(unittest.TestCase):
         self.assertGreaterEqual(result.best_threshold, 0.5)
         self.assertIsNotNone(result.best_feasible_payload)
 
+    def test_binary_search_continues_below_tolerance_when_all_infeasible(self) -> None:
+        seen = []
+
+        def evaluator(threshold):
+            seen.append(threshold)
+            return ThresholdProbe(
+                threshold=threshold,
+                feasible=False,
+                payload={"threshold": threshold},
+            )
+
+        result = binary_search_max_feasible(
+            low=0.0,
+            high=1.0,
+            tolerance=0.1,
+            max_probes=6,
+            evaluator=evaluator,
+        )
+        self.assertIsNone(result.best_feasible_payload)
+        self.assertEqual(len(result.probes), 6)
+        self.assertEqual(
+            seen,
+            [0.5, 0.25, 0.125, 0.0625, 0.03125, 0.015625],
+        )
+
     def test_trace_max_requests_uses_window_wide_downsample(self) -> None:
         with tempfile.TemporaryDirectory() as tmp:
             tmp_path = Path(tmp)
@@ -899,6 +929,35 @@ class CoreFlowTests(unittest.TestCase):
         self.assertEqual(len(replayed), 2)
         self.assertEqual(replayed[1].error, "slo_pass_rate_unrecoverable")
 
+    def test_latency_summary_reports_quantiles_and_slo(self) -> None:
+        study = load_study_spec(_write_study_assets(Path(tempfile.mkdtemp())))
+        outcomes = [
+            RequestOutcome(
+                request_id="r1",
+                success=True,
+                ttft_ms=100.0,
+                tpot_ms=10.0,
+                prompt_tokens=100,
+                completion_tokens=10,
+            ),
+            RequestOutcome(
+                request_id="r2",
+                success=True,
+                ttft_ms=200.0,
+                tpot_ms=20.0,
+                prompt_tokens=5000,
+                completion_tokens=10,
+            ),
+        ]
+        evaluations = [evaluate_request(item, study.slo) for item in outcomes]
+        summary = _latency_summary(outcomes=outcomes, evaluations=evaluations, study=study)
+        self.assertEqual(summary["observed_request_count"], 2)
+        self.assertEqual(summary["ttft_ms"]["mean"], 150.0)
+        self.assertEqual(summary["ttft_ms"]["p50"], 100.0)
+        self.assertEqual(summary["ttft_ms"]["p99"], 200.0)
+        self.assertEqual(summary["tpot_ms"]["mean"], 15.0)
+        self.assertEqual(summary["slo"]["target_pass_rate"], 0.95)
+
     def test_wait_for_server_or_exit_fails_fast_when_process_exits(self) -> None:
         process = mock.Mock()
         process.poll.return_value = 17