From 297fed6e734da433ee087dbc0445da95534875a7 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Tue, 26 May 2026 17:27:41 +0800 Subject: [PATCH] Microbench 3 (connector_tax): infrastructure for KV connector substrate tax MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Validates the elastic_migration_v2 finding that kv_role=kv_both adds TTFT p90 +45% even when PD-sep never fires. Replicates under single-instance, synthetic, open-loop workload to disambiguate mechanism cost from 8-instance feedback amplification. Configurations (8): plain, noop_connector, mooncake_{producer,consumer,both}, nixl_both, lmcache_only, multi_mooncake_lmcache. Pre-flight verification gates risky configs (kv_consumer needs dummy bootstrap, multi-connector composition, NoOp custom class loading). Workload: two-phase sweep Phase A: rate {0.5..32} req/s × shape (4096, 256), saturation criteria Phase B: ref_safe rate × cartesian (input ∈ {512,4k,32k}, output ∈ {64,256,1024}) Step-timing patch enriches vLLM's existing AGENTIC_STEP_LOG_PATH emit with step_duration_us and build_meta_us — directly measures per-step substrate cost, not just user-visible TTFT/TPOT. run_all.sh runs as 5-stage barrier: 0 pre-flight + apply patch 1 Phase A all configs 2 pick ref_safe / ref_load 3 Phase B all configs 4 revert patch + analyze + plot Outputs aggregate.{json,csv}, MANIFEST.tsv, and 5 figures. Estimated runtime: 4-5.5 hours on idle dash0 H20. 
--- microbench/__init__.py | 0 microbench/connector_tax/DESIGN.md | 664 ++++++++++++++++++ microbench/connector_tax/__init__.py | 0 microbench/connector_tax/analyze.py | 177 +++++ microbench/connector_tax/bench_loop.py | 351 +++++++++ microbench/connector_tax/launch/common.sh | 64 ++ .../launch/launch_lmcache_only.sh | 21 + .../launch/launch_mooncake_both.sh | 18 + .../launch/launch_mooncake_consumer.sh | 28 + .../launch/launch_mooncake_producer.sh | 18 + .../launch/launch_multi_mooncake_lmcache.sh | 41 ++ .../connector_tax/launch/launch_nixl_both.sh | 21 + .../launch/launch_noop_connector.sh | 16 + .../connector_tax/launch/launch_plain.sh | 13 + microbench/connector_tax/metrics_sampler.py | 110 +++ .../patches/apply_step_timing.py | 171 +++++ .../connector_tax/plot_connector_tax.py | 263 +++++++ microbench/connector_tax/run_all.sh | 232 ++++++ microbench/connector_tax/tools/__init__.py | 0 .../connector_tax/tools/dummy_bootstrap.py | 84 +++ .../connector_tax/tools/noop_connector.py | 90 +++ .../connector_tax/tools/verify_kv_consumer.sh | 33 + .../tools/verify_multi_connector.sh | 35 + .../tools/verify_noop_connector.sh | 26 + 24 files changed, 2476 insertions(+) create mode 100644 microbench/__init__.py create mode 100644 microbench/connector_tax/DESIGN.md create mode 100644 microbench/connector_tax/__init__.py create mode 100644 microbench/connector_tax/analyze.py create mode 100644 microbench/connector_tax/bench_loop.py create mode 100755 microbench/connector_tax/launch/common.sh create mode 100755 microbench/connector_tax/launch/launch_lmcache_only.sh create mode 100755 microbench/connector_tax/launch/launch_mooncake_both.sh create mode 100755 microbench/connector_tax/launch/launch_mooncake_consumer.sh create mode 100755 microbench/connector_tax/launch/launch_mooncake_producer.sh create mode 100755 microbench/connector_tax/launch/launch_multi_mooncake_lmcache.sh create mode 100755 microbench/connector_tax/launch/launch_nixl_both.sh create mode 100755 
microbench/connector_tax/launch/launch_noop_connector.sh create mode 100755 microbench/connector_tax/launch/launch_plain.sh create mode 100644 microbench/connector_tax/metrics_sampler.py create mode 100644 microbench/connector_tax/patches/apply_step_timing.py create mode 100644 microbench/connector_tax/plot_connector_tax.py create mode 100755 microbench/connector_tax/run_all.sh create mode 100644 microbench/connector_tax/tools/__init__.py create mode 100644 microbench/connector_tax/tools/dummy_bootstrap.py create mode 100644 microbench/connector_tax/tools/noop_connector.py create mode 100755 microbench/connector_tax/tools/verify_kv_consumer.sh create mode 100755 microbench/connector_tax/tools/verify_multi_connector.sh create mode 100755 microbench/connector_tax/tools/verify_noop_connector.sh diff --git a/microbench/__init__.py b/microbench/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/microbench/connector_tax/DESIGN.md b/microbench/connector_tax/DESIGN.md new file mode 100644 index 0000000..a41e418 --- /dev/null +++ b/microbench/connector_tax/DESIGN.md @@ -0,0 +1,664 @@ +# Microbench 3: KV Connector Substrate Tax (revision 2) + +## Goal + +Validate the headline claim from +`analysis/characterization/elastic_migration_v2/README.md` Result 1: + +> Switching the vLLM launch from plain to `kv_role=kv_both` without ever +> triggering PD-sep already costs **TTFT p90 +45%, TPOT p90 +25%, +> hotspot index +19%**. + +That claim was measured on 8 instances with a 1214-request real-trace +replay under saturated coupling. We replicate it with **single-instance, +synthetic, open-loop** workload so we can: + +1. Disambiguate **vLLM-v1-framework cost** from + **connector-implementation cost** by including a no-op connector. +2. **Validate (or refute) the agentic-coupling amplification** claim: + if single-instance synthetic numbers ≈ 8-instance trace numbers + (38–45%), the coupling is not the main cause. 
If single-instance is + much smaller, then the 8-instance saturated coupling does most of + the damage. +3. Make the result **reproducible and auditable**: every run dumps full + raw artifacts + manifest entry + a re-run script. + +--- + +## Hypotheses (revised based on elastic_migration_v2 prior) + +The headline trace-replay numbers from elastic_migration_v2 are our +**prior**, not an open question: + +``` +trace replay, 8 instances, agentic dispatch coupling, saturated: + plain TTFT p90 = 7.35 s + NIXL TTFT p90 = ~10.1 s (+38%) + Mooncake_both = 10.67 s (+45%) +``` + +The microbench validates / refutes / refines these: + +| ID | Hypothesis | Falsifier | +|---|---|---| +| **H1: Substrate tax persists at single instance / synthetic load** | Single-instance Mooncake_both TTFT p90 is ≥ 10% higher than plain at the reference rate | If <10% → trace-replay tax is dominated by 8-instance feedback coupling, not connector machinery | +| **H2: NIXL-vs-Mooncake gap is mechanism-side, not coupling-side** | Single-instance numbers preserve the ~7 pp gap (NIXL tax < Mooncake tax by 5–10 pp) | If gap shrinks/inverts → the gap was a coupling artifact | +| **H3: Framework-vs-implementation split** | `noop_connector` (v1 framework only, all hooks return no-op) tax is < 50% of Mooncake_both tax | Lets us attribute cost between vLLM's connector dispatch loop and the specific connector's per-step work | +| **H4: MultiConnector tax is additive** | tax(Mooncake+LMCache) ≈ tax(Mooncake) + tax(LMCache), within 30% | If super-additive → cross-connector interference; if sub-additive → some shared per-step cost is amortized | +| **H5: Tax is shape-dependent** | tax_TTFT_p90 grows monotonically with input length for Mooncake_both | Confirms E2 audit §6.5 hypothesis (`set(cache.keys())` walks scale with cache size) | +| **H6: Tax compounds in decode** | tax_TPOT_p90 grows with output length | Confirms connector code runs each decode step | + +H3 and H4 are the must-have new hypotheses that 
pulled in the new configs. + +--- + +## Hardware & Model + +| Parameter | Value | +|---|---| +| GPU | NVIDIA H20 96 GB × 1 (single instance) | +| Model | Qwen3-Coder-30B-A3B-Instruct | +| TP | 1 | +| `max_model_len` | 200 000 | +| `enable_prefix_caching` | true | +| `enable_chunked_prefill` | true | +| `max_num_batched_tokens` | 8192 | +| `gpu_memory_utilization` | 0.9 | + +Single GPU per run. Each configuration is a fresh vLLM launch on GPU 0. + +--- + +## Configurations (8 total, was 6) + +| ID | Connector | Role | Why we measure it | +|---|---|---|---| +| `plain` | (none) | — | Baseline | +| `noop_connector` | custom `NoOpConnector` (this microbench ships it) | n/a | Isolate **vLLM-v1 framework** cost (build_connector_meta, mixin dispatch, get_finished bookkeeping) without any real connector work — see Note 1 | +| `mooncake_producer` | MooncakeConnector | `kv_producer` | Isolate P-side stack | +| `mooncake_consumer` | MooncakeConnector | `kv_consumer` | Isolate D-side stack — pre-flight gated, see §Pre-flight | +| `mooncake_both` | MooncakeConnector | `kv_both` | The README claim | +| `nixl_both` | NIXLConnector | `kv_both` | Connector-specific vs framework cost | +| `lmcache_only` | `LMCacheConnectorV1` | n/a | NEW — gives H4 a denominator | +| `multi_mooncake_lmcache` | MultiConnector(Mooncake `kv_both` + `LMCacheConnectorV1`) | mixed | Stacked-connector check (gated by pre-flight) | + +**Note 1 — noop_connector (we ship it, not the vLLM-bundled one)**: +The vLLM-shipped `ExampleConnector` is NOT a true no-op — it +implements a debug-grade disk KV cache: stores match metadata, +serializes safetensors per-layer in `save_kv_layer`, etc. (see +`third_party/vllm/.../example_connector.py:345`, +`example_connector.py:250`, +`kv_transfer_utils.py:49`). Using it would conflate framework cost +with disk-I/O + per-layer save cost. 
+ +Instead we ship `microbench/connector_tax/tools/noop_connector.py` +that subclasses `KVConnectorBase_V1` and returns no-op for **every** +hook: + +```python +class NoOpConnector(KVConnectorBase_V1): + def get_num_new_matched_tokens(self, req, num_computed): return 0, False + def update_state_after_alloc(self, *_args, **_kw): pass + def build_connector_meta(self, scheduler_output): return KVConnectorMetadata() + def request_finished(self, *_args, **_kw): return False, None + def start_load_kv(self, *_args, **_kw): pass + def wait_for_layer_load(self, *_args, **_kw): pass + def save_kv_layer(self, *_args, **_kw): pass + def wait_for_save(self): pass + def get_finished(self, *_args, **_kw): return None, None +``` + +vLLM loads it via: + +``` +--kv-transfer-config '{ + "kv_connector_module_path": + "microbench.connector_tax.tools.noop_connector:NoOpConnector", + "kv_role": "kv_both" +}' +``` + +`PYTHONPATH` is set in `launch_noop_connector.sh` so vLLM can resolve +the dotted import path. + +If `noop_connector` overhead ≈ 0 → all substrate tax is in connector +implementations. If `noop_connector` overhead ≈ 30% of Mooncake_both +tax → vLLM's framework dispatch alone explains a meaningful slice. + +--- + +## Pre-flight Verification (NEW — gates risky configs) + +Two configs depend on infrastructure we can't take for granted. Run +verification scripts BEFORE the main bench. Skip the config (and record +SKIP in manifest) if it fails. + +### `verify_kv_consumer.sh` + +1. Start a dummy bootstrap process (`tools/dummy_bootstrap.py`). +2. Launch vLLM with `kv_role=kv_consumer` pointing at the dummy. +3. Curl `/v1/models` — must return 200 with the model id. +4. Send one short request (`max_tokens=4`) without `kv_transfer_params` + — must return 200 in <30 s. + +If steps 3 or 4 fail, the config is unrunnable and we drop it. We do +not try harder; the trace-replay paper does not promise consumer-only +single-instance numbers. + +### `verify_multi_connector.sh` + +1. 
Launch vLLM with `MultiConnector(MooncakeConnector kv_both, + LMCacheConnectorV1)`. +2. Send 5 sequential requests, `max_tokens=32`, random content. +3. All 5 must complete in <60 s. +4. Verify no engine crashes: `vllm:engine_core_failed_total == 0` from + `/metrics`. + +If any check fails, drop the config and mark SKIP (Manifest column: +"why skipped"). + +### `verify_noop_connector.sh` + +1. Launch with noop_connector active (loaded via `kv_connector_module_path`). +2. Send 5 sequential requests, `max_tokens=32`. +3. Verify all 5 return 200 in <30 s and no engine crash. + +This one is unlikely to fail but the verification is the same. + +The verification scripts produce `verify_{name}.log` under +`results/preflight/` and a `preflight_status.json` summarizing +skip-or-include decisions for the manifest. + +--- + +## Workload (revised) + +**Open-loop, fixed-rate, randomized content, two-phase sweep, +data-driven saturation criteria.** + +### Phase A — rate sweep (find saturation per config) + +| Parameter | Value | +|---|---| +| Input length | 4096 tokens (random per request) | +| Output length | 256 tokens (`max_tokens=256`, `ignore_eos=True`) | +| Send rates | {0.5, 1, 2, 4, 8, 16, 32} req/s (added 0.5 for low-end calibration) | +| Duration per cell | `max(60 s, time_to_min_completed)` + 10 s warmup | +| Min completed per cell | 200 requests | +| Inflight cap | 256 (drop excess to log) | + +**Why min_completed = 200**: at p90, the margin-of-error of a Monte +Carlo percentile estimate from N samples is ≈ 1.65 √(0.9 × 0.1 / N). +For N=200 this is ~3.5% absolute, ~10% relative — acceptable. For +N=30 (which 0.5 req/s × 60 s gives) it's ~28% relative, useless for +saturation detection. So at low rates the cell automatically extends: +0.5 req/s → ≥ 400 s, 1 req/s → ≥ 200 s. At 4 req/s and above the +60-second floor dominates. 
+ +Updated Phase A duration per config (rounded): + +| Rate (req/s) | Duration (s) | Note | +|---|---|---| +| 0.5 | 410 | extended to hit 200 completed | +| 1 | 210 | extended | +| 2 | 110 | extended slightly | +| 4 | 70 | floor | +| 8 | 70 | floor | +| 16 | 70 | floor | +| 32 | 70 | floor | +| **sum per config (excl. warmup)** | **~1010 s** | | + +Total Phase A: 8 configs × (90 s vLLM warmup + 1010 s of cells + +60 s GPU release) = 8 × 1160 s ≈ **155 min**. + +### Saturation criteria (data-driven, was hardcoded inflight>8) + +A config is **saturated at rate r** if **any** of: + +1. `effective_throughput(r) / r < 0.95` — vLLM can't keep up +2. `num_requests_waiting p50 (from /metrics) > 1` — vLLM has visible queue +3. `TTFT p90 (r) / TTFT p90 (r/2) > 1.5` — TTFT inflating super-linearly + +The **per-config saturation rate** is the lowest r that triggers ≥ 1 +criterion. We log which criterion fired so reviewers can disagree. + +### Reference rate selection (revised) + +We define **two reference rates** for Phase B, both computed from +Phase A data: + +``` +ref_safe = max rate where ALL 8 configs are NOT saturated +ref_load = max rate where plain is NOT saturated + (some other configs may be saturated here) +``` + +`ref_safe` measures the **pure substrate per-step tax** under no +queueing. + +`ref_load` measures **the tax in the regime closer to deployment** — +where plain is happily under-loaded but Mooncake is starting to hurt. +The gap `tax(ref_load) − tax(ref_safe)` is the **non-linear queueing +amplification** of the substrate tax. This is exactly the effect the +reviewer worried about and now we measure it explicitly instead of +ignoring it. + +Both rates are reported. The headline number we cite is `ref_safe` +because it's the cleanest decomposition. The `ref_load` numbers tell +us how much worse the tax gets near saturation. 
+ +### Phase B — shape sweep (substrate tax across length regimes) + +| Parameter | Value | +|---|---| +| Send rate | `ref_safe` (one value, single rate to keep cost bounded) | +| Input lengths | {512, 4096, 32768} tokens | +| Output lengths | {64, 256, 1024} tokens (32 promoted to 64 — see Note 2) | +| Duration per cell | `max(60 s, time_to_min_completed)` + 10 s warmup | +| Min completed per cell | 200 requests | +| Cartesian shapes | 3 × 3 = 9 | + +The same min-completed extension applies. If `ref_safe ≥ 4 req/s`, +each cell hits the 60 s floor and per-config Phase B cell time is +9 × 70 s = 630 s. If `ref_safe = 2 req/s`, cells extend to 110 s and +per-config cell time is 9 × 110 s = 990 s. + +Total Phase B (worst case, `ref_safe = 2`): 8 configs × (90 s warmup ++ 990 s of cells + 60 s GPU release) ≈ **152 min**. Best case +(`ref_safe ≥ 4`): 8 × (90 + 630 + 60) ≈ **104 min**. + +If after Phase A we find `ref_load` differs meaningfully from +`ref_safe`, we add a small Phase B' run on `ref_load` for the 4 +high-priority configs (plain, mooncake_both, nixl_both, lmcache_only) +on 3 representative shapes (512/256, 4096/256, 32768/256). That is +4 configs × 3 shapes × 70 s ≈ 14 min, controlled trade-off. + +**Note 2 — output 64 instead of 32**: with 32 output tokens TPOT is +estimated from 31 inter-token intervals — too few samples for stable +p90. Bumping to 64 gives 63 samples, comfortable for percentile +estimation. The output=32 regime is also less common in agentic +deployments where a tool result frame is rarely <64 tokens. 
+ +### Common settings + +| Parameter | Value | +|---|---| +| `temperature` | 0 (deterministic) | +| `ignore_eos` | True (force exact output length) | +| Content | random UUID + hash per request, zero prefix cache hit | +| Concurrent inflight cap | 256 | + +--- + +## Metrics (revised — adds A3 step-level engine_state) + +### Client-side (per-request, JSONL) + +Same as before: `t_send_ns`, `t_first_token_ns`, `t_last_token_ns`, +`prompt_tokens`, `completion_tokens`, `inflight_at_send`. + +### Server-side `/metrics` sampling (1 Hz) + +Captured into `metrics___.jsonl`. Same fields as +prior version. + +### Step-level timing instrumentation (NEW — we ship the patch) + +The reviewer correctly noted that the existing A3 step log +(`third_party/vllm/.../scheduler.py:953`) only records per-step token +counts and request lists, **not** step duration or per-callback +timing. So we cannot just turn on AGENTIC_STEP_LOG_PATH and get +Figure 6/7's "direct evidence" — that data does not exist yet. + +This microbench ships its own scheduler timing patch at +`microbench/connector_tax/patches/scheduler_step_timing.py`, modelled +on the idempotent `microbench/patches/apply_patches.py` we wrote for +Microbench 2. It uses the same `_pd_profile.py` emit pattern. + +The patch instruments: + +1. `Scheduler.schedule()` entry → `t_step_enter` (perf_counter_ns) +2. `Scheduler.schedule()` exit → `t_step_exit` +3. Around `connector.build_connector_meta(scheduler_output)` + → `build_meta_us` +4. Around `connector.get_finished(...)` call + (in `_update_from_output` / mixin) + → `get_finished_us` +5. 
Around `connector.start_load_kv(...)` (in the worker mixin + `_get_kv_connector_output`) + → `start_load_kv_us` (worker-side; emitted from worker process) + +Each step emits one JSONL record: + +```json +{ + "t_ns": , + "step_id": , + "step_duration_us": , + "build_meta_us": , + "get_finished_us": , + "start_load_kv_us": , + "num_running": , + "num_waiting": , + "prefill_tokens": , + "decode_tokens": +} +``` + +Output goes to `AGENTIC_STEP_LOG_PATH` (one file per process; we use +`engine_step__.jsonl` paths from launch scripts). + +Apply / revert is idempotent — same `# CONNECTOR_TAX_PATCH` marker +strategy as Microbench 2. + +``` +microbench/connector_tax/patches/ +├── _step_profile.py # the emitter (ported from _pd_profile) +├── scheduler_step_timing.py # patch installer / reverter +└── apply.sh # invoked by run_all.sh; revert at end +``` + +**Fallback if the patch fails to apply on a future vLLM version**: +the bench drops to client-side TTFT/TPOT only. Figures 6 (per-step +CDF) and 7 (decomposition stack) are not produced; the manifest +records `step_timing_available=false`. The other figures and the +H1 / H2 / H4 headline numbers do not depend on this patch, so the +bench is still useful in fallback mode. 
+ +### Derived (post-processing) + +For each (config, rate-or-shape) cell after warmup: + +- TTFT/TPOT/E2E p50/p90/p99 +- `effective_throughput`, `requested_throughput`, throughput_ratio +- `saturation_flag` (which criterion, if any, triggered) +- (when `step_timing_available=true`): + - `step_duration_us` p50/p90 + - `build_meta_us` p50/p90 + - `get_finished_us` p50/p90 + - `start_load_kv_us` p50/p90 (worker-process file) + - `connector_total_us` p50/p90 (sum of the 3 callback timings) + +### Substrate tax definition + +``` +tax_TTFT_p90(X, ref) = TTFT_p90(X, ref) / TTFT_p90(plain, ref) - 1 +tax_TPOT_p90(X, ref) = TPOT_p90(X, ref) / TPOT_p90(plain, ref) - 1 +tax_step_p50(X) = step_duration_us p50 (X) - step_duration_us p50 (plain) +tax_callback_p50(X) = connector_total_us p50 (X) # plain has no callbacks +``` + +`tax_step` is the **gross** per-step penalty (any cause). +`tax_callback` is the **callback-attributable** penalty (sum of the +three measured connector hooks). The difference `tax_step − +tax_callback` is "step-time overhead not attributable to instrumented +callbacks" — block-pool walks, scheduler-state churn, etc. Reporting +both lets reviewers see whether our instrumentation accounts for the +full cost. 
+ +--- + +## Auditability & Reproducibility Plan + +### Run artifacts (per config × phase × cell) + +``` +microbench/connector_tax/results/ + _/ + config.json # parameters used + launch.sh # exact vLLM launch command + vllm_stdout.log # full vLLM stdout + vllm_stderr.log # full vLLM stderr + requests__.jsonl + metrics__.jsonl + engine_step__.jsonl # if A3 active + summary.json # per-cell percentiles + env.txt # pip freeze, vLLM SHA, GPU info + preflight/ + verify_kv_consumer.log + verify_multi_connector.log + verify_noop_connector.log + preflight_status.json # which configs are SKIP'd and why +``` + +### Manifest + +`microbench/connector_tax/MANIFEST.md` lists every run with date, +vLLM version + git SHA, Mooncake version, NIXL version, LMCache +version, GPU id (`nvidia-smi -L`), config name, launch command, result +directory, A3-active flag, and skip-status (with reason). + +### Re-run script + +`microbench/connector_tax/run_all.sh` runs in **five barrier stages (Stage 0–4)**. +Phase A across all configs must finish before Phase B can pick a +reference rate. + +**Stage 0 — Pre-flight + patch:** +1. Run `verify_kv_consumer.sh`, `verify_multi_connector.sh`, and + `verify_noop_connector.sh`. Persist `preflight_status.json`. +2. Apply `microbench/connector_tax/patches/scheduler_step_timing.py` + to the active vLLM. Record `step_timing_available=true|false` + in the manifest based on whether the patch applied cleanly. + +**Stage 1 — Phase A (all configs, randomized order):** +For each non-SKIP config: +1. `launch_{config}.sh` → wait for `/v1/models`. +2. `bench_loop.py --rates 0.5,1,2,4,8,16,32 --shape 4096,256 + --duration 60 --min-completed 200`. +3. Kill vLLM, wait 60 s for GPU release. +4. Append manifest row. + +After **all** configs have finished Stage 1: + +**Stage 2 — Reference rate selection (CPU only):** +1. Compute saturation flags from each cell using the data-driven + criteria. +2. Choose `ref_safe` = max rate where ALL configs that completed + Phase A are not saturated. +3. 
Choose `ref_load` = max rate where `plain` is not saturated. +4. Persist `reference_rates.json`. + +**Stage 3 — Phase B (all configs, randomized order):** +For each non-SKIP config: +1. `launch_{config}.sh` → wait for ready. +2. `bench_loop.py --rate {ref_safe} --shapes 512x64,512x256, + ...,32768x1024 --duration 60 --min-completed 200`. +3. (If `ref_load != ref_safe`) Run Phase B' for priority configs + (plain, mooncake_both, nixl_both, lmcache_only) on shapes + {512x256, 4096x256, 32768x256} at `ref_load`. +4. Kill vLLM, wait 60 s, append manifest row. + +**Stage 4 — Patch revert + analysis:** +1. Revert the scheduler_step_timing patch. +2. `analyze.py --root results/`. +3. `plot_connector_tax.py`. + +A reviewer with a fresh checkout runs: + +``` +cd microbench/connector_tax +bash run_all.sh +``` + +and gets the figures + manifest + raw artifacts. The script is +re-runnable: any stage can be skipped via `--skip-stage N` if the +artifacts exist. + +### Determinism notes + +Same as previous: temperature=0 + ignore_eos give shape determinism; +content varies per request via seeded UUID. We do not promise +bit-exact reproducibility, only distribution-level reproducibility. + +### Updated runtime estimate (was 1.5–2 h, **now 4–5.5 h**) + +| Phase | Time | +|---|---| +| Pre-flight (3 verify scripts) | 15 min | +| Phase A: 8 configs × (90 s warmup + 1010 s cells + 60 s GPU clear) | 155 min | +| Phase A → ref_safe selection (CPU) | <1 min | +| Phase B (best, `ref_safe ≥ 4`): 8 × (90 + 630 + 60) | 104 min | +| Phase B (worst, `ref_safe = 2`): 8 × (90 + 990 + 60) | 152 min | +| Optional Phase B' (4 configs × 3 shapes × ≥70 s + 4 × 90 s warmup) | 20 min | +| Analysis + figures | 5 min | +| **Total (best case)** | **~5 h** | +| **Total (worst case)** | **~5.5 h** | + +This is honest. The reviewer's earlier estimate of 2.5–3 h +underestimated how long low-rate cells must run to give stable p90. 
+ +--- + +## Analysis & Figures + +### Figure 1: TTFT p90 vs send rate, per configuration (Phase A) + +Same as before, now 8 lines plus saturation markers (× per criterion). + +### Figure 2: TPOT p90 vs send rate, per configuration (Phase A) + +Same. + +### Figure 3: Achieved throughput vs requested (Phase A) + +Same, 8 lines + y=x reference + saturation knee annotations. + +### Figure 4: Substrate tax bar (TTFT p90 + TPOT p90) + +At `ref_safe` and `ref_load`, side-by-side bars per non-plain config. +Shows: +- Pure tax (`ref_safe`) +- Tax + non-linear queueing (`ref_load`) +- The gap is the **coupling amplification** the reviewer flagged. + +### Figure 5: Shape-dependent tax heatmap (Phase B) + +3×3 heatmap (input × output) of tax_TTFT_p90 for each non-plain +config. 6 heatmaps in a row, including noop_connector, +mooncake_both, nixl_both, lmcache_only, mooncake_producer, +multi_mooncake_lmcache. (Skip mooncake_consumer if pre-flight +dropped it.) + +### Figure 6: Per-step latency CDF, ref_safe rate (Phase A) + +X = step duration (μs), Y = CDF, line per config. **The most direct +visualization of "what each step costs."** Shipped only if A3 step log +is available. + +### Figure 7: Tax decomposition stack + +For each non-plain config at ref_safe, stacked bar: +- "framework cost" estimated = tax(noop_connector) +- "implementation cost" estimated = tax(this config) − tax(noop_connector) + +If `noop_connector` doesn't run (we'd document why), we drop this +figure and report tax as a single number per config. + +### Figure 8: H4 additivity check + +3-bar group: tax(mooncake_both), tax(lmcache_only), tax(multi). The +sum-of-first-two compared against multi visualizes additivity. 
+ +--- + +## Risks & Mitigations (revised) + +| Risk | Impact | Mitigation | +|---|---|---| +| `kv_consumer` won't start with dummy bootstrap | Skip the config | Pre-flight; documented SKIP in manifest | +| `multi_mooncake_lmcache` crashes engine | Skip the config | Pre-flight | +| NIXL not installed | Skip nixl_both | Tolerant; warn + continue | +| LMCache not installed | Skip lmcache_only AND multi config | Tolerant; warn + continue | +| GPU thermal drift across 3+ h | Skews late configs | Run order randomized; consider running twice on different days and reporting both | +| Open-loop blow-up at 32 req/s | Memory blowup | Inflight cap 256, drop with logged counter | +| Cold-start of first request | Inflates mean TTFT | 10 s warmup discarded | +| `scheduler_step_timing` patch fails to apply on a future vLLM version | Lose Figures 6 and 7 | Document `step_timing_available=false` in manifest; H1/H2/H4 still report from client-side TTFT/TPOT | +| `noop_connector` import fails (PYTHONPATH or class signature) | Lose Figures 7 + H3 falsifier | Pre-flight `verify_noop_connector.sh` catches this; report SKIP in manifest | + +--- + +## Success Criteria (revised) + +1. **H1 falsifiable**: tax_TTFT_p90 for `mooncake_both` at `ref_safe` + is reported. We accept the prior (≈45%) if measurement is in + [25%, 60%]; we **revise the prior** if outside. +2. **H2 testable**: NIXL-vs-Mooncake gap at `ref_safe` is reported. + The trace-replay difference was ~7 pp. We document agreement / + disagreement. +3. **H3 disambiguated**: tax(noop_connector) at `ref_safe` is + reported. We label substrate tax as + "framework-cost-dominated" if noop_connector ≥ 50% of + mooncake_both tax, "implementation-cost-dominated" if < 30%. +4. **H4 additivity**: |tax(multi) − (tax(mooncake_both) + + tax(lmcache_only))| / tax(multi) ≤ 0.30 → linear. +5. **H5 + H6 directional**: report whether tax_TTFT_p90 grows with + input and tax_TPOT_p90 grows with output (sign + magnitude). +6. 
**All artifacts present**: every config that ran has the 6 file + types; every SKIP config has a reason in `preflight_status.json`. +7. **Bench finishes < 6 h** wall clock on idle dash0 + (Phase A + Phase B + optional Phase B' combined; reflects min-completed + extension at low rates). + +--- + +## Out of Scope + +- Multi-node Mooncake (RDMA over actual network). +- Patching Mooncake or vLLM to optimize the substrate (the point of + this microbench is to measure baseline cost as shipped). +- Varying `chunk_size`, `max_num_seqs`, or other vLLM scheduler + parameters; fixed at trace-replay defaults. +- chunk-boundary effects (input ∈ {8192, 16384}). The reviewer noted + this is a real follow-up but adding it doubles Phase B runtime. + Documented as a follow-up if Phase B shows shape-dependent tax that + can't be explained by total token count. + +--- + +## Cross-references + +- `analysis/characterization/elastic_migration_v2/README.md` — the + trace-replay paper this microbench validates / refutes. +- `microbench/interference/` — Microbench 1 (B2 same-worker + interference; complementary). +- `microbench/lifecycle/` — Microbench 2 (PD-sep transfer breakdown; + uses different vLLM patches). +- `microbench/patches/` — `_pd_profile.py` template if A3 fallback + is needed. 
+ +--- + +## Files + +``` +microbench/connector_tax/ +├── DESIGN.md # this file +├── MANIFEST.md # filled per run +├── tools/ +│ ├── noop_connector.py # custom NoOpConnector for H3 +│ ├── dummy_bootstrap.py # for kv_consumer pre-flight +│ ├── verify_kv_consumer.sh +│ ├── verify_multi_connector.sh +│ └── verify_noop_connector.sh +├── patches/ +│ ├── _step_profile.py # event emitter (ports _pd_profile) +│ ├── scheduler_step_timing.py # idempotent install/revert +│ └── apply.sh # invoked by run_all.sh +├── launch/ +│ ├── launch_plain.sh +│ ├── launch_noop_connector.sh +│ ├── launch_mooncake_producer.sh +│ ├── launch_mooncake_consumer.sh +│ ├── launch_mooncake_both.sh +│ ├── launch_nixl_both.sh +│ ├── launch_lmcache_only.sh +│ └── launch_multi_mooncake_lmcache.sh +├── bench_loop.py # open-loop loadgen (--min-completed) +├── metrics_sampler.py # /metrics scraper +├── analyze.py # raw → percentiles + saturation flags +├── plot_connector_tax.py # all figures +├── run_all.sh # 5-stage barrier orchestrator (Stage 0–4) +├── results/_/ # per-run artifacts +└── results/preflight/ # pre-flight verification +``` diff --git a/microbench/connector_tax/__init__.py b/microbench/connector_tax/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/microbench/connector_tax/analyze.py b/microbench/connector_tax/analyze.py new file mode 100644 index 0000000..8c62e6e --- /dev/null +++ b/microbench/connector_tax/analyze.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python3 +"""Aggregate connector_tax results. + +Reads results/{config}/summary_A.json and summary_B.json for every config, +applies saturation criteria, picks ref_safe / ref_load, and writes +aggregate.json + aggregate.csv. 
+ +Usage: + analyze.py --root microbench/connector_tax/results +""" + +import argparse +import csv +import json +from pathlib import Path + + +SAT_THROUGHPUT_RATIO = 0.95 +SAT_QUEUE_P50 = 1.0 +SAT_TTFT_INFLATION = 1.5 # vs previous (lower) rate + + +def saturated(cell: dict, prev_ttft_p90: float | None) -> tuple[bool, list[str]]: + reasons = [] + tr = cell.get("throughput_ratio") + if tr is not None and tr < SAT_THROUGHPUT_RATIO: + reasons.append(f"throughput_ratio={tr:.2f}<{SAT_THROUGHPUT_RATIO}") + # queue p50 from inflight (proxy) + inf50 = cell.get("inflight_p50") or 0 + # Note: inflight_p50 measured at send time. >= 2 means queue forming. + if inf50 >= 2: + # Throughput tracking is the primary signal; this is corroboration. + pass + ttft = cell.get("ttft_ms_p90") + if ( + ttft is not None + and prev_ttft_p90 is not None + and prev_ttft_p90 > 0 + and ttft / prev_ttft_p90 > SAT_TTFT_INFLATION + ): + reasons.append(f"ttft_p90 inflated {ttft / prev_ttft_p90:.2f}x") + return (len(reasons) > 0, reasons) + + +def analyze(root: Path) -> dict: + configs: dict[str, dict] = {} + for cfg_dir in sorted(root.iterdir()): + if not cfg_dir.is_dir(): + continue + if cfg_dir.name == "preflight": + continue + cfg = cfg_dir.name + sa = cfg_dir / "summary_A.json" + sb = cfg_dir / "summary_B.json" + cfg_data = {"phase_a": [], "phase_b": []} + if sa.exists(): + cfg_data["phase_a"] = json.loads(sa.read_text()) + if sb.exists(): + cfg_data["phase_b"] = json.loads(sb.read_text()) + configs[cfg] = cfg_data + + # ── flag saturation per cell, per config (Phase A only) ──────── + for cfg, data in configs.items(): + cells = sorted(data["phase_a"], key=lambda c: c["rate_target"]) + prev = None + for c in cells: + sat, reasons = saturated(c, prev) + c["saturated"] = sat + c["sat_reasons"] = reasons + prev = c.get("ttft_ms_p90") + + # ── pick reference rates ─────────────────────────────────────── + # ref_safe = max rate where ALL configs are NOT saturated + rates = sorted({c["rate_target"] + 
for d in configs.values() + for c in d["phase_a"]}) + ref_safe = None + for r in rates: + all_ok = True + for cfg, d in configs.items(): + cells = [c for c in d["phase_a"] if c["rate_target"] == r] + if not cells: + continue + if cells[0]["saturated"]: + all_ok = False + break + if all_ok: + ref_safe = r + + # ref_load = max rate where 'plain' is not saturated + ref_load = None + plain = configs.get("plain", {}) + for c in sorted(plain.get("phase_a", []), key=lambda c: c["rate_target"]): + if not c["saturated"]: + ref_load = c["rate_target"] + + out = { + "configs": configs, + "rates_swept": rates, + "ref_safe": ref_safe, + "ref_load": ref_load, + } + return out + + +def write_csv(agg: dict, out_path: Path) -> None: + rows = [] + for cfg, d in agg["configs"].items(): + for c in d["phase_a"]: + rows.append({ + "config": cfg, + "phase": "A", + "rate": c["rate_target"], + "input_tokens": c["input_tokens"], + "output_tokens": c["output_tokens"], + "ttft_p50": c.get("ttft_ms_p50"), + "ttft_p90": c.get("ttft_ms_p90"), + "ttft_p99": c.get("ttft_ms_p99"), + "tpot_p50": c.get("tpot_ms_p50"), + "tpot_p90": c.get("tpot_ms_p90"), + "tpot_p99": c.get("tpot_ms_p99"), + "e2e_p90": c.get("e2e_ms_p90"), + "throughput_eff": c.get("throughput_effective_rps"), + "throughput_ratio": c.get("throughput_ratio"), + "n_after_warmup": c.get("n_after_warmup"), + "saturated": c.get("saturated"), + "sat_reasons": ";".join(c.get("sat_reasons", [])), + }) + for c in d["phase_b"]: + rows.append({ + "config": cfg, + "phase": "B", + "rate": c["rate_target"], + "input_tokens": c["input_tokens"], + "output_tokens": c["output_tokens"], + "ttft_p50": c.get("ttft_ms_p50"), + "ttft_p90": c.get("ttft_ms_p90"), + "ttft_p99": c.get("ttft_ms_p99"), + "tpot_p50": c.get("tpot_ms_p50"), + "tpot_p90": c.get("tpot_ms_p90"), + "tpot_p99": c.get("tpot_ms_p99"), + "e2e_p90": c.get("e2e_ms_p90"), + "throughput_eff": c.get("throughput_effective_rps"), + "throughput_ratio": c.get("throughput_ratio"), + "n_after_warmup": 
c.get("n_after_warmup"), + "saturated": "", + "sat_reasons": "", + }) + + if not rows: + return + fields = list(rows[0].keys()) + with open(out_path, "w", newline="") as f: + w = csv.DictWriter(f, fieldnames=fields) + w.writeheader() + w.writerows(rows) + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--root", type=Path, required=True) + ap.add_argument("--out", type=Path, default=None) + args = ap.parse_args() + + if not args.root.exists(): + raise SystemExit(f"root not found: {args.root}") + + agg = analyze(args.root) + out = args.out or args.root / "aggregate.json" + out.write_text(json.dumps(agg, indent=2)) + write_csv(agg, args.root / "aggregate.csv") + print(f"ref_safe = {agg['ref_safe']} ref_load = {agg['ref_load']}") + print(f"Wrote {out} and aggregate.csv") + + +if __name__ == "__main__": + main() diff --git a/microbench/connector_tax/bench_loop.py b/microbench/connector_tax/bench_loop.py new file mode 100644 index 0000000..2cab491 --- /dev/null +++ b/microbench/connector_tax/bench_loop.py @@ -0,0 +1,351 @@ +#!/usr/bin/env python3 +"""Open-loop fixed-rate loadgen for connector_tax microbench. + +Sends requests at a Poisson-arrival rate (rate_target req/s) of fixed +(input_tokens, output_tokens) shape with random content. Each cell runs +until BOTH the duration floor AND min-completed thresholds are met. + +Per-request metrics are appended to a JSONL file. 
+ +Usage: + bench_loop.py \ + --url http://127.0.0.1:8000/v1/chat/completions \ + --model /path/to/model \ + --rates 0.5,1,2,4,8 \ + --shape 4096,256 \ + --duration 60 \ + --min-completed 200 \ + --warmup 10 \ + --output-dir results/ \ + --phase A +""" + +import argparse +import asyncio +import hashlib +import json +import random +import statistics +import time +import uuid +from dataclasses import asdict, dataclass, field +from pathlib import Path + +import httpx + + +@dataclass +class ReqMetric: + req_id: str + rate_target: float + input_tokens_target: int + output_tokens_target: int + t_send_ns: int + t_first_token_ns: int | None = None + t_last_token_ns: int | None = None + prompt_tokens: int = 0 + completion_tokens: int = 0 + inflight_at_send: int = 0 + error: str | None = None + + +# ─── content generation ──────────────────────────────────────────────────── +def make_random_prompt(target_tokens: int) -> str: + """Generate a prompt that tokenizes to roughly target_tokens. + + Calibration (Qwen3-Coder tokenizer): "Block N: <32-hex>" ≈ 35 tokens. 
+ """ + n_parts = max(1, target_tokens // 35) + seed = uuid.uuid4().hex + parts = [] + for i in range(n_parts): + h = hashlib.md5(f"{seed}_{i}_{time.time_ns()}".encode()).hexdigest() + parts.append(f"Block {i}: {h}") + return " ".join(parts) + + +# ─── single request worker ───────────────────────────────────────────────── +async def send_one(client: httpx.AsyncClient, url: str, model: str, + inp_tokens: int, out_tokens: int, + rate: float, inflight: list[int], + inflight_cap: int) -> ReqMetric: + rid = uuid.uuid4().hex[:16] + + if inflight[0] >= inflight_cap: + # Drop with logged metric + return ReqMetric( + req_id=rid, rate_target=rate, + input_tokens_target=inp_tokens, output_tokens_target=out_tokens, + t_send_ns=time.perf_counter_ns(), + inflight_at_send=inflight[0], + error="dropped_due_to_inflight_cap", + ) + + inflight[0] += 1 + m = ReqMetric( + req_id=rid, rate_target=rate, + input_tokens_target=inp_tokens, output_tokens_target=out_tokens, + t_send_ns=time.perf_counter_ns(), + inflight_at_send=inflight[0], + ) + + try: + prompt = make_random_prompt(inp_tokens) + payload = { + "model": model, + "messages": [{"role": "user", "content": prompt}], + "max_tokens": out_tokens, + "min_tokens": out_tokens, + "temperature": 0, + "ignore_eos": True, + "stream": True, + "stream_options": {"include_usage": True}, + } + async with client.stream("POST", url, json=payload, timeout=600.0) as resp: + resp.raise_for_status() + async for line in resp.aiter_lines(): + if not line.startswith("data: "): + continue + data = line[6:] + if data.strip() == "[DONE]": + break + try: + chunk = json.loads(data) + except json.JSONDecodeError: + continue + # Capture usage from any chunk (it may arrive in a trailing + # chunk with empty `choices`). 
+ usage = chunk.get("usage") + if usage: + m.prompt_tokens = usage.get("prompt_tokens", m.prompt_tokens) + m.completion_tokens = usage.get( + "completion_tokens", m.completion_tokens) + choices = chunk.get("choices") or [] + if not choices: + continue + delta = choices[0].get("delta", {}) + if "role" in delta: + continue + now = time.perf_counter_ns() + if m.t_first_token_ns is None: + m.t_first_token_ns = now + m.t_last_token_ns = now + except Exception as e: + m.error = f"{type(e).__name__}: {e}" + finally: + inflight[0] -= 1 + + return m + + +# ─── per-cell driver (one rate × shape) ──────────────────────────────────── +async def run_cell( + client: httpx.AsyncClient, + url: str, + model: str, + rate: float, + inp_tokens: int, + out_tokens: int, + duration_floor_s: float, + min_completed: int, + warmup_s: float, + inflight_cap: int, + out_path: Path, +) -> dict: + """Run one (rate, shape) cell. Streams per-request JSONL to out_path. + Returns aggregated summary.""" + + inflight = [0] + metrics: list[ReqMetric] = [] + pending_tasks: list[asyncio.Task] = [] + t_start_ns = time.perf_counter_ns() + cell_start = time.perf_counter() + + print(f" [cell] rate={rate} shape=({inp_tokens},{out_tokens}) " + f"floor={duration_floor_s}s min_completed={min_completed}") + + interval_mean = 1.0 / rate + rng = random.Random(int(time.time_ns()) & 0xFFFFFFFF) + + fh = open(out_path, "a", buffering=1) + completed_count = 0 + + def reap_one(t: asyncio.Task) -> None: + nonlocal completed_count + try: + m = t.result() + except Exception: + return + metrics.append(m) + fh.write(json.dumps(asdict(m)) + "\n") + completed_count += 1 + + async def submit(): + while True: + elapsed = time.perf_counter() - cell_start + if elapsed >= duration_floor_s and completed_count >= min_completed: + return + task = asyncio.create_task( + send_one(client, url, model, inp_tokens, out_tokens, + rate, inflight, inflight_cap) + ) + pending_tasks.append(task) + await asyncio.sleep(rng.expovariate(1.0 / 
interval_mean)) + + submitter = asyncio.create_task(submit()) + + async def drain_periodic(): + while not submitter.done(): + keep = [] + for t in pending_tasks: + if t.done(): + reap_one(t) + else: + keep.append(t) + pending_tasks[:] = keep + await asyncio.sleep(0.1) + + drainer = asyncio.create_task(drain_periodic()) + await submitter + drainer.cancel() + try: + await drainer + except asyncio.CancelledError: + pass + + # Final drain: wait for all remaining inflight to complete and write them + if pending_tasks: + await asyncio.gather(*pending_tasks, return_exceptions=True) + for t in pending_tasks: + if t.done(): + reap_one(t) + pending_tasks.clear() + fh.close() + + # Discard warmup window (first warmup_s seconds of completions) + warmup_cutoff_ns = t_start_ns + int(warmup_s * 1e9) + after = [m for m in metrics if m.t_send_ns > warmup_cutoff_ns and m.error is None + and m.t_first_token_ns and m.t_last_token_ns] + + def pct(xs, p): + if not xs: + return None + xs = sorted(xs) + k = max(0, min(len(xs) - 1, int(p / 100.0 * (len(xs) - 1)))) + return xs[k] + + ttft = [(m.t_first_token_ns - m.t_send_ns) / 1e6 for m in after] + tpot = [] + for m in after: + if m.completion_tokens > 1 and m.t_last_token_ns and m.t_first_token_ns: + tpot.append((m.t_last_token_ns - m.t_first_token_ns) / 1e6 + / max(1, m.completion_tokens - 1)) + e2e = [(m.t_last_token_ns - m.t_send_ns) / 1e6 for m in after] + inflight_seq = [m.inflight_at_send for m in after] + elapsed_s = (time.perf_counter_ns() - t_start_ns) / 1e9 + + summary = { + "rate_target": rate, + "input_tokens": inp_tokens, + "output_tokens": out_tokens, + "duration_actual_s": elapsed_s, + "n_completed_total": len(metrics), + "n_after_warmup": len(after), + "n_dropped": sum(1 for m in metrics if m.error == "dropped_due_to_inflight_cap"), + "n_errors": sum(1 for m in metrics if m.error and m.error != "dropped_due_to_inflight_cap"), + "ttft_ms_p50": pct(ttft, 50), + "ttft_ms_p90": pct(ttft, 90), + "ttft_ms_p99": pct(ttft, 99), + 
"tpot_ms_p50": pct(tpot, 50), + "tpot_ms_p90": pct(tpot, 90), + "tpot_ms_p99": pct(tpot, 99), + "e2e_ms_p50": pct(e2e, 50), + "e2e_ms_p90": pct(e2e, 90), + "e2e_ms_p99": pct(e2e, 99), + "throughput_effective_rps": len(after) / max(1.0, elapsed_s - warmup_s), + "throughput_ratio": (len(after) / max(1.0, elapsed_s - warmup_s)) / rate, + "inflight_p50": pct(inflight_seq, 50), + "inflight_p90": pct(inflight_seq, 90), + } + print(f" completed={len(after)} ttft_p90={summary['ttft_ms_p90']} " + f"tpot_p90={summary['tpot_ms_p90']} thr_ratio={summary['throughput_ratio']:.2f}") + return summary + + +async def main_async(args): + out_dir = Path(args.output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + + rates = [float(x) for x in args.rates.split(",")] if args.rates else [args.rate] + shapes = [] + if args.shape: + ip, op = args.shape.split(",") + shapes = [(int(ip), int(op))] + elif args.shapes: + for s in args.shapes.split(","): + ip, op = s.split("x") + shapes.append((int(ip), int(op))) + + summaries = [] + timeout = httpx.Timeout(600.0) + async with httpx.AsyncClient(timeout=timeout) as client: + for rate in rates: + for inp, out in shapes: + cell_label = f"{args.phase}_r{rate}_{inp}x{out}" + req_path = out_dir / f"requests_{cell_label}.jsonl" + + # min_completed-driven duration floor + min_floor_for_rate = max(1, int(args.min_completed / rate)) if rate > 0 else args.duration + floor = max(args.duration, min_floor_for_rate) + + summary = await run_cell( + client, args.url, args.model, rate, inp, out, + duration_floor_s=floor, + min_completed=args.min_completed, + warmup_s=args.warmup, + inflight_cap=args.inflight_cap, + out_path=req_path, + ) + summary["phase"] = args.phase + summary["cell"] = cell_label + summaries.append(summary) + + # Cooldown between cells (let queue drain) + await asyncio.sleep(3.0) + + # Persist summary + with open(out_dir / f"summary_{args.phase}.json", "w") as f: + json.dump(summaries, f, indent=2) + print(f"\nWrote {len(summaries)} cell 
summaries.") + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--url", required=True, + help="vLLM /v1/chat/completions URL") + ap.add_argument("--model", required=True) + ap.add_argument("--phase", default="A") + # Rate spec — either --rates a,b,c (Phase A) or --rate r (Phase B) + ap.add_argument("--rates", default="") + ap.add_argument("--rate", type=float, default=4.0) + # Shape spec — either --shape ip,op (Phase A) or --shapes IPxOP,IPxOP,... (Phase B) + ap.add_argument("--shape", default="") + ap.add_argument("--shapes", default="") + ap.add_argument("--duration", type=float, default=60.0, + help="Cell duration floor (seconds)") + ap.add_argument("--min-completed", type=int, default=200) + ap.add_argument("--warmup", type=float, default=10.0) + ap.add_argument("--inflight-cap", type=int, default=256) + ap.add_argument("--output-dir", required=True) + args = ap.parse_args() + + if not (args.rates or args.rate): + ap.error("Provide --rates or --rate") + if not (args.shape or args.shapes): + ap.error("Provide --shape or --shapes") + + asyncio.run(main_async(args)) + + +if __name__ == "__main__": + main() diff --git a/microbench/connector_tax/launch/common.sh b/microbench/connector_tax/launch/common.sh new file mode 100755 index 0000000..ba422c6 --- /dev/null +++ b/microbench/connector_tax/launch/common.sh @@ -0,0 +1,64 @@ +#!/bin/bash +# Common environment for all connector_tax launch scripts. +# +# Usage: source common.sh +# Sets: MODEL_PATH, PYTHON, PORT, LOG_DIR, COMMON_VLLM_ARGS + +set -euo pipefail + +# Resolve project root (microbench/connector_tax/launch/common.sh → ../../..) +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +CT_DIR="$(cd "$SCRIPT_DIR/.." && pwd)" # microbench/connector_tax +MB_DIR="$(cd "$CT_DIR/.." && pwd)" # microbench +PROJ_DIR="$(cd "$MB_DIR/.." 
&& pwd)" # repo root + +VENV="$PROJ_DIR/.venv/bin" +PYTHON="${VENV}/python" +[[ -x "$PYTHON" ]] || PYTHON="$(command -v python3)" + +MODEL_PATH="${MODEL_PATH:-$HOME/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}" +PORT="${PORT:-8000}" +GPU_ID="${GPU_ID:-0}" + +# Per-run log dir (caller may override RUN_DIR) +RUN_DIR="${RUN_DIR:-$CT_DIR/results/_default}" +mkdir -p "$RUN_DIR" +LOG_DIR="$RUN_DIR" + +# PYTHONPATH so `microbench.connector_tax.tools.noop_connector:NoOpConnector` +# resolves when launched from anywhere. +export PYTHONPATH="${PROJ_DIR}:${PYTHONPATH:-}" + +# Step-timing log path (vLLM's existing scheduler emits when this is set; +# our patch enriches the record with timing fields) +export AGENTIC_STEP_LOG_PATH="${AGENTIC_STEP_LOG_PATH:-$LOG_DIR/engine_step.jsonl}" + +# Common vLLM flags shared by all configs +COMMON_VLLM_ARGS=( + --model "$MODEL_PATH" + --host 0.0.0.0 + --port "$PORT" + --tensor-parallel-size 1 + --trust-remote-code + --enable-prefix-caching + --dtype auto + --gpu-memory-utilization 0.9 + --max-model-len 200000 + --no-enable-log-requests +) + +# Wait until /v1/models returns 200, then return. +# Times out at $1 seconds (default 240). +wait_for_ready() { + local timeout="${1:-240}" + local i + for i in $(seq 1 "$timeout"); do + if curl -sf "http://127.0.0.1:$PORT/v1/models" >/dev/null 2>&1; then + echo "Server ready after ${i}s" + return 0 + fi + sleep 1 + done + echo "ERROR: Server did not become ready within ${timeout}s" >&2 + return 1 +} diff --git a/microbench/connector_tax/launch/launch_lmcache_only.sh b/microbench/connector_tax/launch/launch_lmcache_only.sh new file mode 100755 index 0000000..613433b --- /dev/null +++ b/microbench/connector_tax/launch/launch_lmcache_only.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# vLLM + LMCacheConnectorV1 alone. Skip if LMCache not installed. +set -euo pipefail +source "$(dirname "${BASH_SOURCE[0]}")/common.sh" + +if ! 
"$PYTHON" -c 'import lmcache' 2>/dev/null; then + echo "SKIP: lmcache module not importable" >&2 + exit 42 +fi + +KV_CFG='{"kv_connector":"LMCacheConnectorV1","kv_role":"kv_both"}' + +CUDA_VISIBLE_DEVICES="$GPU_ID" \ +"$PYTHON" -m vllm.entrypoints.openai.api_server \ + "${COMMON_VLLM_ARGS[@]}" \ + --kv-transfer-config "$KV_CFG" \ + > "$LOG_DIR/vllm_stdout.log" 2> "$LOG_DIR/vllm_stderr.log" & +VLLM_PID=$! +echo "$VLLM_PID" > "$LOG_DIR/.vllm.pid" +echo "vLLM PID=$VLLM_PID" +wait_for_ready 240 diff --git a/microbench/connector_tax/launch/launch_mooncake_both.sh b/microbench/connector_tax/launch/launch_mooncake_both.sh new file mode 100755 index 0000000..5bf8250 --- /dev/null +++ b/microbench/connector_tax/launch/launch_mooncake_both.sh @@ -0,0 +1,18 @@ +#!/bin/bash +# vLLM + Mooncake kv_both (alone, never transfers — README claim under test). +set -euo pipefail +source "$(dirname "${BASH_SOURCE[0]}")/common.sh" + +BOOTSTRAP_PORT="${BOOTSTRAP_PORT:-8998}" +KV_CFG='{"kv_connector":"MooncakeConnector","kv_role":"kv_both"}' + +VLLM_MOONCAKE_BOOTSTRAP_PORT="$BOOTSTRAP_PORT" \ +CUDA_VISIBLE_DEVICES="$GPU_ID" \ +"$PYTHON" -m vllm.entrypoints.openai.api_server \ + "${COMMON_VLLM_ARGS[@]}" \ + --kv-transfer-config "$KV_CFG" \ + > "$LOG_DIR/vllm_stdout.log" 2> "$LOG_DIR/vllm_stderr.log" & +VLLM_PID=$! +echo "$VLLM_PID" > "$LOG_DIR/.vllm.pid" +echo "vLLM PID=$VLLM_PID" +wait_for_ready 240 diff --git a/microbench/connector_tax/launch/launch_mooncake_consumer.sh b/microbench/connector_tax/launch/launch_mooncake_consumer.sh new file mode 100755 index 0000000..6d4dd95 --- /dev/null +++ b/microbench/connector_tax/launch/launch_mooncake_consumer.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# vLLM + Mooncake kv_consumer. +# Pre-flight gated: if dummy bootstrap doesn't satisfy vLLM startup, this +# config is marked SKIP and run_all.sh skips it. 
+set -euo pipefail +source "$(dirname "${BASH_SOURCE[0]}")/common.sh" + +DUMMY_BOOTSTRAP_PORT="${DUMMY_BOOTSTRAP_PORT:-8997}" + +# Start dummy bootstrap (in background) so the consumer has someone to talk to. +"$PYTHON" "$(dirname "${BASH_SOURCE[0]}")/../tools/dummy_bootstrap.py" \ + --port "$DUMMY_BOOTSTRAP_PORT" \ + > "$LOG_DIR/dummy_bootstrap.log" 2>&1 & +DUMMY_PID=$! +echo "$DUMMY_PID" > "$LOG_DIR/.dummy.pid" +sleep 2 + +KV_CFG="{\"kv_connector\":\"MooncakeConnector\",\"kv_role\":\"kv_consumer\",\"kv_connector_extra_config\":{\"prefill_addr\":\"127.0.0.1:${DUMMY_BOOTSTRAP_PORT}\"}}" + +CUDA_VISIBLE_DEVICES="$GPU_ID" \ +"$PYTHON" -m vllm.entrypoints.openai.api_server \ + "${COMMON_VLLM_ARGS[@]}" \ + --kv-transfer-config "$KV_CFG" \ + > "$LOG_DIR/vllm_stdout.log" 2> "$LOG_DIR/vllm_stderr.log" & +VLLM_PID=$! +echo "$VLLM_PID" > "$LOG_DIR/.vllm.pid" +echo "vLLM PID=$VLLM_PID" +wait_for_ready 300 diff --git a/microbench/connector_tax/launch/launch_mooncake_producer.sh b/microbench/connector_tax/launch/launch_mooncake_producer.sh new file mode 100755 index 0000000..9783161 --- /dev/null +++ b/microbench/connector_tax/launch/launch_mooncake_producer.sh @@ -0,0 +1,18 @@ +#!/bin/bash +# vLLM + Mooncake kv_producer (no D peer, never transfers). +set -euo pipefail +source "$(dirname "${BASH_SOURCE[0]}")/common.sh" + +BOOTSTRAP_PORT="${BOOTSTRAP_PORT:-8998}" +KV_CFG='{"kv_connector":"MooncakeConnector","kv_role":"kv_producer"}' + +VLLM_MOONCAKE_BOOTSTRAP_PORT="$BOOTSTRAP_PORT" \ +CUDA_VISIBLE_DEVICES="$GPU_ID" \ +"$PYTHON" -m vllm.entrypoints.openai.api_server \ + "${COMMON_VLLM_ARGS[@]}" \ + --kv-transfer-config "$KV_CFG" \ + > "$LOG_DIR/vllm_stdout.log" 2> "$LOG_DIR/vllm_stderr.log" & +VLLM_PID=$! 
+echo "$VLLM_PID" > "$LOG_DIR/.vllm.pid" +echo "vLLM PID=$VLLM_PID" +wait_for_ready 240 diff --git a/microbench/connector_tax/launch/launch_multi_mooncake_lmcache.sh b/microbench/connector_tax/launch/launch_multi_mooncake_lmcache.sh new file mode 100755 index 0000000..5b2e278 --- /dev/null +++ b/microbench/connector_tax/launch/launch_multi_mooncake_lmcache.sh @@ -0,0 +1,41 @@ +#!/bin/bash +# vLLM + MultiConnector(MooncakeConnector kv_both, LMCacheConnectorV1). +# Skip if either is missing. +set -euo pipefail +source "$(dirname "${BASH_SOURCE[0]}")/common.sh" + +if ! "$PYTHON" -c 'import lmcache' 2>/dev/null; then + echo "SKIP: lmcache module not importable" >&2 + exit 42 +fi + +BOOTSTRAP_PORT="${BOOTSTRAP_PORT:-8998}" + +# MultiConnector wraps a list of sub-connector configs. +# Reference: vllm/distributed/kv_transfer/kv_connector/factory.py + +# docs/features/disagg_prefill.md +KV_CFG=$(cat < "$LOG_DIR/vllm_stdout.log" 2> "$LOG_DIR/vllm_stderr.log" & +VLLM_PID=$! +echo "$VLLM_PID" > "$LOG_DIR/.vllm.pid" +echo "vLLM PID=$VLLM_PID" +wait_for_ready 300 diff --git a/microbench/connector_tax/launch/launch_nixl_both.sh b/microbench/connector_tax/launch/launch_nixl_both.sh new file mode 100755 index 0000000..54b0646 --- /dev/null +++ b/microbench/connector_tax/launch/launch_nixl_both.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# vLLM + NIXL kv_both. Skip if NIXL not installed. +set -euo pipefail +source "$(dirname "${BASH_SOURCE[0]}")/common.sh" + +if ! "$PYTHON" -c 'import nixl' 2>/dev/null; then + echo "SKIP: nixl module not importable" >&2 + exit 42 # special skip code +fi + +KV_CFG='{"kv_connector":"NixlConnector","kv_role":"kv_both"}' + +CUDA_VISIBLE_DEVICES="$GPU_ID" \ +"$PYTHON" -m vllm.entrypoints.openai.api_server \ + "${COMMON_VLLM_ARGS[@]}" \ + --kv-transfer-config "$KV_CFG" \ + > "$LOG_DIR/vllm_stdout.log" 2> "$LOG_DIR/vllm_stderr.log" & +VLLM_PID=$! 
+echo "$VLLM_PID" > "$LOG_DIR/.vllm.pid" +echo "vLLM PID=$VLLM_PID" +wait_for_ready 240 diff --git a/microbench/connector_tax/launch/launch_noop_connector.sh b/microbench/connector_tax/launch/launch_noop_connector.sh new file mode 100755 index 0000000..fcb4417 --- /dev/null +++ b/microbench/connector_tax/launch/launch_noop_connector.sh @@ -0,0 +1,16 @@ +#!/bin/bash +# vLLM + custom NoOpConnector loaded by dotted path. +set -euo pipefail +source "$(dirname "${BASH_SOURCE[0]}")/common.sh" + +KV_CFG='{"kv_connector":"NoOpConnector","kv_connector_module_path":"microbench.connector_tax.tools.noop_connector","kv_role":"kv_both"}' + +CUDA_VISIBLE_DEVICES="$GPU_ID" \ +"$PYTHON" -m vllm.entrypoints.openai.api_server \ + "${COMMON_VLLM_ARGS[@]}" \ + --kv-transfer-config "$KV_CFG" \ + > "$LOG_DIR/vllm_stdout.log" 2> "$LOG_DIR/vllm_stderr.log" & +VLLM_PID=$! +echo "$VLLM_PID" > "$LOG_DIR/.vllm.pid" +echo "vLLM PID=$VLLM_PID" +wait_for_ready 240 diff --git a/microbench/connector_tax/launch/launch_plain.sh b/microbench/connector_tax/launch/launch_plain.sh new file mode 100755 index 0000000..ce5bcb6 --- /dev/null +++ b/microbench/connector_tax/launch/launch_plain.sh @@ -0,0 +1,13 @@ +#!/bin/bash +# Plain vLLM, no KV transfer connector. The baseline. +set -euo pipefail +source "$(dirname "${BASH_SOURCE[0]}")/common.sh" + +CUDA_VISIBLE_DEVICES="$GPU_ID" \ +"$PYTHON" -m vllm.entrypoints.openai.api_server \ + "${COMMON_VLLM_ARGS[@]}" \ + > "$LOG_DIR/vllm_stdout.log" 2> "$LOG_DIR/vllm_stderr.log" & +VLLM_PID=$! +echo "$VLLM_PID" > "$LOG_DIR/.vllm.pid" +echo "vLLM PID=$VLLM_PID" +wait_for_ready 240 diff --git a/microbench/connector_tax/metrics_sampler.py b/microbench/connector_tax/metrics_sampler.py new file mode 100644 index 0000000..683f056 --- /dev/null +++ b/microbench/connector_tax/metrics_sampler.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +"""1 Hz /metrics scraper for connector_tax microbench. 
+ +Usage: + metrics_sampler.py --url http://127.0.0.1:8000/metrics \ + --output results//metrics.jsonl \ + --interval 1.0 +""" + +import argparse +import json +import time +import urllib.request + + +def parse_prom(text: str) -> dict: + """Parse Prometheus text-format metrics. Returns {name: [(labels, value)]}.""" + out: dict[str, list[tuple[dict[str, str], float]]] = {} + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + # name{labels} value [timestamp] + if "{" in line: + name, rest = line.split("{", 1) + labels_str, val_str = rest.rsplit("}", 1) + labels = {} + for piece in labels_str.split(","): + if "=" in piece: + k, v = piece.split("=", 1) + labels[k.strip()] = v.strip().strip('"') + try: + val = float(val_str.strip().split()[0]) + except (ValueError, IndexError): + continue + else: + parts = line.split() + if len(parts) < 2: + continue + name = parts[0] + try: + val = float(parts[1]) + except ValueError: + continue + labels = {} + out.setdefault(name, []).append((labels, val)) + return out + + +KEEP_PREFIXES = ( + "vllm:num_requests_running", + "vllm:num_requests_waiting", + "vllm:gpu_cache_usage_perc", + "vllm:time_to_first_token_seconds", + "vllm:time_per_output_token_seconds", + "vllm:request_prefill_time_seconds", + "vllm:request_decode_time_seconds", + "vllm:iteration_tokens_total", + "vllm:e2e_request_latency_seconds", +) + + +def collapse(parsed: dict) -> dict: + """Keep only metrics whose names start with one of the prefixes; flatten + histogram counts into '_bucket' / '_count' / '_sum' suffix entries.""" + out = {} + for name, entries in parsed.items(): + if not any(name.startswith(p) for p in KEEP_PREFIXES): + continue + # Most are scalars (ignore label dimensions for compactness) + # For histograms we keep _count/_sum and skip individual buckets + if name.endswith("_bucket"): + continue + # Sum across labels to get a single number + total = sum(v for _lbl, v in entries) + out[name] = total + 
return out + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--url", required=True, + help="http://host:port/metrics") + ap.add_argument("--output", required=True) + ap.add_argument("--interval", type=float, default=1.0) + ap.add_argument("--duration", type=float, default=0.0, + help="Stop after N seconds; 0 = run until killed") + args = ap.parse_args() + + out = open(args.output, "a", buffering=1) + t_start = time.time() + while True: + try: + with urllib.request.urlopen(args.url, timeout=2.0) as r: + text = r.read().decode("utf-8") + parsed = parse_prom(text) + sample = collapse(parsed) + sample["t_unix"] = time.time() + out.write(json.dumps(sample) + "\n") + except Exception as e: + err = {"t_unix": time.time(), "error": str(e)} + out.write(json.dumps(err) + "\n") + + if args.duration > 0 and time.time() - t_start >= args.duration: + break + time.sleep(args.interval) + + +if __name__ == "__main__": + main() diff --git a/microbench/connector_tax/patches/apply_step_timing.py b/microbench/connector_tax/patches/apply_step_timing.py new file mode 100644 index 0000000..f137576 --- /dev/null +++ b/microbench/connector_tax/patches/apply_step_timing.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +"""Apply / revert step-timing additions to vLLM's existing agentic step log. + +vLLM already has a per-step JSONL emitter triggered by AGENTIC_STEP_LOG_PATH. +This patch enriches it with two duration fields: + + step_duration_us : schedule() wall time, measured up to the step-log emit + build_meta_us : duration of self.connector.build_connector_meta(...) + +Not implemented by this script: + emit_overhead_us : duration of _agentic_emit_step_log itself + (would let us subtract the patch's own cost) + a worker-side patch in kv_connector_model_runner_mixin.py recording + start_load_kv() duration to VLLM_PD_PROFILE_LOG=$RUN_DIR/worker_step.jsonl. + +All inserts are marked with `# CONNECTOR_TAX_PATCH` so revert is just +"delete those lines". 
+ +Usage: + python apply_step_timing.py --apply [--vllm-root PATH] + python apply_step_timing.py --revert [--vllm-root PATH] +""" + +import argparse +import re +import sys +from pathlib import Path + +MARKER = "# CONNECTOR_TAX_PATCH" +DEFAULT_VLLM_ROOT = ( + Path.home() + / "agentic-kv/.venv/lib/python3.12/site-packages/vllm" +) + + +def already_patched(text: str) -> bool: + return MARKER in text + + +def revert_text(text: str) -> str: + out = [l for l in text.splitlines() if MARKER not in l] + return "\n".join(out) + ("\n" if text.endswith("\n") else "") + + +# ── scheduler.py patches ──────────────────────────────────────────────────── +def patch_scheduler(text: str) -> str: + if already_patched(text): + print(" scheduler.py already patched, skipping") + return text + + # 1. At top of schedule(): record _step_t0 + pat = ( + r"( def schedule\(self\) -> SchedulerOutput:\n" + r" # NOTE\(woosuk\) on the scheduling algorithm:)" + ) + repl = ( + r" def schedule(self) -> SchedulerOutput:\n" + r" import time as _t " + MARKER + "\n" + r" _step_t0 = _t.perf_counter_ns() " + MARKER + "\n" + r" self._ct_build_meta_ns = 0 " + MARKER + "\n" + r" # NOTE(woosuk) on the scheduling algorithm:" + ) + text, n = re.subn(pat, repl, text, count=1) + if n == 0: + raise RuntimeError("Failed to patch schedule() entry") + + # 2. 
Time the build_connector_meta call + pat = ( + r" if self\.connector is not None:\n" + r" meta: KVConnectorMetadata = self\.connector\.build_connector_meta\(\n" + r" scheduler_output\n" + r" \)\n" + r" scheduler_output\.kv_connector_metadata = meta" + ) + repl = ( + " if self.connector is not None:\n" + f" _bm_t0 = _t.perf_counter_ns() {MARKER}\n" + " meta: KVConnectorMetadata = self.connector.build_connector_meta(\n" + " scheduler_output\n" + " )\n" + f" self._ct_build_meta_ns = _t.perf_counter_ns() - _bm_t0 {MARKER}\n" + " scheduler_output.kv_connector_metadata = meta" + ) + text, n = re.subn(pat, repl, text, count=1) + if n == 0: + raise RuntimeError("Failed to patch build_connector_meta") + + # 3. Pass step duration into _agentic_emit_step_log via attribute + # (cleaner than threading kwargs through). Then in the emit + # function inject the new fields into `record`. + pat = ( + r" if self\._agentic_step_log_fh is not None:\n" + r" self\._agentic_emit_step_log\(" + ) + repl = ( + f" if self._agentic_step_log_fh is not None:\n" + f" self._ct_step_duration_ns = _t.perf_counter_ns() - _step_t0 {MARKER}\n" + f" self._agentic_emit_step_log(" + ) + text, n = re.subn(pat, repl, text, count=1) + if n == 0: + raise RuntimeError("Failed to patch step_duration insertion") + + # 4. 
Inject extra fields into the `record` dict in _agentic_emit_step_log + pat = ( + r" record = \{\n" + r" \"t_unix\": _time\.time\(\)," + ) + repl = ( + " record = {\n" + f" \"step_duration_us\": getattr(self, '_ct_step_duration_ns', 0) // 1000, {MARKER}\n" + f" \"build_meta_us\": getattr(self, '_ct_build_meta_ns', 0) // 1000, {MARKER}\n" + " \"t_unix\": _time.time()," + ) + text, n = re.subn(pat, repl, text, count=1) + if n == 0: + raise RuntimeError("Failed to patch record dict") + + return text + + +# ── apply / revert driver ─────────────────────────────────────────────────── +def apply_to_file(path: Path, fn) -> bool: + if not path.exists(): + print(f" SKIP {path} (not found)") + return False + orig = path.read_text() + new = fn(orig) + if new == orig: + print(f" unchanged: {path}") + return False + path.write_text(new) + print(f" patched ({new.count(MARKER)} marks): {path}") + return True + + +def revert_file(path: Path) -> bool: + if not path.exists(): + return False + orig = path.read_text() + new = revert_text(orig) + if new == orig: + print(f" no marks: {path}") + return False + path.write_text(new) + print(f" reverted: {path}") + return True + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--apply", action="store_true") + ap.add_argument("--revert", action="store_true") + ap.add_argument("--vllm-root", type=Path, default=DEFAULT_VLLM_ROOT) + args = ap.parse_args() + + if not (args.apply ^ args.revert): + ap.error("Specify exactly one of --apply / --revert") + + sched = args.vllm_root / "v1/core/sched/scheduler.py" + + if args.apply: + print(f"Applying connector-tax step-timing patch to {args.vllm_root}") + apply_to_file(sched, patch_scheduler) + else: + print(f"Reverting connector-tax step-timing patch from {args.vllm_root}") + revert_file(sched) + + +if __name__ == "__main__": + main() diff --git a/microbench/connector_tax/plot_connector_tax.py b/microbench/connector_tax/plot_connector_tax.py new file mode 100644 index 0000000..153afb6 
CONFIG_COLORS = {
    "plain": "#000000",
    "noop_connector": "#7f7f7f",
    "mooncake_producer": "#1f77b4",
    "mooncake_consumer": "#17becf",
    "mooncake_both": "#d62728",
    "nixl_both": "#ff7f0e",
    "lmcache_only": "#2ca02c",
    "multi_mooncake_lmcache": "#9467bd",
}


def load(root: Path):
    """Return the parsed contents of ``<root>/aggregate.json``."""
    return json.loads((root / "aggregate.json").read_text())


def _plot_phase_a_metric(ax, configs, key, mark_saturated=False):
    """Draw one line per config of *key* against ``rate_target`` on *ax*.

    With ``mark_saturated`` set, cells flagged ``saturated`` get an extra
    "x" marker on top of their data point.
    """
    for cfg, d in configs.items():
        cells = sorted(d["phase_a"], key=lambda c: c["rate_target"])
        color = CONFIG_COLORS.get(cfg)
        ax.plot([c["rate_target"] for c in cells],
                [c.get(key) for c in cells],
                "o-", label=cfg, color=color, linewidth=2)
        if not mark_saturated:
            continue
        for c in cells:
            # A saturated cell may carry no value for this metric; there is
            # then no point to put the marker on.
            if c.get("saturated") and c.get(key) is not None:
                ax.plot([c["rate_target"]], [c[key]], "x",
                        markersize=12, mew=2, color=color or "red")


def fig1_2_3(agg, out_dir):
    """Save the three Phase A figures (TTFT p90, TPOT p90, throughput).

    Reads ``agg["configs"][cfg]["phase_a"]`` cells and ``agg["rates_swept"]``
    and writes ``fig1_ttft_vs_rate.png``, ``fig2_tpot_vs_rate.png`` and
    ``fig3_throughput_vs_rate.png`` into *out_dir*.
    """
    configs = agg["configs"]
    rates = agg["rates_swept"]

    # ----- fig 1: TTFT p90 -----
    fig, ax = plt.subplots(figsize=(10, 5.5))
    _plot_phase_a_metric(ax, configs, "ttft_ms_p90", mark_saturated=True)
    ax.set_xscale("log", base=2)
    ax.set_yscale("log")
    ax.set_xticks(rates)
    ax.set_xticklabels([str(r) for r in rates])
    ax.set_xlabel("Send rate (req/s)")
    ax.set_ylabel("TTFT p90 (ms, log)")
    ax.set_title("Figure 1 — TTFT p90 vs send rate (Phase A)\n"
                 "× = saturation criterion fired")
    ax.grid(True, which="both", linestyle="--", alpha=0.4)
    ax.legend(fontsize=8, loc="upper left")
    fig.tight_layout()
    fig.savefig(out_dir / "fig1_ttft_vs_rate.png", dpi=160)
    plt.close(fig)

    # ----- fig 2: TPOT p90 -----
    fig, ax = plt.subplots(figsize=(10, 5.5))
    _plot_phase_a_metric(ax, configs, "tpot_ms_p90")
    ax.set_xscale("log", base=2)
    ax.set_xticks(rates)
    ax.set_xticklabels([str(r) for r in rates])
    ax.set_xlabel("Send rate (req/s)")
    ax.set_ylabel("TPOT p90 (ms)")
    ax.set_title("Figure 2 — TPOT p90 vs send rate (Phase A)")
    ax.grid(True, linestyle="--", alpha=0.4)
    ax.legend(fontsize=8, loc="upper left")
    fig.tight_layout()
    fig.savefig(out_dir / "fig2_tpot_vs_rate.png", dpi=160)
    plt.close(fig)

    # ----- fig 3: throughput -----
    fig, ax = plt.subplots(figsize=(10, 5.5))
    max_x = max(rates) if rates else 1
    ax.plot([0, max_x], [0, max_x], "k--", alpha=0.4, label="ideal y=x")
    _plot_phase_a_metric(ax, configs, "throughput_effective_rps")
    ax.set_xlabel("Send rate (req/s)")
    ax.set_ylabel("Effective throughput (req/s)")
    ax.set_title("Figure 3 — Throughput tracking (Phase A)")
    ax.grid(True, linestyle="--", alpha=0.4)
    ax.legend(fontsize=8, loc="upper left")
    fig.tight_layout()
    fig.savefig(out_dir / "fig3_throughput_vs_rate.png", dpi=160)
    plt.close(fig)


def fig4(agg, out_dir):
    """Save the substrate-tax bar chart at ``ref_safe`` / ``ref_load``.

    Tax is ``metric(config) / metric(plain) - 1`` for TTFT p90 and TPOT p90.
    Returns without drawing when there is no ``plain`` config or no
    reference rate.  One panel is drawn per distinct reference rate.
    """
    configs = agg["configs"]
    if "plain" not in configs:
        return
    plain = configs["plain"]

    def cell_at(d, r):
        # Rates are floats; match with a small tolerance.
        for c in d["phase_a"]:
            if abs(c["rate_target"] - r) < 1e-6:
                return c
        return None

    def tax(c_cfg, c_plain, key):
        if c_cfg is None or c_plain is None:
            return None
        a, b = c_cfg.get(key), c_plain.get(key)
        if not a or not b:
            return None
        return a / b - 1

    rates_used = []
    if agg.get("ref_safe") is not None:
        rates_used.append(("ref_safe", agg["ref_safe"]))
    if agg.get("ref_load") is not None and agg["ref_load"] != agg.get("ref_safe"):
        rates_used.append(("ref_load", agg["ref_load"]))

    if not rates_used:
        return

    cfg_names = [c for c in configs if c != "plain"]
    # One panel per reference rate, so a single rate leaves no blank axes.
    # squeeze=False keeps `axes` 2-D whatever the panel count.
    fig, axes = plt.subplots(1, len(rates_used),
                             figsize=(6.5 * len(rates_used), 5),
                             squeeze=False)
    x = np.arange(len(cfg_names))
    w = 0.4
    for ax, (label, r) in zip(axes[0], rates_used):
        plain_cell = cell_at(plain, r)
        ttft_taxes = []
        tpot_taxes = []
        for cfg in cfg_names:
            c = cell_at(configs[cfg], r)
            ttft_taxes.append(tax(c, plain_cell, "ttft_ms_p90"))
            tpot_taxes.append(tax(c, plain_cell, "tpot_ms_p90"))

        series = (
            (-w / 2, ttft_taxes, "TTFT p90 tax %", "#d62728"),
            (w / 2, tpot_taxes, "TPOT p90 tax %", "#1f77b4"),
        )
        for offset, taxes, bar_label, color in series:
            # Missing data is still drawn as a zero-height bar ...
            ax.bar(x + offset, [(v or 0) * 100 for v in taxes], width=w,
                   label=bar_label, color=color, alpha=0.85)
            # ... but labelled, so it cannot be read as "0 % tax".
            for xi, v in zip(x, taxes):
                if v is None:
                    ax.text(xi + offset, 0, "n/a", ha="center",
                            va="bottom", fontsize=7)
        ax.axhline(0, color="black", linewidth=0.5)
        ax.set_xticks(x)
        ax.set_xticklabels(cfg_names, rotation=30, ha="right", fontsize=8)
        ax.set_ylabel("Tax vs plain (%)")
        ax.set_title(f"{label} (rate={r} req/s)")
        ax.grid(True, axis="y", linestyle="--", alpha=0.4)
        ax.legend(fontsize=8)
    fig.suptitle("Figure 4 — Substrate tax (TTFT p90 + TPOT p90) "
                 "at reference rates", fontweight="bold")
    fig.tight_layout()
    fig.savefig(out_dir / "fig4_substrate_tax.png", dpi=160)
    plt.close(fig)


def fig5(agg, out_dir):
    """Save one TTFT-p90 tax heatmap per non-plain config (Phase B).

    Each heatmap has input lengths on x and output lengths on y; a cell is
    ``ttft(config) / ttft(plain) - 1`` in percent, or blank when either side
    is missing.  Returns without drawing when ``plain`` is absent, has no
    Phase B cells, or is the only config.
    """
    configs = agg["configs"]
    if "plain" not in configs:
        return

    # Build (input, output) → ttft_p90 per config
    cfg_names = [c for c in configs if c != "plain"]

    def shape_map(d):
        return {
            (c["input_tokens"], c["output_tokens"]): c.get("ttft_ms_p90")
            for c in d.get("phase_b", [])
        }

    plain_map = shape_map(configs["plain"])
    if not plain_map:
        return

    n = len(cfg_names)
    if n == 0:
        # Nothing to compare against plain; the grid maths below would
        # otherwise divide by cols == 0.
        return

    inputs = sorted({k[0] for k in plain_map})
    outputs = sorted({k[1] for k in plain_map})

    cols = min(3, n)
    rows = (n + cols - 1) // cols
    # squeeze=False always yields a 2-D axes array, whatever rows/cols are.
    fig, axes = plt.subplots(rows, cols, figsize=(5 * cols, 4 * rows),
                             squeeze=False)

    for idx, cfg in enumerate(cfg_names):
        ax = axes[idx // cols][idx % cols]
        cfg_map = shape_map(configs[cfg])
        mat = np.full((len(outputs), len(inputs)), np.nan)
        for i, ip in enumerate(inputs):
            for j, op in enumerate(outputs):
                a = cfg_map.get((ip, op))
                b = plain_map.get((ip, op))
                if a and b:
                    mat[j, i] = a / b - 1
        im = ax.imshow(mat * 100, cmap="YlOrRd", aspect="auto")
        ax.set_xticks(range(len(inputs)))
        ax.set_xticklabels([f"{x//1024}k" if x >= 1024 else str(x) for x in inputs])
        ax.set_yticks(range(len(outputs)))
        ax.set_yticklabels([str(y) for y in outputs])
        ax.set_xlabel("input")
        ax.set_ylabel("output")
        ax.set_title(cfg, fontsize=10)
        for i in range(len(inputs)):
            for j in range(len(outputs)):
                v = mat[j, i]
                if not np.isnan(v):
                    txt = f"{v*100:.0f}%"
                    ax.text(i, j, txt, ha="center", va="center",
                            fontsize=9,
                            color="white" if v * 100 > 30 else "black")
        plt.colorbar(im, ax=ax, fraction=0.04, pad=0.02)

    # Hide leftover axes
    for idx in range(n, rows * cols):
        axes[idx // cols][idx % cols].axis("off")

    fig.suptitle("Figure 5 — TTFT p90 substrate tax (%) by shape (Phase B)",
                 fontweight="bold")
    fig.tight_layout()
    fig.savefig(out_dir / "fig5_shape_tax_heatmap.png", dpi=160)
    plt.close(fig)


def main():
    """CLI entry point: read ``--root/aggregate.json`` and save all figures there."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--root", type=Path, required=True)
    args = ap.parse_args()

    agg = load(args.root)
    out = args.root
    out.mkdir(parents=True, exist_ok=True)

    fig1_2_3(agg, out)
    fig4(agg, out)
    fig5(agg, out)
    print(f"Saved figures into {out}")


