profile(kvc): add D KV pool timeseries poller + analyzer for v6 root-cause

v5 dropped errors but pushed session-cap fallback to 46-51%. Before adding
v6 mitigations we need to attribute that capacity loss to one of:
  (a) active sessions — real footprint
  (b) idle-evictable sessions — LRU not aggressive enough
  (c) prefill backup blocks / in-flight / fragmentation — release timing

Without this it's all guessing. Plumb a 1Hz poller into replay that hits
each P/D worker's /server_info, captures session_cache + memory_usage, and
writes a per-worker time-series JSONL to <run_dir>/d-pool-timeseries.jsonl.
Off by default (--pool-poll-interval-s 0); v5+profile sweep enables it at
1.0s. Per-tick HTTP cost is ~8 parallel /server_info calls — negligible
relative to the 50min run.

Analyzer (scripts/analysis/analyze_pool_timeseries.py) decomposes each D's
capacity into active_held (= held - idle_evictable) / idle_evictable /
other (= capacity - held - available, the backup-blocks bucket) / free
(= available), and reports session residency churn across workers as a
starvation/thrashing signal.

Mock-tested poller end-to-end (cancellation clean, file flushed, sessions
captured); analyzer validated against synthetic timeseries.

Next: run scripts/sweep_tp1_v5_optD_profile.sh on hardware (~90min), then
analyze results to pick a v6 direction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
kzlin
2026-04-29 20:04:21 +08:00
parent 6572d7f3f4
commit 51f5386691
5 changed files with 587 additions and 0 deletions

View File

