trace: time_to_parent_chat annotation + thinktime trace variants
Adds `scripts/add_ttp_streaming.py`: one streaming pass over the 522 GB raw
glm5.1 trace to build {chat_id: (ready_ms, end_ms)} and join the real
inter-turn gap onto the COMPLETE formatted trace (no early-exit, low memory).
time_to_parent_chat = (this.request_ready_time_ms - parent.request_end_time_ms)/1000
= tool-exec + agent think-time; turn-1 -> null, negatives clamped to 0.
Ships the two ttp-annotated sampled traces (same anonymized data + one
timestamp-derived field; regenerated via sample_trace.py --seed 42 so they are
row-for-row identical to the non-ttp variants on all 9 shared fields):
traces/w600_r0.0015_st30_ttp.jsonl (1214 reqs)
traces/w600_r0.0015_st30_first600s_ttp.jsonl (807 reqs)
They are needed to replay with --dispatch-mode thinktime without the
non-redistributable raw trace, so they are added to the .gitignore allowlist.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
133
scripts/add_ttp_streaming.py
Normal file
133
scripts/add_ttp_streaming.py
Normal file
@@ -0,0 +1,133 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Annotate the COMPLETE formatted trace with `time_to_parent_chat` (seconds).
|
||||
|
||||
Streaming variant of add_time_to_parent.py for the full 2-h trace (~2.1M rows):
|
||||
the needed chat_id set == all chats, so there is no early-exit -- we read the
|
||||
whole raw once anyway. So instead of holding all formatted rows in memory we:
|
||||
|
||||
1. scan raw once -> timing = {chat_id: (ready_ms, end_ms)} (+ track min_ready)
|
||||
2. stream the formatted file line-by-line, join, write out.
|
||||
|
||||
time_to_parent_chat = (this.request_ready_time_ms - parent.request_end_time_ms)/1000
|
||||
= real external gap (tool exec + agent think) between the parent turn
|
||||
*finishing* in production and this turn *arriving*.
|
||||
turn-1 (parent_chat_id in {-1,0,null}) -> null; negatives clamped to 0;
|
||||
parent outside the window (no timing) -> null.
|
||||
|
||||
Run on dash0 (raw lives there).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
|
||||
KCHAT = b'"chat_id":'
|
||||
KREADY = b'"request_ready_time_ms":'
|
||||
KEND = b'"request_end_time_ms":'
|
||||
|
||||
|
||||
def parse_int_after(line: bytes, key: bytes):
|
||||
i = line.find(key)
|
||||
if i < 0:
|
||||
return None
|
||||
i += len(key)
|
||||
n = len(line)
|
||||
while i < n and line[i] in (0x20, 0x09): # space/tab
|
||||
i += 1
|
||||
j = i
|
||||
if j < n and line[j] == 0x2D: # '-'
|
||||
j += 1
|
||||
while j < n and 0x30 <= line[j] <= 0x39:
|
||||
j += 1
|
||||
return int(line[i:j]) if j > i and line[i:j] != b'-' else None
|
||||
|
||||
|
||||
def scan_timing(raw_path: str):
|
||||
"""One pass over raw -> {chat_id: (ready_ms, end_ms)}, plus min ready_ms."""
|
||||
timing: dict[int, tuple[int, int]] = {}
|
||||
min_ready = None
|
||||
t0 = time.time()
|
||||
nbytes = nlines = 0
|
||||
with open(raw_path, "rb", buffering=1 << 22) as f:
|
||||
for line in f:
|
||||
nbytes += len(line)
|
||||
nlines += 1
|
||||
cid = parse_int_after(line, KCHAT)
|
||||
if cid is None or cid in timing:
|
||||
continue
|
||||
ready = parse_int_after(line, KREADY)
|
||||
end = parse_int_after(line, KEND)
|
||||
if ready is None or end is None:
|
||||
continue
|
||||
timing[cid] = (ready, end)
|
||||
if min_ready is None or ready < min_ready:
|
||||
min_ready = ready
|
||||
if nlines % 200000 == 0:
|
||||
print(f"[scan] {nlines} lines / {nbytes/1e9:.0f} GB / "
|
||||
f"{time.time()-t0:.0f}s / {len(timing)} chats", flush=True)
|
||||
print(f"[scan] DONE {len(timing)} chats in {nbytes/1e9:.1f} GB / "
|
||||
f"{time.time()-t0:.0f}s", flush=True)
|
||||
return timing, (min_ready or 0)
|
||||
|
||||
|
||||
def main():
|
||||
in_trace = sys.argv[1] # formatted compact .jsonl
|
||||
out_trace = sys.argv[2] # output .jsonl
|
||||
raw_path = sys.argv[3] # 522 GB raw .jsonl
|
||||
|
||||
timing, min_ready = scan_timing(raw_path)
|
||||
|
||||
n_rows = n_ann = n_neg = n_no_self = n_no_parent = 0
|
||||
ttps = []
|
||||
t0 = time.time()
|
||||
with open(in_trace) as fin, open(out_trace, "w") as fout:
|
||||
for ln in fin:
|
||||
r = json.loads(ln)
|
||||
n_rows += 1
|
||||
cid = r["chat_id"]
|
||||
p = r.get("parent_chat_id")
|
||||
ttp = None
|
||||
if p not in (None, -1, 0, ""):
|
||||
if cid in timing and p in timing:
|
||||
ttp = (timing[cid][0] - timing[p][1]) / 1000.0
|
||||
if ttp < 0:
|
||||
n_neg += 1
|
||||
ttp = 0.0
|
||||
ttps.append(ttp)
|
||||
n_ann += 1
|
||||
elif cid not in timing:
|
||||
n_no_self += 1
|
||||
else:
|
||||
n_no_parent += 1
|
||||
r["time_to_parent_chat"] = ttp
|
||||
if cid in timing:
|
||||
r["_ready_off_s"] = (timing[cid][0] - min_ready) / 1000.0
|
||||
fout.write(json.dumps(r) + "\n")
|
||||
|
||||
ttps.sort()
|
||||
n = len(ttps)
|
||||
pc = lambda q: ttps[min(int(q * n), n - 1)] if n else 0
|
||||
print(f"[join] {time.time()-t0:.0f}s", flush=True)
|
||||
print(f"[done] rows={n_rows} annotated={n_ann} "
|
||||
f"(neg_clamped={n_neg}) self_missing={n_no_self} "
|
||||
f"parent_missing={n_no_parent}")
|
||||
print(f"[ttp] p25={pc(.25):.2f}s p50={pc(.5):.2f}s p90={pc(.9):.2f}s "
|
||||
f"p99={pc(.99):.2f}s (f3a ref: p50~1.6s)")
|
||||
if n:
|
||||
print(f"[ttp] frac<1s={sum(1 for x in ttps if x<1)/n:.0%} "
|
||||
f"frac<5s={sum(1 for x in ttps if x<5)/n:.0%}")
|
||||
# sanity: trace.timestamp vs re-derived ready offset for a few rows
|
||||
chk = []
|
||||
for ln in open(out_trace):
|
||||
r = json.loads(ln)
|
||||
if "_ready_off_s" in r:
|
||||
chk.append((r["timestamp"], r["_ready_off_s"]))
|
||||
if len(chk) >= 5:
|
||||
break
|
||||
print("[sanity] (trace.timestamp, raw ready_off_s):",
|
||||
[(round(a, 1), round(b, 1)) for a, b in chk])
|
||||
print(f"wrote {out_trace}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user