test(policy): Theorem 1 no-starvation property tests

Adds the algorithm-layer guarantee tests for
docs/KVC_ROUTER_ALGORITHM.md §4.1. The full Dispatch loop
lives in replay.py (HTTP + mooncake), but the policy-layer
guarantee is testable in isolation: under any reject
sequence, select() must keep returning a valid worker.

Cases:
  - select returns a valid decision even after every (s,d)
    is past τ_reject (degenerate fallback)
  - |D|·τ_reject rejects suffice to explore every D
    (cannot trap a session on one D under universal
    rejection)
  - degenerate fallback picks the least-rejected D
    (Algorithm 1 line 4)
  - per-(session, D) isolation: session A's blacklist
    does not affect session B
  - migration_reject_threshold=0 disables blacklist
  - select() does NOT silently bump the reject counter
    (the only mutator is record_admission_reject)

Adds tests/_fixtures.py with minimal make_topology() and
make_request() helpers that skip build_single_node_topology's
GPU-budget validation (irrelevant in unit tests).

Verified locally: 20/20 passing under pytest 9.0.3. The
six new tests cover only Algorithm 1's policy-layer
half of Theorem 1; the reset-on-success half lives in
Algorithm 3 (replay.py) and is a future test target.
This commit is contained in:
2026-05-12 23:55:57 +08:00
parent a785b83023
commit c5f552e122
2 changed files with 216 additions and 0 deletions

66
tests/_fixtures.py Normal file
View File

@@ -0,0 +1,66 @@
"""Lightweight fixtures for algorithm-layer tests.
Builds minimal TraceRequest / SingleNodeTopology / RoutingState instances
without invoking build_single_node_topology() (which validates GPU budgets
we don't care about in unit tests).
"""
from __future__ import annotations
from agentic_pd_hybrid.topology import SingleNodeTopology, WorkerSpec
from agentic_pd_hybrid.trace import TraceRequest
def make_topology(decode_count: int = 3, prefill_count: int = 1) -> SingleNodeTopology:
prefill_workers = tuple(
WorkerSpec(
role="prefill",
ordinal=i,
gpu_ids=(i,),
host="127.0.0.1",
port=30000 + i,
)
for i in range(prefill_count)
)
decode_workers = tuple(
WorkerSpec(
role="decode",
ordinal=i,
gpu_ids=(prefill_count + i,),
host="127.0.0.1",
port=31000 + i,
)
for i in range(decode_count)
)
return SingleNodeTopology(
model_path="/dev/null/test-model",
prefill_workers=prefill_workers,
decode_workers=decode_workers,
direct_workers=(),
router_host="127.0.0.1",
router_port=8000,
transfer_backend="mooncake",
trust_remote_code=True,
)
def make_request(
*,
session_id: str = "sess-1",
turn_id: int = 0,
hash_ids: tuple[int, ...] = (),
input_length: int = 1024,
output_length: int = 64,
) -> TraceRequest:
return TraceRequest(
request_id=f"{session_id}-t{turn_id}",
session_id=session_id,
chat_id=int(turn_id),
parent_chat_id=-1 if turn_id == 0 else int(turn_id - 1),
timestamp_s=float(turn_id),
input_length=input_length,
output_length=output_length,
request_type="user",
turn_id=turn_id,
hash_ids=hash_ids,
)

150
tests/test_no_starvation.py Normal file
View File