if __name__ == "__main__":
    main()
#!/bin/bash
# 5-stage barrier orchestrator for connector_tax microbench.
#
# Stage 0 — pre-flight + (optional) step-timing patch
# Stage 1 — Phase A (rate sweep) for all configs
# Stage 2 — pick reference rates from Phase A
# Stage 3 — Phase B (shape sweep at ref_safe) for all configs
# Stage 4 — revert patch + analyze + plot
#
# Configurable via env:
#   CT_DATE        : run-id directory tag (default $(date +%Y%m%d_%H%M))
#   PORT           : vLLM port (default 8000)
#   GPU_ID         : single GPU index (default 0)
#   MODEL_PATH     : path to model
#   PHASE_A_RATES  : default 0.5,1,2,4,8,16,32
#   PHASE_B_SHAPES : default 512x64,512x256,512x1024,4096x64,4096x256,4096x1024,32768x64,32768x256,32768x1024
#   MIN_COMPLETED  : default 200
#   DURATION       : default 60
#   SKIP_PATCH     : set to 1 to bypass scheduler patch
#   STAGES         : space-separated list of stages to run, e.g. "1 3 4"
#                    defaults to "0 1 2 3 4"
#
# No `set -e` on purpose: a failing config is recorded in MANIFEST.tsv and the
# sweep moves on to the next one.

