MIGRATION_TRANSFER_COST.md: under real load, migration KV transfer runs at ~3 GB/s vs ~10 GB/s idle. Decomposed (instruments + MB6 microbench) into ~55% RDMA-actual (HBM/PCIe contention with running kernels: 7.6->4.0 GB/s) + ~45% control-plane GIL starvation during long prefills. Reproduced on a fresh upstream venv (byte-identical transfer path) -> upstream/hardware inherent, not our patch. Layerwise is the wrong lever; the tax is structural on a loaded agentic cluster. Includes mb6_transfer_under_load + run_mb6, instrument_dst_migration/mooncake, and the dst/transfer decomposition analyzers. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
262 lines
10 KiB
Python
Executable File
262 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""MB6: KV-transfer bandwidth vs instance busy-ness.
|
|
|
|
Confirms the causal hypothesis from the v3 breakdown: the migration
transfer runs far below wire speed because it happens between instances
that are concurrently busy with compute (GIL-starved control plane +
HBM/NIC contention), NOT because of a wire/NIC limit.

Method (reuses the MB2 transfer primitive):
    prefill on A (do_remote_decode, max_tokens=1) -> migrate to B
    (do_remote_prefill). Time step 2 = the KV transfer.

For each background-load level N in --bg-loads, we hold N concurrent
long-decode streams in total, assigned round-robin across BOTH instances
to keep them busy, then run --repeats measured transfers per size. With
the MB2 mooncake instrument applied (MB2_LOG_DIR set), the analyzer can
split the e2e transfer into RDMA-actual (`send_blocks`) vs control-plane.

Expected: bg=0 reproduces MB2 (~10 GB/s); higher bg degrades toward the
~2-3 GB/s seen in the v3 trace.

Usage:
    python mb6_transfer_under_load.py \
        --src-port 8000 --dst-port 8001 --src-bp 8998 --dst-bp 8999 \
        --sizes 16384,65536 --bg-loads 0,8,24 --repeats 4 \
        --out mb6_result.json
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import statistics
|
|
import time
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
# Value sent as the "model" field of every completion request and recorded
# in the result JSON.
MODEL_PATH = "/home/admin/cpfs/wjh/models/Qwen/Qwen3-Coder-30B-A3B-Instruct"
|
|
# Estimated KV-cache bytes per token; multiplied by the prompt length to
# convert a transfer into bytes and effective bandwidth.
# NOTE(review): 98304 equals 48 layers x 2 (K and V) x 4 KV heads x 128 head
# dim x 2 bytes -- presumably a 16-bit KV cache; confirm against the model
# config before reusing with another model or dtype.
KV_PER_TOK = 98304 # Qwen3-30B-A3B est bytes/token
|
|
|
|
|
|
def synth_prompt(seed: int, n: int) -> list[int]:
    """Build a reproducible synthetic prompt of ``n`` pseudo-random token ids.

    A private ``random.Random`` seeded with ``seed`` draws every id uniformly
    from the inclusive range [100, 150000], so the same ``(seed, n)`` pair
    always yields the same list and the global random state is untouched.

    Args:
        seed: Seed for the private generator.
        n: Number of token ids to produce; a non-positive value yields ``[]``.

    Returns:
        A list of ``n`` integers.
    """
    import random

    generator = random.Random(seed)
    token_ids = []
    for _ in range(n):
        token_ids.append(generator.randint(100, 150000))
    return token_ids
|
|
|
|
|
|
async def get_engine_id(client, host, bp):
    """Fetch the engine id an instance reports on its bootstrap port.

    Issues ``GET http://{host}:{bp}/query`` through ``client``, raises if
    ``raise_for_status()`` rejects the response, and returns the
    ``engine_id`` stored under key ``"0"`` of the JSON body.

    Args:
        client: Async HTTP client exposing an awaitable ``get(url)``.
        host: Host name or address of the instance.
        bp: Bootstrap port serving ``/query``.

    Returns:
        The ``engine_id`` value from the response.
    """
    query_url = f"http://{host}:{bp}/query"
    response = await client.get(query_url)
    response.raise_for_status()
    body = response.json()
    return body["0"]["engine_id"]
