Initial mm CLI
This commit is contained in:
6
.config/mm/list.yaml
Normal file
6
.config/mm/list.yaml
Normal file
@@ -0,0 +1,6 @@
|
||||
machines:
|
||||
- dash0
|
||||
- dash1
|
||||
- dash2
|
||||
- dash3
|
||||
- dash4
|
||||
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
@@ -0,0 +1,4 @@
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*.egg-info/
|
||||
.venv/
|
||||
34
README.md
Normal file
34
README.md
Normal file
@@ -0,0 +1,34 @@
|
||||
# mm
|
||||
|
||||
`mm` is a local terminal dashboard for GPU status on SSH machines.
|
||||
|
||||
## Install
|
||||
|
||||
```bash
|
||||
python3 -m pip install -e .
|
||||
```
|
||||
|
||||
## Configure
|
||||
|
||||
Create either `./.config/mm/list.yaml` in the directory where you run `mm`, or
|
||||
`~/.config/mm/list.yaml` for a global config.
|
||||
|
||||
```yaml
|
||||
machines:
|
||||
- dash0
|
||||
- dash1
|
||||
- dash2
|
||||
- alias: dash3
|
||||
label: dash3 / lab
|
||||
```
|
||||
|
||||
Each alias must be reachable by `ssh <alias>` and must have `nvidia-smi`
|
||||
available on the remote machine.
|
||||
|
||||
## Run
|
||||
|
||||
```bash
|
||||
mm
|
||||
mm --config ~/.config/mm/list.yaml
|
||||
mm --timeout 8
|
||||
```
|
||||
3
mm/__init__.py
Normal file
3
mm/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""mm command line package."""
|
||||
|
||||
__version__ = "0.1.0"
|
||||
5
mm/__main__.py
Normal file
5
mm/__main__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from mm.cli import main
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
418
mm/cli.py
Normal file
418
mm/cli.py
Normal file
@@ -0,0 +1,418 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Sequence
|
||||
|
||||
import yaml
|
||||
from rich import box
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from rich.text import Text
|
||||
|
||||
from mm import __version__
|
||||
|
||||
|
||||
# nvidia-smi properties to query. The order matters: parse_nvidia_smi_csv
# reads the resulting CSV columns by position.
QUERY_FIELDS = (
    "index",
    "name",
    "memory.used",
    "memory.total",
    "utilization.gpu",
    "temperature.gpu",
    "power.draw",
    "power.limit",
)

# Command line executed on each remote host over ssh.
REMOTE_COMMAND = "nvidia-smi --query-gpu={fields} --format=csv,noheader,nounits".format(
    fields=",".join(QUERY_FIELDS),
)
|
||||
|
||||
|
||||
class ConfigError(Exception):
    """Signal that no usable machine configuration could be found or loaded."""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class MachineConfig:
    """A single machine entry taken from the config file."""

    # Host name handed to ssh when querying the machine.
    alias: str
    # Text shown for the machine in the dashboard.
    label: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class GPUStat:
    """One row of nvidia-smi output for a single GPU.

    Numeric fields are ``None`` when the value could not be parsed as a number.
    """

    # Kept as the raw strings reported by nvidia-smi.
    index: str
    name: str
    # Parsed from the memory.used / memory.total columns.
    memory_used_mib: float | None
    memory_total_mib: float | None
    # Parsed from the utilization.gpu column.
    utilization_pct: float | None
    # Parsed from the temperature.gpu column.
    temperature_c: float | None
    # Parsed from the power.draw / power.limit columns.
    power_draw_w: float | None
    power_limit_w: float | None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class MachineResult:
    """Outcome of querying one machine."""

    # The machine that was queried.
    machine: MachineConfig
    # Parsed GPU rows; query_machine passes an empty tuple when it reports an error.
    gpus: tuple[GPUStat, ...]
    # Human-readable failure description, or None when the query succeeded.
    error: str | None = None
|
||||
|
||||
|
||||
def default_config_candidates() -> list[Path]:
    """Return the default config file locations in lookup order.

    The order is ``.config/mm/list.yaml`` under the current directory, then
    ``$XDG_CONFIG_HOME/mm/list.yaml`` when that variable is set and non-empty,
    then ``~/.config/mm/list.yaml``. Repeated paths are listed only once.
    """
    ordered = [Path.cwd() / ".config/mm/list.yaml"]

    xdg_root = os.environ.get("XDG_CONFIG_HOME")
    if xdg_root:
        ordered.append(Path(xdg_root).expanduser() / "mm/list.yaml")

    ordered.append(Path.home() / ".config/mm/list.yaml")

    # dict keys keep insertion order, so this drops repeats while keeping
    # the first occurrence of every path.
    return list(dict.fromkeys(entry.expanduser() for entry in ordered))
|
||||
|
||||
|
||||
def resolve_config_path(config_arg: str | None) -> Path:
    """Pick the config file to load.

    Args:
        config_arg: Path given on the command line, or ``None``/empty to
            search the default locations.

    Returns:
        The explicit path (with ``~`` expanded) or the first default
        candidate that exists.

    Raises:
        ConfigError: If the explicit path does not exist, or no default
            candidate exists.
    """
    if not config_arg:
        for candidate in default_config_candidates():
            if candidate.exists():
                return candidate
        searched = ", ".join(map(str, default_config_candidates()))
        raise ConfigError(f"no config file found; searched: {searched}")

    explicit = Path(config_arg).expanduser()
    if explicit.exists():
        return explicit
    raise ConfigError(f"config file does not exist: {explicit}")
|
||||
|
||||
|
||||
def load_machines(path: Path) -> list[MachineConfig]:
    """Read and validate the machine list stored in *path*.

    The YAML document may be a mapping with a ``machines`` key or the
    machines collection itself (a list or mapping at the top level).

    Args:
        path: Location of the YAML config file.

    Returns:
        The configured machines, in file order. Never empty.

    Raises:
        ConfigError: If the file cannot be read or decoded, is not valid
            YAML, has no machines entry, or defines no machines.
    """
    # Reading and parsing are separate steps so each failure gets its own
    # message. UnicodeDecodeError is a ValueError, not an OSError, so it has
    # to be named explicitly or a non-UTF-8 file would escape as a traceback.
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        raise ConfigError(f"cannot read {path}: {exc}") from exc

    try:
        raw = yaml.safe_load(text)
    except yaml.YAMLError as exc:
        raise ConfigError(f"invalid YAML in {path}: {exc}") from exc

    records = raw.get("machines") if isinstance(raw, dict) else raw
    if records is None:
        raise ConfigError(f"{path} must contain a machines list")

    machines = [
        parse_machine_item(item, position)
        for position, item in enumerate(normalize_machine_records(records), start=1)
    ]
    if not machines:
        raise ConfigError(f"{path} does not define any machines")

    return machines
|
||||
|
||||
|
||||
def normalize_machine_records(records: Any) -> list[Any]:
    """Turn the ``machines`` value into a list of entries.

    A list is returned unchanged. A mapping of ``alias -> settings`` becomes
    a list of dicts, each carrying the mapping key under ``"alias"`` followed
    by the settings (which may be ``None`` for "no settings").

    Raises:
        ConfigError: If *records* is neither a list nor a mapping, or a
            mapping value is neither a dict nor ``None``.
    """
    if isinstance(records, list):
        return records
    if not isinstance(records, dict):
        raise ConfigError("machines must be a list or mapping")

    entries: list[dict[str, Any]] = []
    for alias, settings in records.items():
        if settings is not None and not isinstance(settings, dict):
            raise ConfigError(
                "machines mapping values must be objects or null; "
                f"got {type(settings).__name__} for {alias!r}"
            )
        # None and {} both contribute no extra keys.
        entries.append({"alias": alias, **(settings or {})})
    return entries
|
||||
|
||||
|
||||
def parse_machine_item(item: Any, position: int) -> MachineConfig:
    """Convert one raw config entry into a MachineConfig.

    Args:
        item: Either a bare alias string, or a dict with an ``alias`` key and
            an optional ``label`` (or ``name``) key.
        position: 1-based position of the entry, used only in error messages.

    Raises:
        ConfigError: If *item* has an unsupported type or an empty alias.
    """
    if isinstance(item, str):
        alias = label = item.strip()
    elif isinstance(item, dict):
        raw_alias = item.get("alias")
        alias = "" if raw_alias is None else str(raw_alias).strip()
        # First non-empty of label, name, alias wins.
        label = str(item.get("label") or item.get("name") or alias).strip()
    else:
        raise ConfigError(
            f"machine entry #{position} must be a string or object, "
            f"got {type(item).__name__}"
        )

    if alias:
        # A label that is blank after stripping falls back to the alias.
        return MachineConfig(alias=alias, label=label or alias)
    raise ConfigError(f"machine entry #{position} is missing an alias")
|
||||
|
||||
|
||||
def ssh_command(alias: str, timeout: float) -> list[str]:
    """Build the ssh argv that runs the nvidia-smi query on *alias*.

    Args:
        alias: Host name to connect to.
        timeout: Requested timeout in seconds; passed to ssh as a whole
            number of seconds, never less than 1.

    Returns:
        The argument list for ``subprocess.run`` (no shell involved locally).
    """
    connect_timeout = max(1, int(timeout))
    return [
        "ssh",
        "-o",
        "BatchMode=yes",
        "-o",
        f"ConnectTimeout={connect_timeout}",
        # End of options: the alias comes from a config file that may live in
        # the current directory, so a value starting with "-" must not be
        # interpreted by ssh as a command-line option.
        "--",
        alias,
        REMOTE_COMMAND,
    ]
|
||||
|
||||
|
||||
def query_machine(machine: MachineConfig, timeout: float) -> MachineResult:
    """Query one machine's GPUs over ssh.

    Failing to start ssh, a timeout, a non-zero exit status, unparsable
    output and an empty GPU list are all reported through the result's
    ``error`` field rather than raised.

    Args:
        machine: The machine to query.
        timeout: Seconds allowed for the whole ssh invocation.
    """
    try:
        completed = subprocess.run(
            ssh_command(machine.alias, timeout),
            capture_output=True,
            check=False,
            text=True,
            # Undecodable bytes in remote output must not raise
            # UnicodeDecodeError here; replace them instead.
            errors="replace",
            timeout=timeout,
        )
    except FileNotFoundError:
        return MachineResult(machine, (), "ssh command not found")
    except subprocess.TimeoutExpired:
        return MachineResult(machine, (), f"ssh timed out after {timeout:g}s")
    except OSError as exc:
        # e.g. PermissionError when the ssh binary is not executable.
        return MachineResult(machine, (), f"cannot run ssh: {exc}")

    if completed.returncode != 0:
        detail = (completed.stderr or completed.stdout or "").strip()
        if not detail:
            detail = f"ssh exited with status {completed.returncode}"
        return MachineResult(machine, (), detail)

    try:
        gpus = tuple(parse_nvidia_smi_csv(completed.stdout))
    except ValueError as exc:
        return MachineResult(machine, (), str(exc))

    if not gpus:
        return MachineResult(machine, (), "nvidia-smi returned no GPUs")

    return MachineResult(machine, gpus)
|
||||
|
||||
|
||||
def collect_status(machines: Sequence[MachineConfig], timeout: float) -> list[MachineResult]:
    """Query all machines concurrently and return results in input order.

    Args:
        machines: Machines to query; may be empty.
        timeout: Per-machine timeout in seconds, forwarded to query_machine.

    Returns:
        One MachineResult per machine, in the same order as *machines*.
    """
    # ThreadPoolExecutor rejects max_workers=0, so an empty list is handled
    # before a pool is created.
    if not machines:
        return []

    results: list[MachineResult | None] = [None] * len(machines)

    with ThreadPoolExecutor(max_workers=min(len(machines), 16)) as executor:
        future_to_index = {
            executor.submit(query_machine, machine, timeout): index
            for index, machine in enumerate(machines)
        }
        # Futures finish in arbitrary order; the saved index restores the
        # original ordering.
        for future in as_completed(future_to_index):
            index = future_to_index[future]
            try:
                results[index] = future.result()
            except Exception as exc:
                # One machine's unexpected failure must not discard the
                # results of every other machine.
                results[index] = MachineResult(
                    machines[index], (), f"unexpected error: {exc}"
                )

    return [result for result in results if result is not None]
|
||||
|
||||
|
||||
def parse_nvidia_smi_csv(output: str) -> list[GPUStat]:
    """Parse header-less nvidia-smi CSV text into GPUStat records.

    Blank lines are ignored. Every remaining row must have exactly one
    column per entry in QUERY_FIELDS, in that order.

    Raises:
        ValueError: If a row has an unexpected number of columns.
    """
    parsed: list[GPUStat] = []
    non_blank = (line for line in output.splitlines() if line.strip())

    for record in csv.reader(non_blank):
        cells = [cell.strip() for cell in record]
        if len(cells) != len(QUERY_FIELDS):
            raise ValueError(
                "unexpected nvidia-smi output: "
                f"expected {len(QUERY_FIELDS)} columns, got {len(cells)}"
            )

        # The first two columns stay as text; the rest are numeric metrics.
        index, name, *metrics = cells
        used, total, util, temp, draw, limit = map(parse_number, metrics)
        parsed.append(
            GPUStat(
                index=index,
                name=name,
                memory_used_mib=used,
                memory_total_mib=total,
                utilization_pct=util,
                temperature_c=temp,
                power_draw_w=draw,
                power_limit_w=limit,
            )
        )

    return parsed
|
||||
|
||||
|
||||
def parse_number(value: str) -> float | None:
    """Parse one numeric CSV cell, returning ``None`` when it has no usable value.

    Placeholders such as ``N/A`` or ``[Not Supported]``, empty cells and any
    other non-numeric text yield ``None``. So do ``nan`` and ``inf``, which
    ``float()`` accepts but which are not meaningful measurements.
    """
    import math

    try:
        number = float(value.strip())
    except ValueError:
        # Covers "", "N/A", "[N/A]", "Not Supported", "[Not Supported]" and
        # anything else float() rejects.
        return None

    return number if math.isfinite(number) else None
|
||||
|
||||
|
||||
def render_dashboard(
    results: Sequence[MachineResult],
    config_path: Path,
    console: Console,
) -> None:
    """Print a dim header naming the config file, then the status table."""
    header = Text(f"GPU status from {config_path}", style="dim")
    console.print(header)

    table = render_compact_table(results)
    console.print(table)
|
||||
|
||||
|
||||
def render_compact_table(results: Sequence[MachineResult]) -> Table:
    """Build a two-column table: one row per machine with its GPU summary."""
    layout = {
        "box": box.ROUNDED,
        "expand": True,
        "show_edge": False,
        "show_lines": False,
        "padding": (0, 1),
    }
    grid = Table(**layout)
    grid.add_column("Machine", style="bold cyan", no_wrap=True, overflow="ellipsis")
    grid.add_column("GPUs (mem util)", overflow="fold", ratio=1)

    for entry in results:
        grid.add_row(machine_label(entry.machine), gpu_summary(entry))

    return grid
|
||||
|
||||
|
||||
def machine_label(machine: MachineConfig) -> Text:
    """Render the machine's label, followed by its alias when the two differ."""
    rendered = Text(machine.label, style="bold cyan")
    if machine.label == machine.alias:
        return rendered

    rendered.append(f" ({machine.alias})", style="dim")
    return rendered