@@ -0,0 +1,150 @@
"""Theorem 1 — no permanent starvation under bounded retries.
Reference: docs/KVC_ROUTER_ALGORITHM.md §4.1.
For any session s with τ_reject ≥ 1, after at most |D| · τ_reject
consecutive admission rejects on s, the routing policy MUST still
return a valid decision (via the degenerate "least-rejected D"
fallback). The session cannot be permanently starved at the policy
layer.
We can't exercise the full Dispatch loop here (it lives in replay.py and
needs HTTP, mooncake, etc.). What we CAN test is the policy-layer
guarantee: after K = |D| · τ_reject reject bumps, select() never raises
and never returns a worker that's both blacklisted *and* has positive
overlap (the degenerate path chooses by least-rejected).
This is the property-layer companion to test_policy_scoring.py's
quantitative checks.
"""
from __future__ import annotations
from agentic_pd_hybrid.policies import KvAwarePolicy, RoutingState
from ._fixtures import make_request, make_topology
def test_select_returns_valid_decision_under_full_blacklist():
"""Bump all (s, d) reject counters past τ_reject. select() must still
pick a worker (degenerate fallback, no exception, no None)."""
topology = make_topology(decode_count=3)
state = RoutingState.create(topology)
request = make_request(session_id="s-stuck", turn_id=0)
policy = KvAwarePolicy(migration_reject_threshold=3)
# Pre-fill the blacklist for every D.
for worker in topology.route_workers:
for _ in range(3):
state.record_admission_reject(request.session_id, worker.worker_id)
decision = policy.select(request=request, topology=topology, state=state)
assert decision.decode_worker_id is not None
assert decision.decode_worker_id in {w.worker_id for w in topology.route_workers}
def test_bounded_retries_to_force_degenerate_path():
"""Theorem 1: at most |D| · τ_reject rejects suffice to either exhaust
every D or to force the degenerate fallback. Simulate the worst case
where each retry picks a fresh D and is immediately rejected."""
topology = make_topology(decode_count=4)
state = RoutingState.create(topology)
request = make_request(session_id="s-worst", turn_id=0)
threshold = 3
policy = KvAwarePolicy(migration_reject_threshold=threshold)
seen_decoders: set[str] = set()
max_retries = len(topology.route_workers) * threshold
for retry in range(max_retries):
decision = policy.select(request=request, topology=topology, state=state)
seen_decoders.add(decision.decode_worker_id)
# Adversary: this D rejects this session.
state.record_admission_reject(request.session_id, decision.decode_worker_id)
# After |D|·τ_reject rejects every D must be blacklisted, so the next
# select() takes the degenerate "least-rejected" branch and STILL
# returns a valid worker.
final = policy.select(request=request, topology=topology, state=state)
assert final.decode_worker_id in {w.worker_id for w in topology.route_workers}
# And we should have explored every D over the bounded retries — the
# algorithm cannot trap a session on a single D when all are rejecting.
assert seen_decoders == {w.worker_id for w in topology.route_workers}
def test_least_rejected_d_chosen_when_all_blacklisted():
"""When every D is past threshold, the degenerate fallback chooses the
one with the *fewest* rejects (Algorithm 1, line 4)."""
topology = make_topology(decode_count=3)
state = RoutingState.create(topology)
request = make_request(session_id="s-lr", turn_id=0)
policy = KvAwarePolicy(migration_reject_threshold=3)
# Skew rejections: decode-0 has 5, decode-1 has 10, decode-2 has 3.
# All are >= threshold=3, so the filter wipes out every candidate.
# The fallback should pick decode-2 (smallest rejection count).
workers = list(topology.route_workers)
bumps = {workers[0].worker_id: 5, workers[1].worker_id: 10, workers[2].worker_id: 3}
for wid, n in bumps.items():
for _ in range(n):
state.record_admission_reject(request.session_id, wid)
decision = policy.select(request=request, topology=topology, state=state)
assert decision.decode_worker_id == workers[2].worker_id
def test_other_session_unaffected_by_blacklist():
"""Algorithm 1's filter is per-(session, D), not per-D. Session A's
rejects must not influence session B's routing."""
topology = make_topology(decode_count=2)
state = RoutingState.create(topology)
policy = KvAwarePolicy(migration_reject_threshold=3)
# Blacklist decode-0 for session A.
workers = list(topology.route_workers)
for _ in range(3):
state.record_admission_reject("session-A", workers[0].worker_id)
# Session B sees a clean slate — should be able to pick decode-0
# (which is the iteration-order winner under empty state).
decision_b = policy.select(
request=make_request(session_id="session-B"),
topology=topology,
state=state,
)
# decode-0 wins iteration-order tiebreak when all scores are (0,0,0,0).
assert decision_b.decode_worker_id == workers[0].worker_id
def test_threshold_zero_disables_blacklist():
"""migration_reject_threshold=0 means the migration mechanism is off:
every D stays a candidate regardless of its reject count."""
topology = make_topology(decode_count=2)
state = RoutingState.create(topology)
request = make_request(session_id="s-no-mig")
policy = KvAwarePolicy(migration_reject_threshold=0)
workers = list(topology.route_workers)
# Pile a huge number of rejects on decode-0.
for _ in range(100):
state.record_admission_reject(request.session_id, workers[0].worker_id)
decision = policy.select(request=request, topology=topology, state=state)
# decode-0 should still be eligible; with empty overlap/sticky/inflight,
# iteration order picks decode-0 first.
assert decision.decode_worker_id == workers[0].worker_id
def test_reject_counter_only_grows_on_record():
"""RoutingState.record_admission_reject is the ONLY mutator for the
counter. select() must not silently bump it."""
topology = make_topology(decode_count=2)
state = RoutingState.create(topology)
request = make_request(session_id="s-clean")
policy = KvAwarePolicy()
for _ in range(5):
policy.select(request=request, topology=topology, state=state)
# No explicit record_admission_reject -> all counters stay zero.
assert sum(state.session_d_rejects.values()) == 0