fix: kvcache evict workflow

This commit is contained in:
2026-04-14 15:46:36 +08:00
parent 663ca9c5b9
commit eaf574cd4e
4 changed files with 257 additions and 59 deletions

View File

@@ -4,6 +4,7 @@
use crate::cluster::meta_store::MetaStore;
use crate::config::{Config, ModelConfig};
use crate::instance::instance::AdmittedRequest;
use crate::instance::kv_cache::L1Change;
use crate::instance::Instance;
use crate::router::{self, RouteDecision, Router};
use crate::trace::RequestRecord;
@@ -53,7 +54,9 @@ impl Cluster {
/// per-request stats for metrics. Does NOT schedule the BatchTick — the
/// simulator driver does that based on the returned `ready_at`.
pub fn route_and_admit(&mut self, req: &RequestRecord, now: f64) -> AdmissionStats {
let decision = self.router.route(req, &self.instances, &self.meta_store, now);
let decision = self
.router
.route(req, &self.instances, &self.meta_store, now);
let inst_id = decision.chosen;
let probe_overhead_s = decision.probe_overhead_s;
@@ -68,19 +71,18 @@ impl Cluster {
// 2. L1 lookup on the remaining suffix.
let suffix_after_l0 = &req.hash_ids[l0_hits as usize..];
let l1_hits = inst.cache.l1.longest_prefix(suffix_after_l0) as u32;
let l1_hits = inst.cache.l1.longest_prefix_peek(suffix_after_l0) as u32;
// L1->L0 transfer cost
let l1_bytes = (l1_hits as u64) * self.kv_block_bytes;
let mut t = effective_now;
let mut l1_changes = Vec::new();
if l1_hits > 0 {
t = inst.links.pcie.reserve(t, l1_bytes);
// Promote those blocks into L0
let mut evicted = Vec::new();
inst.cache.l0.insert_blocks(
&suffix_after_l0[..l1_hits as usize],
&mut evicted,
);
l1_changes = inst
.cache
.promote_l1_blocks_to_l0(&suffix_after_l0[..l1_hits as usize]);
}
Self::apply_l1_changes(&mut self.meta_store, inst_id, now, &l1_changes);
// 3. Remote v6d lookup for the still-remaining suffix.
let suffix_after_l1 = &suffix_after_l0[l1_hits as usize..];
@@ -98,20 +100,14 @@ impl Cluster {
}
let remote_bytes = (remote_hit_blocks as u64) * self.kv_block_bytes;
if remote_hit_blocks > 0 {
// RDMA from peer host -> local DRAM, then PCIe -> GPU
let inst = &mut self.instances[inst_id as usize];
t = inst.links.rdma.reserve(t, remote_bytes);
t = inst.links.pcie.reserve(t, remote_bytes);
// Insert into local L1 (occupies LRU space) AND into L0
let pulled = &suffix_after_l1[..remote_hit_blocks as usize];
let mut evicted_l1 = Vec::new();
inst.cache.l1.insert_blocks(pulled, &mut evicted_l1);
let mut evicted_l0 = Vec::new();
inst.cache.l0.insert_blocks(pulled, &mut evicted_l0);
// The local instance now also owns these blocks - update meta_store.
for &h in pulled {
self.meta_store.insert(h, inst_id, now);
}
let l1_changes = {
let inst = &mut self.instances[inst_id as usize];
t = inst.links.rdma.reserve(t, remote_bytes);
t = inst.links.pcie.reserve(t, remote_bytes);
inst.cache.fetch_remote_blocks_to_l0(pulled)
};
Self::apply_l1_changes(&mut self.meta_store, inst_id, now, &l1_changes);
}
// 4. Miss = remaining tokens to prefill from scratch.
@@ -119,20 +115,14 @@ impl Cluster {
let miss_tokens = miss_blocks * self.block_size_tokens;
// The newly-prefilled blocks (after the request runs) are inserted
// into L0 here, and into L1 / meta_store via async writeback. Doing
// this at admission time is OK because we're tracking presence, not
// actually moving bytes — the writeback latency is hidden behind
// request execution and we don't model meta_store inconsistency
// window beyond the TTL itself.
let inst = &mut self.instances[inst_id as usize];
// into L0 here. Only later L0 evictions become remotely visible by
// landing in L1 and being published to the meta store.
let new_input_blocks = &req.hash_ids[(l0_hits + l1_hits + remote_hit_blocks) as usize..];
let mut evicted_l0 = Vec::new();
inst.cache.l0.insert_blocks(new_input_blocks, &mut evicted_l0);
let mut evicted_l1 = Vec::new();
inst.cache.l1.insert_blocks(new_input_blocks, &mut evicted_l1);
for &h in new_input_blocks {
self.meta_store.insert(h, inst_id, now);
}
let l1_changes = {
let inst = &mut self.instances[inst_id as usize];
inst.cache.insert_blocks_into_l0(new_input_blocks)
};
Self::apply_l1_changes(&mut self.meta_store, inst_id, now, &l1_changes);
// 5. Reserve KV slots for this request's prefill residency.
// PD disaggregation: decode runs elsewhere, so only the input
@@ -145,6 +135,7 @@ impl Cluster {
prefill_tokens_remaining: miss_tokens,
reserved_blocks,
};
let inst = &mut self.instances[inst_id as usize];
inst.admit(admitted);
let pcie_bytes = l1_bytes + remote_bytes;
@@ -164,4 +155,18 @@ impl Cluster {
decision,
}
}
fn apply_l1_changes(
meta_store: &mut MetaStore,
inst_id: InstanceId,
now: f64,
changes: &[L1Change],
) {
for change in changes {
match *change {
L1Change::Added(hash) => meta_store.insert(hash, inst_id, now),
L1Change::Removed(hash) => meta_store.remove(hash, inst_id),
}
}
}
}

View File

@@ -116,6 +116,21 @@ impl MetaStore {
scores
}
/// Remove `instance`'s entry for `block_hash` (e.g. after L1 eviction).
///
/// The meta-store must reflect **L1 (DRAM) presence only**, because remote
/// RDMA fetch can only reach CPU DRAM, never GPU HBM. Whenever the L1
/// tier evicts a block, the caller must invoke this so the meta-store
/// stops advertising the block as remotely available on this instance.
pub fn remove(&mut self, block_hash: u64, instance: InstanceId) {
if let Some(bucket) = self.map.get_mut(&block_hash) {
bucket.retain(|e| e.instance != instance);
if bucket.is_empty() {
self.map.remove(&block_hash);
}
}
}
/// Lookup which (alive) instances claim to hold a given block.
pub fn instances_for(&self, hash: u64, now: f64) -> SmallVec<[InstanceId; 4]> {
let mut out = SmallVec::new();
@@ -149,6 +164,34 @@ mod tests {
assert_eq!(s[2], 0);
}
#[test]
fn remove_cleans_up() {
let mut m = MetaStore::new(60.0);
m.insert(10, 0, 0.0);
m.insert(10, 1, 0.0);
m.insert(11, 0, 0.0);
// instance 0 has both blocks, instance 1 has block 10 only
let owners = m.instances_for(10, 0.5);
assert_eq!(owners.len(), 2);
// Remove instance 0's entry for block 10
m.remove(10, 0);
let owners = m.instances_for(10, 0.5);
assert_eq!(owners.len(), 1);
assert_eq!(owners[0], 1);
// Instance 0 still owns block 11
let owners = m.instances_for(11, 0.5);
assert_eq!(owners.len(), 1);
assert_eq!(owners[0], 0);
// Remove last owner of a block -> entry fully cleaned
m.remove(10, 1);
let owners = m.instances_for(10, 0.5);
assert!(owners.is_empty());
}
#[test]
fn ttl_expiry() {
let mut m = MetaStore::new(1.0);