From e23128ad658a965d67a3bba871088491a9fe65ef Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Mon, 25 May 2026 17:54:51 +0800 Subject: [PATCH] B2: PD-colo interference microbench harness + sweep aggregator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scripts/b2_interference.py is the controlled microbench. It runs two coroutines against the open proxy bypass (direct vLLM endpoints): - decode_load: continuous short-prompt requests at fixed QPS into a designated decode instance, to keep it decode-saturated. - prefill_injections: N large one-token requests at fixed interval, pointed at either the same instance (same-worker variant) or a paired one (different-worker control). Each cell (variant × prefill_size) gets its own metrics.jsonl plus a run_window.json containing t_start_unix/t_end_unix. The shared engine_*.jsonl from the scheduler patch is sliced by that window in the aggregator. analysis/characterization/b2_sweep_analysis.py walks the cell tree, slices the per-worker step log by each cell's window, runs the A5 interference_index() against the slice, and emits a single b2_sweep_summary.json with one row per cell. This is what feeds the "interference vs uncached prefill size" figure. Co-Authored-By: Claude Opus 4.7 --- .../characterization/b2_sweep_analysis.py | 117 ++++++++ scripts/b2_interference.py | 278 ++++++++++++++++++ 2 files changed, 395 insertions(+) create mode 100644 analysis/characterization/b2_sweep_analysis.py create mode 100644 scripts/b2_interference.py diff --git a/analysis/characterization/b2_sweep_analysis.py b/analysis/characterization/b2_sweep_analysis.py new file mode 100644 index 0000000..0acc6bb --- /dev/null +++ b/analysis/characterization/b2_sweep_analysis.py @@ -0,0 +1,117 @@ +"""Aggregate B2 microbench cells into a single interference-index sweep summary. + +Per cell (variant × prefill_size): +- read metrics.jsonl + run_window.json +- slice the shared engine_*.jsonl by run window +- run interference_index() against the slice +- record (variant, prefill_size, n_overlap, n_clean, tpot_p90_*, idx) +""" + +from __future__ import annotations + +import argparse +import json +from collections import defaultdict +from pathlib import Path +from typing import Any + +from analysis.characterization.joined_analysis import ( + _percentile, + _vllm_rid_matches, + interference_index, + load_engine_state, + load_jsonl, + write_json, +) + + +def _slice_engine_state( + engine_state_by_worker: dict[str, list[dict]], + t_start: float, + t_end: float, +) -> dict[str, list[dict]]: + sliced: dict[str, list[dict]] = {} + for worker, steps in engine_state_by_worker.items(): + sliced[worker] = [s for s in steps + if t_start <= (s.get("t_unix") or 0.0) <= t_end] + return sliced + + +def _to_joined_shape(metrics_rows: list[dict], variant: str) -> list[dict]: + """Adapt B2 metric rows to what interference_index expects.""" + joined: list[dict] = [] + for r in metrics_rows: + if r.get("workload") != "decode": + continue + joined.append({ + "request_id": r["request_id"], + "tpot_s": r.get("tpot_s"), + "ttft_s": r.get("ttft_s"), + "latency_s": r.get("latency_s"), + "endpoint_url": r.get("endpoint"), + "routed_to": r.get("endpoint"), + "t_first_token_unix": ( + (r["t_dispatch_unix"] + r["ttft_s"]) + if r.get("ttft_s") is not None + and r.get("t_dispatch_unix") is not None else None + ), + "t_finish_unix": r.get("t_finish_unix"), + "error": r.get("error"), + }) + return joined + + +def main() -> None: + p = argparse.ArgumentParser(description="B2 sweep aggregation") + p.add_argument("--sweep-dir", type=Path, required=True, + help="Top-level dir produced by scripts/b2_interference.py") + p.add_argument("--engine-state-dir", type=Path, required=True) + p.add_argument("--worker-map", type=str, required=True, + help="URL=worker_id pairs, comma-separated") + p.add_argument("--out", type=Path, default=None) + args = p.parse_args() + + worker_map = {} + for entry in args.worker_map.split(","): + url, _, wid = entry.strip().partition("=") + if url and wid: + worker_map[url] = wid + + engine_state = load_engine_state(args.engine_state_dir) + rows: list[dict] = [] + for variant_dir in sorted(args.sweep_dir.glob("*/")): + if variant_dir.name in ("logs",): + continue + for cell_dir in sorted(variant_dir.glob("p*/")): + window_path = cell_dir / "run_window.json" + metrics_path = cell_dir / "metrics.jsonl" + if not window_path.exists() or not metrics_path.exists(): + continue + window = json.loads(window_path.read_text()) + metrics_rows = load_jsonl(metrics_path) + joined = _to_joined_shape(metrics_rows, variant_dir.name) + sliced = _slice_engine_state( + engine_state, window["t_start_unix"], window["t_end_unix"], + ) + idx = interference_index(joined, sliced, worker_map) + rows.append({ + "variant": variant_dir.name, + "prefill_size": int(window["prefill_size"]), + "decode_endpoint": window["decode_endpoint"], + "prefill_endpoint": window["prefill_endpoint"], + "n_decode_requests": sum(1 for r in metrics_rows + if r.get("workload") == "decode" + and r.get("error") is None), + "n_prefill_injections": sum(1 for r in metrics_rows + if r.get("workload") == "prefill" + and r.get("error") is None), + **idx, + }) + summary = {"rows": rows} + out_path = args.out or args.sweep_dir / "b2_sweep_summary.json" + write_json(out_path, summary) + print(json.dumps(rows, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/scripts/b2_interference.py b/scripts/b2_interference.py new file mode 100644 index 0000000..b48c8de --- /dev/null +++ b/scripts/b2_interference.py @@ -0,0 +1,278 @@ +"""B2 PD-colo interference microbench. + +Concurrently issues: +- a steady stream of short-prompt decode-heavy requests to a designated + "decode" instance; +- a periodic single large-prompt one-token request that hits either the + same instance (same-worker) or a separate one (different-worker). + +The harness writes per-request metrics with a `workload` tag +({"decode", "prefill"}) and a `variant` tag (same/different) plus +unix timestamps so the analyzer can label same-worker overlap directly +from the engine step JSONL. + +Outputs under ///: +- metrics.jsonl — per-request rows for this cell +- run_window.json — t_start_unix/t_end_unix for analyzer slice +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import logging +import random +import time +from pathlib import Path + +import httpx + +logger = logging.getLogger(__name__) + + +async def _send( + client: httpx.AsyncClient, + endpoint: str, + model: str, + prompt_ids: list[int], + max_tokens: int, + *, + workload: str, + variant: str, + prefill_size: int, + out_fh, + fh_lock: asyncio.Lock, + idx: int, +) -> None: + payload = { + "model": model, + "prompt": prompt_ids, + "max_tokens": max_tokens, + "min_tokens": max_tokens, + "temperature": 0, + "stream": True, + "stream_options": {"include_usage": True}, + } + rid = f"{workload}_{variant}_p{prefill_size}_{idx}" + t_dispatch = time.time() + ttft = None + finish = None + n_output = 0 + err = None + token_times: list[float] = [] + try: + async with client.stream( + "POST", f"{endpoint}/v1/completions", json=payload, + headers={"X-Request-Id": rid, "X-Session-Id": rid}, + timeout=600.0, + ) as resp: + resp.raise_for_status() + async for raw_line in resp.aiter_lines(): + if not raw_line or not raw_line.startswith("data:"): + continue + data = raw_line[5:].strip() + if data == "[DONE]": + break + try: + chunk = json.loads(data) + except json.JSONDecodeError: + continue + choices = chunk.get("choices", []) + if choices: + now = time.time() + token_ids = choices[0].get("token_ids") + if isinstance(token_ids, list) and token_ids: + if ttft is None: + ttft = now - t_dispatch + token_times.extend([now] * len(token_ids)) + usage = chunk.get("usage") + if usage: + n_output = usage.get("completion_tokens", n_output) + finish = time.time() + except Exception as exc: + err = repr(exc)[:300] + finish = time.time() + + tpot = None + if len(token_times) > 1: + diffs = [token_times[i + 1] - token_times[i] + for i in range(len(token_times) - 1)] + tpot = sum(diffs) / len(diffs) + + row = { + "request_id": rid, + "workload": workload, + "variant": variant, + "prefill_size": prefill_size, + "endpoint": endpoint, + "input_length": len(prompt_ids), + "max_tokens": max_tokens, + "t_dispatch_unix": t_dispatch, + "t_finish_unix": finish, + "ttft_s": ttft, + "tpot_s": tpot, + "latency_s": (finish - t_dispatch) if finish else None, + "actual_output_tokens": n_output, + "error": err, + } + async with fh_lock: + out_fh.write(json.dumps(row, sort_keys=True) + "\n") + out_fh.flush() + + +async def decode_load( + *, client, endpoint, model, qps, duration_s, + workload, variant, prefill_size, out_fh, fh_lock, + decode_prompt_tokens, decode_output_tokens, rng, +) -> None: + period = 1.0 / qps + end_t = time.time() + duration_s + pending: list[asyncio.Task] = [] + idx = 0 + while time.time() < end_t: + prompt_ids = [rng.randint(1000, 100000) for _ in range(decode_prompt_tokens)] + task = asyncio.create_task(_send( + client, endpoint, model, prompt_ids, decode_output_tokens, + workload="decode", variant=variant, prefill_size=prefill_size, + out_fh=out_fh, fh_lock=fh_lock, idx=idx, + )) + pending.append(task) + idx += 1 + await asyncio.sleep(period) + await asyncio.gather(*pending, return_exceptions=True) + + +async def prefill_injections( + *, client, endpoint, model, prefill_size, n_injections, interval_s, + variant, out_fh, fh_lock, start_delay_s, rng, +) -> None: + await asyncio.sleep(start_delay_s) + for i in range(n_injections): + prompt_ids = [rng.randint(1000, 100000) for _ in range(prefill_size)] + await _send( + client, endpoint, model, prompt_ids, max_tokens=1, + workload="prefill", variant=variant, prefill_size=prefill_size, + out_fh=out_fh, fh_lock=fh_lock, idx=i, + ) + await asyncio.sleep(interval_s) + + +async def run_cell( + *, + decode_endpoint, prefill_endpoint, model, prefill_size, variant, + qps, duration_s, n_injections, injection_interval_s, start_delay_s, + decode_prompt_tokens, decode_output_tokens, + out_dir, +) -> dict: + cell_dir = out_dir / variant / f"p{prefill_size}" + cell_dir.mkdir(parents=True, exist_ok=True) + metrics_path = cell_dir / "metrics.jsonl" + fh_lock = asyncio.Lock() + rng = random.Random(42 + prefill_size + (0 if variant == "same" else 1000)) + + t_start = time.time() + logger.info("[b2] start variant=%s prefill_size=%d", variant, prefill_size) + with metrics_path.open("w", encoding="utf-8") as out_fh: + limits = httpx.Limits(max_connections=2000, max_keepalive_connections=500) + async with httpx.AsyncClient(timeout=600.0, limits=limits) as client: + await asyncio.gather( + decode_load( + client=client, endpoint=decode_endpoint, model=model, + qps=qps, duration_s=duration_s, + workload="decode", variant=variant, + prefill_size=prefill_size, + out_fh=out_fh, fh_lock=fh_lock, + decode_prompt_tokens=decode_prompt_tokens, + decode_output_tokens=decode_output_tokens, + rng=rng, + ), + prefill_injections( + client=client, endpoint=prefill_endpoint, model=model, + prefill_size=prefill_size, n_injections=n_injections, + interval_s=injection_interval_s, variant=variant, + out_fh=out_fh, fh_lock=fh_lock, + start_delay_s=start_delay_s, rng=rng, + ), + ) + t_end = time.time() + window = { + "variant": variant, + "prefill_size": prefill_size, + "decode_endpoint": decode_endpoint, + "prefill_endpoint": prefill_endpoint, + "qps": qps, "duration_s": duration_s, + "n_injections": n_injections, + "injection_interval_s": injection_interval_s, + "decode_prompt_tokens": decode_prompt_tokens, + "decode_output_tokens": decode_output_tokens, + "t_start_unix": t_start, "t_end_unix": t_end, + } + (cell_dir / "run_window.json").write_text(json.dumps(window, indent=2)) + logger.info("[b2] done variant=%s prefill_size=%d wall=%.1fs", + variant, prefill_size, t_end - t_start) + return window + + +async def amain(args: argparse.Namespace) -> None: + sizes = [int(s) for s in args.prefill_sizes.split(",")] + out_dir = Path(args.out_dir) + out_dir.mkdir(parents=True, exist_ok=True) + overall: list[dict] = [] + for size in sizes: + for variant in args.variants.split(","): + variant = variant.strip() + if variant == "same": + d_ep, p_ep = args.decode_endpoint, args.decode_endpoint + elif variant == "different": + d_ep, p_ep = args.decode_endpoint, args.prefill_endpoint + else: + raise ValueError(f"unknown variant {variant!r}") + w = await run_cell( + decode_endpoint=d_ep, prefill_endpoint=p_ep, + model=args.model, prefill_size=size, variant=variant, + qps=args.decode_qps, duration_s=args.duration_s, + n_injections=args.injections, + injection_interval_s=args.injection_interval_s, + start_delay_s=args.start_delay_s, + decode_prompt_tokens=args.decode_prompt_tokens, + decode_output_tokens=args.decode_output_tokens, + out_dir=out_dir, + ) + overall.append(w) + (out_dir / "sweep_meta.json").write_text(json.dumps(overall, indent=2)) + + +def main() -> None: + p = argparse.ArgumentParser(description="B2 interference microbench") + p.add_argument("--decode-endpoint", required=True, + help="e.g. http://127.0.0.1:8000") + p.add_argument("--prefill-endpoint", required=True, + help="e.g. http://127.0.0.1:8001") + p.add_argument("--model", required=True) + p.add_argument("--out-dir", required=True) + p.add_argument("--prefill-sizes", default="2048,8192,16384,32768,65536") + p.add_argument("--variants", default="different,same", + help="Comma-separated variants in run order") + p.add_argument("--decode-qps", type=float, default=4.0, + help="Decode-load arrival rate (req/s)") + p.add_argument("--duration-s", type=float, default=60.0, + help="Decode-load duration (s) per cell") + p.add_argument("--injections", type=int, default=4, + help="Number of prefill injections per cell") + p.add_argument("--injection-interval-s", type=float, default=12.0) + p.add_argument("--start-delay-s", type=float, default=10.0, + help="Warmup before first prefill injection") + p.add_argument("--decode-prompt-tokens", type=int, default=256) + p.add_argument("--decode-output-tokens", type=int, default=100) + p.add_argument("-v", "--verbose", action="store_true") + args = p.parse_args() + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + asyncio.run(amain(args)) + + +if __name__ == "__main__": + main()