MB2: per-stage instrumentation patch + launcher integration

Per-stage breakdown of "step 2" (the B-side do_remote_prefill) requires
vLLM/mooncake-internal timing — we cannot infer it from black-box HTTP
E2E. This commit adds the three pieces needed for that breakdown:

instrument_mooncake.py
  apply / revert / check patches on mooncake_connector_v1.py to emit
  structured JSONL transfer events at two key sites:

    send_blocks (P-side, on batch_transfer_sync_write):
      {event, remote_session, total_bytes, duration_s, t_start_unix,
       ret, tp_rank, t_log_unix}
    receive_kv (D-side, on the ZMQ-driven pull request):
      {event, path, local_req_ids, remote_req_ids, duration_s,
       t_start_unix, tp_rank, t_log_unix}

  All injected code is bracketed by `# MB2_INSTRUMENT_START/END` so the
  --revert pass is a single regex scan. Apply-revert round-trip
  validated on dash1 (PATCHED → py_compile ok → revert → CLEAN → ok).
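
The marker-bracketed revert can be illustrated on a toy source string. This is a minimal sketch, not the script itself: `strip_instrumented` and the sample text are hypothetical, and only the marker names and the regex shape are taken from instrument_mooncake.py.

```python
import re

START_MARK = "# MB2_INSTRUMENT_START"
END_MARK = "# MB2_INSTRUMENT_END"

# Same shape as the revert regex: leading indent, START marker, anything
# (non-greedy, spanning lines), END marker, and one trailing newline.
_BLOCK = re.compile(
    r"[ \t]*" + re.escape(START_MARK) + r".*?" + re.escape(END_MARK) + r"\n?",
    flags=re.DOTALL,
)


def strip_instrumented(text: str) -> str:
    """Remove every marker-bracketed block (hypothetical helper)."""
    return _BLOCK.sub("", text)


# Toy "upstream" source and its instrumented counterpart.
CLEAN = (
    "def send(x):\n"
    "    start = 0\n"
    "    return x\n"
)
PATCHED = (
    "def send(x):\n"
    "    # MB2_INSTRUMENT_START\n"
    "    t0 = 1\n"
    "    # MB2_INSTRUMENT_END\n"
    "    start = 0\n"
    "    return x\n"
)

restored = strip_instrumented(PATCHED)
compile(restored, "<restored>", "exec")  # same idea as the py_compile check
print(restored == CLEAN)
```

Because each injected block carries its own indentation and trailing newline inside the match, removing it leaves the surrounding lines unchanged.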

start_vllm_pair.sh (updated)
  - Picks up instrument_mooncake.py via SCRIPT_DIR.
  - On `start`: applies patch before launching the two vLLM instances.
  - On `stop` (or trap exit): reverts patch.
  - Sets per-instance MB2_LOG_DIR = $FRESH_ROOT/mb2_transfer_logs/{A,B}/
    so send-side and receive-side events land in cleanly separated dirs.

deploy.sh
  tar-over-ssh sync of microbench/fresh_setup/ → cpfs
  /home/admin/cpfs/wjh/agentic-kv-fresh/scripts/ so dash1 / dash2 see
  the same scripts (dash{1,2} don't have rsync; tar pipe works).

The mb2_kv_transfer.py client still uses black-box E2E timing — the
next commit will teach it to ingest the per-instance JSONL logs to
produce the 4-way breakdown (queueing / setup / transfer / decode).
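
As a rough idea of what ingesting the per-instance JSONL logs could look like, here is a minimal sketch. It is not the follow-up commit: `load_events`, `summarize_events`, and the synthetic sample values are hypothetical; only the file naming pattern and the field names (`event`, `duration_s`, `total_bytes`) come from the event schema above.

```python
import json
import tempfile
from collections import defaultdict
from pathlib import Path


def load_events(log_dir: Path) -> list[dict]:
    """Read every mb2_transfer_pid*.jsonl file under log_dir."""
    events = []
    for path in sorted(log_dir.glob("mb2_transfer_pid*.jsonl")):
        with path.open() as fh:
            for line in fh:
                line = line.strip()
                if line:  # tolerate blank lines
                    events.append(json.loads(line))
    return events


def summarize_events(events: list[dict]) -> dict[str, dict]:
    """Per event type: count, summed duration, summed bytes (0 if absent)."""
    out: dict[str, dict] = defaultdict(
        lambda: {"count": 0, "duration_s": 0.0, "total_bytes": 0}
    )
    for ev in events:
        row = out[ev["event"]]
        row["count"] += 1
        row["duration_s"] += ev["duration_s"]
        row["total_bytes"] += ev.get("total_bytes", 0)
    return dict(out)


# Demo with synthetic events; the numbers are made up for illustration.
with tempfile.TemporaryDirectory() as tmp:
    log_dir = Path(tmp)
    sample = [
        {"event": "send_blocks", "total_bytes": 1024, "duration_s": 0.5},
        {"event": "send_blocks", "total_bytes": 3072, "duration_s": 0.5},
        {"event": "receive_kv", "duration_s": 2.0},
    ]
    (log_dir / "mb2_transfer_pid1.jsonl").write_text(
        "".join(json.dumps(ev) + "\n" for ev in sample)
    )
    summary = summarize_events(load_events(log_dir))

print(summary["send_blocks"])
```

Running `load_events` once per instance directory (A and B) keeps send-side and receive-side events separate before they are joined with the client-side timestamps.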

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-27 18:12:44 +08:00
parent 7437422618
commit efdcf3c555
3 changed files with 259 additions and 3 deletions

deploy.sh

@@ -0,0 +1,22 @@
#!/usr/bin/env bash
# Sync microbench/fresh_setup/ to /home/admin/cpfs/wjh/agentic-kv-fresh/scripts/
# so dash1 / dash2 see the same scripts. cpfs is mounted at the same path on
# both, so one tar-over-ssh sync to either host is enough.
#
# Can be run from any directory: the source path is resolved relative to this
# script (SCRIPT_DIR), not the current working directory.
set -eo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SRC="${SCRIPT_DIR}/"
DEST_HOST="${1:-dash1}"
DEST="/home/admin/cpfs/wjh/agentic-kv-fresh/scripts/"
echo "[deploy] syncing ${SRC} -> ${DEST_HOST}:${DEST}"
ssh "${DEST_HOST}" "mkdir -p ${DEST}"
# dash1/2 don't have rsync; use tar over ssh.
tar -C "${SCRIPT_DIR}" --exclude='__pycache__' --exclude='*.pyc' \
    --exclude='deploy.sh' -czf - . \
    | ssh "${DEST_HOST}" "cd ${DEST} && tar -xzf -"
echo "[deploy] done"
ssh "${DEST_HOST}" "ls -la ${DEST}"

instrument_mooncake.py

@@ -0,0 +1,217 @@
#!/usr/bin/env python3
"""Instrument mooncake_connector_v1.py with per-stage timing for MB2.

Adds structured JSONL logging at two key call sites so we can recover
the full breakdown (queueing / prefill / kv transfer / decode) when
combined with the client-side TTFT + finish timestamps:

  send_blocks (P-side, the actual `batch_transfer_sync_write`):
    {event=send_blocks, remote_session, total_bytes,
     duration_s, t_start_unix, ret, tp_rank}
  receive_kv (D-side, the ZMQ-driven pull request):
    {event=receive_kv, path, local_req_ids, remote_req_ids,
     duration_s, t_start_unix, tp_rank}

Every event also gets a ``t_log_unix`` timestamp when it is written.

Logs go to ``${MB2_LOG_DIR}/mb2_transfer_pid<pid>.jsonl`` (default
``MB2_LOG_DIR=/tmp``). One file per vLLM worker PID, so concurrent senders
and receivers on the same machine never interleave writes in a shared file.

Apply / revert pattern follows microbench/connector_tax/cache_sweep/
apply_direct_read_fix.py. All inserted code is bracketed with
``# MB2_INSTRUMENT_START`` / ``# MB2_INSTRUMENT_END`` so revert is a
single text scan.

Usage:
    python instrument_mooncake.py --apply [--venv PATH]
    python instrument_mooncake.py --revert [--venv PATH]
    python instrument_mooncake.py --check [--venv PATH]
"""
from __future__ import annotations
import argparse
import re
from pathlib import Path
DEFAULT_VENV = Path("/home/admin/cpfs/wjh/agentic-kv-fresh/.venv")
TARGET_REL = "lib/python3.12/site-packages/mooncake/mooncake_connector_v1.py"
START_MARK = "# MB2_INSTRUMENT_START"
END_MARK = "# MB2_INSTRUMENT_END"
HEADER_INSERT = f"""
{START_MARK}
import json as _mb2_json
import os as _mb2_os
import threading as _mb2_threading
import time as _mb2_time

_MB2_LOG_DIR = _mb2_os.environ.get("MB2_LOG_DIR", "/tmp")
_MB2_LOG_PATH = _mb2_os.path.join(_MB2_LOG_DIR, f"mb2_transfer_pid{{_mb2_os.getpid()}}.jsonl")
_MB2_LOG_FILE = None
_MB2_LOG_LOCK = _mb2_threading.Lock()

def _mb2_log_event(d):
    global _MB2_LOG_FILE
    d.setdefault("t_log_unix", _mb2_time.time())
    with _MB2_LOG_LOCK:
        # Open lazily under the lock so concurrent first calls share one handle.
        if _MB2_LOG_FILE is None:
            _MB2_LOG_FILE = open(_MB2_LOG_PATH, "a", buffering=1)
        _MB2_LOG_FILE.write(_mb2_json.dumps(d) + "\\n")
{END_MARK}
"""
# Insert HEADER right after the `import time` near the top.
HEADER_ANCHOR = "import time\n"
SEND_TARGET = """ def _send_blocks(
self,
remote_session: str,
src_ptrs: list[int],
dst_ptrs: list[int],
lengths: list[int],
) -> int:
start_time = time.perf_counter()
ret_value = self.engine.batch_transfer_sync_write(
remote_session, src_ptrs, dst_ptrs, lengths
)"""
SEND_REPLACE = f""" def _send_blocks(
self,
remote_session: str,
src_ptrs: list[int],
dst_ptrs: list[int],
lengths: list[int],
) -> int:
{START_MARK}
_mb2_total_bytes = sum(lengths)
_mb2_t_start_unix = time.time()
{END_MARK}
start_time = time.perf_counter()
ret_value = self.engine.batch_transfer_sync_write(
remote_session, src_ptrs, dst_ptrs, lengths
)
{START_MARK}
_mb2_log_event({{"event": "send_blocks",
"remote_session": remote_session,
"total_bytes": _mb2_total_bytes,
"duration_s": time.perf_counter() - start_time,
"t_start_unix": _mb2_t_start_unix,
"ret": ret_value,
"tp_rank": self.tp_rank}})
{END_MARK}"""
RECV_TARGET = """ async def receive_kv(
self, path: str, req_blocks: list[tuple[str, str, list[int]]]
):
local_req_ids, remote_req_ids, block_ids = map(list, zip(*req_blocks))"""
RECV_REPLACE = f""" async def receive_kv(
self, path: str, req_blocks: list[tuple[str, str, list[int]]]
):
local_req_ids, remote_req_ids, block_ids = map(list, zip(*req_blocks))
{START_MARK}
_mb2_t_recv_start = time.perf_counter()
_mb2_t_recv_start_unix = time.time()
{END_MARK}"""
RECV_DONE_TARGET = """ self.finished_recving_reqs.update(local_req_ids)
logger.debug(
"pulling kv_caches for %s finished (local requests: %s)",
remote_req_ids, local_req_ids)"""
RECV_DONE_REPLACE = f""" {START_MARK}
_mb2_log_event({{"event": "receive_kv",
"path": path,
"local_req_ids": list(local_req_ids),
"remote_req_ids": list(remote_req_ids),
"duration_s": time.perf_counter() - _mb2_t_recv_start,
"t_start_unix": _mb2_t_recv_start_unix,
"tp_rank": self.tp_rank}})
{END_MARK}
self.finished_recving_reqs.update(local_req_ids)
logger.debug(
"pulling kv_caches for %s finished (local requests: %s)",
remote_req_ids, local_req_ids)"""
PATCHES = [
    ("header", HEADER_ANCHOR, HEADER_ANCHOR + HEADER_INSERT),
    ("send_blocks", SEND_TARGET, SEND_REPLACE),
    ("receive_kv (entry)", RECV_TARGET, RECV_REPLACE),
    ("receive_kv (done)", RECV_DONE_TARGET, RECV_DONE_REPLACE),
]


def find_target(path: Path) -> Path:
    candidates = [path, DEFAULT_VENV / TARGET_REL]
    for c in candidates:
        if c.is_file():
            return c
        if c.is_dir():
            sub = c / TARGET_REL
            if sub.is_file():
                return sub
    raise FileNotFoundError(f"cannot find mooncake_connector_v1.py near {path}")


def is_patched(text: str) -> bool:
    return START_MARK in text


def apply(target: Path) -> None:
    text = target.read_text()
    if is_patched(text):
        print(f"[mb2-instr] already patched: {target}")
        return
    new = text
    for name, src, dst in PATCHES:
        if src not in new:
            raise RuntimeError(
                f"patch {name!r}: anchor not found in {target}. "
                "File may have been refactored."
            )
        new = new.replace(src, dst, 1)
    target.write_text(new)
    print(f"[mb2-instr] applied 4 patches -> {target}")


def revert(target: Path) -> None:
    text = target.read_text()
    if not is_patched(text):
        print(f"[mb2-instr] not patched (nothing to revert): {target}")
        return
    # Remove everything between START_MARK and END_MARK, including the markers
    # and the surrounding whitespace they introduced.
    pat = re.compile(
        r"[ \t]*" + re.escape(START_MARK) + r".*?" + re.escape(END_MARK) + r"\n?",
        flags=re.DOTALL,
    )
    new = pat.sub("", text)
    # Header insert leaves a leading blank line; collapse runs of 3+ newlines.
    new = re.sub(r"\n{3,}", "\n\n", new)
    target.write_text(new)
    print(f"[mb2-instr] reverted: {target}")


def main() -> None:
    p = argparse.ArgumentParser()
    p.add_argument("--apply", action="store_true")
    p.add_argument("--revert", action="store_true")
    p.add_argument("--check", action="store_true")
    p.add_argument("--venv", type=Path, default=DEFAULT_VENV)
    args = p.parse_args()
    target = find_target(args.venv)
    if args.apply:
        apply(target)
    elif args.revert:
        revert(target)
    elif args.check:
        text = target.read_text()
        print(f"[mb2-instr] {'PATCHED' if is_patched(text) else 'CLEAN'}: {target}")
    else:
        p.error("specify --apply / --revert / --check")


if __name__ == "__main__":
    main()
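
The per-PID, lock-protected JSONL logging that the injected header performs can be tried out standalone. The sketch below is illustrative only: the `JsonlLogger` class and the `worker` function are hypothetical names, and opening the file inside the lock is a choice made for this sketch. Only the file naming, line buffering, and `t_log_unix` stamping mirror the instrumentation.

```python
import json
import os
import tempfile
import threading
import time


class JsonlLogger:
    """Lazily opened, lock-protected JSONL writer (illustrative only)."""

    def __init__(self, log_dir: str) -> None:
        self.path = os.path.join(log_dir, f"mb2_transfer_pid{os.getpid()}.jsonl")
        self._file = None
        self._lock = threading.Lock()

    def log(self, event: dict) -> None:
        event.setdefault("t_log_unix", time.time())
        line = json.dumps(event) + "\n"
        with self._lock:
            if self._file is None:
                # buffering=1 selects line buffering for text-mode files.
                self._file = open(self.path, "a", buffering=1)
            self._file.write(line)

    def close(self) -> None:
        with self._lock:
            if self._file is not None:
                self._file.close()
                self._file = None


def worker(logger: JsonlLogger, worker_id: int, n_events: int) -> None:
    for seq in range(n_events):
        logger.log({"event": "demo", "worker": worker_id, "seq": seq})


with tempfile.TemporaryDirectory() as tmp:
    logger = JsonlLogger(tmp)
    threads = [
        threading.Thread(target=worker, args=(logger, i, 50)) for i in range(4)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    logger.close()
    # Every line must parse as one complete JSON object.
    with open(logger.path) as fh:
        records = [json.loads(line) for line in fh]

print(len(records))
```

Serializing the event before taking the lock keeps the critical section down to the write itself.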

start_vllm_pair.sh

@@ -30,10 +30,17 @@ BP_B=8999
MASTER_A=29500
MASTER_B=29501
MB2_LOG_ROOT="${FRESH_ROOT}/mb2_transfer_logs"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
INSTRUMENT="${SCRIPT_DIR}/instrument_mooncake.py"
stop_all() {
    pkill -9 -f "vllm serve" 2>/dev/null || true
    pkill -9 -f "EngineCore" 2>/dev/null || true
    sleep 2
    if [[ -f "${INSTRUMENT}" ]]; then
        python "${INSTRUMENT}" --revert --venv "${VENV}" 2>/dev/null || true
    fi
}
case "${1:-start}" in
@@ -61,6 +68,15 @@ stop_all
source "${VENV}/bin/activate"
if [[ -f "${INSTRUMENT}" ]]; then
    echo "[mb2] applying instrumentation patch"
    python "${INSTRUMENT}" --apply --venv "${VENV}"
else
    echo "[mb2] WARN instrument_mooncake.py not found at ${INSTRUMENT}; transfer logs will be absent"
fi
mkdir -p "${MB2_LOG_ROOT}/A" "${MB2_LOG_ROOT}/B"
launch() {
local idx="$1" gpu="$2" port="$3" bp="$4" master="$5"
echo "[mb2] launching instance ${idx} on GPU ${gpu}, port ${port}, bp ${bp}"
@@ -68,6 +84,7 @@ launch() {
VLLM_MOONCAKE_BOOTSTRAP_PORT="${bp}" \
CUDA_VISIBLE_DEVICES="${gpu}" \
MASTER_PORT="${master}" \
MB2_LOG_DIR="${MB2_LOG_ROOT}/${idx}" \
nohup vllm serve "${MODEL}" \
--host 0.0.0.0 --port "${port}" \
--tensor-parallel-size 1 \
@@ -100,6 +117,6 @@ for port in "${PORT_A}" "${PORT_B}"; do
 done
 echo "[mb2] both instances UP"
-echo " A: 127.0.0.1:${PORT_A} (GPU ${GPU_A}, bp ${BP_A})"
-echo " B: 127.0.0.1:${PORT_B} (GPU ${GPU_B}, bp ${BP_B})"
-echo " logs: ${LOGS_DIR}"
+echo " A: 127.0.0.1:${PORT_A} (GPU ${GPU_A}, bp ${BP_A}, log_dir ${MB2_LOG_ROOT}/A)"
+echo " B: 127.0.0.1:${PORT_B} (GPU ${GPU_B}, bp ${BP_B}, log_dir ${MB2_LOG_ROOT}/B)"
+echo " vllm stdout: ${LOGS_DIR}"