Compare commits

..

6 Commits

10 changed files with 871 additions and 100 deletions

1
.gitignore vendored
View File

@@ -1,4 +1,5 @@
.agent-gitea/ .agent-gitea/
.agent-output/
.env .env
.pytest_cache/ .pytest_cache/
.ruff_cache/ .ruff_cache/

1
run.sh Executable file
View File

@@ -0,0 +1 @@
uv run agent-gitea --config config.yaml worker

View File

@@ -7,8 +7,29 @@ from .models import AgentResult
class CommandRunner: class CommandRunner:
def run(self, command: list[str], cwd: str | Path, *, stdin: str | None = None) -> AgentResult: def run(
result = subprocess.run(command, cwd=cwd, input=stdin, text=True, capture_output=True, check=False) self,
command: list[str],
cwd: str | Path,
*,
stdin: str | None = None,
timeout_seconds: int | float | None = None,
) -> AgentResult:
try:
result = subprocess.run(
command,
cwd=cwd,
input=stdin,
text=True,
capture_output=True,
check=False,
timeout=timeout_seconds,
)
except subprocess.TimeoutExpired as exc:
stdout = exc.stdout if isinstance(exc.stdout, str) else (exc.stdout or b"").decode(errors="replace")
stderr = exc.stderr if isinstance(exc.stderr, str) else (exc.stderr or b"").decode(errors="replace")
message = f"command timed out after {timeout_seconds} seconds"
return AgentResult(exit_code=124, stdout=stdout or "", stderr=f"{stderr}\n{message}".strip())
return AgentResult(exit_code=result.returncode, stdout=result.stdout, stderr=result.stderr) return AgentResult(exit_code=result.returncode, stdout=result.stdout, stderr=result.stderr)

View File

