Files
Gahow Wang c8ec73c548 Connector tax: high-concurrency confirms +7-9% tax, resolves trace-replay gap
High-concurrency test (512 input, 64 output, rates 4-32 req/s):
  Rate=8:  plain TTFT p90=94ms, mooncake_both=102ms → +9% tax
  Rate=16: plain TTFT p90=144ms, mooncake_both=156ms → +8% tax
  Rate=32: both saturated at ~6.1s → no distinguishable difference

Low-concurrency back-to-back retest (4096 input, 256 output):
  mooncake_both_v2 vs plain_v2: tax is ≈0% (within noise),
  because the scheduler's 1.4 ms/step is hidden behind the model forward pass.

Decomposition of trace-replay's +45%:
  +7-9% from build_connector_meta per-step cost (this microbench)
  +20-30% from multi-instance coupling amplification (not measurable here)
  remainder from large-cache O(|cache|) scaling (Phase B follow-up)

Also: bench_loop.py now emits mean/p50/p90/p99 for all three metrics.
2026-05-26 21:00:25 +08:00

355 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Open-loop fixed-rate load generator for the connector_tax microbench.

Requests arrive as a Poisson process at ``rate_target`` req/s. Every request
has the same fixed (input_tokens, output_tokens) shape and random prompt
content. Each (rate, shape) cell keeps submitting until BOTH the duration
floor AND the min-completed threshold are met, then waits for the requests
still in flight. Per-request metrics are appended to a per-cell JSONL file
(``requests_<cell>.jsonl``) and the per-cell summaries are written to
``summary_<phase>.json`` in the output directory.

Usage:
    bench_loop.py \\
        --url http://127.0.0.1:8000/v1/chat/completions \\
        --model /path/to/model \\
        --rates 0.5,1,2,4,8 \\
        --shape 4096,256 \\
        --duration 60 \\
        --min-completed 200 \\
        --warmup 10 \\
        --output-dir results/<run> \\
        --phase A