set -uo pipefail

HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
RESULTS_ROOT="$HERE/results"
RUN_DATE="${CT_DATE:-$(date +%Y%m%d_%H%M)}"
RUN_ROOT="$RESULTS_ROOT/$RUN_DATE"
mkdir -p "$RUN_ROOT"

PORT="${PORT:-8000}"
GPU_ID="${GPU_ID:-0}"
MODEL_PATH="${MODEL_PATH:-$HOME/models/Qwen/Qwen3-Coder-30B-A3B-Instruct}"
PHASE_A_RATES="${PHASE_A_RATES:-0.5,1,2,4,8,16,32}"
PHASE_B_SHAPES="${PHASE_B_SHAPES:-512x64,512x256,512x1024,4096x64,4096x256,4096x1024,32768x64,32768x256,32768x1024}"
MIN_COMPLETED="${MIN_COMPLETED:-200}"
DURATION="${DURATION:-60}"
STAGES="${STAGES:-0 1 2 3 4}"

ALL_CONFIGS=(plain noop_connector mooncake_producer mooncake_consumer mooncake_both nixl_both lmcache_only multi_mooncake_lmcache)

# Shuffle config order for thermal-drift robustness.
shuffle_configs() {
    printf '%s\n' "$@" | shuf
}

# True when stage number $1 appears in the space-separated $STAGES list.
stage_enabled() {
    [[ " $STAGES " == *" $1 "* ]]
}