@@ -0,0 +1,275 @@
#!/usr/bin/env python3
"""Analyze d-pool-timeseries.jsonl produced by --pool-poll-interval-s.
Answers v6's main question: where is D's KV pool actually spent?
For each decode worker, decomposes capacity over the run wall-clock into:
- resident_held_active = held - idle_evictable (sessions in active use)
- resident_held_idle = idle_evictable (sessions kept around but evictable)
- prefill_backup_or_other = capacity - held - available (everything else: backup blocks,
in-flight transfers, fragmentation)
- free_available = available
Also reports session residency churn (how many distinct sessions ever resided per D, and
how often a session bounced between workers — a strong starvation signal).
Usage:
python scripts/analysis/analyze_pool_timeseries.py <run_dir>
or
python scripts/analysis/analyze_pool_timeseries.py <pool_timeseries.jsonl>
Output: human-readable text. Add --json to also print a machine-readable summary.
"""
from __future__ import annotations
import argparse
import json
import statistics
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any
def _load_jsonl(path: Path) -> list[dict[str, Any]]:
    """Parse a JSON Lines file into a list of rows.

    Blank and whitespace-only lines are skipped. The file is decoded
    explicitly as UTF-8 rather than with the platform default encoding, so
    the result does not depend on the locale of the analysis machine.

    Args:
        path: JSONL file to read.

    Returns:
        One decoded JSON value per non-blank line, in file order.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    rows: list[dict[str, Any]] = []
    with path.open(encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            rows.append(json.loads(line))
    return rows
def _resolve_input(path: Path) -> Path:
    """Map a CLI argument to the timeseries file it designates.

    A regular file is returned unchanged; a directory is expected to contain
    ``d-pool-timeseries.jsonl``.

    Raises:
        FileNotFoundError: if ``path`` is neither a file nor a directory, or
            is a directory without the timeseries file.
    """
    if path.is_file():
        return path
    if not path.is_dir():
        raise FileNotFoundError(path)
    candidate = path / "d-pool-timeseries.jsonl"
    if not candidate.is_file():
        raise FileNotFoundError(
            f"{candidate} not found; pass the file directly or a run dir containing it."
        )
    return candidate
def _percentile(values: list[float], p: float) -> float:
    """Return the element of ``values`` at fractional rank ``p``.

    The values are sorted and the element at index ``round((len - 1) * p)``
    is returned, with the index clamped into the valid range. No
    interpolation is performed. An empty input yields ``0.0``.
    """
    if not values:
        return 0.0
    ordered = sorted(values)
    last = len(ordered) - 1
    rank = int(round(last * p))
    # Clamp so that p outside [0, 1] still selects a real element.
    rank = max(0, rank)
    rank = min(last, rank)
    return ordered[rank]
def _fmt_tokens(n: float) -> str:
    """Render a token count compactly.

    Values of at least one million are shown in millions with two decimals
    (``1.50M``), values of at least one thousand in thousands with one
    decimal (``1.5K``), and anything smaller as a truncated integer.
    """
    scales = ((1_000_000, 2, "M"), (1_000, 1, "K"))
    for unit, digits, suffix in scales:
        if n >= unit:
            return f"{n / unit:.{digits}f}{suffix}"
    return str(int(n))
def _fmt_pct(n: float, total: float) -> str:
    """Format ``n`` as a percentage of ``total`` with one decimal place.

    A placeholder dash is returned when ``total`` is zero or negative, since
    no meaningful share can be computed.
    """
    if total <= 0:
        return " - "
    share = 100 * n / total
    return f"{share:5.1f}%"
def analyze(timeseries_path: Path) -> dict[str, Any]:
    """Decompose per-worker KV pool usage and session churn from a timeseries.

    Prints a human-readable report to stdout and returns the same figures as
    a summary dict.

    Args:
        timeseries_path: JSONL file with one row per (tick, worker). If a
            ``request-metrics.jsonl`` file sits next to it, the distribution
            of its ``execution_mode`` values is reported as well.

    Returns:
        Summary dict containing ``workers`` (per-worker token decomposition),
        ``session_residency`` (distinct sessions per decode worker, hop-count
        distribution, and the number of sessions that were listed by a decode
        worker but never resident on any of them) and, when request metrics
        are present, ``execution_modes``.

    Raises:
        ValueError: if the timeseries file contains no rows.
    """
    rows = _load_jsonl(timeseries_path)
    if not rows:
        raise ValueError(f"empty timeseries: {timeseries_path}")

    by_worker: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for row in rows:
        if row.get("error") and "session_cache_enabled" not in row:
            # poller failed at this tick — skip
            continue
        wid = row.get("worker_id") or "?"
        by_worker[wid].append(row)

    summary: dict[str, Any] = {
        "timeseries_path": str(timeseries_path),
        "total_rows": len(rows),
        "tick_count": len(by_worker[next(iter(by_worker))]) if by_worker else 0,
        "wall_s_span": (
            max(r.get("wall_s", 0.0) for r in rows)
            - min(r.get("wall_s", 0.0) for r in rows)
        ),
        "workers": {},
    }

    print(f"\n=== Pool timeseries: {timeseries_path}")
    print(
        f" rows={summary['total_rows']} workers={len(by_worker)} "
        f"span={summary['wall_s_span']:.1f}s"
    )

    # Print per-worker decomposition table
    header = (
        f"{'worker':<12} {'role':<8} {'cap':>8} | "
        f"{'avg_active':>10} {'avg_idle':>10} {'avg_other':>10} {'avg_free':>10} | "
        f"{'p90_held':>10} {'max_held':>10} {'p90_avail':>10}"
    )
    print(header)
    print("-" * len(header))

    for wid in sorted(by_worker.keys()):
        ws = by_worker[wid]
        # `or "?"` also covers a present-but-null role, which would otherwise
        # break the width-formatted print below.
        role = ws[0].get("worker_role") or "?"
        cap_vals = [int(r.get("capacity_tokens") or 0) for r in ws]
        held_vals = [int(r.get("held_tokens") or 0) for r in ws]
        avail_vals = [int(r.get("available_tokens") or 0) for r in ws]
        idle_vals = [int(r.get("idle_evictable_tokens") or 0) for r in ws]
        # active = held - idle (sessions in active use)
        active_vals = [max(0, h - i) for h, i in zip(held_vals, idle_vals)]
        # other = capacity - held - available (prefill backup blocks, in-flight, fragmentation)
        other_vals = [
            max(0, c - h - a) for c, h, a in zip(cap_vals, held_vals, avail_vals)
        ]
        cap = max(cap_vals) if cap_vals else 0
        avg_active = statistics.fmean(active_vals) if active_vals else 0.0
        avg_idle = statistics.fmean(idle_vals) if idle_vals else 0.0
        avg_other = statistics.fmean(other_vals) if other_vals else 0.0
        avg_avail = statistics.fmean(avail_vals) if avail_vals else 0.0
        p90_held = _percentile([float(v) for v in held_vals], 0.90)
        max_held = max(held_vals) if held_vals else 0
        p90_avail = _percentile([float(v) for v in avail_vals], 0.90)
        sess_counts = [int(r.get("session_count") or 0) for r in ws]
        resident_counts = [int(r.get("resident_session_count") or 0) for r in ws]
        print(
            f"{wid:<12} {role:<8} {_fmt_tokens(cap):>8} | "
            f"{_fmt_tokens(avg_active):>4} {_fmt_pct(avg_active, cap):>5} "
            f"{_fmt_tokens(avg_idle):>4} {_fmt_pct(avg_idle, cap):>5} "
            f"{_fmt_tokens(avg_other):>4} {_fmt_pct(avg_other, cap):>5} "
            f"{_fmt_tokens(avg_avail):>4} {_fmt_pct(avg_avail, cap):>5} | "
            f"{_fmt_tokens(p90_held):>10} {_fmt_tokens(max_held):>10} "
            f"{_fmt_tokens(p90_avail):>10}"
        )
        summary["workers"][wid] = {
            "role": role,
            "capacity_tokens": cap,
            "avg_active_held_tokens": avg_active,
            "avg_idle_evictable_tokens": avg_idle,
            "avg_other_tokens": avg_other,
            "avg_available_tokens": avg_avail,
            "p90_held_tokens": p90_held,
            "max_held_tokens": max_held,
            "p90_available_tokens": p90_avail,
            "max_session_count": max(sess_counts) if sess_counts else 0,
            "max_resident_session_count": (
                max(resident_counts) if resident_counts else 0
            ),
            "ticks": len(ws),
        }

    print(
        "\nLegend: active=held-idle idle=idle_evictable "
        "other=cap-held-avail (prefill backup, in-flight, fragmentation)"
    )

    # Session residency churn: how many distinct sessions ever sat on each worker,
    # and how many sessions hopped across workers (= starvation indicator).
    print("\n=== Session residency churn ===")
    sessions_per_worker: dict[str, set[str]] = defaultdict(set)
    workers_per_session: dict[str, set[str]] = defaultdict(set)
    resident_ticks_per_worker: Counter[str] = Counter()
    # Every session id a decode worker ever listed, resident or not. Needed to
    # detect sessions that never became resident: workers_per_session only
    # gains a key when a session IS resident, so it can never hold an empty set.
    seen_sessions: set[str] = set()
    for row in rows:
        wid = row.get("worker_id")
        if wid is None or row.get("worker_role") != "decode":
            continue
        sessions = row.get("sessions") or []
        if not isinstance(sessions, list):
            continue
        for entry in sessions:
            if not isinstance(entry, dict):
                continue
            sid = entry.get("session_id")
            if sid is None:
                continue
            seen_sessions.add(sid)
            if entry.get("resident"):
                sessions_per_worker[wid].add(sid)
                workers_per_session[sid].add(wid)
                resident_ticks_per_worker[wid] += 1

    # Per-decode worker: distinct session count
    print(f" {'worker':<12} {'distinct_sess':>14} {'resident_ticks':>16}")
    for wid in sorted(sessions_per_worker.keys()):
        print(
            f" {wid:<12} {len(sessions_per_worker[wid]):>14} "
            f"{resident_ticks_per_worker[wid]:>16}"
        )

    # Per session: how many workers it hopped across
    hops = Counter(len(ws) for ws in workers_per_session.values())
    print("\n Sessions seen on N workers (decode side):")
    for n, count in sorted(hops.items()):
        print(f" on {n} worker(s): {count} sessions")

    # Sessions a decode worker listed at some tick but that were never
    # resident on any decode worker.
    starvation = [sid for sid in seen_sessions if sid not in workers_per_session]
    if starvation:
        print(
            f"\n Sessions listed but never resident on any decode worker: "
            f"{len(starvation)}"
        )

    multi_hopper = sorted(
        ((sid, ws) for sid, ws in workers_per_session.items() if len(ws) >= 2),
        key=lambda x: -len(x[1]),
    )[:10]
    if multi_hopper:
        print(
            "\n Top sessions seen resident on multiple workers (potential thrashing):"
        )
        for sid, ws in multi_hopper:
            print(f" {sid}: {len(ws)} workers ({sorted(ws)})")

    summary["session_residency"] = {
        "distinct_sessions_per_worker": {
            wid: len(s) for wid, s in sessions_per_worker.items()
        },
        "session_hop_count_distribution": dict(hops),
        "starvation_session_count": len(starvation),
    }

    # If a request-metrics file is co-located, report how requests were
    # distributed across execution modes. This is a plain count; it is not
    # correlated with pool state at the time of each request.
    metrics_path = timeseries_path.with_name("request-metrics.jsonl")
    if metrics_path.exists():
        print(f"\n=== Request-metrics summary ({metrics_path.name}) ===")
        mrows = _load_jsonl(metrics_path)
        modes = Counter(r.get("execution_mode") or "?" for r in mrows)
        total = sum(modes.values())
        for mode, count in modes.most_common():
            print(f" {count:>6} ({100 * count / total:5.1f}%) {mode}")
        summary["execution_modes"] = dict(modes)

    return summary
def main() -> None:
    """Command-line entry point.

    Resolves the positional path to a timeseries file, runs the analysis
    (which prints the text report), and with ``--json`` also dumps the
    returned summary as indented JSON.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        "path",
        type=Path,
        help="Path to d-pool-timeseries.jsonl OR a run dir containing it",
    )
    cli.add_argument(
        "--json",
        action="store_true",
        help="Also print a machine-readable JSON summary",
    )
    options = cli.parse_args()

    summary = analyze(_resolve_input(options.path))
    if not options.json:
        return
    print("\n=== JSON summary ===")
    print(json.dumps(summary, indent=2, sort_keys=True, default=str))


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,125 @@
#!/bin/bash
# TP1 v5 + profiling — re-run the v5 (Option D) config with the new
# d-pool-timeseries poller enabled, so we can attribute each session-cap
# fallback to actual D KV pool occupancy (held vs available vs idle-evictable
# vs prefill-backup) instead of guessing.
#
# Output:
# outputs/qwen3-30b-tp1-v5-optD-profile/
# ├── kvcache-centric-kv-aware-worker-admission-<ts>/
# │ ├── request-metrics.jsonl
# │ ├── request-metrics.jsonl.summary.json
# │ └── d-pool-timeseries.jsonl ← NEW (1Hz P/D /server_info snapshots)
# ├── exp1_1p7d_kvc_optD_profile_metrics.jsonl
# └── exp2_2p6d_kvc_optD_profile_metrics.jsonl
# Abort on any command failure, unset variable, or failed pipeline stage.
set -euo pipefail
# Run from the parent of the directory holding this script so the relative
# paths below resolve regardless of the caller's working directory.
cd "$(dirname "$0")/.."
MODEL=/mnt/kzlin/workflow/pd-hybrid/simm-swe-bench/models/Qwen3-30B-A3B-Instruct-2507
TRACE=outputs/qwen35-swebench-50sess.jsonl
OUTPUT=outputs/qwen3-30b-tp1-v5-optD-profile
VENV_PYTHON=.venv/bin/python
RESULTS_FILE=$OUTPUT/sweep_results.txt
# Passed to --pool-poll-interval-s for every experiment.
POLL_INTERVAL=1.0
# Quoted so the script keeps working if OUTPUT is ever changed to a path
# containing spaces or glob characters.
mkdir -p "$OUTPUT"
# Print a timestamped message to stdout and append it to the results file.
log() {
  echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$RESULTS_FILE"
}
# Archive the artifacts of one finished run under $OUTPUT with a stable label.
#   $1  label used as the filename prefix for the archived copies
#   $2  run directory produced by the benchmark (may be empty if none found)
# Appends the run summary to the results file. The final "Saved to" line lists
# only the files that were actually copied.
save_result() {
  local label=$1
  local run_dir=$2
  local summary="$run_dir/request-metrics.jsonl.summary.json"
  local timeseries="$run_dir/d-pool-timeseries.jsonl"
  local saved

  log "=== $label COMPLETED ==="
  if [ ! -f "$summary" ]; then
    log "WARNING: No summary file found in $run_dir"
    return 0
  fi

  log "Summary:"
  cat "$summary" >> "$RESULTS_FILE"
  echo "" >> "$RESULTS_FILE"
  cp "$summary" "$OUTPUT/${label}_summary.json"
  cp "$run_dir/request-metrics.jsonl" "$OUTPUT/${label}_metrics.jsonl"
  saved="${label}_summary.json + ${label}_metrics.jsonl"

  if [ -f "$timeseries" ]; then
    cp "$timeseries" "$OUTPUT/${label}_pool_timeseries.jsonl"
    log "Pool timeseries: $(wc -l < "$OUTPUT/${label}_pool_timeseries.jsonl") rows"
    saved="$saved + ${label}_pool_timeseries.jsonl"
  else
    log "WARNING: no d-pool-timeseries.jsonl produced"
  fi
  log "Saved to $OUTPUT/$saved"
}
log "Starting TP1 v5 + profile sweep (Option D + ${POLL_INTERVAL}s pool polling)"
log "Model: $MODEL"
log "Trace: $TRACE (4449 requests, 52 sessions)"
log "Profiling: --pool-poll-interval-s $POLL_INTERVAL (writes d-pool-timeseries.jsonl)"

