Add deeper infeasible probe diagnostics

2026-04-05 01:44:38 +08:00
parent 0aa607a4f1
commit 84c5d6bd80
5 changed files with 249 additions and 3 deletions

View File

@@ -0,0 +1,97 @@
{
"study_id": "dash0-qwen30b-chat-10min-manual-trial2-maxprobes6",
"hardware": {
"gpu_count": 4,
"gpu_model": "H20",
"host_candidates": [
"dash0"
]
},
"model": {
"model_id": "qwen3-30b-a3b",
"served_model_name": "qwen3-30b-smoke"
},
"engine": {
"engine_name": "vllm",
"engine_version": "0.13.0rc2.dev2111+gb44b43f43.d20260309",
"exec_path": "/usr/local/bin/vllm",
"cwd": "/home/admin/cpfs/wjh/aituner/aituner",
"host": "127.0.0.1",
"port": 18081,
"healthcheck_path": "/v1/models",
"ready_timeout_s": 900,
"request_timeout_s": 900,
"launch_args": [
"serve",
"/home/admin/resource/model/464482ce.qwen3-30b-a3b/1m-instruct-0726-fp4"
],
"base_envs": {
"CUDA_VISIBLE_DEVICES": "4,5,6,7",
"VLLM_FP8_USE_BLADNN": "1",
"VLLM_MOE_USE_BLADNN": "1"
},
"base_flags": {
"host": "127.0.0.1",
"port": 18081,
"served-model-name": "qwen3-30b-smoke",
"max-model-len": 65536,
"disable-log-requests": true,
"trust-remote-code": true
},
"tunable_envs": [
"VLLM_ATTENTION_BACKEND"
],
"tunable_flags": [
"tensor-parallel-size",
"max-num-seqs",
"max-num-batched-tokens",
"gpu-memory-utilization",
"block-size"
],
"python_executable": "python3"
},
"trace": {
"windows_path": "/home/admin/cpfs/wjh/aituner/aituner/trace_windows/windows.json",
"window_id": "chat_w20260311_1000",
"u_field": "sampling_u",
"timestamp_field": "timestamp",
"max_concurrency": 64,
"replay_time_scale": 1.0,
"early_stop_max_lag_s": 120.0,
"early_stop_max_elapsed_s": 900.0
},
"slo": {
"target_pass_rate": 0.95,
"ttft_rule": {
"kind": "step_ms",
"buckets": [
{
"max_input_tokens": 4096,
"threshold_ms": 15000
},
{
"max_input_tokens": 16384,
"threshold_ms": 30000
},
{
"threshold_ms": 45000
}
]
},
"tpot_rule": {
"kind": "fixed_ms",
"threshold_ms": 1500
}
},
"search": {
"low": 0.0,
"high": 1.0,
"tolerance": 0.1,
"max_probes": 6,
"sample_seed": 20260325
},
"llm": {
"system_prompt": "Propose a single engine config patch that increases the maximum feasible sampling_u under the SLO target.",
"max_history_trials": 8
}
}
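
For orientation: the stepped ttft_rule above picks the first bucket whose max_input_tokens covers the request's prompt length, falling through to the 45000 ms catch-all. A minimal illustration of that lookup; the helper below is hypothetical and is not aituner.slo's actual implementation.

from typing import Any

TTFT_BUCKETS: list[dict[str, Any]] = [
    {"max_input_tokens": 4096, "threshold_ms": 15000},
    {"max_input_tokens": 16384, "threshold_ms": 30000},
    {"threshold_ms": 45000},  # catch-all bucket without max_input_tokens
]


def resolve_ttft_threshold_ms(prompt_tokens: int, buckets: list[dict[str, Any]]) -> float:
    """Return the threshold of the first bucket whose token cap covers the prompt."""
    for bucket in buckets:
        cap = bucket.get("max_input_tokens")
        if cap is None or prompt_tokens <= cap:
            return float(bucket["threshold_ms"])
    raise ValueError("step_ms buckets should end with a catch-all entry")


# A 100-token prompt gets the 15 s budget, a 5000-token prompt the 30 s budget.
assert resolve_ttft_threshold_ms(100, TTFT_BUCKETS) == 15000.0
assert resolve_ttft_threshold_ms(5000, TTFT_BUCKETS) == 30000.0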

View File

@@ -0,0 +1,22 @@
{
"observation": "Long-context chat traffic is dominated by large prefills, so TTFT under the stepped SLO is the binding constraint. A launch-safe baseline should spread compute across all 4 GPUs while limiting concurrent long-prefill contention.",
"diagnosis": "The FLASHINFER attempt failed at startup, but the safer FLASH_ATTN family launched successfully. A conservative seq cap plus a moderate batched-token cap is a better baseline for diagnosing whether the SLO itself is too aggressive under this trace.",
"config_patch": {
"env_patch": {
"VLLM_ATTENTION_BACKEND": "FLASH_ATTN"
},
"flag_patch": {
"tensor-parallel-size": 4,
"max-num-seqs": 16,
"max-num-batched-tokens": 24576,
"gpu-memory-utilization": 0.94,
"block-size": 32
}
},
"expected_effects": [
"Stable 4-GPU launch without FLASHINFER warmup failure",
"Lower head-of-line blocking than larger sequence caps",
"More interpretable lower-bound throughput/SLO measurement"
],
"why_not_previous_failures": "This proposal keeps the launch-safe FLASH_ATTN backend and the conservative batching limits that already avoided the earlier FLASHINFER startup failure."
}
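
The config_patch above is applied on top of the study's base_envs/base_flags before launch. A rough sketch of how such a patch could become a vllm serve command line, assuming a plain dict merge; the helper below is illustrative only and is not aituner's actual launch code.

import shlex


def build_launch_command(
    exec_path: str,
    launch_args: list[str],
    base_flags: dict[str, object],
    flag_patch: dict[str, object],
) -> str:
    flags = {**base_flags, **flag_patch}  # patched values override the baseline
    parts = [exec_path, *launch_args]
    for name, value in flags.items():
        if value is True:  # boolean flags such as --trust-remote-code are emitted bare
            parts.append(f"--{name}")
        else:
            parts.extend([f"--{name}", str(value)])
    return " ".join(shlex.quote(part) for part in parts)


# env_patch layers onto base_envs the same way: {**base_envs, **env_patch}.
cmd = build_launch_command(
    "/usr/local/bin/vllm",
    ["serve", "/path/to/model"],  # placeholder model path
    {"max-model-len": 65536, "trust-remote-code": True},
    {"tensor-parallel-size": 4, "max-num-seqs": 16},
)
# cmd: /usr/local/bin/vllm serve /path/to/model --max-model-len 65536
#      --trust-remote-code --tensor-parallel-size 4 --max-num-seqs 16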

View File

@@ -36,7 +36,7 @@ def binary_search_max_feasible(
cur_low = low
cur_high = high
for _ in range(max_probes):
if cur_high - cur_low <= tolerance:
if cur_high - cur_low <= tolerance and best_payload is not None:
break
threshold = round((cur_low + cur_high) / 2.0, 12)
probe = cache.get(threshold)
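
The extra `best_payload is not None` guard changes behavior only when every probe so far is infeasible: the bracket then keeps shrinking toward `low` instead of stopping once it is narrower than the tolerance. A condensed, non-verbatim sketch of the loop after this change; the probe cache and best_threshold/probe-history bookkeeping are elided.

from typing import Any, Callable


def sketch_max_feasible(
    low: float,
    high: float,
    tolerance: float,
    max_probes: int,
    evaluator: Callable[[float], Any],
) -> Any | None:
    cur_low, cur_high, best_payload = low, high, None
    for _ in range(max_probes):
        # While every probe is infeasible, best_payload stays None, so the
        # search keeps halving instead of terminating at the tolerance.
        if cur_high - cur_low <= tolerance and best_payload is not None:
            break
        threshold = round((cur_low + cur_high) / 2.0, 12)
        probe = evaluator(threshold)
        if probe.feasible:
            best_payload = probe.payload
            cur_low = threshold   # feasible: raise the lower bound
        else:
            cur_high = threshold  # infeasible: lower the upper bound
    return best_payload


# With low=0.0, high=1.0, tolerance=0.1, max_probes=6 and every probe infeasible,
# the probed thresholds are 0.5, 0.25, 0.125, 0.0625, 0.03125, 0.015625, which is
# exactly the sequence asserted by the new unit test.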

View File

@@ -4,6 +4,7 @@ import json
import math
import os
import signal
import statistics
import subprocess
import threading
import time
@@ -30,6 +31,55 @@ class ProbePayload:
outcomes: list[dict[str, Any]]
early_stopped: bool = False
early_stop_reason: str = ""
latency_summary: dict[str, Any] | None = None

def _percentile(values: list[float], p: float) -> float | None:
if not values:
return None
ordered = sorted(values)
idx = min(len(ordered) - 1, max(0, math.ceil((p / 100.0) * len(ordered)) - 1))
return float(ordered[idx])

def _metric_summary(values: list[float]) -> dict[str, Any]:
return {
"count": len(values),
"mean": float(statistics.fmean(values)) if values else None,
"p50": _percentile(values, 50.0),
"p90": _percentile(values, 90.0),
"p95": _percentile(values, 95.0),
"p99": _percentile(values, 99.0),
}

def _reason_counts(evaluations: list[Any]) -> dict[str, int]:
counts: dict[str, int] = {}
for evaluation in evaluations:
for reason in evaluation.reasons:
counts[reason] = counts.get(reason, 0) + 1
return counts

def _latency_summary(
*,
outcomes: list[RequestOutcome],
evaluations: list[Any],
study: Any,
) -> dict[str, Any]:
ttft_values = [float(item.ttft_ms) for item in outcomes if item.ttft_ms is not None]
tpot_values = [float(item.tpot_ms) for item in outcomes if item.tpot_ms is not None]
return {
"observed_request_count": len(outcomes),
"ttft_ms": _metric_summary(ttft_values),
"tpot_ms": _metric_summary(tpot_values),
"failed_reason_counts": _reason_counts(evaluations),
"slo": {
"target_pass_rate": study.slo.target_pass_rate,
"ttft_rule": study.slo.ttft_rule.__dict__ if study.slo.ttft_rule is not None else None,
"tpot_rule": study.slo.tpot_rule.__dict__ if study.slo.tpot_rule is not None else None,
},
}

def _trial_spec_from_json(path: Path) -> TrialSpec:
payload = json.loads(path.read_text(encoding="utf-8"))
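
The summary helpers use a nearest-rank percentile, so with only two samples p50 resolves to the lower value while p90/p95/p99 all resolve to the higher one. A quick check, importing the private helper the same way the new tests do:

from aituner.worker import _metric_summary

summary = _metric_summary([100.0, 200.0])
# Nearest-rank index for p50 is ceil(0.5 * 2) - 1 == 0, for p99 it is
# ceil(0.99 * 2) - 1 == 1, so the two-sample summary comes out as:
assert summary == {
    "count": 2,
    "mean": 150.0,
    "p50": 100.0,
    "p90": 200.0,
    "p95": 200.0,
    "p99": 200.0,
}
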
@@ -299,6 +349,11 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
feasible=bool(summary["feasible"]),
early_stopped=early_stopped,
early_stop_reason=early_stop_reason,
latency_summary=_latency_summary(
outcomes=outcomes,
evaluations=evaluations,
study=study,
),
outcomes=[
{
"request_id": outcome.request_id,
@@ -321,6 +376,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
"feasible": payload.feasible,
"early_stopped": payload.early_stopped,
"early_stop_reason": payload.early_stop_reason,
"latency_summary": payload.latency_summary,
}
probe_history.append(probe_record)
StudyStore.write_json(Path(trial.probe_log_path), probe_history)
@@ -356,11 +412,23 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
"request_rate": probe.payload.request_rate,
"early_stopped": probe.payload.early_stopped,
"early_stop_reason": probe.payload.early_stop_reason,
"latency_summary": probe.payload.latency_summary,
},
}
for probe in search.probes
],
}
if best is None and search.probes:
last_probe = search.probes[-1]
result["all_infeasible_diagnostics"] = {
"threshold": last_probe.threshold,
"request_count": last_probe.payload.request_count,
"request_rate": last_probe.payload.request_rate,
"pass_rate": last_probe.payload.pass_rate,
"early_stopped": last_probe.payload.early_stopped,
"early_stop_reason": last_probe.payload.early_stop_reason,
"latency_summary": last_probe.payload.latency_summary,
}
StudyStore.write_json(Path(trial.result_path), result)
return result
except Exception as exc: # noqa: BLE001
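
Downstream tooling can look for the new all_infeasible_diagnostics block when a trial never finds a feasible threshold. A minimal reader sketch; the key names come from the code above, but the result file path is a placeholder:

import json
from pathlib import Path

result = json.loads(Path("trial_result.json").read_text(encoding="utf-8"))  # placeholder path
diag = result.get("all_infeasible_diagnostics")
if diag is not None:
    print(
        "all probes infeasible at threshold "
        f"{diag['threshold']}: pass_rate={diag['pass_rate']}, "
        f"early_stop_reason={diag['early_stop_reason'] or 'none'}"
    )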

View File

@@ -12,11 +12,16 @@ from aituner.http_client import _openai_url, _should_bypass_proxy
from aituner.job import append_job, build_trial_job
from aituner.llm import build_prompt, parse_proposal_text
from aituner.search import ThresholdProbe, binary_search_max_feasible
from aituner.slo import RequestOutcome, summarize_evaluations
from aituner.slo import RequestOutcome, evaluate_request, summarize_evaluations
from aituner.spec import Proposal, StudyState, TrialSummary, load_study_spec
from aituner.store import StudyStore
from aituner.trace import load_trace_requests, summarize_window
from aituner.worker import _replay_requests, _terminate_process_tree, _wait_for_server_or_exit
from aituner.worker import (
_latency_summary,
_replay_requests,
_terminate_process_tree,
_wait_for_server_or_exit,
)
from aituner.trace import TraceRequest
@@ -406,6 +411,31 @@ class CoreFlowTests(unittest.TestCase):
self.assertGreaterEqual(result.best_threshold, 0.5)
self.assertIsNotNone(result.best_feasible_payload)

def test_binary_search_continues_below_tolerance_when_all_infeasible(self) -> None:
seen = []
def evaluator(threshold):
seen.append(threshold)
return ThresholdProbe(
threshold=threshold,
feasible=False,
payload={"threshold": threshold},
)
result = binary_search_max_feasible(
low=0.0,
high=1.0,
tolerance=0.1,
max_probes=6,
evaluator=evaluator,
)
self.assertIsNone(result.best_feasible_payload)
self.assertEqual(len(result.probes), 6)
self.assertEqual(
seen,
[0.5, 0.25, 0.125, 0.0625, 0.03125, 0.015625],
)

def test_trace_max_requests_uses_window_wide_downsample(self) -> None:
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
@@ -899,6 +929,35 @@ class CoreFlowTests(unittest.TestCase):
self.assertEqual(len(replayed), 2)
self.assertEqual(replayed[1].error, "slo_pass_rate_unrecoverable")

def test_latency_summary_reports_quantiles_and_slo(self) -> None:
study = load_study_spec(_write_study_assets(Path(tempfile.mkdtemp())))
outcomes = [
RequestOutcome(
request_id="r1",
success=True,
ttft_ms=100.0,
tpot_ms=10.0,
prompt_tokens=100,
completion_tokens=10,
),
RequestOutcome(
request_id="r2",
success=True,
ttft_ms=200.0,
tpot_ms=20.0,
prompt_tokens=5000,
completion_tokens=10,
),
]
evaluations = [evaluate_request(item, study.slo) for item in outcomes]
summary = _latency_summary(outcomes=outcomes, evaluations=evaluations, study=study)
self.assertEqual(summary["observed_request_count"], 2)
self.assertEqual(summary["ttft_ms"]["mean"], 150.0)
self.assertEqual(summary["ttft_ms"]["p50"], 100.0)
self.assertEqual(summary["ttft_ms"]["p99"], 200.0)
self.assertEqual(summary["tpot_ms"]["mean"], 15.0)
self.assertEqual(summary["slo"]["target_pass_rate"], 0.95)

def test_wait_for_server_or_exit_fails_fast_when_process_exits(self) -> None:
process = mock.Mock()
process.poll.return_value = 17