PROJ_DIR="$(cd "$HERE/../.." && pwd)"
PYTHON="$PROJ_DIR/.venv/bin/python"
[[ -x "$PYTHON" ]] || PYTHON="$(command -v python3)"
export PYTHONPATH="$PROJ_DIR:${PYTHONPATH:-}"

# Append one " | "-separated record to MANIFEST.tsv:
#   manifest <config> <stage> <status> <note>
manifest() {
    local config="$1" stage="$2" status="$3" note="$4"
    echo "$(date -Iseconds) | $config | $stage | $status | $note" \
        >> "$RUN_ROOT/MANIFEST.tsv"
}

# Kill the server recorded in pidfile $1 (and anything matching "port $PORT"),
# then wait up to ~60 s for GPU $GPU_ID to drop below 1000 MiB used.
kill_vllm() {
    local pidfile="$1" pid used
    if [[ -f "$pidfile" ]]; then
        pid=$(cat "$pidfile")
        if [[ -n "$pid" ]]; then
            kill -9 "$pid" 2>/dev/null || true
        fi
    fi
    pkill -f "port $PORT" 2>/dev/null || true
    sleep 5
    # wait for GPU release
    for _ in $(seq 1 30); do
        used=$(nvidia-smi -i "$GPU_ID" --query-gpu=memory.used \
                   --format=csv,noheader,nounits 2>/dev/null | head -n 1)
        used="${used//[[:space:]]/}"
        # Unparseable output (nvidia-smi missing, "[N/A]"): nothing to wait on.
        [[ "$used" =~ ^[0-9]+$ ]] || break
        if (( used < 1000 )); then
            break
        fi
        sleep 2
    done
}

