Files
agentic-kvc/microbench/fresh_setup/mb6_transfer_under_load.py
Gahow Wang 1262c9c22e Migration transfer-cost study: KV transfer is slow on busy GPUs
MIGRATION_TRANSFER_COST.md: under real load, migration KV transfer runs at
~3 GB/s vs ~10 GB/s idle. Decomposed (instruments + MB6 microbench) into
~55% RDMA-actual (HBM/PCIe contention with running kernels: 7.6->4.0 GB/s)
+ ~45% control-plane GIL starvation during long prefills. Reproduced on a
fresh upstream venv (byte-identical transfer path) -> upstream/hardware
inherent, not our patch. Layerwise is the wrong lever; the tax is structural
on a loaded agentic cluster. Includes mb6_transfer_under_load + run_mb6,
instrument_dst_migration/mooncake, and the dst/transfer decomposition analyzers.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-29 11:53:01 +08:00

262 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
r"""MB6: KV-transfer bandwidth vs instance busy-ness.

Confirms the causal hypothesis from the v3 breakdown: the migration
transfer runs far below wire speed because it happens between instances
that are concurrently busy with compute (GIL-starved control plane +
HBM/NIC contention), NOT because of a wire/NIC limit.

Method (reuses the MB2 transfer primitive):
  1. prefill on A (do_remote_decode, max_tokens=1);
  2. migrate to B (do_remote_prefill, max_tokens=1).
The wall time of step 2 is reported as the KV transfer time.

For each background-load level N in --bg-loads, N concurrent long-decode
streams are spread round-robin across the two instances (worker i goes to A
when i is even, to B when i is odd) to keep them busy; then --repeats
measured transfers are run per prompt size. With the MB2 mooncake
instrument applied (MB2_LOG_DIR set), the analyzer can split the end-to-end
transfer into RDMA-actual (`send_blocks`) vs control-plane time.

Expected: bg=0 reproduces MB2 (~10 GB/s); higher bg degrades toward the
~2-3 GB/s seen in the v3 trace.

Usage (raw docstring so the shell line continuations below survive):
    python mb6_transfer_under_load.py \
        --src-port 8000 --dst-port 8001 --src-bp 8998 --dst-bp 8999 \
        --sizes 16384,65536 --bg-loads 0,8,24 --repeats 4 \
        --out mb6_result.json
"""
from __future__ import annotations
import argparse
import asyncio
import json
import statistics
import time
import uuid
from pathlib import Path
import httpx
# Model name sent as "model" in every /v1/completions request and recorded in
# the output JSON. NOTE(review): presumably this must equal the served model
# name of both vLLM instances -- confirm against the server launch command.
MODEL_PATH = "/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct"
# Estimated KV-cache bytes per prompt token; used to turn measured transfer
# time into bytes moved and effective GB/s. NOTE(review): 98304 equals
# 2 (K,V) x 48 layers x 4 KV heads x 128 head_dim x 2 bytes (bf16), which
# assumes the Qwen3-30B-A3B config and a bf16 KV cache -- confirm against the
# model's config.json and the KV-cache dtype actually in use.
KV_PER_TOK = 98304 # Qwen3-30B-A3B est bytes/token
def synth_prompt(seed: int, n: int) -> list[int]:
    """Build a reproducible synthetic prompt of *n* token ids.

    Ids are drawn uniformly from [100, 150000] by a private
    ``random.Random(seed)``, so equal (seed, n) pairs give equal prompts and
    two prompts built from the same seed share their common-length prefix.
    """
    import random

    draw = random.Random(seed).randint
    tokens: list[int] = []
    for _ in range(n):
        tokens.append(draw(100, 150000))
    return tokens
async def get_engine_id(client, host, bp):
    """Return the engine id advertised by the bootstrap server at host:bp.

    Issues ``GET http://{host}:{bp}/query``, raises on an HTTP error status,
    and reads ``["0"]["engine_id"]`` from the JSON body (presumably the entry
    for engine/rank 0 -- confirm against the bootstrap server).
    """
    resp = await client.get(f"http://{host}:{bp}/query")
    resp.raise_for_status()
    body = resp.json()
    entry = body["0"]
    return entry["engine_id"]
