Files
agentic-kvc/scripts/add_ttp_streaming.py
Gahow Wang 075f5bbc22 trace: time_to_parent_chat annotation + thinktime trace variants
Adds `scripts/add_ttp_streaming.py`: one streaming pass over the 522 GB raw
glm5.1 trace to build {chat_id: (ready_ms, end_ms)} and join the real
inter-turn gap onto the COMPLETE formatted trace (no early-exit, low memory).
  time_to_parent_chat = (this.request_ready_time_ms - parent.request_end_time_ms)/1000
  = tool-exec + agent think-time; turn-1 -> null, negatives clamped to 0.

Ships the two ttp-annotated sampled traces (same anonymized data + one
timestamp-derived field; regenerated via sample_trace.py --seed 42 so they are
row-for-row identical to the non-ttp variants on all 9 shared fields):
  traces/w600_r0.0015_st30_ttp.jsonl          (1214 reqs)
  traces/w600_r0.0015_st30_first600s_ttp.jsonl (807 reqs)
They are needed to replay with --dispatch-mode thinktime without the
non-redistributable raw trace, so they are added to the .gitignore allowlist.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-30 20:58:49 +08:00

134 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""Annotate the COMPLETE formatted trace with `time_to_parent_chat` (seconds).
Streaming variant of add_time_to_parent.py for the full 2-h trace (~2.1M rows):
the needed chat_id set == all chats, so there is no early-exit -- we read the
whole raw once anyway. So instead of holding all formatted rows in memory we:
1. scan raw once -> timing = {chat_id: (ready_ms, end_ms)} (+ track min_ready)
2. stream the formatted file line-by-line, join, write out.
time_to_parent_chat = (this.request_ready_time_ms - parent.request_end_time_ms)/1000
= real external gap (tool exec + agent think) between the parent turn
*finishing* in production and this turn *arriving*.
turn-1 (parent_chat_id in {-1,0,null}) -> null; negatives clamped to 0;
parent outside the window (no timing) -> null.
Run on dash0 (raw lives there).
"""
from __future__ import annotations
import json
import sys
import time
# JSON keys of the raw trace, kept as bytes because the raw file is read in
# binary mode and searched with parse_int_after without JSON-decoding each line.
# The trailing ':' is part of the pattern so the integer value follows directly.
KCHAT = b'"chat_id":'
KREADY = b'"request_ready_time_ms":'
KEND = b'"request_end_time_ms":'
def parse_int_after(line: bytes, key: bytes):
    """Return the integer literal that follows the first `key` in `line`.

    Spaces and tabs between the key and the value are skipped. The value may
    carry a leading '-' and must contain at least one ASCII digit. Returns
    None when the key is absent or is not followed by an integer (for example
    a quoted string, `null`, or a bare '-').

    The line is walked by index rather than sliced/stripped so that no copy of
    a potentially very long raw line is made.
    """
    found = line.find(key)
    if found == -1:
        return None

    size = len(line)
    start = found + len(key)
    # Skip horizontal whitespace only (space / tab); a newline ends the search.
    while start < size and line[start] in b" \t":
        start += 1

    stop = start
    if stop < size and line[stop] == 0x2D:  # optional '-' sign
        stop += 1

    first_digit = stop
    while stop < size and 0x30 <= line[stop] <= 0x39:  # '0'..'9'
        stop += 1

    # No digit consumed: either nothing numeric here or a lone '-'.
    if stop == first_digit:
        return None
    return int(line[start:stop])
def scan_timing(raw_path: str) -> tuple[dict[int, tuple[int, int]], int]:
    """Scan the raw trace once and collect per-chat timing.

    Each line is searched (without JSON decoding) for the chat id and its
    request_ready / request_end timestamps. The first line seen for a given
    chat_id wins; later lines with the same id, and lines missing any of the
    three fields, contribute nothing.

    Args:
        raw_path: path to the raw .jsonl trace; opened in binary mode.

    Returns:
        (timing, min_ready) where timing maps chat_id -> (ready_ms, end_ms)
        and min_ready is the smallest ready_ms recorded (0 if none was).
    """
    timing: dict[int, tuple[int, int]] = {}
    min_ready = None
    t0 = time.time()
    nbytes = nlines = 0
    with open(raw_path, "rb", buffering=1 << 22) as f:
        for line in f:
            nbytes += len(line)
            nlines += 1
            cid = parse_int_after(line, KCHAT)
            # Only parse the timestamps for a chat we have not recorded yet.
            if cid is not None and cid not in timing:
                ready = parse_int_after(line, KREADY)
                end = parse_int_after(line, KEND)
                if ready is not None and end is not None:
                    timing[cid] = (ready, end)
                    if min_ready is None or ready < min_ready:
                        min_ready = ready
            # Heartbeat runs for every 200000th line, including lines that
            # were skipped above, so a long scan never goes silent.
            if nlines % 200000 == 0:
                print(f"[scan] {nlines} lines / {nbytes/1e9:.0f} GB / "
                      f"{time.time()-t0:.0f}s / {len(timing)} chats", flush=True)
    print(f"[scan] DONE {len(timing)} chats in {nbytes/1e9:.1f} GB / "
          f"{time.time()-t0:.0f}s", flush=True)
    return timing, (min_ready or 0)
def _percentile(sorted_vals, q):
    """Return the element at index floor(q*n) (clamped) of a sorted list; 0 if empty."""
    n = len(sorted_vals)
    return sorted_vals[min(int(q * n), n - 1)] if n else 0


def main():
    """Join raw-trace timing onto the formatted trace and write the result.

    Command line: <formatted_trace.jsonl> <out.jsonl> <raw.jsonl>

    For every row of the formatted trace this sets `time_to_parent_chat`
    (seconds, or null for a first turn / missing timing; negatives clamped to
    0) and, when the row's own chat_id has timing, `_ready_off_s` (seconds
    since the earliest ready time in the raw trace). Summary statistics and a
    small sanity sample are printed to stdout.

    Raises:
        SystemExit: when fewer than three path arguments are given.
    """
    if len(sys.argv) < 4:
        raise SystemExit(
            "usage: add_ttp_streaming.py <formatted_trace.jsonl> "
            "<out.jsonl> <raw.jsonl>"
        )
    in_trace = sys.argv[1]   # formatted compact .jsonl
    out_trace = sys.argv[2]  # output .jsonl
    raw_path = sys.argv[3]   # raw .jsonl

    timing, min_ready = scan_timing(raw_path)

    n_rows = n_ann = n_neg = n_no_self = n_no_parent = 0
    ttps = []
    t0 = time.time()
    with open(in_trace, encoding="utf-8") as fin, \
            open(out_trace, "w", encoding="utf-8") as fout:
        for ln in fin:
            # Tolerate blank lines (e.g. a trailing newline) instead of
            # letting json.loads fail on them.
            if not ln.strip():
                continue
            r = json.loads(ln)
            n_rows += 1
            cid = r["chat_id"]
            p = r.get("parent_chat_id")
            own = timing.get(cid)
            ttp = None
            # parent_chat_id in {None, -1, 0, ""} marks a first turn -> null.
            if p not in (None, -1, 0, ""):
                if own is None:
                    n_no_self += 1
                else:
                    parent = timing.get(p)
                    if parent is None:
                        n_no_parent += 1
                    else:
                        # this turn's ready time minus the parent's end time
                        ttp = (own[0] - parent[1]) / 1000.0
                        if ttp < 0:
                            n_neg += 1
                            ttp = 0.0
                        ttps.append(ttp)
                        n_ann += 1
            r["time_to_parent_chat"] = ttp
            if own is not None:
                r["_ready_off_s"] = (own[0] - min_ready) / 1000.0
            fout.write(json.dumps(r) + "\n")

    ttps.sort()
    n = len(ttps)
    print(f"[join] {time.time()-t0:.0f}s", flush=True)
    print(f"[done] rows={n_rows} annotated={n_ann} "
          f"(neg_clamped={n_neg}) self_missing={n_no_self} "
          f"parent_missing={n_no_parent}")
    print(f"[ttp] p25={_percentile(ttps, .25):.2f}s "
          f"p50={_percentile(ttps, .5):.2f}s "
          f"p90={_percentile(ttps, .9):.2f}s "
          f"p99={_percentile(ttps, .99):.2f}s (f3a ref: p50~1.6s)")
    if n:
        print(f"[ttp] frac<1s={sum(1 for x in ttps if x<1)/n:.0%} "
              f"frac<5s={sum(1 for x in ttps if x<5)/n:.0%}")

    # sanity: trace.timestamp vs re-derived ready offset for a few rows.
    # The output is re-read only after it has been closed (and so flushed);
    # the `with` closes the handle even though the loop breaks early.
    chk = []
    with open(out_trace, encoding="utf-8") as fcheck:
        for ln in fcheck:
            r = json.loads(ln)
            if "_ready_off_s" in r:
                chk.append((r["timestamp"], r["_ready_off_s"]))
                if len(chk) >= 5:
                    break
    print("[sanity] (trace.timestamp, raw ready_off_s):",
          [(round(a, 1), round(b, 1)) for a, b in chk])
    print(f"wrote {out_trace}")
# Run the annotation only when executed as a script, so importing this module
# does not trigger the raw-trace scan.
if __name__ == "__main__":
    main()