MIGRATION_TRANSFER_COST.md: under real load, migration KV transfer runs at ~3 GB/s vs ~10 GB/s when idle. Decomposed (via instrumentation + the MB6 microbench) into ~55% RDMA-actual (HBM/PCIe contention with running kernels: 7.6 -> 4.0 GB/s) + ~45% control-plane GIL starvation during long prefills. Reproduced on a fresh upstream venv (byte-identical transfer path), so the cost is inherent to upstream/hardware, not our patch. Layerwise transfer is the wrong lever; the tax is structural on a loaded agentic cluster. Includes mb6_transfer_under_load + run_mb6, instrument_dst_migration/mooncake, and the dst/transfer decomposition analyzers. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
238 lines
9.1 KiB
Python
Executable File
238 lines
9.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Decompose migration KV-transfer time into RDMA-actual vs control-plane.
|
|
|
|
Joins three logs from an instrumented unified_v3 run:
|
|
|
|
proxy breakdown.json — per-request route + phase timestamps
|
|
dst_mig_log/dm_mig_pid*.jsonl — dst lifecycle (instrument_dst_migration.py)
|
|
gives T_kv_pull = wait_for_kvs -> recv_done
|
|
mooncake xfer/mb2_transfer_pid*.jsonl — connector internals
|
|
(instrument_mooncake.py):
|
|
send_blocks : pure RDMA (total_bytes, duration_s) [producer]
|
|
receive_kv_enter/finish: consumer-observed transfer window [consumer]
|
|
ready_wait : producer wait for src KV commit [producer]
|
|
send_kv_to_decode_enter: producer received the pull request [producer]
|
|
|
|
Decisive question: of the 87% dst-side overhead that is T_kv_pull, how
|
|
much is the actual RDMA write (`send_blocks`) vs control-plane
|
|
(handshake / ready-wait / GIL starvation on the busy src)?
|
|
|
|
  - send_blocks bandwidth ~ wire (10 GB/s) AND send_blocks time << T_kv_pull
|
|
=> loss is control-plane; layerwise (which only moves WHEN the
|
|
RDMA fires) will NOT fix it.
|
|
- send_blocks bandwidth << wire
|
|
=> the RDMA write itself is slow (NIC / src-side servicing);
|
|
characterize with a load microbench next.
|
|
|
|
Usage:
|
|
python analyze_transfer_decomp.py \
|
|
--proxy-breakdown <RUN>/unified_v3/breakdown.json \
|
|
--dst-log-dir <RUN>/dst_mig_log \
|
|
--xfer-log-dir <RUN>/xfer_log
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import re
|
|
import statistics
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
|
|
def _core_req_id(rid: str) -> str:
    """Normalize a request id to the "core" id used to join the three logs.

    Removes a leading ``cmpl-`` prefix, then removes a trailing
    ``-<digits>-<hex>`` suffix when present (the ``.*`` is greedy, so only the
    last such pair of segments is dropped). Falsy input (``""`` / ``None``)
    is returned unchanged.

    >>> _core_req_id("cmpl-abc-0-deadbeef")
    'abc'
    """
    if not rid:
        return rid
    prefix = "cmpl-"
    core = rid[len(prefix):] if rid.startswith(prefix) else rid
    suffix_match = re.match(r"^(.*)-\d+-[0-9a-fA-F]+$", core)
    return suffix_match.group(1) if suffix_match else core
|
|
|
|
|
|
def _pct(vals, q):
    """Nearest-rank percentile of *vals* at fraction *q*.

    Picks the ceil(q * n)-th smallest value, with the rank clamped into
    [1, n] so q <= 0 yields the minimum and q >= 1 the maximum.
    Returns NaN for an empty input.
    """
    if not vals:
        return float("nan")
    ordered = sorted(vals)
    last = len(ordered) - 1
    rank = int(math.ceil(q * len(ordered))) - 1
    return ordered[min(last, max(0, rank))]