|
||||
|
||||
|
||||
def gpu_summary(result: MachineResult) -> Text:
    """Render the shortened error in red, or one chip per GPU separated by a space."""
    if result.error:
        return Text(short_error(result.error), style="red")

    chips = Text()
    needs_separator = False
    for gpu in result.gpus:
        if needs_separator:
            chips.append(" ")
        append_gpu_chip(chips, gpu)
        needs_separator = True
    return chips
|
||||
|
||||
|
||||
def append_gpu_chip(summary: Text, gpu: GPUStat) -> None:
    """Append ``<index> <memory> <util>%`` for *gpu* to *summary* in place.

    The memory and utilization parts are styled by how high the value is.
    """
    memory_style = metric_style(gpu_memory_percent(gpu))
    util_style = metric_style(gpu.utilization_pct)

    segments = (
        (gpu.index, "bold"),
        (" ", None),
        (format_memory_pair(gpu), memory_style),
        (" ", None),
        (f"{format_percent(gpu.utilization_pct)}%", util_style),
    )
    for text, style in segments:
        summary.append(text, style=style)
|
||||
|
||||
|
||||
def gpu_memory_percent(gpu: GPUStat) -> float | None:
    """Return used memory as a percentage of total.

    Returns ``None`` when either value is missing or the total is not positive.
    """
    used, total = gpu.memory_used_mib, gpu.memory_total_mib
    if used is None or total is None:
        return None
    if total <= 0:
        return None
    return (used / total) * 100
