fix: complete global router config and recoverable cluster init

This commit is contained in:
2026-04-17 14:50:47 +08:00
parent 008fe2fe5d
commit 96019082cc
3 changed files with 80 additions and 22 deletions

View File

@@ -1,6 +1,8 @@
//! Cluster: routes arrivals, performs the L0 / L1 / remote-RDMA fetch chain //! Cluster: routes arrivals, performs the L0 / L1 / remote-RDMA fetch chain
//! described in the design diagram, and bookkeeps the global meta store. //! described in the design diagram, and bookkeeps the global meta store.
use anyhow::Result;
use crate::cluster::meta_store::MetaStore; use crate::cluster::meta_store::MetaStore;
use crate::config::{Config, ModelConfig}; use crate::config::{Config, ModelConfig};
use crate::instance::instance::AdmittedRequest; use crate::instance::instance::AdmittedRequest;
@@ -36,11 +38,8 @@ pub struct Cluster {
} }
impl Cluster { impl Cluster {
pub fn new(config: &Config, model: &ModelConfig) -> Self { pub fn new(config: &Config, model: &ModelConfig) -> Result<Self> {
let total_instances = config let total_instances = config.cluster.require_legacy_single_pool("Cluster::new")?;
.cluster
.require_legacy_single_pool("Cluster::new")
.unwrap_or_else(|err| panic!("{err}"));
let mut instances = Vec::with_capacity(total_instances as usize); let mut instances = Vec::with_capacity(total_instances as usize);
for id in 0..total_instances { for id in 0..total_instances {
instances.push(Instance::new( instances.push(Instance::new(
@@ -52,7 +51,7 @@ impl Cluster {
} }
let meta_store = MetaStore::new(config.cluster.meta_store.ttl_seconds); let meta_store = MetaStore::new(config.cluster.meta_store.ttl_seconds);
let router = router::build(config, config.sim.seed); let router = router::build(config, config.sim.seed);
Self { Ok(Self {
instances, instances,
meta_store, meta_store,
router, router,
@@ -63,7 +62,7 @@ impl Cluster {
&config.calibration, &config.calibration,
model.kv_block_bytes(), model.kv_block_bytes(),
), ),
} })
} }
/// Route + admit a request. Returns the chosen instance plus rich /// Route + admit a request. Returns the chosen instance plus rich
@@ -262,7 +261,7 @@ mod tests {
#[test] #[test]
fn l1_ready_at_includes_dram_and_transform_overhead() { fn l1_ready_at_includes_dram_and_transform_overhead() {
let cfg = test_config(RouterMode::EstimatedTtft); let cfg = test_config(RouterMode::EstimatedTtft);
let mut cluster = Cluster::new(&cfg, &cfg.model); let mut cluster = Cluster::new(&cfg, &cfg.model).unwrap();
let req = RequestRecord { let req = RequestRecord {
req_id: 1, req_id: 1,
chat_id: 0, chat_id: 0,
@@ -300,17 +299,10 @@ mod tests {
num_instances: 2, num_instances: 2,
}]; }];
let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { let result = Cluster::new(&cfg, &cfg.model);
Cluster::new(&cfg, &cfg.model); assert!(result.is_err(), "bucketed Cluster::new should fail");
})) let err = result.err().unwrap();
.expect_err("bucketed Cluster::new should panic"); assert!(err.to_string().contains("Cluster::new"));
assert!(err.to_string().contains("cluster.buckets"));
let msg = panic
.downcast_ref::<String>()
.cloned()
.or_else(|| panic.downcast_ref::<&str>().map(|s| (*s).to_string()))
.expect("panic payload should be a string");
assert!(msg.contains("Cluster::new"));
assert!(msg.contains("cluster.buckets"));
} }
} }

View File

@@ -397,16 +397,25 @@ pub struct BucketConfig {
pub num_instances: u32, pub num_instances: u32,
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GlobalRouterConfig { pub struct GlobalRouterConfig {
#[serde(default)] #[serde(default)]
pub mode: GlobalRouterMode, pub mode: GlobalRouterMode,
#[serde(default = "default_global_router_length_penalty_weight")]
pub length_penalty_weight: f64,
#[serde(default = "default_global_router_load_weight")]
pub load_weight: f64,
#[serde(default = "default_global_router_cache_weight")]
pub cache_weight: f64,
} }
impl Default for GlobalRouterConfig { impl Default for GlobalRouterConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
mode: GlobalRouterMode::StrictInputLength, mode: GlobalRouterMode::StrictInputLength,
length_penalty_weight: default_global_router_length_penalty_weight(),
load_weight: default_global_router_load_weight(),
cache_weight: default_global_router_cache_weight(),
} }
} }
} }
@@ -428,6 +437,18 @@ impl GlobalRouterMode {
} }
} }
/// Fallback value for `GlobalRouterConfig::length_penalty_weight`.
///
/// Named by that field's `#[serde(default = "...")]` attribute and also
/// called from `GlobalRouterConfig::default()`, so both paths yield `1.0`.
fn default_global_router_length_penalty_weight() -> f64 {
1.0
}
/// Fallback value for `GlobalRouterConfig::load_weight`.
///
/// Named by that field's `#[serde(default = "...")]` attribute and also
/// called from `GlobalRouterConfig::default()`, so both paths yield `1.0`.
fn default_global_router_load_weight() -> f64 {
1.0
}
/// Fallback value for `GlobalRouterConfig::cache_weight`.
///
/// Named by that field's `#[serde(default = "...")]` attribute and also
/// called from `GlobalRouterConfig::default()`, so both paths yield `1.0`.
fn default_global_router_cache_weight() -> f64 {
1.0
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetaStoreConfig { pub struct MetaStoreConfig {
pub ttl_seconds: f64, pub ttl_seconds: f64,
@@ -1228,6 +1249,51 @@ sim:
assert_eq!(cfg.cluster.total_instances(), 3); assert_eq!(cfg.cluster.total_instances(), 3);
} }
// Verifies that explicit `cluster.global_router` settings in a YAML config are
// read back as written: the `bucket_score` mode plus the three weights
// (`length_penalty_weight`, `load_weight`, `cache_weight`), each set to a
// value other than the 1.0 fallback so a silently applied default would fail.
// NOTE(review): the YAML literal shows no indentation in this view; confirm
// the nesting in the real file before relying on it.
#[test]
fn bucketed_config_deserializes_global_router_weights() {
let path = write_temp_config(
r#"
model:
name: test
num_layers: 4
num_kv_heads: 2
head_dim: 64
dtype_bytes: 2
block_size_tokens: 16
flops_per_token_prefill: 1.0e9
attn_quadratic_coeff: 64.0
hardware:
gpu_flops: 1.0e14
gpu_mem_bw: 1.0e12
hbm_bytes: 1.0e9
cluster:
meta_store:
ttl_seconds: 10.0
router:
mode: cache_affinity
global_router:
mode: bucket_score
length_penalty_weight: 1.5
load_weight: 0.75
cache_weight: 2.25
buckets:
- name: short
input_length_min: 0
input_length_max: 32
num_instances: 2
sim:
trace_path: trace.jsonl
output_dir: runs/test
"#,
);
// Loading must succeed; a parse or validation error fails the test here.
let cfg = Config::from_yaml_path(&path).unwrap();
assert_eq!(cfg.cluster.global_router.mode, GlobalRouterMode::BucketScore);
// Weights are f64, so compare with a tight tolerance instead of `==`.
assert!((cfg.cluster.global_router.length_penalty_weight - 1.5).abs() < 1e-12);
assert!((cfg.cluster.global_router.load_weight - 0.75).abs() < 1e-12);
assert!((cfg.cluster.global_router.cache_weight - 2.25).abs() < 1e-12);
}
#[test] #[test]
fn bucketed_config_rejects_overlapping_ranges_and_mixed_modes() { fn bucketed_config_rejects_overlapping_ranges_and_mixed_modes() {
let overlap = write_temp_config( let overlap = write_temp_config(

View File

@@ -52,7 +52,7 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
config config
.cluster .cluster
.require_legacy_single_pool("driver run")?; .require_legacy_single_pool("driver run")?;
let mut cluster = Cluster::new(config, &config.model); let mut cluster = Cluster::new(config, &config.model)?;
let mut q = EventQueue::new(); let mut q = EventQueue::new();
// Output directory // Output directory