def read_report(path: str | Path) -> str:
    """Return the UTF-8 text of the report file at *path*.

    Args:
        path: Location of the report file.

    Returns:
        The decoded file content, or ``""`` when the file does not exist.

    Raises:
        OSError: For failures other than a missing file (for example when
            *path* is a directory or is not readable).
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    try:
        return Path(path).read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # EAFP instead of exists()-then-read: the file can disappear between
        # the check and the read, and a missing report is an expected case.
        # NotADirectoryError covers a path whose parent component is a file,
        # which the previous exists() check also reported as "missing".
        return ""
# CLI command "run-once": performs a single TaskRunner.run_once() cycle and
# prints either the processed task's id and state or a notice that nothing
# was available. Documented with comments rather than a docstring because
# Typer would surface a docstring as the command's --help text.
@app.command("run-once")
def run_once(ctx: typer.Context) -> None:
    state: CliContext = ctx.obj
    gitea_client = state.gitea()
    try:
        runner = TaskRunner(db=state.db, config=state.config, gitea=gitea_client)
        outcome = runner.run_once()
    finally:
        # Always release the HTTP client, even when the run raised.
        gitea_client.close()
    if outcome is None:
        typer.echo("no task available")
        return
    typer.echo(f"task {outcome.id} -> {outcome.state.value}")
# CLI command "retry-task": asks the database to move the task back to its
# initial state and prints the resulting state. Documented with comments
# rather than a docstring because Typer would surface a docstring as the
# command's --help text.
@app.command("retry-task")
def retry_task(ctx: typer.Context, task_id: int) -> None:
    cli: CliContext = ctx.obj
    try:
        task = cli.db.retry_task(task_id)
    except ValueError as exc:
        # Database.transition raises ValueError for an unknown task id.
        # Report it as a usage error, as show_task does, instead of letting
        # a traceback escape. NOTE(review): validate_transition is assumed
        # to signal illegal transitions with ValueError as well - confirm.
        raise typer.BadParameter(str(exc)) from exc
    typer.echo(f"task {task.id} -> {task.state.value}")
def load_env_file(path: str | Path) -> None:
    """Populate ``os.environ`` from a simple ``KEY=VALUE`` file.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    An optional leading ``export `` is accepted. One pair of matching
    surrounding quotes (single or double) is removed from the value.
    Variables that are already set in the environment are never overwritten.

    Args:
        path: Location of the env file. Nothing happens when it is not an
            existing regular file.
    """
    env_path = Path(path)
    if not env_path.is_file():
        return
    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        # Accept shell-style "export KEY=value" lines.
        line = line.removeprefix("export ").lstrip()
        key, separator, value = line.partition("=")
        if not separator:
            continue
        key = key.strip()
        value = value.strip()
        # Strip exactly one pair of *matching* quotes. Chained str.strip()
        # calls would also eat unbalanced or nested quote characters that
        # are part of the value itself.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        if key:
            # The real environment wins over the file.
            os.environ.setdefault(key, value)
def migrate(self) -> None:
    """Run the module's ``SCHEMA`` script on this connection and commit.

    Every table in the script is declared with ``CREATE TABLE IF NOT
    EXISTS``, so calling this on an already-initialised database does not
    fail on existing tables.
    """
    connection = self.conn
    connection.executescript(SCHEMA)
    connection.commit()
def disable_repositories_except(self, full_names: set[str]) -> None:
    """Set ``enabled = 0`` on every repository not named in *full_names*.

    Args:
        full_names: ``owner/name`` identifiers to leave untouched. When the
            set is empty, every repository row is disabled.
    """
    stamp = dt_to_db(utcnow())
    statement = "UPDATE repositories SET enabled = 0, updated_at = ?"
    params: list[object] = [stamp]
    if full_names:
        keep = sorted(full_names)
        # One "?" per retained name; values are bound, never interpolated.
        marks = ",".join("?" * len(keep))
        statement = f"{statement} WHERE full_name NOT IN ({marks})"
        params.extend(keep)
    self.conn.execute(statement, params)
    self.conn.commit()
def active_task_for_issue(self, repo_id: int, issue_number: int) -> TaskRecord | None:
    """Return the newest task for the issue whose state is in ``ACTIVE_STATES``.

    Args:
        repo_id: Primary key of the repository row.
        issue_number: Issue number within that repository.

    Returns:
        The matching task with the highest id, or ``None`` if there is none.
    """
    # Materialise the values once so placeholders and bindings stay aligned.
    active_values = [state.value for state in ACTIVE_STATES]
    marks = ",".join("?" for _ in active_values)
    query = f"""
        SELECT * FROM tasks
        WHERE repo_id = ? AND issue_number = ? AND state IN ({marks})
        ORDER BY id DESC LIMIT 1
        """
    match = self.conn.execute(query, (repo_id, issue_number, *active_values)).fetchone()
    if match is None:
        return None
    return self._task(match)
def claim_next_task(self, worker_id: str, lease_seconds: int) -> TaskRecord | None:
    """Lease the oldest claimable task to *worker_id*.

    A task is claimable when it is ``DISCOVERED``, or when it is in one of
    the in-progress states and its lease has expired.

    Args:
        worker_id: Identifier stored as the lease owner.
        lease_seconds: Lease duration, counted from now.

    Returns:
        The task after it was moved to ``CLAIMED``, or ``None`` when no task
        is claimable.
    """
    now = utcnow()
    now_db = dt_to_db(now)
    expires_db = dt_to_db(now + timedelta(seconds=lease_seconds))
    leased_states = (
        TaskState.CLAIMED,
        TaskState.PLANNING,
        TaskState.IMPLEMENTING,
        TaskState.TESTING,
        TaskState.PR_OPENED,
        TaskState.REVIEWING,
        TaskState.DISCOVERED,
    )
    placeholders = ",".join("?" for _ in leased_states)
    while True:
        row = self.conn.execute(
            f"""
            SELECT * FROM tasks
            WHERE state = ?
               OR (state IN ({placeholders}) AND lease_expires_at IS NOT NULL AND lease_expires_at < ?)
            ORDER BY created_at, id
            LIMIT 1
            """,
            (
                TaskState.DISCOVERED.value,
                *[state.value for state in leased_states],
                now_db,
            ),
        ).fetchone()
        if row is None:
            return None
        # Optimistic guard: only take the row if it still looks exactly as
        # it did when selected. Without this, two workers that both selected
        # the same row would both "claim" it. "IS ?" is used for the lease
        # column so the comparison also matches NULL.
        cursor = self.conn.execute(
            """
            UPDATE tasks
            SET state = ?, lease_owner = ?, lease_expires_at = ?, updated_at = ?
            WHERE id = ? AND state = ? AND lease_expires_at IS ?
            """,
            (
                TaskState.CLAIMED.value,
                worker_id,
                expires_db,
                now_db,
                row["id"],
                row["state"],
                row["lease_expires_at"],
            ),
        )
        self.conn.commit()
        if cursor.rowcount == 1:
            break
        # Another worker won the race for this row; look for the next one.
    task = self._task(row)
    self.add_event(task.id, task.state, TaskState.CLAIMED, f"claimed by {worker_id}")
    claimed = self.get_task(task.id)
    assert claimed is not None
    return claimed
def list_events(self, task_id: int) -> list[sqlite3.Row]:
    """Return all ``task_events`` rows for *task_id*, ordered by event id.

    Args:
        task_id: Primary key of the task whose events are wanted.

    Returns:
        The rows as produced by the connection; empty when none exist.
    """
    query = "SELECT * FROM task_events WHERE task_id = ? ORDER BY id"
    cursor = self.conn.execute(query, (task_id,))
    return cursor.fetchall()
def _task(self, row: sqlite3.Row) -> TaskRecord:
    """Convert one ``tasks`` row into a ``TaskRecord``.

    Nullable columns are mapped to ``None``: an empty or NULL
    ``workspace_path`` becomes ``None`` instead of a ``Path``, and a NULL
    ``pr_number`` stays ``None`` instead of being passed to ``int``.
    """
    raw_workspace = row["workspace_path"]
    raw_pr = row["pr_number"]
    return TaskRecord(
        id=int(row["id"]),
        repo_id=int(row["repo_id"]),
        issue_number=int(row["issue_number"]),
        state=TaskState(row["state"]),
        lease_owner=row["lease_owner"],
        lease_expires_at=parse_dt(row["lease_expires_at"]),
        branch_name=row["branch_name"],
        workspace_path=Path(raw_workspace) if raw_workspace else None,
        pr_number=None if raw_pr is None else int(raw_pr),
        error_message=row["error_message"],
        created_at=parse_dt(row["created_at"]),  # type: ignore[arg-type]
        updated_at=parse_dt(row["updated_at"]),  # type: ignore[arg-type]
    )
def list_open_issues(self, owner: str, name: str) -> list[GiteaIssue]:
    """Return every open issue of ``owner/name``, following pagination.

    Args:
        owner: Repository owner.
        name: Repository name.

    Returns:
        All open issues that are not pull requests, in API order.

    Raises:
        httpx.HTTPStatusError: If any page request returns an error status.
    """
    issues: list[GiteaIssue] = []
    page = 1
    limit = 50
    # Same paging scheme as list_owned_repositories: stop on an empty or a
    # short page. A single request silently dropped everything past the
    # first 50 issues.
    while True:
        response = self.client.get(
            f"/repos/{owner}/{name}/issues",
            params={"state": "open", "type": "issues", "page": page, "limit": limit},
        )
        response.raise_for_status()
        payload = response.json()
        if not payload:
            break
        for item in payload:
            # Test the value, not just the key: a plain issue may carry
            # "pull_request": null, which a bare `in` check would wrongly
            # treat as a pull request.
            # NOTE(review): Gitea is understood to emit the null form - confirm.
            if item.get("pull_request") is not None:
                continue
            issues.append(
                GiteaIssue(
                    number=int(item["number"]),
                    title=item.get("title") or "",
                    body=item.get("body") or "",
                    labels=labels_from_gitea(item.get("labels")),
                    state=item.get("state") or "open",
                    html_url=item.get("html_url") or item.get("url") or "",
                )
            )
        if len(payload) < limit:
            break
        page += 1
    return issues
@dataclass(frozen=True)
class AgentResult:
    """Immutable outcome of one command invocation."""

    # Exit status of the process.
    exit_code: int
    # Captured standard output text.
    stdout: str
    # Captured standard error text.
    stderr: str

    @property
    def ok(self) -> bool:
        """Whether the exit status is zero."""
        return not self.exit_code
def labels_from_gitea(labels: list[dict[str, Any]] | list[str] | None) -> list[str]:
    """Flatten a label list into plain label names.

    Args:
        labels: Either strings, or mappings with a ``"name"`` entry, or
            ``None``.

    Returns:
        String entries unchanged and, for mapping entries, ``str(name)``
        whenever the name is truthy. Mappings without a usable name are
        skipped. ``None`` or an empty list yields ``[]``.
    """
    if not labels:
        return []
    names: list[str] = []
    for entry in labels:
        if not isinstance(entry, str):
            entry_name = entry.get("name")
            if not entry_name:
                continue
            names.append(str(entry_name))
            continue
        names.append(entry)
    return names
def render_reviewer_prompt(repo: RepositoryRecord, issue: IssueRecord, pr_number: int) -> str:
    """Build the Markdown prompt handed to the reviewing agent.

    Args:
        repo: Repository under review; only ``full_name`` is read.
        issue: Issue being addressed; ``issue_number`` and ``title`` are read.
        pr_number: Number of the pull request to review.

    Returns:
        The prompt text, ending with the list of sections the reviewer must
        write into ``AGENT_REVIEW_REPORT.md``.
    """
    report_sections = (
        "Verdict: APPROVE, REQUEST_CHANGES, or NEEDS_HUMAN_DECISION",
        "Summary",
        "Correctness",
        "Scope Control",
        "Test Evidence",
        "Risks",
        "Required Human Checks",
        "Suggested PR Comment",
    )
    preamble = [
        "# Agent Review Task",
        "",
        f"Repository: {repo.full_name}",
        f"Pull request: #{pr_number}",
        f"Issue: #{issue.issue_number} {issue.title}",
        "",
        "Review the implementation currently checked out in this workspace. Focus on correctness,",
        "scope control, test evidence, and human risks. Do not modify code.",
        "",
        "Write `AGENT_REVIEW_REPORT.md` in the workspace root using this exact section contract:",
        "",
    ]
    checklist = [f"- {section}" for section in report_sections]
    return "\n".join(preamble + checklist) + "\n"
# Finds the verdict token on a line that starts with "Verdict", optionally
# preceded by a list bullet, Markdown heading hashes or bold markers.
# The negative lookahead rejects a token followed by a comma, so an echoed
# option list ("Verdict: APPROVE, REQUEST_CHANGES, or ...") is not read as
# an approval.
_VERDICT_RE = re.compile(
    r"(?im)^\s*(?:[-*+]\s+)?(?:#+\s*)?\**Verdict[\s:*`]*([A-Z_]+)\b(?![ \t]*,)"
)


def parse_review_report(raw: str) -> ReviewReport:
    """Extract the verdict and suggested PR comment from a review report.

    Args:
        raw: Full text of the reviewer's report.

    Returns:
        A ``ReviewReport`` whose verdict is one of ``VALID_VERDICTS``. A
        missing or unrecognised verdict becomes ``"NEEDS_HUMAN_DECISION"``.
        The suggested comment falls back to the whole stripped report when
        the "Suggested PR Comment" section is absent or empty.
    """
    verdict_match = _VERDICT_RE.search(raw)
    # The pattern is case-insensitive, so normalise before validating;
    # otherwise "approve" would be captured and then rejected.
    verdict = verdict_match.group(1).upper() if verdict_match else "NEEDS_HUMAN_DECISION"
    if verdict not in VALID_VERDICTS:
        verdict = "NEEDS_HUMAN_DECISION"
    suggested = extract_section(raw, "Suggested PR Comment").strip()
    if not suggested:
        suggested = raw.strip()
    return ReviewReport(verdict=verdict, raw=raw, suggested_pr_comment=suggested)


def extract_section(raw: str, title: str) -> str:
    """Return the body of the ``## <title>`` section of a Markdown text.

    The heading match is case-insensitive. The body runs from the line after
    the heading up to the next ``## `` heading or the end of the text.

    Args:
        raw: Markdown text to search.
        title: Heading text, matched literally.

    Returns:
        The section body without the heading, or ``""`` if it is not found.
    """
    # The body group must be named: the result is read via group("body").
    pattern = re.compile(
        rf"(?ims)^\s*##\s+{re.escape(title)}\s*$\n(?P<body>.*?)(?=^\s*##\s+|\Z)"
    )
    match = pattern.search(raw)
    return match.group("body") if match else ""
BLOCKING_LABEL_ATTRS = ("running", "blocked", "skip", "high_risk")


def is_issue_eligible(issue: IssueRecord, labels: LabelsConfig) -> bool:
    """Decide whether an issue may be turned into a task.

    Args:
        issue: Issue to check; ``state`` and ``labels`` are read.
        labels: Configured label names.

    Returns:
        ``True`` only for an open issue that carries the ``ready`` label and
        none of the labels named by ``BLOCKING_LABEL_ATTRS``.
    """
    if issue.state != "open":
        return False
    present = frozenset(issue.labels)
    if labels.ready not in present:
        return False
    return not any(getattr(labels, attr) in present for attr in BLOCKING_LABEL_ATTRS)
def sync_repositories(db: Database, config: AppConfig, client: GiteaClient) -> list[RepositoryRecord]:
    """Mirror the repositories reported by Gitea into the local database.

    Every repository returned by ``client.list_owned_repositories()`` is
    upserted as enabled. Afterwards ``db.disable_repositories_except`` is
    called with the set of discovered full names.

    Args:
        db: Database receiving the upserts.
        config: Application configuration (not read by this function).
        client: Gitea client used for discovery.

    Returns:
        The records returned by the upserts, in discovery order.
    """
    remote_repos = client.list_owned_repositories()
    seen_names = {remote.full_name for remote in remote_repos}
    records = [
        db.upsert_repository(
            owner=remote.owner,
            name=remote.name,
            clone_url=remote.clone_url,
            default_branch=remote.default_branch,
            enabled=True,
        )
        for remote in remote_repos
    ]
    db.disable_repositories_except(seen_names)
    return records
def worker_loop(self) -> None:
    """Run the sync, scan, execute cycle forever.

    Each iteration refreshes the repository list, scans issues for new
    tasks and then tries to run one task. The loop sleeps for
    ``config.scheduler.interval_seconds`` only when no task was available.
    It has no exit condition of its own.
    """
    while True:
        sync_repositories(self.db, self.config, self.gitea)
        scan_issues(self.db, self.config, self.gitea)
        if self.run_once() is not None:
            # A task was processed: start the next cycle without sleeping.
            continue
        time.sleep(self.config.scheduler.interval_seconds)
def _run_implementer(
    self,
    task: TaskRecord,
    repo: RepositoryRecord,
    issue: IssueRecord,
    branch_name: str,
    workspace: Path,
) -> str:
    """Run the implementer agent in *workspace* and return its report.

    The rendered prompt is written to ``AGENT_IMPLEMENTER_PROMPT.md`` inside
    the workspace, the configured implementer command is rendered and
    executed there, and the run is stored via ``db.add_agent_run``.

    Args:
        task: Task the run is recorded against.
        repo: Repository passed to the prompt renderer.
        issue: Issue supplying the number and title placeholders.
        branch_name: Branch name passed to the prompt and the command.
        workspace: Directory the agent runs in.

    Returns:
        The content read from ``AGENT_IMPLEMENTATION_REPORT.md``, or a
        placeholder sentence when that content is empty.

    Raises:
        RuntimeError: If the command result is not ``ok``.
    """
    prompt_text = render_implementer_prompt(repo, issue, branch_name)
    prompt_file = workspace / "AGENT_IMPLEMENTER_PROMPT.md"
    write_prompt(prompt_file, prompt_text)

    template = self.config.agents.implementer.command
    placeholders = {
        "workspace_path": workspace,
        "prompt_path": prompt_file,
        "issue_number": issue.issue_number,
        "issue_title": issue.title,
        "branch_name": branch_name,
    }
    argv = render_command(template, **placeholders)

    outcome = self.command_runner.run(argv, workspace)
    report_text = read_report(workspace / "AGENT_IMPLEMENTATION_REPORT.md")
    # The run is recorded before the exit status is checked, so failed runs
    # are stored as well.
    self.db.add_agent_run(
        task_id=task.id,
        role="implementer",
        command=argv,
        prompt=prompt_text,
        stdout=outcome.stdout,
        stderr=outcome.stderr,
        exit_code=outcome.exit_code,
        report=report_text,
    )
    if outcome.ok:
        return report_text or "(implementer did not produce AGENT_IMPLEMENTATION_REPORT.md)"
    raise RuntimeError(f"implementer failed with exit code {outcome.exit_code}")
def _load_context(self, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]:
    """Fetch the repository and issue that *task* refers to.

    Args:
        task: Task whose ``repo_id`` and ``issue_number`` are looked up.

    Returns:
        A ``(repository, issue)`` pair.

    Raises:
        ValueError: If the repository row or the issue cannot be found.
    """
    db = self.db
    cursor = db.conn.execute("SELECT * FROM repositories WHERE id = ?", (task.repo_id,))
    row = cursor.fetchone()
    if row is None:
        raise ValueError(f"repository not found for task {task.id}")
    repository = db._repo(row)

    found_issue = db.get_issue(task.repo_id, task.issue_number)
    if found_issue is None:
        raise ValueError(f"issue not found for task {task.id}")
    return repository, found_issue
def safe_branch_name(issue: IssueRecord) -> str:
    """Build the agent branch name for *issue*.

    The title is lower-cased and every run of characters outside
    ``[a-zA-Z0-9]`` becomes a single hyphen. The slug is then cut to 48
    characters and stripped of leading and trailing hyphens. An empty slug
    falls back to ``"issue"``.

    Args:
        issue: Issue supplying ``title`` and ``issue_number``.

    Returns:
        A name of the form ``agent/issue-<number>-<slug>``.
    """
    hyphenated = re.sub(r"[^a-zA-Z0-9]+", "-", issue.title.lower())
    collapsed = re.sub(r"-{2,}", "-", hyphenated.strip("-"))
    # Cutting at 48 characters can leave a hyphen at the end, so strip again.
    slug = collapsed[:48].strip("-") or "issue"
    return f"agent/issue-{issue.issue_number}-{slug}"
def has_diff(self, workspace: str | Path, base_ref: str = "origin/HEAD") -> bool:
    """Report whether the workspace contains changes relative to *base_ref*.

    Uncommitted changes (any output from ``git status --porcelain``) count
    as a diff. Otherwise ``git diff --quiet <base_ref>...HEAD`` decides.

    Args:
        workspace: Path of the git working tree to inspect.
        base_ref: Ref that ``HEAD`` is compared against.

    Returns:
        ``True`` when there are uncommitted changes or ``HEAD`` differs from
        *base_ref*, ``False`` when neither is the case.

    Raises:
        RuntimeError: If either git command fails. A failure (for example an
            unknown *base_ref*) must not be reported as "no diff".
    """
    repo_dir = Path(workspace)
    status = self._git(["status", "--porcelain"], repo_dir)
    if status.stdout.strip():
        return True
    diff = self._git(["diff", "--quiet", f"{base_ref}...HEAD"], repo_dir, check=False)
    # ``git diff --quiet`` exits 0 for "no differences" and 1 for
    # "differences"; any other status is an error.
    if diff.returncode not in (0, 1):
        raise RuntimeError(f"git diff --quiet {base_ref}...HEAD failed: {diff.stderr.strip()}")
    return diff.returncode == 1