feat: new router and benchmark setup

This commit is contained in:
2026-04-16 14:23:53 +08:00
parent c86d931d8f
commit 996511f300
35 changed files with 1480 additions and 76 deletions

View File

@@ -227,6 +227,7 @@ coefficients (`flops_per_token_prefill`, `attn_quadratic_coeff`, etc.).
| Model | Path | Architecture | | Model | Path | Architecture |
|-------|------|--------------| |-------|------|--------------|
| GLM-5 (744B/40B-active) | `models/GLM-5/config.json` | MoE (256 routed, 8 active, 1 shared) + MLA + DSA | | GLM-5 (744B/40B-active) | `models/GLM-5/config.json` | MoE (256 routed, 8 active, 1 shared) + MLA + DSA |
| GLM-5-FP8 | `models/GLM-5-FP8/config.json` | GLM-5 architecture + upstream FP8 quantization metadata |
| Qwen3-Coder-480B-A35B FP8 | `models/Qwen3-Coder-480B-A35B-Instruct-FP8/config.json` | MoE (160 experts, 8 active) + GQA | | Qwen3-Coder-480B-A35B FP8 | `models/Qwen3-Coder-480B-A35B-Instruct-FP8/config.json` | MoE (160 experts, 8 active) + GQA |
## Hardware configuration ## Hardware configuration
@@ -248,6 +249,7 @@ Available presets:
| `h100` | 989 TFLOPS | 80 GB | 3.35 TB/s | Gen5 | | `h100` | 989 TFLOPS | 80 GB | 3.35 TB/s | Gen5 |
| `h800` | 989 TFLOPS | 80 GB | 3.35 TB/s | Gen5 | | `h800` | 989 TFLOPS | 80 GB | 3.35 TB/s | Gen5 |
| `h20` | 148 TFLOPS | 96 GB | 4.0 TB/s | Gen5 | | `h20` | 148 TFLOPS | 96 GB | 4.0 TB/s | Gen5 |
| `h20-141g` | 148 TFLOPS | 141 GB | 4.8 TB/s | Gen5 |
| `a100-80gb` | 312 TFLOPS | 80 GB | 2.0 TB/s | Gen4 | | `a100-80gb` | 312 TFLOPS | 80 GB | 2.0 TB/s | Gen4 |
| `a100-40gb` | 312 TFLOPS | 40 GB | 1.555 TB/s | Gen4 | | `a100-40gb` | 312 TFLOPS | 40 GB | 1.555 TB/s | Gen4 |
| `b200` | 2.25 PFLOPS| 192 GB | 8.0 TB/s | Gen6 | | `b200` | 2.25 PFLOPS| 192 GB | 8.0 TB/s | Gen6 |
@@ -297,6 +299,7 @@ memory_time = layers * weight_bytes_per_layer / gpu_mem_bw
| Config | Model | Hardware | Instances | Trace | | Config | Model | Hardware | Instances | Trace |
|--------|-------|----------|-----------|-------| |--------|-------|----------|-----------|-------|
| `glm5-8xb200-hf.yaml` | GLM-5 via HF config.json | 8xB200 preset | 32 | GLM coder blk512 | | `glm5-8xb200-hf.yaml` | GLM-5 via HF config.json | 8xB200 preset | 32 | GLM coder blk512 |
| `glm5-fp8-8xh20-141g.yaml` | GLM-5-FP8 via ModelScope config.json | 8xH20-141G preset | 64 | GLM coder blk512 |
| `glm5-nvfp4-8xb300.yaml` | GLM-5-NVFP4 via HF config.json | 8xB300 preset | 8 | GLM coder blk512 | | `glm5-nvfp4-8xb300.yaml` | GLM-5-NVFP4 via HF config.json | 8xB300 preset | 8 | GLM coder blk512 |
| `qwen3-coder-480b-8xh20.yaml` | Qwen3-Coder via HF | 8xH20 preset | 32 | Qwen coder blk16 | | `qwen3-coder-480b-8xh20.yaml` | Qwen3-Coder via HF | 8xH20 preset | 32 | Qwen coder blk16 |

View File

@@ -0,0 +1,35 @@
# GLM-5-FP8 on 8 x H20-141G for the 0-32k bucket.
# Chosen to keep the best policy's mean TTFT below 5s.
model:
config_json: ../models/GLM-5-FP8/config.json
name: glm-5-fp8
compute_dtype: fp8
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xh20-141g
hbm_bytes: 300.0e9
dram_bytes: 1.5e12
max_batch_slots: 256
cluster:
num_instances: 56
meta_store:
ttl_seconds: 300.0
router:
mode: cache_affinity
precise_probe_latency_us: 50.0
precise_probe_topk: 4
load_alpha: 1.0
prefix_k: 8
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_fp8_8xh20_141g_ablation_0_32768_n56
sample_interval_s: 1.0
seed: 42
input_length_min: 0
input_length_max: 32768

View File

@@ -0,0 +1,38 @@
# GLM-5-FP8 (ZhipuAI/GLM-5-FP8) on 8 x H20-141G (N3E).
# Tuned for the 0-32768 input-length slice of
# bailian-traces/glm_coder_blksz_512_040915-040917.jsonl.
model:
config_json: ../models/GLM-5-FP8/config.json
name: glm-5-fp8
compute_dtype: fp8
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xh20-141g
hbm_bytes: 300.0e9
dram_bytes: 1.5e12
max_batch_slots: 256
cluster:
num_instances: 64
meta_store:
ttl_seconds: 300.0
router:
mode: cache_affinity
precise_probe_latency_us: 50.0
precise_probe_topk: 4
# Tuned on this filtered GLM coder workload: stronger queue penalty than
# the default 1.0 keeps cache_affinity's locality gains while reducing TTFT.
load_alpha: 1.5
prefix_k: 8
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_fp8_8xh20_141g_ca_tuned
sample_interval_s: 1.0
seed: 42
input_length_min: 0
input_length_max: 32768

View File

@@ -0,0 +1,35 @@
# GLM-5-FP8 on 8 x H20-141G, 0-32768 slice.
# Analysis config: medium L1 (~10% of the default DRAM KV budget).
model:
config_json: ../models/GLM-5-FP8/config.json
name: glm-5-fp8
compute_dtype: fp8
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xh20-141g
hbm_bytes: 300.0e9
dram_bytes: 1.5e11
max_batch_slots: 256
cluster:
num_instances: 64
meta_store:
ttl_seconds: 300.0
router:
mode: min_pd
precise_probe_latency_us: 50.0
precise_probe_topk: 4
load_alpha: 1.0
prefix_k: 8
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_fp8_8xh20_141g_l1_medium
sample_interval_s: 1.0
seed: 42
input_length_min: 0
input_length_max: 32768

View File

@@ -0,0 +1,35 @@
# GLM-5-FP8 on 8 x H20-141G, 0-32768 slice.
# Analysis config: effectively disable L1/remote KV by shrinking DRAM below a single KV block (1 byte).
model:
config_json: ../models/GLM-5-FP8/config.json
name: glm-5-fp8
compute_dtype: fp8
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xh20-141g
hbm_bytes: 300.0e9
dram_bytes: 1.0
max_batch_slots: 256
cluster:
num_instances: 64
meta_store:
ttl_seconds: 300.0
router:
mode: min_pd
precise_probe_latency_us: 50.0
precise_probe_topk: 4
load_alpha: 1.0
prefix_k: 8
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_fp8_8xh20_141g_l1_none
sample_interval_s: 1.0
seed: 42
input_length_min: 0
input_length_max: 32768

View File

@@ -0,0 +1,35 @@
# GLM-5-FP8 on 8 x H20-141G, 0-32768 slice.
# Analysis config: small L1 (~1% of the default DRAM KV budget).
model:
config_json: ../models/GLM-5-FP8/config.json
name: glm-5-fp8
compute_dtype: fp8
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xh20-141g
hbm_bytes: 300.0e9
dram_bytes: 1.5e10
max_batch_slots: 256
cluster:
num_instances: 64
meta_store:
ttl_seconds: 300.0
router:
mode: min_pd
precise_probe_latency_us: 50.0
precise_probe_topk: 4
load_alpha: 1.0
prefix_k: 8
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_fp8_8xh20_141g_l1_small
sample_interval_s: 1.0
seed: 42
input_length_min: 0
input_length_max: 32768

View File

@@ -0,0 +1,39 @@
# GLM-5-FP8 (ZhipuAI/GLM-5-FP8) on 8 x H20-141G (N3E).
# Architecture auto-loaded from the upstream ModelScope config.json.
#
# 8 x 141 GB = 1128 GB total HBM. With ~744 GB FP8 weights resident,
# keep the KV budget conservative to leave room for scales, BF16 holdouts,
# allocator slack, and runtime activations.
model:
config_json: ../models/GLM-5-FP8/config.json
name: glm-5-fp8
compute_dtype: fp8
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xh20-141g
hbm_bytes: 300.0e9
dram_bytes: 1.5e12
max_batch_slots: 256
cluster:
num_instances: 64
meta_store:
ttl_seconds: 300.0
router:
mode: min_pd
precise_probe_latency_us: 50.0
precise_probe_topk: 4
load_alpha: 1.0
prefix_k: 8
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_fp8_8xh20_141g
sample_interval_s: 1.0
seed: 42
input_length_min: 0
input_length_max: 32768

View File

@@ -0,0 +1,36 @@
# GLM-5-NVFP4 on 8 x B200 for the 32k-85k bucket.
# Chosen to keep the best policy's mean TTFT below 5s.
model:
config_json: ../models/GLM-5-NVFP4/config.json
name: glm-5-nvfp4
compute_dtype: fp8
weight_dtype: fp4
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xb200
hbm_bytes: 1150.0e9
dram_bytes: 1.5e12
max_batch_slots: 256
cluster:
num_instances: 5
meta_store:
ttl_seconds: 300.0
router:
mode: cache_affinity
precise_probe_latency_us: 50.0
precise_probe_topk: 4
load_alpha: 1.0
prefix_k: 8
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_nvfp4_8xb200_ablation_32769_87040_n5
sample_interval_s: 1.0
seed: 42
input_length_min: 32769
input_length_max: 87040

View File

@@ -0,0 +1,36 @@
# GLM-5-NVFP4 (nvidia/GLM-5-NVFP4) on 8 x B200 (192GB each).
# Architecture auto-loaded from HuggingFace config.json.
#
# FP4 weights: ~744B params * 0.5 bytes = ~372 GB across 8 GPUs.
# Total HBM: 8 * 192 GB = 1536 GB. Keep the KV budget below the raw
# remainder to leave room for runtime activations and allocator slack.
model:
config_json: ../models/GLM-5-NVFP4/config.json
name: glm-5-nvfp4
compute_dtype: fp8
weight_dtype: fp4
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xb200
hbm_bytes: 1150.0e9
dram_bytes: 1.5e12
max_batch_slots: 256
cluster:
num_instances: 8
meta_store:
ttl_seconds: 300.0
router:
mode: prefix_affinity
prefix_k: 8
load_alpha: 1.0
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_nvfp4_8xb200
sample_interval_s: 1.0
seed: 42

