Adds `scripts/add_ttp_streaming.py`: one streaming pass over the 522 GB raw
glm5.1 trace to build {chat_id: (ready_ms, end_ms)} and join the real
inter-turn gap onto the COMPLETE formatted trace (no early-exit, low memory).
time_to_parent_chat = (this.request_ready_time_ms - parent.request_end_time_ms)/1000
= tool-exec + agent think-time; turn-1 -> null, negatives clamped to 0.
Ships the two ttp-annotated sampled traces (same anonymized data + one
timestamp-derived field; regenerated via sample_trace.py --seed 42 so they are
row-for-row identical to the non-ttp variants on all 9 shared fields):
traces/w600_r0.0015_st30_ttp.jsonl (1214 reqs)
traces/w600_r0.0015_st30_first600s_ttp.jsonl (807 reqs)
They are needed to replay with --dispatch-mode thinktime without the
non-redistributable raw trace, so they are added to the .gitignore allowlist.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
134 lines
4.7 KiB
Python
134 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Annotate the COMPLETE formatted trace with `time_to_parent_chat` (seconds).

Streaming variant of add_time_to_parent.py for the full 2-h trace (~2.1M rows):
the needed chat_id set == all chats, so there is no early-exit -- we read the
whole raw once anyway. So instead of holding all formatted rows in memory we:

  1. scan raw once -> timing = {chat_id: (ready_ms, end_ms)} (+ track min_ready)
  2. stream the formatted file line-by-line, join, write out.

time_to_parent_chat = (this.request_ready_time_ms - parent.request_end_time_ms)/1000
  = real external gap (tool exec + agent think) between the parent turn
    *finishing* in production and this turn *arriving*.
  turn-1 (parent_chat_id in {-1,0,null,""}) -> null; negatives clamped to 0;
  parent outside the window (no timing) -> null.

