Files
agent-manager/tests/test_gitea_service.py

868 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import sys
import threading
import time
from pathlib import Path
import httpx
from agent_gitea.agents import CommandRunner
from agent_gitea.config import AppConfig
from agent_gitea.db import Database
from agent_gitea.gitea import GiteaClient
from agent_gitea.models import AgentResult, TaskState
from agent_gitea.service import (
TaskRunner,
close_issues_for_merged_pull_requests,
scan_issues,
scan_pull_request_feedback,
sync_repositories,
)
def make_config(tmp_path: Path, **overrides: object) -> AppConfig:
    """Build an ``AppConfig`` rooted in ``tmp_path`` for a single test.

    Args:
        tmp_path: Directory that holds the SQLite file and the workspaces.
        **overrides: Top-level config keys that replace the defaults
            wholesale (the update is shallow, not a deep merge).

    Returns:
        The validated configuration object.
    """
    data: dict[str, object] = {
        "gitea": {"base_url": "https://gitea.test", "token_env": "GITEA_TOKEN"},
        "database_path": tmp_path / "state.sqlite3",
        "scheduler": {"interval_seconds": 1, "concurrency": 1, "lease_seconds": 60},
        "workspace": {"root": tmp_path / "workspaces", "cleanup_on_success": False},
        "agents": {
            "implementer": {"command": ["implementer", "--cd", "{workspace_path}", "-"]},
            "reviewer": {"command": ["reviewer", "--cd", "{workspace_path}", "-"]},
        },
    }
    data.update(overrides)
    return AppConfig.model_validate(data)
def make_client(handler):
    """Return a ``GiteaClient`` whose HTTP traffic is answered by ``handler``.

    ``handler`` receives each ``httpx.Request`` and returns the
    ``httpx.Response`` to serve, via ``httpx.MockTransport``.
    """
    return GiteaClient("https://gitea.test", "token", transport=httpx.MockTransport(handler))
def test_sync_and_scan_with_mocked_gitea(db, tmp_path):
    """Sync keeps only the user's own repository and scan queues one task.

    The mocked server exposes two repositories (one owned by ``acme``, one by
    ``other``) and two ``agent:ready`` issues, one of which also carries
    ``risk:high``; the test expects exactly one task to be created.
    """
    config = make_config(tmp_path)

    def handler(request: httpx.Request) -> httpx.Response:
        if request.url.path == "/api/v1/user":
            return httpx.Response(200, json={"login": "acme"})
        if request.url.path == "/api/v1/user/repos":
            return httpx.Response(
                200,
                json=[
                    {
                        "owner": {"login": "acme"},
                        "name": "service",
                        "full_name": "acme/service",
                        "clone_url": "https://gitea.test/acme/service.git",
                        "default_branch": "main",
                    },
                    {
                        "owner": {"login": "other"},
                        "name": "shared",
                        "full_name": "other/shared",
                        "clone_url": "https://gitea.test/other/shared.git",
                        "default_branch": "main",
                    },
                ],
            )
        if request.url.path == "/api/v1/repos/acme/service/issues":
            return httpx.Response(
                200,
                json=[
                    {
                        "number": 1,
                        "title": "Ready issue",
                        "body": "Body",
                        "state": "open",
                        "html_url": "https://gitea.test/acme/service/issues/1",
                        "labels": [{"name": "agent:ready"}],
                    },
                    {
                        "number": 2,
                        "title": "High risk",
                        "body": "Body",
                        "state": "open",
                        "labels": [{"name": "agent:ready"}, {"name": "risk:high"}],
                    },
                ],
            )
        return httpx.Response(404)

    client = make_client(handler)
    repos = sync_repositories(db, config, client)
    task_ids = scan_issues(db, config, client)

    assert repos[0].full_name == "acme/service"
    assert db.get_repository("other/shared") is None
    assert len(task_ids) == 1
def test_list_open_issues_keeps_normal_issues_with_null_pull_request():
    """An issue with ``pull_request: None`` is kept; one with a PR object is dropped."""

    def handler(request: httpx.Request) -> httpx.Response:
        assert request.url.path == "/api/v1/repos/acme/service/issues"
        return httpx.Response(
            200,
            json=[
                {
                    "number": 1,
                    "title": "Normal issue",
                    "body": "Body",
                    "state": "open",
                    "labels": [{"name": "agent:ready"}],
                    "pull_request": None,
                },
                {
                    "number": 2,
                    "title": "PR issue",
                    "body": "Body",
                    "state": "open",
                    "labels": [{"name": "agent:ready"}],
                    "pull_request": {"url": "https://gitea.test/pr/2"},
                },
            ],
        )

    issues = make_client(handler).list_open_issues("acme", "service")
    assert [issue.number for issue in issues] == [1]
