High-concurrency test (512 input, 64 output, rates 4–32 req/s):
- Rate=8: plain TTFT p90=94 ms, mooncake_both TTFT p90=102 ms → +9% tax
- Rate=16: plain TTFT p90=144 ms, mooncake_both TTFT p90=156 ms → +8% tax
- Rate=32: both saturated at ~6.1 s → no distinguishable difference

Low-concurrency back-to-back retest (4096 input, 256 output): mooncake_both_v2 vs plain_v2 tax is ≈0% (within noise), because the scheduler's 1.4 ms/step is hidden behind the model forward pass.

Decomposition of trace-replay's +45%:
- +7–9% from build_connector_meta per-step cost (this microbench)
- +20–30% from multi-instance coupling amplification (not measurable here)
- remainder from large-cache O(|cache|) scaling (Phase B follow-up)

Also: bench_loop.py now emits mean/p50/p90/p99 for all three metrics (TTFT, TPOT, E2E).
355 lines
12 KiB
Python
355 lines
12 KiB
Python
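#!/usr/bin/env python3
r"""Open-loop fixed-rate loadgen for the connector_tax microbench.

Sends requests with Poisson arrivals at a target rate (``rate_target``
req/s), all of one fixed (input_tokens, output_tokens) shape with random
prompt content. Each (rate, shape) cell keeps submitting until BOTH the
duration floor AND the min-completed threshold are met, then waits for the
requests still in flight. Requests sent during the first ``--warmup``
seconds of a cell are excluded from that cell's statistics.

Per-request metrics are appended to a JSONL file per cell
(``requests_<phase>_r<rate>_<IN>x<OUT>.jsonl`` in the output directory);
per-cell summaries (mean/p50/p90/p99 of TTFT, TPOT and E2E latency) are
written to ``summary_<phase>.json``.

Usage:
    bench_loop.py \
        --url http://127.0.0.1:8000/v1/chat/completions \
        --model /path/to/model \
        --rates 0.5,1,2,4,8 \
        --shape 4096,256 \
        --duration 60 \
        --min-completed 200 \
        --warmup 10 \
        --output-dir results/<run> \
        --phase A

Multiple shapes can be given instead as ``--shapes 4096x256,512x64``.
"""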
|
||
|
||
import argparse
|
||
import asyncio
|
||
import hashlib
|
||
import json
|
||
import random
|
||
import statistics
|
||
import time
|
||
import uuid
|
||
from dataclasses import asdict, dataclass, field
|
||
from pathlib import Path
|
||
|
||
import httpx
|
||
|
||
|
||
@dataclass
|
||
class ReqMetric:
|
||
req_id: str
|
||
rate_target: float
|
||
input_tokens_target: int
|
||
output_tokens_target: int
|
||
t_send_ns: int
|
||
t_first_token_ns: int | None = None
|
||
t_last_token_ns: int | None = None
|
||
prompt_tokens: int = 0
|
||
completion_tokens: int = 0
|
||
inflight_at_send: int = 0
|
||
error: str | None = None
|
||
|
||
|
||
# ─── content generation ────────────────────────────────────────────────────

def make_random_prompt(target_tokens: int, tokens_per_block: int = 35) -> str:
    """Generate a random prompt that tokenizes to roughly ``target_tokens``.

    The prompt is ``max(1, target_tokens // tokens_per_block)`` parts of the
    form ``"Block N: <32 hex chars>"`` joined by single spaces. Each call
    draws a fresh uuid4 seed, so repeated calls produce different content.

    Args:
        target_tokens: desired prompt length in tokens (approximate; values
            below ``tokens_per_block`` still yield one part).
        tokens_per_block: calibrated token cost of one ``"Block N: <hex>"``
            part. The default of 35 is the Qwen3-Coder tokenizer calibration;
            re-measure it for other tokenizers.

    Returns:
        The prompt text.

    Raises:
        ValueError: if ``tokens_per_block`` is not positive.
    """
    if tokens_per_block <= 0:
        raise ValueError(
            f"tokens_per_block must be positive, got {tokens_per_block}")
    n_parts = max(1, target_tokens // tokens_per_block)
    seed = uuid.uuid4().hex
    # md5 is only a cheap source of pseudo-random hex here, not a security
    # primitive; usedforsecurity=False keeps it available on FIPS builds.
    return " ".join(
        f"Block {i}: "
        + hashlib.md5(f"{seed}_{i}_{time.time_ns()}".encode(),
                      usedforsecurity=False).hexdigest()
        for i in range(n_parts)
    )


# ─── single request worker ─────────────────────────────────────────────────

def _delta_carries_output(delta: dict) -> bool:
    """Return True if a streamed chat ``delta`` carries generated output.

    A delta that only announces the role (``{"role": "assistant",
    "content": ""}``) or is empty (``{}``, ``{"content": ""}``) does not
    correspond to a generated token and must not move the first/last token
    timestamps. Any other non-empty field counts: ``content`` as well as
    e.g. reasoning text or tool-call deltas, whose field names vary by server.
    """
    return any(value for key, value in delta.items() if key != "role")


async def send_one(client: httpx.AsyncClient, url: str, model: str,
                   inp_tokens: int, out_tokens: int,
                   rate: float, inflight: list[int],
                   inflight_cap: int) -> ReqMetric:
    """Send one streaming chat-completion request and time it.

    Args:
        client: shared HTTP client.
        url: chat-completions endpoint.
        model: value for the request's ``model`` field.
        inp_tokens: approximate prompt length in tokens.
        out_tokens: requested output length, sent as both ``max_tokens`` and
            ``min_tokens`` together with ``ignore_eos``.
        rate: target arrival rate of the cell; only recorded in the metric.
        inflight: single-element list used as the in-flight request counter
            shared by all tasks of a cell.
        inflight_cap: if ``inflight[0]`` has already reached this value the
            request is not sent and is recorded as dropped.

    Returns:
        A ReqMetric. Request-level failures never raise: any ``Exception``
        is stored in ``error``; an error event inside the stream is recorded
        as ``"stream_error: ..."``; a stream that ends without any output is
        recorded as ``"no_output_tokens_received"``.
    """
    rid = uuid.uuid4().hex[:16]

    if inflight[0] >= inflight_cap:
        # Open-loop: never queue client-side; log the drop and move on.
        return ReqMetric(
            req_id=rid, rate_target=rate,
            input_tokens_target=inp_tokens, output_tokens_target=out_tokens,
            t_send_ns=time.perf_counter_ns(),
            inflight_at_send=inflight[0],
            error="dropped_due_to_inflight_cap",
        )

    # Build the request before taking t_send_ns so client-side prompt
    # generation is not counted in TTFT / E2E.
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": make_random_prompt(inp_tokens)}],
        "max_tokens": out_tokens,
        "min_tokens": out_tokens,
        "temperature": 0,
        "ignore_eos": True,
        "stream": True,
        "stream_options": {"include_usage": True},
    }

    inflight[0] += 1
    m = ReqMetric(
        req_id=rid, rate_target=rate,
        input_tokens_target=inp_tokens, output_tokens_target=out_tokens,
        t_send_ns=time.perf_counter_ns(),
        inflight_at_send=inflight[0],
    )

    try:
        async with client.stream("POST", url, json=payload, timeout=600.0) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                # SSE allows both "data: x" and "data:x".
                if not line.startswith("data:"):
                    continue
                data = line[5:].strip()
                if data == "[DONE]":
                    break
                try:
                    chunk = json.loads(data)
                except json.JSONDecodeError:
                    continue
                if not isinstance(chunk, dict):
                    continue
                err = chunk.get("error")
                if err or chunk.get("object") == "error":
                    # Failure reported inside an HTTP-200 stream (some
                    # OpenAI-compatible servers do this).
                    m.error = f"stream_error: {err or chunk.get('message')}"
                    break
                # Capture usage from any chunk (it may arrive in a trailing
                # chunk with empty `choices`).
                usage = chunk.get("usage")
                if usage:
                    m.prompt_tokens = usage.get("prompt_tokens", m.prompt_tokens)
                    m.completion_tokens = usage.get(
                        "completion_tokens", m.completion_tokens)
                choices = chunk.get("choices") or []
                if not choices:
                    continue
                delta = choices[0].get("delta") or {}
                if not _delta_carries_output(delta):
                    continue
                now = time.perf_counter_ns()
                if m.t_first_token_ns is None:
                    m.t_first_token_ns = now
                m.t_last_token_ns = now
        if m.error is None and m.t_first_token_ns is None:
            # Otherwise this request would silently vanish from the stats
            # without being counted as an error.
            m.error = "no_output_tokens_received"
    except Exception as e:
        m.error = f"{type(e).__name__}: {e}"
    finally:
        inflight[0] -= 1

    return m


# ─── per-cell driver (one rate × shape) ────────────────────────────────────

def _percentile(xs: list, p: float) -> float | None:
    """Return the ``p``-th percentile of ``xs``, or None if ``xs`` is empty.

    Lower nearest-rank without interpolation: element
    ``int(p / 100 * (n - 1))`` of the sorted data.
    """
    if not xs:
        return None
    ordered = sorted(xs)
    k = max(0, min(len(ordered) - 1, int(p / 100.0 * (len(ordered) - 1))))
    return ordered[k]


def _summarize_cell(metrics: "list[ReqMetric]", rate: float, inp_tokens: int,
                    out_tokens: int, warmup_s: float, t_start_ns: int,
                    elapsed_s: float) -> dict:
    """Aggregate one cell's per-request metrics into its summary dict.

    Only successful requests *sent* more than ``warmup_s`` seconds after
    ``t_start_ns`` that have both token timestamps enter the latency and
    throughput statistics; dropped and failed requests are only counted.
    ``elapsed_s`` is the whole cell's wall time; the warmup part is
    subtracted for throughput.
    """
    dropped = "dropped_due_to_inflight_cap"
    warmup_cutoff_ns = t_start_ns + int(warmup_s * 1e9)
    after = [
        m for m in metrics
        if m.error is None
        and m.t_send_ns > warmup_cutoff_ns
        and m.t_first_token_ns is not None
        and m.t_last_token_ns is not None
    ]

    ttft = [(m.t_first_token_ns - m.t_send_ns) / 1e6 for m in after]
    # TPOT needs at least two tokens; the first token's latency is TTFT.
    tpot = [
        (m.t_last_token_ns - m.t_first_token_ns) / 1e6 / (m.completion_tokens - 1)
        for m in after
        if m.completion_tokens > 1
    ]
    e2e = [(m.t_last_token_ns - m.t_send_ns) / 1e6 for m in after]
    inflight_seq = [m.inflight_at_send for m in after]
    throughput = len(after) / max(1.0, elapsed_s - warmup_s)

    summary = {
        "rate_target": rate,
        "input_tokens": inp_tokens,
        "output_tokens": out_tokens,
        "duration_actual_s": elapsed_s,
        "n_completed_total": len(metrics),
        "n_after_warmup": len(after),
        "n_dropped": sum(1 for m in metrics if m.error == dropped),
        "n_errors": sum(1 for m in metrics if m.error and m.error != dropped),
    }
    for name, xs in (("ttft", ttft), ("tpot", tpot), ("e2e", e2e)):
        summary[f"{name}_ms_mean"] = sum(xs) / len(xs) if xs else None
        for p in (50, 90, 99):
            summary[f"{name}_ms_p{p}"] = _percentile(xs, p)
    summary["throughput_effective_rps"] = throughput
    summary["throughput_ratio"] = throughput / rate
    summary["inflight_p50"] = _percentile(inflight_seq, 50)
    summary["inflight_p90"] = _percentile(inflight_seq, 90)
    return summary


async def run_cell(
    client: httpx.AsyncClient,
    url: str,
    model: str,
    rate: float,
    inp_tokens: int,
    out_tokens: int,
    duration_floor_s: float,
    min_completed: int,
    warmup_s: float,
    inflight_cap: int,
    out_path: Path,
) -> dict:
    """Run one (rate, shape) cell and return its aggregated summary.

    Requests are submitted open-loop with Poisson arrivals at ``rate`` req/s
    until at least ``duration_floor_s`` seconds have elapsed AND at least
    ``min_completed`` requests have completed (drops and failures included).
    The requests still in flight are then awaited. Every completed request
    is appended to ``out_path`` as one JSON line.

    Raises:
        ValueError: if ``rate`` is not a finite positive number.
    """
    if not 0 < rate < float("inf"):
        # 0 would divide by zero; negative/inf gaps would flood the server.
        raise ValueError(f"rate must be finite and > 0 req/s, got {rate!r}")

    inflight = [0]
    metrics: list[ReqMetric] = []
    pending_tasks: list[asyncio.Task] = []
    completed_count = 0

    t_start_ns = time.perf_counter_ns()
    cell_start = time.perf_counter()

    print(f" [cell] rate={rate} shape=({inp_tokens},{out_tokens}) "
          f"floor={duration_floor_s}s min_completed={min_completed}")

    rng = random.Random(int(time.time_ns()) & 0xFFFFFFFF)

    with open(out_path, "a", buffering=1) as fh:

        def reap_one(task: asyncio.Task) -> None:
            """Record one finished request task (metrics list + JSONL)."""
            nonlocal completed_count
            if task.cancelled():
                return
            try:
                m = task.result()
            except Exception as exc:
                # send_one records request failures in ReqMetric.error, so
                # this indicates a worker bug; report it instead of hiding it.
                print(f" [warn] request task raised {type(exc).__name__}: {exc}")
                return
            metrics.append(m)
            fh.write(json.dumps(asdict(m)) + "\n")
            completed_count += 1

        def reap_done() -> None:
            """Reap every finished task and keep only the pending ones."""
            still_pending = []
            for task in pending_tasks:
                if task.done():
                    reap_one(task)
                else:
                    still_pending.append(task)
            pending_tasks[:] = still_pending

        async def submit() -> None:
            # Absolute schedule: each Exp(rate) gap is added to the previous
            # *scheduled* send time, so event-loop lag does not accumulate
            # and lower the achieved rate (a stalled loop catches up instead).
            next_send = time.perf_counter()
            while True:
                elapsed = time.perf_counter() - cell_start
                if elapsed >= duration_floor_s and completed_count >= min_completed:
                    return
                pending_tasks.append(asyncio.create_task(
                    send_one(client, url, model, inp_tokens, out_tokens,
                             rate, inflight, inflight_cap)))
                next_send += rng.expovariate(rate)
                await asyncio.sleep(max(0.0, next_send - time.perf_counter()))

        async def drain_periodic() -> None:
            while True:
                reap_done()
                await asyncio.sleep(0.1)

        drainer = asyncio.create_task(drain_periodic())
        try:
            await submit()
        finally:
            drainer.cancel()
            try:
                await drainer
            except asyncio.CancelledError:
                pass

        # Final drain: wait for all remaining in-flight requests and write them.
        if pending_tasks:
            await asyncio.gather(*pending_tasks, return_exceptions=True)
        reap_done()

    elapsed_s = (time.perf_counter_ns() - t_start_ns) / 1e9
    summary = _summarize_cell(metrics, rate, inp_tokens, out_tokens,
                              warmup_s, t_start_ns, elapsed_s)

    print(f" completed={summary['n_after_warmup']} ttft_p90={summary['ttft_ms_p90']} "
          f"tpot_p90={summary['tpot_ms_p90']} thr_ratio={summary['throughput_ratio']:.2f}")

    return summary


async def main_async(args):
    """Run every (rate, shape) cell requested on the command line.

    Cells run sequentially (outer loop over rates, inner loop over shapes)
    with a 3 s cooldown between consecutive cells. Per-request rows are
    appended to ``requests_<phase>_r<rate>_<IN>x<OUT>.jsonl``. The list of
    cell summaries is rewritten to ``summary_<phase>.json`` after every cell,
    so finished cells survive a crash or interrupt later in the sweep.
    """
    out_dir = Path(args.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    rates = [float(x) for x in args.rates.split(",")] if args.rates else [args.rate]
    shapes = []
    if args.shape:
        ip, op = args.shape.split(",")
        shapes = [(int(ip), int(op))]
    elif args.shapes:
        for s in args.shapes.split(","):
            ip, op = s.split("x")
            shapes.append((int(ip), int(op)))

    cells = [(rate, inp, out) for rate in rates for inp, out in shapes]
    summary_path = out_dir / f"summary_{args.phase}.json"
    summaries = []

    def persist() -> None:
        # Write-then-rename so an interrupted write never leaves a truncated file.
        tmp_path = summary_path.with_name(summary_path.name + ".tmp")
        with open(tmp_path, "w") as f:
            json.dump(summaries, f, indent=2)
        tmp_path.replace(summary_path)

    timeout = httpx.Timeout(600.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        for idx, (rate, inp, out) in enumerate(cells):
            cell_label = f"{args.phase}_r{rate}_{inp}x{out}"
            req_path = out_dir / f"requests_{cell_label}.jsonl"

            # min_completed-driven duration floor: long enough to *send*
            # about min_completed requests at this rate.
            min_floor_for_rate = max(1, int(args.min_completed / rate)) if rate > 0 else args.duration
            floor = max(args.duration, min_floor_for_rate)

            summary = await run_cell(
                client, args.url, args.model, rate, inp, out,
                duration_floor_s=floor,
                min_completed=args.min_completed,
                warmup_s=args.warmup,
                inflight_cap=args.inflight_cap,
                out_path=req_path,
            )
            summary["phase"] = args.phase
            summary["cell"] = cell_label
            summaries.append(summary)
            persist()

            if idx < len(cells) - 1:
                # Cooldown between cells (let queue drain)
                await asyncio.sleep(3.0)

    # Final write; also produces the (empty) file when no cell was requested.
    persist()
    print(f"\nWrote {len(summaries)} cell summaries.")


def main():
    """Parse and validate command-line arguments, then run the sweep.

    Invalid rates (non-numeric, zero, negative, non-finite) and malformed or
    non-positive shapes are rejected up front via ``ArgumentParser.error``
    (exit status 2) instead of failing mid-sweep or flooding the server.
    """
    ap = argparse.ArgumentParser(
        description="Open-loop fixed-rate loadgen for connector_tax microbench.")
    ap.add_argument("--url", required=True,
                    help="vLLM /v1/chat/completions URL")
    ap.add_argument("--model", required=True,
                    help="value sent as the request's `model` field")
    ap.add_argument("--phase", default="A",
                    help="label used in output file names and summaries")
    # Rate spec — either --rates a,b,c (Phase A) or --rate r (Phase B)
    ap.add_argument("--rates", default="",
                    help="comma-separated arrival rates in req/s (overrides --rate)")
    ap.add_argument("--rate", type=float, default=4.0,
                    help="single arrival rate in req/s, used when --rates is empty")
    # Shape spec — either --shape ip,op (Phase A) or --shapes IPxOP,IPxOP,... (Phase B)
    ap.add_argument("--shape", default="",
                    help="single shape as INPUT,OUTPUT tokens (takes precedence over --shapes)")
    ap.add_argument("--shapes", default="",
                    help="comma-separated shapes as INPUTxOUTPUT tokens")
    ap.add_argument("--duration", type=float, default=60.0,
                    help="Cell duration floor (seconds)")
    ap.add_argument("--min-completed", type=int, default=200,
                    help="minimum completed requests per cell (drops/errors included)")
    ap.add_argument("--warmup", type=float, default=10.0,
                    help="requests sent in the first N seconds of a cell are excluded from stats")
    ap.add_argument("--inflight-cap", type=int, default=256,
                    help="requests arriving with this many in flight are dropped")
    ap.add_argument("--output-dir", required=True)
    args = ap.parse_args()

    # --rate has a default, so validate the values themselves: a zero rate
    # divides by zero and a negative/infinite one floods the server.
    try:
        rates = ([float(x) for x in args.rates.split(",")]
                 if args.rates else [args.rate])
    except ValueError:
        ap.error(f"--rates must be comma-separated numbers, got {args.rates!r}")
    if not all(0 < r < float("inf") for r in rates):
        ap.error(f"request rates must be finite and > 0 req/s, got {rates}")

    if not (args.shape or args.shapes):
        ap.error("Provide --shape or --shapes")
    try:
        if args.shape:
            ip, op = args.shape.split(",")
            shapes = [(int(ip), int(op))]
        else:
            shapes = []
            for s in args.shapes.split(","):
                ip, op = s.split("x")
                shapes.append((int(ip), int(op)))
    except ValueError:
        ap.error("shapes must look like --shape IN,OUT or --shapes INxOUT,INxOUT,...")
    if any(n_in <= 0 or n_out <= 0 for n_in, n_out in shapes):
        ap.error(f"shape token counts must be > 0, got {shapes}")

    asyncio.run(main_async(args))


if __name__ == "__main__":
    main()