From 19c443e3bc0287c248bc26c67a7c3c67f3cb4023 Mon Sep 17 00:00:00 2001
From: Gahow Wang
Date: Mon, 1 Jun 2026 01:03:40 +0800
Subject: [PATCH] paper f2a: reuse-topology decomposition + mixture-sensitivity sweep

Full-trace analysis backing figure 2a on the real 2h cluster trace:

- f2a_reuse_topology_analyze.py: infinite-KV-cache (LRU) decomposition of
  prefix-cache reuse hits into intra-session vs cross-session, by most-recent
  prior holder of each content-addressed block.
- f2a_mixture_sweep.py: sensitivity of the intra/cross split to the
  single-turn session fraction (tests whether the 93%-intra sample vs 54.6%
  full-trace gap is session-mixture selection bias) -- keep all multi-turn
  sessions, downsample single-turn to each target fraction, reclassify.

Includes the result JSONs for both.

Co-Authored-By: Claude Opus 4.8
---
Reviewer note: the two Python scripts below were revised by hand after this
patch was generated (import-safe main(), zero-division guards, cycle-safe root
resolution); the blob ids on their `index` lines are the original ones.

 paper/data/f2a_mixture_sweep.py           | 190 +++++++++++++++++++++++
 paper/data/f2a_mixture_sweep_result.json  |  84 ++++++++++
 paper/data/f2a_reuse_topology_analyze.py  | 219 ++++++++++++++++++++++++++
 paper/data/f2a_reuse_topology_result.json |  77 +++++++++
 4 files changed, 570 insertions(+)
 create mode 100644 paper/data/f2a_mixture_sweep.py
 create mode 100644 paper/data/f2a_mixture_sweep_result.json
 create mode 100644 paper/data/f2a_reuse_topology_analyze.py
 create mode 100644 paper/data/f2a_reuse_topology_result.json