async def completion(client, host, port, prompt, max_tokens, ktp=None, stream=False):
    """POST one greedy /v1/completions request and time it.

    Args:
        client: async HTTP client (httpx.AsyncClient in this script).
        host, port: vLLM OpenAI-compatible server address.
        prompt: prompt token ids.
        max_tokens: generation cap.
        ktp: optional ``kv_transfer_params`` dict; omitted when falsy.
        stream: when True, the streamed body is fully consumed (keeping the
            instance decoding) and an empty dict is returned as the body.

    Returns:
        ``(elapsed_seconds, body)``. For non-streaming calls the timer stops
        when the response arrives, before the status check; for streaming
        calls it stops after the last chunk is read.

    Raises:
        Whatever ``raise_for_status`` raises on an HTTP error status.
    """
    url = f"http://{host}:{port}/v1/completions"
    # NOTE(review): for any int max_tokens this evaluates to 1, so long
    # background decodes may stop early at EOS -- confirm whether
    # min_tokens=max_tokens was intended.
    floor = 1 if max_tokens != 1 else max_tokens
    payload = {
        "model": MODEL_PATH,
        "prompt": prompt,
        "max_tokens": max_tokens,
        "min_tokens": floor,
        "temperature": 0.0,
        "stream": stream,
    }
    if ktp:
        payload["kv_transfer_params"] = ktp

    started = time.perf_counter()
    if not stream:
        resp = await client.post(url, json=payload, timeout=600.0)
        elapsed = time.perf_counter() - started
        resp.raise_for_status()
        return elapsed, resp.json()

    # Streaming: drain every chunk so the request keeps the instance busy.
    async with client.stream("POST", url, json=payload, timeout=600.0) as resp:
        resp.raise_for_status()
        async for _chunk in resp.aiter_bytes():
            pass
        return time.perf_counter() - started, {}
async def num_running(client, host, port) -> int:
    """Return the number of requests currently running on a vLLM server.

    Scrapes ``http://{host}:{port}/metrics`` and sums every sample of the
    ``vllm:num_requests_running`` gauge. There is one sample per label set
    (e.g. one per engine), so summing avoids under-counting servers that
    expose several.

    Returns -1 when the endpoint cannot be reached or parsed, or when the
    gauge is absent.
    """
    metric = "vllm:num_requests_running"
    try:
        r = await client.get(f"http://{host}:{port}/metrics", timeout=5.0)
        total = 0.0
        found = False
        for line in r.text.splitlines():
            # '# HELP' / '# TYPE' comment lines never start with the name.
            if not line.startswith(metric):
                continue
            sample = line[len(metric):]
            if sample.startswith("{"):
                # Skip the label set (assumes no '}' inside label values).
                sample = sample[sample.rindex("}") + 1:]
            elif not sample.startswith(" "):
                continue  # a different metric that merely shares the prefix
            # The value is the first field; an optional timestamp may follow,
            # so taking the last field would be wrong.
            total += float(sample.split()[0])
            found = True
        if found:
            return int(total)
    except Exception:
        # Deliberate best-effort probe: any failure is reported as -1.
        pass
    return -1
