Files
agent-manager/tests/test_gitea_service.py

248 lines
8.2 KiB
Python

from __future__ import annotations
import json
from pathlib import Path
import httpx
from agent_gitea.config import AppConfig
from agent_gitea.gitea import GiteaClient
from agent_gitea.models import AgentResult, TaskState
from agent_gitea.service import TaskRunner, scan_issues, sync_repositories
def make_config(tmp_path: Path, **overrides: object) -> AppConfig:
data = {
"gitea": {"base_url": "https://gitea.test", "token_env": "GITEA_TOKEN"},
"database_path": tmp_path / "state.sqlite3",
"scheduler": {"interval_seconds": 1, "concurrency": 1, "lease_seconds": 60},
"workspace": {"root": tmp_path / "workspaces", "cleanup_on_success": False},
"agents": {
"implementer": {"command": ["implementer", "-"]},
"reviewer": {"command": ["reviewer", "-"]},
},
}
data.update(overrides)
return AppConfig.model_validate(data)
def make_client(handler):
return GiteaClient("https://gitea.test", "token", transport=httpx.MockTransport(handler))
def test_sync_and_scan_with_mocked_gitea(db, tmp_path):
config = make_config(tmp_path)
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "acme"})
if request.url.path == "/api/v1/user/repos":
return httpx.Response(
200,
json=[
{
"owner": {"login": "acme"},
"name": "service",
"full_name": "acme/service",
"clone_url": "https://gitea.test/acme/service.git",
"default_branch": "main",
},
{
"owner": {"login": "other"},
"name": "shared",
"full_name": "other/shared",
"clone_url": "https://gitea.test/other/shared.git",
"default_branch": "main",
},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues":
return httpx.Response(
200,
json=[
{
"number": 1,
"title": "Ready issue",
"body": "Body",
"state": "open",
"html_url": "https://gitea.test/acme/service/issues/1",
"labels": [{"name": "agent:ready"}],
},
{
"number": 2,
"title": "High risk",
"body": "Body",
"state": "open",
"labels": [{"name": "agent:ready"}, {"name": "risk:high"}],
},
],
)
return httpx.Response(404)
client = make_client(handler)
repos = sync_repositories(db, config, client)
task_ids = scan_issues(db, config, client)
assert repos[0].full_name == "acme/service"
assert db.get_repository("other/shared") is None
assert len(task_ids) == 1
def test_list_open_issues_keeps_normal_issues_with_null_pull_request():
def handler(request: httpx.Request) -> httpx.Response:
assert request.url.path == "/api/v1/repos/acme/service/issues"
return httpx.Response(
200,
json=[
{
"number": 1,
"title": "Normal issue",
"body": "Body",
"state": "open",
"labels": [{"name": "agent:ready"}],
"pull_request": None,
},
{
"number": 2,
"title": "PR issue",
"body": "Body",
"state": "open",
"labels": [{"name": "agent:ready"}],
"pull_request": {"url": "https://gitea.test/pr/2"},
},
],
)
issues = make_client(handler).list_open_issues("acme", "service")
assert [issue.number for issue in issues] == [1]
class FakeWorkspaceManager:
def __init__(self, root: Path, *, diff: bool = True):
self.root = root
self.diff = diff
self.pushed: list[str] = []
def prepare(self, repo, issue, branch_name):
path = self.root / branch_name.replace("/", "_")
path.mkdir(parents=True)
return path
def has_diff(self, workspace, base_ref="origin/HEAD"):
return self.diff
def push_branch(self, workspace, branch_name):
self.pushed.append(branch_name)
def cleanup(self, workspace):
pass
class FakeRunner:
def __init__(self, *, fail_role: str | None = None):
self.fail_role = fail_role
def run(self, command, cwd, *, stdin=None):
role = command[0]
assert stdin
if role == self.fail_role:
return AgentResult(exit_code=1, stdout="", stderr="failed")
if role == "implementer":
(Path(cwd) / "AGENT_IMPLEMENTATION_REPORT.md").write_text(
"## Summary\nImplemented\n\n## Test commands run\npytest\n",
encoding="utf-8",
)
if role == "reviewer":
(Path(cwd) / "AGENT_REVIEW_REPORT.md").write_text(
"## Verdict\nAPPROVE\n\n## Suggested PR Comment\nLooks good.\n",
encoding="utf-8",
)
return AgentResult(exit_code=0, stdout="ok", stderr="")
def seed_task(db):
repo = db.upsert_repository(
owner="acme",
name="service",
clone_url="https://gitea.test/acme/service.git",
default_branch="main",
enabled=True,
)
db.upsert_issue(
repo_id=repo.id,
issue_number=1,
title="Ready issue",
body="Body",
labels=["agent:ready"],
state="open",
html_url="https://gitea.test/acme/service/issues/1",
)
return db.create_task(repo.id, 1)
def test_run_task_success_posts_review_comments(db, tmp_path):
config = make_config(tmp_path)
seed_task(db)
requests: list[tuple[str, str, dict]] = []
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/repos/acme/service/pulls":
return httpx.Response(201, json={"number": 5, "html_url": "pr-url"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(201, json={"id": 1})
return httpx.Response(404)
task = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=FakeWorkspaceManager(tmp_path / "work"),
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert task is not None
assert task.state == TaskState.HUMAN_REVIEW_READY
assert task.pr_number == 5
assert [path for _, path, _ in requests].count("/api/v1/repos/acme/service/issues/5/comments") == 2
def test_run_task_no_diff_becomes_blocked(db, tmp_path):
config = make_config(tmp_path)
seed_task(db)
task = TaskRunner(
db=db,
config=config,
gitea=make_client(lambda request: httpx.Response(500)),
workspace_manager=FakeWorkspaceManager(tmp_path / "work", diff=False),
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert task is not None
assert task.state == TaskState.BLOCKED
assert task.error_message == "implementer produced no diff"
def test_run_task_agent_failure_becomes_failed(db, tmp_path):
config = make_config(tmp_path)
seed_task(db)
task = TaskRunner(
db=db,
config=config,
gitea=make_client(lambda request: httpx.Response(500)),
workspace_manager=FakeWorkspaceManager(tmp_path / "work"),
command_runner=FakeRunner(fail_role="implementer"),
worker_id="worker",
).run_once()
assert task is not None
assert task.state == TaskState.FAILED
assert "implementer failed" in (task.error_message or "")