Add leastwork_kappa decode-aware ablation (net-negative, documented)

Adds --policy leastwork_kappa and a --kappa flag (default 2.5e-6, derived from
KV ~100KB/tok / HBM 4TB/s / TPOT 10ms on H20+Qwen3-30B-A3B). The policy scores
each instance as prefill_work * (1 + kappa * ongoing_decode_tokens), modelling
decode as a fractional throughput tax on a new prefill.
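
The derivation and the score can be checked with a few lines of arithmetic. This is a minimal sketch, not the in-tree code: the constants are the approximate figures quoted above, and the helper names `derive_kappa` and `kappa_score` are hypothetical.

```python
import math

# Approximate hardware figures quoted in this commit message.
KV_BYTES_PER_TOKEN = 100e3  # ~100 KB of KV cache per token
HBM_BANDWIDTH = 4e12        # ~4 TB/s
TPOT_SECONDS = 10e-3        # ~10 ms per output token


def derive_kappa(kv_bytes_per_token: float, hbm_bandwidth: float,
                 tpot_seconds: float) -> float:
    """Time to stream one token's KV from HBM, as a fraction of one decode step."""
    return (kv_bytes_per_token / hbm_bandwidth) / tpot_seconds


def kappa_score(prefill_work: float, ongoing_decode_tokens: float,
                kappa: float) -> float:
    """Prefill work scaled by the multiplicative decode tax."""
    return prefill_work * (1.0 + kappa * ongoing_decode_tokens)


kappa = derive_kappa(KV_BYTES_PER_TOKEN, HBM_BANDWIDTH, TPOT_SECONDS)
print(f"kappa = {kappa:.2e}")

# Hypothetical load: 10k prefill tokens on a host with 200k resident decode tokens.
print(kappa_score(10_000, 200_000, kappa))
```

With these inputs the tax factor is 1 + 2.5e-6 * 200,000 = 1.5, so the term only moves the score noticeably once a host holds hundreds of thousands of resident decode tokens.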

Result on the 600s trace: NET-NEGATIVE vs plain leastwork. TTFT p90 is +18%,
E2E p90 is +14%, balance worsens from 1.55x to 1.97x, and it does NOT fix the
E2E-p99 it targeted. Decode is too cheap in agentic workloads (output p50 ~80
tokens) for the term to help; it just bounces heavy requests off their
cache-owner into cold re-prefill. The E2E-p99 tail is the structural
HEAVY+>50k floor (per-class p99 ~51-52k for ALL policies), not decode
interference. Kept in-tree as a documented ablation justifying LPWL's omission
of any decode term; do not revive without a decode-heavy regime.
See analysis/lpwl_5policy_600s.md.
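
The cache-owner bounce described above can be reproduced on a toy two-instance example. This is a sketch under stated assumptions: the `ToyInstance` class, the `pick` helper and every load figure are hypothetical and not taken from the trace, and the num_requests and round-robin tie-breaks are omitted.

```python
from dataclasses import dataclass

KAPPA = 2.5e-6  # default coefficient from this commit


@dataclass
class ToyInstance:
    """Hypothetical stand-in for the router's per-instance state."""
    name: str
    pending_prefill_tokens: int
    cache_hit_tokens: int
    ongoing_decode_tokens: int


def prefill_work(inst: ToyInstance, input_length: int) -> int:
    """Queued prefill plus the uncached part of the new request."""
    new_uncached = max(0, input_length - inst.cache_hit_tokens)
    return inst.pending_prefill_tokens + new_uncached


def pick(instances: list[ToyInstance], input_length: int, kappa: float) -> ToyInstance:
    """Lowest score wins; kappa=0.0 reduces to the plain prefill-work score."""
    def score(inst: ToyInstance) -> float:
        return prefill_work(inst, input_length) * (1.0 + kappa * inst.ongoing_decode_tokens)
    return min(instances, key=score)


# Made-up loads: the owner caches 40k of a 50k-token prompt but is decode-busy.
owner = ToyInstance("cache-owner", pending_prefill_tokens=30_000,
                    cache_hit_tokens=40_000, ongoing_decode_tokens=200_000)
cold = ToyInstance("cold", pending_prefill_tokens=0,
                   cache_hit_tokens=0, ongoing_decode_tokens=0)

plain_choice = pick([owner, cold], input_length=50_000, kappa=0.0)
kappa_choice = pick([owner, cold], input_length=50_000, kappa=KAPPA)
print(plain_choice.name, kappa_choice.name)
```

In this made-up case the plain score keeps the request on the cache-owner (40k vs 50k), while the decode tax inflates the owner's score to 60k and sends the request to the cold instance, which must prefill all 50k tokens instead of 10k.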

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-29 17:07:23 +08:00
parent 71b0747b3b
commit a0db3cbe77
2 changed files with 94 additions and 1 deletions


@@ -118,6 +118,16 @@ class Settings:
# a reasonable starting weight. Set 0 to disable (original behavior).
lmetric_decode_weight: float = 0.0
# leastwork_kappa: decode-interference coefficient for the parameter-free
# LPWL policy. score = (pending_prefill + new_uncached) * (1 + kappa *
# ongoing_decode_tokens). Models decode as a fractional throughput tax on a
# new prefill (chunked-prefill interleaving + HBM contention). DERIVED from
# hardware, not tuned on the trace:
# kappa ~= (KV_bytes_per_token / HBM_bandwidth) / TPOT
# ~= (100 KB/tok / 4 TB/s) / 10 ms ~= 2.5e-6 per resident decode-KV-token
# (H20 + Qwen3-30B-A3B). Override with --kappa.
kappa: float = 2.5e-6
# --- KV connector selection (governs PD-sep handshake) -------------
# "mooncake": pre-baked kv_transfer_params (bootstrap_addr+engine_id+transfer_id).
# Requires --bootstrap-ports and vLLMs launched with MooncakeConnector.
@@ -424,6 +434,52 @@ def pick_instance_leastwork(instances: list[InstanceState], token_ids: list[int]
return instances[winner[2]], winner[2]
def pick_instance_leastwork_kappa(instances: list[InstanceState], token_ids: list[int] | None,
session_id: str | None, input_length: int,
affinity: dict[str, int]) -> tuple[InstanceState, int]:
"""LPWL + derived decode-interference coefficient (kappa).
ABLATION — NET-NEGATIVE on agentic workloads; kept to document that
decode-awareness is the wrong lever (see analysis/lpwl_5policy_600s.md).
Plain `leastwork` is the default. On the 600s trace this is worse than
leastwork on TTFT p90 (+18%), E2E p90 (+14%) and balance, and does NOT
fix the E2E-p99 it targeted (that tail is the structural HEAVY+>50k floor,
not decode interference). Reason: decode is so cheap in agentic workloads that
the penalty just bounces heavy requests off their cache-owner into cold re-prefill.
score = (pending_prefill + new_uncached) * (1 + kappa * ongoing_decode_tokens)
Design intent: same prefill-work core as `leastwork`, but multiplied by a decode
tax. Under chunked prefill, a host's resident decode batch steals a *fraction*
of GPU time from a newly-arriving prefill, so the penalty scales the prefill
work rather than adding to it. This was meant to perturb only requests with real
prefill work (heavy/cold), the ones hypothesised to cause leastwork's E2E-p99
tail by landing on decode-saturated hosts, while leaving tiny cached requests
(prefill_work ~ 0) untouched.
kappa is DERIVED from hardware (SETTINGS.kappa), not tuned on the trace.
Tie-break identical to `leastwork` (num_requests, then round-robin).
"""
global _leastwork_rr_counter
kappa = SETTINGS.kappa
keys: list[tuple[float, int, int]] = []
for i, inst in enumerate(instances):
cache_hit = inst.estimate_cache_hit(token_ids)
new_uncached = max(0, input_length - cache_hit)
prefill_work = inst.pending_prefill_tokens + new_uncached
score = prefill_work * (1.0 + kappa * inst.ongoing_decode_tokens)
keys.append((score, inst.num_requests, i))
best_pair = min(k[:2] for k in keys)
tied = [k for k in keys if k[:2] == best_pair]
if len(tied) > 1:
_leastwork_rr_counter += 1
winner = tied[_leastwork_rr_counter % len(tied)]
else:
winner = tied[0]
return instances[winner[2]], winner[2]
_unified_fallback_rr_counter = 0
@@ -1189,6 +1245,10 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h
chosen, best_idx = pick_instance_leastwork(
combined_instances, token_ids, session_id, input_length,
session_affinity_combined)
elif policy == "leastwork_kappa":
chosen, best_idx = pick_instance_leastwork_kappa(
combined_instances, token_ids, session_id, input_length,
session_affinity_combined)
elif policy == "sticky":
chosen, best_idx = pick_instance_sticky(
combined_instances, token_ids, session_id, input_length,
@@ -1642,7 +1702,7 @@ def parse_args():
help="Comma-separated bootstrap ports for combined instances (for offload mode)")
p.add_argument("--policy", type=str, default="linear",
choices=["linear", "lmetric", "load_only", "sticky",
"leastwork",
"leastwork", "leastwork_kappa",
"unified", "unified_kv_both",
"unified_nixl_both", "unified_v2",
"unified_v3"],
@@ -1676,6 +1736,11 @@ def parse_args():
help="Direction B: LMetric fallback adds this × ongoing_decode_tokens"
" to the queue-depth score, so hosts with heavy decode load get"
" penalised. 0 = original behavior; 0.01 is a reasonable start.")
p.add_argument("--kappa", type=float, default=2.5e-6,
help="leastwork_kappa: derived decode-interference coefficient. "
"score = prefill_work × (1 + kappa × ongoing_decode_tokens). "
"Default 2.5e-6 = (KV ~100KB/tok / HBM 4TB/s) / TPOT 10ms on "
"H20+Qwen3-30B-A3B (derived, not trace-tuned).")
p.add_argument("--overload-factor", type=float, default=2.0,
help="Break session affinity when instance load > factor * avg")
# The four flags below are accepted for bench.sh backward compatibility but
@@ -1717,6 +1782,7 @@ if __name__ == "__main__":
SETTINGS.connector_type = getattr(global_args, 'connector_type', 'mooncake')
SETTINGS.v3_prefer_cache_target = bool(getattr(global_args, 'v3_prefer_cache_target', 1))
SETTINGS.lmetric_decode_weight = float(getattr(global_args, 'lmetric_decode_weight', 0.0))
SETTINGS.kappa = float(getattr(global_args, 'kappa', 2.5e-6))
print("SETTINGS: throughput=%.0f rdma_overhead=%.2f offload=%s v3_rotate_affinity=%s "
"connector_type=%s v3_prefer_cache_target=%s lmetric_decode_weight=%.3f" % (
SETTINGS.prefill_throughput, SETTINGS.rdma_overhead_s,