# Run one benchmark-live experiment and archive its artifacts.
#   $1  label for the archived files (passed to save_result)
#   $2  banner title written to the log
#   $3  number of prefill workers
#   $4  number of decode workers
#   $5  comma-separated prefill GPU ids
#   $6  comma-separated decode GPU ids
# All other flags are identical across experiments.
run_experiment() {
  local label=$1
  local title=$2
  local prefill_workers=$3
  local decode_workers=$4
  local prefill_gpus=$5
  local decode_gpus=$6
  local run_dir

  log ""
  log "=== $title ==="
  PYTHONPATH=src:third_party/sglang/python \
  "$VENV_PYTHON" -m agentic_pd_hybrid.cli benchmark-live \
    --trace "$TRACE" \
    --output-root "$OUTPUT" \
    --mechanism kvcache-centric \
    --policy kv-aware \
    --model-path "$MODEL" \
    --prefill-workers "$prefill_workers" --decode-workers "$decode_workers" \
    --prefill-tp-size 1 --decode-tp-size 1 \
    --prefill-gpu-ids "$prefill_gpus" --decode-gpu-ids "$decode_gpus" \
    --transfer-backend mooncake \
    --gpu-budget 8 \
    --time-scale 10 \
    --session-sample-rate 1.0 \
    --target-duration-s 100000 \
    --concurrency-limit 32 \
    --timeout-s 900 \
    --request-timeout-s 300 \
    --kvcache-admission-mode worker \
    --kvcache-seed-min-turn-id 1 \
    --kvcache-seed-max-inflight-decode -1 \
    --kvcache-prefill-backup-policy release-after-transfer \
    --kvcache-prefill-priority-eviction \
    --pool-poll-interval-s "$POLL_INTERVAL"

  # The newest matching directory is taken to be the run that just finished.
  # `|| true`: under `set -euo pipefail` a failing `ls` (no matching directory)
  # would abort the script here with no message, because stderr is discarded.
  # An empty run_dir is passed on instead so save_result logs its warning.
  run_dir=$(ls -td "$OUTPUT"/kvcache-centric-*/ 2>/dev/null | head -1) || true
  save_result "$label" "$run_dir"
}

