D2: run_benchmark.sh and run_experiments.sh still pass --time-scale and --max-inflight-sessions to the replayer, but those flags were removed when the project moved to trace-driven dispatch. The scripts cannot run as-is. D3: ~25 ad-hoc analyze_* / compare_* / profile_* / final_* scripts and a handful of single-experiment run_*.sh point at /home/admin/cpfs paths, deleted output directories, or a sampled trace file that no longer exists. Keep them in scripts/legacy/ for historical reference; the scripts that remain in scripts/ (analyze_trace, analyze_breakdown, analyze_cache_hit, analyze_eviction, compare_results, compute_roofline, sample_trace, analyze_agentic_patterns, simulate_cache_policies, plus launch_*.sh, gpu_monitor.sh, bench.sh) cover the current workflow. Adds scripts/legacy/README.md to document the archival policy.
119 lines
5.0 KiB
Python
119 lines
5.0 KiB
Python
"""Final A/B comparison: baseline (dash0) vs elastic (dash1).
|
|
Both fresh restart, same trace, same params. GPU util + APC + latency."""
|
|
import json, csv, statistics, os, urllib.request
|
|
|
|
def lat(path):
    """Summarise per-request latency metrics from a JSONL metrics file.

    Every non-blank line of *path* is parsed as one JSON object. A record
    with a truthy ``"error"`` value counts as a failure; all other records
    count as successes.

    Args:
        path: Path to the JSONL file to read.

    Returns:
        A dict with the keys:
            ``ok`` / ``n``   -- number of successes / total records read
            ``t50``, ``t90`` -- p50 / p90 of ``ttft_s`` (falsy values skipped)
            ``p50``, ``p90`` -- p50 / p90 of ``tpot_s`` (only values > 0)
            ``e50``, ``e90`` -- p50 / p90 of ``latency_s`` over successes
            ``inp50``        -- p50 of ``input_length`` over successes
            ``err_inp50``    -- p50 of ``input_length`` over failures
        Any percentile whose sample is empty is reported as 0.

    Raises:
        OSError: if *path* cannot be opened.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
        KeyError: if a success lacks ``latency_s`` or any record lacks
            ``input_length``.
    """
    # Use a context manager so the handle is closed deterministically, and
    # skip blank lines (e.g. a stray empty line) that json.loads would reject.
    with open(path, encoding="utf-8") as fh:
        rows = [json.loads(line) for line in fh if line.strip()]

    ok = [r for r in rows if not r.get("error")]
    err = [r for r in rows if r.get("error")]

    def pct(values, q):
        """Return the element at index int(q * len), clamped; 0 if empty."""
        if not values:
            return 0
        return values[min(int(q * len(values)), len(values) - 1)]

    ttfts = sorted(r["ttft_s"] for r in ok if r.get("ttft_s"))
    tpots = sorted(r["tpot_s"] for r in ok if r.get("tpot_s") and r["tpot_s"] > 0)
    lats = sorted(r["latency_s"] for r in ok)
    ok_inp = sorted(r["input_length"] for r in ok)
    err_inp = sorted(r["input_length"] for r in err)

    return {
        "ok": len(ok),
        "n": len(rows),
        "t50": pct(ttfts, .5),
        "t90": pct(ttfts, .9),
        "p50": pct(tpots, .5),
        "p90": pct(tpots, .9),
        "e50": pct(lats, .5),
        "e90": pct(lats, .9),
        "inp50": pct(ok_inp, .5),
        "err_inp50": pct(err_inp, .5),
    }
|
|
|
|
def gpu_per_inst(path):
    """Aggregate per-GPU utilisation samples from a CSV file.

    The CSV must have a header row containing at least the columns ``gpu``
    (parsed with ``int``) and ``util_pct`` (parsed with ``float``).

    Args:
        path: Path to the CSV file.

    Returns:
        ``{}`` if *path* does not exist. Otherwise a dict, in ascending GPU
        index order, mapping each GPU index to
        ``{"mean": <mean of its util_pct samples>,
           "active": <integer percentage of its samples that are > 0>}``.

    Raises:
        KeyError: if a row lacks the ``gpu`` or ``util_pct`` column.
        ValueError: if a ``gpu`` / ``util_pct`` cell is not numeric.
    """
    if not os.path.exists(path):
        return {}

    by_gpu = {}
    # newline="" is what the csv module asks for; the with-block guarantees
    # the handle is closed even if a row fails to parse.
    with open(path, newline="") as fh:
        for row in csv.DictReader(fh):
            by_gpu.setdefault(int(row["gpu"]), []).append(float(row["util_pct"]))

    result = {}
    for gpu, vals in sorted(by_gpu.items()):
        nonzero = sum(1 for v in vals if v > 0)
        result[gpu] = {
            "mean": statistics.fmean(vals),
            # Floor division: share of samples with non-zero utilisation.
            "active": nonzero * 100 // len(vals),
        }
    return result