|
|
|
|
|
|
async def completion(client, host, port, prompt, max_tokens, ktp=None, stream=False):
    """POST one request to ``/v1/completions`` and time it.

    Args:
        client: Async HTTP client providing ``post`` and ``stream``.
        host: Target host.
        port: Target HTTP port.
        prompt: Prompt forwarded unchanged in the payload.
        max_tokens: Generation budget forwarded in the payload.
        ktp: Optional ``kv_transfer_params`` dict; attached only when truthy.
        stream: When true, request a streamed response and drain it.

    Returns:
        ``(elapsed_seconds, body)``. In non-streaming mode the clock stops as
        soon as the POST returns (before the status check) and ``body`` is the
        parsed JSON. In streaming mode the clock stops once the stream is
        fully consumed and ``body`` is ``{}``.

    Raises:
        Whatever ``raise_for_status()`` raises for a rejected response.
    """
    # Evaluates to 1 for every integer max_tokens; kept in its original form.
    min_tokens = max_tokens if max_tokens == 1 else 1
    payload = {
        "model": MODEL_PATH,
        "prompt": prompt,
        "max_tokens": max_tokens,
        "min_tokens": min_tokens,
        "temperature": 0.0,
        "stream": stream,
    }
    if ktp:
        payload["kv_transfer_params"] = ktp

    started = time.perf_counter()
    url = f"http://{host}:{port}/v1/completions"

    if not stream:
        response = await client.post(url, json=payload, timeout=600.0)
        elapsed = time.perf_counter() - started
        response.raise_for_status()
        return elapsed, response.json()

    # Streaming path: discard every chunk; the only goal is to keep the
    # instance decoding until the stream ends.
    async with client.stream("POST", url, json=payload, timeout=600.0) as response:
        response.raise_for_status()
        async for _chunk in response.aiter_bytes():
            pass
    return time.perf_counter() - started, {}
|
|
|
|
|
|
async def num_running(client, host, port) -> int:
    """Read the running-request gauge from an instance's ``/metrics`` page.

    Looks for the first line that starts with ``vllm:num_requests_running``
    and returns its last whitespace-separated field, converted through
    ``float`` to ``int``.

    Returns:
        The gauge value, or ``-1`` when no such line exists or when anything
        in the request or parsing raises (the probe is best-effort).
    """
    gauge = "vllm:num_requests_running"
    try:
        response = await client.get(f"http://{host}:{port}/metrics", timeout=5.0)
        matching = (ln for ln in response.text.splitlines() if ln.startswith(gauge))
        sample = next(matching, None)
        if sample is not None:
            return int(float(sample.split()[-1]))
    except Exception:
        # Deliberately swallowed: callers treat -1 as "unknown".
        pass
    return -1
