From d76ac49198431cd65879e56c85488b119ca3ba72 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Sun, 26 Apr 2026 00:32:39 +0800 Subject: [PATCH] Relaunch engine after early-stopped probes --- src/aituner/worker.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/aituner/worker.py b/src/aituner/worker.py index 3e7bba9..d54bd9b 100644 --- a/src/aituner/worker.py +++ b/src/aituner/worker.py @@ -341,15 +341,18 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]: artifact_dir.mkdir(parents=True, exist_ok=True) engine_log_path = Path(trial.engine_log_path) with engine_log_path.open("w", encoding="utf-8") as engine_log: - process = subprocess.Popen( # noqa: S603 - recipe.argv, - cwd=recipe.cwd, - env=recipe.env, - stdout=engine_log, - stderr=subprocess.STDOUT, - text=True, - start_new_session=True, - ) + def launch_process() -> subprocess.Popen[str]: + return subprocess.Popen( # noqa: S603 + recipe.argv, + cwd=recipe.cwd, + env=recipe.env, + stdout=engine_log, + stderr=subprocess.STDOUT, + text=True, + start_new_session=True, + ) + + process = launch_process() probe_history: list[dict[str, Any]] = [] failure_stage = "engine_launch" try: @@ -362,6 +365,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]: failure_stage = "probe_search" def evaluator(threshold: float) -> ThresholdProbe[ProbePayload]: + nonlocal process selected = select_requests_for_threshold(requests, threshold=threshold) outcomes, early_stopped, early_stop_reason = _replay_requests( selected, @@ -372,6 +376,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]: max_lag_s=study.trace.early_stop_max_lag_s, max_elapsed_s=study.trace.early_stop_max_elapsed_s, evaluate_outcome=lambda outcome: evaluate_request(outcome, study.slo), + drain_inflight_on_early_stop=False, ) evaluations, summary = summarize_evaluations(outcomes, study.slo) request_rate = ( @@ -418,6 +423,15 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]: } probe_history.append(probe_record) StudyStore.write_json(Path(trial.probe_log_path), probe_history) + if early_stopped: + _terminate_process_tree(process, timeout_s=30.0) + process = launch_process() + _wait_for_server_or_exit( + process, + base_url=recipe.base_url, + healthcheck_path=recipe.healthcheck_path, + ready_timeout_s=recipe.ready_timeout_s, + ) return ThresholdProbe( threshold=threshold, feasible=payload.feasible,