diff --git a/paper/data/f2a_mixture_sweep.py b/paper/data/f2a_mixture_sweep.py
new file mode 100644
index 0000000..7ddd946
--- /dev/null
+++ b/paper/data/f2a_mixture_sweep.py
@@ -0,0 +1,190 @@
+#!/usr/bin/env python3
+"""
+f2a sensitivity: how does the intra/cross reuse split move as we change the
+single-turn session fraction? (Tests whether the old 93%-intra sample vs 54.6%
+full-trace gap is just session-mixture selection bias.)
+
+Keep ALL multi-turn sessions; downsample single-turn sessions to hit each target
+single-turn fraction f. Re-run the LRU (last-touched), reuse-hits-only
+classification on the filtered request stream.
+
+  python3 f2a_mixture_sweep.py ~/ali-trace/.../051315-051317.jsonl /tmp/f2a_sweep.json
+"""
+import sys, json, time, random
+from collections import Counter, defaultdict
+
+# Upper-exclusive reuse-gap bucket bounds in seconds: a gap g falls in the first
+# bucket whose bound is > g, so buckets 0..2 together mean "gap < 60s".
+GAP_EDGES = [1, 10, 60, 300, 1800, 3600, float("inf")]
+# Sweep points: "full" = untouched trace, floats = target single-turn fraction.
+TARGETS = ("full", 0.75, 0.50, 0.25, 0.10, 0.00)
+
+
+def load_trace(path):
+    """Read the JSONL trace at `path`; return (chat_parent, records).
+
+    chat_parent maps chat_id -> parent_chat_id (0 when the field is missing or
+    null); records lists (timestamp, chat_id, hash_ids) tuples in file order.
+    """
+    chat_parent = {}
+    records = []
+    with open(path) as f:
+        for line in f:
+            d = json.loads(line)
+            cid = d["chat_id"]
+            pc = d.get("parent_chat_id")
+            chat_parent[cid] = 0 if pc is None else pc
+            records.append((d.get("timestamp", 0.0), cid, d.get("hash_ids") or []))
+    return chat_parent, records
+
+
+def resolve_root(cid, chat_parent, root_cache):
+    """Return the session root of `cid` by walking parent links (memoized).
+
+    The walk stops at a chat whose parent is 0 or is not in the trace. A parent
+    cycle (malformed data) stops at the chat that would close the loop, so the
+    walk visits each chat at most once and always terminates.
+    """
+    chain = []
+    seen = set()
+    cur = cid
+    while cur not in root_cache:
+        seen.add(cur)
+        p = chat_parent.get(cur, 0)
+        if p == 0 or p not in chat_parent or p in seen:
+            break
+        chain.append(cur)
+        cur = p
+    r = root_cache.get(cur, cur)
+    for nd in chain:
+        root_cache[nd] = r
+    root_cache[cid] = r
+    return r
+
+
+def gbucket(g):
+    """Return the index of the first GAP_EDGES bound strictly greater than `g`."""
+    for i, e in enumerate(GAP_EDGES):
+        if g < e:
+            return i
+    return len(GAP_EDGES) - 1  # only reached when g is inf or NaN
+
+
+def classify(kept, records, roots):
+    """Replay requests in order and split block reuse hits into intra/cross.
+
+    kept is a set of session roots to keep, or None to keep every session;
+    roots[i] is the session root of records[i]. Returns (intra, cross, new,
+    rec_i, rec_c), where rec_i / rec_c count intra / cross hits per gap bucket.
+    """
+    last_root = {}  # block -> root of its most recent holder
+    last_ts = {}  # block -> timestamp of its most recent touch
+    intra = cross = new = 0
+    rec_i = [0] * len(GAP_EDGES)
+    rec_c = [0] * len(GAP_EDGES)
+    for (ts, _cid, hs), r in zip(records, roots):
+        if kept is not None and r not in kept:
+            continue
+        for h in hs:
+            lr = last_root.get(h)
+            if lr is None:
+                new += 1  # first occurrence: not a reuse hit
+            else:
+                gb = gbucket(max(0.0, ts - last_ts[h]))
+                if lr == r:
+                    intra += 1
+                    rec_i[gb] += 1
+                else:
+                    cross += 1
+                    rec_c[gb] += 1
+            last_root[h] = r  # LRU refresh: block is now held by this session
+            last_ts[h] = ts
+    return intra, cross, new, rec_i, rec_c
+
+
+def cum_le(rec, idx):
+    """Return the fraction of `rec` in buckets 0..idx (0.0 when rec sums to 0)."""
+    tot = sum(rec) or 1
+    return sum(rec[: idx + 1]) / tot
+
+
+def frac(num, den):
+    """Return num / den, or 0.0 when den is 0 (e.g. a trace with no reuse hits)."""
+    return num / den if den else 0.0
+
+
+def sweep(records, roots, targets=TARGETS, log=None):
+    """Classify the trace once per target single-turn fraction.
+
+    Sessions with >= 2 requests are always kept; sessions with exactly one
+    request are downsampled with `random.sample` to reach each target fraction.
+    Returns (rows, n_single, n_multi); `log`, if given, gets progress strings.
+    """
+    req_per_root = Counter(roots)
+    single_roots = [r for r, c in req_per_root.items() if c == 1]
+    multi_roots = [r for r, c in req_per_root.items() if c >= 2]
+    M = len(multi_roots)
+    if log:
+        log(f"roots: single={len(single_roots)} multi={M}")
+    rows = []
+    for label in targets:
+        if label == "full":
+            kept = None
+            S = len(single_roots)
+        else:
+            f = float(label)
+            # S / (S + M) = f  =>  S = M * f / (1 - f), capped at what exists
+            S = int(round(M * f / (1 - f))) if f < 1 else len(single_roots)
+            S = min(S, len(single_roots))
+            if S < len(single_roots):
+                keep_single = set(random.sample(single_roots, S))
+            else:
+                keep_single = set(single_roots)  # no sampling: RNG state untouched
+            kept = set(multi_roots) | keep_single
+        intra, cross, new, rec_i, rec_c = classify(kept, records, roots)
+        reuse = intra + cross
+        row = {
+            "target": label, "single_turn_frac": round(frac(S, S + M), 4), "n_sessions": S + M,
+            "new": new, "intra": intra, "cross": cross, "reuse": reuse,
+            "intra_frac_of_reuse": round(frac(intra, reuse), 4),
+            "cross_frac_of_reuse": round(frac(cross, reuse), 4),
+            "intra_le60s": round(cum_le(rec_i, 2), 4),
+            "cross_le60s": round(cum_le(rec_c, 2), 4),
+        }
+        rows.append(row)
+        if log:
+            log(f"f={row['single_turn_frac']}: "
+                f"intra={row['intra_frac_of_reuse']} cross={row['cross_frac_of_reuse']}")
+    return rows, len(single_roots), M
+
+
+def main(argv):
+    """CLI entry point; argv is [prog, TRACE.jsonl, OUT.json (optional)]."""
+    if len(argv) < 2:
+        sys.exit("usage: f2a_mixture_sweep.py TRACE.jsonl [OUT.json]")
+    path = argv[1]
+    out = argv[2] if len(argv) > 2 else "/tmp/f2a_sweep.json"
+    random.seed(0)  # fixed seed: the downsampled mixtures are reproducible
+    t0 = time.time()
+
+    def log(msg):
+        sys.stderr.write(f"[{time.time()-t0:.0f}s] {msg}\n")
+
+    chat_parent, records = load_trace(path)
+    log(f"loaded {len(records)}")
+    records.sort(key=lambda x: x[0])
+    root_cache = {}
+    roots = [resolve_root(cid, chat_parent, root_cache) for _, cid, _ in records]
+    rows, n_single, n_multi = sweep(records, roots, log=log)
+
+    with open(out, "w") as f:
+        json.dump({"rows": rows, "n_single": n_single, "n_multi": n_multi}, f, indent=2)
+    print(f"{'single-turn%':>12} {'sessions':>10} {'intra%':>8} {'cross%':>8} {'intra<=60s':>11} {'cross<=60s':>11}")
+    for r in rows:
+        print(f"{r['single_turn_frac']*100:>11.1f}% {r['n_sessions']:>10} "
+              f"{r['intra_frac_of_reuse']*100:>7.1f}% {r['cross_frac_of_reuse']*100:>7.1f}% "
+              f"{r['intra_le60s']*100:>10.1f}% {r['cross_le60s']*100:>10.1f}%")
+
+
+if __name__ == "__main__":
+    main(sys.argv)
diff --git a/paper/data/f2a_mixture_sweep_result.json b/paper/data/f2a_mixture_sweep_result.json
new file mode 100644
index 0000000..071d0b4
--- /dev/null
+++ b/paper/data/f2a_mixture_sweep_result.json
@@ -0,0 +1,84 @@
+{
+  "rows": [
+    {
+      "target": "full",
+      "single_turn_frac": 0.9026,
+      "n_sessions": 1307276,
+      "new": 20650883,
+      "intra": 65166144,
+      "cross": 54134925,
+      "reuse": 119301069,
+      "intra_frac_of_reuse": 0.5462,
+      "cross_frac_of_reuse": 0.4538,
+      "intra_le60s": 0.8865,
+      "cross_le60s": 0.8706
+    },
+    {
+      "target": 0.75,
+      "single_turn_frac": 0.75,
+      "n_sessions": 509144,
+      "new": 15446415,
+      "intra": 66081759,
+      "cross": 26932604,
+      "reuse": 93014363,
+      "intra_frac_of_reuse": 0.7104,
+      "cross_frac_of_reuse": 0.2896,
+      "intra_le60s": 0.8844,
+      "cross_le60s": 0.8568
+    },
+    {
+      "target": 0.5,
+      "single_turn_frac": 0.5,
+      "n_sessions": 254572,
+      "new": 12843712,
+      "intra": 66548474,
+      "cross": 18990485,
+      "reuse": 85538959,
+      "intra_frac_of_reuse": 0.778,
+      "cross_frac_of_reuse": 0.222,
+      "intra_le60s": 0.8832,
+      "cross_le60s": 0.8881
+    },
+    {
+      "target": 0.25,
+      "single_turn_frac": 0.25,
+      "n_sessions": 169715,
+      "new": 11553493,
+      "intra": 66732961,
+      "cross": 16726772,
+      "reuse": 83459733,
+      "intra_frac_of_reuse": 0.7996,
+      "cross_frac_of_reuse": 0.2004,
+      "intra_le60s": 0.8827,
+      "cross_le60s": 0.9087
+    },
+    {
+      "target": 0.1,
+      "single_turn_frac": 0.1,
+      "n_sessions": 141429,
+      "new": 11036894,
+      "intra": 66798704,
+      "cross": 16084035,
+      "reuse": 82882739,
+      "intra_frac_of_reuse": 0.8059,
+      "cross_frac_of_reuse": 0.1941,
+      "intra_le60s": 0.8826,
+      "cross_le60s": 0.9152
+    },
+    {
+      "target": 0.0,
+      "single_turn_frac": 0.0,
+      "n_sessions": 127286,
+      "new": 10724167,
+      "intra": 66834552,
+      "cross": 15799085,
+      "reuse": 82633637,
+      "intra_frac_of_reuse": 0.8088,
+      "cross_frac_of_reuse": 0.1912,
+      "intra_le60s": 0.8825,
+      "cross_le60s": 0.9184
+    }
+  ],
+  "n_single": 1179990,
+  "n_multi": 127286
+}
\ No newline at end of file
diff --git a/paper/data/f2a_reuse_topology_analyze.py b/paper/data/f2a_reuse_topology_analyze.py
new file mode 100644
index 0000000..2c09383
--- /dev/null
+++ b/paper/data/f2a_reuse_topology_analyze.py
@@ -0,0 +1,219 @@
+#!/usr/bin/env python3
+"""
+f2a reuse topology — full-trace, infinite-KV-cache decomposition (LRU semantics).
+
+Question: on the real 2h cluster trace, assuming an *infinite* KV cache (nothing
+ever evicted), where do prefix-cache REUSE HITS come from?
+
+We classify only reuse hits (the 1st occurrence of a block is `new` = irreducible
+prefill; it is reported only as context for the APC ceiling, not in the split).
+
+Blocks (content-addressed `hash_id`) are processed in timestamp order. For each hit we
+look at the block's **most recent prior holder** (last computed OR used = LRU):
+
+  intra : last touch was the SAME session (parent_chat_id chain)
+  cross : last touch was a DIFFERENT session
+
+After classifying, the block's last-holder / last-time are updated to the current
+request (LRU refresh). The reuse "recency" is the **LRU reuse distance** = time since
+the block was last touched (what a finite TTL/LRU cache would need to retain).
+
+`cross` is further resolved by *block popularity* = number of distinct sessions that
+ever touch the block: a handful of hugely-popular blocks are the shared system/tool
+prefix; low-popularity cross blocks are genuine cross-session content.
+
+Run on dash2 (trace lives there):
+  python3 f2a_reuse_topology_analyze.py \
+    ~/ali-trace/trace-glm5.1-formatted/051315-051317.jsonl /tmp/f2a_result.json
+"""
+import sys, json, time
+from collections import defaultdict
+
+POP_CAP = 4096  # cap per-block root set; >= this is "very shared", buckets unaffected
+# LRU reuse distance of each hit: gap = consumer_ts - last_touch_ts
+GAP_EDGES = [1, 10, 60, 300, 1800, 3600, float("inf")]  # seconds
+GAP_LABELS = ["<1s", "1-10s", "10-60s", "1-5min", "5-30min", "30-60min", ">60min"]
+# popularity buckets: distinct sessions touching a block
+POP_EDGES = [2, 10, 100, 1000, float("inf")]
+POP_LABELS = ["1 (private)", "2-9", "10-99", "100-999", ">=1000"]
+
+
+def resolve_root(cid, chat_parent, root_cache):
+    """Return the session root of `cid` by walking parent links (memoized).
+
+    The walk stops at a chat whose parent is 0 or is not in the trace (turn-1 /
+    out-of-window head). A parent cycle (malformed data) stops at the chat that
+    would close the loop, so the walk always terminates.
+    """
+    chain = []
+    seen = set()
+    cur = cid
+    while cur not in root_cache:
+        seen.add(cur)
+        p = chat_parent.get(cur, 0)
+        if p == 0 or p not in chat_parent or p in seen:
+            break
+        chain.append(cur)
+        cur = p
+    r = root_cache.get(cur, cur)
+    for nd in chain:
+        root_cache[nd] = r
+    root_cache[cid] = r
+    return r
+
+
+def gap_bucket(g):
+    """Return the index of the first GAP_EDGES bound strictly greater than `g`."""
+    for i, e in enumerate(GAP_EDGES):
+        if g < e:
+            return i
+    return len(GAP_EDGES) - 1  # only reached when g is inf or NaN
+
+
+def pop_bucket(p):
+    """Return the POP_LABELS index for a block touched by `p` distinct sessions."""
+    if p <= 1:
+        return 0
+    for i, e in enumerate(POP_EDGES[1:], start=1):
+        if p < e:
+            return i
+    return len(POP_LABELS) - 1
+
+
+def frac(num, den):
+    """Return num / den, or 0.0 when den is 0 (empty trace / no reuse hits)."""
+    return num / den if den else 0.0
+
+
+def analyze(path, log=None):
+    """Classify every block reuse hit in the JSONL trace at `path`.
+
+    Returns the result dict that is written to the output JSON; `log`, if
+    given, is called with progress strings.
+    """
+    log = log or (lambda msg: None)
+    chat_parent = {}
+    records = []  # (ts, chat_id, hash_ids)
+    total_input_tokens = total_blocks = turn1 = 0
+    with open(path) as f:
+        for line in f:
+            d = json.loads(line)
+            cid = d["chat_id"]
+            pc = d.get("parent_chat_id")
+            chat_parent[cid] = 0 if pc is None else pc
+            hs = d.get("hash_ids") or []
+            records.append((d.get("timestamp", 0.0), cid, hs))
+            total_input_tokens += d.get("input_length", 0) or 0
+            total_blocks += len(hs)
+            if (d.get("turn", 1) or 1) == 1:
+                turn1 += 1
+    n = len(records)
+    log(f"loaded {n} reqs, {total_blocks} block-occ")
+    records.sort(key=lambda rec: rec[0])
+    log("sorted by ts")
+
+    root_cache = {}
+    last_root = {}  # block -> root of MOST RECENT holder (LRU)
+    last_ts = {}  # block -> ts of most recent touch (LRU)
+    roots_of = defaultdict(set)  # block -> set of distinct roots (capped) = popularity
+    intra_cnt = defaultdict(int)  # block -> intra reuse hits
+    cross_cnt = defaultdict(int)  # block -> cross reuse hits
+    new = intra = cross = 0
+    rec_intra = [0] * len(GAP_EDGES)
+    rec_cross = [0] * len(GAP_EDGES)
+    for ts, cid, hs in records:
+        if not hs:
+            continue
+        r = resolve_root(cid, chat_parent, root_cache)
+        for h in hs:
+            lr = last_root.get(h)
+            if lr is None:
+                new += 1  # first compute: not a hit
+            else:
+                gb = gap_bucket(max(0.0, ts - last_ts[h]))
+                if lr == r:
+                    intra += 1
+                    intra_cnt[h] += 1
+                    rec_intra[gb] += 1
+                else:
+                    cross += 1
+                    cross_cnt[h] += 1
+                    rec_cross[gb] += 1
+            last_root[h] = r  # LRU refresh: now held by current session
+            last_ts[h] = ts
+            s = roots_of[h]
+            if len(s) < POP_CAP:
+                s.add(r)
+    log(f"classified: new={new} intra={intra} cross={cross}")
+
+    pop_blocks = [0] * len(POP_LABELS)
+    pop_intra = [0] * len(POP_LABELS)
+    pop_cross = [0] * len(POP_LABELS)
+    for h in last_root:
+        b = pop_bucket(len(roots_of[h]))
+        pop_blocks[b] += 1
+        pop_intra[b] += intra_cnt.get(h, 0)
+        pop_cross[b] += cross_cnt.get(h, 0)
+
+    total_occ = new + intra + cross
+    reuse = intra + cross
+    return {
+        "trace": path,
+        "semantics": "LRU last-touched; reuse-hits only (new excluded from split)",
+        "n_requests": n,
+        "n_sessions": len({resolve_root(c, chat_parent, root_cache) for c in chat_parent}),
+        "turn1_frac": frac(turn1, n),
+        "block_size_tokens_eff": frac(total_input_tokens, total_blocks),
+        "total_input_tokens": total_input_tokens,
+        "total_block_occ": total_occ,
+        "distinct_blocks": len(last_root),
+        "new_occ": new,  # context only
+        "apc_ceiling": frac(reuse, total_occ),  # context only
+        # REUSE-ONLY decomposition (the headline)
+        "reuse_total": reuse,
+        "reuse": {"intra": intra, "cross": cross},
+        "reuse_frac": {"intra": frac(intra, reuse), "cross": frac(cross, reuse)},
+        # cross resolved by popularity (over reuse hits)
+        "pop_labels": POP_LABELS,
+        "pop_blocks": pop_blocks,
+        "pop_intra": pop_intra,
+        "pop_cross": pop_cross,
+        # LRU reuse-distance recency (over reuse hits)
+        "gap_labels": GAP_LABELS,
+        "rec_intra": rec_intra,
+        "rec_cross": rec_cross,
+    }
+
+
+def main(argv):
+    """CLI entry point; argv is [prog, TRACE.jsonl, OUT.json (optional)]."""
+    if len(argv) < 2:
+        sys.exit("usage: f2a_reuse_topology_analyze.py TRACE.jsonl [OUT.json]")
+    out = argv[2] if len(argv) > 2 else "/tmp/f2a_result.json"
+    t0 = time.time()
+
+    def log(msg):
+        sys.stderr.write(f"[{time.time()-t0:.0f}s] {msg}\n")
+
+    result = analyze(argv[1], log)
+    with open(out, "w") as f:
+        json.dump(result, f, indent=2)
+    log(f"wrote {out}")
+
+    # human summary
+    print(json.dumps({k: result[k] for k in
+                      ("n_requests","n_sessions","distinct_blocks","reuse_total",
+                       "reuse_frac","apc_ceiling")}, indent=2))
+    intra, cross = result["reuse"]["intra"], result["reuse"]["cross"]
+    print(f"new(context)={result['new_occ']} intra={intra} cross={cross}")
+    print("popularity blocks / intra-hits / cross-hits:")
+    for lab, blocks, hit_i, hit_c in zip(POP_LABELS, result["pop_blocks"],
+                                         result["pop_intra"], result["pop_cross"]):
+        print(f" {lab:>12}: {blocks:>10} | {hit_i:>11} | {hit_c:>11}")
+    print("LRU reuse-distance intra / cross:")
+    for lab, hit_i, hit_c in zip(GAP_LABELS, result["rec_intra"], result["rec_cross"]):
+        print(f" {lab:>8}: {hit_i:>11} | {hit_c:>11}")
+
+
+if __name__ == "__main__":
+    main(sys.argv)
diff --git a/paper/data/f2a_reuse_topology_result.json b/paper/data/f2a_reuse_topology_result.json
new file mode 100644
index 0000000..4d3bd96
--- /dev/null
+++ b/paper/data/f2a_reuse_topology_result.json
@@ -0,0 +1,77 @@
+{
+  "trace": "051315-051317.jsonl",
+  "semantics": "LRU last-touched; reuse-hits only (new excluded from split)",
+  "n_requests": 2114220,
+  "n_sessions": 1307276,
+  "turn1_frac": 0.6183254344391785,
+  "block_size_tokens_eff": 508.1517503092776,
+  "total_input_tokens": 71116829368,
+  "total_block_occ": 139951952,
+  "distinct_blocks": 20650883,
+  "new_occ": 20650883,
+  "apc_ceiling": 0.8524430513123532,
+  "reuse_total": 119301069,
+  "reuse": {
+    "intra": 65166144,
+    "cross": 54134925
+  },
+  "reuse_frac": {
+    "intra": 0.5462326913432771,
+    "cross": 0.45376730865672293
+  },
+  "pop_labels": [
+    "1 (private)",
+    "2-9",
+    "10-99",
+    "100-999",
+    ">=1000"
+  ],
+  "pop_blocks": [
+    14581108,
+    5535433,
+    517069,
+    16153,
+    1120
+  ],
+  "pop_intra": [
+    44515497,
+    14288480,
+    5421050,
+    924419,
+    16698
+  ],
+  "pop_cross": [
+    0,
+    20230912,
+    13750153,
+    7689338,
+    12464522
+  ],
+  "gap_labels": [
+    "<1s",
+    "1-10s",
+    "10-60s",
+    "1-5min",
+    "5-30min",
+    "30-60min",
+    ">60min"
+  ],
+  "rec_intra": [
+    390952,
+    26060293,
+    31317556,
+    5877221,
+    1384772,
+    109673,
+    25677
+  ],
+  "rec_cross": [
+    13222875,
+    22254795,
+    11653445,
+    4965765,
+    1747487,
+    220816,
+    69742
+  ]
+}
\ No newline at end of file