#!/usr/bin/env python3 """Annotate a formatted/sampled trace with `time_to_parent_chat` (seconds). time_to_parent_chat = this_turn.request_ready_time_ms - parent_turn.request_end_time_ms i.e. the real external gap (tool exec + agent think) between the parent turn *finishing* in production and this turn *arriving*. Turn-1 (parent_chat_id == -1) gets null. The end/ready times live only in the raw trace (meta.*); the sampled trace has neither. So we scan the raw trace once (byte-level field extraction, early-exit once every needed chat_id is found) to build {chat_id: (ready_ms, end_ms)}, then join. Run on dash0 (raw trace is there). """ from __future__ import annotations import json import sys import time from pathlib import Path RAW = "/home/admin/cpfs/wjh/ali-trace/trace-glm5.1-formatted/051315-051317-raw.jsonl" KCHAT = b'"chat_id":' KREADY = b'"request_ready_time_ms":' KEND = b'"request_end_time_ms":' def parse_int_after(line: bytes, key: bytes): i = line.find(key) if i < 0: return None i += len(key) n = len(line) while i < n and line[i] in (0x20, 0x09): # space/tab i += 1 j = i if j < n and line[j] == 0x2D: # '-' j += 1 while j < n and 0x30 <= line[j] <= 0x39: j += 1 return int(line[i:j]) if j > i and line[i:j] != b'-' else None def scan_timing(needed: set[int], raw_path: str) -> dict[int, tuple[int, int]]: timing: dict[int, tuple[int, int]] = {} t0 = time.time() nbytes = 0 with open(raw_path, "rb", buffering=1 << 22) as f: for line in f: nbytes += len(line) cid = parse_int_after(line, KCHAT) if cid is None or cid not in needed or cid in timing: continue ready = parse_int_after(line, KREADY) end = parse_int_after(line, KEND) if ready is None or end is None: continue timing[cid] = (ready, end) if len(timing) == len(needed): break print(f"[scan] found {len(timing)}/{len(needed)} chats in " f"{nbytes/1e9:.1f} GB / {time.time()-t0:.0f}s", flush=True) return timing def main(): in_trace = Path(sys.argv[1]) out_trace = Path(sys.argv[2]) raw_path = sys.argv[3] if len(sys.argv) > 3 else RAW rows = [json.loads(l) for l in open(in_trace)] chats = {r["chat_id"] for r in rows} parents = {r["parent_chat_id"] for r in rows if r.get("parent_chat_id") not in (None, -1, 0, "")} needed = chats | parents print(f"[trace] {len(rows)} reqs, {len(chats)} chats, " f"{len(parents)} parents, {len(needed)} chat_ids to look up", flush=True) timing = scan_timing(needed, raw_path) # annotate n_ann = n_neg = 0 ttps = [] min_ready = min((t[0] for t in timing.values()), default=0) for r in rows: p = r.get("parent_chat_id") ttp = None if p not in (None, -1, 0, "") and p in timing and r["chat_id"] in timing: ttp = (timing[r["chat_id"]][0] - timing[p][1]) / 1000.0 # ready - parent.end if ttp < 0: n_neg += 1 ttp = 0.0 ttps.append(ttp) n_ann += 1 r["time_to_parent_chat"] = ttp # sanity field: re-derived ready offset (s) to cross-check vs timestamp if r["chat_id"] in timing: r["_ready_off_s"] = (timing[r["chat_id"]][0] - min_ready) / 1000.0 with open(out_trace, "w") as o: for r in rows: o.write(json.dumps(r) + "\n") ttps.sort() n = len(ttps) pc = lambda q: ttps[min(int(q * n), n - 1)] if n else 0 print(f"[done] annotated {n_ann} turns with time_to_parent_chat " f"({n_neg} negative clamped to 0)") print(f"[ttp] p25={pc(.25):.2f}s p50={pc(.5):.2f}s p90={pc(.9):.2f}s " f"p99={pc(.99):.2f}s (f3a ref: p50~1.6s)") print(f"[ttp] frac<1s={sum(1 for x in ttps if x<1)/n:.0%} " f"frac<5s={sum(1 for x in ttps if x<5)/n:.0%}") # cross-check timestamp vs re-derived ready offset on a few rows chk = [(r["timestamp"], r["_ready_off_s"]) for r in rows if "_ready_off_s" in r][:5] print("[sanity] (trace.timestamp, raw ready_off_s):", [(round(a,1),round(b,1)) for a,b in chk]) print(f"wrote {out_trace}") if __name__ == "__main__": main()