View File

@@ -0,0 +1,37 @@
# GLM-5-NVFP4 on 8 x B300 for the 128k++ bucket.
# A single instance already keeps mean TTFT below 5s, and routing is
# effectively irrelevant at N=1 because every request lands on the same node.
model:
config_json: ../models/GLM-5-NVFP4/config.json
name: glm-5-nvfp4
compute_dtype: fp8
weight_dtype: fp4
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xb300
hbm_bytes: 1900.0e9
dram_bytes: 1.5e12
max_batch_slots: 256
cluster:
num_instances: 1
meta_store:
ttl_seconds: 300.0
router:
mode: cache_affinity
precise_probe_latency_us: 50.0
precise_probe_topk: 1
load_alpha: 1.0
prefix_k: 8
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_nvfp4_8xb300_ablation_131073_plus_n1
sample_interval_s: 1.0
seed: 42
input_length_min: 131073
input_length_max: 4294967295

View File

@@ -0,0 +1,36 @@
# GLM-5-NVFP4 on 8 x B300 for the 85k-128k bucket.
# Chosen to keep the best policy's mean TTFT below 5s.
model:
config_json: ../models/GLM-5-NVFP4/config.json
name: glm-5-nvfp4
compute_dtype: fp8
weight_dtype: fp4
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xb300
hbm_bytes: 1900.0e9
dram_bytes: 1.5e12
max_batch_slots: 256
cluster:
num_instances: 2
meta_store:
ttl_seconds: 300.0
router:
mode: cache_affinity_strong_only
precise_probe_latency_us: 50.0
precise_probe_topk: 4
load_alpha: 1.0
prefix_k: 8
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_nvfp4_8xb300_ablation_87041_131072_n2
sample_interval_s: 1.0
seed: 42
input_length_min: 87041
input_length_max: 131072

View File

@@ -7,7 +7,8 @@
model: model:
config_json: ../models/GLM-5-NVFP4/config.json config_json: ../models/GLM-5-NVFP4/config.json
name: glm-5-nvfp4 name: glm-5-nvfp4
compute_dtype: fp4 # FP4 weights → selects FP4 tensor core FLOPS compute_dtype: fp8 # FP8 tensor-core execution
weight_dtype: fp4 # NVFP4 weights still set the HBM budget
dtype_bytes: 1 # FP8 KV cache dtype_bytes: 1 # FP8 KV cache
block_size_tokens: 512 block_size_tokens: 512

View File

@@ -0,0 +1,32 @@
# GLM-5-NVFP4 on 8 x B200 for the 32k-85k bucket.
# NVFP4 weights, FP8 compute. Chosen to keep the best policy below 5 s TTFT.
model:
config_json: ../models/GLM-5-NVFP4/config.json
name: glm-5-nvfp4
compute_dtype: fp8
weight_dtype: fp4
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xb200
hbm_bytes: 1150.0e9
dram_bytes: 1.5e12
max_batch_slots: 256
cluster:
num_instances: 9
meta_store:
ttl_seconds: 300.0
router:
mode: min_pd
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_nvfp4_fp8compute_8xb200_ablation_32769_87040_n9
sample_interval_s: 1.0
seed: 42
input_length_min: 32769
input_length_max: 87040

View File

@@ -0,0 +1,32 @@
# GLM-5-NVFP4 on 8 x B200 with FP8 tensor-core compute.
# Weights remain stored in NVFP4, so HBM budget follows FP4 storage.
model:
config_json: ../models/GLM-5-NVFP4/config.json
name: glm-5-nvfp4
compute_dtype: fp8
weight_dtype: fp4
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xb200
hbm_bytes: 1150.0e9
dram_bytes: 1.5e12
max_batch_slots: 256
cluster:
num_instances: 8
meta_store:
ttl_seconds: 300.0
router:
mode: prefix_affinity
prefix_k: 8
load_alpha: 1.0
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_nvfp4_fp8compute_8xb200
sample_interval_s: 1.0
seed: 42

View File

@@ -0,0 +1,33 @@
# GLM-5-NVFP4 on 8 x B300 for the 128k++ bucket.
# NVFP4 weights, FP8 compute. Routing is effectively irrelevant at one instance.
model:
config_json: ../models/GLM-5-NVFP4/config.json
name: glm-5-nvfp4
compute_dtype: fp8
weight_dtype: fp4
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xb300
hbm_bytes: 1900.0e9
dram_bytes: 1.5e12
max_batch_slots: 256
cluster:
num_instances: 1
meta_store:
ttl_seconds: 300.0
router:
mode: cache_affinity
precise_probe_topk: 1
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_nvfp4_fp8compute_8xb300_ablation_131073_plus_n1
sample_interval_s: 1.0
seed: 42
input_length_min: 131073
input_length_max: 4294967295

View File

@@ -0,0 +1,32 @@
# GLM-5-NVFP4 on 8 x B300 for the 85k-128k bucket.
# NVFP4 weights, FP8 compute. Chosen to keep the best policy below 5 s TTFT.
model:
config_json: ../models/GLM-5-NVFP4/config.json
name: glm-5-nvfp4
compute_dtype: fp8
weight_dtype: fp4
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xb300
hbm_bytes: 1900.0e9
dram_bytes: 1.5e12
max_batch_slots: 256
cluster:
num_instances: 4
meta_store:
ttl_seconds: 300.0
router:
mode: cache_affinity
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_nvfp4_fp8compute_8xb300_ablation_87041_131072_n4
sample_interval_s: 1.0
seed: 42
input_length_min: 87041
input_length_max: 131072

View File

@@ -0,0 +1,32 @@
# GLM-5-NVFP4 on 8 x B300 with FP8 tensor-core compute.
# Weights remain stored in NVFP4, so HBM budget follows FP4 storage.
model:
config_json: ../models/GLM-5-NVFP4/config.json
name: glm-5-nvfp4
compute_dtype: fp8
weight_dtype: fp4
dtype_bytes: 1
block_size_tokens: 512
hardware:
type: 8xb300
hbm_bytes: 1900.0e9
dram_bytes: 1.5e12
max_batch_slots: 256
cluster:
num_instances: 8
meta_store:
ttl_seconds: 300.0
router:
mode: prefix_affinity
prefix_k: 8
load_alpha: 1.0
sim:
trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl
max_requests: null
output_dir: runs/glm5_nvfp4_fp8compute_8xb300
sample_interval_s: 1.0
seed: 42

View File

@@ -0,0 +1,68 @@
{
"architectures": [
"GlmMoeDsaForCausalLM"
],
"attention_bias": false,
"attention_dropout": 0.0,
"dtype": "bfloat16",
"eos_token_id": [
154820,
154827,
154829
],
"ep_size": 1,
"first_k_dense_replace": 3,
"hidden_act": "silu",
"head_dim": 64,
"hidden_size": 6144,
"index_head_dim": 128,
"index_n_heads": 32,
"index_topk": 2048,
"indexer_rope_interleave": true,
"initializer_range": 0.02,
"intermediate_size": 12288,
"kv_lora_rank": 512,
"max_position_embeddings": 202752,
"moe_intermediate_size": 2048,
"moe_layer_freq": 1,
"model_type": "glm_moe_dsa",
"n_group": 1,
"n_routed_experts": 256,
"n_shared_experts": 1,
"norm_topk_prob": true,
"num_attention_heads": 64,
"num_experts_per_tok": 8,
"num_hidden_layers": 78,
"num_key_value_heads": 64,
"num_nextn_predict_layers": 1,
"pad_token_id": 154820,
"pretraining_tp": 1,
"q_lora_rank": 2048,
"qk_head_dim": 256,
"qk_nope_head_dim": 192,
"qk_rope_head_dim": 64,
"quantization_config": {
"activation_scheme": "dynamic",
"fmt": "e4m3",
"quant_method": "fp8",
"weight_block_size": [
128,
128
]
},
"rms_norm_eps": 1e-05,
"rope_interleave": true,
"rope_parameters": {
"rope_theta": 1000000,
"rope_type": "default"
},
"routed_scaling_factor": 2.5,
"scoring_func": "sigmoid",
"tie_word_embeddings": false,
"topk_group": 1,
"topk_method": "noaux_tc",
"transformers_version": "5.0.2.dev0",
"use_cache": true,
"v_head_dim": 256,
"vocab_size": 154880
}

View File

