Files
xserv/tools/bench/servers.py
Gahow Wang d5dcf1a5ab bench: PP harness (xserv --pp vs llama.cpp -sm layer)
runner/servers: add --pp for both engines (xserv --pp N; llama.cpp
-sm layer over N GPUs). New drivers: pp_final.sh (sequential latency +
per-GPU VRAM + byte-exact correctness), pp_diag.sh (single x2 vs pp4 x2
determinism control), pp_quality_full.sh / pp_llama_47.sh (AIME+GSM8K
matrix, xserv on 0-3 || llama on 4-7), summarize_pp/summarize_fullq,
pp_time.py latency probe.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-29 18:45:59 +08:00

169 lines
5.4 KiB
Python

"""Start/stop xserv-server and llama-server as subprocesses.
The benchmark driver treats both systems as black-box HTTP servers — it does
not import their Rust/C++ code. This keeps the comparison fair (same wire
protocol, no in-process shortcut) and avoids coupling the bench harness to
internal APIs.
"""
from __future__ import annotations
import contextlib
import os
import signal
import subprocess
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass
from .config import SystemEndpoint
@dataclass
class ServerHandle:
    """Record describing one benchmarked server and how to tear it down.

    A handle either wraps a child process spawned by this module, or merely
    points at a server that was already running; in the latter case both
    `proc` and `log_path` are None.
    """

    # Endpoint description the handle was created for.
    endpoint: SystemEndpoint
    # Spawned child process, or None when an existing server is being reused.
    proc: subprocess.Popen[bytes] | None
    # File receiving the child's combined stdout/stderr, or None when reused.
    log_path: str | None
def _wait_ready(base_url: str, health_path: str, timeout_s: float) -> bool:
url = base_url.rstrip("/") + health_path
deadline = time.monotonic() + timeout_s
last_err = ""
while time.monotonic() < deadline:
try:
with urllib.request.urlopen(url, timeout=5) as r:
if r.status == 200:
return True
except (urllib.error.URLError, ConnectionError, TimeoutError) as e:
last_err = repr(e)
time.sleep(1.0)
print(f"[servers] not ready after {timeout_s}s ({url}): {last_err}", file=sys.stderr)
return False
def start_server(ep: SystemEndpoint, log_dir: str) -> ServerHandle:
    """Launch `ep.launch_cmd` if set; otherwise assume it's already running.

    Args:
        ep: Endpoint description. Reads `name`, `base_url`, `health_path`,
            `launch_cmd`, `launch_env`, `launch_cwd` and `ready_timeout_s`.
        log_dir: Directory (created if missing) for the child's log file.

    Returns:
        A handle whose `proc`/`log_path` are None when an already-running
        server is reused, and populated when the server was spawned here.

    Raises:
        RuntimeError: If there is no launch command and the server is not
            reachable, or if a launched server never becomes ready.
    """
    if ep.launch_cmd is None:
        if _wait_ready(ep.base_url, ep.health_path, timeout_s=10.0):
            print(f"[servers] reusing already-running {ep.name} at {ep.base_url}")
            return ServerHandle(endpoint=ep, proc=None, log_path=None)
        raise RuntimeError(f"{ep.name}: no launch_cmd and not reachable at {ep.base_url}")

    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, f"{ep.name.replace('.', '_')}.log")
    env = os.environ.copy()
    env.update(ep.launch_env)
    # The child inherits its own copy of the log descriptor, so the parent's
    # handle is only needed while spawning. Closing it here avoids leaking one
    # open file per launched server (and on a failed spawn).
    with open(log_path, "wb") as log_f:
        print(f"[servers] launching {ep.name}: {' '.join(ep.launch_cmd)}")
        print(f"[servers] log: {log_path}")
        proc = subprocess.Popen(
            ep.launch_cmd,
            cwd=ep.launch_cwd,
            env=env,
            stdout=log_f,
            stderr=subprocess.STDOUT,
            # Own session/process group so SIGTERM kills children
            # (llama-server in particular). Same effect as
            # preexec_fn=os.setsid without running Python code post-fork.
            start_new_session=True,
        )

    if _wait_ready(ep.base_url, ep.health_path, timeout_s=ep.ready_timeout_s):
        return ServerHandle(endpoint=ep, proc=proc, log_path=log_path)

    # Tear the failed launch down completely before reporting: signal the
    # whole group, reap the child, and escalate if it ignores SIGTERM, so it
    # does not linger as a zombie or keep holding the port / GPU memory.
    with contextlib.suppress(ProcessLookupError):
        os.killpg(proc.pid, signal.SIGTERM)
    try:
        proc.wait(timeout=10.0)
    except subprocess.TimeoutExpired:
        with contextlib.suppress(ProcessLookupError):
            os.killpg(proc.pid, signal.SIGKILL)
        # Best effort only: the RuntimeError below is the error that matters.
        with contextlib.suppress(subprocess.TimeoutExpired):
            proc.wait(timeout=5)
    raise RuntimeError(
        f"{ep.name} failed to become ready (see {log_path}). "
        "Common causes: model path wrong, port already in use, OOM."
    )
