Clean vLLM process groups after parent exit
This commit is contained in:
@@ -401,26 +401,36 @@ def _wait_for_server_or_exit(
|
||||
raise HttpClientError(f"Timed out waiting for {base_url}{healthcheck_path}: {last_error}")
|
||||
|
||||
|
||||
def _process_group_exists(pgid: int) -> bool:
|
||||
try:
|
||||
os.killpg(pgid, 0)
|
||||
return True
|
||||
except ProcessLookupError:
|
||||
return False
|
||||
|
||||
|
||||
def _terminate_process_tree(process: subprocess.Popen[str], *, timeout_s: float = 30.0) -> None:
    """Terminate the whole process group led by *process*.

    Sends SIGTERM to the group, waits up to ``timeout_s`` seconds for the
    group to disappear, then escalates to SIGKILL. The group is cleaned up
    even when the parent process itself has already exited.

    Args:
        process: The launched parent process; its pid is used as the
            process group id.
        timeout_s: Grace period after SIGTERM, also used as the timeout for
            the final wait on the parent after SIGKILL.

    Raises:
        subprocess.TimeoutExpired: If the parent is still running
            ``timeout_s`` seconds after SIGKILL was sent.
    """
    # Children can keep the session/process group alive after the vLLM API
    # server exits. In that case the group id is still the original pid
    # because the process was launched with start_new_session=True.
    pgid = process.pid
    try:
        os.killpg(pgid, signal.SIGTERM)
    except ProcessLookupError:
        # Nothing is left in the group, so there is nothing to clean up.
        return
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        # Reap the parent if it has exited: an unreaped (zombie) group
        # leader may otherwise keep the group looking alive.
        process.poll()
        if not _process_group_exists(pgid):
            return
        time.sleep(0.1)
    # Grace period elapsed; force-kill whatever is still in the group.
    try:
        os.killpg(pgid, signal.SIGKILL)
    except ProcessLookupError:
        return
    if process.poll() is None:
        process.wait(timeout=timeout_s)
|
||||
|
||||
|
||||
|
||||
@@ -4549,13 +4549,29 @@ class CoreFlowTests(unittest.TestCase):
|
||||
def test_terminate_process_tree_kills_process_group(self) -> None:
    # The parent is reported as still running for the whole test.
    process = mock.Mock()
    process.pid = 1234
    process.poll.return_value = None
    process.wait.return_value = 0
    with mock.patch("aituner.worker.os.getpgid", return_value=1234):
        # The first killpg call succeeds; the next one reports that the
        # process group is gone, which ends the termination sequence.
        with mock.patch(
            "aituner.worker.os.killpg",
            side_effect=[None, ProcessLookupError],
        ) as mock_killpg:
            _terminate_process_tree(process, timeout_s=1.0)
    # The first signal must be SIGTERM (15) aimed at the parent's group.
    self.assertEqual(mock_killpg.call_args_list[0].args[0], 1234)
    self.assertEqual(mock_killpg.call_args_list[0].args[1], 15)
|
||||
|
||||
def test_terminate_process_tree_kills_group_when_parent_already_exited(self) -> None:
    # The parent has already exited (poll() yields an exit code) and its pid
    # can no longer be resolved through getpgid.
    exited_parent = mock.Mock()
    exited_parent.pid = 1234
    exited_parent.poll.return_value = 0
    getpgid_patch = mock.patch(
        "aituner.worker.os.getpgid",
        side_effect=ProcessLookupError,
    )
    killpg_patch = mock.patch(
        "aituner.worker.os.killpg",
        side_effect=[None, ProcessLookupError],
    )
    with getpgid_patch, killpg_patch as fake_killpg:
        _terminate_process_tree(exited_parent, timeout_s=1.0)
    # The group must still be signalled using the parent's pid as group id,
    # and the already-finished parent must not be waited on.
    first_call = fake_killpg.call_args_list[0]
    self.assertEqual(first_call.args[0], 1234)
    exited_parent.wait.assert_not_called()
|
||||
|
||||
def test_openai_url_avoids_double_v1(self) -> None:
|
||||
self.assertEqual(
|
||||
|
||||
Reference in New Issue
Block a user