From 3a84c1506841becac0dd7cc1b7299ff105698a0a Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Fri, 17 Apr 2026 15:15:18 +0800 Subject: [PATCH] fix: harden bucket routing review follow-up --- src/cluster/bucketed_service.rs | 61 +++++++++++++++++++++++++++++---- src/router/global_bucket.rs | 38 +++++++++++++------- src/router/mod.rs | 9 +++-- 3 files changed, 85 insertions(+), 23 deletions(-) diff --git a/src/cluster/bucketed_service.rs b/src/cluster/bucketed_service.rs index 104f77a..4720965 100644 --- a/src/cluster/bucketed_service.rs +++ b/src/cluster/bucketed_service.rs @@ -1,3 +1,5 @@ +use anyhow::Result; + use super::cluster::{AdmissionStats, Cluster}; use crate::config::{BucketConfig, Config, ModelConfig}; use crate::instance::Instance; @@ -46,17 +48,17 @@ impl BucketedService { &self.buckets[bucket_id as usize] } - pub fn route_and_admit(&mut self, req: &RequestRecord, now: f64) -> AdmissionStats { + pub fn route_and_admit(&mut self, req: &RequestRecord, now: f64) -> Result { let bucket_views = self .buckets .iter() .map(|bucket| bucket.cluster.bucket_view(bucket.id, &bucket.cfg)) .collect::>(); - let global = self.global_router.route(req, &bucket_views, now); + let global = self.global_router.route(req, &bucket_views, now)?; let bucket = &mut self.buckets[global.chosen_bucket as usize]; - bucket + Ok(bucket .cluster - .route_and_admit_with_global(req, now, &global) + .route_and_admit_with_global(req, now, &global)) } } @@ -167,9 +169,19 @@ mod tests { fn strict_input_length_routes_into_matching_bucket() { let cfg = test_config(); let mut service = BucketedService::new(&cfg, &cfg.model); - let stats = service.route_and_admit(&req(1, 24, &[10, 11]), 0.0); + let stats = service + .route_and_admit(&req(1, 24, &[10, 11]), 0.0) + .unwrap(); assert_eq!(stats.bucket, 0); assert_eq!(stats.decision.chosen_bucket, 0); + assert_eq!( + stats.decision.global_reason, + "unique bucket range contains input_length" + ); + assert_eq!( + stats.decision.local_reason, + "argmin(kv_used + alpha * queue_len)" + ); assert_eq!(service.bucket(0).instances().len(), 2); } @@ -177,10 +189,45 @@ mod tests { fn bucket_meta_store_is_isolated() { let cfg = test_config(); let mut service = BucketedService::new(&cfg, &cfg.model); - let _ = service.route_and_admit(&req(1, 24, &[10, 11]), 0.0); - let long_stats = service.route_and_admit(&req(2, 64, &[10, 11, 12, 13]), 1.0); + let _ = service + .route_and_admit(&req(1, 24, &[10, 11]), 0.0) + .unwrap(); + let long_stats = service + .route_and_admit(&req(2, 64, &[10, 11, 12, 13]), 1.0) + .unwrap(); assert_eq!(long_stats.bucket, 1); assert_eq!(long_stats.remote_hit_blocks, 0); assert_eq!(long_stats.l1_hit_blocks, 0); } + + #[test] + fn unmatched_input_length_returns_recoverable_error() { + let mut cfg = test_config(); + cfg.cluster.buckets[1].input_length_min = 40; + let mut service = BucketedService::new(&cfg, &cfg.model); + + let err = service + .route_and_admit(&req(3, 36, &[20, 21, 22]), 0.0) + .unwrap_err(); + + assert!(err.to_string().contains("no bucket")); + assert!(err.to_string().contains("input_length=36")); + } + + #[test] + fn bucket_score_placeholder_reports_strict_fallback() { + let mut cfg = test_config(); + cfg.cluster.global_router.mode = GlobalRouterMode::BucketScore; + let mut service = BucketedService::new(&cfg, &cfg.model); + + let stats = service + .route_and_admit(&req(4, 24, &[30, 31]), 0.0) + .unwrap(); + + assert_eq!(stats.decision.global_mode, "strict_input_length"); + assert!(stats + .decision + .global_reason + .contains("bucket_score is not implemented")); + } } diff --git a/src/router/global_bucket.rs b/src/router/global_bucket.rs index 0684e17..f2f047a 100644 --- a/src/router/global_bucket.rs +++ b/src/router/global_bucket.rs @@ -1,3 +1,4 @@ +use anyhow::{anyhow, Result}; use serde::Serialize; use crate::config::{Config, GlobalRouterMode}; @@ -54,7 +55,7 @@ pub trait GlobalRouter: Send { req: &RequestRecord, buckets: &[BucketView], now: f64, - ) -> GlobalRouteDecision; + ) -> Result; } struct StrictInputLengthRouter { @@ -81,7 +82,7 @@ impl GlobalRouter for StrictInputLengthRouter { req: &RequestRecord, buckets: &[BucketView], _now: f64, - ) -> GlobalRouteDecision { + ) -> Result { let candidates = buckets .iter() .map(|view| BucketCandidate { @@ -102,20 +103,31 @@ impl GlobalRouter for StrictInputLengthRouter { .map(|candidate| candidate.bucket) .collect::>(); - assert_eq!( - matches.len(), - 1, - "global bucket routing requires exactly one matching bucket for input_len={}", - req.input_len - ); + let chosen_bucket = match matches.as_slice() { + [bucket] => *bucket, + [] => { + return Err(anyhow!( + "cluster.global_router.mode={} has no bucket for input_length={}", + self.reported_mode, + req.input_len + )); + } + _ => { + return Err(anyhow!( + "cluster.global_router.mode={} matched multiple buckets for input_length={}", + self.reported_mode, + req.input_len + )); + } + }; - GlobalRouteDecision { + Ok(GlobalRouteDecision { req_id: req.req_id, mode: self.reported_mode, - chosen_bucket: matches[0], + chosen_bucket, candidates, reason: self.reason, - } + }) } } @@ -126,8 +138,8 @@ pub fn build(full: &Config) -> Box { "unique bucket range contains input_length", )) as Box, GlobalRouterMode::BucketScore => Box::new(StrictInputLengthRouter::new( - "bucket_score", - "bucket_score placeholder falls back to strict_input_length", + "strict_input_length", + "bucket_score is not implemented in Task 2; falling back to strict_input_length", )) as Box, } } diff --git a/src/router/mod.rs b/src/router/mod.rs index e2e7676..ef68dd5 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -39,17 +39,19 @@ pub struct RouteDecision { pub req_id: u64, pub global_mode: &'static str, pub mode: &'static str, + pub global_reason: &'static str, + pub local_reason: &'static str, pub chosen_bucket: BucketId, pub chosen: InstanceId, pub probe_overhead_s: f64, pub bucket_candidates: Vec, pub candidates: Vec, - pub reason: &'static str, } impl RouteDecision { pub fn with_global(mut self, decision: &GlobalRouteDecision) -> Self { self.global_mode = decision.mode; + self.global_reason = decision.reason; self.chosen_bucket = decision.chosen_bucket; self.bucket_candidates = decision.candidates.clone(); self @@ -90,12 +92,13 @@ pub fn local_route_decision( req_id, global_mode: "single_pool", mode, + global_reason: "single pool uses bucket 0", + local_reason: reason, chosen_bucket: 0, chosen, probe_overhead_s, bucket_candidates: Vec::new(), candidates, - reason, } } @@ -393,7 +396,7 @@ mod tests { let mut router = PrefixAffinityRouter::new(&cfg); let decision = router.route(&req, &instances, &meta, 0.0); - assert_eq!(decision.reason, "affinity fallback: min(drain+fetch)"); + assert_eq!(decision.local_reason, "affinity fallback: min(drain+fetch)"); assert_eq!(decision.chosen, 1); }