class BackgroundLoad:
    """Maintain N concurrent long-decode streams on a set of (host, port).

    Worker ``i`` targets ``endpoints[i % len(endpoints)]``. Until :meth:`stop`
    is called, it repeatedly sends a streaming completion with a synthetic
    ``prompt_tokens``-long prompt and ``max_tokens=out_tokens``.
    """

    def __init__(self, client, endpoints, concurrency, prompt_tokens=2000,
                 out_tokens=6000, seed_base=900000):
        self.client = client
        self.endpoints = endpoints
        self.concurrency = concurrency
        self.prompt_tokens = prompt_tokens
        self.out_tokens = out_tokens
        # First prompt seed; keep it far from the seeds used for measured
        # prompts so background and measured prompts never share a prefix.
        self.seed_base = seed_base
        self._stop = asyncio.Event()
        self._tasks: list[asyncio.Task] = []

    def _seeds(self, idx):
        """Yield worker ``idx``'s prompt seeds, disjoint from every other worker's."""
        seed = self.seed_base + idx
        stride = max(self.concurrency, 1)
        while True:
            yield seed
            # Stride by the worker count so no two workers ever reuse a seed.
            # A reused seed means an identical prompt, which a server with
            # prefix caching could serve from cache, silently shrinking the
            # background prefill load.
            seed += stride

    async def _worker(self, idx):
        """Keep one streaming request in flight on this worker's endpoint."""
        host, port = self.endpoints[idx % len(self.endpoints)]
        seeds = self._seeds(idx)
        while not self._stop.is_set():
            prompt = synth_prompt(next(seeds), self.prompt_tokens)
            try:
                await completion(self.client, host, port, prompt,
                                 max_tokens=self.out_tokens, stream=True)
            except Exception:
                # Best-effort load generator: back off briefly after any
                # request failure and keep the slot occupied.
                await asyncio.sleep(0.5)

    def start(self):
        """Spawn ``concurrency`` worker tasks on the running event loop."""
        # Clear a stop flag left over from a previous stop() so the loader
        # can be restarted.
        self._stop.clear()
        self._tasks = [asyncio.create_task(self._worker(i))
                       for i in range(self.concurrency)]

    async def stop(self):
        """Signal workers to stop, cancel in-flight requests and await them."""
        self._stop.set()
        for t in self._tasks:
            t.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks = []
async def measure_transfer(client, src_host, src_port, dst_host, dst_port,
                           src_eid, src_bootstrap_addr, input_tokens, seed):
    """Prefill a synthetic prompt on A, then time its migration to B.

    Step 1 (untimed) runs a 1-token completion on A with ``do_remote_decode``.
    Step 2 (timed) sends the same prompt to B with ``do_remote_prefill``,
    pointing at A's engine id and bootstrap address under a shared transfer id.

    Returns a row dict. ``t_transfer_s`` is the wall time of the whole step-2
    request (KV pull plus B's 1-token completion and HTTP overhead).
    ``eff_gbps`` is ``input_tokens * KV_PER_TOK`` bytes over that time, in GB/s.
    """
    prompt = synth_prompt(seed, input_tokens)
    transfer_id = uuid.uuid4().hex

    # Step 1: prefill on A, keeping the KV for a remote decoder.
    prefill_params = {"do_remote_decode": True, "transfer_id": transfer_id}
    await completion(client, src_host, src_port, prompt, max_tokens=1,
                     ktp=prefill_params)

    # Step 2: B pulls A's KV and finishes the request (this is the timed part).
    pull_params = {
        "do_remote_prefill": True,
        "transfer_id": transfer_id,
        "remote_engine_id": src_eid,
        "remote_bootstrap_addr": src_bootstrap_addr,
    }
    step2_start = time.time()
    t_xfer, _ = await completion(client, dst_host, dst_port, prompt,
                                 max_tokens=1, ktp=pull_params)
    step2_end = time.time()

    kv_bytes = input_tokens * KV_PER_TOK
    eff = kv_bytes / 1e9 / t_xfer if t_xfer > 0 else 0
    return {
        "input_tokens": input_tokens,
        "t_transfer_s": t_xfer,
        "t_step2_start_unix": step2_start,
        "t_step2_end_unix": step2_end,
        "kv_bytes": kv_bytes,
        "eff_gbps": eff,
    }
async def _poll_running(client, endpoints):
    """Return the running-request count (-1 = unknown) of each (host, port)."""
    return [await num_running(client, host, port) for host, port in endpoints]