def test_list_open_issues_reads_all_pages():
    """The client keeps requesting pages until it receives a short one.

    Page 1 is filled to the ``limit`` the client asked for and page 2 holds a
    single extra issue. Both the fixture and the expectation are derived from
    the requested ``limit`` so the test does not silently depend on one
    particular page size.
    """
    seen_pages: list[int] = []
    seen_limits: list[int] = []

    def handler(request: httpx.Request) -> httpx.Response:
        assert request.url.path == "/api/v1/repos/acme/service/issues"
        page = int(request.url.params["page"])
        limit = int(request.url.params["limit"])
        seen_pages.append(page)
        seen_limits.append(limit)
        if page == 1:
            return httpx.Response(
                200,
                json=[
                    {
                        "number": number,
                        "title": f"Issue {number}",
                        "body": "",
                        "state": "open",
                        "labels": [],
                    }
                    for number in range(1, limit + 1)
                ],
            )
        if page == 2:
            # One issue past the full first page: a short page ends pagination.
            return httpx.Response(
                200,
                json=[
                    {
                        "number": limit + 1,
                        "title": f"Issue {limit + 1}",
                        "body": "",
                        "state": "open",
                        "labels": [],
                    }
                ],
            )
        # A third request means the client did not stop on the short page.
        return httpx.Response(500)

    issues = make_client(handler).list_open_issues("acme", "service")

    page_size = seen_limits[0]
    assert [issue.number for issue in issues] == list(range(1, page_size + 2))
    assert seen_pages == [1, 2]
class FakeWorkspaceManager:
    """In-memory stand-in for the workspace manager used by ``TaskRunner``.

    It creates plain directories instead of git checkouts and records the
    calls made to it so tests can assert on them.
    """

    def __init__(self, root: Path, *, diff: bool = True):
        self.root = root
        # Value reported by both has_diff() and has_uncommitted_changes().
        self.diff = diff
        self.pushed: list[str] = []
        self.resumed: list[tuple[str, Path | None]] = []
        self.cleaned: list[Path] = []
        # ``commit_message`` is intentionally NOT initialised here: tests use
        # ``hasattr(manager, "commit_message")`` to detect whether a commit
        # was ever made.

    def prepare(self, repo, issue, branch_name):
        """Create and return a directory named after ``branch_name``."""
        path = self.root / branch_name.replace("/", "_")
        path.mkdir(parents=True, exist_ok=True)
        return path

    def prepare_existing_branch(self, repo, issue, branch_name, workspace_path):
        """Record the resume request and return the directory to work in.

        Uses ``workspace_path`` when given, otherwise falls back to the same
        branch-derived directory that ``prepare`` would use.
        """
        self.resumed.append((branch_name, workspace_path))
        path = Path(workspace_path) if workspace_path else self.root / branch_name.replace("/", "_")
        path.mkdir(parents=True, exist_ok=True)
        return path

    def has_diff(self, workspace, base_ref="origin/HEAD"):
        """Return the configured ``diff`` flag regardless of arguments."""
        return self.diff

    def has_uncommitted_changes(self, workspace):
        """Return the configured ``diff`` flag regardless of arguments."""
        return self.diff

    def push_branch(self, workspace, branch_name):
        """Record that ``branch_name`` was pushed."""
        self.pushed.append(branch_name)

    def commit_changes(self, workspace, message):
        """Remember the commit message and return a fixed fake commit hash."""
        self.commit_message = message
        return "abc1234"

    def cleanup(self, workspace):
        """Record the workspace that was asked to be cleaned up."""
        self.cleaned.append(Path(workspace))
