From ff316c687378447b9ad42d56093eb945f39fa581 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Wed, 15 Apr 2026 17:31:39 +0800 Subject: [PATCH] fix: cache calculation --- .gitignore | 6 + src/cluster/mod.rs | 2 +- src/config.rs | 122 +++++++++++++++----- src/hardware_presets.rs | 68 +++++------ src/hf_config.rs | 50 ++++---- src/instance/compute.rs | 42 +++---- src/instance/mod.rs | 2 +- src/metrics/routing_log.rs | 4 +- src/metrics/timeseries.rs | 4 +- src/oracle.rs | 13 +-- src/router/cache_load.rs | 10 +- src/router/cache_score.rs | 8 +- src/router/estimated_ttft.rs | 74 ++++-------- src/router/least_loaded.rs | 3 +- src/router/min_pd.rs | 52 +++------ src/router/mod.rs | 208 +++++++++++++++++++++++++++++++++- src/router/precise_aware.rs | 84 ++++---------- src/router/prefix_affinity.rs | 34 ++---- src/router/random.rs | 4 +- src/router/ttl_aware.rs | 13 +-- src/sim/engine.rs | 13 ++- src/trace.rs | 3 +- tests/smoke.rs | 17 ++- 23 files changed, 500 insertions(+), 336 deletions(-) diff --git a/.gitignore b/.gitignore index f70b09b..75d0acd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,12 @@ # Trace files bailian-traces +# docs +docs +reports +scripts +tests/test_analyze_affinity_policy.py + # Rust build artifacts /target/ **/*.rs.bk diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs index b6001dd..bc36667 100644 --- a/src/cluster/mod.rs +++ b/src/cluster/mod.rs @@ -1,6 +1,6 @@ -pub mod meta_store; #[allow(clippy::module_inception)] pub mod cluster; +pub mod meta_store; pub use cluster::Cluster; pub use meta_store::MetaStore; diff --git a/src/config.rs b/src/config.rs index 8f5fb54..057bfa1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -111,8 +111,7 @@ impl ModelConfig { * self.dtype_bytes as u64 * self.block_size_tokens as u64 } else { - 2u64 - * self.num_layers as u64 + 2u64 * self.num_layers as u64 * self.num_kv_heads as u64 * self.head_dim as u64 * self.dtype_bytes as u64 @@ -455,7 +454,12 @@ impl RawConfig { } } - Ok(Config { model, hardware, cluster: self.cluster, sim: self.sim }) + Ok(Config { + model, + hardware, + cluster: self.cluster, + sim: self.sim, + }) } } @@ -474,24 +478,60 @@ impl RawModelConfig { }; // Overlay: explicit YAML fields override the base. 
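The `kv_block_bytes` hunk above only reflows the MHA branch; the quantity it computes is `2 (K and V) * num_layers * num_kv_heads * head_dim * dtype_bytes * block_size_tokens` bytes per KV block. A minimal standalone check of that arithmetic, using the `test_model` values this patch adds in `src/router/mod.rs` (the free function is illustrative only; the real method lives on `ModelConfig`):

```rust
/// Bytes per KV block for the MHA branch of `ModelConfig::kv_block_bytes`.
/// Hypothetical free-function form of the method reflowed above.
fn kv_block_bytes(layers: u64, kv_heads: u64, head_dim: u64, dtype: u64, block: u64) -> u64 {
    2 * layers * kv_heads * head_dim * dtype * block // 2 accounts for K and V
}

fn main() {
    // test_model: num_layers=4, num_kv_heads=2, head_dim=64, dtype_bytes=2, block=16
    assert_eq!(kv_block_bytes(4, 2, 64, 2, 16), 32_768); // 32 KiB per block
}
```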
- if let Some(v) = self.name { m.name = v; } - if let Some(v) = self.num_layers { m.num_layers = v; } - if let Some(v) = self.num_kv_heads { m.num_kv_heads = v; } - if let Some(v) = self.head_dim { m.head_dim = v; } - if let Some(v) = self.dtype_bytes { m.dtype_bytes = v; } - if let Some(v) = self.block_size_tokens { m.block_size_tokens = v; } - if let Some(v) = self.hidden_size { m.hidden_size = Some(v); } - if let Some(v) = self.num_attention_heads { m.num_attention_heads = Some(v); } - if let Some(v) = self.intermediate_size { m.intermediate_size = Some(v); } - if self.moe.is_some() { m.moe = self.moe; } - if self.mla.is_some() { m.mla = self.mla; } - if self.attention.is_some() { m.attention = self.attention; } - if let Some(v) = self.flops_per_token_prefill { m.flops_per_token_prefill = Some(v); } - if let Some(v) = self.attn_quadratic_coeff { m.attn_quadratic_coeff = Some(v); } - if let Some(v) = self.bytes_per_token_prefill { m.bytes_per_token_prefill = Some(v); } - if self.compute_dtype.is_some() { m.compute_dtype = self.compute_dtype; } - if let Some(v) = self.flops_per_token_decode { m.flops_per_token_decode = Some(v); } - if let Some(v) = self.bytes_per_token_decode { m.bytes_per_token_decode = Some(v); } + if let Some(v) = self.name { + m.name = v; + } + if let Some(v) = self.num_layers { + m.num_layers = v; + } + if let Some(v) = self.num_kv_heads { + m.num_kv_heads = v; + } + if let Some(v) = self.head_dim { + m.head_dim = v; + } + if let Some(v) = self.dtype_bytes { + m.dtype_bytes = v; + } + if let Some(v) = self.block_size_tokens { + m.block_size_tokens = v; + } + if let Some(v) = self.hidden_size { + m.hidden_size = Some(v); + } + if let Some(v) = self.num_attention_heads { + m.num_attention_heads = Some(v); + } + if let Some(v) = self.intermediate_size { + m.intermediate_size = Some(v); + } + if self.moe.is_some() { + m.moe = self.moe; + } + if self.mla.is_some() { + m.mla = self.mla; + } + if self.attention.is_some() { + m.attention = self.attention; + } + if let Some(v) = self.flops_per_token_prefill { + m.flops_per_token_prefill = Some(v); + } + if let Some(v) = self.attn_quadratic_coeff { + m.attn_quadratic_coeff = Some(v); + } + if let Some(v) = self.bytes_per_token_prefill { + m.bytes_per_token_prefill = Some(v); + } + if self.compute_dtype.is_some() { + m.compute_dtype = self.compute_dtype; + } + if let Some(v) = self.flops_per_token_decode { + m.flops_per_token_decode = Some(v); + } + if let Some(v) = self.bytes_per_token_decode { + m.bytes_per_token_decode = Some(v); + } // Validate deployment-specific fields that HF config.json never provides. anyhow::ensure!( @@ -535,16 +575,36 @@ impl RawHardwareConfig { }; // Overlay: explicit YAML fields override the preset / defaults. 
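The overlay idiom being reflowed here, reduced to a single field — a sketch with hypothetical types, not the crate's own: `RawHardwareConfig` carries `Option`s, the preset supplies the base, and an explicit YAML value always wins.

```rust
// Minimal sketch of the Option-overlay pattern used by RawModelConfig /
// RawHardwareConfig above. Types and names here are illustrative only.
#[derive(Clone)]
struct Hw { gpu_flops: f64 }
struct RawHw { gpu_flops: Option<f64> }

fn overlay(raw: &RawHw, mut base: Hw) -> Hw {
    if let Some(v) = raw.gpu_flops {
        base.gpu_flops = v; // explicit YAML field overrides the preset
    }
    base
}

fn main() {
    let preset = Hw { gpu_flops: 9.89e14 }; // e.g. the H100 preset's BF16 FLOPS
    assert_eq!(overlay(&RawHw { gpu_flops: None }, preset.clone()).gpu_flops, 9.89e14);
    assert_eq!(overlay(&RawHw { gpu_flops: Some(1.0e14) }, preset).gpu_flops, 1.0e14);
}
```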
- if let Some(v) = self.gpu_flops { hw.gpu_flops = v; } - if let Some(v) = self.gpu_mem_bw { hw.gpu_mem_bw = v; } - if let Some(v) = self.hbm_bytes { hw.hbm_bytes = v; } - if let Some(v) = self.dram_bytes { hw.dram_bytes = v; } - if let Some(v) = self.pcie_bw { hw.pcie_bw = v; } - if let Some(v) = self.pcie_latency_us { hw.pcie_latency_us = v; } - if let Some(v) = self.rdma_bw { hw.rdma_bw = v; } - if let Some(v) = self.rdma_latency_us { hw.rdma_latency_us = v; } - if let Some(v) = self.max_batch_slots { hw.max_batch_slots = v; } - if let Some(v) = self.prefill_chunk_tokens { hw.prefill_chunk_tokens = v; } + if let Some(v) = self.gpu_flops { + hw.gpu_flops = v; + } + if let Some(v) = self.gpu_mem_bw { + hw.gpu_mem_bw = v; + } + if let Some(v) = self.hbm_bytes { + hw.hbm_bytes = v; + } + if let Some(v) = self.dram_bytes { + hw.dram_bytes = v; + } + if let Some(v) = self.pcie_bw { + hw.pcie_bw = v; + } + if let Some(v) = self.pcie_latency_us { + hw.pcie_latency_us = v; + } + if let Some(v) = self.rdma_bw { + hw.rdma_bw = v; + } + if let Some(v) = self.rdma_latency_us { + hw.rdma_latency_us = v; + } + if let Some(v) = self.max_batch_slots { + hw.max_batch_slots = v; + } + if let Some(v) = self.prefill_chunk_tokens { + hw.prefill_chunk_tokens = v; + } // Validate minimum requirements. anyhow::ensure!(hw.gpu_flops > 0.0, "hardware.gpu_flops is required"); diff --git a/src/hardware_presets.rs b/src/hardware_presets.rs index 07d759c..2303561 100644 --- a/src/hardware_presets.rs +++ b/src/hardware_presets.rs @@ -78,56 +78,56 @@ fn parse_count_gpu(s: &str) -> (u32, String) { // -- Per-GPU base specs (single die, BF16 dense) ----------------------------- struct GpuBase { - flops: f64, // BF16 dense FLOPS - fp8_flops: f64, // FP8 dense FLOPS (0 = not supported) - fp4_flops: f64, // FP4 dense FLOPS (0 = not supported) - mem_bw: f64, // HBM bandwidth (B/s) - hbm: f64, // Total HBM (bytes) - pcie_gen: u32, // PCIe generation (4/5/6) + flops: f64, // BF16 dense FLOPS + fp8_flops: f64, // FP8 dense FLOPS (0 = not supported) + fp4_flops: f64, // FP4 dense FLOPS (0 = not supported) + mem_bw: f64, // HBM bandwidth (B/s) + hbm: f64, // Total HBM (bytes) + pcie_gen: u32, // PCIe generation (4/5/6) } const H100: GpuBase = GpuBase { - flops: 9.89e14, // 989 TFLOPS BF16 dense + flops: 9.89e14, // 989 TFLOPS BF16 dense fp8_flops: 1.979e15, // 1979 TFLOPS FP8 dense - fp4_flops: 0.0, // not supported - mem_bw: 3.35e12, // 3.35 TB/s HBM3 - hbm: 80.0e9, // 80 GB + fp4_flops: 0.0, // not supported + mem_bw: 3.35e12, // 3.35 TB/s HBM3 + hbm: 80.0e9, // 80 GB pcie_gen: 5, }; const H800: GpuBase = GpuBase { - flops: 9.89e14, // same die as H100 + flops: 9.89e14, // same die as H100 fp8_flops: 1.979e15, fp4_flops: 0.0, - mem_bw: 3.35e12, // 3.35 TB/s HBM3 - hbm: 80.0e9, // 80 GB + mem_bw: 3.35e12, // 3.35 TB/s HBM3 + hbm: 80.0e9, // 80 GB pcie_gen: 5, }; const H20: GpuBase = GpuBase { - flops: 1.48e14, // 148 TFLOPS BF16 (China-export Hopper) + flops: 1.48e14, // 148 TFLOPS BF16 (China-export Hopper) fp8_flops: 2.96e14, // 296 TFLOPS FP8 - fp4_flops: 0.0, // not supported - mem_bw: 4.0e12, // 4.0 TB/s HBM3 - hbm: 96.0e9, // 96 GB + fp4_flops: 0.0, // not supported + mem_bw: 4.0e12, // 4.0 TB/s HBM3 + hbm: 96.0e9, // 96 GB pcie_gen: 5, }; const A100_80GB: GpuBase = GpuBase { - flops: 3.12e14, // 312 TFLOPS BF16 - fp8_flops: 0.0, // A100 has no FP8 tensor cores + flops: 3.12e14, // 312 TFLOPS BF16 + fp8_flops: 0.0, // A100 has no FP8 tensor cores fp4_flops: 0.0, - mem_bw: 2.0e12, // 2.0 TB/s HBM2e - hbm: 80.0e9, // 80 GB + 
mem_bw: 2.0e12, // 2.0 TB/s HBM2e + hbm: 80.0e9, // 80 GB pcie_gen: 4, }; const A100_40GB: GpuBase = GpuBase { - flops: 3.12e14, // 312 TFLOPS BF16 + flops: 3.12e14, // 312 TFLOPS BF16 fp8_flops: 0.0, fp4_flops: 0.0, - mem_bw: 1.555e12, // 1.555 TB/s HBM2e - hbm: 40.0e9, // 40 GB + mem_bw: 1.555e12, // 1.555 TB/s HBM2e + hbm: 40.0e9, // 40 GB pcie_gen: 4, }; @@ -143,11 +143,11 @@ const B200: GpuBase = GpuBase { // DGX B300 (8 GPU) specs: BF16 18 PFLOPS, FP8 ~54 PFLOPS, FP4 108 PFLOPS (dense) const B300: GpuBase = GpuBase { - flops: 2.25e15, // 2250 TFLOPS BF16 dense (same GB202 die as B200) - fp8_flops: 6.75e15, // 6750 TFLOPS FP8 dense (estimated from FP4/2) - fp4_flops: 13.5e15, // 13500 TFLOPS FP4 dense (Blackwell Ultra enhanced) - mem_bw: 12.0e12, // 12 TB/s HBM3e 12-Hi - hbm: 288.0e9, // 288 GB HBM3e 12-Hi + flops: 2.25e15, // 2250 TFLOPS BF16 dense (same GB202 die as B200) + fp8_flops: 6.75e15, // 6750 TFLOPS FP8 dense (estimated from FP4/2) + fp4_flops: 13.5e15, // 13500 TFLOPS FP4 dense (Blackwell Ultra enhanced) + mem_bw: 12.0e12, // 12 TB/s HBM3e 12-Hi + hbm: 288.0e9, // 288 GB HBM3e 12-Hi pcie_gen: 6, }; @@ -162,15 +162,15 @@ fn make_config(n: u32, base: &GpuBase) -> HardwareConfig { // PCIe per-GPU bandwidth and latency by generation let (pcie_per_gpu, pcie_lat) = match base.pcie_gen { - 6 => (128.0e9, 4.0), // Gen6 x16 - 5 => (64.0e9, 5.0), // Gen5 x16 - _ => (32.0e9, 5.0), // Gen4 x16 + 6 => (128.0e9, 4.0), // Gen6 x16 + 5 => (64.0e9, 5.0), // Gen5 x16 + _ => (32.0e9, 5.0), // Gen4 x16 }; // RDMA: base NIC speed by PCIe gen, scaled for multi-NIC servers let (rdma_base, rdma_lat) = match base.pcie_gen { - 6 => (50.0e9, 6.0), // 400 Gbps NIC - _ => (25.0e9, 8.0), // 200 Gbps NIC + 6 => (50.0e9, 6.0), // 400 Gbps NIC + _ => (25.0e9, 8.0), // 200 Gbps NIC }; let rdma_scale = if n >= 8 { 2.0 } else { 1.0 }; diff --git a/src/hf_config.rs b/src/hf_config.rs index 8777751..e1f0c75 100644 --- a/src/hf_config.rs +++ b/src/hf_config.rs @@ -7,7 +7,7 @@ use anyhow::{Context, Result}; use serde_json::Value; use std::path::Path; -use crate::config::{AttentionConfig, MlaConfig, MoeConfig, ModelConfig}; +use crate::config::{AttentionConfig, MlaConfig, ModelConfig, MoeConfig}; /// Parse a HuggingFace config.json and return a partially-populated /// [`ModelConfig`]. The caller must still set `dtype_bytes` and @@ -34,8 +34,7 @@ fn parse_value(v: &Value) -> Result { let num_layers = u32_field(v, "num_hidden_layers"); let hidden_size = u32_field(v, "hidden_size"); let num_attention_heads = u32_field(v, "num_attention_heads"); - let num_kv_heads = u32_field(v, "num_key_value_heads") - .or(num_attention_heads); // default to MHA + let num_kv_heads = u32_field(v, "num_key_value_heads").or(num_attention_heads); // default to MHA let head_dim = u32_field(v, "head_dim").or_else(|| { // Infer: hidden_size / num_attention_heads match (hidden_size, num_attention_heads) { @@ -70,25 +69,24 @@ fn parse_value(v: &Value) -> Result { }); // --- Attention pattern --- - let attention = - if let Some(first_dense) = u32_field(v, "first_k_dense_replace") { - // DSA-style model (GLM-5, DeepSeek-V3). - // dense_window and sparse_stride are typically not in config.json; - // use sensible defaults the user can override in YAML. 
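The attention-pattern inference (identical in behavior in the removed and re-indented forms below) keys off two optional `config.json` fields: `first_k_dense_replace` wins over `sliding_window`, and the DSA defaults (`dense_window: 4096`, `sparse_stride: 8`) are YAML-overridable per the comment above. A sketch of the decision order with hypothetical JSON fragments, the enum reduced to a label:

```rust
// Decision order mirrored from parse_value(); fragments are illustrative.
use serde_json::{json, Value};

fn infer_attention(v: &Value) -> &'static str {
    if v.get("first_k_dense_replace").and_then(Value::as_u64).is_some() {
        "dsa" // -> AttentionConfig::Dsa { dense_window: 4096, sparse_stride: 8, .. }
    } else if v.get("sliding_window").and_then(Value::as_u64).is_some() {
        "sliding_window" // -> AttentionConfig::SlidingWindow { window_size }
    } else {
        "dense" // no pattern fields -> dense attention by default
    }
}

fn main() {
    assert_eq!(infer_attention(&json!({ "first_k_dense_replace": 3 })), "dsa");
    assert_eq!(infer_attention(&json!({ "sliding_window": 4096 })), "sliding_window");
    assert_eq!(infer_attention(&json!({})), "dense");
}
```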
- Some(AttentionConfig::Dsa { - dense_window: 4096, - sparse_stride: 8, - first_dense_layers: first_dense, - }) - } else if let Some(sw) = v - .get("sliding_window") - .and_then(|x| x.as_u64()) - .map(|x| x as u32) - { - Some(AttentionConfig::SlidingWindow { window_size: sw }) - } else { - None // dense by default - }; + let attention = if let Some(first_dense) = u32_field(v, "first_k_dense_replace") { + // DSA-style model (GLM-5, DeepSeek-V3). + // dense_window and sparse_stride are typically not in config.json; + // use sensible defaults the user can override in YAML. + Some(AttentionConfig::Dsa { + dense_window: 4096, + sparse_stride: 8, + first_dense_layers: first_dense, + }) + } else if let Some(sw) = v + .get("sliding_window") + .and_then(|x| x.as_u64()) + .map(|x| x as u32) + { + Some(AttentionConfig::SlidingWindow { window_size: sw }) + } else { + None // dense by default + }; Ok(ModelConfig { name, @@ -188,6 +186,12 @@ mod tests { assert_eq!(moe.num_active_experts, 8); let mla = m.mla.as_ref().unwrap(); assert_eq!(mla.kv_lora_rank, 512); - assert!(matches!(m.attention, Some(AttentionConfig::Dsa { first_dense_layers: 3, .. }))); + assert!(matches!( + m.attention, + Some(AttentionConfig::Dsa { + first_dense_layers: 3, + .. + }) + )); } } diff --git a/src/instance/compute.rs b/src/instance/compute.rs index 5d6a073..ac7ee6c 100644 --- a/src/instance/compute.rs +++ b/src/instance/compute.rs @@ -33,7 +33,10 @@ pub enum AttentionPattern { SlidingWindow { window: f64 }, /// DeepSeek Sparse Attention: effective_ctx = min(N, dense_window) + /// max(0, N - dense_window) / sparse_stride. - Dsa { dense_window: f64, sparse_stride: f64 }, + Dsa { + dense_window: f64, + sparse_stride: f64, + }, } #[derive(Debug, Clone)] @@ -101,8 +104,9 @@ impl ComputeModel { // --- MLP FLOPs/token/layer (SwiGLU: gate + up + down = 3 matmuls) --- let mlp = if let Some(moe) = &model.moe { - let expert_inter = moe.expert_intermediate_size - .unwrap_or(model.intermediate_size.unwrap_or(0)) as f64; + let expert_inter = + moe.expert_intermediate_size + .unwrap_or(model.intermediate_size.unwrap_or(0)) as f64; let active = moe.num_active_experts as f64; let shared = moe.num_shared_experts as f64; active * 6.0 * h * expert_inter + shared * 6.0 * h * inter @@ -132,16 +136,14 @@ impl ComputeModel { let qk_hd = (mla.qk_nope_head_dim + mla.qk_rope_head_dim) as f64; let qk_rd = mla.qk_rope_head_dim as f64; let vhd = mla.v_head_dim as f64; - (h * qlr + qlr * n_heads * qk_hd - + h * (kvlr + qk_rd) - + n_heads * vhd * h) - * wdtype + (h * qlr + qlr * n_heads * qk_hd + h * (kvlr + qk_rd) + n_heads * vhd * h) * wdtype } else { ((n_heads + 2.0 * n_kv) * hd * h + n_heads * hd * h) * wdtype }; let mlp_wt = if let Some(moe) = &model.moe { - let expert_inter = moe.expert_intermediate_size - .unwrap_or(model.intermediate_size.unwrap_or(0)) as f64; + let expert_inter = + moe.expert_intermediate_size + .unwrap_or(model.intermediate_size.unwrap_or(0)) as f64; let active = moe.num_active_experts as f64; let shared = moe.num_shared_experts as f64; (active * 3.0 * h * expert_inter + shared * 3.0 * h * inter) * wdtype @@ -169,10 +171,9 @@ impl ComputeModel { }, 0.0, ), - Some(AttentionConfig::Dense) | None => ( - AttentionPattern::Dense, - model.num_layers as f64, - ), + Some(AttentionConfig::Dense) | None => { + (AttentionPattern::Dense, model.num_layers as f64) + } }; Self { @@ -237,10 +238,10 @@ impl ComputeModel { let dense_layers = self.first_dense_layers; let sparse_layers = self.num_layers - dense_layers; - let dense_flops = 
dense_layers - * (linear + self.attn_coeff * n * self.effective_ctx(n, true)); - let sparse_flops = sparse_layers - * (linear + self.attn_coeff * n * self.effective_ctx(n, false)); + let dense_flops = + dense_layers * (linear + self.attn_coeff * n * self.effective_ctx(n, true)); + let sparse_flops = + sparse_layers * (linear + self.attn_coeff * n * self.effective_ctx(n, false)); let total_flops = dense_flops + sparse_flops; let compute_time = total_flops / self.gpu_flops; @@ -254,7 +255,9 @@ impl ComputeModel { pub fn describe(&self) -> String { let pattern_str = match &self.attn_pattern { AttentionPattern::Dense => "dense".to_string(), - AttentionPattern::SlidingWindow { window } => format!("sliding_window({})", *window as u64), + AttentionPattern::SlidingWindow { window } => { + format!("sliding_window({})", *window as u64) + } AttentionPattern::Dsa { dense_window, sparse_stride, @@ -266,8 +269,7 @@ impl ComputeModel { format!( "linear_flops/tok/layer={:.3e}, attn_coeff={:.0}, pattern={}, \ weight_bytes/layer={:.2e}", - self.linear_flops_per_token, self.attn_coeff, pattern_str, - self.weight_bytes_per_layer, + self.linear_flops_per_token, self.attn_coeff, pattern_str, self.weight_bytes_per_layer, ) } } diff --git a/src/instance/mod.rs b/src/instance/mod.rs index 8cbb697..d42aaf0 100644 --- a/src/instance/mod.rs +++ b/src/instance/mod.rs @@ -1,6 +1,6 @@ pub mod compute; -pub mod kv_cache; #[allow(clippy::module_inception)] pub mod instance; +pub mod kv_cache; pub use instance::Instance; diff --git a/src/metrics/routing_log.rs b/src/metrics/routing_log.rs index 5e3817a..83efbe2 100644 --- a/src/metrics/routing_log.rs +++ b/src/metrics/routing_log.rs @@ -12,7 +12,9 @@ pub struct RoutingLogWriter { impl RoutingLogWriter { pub fn create>(path: P) -> Result { let f = File::create(path)?; - Ok(Self { inner: BufWriter::new(f) }) + Ok(Self { + inner: BufWriter::new(f), + }) } pub fn write(&mut self, decision: &RouteDecision) -> Result<()> { diff --git a/src/metrics/timeseries.rs b/src/metrics/timeseries.rs index 327cc0c..cb670e0 100644 --- a/src/metrics/timeseries.rs +++ b/src/metrics/timeseries.rs @@ -19,7 +19,9 @@ pub struct TimeseriesWriter { impl TimeseriesWriter { pub fn create>(path: P) -> Result { let f = std::fs::File::create(path)?; - Ok(Self { inner: csv::Writer::from_writer(f) }) + Ok(Self { + inner: csv::Writer::from_writer(f), + }) } pub fn write(&mut self, row: &TimeseriesRow) -> Result<()> { diff --git a/src/oracle.rs b/src/oracle.rs index 75523b6..aec53c2 100644 --- a/src/oracle.rs +++ b/src/oracle.rs @@ -51,7 +51,11 @@ impl TierResult { capacity_blocks, hits, misses, - hit_rate: if total == 0 { 0.0 } else { hits as f64 / total as f64 }, + hit_rate: if total == 0 { + 0.0 + } else { + hits as f64 / total as f64 + }, } } } @@ -68,12 +72,7 @@ pub fn analyze(records: &[RequestRecord], capacity_blocks: u64) -> OracleResult // 1. Unlimited cache let unlimited_hits = run_unlimited(records); - let unlimited = TierResult::from_counts( - "unlimited", - u64::MAX, - unlimited_hits, - total_blocks, - ); + let unlimited = TierResult::from_counts("unlimited", u64::MAX, unlimited_hits, total_blocks); // 2. 
Precompute next-use index for Belady let next_use = build_next_use(records); diff --git a/src/router/cache_load.rs b/src/router/cache_load.rs index b919100..142e00c 100644 --- a/src/router/cache_load.rs +++ b/src/router/cache_load.rs @@ -13,7 +13,7 @@ use crate::cluster::meta_store::MetaStore; use crate::instance::Instance; -use crate::router::{CandidateInfo, RouteDecision, Router}; +use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router}; use crate::trace::RequestRecord; pub struct CacheLoadRouter; @@ -39,11 +39,11 @@ impl Router for CacheLoadRouter { &mut self, req: &RequestRecord, instances: &[Instance], - meta: &MetaStore, - now: f64, + _meta: &MetaStore, + _now: f64, ) -> RouteDecision { let n = instances.len(); - let scores = meta.score_prefix(&req.hash_ids, now, n); + let scores = local_l0_scores(req, instances); // Step 1: least-loaded 1/4 of instances (by queue_len). let pool_size = (n / 4).max(2).min(n); @@ -83,7 +83,7 @@ impl Router for CacheLoadRouter { chosen: instances[best_idx].id, probe_overhead_s: 0.0, candidates, - reason: "least-loaded 1/4, then best prefix", + reason: "least-loaded 1/4, then best local L0 prefix", } } } diff --git a/src/router/cache_score.rs b/src/router/cache_score.rs index 495d5b1..ab23882 100644 --- a/src/router/cache_score.rs +++ b/src/router/cache_score.rs @@ -32,7 +32,7 @@ use crate::cluster::meta_store::MetaStore; use crate::instance::Instance; -use crate::router::{CandidateInfo, RouteDecision, Router}; +use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router}; use crate::trace::RequestRecord; pub struct CacheScoreRouter { @@ -55,11 +55,11 @@ impl Router for CacheScoreRouter { &mut self, req: &RequestRecord, instances: &[Instance], - meta: &MetaStore, - now: f64, + _meta: &MetaStore, + _now: f64, ) -> RouteDecision { let n = instances.len(); - let scores = meta.score_prefix(&req.hash_ids, now, n); + let scores = local_l0_scores(req, instances); let input_blocks = req.hash_ids.len() as f64; let mut best_idx: usize = 0; diff --git a/src/router/estimated_ttft.rs b/src/router/estimated_ttft.rs index 314872e..250ab5a 100644 --- a/src/router/estimated_ttft.rs +++ b/src/router/estimated_ttft.rs @@ -1,56 +1,30 @@ -//! First-principles TTFT-optimal routing. +//! First-principles TTFT-estimate routing using local L0 hits only. //! //! Estimates the actual time-to-first-token for each candidate instance: //! -//! `TTFT(r,i) = drain(i) + fetch(r,i) + prefill(miss)` +//! `TTFT(r,i) = drain(i) + prefill(local_l0_miss_i)` //! //! - **drain** — exact queue drain time: sum of per-request `prefill_time()` //! using the architecture-aware compute model (quadratic / DSA). //! -//! - **fetch** — RDMA fetch time for blocks cached elsewhere in the cluster -//! but not on instance `i` locally. +//! - **prefill** — compute for tokens whose blocks are absent from the +//! instance's current L0 cache. //! -//! - **prefill** — compute for cluster-wide cache-miss tokens (constant -//! across instances, cancels in the argmin). -//! -//! The router minimises `drain(i) + fetch(r,i)`, with ties broken by -//! lowest `queue_len` then most local cache. The fetch overlap with queue -//! drain is handled by keeping the additive form: this gives double -//! incentive to prefer instances with local cache, which empirically -//! outperforms the `max(drain, fetch)` alternative because even small -//! RDMA savings compound across thousands of routing decisions. +//! L1 / remote reuse can still reduce execution-time misses later in the +//! 
cluster fetch chain, but they are not counted as `kvcache hit` when +//! comparing routing candidates. use crate::cluster::meta_store::MetaStore; use crate::config::Config; use crate::instance::Instance; -use crate::router::{CandidateInfo, RouteDecision, Router}; +use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router}; use crate::trace::RequestRecord; -pub struct EstimatedTtftRouter { - /// Bytes per KV block (for RDMA cost estimation). - kv_block_bytes: f64, - /// RDMA bandwidth in bytes/s. - rdma_bw: f64, - /// RDMA per-transfer latency in seconds. - rdma_latency_s: f64, -} +pub struct EstimatedTtftRouter; impl EstimatedTtftRouter { - pub fn new(config: &Config) -> Self { - Self { - kv_block_bytes: config.model.kv_block_bytes() as f64, - rdma_bw: config.hardware.rdma_bw, - rdma_latency_s: config.hardware.rdma_latency_us * 1e-6, - } - } - - /// Estimate RDMA fetch time for `remote_blocks` blocks. - fn fetch_time(&self, remote_blocks: u32) -> f64 { - if remote_blocks == 0 { - return 0.0; - } - let bytes = remote_blocks as f64 * self.kv_block_bytes; - bytes / self.rdma_bw + self.rdma_latency_s + pub fn new(_config: &Config) -> Self { + Self } } @@ -63,14 +37,12 @@ impl Router for EstimatedTtftRouter { &mut self, req: &RequestRecord, instances: &[Instance], - meta: &MetaStore, - now: f64, + _meta: &MetaStore, + _now: f64, ) -> RouteDecision { let n = instances.len(); - let scores = meta.score_prefix(&req.hash_ids, now, n); - - // Cluster-wide max prefix: blocks reachable via RDMA from any peer. - let cluster_prefix = scores.iter().copied().max().unwrap_or(0); + let scores = local_l0_scores(req, instances); + let input_blocks = req.hash_ids.len() as u32; let mut best: u32 = 0; let mut best_cost = f64::INFINITY; @@ -85,15 +57,11 @@ impl Router for EstimatedTtftRouter { // 1. Exact queue drain time (architecture-aware, per-request sum). let drain = inst.estimated_drain_time(); - // 2. RDMA fetch cost for blocks not locally cached. - let remote_blocks = cluster_prefix.saturating_sub(local_prefix); - let fetch = self.fetch_time(remote_blocks); - - // Additive cost: drain + fetch. - // The additive form gives explicit incentive to prefer local cache - // (lower fetch) even when the queue is non-empty, which reduces - // total RDMA traffic and improves TTFT in aggregate. - let cost = drain + fetch; + // 2. Prefill compute for blocks absent from local L0. 
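+            //    Worked example (illustrative numbers): input = 3 blocks,
+            //    local L0 prefix = 2, block_size_tokens = 16 ->
+            //    miss_tokens = (3 - 2) * 16 = 16; cost = drain + prefill_time(16).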
+ let miss_tokens = input_blocks + .saturating_sub(local_prefix) + .saturating_mul(inst.block_size_tokens); + let cost = drain + inst.compute.prefill_time(miss_tokens); candidates.push(CandidateInfo { instance: inst.id, @@ -122,7 +90,7 @@ impl Router for EstimatedTtftRouter { chosen: best, probe_overhead_s: 0.0, candidates, - reason: "argmin(drain_time + fetch_time)", + reason: "argmin(drain_time + local-L0-miss prefill_time)", } } } diff --git a/src/router/least_loaded.rs b/src/router/least_loaded.rs index 7a722aa..efc0ed8 100644 --- a/src/router/least_loaded.rs +++ b/src/router/least_loaded.rs @@ -29,8 +29,7 @@ impl Router for LeastLoadedRouter { let mut best_score = f64::INFINITY; let mut candidates = Vec::with_capacity(instances.len()); for inst in instances { - let load = inst.kv_blocks_used as f64 - + self.alpha * inst.queue_len() as f64; + let load = inst.kv_blocks_used as f64 + self.alpha * inst.queue_len() as f64; candidates.push(CandidateInfo { instance: inst.id, predicted_prefix: 0, diff --git a/src/router/min_pd.rs b/src/router/min_pd.rs index 87a3d4f..f801639 100644 --- a/src/router/min_pd.rs +++ b/src/router/min_pd.rs @@ -1,4 +1,4 @@ -//! Minimum P*D routing. +//! Minimum P*D routing using real local L0 hits only. //! //! For each instance compute: //! - `P` = real prefill tokens this request will do if routed there @@ -7,30 +7,18 @@ //! //! Score = `P * D`, pick the instance that minimizes it. //! -//! `P` accounts for the **actual** prefill work after the cluster fetch -//! chain runs: the fetch chain serves any block cached anywhere in the -//! cluster (L0 → L1 → remote v6d via RDMA), so prefill compute only runs -//! for blocks that are absent cluster-wide *and* for blocks past the -//! instance-local prefix (the cluster only fetches a contiguous leading -//! prefix — any gap ends the fetch chain and the rest must be prefilled). +//! `P` accounts only for blocks that miss in the candidate instance's +//! current L0 cache. L1 / remote reuse may still reduce execution-time +//! work later in the cluster fetch chain, but they do not count as +//! `kvcache hit` for routing. //! //! Concretely, for instance `i`: //! //! ```text -//! local_prefix_i = meta_store.score_prefix(req, now)[i] // blocks -//! cluster_prefix = max over all j of meta_store_score[j] // blocks -//! effective_prefix_i = min(cluster_prefix, input_blocks) -//! - if local_prefix_i == cluster_prefix the fetch chain stays local, -//! - otherwise the prefill still skips cluster_prefix blocks because -//! the missing tail is fetched via RDMA from a peer. -//! P_i = (input_blocks - effective_prefix_i) * block_size_tokens +//! local_prefix_i = longest L0 prefix on instance i // blocks +//! P_i = (input_blocks - local_prefix_i) * block_size_tokens //! ``` //! -//! This makes `P` nearly instance-independent on well-populated clusters -//! (so `min_pd` degenerates to balanced load with a cache-affinity -//! tiebreak), which is exactly what you want when RDMA is cheap relative -//! to prefill compute. -//! //! Tiebreaks (essential on 128-instance clusters where many instances are //! idle and the raw product collapses to zero): //! 1. 
minimum `P*D` @@ -40,7 +28,7 @@ use crate::cluster::meta_store::MetaStore; use crate::instance::Instance; -use crate::router::{CandidateInfo, RouteDecision, Router}; +use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router}; use crate::trace::RequestRecord; pub struct MinPdRouter; @@ -66,36 +54,26 @@ impl Router for MinPdRouter { &mut self, req: &RequestRecord, instances: &[Instance], - meta: &MetaStore, - now: f64, + _meta: &MetaStore, + _now: f64, ) -> RouteDecision { let n = instances.len(); - let scores = meta.score_prefix(&req.hash_ids, now, n); + let scores = local_l0_scores(req, instances); let block_size = instances[0].block_size_tokens as u64; let input_blocks = req.hash_ids.len() as u64; - // Cluster-wide max prefix: longest contiguous prefix that EXISTS - // somewhere in the cluster (and will be fetched via remote RDMA if - // not local). This determines the effective prefill work for every - // candidate, not just the one that owns the blocks. - let cluster_prefix_blocks = scores.iter().copied().max().unwrap_or(0) as u64; - let effective_prefix_blocks = cluster_prefix_blocks.min(input_blocks); - let miss_blocks = input_blocks.saturating_sub(effective_prefix_blocks); - let p_base = miss_blocks.saturating_mul(block_size); // tokens to prefill - let mut candidates = Vec::with_capacity(n); let mut best: u32 = instances[0].id; // Minimize (P*D, D, -local_prefix). - // P is nearly instance-independent; D is the real discriminator. - // When tied on D, prefer the instance with the best local prefix - // (avoids the RDMA fetch cost). let mut best_key: (u128, u64, i64) = (u128::MAX, u64::MAX, i64::MAX); for inst in instances { let i = inst.id as usize; let d = inst.queue_len() as u64; - let pd = p_base as u128 * d as u128; let local_prefix = scores[i] as i64; + let miss_blocks = input_blocks.saturating_sub(scores[i] as u64); + let p = miss_blocks.saturating_mul(block_size); + let pd = p as u128 * d as u128; candidates.push(CandidateInfo { instance: inst.id, @@ -118,7 +96,7 @@ impl Router for MinPdRouter { chosen: best, probe_overhead_s: 0.0, candidates, - reason: "argmin(P*D), P=cluster-wide miss tokens, D=ongoing reqs", + reason: "argmin(P*D), P=local-L0 miss tokens, D=ongoing reqs", } } } diff --git a/src/router/mod.rs b/src/router/mod.rs index 3203382..6cafc8d 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -48,6 +48,17 @@ pub trait Router: Send { ) -> RouteDecision; } +pub(crate) fn local_l0_prefix(req: &RequestRecord, inst: &Instance) -> u32 { + inst.cache.l0.longest_prefix_peek(&req.hash_ids) as u32 +} + +pub(crate) fn local_l0_scores(req: &RequestRecord, instances: &[Instance]) -> Vec { + instances + .iter() + .map(|inst| local_l0_prefix(req, inst)) + .collect() +} + pub fn build(full: &Config, seed: u64) -> Box { use crate::config::RouterMode::*; let cfg = &full.cluster.router; @@ -66,10 +77,10 @@ pub fn build(full: &Config, seed: u64) -> Box { MinPd => Box::new(min_pd::MinPdRouter::new()) as Box, LeastTokens => Box::new(least_tokens::LeastTokensRouter::new()) as Box, CacheLoad => Box::new(cache_load::CacheLoadRouter::new()) as Box, - CacheScore => { - Box::new(cache_score::CacheScoreRouter::new(cfg.score_alpha, cfg.score_beta)) - as Box - } + CacheScore => Box::new(cache_score::CacheScoreRouter::new( + cfg.score_alpha, + cfg.score_beta, + )) as Box, EstimatedTtft => { Box::new(estimated_ttft::EstimatedTtftRouter::new(full)) as Box } @@ -78,3 +89,192 @@ pub fn build(full: &Config, seed: u64) -> Box { } } } + +#[cfg(test)] +mod tests { + use super::*; 
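+    // These tests pin the routing contract this patch introduces: candidates
+    // are compared by their real, current L0 prefix (local_l0_scores), never
+    // by meta-store predictions, which may be stale or cover L1-only blocks.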
+ use crate::config::{ + ClusterConfig, HardwareConfig, MetaStoreConfig, ModelConfig, RouterConfig, RouterMode, + SimConfig, + }; + use crate::instance::instance::AdmittedRequest; + use crate::router::cache_load::CacheLoadRouter; + use crate::router::cache_score::CacheScoreRouter; + use crate::router::estimated_ttft::EstimatedTtftRouter; + use crate::router::min_pd::MinPdRouter; + use crate::router::precise_aware::PreciseRouter; + use crate::router::prefix_affinity::PrefixAffinityRouter; + use crate::router::ttl_aware::TtlAwareRouter; + use crate::trace::RequestRecord; + + fn test_model() -> ModelConfig { + ModelConfig { + name: "test".into(), + num_layers: 4, + num_kv_heads: 2, + head_dim: 64, + dtype_bytes: 2, + block_size_tokens: 16, + flops_per_token_prefill: Some(1.0e9), + attn_quadratic_coeff: Some(64.0), + ..Default::default() + } + } + + fn test_hardware() -> HardwareConfig { + HardwareConfig { + gpu_flops: 1.0e14, + gpu_fp8_flops: 0.0, + gpu_fp4_flops: 0.0, + gpu_mem_bw: 1.0e12, + hbm_bytes: 1.0e9, + dram_bytes: 4.0e9, + pcie_bw: 32.0e9, + pcie_latency_us: 1.0, + rdma_bw: 12.0e9, + rdma_latency_us: 5.0, + max_batch_slots: 32, + prefill_chunk_tokens: 1024, + } + } + + fn test_config(mode: RouterMode) -> Config { + Config { + model: test_model(), + hardware: test_hardware(), + cluster: ClusterConfig { + num_instances: 2, + meta_store: MetaStoreConfig { + ttl_seconds: 1000.0, + }, + router: RouterConfig { + mode, + precise_probe_latency_us: 10.0, + precise_probe_topk: 2, + load_alpha: 0.0, + score_alpha: 0.0, + score_beta: 1.0, + prefix_k: 8, + affinity_fan_out: 2, + }, + }, + sim: SimConfig { + trace_path: String::new(), + max_requests: None, + output_dir: String::new(), + sample_interval_s: 0.0, + seed: 7, + }, + } + } + + fn make_instances(n: usize) -> Vec { + let model = test_model(); + let hw = test_hardware(); + (0..n) + .map(|id| Instance::new(id as u32, &model, &hw)) + .collect() + } + + fn make_request(hashes: &[u64]) -> RequestRecord { + RequestRecord { + req_id: 1, + chat_id: 0, + arrival: 0.0, + input_len: hashes.len() as u32 * 16, + output_len: 16, + hash_ids: hashes.to_vec(), + } + } + + fn insert_l0(inst: &mut Instance, hashes: &[u64]) { + let mut evicted = Vec::new(); + inst.cache.l0.insert_blocks(hashes, &mut evicted); + } + + fn insert_l1(inst: &mut Instance, hashes: &[u64]) { + let mut evicted = Vec::new(); + inst.cache.l1.insert_blocks(hashes, &mut evicted); + } + + fn publish_meta(meta: &mut MetaStore, inst_id: u32, hashes: &[u64], now: f64) { + for &h in hashes { + meta.insert(h, inst_id, now); + } + } + + fn enqueue_requests(inst: &mut Instance, count: u32, tokens: u32) { + for req_id in 0..count { + inst.admit(AdmittedRequest { + req_id: req_id as u64, + arrival: 0.0, + ready_at: 0.0, + prefill_tokens_remaining: tokens, + reserved_blocks: 0, + }); + } + } + + #[test] + fn precise_uses_real_l0_prefix_not_l1_prefix() { + let req = make_request(&[10, 11, 12]); + let mut instances = make_instances(2); + let mut meta = MetaStore::new(1000.0); + + insert_l1(&mut instances[0], &[10, 11, 12]); + publish_meta(&mut meta, 0, &[10, 11, 12], 0.0); + insert_l0(&mut instances[1], &[10, 11]); + + let mut router = PreciseRouter::new(2, 10e-6, 0.0); + let decision = router.route(&req, &instances, &meta, 0.0); + + assert_eq!(decision.chosen, 1); + } + + #[test] + fn cache_aware_routers_compare_real_l0_not_meta_store_scores() { + let req = make_request(&[20, 21, 22]); + let mut instances = make_instances(2); + let mut meta = MetaStore::new(1000.0); + + // Instance 0 only looks 
hot in the meta store; its real L0 prefix is zero. + publish_meta(&mut meta, 0, &[20, 21, 22], 0.0); + // Instance 1 really holds the first two blocks in HBM. + insert_l0(&mut instances[1], &[20, 21]); + + let mut ttl = TtlAwareRouter::new(0.0); + assert_eq!(ttl.route(&req, &instances, &meta, 0.0).chosen, 1); + + let mut cache_load = CacheLoadRouter::new(); + assert_eq!(cache_load.route(&req, &instances, &meta, 0.0).chosen, 1); + + let mut cache_score = CacheScoreRouter::new(0.0, 1.0); + assert_eq!(cache_score.route(&req, &instances, &meta, 0.0).chosen, 1); + + let mut min_pd = MinPdRouter::new(); + assert_eq!(min_pd.route(&req, &instances, &meta, 0.0).chosen, 1); + + let cfg = test_config(RouterMode::EstimatedTtft); + let mut est = EstimatedTtftRouter::new(&cfg); + assert_eq!(est.route(&req, &instances, &meta, 0.0).chosen, 1); + } + + #[test] + fn prefix_affinity_fallback_uses_real_l0_not_meta_store_scores() { + let req = make_request(&[30, 31, 32]); + let mut instances = make_instances(2); + let mut meta = MetaStore::new(1000.0); + + publish_meta(&mut meta, 0, &[30, 31, 32], 0.0); + insert_l0(&mut instances[1], &[30, 31]); + enqueue_requests(&mut instances[0], 5, 64); + enqueue_requests(&mut instances[1], 5, 64); + + let cfg = test_config(RouterMode::PrefixAffinity); + let mut router = PrefixAffinityRouter::new(&cfg); + let decision = router.route(&req, &instances, &meta, 0.0); + + assert_eq!(decision.reason, "affinity fallback: min(drain+fetch)"); + assert_eq!(decision.chosen, 1); + } +} diff --git a/src/router/precise_aware.rs b/src/router/precise_aware.rs index 9a973d6..9940815 100644 --- a/src/router/precise_aware.rs +++ b/src/router/precise_aware.rs @@ -1,28 +1,13 @@ -//! KV-aware routing via meta-store candidate selection + precise probing. +//! L0-aware routing via exact per-instance probing. //! -//! The global meta store is used as a *candidate pre-filter*: we score -//! every instance's predicted prefix from the store, take the top-K by -//! (predicted_prefix DESC, load ASC), and then exact-probe those K -//! candidates' actual L0+L1 caches to get the true longest prefix. This -//! catches two cases where the meta store is wrong: -//! -//! - the store is stale (block evicted from L0/L1 but TTL not yet up), -//! - the store undercounts because some blocks' TTL expired individually. -//! -//! Because the candidate set is sourced from the meta store rather than -//! from a load ranking, this router is a strict superset of `ttl_aware`: -//! any instance the meta store would pick is a candidate here, and the -//! exact probe can only move the decision toward a truthfully-better -//! instance. Each probe adds `probe_latency_s` to the request's -//! effective arrival time. -//! -//! If the meta store returns zero-prefix for every instance (e.g. cold -//! start, or a request whose blocks have never been seen), we fall back -//! to the top-K least-loaded instances so we still place the request. +//! Every instance is compared using its *real current L0 prefix length* +//! only. L1 / remote availability can still reduce execution-time misses +//! later in the cluster fetch chain, but they do not count as `kvcache hit` +//! for routing. 
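+//!
+//! Worked example (mirrors `precise_uses_real_l0_prefix_not_l1_prefix` in
+//! `src/router/mod.rs`): a 3-block request, instance 0 holding all blocks in
+//! L1 only, instance 1 holding the first two in L0:
+//!
+//! ```text
+//! inst 0: L0 = [],       L1 = [10, 11, 12]  -> l0_prefix = 0
+//! inst 1: L0 = [10, 11]                     -> l0_prefix = 2  (chosen)
+//! ```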
use crate::cluster::meta_store::MetaStore; use crate::instance::Instance; -use crate::router::{CandidateInfo, RouteDecision, Router}; +use crate::router::{local_l0_prefix, CandidateInfo, RouteDecision, Router}; use crate::trace::RequestRecord; pub struct PreciseRouter { @@ -33,7 +18,11 @@ pub struct PreciseRouter { impl PreciseRouter { pub fn new(topk: u32, probe_latency_s: f64, alpha: f64) -> Self { - Self { topk, probe_latency_s, alpha } + Self { + topk, + probe_latency_s, + alpha, + } } fn load_of(&self, inst: &Instance) -> f64 { @@ -50,50 +39,15 @@ impl Router for PreciseRouter { &mut self, req: &RequestRecord, instances: &[Instance], - meta: &MetaStore, - now: f64, + _meta: &MetaStore, + _now: f64, ) -> RouteDecision { let n = instances.len(); - let k = (self.topk as usize).min(n).max(1); - - // 1. Meta-store candidate set: rank all instances by - // (predicted_prefix DESC, load ASC) and take the top-K. - let meta_scores = meta.score_prefix(&req.hash_ids, now, n); - let any_meta_hit = meta_scores.iter().any(|&p| p > 0); - - let mut ranked: Vec = (0..n).collect(); - if any_meta_hit { - ranked.sort_by(|&a, &b| { - let pa = meta_scores[a]; - let pb = meta_scores[b]; - // prefix desc, then load asc - pb.cmp(&pa) - .then_with(|| { - self.load_of(&instances[a]) - .partial_cmp(&self.load_of(&instances[b])) - .unwrap_or(std::cmp::Ordering::Equal) - }) - }); - } else { - // Cold start fallback: pure load order. - ranked.sort_by(|&a, &b| { - self.load_of(&instances[a]) - .partial_cmp(&self.load_of(&instances[b])) - .unwrap_or(std::cmp::Ordering::Equal) - }); - } - let probed = &ranked[..k]; - - // 2. Exact probe each candidate and pick - // argmax(exact_prefix, tiebreak: -load). - let mut candidates = Vec::with_capacity(k); - let mut best = probed[0] as u32; + let mut candidates = Vec::with_capacity(n); + let mut best = instances[0].id; let mut best_key: (i64, f64) = (i64::MIN, f64::INFINITY); - for &i in probed { - let inst = &instances[i]; - let l0 = inst.cache.l0.longest_prefix_peek(&req.hash_ids); - let l1 = inst.cache.l1.longest_prefix_peek(&req.hash_ids[l0..]); - let predicted = (l0 + l1) as u32; + for inst in instances { + let predicted = local_l0_prefix(req, inst); let load = self.load_of(inst); candidates.push(CandidateInfo { instance: inst.id, @@ -112,9 +66,9 @@ impl Router for PreciseRouter { req_id: req.req_id, mode: "precise", chosen: best, - probe_overhead_s: k as f64 * self.probe_latency_s, + probe_overhead_s: n as f64 * self.probe_latency_s, candidates, - reason: "exact-probe top-K meta-store candidates", + reason: "exact-probe all instances' L0 cache", } } } diff --git a/src/router/prefix_affinity.rs b/src/router/prefix_affinity.rs index 66e6a31..8f99b7a 100644 --- a/src/router/prefix_affinity.rs +++ b/src/router/prefix_affinity.rs @@ -36,7 +36,7 @@ use crate::cluster::meta_store::MetaStore; use crate::config::Config; use crate::instance::Instance; -use crate::router::{CandidateInfo, RouteDecision, Router}; +use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router}; use crate::trace::RequestRecord; pub struct PrefixAffinityRouter { @@ -47,12 +47,6 @@ pub struct PrefixAffinityRouter { /// Queue-length threshold: if all top candidates exceed this, expand to /// the full instance set. overload_threshold: u32, - /// Bytes per KV block (for RDMA cost estimation in fallback path). - kv_block_bytes: f64, - /// RDMA bandwidth in bytes/s. - rdma_bw: f64, - /// RDMA per-transfer latency in seconds. 
- rdma_latency_s: f64, } impl PrefixAffinityRouter { @@ -69,9 +63,6 @@ impl PrefixAffinityRouter { prefix_k: config.cluster.router.prefix_k, fan_out, overload_threshold: 4, - kv_block_bytes: config.model.kv_block_bytes() as f64, - rdma_bw: config.hardware.rdma_bw, - rdma_latency_s: config.hardware.rdma_latency_us * 1e-6, } } @@ -96,15 +87,6 @@ impl PrefixAffinityRouter { h = (h ^ (h >> 27)).wrapping_mul(0x94d049bb133111eb); h ^ (h >> 31) } - - /// Estimate RDMA fetch time for `remote_blocks` blocks. - fn fetch_time(&self, remote_blocks: u32) -> f64 { - if remote_blocks == 0 { - return 0.0; - } - let bytes = remote_blocks as f64 * self.kv_block_bytes; - bytes / self.rdma_bw + self.rdma_latency_s - } } impl Router for PrefixAffinityRouter { @@ -116,8 +98,8 @@ impl Router for PrefixAffinityRouter { &mut self, req: &RequestRecord, instances: &[Instance], - meta: &MetaStore, - now: f64, + _meta: &MetaStore, + _now: f64, ) -> RouteDecision { let n = instances.len(); let fp = Self::fingerprint(&req.hash_ids, self.prefix_k); @@ -129,7 +111,7 @@ impl Router for PrefixAffinityRouter { ranked.sort_unstable_by(|a, b| b.0.cmp(&a.0)); // descending score // Collect candidate info for logging (also needed for fallback). - let scores = meta.score_prefix(&req.hash_ids, now, n); + let scores = local_l0_scores(req, instances); let candidates: Vec = instances .iter() .map(|inst| CandidateInfo { @@ -165,14 +147,14 @@ impl Router for PrefixAffinityRouter { let reason; if all_overloaded { reason = "affinity fallback: min(drain+fetch)"; - let cluster_prefix = scores.iter().copied().max().unwrap_or(0); let mut best_cost = f64::INFINITY; for &(_, idx) in ranked.iter() { let inst = &instances[idx]; let drain = inst.estimated_drain_time(); - let local_prefix = scores[idx]; - let remote_blocks = cluster_prefix.saturating_sub(local_prefix); - let cost = drain + self.fetch_time(remote_blocks); + let miss_tokens = (req.hash_ids.len() as u32) + .saturating_sub(scores[idx]) + .saturating_mul(inst.block_size_tokens); + let cost = drain + inst.compute.prefill_time(miss_tokens); let ql = inst.queue_len(); if cost < best_cost || (cost == best_cost && ql < best_ql) { best_cost = cost; diff --git a/src/router/random.rs b/src/router/random.rs index 8457504..c4bfb17 100644 --- a/src/router/random.rs +++ b/src/router/random.rs @@ -13,7 +13,9 @@ pub struct RandomRouter { impl RandomRouter { pub fn new(seed: u64) -> Self { - Self { rng: ChaCha8Rng::seed_from_u64(seed) } + Self { + rng: ChaCha8Rng::seed_from_u64(seed), + } } } diff --git a/src/router/ttl_aware.rs b/src/router/ttl_aware.rs index bbdcec5..39a2973 100644 --- a/src/router/ttl_aware.rs +++ b/src/router/ttl_aware.rs @@ -1,6 +1,6 @@ use crate::cluster::meta_store::MetaStore; use crate::instance::Instance; -use crate::router::{CandidateInfo, RouteDecision, Router}; +use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router}; use crate::trace::RequestRecord; pub struct TtlAwareRouter { @@ -22,18 +22,17 @@ impl Router for TtlAwareRouter { &mut self, req: &RequestRecord, instances: &[Instance], - meta: &MetaStore, - now: f64, + _meta: &MetaStore, + _now: f64, ) -> RouteDecision { let n = instances.len(); - let scores = meta.score_prefix(&req.hash_ids, now, n); + let scores = local_l0_scores(req, instances); let mut best = 0u32; let mut best_key = (i64::MIN, f64::INFINITY); // maximize prefix, then minimize load let mut candidates = Vec::with_capacity(n); for inst in instances { let p = scores[inst.id as usize]; - let load = inst.kv_blocks_used as f64 - + 
self.alpha * inst.queue_len() as f64; + let load = inst.kv_blocks_used as f64 + self.alpha * inst.queue_len() as f64; candidates.push(CandidateInfo { instance: inst.id, predicted_prefix: p, @@ -53,7 +52,7 @@ impl Router for TtlAwareRouter { chosen: best, probe_overhead_s: 0.0, candidates, - reason: "max meta_store prefix, tie -> least loaded", + reason: "max local L0 prefix, tie -> least loaded", } } } diff --git a/src/sim/engine.rs b/src/sim/engine.rs index ee0d548..7a515e6 100644 --- a/src/sim/engine.rs +++ b/src/sim/engine.rs @@ -56,7 +56,11 @@ impl EventQueue { pub fn schedule(&mut self, time: f64, event: Event) { let t = time.max(self.now); self.seq += 1; - self.heap.push(Slot { time: t, seq: self.seq, event }); + self.heap.push(Slot { + time: t, + seq: self.seq, + event, + }); } pub fn pop(&mut self) -> Option<(f64, Event)> { @@ -84,7 +88,12 @@ mod tests { #[test] fn pops_in_time_order() { let mut q = EventQueue::new(); - q.schedule(2.0, Event::BatchTick { instance: 0 as InstanceId }); + q.schedule( + 2.0, + Event::BatchTick { + instance: 0 as InstanceId, + }, + ); q.schedule(1.0, Event::BatchTick { instance: 1 }); q.schedule(1.5, Event::BatchTick { instance: 2 }); let (t1, _) = q.pop().unwrap(); diff --git a/src/trace.rs b/src/trace.rs index 236c0b5..6973b50 100644 --- a/src/trace.rs +++ b/src/trace.rs @@ -50,8 +50,7 @@ pub struct TraceReader { impl TraceReader { pub fn open>(path: P, max_requests: Option) -> Result { let path = path.as_ref(); - let f = File::open(path) - .with_context(|| format!("opening trace {}", path.display()))?; + let f = File::open(path).with_context(|| format!("opening trace {}", path.display()))?; Ok(Self { inner: BufReader::with_capacity(1 << 20, f), next_id: 0, diff --git a/tests/smoke.rs b/tests/smoke.rs index 1b9d20c..6e791e3 100644 --- a/tests/smoke.rs +++ b/tests/smoke.rs @@ -172,12 +172,14 @@ fn ablation_lru_preserves_ttft_fields() { RouterMode::Random, ); let online = driver::run(&cfg, Some("online_lru")).expect("online lru run"); - let out = driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Lru]) - .expect("ablate lru"); + let out = + driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Lru]) + .expect("ablate lru"); assert_eq!(out.len(), 1); let row = &out[0]; - let online_hit = online.summary.hit_rate_l0 + online.summary.hit_rate_l1 + online.summary.hit_rate_remote; + let online_hit = + online.summary.hit_rate_l0 + online.summary.hit_rate_l1 + online.summary.hit_rate_remote; let ablate_hit = row.hit_rate_l0 + row.hit_rate_l1 + row.hit_rate_remote; assert!( @@ -204,12 +206,9 @@ fn ablate_rejects_belady_until_exact_algorithm_exists() { RouterMode::Random, ); - let err = driver::ablate_fixed_placement( - &cfg, - &[RouterMode::Random], - &[ReplayEvictPolicy::Belady], - ) - .expect_err("belady should be rejected"); + let err = + driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Belady]) + .expect_err("belady should be rejected"); assert!( err.to_string().contains("exact belady"), "unexpected error: {err:#}"