Compare commits

..

2 Commits

Author SHA1 Message Date
a3f386c858 feat: update ttft modeling and add cache affinity 2026-04-15 19:08:10 +08:00
ff316c6873 fix: cache calculation 2026-04-15 17:31:39 +08:00
31 changed files with 1728 additions and 387 deletions

6
.gitignore vendored
View File

@@ -1,6 +1,12 @@
 # Trace files
 bailian-traces
+
+# docs
+docs
+reports
+scripts
+tests/test_analyze_affinity_policy.py

 # Rust build artifacts
 /target/
 **/*.rs.bk

View File

@@ -8,6 +8,7 @@ use crate::instance::kv_cache::L1Change;
 use crate::instance::Instance;
 use crate::router::{self, RouteDecision, Router};
 use crate::trace::RequestRecord;
+use crate::ttft::{classify_prefix_tiers, TtftModel};
 use crate::types::InstanceId;

 #[derive(Debug, Clone)]
@@ -31,13 +32,19 @@ pub struct Cluster {
     pub router: Box<dyn Router>,
     pub block_size_tokens: u32,
     pub kv_block_bytes: u64,
+    pub ttft_model: TtftModel,
 }

 impl Cluster {
     pub fn new(config: &Config, model: &ModelConfig) -> Self {
         let mut instances = Vec::with_capacity(config.cluster.num_instances as usize);
         for id in 0..config.cluster.num_instances {
-            instances.push(Instance::new(id as InstanceId, model, &config.hardware));
+            instances.push(Instance::new(
+                id as InstanceId,
+                model,
+                &config.hardware,
+                &config.calibration,
+            ));
         }
         let meta_store = MetaStore::new(config.cluster.meta_store.ttl_seconds);
         let router = router::build(config, config.sim.seed);
@@ -47,6 +54,7 @@ impl Cluster {
             router,
             block_size_tokens: model.block_size_tokens,
             kv_block_bytes: model.kv_block_bytes(),
+            ttft_model: TtftModel::new(&config.hardware, &config.calibration, model.kv_block_bytes()),
         }
     }
@@ -59,24 +67,29 @@ impl Cluster {
             .route(req, &self.instances, &self.meta_store, now);
         let inst_id = decision.chosen;
         let probe_overhead_s = decision.probe_overhead_s;
+        let scheduler_overhead_s = self
+            .ttft_model
+            .scheduler_overhead_s(self.instances.len(), 3);

-        // The router probe overhead delays the request's effective start time.
-        let effective_now = now + probe_overhead_s;
+        // The router probe overhead and scheduler work delay the request's
+        // effective start time.
+        let effective_now = now + probe_overhead_s + scheduler_overhead_s;

         let inst = &mut self.instances[inst_id as usize];
+        let residency = classify_prefix_tiers(&req.hash_ids, inst, &self.meta_store, now);
         let total_blocks = req.hash_ids.len() as u32;
+        let l0_hits = residency.l0_hit_blocks;
+        let l1_hits = residency.l1_hit_blocks;
+        let remote_hit_blocks = residency.remote_hit_blocks;

-        // 1. L0 lookup (touches matched blocks).
-        let l0_hits = inst.cache.l0.longest_prefix(&req.hash_ids) as u32;
-        // 2. L1 lookup on the remaining suffix.
+        // 1. L1 lookup on the remaining suffix.
         let suffix_after_l0 = &req.hash_ids[l0_hits as usize..];
-        let l1_hits = inst.cache.l1.longest_prefix_peek(suffix_after_l0) as u32;

         // L1->L0 transfer cost
         let l1_bytes = (l1_hits as u64) * self.kv_block_bytes;
         let mut t = effective_now;
         let mut l1_changes = Vec::new();
         if l1_hits > 0 {
+            t += self.ttft_model.local_l1_prepare_time_s(l1_hits) - inst.links.pcie.cost(l1_bytes);
             t = inst.links.pcie.reserve(t, l1_bytes);
             l1_changes = inst
                 .cache
@@ -86,23 +99,14 @@ impl Cluster {
         // 3. Remote v6d lookup for the still-remaining suffix.
         let suffix_after_l1 = &suffix_after_l0[l1_hits as usize..];
-        let mut remote_hit_blocks: u32 = 0;
-        for &h in suffix_after_l1 {
-            // A block is remotely available iff some instance other than
-            // `inst_id` lists it (and not expired).
-            let owners = self.meta_store.instances_for(h, now);
-            let any_remote = owners.iter().any(|o| *o != inst_id);
-            if any_remote {
-                remote_hit_blocks += 1;
-            } else {
-                break; // contiguous prefix - stop on first miss
-            }
-        }
         let remote_bytes = (remote_hit_blocks as u64) * self.kv_block_bytes;
         if remote_hit_blocks > 0 {
             let pulled = &suffix_after_l1[..remote_hit_blocks as usize];
             let l1_changes = {
                 let inst = &mut self.instances[inst_id as usize];
+                t += self.ttft_model.remote_prepare_time_s(remote_hit_blocks)
+                    - inst.links.rdma.cost(remote_bytes)
+                    - inst.links.pcie.cost(remote_bytes);
                 t = inst.links.rdma.reserve(t, remote_bytes);
                 t = inst.links.pcie.reserve(t, remote_bytes);
                 inst.cache.fetch_remote_blocks_to_l0(pulled)
@@ -134,6 +138,7 @@ impl Cluster {
             ready_at: t,
             prefill_tokens_remaining: miss_tokens,
             reserved_blocks,
+            completion_tail_s: self.ttft_model.first_token_tail_s(),
         };
         let inst = &mut self.instances[inst_id as usize];
         inst.admit(admitted);
@@ -170,3 +175,100 @@ impl Cluster {
         }
     }
 }
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{
CalibrationConfig, ClusterConfig, Config, HardwareConfig, MetaStoreConfig, ModelConfig,
RouterConfig, RouterMode, SimConfig,
};
use crate::trace::RequestRecord;
fn test_config(mode: RouterMode) -> Config {
Config {
model: ModelConfig {
name: "test".into(),
num_layers: 4,
num_kv_heads: 2,
head_dim: 64,
dtype_bytes: 2,
block_size_tokens: 16,
flops_per_token_prefill: Some(1.0e9),
attn_quadratic_coeff: Some(64.0),
..Default::default()
},
hardware: HardwareConfig {
gpu_flops: 1.0e14,
gpu_fp8_flops: 0.0,
gpu_fp4_flops: 0.0,
gpu_mem_bw: 1.0e12,
hbm_bytes: 1.0e9,
dram_bytes: 4.0e9,
host_dram_bw: 5.0e11,
pcie_bw: 32.0e9,
pcie_latency_us: 1.0,
rdma_bw: 12.0e9,
rdma_latency_us: 5.0,
intra_node_tp_bw: 9.0e11,
intra_node_tp_latency_us: 2.0,
tp_degree: 1,
max_batch_slots: 32,
prefill_chunk_tokens: 1024,
},
calibration: CalibrationConfig {
dram_access_latency_us: 25.0,
layout_transform_fixed_us: 7.0,
..CalibrationConfig::default()
},
cluster: ClusterConfig {
num_instances: 1,
meta_store: MetaStoreConfig {
ttl_seconds: 1000.0,
},
router: RouterConfig {
mode,
precise_probe_latency_us: 10.0,
precise_probe_topk: 2,
load_alpha: 0.0,
score_alpha: 0.0,
score_beta: 1.0,
prefix_k: 8,
affinity_fan_out: 2,
},
},
sim: SimConfig {
trace_path: String::new(),
max_requests: None,
output_dir: String::new(),
sample_interval_s: 0.0,
seed: 7,
},
}
}
#[test]
fn l1_ready_at_includes_dram_and_transform_overhead() {
let cfg = test_config(RouterMode::EstimatedTtft);
let mut cluster = Cluster::new(&cfg, &cfg.model);
let req = RequestRecord {
req_id: 1,
chat_id: 0,
arrival: 0.0,
input_len: 32,
output_len: 16,
hash_ids: vec![10, 11],
};
let mut evicted = Vec::new();
cluster.instances[0]
.cache
.l1
.insert_blocks(&req.hash_ids, &mut evicted);
let stats = cluster.route_and_admit(&req, 0.0);
let pure_pcie = cluster.instances[0].links.pcie.cost(cluster.kv_block_bytes * 2);
assert!(stats.ready_at > pure_pcie);
}
}
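The route_and_admit hunks above lean on four TtftModel calls (scheduler_overhead_s, local_l1_prepare_time_s, remote_prepare_time_s, first_token_tail_s) plus classify_prefix_tiers, but src/ttft.rs itself is not part of this compare view. Below is a rough, assumption-laden sketch of how such a model could be wired up from the calibration knobs added later in this diff; every name other than the four methods called in cluster.rs is hypothetical, and the literal 3 passed to scheduler_overhead_s is assumed to count the probed cache tiers (L0, L1, remote).

```rust
// Hypothetical sketch only: the real TtftModel is in src/ttft.rs, not shown here.
// Field names and formulas are assumptions built from the CalibrationConfig knobs.
struct TtftModelSketch {
    scheduler_base_s: f64,          // scheduler_base_overhead_us
    scheduler_per_candidate_s: f64, // scheduler_per_candidate_us
    cache_probe_s_per_tier: f64,    // cache_probe_us_per_tier
    dram_access_latency_s: f64,     // dram_access_latency_us
    layout_transform_fixed_s: f64,  // layout_transform_fixed_us
    remote_metadata_s: f64,         // remote_metadata_us
    first_token_ready_s: f64,       // first_token_ready_us
    pcie_s_per_block: f64,          // derated PCIe time per KV block (assumed)
    rdma_s_per_block: f64,          // derated RDMA time per KV block (assumed)
}

impl TtftModelSketch {
    fn scheduler_overhead_s(&self, num_instances: usize, tiers_probed: u32) -> f64 {
        self.scheduler_base_s
            + self.scheduler_per_candidate_s * num_instances as f64
            + self.cache_probe_s_per_tier * tiers_probed as f64
    }
    fn local_l1_prepare_time_s(&self, l1_blocks: u32) -> f64 {
        // cluster.rs adds this and then subtracts inst.links.pcie.cost(l1_bytes)
        // before reserving the PCIe link, so the transfer is not double-counted.
        self.dram_access_latency_s
            + self.layout_transform_fixed_s
            + self.pcie_s_per_block * l1_blocks as f64
    }
    fn remote_prepare_time_s(&self, blocks: u32) -> f64 {
        self.remote_metadata_s
            + self.layout_transform_fixed_s
            + (self.rdma_s_per_block + self.pcie_s_per_block) * blocks as f64
    }
    fn first_token_tail_s(&self) -> f64 {
        self.first_token_ready_s
    }
}

fn main() {
    let m = TtftModelSketch {
        scheduler_base_s: 50e-6,
        scheduler_per_candidate_s: 5e-6,
        cache_probe_s_per_tier: 3e-6,
        dram_access_latency_s: 2e-6,
        layout_transform_fixed_s: 4e-6,
        remote_metadata_s: 8e-6,
        first_token_ready_s: 10e-6,
        pcie_s_per_block: 2.0e-5,
        rdma_s_per_block: 5.5e-5,
    };
    println!("scheduler overhead, 16 instances: {:.1} us", m.scheduler_overhead_s(16, 3) * 1e6);
    println!("L1 prepare, 8 blocks: {:.1} us", m.local_l1_prepare_time_s(8) * 1e6);
    println!("remote prepare, 8 blocks: {:.1} us, tail: {:.1} us",
        m.remote_prepare_time_s(8) * 1e6, m.first_token_tail_s() * 1e6);
}
```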

View File

@@ -1,6 +1,6 @@
-pub mod meta_store;
 #[allow(clippy::module_inception)]
 pub mod cluster;
+pub mod meta_store;

 pub use cluster::Cluster;
 pub use meta_store::MetaStore;

View File

@@ -19,6 +19,8 @@ use std::path::Path;
 pub struct Config {
     pub model: ModelConfig,
     pub hardware: HardwareConfig,
+    #[serde(default)]
+    pub calibration: CalibrationConfig,
     pub cluster: ClusterConfig,
     pub sim: SimConfig,
 }
@@ -111,8 +113,7 @@ impl ModelConfig {
                 * self.dtype_bytes as u64
                 * self.block_size_tokens as u64
         } else {
-            2u64
-                * self.num_layers as u64
+            2u64 * self.num_layers as u64
                 * self.num_kv_heads as u64
                 * self.head_dim as u64
                 * self.dtype_bytes as u64
@@ -179,16 +180,36 @@ pub struct HardwareConfig {
     pub gpu_mem_bw: f64,
     pub hbm_bytes: f64,
     pub dram_bytes: f64,
+    #[serde(default = "default_host_dram_bw")]
+    pub host_dram_bw: f64,
     pub pcie_bw: f64,
     pub pcie_latency_us: f64,
     pub rdma_bw: f64,
     pub rdma_latency_us: f64,
+    #[serde(default = "default_intra_node_tp_bw")]
+    pub intra_node_tp_bw: f64,
+    #[serde(default = "default_intra_node_tp_latency_us")]
+    pub intra_node_tp_latency_us: f64,
+    #[serde(default = "default_tp_degree")]
+    pub tp_degree: u32,
     #[serde(default = "default_max_batch_slots")]
     pub max_batch_slots: u32,
     #[serde(default = "default_prefill_chunk_tokens")]
     pub prefill_chunk_tokens: u32,
 }
fn default_host_dram_bw() -> f64 {
5.0e11
}
fn default_intra_node_tp_bw() -> f64 {
9.0e11
}
fn default_intra_node_tp_latency_us() -> f64 {
2.0
}
fn default_tp_degree() -> u32 {
1
}
 fn default_max_batch_slots() -> u32 {
     256
 }
@@ -196,6 +217,144 @@ fn default_prefill_chunk_tokens() -> u32 {
     2048
 }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CalibrationConfig {
#[serde(default = "default_matmul_util")]
pub matmul_util: f64,
#[serde(default = "default_attention_util")]
pub attention_util: f64,
#[serde(default = "default_hbm_bw_util")]
pub hbm_bw_util: f64,
#[serde(default = "default_dram_bw_util")]
pub dram_bw_util: f64,
#[serde(default = "default_pcie_bw_util")]
pub pcie_bw_util: f64,
#[serde(default = "default_rdma_bw_util")]
pub rdma_bw_util: f64,
#[serde(default = "default_tp_bw_util")]
pub tp_bw_util: f64,
#[serde(default = "default_tp_overlap_ratio")]
pub tp_overlap_ratio: f64,
#[serde(default = "default_tp_collective_count_per_layer")]
pub tp_collective_count_per_layer: f64,
#[serde(default = "default_kv_overlap_ratio")]
pub kv_overlap_ratio: f64,
#[serde(default = "default_scheduler_base_overhead_us")]
pub scheduler_base_overhead_us: f64,
#[serde(default = "default_scheduler_per_candidate_us")]
pub scheduler_per_candidate_us: f64,
#[serde(default = "default_cache_probe_us_per_tier")]
pub cache_probe_us_per_tier: f64,
#[serde(default = "default_batch_pack_overhead_us")]
pub batch_pack_overhead_us: f64,
#[serde(default = "default_dram_access_latency_us")]
pub dram_access_latency_us: f64,
#[serde(default = "default_remote_metadata_us")]
pub remote_metadata_us: f64,
#[serde(default = "default_layout_transform_fixed_us")]
pub layout_transform_fixed_us: f64,
#[serde(default = "default_misc_layer_overhead_us")]
pub misc_layer_overhead_us: f64,
#[serde(default = "default_chunk_launch_overhead_us")]
pub chunk_launch_overhead_us: f64,
#[serde(default = "default_final_sync_us")]
pub final_sync_us: f64,
#[serde(default = "default_first_token_ready_us")]
pub first_token_ready_us: f64,
}
impl Default for CalibrationConfig {
fn default() -> Self {
Self {
matmul_util: default_matmul_util(),
attention_util: default_attention_util(),
hbm_bw_util: default_hbm_bw_util(),
dram_bw_util: default_dram_bw_util(),
pcie_bw_util: default_pcie_bw_util(),
rdma_bw_util: default_rdma_bw_util(),
tp_bw_util: default_tp_bw_util(),
tp_overlap_ratio: default_tp_overlap_ratio(),
tp_collective_count_per_layer: default_tp_collective_count_per_layer(),
kv_overlap_ratio: default_kv_overlap_ratio(),
scheduler_base_overhead_us: default_scheduler_base_overhead_us(),
scheduler_per_candidate_us: default_scheduler_per_candidate_us(),
cache_probe_us_per_tier: default_cache_probe_us_per_tier(),
batch_pack_overhead_us: default_batch_pack_overhead_us(),
dram_access_latency_us: default_dram_access_latency_us(),
remote_metadata_us: default_remote_metadata_us(),
layout_transform_fixed_us: default_layout_transform_fixed_us(),
misc_layer_overhead_us: default_misc_layer_overhead_us(),
chunk_launch_overhead_us: default_chunk_launch_overhead_us(),
final_sync_us: default_final_sync_us(),
first_token_ready_us: default_first_token_ready_us(),
}
}
}
fn default_matmul_util() -> f64 {
0.4
}
fn default_attention_util() -> f64 {
0.3
}
fn default_hbm_bw_util() -> f64 {
0.75
}
fn default_dram_bw_util() -> f64 {
0.65
}
fn default_pcie_bw_util() -> f64 {
0.8
}
fn default_rdma_bw_util() -> f64 {
0.7
}
fn default_tp_bw_util() -> f64 {
0.8
}
fn default_tp_overlap_ratio() -> f64 {
0.5
}
fn default_tp_collective_count_per_layer() -> f64 {
2.0
}
fn default_kv_overlap_ratio() -> f64 {
0.0
}
fn default_scheduler_base_overhead_us() -> f64 {
50.0
}
fn default_scheduler_per_candidate_us() -> f64 {
5.0
}
fn default_cache_probe_us_per_tier() -> f64 {
3.0
}
fn default_batch_pack_overhead_us() -> f64 {
10.0
}
fn default_dram_access_latency_us() -> f64 {
2.0
}
fn default_remote_metadata_us() -> f64 {
8.0
}
fn default_layout_transform_fixed_us() -> f64 {
4.0
}
fn default_misc_layer_overhead_us() -> f64 {
0.5
}
fn default_chunk_launch_overhead_us() -> f64 {
8.0
}
fn default_final_sync_us() -> f64 {
5.0
}
fn default_first_token_ready_us() -> f64 {
10.0
}
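The utilization-style knobs above all derate a peak hardware figure into an effective one; here is a tiny standalone illustration of the convention (workload numbers are made up, and the simulator itself applies these factors inside ComputeModel and LinkModel rather than in user code):

```rust
// Illustrative arithmetic only: effective = peak * util, time = work / effective.
fn main() {
    let gpu_flops = 1.0e15;              // peak dense FLOP/s
    let matmul_util = 0.4;               // default_matmul_util()
    let pcie_bw = 32.0e9;                // bytes/s, peak
    let pcie_bw_util = 0.8;              // default_pcie_bw_util()

    let flops = 2.0e12;                  // FLOPs for some hypothetical prefill chunk
    let bytes = 256.0 * 1024.0 * 1024.0; // 256 MiB of KV blocks

    let compute_s = flops / (gpu_flops * matmul_util);
    let pcie_s = bytes / (pcie_bw * pcie_bw_util);
    println!("compute {:.3} ms, pcie {:.3} ms", compute_s * 1e3, pcie_s * 1e3);
}
```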
 // ---------------------------------------------------------------------------
 // Cluster
 // ---------------------------------------------------------------------------
@@ -266,7 +425,11 @@ pub enum RouterMode {
     Precise,
     MinPd,
     CacheLoad,
+    CacheAffinity,
+    CacheAffinityWeakRend,
+    CacheAffinityStrongOnly,
     CacheScore,
+    CacheScoreTtl,
     EstimatedTtft,
     PrefixAffinity,
 }
@@ -282,7 +445,11 @@ impl RouterMode {
             "precise" | "precise_aware" => Ok(Self::Precise),
             "min_pd" | "minpd" | "pd" => Ok(Self::MinPd),
             "cache_load" | "cl" => Ok(Self::CacheLoad),
+            "cache_affinity" | "caff" | "ca" => Ok(Self::CacheAffinity),
+            "cache_affinity_weak_rend" | "caff_weak" => Ok(Self::CacheAffinityWeakRend),
+            "cache_affinity_strong_only" | "caff_strong" => Ok(Self::CacheAffinityStrongOnly),
             "cache_score" | "cs" => Ok(Self::CacheScore),
+            "cache_score_ttl" | "csttl" | "cs_ttl" => Ok(Self::CacheScoreTtl),
             "estimated_ttft" | "ettft" | "optimal" => Ok(Self::EstimatedTtft),
             "prefix_affinity" | "affinity" | "pa" => Ok(Self::PrefixAffinity),
             other => Err(anyhow::anyhow!("unknown router mode: {other}")),
@@ -299,7 +466,11 @@ impl RouterMode {
             Self::Precise => "precise",
             Self::MinPd => "min_pd",
             Self::CacheLoad => "cache_load",
+            Self::CacheAffinity => "cache_affinity",
+            Self::CacheAffinityWeakRend => "cache_affinity_weak_rend",
+            Self::CacheAffinityStrongOnly => "cache_affinity_strong_only",
             Self::CacheScore => "cache_score",
+            Self::CacheScoreTtl => "cache_score_ttl",
             Self::EstimatedTtft => "estimated_ttft",
             Self::PrefixAffinity => "prefix_affinity",
         }
@@ -353,6 +524,8 @@ impl Config {
 struct RawConfig {
     model: RawModelConfig,
     hardware: RawHardwareConfig,
+    #[serde(default)]
+    calibration: CalibrationConfig,
     cluster: ClusterConfig,
     sim: SimConfig,
 }
@@ -420,6 +593,8 @@ struct RawHardwareConfig {
     #[serde(default)]
     dram_bytes: Option<f64>,
     #[serde(default)]
+    host_dram_bw: Option<f64>,
+    #[serde(default)]
     pcie_bw: Option<f64>,
     #[serde(default)]
     pcie_latency_us: Option<f64>,
@@ -428,6 +603,12 @@ struct RawHardwareConfig {
     #[serde(default)]
     rdma_latency_us: Option<f64>,
     #[serde(default)]
+    intra_node_tp_bw: Option<f64>,
+    #[serde(default)]
+    intra_node_tp_latency_us: Option<f64>,
+    #[serde(default)]
+    tp_degree: Option<u32>,
+    #[serde(default)]
     max_batch_slots: Option<u32>,
     #[serde(default)]
     prefill_chunk_tokens: Option<u32>,
@@ -455,7 +636,13 @@ impl RawConfig {
             }
         }

-        Ok(Config { model, hardware, cluster: self.cluster, sim: self.sim })
+        Ok(Config {
+            model,
+            hardware,
+            calibration: self.calibration,
+            cluster: self.cluster,
+            sim: self.sim,
+        })
     }
 }
@@ -474,24 +661,60 @@ impl RawModelConfig {
}; };
// Overlay: explicit YAML fields override the base. // Overlay: explicit YAML fields override the base.
if let Some(v) = self.name { m.name = v; } if let Some(v) = self.name {
if let Some(v) = self.num_layers { m.num_layers = v; } m.name = v;
if let Some(v) = self.num_kv_heads { m.num_kv_heads = v; } }
if let Some(v) = self.head_dim { m.head_dim = v; } if let Some(v) = self.num_layers {
if let Some(v) = self.dtype_bytes { m.dtype_bytes = v; } m.num_layers = v;
if let Some(v) = self.block_size_tokens { m.block_size_tokens = v; } }
if let Some(v) = self.hidden_size { m.hidden_size = Some(v); } if let Some(v) = self.num_kv_heads {
if let Some(v) = self.num_attention_heads { m.num_attention_heads = Some(v); } m.num_kv_heads = v;
if let Some(v) = self.intermediate_size { m.intermediate_size = Some(v); } }
if self.moe.is_some() { m.moe = self.moe; } if let Some(v) = self.head_dim {
if self.mla.is_some() { m.mla = self.mla; } m.head_dim = v;
if self.attention.is_some() { m.attention = self.attention; } }
if let Some(v) = self.flops_per_token_prefill { m.flops_per_token_prefill = Some(v); } if let Some(v) = self.dtype_bytes {
if let Some(v) = self.attn_quadratic_coeff { m.attn_quadratic_coeff = Some(v); } m.dtype_bytes = v;
if let Some(v) = self.bytes_per_token_prefill { m.bytes_per_token_prefill = Some(v); } }
if self.compute_dtype.is_some() { m.compute_dtype = self.compute_dtype; } if let Some(v) = self.block_size_tokens {
if let Some(v) = self.flops_per_token_decode { m.flops_per_token_decode = Some(v); } m.block_size_tokens = v;
if let Some(v) = self.bytes_per_token_decode { m.bytes_per_token_decode = Some(v); } }
if let Some(v) = self.hidden_size {
m.hidden_size = Some(v);
}
if let Some(v) = self.num_attention_heads {
m.num_attention_heads = Some(v);
}
if let Some(v) = self.intermediate_size {
m.intermediate_size = Some(v);
}
if self.moe.is_some() {
m.moe = self.moe;
}
if self.mla.is_some() {
m.mla = self.mla;
}
if self.attention.is_some() {
m.attention = self.attention;
}
if let Some(v) = self.flops_per_token_prefill {
m.flops_per_token_prefill = Some(v);
}
if let Some(v) = self.attn_quadratic_coeff {
m.attn_quadratic_coeff = Some(v);
}
if let Some(v) = self.bytes_per_token_prefill {
m.bytes_per_token_prefill = Some(v);
}
if self.compute_dtype.is_some() {
m.compute_dtype = self.compute_dtype;
}
if let Some(v) = self.flops_per_token_decode {
m.flops_per_token_decode = Some(v);
}
if let Some(v) = self.bytes_per_token_decode {
m.bytes_per_token_decode = Some(v);
}
// Validate deployment-specific fields that HF config.json never provides. // Validate deployment-specific fields that HF config.json never provides.
anyhow::ensure!( anyhow::ensure!(
@@ -525,26 +748,62 @@ impl RawHardwareConfig {
                gpu_mem_bw: 0.0,
                hbm_bytes: 0.0,
                dram_bytes: 0.0,
+               host_dram_bw: default_host_dram_bw(),
                pcie_bw: 0.0,
                pcie_latency_us: 5.0,
                rdma_bw: 0.0,
                rdma_latency_us: 8.0,
+               intra_node_tp_bw: default_intra_node_tp_bw(),
+               intra_node_tp_latency_us: default_intra_node_tp_latency_us(),
+               tp_degree: default_tp_degree(),
                max_batch_slots: default_max_batch_slots(),
                prefill_chunk_tokens: default_prefill_chunk_tokens(),
            }
        };

        // Overlay: explicit YAML fields override the preset / defaults.
-       if let Some(v) = self.gpu_flops { hw.gpu_flops = v; }
-       if let Some(v) = self.gpu_mem_bw { hw.gpu_mem_bw = v; }
-       if let Some(v) = self.hbm_bytes { hw.hbm_bytes = v; }
-       if let Some(v) = self.dram_bytes { hw.dram_bytes = v; }
-       if let Some(v) = self.pcie_bw { hw.pcie_bw = v; }
-       if let Some(v) = self.pcie_latency_us { hw.pcie_latency_us = v; }
-       if let Some(v) = self.rdma_bw { hw.rdma_bw = v; }
-       if let Some(v) = self.rdma_latency_us { hw.rdma_latency_us = v; }
-       if let Some(v) = self.max_batch_slots { hw.max_batch_slots = v; }
-       if let Some(v) = self.prefill_chunk_tokens { hw.prefill_chunk_tokens = v; }
+       if let Some(v) = self.gpu_flops {
+           hw.gpu_flops = v;
+       }
+       if let Some(v) = self.gpu_mem_bw {
+           hw.gpu_mem_bw = v;
+       }
+       if let Some(v) = self.hbm_bytes {
+           hw.hbm_bytes = v;
+       }
+       if let Some(v) = self.dram_bytes {
+           hw.dram_bytes = v;
+       }
+       if let Some(v) = self.host_dram_bw {
+           hw.host_dram_bw = v;
+       }
+       if let Some(v) = self.pcie_bw {
+           hw.pcie_bw = v;
+       }
+       if let Some(v) = self.pcie_latency_us {
+           hw.pcie_latency_us = v;
+       }
+       if let Some(v) = self.rdma_bw {
+           hw.rdma_bw = v;
+       }
+       if let Some(v) = self.rdma_latency_us {
+           hw.rdma_latency_us = v;
+       }
+       if let Some(v) = self.intra_node_tp_bw {
+           hw.intra_node_tp_bw = v;
+       }
+       if let Some(v) = self.intra_node_tp_latency_us {
+           hw.intra_node_tp_latency_us = v;
+       }
+       if let Some(v) = self.tp_degree {
+           hw.tp_degree = v;
+       }
+       if let Some(v) = self.max_batch_slots {
+           hw.max_batch_slots = v;
+       }
+       if let Some(v) = self.prefill_chunk_tokens {
+           hw.prefill_chunk_tokens = v;
+       }

        // Validate minimum requirements.
        anyhow::ensure!(hw.gpu_flops > 0.0, "hardware.gpu_flops is required");
@@ -554,3 +813,104 @@ impl RawHardwareConfig {
Ok(hw) Ok(hw)
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU64, Ordering};
fn write_temp_config(yaml: &str) -> std::path::PathBuf {
static NEXT_ID: AtomicU64 = AtomicU64::new(0);
let path = std::env::temp_dir().join(format!(
"kvcache_sim_config_test_{}_{}.yaml",
std::process::id(),
NEXT_ID.fetch_add(1, Ordering::Relaxed)
));
std::fs::write(&path, yaml).unwrap();
path
}
#[test]
fn config_defaults_ttft_calibration_fields() {
let path = write_temp_config(
r#"
model:
name: test
num_layers: 4
num_kv_heads: 2
head_dim: 64
dtype_bytes: 2
block_size_tokens: 16
flops_per_token_prefill: 1.0e9
attn_quadratic_coeff: 64.0
hardware:
gpu_flops: 1.0e14
gpu_mem_bw: 1.0e12
hbm_bytes: 1.0e9
cluster:
num_instances: 2
meta_store:
ttl_seconds: 10.0
router:
mode: estimated_ttft
sim:
trace_path: trace.jsonl
output_dir: runs/test
"#,
);
let cfg = Config::from_yaml_path(&path).unwrap();
assert!(cfg.hardware.host_dram_bw > 0.0);
assert!(cfg.hardware.intra_node_tp_bw > 0.0);
assert!(cfg.calibration.matmul_util > 0.0);
assert!(cfg.calibration.attention_util > 0.0);
assert!(cfg.calibration.pcie_bw_util > 0.0);
}
#[test]
fn config_overrides_ttft_calibration_fields() {
let path = write_temp_config(
r#"
model:
name: test
num_layers: 4
num_kv_heads: 2
head_dim: 64
dtype_bytes: 2
block_size_tokens: 16
flops_per_token_prefill: 1.0e9
attn_quadratic_coeff: 64.0
hardware:
gpu_flops: 1.0e14
gpu_mem_bw: 1.0e12
hbm_bytes: 1.0e9
host_dram_bw: 9.9e11
intra_node_tp_bw: 7.7e11
intra_node_tp_latency_us: 2.5
calibration:
matmul_util: 0.31
attention_util: 0.27
rdma_bw_util: 0.63
scheduler_base_overhead_us: 123.0
cluster:
num_instances: 2
meta_store:
ttl_seconds: 10.0
router:
mode: estimated_ttft
sim:
trace_path: trace.jsonl
output_dir: runs/test
"#,
);
let cfg = Config::from_yaml_path(&path).unwrap();
assert_eq!(cfg.hardware.host_dram_bw, 9.9e11);
assert_eq!(cfg.hardware.intra_node_tp_bw, 7.7e11);
assert_eq!(cfg.hardware.intra_node_tp_latency_us, 2.5);
assert!((cfg.calibration.matmul_util - 0.31).abs() < 1e-12);
assert!((cfg.calibration.attention_util - 0.27).abs() < 1e-12);
assert!((cfg.calibration.rdma_bw_util - 0.63).abs() < 1e-12);
assert!((cfg.calibration.scheduler_base_overhead_us - 123.0).abs() < 1e-12);
}
}

View File

@@ -188,10 +188,14 @@ fn make_config(n: u32, base: &GpuBase) -> HardwareConfig {
         gpu_mem_bw: base.mem_bw * f,
         hbm_bytes: base.hbm * f,
         dram_bytes: dram,
+        host_dram_bw: if n >= 8 { 9.0e11 } else { 5.0e11 },
         pcie_bw: pcie_per_gpu * f,
         pcie_latency_us: pcie_lat,
         rdma_bw: rdma_base * rdma_scale,
         rdma_latency_us: rdma_lat,
+        intra_node_tp_bw: if base.pcie_gen >= 6 { 1.8e12 * f } else { 9.0e11 * f },
+        intra_node_tp_latency_us: if base.pcie_gen >= 6 { 1.0 } else { 2.0 },
+        tp_degree: n,
         max_batch_slots: 256,
         prefill_chunk_tokens: if n >= 4 { 4096 } else { 2048 },
     }

View File

@@ -7,7 +7,7 @@ use anyhow::{Context, Result};
use serde_json::Value; use serde_json::Value;
use std::path::Path; use std::path::Path;
use crate::config::{AttentionConfig, MlaConfig, MoeConfig, ModelConfig}; use crate::config::{AttentionConfig, MlaConfig, ModelConfig, MoeConfig};
/// Parse a HuggingFace config.json and return a partially-populated /// Parse a HuggingFace config.json and return a partially-populated
/// [`ModelConfig`]. The caller must still set `dtype_bytes` and /// [`ModelConfig`]. The caller must still set `dtype_bytes` and
@@ -34,8 +34,7 @@ fn parse_value(v: &Value) -> Result<ModelConfig> {
let num_layers = u32_field(v, "num_hidden_layers"); let num_layers = u32_field(v, "num_hidden_layers");
let hidden_size = u32_field(v, "hidden_size"); let hidden_size = u32_field(v, "hidden_size");
let num_attention_heads = u32_field(v, "num_attention_heads"); let num_attention_heads = u32_field(v, "num_attention_heads");
let num_kv_heads = u32_field(v, "num_key_value_heads") let num_kv_heads = u32_field(v, "num_key_value_heads").or(num_attention_heads); // default to MHA
.or(num_attention_heads); // default to MHA
let head_dim = u32_field(v, "head_dim").or_else(|| { let head_dim = u32_field(v, "head_dim").or_else(|| {
// Infer: hidden_size / num_attention_heads // Infer: hidden_size / num_attention_heads
match (hidden_size, num_attention_heads) { match (hidden_size, num_attention_heads) {
@@ -70,8 +69,7 @@ fn parse_value(v: &Value) -> Result<ModelConfig> {
}); });
// --- Attention pattern --- // --- Attention pattern ---
let attention = let attention = if let Some(first_dense) = u32_field(v, "first_k_dense_replace") {
if let Some(first_dense) = u32_field(v, "first_k_dense_replace") {
// DSA-style model (GLM-5, DeepSeek-V3). // DSA-style model (GLM-5, DeepSeek-V3).
// dense_window and sparse_stride are typically not in config.json; // dense_window and sparse_stride are typically not in config.json;
// use sensible defaults the user can override in YAML. // use sensible defaults the user can override in YAML.
@@ -188,6 +186,12 @@ mod tests {
assert_eq!(moe.num_active_experts, 8); assert_eq!(moe.num_active_experts, 8);
let mla = m.mla.as_ref().unwrap(); let mla = m.mla.as_ref().unwrap();
assert_eq!(mla.kv_lora_rank, 512); assert_eq!(mla.kv_lora_rank, 512);
assert!(matches!(m.attention, Some(AttentionConfig::Dsa { first_dense_layers: 3, .. }))); assert!(matches!(
m.attention,
Some(AttentionConfig::Dsa {
first_dense_layers: 3,
..
})
));
} }
} }
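Two fallbacks in the hf_config hunks above are easy to miss: num_key_value_heads defaults to num_attention_heads (plain MHA), and head_dim is inferred as hidden_size / num_attention_heads when absent. A self-contained sketch of just those two rules, with made-up values and without calling the crate's own parse_value:

```rust
// Standalone illustration of the two fallbacks visible in the diff above.
use serde_json::{json, Value};

fn u32_of(v: &Value, key: &str) -> Option<u32> {
    v.get(key).and_then(Value::as_u64).map(|x| x as u32)
}

fn main() {
    // Minimal config.json-like document (illustrative values, not a real model).
    let cfg = json!({
        "num_hidden_layers": 32,
        "hidden_size": 4096,
        "num_attention_heads": 32
        // no num_key_value_heads, no head_dim
    });
    let heads = u32_of(&cfg, "num_attention_heads");
    let kv_heads = u32_of(&cfg, "num_key_value_heads").or(heads); // default to MHA
    let head_dim = u32_of(&cfg, "head_dim").or_else(|| match (u32_of(&cfg, "hidden_size"), heads) {
        (Some(h), Some(n)) if n > 0 => Some(h / n),
        _ => None,
    });
    println!("kv_heads={:?} head_dim={:?}", kv_heads, head_dim); // Some(32), Some(128)
}
```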

View File

@@ -22,7 +22,7 @@
 //! `effective_ctx(N)` equals `N` for dense attention (→ O(N²) total) but
 //! is sub-linear for DSA / sliding-window.

-use crate::config::{AttentionConfig, HardwareConfig, ModelConfig};
+use crate::config::{AttentionConfig, CalibrationConfig, HardwareConfig, ModelConfig};
/// Resolved attention pattern used at runtime. /// Resolved attention pattern used at runtime.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -33,7 +33,10 @@ pub enum AttentionPattern {
SlidingWindow { window: f64 }, SlidingWindow { window: f64 },
/// DeepSeek Sparse Attention: effective_ctx = min(N, dense_window) + /// DeepSeek Sparse Attention: effective_ctx = min(N, dense_window) +
/// max(0, N - dense_window) / sparse_stride. /// max(0, N - dense_window) / sparse_stride.
Dsa { dense_window: f64, sparse_stride: f64 }, Dsa {
dense_window: f64,
sparse_stride: f64,
},
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -52,24 +55,46 @@ pub struct ComputeModel {
     pub attn_pattern: AttentionPattern,
     /// Weight bytes read from HBM per layer (for memory-bound check).
     pub weight_bytes_per_layer: f64,
+    /// Approximate bytes moved by each TP collective, per token per layer.
+    pub tp_bytes_per_token: f64,
+    /// Number of TP collectives per layer on the critical path.
+    pub tp_collective_count_per_layer: f64,
     /// Peak GPU FLOPs (aggregate across TP group).
     pub gpu_flops: f64,
     /// Peak GPU memory bandwidth (aggregate across TP group).
     pub gpu_mem_bw: f64,
+    /// Peak node-local TP bandwidth.
+    pub intra_node_tp_bw: f64,
+    /// Fixed latency per TP collective.
+    pub intra_node_tp_latency_s: f64,
+    /// Effective utilization for GEMM-like linear kernels.
+    pub matmul_util: f64,
+    /// Effective utilization for attention kernels.
+    pub attention_util: f64,
+    /// Effective utilization for HBM streaming.
+    pub hbm_bw_util: f64,
+    /// Effective utilization for TP bandwidth.
+    pub tp_bw_util: f64,
+    /// Fraction of TP communication that can overlap with compute.
+    pub tp_overlap_ratio: f64,
+    /// Fixed per-layer non-FLOP overhead.
+    pub misc_layer_overhead_s: f64,
+    /// Fixed launch overhead per prefill chunk.
+    pub chunk_launch_overhead_s: f64,
 }

 impl ComputeModel {
-    pub fn new(model: &ModelConfig, hw: &HardwareConfig) -> Self {
+    pub fn new(model: &ModelConfig, hw: &HardwareConfig, calib: &CalibrationConfig) -> Self {
         if model.is_arch_mode() {
-            Self::from_arch(model, hw)
+            Self::from_arch(model, hw, calib)
         } else {
-            Self::from_manual(model, hw)
+            Self::from_manual(model, hw, calib)
         }
     }

     // ----- Architecture-derived construction --------------------------------
-    fn from_arch(model: &ModelConfig, hw: &HardwareConfig) -> Self {
+    fn from_arch(model: &ModelConfig, hw: &HardwareConfig, calib: &CalibrationConfig) -> Self {
         let h = model.hidden_size.unwrap() as f64;
         let n_heads = model.num_attention_heads.unwrap_or(model.num_kv_heads) as f64;
         let n_kv = model.num_kv_heads as f64;
@@ -101,7 +126,8 @@ impl ComputeModel {
// --- MLP FLOPs/token/layer (SwiGLU: gate + up + down = 3 matmuls) --- // --- MLP FLOPs/token/layer (SwiGLU: gate + up + down = 3 matmuls) ---
let mlp = if let Some(moe) = &model.moe { let mlp = if let Some(moe) = &model.moe {
let expert_inter = moe.expert_intermediate_size let expert_inter =
moe.expert_intermediate_size
.unwrap_or(model.intermediate_size.unwrap_or(0)) as f64; .unwrap_or(model.intermediate_size.unwrap_or(0)) as f64;
let active = moe.num_active_experts as f64; let active = moe.num_active_experts as f64;
let shared = moe.num_shared_experts as f64; let shared = moe.num_shared_experts as f64;
@@ -111,6 +137,11 @@ impl ComputeModel {
         };

         let linear_flops = attn_linear + mlp;
+        let tp_bytes_per_token = if hw.tp_degree > 1 {
+            h * model.dtype_bytes as f64
+        } else {
+            0.0
+        };

         // --- Attention quadratic coefficient ---
         // attn_flops_per_layer(N) = attn_coeff * N * effective_ctx(N)
@@ -132,15 +163,13 @@ impl ComputeModel {
let qk_hd = (mla.qk_nope_head_dim + mla.qk_rope_head_dim) as f64; let qk_hd = (mla.qk_nope_head_dim + mla.qk_rope_head_dim) as f64;
let qk_rd = mla.qk_rope_head_dim as f64; let qk_rd = mla.qk_rope_head_dim as f64;
let vhd = mla.v_head_dim as f64; let vhd = mla.v_head_dim as f64;
(h * qlr + qlr * n_heads * qk_hd (h * qlr + qlr * n_heads * qk_hd + h * (kvlr + qk_rd) + n_heads * vhd * h) * wdtype
+ h * (kvlr + qk_rd)
+ n_heads * vhd * h)
* wdtype
} else { } else {
((n_heads + 2.0 * n_kv) * hd * h + n_heads * hd * h) * wdtype ((n_heads + 2.0 * n_kv) * hd * h + n_heads * hd * h) * wdtype
}; };
let mlp_wt = if let Some(moe) = &model.moe { let mlp_wt = if let Some(moe) = &model.moe {
let expert_inter = moe.expert_intermediate_size let expert_inter =
moe.expert_intermediate_size
.unwrap_or(model.intermediate_size.unwrap_or(0)) as f64; .unwrap_or(model.intermediate_size.unwrap_or(0)) as f64;
let active = moe.num_active_experts as f64; let active = moe.num_active_experts as f64;
let shared = moe.num_shared_experts as f64; let shared = moe.num_shared_experts as f64;
@@ -169,10 +198,9 @@ impl ComputeModel {
}, },
0.0, 0.0,
), ),
Some(AttentionConfig::Dense) | None => ( Some(AttentionConfig::Dense) | None => {
AttentionPattern::Dense, (AttentionPattern::Dense, model.num_layers as f64)
model.num_layers as f64, }
),
}; };
Self { Self {
@@ -182,14 +210,29 @@ impl ComputeModel {
             attn_coeff,
             attn_pattern,
             weight_bytes_per_layer: weight_bytes,
+            tp_bytes_per_token,
+            tp_collective_count_per_layer: if hw.tp_degree > 1 {
+                calib.tp_collective_count_per_layer
+            } else {
+                0.0
+            },
             gpu_flops: hw.gpu_flops,
             gpu_mem_bw: hw.gpu_mem_bw,
+            intra_node_tp_bw: hw.intra_node_tp_bw,
+            intra_node_tp_latency_s: hw.intra_node_tp_latency_us * 1e-6,
+            matmul_util: calib.matmul_util,
+            attention_util: calib.attention_util,
+            hbm_bw_util: calib.hbm_bw_util,
+            tp_bw_util: calib.tp_bw_util,
+            tp_overlap_ratio: calib.tp_overlap_ratio,
+            misc_layer_overhead_s: calib.misc_layer_overhead_us * 1e-6,
+            chunk_launch_overhead_s: calib.chunk_launch_overhead_us * 1e-6,
         }
     }

     // ----- Legacy manual construction ---------------------------------------
-    fn from_manual(model: &ModelConfig, hw: &HardwareConfig) -> Self {
+    fn from_manual(model: &ModelConfig, hw: &HardwareConfig, calib: &CalibrationConfig) -> Self {
         Self {
             num_layers: model.num_layers as f64,
             first_dense_layers: model.num_layers as f64,
@@ -197,8 +240,19 @@ impl ComputeModel {
             attn_coeff: model.attn_quadratic_coeff.unwrap_or(0.0),
             attn_pattern: AttentionPattern::Dense,
             weight_bytes_per_layer: 0.0,
+            tp_bytes_per_token: 0.0,
+            tp_collective_count_per_layer: 0.0,
             gpu_flops: hw.gpu_flops,
             gpu_mem_bw: hw.gpu_mem_bw,
+            intra_node_tp_bw: hw.intra_node_tp_bw,
+            intra_node_tp_latency_s: hw.intra_node_tp_latency_us * 1e-6,
+            matmul_util: calib.matmul_util,
+            attention_util: calib.attention_util,
+            hbm_bw_util: calib.hbm_bw_util,
+            tp_bw_util: calib.tp_bw_util,
+            tp_overlap_ratio: calib.tp_overlap_ratio,
+            misc_layer_overhead_s: calib.misc_layer_overhead_us * 1e-6,
+            chunk_launch_overhead_s: calib.chunk_launch_overhead_us * 1e-6,
         }
} }
@@ -231,30 +285,47 @@ impl ComputeModel {
             return 0.0;
         }
         let n = n as f64;
-        let linear = n * self.linear_flops_per_token;
+        let linear_flops = n * self.linear_flops_per_token;

         // Compute FLOPs across all layers (dense + sparse may differ).
         let dense_layers = self.first_dense_layers;
         let sparse_layers = self.num_layers - dense_layers;
-        let dense_flops = dense_layers
-            * (linear + self.attn_coeff * n * self.effective_ctx(n, true));
-        let sparse_flops = sparse_layers
-            * (linear + self.attn_coeff * n * self.effective_ctx(n, false));
-        let total_flops = dense_flops + sparse_flops;
-        let compute_time = total_flops / self.gpu_flops;
+        let linear_total_flops = self.num_layers * linear_flops;
+        let dense_attn_flops = dense_layers * (self.attn_coeff * n * self.effective_ctx(n, true));
+        let sparse_attn_flops =
+            sparse_layers * (self.attn_coeff * n * self.effective_ctx(n, false));
+        let attn_total_flops = dense_attn_flops + sparse_attn_flops;
+        let linear_time = linear_total_flops / (self.gpu_flops * self.matmul_util.max(1e-6));
+        let attn_time = attn_total_flops / (self.gpu_flops * self.attention_util.max(1e-6));
+        let compute_time = linear_time + attn_time + self.num_layers * self.misc_layer_overhead_s;

         // Weight stream: all layers' active weights read once from HBM.
-        let mem_time = self.weight_bytes_per_layer * self.num_layers / self.gpu_mem_bw;
+        let mem_time =
+            self.weight_bytes_per_layer * self.num_layers / (self.gpu_mem_bw * self.hbm_bw_util.max(1e-6));
+
+        let tp_comm_time = if self.tp_collective_count_per_layer > 0.0
+            && self.tp_bytes_per_token > 0.0
+            && self.intra_node_tp_bw > 0.0
+        {
+            self.num_layers
+                * (self.tp_collective_count_per_layer * self.intra_node_tp_latency_s
+                    + self.tp_collective_count_per_layer * self.tp_bytes_per_token * n
+                        / (self.intra_node_tp_bw * self.tp_bw_util.max(1e-6)))
+        } else {
+            0.0
+        };
+        let tp_tail = (tp_comm_time - self.tp_overlap_ratio * (linear_time + attn_time)).max(0.0);

-        compute_time.max(mem_time)
+        self.chunk_launch_overhead_s + compute_time.max(mem_time) + tp_tail
     }
/// Print human-readable derived coefficients (for `validate` output). /// Print human-readable derived coefficients (for `validate` output).
pub fn describe(&self) -> String { pub fn describe(&self) -> String {
let pattern_str = match &self.attn_pattern { let pattern_str = match &self.attn_pattern {
AttentionPattern::Dense => "dense".to_string(), AttentionPattern::Dense => "dense".to_string(),
AttentionPattern::SlidingWindow { window } => format!("sliding_window({})", *window as u64), AttentionPattern::SlidingWindow { window } => {
format!("sliding_window({})", *window as u64)
}
AttentionPattern::Dsa { AttentionPattern::Dsa {
dense_window, dense_window,
sparse_stride, sparse_stride,
@@ -266,8 +337,7 @@ impl ComputeModel {
format!( format!(
"linear_flops/tok/layer={:.3e}, attn_coeff={:.0}, pattern={}, \ "linear_flops/tok/layer={:.3e}, attn_coeff={:.0}, pattern={}, \
weight_bytes/layer={:.2e}", weight_bytes/layer={:.2e}",
self.linear_flops_per_token, self.attn_coeff, pattern_str, self.linear_flops_per_token, self.attn_coeff, pattern_str, self.weight_bytes_per_layer,
self.weight_bytes_per_layer,
) )
} }
} }
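In symbols, the per-chunk estimate assembled in the prefill_time hunk above works out to the following, writing L for num_layers (L_d of them dense), f_lin for linear_flops_per_token, c_attn for attn_coeff, F for gpu_flops, W for weight_bytes_per_layer, B for the HBM/TP bandwidths, u for the calibration utilizations, k for tp_collective_count_per_layer, b for tp_bytes_per_token, and rho for tp_overlap_ratio:

```latex
\begin{aligned}
T_{\text{lin}} &= \frac{L\, n\, f_{\text{lin}}}{F\, u_{\text{mm}}}, \qquad
T_{\text{attn}} = \frac{c_{\text{attn}}\, n \left[\, L_d\, \mathrm{ctx}_d(n) + (L - L_d)\, \mathrm{ctx}_s(n) \,\right]}{F\, u_{\text{attn}}} \\
T_{\text{comp}} &= T_{\text{lin}} + T_{\text{attn}} + L\, t_{\text{misc}}, \qquad
T_{\text{mem}} = \frac{L\, W}{B_{\text{hbm}}\, u_{\text{hbm}}}, \qquad
T_{\text{tp}} = L\, k \left( t_{\text{tp}} + \frac{b\, n}{B_{\text{tp}}\, u_{\text{tp}}} \right) \\
T_{\text{chunk}}(n) &= t_{\text{launch}} + \max\bigl(T_{\text{comp}},\, T_{\text{mem}}\bigr) + \max\bigl(0,\; T_{\text{tp}} - \rho\,(T_{\text{lin}} + T_{\text{attn}})\bigr)
\end{aligned}
```

So TP communication only adds a tail when it cannot hide behind the matmul and attention time, which is exactly what the tp_communication_adds_tail_when_overlap_is_limited test below exercises.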
@@ -275,6 +345,7 @@ impl ComputeModel {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::config::CalibrationConfig;

     fn cm_legacy() -> ComputeModel {
         ComputeModel {
@@ -284,8 +355,19 @@ mod tests {
attn_coeff: 1024.0, attn_coeff: 1024.0,
attn_pattern: AttentionPattern::Dense, attn_pattern: AttentionPattern::Dense,
weight_bytes_per_layer: 0.0, weight_bytes_per_layer: 0.0,
tp_bytes_per_token: 0.0,
tp_collective_count_per_layer: 0.0,
gpu_flops: 9.89e14, gpu_flops: 9.89e14,
gpu_mem_bw: 3.35e12, gpu_mem_bw: 3.35e12,
intra_node_tp_bw: 9.0e11,
intra_node_tp_latency_s: 2.0e-6,
matmul_util: 1.0,
attention_util: 1.0,
hbm_bw_util: 1.0,
tp_bw_util: 1.0,
tp_overlap_ratio: 1.0,
misc_layer_overhead_s: 0.0,
chunk_launch_overhead_s: 0.0,
} }
} }
@@ -325,8 +407,19 @@ mod tests {
attn_coeff: 139264.0, attn_coeff: 139264.0,
attn_pattern: AttentionPattern::Dense, attn_pattern: AttentionPattern::Dense,
weight_bytes_per_layer: 0.0, weight_bytes_per_layer: 0.0,
tp_bytes_per_token: 0.0,
tp_collective_count_per_layer: 0.0,
gpu_flops: 1.8e16, gpu_flops: 1.8e16,
gpu_mem_bw: 6.4e13, gpu_mem_bw: 6.4e13,
intra_node_tp_bw: 9.0e11,
intra_node_tp_latency_s: 2.0e-6,
matmul_util: 1.0,
attention_util: 1.0,
hbm_bw_util: 1.0,
tp_bw_util: 1.0,
tp_overlap_ratio: 1.0,
misc_layer_overhead_s: 0.0,
chunk_launch_overhead_s: 0.0,
}; };
let dsa = ComputeModel { let dsa = ComputeModel {
attn_pattern: AttentionPattern::Dsa { attn_pattern: AttentionPattern::Dsa {
@@ -358,8 +451,19 @@ mod tests {
attn_coeff: 1.0, attn_coeff: 1.0,
attn_pattern: AttentionPattern::Dense, attn_pattern: AttentionPattern::Dense,
weight_bytes_per_layer: 1.0e12, // 1 TB per layer weight_bytes_per_layer: 1.0e12, // 1 TB per layer
tp_bytes_per_token: 0.0,
tp_collective_count_per_layer: 0.0,
gpu_flops: 1.0e15, gpu_flops: 1.0e15,
gpu_mem_bw: 1.0e12, gpu_mem_bw: 1.0e12,
intra_node_tp_bw: 9.0e11,
intra_node_tp_latency_s: 2.0e-6,
matmul_util: 1.0,
attention_util: 1.0,
hbm_bw_util: 1.0,
tp_bw_util: 1.0,
tp_overlap_ratio: 1.0,
misc_layer_overhead_s: 0.0,
chunk_launch_overhead_s: 0.0,
}; };
let t1 = m.prefill_time(1); let t1 = m.prefill_time(1);
let t8 = m.prefill_time(8); let t8 = m.prefill_time(8);
@@ -391,18 +495,122 @@ mod tests {
             gpu_mem_bw: 1e12,
             hbm_bytes: 1e9,
             dram_bytes: 4e9,
+            host_dram_bw: 5.0e11,
             pcie_bw: 32e9,
             pcie_latency_us: 1.0,
             rdma_bw: 12e9,
             rdma_latency_us: 5.0,
+            intra_node_tp_bw: 9.0e11,
+            intra_node_tp_latency_us: 2.0,
+            tp_degree: 1,
             max_batch_slots: 32,
             prefill_chunk_tokens: 1024,
         };
-        let cm = ComputeModel::new(&model, &hw);
+        let cm = ComputeModel::new(&model, &hw, &CalibrationConfig::default());
         assert!(cm.linear_flops_per_token > 0.0);
         assert!(cm.attn_coeff > 0.0);
         assert!(cm.weight_bytes_per_layer > 0.0);
         let t = cm.prefill_time(1024);
         assert!(t > 0.0);
     }
#[test]
fn lower_utilization_increases_prefill_time() {
let model = ModelConfig {
name: "test".into(),
num_layers: 8,
num_kv_heads: 4,
head_dim: 128,
dtype_bytes: 2,
block_size_tokens: 16,
hidden_size: Some(1024),
num_attention_heads: Some(8),
intermediate_size: Some(4096),
..Default::default()
};
let hw = HardwareConfig {
gpu_flops: 1e14,
gpu_fp8_flops: 0.0,
gpu_fp4_flops: 0.0,
gpu_mem_bw: 1e12,
hbm_bytes: 1e9,
dram_bytes: 4e9,
host_dram_bw: 5.0e11,
pcie_bw: 32e9,
pcie_latency_us: 1.0,
rdma_bw: 12e9,
rdma_latency_us: 5.0,
intra_node_tp_bw: 9.0e11,
intra_node_tp_latency_us: 2.0,
tp_degree: 1,
max_batch_slots: 32,
prefill_chunk_tokens: 1024,
};
let fast = ComputeModel::new(&model, &hw, &CalibrationConfig::default());
let slow = ComputeModel::new(
&model,
&hw,
&CalibrationConfig {
matmul_util: 0.2,
attention_util: 0.15,
..CalibrationConfig::default()
},
);
assert!(slow.prefill_time(4096) > fast.prefill_time(4096));
}
#[test]
fn tp_communication_adds_tail_when_overlap_is_limited() {
let model = ModelConfig {
name: "test".into(),
num_layers: 8,
num_kv_heads: 4,
head_dim: 128,
dtype_bytes: 2,
block_size_tokens: 16,
hidden_size: Some(2048),
num_attention_heads: Some(16),
intermediate_size: Some(8192),
..Default::default()
};
let hw = HardwareConfig {
gpu_flops: 1e14,
gpu_fp8_flops: 0.0,
gpu_fp4_flops: 0.0,
gpu_mem_bw: 1e12,
hbm_bytes: 1e9,
dram_bytes: 4e9,
host_dram_bw: 5.0e11,
pcie_bw: 32e9,
pcie_latency_us: 1.0,
rdma_bw: 12e9,
rdma_latency_us: 5.0,
intra_node_tp_bw: 1.0e10,
intra_node_tp_latency_us: 20.0,
tp_degree: 8,
max_batch_slots: 32,
prefill_chunk_tokens: 1024,
};
let no_tp = ComputeModel::new(
&model,
&hw,
&CalibrationConfig {
tp_overlap_ratio: 1.0,
tp_bw_util: 1.0,
..CalibrationConfig::default()
},
);
let tp_tail = ComputeModel::new(
&model,
&hw,
&CalibrationConfig {
tp_overlap_ratio: 0.0,
tp_bw_util: 0.2,
..CalibrationConfig::default()
},
);
assert!(tp_tail.prefill_time(2048) > no_tp.prefill_time(2048));
}
} }

View File

@@ -19,7 +19,7 @@
 use std::collections::VecDeque;

-use crate::config::{HardwareConfig, ModelConfig};
+use crate::config::{CalibrationConfig, HardwareConfig, ModelConfig};
 use crate::instance::compute::ComputeModel;
 use crate::instance::kv_cache::TwoTierCache;
 use crate::network::InstanceLinks;
@@ -37,6 +37,8 @@ pub struct AdmittedRequest {
     /// KV blocks reserved on this instance's HBM for the lifetime of this
     /// request's prefill (= number of input blocks).
     pub reserved_blocks: u32,
+    /// Tail latency between prefill completion and first-token visibility.
+    pub completion_tail_s: f64,
 }
#[derive(Debug)] #[derive(Debug)]
@@ -68,15 +70,20 @@ pub struct Instance {
 }

 impl Instance {
-    pub fn new(id: InstanceId, model: &ModelConfig, hw: &HardwareConfig) -> Self {
+    pub fn new(
+        id: InstanceId,
+        model: &ModelConfig,
+        hw: &HardwareConfig,
+        calib: &CalibrationConfig,
+    ) -> Self {
         let block_bytes = model.kv_block_bytes() as f64;
         let hbm_blocks = (hw.hbm_bytes / block_bytes).max(1.0) as u32;
         let dram_blocks = (hw.dram_bytes / block_bytes).max(1.0) as u32;
         Self {
             id,
             cache: TwoTierCache::new(hbm_blocks as usize, dram_blocks as usize),
-            links: InstanceLinks::from_hw(hw),
-            compute: ComputeModel::new(model, hw),
+            links: InstanceLinks::from_hw(hw, calib),
+            compute: ComputeModel::new(model, hw, calib),
             block_size_tokens: model.block_size_tokens,
             hbm_block_budget: hbm_blocks,
             dram_block_budget: dram_blocks,
@@ -141,9 +148,10 @@ impl Instance {
                 self.kv_blocks_used += r.reserved_blocks;
                 if r.prefill_tokens_remaining == 0 {
                     // Full cache hit: nothing to compute. TTFT == fetch time.
-                    let ttft = now - r.arrival;
+                    let t_done = now + r.completion_tail_s;
+                    let ttft = t_done - r.arrival;
                     self.kv_blocks_used = self.kv_blocks_used.saturating_sub(r.reserved_blocks);
-                    completed.push((r.req_id, ttft, now));
+                    completed.push((r.req_id, ttft, t_done));
                 } else {
                     self.prefilling.push_back(r);
                 }
@@ -171,9 +179,10 @@ impl Instance {
         head.prefill_tokens_remaining -= chunk_tokens;
         if head.prefill_tokens_remaining == 0 {
             let done = self.prefilling.pop_front().unwrap();
-            let ttft = t_end - done.arrival;
+            let t_done = t_end + done.completion_tail_s;
+            let ttft = t_done - done.arrival;
             self.kv_blocks_used = self.kv_blocks_used.saturating_sub(done.reserved_blocks);
-            completed.push((done.req_id, ttft, t_end));
+            completed.push((done.req_id, ttft, t_done));
         }

         StepResult {
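A quick numeric sanity check of the tail handling above, with invented times; the 10 µs default is default_first_token_ready_us from the new CalibrationConfig:

```rust
// Toy numbers only: a request arrives at t = 1.500 s and finishes its last
// prefill chunk at t_end = 1.730 s. With the change above, TTFT is measured
// to t_done = t_end + completion_tail_s rather than to t_end.
fn main() {
    let arrival = 1.500_f64;
    let t_end = 1.730_f64;
    let completion_tail_s = 10.0e-6; // default_first_token_ready_us() = 10 us
    let t_done = t_end + completion_tail_s;
    let ttft = t_done - arrival;
    println!("ttft = {:.6} s (completion reported at {:.6} s)", ttft, t_done);
}
```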

View File

@@ -1,6 +1,6 @@
 pub mod compute;
-pub mod kv_cache;
 #[allow(clippy::module_inception)]
 pub mod instance;
+pub mod kv_cache;

 pub use instance::Instance;

View File

@@ -10,5 +10,6 @@ pub mod oracle;
 pub mod replay;
 pub mod router;
 pub mod sim;
+pub mod ttft;
 pub mod trace;
 pub mod types;

View File

@@ -219,7 +219,7 @@ fn cmd_validate(path: &PathBuf, overrides: &ConfigOverrides) -> Result<()> {
             "legacy manual"
         }
     );
-    let cm = ComputeModel::new(&cfg.model, &cfg.hardware);
+    let cm = ComputeModel::new(&cfg.model, &cfg.hardware, &cfg.calibration);
     eprintln!("compute: {}", cm.describe());
     eprintln!(
         "kv_block_bytes = {} ({:.2} MB{})",

View File

@@ -12,7 +12,9 @@ pub struct RoutingLogWriter {
impl RoutingLogWriter { impl RoutingLogWriter {
pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> { pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
let f = File::create(path)?; let f = File::create(path)?;
Ok(Self { inner: BufWriter::new(f) }) Ok(Self {
inner: BufWriter::new(f),
})
} }
pub fn write(&mut self, decision: &RouteDecision) -> Result<()> { pub fn write(&mut self, decision: &RouteDecision) -> Result<()> {

View File

@@ -19,7 +19,9 @@ pub struct TimeseriesWriter {
impl TimeseriesWriter { impl TimeseriesWriter {
pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> { pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
let f = std::fs::File::create(path)?; let f = std::fs::File::create(path)?;
Ok(Self { inner: csv::Writer::from_writer(f) }) Ok(Self {
inner: csv::Writer::from_writer(f),
})
} }
pub fn write(&mut self, row: &TimeseriesRow) -> Result<()> { pub fn write(&mut self, row: &TimeseriesRow) -> Result<()> {

View File

@@ -5,7 +5,7 @@
 //! by `bytes / bw`. Latency is added on top of transfer time. This captures
 //! contention without simulating individual packets.

-use crate::config::HardwareConfig;
+use crate::config::{CalibrationConfig, HardwareConfig};

 #[derive(Debug, Clone)]
 pub struct LinkModel {
@@ -53,10 +53,10 @@ pub struct InstanceLinks {
 }

 impl InstanceLinks {
-    pub fn from_hw(hw: &HardwareConfig) -> Self {
+    pub fn from_hw(hw: &HardwareConfig, calib: &CalibrationConfig) -> Self {
         Self {
-            pcie: LinkModel::new(hw.pcie_bw, hw.pcie_latency_us * 1e-6),
-            rdma: LinkModel::new(hw.rdma_bw, hw.rdma_latency_us * 1e-6),
+            pcie: LinkModel::new(hw.pcie_bw * calib.pcie_bw_util, hw.pcie_latency_us * 1e-6),
+            rdma: LinkModel::new(hw.rdma_bw * calib.rdma_bw_util, hw.rdma_latency_us * 1e-6),
         }
     }
 }
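For intuition, this is roughly what the derated link amounts to. The numbers are the defaults used elsewhere in this diff, and the arithmetic mirrors the module's stated model (latency added on top of bytes / bandwidth) rather than calling LinkModel itself:

```rust
// Back-of-envelope check of the bandwidth derating above, with assumed sizes:
// LinkModel::new(bw, latency_s) is now fed pcie_bw * pcie_bw_util as its rate.
fn main() {
    let pcie_bw = 32.0e9_f64;         // bytes/s, peak
    let pcie_bw_util = 0.8;           // default_pcie_bw_util()
    let pcie_latency_s = 1.0e-6;
    let bytes = 64.0 * 1024.0 * 1024.0; // a 64 MiB KV pull
    let transfer_s = pcie_latency_s + bytes / (pcie_bw * pcie_bw_util);
    println!("pcie transfer ~ {:.3} ms", transfer_s * 1e3);
}
```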

View File

@@ -51,7 +51,11 @@ impl TierResult {
capacity_blocks, capacity_blocks,
hits, hits,
misses, misses,
hit_rate: if total == 0 { 0.0 } else { hits as f64 / total as f64 }, hit_rate: if total == 0 {
0.0
} else {
hits as f64 / total as f64
},
} }
} }
} }
@@ -68,12 +72,7 @@ pub fn analyze(records: &[RequestRecord], capacity_blocks: u64) -> OracleResult
// 1. Unlimited cache // 1. Unlimited cache
let unlimited_hits = run_unlimited(records); let unlimited_hits = run_unlimited(records);
let unlimited = TierResult::from_counts( let unlimited = TierResult::from_counts("unlimited", u64::MAX, unlimited_hits, total_blocks);
"unlimited",
u64::MAX,
unlimited_hits,
total_blocks,
);
// 2. Precompute next-use index for Belady // 2. Precompute next-use index for Belady
let next_use = build_next_use(records); let next_use = build_next_use(records);

View File

@@ -0,0 +1,217 @@
//! Cache-affinity routing tuned for coding-agent workloads.
//!
//! Motivation — the coding trace has three dominant patterns:
//!
//! 1. **Short system-prompt-only requests** (≤10 blocks): novel per-chat but
//! sharing a small set of system prompts across millions of invocations.
//! 2. **Long multi-turn chains**: parent→child prefixes share ~60+ blocks
//! and grow by ~6 blocks per turn. Sticking the chain to one instance
//! maximises L0 hits for every subsequent turn.
//! 3. **Completely novel one-shots**: no existing cache anywhere; should be
//! placed to maximise *future* reuse, not just minimise current load.
//!
//! `cache_score` minimises `α·queue_len + β·miss_blocks`. With the shipping
//! defaults (α=1, β=0.1) a single extra queue position is worth ten extra
//! miss blocks, so short novel requests — the bulk of traffic — reduce to
//! pure least-loaded routing and scatter the same system prompt across
//! dozens of instances. Each scattered copy burns HBM that could have held a
//! different hot prefix, depressing the cluster-wide L0 hit-rate.
//!
//! `cache_affinity` fixes this with two changes:
//!
//! * **Strong cache weight** — cost is `α·queue_len − γ·l0_hit`, with
//! γ ≫ α·input_blocks, so any real L0 hit beats load-balancing. A soft
//! bonus (`δ·meta_only_hit`) still rewards instances that have the prefix
//! in L1/DRAM even when L0 is empty.
//!
//! * **Deterministic rendezvous tiebreak** — among instances that tie on
//! `(cost, hit, queue)`, we rank by `rendezvous(fingerprint, instance_id)`
//! where `fingerprint` is an FNV hash of the first few block hashes. This
//! turns cold routing from "first-found" (which piles on instance 0 until
//! it fills, then spills sequentially) into a consistent hash that maps
//! each distinct prefix to the *same* small set of homes. Repeat traffic
//! for that prefix therefore concentrates on its home, building a strong
//! L0 working set.
//!
//! Overload protection: if the rendezvous-chosen home already has
//! `queue_len > overload_threshold`, the load term dominates and the router
//! naturally spills to the next-best instance.
use crate::cluster::meta_store::MetaStore;
use crate::instance::Instance;
use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
use crate::trace::RequestRecord;
pub struct CacheAffinityRouter {
/// Router display / trace name.
name: &'static str,
/// Weight on queue length (per queued request).
load_alpha: f64,
/// Reward per L0-hit block (real, locally cached).
l0_gamma: f64,
/// Reward per block present via meta-store but not in L0 (L1 / remote).
meta_delta: f64,
/// Number of leading block hashes folded into the prefix fingerprint.
fingerprint_k: usize,
/// Whether to break ties by rendezvous hash (sticky consistent placement)
/// or by first-found order (matches cache_score behaviour).
use_rendezvous: bool,
}
impl CacheAffinityRouter {
pub fn new(load_alpha: f64) -> Self {
Self {
name: "cache_affinity",
load_alpha,
l0_gamma: 1.0,
meta_delta: 0.25,
fingerprint_k: 4,
use_rendezvous: true,
}
}
/// Ablation: cache_score-style weights (γ=0.1, δ=0) but keep rendezvous
/// tiebreak. Isolates the contribution of deterministic sticky placement.
pub fn weak_with_rendezvous(load_alpha: f64) -> Self {
Self {
name: "cache_affinity_weak_rend",
load_alpha,
l0_gamma: 0.1,
meta_delta: 0.0,
fingerprint_k: 4,
use_rendezvous: true,
}
}
/// Ablation: strong cache weights (γ=1.0, δ=0.25) but first-found tiebreak
/// instead of rendezvous. Isolates the contribution of reweighting alone.
pub fn strong_no_rendezvous(load_alpha: f64) -> Self {
Self {
name: "cache_affinity_strong_only",
load_alpha,
l0_gamma: 1.0,
meta_delta: 0.25,
fingerprint_k: 4,
use_rendezvous: false,
}
}
/// FNV-1a over the first `k` block hashes — identifies the prefix family
/// (system-prompt + early agent context) that drives cache reuse.
fn fingerprint(hash_ids: &[u64], k: usize) -> u64 {
// An empty request still yields a deterministic fingerprint: the FNV offset basis.
let take = hash_ids.len().min(k.max(1));
let mut fp: u64 = 0xcbf29ce484222325;
for &h in &hash_ids[..take] {
fp ^= h;
fp = fp.wrapping_mul(0x100000001b3);
}
fp
}
/// Splitmix64-style rendezvous score for (fingerprint, instance_id).
/// Uniform over u64; higher = preferred home.
fn rendezvous(fp: u64, instance_id: u32) -> u64 {
let mut h = fp ^ (instance_id as u64).wrapping_mul(0x9e3779b97f4a7c15);
h = h.wrapping_add(0x9e3779b97f4a7c15);
h = (h ^ (h >> 30)).wrapping_mul(0xbf58476d1ce4e5b9);
h = (h ^ (h >> 27)).wrapping_mul(0x94d049bb133111eb);
h ^ (h >> 31)
}
}
impl Router for CacheAffinityRouter {
fn name(&self) -> &'static str {
self.name
}
fn route(
&mut self,
req: &RequestRecord,
instances: &[Instance],
meta: &MetaStore,
now: f64,
) -> RouteDecision {
let n = instances.len();
let l0 = local_l0_scores(req, instances);
// Meta-store predicted prefix — includes L1/remote-reachable blocks.
let meta_scores = meta.score_prefix(&req.hash_ids, now, n);
let fp = Self::fingerprint(&req.hash_ids, self.fingerprint_k);
let mut candidates = Vec::with_capacity(n);
let mut best_idx: usize = 0;
let mut best_cost = f64::INFINITY;
let mut best_hit = 0u32;
let mut best_queue = u32::MAX;
let mut best_rend: u64 = 0;
for (i, inst) in instances.iter().enumerate() {
let hit = l0[i];
// meta_only = extra blocks reachable by RDMA/L1 beyond L0 hit.
let meta_only = meta_scores[i].saturating_sub(hit);
let q = inst.queue_len();
// Cost to minimise — lower is better.
// load term: α · queue_len
// cache term: -γ · l0_hit - δ · meta_only
// Short novel prefixes yield hit=0 on every instance, so cost
// reduces to α·q and the rendezvous tiebreak picks the home.
let cost = self.load_alpha * q as f64
- self.l0_gamma * hit as f64
- self.meta_delta * meta_only as f64;
let rend = Self::rendezvous(fp, inst.id);
candidates.push(CandidateInfo {
instance: inst.id,
predicted_prefix: hit,
load_blocks: inst.kv_blocks_used,
queue_len: q,
});
// Tiebreak chain (descending preference):
// 1. lowest cost
// 2. highest hit (break cost ties toward real L0 work)
// 3. lowest queue
// 4. highest rendezvous (deterministic sticky home), optional
let better = if cost < best_cost {
true
} else if cost > best_cost {
false
} else if hit > best_hit {
true
} else if hit < best_hit {
false
} else if q < best_queue {
true
} else if q > best_queue {
false
} else if self.use_rendezvous {
rend > best_rend
} else {
// First-found wins on full tie (matches cache_score behaviour).
false
};
if better {
best_cost = cost;
best_hit = hit;
best_queue = q;
best_rend = rend;
best_idx = i;
}
}
RouteDecision {
req_id: req.req_id,
mode: "cache_affinity",
chosen: instances[best_idx].id,
probe_overhead_s: 0.0,
candidates,
reason: "argmin(α·q γ·l0_hit δ·meta_only) + rendezvous tiebreak",
}
}
}
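
The sketch below (not part of either commit) shows how the cost rule and the rendezvous tiebreak interact on a toy three-instance cluster. It repeats the splitmix64 mixer from the router above so it runs standalone; α, γ, δ follow the defaults described in the module docs, and the per-instance stats are invented.

```rust
// Standalone illustration of the cache_affinity cost rule; all numbers are made up.
fn rendezvous(fp: u64, instance_id: u32) -> u64 {
    let mut h = fp ^ (instance_id as u64).wrapping_mul(0x9e3779b97f4a7c15);
    h = h.wrapping_add(0x9e3779b97f4a7c15);
    h = (h ^ (h >> 30)).wrapping_mul(0xbf58476d1ce4e5b9);
    h = (h ^ (h >> 27)).wrapping_mul(0x94d049bb133111eb);
    h ^ (h >> 31)
}

fn main() {
    let (alpha, gamma, delta) = (1.0_f64, 1.0_f64, 0.25_f64);
    // Per-instance (queue_len, l0_hit_blocks, meta_only_blocks), invented for the example.
    let stats = [(3u32, 8u32, 0u32), (0, 0, 0), (0, 0, 0)];
    let fp = 0xcbf29ce484222325_u64; // fingerprint of some prefix family

    for (id, &(q, hit, meta_only)) in stats.iter().enumerate() {
        let cost = alpha * q as f64 - gamma * hit as f64 - delta * meta_only as f64;
        println!("inst {id}: cost {cost:+.2}, rendezvous {:#018x}", rendezvous(fp, id as u32));
    }
    // Instance 0 wins outright (cost 3 - 8 = -5): an 8-block L0 hit outweighs
    // three queued requests. Instances 1 and 2 tie at cost 0; on a completely
    // cold prefix the higher rendezvous score picks the sticky home between them.
}
```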

View File

@@ -13,7 +13,7 @@
use crate::cluster::meta_store::MetaStore; use crate::cluster::meta_store::MetaStore;
use crate::instance::Instance; use crate::instance::Instance;
use crate::router::{CandidateInfo, RouteDecision, Router}; use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
use crate::trace::RequestRecord; use crate::trace::RequestRecord;
pub struct CacheLoadRouter; pub struct CacheLoadRouter;
@@ -39,11 +39,11 @@ impl Router for CacheLoadRouter {
&mut self, &mut self,
req: &RequestRecord, req: &RequestRecord,
instances: &[Instance], instances: &[Instance],
meta: &MetaStore, _meta: &MetaStore,
now: f64, _now: f64,
) -> RouteDecision { ) -> RouteDecision {
let n = instances.len(); let n = instances.len();
let scores = meta.score_prefix(&req.hash_ids, now, n); let scores = local_l0_scores(req, instances);
// Step 1: least-loaded 1/4 of instances (by queue_len). // Step 1: least-loaded 1/4 of instances (by queue_len).
let pool_size = (n / 4).max(2).min(n); let pool_size = (n / 4).max(2).min(n);
@@ -83,7 +83,7 @@ impl Router for CacheLoadRouter {
chosen: instances[best_idx].id, chosen: instances[best_idx].id,
probe_overhead_s: 0.0, probe_overhead_s: 0.0,
candidates, candidates,
reason: "least-loaded 1/4, then best prefix", reason: "least-loaded 1/4, then best local L0 prefix",
} }
} }
} }

View File

@@ -32,7 +32,7 @@
use crate::cluster::meta_store::MetaStore; use crate::cluster::meta_store::MetaStore;
use crate::instance::Instance; use crate::instance::Instance;
use crate::router::{CandidateInfo, RouteDecision, Router}; use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
use crate::trace::RequestRecord; use crate::trace::RequestRecord;
pub struct CacheScoreRouter { pub struct CacheScoreRouter {
@@ -55,11 +55,11 @@ impl Router for CacheScoreRouter {
&mut self, &mut self,
req: &RequestRecord, req: &RequestRecord,
instances: &[Instance], instances: &[Instance],
meta: &MetaStore, _meta: &MetaStore,
now: f64, _now: f64,
) -> RouteDecision { ) -> RouteDecision {
let n = instances.len(); let n = instances.len();
let scores = meta.score_prefix(&req.hash_ids, now, n); let scores = local_l0_scores(req, instances);
let input_blocks = req.hash_ids.len() as f64; let input_blocks = req.hash_ids.len() as f64;
let mut best_idx: usize = 0; let mut best_idx: usize = 0;

View File

@@ -0,0 +1,86 @@
//! Cache-score routing using TTL meta-store prefix predictions.
//!
//! This keeps the same scoring rule as `cache_score`:
//!
//! ```text
//! exponent_i = alpha * queue_len_i + beta * miss_i
//! ```
//!
//! The only difference is that `miss_i` is computed from the global TTL
//! meta-store prefix score instead of the real local-L0 prefix.
use crate::cluster::meta_store::MetaStore;
use crate::instance::Instance;
use crate::router::{CandidateInfo, RouteDecision, Router};
use crate::trace::RequestRecord;
pub struct CacheScoreTtlRouter {
alpha: f64,
beta: f64,
}
impl CacheScoreTtlRouter {
pub fn new(alpha: f64, beta: f64) -> Self {
Self { alpha, beta }
}
}
impl Router for CacheScoreTtlRouter {
fn name(&self) -> &'static str {
"cache_score_ttl"
}
fn route(
&mut self,
req: &RequestRecord,
instances: &[Instance],
meta: &MetaStore,
now: f64,
) -> RouteDecision {
let n = instances.len();
let scores = meta.score_prefix(&req.hash_ids, now, n);
let input_blocks = req.hash_ids.len() as f64;
let mut best_idx: usize = 0;
let mut best_exp = f64::INFINITY;
let mut best_queue = u32::MAX;
let mut best_prefix = 0u32;
let mut candidates = Vec::with_capacity(n);
for (i, inst) in instances.iter().enumerate() {
let prefix = scores[i] as f64;
let miss = (input_blocks - prefix).max(0.0);
let q = inst.queue_len() as f64;
let exponent = self.alpha * q + self.beta * miss;
candidates.push(CandidateInfo {
instance: inst.id,
predicted_prefix: scores[i],
load_blocks: inst.kv_blocks_used,
queue_len: inst.queue_len(),
});
let better = exponent < best_exp
|| (exponent == best_exp && inst.queue_len() < best_queue)
|| (exponent == best_exp
&& inst.queue_len() == best_queue
&& scores[i] > best_prefix);
if better {
best_exp = exponent;
best_idx = i;
best_queue = inst.queue_len();
best_prefix = scores[i];
}
}
RouteDecision {
req_id: req.req_id,
mode: "cache_score_ttl",
chosen: instances[best_idx].id,
probe_overhead_s: 0.0,
candidates,
reason: "argmin 2^(alpha*load + beta*meta_store_miss)",
}
}
}
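
A quick worked instance of the rule above, with illustrative numbers (α=1, β=0.1 as in the cache_score defaults, 30 input blocks):

```text
instance 0: queue = 4, meta-store prefix = 28  ->  exponent = 4 + 0.1·2  = 4.2
instance 1: queue = 2, meta-store prefix = 0   ->  exponent = 2 + 0.1·30 = 5.0
```

cache_score_ttl picks instance 0: the 28 predicted-reusable blocks outweigh the two extra queued requests, even though the prediction may be stale.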

View File

@@ -1,57 +1,31 @@
//! First-principles TTFT-optimal routing. //! First-principles TTFT-estimate routing with calibrated compute and
//! tier-aware KV prepare costs.
//! //!
//! Estimates the actual time-to-first-token for each candidate instance: //! Estimates the actual time-to-first-token for each candidate instance:
//! //!
//! `TTFT(r,i) = drain(i) + fetch(r,i) + prefill(miss)` //! `TTFT(r,i) = drain(i) + scheduler + kv_prepare(r,i) + prefill(miss_i) + first_token_tail`
//!
//! - **drain** — exact queue drain time: sum of per-request `prefill_time()`
//! using the architecture-aware compute model (quadratic / DSA).
//!
//! - **fetch** — RDMA fetch time for blocks cached elsewhere in the cluster
//! but not on instance `i` locally.
//!
//! - **prefill** — compute for cluster-wide cache-miss tokens (constant
//! across instances, cancels in the argmin).
//!
//! The router minimises `drain(i) + fetch(r,i)`, with ties broken by
//! lowest `queue_len` then most local cache. The fetch overlap with queue
//! drain is handled by keeping the additive form: this gives double
//! incentive to prefer instances with local cache, which empirically
//! outperforms the `max(drain, fetch)` alternative because even small
//! RDMA savings compound across thousands of routing decisions.
use crate::cluster::meta_store::MetaStore; use crate::cluster::meta_store::MetaStore;
use crate::config::Config; use crate::config::Config;
use crate::instance::Instance; use crate::instance::Instance;
use crate::router::{CandidateInfo, RouteDecision, Router}; use crate::router::{CandidateInfo, RouteDecision, Router};
use crate::trace::RequestRecord; use crate::trace::RequestRecord;
use crate::ttft::{classify_prefix_tiers, TtftModel};
pub struct EstimatedTtftRouter { pub struct EstimatedTtftRouter {
/// Bytes per KV block (for RDMA cost estimation). ttft_model: TtftModel,
kv_block_bytes: f64,
/// RDMA bandwidth in bytes/s.
rdma_bw: f64,
/// RDMA per-transfer latency in seconds.
rdma_latency_s: f64,
} }
impl EstimatedTtftRouter { impl EstimatedTtftRouter {
pub fn new(config: &Config) -> Self { pub fn new(config: &Config) -> Self {
Self { Self {
kv_block_bytes: config.model.kv_block_bytes() as f64, ttft_model: TtftModel::new(
rdma_bw: config.hardware.rdma_bw, &config.hardware,
rdma_latency_s: config.hardware.rdma_latency_us * 1e-6, &config.calibration,
config.model.kv_block_bytes(),
),
} }
} }
/// Estimate RDMA fetch time for `remote_blocks` blocks.
fn fetch_time(&self, remote_blocks: u32) -> f64 {
if remote_blocks == 0 {
return 0.0;
}
let bytes = remote_blocks as f64 * self.kv_block_bytes;
bytes / self.rdma_bw + self.rdma_latency_s
}
} }
impl Router for EstimatedTtftRouter { impl Router for EstimatedTtftRouter {
@@ -66,53 +40,48 @@ impl Router for EstimatedTtftRouter {
meta: &MetaStore, meta: &MetaStore,
now: f64, now: f64,
) -> RouteDecision { ) -> RouteDecision {
let scheduler = self.ttft_model.scheduler_overhead_s(instances.len(), 3);
let n = instances.len(); let n = instances.len();
let scores = meta.score_prefix(&req.hash_ids, now, n);
// Cluster-wide max prefix: blocks reachable via RDMA from any peer.
let cluster_prefix = scores.iter().copied().max().unwrap_or(0);
let mut best: u32 = 0; let mut best: u32 = 0;
let mut best_cost = f64::INFINITY; let mut best_cost = f64::INFINITY;
let mut best_queue = u32::MAX; let mut best_queue = u32::MAX;
let mut best_local = 0u32; let mut best_reuse = 0u32;
let mut candidates = Vec::with_capacity(n); let mut candidates = Vec::with_capacity(n);
for inst in instances { for inst in instances {
let i = inst.id as usize; let residency = classify_prefix_tiers(&req.hash_ids, inst, meta, now);
let local_prefix = scores[i];
// 1. Exact queue drain time (architecture-aware, per-request sum). // 1. Exact queue drain time (architecture-aware, per-request sum).
let drain = inst.estimated_drain_time(); let drain = inst.estimated_drain_time();
// 2. RDMA fetch cost for blocks not locally cached. let miss_tokens = residency.miss_blocks.saturating_mul(inst.block_size_tokens);
let remote_blocks = cluster_prefix.saturating_sub(local_prefix); let kv_prepare = self.ttft_model.kv_prepare_time_s(residency);
let fetch = self.fetch_time(remote_blocks); let first_token_tail = self.ttft_model.first_token_tail_s();
let cost =
// Additive cost: drain + fetch. drain + scheduler + kv_prepare + inst.compute.prefill_time(miss_tokens) + first_token_tail;
// The additive form gives explicit incentive to prefer local cache
// (lower fetch) even when the queue is non-empty, which reduces
// total RDMA traffic and improves TTFT in aggregate.
let cost = drain + fetch;
candidates.push(CandidateInfo { candidates.push(CandidateInfo {
instance: inst.id, instance: inst.id,
predicted_prefix: local_prefix, predicted_prefix: residency.l0_hit_blocks
+ residency.l1_hit_blocks
+ residency.remote_hit_blocks,
load_blocks: inst.kv_blocks_used, load_blocks: inst.kv_blocks_used,
queue_len: inst.queue_len(), queue_len: inst.queue_len(),
}); });
// Minimise (cost, queue_len, -local_prefix). // Minimise (cost, queue_len, -local_prefix).
let ql = inst.queue_len(); let ql = inst.queue_len();
let reusable = residency.l0_hit_blocks + residency.l1_hit_blocks + residency.remote_hit_blocks;
let better = cost < best_cost let better = cost < best_cost
|| (cost == best_cost && ql < best_queue) || (cost == best_cost && ql < best_queue)
|| (cost == best_cost && ql == best_queue && local_prefix > best_local); || (cost == best_cost && ql == best_queue && reusable > best_reuse);
if better { if better {
best_cost = cost; best_cost = cost;
best = inst.id; best = inst.id;
best_queue = ql; best_queue = ql;
best_local = local_prefix; best_reuse = reusable;
} }
} }
@@ -122,7 +91,7 @@ impl Router for EstimatedTtftRouter {
chosen: best, chosen: best,
probe_overhead_s: 0.0, probe_overhead_s: 0.0,
candidates, candidates,
reason: "argmin(drain_time + fetch_time)", reason: "argmin(drain + scheduler + kv_prepare + prefill + first_token_tail)",
} }
} }
} }
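
For intuition, one hypothetical decomposition of the new objective for a single candidate (every number below is illustrative, not measured):

```text
drain             = 38.0 ms   sum of queued requests' prefill_time()
scheduler         =  0.2 ms   base + per-candidate + cache-probe + batch-pack
kv_prepare        =  1.5 ms   L1 blocks over DRAM/PCIe + remote blocks over RDMA
prefill(miss)     = 12.0 ms   compute for blocks missing in every tier
first_token_tail  =  0.3 ms   final sync + first-token readiness
TTFT estimate     ≈ 52.0 ms
```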

View File

@@ -29,8 +29,7 @@ impl Router for LeastLoadedRouter {
let mut best_score = f64::INFINITY; let mut best_score = f64::INFINITY;
let mut candidates = Vec::with_capacity(instances.len()); let mut candidates = Vec::with_capacity(instances.len());
for inst in instances { for inst in instances {
let load = inst.kv_blocks_used as f64 let load = inst.kv_blocks_used as f64 + self.alpha * inst.queue_len() as f64;
+ self.alpha * inst.queue_len() as f64;
candidates.push(CandidateInfo { candidates.push(CandidateInfo {
instance: inst.id, instance: inst.id,
predicted_prefix: 0, predicted_prefix: 0,

View File

@@ -1,4 +1,4 @@
//! Minimum P*D routing. //! Minimum P*D routing using real local L0 hits only.
//! //!
//! For each instance compute: //! For each instance compute:
//! - `P` = real prefill tokens this request will do if routed there //! - `P` = real prefill tokens this request will do if routed there
@@ -7,30 +7,18 @@
//! //!
//! Score = `P * D`, pick the instance that minimizes it. //! Score = `P * D`, pick the instance that minimizes it.
//! //!
//! `P` accounts for the **actual** prefill work after the cluster fetch //! `P` accounts only for blocks that miss in the candidate instance's
//! chain runs: the fetch chain serves any block cached anywhere in the //! current L0 cache. L1 / remote reuse may still reduce execution-time
//! cluster (L0 → L1 → remote v6d via RDMA), so prefill compute only runs //! work later in the cluster fetch chain, but they do not count as
//! for blocks that are absent cluster-wide *and* for blocks past the //! `kvcache hit` for routing.
//! instance-local prefix (the cluster only fetches a contiguous leading
//! prefix — any gap ends the fetch chain and the rest must be prefilled).
//! //!
//! Concretely, for instance `i`: //! Concretely, for instance `i`:
//! //!
//! ```text //! ```text
//! local_prefix_i = meta_store.score_prefix(req, now)[i] // blocks //! local_prefix_i = longest L0 prefix on instance i // blocks
//! cluster_prefix = max over all j of meta_store_score[j] // blocks //! P_i = (input_blocks - local_prefix_i) * block_size_tokens
//! effective_prefix_i = min(cluster_prefix, input_blocks)
//! - if local_prefix_i == cluster_prefix the fetch chain stays local,
//! - otherwise the prefill still skips cluster_prefix blocks because
//! the missing tail is fetched via RDMA from a peer.
//! P_i = (input_blocks - effective_prefix_i) * block_size_tokens
//! ``` //! ```
//! //!
//! This makes `P` nearly instance-independent on well-populated clusters
//! (so `min_pd` degenerates to balanced load with a cache-affinity
//! tiebreak), which is exactly what you want when RDMA is cheap relative
//! to prefill compute.
//!
//! Tiebreaks (essential on 128-instance clusters where many instances are //! Tiebreaks (essential on 128-instance clusters where many instances are
//! idle and the raw product collapses to zero): //! idle and the raw product collapses to zero):
//! 1. minimum `P*D` //! 1. minimum `P*D`
@@ -40,7 +28,7 @@
use crate::cluster::meta_store::MetaStore; use crate::cluster::meta_store::MetaStore;
use crate::instance::Instance; use crate::instance::Instance;
use crate::router::{CandidateInfo, RouteDecision, Router}; use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
use crate::trace::RequestRecord; use crate::trace::RequestRecord;
pub struct MinPdRouter; pub struct MinPdRouter;
@@ -66,36 +54,26 @@ impl Router for MinPdRouter {
&mut self, &mut self,
req: &RequestRecord, req: &RequestRecord,
instances: &[Instance], instances: &[Instance],
meta: &MetaStore, _meta: &MetaStore,
now: f64, _now: f64,
) -> RouteDecision { ) -> RouteDecision {
let n = instances.len(); let n = instances.len();
let scores = meta.score_prefix(&req.hash_ids, now, n); let scores = local_l0_scores(req, instances);
let block_size = instances[0].block_size_tokens as u64; let block_size = instances[0].block_size_tokens as u64;
let input_blocks = req.hash_ids.len() as u64; let input_blocks = req.hash_ids.len() as u64;
// Cluster-wide max prefix: longest contiguous prefix that EXISTS
// somewhere in the cluster (and will be fetched via remote RDMA if
// not local). This determines the effective prefill work for every
// candidate, not just the one that owns the blocks.
let cluster_prefix_blocks = scores.iter().copied().max().unwrap_or(0) as u64;
let effective_prefix_blocks = cluster_prefix_blocks.min(input_blocks);
let miss_blocks = input_blocks.saturating_sub(effective_prefix_blocks);
let p_base = miss_blocks.saturating_mul(block_size); // tokens to prefill
let mut candidates = Vec::with_capacity(n); let mut candidates = Vec::with_capacity(n);
let mut best: u32 = instances[0].id; let mut best: u32 = instances[0].id;
// Minimize (P*D, D, -local_prefix). // Minimize (P*D, D, -local_prefix).
// P is nearly instance-independent; D is the real discriminator.
// When tied on D, prefer the instance with the best local prefix
// (avoids the RDMA fetch cost).
let mut best_key: (u128, u64, i64) = (u128::MAX, u64::MAX, i64::MAX); let mut best_key: (u128, u64, i64) = (u128::MAX, u64::MAX, i64::MAX);
for inst in instances { for inst in instances {
let i = inst.id as usize; let i = inst.id as usize;
let d = inst.queue_len() as u64; let d = inst.queue_len() as u64;
let pd = p_base as u128 * d as u128;
let local_prefix = scores[i] as i64; let local_prefix = scores[i] as i64;
let miss_blocks = input_blocks.saturating_sub(scores[i] as u64);
let p = miss_blocks.saturating_mul(block_size);
let pd = p as u128 * d as u128;
candidates.push(CandidateInfo { candidates.push(CandidateInfo {
instance: inst.id, instance: inst.id,
@@ -118,7 +96,7 @@ impl Router for MinPdRouter {
chosen: best, chosen: best,
probe_overhead_s: 0.0, probe_overhead_s: 0.0,
candidates, candidates,
reason: "argmin(P*D), P=cluster-wide miss tokens, D=ongoing reqs", reason: "argmin(P*D), P=local-L0 miss tokens, D=ongoing reqs",
} }
} }
} }
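
A small worked comparison of the revised `P·D` score (numbers invented; block size 16 tokens, 20 input blocks):

```text
instance A: local L0 prefix = 12 blocks, D = 3  ->  P = (20-12)·16 = 128, P·D = 384
instance B: local L0 prefix =  0 blocks, D = 1  ->  P =  20·16     = 320, P·D = 320
```

Instance B wins despite having no cached prefix, because its queue is short enough; had B held 2 ongoing requests instead, its product would rise to 640 and instance A's cache hit would carry the decision.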

View File

@@ -1,7 +1,9 @@
//! Cluster-level routing strategies. //! Cluster-level routing strategies.
pub mod cache_affinity;
pub mod cache_load; pub mod cache_load;
pub mod cache_score; pub mod cache_score;
pub mod cache_score_ttl;
pub mod estimated_ttft; pub mod estimated_ttft;
pub mod least_loaded; pub mod least_loaded;
pub mod least_tokens; pub mod least_tokens;
@@ -48,6 +50,17 @@ pub trait Router: Send {
) -> RouteDecision; ) -> RouteDecision;
} }
pub(crate) fn local_l0_prefix(req: &RequestRecord, inst: &Instance) -> u32 {
inst.cache.l0.longest_prefix_peek(&req.hash_ids) as u32
}
pub(crate) fn local_l0_scores(req: &RequestRecord, instances: &[Instance]) -> Vec<u32> {
instances
.iter()
.map(|inst| local_l0_prefix(req, inst))
.collect()
}
pub fn build(full: &Config, seed: u64) -> Box<dyn Router> { pub fn build(full: &Config, seed: u64) -> Box<dyn Router> {
use crate::config::RouterMode::*; use crate::config::RouterMode::*;
let cfg = &full.cluster.router; let cfg = &full.cluster.router;
@@ -66,10 +79,22 @@ pub fn build(full: &Config, seed: u64) -> Box<dyn Router> {
MinPd => Box::new(min_pd::MinPdRouter::new()) as Box<dyn Router>, MinPd => Box::new(min_pd::MinPdRouter::new()) as Box<dyn Router>,
LeastTokens => Box::new(least_tokens::LeastTokensRouter::new()) as Box<dyn Router>, LeastTokens => Box::new(least_tokens::LeastTokensRouter::new()) as Box<dyn Router>,
CacheLoad => Box::new(cache_load::CacheLoadRouter::new()) as Box<dyn Router>, CacheLoad => Box::new(cache_load::CacheLoadRouter::new()) as Box<dyn Router>,
CacheScore => { CacheAffinity => Box::new(cache_affinity::CacheAffinityRouter::new(cfg.load_alpha))
Box::new(cache_score::CacheScoreRouter::new(cfg.score_alpha, cfg.score_beta)) as Box<dyn Router>,
as Box<dyn Router> CacheAffinityWeakRend => Box::new(
} cache_affinity::CacheAffinityRouter::weak_with_rendezvous(cfg.load_alpha),
) as Box<dyn Router>,
CacheAffinityStrongOnly => Box::new(
cache_affinity::CacheAffinityRouter::strong_no_rendezvous(cfg.load_alpha),
) as Box<dyn Router>,
CacheScore => Box::new(cache_score::CacheScoreRouter::new(
cfg.score_alpha,
cfg.score_beta,
)) as Box<dyn Router>,
CacheScoreTtl => Box::new(cache_score_ttl::CacheScoreTtlRouter::new(
cfg.score_alpha,
cfg.score_beta,
)) as Box<dyn Router>,
EstimatedTtft => { EstimatedTtft => {
Box::new(estimated_ttft::EstimatedTtftRouter::new(full)) as Box<dyn Router> Box::new(estimated_ttft::EstimatedTtftRouter::new(full)) as Box<dyn Router>
} }
@@ -78,3 +103,247 @@ pub fn build(full: &Config, seed: u64) -> Box<dyn Router> {
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{
CalibrationConfig, ClusterConfig, HardwareConfig, MetaStoreConfig, ModelConfig,
RouterConfig, RouterMode, SimConfig,
};
use crate::instance::instance::AdmittedRequest;
use crate::router::cache_load::CacheLoadRouter;
use crate::router::cache_score::CacheScoreRouter;
use crate::router::cache_score_ttl::CacheScoreTtlRouter;
use crate::router::estimated_ttft::EstimatedTtftRouter;
use crate::router::min_pd::MinPdRouter;
use crate::router::precise_aware::PreciseRouter;
use crate::router::prefix_affinity::PrefixAffinityRouter;
use crate::router::ttl_aware::TtlAwareRouter;
use crate::trace::RequestRecord;
fn test_model() -> ModelConfig {
ModelConfig {
name: "test".into(),
num_layers: 4,
num_kv_heads: 2,
head_dim: 64,
dtype_bytes: 2,
block_size_tokens: 16,
flops_per_token_prefill: Some(1.0e9),
attn_quadratic_coeff: Some(64.0),
..Default::default()
}
}
fn test_hardware() -> HardwareConfig {
HardwareConfig {
gpu_flops: 1.0e14,
gpu_fp8_flops: 0.0,
gpu_fp4_flops: 0.0,
gpu_mem_bw: 1.0e12,
hbm_bytes: 1.0e9,
dram_bytes: 4.0e9,
host_dram_bw: 5.0e11,
pcie_bw: 32.0e9,
pcie_latency_us: 1.0,
rdma_bw: 12.0e9,
rdma_latency_us: 5.0,
intra_node_tp_bw: 9.0e11,
intra_node_tp_latency_us: 2.0,
tp_degree: 1,
max_batch_slots: 32,
prefill_chunk_tokens: 1024,
}
}
fn test_config(mode: RouterMode) -> Config {
Config {
model: test_model(),
hardware: test_hardware(),
calibration: CalibrationConfig::default(),
cluster: ClusterConfig {
num_instances: 2,
meta_store: MetaStoreConfig {
ttl_seconds: 1000.0,
},
router: RouterConfig {
mode,
precise_probe_latency_us: 10.0,
precise_probe_topk: 2,
load_alpha: 0.0,
score_alpha: 0.0,
score_beta: 1.0,
prefix_k: 8,
affinity_fan_out: 2,
},
},
sim: SimConfig {
trace_path: String::new(),
max_requests: None,
output_dir: String::new(),
sample_interval_s: 0.0,
seed: 7,
},
}
}
fn make_instances(n: usize) -> Vec<Instance> {
let model = test_model();
let hw = test_hardware();
let calib = CalibrationConfig::default();
(0..n)
.map(|id| Instance::new(id as u32, &model, &hw, &calib))
.collect()
}
fn make_request(hashes: &[u64]) -> RequestRecord {
RequestRecord {
req_id: 1,
chat_id: 0,
arrival: 0.0,
input_len: hashes.len() as u32 * 16,
output_len: 16,
hash_ids: hashes.to_vec(),
}
}
fn insert_l0(inst: &mut Instance, hashes: &[u64]) {
let mut evicted = Vec::new();
inst.cache.l0.insert_blocks(hashes, &mut evicted);
}
fn insert_l1(inst: &mut Instance, hashes: &[u64]) {
let mut evicted = Vec::new();
inst.cache.l1.insert_blocks(hashes, &mut evicted);
}
fn publish_meta(meta: &mut MetaStore, inst_id: u32, hashes: &[u64], now: f64) {
for &h in hashes {
meta.insert(h, inst_id, now);
}
}
fn enqueue_requests(inst: &mut Instance, count: u32, tokens: u32) {
for req_id in 0..count {
inst.admit(AdmittedRequest {
req_id: req_id as u64,
arrival: 0.0,
ready_at: 0.0,
prefill_tokens_remaining: tokens,
reserved_blocks: 0,
completion_tail_s: 0.0,
});
}
}
#[test]
fn precise_uses_real_l0_prefix_not_l1_prefix() {
let req = make_request(&[10, 11, 12]);
let mut instances = make_instances(2);
let mut meta = MetaStore::new(1000.0);
insert_l1(&mut instances[0], &[10, 11, 12]);
publish_meta(&mut meta, 0, &[10, 11, 12], 0.0);
insert_l0(&mut instances[1], &[10, 11]);
let mut router = PreciseRouter::new(2, 10e-6, 0.0);
let decision = router.route(&req, &instances, &meta, 0.0);
assert_eq!(decision.chosen, 1);
}
#[test]
fn ttl_aware_uses_meta_store_scores() {
let req = make_request(&[20, 21, 22]);
let mut instances = make_instances(2);
let mut meta = MetaStore::new(1000.0);
// Instance 0 only looks hot in the meta store; its real L0 prefix is zero.
publish_meta(&mut meta, 0, &[20, 21, 22], 0.0);
// Instance 1 really holds the first two blocks in HBM.
insert_l0(&mut instances[1], &[20, 21]);
let mut ttl = TtlAwareRouter::new(0.0);
assert_eq!(ttl.route(&req, &instances, &meta, 0.0).chosen, 0);
}
#[test]
fn cache_aware_routers_compare_real_l0_not_meta_store_scores() {
let req = make_request(&[20, 21, 22]);
let mut instances = make_instances(2);
let mut meta = MetaStore::new(1000.0);
// Instance 0 only looks hot in the meta store; its real L0 prefix is zero.
publish_meta(&mut meta, 0, &[20, 21, 22], 0.0);
// Instance 1 really holds the first two blocks in HBM.
insert_l0(&mut instances[1], &[20, 21]);
let mut cache_load = CacheLoadRouter::new();
assert_eq!(cache_load.route(&req, &instances, &meta, 0.0).chosen, 1);
let mut cache_score = CacheScoreRouter::new(0.0, 1.0);
assert_eq!(cache_score.route(&req, &instances, &meta, 0.0).chosen, 1);
let mut min_pd = MinPdRouter::new();
assert_eq!(min_pd.route(&req, &instances, &meta, 0.0).chosen, 1);
let cfg = test_config(RouterMode::EstimatedTtft);
let mut est = EstimatedTtftRouter::new(&cfg);
assert_eq!(est.route(&req, &instances, &meta, 0.0).chosen, 1);
}
#[test]
fn cache_score_ttl_uses_meta_store_prefix() {
let req = make_request(&[50, 51, 52]);
let mut instances = make_instances(2);
let mut meta = MetaStore::new(1000.0);
// Instance 0 only looks hot in the meta store.
publish_meta(&mut meta, 0, &[50, 51, 52], 0.0);
// Instance 1 has the true local L0 prefix.
insert_l0(&mut instances[1], &[50, 51]);
let mut cache_score = CacheScoreRouter::new(0.0, 1.0);
assert_eq!(cache_score.route(&req, &instances, &meta, 0.0).chosen, 1);
let mut cache_score_ttl = CacheScoreTtlRouter::new(0.0, 1.0);
assert_eq!(cache_score_ttl.route(&req, &instances, &meta, 0.0).chosen, 0);
}
#[test]
fn prefix_affinity_fallback_uses_real_l0_not_meta_store_scores() {
let req = make_request(&[30, 31, 32]);
let mut instances = make_instances(2);
let mut meta = MetaStore::new(1000.0);
publish_meta(&mut meta, 0, &[30, 31, 32], 0.0);
insert_l0(&mut instances[1], &[30, 31]);
enqueue_requests(&mut instances[0], 5, 64);
enqueue_requests(&mut instances[1], 5, 64);
let cfg = test_config(RouterMode::PrefixAffinity);
let mut router = PrefixAffinityRouter::new(&cfg);
let decision = router.route(&req, &instances, &meta, 0.0);
assert_eq!(decision.reason, "affinity fallback: min(drain+fetch)");
assert_eq!(decision.chosen, 1);
}
#[test]
fn estimated_ttft_can_prefer_full_l1_hit_over_shorter_queue() {
let req = make_request(&[40, 41, 42, 43]);
let mut instances = make_instances(2);
let meta = MetaStore::new(1000.0);
// Instance 0 can satisfy the whole prefix from local DRAM/L1.
insert_l1(&mut instances[0], &[40, 41, 42, 43]);
// Instance 1 has a slightly shorter queue but no reusable prefix.
enqueue_requests(&mut instances[0], 1, 16);
let cfg = test_config(RouterMode::EstimatedTtft);
let mut est = EstimatedTtftRouter::new(&cfg);
assert_eq!(est.route(&req, &instances, &meta, 0.0).chosen, 0);
}
}

View File

@@ -1,28 +1,13 @@
//! KV-aware routing via meta-store candidate selection + precise probing. //! L0-aware routing via exact per-instance probing.
//! //!
//! The global meta store is used as a *candidate pre-filter*: we score //! Every instance is compared using its *real current L0 prefix length*
//! every instance's predicted prefix from the store, take the top-K by //! only. L1 / remote availability can still reduce execution-time misses
//! (predicted_prefix DESC, load ASC), and then exact-probe those K //! later in the cluster fetch chain, but they do not count as `kvcache hit`
//! candidates' actual L0+L1 caches to get the true longest prefix. This //! for routing.
//! catches two cases where the meta store is wrong:
//!
//! - the store is stale (block evicted from L0/L1 but TTL not yet up),
//! - the store undercounts because some blocks' TTL expired individually.
//!
//! Because the candidate set is sourced from the meta store rather than
//! from a load ranking, this router is a strict superset of `ttl_aware`:
//! any instance the meta store would pick is a candidate here, and the
//! exact probe can only move the decision toward a truthfully-better
//! instance. Each probe adds `probe_latency_s` to the request's
//! effective arrival time.
//!
//! If the meta store returns zero-prefix for every instance (e.g. cold
//! start, or a request whose blocks have never been seen), we fall back
//! to the top-K least-loaded instances so we still place the request.
use crate::cluster::meta_store::MetaStore; use crate::cluster::meta_store::MetaStore;
use crate::instance::Instance; use crate::instance::Instance;
use crate::router::{CandidateInfo, RouteDecision, Router}; use crate::router::{local_l0_prefix, CandidateInfo, RouteDecision, Router};
use crate::trace::RequestRecord; use crate::trace::RequestRecord;
pub struct PreciseRouter { pub struct PreciseRouter {
@@ -33,7 +18,11 @@ pub struct PreciseRouter {
impl PreciseRouter { impl PreciseRouter {
pub fn new(topk: u32, probe_latency_s: f64, alpha: f64) -> Self { pub fn new(topk: u32, probe_latency_s: f64, alpha: f64) -> Self {
Self { topk, probe_latency_s, alpha } Self {
topk,
probe_latency_s,
alpha,
}
} }
fn load_of(&self, inst: &Instance) -> f64 { fn load_of(&self, inst: &Instance) -> f64 {
@@ -50,50 +39,15 @@ impl Router for PreciseRouter {
&mut self, &mut self,
req: &RequestRecord, req: &RequestRecord,
instances: &[Instance], instances: &[Instance],
meta: &MetaStore, _meta: &MetaStore,
now: f64, _now: f64,
) -> RouteDecision { ) -> RouteDecision {
let n = instances.len(); let n = instances.len();
let k = (self.topk as usize).min(n).max(1); let mut candidates = Vec::with_capacity(n);
let mut best = instances[0].id;
// 1. Meta-store candidate set: rank all instances by
// (predicted_prefix DESC, load ASC) and take the top-K.
let meta_scores = meta.score_prefix(&req.hash_ids, now, n);
let any_meta_hit = meta_scores.iter().any(|&p| p > 0);
let mut ranked: Vec<usize> = (0..n).collect();
if any_meta_hit {
ranked.sort_by(|&a, &b| {
let pa = meta_scores[a];
let pb = meta_scores[b];
// prefix desc, then load asc
pb.cmp(&pa)
.then_with(|| {
self.load_of(&instances[a])
.partial_cmp(&self.load_of(&instances[b]))
.unwrap_or(std::cmp::Ordering::Equal)
})
});
} else {
// Cold start fallback: pure load order.
ranked.sort_by(|&a, &b| {
self.load_of(&instances[a])
.partial_cmp(&self.load_of(&instances[b]))
.unwrap_or(std::cmp::Ordering::Equal)
});
}
let probed = &ranked[..k];
// 2. Exact probe each candidate and pick
// argmax(exact_prefix, tiebreak: -load).
let mut candidates = Vec::with_capacity(k);
let mut best = probed[0] as u32;
let mut best_key: (i64, f64) = (i64::MIN, f64::INFINITY); let mut best_key: (i64, f64) = (i64::MIN, f64::INFINITY);
for &i in probed { for inst in instances {
let inst = &instances[i]; let predicted = local_l0_prefix(req, inst);
let l0 = inst.cache.l0.longest_prefix_peek(&req.hash_ids);
let l1 = inst.cache.l1.longest_prefix_peek(&req.hash_ids[l0..]);
let predicted = (l0 + l1) as u32;
let load = self.load_of(inst); let load = self.load_of(inst);
candidates.push(CandidateInfo { candidates.push(CandidateInfo {
instance: inst.id, instance: inst.id,
@@ -112,9 +66,9 @@ impl Router for PreciseRouter {
req_id: req.req_id, req_id: req.req_id,
mode: "precise", mode: "precise",
chosen: best, chosen: best,
probe_overhead_s: k as f64 * self.probe_latency_s, probe_overhead_s: n as f64 * self.probe_latency_s,
candidates, candidates,
reason: "exact-probe top-K meta-store candidates", reason: "exact-probe all instances' L0 cache",
} }
} }
} }

View File

@@ -36,7 +36,7 @@
use crate::cluster::meta_store::MetaStore; use crate::cluster::meta_store::MetaStore;
use crate::config::Config; use crate::config::Config;
use crate::instance::Instance; use crate::instance::Instance;
use crate::router::{CandidateInfo, RouteDecision, Router}; use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
use crate::trace::RequestRecord; use crate::trace::RequestRecord;
pub struct PrefixAffinityRouter { pub struct PrefixAffinityRouter {
@@ -47,12 +47,6 @@ pub struct PrefixAffinityRouter {
/// Queue-length threshold: if all top candidates exceed this, expand to /// Queue-length threshold: if all top candidates exceed this, expand to
/// the full instance set. /// the full instance set.
overload_threshold: u32, overload_threshold: u32,
/// Bytes per KV block (for RDMA cost estimation in fallback path).
kv_block_bytes: f64,
/// RDMA bandwidth in bytes/s.
rdma_bw: f64,
/// RDMA per-transfer latency in seconds.
rdma_latency_s: f64,
} }
impl PrefixAffinityRouter { impl PrefixAffinityRouter {
@@ -69,9 +63,6 @@ impl PrefixAffinityRouter {
prefix_k: config.cluster.router.prefix_k, prefix_k: config.cluster.router.prefix_k,
fan_out, fan_out,
overload_threshold: 4, overload_threshold: 4,
kv_block_bytes: config.model.kv_block_bytes() as f64,
rdma_bw: config.hardware.rdma_bw,
rdma_latency_s: config.hardware.rdma_latency_us * 1e-6,
} }
} }
@@ -96,15 +87,6 @@ impl PrefixAffinityRouter {
h = (h ^ (h >> 27)).wrapping_mul(0x94d049bb133111eb); h = (h ^ (h >> 27)).wrapping_mul(0x94d049bb133111eb);
h ^ (h >> 31) h ^ (h >> 31)
} }
/// Estimate RDMA fetch time for `remote_blocks` blocks.
fn fetch_time(&self, remote_blocks: u32) -> f64 {
if remote_blocks == 0 {
return 0.0;
}
let bytes = remote_blocks as f64 * self.kv_block_bytes;
bytes / self.rdma_bw + self.rdma_latency_s
}
} }
impl Router for PrefixAffinityRouter { impl Router for PrefixAffinityRouter {
@@ -116,8 +98,8 @@ impl Router for PrefixAffinityRouter {
&mut self, &mut self,
req: &RequestRecord, req: &RequestRecord,
instances: &[Instance], instances: &[Instance],
meta: &MetaStore, _meta: &MetaStore,
now: f64, _now: f64,
) -> RouteDecision { ) -> RouteDecision {
let n = instances.len(); let n = instances.len();
let fp = Self::fingerprint(&req.hash_ids, self.prefix_k); let fp = Self::fingerprint(&req.hash_ids, self.prefix_k);
@@ -129,7 +111,7 @@ impl Router for PrefixAffinityRouter {
ranked.sort_unstable_by(|a, b| b.0.cmp(&a.0)); // descending score ranked.sort_unstable_by(|a, b| b.0.cmp(&a.0)); // descending score
// Collect candidate info for logging (also needed for fallback). // Collect candidate info for logging (also needed for fallback).
let scores = meta.score_prefix(&req.hash_ids, now, n); let scores = local_l0_scores(req, instances);
let candidates: Vec<CandidateInfo> = instances let candidates: Vec<CandidateInfo> = instances
.iter() .iter()
.map(|inst| CandidateInfo { .map(|inst| CandidateInfo {
@@ -165,14 +147,14 @@ impl Router for PrefixAffinityRouter {
let reason; let reason;
if all_overloaded { if all_overloaded {
reason = "affinity fallback: min(drain+fetch)"; reason = "affinity fallback: min(drain+fetch)";
let cluster_prefix = scores.iter().copied().max().unwrap_or(0);
let mut best_cost = f64::INFINITY; let mut best_cost = f64::INFINITY;
for &(_, idx) in ranked.iter() { for &(_, idx) in ranked.iter() {
let inst = &instances[idx]; let inst = &instances[idx];
let drain = inst.estimated_drain_time(); let drain = inst.estimated_drain_time();
let local_prefix = scores[idx]; let miss_tokens = (req.hash_ids.len() as u32)
let remote_blocks = cluster_prefix.saturating_sub(local_prefix); .saturating_sub(scores[idx])
let cost = drain + self.fetch_time(remote_blocks); .saturating_mul(inst.block_size_tokens);
let cost = drain + inst.compute.prefill_time(miss_tokens);
let ql = inst.queue_len(); let ql = inst.queue_len();
if cost < best_cost || (cost == best_cost && ql < best_ql) { if cost < best_cost || (cost == best_cost && ql < best_ql) {
best_cost = cost; best_cost = cost;

View File

@@ -13,7 +13,9 @@ pub struct RandomRouter {
impl RandomRouter { impl RandomRouter {
pub fn new(seed: u64) -> Self { pub fn new(seed: u64) -> Self {
Self { rng: ChaCha8Rng::seed_from_u64(seed) } Self {
rng: ChaCha8Rng::seed_from_u64(seed),
}
} }
} }

View File

@@ -32,8 +32,7 @@ impl Router for TtlAwareRouter {
let mut candidates = Vec::with_capacity(n); let mut candidates = Vec::with_capacity(n);
for inst in instances { for inst in instances {
let p = scores[inst.id as usize]; let p = scores[inst.id as usize];
let load = inst.kv_blocks_used as f64 let load = inst.kv_blocks_used as f64 + self.alpha * inst.queue_len() as f64;
+ self.alpha * inst.queue_len() as f64;
candidates.push(CandidateInfo { candidates.push(CandidateInfo {
instance: inst.id, instance: inst.id,
predicted_prefix: p, predicted_prefix: p,

View File

@@ -56,7 +56,11 @@ impl EventQueue {
pub fn schedule(&mut self, time: f64, event: Event) { pub fn schedule(&mut self, time: f64, event: Event) {
let t = time.max(self.now); let t = time.max(self.now);
self.seq += 1; self.seq += 1;
self.heap.push(Slot { time: t, seq: self.seq, event }); self.heap.push(Slot {
time: t,
seq: self.seq,
event,
});
} }
pub fn pop(&mut self) -> Option<(f64, Event)> { pub fn pop(&mut self) -> Option<(f64, Event)> {
@@ -84,7 +88,12 @@ mod tests {
#[test] #[test]
fn pops_in_time_order() { fn pops_in_time_order() {
let mut q = EventQueue::new(); let mut q = EventQueue::new();
q.schedule(2.0, Event::BatchTick { instance: 0 as InstanceId }); q.schedule(
2.0,
Event::BatchTick {
instance: 0 as InstanceId,
},
);
q.schedule(1.0, Event::BatchTick { instance: 1 }); q.schedule(1.0, Event::BatchTick { instance: 1 });
q.schedule(1.5, Event::BatchTick { instance: 2 }); q.schedule(1.5, Event::BatchTick { instance: 2 });
let (t1, _) = q.pop().unwrap(); let (t1, _) = q.pop().unwrap();

View File

@@ -50,8 +50,7 @@ pub struct TraceReader {
impl TraceReader { impl TraceReader {
pub fn open<P: AsRef<Path>>(path: P, max_requests: Option<u64>) -> Result<Self> { pub fn open<P: AsRef<Path>>(path: P, max_requests: Option<u64>) -> Result<Self> {
let path = path.as_ref(); let path = path.as_ref();
let f = File::open(path) let f = File::open(path).with_context(|| format!("opening trace {}", path.display()))?;
.with_context(|| format!("opening trace {}", path.display()))?;
Ok(Self { Ok(Self {
inner: BufReader::with_capacity(1 << 20, f), inner: BufReader::with_capacity(1 << 20, f),
next_id: 0, next_id: 0,

177
src/ttft.rs Normal file
View File

@@ -0,0 +1,177 @@
use crate::cluster::meta_store::MetaStore;
use crate::config::{CalibrationConfig, HardwareConfig};
use crate::instance::Instance;
#[derive(Debug, Clone, Copy, Default)]
pub struct PrefixResidency {
pub l0_hit_blocks: u32,
pub l1_hit_blocks: u32,
pub remote_hit_blocks: u32,
pub miss_blocks: u32,
}
#[derive(Debug, Clone)]
pub struct TtftModel {
kv_block_bytes: u64,
host_dram_bw: f64,
pcie_bw: f64,
pcie_latency_s: f64,
rdma_bw: f64,
rdma_latency_s: f64,
scheduler_base_s: f64,
scheduler_per_candidate_s: f64,
cache_probe_per_tier_s: f64,
batch_pack_s: f64,
dram_access_s: f64,
remote_metadata_s: f64,
layout_transform_s: f64,
first_token_tail_s: f64,
}
impl TtftModel {
pub fn new(hw: &HardwareConfig, calib: &CalibrationConfig, kv_block_bytes: u64) -> Self {
Self {
kv_block_bytes,
host_dram_bw: hw.host_dram_bw * calib.dram_bw_util,
pcie_bw: hw.pcie_bw * calib.pcie_bw_util,
pcie_latency_s: hw.pcie_latency_us * 1e-6,
rdma_bw: hw.rdma_bw * calib.rdma_bw_util,
rdma_latency_s: hw.rdma_latency_us * 1e-6,
scheduler_base_s: calib.scheduler_base_overhead_us * 1e-6,
scheduler_per_candidate_s: calib.scheduler_per_candidate_us * 1e-6,
cache_probe_per_tier_s: calib.cache_probe_us_per_tier * 1e-6,
batch_pack_s: calib.batch_pack_overhead_us * 1e-6,
dram_access_s: calib.dram_access_latency_us * 1e-6,
remote_metadata_s: calib.remote_metadata_us * 1e-6,
layout_transform_s: calib.layout_transform_fixed_us * 1e-6,
first_token_tail_s: (calib.final_sync_us + calib.first_token_ready_us) * 1e-6,
}
}
pub fn scheduler_overhead_s(&self, num_candidates: usize, num_tiers: usize) -> f64 {
self.scheduler_base_s
+ self.scheduler_per_candidate_s * num_candidates as f64
+ self.cache_probe_per_tier_s * num_tiers as f64
+ self.batch_pack_s
}
pub fn first_token_tail_s(&self) -> f64 {
self.first_token_tail_s
}
pub fn block_bytes(&self, blocks: u32) -> u64 {
self.kv_block_bytes * blocks as u64
}
pub fn local_l1_prepare_time_s(&self, blocks: u32) -> f64 {
if blocks == 0 {
return 0.0;
}
let bytes = self.block_bytes(blocks);
self.dram_access_s
+ bytes as f64 / self.host_dram_bw.max(1.0)
+ self.pcie_cost_s(bytes)
+ self.layout_transform_s
}
pub fn remote_prepare_time_s(&self, blocks: u32) -> f64 {
if blocks == 0 {
return 0.0;
}
let bytes = self.block_bytes(blocks);
self.remote_metadata_s + self.rdma_cost_s(bytes) + self.pcie_cost_s(bytes) + self.layout_transform_s
}
pub fn pcie_cost_s(&self, bytes: u64) -> f64 {
if bytes == 0 {
self.pcie_latency_s
} else {
self.pcie_latency_s + bytes as f64 / self.pcie_bw.max(1.0)
}
}
pub fn rdma_cost_s(&self, bytes: u64) -> f64 {
if bytes == 0 {
self.rdma_latency_s
} else {
self.rdma_latency_s + bytes as f64 / self.rdma_bw.max(1.0)
}
}
pub fn kv_prepare_time_s(&self, residency: PrefixResidency) -> f64 {
self.local_l1_prepare_time_s(residency.l1_hit_blocks)
+ self.remote_prepare_time_s(residency.remote_hit_blocks)
}
}
pub fn classify_prefix_tiers(
req_hashes: &[u64],
inst: &Instance,
meta: &MetaStore,
now: f64,
) -> PrefixResidency {
let total_blocks = req_hashes.len() as u32;
let l0_hit_blocks = inst.cache.l0.longest_prefix_peek(req_hashes) as u32;
let suffix_after_l0 = &req_hashes[l0_hit_blocks as usize..];
let l1_hit_blocks = inst.cache.l1.longest_prefix_peek(suffix_after_l0) as u32;
let suffix_after_l1 = &suffix_after_l0[l1_hit_blocks as usize..];
let mut remote_hit_blocks = 0;
for &h in suffix_after_l1 {
let owners = meta.instances_for(h, now);
if owners.iter().any(|o| *o != inst.id) {
remote_hit_blocks += 1;
} else {
break;
}
}
PrefixResidency {
l0_hit_blocks,
l1_hit_blocks,
remote_hit_blocks,
miss_blocks: total_blocks - l0_hit_blocks - l1_hit_blocks - remote_hit_blocks,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{CalibrationConfig, HardwareConfig};
#[test]
fn remote_prepare_includes_fixed_overheads() {
let hw = HardwareConfig {
gpu_flops: 1.0e14,
gpu_fp8_flops: 0.0,
gpu_fp4_flops: 0.0,
gpu_mem_bw: 1.0e12,
hbm_bytes: 1.0e9,
dram_bytes: 4.0e9,
host_dram_bw: 5.0e11,
pcie_bw: 32.0e9,
pcie_latency_us: 1.0,
rdma_bw: 12.0e9,
rdma_latency_us: 5.0,
intra_node_tp_bw: 9.0e11,
intra_node_tp_latency_us: 2.0,
tp_degree: 1,
max_batch_slots: 32,
prefill_chunk_tokens: 1024,
};
let model = TtftModel::new(
&hw,
&CalibrationConfig {
remote_metadata_us: 11.0,
layout_transform_fixed_us: 7.0,
..CalibrationConfig::default()
},
4096,
);
let transport_only = model.rdma_cost_s(4096) + model.pcie_cost_s(4096);
let total = model.remote_prepare_time_s(1);
assert!(total > transport_only);
}
}
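
As a rough sanity check on the tier costs, a back-of-envelope for a single 4096-byte block on the hardware numbers used in the test above, assuming unit bandwidth-utilisation factors in the calibration and the test's fixed overheads (remote_metadata = 11 µs, layout_transform = 7 µs):

```text
rdma_cost(4096 B)  = 5 µs + 4096 B / 12 GB/s ≈ 5.3 µs
pcie_cost(4096 B)  = 1 µs + 4096 B / 32 GB/s ≈ 1.1 µs
remote_prepare(1)  ≈ 11 µs + 5.3 µs + 1.1 µs + 7 µs ≈ 24.5 µs
```

The fixed metadata and layout-transform overheads dominate the wire time for a single block, which is exactly what the `remote_prepare_includes_fixed_overheads` test asserts.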

View File

@@ -28,13 +28,18 @@ fn base_config(trace_path: &str, out_dir: &str, mode: RouterMode) -> Config {
gpu_mem_bw: 1.0e12, gpu_mem_bw: 1.0e12,
hbm_bytes: 1.0e9, hbm_bytes: 1.0e9,
dram_bytes: 4.0e9, dram_bytes: 4.0e9,
host_dram_bw: 5.0e11,
pcie_bw: 32.0e9, pcie_bw: 32.0e9,
pcie_latency_us: 1.0, pcie_latency_us: 1.0,
rdma_bw: 12.0e9, rdma_bw: 12.0e9,
rdma_latency_us: 5.0, rdma_latency_us: 5.0,
intra_node_tp_bw: 9.0e11,
intra_node_tp_latency_us: 2.0,
tp_degree: 1,
max_batch_slots: 32, max_batch_slots: 32,
prefill_chunk_tokens: 1024, prefill_chunk_tokens: 1024,
}, },
calibration: CalibrationConfig::default(),
cluster: ClusterConfig { cluster: ClusterConfig {
num_instances: 4, num_instances: 4,
meta_store: MetaStoreConfig { meta_store: MetaStoreConfig {
@@ -172,12 +177,14 @@ fn ablation_lru_preserves_ttft_fields() {
RouterMode::Random, RouterMode::Random,
); );
let online = driver::run(&cfg, Some("online_lru")).expect("online lru run"); let online = driver::run(&cfg, Some("online_lru")).expect("online lru run");
let out = driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Lru]) let out =
driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Lru])
.expect("ablate lru"); .expect("ablate lru");
assert_eq!(out.len(), 1); assert_eq!(out.len(), 1);
let row = &out[0]; let row = &out[0];
let online_hit = online.summary.hit_rate_l0 + online.summary.hit_rate_l1 + online.summary.hit_rate_remote; let online_hit =
online.summary.hit_rate_l0 + online.summary.hit_rate_l1 + online.summary.hit_rate_remote;
let ablate_hit = row.hit_rate_l0 + row.hit_rate_l1 + row.hit_rate_remote; let ablate_hit = row.hit_rate_l0 + row.hit_rate_l1 + row.hit_rate_remote;
assert!( assert!(
@@ -204,11 +211,8 @@ fn ablate_rejects_belady_until_exact_algorithm_exists() {
RouterMode::Random, RouterMode::Random,
); );
let err = driver::ablate_fixed_placement( let err =
&cfg, driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Belady])
&[RouterMode::Random],
&[ReplayEvictPolicy::Belady],
)
.expect_err("belady should be rejected"); .expect_err("belady should be rejected");
assert!( assert!(
err.to_string().contains("exact belady"), err.to_string().contains("exact belady"),