From 48ae72467a8faa46d0eb5079d7a371b7f4e89b3c Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Fri, 29 May 2026 18:19:12 +0800 Subject: [PATCH] Replayer: closed-loop inter-turn think-time mode Add --inter-turn-think (env REPLAY_INTER_TURN_THINK_S): turn 1 fires on session admission, each later turn a FIXED think-time after the previous turn COMPLETES, ignoring absolute trace timestamps. Combined with --max-inflight-sessions (env REPLAY_MAX_INFLIGHT) this is a stable N-user closed loop, removing the open-loop "fire immediately because timestamp is in the past" retrigger artifact. Needed for the dispatch-coupling (wall-clock amplification) sweep. Co-Authored-By: Claude Opus 4.8 --- replayer/__main__.py | 13 +++++++++++-- replayer/replay.py | 25 +++++++++++++++++++------ 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/replayer/__main__.py b/replayer/__main__.py index 41885db..d9aa0ce 100644 --- a/replayer/__main__.py +++ b/replayer/__main__.py @@ -5,6 +5,7 @@ from __future__ import annotations import argparse import asyncio import logging +import os from pathlib import Path from .replay import ReplayConfig, replay_trace @@ -19,9 +20,16 @@ def main() -> None: p.add_argument("--model", type=str, default="default", help="Model name for API") p.add_argument("--concurrency-limit", type=int, default=2000, help="Max concurrent HTTP requests (safety limit)") - p.add_argument("--max-inflight-sessions", type=int, default=None, + _env_inflight = os.environ.get("REPLAY_MAX_INFLIGHT") + p.add_argument("--max-inflight-sessions", type=int, + default=int(_env_inflight) if _env_inflight else None, help="Cap on concurrent sessions (None = unlimited; " - "trace-driven dispatch otherwise)") + "trace-driven dispatch otherwise). Env: REPLAY_MAX_INFLIGHT") + _env_think = os.environ.get("REPLAY_INTER_TURN_THINK_S") + p.add_argument("--inter-turn-think", type=float, + default=float(_env_think) if _env_think else None, + help="Closed-loop think-time (s) after each turn completes; " + "ignore absolute trace schedule. Env: REPLAY_INTER_TURN_THINK_S") p.add_argument("--request-timeout", type=float, default=600.0) p.add_argument("--request-limit", type=int, default=None, help="Limit number of requests to replay") @@ -42,6 +50,7 @@ def main() -> None: request_timeout_s=args.request_timeout, request_limit=args.request_limit, max_inflight_sessions=args.max_inflight_sessions, + inter_turn_think_s=args.inter_turn_think, ) results = asyncio.run(replay_trace(config)) diff --git a/replayer/replay.py b/replayer/replay.py index a43906f..eb16c77 100644 --- a/replayer/replay.py +++ b/replayer/replay.py @@ -60,6 +60,12 @@ class ReplayConfig: request_limit: int | None = None model_name: str = "default" max_inflight_sessions: int | None = None # cap on concurrent sessions; None = unlimited + # Closed-loop think-time mode: if set, ignore absolute trace timestamps for + # subsequent turns — fire turn 1 on session admission, then each later turn a + # FIXED think-time after the previous turn COMPLETES. Combined with + # max_inflight_sessions=N this is a stable N-user closed-loop (no open-loop + # runaway), so it removes the "immediate retrigger under load" artifact. + inter_turn_think_s: float | None = None def _build_prompt_token_ids(req: TraceRequest) -> list[int]: @@ -279,12 +285,19 @@ async def _run_session( await session_sem.acquire() realized_context: list[int] = [] try: - for req in state.turns: - # Wait until this request's trace timestamp - target_wall = (req.timestamp_s - earliest_ts) - elapsed = time.perf_counter() - sweep_start - if elapsed < target_wall: - await asyncio.sleep(target_wall - elapsed) + for turn_idx, req in enumerate(state.turns): + if config.inter_turn_think_s is not None: + # Closed-loop: turn 1 fires on admission; later turns wait a fixed + # think-time AFTER the previous turn completed (no absolute schedule, + # so no "fire immediately because timestamp is in the past"). + if turn_idx > 0: + await asyncio.sleep(config.inter_turn_think_s) + else: + # Original: dispatch at the request's absolute trace timestamp. + target_wall = (req.timestamp_s - earliest_ts) + elapsed = time.perf_counter() - sweep_start + if elapsed < target_wall: + await asyncio.sleep(target_wall - elapsed) token_ids = _apply_realized_prefix( _build_prompt_token_ids(req),