|
|
|
|
|
|
def _stat_line(name, vals, unit="s"):
    """Print one summary row for *vals*: n, mean, p50, p90, max and sum.

    *name* is left-aligned in a 34-column field; *unit* is appended verbatim
    at the end of the row. An empty *vals* prints only ``n=0``.
    """
    label = f"{name:<34}"
    if not vals:
        print(f"{label} n=0")
        return
    columns = (
        label,
        f"n={len(vals):>3}",
        f"mean={statistics.mean(vals):>8.3f}",
        f"p50={_pct(vals, 0.5):>8.3f}",
        f"p90={_pct(vals, 0.9):>8.3f}",
        f"max={max(vals):>8.3f}",
        f"sum={sum(vals):>8.2f}",
        f"{unit}",
    )
    print(" ".join(columns))
|
|
|
|
|
|
def load_events(xfer_dir: Path):
    """Load connector-internal events from ``mb2_transfer_pid*.jsonl`` logs.

    Every JSON-object line in each file under *xfer_dir* is tagged with
    ``_pid`` (the text after ``mb2_transfer_pid`` in the file stem) and
    bucketed by its ``event`` field. Events with any other name are ignored.
    Lines that are not JSON objects (e.g. a truncated final line written by a
    killed process) are skipped and the count is reported.

    Args:
        xfer_dir: directory containing the ``mb2_transfer_pid*.jsonl`` files.

    Returns:
        Tuple of lists ``(send_blocks, recv_enter, recv_finish, ready_wait,
        send_enter)`` holding the event dicts in file order.
    """
    files = sorted(xfer_dir.glob("mb2_transfer_pid*.jsonl"))
    print(f"[xfer] log files: {len(files)} under {xfer_dir}")
    send_blocks, recv_enter, recv_finish, ready_wait, send_enter = [], [], [], [], []
    buckets = {
        "send_blocks": send_blocks,
        "receive_kv_enter": recv_enter,
        "receive_kv_finish": recv_finish,
        "ready_wait": ready_wait,
        "send_kv_to_decode_enter": send_enter,
    }
    skipped = 0
    for f in files:
        pid = f.stem.replace("mb2_transfer_pid", "")
        # errors="replace": a torn multi-byte character on the last line must
        # not abort the whole analysis with UnicodeDecodeError; the damaged
        # line then fails json.loads and is skipped like any other bad line.
        with f.open(encoding="utf-8", errors="replace") as fh:
            for line in fh:
                if not line.strip():
                    continue
                try:
                    e = json.loads(line)
                except ValueError:  # json.JSONDecodeError subclasses ValueError
                    skipped += 1
                    continue
                if not isinstance(e, dict):
                    # A bare JSON list/number would crash on e["_pid"] below.
                    skipped += 1
                    continue
                e["_pid"] = pid
                ev = e.get("event")
                # Only strings can name an event; also keeps unhashable
                # values (lists/dicts) out of the dict lookup.
                bucket = buckets.get(ev) if isinstance(ev, str) else None
                if bucket is not None:
                    bucket.append(e)
    print(f"[xfer] events: send_blocks={len(send_blocks)} "
          f"recv_enter={len(recv_enter)} recv_finish={len(recv_finish)} "
          f"ready_wait={len(ready_wait)} send_enter={len(send_enter)}")
    if skipped:
        print(f"[xfer] skipped {skipped} malformed line(s)")
    return send_blocks, recv_enter, recv_finish, ready_wait, send_enter
|
|
|
|
|
|
def _banner(title: str) -> None:
    """Print *title* framed by 90-column '=' rules, preceded by a blank line."""
    print("\n" + "=" * 90)
    print(title)
    print("=" * 90)


def _fmt_opt(v) -> str:
    """Format an optional seconds value for the detail table ('--' if missing)."""
    return f"{v:.2f}" if v is not None else "   --"


def _lookup(table: dict, rid):
    """Find proxy request id *rid* in a table keyed by core request id.

    Tries the raw id first, then its ``_core_req_id`` form, so the join works
    whether or not the proxy id still carries the ``cmpl-`` / ``-<n>-<hex>``
    decoration. A missing/empty id never matches (returns None).
    """
    if not rid:
        return None
    if rid in table:
        return table[rid]
    return table.get(_core_req_id(rid))


