Files
agent-manager/tests/test_gitea_service.py

522 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
from pathlib import Path
import httpx
from agent_gitea.config import AppConfig
from agent_gitea.gitea import GiteaClient
from agent_gitea.models import AgentResult, TaskState
from agent_gitea.service import (
TaskRunner,
close_issues_for_merged_pull_requests,
scan_issues,
scan_pull_request_feedback,
sync_repositories,
)
def make_config(tmp_path: Path, **overrides: object) -> AppConfig:
data = {
"gitea": {"base_url": "https://gitea.test", "token_env": "GITEA_TOKEN"},
"database_path": tmp_path / "state.sqlite3",
"scheduler": {"interval_seconds": 1, "concurrency": 1, "lease_seconds": 60},
"workspace": {"root": tmp_path / "workspaces", "cleanup_on_success": False},
"agents": {
"implementer": {"command": ["implementer", "--cd", "{workspace_path}", "-"]},
"reviewer": {"command": ["reviewer", "--cd", "{workspace_path}", "-"]},
},
}
data.update(overrides)
return AppConfig.model_validate(data)
def make_client(handler):
return GiteaClient("https://gitea.test", "token", transport=httpx.MockTransport(handler))
def test_sync_and_scan_with_mocked_gitea(db, tmp_path):
config = make_config(tmp_path)
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "acme"})
if request.url.path == "/api/v1/user/repos":
return httpx.Response(
200,
json=[
{
"owner": {"login": "acme"},
"name": "service",
"full_name": "acme/service",
"clone_url": "https://gitea.test/acme/service.git",
"default_branch": "main",
},
{
"owner": {"login": "other"},
"name": "shared",
"full_name": "other/shared",
"clone_url": "https://gitea.test/other/shared.git",
"default_branch": "main",
},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues":
return httpx.Response(
200,
json=[
{
"number": 1,
"title": "Ready issue",
"body": "Body",
"state": "open",
"html_url": "https://gitea.test/acme/service/issues/1",
"labels": [{"name": "agent:ready"}],
},
{
"number": 2,
"title": "High risk",
"body": "Body",
"state": "open",
"labels": [{"name": "agent:ready"}, {"name": "risk:high"}],
},
],
)
return httpx.Response(404)
client = make_client(handler)
repos = sync_repositories(db, config, client)
task_ids = scan_issues(db, config, client)
assert repos[0].full_name == "acme/service"
assert db.get_repository("other/shared") is None
assert len(task_ids) == 1
def test_list_open_issues_keeps_normal_issues_with_null_pull_request():
def handler(request: httpx.Request) -> httpx.Response:
assert request.url.path == "/api/v1/repos/acme/service/issues"
return httpx.Response(
200,
json=[
{
"number": 1,
"title": "Normal issue",
"body": "Body",
"state": "open",
"labels": [{"name": "agent:ready"}],
"pull_request": None,
},
{
"number": 2,
"title": "PR issue",
"body": "Body",
"state": "open",
"labels": [{"name": "agent:ready"}],
"pull_request": {"url": "https://gitea.test/pr/2"},
},
],
)
issues = make_client(handler).list_open_issues("acme", "service")
assert [issue.number for issue in issues] == [1]
def test_list_open_issues_reads_all_pages():
seen_pages: list[int] = []
def handler(request: httpx.Request) -> httpx.Response:
assert request.url.path == "/api/v1/repos/acme/service/issues"
page = int(request.url.params["page"])
limit = int(request.url.params["limit"])
seen_pages.append(page)
if page == 1:
return httpx.Response(
200,
json=[
{
"number": number,
"title": f"Issue {number}",
"body": "",
"state": "open",
"labels": [],
}
for number in range(1, limit + 1)
],
)
if page == 2:
return httpx.Response(
200,
json=[
{
"number": 51,
"title": "Issue 51",
"body": "",
"state": "open",
"labels": [],
}
],
)
return httpx.Response(500)
issues = make_client(handler).list_open_issues("acme", "service")
assert [issue.number for issue in issues] == list(range(1, 52))
assert seen_pages == [1, 2]
class FakeWorkspaceManager:
def __init__(self, root: Path, *, diff: bool = True):
self.root = root
self.diff = diff
self.pushed: list[str] = []
self.resumed: list[tuple[str, Path | None]] = []
def prepare(self, repo, issue, branch_name):
path = self.root / branch_name.replace("/", "_")
path.mkdir(parents=True, exist_ok=True)
return path
def prepare_existing_branch(self, repo, issue, branch_name, workspace_path):
self.resumed.append((branch_name, workspace_path))
path = Path(workspace_path) if workspace_path else self.root / branch_name.replace("/", "_")
path.mkdir(parents=True, exist_ok=True)
return path
def has_diff(self, workspace, base_ref="origin/HEAD"):
return self.diff
def push_branch(self, workspace, branch_name):
self.pushed.append(branch_name)
def commit_changes(self, workspace, message):
self.commit_message = message
return "abc1234"
def cleanup(self, workspace):
pass
class FakeRunner:
def __init__(self, *, fail_role: str | None = None):
self.fail_role = fail_role
def run(self, command, cwd, *, stdin=None):
role = command[0]
assert stdin
if role == self.fail_role:
return AgentResult(exit_code=1, stdout="", stderr="failed")
if role == "implementer":
output_dir = Path(cwd) / ".agent-output"
output_dir.mkdir(exist_ok=True)
(output_dir / "AGENT_IMPLEMENTATION_REPORT.md").write_text(
"## Summary\nImplemented\n\n## Test commands run\npytest\n",
encoding="utf-8",
)
if role == "reviewer":
output_dir = Path(cwd) / ".agent-output"
output_dir.mkdir(exist_ok=True)
(output_dir / "AGENT_REVIEW_REPORT.md").write_text(
"## Verdict\nAPPROVE\n\n## Suggested PR Comment\nLooks good.\n",
encoding="utf-8",
)
return AgentResult(exit_code=0, stdout="ok", stderr="")
def seed_task(db):
repo = db.upsert_repository(
owner="acme",
name="service",
clone_url="https://gitea.test/acme/service.git",
default_branch="main",
enabled=True,
)
db.upsert_issue(
repo_id=repo.id,
issue_number=1,
title="Ready issue",
body="Body",
labels=["agent:ready"],
state="open",
html_url="https://gitea.test/acme/service/issues/1",
)
return db.create_task(repo.id, 1)
def transition_to_human_review_ready(db, task_id: int, *, pr_number: int = 5, branch_name: str | None = None):
db.transition(task_id, TaskState.CLAIMED)
db.transition(task_id, TaskState.PLANNING)
db.transition(task_id, TaskState.IMPLEMENTING, branch_name=branch_name)
db.transition(task_id, TaskState.TESTING)
db.transition(task_id, TaskState.PR_OPENED, pr_number=pr_number)
db.transition(task_id, TaskState.REVIEWING)
return db.transition(task_id, TaskState.HUMAN_REVIEW_READY, clear_lease=True)
def test_run_task_success_posts_review_comments(db, tmp_path):
config = make_config(tmp_path)
seed_task(db)
requests: list[tuple[str, str, dict]] = []
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/repos/acme/service/pulls":
return httpx.Response(201, json={"number": 5, "html_url": "pr-url"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(201, json={"id": 1})
return httpx.Response(404)
workspace_manager = FakeWorkspaceManager(tmp_path / "work")
task = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=workspace_manager,
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert task is not None
assert task.state == TaskState.HUMAN_REVIEW_READY
assert task.pr_number == 5
assert workspace_manager.pushed == ["agent/issue-1-ready-issue"]
assert workspace_manager.commit_message == "agent: implement issue #1 - Ready issue"
pull_requests = [payload for _, path, payload in requests if path == "/api/v1/repos/acme/service/pulls"]
assert pull_requests[0]["title"] == "代理实现Ready issue"
assert "代理实现报告" in pull_requests[0]["body"]
assert "Closes #1" in pull_requests[0]["body"]
command = json.loads(db.list_agent_runs(task.id)[0]["command_json"])
assert command[1] == "--cd"
assert Path(command[2]).is_absolute()
assert [path for _, path, _ in requests].count("/api/v1/repos/acme/service/issues/5/comments") == 2
def test_scan_pull_request_feedback_queues_task_for_new_human_comment(db):
task = seed_task(db)
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(
200,
json=[
{"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
{"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
{"id": 4, "body": "internal note", "user": {"login": "agent-bot"}},
],
)
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(handler))
task_after_scan = db.get_task(task.id)
assert queued == 1
assert task_after_scan is not None
assert task_after_scan.state == TaskState.DISCOVERED
assert db.has_pending_pr_feedback(task.id)
assert db.get_pr_feedback_cursor(task.id) == 2
def test_scan_pull_request_feedback_advances_cursor_without_human_comments(db):
task = seed_task(db)
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(200, json=[{"id": 3, "body": "summary", "user": {"login": "agent-bot"}}])
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(handler))
assert queued == 0
assert not db.has_pending_pr_feedback(task.id)
assert db.get_pr_feedback_cursor(task.id) == 3
assert db.get_task(task.id).state == TaskState.HUMAN_REVIEW_READY # type: ignore[union-attr]
def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
config = make_config(tmp_path)
task = seed_task(db)
workspace_path = tmp_path / "work" / "existing"
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
db.mark_pr_feedback_pending(task.id)
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit()
requests: list[tuple[str, str, dict]] = []
next_comment_id = 3
def handler(request: httpx.Request) -> httpx.Response:
nonlocal next_comment_id
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
return httpx.Response(
200,
json=[
{"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
{"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
next_comment_id += 1
return httpx.Response(201, json={"id": next_comment_id, "body": payload["body"], "user": {"login": "agent-bot"}})
return httpx.Response(404)
workspace_manager = FakeWorkspaceManager(tmp_path / "work")
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=workspace_manager,
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert finished.pr_number == 5
assert workspace_manager.resumed == [("agent/issue-1-ready-issue", workspace_path)]
assert workspace_manager.pushed == ["agent/issue-1-ready-issue"]
assert workspace_manager.commit_message == "agent: address PR #5 feedback for issue #1"
assert not any(path == "/api/v1/repos/acme/service/pulls" for _, path, _ in requests)
assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback", "reviewer"]
assert db.get_pr_feedback_cursor(task.id) == 6
assert not db.has_pending_pr_feedback(task.id)
def test_close_issues_for_merged_pull_requests_closes_linked_issue(db):
repo = db.upsert_repository(
owner="acme",
name="service",
clone_url="https://gitea.test/acme/service.git",
default_branch="main",
enabled=True,
)
db.upsert_issue(
repo_id=repo.id,
issue_number=1,
title="Ready issue",
body="Body",
labels=["agent:ready"],
state="open",
html_url="https://gitea.test/acme/service/issues/1",
)
task = db.create_task(repo.id, 1)
db.transition(task.id, TaskState.CLAIMED)
db.transition(task.id, TaskState.PLANNING)
db.transition(task.id, TaskState.IMPLEMENTING)
db.transition(task.id, TaskState.TESTING)
db.transition(task.id, TaskState.PR_OPENED, pr_number=5)
db.transition(task.id, TaskState.REVIEWING)
db.transition(task.id, TaskState.HUMAN_REVIEW_READY, clear_lease=True)
requests: list[tuple[str, str, dict]] = []
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "closed", "merged": True})
if request.url.path == "/api/v1/repos/acme/service/issues/1/comments":
return httpx.Response(201, json={"id": 1})
if request.url.path == "/api/v1/repos/acme/service/issues/1":
return httpx.Response(200, json={"number": 1, "state": "closed"})
return httpx.Response(404)
closed = close_issues_for_merged_pull_requests(db, make_client(handler))
assert closed == 1
assert db.get_issue(repo.id, 1).state == "closed" # type: ignore[union-attr]
assert ("PATCH", "/api/v1/repos/acme/service/issues/1", {"state": "closed"}) in requests
def test_close_issues_for_merged_pull_requests_skips_unmerged_pr(db):
repo = db.upsert_repository(
owner="acme",
name="service",
clone_url="https://gitea.test/acme/service.git",
default_branch="main",
enabled=True,
)
db.upsert_issue(
repo_id=repo.id,
issue_number=1,
title="Ready issue",
body="Body",
labels=["agent:ready"],
state="open",
html_url="https://gitea.test/acme/service/issues/1",
)
task = db.create_task(repo.id, 1)
db.transition(task.id, TaskState.CLAIMED)
db.transition(task.id, TaskState.PLANNING)
db.transition(task.id, TaskState.IMPLEMENTING)
db.transition(task.id, TaskState.TESTING)
db.transition(task.id, TaskState.PR_OPENED, pr_number=5)
db.transition(task.id, TaskState.REVIEWING)
db.transition(task.id, TaskState.HUMAN_REVIEW_READY, clear_lease=True)
def handler(request: httpx.Request) -> httpx.Response:
assert request.url.path == "/api/v1/repos/acme/service/pulls/5"
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
closed = close_issues_for_merged_pull_requests(db, make_client(handler))
assert closed == 0
assert db.get_issue(repo.id, 1).state == "open" # type: ignore[union-attr]
def test_run_task_no_diff_becomes_blocked(db, tmp_path):
config = make_config(tmp_path)
seed_task(db)
task = TaskRunner(
db=db,
config=config,
gitea=make_client(lambda request: httpx.Response(500)),
workspace_manager=FakeWorkspaceManager(tmp_path / "work", diff=False),
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert task is not None
assert task.state == TaskState.BLOCKED
assert task.error_message == "implementer produced no diff"
def test_run_task_agent_failure_becomes_failed(db, tmp_path):
config = make_config(tmp_path)
seed_task(db)
task = TaskRunner(
db=db,
config=config,
gitea=make_client(lambda request: httpx.Response(500)),
workspace_manager=FakeWorkspaceManager(tmp_path / "work"),
command_runner=FakeRunner(fail_role="implementer"),
worker_id="worker",
).run_once()
assert task is not None
assert task.state == TaskState.FAILED
assert "implementer failed" in (task.error_message or "")