//! Global redis-like KV-cache index. //! //! Maps `block_hash -> SmallVec<(instance_id, expires_at)>`. TTL eviction is //! lazy (on read). The TTL-aware router uses `score_prefix` to score each //! instance's predicted longest prefix without probing instances directly. use ahash::AHashMap; use smallvec::SmallVec; use crate::types::InstanceId; #[derive(Debug, Clone, Copy)] struct Entry { instance: InstanceId, expires_at: f64, } #[derive(Debug, Default)] pub struct MetaStore { ttl_seconds: f64, map: AHashMap>, } impl MetaStore { pub fn new(ttl_seconds: f64) -> Self { Self { ttl_seconds, map: AHashMap::with_capacity(1 << 16), } } pub fn ttl(&self) -> f64 { self.ttl_seconds } /// Record that `instance` now holds `block_hash`. pub fn insert(&mut self, block_hash: u64, instance: InstanceId, now: f64) { let entry = Entry { instance, expires_at: now + self.ttl_seconds, }; let bucket = self.map.entry(block_hash).or_default(); // refresh existing entry if present for e in bucket.iter_mut() { if e.instance == instance { e.expires_at = entry.expires_at; return; } } bucket.push(entry); } /// Score each candidate instance by the longest leading prefix of /// `hash_ids` for which the meta store believes that instance still holds /// every block. Returns scores indexed by instance id. pub fn score_prefix(&self, hash_ids: &[u64], now: f64, num_instances: usize) -> Vec { if hash_ids.is_empty() { return vec![0; num_instances]; } // Walk hashes; at each step intersect the still-eligible instance set. // Use a small bitset since num_instances is typically <= 1024. let mut alive: Vec = vec![false; num_instances]; // First block: seed alive set let first = hash_ids[0]; let mut any = false; if let Some(bucket) = self.map.get(&first) { for e in bucket { if e.expires_at >= now { let i = e.instance as usize; if i < num_instances { alive[i] = true; any = true; } } } } let mut scores = vec![0u32; num_instances]; if !any { return scores; } for i in 0..num_instances { if alive[i] { scores[i] = 1; } } // Subsequent blocks: an instance survives only if the meta store still // lists it for that block (and not expired). for (depth, &h) in hash_ids.iter().enumerate().skip(1) { let bucket = match self.map.get(&h) { Some(b) => b, None => break, }; // mark instances present for this block let mut present = vec![false; num_instances]; let mut any2 = false; for e in bucket { if e.expires_at >= now { let i = e.instance as usize; if i < num_instances && alive[i] { present[i] = true; any2 = true; } } } if !any2 { break; } for i in 0..num_instances { if present[i] { scores[i] = (depth + 1) as u32; } else { alive[i] = false; } } } scores } /// Remove `instance`'s entry for `block_hash` (e.g. after L1 eviction). /// /// The meta-store must reflect **L1 (DRAM) presence only**, because remote /// RDMA fetch can only reach CPU DRAM, never GPU HBM. Whenever the L1 /// tier evicts a block, the caller must invoke this so the meta-store /// stops advertising the block as remotely available on this instance. pub fn remove(&mut self, block_hash: u64, instance: InstanceId) { if let Some(bucket) = self.map.get_mut(&block_hash) { bucket.retain(|e| e.instance != instance); if bucket.is_empty() { self.map.remove(&block_hash); } } } /// Lookup which (alive) instances claim to hold a given block. pub fn instances_for(&self, hash: u64, now: f64) -> SmallVec<[InstanceId; 4]> { let mut out = SmallVec::new(); if let Some(bucket) = self.map.get(&hash) { for e in bucket { if e.expires_at >= now { out.push(e.instance); } } } out } } #[cfg(test)] mod tests { use super::*; #[test] fn score_prefix_basic() { let mut m = MetaStore::new(60.0); m.insert(10, 0, 0.0); m.insert(11, 0, 0.0); m.insert(12, 0, 0.0); m.insert(10, 1, 0.0); m.insert(11, 1, 0.0); // instance 1 only has 10,11; instance 0 has 10,11,12 let s = m.score_prefix(&[10, 11, 12, 13], 1.0, 4); assert_eq!(s[0], 3); assert_eq!(s[1], 2); assert_eq!(s[2], 0); } #[test] fn remove_cleans_up() { let mut m = MetaStore::new(60.0); m.insert(10, 0, 0.0); m.insert(10, 1, 0.0); m.insert(11, 0, 0.0); // instance 0 has both blocks, instance 1 has block 10 only let owners = m.instances_for(10, 0.5); assert_eq!(owners.len(), 2); // Remove instance 0's entry for block 10 m.remove(10, 0); let owners = m.instances_for(10, 0.5); assert_eq!(owners.len(), 1); assert_eq!(owners[0], 1); // Instance 0 still owns block 11 let owners = m.instances_for(11, 0.5); assert_eq!(owners.len(), 1); assert_eq!(owners[0], 0); // Remove last owner of a block -> entry fully cleaned m.remove(10, 1); let owners = m.instances_for(10, 0.5); assert!(owners.is_empty()); } #[test] fn ttl_expiry() { let mut m = MetaStore::new(1.0); m.insert(10, 0, 0.0); let s_now = m.score_prefix(&[10], 0.5, 2); assert_eq!(s_now[0], 1); let s_later = m.score_prefix(&[10], 5.0, 2); assert_eq!(s_later[0], 0); } }