代理实现:Fix: 修复 comment 的 bug #8

Merged
gahow merged 1 commits from agent/issue-7-fix-comment-bug into main 2026-05-06 09:26:53 +00:00
6 changed files with 516 additions and 47 deletions

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import json import json
import sqlite3 import sqlite3
from dataclasses import dataclass
from datetime import timedelta from datetime import timedelta
from pathlib import Path from pathlib import Path
from typing import Iterable from typing import Iterable
@@ -87,12 +88,21 @@ CREATE TABLE IF NOT EXISTS agent_runs (
CREATE TABLE IF NOT EXISTS pull_request_feedback ( CREATE TABLE IF NOT EXISTS pull_request_feedback (
task_id INTEGER PRIMARY KEY REFERENCES tasks(id), task_id INTEGER PRIMARY KEY REFERENCES tasks(id),
last_seen_comment_id INTEGER NOT NULL DEFAULT 0, last_seen_comment_id INTEGER NOT NULL DEFAULT 0,
last_seen_review_id INTEGER NOT NULL DEFAULT 0,
last_seen_review_comment_id INTEGER NOT NULL DEFAULT 0,
pending INTEGER NOT NULL DEFAULT 0, pending INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL updated_at TEXT NOT NULL
); );
""" """
@dataclass(frozen=True)
class PullRequestFeedbackCursor:
last_seen_comment_id: int = 0
last_seen_review_id: int = 0
last_seen_review_comment_id: int = 0
class Database: class Database:
def __init__(self, path: str | Path): def __init__(self, path: str | Path):
self.path = Path(path) self.path = Path(path)
@@ -106,8 +116,21 @@ class Database:
def migrate(self) -> None: def migrate(self) -> None:
self.conn.executescript(SCHEMA) self.conn.executescript(SCHEMA)
self._migrate_pull_request_feedback_columns()
self.conn.commit() self.conn.commit()
def _migrate_pull_request_feedback_columns(self) -> None:
rows = self.conn.execute("PRAGMA table_info(pull_request_feedback)").fetchall()
columns = {row["name"] for row in rows}
if "last_seen_review_id" not in columns:
self.conn.execute(
"ALTER TABLE pull_request_feedback ADD COLUMN last_seen_review_id INTEGER NOT NULL DEFAULT 0"
)
if "last_seen_review_comment_id" not in columns:
self.conn.execute(
"ALTER TABLE pull_request_feedback ADD COLUMN last_seen_review_comment_id INTEGER NOT NULL DEFAULT 0"
)
def upsert_repository( def upsert_repository(
self, self,
*, *,
@@ -298,44 +321,104 @@ class Database:
).fetchone() ).fetchone()
return int(row["last_seen_comment_id"]) if row else 0 return int(row["last_seen_comment_id"]) if row else 0
def get_pr_feedback_cursors(self, task_id: int) -> PullRequestFeedbackCursor:
row = self.conn.execute(
"""
SELECT last_seen_comment_id, last_seen_review_id, last_seen_review_comment_id
FROM pull_request_feedback
WHERE task_id = ?
""",
(task_id,),
).fetchone()
if row is None:
return PullRequestFeedbackCursor()
return PullRequestFeedbackCursor(
last_seen_comment_id=int(row["last_seen_comment_id"]),
last_seen_review_id=int(row["last_seen_review_id"]),
last_seen_review_comment_id=int(row["last_seen_review_comment_id"]),
)
def upsert_pr_feedback_state( def upsert_pr_feedback_state(
self, self,
task_id: int, task_id: int,
*, *,
last_seen_comment_id: int | None = None, last_seen_comment_id: int | None = None,
last_seen_review_id: int | None = None,
last_seen_review_comment_id: int | None = None,
pending: bool | None = None, pending: bool | None = None,
) -> None: ) -> None:
row = self.conn.execute( row = self.conn.execute(
"SELECT last_seen_comment_id, pending FROM pull_request_feedback WHERE task_id = ?", """
SELECT last_seen_comment_id, last_seen_review_id, last_seen_review_comment_id, pending
FROM pull_request_feedback
WHERE task_id = ?
""",
(task_id,), (task_id,),
).fetchone() ).fetchone()
now = dt_to_db(utcnow()) now = dt_to_db(utcnow())
if row is None: if row is None:
self.conn.execute( self.conn.execute(
""" """
INSERT INTO pull_request_feedback (task_id, last_seen_comment_id, pending, updated_at) INSERT INTO pull_request_feedback (
VALUES (?, ?, ?, ?) task_id,
last_seen_comment_id,
last_seen_review_id,
last_seen_review_comment_id,
pending,
updated_at
)
VALUES (?, ?, ?, ?, ?, ?)
""", """,
(task_id, last_seen_comment_id or 0, int(bool(pending)), now), (
task_id,
last_seen_comment_id or 0,
last_seen_review_id or 0,
last_seen_review_comment_id or 0,
int(bool(pending)),
now,
),
) )
else: else:
next_cursor = int(row["last_seen_comment_id"]) if last_seen_comment_id is None else last_seen_comment_id next_cursor = int(row["last_seen_comment_id"]) if last_seen_comment_id is None else last_seen_comment_id
next_review_cursor = int(row["last_seen_review_id"]) if last_seen_review_id is None else last_seen_review_id
next_review_comment_cursor = (
int(row["last_seen_review_comment_id"])
if last_seen_review_comment_id is None
else last_seen_review_comment_id
)
next_pending = bool(row["pending"]) if pending is None else pending next_pending = bool(row["pending"]) if pending is None else pending
self.conn.execute( self.conn.execute(
""" """
UPDATE pull_request_feedback UPDATE pull_request_feedback
SET last_seen_comment_id = ?, pending = ?, updated_at = ? SET last_seen_comment_id = ?,
last_seen_review_id = ?,
last_seen_review_comment_id = ?,
pending = ?,
updated_at = ?
WHERE task_id = ? WHERE task_id = ?
""", """,
(next_cursor, int(next_pending), now, task_id), (next_cursor, next_review_cursor, next_review_comment_cursor, int(next_pending), now, task_id),
) )
self.conn.commit() self.conn.commit()
def mark_pr_feedback_pending(self, task_id: int) -> None: def mark_pr_feedback_pending(self, task_id: int) -> None:
self.upsert_pr_feedback_state(task_id, pending=True) self.upsert_pr_feedback_state(task_id, pending=True)
def clear_pr_feedback_pending(self, task_id: int, *, last_seen_comment_id: int) -> None: def clear_pr_feedback_pending(
self.upsert_pr_feedback_state(task_id, last_seen_comment_id=last_seen_comment_id, pending=False) self,
task_id: int,
*,
last_seen_comment_id: int | None = None,
last_seen_review_id: int | None = None,
last_seen_review_comment_id: int | None = None,
) -> None:
self.upsert_pr_feedback_state(
task_id,
last_seen_comment_id=last_seen_comment_id,
last_seen_review_id=last_seen_review_id,
last_seen_review_comment_id=last_seen_review_comment_id,
pending=False,
)
def has_pending_pr_feedback(self, task_id: int) -> bool: def has_pending_pr_feedback(self, task_id: int) -> bool:
row = self.conn.execute( row = self.conn.execute(

View File

@@ -38,6 +38,17 @@ class GiteaComment:
updated_at: datetime | None updated_at: datetime | None
@dataclass(frozen=True)
class GiteaPullReview:
id: int
body: str
author: str
state: str
html_url: str
submitted_at: datetime | None
updated_at: datetime | None
@dataclass(frozen=True) @dataclass(frozen=True)
class GiteaRepository: class GiteaRepository:
owner: str owner: str
@@ -178,6 +189,55 @@ class GiteaClient:
response.raise_for_status() response.raise_for_status()
return comment_from_payload(response.json()) return comment_from_payload(response.json())
def list_pull_request_reviews(self, *, owner: str, name: str, pr_number: int) -> list[GiteaPullReview]:
reviews: list[GiteaPullReview] = []
page = 1
limit = 50
while True:
response = self.client.get(
f"/repos/{owner}/{name}/pulls/{pr_number}/reviews",
params={"page": page, "limit": limit},
)
if response.status_code == 404:
return reviews
response.raise_for_status()
payload = response.json()
if not payload:
break
reviews.extend(pull_review_from_payload(item) for item in payload)
if len(payload) < limit:
break
page += 1
return reviews
def list_pull_request_review_comments(
self,
*,
owner: str,
name: str,
pr_number: int,
review_id: int,
) -> list[GiteaComment]:
comments: list[GiteaComment] = []
page = 1
limit = 50
while True:
response = self.client.get(
f"/repos/{owner}/{name}/pulls/{pr_number}/reviews/{review_id}/comments",
params={"page": page, "limit": limit},
)
if response.status_code == 404:
return comments
response.raise_for_status()
payload = response.json()
if not payload:
break
comments.extend(review_comment_from_payload(item) for item in payload)
if len(payload) < limit:
break
page += 1
return comments
def close_issue(self, *, owner: str, name: str, issue_number: int) -> None: def close_issue(self, *, owner: str, name: str, issue_number: int) -> None:
response = self.client.patch( response = self.client.patch(
f"/repos/{owner}/{name}/issues/{issue_number}", f"/repos/{owner}/{name}/issues/{issue_number}",
@@ -225,6 +285,20 @@ def pull_request_from_payload(payload: dict[str, Any]) -> GiteaPullRequest:
) )
def pull_review_from_payload(payload: dict[str, Any]) -> GiteaPullReview:
user_payload = payload.get("user") or {}
author = user_payload.get("login") or user_payload.get("username") or ""
return GiteaPullReview(
id=int(payload["id"]),
body=payload.get("body") or "",
author=str(author),
state=payload.get("state") or "",
html_url=payload.get("html_url") or payload.get("url") or "",
submitted_at=parse_gitea_dt(payload.get("submitted_at")),
updated_at=parse_gitea_dt(payload.get("updated_at")),
)
def comment_from_payload(payload: dict[str, Any]) -> GiteaComment: def comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
user_payload = payload.get("user") or payload.get("poster") or {} user_payload = payload.get("user") or payload.get("poster") or {}
author = user_payload.get("login") or user_payload.get("username") or "" author = user_payload.get("login") or user_payload.get("username") or ""
@@ -238,6 +312,29 @@ def comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
) )
def review_comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
comment = comment_from_payload(payload)
path = payload.get("path")
position = payload.get("position") or payload.get("original_position")
location_parts = []
if path:
location_parts.append(str(path))
if position:
location_parts.append(f"line {position}")
if not location_parts:
return comment
location = ":".join(location_parts)
body = f"Inline comment on {location}\n\n{comment.body}"
return GiteaComment(
id=comment.id,
body=body,
author=comment.author,
html_url=comment.html_url,
created_at=comment.created_at,
updated_at=comment.updated_at,
)
def parse_gitea_dt(value: str | None) -> datetime | None: def parse_gitea_dt(value: str | None) -> datetime | None:
if not value: if not value:
return None return None

View File

@@ -3,12 +3,13 @@ from __future__ import annotations
import socket import socket
import time import time
import logging import logging
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from .agents import CommandRunner, read_report, render_command, write_prompt from .agents import CommandRunner, read_report, render_command, write_prompt
from .config import AppConfig from .config import AppConfig
from .db import Database from .db import Database, PullRequestFeedbackCursor
from .gitea import GiteaClient, GiteaComment from .gitea import GiteaClient, GiteaComment, GiteaPullReview
from .models import IssueRecord, RepositoryRecord, TaskRecord, TaskState from .models import IssueRecord, RepositoryRecord, TaskRecord, TaskState
from .rendering import ( from .rendering import (
parse_review_report, parse_review_report,
@@ -26,6 +27,13 @@ from .workspace import WorkspaceManager, safe_branch_name
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class PullRequestFeedbackSnapshot:
human_comments: list[GiteaComment]
handled_cursor: PullRequestFeedbackCursor
newest_cursor: PullRequestFeedbackCursor
def sync_repositories(db: Database, config: AppConfig, client: GiteaClient) -> list[RepositoryRecord]: def sync_repositories(db: Database, config: AppConfig, client: GiteaClient) -> list[RepositoryRecord]:
synced: list[RepositoryRecord] = [] synced: list[RepositoryRecord] = []
discovered = client.list_owned_repositories() discovered = client.list_owned_repositories()
@@ -106,23 +114,21 @@ def scan_pull_request_feedback(db: Database, client: GiteaClient) -> int:
pull_request = client.get_pull_request(owner=repo.owner, name=repo.name, pr_number=task.pr_number) pull_request = client.get_pull_request(owner=repo.owner, name=repo.name, pr_number=task.pr_number)
if pull_request.merged or pull_request.state != "open": if pull_request.merged or pull_request.state != "open":
continue continue
comments = client.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=task.pr_number) cursors = db.get_pr_feedback_cursors(task.id)
last_seen_comment_id = db.get_pr_feedback_cursor(task.id) feedback = load_new_human_pr_feedback(client, repo, task.pr_number, username, cursors)
newest_comment_id = max((comment.id for comment in comments), default=last_seen_comment_id) if not feedback.human_comments:
human_comments = [ db.upsert_pr_feedback_state(
comment task.id,
for comment in comments last_seen_comment_id=feedback.newest_cursor.last_seen_comment_id,
if comment.id > last_seen_comment_id and comment.author.casefold() != username.casefold() last_seen_review_id=feedback.newest_cursor.last_seen_review_id,
] last_seen_review_comment_id=feedback.newest_cursor.last_seen_review_comment_id,
if not human_comments: )
if newest_comment_id > last_seen_comment_id:
db.upsert_pr_feedback_state(task.id, last_seen_comment_id=newest_comment_id)
continue continue
db.mark_pr_feedback_pending(task.id) db.mark_pr_feedback_pending(task.id)
db.transition( db.transition(
task.id, task.id,
TaskState.DISCOVERED, TaskState.DISCOVERED,
message=f"queued PR feedback from {len(human_comments)} human comment(s)", message=f"queued PR feedback from {len(feedback.human_comments)} human comment(s)",
clear_lease=True, clear_lease=True,
) )
queued += 1 queued += 1
@@ -256,6 +262,7 @@ class TaskRunner:
message=f"human review summary posted with verdict {review.verdict}", message=f"human review summary posted with verdict {review.verdict}",
clear_lease=True, clear_lease=True,
) )
self._cleanup_workspace_on_success(final_task, workspace)
return final_task return final_task
def _run_pr_feedback(self, task: TaskRecord) -> TaskRecord: def _run_pr_feedback(self, task: TaskRecord) -> TaskRecord:
@@ -273,33 +280,36 @@ class TaskRunner:
message=f"running implementer agent for PR #{task.pr_number} feedback", message=f"running implementer agent for PR #{task.pr_number} feedback",
workspace_path=workspace, workspace_path=workspace,
) )
last_seen_comment_id = self.db.get_pr_feedback_cursor(task.id) cursors = self.db.get_pr_feedback_cursors(task.id)
comments = self.gitea.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=task.pr_number)
username = self.gitea.get_authenticated_username() username = self.gitea.get_authenticated_username()
human_comments = [ feedback = load_new_human_pr_feedback(self.gitea, repo, task.pr_number, username, cursors)
comment if not feedback.human_comments:
for comment in comments self.db.clear_pr_feedback_pending(
if comment.id > last_seen_comment_id and comment.author.casefold() != username.casefold() task.id,
] last_seen_comment_id=feedback.newest_cursor.last_seen_comment_id,
if not human_comments: last_seen_review_id=feedback.newest_cursor.last_seen_review_id,
newest_comment_id = max((comment.id for comment in comments), default=last_seen_comment_id) last_seen_review_comment_id=feedback.newest_cursor.last_seen_review_comment_id,
self.db.clear_pr_feedback_pending(task.id, last_seen_comment_id=newest_comment_id) )
return self.db.transition( final_task = self.db.transition(
task.id, task.id,
TaskState.HUMAN_REVIEW_READY, TaskState.HUMAN_REVIEW_READY,
message="PR feedback queue was stale; no new human comments found", message="PR feedback queue was stale; no new human comments found",
clear_lease=True, clear_lease=True,
) )
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
implementation_report = self._run_pr_feedback_implementer( implementation_report = self._run_pr_feedback_implementer(
task, task,
repo, repo,
issue, issue,
task.branch_name, task.branch_name,
task.pr_number, task.pr_number,
human_comments, feedback.human_comments,
workspace, workspace,
) )
task = self.db.transition(task.id, TaskState.TESTING, message="checking PR feedback diff") task = self.db.transition(task.id, TaskState.TESTING, message="checking PR feedback diff")
has_code_changes = self.workspace_manager.has_uncommitted_changes(workspace)
if has_code_changes:
commit_message = f"agent: address PR #{task.pr_number} feedback for issue #{issue.issue_number}" commit_message = f"agent: address PR #{task.pr_number} feedback for issue #{issue.issue_number}"
commit_id = self.workspace_manager.commit_changes(workspace, commit_message) commit_id = self.workspace_manager.commit_changes(workspace, commit_message)
self.db.add_event(task.id, TaskState.TESTING, TaskState.TESTING, f"committed PR feedback update {commit_id}") self.db.add_event(task.id, TaskState.TESTING, TaskState.TESTING, f"committed PR feedback update {commit_id}")
@@ -310,6 +320,21 @@ class TaskRunner:
issue_number=task.pr_number, issue_number=task.pr_number,
body=f"## 代理已处理 PR 反馈\n\n{implementation_report.strip()}", body=f"## 代理已处理 PR 反馈\n\n{implementation_report.strip()}",
) )
self.db.clear_pr_feedback_pending(
task.id,
last_seen_comment_id=feedback.handled_cursor.last_seen_comment_id,
last_seen_review_id=feedback.handled_cursor.last_seen_review_id,
last_seen_review_comment_id=feedback.handled_cursor.last_seen_review_comment_id,
)
if not has_code_changes:
final_task = self.db.transition(
task.id,
TaskState.HUMAN_REVIEW_READY,
message="PR feedback handled without code changes",
clear_lease=True,
)
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
task = self.db.transition(task.id, TaskState.PR_OPENED, message=f"updated PR #{task.pr_number}") task = self.db.transition(task.id, TaskState.PR_OPENED, message=f"updated PR #{task.pr_number}")
validate_transition(task.state, TaskState.REVIEWING) validate_transition(task.state, TaskState.REVIEWING)
task = self.db.transition(task.id, TaskState.REVIEWING, message="running reviewer agent after PR feedback") task = self.db.transition(task.id, TaskState.REVIEWING, message="running reviewer agent after PR feedback")
@@ -327,17 +352,20 @@ class TaskRunner:
issue_number=task.pr_number, issue_number=task.pr_number,
body=render_human_review_summary(review), body=render_human_review_summary(review),
) )
newest_comment_id = max( self.db.add_event(
[comment.id for comment in comments] + [reviewer_comment.id, summary_comment.id], task.id,
default=max(reviewer_comment.id, summary_comment.id), TaskState.REVIEWING,
TaskState.REVIEWING,
f"posted reviewer comments {reviewer_comment.id} and {summary_comment.id}",
) )
self.db.clear_pr_feedback_pending(task.id, last_seen_comment_id=newest_comment_id) final_task = self.db.transition(
return self.db.transition(
task.id, task.id,
TaskState.HUMAN_REVIEW_READY, TaskState.HUMAN_REVIEW_READY,
message=f"PR feedback handled; human review summary posted with verdict {review.verdict}", message=f"PR feedback handled; human review summary posted with verdict {review.verdict}",
clear_lease=True, clear_lease=True,
) )
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
def _run_implementer( def _run_implementer(
self, self,
@@ -456,6 +484,17 @@ class TaskRunner:
def _load_context(self, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]: def _load_context(self, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]:
return load_task_context(self.db, task) return load_task_context(self.db, task)
def _cleanup_workspace_on_success(self, task: TaskRecord, workspace: Path) -> None:
if not self.config.workspace.cleanup_on_success:
return
try:
self.workspace_manager.cleanup(workspace)
except Exception as exc: # pragma: no cover - defensive best-effort cleanup
logger.warning("failed to cleanup workspace %s for task %d: %s", workspace, task.id, exc)
self.db.add_event(task.id, task.state, task.state, f"workspace cleanup failed: {exc}")
else:
self.db.add_event(task.id, task.state, task.state, f"cleaned workspace {workspace}")
def load_task_context(db: Database, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]: def load_task_context(db: Database, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]:
repo_row = db.conn.execute("SELECT * FROM repositories WHERE id = ?", (task.repo_id,)).fetchone() repo_row = db.conn.execute("SELECT * FROM repositories WHERE id = ?", (task.repo_id,)).fetchone()
@@ -466,3 +505,72 @@ def load_task_context(db: Database, task: TaskRecord) -> tuple[RepositoryRecord,
if issue is None: if issue is None:
raise ValueError(f"issue not found for task {task.id}") raise ValueError(f"issue not found for task {task.id}")
return repo, issue return repo, issue
def load_new_human_pr_feedback(
client: GiteaClient,
repo: RepositoryRecord,
pr_number: int,
username: str,
cursors: PullRequestFeedbackCursor,
) -> PullRequestFeedbackSnapshot:
issue_comments = client.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=pr_number)
reviews = client.list_pull_request_reviews(owner=repo.owner, name=repo.name, pr_number=pr_number)
review_comments: list[GiteaComment] = []
for review in reviews:
review_comments.extend(
client.list_pull_request_review_comments(
owner=repo.owner,
name=repo.name,
pr_number=pr_number,
review_id=review.id,
)
)
bot = username.casefold()
human_issue_comments = [
comment
for comment in issue_comments
if comment.id > cursors.last_seen_comment_id and comment.author.casefold() != bot
]
human_reviews = [
review_as_comment(review)
for review in reviews
if review.id > cursors.last_seen_review_id and review.author.casefold() != bot and review.body.strip()
]
human_review_comments = [
comment
for comment in review_comments
if comment.id > cursors.last_seen_review_comment_id and comment.author.casefold() != bot
]
human_comments = [*human_issue_comments, *human_reviews, *human_review_comments]
handled_cursor = PullRequestFeedbackCursor(
last_seen_comment_id=max((comment.id for comment in human_issue_comments), default=cursors.last_seen_comment_id),
last_seen_review_id=max((review.id for review in reviews if review.id > cursors.last_seen_review_id), default=cursors.last_seen_review_id),
last_seen_review_comment_id=max(
(comment.id for comment in human_review_comments),
default=cursors.last_seen_review_comment_id,
),
)
newest_cursor = PullRequestFeedbackCursor(
last_seen_comment_id=max((comment.id for comment in issue_comments), default=cursors.last_seen_comment_id),
last_seen_review_id=max((review.id for review in reviews), default=cursors.last_seen_review_id),
last_seen_review_comment_id=max(
(comment.id for comment in review_comments),
default=cursors.last_seen_review_comment_id,
),
)
return PullRequestFeedbackSnapshot(human_comments, handled_cursor, newest_cursor)
def review_as_comment(review: GiteaPullReview) -> GiteaComment:
body = f"Pull request review ({review.state or 'COMMENT'})\n\n{review.body}"
return GiteaComment(
id=review.id,
body=body,
author=review.author,
html_url=review.html_url,
created_at=review.submitted_at,
updated_at=review.updated_at,
)

View File

@@ -8,7 +8,13 @@ ALLOWED_TRANSITIONS: dict[TaskState, set[TaskState]] = {
TaskState.CLAIMED: {TaskState.PLANNING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED}, TaskState.CLAIMED: {TaskState.PLANNING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.PLANNING: {TaskState.IMPLEMENTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED}, TaskState.PLANNING: {TaskState.IMPLEMENTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.IMPLEMENTING: {TaskState.TESTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED}, TaskState.IMPLEMENTING: {TaskState.TESTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.TESTING: {TaskState.PR_OPENED, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED}, TaskState.TESTING: {
TaskState.PR_OPENED,
TaskState.HUMAN_REVIEW_READY,
TaskState.FAILED,
TaskState.BLOCKED,
TaskState.CANCELLED,
},
TaskState.PR_OPENED: {TaskState.REVIEWING, TaskState.FAILED, TaskState.CANCELLED}, TaskState.PR_OPENED: {TaskState.REVIEWING, TaskState.FAILED, TaskState.CANCELLED},
TaskState.REVIEWING: {TaskState.HUMAN_REVIEW_READY, TaskState.FAILED, TaskState.CANCELLED}, TaskState.REVIEWING: {TaskState.HUMAN_REVIEW_READY, TaskState.FAILED, TaskState.CANCELLED},
TaskState.HUMAN_REVIEW_READY: {TaskState.DISCOVERED}, TaskState.HUMAN_REVIEW_READY: {TaskState.DISCOVERED},

View File

@@ -77,6 +77,10 @@ class WorkspaceManager:
diff = self._git(["diff", "--quiet", f"{base_ref}...HEAD"], Path(workspace), check=False) diff = self._git(["diff", "--quiet", f"{base_ref}...HEAD"], Path(workspace), check=False)
return diff.returncode == 1 return diff.returncode == 1
def has_uncommitted_changes(self, workspace: str | Path) -> bool:
result = self._git(["status", "--porcelain"], Path(workspace), check=False)
return bool(result.stdout.strip())
def commit_changes(self, workspace: str | Path, message: str) -> str: def commit_changes(self, workspace: str | Path, message: str) -> str:
workspace_path = Path(workspace) workspace_path = Path(workspace)
self._git(["add", "-A"], workspace_path) self._git(["add", "-A"], workspace_path)

View File

@@ -174,6 +174,7 @@ class FakeWorkspaceManager:
self.diff = diff self.diff = diff
self.pushed: list[str] = [] self.pushed: list[str] = []
self.resumed: list[tuple[str, Path | None]] = [] self.resumed: list[tuple[str, Path | None]] = []
self.cleaned: list[Path] = []
def prepare(self, repo, issue, branch_name): def prepare(self, repo, issue, branch_name):
path = self.root / branch_name.replace("/", "_") path = self.root / branch_name.replace("/", "_")
@@ -189,6 +190,9 @@ class FakeWorkspaceManager:
def has_diff(self, workspace, base_ref="origin/HEAD"): def has_diff(self, workspace, base_ref="origin/HEAD"):
return self.diff return self.diff
def has_uncommitted_changes(self, workspace):
return self.diff
def push_branch(self, workspace, branch_name): def push_branch(self, workspace, branch_name):
self.pushed.append(branch_name) self.pushed.append(branch_name)
@@ -197,7 +201,7 @@ class FakeWorkspaceManager:
return "abc1234" return "abc1234"
def cleanup(self, workspace): def cleanup(self, workspace):
pass self.cleaned.append(Path(workspace))
class FakeRunner: class FakeRunner:
@@ -349,6 +353,55 @@ def test_scan_pull_request_feedback_advances_cursor_without_human_comments(db):
assert db.get_task(task.id).state == TaskState.HUMAN_REVIEW_READY # type: ignore[union-attr] assert db.get_task(task.id).state == TaskState.HUMAN_REVIEW_READY # type: ignore[union-attr]
def test_scan_pull_request_feedback_queues_task_for_inline_review_comment(db):
task = seed_task(db)
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(200, json=[])
if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews":
return httpx.Response(
200,
json=[
{
"id": 9,
"body": "",
"state": "REQUEST_CHANGES",
"user": {"login": "alice"},
}
],
)
if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews/9/comments":
return httpx.Response(
200,
json=[
{
"id": 31,
"body": "This branch misses the cleanup case.",
"path": "src/service.py",
"position": 42,
"user": {"login": "alice"},
}
],
)
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(handler))
cursors = db.get_pr_feedback_cursors(task.id)
assert queued == 1
assert db.has_pending_pr_feedback(task.id)
assert cursors.last_seen_comment_id == 2
assert cursors.last_seen_review_id == 0
assert cursors.last_seen_review_comment_id == 0
def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path): def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
config = make_config(tmp_path) config = make_config(tmp_path)
task = seed_task(db) task = seed_task(db)
@@ -360,7 +413,7 @@ def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id)) db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit() db.conn.commit()
requests: list[tuple[str, str, dict]] = [] requests: list[tuple[str, str, dict]] = []
next_comment_id = 3 next_comment_id = 4
def handler(request: httpx.Request) -> httpx.Response: def handler(request: httpx.Request) -> httpx.Response:
nonlocal next_comment_id nonlocal next_comment_id
@@ -400,9 +453,127 @@ def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
assert workspace_manager.commit_message == "agent: address PR #5 feedback for issue #1" assert workspace_manager.commit_message == "agent: address PR #5 feedback for issue #1"
assert not any(path == "/api/v1/repos/acme/service/pulls" for _, path, _ in requests) assert not any(path == "/api/v1/repos/acme/service/pulls" for _, path, _ in requests)
assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback", "reviewer"] assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback", "reviewer"]
assert db.get_pr_feedback_cursor(task.id) == 6 assert db.get_pr_feedback_cursor(task.id) == 3
assert not db.has_pending_pr_feedback(task.id) assert not db.has_pending_pr_feedback(task.id)
def rescan_handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(
200,
json=[
{"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
{"id": 4, "body": "Also cover the cleanup case.", "user": {"login": "alice"}},
{"id": 5, "body": "processed", "user": {"login": "agent-bot"}},
{"id": 6, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 7, "body": "summary", "user": {"login": "agent-bot"}},
],
)
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(rescan_handler))
assert queued == 1
assert db.has_pending_pr_feedback(task.id)
assert db.get_pr_feedback_cursor(task.id) == 3
def test_run_task_with_pending_pr_feedback_allows_no_code_changes(db, tmp_path):
config = make_config(tmp_path)
task = seed_task(db)
workspace_path = tmp_path / "work" / "existing"
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
db.mark_pr_feedback_pending(task.id)
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit()
requests: list[tuple[str, str, dict]] = []
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
return httpx.Response(
200,
json=[
{"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
{"id": 3, "body": "Can you confirm why this is enough?", "user": {"login": "alice"}},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}})
return httpx.Response(404)
workspace_manager = FakeWorkspaceManager(tmp_path / "work", diff=False)
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=workspace_manager,
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert workspace_manager.pushed == []
assert not hasattr(workspace_manager, "commit_message")
assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback"]
assert db.get_pr_feedback_cursor(task.id) == 3
assert not db.has_pending_pr_feedback(task.id)
def test_success_cleanup_does_not_block_later_pr_feedback(db, tmp_path):
config = make_config(
tmp_path,
workspace={"root": tmp_path / "workspaces", "cleanup_on_success": True},
)
task = seed_task(db)
workspace_path = tmp_path / "work" / "existing"
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
db.mark_pr_feedback_pending(task.id)
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit()
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
return httpx.Response(
200,
json=[
{"id": 3, "body": "Please address the review.", "user": {"login": "alice"}},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}})
return httpx.Response(404)
workspace_manager = FakeWorkspaceManager(tmp_path / "work")
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=workspace_manager,
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert workspace_manager.resumed == [("agent/issue-1-ready-issue", workspace_path)]
assert workspace_manager.cleaned == [workspace_path]
def test_close_issues_for_merged_pull_requests_closes_linked_issue(db): def test_close_issues_for_merged_pull_requests_closes_linked_issue(db):
repo = db.upsert_repository( repo = db.upsert_repository(