fix: cache calculation
.gitignore (vendored)
@@ -1,6 +1,12 @@
 # Trace files
 bailian-traces
 
+# docs
+docs
+reports
+scripts
+tests/test_analyze_affinity_policy.py
+
 # Rust build artifacts
 /target/
 **/*.rs.bk
@@ -1,6 +1,6 @@
-pub mod meta_store;
 #[allow(clippy::module_inception)]
 pub mod cluster;
+pub mod meta_store;
 
 pub use cluster::Cluster;
 pub use meta_store::MetaStore;
src/config.rs
@@ -111,8 +111,7 @@ impl ModelConfig {
                 * self.dtype_bytes as u64
                 * self.block_size_tokens as u64
         } else {
-            2u64
-                * self.num_layers as u64
+            2u64 * self.num_layers as u64
                 * self.num_kv_heads as u64
                 * self.head_dim as u64
                 * self.dtype_bytes as u64
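The product above is the per-block KV byte count: 2 (K and V) times layers, KV heads, head dim, dtype bytes, and block size. A quick worked check, with every number invented for illustration:

```rust
fn main() {
    // Hypothetical model, not from this repo: 32 layers, 8 KV heads,
    // head_dim 128, fp16 (dtype_bytes = 2), 16-token blocks.
    let kv_block_bytes: u64 = 2 * 32 * 8 * 128 * 2 * 16;
    assert_eq!(kv_block_bytes, 2 * 1024 * 1024); // 2 MiB per KV block
    println!("{kv_block_bytes}");
}
```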
@@ -455,7 +454,12 @@ impl RawConfig {
             }
         }
 
-        Ok(Config { model, hardware, cluster: self.cluster, sim: self.sim })
+        Ok(Config {
+            model,
+            hardware,
+            cluster: self.cluster,
+            sim: self.sim,
+        })
     }
 }
@@ -474,24 +478,60 @@ impl RawModelConfig {
         };
 
         // Overlay: explicit YAML fields override the base.
-        if let Some(v) = self.name { m.name = v; }
-        if let Some(v) = self.num_layers { m.num_layers = v; }
-        if let Some(v) = self.num_kv_heads { m.num_kv_heads = v; }
-        if let Some(v) = self.head_dim { m.head_dim = v; }
-        if let Some(v) = self.dtype_bytes { m.dtype_bytes = v; }
-        if let Some(v) = self.block_size_tokens { m.block_size_tokens = v; }
-        if let Some(v) = self.hidden_size { m.hidden_size = Some(v); }
-        if let Some(v) = self.num_attention_heads { m.num_attention_heads = Some(v); }
-        if let Some(v) = self.intermediate_size { m.intermediate_size = Some(v); }
-        if self.moe.is_some() { m.moe = self.moe; }
-        if self.mla.is_some() { m.mla = self.mla; }
-        if self.attention.is_some() { m.attention = self.attention; }
-        if let Some(v) = self.flops_per_token_prefill { m.flops_per_token_prefill = Some(v); }
-        if let Some(v) = self.attn_quadratic_coeff { m.attn_quadratic_coeff = Some(v); }
-        if let Some(v) = self.bytes_per_token_prefill { m.bytes_per_token_prefill = Some(v); }
-        if self.compute_dtype.is_some() { m.compute_dtype = self.compute_dtype; }
-        if let Some(v) = self.flops_per_token_decode { m.flops_per_token_decode = Some(v); }
-        if let Some(v) = self.bytes_per_token_decode { m.bytes_per_token_decode = Some(v); }
+        if let Some(v) = self.name {
+            m.name = v;
+        }
+        if let Some(v) = self.num_layers {
+            m.num_layers = v;
+        }
+        if let Some(v) = self.num_kv_heads {
+            m.num_kv_heads = v;
+        }
+        if let Some(v) = self.head_dim {
+            m.head_dim = v;
+        }
+        if let Some(v) = self.dtype_bytes {
+            m.dtype_bytes = v;
+        }
+        if let Some(v) = self.block_size_tokens {
+            m.block_size_tokens = v;
+        }
+        if let Some(v) = self.hidden_size {
+            m.hidden_size = Some(v);
+        }
+        if let Some(v) = self.num_attention_heads {
+            m.num_attention_heads = Some(v);
+        }
+        if let Some(v) = self.intermediate_size {
+            m.intermediate_size = Some(v);
+        }
+        if self.moe.is_some() {
+            m.moe = self.moe;
+        }
+        if self.mla.is_some() {
+            m.mla = self.mla;
+        }
+        if self.attention.is_some() {
+            m.attention = self.attention;
+        }
+        if let Some(v) = self.flops_per_token_prefill {
+            m.flops_per_token_prefill = Some(v);
+        }
+        if let Some(v) = self.attn_quadratic_coeff {
+            m.attn_quadratic_coeff = Some(v);
+        }
+        if let Some(v) = self.bytes_per_token_prefill {
+            m.bytes_per_token_prefill = Some(v);
+        }
+        if self.compute_dtype.is_some() {
+            m.compute_dtype = self.compute_dtype;
+        }
+        if let Some(v) = self.flops_per_token_decode {
+            m.flops_per_token_decode = Some(v);
+        }
+        if let Some(v) = self.bytes_per_token_decode {
+            m.bytes_per_token_decode = Some(v);
+        }
 
         // Validate deployment-specific fields that HF config.json never provides.
         anyhow::ensure!(
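The overlay here is the usual Option-based layering: start from a base (HF config or preset) and let every explicitly set YAML field win. A minimal sketch of the pattern, with a made-up two-field config for illustration:

```rust
#[derive(Default)]
struct Base {
    num_layers: u32,
    name: String,
}

struct Raw {
    num_layers: Option<u32>,
    name: Option<String>,
}

impl Raw {
    // Explicit fields override the base; None keeps the base value.
    fn overlay(self, mut base: Base) -> Base {
        if let Some(v) = self.num_layers { base.num_layers = v; }
        if let Some(v) = self.name { base.name = v; }
        base
    }
}
```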
@@ -535,16 +575,36 @@ impl RawHardwareConfig {
         };
 
         // Overlay: explicit YAML fields override the preset / defaults.
-        if let Some(v) = self.gpu_flops { hw.gpu_flops = v; }
-        if let Some(v) = self.gpu_mem_bw { hw.gpu_mem_bw = v; }
-        if let Some(v) = self.hbm_bytes { hw.hbm_bytes = v; }
-        if let Some(v) = self.dram_bytes { hw.dram_bytes = v; }
-        if let Some(v) = self.pcie_bw { hw.pcie_bw = v; }
-        if let Some(v) = self.pcie_latency_us { hw.pcie_latency_us = v; }
-        if let Some(v) = self.rdma_bw { hw.rdma_bw = v; }
-        if let Some(v) = self.rdma_latency_us { hw.rdma_latency_us = v; }
-        if let Some(v) = self.max_batch_slots { hw.max_batch_slots = v; }
-        if let Some(v) = self.prefill_chunk_tokens { hw.prefill_chunk_tokens = v; }
+        if let Some(v) = self.gpu_flops {
+            hw.gpu_flops = v;
+        }
+        if let Some(v) = self.gpu_mem_bw {
+            hw.gpu_mem_bw = v;
+        }
+        if let Some(v) = self.hbm_bytes {
+            hw.hbm_bytes = v;
+        }
+        if let Some(v) = self.dram_bytes {
+            hw.dram_bytes = v;
+        }
+        if let Some(v) = self.pcie_bw {
+            hw.pcie_bw = v;
+        }
+        if let Some(v) = self.pcie_latency_us {
+            hw.pcie_latency_us = v;
+        }
+        if let Some(v) = self.rdma_bw {
+            hw.rdma_bw = v;
+        }
+        if let Some(v) = self.rdma_latency_us {
+            hw.rdma_latency_us = v;
+        }
+        if let Some(v) = self.max_batch_slots {
+            hw.max_batch_slots = v;
+        }
+        if let Some(v) = self.prefill_chunk_tokens {
+            hw.prefill_chunk_tokens = v;
+        }
 
         // Validate minimum requirements.
         anyhow::ensure!(hw.gpu_flops > 0.0, "hardware.gpu_flops is required");
@@ -7,7 +7,7 @@ use anyhow::{Context, Result};
 use serde_json::Value;
 use std::path::Path;
 
-use crate::config::{AttentionConfig, MlaConfig, MoeConfig, ModelConfig};
+use crate::config::{AttentionConfig, MlaConfig, ModelConfig, MoeConfig};
 
 /// Parse a HuggingFace config.json and return a partially-populated
 /// [`ModelConfig`]. The caller must still set `dtype_bytes` and
@@ -34,8 +34,7 @@ fn parse_value(v: &Value) -> Result<ModelConfig> {
     let num_layers = u32_field(v, "num_hidden_layers");
     let hidden_size = u32_field(v, "hidden_size");
     let num_attention_heads = u32_field(v, "num_attention_heads");
-    let num_kv_heads = u32_field(v, "num_key_value_heads")
-        .or(num_attention_heads); // default to MHA
+    let num_kv_heads = u32_field(v, "num_key_value_heads").or(num_attention_heads); // default to MHA
     let head_dim = u32_field(v, "head_dim").or_else(|| {
         // Infer: hidden_size / num_attention_heads
        match (hidden_size, num_attention_heads) {
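Both fallbacks mirror HF config.json conventions: a missing `num_key_value_heads` means MHA (KV heads equal attention heads), and a missing `head_dim` is inferred as `hidden_size / num_attention_heads`. A standalone sketch of that inference, names assumed:

```rust
// e.g. infer_head_dim(Some(4096), Some(32)) == Some(128)
fn infer_head_dim(hidden_size: Option<u32>, num_heads: Option<u32>) -> Option<u32> {
    match (hidden_size, num_heads) {
        (Some(h), Some(n)) if n > 0 => Some(h / n),
        _ => None, // leave it to the YAML overlay / validation
    }
}
```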
@@ -70,8 +69,7 @@ fn parse_value(v: &Value) -> Result<ModelConfig> {
     });
 
     // --- Attention pattern ---
-    let attention =
-        if let Some(first_dense) = u32_field(v, "first_k_dense_replace") {
+    let attention = if let Some(first_dense) = u32_field(v, "first_k_dense_replace") {
         // DSA-style model (GLM-5, DeepSeek-V3).
         // dense_window and sparse_stride are typically not in config.json;
         // use sensible defaults the user can override in YAML.
@@ -188,6 +186,12 @@ mod tests {
         assert_eq!(moe.num_active_experts, 8);
         let mla = m.mla.as_ref().unwrap();
         assert_eq!(mla.kv_lora_rank, 512);
-        assert!(matches!(m.attention, Some(AttentionConfig::Dsa { first_dense_layers: 3, .. })));
+        assert!(matches!(
+            m.attention,
+            Some(AttentionConfig::Dsa {
+                first_dense_layers: 3,
+                ..
+            })
+        ));
     }
 }
@@ -33,7 +33,10 @@ pub enum AttentionPattern {
     SlidingWindow { window: f64 },
     /// DeepSeek Sparse Attention: effective_ctx = min(N, dense_window) +
     /// max(0, N - dense_window) / sparse_stride.
-    Dsa { dense_window: f64, sparse_stride: f64 },
+    Dsa {
+        dense_window: f64,
+        sparse_stride: f64,
+    },
 }
 
 #[derive(Debug, Clone)]
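The doc comment above is the whole DSA cost model: each token attends densely to at most `dense_window` tokens and samples the remainder at one-in-`sparse_stride`. A minimal free-function sketch of the same formula:

```rust
// effective_ctx for DeepSeek Sparse Attention, per the doc comment above.
fn effective_ctx(n: f64, dense_window: f64, sparse_stride: f64) -> f64 {
    n.min(dense_window) + (n - dense_window).max(0.0) / sparse_stride
}

// e.g. n = 100_000 tokens, window = 4096, stride = 16:
// 4096 + 95_904 / 16 = 10_090 effective tokens instead of 100_000.
```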
@@ -101,7 +104,8 @@ impl ComputeModel {
 
         // --- MLP FLOPs/token/layer (SwiGLU: gate + up + down = 3 matmuls) ---
         let mlp = if let Some(moe) = &model.moe {
-            let expert_inter = moe.expert_intermediate_size
+            let expert_inter =
+                moe.expert_intermediate_size
                 .unwrap_or(model.intermediate_size.unwrap_or(0)) as f64;
             let active = moe.num_active_experts as f64;
             let shared = moe.num_shared_experts as f64;
@@ -132,15 +136,13 @@ impl ComputeModel {
             let qk_hd = (mla.qk_nope_head_dim + mla.qk_rope_head_dim) as f64;
             let qk_rd = mla.qk_rope_head_dim as f64;
             let vhd = mla.v_head_dim as f64;
-            (h * qlr + qlr * n_heads * qk_hd
-                + h * (kvlr + qk_rd)
-                + n_heads * vhd * h)
-                * wdtype
+            (h * qlr + qlr * n_heads * qk_hd + h * (kvlr + qk_rd) + n_heads * vhd * h) * wdtype
         } else {
             ((n_heads + 2.0 * n_kv) * hd * h + n_heads * hd * h) * wdtype
         };
         let mlp_wt = if let Some(moe) = &model.moe {
-            let expert_inter = moe.expert_intermediate_size
+            let expert_inter =
+                moe.expert_intermediate_size
                 .unwrap_or(model.intermediate_size.unwrap_or(0)) as f64;
             let active = moe.num_active_experts as f64;
             let shared = moe.num_shared_experts as f64;
@@ -169,10 +171,9 @@ impl ComputeModel {
             },
                 0.0,
             ),
-            Some(AttentionConfig::Dense) | None => (
-                AttentionPattern::Dense,
-                model.num_layers as f64,
-            ),
+            Some(AttentionConfig::Dense) | None => {
+                (AttentionPattern::Dense, model.num_layers as f64)
+            }
         };
 
         Self {
@@ -237,10 +238,10 @@ impl ComputeModel {
         let dense_layers = self.first_dense_layers;
         let sparse_layers = self.num_layers - dense_layers;
 
-        let dense_flops = dense_layers
-            * (linear + self.attn_coeff * n * self.effective_ctx(n, true));
-        let sparse_flops = sparse_layers
-            * (linear + self.attn_coeff * n * self.effective_ctx(n, false));
+        let dense_flops =
+            dense_layers * (linear + self.attn_coeff * n * self.effective_ctx(n, true));
+        let sparse_flops =
+            sparse_layers * (linear + self.attn_coeff * n * self.effective_ctx(n, false));
         let total_flops = dense_flops + sparse_flops;
 
         let compute_time = total_flops / self.gpu_flops;
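Prefill time splits the layers into dense and sparse groups, each paying its linear FLOPs plus the quadratic attention term scaled by its own effective context. A worked check in plain Rust, with every number invented for illustration (the repo's `linear` term is assumed to already cover all `n` tokens of a layer):

```rust
fn main() {
    // Invented: 3 dense + 61 sparse layers, 1e9 linear FLOPs/token/layer,
    // attn_coeff 64, n = 8192 prompt tokens, DSA window 4096, stride 16,
    // and 1e15 FLOP/s of GPU compute.
    let n = 8192.0_f64;
    let dense_ctx = n; // dense layers attend to the full context
    let sparse_ctx = 4096.0 + (n - 4096.0).max(0.0) / 16.0; // = 4352
    let dense_flops = 3.0 * (n * 1e9 + 64.0 * n * dense_ctx);
    let sparse_flops = 61.0 * (n * 1e9 + 64.0 * n * sparse_ctx);
    let seconds = (dense_flops + sparse_flops) / 1e15;
    println!("prefill ≈ {seconds:.3} s");
}
```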
@@ -254,7 +255,9 @@ impl ComputeModel {
     pub fn describe(&self) -> String {
         let pattern_str = match &self.attn_pattern {
             AttentionPattern::Dense => "dense".to_string(),
-            AttentionPattern::SlidingWindow { window } => format!("sliding_window({})", *window as u64),
+            AttentionPattern::SlidingWindow { window } => {
+                format!("sliding_window({})", *window as u64)
+            }
             AttentionPattern::Dsa {
                 dense_window,
                 sparse_stride,
@@ -266,8 +269,7 @@ impl ComputeModel {
         format!(
             "linear_flops/tok/layer={:.3e}, attn_coeff={:.0}, pattern={}, \
              weight_bytes/layer={:.2e}",
-            self.linear_flops_per_token, self.attn_coeff, pattern_str,
-            self.weight_bytes_per_layer,
+            self.linear_flops_per_token, self.attn_coeff, pattern_str, self.weight_bytes_per_layer,
         )
     }
 }
@@ -1,6 +1,6 @@
 pub mod compute;
-pub mod kv_cache;
 #[allow(clippy::module_inception)]
 pub mod instance;
+pub mod kv_cache;
 
 pub use instance::Instance;
@@ -12,7 +12,9 @@ pub struct RoutingLogWriter {
 impl RoutingLogWriter {
     pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
         let f = File::create(path)?;
-        Ok(Self { inner: BufWriter::new(f) })
+        Ok(Self {
+            inner: BufWriter::new(f),
+        })
     }
 
     pub fn write(&mut self, decision: &RouteDecision) -> Result<()> {
@@ -19,7 +19,9 @@ pub struct TimeseriesWriter {
 impl TimeseriesWriter {
     pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
         let f = std::fs::File::create(path)?;
-        Ok(Self { inner: csv::Writer::from_writer(f) })
+        Ok(Self {
+            inner: csv::Writer::from_writer(f),
+        })
     }
 
     pub fn write(&mut self, row: &TimeseriesRow) -> Result<()> {
@@ -51,7 +51,11 @@ impl TierResult {
             capacity_blocks,
             hits,
             misses,
-            hit_rate: if total == 0 { 0.0 } else { hits as f64 / total as f64 },
+            hit_rate: if total == 0 {
+                0.0
+            } else {
+                hits as f64 / total as f64
+            },
         }
     }
 }
@@ -68,12 +72,7 @@ pub fn analyze(records: &[RequestRecord], capacity_blocks: u64) -> OracleResult
 
     // 1. Unlimited cache
     let unlimited_hits = run_unlimited(records);
-    let unlimited = TierResult::from_counts(
-        "unlimited",
-        u64::MAX,
-        unlimited_hits,
-        total_blocks,
-    );
+    let unlimited = TierResult::from_counts("unlimited", u64::MAX, unlimited_hits, total_blocks);
 
     // 2. Precompute next-use index for Belady
     let next_use = build_next_use(records);
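Belady's MIN evicts the block whose next use lies farthest in the future, so the oracle needs a per-access "next use" index, computable in one backward pass. A sketch of that precomputation (our own illustration, not the repo's `build_next_use`):

```rust
use std::collections::HashMap;

/// For each access i, the index of the next access to the same block
/// (usize::MAX if the block is never used again).
fn next_use_index(accesses: &[u64]) -> Vec<usize> {
    let mut next = vec![usize::MAX; accesses.len()];
    let mut last_seen: HashMap<u64, usize> = HashMap::new();
    for i in (0..accesses.len()).rev() {
        if let Some(&j) = last_seen.get(&accesses[i]) {
            next[i] = j;
        }
        last_seen.insert(accesses[i], i);
    }
    next
}
```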
@@ -13,7 +13,7 @@
 
 use crate::cluster::meta_store::MetaStore;
 use crate::instance::Instance;
-use crate::router::{CandidateInfo, RouteDecision, Router};
+use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
 use crate::trace::RequestRecord;
 
 pub struct CacheLoadRouter;
@@ -39,11 +39,11 @@ impl Router for CacheLoadRouter {
         &mut self,
         req: &RequestRecord,
         instances: &[Instance],
-        meta: &MetaStore,
-        now: f64,
+        _meta: &MetaStore,
+        _now: f64,
     ) -> RouteDecision {
         let n = instances.len();
-        let scores = meta.score_prefix(&req.hash_ids, now, n);
+        let scores = local_l0_scores(req, instances);
 
         // Step 1: least-loaded 1/4 of instances (by queue_len).
         let pool_size = (n / 4).max(2).min(n);
@@ -83,7 +83,7 @@ impl Router for CacheLoadRouter {
             chosen: instances[best_idx].id,
             probe_overhead_s: 0.0,
             candidates,
-            reason: "least-loaded 1/4, then best prefix",
+            reason: "least-loaded 1/4, then best local L0 prefix",
         }
     }
 }
@@ -32,7 +32,7 @@
 
 use crate::cluster::meta_store::MetaStore;
 use crate::instance::Instance;
-use crate::router::{CandidateInfo, RouteDecision, Router};
+use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
 use crate::trace::RequestRecord;
 
 pub struct CacheScoreRouter {
@@ -55,11 +55,11 @@ impl Router for CacheScoreRouter {
         &mut self,
         req: &RequestRecord,
         instances: &[Instance],
-        meta: &MetaStore,
-        now: f64,
+        _meta: &MetaStore,
+        _now: f64,
     ) -> RouteDecision {
         let n = instances.len();
-        let scores = meta.score_prefix(&req.hash_ids, now, n);
+        let scores = local_l0_scores(req, instances);
         let input_blocks = req.hash_ids.len() as f64;
 
         let mut best_idx: usize = 0;
@@ -1,56 +1,30 @@
-//! First-principles TTFT-optimal routing.
+//! First-principles TTFT-estimate routing using local L0 hits only.
 //!
 //! Estimates the actual time-to-first-token for each candidate instance:
 //!
-//! `TTFT(r,i) = drain(i) + fetch(r,i) + prefill(miss)`
+//! `TTFT(r,i) = drain(i) + prefill(local_l0_miss_i)`
 //!
 //! - **drain** — exact queue drain time: sum of per-request `prefill_time()`
 //!   using the architecture-aware compute model (quadratic / DSA).
-//!
-//! - **fetch** — RDMA fetch time for blocks cached elsewhere in the cluster
-//!   but not on instance `i` locally.
+//! - **prefill** — compute for tokens whose blocks are absent from the
+//!   instance's current L0 cache.
 //!
-//! - **prefill** — compute for cluster-wide cache-miss tokens (constant
-//!   across instances, cancels in the argmin).
-//!
-//! The router minimises `drain(i) + fetch(r,i)`, with ties broken by
-//! lowest `queue_len` then most local cache. The fetch overlap with queue
-//! drain is handled by keeping the additive form: this gives double
-//! incentive to prefer instances with local cache, which empirically
-//! outperforms the `max(drain, fetch)` alternative because even small
-//! RDMA savings compound across thousands of routing decisions.
+//! L1 / remote reuse can still reduce execution-time misses later in the
+//! cluster fetch chain, but they are not counted as `kvcache hit` when
+//! comparing routing candidates.
 
 use crate::cluster::meta_store::MetaStore;
 use crate::config::Config;
 use crate::instance::Instance;
-use crate::router::{CandidateInfo, RouteDecision, Router};
+use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
 use crate::trace::RequestRecord;
 
-pub struct EstimatedTtftRouter {
-    /// Bytes per KV block (for RDMA cost estimation).
-    kv_block_bytes: f64,
-    /// RDMA bandwidth in bytes/s.
-    rdma_bw: f64,
-    /// RDMA per-transfer latency in seconds.
-    rdma_latency_s: f64,
-}
+pub struct EstimatedTtftRouter;
 
 impl EstimatedTtftRouter {
-    pub fn new(config: &Config) -> Self {
-        Self {
-            kv_block_bytes: config.model.kv_block_bytes() as f64,
-            rdma_bw: config.hardware.rdma_bw,
-            rdma_latency_s: config.hardware.rdma_latency_us * 1e-6,
-        }
-    }
-
-    /// Estimate RDMA fetch time for `remote_blocks` blocks.
-    fn fetch_time(&self, remote_blocks: u32) -> f64 {
-        if remote_blocks == 0 {
-            return 0.0;
-        }
-        let bytes = remote_blocks as f64 * self.kv_block_bytes;
-        bytes / self.rdma_bw + self.rdma_latency_s
-    }
+    pub fn new(_config: &Config) -> Self {
+        Self
+    }
 }
@@ -63,14 +37,12 @@ impl Router for EstimatedTtftRouter {
         &mut self,
         req: &RequestRecord,
         instances: &[Instance],
-        meta: &MetaStore,
-        now: f64,
+        _meta: &MetaStore,
+        _now: f64,
     ) -> RouteDecision {
         let n = instances.len();
-        let scores = meta.score_prefix(&req.hash_ids, now, n);
-
-        // Cluster-wide max prefix: blocks reachable via RDMA from any peer.
-        let cluster_prefix = scores.iter().copied().max().unwrap_or(0);
+        let scores = local_l0_scores(req, instances);
         let input_blocks = req.hash_ids.len() as u32;
 
         let mut best: u32 = 0;
         let mut best_cost = f64::INFINITY;
@@ -85,15 +57,11 @@ impl Router for EstimatedTtftRouter {
             // 1. Exact queue drain time (architecture-aware, per-request sum).
             let drain = inst.estimated_drain_time();
 
-            // 2. RDMA fetch cost for blocks not locally cached.
-            let remote_blocks = cluster_prefix.saturating_sub(local_prefix);
-            let fetch = self.fetch_time(remote_blocks);
-
-            // Additive cost: drain + fetch.
-            // The additive form gives explicit incentive to prefer local cache
-            // (lower fetch) even when the queue is non-empty, which reduces
-            // total RDMA traffic and improves TTFT in aggregate.
-            let cost = drain + fetch;
+            // 2. Prefill compute for blocks absent from local L0.
+            let miss_tokens = input_blocks
+                .saturating_sub(local_prefix)
+                .saturating_mul(inst.block_size_tokens);
+            let cost = drain + inst.compute.prefill_time(miss_tokens);
 
             candidates.push(CandidateInfo {
                 instance: inst.id,
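After this change the per-candidate cost is simply `drain + prefill(miss_tokens)`, with miss tokens derived from the local L0 prefix. A distilled sketch of that scoring loop (types simplified, names our own):

```rust
struct Candidate {
    drain_s: f64,
    local_prefix_blocks: u32,
}

// argmin over candidates of drain + prefill-of-local-misses.
fn pick(
    cands: &[Candidate],
    input_blocks: u32,
    block_tokens: u32,
    prefill_time: impl Fn(u32) -> f64,
) -> usize {
    let mut best = (0, f64::INFINITY);
    for (i, c) in cands.iter().enumerate() {
        let miss_tokens =
            input_blocks.saturating_sub(c.local_prefix_blocks) * block_tokens;
        let cost = c.drain_s + prefill_time(miss_tokens);
        if cost < best.1 {
            best = (i, cost);
        }
    }
    best.0
}
```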
@@ -122,7 +90,7 @@ impl Router for EstimatedTtftRouter {
             chosen: best,
             probe_overhead_s: 0.0,
             candidates,
-            reason: "argmin(drain_time + fetch_time)",
+            reason: "argmin(drain_time + local-L0-miss prefill_time)",
         }
     }
 }
@@ -29,8 +29,7 @@ impl Router for LeastLoadedRouter {
         let mut best_score = f64::INFINITY;
         let mut candidates = Vec::with_capacity(instances.len());
         for inst in instances {
-            let load = inst.kv_blocks_used as f64
-                + self.alpha * inst.queue_len() as f64;
+            let load = inst.kv_blocks_used as f64 + self.alpha * inst.queue_len() as f64;
             candidates.push(CandidateInfo {
                 instance: inst.id,
                 predicted_prefix: 0,
@@ -1,4 +1,4 @@
-//! Minimum P*D routing.
+//! Minimum P*D routing using real local L0 hits only.
 //!
 //! For each instance compute:
 //! - `P` = real prefill tokens this request will do if routed there
@@ -7,30 +7,18 @@
 //!
 //! Score = `P * D`, pick the instance that minimizes it.
 //!
-//! `P` accounts for the **actual** prefill work after the cluster fetch
-//! chain runs: the fetch chain serves any block cached anywhere in the
-//! cluster (L0 → L1 → remote v6d via RDMA), so prefill compute only runs
-//! for blocks that are absent cluster-wide *and* for blocks past the
-//! instance-local prefix (the cluster only fetches a contiguous leading
-//! prefix — any gap ends the fetch chain and the rest must be prefilled).
+//! `P` accounts only for blocks that miss in the candidate instance's
+//! current L0 cache. L1 / remote reuse may still reduce execution-time
+//! work later in the cluster fetch chain, but they do not count as
+//! `kvcache hit` for routing.
 //!
 //! Concretely, for instance `i`:
 //!
 //! ```text
-//! local_prefix_i = meta_store.score_prefix(req, now)[i] // blocks
-//! cluster_prefix = max over all j of meta_store_score[j] // blocks
-//! effective_prefix_i = min(cluster_prefix, input_blocks)
-//! - if local_prefix_i == cluster_prefix the fetch chain stays local,
-//! - otherwise the prefill still skips cluster_prefix blocks because
-//!   the missing tail is fetched via RDMA from a peer.
-//! P_i = (input_blocks - effective_prefix_i) * block_size_tokens
+//! local_prefix_i = longest L0 prefix on instance i // blocks
+//! P_i = (input_blocks - local_prefix_i) * block_size_tokens
 //! ```
 //!
-//! This makes `P` nearly instance-independent on well-populated clusters
-//! (so `min_pd` degenerates to balanced load with a cache-affinity
-//! tiebreak), which is exactly what you want when RDMA is cheap relative
-//! to prefill compute.
-//!
 //! Tiebreaks (essential on 128-instance clusters where many instances are
 //! idle and the raw product collapses to zero):
 //! 1. minimum `P*D`
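With the cluster-prefix logic gone, `P_i` is just local-miss tokens and the score is a plain product. A toy evaluation of the rule, all numbers invented:

```rust
fn main() {
    // P = local-L0-miss tokens, D = ongoing requests; minimize P*D.
    let input_blocks: u64 = 10;
    let block_tokens: u64 = 16;
    // (local_prefix_blocks, queue_len) per instance, invented:
    let instances = [(8u64, 3u64), (0, 1), (10, 5)];
    let best = instances
        .iter()
        .enumerate()
        .min_by_key(|(_, &(prefix, d))| {
            let p = input_blocks.saturating_sub(prefix) * block_tokens;
            (p * d, d) // tie-break on D, as the module doc describes
        })
        .map(|(i, _)| i);
    // Instance 2 has P = 0, so P*D = 0 wins despite the longest queue.
    assert_eq!(best, Some(2));
}
```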
@@ -40,7 +28,7 @@
 
 use crate::cluster::meta_store::MetaStore;
 use crate::instance::Instance;
-use crate::router::{CandidateInfo, RouteDecision, Router};
+use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
 use crate::trace::RequestRecord;
 
 pub struct MinPdRouter;
@@ -66,36 +54,26 @@ impl Router for MinPdRouter {
         &mut self,
         req: &RequestRecord,
         instances: &[Instance],
-        meta: &MetaStore,
-        now: f64,
+        _meta: &MetaStore,
+        _now: f64,
     ) -> RouteDecision {
         let n = instances.len();
-        let scores = meta.score_prefix(&req.hash_ids, now, n);
+        let scores = local_l0_scores(req, instances);
         let block_size = instances[0].block_size_tokens as u64;
         let input_blocks = req.hash_ids.len() as u64;
 
-        // Cluster-wide max prefix: longest contiguous prefix that EXISTS
-        // somewhere in the cluster (and will be fetched via remote RDMA if
-        // not local). This determines the effective prefill work for every
-        // candidate, not just the one that owns the blocks.
-        let cluster_prefix_blocks = scores.iter().copied().max().unwrap_or(0) as u64;
-        let effective_prefix_blocks = cluster_prefix_blocks.min(input_blocks);
-        let miss_blocks = input_blocks.saturating_sub(effective_prefix_blocks);
-        let p_base = miss_blocks.saturating_mul(block_size); // tokens to prefill
-
         let mut candidates = Vec::with_capacity(n);
         let mut best: u32 = instances[0].id;
         // Minimize (P*D, D, -local_prefix).
-        // P is nearly instance-independent; D is the real discriminator.
-        // When tied on D, prefer the instance with the best local prefix
-        // (avoids the RDMA fetch cost).
         let mut best_key: (u128, u64, i64) = (u128::MAX, u64::MAX, i64::MAX);
 
         for inst in instances {
             let i = inst.id as usize;
             let d = inst.queue_len() as u64;
-            let pd = p_base as u128 * d as u128;
             let local_prefix = scores[i] as i64;
+            let miss_blocks = input_blocks.saturating_sub(scores[i] as u64);
+            let p = miss_blocks.saturating_mul(block_size);
+            let pd = p as u128 * d as u128;
 
             candidates.push(CandidateInfo {
                 instance: inst.id,
@@ -118,7 +96,7 @@ impl Router for MinPdRouter {
             chosen: best,
             probe_overhead_s: 0.0,
             candidates,
-            reason: "argmin(P*D), P=cluster-wide miss tokens, D=ongoing reqs",
+            reason: "argmin(P*D), P=local-L0 miss tokens, D=ongoing reqs",
         }
     }
 }
@@ -48,6 +48,17 @@ pub trait Router: Send {
     ) -> RouteDecision;
 }
 
+pub(crate) fn local_l0_prefix(req: &RequestRecord, inst: &Instance) -> u32 {
+    inst.cache.l0.longest_prefix_peek(&req.hash_ids) as u32
+}
+
+pub(crate) fn local_l0_scores(req: &RequestRecord, instances: &[Instance]) -> Vec<u32> {
+    instances
+        .iter()
+        .map(|inst| local_l0_prefix(req, inst))
+        .collect()
+}
+
 pub fn build(full: &Config, seed: u64) -> Box<dyn Router> {
     use crate::config::RouterMode::*;
     let cfg = &full.cluster.router;
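These two helpers are now the single scoring primitive every cache-aware router shares: a contiguous leading-block match against the instance's L0. A self-contained illustration of the prefix semantics (our own toy, assuming a simple set-backed cache):

```rust
use std::collections::HashSet;

// Longest contiguous leading run of request blocks present in the cache:
// the first gap ends the match, even if later blocks are cached.
fn longest_prefix(cache: &HashSet<u64>, hash_ids: &[u64]) -> usize {
    hash_ids.iter().take_while(|h| cache.contains(h)).count()
}

fn main() {
    let cache: HashSet<u64> = [1, 2, 4].into_iter().collect();
    // Request [1, 2, 3, 4] matches a prefix of 2: block 3 breaks the
    // chain before the cached block 4 can count.
    assert_eq!(longest_prefix(&cache, &[1, 2, 3, 4]), 2);
}
```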
@@ -66,10 +77,10 @@ pub fn build(full: &Config, seed: u64) -> Box<dyn Router> {
         MinPd => Box::new(min_pd::MinPdRouter::new()) as Box<dyn Router>,
         LeastTokens => Box::new(least_tokens::LeastTokensRouter::new()) as Box<dyn Router>,
         CacheLoad => Box::new(cache_load::CacheLoadRouter::new()) as Box<dyn Router>,
-        CacheScore => {
-            Box::new(cache_score::CacheScoreRouter::new(cfg.score_alpha, cfg.score_beta))
-                as Box<dyn Router>
-        }
+        CacheScore => Box::new(cache_score::CacheScoreRouter::new(
+            cfg.score_alpha,
+            cfg.score_beta,
+        )) as Box<dyn Router>,
         EstimatedTtft => {
             Box::new(estimated_ttft::EstimatedTtftRouter::new(full)) as Box<dyn Router>
         }
@@ -78,3 +89,192 @@ pub fn build(full: &Config, seed: u64) -> Box<dyn Router> {
     }
 }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::config::{
+        ClusterConfig, HardwareConfig, MetaStoreConfig, ModelConfig, RouterConfig, RouterMode,
+        SimConfig,
+    };
+    use crate::instance::instance::AdmittedRequest;
+    use crate::router::cache_load::CacheLoadRouter;
+    use crate::router::cache_score::CacheScoreRouter;
+    use crate::router::estimated_ttft::EstimatedTtftRouter;
+    use crate::router::min_pd::MinPdRouter;
+    use crate::router::precise_aware::PreciseRouter;
+    use crate::router::prefix_affinity::PrefixAffinityRouter;
+    use crate::router::ttl_aware::TtlAwareRouter;
+    use crate::trace::RequestRecord;
+
+    fn test_model() -> ModelConfig {
+        ModelConfig {
+            name: "test".into(),
+            num_layers: 4,
+            num_kv_heads: 2,
+            head_dim: 64,
+            dtype_bytes: 2,
+            block_size_tokens: 16,
+            flops_per_token_prefill: Some(1.0e9),
+            attn_quadratic_coeff: Some(64.0),
+            ..Default::default()
+        }
+    }
+
+    fn test_hardware() -> HardwareConfig {
+        HardwareConfig {
+            gpu_flops: 1.0e14,
+            gpu_fp8_flops: 0.0,
+            gpu_fp4_flops: 0.0,
+            gpu_mem_bw: 1.0e12,
+            hbm_bytes: 1.0e9,
+            dram_bytes: 4.0e9,
+            pcie_bw: 32.0e9,
+            pcie_latency_us: 1.0,
+            rdma_bw: 12.0e9,
+            rdma_latency_us: 5.0,
+            max_batch_slots: 32,
+            prefill_chunk_tokens: 1024,
+        }
+    }
+
+    fn test_config(mode: RouterMode) -> Config {
+        Config {
+            model: test_model(),
+            hardware: test_hardware(),
+            cluster: ClusterConfig {
+                num_instances: 2,
+                meta_store: MetaStoreConfig {
+                    ttl_seconds: 1000.0,
+                },
+                router: RouterConfig {
+                    mode,
+                    precise_probe_latency_us: 10.0,
+                    precise_probe_topk: 2,
+                    load_alpha: 0.0,
+                    score_alpha: 0.0,
+                    score_beta: 1.0,
+                    prefix_k: 8,
+                    affinity_fan_out: 2,
+                },
+            },
+            sim: SimConfig {
+                trace_path: String::new(),
+                max_requests: None,
+                output_dir: String::new(),
+                sample_interval_s: 0.0,
+                seed: 7,
+            },
+        }
+    }
+
+    fn make_instances(n: usize) -> Vec<Instance> {
+        let model = test_model();
+        let hw = test_hardware();
+        (0..n)
+            .map(|id| Instance::new(id as u32, &model, &hw))
+            .collect()
+    }
+
+    fn make_request(hashes: &[u64]) -> RequestRecord {
+        RequestRecord {
+            req_id: 1,
+            chat_id: 0,
+            arrival: 0.0,
+            input_len: hashes.len() as u32 * 16,
+            output_len: 16,
+            hash_ids: hashes.to_vec(),
+        }
+    }
+
+    fn insert_l0(inst: &mut Instance, hashes: &[u64]) {
+        let mut evicted = Vec::new();
+        inst.cache.l0.insert_blocks(hashes, &mut evicted);
+    }
+
+    fn insert_l1(inst: &mut Instance, hashes: &[u64]) {
+        let mut evicted = Vec::new();
+        inst.cache.l1.insert_blocks(hashes, &mut evicted);
+    }
+
+    fn publish_meta(meta: &mut MetaStore, inst_id: u32, hashes: &[u64], now: f64) {
+        for &h in hashes {
+            meta.insert(h, inst_id, now);
+        }
+    }
+
+    fn enqueue_requests(inst: &mut Instance, count: u32, tokens: u32) {
+        for req_id in 0..count {
+            inst.admit(AdmittedRequest {
+                req_id: req_id as u64,
+                arrival: 0.0,
+                ready_at: 0.0,
+                prefill_tokens_remaining: tokens,
+                reserved_blocks: 0,
+            });
+        }
+    }
+
+    #[test]
+    fn precise_uses_real_l0_prefix_not_l1_prefix() {
+        let req = make_request(&[10, 11, 12]);
+        let mut instances = make_instances(2);
+        let mut meta = MetaStore::new(1000.0);
+
+        insert_l1(&mut instances[0], &[10, 11, 12]);
+        publish_meta(&mut meta, 0, &[10, 11, 12], 0.0);
+        insert_l0(&mut instances[1], &[10, 11]);
+
+        let mut router = PreciseRouter::new(2, 10e-6, 0.0);
+        let decision = router.route(&req, &instances, &meta, 0.0);
+
+        assert_eq!(decision.chosen, 1);
+    }
+
+    #[test]
+    fn cache_aware_routers_compare_real_l0_not_meta_store_scores() {
+        let req = make_request(&[20, 21, 22]);
+        let mut instances = make_instances(2);
+        let mut meta = MetaStore::new(1000.0);
+
+        // Instance 0 only looks hot in the meta store; its real L0 prefix is zero.
+        publish_meta(&mut meta, 0, &[20, 21, 22], 0.0);
+        // Instance 1 really holds the first two blocks in HBM.
+        insert_l0(&mut instances[1], &[20, 21]);
+
+        let mut ttl = TtlAwareRouter::new(0.0);
+        assert_eq!(ttl.route(&req, &instances, &meta, 0.0).chosen, 1);
+
+        let mut cache_load = CacheLoadRouter::new();
+        assert_eq!(cache_load.route(&req, &instances, &meta, 0.0).chosen, 1);
+
+        let mut cache_score = CacheScoreRouter::new(0.0, 1.0);
+        assert_eq!(cache_score.route(&req, &instances, &meta, 0.0).chosen, 1);
+
+        let mut min_pd = MinPdRouter::new();
+        assert_eq!(min_pd.route(&req, &instances, &meta, 0.0).chosen, 1);
+
+        let cfg = test_config(RouterMode::EstimatedTtft);
+        let mut est = EstimatedTtftRouter::new(&cfg);
+        assert_eq!(est.route(&req, &instances, &meta, 0.0).chosen, 1);
+    }
+
+    #[test]
+    fn prefix_affinity_fallback_uses_real_l0_not_meta_store_scores() {
+        let req = make_request(&[30, 31, 32]);
+        let mut instances = make_instances(2);
+        let mut meta = MetaStore::new(1000.0);
+
+        publish_meta(&mut meta, 0, &[30, 31, 32], 0.0);
+        insert_l0(&mut instances[1], &[30, 31]);
+        enqueue_requests(&mut instances[0], 5, 64);
+        enqueue_requests(&mut instances[1], 5, 64);
+
+        let cfg = test_config(RouterMode::PrefixAffinity);
+        let mut router = PrefixAffinityRouter::new(&cfg);
+        let decision = router.route(&req, &instances, &meta, 0.0);
+
+        assert_eq!(decision.reason, "affinity fallback: min(drain+fetch)");
+        assert_eq!(decision.chosen, 1);
+    }
+}
@@ -1,28 +1,13 @@
-//! KV-aware routing via meta-store candidate selection + precise probing.
+//! L0-aware routing via exact per-instance probing.
 //!
-//! The global meta store is used as a *candidate pre-filter*: we score
-//! every instance's predicted prefix from the store, take the top-K by
-//! (predicted_prefix DESC, load ASC), and then exact-probe those K
-//! candidates' actual L0+L1 caches to get the true longest prefix. This
-//! catches two cases where the meta store is wrong:
-//!
-//! - the store is stale (block evicted from L0/L1 but TTL not yet up),
-//! - the store undercounts because some blocks' TTL expired individually.
-//!
-//! Because the candidate set is sourced from the meta store rather than
-//! from a load ranking, this router is a strict superset of `ttl_aware`:
-//! any instance the meta store would pick is a candidate here, and the
-//! exact probe can only move the decision toward a truthfully-better
-//! instance. Each probe adds `probe_latency_s` to the request's
-//! effective arrival time.
-//!
-//! If the meta store returns zero-prefix for every instance (e.g. cold
-//! start, or a request whose blocks have never been seen), we fall back
-//! to the top-K least-loaded instances so we still place the request.
+//! Every instance is compared using its *real current L0 prefix length*
+//! only. L1 / remote availability can still reduce execution-time misses
+//! later in the cluster fetch chain, but they do not count as `kvcache hit`
+//! for routing.
 
 use crate::cluster::meta_store::MetaStore;
 use crate::instance::Instance;
-use crate::router::{CandidateInfo, RouteDecision, Router};
+use crate::router::{local_l0_prefix, CandidateInfo, RouteDecision, Router};
 use crate::trace::RequestRecord;
 
 pub struct PreciseRouter {
@@ -33,7 +18,11 @@ pub struct PreciseRouter {
 
 impl PreciseRouter {
     pub fn new(topk: u32, probe_latency_s: f64, alpha: f64) -> Self {
-        Self { topk, probe_latency_s, alpha }
+        Self {
+            topk,
+            probe_latency_s,
+            alpha,
+        }
     }
 
     fn load_of(&self, inst: &Instance) -> f64 {
@@ -50,50 +39,15 @@ impl Router for PreciseRouter {
         &mut self,
         req: &RequestRecord,
         instances: &[Instance],
-        meta: &MetaStore,
-        now: f64,
+        _meta: &MetaStore,
+        _now: f64,
     ) -> RouteDecision {
         let n = instances.len();
-        let k = (self.topk as usize).min(n).max(1);
-
-        // 1. Meta-store candidate set: rank all instances by
-        //    (predicted_prefix DESC, load ASC) and take the top-K.
-        let meta_scores = meta.score_prefix(&req.hash_ids, now, n);
-        let any_meta_hit = meta_scores.iter().any(|&p| p > 0);
-
-        let mut ranked: Vec<usize> = (0..n).collect();
-        if any_meta_hit {
-            ranked.sort_by(|&a, &b| {
-                let pa = meta_scores[a];
-                let pb = meta_scores[b];
-                // prefix desc, then load asc
-                pb.cmp(&pa)
-                    .then_with(|| {
-                        self.load_of(&instances[a])
-                            .partial_cmp(&self.load_of(&instances[b]))
-                            .unwrap_or(std::cmp::Ordering::Equal)
-                    })
-            });
-        } else {
-            // Cold start fallback: pure load order.
-            ranked.sort_by(|&a, &b| {
-                self.load_of(&instances[a])
-                    .partial_cmp(&self.load_of(&instances[b]))
-                    .unwrap_or(std::cmp::Ordering::Equal)
-            });
-        }
-        let probed = &ranked[..k];
-
-        // 2. Exact probe each candidate and pick
-        //    argmax(exact_prefix, tiebreak: -load).
-        let mut candidates = Vec::with_capacity(k);
-        let mut best = probed[0] as u32;
+        let mut candidates = Vec::with_capacity(n);
+        let mut best = instances[0].id;
         let mut best_key: (i64, f64) = (i64::MIN, f64::INFINITY);
-        for &i in probed {
-            let inst = &instances[i];
-            let l0 = inst.cache.l0.longest_prefix_peek(&req.hash_ids);
-            let l1 = inst.cache.l1.longest_prefix_peek(&req.hash_ids[l0..]);
-            let predicted = (l0 + l1) as u32;
+        for inst in instances {
+            let predicted = local_l0_prefix(req, inst);
             let load = self.load_of(inst);
             candidates.push(CandidateInfo {
                 instance: inst.id,
@@ -112,9 +66,9 @@ impl Router for PreciseRouter {
             req_id: req.req_id,
             mode: "precise",
             chosen: best,
-            probe_overhead_s: k as f64 * self.probe_latency_s,
+            probe_overhead_s: n as f64 * self.probe_latency_s,
             candidates,
-            reason: "exact-probe top-K meta-store candidates",
+            reason: "exact-probe all instances' L0 cache",
         }
     }
 }
@@ -36,7 +36,7 @@
 use crate::cluster::meta_store::MetaStore;
 use crate::config::Config;
 use crate::instance::Instance;
-use crate::router::{CandidateInfo, RouteDecision, Router};
+use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
 use crate::trace::RequestRecord;
 
 pub struct PrefixAffinityRouter {
@@ -47,12 +47,6 @@ pub struct PrefixAffinityRouter {
     /// Queue-length threshold: if all top candidates exceed this, expand to
     /// the full instance set.
     overload_threshold: u32,
-    /// Bytes per KV block (for RDMA cost estimation in fallback path).
-    kv_block_bytes: f64,
-    /// RDMA bandwidth in bytes/s.
-    rdma_bw: f64,
-    /// RDMA per-transfer latency in seconds.
-    rdma_latency_s: f64,
 }
 
 impl PrefixAffinityRouter {
@@ -69,9 +63,6 @@ impl PrefixAffinityRouter {
             prefix_k: config.cluster.router.prefix_k,
             fan_out,
             overload_threshold: 4,
-            kv_block_bytes: config.model.kv_block_bytes() as f64,
-            rdma_bw: config.hardware.rdma_bw,
-            rdma_latency_s: config.hardware.rdma_latency_us * 1e-6,
         }
     }
 
@@ -96,15 +87,6 @@ impl PrefixAffinityRouter {
         h = (h ^ (h >> 27)).wrapping_mul(0x94d049bb133111eb);
         h ^ (h >> 31)
     }
-
-    /// Estimate RDMA fetch time for `remote_blocks` blocks.
-    fn fetch_time(&self, remote_blocks: u32) -> f64 {
-        if remote_blocks == 0 {
-            return 0.0;
-        }
-        let bytes = remote_blocks as f64 * self.kv_block_bytes;
-        bytes / self.rdma_bw + self.rdma_latency_s
-    }
 }
 
 impl Router for PrefixAffinityRouter {
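The surviving `fingerprint` tail is the finalizer of a splitmix64-style bit mixer over the request's first `prefix_k` block hashes. A complete version of that finalizer for reference (a sketch: only the second constant and the shift pattern appear in the diff above; the first round is the standard splitmix64 step and is an assumption here):

```rust
// splitmix64-style finalizer: two multiply-xorshift rounds plus a final
// xorshift, spreading prefix-hash bits uniformly before bucketing.
fn mix64(mut h: u64) -> u64 {
    h = (h ^ (h >> 30)).wrapping_mul(0xbf58476d1ce4e5b9); // assumed round
    h = (h ^ (h >> 27)).wrapping_mul(0x94d049bb133111eb); // from the diff
    h ^ (h >> 31)
}
```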
@@ -116,8 +98,8 @@ impl Router for PrefixAffinityRouter {
         &mut self,
         req: &RequestRecord,
         instances: &[Instance],
-        meta: &MetaStore,
-        now: f64,
+        _meta: &MetaStore,
+        _now: f64,
     ) -> RouteDecision {
         let n = instances.len();
         let fp = Self::fingerprint(&req.hash_ids, self.prefix_k);
@@ -129,7 +111,7 @@ impl Router for PrefixAffinityRouter {
         ranked.sort_unstable_by(|a, b| b.0.cmp(&a.0)); // descending score
 
         // Collect candidate info for logging (also needed for fallback).
-        let scores = meta.score_prefix(&req.hash_ids, now, n);
+        let scores = local_l0_scores(req, instances);
         let candidates: Vec<CandidateInfo> = instances
             .iter()
             .map(|inst| CandidateInfo {
@@ -165,14 +147,14 @@ impl Router for PrefixAffinityRouter {
         let reason;
         if all_overloaded {
             reason = "affinity fallback: min(drain+fetch)";
-            let cluster_prefix = scores.iter().copied().max().unwrap_or(0);
             let mut best_cost = f64::INFINITY;
             for &(_, idx) in ranked.iter() {
                 let inst = &instances[idx];
                 let drain = inst.estimated_drain_time();
-                let local_prefix = scores[idx];
-                let remote_blocks = cluster_prefix.saturating_sub(local_prefix);
-                let cost = drain + self.fetch_time(remote_blocks);
+                let miss_tokens = (req.hash_ids.len() as u32)
+                    .saturating_sub(scores[idx])
+                    .saturating_mul(inst.block_size_tokens);
+                let cost = drain + inst.compute.prefill_time(miss_tokens);
                 let ql = inst.queue_len();
                 if cost < best_cost || (cost == best_cost && ql < best_ql) {
                     best_cost = cost;
@@ -13,7 +13,9 @@ pub struct RandomRouter {
 
 impl RandomRouter {
     pub fn new(seed: u64) -> Self {
-        Self { rng: ChaCha8Rng::seed_from_u64(seed) }
+        Self {
+            rng: ChaCha8Rng::seed_from_u64(seed),
+        }
     }
 }
@@ -1,6 +1,6 @@
 use crate::cluster::meta_store::MetaStore;
 use crate::instance::Instance;
-use crate::router::{CandidateInfo, RouteDecision, Router};
+use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
 use crate::trace::RequestRecord;
 
 pub struct TtlAwareRouter {
@@ -22,18 +22,17 @@ impl Router for TtlAwareRouter {
         &mut self,
         req: &RequestRecord,
         instances: &[Instance],
-        meta: &MetaStore,
-        now: f64,
+        _meta: &MetaStore,
+        _now: f64,
     ) -> RouteDecision {
         let n = instances.len();
-        let scores = meta.score_prefix(&req.hash_ids, now, n);
+        let scores = local_l0_scores(req, instances);
         let mut best = 0u32;
         let mut best_key = (i64::MIN, f64::INFINITY); // maximize prefix, then minimize load
         let mut candidates = Vec::with_capacity(n);
         for inst in instances {
             let p = scores[inst.id as usize];
-            let load = inst.kv_blocks_used as f64
-                + self.alpha * inst.queue_len() as f64;
+            let load = inst.kv_blocks_used as f64 + self.alpha * inst.queue_len() as f64;
             candidates.push(CandidateInfo {
                 instance: inst.id,
                 predicted_prefix: p,
@@ -53,7 +52,7 @@ impl Router for TtlAwareRouter {
             chosen: best,
             probe_overhead_s: 0.0,
             candidates,
-            reason: "max meta_store prefix, tie -> least loaded",
+            reason: "max local L0 prefix, tie -> least loaded",
         }
     }
 }
@@ -56,7 +56,11 @@ impl EventQueue {
     pub fn schedule(&mut self, time: f64, event: Event) {
         let t = time.max(self.now);
         self.seq += 1;
-        self.heap.push(Slot { time: t, seq: self.seq, event });
+        self.heap.push(Slot {
+            time: t,
+            seq: self.seq,
+            event,
+        });
     }
 
     pub fn pop(&mut self) -> Option<(f64, Event)> {
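Two details worth noting in `schedule`: times are clamped to `now` so no event can land in the past, and the monotonically increasing `seq` breaks ties so equal-time events pop in FIFO order. A minimal sketch of that ordering (our own illustration: a min-heap over `(time, seq)`):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    // BinaryHeap is a max-heap; Reverse turns it into a min-heap.
    // f64 isn't Ord, so this sketch orders non-negative times by their
    // bit patterns (monotonic for non-negative IEEE-754 values).
    let mut heap: BinaryHeap<Reverse<(u64, u64)>> = BinaryHeap::new();
    for (time, seq) in [(2.0_f64, 0_u64), (1.0, 1), (1.0, 2)] {
        heap.push(Reverse((time.to_bits(), seq)));
    }
    while let Some(Reverse((t, seq))) = heap.pop() {
        // Pops (1.0, 1), (1.0, 2), (2.0, 0): time order, FIFO on ties.
        println!("t={} seq={}", f64::from_bits(t), seq);
    }
}
```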
@@ -84,7 +88,12 @@ mod tests {
     #[test]
     fn pops_in_time_order() {
         let mut q = EventQueue::new();
-        q.schedule(2.0, Event::BatchTick { instance: 0 as InstanceId });
+        q.schedule(
+            2.0,
+            Event::BatchTick {
+                instance: 0 as InstanceId,
+            },
+        );
         q.schedule(1.0, Event::BatchTick { instance: 1 });
         q.schedule(1.5, Event::BatchTick { instance: 2 });
         let (t1, _) = q.pop().unwrap();
@@ -50,8 +50,7 @@ pub struct TraceReader {
 impl TraceReader {
     pub fn open<P: AsRef<Path>>(path: P, max_requests: Option<u64>) -> Result<Self> {
         let path = path.as_ref();
-        let f = File::open(path)
-            .with_context(|| format!("opening trace {}", path.display()))?;
+        let f = File::open(path).with_context(|| format!("opening trace {}", path.display()))?;
         Ok(Self {
             inner: BufReader::with_capacity(1 << 20, f),
             next_id: 0,
@@ -172,12 +172,14 @@ fn ablation_lru_preserves_ttft_fields() {
         RouterMode::Random,
     );
     let online = driver::run(&cfg, Some("online_lru")).expect("online lru run");
-    let out = driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Lru])
+    let out =
+        driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Lru])
         .expect("ablate lru");
 
     assert_eq!(out.len(), 1);
     let row = &out[0];
-    let online_hit = online.summary.hit_rate_l0 + online.summary.hit_rate_l1 + online.summary.hit_rate_remote;
+    let online_hit =
+        online.summary.hit_rate_l0 + online.summary.hit_rate_l1 + online.summary.hit_rate_remote;
     let ablate_hit = row.hit_rate_l0 + row.hit_rate_l1 + row.hit_rate_remote;
 
     assert!(
@@ -204,11 +206,8 @@ fn ablate_rejects_belady_until_exact_algorithm_exists() {
         RouterMode::Random,
     );
 
-    let err = driver::ablate_fixed_placement(
-        &cfg,
-        &[RouterMode::Random],
-        &[ReplayEvictPolicy::Belady],
-    )
+    let err =
+        driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Belady])
         .expect_err("belady should be rejected");
     assert!(
         err.to_string().contains("exact belady"),