@@ -54,7 +54,11 @@ impl Cluster {
router, router,
block_size_tokens: model.block_size_tokens, block_size_tokens: model.block_size_tokens,
kv_block_bytes: model.kv_block_bytes(), kv_block_bytes: model.kv_block_bytes(),
ttft_model: TtftModel::new(&config.hardware, &config.calibration, model.kv_block_bytes()), ttft_model: TtftModel::new(
&config.hardware,
&config.calibration,
model.kv_block_bytes(),
),
} }
} }
@@ -256,6 +260,8 @@ mod tests {
let req = RequestRecord { let req = RequestRecord {
req_id: 1, req_id: 1,
chat_id: 0, chat_id: 0,
parent_chat_id: -1,
turn: 1,
arrival: 0.0, arrival: 0.0,
input_len: 32, input_len: 32,
output_len: 16, output_len: 16,
@@ -269,7 +275,10 @@ mod tests {
.insert_blocks(&req.hash_ids, &mut evicted); .insert_blocks(&req.hash_ids, &mut evicted);
let stats = cluster.route_and_admit(&req, 0.0); let stats = cluster.route_and_admit(&req, 0.0);
let pure_pcie = cluster.instances[0].links.pcie.cost(cluster.kv_block_bytes * 2); let pure_pcie = cluster.instances[0]
.links
.pcie
.cost(cluster.kv_block_bytes * 2);
assert!(stats.ready_at > pure_pcie); assert!(stats.ready_at > pure_pcie);
} }

View File

@@ -59,12 +59,17 @@ pub struct ModelConfig {
#[serde(default)] #[serde(default)]
pub attention: Option<AttentionConfig>, pub attention: Option<AttentionConfig>,
/// Compute / weight precision: `"bf16"` (default), `"fp8"`, or `"fp4"`. /// Compute precision: `"bf16"` (default), `"fp8"`, or `"fp4"`.
/// Controls which hardware FLOPS tier to use (`gpu_fp4_flops`, etc.) and /// Controls which hardware FLOPS tier to use (`gpu_fp4_flops`, etc.).
/// the weight-bytes-per-parameter for the memory-bound roofline check.
/// Independent of `dtype_bytes`, which sizes the KV cache. /// Independent of `dtype_bytes`, which sizes the KV cache.
#[serde(default)] #[serde(default)]
pub compute_dtype: Option<String>, pub compute_dtype: Option<String>,
/// Weight storage precision: `"bf16"`, `"fp8"`, or `"fp4"`.
/// When absent, falls back to `compute_dtype` for backward compatibility.
/// This lets the simulator represent deployments such as FP4/NVFP4
/// weights with FP8 tensor-core execution.
#[serde(default)]
pub weight_dtype: Option<String>,
// -- Legacy manual coefficients (used when hidden_size is absent) --------- // -- Legacy manual coefficients (used when hidden_size is absent) ---------
#[serde(default)] #[serde(default)]
@@ -88,20 +93,33 @@ impl ModelConfig {
self.hidden_size.is_some() self.hidden_size.is_some()
} }
/// Bytes per parameter for weight storage, derived from `compute_dtype`. fn precision_bytes(dtype: &str) -> Option<f64> {
/// match dtype {
/// - `"fp4"` → 0.5 "fp4" => Some(0.5),
/// - `"fp8"` → 1.0 "fp8" => Some(1.0),
/// - `"bf16"` / absent → `dtype_bytes` (backward-compatible) "bf16" => Some(2.0),
pub fn weight_dtype_bytes(&self) -> f64 { _ => None,
match self.compute_dtype.as_deref() {
Some("fp4") => 0.5,
Some("fp8") => 1.0,
Some("bf16") => 2.0,
_ => self.dtype_bytes as f64, // backward compat
} }
} }
/// Bytes per parameter for weight storage.
///
/// Priority:
/// - `weight_dtype`
/// - `compute_dtype` (backward compatibility)
/// - `dtype_bytes` fallback
pub fn weight_dtype_bytes(&self) -> f64 {
self.weight_dtype
.as_deref()
.and_then(Self::precision_bytes)
.or_else(|| {
self.compute_dtype
.as_deref()
.and_then(Self::precision_bytes)
})
.unwrap_or(self.dtype_bytes as f64)
}
/// Bytes of KV cache per block. /// Bytes of KV cache per block.
/// ///
/// For standard / GQA: `2 * L * kv_heads * head_dim * dtype * block_tokens` /// For standard / GQA: `2 * L * kv_heads * head_dim * dtype * block_tokens`
@@ -433,6 +451,8 @@ pub enum RouterMode {
CacheScoreTtl, CacheScoreTtl,
EstimatedTtft, EstimatedTtft,
PrefixAffinity, PrefixAffinity,
AdaptiveAffinity,
LineageAffinity,
} }
impl RouterMode { impl RouterMode {
@@ -454,6 +474,8 @@ impl RouterMode {
"cache_score_ttl" | "csttl" | "cs_ttl" => Ok(Self::CacheScoreTtl), "cache_score_ttl" | "csttl" | "cs_ttl" => Ok(Self::CacheScoreTtl),
"estimated_ttft" | "ettft" | "optimal" => Ok(Self::EstimatedTtft), "estimated_ttft" | "ettft" | "optimal" => Ok(Self::EstimatedTtft),
"prefix_affinity" | "affinity" | "pa" => Ok(Self::PrefixAffinity), "prefix_affinity" | "affinity" | "pa" => Ok(Self::PrefixAffinity),
"adaptive_affinity" | "aa" => Ok(Self::AdaptiveAffinity),
"lineage_affinity" | "la" => Ok(Self::LineageAffinity),
other => Err(anyhow::anyhow!("unknown router mode: {other}")), other => Err(anyhow::anyhow!("unknown router mode: {other}")),
} }
} }
@@ -476,6 +498,8 @@ impl RouterMode {
Self::CacheScoreTtl => "cache_score_ttl", Self::CacheScoreTtl => "cache_score_ttl",
Self::EstimatedTtft => "estimated_ttft", Self::EstimatedTtft => "estimated_ttft",
Self::PrefixAffinity => "prefix_affinity", Self::PrefixAffinity => "prefix_affinity",
Self::AdaptiveAffinity => "adaptive_affinity",
Self::LineageAffinity => "lineage_affinity",
} }
} }
} }
@@ -583,6 +607,8 @@ struct RawModelConfig {
#[serde(default)] #[serde(default)]
compute_dtype: Option<String>, compute_dtype: Option<String>,
#[serde(default)] #[serde(default)]
weight_dtype: Option<String>,
#[serde(default)]
flops_per_token_decode: Option<f64>, flops_per_token_decode: Option<f64>,
#[serde(default)] #[serde(default)]
bytes_per_token_decode: Option<f64>, bytes_per_token_decode: Option<f64>,
@@ -721,6 +747,9 @@ impl RawModelConfig {
if self.compute_dtype.is_some() { if self.compute_dtype.is_some() {
m.compute_dtype = self.compute_dtype; m.compute_dtype = self.compute_dtype;
} }
if self.weight_dtype.is_some() {
m.weight_dtype = self.weight_dtype;
}
if let Some(v) = self.flops_per_token_decode { if let Some(v) = self.flops_per_token_decode {
m.flops_per_token_decode = Some(v); m.flops_per_token_decode = Some(v);
} }
@@ -737,6 +766,18 @@ impl RawModelConfig {
m.block_size_tokens > 0, m.block_size_tokens > 0,
"model.block_size_tokens is required (not in HF config.json)" "model.block_size_tokens is required (not in HF config.json)"
); );
if let Some(dtype) = m.compute_dtype.as_deref() {
anyhow::ensure!(
ModelConfig::precision_bytes(dtype).is_some(),
"model.compute_dtype must be one of: bf16, fp8, fp4"
);
}
if let Some(dtype) = m.weight_dtype.as_deref() {
anyhow::ensure!(
ModelConfig::precision_bytes(dtype).is_some(),
"model.weight_dtype must be one of: bf16, fp8, fp4"
);
}
Ok(m) Ok(m)
} }
@@ -925,4 +966,41 @@ sim:
assert!((cfg.calibration.rdma_bw_util - 0.63).abs() < 1e-12); assert!((cfg.calibration.rdma_bw_util - 0.63).abs() < 1e-12);
assert!((cfg.calibration.scheduler_base_overhead_us - 123.0).abs() < 1e-12); assert!((cfg.calibration.scheduler_base_overhead_us - 123.0).abs() < 1e-12);
} }
#[test]
fn weight_dtype_can_differ_from_compute_dtype() {
let path = write_temp_config(
r#"
model:
name: test
num_layers: 4
num_kv_heads: 2
head_dim: 64
dtype_bytes: 1
block_size_tokens: 16
compute_dtype: fp8
weight_dtype: fp4
flops_per_token_prefill: 1.0e9
attn_quadratic_coeff: 64.0
hardware:
gpu_flops: 1.0e14
gpu_mem_bw: 1.0e12
hbm_bytes: 1.0e9
cluster:
num_instances: 2
meta_store:
ttl_seconds: 10.0
router:
mode: estimated_ttft
sim:
trace_path: trace.jsonl
output_dir: runs/test
"#,
);
let cfg = Config::from_yaml_path(&path).unwrap();
assert_eq!(cfg.model.compute_dtype.as_deref(), Some("fp8"));
assert_eq!(cfg.model.weight_dtype.as_deref(), Some("fp4"));
assert!((cfg.model.weight_dtype_bytes() - 0.5).abs() < 1e-12);
}
} }

View File