def _load_dst_pull(dst_dir: Path) -> dict:
    """Map core request id -> T_kv_pull (s) from ``dm_mig_pid*.jsonl`` logs.

    T_kv_pull = t_kv_recv_done_unix - t_wait_for_kvs_unix. Lines that are not
    JSON objects, lack either timestamp, or lack a request id are skipped.
    When an id repeats, the last record wins.
    """
    dst_pull = {}
    for f in sorted(dst_dir.glob("dm_mig_pid*.jsonl")):
        with f.open(encoding="utf-8", errors="replace") as fh:
            for line in fh:
                try:
                    r = json.loads(line)
                except ValueError:  # malformed or truncated line
                    continue
                if not isinstance(r, dict):
                    continue
                tw = r.get("t_wait_for_kvs_unix")
                td = r.get("t_kv_recv_done_unix")
                core = _core_req_id(r.get("req_id"))
                if tw and td and core:
                    dst_pull[core] = td - tw
    return dst_pull


def _report_rdma(send_blocks) -> list:
    """Section 1: print the pure RDMA write rate from ``send_blocks`` events.

    Only calls with positive byte count and positive duration are used.
    Returns the per-call durations (s) of those calls.
    """
    _banner("1. PURE RDMA WRITE rate (`send_blocks` = batch_transfer_sync_write)")
    bws, durs, bytes_l = [], [], []
    for e in send_blocks:
        # `or 0` also covers an explicit null in the log, which would
        # otherwise raise TypeError in the comparisons below.
        b = e.get("total_bytes") or 0
        d = e.get("duration_s") or 0
        if d > 0 and b > 0:
            bws.append(b / 1e9 / d)
            durs.append(d)
            bytes_l.append(b)
    if not bws:
        print("  no send_blocks events with positive duration")
        return durs
    tot_b = sum(bytes_l)
    tot_d = sum(durs)  # > 0: every kept duration is positive
    print(f"  send_blocks calls: {len(bws)}")
    print(f"  total bytes moved : {tot_b/2**30:.2f} GiB")
    print(f"  total RDMA time   : {tot_d:.2f} s")
    print(f"  AGGREGATE rate    : {tot_b/1e9/tot_d:.2f} GB/s "
          f"(MB2 idle-src steady-state = ~9.7-10 GB/s)")
    _stat_line("  per-call rate (GB/s)", bws, unit="GB/s")
    _stat_line("  per-call duration", durs)
    # bandwidth vs size — small ops are latency-bound
    print("\n  rate vs transfer size:")
    for b, w in sorted(zip(bytes_l, bws)):
        bar = "#" * int(min(40, w * 4))
        print(f"    {b/2**20:>8.1f} MiB  {w:>6.2f} GB/s  {bar}")
    return durs


def _report_ready_wait(ready_wait) -> None:
    """Section 2: print the producer-side wait for the src KV commit."""
    _banner("2. PRODUCER ready-wait (src KV not yet committed when pull arrived)")
    rw_vals = [e["ready_wait_s"] for e in ready_wait
               if e.get("ready_wait_s") is not None]
    already = sum(1 for e in ready_wait if e.get("ready_already_set"))
    _stat_line("  ready_wait", rw_vals)
    print(f"  ready_already_set at entry: {already}/{len(ready_wait)} "
          f"(if most are True, src commit is not the bottleneck)")


def _report_receive_kv(recv_finish) -> None:
    """Section 3: print the consumer-observed receive_kv window durations."""
    _banner("3. CONSUMER receive_kv window (enter->FINISH, ~most of T_kv_pull)")
    rf_vals = [e["duration_s"] for e in recv_finish if e.get("duration_s")]
    _stat_line("  receive_kv duration", rf_vals)


