diff --git a/src/config.rs b/src/config.rs index 36e4947..087b960 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1288,7 +1288,10 @@ sim: ); let cfg = Config::from_yaml_path(&path).unwrap(); - assert_eq!(cfg.cluster.global_router.mode, GlobalRouterMode::BucketScore); + assert_eq!( + cfg.cluster.global_router.mode, + GlobalRouterMode::BucketScore + ); assert!((cfg.cluster.global_router.length_penalty_weight - 1.5).abs() < 1e-12); assert!((cfg.cluster.global_router.load_weight - 0.75).abs() < 1e-12); assert!((cfg.cluster.global_router.cache_weight - 2.25).abs() < 1e-12); diff --git a/src/driver.rs b/src/driver.rs index dbc6b5e..0494942 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -6,7 +6,7 @@ use std::collections::{HashMap, VecDeque}; use std::path::Path; use std::sync::{Arc, Mutex}; -use crate::cluster::Cluster; +use crate::cluster::BucketedService; use crate::config::{Config, RouterMode}; use crate::metrics::ablation::AblationRow; use crate::metrics::per_request::{PerRequestRow, PerRequestWriter}; @@ -37,7 +37,9 @@ pub struct RunOutputs { #[derive(Debug, Clone)] struct InflightInfo { arrival: f64, + bucket: u32, instance: u32, + length_bucket_match: bool, total_blocks: u32, l0_hit_blocks: u32, l1_hit_blocks: u32, @@ -49,10 +51,7 @@ struct InflightInfo { } pub fn run(config: &Config, output_subdir: Option<&str>) -> Result { - config - .cluster - .require_legacy_single_pool("driver run")?; - let mut cluster = Cluster::new(config, &config.model)?; + let mut service = BucketedService::new(config, &config.model); let mut q = EventQueue::new(); // Output directory @@ -111,13 +110,16 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result { Some(r) => r.clone(), None => continue, }; - let stats = cluster.route_and_admit(&req, now); + let stats = service.route_and_admit(&req, now)?; rt_writer.write(&stats.decision)?; + let strict_bucket = config.cluster.bucket_index_for_input_len(req.input_len)?; inflight.insert( req_id, InflightInfo { arrival: req.arrival, + bucket: stats.bucket, instance: stats.instance, + length_bucket_match: stats.bucket as usize == strict_bucket, total_blocks: req.hash_ids.len() as u32, l0_hit_blocks: stats.l0_hit_blocks, l1_hit_blocks: stats.l1_hit_blocks, @@ -128,20 +130,23 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result { probe_overhead_s: stats.probe_overhead_s, }, ); - let inst = &mut cluster.instances[stats.instance as usize]; + let inst = &mut service.buckets[stats.bucket as usize].cluster.instances + [stats.instance as usize]; if !inst.tick_scheduled { inst.tick_scheduled = true; let when = stats.ready_at.max(now); q.schedule( when, Event::BatchTick { + bucket: stats.bucket, instance: stats.instance, }, ); } } - Event::BatchTick { instance } => { - let inst = &mut cluster.instances[instance as usize]; + Event::BatchTick { bucket, instance } => { + let inst = + &mut service.buckets[bucket as usize].cluster.instances[instance as usize]; inst.tick_scheduled = false; let result = inst.step(now); for (rid, ttft, end) in result.completed { @@ -151,7 +156,9 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result { arrival: info.arrival, ttft, e2e: end - info.arrival, + bucket: info.bucket, instance: info.instance, + length_bucket_match: info.length_bucket_match, total_blocks: info.total_blocks, l0_hit_blocks: info.l0_hit_blocks, l1_hit_blocks: info.l1_hit_blocks, @@ -166,24 +173,28 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result { } } if let Some(next) = result.next_tick { - let inst = &mut cluster.instances[instance as usize]; + let inst = + &mut service.buckets[bucket as usize].cluster.instances[instance as usize]; if !inst.tick_scheduled { inst.tick_scheduled = true; - q.schedule(next.max(now), Event::BatchTick { instance }); + q.schedule(next.max(now), Event::BatchTick { bucket, instance }); } } } Event::Sample => { - for inst in &cluster.instances { - let busy = if inst.queue_len() > 0 { 1 } else { 0 }; - ts_writer.write(&TimeseriesRow { - t: now, - instance: inst.id, - queue_len: inst.queue_len(), - kv_blocks_used: inst.kv_blocks_used, - kv_blocks_total: inst.hbm_block_budget, - busy, - })?; + for bucket in &service.buckets { + for inst in &bucket.cluster.instances { + let busy = if inst.queue_len() > 0 { 1 } else { 0 }; + ts_writer.write(&TimeseriesRow { + t: now, + bucket: bucket.id, + instance: inst.id, + queue_len: inst.queue_len(), + kv_blocks_used: inst.kv_blocks_used, + kv_blocks_total: inst.hbm_block_budget, + busy, + })?; + } } } Event::Stop => break, diff --git a/src/main.rs b/src/main.rs index b174148..af59237 100644 --- a/src/main.rs +++ b/src/main.rs @@ -471,8 +471,7 @@ fn cmd_oracle( out_path: Option<&std::path::Path>, ) -> Result<()> { let cfg = load(path, overrides)?; - cfg.cluster - .require_legacy_single_pool("oracle analysis")?; + cfg.cluster.require_legacy_single_pool("oracle analysis")?; let block_bytes = cfg.model.kv_block_bytes() as f64; let per_instance_blocks = (cfg.hardware.hbm_bytes / block_bytes).max(1.0) as u64; let aggregate_blocks = per_instance_blocks * cfg.cluster.total_instances() as u64; diff --git a/src/metrics/per_request.rs b/src/metrics/per_request.rs index b4ccdd8..9d2a792 100644 --- a/src/metrics/per_request.rs +++ b/src/metrics/per_request.rs @@ -8,7 +8,9 @@ pub struct PerRequestRow { pub arrival: f64, pub ttft: f64, pub e2e: f64, + pub bucket: u32, pub instance: u32, + pub length_bucket_match: bool, pub total_blocks: u32, pub l0_hit_blocks: u32, pub l1_hit_blocks: u32, diff --git a/src/metrics/timeseries.rs b/src/metrics/timeseries.rs index cb670e0..3851465 100644 --- a/src/metrics/timeseries.rs +++ b/src/metrics/timeseries.rs @@ -5,6 +5,7 @@ use std::path::Path; #[derive(Debug, Clone, Serialize)] pub struct TimeseriesRow { pub t: f64, + pub bucket: u32, pub instance: u32, pub queue_len: u32, pub kv_blocks_used: u32, diff --git a/src/oracle.rs b/src/oracle.rs index 064a2d2..590736f 100644 --- a/src/oracle.rs +++ b/src/oracle.rs @@ -196,7 +196,12 @@ fn build_next_use(records: &[RequestRecord]) -> Vec> { /// Implementation: lazy-deletion max-heap keyed by next-use index. Each /// cache entry has a version; the heap may contain stale entries from /// previous insertions, which we skip on pop. -fn run_belady(records: &[RequestRecord], next_use: &[Vec], capacity: usize, mask: &[bool]) -> u64 { +fn run_belady( + records: &[RequestRecord], + next_use: &[Vec], + capacity: usize, + mask: &[bool], +) -> u64 { if capacity == 0 { return 0; } @@ -327,10 +332,7 @@ mod tests { // req 0 populates blocks [1,2,3] but is not counted. // req 1 has prefix [1,2,3,4] — the first 3 blocks are cache hits // because req 0 populated them, even though req 0 is masked out. - let recs = vec![ - req(0, 0.0, vec![1, 2, 3]), - req(1, 1.0, vec![1, 2, 3, 4]), - ]; + let recs = vec![req(0, 0.0, vec![1, 2, 3]), req(1, 1.0, vec![1, 2, 3, 4])]; let mask = vec![false, true]; let out = analyze(&recs, 100, Some(&mask)); // Only req 1 is counted: total = 4, hits = 3 (prefix [1,2,3] hit) diff --git a/src/sim/engine.rs b/src/sim/engine.rs index 7a515e6..83651a3 100644 --- a/src/sim/engine.rs +++ b/src/sim/engine.rs @@ -91,11 +91,24 @@ mod tests { q.schedule( 2.0, Event::BatchTick { + bucket: 0, instance: 0 as InstanceId, }, ); - q.schedule(1.0, Event::BatchTick { instance: 1 }); - q.schedule(1.5, Event::BatchTick { instance: 2 }); + q.schedule( + 1.0, + Event::BatchTick { + bucket: 0, + instance: 1, + }, + ); + q.schedule( + 1.5, + Event::BatchTick { + bucket: 0, + instance: 2, + }, + ); let (t1, _) = q.pop().unwrap(); let (t2, _) = q.pop().unwrap(); let (t3, _) = q.pop().unwrap(); @@ -107,12 +120,24 @@ mod tests { #[test] fn equal_time_fifo() { let mut q = EventQueue::new(); - q.schedule(1.0, Event::BatchTick { instance: 7 }); - q.schedule(1.0, Event::BatchTick { instance: 8 }); + q.schedule( + 1.0, + Event::BatchTick { + bucket: 0, + instance: 7, + }, + ); + q.schedule( + 1.0, + Event::BatchTick { + bucket: 1, + instance: 8, + }, + ); let (_, e1) = q.pop().unwrap(); let (_, e2) = q.pop().unwrap(); match (e1, e2) { - (Event::BatchTick { instance: a }, Event::BatchTick { instance: b }) => { + (Event::BatchTick { instance: a, .. }, Event::BatchTick { instance: b, .. }) => { assert_eq!(a, 7); assert_eq!(b, 8); } diff --git a/src/sim/events.rs b/src/sim/events.rs index e369fa2..c8847a3 100644 --- a/src/sim/events.rs +++ b/src/sim/events.rs @@ -1,5 +1,6 @@ //! Event types for the discrete-event engine. +use crate::router::BucketId; use crate::types::{InstanceId, ReqId}; #[derive(Debug)] @@ -7,7 +8,10 @@ pub enum Event { /// New trace request arrives at the cluster router. Arrival { req_id: ReqId }, /// Per-instance scheduler tick (continuous batching). - BatchTick { instance: InstanceId }, + BatchTick { + bucket: BucketId, + instance: InstanceId, + }, /// Periodic time-series sample of all instances. Sample, /// Stop the simulation early (used internally). diff --git a/tests/smoke.rs b/tests/smoke.rs index 76374f4..8a23f8b 100644 --- a/tests/smoke.rs +++ b/tests/smoke.rs @@ -296,7 +296,69 @@ fn ablation_parallel_matches_serial() { } #[test] -fn bucketed_configs_are_rejected_by_legacy_runtime_paths() { +fn strict_bucket_run_emits_bucket_fields_in_outputs() { + let tmp = std::env::temp_dir().join("kvcache_sim_bucket_outputs"); + let _ = std::fs::remove_dir_all(&tmp); + std::fs::create_dir_all(&tmp).unwrap(); + let trace_path = tmp.join("trace.jsonl"); + + let mut f = std::fs::File::create(&trace_path).unwrap(); + writeln!( + f, + "{}", + serde_json::json!({ + "chat_id": 1, + "parent_chat_id": -1, + "timestamp": 0.0, + "input_length": 32, + "output_length": 16, + "type": "text", + "turn": 0, + "hash_ids": [1, 2] + }) + ) + .unwrap(); + writeln!( + f, + "{}", + serde_json::json!({ + "chat_id": 2, + "parent_chat_id": -1, + "timestamp": 0.1, + "input_length": 80, + "output_length": 16, + "type": "text", + "turn": 0, + "hash_ids": [3, 4, 5, 6, 7] + }) + ) + .unwrap(); + + let mut cfg = bucketed_config( + trace_path.to_str().unwrap(), + tmp.to_str().unwrap(), + RouterMode::LeastLoaded, + ); + cfg.cluster.global_router.mode = GlobalRouterMode::StrictInputLength; + cfg.sim.sample_interval_s = 0.05; + + let _ = driver::run(&cfg, Some("strict_bucket")).expect("bucketed run"); + + let per_request = std::fs::read_to_string(tmp.join("strict_bucket/per_request.csv")).unwrap(); + assert!(per_request.contains("bucket")); + assert!(per_request.contains("length_bucket_match")); + + let instances = std::fs::read_to_string(tmp.join("strict_bucket/instances.csv")).unwrap(); + assert!(instances.contains("bucket")); + + let routing_log = std::fs::read_to_string(tmp.join("strict_bucket/routing_log.jsonl")).unwrap(); + assert!(routing_log.contains("\"chosen_bucket\"")); + assert!(routing_log.contains("\"bucket_candidates\"")); + assert!(routing_log.contains("\"global_reason\"")); +} + +#[test] +fn bucketed_configs_are_rejected_by_legacy_fixed_placement_paths() { let tmp = std::env::temp_dir().join("kvcache_sim_bucketed_reject"); let _ = std::fs::remove_dir_all(&tmp); std::fs::create_dir_all(&tmp).unwrap(); @@ -309,13 +371,9 @@ fn bucketed_configs_are_rejected_by_legacy_runtime_paths() { RouterMode::Random, ); - let result = driver::run(&cfg, Some("bucketed_guard")); - assert!(result.is_err(), "bucketed run should fail"); - let err = result.err().unwrap(); - assert!(err.to_string().contains("cluster.buckets")); - - let err = driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Lru]) - .expect_err("bucketed ablation should fail"); + let err = + driver::ablate_fixed_placement(&cfg, &[RouterMode::Random], &[ReplayEvictPolicy::Lru]) + .expect_err("bucketed ablation should fail"); assert!(err.to_string().contains("cluster.buckets")); let err = replay::replay_fixed_placement(