from __future__ import annotations import json from pathlib import Path import httpx from agent_gitea.config import AppConfig from agent_gitea.gitea import GiteaClient from agent_gitea.models import AgentResult, TaskState from agent_gitea.service import ( TaskRunner, close_issues_for_merged_pull_requests, scan_issues, scan_pull_request_feedback, sync_repositories, ) def make_config(tmp_path: Path, **overrides: object) -> AppConfig: data = { "gitea": {"base_url": "https://gitea.test", "token_env": "GITEA_TOKEN"}, "database_path": tmp_path / "state.sqlite3", "scheduler": {"interval_seconds": 1, "concurrency": 1, "lease_seconds": 60}, "workspace": {"root": tmp_path / "workspaces", "cleanup_on_success": False}, "agents": { "implementer": {"command": ["implementer", "--cd", "{workspace_path}", "-"]}, "reviewer": {"command": ["reviewer", "--cd", "{workspace_path}", "-"]}, }, } data.update(overrides) return AppConfig.model_validate(data) def make_client(handler): return GiteaClient("https://gitea.test", "token", transport=httpx.MockTransport(handler)) def test_sync_and_scan_with_mocked_gitea(db, tmp_path): config = make_config(tmp_path) def handler(request: httpx.Request) -> httpx.Response: if request.url.path == "/api/v1/user": return httpx.Response(200, json={"login": "acme"}) if request.url.path == "/api/v1/user/repos": return httpx.Response( 200, json=[ { "owner": {"login": "acme"}, "name": "service", "full_name": "acme/service", "clone_url": "https://gitea.test/acme/service.git", "default_branch": "main", }, { "owner": {"login": "other"}, "name": "shared", "full_name": "other/shared", "clone_url": "https://gitea.test/other/shared.git", "default_branch": "main", }, ], ) if request.url.path == "/api/v1/repos/acme/service/issues": return httpx.Response( 200, json=[ { "number": 1, "title": "Ready issue", "body": "Body", "state": "open", "html_url": "https://gitea.test/acme/service/issues/1", "labels": [{"name": "agent:ready"}], }, { "number": 2, "title": "High risk", "body": "Body", "state": "open", "labels": [{"name": "agent:ready"}, {"name": "risk:high"}], }, ], ) return httpx.Response(404) client = make_client(handler) repos = sync_repositories(db, config, client) task_ids = scan_issues(db, config, client) assert repos[0].full_name == "acme/service" assert db.get_repository("other/shared") is None assert len(task_ids) == 1 def test_list_open_issues_keeps_normal_issues_with_null_pull_request(): def handler(request: httpx.Request) -> httpx.Response: assert request.url.path == "/api/v1/repos/acme/service/issues" return httpx.Response( 200, json=[ { "number": 1, "title": "Normal issue", "body": "Body", "state": "open", "labels": [{"name": "agent:ready"}], "pull_request": None, }, { "number": 2, "title": "PR issue", "body": "Body", "state": "open", "labels": [{"name": "agent:ready"}], "pull_request": {"url": "https://gitea.test/pr/2"}, }, ], ) issues = make_client(handler).list_open_issues("acme", "service") assert [issue.number for issue in issues] == [1] def test_list_open_issues_reads_all_pages(): seen_pages: list[int] = [] def handler(request: httpx.Request) -> httpx.Response: assert request.url.path == "/api/v1/repos/acme/service/issues" page = int(request.url.params["page"]) limit = int(request.url.params["limit"]) seen_pages.append(page) if page == 1: return httpx.Response( 200, json=[ { "number": number, "title": f"Issue {number}", "body": "", "state": "open", "labels": [], } for number in range(1, limit + 1) ], ) if page == 2: return httpx.Response( 200, json=[ { "number": 51, "title": "Issue 51", "body": "", "state": "open", "labels": [], } ], ) return httpx.Response(500) issues = make_client(handler).list_open_issues("acme", "service") assert [issue.number for issue in issues] == list(range(1, 52)) assert seen_pages == [1, 2] class FakeWorkspaceManager: def __init__(self, root: Path, *, diff: bool = True): self.root = root self.diff = diff self.pushed: list[str] = [] self.resumed: list[tuple[str, Path | None]] = [] self.cleaned: list[Path] = [] def prepare(self, repo, issue, branch_name): path = self.root / branch_name.replace("/", "_") path.mkdir(parents=True, exist_ok=True) return path def prepare_existing_branch(self, repo, issue, branch_name, workspace_path): self.resumed.append((branch_name, workspace_path)) path = Path(workspace_path) if workspace_path else self.root / branch_name.replace("/", "_") path.mkdir(parents=True, exist_ok=True) return path def has_diff(self, workspace, base_ref="origin/HEAD"): return self.diff def has_uncommitted_changes(self, workspace): return self.diff def push_branch(self, workspace, branch_name): self.pushed.append(branch_name) def commit_changes(self, workspace, message): self.commit_message = message return "abc1234" def cleanup(self, workspace): self.cleaned.append(Path(workspace)) class FakeRunner: def __init__(self, *, fail_role: str | None = None): self.fail_role = fail_role def run(self, command, cwd, *, stdin=None): role = command[0] assert stdin if role == self.fail_role: return AgentResult(exit_code=1, stdout="", stderr="failed") if role == "implementer": output_dir = Path(cwd) / ".agent-output" output_dir.mkdir(exist_ok=True) (output_dir / "AGENT_IMPLEMENTATION_REPORT.md").write_text( "## Summary\nImplemented\n\n## Test commands run\npytest\n", encoding="utf-8", ) if role == "reviewer": output_dir = Path(cwd) / ".agent-output" output_dir.mkdir(exist_ok=True) (output_dir / "AGENT_REVIEW_REPORT.md").write_text( "## Verdict\nAPPROVE\n\n## Suggested PR Comment\nLooks good.\n", encoding="utf-8", ) return AgentResult(exit_code=0, stdout="ok", stderr="") def seed_task(db): repo = db.upsert_repository( owner="acme", name="service", clone_url="https://gitea.test/acme/service.git", default_branch="main", enabled=True, ) db.upsert_issue( repo_id=repo.id, issue_number=1, title="Ready issue", body="Body", labels=["agent:ready"], state="open", html_url="https://gitea.test/acme/service/issues/1", ) return db.create_task(repo.id, 1) def transition_to_human_review_ready(db, task_id: int, *, pr_number: int = 5, branch_name: str | None = None): db.transition(task_id, TaskState.CLAIMED) db.transition(task_id, TaskState.PLANNING) db.transition(task_id, TaskState.IMPLEMENTING, branch_name=branch_name) db.transition(task_id, TaskState.TESTING) db.transition(task_id, TaskState.PR_OPENED, pr_number=pr_number) db.transition(task_id, TaskState.REVIEWING) return db.transition(task_id, TaskState.HUMAN_REVIEW_READY, clear_lease=True) def test_run_task_success_posts_review_comments(db, tmp_path): config = make_config(tmp_path) seed_task(db) requests: list[tuple[str, str, dict]] = [] def handler(request: httpx.Request) -> httpx.Response: payload = json.loads(request.content.decode() or "{}") requests.append((request.method, request.url.path, payload)) if request.url.path == "/api/v1/repos/acme/service/pulls": return httpx.Response(201, json={"number": 5, "html_url": "pr-url"}) if request.url.path == "/api/v1/repos/acme/service/issues/5/comments": return httpx.Response(201, json={"id": 1}) return httpx.Response(404) workspace_manager = FakeWorkspaceManager(tmp_path / "work") task = TaskRunner( db=db, config=config, gitea=make_client(handler), workspace_manager=workspace_manager, command_runner=FakeRunner(), worker_id="worker", ).run_once() assert task is not None assert task.state == TaskState.HUMAN_REVIEW_READY assert task.pr_number == 5 assert workspace_manager.pushed == ["agent/issue-1-ready-issue"] assert workspace_manager.commit_message == "agent: implement issue #1 - Ready issue" pull_requests = [payload for _, path, payload in requests if path == "/api/v1/repos/acme/service/pulls"] assert pull_requests[0]["title"] == "代理实现:Ready issue" assert "代理实现报告" in pull_requests[0]["body"] assert "Closes #1" in pull_requests[0]["body"] command = json.loads(db.list_agent_runs(task.id)[0]["command_json"]) assert command[1] == "--cd" assert Path(command[2]).is_absolute() assert [path for _, path, _ in requests].count("/api/v1/repos/acme/service/issues/5/comments") == 2 def test_scan_pull_request_feedback_queues_task_for_new_human_comment(db): task = seed_task(db) transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue") db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2) def handler(request: httpx.Request) -> httpx.Response: if request.url.path == "/api/v1/user": return httpx.Response(200, json={"login": "agent-bot"}) if request.url.path == "/api/v1/repos/acme/service/pulls/5": return httpx.Response(200, json={"number": 5, "state": "open", "merged": False}) if request.url.path == "/api/v1/repos/acme/service/issues/5/comments": return httpx.Response( 200, json=[ {"id": 1, "body": "review report", "user": {"login": "agent-bot"}}, {"id": 2, "body": "summary", "user": {"login": "agent-bot"}}, {"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}}, {"id": 4, "body": "internal note", "user": {"login": "agent-bot"}}, ], ) return httpx.Response(404) queued = scan_pull_request_feedback(db, make_client(handler)) task_after_scan = db.get_task(task.id) assert queued == 1 assert task_after_scan is not None assert task_after_scan.state == TaskState.DISCOVERED assert db.has_pending_pr_feedback(task.id) assert db.get_pr_feedback_cursor(task.id) == 2 def test_scan_pull_request_feedback_advances_cursor_without_human_comments(db): task = seed_task(db) transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue") db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2) def handler(request: httpx.Request) -> httpx.Response: if request.url.path == "/api/v1/user": return httpx.Response(200, json={"login": "agent-bot"}) if request.url.path == "/api/v1/repos/acme/service/pulls/5": return httpx.Response(200, json={"number": 5, "state": "open", "merged": False}) if request.url.path == "/api/v1/repos/acme/service/issues/5/comments": return httpx.Response(200, json=[{"id": 3, "body": "summary", "user": {"login": "agent-bot"}}]) return httpx.Response(404) queued = scan_pull_request_feedback(db, make_client(handler)) assert queued == 0 assert not db.has_pending_pr_feedback(task.id) assert db.get_pr_feedback_cursor(task.id) == 3 assert db.get_task(task.id).state == TaskState.HUMAN_REVIEW_READY # type: ignore[union-attr] def test_scan_pull_request_feedback_queues_task_for_inline_review_comment(db): task = seed_task(db) transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue") db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2) def handler(request: httpx.Request) -> httpx.Response: if request.url.path == "/api/v1/user": return httpx.Response(200, json={"login": "agent-bot"}) if request.url.path == "/api/v1/repos/acme/service/pulls/5": return httpx.Response(200, json={"number": 5, "state": "open", "merged": False}) if request.url.path == "/api/v1/repos/acme/service/issues/5/comments": return httpx.Response(200, json=[]) if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews": return httpx.Response( 200, json=[ { "id": 9, "body": "", "state": "REQUEST_CHANGES", "user": {"login": "alice"}, } ], ) if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews/9/comments": return httpx.Response( 200, json=[ { "id": 31, "body": "This branch misses the cleanup case.", "path": "src/service.py", "position": 42, "user": {"login": "alice"}, } ], ) return httpx.Response(404) queued = scan_pull_request_feedback(db, make_client(handler)) cursors = db.get_pr_feedback_cursors(task.id) assert queued == 1 assert db.has_pending_pr_feedback(task.id) assert cursors.last_seen_comment_id == 2 assert cursors.last_seen_review_id == 0 assert cursors.last_seen_review_comment_id == 0 def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path): config = make_config(tmp_path) task = seed_task(db) workspace_path = tmp_path / "work" / "existing" transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue") db.transition(task.id, TaskState.DISCOVERED, clear_lease=True) db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2) db.mark_pr_feedback_pending(task.id) db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id)) db.conn.commit() requests: list[tuple[str, str, dict]] = [] next_comment_id = 4 def handler(request: httpx.Request) -> httpx.Response: nonlocal next_comment_id payload = json.loads(request.content.decode() or "{}") requests.append((request.method, request.url.path, payload)) if request.url.path == "/api/v1/user": return httpx.Response(200, json={"login": "agent-bot"}) if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET": return httpx.Response( 200, json=[ {"id": 1, "body": "review report", "user": {"login": "agent-bot"}}, {"id": 2, "body": "summary", "user": {"login": "agent-bot"}}, {"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}}, ], ) if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST": next_comment_id += 1 return httpx.Response(201, json={"id": next_comment_id, "body": payload["body"], "user": {"login": "agent-bot"}}) return httpx.Response(404) workspace_manager = FakeWorkspaceManager(tmp_path / "work") finished = TaskRunner( db=db, config=config, gitea=make_client(handler), workspace_manager=workspace_manager, command_runner=FakeRunner(), worker_id="worker", ).run_once() assert finished is not None assert finished.state == TaskState.HUMAN_REVIEW_READY assert finished.pr_number == 5 assert workspace_manager.resumed == [("agent/issue-1-ready-issue", workspace_path)] assert workspace_manager.pushed == ["agent/issue-1-ready-issue"] assert workspace_manager.commit_message == "agent: address PR #5 feedback for issue #1" assert not any(path == "/api/v1/repos/acme/service/pulls" for _, path, _ in requests) assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback", "reviewer"] assert db.get_pr_feedback_cursor(task.id) == 3 assert not db.has_pending_pr_feedback(task.id) def rescan_handler(request: httpx.Request) -> httpx.Response: if request.url.path == "/api/v1/user": return httpx.Response(200, json={"login": "agent-bot"}) if request.url.path == "/api/v1/repos/acme/service/pulls/5": return httpx.Response(200, json={"number": 5, "state": "open", "merged": False}) if request.url.path == "/api/v1/repos/acme/service/issues/5/comments": return httpx.Response( 200, json=[ {"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}}, {"id": 4, "body": "Also cover the cleanup case.", "user": {"login": "alice"}}, {"id": 5, "body": "processed", "user": {"login": "agent-bot"}}, {"id": 6, "body": "review report", "user": {"login": "agent-bot"}}, {"id": 7, "body": "summary", "user": {"login": "agent-bot"}}, ], ) return httpx.Response(404) queued = scan_pull_request_feedback(db, make_client(rescan_handler)) assert queued == 1 assert db.has_pending_pr_feedback(task.id) assert db.get_pr_feedback_cursor(task.id) == 3 def test_run_task_with_pending_pr_feedback_allows_no_code_changes(db, tmp_path): config = make_config(tmp_path) task = seed_task(db) workspace_path = tmp_path / "work" / "existing" transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue") db.transition(task.id, TaskState.DISCOVERED, clear_lease=True) db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2) db.mark_pr_feedback_pending(task.id) db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id)) db.conn.commit() requests: list[tuple[str, str, dict]] = [] def handler(request: httpx.Request) -> httpx.Response: payload = json.loads(request.content.decode() or "{}") requests.append((request.method, request.url.path, payload)) if request.url.path == "/api/v1/user": return httpx.Response(200, json={"login": "agent-bot"}) if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET": return httpx.Response( 200, json=[ {"id": 1, "body": "review report", "user": {"login": "agent-bot"}}, {"id": 2, "body": "summary", "user": {"login": "agent-bot"}}, {"id": 3, "body": "Can you confirm why this is enough?", "user": {"login": "alice"}}, ], ) if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST": return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}}) return httpx.Response(404) workspace_manager = FakeWorkspaceManager(tmp_path / "work", diff=False) finished = TaskRunner( db=db, config=config, gitea=make_client(handler), workspace_manager=workspace_manager, command_runner=FakeRunner(), worker_id="worker", ).run_once() assert finished is not None assert finished.state == TaskState.HUMAN_REVIEW_READY assert workspace_manager.pushed == [] assert not hasattr(workspace_manager, "commit_message") assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback"] assert db.get_pr_feedback_cursor(task.id) == 3 assert not db.has_pending_pr_feedback(task.id) def test_success_cleanup_does_not_block_later_pr_feedback(db, tmp_path): config = make_config( tmp_path, workspace={"root": tmp_path / "workspaces", "cleanup_on_success": True}, ) task = seed_task(db) workspace_path = tmp_path / "work" / "existing" transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue") db.transition(task.id, TaskState.DISCOVERED, clear_lease=True) db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2) db.mark_pr_feedback_pending(task.id) db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id)) db.conn.commit() def handler(request: httpx.Request) -> httpx.Response: payload = json.loads(request.content.decode() or "{}") if request.url.path == "/api/v1/user": return httpx.Response(200, json={"login": "agent-bot"}) if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET": return httpx.Response( 200, json=[ {"id": 3, "body": "Please address the review.", "user": {"login": "alice"}}, ], ) if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST": return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}}) return httpx.Response(404) workspace_manager = FakeWorkspaceManager(tmp_path / "work") finished = TaskRunner( db=db, config=config, gitea=make_client(handler), workspace_manager=workspace_manager, command_runner=FakeRunner(), worker_id="worker", ).run_once() assert finished is not None assert finished.state == TaskState.HUMAN_REVIEW_READY assert workspace_manager.resumed == [("agent/issue-1-ready-issue", workspace_path)] assert workspace_manager.cleaned == [workspace_path] def test_close_issues_for_merged_pull_requests_closes_linked_issue(db): repo = db.upsert_repository( owner="acme", name="service", clone_url="https://gitea.test/acme/service.git", default_branch="main", enabled=True, ) db.upsert_issue( repo_id=repo.id, issue_number=1, title="Ready issue", body="Body", labels=["agent:ready"], state="open", html_url="https://gitea.test/acme/service/issues/1", ) task = db.create_task(repo.id, 1) db.transition(task.id, TaskState.CLAIMED) db.transition(task.id, TaskState.PLANNING) db.transition(task.id, TaskState.IMPLEMENTING) db.transition(task.id, TaskState.TESTING) db.transition(task.id, TaskState.PR_OPENED, pr_number=5) db.transition(task.id, TaskState.REVIEWING) db.transition(task.id, TaskState.HUMAN_REVIEW_READY, clear_lease=True) requests: list[tuple[str, str, dict]] = [] def handler(request: httpx.Request) -> httpx.Response: payload = json.loads(request.content.decode() or "{}") requests.append((request.method, request.url.path, payload)) if request.url.path == "/api/v1/repos/acme/service/pulls/5": return httpx.Response(200, json={"number": 5, "state": "closed", "merged": True}) if request.url.path == "/api/v1/repos/acme/service/issues/1/comments": return httpx.Response(201, json={"id": 1}) if request.url.path == "/api/v1/repos/acme/service/issues/1": return httpx.Response(200, json={"number": 1, "state": "closed"}) return httpx.Response(404) closed = close_issues_for_merged_pull_requests(db, make_client(handler)) assert closed == 1 assert db.get_issue(repo.id, 1).state == "closed" # type: ignore[union-attr] assert ("PATCH", "/api/v1/repos/acme/service/issues/1", {"state": "closed"}) in requests def test_close_issues_for_merged_pull_requests_skips_unmerged_pr(db): repo = db.upsert_repository( owner="acme", name="service", clone_url="https://gitea.test/acme/service.git", default_branch="main", enabled=True, ) db.upsert_issue( repo_id=repo.id, issue_number=1, title="Ready issue", body="Body", labels=["agent:ready"], state="open", html_url="https://gitea.test/acme/service/issues/1", ) task = db.create_task(repo.id, 1) db.transition(task.id, TaskState.CLAIMED) db.transition(task.id, TaskState.PLANNING) db.transition(task.id, TaskState.IMPLEMENTING) db.transition(task.id, TaskState.TESTING) db.transition(task.id, TaskState.PR_OPENED, pr_number=5) db.transition(task.id, TaskState.REVIEWING) db.transition(task.id, TaskState.HUMAN_REVIEW_READY, clear_lease=True) def handler(request: httpx.Request) -> httpx.Response: assert request.url.path == "/api/v1/repos/acme/service/pulls/5" return httpx.Response(200, json={"number": 5, "state": "open", "merged": False}) closed = close_issues_for_merged_pull_requests(db, make_client(handler)) assert closed == 0 assert db.get_issue(repo.id, 1).state == "open" # type: ignore[union-attr] def test_run_task_no_diff_becomes_blocked(db, tmp_path): config = make_config(tmp_path) seed_task(db) task = TaskRunner( db=db, config=config, gitea=make_client(lambda request: httpx.Response(500)), workspace_manager=FakeWorkspaceManager(tmp_path / "work", diff=False), command_runner=FakeRunner(), worker_id="worker", ).run_once() assert task is not None assert task.state == TaskState.BLOCKED assert task.error_message == "implementer produced no diff" def test_run_task_agent_failure_becomes_failed(db, tmp_path): config = make_config(tmp_path) seed_task(db) task = TaskRunner( db=db, config=config, gitea=make_client(lambda request: httpx.Response(500)), workspace_manager=FakeWorkspaceManager(tmp_path / "work"), command_runner=FakeRunner(fail_role="implementer"), worker_id="worker", ).run_once() assert task is not None assert task.state == TaskState.FAILED assert "implementer failed" in (task.error_message or "")