diff --git a/src/cluster/bucketed_service.rs b/src/cluster/bucketed_service.rs new file mode 100644 index 0000000..104f77a --- /dev/null +++ b/src/cluster/bucketed_service.rs @@ -0,0 +1,186 @@ +use super::cluster::{AdmissionStats, Cluster}; +use crate::config::{BucketConfig, Config, ModelConfig}; +use crate::instance::Instance; +use crate::router::{self, BucketId, GlobalRouter}; +use crate::trace::RequestRecord; + +pub struct ServiceBucket { + pub id: BucketId, + pub cfg: BucketConfig, + pub cluster: Cluster, +} + +impl ServiceBucket { + pub fn instances(&self) -> &[Instance] { + &self.cluster.instances + } +} + +pub struct BucketedService { + pub buckets: Vec, + pub global_router: Box, +} + +impl BucketedService { + pub fn new(config: &Config, model: &ModelConfig) -> Self { + let buckets = config + .cluster + .effective_buckets() + .into_iter() + .enumerate() + .map(|(idx, cfg)| ServiceBucket { + id: idx as BucketId, + cluster: Cluster::new_for_bucket(config, model, idx as BucketId, cfg.num_instances) + .expect("bucket-local cluster construction should succeed"), + cfg, + }) + .collect(); + + Self { + buckets, + global_router: router::build_global(config), + } + } + + pub fn bucket(&self, bucket_id: BucketId) -> &ServiceBucket { + &self.buckets[bucket_id as usize] + } + + pub fn route_and_admit(&mut self, req: &RequestRecord, now: f64) -> AdmissionStats { + let bucket_views = self + .buckets + .iter() + .map(|bucket| bucket.cluster.bucket_view(bucket.id, &bucket.cfg)) + .collect::>(); + let global = self.global_router.route(req, &bucket_views, now); + let bucket = &mut self.buckets[global.chosen_bucket as usize]; + bucket + .cluster + .route_and_admit_with_global(req, now, &global) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::{ + BucketConfig, CalibrationConfig, ClusterConfig, Config, GlobalRouterConfig, + GlobalRouterMode, HardwareConfig, MetaStoreConfig, ModelConfig, RouterConfig, RouterMode, + SimConfig, + }; + use crate::trace::RequestRecord; + + fn test_config() -> Config { + Config { + model: ModelConfig { + name: "test".into(), + num_layers: 4, + num_kv_heads: 2, + head_dim: 64, + dtype_bytes: 2, + block_size_tokens: 16, + flops_per_token_prefill: Some(1.0e9), + attn_quadratic_coeff: Some(64.0), + ..Default::default() + }, + hardware: HardwareConfig { + gpu_flops: 1.0e14, + gpu_fp8_flops: 0.0, + gpu_fp4_flops: 0.0, + gpu_mem_bw: 1.0e12, + hbm_bytes: 1.0e9, + dram_bytes: 4.0e9, + host_dram_bw: 5.0e11, + pcie_bw: 32.0e9, + pcie_latency_us: 1.0, + rdma_bw: 12.0e9, + rdma_latency_us: 5.0, + intra_node_tp_bw: 9.0e11, + intra_node_tp_latency_us: 2.0, + tp_degree: 1, + max_batch_slots: 32, + prefill_chunk_tokens: 1024, + }, + calibration: CalibrationConfig::default(), + cluster: ClusterConfig { + num_instances: None, + buckets: vec![ + BucketConfig { + name: "short".into(), + input_length_min: 0, + input_length_max: 32, + num_instances: 2, + }, + BucketConfig { + name: "long".into(), + input_length_min: 33, + input_length_max: 96, + num_instances: 1, + }, + ], + global_router: GlobalRouterConfig { + mode: GlobalRouterMode::StrictInputLength, + length_penalty_weight: 1.0, + load_weight: 1.0, + cache_weight: 1.0, + }, + meta_store: MetaStoreConfig { + ttl_seconds: 1000.0, + }, + router: RouterConfig { + mode: RouterMode::LeastLoaded, + precise_probe_latency_us: 10.0, + precise_probe_topk: 2, + load_alpha: 0.0, + score_alpha: 1.0, + score_beta: 0.1, + prefix_k: 8, + affinity_fan_out: 2, + }, + }, + sim: SimConfig { + trace_path: String::new(), + max_requests: None, + output_dir: String::new(), + sample_interval_s: 0.0, + seed: 7, + input_length_min: None, + input_length_max: None, + }, + } + } + + fn req(req_id: u64, input_len: u32, hashes: &[u64]) -> RequestRecord { + RequestRecord { + req_id, + chat_id: req_id as i64, + parent_chat_id: -1, + turn: 0, + arrival: 0.0, + input_len, + output_len: 16, + hash_ids: hashes.to_vec(), + } + } + + #[test] + fn strict_input_length_routes_into_matching_bucket() { + let cfg = test_config(); + let mut service = BucketedService::new(&cfg, &cfg.model); + let stats = service.route_and_admit(&req(1, 24, &[10, 11]), 0.0); + assert_eq!(stats.bucket, 0); + assert_eq!(stats.decision.chosen_bucket, 0); + assert_eq!(service.bucket(0).instances().len(), 2); + } + + #[test] + fn bucket_meta_store_is_isolated() { + let cfg = test_config(); + let mut service = BucketedService::new(&cfg, &cfg.model); + let _ = service.route_and_admit(&req(1, 24, &[10, 11]), 0.0); + let long_stats = service.route_and_admit(&req(2, 64, &[10, 11, 12, 13]), 1.0); + assert_eq!(long_stats.bucket, 1); + assert_eq!(long_stats.remote_hit_blocks, 0); + assert_eq!(long_stats.l1_hit_blocks, 0); + } +} diff --git a/src/cluster/cluster.rs b/src/cluster/cluster.rs index 035a72e..31f4386 100644 --- a/src/cluster/cluster.rs +++ b/src/cluster/cluster.rs @@ -4,17 +4,18 @@ use anyhow::Result; use crate::cluster::meta_store::MetaStore; -use crate::config::{Config, ModelConfig}; +use crate::config::{BucketConfig, Config, ModelConfig}; use crate::instance::instance::AdmittedRequest; use crate::instance::kv_cache::L1Change; use crate::instance::Instance; -use crate::router::{self, RouteDecision, Router}; +use crate::router::{self, BucketId, BucketView, GlobalRouteDecision, RouteDecision, Router}; use crate::trace::RequestRecord; use crate::ttft::{classify_prefix_tiers, TtftModel}; use crate::types::InstanceId; #[derive(Debug, Clone)] pub struct AdmissionStats { + pub bucket: BucketId, pub instance: InstanceId, pub l0_hit_blocks: u32, pub l1_hit_blocks: u32, @@ -40,38 +41,39 @@ pub struct Cluster { impl Cluster { pub fn new(config: &Config, model: &ModelConfig) -> Result { let total_instances = config.cluster.require_legacy_single_pool("Cluster::new")?; - let mut instances = Vec::with_capacity(total_instances as usize); - for id in 0..total_instances { - instances.push(Instance::new( - id as InstanceId, - model, - &config.hardware, - &config.calibration, - )); - } - let meta_store = MetaStore::new(config.cluster.meta_store.ttl_seconds); - let router = router::build(config, config.sim.seed); - Ok(Self { - instances, - meta_store, - router, - block_size_tokens: model.block_size_tokens, - kv_block_bytes: model.kv_block_bytes(), - ttft_model: TtftModel::new( - &config.hardware, - &config.calibration, - model.kv_block_bytes(), - ), - }) + Self::build_local_cluster(config, model, total_instances) + } + + pub fn new_for_bucket( + config: &Config, + model: &ModelConfig, + _bucket_id: BucketId, + num_instances: u32, + ) -> Result { + let mut local_config = config.clone(); + local_config.cluster.num_instances = Some(num_instances); + local_config.cluster.buckets.clear(); + Self::build_local_cluster(&local_config, model, num_instances) } /// Route + admit a request. Returns the chosen instance plus rich /// per-request stats for metrics. Does NOT schedule the BatchTick — the /// simulator driver does that based on the returned `ready_at`. pub fn route_and_admit(&mut self, req: &RequestRecord, now: f64) -> AdmissionStats { + let global = GlobalRouteDecision::single_bucket(req.req_id, 0); + self.route_and_admit_with_global(req, now, &global) + } + + pub fn route_and_admit_with_global( + &mut self, + req: &RequestRecord, + now: f64, + global: &GlobalRouteDecision, + ) -> AdmissionStats { let decision = self .router - .route(req, &self.instances, &self.meta_store, now); + .route(req, &self.instances, &self.meta_store, now) + .with_global(global); let inst_id = decision.chosen; let probe_overhead_s = decision.probe_overhead_s; let scheduler_overhead_s = self @@ -154,6 +156,7 @@ impl Cluster { let fetch_time_s = (t - effective_now).max(0.0); AdmissionStats { + bucket: decision.chosen_bucket, instance: inst_id, l0_hit_blocks: l0_hits, l1_hit_blocks: l1_hits, @@ -168,6 +171,17 @@ impl Cluster { } } + pub fn bucket_view(&self, bucket_id: BucketId, cfg: &BucketConfig) -> BucketView { + BucketView { + id: bucket_id, + input_length_min: cfg.input_length_min, + input_length_max: cfg.input_length_max, + num_instances: self.instances.len() as u32, + total_queue_len: self.instances.iter().map(Instance::queue_len).sum(), + total_load_blocks: self.instances.iter().map(|inst| inst.kv_blocks_used).sum(), + } + } + fn apply_l1_changes( meta_store: &mut MetaStore, inst_id: InstanceId, @@ -181,6 +195,36 @@ impl Cluster { } } } + + fn build_local_cluster( + config: &Config, + model: &ModelConfig, + num_instances: u32, + ) -> Result { + let mut instances = Vec::with_capacity(num_instances as usize); + for id in 0..num_instances { + instances.push(Instance::new( + id as InstanceId, + model, + &config.hardware, + &config.calibration, + )); + } + let meta_store = MetaStore::new(config.cluster.meta_store.ttl_seconds); + let router = router::build(config, config.sim.seed); + Ok(Self { + instances, + meta_store, + router, + block_size_tokens: model.block_size_tokens, + kv_block_bytes: model.kv_block_bytes(), + ttft_model: TtftModel::new( + &config.hardware, + &config.calibration, + model.kv_block_bytes(), + ), + }) + } } #[cfg(test)] diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs index bc36667..01893e8 100644 --- a/src/cluster/mod.rs +++ b/src/cluster/mod.rs @@ -1,6 +1,8 @@ +pub mod bucketed_service; #[allow(clippy::module_inception)] pub mod cluster; pub mod meta_store; +pub use bucketed_service::BucketedService; pub use cluster::Cluster; pub use meta_store::MetaStore; diff --git a/src/router/adaptive_affinity.rs b/src/router/adaptive_affinity.rs index c1ff154..8b8c7cb 100644 --- a/src/router/adaptive_affinity.rs +++ b/src/router/adaptive_affinity.rs @@ -242,13 +242,13 @@ impl Router for AdaptiveAffinityRouter { self.observe(fp, now); - RouteDecision { - req_id: req.req_id, - mode: "adaptive_affinity", - chosen: instances[chosen_idx].id, - probe_overhead_s: 0.0, + crate::router::local_route_decision( + req.req_id, + "adaptive_affinity", + instances[chosen_idx].id, + 0.0, candidates, reason, - } + ) } } diff --git a/src/router/cache_affinity.rs b/src/router/cache_affinity.rs index 5e09d33..a42a1af 100644 --- a/src/router/cache_affinity.rs +++ b/src/router/cache_affinity.rs @@ -205,13 +205,13 @@ impl Router for CacheAffinityRouter { } } - RouteDecision { - req_id: req.req_id, - mode: "cache_affinity", - chosen: instances[best_idx].id, - probe_overhead_s: 0.0, + crate::router::local_route_decision( + req.req_id, + "cache_affinity", + instances[best_idx].id, + 0.0, candidates, - reason: "argmin(α·q − γ·l0_hit − δ·meta_only) + rendezvous tiebreak", - } + "argmin(α·q − γ·l0_hit − δ·meta_only) + rendezvous tiebreak", + ) } } diff --git a/src/router/cache_load.rs b/src/router/cache_load.rs index 142e00c..3101c6f 100644 --- a/src/router/cache_load.rs +++ b/src/router/cache_load.rs @@ -77,13 +77,13 @@ impl Router for CacheLoadRouter { }); } - RouteDecision { - req_id: req.req_id, - mode: "cache_load", - chosen: instances[best_idx].id, - probe_overhead_s: 0.0, + crate::router::local_route_decision( + req.req_id, + "cache_load", + instances[best_idx].id, + 0.0, candidates, - reason: "least-loaded 1/4, then best local L0 prefix", - } + "least-loaded 1/4, then best local L0 prefix", + ) } } diff --git a/src/router/cache_score.rs b/src/router/cache_score.rs index ab23882..a727f32 100644 --- a/src/router/cache_score.rs +++ b/src/router/cache_score.rs @@ -99,13 +99,13 @@ impl Router for CacheScoreRouter { } } - RouteDecision { - req_id: req.req_id, - mode: "cache_score", - chosen: instances[best_idx].id, - probe_overhead_s: 0.0, + crate::router::local_route_decision( + req.req_id, + "cache_score", + instances[best_idx].id, + 0.0, candidates, - reason: "argmin 2^(α·load + β·miss)", - } + "argmin 2^(α·load + β·miss)", + ) } } diff --git a/src/router/cache_score_ttl.rs b/src/router/cache_score_ttl.rs index c67ad1d..d411247 100644 --- a/src/router/cache_score_ttl.rs +++ b/src/router/cache_score_ttl.rs @@ -74,13 +74,13 @@ impl Router for CacheScoreTtlRouter { } } - RouteDecision { - req_id: req.req_id, - mode: "cache_score_ttl", - chosen: instances[best_idx].id, - probe_overhead_s: 0.0, + crate::router::local_route_decision( + req.req_id, + "cache_score_ttl", + instances[best_idx].id, + 0.0, candidates, - reason: "argmin 2^(alpha*load + beta*meta_store_miss)", - } + "argmin 2^(alpha*load + beta*meta_store_miss)", + ) } } diff --git a/src/router/estimated_ttft.rs b/src/router/estimated_ttft.rs index bc206bc..e4b8a69 100644 --- a/src/router/estimated_ttft.rs +++ b/src/router/estimated_ttft.rs @@ -89,13 +89,13 @@ impl Router for EstimatedTtftRouter { } } - RouteDecision { - req_id: req.req_id, - mode: "estimated_ttft", - chosen: best, - probe_overhead_s: 0.0, + crate::router::local_route_decision( + req.req_id, + "estimated_ttft", + best, + 0.0, candidates, - reason: "argmin(drain + scheduler + kv_prepare + prefill + first_token_tail)", - } + "argmin(drain + scheduler + kv_prepare + prefill + first_token_tail)", + ) } } diff --git a/src/router/global_bucket.rs b/src/router/global_bucket.rs new file mode 100644 index 0000000..0684e17 --- /dev/null +++ b/src/router/global_bucket.rs @@ -0,0 +1,133 @@ +use serde::Serialize; + +use crate::config::{Config, GlobalRouterMode}; +use crate::trace::RequestRecord; + +pub type BucketId = u32; + +#[derive(Debug, Clone, Serialize)] +pub struct BucketView { + pub id: BucketId, + pub input_length_min: u32, + pub input_length_max: u32, + pub num_instances: u32, + pub total_queue_len: u32, + pub total_load_blocks: u32, +} + +#[derive(Debug, Clone, Serialize)] +pub struct BucketCandidate { + pub bucket: BucketId, + pub input_length_min: u32, + pub input_length_max: u32, + pub num_instances: u32, + pub total_queue_len: u32, + pub total_load_blocks: u32, + pub matches_input_len: bool, +} + +#[derive(Debug, Clone, Serialize)] +pub struct GlobalRouteDecision { + pub req_id: u64, + pub mode: &'static str, + pub chosen_bucket: BucketId, + pub candidates: Vec, + pub reason: &'static str, +} + +impl GlobalRouteDecision { + pub fn single_bucket(req_id: u64, chosen_bucket: BucketId) -> Self { + Self { + req_id, + mode: "single_pool", + chosen_bucket, + candidates: Vec::new(), + reason: "single pool uses bucket 0", + } + } +} + +pub trait GlobalRouter: Send { + fn name(&self) -> &'static str; + fn route( + &mut self, + req: &RequestRecord, + buckets: &[BucketView], + now: f64, + ) -> GlobalRouteDecision; +} + +struct StrictInputLengthRouter { + reported_mode: &'static str, + reason: &'static str, +} + +impl StrictInputLengthRouter { + fn new(reported_mode: &'static str, reason: &'static str) -> Self { + Self { + reported_mode, + reason, + } + } +} + +impl GlobalRouter for StrictInputLengthRouter { + fn name(&self) -> &'static str { + self.reported_mode + } + + fn route( + &mut self, + req: &RequestRecord, + buckets: &[BucketView], + _now: f64, + ) -> GlobalRouteDecision { + let candidates = buckets + .iter() + .map(|view| BucketCandidate { + bucket: view.id, + input_length_min: view.input_length_min, + input_length_max: view.input_length_max, + num_instances: view.num_instances, + total_queue_len: view.total_queue_len, + total_load_blocks: view.total_load_blocks, + matches_input_len: view.input_length_min <= req.input_len + && req.input_len <= view.input_length_max, + }) + .collect::>(); + + let matches = candidates + .iter() + .filter(|candidate| candidate.matches_input_len) + .map(|candidate| candidate.bucket) + .collect::>(); + + assert_eq!( + matches.len(), + 1, + "global bucket routing requires exactly one matching bucket for input_len={}", + req.input_len + ); + + GlobalRouteDecision { + req_id: req.req_id, + mode: self.reported_mode, + chosen_bucket: matches[0], + candidates, + reason: self.reason, + } + } +} + +pub fn build(full: &Config) -> Box { + match full.cluster.global_router.mode { + GlobalRouterMode::StrictInputLength => Box::new(StrictInputLengthRouter::new( + "strict_input_length", + "unique bucket range contains input_length", + )) as Box, + GlobalRouterMode::BucketScore => Box::new(StrictInputLengthRouter::new( + "bucket_score", + "bucket_score placeholder falls back to strict_input_length", + )) as Box, + } +} diff --git a/src/router/least_loaded.rs b/src/router/least_loaded.rs index efc0ed8..272cb93 100644 --- a/src/router/least_loaded.rs +++ b/src/router/least_loaded.rs @@ -41,13 +41,13 @@ impl Router for LeastLoadedRouter { best = inst.id; } } - RouteDecision { - req_id: req.req_id, - mode: "least_loaded", - chosen: best, - probe_overhead_s: 0.0, + crate::router::local_route_decision( + req.req_id, + "least_loaded", + best, + 0.0, candidates, - reason: "argmin(kv_used + alpha * queue_len)", - } + "argmin(kv_used + alpha * queue_len)", + ) } } diff --git a/src/router/least_tokens.rs b/src/router/least_tokens.rs index effdad7..2c06427 100644 --- a/src/router/least_tokens.rs +++ b/src/router/least_tokens.rs @@ -61,13 +61,13 @@ impl Router for LeastTokensRouter { } } - RouteDecision { - req_id: req.req_id, - mode: "least_tokens", - chosen: best, - probe_overhead_s: 0.0, + crate::router::local_route_decision( + req.req_id, + "least_tokens", + best, + 0.0, candidates, - reason: "argmin(waiting_prefill_tokens)", - } + "argmin(waiting_prefill_tokens)", + ) } } diff --git a/src/router/lineage_affinity.rs b/src/router/lineage_affinity.rs index d23a580..0e9035b 100644 --- a/src/router/lineage_affinity.rs +++ b/src/router/lineage_affinity.rs @@ -231,13 +231,13 @@ impl Router for LineageAffinityRouter { self.request_home .insert(req.chat_id, instances[chosen.idx].id); - RouteDecision { - req_id: req.req_id, - mode: "lineage_affinity", - chosen: instances[chosen.idx].id, - probe_overhead_s: 0.0, + crate::router::local_route_decision( + req.req_id, + "lineage_affinity", + instances[chosen.idx].id, + 0.0, candidates, reason, - } + ) } } diff --git a/src/router/min_pd.rs b/src/router/min_pd.rs index f801639..2033dee 100644 --- a/src/router/min_pd.rs +++ b/src/router/min_pd.rs @@ -90,13 +90,13 @@ impl Router for MinPdRouter { } } - RouteDecision { - req_id: req.req_id, - mode: "min_pd", - chosen: best, - probe_overhead_s: 0.0, + crate::router::local_route_decision( + req.req_id, + "min_pd", + best, + 0.0, candidates, - reason: "argmin(P*D), P=local-L0 miss tokens, D=ongoing reqs", - } + "argmin(P*D), P=local-L0 miss tokens, D=ongoing reqs", + ) } } diff --git a/src/router/mod.rs b/src/router/mod.rs index 2a23e3c..e2e7676 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -6,6 +6,7 @@ pub mod cache_load; pub mod cache_score; pub mod cache_score_ttl; pub mod estimated_ttft; +pub mod global_bucket; pub mod least_loaded; pub mod least_tokens; pub mod lineage_affinity; @@ -23,6 +24,8 @@ use crate::instance::Instance; use crate::trace::RequestRecord; use crate::types::InstanceId; +pub use global_bucket::{BucketCandidate, BucketId, BucketView, GlobalRouteDecision, GlobalRouter}; + #[derive(Debug, Clone, Serialize)] pub struct CandidateInfo { pub instance: InstanceId, @@ -34,13 +37,25 @@ pub struct CandidateInfo { #[derive(Debug, Clone, Serialize)] pub struct RouteDecision { pub req_id: u64, + pub global_mode: &'static str, pub mode: &'static str, + pub chosen_bucket: BucketId, pub chosen: InstanceId, pub probe_overhead_s: f64, + pub bucket_candidates: Vec, pub candidates: Vec, pub reason: &'static str, } +impl RouteDecision { + pub fn with_global(mut self, decision: &GlobalRouteDecision) -> Self { + self.global_mode = decision.mode; + self.chosen_bucket = decision.chosen_bucket; + self.bucket_candidates = decision.candidates.clone(); + self + } +} + pub trait Router: Send { fn name(&self) -> &'static str; fn route( @@ -63,6 +78,27 @@ pub(crate) fn local_l0_scores(req: &RequestRecord, instances: &[Instance]) -> Ve .collect() } +pub fn local_route_decision( + req_id: u64, + mode: &'static str, + chosen: InstanceId, + probe_overhead_s: f64, + candidates: Vec, + reason: &'static str, +) -> RouteDecision { + RouteDecision { + req_id, + global_mode: "single_pool", + mode, + chosen_bucket: 0, + chosen, + probe_overhead_s, + bucket_candidates: Vec::new(), + candidates, + reason, + } +} + pub fn build(full: &Config, seed: u64) -> Box { use crate::config::RouterMode::*; let cfg = &full.cluster.router; @@ -122,6 +158,10 @@ pub fn build(full: &Config, seed: u64) -> Box { } } +pub fn build_global(full: &Config) -> Box { + global_bucket::build(full) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/router/precise_aware.rs b/src/router/precise_aware.rs index 9940815..e0d90cd 100644 --- a/src/router/precise_aware.rs +++ b/src/router/precise_aware.rs @@ -62,13 +62,13 @@ impl Router for PreciseRouter { } } - RouteDecision { - req_id: req.req_id, - mode: "precise", - chosen: best, - probe_overhead_s: n as f64 * self.probe_latency_s, + crate::router::local_route_decision( + req.req_id, + "precise", + best, + n as f64 * self.probe_latency_s, candidates, - reason: "exact-probe all instances' L0 cache", - } + "exact-probe all instances' L0 cache", + ) } } diff --git a/src/router/prefix_affinity.rs b/src/router/prefix_affinity.rs index ca6bcb3..bde3443 100644 --- a/src/router/prefix_affinity.rs +++ b/src/router/prefix_affinity.rs @@ -166,13 +166,13 @@ impl Router for PrefixAffinityRouter { reason = "prefix affinity: top-K min drain"; } - RouteDecision { - req_id: req.req_id, - mode: "prefix_affinity", - chosen: instances[best_idx].id, - probe_overhead_s: 0.0, + crate::router::local_route_decision( + req.req_id, + "prefix_affinity", + instances[best_idx].id, + 0.0, candidates, reason, - } + ) } } diff --git a/src/router/random.rs b/src/router/random.rs index c4bfb17..8f2deae 100644 --- a/src/router/random.rs +++ b/src/router/random.rs @@ -33,19 +33,19 @@ impl Router for RandomRouter { ) -> RouteDecision { let n = instances.len(); let chosen = self.rng.gen_range(0..n) as InstanceId; - RouteDecision { - req_id: req.req_id, - mode: "random", + crate::router::local_route_decision( + req.req_id, + "random", chosen, - probe_overhead_s: 0.0, - candidates: vec![CandidateInfo { + 0.0, + vec![CandidateInfo { instance: chosen, predicted_prefix: 0, load_blocks: instances[chosen as usize].kv_blocks_used, queue_len: instances[chosen as usize].queue_len(), }], - reason: "uniform random", - } + "uniform random", + ) } } @@ -75,18 +75,18 @@ impl Router for RoundRobinRouter { let n = instances.len() as u32; let chosen = self.next % n; self.next = self.next.wrapping_add(1); - RouteDecision { - req_id: req.req_id, - mode: "round_robin", + crate::router::local_route_decision( + req.req_id, + "round_robin", chosen, - probe_overhead_s: 0.0, - candidates: vec![CandidateInfo { + 0.0, + vec![CandidateInfo { instance: chosen, predicted_prefix: 0, load_blocks: instances[chosen as usize].kv_blocks_used, queue_len: instances[chosen as usize].queue_len(), }], - reason: "round robin", - } + "round robin", + ) } } diff --git a/src/router/ttl_aware.rs b/src/router/ttl_aware.rs index 04481d0..6206ae7 100644 --- a/src/router/ttl_aware.rs +++ b/src/router/ttl_aware.rs @@ -46,13 +46,13 @@ impl Router for TtlAwareRouter { best = inst.id; } } - RouteDecision { - req_id: req.req_id, - mode: "ttl_aware", - chosen: best, - probe_overhead_s: 0.0, + crate::router::local_route_decision( + req.req_id, + "ttl_aware", + best, + 0.0, candidates, - reason: "max meta_store prefix, tie -> least loaded", - } + "max meta_store prefix, tie -> least loaded", + ) } }