``--rate R`` may be used instead of ``--rates``, and
``--shapes IPxOP,IPxOP,...`` instead of ``--shape IP,OP``.
"""
import argparse
import asyncio
import hashlib
import json
import random
import statistics
import time
import uuid
from dataclasses import asdict, dataclass, field
from pathlib import Path
import httpx
@dataclass
class ReqMetric:
req_id: str
rate_target: float
input_tokens_target: int
output_tokens_target: int
t_send_ns: int
t_first_token_ns: int | None = None
t_last_token_ns: int | None = None
prompt_tokens: int = 0
completion_tokens: int = 0
inflight_at_send: int = 0
error: str | None = None
# ─── content generation ────────────────────────────────────────────────────
def make_random_prompt(target_tokens: int) -> str:
    """Return a random prompt sized to roughly ``target_tokens`` tokens.

    The prompt is a space-separated sequence of ``"Block <i>: <md5 hex>"``
    parts. Calibration against the Qwen3-Coder tokenizer puts one part at
    about 35 tokens, so ``target_tokens // 35`` parts are produced (never
    fewer than one). Each digest mixes a fresh UUID with the wall-clock
    nanosecond time, so digests differ between calls.
    """
    block_count = max(1, target_tokens // 35)
    salt = uuid.uuid4().hex

    def _block(idx: int) -> str:
        digest = hashlib.md5(f"{salt}_{idx}_{time.time_ns()}".encode()).hexdigest()
        return f"Block {idx}: {digest}"

    return " ".join(_block(idx) for idx in range(block_count))
# ─── single request worker ─────────────────────────────────────────────────
async def send_one(client: httpx.AsyncClient, url: str, model: str,
                   inp_tokens: int, out_tokens: int,
                   rate: float, inflight: list[int],
                   inflight_cap: int) -> ReqMetric:
    """Issue one streaming chat-completion request and record its timing.

    Args:
        client: Shared async HTTP client used for the request.
        url: Chat-completions endpoint to POST to.
        model: Value of the ``model`` field in the request body.
        inp_tokens: Approximate prompt length in tokens (passed to
            ``make_random_prompt``).
        out_tokens: Requested generation length; sent as both ``max_tokens``
            and ``min_tokens`` together with ``ignore_eos``.
        rate: Target rate of the issuing cell; only recorded in the metric.
        inflight: One-element list used as a mutable in-flight counter shared
            by every request of the cell; this request holds one slot from
            just before it is sent until it finishes.
        inflight_cap: When the counter is already at or above this value the
            request is not sent at all and is recorded as dropped.

    Returns:
        A ``ReqMetric``. Request failures do not raise; they are recorded in
        ``error``: ``"dropped_due_to_inflight_cap"``,
        ``"<ExceptionType>: <message>"``, ``"StreamError: <payload>"`` for an
        error event inside the stream, or ``"no_tokens_received"`` when the
        stream ends without any token chunk.
    """
    rid = uuid.uuid4().hex[:16]
    if inflight[0] >= inflight_cap:
        # Over the cap: record a dropped request instead of sending it.
        return ReqMetric(
            req_id=rid, rate_target=rate,
            input_tokens_target=inp_tokens, output_tokens_target=out_tokens,
            t_send_ns=time.perf_counter_ns(),
            inflight_at_send=inflight[0],
            error="dropped_due_to_inflight_cap",
        )
    # Build the prompt and request body BEFORE stamping t_send_ns so that
    # client-side prompt generation is not billed to TTFT / E2E latency.
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": make_random_prompt(inp_tokens)}],
        "max_tokens": out_tokens,
        "min_tokens": out_tokens,
        "temperature": 0,
        "ignore_eos": True,
        "stream": True,
        "stream_options": {"include_usage": True},
    }
    inflight[0] += 1
    m = ReqMetric(
        req_id=rid, rate_target=rate,
        input_tokens_target=inp_tokens, output_tokens_target=out_tokens,
        t_send_ns=time.perf_counter_ns(),
        inflight_at_send=inflight[0],
    )
    try:
        async with client.stream("POST", url, json=payload, timeout=600.0) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data = line[6:]
                if data.strip() == "[DONE]":
                    break
                try:
                    chunk = json.loads(data)
                except json.JSONDecodeError:
                    continue
                if not isinstance(chunk, dict):
                    continue
                # OpenAI-compatible servers may report a failure that occurs
                # after streaming has started as a `data:` event holding an
                # `error` object; record it rather than passing as success.
                stream_error = chunk.get("error")
                if stream_error:
                    m.error = f"StreamError: {stream_error}"
                    break
                # Capture usage from any chunk (it may arrive in a trailing
                # chunk with empty `choices`).
                usage = chunk.get("usage")
                if usage:
                    m.prompt_tokens = usage.get("prompt_tokens", m.prompt_tokens)
                    m.completion_tokens = usage.get(
                        "completion_tokens", m.completion_tokens)
                choices = chunk.get("choices") or []
                if not choices:
                    continue
                delta = choices[0].get("delta") or {}
                # Skip the role-only opening chunk (no generated text), but a
                # chunk carrying both role AND content still counts as a token.
                if "role" in delta and not delta.get("content"):
                    continue
                now = time.perf_counter_ns()
                if m.t_first_token_ns is None:
                    m.t_first_token_ns = now
                m.t_last_token_ns = now
        if m.error is None and m.t_first_token_ns is None:
            # A 2xx stream with no token chunk would otherwise silently vanish
            # from the latency stats without being counted as an error.
            m.error = "no_tokens_received"
    except Exception as e:
        m.error = f"{type(e).__name__}: {e}"
    finally:
        inflight[0] -= 1
    return m
# ─── per-cell driver (one rate × shape) ────────────────────────────────────
def _pct(xs, p):
    """Return the p-th percentile of ``xs`` by lower nearest rank, or None if empty.

    The index is ``floor(p / 100 * (n - 1))`` into the sorted values; no
    interpolation is performed.
    """
    if not xs:
        return None
    ordered = sorted(xs)
    k = max(0, min(len(ordered) - 1, int(p / 100.0 * (len(ordered) - 1))))
    return ordered[k]


def _mean(xs):
    """Return the arithmetic mean of ``xs``, or None if it is empty."""
    return sum(xs) / len(xs) if xs else None


def _summarize_cell(metrics, *, rate, inp_tokens, out_tokens, t_start_ns, warmup_s):
    """Aggregate one cell's per-request metrics into its summary dict.

    Latency and throughput statistics use only successful requests (no
    ``error``, both token timestamps set) whose send time is after the warmup
    cutoff; the drop and error counts cover every recorded request.
    """
    dropped = "dropped_due_to_inflight_cap"
    warmup_cutoff_ns = t_start_ns + int(warmup_s * 1e9)
    after = [m for m in metrics
             if m.t_send_ns > warmup_cutoff_ns and m.error is None
             and m.t_first_token_ns and m.t_last_token_ns]
    ttft = [(m.t_first_token_ns - m.t_send_ns) / 1e6 for m in after]
    # TPOT: decode time spread over the tokens after the first one.
    tpot = [(m.t_last_token_ns - m.t_first_token_ns) / 1e6
            / max(1, m.completion_tokens - 1)
            for m in after if m.completion_tokens > 1]
    e2e = [(m.t_last_token_ns - m.t_send_ns) / 1e6 for m in after]
    inflight_seq = [m.inflight_at_send for m in after]
    # Elapsed time runs from cell start through the final in-flight drain.
    elapsed_s = (time.perf_counter_ns() - t_start_ns) / 1e9
    effective_rps = len(after) / max(1.0, elapsed_s - warmup_s)
    return {
        "rate_target": rate,
        "input_tokens": inp_tokens,
        "output_tokens": out_tokens,
        "duration_actual_s": elapsed_s,
        "n_completed_total": len(metrics),
        "n_after_warmup": len(after),
        "n_dropped": sum(1 for m in metrics if m.error == dropped),
        "n_errors": sum(1 for m in metrics if m.error and m.error != dropped),
        "ttft_ms_mean": _mean(ttft),
        "ttft_ms_p50": _pct(ttft, 50),
        "ttft_ms_p90": _pct(ttft, 90),
        "ttft_ms_p99": _pct(ttft, 99),
        "tpot_ms_mean": _mean(tpot),
        "tpot_ms_p50": _pct(tpot, 50),
        "tpot_ms_p90": _pct(tpot, 90),
        "tpot_ms_p99": _pct(tpot, 99),
        "e2e_ms_mean": _mean(e2e),
        "e2e_ms_p50": _pct(e2e, 50),
        "e2e_ms_p90": _pct(e2e, 90),
        "e2e_ms_p99": _pct(e2e, 99),
        "throughput_effective_rps": effective_rps,
        "throughput_ratio": effective_rps / rate,
        "inflight_p50": _pct(inflight_seq, 50),
        "inflight_p90": _pct(inflight_seq, 90),
    }


async def run_cell(
    client: httpx.AsyncClient,
    url: str,
    model: str,
    rate: float,
    inp_tokens: int,
    out_tokens: int,
    duration_floor_s: float,
    min_completed: int,
    warmup_s: float,
    inflight_cap: int,
    out_path: Path,
) -> dict:
    """Run one (rate, shape) cell and return its aggregated summary.

    Requests are launched with exponentially distributed gaps (a Poisson
    process at ``rate`` req/s). Launching stops once BOTH ``duration_floor_s``
    has elapsed AND at least ``min_completed`` results have been recorded;
    dropped and failed requests count toward this threshold. Requests still
    outstanding are then awaited. Every recorded request is appended as one
    JSON line to ``out_path``. Requests *sent* during the first ``warmup_s``
    seconds are excluded from the summary statistics.

    Raises:
        ValueError: if ``rate`` is not a positive finite number. A zero rate
            would divide by zero, and a negative one makes ``expovariate``
            return negative gaps, i.e. submission in a tight loop.
    """
    if not 0 < rate < float("inf"):
        raise ValueError(f"rate must be positive and finite, got {rate!r}")
    inflight = [0]
    metrics: list[ReqMetric] = []
    pending_tasks: list[asyncio.Task] = []
    t_start_ns = time.perf_counter_ns()
    cell_start = time.perf_counter()
    print(f" [cell] rate={rate} shape=({inp_tokens},{out_tokens}) "
          f"floor={duration_floor_s}s min_completed={min_completed}")
    interval_mean = 1.0 / rate
    rng = random.Random(int(time.time_ns()) & 0xFFFFFFFF)
    completed_count = 0

    # `with` guarantees the JSONL file is closed even if the cell aborts.
    with open(out_path, "a", buffering=1, encoding="utf-8") as fh:

        def reap_one(t: asyncio.Task) -> None:
            """Record a finished task's metric in memory and in the JSONL file."""
            nonlocal completed_count
            # A cancelled task has no result; t.result() would raise
            # CancelledError, which `except Exception` does not catch.
            if t.cancelled():
                return
            try:
                m = t.result()
            except Exception:
                # send_one records request failures itself; anything that
                # escapes it is unexpected and is not counted.
                return
            metrics.append(m)
            fh.write(json.dumps(asdict(m)) + "\n")
            completed_count += 1

        async def submit() -> None:
            """Launch requests with exponential gaps until both thresholds are met."""
            while True:
                elapsed = time.perf_counter() - cell_start
                if elapsed >= duration_floor_s and completed_count >= min_completed:
                    return
                pending_tasks.append(asyncio.create_task(
                    send_one(client, url, model, inp_tokens, out_tokens,
                             rate, inflight, inflight_cap)))
                await asyncio.sleep(rng.expovariate(1.0 / interval_mean))

        submitter = asyncio.create_task(submit())

        async def drain_periodic() -> None:
            """Every 0.1 s, reap finished tasks so completed_count stays current."""
            while not submitter.done():
                keep = []
                for t in pending_tasks:
                    if t.done():
                        reap_one(t)
                    else:
                        keep.append(t)
                pending_tasks[:] = keep
                await asyncio.sleep(0.1)

        drainer = asyncio.create_task(drain_periodic())
        try:
            await submitter
        except BaseException:
            # Aborting the cell: don't leave orphaned requests running.
            for t in pending_tasks:
                t.cancel()
            raise
        finally:
            # Stop the periodic reaper before the final drain so no task is
            # reaped twice; also runs if submission failed or was cancelled.
            drainer.cancel()
            try:
                await drainer
            except asyncio.CancelledError:
                pass

        # Final drain: wait for every request still in flight and record it.
        if pending_tasks:
            await asyncio.gather(*pending_tasks, return_exceptions=True)
        for t in pending_tasks:
            if t.done():
                reap_one(t)
        pending_tasks.clear()

    summary = _summarize_cell(
        metrics, rate=rate, inp_tokens=inp_tokens, out_tokens=out_tokens,
        t_start_ns=t_start_ns, warmup_s=warmup_s,
    )
    print(f" completed={summary['n_after_warmup']} ttft_p90={summary['ttft_ms_p90']} "
          f"tpot_p90={summary['tpot_ms_p90']} thr_ratio={summary['throughput_ratio']:.2f}")
    return summary