async def _wait_until_busy(client, endpoints, bg, timeout_s=40):
    """Poll once per second until every endpoint runs its share of *bg* streams.

    BackgroundLoad sends worker i to ``endpoints[i % len(endpoints)]``, so
    endpoint j should run ``len(range(j, bg, len(endpoints)))`` of them.
    Endpoints expected to run none are not checked.

    Returns ``(reached, counts)``: whether the target was met within
    *timeout_s* polls, and the last observed counts (one per endpoint).
    """
    n_ep = len(endpoints)
    expected = [len(range(j, bg, n_ep)) for j in range(n_ep)]
    counts = [-1] * n_ep
    for _ in range(timeout_s):
        await asyncio.sleep(1.0)
        counts = await _poll_running(client, endpoints)
        if all(c >= e for c, e in zip(counts, expected) if e > 0):
            return True, counts
    return False, counts


async def _wait_until_drained(client, endpoints, timeout_s=60):
    """Poll once per second until no endpoint reports running requests.

    A count of -1 (metrics unavailable) also satisfies the check.
    Returns True if drained within *timeout_s* polls.
    """
    for _ in range(timeout_s):
        await asyncio.sleep(1.0)
        counts = await _poll_running(client, endpoints)
        if all(c <= 0 for c in counts):
            return True
    return False


def _summarize(results, bg_loads, sizes):
    """Print and return per-(bg, size) medians of transfer time and bandwidth."""
    print("\n=== summary: effective transfer bandwidth vs background load ===")
    print(f"{'bg':>4} {'size':>7} {'n':>3} {'xfer_p50_ms':>12} {'eff_p50_GBps':>13} "
          f"{'eff_mean':>9}")
    summary = []
    for bg in bg_loads:
        for sz in sizes:
            rs = [r for r in results if r["bg_load"] == bg and r["input_tokens"] == sz]
            if not rs:
                continue
            # statistics.median averages the two middle samples for even n
            # (the default --repeats is 4) instead of picking the upper one.
            p50x = statistics.median(r["t_transfer_s"] for r in rs)
            p50e = statistics.median(r["eff_gbps"] for r in rs)
            meane = statistics.mean(r["eff_gbps"] for r in rs)
            summary.append({"bg": bg, "size": sz, "n": len(rs),
                            "xfer_p50_ms": p50x * 1000,
                            "eff_p50_gbps": p50e, "eff_mean_gbps": meane})
            print(f"{bg:>4} {sz:>7} {len(rs):>3} {p50x*1000:>12.0f} "
                  f"{p50e:>13.2f} {meane:>9.2f}")
    return summary


