Drain inflight requests after early stop
This commit is contained in:
@@ -61,6 +61,21 @@ Improve AITuner convergence for the `dash0` internal vLLM + Qwen3.5-27B 0-8k cha
|
|||||||
- Remote `compileall` passed.
|
||||||
- Remote `unittest discover` initially exposed two pre-existing path-sensitive tests that hardcoded `/home/gahow/phd/aituner`; fixed them to derive `REPO_ROOT` from the test file path.
|
||||||
|
|
||||||
|
### 2026-04-25 16:38-16:58 CST
|
||||||
|
|
||||||
|
- Started real run in tmux session `aituner_harness_qwen27b_0_8k_20260425`.
|
||||||
|
- Store root: `.aituner/harness-studies-20260425`.
|
||||||
|
- First proposal followed the harness:
|
||||||
|
- proposal: `tensor-parallel-size: 2`;
|
||||||
|
- rationale: L profile is prefill-sensitive, prefix reuse is low, arrivals are smooth, so probe adjacent TP before runtime batching knobs.
|
||||||
|
- First high-load probe at `sampling_u=0.03125` was infeasible:
|
||||||
|
- request rate 0.895 req/s;
|
||||||
|
- pass rate 0.145;
|
||||||
|
- p95 TTFT 4063 ms and p95 TPOT 113 ms;
|
||||||
|
- failed reasons included `tpot_ms>50.0` and `slo_pass_rate_unrecoverable`.
|
||||||
|
- Important implementation issue found: after an early-stopped probe, the worker returned while in-flight HTTP requests could continue occupying the engine, stalling/polluting the next binary-search probe.
|
||||||
|
- Action: stopped the run and freed GPUs. Updating `worker._replay_requests` to drain in-flight requests after early stop before the next probe starts.
|
||||||
|
|
||||||
Remaining next steps:
|
||||||
|
|
||||||
1. Start a real harness-guided Qwen3.5-27B 0-8k chat tuning run from `configs/examples/dash0_qwen27b_tight_slo_run4_0_8k.json`.
|
||||||
|
|||||||
@@ -135,6 +135,7 @@ def _replay_requests(
|
|||||||
max_lag_s: float | None,
|
max_lag_s: float | None,
|
||||||
max_elapsed_s: float | None,
|
max_elapsed_s: float | None,
|
||||||
evaluate_outcome: Callable[[RequestOutcome], Any],
|
evaluate_outcome: Callable[[RequestOutcome], Any],
|
||||||
|
drain_inflight_on_early_stop: bool = True,
|
||||||
) -> tuple[list[RequestOutcome], bool, str]:
|
) -> tuple[list[RequestOutcome], bool, str]:
|
||||||
outcomes_by_id: dict[str, RequestOutcome] = {}
|
outcomes_by_id: dict[str, RequestOutcome] = {}
|
||||||
lock = threading.Lock()
|
lock = threading.Lock()
|
||||||
@@ -209,6 +210,35 @@ def _replay_requests(
|
|||||||
if sleep_for > 0:
|
if sleep_for > 0:
|
||||||
time.sleep(min(sleep_for, 0.1))
|
time.sleep(min(sleep_for, 0.1))
|
||||||
if early_stopped:
|
if early_stopped:
|
||||||
|
if drain_inflight_on_early_stop and futures_by_request:
|
||||||
|
done, not_done = wait(list(futures_by_request), timeout=timeout_s)
|
||||||
|
for future in done:
|
||||||
|
request = futures_by_request[future]
|
||||||
|
try:
|
||||||
|
outcomes_by_id[request.row_id] = future.result(timeout=0)
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
outcomes_by_id[request.row_id] = RequestOutcome(
|
||||||
|
request_id=request.row_id,
|
||||||
|
success=False,
|
||||||
|
ttft_ms=None,
|
||||||
|
tpot_ms=None,
|
||||||
|
prompt_tokens=request.prompt_tokens_hint,
|
||||||
|
completion_tokens=request.completion_tokens_hint,
|
||||||
|
error=early_stop_reason or "probe_early_stopped",
|
||||||
|
)
|
||||||
|
for future in not_done:
|
||||||
|
future.cancel()
|
||||||
|
request = futures_by_request[future]
|
||||||
|
outcomes_by_id[request.row_id] = RequestOutcome(
|
||||||
|
request_id=request.row_id,
|
||||||
|
success=False,
|
||||||
|
ttft_ms=None,
|
||||||
|
tpot_ms=None,
|
||||||
|
prompt_tokens=request.prompt_tokens_hint,
|
||||||
|
completion_tokens=request.completion_tokens_hint,
|
||||||
|
error=early_stop_reason or "probe_early_stopped",
|
||||||
|
)
|
||||||
|
else:
|
||||||
for future in list(futures_by_request):
|
for future in list(futures_by_request):
|
||||||
future.cancel()
|
future.cancel()
|
||||||
for request in futures_by_request.values():
|
for request in futures_by_request.values():
|
||||||
|
|||||||
@@ -2191,6 +2191,7 @@ class CoreFlowTests(unittest.TestCase):
|
|||||||
max_lag_s=None,
|
max_lag_s=None,
|
||||||
max_elapsed_s=None,
|
max_elapsed_s=None,
|
||||||
evaluate_outcome=fake_evaluate,
|
evaluate_outcome=fake_evaluate,
|
||||||
|
drain_inflight_on_early_stop=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.assertEqual(submitted, ["r0", "r1"])
|
self.assertEqual(submitted, ["r0", "r1"])
|
||||||
@@ -2254,6 +2255,7 @@ class CoreFlowTests(unittest.TestCase):
|
|||||||
max_lag_s=None,
|
max_lag_s=None,
|
||||||
max_elapsed_s=1.0,
|
max_elapsed_s=1.0,
|
||||||
evaluate_outcome=fake_evaluate,
|
evaluate_outcome=fake_evaluate,
|
||||||
|
drain_inflight_on_early_stop=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.assertEqual(submitted, ["r0"])
|
self.assertEqual(submitted, ["r0"])
|
||||||
|
|||||||
Reference in New Issue
Block a user