|
||||
|
||||
|
||||
def format_percent(value: float | None) -> str:
    """Format *value* with no decimals, or ``??`` when it is unknown."""
    return "??" if value is None else f"{value:.0f}"
|
||||
|
||||
|
||||
def metric_style(value: float | None) -> str:
    """Return the style for a percentage metric; unknown values are dimmed."""
    if value is None:
        return "dim"

    # Clamp into 0..100 before choosing a usage colour.
    clamped = min(value, 100.0)
    clamped = max(0.0, clamped)
    return usage_style(clamped)
|
||||
|
||||
|
||||
def usage_style(percent: float) -> str:
    """Map a usage percentage to a style: red from 90, yellow from 75, else green."""
    thresholds = ((90, "bold red"), (75, "yellow"))
    for floor, style in thresholds:
        if percent >= floor:
            return style
    return "green"
|
||||
|
||||
|
||||
def short_error(error: str) -> str:
    """Collapse whitespace runs to single spaces and cap the text at 120 characters.

    Longer messages are cut to 117 characters followed by ``...``.
    """
    limit = 120
    flattened = " ".join(error.split())
    if len(flattened) > limit:
        return flattened[: limit - 3] + "..."
    return flattened
|
||||
|
||||
|
||||
def format_memory_pair(gpu: GPUStat) -> str:
    """Format used/total memory as ``used/totalG`` or ``used/totalM``.

    Returns ``??`` when either value is missing or the total is not positive.
    The ``G`` form is used as soon as either value reaches 1024.
    """
    used, total = gpu.memory_used_mib, gpu.memory_total_mib
    if used is None or total is None or total <= 0:
        return "??"

    if any(amount >= 1024 for amount in (used, total)):
        return f"{format_gib(used)}/{format_gib(total)}G"
    return f"{used:.0f}/{total:.0f}M"