async def main_async(a):
    """Run the MB6 sweep configured by the parsed CLI namespace *a*.

    For each background-load level in ``a.bg_loads`` and each prompt size in
    ``a.sizes``, runs ``a.repeats`` timed A->B migrations with
    :func:`measure_transfer`. Then prints a per-(bg, size) summary and writes
    the raw rows plus the summary as JSON to ``a.out``.
    """
    sizes = [int(s) for s in a.sizes.split(",")]
    bg_loads = [int(s) for s in a.bg_loads.split(",")]
    src_host, dst_host = a.src_host, a.dst_host
    limits = httpx.Limits(max_connections=256, max_keepalive_connections=256)
    async with httpx.AsyncClient(limits=limits, trust_env=False) as client:
        src_eid = await get_engine_id(client, src_host, a.src_bp)
        src_bootstrap_addr = f"http://{src_host}:{a.src_bp}"
        print(f"[mb6] src eid={src_eid[:16]}... endpoints A={src_host}:{a.src_port} "
              f"B={dst_host}:{a.dst_port}")
        endpoints = [(src_host, a.src_port), (dst_host, a.dst_port)]
        results = []
        for bg in bg_loads:
            loader = None
            if bg > 0:
                loader = BackgroundLoad(client, endpoints, bg,
                                        prompt_tokens=a.bg_prompt,
                                        out_tokens=a.bg_out)
                loader.start()
            try:
                if loader is not None:
                    # wait for instances to actually be busy
                    print(f"[mb6] bg={bg}: ramping background load ...")
                    reached, (na, nb) = await _wait_until_busy(client, endpoints, bg)
                    if reached:
                        print(f"[mb6] bg={bg}: busy (A running={na} B running={nb})")
                    else:
                        print(f"[mb6] bg={bg}: WARNING background load did not fully "
                              f"ramp (A running={na} B running={nb}); measuring anyway")
                else:
                    print(f"[mb6] bg={bg}: idle baseline")
                    # ensure idle
                    await asyncio.sleep(2.0)
                for sz in sizes:
                    for rep in range(a.repeats):
                        na = await num_running(client, src_host, a.src_port)
                        nb = await num_running(client, dst_host, a.dst_port)
                        # A fresh seed per measurement. It differs from every
                        # other measured prompt and stays far below the
                        # background seeds (900000+), so no measured prompt
                        # shares a prefix with one the servers may already
                        # have cached, which would shrink the KV actually moved.
                        seed = len(results)
                        row = await measure_transfer(
                            client, src_host, a.src_port, dst_host, a.dst_port,
                            src_eid, src_bootstrap_addr, sz, seed=seed)
                        row["bg_load"] = bg
                        row["A_running_at_measure"] = na
                        row["B_running_at_measure"] = nb
                        results.append(row)
                        kv_mib = sz * KV_PER_TOK / 2**20
                        print(f"  bg={bg:>3} size={sz:>6} ({kv_mib:6.0f}MiB) rep={rep} "
                              f"A_run={na:>2} B_run={nb:>2} "
                              f"transfer={row['t_transfer_s']*1000:7.0f}ms "
                              f"eff={row['eff_gbps']:5.2f}GB/s")
            finally:
                # Always tear the background streams down, even on failure.
                if loader is not None:
                    await loader.stop()
            if loader is not None:
                # let the instances drain before next bg level
                print(f"[mb6] bg={bg}: draining ...")
                if not await _wait_until_drained(client, endpoints):
                    print(f"[mb6] bg={bg}: WARNING instances still busy after drain timeout")
    summary = _summarize(results, bg_loads, sizes)
    Path(a.out).write_text(json.dumps(
        {"model": MODEL_PATH, "kv_bytes_per_token": KV_PER_TOK,
         "label": a.label, "raw": results, "summary": summary}, indent=2))
    print(f"\n[mb6] wrote {a.out}")
def main():
    """Parse the command line and run the MB6 sweep."""
    p = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("--src-host", default="127.0.0.1",
                   help="host of instance A (KV source; prefills with do_remote_decode)")
    p.add_argument("--dst-host", default="127.0.0.1",
                   help="host of instance B (KV destination; pulls with do_remote_prefill)")
    p.add_argument("--src-port", type=int, default=8000,
                   help="vLLM HTTP port of instance A")
    p.add_argument("--dst-port", type=int, default=8001,
                   help="vLLM HTTP port of instance B")
    p.add_argument("--src-bp", type=int, default=8998,
                   help="bootstrap-server port of A (queried for its engine id)")
    p.add_argument("--dst-bp", type=int, default=8999,
                   help="bootstrap-server port of B (accepted for symmetry; unused here)")
    p.add_argument("--sizes", default="16384,65536",
                   help="comma-separated prompt lengths, in tokens, to migrate")
    p.add_argument("--bg-loads", default="0,8,24",
                   help="comma-separated counts of concurrent background streams "
                        "(0 = idle baseline)")
    p.add_argument("--repeats", type=int, default=4,
                   help="measured transfers per (background load, size)")
    p.add_argument("--bg-prompt", type=int, default=2000,
                   help="prompt tokens per background stream")
    p.add_argument("--bg-out", type=int, default=6000,
                   help="max output tokens per background stream")
    p.add_argument("--label", default="main-venv",
                   help="free-form label stored in the output JSON")
    p.add_argument("--out", default="mb6_result.json",
                   help="path of the output JSON file")
    args = p.parse_args()
    asyncio.run(main_async(args))


if __name__ == "__main__":
    main()