@@ -1,9 +1,10 @@
//! Simulation driver: pulls trace records, advances the event queue, runs //! Simulation driver: pulls trace records, advances the event queue, runs
//! instance batch ticks, and emits metrics. //! instance batch ticks, and emits metrics.
use anyhow::Result; use anyhow::{anyhow, Context, Result};
use std::collections::HashMap; use std::collections::{HashMap, VecDeque};
use std::path::Path; use std::path::Path;
use std::sync::{Arc, Mutex};
use crate::cluster::Cluster; use crate::cluster::Cluster;
use crate::config::{Config, RouterMode}; use crate::config::{Config, RouterMode};
@@ -19,10 +20,7 @@ use crate::trace::{RequestRecord, TraceReader};
/// Drop records whose `input_len` falls outside `sim.input_length_{min,max}`. /// Drop records whose `input_len` falls outside `sim.input_length_{min,max}`.
/// Used to carve an ablation onto a specific input-length bucket (e.g. 040k) /// Used to carve an ablation onto a specific input-length bucket (e.g. 040k)
/// without rewriting the trace file. No-op if both bounds are unset. /// without rewriting the trace file. No-op if both bounds are unset.
pub fn apply_input_length_filter( pub fn apply_input_length_filter(records: &mut Vec<RequestRecord>, cfg: &crate::config::SimConfig) {
records: &mut Vec<RequestRecord>,
cfg: &crate::config::SimConfig,
) {
let lo = cfg.input_length_min.unwrap_or(0); let lo = cfg.input_length_min.unwrap_or(0);
let hi = cfg.input_length_max.unwrap_or(u32::MAX); let hi = cfg.input_length_max.unwrap_or(u32::MAX);
if lo == 0 && hi == u32::MAX { if lo == 0 && hi == u32::MAX {
@@ -77,7 +75,10 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result<RunOutputs> {
eprintln!( eprintln!(
"[driver] input_length filter [{}, {}] kept {}/{} requests", "[driver] input_length filter [{}, {}] kept {}/{} requests",
config.sim.input_length_min.unwrap_or(0), config.sim.input_length_min.unwrap_or(0),
config.sim.input_length_max.map_or("".to_string(), |v| v.to_string()), config
.sim
.input_length_max
.map_or("".to_string(), |v| v.to_string()),
records.len(), records.len(),
raw_count, raw_count,
); );
@@ -206,6 +207,15 @@ pub fn ablate_fixed_placement(
base: &Config, base: &Config,
routers: &[RouterMode], routers: &[RouterMode],
evict_policies: &[ReplayEvictPolicy], evict_policies: &[ReplayEvictPolicy],
) -> Result<Vec<AblationRow>> {
ablate_fixed_placement_with_parallelism(base, routers, evict_policies, 1)
}
pub fn ablate_fixed_placement_with_parallelism(
base: &Config,
routers: &[RouterMode],
evict_policies: &[ReplayEvictPolicy],
jobs: usize,
) -> Result<Vec<AblationRow>> { ) -> Result<Vec<AblationRow>> {
let mut out = Vec::new(); let mut out = Vec::new();
for &policy in evict_policies { for &policy in evict_policies {
@@ -216,18 +226,112 @@ pub fn ablate_fixed_placement(
)); ));
} }
} }
if routers.is_empty() {
return Ok(out);
}
let worker_count = resolve_ablation_parallelism(jobs, routers.len());
if worker_count == 1 {
for &mode in routers { for &mode in routers {
out.extend(run_ablation_router(base, mode, evict_policies)?);
}
return Ok(out);
}
eprintln!(
"[ablate] running {} routers with {} workers",
routers.len(),
worker_count
);
let queue = Arc::new(Mutex::new(VecDeque::from(
routers
.iter()
.copied()
.enumerate()
.collect::<Vec<(usize, RouterMode)>>(),
)));
let mut ordered_results: Vec<(usize, Vec<AblationRow>)> = Vec::with_capacity(routers.len());
std::thread::scope(|scope| -> Result<()> {
let mut handles = Vec::with_capacity(worker_count);
for worker_idx in 0..worker_count {
let queue = Arc::clone(&queue);
let base = base.clone();
let policies = evict_policies.to_vec();
handles.push(
scope.spawn(move || -> Result<Vec<(usize, Vec<AblationRow>)>> {
let mut local = Vec::new();
loop {
let next = queue
.lock()
.map_err(|_| anyhow!("ablation work queue mutex poisoned"))?
.pop_front();
let Some((idx, mode)) = next else {
break;
};
eprintln!(
"[ablate] worker {}/{} router={} ...",
worker_idx + 1,
worker_count,
mode.as_str()
);
let rows = run_ablation_router(&base, mode, &policies)
.with_context(|| format!("ablation router={}", mode.as_str()))?;
local.push((idx, rows));
}
Ok(local)
}),
);
}
for handle in handles {
let local = handle
.join()
.map_err(|_| anyhow!("ablation worker thread panicked"))??;
ordered_results.extend(local);
}
Ok(())
})?;
ordered_results.sort_by_key(|(idx, _)| *idx);
for (_, rows) in ordered_results {
out.extend(rows);
}
Ok(out)
}
fn resolve_ablation_parallelism(jobs: usize, num_routers: usize) -> usize {
if num_routers == 0 {
return 1;
}
let requested = if jobs == 0 {
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
} else {
jobs
};
requested.max(1).min(num_routers)
}
fn run_ablation_router(
base: &Config,
mode: RouterMode,
evict_policies: &[ReplayEvictPolicy],
) -> Result<Vec<AblationRow>> {
let mut cfg = base.clone(); let mut cfg = base.clone();
cfg.cluster.router.mode = mode; cfg.cluster.router.mode = mode;
let placement_run = run(&cfg, Some(&format!("{}__placement_lru", mode.as_str())))?; let placement_run = run(&cfg, Some(&format!("{}__placement_lru", mode.as_str())))?;
let mut rows = Vec::with_capacity(evict_policies.len());
for &policy in evict_policies { for &policy in evict_policies {
out.push(AblationRow::from_summary( rows.push(AblationRow::from_summary(
mode.as_str(), mode.as_str(),
policy, policy,
"realized_lru", "realized_lru",
&placement_run.summary, &placement_run.summary,
)); ));
} }
} Ok(rows)
Ok(out)
} }

View File

