Files

34 lines
1000 B
Python

from __future__ import annotations
import subprocess
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator, TextIO
@contextmanager
def open_trace_text(path: str | Path) -> Iterator[TextIO]:
resolved = Path(path)
if resolved.suffix == ".zst":
proc = subprocess.Popen(
["zstdcat", str(resolved)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
)
if proc.stdout is None:
raise RuntimeError(f"Failed to stream {resolved}")
try:
yield proc.stdout
finally:
proc.stdout.close()
stderr = proc.stderr.read() if proc.stderr else ""
return_code = proc.wait()
if return_code != 0:
raise RuntimeError(f"zstdcat failed for {resolved}: {stderr.strip()}")
return
with resolved.open("r", encoding="utf-8") as handle:
yield handle