@@ -24,6 +24,14 @@ class SchedulerConfig(BaseModel):
interval_seconds: int = Field(default=60, ge=1) interval_seconds: int = Field(default=60, ge=1)
concurrency: int = Field(default=1, ge=1) concurrency: int = Field(default=1, ge=1)
lease_seconds: int = Field(default=1800, ge=30) lease_seconds: int = Field(default=1800, ge=30)
lease_renewal_interval_seconds: int | None = Field(default=None, ge=1)
agent_timeout_seconds: int = Field(default=7200, ge=1)
@property
def effective_lease_renewal_interval_seconds(self) -> int:
if self.lease_renewal_interval_seconds is not None:
return self.lease_renewal_interval_seconds
return max(1, self.lease_seconds // 3)
class WorkspaceConfig(BaseModel): class WorkspaceConfig(BaseModel):

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import json import json
import sqlite3 import sqlite3
from dataclasses import dataclass
from datetime import timedelta from datetime import timedelta
from pathlib import Path from pathlib import Path
from typing import Iterable from typing import Iterable
@@ -87,12 +88,21 @@ CREATE TABLE IF NOT EXISTS agent_runs (
CREATE TABLE IF NOT EXISTS pull_request_feedback ( CREATE TABLE IF NOT EXISTS pull_request_feedback (
task_id INTEGER PRIMARY KEY REFERENCES tasks(id), task_id INTEGER PRIMARY KEY REFERENCES tasks(id),
last_seen_comment_id INTEGER NOT NULL DEFAULT 0, last_seen_comment_id INTEGER NOT NULL DEFAULT 0,
last_seen_review_id INTEGER NOT NULL DEFAULT 0,
last_seen_review_comment_id INTEGER NOT NULL DEFAULT 0,
pending INTEGER NOT NULL DEFAULT 0, pending INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL updated_at TEXT NOT NULL
); );
""" """
@dataclass(frozen=True)
class PullRequestFeedbackCursor:
last_seen_comment_id: int = 0
last_seen_review_id: int = 0
last_seen_review_comment_id: int = 0
class Database: class Database:
def __init__(self, path: str | Path): def __init__(self, path: str | Path):
self.path = Path(path) self.path = Path(path)
@@ -106,8 +116,21 @@ class Database:
def migrate(self) -> None: def migrate(self) -> None:
self.conn.executescript(SCHEMA) self.conn.executescript(SCHEMA)
self._migrate_pull_request_feedback_columns()
self.conn.commit() self.conn.commit()
def _migrate_pull_request_feedback_columns(self) -> None:
rows = self.conn.execute("PRAGMA table_info(pull_request_feedback)").fetchall()
columns = {row["name"] for row in rows}
if "last_seen_review_id" not in columns:
self.conn.execute(
"ALTER TABLE pull_request_feedback ADD COLUMN last_seen_review_id INTEGER NOT NULL DEFAULT 0"
)
if "last_seen_review_comment_id" not in columns:
self.conn.execute(
"ALTER TABLE pull_request_feedback ADD COLUMN last_seen_review_comment_id INTEGER NOT NULL DEFAULT 0"
)
def upsert_repository( def upsert_repository(
self, self,
*, *,
@@ -269,12 +292,23 @@ class Database:
SELECT t.* SELECT t.*
FROM tasks t FROM tasks t
JOIN issues i ON i.repo_id = t.repo_id AND i.issue_number = t.issue_number JOIN issues i ON i.repo_id = t.repo_id AND i.issue_number = t.issue_number
WHERE t.state = ? WHERE t.state != ?
AND t.pr_number IS NOT NULL AND t.pr_number IS NOT NULL
AND i.state = 'open' AND i.state = 'open'
AND (
t.state IN (?, ?)
OR t.lease_owner IS NULL
OR t.lease_expires_at IS NULL
OR t.lease_expires_at < ?
)
ORDER BY t.id ORDER BY t.id
""", """,
(TaskState.HUMAN_REVIEW_READY.value,), (
TaskState.CANCELLED.value,
TaskState.HUMAN_REVIEW_READY.value,
TaskState.FAILED.value,
dt_to_db(utcnow()),
),
).fetchall() ).fetchall()
return [self._task(row) for row in rows] return [self._task(row) for row in rows]
@@ -298,44 +332,104 @@ class Database:
).fetchone() ).fetchone()
return int(row["last_seen_comment_id"]) if row else 0 return int(row["last_seen_comment_id"]) if row else 0
def get_pr_feedback_cursors(self, task_id: int) -> PullRequestFeedbackCursor:
row = self.conn.execute(
"""
SELECT last_seen_comment_id, last_seen_review_id, last_seen_review_comment_id
FROM pull_request_feedback
WHERE task_id = ?
""",
(task_id,),
).fetchone()
if row is None:
return PullRequestFeedbackCursor()
return PullRequestFeedbackCursor(
last_seen_comment_id=int(row["last_seen_comment_id"]),
last_seen_review_id=int(row["last_seen_review_id"]),
last_seen_review_comment_id=int(row["last_seen_review_comment_id"]),
)
def upsert_pr_feedback_state( def upsert_pr_feedback_state(
self, self,
task_id: int, task_id: int,
*, *,
last_seen_comment_id: int | None = None, last_seen_comment_id: int | None = None,
last_seen_review_id: int | None = None,
last_seen_review_comment_id: int | None = None,
pending: bool | None = None, pending: bool | None = None,
) -> None: ) -> None:
row = self.conn.execute( row = self.conn.execute(
"SELECT last_seen_comment_id, pending FROM pull_request_feedback WHERE task_id = ?", """
SELECT last_seen_comment_id, last_seen_review_id, last_seen_review_comment_id, pending
FROM pull_request_feedback
WHERE task_id = ?
""",
(task_id,), (task_id,),
).fetchone() ).fetchone()
now = dt_to_db(utcnow()) now = dt_to_db(utcnow())
if row is None: if row is None:
self.conn.execute( self.conn.execute(
""" """
INSERT INTO pull_request_feedback (task_id, last_seen_comment_id, pending, updated_at) INSERT INTO pull_request_feedback (
VALUES (?, ?, ?, ?) task_id,
last_seen_comment_id,
last_seen_review_id,
last_seen_review_comment_id,
pending,
updated_at
)
VALUES (?, ?, ?, ?, ?, ?)
""", """,
(task_id, last_seen_comment_id or 0, int(bool(pending)), now), (
task_id,
last_seen_comment_id or 0,
last_seen_review_id or 0,
last_seen_review_comment_id or 0,
int(bool(pending)),
now,
),
) )
else: else:
next_cursor = int(row["last_seen_comment_id"]) if last_seen_comment_id is None else last_seen_comment_id next_cursor = int(row["last_seen_comment_id"]) if last_seen_comment_id is None else last_seen_comment_id
next_review_cursor = int(row["last_seen_review_id"]) if last_seen_review_id is None else last_seen_review_id
next_review_comment_cursor = (
int(row["last_seen_review_comment_id"])
if last_seen_review_comment_id is None
else last_seen_review_comment_id
)
next_pending = bool(row["pending"]) if pending is None else pending next_pending = bool(row["pending"]) if pending is None else pending
self.conn.execute( self.conn.execute(
""" """
UPDATE pull_request_feedback UPDATE pull_request_feedback
SET last_seen_comment_id = ?, pending = ?, updated_at = ? SET last_seen_comment_id = ?,
last_seen_review_id = ?,
last_seen_review_comment_id = ?,
pending = ?,
updated_at = ?
WHERE task_id = ? WHERE task_id = ?
""", """,
(next_cursor, int(next_pending), now, task_id), (next_cursor, next_review_cursor, next_review_comment_cursor, int(next_pending), now, task_id),
) )
self.conn.commit() self.conn.commit()
def mark_pr_feedback_pending(self, task_id: int) -> None: def mark_pr_feedback_pending(self, task_id: int) -> None:
self.upsert_pr_feedback_state(task_id, pending=True) self.upsert_pr_feedback_state(task_id, pending=True)
def clear_pr_feedback_pending(self, task_id: int, *, last_seen_comment_id: int) -> None: def clear_pr_feedback_pending(
self.upsert_pr_feedback_state(task_id, last_seen_comment_id=last_seen_comment_id, pending=False) self,
task_id: int,
*,
last_seen_comment_id: int | None = None,
last_seen_review_id: int | None = None,
last_seen_review_comment_id: int | None = None,
) -> None:
self.upsert_pr_feedback_state(
task_id,
last_seen_comment_id=last_seen_comment_id,
last_seen_review_id=last_seen_review_id,
last_seen_review_comment_id=last_seen_review_comment_id,
pending=False,
)
def has_pending_pr_feedback(self, task_id: int) -> bool: def has_pending_pr_feedback(self, task_id: int) -> bool:
row = self.conn.execute( row = self.conn.execute(
@@ -347,49 +441,78 @@ class Database:
def claim_next_task(self, worker_id: str, lease_seconds: int) -> TaskRecord | None: def claim_next_task(self, worker_id: str, lease_seconds: int) -> TaskRecord | None:
now = utcnow() now = utcnow()
expires = now + timedelta(seconds=lease_seconds) expires = now + timedelta(seconds=lease_seconds)
row = self.conn.execute( try:
""" self.conn.execute("BEGIN IMMEDIATE")
SELECT * FROM tasks row = self.conn.execute(
WHERE state = ? """
OR (state IN (?, ?, ?, ?, ?, ?, ?) AND lease_expires_at IS NOT NULL AND lease_expires_at < ?) SELECT * FROM tasks
ORDER BY created_at WHERE state = ?
LIMIT 1 OR (state IN (?, ?, ?, ?, ?, ?, ?) AND lease_expires_at IS NOT NULL AND lease_expires_at < ?)
""", ORDER BY created_at
( LIMIT 1
TaskState.DISCOVERED.value, """,
TaskState.CLAIMED.value, (
TaskState.PLANNING.value, TaskState.DISCOVERED.value,
TaskState.IMPLEMENTING.value, TaskState.CLAIMED.value,
TaskState.TESTING.value, TaskState.PLANNING.value,
TaskState.PR_OPENED.value, TaskState.IMPLEMENTING.value,
TaskState.REVIEWING.value, TaskState.TESTING.value,
TaskState.DISCOVERED.value, TaskState.PR_OPENED.value,
dt_to_db(now), TaskState.REVIEWING.value,
), TaskState.DISCOVERED.value,
).fetchone() dt_to_db(now),
if row is None: ),
return None ).fetchone()
task = self._task(row) if row is None:
self.conn.execute( self.conn.commit()
""" return None
UPDATE tasks task = self._task(row)
SET state = ?, lease_owner = ?, lease_expires_at = ?, updated_at = ? self.conn.execute(
WHERE id = ? """
""", UPDATE tasks
( SET state = ?, lease_owner = ?, lease_expires_at = ?, updated_at = ?
TaskState.CLAIMED.value, WHERE id = ?
worker_id, """,
dt_to_db(expires), (
dt_to_db(now), TaskState.CLAIMED.value,
task.id, worker_id,
), dt_to_db(expires),
) dt_to_db(now),
self.conn.commit() task.id,
),
)
self.conn.commit()
except Exception:
self.conn.rollback()
raise
self.add_event(task.id, task.state, TaskState.CLAIMED, f"claimed by {worker_id}") self.add_event(task.id, task.state, TaskState.CLAIMED, f"claimed by {worker_id}")
claimed = self.get_task(task.id) claimed = self.get_task(task.id)
assert claimed is not None assert claimed is not None
return claimed return claimed
def renew_task_lease(self, task_id: int, worker_id: str, lease_seconds: int) -> bool:
now = utcnow()
expires = now + timedelta(seconds=lease_seconds)
placeholders = ",".join("?" for _ in ACTIVE_STATES)
cursor = self.conn.execute(
f"""
UPDATE tasks
SET lease_expires_at = ?, updated_at = ?
WHERE id = ?
AND lease_owner = ?
AND state IN ({placeholders})
""",
(
dt_to_db(expires),
dt_to_db(now),
task_id,
worker_id,
*[state.value for state in ACTIVE_STATES],
),
)
self.conn.commit()
return cursor.rowcount == 1
def transition( def transition(
self, self,
task_id: int, task_id: int,

View File

@@ -38,6 +38,17 @@ class GiteaComment:
updated_at: datetime | None updated_at: datetime | None
@dataclass(frozen=True)
class GiteaPullReview:
id: int
body: str
author: str
state: str
html_url: str
submitted_at: datetime | None
updated_at: datetime | None
@dataclass(frozen=True) @dataclass(frozen=True)
class GiteaRepository: class GiteaRepository:
owner: str owner: str
@@ -178,6 +189,55 @@ class GiteaClient:
response.raise_for_status() response.raise_for_status()
return comment_from_payload(response.json()) return comment_from_payload(response.json())
def list_pull_request_reviews(self, *, owner: str, name: str, pr_number: int) -> list[GiteaPullReview]:
reviews: list[GiteaPullReview] = []
page = 1
limit = 50
while True:
response = self.client.get(
f"/repos/{owner}/{name}/pulls/{pr_number}/reviews",
params={"page": page, "limit": limit},
)
if response.status_code == 404:
return reviews
response.raise_for_status()
payload = response.json()
if not payload:
break
reviews.extend(pull_review_from_payload(item) for item in payload)
if len(payload) < limit:
break
page += 1
return reviews
def list_pull_request_review_comments(
self,
*,
owner: str,
name: str,
pr_number: int,
review_id: int,
) -> list[GiteaComment]:
comments: list[GiteaComment] = []
page = 1
limit = 50
while True:
response = self.client.get(
f"/repos/{owner}/{name}/pulls/{pr_number}/reviews/{review_id}/comments",
params={"page": page, "limit": limit},
)
if response.status_code == 404:
return comments
response.raise_for_status()
payload = response.json()
if not payload:
break
comments.extend(review_comment_from_payload(item) for item in payload)
if len(payload) < limit:
break
page += 1
return comments
def close_issue(self, *, owner: str, name: str, issue_number: int) -> None: def close_issue(self, *, owner: str, name: str, issue_number: int) -> None:
response = self.client.patch( response = self.client.patch(
f"/repos/{owner}/{name}/issues/{issue_number}", f"/repos/{owner}/{name}/issues/{issue_number}",
@@ -225,6 +285,20 @@ def pull_request_from_payload(payload: dict[str, Any]) -> GiteaPullRequest:
) )
def pull_review_from_payload(payload: dict[str, Any]) -> GiteaPullReview:
user_payload = payload.get("user") or {}
author = user_payload.get("login") or user_payload.get("username") or ""
return GiteaPullReview(
id=int(payload["id"]),
body=payload.get("body") or "",
author=str(author),
state=payload.get("state") or "",
html_url=payload.get("html_url") or payload.get("url") or "",
submitted_at=parse_gitea_dt(payload.get("submitted_at")),
updated_at=parse_gitea_dt(payload.get("updated_at")),
)
def comment_from_payload(payload: dict[str, Any]) -> GiteaComment: def comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
user_payload = payload.get("user") or payload.get("poster") or {} user_payload = payload.get("user") or payload.get("poster") or {}
author = user_payload.get("login") or user_payload.get("username") or "" author = user_payload.get("login") or user_payload.get("username") or ""
@@ -238,6 +312,29 @@ def comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
) )
def review_comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
comment = comment_from_payload(payload)
path = payload.get("path")
position = payload.get("position") or payload.get("original_position")
location_parts = []
if path:
location_parts.append(str(path))
if position:
location_parts.append(f"line {position}")
if not location_parts:
return comment
location = ":".join(location_parts)
body = f"Inline comment on {location}\n\n{comment.body}"
return GiteaComment(
id=comment.id,
body=body,
author=comment.author,
html_url=comment.html_url,
created_at=comment.created_at,
updated_at=comment.updated_at,
)
def parse_gitea_dt(value: str | None) -> datetime | None: def parse_gitea_dt(value: str | None) -> datetime | None:
if not value: if not value:
return None return None