@@ -17,6 +17,7 @@ pub const AVAILABLE: &[&str] = &[
"h100", "h100",
"h800", "h800",
"h20", "h20",
"h20-141g",
"a100-80gb", "a100-80gb",
"a100-40gb", "a100-40gb",
"b200", "b200",
@@ -30,6 +31,9 @@ pub const AVAILABLE: &[&str] = &[
"2xh20", "2xh20",
"4xh20", "4xh20",
"8xh20", "8xh20",
"2xh20-141g",
"4xh20-141g",
"8xh20-141g",
"2xb200", "2xb200",
"4xb200", "4xb200",
"8xb200", "8xb200",
@@ -49,6 +53,7 @@ pub fn resolve(name: &str) -> Option<HardwareConfig> {
"h100" => Some(make_config(count, &H100)), "h100" => Some(make_config(count, &H100)),
"h800" => Some(make_config(count, &H800)), "h800" => Some(make_config(count, &H800)),
"h20" => Some(make_config(count, &H20)), "h20" => Some(make_config(count, &H20)),
"h20141g" | "h20141gb" => Some(make_config(count, &H20_141G)),
"a10080gb" | "a100" => Some(make_config(count, &A100_80GB)), "a10080gb" | "a100" => Some(make_config(count, &A100_80GB)),
"a10040gb" => Some(make_config(count, &A100_40GB)), "a10040gb" => Some(make_config(count, &A100_40GB)),
"b200" => Some(make_config(count, &B200)), "b200" => Some(make_config(count, &B200)),
@@ -113,6 +118,15 @@ const H20: GpuBase = GpuBase {
pcie_gen: 5, pcie_gen: 5,
}; };
const H20_141G: GpuBase = GpuBase {
flops: 1.48e14, // modeled as the same H20 compute envelope
fp8_flops: 2.96e14, // modeled as the same H20 FP8 throughput
fp4_flops: 0.0, // not supported
mem_bw: 4.8e12, // 141 GB HBM variant
hbm: 141.0e9, // 141 GB
pcie_gen: 5,
};
const A100_80GB: GpuBase = GpuBase { const A100_80GB: GpuBase = GpuBase {
flops: 3.12e14, // 312 TFLOPS BF16 flops: 3.12e14, // 312 TFLOPS BF16
fp8_flops: 0.0, // A100 has no FP8 tensor cores fp8_flops: 0.0, // A100 has no FP8 tensor cores
@@ -193,7 +207,11 @@ fn make_config(n: u32, base: &GpuBase) -> HardwareConfig {
pcie_latency_us: pcie_lat, pcie_latency_us: pcie_lat,
rdma_bw: rdma_base * rdma_scale, rdma_bw: rdma_base * rdma_scale,
rdma_latency_us: rdma_lat, rdma_latency_us: rdma_lat,
intra_node_tp_bw: if base.pcie_gen >= 6 { 1.8e12 * f } else { 9.0e11 * f }, intra_node_tp_bw: if base.pcie_gen >= 6 {
1.8e12 * f
} else {
9.0e11 * f
},
intra_node_tp_latency_us: if base.pcie_gen >= 6 { 1.0 } else { 2.0 }, intra_node_tp_latency_us: if base.pcie_gen >= 6 { 1.0 } else { 2.0 },
tp_degree: n, tp_degree: n,
max_batch_slots: 256, max_batch_slots: 256,
@@ -227,6 +245,7 @@ mod tests {
assert!(resolve("H100").is_some()); assert!(resolve("H100").is_some());
assert!(resolve("8xB200").is_some()); assert!(resolve("8xB200").is_some());
assert!(resolve("8x-B200").is_some()); assert!(resolve("8x-B200").is_some());
assert!(resolve("8xH20-141G").is_some());
assert!(resolve("a100-80gb").is_some()); assert!(resolve("a100-80gb").is_some());
assert!(resolve("A100_80GB").is_some()); assert!(resolve("A100_80GB").is_some());
assert!(resolve("a100_80gb").is_some()); assert!(resolve("a100_80gb").is_some());
@@ -258,4 +277,13 @@ mod tests {
assert!((s4.gpu_mem_bw - s1.gpu_mem_bw * 4.0).abs() < 1.0); assert!((s4.gpu_mem_bw - s1.gpu_mem_bw * 4.0).abs() < 1.0);
assert!((s8.hbm_bytes - s1.hbm_bytes * 8.0).abs() < 1.0); assert!((s8.hbm_bytes - s1.hbm_bytes * 8.0).abs() < 1.0);
} }
/// The 141 GB variant must keep the H20 compute envelope while strictly
/// gaining HBM capacity and memory bandwidth.
#[test]
fn h20_141g_variant_has_larger_hbm() {
    let base = resolve("8xh20").unwrap();
    let big = resolve("8xh20-141g").unwrap();
    // Compute is modeled identically; memory capacity and bandwidth grow.
    assert!((big.gpu_flops - base.gpu_flops).abs() < 1.0);
    assert!(big.hbm_bytes > base.hbm_bytes);
    assert!(big.gpu_mem_bw > base.gpu_mem_bw);
}
} }

View File

@@ -301,8 +301,8 @@ impl ComputeModel {
let attn_time = attn_total_flops / (self.gpu_flops * self.attention_util.max(1e-6)); let attn_time = attn_total_flops / (self.gpu_flops * self.attention_util.max(1e-6));
let compute_time = linear_time + attn_time + self.num_layers * self.misc_layer_overhead_s; let compute_time = linear_time + attn_time + self.num_layers * self.misc_layer_overhead_s;
// Weight stream: all layers' active weights read once from HBM. // Weight stream: all layers' active weights read once from HBM.
let mem_time = let mem_time = self.weight_bytes_per_layer * self.num_layers
self.weight_bytes_per_layer * self.num_layers / (self.gpu_mem_bw * self.hbm_bw_util.max(1e-6)); / (self.gpu_mem_bw * self.hbm_bw_util.max(1e-6));
let tp_comm_time = if self.tp_collective_count_per_layer > 0.0 let tp_comm_time = if self.tp_collective_count_per_layer > 0.0
&& self.tp_bytes_per_token > 0.0 && self.tp_bytes_per_token > 0.0
&& self.intra_node_tp_bw > 0.0 && self.intra_node_tp_bw > 0.0

View File

@@ -10,6 +10,6 @@ pub mod oracle;
pub mod replay; pub mod replay;
pub mod router; pub mod router;
pub mod sim; pub mod sim;
pub mod ttft;
pub mod trace; pub mod trace;
pub mod ttft;
pub mod types; pub mod types;

View File

@@ -123,6 +123,10 @@ enum Cmd {
/// routers are then run at that cluster size. /// routers are then run at that cluster size.
#[arg(long, default_value = "cache_score")] #[arg(long, default_value = "cache_score")]
auto_probe_router: String, auto_probe_router: String,
/// Maximum number of routers to simulate concurrently.
/// `0` means auto-detect from available CPU parallelism.
#[arg(long, default_value_t = 0)]
jobs: usize,
#[command(flatten)] #[command(flatten)]
overrides: ConfigOverrides, overrides: ConfigOverrides,
}, },
@@ -168,6 +172,7 @@ fn main() -> Result<()> {
auto_target_ttft_mean, auto_target_ttft_mean,
auto_candidates, auto_candidates,
auto_probe_router, auto_probe_router,
jobs,
overrides, overrides,
} => cmd_ablate( } => cmd_ablate(
&config, &config,
@@ -177,6 +182,7 @@ fn main() -> Result<()> {
auto_target_ttft_mean, auto_target_ttft_mean,
&auto_candidates, &auto_candidates,
&auto_probe_router, &auto_probe_router,
jobs,
&overrides, &overrides,
), ),
Cmd::Validate { config, overrides } => cmd_validate(&config, &overrides), Cmd::Validate { config, overrides } => cmd_validate(&config, &overrides),
@@ -218,6 +224,7 @@ fn cmd_ablate(
auto_target_ttft_mean: f64, auto_target_ttft_mean: f64,
auto_candidates: &str, auto_candidates: &str,
auto_probe_router: &str, auto_probe_router: &str,
jobs: usize,
overrides: &ConfigOverrides, overrides: &ConfigOverrides,
) -> Result<()> { ) -> Result<()> {
let mut base = load(path, overrides)?; let mut base = load(path, overrides)?;
@@ -254,12 +261,7 @@ fn cmd_ablate(
sorted.sort_unstable(); sorted.sort_unstable();
let probe_mode = RouterMode::parse(auto_probe_router) let probe_mode = RouterMode::parse(auto_probe_router)
.with_context(|| format!("parsing --auto-probe-router='{auto_probe_router}'"))?; .with_context(|| format!("parsing --auto-probe-router='{auto_probe_router}'"))?;
let chosen = auto_select_instances( let chosen = auto_select_instances(&base, &sorted, probe_mode, auto_target_ttft_mean)?;
&base,
&sorted,
probe_mode,
auto_target_ttft_mean,
)?;
eprintln!( eprintln!(
"[ablate] auto-instances chose num_instances={} (target ttft_mean ≤ {:.3}s, probe_router={})", "[ablate] auto-instances chose num_instances={} (target ttft_mean ≤ {:.3}s, probe_router={})",
chosen, chosen,
@@ -270,7 +272,7 @@ fn cmd_ablate(
} }
eprintln!( eprintln!(
"[ablate] routers={} evict_policies={} num_instances={}", "[ablate] routers={} evict_policies={} num_instances={} jobs={}",
modes modes
.iter() .iter()
.map(RouterMode::as_str) .map(RouterMode::as_str)
@@ -282,8 +284,13 @@ fn cmd_ablate(
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(","), .join(","),
base.cluster.num_instances, base.cluster.num_instances,
if jobs == 0 {
"auto".to_string()
} else {
jobs.to_string()
},
); );
let all = driver::ablate_fixed_placement(&base, &modes, &policies)?; let all = driver::ablate_fixed_placement_with_parallelism(&base, &modes, &policies, jobs)?;
let agg_path = std::path::Path::new(&base.sim.output_dir).join("ablation.json"); let agg_path = std::path::Path::new(&base.sim.output_dir).join("ablation.json");
std::fs::create_dir_all(&base.sim.output_dir)?; std::fs::create_dir_all(&base.sim.output_dir)?;
std::fs::write(&agg_path, serde_json::to_string_pretty(&all)?)?; std::fs::write(&agg_path, serde_json::to_string_pretty(&all)?)?;
@@ -479,7 +486,9 @@ fn cmd_oracle(
eprintln!( eprintln!(
"[oracle] input_length filter [{}, {}] kept {}/{} requests", "[oracle] input_length filter [{}, {}] kept {}/{} requests",
cfg.sim.input_length_min.unwrap_or(0), cfg.sim.input_length_min.unwrap_or(0),
cfg.sim.input_length_max.map_or("".to_string(), |v| v.to_string()), cfg.sim
.input_length_max
.map_or("".to_string(), |v| v.to_string()),
records.len(), records.len(),
raw_count, raw_count,
); );

View File

@@ -221,6 +221,8 @@ mod tests {
RequestRecord { RequestRecord {
req_id: id, req_id: id,
chat_id: id as i64, chat_id: id as i64,
parent_chat_id: -1,
turn: 1,
arrival: t, arrival: t,
input_len: (hashes.len() as u32) * 16, input_len: (hashes.len() as u32) * 16,
output_len: 16, output_len: 16,

View File

@@ -0,0 +1,254 @@
//! Adaptive affinity routing for coding-agent workloads.
//!
//! The trace has two distinct regimes:
//!
//! 1. Short root prompts with a tiny shared stem. Their *current-request*
//! miss cost is small, so pure TTFT minimization tends to scatter them.
//! That destroys future cache locality for the dominant system prompts.
//! 2. Continuations or already-warm requests with a long local prefix. Their
//! immediate reuse is already visible, so a first-principles TTFT estimate
//! is the right objective.
//!
//! This router separates the two:
//!
//! - Keep a lightweight per-prefix heat map over the first few blocks.
//! - Only when a prefix family is both short and hot do we enforce affinity.
//! - The hot prefix is mapped to a deterministic rendezvous-ranked home set,
//! and the set widens logarithmically as the family gets hotter.
//! - Within that home set we still minimize estimated TTFT, and we fall back
//! to the global TTFT optimum if the affinity choice is clearly overloaded.
use std::collections::HashMap;
use crate::cluster::meta_store::MetaStore;
use crate::config::Config;
use crate::instance::Instance;
use crate::router::{CandidateInfo, RouteDecision, Router};
use crate::trace::RequestRecord;
use crate::ttft::{classify_prefix_tiers, TtftModel};
/// Recent-activity counter for one prefix-family fingerprint.
#[derive(Debug, Clone, Copy, Default)]
struct PrefixStat {
    seen: u16,      // requests observed in the current TTL window
    last_seen: f64, // sim-clock time of the most recent observation
}
/// One routing candidate with its estimated cost and tie-break keys.
#[derive(Debug, Clone, Copy)]
struct CostedCandidate {
    idx: usize,           // position in the `instances` slice (not the instance id)
    cost: f64,            // estimated TTFT in seconds
    reusable_blocks: u32, // l0 + l1 + remote hit blocks for this request
    queue_len: u32,       // instance queue length at scoring time
    rendezvous: u64,      // rendezvous hash of (prefix fingerprint, instance id)
}
/// Router that enforces prefix-family affinity only for short, hot, still-cold
/// requests, and otherwise minimizes a first-principles TTFT estimate.
pub struct AdaptiveAffinityRouter {
    ttft_model: TtftModel,       // first-principles TTFT cost model
    fingerprint_k: usize,        // leading blocks folded into the family fingerprint
    short_request_blocks: usize, // requests at most this long qualify for affinity
    warm_prefix_blocks: u32,     // best local l0 hit above this means "already warm"
    hot_threshold: u16,          // observations before a family counts as hot
    hot_ttl_s: f64,              // heat expires after this many idle seconds
    max_fan_out: usize,          // upper bound on the rendezvous home-set size
    overload_ratio: f64,         // multiplicative slack vs the global-best cost
    overload_abs_s: f64,         // additive slack (seconds) vs the global-best cost
    // Per-fingerprint heat map.
    // NOTE(review): entries are never pruned — grows with distinct fingerprints.
    prefix_stats: HashMap<u64, PrefixStat>,
}
impl AdaptiveAffinityRouter {
    /// Build from the full simulator config. The home-set fan-out comes from
    /// `router.affinity_fan_out` when set (> 0, clamped to [2, n]); otherwise
    /// it defaults to roughly n/8 with a floor of 8, capped at n.
    pub fn new(config: &Config) -> Self {
        let n = config.cluster.num_instances as usize;
        let configured_fan_out = config.cluster.router.affinity_fan_out;
        let max_fan_out = if configured_fan_out > 0 {
            configured_fan_out.max(2).min(n)
        } else {
            (n / 8).max(8).min(n)
        };
        Self {
            ttft_model: TtftModel::new(
                &config.hardware,
                &config.calibration,
                config.model.kv_block_bytes(),
            ),
            // Coding-trace reuse is dominated by the system prompt stem.
            fingerprint_k: config.cluster.router.prefix_k.clamp(1, 4),
            short_request_blocks: 12,
            warm_prefix_blocks: 8,
            hot_threshold: 4,
            hot_ttl_s: config.cluster.meta_store.ttl_seconds.max(1.0),
            max_fan_out,
            overload_ratio: 1.25,
            overload_abs_s: 0.25,
            prefix_stats: HashMap::new(),
        }
    }
    /// FNV-1a over the first `k` block hashes (at least one when available).
    /// Returns the FNV offset basis for an empty `hash_ids`.
    fn fingerprint(hash_ids: &[u64], k: usize) -> u64 {
        // `k.max(1)` keeps at least one block for k == 0; the outer `min`
        // with the length makes the empty slice safe. (Replaces the previous
        // `len.min(k).max(1)` + re-`min` dance, which obscured the intent.)
        let take = hash_ids.len().min(k.max(1));
        let mut fp: u64 = 0xcbf29ce484222325; // FNV-1a 64-bit offset basis
        for &h in &hash_ids[..take] {
            fp ^= h;
            fp = fp.wrapping_mul(0x100000001b3); // FNV-1a 64-bit prime
        }
        fp
    }
    /// Deterministic rendezvous weight for (fingerprint, instance). Higher
    /// weight means higher rank in the fingerprint's home set. Uses a
    /// splitmix64-style finalizer for good bit diffusion.
    fn rendezvous(fp: u64, instance_id: u32) -> u64 {
        let mut h = fp ^ (instance_id as u64).wrapping_mul(0x9e3779b97f4a7c15);
        h = h.wrapping_add(0x9e3779b97f4a7c15);
        h = (h ^ (h >> 30)).wrapping_mul(0xbf58476d1ce4e5b9);
        h = (h ^ (h >> 27)).wrapping_mul(0x94d049bb133111eb);
        h ^ (h >> 31)
    }
    /// Current heat for `fp`, or 0 if never seen or idle past the TTL.
    fn active_heat(&self, fp: u64, now: f64) -> u16 {
        self.prefix_stats
            .get(&fp)
            .filter(|stat| now - stat.last_seen <= self.hot_ttl_s)
            .map(|stat| stat.seen)
            .unwrap_or(0)
    }
    /// Record one observation of `fp`; a TTL-expired entry restarts from zero.
    fn observe(&mut self, fp: u64, now: f64) {
        let stat = self.prefix_stats.entry(fp).or_default();
        if now - stat.last_seen > self.hot_ttl_s {
            stat.seen = 0;
        }
        stat.last_seen = now;
        stat.seen = stat.seen.saturating_add(1);
    }
    /// Home-set size for a family at the given heat: starts at 2 and widens
    /// logarithmically (one extra slot per doubling of heat multiples),
    /// capped by `max_fan_out` and the cluster size.
    fn fan_out(&self, heat: u16, n: usize) -> usize {
        if heat < self.hot_threshold {
            return 2.min(n);
        }
        let multiples = (heat / self.hot_threshold).max(1) as u32;
        let extra = multiples.ilog2() as usize;
        (2 + extra).min(self.max_fan_out).min(n).max(2)
    }
    /// Estimated TTFT for routing `req` to `inst`, plus the total reusable
    /// (l0 + l1 + remote) block count. Cost = queue drain + scheduler +
    /// KV prepare + prefill of missed tokens + first-token tail.
    fn candidate_cost(
        &self,
        req: &RequestRecord,
        inst: &Instance,
        meta: &MetaStore,
        now: f64,
        scheduler_s: f64,
    ) -> (f64, u32) {
        let residency = classify_prefix_tiers(&req.hash_ids, inst, meta, now);
        let reusable =
            residency.l0_hit_blocks + residency.l1_hit_blocks + residency.remote_hit_blocks;
        let miss_tokens = residency.miss_blocks.saturating_mul(inst.block_size_tokens);
        let kv_prepare = self.ttft_model.kv_prepare_time_s(residency);
        let cost = inst.estimated_drain_time()
            + scheduler_s
            + kv_prepare
            + inst.compute.prefill_time(miss_tokens)
            + self.ttft_model.first_token_tail_s();
        (cost, reusable)
    }
    /// Strict ordering for candidates: minimize cost, then queue length,
    /// then maximize reusable blocks. Exact float equality is intentional —
    /// it keeps the tie-break deterministic across runs.
    fn better(a: CostedCandidate, b: CostedCandidate) -> bool {
        a.cost < b.cost
            || (a.cost == b.cost && a.queue_len < b.queue_len)
            || (a.cost == b.cost
                && a.queue_len == b.queue_len
                && a.reusable_blocks > b.reusable_blocks)
    }
}
impl Router for AdaptiveAffinityRouter {
    fn name(&self) -> &'static str {
        "adaptive_affinity"
    }
    /// Score every instance by estimated TTFT, then decide between the global
    /// optimum and a rendezvous home set for hot, short, still-cold prefixes.
    fn route(
        &mut self,
        req: &RequestRecord,
        instances: &[Instance],
        meta: &MetaStore,
        now: f64,
    ) -> RouteDecision {
        // NOTE(review): assumes `instances` is non-empty (`scored[0]` below
        // panics otherwise) — confirm the driver guarantees this.
        let n = instances.len();
        let scheduler_s = self.ttft_model.scheduler_overhead_s(n, 3);
        let fp = Self::fingerprint(&req.hash_ids, self.fingerprint_k);
        // Heat *before* counting this request; `observe` runs at the end so
        // the decision uses pre-request state.
        let active_heat = self.active_heat(fp, now);
        let mut best_local_l0 = 0u32;
        let mut candidates = Vec::with_capacity(n);
        let mut scored = Vec::with_capacity(n);
        for (idx, inst) in instances.iter().enumerate() {
            // Peek (no cache mutation) at the longest locally resident prefix.
            let l0_hit = inst.cache.l0.longest_prefix_peek(&req.hash_ids) as u32;
            best_local_l0 = best_local_l0.max(l0_hit);
            let (cost, reusable_blocks) = self.candidate_cost(req, inst, meta, now, scheduler_s);
            let rendezvous = Self::rendezvous(fp, inst.id);
            candidates.push(CandidateInfo {
                instance: inst.id,
                predicted_prefix: reusable_blocks,
                load_blocks: inst.kv_blocks_used,
                queue_len: inst.queue_len(),
            });
            scored.push(CostedCandidate {
                idx,
                cost,
                reusable_blocks,
                queue_len: inst.queue_len(),
                rendezvous,
            });
        }
        // Global optimum under (cost, queue_len, -reusable_blocks).
        let mut global_best = scored[0];
        for cand in scored.iter().copied().skip(1) {
            if Self::better(cand, global_best) {
                global_best = cand;
            }
        }
        // Affinity only pays off when the request is short, its best local
        // prefix is still cold, and the family is (about to be) hot.
        let should_affinitize = req.hash_ids.len() <= self.short_request_blocks
            && best_local_l0 <= self.warm_prefix_blocks
            && active_heat.saturating_add(1) >= self.hot_threshold;
        let (chosen_idx, reason) = if should_affinitize {
            let fan_out = self.fan_out(active_heat.saturating_add(1), n);
            // Descending rendezvous order: the top `fan_out` entries form the
            // deterministic home set for this fingerprint. (`global_best` is
            // a copy, so the sort cannot invalidate it.)
            scored.sort_unstable_by(|a, b| b.rendezvous.cmp(&a.rendezvous));
            let mut home_best = scored[0];
            for cand in scored.iter().copied().take(fan_out).skip(1) {
                // Same ordering as `better`, with rendezvous rank as the
                // final deterministic tie-break.
                let better = Self::better(cand, home_best)
                    || (cand.cost == home_best.cost
                        && cand.queue_len == home_best.queue_len
                        && cand.reusable_blocks == home_best.reusable_blocks
                        && cand.rendezvous > home_best.rendezvous);
                if better {
                    home_best = cand;
                }
            }
            // Escape hatch: abandon the home set when it is clearly worse
            // than the global optimum (multiplicative plus absolute slack).
            let home_cost_ok =
                home_best.cost <= global_best.cost * self.overload_ratio + self.overload_abs_s;
            if home_cost_ok {
                (home_best.idx, "adaptive affinity: hot short prefix homeset")
            } else {
                (
                    global_best.idx,
                    "adaptive affinity fallback: global min estimated ttft",
                )
            }
        } else {
            (global_best.idx, "global min estimated ttft")
        };
        // Count this request toward family heat only after routing.
        self.observe(fp, now);
        RouteDecision {
            req_id: req.req_id,
            mode: "adaptive_affinity",
            chosen: instances[chosen_idx].id,
            probe_overhead_s: 0.0,
            candidates,
            reason,
        }
    }
}

View File

@@ -59,39 +59,39 @@ pub struct CacheAffinityRouter {
} }
impl CacheAffinityRouter { impl CacheAffinityRouter {
pub fn new(load_alpha: f64) -> Self { pub fn new(load_alpha: f64, fingerprint_k: usize) -> Self {
Self { Self {
name: "cache_affinity", name: "cache_affinity",
load_alpha, load_alpha,
l0_gamma: 1.0, l0_gamma: 1.0,
meta_delta: 0.25, meta_delta: 0.25,
fingerprint_k: 4, fingerprint_k: fingerprint_k.max(1),
use_rendezvous: true, use_rendezvous: true,
} }
} }
/// Ablation: cache_score-style weights (γ=0.1, δ=0) but keep rendezvous /// Ablation: cache_score-style weights (γ=0.1, δ=0) but keep rendezvous
/// tiebreak. Isolates the contribution of deterministic sticky placement. /// tiebreak. Isolates the contribution of deterministic sticky placement.
pub fn weak_with_rendezvous(load_alpha: f64) -> Self { pub fn weak_with_rendezvous(load_alpha: f64, fingerprint_k: usize) -> Self {
Self { Self {
name: "cache_affinity_weak_rend", name: "cache_affinity_weak_rend",
load_alpha, load_alpha,
l0_gamma: 0.1, l0_gamma: 0.1,
meta_delta: 0.0, meta_delta: 0.0,
fingerprint_k: 4, fingerprint_k: fingerprint_k.max(1),
use_rendezvous: true, use_rendezvous: true,
} }
} }
/// Ablation: strong cache weights (γ=1.0, δ=0.25) but first-found tiebreak /// Ablation: strong cache weights (γ=1.0, δ=0.25) but first-found tiebreak
/// instead of rendezvous. Isolates the contribution of reweighting alone. /// instead of rendezvous. Isolates the contribution of reweighting alone.
pub fn strong_no_rendezvous(load_alpha: f64) -> Self { pub fn strong_no_rendezvous(load_alpha: f64, fingerprint_k: usize) -> Self {
Self { Self {
name: "cache_affinity_strong_only", name: "cache_affinity_strong_only",
load_alpha, load_alpha,
l0_gamma: 1.0, l0_gamma: 1.0,
meta_delta: 0.25, meta_delta: 0.25,
fingerprint_k: 4, fingerprint_k: fingerprint_k.max(1),
use_rendezvous: false, use_rendezvous: false,
} }
} }

View File

@@ -58,8 +58,11 @@ impl Router for EstimatedTtftRouter {
let miss_tokens = residency.miss_blocks.saturating_mul(inst.block_size_tokens); let miss_tokens = residency.miss_blocks.saturating_mul(inst.block_size_tokens);
let kv_prepare = self.ttft_model.kv_prepare_time_s(residency); let kv_prepare = self.ttft_model.kv_prepare_time_s(residency);
let first_token_tail = self.ttft_model.first_token_tail_s(); let first_token_tail = self.ttft_model.first_token_tail_s();
let cost = let cost = drain
drain + scheduler + kv_prepare + inst.compute.prefill_time(miss_tokens) + first_token_tail; + scheduler
+ kv_prepare
+ inst.compute.prefill_time(miss_tokens)
+ first_token_tail;
candidates.push(CandidateInfo { candidates.push(CandidateInfo {
instance: inst.id, instance: inst.id,
@@ -72,7 +75,8 @@ impl Router for EstimatedTtftRouter {
// Minimise (cost, queue_len, -local_prefix). // Minimise (cost, queue_len, -local_prefix).
let ql = inst.queue_len(); let ql = inst.queue_len();
let reusable = residency.l0_hit_blocks + residency.l1_hit_blocks + residency.remote_hit_blocks; let reusable =
residency.l0_hit_blocks + residency.l1_hit_blocks + residency.remote_hit_blocks;
let better = cost < best_cost let better = cost < best_cost
|| (cost == best_cost && ql < best_queue) || (cost == best_cost && ql < best_queue)
|| (cost == best_cost && ql == best_queue && reusable > best_reuse); || (cost == best_cost && ql == best_queue && reusable > best_reuse);

View File

@@ -0,0 +1,243 @@
//! Lineage-aware reuse routing for agentic coding workloads.
//!
//! Workload hypothesis:
//! - turn-1 requests are diverse but recur by prefix family and benefit from
//! deterministic home placement instead of diffusion across the cluster;
//! - child requests usually extend the immediately preceding request and
//! should stay on the parent's instance whenever that instance is not
//! clearly overloaded.
//!
//! The router therefore uses three modes:
//! - strong local cache scoring for already-warm requests;
//! - parent stickiness for continuations with a known parent placement;
//! - family homesets (rendezvous-ranked top-K) for cold / weakly-warm
//! requests, with a global fallback if the homeset is substantially worse.
use std::collections::HashMap;
use crate::cluster::meta_store::MetaStore;
use crate::config::Config;
use crate::instance::Instance;
use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router};
use crate::trace::RequestRecord;
/// Recent-activity counter for one prefix-family fingerprint.
#[derive(Debug, Clone, Copy, Default)]
struct FamilyStat {
    seen: u16,      // requests observed in the current TTL window
    last_seen: f64, // sim-clock time of the most recent observation
}
/// One routing candidate with its score and tie-break keys.
#[derive(Debug, Clone, Copy)]
struct CandidateCost {
    idx: usize,      // position in the `instances` slice (not the instance id)
    cost: f64,       // α·queue − γ·l0_hit − δ·meta_only (lower is better)
    l0_hit: u32,     // blocks already resident in the local l0 cache
    meta_only: u32,  // blocks known to the meta store but not locally resident
    queue_len: u32,  // instance queue length at scoring time
    rendezvous: u64, // rendezvous hash of (family fingerprint, instance id)
}
/// Router combining parent stickiness for continuations, rendezvous family
/// homesets for cold requests, and cache/load scoring for warm requests.
pub struct LineageAffinityRouter {
    load_alpha: f64,        // weight on queue length in the score
    l0_gamma: f64,          // weight on local l0 prefix hits
    meta_delta: f64,        // weight on meta-store-only prefix knowledge
    fingerprint_k: usize,   // leading blocks folded into the family fingerprint
    warm_prefix_blocks: u32, // best local l0 hit at/above this means "warm"
    hot_ttl_s: f64,         // family heat expires after this many idle seconds
    max_fan_out: usize,     // upper bound on the rendezvous home-set size
    parent_cost_slack: f64, // score slack tolerated to stay on the parent's instance
    homeset_cost_slack: f64, // score slack tolerated to stay in the family homeset
    // NOTE(review): both maps are never pruned — they grow with distinct
    // fingerprints / chat ids over a long simulation.
    family_stats: HashMap<u64, FamilyStat>,
    request_home: HashMap<i64, u32>, // chat_id -> instance id of its last placement
}
impl LineageAffinityRouter {
    /// Build from the full simulator config. The home-set fan-out comes from
    /// `router.affinity_fan_out` when set (> 0, clamped to [2, n]); otherwise
    /// it defaults to roughly n/8 with a floor of 8, capped at n.
    pub fn new(config: &Config) -> Self {
        let n = config.cluster.num_instances as usize;
        let configured_fan_out = config.cluster.router.affinity_fan_out;
        let max_fan_out = if configured_fan_out > 0 {
            configured_fan_out.max(2).min(n)
        } else {
            (n / 8).max(8).min(n)
        };
        Self {
            load_alpha: config.cluster.router.load_alpha.max(1.0),
            l0_gamma: 1.0,
            meta_delta: 0.25,
            fingerprint_k: config.cluster.router.prefix_k.clamp(2, 8),
            warm_prefix_blocks: 12,
            hot_ttl_s: config.cluster.meta_store.ttl_seconds.max(1.0),
            max_fan_out,
            // Measured in the same score units as α·queue − γ·hit.
            parent_cost_slack: 6.0,
            homeset_cost_slack: 2.0,
            family_stats: HashMap::new(),
            request_home: HashMap::new(),
        }
    }
    /// FNV-1a over the first `k` block hashes (at least one when available).
    /// Returns the FNV offset basis for an empty `hash_ids`.
    fn fingerprint(hash_ids: &[u64], k: usize) -> u64 {
        // `k.max(1)` keeps at least one block for k == 0; the outer `min`
        // with the length makes the empty slice safe. (The previous
        // `len.min(k).max(1)` produced take == 1 for an empty `hash_ids`
        // and panicked on the slice below.)
        let take = hash_ids.len().min(k.max(1));
        let mut fp: u64 = 0xcbf29ce484222325; // FNV-1a 64-bit offset basis
        for &h in &hash_ids[..take] {
            fp ^= h;
            fp = fp.wrapping_mul(0x100000001b3); // FNV-1a 64-bit prime
        }
        fp
    }
    /// Deterministic rendezvous weight for (fingerprint, instance). Higher
    /// weight means higher rank in the family's home set. Uses a
    /// splitmix64-style finalizer for good bit diffusion.
    fn rendezvous(fp: u64, instance_id: u32) -> u64 {
        let mut h = fp ^ (instance_id as u64).wrapping_mul(0x9e3779b97f4a7c15);
        h = h.wrapping_add(0x9e3779b97f4a7c15);
        h = (h ^ (h >> 30)).wrapping_mul(0xbf58476d1ce4e5b9);
        h = (h ^ (h >> 27)).wrapping_mul(0x94d049bb133111eb);
        h ^ (h >> 31)
    }
    /// Current heat for `fp`, or 0 if never seen or idle past the TTL.
    fn active_heat(&self, fp: u64, now: f64) -> u16 {
        self.family_stats
            .get(&fp)
            .filter(|stat| now - stat.last_seen <= self.hot_ttl_s)
            .map(|stat| stat.seen)
            .unwrap_or(0)
    }
    /// Record one observation of `fp`; a TTL-expired entry restarts from zero.
    fn observe(&mut self, fp: u64, now: f64) {
        let stat = self.family_stats.entry(fp).or_default();
        if now - stat.last_seen > self.hot_ttl_s {
            stat.seen = 0;
        }
        stat.last_seen = now;
        stat.seen = stat.seen.saturating_add(1);
    }
    /// Home-set size for a family at the given heat: 2 plus a step that grows
    /// with each doubling of heat, capped by `max_fan_out` and cluster size.
    fn fan_out(&self, heat: u16, n: usize) -> usize {
        let base = 2usize;
        let extra = match heat {
            0..=1 => 0,
            2..=3 => 1,
            4..=7 => 2,
            8..=15 => 3,
            _ => 4,
        };
        (base + extra).min(self.max_fan_out).min(n).max(2)
    }
    /// Strict ordering: minimize cost, then maximize l0 hits, then minimize
    /// queue length, then maximize meta-only knowledge. Exact float equality
    /// is intentional — it keeps the tie-break deterministic across runs.
    fn better(a: CandidateCost, b: CandidateCost) -> bool {
        a.cost < b.cost
            || (a.cost == b.cost && a.l0_hit > b.l0_hit)
            || (a.cost == b.cost && a.l0_hit == b.l0_hit && a.queue_len < b.queue_len)
            || (a.cost == b.cost
                && a.l0_hit == b.l0_hit
                && a.queue_len == b.queue_len
                && a.meta_only > b.meta_only)
    }
}
impl Router for LineageAffinityRouter {
    fn name(&self) -> &'static str {
        "lineage_affinity"
    }
    /// Three-way decision: parent stickiness for continuations, rendezvous
    /// family homesets for cold requests, plain cache/load scoring otherwise.
    fn route(
        &mut self,
        req: &RequestRecord,
        instances: &[Instance],
        meta: &MetaStore,
        now: f64,
    ) -> RouteDecision {
        // NOTE(review): assumes `instances` is non-empty (`scored[0]` below
        // panics otherwise) — confirm the driver guarantees this.
        let n = instances.len();
        let l0 = local_l0_scores(req, instances);
        let meta_scores = meta.score_prefix(&req.hash_ids, now, n);
        let family_fp = Self::fingerprint(&req.hash_ids, self.fingerprint_k);
        // Heat *including* this request; `observe` still runs at the end.
        let family_heat = self.active_heat(family_fp, now).saturating_add(1);
        // Where the parent request landed, if we routed it before.
        let parent_home = if req.parent_chat_id >= 0 {
            self.request_home.get(&req.parent_chat_id).copied()
        } else {
            None
        };
        let mut candidates = Vec::with_capacity(n);
        let mut scored = Vec::with_capacity(n);
        let mut best_local_l0 = 0u32;
        for (idx, inst) in instances.iter().enumerate() {
            let l0_hit = l0[idx];
            best_local_l0 = best_local_l0.max(l0_hit);
            // Meta-store knowledge beyond what is already resident locally.
            let meta_only = meta_scores[idx].saturating_sub(l0_hit);
            let queue_len = inst.queue_len();
            // Lower is better: load penalty minus cache-affinity bonuses.
            let cost = self.load_alpha * queue_len as f64
                - self.l0_gamma * l0_hit as f64
                - self.meta_delta * meta_only as f64;
            let rend = Self::rendezvous(family_fp, inst.id);
            candidates.push(CandidateInfo {
                instance: inst.id,
                predicted_prefix: l0_hit,
                load_blocks: inst.kv_blocks_used,
                queue_len,
            });
            scored.push(CandidateCost {
                idx,
                cost,
                l0_hit,
                meta_only,
                queue_len,
                rendezvous: rend,
            });
        }
        // Global optimum under `better`'s ordering.
        let mut global_best = scored[0];
        for cand in scored.iter().copied().skip(1) {
            if Self::better(cand, global_best) {
                global_best = cand;
            }
        }
        let mut chosen = global_best;
        let reason = if let Some(parent_inst) = parent_home {
            // NOTE(review): indexes `scored` by the stored instance *id*,
            // which is only correct if ids equal positions in `instances`
            // (and would panic if the id exceeds the slice length) —
            // confirm this invariant against Instance construction.
            let parent = scored[parent_inst as usize];
            // Stay with the parent unless it is clearly worse than global.
            if parent.cost <= global_best.cost + self.parent_cost_slack {
                chosen = parent;
                "lineage affinity: parent stickiness"
            } else {
                "lineage affinity: parent overloaded, global best"
            }
        } else if best_local_l0 < self.warm_prefix_blocks {
            // Cold / weakly-warm root request: prefer the family home set.
            let fan_out = self.fan_out(family_heat, n);
            // Descending rendezvous order; the top `fan_out` entries are the
            // home set. (`global_best` is a copy, the sort can't break it.)
            scored.sort_unstable_by(|a, b| b.rendezvous.cmp(&a.rendezvous));
            let mut home_best = scored[0];
            for cand in scored.iter().copied().take(fan_out).skip(1) {
                // Same ordering as `better`, with rendezvous rank as the
                // final deterministic tie-break.
                let better = Self::better(cand, home_best)
                    || (cand.cost == home_best.cost
                        && cand.l0_hit == home_best.l0_hit
                        && cand.queue_len == home_best.queue_len
                        && cand.meta_only == home_best.meta_only
                        && cand.rendezvous > home_best.rendezvous);
                if better {
                    home_best = cand;
                }
            }
            if home_best.cost <= global_best.cost + self.homeset_cost_slack {
                chosen = home_best;
                "lineage affinity: family homeset"
            } else {
                "lineage affinity: homeset overloaded, global best"
            }
        } else {
            // Already warm somewhere: trust the plain cache/load score.
            "lineage affinity: warm request global locality"
        };
        self.observe(family_fp, now);
        // Remember this chat's placement for future continuations.
        // NOTE(review): `request_home` is never pruned — unbounded growth.
        self.request_home
            .insert(req.chat_id, instances[chosen.idx].id);
        RouteDecision {
            req_id: req.req_id,
            mode: "lineage_affinity",
            chosen: instances[chosen.idx].id,
            probe_overhead_s: 0.0,
            candidates,
            reason,
        }
    }
}

View File

@@ -1,5 +1,6 @@
//! Cluster-level routing strategies. //! Cluster-level routing strategies.
pub mod adaptive_affinity;
pub mod cache_affinity; pub mod cache_affinity;
pub mod cache_load; pub mod cache_load;
pub mod cache_score; pub mod cache_score;
@@ -7,6 +8,7 @@ pub mod cache_score_ttl;
pub mod estimated_ttft; pub mod estimated_ttft;
pub mod least_loaded; pub mod least_loaded;
pub mod least_tokens; pub mod least_tokens;
pub mod lineage_affinity;
pub mod min_pd; pub mod min_pd;
pub mod precise_aware; pub mod precise_aware;
pub mod prefix_affinity; pub mod prefix_affinity;
@@ -79,13 +81,15 @@ pub fn build(full: &Config, seed: u64) -> Box<dyn Router> {
MinPd => Box::new(min_pd::MinPdRouter::new()) as Box<dyn Router>, MinPd => Box::new(min_pd::MinPdRouter::new()) as Box<dyn Router>,
LeastTokens => Box::new(least_tokens::LeastTokensRouter::new()) as Box<dyn Router>, LeastTokens => Box::new(least_tokens::LeastTokensRouter::new()) as Box<dyn Router>,
CacheLoad => Box::new(cache_load::CacheLoadRouter::new()) as Box<dyn Router>, CacheLoad => Box::new(cache_load::CacheLoadRouter::new()) as Box<dyn Router>,
CacheAffinity => Box::new(cache_affinity::CacheAffinityRouter::new(cfg.load_alpha)) CacheAffinity => Box::new(cache_affinity::CacheAffinityRouter::new(
as Box<dyn Router>, cfg.load_alpha,
cfg.prefix_k,
)) as Box<dyn Router>,
CacheAffinityWeakRend => Box::new( CacheAffinityWeakRend => Box::new(
cache_affinity::CacheAffinityRouter::weak_with_rendezvous(cfg.load_alpha), cache_affinity::CacheAffinityRouter::weak_with_rendezvous(cfg.load_alpha, cfg.prefix_k),
) as Box<dyn Router>, ) as Box<dyn Router>,
CacheAffinityStrongOnly => Box::new( CacheAffinityStrongOnly => Box::new(
cache_affinity::CacheAffinityRouter::strong_no_rendezvous(cfg.load_alpha), cache_affinity::CacheAffinityRouter::strong_no_rendezvous(cfg.load_alpha, cfg.prefix_k),
) as Box<dyn Router>, ) as Box<dyn Router>,
CacheScore => Box::new(cache_score::CacheScoreRouter::new( CacheScore => Box::new(cache_score::CacheScoreRouter::new(
cfg.score_alpha, cfg.score_alpha,
@@ -96,8 +100,9 @@ pub fn build(full: &Config, seed: u64) -> Box<dyn Router> {
// offsets one queue position. Demonstrates how much of cache_affinity's // offsets one queue position. Demonstrates how much of cache_affinity's
// gain is reproducible by just retuning β (no rendezvous, no meta-store // gain is reproducible by just retuning β (no rendezvous, no meta-store
// bonus). // bonus).
CacheScoreStrong => Box::new(cache_score::CacheScoreRouter::new(1.0, 1.0)) CacheScoreStrong => {
as Box<dyn Router>, Box::new(cache_score::CacheScoreRouter::new(1.0, 1.0)) as Box<dyn Router>
}
CacheScoreTtl => Box::new(cache_score_ttl::CacheScoreTtlRouter::new( CacheScoreTtl => Box::new(cache_score_ttl::CacheScoreTtlRouter::new(
cfg.score_alpha, cfg.score_alpha,
cfg.score_beta, cfg.score_beta,
@@ -108,6 +113,12 @@ pub fn build(full: &Config, seed: u64) -> Box<dyn Router> {
PrefixAffinity => { PrefixAffinity => {
Box::new(prefix_affinity::PrefixAffinityRouter::new(full)) as Box<dyn Router> Box::new(prefix_affinity::PrefixAffinityRouter::new(full)) as Box<dyn Router>
} }
AdaptiveAffinity => {
Box::new(adaptive_affinity::AdaptiveAffinityRouter::new(full)) as Box<dyn Router>
}
LineageAffinity => {
Box::new(lineage_affinity::LineageAffinityRouter::new(full)) as Box<dyn Router>
}
} }
} }
@@ -210,6 +221,8 @@ mod tests {
RequestRecord { RequestRecord {
req_id: 1, req_id: 1,
chat_id: 0, chat_id: 0,
parent_chat_id: -1,
turn: 1,
arrival: 0.0, arrival: 0.0,
input_len: hashes.len() as u32 * 16, input_len: hashes.len() as u32 * 16,
output_len: 16, output_len: 16,
@@ -317,7 +330,10 @@ mod tests {
assert_eq!(cache_score.route(&req, &instances, &meta, 0.0).chosen, 1); assert_eq!(cache_score.route(&req, &instances, &meta, 0.0).chosen, 1);
let mut cache_score_ttl = CacheScoreTtlRouter::new(0.0, 1.0); let mut cache_score_ttl = CacheScoreTtlRouter::new(0.0, 1.0);
assert_eq!(cache_score_ttl.route(&req, &instances, &meta, 0.0).chosen, 0); assert_eq!(
cache_score_ttl.route(&req, &instances, &meta, 0.0).chosen,
0
);
} }
#[test] #[test]

View File

@@ -21,12 +21,16 @@ struct RawRecord {
#[serde(default)] #[serde(default)]
chat_id: i64, chat_id: i64,
#[serde(default)] #[serde(default)]
parent_chat_id: i64,
#[serde(default)]
timestamp: f64, timestamp: f64,
#[serde(default)] #[serde(default)]
input_length: i64, input_length: i64,
#[serde(default)] #[serde(default)]
output_length: i64, output_length: i64,
#[serde(default)] #[serde(default)]
turn: i64,
#[serde(default)]
hash_ids: Vec<i64>, hash_ids: Vec<i64>,
} }
@@ -34,6 +38,8 @@ struct RawRecord {
pub struct RequestRecord { pub struct RequestRecord {
pub req_id: u64, pub req_id: u64,
pub chat_id: i64, pub chat_id: i64,
pub parent_chat_id: i64,
pub turn: i64,
pub arrival: f64, pub arrival: f64,
pub input_len: u32, pub input_len: u32,
pub output_len: u32, pub output_len: u32,
@@ -88,6 +94,8 @@ impl Iterator for TraceReader {
return Some(Ok(RequestRecord { return Some(Ok(RequestRecord {
req_id: id, req_id: id,
chat_id: raw.chat_id, chat_id: raw.chat_id,
parent_chat_id: raw.parent_chat_id,
turn: raw.turn,
arrival: raw.timestamp, arrival: raw.timestamp,
input_len: raw.input_length.max(0) as u32, input_len: raw.input_length.max(0) as u32,
output_len: raw.output_length.max(0) as u32, output_len: raw.output_length.max(0) as u32,

View File

@@ -79,7 +79,10 @@ impl TtftModel {
return 0.0; return 0.0;
} }
let bytes = self.block_bytes(blocks); let bytes = self.block_bytes(blocks);
self.remote_metadata_s + self.rdma_cost_s(bytes) + self.pcie_cost_s(bytes) + self.layout_transform_s self.remote_metadata_s
+ self.rdma_cost_s(bytes)
+ self.pcie_cost_s(bytes)
+ self.layout_transform_s
} }
pub fn pcie_cost_s(&self, bytes: u64) -> f64 { pub fn pcie_cost_s(&self, bytes: u64) -> f64 {

View File

@@ -221,3 +221,54 @@ fn ablate_rejects_belady_until_exact_algorithm_exists() {
"unexpected error: {err:#}" "unexpected error: {err:#}"
); );
} }
/// Running the ablation with a worker pool must be indistinguishable from the
/// serial run: same ordering, same metrics, bit-level tolerances.
#[test]
fn ablation_parallel_matches_serial() {
    let work_dir = std::env::temp_dir().join("kvcache_sim_ablate_parallel");
    let _ = std::fs::remove_dir_all(&work_dir);
    std::fs::create_dir_all(&work_dir).unwrap();
    let trace = work_dir.join("trace.jsonl");
    write_synthetic_trace(&trace);
    let cfg = base_config(
        trace.to_str().unwrap(),
        work_dir.to_str().unwrap(),
        RouterMode::Random,
    );
    let routers = [
        RouterMode::Random,
        RouterMode::LeastLoaded,
        RouterMode::TtlAware,
        RouterMode::Precise,
    ];
    // Identical workload; only the degree of parallelism differs.
    let run = |jobs: usize| {
        driver::ablate_fixed_placement_with_parallelism(
            &cfg,
            &routers,
            &[ReplayEvictPolicy::Lru],
            jobs,
        )
    };
    let serial = run(1).expect("serial ablate");
    let parallel = run(2).expect("parallel ablate");
    assert_eq!(parallel.len(), serial.len());
    for (par, ser) in parallel.iter().zip(serial.iter()) {
        assert_eq!(par.router, ser.router);
        assert_eq!(par.evict_policy, ser.evict_policy);
        assert_eq!(par.placement_source, ser.placement_source);
        // TTFT stats to 1e-9, hit/miss rates to 1e-12.
        for (lhs, rhs, tol) in [
            (par.ttft_mean, ser.ttft_mean, 1e-9),
            (par.ttft_p50, ser.ttft_p50, 1e-9),
            (par.ttft_p95, ser.ttft_p95, 1e-9),
            (par.ttft_p99, ser.ttft_p99, 1e-9),
            (par.hit_rate_l0, ser.hit_rate_l0, 1e-12),
            (par.hit_rate_l1, ser.hit_rate_l1, 1e-12),
            (par.hit_rate_remote, ser.hit_rate_remote, 1e-12),
            (par.miss_rate, ser.miss_rate, 1e-12),
        ] {
            assert!((lhs - rhs).abs() < tol);
        }
    }
}