Run on dash0 (raw lives there).
"""
|
|
from __future__ import annotations
|
|
import json
|
|
import sys
|
|
import time
|
|
|
|
# Byte-level JSON key markers searched for verbatim in each raw line, so the
# raw trace never has to be JSON-decoded.  The leading double quote is part of
# each marker, so KCHAT does not match inside `"parent_chat_id":`.
KCHAT: bytes = b'"chat_id":'
KREADY: bytes = b'"request_ready_time_ms":'
KEND: bytes = b'"request_end_time_ms":'
|
|
|
|
|
|
def parse_int_after(line: bytes, key: bytes):
    """Return the integer literal that follows the first *key* in *line*.

    Spaces and tabs between the key and the value are skipped.  The value
    is an optional ``-`` followed by one or more ASCII digits; parsing stops
    at the first non-digit byte.

    Args:
        line: raw bytes to search.
        key: byte marker to locate, e.g. ``b'"chat_id":'``.

    Returns:
        The parsed ``int``, or ``None`` when *key* is absent or is not
        followed by at least one digit (e.g. ``null``, a lone ``-``, or the
        end of the line).
    """
    found = line.find(key)
    if found < 0:
        return None

    size = len(line)
    start = found + len(key)
    # Skip spaces / tabs separating the key from its value.
    while start < size and line[start] in b" \t":
        start += 1

    # Optional leading minus sign, then the run of ASCII digits.
    digits_from = start + 1 if line.startswith(b"-", start) else start
    stop = digits_from
    while stop < size and line[stop] in b"0123456789":
        stop += 1

    if stop == digits_from:
        # No digit at all: nothing numeric to parse.
        return None
    return int(line[start:stop])
|
|
|
|
|
|
def scan_timing(raw_path: str):
    """One pass over raw -> {chat_id: (ready_ms, end_ms)}, plus min ready_ms.

    Every line of *raw_path* is read as bytes and probed with
    ``parse_int_after`` for the ``chat_id``, ``request_ready_time_ms`` and
    ``request_end_time_ms`` keys; lines are never JSON-decoded.  The first
    line that yields all three values for a given chat_id wins; later lines
    with the same chat_id are ignored.  Lines missing any of the three values
    are skipped.

    A progress line is printed for every 200000th line read, and a summary
    line once the file is exhausted.

    Args:
        raw_path: path of the raw trace, one record per line.

    Returns:
        ``(timing, min_ready)`` where ``timing`` maps chat_id to
        ``(ready_ms, end_ms)`` and ``min_ready`` is the smallest ready_ms
        recorded, or 0 when nothing was recorded.
    """
    timing: dict[int, tuple[int, int]] = {}
    min_ready = None
    t0 = time.time()
    nbytes = nlines = 0
    with open(raw_path, "rb", buffering=1 << 22) as f:
        for line in f:
            nbytes += len(line)
            nlines += 1
            cid = parse_int_after(line, KCHAT)
            if cid is not None and cid not in timing:
                ready = parse_int_after(line, KREADY)
                end = parse_int_after(line, KEND)
                if ready is not None and end is not None:
                    timing[cid] = (ready, end)
                    if min_ready is None or ready < min_ready:
                        min_ready = ready
            # Heartbeat keyed on lines *read*: it must not be bypassed when
            # the current line was a duplicate chat_id or lacked a field,
            # otherwise progress output can go missing on a long scan.
            if nlines % 200000 == 0:
                print(f"[scan] {nlines} lines / {nbytes/1e9:.0f} GB / "
                      f"{time.time()-t0:.0f}s / {len(timing)} chats", flush=True)
    print(f"[scan] DONE {len(timing)} chats in {nbytes/1e9:.1f} GB / "
          f"{time.time()-t0:.0f}s", flush=True)
    return timing, (min_ready or 0)
|
|
|
|
|
|
def main():
    """Join raw-trace timing onto the formatted trace and print statistics.

    Command line: ``IN_TRACE OUT_TRACE RAW_TRACE``.

    Steps:
      1. ``scan_timing(RAW_TRACE)`` builds ``{chat_id: (ready_ms, end_ms)}``.
      2. IN_TRACE is streamed line by line; each JSON row gets
         ``time_to_parent_chat`` (seconds, or null) and, when the row's own
         chat_id has timing, ``_ready_off_s`` (its ready time relative to the
         earliest ready time seen in the raw scan).  Rows are written to
         OUT_TRACE as they are processed.
      3. Counters, percentiles of the annotated gaps and a small sanity
         sample read back from OUT_TRACE are printed.

    ``time_to_parent_chat`` is null when ``parent_chat_id`` is one of
    ``None, -1, 0, ""`` or when either the row or its parent has no timing;
    negative gaps are clamped to 0.0.

    Exits with a usage message when fewer than three arguments are given.
    """
    if len(sys.argv) < 4:
        sys.exit("usage: add_ttp_streaming.py IN_TRACE.jsonl OUT_TRACE.jsonl "
                 "RAW_TRACE.jsonl")
    in_trace = sys.argv[1]   # formatted compact .jsonl
    out_trace = sys.argv[2]  # output .jsonl
    raw_path = sys.argv[3]   # 522 GB raw .jsonl

    timing, min_ready = scan_timing(raw_path)

    n_rows = n_ann = n_neg = n_no_self = n_no_parent = 0
    ttps = []
    t0 = time.time()
    # JSON text is UTF-8; do not depend on the locale's default encoding.
    with open(in_trace, encoding="utf-8") as fin, \
            open(out_trace, "w", encoding="utf-8") as fout:
        for ln in fin:
            if not ln.strip():
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                continue
            r = json.loads(ln)
            n_rows += 1
            cid = r["chat_id"]
            p = r.get("parent_chat_id")
            own = timing.get(cid)  # (ready_ms, end_ms) of this row, or None
            ttp = None
            if p not in (None, -1, 0, ""):
                parent = timing.get(p)
                if own is not None and parent is not None:
                    # Gap between the parent finishing and this turn arriving.
                    ttp = (own[0] - parent[1]) / 1000.0
                    if ttp < 0:
                        n_neg += 1
                        ttp = 0.0
                    ttps.append(ttp)
                    n_ann += 1
                elif own is None:
                    n_no_self += 1
                else:
                    n_no_parent += 1
            r["time_to_parent_chat"] = ttp
            if own is not None:
                r["_ready_off_s"] = (own[0] - min_ready) / 1000.0
            fout.write(json.dumps(r) + "\n")

    ttps.sort()
    n = len(ttps)

    def pc(q):
        """Return the q-quantile of the sorted gaps (0 when there are none)."""
        return ttps[min(int(q * n), n - 1)] if n else 0

    print(f"[join] {time.time()-t0:.0f}s", flush=True)
    print(f"[done] rows={n_rows} annotated={n_ann} "
          f"(neg_clamped={n_neg}) self_missing={n_no_self} "
          f"parent_missing={n_no_parent}")
    print(f"[ttp] p25={pc(.25):.2f}s p50={pc(.5):.2f}s p90={pc(.9):.2f}s "
          f"p99={pc(.99):.2f}s (f3a ref: p50~1.6s)")
    if n:
        print(f"[ttp] frac<1s={sum(1 for x in ttps if x<1)/n:.0%} "
              f"frac<5s={sum(1 for x in ttps if x<5)/n:.0%}")

    # sanity: trace.timestamp vs re-derived ready offset for a few rows
    # NOTE(review): assumes every formatted row carries a numeric
    # "timestamp" field -- confirm against the formatter.
    chk = []
    with open(out_trace, encoding="utf-8") as fchk:
        for ln in fchk:
            r = json.loads(ln)
            if "_ready_off_s" in r:
                chk.append((r["timestamp"], r["_ready_off_s"]))
                if len(chk) >= 5:
                    break
    print("[sanity] (trace.timestamp, raw ready_off_s):",
          [(round(a, 1), round(b, 1)) for a, b in chk])
    print(f"wrote {out_trace}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the pipeline only when executed as a script, not when imported.
    main()
|