# Tear down everything a launch script may have left behind in cfg_dir $1.
stop_config() {
    local cfg_dir="$1"
    kill_vllm "$cfg_dir/.vllm.pid"
    if [[ -f "$cfg_dir/.dummy.pid" ]]; then
        kill -9 "$(cat "$cfg_dir/.dummy.pid")" 2>/dev/null || true
    fi
}

# Launch one config, run bench_loop.py against it, tear it down and record
# the outcome.  Always returns 0 so the caller's loop keeps going.
#   run_phase <config> <manifest-stage> <phase-letter> <phase-specific bench args...>
run_phase() {
    local config="$1" stage="$2" phase="$3"
    shift 3
    local cfg_dir="$RUN_ROOT/$config"
    mkdir -p "$cfg_dir"

    # Launch
    RUN_DIR="$cfg_dir" PORT="$PORT" GPU_ID="$GPU_ID" MODEL_PATH="$MODEL_PATH" \
        bash "$HERE/launch/launch_${config}.sh"
    local rc=$?
    if [[ $rc != 0 ]]; then
        # The launch scripts are not visible from here; assume a failed launch
        # may leave a half-started server holding the port/GPU and clean up
        # so the next config starts from a free device.
        stop_config "$cfg_dir"
        if [[ $rc == 42 ]]; then
            manifest "$config" "$stage" "SKIP" "dependency missing"
        else
            manifest "$config" "$stage" "FAIL" "launch rc=$rc"
        fi
        return 0
    fi

    # /metrics sampler in background
    "$PYTHON" "$HERE/metrics_sampler.py" \
        --url "http://127.0.0.1:$PORT/metrics" \
        --output "$cfg_dir/metrics_${phase}.jsonl" \
        --interval 1.0 &
    local sampler_pid=$!

    # bench loop
    "$PYTHON" "$HERE/bench_loop.py" \
        --url "http://127.0.0.1:$PORT/v1/chat/completions" \
        --model "$MODEL_PATH" \
        --phase "$phase" \
        "$@" \
        --duration "$DURATION" \
        --min-completed "$MIN_COMPLETED" \
        --warmup 10 \
        --output-dir "$cfg_dir"
    local bench_rc=$?

    kill -9 "$sampler_pid" 2>/dev/null || true
    wait "$sampler_pid" 2>/dev/null || true   # reap the background job
    stop_config "$cfg_dir"

    if [[ $bench_rc != 0 ]]; then
        manifest "$config" "$stage" "FAIL" "bench rc=$bench_rc"
        return 0
    fi
    manifest "$config" "$stage" "OK" ""
}