class FakeRunner:
    """Command runner double that writes the report files agents would emit.

    The role is taken from ``command[0]``. When it equals ``fail_role`` the
    run reports exit code 1; otherwise the matching report is written under
    ``<cwd>/.agent-output`` and the run reports success.
    """

    def __init__(self, *, fail_role: str | None = None):
        self.fail_role = fail_role

    @staticmethod
    def _write_report(cwd, filename: str, content: str) -> None:
        """Write ``content`` to ``<cwd>/.agent-output/<filename>`` as UTF-8."""
        output_dir = Path(cwd) / ".agent-output"
        output_dir.mkdir(exist_ok=True)
        (output_dir / filename).write_text(content, encoding="utf-8")

    def run(self, command, cwd, *, stdin=None, timeout_seconds=None):
        """Simulate one agent invocation and return its ``AgentResult``."""
        role = command[0]
        # Every invocation is expected to receive a non-empty stdin payload.
        assert stdin
        if role == self.fail_role:
            return AgentResult(exit_code=1, stdout="", stderr="failed")
        if role == "implementer":
            self._write_report(
                cwd,
                "AGENT_IMPLEMENTATION_REPORT.md",
                "## Summary\nImplemented\n\n## Test commands run\npytest\n",
            )
        elif role == "reviewer":
            self._write_report(
                cwd,
                "AGENT_REVIEW_REPORT.md",
                "## Verdict\nAPPROVE\n\n## Suggested PR Comment\nLooks good.\n",
            )
        return AgentResult(exit_code=0, stdout="ok", stderr="")
def seed_task(db):
    """Insert the ``acme/service`` repository, issue #1 and a task for it.

    Returns:
        Whatever ``db.create_task`` returns for the new task.
    """
    repo = db.upsert_repository(
        owner="acme",
        name="service",
        clone_url="https://gitea.test/acme/service.git",
        default_branch="main",
        enabled=True,
    )
    db.upsert_issue(
        repo_id=repo.id,
        issue_number=1,
        title="Ready issue",
        body="Body",
        labels=["agent:ready"],
        state="open",
        html_url="https://gitea.test/acme/service/issues/1",
    )
    return db.create_task(repo.id, 1)
def test_claim_next_task_allows_only_one_worker_during_race(db, tmp_path):
    """Two workers racing on the same task: exactly one claims it.

    A SQLite trigger slows down the ``CLAIMED`` update so both workers are
    inside ``claim_next_task`` at the same time. Each worker opens its own
    connection to the database file.
    """
    task_id = seed_task(db).id
    db.conn.create_function("sleep_ms", 1, lambda ms: time.sleep(ms / 1000))
    db.conn.execute(
        """
        CREATE TRIGGER slow_claim_update
        BEFORE UPDATE OF state ON tasks
        WHEN NEW.state = 'CLAIMED'
        BEGIN
            SELECT sleep_ms(150);
        END
        """
    )
    db.conn.commit()
    db.close()

    barrier = threading.Barrier(2)
    results: dict[str, int | None] = {}
    errors: list[BaseException] = []

    def claim(worker_id: str) -> None:
        database = None
        try:
            # Opening the connection is inside the try so that a failure here
            # is reported through ``errors`` rather than lost in the thread.
            database = Database(tmp_path / "state.sqlite3")
            # The trigger calls sleep_ms, so every connection must define it.
            database.conn.create_function("sleep_ms", 1, lambda ms: time.sleep(ms / 1000))
            # The timeout keeps the test from hanging forever if the other
            # worker died before reaching the barrier.
            barrier.wait(timeout=10)
            task = database.claim_next_task(worker_id, 60)
            results[worker_id] = task.id if task else None
        except BaseException as exc:  # pragma: no cover - assertion below reports thread failures
            errors.append(exc)
        finally:
            if database is not None:
                database.close()

    threads = [threading.Thread(target=claim, args=(worker_id,)) for worker_id in ("worker-a", "worker-b")]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    assert errors == []
    assert list(results.values()).count(task_id) == 1
    assert list(results.values()).count(None) == 1
def test_command_runner_returns_timeout_result(tmp_path):
    """A command that outlives ``timeout_seconds`` yields exit code 124."""
    result = CommandRunner().run(
        [sys.executable, "-c", "import time; time.sleep(5)"],
        tmp_path,
        timeout_seconds=0.1,
    )
    assert result.exit_code == 124
    assert "timed out" in result.stderr
class ObservingSlowRunner:
    """Runner double that samples the task lease around a slow implementer run.

    During the implementer step it reads ``lease_expires_at`` for the task,
    sleeps 1.2 seconds, and reads it again, storing the two values in
    ``lease_before`` and ``lease_during``.
    """

    def __init__(self, db, task_id: int):
        self.db = db
        self.task_id = task_id
        self.lease_before = None
        self.lease_during = None

    @staticmethod
    def _write_report(cwd, filename: str, content: str) -> None:
        """Write ``content`` to ``<cwd>/.agent-output/<filename>`` as UTF-8."""
        output_dir = Path(cwd) / ".agent-output"
        output_dir.mkdir(exist_ok=True)
        (output_dir / filename).write_text(content, encoding="utf-8")

    def run(self, command, cwd, *, stdin=None, timeout_seconds=None):
        """Simulate one agent invocation; always reports success."""
        role = command[0]
        if role == "implementer":
            self.lease_before = self.db.get_task(self.task_id).lease_expires_at  # type: ignore[union-attr]
            # Long enough to span the 1-second renewal interval the test configures.
            time.sleep(1.2)
            self.lease_during = self.db.get_task(self.task_id).lease_expires_at  # type: ignore[union-attr]
            self._write_report(
                cwd,
                "AGENT_IMPLEMENTATION_REPORT.md",
                "## Summary\nImplemented\n\n## Test commands run\npytest\n",
            )
        if role == "reviewer":
            self._write_report(
                cwd,
                "AGENT_REVIEW_REPORT.md",
                "## Verdict\nAPPROVE\n\n## Suggested PR Comment\nLooks good.\n",
            )
        return AgentResult(exit_code=0, stdout="ok", stderr="")
