replayer: wire --max-inflight-sessions cap into replay loop (B2)

Trace-driven dispatch is preserved by default (semaphore=None when the
flag is not set), but operators can now cap concurrent sessions to
reproduce session-admission scenarios from earlier sweeps without
artificial time compression.
This commit is contained in:
2026-05-23 21:04:09 +08:00
parent 2c7f7fdaae
commit 7c7f8b951a
2 changed files with 13 additions and 0 deletions

View File

@@ -19,6 +19,9 @@ def main() -> None:
p.add_argument("--model", type=str, default="default", help="Model name for API")
p.add_argument("--concurrency-limit", type=int, default=2000,
help="Max concurrent HTTP requests (safety limit)")
p.add_argument("--max-inflight-sessions", type=int, default=None,
help="Cap on concurrent sessions (None = unlimited; "
"trace-driven dispatch otherwise)")
p.add_argument("--request-timeout", type=float, default=600.0)
p.add_argument("--request-limit", type=int, default=None,
help="Limit number of requests to replay")
@@ -38,6 +41,7 @@ def main() -> None:
concurrency_limit=args.concurrency_limit,
request_timeout_s=args.request_timeout,
request_limit=args.request_limit,
max_inflight_sessions=args.max_inflight_sessions,
)
results = asyncio.run(replay_trace(config))

View File

@@ -283,6 +283,11 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
trace_span = latest_ts - earliest_ts
request_sem = asyncio.Semaphore(config.concurrency_limit)
session_sem = (
asyncio.Semaphore(config.max_inflight_sessions)
if config.max_inflight_sessions and config.max_inflight_sessions > 0
else None
)
sink = IncrementalMetricSink(config.output_path)
@@ -291,6 +296,9 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
qps = n_requests / trace_span if trace_span > 0 else 0
logger.info("Replaying %d sessions (%d requests) over %.0fs (%.2f req/s)",
n_sessions, n_requests, trace_span, qps)
if session_sem is not None:
logger.info("Session admission cap: %d concurrent sessions",
config.max_inflight_sessions)
pre_metrics = await _snapshot_prefix_cache_metrics(config.endpoint_url)
sweep_start = time.perf_counter()
@@ -313,6 +321,7 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
request_sem=request_sem,
earliest_ts=earliest_ts, sweep_start=sweep_start,
sink=sink,
session_sem=session_sem,
))
for sid, turns in sessions
]