def _report_join(migrations, dst_pull, recv_finish, ready_wait, sum_rdma) -> None:
    """Section 4: join each migration with dst T_kv_pull and connector events.

    A migration is kept when either its dst T_kv_pull or its receive_kv
    duration is known. *sum_rdma* is the send_blocks time over ALL transfers
    (not only the joined migrations), so the printed RDMA share may overstate
    the migration-only share.
    """
    _banner("4. PER-MIGRATION join (T_kv_pull from dst vs connector internals)")
    # index connector events by core req id; falsy ids are never joinable
    rf_by_req = {}
    for e in recv_finish:
        for rid in e.get("req_ids") or []:
            core = _core_req_id(rid)
            if core:
                rf_by_req[core] = e.get("duration_s")
    rw_by_req = {}
    for e in ready_wait:
        core = _core_req_id(e.get("d_req_id", ""))
        if core:
            rw_by_req[core] = e.get("ready_wait_s")

    sum_pull = sum_recv = sum_rw = 0.0
    rows = []
    for m in migrations:
        rid = m.get("request_id")
        pull = _lookup(dst_pull, rid)
        recv = _lookup(rf_by_req, rid)
        rwv = _lookup(rw_by_req, rid)
        if pull is None and recv is None:
            continue
        sum_pull += pull or 0.0
        sum_recv += recv or 0.0
        sum_rw += rwv or 0.0
        rows.append((rid, m.get("input_length"), m.get("v3_target_cache_hit"),
                     pull, recv, rwv))
    print(f"  joined migrations: {len(rows)}")
    print(f"  Σ T_kv_pull (dst)        = {sum_pull:8.2f} s")
    print(f"  Σ receive_kv (consumer)  = {sum_recv:8.2f} s")
    print(f"  Σ ready_wait (producer)  = {sum_rw:8.2f} s")
    print(f"  Σ send_blocks RDMA       = {sum_rdma:8.2f} s  (all transfers, "
          f"not just migrations)")
    if sum_pull > 0:
        print(f"\n  RDMA-actual / T_kv_pull ≈ {sum_rdma/sum_pull*100:5.1f} %")
        print(f"  ready-wait  / T_kv_pull ≈ {sum_rw/sum_pull*100:5.1f} %")
        resid = sum_pull - sum_rdma - sum_rw
        print(f"  control-plane residual  ≈ {resid/sum_pull*100:5.1f} %  "
              f"(handshake / ZMQ / GIL starvation)")

    print("\n  per-migration detail:")
    print(f"  {'req_id':<22} {'in_len':>7} {'dst_hit':>8} {'kv_pull':>8} "
          f"{'recv_kv':>8} {'rdy_wait':>8}")
    # slowest T_kv_pull first; rows without a dst pull sort last
    for rid, il, hit, pull, recv, rwv in sorted(rows, key=lambda r: -(r[3] or 0)):
        print(f"  {str(rid):<22} {str(il):>7} {str(hit):>8} {_fmt_opt(pull):>8} "
              f"{_fmt_opt(recv):>8} {_fmt_opt(rwv):>8}")


def main():
    """CLI entry point: load the three logs and print the four-section report.

    Exits with ``missing: <path>`` if any input path does not exist.
    """
    p = argparse.ArgumentParser(
        description="Decompose migration KV-transfer time into RDMA-actual "
                    "vs control-plane.")
    p.add_argument("--proxy-breakdown", type=Path, required=True,
                   help="proxy breakdown.json (per-request route + phase timestamps)")
    p.add_argument("--dst-log-dir", type=Path, required=True,
                   help="directory with dst lifecycle logs dm_mig_pid*.jsonl")
    p.add_argument("--xfer-log-dir", type=Path, required=True,
                   help="directory with connector logs mb2_transfer_pid*.jsonl")
    args = p.parse_args()

    for pth in (args.proxy_breakdown, args.dst_log_dir, args.xfer_log_dir):
        if not pth.exists():
            sys.exit(f"missing: {pth}")

    # `with` closes the handle (json.load(path.open()) leaked it).
    with args.proxy_breakdown.open(encoding="utf-8") as fh:
        proxy = json.load(fh)
    migrations = [x for x in proxy if x.get("route_class") == "PD_SEP_V2"]
    print(f"[proxy] migrations: {len(migrations)} / {len(proxy)} total")

    # dst lifecycle: T_kv_pull per migration (core req id)
    dst_pull = _load_dst_pull(args.dst_log_dir)
    sb, _recv_enter, re_finish, rw, _send_enter = load_events(args.xfer_log_dir)

    durs = _report_rdma(sb)
    _report_ready_wait(rw)
    _report_receive_kv(re_finish)
    _report_join(migrations, dst_pull, re_finish, rw, float(sum(durs)))


if __name__ == "__main__":
    main()
|