From 996511f3003cf48a44bf183108c25336b2c46373 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Thu, 16 Apr 2026 14:23:53 +0800 Subject: [PATCH] feat: new router and benchmark setup --- README.md | 3 + configs/glm5-fp8-8xh20-141g-0-32k-n56.yaml | 35 +++ configs/glm5-fp8-8xh20-141g-ca-tuned.yaml | 38 +++ configs/glm5-fp8-8xh20-141g-l1-medium.yaml | 35 +++ configs/glm5-fp8-8xh20-141g-l1-none.yaml | 35 +++ configs/glm5-fp8-8xh20-141g-l1-small.yaml | 35 +++ configs/glm5-fp8-8xh20-141g.yaml | 39 +++ configs/glm5-nvfp4-8xb200-32k-85k-n5.yaml | 36 +++ configs/glm5-nvfp4-8xb200.yaml | 36 +++ configs/glm5-nvfp4-8xb300-128k-plus-n1.yaml | 37 +++ configs/glm5-nvfp4-8xb300-85k-128k-n2.yaml | 36 +++ configs/glm5-nvfp4-8xb300.yaml | 3 +- ...m5-nvfp4-fp8compute-8xb200-32k-85k-n9.yaml | 32 +++ configs/glm5-nvfp4-fp8compute-8xb200.yaml | 32 +++ ...-nvfp4-fp8compute-8xb300-128k-plus-n1.yaml | 33 +++ ...5-nvfp4-fp8compute-8xb300-85k-128k-n4.yaml | 32 +++ configs/glm5-nvfp4-fp8compute-8xb300.yaml | 32 +++ models/GLM-5-FP8/config.json | 68 +++++ src/cluster/cluster.rs | 13 +- src/config.rs | 106 +++++++- src/driver.rs | 140 ++++++++-- src/hardware_presets.rs | 30 ++- src/instance/compute.rs | 4 +- src/instance/instance.rs | 22 +- src/lib.rs | 2 +- src/main.rs | 27 +- src/oracle.rs | 2 + src/router/adaptive_affinity.rs | 254 ++++++++++++++++++ src/router/cache_affinity.rs | 12 +- src/router/estimated_ttft.rs | 10 +- src/router/lineage_affinity.rs | 243 +++++++++++++++++ src/router/mod.rs | 30 ++- src/trace.rs | 8 + src/ttft.rs | 5 +- tests/smoke.rs | 51 ++++ 35 files changed, 1480 insertions(+), 76 deletions(-) create mode 100644 configs/glm5-fp8-8xh20-141g-0-32k-n56.yaml create mode 100644 configs/glm5-fp8-8xh20-141g-ca-tuned.yaml create mode 100644 configs/glm5-fp8-8xh20-141g-l1-medium.yaml create mode 100644 configs/glm5-fp8-8xh20-141g-l1-none.yaml create mode 100644 configs/glm5-fp8-8xh20-141g-l1-small.yaml create mode 100644 configs/glm5-fp8-8xh20-141g.yaml create mode 100644 configs/glm5-nvfp4-8xb200-32k-85k-n5.yaml create mode 100644 configs/glm5-nvfp4-8xb200.yaml create mode 100644 configs/glm5-nvfp4-8xb300-128k-plus-n1.yaml create mode 100644 configs/glm5-nvfp4-8xb300-85k-128k-n2.yaml create mode 100644 configs/glm5-nvfp4-fp8compute-8xb200-32k-85k-n9.yaml create mode 100644 configs/glm5-nvfp4-fp8compute-8xb200.yaml create mode 100644 configs/glm5-nvfp4-fp8compute-8xb300-128k-plus-n1.yaml create mode 100644 configs/glm5-nvfp4-fp8compute-8xb300-85k-128k-n4.yaml create mode 100644 configs/glm5-nvfp4-fp8compute-8xb300.yaml create mode 100644 models/GLM-5-FP8/config.json create mode 100644 src/router/adaptive_affinity.rs create mode 100644 src/router/lineage_affinity.rs diff --git a/README.md b/README.md index 7b9d42b..9206b36 100644 --- a/README.md +++ b/README.md @@ -227,6 +227,7 @@ coefficients (`flops_per_token_prefill`, `attn_quadratic_coeff`, etc.). | Model | Path | Architecture | |-------|------|--------------| | GLM-5 (744B/40B-active) | `models/GLM-5/config.json` | MoE (256 routed, 8 active, 1 shared) + MLA + DSA | +| GLM-5-FP8 | `models/GLM-5-FP8/config.json` | GLM-5 architecture + upstream FP8 quantization metadata | | Qwen3-Coder-480B-A35B FP8 | `models/Qwen3-Coder-480B-A35B-Instruct-FP8/config.json` | MoE (160 experts, 8 active) + GQA | ## Hardware configuration @@ -248,6 +249,7 @@ Available presets: | `h100` | 989 TFLOPS | 80 GB | 3.35 TB/s | Gen5 | | `h800` | 989 TFLOPS | 80 GB | 3.35 TB/s | Gen5 | | `h20` | 148 TFLOPS | 96 GB | 4.0 TB/s | Gen5 | +| `h20-141g` | 148 TFLOPS | 141 GB | 4.8 TB/s | Gen5 | | `a100-80gb` | 312 TFLOPS | 80 GB | 2.0 TB/s | Gen4 | | `a100-40gb` | 312 TFLOPS | 40 GB | 1.555 TB/s | Gen4 | | `b200` | 2.25 PFLOPS| 192 GB | 8.0 TB/s | Gen6 | @@ -297,6 +299,7 @@ memory_time = layers * weight_bytes_per_layer / gpu_mem_bw | Config | Model | Hardware | Instances | Trace | |--------|-------|----------|-----------|-------| | `glm5-8xb200-hf.yaml` | GLM-5 via HF config.json | 8xB200 preset | 32 | GLM coder blk512 | +| `glm5-fp8-8xh20-141g.yaml` | GLM-5-FP8 via ModelScope config.json | 8xH20-141G preset | 128 | GLM coder blk512 | | `glm5-nvfp4-8xb300.yaml` | GLM-5-NVFP4 via HF config.json | 8xB300 preset | 8 | GLM coder blk512 | | `qwen3-coder-480b-8xh20.yaml` | Qwen3-Coder via HF | 8xH20 preset | 32 | Qwen coder blk16 | diff --git a/configs/glm5-fp8-8xh20-141g-0-32k-n56.yaml b/configs/glm5-fp8-8xh20-141g-0-32k-n56.yaml new file mode 100644 index 0000000..89f5e72 --- /dev/null +++ b/configs/glm5-fp8-8xh20-141g-0-32k-n56.yaml @@ -0,0 +1,35 @@ +# GLM-5-FP8 on 8 x H20-141G for the 0-32k bucket. +# Chosen to keep the best policy's mean TTFT below 5s. + +model: + config_json: ../models/GLM-5-FP8/config.json + name: glm-5-fp8 + compute_dtype: fp8 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xh20-141g + hbm_bytes: 300.0e9 + dram_bytes: 1.5e12 + max_batch_slots: 256 + +cluster: + num_instances: 56 + meta_store: + ttl_seconds: 300.0 + router: + mode: cache_affinity + precise_probe_latency_us: 50.0 + precise_probe_topk: 4 + load_alpha: 1.0 + prefix_k: 8 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_fp8_8xh20_141g_ablation_0_32768_n56 + sample_interval_s: 1.0 + seed: 42 + input_length_min: 0 + input_length_max: 32768 diff --git a/configs/glm5-fp8-8xh20-141g-ca-tuned.yaml b/configs/glm5-fp8-8xh20-141g-ca-tuned.yaml new file mode 100644 index 0000000..6d39ad8 --- /dev/null +++ b/configs/glm5-fp8-8xh20-141g-ca-tuned.yaml @@ -0,0 +1,38 @@ +# GLM-5-FP8 (ZhipuAI/GLM-5-FP8) on 8 x H20-141G (N3E). +# Tuned for the 0-32768 input-length slice of +# bailian-traces/glm_coder_blksz_512_040915-040917.jsonl. + +model: + config_json: ../models/GLM-5-FP8/config.json + name: glm-5-fp8 + compute_dtype: fp8 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xh20-141g + hbm_bytes: 300.0e9 + dram_bytes: 1.5e12 + max_batch_slots: 256 + +cluster: + num_instances: 64 + meta_store: + ttl_seconds: 300.0 + router: + mode: cache_affinity + precise_probe_latency_us: 50.0 + precise_probe_topk: 4 + # Tuned on this filtered GLM coder workload: stronger queue penalty than + # the default 1.0 keeps cache_affinity's locality gains while reducing TTFT. + load_alpha: 1.5 + prefix_k: 8 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_fp8_8xh20_141g_ca_tuned + sample_interval_s: 1.0 + seed: 42 + input_length_min: 0 + input_length_max: 32768 diff --git a/configs/glm5-fp8-8xh20-141g-l1-medium.yaml b/configs/glm5-fp8-8xh20-141g-l1-medium.yaml new file mode 100644 index 0000000..4bb3ea7 --- /dev/null +++ b/configs/glm5-fp8-8xh20-141g-l1-medium.yaml @@ -0,0 +1,35 @@ +# GLM-5-FP8 on 8 x H20-141G, 0-32768 slice. +# Analysis config: medium L1 (~10% of the default DRAM KV budget). + +model: + config_json: ../models/GLM-5-FP8/config.json + name: glm-5-fp8 + compute_dtype: fp8 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xh20-141g + hbm_bytes: 300.0e9 + dram_bytes: 1.5e11 + max_batch_slots: 256 + +cluster: + num_instances: 64 + meta_store: + ttl_seconds: 300.0 + router: + mode: min_pd + precise_probe_latency_us: 50.0 + precise_probe_topk: 4 + load_alpha: 1.0 + prefix_k: 8 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_fp8_8xh20_141g_l1_medium + sample_interval_s: 1.0 + seed: 42 + input_length_min: 0 + input_length_max: 32768 diff --git a/configs/glm5-fp8-8xh20-141g-l1-none.yaml b/configs/glm5-fp8-8xh20-141g-l1-none.yaml new file mode 100644 index 0000000..88a3475 --- /dev/null +++ b/configs/glm5-fp8-8xh20-141g-l1-none.yaml @@ -0,0 +1,35 @@ +# GLM-5-FP8 on 8 x H20-141G, 0-32768 slice. +# Analysis config: effectively disable L1/remote KV by shrinking DRAM to ~1 block. + +model: + config_json: ../models/GLM-5-FP8/config.json + name: glm-5-fp8 + compute_dtype: fp8 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xh20-141g + hbm_bytes: 300.0e9 + dram_bytes: 1.0 + max_batch_slots: 256 + +cluster: + num_instances: 64 + meta_store: + ttl_seconds: 300.0 + router: + mode: min_pd + precise_probe_latency_us: 50.0 + precise_probe_topk: 4 + load_alpha: 1.0 + prefix_k: 8 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_fp8_8xh20_141g_l1_none + sample_interval_s: 1.0 + seed: 42 + input_length_min: 0 + input_length_max: 32768 diff --git a/configs/glm5-fp8-8xh20-141g-l1-small.yaml b/configs/glm5-fp8-8xh20-141g-l1-small.yaml new file mode 100644 index 0000000..773d6d2 --- /dev/null +++ b/configs/glm5-fp8-8xh20-141g-l1-small.yaml @@ -0,0 +1,35 @@ +# GLM-5-FP8 on 8 x H20-141G, 0-32768 slice. +# Analysis config: small L1 (~1% of the default DRAM KV budget). + +model: + config_json: ../models/GLM-5-FP8/config.json + name: glm-5-fp8 + compute_dtype: fp8 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xh20-141g + hbm_bytes: 300.0e9 + dram_bytes: 1.5e10 + max_batch_slots: 256 + +cluster: + num_instances: 64 + meta_store: + ttl_seconds: 300.0 + router: + mode: min_pd + precise_probe_latency_us: 50.0 + precise_probe_topk: 4 + load_alpha: 1.0 + prefix_k: 8 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_fp8_8xh20_141g_l1_small + sample_interval_s: 1.0 + seed: 42 + input_length_min: 0 + input_length_max: 32768 diff --git a/configs/glm5-fp8-8xh20-141g.yaml b/configs/glm5-fp8-8xh20-141g.yaml new file mode 100644 index 0000000..c4a30f0 --- /dev/null +++ b/configs/glm5-fp8-8xh20-141g.yaml @@ -0,0 +1,39 @@ +# GLM-5-FP8 (ZhipuAI/GLM-5-FP8) on 8 x H20-141G (N3E). +# Architecture auto-loaded from the upstream ModelScope config.json. +# +# 8 x 141 GB = 1128 GB total HBM. With ~744 GB FP8 weights resident, +# keep the KV budget conservative to leave room for scales, BF16 holdouts, +# allocator slack, and runtime activations. + +model: + config_json: ../models/GLM-5-FP8/config.json + name: glm-5-fp8 + compute_dtype: fp8 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xh20-141g + hbm_bytes: 300.0e9 + dram_bytes: 1.5e12 + max_batch_slots: 256 + +cluster: + num_instances: 64 + meta_store: + ttl_seconds: 300.0 + router: + mode: min_pd + precise_probe_latency_us: 50.0 + precise_probe_topk: 4 + load_alpha: 1.0 + prefix_k: 8 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_fp8_8xh20_141g + sample_interval_s: 1.0 + seed: 42 + input_length_min: 0 + input_length_max: 32768 diff --git a/configs/glm5-nvfp4-8xb200-32k-85k-n5.yaml b/configs/glm5-nvfp4-8xb200-32k-85k-n5.yaml new file mode 100644 index 0000000..4415767 --- /dev/null +++ b/configs/glm5-nvfp4-8xb200-32k-85k-n5.yaml @@ -0,0 +1,36 @@ +# GLM-5-NVFP4 on 8 x B200 for the 32k-85k bucket. +# Chosen to keep the best policy's mean TTFT below 5s. + +model: + config_json: ../models/GLM-5-NVFP4/config.json + name: glm-5-nvfp4 + compute_dtype: fp8 + weight_dtype: fp4 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xb200 + hbm_bytes: 1150.0e9 + dram_bytes: 1.5e12 + max_batch_slots: 256 + +cluster: + num_instances: 5 + meta_store: + ttl_seconds: 300.0 + router: + mode: cache_affinity + precise_probe_latency_us: 50.0 + precise_probe_topk: 4 + load_alpha: 1.0 + prefix_k: 8 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_nvfp4_8xb200_ablation_32769_87040_n5 + sample_interval_s: 1.0 + seed: 42 + input_length_min: 32769 + input_length_max: 87040 diff --git a/configs/glm5-nvfp4-8xb200.yaml b/configs/glm5-nvfp4-8xb200.yaml new file mode 100644 index 0000000..e1fe509 --- /dev/null +++ b/configs/glm5-nvfp4-8xb200.yaml @@ -0,0 +1,36 @@ +# GLM-5-NVFP4 (nvidia/GLM-5-NVFP4) on 8 x B200 (192GB each). +# Architecture auto-loaded from HuggingFace config.json. +# +# FP4 weights: ~744B params * 0.5 bytes = ~372 GB across 8 GPUs. +# Total HBM: 8 * 192 GB = 1536 GB. Keep the KV budget below the raw +# remainder to leave room for runtime activations and allocator slack. + +model: + config_json: ../models/GLM-5-NVFP4/config.json + name: glm-5-nvfp4 + compute_dtype: fp8 + weight_dtype: fp4 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xb200 + hbm_bytes: 1150.0e9 + dram_bytes: 1.5e12 + max_batch_slots: 256 + +cluster: + num_instances: 8 + meta_store: + ttl_seconds: 300.0 + router: + mode: prefix_affinity + prefix_k: 8 + load_alpha: 1.0 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_nvfp4_8xb200 + sample_interval_s: 1.0 + seed: 42 diff --git a/configs/glm5-nvfp4-8xb300-128k-plus-n1.yaml b/configs/glm5-nvfp4-8xb300-128k-plus-n1.yaml new file mode 100644 index 0000000..81d1fe3 --- /dev/null +++ b/configs/glm5-nvfp4-8xb300-128k-plus-n1.yaml @@ -0,0 +1,37 @@ +# GLM-5-NVFP4 on 8 x B300 for the 128k++ bucket. +# A single instance already keeps mean TTFT below 5s, and routing is +# effectively irrelevant at N=1 because every request lands on the same node. + +model: + config_json: ../models/GLM-5-NVFP4/config.json + name: glm-5-nvfp4 + compute_dtype: fp8 + weight_dtype: fp4 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xb300 + hbm_bytes: 1900.0e9 + dram_bytes: 1.5e12 + max_batch_slots: 256 + +cluster: + num_instances: 1 + meta_store: + ttl_seconds: 300.0 + router: + mode: cache_affinity + precise_probe_latency_us: 50.0 + precise_probe_topk: 1 + load_alpha: 1.0 + prefix_k: 8 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_nvfp4_8xb300_ablation_131073_plus_n1 + sample_interval_s: 1.0 + seed: 42 + input_length_min: 131073 + input_length_max: 4294967295 diff --git a/configs/glm5-nvfp4-8xb300-85k-128k-n2.yaml b/configs/glm5-nvfp4-8xb300-85k-128k-n2.yaml new file mode 100644 index 0000000..928248c --- /dev/null +++ b/configs/glm5-nvfp4-8xb300-85k-128k-n2.yaml @@ -0,0 +1,36 @@ +# GLM-5-NVFP4 on 8 x B300 for the 85k-128k bucket. +# Chosen to keep the best policy's mean TTFT below 5s. + +model: + config_json: ../models/GLM-5-NVFP4/config.json + name: glm-5-nvfp4 + compute_dtype: fp8 + weight_dtype: fp4 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xb300 + hbm_bytes: 1900.0e9 + dram_bytes: 1.5e12 + max_batch_slots: 256 + +cluster: + num_instances: 2 + meta_store: + ttl_seconds: 300.0 + router: + mode: cache_affinity_strong_only + precise_probe_latency_us: 50.0 + precise_probe_topk: 4 + load_alpha: 1.0 + prefix_k: 8 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_nvfp4_8xb300_ablation_87041_131072_n2 + sample_interval_s: 1.0 + seed: 42 + input_length_min: 87041 + input_length_max: 131072 diff --git a/configs/glm5-nvfp4-8xb300.yaml b/configs/glm5-nvfp4-8xb300.yaml index 99398ed..7421158 100644 --- a/configs/glm5-nvfp4-8xb300.yaml +++ b/configs/glm5-nvfp4-8xb300.yaml @@ -7,7 +7,8 @@ model: config_json: ../models/GLM-5-NVFP4/config.json name: glm-5-nvfp4 - compute_dtype: fp4 # FP4 weights → selects FP4 tensor core FLOPS + compute_dtype: fp8 # FP8 tensor-core execution + weight_dtype: fp4 # NVFP4 weights still set the HBM budget dtype_bytes: 1 # FP8 KV cache block_size_tokens: 512 diff --git a/configs/glm5-nvfp4-fp8compute-8xb200-32k-85k-n9.yaml b/configs/glm5-nvfp4-fp8compute-8xb200-32k-85k-n9.yaml new file mode 100644 index 0000000..14e2f46 --- /dev/null +++ b/configs/glm5-nvfp4-fp8compute-8xb200-32k-85k-n9.yaml @@ -0,0 +1,32 @@ +# GLM-5-NVFP4 on 8 x B200 for the 32k-85k bucket. +# NVFP4 weights, FP8 compute. Chosen to keep the best policy below 5 s TTFT. + +model: + config_json: ../models/GLM-5-NVFP4/config.json + name: glm-5-nvfp4 + compute_dtype: fp8 + weight_dtype: fp4 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xb200 + hbm_bytes: 1150.0e9 + dram_bytes: 1.5e12 + max_batch_slots: 256 + +cluster: + num_instances: 9 + meta_store: + ttl_seconds: 300.0 + router: + mode: min_pd + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_nvfp4_fp8compute_8xb200_ablation_32769_87040_n9 + sample_interval_s: 1.0 + seed: 42 + input_length_min: 32769 + input_length_max: 87040 diff --git a/configs/glm5-nvfp4-fp8compute-8xb200.yaml b/configs/glm5-nvfp4-fp8compute-8xb200.yaml new file mode 100644 index 0000000..8e35cbd --- /dev/null +++ b/configs/glm5-nvfp4-fp8compute-8xb200.yaml @@ -0,0 +1,32 @@ +# GLM-5-NVFP4 on 8 x B200 with FP8 tensor-core compute. +# Weights remain stored in NVFP4, so HBM budget follows FP4 storage. + +model: + config_json: ../models/GLM-5-NVFP4/config.json + name: glm-5-nvfp4 + compute_dtype: fp8 + weight_dtype: fp4 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xb200 + hbm_bytes: 1150.0e9 + dram_bytes: 1.5e12 + max_batch_slots: 256 + +cluster: + num_instances: 8 + meta_store: + ttl_seconds: 300.0 + router: + mode: prefix_affinity + prefix_k: 8 + load_alpha: 1.0 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_nvfp4_fp8compute_8xb200 + sample_interval_s: 1.0 + seed: 42 diff --git a/configs/glm5-nvfp4-fp8compute-8xb300-128k-plus-n1.yaml b/configs/glm5-nvfp4-fp8compute-8xb300-128k-plus-n1.yaml new file mode 100644 index 0000000..17cda40 --- /dev/null +++ b/configs/glm5-nvfp4-fp8compute-8xb300-128k-plus-n1.yaml @@ -0,0 +1,33 @@ +# GLM-5-NVFP4 on 8 x B300 for the 128k++ bucket. +# NVFP4 weights, FP8 compute. Routing is effectively irrelevant at one instance. + +model: + config_json: ../models/GLM-5-NVFP4/config.json + name: glm-5-nvfp4 + compute_dtype: fp8 + weight_dtype: fp4 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xb300 + hbm_bytes: 1900.0e9 + dram_bytes: 1.5e12 + max_batch_slots: 256 + +cluster: + num_instances: 1 + meta_store: + ttl_seconds: 300.0 + router: + mode: cache_affinity + precise_probe_topk: 1 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_nvfp4_fp8compute_8xb300_ablation_131073_plus_n1 + sample_interval_s: 1.0 + seed: 42 + input_length_min: 131073 + input_length_max: 4294967295 diff --git a/configs/glm5-nvfp4-fp8compute-8xb300-85k-128k-n4.yaml b/configs/glm5-nvfp4-fp8compute-8xb300-85k-128k-n4.yaml new file mode 100644 index 0000000..be61e23 --- /dev/null +++ b/configs/glm5-nvfp4-fp8compute-8xb300-85k-128k-n4.yaml @@ -0,0 +1,32 @@ +# GLM-5-NVFP4 on 8 x B300 for the 85k-128k bucket. +# NVFP4 weights, FP8 compute. Chosen to keep the best policy below 5 s TTFT. + +model: + config_json: ../models/GLM-5-NVFP4/config.json + name: glm-5-nvfp4 + compute_dtype: fp8 + weight_dtype: fp4 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xb300 + hbm_bytes: 1900.0e9 + dram_bytes: 1.5e12 + max_batch_slots: 256 + +cluster: + num_instances: 4 + meta_store: + ttl_seconds: 300.0 + router: + mode: cache_affinity + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_nvfp4_fp8compute_8xb300_ablation_87041_131072_n4 + sample_interval_s: 1.0 + seed: 42 + input_length_min: 87041 + input_length_max: 131072 diff --git a/configs/glm5-nvfp4-fp8compute-8xb300.yaml b/configs/glm5-nvfp4-fp8compute-8xb300.yaml new file mode 100644 index 0000000..efba60e --- /dev/null +++ b/configs/glm5-nvfp4-fp8compute-8xb300.yaml @@ -0,0 +1,32 @@ +# GLM-5-NVFP4 on 8 x B300 with FP8 tensor-core compute. +# Weights remain stored in NVFP4, so HBM budget follows FP4 storage. + +model: + config_json: ../models/GLM-5-NVFP4/config.json + name: glm-5-nvfp4 + compute_dtype: fp8 + weight_dtype: fp4 + dtype_bytes: 1 + block_size_tokens: 512 + +hardware: + type: 8xb300 + hbm_bytes: 1900.0e9 + dram_bytes: 1.5e12 + max_batch_slots: 256 + +cluster: + num_instances: 8 + meta_store: + ttl_seconds: 300.0 + router: + mode: prefix_affinity + prefix_k: 8 + load_alpha: 1.0 + +sim: + trace_path: bailian-traces/glm_coder_blksz_512_040915-040917.jsonl + max_requests: null + output_dir: runs/glm5_nvfp4_fp8compute_8xb300 + sample_interval_s: 1.0 + seed: 42 diff --git a/models/GLM-5-FP8/config.json b/models/GLM-5-FP8/config.json new file mode 100644 index 0000000..1948d70 --- /dev/null +++ b/models/GLM-5-FP8/config.json @@ -0,0 +1,68 @@ +{ + "architectures": [ + "GlmMoeDsaForCausalLM" + ], + "attention_bias": false, + "attention_dropout": 0.0, + "dtype": "bfloat16", + "eos_token_id": [ + 154820, + 154827, + 154829 + ], + "ep_size": 1, + "first_k_dense_replace": 3, + "hidden_act": "silu", + "head_dim": 64, + "hidden_size": 6144, + "index_head_dim": 128, + "index_n_heads": 32, + "index_topk": 2048, + "indexer_rope_interleave": true, + "initializer_range": 0.02, + "intermediate_size": 12288, + "kv_lora_rank": 512, + "max_position_embeddings": 202752, + "moe_intermediate_size": 2048, + "moe_layer_freq": 1, + "model_type": "glm_moe_dsa", + "n_group": 1, + "n_routed_experts": 256, + "n_shared_experts": 1, + "norm_topk_prob": true, + "num_attention_heads": 64, + "num_experts_per_tok": 8, + "num_hidden_layers": 78, + "num_key_value_heads": 64, + "num_nextn_predict_layers": 1, + "pad_token_id": 154820, + "pretraining_tp": 1, + "q_lora_rank": 2048, + "qk_head_dim": 256, + "qk_nope_head_dim": 192, + "qk_rope_head_dim": 64, + "quantization_config": { + "activation_scheme": "dynamic", + "fmt": "e4m3", + "quant_method": "fp8", + "weight_block_size": [ + 128, + 128 + ] + }, + "rms_norm_eps": 1e-05, + "rope_interleave": true, + "rope_parameters": { + "rope_theta": 1000000, + "rope_type": "default" + }, + "routed_scaling_factor": 2.5, + "scoring_func": "sigmoid", + "tie_word_embeddings": false, + "topk_group": 1, + "topk_method": "noaux_tc", + "transformers_version": "5.0.2.dev0", + "use_cache": true, + "v_head_dim": 256, + "vocab_size": 154880 +} diff --git a/src/cluster/cluster.rs b/src/cluster/cluster.rs index 2d3a98e..9927cdc 100644 --- a/src/cluster/cluster.rs +++ b/src/cluster/cluster.rs @@ -54,7 +54,11 @@ impl Cluster { router, block_size_tokens: model.block_size_tokens, kv_block_bytes: model.kv_block_bytes(), - ttft_model: TtftModel::new(&config.hardware, &config.calibration, model.kv_block_bytes()), + ttft_model: TtftModel::new( + &config.hardware, + &config.calibration, + model.kv_block_bytes(), + ), } } @@ -256,6 +260,8 @@ mod tests { let req = RequestRecord { req_id: 1, chat_id: 0, + parent_chat_id: -1, + turn: 1, arrival: 0.0, input_len: 32, output_len: 16, @@ -269,7 +275,10 @@ mod tests { .insert_blocks(&req.hash_ids, &mut evicted); let stats = cluster.route_and_admit(&req, 0.0); - let pure_pcie = cluster.instances[0].links.pcie.cost(cluster.kv_block_bytes * 2); + let pure_pcie = cluster.instances[0] + .links + .pcie + .cost(cluster.kv_block_bytes * 2); assert!(stats.ready_at > pure_pcie); } diff --git a/src/config.rs b/src/config.rs index f922a0d..562ac5e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -59,12 +59,17 @@ pub struct ModelConfig { #[serde(default)] pub attention: Option, - /// Compute / weight precision: `"bf16"` (default), `"fp8"`, or `"fp4"`. - /// Controls which hardware FLOPS tier to use (`gpu_fp4_flops`, etc.) and - /// the weight-bytes-per-parameter for the memory-bound roofline check. + /// Compute precision: `"bf16"` (default), `"fp8"`, or `"fp4"`. + /// Controls which hardware FLOPS tier to use (`gpu_fp4_flops`, etc.). /// Independent of `dtype_bytes`, which sizes the KV cache. #[serde(default)] pub compute_dtype: Option, + /// Weight storage precision: `"bf16"`, `"fp8"`, or `"fp4"`. + /// When absent, falls back to `compute_dtype` for backward compatibility. + /// This lets the simulator represent deployments such as FP4/NVFP4 + /// weights with FP8 tensor-core execution. + #[serde(default)] + pub weight_dtype: Option, // -- Legacy manual coefficients (used when hidden_size is absent) --------- #[serde(default)] @@ -88,20 +93,33 @@ impl ModelConfig { self.hidden_size.is_some() } - /// Bytes per parameter for weight storage, derived from `compute_dtype`. - /// - /// - `"fp4"` → 0.5 - /// - `"fp8"` → 1.0 - /// - `"bf16"` / absent → `dtype_bytes` (backward-compatible) - pub fn weight_dtype_bytes(&self) -> f64 { - match self.compute_dtype.as_deref() { - Some("fp4") => 0.5, - Some("fp8") => 1.0, - Some("bf16") => 2.0, - _ => self.dtype_bytes as f64, // backward compat + fn precision_bytes(dtype: &str) -> Option { + match dtype { + "fp4" => Some(0.5), + "fp8" => Some(1.0), + "bf16" => Some(2.0), + _ => None, } } + /// Bytes per parameter for weight storage. + /// + /// Priority: + /// - `weight_dtype` + /// - `compute_dtype` (backward compatibility) + /// - `dtype_bytes` fallback + pub fn weight_dtype_bytes(&self) -> f64 { + self.weight_dtype + .as_deref() + .and_then(Self::precision_bytes) + .or_else(|| { + self.compute_dtype + .as_deref() + .and_then(Self::precision_bytes) + }) + .unwrap_or(self.dtype_bytes as f64) + } + /// Bytes of KV cache per block. /// /// For standard / GQA: `2 * L * kv_heads * head_dim * dtype * block_tokens` @@ -433,6 +451,8 @@ pub enum RouterMode { CacheScoreTtl, EstimatedTtft, PrefixAffinity, + AdaptiveAffinity, + LineageAffinity, } impl RouterMode { @@ -454,6 +474,8 @@ impl RouterMode { "cache_score_ttl" | "csttl" | "cs_ttl" => Ok(Self::CacheScoreTtl), "estimated_ttft" | "ettft" | "optimal" => Ok(Self::EstimatedTtft), "prefix_affinity" | "affinity" | "pa" => Ok(Self::PrefixAffinity), + "adaptive_affinity" | "aa" => Ok(Self::AdaptiveAffinity), + "lineage_affinity" | "la" => Ok(Self::LineageAffinity), other => Err(anyhow::anyhow!("unknown router mode: {other}")), } } @@ -476,6 +498,8 @@ impl RouterMode { Self::CacheScoreTtl => "cache_score_ttl", Self::EstimatedTtft => "estimated_ttft", Self::PrefixAffinity => "prefix_affinity", + Self::AdaptiveAffinity => "adaptive_affinity", + Self::LineageAffinity => "lineage_affinity", } } } @@ -583,6 +607,8 @@ struct RawModelConfig { #[serde(default)] compute_dtype: Option, #[serde(default)] + weight_dtype: Option, + #[serde(default)] flops_per_token_decode: Option, #[serde(default)] bytes_per_token_decode: Option, @@ -721,6 +747,9 @@ impl RawModelConfig { if self.compute_dtype.is_some() { m.compute_dtype = self.compute_dtype; } + if self.weight_dtype.is_some() { + m.weight_dtype = self.weight_dtype; + } if let Some(v) = self.flops_per_token_decode { m.flops_per_token_decode = Some(v); } @@ -737,6 +766,18 @@ impl RawModelConfig { m.block_size_tokens > 0, "model.block_size_tokens is required (not in HF config.json)" ); + if let Some(dtype) = m.compute_dtype.as_deref() { + anyhow::ensure!( + ModelConfig::precision_bytes(dtype).is_some(), + "model.compute_dtype must be one of: bf16, fp8, fp4" + ); + } + if let Some(dtype) = m.weight_dtype.as_deref() { + anyhow::ensure!( + ModelConfig::precision_bytes(dtype).is_some(), + "model.weight_dtype must be one of: bf16, fp8, fp4" + ); + } Ok(m) } @@ -925,4 +966,41 @@ sim: assert!((cfg.calibration.rdma_bw_util - 0.63).abs() < 1e-12); assert!((cfg.calibration.scheduler_base_overhead_us - 123.0).abs() < 1e-12); } + + #[test] + fn weight_dtype_can_differ_from_compute_dtype() { + let path = write_temp_config( + r#" +model: + name: test + num_layers: 4 + num_kv_heads: 2 + head_dim: 64 + dtype_bytes: 1 + block_size_tokens: 16 + compute_dtype: fp8 + weight_dtype: fp4 + flops_per_token_prefill: 1.0e9 + attn_quadratic_coeff: 64.0 +hardware: + gpu_flops: 1.0e14 + gpu_mem_bw: 1.0e12 + hbm_bytes: 1.0e9 +cluster: + num_instances: 2 + meta_store: + ttl_seconds: 10.0 + router: + mode: estimated_ttft +sim: + trace_path: trace.jsonl + output_dir: runs/test +"#, + ); + + let cfg = Config::from_yaml_path(&path).unwrap(); + assert_eq!(cfg.model.compute_dtype.as_deref(), Some("fp8")); + assert_eq!(cfg.model.weight_dtype.as_deref(), Some("fp4")); + assert!((cfg.model.weight_dtype_bytes() - 0.5).abs() < 1e-12); + } } diff --git a/src/driver.rs b/src/driver.rs index 6410570..6c1c064 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -1,9 +1,10 @@ //! Simulation driver: pulls trace records, advances the event queue, runs //! instance batch ticks, and emits metrics. -use anyhow::Result; -use std::collections::HashMap; +use anyhow::{anyhow, Context, Result}; +use std::collections::{HashMap, VecDeque}; use std::path::Path; +use std::sync::{Arc, Mutex}; use crate::cluster::Cluster; use crate::config::{Config, RouterMode}; @@ -19,10 +20,7 @@ use crate::trace::{RequestRecord, TraceReader}; /// Drop records whose `input_len` falls outside `sim.input_length_{min,max}`. /// Used to carve an ablation onto a specific input-length bucket (e.g. 0–40k) /// without rewriting the trace file. No-op if both bounds are unset. -pub fn apply_input_length_filter( - records: &mut Vec, - cfg: &crate::config::SimConfig, -) { +pub fn apply_input_length_filter(records: &mut Vec, cfg: &crate::config::SimConfig) { let lo = cfg.input_length_min.unwrap_or(0); let hi = cfg.input_length_max.unwrap_or(u32::MAX); if lo == 0 && hi == u32::MAX { @@ -77,7 +75,10 @@ pub fn run(config: &Config, output_subdir: Option<&str>) -> Result { eprintln!( "[driver] input_length filter [{}, {}] kept {}/{} requests", config.sim.input_length_min.unwrap_or(0), - config.sim.input_length_max.map_or("∞".to_string(), |v| v.to_string()), + config + .sim + .input_length_max + .map_or("∞".to_string(), |v| v.to_string()), records.len(), raw_count, ); @@ -206,6 +207,15 @@ pub fn ablate_fixed_placement( base: &Config, routers: &[RouterMode], evict_policies: &[ReplayEvictPolicy], +) -> Result> { + ablate_fixed_placement_with_parallelism(base, routers, evict_policies, 1) +} + +pub fn ablate_fixed_placement_with_parallelism( + base: &Config, + routers: &[RouterMode], + evict_policies: &[ReplayEvictPolicy], + jobs: usize, ) -> Result> { let mut out = Vec::new(); for &policy in evict_policies { @@ -216,18 +226,112 @@ pub fn ablate_fixed_placement( )); } } - for &mode in routers { - let mut cfg = base.clone(); - cfg.cluster.router.mode = mode; - let placement_run = run(&cfg, Some(&format!("{}__placement_lru", mode.as_str())))?; - for &policy in evict_policies { - out.push(AblationRow::from_summary( - mode.as_str(), - policy, - "realized_lru", - &placement_run.summary, - )); + + if routers.is_empty() { + return Ok(out); + } + + let worker_count = resolve_ablation_parallelism(jobs, routers.len()); + if worker_count == 1 { + for &mode in routers { + out.extend(run_ablation_router(base, mode, evict_policies)?); } + return Ok(out); + } + + eprintln!( + "[ablate] running {} routers with {} workers", + routers.len(), + worker_count + ); + + let queue = Arc::new(Mutex::new(VecDeque::from( + routers + .iter() + .copied() + .enumerate() + .collect::>(), + ))); + let mut ordered_results: Vec<(usize, Vec)> = Vec::with_capacity(routers.len()); + + std::thread::scope(|scope| -> Result<()> { + let mut handles = Vec::with_capacity(worker_count); + for worker_idx in 0..worker_count { + let queue = Arc::clone(&queue); + let base = base.clone(); + let policies = evict_policies.to_vec(); + handles.push( + scope.spawn(move || -> Result)>> { + let mut local = Vec::new(); + loop { + let next = queue + .lock() + .map_err(|_| anyhow!("ablation work queue mutex poisoned"))? + .pop_front(); + let Some((idx, mode)) = next else { + break; + }; + eprintln!( + "[ablate] worker {}/{} router={} ...", + worker_idx + 1, + worker_count, + mode.as_str() + ); + let rows = run_ablation_router(&base, mode, &policies) + .with_context(|| format!("ablation router={}", mode.as_str()))?; + local.push((idx, rows)); + } + Ok(local) + }), + ); + } + + for handle in handles { + let local = handle + .join() + .map_err(|_| anyhow!("ablation worker thread panicked"))??; + ordered_results.extend(local); + } + Ok(()) + })?; + + ordered_results.sort_by_key(|(idx, _)| *idx); + for (_, rows) in ordered_results { + out.extend(rows); } Ok(out) } + +fn resolve_ablation_parallelism(jobs: usize, num_routers: usize) -> usize { + if num_routers == 0 { + return 1; + } + let requested = if jobs == 0 { + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + } else { + jobs + }; + requested.max(1).min(num_routers) +} + +fn run_ablation_router( + base: &Config, + mode: RouterMode, + evict_policies: &[ReplayEvictPolicy], +) -> Result> { + let mut cfg = base.clone(); + cfg.cluster.router.mode = mode; + let placement_run = run(&cfg, Some(&format!("{}__placement_lru", mode.as_str())))?; + let mut rows = Vec::with_capacity(evict_policies.len()); + for &policy in evict_policies { + rows.push(AblationRow::from_summary( + mode.as_str(), + policy, + "realized_lru", + &placement_run.summary, + )); + } + Ok(rows) +} diff --git a/src/hardware_presets.rs b/src/hardware_presets.rs index 783c99d..4758eaa 100644 --- a/src/hardware_presets.rs +++ b/src/hardware_presets.rs @@ -17,6 +17,7 @@ pub const AVAILABLE: &[&str] = &[ "h100", "h800", "h20", + "h20-141g", "a100-80gb", "a100-40gb", "b200", @@ -30,6 +31,9 @@ pub const AVAILABLE: &[&str] = &[ "2xh20", "4xh20", "8xh20", + "2xh20-141g", + "4xh20-141g", + "8xh20-141g", "2xb200", "4xb200", "8xb200", @@ -49,6 +53,7 @@ pub fn resolve(name: &str) -> Option { "h100" => Some(make_config(count, &H100)), "h800" => Some(make_config(count, &H800)), "h20" => Some(make_config(count, &H20)), + "h20141g" | "h20141gb" => Some(make_config(count, &H20_141G)), "a10080gb" | "a100" => Some(make_config(count, &A100_80GB)), "a10040gb" => Some(make_config(count, &A100_40GB)), "b200" => Some(make_config(count, &B200)), @@ -113,6 +118,15 @@ const H20: GpuBase = GpuBase { pcie_gen: 5, }; +const H20_141G: GpuBase = GpuBase { + flops: 1.48e14, // modeled as the same H20 compute envelope + fp8_flops: 2.96e14, // modeled as the same H20 FP8 throughput + fp4_flops: 0.0, // not supported + mem_bw: 4.8e12, // 141 GB HBM variant + hbm: 141.0e9, // 141 GB + pcie_gen: 5, +}; + const A100_80GB: GpuBase = GpuBase { flops: 3.12e14, // 312 TFLOPS BF16 fp8_flops: 0.0, // A100 has no FP8 tensor cores @@ -193,7 +207,11 @@ fn make_config(n: u32, base: &GpuBase) -> HardwareConfig { pcie_latency_us: pcie_lat, rdma_bw: rdma_base * rdma_scale, rdma_latency_us: rdma_lat, - intra_node_tp_bw: if base.pcie_gen >= 6 { 1.8e12 * f } else { 9.0e11 * f }, + intra_node_tp_bw: if base.pcie_gen >= 6 { + 1.8e12 * f + } else { + 9.0e11 * f + }, intra_node_tp_latency_us: if base.pcie_gen >= 6 { 1.0 } else { 2.0 }, tp_degree: n, max_batch_slots: 256, @@ -227,6 +245,7 @@ mod tests { assert!(resolve("H100").is_some()); assert!(resolve("8xB200").is_some()); assert!(resolve("8x-B200").is_some()); + assert!(resolve("8xH20-141G").is_some()); assert!(resolve("a100-80gb").is_some()); assert!(resolve("A100_80GB").is_some()); assert!(resolve("a100_80gb").is_some()); @@ -258,4 +277,13 @@ mod tests { assert!((s4.gpu_mem_bw - s1.gpu_mem_bw * 4.0).abs() < 1.0); assert!((s8.hbm_bytes - s1.hbm_bytes * 8.0).abs() < 1.0); } + + #[test] + fn h20_141g_variant_has_larger_hbm() { + let h20 = resolve("8xh20").unwrap(); + let h20_141g = resolve("8xh20-141g").unwrap(); + assert!((h20_141g.gpu_flops - h20.gpu_flops).abs() < 1.0); + assert!(h20_141g.hbm_bytes > h20.hbm_bytes); + assert!(h20_141g.gpu_mem_bw > h20.gpu_mem_bw); + } } diff --git a/src/instance/compute.rs b/src/instance/compute.rs index fd03d15..34d8e49 100644 --- a/src/instance/compute.rs +++ b/src/instance/compute.rs @@ -301,8 +301,8 @@ impl ComputeModel { let attn_time = attn_total_flops / (self.gpu_flops * self.attention_util.max(1e-6)); let compute_time = linear_time + attn_time + self.num_layers * self.misc_layer_overhead_s; // Weight stream: all layers' active weights read once from HBM. - let mem_time = - self.weight_bytes_per_layer * self.num_layers / (self.gpu_mem_bw * self.hbm_bw_util.max(1e-6)); + let mem_time = self.weight_bytes_per_layer * self.num_layers + / (self.gpu_mem_bw * self.hbm_bw_util.max(1e-6)); let tp_comm_time = if self.tp_collective_count_per_layer > 0.0 && self.tp_bytes_per_token > 0.0 && self.intra_node_tp_bw > 0.0 diff --git a/src/instance/instance.rs b/src/instance/instance.rs index 54acb94..01bfeef 100644 --- a/src/instance/instance.rs +++ b/src/instance/instance.rs @@ -144,17 +144,17 @@ impl Instance { if self.kv_blocks_used + front.reserved_blocks > self.hbm_block_budget { break; } - let r = self.pending.pop_front().unwrap(); - self.kv_blocks_used += r.reserved_blocks; - if r.prefill_tokens_remaining == 0 { - // Full cache hit: nothing to compute. TTFT == fetch time. - let t_done = now + r.completion_tail_s; - let ttft = t_done - r.arrival; - self.kv_blocks_used = self.kv_blocks_used.saturating_sub(r.reserved_blocks); - completed.push((r.req_id, ttft, t_done)); - } else { - self.prefilling.push_back(r); - } + let r = self.pending.pop_front().unwrap(); + self.kv_blocks_used += r.reserved_blocks; + if r.prefill_tokens_remaining == 0 { + // Full cache hit: nothing to compute. TTFT == fetch time. + let t_done = now + r.completion_tail_s; + let ttft = t_done - r.arrival; + self.kv_blocks_used = self.kv_blocks_used.saturating_sub(r.reserved_blocks); + completed.push((r.req_id, ttft, t_done)); + } else { + self.prefilling.push_back(r); + } } // 2. Run one chunked-prefill step on the head of `prefilling`. diff --git a/src/lib.rs b/src/lib.rs index 6245674..2a3a6f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,6 @@ pub mod oracle; pub mod replay; pub mod router; pub mod sim; -pub mod ttft; pub mod trace; +pub mod ttft; pub mod types; diff --git a/src/main.rs b/src/main.rs index 55302df..050616a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -123,6 +123,10 @@ enum Cmd { /// routers are then run at that cluster size. #[arg(long, default_value = "cache_score")] auto_probe_router: String, + /// Maximum number of routers to simulate concurrently. + /// `0` means auto-detect from available CPU parallelism. + #[arg(long, default_value_t = 0)] + jobs: usize, #[command(flatten)] overrides: ConfigOverrides, }, @@ -168,6 +172,7 @@ fn main() -> Result<()> { auto_target_ttft_mean, auto_candidates, auto_probe_router, + jobs, overrides, } => cmd_ablate( &config, @@ -177,6 +182,7 @@ fn main() -> Result<()> { auto_target_ttft_mean, &auto_candidates, &auto_probe_router, + jobs, &overrides, ), Cmd::Validate { config, overrides } => cmd_validate(&config, &overrides), @@ -218,6 +224,7 @@ fn cmd_ablate( auto_target_ttft_mean: f64, auto_candidates: &str, auto_probe_router: &str, + jobs: usize, overrides: &ConfigOverrides, ) -> Result<()> { let mut base = load(path, overrides)?; @@ -254,12 +261,7 @@ fn cmd_ablate( sorted.sort_unstable(); let probe_mode = RouterMode::parse(auto_probe_router) .with_context(|| format!("parsing --auto-probe-router='{auto_probe_router}'"))?; - let chosen = auto_select_instances( - &base, - &sorted, - probe_mode, - auto_target_ttft_mean, - )?; + let chosen = auto_select_instances(&base, &sorted, probe_mode, auto_target_ttft_mean)?; eprintln!( "[ablate] auto-instances chose num_instances={} (target ttft_mean ≤ {:.3}s, probe_router={})", chosen, @@ -270,7 +272,7 @@ fn cmd_ablate( } eprintln!( - "[ablate] routers={} evict_policies={} num_instances={}", + "[ablate] routers={} evict_policies={} num_instances={} jobs={}", modes .iter() .map(RouterMode::as_str) @@ -282,8 +284,13 @@ fn cmd_ablate( .collect::>() .join(","), base.cluster.num_instances, + if jobs == 0 { + "auto".to_string() + } else { + jobs.to_string() + }, ); - let all = driver::ablate_fixed_placement(&base, &modes, &policies)?; + let all = driver::ablate_fixed_placement_with_parallelism(&base, &modes, &policies, jobs)?; let agg_path = std::path::Path::new(&base.sim.output_dir).join("ablation.json"); std::fs::create_dir_all(&base.sim.output_dir)?; std::fs::write(&agg_path, serde_json::to_string_pretty(&all)?)?; @@ -479,7 +486,9 @@ fn cmd_oracle( eprintln!( "[oracle] input_length filter [{}, {}] kept {}/{} requests", cfg.sim.input_length_min.unwrap_or(0), - cfg.sim.input_length_max.map_or("∞".to_string(), |v| v.to_string()), + cfg.sim + .input_length_max + .map_or("∞".to_string(), |v| v.to_string()), records.len(), raw_count, ); diff --git a/src/oracle.rs b/src/oracle.rs index aec53c2..05df0bd 100644 --- a/src/oracle.rs +++ b/src/oracle.rs @@ -221,6 +221,8 @@ mod tests { RequestRecord { req_id: id, chat_id: id as i64, + parent_chat_id: -1, + turn: 1, arrival: t, input_len: (hashes.len() as u32) * 16, output_len: 16, diff --git a/src/router/adaptive_affinity.rs b/src/router/adaptive_affinity.rs new file mode 100644 index 0000000..7b94239 --- /dev/null +++ b/src/router/adaptive_affinity.rs @@ -0,0 +1,254 @@ +//! Adaptive affinity routing for coding-agent workloads. +//! +//! The trace has two distinct regimes: +//! +//! 1. Short root prompts with a tiny shared stem. Their *current-request* +//! miss cost is small, so pure TTFT minimization tends to scatter them. +//! That destroys future cache locality for the dominant system prompts. +//! 2. Continuations or already-warm requests with a long local prefix. Their +//! immediate reuse is already visible, so a first-principles TTFT estimate +//! is the right objective. +//! +//! This router separates the two: +//! +//! - Keep a lightweight per-prefix heat map over the first few blocks. +//! - Only when a prefix family is both short and hot do we enforce affinity. +//! - The hot prefix is mapped to a deterministic rendezvous-ranked home set, +//! and the set widens logarithmically as the family gets hotter. +//! - Within that home set we still minimize estimated TTFT, and we fall back +//! to the global TTFT optimum if the affinity choice is clearly overloaded. + +use std::collections::HashMap; + +use crate::cluster::meta_store::MetaStore; +use crate::config::Config; +use crate::instance::Instance; +use crate::router::{CandidateInfo, RouteDecision, Router}; +use crate::trace::RequestRecord; +use crate::ttft::{classify_prefix_tiers, TtftModel}; + +#[derive(Debug, Clone, Copy, Default)] +struct PrefixStat { + seen: u16, + last_seen: f64, +} + +#[derive(Debug, Clone, Copy)] +struct CostedCandidate { + idx: usize, + cost: f64, + reusable_blocks: u32, + queue_len: u32, + rendezvous: u64, +} + +pub struct AdaptiveAffinityRouter { + ttft_model: TtftModel, + fingerprint_k: usize, + short_request_blocks: usize, + warm_prefix_blocks: u32, + hot_threshold: u16, + hot_ttl_s: f64, + max_fan_out: usize, + overload_ratio: f64, + overload_abs_s: f64, + prefix_stats: HashMap, +} + +impl AdaptiveAffinityRouter { + pub fn new(config: &Config) -> Self { + let n = config.cluster.num_instances as usize; + let configured_fan_out = config.cluster.router.affinity_fan_out; + let max_fan_out = if configured_fan_out > 0 { + configured_fan_out.max(2).min(n) + } else { + (n / 8).max(8).min(n) + }; + Self { + ttft_model: TtftModel::new( + &config.hardware, + &config.calibration, + config.model.kv_block_bytes(), + ), + // Coding-trace reuse is dominated by the system prompt stem. + fingerprint_k: config.cluster.router.prefix_k.clamp(1, 4), + short_request_blocks: 12, + warm_prefix_blocks: 8, + hot_threshold: 4, + hot_ttl_s: config.cluster.meta_store.ttl_seconds.max(1.0), + max_fan_out, + overload_ratio: 1.25, + overload_abs_s: 0.25, + prefix_stats: HashMap::new(), + } + } + + fn fingerprint(hash_ids: &[u64], k: usize) -> u64 { + let take = hash_ids.len().min(k).max(1); + let mut fp: u64 = 0xcbf29ce484222325; + for &h in &hash_ids[..hash_ids.len().min(take)] { + fp ^= h; + fp = fp.wrapping_mul(0x100000001b3); + } + fp + } + + fn rendezvous(fp: u64, instance_id: u32) -> u64 { + let mut h = fp ^ (instance_id as u64).wrapping_mul(0x9e3779b97f4a7c15); + h = h.wrapping_add(0x9e3779b97f4a7c15); + h = (h ^ (h >> 30)).wrapping_mul(0xbf58476d1ce4e5b9); + h = (h ^ (h >> 27)).wrapping_mul(0x94d049bb133111eb); + h ^ (h >> 31) + } + + fn active_heat(&self, fp: u64, now: f64) -> u16 { + self.prefix_stats + .get(&fp) + .filter(|stat| now - stat.last_seen <= self.hot_ttl_s) + .map(|stat| stat.seen) + .unwrap_or(0) + } + + fn observe(&mut self, fp: u64, now: f64) { + let stat = self.prefix_stats.entry(fp).or_default(); + if now - stat.last_seen > self.hot_ttl_s { + stat.seen = 0; + } + stat.last_seen = now; + stat.seen = stat.seen.saturating_add(1); + } + + fn fan_out(&self, heat: u16, n: usize) -> usize { + if heat < self.hot_threshold { + return 2.min(n); + } + let multiples = (heat / self.hot_threshold).max(1) as u32; + let extra = multiples.ilog2() as usize; + (2 + extra).min(self.max_fan_out).min(n).max(2) + } + + fn candidate_cost( + &self, + req: &RequestRecord, + inst: &Instance, + meta: &MetaStore, + now: f64, + scheduler_s: f64, + ) -> (f64, u32) { + let residency = classify_prefix_tiers(&req.hash_ids, inst, meta, now); + let reusable = + residency.l0_hit_blocks + residency.l1_hit_blocks + residency.remote_hit_blocks; + let miss_tokens = residency.miss_blocks.saturating_mul(inst.block_size_tokens); + let kv_prepare = self.ttft_model.kv_prepare_time_s(residency); + let cost = inst.estimated_drain_time() + + scheduler_s + + kv_prepare + + inst.compute.prefill_time(miss_tokens) + + self.ttft_model.first_token_tail_s(); + (cost, reusable) + } + + fn better(a: CostedCandidate, b: CostedCandidate) -> bool { + a.cost < b.cost + || (a.cost == b.cost && a.queue_len < b.queue_len) + || (a.cost == b.cost + && a.queue_len == b.queue_len + && a.reusable_blocks > b.reusable_blocks) + } +} + +impl Router for AdaptiveAffinityRouter { + fn name(&self) -> &'static str { + "adaptive_affinity" + } + + fn route( + &mut self, + req: &RequestRecord, + instances: &[Instance], + meta: &MetaStore, + now: f64, + ) -> RouteDecision { + let n = instances.len(); + let scheduler_s = self.ttft_model.scheduler_overhead_s(n, 3); + let fp = Self::fingerprint(&req.hash_ids, self.fingerprint_k); + let active_heat = self.active_heat(fp, now); + + let mut best_local_l0 = 0u32; + let mut candidates = Vec::with_capacity(n); + let mut scored = Vec::with_capacity(n); + + for (idx, inst) in instances.iter().enumerate() { + let l0_hit = inst.cache.l0.longest_prefix_peek(&req.hash_ids) as u32; + best_local_l0 = best_local_l0.max(l0_hit); + + let (cost, reusable_blocks) = self.candidate_cost(req, inst, meta, now, scheduler_s); + let rendezvous = Self::rendezvous(fp, inst.id); + candidates.push(CandidateInfo { + instance: inst.id, + predicted_prefix: reusable_blocks, + load_blocks: inst.kv_blocks_used, + queue_len: inst.queue_len(), + }); + scored.push(CostedCandidate { + idx, + cost, + reusable_blocks, + queue_len: inst.queue_len(), + rendezvous, + }); + } + + let mut global_best = scored[0]; + for cand in scored.iter().copied().skip(1) { + if Self::better(cand, global_best) { + global_best = cand; + } + } + + let should_affinitize = req.hash_ids.len() <= self.short_request_blocks + && best_local_l0 <= self.warm_prefix_blocks + && active_heat.saturating_add(1) >= self.hot_threshold; + + let (chosen_idx, reason) = if should_affinitize { + let fan_out = self.fan_out(active_heat.saturating_add(1), n); + scored.sort_unstable_by(|a, b| b.rendezvous.cmp(&a.rendezvous)); + + let mut home_best = scored[0]; + for cand in scored.iter().copied().take(fan_out).skip(1) { + let better = Self::better(cand, home_best) + || (cand.cost == home_best.cost + && cand.queue_len == home_best.queue_len + && cand.reusable_blocks == home_best.reusable_blocks + && cand.rendezvous > home_best.rendezvous); + if better { + home_best = cand; + } + } + + let home_cost_ok = + home_best.cost <= global_best.cost * self.overload_ratio + self.overload_abs_s; + if home_cost_ok { + (home_best.idx, "adaptive affinity: hot short prefix homeset") + } else { + ( + global_best.idx, + "adaptive affinity fallback: global min estimated ttft", + ) + } + } else { + (global_best.idx, "global min estimated ttft") + }; + + self.observe(fp, now); + + RouteDecision { + req_id: req.req_id, + mode: "adaptive_affinity", + chosen: instances[chosen_idx].id, + probe_overhead_s: 0.0, + candidates, + reason, + } + } +} diff --git a/src/router/cache_affinity.rs b/src/router/cache_affinity.rs index 8ccc029..5e09d33 100644 --- a/src/router/cache_affinity.rs +++ b/src/router/cache_affinity.rs @@ -59,39 +59,39 @@ pub struct CacheAffinityRouter { } impl CacheAffinityRouter { - pub fn new(load_alpha: f64) -> Self { + pub fn new(load_alpha: f64, fingerprint_k: usize) -> Self { Self { name: "cache_affinity", load_alpha, l0_gamma: 1.0, meta_delta: 0.25, - fingerprint_k: 4, + fingerprint_k: fingerprint_k.max(1), use_rendezvous: true, } } /// Ablation: cache_score-style weights (γ=0.1, δ=0) but keep rendezvous /// tiebreak. Isolates the contribution of deterministic sticky placement. - pub fn weak_with_rendezvous(load_alpha: f64) -> Self { + pub fn weak_with_rendezvous(load_alpha: f64, fingerprint_k: usize) -> Self { Self { name: "cache_affinity_weak_rend", load_alpha, l0_gamma: 0.1, meta_delta: 0.0, - fingerprint_k: 4, + fingerprint_k: fingerprint_k.max(1), use_rendezvous: true, } } /// Ablation: strong cache weights (γ=1.0, δ=0.25) but first-found tiebreak /// instead of rendezvous. Isolates the contribution of reweighting alone. - pub fn strong_no_rendezvous(load_alpha: f64) -> Self { + pub fn strong_no_rendezvous(load_alpha: f64, fingerprint_k: usize) -> Self { Self { name: "cache_affinity_strong_only", load_alpha, l0_gamma: 1.0, meta_delta: 0.25, - fingerprint_k: 4, + fingerprint_k: fingerprint_k.max(1), use_rendezvous: false, } } diff --git a/src/router/estimated_ttft.rs b/src/router/estimated_ttft.rs index 0f8b5bf..bc206bc 100644 --- a/src/router/estimated_ttft.rs +++ b/src/router/estimated_ttft.rs @@ -58,8 +58,11 @@ impl Router for EstimatedTtftRouter { let miss_tokens = residency.miss_blocks.saturating_mul(inst.block_size_tokens); let kv_prepare = self.ttft_model.kv_prepare_time_s(residency); let first_token_tail = self.ttft_model.first_token_tail_s(); - let cost = - drain + scheduler + kv_prepare + inst.compute.prefill_time(miss_tokens) + first_token_tail; + let cost = drain + + scheduler + + kv_prepare + + inst.compute.prefill_time(miss_tokens) + + first_token_tail; candidates.push(CandidateInfo { instance: inst.id, @@ -72,7 +75,8 @@ impl Router for EstimatedTtftRouter { // Minimise (cost, queue_len, -local_prefix). let ql = inst.queue_len(); - let reusable = residency.l0_hit_blocks + residency.l1_hit_blocks + residency.remote_hit_blocks; + let reusable = + residency.l0_hit_blocks + residency.l1_hit_blocks + residency.remote_hit_blocks; let better = cost < best_cost || (cost == best_cost && ql < best_queue) || (cost == best_cost && ql == best_queue && reusable > best_reuse); diff --git a/src/router/lineage_affinity.rs b/src/router/lineage_affinity.rs new file mode 100644 index 0000000..f8d51e6 --- /dev/null +++ b/src/router/lineage_affinity.rs @@ -0,0 +1,243 @@ +//! Lineage-aware reuse routing for agentic coding workloads. +//! +//! Workload hypothesis: +//! - turn-1 requests are diverse but recur by prefix family and benefit from +//! deterministic home placement instead of diffusion across the cluster; +//! - child requests usually extend the immediately preceding request and +//! should stay on the parent's instance whenever that instance is not +//! clearly overloaded. +//! +//! The router therefore uses three modes: +//! - strong local cache scoring for already-warm requests; +//! - parent stickiness for continuations with a known parent placement; +//! - family homesets (rendezvous-ranked top-K) for cold / weakly-warm +//! requests, with a global fallback if the homeset is substantially worse. + +use std::collections::HashMap; + +use crate::cluster::meta_store::MetaStore; +use crate::config::Config; +use crate::instance::Instance; +use crate::router::{local_l0_scores, CandidateInfo, RouteDecision, Router}; +use crate::trace::RequestRecord; + +#[derive(Debug, Clone, Copy, Default)] +struct FamilyStat { + seen: u16, + last_seen: f64, +} + +#[derive(Debug, Clone, Copy)] +struct CandidateCost { + idx: usize, + cost: f64, + l0_hit: u32, + meta_only: u32, + queue_len: u32, + rendezvous: u64, +} + +pub struct LineageAffinityRouter { + load_alpha: f64, + l0_gamma: f64, + meta_delta: f64, + fingerprint_k: usize, + warm_prefix_blocks: u32, + hot_ttl_s: f64, + max_fan_out: usize, + parent_cost_slack: f64, + homeset_cost_slack: f64, + family_stats: HashMap, + request_home: HashMap, +} + +impl LineageAffinityRouter { + pub fn new(config: &Config) -> Self { + let n = config.cluster.num_instances as usize; + let configured_fan_out = config.cluster.router.affinity_fan_out; + let max_fan_out = if configured_fan_out > 0 { + configured_fan_out.max(2).min(n) + } else { + (n / 8).max(8).min(n) + }; + Self { + load_alpha: config.cluster.router.load_alpha.max(1.0), + l0_gamma: 1.0, + meta_delta: 0.25, + fingerprint_k: config.cluster.router.prefix_k.clamp(2, 8), + warm_prefix_blocks: 12, + hot_ttl_s: config.cluster.meta_store.ttl_seconds.max(1.0), + max_fan_out, + // Measured in the same score units as α·queue − γ·hit. + parent_cost_slack: 6.0, + homeset_cost_slack: 2.0, + family_stats: HashMap::new(), + request_home: HashMap::new(), + } + } + + fn fingerprint(hash_ids: &[u64], k: usize) -> u64 { + let take = hash_ids.len().min(k).max(1); + let mut fp: u64 = 0xcbf29ce484222325; + for &h in &hash_ids[..take] { + fp ^= h; + fp = fp.wrapping_mul(0x100000001b3); + } + fp + } + + fn rendezvous(fp: u64, instance_id: u32) -> u64 { + let mut h = fp ^ (instance_id as u64).wrapping_mul(0x9e3779b97f4a7c15); + h = h.wrapping_add(0x9e3779b97f4a7c15); + h = (h ^ (h >> 30)).wrapping_mul(0xbf58476d1ce4e5b9); + h = (h ^ (h >> 27)).wrapping_mul(0x94d049bb133111eb); + h ^ (h >> 31) + } + + fn active_heat(&self, fp: u64, now: f64) -> u16 { + self.family_stats + .get(&fp) + .filter(|stat| now - stat.last_seen <= self.hot_ttl_s) + .map(|stat| stat.seen) + .unwrap_or(0) + } + + fn observe(&mut self, fp: u64, now: f64) { + let stat = self.family_stats.entry(fp).or_default(); + if now - stat.last_seen > self.hot_ttl_s { + stat.seen = 0; + } + stat.last_seen = now; + stat.seen = stat.seen.saturating_add(1); + } + + fn fan_out(&self, heat: u16, n: usize) -> usize { + let base = 2usize; + let extra = match heat { + 0..=1 => 0, + 2..=3 => 1, + 4..=7 => 2, + 8..=15 => 3, + _ => 4, + }; + (base + extra).min(self.max_fan_out).min(n).max(2) + } + + fn better(a: CandidateCost, b: CandidateCost) -> bool { + a.cost < b.cost + || (a.cost == b.cost && a.l0_hit > b.l0_hit) + || (a.cost == b.cost && a.l0_hit == b.l0_hit && a.queue_len < b.queue_len) + || (a.cost == b.cost + && a.l0_hit == b.l0_hit + && a.queue_len == b.queue_len + && a.meta_only > b.meta_only) + } +} + +impl Router for LineageAffinityRouter { + fn name(&self) -> &'static str { + "lineage_affinity" + } + + fn route( + &mut self, + req: &RequestRecord, + instances: &[Instance], + meta: &MetaStore, + now: f64, + ) -> RouteDecision { + let n = instances.len(); + let l0 = local_l0_scores(req, instances); + let meta_scores = meta.score_prefix(&req.hash_ids, now, n); + let family_fp = Self::fingerprint(&req.hash_ids, self.fingerprint_k); + let family_heat = self.active_heat(family_fp, now).saturating_add(1); + let parent_home = if req.parent_chat_id >= 0 { + self.request_home.get(&req.parent_chat_id).copied() + } else { + None + }; + + let mut candidates = Vec::with_capacity(n); + let mut scored = Vec::with_capacity(n); + let mut best_local_l0 = 0u32; + + for (idx, inst) in instances.iter().enumerate() { + let l0_hit = l0[idx]; + best_local_l0 = best_local_l0.max(l0_hit); + let meta_only = meta_scores[idx].saturating_sub(l0_hit); + let queue_len = inst.queue_len(); + let cost = self.load_alpha * queue_len as f64 + - self.l0_gamma * l0_hit as f64 + - self.meta_delta * meta_only as f64; + let rend = Self::rendezvous(family_fp, inst.id); + candidates.push(CandidateInfo { + instance: inst.id, + predicted_prefix: l0_hit, + load_blocks: inst.kv_blocks_used, + queue_len, + }); + scored.push(CandidateCost { + idx, + cost, + l0_hit, + meta_only, + queue_len, + rendezvous: rend, + }); + } + + let mut global_best = scored[0]; + for cand in scored.iter().copied().skip(1) { + if Self::better(cand, global_best) { + global_best = cand; + } + } + + let mut chosen = global_best; + let reason = if let Some(parent_inst) = parent_home { + let parent = scored[parent_inst as usize]; + if parent.cost <= global_best.cost + self.parent_cost_slack { + chosen = parent; + "lineage affinity: parent stickiness" + } else { + "lineage affinity: parent overloaded, global best" + } + } else if best_local_l0 < self.warm_prefix_blocks { + let fan_out = self.fan_out(family_heat, n); + scored.sort_unstable_by(|a, b| b.rendezvous.cmp(&a.rendezvous)); + let mut home_best = scored[0]; + for cand in scored.iter().copied().take(fan_out).skip(1) { + let better = Self::better(cand, home_best) + || (cand.cost == home_best.cost + && cand.l0_hit == home_best.l0_hit + && cand.queue_len == home_best.queue_len + && cand.meta_only == home_best.meta_only + && cand.rendezvous > home_best.rendezvous); + if better { + home_best = cand; + } + } + if home_best.cost <= global_best.cost + self.homeset_cost_slack { + chosen = home_best; + "lineage affinity: family homeset" + } else { + "lineage affinity: homeset overloaded, global best" + } + } else { + "lineage affinity: warm request global locality" + }; + + self.observe(family_fp, now); + self.request_home + .insert(req.chat_id, instances[chosen.idx].id); + + RouteDecision { + req_id: req.req_id, + mode: "lineage_affinity", + chosen: instances[chosen.idx].id, + probe_overhead_s: 0.0, + candidates, + reason, + } + } +} diff --git a/src/router/mod.rs b/src/router/mod.rs index 84d0cc3..7773e3d 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -1,5 +1,6 @@ //! Cluster-level routing strategies. +pub mod adaptive_affinity; pub mod cache_affinity; pub mod cache_load; pub mod cache_score; @@ -7,6 +8,7 @@ pub mod cache_score_ttl; pub mod estimated_ttft; pub mod least_loaded; pub mod least_tokens; +pub mod lineage_affinity; pub mod min_pd; pub mod precise_aware; pub mod prefix_affinity; @@ -79,13 +81,15 @@ pub fn build(full: &Config, seed: u64) -> Box { MinPd => Box::new(min_pd::MinPdRouter::new()) as Box, LeastTokens => Box::new(least_tokens::LeastTokensRouter::new()) as Box, CacheLoad => Box::new(cache_load::CacheLoadRouter::new()) as Box, - CacheAffinity => Box::new(cache_affinity::CacheAffinityRouter::new(cfg.load_alpha)) - as Box, + CacheAffinity => Box::new(cache_affinity::CacheAffinityRouter::new( + cfg.load_alpha, + cfg.prefix_k, + )) as Box, CacheAffinityWeakRend => Box::new( - cache_affinity::CacheAffinityRouter::weak_with_rendezvous(cfg.load_alpha), + cache_affinity::CacheAffinityRouter::weak_with_rendezvous(cfg.load_alpha, cfg.prefix_k), ) as Box, CacheAffinityStrongOnly => Box::new( - cache_affinity::CacheAffinityRouter::strong_no_rendezvous(cfg.load_alpha), + cache_affinity::CacheAffinityRouter::strong_no_rendezvous(cfg.load_alpha, cfg.prefix_k), ) as Box, CacheScore => Box::new(cache_score::CacheScoreRouter::new( cfg.score_alpha, @@ -96,8 +100,9 @@ pub fn build(full: &Config, seed: u64) -> Box { // offsets one queue position. Demonstrates how much of cache_affinity's // gain is reproducible by just retuning β (no rendezvous, no meta-store // bonus). - CacheScoreStrong => Box::new(cache_score::CacheScoreRouter::new(1.0, 1.0)) - as Box, + CacheScoreStrong => { + Box::new(cache_score::CacheScoreRouter::new(1.0, 1.0)) as Box + } CacheScoreTtl => Box::new(cache_score_ttl::CacheScoreTtlRouter::new( cfg.score_alpha, cfg.score_beta, @@ -108,6 +113,12 @@ pub fn build(full: &Config, seed: u64) -> Box { PrefixAffinity => { Box::new(prefix_affinity::PrefixAffinityRouter::new(full)) as Box } + AdaptiveAffinity => { + Box::new(adaptive_affinity::AdaptiveAffinityRouter::new(full)) as Box + } + LineageAffinity => { + Box::new(lineage_affinity::LineageAffinityRouter::new(full)) as Box + } } } @@ -210,6 +221,8 @@ mod tests { RequestRecord { req_id: 1, chat_id: 0, + parent_chat_id: -1, + turn: 1, arrival: 0.0, input_len: hashes.len() as u32 * 16, output_len: 16, @@ -317,7 +330,10 @@ mod tests { assert_eq!(cache_score.route(&req, &instances, &meta, 0.0).chosen, 1); let mut cache_score_ttl = CacheScoreTtlRouter::new(0.0, 1.0); - assert_eq!(cache_score_ttl.route(&req, &instances, &meta, 0.0).chosen, 0); + assert_eq!( + cache_score_ttl.route(&req, &instances, &meta, 0.0).chosen, + 0 + ); } #[test] diff --git a/src/trace.rs b/src/trace.rs index 6973b50..1432b54 100644 --- a/src/trace.rs +++ b/src/trace.rs @@ -21,12 +21,16 @@ struct RawRecord { #[serde(default)] chat_id: i64, #[serde(default)] + parent_chat_id: i64, + #[serde(default)] timestamp: f64, #[serde(default)] input_length: i64, #[serde(default)] output_length: i64, #[serde(default)] + turn: i64, + #[serde(default)] hash_ids: Vec, } @@ -34,6 +38,8 @@ struct RawRecord { pub struct RequestRecord { pub req_id: u64, pub chat_id: i64, + pub parent_chat_id: i64, + pub turn: i64, pub arrival: f64, pub input_len: u32, pub output_len: u32, @@ -88,6 +94,8 @@ impl Iterator for TraceReader { return Some(Ok(RequestRecord { req_id: id, chat_id: raw.chat_id, + parent_chat_id: raw.parent_chat_id, + turn: raw.turn, arrival: raw.timestamp, input_len: raw.input_length.max(0) as u32, output_len: raw.output_length.max(0) as u32, diff --git a/src/ttft.rs b/src/ttft.rs index f2c4f5a..7e15d77 100644 --- a/src/ttft.rs +++ b/src/ttft.rs @@ -79,7 +79,10 @@ impl TtftModel { return 0.0; } let bytes = self.block_bytes(blocks); - self.remote_metadata_s + self.rdma_cost_s(bytes) + self.pcie_cost_s(bytes) + self.layout_transform_s + self.remote_metadata_s + + self.rdma_cost_s(bytes) + + self.pcie_cost_s(bytes) + + self.layout_transform_s } pub fn pcie_cost_s(&self, bytes: u64) -> f64 { diff --git a/tests/smoke.rs b/tests/smoke.rs index c0a75f4..700408d 100644 --- a/tests/smoke.rs +++ b/tests/smoke.rs @@ -221,3 +221,54 @@ fn ablate_rejects_belady_until_exact_algorithm_exists() { "unexpected error: {err:#}" ); } + +#[test] +fn ablation_parallel_matches_serial() { + let tmp = std::env::temp_dir().join("kvcache_sim_ablate_parallel"); + let _ = std::fs::remove_dir_all(&tmp); + std::fs::create_dir_all(&tmp).unwrap(); + let trace_path = tmp.join("trace.jsonl"); + write_synthetic_trace(&trace_path); + + let cfg = base_config( + trace_path.to_str().unwrap(), + tmp.to_str().unwrap(), + RouterMode::Random, + ); + let routers = [ + RouterMode::Random, + RouterMode::LeastLoaded, + RouterMode::TtlAware, + RouterMode::Precise, + ]; + + let serial = driver::ablate_fixed_placement_with_parallelism( + &cfg, + &routers, + &[ReplayEvictPolicy::Lru], + 1, + ) + .expect("serial ablate"); + let parallel = driver::ablate_fixed_placement_with_parallelism( + &cfg, + &routers, + &[ReplayEvictPolicy::Lru], + 2, + ) + .expect("parallel ablate"); + + assert_eq!(parallel.len(), serial.len()); + for (lhs, rhs) in parallel.iter().zip(serial.iter()) { + assert_eq!(lhs.router, rhs.router); + assert_eq!(lhs.evict_policy, rhs.evict_policy); + assert_eq!(lhs.placement_source, rhs.placement_source); + assert!((lhs.ttft_mean - rhs.ttft_mean).abs() < 1e-9); + assert!((lhs.ttft_p50 - rhs.ttft_p50).abs() < 1e-9); + assert!((lhs.ttft_p95 - rhs.ttft_p95).abs() < 1e-9); + assert!((lhs.ttft_p99 - rhs.ttft_p99).abs() < 1e-9); + assert!((lhs.hit_rate_l0 - rhs.hit_rate_l0).abs() < 1e-12); + assert!((lhs.hit_rate_l1 - rhs.hit_rate_l1).abs() < 1e-12); + assert!((lhs.hit_rate_remote - rhs.hit_rate_remote).abs() < 1e-12); + assert!((lhs.miss_rate - rhs.miss_rate).abs() < 1e-12); + } +}