from __future__ import annotations from dataclasses import dataclass from datetime import datetime from typing import Any import httpx from .config import RepositoryConfig from .models import labels_from_gitea @dataclass(frozen=True) class GiteaIssue: number: int title: str body: str labels: list[str] state: str html_url: str @dataclass(frozen=True) class GiteaPullRequest: number: int html_url: str state: str merged: bool @dataclass(frozen=True) class GiteaComment: id: int body: str author: str html_url: str created_at: datetime | None updated_at: datetime | None @dataclass(frozen=True) class GiteaPullReview: id: int body: str author: str state: str html_url: str submitted_at: datetime | None updated_at: datetime | None @dataclass(frozen=True) class GiteaRepository: owner: str name: str full_name: str clone_url: str default_branch: str class GiteaClient: def __init__(self, base_url: str, token: str, *, transport: httpx.BaseTransport | None = None): self.base_url = base_url.rstrip("/") self.client = httpx.Client( base_url=f"{self.base_url}/api/v1", headers={ "Authorization": f"token {token}", "Accept": "application/json", }, timeout=30, transport=transport, ) def close(self) -> None: self.client.close() def get_repository(self, repo: RepositoryConfig) -> dict[str, Any]: response = self.client.get(f"/repos/{repo.owner}/{repo.name}") response.raise_for_status() return response.json() def get_authenticated_username(self) -> str: response = self.client.get("/user") response.raise_for_status() payload = response.json() username = payload.get("login") or payload.get("username") if not username: raise ValueError("Gitea /user response did not include login or username") return str(username) def list_owned_repositories(self) -> list[GiteaRepository]: username = self.get_authenticated_username() repositories: list[GiteaRepository] = [] page = 1 limit = 50 while True: response = self.client.get("/user/repos", params={"page": page, "limit": limit}) response.raise_for_status() payload = response.json() if not payload: break for item in payload: repo = repository_from_payload(item, self.base_url) if repo.owner.casefold() == username.casefold(): repositories.append(repo) if len(payload) < limit: break page += 1 return repositories def list_open_issues(self, owner: str, name: str) -> list[GiteaIssue]: issues: list[GiteaIssue] = [] page = 1 limit = 50 while True: response = self.client.get( f"/repos/{owner}/{name}/issues", params={"state": "open", "type": "issues", "page": page, "limit": limit}, ) response.raise_for_status() payload = response.json() if not payload: break for item in payload: if item.get("pull_request"): continue issues.append( GiteaIssue( number=int(item["number"]), title=item.get("title") or "", body=item.get("body") or "", labels=labels_from_gitea(item.get("labels")), state=item.get("state") or "open", html_url=item.get("html_url") or item.get("url") or "", ) ) if len(payload) < limit: break page += 1 return issues def create_pull_request( self, *, owner: str, name: str, title: str, body: str, head: str, base: str, ) -> GiteaPullRequest: response = self.client.post( f"/repos/{owner}/{name}/pulls", json={"title": title, "body": body, "head": head, "base": base}, ) response.raise_for_status() payload = response.json() return pull_request_from_payload(payload) def get_pull_request(self, *, owner: str, name: str, pr_number: int) -> GiteaPullRequest: response = self.client.get(f"/repos/{owner}/{name}/pulls/{pr_number}") response.raise_for_status() return pull_request_from_payload(response.json()) def list_issue_comments(self, *, owner: str, name: str, issue_number: int) -> list[GiteaComment]: comments: list[GiteaComment] = [] page = 1 limit = 50 while True: response = self.client.get( f"/repos/{owner}/{name}/issues/{issue_number}/comments", params={"page": page, "limit": limit}, ) response.raise_for_status() payload = response.json() if not payload: break comments.extend(comment_from_payload(item) for item in payload) if len(payload) < limit: break page += 1 return comments def post_issue_comment(self, *, owner: str, name: str, issue_number: int, body: str) -> GiteaComment: response = self.client.post( f"/repos/{owner}/{name}/issues/{issue_number}/comments", json={"body": body}, ) response.raise_for_status() return comment_from_payload(response.json()) def list_pull_request_reviews(self, *, owner: str, name: str, pr_number: int) -> list[GiteaPullReview]: reviews: list[GiteaPullReview] = [] page = 1 limit = 50 while True: response = self.client.get( f"/repos/{owner}/{name}/pulls/{pr_number}/reviews", params={"page": page, "limit": limit}, ) if response.status_code == 404: return reviews response.raise_for_status() payload = response.json() if not payload: break reviews.extend(pull_review_from_payload(item) for item in payload) if len(payload) < limit: break page += 1 return reviews def list_pull_request_review_comments( self, *, owner: str, name: str, pr_number: int, review_id: int, ) -> list[GiteaComment]: comments: list[GiteaComment] = [] page = 1 limit = 50 while True: response = self.client.get( f"/repos/{owner}/{name}/pulls/{pr_number}/reviews/{review_id}/comments", params={"page": page, "limit": limit}, ) if response.status_code == 404: return comments response.raise_for_status() payload = response.json() if not payload: break comments.extend(review_comment_from_payload(item) for item in payload) if len(payload) < limit: break page += 1 return comments def close_issue(self, *, owner: str, name: str, issue_number: int) -> None: response = self.client.patch( f"/repos/{owner}/{name}/issues/{issue_number}", json={"state": "closed"}, ) response.raise_for_status() def clone_url_from_repo_payload(payload: dict[str, Any], fallback_base_url: str, owner: str, name: str) -> str: return ( payload.get("ssh_url") or payload.get("clone_url") or payload.get("html_url") or f"{fallback_base_url.rstrip('/')}/{owner}/{name}.git" ) def repository_from_payload(payload: dict[str, Any], fallback_base_url: str) -> GiteaRepository: owner_payload = payload.get("owner") or {} owner = ( owner_payload.get("login") or owner_payload.get("username") or payload.get("owner_name") or str(payload.get("full_name", "")).split("/", 1)[0] ) name = payload.get("name") or str(payload.get("full_name", "")).split("/", 1)[-1] if not owner or not name: raise ValueError("Gitea repository response did not include owner/name") return GiteaRepository( owner=str(owner), name=str(name), full_name=payload.get("full_name") or f"{owner}/{name}", clone_url=clone_url_from_repo_payload(payload, fallback_base_url, str(owner), str(name)), default_branch=payload.get("default_branch") or "main", ) def pull_request_from_payload(payload: dict[str, Any]) -> GiteaPullRequest: merged = bool(payload.get("merged") or payload.get("has_merged") or payload.get("merged_at")) return GiteaPullRequest( number=int(payload["number"]), html_url=payload.get("html_url") or payload.get("url") or "", state=payload.get("state") or "", merged=merged, ) def pull_review_from_payload(payload: dict[str, Any]) -> GiteaPullReview: user_payload = payload.get("user") or {} author = user_payload.get("login") or user_payload.get("username") or "" return GiteaPullReview( id=int(payload["id"]), body=payload.get("body") or "", author=str(author), state=payload.get("state") or "", html_url=payload.get("html_url") or payload.get("url") or "", submitted_at=parse_gitea_dt(payload.get("submitted_at")), updated_at=parse_gitea_dt(payload.get("updated_at")), ) def comment_from_payload(payload: dict[str, Any]) -> GiteaComment: user_payload = payload.get("user") or payload.get("poster") or {} author = user_payload.get("login") or user_payload.get("username") or "" return GiteaComment( id=int(payload["id"]), body=payload.get("body") or "", author=str(author), html_url=payload.get("html_url") or payload.get("url") or "", created_at=parse_gitea_dt(payload.get("created_at")), updated_at=parse_gitea_dt(payload.get("updated_at")), ) def review_comment_from_payload(payload: dict[str, Any]) -> GiteaComment: comment = comment_from_payload(payload) path = payload.get("path") position = payload.get("position") or payload.get("original_position") location_parts = [] if path: location_parts.append(str(path)) if position: location_parts.append(f"line {position}") if not location_parts: return comment location = ":".join(location_parts) body = f"Inline comment on {location}\n\n{comment.body}" return GiteaComment( id=comment.id, body=body, author=comment.author, html_url=comment.html_url, created_at=comment.created_at, updated_at=comment.updated_at, ) def parse_gitea_dt(value: str | None) -> datetime | None: if not value: return None if value.endswith("Z"): value = value[:-1] + "+00:00" return datetime.fromisoformat(value)