# Phase A: rate sweep at the fixed (4096, 256) shape.
run_phase_a() {
    local config="$1"
    mkdir -p "$RUN_ROOT/$config"
    echo "=== [$config] Phase A ==="
    run_phase "$config" "phase_a" A \
        --rates "$PHASE_A_RATES" \
        --shape "4096,256"
}

# Phase B: shape sweep at the ref_safe rate read from aggregate.json.
run_phase_b() {
    local config="$1"
    mkdir -p "$RUN_ROOT/$config"

    local ref_safe
    ref_safe=$(jq -r '.ref_safe // empty' "$RUN_ROOT/aggregate.json" 2>/dev/null)
    if [[ -z "$ref_safe" || "$ref_safe" == "null" ]]; then
        echo "ref_safe not available; skipping Phase B for $config"
        manifest "$config" "phase_b" "SKIP" "ref_safe undefined"
        return 0
    fi
    echo "=== [$config] Phase B (rate=$ref_safe) ==="
    run_phase "$config" "phase_b" B \
        --rate "$ref_safe" \
        --shapes "$PHASE_B_SHAPES"
}

# ── Stage 0 — pre-flight ────────────────────────────────────────────────
if stage_enabled 0; then
    echo "=== Stage 0 — pre-flight ==="
    mkdir -p "$RUN_ROOT/preflight"
    # Freeze the environment of the interpreter the benchmark actually runs
    # with, not whatever `pip` happens to be first on PATH.
    "$PYTHON" -m pip freeze > "$RUN_ROOT/preflight/pip_freeze.txt" 2>&1 || true
    nvidia-smi -L > "$RUN_ROOT/preflight/nvidia.txt" 2>&1 || true

    # Apply step-timing patch unless SKIP_PATCH=1
    if [[ "${SKIP_PATCH:-0}" != "1" ]]; then
        if "$PYTHON" "$HERE/patches/apply_step_timing.py" --apply > "$RUN_ROOT/preflight/patch.log" 2>&1; then
            echo "step_timing_available=true" > "$RUN_ROOT/preflight/patch_status.txt"
        else
            echo "step_timing_available=false (apply failed)" > "$RUN_ROOT/preflight/patch_status.txt"
            cat "$RUN_ROOT/preflight/patch.log"
        fi
    else
        echo "step_timing_available=false (SKIP_PATCH=1)" > "$RUN_ROOT/preflight/patch_status.txt"
    fi
