#!/usr/bin/env python3
"""Annotate the COMPLETE formatted trace with `time_to_parent_chat` (seconds).

Streaming variant of add_time_to_parent.py for the full 2-h trace (~2.1M rows):
the needed chat_id set == all chats, so there is no early-exit -- we read the
whole raw once anyway. So instead of holding all formatted rows in memory we:
  1. scan raw once -> timing = {chat_id: (ready_ms, end_ms)} (+ track min_ready)
  2. stream the formatted file line-by-line, join, write out.

time_to_parent_chat = (this.request_ready_time_ms - parent.request_end_time_ms)/1000
  = real external gap (tool exec + agent think) between the parent turn
    *finishing* in production and this turn *arriving*.
turn-1 (parent_chat_id in {-1,0,null,""}) -> null; negatives clamped to 0;
parent outside the window (no timing) -> null.

Usage: <script> IN_TRACE.jsonl OUT_TRACE.jsonl RAW.jsonl

Run on dash0 (raw lives there).
"""
from __future__ import annotations

import json
import sys
import time

# Byte-level keys searched for in each raw line; the raw rows are never
# JSON-decoded, only scanned for these three integer fields.
KCHAT = b'"chat_id":'
KREADY = b'"request_ready_time_ms":'
KEND = b'"request_end_time_ms":'

# Values of parent_chat_id that mark a first turn (no parent to measure from).
_ROOT_PARENTS = (None, -1, 0, "")

# How many rows the final sanity print samples from the written output.
_SANITY_ROWS = 5


def parse_int_after(line: bytes, key: bytes) -> int | None:
    """Return the integer literal that follows the first `key` in `line`.

    Spaces and tabs between the key and the number are skipped and one
    leading '-' is accepted. Returns None when the key is absent or is not
    followed by at least one decimal digit (e.g. `null`, a quoted string,
    or a lone '-').
    """
    i = line.find(key)
    if i < 0:
        return None
    i += len(key)
    n = len(line)
    while i < n and line[i] in (0x20, 0x09):  # space/tab
        i += 1
    j = i
    if j < n and line[j] == 0x2D:  # '-'
        j += 1
    while j < n and 0x30 <= line[j] <= 0x39:  # '0'..'9'
        j += 1
    return int(line[i:j]) if j > i and line[i:j] != b'-' else None


def scan_timing(raw_path: str) -> tuple[dict[int, tuple[int, int]], int]:
    """One pass over raw -> {chat_id: (ready_ms, end_ms)}, plus min ready_ms.

    Lines without a chat_id, or missing either timestamp, are ignored. When
    a chat_id occurs more than once the first usable line wins. The second
    element of the result is the smallest ready_ms recorded, or 0 when no
    line was usable.
    """
    timing: dict[int, tuple[int, int]] = {}
    min_ready = None
    t0 = time.time()
    nbytes = nlines = 0
    with open(raw_path, "rb", buffering=1 << 22) as f:
        for line in f:
            nbytes += len(line)
            nlines += 1
            cid = parse_int_after(line, KCHAT)
            if cid is not None and cid not in timing:
                ready = parse_int_after(line, KREADY)
                end = parse_int_after(line, KEND)
                if ready is not None and end is not None:
                    timing[cid] = (ready, end)
                    if min_ready is None or ready < min_ready:
                        min_ready = ready
            # Progress is reported for every 200000th line, including lines
            # that were skipped above (no early `continue` before this point).
            if nlines % 200000 == 0:
                print(f"[scan] {nlines} lines / {nbytes/1e9:.0f} GB / "
                      f"{time.time()-t0:.0f}s / {len(timing)} chats",
                      flush=True)
    print(f"[scan] DONE {len(timing)} chats in {nbytes/1e9:.1f} GB / "
          f"{time.time()-t0:.0f}s", flush=True)
    return timing, (min_ready or 0)


