Document the iterative debugging from v1 (broken KVC) through v4
(routing fixed + session cap raised), with code-level analysis of
the two main bugs encountered:
1. v2 root cause (previously misdiagnosed as `allow_local_prefill`):
`--policy default` for the KVC mechanism caused replay's round-robin
policy and the PD router's round-robin to diverge, sending requests
with `session_params` to a D worker that did not have the session
open. This caused a 56-61% request-truncation rate, with finish_reason
"session id X does not exist".
Fix: use `--policy kv-aware` (sweep_tp1_v3_kvaware.sh) so that replay
emits `x-smg-target-worker` and the PD router uses consistent_hashing.
2. v3 revealed a new bottleneck: `pd-router-fallback-large-append-session-cap`
accounted for 52-65% of requests. The root cause was a hardcoded
`min(4, ...)` in `_decode_session_soft_cap`. With 7 D workers × 4
sessions = 28 slots for 52 trace sessions, ~24 sessions were permanently
starved (a bimodal direct-to-D rate of either 0% or 99%).
Fix: raise the cap to 16 (replay.py).
Also includes the v3 finding that the direct-to-D session path
(P50=0.495s, TTFT P50=0.043s) already beats the 8-way DP baseline
(0.65s / 0.093s): the core KVC mechanism works when fallback paths are avoided.
Files:
- docs/KVC_DEBUG_JOURNEY_V1_TO_V4.md: full journey + code location index
- docs/SWEBENCH_EXPERIMENT_{PROGRESS,RESULTS}.md: prior session notes
- scripts/sweep_tp1_v{2,3,4}*.sh: experiment driver scripts
- src/agentic_pd_hybrid/replay.py: cap 4 -> 16, audit fields
- src/agentic_pd_hybrid/pd_router.py: strip session_params from prefill
- src/agentic_pd_hybrid/metrics.py: truncated_request_count
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
111 lines · 4.6 KiB · Python
#!/usr/bin/env python3
|
|
"""Convert sibench audit.jsonl to agentic-pd-hybrid trace format.
|
|
|
|
Source format (sibench audit.jsonl):
|
|
{"instance_id": "...", "ts": float, "messages": [...],
|
|
"audit": {"prompt_tokens": int, "completion_tokens": int, ...}}
|
|
|
|
Target format (agentic-pd-hybrid trace JSONL):
|
|
{"chat_id": int, "parent_chat_id": int, "timestamp": float,
|
|
"turn": int, "input_length": int, "output_length": int,
|
|
"type": str, "hash_ids": [int, ...]}
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
|
|
BLOCK_TOKEN_BUDGET = 24  # tokens per block, matching trace.py default


def _group_turns_by_instance(src: Path) -> dict[str, list[dict]]:
    """Read ``src`` (JSONL) and return its records grouped by ``instance_id``.

    Blank lines are skipped. Instances keep their first-seen order, and each
    instance's records are sorted by ``ts``. The sort is stable, so records
    with equal timestamps keep their file order.
    """
    instances: dict[str, list[dict]] = defaultdict(list)
    with src.open(encoding="utf-8") as f:
        for raw in f:
            raw = raw.strip()
            if not raw:
                continue
            rec = json.loads(raw)
            instances[rec["instance_id"]].append(rec)
    for turns in instances.values():
        turns.sort(key=lambda r: r["ts"])
    return dict(instances)


def _content_chars(content: object) -> int:
    """Return the character count of one message's ``content`` value.

    ``None`` counts as 0 and a string counts its length. A list is treated as
    content parts: the ``"text"`` values of dict parts are summed, and
    non-dict parts are stringified.
    NOTE(review): this assumes OpenAI-style chat messages, where tool-call-only
    turns carry ``content: null`` and multimodal turns carry a list of parts.
    Confirm against sibench's audit output.
    """
    if content is None:
        return 0
    if isinstance(content, str):
        return len(content)
    if isinstance(content, list):
        return sum(
            len(str(part.get("text") or "")) if isinstance(part, dict) else len(str(part))
            for part in content
        )
    return len(str(content))


def _turn_lengths(rec: dict) -> tuple[int, int]:
    """Return ``(input_length, output_length)`` for one audit record.

    Uses ``audit.prompt_tokens`` and ``audit.completion_tokens`` when they are
    positive. Otherwise the input length is estimated at ~4 characters per
    token over all message contents (minimum 1), and the output length
    defaults to 128. A missing or null ``audit``/``messages`` is treated as
    empty.
    """
    audit = rec.get("audit") or {}
    input_length = audit.get("prompt_tokens") or 0
    output_length = audit.get("completion_tokens") or 0

    if input_length <= 0:
        # Fallback: estimate from message content
        total_chars = sum(
            _content_chars(m.get("content")) for m in rec.get("messages") or []
        )
        input_length = max(1, total_chars // 4)
    if output_length <= 0:
        output_length = 128  # reasonable default
    return input_length, output_length


def convert(src: Path, dst: Path, block_tokens: int = BLOCK_TOKEN_BUDGET) -> None:
    """Convert a sibench ``audit.jsonl`` file into an agentic-pd-hybrid trace.

    Each ``instance_id`` becomes one multi-turn chat. Turn ``k`` gets
    ``chat_id = base + k``, and ``parent_chat_id`` points to turn ``k - 1``
    (``-1`` for the first turn). Each instance's ``base`` is chosen so that
    the ``chat_id`` ranges of different instances never overlap.

    ``hash_ids`` models the prompt's cache blocks. A turn with
    ``input_length`` tokens gets ``input_length // block_tokens`` ids. Ids
    already covered by the previous turn are reused, since prompts are
    cumulative and that part is the shared prefix. Only blocks beyond that
    coverage get fresh ids, and ids are unique across the whole output file.

    Args:
        src: Input sibench audit JSONL.
        dst: Output trace JSONL path (overwritten).
        block_tokens: Tokens per hash block; defaults to ``BLOCK_TOKEN_BUDGET``.

    Raises:
        ValueError: If ``block_tokens`` is not positive.
    """
    if block_tokens <= 0:
        raise ValueError(f"block_tokens must be positive, got {block_tokens}")

    instances = _group_turns_by_instance(src)

    # Each instance gets a block of chat ids. The spacing exceeds the longest
    # instance, so the blocks never overlap. default=0 makes an empty input
    # produce an empty trace instead of raising from max().
    max_turns = max((len(turns) for turns in instances.values()), default=0)
    spacing = max_turns + 10  # extra headroom

    # One counter for the whole file keeps block ids unique across instances.
    next_block_id = 0
    total_written = 0
    with dst.open("w", encoding="utf-8") as out:
        for inst_idx, turns in enumerate(instances.values()):
            base_chat_id = (inst_idx + 1) * spacing  # start from spacing to avoid 0
            hash_ids: list[int] = []

            for turn_idx, rec in enumerate(turns):
                input_length, output_length = _turn_lengths(rec)
                chat_id = base_chat_id + turn_idx
                parent_chat_id = chat_id - 1 if turn_idx else -1

                # Size hash_ids from this turn's own length rather than from the
                # token delta. Summing per-turn deltas would drop each
                # delta's remainder (delta % block_tokens) and under-count blocks.
                target_blocks = input_length // block_tokens
                if target_blocks < len(hash_ids):
                    # The prompt shrank. Keep only the leading blocks so hash_ids
                    # never covers more tokens than input_length.
                    # NOTE(review): this assumes the shorter prompt keeps the old
                    # prefix (e.g. tail truncation). Confirm for sibench.
                    del hash_ids[target_blocks:]
                else:
                    new_blocks = target_blocks - len(hash_ids)
                    hash_ids.extend(range(next_block_id, next_block_id + new_blocks))
                    next_block_id += new_blocks

                trace_line = {
                    "chat_id": chat_id,
                    "parent_chat_id": parent_chat_id,
                    "timestamp": rec["ts"],
                    "turn": turn_idx,
                    "input_length": input_length,
                    "output_length": output_length,
                    "type": "chat",
                    # hash_ids is serialized immediately, so mutating it on
                    # later turns does not affect lines already written.
                    "hash_ids": hash_ids,
                }
                out.write(json.dumps(trace_line, separators=(",", ":")) + "\n")
                total_written += 1

    print(f"Converted {total_written} lines from {len(instances)} instances -> {dst}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: convert <input_audit.jsonl> into <output_trace.jsonl>.
    if len(sys.argv) != 3:
        # Usage errors go to stderr so that stdout stays clean for piping.
        print(
            f"Usage: {sys.argv[0]} <input_audit.jsonl> <output_trace.jsonl>",
            file=sys.stderr,
        )
        sys.exit(1)
    convert(Path(sys.argv[1]), Path(sys.argv[2]))
|