Kill engine process groups on trial cleanup

This commit is contained in:
2026-04-05 01:30:05 +08:00
parent e00bedb466
commit 0aa607a4f1
2 changed files with 39 additions and 8 deletions

View File

@@ -2,6 +2,8 @@ from __future__ import annotations
import json import json
import math import math
import os
import signal
import subprocess import subprocess
import threading import threading
import time import time
@@ -218,6 +220,29 @@ def _wait_for_server_or_exit(
raise HttpClientError(f"Timed out waiting for {base_url}{healthcheck_path}: {last_error}") raise HttpClientError(f"Timed out waiting for {base_url}{healthcheck_path}: {last_error}")
def _terminate_process_tree(process: subprocess.Popen[str], *, timeout_s: float = 30.0) -> None:
if process.poll() is not None:
return
try:
pgid = os.getpgid(process.pid)
except ProcessLookupError:
return
try:
os.killpg(pgid, signal.SIGTERM)
except ProcessLookupError:
return
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
if process.poll() is not None:
return
time.sleep(0.1)
try:
os.killpg(pgid, signal.SIGKILL)
except ProcessLookupError:
return
process.wait(timeout=timeout_s)
def run_trial(trial_spec_path: Path) -> dict[str, Any]: def run_trial(trial_spec_path: Path) -> dict[str, Any]:
from .store import StudyStore from .store import StudyStore
@@ -237,6 +262,7 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
stdout=engine_log, stdout=engine_log,
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
text=True, text=True,
start_new_session=True,
) )
probe_history: list[dict[str, Any]] = [] probe_history: list[dict[str, Any]] = []
try: try:
@@ -352,10 +378,4 @@ def run_trial(trial_spec_path: Path) -> dict[str, Any]:
StudyStore.write_json(Path(trial.result_path), result) StudyStore.write_json(Path(trial.result_path), result)
return result return result
finally: finally:
if process.poll() is None: _terminate_process_tree(process, timeout_s=30.0)
process.terminate()
try:
process.wait(timeout=30)
except subprocess.TimeoutExpired:
process.kill()
process.wait(timeout=30)

View File

@@ -16,7 +16,7 @@ from aituner.slo import RequestOutcome, summarize_evaluations
from aituner.spec import Proposal, StudyState, TrialSummary, load_study_spec from aituner.spec import Proposal, StudyState, TrialSummary, load_study_spec
from aituner.store import StudyStore from aituner.store import StudyStore
from aituner.trace import load_trace_requests, summarize_window from aituner.trace import load_trace_requests, summarize_window
from aituner.worker import _replay_requests, _wait_for_server_or_exit from aituner.worker import _replay_requests, _terminate_process_tree, _wait_for_server_or_exit
from aituner.trace import TraceRequest from aituner.trace import TraceRequest
@@ -910,6 +910,17 @@ class CoreFlowTests(unittest.TestCase):
ready_timeout_s=10.0, ready_timeout_s=10.0,
) )
def test_terminate_process_tree_kills_process_group(self) -> None:
process = mock.Mock()
process.pid = 1234
process.poll.side_effect = [None, None, 0]
process.wait.return_value = 0
with mock.patch("aituner.worker.os.getpgid", return_value=1234):
with mock.patch("aituner.worker.os.killpg") as mock_killpg:
_terminate_process_tree(process, timeout_s=1.0)
mock_killpg.assert_called_once()
self.assertEqual(mock_killpg.call_args[0][0], 1234)
def test_openai_url_avoids_double_v1(self) -> None: def test_openai_url_avoids_double_v1(self) -> None:
self.assertEqual( self.assertEqual(
_openai_url("http://example.com", "/v1/chat/completions"), _openai_url("http://example.com", "/v1/chat/completions"),