########################################
# Experiment 1: 1P + 7D KVC kv-aware Option D + profile
########################################
run_experiment "exp1_1p7d_kvc_optD_profile" \
  "[EXP1] 1P7D KVC kv-aware Option D + profile" \
  1 7 0 1,2,3,4,5,6,7

########################################
# Experiment 2: 2P + 6D KVC kv-aware Option D + profile
########################################
run_experiment "exp2_2p6d_kvc_optD_profile" \
  "[EXP2] 2P6D KVC kv-aware Option D + profile" \
  2 6 0,1 2,3,4,5,6,7

log ""
log "=== ALL TP1 V5+PROFILE EXPERIMENTS DONE ==="

View File

@@ -43,6 +43,8 @@ class BenchmarkConfig:
kvcache_prefill_priority_eviction: bool = False
kvcache_prefill_direct_priority: int = -100
kvcache_prefill_normal_priority: int = 100
pool_poll_interval_s: float = 0.0
pool_poll_include_sessions: bool = True
sample_profile: str = "default"
min_initial_input_tokens: int | None = None
max_initial_input_tokens: int | None = None
@@ -190,6 +192,8 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts:
),
kvcache_prefill_direct_priority=config.kvcache_prefill_direct_priority,
kvcache_prefill_normal_priority=config.kvcache_prefill_normal_priority,
pool_poll_interval_s=config.pool_poll_interval_s,
pool_poll_include_sessions=config.pool_poll_include_sessions,
)
if config.request_timeout_s is not None:
replay_config = replace(
@@ -246,6 +250,8 @@ def run_live_benchmark(config: BenchmarkConfig) -> BenchmarkArtifacts:
"kvcache_prefill_normal_priority": (
config.kvcache_prefill_normal_priority
),
"pool_poll_interval_s": config.pool_poll_interval_s,
"pool_poll_include_sessions": config.pool_poll_include_sessions,
"sample_profile": config.sample_profile,
"min_initial_input_tokens": config.min_initial_input_tokens,
"max_initial_input_tokens": config.max_initial_input_tokens,

View File

@@ -228,6 +228,23 @@ def main() -> None:
)
replay.add_argument("--kvcache-prefill-direct-priority", type=int, default=-100)
replay.add_argument("--kvcache-prefill-normal-priority", type=int, default=100)
replay.add_argument(
"--pool-poll-interval-s",
type=float,
default=0.0,
help=(
"Poll each P/D worker's /server_info every N seconds and write a "
"time-series snapshot to <run_dir>/d-pool-timeseries.jsonl. "
"0 disables polling."
),
)
replay.add_argument(
"--pool-poll-no-sessions",
action="store_true",
help=(
"Disable per-session detail in the pool timeseries (smaller files)."
),
)
sample = subparsers.add_parser(
"sample-sessions",
@@ -439,6 +456,23 @@ def main() -> None:
)
benchmark.add_argument("--kvcache-prefill-direct-priority", type=int, default=-100)
benchmark.add_argument("--kvcache-prefill-normal-priority", type=int, default=100)
benchmark.add_argument(
"--pool-poll-interval-s",
type=float,
default=0.0,
help=(
"Poll each P/D worker's /server_info every N seconds and write a "
"time-series snapshot to <run_dir>/d-pool-timeseries.jsonl. "
"0 disables polling."
),
)
benchmark.add_argument(
"--pool-poll-no-sessions",
action="store_true",
help=(
"Disable per-session detail in the pool timeseries (smaller files)."
),
)
benchmark.add_argument(
"--sample-profile",
choices=["default", "small-append"],
@@ -520,6 +554,8 @@ def main() -> None:
),
kvcache_prefill_direct_priority=args.kvcache_prefill_direct_priority,
kvcache_prefill_normal_priority=args.kvcache_prefill_normal_priority,
pool_poll_interval_s=args.pool_poll_interval_s,
pool_poll_include_sessions=not args.pool_poll_no_sessions,
)
results = asyncio.run(replay_trace(config))
print(
@@ -662,6 +698,8 @@ def main() -> None:
kvcache_prefill_normal_priority=(
args.kvcache_prefill_normal_priority
),
pool_poll_interval_s=args.pool_poll_interval_s,
pool_poll_include_sessions=not args.pool_poll_no_sessions,
sample_profile=args.sample_profile,
min_initial_input_tokens=args.min_initial_input_tokens,
max_initial_input_tokens=args.max_initial_input_tokens,

View File

@@ -64,6 +64,8 @@ class ReplayConfig:
kvcache_prefill_priority_eviction: bool = False
kvcache_prefill_direct_priority: int = -100
kvcache_prefill_normal_priority: int = 100
pool_poll_interval_s: float = 0.0
pool_poll_include_sessions: bool = True
@dataclass
@@ -155,6 +157,25 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
client=client,
config=config,
)
poll_task: asyncio.Task[None] | None = None
if config.pool_poll_interval_s > 0:
poll_workers: list[tuple[str, str, str]] = []
for worker in config.topology.decode_workers:
poll_workers.append((worker.worker_id, "decode", worker.url))
for worker in config.topology.prefill_workers:
poll_workers.append((worker.worker_id, "prefill", worker.url))
if poll_workers:
poll_output = config.output_path.parent / "d-pool-timeseries.jsonl"
poll_task = asyncio.create_task(
_poll_pool_timeseries(
client=client,
workers=poll_workers,
interval_s=config.pool_poll_interval_s,
output_path=poll_output,
start_time=start_time,
include_sessions=config.pool_poll_include_sessions,
)
)
tasks = []
for request in requests:
if config.pace:
@@ -182,6 +203,12 @@ async def replay_trace(config: ReplayConfig) -> list[RequestMetrics]:
session_tail_tasks[request.session_id] = tasks[-1]
results = await asyncio.gather(*tasks)
if poll_task is not None:
poll_task.cancel()
try:
await poll_task
except asyncio.CancelledError:
pass
for session in direct_sessions.values():
if session.opened:
try:
@@ -644,6 +671,122 @@ async def _fetch_decode_server_state(
)
async def _query_pool_snapshot(
    *,
    client: httpx.AsyncClient,
    server_url: str,
    include_sessions: bool,
) -> dict[str, Any]:
    """Fetch one worker's ``/server_info`` and flatten it into a snapshot row.

    Any failure while requesting or decoding the response is reported as
    ``{"error": <exception class name>}`` instead of being raised. On
    success the returned dict carries the session-cache counters, a few
    memory/throughput figures from the internal state, and a ``sessions``
    list that is populated only when ``include_sessions`` is true and the
    server reported a list of sessions.
    """
    try:
        response = await client.get(
            f"{server_url.rstrip('/')}/server_info",
            timeout=_ADMISSION_PROBE_TIMEOUT_S,
        )
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        return {"error": type(exc).__name__}

    internal = _extract_internal_state(payload)
    session_cache = _extract_session_cache(payload)

    # Only consult the per-session list when detail was requested.
    raw_sessions = session_cache.get("sessions") if include_sessions else None
    sessions: list[dict[str, Any]] = []
    if isinstance(raw_sessions, list):
        sessions = [
            {
                "session_id": item.get("session_id"),
                "resident": bool(item.get("resident")),
                "resident_tokens": int(item.get("resident_tokens") or 0),
                "idle_evictable": bool(item.get("idle_evictable")),
                "timed_out": bool(item.get("timed_out")),
            }
            for item in raw_sessions
            if isinstance(item, dict)
        ]

    internal_is_dict = isinstance(internal, dict)
    memory_usage = internal.get("memory_usage") if internal_is_dict else None
    if not isinstance(memory_usage, dict):
        memory_usage = {}

    def _cache_int(key: str) -> int:
        # Missing or falsy counters are recorded as 0.
        return int(session_cache.get(key) or 0)

    return {
        "session_cache_enabled": bool(session_cache.get("enabled")),
        "session_count": _cache_int("session_count"),
        "resident_session_count": _cache_int("resident_session_count"),
        "held_tokens": _cache_int("held_tokens"),
        "available_tokens": _cache_int("available_tokens"),
        "capacity_tokens": _cache_int("capacity_tokens"),
        "idle_evictable_session_count": _cache_int("idle_evictable_session_count"),
        "idle_evictable_tokens": _cache_int("idle_evictable_tokens"),
        "kvcache_mem_gb": float(memory_usage.get("kvcache") or 0.0),
        "token_capacity": int(memory_usage.get("token_capacity") or 0),
        "max_total_num_tokens": (
            int(internal.get("max_total_num_tokens") or 0) if internal_is_dict else 0
        ),
        "last_gen_throughput": (
            float(internal.get("last_gen_throughput") or 0.0)
            if internal_is_dict
            else 0.0
        ),
        "sessions": sessions,
    }
