Compare commits

..

13 Commits

32 changed files with 2471 additions and 451 deletions

1
.gitignore vendored
View File

@@ -15,6 +15,7 @@ tests/test_analyze_affinity_policy.py
# Simulation output
/runs/
.worktrees/
# Editor / IDE
.vscode/

568
README.md
View File

@@ -1,353 +1,386 @@
# kvcache-simulator
Discrete-event simulator for cluster-level LLM **prefill** serving with a
two-tier KV cache (GPU HBM + CPU DRAM / v6d) and KV-aware request routing.
Replays real production traces against a synthetic cluster so you can
ablate routing strategies and cache sizing without spinning up any GPUs.
two-tier KV cache and routing experiments. The simulator models a
PD-disaggregated deployment: only the **prefill** path is simulated, while
decode is reduced to a small completion tail for TTFT/E2E accounting.
Assumes **PD (prefill/decode) disaggregation** — only the prefill path is
modeled.
It is intended for answering questions like:
## Features
- How much do different KV-aware routers help on the same trace?
- How much HBM/DRAM capacity is enough before routing dominates?
- How do prefix-locality policies behave under bucketed input-length pools?
- What is the gap between online LRU and offline-optimal cache capacity?
- **Architecture-derived roofline compute** — auto-derives FLOPs,
attention coefficients, and weight-streaming costs from model
architecture (MoE, MLA, GQA, DSA, sliding window).
- **HuggingFace config.json auto-parsing** — drop in any HF
`config.json` and the simulator extracts layer counts, attention heads,
MoE expert configs, MLA LoRA ranks, and DSA sparse parameters.
- **Built-in GPU hardware presets** — H100, H800, H20, A100-80GB,
A100-40GB, B200 with tensor-parallel scaling (e.g. `8xb200`).
- **Two-tier KV cache hierarchy** — L0 (GPU HBM) + L1 (CPU DRAM) with
LRU eviction and cross-instance RDMA fetch via a cluster-wide
meta-store.
- **11 routing policies** — from baselines (random, round-robin) to
cache-aware (min\_pd, prefix\_affinity) for systematic ablation.
- **Token-bucket link contention** — PCIe and RDMA bandwidth modeled with
reservation-based token-bucket queues.
- **Oracle analysis** — computes theoretical hit-rate ceilings (infinite
cache, Belady optimal, LRU) for gap analysis.
## What The Repo Models
- **Architecture-derived prefill cost** from model structure, including MoE,
MLA, GQA, sliding-window attention, and DSA.
- **Two-tier KV hierarchy** with L0 GPU HBM and L1 host DRAM, plus remote
RDMA fetches via a meta-store.
- **Single-pool and bucketed clusters**. Bucketed mode separates the service
into input-length buckets with isolated instance pools and meta-stores.
- **Local instance routing and global bucket routing** with detailed
per-request routing logs.
- **Trace replay with optional input-length filtering** so the same trace can
be sliced into buckets without rewriting the source file.
- **Offline oracle analysis** for unlimited capacity, Belady, and LRU hit-rate
ceilings.
## Highlights
- **HF `config.json` auto-loading**: point `model.config_json` at a model
config and the simulator derives architecture parameters automatically.
- **Hardware presets**: `h100`, `h800`, `h20`, `h20-141g`, `a100-80gb`,
`a100-40gb`, `b200`, and `b300`, plus TP variants such as `8xb200`.
- **18 local router modes** covering baselines, load-based, cache-aware,
affinity, and TTFT-estimating policies.
- **2 global bucket router modes**: `strict_input_length` and `bucket_score`.
- **Detailed outputs**: `summary.json`, `per_request.csv`, `instances.csv`,
`routing_log.jsonl`, plus `ablation.json` / `oracle.json` when applicable.
## Build
```bash
cargo build --release
# binary: target/release/kvcache-sim
```
Fetch the upstream trace (consumed as a git submodule):
If you want the public Qwen trace submodule as well:
```bash
git submodule update --init --recursive
```
## Usage
### 1. Run a single simulation
The release binary is:
```bash
target/release/kvcache-sim run --config configs/glm5-8xb200-hf.yaml
target/release/kvcache-sim
```
Prints `summary.json` to stdout and writes the full output directory
(see [Outputs](#outputs) below).
## Quick Start
### 2. Compare routers on the same trace (ablation)
Validate a config:
```bash
target/release/kvcache-sim validate --config configs/glm5-8xb200.yaml
```
Run one simulation:
```bash
target/release/kvcache-sim run --config configs/glm5-8xb200.yaml
```
Compare several routers on the same trace:
```bash
target/release/kvcache-sim ablate \
--config configs/glm5-8xb200-hf.yaml \
--routers random,least_loaded,least_tokens,min_pd,prefix_affinity \
--evict-policies lru \
--output-dir runs/glm5_ablation
--config configs/glm5-8xb200.yaml \
--routers random,least_loaded,cache_score,cache_affinity,estimated_ttft
```
Writes `ablation.json` with one row per `router x evict_policy`.
`ablate` currently supports only `lru` as a valid eviction policy. The
aggregated output keeps the online prefill-time metrics
(`ttft_mean/p50/p95/p99`) and omits `e2e`.
The previous replay-based `belady` approximation has been removed from
the CLI because it was not an exact full-hierarchy Belady algorithm and
could produce misleading comparisons against `lru`.
### 3. Compute theoretical hit-rate ceilings (oracle)
Auto-pick the smallest cluster size that meets a TTFT target, then ablate at
that size:
```bash
# Cluster-aggregate capacity (default)
target/release/kvcache-sim oracle \
--config configs/glm5-8xb200-hf.yaml --num-instances 64
# A single instance's HBM budget
target/release/kvcache-sim oracle \
--config configs/glm5-8xb200-hf.yaml --per-instance
# Explicit capacity in blocks
target/release/kvcache-sim oracle \
--config configs/glm5-8xb200-hf.yaml --capacity-blocks 200000
target/release/kvcache-sim ablate \
--config configs/glm5-8xb200.yaml \
--auto-instances \
--auto-probe-router cache_score \
--auto-target-ttft-mean 4.0
```
Reports three numbers:
- `unlimited.hit_rate` — absolute ceiling (infinite cache)
- `belady_finite.hit_rate` — optimal-eviction ceiling at the given capacity
- `lru_finite.hit_rate` — production LRU at the same capacity
Gap between `lru_finite` and `belady_finite` = headroom from a smarter
eviction policy. Gap between `belady_finite` and `unlimited` = headroom
only reachable by adding capacity.
### 4. Validate a config without running
Run the oracle:
```bash
target/release/kvcache-sim validate --config configs/glm5-8xb200-hf.yaml
target/release/kvcache-sim oracle \
--config configs/glm5-8xb200.yaml \
--per-instance
```
Parses the YAML, prints derived per-instance block budgets, and dumps
the first 5 trace records so you can sanity-check the path.
`run` prints `summary.json` to stdout and also writes the full output directory
under `sim.output_dir`.
## CLI overrides
## Current Command Boundaries
These flags work on **all** subcommands and override the YAML in place,
so the same config can be reused across sweeps:
The repository now supports both legacy single-pool clusters and bucketed
service topologies, but not every CLI path supports both yet.
| Flag | Overrides |
|--------------------------|-------------------------------------------|
| `--num-instances <N>` | `cluster.num_instances` |
| `--max-requests <N>` | `sim.max_requests` |
| `--trace <PATH>` | `sim.trace_path` |
| `--output-dir <PATH>` | `sim.output_dir` |
| `--seed <N>` | `sim.seed` |
| `--precise-topk <N>` | `cluster.router.precise_probe_topk` |
| `--ttl-seconds <S>` | `cluster.meta_store.ttl_seconds` |
- `run`: supports `cluster.num_instances` and `cluster.buckets`
- `validate`: supports `cluster.num_instances` and `cluster.buckets`
- `ablate`: currently **single-pool only**
- `ablate --evict-policies`: currently supports **`lru` only**
- `oracle`: currently **single-pool only**
- `--num-instances` override: currently **single-pool only**
- `--auto-instances`: currently **single-pool only**
`oracle` additionally takes `--capacity-blocks <N>` / `--per-instance`
and `--out <PATH>`. `ablate` additionally takes `--routers <csv>` and
`--evict-policies <csv>` (currently only `lru`).
In practice, bucket-aware experiments are ready in `run`, while fixed-placement
ablation and oracle analysis still reject `cluster.buckets`.
## Router modes
## Config Model
Set `cluster.router.mode` in the YAML or list in `--routers`:
### Single-Pool Cluster
| Mode | Aliases | What it does |
|-------------------|------------------|--------------------------------------------------------------------------------------|
| `random` | | Uniform random. Baseline. |
| `round_robin` | `rr` | Deterministic round-robin. Baseline. |
| `least_loaded` | | `argmin(kv_blocks_used + alpha * queue_len)`. KV-blind load balance. |
| `least_tokens` | `lt` | `argmin(waiting_tokens)`. Pure load balance by queued compute work. |
| `ttl_aware` | `ttl` | Picks instance with longest prefix in the global TTL meta-store. Cache-only. |
| `precise` | `precise_aware` | Probes top-K least-loaded instances' actual caches; charges probe latency into TTFT. |
| `min_pd` | `minpd`, `pd` | Minimizes `P*D` (prefill tokens x ongoing requests). Cluster-wide RDMA-aware. |
| `cache_load` | `cl` | Filters to least-loaded 1/4 instances, then picks best cache prefix. |
| `cache_score` | `cs` | Exponential scoring: `2^(alpha * queue_len + beta * miss_blocks)`. |
| `estimated_ttft` | `ettft`,`optimal`| Estimates `drain_time + fetch_time` per instance using architecture-aware compute. |
| `prefix_affinity` | `affinity`, `pa` | Rendezvous-hashed prefix fingerprinting for deterministic cache locality. |
Use `cluster.num_instances` for the original flat instance pool:
### Router parameters
These fields in `cluster.router` tune specific routers:
| Field | Default | Used by | Description |
|--------------------------|---------|------------------|------------------------------------------------------|
| `load_alpha` | `1.0` | `least_loaded` | Weight of queue\_len vs kv\_blocks\_used |
| `score_alpha` | `1.0` | `cache_score` | Load weight in `2^(alpha*load + beta*miss)` |
| `score_beta` | `0.1` | `cache_score` | Cache-miss weight in `2^(alpha*load + beta*miss)` |
| `prefix_k` | `8` | `prefix_affinity`| Number of leading blocks for the prefix fingerprint |
| `affinity_fan_out` | `0` | `prefix_affinity`| Top-K affinity candidates (0 = auto: n/8, min 2) |
| `precise_probe_latency_us`| `50.0`| `precise` | Simulated per-probe latency (microseconds) |
| `precise_probe_topk` | `4` | `precise` | Number of instances probed |
### Router design spectrum
```
Cache-only Hybrid Load-only
(hot-spot risk) (cache-blind)
┌─────────┬───────────┬───────────┬────────────┬───────────┬───────────┐
ttl_aware precise cache_score min_pd prefix_ least_ random
cache_load affinity loaded
est_ttft least_tokens
```yaml
cluster:
num_instances: 32
meta_store:
ttl_seconds: 300.0
router:
mode: cache_affinity
```
`prefix_affinity` sits in a unique position: it builds **proactive cache
locality** by consistently routing same-prefix requests to the same
instances (via rendezvous hashing), rather than reactively chasing
existing cache state. This yields the highest L0 hit rates while
maintaining load balance through within-group drain-time-aware selection.
### Bucketed Service
## Model configuration
Use `cluster.buckets` plus a `global_router` to model explicit input-length
buckets:
### HuggingFace config.json (recommended)
```yaml
cluster:
meta_store:
ttl_seconds: 300.0
router:
mode: cache_affinity
load_alpha: 1.5
prefix_k: 8
global_router:
mode: strict_input_length
length_penalty_weight: 1.0
load_weight: 1.0
cache_weight: 1.0
buckets:
- name: short
input_length_min: 0
input_length_max: 32768
num_instances: 8
- name: long
input_length_min: 32769
input_length_max: 131072
num_instances: 4
```
Point `model.config_json` at any HF `config.json` to auto-extract
architecture:
Rules enforced by config validation:
- `cluster.num_instances` and `cluster.buckets` are mutually exclusive
- bucket ranges must not overlap
- every bucket must have `num_instances > 0`
- `input_length_min <= input_length_max`
### CLI Overrides
These flags apply on top of the YAML config:
| Flag | Overrides |
|------|-----------|
| `--num-instances <N>` | `cluster.num_instances` |
| `--max-requests <N>` | `sim.max_requests` |
| `--trace <PATH>` | `sim.trace_path` |
| `--output-dir <PATH>` | `sim.output_dir` |
| `--seed <N>` | `sim.seed` |
| `--precise-topk <N>` | `cluster.router.precise_probe_topk` |
| `--ttl-seconds <S>` | `cluster.meta_store.ttl_seconds` |
| `--input-length-min <N>` | `sim.input_length_min` |
| `--input-length-max <N>` | `sim.input_length_max` |
Subcommand-specific additions:
- `ablate`: `--routers`, `--evict-policies`, `--auto-instances`,
`--auto-target-ttft-mean`, `--auto-candidates`, `--auto-probe-router`,
`--jobs`
- `oracle`: `--capacity-blocks`, `--per-instance`, `--out`
## Routing Modes
### Global Bucket Routers
Configured through `cluster.global_router.mode`.
| Mode | What it does |
|------|---------------|
| `strict_input_length` | Routes to the unique bucket whose `[input_length_min, input_length_max]` contains the request. |
| `bucket_score` | Scores every bucket using weighted length mismatch, aggregate queue load, and predicted cache miss. Can intentionally deviate from the strict length bucket. |
### Local Instance Routers
Configured through `cluster.router.mode`. All of these names are accepted by
`run`, and any of them can be passed to `ablate --routers` on single-pool
configs.
| Mode | Aliases | What it does |
|------|---------|---------------|
| `random` | | Uniform random baseline. |
| `round_robin` | `rr` | Deterministic round-robin baseline. |
| `least_loaded` | | Minimizes `kv_blocks_used + alpha * queue_len`. |
| `least_tokens` | `lt` | Minimizes queued token work. |
| `ttl_aware` | `ttl` | Uses the global TTL meta-store to chase the longest reusable prefix. |
| `precise` | `precise_aware` | Probes top-K least-loaded instances for actual cache contents and charges probe latency. |
| `min_pd` | `minpd`, `pd` | Minimizes `P * D` using ongoing load and prefix reuse. |
| `cache_load` | `cl` | Filters to lightly loaded instances, then chooses the best cache prefix. |
| `cache_affinity` | `caff`, `ca` | Strong cache-first scoring with rendezvous-based sticky homes for prefix families. |
| `cache_affinity_weak_rend` | `caff_weak` | Ablation: weak cache weights plus rendezvous placement. |
| `cache_affinity_strong_only` | `caff_strong` | Ablation: strong cache weights without rendezvous tie-breaking. |
| `cache_score` | `cs` | Exponential score over queue length and miss blocks. |
| `cache_score_strong` | `cs_strong`, `css` | Parity probe with stronger cache weighting than default `cache_score`. |
| `cache_score_ttl` | `csttl`, `cs_ttl` | `cache_score` variant that also uses TTL/meta-store visibility. |
| `estimated_ttft` | `ettft`, `optimal` | First-principles TTFT estimate per instance using compute plus KV movement. |
| `prefix_affinity` | `affinity`, `pa` | Deterministic prefix fingerprinting with affinity fan-out and load-aware selection. |
| `adaptive_affinity` | `aa` | Uses hot-prefix detection: affinity for short hot stems, TTFT optimization otherwise. |
| `lineage_affinity` | `la` | Combines parent stickiness, family homesets, and strong local cache scoring. |
Router tuning knobs in `cluster.router`:
| Field | Default | Used by |
|-------|---------|---------|
| `load_alpha` | `1.0` | `least_loaded`, `ttl_aware`, affinity families |
| `score_alpha` | `1.0` | `cache_score`, `cache_score_ttl` |
| `score_beta` | `0.1` | `cache_score`, `cache_score_ttl` |
| `prefix_k` | `8` | prefix and affinity fingerprinting |
| `affinity_fan_out` | `0` | `prefix_affinity`, `adaptive_affinity`, `lineage_affinity` |
| `precise_probe_latency_us` | `50.0` | `precise` |
| `precise_probe_topk` | `4` | `precise` |
## Model And Hardware Configuration
### Model Config
Recommended pattern:
```yaml
model:
config_json: ../models/GLM-5/config.json
dtype_bytes: 2 # required (not in HF schema)
block_size_tokens: 512 # required (not in HF schema)
name: glm-5
compute_dtype: fp8
weight_dtype: fp4
dtype_bytes: 1
block_size_tokens: 512
```
Auto-detected features:
Notes:
| Feature | Detection trigger | What it extracts |
|-----------|-------------------------------|----------------------------------------------|
| **MoE** | `n_routed_experts`, `num_local_experts`, or `num_experts` | Expert count, active experts, shared experts, expert FFN width |
| **MLA** | `kv_lora_rank` present | KV/Q LoRA ranks, qk\_rope/nope dims, v\_head\_dim |
| **DSA** | `first_k_dense_replace` present| Dense window, sparse stride, first dense layers |
| **Sliding window** | `sliding_window` present | Window size |
| **GQA** | `num_key_value_heads < num_attention_heads` | KV head count for grouped-query attention |
- `config_json` is resolved relative to the YAML file
- explicit YAML fields override values loaded from the model config
- `compute_dtype` selects the compute FLOPS tier
- `weight_dtype` controls model-weight bytes separately from KV-cache bytes
- `dtype_bytes` sizes the KV cache
Explicit YAML fields always override the auto-detected values.
The architecture loader understands:
### Inline specification
- MoE expert counts and active experts
- MLA LoRA ranks and attention dimensions
- DSA sparse-attention parameters
- sliding-window attention
- GQA from KV-head count
Alternatively, specify architecture fields directly:
### Hardware Presets
```yaml
model:
name: qwen2.5-coder-7b
num_layers: 28
hidden_size: 3584
num_attention_heads: 28
num_kv_heads: 4
head_dim: 128
intermediate_size: 18944
dtype_bytes: 2
block_size_tokens: 16
```
When `hidden_size` is present, the compute model is auto-derived
(architecture mode). Without it, you must supply legacy manual
coefficients (`flops_per_token_prefill`, `attn_quadratic_coeff`, etc.).
### Bundled model configs
| Model | Path | Architecture |
|-------|------|--------------|
| GLM-5 (744B/40B-active) | `models/GLM-5/config.json` | MoE (256 routed, 8 active, 1 shared) + MLA + DSA |
| GLM-5-FP8 | `models/GLM-5-FP8/config.json` | GLM-5 architecture + upstream FP8 quantization metadata |
| Qwen3-Coder-480B-A35B FP8 | `models/Qwen3-Coder-480B-A35B-Instruct-FP8/config.json` | MoE (160 experts, 8 active) + GQA |
## Hardware configuration
### Using presets (recommended)
Set `hardware.type` to a preset name — individual fields can override:
Recommended pattern:
```yaml
hardware:
type: 8xb200
hbm_bytes: 500.0e9 # override KV budget (after model weights)
```
Available presets:
| Preset | FLOPS | HBM | Mem BW | PCIe |
|-------------|------------|---------|------------|------|
| `h100` | 989 TFLOPS | 80 GB | 3.35 TB/s | Gen5 |
| `h800` | 989 TFLOPS | 80 GB | 3.35 TB/s | Gen5 |
| `h20` | 148 TFLOPS | 96 GB | 4.0 TB/s | Gen5 |
| `h20-141g` | 148 TFLOPS | 141 GB | 4.8 TB/s | Gen5 |
| `a100-80gb` | 312 TFLOPS | 80 GB | 2.0 TB/s | Gen4 |
| `a100-40gb` | 312 TFLOPS | 40 GB | 1.555 TB/s | Gen4 |
| `b200` | 2.25 PFLOPS| 192 GB | 8.0 TB/s | Gen6 |
Prefix with `2x`, `4x`, or `8x` for tensor-parallel groups (e.g.
`8xh20`). FLOPS, memory bandwidth, and HBM scale linearly; RDMA and
DRAM are set to sensible per-node defaults.
### Inline specification
```yaml
hardware:
gpu_flops: 1.80e16
gpu_mem_bw: 6.40e13
hbm_bytes: 500.0e9
type: 8xb300
hbm_bytes: 1900.0e9
dram_bytes: 1.5e12
pcie_bw: 128.0e9
pcie_latency_us: 4.0
rdma_bw: 50.0e9
rdma_latency_us: 6.0
max_batch_slots: 256
prefill_chunk_tokens: 4096
```
## Architecture-aware compute model
Available preset families:
The simulator derives a **roofline prefill model** from model
architecture:
- `h100`, `h800`, `h20`, `h20-141g`
- `a100-80gb`, `a100-40gb`
- `b200`, `b300`
- TP forms such as `2xh100`, `4xh20`, `8xb200`, `8xb300`
```
prefill_time(N tokens) = max(compute_time, memory_time)
## Bundled Configs
compute_time = layers * (N * linear_flops + attn_coeff * N * effective_ctx(N)) / gpu_flops
memory_time = layers * weight_bytes_per_layer / gpu_mem_bw
Representative configs in `configs/`:
| Config | Notes |
|--------|-------|
| `glm5-8xb200.yaml` | GLM-5 on `8xb200`, single-pool baseline config. |
| `glm5-fp8-8xh20-141g.yaml` | GLM-5-FP8 on `8xh20-141g`, with a 0-32k input-length filter. |
| `glm5-fp8-8xh20-141g-ca-tuned.yaml` | Same family as above, tuned for `cache_affinity`. |
| `glm5-nvfp4-8xb300.yaml` | GLM-5-NVFP4 on `8xb300`. |
| `glm5-nvfp4-fp8compute-8xb300.yaml` | NVFP4 weights with FP8 compute on `8xb300`. |
| `qwen3-coder-480b-8xh20.yaml` | Qwen3-Coder-480B-A35B on `8xh20`. |
Many of the `glm5-*n*.yaml` configs are bucket/slice-specific experiment
points that use `sim.input_length_min` and `sim.input_length_max`.
## Trace Inputs
This repository currently contains two trace sources:
- `bailian-traces/`
- `glm_coder_blksz_512_040915-040917.jsonl`
- `qwen3_coder_blksz_512_040915-040917.jsonl`
- `qwen-bailian-usagetraces-anon/` submodule
- public 16-token-block Qwen traces such as
`qwen_coder_blksz_16.jsonl` and `qwen_traceB_blksz_16.jsonl`
The simulator expects JSONL records with fields like:
```json
{
"chat_id": 159,
"parent_chat_id": 55,
"timestamp": 61.114,
"input_length": 521,
"output_length": 132,
"type": "text",
"turn": 2,
"hash_ids": [1089, 1090, 1091]
}
```
- **MoE**: only active experts contribute to FLOPs and weight streaming
(shared experts always counted)
- **MLA**: compressed KV projections reduce attention FLOPs; KV cache
uses `kv_lora_rank + qk_rope_head_dim` instead of `2 * kv_heads * head_dim`
- **DSA**: `effective_ctx = min(N, dense_window) + max(0, N - dense_window) / sparse_stride`,
with the first K layers using full dense attention
- **GQA**: fewer KV heads reduce both attention compute and KV cache size
## Bundled config files
| Config | Model | Hardware | Instances | Trace |
|--------|-------|----------|-----------|-------|
| `glm5-8xb200-hf.yaml` | GLM-5 via HF config.json | 8xB200 preset | 32 | GLM coder blk512 |
| `glm5-fp8-8xh20-141g.yaml` | GLM-5-FP8 via ModelScope config.json | 8xH20-141G preset | 128 | GLM coder blk512 |
| `glm5-nvfp4-8xb300.yaml` | GLM-5-NVFP4 via HF config.json | 8xB300 preset | 8 | GLM coder blk512 |
| `qwen3-coder-480b-8xh20.yaml` | Qwen3-Coder via HF | 8xH20 preset | 32 | Qwen coder blk16 |
Only prefill-side behavior is modeled; `output_length` is used only for a
decode tail in completion metrics.
## Outputs
Each run writes a directory under `sim.output_dir`:
Each `run` writes a directory under `sim.output_dir`:
| File | Contents |
|----------------------|----------------------------------------------------------------------------|
| `summary.json` | Router, throughput, TTFT p50/p95/p99, hit rates per tier, total RDMA/PCIe bytes |
| `per_request.csv` | `req_id,arrival,ttft,e2e,instance,total_blocks,l0_hit,l1_hit,remote_hit,miss,rdma_bytes,pcie_bytes,probe_overhead_s` |
| `instances.csv` | `t,instance,queue_len,kv_blocks_used,kv_blocks_total,busy` per sample |
| `routing_log.jsonl` | One JSON per request: all router candidates + chosen instance + reason |
| File | Contents |
|------|----------|
| `summary.json` | Aggregate throughput, TTFT/E2E percentiles, hit rates, RDMA bytes, PCIe bytes. |
| `per_request.csv` | Per-request latency and cache stats, including `bucket`, `instance`, and `length_bucket_match`. |
| `instances.csv` | Periodic per-instance samples with `bucket`, `instance`, `queue_len`, and KV usage. |
| `routing_log.jsonl` | One JSON route decision per request, including `global_mode`, `mode`, `chosen_bucket`, candidate buckets, and candidate instances. |
For `ablate`: an extra `ablation.json` with one summary per router.
For `oracle`: an `oracle.json` with the three hit-rate analyses.
Additional outputs:
### Reading results quickly
- `ablate`: writes `ablation.json`
- `oracle`: writes `oracle.json`
- `ablate --auto-instances`: writes calibration runs under
`<output_dir>/auto_instances/`
Quick inspection examples:
```bash
# Pretty-print the summary
cat runs/glm5_8xb200_hf/summary.json | jq .
# Compare all routers from an ablation
cat runs/glm5_8xb200_hf/ablation.json | \
jq '.[] | {router, ttft_mean, ttft_p50, hit_rate_l0, miss_rate}'
# Sort by TTFT
cat runs/glm5_8xb200_hf/ablation.json | \
jq 'sort_by(.ttft_mean) | .[] | {router, ttft_mean, hit_rate_l0}'
jq . runs/glm5_8xb200/summary.json
```
## Trace format
```bash
jq 'sort_by(.ttft_mean) | .[] | {router, ttft_mean, hit_rate_l0, miss_rate}' \
runs/glm5_8xb200/ablation.json
```
The simulator reads the Alibaba
[`qwen-bailian-usagetraces-anon`](https://github.com/alibaba-edu/qwen-bailian-usagetraces-anon)
JSONL schema. Each record has `chat_id`, `timestamp`, `input_length`,
`output_length`, and `hash_ids` (block hashes, typically 16 tokens each).
Only the input side is used.
## Oracle Semantics
Available traces in the submodule:
`oracle` computes three hit-rate references at a chosen cache capacity:
| Trace | Requests | Description |
|-------|----------|-------------|
| `qwen_coder_blksz_16.jsonl` | 43k | Qwen Coder serving traffic |
| `qwen_traceA_blksz_16.jsonl` | 43k | Qwen general traffic A |
| `qwen_traceB_blksz_16.jsonl` | 173k | Qwen general traffic B |
| `qwen_thinking_blksz_16.jsonl` | 11k | Qwen reasoning/thinking traffic |
- `unlimited.hit_rate`: absolute ceiling with infinite capacity
- `belady_finite.hit_rate`: offline-optimal eviction at the chosen capacity
- `lru_finite.hit_rate`: LRU at the same capacity
When `sim.input_length_min` / `sim.input_length_max` are set, `oracle` still
feeds the full trace into cache state but only counts requests inside the
selected input-length range. That matches the intended "measure one bucket
inside a mixed workload" interpretation.
The gap from `lru_finite` to `belady_finite` is eviction-policy headroom. The
gap from `belady_finite` to `unlimited` is pure capacity headroom.
## Testing
@@ -355,6 +388,5 @@ Available traces in the submodule:
cargo test --release
```
28 tests: 27 unit tests (compute model, HF config parsing, hardware
presets) + 1 integration smoke test that runs routers on a synthetic
shared-prefix trace and asserts the expected hit-rate ordering.
The test suite covers config parsing, hardware presets, routing behavior,
bucket-aware service semantics, oracle logic, and smoke-style end-to-end runs.

View File

@@ -0,0 +1,449 @@
# Bucket-Aware Routing Design
## 背景
当前 simulator 只有一个全局 `Cluster`
- trace replay 的所有请求共享一组 `instances`
- router 直接在全局 instance 池里选目标实例
- `meta_store`、L0/L1 cache 可见性、remote RDMA 也都是全局共享
这和目标架构不一致。目标架构要求:
- service 内存在多个显式定义的 input-length buckets
- 每个 bucket 有独立的 instance 池,实例数由配置显式给出
- bucket 之间的 cache / meta-store / remote 可见性严格隔离
- router 不只是 bucket 内调度器,还要能以 global 视角感知 bucket 的存在
- 后续需要用 simulator 研究:
- 是否应该区分 bucket
- 严格按 input length 分发与非严格分发的差异
- bucket policy 与 bucket 内 instance policy 的耦合和收益
因此,这次重构不能把“先按 input length 选 bucket”硬编码到 service 层,而要把 bucket 选择纳入 global router 的决策面。
## 目标
本次设计的目标是把 simulator 重构成“两级调度”架构:
1. global router 在所有 bucket 之间选择目标 bucket
2. local router 在选中的 bucket 内选择目标 instance
同时满足:
- bucket 由配置文件显式定义
- 所有 bucket 共享同一套 local router 配置
- bucket 之间完全隔离,不共享 L0/L1/meta-store/remote 视图
- 保留足够的 metrics / routing log支持后续研究 bucket policy 的效果
- 尽量复用现有 `src/router/*` 中的 instance 级路由实现,避免把所有 router 改写成跨 bucket 扁平打分器
非目标:
- 第一阶段不支持 per-bucket 自定义 router 算法
- 第一阶段不支持跨 bucket cache 共享
- 第一阶段不自动推导 bucket 边界或 bucket 实例数
- 第一阶段不实现“全局扁平 instance 池”语义
## 方案比较
### 方案 Aservice 层固定按 input length 选 bucketrouter 只负责 bucket 内 instance
优点:
- 对现有代码侵入最小
- local router 基本不用改
缺点:
- 无法研究“router 不严格按照 input length 分发会怎样”
- bucket policy 被固化,无法与 instance policy 解耦对比
- global 视角只能做观测,不能做真正决策
### 方案 B两级路由global router 选 bucketlocal router 选 bucket 内 instance
优点:
- bucket policy 和 instance policy 清晰解耦
- 符合目标架构,也适合做对照实验
- 可以最大程度复用现有 router 实现作为 local router
缺点:
- 需要新增 service 层摘要视图与 global router 接口
- driver / events / metrics 要显式携带 bucket 维度
### 方案 C全局扁平 router对所有 instance 跨 bucket 一起打分
优点:
- 表面上最自由
缺点:
- bucket policy 与 instance policy 混在一起,实验解释性差
- 现有大多数 router 要重写
- 容易把“bucket 是独立实例池”的物理边界冲淡
推荐方案:方案 B。
原因bucket 是 service-level 拓扑与隔离边界instance selection 是 bucket 内局部调度问题。这两个层次应该分开建模否则后续无法清晰回答“bucket 本身是否有价值”和“instance 级路由算法是否有效”。
## 配置设计
`cluster` 从现在的单实例池配置扩展为显式 bucket 配置。
目标 YAML 形态:
```yaml
cluster:
meta_store:
ttl_seconds: 1000.0
router:
mode: cache_affinity
precise_probe_latency_us: 10.0
precise_probe_topk: 4
load_alpha: 0.1
score_alpha: 1.0
score_beta: 0.1
prefix_k: 8
affinity_fan_out: 0
global_router:
mode: strict_input_length
length_penalty_weight: 1.0
load_weight: 1.0
cache_weight: 1.0
buckets:
- name: short
input_length_min: 0
input_length_max: 32768
num_instances: 3
- name: medium
input_length_min: 32769
input_length_max: 81920
num_instances: 4
- name: long
input_length_min: 81921
input_length_max: 131072
num_instances: 3
```
其中:
- `cluster.router` 继续表示 bucket 内 local router 配置,全局统一
- 新增 `cluster.global_router`,表示 bucket 选择策略
- `cluster.buckets` 显式描述 service 拓扑
第一阶段建议继续兼容旧配置:
- 若只提供 `cluster.num_instances`,则视为单 bucket 模式
- 若提供 `cluster.buckets`,则进入多 bucket 模式
- 两者同时出现时直接报错,避免歧义
配置校验约束:
1. `buckets` 非空
2. 每个 bucket `num_instances > 0`
3. `input_length_min <= input_length_max`
4. bucket 区间不重叠
5. bucket 排序后必须能唯一命中一个 bucket
6. 旧模式与新模式互斥
## 运行时架构
运行时拆成三层:
### 1. BucketedService
新增一个 service 层对象,持有多个 bucket。每个 bucket 包含:
- `bucket_id`
- bucket 配置
- 独立的 `Cluster`
`BucketedService` 职责:
- 为请求构造所有 bucket 的摘要视图
- 调用 global router 选择 bucket
- 将请求转发给选中的 bucket 内 `Cluster`
- 提供全量 bucket / instance 遍历接口,供 driver 做采样与 tick 调度
### 2. Cluster
现有 `Cluster` 语义收缩为“单个 bucket 内的 cluster”
- 只持有该 bucket 的 `instances`
- 只持有该 bucket 的 `meta_store`
- 只运行该 bucket 的 local router
`Cluster::route_and_admit` 不再负责 bucket 选择,只负责:
- 在 bucket 内调用 local router 选 instance
- 执行该 bucket 内的 L0/L1/remote/miss 路径
- 返回 bucket 维度补充后的 admission stats
### 3. Router 分层
router 明确拆分为:
- `GlobalRouter`:负责 bucket 选择
- `LocalRouter`:负责 bucket 内 instance 选择
现有 `src/router/*` 中的大部分算法迁移为 `LocalRouter` 实现。
## Router 接口设计
### GlobalRouter
global router 只看 bucket 摘要,不直接操作实例数组。
建议接口:
```rust
trait GlobalRouter {
fn name(&self) -> &'static str;
fn route_bucket(
&mut self,
req: &RequestRecord,
buckets: &[BucketView],
now: f64,
) -> GlobalRouteDecision;
}
```
`BucketView` 是只读摘要,至少包含:
- `bucket_id`
- `name`
- `input_length_min`
- `input_length_max`
- `num_instances`
- `queue_len_sum`
- `queue_len_max`
- `kv_blocks_used_sum`
- `kv_blocks_total_sum`
- `active_requests`
- `predicted_prefix`
- 可选的 `estimated_drain_time`
`predicted_prefix` 表示该 bucket 对当前请求的 bucket 级 prefix 命中预测,用于让 global router 感知 bucket 级 cache affinity。
### LocalRouter
local router 继续以 bucket 内 instance 池为输入。
建议接口保持和现有语义接近:
```rust
trait LocalRouter {
fn name(&self) -> &'static str;
fn route_instance(
&mut self,
req: &RequestRecord,
instances: &[Instance],
meta: &MetaStore,
now: f64,
) -> LocalRouteDecision;
}
```
现有 `RouteDecision` 需要拆成两层,最后再合并成对外统一的日志结构:
- `GlobalRouteDecision`
- `LocalRouteDecision`
- `RouteDecision`:包含 `chosen_bucket + chosen_instance + 两层 candidates`
## Bucket 隔离语义
bucket 是显式物理隔离边界,不是逻辑标签。
必须满足:
- 请求一旦进入 bucket只能使用该 bucket 的实例池
- L0 / L1 只在 bucket 内可见
- `meta_store` 只描述该 bucket 内哪些实例持有块
- remote RDMA 只允许从同 bucket 其他实例拉取
- bucket 之间不共享 owner 信息
这保证了 simulator 中的 bucket 与实际服务拓扑有明确对应关系,避免 global router 虽然“知道 bucket”但底层缓存模型仍然偷偷全局共享从而污染实验结论。
## 首批 Global Bucket Policies
第一阶段只实现两个 global bucket policy。
### 1. strict_input_length
语义:
- 只允许选择 `req.input_len` 所在区间的 bucket
用途:
- 作为严格长度分桶的基线策略
- 对应最初目标架构图
### 2. bucket_score
语义:
- 对所有 bucket 计算分数并选择最优 bucket
第一阶段分数只使用少量强信号:
- `length_penalty`:请求长度偏离 bucket 目标范围的惩罚
- `load`bucket 总负载或最大负载
- `miss`bucket 级预测 miss来自当前请求在该 bucket 上的 `predicted_prefix`
目标形式:
```text
score = a * length_penalty + b * load + c * miss
```
设计意图:
- 能研究“非严格按 input length 分发”的收益或损失
- 同时保留长度匹配偏好,避免第一阶段就退化为完全无约束调度
暂不实现:
- 完全扁平的跨 bucket instance 全局打分
- per-bucket 特化 global scoring
- 跨 bucket 回退式 cache 共享
## 事件与 Driver 设计
当前 `Event::BatchTick { instance }` 假设 instance 是全局单层编号。多 bucket 后需要改成:
```rust
Event::BatchTick { bucket: BucketId, instance: InstanceId }
```
driver 主循环改为:
1. `Arrival`
2. 读取请求
3. 调用 `BucketedService::route_and_admit`
4. 记录全局+局部路由决策
5.`(bucket, instance)` 安排 `BatchTick`
`Sample` 事件可以继续是全局事件,但采样时要遍历所有 buckets 的所有 instances。
`inflight` 结构保留按 `req_id` 索引,但 value 中新增:
- `bucket`
- `bucket_policy`
- `length_bucket_match`
- 可选的 `bucket_predicted_prefix`
## Metrics 与可观测性
这次重构的重点之一是让 bucket policy 可研究,因此 metrics 必须明确区分 bucket 选择和 instance 选择。
### routing_log.jsonl
建议新增字段:
- `global_mode`
- `local_mode`
- `chosen_bucket`
- `chosen_instance`
- `bucket_candidates`
- `instance_candidates`
- `global_reason`
- `local_reason`
其中:
- `bucket_candidates` 记录每个 bucket 的摘要与分数
- `instance_candidates` 记录选中 bucket 内的 instance 级候选信息
### per_request.csv
建议新增字段:
- `bucket`
- `bucket_policy`
- `length_bucket_match`
- `bucket_predicted_prefix`
`length_bucket_match` 用于直接衡量“最终 bucket 是否等于严格长度命中的 bucket”是分析非严格分发影响的关键字段。
### instances.csv
建议新增字段:
- `bucket`
### summary.json
保留全局汇总不变,同时新增 bucket 维度 breakdown。可以采用
-`summary.json` 内增加 `per_bucket`
- 或新增独立 `bucket_summary.json`
第一阶段优先保证可读性与易分析,不需要过度抽象。
## 错误处理
需要显式处理以下失败场景:
- 请求命中 0 个 bucket
- 请求命中多个 bucket
- bucket 配置不合法
- 多 bucket 模式下事件找不到对应 `(bucket, instance)`
- routing log / metrics 缺失 bucket 信息
其中配置相关错误应尽量在启动时失败,而不是等到 trace replay 中途才暴露。
## 测试策略
测试分三层。
### 1. 配置测试
- 旧配置 `num_instances` 模式仍能成功加载
- `buckets` 模式成功加载
- bucket 区间重叠时报错
- `num_instances``buckets` 同时配置时报错
- bucket 未覆盖请求长度时报错或在运行时明确失败
### 2. Service / Driver 测试
- 短请求进入 short bucket
- 长请求进入 long bucket
- `bucket_score` 可以在长度不完全匹配时选择非默认 bucket
- long bucket 请求看不到 short bucket 的 meta-store / remote owner
- `(bucket, instance)` 维度的 `BatchTick` 能正确推进
### 3. 集成 Smoke Test
构造 mixed trace包含多个 input length 段与共享前缀模式,验证:
- 严格 bucket policy 下,请求落入预期 bucket
- `routing_log` 同时记录 bucket 候选和 instance 候选
- `per_request` / `instances` 带 bucket 字段
- `bucket_score``strict_input_length` 在 mixed trace 上产生可观测差异
## 迁移策略
为了控制风险,这次重构按以下顺序推进:
1. 在配置层引入 bucket 结构与校验,但保留旧单 bucket 模式
2. 把现有 `Cluster` 收缩为单 bucket 语义
3. 新增 `BucketedService`,先接通 strict bucket policy
4. 抽出 `GlobalRouter` 接口,补上 `strict_input_length`
5. 把现有 instance 级 router 适配为 `LocalRouter`
6. 扩展 driver / events / metrics 到 `(bucket, instance)` 维度
7. 再实现 `bucket_score` 作为第一种非严格 bucket policy
这样可以先建立正确拓扑与日志,再逐步加入实验策略,避免一次性重写过多核心路径。
## 预期结果
完成后simulator 将具备以下能力:
- 用显式 bucket 拓扑 replay 混合长度 trace
- 研究严格长度分桶是否带来收益
- 研究 global router 在 bucket 维度做非严格调度会产生什么影响
- 在 bucket policy 与 local instance policy 两个层次分别做 ablation
这为后续研究“bucket 是否必要”“bucket 边界怎么设”“global router 应不应该偏离长度分发”提供了清晰、可观测、可复用的 simulator 基础。

View File

@@ -0,0 +1,216 @@
use anyhow::Result;
use super::cluster::{AdmissionStats, Cluster};
use crate::config::{BucketConfig, Config, ModelConfig};
use crate::instance::Instance;
use crate::router::{self, BucketId, GlobalRouter};
use crate::trace::RequestRecord;
pub struct ServiceBucket {
pub id: BucketId,
pub cfg: BucketConfig,
pub cluster: Cluster,
}
impl ServiceBucket {
pub fn instances(&self) -> &[Instance] {
&self.cluster.instances
}
}
pub struct BucketedService {
pub buckets: Vec<ServiceBucket>,
pub global_router: Box<dyn GlobalRouter>,
}
impl BucketedService {
pub fn new(config: &Config, model: &ModelConfig) -> Self {
let buckets = config
.cluster
.effective_buckets()
.into_iter()
.enumerate()
.map(|(idx, cfg)| ServiceBucket {
id: idx as BucketId,
cluster: Cluster::new_for_bucket(config, model, idx as BucketId, cfg.num_instances)
.expect("bucket-local cluster construction should succeed"),
cfg,
})
.collect();
Self {
buckets,
global_router: router::build_global(config),
}
}
pub fn bucket(&self, bucket_id: BucketId) -> &ServiceBucket {
&self.buckets[bucket_id as usize]
}
pub fn route_and_admit(&mut self, req: &RequestRecord, now: f64) -> Result<AdmissionStats> {
let bucket_views = self
.buckets
.iter()
.map(|bucket| bucket.cluster.bucket_view(bucket.id, &bucket.cfg, req, now))
.collect::<Vec<_>>();
let global = self.global_router.route(req, &bucket_views, now)?;
let bucket = &mut self.buckets[global.chosen_bucket as usize];
Ok(bucket
.cluster
.route_and_admit_with_global(req, now, &global))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{
BucketConfig, CalibrationConfig, ClusterConfig, Config, GlobalRouterConfig,
GlobalRouterMode, HardwareConfig, MetaStoreConfig, ModelConfig, RouterConfig, RouterMode,
SimConfig,
};
use crate::trace::RequestRecord;
fn test_config() -> Config {
Config {
model: ModelConfig {
name: "test".into(),
num_layers: 4,
num_kv_heads: 2,
head_dim: 64,
dtype_bytes: 2,
block_size_tokens: 16,
flops_per_token_prefill: Some(1.0e9),
attn_quadratic_coeff: Some(64.0),
..Default::default()
},
hardware: HardwareConfig {
gpu_flops: 1.0e14,
gpu_fp8_flops: 0.0,
gpu_fp4_flops: 0.0,
gpu_mem_bw: 1.0e12,
hbm_bytes: 1.0e9,
dram_bytes: 4.0e9,
host_dram_bw: 5.0e11,
pcie_bw: 32.0e9,
pcie_latency_us: 1.0,
rdma_bw: 12.0e9,
rdma_latency_us: 5.0,
intra_node_tp_bw: 9.0e11,
intra_node_tp_latency_us: 2.0,
tp_degree: 1,
max_batch_slots: 32,
prefill_chunk_tokens: 1024,
},
calibration: CalibrationConfig::default(),
cluster: ClusterConfig {
num_instances: None,
buckets: vec![
BucketConfig {
name: "short".into(),
input_length_min: 0,
input_length_max: 32,
num_instances: 2,
},
BucketConfig {
name: "long".into(),
input_length_min: 33,
input_length_max: 96,
num_instances: 1,
},
],
global_router: GlobalRouterConfig {
mode: GlobalRouterMode::StrictInputLength,
length_penalty_weight: 1.0,
load_weight: 1.0,
cache_weight: 1.0,
},
meta_store: MetaStoreConfig {
ttl_seconds: 1000.0,
},
router: RouterConfig {
mode: RouterMode::LeastLoaded,
precise_probe_latency_us: 10.0,
precise_probe_topk: 2,
load_alpha: 0.0,
score_alpha: 1.0,
score_beta: 0.1,
prefix_k: 8,
affinity_fan_out: 2,
},
},
sim: SimConfig {
trace_path: String::new(),
max_requests: None,
output_dir: String::new(),
sample_interval_s: 0.0,
seed: 7,
input_length_min: None,
input_length_max: None,
},
}
}
fn req(req_id: u64, input_len: u32, hashes: &[u64]) -> RequestRecord {
RequestRecord {
req_id,
chat_id: req_id as i64,
parent_chat_id: -1,
turn: 0,
arrival: 0.0,
input_len,
output_len: 16,
hash_ids: hashes.to_vec(),
}
}
#[test]
fn strict_input_length_routes_into_matching_bucket() {
let cfg = test_config();
let mut service = BucketedService::new(&cfg, &cfg.model);
let stats = service
.route_and_admit(&req(1, 24, &[10, 11]), 0.0)
.unwrap();
assert_eq!(stats.bucket, 0);
assert_eq!(stats.decision.chosen_bucket, 0);
assert_eq!(
stats.decision.global_reason,
"unique bucket range contains input_length"
);
assert_eq!(
stats.decision.local_reason,
"argmin(kv_used + alpha * queue_len)"
);
assert_eq!(service.bucket(0).instances().len(), 2);
}
#[test]
fn bucket_meta_store_is_isolated() {
let cfg = test_config();
let mut service = BucketedService::new(&cfg, &cfg.model);
let _ = service
.route_and_admit(&req(1, 24, &[10, 11]), 0.0)
.unwrap();
let long_stats = service
.route_and_admit(&req(2, 64, &[10, 11, 12, 13]), 1.0)
.unwrap();
assert_eq!(long_stats.bucket, 1);
assert_eq!(long_stats.remote_hit_blocks, 0);
assert_eq!(long_stats.l1_hit_blocks, 0);
}
#[test]
fn unmatched_input_length_returns_recoverable_error() {
let mut cfg = test_config();
cfg.cluster.buckets[1].input_length_min = 40;
let mut service = BucketedService::new(&cfg, &cfg.model);
let err = service
.route_and_admit(&req(3, 36, &[20, 21, 22]), 0.0)
.unwrap_err();
assert!(err.to_string().contains("no bucket"));
assert!(err.to_string().contains("input_length=36"));
}
}

View File

@@ -1,18 +1,21 @@
//! Cluster: routes arrivals, performs the L0 / L1 / remote-RDMA fetch chain
//! described in the design diagram, and bookkeeps the global meta store.
use anyhow::Result;
use crate::cluster::meta_store::MetaStore;
use crate::config::{Config, ModelConfig};
use crate::config::{BucketConfig, Config, ModelConfig};
use crate::instance::instance::AdmittedRequest;
use crate::instance::kv_cache::L1Change;
use crate::instance::Instance;
use crate::router::{self, RouteDecision, Router};
use crate::router::{self, BucketId, BucketView, GlobalRouteDecision, RouteDecision, Router};
use crate::trace::RequestRecord;
use crate::ttft::{classify_prefix_tiers, TtftModel};
use crate::types::InstanceId;
#[derive(Debug, Clone)]
pub struct AdmissionStats {
pub bucket: BucketId,
pub instance: InstanceId,
pub l0_hit_blocks: u32,
pub l1_hit_blocks: u32,
@@ -36,39 +39,41 @@ pub struct Cluster {
}
impl Cluster {
pub fn new(config: &Config, model: &ModelConfig) -> Self {
let mut instances = Vec::with_capacity(config.cluster.num_instances as usize);
for id in 0..config.cluster.num_instances {
instances.push(Instance::new(
id as InstanceId,
model,
&config.hardware,
&config.calibration,
));
}
let meta_store = MetaStore::new(config.cluster.meta_store.ttl_seconds);
let router = router::build(config, config.sim.seed);
Self {
instances,
meta_store,
router,
block_size_tokens: model.block_size_tokens,
kv_block_bytes: model.kv_block_bytes(),
ttft_model: TtftModel::new(
&config.hardware,
&config.calibration,
model.kv_block_bytes(),
),
}
pub fn new(config: &Config, model: &ModelConfig) -> Result<Self> {
let total_instances = config.cluster.require_legacy_single_pool("Cluster::new")?;
Self::build_local_cluster(config, model, total_instances)
}
pub fn new_for_bucket(
config: &Config,
model: &ModelConfig,
_bucket_id: BucketId,
num_instances: u32,
) -> Result<Self> {
let mut local_config = config.clone();
local_config.cluster.num_instances = Some(num_instances);
local_config.cluster.buckets.clear();
Self::build_local_cluster(&local_config, model, num_instances)
}
/// Route + admit a request. Returns the chosen instance plus rich
/// per-request stats for metrics. Does NOT schedule the BatchTick — the
/// simulator driver does that based on the returned `ready_at`.
pub fn route_and_admit(&mut self, req: &RequestRecord, now: f64) -> AdmissionStats {
let global = GlobalRouteDecision::single_bucket(req.req_id, 0);
self.route_and_admit_with_global(req, now, &global)
}
pub fn route_and_admit_with_global(
&mut self,
req: &RequestRecord,
now: f64,
global: &GlobalRouteDecision,
) -> AdmissionStats {
let decision = self
.router
.route(req, &self.instances, &self.meta_store, now);
.route(req, &self.instances, &self.meta_store, now)
.with_global(global);
let inst_id = decision.chosen;
let probe_overhead_s = decision.probe_overhead_s;
let scheduler_overhead_s = self
@@ -151,6 +156,7 @@ impl Cluster {
let fetch_time_s = (t - effective_now).max(0.0);
AdmissionStats {
bucket: decision.chosen_bucket,
instance: inst_id,
l0_hit_blocks: l0_hits,
l1_hit_blocks: l1_hits,
@@ -165,6 +171,30 @@ impl Cluster {
}
}
pub fn bucket_view(
&self,
bucket_id: BucketId,
cfg: &BucketConfig,
req: &RequestRecord,
now: f64,
) -> BucketView {
let predicted_prefix = self
.meta_store
.score_prefix(&req.hash_ids, now, self.instances.len())
.into_iter()
.max()
.unwrap_or(0);
BucketView {
id: bucket_id,
input_length_min: cfg.input_length_min,
input_length_max: cfg.input_length_max,
num_instances: self.instances.len() as u32,
total_queue_len: self.instances.iter().map(Instance::queue_len).sum(),
total_load_blocks: self.instances.iter().map(|inst| inst.kv_blocks_used).sum(),
predicted_prefix,
}
}
fn apply_l1_changes(
meta_store: &mut MetaStore,
inst_id: InstanceId,
@@ -178,14 +208,44 @@ impl Cluster {
}
}
}
fn build_local_cluster(
config: &Config,
model: &ModelConfig,
num_instances: u32,
) -> Result<Self> {
let mut instances = Vec::with_capacity(num_instances as usize);
for id in 0..num_instances {
instances.push(Instance::new(
id as InstanceId,
model,
&config.hardware,
&config.calibration,
));
}
let meta_store = MetaStore::new(config.cluster.meta_store.ttl_seconds);
let router = router::build(config, config.sim.seed);
Ok(Self {
instances,
meta_store,
router,
block_size_tokens: model.block_size_tokens,
kv_block_bytes: model.kv_block_bytes(),
ttft_model: TtftModel::new(
&config.hardware,
&config.calibration,
model.kv_block_bytes(),
),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{
CalibrationConfig, ClusterConfig, Config, HardwareConfig, MetaStoreConfig, ModelConfig,
RouterConfig, RouterMode, SimConfig,
BucketConfig, CalibrationConfig, ClusterConfig, Config, HardwareConfig, MetaStoreConfig,
ModelConfig, RouterConfig, RouterMode, SimConfig,
};
use crate::trace::RequestRecord;
@@ -226,7 +286,9 @@ mod tests {
..CalibrationConfig::default()
},
cluster: ClusterConfig {
num_instances: 1,
num_instances: Some(1),
buckets: Vec::new(),
global_router: Default::default(),
meta_store: MetaStoreConfig {
ttl_seconds: 1000.0,
},
@@ -256,7 +318,7 @@ mod tests {
#[test]
fn l1_ready_at_includes_dram_and_transform_overhead() {
let cfg = test_config(RouterMode::EstimatedTtft);
let mut cluster = Cluster::new(&cfg, &cfg.model);
let mut cluster = Cluster::new(&cfg, &cfg.model).unwrap();
let req = RequestRecord {
req_id: 1,
chat_id: 0,
@@ -282,4 +344,22 @@ mod tests {
assert!(stats.ready_at > pure_pcie);
}
#[test]
fn cluster_new_rejects_bucketed_configs() {
let mut cfg = test_config(RouterMode::EstimatedTtft);
cfg.cluster.num_instances = None;
cfg.cluster.buckets = vec![BucketConfig {
name: "short".into(),
input_length_min: 0,
input_length_max: 64,
num_instances: 2,
}];
let result = Cluster::new(&cfg, &cfg.model);
assert!(result.is_err(), "bucketed Cluster::new should fail");
let err = result.err().unwrap();
assert!(err.to_string().contains("Cluster::new"));
assert!(err.to_string().contains("cluster.buckets"));
}
}

View File

@@ -1,6 +1,8 @@
pub mod bucketed_service;
#[allow(clippy::module_inception)]
pub mod cluster;
pub mod meta_store;
pub use bucketed_service::BucketedService;
pub use cluster::Cluster;
pub use meta_store::MetaStore;

View File

@@ -379,11 +379,76 @@ fn default_first_token_ready_us() -> f64 {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterConfig {
pub num_instances: u32,
#[serde(default)]
pub num_instances: Option<u32>,
#[serde(default)]
pub buckets: Vec<BucketConfig>,
#[serde(default)]
pub global_router: GlobalRouterConfig,
pub meta_store: MetaStoreConfig,
pub router: RouterConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct BucketConfig {
pub name: String,
pub input_length_min: u32,
pub input_length_max: u32,
pub num_instances: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GlobalRouterConfig {
#[serde(default)]
pub mode: GlobalRouterMode,
#[serde(default = "default_global_router_length_penalty_weight")]
pub length_penalty_weight: f64,
#[serde(default = "default_global_router_load_weight")]
pub load_weight: f64,
#[serde(default = "default_global_router_cache_weight")]
pub cache_weight: f64,
}
impl Default for GlobalRouterConfig {
fn default() -> Self {
Self {
mode: GlobalRouterMode::StrictInputLength,
length_penalty_weight: default_global_router_length_penalty_weight(),
load_weight: default_global_router_load_weight(),
cache_weight: default_global_router_cache_weight(),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum GlobalRouterMode {
#[default]
StrictInputLength,
BucketScore,
}
impl GlobalRouterMode {
pub fn as_str(&self) -> &'static str {
match self {
Self::StrictInputLength => "strict_input_length",
Self::BucketScore => "bucket_score",
}
}
}
fn default_global_router_length_penalty_weight() -> f64 {
1.0
}
fn default_global_router_load_weight() -> f64 {
1.0
}
fn default_global_router_cache_weight() -> f64 {
1.0
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetaStoreConfig {
pub ttl_seconds: f64,
@@ -432,6 +497,105 @@ fn default_prefix_k() -> usize {
8
}
impl ClusterConfig {
pub fn require_legacy_single_pool(&self, context: &str) -> Result<u32> {
if !self.buckets.is_empty() {
anyhow::bail!("{context} does not support cluster.buckets until Task 2 lands");
}
self.legacy_num_instances()
.ok_or_else(|| anyhow::anyhow!("{context} requires cluster.num_instances"))
}
pub fn legacy_num_instances(&self) -> Option<u32> {
if self.buckets.is_empty() {
self.num_instances
} else {
None
}
}
pub fn total_instances(&self) -> u32 {
self.legacy_num_instances()
.unwrap_or_else(|| self.buckets.iter().map(|bucket| bucket.num_instances).sum())
}
pub fn bucket_index_for_input_len(&self, input_len: u32) -> Result<usize> {
if self.buckets.is_empty() {
return Ok(0);
}
self.buckets
.iter()
.position(|bucket| {
bucket.input_length_min <= input_len && input_len <= bucket.input_length_max
})
.ok_or_else(|| {
anyhow::anyhow!(
"cluster.global_router.mode={} has no bucket for input_length={input_len}",
self.global_router.mode.as_str()
)
})
}
pub fn effective_buckets(&self) -> Vec<BucketConfig> {
if !self.buckets.is_empty() {
return self.buckets.clone();
}
vec![BucketConfig {
name: "default".to_string(),
input_length_min: 0,
input_length_max: u32::MAX,
num_instances: self
.num_instances
.expect("legacy single-pool cluster must have num_instances"),
}]
}
pub fn validate(&self) -> Result<()> {
if self.num_instances.is_some() && !self.buckets.is_empty() {
anyhow::bail!("cluster.num_instances and cluster.buckets are mutually exclusive");
}
if self.buckets.is_empty() {
let num_instances = self.num_instances.ok_or_else(|| {
anyhow::anyhow!("cluster must set either num_instances or buckets")
})?;
anyhow::ensure!(num_instances > 0, "cluster.num_instances must be positive");
return Ok(());
}
for bucket in &self.buckets {
anyhow::ensure!(
bucket.input_length_min <= bucket.input_length_max,
"cluster bucket '{}' has input_length_min > input_length_max",
bucket.name
);
anyhow::ensure!(
bucket.num_instances > 0,
"cluster bucket '{}' must have num_instances > 0",
bucket.name
);
}
let mut sorted = self.buckets.iter().collect::<Vec<_>>();
sorted.sort_by_key(|bucket| (bucket.input_length_min, bucket.input_length_max));
for pair in sorted.windows(2) {
let prev = pair[0];
let next = pair[1];
anyhow::ensure!(
prev.input_length_max < next.input_length_min,
"cluster buckets '{}' and '{}' overlap",
prev.name,
next.name
);
}
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RouterMode {
@@ -544,7 +708,7 @@ impl Config {
.with_context(|| format!("parsing config {}", path.display()))?;
let yaml_dir = path.parent().unwrap_or(Path::new("."));
raw.resolve(yaml_dir)
.with_context(|| format!("resolving config {}", path.display()))
.map_err(|err| anyhow::anyhow!("resolving config {}: {err}", path.display()))
}
}
@@ -678,7 +842,10 @@ impl RawConfig {
model,
hardware,
calibration: self.calibration,
cluster: self.cluster,
cluster: {
self.cluster.validate()?;
self.cluster
},
sim: self.sim,
})
}
@@ -1003,4 +1170,298 @@ sim:
assert_eq!(cfg.model.weight_dtype.as_deref(), Some("fp4"));
assert!((cfg.model.weight_dtype_bytes() - 0.5).abs() < 1e-12);
}
#[test]
fn bucketed_config_loads_and_preserves_legacy_single_pool() {
let legacy = write_temp_config(
r#"
model:
name: test
num_layers: 4
num_kv_heads: 2
head_dim: 64
dtype_bytes: 2
block_size_tokens: 16
flops_per_token_prefill: 1.0e9
attn_quadratic_coeff: 64.0
hardware:
gpu_flops: 1.0e14
gpu_mem_bw: 1.0e12
hbm_bytes: 1.0e9
cluster:
num_instances: 2
meta_store:
ttl_seconds: 10.0
router:
mode: cache_affinity
global_router:
mode: strict_input_length
sim:
trace_path: trace.jsonl
output_dir: runs/test
"#,
);
let cfg = Config::from_yaml_path(&legacy).unwrap();
assert_eq!(cfg.cluster.legacy_num_instances(), Some(2));
assert_eq!(cfg.cluster.buckets.len(), 0);
let bucketed = write_temp_config(
r#"
model:
name: test
num_layers: 4
num_kv_heads: 2
head_dim: 64
dtype_bytes: 2
block_size_tokens: 16
flops_per_token_prefill: 1.0e9
attn_quadratic_coeff: 64.0
hardware:
gpu_flops: 1.0e14
gpu_mem_bw: 1.0e12
hbm_bytes: 1.0e9
cluster:
meta_store:
ttl_seconds: 10.0
router:
mode: cache_affinity
global_router:
mode: strict_input_length
buckets:
- name: short
input_length_min: 0
input_length_max: 32
num_instances: 2
- name: long
input_length_min: 33
input_length_max: 96
num_instances: 1
sim:
trace_path: trace.jsonl
output_dir: runs/test
"#,
);
let cfg = Config::from_yaml_path(&bucketed).unwrap();
assert_eq!(cfg.cluster.legacy_num_instances(), None);
assert_eq!(cfg.cluster.buckets.len(), 2);
assert_eq!(cfg.cluster.buckets[0].name, "short");
assert_eq!(cfg.cluster.buckets[1].num_instances, 1);
assert_eq!(cfg.cluster.total_instances(), 3);
}
#[test]
fn bucketed_config_deserializes_global_router_weights() {
let path = write_temp_config(
r#"
model:
name: test
num_layers: 4
num_kv_heads: 2
head_dim: 64
dtype_bytes: 2
block_size_tokens: 16
flops_per_token_prefill: 1.0e9
attn_quadratic_coeff: 64.0
hardware:
gpu_flops: 1.0e14
gpu_mem_bw: 1.0e12
hbm_bytes: 1.0e9
cluster:
meta_store:
ttl_seconds: 10.0
router:
mode: cache_affinity
global_router:
mode: bucket_score
length_penalty_weight: 1.5
load_weight: 0.75
cache_weight: 2.25
buckets:
- name: short
input_length_min: 0
input_length_max: 32
num_instances: 2
sim:
trace_path: trace.jsonl
output_dir: runs/test
"#,
);
let cfg = Config::from_yaml_path(&path).unwrap();
assert_eq!(
cfg.cluster.global_router.mode,
GlobalRouterMode::BucketScore
);
assert!((cfg.cluster.global_router.length_penalty_weight - 1.5).abs() < 1e-12);
assert!((cfg.cluster.global_router.load_weight - 0.75).abs() < 1e-12);
assert!((cfg.cluster.global_router.cache_weight - 2.25).abs() < 1e-12);
}
#[test]
fn bucketed_config_rejects_overlapping_ranges_and_mixed_modes() {
let overlap = write_temp_config(
r#"
model:
name: test
num_layers: 4
num_kv_heads: 2
head_dim: 64
dtype_bytes: 2
block_size_tokens: 16
flops_per_token_prefill: 1.0e9
attn_quadratic_coeff: 64.0
hardware:
gpu_flops: 1.0e14
gpu_mem_bw: 1.0e12
hbm_bytes: 1.0e9
cluster:
meta_store:
ttl_seconds: 10.0
router:
mode: cache_affinity
global_router:
mode: strict_input_length
buckets:
- name: short
input_length_min: 0
input_length_max: 32
num_instances: 2
- name: overlap
input_length_min: 32
input_length_max: 96
num_instances: 1
sim:
trace_path: trace.jsonl
output_dir: runs/test
"#,
);
let err = Config::from_yaml_path(&overlap).unwrap_err();
assert!(err.to_string().contains("overlap"));
let mixed = write_temp_config(
r#"
model:
name: test
num_layers: 4
num_kv_heads: 2
head_dim: 64
dtype_bytes: 2
block_size_tokens: 16
flops_per_token_prefill: 1.0e9
attn_quadratic_coeff: 64.0
hardware:
gpu_flops: 1.0e14
gpu_mem_bw: 1.0e12
hbm_bytes: 1.0e9
cluster:
num_instances: 2
meta_store:
ttl_seconds: 10.0
router:
mode: cache_affinity
global_router:
mode: strict_input_length
buckets:
- name: short
input_length_min: 0
input_length_max: 32
num_instances: 2
sim:
trace_path: trace.jsonl
output_dir: runs/test
"#,
);
let err = Config::from_yaml_path(&mixed).unwrap_err();
assert!(err.to_string().contains("num_instances"));
assert!(err.to_string().contains("buckets"));
}
#[test]
fn bucketed_config_reports_unmatched_input_length_gaps_clearly() {
let gapful = write_temp_config(
r#"
model:
name: test
num_layers: 4
num_kv_heads: 2
head_dim: 64
dtype_bytes: 2
block_size_tokens: 16
flops_per_token_prefill: 1.0e9
attn_quadratic_coeff: 64.0
hardware:
gpu_flops: 1.0e14
gpu_mem_bw: 1.0e12
hbm_bytes: 1.0e9
cluster:
meta_store:
ttl_seconds: 10.0
router:
mode: cache_affinity
global_router:
mode: strict_input_length
buckets:
- name: short
input_length_min: 0
input_length_max: 32
num_instances: 2
- name: long
input_length_min: 64
input_length_max: 96
num_instances: 1
sim:
trace_path: trace.jsonl
output_dir: runs/test
"#,
);
let cfg = Config::from_yaml_path(&gapful).unwrap();
let err = cfg.cluster.bucket_index_for_input_len(40).unwrap_err();
assert!(err.to_string().contains("40"));
assert!(err.to_string().contains("no bucket"));
}
#[test]
fn bucketed_config_requires_legacy_single_pool_for_legacy_runtime_paths() {
let bucketed = write_temp_config(
r#"
model:
name: test
num_layers: 4
num_kv_heads: 2
head_dim: 64
dtype_bytes: 2
block_size_tokens: 16
flops_per_token_prefill: 1.0e9
attn_quadratic_coeff: 64.0
hardware:
gpu_flops: 1.0e14
gpu_mem_bw: 1.0e12
hbm_bytes: 1.0e9
cluster:
meta_store:
ttl_seconds: 10.0
router:
mode: cache_affinity
global_router:
mode: strict_input_length
buckets:
- name: short
input_length_min: 0
input_length_max: 32
num_instances: 2
sim:
trace_path: trace.jsonl
output_dir: runs/test
"#,
);
let cfg = Config::from_yaml_path(&bucketed).unwrap();
let err = cfg
.cluster
.require_legacy_single_pool("driver run")
.unwrap_err();
assert!(err.to_string().contains("driver run"));
assert!(err.to_string().contains("cluster.buckets"));
}
}

View File

@@ -6,7 +6,7 @@ use std::collections::{HashMap, VecDeque};
use std::path::Path;
use std::sync::{Arc, Mutex};
use crate::cluster::Cluster;
use crate::cluster::BucketedService;
use crate::config::{Config, RouterMode};
use crate::metrics::ablation::AblationRow;
use crate::metrics::per_request::{PerRequestRow, PerRequestWriter};
@@ -37,7 +37,9 @@ pub struct RunOutputs {
#[derive(Debug, Clone)]
struct InflightInfo {
arrival: f64,
bucket: u32,
instance: u32,
length_bucket_match: bool,
total_blocks: u32,
l0_hit_blocks: u32,
l1_hit_blocks: u32,
@@ -49,7 +51,7 @@ struct InflightInfo {
}
pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
let mut cluster = Cluster::new(config, &config.model);
let mut service = BucketedService::new(config, &config.model);
let mut q = EventQueue::new();
// Output directory
@@ -108,13 +110,16 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
Some(r) => r.clone(),
None => continue,
};
let stats = cluster.route_and_admit(&req, now);
let stats = service.route_and_admit(&req, now)?;
rt_writer.write(&stats.decision)?;
let strict_bucket = config.cluster.bucket_index_for_input_len(req.input_len)?;
inflight.insert(
req_id,
InflightInfo {
arrival: req.arrival,
bucket: stats.bucket,
instance: stats.instance,
length_bucket_match: stats.bucket as usize == strict_bucket,
total_blocks: req.hash_ids.len() as u32,
l0_hit_blocks: stats.l0_hit_blocks,
l1_hit_blocks: stats.l1_hit_blocks,
@@ -125,20 +130,23 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
probe_overhead_s: stats.probe_overhead_s,
},
);
let inst = &mut cluster.instances[stats.instance as usize];
let inst = &mut service.buckets[stats.bucket as usize].cluster.instances
[stats.instance as usize];
if !inst.tick_scheduled {
inst.tick_scheduled = true;
let when = stats.ready_at.max(now);
q.schedule(
when,
Event::BatchTick {
bucket: stats.bucket,
instance: stats.instance,
},
);
}
}
Event::BatchTick { instance } => {
let inst = &mut cluster.instances[instance as usize];
Event::BatchTick { bucket, instance } => {
let inst =
&mut service.buckets[bucket as usize].cluster.instances[instance as usize];
inst.tick_scheduled = false;
let result = inst.step(now);
for (rid, ttft, end) in result.completed {
@@ -148,7 +156,9 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
arrival: info.arrival,
ttft,
e2e: end - info.arrival,
bucket: info.bucket,
instance: info.instance,
length_bucket_match: info.length_bucket_match,
total_blocks: info.total_blocks,
l0_hit_blocks: info.l0_hit_blocks,
l1_hit_blocks: info.l1_hit_blocks,
@@ -163,24 +173,28 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
}
}
if let Some(next) = result.next_tick {
let inst = &mut cluster.instances[instance as usize];
let inst =
&mut service.buckets[bucket as usize].cluster.instances[instance as usize];
if !inst.tick_scheduled {
inst.tick_scheduled = true;
q.schedule(next.max(now), Event::BatchTick { instance });
q.schedule(next.max(now), Event::BatchTick { bucket, instance });
}
}
}
Event::Sample => {
for inst in &cluster.instances {
let busy = if inst.queue_len() > 0 { 1 } else { 0 };
ts_writer.write(&TimeseriesRow {
t: now,
instance: inst.id,
queue_len: inst.queue_len(),
kv_blocks_used: inst.kv_blocks_used,
kv_blocks_total: inst.hbm_block_budget,
busy,
})?;
for bucket in &service.buckets {
for inst in &bucket.cluster.instances {
let busy = if inst.queue_len() > 0 { 1 } else { 0 };
ts_writer.write(&TimeseriesRow {
t: now,
bucket: bucket.id,
instance: inst.id,
queue_len: inst.queue_len(),
kv_blocks_used: inst.kv_blocks_used,
kv_blocks_total: inst.hbm_block_budget,
busy,
})?;
}
}
}
Event::Stop => break,
@@ -217,6 +231,8 @@ pub fn ablate_fixed_placement_with_parallelism(
evict_policies: &[ReplayEvictPolicy],
jobs: usize,
) -> Result<Vec<AblationRow>> {
base.cluster
.require_legacy_single_pool("fixed-placement ablation")?;
let mut out = Vec::new();
for &policy in evict_policies {
if policy != ReplayEvictPolicy::Lru {

View File

@@ -50,9 +50,14 @@ struct ConfigOverrides {
}
impl ConfigOverrides {
fn apply(&self, cfg: &mut Config) {
fn apply(&self, cfg: &mut Config) -> Result<()> {
if let Some(n) = self.num_instances {
cfg.cluster.num_instances = n;
if !cfg.cluster.buckets.is_empty() {
anyhow::bail!(
"--num-instances does not support cluster.buckets until Task 2 lands"
);
}
cfg.cluster.num_instances = Some(n);
}
if let Some(m) = self.max_requests {
cfg.sim.max_requests = Some(m);
@@ -78,6 +83,7 @@ impl ConfigOverrides {
if let Some(hi) = self.input_length_max {
cfg.sim.input_length_max = Some(hi);
}
Ok(())
}
}
@@ -204,7 +210,8 @@ fn main() -> Result<()> {
fn load(config: &PathBuf, overrides: &ConfigOverrides) -> Result<Config> {
let mut cfg = Config::from_yaml_path(config)?;
overrides.apply(&mut cfg);
overrides.apply(&mut cfg)?;
cfg.cluster.validate()?;
Ok(cfg)
}
@@ -268,7 +275,8 @@ fn cmd_ablate(
auto_target_ttft_mean,
probe_mode.as_str()
);
base.cluster.num_instances = chosen;
base.cluster.num_instances = Some(chosen);
base.cluster.buckets.clear();
}
eprintln!(
@@ -283,7 +291,7 @@ fn cmd_ablate(
.map(ReplayEvictPolicy::as_str)
.collect::<Vec<_>>()
.join(","),
base.cluster.num_instances,
base.cluster.total_instances(),
if jobs == 0 {
"auto".to_string()
} else {
@@ -310,6 +318,9 @@ fn auto_select_instances(
probe: RouterMode,
target_ttft_mean: f64,
) -> Result<u32> {
base.cluster
.require_legacy_single_pool("auto-instances calibration")?;
#[derive(serde::Serialize)]
struct CalibRow {
num_instances: u32,
@@ -330,7 +341,7 @@ fn auto_select_instances(
for &n in candidates {
let mut cfg = base.clone();
cfg.cluster.num_instances = n;
cfg.cluster.num_instances = Some(n);
cfg.cluster.router.mode = probe;
// Isolate calibration output so ablation runs don't overwrite it.
cfg.sim.output_dir = out_root
@@ -429,7 +440,7 @@ fn cmd_validate(path: &PathBuf, overrides: &ConfigOverrides) -> Result<()> {
let hbm_blocks = (cfg.hardware.hbm_bytes / block_bytes) as u64;
let dram_blocks = (cfg.hardware.dram_bytes / block_bytes) as u64;
eprintln!("per-instance HBM blocks = {hbm_blocks}, DRAM blocks = {dram_blocks}");
eprintln!("num_instances = {}", cfg.cluster.num_instances);
eprintln!("num_instances = {}", cfg.cluster.total_instances());
// Sample prefill times at a few prompt lengths.
eprintln!("prefill_time samples:");
for &n in &[256, 1024, 4096, 16384, 65536, 131072] {
@@ -460,9 +471,10 @@ fn cmd_oracle(
out_path: Option<&std::path::Path>,
) -> Result<()> {
let cfg = load(path, overrides)?;
cfg.cluster.require_legacy_single_pool("oracle analysis")?;
let block_bytes = cfg.model.kv_block_bytes() as f64;
let per_instance_blocks = (cfg.hardware.hbm_bytes / block_bytes).max(1.0) as u64;
let aggregate_blocks = per_instance_blocks * cfg.cluster.num_instances as u64;
let aggregate_blocks = per_instance_blocks * cfg.cluster.total_instances() as u64;
let capacity = match (capacity_blocks, per_instance) {
(Some(_), true) => {
return Err(anyhow::anyhow!(
@@ -518,7 +530,7 @@ fn cmd_oracle(
records.len(),
capacity,
per_instance_blocks,
cfg.cluster.num_instances,
cfg.cluster.total_instances(),
if per_instance {
", per-instance mode"
} else {
@@ -541,3 +553,123 @@ fn cmd_oracle(
eprintln!("[oracle] wrote {}", target.display());
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use kvcache_simulator::config::{
BucketConfig, CalibrationConfig, ClusterConfig, GlobalRouterConfig, HardwareConfig,
MetaStoreConfig, ModelConfig, RouterConfig, SimConfig,
};
fn bucketed_config(out_dir: &str) -> Config {
Config {
model: ModelConfig {
name: "test".into(),
num_layers: 4,
num_kv_heads: 2,
head_dim: 64,
dtype_bytes: 2,
block_size_tokens: 16,
flops_per_token_prefill: Some(1.0e9),
attn_quadratic_coeff: Some(64.0),
..Default::default()
},
hardware: HardwareConfig {
gpu_flops: 1.0e14,
gpu_fp8_flops: 0.0,
gpu_fp4_flops: 0.0,
gpu_mem_bw: 1.0e12,
hbm_bytes: 1.0e9,
dram_bytes: 4.0e9,
host_dram_bw: 5.0e11,
pcie_bw: 32.0e9,
pcie_latency_us: 1.0,
rdma_bw: 12.0e9,
rdma_latency_us: 5.0,
intra_node_tp_bw: 9.0e11,
intra_node_tp_latency_us: 2.0,
tp_degree: 1,
max_batch_slots: 32,
prefill_chunk_tokens: 1024,
},
calibration: CalibrationConfig::default(),
cluster: ClusterConfig {
num_instances: None,
buckets: vec![
BucketConfig {
name: "short".into(),
input_length_min: 0,
input_length_max: 64,
num_instances: 2,
},
BucketConfig {
name: "long".into(),
input_length_min: 65,
input_length_max: 128,
num_instances: 1,
},
],
global_router: GlobalRouterConfig::default(),
meta_store: MetaStoreConfig {
ttl_seconds: 1000.0,
},
router: RouterConfig {
mode: RouterMode::Random,
precise_probe_latency_us: 10.0,
precise_probe_topk: 4,
load_alpha: 0.1,
score_alpha: 1.0,
score_beta: 0.1,
prefix_k: 8,
affinity_fan_out: 0,
},
},
sim: SimConfig {
trace_path: "unused.jsonl".into(),
max_requests: None,
output_dir: out_dir.into(),
sample_interval_s: 0.0,
seed: 7,
input_length_min: None,
input_length_max: None,
},
}
}
#[test]
fn num_instances_override_rejects_bucketed_configs() {
let mut cfg = bucketed_config(std::env::temp_dir().to_str().unwrap());
let overrides = ConfigOverrides {
num_instances: Some(8),
..ConfigOverrides::default()
};
let err = overrides.apply(&mut cfg).unwrap_err();
assert!(err.to_string().contains("--num-instances"));
assert!(err.to_string().contains("cluster.buckets"));
}
#[test]
fn auto_instances_rejects_bucketed_configs() {
let cfg = bucketed_config(std::env::temp_dir().to_str().unwrap());
let err = auto_select_instances(&cfg, &[4, 8], RouterMode::Random, 1.0).unwrap_err();
assert!(err.to_string().contains("auto-instances"));
assert!(err.to_string().contains("cluster.buckets"));
}
#[test]
fn oracle_rejects_bucketed_configs() {
let tmp = std::env::temp_dir().join("kvcache_sim_main_tests");
let _ = std::fs::remove_dir_all(&tmp);
std::fs::create_dir_all(&tmp).unwrap();
let path = tmp.join("bucketed.yaml");
let cfg = bucketed_config(tmp.to_str().unwrap());
std::fs::write(&path, serde_yaml::to_string(&cfg).unwrap()).unwrap();
let err = cmd_oracle(&path, &ConfigOverrides::default(), None, false, None).unwrap_err();
assert!(err.to_string().contains("oracle analysis"));
assert!(err.to_string().contains("cluster.buckets"));
}
}

View File

@@ -8,7 +8,9 @@ pub struct PerRequestRow {
pub arrival: f64,
pub ttft: f64,
pub e2e: f64,
pub bucket: u32,
pub instance: u32,
pub length_bucket_match: bool,
pub total_blocks: u32,
pub l0_hit_blocks: u32,
pub l1_hit_blocks: u32,

View File

@@ -5,6 +5,7 @@ use std::path::Path;
#[derive(Debug, Clone, Serialize)]
pub struct TimeseriesRow {
pub t: f64,
pub bucket: u32,
pub instance: u32,
pub queue_len: u32,
pub kv_blocks_used: u32,

View File

@@ -196,7 +196,12 @@ fn build_next_use(records: &[RequestRecord]) -> Vec<Vec<u32>> {
/// Implementation: lazy-deletion max-heap keyed by next-use index. Each
/// cache entry has a version; the heap may contain stale entries from
/// previous insertions, which we skip on pop.
fn run_belady(records: &[RequestRecord], next_use: &[Vec<u32>], capacity: usize, mask: &[bool]) -> u64 {
fn run_belady(
records: &[RequestRecord],
next_use: &[Vec<u32>],
capacity: usize,
mask: &[bool],
) -> u64 {
if capacity == 0 {
return 0;
}
@@ -327,10 +332,7 @@ mod tests {
// req 0 populates blocks [1,2,3] but is not counted.
// req 1 has prefix [1,2,3,4] — the first 3 blocks are cache hits
// because req 0 populated them, even though req 0 is masked out.
let recs = vec![
req(0, 0.0, vec![1, 2, 3]),
req(1, 1.0, vec![1, 2, 3, 4]),
];
let recs = vec![req(0, 0.0, vec![1, 2, 3]), req(1, 1.0, vec![1, 2, 3, 4])];
let mask = vec![false, true];
let out = analyze(&recs, 100, Some(&mask));
// Only req 1 is counted: total = 4, hits = 3 (prefix [1,2,3] hit)

View File

@@ -496,6 +496,8 @@ pub fn replay_fixed_placement(
placements: &[PlacementEntry],
policy: ReplayEvictPolicy,
) -> Result<ReplaySummary> {
cfg.cluster
.require_legacy_single_pool("fixed-placement replay")?;
if records.len() != placements.len() {
return Err(anyhow!(
"records/placements length mismatch: {} vs {}",
@@ -519,7 +521,7 @@ pub fn replay_fixed_placement(
let block_bytes = cfg.model.kv_block_bytes() as f64;
let l0_cap = (cfg.hardware.hbm_bytes / block_bytes).max(1.0) as usize;
let l1_cap = (cfg.hardware.dram_bytes / block_bytes).max(1.0) as usize;
let num_instances = cfg.cluster.num_instances as usize;
let num_instances = cfg.cluster.total_instances() as usize;
let mut caches: Vec<ReplayInstanceCache> = (0..num_instances)
.map(|_| ReplayInstanceCache::new(policy, l0_cap, l1_cap))
.collect();

View File

@@ -57,7 +57,7 @@ pub struct AdaptiveAffinityRouter {
impl AdaptiveAffinityRouter {
pub fn new(config: &Config) -> Self {
let n = config.cluster.num_instances as usize;
let n = config.cluster.total_instances() as usize;
let configured_fan_out = config.cluster.router.affinity_fan_out;
let max_fan_out = if configured_fan_out > 0 {
configured_fan_out.max(2).min(n)
@@ -242,13 +242,13 @@ impl Router for AdaptiveAffinityRouter {
self.observe(fp, now);
RouteDecision {
req_id: req.req_id,
mode: "adaptive_affinity",
chosen: instances[chosen_idx].id,
probe_overhead_s: 0.0,
crate::router::local_route_decision(
req.req_id,
"adaptive_affinity",
instances[chosen_idx].id,
0.0,
candidates,
reason,
}
)
}
}

View File

@@ -205,13 +205,13 @@ impl Router for CacheAffinityRouter {
}
}
RouteDecision {
req_id: req.req_id,
mode: "cache_affinity",
chosen: instances[best_idx].id,
probe_overhead_s: 0.0,
crate::router::local_route_decision(
req.req_id,
"cache_affinity",
instances[best_idx].id,
0.0,
candidates,
reason: "argmin(α·q γ·l0_hit δ·meta_only) + rendezvous tiebreak",
}
"argmin(α·q γ·l0_hit δ·meta_only) + rendezvous tiebreak",
)
}
}

View File

@@ -77,13 +77,13 @@ impl Router for CacheLoadRouter {
});
}
RouteDecision {
req_id: req.req_id,
mode: "cache_load",
chosen: instances[best_idx].id,
probe_overhead_s: 0.0,
crate::router::local_route_decision(
req.req_id,
"cache_load",
instances[best_idx].id,
0.0,
candidates,
reason: "least-loaded 1/4, then best local L0 prefix",
}
"least-loaded 1/4, then best local L0 prefix",
)
}
}

View File

@@ -99,13 +99,13 @@ impl Router for CacheScoreRouter {
}
}
RouteDecision {
req_id: req.req_id,
mode: "cache_score",
chosen: instances[best_idx].id,
probe_overhead_s: 0.0,
crate::router::local_route_decision(
req.req_id,
"cache_score",
instances[best_idx].id,
0.0,
candidates,
reason: "argmin 2^(α·load + β·miss)",
}
"argmin 2^(α·load + β·miss)",
)
}
}

View File

@@ -74,13 +74,13 @@ impl Router for CacheScoreTtlRouter {
}
}
RouteDecision {
req_id: req.req_id,
mode: "cache_score_ttl",
chosen: instances[best_idx].id,
probe_overhead_s: 0.0,
crate::router::local_route_decision(
req.req_id,
"cache_score_ttl",
instances[best_idx].id,
0.0,
candidates,
reason: "argmin 2^(alpha*load + beta*meta_store_miss)",
}
"argmin 2^(alpha*load + beta*meta_store_miss)",
)
}
}

View File

@@ -89,13 +89,13 @@ impl Router for EstimatedTtftRouter {
}
}
RouteDecision {
req_id: req.req_id,
mode: "estimated_ttft",
chosen: best,
probe_overhead_s: 0.0,
crate::router::local_route_decision(
req.req_id,
"estimated_ttft",
best,
0.0,
candidates,
reason: "argmin(drain + scheduler + kv_prepare + prefill + first_token_tail)",
}
"argmin(drain + scheduler + kv_prepare + prefill + first_token_tail)",
)
}
}

371
src/router/global_bucket.rs Normal file
View File

@@ -0,0 +1,371 @@
use anyhow::{anyhow, Result};
use serde::Serialize;
use crate::config::{Config, GlobalRouterMode};
use crate::trace::RequestRecord;
pub type BucketId = u32;
#[derive(Debug, Clone, Serialize)]
pub struct BucketView {
pub id: BucketId,
pub input_length_min: u32,
pub input_length_max: u32,
pub num_instances: u32,
pub total_queue_len: u32,
pub total_load_blocks: u32,
pub predicted_prefix: u32,
}
#[derive(Debug, Clone, Serialize)]
pub struct BucketCandidate {
pub bucket: BucketId,
pub input_length_min: u32,
pub input_length_max: u32,
pub num_instances: u32,
pub total_queue_len: u32,
pub total_load_blocks: u32,
pub predicted_prefix: u32,
pub matches_input_len: bool,
pub score: f64,
}
#[derive(Debug, Clone, Serialize)]
pub struct GlobalRouteDecision {
pub req_id: u64,
pub mode: &'static str,
pub chosen_bucket: BucketId,
pub candidates: Vec<BucketCandidate>,
pub reason: &'static str,
}
impl GlobalRouteDecision {
pub fn single_bucket(req_id: u64, chosen_bucket: BucketId) -> Self {
Self {
req_id,
mode: "single_pool",
chosen_bucket,
candidates: Vec::new(),
reason: "single pool uses bucket 0",
}
}
}
pub trait GlobalRouter: Send {
fn name(&self) -> &'static str;
fn route(
&mut self,
req: &RequestRecord,
buckets: &[BucketView],
now: f64,
) -> Result<GlobalRouteDecision>;
}
struct StrictInputLengthRouter {
reported_mode: &'static str,
reason: &'static str,
}
impl StrictInputLengthRouter {
fn new(reported_mode: &'static str, reason: &'static str) -> Self {
Self {
reported_mode,
reason,
}
}
}
impl GlobalRouter for StrictInputLengthRouter {
fn name(&self) -> &'static str {
self.reported_mode
}
fn route(
&mut self,
req: &RequestRecord,
buckets: &[BucketView],
_now: f64,
) -> Result<GlobalRouteDecision> {
let candidates = buckets
.iter()
.map(|view| BucketCandidate {
bucket: view.id,
input_length_min: view.input_length_min,
input_length_max: view.input_length_max,
num_instances: view.num_instances,
total_queue_len: view.total_queue_len,
total_load_blocks: view.total_load_blocks,
predicted_prefix: view.predicted_prefix,
matches_input_len: view.input_length_min <= req.input_len
&& req.input_len <= view.input_length_max,
score: if view.input_length_min <= req.input_len
&& req.input_len <= view.input_length_max
{
0.0
} else {
f64::INFINITY
},
})
.collect::<Vec<_>>();
let matches = candidates
.iter()
.filter(|candidate| candidate.matches_input_len)
.map(|candidate| candidate.bucket)
.collect::<Vec<_>>();
let chosen_bucket = match matches.as_slice() {
[bucket] => *bucket,
[] => {
return Err(anyhow!(
"cluster.global_router.mode={} has no bucket for input_length={}",
self.reported_mode,
req.input_len
));
}
_ => {
return Err(anyhow!(
"cluster.global_router.mode={} matched multiple buckets for input_length={}",
self.reported_mode,
req.input_len
));
}
};
Ok(GlobalRouteDecision {
req_id: req.req_id,
mode: self.reported_mode,
chosen_bucket,
candidates,
reason: self.reason,
})
}
}
struct BucketScoreRouter {
length_penalty_weight: f64,
load_weight: f64,
cache_weight: f64,
}
impl BucketScoreRouter {
fn new(full: &Config) -> Self {
Self {
length_penalty_weight: full.cluster.global_router.length_penalty_weight,
load_weight: full.cluster.global_router.load_weight,
cache_weight: full.cluster.global_router.cache_weight,
}
}
fn length_penalty(&self, req: &RequestRecord, bucket: &BucketView) -> f64 {
if req.input_len < bucket.input_length_min {
(bucket.input_length_min - req.input_len) as f64
} else if req.input_len > bucket.input_length_max {
(req.input_len - bucket.input_length_max) as f64
} else {
0.0
}
}
}
impl GlobalRouter for BucketScoreRouter {
fn name(&self) -> &'static str {
"bucket_score"
}
fn route(
&mut self,
req: &RequestRecord,
buckets: &[BucketView],
_now: f64,
) -> Result<GlobalRouteDecision> {
let mut chosen_bucket = None;
let mut best_score = f64::INFINITY;
let mut candidates = Vec::with_capacity(buckets.len());
for bucket in buckets {
let length_penalty = self.length_penalty(req, bucket);
let miss = req
.hash_ids
.len()
.saturating_sub(bucket.predicted_prefix as usize) as f64;
let score = self.length_penalty_weight * length_penalty
+ self.load_weight * bucket.total_queue_len as f64
+ self.cache_weight * miss;
candidates.push(BucketCandidate {
bucket: bucket.id,
input_length_min: bucket.input_length_min,
input_length_max: bucket.input_length_max,
num_instances: bucket.num_instances,
total_queue_len: bucket.total_queue_len,
total_load_blocks: bucket.total_load_blocks,
predicted_prefix: bucket.predicted_prefix,
matches_input_len: bucket.input_length_min <= req.input_len
&& req.input_len <= bucket.input_length_max,
score,
});
let better = score < best_score
|| (score == best_score && chosen_bucket.is_none_or(|best| bucket.id < best));
if better {
best_score = score;
chosen_bucket = Some(bucket.id);
}
}
Ok(GlobalRouteDecision {
req_id: req.req_id,
mode: self.name(),
chosen_bucket: chosen_bucket.ok_or_else(|| anyhow!("no buckets available"))?,
candidates,
reason: "weighted length/load/cache bucket score",
})
}
}
pub fn build(full: &Config) -> Box<dyn GlobalRouter> {
match full.cluster.global_router.mode {
GlobalRouterMode::StrictInputLength => Box::new(StrictInputLengthRouter::new(
"strict_input_length",
"unique bucket range contains input_length",
)) as Box<dyn GlobalRouter>,
GlobalRouterMode::BucketScore => {
Box::new(BucketScoreRouter::new(full)) as Box<dyn GlobalRouter>
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{
ClusterConfig, GlobalRouterConfig, MetaStoreConfig, RouterConfig, RouterMode,
};
fn cfg() -> Config {
Config {
model: crate::config::ModelConfig::default(),
hardware: crate::config::HardwareConfig {
gpu_flops: 1.0,
gpu_fp8_flops: 0.0,
gpu_fp4_flops: 0.0,
gpu_mem_bw: 1.0,
hbm_bytes: 1.0,
dram_bytes: 1.0,
host_dram_bw: 1.0,
pcie_bw: 1.0,
pcie_latency_us: 1.0,
rdma_bw: 1.0,
rdma_latency_us: 1.0,
intra_node_tp_bw: 1.0,
intra_node_tp_latency_us: 1.0,
tp_degree: 1,
max_batch_slots: 1,
prefill_chunk_tokens: 1,
},
calibration: crate::config::CalibrationConfig::default(),
cluster: ClusterConfig {
num_instances: None,
buckets: Vec::new(),
global_router: GlobalRouterConfig {
mode: GlobalRouterMode::BucketScore,
length_penalty_weight: 1.0,
load_weight: 1.0,
cache_weight: 1.0,
},
meta_store: MetaStoreConfig { ttl_seconds: 1.0 },
router: RouterConfig {
mode: RouterMode::LeastLoaded,
precise_probe_latency_us: 1.0,
precise_probe_topk: 1,
load_alpha: 1.0,
score_alpha: 1.0,
score_beta: 1.0,
prefix_k: 8,
affinity_fan_out: 1,
},
},
sim: crate::config::SimConfig {
trace_path: String::new(),
max_requests: None,
output_dir: String::new(),
sample_interval_s: 0.0,
seed: 0,
input_length_min: None,
input_length_max: None,
},
}
}
fn req(input_len: u32) -> RequestRecord {
RequestRecord {
req_id: 1,
chat_id: 0,
parent_chat_id: -1,
turn: 0,
arrival: 0.0,
input_len,
output_len: 16,
hash_ids: vec![10, 11, 12],
}
}
#[test]
fn bucket_score_prefers_matching_bucket_when_load_is_equal() {
let mut router = BucketScoreRouter::new(&cfg());
let buckets = vec![
BucketView {
id: 0,
input_length_min: 0,
input_length_max: 32,
num_instances: 2,
total_queue_len: 1,
total_load_blocks: 0,
predicted_prefix: 0,
},
BucketView {
id: 1,
input_length_min: 33,
input_length_max: 96,
num_instances: 2,
total_queue_len: 1,
total_load_blocks: 0,
predicted_prefix: 0,
},
];
let decision = router.route(&req(24), &buckets, 0.0).unwrap();
assert_eq!(decision.chosen_bucket, 0);
}
#[test]
fn bucket_score_can_override_length_match_when_load_gap_is_large() {
let mut full = cfg();
full.cluster.global_router.load_weight = 5.0;
full.cluster.global_router.cache_weight = 1.0;
full.cluster.global_router.length_penalty_weight = 1.0;
let mut router = BucketScoreRouter::new(&full);
let buckets = vec![
BucketView {
id: 0,
input_length_min: 0,
input_length_max: 32,
num_instances: 2,
total_queue_len: 20,
total_load_blocks: 0,
predicted_prefix: 0,
},
BucketView {
id: 1,
input_length_min: 33,
input_length_max: 96,
num_instances: 2,
total_queue_len: 0,
total_load_blocks: 0,
predicted_prefix: 2,
},
];
let decision = router.route(&req(24), &buckets, 0.0).unwrap();
assert_eq!(decision.chosen_bucket, 1);
}
}

View File

@@ -41,13 +41,13 @@ impl Router for LeastLoadedRouter {
best = inst.id;
}
}
RouteDecision {
req_id: req.req_id,
mode: "least_loaded",
chosen: best,
probe_overhead_s: 0.0,
crate::router::local_route_decision(
req.req_id,
"least_loaded",
best,
0.0,
candidates,
reason: "argmin(kv_used + alpha * queue_len)",
}
"argmin(kv_used + alpha * queue_len)",
)
}
}

View File

@@ -61,13 +61,13 @@ impl Router for LeastTokensRouter {
}
}
RouteDecision {
req_id: req.req_id,
mode: "least_tokens",
chosen: best,
probe_overhead_s: 0.0,
crate::router::local_route_decision(
req.req_id,
"least_tokens",
best,
0.0,
candidates,
reason: "argmin(waiting_prefill_tokens)",
}
"argmin(waiting_prefill_tokens)",
)
}
}

View File

@@ -53,7 +53,7 @@ pub struct LineageAffinityRouter {
impl LineageAffinityRouter {
pub fn new(config: &Config) -> Self {
let n = config.cluster.num_instances as usize;
let n = config.cluster.total_instances() as usize;
let configured_fan_out = config.cluster.router.affinity_fan_out;
let max_fan_out = if configured_fan_out > 0 {
configured_fan_out.max(2).min(n)
@@ -231,13 +231,13 @@ impl Router for LineageAffinityRouter {
self.request_home
.insert(req.chat_id, instances[chosen.idx].id);
RouteDecision {
req_id: req.req_id,
mode: "lineage_affinity",
chosen: instances[chosen.idx].id,
probe_overhead_s: 0.0,
crate::router::local_route_decision(
req.req_id,
"lineage_affinity",
instances[chosen.idx].id,
0.0,
candidates,
reason,
}
)
}
}

View File

@@ -90,13 +90,13 @@ impl Router for MinPdRouter {
}
}
RouteDecision {
req_id: req.req_id,
mode: "min_pd",
chosen: best,
probe_overhead_s: 0.0,
crate::router::local_route_decision(
req.req_id,
"min_pd",
best,
0.0,
candidates,
reason: "argmin(P*D), P=local-L0 miss tokens, D=ongoing reqs",
}
"argmin(P*D), P=local-L0 miss tokens, D=ongoing reqs",
)
}
}

View File

@@ -6,6 +6,7 @@ pub mod cache_load;
pub mod cache_score;
pub mod cache_score_ttl;
pub mod estimated_ttft;
pub mod global_bucket;
pub mod least_loaded;
pub mod least_tokens;
pub mod lineage_affinity;
@@ -23,6 +24,8 @@ use crate::instance::Instance;
use crate::trace::RequestRecord;
use crate::types::InstanceId;
pub use global_bucket::{BucketCandidate, BucketId, BucketView, GlobalRouteDecision, GlobalRouter};
#[derive(Debug, Clone, Serialize)]
pub struct CandidateInfo {
pub instance: InstanceId,
@@ -34,11 +37,25 @@ pub struct CandidateInfo {
#[derive(Debug, Clone, Serialize)]
pub struct RouteDecision {
pub req_id: u64,
pub global_mode: &'static str,
pub mode: &'static str,
pub global_reason: &'static str,
pub local_reason: &'static str,
pub chosen_bucket: BucketId,
pub chosen: InstanceId,
pub probe_overhead_s: f64,
pub bucket_candidates: Vec<BucketCandidate>,
pub candidates: Vec<CandidateInfo>,
pub reason: &'static str,
}
impl RouteDecision {
pub fn with_global(mut self, decision: &GlobalRouteDecision) -> Self {
self.global_mode = decision.mode;
self.global_reason = decision.reason;
self.chosen_bucket = decision.chosen_bucket;
self.bucket_candidates = decision.candidates.clone();
self
}
}
pub trait Router: Send {
@@ -63,6 +80,28 @@ pub(crate) fn local_l0_scores(req: &RequestRecord, instances: &[Instance]) -> Ve
.collect()
}
pub fn local_route_decision(
req_id: u64,
mode: &'static str,
chosen: InstanceId,
probe_overhead_s: f64,
candidates: Vec<CandidateInfo>,
reason: &'static str,
) -> RouteDecision {
RouteDecision {
req_id,
global_mode: "single_pool",
mode,
global_reason: "single pool uses bucket 0",
local_reason: reason,
chosen_bucket: 0,
chosen,
probe_overhead_s,
bucket_candidates: Vec::new(),
candidates,
}
}
pub fn build(full: &Config, seed: u64) -> Box<dyn Router> {
use crate::config::RouterMode::*;
let cfg = &full.cluster.router;
@@ -122,6 +161,10 @@ pub fn build(full: &Config, seed: u64) -> Box<dyn Router> {
}
}
pub fn build_global(full: &Config) -> Box<dyn GlobalRouter> {
global_bucket::build(full)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -181,7 +224,9 @@ mod tests {
hardware: test_hardware(),
calibration: CalibrationConfig::default(),
cluster: ClusterConfig {
num_instances: 2,
num_instances: Some(2),
buckets: Vec::new(),
global_router: Default::default(),
meta_store: MetaStoreConfig {
ttl_seconds: 1000.0,
},
@@ -351,7 +396,7 @@ mod tests {
let mut router = PrefixAffinityRouter::new(&cfg);
let decision = router.route(&req, &instances, &meta, 0.0);
assert_eq!(decision.reason, "affinity fallback: min(drain+fetch)");
assert_eq!(decision.local_reason, "affinity fallback: min(drain+fetch)");
assert_eq!(decision.chosen, 1);
}

View File

@@ -62,13 +62,13 @@ impl Router for PreciseRouter {
}
}
RouteDecision {
req_id: req.req_id,
mode: "precise",
chosen: best,
probe_overhead_s: n as f64 * self.probe_latency_s,
crate::router::local_route_decision(
req.req_id,
"precise",
best,
n as f64 * self.probe_latency_s,
candidates,
reason: "exact-probe all instances' L0 cache",
}
"exact-probe all instances' L0 cache",
)
}
}

View File

@@ -51,7 +51,7 @@ pub struct PrefixAffinityRouter {
impl PrefixAffinityRouter {
pub fn new(config: &Config) -> Self {
let n = config.cluster.num_instances as usize;
let n = config.cluster.total_instances() as usize;
let cfg_fan = config.cluster.router.affinity_fan_out;
// fan_out: if configured, use it; otherwise auto = max(2, n/8).
let fan_out = if cfg_fan > 0 {
@@ -166,13 +166,13 @@ impl Router for PrefixAffinityRouter {
reason = "prefix affinity: top-K min drain";
}
RouteDecision {
req_id: req.req_id,
mode: "prefix_affinity",
chosen: instances[best_idx].id,
probe_overhead_s: 0.0,
crate::router::local_route_decision(
req.req_id,
"prefix_affinity",
instances[best_idx].id,
0.0,
candidates,
reason,
}
)
}
}

View File

@@ -33,19 +33,19 @@ impl Router for RandomRouter {
) -> RouteDecision {
let n = instances.len();
let chosen = self.rng.gen_range(0..n) as InstanceId;
RouteDecision {
req_id: req.req_id,
mode: "random",
crate::router::local_route_decision(
req.req_id,
"random",
chosen,
probe_overhead_s: 0.0,
candidates: vec![CandidateInfo {
0.0,
vec![CandidateInfo {
instance: chosen,
predicted_prefix: 0,
load_blocks: instances[chosen as usize].kv_blocks_used,
queue_len: instances[chosen as usize].queue_len(),
}],
reason: "uniform random",
}
"uniform random",
)
}
}
@@ -75,18 +75,18 @@ impl Router for RoundRobinRouter {
let n = instances.len() as u32;
let chosen = self.next % n;
self.next = self.next.wrapping_add(1);
RouteDecision {
req_id: req.req_id,
mode: "round_robin",
crate::router::local_route_decision(
req.req_id,
"round_robin",
chosen,
probe_overhead_s: 0.0,
candidates: vec![CandidateInfo {
0.0,
vec![CandidateInfo {
instance: chosen,
predicted_prefix: 0,
load_blocks: instances[chosen as usize].kv_blocks_used,
queue_len: instances[chosen as usize].queue_len(),
}],
reason: "round robin",
}
"round robin",
)
}
}

View File

@@ -46,13 +46,13 @@ impl Router for TtlAwareRouter {
best = inst.id;
}
}
RouteDecision {
req_id: req.req_id,
mode: "ttl_aware",
chosen: best,
probe_overhead_s: 0.0,
crate::router::local_route_decision(
req.req_id,
"ttl_aware",
best,
0.0,
candidates,
reason: "max meta_store prefix, tie -> least loaded",
}
"max meta_store prefix, tie -> least loaded",
)
}
}

View File

@@ -91,11 +91,24 @@ mod tests {
q.schedule(
2.0,
Event::BatchTick {
bucket: 0,
instance: 0 as InstanceId,
},
);
q.schedule(1.0, Event::BatchTick { instance: 1 });
q.schedule(1.5, Event::BatchTick { instance: 2 });
q.schedule(
1.0,
Event::BatchTick {
bucket: 0,
instance: 1,
},
);
q.schedule(
1.5,
Event::BatchTick {
bucket: 0,
instance: 2,
},
);
let (t1, _) = q.pop().unwrap();
let (t2, _) = q.pop().unwrap();
let (t3, _) = q.pop().unwrap();
@@ -107,12 +120,24 @@ mod tests {
#[test]
fn equal_time_fifo() {
let mut q = EventQueue::new();
q.schedule(1.0, Event::BatchTick { instance: 7 });
q.schedule(1.0, Event::BatchTick { instance: 8 });
q.schedule(
1.0,
Event::BatchTick {
bucket: 0,
instance: 7,
},
);
q.schedule(
1.0,
Event::BatchTick {
bucket: 1,
instance: 8,
},
);
let (_, e1) = q.pop().unwrap();
let (_, e2) = q.pop().unwrap();
match (e1, e2) {
(Event::BatchTick { instance: a }, Event::BatchTick { instance: b }) => {
(Event::BatchTick { instance: a, .. }, Event::BatchTick { instance: b, .. }) => {
assert_eq!(a, 7);
assert_eq!(b, 8);
}

View File

@@ -1,5 +1,6 @@
//! Event types for the discrete-event engine.
use crate::router::BucketId;
use crate::types::{InstanceId, ReqId};
#[derive(Debug)]
@@ -7,7 +8,10 @@ pub enum Event {
/// New trace request arrives at the cluster router.
Arrival { req_id: ReqId },
/// Per-instance scheduler tick (continuous batching).
BatchTick { instance: InstanceId },
BatchTick {
bucket: BucketId,
instance: InstanceId,
},
/// Periodic time-series sample of all instances.
Sample,
/// Stop the simulation early (used internally).

View File

@@ -6,7 +6,7 @@ use std::io::Write;
use kvcache_simulator::config::*;
use kvcache_simulator::driver;
use kvcache_simulator::replay::ReplayEvictPolicy;
use kvcache_simulator::replay::{self, PlacementEntry, ReplayEvictPolicy};
fn base_config(trace_path: &str, out_dir: &str, mode: RouterMode) -> Config {
Config {
@@ -41,7 +41,9 @@ fn base_config(trace_path: &str, out_dir: &str, mode: RouterMode) -> Config {
},
calibration: CalibrationConfig::default(),
cluster: ClusterConfig {
num_instances: 4,
num_instances: Some(4),
buckets: Vec::new(),
global_router: Default::default(),
meta_store: MetaStoreConfig {
ttl_seconds: 1000.0,
},
@@ -68,6 +70,26 @@ fn base_config(trace_path: &str, out_dir: &str, mode: RouterMode) -> Config {
}
}
fn bucketed_config(trace_path: &str, out_dir: &str, mode: RouterMode) -> Config {
let mut cfg = base_config(trace_path, out_dir, mode);
cfg.cluster.num_instances = None;
cfg.cluster.buckets = vec![
BucketConfig {
name: "short".into(),
input_length_min: 0,
input_length_max: 64,
num_instances: 2,
},
BucketConfig {
name: "long".into(),
input_length_min: 65,
input_length_max: 128,
num_instances: 1,
},
];
cfg
}
fn write_synthetic_trace(path: &std::path::Path) {
// 5 distinct conversations, each with 8 turns. Within a conversation,
// turn k+1 reuses the prefix of turn k (shared first ~10 blocks) and
@@ -272,3 +294,160 @@ fn ablation_parallel_matches_serial() {
assert!((lhs.miss_rate - rhs.miss_rate).abs() < 1e-12);
}
}
#[test]
fn strict_bucket_run_emits_bucket_fields_in_outputs() {
let tmp = std::env::temp_dir().join("kvcache_sim_bucket_outputs");
let _ = std::fs::remove_dir_all(&tmp);
std::fs::create_dir_all(&tmp).unwrap();
let trace_path = tmp.join("trace.jsonl");
let mut f = std::fs::File::create(&trace_path).unwrap();
writeln!(
f,
"{}",
serde_json::json!({
"chat_id": 1,
"parent_chat_id": -1,
"timestamp": 0.0,
"input_length": 32,
"output_length": 16,
"type": "text",
"turn": 0,
"hash_ids": [1, 2]
})
)
.unwrap();
writeln!(
f,
"{}",
serde_json::json!({
"chat_id": 2,
"parent_chat_id": -1,
"timestamp": 0.1,
"input_length": 80,
"output_length": 16,
"type": "text",
"turn": 0,
"hash_ids": [3, 4, 5, 6, 7]
})
)
.unwrap();
let mut cfg = bucketed_config(
trace_path.to_str().unwrap(),
tmp.to_str().unwrap(),
RouterMode::LeastLoaded,
);
cfg.cluster.global_router.mode = GlobalRouterMode::StrictInputLength;
cfg.sim.sample_interval_s = 0.05;
let _ = driver::run(&cfg, Some("strict_bucket")).expect("bucketed run");
let per_request = std::fs::read_to_string(tmp.join("strict_bucket/per_request.csv")).unwrap();
assert!(per_request.contains("bucket"));
assert!(per_request.contains("length_bucket_match"));
let instances = std::fs::read_to_string(tmp.join("strict_bucket/instances.csv")).unwrap();
assert!(instances.contains("bucket"));
let routing_log = std::fs::read_to_string(tmp.join("strict_bucket/routing_log.jsonl")).unwrap();
assert!(routing_log.contains("\"chosen_bucket\""));
assert!(routing_log.contains("\"bucket_candidates\""));
assert!(routing_log.contains("\"global_reason\""));
}
#[test]
fn bucketed_configs_are_rejected_by_legacy_fixed_placement_paths() {
let tmp = std::env::temp_dir().join("kvcache_sim_bucketed_reject");
let _ = std::fs::remove_dir_all(&tmp);
std::fs::create_dir_all(&tmp).unwrap();
let trace_path = tmp.join("trace.jsonl");
write_synthetic_trace(&trace_path);
let cfg = bucketed_config(
trace_path.to_str().unwrap(),
tmp.to_str().unwrap(),
RouterMode::Random,
);
let err =
driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Lru])
.expect_err("bucketed ablation should fail");
assert!(err.to_string().contains("cluster.buckets"));
let err = replay::replay_fixed_placement(
&cfg,
&[],
&Vec::<PlacementEntry>::new(),
ReplayEvictPolicy::Lru,
)
.expect_err("bucketed replay should fail");
assert!(err.to_string().contains("cluster.buckets"));
}
#[test]
fn bucket_score_can_deviate_from_strict_length_bucket() {
let tmp = std::env::temp_dir().join("kvcache_sim_bucket_score");
let _ = std::fs::remove_dir_all(&tmp);
std::fs::create_dir_all(&tmp).unwrap();
let trace_path = tmp.join("trace.jsonl");
let mut f = std::fs::File::create(&trace_path).unwrap();
for req_id in 0..3 {
writeln!(
f,
"{}",
serde_json::json!({
"chat_id": req_id,
"parent_chat_id": -1,
"timestamp": 0.0,
"input_length": 24,
"output_length": 16,
"type": "text",
"turn": 0,
"hash_ids": [100 + req_id, 200 + req_id]
})
)
.unwrap();
}
let mut strict_cfg = bucketed_config(
trace_path.to_str().unwrap(),
tmp.to_str().unwrap(),
RouterMode::LeastLoaded,
);
strict_cfg.cluster.buckets = vec![
BucketConfig {
name: "short".into(),
input_length_min: 0,
input_length_max: 32,
num_instances: 1,
},
BucketConfig {
name: "long".into(),
input_length_min: 33,
input_length_max: 96,
num_instances: 1,
},
];
strict_cfg.cluster.global_router.mode = GlobalRouterMode::StrictInputLength;
let mut score_cfg = strict_cfg.clone();
score_cfg.cluster.global_router.mode = GlobalRouterMode::BucketScore;
score_cfg.cluster.global_router.length_penalty_weight = 1.0;
score_cfg.cluster.global_router.load_weight = 10.0;
score_cfg.cluster.global_router.cache_weight = 0.0;
let _ = driver::run(&strict_cfg, Some("strict_score_cmp")).expect("strict run");
let _ = driver::run(&score_cfg, Some("bucket_score_cmp")).expect("bucket score run");
let strict_log =
std::fs::read_to_string(tmp.join("strict_score_cmp/routing_log.jsonl")).unwrap();
let score_log =
std::fs::read_to_string(tmp.join("bucket_score_cmp/routing_log.jsonl")).unwrap();
assert!(strict_log.contains("\"chosen_bucket\":0"));
assert!(score_log.contains("\"global_mode\":\"bucket_score\""));
assert!(score_log.contains("\"chosen_bucket\":1"));
}