34 lines
1000 B
Python
34 lines
1000 B
Python
from __future__ import annotations
|
|
|
|
import subprocess
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from typing import Iterator, TextIO
|
|
|
|
|
|
@contextmanager
def open_trace_text(path: str | Path) -> Iterator[TextIO]:
    """Open a trace file as UTF-8 text, decompressing ``.zst`` files on the fly.

    Paths whose suffix is exactly ``.zst`` are streamed through an external
    ``zstdcat`` process; every other path is opened directly for reading.

    Args:
        path: Location of the trace file, plain or zstd-compressed.

    Yields:
        A readable text stream over the (decompressed) file contents.

    Raises:
        RuntimeError: If the ``zstdcat`` stdout pipe is unavailable, or if
            ``zstdcat`` exits with a non-zero status after the caller's
            ``with`` block finished without raising.
        OSError: If the plain file cannot be opened or the ``zstdcat``
            executable cannot be started.
    """
    resolved = Path(path)

    if resolved.suffix != ".zst":
        with resolved.open("r", encoding="utf-8") as handle:
            yield handle
        return

    # Popen's own context manager closes both pipes and reaps the child on
    # every exit path, so neither the stderr descriptor nor the process can
    # leak -- including when the caller's block raises.
    # NOTE(review): stderr is only drained once the caller is done; a zstdcat
    # run that emits more diagnostics than the pipe buffer holds could stall.
    with subprocess.Popen(
        ["zstdcat", str(resolved)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        encoding="utf-8",
    ) as proc:
        if proc.stdout is None:
            raise RuntimeError(f"Failed to stream {resolved}")

        # An exception from the caller's block is re-raised here and leaves
        # through Popen.__exit__ untouched; it must not be replaced by a
        # secondary "zstdcat failed" error caused by the early pipe close.
        yield proc.stdout

        # Reached only when the caller's block completed without raising.
        proc.stdout.close()
        stderr = proc.stderr.read() if proc.stderr else ""
        return_code = proc.wait()
        if return_code != 0:
            raise RuntimeError(f"zstdcat failed for {resolved}: {stderr.strip()}")