async def _poll_pool_timeseries(
    *,
    client: httpx.AsyncClient,
    workers: list[tuple[str, str, str]],
    interval_s: float,
    output_path: Path,
    start_time: float,
    include_sessions: bool,
) -> None:
    """Poll every worker on a fixed cadence and append one JSONL row per worker.

    Runs until cancelled; cancellation is absorbed and the function returns
    normally, with the output file closed by the surrounding ``with``.

    Args:
        client: HTTP client used for the ``/server_info`` probes.
        workers: ``(worker_id, role, url)`` triples to poll each tick.
        interval_s: Target period between tick starts; time spent polling
            and writing is subtracted from the sleep.
        output_path: JSONL file to (over)write; parent dirs are created.
        start_time: Reference ``time.perf_counter()``-style timestamp that
            ``wall_s`` is measured from — presumably the replay start;
            confirm against the caller.
        include_sessions: Forwarded to the per-worker snapshot query.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as sink:
        try:
            while True:
                began = time.perf_counter()
                stamp = time.time()
                # Fields shared by every row written during this tick.
                common: dict[str, Any] = {
                    "ts": stamp,
                    "wall_s": began - start_time,
                }
                probes = [
                    _query_pool_snapshot(
                        client=client,
                        server_url=url,
                        include_sessions=include_sessions,
                    )
                    for _, _, url in workers
                ]
                outcomes = await asyncio.gather(*probes, return_exceptions=True)
                for (worker_id, role, url), outcome in zip(workers, outcomes):
                    record: dict[str, Any] = {
                        **common,
                        "worker_id": worker_id,
                        "worker_role": role,
                        "worker_url": url,
                    }
                    if isinstance(outcome, BaseException):
                        # The probe itself raised; keep a row so gaps are visible.
                        record["error"] = type(outcome).__name__
                    else:
                        record.update(outcome)
                    sink.write(json.dumps(record, sort_keys=True) + "\n")
                # One flush per tick so a reader sees whole ticks on disk.
                sink.flush()
                remaining = interval_s - (time.perf_counter() - began)
                if remaining > 0:
                    await asyncio.sleep(remaining)
        except asyncio.CancelledError:
            return
async def _query_decode_direct_admission(
*,
client: httpx.AsyncClient,