Files
agentic-kvc/microbench/connector_tax/layerwise/run_v3_trace.sh

115 lines
5.0 KiB
Bash
Executable File

#!/usr/bin/env bash
# Full 1200-req v3 trace, two modes (MODE env), for layer-wise re-profile.
# MODE=baseline : stock connector + stock proxy (post-hoc transfer)
# MODE=layerwise : LAYERWISE connector + write-mode proxy (overlapped)
# Both: unified_v3 routing + DR-fix. Connector & proxy restored from backup
# on exit. Output-equivalence/correctness gate = success rate + migrated-req
# TTFT distribution (byte-level KV correctness already validated on mb7).
#
# Usage (on dash0): MODE=baseline bash run_v3_trace.sh
# MODE=layerwise bash run_v3_trace.sh
# Unset variables are errors and a pipeline fails if any stage fails.
# `-e` is not enabled, so steps whose failure matters are checked explicitly.
set -uo pipefail

# ---- Run selection (all overridable from the environment) ----
MODE="${MODE:-baseline}"
# Only the two documented modes are meaningful. Any other value would take the
# non-layerwise path further down while tagging the output with a bogus name,
# so reject it before anything is touched.
case "$MODE" in
  baseline | layerwise) ;;
  *)
    echo "[config] unknown MODE='$MODE' (expected: baseline | layerwise)" >&2
    exit 2
    ;;
esac
POLICY="${POLICY:-unified_v3}"
AB_FLAGS="${AB_FLAGS:-}" # e.g. "--overload-factor 1.3 --lmetric-decode-weight 0.01"
TAG="${TAG:-$MODE}"

# ---- Project layout ----
PROJ_DIR="${PROJ_DIR:-/home/admin/cpfs/wjh/agentic-kv}"
VENV="$PROJ_DIR/.venv"
VLLM_ROOT="$VENV/lib/python3.12/site-packages/vllm"
TRACE="${TRACE:-$PROJ_DIR/traces/w600_r0.0015_st30.jsonl}"
DATE="$(date +%Y%m%d_%H%M)"
OUTROOT="${OUTROOT:-$PROJ_DIR/outputs/v3trace_${TAG}_${DATE}}"
PYTHON="$VENV/bin/python"
DR_FIX="$PROJ_DIR/microbench/connector_tax/cache_sweep/apply_direct_read_fix.py"

# ---- Files that get swapped in place during the run ----
MC_FILE="$VLLM_ROOT/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_connector.py"
PROXY_FILE="$PROJ_DIR/scripts/cache_aware_proxy.py"

# Staging on shared cpfs (visible on dash0/dash1), not node-local /tmp.
_LWDIR="$PROJ_DIR/microbench/connector_tax/layerwise"
LW_CONN="${LW_CONN:-$_LWDIR/mooncake_connector.LAYERWISE.py}"
WM_PROXY="${WM_PROXY:-$_LWDIR/cache_aware_proxy.WRITEMODE.py}"
ES_INSTR="$_LWDIR/instrument_engine_state.py"
ES="${ES:-0}" # 1 = enable real engine-state feed (P2)
ES_DIR="/dev/shm/agentic_engine_state_${TAG}"

# ---- Output directories ----
# NOTE(review): the per-config directory name is fixed to "unified_v3" even
# when POLICY is overridden — confirm that is intended.
cfg_dir="$OUTROOT/unified_v3"
mkdir -p "$OUTROOT" "$cfg_dir" || {
  echo "[config] cannot create output directory $cfg_dir" >&2
  exit 1
}

# ---- Backups ----
# A pristine copy of each swapped file is kept next to it as .ORIG_BACKUP and
# is created only if it does not exist yet. Abort if a backup cannot be made:
# continuing would overwrite the original with no way to restore it.
for _orig in "$MC_FILE" "$PROXY_FILE"; do
  [ -f "$_orig.ORIG_BACKUP" ] && continue
  cp "$_orig" "$_orig.ORIG_BACKUP" || {
    echo "[backup] cannot back up $_orig; aborting before any file is overwritten" >&2
    exit 1
  }