def test_task_runner_renews_lease_while_agent_runs(db, tmp_path):
    """The lease expiry observed mid-run is later than the one at run start."""
    config = make_config(
        tmp_path,
        scheduler={
            "interval_seconds": 1,
            "concurrency": 1,
            "lease_seconds": 60,
            "lease_renewal_interval_seconds": 1,
        },
    )
    task = seed_task(db)
    runner = ObservingSlowRunner(db, task.id)

    def handler(request: httpx.Request) -> httpx.Response:
        payload = json.loads(request.content.decode() or "{}")
        if request.url.path == "/api/v1/repos/acme/service/pulls":
            return httpx.Response(201, json={"number": 5, "state": "open", "merged": False})
        if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
            return httpx.Response(201, json={"id": 1, "body": payload.get("body", ""), "user": {"login": "agent-bot"}})
        return httpx.Response(404)

    finished = TaskRunner(
        db=db,
        config=config,
        gitea=make_client(handler),
        workspace_manager=FakeWorkspaceManager(tmp_path / "work"),
        command_runner=runner,
        worker_id="worker",
    ).run_once()

    assert finished is not None
    assert finished.state == TaskState.HUMAN_REVIEW_READY
    assert runner.lease_before is not None
    assert runner.lease_during is not None
    assert runner.lease_during > runner.lease_before
def transition_to_human_review_ready(db, task_id: int, *, pr_number: int = 5, branch_name: str | None = None):
    """Walk a task through every state up to ``HUMAN_REVIEW_READY``.

    Args:
        db: Database whose ``transition`` method is called for each step.
        task_id: Task to advance.
        pr_number: PR number recorded on the ``PR_OPENED`` transition.
        branch_name: Branch recorded on the ``IMPLEMENTING`` transition.

    Returns:
        The result of the final transition, which also clears the lease.
    """
    db.transition(task_id, TaskState.CLAIMED)
    db.transition(task_id, TaskState.PLANNING)
    db.transition(task_id, TaskState.IMPLEMENTING, branch_name=branch_name)
    db.transition(task_id, TaskState.TESTING)
    db.transition(task_id, TaskState.PR_OPENED, pr_number=pr_number)
    db.transition(task_id, TaskState.REVIEWING)
    return db.transition(task_id, TaskState.HUMAN_REVIEW_READY, clear_lease=True)
def test_run_task_success_posts_review_comments(db, tmp_path):
    """A successful run opens a PR, pushes the branch and posts two comments."""
    config = make_config(tmp_path)
    seed_task(db)
    requests: list[tuple[str, str, dict]] = []

    def handler(request: httpx.Request) -> httpx.Response:
        payload = json.loads(request.content.decode() or "{}")
        requests.append((request.method, request.url.path, payload))
        if request.url.path == "/api/v1/repos/acme/service/pulls":
            return httpx.Response(201, json={"number": 5, "html_url": "pr-url"})
        if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
            return httpx.Response(201, json={"id": 1})
        return httpx.Response(404)

    workspace_manager = FakeWorkspaceManager(tmp_path / "work")
    task = TaskRunner(
        db=db,
        config=config,
        gitea=make_client(handler),
        workspace_manager=workspace_manager,
        command_runner=FakeRunner(),
        worker_id="worker",
    ).run_once()

    assert task is not None
    assert task.state == TaskState.HUMAN_REVIEW_READY
    assert task.pr_number == 5
    assert workspace_manager.pushed == ["agent/issue-1-ready-issue"]
    assert workspace_manager.commit_message == "agent: implement issue #1 - Ready issue"

    pull_requests = [payload for _, path, payload in requests if path == "/api/v1/repos/acme/service/pulls"]
    # NOTE(review): these literals contain non-ASCII text; confirm they match
    # the title/body produced by the service byte-for-byte (e.g. no
    # full-width punctuation lost in copy/paste).
    assert pull_requests[0]["title"] == "代理实现Ready issue"
    assert "代理实现报告" in pull_requests[0]["body"]
    assert "Closes #1" in pull_requests[0]["body"]

    # The recorded command must have its {workspace_path} placeholder
    # expanded to an absolute path.
    command = json.loads(db.list_agent_runs(task.id)[0]["command_json"])
    assert command[1] == "--cd"
    assert Path(command[2]).is_absolute()
    assert [path for _, path, _ in requests].count("/api/v1/repos/acme/service/issues/5/comments") == 2