|
|
|
|
|
|
class BackgroundLoad:
    """Maintain N concurrent long-decode streams on a set of (host,port).

    ``concurrency`` worker tasks are spawned in total. Worker ``i`` is pinned
    to ``endpoints[i % len(endpoints)]`` and keeps issuing streamed
    completions (fresh synthetic prompt each time) until the load is stopped.

    The object may be started again after ``stop()``; starting it while it is
    already running is rejected so live workers are never orphaned.
    """

    def __init__(self, client, endpoints, concurrency, prompt_tokens=2000,
                 out_tokens=6000):
        """Store the configuration; no task is created until ``start()``.

        Args:
            client: Async HTTP client shared by all workers.
            endpoints: Sequence of ``(host, port)`` pairs to load.
            concurrency: Total number of worker tasks.
            prompt_tokens: Length of each synthetic background prompt.
            out_tokens: ``max_tokens`` requested for each background stream.
        """
        self.client = client
        self.endpoints = endpoints
        self.concurrency = concurrency
        self.prompt_tokens = prompt_tokens
        self.out_tokens = out_tokens
        # Count of background requests that raised; lets the caller tell a
        # healthy load from one whose requests are all being rejected.
        self.failed_requests = 0
        self._stop = asyncio.Event()
        self._tasks: list[asyncio.Task] = []

    async def _worker(self, idx):
        """Issue back-to-back streamed completions on this worker's endpoint."""
        host, port = self.endpoints[idx % len(self.endpoints)]
        # Per-worker seed range so concurrent workers send different prompts.
        seed = 900000 + idx
        while not self._stop.is_set():
            prompt = synth_prompt(seed, self.prompt_tokens)
            seed += 1
            try:
                await completion(self.client, host, port, prompt,
                                 max_tokens=self.out_tokens, stream=True)
            except Exception as exc:
                # Best-effort load: keep going, but do not hide the failure.
                self.failed_requests += 1
                if self.failed_requests == 1:
                    print(f"[mb6] background request to {host}:{port} failed: "
                          f"{exc!r} (further failures are only counted)")
                await asyncio.sleep(0.5)

    def start(self):
        """Spawn the worker tasks on the running event loop.

        Raises:
            RuntimeError: If workers from a previous ``start()`` are still
                registered; call ``stop()`` first.
        """
        if self._tasks:
            raise RuntimeError("BackgroundLoad is already running; call stop() first")
        # A previous stop() leaves the event set; without clearing it the new
        # workers would exit before sending a single request.
        self._stop.clear()
        self._tasks = [asyncio.create_task(self._worker(i))
                       for i in range(self.concurrency)]

    async def stop(self):
        """Signal, cancel and await every worker; safe to call when idle."""
        self._stop.set()
        for task in self._tasks:
            task.cancel()
        # return_exceptions=True absorbs the CancelledError of each worker.
        await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks = []
|
|
|
|
|
|
async def measure_transfer(client, src_host, src_port, dst_host, dst_port,
                           src_eid, src_bootstrap_addr, input_tokens, seed):
    """Run one prefill-then-migrate pair and time the migration request.

    Step 1 sends a ``max_tokens=1`` completion to the source with
    ``do_remote_decode``. Step 2 sends the same prompt to the destination
    with ``do_remote_prefill`` and the same ``transfer_id``; only step 2 is
    timed.

    Args:
        client: Async HTTP client.
        src_host, src_port: Source instance (A).
        dst_host, dst_port: Destination instance (B).
        src_eid: Source engine id, sent as ``remote_engine_id``.
        src_bootstrap_addr: Sent as ``remote_bootstrap_addr``.
        input_tokens: Synthetic prompt length.
        seed: Seed for the synthetic prompt.

    Returns:
        A dict with ``input_tokens``, ``t_transfer_s``, the wall-clock start
        and end of step 2, ``kv_bytes`` (``input_tokens * KV_PER_TOK``) and
        ``eff_gbps`` (bytes / 1e9 / seconds, or 0 when the elapsed time is
        not positive).
    """
    prompt = synth_prompt(seed, input_tokens)
    transfer_id = uuid.uuid4().hex

    # step 1: prefill on A; its result is not used.
    prefill_params = {"do_remote_decode": True, "transfer_id": transfer_id}
    await completion(client, src_host, src_port, prompt, max_tokens=1,
                     ktp=prefill_params)

    # step 2: migrate to B (this is the timed transfer)
    migrate_params = {
        "do_remote_prefill": True,
        "transfer_id": transfer_id,
        "remote_engine_id": src_eid,
        "remote_bootstrap_addr": src_bootstrap_addr,
    }
    wall_start = time.time()
    t_xfer, _ = await completion(client, dst_host, dst_port, prompt,
                                 max_tokens=1, ktp=migrate_params)
    wall_end = time.time()

    kv_bytes = input_tokens * KV_PER_TOK
    if t_xfer > 0:
        eff_gbps = kv_bytes / 1e9 / t_xfer
    else:
        eff_gbps = 0

    return {
        "input_tokens": input_tokens,
        "t_transfer_s": t_xfer,
        "t_step2_start_unix": wall_start,
        "t_step2_end_unix": wall_end,
        "kv_bytes": kv_bytes,
        "eff_gbps": eff_gbps,
    }
