feat: implement Gitea issue-to-PR workflow
This commit is contained in:
161
src/agent_gitea/gitea.py
Normal file
161
src/agent_gitea/gitea.py
Normal file
@@ -0,0 +1,161 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from .config import RepositoryConfig
|
||||
from .models import labels_from_gitea
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GiteaIssue:
|
||||
number: int
|
||||
title: str
|
||||
body: str
|
||||
labels: list[str]
|
||||
state: str
|
||||
html_url: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GiteaPullRequest:
|
||||
number: int
|
||||
html_url: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GiteaRepository:
|
||||
owner: str
|
||||
name: str
|
||||
full_name: str
|
||||
clone_url: str
|
||||
default_branch: str
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, base_url: str, token: str, *, transport: httpx.BaseTransport | None = None):
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.client = httpx.Client(
|
||||
base_url=f"{self.base_url}/api/v1",
|
||||
headers={
|
||||
"Authorization": f"token {token}",
|
||||
"Accept": "application/json",
|
||||
},
|
||||
timeout=30,
|
||||
transport=transport,
|
||||
)
|
||||
|
||||
def close(self) -> None:
|
||||
self.client.close()
|
||||
|
||||
def get_repository(self, repo: RepositoryConfig) -> dict[str, Any]:
|
||||
response = self.client.get(f"/repos/{repo.owner}/{repo.name}")
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def get_authenticated_username(self) -> str:
|
||||
response = self.client.get("/user")
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
username = payload.get("login") or payload.get("username")
|
||||
if not username:
|
||||
raise ValueError("Gitea /user response did not include login or username")
|
||||
return str(username)
|
||||
|
||||
def list_owned_repositories(self) -> list[GiteaRepository]:
|
||||
username = self.get_authenticated_username()
|
||||
repositories: list[GiteaRepository] = []
|
||||
page = 1
|
||||
limit = 50
|
||||
while True:
|
||||
response = self.client.get("/user/repos", params={"page": page, "limit": limit})
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
if not payload:
|
||||
break
|
||||
for item in payload:
|
||||
repo = repository_from_payload(item, self.base_url)
|
||||
if repo.owner.casefold() == username.casefold():
|
||||
repositories.append(repo)
|
||||
if len(payload) < limit:
|
||||
break
|
||||
page += 1
|
||||
return repositories
|
||||
|
||||
def list_open_issues(self, owner: str, name: str) -> list[GiteaIssue]:
|
||||
response = self.client.get(
|
||||
f"/repos/{owner}/{name}/issues",
|
||||
params={"state": "open", "type": "issues", "limit": 50},
|
||||
)
|
||||
response.raise_for_status()
|
||||
issues: list[GiteaIssue] = []
|
||||
for item in response.json():
|
||||
if "pull_request" in item:
|
||||
continue
|
||||
issues.append(
|
||||
GiteaIssue(
|
||||
number=int(item["number"]),
|
||||
title=item.get("title") or "",
|
||||
body=item.get("body") or "",
|
||||
labels=labels_from_gitea(item.get("labels")),
|
||||
state=item.get("state") or "open",
|
||||
html_url=item.get("html_url") or item.get("url") or "",
|
||||
)
|
||||
)
|
||||
return issues
|
||||
|
||||
def create_pull_request(
|
||||
self,
|
||||
*,
|
||||
owner: str,
|
||||
name: str,
|
||||
title: str,
|
||||
body: str,
|
||||
head: str,
|
||||
base: str,
|
||||
) -> GiteaPullRequest:
|
||||
response = self.client.post(
|
||||
f"/repos/{owner}/{name}/pulls",
|
||||
json={"title": title, "body": body, "head": head, "base": base},
|
||||
)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
return GiteaPullRequest(number=int(payload["number"]), html_url=payload.get("html_url") or "")
|
||||
|
||||
def post_issue_comment(self, *, owner: str, name: str, issue_number: int, body: str) -> None:
|
||||
response = self.client.post(
|
||||
f"/repos/{owner}/{name}/issues/{issue_number}/comments",
|
||||
json={"body": body},
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
|
||||
def clone_url_from_repo_payload(payload: dict[str, Any], fallback_base_url: str, owner: str, name: str) -> str:
|
||||
return (
|
||||
payload.get("ssh_url")
|
||||
or payload.get("clone_url")
|
||||
or payload.get("html_url")
|
||||
or f"{fallback_base_url.rstrip('/')}/{owner}/{name}.git"
|
||||
)
|
||||
|
||||
|
||||
def repository_from_payload(payload: dict[str, Any], fallback_base_url: str) -> GiteaRepository:
|
||||
owner_payload = payload.get("owner") or {}
|
||||
owner = (
|
||||
owner_payload.get("login")
|
||||
or owner_payload.get("username")
|
||||
or payload.get("owner_name")
|
||||
or str(payload.get("full_name", "")).split("/", 1)[0]
|
||||
)
|
||||
name = payload.get("name") or str(payload.get("full_name", "")).split("/", 1)[-1]
|
||||
if not owner or not name:
|
||||
raise ValueError("Gitea repository response did not include owner/name")
|
||||
return GiteaRepository(
|
||||
owner=str(owner),
|
||||
name=str(name),
|
||||
full_name=payload.get("full_name") or f"{owner}/{name}",
|
||||
clone_url=clone_url_from_repo_payload(payload, fallback_base_url, str(owner), str(name)),
|
||||
default_branch=payload.get("default_branch") or "main",
|
||||
)
|
||||
Reference in New Issue
Block a user