Relaunch engine after early-stopped probes

This commit is contained in:
2026-04-26 00:32:39 +08:00
parent 440f5b491b
commit d76ac49198

View File

@@ -341,7 +341,8 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
artifact_dir.mkdir(parents=True, exist_ok=True) artifact_dir.mkdir(parents=True, exist_ok=True)
engine_log_path = Path(trial.engine_log_path) engine_log_path = Path(trial.engine_log_path)
with engine_log_path.open("w", encoding="utf-8") as engine_log: with engine_log_path.open("w", encoding="utf-8") as engine_log:
process = subprocess.Popen( # noqa: S603 def launch_process() -> subprocess.Popen[str]:
return subprocess.Popen( # noqa: S603
recipe.argv, recipe.argv,
cwd=recipe.cwd, cwd=recipe.cwd,
env=recipe.env, env=recipe.env,
@@ -350,6 +351,8 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
text=True, text=True,
start_new_session=True, start_new_session=True,
) )
process = launch_process()
probe_history: list[dict[str, Any]] = [] probe_history: list[dict[str, Any]] = []
failure_stage = "engine_launch" failure_stage = "engine_launch"
try: try:
@@ -362,6 +365,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
failure_stage = "probe_search" failure_stage = "probe_search"
def evaluator(threshold: float) -> ThresholdProbe[ProbePayload]: def evaluator(threshold: float) -> ThresholdProbe[ProbePayload]:
nonlocal process
selected = select_requests_for_threshold(requests, threshold=threshold) selected = select_requests_for_threshold(requests, threshold=threshold)
outcomes, early_stopped, early_stop_reason = _replay_requests( outcomes, early_stopped, early_stop_reason = _replay_requests(
selected, selected,
@@ -372,6 +376,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
max_lag_s=study.trace.early_stop_max_lag_s, max_lag_s=study.trace.early_stop_max_lag_s,
max_elapsed_s=study.trace.early_stop_max_elapsed_s, max_elapsed_s=study.trace.early_stop_max_elapsed_s,
evaluate_outcome=lambda outcome: evaluate_request(outcome, study.slo), evaluate_outcome=lambda outcome: evaluate_request(outcome, study.slo),
drain_inflight_on_early_stop=False,
) )
evaluations, summary = summarize_evaluations(outcomes, study.slo) evaluations, summary = summarize_evaluations(outcomes, study.slo)
request_rate = ( request_rate = (
@@ -418,6 +423,15 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
} }
probe_history.append(probe_record) probe_history.append(probe_record)
StudyStore.write_json(Path(trial.probe_log_path), probe_history) StudyStore.write_json(Path(trial.probe_log_path), probe_history)
if early_stopped:
_terminate_process_tree(process, timeout_s=30.0)
process = launch_process()
_wait_for_server_or_exit(
process,
base_url=recipe.base_url,
healthcheck_path=recipe.healthcheck_path,
ready_timeout_s=recipe.ready_timeout_s,
)
return ThresholdProbe( return ThresholdProbe(
threshold=threshold, threshold=threshold,
feasible=payload.feasible, feasible=payload.feasible,