|
|
|
|
|
|
async def _running_counts(client, a):
    """Return ``(A_running, B_running)`` from each instance's /metrics gauge."""
    na = await num_running(client, a.src_host, a.src_port)
    nb = await num_running(client, a.dst_host, a.dst_port)
    return na, nb


async def _ramp_background(client, a, bg):
    """Poll once a second (40 tries) until both instances report running work.

    Returns True when both became busy, False after warning about a timeout.
    """
    print(f"[mb6] bg={bg}: ramping background load ...")
    na = nb = -1
    for _ in range(40):
        await asyncio.sleep(1.0)
        na, nb = await _running_counts(client, a)
        if na >= 1 and nb >= 1:
            print(f"[mb6] bg={bg}: busy (A running={na} B running={nb})")
            return True
    # Do not abort the run, but make it visible that the following rows were
    # not measured under the intended load.
    print(f"[mb6] bg={bg}: WARNING instances not both busy after 40s "
          f"(A running={na} B running={nb}); measuring anyway")
    return False


async def _drain(client, a, bg):
    """Poll once a second (60 tries) until neither instance reports running work."""
    print(f"[mb6] bg={bg}: draining ...")
    for _ in range(60):
        await asyncio.sleep(1.0)
        na, nb = await _running_counts(client, a)
        # -1 (gauge unavailable) also counts as drained, as before.
        if na <= 0 and nb <= 0:
            break


def _summarize(results, bg_loads, sizes):
    """Aggregate raw rows into one median/mean record per (bg, size) pair."""
    summary = []
    for bg in bg_loads:
        for sz in sizes:
            rows = [r for r in results
                    if r["bg_load"] == bg and r["input_tokens"] == sz]
            if not rows:
                continue
            eff = [r["eff_gbps"] for r in rows]
            # statistics.median averages the two middle samples for an even
            # count; indexing sorted[len // 2] would report the upper one.
            summary.append({
                "bg": bg, "size": sz, "n": len(rows),
                "xfer_p50_ms": statistics.median(r["t_transfer_s"] for r in rows) * 1000,
                "eff_p50_gbps": statistics.median(eff),
                "eff_mean_gbps": statistics.mean(eff),
            })
    return summary