def test_scan_pull_request_feedback_queues_task_for_new_human_comment(db):
    """A human comment newer than the cursor re-queues the task.

    The cursor stays at 2 after the scan; only the pending flag and the task
    state change.
    """
    task = seed_task(db)
    transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
    db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)

    def handler(request: httpx.Request) -> httpx.Response:
        if request.url.path == "/api/v1/user":
            return httpx.Response(200, json={"login": "agent-bot"})
        if request.url.path == "/api/v1/repos/acme/service/pulls/5":
            return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
        if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
            return httpx.Response(
                200,
                json=[
                    {"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
                    {"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
                    {"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
                    {"id": 4, "body": "internal note", "user": {"login": "agent-bot"}},
                ],
            )
        return httpx.Response(404)

    queued = scan_pull_request_feedback(db, make_client(handler))
    task_after_scan = db.get_task(task.id)

    assert queued == 1
    assert task_after_scan is not None
    assert task_after_scan.state == TaskState.DISCOVERED
    assert db.has_pending_pr_feedback(task.id)
    assert db.get_pr_feedback_cursor(task.id) == 2
def test_scan_pull_request_feedback_advances_cursor_without_human_comments(db):
    """Only bot comments are new: nothing is queued and the cursor moves to 3."""
    task = seed_task(db)
    transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
    db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)

    def handler(request: httpx.Request) -> httpx.Response:
        if request.url.path == "/api/v1/user":
            return httpx.Response(200, json={"login": "agent-bot"})
        if request.url.path == "/api/v1/repos/acme/service/pulls/5":
            return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
        if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
            return httpx.Response(200, json=[{"id": 3, "body": "summary", "user": {"login": "agent-bot"}}])
        return httpx.Response(404)

    queued = scan_pull_request_feedback(db, make_client(handler))

    assert queued == 0
    assert not db.has_pending_pr_feedback(task.id)
    assert db.get_pr_feedback_cursor(task.id) == 3
    assert db.get_task(task.id).state == TaskState.HUMAN_REVIEW_READY  # type: ignore[union-attr]
def test_scan_pull_request_feedback_queues_task_for_inline_review_comment(db):
    """An inline review comment from a human queues the task.

    There are no issue comments; the feedback arrives only through a
    ``REQUEST_CHANGES`` review and its inline comment. None of the three
    cursors advance during the scan.
    """
    task = seed_task(db)
    transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
    db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)

    def handler(request: httpx.Request) -> httpx.Response:
        if request.url.path == "/api/v1/user":
            return httpx.Response(200, json={"login": "agent-bot"})
        if request.url.path == "/api/v1/repos/acme/service/pulls/5":
            return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
        if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
            return httpx.Response(200, json=[])
        if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews":
            return httpx.Response(
                200,
                json=[
                    {
                        "id": 9,
                        "body": "",
                        "state": "REQUEST_CHANGES",
                        "user": {"login": "alice"},
                    }
                ],
            )
        if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews/9/comments":
            return httpx.Response(
                200,
                json=[
                    {
                        "id": 31,
                        "body": "This branch misses the cleanup case.",
                        "path": "src/service.py",
                        "position": 42,
                        "user": {"login": "alice"},
                    }
                ],
            )
        return httpx.Response(404)

    queued = scan_pull_request_feedback(db, make_client(handler))
    cursors = db.get_pr_feedback_cursors(task.id)

    assert queued == 1
    assert db.has_pending_pr_feedback(task.id)
    assert cursors.last_seen_comment_id == 2
    assert cursors.last_seen_review_id == 0
    assert cursors.last_seen_review_comment_id == 0
def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
    """Pending PR feedback resumes the existing branch instead of opening a PR.

    Phase 1 runs the task with one human comment pending and checks that the
    existing workspace/branch is reused, no new PR is created, and the cursor
    advances to the handled comment. Phase 2 rescans with an additional human
    comment whose id is lower than the bot's replies and checks that it is
    still queued.
    """
    config = make_config(tmp_path)
    task = seed_task(db)
    workspace_path = tmp_path / "work" / "existing"
    transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
    db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
    db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
    db.mark_pr_feedback_pending(task.id)
    db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
    db.conn.commit()

    requests: list[tuple[str, str, dict]] = []
    # Ids handed out to comments the agent posts start after this value.
    next_comment_id = 4

    def handler(request: httpx.Request) -> httpx.Response:
        nonlocal next_comment_id
        payload = json.loads(request.content.decode() or "{}")
        requests.append((request.method, request.url.path, payload))
        if request.url.path == "/api/v1/user":
            return httpx.Response(200, json={"login": "agent-bot"})
        if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
            return httpx.Response(
                200,
                json=[
                    {"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
                    {"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
                    {"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
                ],
            )
        if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
            next_comment_id += 1
            return httpx.Response(201, json={"id": next_comment_id, "body": payload["body"], "user": {"login": "agent-bot"}})
        return httpx.Response(404)

    # Phase 1: process the pending feedback.
    workspace_manager = FakeWorkspaceManager(tmp_path / "work")
    finished = TaskRunner(
        db=db,
        config=config,
        gitea=make_client(handler),
        workspace_manager=workspace_manager,
        command_runner=FakeRunner(),
        worker_id="worker",
    ).run_once()

    assert finished is not None
    assert finished.state == TaskState.HUMAN_REVIEW_READY
    assert finished.pr_number == 5
    assert workspace_manager.resumed == [("agent/issue-1-ready-issue", workspace_path)]
    assert workspace_manager.pushed == ["agent/issue-1-ready-issue"]
    assert workspace_manager.commit_message == "agent: address PR #5 feedback for issue #1"
    assert not any(path == "/api/v1/repos/acme/service/pulls" for _, path, _ in requests)
    assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback", "reviewer"]
    assert db.get_pr_feedback_cursor(task.id) == 3
    assert not db.has_pending_pr_feedback(task.id)

    # Phase 2: a human comment (id 4) sits between the handled comment and
    # the bot's own replies (ids 5-7); it must still be picked up.
    def rescan_handler(request: httpx.Request) -> httpx.Response:
        if request.url.path == "/api/v1/user":
            return httpx.Response(200, json={"login": "agent-bot"})
        if request.url.path == "/api/v1/repos/acme/service/pulls/5":
            return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
        if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
            return httpx.Response(
                200,
                json=[
                    {"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
                    {"id": 4, "body": "Also cover the cleanup case.", "user": {"login": "alice"}},
                    {"id": 5, "body": "processed", "user": {"login": "agent-bot"}},
                    {"id": 6, "body": "review report", "user": {"login": "agent-bot"}},
                    {"id": 7, "body": "summary", "user": {"login": "agent-bot"}},
                ],
            )
        return httpx.Response(404)

    queued = scan_pull_request_feedback(db, make_client(rescan_handler))

    assert queued == 1
    assert db.has_pending_pr_feedback(task.id)
    assert db.get_pr_feedback_cursor(task.id) == 3
def test_run_task_with_pending_pr_feedback_allows_no_code_changes(db, tmp_path):
    """Feedback that produces no diff still completes without commit or push.

    With the workspace reporting no changes, only the feedback implementer
    runs (no reviewer), nothing is committed or pushed, and the cursor still
    advances to the handled comment.
    """
    config = make_config(tmp_path)
    task = seed_task(db)
    workspace_path = tmp_path / "work" / "existing"
    transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
    db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
    db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
    db.mark_pr_feedback_pending(task.id)
    db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
    db.conn.commit()

    requests: list[tuple[str, str, dict]] = []

    def handler(request: httpx.Request) -> httpx.Response:
        payload = json.loads(request.content.decode() or "{}")
        requests.append((request.method, request.url.path, payload))
        if request.url.path == "/api/v1/user":
            return httpx.Response(200, json={"login": "agent-bot"})
        if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
            return httpx.Response(
                200,
                json=[
                    {"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
                    {"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
                    {"id": 3, "body": "Can you confirm why this is enough?", "user": {"login": "alice"}},
                ],
            )
        if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
            return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}})
        return httpx.Response(404)

    workspace_manager = FakeWorkspaceManager(tmp_path / "work", diff=False)
    finished = TaskRunner(
        db=db,
        config=config,
        gitea=make_client(handler),
        workspace_manager=workspace_manager,
        command_runner=FakeRunner(),
        worker_id="worker",
    ).run_once()

    assert finished is not None
    assert finished.state == TaskState.HUMAN_REVIEW_READY
    assert workspace_manager.pushed == []
    # The fake only gains ``commit_message`` when commit_changes() is called.
    assert not hasattr(workspace_manager, "commit_message")
    assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback"]
    assert db.get_pr_feedback_cursor(task.id) == 3
    assert not db.has_pending_pr_feedback(task.id)
def test_success_cleanup_does_not_block_later_pr_feedback(db, tmp_path):
    """With ``cleanup_on_success`` enabled, a feedback run still resumes and cleans up."""
    config = make_config(
        tmp_path,
        workspace={"root": tmp_path / "workspaces", "cleanup_on_success": True},
    )
    task = seed_task(db)
    workspace_path = tmp_path / "work" / "existing"
    transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
    db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
    db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
    db.mark_pr_feedback_pending(task.id)
    db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
    db.conn.commit()

    def handler(request: httpx.Request) -> httpx.Response:
        payload = json.loads(request.content.decode() or "{}")
        if request.url.path == "/api/v1/user":
            return httpx.Response(200, json={"login": "agent-bot"})
        if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
            return httpx.Response(
                200,
                json=[
                    {"id": 3, "body": "Please address the review.", "user": {"login": "alice"}},
                ],
            )
        if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
            return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}})
        return httpx.Response(404)

    workspace_manager = FakeWorkspaceManager(tmp_path / "work")
    finished = TaskRunner(
        db=db,
        config=config,
        gitea=make_client(handler),
        workspace_manager=workspace_manager,
        command_runner=FakeRunner(),
        worker_id="worker",
    ).run_once()

    assert finished is not None
    assert finished.state == TaskState.HUMAN_REVIEW_READY
    assert workspace_manager.resumed == [("agent/issue-1-ready-issue", workspace_path)]
    assert workspace_manager.cleaned == [workspace_path]
def test_close_issues_for_merged_pull_requests_closes_linked_issue(db):
    """A merged PR causes its linked issue to be closed remotely and locally."""
    repo = db.upsert_repository(
        owner="acme",
        name="service",
        clone_url="https://gitea.test/acme/service.git",
        default_branch="main",
        enabled=True,
    )
    db.upsert_issue(
        repo_id=repo.id,
        issue_number=1,
        title="Ready issue",
        body="Body",
        labels=["agent:ready"],
        state="open",
        html_url="https://gitea.test/acme/service/issues/1",
    )
    task = db.create_task(repo.id, 1)
    db.transition(task.id, TaskState.CLAIMED)
    db.transition(task.id, TaskState.PLANNING)
    db.transition(task.id, TaskState.IMPLEMENTING)
    db.transition(task.id, TaskState.TESTING)
    db.transition(task.id, TaskState.PR_OPENED, pr_number=5)
    db.transition(task.id, TaskState.REVIEWING)
    db.transition(task.id, TaskState.HUMAN_REVIEW_READY, clear_lease=True)

    requests: list[tuple[str, str, dict]] = []

    def handler(request: httpx.Request) -> httpx.Response:
        payload = json.loads(request.content.decode() or "{}")
        requests.append((request.method, request.url.path, payload))
        if request.url.path == "/api/v1/repos/acme/service/pulls/5":
            return httpx.Response(200, json={"number": 5, "state": "closed", "merged": True})
        if request.url.path == "/api/v1/repos/acme/service/issues/1/comments":
            return httpx.Response(201, json={"id": 1})
        if request.url.path == "/api/v1/repos/acme/service/issues/1":
            return httpx.Response(200, json={"number": 1, "state": "closed"})
        return httpx.Response(404)

    closed = close_issues_for_merged_pull_requests(db, make_client(handler))

    assert closed == 1
    assert db.get_issue(repo.id, 1).state == "closed"  # type: ignore[union-attr]
    assert ("PATCH", "/api/v1/repos/acme/service/issues/1", {"state": "closed"}) in requests
def test_close_issues_for_merged_pull_requests_skips_unmerged_pr(db):
    """An open, unmerged PR leaves the linked issue open.

    The handler asserts that the PR endpoint is the only URL requested.
    """
    repo = db.upsert_repository(
        owner="acme",
        name="service",
        clone_url="https://gitea.test/acme/service.git",
        default_branch="main",
        enabled=True,
    )
    db.upsert_issue(
        repo_id=repo.id,
        issue_number=1,
        title="Ready issue",
        body="Body",
        labels=["agent:ready"],
        state="open",
        html_url="https://gitea.test/acme/service/issues/1",
    )
    task = db.create_task(repo.id, 1)
    db.transition(task.id, TaskState.CLAIMED)
    db.transition(task.id, TaskState.PLANNING)
    db.transition(task.id, TaskState.IMPLEMENTING)
    db.transition(task.id, TaskState.TESTING)
    db.transition(task.id, TaskState.PR_OPENED, pr_number=5)
    db.transition(task.id, TaskState.REVIEWING)
    db.transition(task.id, TaskState.HUMAN_REVIEW_READY, clear_lease=True)

    def handler(request: httpx.Request) -> httpx.Response:
        assert request.url.path == "/api/v1/repos/acme/service/pulls/5"
        return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})

    closed = close_issues_for_merged_pull_requests(db, make_client(handler))

    assert closed == 0
    assert db.get_issue(repo.id, 1).state == "open"  # type: ignore[union-attr]
