fix: harden bucket routing review follow-up
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
use anyhow::Result;
|
||||
|
||||
use super::cluster::{AdmissionStats, Cluster};
|
||||
use crate::config::{BucketConfig, Config, ModelConfig};
|
||||
use crate::instance::Instance;
|
||||
@@ -46,17 +48,17 @@ impl BucketedService {
|
||||
&self.buckets[bucket_id as usize]
|
||||
}
|
||||
|
||||
pub fn route_and_admit(&mut self, req: &RequestRecord, now: f64) -> AdmissionStats {
|
||||
pub fn route_and_admit(&mut self, req: &RequestRecord, now: f64) -> Result<AdmissionStats> {
|
||||
let bucket_views = self
|
||||
.buckets
|
||||
.iter()
|
||||
.map(|bucket| bucket.cluster.bucket_view(bucket.id, &bucket.cfg))
|
||||
.collect::<Vec<_>>();
|
||||
let global = self.global_router.route(req, &bucket_views, now);
|
||||
let global = self.global_router.route(req, &bucket_views, now)?;
|
||||
let bucket = &mut self.buckets[global.chosen_bucket as usize];
|
||||
bucket
|
||||
Ok(bucket
|
||||
.cluster
|
||||
.route_and_admit_with_global(req, now, &global)
|
||||
.route_and_admit_with_global(req, now, &global))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -167,9 +169,19 @@ mod tests {
|
||||
fn strict_input_length_routes_into_matching_bucket() {
|
||||
let cfg = test_config();
|
||||
let mut service = BucketedService::new(&cfg, &cfg.model);
|
||||
let stats = service.route_and_admit(&req(1, 24, &[10, 11]), 0.0);
|
||||
let stats = service
|
||||
.route_and_admit(&req(1, 24, &[10, 11]), 0.0)
|
||||
.unwrap();
|
||||
assert_eq!(stats.bucket, 0);
|
||||
assert_eq!(stats.decision.chosen_bucket, 0);
|
||||
assert_eq!(
|
||||
stats.decision.global_reason,
|
||||
"unique bucket range contains input_length"
|
||||
);
|
||||
assert_eq!(
|
||||
stats.decision.local_reason,
|
||||
"argmin(kv_used + alpha * queue_len)"
|
||||
);
|
||||
assert_eq!(service.bucket(0).instances().len(), 2);
|
||||
}
|
||||
|
||||
@@ -177,10 +189,45 @@ mod tests {
|
||||
fn bucket_meta_store_is_isolated() {
|
||||
let cfg = test_config();
|
||||
let mut service = BucketedService::new(&cfg, &cfg.model);
|
||||
let _ = service.route_and_admit(&req(1, 24, &[10, 11]), 0.0);
|
||||
let long_stats = service.route_and_admit(&req(2, 64, &[10, 11, 12, 13]), 1.0);
|
||||
let _ = service
|
||||
.route_and_admit(&req(1, 24, &[10, 11]), 0.0)
|
||||
.unwrap();
|
||||
let long_stats = service
|
||||
.route_and_admit(&req(2, 64, &[10, 11, 12, 13]), 1.0)
|
||||
.unwrap();
|
||||
assert_eq!(long_stats.bucket, 1);
|
||||
assert_eq!(long_stats.remote_hit_blocks, 0);
|
||||
assert_eq!(long_stats.l1_hit_blocks, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unmatched_input_length_returns_recoverable_error() {
|
||||
let mut cfg = test_config();
|
||||
cfg.cluster.buckets[1].input_length_min = 40;
|
||||
let mut service = BucketedService::new(&cfg, &cfg.model);
|
||||
|
||||
let err = service
|
||||
.route_and_admit(&req(3, 36, &[20, 21, 22]), 0.0)
|
||||
.unwrap_err();
|
||||
|
||||
assert!(err.to_string().contains("no bucket"));
|
||||
assert!(err.to_string().contains("input_length=36"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bucket_score_placeholder_reports_strict_fallback() {
|
||||
let mut cfg = test_config();
|
||||
cfg.cluster.global_router.mode = GlobalRouterMode::BucketScore;
|
||||
let mut service = BucketedService::new(&cfg, &cfg.model);
|
||||
|
||||
let stats = service
|
||||
.route_and_admit(&req(4, 24, &[30, 31]), 0.0)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(stats.decision.global_mode, "strict_input_length");
|
||||
assert!(stats
|
||||
.decision
|
||||
.global_reason
|
||||
.contains("bucket_score is not implemented"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::config::{Config, GlobalRouterMode};
|
||||
@@ -54,7 +55,7 @@ pub trait GlobalRouter: Send {
|
||||
req: &RequestRecord,
|
||||
buckets: &[BucketView],
|
||||
now: f64,
|
||||
) -> GlobalRouteDecision;
|
||||
) -> Result<GlobalRouteDecision>;
|
||||
}
|
||||
|
||||
struct StrictInputLengthRouter {
|
||||
@@ -81,7 +82,7 @@ impl GlobalRouter for StrictInputLengthRouter {
|
||||
req: &RequestRecord,
|
||||
buckets: &[BucketView],
|
||||
_now: f64,
|
||||
) -> GlobalRouteDecision {
|
||||
) -> Result<GlobalRouteDecision> {
|
||||
let candidates = buckets
|
||||
.iter()
|
||||
.map(|view| BucketCandidate {
|
||||
@@ -102,20 +103,31 @@ impl GlobalRouter for StrictInputLengthRouter {
|
||||
.map(|candidate| candidate.bucket)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(
|
||||
matches.len(),
|
||||
1,
|
||||
"global bucket routing requires exactly one matching bucket for input_len={}",
|
||||
req.input_len
|
||||
);
|
||||
let chosen_bucket = match matches.as_slice() {
|
||||
[bucket] => *bucket,
|
||||
[] => {
|
||||
return Err(anyhow!(
|
||||
"cluster.global_router.mode={} has no bucket for input_length={}",
|
||||
self.reported_mode,
|
||||
req.input_len
|
||||
));
|
||||
}
|
||||
_ => {
|
||||
return Err(anyhow!(
|
||||
"cluster.global_router.mode={} matched multiple buckets for input_length={}",
|
||||
self.reported_mode,
|
||||
req.input_len
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
GlobalRouteDecision {
|
||||
Ok(GlobalRouteDecision {
|
||||
req_id: req.req_id,
|
||||
mode: self.reported_mode,
|
||||
chosen_bucket: matches[0],
|
||||
chosen_bucket,
|
||||
candidates,
|
||||
reason: self.reason,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -126,8 +138,8 @@ pub fn build(full: &Config) -> Box<dyn GlobalRouter> {
|
||||
"unique bucket range contains input_length",
|
||||
)) as Box<dyn GlobalRouter>,
|
||||
GlobalRouterMode::BucketScore => Box::new(StrictInputLengthRouter::new(
|
||||
"bucket_score",
|
||||
"bucket_score placeholder falls back to strict_input_length",
|
||||
"strict_input_length",
|
||||
"bucket_score is not implemented in Task 2; falling back to strict_input_length",
|
||||
)) as Box<dyn GlobalRouter>,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,17 +39,19 @@ pub struct RouteDecision {
|
||||
pub req_id: u64,
|
||||
pub global_mode: &'static str,
|
||||
pub mode: &'static str,
|
||||
pub global_reason: &'static str,
|
||||
pub local_reason: &'static str,
|
||||
pub chosen_bucket: BucketId,
|
||||
pub chosen: InstanceId,
|
||||
pub probe_overhead_s: f64,
|
||||
pub bucket_candidates: Vec<BucketCandidate>,
|
||||
pub candidates: Vec<CandidateInfo>,
|
||||
pub reason: &'static str,
|
||||
}
|
||||
|
||||
impl RouteDecision {
|
||||
pub fn with_global(mut self, decision: &GlobalRouteDecision) -> Self {
|
||||
self.global_mode = decision.mode;
|
||||
self.global_reason = decision.reason;
|
||||
self.chosen_bucket = decision.chosen_bucket;
|
||||
self.bucket_candidates = decision.candidates.clone();
|
||||
self
|
||||
@@ -90,12 +92,13 @@ pub fn local_route_decision(
|
||||
req_id,
|
||||
global_mode: "single_pool",
|
||||
mode,
|
||||
global_reason: "single pool uses bucket 0",
|
||||
local_reason: reason,
|
||||
chosen_bucket: 0,
|
||||
chosen,
|
||||
probe_overhead_s,
|
||||
bucket_candidates: Vec::new(),
|
||||
candidates,
|
||||
reason,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -393,7 +396,7 @@ mod tests {
|
||||
let mut router = PrefixAffinityRouter::new(&cfg);
|
||||
let decision = router.route(&req, &instances, &meta, 0.0);
|
||||
|
||||
assert_eq!(decision.reason, "affinity fallback: min(drain+fetch)");
|
||||
assert_eq!(decision.local_reason, "affinity fallback: min(drain+fetch)");
|
||||
assert_eq!(decision.chosen, 1);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user