Compare commits

...

2 Commits

6 changed files with 516 additions and 47 deletions

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import json
import sqlite3
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from typing import Iterable
@@ -87,12 +88,21 @@ CREATE TABLE IF NOT EXISTS agent_runs (
CREATE TABLE IF NOT EXISTS pull_request_feedback (
task_id INTEGER PRIMARY KEY REFERENCES tasks(id),
last_seen_comment_id INTEGER NOT NULL DEFAULT 0,
last_seen_review_id INTEGER NOT NULL DEFAULT 0,
last_seen_review_comment_id INTEGER NOT NULL DEFAULT 0,
pending INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL
);
"""
@dataclass(frozen=True)
class PullRequestFeedbackCursor:
last_seen_comment_id: int = 0
last_seen_review_id: int = 0
last_seen_review_comment_id: int = 0
class Database:
def __init__(self, path: str | Path):
self.path = Path(path)
@@ -106,8 +116,21 @@ class Database:
def migrate(self) -> None:
self.conn.executescript(SCHEMA)
self._migrate_pull_request_feedback_columns()
self.conn.commit()
def _migrate_pull_request_feedback_columns(self) -> None:
rows = self.conn.execute("PRAGMA table_info(pull_request_feedback)").fetchall()
columns = {row["name"] for row in rows}
if "last_seen_review_id" not in columns:
self.conn.execute(
"ALTER TABLE pull_request_feedback ADD COLUMN last_seen_review_id INTEGER NOT NULL DEFAULT 0"
)
if "last_seen_review_comment_id" not in columns:
self.conn.execute(
"ALTER TABLE pull_request_feedback ADD COLUMN last_seen_review_comment_id INTEGER NOT NULL DEFAULT 0"
)
def upsert_repository(
self,
*,
@@ -298,44 +321,104 @@ class Database:
).fetchone()
return int(row["last_seen_comment_id"]) if row else 0
def get_pr_feedback_cursors(self, task_id: int) -> PullRequestFeedbackCursor:
row = self.conn.execute(
"""
SELECT last_seen_comment_id, last_seen_review_id, last_seen_review_comment_id
FROM pull_request_feedback
WHERE task_id = ?
""",
(task_id,),
).fetchone()
if row is None:
return PullRequestFeedbackCursor()
return PullRequestFeedbackCursor(
last_seen_comment_id=int(row["last_seen_comment_id"]),
last_seen_review_id=int(row["last_seen_review_id"]),
last_seen_review_comment_id=int(row["last_seen_review_comment_id"]),
)
def upsert_pr_feedback_state(
self,
task_id: int,
*,
last_seen_comment_id: int | None = None,
last_seen_review_id: int | None = None,
last_seen_review_comment_id: int | None = None,
pending: bool | None = None,
) -> None:
row = self.conn.execute(
"SELECT last_seen_comment_id, pending FROM pull_request_feedback WHERE task_id = ?",
"""
SELECT last_seen_comment_id, last_seen_review_id, last_seen_review_comment_id, pending
FROM pull_request_feedback
WHERE task_id = ?
""",
(task_id,),
).fetchone()
now = dt_to_db(utcnow())
if row is None:
self.conn.execute(
"""
INSERT INTO pull_request_feedback (task_id, last_seen_comment_id, pending, updated_at)
VALUES (?, ?, ?, ?)
INSERT INTO pull_request_feedback (
task_id,
last_seen_comment_id,
last_seen_review_id,
last_seen_review_comment_id,
pending,
updated_at
)
VALUES (?, ?, ?, ?, ?, ?)
""",
(task_id, last_seen_comment_id or 0, int(bool(pending)), now),
(
task_id,
last_seen_comment_id or 0,
last_seen_review_id or 0,
last_seen_review_comment_id or 0,
int(bool(pending)),
now,
),
)
else:
next_cursor = int(row["last_seen_comment_id"]) if last_seen_comment_id is None else last_seen_comment_id
next_review_cursor = int(row["last_seen_review_id"]) if last_seen_review_id is None else last_seen_review_id
next_review_comment_cursor = (
int(row["last_seen_review_comment_id"])
if last_seen_review_comment_id is None
else last_seen_review_comment_id
)
next_pending = bool(row["pending"]) if pending is None else pending
self.conn.execute(
"""
UPDATE pull_request_feedback
SET last_seen_comment_id = ?, pending = ?, updated_at = ?
SET last_seen_comment_id = ?,
last_seen_review_id = ?,
last_seen_review_comment_id = ?,
pending = ?,
updated_at = ?
WHERE task_id = ?
""",
(next_cursor, int(next_pending), now, task_id),
(next_cursor, next_review_cursor, next_review_comment_cursor, int(next_pending), now, task_id),
)
self.conn.commit()
def mark_pr_feedback_pending(self, task_id: int) -> None:
self.upsert_pr_feedback_state(task_id, pending=True)
def clear_pr_feedback_pending(self, task_id: int, *, last_seen_comment_id: int) -> None:
self.upsert_pr_feedback_state(task_id, last_seen_comment_id=last_seen_comment_id, pending=False)
def clear_pr_feedback_pending(
self,
task_id: int,
*,
last_seen_comment_id: int | None = None,
last_seen_review_id: int | None = None,
last_seen_review_comment_id: int | None = None,
) -> None:
self.upsert_pr_feedback_state(
task_id,
last_seen_comment_id=last_seen_comment_id,
last_seen_review_id=last_seen_review_id,
last_seen_review_comment_id=last_seen_review_comment_id,
pending=False,
)
def has_pending_pr_feedback(self, task_id: int) -> bool:
row = self.conn.execute(

View File

@@ -38,6 +38,17 @@ class GiteaComment:
updated_at: datetime | None
@dataclass(frozen=True)
class GiteaPullReview:
id: int
body: str
author: str
state: str
html_url: str
submitted_at: datetime | None
updated_at: datetime | None
@dataclass(frozen=True)
class GiteaRepository:
owner: str
@@ -178,6 +189,55 @@ class GiteaClient:
response.raise_for_status()
return comment_from_payload(response.json())
def list_pull_request_reviews(self, *, owner: str, name: str, pr_number: int) -> list[GiteaPullReview]:
reviews: list[GiteaPullReview] = []
page = 1
limit = 50
while True:
response = self.client.get(
f"/repos/{owner}/{name}/pulls/{pr_number}/reviews",
params={"page": page, "limit": limit},
)
if response.status_code == 404:
return reviews
response.raise_for_status()
payload = response.json()
if not payload:
break
reviews.extend(pull_review_from_payload(item) for item in payload)
if len(payload) < limit:
break
page += 1
return reviews
def list_pull_request_review_comments(
self,
*,
owner: str,
name: str,
pr_number: int,
review_id: int,
) -> list[GiteaComment]:
comments: list[GiteaComment] = []
page = 1
limit = 50
while True:
response = self.client.get(
f"/repos/{owner}/{name}/pulls/{pr_number}/reviews/{review_id}/comments",
params={"page": page, "limit": limit},
)
if response.status_code == 404:
return comments
response.raise_for_status()
payload = response.json()
if not payload:
break
comments.extend(review_comment_from_payload(item) for item in payload)
if len(payload) < limit:
break
page += 1
return comments
def close_issue(self, *, owner: str, name: str, issue_number: int) -> None:
response = self.client.patch(
f"/repos/{owner}/{name}/issues/{issue_number}",
@@ -225,6 +285,20 @@ def pull_request_from_payload(payload: dict[str, Any]) -> GiteaPullRequest:
)
def pull_review_from_payload(payload: dict[str, Any]) -> GiteaPullReview:
user_payload = payload.get("user") or {}
author = user_payload.get("login") or user_payload.get("username") or ""
return GiteaPullReview(
id=int(payload["id"]),
body=payload.get("body") or "",
author=str(author),
state=payload.get("state") or "",
html_url=payload.get("html_url") or payload.get("url") or "",
submitted_at=parse_gitea_dt(payload.get("submitted_at")),
updated_at=parse_gitea_dt(payload.get("updated_at")),
)
def comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
user_payload = payload.get("user") or payload.get("poster") or {}
author = user_payload.get("login") or user_payload.get("username") or ""
@@ -238,6 +312,29 @@ def comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
)
def review_comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
comment = comment_from_payload(payload)
path = payload.get("path")
position = payload.get("position") or payload.get("original_position")
location_parts = []
if path:
location_parts.append(str(path))
if position:
location_parts.append(f"line {position}")
if not location_parts:
return comment
location = ":".join(location_parts)
body = f"Inline comment on {location}\n\n{comment.body}"
return GiteaComment(
id=comment.id,
body=body,
author=comment.author,
html_url=comment.html_url,
created_at=comment.created_at,
updated_at=comment.updated_at,
)
def parse_gitea_dt(value: str | None) -> datetime | None:
if not value:
return None

View File

@@ -3,12 +3,13 @@ from __future__ import annotations
import socket
import time
import logging
from dataclasses import dataclass
from pathlib import Path
from .agents import CommandRunner, read_report, render_command, write_prompt
from .config import AppConfig
from .db import Database
from .gitea import GiteaClient, GiteaComment
from .db import Database, PullRequestFeedbackCursor
from .gitea import GiteaClient, GiteaComment, GiteaPullReview
from .models import IssueRecord, RepositoryRecord, TaskRecord, TaskState
from .rendering import (
parse_review_report,
@@ -26,6 +27,13 @@ from .workspace import WorkspaceManager, safe_branch_name
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class PullRequestFeedbackSnapshot:
human_comments: list[GiteaComment]
handled_cursor: PullRequestFeedbackCursor
newest_cursor: PullRequestFeedbackCursor
def sync_repositories(db: Database, config: AppConfig, client: GiteaClient) -> list[RepositoryRecord]:
synced: list[RepositoryRecord] = []
discovered = client.list_owned_repositories()
@@ -106,23 +114,21 @@ def scan_pull_request_feedback(db: Database, client: GiteaClient) -> int:
pull_request = client.get_pull_request(owner=repo.owner, name=repo.name, pr_number=task.pr_number)
if pull_request.merged or pull_request.state != "open":
continue
comments = client.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=task.pr_number)
last_seen_comment_id = db.get_pr_feedback_cursor(task.id)
newest_comment_id = max((comment.id for comment in comments), default=last_seen_comment_id)
human_comments = [
comment
for comment in comments
if comment.id > last_seen_comment_id and comment.author.casefold() != username.casefold()
]
if not human_comments:
if newest_comment_id > last_seen_comment_id:
db.upsert_pr_feedback_state(task.id, last_seen_comment_id=newest_comment_id)
cursors = db.get_pr_feedback_cursors(task.id)
feedback = load_new_human_pr_feedback(client, repo, task.pr_number, username, cursors)
if not feedback.human_comments:
db.upsert_pr_feedback_state(
task.id,
last_seen_comment_id=feedback.newest_cursor.last_seen_comment_id,
last_seen_review_id=feedback.newest_cursor.last_seen_review_id,
last_seen_review_comment_id=feedback.newest_cursor.last_seen_review_comment_id,
)
continue
db.mark_pr_feedback_pending(task.id)
db.transition(
task.id,
TaskState.DISCOVERED,
message=f"queued PR feedback from {len(human_comments)} human comment(s)",
message=f"queued PR feedback from {len(feedback.human_comments)} human comment(s)",
clear_lease=True,
)
queued += 1
@@ -256,6 +262,7 @@ class TaskRunner:
message=f"human review summary posted with verdict {review.verdict}",
clear_lease=True,
)
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
def _run_pr_feedback(self, task: TaskRecord) -> TaskRecord:
@@ -273,33 +280,36 @@ class TaskRunner:
message=f"running implementer agent for PR #{task.pr_number} feedback",
workspace_path=workspace,
)
last_seen_comment_id = self.db.get_pr_feedback_cursor(task.id)
comments = self.gitea.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=task.pr_number)
cursors = self.db.get_pr_feedback_cursors(task.id)
username = self.gitea.get_authenticated_username()
human_comments = [
comment
for comment in comments
if comment.id > last_seen_comment_id and comment.author.casefold() != username.casefold()
]
if not human_comments:
newest_comment_id = max((comment.id for comment in comments), default=last_seen_comment_id)
self.db.clear_pr_feedback_pending(task.id, last_seen_comment_id=newest_comment_id)
return self.db.transition(
feedback = load_new_human_pr_feedback(self.gitea, repo, task.pr_number, username, cursors)
if not feedback.human_comments:
self.db.clear_pr_feedback_pending(
task.id,
last_seen_comment_id=feedback.newest_cursor.last_seen_comment_id,
last_seen_review_id=feedback.newest_cursor.last_seen_review_id,
last_seen_review_comment_id=feedback.newest_cursor.last_seen_review_comment_id,
)
final_task = self.db.transition(
task.id,
TaskState.HUMAN_REVIEW_READY,
message="PR feedback queue was stale; no new human comments found",
clear_lease=True,
)
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
implementation_report = self._run_pr_feedback_implementer(
task,
repo,
issue,
task.branch_name,
task.pr_number,
human_comments,
feedback.human_comments,
workspace,
)
task = self.db.transition(task.id, TaskState.TESTING, message="checking PR feedback diff")
has_code_changes = self.workspace_manager.has_uncommitted_changes(workspace)
if has_code_changes:
commit_message = f"agent: address PR #{task.pr_number} feedback for issue #{issue.issue_number}"
commit_id = self.workspace_manager.commit_changes(workspace, commit_message)
self.db.add_event(task.id, TaskState.TESTING, TaskState.TESTING, f"committed PR feedback update {commit_id}")
@@ -310,6 +320,21 @@ class TaskRunner:
issue_number=task.pr_number,
body=f"## 代理已处理 PR 反馈\n\n{implementation_report.strip()}",
)
self.db.clear_pr_feedback_pending(
task.id,
last_seen_comment_id=feedback.handled_cursor.last_seen_comment_id,
last_seen_review_id=feedback.handled_cursor.last_seen_review_id,
last_seen_review_comment_id=feedback.handled_cursor.last_seen_review_comment_id,
)
if not has_code_changes:
final_task = self.db.transition(
task.id,
TaskState.HUMAN_REVIEW_READY,
message="PR feedback handled without code changes",
clear_lease=True,
)
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
task = self.db.transition(task.id, TaskState.PR_OPENED, message=f"updated PR #{task.pr_number}")
validate_transition(task.state, TaskState.REVIEWING)
task = self.db.transition(task.id, TaskState.REVIEWING, message="running reviewer agent after PR feedback")
@@ -327,17 +352,20 @@ class TaskRunner:
issue_number=task.pr_number,
body=render_human_review_summary(review),
)
newest_comment_id = max(
[comment.id for comment in comments] + [reviewer_comment.id, summary_comment.id],
default=max(reviewer_comment.id, summary_comment.id),
self.db.add_event(
task.id,
TaskState.REVIEWING,
TaskState.REVIEWING,
f"posted reviewer comments {reviewer_comment.id} and {summary_comment.id}",
)
self.db.clear_pr_feedback_pending(task.id, last_seen_comment_id=newest_comment_id)
return self.db.transition(
final_task = self.db.transition(
task.id,
TaskState.HUMAN_REVIEW_READY,
message=f"PR feedback handled; human review summary posted with verdict {review.verdict}",
clear_lease=True,
)
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
def _run_implementer(
self,
@@ -456,6 +484,17 @@ class TaskRunner:
def _load_context(self, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]:
return load_task_context(self.db, task)
def _cleanup_workspace_on_success(self, task: TaskRecord, workspace: Path) -> None:
if not self.config.workspace.cleanup_on_success:
return
try:
self.workspace_manager.cleanup(workspace)
except Exception as exc: # pragma: no cover - defensive best-effort cleanup
logger.warning("failed to cleanup workspace %s for task %d: %s", workspace, task.id, exc)
self.db.add_event(task.id, task.state, task.state, f"workspace cleanup failed: {exc}")
else:
self.db.add_event(task.id, task.state, task.state, f"cleaned workspace {workspace}")
def load_task_context(db: Database, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]:
repo_row = db.conn.execute("SELECT * FROM repositories WHERE id = ?", (task.repo_id,)).fetchone()
@@ -466,3 +505,72 @@ def load_task_context(db: Database, task: TaskRecord) -> tuple[RepositoryRecord,
if issue is None:
raise ValueError(f"issue not found for task {task.id}")
return repo, issue
def load_new_human_pr_feedback(
client: GiteaClient,
repo: RepositoryRecord,
pr_number: int,
username: str,
cursors: PullRequestFeedbackCursor,
) -> PullRequestFeedbackSnapshot:
issue_comments = client.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=pr_number)
reviews = client.list_pull_request_reviews(owner=repo.owner, name=repo.name, pr_number=pr_number)
review_comments: list[GiteaComment] = []
for review in reviews:
review_comments.extend(
client.list_pull_request_review_comments(
owner=repo.owner,
name=repo.name,
pr_number=pr_number,
review_id=review.id,
)
)
bot = username.casefold()
human_issue_comments = [
comment
for comment in issue_comments
if comment.id > cursors.last_seen_comment_id and comment.author.casefold() != bot
]
human_reviews = [
review_as_comment(review)
for review in reviews
if review.id > cursors.last_seen_review_id and review.author.casefold() != bot and review.body.strip()
]
human_review_comments = [
comment
for comment in review_comments
if comment.id > cursors.last_seen_review_comment_id and comment.author.casefold() != bot
]
human_comments = [*human_issue_comments, *human_reviews, *human_review_comments]
handled_cursor = PullRequestFeedbackCursor(
last_seen_comment_id=max((comment.id for comment in human_issue_comments), default=cursors.last_seen_comment_id),
last_seen_review_id=max((review.id for review in reviews if review.id > cursors.last_seen_review_id), default=cursors.last_seen_review_id),
last_seen_review_comment_id=max(
(comment.id for comment in human_review_comments),
default=cursors.last_seen_review_comment_id,
),
)
newest_cursor = PullRequestFeedbackCursor(
last_seen_comment_id=max((comment.id for comment in issue_comments), default=cursors.last_seen_comment_id),
last_seen_review_id=max((review.id for review in reviews), default=cursors.last_seen_review_id),
last_seen_review_comment_id=max(
(comment.id for comment in review_comments),
default=cursors.last_seen_review_comment_id,
),
)
return PullRequestFeedbackSnapshot(human_comments, handled_cursor, newest_cursor)
def review_as_comment(review: GiteaPullReview) -> GiteaComment:
body = f"Pull request review ({review.state or 'COMMENT'})\n\n{review.body}"
return GiteaComment(
id=review.id,
body=body,
author=review.author,
html_url=review.html_url,
created_at=review.submitted_at,
updated_at=review.updated_at,
)

View File

@@ -8,7 +8,13 @@ ALLOWED_TRANSITIONS: dict[TaskState, set[TaskState]] = {
TaskState.CLAIMED: {TaskState.PLANNING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.PLANNING: {TaskState.IMPLEMENTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.IMPLEMENTING: {TaskState.TESTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.TESTING: {TaskState.PR_OPENED, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.TESTING: {
TaskState.PR_OPENED,
TaskState.HUMAN_REVIEW_READY,
TaskState.FAILED,
TaskState.BLOCKED,
TaskState.CANCELLED,
},
TaskState.PR_OPENED: {TaskState.REVIEWING, TaskState.FAILED, TaskState.CANCELLED},
TaskState.REVIEWING: {TaskState.HUMAN_REVIEW_READY, TaskState.FAILED, TaskState.CANCELLED},
TaskState.HUMAN_REVIEW_READY: {TaskState.DISCOVERED},

View File

@@ -77,6 +77,10 @@ class WorkspaceManager:
diff = self._git(["diff", "--quiet", f"{base_ref}...HEAD"], Path(workspace), check=False)
return diff.returncode == 1
def has_uncommitted_changes(self, workspace: str | Path) -> bool:
result = self._git(["status", "--porcelain"], Path(workspace), check=False)
return bool(result.stdout.strip())
def commit_changes(self, workspace: str | Path, message: str) -> str:
workspace_path = Path(workspace)
self._git(["add", "-A"], workspace_path)

View File

@@ -174,6 +174,7 @@ class FakeWorkspaceManager:
self.diff = diff
self.pushed: list[str] = []
self.resumed: list[tuple[str, Path | None]] = []
self.cleaned: list[Path] = []
def prepare(self, repo, issue, branch_name):
path = self.root / branch_name.replace("/", "_")
@@ -189,6 +190,9 @@ class FakeWorkspaceManager:
def has_diff(self, workspace, base_ref="origin/HEAD"):
return self.diff
def has_uncommitted_changes(self, workspace):
return self.diff
def push_branch(self, workspace, branch_name):
self.pushed.append(branch_name)
@@ -197,7 +201,7 @@ class FakeWorkspaceManager:
return "abc1234"
def cleanup(self, workspace):
pass
self.cleaned.append(Path(workspace))
class FakeRunner:
@@ -349,6 +353,55 @@ def test_scan_pull_request_feedback_advances_cursor_without_human_comments(db):
assert db.get_task(task.id).state == TaskState.HUMAN_REVIEW_READY # type: ignore[union-attr]
def test_scan_pull_request_feedback_queues_task_for_inline_review_comment(db):
task = seed_task(db)
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(200, json=[])
if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews":
return httpx.Response(
200,
json=[
{
"id": 9,
"body": "",
"state": "REQUEST_CHANGES",
"user": {"login": "alice"},
}
],
)
if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews/9/comments":
return httpx.Response(
200,
json=[
{
"id": 31,
"body": "This branch misses the cleanup case.",
"path": "src/service.py",
"position": 42,
"user": {"login": "alice"},
}
],
)
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(handler))
cursors = db.get_pr_feedback_cursors(task.id)
assert queued == 1
assert db.has_pending_pr_feedback(task.id)
assert cursors.last_seen_comment_id == 2
assert cursors.last_seen_review_id == 0
assert cursors.last_seen_review_comment_id == 0
def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
config = make_config(tmp_path)
task = seed_task(db)
@@ -360,7 +413,7 @@ def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit()
requests: list[tuple[str, str, dict]] = []
next_comment_id = 3
next_comment_id = 4
def handler(request: httpx.Request) -> httpx.Response:
nonlocal next_comment_id
@@ -400,9 +453,127 @@ def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
assert workspace_manager.commit_message == "agent: address PR #5 feedback for issue #1"
assert not any(path == "/api/v1/repos/acme/service/pulls" for _, path, _ in requests)
assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback", "reviewer"]
assert db.get_pr_feedback_cursor(task.id) == 6
assert db.get_pr_feedback_cursor(task.id) == 3
assert not db.has_pending_pr_feedback(task.id)
def rescan_handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(
200,
json=[
{"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
{"id": 4, "body": "Also cover the cleanup case.", "user": {"login": "alice"}},
{"id": 5, "body": "processed", "user": {"login": "agent-bot"}},
{"id": 6, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 7, "body": "summary", "user": {"login": "agent-bot"}},
],
)
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(rescan_handler))
assert queued == 1
assert db.has_pending_pr_feedback(task.id)
assert db.get_pr_feedback_cursor(task.id) == 3
def test_run_task_with_pending_pr_feedback_allows_no_code_changes(db, tmp_path):
config = make_config(tmp_path)
task = seed_task(db)
workspace_path = tmp_path / "work" / "existing"
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
db.mark_pr_feedback_pending(task.id)
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit()
requests: list[tuple[str, str, dict]] = []
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
return httpx.Response(
200,
json=[
{"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
{"id": 3, "body": "Can you confirm why this is enough?", "user": {"login": "alice"}},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}})
return httpx.Response(404)
workspace_manager = FakeWorkspaceManager(tmp_path / "work", diff=False)
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=workspace_manager,
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert workspace_manager.pushed == []
assert not hasattr(workspace_manager, "commit_message")
assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback"]
assert db.get_pr_feedback_cursor(task.id) == 3
assert not db.has_pending_pr_feedback(task.id)
def test_success_cleanup_does_not_block_later_pr_feedback(db, tmp_path):
config = make_config(
tmp_path,
workspace={"root": tmp_path / "workspaces", "cleanup_on_success": True},
)
task = seed_task(db)
workspace_path = tmp_path / "work" / "existing"
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
db.mark_pr_feedback_pending(task.id)
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit()
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
return httpx.Response(
200,
json=[
{"id": 3, "body": "Please address the review.", "user": {"login": "alice"}},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}})
return httpx.Response(404)
workspace_manager = FakeWorkspaceManager(tmp_path / "work")
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=workspace_manager,
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert workspace_manager.resumed == [("agent/issue-1-ready-issue", workspace_path)]
assert workspace_manager.cleaned == [workspace_path]
def test_close_issues_for_merged_pull_requests_closes_linked_issue(db):
repo = db.upsert_repository(