feat: update ttft modeling and add cache affinity

2026-04-15 19:08:10 +08:00
parent ff316c6873
commit a3f386c858
15 changed files with 1276 additions and 99 deletions


@@ -8,6 +8,7 @@ use crate::instance::kv_cache::L1Change;
use crate::instance::Instance;
use crate::router::{self, RouteDecision, Router};
use crate::trace::RequestRecord;
use crate::ttft::{classify_prefix_tiers, TtftModel};
use crate::types::InstanceId;
#[derive(Debug, Clone)]
@@ -31,13 +32,19 @@ pub struct Cluster {
pub router: Box<dyn Router>,
pub block_size_tokens: u32,
pub kv_block_bytes: u64,
pub ttft_model: TtftModel,
}
impl Cluster {
pub fn new(config: &Config, model: &ModelConfig) -> Self {
let mut instances = Vec::with_capacity(config.cluster.num_instances as usize);
for id in 0..config.cluster.num_instances {
instances.push(Instance::new(id as InstanceId, model, &config.hardware));
instances.push(Instance::new(
id as InstanceId,
model,
&config.hardware,
&config.calibration,
));
}
let meta_store = MetaStore::new(config.cluster.meta_store.ttl_seconds);
let router = router::build(config, config.sim.seed);
@@ -47,6 +54,7 @@ impl Cluster {
router,
block_size_tokens: model.block_size_tokens,
kv_block_bytes: model.kv_block_bytes(),
ttft_model: TtftModel::new(&config.hardware, &config.calibration, model.kv_block_bytes()),
}
}
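
For orientation, the routing path below touches only a small surface of the new TtftModel: scheduler_overhead_s, local_l1_prepare_time_s, remote_prepare_time_s and first_token_tail_s. A minimal sketch of that surface, with illustrative formulas only (the real definitions live in crate::ttft, which this commit changes but this hunk does not show; the constants and field names here are assumptions):

// Illustrative sketch -- not the crate::ttft implementation.
pub struct TtftModelSketch {
    pcie_bw: f64,               // bytes/s
    rdma_bw: f64,               // bytes/s
    dram_access_latency_s: f64, // calibrated host-DRAM access latency
    layout_transform_s: f64,    // calibrated fixed layout-transform cost
    kv_block_bytes: u64,
}

impl TtftModelSketch {
    // Per-request scheduling work, assumed to grow mildly with cluster size.
    pub fn scheduler_overhead_s(&self, num_instances: usize, passes: u32) -> f64 {
        1.0e-6 * passes as f64 * (num_instances as f64).ln_1p()
    }

    // Staging `blocks` host-DRAM (L1) blocks into HBM: DRAM access latency,
    // layout transform, plus the unloaded PCIe transfer time.
    pub fn local_l1_prepare_time_s(&self, blocks: u32) -> f64 {
        let bytes = (blocks as u64 * self.kv_block_bytes) as f64;
        self.dram_access_latency_s + self.layout_transform_s + bytes / self.pcie_bw
    }

    // Pulling `blocks` blocks from a remote instance: RDMA hop plus local PCIe hop.
    pub fn remote_prepare_time_s(&self, blocks: u32) -> f64 {
        let bytes = (blocks as u64 * self.kv_block_bytes) as f64;
        self.layout_transform_s + bytes / self.rdma_bw + bytes / self.pcie_bw
    }

    // Small fixed tail between the end of prefill and emitting the first token.
    pub fn first_token_tail_s(&self) -> f64 {
        5.0e-6
    }
}

The property the cluster code below relies on is that the two prepare-time estimates already include the unloaded link transfer, which is why it subtracts links.pcie.cost(..) and links.rdma.cost(..) before reserving the actual transfers.
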
@@ -59,24 +67,29 @@ impl Cluster {
.route(req, &self.instances, &self.meta_store, now);
let inst_id = decision.chosen;
let probe_overhead_s = decision.probe_overhead_s;
let scheduler_overhead_s = self
.ttft_model
.scheduler_overhead_s(self.instances.len(), 3);
// The router probe overhead delays the request's effective start time.
let effective_now = now + probe_overhead_s;
// The router probe overhead and scheduler work delay the request's
// effective start time.
let effective_now = now + probe_overhead_s + scheduler_overhead_s;
let inst = &mut self.instances[inst_id as usize];
let residency = classify_prefix_tiers(&req.hash_ids, inst, &self.meta_store, now);
let total_blocks = req.hash_ids.len() as u32;
let l0_hits = residency.l0_hit_blocks;
let l1_hits = residency.l1_hit_blocks;
let remote_hit_blocks = residency.remote_hit_blocks;
// 1. L0 lookup (touches matched blocks).
let l0_hits = inst.cache.l0.longest_prefix(&req.hash_ids) as u32;
// 2. L1 lookup on the remaining suffix.
// 1. L1 lookup on the remaining suffix.
let suffix_after_l0 = &req.hash_ids[l0_hits as usize..];
let l1_hits = inst.cache.l1.longest_prefix_peek(suffix_after_l0) as u32;
// L1->L0 transfer cost
let l1_bytes = (l1_hits as u64) * self.kv_block_bytes;
let mut t = effective_now;
let mut l1_changes = Vec::new();
if l1_hits > 0 {
t += self.ttft_model.local_l1_prepare_time_s(l1_hits) - inst.links.pcie.cost(l1_bytes);
t = inst.links.pcie.reserve(t, l1_bytes);
l1_changes = inst
.cache
@@ -86,23 +99,14 @@ impl Cluster {
// 3. Remote v6d lookup for the still-remaining suffix.
let suffix_after_l1 = &suffix_after_l0[l1_hits as usize..];
let mut remote_hit_blocks: u32 = 0;
for &h in suffix_after_l1 {
// A block is remotely available iff some instance other than
// `inst_id` lists it (and not expired).
let owners = self.meta_store.instances_for(h, now);
let any_remote = owners.iter().any(|o| *o != inst_id);
if any_remote {
remote_hit_blocks += 1;
} else {
break; // contiguous prefix - stop on first miss
}
}
let remote_bytes = (remote_hit_blocks as u64) * self.kv_block_bytes;
if remote_hit_blocks > 0 {
let pulled = &suffix_after_l1[..remote_hit_blocks as usize];
let l1_changes = {
let inst = &mut self.instances[inst_id as usize];
t += self.ttft_model.remote_prepare_time_s(remote_hit_blocks)
- inst.links.rdma.cost(remote_bytes)
- inst.links.pcie.cost(remote_bytes);
t = inst.links.rdma.reserve(t, remote_bytes);
t = inst.links.pcie.reserve(t, remote_bytes);
inst.cache.fetch_remote_blocks_to_l0(pulled)
@@ -134,6 +138,7 @@ impl Cluster {
ready_at: t,
prefill_tokens_remaining: miss_tokens,
reserved_blocks,
completion_tail_s: self.ttft_model.first_token_tail_s(),
};
let inst = &mut self.instances[inst_id as usize];
inst.admit(admitted);
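
The timing arithmetic above follows one pattern for both the L1 and the remote path: add the model's prepare-time estimate, subtract the unloaded transfer time that estimate already contains, then reserve the actual transfer on the shared link so it queues behind earlier traffic. A minimal link sketch under which that arithmetic is consistent (an assumption for illustration; the real Link type lives with Instance and may differ):

// Illustrative FIFO link model -- not the simulator's Link type.
pub struct LinkSketch {
    bandwidth_bps: f64, // bytes/s
    latency_s: f64,     // fixed per-transfer latency
    busy_until: f64,    // completion time of the last reserved transfer
}

impl LinkSketch {
    // Unloaded transfer time on an idle link.
    pub fn cost(&self, bytes: u64) -> f64 {
        self.latency_s + bytes as f64 / self.bandwidth_bps
    }

    // Queue the transfer behind whatever is already scheduled, starting no
    // earlier than `t`, and return its completion time.
    pub fn reserve(&mut self, t: f64, bytes: u64) -> f64 {
        let start = t.max(self.busy_until);
        self.busy_until = start + self.cost(bytes);
        self.busy_until
    }
}

Under this reading, `t += prepare - cost(bytes)` keeps the fixed DRAM/layout components of the estimate, and the following `t = reserve(t, bytes)` swaps the idealized transfer time for a congestion-aware completion time.
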
@@ -170,3 +175,100 @@ impl Cluster {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{
CalibrationConfig, ClusterConfig, Config, HardwareConfig, MetaStoreConfig, ModelConfig,
RouterConfig, RouterMode, SimConfig,
};
use crate::trace::RequestRecord;
fn test_config(mode: RouterMode) -> Config {
Config {
model: ModelConfig {
name: "test".into(),
num_layers: 4,
num_kv_heads: 2,
head_dim: 64,
dtype_bytes: 2,
block_size_tokens: 16,
flops_per_token_prefill: Some(1.0e9),
attn_quadratic_coeff: Some(64.0),
..Default::default()
},
hardware: HardwareConfig {
gpu_flops: 1.0e14,
gpu_fp8_flops: 0.0,
gpu_fp4_flops: 0.0,
gpu_mem_bw: 1.0e12,
hbm_bytes: 1.0e9,
dram_bytes: 4.0e9,
host_dram_bw: 5.0e11,
pcie_bw: 32.0e9,
pcie_latency_us: 1.0,
rdma_bw: 12.0e9,
rdma_latency_us: 5.0,
intra_node_tp_bw: 9.0e11,
intra_node_tp_latency_us: 2.0,
tp_degree: 1,
max_batch_slots: 32,
prefill_chunk_tokens: 1024,
},
calibration: CalibrationConfig {
dram_access_latency_us: 25.0,
layout_transform_fixed_us: 7.0,
..CalibrationConfig::default()
},
cluster: ClusterConfig {
num_instances: 1,
meta_store: MetaStoreConfig {
ttl_seconds: 1000.0,
},
router: RouterConfig {
mode,
precise_probe_latency_us: 10.0,
precise_probe_topk: 2,
load_alpha: 0.0,
score_alpha: 0.0,
score_beta: 1.0,
prefix_k: 8,
affinity_fan_out: 2,
},
},
sim: SimConfig {
trace_path: String::new(),
max_requests: None,
output_dir: String::new(),
sample_interval_s: 0.0,
seed: 7,
},
}
}
#[test]
fn l1_ready_at_includes_dram_and_transform_overhead() {
let cfg = test_config(RouterMode::EstimatedTtft);
let mut cluster = Cluster::new(&cfg, &cfg.model);
let req = RequestRecord {
req_id: 1,
chat_id: 0,
arrival: 0.0,
input_len: 32,
output_len: 16,
hash_ids: vec![10, 11],
};
let mut evicted = Vec::new();
cluster.instances[0]
.cache
.l1
.insert_blocks(&req.hash_ids, &mut evicted);
let stats = cluster.route_and_admit(&req, 0.0);
let pure_pcie = cluster.instances[0].links.pcie.cost(cluster.kv_block_bytes * 2);
assert!(stats.ready_at > pure_pcie);
}
}
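
A rough sanity check on why the assertion holds for this configuration, assuming kv_block_bytes() follows the usual 2 x layers x kv_heads x head_dim x dtype_bytes x block_size formula (back-of-the-envelope numbers, not part of the commit):

#[test]
fn calibration_overhead_dominates_raw_pcie_transfer_sketch() {
    // 2 * 4 layers * 2 kv heads * 64 head dim * 2 bytes * 16 tokens = 32 KiB per block.
    let kv_block_bytes: u64 = 2 * 4 * 2 * 64 * 2 * 16;
    // Two blocks over a 32 GB/s PCIe link with 1 us latency: roughly 3 us.
    let pcie_s = 1.0e-6 + (2 * kv_block_bytes) as f64 / 32.0e9;
    // Calibrated DRAM access latency (25 us) plus layout transform (7 us): 32 us.
    let calibrated_s = 25.0e-6 + 7.0e-6;
    assert!(calibrated_s > 5.0 * pcie_s);
}

So if local_l1_prepare_time_s folds the calibrated DRAM and layout-transform terms into ready_at, it exceeds the pure PCIe cost by roughly an order of magnitude, which is what the test asserts.
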