|
||||
|
||||
|
||||
def format_gib(value: float) -> str:
    """Format a MiB amount as GiB for compact display.

    Values that display as 10 or more, or as a whole number, are shown
    without decimals; everything else gets one decimal.

    Args:
        value: Amount in MiB.
    """
    gib = value / 1024
    # Decide on the value as it will be displayed: 9.96 shows as "10", not
    # "10.0", and 7.96 as "8", matching how exact 10 and 8 are rendered.
    shown = round(gib, 1)
    if shown >= 10 or shown.is_integer():
        return f"{gib:.0f}"
    return f"{gib:.1f}"
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
    """Create the ``mm`` command-line parser.

    Options: ``-c/--config``, ``-t/--timeout`` (float, default 10.0),
    ``--no-color`` and ``--version``.
    """
    config_help = "Path to list.yaml. Defaults to ./.config/mm/list.yaml, then XDG config."
    timeout_help = "SSH timeout per machine in seconds. Default: 10."

    cli = argparse.ArgumentParser(
        prog="mm",
        description="Show GPU status for SSH machine aliases.",
    )
    cli.add_argument("-c", "--config", help=config_help)
    cli.add_argument("-t", "--timeout", type=float, default=10.0, help=timeout_help)
    cli.add_argument("--no-color", action="store_true", help="Disable terminal colors.")
    cli.add_argument("--version", action="version", version=f"mm {__version__}")
    return cli