def _join_trace(in_trace, out_trace, timing, min_ready):
    """Stream `in_trace` -> `out_trace`, adding the timing fields to each row.

    Every row gets `time_to_parent_chat` (float seconds or None); rows whose
    own chat_id has timing also get `_ready_off_s`, the ready time relative
    to `min_ready` in seconds. Returns (counters dict, list of annotated gaps).
    """
    counts = {"rows": 0, "ann": 0, "neg": 0, "no_self": 0, "no_parent": 0}
    ttps = []
    with open(in_trace, encoding="utf-8") as fin, \
            open(out_trace, "w", encoding="utf-8") as fout:
        for ln in fin:
            if not ln.strip():
                # Tolerate blank/trailing lines instead of failing in json.
                continue
            r = json.loads(ln)
            counts["rows"] += 1
            cid = r["chat_id"]
            parent = r.get("parent_chat_id")
            own = timing.get(cid)
            ttp = None
            if parent not in _ROOT_PARENTS:
                if own is None:
                    counts["no_self"] += 1
                else:
                    parent_timing = timing.get(parent)
                    if parent_timing is None:
                        counts["no_parent"] += 1
                    else:
                        # this turn's ready time minus the parent's end time
                        ttp = (own[0] - parent_timing[1]) / 1000.0
                        if ttp < 0:
                            counts["neg"] += 1
                            ttp = 0.0
                        ttps.append(ttp)
                        counts["ann"] += 1
            r["time_to_parent_chat"] = ttp
            if own is not None:
                r["_ready_off_s"] = (own[0] - min_ready) / 1000.0
            fout.write(json.dumps(r) + "\n")
    return counts, ttps


def _percentile(sorted_vals, q):
    """Return the value at fraction `q` of an ascending list; 0 if empty."""
    n = len(sorted_vals)
    if not n:
        return 0
    return sorted_vals[min(int(q * n), n - 1)]


def _round1(value):
    """Round numbers to one decimal; pass anything else through unchanged."""
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return round(value, 1)
    return value


def _sanity_sample(out_trace, limit=_SANITY_ROWS):
    """Re-read the written file; return up to `limit` (timestamp, offset) pairs."""
    sample = []
    with open(out_trace, encoding="utf-8") as fh:
        for ln in fh:
            r = json.loads(ln)
            if "_ready_off_s" in r:
                sample.append((r.get("timestamp"), r["_ready_off_s"]))
                if len(sample) >= limit:
                    break
    return sample


def main():
    """CLI entry point: scan the raw file, join it onto the trace, report.

    Reads three positional arguments from sys.argv (formatted input trace,
    output path, raw trace) and exits with a usage message if any is missing.
    """
    if len(sys.argv) < 4:
        prog = sys.argv[0] if sys.argv else "add_time_to_parent"
        sys.exit(f"usage: {prog} IN_TRACE.jsonl OUT_TRACE.jsonl RAW.jsonl")
    in_trace = sys.argv[1]   # formatted compact .jsonl
    out_trace = sys.argv[2]  # output .jsonl
    raw_path = sys.argv[3]   # 522 GB raw .jsonl

    timing, min_ready = scan_timing(raw_path)

    t0 = time.time()
    counts, ttps = _join_trace(in_trace, out_trace, timing, min_ready)
    ttps.sort()
    n = len(ttps)

    def pc(q):
        return _percentile(ttps, q)

    print(f"[join] {time.time()-t0:.0f}s", flush=True)
    print(f"[done] rows={counts['rows']} annotated={counts['ann']} "
          f"(neg_clamped={counts['neg']}) self_missing={counts['no_self']} "
          f"parent_missing={counts['no_parent']}")
    print(f"[ttp] p25={pc(.25):.2f}s p50={pc(.5):.2f}s p90={pc(.9):.2f}s "
          f"p99={pc(.99):.2f}s (f3a ref: p50~1.6s)")
    if n:
        print(f"[ttp] frac<1s={sum(1 for x in ttps if x<1)/n:.0%} "
              f"frac<5s={sum(1 for x in ttps if x<5)/n:.0%}")

    # sanity: trace.timestamp vs re-derived ready offset for a few rows
    chk = _sanity_sample(out_trace)
    print("[sanity] (trace.timestamp, raw ready_off_s):",
          [(_round1(a), _round1(b)) for a, b in chk])
    print(f"wrote {out_trace}")


if __name__ == "__main__":
    main()