//! Cluster: routes arrivals, performs the L0 / L1 / remote-RDMA fetch chain
//! described in the design diagram, and maintains the global meta store.

use crate::cluster::meta_store::MetaStore;
use crate::config::{Config, ModelConfig};
use crate::instance::instance::AdmittedRequest;
use crate::instance::Instance;
use crate::router::{self, RouteDecision, Router};
use crate::trace::RequestRecord;
use crate::types::InstanceId;

#[derive(Debug, Clone)]
pub struct AdmissionStats {
    pub instance: InstanceId,
    pub l0_hit_blocks: u32,
    pub l1_hit_blocks: u32,
    pub remote_hit_blocks: u32,
    pub miss_blocks: u32,
    pub rdma_bytes: u64,
    pub pcie_bytes: u64,
    pub fetch_time_s: f64,
    pub probe_overhead_s: f64,
    pub ready_at: f64,
    pub decision: RouteDecision,
}

pub struct Cluster {
    pub instances: Vec<Instance>,
    pub meta_store: MetaStore,
    pub router: Box<dyn Router>,
    pub block_size_tokens: u32,
    pub kv_block_bytes: u64,
}

impl Cluster {
    pub fn new(config: &Config, model: &ModelConfig) -> Self {
        let mut instances = Vec::with_capacity(config.cluster.num_instances as usize);
        for id in 0..config.cluster.num_instances {
            instances.push(Instance::new(id as InstanceId, model, &config.hardware));
        }
        let meta_store = MetaStore::new(config.cluster.meta_store.ttl_seconds);
        let router = router::build(config, config.sim.seed);
        Self {
            instances,
            meta_store,
            router,
            block_size_tokens: model.block_size_tokens,
            kv_block_bytes: model.kv_block_bytes(),
        }
    }

    /// Route + admit a request. Returns the chosen instance plus rich
    /// per-request stats for metrics. Does NOT schedule the BatchTick; the
    /// simulator driver does that based on the returned `ready_at`.
    pub fn route_and_admit(&mut self, req: &RequestRecord, now: f64) -> AdmissionStats {
        let decision = self.router.route(req, &self.instances, &self.meta_store, now);
        let inst_id = decision.chosen;
        let probe_overhead_s = decision.probe_overhead_s;
        // The router probe overhead delays the request's effective start time.
        let effective_now = now + probe_overhead_s;

        let inst = &mut self.instances[inst_id as usize];
        let total_blocks = req.hash_ids.len() as u32;

        // 1. L0 lookup (touches matched blocks).
        let l0_hits = inst.cache.l0.longest_prefix(&req.hash_ids) as u32;

        // 2. L1 lookup on the remaining suffix.
        let suffix_after_l0 = &req.hash_ids[l0_hits as usize..];
        let l1_hits = inst.cache.l1.longest_prefix(suffix_after_l0) as u32;

        // L1 -> L0 transfer cost.
        let l1_bytes = (l1_hits as u64) * self.kv_block_bytes;
        let mut t = effective_now;
        if l1_hits > 0 {
            t = inst.links.pcie.reserve(t, l1_bytes);
            // Promote those blocks into L0.
            let mut evicted = Vec::new();
            inst.cache.l0.insert_blocks(&suffix_after_l0[..l1_hits as usize], &mut evicted);
        }

        // 3. Remote v6d lookup for the still-remaining suffix.
        let suffix_after_l1 = &suffix_after_l0[l1_hits as usize..];
        let mut remote_hit_blocks: u32 = 0;
        for &h in suffix_after_l1 {
            // A block is remotely available iff some instance other than
            // `inst_id` lists it (and its meta_store entry has not expired).
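            // Note: this loop only counts contiguous remote hits. The actual
            // RDMA pull, the local cache inserts, and the meta_store
            // ownership update are deferred to the `remote_hit_blocks > 0`
            // block below, once the hit count is known.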
            let owners = self.meta_store.instances_for(h, now);
            let any_remote = owners.iter().any(|o| *o != inst_id);
            if any_remote {
                remote_hit_blocks += 1;
            } else {
                break; // Contiguous prefix: stop on the first miss.
            }
        }

        let remote_bytes = (remote_hit_blocks as u64) * self.kv_block_bytes;
        if remote_hit_blocks > 0 {
            // RDMA from peer host -> local DRAM, then PCIe -> GPU.
            let inst = &mut self.instances[inst_id as usize];
            t = inst.links.rdma.reserve(t, remote_bytes);
            t = inst.links.pcie.reserve(t, remote_bytes);

            // Insert into local L1 (occupies LRU space) AND into L0.
            let pulled = &suffix_after_l1[..remote_hit_blocks as usize];
            let mut evicted_l1 = Vec::new();
            inst.cache.l1.insert_blocks(pulled, &mut evicted_l1);
            let mut evicted_l0 = Vec::new();
            inst.cache.l0.insert_blocks(pulled, &mut evicted_l0);

            // The local instance now also owns these blocks; update meta_store.
            for &h in pulled {
                self.meta_store.insert(h, inst_id, now);
            }
        }

        // 4. Miss = remaining tokens to prefill from scratch.
        let miss_blocks = total_blocks - l0_hits - l1_hits - remote_hit_blocks;
        let miss_tokens = miss_blocks * self.block_size_tokens;

        // The newly-prefilled blocks (after the request runs) are inserted
        // into L0 here, and into L1 / meta_store via async writeback. Doing
        // this at admission time is OK because we're tracking presence, not
        // actually moving bytes: the writeback latency is hidden behind
        // request execution, and we don't model any meta_store inconsistency
        // window beyond the TTL itself.
        let inst = &mut self.instances[inst_id as usize];
        let new_input_blocks = &req.hash_ids[(l0_hits + l1_hits + remote_hit_blocks) as usize..];
        let mut evicted_l0 = Vec::new();
        inst.cache.l0.insert_blocks(new_input_blocks, &mut evicted_l0);
        let mut evicted_l1 = Vec::new();
        inst.cache.l1.insert_blocks(new_input_blocks, &mut evicted_l1);
        for &h in new_input_blocks {
            self.meta_store.insert(h, inst_id, now);
        }

        // 5. Reserve KV slots for this request's prefill residency.
        //    PD disaggregation: decode runs elsewhere, so only the input
        //    blocks occupy HBM on this instance.
        let reserved_blocks = total_blocks;
        let admitted = AdmittedRequest {
            req_id: req.req_id,
            arrival: req.arrival,
            ready_at: t,
            prefill_tokens_remaining: miss_tokens,
            reserved_blocks,
        };
        inst.admit(admitted);

        let pcie_bytes = l1_bytes + remote_bytes;
        let fetch_time_s = (t - effective_now).max(0.0);

        AdmissionStats {
            instance: inst_id,
            l0_hit_blocks: l0_hits,
            l1_hit_blocks: l1_hits,
            remote_hit_blocks,
            miss_blocks,
            rdma_bytes: remote_bytes,
            pcie_bytes,
            fetch_time_s,
            probe_overhead_s,
            ready_at: t,
            decision,
        }
    }
}
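
// ---------------------------------------------------------------------------
// Usage sketch: how a simulator driver is expected to consume
// `route_and_admit`, per the doc comment above (the cluster does NOT schedule
// the BatchTick itself). This is a minimal, hedged illustration, not part of
// the simulator: `schedule_batch_tick` is a hypothetical driver hook, taken
// here as a closure so the sketch stays self-contained, and we assume
// `RequestRecord::arrival` is the f64 arrival timestamp (as its use next to
// the f64 `ready_at` in `AdmittedRequest` suggests).
#[allow(dead_code)]
fn drive_arrivals(
    cluster: &mut Cluster,
    trace: &[RequestRecord],
    mut schedule_batch_tick: impl FnMut(InstanceId, f64),
) {
    for req in trace {
        // Admission charges probe overhead plus the L0/L1/remote fetch chain;
        // `ready_at` is when the request's KV blocks are resident on the
        // chosen instance and prefill may begin.
        let stats = cluster.route_and_admit(req, req.arrival);
        debug_assert!(stats.ready_at >= req.arrival);
        schedule_batch_tick(stats.instance, stats.ready_at);
    }
}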