Add session affinity as soft preference in unified routing

Without affinity, all cached requests route to the same instance
(the cache source always has the lowest prefill cost), causing a 149s queue.

Fix: if the session's last instance has cost <= 2x the global best,
use it (preserves cache locality). Only re-route to the globally cheapest
instance when the affinity instance costs more than 2x the global best
(i.e. it is overloaded).

The 2x threshold is intentionally loose — it is written as a literal 2.0
in the code, but it is not a precisely tuned constant; it is a "prefer
locality unless clearly worse" heuristic.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-24 02:37:58 +08:00
parent 6b255fad91
commit 5892739159

View File

@@ -338,46 +338,45 @@ async def _handle_combined(api, req_data, token_ids, input_length, session_id, h
offload_enabled = getattr(global_args, 'offload', False) and len(combined_instances) >= 2
throughput = SETTINGS.prefill_throughput
# Find the best cache source (instance with highest prefix cache hit)
cache_hits = []
for i, inst in enumerate(combined_instances):
hit = inst.estimate_cache_hit(token_ids)
cache_hits.append(hit)
# Compute cache hits for all instances
cache_hits = [inst.estimate_cache_hit(token_ids) for inst in combined_instances]
best_cache_idx = max(range(len(combined_instances)), key=lambda i: cache_hits[i])
best_cache_hit = cache_hits[best_cache_idx]
# Score each instance by expected latency
best_idx = 0
best_cost = float("inf")
best_needs_push = False
costs = []
for i, inst in enumerate(combined_instances):
def _instance_cost(i: int) -> tuple[float, bool]:
"""Expected latency if this request goes to instance i."""
inst = combined_instances[i]
queue = inst.pending_prefill_tokens / throughput
local_hit = cache_hits[i]
local_new = max(0, input_length - local_hit)
local_cost = queue + local_new / throughput
if offload_enabled and best_cache_hit > 0 and i != best_cache_idx:
# This instance could receive cached blocks via PUSH
if offload_enabled and best_cache_hit > 0 and i != best_cache_idx and local_hit < best_cache_hit:
push_new = max(0, input_length - best_cache_hit)
push_cost = queue + push_new / throughput + SETTINGS.rdma_overhead_s
local_cost = queue + local_new / throughput
# Use whichever is cheaper (push vs local cache)
if push_cost < local_cost:
cost = push_cost
needs_push = True
else:
cost = local_cost
needs_push = False
else:
cost = queue + local_new / throughput
needs_push = False
return push_cost, True
return local_cost, False
costs.append((cost, needs_push))
if cost < best_cost:
best_cost = cost
best_idx = i
best_needs_push = needs_push
# Session affinity: prefer the last-used instance if its cost is reasonable
affinity_idx = session_affinity_combined.get(session_id) if session_id else None
if affinity_idx is not None and affinity_idx < len(combined_instances):
affinity_cost, affinity_push = _instance_cost(affinity_idx)
# Compare with the globally best option
all_costs = [_instance_cost(i) for i in range(len(combined_instances))]
global_best_cost = min(c for c, _ in all_costs)
# Use affinity if it's within 2x of the best option
if affinity_cost <= global_best_cost * 2.0:
best_idx = affinity_idx
best_cost = affinity_cost
best_needs_push = affinity_push
else:
best_idx = min(range(len(combined_instances)), key=lambda i: all_costs[i][0])
best_cost, best_needs_push = all_costs[best_idx]
else:
all_costs = [_instance_cost(i) for i in range(len(combined_instances))]
best_idx = min(range(len(combined_instances)), key=lambda i: all_costs[i][0])
best_cost, best_needs_push = all_costs[best_idx]
chosen = combined_instances[best_idx]
cache_hit = cache_hits[best_idx]