|
||||
|
||||
|
||||
def run(argv: Sequence[str] | None = None) -> int:
    """Run the CLI and return its exit status.

    Args:
        argv: Command-line arguments without the program name; ``None`` lets
            argparse read ``sys.argv``.

    Returns:
        0 when every machine reported GPUs, 1 when at least one machine
        reported an error, 2 for an invalid ``--timeout`` or config problem.
    """
    args = build_parser().parse_args(argv)
    console = Console(no_color=args.no_color)

    # The chained comparison is False for NaN and infinity as well as for
    # zero and negative values; float("nan") would otherwise slip past a
    # "<= 0" check and crash later in int(timeout).
    if not 0 < args.timeout < float("inf"):
        console.print(
            Text.assemble(
                ("error:", "red"),
                " --timeout must be a finite number greater than 0",
            )
        )
        return 2

    try:
        config_path = resolve_config_path(args.config)
        machines = load_machines(config_path)
    except ConfigError as exc:
        # Built as a Text so that brackets in paths or YAML error messages
        # are printed literally instead of being parsed as console markup.
        console.print(Text.assemble(("error:", "red"), f" {exc}"))
        return 2

    results = collect_status(machines, args.timeout)
    render_dashboard(results, config_path, console)
    return 1 if any(result.error for result in results) else 0