def stop_server(h: ServerHandle, *, grace_s: float = 10.0) -> None:
    """Terminate a server spawned by this module; no-op for reused servers.

    Sends SIGTERM to the child's process group and waits up to `grace_s`
    seconds for the child to exit. If it is still alive after that, the group
    receives SIGKILL and the child is waited on for up to 5 more seconds.

    Args:
        h: Handle to stop. Nothing happens when `h.proc` is None.
        grace_s: Seconds to wait after SIGTERM before escalating.
    """
    child = h.proc
    if child is None:
        # Server was not launched here, so it is not ours to stop.
        return

    print(f"[servers] stopping {h.endpoint.name} (pid {child.pid})")
    try:
        os.killpg(child.pid, signal.SIGTERM)
    except ProcessLookupError:
        # Process group is already gone.
        return

    try:
        child.wait(timeout=grace_s)
    except subprocess.TimeoutExpired:
        print(f"[servers] {h.endpoint.name} did not exit, sending SIGKILL")
        try:
            os.killpg(child.pid, signal.SIGKILL)
        except ProcessLookupError:
            pass
        child.wait(timeout=5)
# ---------- launch-command builders ----------
def xserv_launch_cmd(
    bin_path: str,
    model_dir: str,
    port: int,
    *,
    max_batch: int,
    max_seq_len: int,
    tp: int = 1,
    pp: int = 1,
) -> list[str]:
    """Assemble the argv list used to launch xserv-server.

    Args:
        bin_path: Server executable; becomes argv[0].
        model_dir: Model directory, passed as the first positional argument.
        port: Value for `--port`.
        max_batch: Value for `--max-batch`.
        max_seq_len: Value for `--max-seq-len`.
        tp: Parallel degree emitted as `--tp` when > 1 and `pp` is not > 1.
        pp: Parallel degree emitted as `--pp` when > 1; wins over `tp`.

    Returns:
        The command as a list of strings.
    """
    argv = [bin_path, model_dir]
    numeric_flags = (
        ("--port", port),
        ("--max-batch", max_batch),
        ("--max-seq-len", max_seq_len),
    )
    for flag, value in numeric_flags:
        argv.extend((flag, str(value)))

    if pp > 1:
        # xserv binds stage s -> GPU s internally
        argv.extend(("--pp", str(pp)))
    elif tp > 1:
        # xserv binds rank r -> GPU r internally
        argv.extend(("--tp", str(tp)))
    return argv
def llama_cpp_launch_cmd(
    bin_path: str,
    gguf_path: str,
    port: int,
    *,
    n_parallel: int,
    ctx_per_slot: int,
    n_gpu_layers: int = 99,
    tp: int = 1,
    pp: int = 1,
) -> list[str]:
    """Assemble the argv list used to launch llama-server.

    Args:
        bin_path: Server executable; becomes argv[0].
        gguf_path: Model file passed via `-m`.
        port: Value for `--port`.
        n_parallel: Value for `--parallel`.
        ctx_per_slot: Context length each slot should get; multiplied by
            `n_parallel` to form the total passed via `-c`.
        n_gpu_layers: Value for `-ngl`.
        tp: When > 1 (and `pp` is not > 1), adds `--split-mode row`.
        pp: When > 1, adds `--split-mode layer` plus an even `-ts` split with
            `pp` entries; wins over `tp`.

    Returns:
        The command as a list of strings.
    """
    # llama.cpp DIVIDES the total -c across --parallel slots, so each slot
    # sees n_ctx / n_parallel. xserv gives every sequence the full
    # max_seq_len; to match, the total must be ctx_per_slot * n_parallel.
    # Getting this wrong silently truncates long generations (e.g. AIME) on
    # llama.cpp's side.
    argv = [bin_path, "-m", gguf_path]
    argv += ["--port", str(port), "--host", "0.0.0.0"]
    argv += ["-c", str(ctx_per_slot * n_parallel)]
    argv += ["-ngl", str(n_gpu_layers)]
    argv += ["--parallel", str(n_parallel)]
    # NOTE: do NOT pass --log-disable; the startup log reports per-slot
    # n_ctx, which is exactly the diagnostic that catches ctx misconfig.

    if pp > 1:
        # Pipeline / layer split across the visible GPUs (llama.cpp default).
        even_split = ",".join("1" for _ in range(pp))
        argv += ["--split-mode", "layer", "-ts", even_split]
    elif tp > 1:
        # Tensor-parallel split across the visible GPUs (caller restricts the
        # set via CUDA_VISIBLE_DEVICES in launch_env). Row-split is
        # llama.cpp's tensor-parallel mode (vs the default layer/pipeline
        # split).
        argv += ["--split-mode", "row"]
    return argv