def _parse_shapes(shape: str, shapes: str) -> list[tuple[int, int]]:
    """Parse ``--shape IP,OP`` (takes precedence) or ``--shapes IPxOP,IPxOP,...``.

    Returns an empty list when neither is set.

    Raises:
        ValueError: if a spec does not split into exactly two integers.
    """
    if shape:
        specs, sep = [shape], ","
    elif shapes:
        specs, sep = shapes.split(","), "x"
    else:
        return []
    parsed = []
    for spec in specs:
        parts = spec.split(sep)
        if len(parts) != 2:
            raise ValueError(f"bad shape spec {spec!r}: expected IP{sep}OP")
        parsed.append((int(parts[0]), int(parts[1])))
    return parsed


def _write_summaries(path: Path, summaries: list[dict]) -> None:
    """Write ``summaries`` as indented JSON, replacing ``path`` atomically.

    The data goes to a sibling ``.tmp`` file first and is then renamed over
    ``path``, so an interrupted write never leaves a truncated summary.
    """
    tmp = path.with_name(path.name + ".tmp")
    with open(tmp, "w") as f:
        json.dump(summaries, f, indent=2)
    tmp.replace(path)


async def main_async(args):
    """Run the full (rate x shape) sweep described by the parsed CLI args.

    Every rate is paired with every shape. Per-request JSONL files are named
    ``requests_<phase>_r<rate>_<IP>x<OP>.jsonl``, and the list of per-cell
    summaries is written to ``summary_<phase>.json`` in ``args.output_dir``.
    The summary file is rewritten after each cell, so finished cells survive
    a crash or Ctrl-C later in the sweep.

    Raises:
        ValueError: if a rate is not a positive finite number or a shape spec
            is malformed. Both are checked before any request is sent.
    """
    rates = [float(x) for x in args.rates.split(",")] if args.rates else [args.rate]
    # Validate every rate up front: a bad one used to crash (rate 0) or flood
    # the server (negative rate) only after earlier cells had already run.
    bad_rates = [r for r in rates if not 0 < r < float("inf")]
    if bad_rates:
        raise ValueError(f"rates must be positive and finite, got {bad_rates}")
    shapes = _parse_shapes(args.shape, args.shapes)

    out_dir = Path(args.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    summary_path = out_dir / f"summary_{args.phase}.json"
    summaries = []
    timeout = httpx.Timeout(600.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        for rate in rates:
            for inp, out in shapes:
                cell_label = f"{args.phase}_r{rate}_{inp}x{out}"
                req_path = out_dir / f"requests_{cell_label}.jsonl"
                # Stretch the duration floor so that, at the target rate,
                # about min_completed requests get issued.
                min_floor_for_rate = max(1, int(args.min_completed / rate))
                floor = max(args.duration, min_floor_for_rate)
                summary = await run_cell(
                    client, args.url, args.model, rate, inp, out,
                    duration_floor_s=floor,
                    min_completed=args.min_completed,
                    warmup_s=args.warmup,
                    inflight_cap=args.inflight_cap,
                    out_path=req_path,
                )
                summary["phase"] = args.phase
                summary["cell"] = cell_label
                summaries.append(summary)
                # Persist incrementally so completed cells are never lost.
                _write_summaries(summary_path, summaries)
                # Cooldown between cells (let queue drain)
                await asyncio.sleep(3.0)
    # Final write also covers a sweep with no cells (writes an empty list).
    _write_summaries(summary_path, summaries)
    print(f"\nWrote {len(summaries)} cell summaries.")
def main():
    """Command-line entry point: parse and sanity-check flags, then run the sweep."""
    parser = argparse.ArgumentParser()
    opt = parser.add_argument
    opt("--url", required=True,
        help="vLLM /v1/chat/completions URL")
    opt("--model", required=True)
    opt("--phase", default="A")
    # Rates: a comma list via --rates (Phase A) or a single --rate r (Phase B).
    opt("--rates", default="")
    opt("--rate", type=float, default=4.0)
    # Shapes: --shape ip,op (Phase A) or --shapes IPxOP,IPxOP,... (Phase B).
    opt("--shape", default="")
    opt("--shapes", default="")
    opt("--duration", type=float, default=60.0,
        help="Cell duration floor (seconds)")
    opt("--min-completed", type=int, default=200)
    opt("--warmup", type=float, default=10.0)
    opt("--inflight-cap", type=int, default=256)
    opt("--output-dir", required=True)
    args = parser.parse_args()

    rate_given = bool(args.rates) or bool(args.rate)
    shape_given = bool(args.shape) or bool(args.shapes)
    if not rate_given:
        parser.error("Provide --rates or --rate")
    if not shape_given:
        parser.error("Provide --shape or --shapes")
    asyncio.run(main_async(args))


if __name__ == "__main__":
    # Only run the sweep when executed as a script, not on import.
    main()