def test_close_issues_for_merged_pull_requests_handles_queued_feedback_task(db):
    """A merged PR cancels a task that was re-queued for PR feedback.

    The task sits in ``DISCOVERED`` with feedback pending when the PR is found
    merged; afterwards the issue is closed, the task is ``CANCELLED`` and the
    pending flag is cleared.
    """
    repo = db.upsert_repository(
        owner="acme",
        name="service",
        clone_url="https://gitea.test/acme/service.git",
        default_branch="main",
        enabled=True,
    )
    db.upsert_issue(
        repo_id=repo.id,
        issue_number=1,
        title="Ready issue",
        body="Body",
        labels=["agent:ready"],
        state="open",
        html_url="https://gitea.test/acme/service/issues/1",
    )
    task = db.create_task(repo.id, 1)
    task = transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
    db.mark_pr_feedback_pending(task.id)
    db.transition(
        task.id,
        TaskState.DISCOVERED,
        message="queued PR feedback from 1 human comment(s)",
        clear_lease=True,
    )

    requests: list[tuple[str, str, dict]] = []

    def handler(request: httpx.Request) -> httpx.Response:
        payload = json.loads(request.content.decode() or "{}")
        requests.append((request.method, request.url.path, payload))
        if request.url.path == "/api/v1/repos/acme/service/pulls/5":
            return httpx.Response(200, json={"number": 5, "state": "closed", "merged": True})
        if request.url.path == "/api/v1/repos/acme/service/issues/1/comments":
            return httpx.Response(201, json={"id": 1})
        if request.url.path == "/api/v1/repos/acme/service/issues/1":
            return httpx.Response(200, json={"number": 1, "state": "closed"})
        return httpx.Response(404)

    closed = close_issues_for_merged_pull_requests(db, make_client(handler))
    updated_task = db.get_task(task.id)

    assert closed == 1
    assert db.get_issue(repo.id, 1).state == "closed"  # type: ignore[union-attr]
    assert updated_task is not None
    assert updated_task.state == TaskState.CANCELLED
    assert not db.has_pending_pr_feedback(task.id)
    assert ("PATCH", "/api/v1/repos/acme/service/issues/1", {"state": "closed"}) in requests