|
|
|
|
def get_apc(host, port_start=8000, n=8,
            log_prefixes=("/tmp/ab_base_", "/tmp/ab_elastic_")):
    """Get APC from vLLM log files.

    For every instance index ``i`` in ``range(n)`` and every prefix in
    *log_prefixes*, runs ``grep ... | tail -1`` over ssh on *host* against
    ``<prefix><i>.log`` and parses the last "Prefix cache hit rate" line.

    Args:
        host: ssh destination to run the grep on.
        port_start: Unused by this function; kept so existing calls that
            pass it keep working.
        n: Number of instance indices to probe (0 .. n-1).
        log_prefixes: Log path prefixes to try for each index. When more
            than one prefix yields a line for the same index, the later
            prefix in this sequence wins.

    Returns:
        Dict mapping instance index to
        ``{"prefix": float, "external": float}``; a rate that is absent from
        the matched line is reported as 0. Indices with no usable log line
        are omitted. This is best-effort: ssh failures, timeouts and
        unparsable numbers are skipped rather than raised.
    """
    # Function-scope imports keep this block self-contained; the originals
    # were re-imported inside the innermost loop.
    import re
    import subprocess

    prefix_re = re.compile(r"Prefix cache hit rate: ([0-9.]+)")
    external_re = re.compile(r"External prefix cache hit rate: ([0-9.]+)")

    results = {}
    for i in range(n):
        for log_prefix in log_prefixes:
            logfile = "%s%d.log" % (log_prefix, i)
            remote_cmd = "grep 'Prefix cache hit rate' %s 2>/dev/null | tail -1" % logfile
            try:
                proc = subprocess.run(
                    ["ssh", "-o", "ConnectTimeout=5", host, remote_cmd],
                    capture_output=True, text=True, timeout=10)
            except (OSError, subprocess.SubprocessError, UnicodeError):
                # ssh missing, timed out, or produced undecodable output:
                # skip this log, but let KeyboardInterrupt/SystemExit through.
                continue

            line = proc.stdout.strip()
            if "Prefix cache hit rate:" not in line:
                continue

            pch = prefix_re.search(line)
            ech = external_re.search(line)
            try:
                results[i] = {
                    "prefix": float(pch.group(1)) if pch else 0,
                    "external": float(ech.group(1)) if ech else 0,
                }
            except ValueError:
                # The character class can match text such as "." or "1.2.3"
                # that float() rejects; treat it as "no data" for this log.
                continue
    return results
|
|
|
|
# ---------------------------------------------------------------------------
# Report driver: prints the latency table, baseline -> elastic deltas,
# per-GPU utilisation and prefix-cache hit rates to stdout.
# ---------------------------------------------------------------------------
sep = "=" * 80
print(sep)
print(" A/B COMPARISON: Baseline (dash0) vs Elastic P2P (dash1)")
print(" Both: fresh restart, 200 req, time_scale=20, 8 sessions")
print(sep)

# Latency
baseline_metrics = "outputs/ab_baseline/metrics.jsonl"
elastic_metrics = "outputs/ab_elastic/metrics.jsonl"

print("\n LATENCY:")
fmt = "%-30s %7s %8s %8s %8s %8s %8s %8s"
print(fmt % ("Config", "OK/N", "TTFT50", "TTFT90", "TPOT50", "TPOT90", "E2E50", "inp_p50"))
print("-" * 80)

# Parse each metrics file once and keep the summary so the delta section
# below can reuse it instead of re-reading the file.
summaries = {}
for path, label in [
    (baseline_metrics, "Baseline (combined)"),
    (elastic_metrics, "Elastic P2P (cap=4)"),
]:
    if os.path.exists(path):
        s = summaries[path] = lat(path)
        print(fmt % (label, "%d/%d" % (s["ok"], s["n"]),
                     "%.3f" % s["t50"], "%.3f" % s["t90"], "%.3f" % s["p50"],
                     "%.3f" % s["p90"], "%.3f" % s["e50"], str(s["inp50"])))

# Delta (only when both runs produced a metrics file)
b = summaries.get(baseline_metrics)
a = summaries.get(elastic_metrics)
if b and a:
    print()
    for label, bv, av in [
        ("TTFT p50", b["t50"], a["t50"]),
        ("TTFT p90", b["t90"], a["t90"]),
        ("TPOT p90", b["p90"], a["p90"]),
        ("E2E p50", b["e50"], a["e50"]),
    ]:
        # Relative change in percent; reported as 0 when the baseline is 0.
        d = (av / bv - 1) * 100 if bv > 0 else 0
        print(" %s: %.3f -> %.3f (%+.1f%%)" % (label, bv, av, d))

# GPU utilization
print("\n GPU UTILIZATION:")
for path, label in [
    ("outputs/ab_baseline/gpu_util.csv", "Baseline"),
    ("outputs/ab_elastic/gpu_util.csv", "Elastic"),
]:
    gi = gpu_per_inst(path)
    if not gi:
        continue
    gpu_ids = sorted(gi)
    means = [gi[g]["mean"] for g in gpu_ids]
    print(" %s:" % label)
    for g in gpu_ids:
        print(" GPU%d: mean=%5.1f%% active=%2d%%" % (g, gi[g]["mean"], gi[g]["active"]))
    # Imbalance = busiest / least-busy mean; the 0.1 floor avoids dividing
    # by zero when a GPU was idle for the whole run.
    print(" Aggregate: mean=%.1f%% imbalance=%.1fx" % (
        statistics.fmean(means), max(means) / max(min(means), 0.1)))

# APC from vLLM logs
print("\n PREFIX CACHE HIT RATE (from vLLM logs):")
# NOTE(review): the per-host log prefix listed here is never used --
# get_apc() is called with only the host. Confirm whether it should be
# restricted to this prefix.
for host, label, _prefix in [
    ("dash0", "Baseline", "/tmp/ab_base_"),
    ("dash1", "Elastic", "/tmp/ab_elastic_"),
]:
    apc = get_apc(host)
    if not apc:
        continue
    prefixes = [v["prefix"] for v in apc.values()]
    externals = [v.get("external", 0) for v in apc.values()]
    print(" %s:" % label)
    for i in sorted(apc):
        # The external rate is shown only when it is present and non-zero.
        ext = " ext=%.1f%%" % apc[i]["external"] if apc[i].get("external") else ""
        print(" inst_%d: prefix=%.1f%%%s" % (i, apc[i]["prefix"], ext))
    print(" Avg prefix: %.1f%% Avg external: %.1f%%" % (
        statistics.fmean(prefixes), statistics.fmean(externals)))
|