done
unset _orig
#######################################
# Put the connector and proxy back to their pristine state.
# Copies the .ORIG_BACKUP files over $MC_FILE and $PROXY_FILE, asks the DR-fix
# and engine-state helper scripts to revert (best-effort, errors ignored), and
# removes $ES_DIR.
# Globals (read): MC_FILE PROXY_FILE PYTHON DR_FIX VLLM_ROOT ES_INSTR VENV ES_DIR
# Outputs: confirmation line on stdout; a warning on stderr if a copy failed.
# Returns: 0 if both files were restored, 1 if either copy failed.
#######################################
restore() {
  local rc=0
  cp -f "$MC_FILE.ORIG_BACKUP" "$MC_FILE" || rc=1
  cp -f "$PROXY_FILE.ORIG_BACKUP" "$PROXY_FILE" || rc=1
  # Reverts are deliberately best-effort: they are also invoked when the
  # corresponding patch was never applied, so their failures are ignored.
  "$PYTHON" "$DR_FIX" --revert --vllm-root "$VLLM_ROOT" 2>/dev/null || true
  "$PYTHON" "$ES_INSTR" --revert --venv "$VENV" 2>/dev/null || true
  rm -rf -- "$ES_DIR" 2>/dev/null || true
  if [ "$rc" -ne 0 ]; then
    # Do not claim success when a backup could not be copied back.
    echo "[restore] WARNING: failed to restore connector and/or proxy from .ORIG_BACKUP" >&2
    return 1
  fi
  echo "[restore] connector+proxy reset to ORIG, DR-fix + ES-patch reverted"
}
#######################################
# Exit handler: SIGKILL every process whose command line matches the proxy,
# "vllm serve" or "EngineCore", wait 5 seconds, then call restore.
# Kill failures (e.g. nothing matched) are ignored.
# Returns: the status of restore.
#######################################
cleanup() {
  local pattern
  for pattern in cache_aware_proxy "vllm serve" "EngineCore"; do
    pkill -9 -f "$pattern" 2>/dev/null || true
  done
  # Pause before touching the files — presumably so the killed processes are
  # fully gone first.
  sleep 5
  restore
}
# From here on, any exit path kills the servers and restores the original files.
trap cleanup EXIT

# Start from a clean slate: kill any running "vllm serve" and reset the files.
pkill -9 -f "vllm serve" 2>/dev/null || true
sleep 3
restore # start from clean
echo "=== v3 trace (mode=$MODE es=$ES tag=$TAG) -> $OUTROOT ==="

# Always deploy the enhanced proxy (write-mode + engine-state, both env/flag
# gated; with feed off + write-mode off it behaves identically to stock).
# A failed copy would leave the stock file in place and silently invalidate
# the run, so abort instead of continuing.
cp -f "$WM_PROXY" "$PROXY_FILE" || {
  echo "[deploy] failed to install proxy from $WM_PROXY" >&2
  exit 1
}
if [ "$MODE" = "layerwise" ]; then
  cp -f "$LW_CONN" "$MC_FILE" || {
    echo "[deploy] failed to install connector from $LW_CONN" >&2
    exit 1
  }
  export MOONCAKE_LAYERWISE=1
  export EAR_WRITE_MODE=1
fi

# Syntax-check both deployed files. Paths are passed via argv rather than
# interpolated into the Python source, so quotes in a path cannot break it.
"$PYTHON" -c '
import ast, sys
for path in sys.argv[1:]:
    with open(path) as fh:
        ast.parse(fh.read(), filename=path)
print("[deploy] proxy + connector AST OK")
' "$MC_FILE" "$PROXY_FILE" || exit 1

# Optional engine-state feed: patch the venv and hand the feed location to the
# servers (env var) and to the proxy (CLI flag).
PROXY_ES_ARG=""
if [ "$ES" = "1" ]; then
  echo "[ES] apply engine-state patch + enable feed at $ES_DIR"
  "$PYTHON" "$ES_INSTR" --apply --venv "$VENV" || {
    echo "[ES] engine-state patch failed" >&2
    exit 1
  }
  mkdir -p "$ES_DIR" || {
    echo "[ES] cannot create $ES_DIR" >&2
    exit 1
  }
  export AGENTIC_ENGINE_STATE_URI="file://$ES_DIR"
  PROXY_ES_ARG="--engine-state-uri file://$ES_DIR"