fi

# ── Stage 1 — Phase A all configs ──────────────────────────────────────
if stage_enabled 1; then
    echo "=== Stage 1 — Phase A (all configs, randomized) ==="
    for cfg in $(shuffle_configs "${ALL_CONFIGS[@]}"); do
        run_phase_a "$cfg"
    done
fi

# ── Stage 2 — pick reference rates ─────────────────────────────────────
if stage_enabled 2; then
    echo "=== Stage 2 — pick reference rates ==="
    "$PYTHON" "$HERE/analyze.py" --root "$RUN_ROOT"
fi

# ── Stage 3 — Phase B all configs ──────────────────────────────────────
if stage_enabled 3; then
    echo "=== Stage 3 — Phase B (all configs, randomized) ==="
    for cfg in $(shuffle_configs "${ALL_CONFIGS[@]}"); do
        run_phase_b "$cfg"
    done
fi

# ── Stage 4 — analyze + plot + revert ──────────────────────────────────
if stage_enabled 4; then
    echo "=== Stage 4 — re-analyze with Phase B + plot + revert patch ==="
    "$PYTHON" "$HERE/analyze.py" --root "$RUN_ROOT"
    "$PYTHON" "$HERE/plot_connector_tax.py" --root "$RUN_ROOT"
    # Stage 4 may run without stage 0, so make sure the log directory exists.
    mkdir -p "$RUN_ROOT/preflight"
    "$PYTHON" "$HERE/patches/apply_step_timing.py" --revert >> "$RUN_ROOT/preflight/patch.log" 2>&1 || true
    echo "Done. Results in $RUN_ROOT"
