diff --git a/src/cluster/cluster.rs b/src/cluster/cluster.rs index d7e5cbd..41a1e32 100644 --- a/src/cluster/cluster.rs +++ b/src/cluster/cluster.rs @@ -8,6 +8,7 @@ use crate::instance::kv_cache::L1Change; use crate::instance::Instance; use crate::router::{self, RouteDecision, Router}; use crate::trace::RequestRecord; +use crate::ttft::{classify_prefix_tiers, TtftModel}; use crate::types::InstanceId; #[derive(Debug, Clone)] @@ -31,13 +32,19 @@ pub struct Cluster { pub router: Box, pub block_size_tokens: u32, pub kv_block_bytes: u64, + pub ttft_model: TtftModel, } impl Cluster { pub fn new(config: &Config, model: &ModelConfig) -> Self { let mut instances = Vec::with_capacity(config.cluster.num_instances as usize); for id in 0..config.cluster.num_instances { - instances.push(Instance::new(id as InstanceId, model, &config.hardware)); + instances.push(Instance::new( + id as InstanceId, + model, + &config.hardware, + &config.calibration, + )); } let meta_store = MetaStore::new(config.cluster.meta_store.ttl_seconds); let router = router::build(config, config.sim.seed); @@ -47,6 +54,7 @@ impl Cluster { router, block_size_tokens: model.block_size_tokens, kv_block_bytes: model.kv_block_bytes(), + ttft_model: TtftModel::new(&config.hardware, &config.calibration, model.kv_block_bytes()), } } @@ -59,24 +67,29 @@ impl Cluster { .route(req, &self.instances, &self.meta_store, now); let inst_id = decision.chosen; let probe_overhead_s = decision.probe_overhead_s; + let scheduler_overhead_s = self + .ttft_model + .scheduler_overhead_s(self.instances.len(), 3); - // The router probe overhead delays the request's effective start time. - let effective_now = now + probe_overhead_s; + // The router probe overhead and scheduler work delay the request's + // effective start time. + let effective_now = now + probe_overhead_s + scheduler_overhead_s; let inst = &mut self.instances[inst_id as usize]; + let residency = classify_prefix_tiers(&req.hash_ids, inst, &self.meta_store, now); let total_blocks = req.hash_ids.len() as u32; + let l0_hits = residency.l0_hit_blocks; + let l1_hits = residency.l1_hit_blocks; + let remote_hit_blocks = residency.remote_hit_blocks; - // 1. L0 lookup (touches matched blocks). - let l0_hits = inst.cache.l0.longest_prefix(&req.hash_ids) as u32; - - // 2. L1 lookup on the remaining suffix. + // 1. L1 lookup on the remaining suffix. let suffix_after_l0 = &req.hash_ids[l0_hits as usize..]; - let l1_hits = inst.cache.l1.longest_prefix_peek(suffix_after_l0) as u32; // L1->L0 transfer cost let l1_bytes = (l1_hits as u64) * self.kv_block_bytes; let mut t = effective_now; let mut l1_changes = Vec::new(); if l1_hits > 0 { + t += self.ttft_model.local_l1_prepare_time_s(l1_hits) - inst.links.pcie.cost(l1_bytes); t = inst.links.pcie.reserve(t, l1_bytes); l1_changes = inst .cache @@ -86,23 +99,14 @@ impl Cluster { // 3. Remote v6d lookup for the still-remaining suffix. let suffix_after_l1 = &suffix_after_l0[l1_hits as usize..]; - let mut remote_hit_blocks: u32 = 0; - for &h in suffix_after_l1 { - // A block is remotely available iff some instance other than - // `inst_id` lists it (and not expired). 
- let owners = self.meta_store.instances_for(h, now); - let any_remote = owners.iter().any(|o| *o != inst_id); - if any_remote { - remote_hit_blocks += 1; - } else { - break; // contiguous prefix - stop on first miss - } - } let remote_bytes = (remote_hit_blocks as u64) * self.kv_block_bytes; if remote_hit_blocks > 0 { let pulled = &suffix_after_l1[..remote_hit_blocks as usize]; let l1_changes = { let inst = &mut self.instances[inst_id as usize]; + t += self.ttft_model.remote_prepare_time_s(remote_hit_blocks) + - inst.links.rdma.cost(remote_bytes) + - inst.links.pcie.cost(remote_bytes); t = inst.links.rdma.reserve(t, remote_bytes); t = inst.links.pcie.reserve(t, remote_bytes); inst.cache.fetch_remote_blocks_to_l0(pulled) @@ -134,6 +138,7 @@ impl Cluster { ready_at: t, prefill_tokens_remaining: miss_tokens, reserved_blocks, + completion_tail_s: self.ttft_model.first_token_tail_s(), }; let inst = &mut self.instances[inst_id as usize]; inst.admit(admitted); @@ -170,3 +175,100 @@ impl Cluster { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::{ + CalibrationConfig, ClusterConfig, Config, HardwareConfig, MetaStoreConfig, ModelConfig, + RouterConfig, RouterMode, SimConfig, + }; + use crate::trace::RequestRecord; + + fn test_config(mode: RouterMode) -> Config { + Config { + model: ModelConfig { + name: "test".into(), + num_layers: 4, + num_kv_heads: 2, + head_dim: 64, + dtype_bytes: 2, + block_size_tokens: 16, + flops_per_token_prefill: Some(1.0e9), + attn_quadratic_coeff: Some(64.0), + ..Default::default() + }, + hardware: HardwareConfig { + gpu_flops: 1.0e14, + gpu_fp8_flops: 0.0, + gpu_fp4_flops: 0.0, + gpu_mem_bw: 1.0e12, + hbm_bytes: 1.0e9, + dram_bytes: 4.0e9, + host_dram_bw: 5.0e11, + pcie_bw: 32.0e9, + pcie_latency_us: 1.0, + rdma_bw: 12.0e9, + rdma_latency_us: 5.0, + intra_node_tp_bw: 9.0e11, + intra_node_tp_latency_us: 2.0, + tp_degree: 1, + max_batch_slots: 32, + prefill_chunk_tokens: 1024, + }, + calibration: CalibrationConfig { + dram_access_latency_us: 25.0, + layout_transform_fixed_us: 7.0, + ..CalibrationConfig::default() + }, + cluster: ClusterConfig { + num_instances: 1, + meta_store: MetaStoreConfig { + ttl_seconds: 1000.0, + }, + router: RouterConfig { + mode, + precise_probe_latency_us: 10.0, + precise_probe_topk: 2, + load_alpha: 0.0, + score_alpha: 0.0, + score_beta: 1.0, + prefix_k: 8, + affinity_fan_out: 2, + }, + }, + sim: SimConfig { + trace_path: String::new(), + max_requests: None, + output_dir: String::new(), + sample_interval_s: 0.0, + seed: 7, + }, + } + } + + #[test] + fn l1_ready_at_includes_dram_and_transform_overhead() { + let cfg = test_config(RouterMode::EstimatedTtft); + let mut cluster = Cluster::new(&cfg, &cfg.model); + let req = RequestRecord { + req_id: 1, + chat_id: 0, + arrival: 0.0, + input_len: 32, + output_len: 16, + hash_ids: vec![10, 11], + }; + + let mut evicted = Vec::new(); + cluster.instances[0] + .cache + .l1 + .insert_blocks(&req.hash_ids, &mut evicted); + + let stats = cluster.route_and_admit(&req, 0.0); + let pure_pcie = cluster.instances[0].links.pcie.cost(cluster.kv_block_bytes * 2); + + assert!(stats.ready_at > pure_pcie); + } +} diff --git a/src/config.rs b/src/config.rs index 057bfa1..e94de15 100644 --- a/src/config.rs +++ b/src/config.rs @@ -19,6 +19,8 @@ use std::path::Path; pub struct Config { pub model: ModelConfig, pub hardware: HardwareConfig, + #[serde(default)] + pub calibration: CalibrationConfig, pub cluster: ClusterConfig, pub sim: SimConfig, } @@ -178,16 +180,36 @@ pub struct HardwareConfig { pub 
gpu_mem_bw: f64, pub hbm_bytes: f64, pub dram_bytes: f64, + #[serde(default = "default_host_dram_bw")] + pub host_dram_bw: f64, pub pcie_bw: f64, pub pcie_latency_us: f64, pub rdma_bw: f64, pub rdma_latency_us: f64, + #[serde(default = "default_intra_node_tp_bw")] + pub intra_node_tp_bw: f64, + #[serde(default = "default_intra_node_tp_latency_us")] + pub intra_node_tp_latency_us: f64, + #[serde(default = "default_tp_degree")] + pub tp_degree: u32, #[serde(default = "default_max_batch_slots")] pub max_batch_slots: u32, #[serde(default = "default_prefill_chunk_tokens")] pub prefill_chunk_tokens: u32, } +fn default_host_dram_bw() -> f64 { + 5.0e11 +} +fn default_intra_node_tp_bw() -> f64 { + 9.0e11 +} +fn default_intra_node_tp_latency_us() -> f64 { + 2.0 +} +fn default_tp_degree() -> u32 { + 1 +} fn default_max_batch_slots() -> u32 { 256 } @@ -195,6 +217,144 @@ fn default_prefill_chunk_tokens() -> u32 { 2048 } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CalibrationConfig { + #[serde(default = "default_matmul_util")] + pub matmul_util: f64, + #[serde(default = "default_attention_util")] + pub attention_util: f64, + #[serde(default = "default_hbm_bw_util")] + pub hbm_bw_util: f64, + #[serde(default = "default_dram_bw_util")] + pub dram_bw_util: f64, + #[serde(default = "default_pcie_bw_util")] + pub pcie_bw_util: f64, + #[serde(default = "default_rdma_bw_util")] + pub rdma_bw_util: f64, + #[serde(default = "default_tp_bw_util")] + pub tp_bw_util: f64, + #[serde(default = "default_tp_overlap_ratio")] + pub tp_overlap_ratio: f64, + #[serde(default = "default_tp_collective_count_per_layer")] + pub tp_collective_count_per_layer: f64, + #[serde(default = "default_kv_overlap_ratio")] + pub kv_overlap_ratio: f64, + #[serde(default = "default_scheduler_base_overhead_us")] + pub scheduler_base_overhead_us: f64, + #[serde(default = "default_scheduler_per_candidate_us")] + pub scheduler_per_candidate_us: f64, + #[serde(default = "default_cache_probe_us_per_tier")] + pub cache_probe_us_per_tier: f64, + #[serde(default = "default_batch_pack_overhead_us")] + pub batch_pack_overhead_us: f64, + #[serde(default = "default_dram_access_latency_us")] + pub dram_access_latency_us: f64, + #[serde(default = "default_remote_metadata_us")] + pub remote_metadata_us: f64, + #[serde(default = "default_layout_transform_fixed_us")] + pub layout_transform_fixed_us: f64, + #[serde(default = "default_misc_layer_overhead_us")] + pub misc_layer_overhead_us: f64, + #[serde(default = "default_chunk_launch_overhead_us")] + pub chunk_launch_overhead_us: f64, + #[serde(default = "default_final_sync_us")] + pub final_sync_us: f64, + #[serde(default = "default_first_token_ready_us")] + pub first_token_ready_us: f64, +} + +impl Default for CalibrationConfig { + fn default() -> Self { + Self { + matmul_util: default_matmul_util(), + attention_util: default_attention_util(), + hbm_bw_util: default_hbm_bw_util(), + dram_bw_util: default_dram_bw_util(), + pcie_bw_util: default_pcie_bw_util(), + rdma_bw_util: default_rdma_bw_util(), + tp_bw_util: default_tp_bw_util(), + tp_overlap_ratio: default_tp_overlap_ratio(), + tp_collective_count_per_layer: default_tp_collective_count_per_layer(), + kv_overlap_ratio: default_kv_overlap_ratio(), + scheduler_base_overhead_us: default_scheduler_base_overhead_us(), + scheduler_per_candidate_us: default_scheduler_per_candidate_us(), + cache_probe_us_per_tier: default_cache_probe_us_per_tier(), + batch_pack_overhead_us: default_batch_pack_overhead_us(), + dram_access_latency_us: 
default_dram_access_latency_us(), + remote_metadata_us: default_remote_metadata_us(), + layout_transform_fixed_us: default_layout_transform_fixed_us(), + misc_layer_overhead_us: default_misc_layer_overhead_us(), + chunk_launch_overhead_us: default_chunk_launch_overhead_us(), + final_sync_us: default_final_sync_us(), + first_token_ready_us: default_first_token_ready_us(), + } + } +} + +fn default_matmul_util() -> f64 { + 0.4 +} +fn default_attention_util() -> f64 { + 0.3 +} +fn default_hbm_bw_util() -> f64 { + 0.75 +} +fn default_dram_bw_util() -> f64 { + 0.65 +} +fn default_pcie_bw_util() -> f64 { + 0.8 +} +fn default_rdma_bw_util() -> f64 { + 0.7 +} +fn default_tp_bw_util() -> f64 { + 0.8 +} +fn default_tp_overlap_ratio() -> f64 { + 0.5 +} +fn default_tp_collective_count_per_layer() -> f64 { + 2.0 +} +fn default_kv_overlap_ratio() -> f64 { + 0.0 +} +fn default_scheduler_base_overhead_us() -> f64 { + 50.0 +} +fn default_scheduler_per_candidate_us() -> f64 { + 5.0 +} +fn default_cache_probe_us_per_tier() -> f64 { + 3.0 +} +fn default_batch_pack_overhead_us() -> f64 { + 10.0 +} +fn default_dram_access_latency_us() -> f64 { + 2.0 +} +fn default_remote_metadata_us() -> f64 { + 8.0 +} +fn default_layout_transform_fixed_us() -> f64 { + 4.0 +} +fn default_misc_layer_overhead_us() -> f64 { + 0.5 +} +fn default_chunk_launch_overhead_us() -> f64 { + 8.0 +} +fn default_final_sync_us() -> f64 { + 5.0 +} +fn default_first_token_ready_us() -> f64 { + 10.0 +} + // --------------------------------------------------------------------------- // Cluster // --------------------------------------------------------------------------- @@ -265,7 +425,11 @@ pub enum RouterMode { Precise, MinPd, CacheLoad, + CacheAffinity, + CacheAffinityWeakRend, + CacheAffinityStrongOnly, CacheScore, + CacheScoreTtl, EstimatedTtft, PrefixAffinity, } @@ -281,7 +445,11 @@ impl RouterMode { "precise" | "precise_aware" => Ok(Self::Precise), "min_pd" | "minpd" | "pd" => Ok(Self::MinPd), "cache_load" | "cl" => Ok(Self::CacheLoad), + "cache_affinity" | "caff" | "ca" => Ok(Self::CacheAffinity), + "cache_affinity_weak_rend" | "caff_weak" => Ok(Self::CacheAffinityWeakRend), + "cache_affinity_strong_only" | "caff_strong" => Ok(Self::CacheAffinityStrongOnly), "cache_score" | "cs" => Ok(Self::CacheScore), + "cache_score_ttl" | "csttl" | "cs_ttl" => Ok(Self::CacheScoreTtl), "estimated_ttft" | "ettft" | "optimal" => Ok(Self::EstimatedTtft), "prefix_affinity" | "affinity" | "pa" => Ok(Self::PrefixAffinity), other => Err(anyhow::anyhow!("unknown router mode: {other}")), @@ -298,7 +466,11 @@ impl RouterMode { Self::Precise => "precise", Self::MinPd => "min_pd", Self::CacheLoad => "cache_load", + Self::CacheAffinity => "cache_affinity", + Self::CacheAffinityWeakRend => "cache_affinity_weak_rend", + Self::CacheAffinityStrongOnly => "cache_affinity_strong_only", Self::CacheScore => "cache_score", + Self::CacheScoreTtl => "cache_score_ttl", Self::EstimatedTtft => "estimated_ttft", Self::PrefixAffinity => "prefix_affinity", } @@ -352,6 +524,8 @@ impl Config { struct RawConfig { model: RawModelConfig, hardware: RawHardwareConfig, + #[serde(default)] + calibration: CalibrationConfig, cluster: ClusterConfig, sim: SimConfig, } @@ -419,6 +593,8 @@ struct RawHardwareConfig { #[serde(default)] dram_bytes: Option, #[serde(default)] + host_dram_bw: Option, + #[serde(default)] pcie_bw: Option, #[serde(default)] pcie_latency_us: Option, @@ -427,6 +603,12 @@ struct RawHardwareConfig { #[serde(default)] rdma_latency_us: Option, #[serde(default)] + intra_node_tp_bw: 
Option, + #[serde(default)] + intra_node_tp_latency_us: Option, + #[serde(default)] + tp_degree: Option, + #[serde(default)] max_batch_slots: Option, #[serde(default)] prefill_chunk_tokens: Option, @@ -457,6 +639,7 @@ impl RawConfig { Ok(Config { model, hardware, + calibration: self.calibration, cluster: self.cluster, sim: self.sim, }) @@ -565,10 +748,14 @@ impl RawHardwareConfig { gpu_mem_bw: 0.0, hbm_bytes: 0.0, dram_bytes: 0.0, + host_dram_bw: default_host_dram_bw(), pcie_bw: 0.0, pcie_latency_us: 5.0, rdma_bw: 0.0, rdma_latency_us: 8.0, + intra_node_tp_bw: default_intra_node_tp_bw(), + intra_node_tp_latency_us: default_intra_node_tp_latency_us(), + tp_degree: default_tp_degree(), max_batch_slots: default_max_batch_slots(), prefill_chunk_tokens: default_prefill_chunk_tokens(), } @@ -587,6 +774,9 @@ impl RawHardwareConfig { if let Some(v) = self.dram_bytes { hw.dram_bytes = v; } + if let Some(v) = self.host_dram_bw { + hw.host_dram_bw = v; + } if let Some(v) = self.pcie_bw { hw.pcie_bw = v; } @@ -599,6 +789,15 @@ impl RawHardwareConfig { if let Some(v) = self.rdma_latency_us { hw.rdma_latency_us = v; } + if let Some(v) = self.intra_node_tp_bw { + hw.intra_node_tp_bw = v; + } + if let Some(v) = self.intra_node_tp_latency_us { + hw.intra_node_tp_latency_us = v; + } + if let Some(v) = self.tp_degree { + hw.tp_degree = v; + } if let Some(v) = self.max_batch_slots { hw.max_batch_slots = v; } @@ -614,3 +813,104 @@ impl RawHardwareConfig { Ok(hw) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicU64, Ordering}; + + fn write_temp_config(yaml: &str) -> std::path::PathBuf { + static NEXT_ID: AtomicU64 = AtomicU64::new(0); + let path = std::env::temp_dir().join(format!( + "kvcache_sim_config_test_{}_{}.yaml", + std::process::id(), + NEXT_ID.fetch_add(1, Ordering::Relaxed) + )); + std::fs::write(&path, yaml).unwrap(); + path + } + + #[test] + fn config_defaults_ttft_calibration_fields() { + let path = write_temp_config( + r#" +model: + name: test + num_layers: 4 + num_kv_heads: 2 + head_dim: 64 + dtype_bytes: 2 + block_size_tokens: 16 + flops_per_token_prefill: 1.0e9 + attn_quadratic_coeff: 64.0 +hardware: + gpu_flops: 1.0e14 + gpu_mem_bw: 1.0e12 + hbm_bytes: 1.0e9 +cluster: + num_instances: 2 + meta_store: + ttl_seconds: 10.0 + router: + mode: estimated_ttft +sim: + trace_path: trace.jsonl + output_dir: runs/test +"#, + ); + + let cfg = Config::from_yaml_path(&path).unwrap(); + assert!(cfg.hardware.host_dram_bw > 0.0); + assert!(cfg.hardware.intra_node_tp_bw > 0.0); + assert!(cfg.calibration.matmul_util > 0.0); + assert!(cfg.calibration.attention_util > 0.0); + assert!(cfg.calibration.pcie_bw_util > 0.0); + } + + #[test] + fn config_overrides_ttft_calibration_fields() { + let path = write_temp_config( + r#" +model: + name: test + num_layers: 4 + num_kv_heads: 2 + head_dim: 64 + dtype_bytes: 2 + block_size_tokens: 16 + flops_per_token_prefill: 1.0e9 + attn_quadratic_coeff: 64.0 +hardware: + gpu_flops: 1.0e14 + gpu_mem_bw: 1.0e12 + hbm_bytes: 1.0e9 + host_dram_bw: 9.9e11 + intra_node_tp_bw: 7.7e11 + intra_node_tp_latency_us: 2.5 +calibration: + matmul_util: 0.31 + attention_util: 0.27 + rdma_bw_util: 0.63 + scheduler_base_overhead_us: 123.0 +cluster: + num_instances: 2 + meta_store: + ttl_seconds: 10.0 + router: + mode: estimated_ttft +sim: + trace_path: trace.jsonl + output_dir: runs/test +"#, + ); + + let cfg = Config::from_yaml_path(&path).unwrap(); + assert_eq!(cfg.hardware.host_dram_bw, 9.9e11); + assert_eq!(cfg.hardware.intra_node_tp_bw, 7.7e11); + 
assert_eq!(cfg.hardware.intra_node_tp_latency_us, 2.5); + assert!((cfg.calibration.matmul_util - 0.31).abs() < 1e-12); + assert!((cfg.calibration.attention_util - 0.27).abs() < 1e-12); + assert!((cfg.calibration.rdma_bw_util - 0.63).abs() < 1e-12); + assert!((cfg.calibration.scheduler_base_overhead_us - 123.0).abs() < 1e-12); + } +} diff --git a/src/hardware_presets.rs b/src/hardware_presets.rs index 2303561..783c99d 100644 --- a/src/hardware_presets.rs +++ b/src/hardware_presets.rs @@ -188,10 +188,14 @@ fn make_config(n: u32, base: &GpuBase) -> HardwareConfig { gpu_mem_bw: base.mem_bw * f, hbm_bytes: base.hbm * f, dram_bytes: dram, + host_dram_bw: if n >= 8 { 9.0e11 } else { 5.0e11 }, pcie_bw: pcie_per_gpu * f, pcie_latency_us: pcie_lat, rdma_bw: rdma_base * rdma_scale, rdma_latency_us: rdma_lat, + intra_node_tp_bw: if base.pcie_gen >= 6 { 1.8e12 * f } else { 9.0e11 * f }, + intra_node_tp_latency_us: if base.pcie_gen >= 6 { 1.0 } else { 2.0 }, + tp_degree: n, max_batch_slots: 256, prefill_chunk_tokens: if n >= 4 { 4096 } else { 2048 }, } diff --git a/src/instance/compute.rs b/src/instance/compute.rs index ac7ee6c..fd03d15 100644 --- a/src/instance/compute.rs +++ b/src/instance/compute.rs @@ -22,7 +22,7 @@ //! `effective_ctx(N)` equals `N` for dense attention (→ O(N²) total) but //! is sub-linear for DSA / sliding-window. -use crate::config::{AttentionConfig, HardwareConfig, ModelConfig}; +use crate::config::{AttentionConfig, CalibrationConfig, HardwareConfig, ModelConfig}; /// Resolved attention pattern used at runtime. #[derive(Debug, Clone)] @@ -55,24 +55,46 @@ pub struct ComputeModel { pub attn_pattern: AttentionPattern, /// Weight bytes read from HBM per layer (for memory-bound check). pub weight_bytes_per_layer: f64, + /// Approximate bytes moved by each TP collective, per token per layer. + pub tp_bytes_per_token: f64, + /// Number of TP collectives per layer on the critical path. + pub tp_collective_count_per_layer: f64, /// Peak GPU FLOPs (aggregate across TP group). pub gpu_flops: f64, /// Peak GPU memory bandwidth (aggregate across TP group). pub gpu_mem_bw: f64, + /// Peak node-local TP bandwidth. + pub intra_node_tp_bw: f64, + /// Fixed latency per TP collective. + pub intra_node_tp_latency_s: f64, + /// Effective utilization for GEMM-like linear kernels. + pub matmul_util: f64, + /// Effective utilization for attention kernels. + pub attention_util: f64, + /// Effective utilization for HBM streaming. + pub hbm_bw_util: f64, + /// Effective utilization for TP bandwidth. + pub tp_bw_util: f64, + /// Fraction of TP communication that can overlap with compute. + pub tp_overlap_ratio: f64, + /// Fixed per-layer non-FLOP overhead. + pub misc_layer_overhead_s: f64, + /// Fixed launch overhead per prefill chunk. 
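+    /// Charged once per `prefill_time` call (one chunk), on top of the
+    /// compute/memory roofline and any non-overlapped TP tail.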
+ pub chunk_launch_overhead_s: f64, } impl ComputeModel { - pub fn new(model: &ModelConfig, hw: &HardwareConfig) -> Self { + pub fn new(model: &ModelConfig, hw: &HardwareConfig, calib: &CalibrationConfig) -> Self { if model.is_arch_mode() { - Self::from_arch(model, hw) + Self::from_arch(model, hw, calib) } else { - Self::from_manual(model, hw) + Self::from_manual(model, hw, calib) } } // ----- Architecture-derived construction -------------------------------- - fn from_arch(model: &ModelConfig, hw: &HardwareConfig) -> Self { + fn from_arch(model: &ModelConfig, hw: &HardwareConfig, calib: &CalibrationConfig) -> Self { let h = model.hidden_size.unwrap() as f64; let n_heads = model.num_attention_heads.unwrap_or(model.num_kv_heads) as f64; let n_kv = model.num_kv_heads as f64; @@ -115,6 +137,11 @@ impl ComputeModel { }; let linear_flops = attn_linear + mlp; + let tp_bytes_per_token = if hw.tp_degree > 1 { + h * model.dtype_bytes as f64 + } else { + 0.0 + }; // --- Attention quadratic coefficient --- // attn_flops_per_layer(N) = attn_coeff * N * effective_ctx(N) @@ -183,14 +210,29 @@ impl ComputeModel { attn_coeff, attn_pattern, weight_bytes_per_layer: weight_bytes, + tp_bytes_per_token, + tp_collective_count_per_layer: if hw.tp_degree > 1 { + calib.tp_collective_count_per_layer + } else { + 0.0 + }, gpu_flops: hw.gpu_flops, gpu_mem_bw: hw.gpu_mem_bw, + intra_node_tp_bw: hw.intra_node_tp_bw, + intra_node_tp_latency_s: hw.intra_node_tp_latency_us * 1e-6, + matmul_util: calib.matmul_util, + attention_util: calib.attention_util, + hbm_bw_util: calib.hbm_bw_util, + tp_bw_util: calib.tp_bw_util, + tp_overlap_ratio: calib.tp_overlap_ratio, + misc_layer_overhead_s: calib.misc_layer_overhead_us * 1e-6, + chunk_launch_overhead_s: calib.chunk_launch_overhead_us * 1e-6, } } // ----- Legacy manual construction --------------------------------------- - fn from_manual(model: &ModelConfig, hw: &HardwareConfig) -> Self { + fn from_manual(model: &ModelConfig, hw: &HardwareConfig, calib: &CalibrationConfig) -> Self { Self { num_layers: model.num_layers as f64, first_dense_layers: model.num_layers as f64, @@ -198,8 +240,19 @@ impl ComputeModel { attn_coeff: model.attn_quadratic_coeff.unwrap_or(0.0), attn_pattern: AttentionPattern::Dense, weight_bytes_per_layer: 0.0, + tp_bytes_per_token: 0.0, + tp_collective_count_per_layer: 0.0, gpu_flops: hw.gpu_flops, gpu_mem_bw: hw.gpu_mem_bw, + intra_node_tp_bw: hw.intra_node_tp_bw, + intra_node_tp_latency_s: hw.intra_node_tp_latency_us * 1e-6, + matmul_util: calib.matmul_util, + attention_util: calib.attention_util, + hbm_bw_util: calib.hbm_bw_util, + tp_bw_util: calib.tp_bw_util, + tp_overlap_ratio: calib.tp_overlap_ratio, + misc_layer_overhead_s: calib.misc_layer_overhead_us * 1e-6, + chunk_launch_overhead_s: calib.chunk_launch_overhead_us * 1e-6, } } @@ -232,23 +285,38 @@ impl ComputeModel { return 0.0; } let n = n as f64; - let linear = n * self.linear_flops_per_token; + let linear_flops = n * self.linear_flops_per_token; // Compute FLOPs across all layers (dense + sparse may differ). 
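+        // Rough shape of the estimate assembled below, for the all-dense case
+        // and ignoring the memory-bound, TP-communication and chunk-launch terms:
+        //   T(n) ≈ L·n·c_lin/(F·u_matmul) + L·a·n²/(F·u_attn) + L·t_misc
+        // with L = num_layers, c_lin = linear_flops_per_token, a = attn_coeff,
+        // F = gpu_flops and u_* the calibration utilizations.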
let dense_layers = self.first_dense_layers; let sparse_layers = self.num_layers - dense_layers; - let dense_flops = - dense_layers * (linear + self.attn_coeff * n * self.effective_ctx(n, true)); - let sparse_flops = - sparse_layers * (linear + self.attn_coeff * n * self.effective_ctx(n, false)); - let total_flops = dense_flops + sparse_flops; + let linear_total_flops = self.num_layers * linear_flops; + let dense_attn_flops = dense_layers * (self.attn_coeff * n * self.effective_ctx(n, true)); + let sparse_attn_flops = + sparse_layers * (self.attn_coeff * n * self.effective_ctx(n, false)); + let attn_total_flops = dense_attn_flops + sparse_attn_flops; - let compute_time = total_flops / self.gpu_flops; + let linear_time = linear_total_flops / (self.gpu_flops * self.matmul_util.max(1e-6)); + let attn_time = attn_total_flops / (self.gpu_flops * self.attention_util.max(1e-6)); + let compute_time = linear_time + attn_time + self.num_layers * self.misc_layer_overhead_s; // Weight stream: all layers' active weights read once from HBM. - let mem_time = self.weight_bytes_per_layer * self.num_layers / self.gpu_mem_bw; + let mem_time = + self.weight_bytes_per_layer * self.num_layers / (self.gpu_mem_bw * self.hbm_bw_util.max(1e-6)); + let tp_comm_time = if self.tp_collective_count_per_layer > 0.0 + && self.tp_bytes_per_token > 0.0 + && self.intra_node_tp_bw > 0.0 + { + self.num_layers + * (self.tp_collective_count_per_layer * self.intra_node_tp_latency_s + + self.tp_collective_count_per_layer * self.tp_bytes_per_token * n + / (self.intra_node_tp_bw * self.tp_bw_util.max(1e-6))) + } else { + 0.0 + }; + let tp_tail = (tp_comm_time - self.tp_overlap_ratio * (linear_time + attn_time)).max(0.0); - compute_time.max(mem_time) + self.chunk_launch_overhead_s + compute_time.max(mem_time) + tp_tail } /// Print human-readable derived coefficients (for `validate` output). 
@@ -277,6 +345,7 @@ impl ComputeModel { #[cfg(test)] mod tests { use super::*; + use crate::config::CalibrationConfig; fn cm_legacy() -> ComputeModel { ComputeModel { @@ -286,8 +355,19 @@ mod tests { attn_coeff: 1024.0, attn_pattern: AttentionPattern::Dense, weight_bytes_per_layer: 0.0, + tp_bytes_per_token: 0.0, + tp_collective_count_per_layer: 0.0, gpu_flops: 9.89e14, gpu_mem_bw: 3.35e12, + intra_node_tp_bw: 9.0e11, + intra_node_tp_latency_s: 2.0e-6, + matmul_util: 1.0, + attention_util: 1.0, + hbm_bw_util: 1.0, + tp_bw_util: 1.0, + tp_overlap_ratio: 1.0, + misc_layer_overhead_s: 0.0, + chunk_launch_overhead_s: 0.0, } } @@ -327,8 +407,19 @@ mod tests { attn_coeff: 139264.0, attn_pattern: AttentionPattern::Dense, weight_bytes_per_layer: 0.0, + tp_bytes_per_token: 0.0, + tp_collective_count_per_layer: 0.0, gpu_flops: 1.8e16, gpu_mem_bw: 6.4e13, + intra_node_tp_bw: 9.0e11, + intra_node_tp_latency_s: 2.0e-6, + matmul_util: 1.0, + attention_util: 1.0, + hbm_bw_util: 1.0, + tp_bw_util: 1.0, + tp_overlap_ratio: 1.0, + misc_layer_overhead_s: 0.0, + chunk_launch_overhead_s: 0.0, }; let dsa = ComputeModel { attn_pattern: AttentionPattern::Dsa { @@ -360,8 +451,19 @@ mod tests { attn_coeff: 1.0, attn_pattern: AttentionPattern::Dense, weight_bytes_per_layer: 1.0e12, // 1 TB per layer + tp_bytes_per_token: 0.0, + tp_collective_count_per_layer: 0.0, gpu_flops: 1.0e15, gpu_mem_bw: 1.0e12, + intra_node_tp_bw: 9.0e11, + intra_node_tp_latency_s: 2.0e-6, + matmul_util: 1.0, + attention_util: 1.0, + hbm_bw_util: 1.0, + tp_bw_util: 1.0, + tp_overlap_ratio: 1.0, + misc_layer_overhead_s: 0.0, + chunk_launch_overhead_s: 0.0, }; let t1 = m.prefill_time(1); let t8 = m.prefill_time(8); @@ -393,18 +495,122 @@ mod tests { gpu_mem_bw: 1e12, hbm_bytes: 1e9, dram_bytes: 4e9, + host_dram_bw: 5.0e11, pcie_bw: 32e9, pcie_latency_us: 1.0, rdma_bw: 12e9, rdma_latency_us: 5.0, + intra_node_tp_bw: 9.0e11, + intra_node_tp_latency_us: 2.0, + tp_degree: 1, max_batch_slots: 32, prefill_chunk_tokens: 1024, }; - let cm = ComputeModel::new(&model, &hw); + let cm = ComputeModel::new(&model, &hw, &CalibrationConfig::default()); assert!(cm.linear_flops_per_token > 0.0); assert!(cm.attn_coeff > 0.0); assert!(cm.weight_bytes_per_layer > 0.0); let t = cm.prefill_time(1024); assert!(t > 0.0); } + + #[test] + fn lower_utilization_increases_prefill_time() { + let model = ModelConfig { + name: "test".into(), + num_layers: 8, + num_kv_heads: 4, + head_dim: 128, + dtype_bytes: 2, + block_size_tokens: 16, + hidden_size: Some(1024), + num_attention_heads: Some(8), + intermediate_size: Some(4096), + ..Default::default() + }; + let hw = HardwareConfig { + gpu_flops: 1e14, + gpu_fp8_flops: 0.0, + gpu_fp4_flops: 0.0, + gpu_mem_bw: 1e12, + hbm_bytes: 1e9, + dram_bytes: 4e9, + host_dram_bw: 5.0e11, + pcie_bw: 32e9, + pcie_latency_us: 1.0, + rdma_bw: 12e9, + rdma_latency_us: 5.0, + intra_node_tp_bw: 9.0e11, + intra_node_tp_latency_us: 2.0, + tp_degree: 1, + max_batch_slots: 32, + prefill_chunk_tokens: 1024, + }; + let fast = ComputeModel::new(&model, &hw, &CalibrationConfig::default()); + let slow = ComputeModel::new( + &model, + &hw, + &CalibrationConfig { + matmul_util: 0.2, + attention_util: 0.15, + ..CalibrationConfig::default() + }, + ); + + assert!(slow.prefill_time(4096) > fast.prefill_time(4096)); + } + + #[test] + fn tp_communication_adds_tail_when_overlap_is_limited() { + let model = ModelConfig { + name: "test".into(), + num_layers: 8, + num_kv_heads: 4, + head_dim: 128, + dtype_bytes: 2, + block_size_tokens: 16, + hidden_size: Some(2048), + 
num_attention_heads: Some(16), + intermediate_size: Some(8192), + ..Default::default() + }; + let hw = HardwareConfig { + gpu_flops: 1e14, + gpu_fp8_flops: 0.0, + gpu_fp4_flops: 0.0, + gpu_mem_bw: 1e12, + hbm_bytes: 1e9, + dram_bytes: 4e9, + host_dram_bw: 5.0e11, + pcie_bw: 32e9, + pcie_latency_us: 1.0, + rdma_bw: 12e9, + rdma_latency_us: 5.0, + intra_node_tp_bw: 1.0e10, + intra_node_tp_latency_us: 20.0, + tp_degree: 8, + max_batch_slots: 32, + prefill_chunk_tokens: 1024, + }; + let no_tp = ComputeModel::new( + &model, + &hw, + &CalibrationConfig { + tp_overlap_ratio: 1.0, + tp_bw_util: 1.0, + ..CalibrationConfig::default() + }, + ); + let tp_tail = ComputeModel::new( + &model, + &hw, + &CalibrationConfig { + tp_overlap_ratio: 0.0, + tp_bw_util: 0.2, + ..CalibrationConfig::default() + }, + ); + + assert!(tp_tail.prefill_time(2048) > no_tp.prefill_time(2048)); + } } diff --git a/src/instance/instance.rs b/src/instance/instance.rs index c694d3c..54acb94 100644 --- a/src/instance/instance.rs +++ b/src/instance/instance.rs @@ -19,7 +19,7 @@ use std::collections::VecDeque; -use crate::config::{HardwareConfig, ModelConfig}; +use crate::config::{CalibrationConfig, HardwareConfig, ModelConfig}; use crate::instance::compute::ComputeModel; use crate::instance::kv_cache::TwoTierCache; use crate::network::InstanceLinks; @@ -37,6 +37,8 @@ pub struct AdmittedRequest { /// KV blocks reserved on this instance's HBM for the lifetime of this /// request's prefill (= number of input blocks). pub reserved_blocks: u32, + /// Tail latency between prefill completion and first-token visibility. + pub completion_tail_s: f64, } #[derive(Debug)] @@ -68,15 +70,20 @@ pub struct Instance { } impl Instance { - pub fn new(id: InstanceId, model: &ModelConfig, hw: &HardwareConfig) -> Self { + pub fn new( + id: InstanceId, + model: &ModelConfig, + hw: &HardwareConfig, + calib: &CalibrationConfig, + ) -> Self { let block_bytes = model.kv_block_bytes() as f64; let hbm_blocks = (hw.hbm_bytes / block_bytes).max(1.0) as u32; let dram_blocks = (hw.dram_bytes / block_bytes).max(1.0) as u32; Self { id, cache: TwoTierCache::new(hbm_blocks as usize, dram_blocks as usize), - links: InstanceLinks::from_hw(hw), - compute: ComputeModel::new(model, hw), + links: InstanceLinks::from_hw(hw, calib), + compute: ComputeModel::new(model, hw, calib), block_size_tokens: model.block_size_tokens, hbm_block_budget: hbm_blocks, dram_block_budget: dram_blocks, @@ -137,16 +144,17 @@ impl Instance { if self.kv_blocks_used + front.reserved_blocks > self.hbm_block_budget { break; } - let r = self.pending.pop_front().unwrap(); - self.kv_blocks_used += r.reserved_blocks; - if r.prefill_tokens_remaining == 0 { - // Full cache hit: nothing to compute. TTFT == fetch time. - let ttft = now - r.arrival; - self.kv_blocks_used = self.kv_blocks_used.saturating_sub(r.reserved_blocks); - completed.push((r.req_id, ttft, now)); - } else { - self.prefilling.push_back(r); - } + let r = self.pending.pop_front().unwrap(); + self.kv_blocks_used += r.reserved_blocks; + if r.prefill_tokens_remaining == 0 { + // Full cache hit: nothing to compute. TTFT == fetch time. + let t_done = now + r.completion_tail_s; + let ttft = t_done - r.arrival; + self.kv_blocks_used = self.kv_blocks_used.saturating_sub(r.reserved_blocks); + completed.push((r.req_id, ttft, t_done)); + } else { + self.prefilling.push_back(r); + } } // 2. Run one chunked-prefill step on the head of `prefilling`. 
@@ -171,9 +179,10 @@ impl Instance { head.prefill_tokens_remaining -= chunk_tokens; if head.prefill_tokens_remaining == 0 { let done = self.prefilling.pop_front().unwrap(); - let ttft = t_end - done.arrival; + let t_done = t_end + done.completion_tail_s; + let ttft = t_done - done.arrival; self.kv_blocks_used = self.kv_blocks_used.saturating_sub(done.reserved_blocks); - completed.push((done.req_id, ttft, t_end)); + completed.push((done.req_id, ttft, t_done)); } StepResult { diff --git a/src/lib.rs b/src/lib.rs index 1188251..6245674 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,5 +10,6 @@ pub mod oracle; pub mod replay; pub mod router; pub mod sim; +pub mod ttft; pub mod trace; pub mod types; diff --git a/src/main.rs b/src/main.rs index 89a7df7..89f0b7c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -219,7 +219,7 @@ fn cmd_validate(path: &PathBuf, overrides: &ConfigOverrides) -> Result<()> { "legacy manual" } ); - let cm = ComputeModel::new(&cfg.model, &cfg.hardware); + let cm = ComputeModel::new(&cfg.model, &cfg.hardware, &cfg.calibration); eprintln!("compute: {}", cm.describe()); eprintln!( "kv_block_bytes = {} ({:.2} MB{})", diff --git a/src/network.rs b/src/network.rs index 0bc1984..d8c80b1 100644 --- a/src/network.rs +++ b/src/network.rs @@ -5,7 +5,7 @@ //! by `bytes / bw`. Latency is added on top of transfer time. This captures //! contention without simulating individual packets. -use crate::config::HardwareConfig; +use crate::config::{CalibrationConfig, HardwareConfig}; #[derive(Debug, Clone)] pub struct LinkModel { @@ -53,10 +53,10 @@ pub struct InstanceLinks { } impl InstanceLinks { - pub fn from_hw(hw: &HardwareConfig) -> Self { + pub fn from_hw(hw: &HardwareConfig, calib: &CalibrationConfig) -> Self { Self { - pcie: LinkModel::new(hw.pcie_bw, hw.pcie_latency_us * 1e-6), - rdma: LinkModel::new(hw.rdma_bw, hw.rdma_latency_us * 1e-6), + pcie: LinkModel::new(hw.pcie_bw * calib.pcie_bw_util, hw.pcie_latency_us * 1e-6), + rdma: LinkModel::new(hw.rdma_bw * calib.rdma_bw_util, hw.rdma_latency_us * 1e-6), } } } diff --git a/src/router/cache_affinity.rs b/src/router/cache_affinity.rs new file mode 100644 index 0000000..8ccc029 --- /dev/null +++ b/src/router/cache_affinity.rs @@ -0,0 +1,217 @@ +//! Cache-affinity routing tuned for coding-agent workloads. +//! +//! Motivation — the coding trace has three dominant patterns: +//! +//! 1. **Short system-prompt-only requests** (≤10 blocks): novel per-chat but +//! sharing a small set of system prompts across millions of invocations. +//! 2. **Long multi-turn chains**: parent→child prefixes share ~60+ blocks +//! and grow by ~6 blocks per turn. Sticking the chain to one instance +//! maximises L0 hits for every subsequent turn. +//! 3. **Completely novel one-shots**: no existing cache anywhere; should be +//! placed to maximise *future* reuse, not just minimise current load. +//! +//! `cache_score` minimises `α·queue_len + β·miss_blocks`. With the shipping +//! defaults (α=1, β=0.1) a single extra queue position is worth ten extra +//! miss blocks, so short novel requests — the bulk of traffic — reduce to +//! pure least-loaded routing and scatter the same system prompt across +//! dozens of instances. Each scattered copy burns HBM that could have held a +//! different hot prefix, depressing the cluster-wide L0 hit-rate. +//! +//! `cache_affinity` fixes this with two changes: +//! +//! * **Strong cache weight** — cost is `α·queue_len − γ·l0_hit`, with +//! γ ≫ α·input_blocks, so any real L0 hit beats load-balancing. A soft +//! 
+//!   bonus (`δ·meta_only_hit`) still rewards instances that have the prefix
+//!   in L1/DRAM even when L0 is empty.
+//!
+//! * **Deterministic rendezvous tiebreak** — among instances that tie on
+//!   `(cost, hit, queue)`, we rank by `rendezvous(fingerprint, instance_id)`
+//!   where `fingerprint` is an FNV hash of the first few block hashes. This
+//!   turns cold routing from "first-found" (which piles on instance 0 until
+//!   it fills, then spills sequentially) into a consistent hash that maps
+//!   each distinct prefix to the *same* small set of homes. Repeat traffic
+//!   for that prefix therefore concentrates on its home, building a strong
+//!   L0 working set.
+//!
+//! Overload protection: if the rendezvous-chosen home already has
+//! `queue_len > overload_threshold`, the load term dominates and the router
+//! naturally spills to the next-best instance.
+
+use crate::cluster::meta_store::MetaStore;
+use crate::instance::Instance;
+use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
+use crate::trace::RequestRecord;
+
+pub struct CacheAffinityRouter {
+    /// Router display / trace name.
+    name: &'static str,
+    /// Weight on queue length (per queued request).
+    load_alpha: f64,
+    /// Reward per L0-hit block (real, locally cached).
+    l0_gamma: f64,
+    /// Reward per block present via meta-store but not in L0 (L1 / remote).
+    meta_delta: f64,
+    /// Number of leading block hashes folded into the prefix fingerprint.
+    fingerprint_k: usize,
+    /// Whether to break ties by rendezvous hash (sticky consistent placement)
+    /// or by first-found order (matches cache_score behaviour).
+    use_rendezvous: bool,
+}
+
+impl CacheAffinityRouter {
+    pub fn new(load_alpha: f64) -> Self {
+        Self {
+            name: "cache_affinity",
+            load_alpha,
+            l0_gamma: 1.0,
+            meta_delta: 0.25,
+            fingerprint_k: 4,
+            use_rendezvous: true,
+        }
+    }
+
+    /// Ablation: cache_score-style weights (γ=0.1, δ=0) but keep rendezvous
+    /// tiebreak. Isolates the contribution of deterministic sticky placement.
+    pub fn weak_with_rendezvous(load_alpha: f64) -> Self {
+        Self {
+            name: "cache_affinity_weak_rend",
+            load_alpha,
+            l0_gamma: 0.1,
+            meta_delta: 0.0,
+            fingerprint_k: 4,
+            use_rendezvous: true,
+        }
+    }
+
+    /// Ablation: strong cache weights (γ=1.0, δ=0.25) but first-found tiebreak
+    /// instead of rendezvous. Isolates the contribution of reweighting alone.
+    pub fn strong_no_rendezvous(load_alpha: f64) -> Self {
+        Self {
+            name: "cache_affinity_strong_only",
+            load_alpha,
+            l0_gamma: 1.0,
+            meta_delta: 0.25,
+            fingerprint_k: 4,
+            use_rendezvous: false,
+        }
+    }
+
+    /// FNV-1a over the first `k` block hashes — identifies the prefix family
+    /// (system-prompt + early agent context) that drives cache reuse.
+    fn fingerprint(hash_ids: &[u64], k: usize) -> u64 {
+        let n = hash_ids.len().min(k).max(1);
+        let take = hash_ids.len().min(n);
+        let mut fp: u64 = 0xcbf29ce484222325;
+        for &h in &hash_ids[..take] {
+            fp ^= h;
+            fp = fp.wrapping_mul(0x100000001b3);
+        }
+        if take == 0 {
+            // Empty request: still want a deterministic fingerprint (0).
+            fp ^= 0;
+        }
+        fp
+    }
+
+    /// Splitmix64-style rendezvous score for (fingerprint, instance_id).
+    /// Uniform over u64; higher = preferred home.
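+    ///
+    /// Among candidates that tie on `(cost, hit, queue)`, the winner is the
+    /// one with the highest score here — conceptually (an illustrative
+    /// sketch, not the literal selection loop in `route`):
+    ///
+    /// ```text
+    /// tied.iter().max_by_key(|inst| Self::rendezvous(fp, inst.id))
+    /// ```
+    ///
+    /// As with any highest-random-weight scheme, adding or removing an
+    /// instance remaps only roughly 1/n of prefix fingerprints to a new home.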
+    fn rendezvous(fp: u64, instance_id: u32) -> u64 {
+        let mut h = fp ^ (instance_id as u64).wrapping_mul(0x9e3779b97f4a7c15);
+        h = h.wrapping_add(0x9e3779b97f4a7c15);
+        h = (h ^ (h >> 30)).wrapping_mul(0xbf58476d1ce4e5b9);
+        h = (h ^ (h >> 27)).wrapping_mul(0x94d049bb133111eb);
+        h ^ (h >> 31)
+    }
+}
+
+impl Router for CacheAffinityRouter {
+    fn name(&self) -> &'static str {
+        self.name
+    }
+
+    fn route(
+        &mut self,
+        req: &RequestRecord,
+        instances: &[Instance],
+        meta: &MetaStore,
+        now: f64,
+    ) -> RouteDecision {
+        let n = instances.len();
+        let l0 = local_l0_scores(req, instances);
+        // Meta-store predicted prefix — includes L1/remote-reachable blocks.
+        let meta_scores = meta.score_prefix(&req.hash_ids, now, n);
+        let fp = Self::fingerprint(&req.hash_ids, self.fingerprint_k);
+
+        let mut candidates = Vec::with_capacity(n);
+        let mut best_idx: usize = 0;
+        let mut best_cost = f64::INFINITY;
+        let mut best_hit = 0u32;
+        let mut best_queue = u32::MAX;
+        let mut best_rend: u64 = 0;
+
+        for (i, inst) in instances.iter().enumerate() {
+            let hit = l0[i];
+            // meta_only = extra blocks reachable by RDMA/L1 beyond L0 hit.
+            let meta_only = meta_scores[i].saturating_sub(hit);
+            let q = inst.queue_len();
+
+            // Cost to minimise — lower is better.
+            //   load term:  α · queue_len
+            //   cache term: − γ · l0_hit − δ · meta_only
+            // Short novel prefixes yield hit=0 on every instance, so cost
+            // reduces to α·q and the rendezvous tiebreak picks the home.
+            let cost = self.load_alpha * q as f64
+                - self.l0_gamma * hit as f64
+                - self.meta_delta * meta_only as f64;
+            let rend = Self::rendezvous(fp, inst.id);
+
+            candidates.push(CandidateInfo {
+                instance: inst.id,
+                predicted_prefix: hit,
+                load_blocks: inst.kv_blocks_used,
+                queue_len: q,
+            });
+
+            // Tiebreak chain (descending preference):
+            //   1. lowest cost
+            //   2. highest hit (break cost ties toward real L0 work)
+            //   3. lowest queue
+            //   4. highest rendezvous (deterministic sticky home), optional
+            let better = if cost < best_cost {
+                true
+            } else if cost > best_cost {
+                false
+            } else if hit > best_hit {
+                true
+            } else if hit < best_hit {
+                false
+            } else if q < best_queue {
+                true
+            } else if q > best_queue {
+                false
+            } else if self.use_rendezvous {
+                rend > best_rend
+            } else {
+                // First-found wins on full tie (matches cache_score behaviour).
+                false
+            };
+
+            if better {
+                best_cost = cost;
+                best_hit = hit;
+                best_queue = q;
+                best_rend = rend;
+                best_idx = i;
+            }
+        }
+
+        RouteDecision {
+            req_id: req.req_id,
+            mode: "cache_affinity",
+            chosen: instances[best_idx].id,
+            probe_overhead_s: 0.0,
+            candidates,
+            reason: "argmin(α·q − γ·l0_hit − δ·meta_only) + rendezvous tiebreak",
+        }
+    }
+}
diff --git a/src/router/cache_score_ttl.rs b/src/router/cache_score_ttl.rs
new file mode 100644
index 0000000..c67ad1d
--- /dev/null
+++ b/src/router/cache_score_ttl.rs
@@ -0,0 +1,86 @@
+//! Cache-score routing using TTL meta-store prefix predictions.
+//!
+//! This keeps the same scoring rule as `cache_score`:
+//!
+//! ```text
+//! exponent_i = alpha * queue_len_i + beta * miss_i
+//! ```
+//!
+//! The only difference is that `miss_i` is computed from the global TTL
+//! meta-store prefix score instead of the real local-L0 prefix.
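+//!
+//! Worked example (illustrative numbers; α=1, β=0.1 are the cache_score
+//! defaults quoted in the `cache_affinity` module doc). For a 50-block
+//! request:
+//!
+//! ```text
+//! instance A: queue=3, predicted prefix=40 → miss=10 → 3 + 0.1·10 = 4.0  (chosen)
+//! instance B: queue=1, predicted prefix= 0 → miss=50 → 1 + 0.1·50 = 6.0
+//! ```
+//!
+//! A wins despite its longer queue because the predicted prefix shrinks its
+//! miss term.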
+ +use crate::cluster::meta_store::MetaStore; +use crate::instance::Instance; +use crate::router::{CandidateInfo, RouteDecision, Router}; +use crate::trace::RequestRecord; + +pub struct CacheScoreTtlRouter { + alpha: f64, + beta: f64, +} + +impl CacheScoreTtlRouter { + pub fn new(alpha: f64, beta: f64) -> Self { + Self { alpha, beta } + } +} + +impl Router for CacheScoreTtlRouter { + fn name(&self) -> &'static str { + "cache_score_ttl" + } + + fn route( + &mut self, + req: &RequestRecord, + instances: &[Instance], + meta: &MetaStore, + now: f64, + ) -> RouteDecision { + let n = instances.len(); + let scores = meta.score_prefix(&req.hash_ids, now, n); + let input_blocks = req.hash_ids.len() as f64; + + let mut best_idx: usize = 0; + let mut best_exp = f64::INFINITY; + let mut best_queue = u32::MAX; + let mut best_prefix = 0u32; + let mut candidates = Vec::with_capacity(n); + + for (i, inst) in instances.iter().enumerate() { + let prefix = scores[i] as f64; + let miss = (input_blocks - prefix).max(0.0); + let q = inst.queue_len() as f64; + let exponent = self.alpha * q + self.beta * miss; + + candidates.push(CandidateInfo { + instance: inst.id, + predicted_prefix: scores[i], + load_blocks: inst.kv_blocks_used, + queue_len: inst.queue_len(), + }); + + let better = exponent < best_exp + || (exponent == best_exp && inst.queue_len() < best_queue) + || (exponent == best_exp + && inst.queue_len() == best_queue + && scores[i] > best_prefix); + + if better { + best_exp = exponent; + best_idx = i; + best_queue = inst.queue_len(); + best_prefix = scores[i]; + } + } + + RouteDecision { + req_id: req.req_id, + mode: "cache_score_ttl", + chosen: instances[best_idx].id, + probe_overhead_s: 0.0, + candidates, + reason: "argmin 2^(alpha*load + beta*meta_store_miss)", + } + } +} diff --git a/src/router/estimated_ttft.rs b/src/router/estimated_ttft.rs index 250ab5a..0f8b5bf 100644 --- a/src/router/estimated_ttft.rs +++ b/src/router/estimated_ttft.rs @@ -1,30 +1,30 @@ -//! First-principles TTFT-estimate routing using local L0 hits only. +//! First-principles TTFT-estimate routing with calibrated compute and +//! tier-aware KV prepare costs. //! //! Estimates the actual time-to-first-token for each candidate instance: //! -//! `TTFT(r,i) = drain(i) + prefill(local_l0_miss_i)` -//! -//! - **drain** — exact queue drain time: sum of per-request `prefill_time()` -//! using the architecture-aware compute model (quadratic / DSA). -//! -//! - **prefill** — compute for tokens whose blocks are absent from the -//! instance's current L0 cache. -//! -//! L1 / remote reuse can still reduce execution-time misses later in the -//! cluster fetch chain, but they are not counted as `kvcache hit` when -//! comparing routing candidates. +//! 
`TTFT(r,i) = drain(i) + scheduler + kv_prepare(r,i) + prefill(miss_i) + first_token_tail` use crate::cluster::meta_store::MetaStore; use crate::config::Config; use crate::instance::Instance; -use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router}; +use crate::router::{CandidateInfo, RouteDecision, Router}; use crate::trace::RequestRecord; +use crate::ttft::{classify_prefix_tiers, TtftModel}; -pub struct EstimatedTtftRouter; +pub struct EstimatedTtftRouter { + ttft_model: TtftModel, +} impl EstimatedTtftRouter { - pub fn new(_config: &Config) -> Self { - Self + pub fn new(config: &Config) -> Self { + Self { + ttft_model: TtftModel::new( + &config.hardware, + &config.calibration, + config.model.kv_block_bytes(), + ), + } } } @@ -37,50 +37,51 @@ impl Router for EstimatedTtftRouter { &mut self, req: &RequestRecord, instances: &[Instance], - _meta: &MetaStore, - _now: f64, + meta: &MetaStore, + now: f64, ) -> RouteDecision { + let scheduler = self.ttft_model.scheduler_overhead_s(instances.len(), 3); let n = instances.len(); - let scores = local_l0_scores(req, instances); - let input_blocks = req.hash_ids.len() as u32; let mut best: u32 = 0; let mut best_cost = f64::INFINITY; let mut best_queue = u32::MAX; - let mut best_local = 0u32; + let mut best_reuse = 0u32; let mut candidates = Vec::with_capacity(n); for inst in instances { - let i = inst.id as usize; - let local_prefix = scores[i]; + let residency = classify_prefix_tiers(&req.hash_ids, inst, meta, now); // 1. Exact queue drain time (architecture-aware, per-request sum). let drain = inst.estimated_drain_time(); - // 2. Prefill compute for blocks absent from local L0. - let miss_tokens = input_blocks - .saturating_sub(local_prefix) - .saturating_mul(inst.block_size_tokens); - let cost = drain + inst.compute.prefill_time(miss_tokens); + let miss_tokens = residency.miss_blocks.saturating_mul(inst.block_size_tokens); + let kv_prepare = self.ttft_model.kv_prepare_time_s(residency); + let first_token_tail = self.ttft_model.first_token_tail_s(); + let cost = + drain + scheduler + kv_prepare + inst.compute.prefill_time(miss_tokens) + first_token_tail; candidates.push(CandidateInfo { instance: inst.id, - predicted_prefix: local_prefix, + predicted_prefix: residency.l0_hit_blocks + + residency.l1_hit_blocks + + residency.remote_hit_blocks, load_blocks: inst.kv_blocks_used, queue_len: inst.queue_len(), }); // Minimise (cost, queue_len, -local_prefix). let ql = inst.queue_len(); + let reusable = residency.l0_hit_blocks + residency.l1_hit_blocks + residency.remote_hit_blocks; let better = cost < best_cost || (cost == best_cost && ql < best_queue) - || (cost == best_cost && ql == best_queue && local_prefix > best_local); + || (cost == best_cost && ql == best_queue && reusable > best_reuse); if better { best_cost = cost; best = inst.id; best_queue = ql; - best_local = local_prefix; + best_reuse = reusable; } } @@ -90,7 +91,7 @@ impl Router for EstimatedTtftRouter { chosen: best, probe_overhead_s: 0.0, candidates, - reason: "argmin(drain_time + local-L0-miss prefill_time)", + reason: "argmin(drain + scheduler + kv_prepare + prefill + first_token_tail)", } } } diff --git a/src/router/mod.rs b/src/router/mod.rs index 6cafc8d..9645471 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -1,7 +1,9 @@ //! Cluster-level routing strategies. 
+pub mod cache_affinity; pub mod cache_load; pub mod cache_score; +pub mod cache_score_ttl; pub mod estimated_ttft; pub mod least_loaded; pub mod least_tokens; @@ -77,10 +79,22 @@ pub fn build(full: &Config, seed: u64) -> Box { MinPd => Box::new(min_pd::MinPdRouter::new()) as Box, LeastTokens => Box::new(least_tokens::LeastTokensRouter::new()) as Box, CacheLoad => Box::new(cache_load::CacheLoadRouter::new()) as Box, + CacheAffinity => Box::new(cache_affinity::CacheAffinityRouter::new(cfg.load_alpha)) + as Box, + CacheAffinityWeakRend => Box::new( + cache_affinity::CacheAffinityRouter::weak_with_rendezvous(cfg.load_alpha), + ) as Box, + CacheAffinityStrongOnly => Box::new( + cache_affinity::CacheAffinityRouter::strong_no_rendezvous(cfg.load_alpha), + ) as Box, CacheScore => Box::new(cache_score::CacheScoreRouter::new( cfg.score_alpha, cfg.score_beta, )) as Box, + CacheScoreTtl => Box::new(cache_score_ttl::CacheScoreTtlRouter::new( + cfg.score_alpha, + cfg.score_beta, + )) as Box, EstimatedTtft => { Box::new(estimated_ttft::EstimatedTtftRouter::new(full)) as Box } @@ -94,12 +108,13 @@ pub fn build(full: &Config, seed: u64) -> Box { mod tests { use super::*; use crate::config::{ - ClusterConfig, HardwareConfig, MetaStoreConfig, ModelConfig, RouterConfig, RouterMode, - SimConfig, + CalibrationConfig, ClusterConfig, HardwareConfig, MetaStoreConfig, ModelConfig, + RouterConfig, RouterMode, SimConfig, }; use crate::instance::instance::AdmittedRequest; use crate::router::cache_load::CacheLoadRouter; use crate::router::cache_score::CacheScoreRouter; + use crate::router::cache_score_ttl::CacheScoreTtlRouter; use crate::router::estimated_ttft::EstimatedTtftRouter; use crate::router::min_pd::MinPdRouter; use crate::router::precise_aware::PreciseRouter; @@ -129,10 +144,14 @@ mod tests { gpu_mem_bw: 1.0e12, hbm_bytes: 1.0e9, dram_bytes: 4.0e9, + host_dram_bw: 5.0e11, pcie_bw: 32.0e9, pcie_latency_us: 1.0, rdma_bw: 12.0e9, rdma_latency_us: 5.0, + intra_node_tp_bw: 9.0e11, + intra_node_tp_latency_us: 2.0, + tp_degree: 1, max_batch_slots: 32, prefill_chunk_tokens: 1024, } @@ -142,6 +161,7 @@ mod tests { Config { model: test_model(), hardware: test_hardware(), + calibration: CalibrationConfig::default(), cluster: ClusterConfig { num_instances: 2, meta_store: MetaStoreConfig { @@ -171,8 +191,9 @@ mod tests { fn make_instances(n: usize) -> Vec { let model = test_model(); let hw = test_hardware(); + let calib = CalibrationConfig::default(); (0..n) - .map(|id| Instance::new(id as u32, &model, &hw)) + .map(|id| Instance::new(id as u32, &model, &hw, &calib)) .collect() } @@ -211,6 +232,7 @@ mod tests { ready_at: 0.0, prefill_tokens_remaining: tokens, reserved_blocks: 0, + completion_tail_s: 0.0, }); } } @@ -232,7 +254,7 @@ mod tests { } #[test] - fn cache_aware_routers_compare_real_l0_not_meta_store_scores() { + fn ttl_aware_uses_meta_store_scores() { let req = make_request(&[20, 21, 22]); let mut instances = make_instances(2); let mut meta = MetaStore::new(1000.0); @@ -243,7 +265,19 @@ mod tests { insert_l0(&mut instances[1], &[20, 21]); let mut ttl = TtlAwareRouter::new(0.0); - assert_eq!(ttl.route(&req, &instances, &meta, 0.0).chosen, 1); + assert_eq!(ttl.route(&req, &instances, &meta, 0.0).chosen, 0); + } + + #[test] + fn cache_aware_routers_compare_real_l0_not_meta_store_scores() { + let req = make_request(&[20, 21, 22]); + let mut instances = make_instances(2); + let mut meta = MetaStore::new(1000.0); + + // Instance 0 only looks hot in the meta store; its real L0 prefix is zero. 
+ publish_meta(&mut meta, 0, &[20, 21, 22], 0.0); + // Instance 1 really holds the first two blocks in HBM. + insert_l0(&mut instances[1], &[20, 21]); let mut cache_load = CacheLoadRouter::new(); assert_eq!(cache_load.route(&req, &instances, &meta, 0.0).chosen, 1); @@ -259,6 +293,24 @@ mod tests { assert_eq!(est.route(&req, &instances, &meta, 0.0).chosen, 1); } + #[test] + fn cache_score_ttl_uses_meta_store_prefix() { + let req = make_request(&[50, 51, 52]); + let mut instances = make_instances(2); + let mut meta = MetaStore::new(1000.0); + + // Instance 0 only looks hot in the meta store. + publish_meta(&mut meta, 0, &[50, 51, 52], 0.0); + // Instance 1 has the true local L0 prefix. + insert_l0(&mut instances[1], &[50, 51]); + + let mut cache_score = CacheScoreRouter::new(0.0, 1.0); + assert_eq!(cache_score.route(&req, &instances, &meta, 0.0).chosen, 1); + + let mut cache_score_ttl = CacheScoreTtlRouter::new(0.0, 1.0); + assert_eq!(cache_score_ttl.route(&req, &instances, &meta, 0.0).chosen, 0); + } + #[test] fn prefix_affinity_fallback_uses_real_l0_not_meta_store_scores() { let req = make_request(&[30, 31, 32]); @@ -277,4 +329,21 @@ mod tests { assert_eq!(decision.reason, "affinity fallback: min(drain+fetch)"); assert_eq!(decision.chosen, 1); } + + #[test] + fn estimated_ttft_can_prefer_full_l1_hit_over_shorter_queue() { + let req = make_request(&[40, 41, 42, 43]); + let mut instances = make_instances(2); + let meta = MetaStore::new(1000.0); + + // Instance 0 can satisfy the whole prefix from local DRAM/L1. + insert_l1(&mut instances[0], &[40, 41, 42, 43]); + // Instance 1 has a slightly shorter queue but no reusable prefix. + enqueue_requests(&mut instances[0], 1, 16); + + let cfg = test_config(RouterMode::EstimatedTtft); + let mut est = EstimatedTtftRouter::new(&cfg); + + assert_eq!(est.route(&req, &instances, &meta, 0.0).chosen, 0); + } } diff --git a/src/router/ttl_aware.rs b/src/router/ttl_aware.rs index 39a2973..04481d0 100644 --- a/src/router/ttl_aware.rs +++ b/src/router/ttl_aware.rs @@ -1,6 +1,6 @@ use crate::cluster::meta_store::MetaStore; use crate::instance::Instance; -use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router}; +use crate::router::{CandidateInfo, RouteDecision, Router}; use crate::trace::RequestRecord; pub struct TtlAwareRouter { @@ -22,11 +22,11 @@ impl Router for TtlAwareRouter { &mut self, req: &RequestRecord, instances: &[Instance], - _meta: &MetaStore, - _now: f64, + meta: &MetaStore, + now: f64, ) -> RouteDecision { let n = instances.len(); - let scores = local_l0_scores(req, instances); + let scores = meta.score_prefix(&req.hash_ids, now, n); let mut best = 0u32; let mut best_key = (i64::MIN, f64::INFINITY); // maximize prefix, then minimize load let mut candidates = Vec::with_capacity(n); @@ -52,7 +52,7 @@ impl Router for TtlAwareRouter { chosen: best, probe_overhead_s: 0.0, candidates, - reason: "max local L0 prefix, tie -> least loaded", + reason: "max meta_store prefix, tie -> least loaded", } } } diff --git a/src/ttft.rs b/src/ttft.rs new file mode 100644 index 0000000..f2c4f5a --- /dev/null +++ b/src/ttft.rs @@ -0,0 +1,177 @@ +use crate::cluster::meta_store::MetaStore; +use crate::config::{CalibrationConfig, HardwareConfig}; +use crate::instance::Instance; + +#[derive(Debug, Clone, Copy, Default)] +pub struct PrefixResidency { + pub l0_hit_blocks: u32, + pub l1_hit_blocks: u32, + pub remote_hit_blocks: u32, + pub miss_blocks: u32, +} + +#[derive(Debug, Clone)] +pub struct TtftModel { + kv_block_bytes: u64, + host_dram_bw: f64, + 
pcie_bw: f64, + pcie_latency_s: f64, + rdma_bw: f64, + rdma_latency_s: f64, + scheduler_base_s: f64, + scheduler_per_candidate_s: f64, + cache_probe_per_tier_s: f64, + batch_pack_s: f64, + dram_access_s: f64, + remote_metadata_s: f64, + layout_transform_s: f64, + first_token_tail_s: f64, +} + +impl TtftModel { + pub fn new(hw: &HardwareConfig, calib: &CalibrationConfig, kv_block_bytes: u64) -> Self { + Self { + kv_block_bytes, + host_dram_bw: hw.host_dram_bw * calib.dram_bw_util, + pcie_bw: hw.pcie_bw * calib.pcie_bw_util, + pcie_latency_s: hw.pcie_latency_us * 1e-6, + rdma_bw: hw.rdma_bw * calib.rdma_bw_util, + rdma_latency_s: hw.rdma_latency_us * 1e-6, + scheduler_base_s: calib.scheduler_base_overhead_us * 1e-6, + scheduler_per_candidate_s: calib.scheduler_per_candidate_us * 1e-6, + cache_probe_per_tier_s: calib.cache_probe_us_per_tier * 1e-6, + batch_pack_s: calib.batch_pack_overhead_us * 1e-6, + dram_access_s: calib.dram_access_latency_us * 1e-6, + remote_metadata_s: calib.remote_metadata_us * 1e-6, + layout_transform_s: calib.layout_transform_fixed_us * 1e-6, + first_token_tail_s: (calib.final_sync_us + calib.first_token_ready_us) * 1e-6, + } + } + + pub fn scheduler_overhead_s(&self, num_candidates: usize, num_tiers: usize) -> f64 { + self.scheduler_base_s + + self.scheduler_per_candidate_s * num_candidates as f64 + + self.cache_probe_per_tier_s * num_tiers as f64 + + self.batch_pack_s + } + + pub fn first_token_tail_s(&self) -> f64 { + self.first_token_tail_s + } + + pub fn block_bytes(&self, blocks: u32) -> u64 { + self.kv_block_bytes * blocks as u64 + } + + pub fn local_l1_prepare_time_s(&self, blocks: u32) -> f64 { + if blocks == 0 { + return 0.0; + } + let bytes = self.block_bytes(blocks); + self.dram_access_s + + bytes as f64 / self.host_dram_bw.max(1.0) + + self.pcie_cost_s(bytes) + + self.layout_transform_s + } + + pub fn remote_prepare_time_s(&self, blocks: u32) -> f64 { + if blocks == 0 { + return 0.0; + } + let bytes = self.block_bytes(blocks); + self.remote_metadata_s + self.rdma_cost_s(bytes) + self.pcie_cost_s(bytes) + self.layout_transform_s + } + + pub fn pcie_cost_s(&self, bytes: u64) -> f64 { + if bytes == 0 { + self.pcie_latency_s + } else { + self.pcie_latency_s + bytes as f64 / self.pcie_bw.max(1.0) + } + } + + pub fn rdma_cost_s(&self, bytes: u64) -> f64 { + if bytes == 0 { + self.rdma_latency_s + } else { + self.rdma_latency_s + bytes as f64 / self.rdma_bw.max(1.0) + } + } + + pub fn kv_prepare_time_s(&self, residency: PrefixResidency) -> f64 { + self.local_l1_prepare_time_s(residency.l1_hit_blocks) + + self.remote_prepare_time_s(residency.remote_hit_blocks) + } +} + +pub fn classify_prefix_tiers( + req_hashes: &[u64], + inst: &Instance, + meta: &MetaStore, + now: f64, +) -> PrefixResidency { + let total_blocks = req_hashes.len() as u32; + let l0_hit_blocks = inst.cache.l0.longest_prefix_peek(req_hashes) as u32; + let suffix_after_l0 = &req_hashes[l0_hit_blocks as usize..]; + + let l1_hit_blocks = inst.cache.l1.longest_prefix_peek(suffix_after_l0) as u32; + let suffix_after_l1 = &suffix_after_l0[l1_hit_blocks as usize..]; + + let mut remote_hit_blocks = 0; + for &h in suffix_after_l1 { + let owners = meta.instances_for(h, now); + if owners.iter().any(|o| *o != inst.id) { + remote_hit_blocks += 1; + } else { + break; + } + } + + PrefixResidency { + l0_hit_blocks, + l1_hit_blocks, + remote_hit_blocks, + miss_blocks: total_blocks - l0_hit_blocks - l1_hit_blocks - remote_hit_blocks, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use 
crate::config::{CalibrationConfig, HardwareConfig}; + + #[test] + fn remote_prepare_includes_fixed_overheads() { + let hw = HardwareConfig { + gpu_flops: 1.0e14, + gpu_fp8_flops: 0.0, + gpu_fp4_flops: 0.0, + gpu_mem_bw: 1.0e12, + hbm_bytes: 1.0e9, + dram_bytes: 4.0e9, + host_dram_bw: 5.0e11, + pcie_bw: 32.0e9, + pcie_latency_us: 1.0, + rdma_bw: 12.0e9, + rdma_latency_us: 5.0, + intra_node_tp_bw: 9.0e11, + intra_node_tp_latency_us: 2.0, + tp_degree: 1, + max_batch_slots: 32, + prefill_chunk_tokens: 1024, + }; + let model = TtftModel::new( + &hw, + &CalibrationConfig { + remote_metadata_us: 11.0, + layout_transform_fixed_us: 7.0, + ..CalibrationConfig::default() + }, + 4096, + ); + + let transport_only = model.rdma_cost_s(4096) + model.pcie_cost_s(4096); + let total = model.remote_prepare_time_s(1); + assert!(total > transport_only); + } +} diff --git a/tests/smoke.rs b/tests/smoke.rs index 6e791e3..4865e86 100644 --- a/tests/smoke.rs +++ b/tests/smoke.rs @@ -28,13 +28,18 @@ fn base_config(trace_path: &str, out_dir: &str, mode: RouterMode) -> Config { gpu_mem_bw: 1.0e12, hbm_bytes: 1.0e9, dram_bytes: 4.0e9, + host_dram_bw: 5.0e11, pcie_bw: 32.0e9, pcie_latency_us: 1.0, rdma_bw: 12.0e9, rdma_latency_us: 5.0, + intra_node_tp_bw: 9.0e11, + intra_node_tp_latency_us: 2.0, + tp_degree: 1, max_batch_slots: 32, prefill_chunk_tokens: 1024, }, + calibration: CalibrationConfig::default(), cluster: ClusterConfig { num_instances: 4, meta_store: MetaStoreConfig {