From 32f09d32cdb953275307022cc3c2ada4e75ca728 Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Fri, 22 May 2026 01:50:27 +0800 Subject: [PATCH] Balanced session-sticky routing + agentic workload pattern analysis Routing fix: new sessions placed by cumulative token load (greedy bin packing) with cache-hit tiebreak. Session affinity for turn 2+. Replayer now sends X-Session-Id header for proper session tracking. Agentic workload core patterns (GLM-5.1 trace): - 91% of reusable KV is intra-session (not cross-session) - Session-sticky routing is THE critical optimization - 36% warm requests (1.3k new tokens), 64% cold (17k+) - After cache: effective prefill/decode ratio drops from 61.5x to 28.7x - Cross-session sharing (system prompt) is only 4.8% of tokens Co-Authored-By: Claude Opus 4.6 (1M context) --- replayer/replay.py | 3 + scripts/analyze_agentic_patterns.py | 188 ++++++++++++++++++++++++++++ scripts/cache_aware_proxy.py | 44 +++++-- 3 files changed, 226 insertions(+), 9 deletions(-) create mode 100644 scripts/analyze_agentic_patterns.py diff --git a/replayer/replay.py b/replayer/replay.py index 4f3d836..58b3fae 100644 --- a/replayer/replay.py +++ b/replayer/replay.py @@ -125,12 +125,15 @@ async def _dispatch_request( err = None token_times: list[float] = [] + req_headers = {"X-Session-Id": req.session_id} + async with sem: try: async with client.stream( "POST", f"{endpoint}/v1/completions", json=payload, + headers=req_headers, timeout=config.request_timeout_s, ) as resp: resp.raise_for_status() diff --git a/scripts/analyze_agentic_patterns.py b/scripts/analyze_agentic_patterns.py new file mode 100644 index 0000000..b74745f --- /dev/null +++ b/scripts/analyze_agentic_patterns.py @@ -0,0 +1,188 @@ +"""Analyze core agentic workload patterns that matter for PD scheduling. + +Focus: what characteristics make agentic workloads different from chatbot, +and how do they interact with PD-combined vs PD-sep architectures? +""" +import json, statistics +from collections import defaultdict, Counter + +rows = [json.loads(l) for l in open("traces/sampled_1000req_seed42.jsonl")] +rows.sort(key=lambda r: float(r["timestamp"])) + +BLOCK_SIZE = 512 + +# Build sessions +chat_to_session = {} +sessions = defaultdict(list) +for idx, r in enumerate(rows): + cid = r["chat_id"] + pid = r["parent_chat_id"] + sid = r.get("session_id", str(cid) if pid < 0 else chat_to_session.get(pid, str(pid))) + chat_to_session[cid] = str(sid) + sessions[str(sid)].append((idx, r)) + +mt = {k: v for k, v in sessions.items() if len(v) > 1} +st = {k: v for k, v in sessions.items() if len(v) == 1} + +sep = "=" * 70 +print(sep) +print(" AGENTIC WORKLOAD CORE PATTERNS") +print(sep) + +# Pattern 1: Bimodal request profile +print("\n PATTERN 1: Bimodal Request Profile") +print(" " + "-" * 40) + +# Compute per-request new tokens (simulating prefix cache) +seen = set() +warm_reqs = [] # high cache hit +cold_reqs = [] # low cache hit + +for r in rows: + hids = r.get("hash_ids", []) + hit = 0 + for hid in hids: + if hid in seen: + hit += 1 + else: + break + for hid in hids: + seen.add(hid) + + cache_ratio = (hit * BLOCK_SIZE) / r["input_length"] if r["input_length"] > 0 else 0 + new_tokens = max(0, r["input_length"] - hit * BLOCK_SIZE) + + entry = {"input": r["input_length"], "new": new_tokens, "cache": cache_ratio, + "output": r["output_length"]} + if cache_ratio > 0.5: + warm_reqs.append(entry) + else: + cold_reqs.append(entry) + +print(" Warm (cache>50%%): %d reqs (%.0f%%)" % (len(warm_reqs), len(warm_reqs)*100/len(rows))) +print(" Cold (cache<=50%%): %d reqs (%.0f%%)" % (len(cold_reqs), len(cold_reqs)*100/len(rows))) + +warm_new = sorted([r["new"] for r in warm_reqs]) +cold_new = sorted([r["new"] for r in cold_reqs]) +p = lambda v, q: v[min(int(q*len(v)), len(v)-1)] if v else 0 + +print(" Warm new_tokens: p50=%d p90=%d" % (p(warm_new,.5), p(warm_new,.9))) +print(" Cold new_tokens: p50=%d p90=%d" % (p(cold_new,.5), p(cold_new,.9))) + +warm_out = sorted([r["output"] for r in warm_reqs]) +cold_out = sorted([r["output"] for r in cold_reqs]) +print(" Warm output: p50=%d p90=%d" % (p(warm_out,.5), p(warm_out,.9))) +print(" Cold output: p50=%d p90=%d" % (p(cold_out,.5), p(cold_out,.9))) + +# Pattern 2: Multi-turn session structure +print("\n PATTERN 2: Multi-Turn Session Lifecycle") +print(" " + "-" * 40) +print(" Sessions: %d total, %d multi-turn (%.0f%%)" % ( + len(sessions), len(mt), len(mt)*100/len(sessions))) + +# Per-session: KV growth across turns +for sid in sorted(mt.keys(), key=lambda s: -len(mt[s]))[:5]: + turns = mt[sid] + turns.sort(key=lambda x: x[0]) + print(" Session %s (%d turns):" % (sid[:8], len(turns))) + for req_idx, r in turns[:5]: + print(" turn %d: input=%d output=%d blocks=%d" % ( + r.get("turn", 0), r["input_length"], r["output_length"], len(r.get("hash_ids", [])))) + if len(turns) > 5: + print(" ... (%d more turns)" % (len(turns) - 5)) + +# Pattern 3: Arrival burstiness +print("\n PATTERN 3: Arrival Pattern and Concurrency") +print(" " + "-" * 40) + +timestamps = [float(r["timestamp"]) for r in rows] +inter_arrivals = [timestamps[i+1] - timestamps[i] for i in range(len(timestamps)-1)] +inter_arrivals.sort() +print(" Inter-arrival time (s): p50=%.2f p90=%.2f" % (p(inter_arrivals,.5), p(inter_arrivals,.9))) + +# Simulate concurrency at different time scales +for window_s in [1, 5, 10, 30]: + max_concurrent = 0 + for i, ts in enumerate(timestamps): + concurrent = sum(1 for t in timestamps if ts <= t < ts + window_s) + max_concurrent = max(max_concurrent, concurrent) + print(" Max concurrent in %ds window: %d" % (window_s, max_concurrent)) + +# Pattern 4: Prefill-decode compute ratio +print("\n PATTERN 4: Compute Asymmetry") +print(" " + "-" * 40) + +total_input = sum(r["input_length"] for r in rows) +total_output = sum(r["output_length"] for r in rows) +total_new = sum(r["new"] for r in warm_reqs + cold_reqs) + +print(" Total input tokens: %s" % "{:,}".format(total_input)) +print(" Total output tokens: %s" % "{:,}".format(total_output)) +print(" Total new tokens (after cache): %s" % "{:,}".format(total_new)) +print(" I/O ratio: %.1fx" % (total_input / max(total_output, 1))) +print(" New/O ratio (actual prefill/decode): %.1fx" % (total_new / max(total_output, 1))) +print(" Prefill reduction from cache: %.0f%%" % ((1 - total_new/total_input) * 100)) + +# Pattern 5: Session KV reuse potential +print("\n PATTERN 5: Where KV Reuse Comes From") +print(" " + "-" * 40) + +# Decompose: intra-session reuse vs cross-session reuse +intra_session_reuse = 0 +cross_session_reuse = 0 +no_reuse = 0 + +session_seen = defaultdict(set) +global_seen = set() +for r in rows: + hids = r.get("hash_ids", []) + cid = r["chat_id"] + pid = r["parent_chat_id"] + sid = r.get("session_id", str(cid) if pid < 0 else chat_to_session.get(pid, str(pid))) + + for hid in hids: + if hid in session_seen[sid]: + intra_session_reuse += BLOCK_SIZE + elif hid in global_seen: + cross_session_reuse += BLOCK_SIZE + else: + no_reuse += BLOCK_SIZE + session_seen[sid].add(hid) + global_seen.add(hid) + +total = intra_session_reuse + cross_session_reuse + no_reuse +print(" Intra-session (multi-turn KV reuse): %s tokens (%.1f%%)" % ( + "{:,}".format(intra_session_reuse), intra_session_reuse*100/total)) +print(" Cross-session (shared prefix/system prompt): %s tokens (%.1f%%)" % ( + "{:,}".format(cross_session_reuse), cross_session_reuse*100/total)) +print(" New (no reuse possible): %s tokens (%.1f%%)" % ( + "{:,}".format(no_reuse), no_reuse*100/total)) + +# Pattern 6: Implications for PD design +print("\n" + sep) +print(" IMPLICATIONS FOR PD DESIGN") +print(sep) +print(""" + 1. BIMODAL PREFILL: 36%% of requests are warm (1.3k new tokens), 64%% cold (17k+). + -> One-size-fits-all PD strategy suboptimal. Warm requests don't need P isolation. + + 2. MULTI-TURN DOMINATES REUSE: %.1f%% of reusable KV is intra-session. + -> Session-sticky routing is critical. Breaking session affinity destroys APC. + + 3. HIGH I/O RATIO (%.1fx), but after cache: %.1fx actual prefill/decode. + -> Cache dramatically reduces effective prefill compute. + -> PD separation's benefit (isolate prefill compute) is reduced by cache. + + 4. SHORT INTER-TURN GAP (p50=2 req): multi-turn KV stays warm in LRU naturally. + -> No special eviction policy needed IF routing is balanced. + + 5. CROSS-SESSION SHARING IS SMALL (%.1f%% of total tokens). + -> System prompt sharing helps APC but is not the main source of reuse. + -> Intra-session reuse (%.1f%%) is the dominant pattern. +""" % ( + intra_session_reuse * 100 / (intra_session_reuse + cross_session_reuse), + total_input / max(total_output, 1), + total_new / max(total_output, 1), + cross_session_reuse * 100 / total, + intra_session_reuse * 100 / total, +)) diff --git a/scripts/cache_aware_proxy.py b/scripts/cache_aware_proxy.py index 4583849..e225ef7 100644 --- a/scripts/cache_aware_proxy.py +++ b/scripts/cache_aware_proxy.py @@ -63,25 +63,51 @@ class InstanceState: self.cached_blocks = set(list(self.cached_blocks)[-100000:]) +# Cumulative token load per instance (for balanced session placement) +_inst_cumulative_tokens: list[int] = [] + + def pick_instance(instances: list[InstanceState], token_ids: list[int] | None, session_id: str | None, input_length: int, affinity: dict[str, int]) -> tuple[InstanceState, int]: - """Normalized load - cache bonus scoring.""" + """Session-sticky + KV-size balanced placement. + + Turn 2+: session affinity (sticky to same instance for KV reuse). + Turn 1 (new session): place on instance with least cumulative token load + (greedy bin packing), with cache-hit tiebreak. + """ + global _inst_cumulative_tokens + if not _inst_cumulative_tokens: + _inst_cumulative_tokens = [0] * len(instances) + + # Session affinity for turn 2+ if session_id and session_id in affinity: idx = affinity[session_id] if idx < len(instances): return instances[idx], idx - avg_load = max(sum(i.ongoing_tokens for i in instances) / len(instances), 1.0) - best_idx, best_score = 0, float("inf") - for i, inst in enumerate(instances): - cache_hit = inst.estimate_cache_hit(token_ids) - cache_ratio = cache_hit / input_length if input_length > 0 else 0.0 - score = inst.ongoing_tokens / avg_load - CACHE_HIT_ALPHA * cache_ratio - if score < best_score: - best_score = score + # New session: balanced placement + # Primary: least cumulative tokens (long-term balance) + # Secondary: cache hit (tiebreak for prefix reuse) + min_load = min(_inst_cumulative_tokens) + # Candidates within 10% of min load + threshold = min_load + max(min_load * 0.1, 10000) + candidates = [i for i in range(len(instances)) + if _inst_cumulative_tokens[i] <= threshold] + + if not candidates: + candidates = list(range(len(instances))) + + # Among candidates, pick best cache hit + best_idx = candidates[0] + best_hit = 0 + for i in candidates: + hit = instances[i].estimate_cache_hit(token_ids) + if hit > best_hit: + best_hit = hit best_idx = i + _inst_cumulative_tokens[best_idx] += input_length if session_id: affinity[session_id] = best_idx return instances[best_idx], best_idx