Fix replay methodology: trace-driven dispatch, no artificial limits

The replayer was artificially limiting concurrency with --max-inflight-sessions
(a semaphore) and --time-scale (time compression), producing an unrealistically
low load (1 req/GPU) that masked prefill-decode interference.

Replayer changes:
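- Remove session_sem and time_scale entirely
- Dispatch each request at its trace timestamp (relative to the earliest
  timestamp in the trace)
- Turns within a session are still sequential (turn N+1 waits for turn N
  to complete)
- If a turn completes after the next turn's timestamp, the next turn fires
  immediately
- Keep --concurrency-limit purely as a safety cap on in-flight HTTP requests
  (default raised from 256 to 2000)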

Sampler changes:
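- Add --sample-ratio for GPU-proportional session sampling
- Keep --target-requests for backwards compatibility (it no longer defaults
  to 5000; one of --sample-ratio or --target-requests must now be given)
- Remove --strategy and --time-scale: sessions are always shuffled with the
  seed, and timestamps are re-zeroed but never compressed (the original
  arrival pattern is preserved)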

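bench.sh: remove the --time-scale and --sessions options and stop passing
--time-scale / --max-inflight-sessions to the replayer; --requests is now
optional (empty = replay all requests in the trace)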

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-23 12:43:41 +08:00
parent c8ba666517
commit 4089ffd63f
4 changed files with 84 additions and 103 deletions

View File

@@ -17,10 +17,8 @@ def main() -> None:
p.add_argument("--endpoint", type=str, required=True, p.add_argument("--endpoint", type=str, required=True,
help="vLLM server URL (e.g. http://localhost:8000)") help="vLLM server URL (e.g. http://localhost:8000)")
p.add_argument("--model", type=str, default="default", help="Model name for API") p.add_argument("--model", type=str, default="default", help="Model name for API")
p.add_argument("--time-scale", type=float, default=1.0, p.add_argument("--concurrency-limit", type=int, default=2000,
help="Time compression (>1 = faster)") help="Max concurrent HTTP requests (safety limit)")
p.add_argument("--max-inflight-sessions", type=int, default=32)
p.add_argument("--concurrency-limit", type=int, default=256)
p.add_argument("--request-timeout", type=float, default=600.0) p.add_argument("--request-timeout", type=float, default=600.0)
p.add_argument("--request-limit", type=int, default=None, p.add_argument("--request-limit", type=int, default=None,
help="Limit number of requests to replay") help="Limit number of requests to replay")
@@ -37,8 +35,6 @@ def main() -> None:
output_path=args.output, output_path=args.output,
endpoint_url=args.endpoint.rstrip("/"), endpoint_url=args.endpoint.rstrip("/"),
model_name=args.model, model_name=args.model,
time_scale=args.time_scale,
max_inflight_sessions=args.max_inflight_sessions,
concurrency_limit=args.concurrency_limit, concurrency_limit=args.concurrency_limit,
request_timeout_s=args.request_timeout, request_timeout_s=args.request_timeout,
request_limit=args.request_limit, request_limit=args.request_limit,

View File

@@ -1,16 +1,15 @@
"""Trace replayer — send requests to vLLM following trace timing. """Trace replayer — send requests to vLLM following trace timing.
Supports both vLLM's /v1/completions (OpenAI-compatible) and /generate Uses hash_ids from the trace to construct synthetic prompts that
(SGLang-style) endpoints. Uses hash_ids from the trace to construct reproduce realistic prefix-cache hit patterns.
synthetic prompts that reproduce realistic prefix-cache hit patterns.
Key behaviors: Key behaviors:
- Trace-driven dispatch: each request is sent at its trace timestamp.
No artificial concurrency limits or time compression.
- Per-session sequencing: turns within a session are sent in order, - Per-session sequencing: turns within a session are sent in order,
each waiting for the previous to complete before dispatching. each waiting for the previous to complete before dispatching.
- Inter-session arrival: sessions start at their trace timestamps, If a turn completes after its successor's timestamp, the successor
scaled by --time-scale. fires immediately (no waiting for a past timestamp).
- Concurrency control: --max-inflight-sessions caps concurrent sessions;
--concurrency-limit caps total in-flight requests.
""" """
from __future__ import annotations from __future__ import annotations
@@ -56,9 +55,7 @@ class ReplayConfig:
trace_path: Path trace_path: Path
output_path: Path output_path: Path
endpoint_url: str # comma-separated for round-robin: "http://host:8000,http://host:8001" endpoint_url: str # comma-separated for round-robin: "http://host:8000,http://host:8001"
time_scale: float = 1.0 concurrency_limit: int = 2000
max_inflight_sessions: int = 32
concurrency_limit: int = 256
request_timeout_s: float = 600.0 request_timeout_s: float = 600.0
request_limit: int | None = None request_limit: int | None = None
model_name: str = "default" model_name: str = "default"
@@ -214,34 +211,25 @@ async def _run_session(
state: _SessionState, state: _SessionState,
config: ReplayConfig, config: ReplayConfig,
client: httpx.AsyncClient, client: httpx.AsyncClient,
session_sem: asyncio.Semaphore,
request_sem: asyncio.Semaphore, request_sem: asyncio.Semaphore,
earliest_ts: float, earliest_ts: float,
sweep_start: float, sweep_start: float,
sink: IncrementalMetricSink, sink: IncrementalMetricSink,
) -> list[RequestMetrics]: ) -> list[RequestMetrics]:
async with session_sem: for req in state.turns:
# Wait until this session's start time # Wait until this request's trace timestamp
offset = (state.turns[0].timestamp_s - earliest_ts) / config.time_scale target_wall = (req.timestamp_s - earliest_ts)
wait = offset - (time.perf_counter() - sweep_start) elapsed = time.perf_counter() - sweep_start
if wait > 0: if elapsed < target_wall:
await asyncio.sleep(wait) await asyncio.sleep(target_wall - elapsed)
for req in state.turns: token_ids = _build_prompt_token_ids(req)
# Intra-session: wait for turn's relative offset metric = await _dispatch_request(
if req != state.turns[0]: client=client, config=config, req=req,
target = (req.timestamp_s - state.turns[0].timestamp_s) / config.time_scale prompt_token_ids=token_ids, sem=request_sem,
elapsed = time.perf_counter() - sweep_start - offset )
if elapsed < target: state.metrics.append(metric)
await asyncio.sleep(target - elapsed) await sink.append(metric)
token_ids = _build_prompt_token_ids(req)
metric = await _dispatch_request(
client=client, config=config, req=req,
prompt_token_ids=token_ids, sem=request_sem,
)
state.metrics.append(metric)
await sink.append(metric)
return state.metrics return state.metrics
@@ -283,16 +271,18 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
sessions = sorted(by_session.items(), key=lambda kv: kv[1][0].timestamp_s) sessions = sorted(by_session.items(), key=lambda kv: kv[1][0].timestamp_s)
earliest_ts = sessions[0][1][0].timestamp_s earliest_ts = sessions[0][1][0].timestamp_s
latest_ts = max(r.timestamp_s for r in requests)
trace_span = latest_ts - earliest_ts
session_sem = asyncio.Semaphore(config.max_inflight_sessions)
request_sem = asyncio.Semaphore(config.concurrency_limit) request_sem = asyncio.Semaphore(config.concurrency_limit)
sink = IncrementalMetricSink(config.output_path) sink = IncrementalMetricSink(config.output_path)
n_sessions = len(sessions) n_sessions = len(sessions)
n_requests = len(requests) n_requests = len(requests)
logger.info("Replaying %d sessions (%d requests), time_scale=%.1f", qps = n_requests / trace_span if trace_span > 0 else 0
n_sessions, n_requests, config.time_scale) logger.info("Replaying %d sessions (%d requests) over %.0fs (%.2f req/s)",
n_sessions, n_requests, trace_span, qps)
pre_metrics = await _snapshot_prefix_cache_metrics(config.endpoint_url) pre_metrics = await _snapshot_prefix_cache_metrics(config.endpoint_url)
sweep_start = time.perf_counter() sweep_start = time.perf_counter()
@@ -312,7 +302,7 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
asyncio.create_task(_run_session( asyncio.create_task(_run_session(
state=_SessionState(session_id=sid, turns=turns), state=_SessionState(session_id=sid, turns=turns),
config=config, client=client, config=config, client=client,
session_sem=session_sem, request_sem=request_sem, request_sem=request_sem,
earliest_ts=earliest_ts, sweep_start=sweep_start, earliest_ts=earliest_ts, sweep_start=sweep_start,
sink=sink, sink=sink,
)) ))

View File

@@ -30,9 +30,7 @@ POLICY="linear" # linear | lmetric
N_INSTANCES=8 N_INSTANCES=8
BASE_PORT=8000 BASE_PORT=8000
PROXY_PORT=9090 PROXY_PORT=9090
REQUESTS=200 REQUESTS="" # empty = all requests in trace
TIME_SCALE=20
MAX_SESSIONS=8
HEAVY_THRESHOLD=20000 HEAVY_THRESHOLD=20000
NO_OFFLOAD=false NO_OFFLOAD=false
OVERLOAD_FACTOR_ARG="" OVERLOAD_FACTOR_ARG=""
@@ -46,8 +44,6 @@ while [[ $# -gt 0 ]]; do
--policy) POLICY="$2"; shift 2 ;; --policy) POLICY="$2"; shift 2 ;;
--instances) N_INSTANCES="$2"; shift 2 ;; --instances) N_INSTANCES="$2"; shift 2 ;;
--requests) REQUESTS="$2"; shift 2 ;; --requests) REQUESTS="$2"; shift 2 ;;
--time-scale) TIME_SCALE="$2"; shift 2 ;;
--sessions) MAX_SESSIONS="$2"; shift 2 ;;
--heavy-threshold) HEAVY_THRESHOLD="$2"; shift 2 ;; --heavy-threshold) HEAVY_THRESHOLD="$2"; shift 2 ;;
--no-offload) NO_OFFLOAD=true; shift ;; --no-offload) NO_OFFLOAD=true; shift ;;
--overload-factor) OVERLOAD_FACTOR_ARG="$2"; shift 2 ;; --overload-factor) OVERLOAD_FACTOR_ARG="$2"; shift 2 ;;
@@ -58,6 +54,7 @@ done
if [ -z "$TAG" ]; then if [ -z "$TAG" ]; then
echo "Usage: bench.sh --tag NAME --mode {baseline|elastic} [--instances N] [--policy {linear|lmetric}] [--requests N]" echo "Usage: bench.sh --tag NAME --mode {baseline|elastic} [--instances N] [--policy {linear|lmetric}] [--requests N]"
echo " Trace QPS is controlled by sample_trace.py --sample-ratio, not by bench.sh."
exit 1 exit 1
fi fi
@@ -76,9 +73,7 @@ cat > "$OUTDIR/config.json" << CONF
"policy": "$POLICY", "policy": "$POLICY",
"model": "$MODEL", "model": "$MODEL",
"n_instances": $N_INSTANCES, "n_instances": $N_INSTANCES,
"requests": $REQUESTS, "requests": "${REQUESTS:-all}",
"time_scale": $TIME_SCALE,
"max_sessions": $MAX_SESSIONS,
"heavy_threshold": $HEAVY_THRESHOLD, "heavy_threshold": $HEAVY_THRESHOLD,
"no_offload": "$NO_OFFLOAD", "no_offload": "$NO_OFFLOAD",
"overload_factor": "${OVERLOAD_FACTOR_ARG:-2.0}", "overload_factor": "${OVERLOAD_FACTOR_ARG:-2.0}",
@@ -245,7 +240,13 @@ launch_proxy() {
# ─── Run benchmark ───────────────────────────────────────────────────────── # ─── Run benchmark ─────────────────────────────────────────────────────────
run_benchmark() { run_benchmark() {
echo "[bench] Running $REQUESTS requests (time_scale=$TIME_SCALE, sessions=$MAX_SESSIONS)..." local request_args=""
if [ -n "$REQUESTS" ]; then
request_args="--request-limit $REQUESTS"
echo "[bench] Running $REQUESTS requests (trace-driven timing)..."
else
echo "[bench] Running all requests in trace (trace-driven timing)..."
fi
# Start GPU monitor in background # Start GPU monitor in background
bash "$PROJECT_DIR/scripts/gpu_monitor.sh" "$OUTDIR/gpu_util.csv" 5 & bash "$PROJECT_DIR/scripts/gpu_monitor.sh" "$OUTDIR/gpu_util.csv" 5 &
@@ -256,9 +257,7 @@ run_benchmark() {
--output "$OUTDIR/metrics.jsonl" \ --output "$OUTDIR/metrics.jsonl" \
--endpoint "http://localhost:$PROXY_PORT" \ --endpoint "http://localhost:$PROXY_PORT" \
--model "$MODEL" \ --model "$MODEL" \
--time-scale "$TIME_SCALE" \ $request_args \
--max-inflight-sessions "$MAX_SESSIONS" \
--request-limit "$REQUESTS" \
-v 2>&1 | tee "$OUTDIR/replayer.log" -v 2>&1 | tee "$OUTDIR/replayer.log"
# Stop GPU monitor # Stop GPU monitor
@@ -324,7 +323,7 @@ print('=' * 70)
echo "================================================================" echo "================================================================"
echo " bench.sh: $TAG" echo " bench.sh: $TAG"
echo " mode=$MODE policy=$POLICY requests=$REQUESTS overload_factor=${OVERLOAD_FACTOR_ARG:-2.0} max_batched_tokens=${MAX_BATCHED_TOKENS:-default}" echo " mode=$MODE policy=$POLICY requests=${REQUESTS:-all} overload_factor=${OVERLOAD_FACTOR_ARG:-2.0}"
echo " $(date)" echo " $(date)"
echo "================================================================" echo "================================================================"

View File

@@ -2,22 +2,28 @@
Preserves: Preserves:
- Complete session structure (all turns within a session kept together) - Complete session structure (all turns within a session kept together)
- Original arrival timing (inter-session and intra-session gaps) - Original arrival timing (re-zeroed to t=0 but NOT compressed)
- hash_ids for KV cache reuse patterns - hash_ids for KV cache reuse patterns
- Request type distribution - Request type distribution
Sampling strategy: Sampling strategy:
1. Group requests by session (derived from parent_chat_id chains) 1. Group requests by session (derived from parent_chat_id chains)
2. Randomly sample N sessions (or until target request count reached) 2. Randomly sample a fraction of sessions (--sample-ratio)
OR sample until target request count (--target-requests)
3. Re-zero timestamps so first event starts at t=0 3. Re-zero timestamps so first event starts at t=0
4. Optionally compress time axis to increase load density 4. The resulting QPS is proportional to the sample ratio,
preserving the production arrival pattern
Usage: Usage:
# Sample 1.6% of sessions (e.g., 8 GPUs / 500 cluster GPUs)
python scripts/sample_trace.py \\ python scripts/sample_trace.py \\
--input ~/ali-trace/trace-glm5.1-formatted/051315-051317.jsonl \\ --input ~/ali-trace/trace-glm5.1-formatted/051315-051317.jsonl \\
--output traces/sampled.jsonl \\ --output traces/sampled_ratio016.jsonl \\
--target-requests 5000 \\ --sample-ratio 0.016 --seed 42
--seed 42
# Sample by request count (legacy)
python scripts/sample_trace.py \\
--input ... --output ... --target-requests 1000 --seed 42
""" """
from __future__ import annotations from __future__ import annotations
@@ -58,39 +64,37 @@ def load_raw_rows(path: Path) -> dict[str, list[dict]]:
def sample_sessions( def sample_sessions(
rows_by_session: dict[str, list[dict]], rows_by_session: dict[str, list[dict]],
*, *,
target_requests: int, sample_ratio: float | None = None,
target_requests: int | None = None,
seed: int, seed: int,
strategy: str = "random",
) -> list[str]: ) -> list[str]:
"""Select sessions until target request count is reached.""" """Select sessions by ratio or until target request count."""
all_sids = list(rows_by_session.keys()) all_sids = list(rows_by_session.keys())
rng = random.Random(seed) rng = random.Random(seed)
rng.shuffle(all_sids)
if strategy == "random": if sample_ratio is not None:
rng.shuffle(all_sids) n_select = max(1, int(len(all_sids) * sample_ratio))
elif strategy == "sequential": return all_sids[:n_select]
pass # keep file order
else:
raise ValueError(f"Unknown strategy: {strategy}")
selected = [] if target_requests is not None:
total = 0 selected = []
for sid in all_sids: total = 0
selected.append(sid) for sid in all_sids:
total += len(rows_by_session[sid]) selected.append(sid)
if total >= target_requests: total += len(rows_by_session[sid])
break if total >= target_requests:
break
return selected
return selected raise ValueError("Must specify --sample-ratio or --target-requests")
def build_output( def build_output(
rows_by_session: dict[str, list[dict]], rows_by_session: dict[str, list[dict]],
selected: list[str], selected: list[str],
*,
time_scale: float = 1.0,
) -> list[dict]: ) -> list[dict]:
"""Build output rows with re-zeroed timestamps.""" """Build output rows with re-zeroed timestamps (no time compression)."""
out_rows = [] out_rows = []
for sid in selected: for sid in selected:
for row in rows_by_session[sid]: for row in rows_by_session[sid]:
@@ -103,10 +107,9 @@ def build_output(
if not out_rows: if not out_rows:
return out_rows return out_rows
# Re-zero: subtract earliest timestamp
t0 = float(out_rows[0]["timestamp"]) t0 = float(out_rows[0]["timestamp"])
for row in out_rows: for row in out_rows:
row["timestamp"] = (float(row["timestamp"]) - t0) / time_scale row["timestamp"] = float(row["timestamp"]) - t0
return out_rows return out_rows
@@ -125,17 +128,16 @@ def print_summary(
output_lens = [r["output_length"] for r in out_rows] output_lens = [r["output_length"] for r in out_rows]
span_s = float(out_rows[-1]["timestamp"]) if out_rows else 0 span_s = float(out_rows[-1]["timestamp"]) if out_rows else 0
qps = n_requests / span_s if span_s > 0 else 0
session_starts = {} session_starts = {}
for r in out_rows: for r in out_rows:
sid = r["session_id"] sid = r["session_id"]
ts = float(r["timestamp"]) ts = float(r["timestamp"])
if sid not in session_starts: if sid not in session_starts:
session_starts[sid] = ts session_starts[sid] = ts
starts_sorted = sorted(session_starts.values())
deltas = [starts_sorted[i+1] - starts_sorted[i]
for i in range(len(starts_sorted) - 1)]
# hash_ids overlap: count unique hash_ids across all requests # hash_ids overlap
all_hashes = set() all_hashes = set()
for r in out_rows: for r in out_rows:
all_hashes.update(r.get("hash_ids", [])) all_hashes.update(r.get("hash_ids", []))
@@ -149,12 +151,8 @@ def print_summary(
print(f" Output length: min={min(output_lens)} max={max(output_lens)} " print(f" Output length: min={min(output_lens)} max={max(output_lens)} "
f"avg={sum(output_lens)/len(output_lens):.0f}") f"avg={sum(output_lens)/len(output_lens):.0f}")
print(f" Trace span: {span_s:.1f}s ({span_s/60:.1f} min)") print(f" Trace span: {span_s:.1f}s ({span_s/60:.1f} min)")
print(f" QPS: {qps:.2f} req/s")
print(f" Unique hash blocks: {len(all_hashes)}") print(f" Unique hash blocks: {len(all_hashes)}")
if deltas:
deltas.sort()
p = lambda q: deltas[min(int(q * len(deltas)), len(deltas) - 1)]
print(f" Session arrival deltas (s): p10={p(0.1):.2f} p50={p(0.5):.2f} "
f"p90={p(0.9):.2f} max={max(deltas):.2f}")
def main() -> None: def main() -> None:
@@ -164,15 +162,16 @@ def main() -> None:
help="Path to the full trace JSONL file") help="Path to the full trace JSONL file")
p.add_argument("--output", type=Path, required=True, p.add_argument("--output", type=Path, required=True,
help="Path to write sampled trace JSONL") help="Path to write sampled trace JSONL")
p.add_argument("--target-requests", type=int, default=5000, p.add_argument("--sample-ratio", type=float, default=None,
help="Target number of requests (stops after session that crosses it)") help="Fraction of sessions to sample (e.g. 0.016 for 8/500 GPU ratio)")
p.add_argument("--strategy", choices=["random", "sequential"], default="random", p.add_argument("--target-requests", type=int, default=None,
help="Session selection strategy") help="Target number of requests (legacy, stops after session that crosses it)")
p.add_argument("--time-scale", type=float, default=1.0,
help="Compress time axis by this factor (>1 = faster arrival)")
p.add_argument("--seed", type=int, default=42) p.add_argument("--seed", type=int, default=42)
args = p.parse_args() args = p.parse_args()
if args.sample_ratio is None and args.target_requests is None:
p.error("Must specify --sample-ratio or --target-requests")
print(f"Loading trace from {args.input} ...") print(f"Loading trace from {args.input} ...")
rows_by_session = load_raw_rows(args.input) rows_by_session = load_raw_rows(args.input)
total_sessions = len(rows_by_session) total_sessions = len(rows_by_session)
@@ -181,15 +180,12 @@ def main() -> None:
selected = sample_sessions( selected = sample_sessions(
rows_by_session, rows_by_session,
sample_ratio=args.sample_ratio,
target_requests=args.target_requests, target_requests=args.target_requests,
seed=args.seed, seed=args.seed,
strategy=args.strategy,
) )
out_rows = build_output( out_rows = build_output(rows_by_session, selected)
rows_by_session, selected,
time_scale=args.time_scale,
)
print_summary(rows_by_session, selected, out_rows) print_summary(rows_by_session, selected, out_rows)