A1: replayer instrumentation for cross-process join
RequestMetrics gains absolute unix timestamps (t_dispatch_unix, t_first_token_unix, t_finish_unix), the proxy_request_id, the chosen endpoint URL, and the trace hash_ids. Replayer sends X-Request-Id: <session_id>:<turn_id>:<chat_id>:<idx> so proxy breakdown rows can be joined to metrics by exact key. Required by Batch 0 (online sequentiality proof) and Batch 1 reuse decomposition; existing metrics.jsonl couldn't establish either. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -28,6 +28,12 @@ class RequestMetrics:
|
||||
requested_output_tokens: int | None = None
|
||||
finish_reason: str | None = None
|
||||
error: str | None = None
|
||||
t_dispatch_unix: float | None = None
|
||||
t_first_token_unix: float | None = None
|
||||
t_finish_unix: float | None = None
|
||||
proxy_request_id: str | None = None
|
||||
endpoint_url: str | None = None
|
||||
trace_hash_ids: tuple[int, ...] = ()
|
||||
|
||||
|
||||
class IncrementalMetricSink:
|
||||
|
||||
@@ -125,6 +125,8 @@ async def _dispatch_request(
|
||||
}
|
||||
|
||||
start = time.perf_counter()
|
||||
t_dispatch_unix = time.time()
|
||||
t_first_token_unix: float | None = None
|
||||
ttft_s = None
|
||||
n_output = 0
|
||||
cached_tokens = 0
|
||||
@@ -133,7 +135,10 @@ async def _dispatch_request(
|
||||
token_times: list[float] = []
|
||||
output_token_ids: list[int] = []
|
||||
|
||||
req_headers = {"X-Session-Id": req.session_id}
|
||||
req_headers = {
|
||||
"X-Session-Id": req.session_id,
|
||||
"X-Request-Id": req.request_id,
|
||||
}
|
||||
|
||||
async with sem:
|
||||
try:
|
||||
@@ -169,11 +174,13 @@ async def _dispatch_request(
|
||||
if clean_ids:
|
||||
if ttft_s is None:
|
||||
ttft_s = now - start
|
||||
t_first_token_unix = time.time()
|
||||
output_token_ids.extend(clean_ids)
|
||||
token_times.extend([now] * len(clean_ids))
|
||||
elif delta:
|
||||
if ttft_s is None:
|
||||
ttft_s = now - start
|
||||
t_first_token_unix = time.time()
|
||||
token_times.append(now)
|
||||
fr = choices[0].get("finish_reason")
|
||||
if fr:
|
||||
@@ -187,6 +194,7 @@ async def _dispatch_request(
|
||||
err = repr(exc)[:300]
|
||||
|
||||
end = time.perf_counter()
|
||||
t_finish_unix = time.time()
|
||||
e2e = end - start
|
||||
if output_token_ids:
|
||||
n_output = len(output_token_ids)
|
||||
@@ -222,6 +230,12 @@ async def _dispatch_request(
|
||||
requested_output_tokens=req.output_length,
|
||||
finish_reason=finish_reason,
|
||||
error=err,
|
||||
t_dispatch_unix=t_dispatch_unix,
|
||||
t_first_token_unix=t_first_token_unix,
|
||||
t_finish_unix=t_finish_unix,
|
||||
proxy_request_id=req.request_id,
|
||||
endpoint_url=endpoint,
|
||||
trace_hash_ids=req.hash_ids,
|
||||
),
|
||||
output_token_ids=output_token_ids,
|
||||
)
|
||||
|
||||
75
tests/test_metrics_instrumentation.py
Normal file
75
tests/test_metrics_instrumentation.py
Normal file
@@ -0,0 +1,75 @@
|
||||
"""Tests for A1 replayer instrumentation: new timestamp + join fields."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import asdict
|
||||
|
||||
from replayer.metrics import RequestMetrics
|
||||
|
||||
|
||||
def _make_metric(**overrides) -> RequestMetrics:
|
||||
base = dict(
|
||||
request_id="sess1:0:42:0",
|
||||
session_id="sess1",
|
||||
turn_id=0,
|
||||
trace_timestamp_s=1.0,
|
||||
input_length=1000,
|
||||
output_length=50,
|
||||
request_type="user",
|
||||
effective_input_length=1000,
|
||||
cached_tokens=512,
|
||||
latency_s=2.5,
|
||||
ttft_s=0.5,
|
||||
tpot_s=0.04,
|
||||
)
|
||||
base.update(overrides)
|
||||
return RequestMetrics(**base)
|
||||
|
||||
|
||||
def test_request_metrics_has_new_join_fields():
|
||||
m = _make_metric()
|
||||
fields = asdict(m)
|
||||
for key in (
|
||||
"t_dispatch_unix",
|
||||
"t_first_token_unix",
|
||||
"t_finish_unix",
|
||||
"proxy_request_id",
|
||||
"endpoint_url",
|
||||
"trace_hash_ids",
|
||||
):
|
||||
assert key in fields, f"missing {key}"
|
||||
|
||||
|
||||
def test_request_metrics_defaults_when_unset():
|
||||
m = _make_metric()
|
||||
assert m.t_dispatch_unix is None
|
||||
assert m.t_first_token_unix is None
|
||||
assert m.t_finish_unix is None
|
||||
assert m.proxy_request_id is None
|
||||
assert m.endpoint_url is None
|
||||
assert m.trace_hash_ids == ()
|
||||
|
||||
|
||||
def test_request_metrics_round_trips_through_json():
|
||||
m = _make_metric(
|
||||
t_dispatch_unix=1700000000.123,
|
||||
t_first_token_unix=1700000000.5,
|
||||
t_finish_unix=1700000002.0,
|
||||
proxy_request_id="sess1:0:42:0",
|
||||
endpoint_url="http://node:8000",
|
||||
trace_hash_ids=(11, 22, 33),
|
||||
)
|
||||
payload = json.dumps(asdict(m), sort_keys=True)
|
||||
parsed = json.loads(payload)
|
||||
assert parsed["t_dispatch_unix"] == 1700000000.123
|
||||
assert parsed["t_first_token_unix"] == 1700000000.5
|
||||
assert parsed["t_finish_unix"] == 1700000002.0
|
||||
assert parsed["proxy_request_id"] == "sess1:0:42:0"
|
||||
assert parsed["endpoint_url"] == "http://node:8000"
|
||||
assert parsed["trace_hash_ids"] == [11, 22, 33]
|
||||
|
||||
|
||||
def test_proxy_request_id_matches_replayer_request_id():
|
||||
m = _make_metric(proxy_request_id="sess1:0:42:0")
|
||||
assert m.proxy_request_id == m.request_id
|
||||
Reference in New Issue
Block a user