def test_run_task_no_diff_becomes_blocked(db, tmp_path):
    """An implementer run that leaves no diff ends in ``BLOCKED``.

    Every HTTP request is answered with 500, so the test also shows the task
    reaches ``BLOCKED`` without needing a successful Gitea call.
    """
    config = make_config(tmp_path)
    seed_task(db)

    task = TaskRunner(
        db=db,
        config=config,
        gitea=make_client(lambda request: httpx.Response(500)),
        workspace_manager=FakeWorkspaceManager(tmp_path / "work", diff=False),
        command_runner=FakeRunner(),
        worker_id="worker",
    ).run_once()

    assert task is not None
    assert task.state == TaskState.BLOCKED
    assert task.error_message == "implementer produced no diff"
def test_run_task_agent_failure_becomes_failed(db, tmp_path):
    """A non-zero implementer exit code moves the task to ``FAILED``."""
    config = make_config(tmp_path)
    seed_task(db)

    task = TaskRunner(
        db=db,
        config=config,
        gitea=make_client(lambda request: httpx.Response(500)),
        workspace_manager=FakeWorkspaceManager(tmp_path / "work"),
        command_runner=FakeRunner(fail_role="implementer"),
        worker_id="worker",
    ).run_once()

    assert task is not None
    assert task.state == TaskState.FAILED
    assert "implementer failed" in (task.error_message or "")