fi

# Both modes are defined as running with the DR-fix, so a failed apply is fatal.
echo "[DR-fix] apply"
"$PYTHON" "$DR_FIX" --apply --vllm-root "$VLLM_ROOT" || {
  echo "[DR-fix] apply failed" >&2
  exit 1
}
export VLLM_MOONCAKE_DISABLE_DIRECT_READ_SYNC=1

echo "[run] $POLICY AB=[$AB_FLAGS] (MOONCAKE_LAYERWISE=${MOONCAKE_LAYERWISE:-0} EAR_WRITE_MODE=${EAR_WRITE_MODE:-0})"
# Full orchestrator output goes to orchestrator.log; only the tail is shown.
EXTRA_PROXY_ARGS="$AB_FLAGS $PROXY_ES_ARG" bash "$PROJ_DIR/scripts/b3_isolated_policy.sh" "$POLICY" "$TRACE" "$cfg_dir" \
  2>&1 | tee "$cfg_dir/orchestrator.log" | tail -20
# PIPESTATUS must be read immediately after the pipeline.
run_rc=${PIPESTATUS[0]}
if [ "$run_rc" -ne 0 ]; then
  # Keep going so that whatever metrics were written are still summarised.
  echo "[run] WARNING: orchestrator exited with status $run_rc (see $cfg_dir/orchestrator.log)" >&2
fi

# Stop the proxy and servers before computing stats.
pkill -9 -f cache_aware_proxy 2>/dev/null || true
pkill -9 -f "vllm serve" 2>/dev/null || true
sleep 5
# Summarise the run: success rate and TTFT percentiles from metrics.jsonl,
# plus TTFT of migrated requests (route_class == "PD_SEP_V2" in the proxy's
# breakdown.json). The heredoc delimiter is quoted, so nothing inside is
# expanded by the shell; the run directory is passed as argv[1].
echo "[stats] $MODE"
"$PYTHON" - "$cfg_dir" << 'PYEOF'
import json, sys

run_dir = sys.argv[1]

# metrics.jsonl holds one JSON object per line; blank lines are skipped.
try:
    with open(f"{run_dir}/metrics.jsonl") as fh:
        records = [json.loads(line) for line in fh if line.strip()]
except (OSError, ValueError) as e:
    print(f" (metrics parse: {e})")
    sys.exit(1)

ok = [m for m in records if not m.get("error")]
ttft = sorted(m["ttft_s"] for m in ok if m.get("ttft_s") is not None)


def pct(q):
    """Value at rank int(q * n), clamped to the last element; 0 if no samples."""
    return ttft[min(len(ttft) - 1, int(q * len(ttft)))] if ttft else 0


print(f" requests: {len(records)} success: {len(ok)} ({len(ok)/max(1,len(records))*100:.1f}%)")
print(f" TTFT s : p50={pct(.5):.2f} p90={pct(.9):.2f} p99={pct(.99):.2f}")

# migrated reqs from proxy breakdown
try:
    with open(f"{run_dir}/breakdown.json") as fh:
        bd = json.load(fh)
    mig = [x for x in bd if x.get("route_class") == "PD_SEP_V2"]
    mids = {x["request_id"] for x in mig}
    # Same "is not None" filter as the overall TTFT list, so a 0.0 sample is
    # not dropped; .get() keeps a record without request_id from aborting.
    mt = sorted(m["ttft_s"] for m in ok
                if m.get("request_id") in mids and m.get("ttft_s") is not None)
    if mt:
        print(f" migrations: {len(mig)} migrated-req TTFT: "
              f"p50={mt[len(mt)//2]:.2f} p90={mt[int(len(mt)*.9)]:.2f} max={mt[-1]:.2f}")
    else:
        print(f" migrations: {len(mig)}")
except Exception as e:
    # The breakdown is optional for the summary; report and carry on.
    print(f" (breakdown parse: {e})")
PYEOF
echo "[done] $cfg_dir (metrics.jsonl, breakdown.json)"