commit 77c22ab21f133d63adaecf1900dfba8d5e4d23d4 Author: Gahow Wang Date: Thu Jun 25 11:01:58 2026 +0800 Initial mm CLI diff --git a/.config/mm/list.yaml b/.config/mm/list.yaml new file mode 100644 index 0000000..949a3a0 --- /dev/null +++ b/.config/mm/list.yaml @@ -0,0 +1,6 @@ +machines: + - dash0 + - dash1 + - dash2 + - dash3 + - dash4 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..aae3ad3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +__pycache__/ +*.py[cod] +*.egg-info/ +.venv/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..a9f1eb9 --- /dev/null +++ b/README.md @@ -0,0 +1,34 @@ +# mm + +`mm` is a local terminal dashboard for GPU status on SSH machines. + +## Install + +```bash +python3 -m pip install -e . +``` + +## Configure + +Create either `./.config/mm/list.yaml` in the directory where you run `mm`, or +`~/.config/mm/list.yaml` for a global config. + +```yaml +machines: + - dash0 + - dash1 + - dash2 + - alias: dash3 + label: dash3 / lab +``` + +Each alias must be reachable by `ssh ` and must have `nvidia-smi` +available on the remote machine. 
+
+## Run
+
+```bash
+mm
+mm --config ~/.config/mm/list.yaml
+mm --timeout 8
+```
diff --git a/mm/__init__.py b/mm/__init__.py
new file mode 100644
index 0000000..dfbc5d1
--- /dev/null
+++ b/mm/__init__.py
@@ -0,0 +1,3 @@
+"""mm command line package."""
+
+__version__ = "0.1.0"
diff --git a/mm/__main__.py b/mm/__main__.py
new file mode 100644
index 0000000..13f5bb9
--- /dev/null
+++ b/mm/__main__.py
@@ -0,0 +1,5 @@
+from mm.cli import main
+
+
+if __name__ == "__main__":
+    main()
diff --git a/mm/cli.py b/mm/cli.py
new file mode 100644
index 0000000..c382446
--- /dev/null
+++ b/mm/cli.py
@@ -0,0 +1,501 @@
+from __future__ import annotations
+
+import argparse
+import csv
+import math
+import os
+import subprocess
+import sys
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any, Sequence
+
+import yaml
+from rich import box
+from rich.console import Console
+from rich.markup import escape
+from rich.table import Table
+from rich.text import Text
+
+from mm import __version__
+
+
+# Columns requested from nvidia-smi; parse_nvidia_smi_csv relies on this order.
+QUERY_FIELDS = (
+    "index",
+    "name",
+    "memory.used",
+    "memory.total",
+    "utilization.gpu",
+    "temperature.gpu",
+    "power.draw",
+    "power.limit",
+)
+# Command run on every remote machine.
+REMOTE_COMMAND = (
+    f"nvidia-smi --query-gpu={','.join(QUERY_FIELDS)} "
+    "--format=csv,noheader,nounits"
+)
+
+
+class ConfigError(Exception):
+    """Raised when mm cannot load a usable machine config."""
+
+
+@dataclass(frozen=True)
+class MachineConfig:
+    """One machine to poll: its SSH alias and the label shown in the table."""
+
+    alias: str
+    label: str
+
+
+@dataclass(frozen=True)
+class GPUStat:
+    """One parsed nvidia-smi row; numeric fields are None when unparsable."""
+
+    index: str
+    name: str
+    memory_used_mib: float | None
+    memory_total_mib: float | None
+    utilization_pct: float | None
+    temperature_c: float | None
+    power_draw_w: float | None
+    power_limit_w: float | None
+
+
+@dataclass(frozen=True)
+class MachineResult:
+    """Outcome of querying one machine: its GPUs, or an error message."""
+
+    machine: MachineConfig
+    gpus: tuple[GPUStat, ...]
+    error: str | None = None
+
+
+def default_config_candidates() -> list[Path]:
+    """Return the config paths to try, in priority order, without duplicates."""
+    candidates = [Path.cwd() / ".config/mm/list.yaml"]
+
+    xdg_home = os.environ.get("XDG_CONFIG_HOME")
+    if xdg_home:
+        candidates.append(Path(xdg_home).expanduser() / "mm/list.yaml")
+
+    candidates.append(Path.home() / ".config/mm/list.yaml")
+
+    unique_candidates: list[Path] = []
+    seen: set[Path] = set()
+    for path in candidates:
+        resolved = path.expanduser()
+        if resolved not in seen:
+            unique_candidates.append(resolved)
+            seen.add(resolved)
+    return unique_candidates
+
+
+def resolve_config_path(config_arg: str | None) -> Path:
+    """Return the explicit config path, or the first existing default candidate.
+
+    Raises:
+        ConfigError: if the explicit path is missing or no candidate exists.
+    """
+    if config_arg:
+        path = Path(config_arg).expanduser()
+        if not path.exists():
+            raise ConfigError(f"config file does not exist: {path}")
+        return path
+
+    candidates = default_config_candidates()
+    for path in candidates:
+        if path.exists():
+            return path
+
+    searched = ", ".join(str(path) for path in candidates)
+    raise ConfigError(f"no config file found; searched: {searched}")
+
+
+def load_machines(path: Path) -> list[MachineConfig]:
+    """Load and validate the machine list from the YAML file at path.
+
+    Raises:
+        ConfigError: if the file is unreadable, is not valid UTF-8 or YAML, or
+            does not define at least one machine.
+    """
+    try:
+        raw = yaml.safe_load(path.read_text(encoding="utf-8"))
+    except yaml.YAMLError as exc:
+        raise ConfigError(f"invalid YAML in {path}: {exc}") from exc
+    except UnicodeDecodeError as exc:
+        # Not an OSError, so it needs its own clause to become a ConfigError.
+        raise ConfigError(f"{path} is not valid UTF-8: {exc}") from exc
+    except OSError as exc:
+        raise ConfigError(f"cannot read {path}: {exc}") from exc
+
+    records = raw.get("machines") if isinstance(raw, dict) else raw
+    if records is None:
+        raise ConfigError(f"{path} must contain a machines list")
+
+    machine_items = normalize_machine_records(records)
+    machines = [
+        parse_machine_item(item, position)
+        for position, item in enumerate(machine_items, start=1)
+    ]
+    if not machines:
+        raise ConfigError(f"{path} does not define any machines")
+
+    return machines
+
+
+def normalize_machine_records(records: Any) -> list[Any]:
+    """Convert the machines value (a list or an alias mapping) into a list.
+
+    Raises:
+        ConfigError: if records is neither a list nor a mapping, or a mapping
+            value is neither an object nor null.
+    """
+    if isinstance(records, list):
+        return records
+
+    if isinstance(records, dict):
+        normalized: list[dict[str, Any]] = []
+        for alias, value in records.items():
+            if value is None:
+                normalized.append({"alias": alias})
+            elif isinstance(value, dict):
+                normalized.append({"alias": alias, **value})
+            else:
+                raise ConfigError(
+                    "machines mapping values must be objects or null; "
+                    f"got {type(value).__name__} for {alias!r}"
+                )
+        return normalized
+
+    raise ConfigError("machines must be a list or mapping")
+
+
+def parse_machine_item(item: Any, position: int) -> MachineConfig:
+    """Build a MachineConfig from one config entry (a string or an object).
+
+    Raises:
+        ConfigError: if the entry has the wrong type, has no alias, or has an
+            alias that starts with "-".
+    """
+    if isinstance(item, str):
+        alias = item.strip()
+        label = alias
+    elif isinstance(item, dict):
+        alias_value = item.get("alias")
+        alias = str(alias_value).strip() if alias_value is not None else ""
+        label_value = item.get("label") or item.get("name") or alias
+        label = str(label_value).strip()
+    else:
+        raise ConfigError(
+            f"machine entry #{position} must be a string or object, "
+            f"got {type(item).__name__}"
+        )
+
+    if not alias:
+        raise ConfigError(f"machine entry #{position} is missing an alias")
+    if alias.startswith("-"):
+        # The alias is passed to ssh as a positional argument, where a leading
+        # "-" would be read as an option (for example -oProxyCommand=...).
+        raise ConfigError(f"machine entry #{position} has an invalid alias: {alias!r}")
+
+    return MachineConfig(alias=alias, label=label or alias)
+
+
+def ssh_command(alias: str, timeout: float) -> list[str]:
+    """Return the non-interactive ssh argv that runs REMOTE_COMMAND on alias."""
+    # ConnectTimeout is passed as a whole number of seconds, never below 1.
+    connect_timeout = max(1, int(timeout))
+    return [
+        "ssh",
+        "-o",
+        "BatchMode=yes",
+        "-o",
+        f"ConnectTimeout={connect_timeout}",
+        alias,
+        REMOTE_COMMAND,
+    ]
+
+
+def query_machine(machine: MachineConfig, timeout: float) -> MachineResult:
+    """Run nvidia-smi on one machine over ssh and parse its output.
+
+    Failures (ssh missing, timeout, non-zero exit, bad or empty output) are
+    returned as a MachineResult with error set instead of being raised.
+    """
+    try:
+        completed = subprocess.run(
+            ssh_command(machine.alias, timeout),
+            capture_output=True,
+            check=False,
+            text=True,
+            timeout=timeout,
+        )
+    except FileNotFoundError:
+        return MachineResult(machine, (), "ssh command not found")
+    except subprocess.TimeoutExpired:
+        return MachineResult(machine, (), f"ssh timed out after {timeout:g}s")
+
+    if completed.returncode != 0:
+        detail = (completed.stderr or completed.stdout or "").strip()
+        if not detail:
+            detail = f"ssh exited with status {completed.returncode}"
+        return MachineResult(machine, (), detail)
+
+    try:
+        gpus = tuple(parse_nvidia_smi_csv(completed.stdout))
+    except ValueError as exc:
+        return MachineResult(machine, (), str(exc))
+
+    if not gpus:
+        return MachineResult(machine, (), "nvidia-smi returned no GPUs")
+
+    return MachineResult(machine, gpus)
+
+
+def collect_status(machines: Sequence[MachineConfig], timeout: float) -> list[MachineResult]:
+    """Query all machines concurrently and return the results in input order."""
+    if not machines:
+        # ThreadPoolExecutor raises ValueError when max_workers is 0.
+        return []
+
+    max_workers = min(len(machines), 16)
+    results: list[MachineResult | None] = [None] * len(machines)
+
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        future_to_index = {
+            executor.submit(query_machine, machine, timeout): index
+            for index, machine in enumerate(machines)
+        }
+        for future in as_completed(future_to_index):
+            index = future_to_index[future]
+            results[index] = future.result()
+
+    return [result for result in results if result is not None]
+
+
+def parse_nvidia_smi_csv(output: str) -> list[GPUStat]:
+    """Parse nvidia-smi CSV output into one GPUStat per non-blank line.
+
+    Raises:
+        ValueError: if a row does not have one column per QUERY_FIELDS entry.
+    """
+    gpus: list[GPUStat] = []
+    rows = csv.reader(line for line in output.splitlines() if line.strip())
+
+    for row in rows:
+        values = [value.strip() for value in row]
+        if len(values) != len(QUERY_FIELDS):
+            raise ValueError(
+                "unexpected nvidia-smi output: "
+                f"expected {len(QUERY_FIELDS)} columns, got {len(values)}"
+            )
+        gpus.append(
+            GPUStat(
+                index=values[0],
+                name=values[1],
+                memory_used_mib=parse_number(values[2]),
+                memory_total_mib=parse_number(values[3]),
+                utilization_pct=parse_number(values[4]),
+                temperature_c=parse_number(values[5]),
+                power_draw_w=parse_number(values[6]),
+                power_limit_w=parse_number(values[7]),
+            )
+        )
+
+    return gpus
+
+
+def parse_number(value: str) -> float | None:
+    """Return value as a float, or None for placeholders and non-numeric text."""
+    normalized = value.strip()
+    if normalized in {"", "N/A", "[N/A]", "Not Supported", "[Not Supported]"}:
+        return None
+
+    try:
+        return float(normalized)
+    except ValueError:
+        return None
+
+
+def render_dashboard(
+    results: Sequence[MachineResult],
+    config_path: Path,
+    console: Console,
+) -> None:
+    """Print the config path header followed by the status table."""
+    console.print(Text(f"GPU status from {config_path}", style="dim"))
+    console.print(render_compact_table(results))
+
+
+def render_compact_table(results: Sequence[MachineResult]) -> Table:
+    """Build a two-column table with one row per machine."""
+    table = Table(
+        box=box.ROUNDED,
+        expand=True,
+        show_edge=False,
+        show_lines=False,
+        padding=(0, 1),
+    )
+    table.add_column("Machine", style="bold cyan", no_wrap=True, overflow="ellipsis")
+    table.add_column("GPUs (mem util)", overflow="fold", ratio=1)
+
+    for result in results:
+        table.add_row(machine_label(result.machine), gpu_summary(result))
+
+    return table
+
+
+def machine_label(machine: MachineConfig) -> Text:
+    """Return the styled label, with the alias appended when it differs."""
+    label = Text(machine.label, style="bold cyan")
+    if machine.label != machine.alias:
+        label.append(f" ({machine.alias})", style="dim")
+    return label
+
+
+def gpu_summary(result: MachineResult) -> Text:
+    """Return the shortened error in red, or one chip per GPU."""
+    if result.error:
+        return Text(short_error(result.error), style="red")
+
+    summary = Text()
+    for index, gpu in enumerate(result.gpus):
+        if index:
+            summary.append(" ")
+        append_gpu_chip(summary, gpu)
+    return summary
+
+
+def append_gpu_chip(summary: Text, gpu: GPUStat) -> None:
+    """Append "<index> <memory> <util>%" for one GPU to summary, in place."""
+    memory_pct = gpu_memory_percent(gpu)
+    util_pct = gpu.utilization_pct
+
+    summary.append(gpu.index, style="bold")
+    summary.append(" ")
+    summary.append(format_memory_pair(gpu), style=metric_style(memory_pct))
+    summary.append(" ")
+    summary.append(f"{format_percent(util_pct)}%", style=metric_style(util_pct))
+
+
+def gpu_memory_percent(gpu: GPUStat) -> float | None:
+    """Return used memory as a percentage of total, or None if unknown."""
+    used = gpu.memory_used_mib
+    total = gpu.memory_total_mib
+    if used is None or total is None or total <= 0:
+        return None
+    return (used / total) * 100
+
+
+def format_percent(value: float | None) -> str:
+    """Format a percentage as a whole number, or "??" when it is None."""
+    if value is None:
+        return "??"
+    return f"{value:.0f}"
+
+
+def metric_style(value: float | None) -> str:
+    """Return the style for a percentage: "dim" for None, else by usage level."""
+    if value is None:
+        return "dim"
+    return usage_style(max(0.0, min(value, 100.0)))
+
+
+def usage_style(percent: float) -> str:
+    """Map a percentage to a style: bold red from 90, yellow from 75, else green."""
+    if percent >= 90:
+        return "bold red"
+    if percent >= 75:
+        return "yellow"
+    return "green"
+
+
+def short_error(error: str) -> str:
+    """Collapse whitespace in error and truncate it to at most 120 characters."""
+    message = " ".join(error.split())
+    if len(message) <= 120:
+        return message
+    return f"{message[:117]}..."
+
+
+def format_memory_pair(gpu: GPUStat) -> str:
+    """Format used/total memory in GiB ("G") or MiB ("M"), or "??" if unknown."""
+    used = gpu.memory_used_mib
+    total = gpu.memory_total_mib
+    if used is None or total is None or total <= 0:
+        return "??"
+
+    if used >= 1024 or total >= 1024:
+        return f"{format_gib(used)}/{format_gib(total)}G"
+    return f"{used:.0f}/{total:.0f}M"
+
+
+def format_gib(value: float) -> str:
+    """Format MiB as GiB, using one decimal only for fractional values below 10."""
+    gib = value / 1024
+    if gib >= 10 or gib.is_integer():
+        return f"{gib:.0f}"
+    return f"{gib:.1f}"
+
+
+def build_parser() -> argparse.ArgumentParser:
+    """Build the argument parser for the mm command."""
+    parser = argparse.ArgumentParser(
+        prog="mm",
+        description="Show GPU status for SSH machine aliases.",
+    )
+    parser.add_argument(
+        "-c",
+        "--config",
+        help="Path to list.yaml. Defaults to ./.config/mm/list.yaml, then XDG config.",
+    )
+    parser.add_argument(
+        "-t",
+        "--timeout",
+        type=float,
+        default=10.0,
+        help="SSH timeout per machine in seconds. Default: 10.",
+    )
+    parser.add_argument("--no-color", action="store_true", help="Disable terminal colors.")
+    parser.add_argument("--version", action="version", version=f"mm {__version__}")
+    return parser
+
+
+def run(argv: Sequence[str] | None = None) -> int:
+    """Run the CLI and return its exit status.
+
+    Returns:
+        0 when every machine reported GPUs, 1 when at least one machine
+        failed, and 2 for an invalid timeout or configuration.
+    """
+    parser = build_parser()
+    args = parser.parse_args(argv)
+    console = Console(no_color=args.no_color)
+
+    # float() accepts "nan" and "inf": NaN passes a plain "<= 0" check, and
+    # both make int(timeout) raise when the ssh command is built.
+    if not math.isfinite(args.timeout) or args.timeout <= 0:
+        console.print("[red]error:[/] --timeout must be a finite number greater than 0")
+        return 2
+
+    try:
+        config_path = resolve_config_path(args.config)
+        machines = load_machines(config_path)
+    except ConfigError as exc:
+        # Escape the message so "[...]" in a path or YAML error is printed
+        # literally instead of being parsed as Rich markup.
+        console.print(f"[red]error:[/] {escape(str(exc))}")
+        return 2
+
+    results = collect_status(machines, args.timeout)
+    render_dashboard(results, config_path, console)
+    return 1 if any(result.error for result in results) else 0
+
+
+def main() -> None:
+    """Console-script entry point: exit with the status returned by run()."""
+    raise SystemExit(run(sys.argv[1:]))
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..14bfdc1
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,20 @@
+[build-system]
+requires = ["setuptools>=68"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "mm"
+version = "0.1.0"
+description = "Terminal GPU status dashboard for SSH machine aliases."
+readme = "README.md"
+requires-python = ">=3.10"
+dependencies = [
+    "PyYAML>=6.0",
+    "rich>=13.7",
+]
+
+[project.scripts]
+mm = "mm.cli:main"
+
+[tool.setuptools.packages.find]
+include = ["mm*"]
diff --git a/tests/test_cli.py b/tests/test_cli.py
new file mode 100644
index 0000000..8fd9bc1
--- /dev/null
+++ b/tests/test_cli.py
@@ -0,0 +1,63 @@
+from pathlib import Path
+from tempfile import TemporaryDirectory
+import unittest
+
+from mm.cli import load_machines, parse_nvidia_smi_csv
+
+
+class ConfigTests(unittest.TestCase):
+    def test_loads_string_and_object_machine_entries(self) -> None:
+        with TemporaryDirectory() as tmpdir:
+            config_path = Path(tmpdir) / "list.yaml"
+            config_path.write_text(
+                """
+machines:
+  - dash0
+  - alias: dash1
+    label: training-1
+""",
+                encoding="utf-8",
+            )
+
+            machines = load_machines(config_path)
+
+        self.assertEqual([machine.alias for machine in machines], ["dash0", "dash1"])
+        self.assertEqual([machine.label for machine in machines], ["dash0", "training-1"])
+
+    def test_loads_mapping_machine_entries(self) -> None:
+        with TemporaryDirectory() as tmpdir:
+            config_path = Path(tmpdir) / "list.yaml"
+            config_path.write_text(
+                """
+machines:
+  dash0:
+  dash1:
+    label: training-1
+""",
+                encoding="utf-8",
+            )
+
+            machines = load_machines(config_path)
+
+        self.assertEqual([machine.alias for machine in machines], ["dash0", "dash1"])
+        self.assertEqual([machine.label for machine in machines], ["dash0", "training-1"])
+
+
+class NvidiaSmiParsingTests(unittest.TestCase):
+    def test_parse_csv_output(self) -> None:
+        output = (
+            "0, NVIDIA A100-SXM4-80GB, 12000, 81920, 84, 61, 294.3, 400.0\n"
+            "1, NVIDIA A100-SXM4-80GB, 0, 81920, 0, 32, 49.8, 400.0\n"
+        )
+
+        gpus = parse_nvidia_smi_csv(output)
+
+        self.assertEqual(len(gpus), 2)
+        self.assertEqual(gpus[0].index, "0")
+        self.assertEqual(gpus[0].memory_used_mib, 12000)
+        self.assertEqual(gpus[0].utilization_pct, 84)
+        self.assertEqual(gpus[1].power_draw_w, 49.8)
+
+
+if __name__ == "__main__":
+    unittest.main()