New replayer/srr.py drives a Poisson session-arrival load against the existing proxy, with strict per-session turn sequentiality, explicit warmup/steady/drain windows, and per-arrival fresh session_id + request_id so APC/session-affinity counters are not contaminated by repeated draws from the trace pool. Writes window_summary.json with attempted/completed/errored split by window so latency tails can be read on the steady-state window only. Required by Batch 4 SRR sweep; trace-timestamp dispatch in replay.py cannot drive arrival rate independently. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
101 lines
3.3 KiB
Python
101 lines
3.3 KiB
Python
"""Tests for A4 SRR loadgen helpers (no network I/O)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import random
|
|
|
|
from replayer.srr import (
|
|
_build_session_pool,
|
|
_clone_session_for_arrival,
|
|
_window_for,
|
|
)
|
|
from replayer.trace import TraceRequest
|
|
|
|
|
|
def _mk_req(session_id: str, turn: int, chat_id: int, ts: float) -> TraceRequest:
    """Build a small synthetic ``TraceRequest`` for one turn of a session.

    Args:
        session_id: Session the request belongs to.
        turn: Turn index within the session; also stored as ``turn_id``.
        chat_id: Chat identifier; also used as the single entry of ``hash_ids``.
        ts: Value stored in ``timestamp_s``.

    Returns:
        A ``TraceRequest`` with fixed token lengths (100 in, 10 out) and
        ``request_type="user"``.
    """
    # Turn 0 has no predecessor, which is encoded as parent_chat_id == -1;
    # later turns point at the immediately preceding chat id.
    if turn == 0:
        parent = -1
    else:
        parent = chat_id - 1

    fields = {
        "request_id": f"{session_id}:{turn}:{chat_id}:{turn}",
        "session_id": session_id,
        "chat_id": chat_id,
        "parent_chat_id": parent,
        "timestamp_s": ts,
        "input_length": 100,
        "output_length": 10,
        "request_type": "user",
        "turn_id": turn,
        "hash_ids": (chat_id,),
    }
    return TraceRequest(**fields)
|
|
|
|
|
|
def test_build_session_pool_groups_and_orders_turns():
    """Requests given out of order come back grouped per session.

    Three requests across two sessions are passed in shuffled order; the
    pool must contain two sessions, and the turn ids inside each session
    must come back as ``[0, 1]`` for ``s1`` and ``[0]`` for ``s2``.
    """
    shuffled = [
        _mk_req("s1", 1, 11, 2.0),
        _mk_req("s2", 0, 20, 1.5),
        _mk_req("s1", 0, 10, 1.0),
    ]

    pool = _build_session_pool(shuffled, pool_cap=None)
    assert len(pool) == 2

    # Index each session's turn list by the session id of its first turn.
    sessions = {}
    for turns in pool:
        sessions[turns[0].session_id] = turns

    s1_turn_ids = [req.turn_id for req in sessions["s1"]]
    assert s1_turn_ids == [0, 1]
    s2_turn_ids = [req.turn_id for req in sessions["s2"]]
    assert s2_turn_ids == [0]
|
|
|
|
|
|
def test_build_session_pool_honors_pool_cap():
    """Five single-turn sessions with ``pool_cap=2`` yield a pool of size two."""
    single_turn_reqs = []
    for idx in range(5):
        single_turn_reqs.append(_mk_req(f"s{idx}", 0, idx, float(idx)))

    capped = _build_session_pool(single_turn_reqs, pool_cap=2)
    assert len(capped) == 2
|
|
|
|
|
|
def test_window_for_classifies_correctly():
    """Check ``_window_for`` labels times as warmup, steady or drain.

    The cases pin both boundaries: a time equal to ``warmup_end`` is
    already "steady", and a time equal to ``steady_end`` is already "drain".
    """
    warmup_end, steady_end = 100.0, 400.0

    # (time, expected window), checked in this order.
    cases = (
        (50.0, "warmup"),
        (100.0, "steady"),
        (399.999, "steady"),
        (400.0, "drain"),
        (500.0, "drain"),
    )
    for when, expected in cases:
        assert _window_for(when, warmup_end, steady_end) == expected
|
|
|
|
|
|
def test_clone_session_uses_fresh_ids_so_arrivals_do_not_alias():
    """Two arrivals cloned from one template must not share ids.

    Within one arrival both turns share a session id that starts with
    "srr" and the first turn's request id differs from the template's;
    across arrivals both the session id and the request id differ.
    """
    template = [_mk_req("orig", 0, 100, 1.0), _mk_req("orig", 1, 101, 2.0)]
    arrival_seven = _clone_session_for_arrival(template, arrival_idx=7)
    arrival_eight = _clone_session_for_arrival(template, arrival_idx=8)

    for clone in (arrival_seven, arrival_eight):
        first_turn = clone[0]
        second_turn = clone[1]
        assert first_turn.session_id == second_turn.session_id  # within an arrival
        assert first_turn.session_id.startswith("srr")
        assert first_turn.request_id != template[0].request_id

    head_seven = arrival_seven[0]
    head_eight = arrival_eight[0]
    assert head_seven.session_id != head_eight.session_id
    assert head_seven.request_id != head_eight.request_id
|
|
|
|
|
|
def test_clone_session_preserves_token_payload_fields():
    """Cloning keeps the token lengths, hash ids and turn id of the template.

    The template reuses the identity fields of a ``_mk_req`` request but
    carries distinctive payload values (4000 / 300 tokens, five hash ids),
    so a clone that fell back to other values would be caught.
    """
    seed = _mk_req("orig", 0, 100, 1.0)
    heavy = TraceRequest(
        request_id=seed.request_id,
        session_id=seed.session_id,
        chat_id=seed.chat_id,
        parent_chat_id=seed.parent_chat_id,
        timestamp_s=seed.timestamp_s,
        input_length=4000,
        output_length=300,
        request_type="user",
        turn_id=0,
        hash_ids=(1, 2, 3, 4, 5),
    )

    cloned = _clone_session_for_arrival([heavy], arrival_idx=1)
    only_turn = cloned[0]
    assert only_turn.input_length == 4000
    assert only_turn.output_length == 300
    assert only_turn.hash_ids == (1, 2, 3, 4, 5)
    assert only_turn.turn_id == 0
|
|
|
|
|
|
def test_poisson_inter_arrival_mean_matches_rate():
    """Sanity check on the exponential RNG used for arrivals.

    Draws 20000 exponential inter-arrival gaps from a generator seeded
    with 0 and checks that their mean is within 0.01 s of ``1 / rate``.
    """
    sessions_per_s = 5.0  # 5 sess/s -> mean inter-arrival ~ 0.2 s
    n_draws = 20000
    gen = random.Random(0)

    total_gap = sum(gen.expovariate(sessions_per_s) for _ in range(n_draws))
    observed_mean = total_gap / n_draws
    expected_mean = 1.0 / sessions_per_s
    assert abs(observed_mean - expected_mean) < 0.01
|