diff --git a/README.md b/README.md index b859653..7b9d42b 100644 --- a/README.md +++ b/README.md @@ -58,11 +58,19 @@ Prints `summary.json` to stdout and writes the full output directory target/release/kvcache-sim ablate \ --config configs/glm5-8xb200-hf.yaml \ --routers random,least_loaded,least_tokens,min_pd,prefix_affinity \ + --evict-policies lru \ --output-dir runs/glm5_ablation ``` -Writes one subdirectory per router plus a combined -`ablation.json` with side-by-side summaries. +Writes `ablation.json` with one row per `router x evict_policy`. + +`ablate` currently supports only `lru` as a valid eviction policy. The +aggregated output keeps the online prefill-time metrics +(`ttft_mean/p50/p95/p99`) and omits `e2e`. + +The previous replay-based `belady` approximation has been removed from +the CLI because it was not an exact full-hierarchy Belady algorithm and +could produce misleading comparisons against `lru`. ### 3. Compute theoretical hit-rate ceilings (oracle) @@ -115,7 +123,8 @@ so the same config can be reused across sweeps: | `--ttl-seconds ` | `cluster.meta_store.ttl_seconds` | `oracle` additionally takes `--capacity-blocks ` / `--per-instance` -and `--out `. `ablate` additionally takes `--routers `. +and `--out `. `ablate` additionally takes `--routers ` and +`--evict-policies ` (currently only `lru`). 
## Router modes @@ -288,12 +297,8 @@ memory_time = layers * weight_bytes_per_layer / gpu_mem_bw | Config | Model | Hardware | Instances | Trace | |--------|-------|----------|-----------|-------| | `glm5-8xb200-hf.yaml` | GLM-5 via HF config.json | 8xB200 preset | 32 | GLM coder blk512 | -| `glm5-8xb200-blk512.yaml` | GLM-5 inline | 8xB200 inline | 64 | GLM coder blk512 | -| `glm5-8xb200.yaml` | GLM-5 inline | 8xB200 inline | 8 | GLM coder blk512 | +| `glm5-nvfp4-8xb300.yaml` | GLM-5-NVFP4 via HF config.json | 8xB300 preset | 8 | GLM coder blk512 | | `qwen3-coder-480b-8xh20.yaml` | Qwen3-Coder via HF | 8xH20 preset | 32 | Qwen coder blk16 | -| `qwen2.5-coder-7b-h800.yaml` | Qwen2.5-7B inline | H800 inline | 16 | Qwen coder blk16 | -| `qwen2.5-coder-7b-preset.yaml` | Qwen2.5-7B inline | H800 preset | 16 | Qwen coder blk16 | -| `qwen2.5-coder-32b-h800.yaml` | Qwen2.5-32B inline | H800 inline | 16 | Qwen coder blk16 | ## Outputs diff --git a/configs/glm5-8xb200-blk512.yaml b/configs/glm5-8xb200-blk512.yaml deleted file mode 100644 index cbf1be8..0000000 --- a/configs/glm5-8xb200-blk512.yaml +++ /dev/null @@ -1,68 +0,0 @@ -# GLM-5 (zai-org/GLM-5) on 8 x B200 SXM (192GB each). -# Architecture from HuggingFace config.json — all roofline coefficients -# are derived automatically. 
- -model: - name: glm-5 - # Core architecture (from HF config.json) - num_layers: 78 - hidden_size: 6144 - num_attention_heads: 64 - num_kv_heads: 64 # formalism; MLA overrides KV cache sizing - head_dim: 64 - intermediate_size: 12288 # shared expert FFN width - dtype_bytes: 2 # BF16 - block_size_tokens: 512 # matches bailian-traces blksz_512 - - # MoE: 256 routed + 1 shared, 8 active per token - moe: - num_experts: 256 - num_active_experts: 8 - num_shared_experts: 1 - expert_intermediate_size: 2048 # moe_intermediate_size - - # MLA (Multi-head Latent Attention): compressed KV cache - mla: - kv_lora_rank: 512 - q_lora_rank: 2048 - qk_nope_head_dim: 192 - qk_rope_head_dim: 64 - v_head_dim: 256 - - # DSA (DeepSeek Sparse Attention): sub-quadratic past dense_window - attention: - type: dsa - dense_window: 4096 - sparse_stride: 8 - first_dense_layers: 3 - -hardware: - # Aggregate of 8 x B200 in one tensor-parallel group. - gpu_flops: 1.80e16 # 8 * 2.25 PFLOPS BF16 dense - gpu_mem_bw: 6.40e13 # 8 * 8 TB/s HBM3e - # KV budget after FP8 weights + activations. GLM-5 FP8 ~744GB of 1536GB. - hbm_bytes: 500.0e9 - dram_bytes: 1.5e12 # ~1.5 TB usable CPU DRAM / v6d per node - pcie_bw: 128.0e9 # PCIe Gen6 x16 - pcie_latency_us: 4.0 - rdma_bw: 50.0e9 # ConnectX-7 400 Gbps - rdma_latency_us: 6.0 - max_batch_slots: 256 - prefill_chunk_tokens: 4096 - -cluster: - num_instances: 64 - meta_store: - ttl_seconds: 300.0 - router: - mode: min_pd - precise_probe_latency_us: 50.0 - precise_probe_topk: 4 - load_alpha: 1.0 - -sim: - trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl - max_requests: null - output_dir: runs/glm5_8xb200_blk512 - sample_interval_s: 1.0 - seed: 42 diff --git a/configs/glm5-8xb200-hf.yaml b/configs/glm5-8xb200-hf.yaml deleted file mode 100644 index 0cc32d2..0000000 --- a/configs/glm5-8xb200-hf.yaml +++ /dev/null @@ -1,40 +0,0 @@ -# GLM-5 using HuggingFace config.json + hardware preset. 
-# -# This config demonstrates the simplified format: -# model.config_json — loads architecture from HF config.json -# hardware.type — loads GPU specs from built-in preset -# -# Only deployment-specific fields need to be set explicitly. -# Any field from config_json or the preset can be overridden in YAML. - -model: - # Auto-detect architecture: MoE, MLA, DSA, head dims, etc. - config_json: ../models/GLM-5/config.json - name: glm-5 # override HF model_type - dtype_bytes: 1 # BF16 (not in HF config.json) - block_size_tokens: 512 # matches bailian-traces blksz_512 - -hardware: - type: 8xb200 # 8 x B200 SXM (192GB each) - # Override preset values for this specific deployment: - hbm_bytes: 500.0e9 # KV budget after FP8 weights + activations - dram_bytes: 1.5e12 # ~1.5 TB usable CPU DRAM per node - max_batch_slots: 256 - -cluster: - num_instances: 32 - meta_store: - ttl_seconds: 300.0 - router: - mode: min_pd - precise_probe_latency_us: 50.0 - precise_probe_topk: 4 - load_alpha: 1.0 - prefix_k: 8 - -sim: - trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl - max_requests: null - output_dir: runs/glm5_8xb200_hf - sample_interval_s: 1.0 - seed: 42 diff --git a/configs/glm5-8xb200.yaml b/configs/glm5-8xb200.yaml index 0e3c542..9afaa6b 100644 --- a/configs/glm5-8xb200.yaml +++ b/configs/glm5-8xb200.yaml @@ -1,66 +1,39 @@ -# GLM-5 (zai-org/GLM-5) served as a single tensor-parallel instance on -# 8 x NVIDIA B200 SXM (192GB each, 1.5 TB aggregate HBM). +# GLM-5 using HuggingFace config.json + hardware preset. # -# GLM-5 is a 744B-total / 40B-active Mixture-of-Experts model (BF16), -# using DeepSeek Sparse Attention (DSA). The HF card does not publish -# layer/head shapes, so the values below are reasonable estimates based -# on the GLM-4.5 lineage; adjust once the official config.json is public. 
+# This config demonstrates the simplified format: +# model.config_json — loads architecture from HF config.json +# hardware.type — loads GPU specs from built-in preset # -# Hardware values below represent the *aggregate* of the 8-GPU TP group -# (one simulated "instance" == one 8xB200 serving replica). This is how -# the roofline in src/instance/compute.rs wants to see it: gpu_flops and -# gpu_mem_bw are the effective peaks seen by the TP'd model. -# -# Calibrate `flops_per_token_prefill` and `attn_quadratic_coeff` against -# measured prefill latency before trusting absolute TTFT numbers. +# Only deployment-specific fields need to be set explicitly. +# Any field from config_json or the preset can be overridden in YAML. model: - name: glm-5 - # --- estimates; refine from official config.json when available --- - num_layers: 92 - num_kv_heads: 8 # GQA - head_dim: 128 - dtype_bytes: 2 # BF16 - block_size_tokens: 16 # trace convention - # Active-params-driven roofline: MoE activates ~40B params per token, - # so non-attention prefill FLOPs/token ≈ 2 * 40e9 = 8e10. - flops_per_token_prefill: 8.0e10 - # Quadratic attention term ≈ 2 * num_heads * head_dim. GLM-5 uses - # DeepSeek Sparse Attention which is sub-quadratic in practice, so - # this coefficient is an upper bound — lower it if your measurements - # show DSA kicking in for long prompts. - attn_quadratic_coeff: 2048.0 - bytes_per_token_prefill: 0.0 + # Auto-detect architecture: MoE, MLA, DSA, head dims, etc. + config_json: ../models/GLM-5/config.json + name: glm-5 # override HF model_type + dtype_bytes: 1 # FP8, 1 byte/element (not in HF config.json) + block_size_tokens: 512 # matches bailian-traces blksz_512 hardware: - # Aggregate of 8 x B200 in one tensor-parallel group. - gpu_flops: 1.80e16 # 8 * 2.25 PFLOPS BF16 dense - gpu_mem_bw: 6.40e13 # 8 * 8 TB/s HBM3e - # KV-cache budget after weights + activations.
GLM-5 @ BF16 is ~1.49TB, - # which barely fits in 1.5TB HBM; realistic serving uses FP8 weights - # (~744GB), leaving ~500GB for activations + KV cache. Adjust if your - # deployment uses a different weight dtype. - hbm_bytes: 500.0e9 - dram_bytes: 1.5e12 # ~1.5 TB usable CPU DRAM / v6d per node - pcie_bw: 128.0e9 # PCIe Gen6 x16 ~ 128 GB/s per direction - pcie_latency_us: 4.0 - rdma_bw: 50.0e9 # ConnectX-7 400 Gbps ≈ 50 GB/s - rdma_latency_us: 6.0 - max_batch_slots: 256 - prefill_chunk_tokens: 2048 + type: 8xb200 # 8 x B200 SXM (192GB each) + # Override preset values for this specific deployment: + hbm_bytes: 500.0e9 # KV budget after FP8 weights + activations + dram_bytes: 1.5e12 # ~1.5 TB usable CPU DRAM per node + max_batch_slots: 256 cluster: - num_instances: 8 # 8 TP replicas -> 64 B200s cluster-wide + num_instances: 32 meta_store: - ttl_seconds: 120.0 + ttl_seconds: 300.0 router: - mode: ttl_aware + mode: min_pd precise_probe_latency_us: 50.0 precise_probe_topk: 4 load_alpha: 1.0 + prefix_k: 8 sim: - trace_path: qwen-bailian-usagetraces-anon/qwen_coder_blksz_16.jsonl + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl max_requests: null output_dir: runs/glm5_8xb200 sample_interval_s: 1.0 diff --git a/configs/qwen2.5-coder-32b-h800.yaml b/configs/qwen2.5-coder-32b-h800.yaml deleted file mode 100644 index cc66942..0000000 --- a/configs/qwen2.5-coder-32b-h800.yaml +++ /dev/null @@ -1,42 +0,0 @@ -# Qwen2.5-Coder-32B (dense, GQA) on H800 SXM (80GB). -# Architecture from HuggingFace config.json — roofline auto-derived. 
- -model: - name: qwen2.5-coder-32b - num_layers: 64 - hidden_size: 5120 - num_attention_heads: 40 - num_kv_heads: 8 # GQA - head_dim: 128 - intermediate_size: 27648 # SwiGLU FFN - dtype_bytes: 2 # BF16 - block_size_tokens: 16 - -hardware: - gpu_flops: 9.89e14 - gpu_mem_bw: 3.35e12 - hbm_bytes: 20.0e9 # smaller budget: 32B weights are large - dram_bytes: 512.0e9 - pcie_bw: 64.0e9 - pcie_latency_us: 5.0 - rdma_bw: 25.0e9 - rdma_latency_us: 8.0 - max_batch_slots: 128 - prefill_chunk_tokens: 1024 - -cluster: - num_instances: 16 - meta_store: - ttl_seconds: 60.0 - router: - mode: ttl_aware - precise_probe_latency_us: 50.0 - precise_probe_topk: 4 - load_alpha: 1.0 - -sim: - trace_path: traces/qwen_coder_blksz_16.jsonl - max_requests: null - output_dir: runs/qwen32b - sample_interval_s: 1.0 - seed: 42 diff --git a/configs/qwen2.5-coder-7b-h800.yaml b/configs/qwen2.5-coder-7b-h800.yaml deleted file mode 100644 index bea5226..0000000 --- a/configs/qwen2.5-coder-7b-h800.yaml +++ /dev/null @@ -1,42 +0,0 @@ -# Qwen2.5-Coder-7B (dense, GQA) on a single H800 SXM (80GB). -# Architecture from HuggingFace config.json — roofline auto-derived. 
- -model: - name: qwen2.5-coder-7b - num_layers: 28 - hidden_size: 3584 - num_attention_heads: 28 - num_kv_heads: 4 # GQA: 28 query heads, 4 KV heads - head_dim: 128 - intermediate_size: 18944 # SwiGLU FFN - dtype_bytes: 2 # BF16 - block_size_tokens: 16 # matches qwen_coder_blksz_16 trace - -hardware: - gpu_flops: 9.89e14 # H800 bf16 dense - gpu_mem_bw: 3.35e12 # 3.35 TB/s HBM3 - hbm_bytes: 60.0e9 # leave headroom for weights/activations - dram_bytes: 512.0e9 - pcie_bw: 64.0e9 # PCIe Gen5 x16 - pcie_latency_us: 5.0 - rdma_bw: 25.0e9 # ~200 Gbps NIC - rdma_latency_us: 8.0 - max_batch_slots: 256 - prefill_chunk_tokens: 2048 - -cluster: - num_instances: 16 - meta_store: - ttl_seconds: 60.0 - router: - mode: ttl_aware - precise_probe_latency_us: 50.0 - precise_probe_topk: 4 - load_alpha: 1.0 - -sim: - trace_path: qwen-bailian-usagetraces-anon/qwen_coder_blksz_16.jsonl - max_requests: null - output_dir: runs/qwen7b - sample_interval_s: 1.0 - seed: 42 diff --git a/configs/qwen2.5-coder-7b-preset.yaml b/configs/qwen2.5-coder-7b-preset.yaml deleted file mode 100644 index a411662..0000000 --- a/configs/qwen2.5-coder-7b-preset.yaml +++ /dev/null @@ -1,36 +0,0 @@ -# Qwen2.5-Coder-7B using hardware preset. -# -# Model architecture is specified inline (no config.json needed for simple -# models). Hardware uses preset "h800" with a single override for hbm_bytes. 
- -model: - name: qwen2.5-coder-7b - num_layers: 28 - hidden_size: 3584 - num_attention_heads: 28 - num_kv_heads: 4 - head_dim: 128 - intermediate_size: 18944 - dtype_bytes: 2 - block_size_tokens: 16 - -hardware: - type: h800 # single H800 SXM (80GB) - hbm_bytes: 60.0e9 # KV budget after 7B model weights - -cluster: - num_instances: 16 - meta_store: - ttl_seconds: 60.0 - router: - mode: ttl_aware - precise_probe_latency_us: 50.0 - precise_probe_topk: 4 - load_alpha: 1.0 - -sim: - trace_path: qwen-bailian-usagetraces-anon/qwen_coder_blksz_16.jsonl - max_requests: null - output_dir: runs/qwen7b_preset - sample_interval_s: 1.0 - seed: 42 diff --git a/configs/qwen3-coder-480b-8xh20.yaml b/configs/qwen3-coder-480b-8xh20.yaml index a56e16e..3b84c54 100644 --- a/configs/qwen3-coder-480b-8xh20.yaml +++ b/configs/qwen3-coder-480b-8xh20.yaml @@ -5,16 +5,17 @@ model: config_json: ../models/Qwen3-Coder-480B-A35B-Instruct-FP8/config.json name: qwen3-coder-480b dtype_bytes: 1 # FP8 inference - block_size_tokens: 16 + block_size_tokens: 512 hardware: type: 8xh20 hbm_bytes: 400.0e9 # KV budget after FP8 weights on 8x96GB + dram_bytes: 1.0e12 # ~1.0 TB usable CPU DRAM per node cluster: - num_instances: 32 + num_instances: 128 meta_store: - ttl_seconds: 120.0 + ttl_seconds: 300.0 router: mode: min_pd precise_probe_latency_us: 50.0 @@ -22,7 +23,7 @@ cluster: load_alpha: 1.0 sim: - trace_path: traces/qwen_coder_blksz_16.jsonl + trace_path: bailian-traces/qwen3_coder_blksz_512_040915-040917.jsonl max_requests: null output_dir: runs/qwen3_coder_8xh20 sample_interval_s: 1.0 diff --git a/src/driver.rs b/src/driver.rs index 03fd2cf..e99a908 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -6,11 +6,13 @@ use std::collections::HashMap; use std::path::Path; use crate::cluster::Cluster; -use crate::config::Config; +use crate::config::{Config, RouterMode}; +use crate::metrics::ablation::AblationRow; use crate::metrics::per_request::{PerRequestRow, PerRequestWriter}; use 
crate::metrics::routing_log::RoutingLogWriter; use crate::metrics::summary::Summary; use crate::metrics::timeseries::{TimeseriesRow, TimeseriesWriter}; +use crate::replay::ReplayEvictPolicy; use crate::sim::{Event, EventQueue}; use crate::trace::{RequestRecord, TraceReader}; @@ -100,7 +102,12 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result { if !inst.tick_scheduled { inst.tick_scheduled = true; let when = stats.ready_at.max(now); - q.schedule(when, Event::BatchTick { instance: stats.instance }); + q.schedule( + when, + Event::BatchTick { + instance: stats.instance, + }, + ); } } Event::BatchTick { instance } => { @@ -168,3 +175,33 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result { Ok(RunOutputs { summary, rows }) } + +pub fn ablate_fixed_placement( + base: &Config, + routers: &[RouterMode], + evict_policies: &[ReplayEvictPolicy], +) -> Result> { + let mut out = Vec::new(); + for &policy in evict_policies { + if policy != ReplayEvictPolicy::Lru { + return Err(anyhow::anyhow!( + "exact belady is not supported for fixed-placement full-hierarchy ablation; \ + the previous replay-based approximation has been removed" + )); + } + } + for &mode in routers { + let mut cfg = base.clone(); + cfg.cluster.router.mode = mode; + let placement_run = run(&cfg, Some(&format!("{}__placement_lru", mode.as_str())))?; + for &policy in evict_policies { + out.push(AblationRow::from_summary( + mode.as_str(), + policy, + "realized_lru", + &placement_run.summary, + )); + } + } + Ok(out) +} diff --git a/src/lib.rs b/src/lib.rs index 36f9e98..1188251 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ pub mod instance; pub mod metrics; pub mod network; pub mod oracle; +pub mod replay; pub mod router; pub mod sim; pub mod trace; diff --git a/src/main.rs b/src/main.rs index 14289b2..89a7df7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ use clap::{Args, Parser, Subcommand}; use std::path::PathBuf; use 
kvcache_simulator::config::{Config, RouterMode}; +use kvcache_simulator::replay::ReplayEvictPolicy; use kvcache_simulator::{driver, oracle, trace::TraceReader}; #[derive(Debug, Parser)] @@ -74,7 +75,8 @@ enum Cmd { #[command(flatten)] overrides: ConfigOverrides, }, - /// Run the same trace under multiple routers and compare summaries. + /// Run the same trace under multiple routers and fixed-placement eviction + /// policies, then compare cache-hit summaries. Ablate { #[arg(short, long)] config: PathBuf, @@ -85,6 +87,10 @@ enum Cmd { default_value = "random,least_loaded,least_tokens,ttl_aware,min_pd,cache_load,cache_score,estimated_ttft,prefix_affinity" )] routers: String, + /// Comma-separated eviction policies for ablation aggregation. + /// Currently only `lru` is supported. + #[arg(long, default_value = "lru")] + evict_policies: String, #[command(flatten)] overrides: ConfigOverrides, }, @@ -125,8 +131,9 @@ fn main() -> Result<()> { Cmd::Ablate { config, routers, + evict_policies, overrides, - } => cmd_ablate(&config, &routers, &overrides), + } => cmd_ablate(&config, &routers, &evict_policies, &overrides), Cmd::Validate { config, overrides } => cmd_validate(&config, &overrides), Cmd::Oracle { config, @@ -134,7 +141,13 @@ fn main() -> Result<()> { capacity_blocks, per_instance, out, - } => cmd_oracle(&config, &overrides, capacity_blocks, per_instance, out.as_deref()), + } => cmd_oracle( + &config, + &overrides, + capacity_blocks, + per_instance, + out.as_deref(), + ), } } @@ -151,7 +164,12 @@ fn cmd_run(path: &PathBuf, overrides: &ConfigOverrides) -> Result<()> { Ok(()) } -fn cmd_ablate(path: &PathBuf, routers: &str, overrides: &ConfigOverrides) -> Result<()> { +fn cmd_ablate( + path: &PathBuf, + routers: &str, + evict_policies: &str, + overrides: &ConfigOverrides, +) -> Result<()> { let base = load(path, overrides)?; let modes: Vec = routers .split(',') @@ -160,15 +178,27 @@ fn cmd_ablate(path: &PathBuf, routers: &str, overrides: &ConfigOverrides) -> Res 
.map(RouterMode::parse) .collect::>>() .with_context(|| format!("parsing --routers='{routers}'"))?; - let mut all = Vec::new(); - for mode in modes { - let mut cfg = base.clone(); - cfg.cluster.router.mode = mode; - let sub = mode.as_str().to_string(); - eprintln!("[ablate] running router={}", sub); - let out = driver::run(&cfg, Some(&sub))?; - all.push(out.summary); - } + let policies: Vec = evict_policies + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .map(ReplayEvictPolicy::parse) + .collect::>>() + .with_context(|| format!("parsing --evict-policies='{evict_policies}'"))?; + eprintln!( + "[ablate] routers={} evict_policies={}", + modes + .iter() + .map(RouterMode::as_str) + .collect::>() + .join(","), + policies + .iter() + .map(ReplayEvictPolicy::as_str) + .collect::>() + .join(",") + ); + let all = driver::ablate_fixed_placement(&base, &modes, &policies)?; let agg_path = std::path::Path::new(&base.sim.output_dir).join("ablation.json"); std::fs::create_dir_all(&base.sim.output_dir)?; std::fs::write(&agg_path, serde_json::to_string_pretty(&all)?)?; @@ -181,13 +211,25 @@ fn cmd_validate(path: &PathBuf, overrides: &ConfigOverrides) -> Result<()> { use kvcache_simulator::instance::compute::ComputeModel; let cfg = load(path, overrides)?; eprintln!("config OK: {}", cfg.model.name); - eprintln!("mode = {}", if cfg.model.is_arch_mode() { "architecture-derived" } else { "legacy manual" }); + eprintln!( + "mode = {}", + if cfg.model.is_arch_mode() { + "architecture-derived" + } else { + "legacy manual" + } + ); let cm = ComputeModel::new(&cfg.model, &cfg.hardware); eprintln!("compute: {}", cm.describe()); - eprintln!("kv_block_bytes = {} ({:.2} MB{})", + eprintln!( + "kv_block_bytes = {} ({:.2} MB{})", cfg.model.kv_block_bytes(), cfg.model.kv_block_bytes() as f64 / 1e6, - if cfg.model.mla.is_some() { ", MLA compressed" } else { "" }, + if cfg.model.mla.is_some() { + ", MLA compressed" + } else { + "" + }, ); let block_bytes = cfg.model.kv_block_bytes() 
as f64; let hbm_blocks = (cfg.hardware.hbm_bytes / block_bytes) as u64; @@ -251,7 +293,11 @@ fn cmd_oracle( capacity, per_instance_blocks, cfg.cluster.num_instances, - if per_instance { ", per-instance mode" } else { "" } + if per_instance { + ", per-instance mode" + } else { + "" + } ); let result = oracle::analyze(&records, capacity); diff --git a/src/metrics/ablation.rs b/src/metrics/ablation.rs new file mode 100644 index 0000000..6867cc3 --- /dev/null +++ b/src/metrics/ablation.rs @@ -0,0 +1,50 @@ +use serde::Serialize; + +use crate::metrics::Summary; +use crate::replay::ReplayEvictPolicy; + +#[derive(Debug, Clone, Serialize)] +pub struct AblationRow { + pub router: String, + pub evict_policy: String, + pub placement_source: String, + pub num_requests: u64, + pub total_blocks: u64, + pub ttft_mean: f64, + pub ttft_p50: f64, + pub ttft_p95: f64, + pub ttft_p99: f64, + pub hit_rate_l0: f64, + pub hit_rate_l1: f64, + pub hit_rate_remote: f64, + pub miss_rate: f64, + pub total_rdma_bytes: u64, + pub total_pcie_bytes: u64, +} + +impl AblationRow { + pub fn from_summary( + router: &str, + policy: ReplayEvictPolicy, + placement_source: &str, + summary: &Summary, + ) -> Self { + Self { + router: router.to_string(), + evict_policy: policy.as_str().to_string(), + placement_source: placement_source.to_string(), + num_requests: summary.num_requests, + total_blocks: summary.total_blocks, + ttft_mean: summary.ttft_mean, + ttft_p50: summary.ttft_p50, + ttft_p95: summary.ttft_p95, + ttft_p99: summary.ttft_p99, + hit_rate_l0: summary.hit_rate_l0, + hit_rate_l1: summary.hit_rate_l1, + hit_rate_remote: summary.hit_rate_remote, + miss_rate: summary.miss_rate, + total_rdma_bytes: summary.total_rdma_bytes, + total_pcie_bytes: summary.total_pcie_bytes, + } + } +} diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 86ea2cf..d52d61b 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -1,7 +1,9 @@ +pub mod ablation; pub mod per_request; pub mod routing_log; pub mod 
summary; pub mod timeseries; +pub use ablation::AblationRow; pub use per_request::PerRequestRow; pub use summary::Summary; diff --git a/src/replay.rs b/src/replay.rs new file mode 100644 index 0000000..13a7cb5 --- /dev/null +++ b/src/replay.rs @@ -0,0 +1,608 @@ +use ahash::{AHashMap, AHashSet}; +use anyhow::{anyhow, Result}; +use serde::Serialize; +use std::cmp::min; +use std::collections::BinaryHeap; + +use crate::config::Config; +use crate::instance::kv_cache::LruBlocks; +use crate::trace::RequestRecord; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum ReplayEvictPolicy { + Lru, + Belady, +} + +impl ReplayEvictPolicy { + pub fn parse(s: &str) -> Result { + match s { + "lru" => Ok(Self::Lru), + "belady" => Err(anyhow!( + "exact belady is not supported for fixed-placement full-hierarchy ablation" + )), + other => Err(anyhow!("unknown evict policy: {other}")), + } + } + + pub fn as_str(&self) -> &'static str { + match self { + Self::Lru => "lru", + Self::Belady => "belady", + } + } +} + +#[derive(Debug, Clone)] +pub struct PlacementEntry { + pub req_id: u64, + pub instance: u32, +} + +#[derive(Debug, Clone, Serialize, Default)] +pub struct ReplaySummary { + pub num_requests: u64, + pub total_blocks: u64, + pub l0_hit_blocks: u64, + pub l1_hit_blocks: u64, + pub remote_hit_blocks: u64, + pub miss_blocks: u64, + pub hit_rate_l0: f64, + pub hit_rate_l1: f64, + pub hit_rate_remote: f64, + pub miss_rate: f64, + pub total_rdma_bytes: u64, + pub total_pcie_bytes: u64, +} + +impl ReplaySummary { + fn from_counts( + num_requests: usize, + total_blocks: u64, + l0_hit_blocks: u64, + l1_hit_blocks: u64, + remote_hit_blocks: u64, + miss_blocks: u64, + total_rdma_bytes: u64, + total_pcie_bytes: u64, + ) -> Self { + let denom = total_blocks.max(1) as f64; + Self { + num_requests: num_requests as u64, + total_blocks, + l0_hit_blocks, + l1_hit_blocks, + remote_hit_blocks, + miss_blocks, + hit_rate_l0: l0_hit_blocks as f64 / 
denom, + hit_rate_l1: l1_hit_blocks as f64 / denom, + hit_rate_remote: remote_hit_blocks as f64 / denom, + miss_rate: miss_blocks as f64 / denom, + total_rdma_bytes, + total_pcie_bytes, + } + } +} + +#[derive(Debug, Clone, Copy)] +enum FutureKind { + L0, + L1, +} + +#[derive(Debug)] +struct FutureIndex { + local: AHashMap<(u32, u64), Vec>, + global: AHashMap>, +} + +impl FutureIndex { + fn build(records: &[RequestRecord], placement: &[u32]) -> Self { + let mut local: AHashMap<(u32, u64), Vec> = AHashMap::new(); + let mut global: AHashMap> = AHashMap::new(); + for (req_idx, record) in records.iter().enumerate() { + let inst = placement[req_idx]; + let mut seen = AHashSet::new(); + for &block in &record.hash_ids { + if !seen.insert(block) { + continue; + } + local.entry((inst, block)).or_default().push(req_idx); + global.entry(block).or_default().push((req_idx, inst)); + } + } + Self { local, global } + } + + fn next_local(&self, inst: u32, block: u64, current_req_idx: usize) -> usize { + match self.local.get(&(inst, block)) { + Some(indices) => next_after(indices, current_req_idx), + None => usize::MAX, + } + } + + fn next_other(&self, inst: u32, block: u64, current_req_idx: usize) -> usize { + let Some(indices) = self.global.get(&block) else { + return usize::MAX; + }; + let start = first_after_pair(indices, current_req_idx); + for &(req_idx, owner_inst) in indices.iter().skip(start) { + if owner_inst != inst { + return req_idx; + } + } + usize::MAX + } + + fn next_use(&self, kind: FutureKind, inst: u32, block: u64, current_req_idx: usize) -> usize { + match kind { + FutureKind::L0 => self.next_local(inst, block, current_req_idx), + FutureKind::L1 => min( + self.next_local(inst, block, current_req_idx), + self.next_other(inst, block, current_req_idx), + ), + } + } +} + +fn next_after(indices: &[usize], current_req_idx: usize) -> usize { + let pos = indices.partition_point(|&idx| idx <= current_req_idx); + indices.get(pos).copied().unwrap_or(usize::MAX) +} + +fn 
first_after_pair(indices: &[(usize, u32)], current_req_idx: usize) -> usize { + indices.partition_point(|&(idx, _)| idx <= current_req_idx) +} + +#[derive(Debug)] +struct BeladyTier { + capacity: usize, + resident: AHashSet, + versions: AHashMap, + heap: BinaryHeap<(usize, u64, u64)>, + next_version: u64, +} + +impl BeladyTier { + fn new(capacity: usize) -> Self { + Self { + capacity, + resident: AHashSet::with_capacity(capacity), + versions: AHashMap::with_capacity(capacity), + heap: BinaryHeap::with_capacity(capacity), + next_version: 0, + } + } + + fn contains(&self, key: u64) -> bool { + self.resident.contains(&key) + } + + fn remove(&mut self, key: u64) -> bool { + if self.resident.remove(&key) { + self.versions.remove(&key); + true + } else { + false + } + } + + fn touch( + &mut self, + key: u64, + current_req_idx: usize, + kind: FutureKind, + inst: u32, + futures: &FutureIndex, + ) -> bool { + if !self.resident.contains(&key) { + return false; + } + self.next_version += 1; + let version = self.next_version; + let next_use = futures.next_use(kind, inst, key, current_req_idx); + self.versions.insert(key, version); + self.heap.push((next_use, version, key)); + true + } + + fn insert( + &mut self, + key: u64, + current_req_idx: usize, + kind: FutureKind, + inst: u32, + futures: &FutureIndex, + ) -> Option { + if self.touch(key, current_req_idx, kind, inst, futures) { + return None; + } + if self.capacity == 0 { + return Some(key); + } + let mut evicted = None; + if self.resident.len() == self.capacity { + evicted = self.evict(current_req_idx, kind, inst, futures); + } + self.next_version += 1; + let version = self.next_version; + let next_use = futures.next_use(kind, inst, key, current_req_idx); + self.resident.insert(key); + self.versions.insert(key, version); + self.heap.push((next_use, version, key)); + evicted + } + + fn evict( + &mut self, + current_req_idx: usize, + kind: FutureKind, + inst: u32, + futures: &FutureIndex, + ) -> Option { + while let 
Some((stored_next_use, version, key)) = self.heap.pop() { + if !self.resident.contains(&key) { + continue; + } + let Some(current_version) = self.versions.get(&key).copied() else { + continue; + }; + if current_version != version { + continue; + } + let actual_next_use = futures.next_use(kind, inst, key, current_req_idx); + if actual_next_use != stored_next_use { + self.next_version += 1; + let new_version = self.next_version; + self.versions.insert(key, new_version); + self.heap.push((actual_next_use, new_version, key)); + continue; + } + self.resident.remove(&key); + self.versions.remove(&key); + return Some(key); + } + None + } +} + +#[derive(Debug)] +enum Tier { + Lru(LruBlocks), + Belady(BeladyTier), +} + +impl Tier { + fn new(policy: ReplayEvictPolicy, capacity: usize) -> Self { + match policy { + ReplayEvictPolicy::Lru => Self::Lru(LruBlocks::new(capacity)), + ReplayEvictPolicy::Belady => Self::Belady(BeladyTier::new(capacity)), + } + } + + fn contains(&self, key: u64) -> bool { + match self { + Self::Lru(tier) => tier.contains(key), + Self::Belady(tier) => tier.contains(key), + } + } + + fn remove(&mut self, key: u64) -> bool { + match self { + Self::Lru(tier) => tier.remove(key), + Self::Belady(tier) => tier.remove(key), + } + } + + fn touch( + &mut self, + key: u64, + req_idx: usize, + kind: FutureKind, + inst: u32, + futures: &FutureIndex, + ) -> bool { + match self { + Self::Lru(tier) => tier.touch(key), + Self::Belady(tier) => tier.touch(key, req_idx, kind, inst, futures), + } + } + + fn insert( + &mut self, + key: u64, + req_idx: usize, + kind: FutureKind, + inst: u32, + futures: &FutureIndex, + ) -> Option { + match self { + Self::Lru(tier) => tier.insert_block(key), + Self::Belady(tier) => tier.insert(key, req_idx, kind, inst, futures), + } + } + + fn longest_prefix_touch( + &mut self, + hashes: &[u64], + req_idx: usize, + kind: FutureKind, + inst: u32, + futures: &FutureIndex, + ) -> usize { + match self { + Self::Lru(tier) => 
tier.longest_prefix(hashes), + Self::Belady(tier) => { + let mut matched = 0usize; + for &hash in hashes { + if !tier.touch(hash, req_idx, kind, inst, futures) { + break; + } + matched += 1; + } + matched + } + } + } + + fn longest_prefix_peek(&self, hashes: &[u64]) -> usize { + match self { + Self::Lru(tier) => tier.longest_prefix_peek(hashes), + Self::Belady(tier) => { + let mut matched = 0usize; + for &hash in hashes { + if !tier.contains(hash) { + break; + } + matched += 1; + } + matched + } + } + } +} + +#[derive(Debug)] +struct ReplayInstanceCache { + l0: Tier, + l1: Tier, +} + +impl ReplayInstanceCache { + fn new(policy: ReplayEvictPolicy, l0_cap: usize, l1_cap: usize) -> Self { + Self { + l0: Tier::new(policy, l0_cap), + l1: Tier::new(policy, l1_cap), + } + } + + fn promote_l1_blocks_to_l0( + &mut self, + hashes: &[u64], + req_idx: usize, + inst: u32, + futures: &FutureIndex, + owners: &mut AHashMap>, + ) { + for &hash in hashes { + if self.l1.remove(hash) { + remove_owner(owners, hash, inst); + } + self.insert_block_into_l0(hash, req_idx, inst, futures, owners); + } + } + + fn fetch_remote_blocks_to_l0( + &mut self, + hashes: &[u64], + req_idx: usize, + inst: u32, + futures: &FutureIndex, + owners: &mut AHashMap>, + ) { + for &hash in hashes { + self.stage_remote_block_in_l1(hash, req_idx, inst, futures, owners); + if self.l1.remove(hash) { + remove_owner(owners, hash, inst); + } + self.insert_block_into_l0(hash, req_idx, inst, futures, owners); + } + } + + fn insert_blocks_into_l0( + &mut self, + hashes: &[u64], + req_idx: usize, + inst: u32, + futures: &FutureIndex, + owners: &mut AHashMap>, + ) { + for &hash in hashes { + self.insert_block_into_l0(hash, req_idx, inst, futures, owners); + } + } + + fn insert_block_into_l0( + &mut self, + hash: u64, + req_idx: usize, + inst: u32, + futures: &FutureIndex, + owners: &mut AHashMap>, + ) { + if self.l0.touch(hash, req_idx, FutureKind::L0, inst, futures) { + return; + } + if self.l1.remove(hash) { + 
remove_owner(owners, hash, inst); + } + if let Some(evicted_l0) = self.l0.insert(hash, req_idx, FutureKind::L0, inst, futures) { + self.demote_into_l1(evicted_l0, req_idx, inst, futures, owners); + } + } + + fn stage_remote_block_in_l1( + &mut self, + hash: u64, + req_idx: usize, + inst: u32, + futures: &FutureIndex, + owners: &mut AHashMap>, + ) { + if self.l0.contains(hash) || self.l1.contains(hash) { + return; + } + if let Some(evicted_l1) = self.l1.insert(hash, req_idx, FutureKind::L1, inst, futures) { + remove_owner(owners, evicted_l1, inst); + } + add_owner(owners, hash, inst); + } + + fn demote_into_l1( + &mut self, + hash: u64, + req_idx: usize, + inst: u32, + futures: &FutureIndex, + owners: &mut AHashMap>, + ) { + if self.l1.touch(hash, req_idx, FutureKind::L1, inst, futures) { + return; + } + if let Some(evicted_l1) = self.l1.insert(hash, req_idx, FutureKind::L1, inst, futures) { + remove_owner(owners, evicted_l1, inst); + } + add_owner(owners, hash, inst); + } +} + +fn add_owner(owners: &mut AHashMap>, hash: u64, inst: u32) { + owners.entry(hash).or_default().insert(inst); +} + +fn remove_owner(owners: &mut AHashMap>, hash: u64, inst: u32) { + if let Some(bucket) = owners.get_mut(&hash) { + bucket.remove(&inst); + if bucket.is_empty() { + owners.remove(&hash); + } + } +} + +pub fn replay_fixed_placement( + cfg: &Config, + records: &[RequestRecord], + placements: &[PlacementEntry], + policy: ReplayEvictPolicy, +) -> Result { + if records.len() != placements.len() { + return Err(anyhow!( + "records/placements length mismatch: {} vs {}", + records.len(), + placements.len() + )); + } + let placement_by_req: AHashMap = + placements.iter().map(|p| (p.req_id, p.instance)).collect(); + let ordered_placement: Vec = records + .iter() + .map(|r| { + placement_by_req + .get(&r.req_id) + .copied() + .ok_or_else(|| anyhow!("missing placement for req_id={}", r.req_id)) + }) + .collect::>()?; + let futures = FutureIndex::build(records, &ordered_placement); + + let 
block_bytes = cfg.model.kv_block_bytes() as f64; + let l0_cap = (cfg.hardware.hbm_bytes / block_bytes).max(1.0) as usize; + let l1_cap = (cfg.hardware.dram_bytes / block_bytes).max(1.0) as usize; + let num_instances = cfg.cluster.num_instances as usize; + let mut caches: Vec = (0..num_instances) + .map(|_| ReplayInstanceCache::new(policy, l0_cap, l1_cap)) + .collect(); + let mut owners: AHashMap> = AHashMap::new(); + + let mut total_blocks = 0u64; + let mut l0_hit_blocks = 0u64; + let mut l1_hit_blocks = 0u64; + let mut remote_hit_blocks = 0u64; + let mut miss_blocks = 0u64; + let mut total_rdma_bytes = 0u64; + let mut total_pcie_bytes = 0u64; + + for (req_idx, record) in records.iter().enumerate() { + let inst = ordered_placement[req_idx]; + let cache = &mut caches[inst as usize]; + total_blocks += record.hash_ids.len() as u64; + + let l0_hits = cache.l0.longest_prefix_touch( + &record.hash_ids, + req_idx, + FutureKind::L0, + inst, + &futures, + ); + let suffix_after_l0 = &record.hash_ids[l0_hits..]; + let l1_hits = cache.l1.longest_prefix_peek(suffix_after_l0); + if l1_hits > 0 { + cache.promote_l1_blocks_to_l0( + &suffix_after_l0[..l1_hits], + req_idx, + inst, + &futures, + &mut owners, + ); + } + + let suffix_after_l1 = &suffix_after_l0[l1_hits..]; + let mut remote_hits = 0usize; + for &hash in suffix_after_l1 { + let any_remote = owners + .get(&hash) + .map(|bucket| bucket.iter().any(|owner| *owner != inst)) + .unwrap_or(false); + if any_remote { + remote_hits += 1; + } else { + break; + } + } + if remote_hits > 0 { + cache.fetch_remote_blocks_to_l0( + &suffix_after_l1[..remote_hits], + req_idx, + inst, + &futures, + &mut owners, + ); + } + + let misses = record.hash_ids.len() - l0_hits - l1_hits - remote_hits; + let new_input = &record.hash_ids[(l0_hits + l1_hits + remote_hits)..]; + if !new_input.is_empty() { + cache.insert_blocks_into_l0(new_input, req_idx, inst, &futures, &mut owners); + } + + l0_hit_blocks += l0_hits as u64; + l1_hit_blocks += l1_hits as 
u64; + remote_hit_blocks += remote_hits as u64; + miss_blocks += misses as u64; + let kv_block_bytes = cfg.model.kv_block_bytes(); + total_rdma_bytes += (remote_hits as u64) * kv_block_bytes; + total_pcie_bytes += ((l1_hits + remote_hits) as u64) * kv_block_bytes; + } + + Ok(ReplaySummary::from_counts( + records.len(), + total_blocks, + l0_hit_blocks, + l1_hit_blocks, + remote_hit_blocks, + miss_blocks, + total_rdma_bytes, + total_pcie_bytes, + )) +} diff --git a/tests/smoke.rs b/tests/smoke.rs index ac6253e..1b9d20c 100644 --- a/tests/smoke.rs +++ b/tests/smoke.rs @@ -6,6 +6,7 @@ use std::io::Write; use kvcache_simulator::config::*; use kvcache_simulator::driver; +use kvcache_simulator::replay::ReplayEvictPolicy; fn base_config(trace_path: &str, out_dir: &str, mode: RouterMode) -> Config { Config { @@ -36,7 +37,9 @@ fn base_config(trace_path: &str, out_dir: &str, mode: RouterMode) -> Config { }, cluster: ClusterConfig { num_instances: 4, - meta_store: MetaStoreConfig { ttl_seconds: 1000.0 }, + meta_store: MetaStoreConfig { + ttl_seconds: 1000.0, + }, router: RouterConfig { mode, precise_probe_latency_us: 10.0, @@ -94,9 +97,11 @@ fn write_synthetic_trace(path: &std::path::Path) { } } -fn run(mode: RouterMode, trace_path: &std::path::Path, out_root: &std::path::Path) - -> kvcache_simulator::metrics::Summary -{ +fn run( + mode: RouterMode, + trace_path: &std::path::Path, + out_root: &std::path::Path, +) -> kvcache_simulator::metrics::Summary { let cfg = base_config( trace_path.to_str().unwrap(), out_root.to_str().unwrap(), @@ -119,9 +124,8 @@ fn ablation_hit_rate_ordering() { let s_ttl = run(RouterMode::TtlAware, &trace_path, &tmp); let s_prec = run(RouterMode::Precise, &trace_path, &tmp); - let total_hit = |s: &kvcache_simulator::metrics::Summary| { - s.hit_rate_l0 + s.hit_rate_l1 + s.hit_rate_remote - }; + let total_hit = + |s: &kvcache_simulator::metrics::Summary| s.hit_rate_l0 + s.hit_rate_l1 + s.hit_rate_remote; let h_rand = total_hit(&s_random); let h_ll = 
total_hit(&s_ll); @@ -135,23 +139,79 @@ fn ablation_hit_rate_ordering() { eprintln!( " remote+local hit ratio L0/L1/remote: \ random=({:.2},{:.2},{:.2}) precise=({:.2},{:.2},{:.2})", - s_random.hit_rate_l0, s_random.hit_rate_l1, s_random.hit_rate_remote, - s_prec.hit_rate_l0, s_prec.hit_rate_l1, s_prec.hit_rate_remote, + s_random.hit_rate_l0, + s_random.hit_rate_l1, + s_random.hit_rate_remote, + s_prec.hit_rate_l0, + s_prec.hit_rate_l1, + s_prec.hit_rate_remote, ); // ttl_aware and precise should outperform random / least_loaded for // a workload built entirely of shared-prefix conversations. let eps = 1e-6; - assert!( - h_ttl + eps >= h_rand, - "ttl_aware should >= random hit rate" - ); - assert!( - h_prec + eps >= h_rand, - "precise should >= random hit rate" - ); + assert!(h_ttl + eps >= h_rand, "ttl_aware should >= random hit rate"); + assert!(h_prec + eps >= h_rand, "precise should >= random hit rate"); assert!( h_prec + eps >= h_ll, "precise should >= least_loaded hit rate" ); } + +#[test] +fn ablation_lru_preserves_ttft_fields() { + let tmp = std::env::temp_dir().join("kvcache_sim_replay"); + let _ = std::fs::remove_dir_all(&tmp); + std::fs::create_dir_all(&tmp).unwrap(); + let trace_path = tmp.join("trace.jsonl"); + write_synthetic_trace(&trace_path); + + let cfg = base_config( + trace_path.to_str().unwrap(), + tmp.to_str().unwrap(), + RouterMode::Random, + ); + let online = driver::run(&cfg, Some("online_lru")).expect("online lru run"); + let out = driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Lru]) + .expect("ablate lru"); + + assert_eq!(out.len(), 1); + let row = &out[0]; + let online_hit = online.summary.hit_rate_l0 + online.summary.hit_rate_l1 + online.summary.hit_rate_remote; + let ablate_hit = row.hit_rate_l0 + row.hit_rate_l1 + row.hit_rate_remote; + + assert!( + (ablate_hit - online_hit).abs() < 1e-9, + "ablation lru should match online lru hit rate: online={online_hit} ablate={ablate_hit}" + ); + 
assert!((row.ttft_mean - online.summary.ttft_mean).abs() < 1e-9); + assert!((row.ttft_p50 - online.summary.ttft_p50).abs() < 1e-9); + assert!((row.ttft_p95 - online.summary.ttft_p95).abs() < 1e-9); + assert!((row.ttft_p99 - online.summary.ttft_p99).abs() < 1e-9); +} + +#[test] +fn ablate_rejects_belady_until_exact_algorithm_exists() { + let tmp = std::env::temp_dir().join("kvcache_sim_ablate_evict"); + let _ = std::fs::remove_dir_all(&tmp); + std::fs::create_dir_all(&tmp).unwrap(); + let trace_path = tmp.join("trace.jsonl"); + write_synthetic_trace(&trace_path); + + let cfg = base_config( + trace_path.to_str().unwrap(), + tmp.to_str().unwrap(), + RouterMode::Random, + ); + + let err = driver::ablate_fixed_placement( + &cfg, + &[RouterMode::Random], + &[ReplayEvictPolicy::Belady], + ) + .expect_err("belady should be rejected"); + assert!( + err.to_string().contains("exact belady"), + "unexpected error: {err:#}" + ); +}