View File

@@ -3,13 +3,15 @@ from __future__ import annotations
import socket import socket
import time import time
import logging import logging
import threading
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from .agents import CommandRunner, read_report, render_command, write_prompt from .agents import CommandRunner, read_report, render_command, write_prompt
from .config import AppConfig from .config import AppConfig
from .db import Database from .db import Database, PullRequestFeedbackCursor
from .gitea import GiteaClient, GiteaComment from .gitea import GiteaClient, GiteaComment, GiteaPullReview
from .models import IssueRecord, RepositoryRecord, TaskRecord, TaskState from .models import ACTIVE_STATES, IssueRecord, RepositoryRecord, TaskRecord, TaskState
from .rendering import ( from .rendering import (
parse_review_report, parse_review_report,
render_human_review_summary, render_human_review_summary,
@@ -26,6 +28,53 @@ from .workspace import WorkspaceManager, safe_branch_name
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class PullRequestFeedbackSnapshot:
human_comments: list[GiteaComment]
handled_cursor: PullRequestFeedbackCursor
newest_cursor: PullRequestFeedbackCursor
class TaskLeaseRenewer:
def __init__(
self,
*,
db_path: Path,
task_id: int,
worker_id: str,
lease_seconds: int,
interval_seconds: int,
):
self.db_path = db_path
self.task_id = task_id
self.worker_id = worker_id
self.lease_seconds = lease_seconds
self.interval_seconds = interval_seconds
self._stop = threading.Event()
self._thread = threading.Thread(target=self._run, name=f"lease-renewer-{task_id}", daemon=True)
def __enter__(self) -> TaskLeaseRenewer:
self._thread.start()
return self
def __exit__(self, exc_type, exc, tb) -> None:
self._stop.set()
self._thread.join(timeout=max(1, self.interval_seconds))
def _run(self) -> None:
database = Database(self.db_path)
try:
while not self._stop.wait(self.interval_seconds):
renewed = database.renew_task_lease(self.task_id, self.worker_id, self.lease_seconds)
if not renewed:
logger.warning("stopping lease renewal for task %d; lease no longer belongs to %s", self.task_id, self.worker_id)
return
except Exception:
logger.exception("lease renewal failed for task %d", self.task_id)
finally:
database.close()
def sync_repositories(db: Database, config: AppConfig, client: GiteaClient) -> list[RepositoryRecord]: def sync_repositories(db: Database, config: AppConfig, client: GiteaClient) -> list[RepositoryRecord]:
synced: list[RepositoryRecord] = [] synced: list[RepositoryRecord] = []
discovered = client.list_owned_repositories() discovered = client.list_owned_repositories()
@@ -84,12 +133,12 @@ def close_issues_for_merged_pull_requests(db: Database, client: GiteaClient) ->
) )
client.close_issue(owner=repo.owner, name=repo.name, issue_number=issue.issue_number) client.close_issue(owner=repo.owner, name=repo.name, issue_number=issue.issue_number)
db.update_issue_state(task.repo_id, task.issue_number, "closed") db.update_issue_state(task.repo_id, task.issue_number, "closed")
db.add_event( message = f"closed issue #{issue.issue_number} after merged PR #{task.pr_number}"
task.id, if task.state in ACTIVE_STATES:
task.state, db.clear_pr_feedback_pending(task.id)
task.state, db.transition(task.id, TaskState.CANCELLED, message=message, clear_lease=True)
f"closed issue #{issue.issue_number} after merged PR #{task.pr_number}", else:
) db.add_event(task.id, task.state, task.state, message)
closed += 1 closed += 1
return closed return closed
@@ -106,23 +155,21 @@ def scan_pull_request_feedback(db: Database, client: GiteaClient) -> int:
pull_request = client.get_pull_request(owner=repo.owner, name=repo.name, pr_number=task.pr_number) pull_request = client.get_pull_request(owner=repo.owner, name=repo.name, pr_number=task.pr_number)
if pull_request.merged or pull_request.state != "open": if pull_request.merged or pull_request.state != "open":
continue continue
comments = client.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=task.pr_number) cursors = db.get_pr_feedback_cursors(task.id)
last_seen_comment_id = db.get_pr_feedback_cursor(task.id) feedback = load_new_human_pr_feedback(client, repo, task.pr_number, username, cursors)
newest_comment_id = max((comment.id for comment in comments), default=last_seen_comment_id) if not feedback.human_comments:
human_comments = [ db.upsert_pr_feedback_state(
comment task.id,
for comment in comments last_seen_comment_id=feedback.newest_cursor.last_seen_comment_id,
if comment.id > last_seen_comment_id and comment.author.casefold() != username.casefold() last_seen_review_id=feedback.newest_cursor.last_seen_review_id,
] last_seen_review_comment_id=feedback.newest_cursor.last_seen_review_comment_id,
if not human_comments: )
if newest_comment_id > last_seen_comment_id:
db.upsert_pr_feedback_state(task.id, last_seen_comment_id=newest_comment_id)
continue continue
db.mark_pr_feedback_pending(task.id) db.mark_pr_feedback_pending(task.id)
db.transition( db.transition(
task.id, task.id,
TaskState.DISCOVERED, TaskState.DISCOVERED,
message=f"queued PR feedback from {len(human_comments)} human comment(s)", message=f"queued PR feedback from {len(feedback.human_comments)} human comment(s)",
clear_lease=True, clear_lease=True,
) )
queued += 1 queued += 1
@@ -256,6 +303,7 @@ class TaskRunner:
message=f"human review summary posted with verdict {review.verdict}", message=f"human review summary posted with verdict {review.verdict}",
clear_lease=True, clear_lease=True,
) )
self._cleanup_workspace_on_success(final_task, workspace)
return final_task return final_task
def _run_pr_feedback(self, task: TaskRecord) -> TaskRecord: def _run_pr_feedback(self, task: TaskRecord) -> TaskRecord:
@@ -273,43 +321,61 @@ class TaskRunner:
message=f"running implementer agent for PR #{task.pr_number} feedback", message=f"running implementer agent for PR #{task.pr_number} feedback",
workspace_path=workspace, workspace_path=workspace,
) )
last_seen_comment_id = self.db.get_pr_feedback_cursor(task.id) cursors = self.db.get_pr_feedback_cursors(task.id)
comments = self.gitea.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=task.pr_number)
username = self.gitea.get_authenticated_username() username = self.gitea.get_authenticated_username()
human_comments = [ feedback = load_new_human_pr_feedback(self.gitea, repo, task.pr_number, username, cursors)
comment if not feedback.human_comments:
for comment in comments self.db.clear_pr_feedback_pending(
if comment.id > last_seen_comment_id and comment.author.casefold() != username.casefold() task.id,
] last_seen_comment_id=feedback.newest_cursor.last_seen_comment_id,
if not human_comments: last_seen_review_id=feedback.newest_cursor.last_seen_review_id,
newest_comment_id = max((comment.id for comment in comments), default=last_seen_comment_id) last_seen_review_comment_id=feedback.newest_cursor.last_seen_review_comment_id,
self.db.clear_pr_feedback_pending(task.id, last_seen_comment_id=newest_comment_id) )
return self.db.transition( final_task = self.db.transition(
task.id, task.id,
TaskState.HUMAN_REVIEW_READY, TaskState.HUMAN_REVIEW_READY,
message="PR feedback queue was stale; no new human comments found", message="PR feedback queue was stale; no new human comments found",
clear_lease=True, clear_lease=True,
) )
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
implementation_report = self._run_pr_feedback_implementer( implementation_report = self._run_pr_feedback_implementer(
task, task,
repo, repo,
issue, issue,
task.branch_name, task.branch_name,
task.pr_number, task.pr_number,
human_comments, feedback.human_comments,
workspace, workspace,
) )
task = self.db.transition(task.id, TaskState.TESTING, message="checking PR feedback diff") task = self.db.transition(task.id, TaskState.TESTING, message="checking PR feedback diff")
commit_message = f"agent: address PR #{task.pr_number} feedback for issue #{issue.issue_number}" has_code_changes = self.workspace_manager.has_uncommitted_changes(workspace)
commit_id = self.workspace_manager.commit_changes(workspace, commit_message) if has_code_changes:
self.db.add_event(task.id, TaskState.TESTING, TaskState.TESTING, f"committed PR feedback update {commit_id}") commit_message = f"agent: address PR #{task.pr_number} feedback for issue #{issue.issue_number}"
self.workspace_manager.push_branch(workspace, task.branch_name) commit_id = self.workspace_manager.commit_changes(workspace, commit_message)
self.db.add_event(task.id, TaskState.TESTING, TaskState.TESTING, f"committed PR feedback update {commit_id}")
self.workspace_manager.push_branch(workspace, task.branch_name)
self.gitea.post_issue_comment( self.gitea.post_issue_comment(
owner=repo.owner, owner=repo.owner,
name=repo.name, name=repo.name,
issue_number=task.pr_number, issue_number=task.pr_number,
body=f"## 代理已处理 PR 反馈\n\n{implementation_report.strip()}", body=f"## 代理已处理 PR 反馈\n\n{implementation_report.strip()}",
) )
self.db.clear_pr_feedback_pending(
task.id,
last_seen_comment_id=feedback.handled_cursor.last_seen_comment_id,
last_seen_review_id=feedback.handled_cursor.last_seen_review_id,
last_seen_review_comment_id=feedback.handled_cursor.last_seen_review_comment_id,
)
if not has_code_changes:
final_task = self.db.transition(
task.id,
TaskState.HUMAN_REVIEW_READY,
message="PR feedback handled without code changes",
clear_lease=True,
)
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
task = self.db.transition(task.id, TaskState.PR_OPENED, message=f"updated PR #{task.pr_number}") task = self.db.transition(task.id, TaskState.PR_OPENED, message=f"updated PR #{task.pr_number}")
validate_transition(task.state, TaskState.REVIEWING) validate_transition(task.state, TaskState.REVIEWING)
task = self.db.transition(task.id, TaskState.REVIEWING, message="running reviewer agent after PR feedback") task = self.db.transition(task.id, TaskState.REVIEWING, message="running reviewer agent after PR feedback")
@@ -327,17 +393,20 @@ class TaskRunner:
issue_number=task.pr_number, issue_number=task.pr_number,
body=render_human_review_summary(review), body=render_human_review_summary(review),
) )
newest_comment_id = max( self.db.add_event(
[comment.id for comment in comments] + [reviewer_comment.id, summary_comment.id], task.id,
default=max(reviewer_comment.id, summary_comment.id), TaskState.REVIEWING,
TaskState.REVIEWING,
f"posted reviewer comments {reviewer_comment.id} and {summary_comment.id}",
) )
self.db.clear_pr_feedback_pending(task.id, last_seen_comment_id=newest_comment_id) final_task = self.db.transition(
return self.db.transition(
task.id, task.id,
TaskState.HUMAN_REVIEW_READY, TaskState.HUMAN_REVIEW_READY,
message=f"PR feedback handled; human review summary posted with verdict {review.verdict}", message=f"PR feedback handled; human review summary posted with verdict {review.verdict}",
clear_lease=True, clear_lease=True,
) )
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
def _run_implementer( def _run_implementer(
self, self,
@@ -360,7 +429,7 @@ class TaskRunner:
issue_title=issue.title, issue_title=issue.title,
branch_name=branch_name, branch_name=branch_name,
) )
result = self.command_runner.run(command, workspace, stdin=prompt) result = self._run_agent_command(task, command, workspace, prompt)
report = read_report(output_dir / "AGENT_IMPLEMENTATION_REPORT.md") report = read_report(output_dir / "AGENT_IMPLEMENTATION_REPORT.md")
self.db.add_agent_run( self.db.add_agent_run(
task_id=task.id, task_id=task.id,
@@ -400,7 +469,7 @@ class TaskRunner:
pr_number=pr_number, pr_number=pr_number,
branch_name=branch_name, branch_name=branch_name,
) )
result = self.command_runner.run(command, workspace, stdin=prompt) result = self._run_agent_command(task, command, workspace, prompt)
report = read_report(output_dir / "AGENT_IMPLEMENTATION_REPORT.md") report = read_report(output_dir / "AGENT_IMPLEMENTATION_REPORT.md")
self.db.add_agent_run( self.db.add_agent_run(
task_id=task.id, task_id=task.id,
@@ -437,7 +506,7 @@ class TaskRunner:
issue_title=issue.title, issue_title=issue.title,
pr_number=pr_number, pr_number=pr_number,
) )
result = self.command_runner.run(command, workspace, stdin=prompt) result = self._run_agent_command(task, command, workspace, prompt)
report = read_report(output_dir / "AGENT_REVIEW_REPORT.md") report = read_report(output_dir / "AGENT_REVIEW_REPORT.md")
self.db.add_agent_run( self.db.add_agent_run(
task_id=task.id, task_id=task.id,
@@ -453,9 +522,35 @@ class TaskRunner:
raise RuntimeError(f"reviewer failed with exit code {result.exit_code}") raise RuntimeError(f"reviewer failed with exit code {result.exit_code}")
return report return report
def _run_agent_command(self, task: TaskRecord, command: list[str], workspace: Path, prompt: str) -> AgentResult:
with TaskLeaseRenewer(
db_path=self.db.path,
task_id=task.id,
worker_id=self.worker_id,
lease_seconds=self.config.scheduler.lease_seconds,
interval_seconds=self.config.scheduler.effective_lease_renewal_interval_seconds,
):
return self.command_runner.run(
command,
workspace,
stdin=prompt,
timeout_seconds=self.config.scheduler.agent_timeout_seconds,
)
def _load_context(self, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]: def _load_context(self, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]:
return load_task_context(self.db, task) return load_task_context(self.db, task)
def _cleanup_workspace_on_success(self, task: TaskRecord, workspace: Path) -> None:
if not self.config.workspace.cleanup_on_success:
return
try:
self.workspace_manager.cleanup(workspace)
except Exception as exc: # pragma: no cover - defensive best-effort cleanup
logger.warning("failed to cleanup workspace %s for task %d: %s", workspace, task.id, exc)
self.db.add_event(task.id, task.state, task.state, f"workspace cleanup failed: {exc}")
else:
self.db.add_event(task.id, task.state, task.state, f"cleaned workspace {workspace}")
def load_task_context(db: Database, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]: def load_task_context(db: Database, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]:
repo_row = db.conn.execute("SELECT * FROM repositories WHERE id = ?", (task.repo_id,)).fetchone() repo_row = db.conn.execute("SELECT * FROM repositories WHERE id = ?", (task.repo_id,)).fetchone()
@@ -466,3 +561,72 @@ def load_task_context(db: Database, task: TaskRecord) -> tuple[RepositoryRecord,
if issue is None: if issue is None:
raise ValueError(f"issue not found for task {task.id}") raise ValueError(f"issue not found for task {task.id}")
return repo, issue return repo, issue
def load_new_human_pr_feedback(
client: GiteaClient,
repo: RepositoryRecord,
pr_number: int,
username: str,
cursors: PullRequestFeedbackCursor,
) -> PullRequestFeedbackSnapshot:
issue_comments = client.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=pr_number)
reviews = client.list_pull_request_reviews(owner=repo.owner, name=repo.name, pr_number=pr_number)
review_comments: list[GiteaComment] = []
for review in reviews:
review_comments.extend(
client.list_pull_request_review_comments(
owner=repo.owner,
name=repo.name,
pr_number=pr_number,
review_id=review.id,
)
)
bot = username.casefold()
human_issue_comments = [
comment
for comment in issue_comments
if comment.id > cursors.last_seen_comment_id and comment.author.casefold() != bot
]
human_reviews = [
review_as_comment(review)
for review in reviews
if review.id > cursors.last_seen_review_id and review.author.casefold() != bot and review.body.strip()
]
human_review_comments = [
comment
for comment in review_comments
if comment.id > cursors.last_seen_review_comment_id and comment.author.casefold() != bot
]
human_comments = [*human_issue_comments, *human_reviews, *human_review_comments]
handled_cursor = PullRequestFeedbackCursor(
last_seen_comment_id=max((comment.id for comment in human_issue_comments), default=cursors.last_seen_comment_id),
last_seen_review_id=max((review.id for review in reviews if review.id > cursors.last_seen_review_id), default=cursors.last_seen_review_id),
last_seen_review_comment_id=max(
(comment.id for comment in human_review_comments),
default=cursors.last_seen_review_comment_id,
),
)
newest_cursor = PullRequestFeedbackCursor(
last_seen_comment_id=max((comment.id for comment in issue_comments), default=cursors.last_seen_comment_id),
last_seen_review_id=max((review.id for review in reviews), default=cursors.last_seen_review_id),
last_seen_review_comment_id=max(
(comment.id for comment in review_comments),
default=cursors.last_seen_review_comment_id,
),
)
return PullRequestFeedbackSnapshot(human_comments, handled_cursor, newest_cursor)
def review_as_comment(review: GiteaPullReview) -> GiteaComment:
body = f"Pull request review ({review.state or 'COMMENT'})\n\n{review.body}"
return GiteaComment(
id=review.id,
body=body,
author=review.author,
html_url=review.html_url,
created_at=review.submitted_at,
updated_at=review.updated_at,
)

View File

@@ -8,7 +8,13 @@ ALLOWED_TRANSITIONS: dict[TaskState, set[TaskState]] = {
TaskState.CLAIMED: {TaskState.PLANNING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED}, TaskState.CLAIMED: {TaskState.PLANNING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.PLANNING: {TaskState.IMPLEMENTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED}, TaskState.PLANNING: {TaskState.IMPLEMENTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.IMPLEMENTING: {TaskState.TESTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED}, TaskState.IMPLEMENTING: {TaskState.TESTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.TESTING: {TaskState.PR_OPENED, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED}, TaskState.TESTING: {
TaskState.PR_OPENED,
TaskState.HUMAN_REVIEW_READY,
TaskState.FAILED,
TaskState.BLOCKED,
TaskState.CANCELLED,
},
TaskState.PR_OPENED: {TaskState.REVIEWING, TaskState.FAILED, TaskState.CANCELLED}, TaskState.PR_OPENED: {TaskState.REVIEWING, TaskState.FAILED, TaskState.CANCELLED},
TaskState.REVIEWING: {TaskState.HUMAN_REVIEW_READY, TaskState.FAILED, TaskState.CANCELLED}, TaskState.REVIEWING: {TaskState.HUMAN_REVIEW_READY, TaskState.FAILED, TaskState.CANCELLED},
TaskState.HUMAN_REVIEW_READY: {TaskState.DISCOVERED}, TaskState.HUMAN_REVIEW_READY: {TaskState.DISCOVERED},

View File

@@ -77,6 +77,10 @@ class WorkspaceManager:
diff = self._git(["diff", "--quiet", f"{base_ref}...HEAD"], Path(workspace), check=False) diff = self._git(["diff", "--quiet", f"{base_ref}...HEAD"], Path(workspace), check=False)
return diff.returncode == 1 return diff.returncode == 1
def has_uncommitted_changes(self, workspace: str | Path) -> bool:
result = self._git(["status", "--porcelain"], Path(workspace), check=False)
return bool(result.stdout.strip())
def commit_changes(self, workspace: str | Path, message: str) -> str: def commit_changes(self, workspace: str | Path, message: str) -> str:
workspace_path = Path(workspace) workspace_path = Path(workspace)
self._git(["add", "-A"], workspace_path) self._git(["add", "-A"], workspace_path)

View File

@@ -1,11 +1,16 @@
from __future__ import annotations from __future__ import annotations
import json import json
import sys
import threading
import time
from pathlib import Path from pathlib import Path
import httpx import httpx
from agent_gitea.agents import CommandRunner
from agent_gitea.config import AppConfig from agent_gitea.config import AppConfig
from agent_gitea.db import Database
from agent_gitea.gitea import GiteaClient from agent_gitea.gitea import GiteaClient
from agent_gitea.models import AgentResult, TaskState from agent_gitea.models import AgentResult, TaskState
from agent_gitea.service import ( from agent_gitea.service import (
@@ -174,6 +179,7 @@ class FakeWorkspaceManager:
self.diff = diff self.diff = diff
self.pushed: list[str] = [] self.pushed: list[str] = []
self.resumed: list[tuple[str, Path | None]] = [] self.resumed: list[tuple[str, Path | None]] = []
self.cleaned: list[Path] = []
def prepare(self, repo, issue, branch_name): def prepare(self, repo, issue, branch_name):
path = self.root / branch_name.replace("/", "_") path = self.root / branch_name.replace("/", "_")
@@ -189,6 +195,9 @@ class FakeWorkspaceManager:
def has_diff(self, workspace, base_ref="origin/HEAD"): def has_diff(self, workspace, base_ref="origin/HEAD"):
return self.diff return self.diff
def has_uncommitted_changes(self, workspace):
return self.diff
def push_branch(self, workspace, branch_name): def push_branch(self, workspace, branch_name):
self.pushed.append(branch_name) self.pushed.append(branch_name)
@@ -197,14 +206,14 @@ class FakeWorkspaceManager:
return "abc1234" return "abc1234"
def cleanup(self, workspace): def cleanup(self, workspace):
pass self.cleaned.append(Path(workspace))
class FakeRunner: class FakeRunner:
def __init__(self, *, fail_role: str | None = None): def __init__(self, *, fail_role: str | None = None):
self.fail_role = fail_role self.fail_role = fail_role
def run(self, command, cwd, *, stdin=None): def run(self, command, cwd, *, stdin=None, timeout_seconds=None):
role = command[0] role = command[0]
assert stdin assert stdin
if role == self.fail_role: if role == self.fail_role:
@@ -246,6 +255,126 @@ def seed_task(db):
return db.create_task(repo.id, 1) return db.create_task(repo.id, 1)
def test_claim_next_task_allows_only_one_worker_during_race(db, tmp_path):
seed_task(db)
db.conn.create_function("sleep_ms", 1, lambda ms: time.sleep(ms / 1000))
db.conn.execute(
"""
CREATE TRIGGER slow_claim_update
BEFORE UPDATE OF state ON tasks
WHEN NEW.state = 'CLAIMED'
BEGIN
SELECT sleep_ms(150);
END
"""
)
db.conn.commit()
db.close()
barrier = threading.Barrier(2)
results: dict[str, int | None] = {}
errors: list[BaseException] = []
def claim(worker_id: str) -> None:
database = Database(tmp_path / "state.sqlite3")
database.conn.create_function("sleep_ms", 1, lambda ms: time.sleep(ms / 1000))
try:
barrier.wait()
task = database.claim_next_task(worker_id, 60)
results[worker_id] = task.id if task else None
except BaseException as exc: # pragma: no cover - assertion below reports thread failures
errors.append(exc)
finally:
database.close()
threads = [threading.Thread(target=claim, args=(worker_id,)) for worker_id in ("worker-a", "worker-b")]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
assert errors == []
assert list(results.values()).count(1) == 1
assert list(results.values()).count(None) == 1
def test_command_runner_returns_timeout_result(tmp_path):
result = CommandRunner().run(
[sys.executable, "-c", "import time; time.sleep(5)"],
tmp_path,
timeout_seconds=0.1,
)
assert result.exit_code == 124
assert "timed out" in result.stderr
class ObservingSlowRunner:
def __init__(self, db, task_id: int):
self.db = db
self.task_id = task_id
self.lease_before = None
self.lease_during = None
def run(self, command, cwd, *, stdin=None, timeout_seconds=None):
role = command[0]
if role == "implementer":
self.lease_before = self.db.get_task(self.task_id).lease_expires_at # type: ignore[union-attr]
time.sleep(1.2)
self.lease_during = self.db.get_task(self.task_id).lease_expires_at # type: ignore[union-attr]
output_dir = Path(cwd) / ".agent-output"
output_dir.mkdir(exist_ok=True)
(output_dir / "AGENT_IMPLEMENTATION_REPORT.md").write_text(
"## Summary\nImplemented\n\n## Test commands run\npytest\n",
encoding="utf-8",
)
if role == "reviewer":
output_dir = Path(cwd) / ".agent-output"
output_dir.mkdir(exist_ok=True)
(output_dir / "AGENT_REVIEW_REPORT.md").write_text(
"## Verdict\nAPPROVE\n\n## Suggested PR Comment\nLooks good.\n",
encoding="utf-8",
)
return AgentResult(exit_code=0, stdout="ok", stderr="")
def test_task_runner_renews_lease_while_agent_runs(db, tmp_path):
config = make_config(
tmp_path,
scheduler={
"interval_seconds": 1,
"concurrency": 1,
"lease_seconds": 60,
"lease_renewal_interval_seconds": 1,
},
)
task = seed_task(db)
runner = ObservingSlowRunner(db, task.id)
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
if request.url.path == "/api/v1/repos/acme/service/pulls":
return httpx.Response(201, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(201, json={"id": 1, "body": payload.get("body", ""), "user": {"login": "agent-bot"}})
return httpx.Response(404)
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=FakeWorkspaceManager(tmp_path / "work"),
command_runner=runner,
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert runner.lease_before is not None
assert runner.lease_during is not None
assert runner.lease_during > runner.lease_before
def transition_to_human_review_ready(db, task_id: int, *, pr_number: int = 5, branch_name: str | None = None): def transition_to_human_review_ready(db, task_id: int, *, pr_number: int = 5, branch_name: str | None = None):
db.transition(task_id, TaskState.CLAIMED) db.transition(task_id, TaskState.CLAIMED)
db.transition(task_id, TaskState.PLANNING) db.transition(task_id, TaskState.PLANNING)
@@ -349,6 +478,55 @@ def test_scan_pull_request_feedback_advances_cursor_without_human_comments(db):
assert db.get_task(task.id).state == TaskState.HUMAN_REVIEW_READY # type: ignore[union-attr] assert db.get_task(task.id).state == TaskState.HUMAN_REVIEW_READY # type: ignore[union-attr]
def test_scan_pull_request_feedback_queues_task_for_inline_review_comment(db):
task = seed_task(db)
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(200, json=[])
if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews":
return httpx.Response(
200,
json=[
{
"id": 9,
"body": "",
"state": "REQUEST_CHANGES",
"user": {"login": "alice"},
}
],
)
if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews/9/comments":
return httpx.Response(
200,
json=[
{
"id": 31,
"body": "This branch misses the cleanup case.",
"path": "src/service.py",
"position": 42,
"user": {"login": "alice"},
}
],
)
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(handler))
cursors = db.get_pr_feedback_cursors(task.id)
assert queued == 1
assert db.has_pending_pr_feedback(task.id)
assert cursors.last_seen_comment_id == 2
assert cursors.last_seen_review_id == 0
assert cursors.last_seen_review_comment_id == 0
def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path): def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
config = make_config(tmp_path) config = make_config(tmp_path)
task = seed_task(db) task = seed_task(db)
@@ -360,7 +538,7 @@ def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id)) db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit() db.conn.commit()
requests: list[tuple[str, str, dict]] = [] requests: list[tuple[str, str, dict]] = []
next_comment_id = 3 next_comment_id = 4
def handler(request: httpx.Request) -> httpx.Response: def handler(request: httpx.Request) -> httpx.Response:
nonlocal next_comment_id nonlocal next_comment_id
@@ -400,9 +578,127 @@ def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
assert workspace_manager.commit_message == "agent: address PR #5 feedback for issue #1" assert workspace_manager.commit_message == "agent: address PR #5 feedback for issue #1"
assert not any(path == "/api/v1/repos/acme/service/pulls" for _, path, _ in requests) assert not any(path == "/api/v1/repos/acme/service/pulls" for _, path, _ in requests)
assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback", "reviewer"] assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback", "reviewer"]
assert db.get_pr_feedback_cursor(task.id) == 6 assert db.get_pr_feedback_cursor(task.id) == 3
assert not db.has_pending_pr_feedback(task.id) assert not db.has_pending_pr_feedback(task.id)
def rescan_handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(
200,
json=[
{"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
{"id": 4, "body": "Also cover the cleanup case.", "user": {"login": "alice"}},
{"id": 5, "body": "processed", "user": {"login": "agent-bot"}},
{"id": 6, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 7, "body": "summary", "user": {"login": "agent-bot"}},
],
)
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(rescan_handler))
assert queued == 1
assert db.has_pending_pr_feedback(task.id)
assert db.get_pr_feedback_cursor(task.id) == 3
def test_run_task_with_pending_pr_feedback_allows_no_code_changes(db, tmp_path):
config = make_config(tmp_path)
task = seed_task(db)
workspace_path = tmp_path / "work" / "existing"
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
db.mark_pr_feedback_pending(task.id)
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit()
requests: list[tuple[str, str, dict]] = []
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
return httpx.Response(
200,
json=[
{"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
{"id": 3, "body": "Can you confirm why this is enough?", "user": {"login": "alice"}},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}})
return httpx.Response(404)
workspace_manager = FakeWorkspaceManager(tmp_path / "work", diff=False)
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=workspace_manager,
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert workspace_manager.pushed == []
assert not hasattr(workspace_manager, "commit_message")
assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback"]
assert db.get_pr_feedback_cursor(task.id) == 3
assert not db.has_pending_pr_feedback(task.id)
def test_success_cleanup_does_not_block_later_pr_feedback(db, tmp_path):
config = make_config(
tmp_path,
workspace={"root": tmp_path / "workspaces", "cleanup_on_success": True},
)
task = seed_task(db)
workspace_path = tmp_path / "work" / "existing"
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
db.mark_pr_feedback_pending(task.id)
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit()
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
return httpx.Response(
200,
json=[
{"id": 3, "body": "Please address the review.", "user": {"login": "alice"}},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}})
return httpx.Response(404)
workspace_manager = FakeWorkspaceManager(tmp_path / "work")
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=workspace_manager,
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert workspace_manager.resumed == [("agent/issue-1-ready-issue", workspace_path)]
assert workspace_manager.cleaned == [workspace_path]
def test_close_issues_for_merged_pull_requests_closes_linked_issue(db): def test_close_issues_for_merged_pull_requests_closes_linked_issue(db):
repo = db.upsert_repository( repo = db.upsert_repository(
@@ -485,6 +781,56 @@ def test_close_issues_for_merged_pull_requests_skips_unmerged_pr(db):
assert db.get_issue(repo.id, 1).state == "open" # type: ignore[union-attr] assert db.get_issue(repo.id, 1).state == "open" # type: ignore[union-attr]
def test_close_issues_for_merged_pull_requests_handles_queued_feedback_task(db):
repo = db.upsert_repository(
owner="acme",
name="service",
clone_url="https://gitea.test/acme/service.git",
default_branch="main",
enabled=True,
)
db.upsert_issue(
repo_id=repo.id,
issue_number=1,
title="Ready issue",
body="Body",
labels=["agent:ready"],
state="open",
html_url="https://gitea.test/acme/service/issues/1",
)
task = db.create_task(repo.id, 1)
task = transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.mark_pr_feedback_pending(task.id)
db.transition(
task.id,
TaskState.DISCOVERED,
message="queued PR feedback from 1 human comment(s)",
clear_lease=True,
)
requests: list[tuple[str, str, dict]] = []
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "closed", "merged": True})
if request.url.path == "/api/v1/repos/acme/service/issues/1/comments":
return httpx.Response(201, json={"id": 1})
if request.url.path == "/api/v1/repos/acme/service/issues/1":
return httpx.Response(200, json={"number": 1, "state": "closed"})
return httpx.Response(404)
closed = close_issues_for_merged_pull_requests(db, make_client(handler))
updated_task = db.get_task(task.id)
assert closed == 1
assert db.get_issue(repo.id, 1).state == "closed" # type: ignore[union-attr]
assert updated_task is not None
assert updated_task.state == TaskState.CANCELLED
assert not db.has_pending_pr_feedback(task.id)
assert ("PATCH", "/api/v1/repos/acme/service/issues/1", {"state": "closed"}) in requests
def test_run_task_no_diff_becomes_blocked(db, tmp_path): def test_run_task_no_diff_becomes_blocked(db, tmp_path):
config = make_config(tmp_path) config = make_config(tmp_path)
seed_task(db) seed_task(db)