|
||||
|
||||
|
||||
def main() -> None:
    """Console-script entry point: run the CLI and exit with its status."""
    status = run(sys.argv[1:])
    raise SystemExit(status)
|
||||
20
pyproject.toml
Normal file
20
pyproject.toml
Normal file
@@ -0,0 +1,20 @@
|
||||
[build-system]
|
||||
requires = ["setuptools>=68"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "mm"
|
||||
version = "0.1.0"
|
||||
description = "Terminal GPU status dashboard for SSH machine aliases."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
"PyYAML>=6.0",
|
||||
"rich>=13.7",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
mm = "mm.cli:main"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
include = ["mm*"]
|
||||
63
tests/test_cli.py
Normal file
63
tests/test_cli.py
Normal file
@@ -0,0 +1,63 @@
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
import unittest
|
||||
|
||||
from mm.cli import load_machines, parse_nvidia_smi_csv
|
||||
|
||||
|
||||
class ConfigTests(unittest.TestCase):
    """load_machines accepts both list-style and mapping-style machine entries."""

    def _load(self, yaml_text: str):
        """Write *yaml_text* to a temporary list.yaml and load it."""
        with TemporaryDirectory() as tmpdir:
            config_path = Path(tmpdir) / "list.yaml"
            config_path.write_text(yaml_text, encoding="utf-8")
            return load_machines(config_path)

    def test_loads_string_and_object_machine_entries(self) -> None:
        machines = self._load(
            "machines:\n"
            "  - dash0\n"
            "  - alias: dash1\n"
            "    label: training-1\n"
        )

        self.assertEqual([machine.alias for machine in machines], ["dash0", "dash1"])
        self.assertEqual([machine.label for machine in machines], ["dash0", "training-1"])

    def test_loads_mapping_machine_entries(self) -> None:
        machines = self._load(
            "machines:\n"
            "  dash0:\n"
            "  dash1:\n"
            "    label: training-1\n"
        )

        self.assertEqual([machine.alias for machine in machines], ["dash0", "dash1"])
        self.assertEqual([machine.label for machine in machines], ["dash0", "training-1"])
|
||||
|
||||
|
||||
class NvidiaSmiParsingTests(unittest.TestCase):
    """parse_nvidia_smi_csv handles normal rows, placeholders and malformed rows."""

    def test_parse_csv_output(self) -> None:
        output = (
            "0, NVIDIA A100-SXM4-80GB, 12000, 81920, 84, 61, 294.3, 400.0\n"
            "1, NVIDIA A100-SXM4-80GB, 0, 81920, 0, 32, 49.8, 400.0\n"
        )

        gpus = parse_nvidia_smi_csv(output)

        self.assertEqual(len(gpus), 2)
        self.assertEqual(gpus[0].index, "0")
        self.assertEqual(gpus[0].memory_used_mib, 12000)
        self.assertEqual(gpus[0].utilization_pct, 84)
        self.assertEqual(gpus[1].power_draw_w, 49.8)

    def test_placeholder_values_become_none(self) -> None:
        # Some fields are reported as a placeholder instead of a number.
        gpus = parse_nvidia_smi_csv("0, Tesla T4, 10, 15360, [N/A], 40, [N/A], [N/A]\n")

        self.assertEqual(len(gpus), 1)
        self.assertEqual(gpus[0].memory_total_mib, 15360)
        self.assertIsNone(gpus[0].utilization_pct)
        self.assertIsNone(gpus[0].power_draw_w)
        self.assertIsNone(gpus[0].power_limit_w)

    def test_blank_lines_are_ignored(self) -> None:
        self.assertEqual(parse_nvidia_smi_csv("\n  \n"), [])

    def test_wrong_column_count_raises(self) -> None:
        with self.assertRaises(ValueError):
            parse_nvidia_smi_csv("0, Tesla T4, 10\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user