feat: wire bucket identities through driver outputs

2026-04-17 17:52:49 +08:00
parent 3a84c15068
commit b5a6fb964c
9 changed files with 148 additions and 43 deletions
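In short: the discrete-event engine's BatchTick event now carries a bucket id next to the instance id, and the per-request and time-series output rows gain matching bucket columns (plus a per-request length_bucket_match flag). A minimal sketch of the resulting shapes, condensed from the diffs below (the type aliases are stand-ins for the crate's real BucketId/InstanceId/ReqId definitions in crate::router and crate::types; ReqId's width here is an assumption):

// Sketch only: these aliases stand in for the crate's real BucketId, InstanceId,
// and ReqId definitions (ReqId's width here is an assumption).
type BucketId = u32;
type InstanceId = u32;
type ReqId = u64;

#[derive(Debug)]
enum Event {
    /// New trace request arrives at the cluster router.
    Arrival { req_id: ReqId },
    /// Per-instance scheduler tick, now addressed by (bucket, instance).
    BatchTick { bucket: BucketId, instance: InstanceId },
    /// Periodic time-series sample of all instances.
    Sample,
    /// Stop the simulation early.
    Stop,
}

// Output rows carry the bucket identity so downstream analysis can group by it
// (Serialize derive and the remaining fields are omitted in this sketch).
struct TimeseriesRow {
    t: f64,
    bucket: u32,
    instance: u32,
    queue_len: u32,
    kv_blocks_used: u32,
}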

View File

@@ -1288,7 +1288,10 @@ sim:
 );
 let cfg = Config::from_yaml_path(&path).unwrap();
-assert_eq!(cfg.cluster.global_router.mode, GlobalRouterMode::BucketScore);
+assert_eq!(
+cfg.cluster.global_router.mode,
+GlobalRouterMode::BucketScore
+);
 assert!((cfg.cluster.global_router.length_penalty_weight - 1.5).abs() < 1e-12);
 assert!((cfg.cluster.global_router.load_weight - 0.75).abs() < 1e-12);
 assert!((cfg.cluster.global_router.cache_weight - 2.25).abs() < 1e-12);

View File

@@ -6,7 +6,7 @@ use std::collections::{HashMap, VecDeque};
 use std::path::Path;
 use std::sync::{Arc, Mutex};
-use crate::cluster::Cluster;
+use crate::cluster::BucketedService;
 use crate::config::{Config, RouterMode};
 use crate::metrics::ablation::AblationRow;
 use crate::metrics::per_request::{PerRequestRow, PerRequestWriter};
@@ -37,7 +37,9 @@ pub struct RunOutputs {
 #[derive(Debug, Clone)]
 struct InflightInfo {
 arrival: f64,
+bucket: u32,
 instance: u32,
+length_bucket_match: bool,
 total_blocks: u32,
 l0_hit_blocks: u32,
 l1_hit_blocks: u32,
@@ -49,10 +51,7 @@ struct InflightInfo {
 }
 pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
-config
-.cluster
-.require_legacy_single_pool("driver run")?;
-let mut cluster = Cluster::new(config, &config.model)?;
+let mut service = BucketedService::new(config, &config.model);
 let mut q = EventQueue::new();
 // Output directory
@@ -111,13 +110,16 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
 Some(r) => r.clone(),
 None => continue,
 };
-let stats = cluster.route_and_admit(&req, now);
+let stats = service.route_and_admit(&req, now)?;
 rt_writer.write(&stats.decision)?;
+let strict_bucket = config.cluster.bucket_index_for_input_len(req.input_len)?;
 inflight.insert(
 req_id,
 InflightInfo {
 arrival: req.arrival,
+bucket: stats.bucket,
 instance: stats.instance,
+length_bucket_match: stats.bucket as usize == strict_bucket,
 total_blocks: req.hash_ids.len() as u32,
 l0_hit_blocks: stats.l0_hit_blocks,
 l1_hit_blocks: stats.l1_hit_blocks,
@@ -128,20 +130,23 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
 probe_overhead_s: stats.probe_overhead_s,
 },
 );
-let inst = &mut cluster.instances[stats.instance as usize];
+let inst = &mut service.buckets[stats.bucket as usize].cluster.instances
+[stats.instance as usize];
 if !inst.tick_scheduled {
 inst.tick_scheduled = true;
 let when = stats.ready_at.max(now);
 q.schedule(
 when,
 Event::BatchTick {
+bucket: stats.bucket,
 instance: stats.instance,
 },
 );
 }
 }
-Event::BatchTick { instance } => {
-let inst = &mut cluster.instances[instance as usize];
+Event::BatchTick { bucket, instance } => {
+let inst =
+&mut service.buckets[bucket as usize].cluster.instances[instance as usize];
 inst.tick_scheduled = false;
 let result = inst.step(now);
 for (rid, ttft, end) in result.completed {
@@ -151,7 +156,9 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
 arrival: info.arrival,
 ttft,
 e2e: end - info.arrival,
+bucket: info.bucket,
 instance: info.instance,
+length_bucket_match: info.length_bucket_match,
 total_blocks: info.total_blocks,
 l0_hit_blocks: info.l0_hit_blocks,
 l1_hit_blocks: info.l1_hit_blocks,
@@ -166,18 +173,21 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
 }
 }
 if let Some(next) = result.next_tick {
-let inst = &mut cluster.instances[instance as usize];
+let inst =
+&mut service.buckets[bucket as usize].cluster.instances[instance as usize];
 if !inst.tick_scheduled {
 inst.tick_scheduled = true;
-q.schedule(next.max(now), Event::BatchTick { instance });
+q.schedule(next.max(now), Event::BatchTick { bucket, instance });
 }
 }
 }
 Event::Sample => {
-for inst in &cluster.instances {
+for bucket in &service.buckets {
+for inst in &bucket.cluster.instances {
 let busy = if inst.queue_len() > 0 { 1 } else { 0 };
 ts_writer.write(&TimeseriesRow {
 t: now,
+bucket: bucket.id,
 instance: inst.id,
 queue_len: inst.queue_len(),
 kv_blocks_used: inst.kv_blocks_used,
@@ -186,6 +196,7 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
 })?;
 }
 }
+}
 Event::Stop => break,
 }
} }

View File

@@ -471,8 +471,7 @@ fn cmd_oracle(
 out_path: Option<&std::path::Path>,
 ) -> Result<()> {
 let cfg = load(path, overrides)?;
-cfg.cluster
-.require_legacy_single_pool("oracle analysis")?;
+cfg.cluster.require_legacy_single_pool("oracle analysis")?;
 let block_bytes = cfg.model.kv_block_bytes() as f64;
 let per_instance_blocks = (cfg.hardware.hbm_bytes / block_bytes).max(1.0) as u64;
 let aggregate_blocks = per_instance_blocks * cfg.cluster.total_instances() as u64;

View File

@@ -8,7 +8,9 @@ pub struct PerRequestRow {
 pub arrival: f64,
 pub ttft: f64,
 pub e2e: f64,
+pub bucket: u32,
 pub instance: u32,
+pub length_bucket_match: bool,
 pub total_blocks: u32,
 pub l0_hit_blocks: u32,
 pub l1_hit_blocks: u32,

View File

@@ -5,6 +5,7 @@ use std::path::Path;
 #[derive(Debug, Clone, Serialize)]
 pub struct TimeseriesRow {
 pub t: f64,
+pub bucket: u32,
 pub instance: u32,
 pub queue_len: u32,
 pub kv_blocks_used: u32,

View File

@@ -196,7 +196,12 @@ fn build_next_use(records: &[RequestRecord]) -> Vec<Vec<u32>> {
 /// Implementation: lazy-deletion max-heap keyed by next-use index. Each
 /// cache entry has a version; the heap may contain stale entries from
 /// previous insertions, which we skip on pop.
-fn run_belady(records: &[RequestRecord], next_use: &[Vec<u32>], capacity: usize, mask: &[bool]) -> u64 {
+fn run_belady(
+records: &[RequestRecord],
+next_use: &[Vec<u32>],
+capacity: usize,
+mask: &[bool],
+) -> u64 {
 if capacity == 0 {
 return 0;
 }
@@ -327,10 +332,7 @@ mod tests {
 // req 0 populates blocks [1,2,3] but is not counted.
 // req 1 has prefix [1,2,3,4] — the first 3 blocks are cache hits
 // because req 0 populated them, even though req 0 is masked out.
-let recs = vec![
-req(0, 0.0, vec![1, 2, 3]),
-req(1, 1.0, vec![1, 2, 3, 4]),
-];
+let recs = vec![req(0, 0.0, vec![1, 2, 3]), req(1, 1.0, vec![1, 2, 3, 4])];
 let mask = vec![false, true];
 let out = analyze(&recs, 100, Some(&mask));
 // Only req 1 is counted: total = 4, hits = 3 (prefix [1,2,3] hit)
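The run_belady doc comment above describes its bookkeeping: a lazy-deletion max-heap keyed by next-use index, where every cache entry carries a version and stale heap entries are skipped on pop. A generic sketch of that pattern, assuming illustrative names and a touch/evict API that are not the crate's actual code:

use std::collections::{BinaryHeap, HashMap};

/// Lazy-deletion max-heap: nothing is removed from the heap in place. Each
/// cached block records the version of its latest insertion; heap entries
/// remember the version they were pushed with, and stale ones are skipped
/// when popped.
struct LazyBeladyHeap {
    /// block id -> (current version, next-use index)
    entries: HashMap<u32, (u64, u32)>,
    /// (next-use index, block id, version); BinaryHeap pops the largest next-use first
    heap: BinaryHeap<(u32, u32, u64)>,
    next_version: u64,
}

impl LazyBeladyHeap {
    fn new() -> Self {
        Self {
            entries: HashMap::new(),
            heap: BinaryHeap::new(),
            next_version: 0,
        }
    }

    /// Insert or refresh a block with the index of its next use.
    fn touch(&mut self, block: u32, next_use: u32) {
        let version = self.next_version;
        self.next_version += 1;
        self.entries.insert(block, (version, next_use));
        self.heap.push((next_use, block, version));
    }

    /// Evict the resident block whose next use is farthest in the future,
    /// skipping heap entries made stale by a later `touch` of the same block.
    fn evict_farthest(&mut self) -> Option<u32> {
        while let Some((_next_use, block, version)) = self.heap.pop() {
            if let Some(&(current, _)) = self.entries.get(&block) {
                if current == version {
                    self.entries.remove(&block);
                    return Some(block);
                }
            }
            // Stale entry: the block was re-touched (or already evicted); skip it.
        }
        None
    }
}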

View File

@@ -91,11 +91,24 @@ mod tests {
 q.schedule(
 2.0,
 Event::BatchTick {
+bucket: 0,
 instance: 0 as InstanceId,
 },
 );
-q.schedule(1.0, Event::BatchTick { instance: 1 });
-q.schedule(1.5, Event::BatchTick { instance: 2 });
+q.schedule(
+1.0,
+Event::BatchTick {
+bucket: 0,
+instance: 1,
+},
+);
+q.schedule(
+1.5,
+Event::BatchTick {
+bucket: 0,
+instance: 2,
+},
+);
 let (t1, _) = q.pop().unwrap();
 let (t2, _) = q.pop().unwrap();
 let (t3, _) = q.pop().unwrap();
@@ -107,12 +120,24 @@ mod tests {
 #[test]
 fn equal_time_fifo() {
 let mut q = EventQueue::new();
-q.schedule(1.0, Event::BatchTick { instance: 7 });
-q.schedule(1.0, Event::BatchTick { instance: 8 });
+q.schedule(
+1.0,
+Event::BatchTick {
+bucket: 0,
+instance: 7,
+},
+);
+q.schedule(
+1.0,
+Event::BatchTick {
+bucket: 1,
+instance: 8,
+},
+);
 let (_, e1) = q.pop().unwrap();
 let (_, e2) = q.pop().unwrap();
 match (e1, e2) {
-(Event::BatchTick { instance: a }, Event::BatchTick { instance: b }) => {
+(Event::BatchTick { instance: a, .. }, Event::BatchTick { instance: b, .. }) => {
 assert_eq!(a, 7);
 assert_eq!(b, 8);
 }

View File

@@ -1,5 +1,6 @@
 //! Event types for the discrete-event engine.
+use crate::router::BucketId;
 use crate::types::{InstanceId, ReqId};
 #[derive(Debug)]
@@ -7,7 +8,10 @@ pub enum Event {
 /// New trace request arrives at the cluster router.
 Arrival { req_id: ReqId },
 /// Per-instance scheduler tick (continuous batching).
-BatchTick { instance: InstanceId },
+BatchTick {
+bucket: BucketId,
+instance: InstanceId,
+},
 /// Periodic time-series sample of all instances.
 Sample,
 /// Stop the simulation early (used internally).

View File

@@ -296,7 +296,69 @@ fn ablation_parallel_matches_serial() {
 }
 #[test]
-fn bucketed_configs_are_rejected_by_legacy_runtime_paths() {
+fn strict_bucket_run_emits_bucket_fields_in_outputs() {
+let tmp = std::env::temp_dir().join("kvcache_sim_bucket_outputs");
+let _ = std::fs::remove_dir_all(&tmp);
+std::fs::create_dir_all(&tmp).unwrap();
+let trace_path = tmp.join("trace.jsonl");
+let mut f = std::fs::File::create(&trace_path).unwrap();
+writeln!(
+f,
+"{}",
+serde_json::json!({
+"chat_id": 1,
+"parent_chat_id": -1,
+"timestamp": 0.0,
+"input_length": 32,
+"output_length": 16,
+"type": "text",
+"turn": 0,
+"hash_ids": [1, 2]
+})
+)
+.unwrap();
+writeln!(
+f,
+"{}",
+serde_json::json!({
+"chat_id": 2,
+"parent_chat_id": -1,
+"timestamp": 0.1,
+"input_length": 80,
+"output_length": 16,
+"type": "text",
+"turn": 0,
+"hash_ids": [3, 4, 5, 6, 7]
+})
+)
+.unwrap();
+let mut cfg = bucketed_config(
+trace_path.to_str().unwrap(),
+tmp.to_str().unwrap(),
+RouterMode::LeastLoaded,
+);
+cfg.cluster.global_router.mode = GlobalRouterMode::StrictInputLength;
+cfg.sim.sample_interval_s = 0.05;
+let _ = driver::run(&cfg, Some("strict_bucket")).expect("bucketed run");
+let per_request = std::fs::read_to_string(tmp.join("strict_bucket/per_request.csv")).unwrap();
+assert!(per_request.contains("bucket"));
+assert!(per_request.contains("length_bucket_match"));
+let instances = std::fs::read_to_string(tmp.join("strict_bucket/instances.csv")).unwrap();
+assert!(instances.contains("bucket"));
+let routing_log = std::fs::read_to_string(tmp.join("strict_bucket/routing_log.jsonl")).unwrap();
+assert!(routing_log.contains("\"chosen_bucket\""));
+assert!(routing_log.contains("\"bucket_candidates\""));
+assert!(routing_log.contains("\"global_reason\""));
+}
+#[test]
+fn bucketed_configs_are_rejected_by_legacy_fixed_placement_paths() {
 let tmp = std::env::temp_dir().join("kvcache_sim_bucketed_reject");
 let _ = std::fs::remove_dir_all(&tmp);
 std::fs::create_dir_all(&tmp).unwrap();
@@ -309,12 +371,8 @@ fn bucketed_configs_are_rejected_by_legacy_runtime_paths() {
 RouterMode::Random,
 );
-let result = driver::run(&cfg, Some("bucketed_guard"));
-assert!(result.is_err(), "bucketed run should fail");
-let err = result.err().unwrap();
-assert!(err.to_string().contains("cluster.buckets"));
-let err = driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Lru])
+let err =
+driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Lru])
 .expect_err("bucketed ablation should fail");
 assert!(err.to_string().contains("cluster.buckets"));