from __future__ import annotations

import csv
import json
import time
from collections import Counter
from itertools import islice
from pathlib import Path

import psutil
from tqdm.auto import tqdm

from .layout import DETAILS_SUMMARY_FILENAME, preferred_details_dir, resolve_details_dir


PROGRESS_FLUSH_INTERVAL_S = 5.0
PROGRESS_REFRESH_INTERVAL_S = 0.5
PROGRESS_REFRESH_INTERVAL_REQ = 256

DEFAULT_INPUT_LENGTH_BUCKET_THRESHOLDS = [32 * 1024, 85 * 1024, 128 * 1024]

FIRST_SEEN_MS = 0
LAST_SEEN_MS = 1
LAST_REUSE_MS = 2
FIRST_REQUEST_ID = 3
LAST_REQUEST_ID = 4
LAST_REUSE_REQUEST_ID = 5
REUSE_COUNT = 6
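
# Illustrative note: the seven constants above index into the per-block metadata
# list created by _new_block_meta further down. A block first seen by request
# "req-1" at ready_ms=1_000 starts out as
#   [1_000, 1_000, 0, "req-1", "req-1", "", 0]
# i.e. first_seen_ms, last_seen_ms, last_reuse_ms, first_request_id,
# last_request_id, last_reuse_request_id, reuse_count.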


def _format_bucket_boundary(value: int) -> str:
    if value == 0:
        return "0"
    if value % (1024 * 1024) == 0:
        return f"{value // (1024 * 1024)}Mi"
    if value % 1024 == 0:
        return f"{value // 1024}Ki"
    return str(value)
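
# Examples (illustrative): _format_bucket_boundary(32 * 1024) -> "32Ki",
# _format_bucket_boundary(1024 * 1024) -> "1Mi", _format_bucket_boundary(1000) -> "1000".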


def build_input_length_bucket_defs(thresholds=None):
    parsed_thresholds = (
        list(DEFAULT_INPUT_LENGTH_BUCKET_THRESHOLDS)
        if thresholds is None
        else sorted(set(int(value) for value in thresholds))
    )
    if not parsed_thresholds:
        raise ValueError("At least one input-length bucket threshold is required.")
    if any(value <= 0 for value in parsed_thresholds):
        raise ValueError("Input-length bucket thresholds must be positive integers.")
    if parsed_thresholds == DEFAULT_INPUT_LENGTH_BUCKET_THRESHOLDS:
        return [
            ("0-32Ki", 0, 32 * 1024),
            ("32-85Ki", 32 * 1024, 85 * 1024),
            ("85-128Ki", 85 * 1024, 128 * 1024),
            ("128Ki+", 128 * 1024, None),
        ]
    bucket_defs = []
    lower_bound = 0
    for upper_bound in parsed_thresholds:
        bucket_defs.append(
            (
                f"{_format_bucket_boundary(lower_bound)}-{_format_bucket_boundary(upper_bound)}",
                lower_bound,
                upper_bound,
            )
        )
        lower_bound = upper_bound
    bucket_defs.append((f"{_format_bucket_boundary(lower_bound)}+", lower_bound, None))
    return bucket_defs


def assign_input_length_bucket(input_tokens: int, bucket_defs=None) -> str:
    bucket_defs = bucket_defs or build_input_length_bucket_defs()
    for bucket_label, lower_bound, upper_bound in bucket_defs:
        if input_tokens >= lower_bound and (upper_bound is None or input_tokens < upper_bound):
            return bucket_label
    return bucket_defs[-1][0]
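
# Illustrative usage (not executed at import time): with the default thresholds the
# bucket labels are "0-32Ki", "32-85Ki", "85-128Ki" and "128Ki+", so a 40_000-token
# prompt lands in "32-85Ki"; a single custom threshold collapses this to two buckets.
#
#   assign_input_length_bucket(40_000)            # -> "32-85Ki"
#   build_input_length_bucket_defs([64 * 1024])   # -> [("0-64Ki", 0, 65536), ("64Ki+", 65536, None)]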


def write_csv(path: Path, rows: list[dict], fieldnames: list[str] | None = None) -> Path:
    path.parent.mkdir(parents=True, exist_ok=True)
    if fieldnames is None and rows:
        fieldnames = list(rows[0].keys())
    fieldnames = fieldnames or []
    with path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        if fieldnames:
            writer.writeheader()
        if rows:
            writer.writerows(rows)
    return path
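
# Usage sketch (hypothetical path): the header is inferred from the first row when
# fieldnames is not given, and the file (plus parent directories) is still created
# when rows is empty.
#
#   write_csv(Path("out/example.csv"), [{"bucket": "0-32Ki", "count": 3}])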


def _estimate_peak_rss_mb(current_rss_mb, peak_rss_mb, fraction_done):
    baseline = max(current_rss_mb, peak_rss_mb)
    headroom = 1.0 + 0.25 * max(0.0, 1.0 - fraction_done)
    return baseline * headroom
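
# Worked example for the estimate above: at 40% progress with 1000 MB resident and a
# 1200 MB peak so far, the headroom factor is 1.0 + 0.25 * 0.6 = 1.15, so the
# estimated peak is max(1000, 1200) * 1.15 = 1380 MB.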


def _progress_postfix(process, peak_rss_mb, fraction_done, **extra):
    current_rss_mb = process.memory_info().rss / (1024 * 1024)
    peak_rss_mb = max(peak_rss_mb, current_rss_mb)
    postfix = {
        "rss_mb": f"{current_rss_mb:.0f}",
        "est_peak_mb": f"{_estimate_peak_rss_mb(current_rss_mb, peak_rss_mb, fraction_done):.0f}",
    }
    postfix.update(extra)
    return postfix, peak_rss_mb


def _format_duration(seconds):
    if seconds is None or seconds < 0:
        return "?"
    if seconds < 60:
        return f"{seconds:.0f}s"
    if seconds < 3600:
        return f"{seconds / 60:.1f}m"
    return f"{seconds / 3600:.2f}h"
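
# Examples (illustrative): _format_duration(42) -> "42s", _format_duration(90) -> "1.5m",
# _format_duration(5400) -> "1.50h", _format_duration(None) -> "?".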


def _write_progress_state(
    path,
    *,
    total_requests,
    processed_requests,
    started_at,
    current_rss_mb,
    peak_rss_mb,
    est_peak_mb,
    source_path,
    features_path,
    last_request_id,
    block_state_count,
    bucket_state_count,
):
    elapsed_s = max(time.monotonic() - started_at, 1e-9)
    req_per_s = processed_requests / elapsed_s
    eta_s = ((total_requests - processed_requests) / req_per_s) if req_per_s > 0 and processed_requests < total_requests else 0.0
    payload = {
        "source_path": str(source_path),
        "features_path": str(features_path),
        "total_requests": total_requests,
        "processed_requests": processed_requests,
        "fraction_done": (processed_requests / total_requests) if total_requests else 1.0,
        "elapsed_s": elapsed_s,
        "req_per_s": req_per_s,
        "eta_s": eta_s,
        "eta_human": _format_duration(eta_s),
        "rss_mb": current_rss_mb,
        "peak_rss_mb": peak_rss_mb,
        "est_peak_mb": est_peak_mb,
        "block_state_count": block_state_count,
        "bucket_state_count": bucket_state_count,
        "last_request_id": last_request_id,
        "updated_at_epoch_s": time.time(),
    }
    tmp_path = path.with_suffix(path.suffix + ".tmp")
    tmp_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    tmp_path.replace(path)
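
# The progress file written above is a small JSON document; a representative
# (purely illustrative) payload looks like:
#   {"total_requests": 10000, "processed_requests": 5000, "fraction_done": 0.5,
#    "elapsed_s": 10.0, "req_per_s": 500.0, "eta_s": 10.0, "eta_human": "10s", ...}
# The write is atomic in the usual sense: the payload goes to a sibling ".tmp" file
# first and is then renamed over the target path.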


def _count_lines(path):
    with open(path, "r", encoding="utf-8") as handle:
        return sum(1 for _ in handle)


def _count_feature_rows(path):
    total_lines = _count_lines(path)
    return max(total_lines - 1, 0)


class InMemoryBlockCache:
    def __init__(self):
        self.state = {}

    def get(self, block_id):
        return self.state.get(block_id)

    def put(self, block_id, meta):
        self.state[block_id] = meta

    def iter_blocks(self):
        for block_id, meta in self.state.items():
            yield (
                block_id,
                meta[FIRST_SEEN_MS],
                meta[LAST_SEEN_MS],
                meta[LAST_REUSE_MS],
                meta[FIRST_REQUEST_ID],
                meta[LAST_REQUEST_ID],
                meta[LAST_REUSE_REQUEST_ID],
                meta[REUSE_COUNT],
            )

    def __len__(self):
        return len(self.state)
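
# Minimal usage sketch (illustrative only): the cache maps an integer block hash to
# the seven-element metadata list described next to the index constants above.
#
#   cache = InMemoryBlockCache()
#   cache.put(12345, _new_block_meta("req-1", 1_000))
#   cache.get(12345)   # -> [1000, 1000, 0, "req-1", "req-1", "", 0]
#   len(cache)         # -> 1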


def _normalize_source_row(row):
    meta = row.get("meta", {}) if isinstance(row.get("meta", {}), dict) else {}
    declared_tools = row.get("declared_tools", [])
    raw_messages = row.get("raw_messages", [])
    return {
        "meta": meta,
        "declared_tools": [tool for tool in declared_tools if isinstance(tool, dict)],
        "raw_messages": [message for message in raw_messages if isinstance(message, dict)],
    }


def _read_source_minimal(path):
    with open(path, "r", encoding="utf-8") as handle:
        for line in handle:
            row = _normalize_source_row(json.loads(line))
            meta = row["meta"]
            yield {
                "request_id": meta["request_id"],
                "session_id": meta["session_id"],
                "request_ready_time_ms": meta["request_ready_time_ms"],
                "request_end_time_ms": meta["request_end_time_ms"],
                "declared_tool_names": [
                    tool["name"] for tool in row.get("declared_tools", []) if tool.get("name")
                ],
                "raw_messages": row["raw_messages"],
            }


def _count_child_refs_by_chat_id(path, limit=None):
    counts = Counter()
    for index, row in enumerate(_iter_release_rows(path), start=1):
        if limit is not None and index > limit:
            break
        parent_chat_id = int(row.get("parent_chat_id", -1) or -1)
        if parent_chat_id != -1:
            counts[parent_chat_id] += 1
    return counts


def _new_block_meta(request_id, ready_ms):
    return [ready_ms, ready_ms, 0, request_id, request_id, "", 0]


def _build_alive_block_timeline_from_events(events):
    alive_rows = []
    alive_count = 0
    peak_alive_blocks = 0
    for timestamp_ms in sorted(events):
        alive_count += events[timestamp_ms]
        peak_alive_blocks = max(peak_alive_blocks, alive_count)
        alive_rows.append(
            {
                "timestamp_ms": timestamp_ms,
                "delta_alive_blocks": events[timestamp_ms],
                "alive_block_count": alive_count,
            }
        )
    return {
        "peak_alive_blocks": peak_alive_blocks,
        "event_count": len(alive_rows),
    }, alive_rows
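
# Illustrative example: two blocks first seen at t=0 and t=5 that both expire by t=11
# produce a timeline peaking at two alive blocks.
#
#   _build_alive_block_timeline_from_events(Counter({0: 1, 5: 1, 11: -2}))
#   # -> ({"peak_alive_blocks": 2, "event_count": 3}, [... three timeline rows ...])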


def _compute_prefix_hits(
    global_store,
    bucket_store,
    *,
    hash_ids,
    request_id,
    ready_ms,
    reuse_gap_counts=None,
):
    global_prefix_active = True
    bucket_prefix_active = True
    global_prefix_match_blocks = 0
    bucket_prefix_match_blocks = 0
    global_source_request_id = ""
    bucket_source_request_id = ""

    for block_id in hash_ids:
        global_meta = global_store.get(block_id)
        if global_meta is not None and global_prefix_active:
            global_prefix_match_blocks += 1
            global_source_request_id = global_meta[LAST_REQUEST_ID]
            if reuse_gap_counts is not None:
                reuse_gap_counts[max(ready_ms - global_meta[LAST_SEEN_MS], 0)] += 1
            global_meta[LAST_REUSE_MS] = ready_ms
            global_meta[LAST_REUSE_REQUEST_ID] = request_id
            global_meta[REUSE_COUNT] += 1
        elif global_meta is None:
            global_prefix_active = False
            global_meta = _new_block_meta(request_id, ready_ms)
        else:
            global_prefix_active = False

        global_meta[LAST_SEEN_MS] = ready_ms
        global_meta[LAST_REQUEST_ID] = request_id
        global_store.put(block_id, global_meta)

        bucket_meta = bucket_store.get(block_id)
        if bucket_meta is not None and bucket_prefix_active:
            bucket_prefix_match_blocks += 1
            bucket_source_request_id = bucket_meta[LAST_REQUEST_ID]
            bucket_meta[LAST_REUSE_MS] = ready_ms
            bucket_meta[LAST_REUSE_REQUEST_ID] = request_id
            bucket_meta[REUSE_COUNT] += 1
        elif bucket_meta is None:
            bucket_prefix_active = False
            bucket_meta = _new_block_meta(request_id, ready_ms)
        else:
            bucket_prefix_active = False

        bucket_meta[LAST_SEEN_MS] = ready_ms
        bucket_meta[LAST_REQUEST_ID] = request_id
        bucket_store.put(block_id, bucket_meta)

    return (
        global_prefix_match_blocks,
        global_source_request_id,
        bucket_prefix_match_blocks,
        bucket_source_request_id,
    )
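
# Behaviour sketch for the function above: hits are only counted while every leading
# block is already present in the store, so the first miss ends the prefix for that
# store; every block in hash_ids still has its LAST_SEEN_MS / LAST_REQUEST_ID
# refreshed and is (re)inserted. Illustratively, if the global store already holds
# blocks 1 and 2 and a request arrives with hash_ids [1, 2, 7, 2], the global prefix
# hit count is 2: block 7 is a miss, and the trailing 2 is cached but no longer part
# of an unbroken prefix.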


def _iter_release_rows(path):
    with open(path, "r", encoding="utf-8") as handle:
        for line in handle:
            row = json.loads(line)
            yield {
                "chat_id": int(row.get("chat_id", -1) or -1),
                "parent_chat_id": int(row.get("parent_chat_id", -1) or -1),
                "timestamp": row.get("timestamp"),
                "turn": int(row.get("turn", 0) or 0),
                "type": row.get("type", ""),
                "input_length": int(row.get("input_length", 0) or 0),
                "output_length": int(row.get("output_length", 0) or 0),
                "hash_ids": [int(value) for value in row.get("hash_ids", [])],
            }


def _message_signature(message: dict) -> str:
    return str(message.get("role", ""))


def _common_prefix_message_count(previous_messages, current_messages):
    count = 0
    for previous, current in zip(previous_messages, current_messages):
        if _message_signature(previous) != _message_signature(current):
            break
        count += 1
    return count


def _classify_trigger(previous_messages, current_messages):
    common_prefix_count = _common_prefix_message_count(previous_messages, current_messages)
    appended_messages = current_messages[common_prefix_count:]
    appended_message_count = len(appended_messages)
    last_role = str(current_messages[-1].get("role", "unknown")) if current_messages else "unknown"
    trigger_group = last_role
    trigger_detail = f"last_message_role={last_role}"

    return {
        "common_prefix_message_count": common_prefix_count,
        "appended_message_count": appended_message_count,
        "first_new_role": str(appended_messages[0].get("role", "unknown")) if appended_messages else "",
        "trigger_group": trigger_group,
        "trigger_detail": trigger_detail,
    }
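
# Illustrative example: with a previous prompt of roles [system, user, assistant] and
# a current prompt of [system, user, assistant, user], the common prefix is 3
# messages, one message was appended, first_new_role is "user", and the trigger
# group is "user" (the role of the last message in the current prompt).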


def _bucket_definition_rows(bucket_defs):
    rows = []
    for bucket, lower_bound, upper_bound in bucket_defs:
        rows.append(
            {
                "bucket": bucket,
                "input_tokens_min_inclusive": lower_bound,
                "input_tokens_max_exclusive": upper_bound,
            }
        )
    return rows


def _clear_details_dir(details_dir: Path) -> None:
    details_dir.mkdir(parents=True, exist_ok=True)
    for path in details_dir.iterdir():
        if path.is_file():
            path.unlink()


def collect_existing_detail_paths(output_dir):
    details_dir = resolve_details_dir(output_dir)
    return {
        "details_dir": details_dir,
        "progress": details_dir / "progress.json",
        "request_metrics": details_dir / "request_metrics.csv",
        "theoretical_block_reuse_gaps": details_dir / "theoretical_block_reuse_gaps.csv",
        "theoretical_block_lifetimes": details_dir / "theoretical_block_lifetimes.csv",
        "theoretical_alive_block_timeline": details_dir / "theoretical_alive_block_timeline.csv",
        "session_bucket_boundary_miss": details_dir / "session_bucket_boundary_miss.csv",
        "details_summary": details_dir / DETAILS_SUMMARY_FILENAME,
    }


def run_advanced_from_existing(
    source_path,
    release_path,
    features_path,
    output_dir,
    input_length_bucket_thresholds=None,
    show_progress=True,
    limit=None,
):
    output_dir = Path(output_dir)
    details_dir = preferred_details_dir(output_dir)
    _clear_details_dir(details_dir)

    source_path = Path(source_path)
    release_path = Path(release_path)
    features_path = Path(features_path)

    total_requests = limit if limit is not None else _count_feature_rows(features_path)
    release_request_count = _count_lines(release_path)
    if limit is None and release_request_count != total_requests:
        raise ValueError(
            f"release/features row count mismatch: release={release_request_count} vs features={total_requests}"
        )

    process = psutil.Process()
    peak_rss_mb = 0.0
    started_at = time.monotonic()
    bucket_defs = build_input_length_bucket_defs(input_length_bucket_thresholds)
    child_ref_counts = _count_child_refs_by_chat_id(release_path, limit=limit)

    store = InMemoryBlockCache()
    bucket_stores = {bucket_label: InMemoryBlockCache() for bucket_label, _, _ in bucket_defs}
    progress_state_path = details_dir / "progress.json"
    next_progress_flush_at = started_at + PROGRESS_FLUSH_INTERVAL_S

    request_metrics_path = details_dir / "request_metrics.csv"
    processed_requests = 0
    last_request_id = ""
    reuse_gap_counts = Counter()
    bucket_reused_block_totals = Counter()
    total_prompt_blocks = 0
    total_global_reused_blocks = 0
    session_last = {}
    chat_state_for_children = {}
    session_bucket_totals = {
        bucket_label: {
            "edge_count": 0,
            "reusable_edge_count": 0,
            "cross_bucket_edge_count": 0,
            "shared_prefix_units_sum": 0,
            "cross_bucket_shared_prefix_units_sum": 0,
        }
        for bucket_label, _, _ in bucket_defs
    }

    with request_metrics_path.open("w", encoding="utf-8", newline="") as request_metrics_handle, features_path.open(
        "r", encoding="utf-8"
    ) as features_handle:
        feature_reader = csv.DictReader(features_handle)
        source_iter = _read_source_minimal(source_path)
        release_iter = _iter_release_rows(release_path)
        if limit is not None:
            feature_reader = islice(feature_reader, limit)
            source_iter = islice(source_iter, limit)
            release_iter = islice(release_iter, limit)

        request_metrics_writer = None
        progress = tqdm(
            total=total_requests,
            desc="Build details",
            unit="req",
            dynamic_ncols=True,
            disable=not show_progress,
        )
        last_progress_refresh_at = started_at
        try:
            for source_row, feature_row, release_row in zip(source_iter, feature_reader, release_iter):
                request_id = source_row["request_id"]
                session_id = source_row["session_id"]
                ready_ms = int(source_row["request_ready_time_ms"])
                end_ms = int(source_row["request_end_time_ms"])
                tool_names = source_row["declared_tool_names"]
                raw_messages = source_row["raw_messages"]
                hash_ids = release_row["hash_ids"]

                release_input_length = int(release_row["input_length"])
                release_output_length = int(release_row["output_length"])
                feature_input_tokens = int(feature_row["input_tokens"])
                feature_output_tokens = int(feature_row["output_tokens"])
                if feature_input_tokens != release_input_length:
                    raise ValueError(
                        f"release/raw mismatch at request {request_id}: "
                        f"features.input_tokens={feature_row['input_tokens']} vs release.input_length={release_input_length}"
                    )
                if feature_output_tokens != release_output_length:
                    raise ValueError(
                        f"release/raw mismatch at request {request_id}: "
                        f"features.output_tokens={feature_row['output_tokens']} vs release.output_length={release_output_length}"
                    )

                input_tokens = feature_input_tokens
                bucket_label = assign_input_length_bucket(input_tokens, bucket_defs)
                bucket_store = bucket_stores[bucket_label]
                (
                    prefix_match_blocks,
                    global_source_request_id,
                    bucketed_prefix_match_blocks,
                    bucketed_source_request_id,
                ) = _compute_prefix_hits(
                    store,
                    bucket_store,
                    hash_ids=hash_ids,
                    request_id=request_id,
                    ready_ms=ready_ms,
                    reuse_gap_counts=reuse_gap_counts,
                )

                prompt_block_count = len(hash_ids)
                theoretical_prefix_hit_ratio = prefix_match_blocks / prompt_block_count if prompt_block_count else 0.0
                bucketed_theoretical_prefix_hit_ratio = (
                    bucketed_prefix_match_blocks / prompt_block_count if prompt_block_count else 0.0
                )

                previous_session_state = session_last.get(session_id)
                trigger = _classify_trigger(
                    previous_session_state["raw_messages"] if previous_session_state is not None else [],
                    raw_messages,
                )

                feature_row["request_ready_time_ms"] = ready_ms
                feature_row["request_end_time_ms"] = end_ms
                feature_row["turn"] = release_row["turn"]
                feature_row["chat_id"] = release_row["chat_id"]
                feature_row["parent_chat_id"] = release_row["parent_chat_id"]
                feature_row["trigger_group"] = trigger["trigger_group"]
                feature_row["trigger_detail"] = trigger["trigger_detail"]
                feature_row["first_new_role"] = trigger["first_new_role"]
                feature_row["common_prefix_message_count"] = trigger["common_prefix_message_count"]
                feature_row["appended_message_count"] = trigger["appended_message_count"]
                feature_row["input_length_bucket"] = bucket_label
                feature_row["declared_tool_names"] = ";".join(tool_names)
                feature_row["theoretical_prompt_unit_length"] = prompt_block_count
                feature_row["theoretical_prefix_hit_blocks"] = prefix_match_blocks
                feature_row["theoretical_prefix_hit_ratio"] = theoretical_prefix_hit_ratio
                feature_row["theoretical_source_request_id"] = global_source_request_id
                feature_row["bucketed_theoretical_prefix_hit_blocks"] = bucketed_prefix_match_blocks
                feature_row["bucketed_theoretical_prefix_hit_ratio"] = bucketed_theoretical_prefix_hit_ratio
                feature_row["bucketed_theoretical_source_request_id"] = bucketed_source_request_id
                feature_row["theoretical_bucket_boundary_loss_blocks"] = max(
                    prefix_match_blocks - bucketed_prefix_match_blocks,
                    0,
                )
                feature_row["theoretical_bucket_boundary_loss_ratio"] = (
                    feature_row["theoretical_bucket_boundary_loss_blocks"] / prompt_block_count
                    if prompt_block_count
                    else 0.0
                )

                if request_metrics_writer is None:
                    request_metrics_writer = csv.DictWriter(
                        request_metrics_handle,
                        fieldnames=list(feature_row.keys()),
                    )
                    request_metrics_writer.writeheader()
                request_metrics_writer.writerow(feature_row)

                chat_id = release_row["chat_id"]
                parent_chat_id = release_row["parent_chat_id"]
                if parent_chat_id != -1:
                    parent_state = chat_state_for_children.get(parent_chat_id)
                    if parent_state is not None:
                        shared_prefix_units = 0
                        for parent_block_id, child_block_id in zip(parent_state["hash_ids"], hash_ids):
                            if parent_block_id != child_block_id:
                                break
                            shared_prefix_units += 1
                        bucket_totals = session_bucket_totals[bucket_label]
                        bucket_totals["edge_count"] += 1
                        if shared_prefix_units > 0:
                            bucket_totals["reusable_edge_count"] += 1
                        if parent_state["bucket_label"] != bucket_label:
                            bucket_totals["cross_bucket_edge_count"] += 1
                            bucket_totals["cross_bucket_shared_prefix_units_sum"] += shared_prefix_units
                        bucket_totals["shared_prefix_units_sum"] += shared_prefix_units

                    remaining_children = child_ref_counts.get(parent_chat_id, 0) - 1
                    if remaining_children > 0:
                        child_ref_counts[parent_chat_id] = remaining_children
                    else:
                        child_ref_counts.pop(parent_chat_id, None)
                        chat_state_for_children.pop(parent_chat_id, None)

                if chat_id != -1 and child_ref_counts.get(chat_id, 0) > 0:
                    chat_state_for_children[chat_id] = {
                        "bucket_label": bucket_label,
                        "hash_ids": hash_ids,
                    }

                total_prompt_blocks += prompt_block_count
                total_global_reused_blocks += prefix_match_blocks
                bucket_reused_block_totals[bucket_label] += bucketed_prefix_match_blocks

                session_last[session_id] = {
                    "request_id": request_id,
                    "request_ready_time_ms": ready_ms,
                    "request_end_time_ms": end_ms,
                    "raw_messages": raw_messages,
                }

                processed_requests += 1
                last_request_id = request_id
                progress.update(1)

                now = time.monotonic()
                should_refresh_progress = (
                    processed_requests == 1
                    or processed_requests % PROGRESS_REFRESH_INTERVAL_REQ == 0
                    or now - last_progress_refresh_at >= PROGRESS_REFRESH_INTERVAL_S
                    or processed_requests == total_requests
                )
                if should_refresh_progress:
                    fraction_done = progress.n / progress.total if progress.total else 0.0
                    elapsed_s = max(now - started_at, 1e-9)
                    req_per_s = progress.n / elapsed_s
                    eta_s = ((progress.total - progress.n) / req_per_s) if req_per_s > 0 and progress.total else 0.0
                    total_bucket_state_count = sum(len(each_store) for each_store in bucket_stores.values())
                    postfix, peak_rss_mb = _progress_postfix(
                        process,
                        peak_rss_mb,
                        fraction_done,
                        req_s=f"{req_per_s:.1f}",
                        eta=_format_duration(eta_s),
                        blocks=len(store),
                        bucket_blocks=total_bucket_state_count,
                        sessions=len(session_last),
                    )
                    progress.set_postfix(postfix)
                    last_progress_refresh_at = now

                if processed_requests and now >= next_progress_flush_at:
                    current_rss_mb = process.memory_info().rss / (1024 * 1024)
                    peak_rss_mb = max(peak_rss_mb, current_rss_mb)
                    est_peak_mb = _estimate_peak_rss_mb(
                        current_rss_mb,
                        peak_rss_mb,
                        (processed_requests / total_requests) if total_requests else 1.0,
                    )
                    _write_progress_state(
                        progress_state_path,
                        total_requests=total_requests,
                        processed_requests=processed_requests,
                        started_at=started_at,
                        current_rss_mb=current_rss_mb,
                        peak_rss_mb=peak_rss_mb,
                        est_peak_mb=est_peak_mb,
                        source_path=f"{source_path} + {release_path}",
                        features_path=features_path,
                        last_request_id=last_request_id,
                        block_state_count=len(store),
                        bucket_state_count=total_bucket_state_count,
                    )
                    next_progress_flush_at = now + PROGRESS_FLUSH_INTERVAL_S
        finally:
            progress.close()

    theoretical_block_reuse_gaps_path = details_dir / "theoretical_block_reuse_gaps.csv"
    write_csv(
        theoretical_block_reuse_gaps_path,
        [
            {"reuse_gap_ms": reuse_gap_ms, "count": count}
            for reuse_gap_ms, count in sorted(reuse_gap_counts.items())
        ],
        fieldnames=["reuse_gap_ms", "count"],
    )

    theoretical_block_lifetimes_path = details_dir / "theoretical_block_lifetimes.csv"
    alive_block_events = Counter()
    block_lifetime_rows = []
    for (
        block_hash,
        first_seen_ms,
        last_seen_ms,
        last_reuse_ms,
        first_request_id,
        last_request_id_for_block,
        last_reuse_request_id,
        reuse_count,
    ) in store.iter_blocks():
        lifecycle_end_ms = last_reuse_ms if reuse_count > 0 else first_seen_ms
        lifetime_ms = max(lifecycle_end_ms - first_seen_ms, 0)
        block_lifetime_rows.append(
            {
                "hash": block_hash,
                "first_request_id": first_request_id,
                "last_request_id": last_request_id_for_block,
                "first_seen_ms": first_seen_ms,
                "last_seen_ms": last_seen_ms,
                "last_reuse_ms": last_reuse_ms,
                "last_reuse_request_id": last_reuse_request_id,
                "reuse_count": reuse_count,
                "lifetime_ms": lifetime_ms,
                "span_end_ms": lifecycle_end_ms,
                "reused": 1 if reuse_count > 0 else 0,
            }
        )
        alive_block_events[first_seen_ms] += 1
        alive_block_events[lifecycle_end_ms + 1] -= 1
    write_csv(theoretical_block_lifetimes_path, block_lifetime_rows)

    alive_block_timeline_summary, alive_block_timeline_rows = _build_alive_block_timeline_from_events(alive_block_events)
    theoretical_alive_block_timeline_path = details_dir / "theoretical_alive_block_timeline.csv"
    write_csv(theoretical_alive_block_timeline_path, alive_block_timeline_rows)

    session_bucket_boundary_rows = []
    for bucket_label, _, _ in bucket_defs:
        bucket_totals = session_bucket_totals[bucket_label]
        total_bucket_reused_blocks = bucket_reused_block_totals[bucket_label]
        session_bucket_boundary_rows.append(
            {
                "bucket": bucket_label,
                "edge_count": bucket_totals["edge_count"],
                "reusable_edge_count": bucket_totals["reusable_edge_count"],
                "cross_bucket_edge_count": bucket_totals["cross_bucket_edge_count"],
                "cross_bucket_edge_fraction": (
                    bucket_totals["cross_bucket_edge_count"] / bucket_totals["edge_count"]
                    if bucket_totals["edge_count"]
                    else 0.0
                ),
                "shared_prefix_units_sum": bucket_totals["shared_prefix_units_sum"],
                "cross_bucket_shared_prefix_units_sum": bucket_totals["cross_bucket_shared_prefix_units_sum"],
                "cross_bucket_shared_prefix_unit_fraction": (
                    bucket_totals["cross_bucket_shared_prefix_units_sum"] / bucket_totals["shared_prefix_units_sum"]
                    if bucket_totals["shared_prefix_units_sum"]
                    else 0.0
                ),
                "bucket_total_reused_blocks": total_bucket_reused_blocks,
                "reduced_reused_blocks_ratio": (
                    bucket_totals["cross_bucket_shared_prefix_units_sum"] / total_bucket_reused_blocks
                    if total_bucket_reused_blocks
                    else 0.0
                ),
            }
        )
    session_bucket_boundary_miss_path = details_dir / "session_bucket_boundary_miss.csv"
    write_csv(session_bucket_boundary_miss_path, session_bucket_boundary_rows)

    details_summary_path = details_dir / DETAILS_SUMMARY_FILENAME
    details_summary = {
        "schema_version": 3,
        "request_count": total_requests,
        "figure_count": 13,
        "cache_analysis_mode": "release_hash_ids",
        "release_path": str(release_path),
        "bucket_definition": {"buckets": _bucket_definition_rows(bucket_defs)},
        "global_prompt_blocks": total_prompt_blocks,
        "global_reused_blocks": total_global_reused_blocks,
        "global_reuse_ratio": (total_global_reused_blocks / total_prompt_blocks) if total_prompt_blocks else 0.0,
        "alive_block_timeline_summary": alive_block_timeline_summary,
        "detail_files": [
            "request_metrics.csv",
            "theoretical_block_reuse_gaps.csv",
            "theoretical_block_lifetimes.csv",
            "theoretical_alive_block_timeline.csv",
            "session_bucket_boundary_miss.csv",
            DETAILS_SUMMARY_FILENAME,
            "progress.json",
        ],
    }
    details_summary_path.write_text(json.dumps(details_summary, ensure_ascii=False, indent=2), encoding="utf-8")

    current_rss_mb = process.memory_info().rss / (1024 * 1024)
    peak_rss_mb = max(peak_rss_mb, current_rss_mb)
    est_peak_mb = _estimate_peak_rss_mb(current_rss_mb, peak_rss_mb, 1.0)
    _write_progress_state(
        progress_state_path,
        total_requests=total_requests,
        processed_requests=processed_requests,
        started_at=started_at,
        current_rss_mb=current_rss_mb,
        peak_rss_mb=peak_rss_mb,
        est_peak_mb=est_peak_mb,
        source_path=f"{source_path} + {release_path}",
        features_path=features_path,
        last_request_id=last_request_id,
        block_state_count=len(store),
        bucket_state_count=sum(len(bucket_store) for bucket_store in bucket_stores.values()),
    )

    return {
        "details_dir": details_dir,
        "progress": progress_state_path,
        "request_metrics": request_metrics_path,
        "theoretical_block_reuse_gaps": theoretical_block_reuse_gaps_path,
        "theoretical_block_lifetimes": theoretical_block_lifetimes_path,
        "theoretical_alive_block_timeline": theoretical_alive_block_timeline_path,
        "session_bucket_boundary_miss": session_bucket_boundary_miss_path,
        "details_summary": details_summary_path,
    }
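

# Minimal manual invocation sketch (assumed file layout and module packaging; not part
# of the pipeline itself). The three inputs must describe the same requests in the
# same order. Because this module uses a relative import, run it as
# `python -m <package>.<this_module> SOURCE RELEASE FEATURES OUT`.
if __name__ == "__main__":
    import sys

    if len(sys.argv) != 5:
        raise SystemExit("usage: <this module> SOURCE_JSONL RELEASE_JSONL FEATURES_CSV OUTPUT_DIR")
    produced_paths = run_advanced_from_existing(
        source_path=sys.argv[1],
        release_path=sys.argv[2],
        features_path=sys.argv[3],
        output_dir=sys.argv[4],
    )
    print(json.dumps({name: str(path) for name, path in produced_paths.items()}, indent=2))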