async def main_async(a):
    """Run the whole benchmark and write the JSON report.

    For every background-load level in ``a.bg_loads`` (comma-separated ints):
    start that many background streams when the level is positive, wait for
    both instances to report running requests, run ``a.repeats`` timed
    transfers for every size in ``a.sizes``, then stop the load and wait for
    the instances to drain. Finally print a per-(bg, size) summary and write
    ``{"model", "kv_bytes_per_token", "label", "raw", "summary"}`` to ``a.out``.

    Args:
        a: Parsed arguments; reads ``sizes``, ``bg_loads``, ``src_host``,
            ``dst_host``, ``src_port``, ``dst_port``, ``src_bp``,
            ``bg_prompt``, ``bg_out``, ``repeats``, ``label`` and ``out``.

    Raises:
        ValueError: If ``a.sizes`` or ``a.bg_loads`` contains a non-integer.
        Any error raised by the HTTP requests; the background load is stopped
        before such an error propagates.
    """
    sizes = [int(s) for s in a.sizes.split(",")]
    bg_loads = [int(s) for s in a.bg_loads.split(",")]
    src_host, dst_host = a.src_host, a.dst_host
    limits = httpx.Limits(max_connections=256, max_keepalive_connections=256)
    results = []
    async with httpx.AsyncClient(limits=limits, trust_env=False) as client:
        src_eid = await get_engine_id(client, src_host, a.src_bp)
        src_bootstrap_addr = f"http://{src_host}:{a.src_bp}"
        print(f"[mb6] src eid={src_eid[:16]}... endpoints A={src_host}:{a.src_port} "
              f"B={dst_host}:{a.dst_port}")

        endpoints = [(src_host, a.src_port), (dst_host, a.dst_port)]
        for bg in bg_loads:
            loader = None
            if bg > 0:
                loader = BackgroundLoad(client, endpoints, bg,
                                        prompt_tokens=a.bg_prompt,
                                        out_tokens=a.bg_out)
                loader.start()
            try:
                if loader is not None:
                    # wait for instances to actually be busy
                    await _ramp_background(client, a, bg)
                else:
                    print("[mb6] bg=0: idle baseline")
                    # ensure idle
                    await asyncio.sleep(2.0)

                for sz in sizes:
                    for rep in range(a.repeats):
                        # Sample the gauges right before the timed transfer.
                        na, nb = await _running_counts(client, a)
                        row = await measure_transfer(
                            client, src_host, a.src_port, dst_host, a.dst_port,
                            src_eid, src_bootstrap_addr, sz,
                            seed=sz * 100 + rep + bg * 7)
                        row["bg_load"] = bg
                        row["A_running_at_measure"] = na
                        row["B_running_at_measure"] = nb
                        results.append(row)
                        kv_mib = sz * KV_PER_TOK / 2**20
                        print(f" bg={bg:>3} size={sz:>6} ({kv_mib:6.0f}MiB) rep={rep} "
                              f"A_run={na:>2} B_run={nb:>2} "
                              f"transfer={row['t_transfer_s']*1000:7.0f}ms "
                              f"eff={row['eff_gbps']:5.2f}GB/s")
            finally:
                # Always stop the load, even when a measurement raised, so no
                # worker keeps hammering the instances.
                if loader is not None:
                    await loader.stop()

            if loader is not None:
                # let the instances drain before next bg level
                await _drain(client, a, bg)

    # summary per (bg, size)
    print("\n=== summary: effective transfer bandwidth vs background load ===")
    print(f"{'bg':>4} {'size':>7} {'n':>3} {'xfer_p50_ms':>12} {'eff_p50_GBps':>13} "
          f"{'eff_mean':>9}")
    summary = _summarize(results, bg_loads, sizes)
    for item in summary:
        print(f"{item['bg']:>4} {item['size']:>7} {item['n']:>3} "
              f"{item['xfer_p50_ms']:>12.0f} "
              f"{item['eff_p50_gbps']:>13.2f} {item['eff_mean_gbps']:>9.2f}")

    Path(a.out).write_text(json.dumps(
        {"model": MODEL_PATH, "kv_bytes_per_token": KV_PER_TOK,
         "label": a.label, "raw": results, "summary": summary}, indent=2))
    print(f"\n[mb6] wrote {a.out}")
|
|
|
|
|
|
def main():
    """Parse the command-line flags and run the benchmark to completion."""
    parser = argparse.ArgumentParser()
    # (flag, add_argument keyword arguments), in the order they are registered.
    options = (
        ("--src-host", {"default": "127.0.0.1"}),
        ("--dst-host", {"default": "127.0.0.1"}),
        ("--src-port", {"type": int, "default": 8000}),
        ("--dst-port", {"type": int, "default": 8001}),
        ("--src-bp", {"type": int, "default": 8998}),
        ("--dst-bp", {"type": int, "default": 8999}),
        ("--sizes", {"default": "16384,65536"}),
        ("--bg-loads", {"default": "0,8,24"}),
        ("--repeats", {"type": int, "default": 4}),
        ("--bg-prompt", {"type": int, "default": 2000}),
        ("--bg-out", {"type": int, "default": 6000}),
        ("--label", {"default": "main-venv"}),
        ("--out", {"default": "mb6_result.json"}),
    )
    for flag, spec in options:
        parser.add_argument(flag, **spec)
    parsed = parser.parse_args()
    asyncio.run(main_async(parsed))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|