diff --git a/configs/glm5-nvfp4-8xb300.yaml b/configs/glm5-nvfp4-8xb300.yaml
index 90d1fc5..99398ed 100644
--- a/configs/glm5-nvfp4-8xb300.yaml
+++ b/configs/glm5-nvfp4-8xb300.yaml
@@ -14,11 +14,12 @@ model:
 
 hardware:
   type: 8xb300
   hbm_bytes: 1900.0e9 # KV budget after FP4 weights (~372 GB)
+  dram_bytes: 1.5e12  # ~1.5 TB usable CPU DRAM per node
 cluster:
-  num_instances: 32
+  num_instances: 8
 meta_store:
-  ttl_seconds: 120.0
+  ttl_seconds: 300.0
 router:
   mode: prefix_affinity
   prefix_k: 8
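The two byte budgets above are what the cache tiers are carved out of: `hbm_bytes` bounds the GPU-resident L0 tier and the new `dram_bytes` bounds the CPU-resident L1 tier. A minimal sketch of how such budgets turn into LRU block capacities, assuming a per-block KV size like the `kv_block_bytes` field used in `src/cluster/cluster.rs` below; the helper name is hypothetical, not a function in this repo:

```rust
// Illustrative only: byte budgets -> block-count capacities for
// TwoTierCache::new(l0_cap, l1_cap).
fn tier_capacities(hbm_bytes: f64, dram_bytes: f64, kv_block_bytes: u64) -> (usize, usize) {
    let l0_cap = (hbm_bytes / kv_block_bytes as f64) as usize; // GPU HBM (L0)
    let l1_cap = (dram_bytes / kv_block_bytes as f64) as usize; // CPU DRAM (L1)
    (l0_cap, l1_cap)
}
```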
diff --git a/src/cluster/cluster.rs b/src/cluster/cluster.rs
index 233f947..d7e5cbd 100644
--- a/src/cluster/cluster.rs
+++ b/src/cluster/cluster.rs
@@ -4,6 +4,7 @@
 use crate::cluster::meta_store::MetaStore;
 use crate::config::{Config, ModelConfig};
 use crate::instance::instance::AdmittedRequest;
+use crate::instance::kv_cache::L1Change;
 use crate::instance::Instance;
 use crate::router::{self, RouteDecision, Router};
 use crate::trace::RequestRecord;
@@ -53,7 +54,9 @@ impl Cluster {
     /// per-request stats for metrics. Does NOT schedule the BatchTick — the
     /// simulator driver does that based on the returned `ready_at`.
     pub fn route_and_admit(&mut self, req: &RequestRecord, now: f64) -> AdmissionStats {
-        let decision = self.router.route(req, &self.instances, &self.meta_store, now);
+        let decision = self
+            .router
+            .route(req, &self.instances, &self.meta_store, now);
         let inst_id = decision.chosen;
         let probe_overhead_s = decision.probe_overhead_s;
 
@@ -68,19 +71,18 @@ impl Cluster {
         // 2. L1 lookup on the remaining suffix.
         let suffix_after_l0 = &req.hash_ids[l0_hits as usize..];
-        let l1_hits = inst.cache.l1.longest_prefix(suffix_after_l0) as u32;
+        let l1_hits = inst.cache.l1.longest_prefix_peek(suffix_after_l0) as u32;
 
         // L1->L0 transfer cost
         let l1_bytes = (l1_hits as u64) * self.kv_block_bytes;
         let mut t = effective_now;
+        let mut l1_changes = Vec::new();
         if l1_hits > 0 {
             t = inst.links.pcie.reserve(t, l1_bytes);
-            // Promote those blocks into L0
-            let mut evicted = Vec::new();
-            inst.cache.l0.insert_blocks(
-                &suffix_after_l0[..l1_hits as usize],
-                &mut evicted,
-            );
+            l1_changes = inst
+                .cache
+                .promote_l1_blocks_to_l0(&suffix_after_l0[..l1_hits as usize]);
         }
+        Self::apply_l1_changes(&mut self.meta_store, inst_id, now, &l1_changes);
 
         // 3. Remote v6d lookup for the still-remaining suffix.
         let suffix_after_l1 = &suffix_after_l0[l1_hits as usize..];
@@ -98,20 +100,14 @@ impl Cluster {
         }
         let remote_bytes = (remote_hit_blocks as u64) * self.kv_block_bytes;
         if remote_hit_blocks > 0 {
-            // RDMA from peer host -> local DRAM, then PCIe -> GPU
-            let inst = &mut self.instances[inst_id as usize];
-            t = inst.links.rdma.reserve(t, remote_bytes);
-            t = inst.links.pcie.reserve(t, remote_bytes);
-            // Insert into local L1 (occupies LRU space) AND into L0
             let pulled = &suffix_after_l1[..remote_hit_blocks as usize];
-            let mut evicted_l1 = Vec::new();
-            inst.cache.l1.insert_blocks(pulled, &mut evicted_l1);
-            let mut evicted_l0 = Vec::new();
-            inst.cache.l0.insert_blocks(pulled, &mut evicted_l0);
-            // The local instance now also owns these blocks - update meta_store.
-            for &h in pulled {
-                self.meta_store.insert(h, inst_id, now);
-            }
+            let l1_changes = {
+                let inst = &mut self.instances[inst_id as usize];
+                t = inst.links.rdma.reserve(t, remote_bytes);
+                t = inst.links.pcie.reserve(t, remote_bytes);
+                inst.cache.fetch_remote_blocks_to_l0(pulled)
+            };
+            Self::apply_l1_changes(&mut self.meta_store, inst_id, now, &l1_changes);
         }
 
         // 4. Miss = remaining tokens to prefill from scratch.
@@ -119,20 +115,14 @@ impl Cluster {
         let miss_tokens = miss_blocks * self.block_size_tokens;
 
         // The newly-prefilled blocks (after the request runs) are inserted
-        // into L0 here, and into L1 / meta_store via async writeback. Doing
-        // this at admission time is OK because we're tracking presence, not
-        // actually moving bytes — the writeback latency is hidden behind
-        // request execution and we don't model meta_store inconsistency
-        // window beyond the TTL itself.
-        let inst = &mut self.instances[inst_id as usize];
+        // into L0 here. They become remotely visible only later, when an L0
+        // eviction demotes them into L1 and publishes them to the meta store.
         let new_input_blocks = &req.hash_ids[(l0_hits + l1_hits + remote_hit_blocks) as usize..];
-        let mut evicted_l0 = Vec::new();
-        inst.cache.l0.insert_blocks(new_input_blocks, &mut evicted_l0);
-        let mut evicted_l1 = Vec::new();
-        inst.cache.l1.insert_blocks(new_input_blocks, &mut evicted_l1);
-        for &h in new_input_blocks {
-            self.meta_store.insert(h, inst_id, now);
-        }
+        let l1_changes = {
+            let inst = &mut self.instances[inst_id as usize];
+            inst.cache.insert_blocks_into_l0(new_input_blocks)
+        };
+        Self::apply_l1_changes(&mut self.meta_store, inst_id, now, &l1_changes);
 
         // 5. Reserve KV slots for this request's prefill residency.
         // PD disaggregation: decode runs elsewhere, so only the input
@@ -145,6 +135,7 @@ impl Cluster {
             prefill_tokens_remaining: miss_tokens,
             reserved_blocks,
         };
+        let inst = &mut self.instances[inst_id as usize];
         inst.admit(admitted);
 
         let pcie_bytes = l1_bytes + remote_bytes;
@@ -164,4 +155,18 @@ impl Cluster {
             decision,
         }
     }
+
+    fn apply_l1_changes(
+        meta_store: &mut MetaStore,
+        inst_id: InstanceId,
+        now: f64,
+        changes: &[L1Change],
+    ) {
+        for change in changes {
+            match *change {
+                L1Change::Added(hash) => meta_store.insert(hash, inst_id, now),
+                L1Change::Removed(hash) => meta_store.remove(hash, inst_id),
+            }
+        }
+    }
 }
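Every transfer on this path is priced through the same link-reservation primitive: `reserve(t, bytes)` queues the transfer behind whatever the link already carries and returns its completion time. That is why an L1 hit pays one PCIe reservation while a remote hit chains RDMA and then PCIe. A minimal sketch of that primitive, assuming a single fixed-bandwidth FIFO queue; the real `links.pcie` / `links.rdma` types in this repo may model more:

```rust
/// Toy stand-in for the simulator's link model; not the real type.
struct Link {
    free_at: f64,     // time the link finishes its last queued transfer
    bytes_per_s: f64, // fixed bandwidth
}

impl Link {
    /// Queue `bytes` at time `t`; return when the transfer completes.
    fn reserve(&mut self, t: f64, bytes: u64) -> f64 {
        let start = t.max(self.free_at);
        let done = start + bytes as f64 / self.bytes_per_s;
        self.free_at = done;
        done
    }
}
```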
diff --git a/src/cluster/meta_store.rs b/src/cluster/meta_store.rs
index 2fe3b94..04bf639 100644
--- a/src/cluster/meta_store.rs
+++ b/src/cluster/meta_store.rs
@@ -116,6 +116,21 @@ impl MetaStore {
         scores
     }
 
+    /// Remove `instance`'s entry for `block_hash` (e.g. after L1 eviction).
+    ///
+    /// The meta-store must reflect **L1 (DRAM) presence only**, because remote
+    /// RDMA fetch can only reach CPU DRAM, never GPU HBM. Whenever the L1
+    /// tier evicts a block, the caller must invoke this so the meta-store
+    /// stops advertising the block as remotely available on this instance.
+    pub fn remove(&mut self, block_hash: u64, instance: InstanceId) {
+        if let Some(bucket) = self.map.get_mut(&block_hash) {
+            bucket.retain(|e| e.instance != instance);
+            if bucket.is_empty() {
+                self.map.remove(&block_hash);
+            }
+        }
+    }
+
     /// Lookup which (alive) instances claim to hold a given block.
     pub fn instances_for(&self, hash: u64, now: f64) -> SmallVec<[InstanceId; 4]> {
         let mut out = SmallVec::new();
@@ -149,6 +164,34 @@ mod tests {
         assert_eq!(s[2], 0);
     }
 
+    #[test]
+    fn remove_cleans_up() {
+        let mut m = MetaStore::new(60.0);
+        m.insert(10, 0, 0.0);
+        m.insert(10, 1, 0.0);
+        m.insert(11, 0, 0.0);
+
+        // instance 0 has both blocks, instance 1 has block 10 only
+        let owners = m.instances_for(10, 0.5);
+        assert_eq!(owners.len(), 2);
+
+        // Remove instance 0's entry for block 10
+        m.remove(10, 0);
+        let owners = m.instances_for(10, 0.5);
+        assert_eq!(owners.len(), 1);
+        assert_eq!(owners[0], 1);
+
+        // Instance 0 still owns block 11
+        let owners = m.instances_for(11, 0.5);
+        assert_eq!(owners.len(), 1);
+        assert_eq!(owners[0], 0);
+
+        // Remove last owner of a block -> entry fully cleaned
+        m.remove(10, 1);
+        let owners = m.instances_for(10, 0.5);
+        assert!(owners.is_empty());
+    }
+
     #[test]
     fn ttl_expiry() {
         let mut m = MetaStore::new(1.0);
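`remove()` complements the TTL rather than replacing it: the TTL (now 300 s in the config) is the lazy backstop that ages out entries from instances that died or lost an update, while `remove()` retracts an entry the moment the L1 tier actually drops the block. A small illustration in the style of the tests above, using only the `MetaStore` API shown in this file (timestamps are made up):

```rust
#[test]
fn eager_remove_beats_ttl() {
    // TTL of 300 s, mirroring the config change; all times in seconds.
    let mut m = MetaStore::new(300.0);
    m.insert(42, 0, 0.0); // instance 0 advertises block 42 at t = 0

    // Still advertised at t = 100, well inside the TTL window.
    assert_eq!(m.instances_for(42, 100.0).len(), 1);

    // Instance 0's L1 evicts the block at t = 100: retract immediately
    // instead of advertising a stale owner for another 200 s.
    m.remove(42, 0);
    assert!(m.instances_for(42, 100.0).is_empty());
}
```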
diff --git a/src/instance/kv_cache.rs b/src/instance/kv_cache.rs
index 9daaa56..68e4fa4 100644
--- a/src/instance/kv_cache.rs
+++ b/src/instance/kv_cache.rs
@@ -10,6 +10,12 @@
 use ahash::AHashMap;
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum L1Change {
+    Added(u64),
+    Removed(u64),
+}
+
 /// Doubly-linked-list-backed LRU keyed by block hash.
 #[derive(Debug)]
 pub struct LruBlocks {
     map: AHashMap<u64, usize>,
@@ -56,6 +62,16 @@ impl LruBlocks {
         self.map.contains_key(&key)
     }
 
+    pub fn remove(&mut self, key: u64) -> bool {
+        if let Some(idx) = self.map.remove(&key) {
+            self.detach(idx);
+            self.free.push(idx);
+            true
+        } else {
+            false
+        }
+    }
+
     /// Touch (move to MRU) if present. Returns whether the key was present.
     pub fn touch(&mut self, key: u64) -> bool {
         if let Some(&idx) = self.map.get(&key) {
@@ -70,33 +86,61 @@ impl LruBlocks {
     /// existing block just touches it.
     pub fn insert_blocks(&mut self, hashes: &[u64], evicted_out: &mut Vec<u64>) {
         for &h in hashes {
-            if self.touch(h) {
-                continue;
+            if let Some(evicted) = self.insert_block(h) {
+                evicted_out.push(evicted);
             }
-            // need to make room?
-            if self.map.len() == self.capacity {
-                if let Some(tail_idx) = self.tail {
-                    let tail_key = self.nodes[tail_idx].key;
-                    self.detach(tail_idx);
-                    self.map.remove(&tail_key);
-                    self.free.push(tail_idx);
-                    evicted_out.push(tail_key);
-                }
-            }
-            // allocate node
-            let idx = if let Some(i) = self.free.pop() {
-                self.nodes[i] = Node { key: h, prev: None, next: None };
-                i
-            } else {
-                let i = self.nodes.len();
-                self.nodes.push(Node { key: h, prev: None, next: None });
-                i
-            };
-            self.map.insert(h, idx);
-            self.attach_to_head(idx);
         }
     }
 
+    pub fn insert_block(&mut self, key: u64) -> Option<u64> {
+        if self.touch(key) {
+            return None;
+        }
+        let mut evicted = None;
+        if self.map.len() == self.capacity {
+            if let Some(tail_idx) = self.tail {
+                let tail_key = self.nodes[tail_idx].key;
+                self.detach(tail_idx);
+                self.map.remove(&tail_key);
+                self.free.push(tail_idx);
+                evicted = Some(tail_key);
+            }
+        }
+        let idx = if let Some(i) = self.free.pop() {
+            self.nodes[i] = Node {
+                key,
+                prev: None,
+                next: None,
+            };
+            i
+        } else {
+            let i = self.nodes.len();
+            self.nodes.push(Node {
+                key,
+                prev: None,
+                next: None,
+            });
+            i
+        };
+        self.map.insert(key, idx);
+        self.attach_to_head(idx);
+        evicted
+    }
+
+    /// Like `longest_prefix`, but read-only: counts the leading run of
+    /// present blocks without touching LRU order. Needed by the admission
+    /// path, which promotes the matched L1 blocks out of this tier anyway.
+    pub fn longest_prefix_peek(&self, hashes: &[u64]) -> usize {
+        let mut n = 0usize;
+        for &h in hashes {
+            if !self.map.contains_key(&h) {
+                break;
+            }
+            n += 1;
+        }
+        n
+    }
+
     /// Longest leading prefix of `hashes` present; touches the matched blocks.
     pub fn longest_prefix(&mut self, hashes: &[u64]) -> usize {
         let mut n = 0usize;
@@ -178,6 +222,68 @@ impl TwoTierCache {
             l1: LruBlocks::new(l1_cap),
         }
     }
+
+    pub fn insert_blocks_into_l0(&mut self, hashes: &[u64]) -> Vec<L1Change> {
+        let mut changes = Vec::new();
+        for &h in hashes {
+            self.insert_block_into_l0(h, &mut changes);
+        }
+        changes
+    }
+
+    pub fn promote_l1_blocks_to_l0(&mut self, hashes: &[u64]) -> Vec<L1Change> {
+        let mut changes = Vec::new();
+        for &h in hashes {
+            if self.l1.remove(h) {
+                changes.push(L1Change::Removed(h));
+            }
+            self.insert_block_into_l0(h, &mut changes);
+        }
+        changes
+    }
+
+    pub fn fetch_remote_blocks_to_l0(&mut self, hashes: &[u64]) -> Vec<L1Change> {
+        let mut changes = Vec::new();
+        for &h in hashes {
+            self.stage_remote_block_in_l1(h, &mut changes);
+            let removed = self.l1.remove(h);
+            debug_assert!(removed, "staged remote block must be present in l1");
+            self.insert_block_into_l0(h, &mut changes);
+        }
+        changes
+    }
+
+    fn insert_block_into_l0(&mut self, hash: u64, changes: &mut Vec<L1Change>) {
+        if self.l0.touch(hash) {
+            return;
+        }
+        if self.l1.remove(hash) {
+            changes.push(L1Change::Removed(hash));
+        }
+        if let Some(evicted_l0) = self.l0.insert_block(hash) {
+            self.demote_into_l1(evicted_l0, changes);
+        }
+    }
+
+    fn stage_remote_block_in_l1(&mut self, hash: u64, changes: &mut Vec<L1Change>) {
+        if self.l0.contains(hash) || self.l1.contains(hash) {
+            return;
+        }
+        if let Some(evicted_l1) = self.l1.insert_block(hash) {
+            changes.push(L1Change::Removed(evicted_l1));
+        }
+    }
+
+    fn demote_into_l1(&mut self, hash: u64, changes: &mut Vec<L1Change>) {
+        debug_assert!(!self.l0.contains(hash));
+        if self.l1.touch(hash) {
+            return;
+        }
+        if let Some(evicted_l1) = self.l1.insert_block(hash) {
+            changes.push(L1Change::Removed(evicted_l1));
+        }
+        changes.push(L1Change::Added(hash));
+    }
 }
 
 #[cfg(test)]
@@ -223,4 +329,61 @@ mod tests {
         c.insert_blocks(&[4], &mut ev);
         assert_eq!(ev, vec![2]);
     }
+
+    #[test]
+    fn two_tier_cache_demotes_l0_evictions_into_l1() {
+        let mut c = TwoTierCache::new(2, 2);
+
+        assert!(c.insert_blocks_into_l0(&[1, 2]).is_empty());
+        let changes = c.insert_blocks_into_l0(&[3]);
+
+        assert!(c.l0.contains(2));
+        assert!(c.l0.contains(3));
+        assert!(!c.l0.contains(1));
+        assert!(c.l1.contains(1));
+        assert_eq!(changes, vec![L1Change::Added(1)]);
+    }
+
+    #[test]
+    fn promoting_l1_blocks_to_l0_keeps_tiers_exclusive() {
+        let mut c = TwoTierCache::new(2, 2);
+        c.insert_blocks_into_l0(&[1, 2, 3]);
+
+        let changes = c.promote_l1_blocks_to_l0(&[1]);
+
+        assert!(c.l0.contains(1));
+        assert!(c.l0.contains(3));
+        assert!(!c.l0.contains(2));
+        assert!(!c.l1.contains(1));
+        assert!(c.l1.contains(2));
+        assert_eq!(changes, vec![L1Change::Removed(1), L1Change::Added(2)]);
+    }
+
+    #[test]
+    fn reinserting_block_into_l0_removes_duplicate_from_l1() {
+        let mut c = TwoTierCache::new(2, 2);
+        c.insert_blocks_into_l0(&[1, 2, 3]);
+
+        let changes = c.insert_blocks_into_l0(&[1]);
+
+        assert!(c.l0.contains(1));
+        assert!(c.l0.contains(3));
+        assert!(!c.l1.contains(1));
+        assert!(c.l1.contains(2));
+        assert_eq!(changes, vec![L1Change::Removed(1), L1Change::Added(2)]);
+    }
+
+    #[test]
+    fn remote_fetch_uses_l1_capacity_before_promoting_to_l0() {
+        let mut c = TwoTierCache::new(2, 1);
+        c.insert_blocks_into_l0(&[1, 2, 3]);
+
+        let changes = c.fetch_remote_blocks_to_l0(&[4]);
+
+        assert!(c.l0.contains(3));
+        assert!(c.l0.contains(4));
+        assert!(!c.l1.contains(1));
+        assert!(c.l1.contains(2));
+        assert_eq!(changes, vec![L1Change::Removed(1), L1Change::Added(2)]);
+    }
 }
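Taken together, the `TwoTierCache` entry points maintain one invariant: a block lives in at most one tier, and every change to L1 membership surfaces as an `L1Change` so the caller can keep the meta store honest. A condensed walkthrough of that invariant, mirroring the scenario the first two tests above exercise (toy capacities):

```rust
#[test]
fn two_tier_walkthrough() {
    let mut cache = TwoTierCache::new(2, 2); // l0_cap = 2, l1_cap = 2

    // Fresh prefill fills L0; the third insert evicts block 1, which is
    // demoted into L1 and therefore advertised to the meta store.
    let changes = cache.insert_blocks_into_l0(&[1, 2, 3]);
    assert_eq!(changes, vec![L1Change::Added(1)]);

    // Promoting block 1 back drains it from L1 (retract the advert) and
    // demotes L0's LRU victim, block 2, into L1 (publish a new advert).
    let changes = cache.promote_l1_blocks_to_l0(&[1]);
    assert_eq!(changes, vec![L1Change::Removed(1), L1Change::Added(2)]);
}
```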