- tests/test_metrics.py asserts the new linear-interpolation _percentile against hand-computed expected values (single value, two-value interpolation, endpoints, numpy-equivalent linear default, on-integer rank).
- tests/test_proxy_pick.py exercises InstanceState LRU eviction and move-to-end on hit, plus session-affinity stickiness, the overload fallback, the active_p_offloads penalty, and lmetric scoring. The proxy is loaded by file path with stub fastapi/uvicorn/httpx modules so the suite runs without the FastAPI server deps installed.
- pyproject.toml gets a hatchling wheel target and a [tool.pytest] section so `uv run --extra dev pytest` works out of the box.
181 lines
6.2 KiB
Python
181 lines
6.2 KiB
Python
"""Minimal coverage for scripts/cache_aware_proxy pick_instance + cache LRU (S1)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import importlib.util
|
|
import sys
|
|
import types
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
# Absolute path of the proxy script under test: "scripts/cache_aware_proxy.py"
# inside the directory that contains this test file's parent directory.
PROXY_PATH = Path(__file__).resolve().parent.parent / "scripts" / "cache_aware_proxy.py"
|
|
|
|
|
|
def _install_stub_modules() -> None:
    """Register bare-bones fakes for uvicorn, fastapi and httpx.

    Each fake is inserted into ``sys.modules`` only when no module of that
    name is registered there yet, so the proxy module can be imported
    without the full server dependencies. The ``fastapi.responses`` fake is
    installed together with the ``fastapi`` fake.
    """
    registry = sys.modules

    if "uvicorn" not in registry:
        registry["uvicorn"] = types.ModuleType("uvicorn")

    if "fastapi" not in registry:

        class _FastAPI:
            def __init__(self, *a, **kw):
                self.state = types.SimpleNamespace()

            def post(self, *a, **kw):
                # Route decorators hand the endpoint back untouched.
                return lambda fn: fn

            def get(self, *a, **kw):
                return lambda fn: fn

        class _HTTPException(Exception):
            def __init__(self, status_code=500, detail=""):
                self.status_code = status_code
                self.detail = detail

        class _Request:  # not actually instantiated by the routing tests
            pass

        class _StreamingResponse:
            def __init__(self, *a, **kw):
                pass

        fake_fastapi = types.ModuleType("fastapi")
        fake_fastapi.FastAPI = _FastAPI
        fake_fastapi.HTTPException = _HTTPException
        fake_fastapi.Request = _Request
        registry["fastapi"] = fake_fastapi

        fake_responses = types.ModuleType("fastapi.responses")
        fake_responses.StreamingResponse = _StreamingResponse
        registry["fastapi.responses"] = fake_responses

    if "httpx" not in registry:

        class _AsyncClient:
            def __init__(self, *a, **kw):
                pass

            async def aclose(self):
                pass

        class _Limits:
            def __init__(self, *a, **kw):
                pass

        fake_httpx = types.ModuleType("httpx")
        fake_httpx.AsyncClient = _AsyncClient
        fake_httpx.Limits = _Limits
        registry["httpx"] = fake_httpx
|
|
|
|
|
|
@pytest.fixture(scope="module")
def proxy():
    """Load ``scripts/cache_aware_proxy.py`` by file path and return the module.

    Stub server dependencies are installed first via
    ``_install_stub_modules()``. The test is skipped when no import spec can
    be built for ``PROXY_PATH`` or when executing the module raises
    ``ModuleNotFoundError``; any other exception propagates.

    If executing the module fails for any reason, the partially initialised
    module is removed from ``sys.modules`` again so that later imports do not
    pick up a broken ``cache_aware_proxy`` entry.
    """
    module_name = "cache_aware_proxy"
    _install_stub_modules()
    spec = importlib.util.spec_from_file_location(module_name, PROXY_PATH)
    if spec is None or spec.loader is None:
        pytest.skip(f"cannot load proxy module at {PROXY_PATH}")
    mod = importlib.util.module_from_spec(spec)
    # Registered before execution, following the importlib "import a source
    # file directly" recipe.
    sys.modules[module_name] = mod
    try:
        spec.loader.exec_module(mod)
    except ModuleNotFoundError as exc:
        # Do not leave a half-initialised module behind.
        sys.modules.pop(module_name, None)
        pytest.skip(f"proxy dependency missing: {exc}")
    except BaseException:
        sys.modules.pop(module_name, None)
        raise
    return mod
|
|
|
|
|
|
def _make_inst(proxy, url: str, ongoing_tokens: int = 0,
               active_p_offloads: int = 0):
    """Create a ``proxy.InstanceState`` for *url* with preset counters.

    The ``ongoing_tokens`` and ``active_p_offloads`` attributes of the new
    instance are overwritten with the given values before it is returned.
    """
    state = proxy.InstanceState(url)
    overrides = (
        ("ongoing_tokens", ongoing_tokens),
        ("active_p_offloads", active_p_offloads),
    )
    for attr, value in overrides:
        setattr(state, attr, value)
    return state
|
|
|
|
|
|
def test_record_prefix_evicts_oldest_block(proxy):
    """With capacity 2, recording a third one-block prefix drops the first."""
    state = proxy.InstanceState("http://x")
    original_capacity = proxy.CACHE_CAPACITY_BLOCKS
    proxy.CACHE_CAPACITY_BLOCKS = 2
    try:
        n = proxy.BLOCK_SIZE
        # Three distinct prefixes, each exactly one block long.
        oldest, middle, newest = ([token] * n for token in (1, 2, 3))
        for prefix in (oldest, middle, newest):
            state.record_prefix(prefix)

        assert len(state.cached_blocks) == 2
        # Only the first-recorded prefix should be gone.
        assert state.estimate_cache_hit(oldest) == 0
        assert state.estimate_cache_hit(middle) == n
        assert state.estimate_cache_hit(newest) == n
    finally:
        # Restore the module-level capacity for the other tests.
        proxy.CACHE_CAPACITY_BLOCKS = original_capacity
|
|
|
|
|
|
def test_estimate_cache_hit_touches_lru(proxy):
    """Looking up a cached prefix must protect it from the next eviction."""
    state = proxy.InstanceState("http://x")
    original_capacity = proxy.CACHE_CAPACITY_BLOCKS
    proxy.CACHE_CAPACITY_BLOCKS = 2
    try:
        n = proxy.BLOCK_SIZE
        first, second, third = ([token] * n for token in (1, 2, 3))
        for prefix in (first, second):
            state.record_prefix(prefix)

        # The hit on `first` refreshes it, leaving `second` as the oldest entry.
        assert state.estimate_cache_hit(first) == n

        # Recording `third` overflows the cache: `second` goes, `first` stays.
        state.record_prefix(third)
        assert state.estimate_cache_hit(first) == n
        assert state.estimate_cache_hit(second) == 0
    finally:
        # Restore the module-level capacity for the other tests.
        proxy.CACHE_CAPACITY_BLOCKS = original_capacity
|
|
|
|
|
|
def test_pick_instance_session_affinity_sticks(proxy):
    """A session mapped to index 1 in the affinity table is routed there."""
    pool = [_make_inst(proxy, url) for url in ("http://a", "http://b")]
    session_map = {"sess1": 1}
    picked, picked_idx = proxy.pick_instance(pool, None, "sess1", 1000, session_map)
    assert picked_idx == 1
    assert picked is pool[1]
|
|
|
|
|
|
def test_pick_instance_session_affinity_breaks_on_overload(proxy):
    """A heavily overloaded pinned instance must not be picked again."""
    token_load = {"http://a": 100, "http://b": 1_000_000, "http://c": 100}
    pool = [
        _make_inst(proxy, url, ongoing_tokens=tokens)
        for url, tokens in token_load.items()
    ]
    session_map = {"sess1": 1}
    picked, picked_idx = proxy.pick_instance(pool, None, "sess1", 1000, session_map)
    # avg ~333k; B at 1M is ~3x avg, well above OVERLOAD_FACTOR=2.0 -> fallback.
    assert picked_idx != 1
    assert picked is not pool[1]
|
|
|
|
|
|
def test_pick_instance_p_offload_penalty_steers_away(proxy):
    """An instance with active offloaded prefills loses to a lightly loaded one."""
    offloading = _make_inst(proxy, "http://a", ongoing_tokens=0, active_p_offloads=2)
    lightly_loaded = _make_inst(proxy, "http://b", ongoing_tokens=1000)
    pool = [offloading, lightly_loaded]
    picked, picked_idx = proxy.pick_instance(pool, None, None, 5000, {})
    # B's 1000-token load is much smaller than A's 2 * HEAVY_THRESHOLD penalty.
    assert picked_idx == 1
    assert picked is pool[1]
|
|
|
|
|
|
def test_pick_instance_lmetric_picks_lowest_score(proxy):
    """``pick_instance_lmetric`` must choose the idle instance over the busy one."""
    idle = _make_inst(proxy, "http://a")
    busy = _make_inst(proxy, "http://b")
    pool = [idle, busy]
    idle.pending_prefill_tokens, idle.num_requests = 0, 0
    busy.pending_prefill_tokens, busy.num_requests = 5000, 4
    picked, picked_idx = proxy.pick_instance_lmetric(pool, None, None, 1000, {})
    # Empty instance has score = 1000 * 0 = 0; busy one has (5000+1000)*4.
    assert picked_idx == 0
    assert picked is pool[0]
|