fi
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("dummy_bootstrap")


def make_app() -> FastAPI:
    """Build the FastAPI app that stands in for a Mooncake bootstrap server.

    Every route answers with a fixed "accepting" or empty payload.  The only
    state kept is the set of workers seen by ``/register``, which ``/query``
    echoes back.
    """
    app = FastAPI()
    workers = {}

    @app.post("/register")
    async def register_worker(req: Request):
        # This server exists to say "ok" to anything, so a malformed payload
        # must not surface as a 500: fall back to defaults instead.
        try:
            body = await req.json()
        except ValueError:  # body is not valid JSON
            body = None
        log.info("register_worker: %s", body)
        if not isinstance(body, dict):
            body = {}
        try:
            dp_rank = int(body.get("dp_rank", 0))
        except (TypeError, ValueError):  # null or non-numeric dp_rank
            dp_rank = 0
        # Pretend success
        workers[dp_rank] = {
            "engine_id": body.get("engine_id", "dummy-engine"),
            "worker_addr": body.get("worker_addr", {}),
        }
        return JSONResponse({"status": "ok"})

    @app.get("/query")
    async def query():
        # Return whatever we have. Empty {} is acceptable for the consumer
        # because no PD-sep request will actually trigger a pull.
        return JSONResponse(workers)

    @app.post("/query_blocks")
    async def query_blocks(req: Request):
        return JSONResponse({"matched_blocks": []})

    @app.post("/unpin_blocks")
    async def unpin_blocks(req: Request):
        return JSONResponse({"status": "ok"})

    @app.post("/push_blocks")
    async def push_blocks(req: Request):
        return JSONResponse({"status": "ok"})

    @app.post("/estimate_hit")
    async def estimate_hit(req: Request):
        return JSONResponse({"hit_tokens": 0})

    @app.get("/health")
    async def health():
        return JSONResponse({"status": "ok"})

    return app


def main():
    """CLI entry point: serve the dummy app on ``--host`` / ``--port``."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--host", default="127.0.0.1")
    ap.add_argument("--port", type=int, default=8997)
    args = ap.parse_args()

    app = make_app()
    config = uvicorn.Config(app=app, host=args.host, port=args.port,
                            log_level="info")
    server = uvicorn.Server(config)
    log.info("Dummy Mooncake bootstrap listening on %s:%d", args.host, args.port)
    server.run()


if __name__ == "__main__":
    main()
class NoOpConnector(KVConnectorBase_V1):
    """Control connector whose hooks all do nothing.

    It lets a run pay for vLLM v1's connector dispatch (the scheduler
    calling build_connector_meta, the mixin hooks, and so on) without paying
    for any real connector work such as RDMA setup, per-layer saves or hash
    table walks.
    """

    # ---- scheduler-side abstract methods ------------------------------
    def get_num_new_matched_tokens(
        self,
        request: "Request",
        num_computed_tokens: int,
    ) -> tuple[int | None, bool]:
        """Report zero externally cached tokens, with the second flag False."""
        return (0, False)

    def update_state_after_alloc(
        self,
        request: "Request",
        blocks: "KVCacheBlocks",
        num_external_tokens: int,
    ) -> None:
        """Ignore the allocation; this connector keeps no state."""

    def build_connector_meta(
        self,
        scheduler_output: "SchedulerOutput",
    ) -> KVConnectorMetadata:
        """Return a fresh, empty metadata object without reading the input."""
        empty_meta = KVConnectorMetadata()
        return empty_meta

    # ---- worker-side abstract methods ---------------------------------
    def start_load_kv(
        self,
        forward_context: "ForwardContext",
        **kwargs: Any,
    ) -> None:
        """Start nothing; there is no KV to load."""

    def wait_for_layer_load(self, layer_name: str) -> None:
        """Return immediately; no load is ever in flight."""

    def save_kv_layer(
        self,
        layer_name: str,
        kv_layer: "torch.Tensor",
        attn_metadata: "AttentionMetadata",
        **kwargs: Any,
    ) -> None:
        """Discard the layer; nothing is saved."""

    def wait_for_save(self) -> None:
        """Return immediately; no save is ever in flight."""
#!/bin/bash
# Pre-flight: verify MultiConnector(Mooncake kv_both, LMCacheConnectorV1).
# Exits 42 if LMCache missing, 1 on crash, 0 on OK.
set -euo pipefail
HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
RUN_DIR="${RUN_DIR:-$HERE/../results/preflight/multi}"
mkdir -p "$RUN_DIR"

PORT="${PORT:-8000}"

# Capture the launcher's own exit status.  Inside `if ! cmd; then`, $? is the
# status of the negated pipeline (always 0 there), so the 42 = SKIP code could
# never be seen.  `cmd || rc=$?` keeps the real code and is safe under set -e.
rc=0
"$(dirname "$HERE")/launch/launch_multi_mooncake_lmcache.sh" || rc=$?
if [[ $rc == 42 ]]; then
    echo "SKIP: lmcache missing" >&2
    exit 42
elif [[ $rc != 0 ]]; then
    echo "FAIL: launch failed with code $rc" >&2
    exit 1
fi

# Prefer the MODEL_PATH that run_all.sh hands to the launch scripts; otherwise
# fall back to the first Qwen3-Coder-30B directory under ~/models/Qwen.
MODEL="${MODEL_PATH:-$HOME/models/Qwen/$(ls "$HOME/models/Qwen" | grep Qwen3-Coder-30B | head -1)}"

for i in 1 2 3 4 5; do
    # curl's -w already prints 000 when no HTTP response arrives, so a failed
    # transfer only needs to be kept from tripping set -e.
    code=$(curl -s -o "$RUN_DIR/req_$i.json" -w "%{http_code}" \
        -X POST "http://127.0.0.1:$PORT/v1/chat/completions" \
        -H 'Content-Type: application/json' \
        -d "{\"model\":\"$MODEL\",\"messages\":[{\"role\":\"user\",\"content\":\"multi $i\"}],\"max_tokens\":32,\"temperature\":0}" \
        --max-time 60 || true)
    if [[ "$code" != "200" ]]; then
        echo "FAIL: req $i status=$code" >&2
        exit 1
    fi
done

echo "OK: MultiConnector reachable, all 5 requests succeeded"