Compare commits
6 Commits
e6162e4d12
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 17d723ecca | |||
| 9fc8c14445 | |||
| 2ae22b3492 | |||
| 3c624cc46d | |||
| 70a17d6675 | |||
| 6262d823d8 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,4 +1,5 @@
|
||||
.agent-gitea/
|
||||
.agent-output/
|
||||
.env
|
||||
.pytest_cache/
|
||||
.ruff_cache/
|
||||
|
||||
@@ -7,8 +7,29 @@ from .models import AgentResult
|
||||
|
||||
|
||||
class CommandRunner:
|
||||
def run(self, command: list[str], cwd: str | Path, *, stdin: str | None = None) -> AgentResult:
|
||||
result = subprocess.run(command, cwd=cwd, input=stdin, text=True, capture_output=True, check=False)
|
||||
def run(
|
||||
self,
|
||||
command: list[str],
|
||||
cwd: str | Path,
|
||||
*,
|
||||
stdin: str | None = None,
|
||||
timeout_seconds: int | float | None = None,
|
||||
) -> AgentResult:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
command,
|
||||
cwd=cwd,
|
||||
input=stdin,
|
||||
text=True,
|
||||
capture_output=True,
|
||||
check=False,
|
||||
timeout=timeout_seconds,
|
||||
)
|
||||
except subprocess.TimeoutExpired as exc:
|
||||
stdout = exc.stdout if isinstance(exc.stdout, str) else (exc.stdout or b"").decode(errors="replace")
|
||||
stderr = exc.stderr if isinstance(exc.stderr, str) else (exc.stderr or b"").decode(errors="replace")
|
||||
message = f"command timed out after {timeout_seconds} seconds"
|
||||
return AgentResult(exit_code=124, stdout=stdout or "", stderr=f"{stderr}\n{message}".strip())
|
||||
return AgentResult(exit_code=result.returncode, stdout=result.stdout, stderr=result.stderr)
|
||||
|
||||
|
||||
|
||||
@@ -24,6 +24,14 @@ class SchedulerConfig(BaseModel):
|
||||
interval_seconds: int = Field(default=60, ge=1)
|
||||
concurrency: int = Field(default=1, ge=1)
|
||||
lease_seconds: int = Field(default=1800, ge=30)
|
||||
lease_renewal_interval_seconds: int | None = Field(default=None, ge=1)
|
||||
agent_timeout_seconds: int = Field(default=7200, ge=1)
|
||||
|
||||
@property
|
||||
def effective_lease_renewal_interval_seconds(self) -> int:
|
||||
if self.lease_renewal_interval_seconds is not None:
|
||||
return self.lease_renewal_interval_seconds
|
||||
return max(1, self.lease_seconds // 3)
|
||||
|
||||
|
||||
class WorkspaceConfig(BaseModel):
|
||||
|
||||
@@ -2,6 +2,7 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from dataclasses import dataclass
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
@@ -87,12 +88,21 @@ CREATE TABLE IF NOT EXISTS agent_runs (
|
||||
CREATE TABLE IF NOT EXISTS pull_request_feedback (
|
||||
task_id INTEGER PRIMARY KEY REFERENCES tasks(id),
|
||||
last_seen_comment_id INTEGER NOT NULL DEFAULT 0,
|
||||
last_seen_review_id INTEGER NOT NULL DEFAULT 0,
|
||||
last_seen_review_comment_id INTEGER NOT NULL DEFAULT 0,
|
||||
pending INTEGER NOT NULL DEFAULT 0,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
"""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PullRequestFeedbackCursor:
|
||||
last_seen_comment_id: int = 0
|
||||
last_seen_review_id: int = 0
|
||||
last_seen_review_comment_id: int = 0
|
||||
|
||||
|
||||
class Database:
|
||||
def __init__(self, path: str | Path):
|
||||
self.path = Path(path)
|
||||
@@ -106,8 +116,21 @@ class Database:
|
||||
|
||||
def migrate(self) -> None:
|
||||
self.conn.executescript(SCHEMA)
|
||||
self._migrate_pull_request_feedback_columns()
|
||||
self.conn.commit()
|
||||
|
||||
def _migrate_pull_request_feedback_columns(self) -> None:
|
||||
rows = self.conn.execute("PRAGMA table_info(pull_request_feedback)").fetchall()
|
||||
columns = {row["name"] for row in rows}
|
||||
if "last_seen_review_id" not in columns:
|
||||
self.conn.execute(
|
||||
"ALTER TABLE pull_request_feedback ADD COLUMN last_seen_review_id INTEGER NOT NULL DEFAULT 0"
|
||||
)
|
||||
if "last_seen_review_comment_id" not in columns:
|
||||
self.conn.execute(
|
||||
"ALTER TABLE pull_request_feedback ADD COLUMN last_seen_review_comment_id INTEGER NOT NULL DEFAULT 0"
|
||||
)
|
||||
|
||||
def upsert_repository(
|
||||
self,
|
||||
*,
|
||||
@@ -269,12 +292,23 @@ class Database:
|
||||
SELECT t.*
|
||||
FROM tasks t
|
||||
JOIN issues i ON i.repo_id = t.repo_id AND i.issue_number = t.issue_number
|
||||
WHERE t.state = ?
|
||||
WHERE t.state != ?
|
||||
AND t.pr_number IS NOT NULL
|
||||
AND i.state = 'open'
|
||||
AND (
|
||||
t.state IN (?, ?)
|
||||
OR t.lease_owner IS NULL
|
||||
OR t.lease_expires_at IS NULL
|
||||
OR t.lease_expires_at < ?
|
||||
)
|
||||
ORDER BY t.id
|
||||
""",
|
||||
(TaskState.HUMAN_REVIEW_READY.value,),
|
||||
(
|
||||
TaskState.CANCELLED.value,
|
||||
TaskState.HUMAN_REVIEW_READY.value,
|
||||
TaskState.FAILED.value,
|
||||
dt_to_db(utcnow()),
|
||||
),
|
||||
).fetchall()
|
||||
return [self._task(row) for row in rows]
|
||||
|
||||
@@ -298,44 +332,104 @@ class Database:
|
||||
).fetchone()
|
||||
return int(row["last_seen_comment_id"]) if row else 0
|
||||
|
||||
def get_pr_feedback_cursors(self, task_id: int) -> PullRequestFeedbackCursor:
|
||||
row = self.conn.execute(
|
||||
"""
|
||||
SELECT last_seen_comment_id, last_seen_review_id, last_seen_review_comment_id
|
||||
FROM pull_request_feedback
|
||||
WHERE task_id = ?
|
||||
""",
|
||||
(task_id,),
|
||||
).fetchone()
|
||||
if row is None:
|
||||
return PullRequestFeedbackCursor()
|
||||
return PullRequestFeedbackCursor(
|
||||
last_seen_comment_id=int(row["last_seen_comment_id"]),
|
||||
last_seen_review_id=int(row["last_seen_review_id"]),
|
||||
last_seen_review_comment_id=int(row["last_seen_review_comment_id"]),
|
||||
)
|
||||
|
||||
def upsert_pr_feedback_state(
|
||||
self,
|
||||
task_id: int,
|
||||
*,
|
||||
last_seen_comment_id: int | None = None,
|
||||
last_seen_review_id: int | None = None,
|
||||
last_seen_review_comment_id: int | None = None,
|
||||
pending: bool | None = None,
|
||||
) -> None:
|
||||
row = self.conn.execute(
|
||||
"SELECT last_seen_comment_id, pending FROM pull_request_feedback WHERE task_id = ?",
|
||||
"""
|
||||
SELECT last_seen_comment_id, last_seen_review_id, last_seen_review_comment_id, pending
|
||||
FROM pull_request_feedback
|
||||
WHERE task_id = ?
|
||||
""",
|
||||
(task_id,),
|
||||
).fetchone()
|
||||
now = dt_to_db(utcnow())
|
||||
if row is None:
|
||||
self.conn.execute(
|
||||
"""
|
||||
INSERT INTO pull_request_feedback (task_id, last_seen_comment_id, pending, updated_at)
|
||||
VALUES (?, ?, ?, ?)
|
||||
INSERT INTO pull_request_feedback (
|
||||
task_id,
|
||||
last_seen_comment_id,
|
||||
last_seen_review_id,
|
||||
last_seen_review_comment_id,
|
||||
pending,
|
||||
updated_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(task_id, last_seen_comment_id or 0, int(bool(pending)), now),
|
||||
(
|
||||
task_id,
|
||||
last_seen_comment_id or 0,
|
||||
last_seen_review_id or 0,
|
||||
last_seen_review_comment_id or 0,
|
||||
int(bool(pending)),
|
||||
now,
|
||||
),
|
||||
)
|
||||
else:
|
||||
next_cursor = int(row["last_seen_comment_id"]) if last_seen_comment_id is None else last_seen_comment_id
|
||||
next_review_cursor = int(row["last_seen_review_id"]) if last_seen_review_id is None else last_seen_review_id
|
||||
next_review_comment_cursor = (
|
||||
int(row["last_seen_review_comment_id"])
|
||||
if last_seen_review_comment_id is None
|
||||
else last_seen_review_comment_id
|
||||
)
|
||||
next_pending = bool(row["pending"]) if pending is None else pending
|
||||
self.conn.execute(
|
||||
"""
|
||||
UPDATE pull_request_feedback
|
||||
SET last_seen_comment_id = ?, pending = ?, updated_at = ?
|
||||
SET last_seen_comment_id = ?,
|
||||
last_seen_review_id = ?,
|
||||
last_seen_review_comment_id = ?,
|
||||
pending = ?,
|
||||
updated_at = ?
|
||||
WHERE task_id = ?
|
||||
""",
|
||||
(next_cursor, int(next_pending), now, task_id),
|
||||
(next_cursor, next_review_cursor, next_review_comment_cursor, int(next_pending), now, task_id),
|
||||
)
|
||||
self.conn.commit()
|
||||
|
||||
def mark_pr_feedback_pending(self, task_id: int) -> None:
|
||||
self.upsert_pr_feedback_state(task_id, pending=True)
|
||||
|
||||
def clear_pr_feedback_pending(self, task_id: int, *, last_seen_comment_id: int) -> None:
|
||||
self.upsert_pr_feedback_state(task_id, last_seen_comment_id=last_seen_comment_id, pending=False)
|
||||
def clear_pr_feedback_pending(
|
||||
self,
|
||||
task_id: int,
|
||||
*,
|
||||
last_seen_comment_id: int | None = None,
|
||||
last_seen_review_id: int | None = None,
|
||||
last_seen_review_comment_id: int | None = None,
|
||||
) -> None:
|
||||
self.upsert_pr_feedback_state(
|
||||
task_id,
|
||||
last_seen_comment_id=last_seen_comment_id,
|
||||
last_seen_review_id=last_seen_review_id,
|
||||
last_seen_review_comment_id=last_seen_review_comment_id,
|
||||
pending=False,
|
||||
)
|
||||
|
||||
def has_pending_pr_feedback(self, task_id: int) -> bool:
|
||||
row = self.conn.execute(
|
||||
@@ -347,49 +441,78 @@ class Database:
|
||||
def claim_next_task(self, worker_id: str, lease_seconds: int) -> TaskRecord | None:
|
||||
now = utcnow()
|
||||
expires = now + timedelta(seconds=lease_seconds)
|
||||
row = self.conn.execute(
|
||||
"""
|
||||
SELECT * FROM tasks
|
||||
WHERE state = ?
|
||||
OR (state IN (?, ?, ?, ?, ?, ?, ?) AND lease_expires_at IS NOT NULL AND lease_expires_at < ?)
|
||||
ORDER BY created_at
|
||||
LIMIT 1
|
||||
""",
|
||||
(
|
||||
TaskState.DISCOVERED.value,
|
||||
TaskState.CLAIMED.value,
|
||||
TaskState.PLANNING.value,
|
||||
TaskState.IMPLEMENTING.value,
|
||||
TaskState.TESTING.value,
|
||||
TaskState.PR_OPENED.value,
|
||||
TaskState.REVIEWING.value,
|
||||
TaskState.DISCOVERED.value,
|
||||
dt_to_db(now),
|
||||
),
|
||||
).fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
task = self._task(row)
|
||||
self.conn.execute(
|
||||
"""
|
||||
UPDATE tasks
|
||||
SET state = ?, lease_owner = ?, lease_expires_at = ?, updated_at = ?
|
||||
WHERE id = ?
|
||||
""",
|
||||
(
|
||||
TaskState.CLAIMED.value,
|
||||
worker_id,
|
||||
dt_to_db(expires),
|
||||
dt_to_db(now),
|
||||
task.id,
|
||||
),
|
||||
)
|
||||
self.conn.commit()
|
||||
try:
|
||||
self.conn.execute("BEGIN IMMEDIATE")
|
||||
row = self.conn.execute(
|
||||
"""
|
||||
SELECT * FROM tasks
|
||||
WHERE state = ?
|
||||
OR (state IN (?, ?, ?, ?, ?, ?, ?) AND lease_expires_at IS NOT NULL AND lease_expires_at < ?)
|
||||
ORDER BY created_at
|
||||
LIMIT 1
|
||||
""",
|
||||
(
|
||||
TaskState.DISCOVERED.value,
|
||||
TaskState.CLAIMED.value,
|
||||
TaskState.PLANNING.value,
|
||||
TaskState.IMPLEMENTING.value,
|
||||
TaskState.TESTING.value,
|
||||
TaskState.PR_OPENED.value,
|
||||
TaskState.REVIEWING.value,
|
||||
TaskState.DISCOVERED.value,
|
||||
dt_to_db(now),
|
||||
),
|
||||
).fetchone()
|
||||
if row is None:
|
||||
self.conn.commit()
|
||||
return None
|
||||
task = self._task(row)
|
||||
self.conn.execute(
|
||||
"""
|
||||
UPDATE tasks
|
||||
SET state = ?, lease_owner = ?, lease_expires_at = ?, updated_at = ?
|
||||
WHERE id = ?
|
||||
""",
|
||||
(
|
||||
TaskState.CLAIMED.value,
|
||||
worker_id,
|
||||
dt_to_db(expires),
|
||||
dt_to_db(now),
|
||||
task.id,
|
||||
),
|
||||
)
|
||||
self.conn.commit()
|
||||
except Exception:
|
||||
self.conn.rollback()
|
||||
raise
|
||||
self.add_event(task.id, task.state, TaskState.CLAIMED, f"claimed by {worker_id}")
|
||||
claimed = self.get_task(task.id)
|
||||
assert claimed is not None
|
||||
return claimed
|
||||
|
||||
def renew_task_lease(self, task_id: int, worker_id: str, lease_seconds: int) -> bool:
|
||||
now = utcnow()
|
||||
expires = now + timedelta(seconds=lease_seconds)
|
||||
placeholders = ",".join("?" for _ in ACTIVE_STATES)
|
||||
cursor = self.conn.execute(
|
||||
f"""
|
||||
UPDATE tasks
|
||||
SET lease_expires_at = ?, updated_at = ?
|
||||
WHERE id = ?
|
||||
AND lease_owner = ?
|
||||
AND state IN ({placeholders})
|
||||
""",
|
||||
(
|
||||
dt_to_db(expires),
|
||||
dt_to_db(now),
|
||||
task_id,
|
||||
worker_id,
|
||||
*[state.value for state in ACTIVE_STATES],
|
||||
),
|
||||
)
|
||||
self.conn.commit()
|
||||
return cursor.rowcount == 1
|
||||
|
||||
def transition(
|
||||
self,
|
||||
task_id: int,
|
||||
|
||||
@@ -38,6 +38,17 @@ class GiteaComment:
|
||||
updated_at: datetime | None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GiteaPullReview:
|
||||
id: int
|
||||
body: str
|
||||
author: str
|
||||
state: str
|
||||
html_url: str
|
||||
submitted_at: datetime | None
|
||||
updated_at: datetime | None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GiteaRepository:
|
||||
owner: str
|
||||
@@ -178,6 +189,55 @@ class GiteaClient:
|
||||
response.raise_for_status()
|
||||
return comment_from_payload(response.json())
|
||||
|
||||
def list_pull_request_reviews(self, *, owner: str, name: str, pr_number: int) -> list[GiteaPullReview]:
|
||||
reviews: list[GiteaPullReview] = []
|
||||
page = 1
|
||||
limit = 50
|
||||
while True:
|
||||
response = self.client.get(
|
||||
f"/repos/{owner}/{name}/pulls/{pr_number}/reviews",
|
||||
params={"page": page, "limit": limit},
|
||||
)
|
||||
if response.status_code == 404:
|
||||
return reviews
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
if not payload:
|
||||
break
|
||||
reviews.extend(pull_review_from_payload(item) for item in payload)
|
||||
if len(payload) < limit:
|
||||
break
|
||||
page += 1
|
||||
return reviews
|
||||
|
||||
def list_pull_request_review_comments(
|
||||
self,
|
||||
*,
|
||||
owner: str,
|
||||
name: str,
|
||||
pr_number: int,
|
||||
review_id: int,
|
||||
) -> list[GiteaComment]:
|
||||
comments: list[GiteaComment] = []
|
||||
page = 1
|
||||
limit = 50
|
||||
while True:
|
||||
response = self.client.get(
|
||||
f"/repos/{owner}/{name}/pulls/{pr_number}/reviews/{review_id}/comments",
|
||||
params={"page": page, "limit": limit},
|
||||
)
|
||||
if response.status_code == 404:
|
||||
return comments
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
if not payload:
|
||||
break
|
||||
comments.extend(review_comment_from_payload(item) for item in payload)
|
||||
if len(payload) < limit:
|
||||
break
|
||||
page += 1
|
||||
return comments
|
||||
|
||||
def close_issue(self, *, owner: str, name: str, issue_number: int) -> None:
|
||||
response = self.client.patch(
|
||||
f"/repos/{owner}/{name}/issues/{issue_number}",
|
||||
@@ -225,6 +285,20 @@ def pull_request_from_payload(payload: dict[str, Any]) -> GiteaPullRequest:
|
||||
)
|
||||
|
||||
|
||||
def pull_review_from_payload(payload: dict[str, Any]) -> GiteaPullReview:
|
||||
user_payload = payload.get("user") or {}
|
||||
author = user_payload.get("login") or user_payload.get("username") or ""
|
||||
return GiteaPullReview(
|
||||
id=int(payload["id"]),
|
||||
body=payload.get("body") or "",
|
||||
author=str(author),
|
||||
state=payload.get("state") or "",
|
||||
html_url=payload.get("html_url") or payload.get("url") or "",
|
||||
submitted_at=parse_gitea_dt(payload.get("submitted_at")),
|
||||
updated_at=parse_gitea_dt(payload.get("updated_at")),
|
||||
)
|
||||
|
||||
|
||||
def comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
|
||||
user_payload = payload.get("user") or payload.get("poster") or {}
|
||||
author = user_payload.get("login") or user_payload.get("username") or ""
|
||||
@@ -238,6 +312,29 @@ def comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
|
||||
)
|
||||
|
||||
|
||||
def review_comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
|
||||
comment = comment_from_payload(payload)
|
||||
path = payload.get("path")
|
||||
position = payload.get("position") or payload.get("original_position")
|
||||
location_parts = []
|
||||
if path:
|
||||
location_parts.append(str(path))
|
||||
if position:
|
||||
location_parts.append(f"line {position}")
|
||||
if not location_parts:
|
||||
return comment
|
||||
location = ":".join(location_parts)
|
||||
body = f"Inline comment on {location}\n\n{comment.body}"
|
||||
return GiteaComment(
|
||||
id=comment.id,
|
||||
body=body,
|
||||
author=comment.author,
|
||||
html_url=comment.html_url,
|
||||
created_at=comment.created_at,
|
||||
updated_at=comment.updated_at,
|
||||
)
|
||||
|
||||
|
||||
def parse_gitea_dt(value: str | None) -> datetime | None:
|
||||
if not value:
|
||||
return None
|
||||
|
||||
@@ -3,13 +3,15 @@ from __future__ import annotations
|
||||
import socket
|
||||
import time
|
||||
import logging
|
||||
import threading
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from .agents import CommandRunner, read_report, render_command, write_prompt
|
||||
from .config import AppConfig
|
||||
from .db import Database
|
||||
from .gitea import GiteaClient, GiteaComment
|
||||
from .models import IssueRecord, RepositoryRecord, TaskRecord, TaskState
|
||||
from .db import Database, PullRequestFeedbackCursor
|
||||
from .gitea import GiteaClient, GiteaComment, GiteaPullReview
|
||||
from .models import ACTIVE_STATES, IssueRecord, RepositoryRecord, TaskRecord, TaskState
|
||||
from .rendering import (
|
||||
parse_review_report,
|
||||
render_human_review_summary,
|
||||
@@ -26,6 +28,53 @@ from .workspace import WorkspaceManager, safe_branch_name
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PullRequestFeedbackSnapshot:
|
||||
human_comments: list[GiteaComment]
|
||||
handled_cursor: PullRequestFeedbackCursor
|
||||
newest_cursor: PullRequestFeedbackCursor
|
||||
|
||||
|
||||
class TaskLeaseRenewer:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
db_path: Path,
|
||||
task_id: int,
|
||||
worker_id: str,
|
||||
lease_seconds: int,
|
||||
interval_seconds: int,
|
||||
):
|
||||
self.db_path = db_path
|
||||
self.task_id = task_id
|
||||
self.worker_id = worker_id
|
||||
self.lease_seconds = lease_seconds
|
||||
self.interval_seconds = interval_seconds
|
||||
self._stop = threading.Event()
|
||||
self._thread = threading.Thread(target=self._run, name=f"lease-renewer-{task_id}", daemon=True)
|
||||
|
||||
def __enter__(self) -> TaskLeaseRenewer:
|
||||
self._thread.start()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
self._stop.set()
|
||||
self._thread.join(timeout=max(1, self.interval_seconds))
|
||||
|
||||
def _run(self) -> None:
|
||||
database = Database(self.db_path)
|
||||
try:
|
||||
while not self._stop.wait(self.interval_seconds):
|
||||
renewed = database.renew_task_lease(self.task_id, self.worker_id, self.lease_seconds)
|
||||
if not renewed:
|
||||
logger.warning("stopping lease renewal for task %d; lease no longer belongs to %s", self.task_id, self.worker_id)
|
||||
return
|
||||
except Exception:
|
||||
logger.exception("lease renewal failed for task %d", self.task_id)
|
||||
finally:
|
||||
database.close()
|
||||
|
||||
|
||||
def sync_repositories(db: Database, config: AppConfig, client: GiteaClient) -> list[RepositoryRecord]:
|
||||
synced: list[RepositoryRecord] = []
|
||||
discovered = client.list_owned_repositories()
|
||||
@@ -84,12 +133,12 @@ def close_issues_for_merged_pull_requests(db: Database, client: GiteaClient) ->
|
||||
)
|
||||
client.close_issue(owner=repo.owner, name=repo.name, issue_number=issue.issue_number)
|
||||
db.update_issue_state(task.repo_id, task.issue_number, "closed")
|
||||
db.add_event(
|
||||
task.id,
|
||||
task.state,
|
||||
task.state,
|
||||
f"closed issue #{issue.issue_number} after merged PR #{task.pr_number}",
|
||||
)
|
||||
message = f"closed issue #{issue.issue_number} after merged PR #{task.pr_number}"
|
||||
if task.state in ACTIVE_STATES:
|
||||
db.clear_pr_feedback_pending(task.id)
|
||||
db.transition(task.id, TaskState.CANCELLED, message=message, clear_lease=True)
|
||||
else:
|
||||
db.add_event(task.id, task.state, task.state, message)
|
||||
closed += 1
|
||||
return closed
|
||||
|
||||
@@ -106,23 +155,21 @@ def scan_pull_request_feedback(db: Database, client: GiteaClient) -> int:
|
||||
pull_request = client.get_pull_request(owner=repo.owner, name=repo.name, pr_number=task.pr_number)
|
||||
if pull_request.merged or pull_request.state != "open":
|
||||
continue
|
||||
comments = client.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=task.pr_number)
|
||||
last_seen_comment_id = db.get_pr_feedback_cursor(task.id)
|
||||
newest_comment_id = max((comment.id for comment in comments), default=last_seen_comment_id)
|
||||
human_comments = [
|
||||
comment
|
||||
for comment in comments
|
||||
if comment.id > last_seen_comment_id and comment.author.casefold() != username.casefold()
|
||||
]
|
||||
if not human_comments:
|
||||
if newest_comment_id > last_seen_comment_id:
|
||||
db.upsert_pr_feedback_state(task.id, last_seen_comment_id=newest_comment_id)
|
||||
cursors = db.get_pr_feedback_cursors(task.id)
|
||||
feedback = load_new_human_pr_feedback(client, repo, task.pr_number, username, cursors)
|
||||
if not feedback.human_comments:
|
||||
db.upsert_pr_feedback_state(
|
||||
task.id,
|
||||
last_seen_comment_id=feedback.newest_cursor.last_seen_comment_id,
|
||||
last_seen_review_id=feedback.newest_cursor.last_seen_review_id,
|
||||
last_seen_review_comment_id=feedback.newest_cursor.last_seen_review_comment_id,
|
||||
)
|
||||
continue
|
||||
db.mark_pr_feedback_pending(task.id)
|
||||
db.transition(
|
||||
task.id,
|
||||
TaskState.DISCOVERED,
|
||||
message=f"queued PR feedback from {len(human_comments)} human comment(s)",
|
||||
message=f"queued PR feedback from {len(feedback.human_comments)} human comment(s)",
|
||||
clear_lease=True,
|
||||
)
|
||||
queued += 1
|
||||
@@ -256,6 +303,7 @@ class TaskRunner:
|
||||
message=f"human review summary posted with verdict {review.verdict}",
|
||||
clear_lease=True,
|
||||
)
|
||||
self._cleanup_workspace_on_success(final_task, workspace)
|
||||
return final_task
|
||||
|
||||
def _run_pr_feedback(self, task: TaskRecord) -> TaskRecord:
|
||||
@@ -273,43 +321,61 @@ class TaskRunner:
|
||||
message=f"running implementer agent for PR #{task.pr_number} feedback",
|
||||
workspace_path=workspace,
|
||||
)
|
||||
last_seen_comment_id = self.db.get_pr_feedback_cursor(task.id)
|
||||
comments = self.gitea.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=task.pr_number)
|
||||
cursors = self.db.get_pr_feedback_cursors(task.id)
|
||||
username = self.gitea.get_authenticated_username()
|
||||
human_comments = [
|
||||
comment
|
||||
for comment in comments
|
||||
if comment.id > last_seen_comment_id and comment.author.casefold() != username.casefold()
|
||||
]
|
||||
if not human_comments:
|
||||
newest_comment_id = max((comment.id for comment in comments), default=last_seen_comment_id)
|
||||
self.db.clear_pr_feedback_pending(task.id, last_seen_comment_id=newest_comment_id)
|
||||
return self.db.transition(
|
||||
feedback = load_new_human_pr_feedback(self.gitea, repo, task.pr_number, username, cursors)
|
||||
if not feedback.human_comments:
|
||||
self.db.clear_pr_feedback_pending(
|
||||
task.id,
|
||||
last_seen_comment_id=feedback.newest_cursor.last_seen_comment_id,
|
||||
last_seen_review_id=feedback.newest_cursor.last_seen_review_id,
|
||||
last_seen_review_comment_id=feedback.newest_cursor.last_seen_review_comment_id,
|
||||
)
|
||||
final_task = self.db.transition(
|
||||
task.id,
|
||||
TaskState.HUMAN_REVIEW_READY,
|
||||
message="PR feedback queue was stale; no new human comments found",
|
||||
clear_lease=True,
|
||||
)
|
||||
self._cleanup_workspace_on_success(final_task, workspace)
|
||||
return final_task
|
||||
implementation_report = self._run_pr_feedback_implementer(
|
||||
task,
|
||||
repo,
|
||||
issue,
|
||||
task.branch_name,
|
||||
task.pr_number,
|
||||
human_comments,
|
||||
feedback.human_comments,
|
||||
workspace,
|
||||
)
|
||||
task = self.db.transition(task.id, TaskState.TESTING, message="checking PR feedback diff")
|
||||
commit_message = f"agent: address PR #{task.pr_number} feedback for issue #{issue.issue_number}"
|
||||
commit_id = self.workspace_manager.commit_changes(workspace, commit_message)
|
||||
self.db.add_event(task.id, TaskState.TESTING, TaskState.TESTING, f"committed PR feedback update {commit_id}")
|
||||
self.workspace_manager.push_branch(workspace, task.branch_name)
|
||||
has_code_changes = self.workspace_manager.has_uncommitted_changes(workspace)
|
||||
if has_code_changes:
|
||||
commit_message = f"agent: address PR #{task.pr_number} feedback for issue #{issue.issue_number}"
|
||||
commit_id = self.workspace_manager.commit_changes(workspace, commit_message)
|
||||
self.db.add_event(task.id, TaskState.TESTING, TaskState.TESTING, f"committed PR feedback update {commit_id}")
|
||||
self.workspace_manager.push_branch(workspace, task.branch_name)
|
||||
self.gitea.post_issue_comment(
|
||||
owner=repo.owner,
|
||||
name=repo.name,
|
||||
issue_number=task.pr_number,
|
||||
body=f"## 代理已处理 PR 反馈\n\n{implementation_report.strip()}",
|
||||
)
|
||||
self.db.clear_pr_feedback_pending(
|
||||
task.id,
|
||||
last_seen_comment_id=feedback.handled_cursor.last_seen_comment_id,
|
||||
last_seen_review_id=feedback.handled_cursor.last_seen_review_id,
|
||||
last_seen_review_comment_id=feedback.handled_cursor.last_seen_review_comment_id,
|
||||
)
|
||||
if not has_code_changes:
|
||||
final_task = self.db.transition(
|
||||
task.id,
|
||||
TaskState.HUMAN_REVIEW_READY,
|
||||
message="PR feedback handled without code changes",
|
||||
clear_lease=True,
|
||||
)
|
||||
self._cleanup_workspace_on_success(final_task, workspace)
|
||||
return final_task
|
||||
task = self.db.transition(task.id, TaskState.PR_OPENED, message=f"updated PR #{task.pr_number}")
|
||||
validate_transition(task.state, TaskState.REVIEWING)
|
||||
task = self.db.transition(task.id, TaskState.REVIEWING, message="running reviewer agent after PR feedback")
|
||||
@@ -327,17 +393,20 @@ class TaskRunner:
|
||||
issue_number=task.pr_number,
|
||||
body=render_human_review_summary(review),
|
||||
)
|
||||
newest_comment_id = max(
|
||||
[comment.id for comment in comments] + [reviewer_comment.id, summary_comment.id],
|
||||
default=max(reviewer_comment.id, summary_comment.id),
|
||||
self.db.add_event(
|
||||
task.id,
|
||||
TaskState.REVIEWING,
|
||||
TaskState.REVIEWING,
|
||||
f"posted reviewer comments {reviewer_comment.id} and {summary_comment.id}",
|
||||
)
|
||||
self.db.clear_pr_feedback_pending(task.id, last_seen_comment_id=newest_comment_id)
|
||||
return self.db.transition(
|
||||
final_task = self.db.transition(
|
||||
task.id,
|
||||
TaskState.HUMAN_REVIEW_READY,
|
||||
message=f"PR feedback handled; human review summary posted with verdict {review.verdict}",
|
||||
clear_lease=True,
|
||||
)
|
||||
self._cleanup_workspace_on_success(final_task, workspace)
|
||||
return final_task
|
||||
|
||||
def _run_implementer(
|
||||
self,
|
||||
@@ -360,7 +429,7 @@ class TaskRunner:
|
||||
issue_title=issue.title,
|
||||
branch_name=branch_name,
|
||||
)
|
||||
result = self.command_runner.run(command, workspace, stdin=prompt)
|
||||
result = self._run_agent_command(task, command, workspace, prompt)
|
||||
report = read_report(output_dir / "AGENT_IMPLEMENTATION_REPORT.md")
|
||||
self.db.add_agent_run(
|
||||
task_id=task.id,
|
||||
@@ -400,7 +469,7 @@ class TaskRunner:
|
||||
pr_number=pr_number,
|
||||
branch_name=branch_name,
|
||||
)
|
||||
result = self.command_runner.run(command, workspace, stdin=prompt)
|
||||
result = self._run_agent_command(task, command, workspace, prompt)
|
||||
report = read_report(output_dir / "AGENT_IMPLEMENTATION_REPORT.md")
|
||||
self.db.add_agent_run(
|
||||
task_id=task.id,
|
||||
@@ -437,7 +506,7 @@ class TaskRunner:
|
||||
issue_title=issue.title,
|
||||
pr_number=pr_number,
|
||||
)
|
||||
result = self.command_runner.run(command, workspace, stdin=prompt)
|
||||
result = self._run_agent_command(task, command, workspace, prompt)
|
||||
report = read_report(output_dir / "AGENT_REVIEW_REPORT.md")
|
||||
self.db.add_agent_run(
|
||||
task_id=task.id,
|
||||
@@ -453,9 +522,35 @@ class TaskRunner:
|
||||
raise RuntimeError(f"reviewer failed with exit code {result.exit_code}")
|
||||
return report
|
||||
|
||||
def _run_agent_command(self, task: TaskRecord, command: list[str], workspace: Path, prompt: str) -> AgentResult:
|
||||
with TaskLeaseRenewer(
|
||||
db_path=self.db.path,
|
||||
task_id=task.id,
|
||||
worker_id=self.worker_id,
|
||||
lease_seconds=self.config.scheduler.lease_seconds,
|
||||
interval_seconds=self.config.scheduler.effective_lease_renewal_interval_seconds,
|
||||
):
|
||||
return self.command_runner.run(
|
||||
command,
|
||||
workspace,
|
||||
stdin=prompt,
|
||||
timeout_seconds=self.config.scheduler.agent_timeout_seconds,
|
||||
)
|
||||
|
||||
def _load_context(self, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]:
|
||||
return load_task_context(self.db, task)
|
||||
|
||||
def _cleanup_workspace_on_success(self, task: TaskRecord, workspace: Path) -> None:
|
||||
if not self.config.workspace.cleanup_on_success:
|
||||
return
|
||||
try:
|
||||
self.workspace_manager.cleanup(workspace)
|
||||
except Exception as exc: # pragma: no cover - defensive best-effort cleanup
|
||||
logger.warning("failed to cleanup workspace %s for task %d: %s", workspace, task.id, exc)
|
||||
self.db.add_event(task.id, task.state, task.state, f"workspace cleanup failed: {exc}")
|
||||
else:
|
||||
self.db.add_event(task.id, task.state, task.state, f"cleaned workspace {workspace}")
|
||||
|
||||
|
||||
def load_task_context(db: Database, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]:
|
||||
repo_row = db.conn.execute("SELECT * FROM repositories WHERE id = ?", (task.repo_id,)).fetchone()
|
||||
@@ -466,3 +561,72 @@ def load_task_context(db: Database, task: TaskRecord) -> tuple[RepositoryRecord,
|
||||
if issue is None:
|
||||
raise ValueError(f"issue not found for task {task.id}")
|
||||
return repo, issue
|
||||
|
||||
|
||||
def load_new_human_pr_feedback(
|
||||
client: GiteaClient,
|
||||
repo: RepositoryRecord,
|
||||
pr_number: int,
|
||||
username: str,
|
||||
cursors: PullRequestFeedbackCursor,
|
||||
) -> PullRequestFeedbackSnapshot:
|
||||
issue_comments = client.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=pr_number)
|
||||
reviews = client.list_pull_request_reviews(owner=repo.owner, name=repo.name, pr_number=pr_number)
|
||||
review_comments: list[GiteaComment] = []
|
||||
for review in reviews:
|
||||
review_comments.extend(
|
||||
client.list_pull_request_review_comments(
|
||||
owner=repo.owner,
|
||||
name=repo.name,
|
||||
pr_number=pr_number,
|
||||
review_id=review.id,
|
||||
)
|
||||
)
|
||||
|
||||
bot = username.casefold()
|
||||
human_issue_comments = [
|
||||
comment
|
||||
for comment in issue_comments
|
||||
if comment.id > cursors.last_seen_comment_id and comment.author.casefold() != bot
|
||||
]
|
||||
human_reviews = [
|
||||
review_as_comment(review)
|
||||
for review in reviews
|
||||
if review.id > cursors.last_seen_review_id and review.author.casefold() != bot and review.body.strip()
|
||||
]
|
||||
human_review_comments = [
|
||||
comment
|
||||
for comment in review_comments
|
||||
if comment.id > cursors.last_seen_review_comment_id and comment.author.casefold() != bot
|
||||
]
|
||||
human_comments = [*human_issue_comments, *human_reviews, *human_review_comments]
|
||||
|
||||
handled_cursor = PullRequestFeedbackCursor(
|
||||
last_seen_comment_id=max((comment.id for comment in human_issue_comments), default=cursors.last_seen_comment_id),
|
||||
last_seen_review_id=max((review.id for review in reviews if review.id > cursors.last_seen_review_id), default=cursors.last_seen_review_id),
|
||||
last_seen_review_comment_id=max(
|
||||
(comment.id for comment in human_review_comments),
|
||||
default=cursors.last_seen_review_comment_id,
|
||||
),
|
||||
)
|
||||
newest_cursor = PullRequestFeedbackCursor(
|
||||
last_seen_comment_id=max((comment.id for comment in issue_comments), default=cursors.last_seen_comment_id),
|
||||
last_seen_review_id=max((review.id for review in reviews), default=cursors.last_seen_review_id),
|
||||
last_seen_review_comment_id=max(
|
||||
(comment.id for comment in review_comments),
|
||||
default=cursors.last_seen_review_comment_id,
|
||||
),
|
||||
)
|
||||
return PullRequestFeedbackSnapshot(human_comments, handled_cursor, newest_cursor)
|
||||
|
||||
|
||||
def review_as_comment(review: GiteaPullReview) -> GiteaComment:
|
||||
body = f"Pull request review ({review.state or 'COMMENT'})\n\n{review.body}"
|
||||
return GiteaComment(
|
||||
id=review.id,
|
||||
body=body,
|
||||
author=review.author,
|
||||
html_url=review.html_url,
|
||||
created_at=review.submitted_at,
|
||||
updated_at=review.updated_at,
|
||||
)
|
||||
|
||||
@@ -8,7 +8,13 @@ ALLOWED_TRANSITIONS: dict[TaskState, set[TaskState]] = {
|
||||
TaskState.CLAIMED: {TaskState.PLANNING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
|
||||
TaskState.PLANNING: {TaskState.IMPLEMENTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
|
||||
TaskState.IMPLEMENTING: {TaskState.TESTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
|
||||
TaskState.TESTING: {TaskState.PR_OPENED, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
|
||||
TaskState.TESTING: {
|
||||
TaskState.PR_OPENED,
|
||||
TaskState.HUMAN_REVIEW_READY,
|
||||
TaskState.FAILED,
|
||||
TaskState.BLOCKED,
|
||||
TaskState.CANCELLED,
|
||||
},
|
||||
TaskState.PR_OPENED: {TaskState.REVIEWING, TaskState.FAILED, TaskState.CANCELLED},
|
||||
TaskState.REVIEWING: {TaskState.HUMAN_REVIEW_READY, TaskState.FAILED, TaskState.CANCELLED},
|
||||
TaskState.HUMAN_REVIEW_READY: {TaskState.DISCOVERED},
|
||||
|
||||
@@ -77,6 +77,10 @@ class WorkspaceManager:
|
||||
diff = self._git(["diff", "--quiet", f"{base_ref}...HEAD"], Path(workspace), check=False)
|
||||
return diff.returncode == 1
|
||||
|
||||
def has_uncommitted_changes(self, workspace: str | Path) -> bool:
|
||||
result = self._git(["status", "--porcelain"], Path(workspace), check=False)
|
||||
return bool(result.stdout.strip())
|
||||
|
||||
def commit_changes(self, workspace: str | Path, message: str) -> str:
|
||||
workspace_path = Path(workspace)
|
||||
self._git(["add", "-A"], workspace_path)
|
||||
|
||||
@@ -1,11 +1,16 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
from agent_gitea.agents import CommandRunner
|
||||
from agent_gitea.config import AppConfig
|
||||
from agent_gitea.db import Database
|
||||
from agent_gitea.gitea import GiteaClient
|
||||
from agent_gitea.models import AgentResult, TaskState
|
||||
from agent_gitea.service import (
|
||||
@@ -174,6 +179,7 @@ class FakeWorkspaceManager:
|
||||
self.diff = diff
|
||||
self.pushed: list[str] = []
|
||||
self.resumed: list[tuple[str, Path | None]] = []
|
||||
self.cleaned: list[Path] = []
|
||||
|
||||
def prepare(self, repo, issue, branch_name):
|
||||
path = self.root / branch_name.replace("/", "_")
|
||||
@@ -189,6 +195,9 @@ class FakeWorkspaceManager:
|
||||
def has_diff(self, workspace, base_ref="origin/HEAD"):
|
||||
return self.diff
|
||||
|
||||
def has_uncommitted_changes(self, workspace):
|
||||
return self.diff
|
||||
|
||||
def push_branch(self, workspace, branch_name):
|
||||
self.pushed.append(branch_name)
|
||||
|
||||
@@ -197,14 +206,14 @@ class FakeWorkspaceManager:
|
||||
return "abc1234"
|
||||
|
||||
def cleanup(self, workspace):
|
||||
pass
|
||||
self.cleaned.append(Path(workspace))
|
||||
|
||||
|
||||
class FakeRunner:
|
||||
def __init__(self, *, fail_role: str | None = None):
|
||||
self.fail_role = fail_role
|
||||
|
||||
def run(self, command, cwd, *, stdin=None):
|
||||
def run(self, command, cwd, *, stdin=None, timeout_seconds=None):
|
||||
role = command[0]
|
||||
assert stdin
|
||||
if role == self.fail_role:
|
||||
@@ -246,6 +255,126 @@ def seed_task(db):
|
||||
return db.create_task(repo.id, 1)
|
||||
|
||||
|
||||
def test_claim_next_task_allows_only_one_worker_during_race(db, tmp_path):
|
||||
seed_task(db)
|
||||
db.conn.create_function("sleep_ms", 1, lambda ms: time.sleep(ms / 1000))
|
||||
db.conn.execute(
|
||||
"""
|
||||
CREATE TRIGGER slow_claim_update
|
||||
BEFORE UPDATE OF state ON tasks
|
||||
WHEN NEW.state = 'CLAIMED'
|
||||
BEGIN
|
||||
SELECT sleep_ms(150);
|
||||
END
|
||||
"""
|
||||
)
|
||||
db.conn.commit()
|
||||
db.close()
|
||||
|
||||
barrier = threading.Barrier(2)
|
||||
results: dict[str, int | None] = {}
|
||||
errors: list[BaseException] = []
|
||||
|
||||
def claim(worker_id: str) -> None:
|
||||
database = Database(tmp_path / "state.sqlite3")
|
||||
database.conn.create_function("sleep_ms", 1, lambda ms: time.sleep(ms / 1000))
|
||||
try:
|
||||
barrier.wait()
|
||||
task = database.claim_next_task(worker_id, 60)
|
||||
results[worker_id] = task.id if task else None
|
||||
except BaseException as exc: # pragma: no cover - assertion below reports thread failures
|
||||
errors.append(exc)
|
||||
finally:
|
||||
database.close()
|
||||
|
||||
threads = [threading.Thread(target=claim, args=(worker_id,)) for worker_id in ("worker-a", "worker-b")]
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
assert errors == []
|
||||
assert list(results.values()).count(1) == 1
|
||||
assert list(results.values()).count(None) == 1
|
||||
|
||||
|
||||
def test_command_runner_returns_timeout_result(tmp_path):
|
||||
result = CommandRunner().run(
|
||||
[sys.executable, "-c", "import time; time.sleep(5)"],
|
||||
tmp_path,
|
||||
timeout_seconds=0.1,
|
||||
)
|
||||
|
||||
assert result.exit_code == 124
|
||||
assert "timed out" in result.stderr
|
||||
|
||||
|
||||
class ObservingSlowRunner:
|
||||
def __init__(self, db, task_id: int):
|
||||
self.db = db
|
||||
self.task_id = task_id
|
||||
self.lease_before = None
|
||||
self.lease_during = None
|
||||
|
||||
def run(self, command, cwd, *, stdin=None, timeout_seconds=None):
|
||||
role = command[0]
|
||||
if role == "implementer":
|
||||
self.lease_before = self.db.get_task(self.task_id).lease_expires_at # type: ignore[union-attr]
|
||||
time.sleep(1.2)
|
||||
self.lease_during = self.db.get_task(self.task_id).lease_expires_at # type: ignore[union-attr]
|
||||
output_dir = Path(cwd) / ".agent-output"
|
||||
output_dir.mkdir(exist_ok=True)
|
||||
(output_dir / "AGENT_IMPLEMENTATION_REPORT.md").write_text(
|
||||
"## Summary\nImplemented\n\n## Test commands run\npytest\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
if role == "reviewer":
|
||||
output_dir = Path(cwd) / ".agent-output"
|
||||
output_dir.mkdir(exist_ok=True)
|
||||
(output_dir / "AGENT_REVIEW_REPORT.md").write_text(
|
||||
"## Verdict\nAPPROVE\n\n## Suggested PR Comment\nLooks good.\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
return AgentResult(exit_code=0, stdout="ok", stderr="")
|
||||
|
||||
|
||||
def test_task_runner_renews_lease_while_agent_runs(db, tmp_path):
|
||||
config = make_config(
|
||||
tmp_path,
|
||||
scheduler={
|
||||
"interval_seconds": 1,
|
||||
"concurrency": 1,
|
||||
"lease_seconds": 60,
|
||||
"lease_renewal_interval_seconds": 1,
|
||||
},
|
||||
)
|
||||
task = seed_task(db)
|
||||
runner = ObservingSlowRunner(db, task.id)
|
||||
|
||||
def handler(request: httpx.Request) -> httpx.Response:
|
||||
payload = json.loads(request.content.decode() or "{}")
|
||||
if request.url.path == "/api/v1/repos/acme/service/pulls":
|
||||
return httpx.Response(201, json={"number": 5, "state": "open", "merged": False})
|
||||
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
|
||||
return httpx.Response(201, json={"id": 1, "body": payload.get("body", ""), "user": {"login": "agent-bot"}})
|
||||
return httpx.Response(404)
|
||||
|
||||
finished = TaskRunner(
|
||||
db=db,
|
||||
config=config,
|
||||
gitea=make_client(handler),
|
||||
workspace_manager=FakeWorkspaceManager(tmp_path / "work"),
|
||||
command_runner=runner,
|
||||
worker_id="worker",
|
||||
).run_once()
|
||||
|
||||
assert finished is not None
|
||||
assert finished.state == TaskState.HUMAN_REVIEW_READY
|
||||
assert runner.lease_before is not None
|
||||
assert runner.lease_during is not None
|
||||
assert runner.lease_during > runner.lease_before
|
||||
|
||||
|
||||
def transition_to_human_review_ready(db, task_id: int, *, pr_number: int = 5, branch_name: str | None = None):
|
||||
db.transition(task_id, TaskState.CLAIMED)
|
||||
db.transition(task_id, TaskState.PLANNING)
|
||||
@@ -349,6 +478,55 @@ def test_scan_pull_request_feedback_advances_cursor_without_human_comments(db):
|
||||
assert db.get_task(task.id).state == TaskState.HUMAN_REVIEW_READY # type: ignore[union-attr]
|
||||
|
||||
|
||||
def test_scan_pull_request_feedback_queues_task_for_inline_review_comment(db):
|
||||
task = seed_task(db)
|
||||
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
|
||||
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
|
||||
|
||||
def handler(request: httpx.Request) -> httpx.Response:
|
||||
if request.url.path == "/api/v1/user":
|
||||
return httpx.Response(200, json={"login": "agent-bot"})
|
||||
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
|
||||
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
|
||||
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
|
||||
return httpx.Response(200, json=[])
|
||||
if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews":
|
||||
return httpx.Response(
|
||||
200,
|
||||
json=[
|
||||
{
|
||||
"id": 9,
|
||||
"body": "",
|
||||
"state": "REQUEST_CHANGES",
|
||||
"user": {"login": "alice"},
|
||||
}
|
||||
],
|
||||
)
|
||||
if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews/9/comments":
|
||||
return httpx.Response(
|
||||
200,
|
||||
json=[
|
||||
{
|
||||
"id": 31,
|
||||
"body": "This branch misses the cleanup case.",
|
||||
"path": "src/service.py",
|
||||
"position": 42,
|
||||
"user": {"login": "alice"},
|
||||
}
|
||||
],
|
||||
)
|
||||
return httpx.Response(404)
|
||||
|
||||
queued = scan_pull_request_feedback(db, make_client(handler))
|
||||
cursors = db.get_pr_feedback_cursors(task.id)
|
||||
|
||||
assert queued == 1
|
||||
assert db.has_pending_pr_feedback(task.id)
|
||||
assert cursors.last_seen_comment_id == 2
|
||||
assert cursors.last_seen_review_id == 0
|
||||
assert cursors.last_seen_review_comment_id == 0
|
||||
|
||||
|
||||
def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
|
||||
config = make_config(tmp_path)
|
||||
task = seed_task(db)
|
||||
@@ -360,7 +538,7 @@ def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
|
||||
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
|
||||
db.conn.commit()
|
||||
requests: list[tuple[str, str, dict]] = []
|
||||
next_comment_id = 3
|
||||
next_comment_id = 4
|
||||
|
||||
def handler(request: httpx.Request) -> httpx.Response:
|
||||
nonlocal next_comment_id
|
||||
@@ -400,9 +578,127 @@ def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
|
||||
assert workspace_manager.commit_message == "agent: address PR #5 feedback for issue #1"
|
||||
assert not any(path == "/api/v1/repos/acme/service/pulls" for _, path, _ in requests)
|
||||
assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback", "reviewer"]
|
||||
assert db.get_pr_feedback_cursor(task.id) == 6
|
||||
assert db.get_pr_feedback_cursor(task.id) == 3
|
||||
assert not db.has_pending_pr_feedback(task.id)
|
||||
|
||||
def rescan_handler(request: httpx.Request) -> httpx.Response:
|
||||
if request.url.path == "/api/v1/user":
|
||||
return httpx.Response(200, json={"login": "agent-bot"})
|
||||
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
|
||||
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
|
||||
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
|
||||
return httpx.Response(
|
||||
200,
|
||||
json=[
|
||||
{"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
|
||||
{"id": 4, "body": "Also cover the cleanup case.", "user": {"login": "alice"}},
|
||||
{"id": 5, "body": "processed", "user": {"login": "agent-bot"}},
|
||||
{"id": 6, "body": "review report", "user": {"login": "agent-bot"}},
|
||||
{"id": 7, "body": "summary", "user": {"login": "agent-bot"}},
|
||||
],
|
||||
)
|
||||
return httpx.Response(404)
|
||||
|
||||
queued = scan_pull_request_feedback(db, make_client(rescan_handler))
|
||||
|
||||
assert queued == 1
|
||||
assert db.has_pending_pr_feedback(task.id)
|
||||
assert db.get_pr_feedback_cursor(task.id) == 3
|
||||
|
||||
|
||||
def test_run_task_with_pending_pr_feedback_allows_no_code_changes(db, tmp_path):
|
||||
config = make_config(tmp_path)
|
||||
task = seed_task(db)
|
||||
workspace_path = tmp_path / "work" / "existing"
|
||||
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
|
||||
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
|
||||
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
|
||||
db.mark_pr_feedback_pending(task.id)
|
||||
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
|
||||
db.conn.commit()
|
||||
requests: list[tuple[str, str, dict]] = []
|
||||
|
||||
def handler(request: httpx.Request) -> httpx.Response:
|
||||
payload = json.loads(request.content.decode() or "{}")
|
||||
requests.append((request.method, request.url.path, payload))
|
||||
if request.url.path == "/api/v1/user":
|
||||
return httpx.Response(200, json={"login": "agent-bot"})
|
||||
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
|
||||
return httpx.Response(
|
||||
200,
|
||||
json=[
|
||||
{"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
|
||||
{"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
|
||||
{"id": 3, "body": "Can you confirm why this is enough?", "user": {"login": "alice"}},
|
||||
],
|
||||
)
|
||||
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
|
||||
return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}})
|
||||
return httpx.Response(404)
|
||||
|
||||
workspace_manager = FakeWorkspaceManager(tmp_path / "work", diff=False)
|
||||
finished = TaskRunner(
|
||||
db=db,
|
||||
config=config,
|
||||
gitea=make_client(handler),
|
||||
workspace_manager=workspace_manager,
|
||||
command_runner=FakeRunner(),
|
||||
worker_id="worker",
|
||||
).run_once()
|
||||
|
||||
assert finished is not None
|
||||
assert finished.state == TaskState.HUMAN_REVIEW_READY
|
||||
assert workspace_manager.pushed == []
|
||||
assert not hasattr(workspace_manager, "commit_message")
|
||||
assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback"]
|
||||
assert db.get_pr_feedback_cursor(task.id) == 3
|
||||
assert not db.has_pending_pr_feedback(task.id)
|
||||
|
||||
|
||||
def test_success_cleanup_does_not_block_later_pr_feedback(db, tmp_path):
|
||||
config = make_config(
|
||||
tmp_path,
|
||||
workspace={"root": tmp_path / "workspaces", "cleanup_on_success": True},
|
||||
)
|
||||
task = seed_task(db)
|
||||
workspace_path = tmp_path / "work" / "existing"
|
||||
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
|
||||
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
|
||||
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
|
||||
db.mark_pr_feedback_pending(task.id)
|
||||
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
|
||||
db.conn.commit()
|
||||
|
||||
def handler(request: httpx.Request) -> httpx.Response:
|
||||
payload = json.loads(request.content.decode() or "{}")
|
||||
if request.url.path == "/api/v1/user":
|
||||
return httpx.Response(200, json={"login": "agent-bot"})
|
||||
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
|
||||
return httpx.Response(
|
||||
200,
|
||||
json=[
|
||||
{"id": 3, "body": "Please address the review.", "user": {"login": "alice"}},
|
||||
],
|
||||
)
|
||||
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
|
||||
return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}})
|
||||
return httpx.Response(404)
|
||||
|
||||
workspace_manager = FakeWorkspaceManager(tmp_path / "work")
|
||||
finished = TaskRunner(
|
||||
db=db,
|
||||
config=config,
|
||||
gitea=make_client(handler),
|
||||
workspace_manager=workspace_manager,
|
||||
command_runner=FakeRunner(),
|
||||
worker_id="worker",
|
||||
).run_once()
|
||||
|
||||
assert finished is not None
|
||||
assert finished.state == TaskState.HUMAN_REVIEW_READY
|
||||
assert workspace_manager.resumed == [("agent/issue-1-ready-issue", workspace_path)]
|
||||
assert workspace_manager.cleaned == [workspace_path]
|
||||
|
||||
|
||||
def test_close_issues_for_merged_pull_requests_closes_linked_issue(db):
|
||||
repo = db.upsert_repository(
|
||||
@@ -485,6 +781,56 @@ def test_close_issues_for_merged_pull_requests_skips_unmerged_pr(db):
|
||||
assert db.get_issue(repo.id, 1).state == "open" # type: ignore[union-attr]
|
||||
|
||||
|
||||
def test_close_issues_for_merged_pull_requests_handles_queued_feedback_task(db):
|
||||
repo = db.upsert_repository(
|
||||
owner="acme",
|
||||
name="service",
|
||||
clone_url="https://gitea.test/acme/service.git",
|
||||
default_branch="main",
|
||||
enabled=True,
|
||||
)
|
||||
db.upsert_issue(
|
||||
repo_id=repo.id,
|
||||
issue_number=1,
|
||||
title="Ready issue",
|
||||
body="Body",
|
||||
labels=["agent:ready"],
|
||||
state="open",
|
||||
html_url="https://gitea.test/acme/service/issues/1",
|
||||
)
|
||||
task = db.create_task(repo.id, 1)
|
||||
task = transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
|
||||
db.mark_pr_feedback_pending(task.id)
|
||||
db.transition(
|
||||
task.id,
|
||||
TaskState.DISCOVERED,
|
||||
message="queued PR feedback from 1 human comment(s)",
|
||||
clear_lease=True,
|
||||
)
|
||||
requests: list[tuple[str, str, dict]] = []
|
||||
|
||||
def handler(request: httpx.Request) -> httpx.Response:
|
||||
payload = json.loads(request.content.decode() or "{}")
|
||||
requests.append((request.method, request.url.path, payload))
|
||||
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
|
||||
return httpx.Response(200, json={"number": 5, "state": "closed", "merged": True})
|
||||
if request.url.path == "/api/v1/repos/acme/service/issues/1/comments":
|
||||
return httpx.Response(201, json={"id": 1})
|
||||
if request.url.path == "/api/v1/repos/acme/service/issues/1":
|
||||
return httpx.Response(200, json={"number": 1, "state": "closed"})
|
||||
return httpx.Response(404)
|
||||
|
||||
closed = close_issues_for_merged_pull_requests(db, make_client(handler))
|
||||
|
||||
updated_task = db.get_task(task.id)
|
||||
assert closed == 1
|
||||
assert db.get_issue(repo.id, 1).state == "closed" # type: ignore[union-attr]
|
||||
assert updated_task is not None
|
||||
assert updated_task.state == TaskState.CANCELLED
|
||||
assert not db.has_pending_pr_feedback(task.id)
|
||||
assert ("PATCH", "/api/v1/repos/acme/service/issues/1", {"state": "closed"}) in requests
|
||||
|
||||
|
||||
def test_run_task_no_diff_becomes_blocked(db, tmp_path):
|
||||
config = make_config(tmp_path)
|
||||
seed_task(db)
|
||||
|
||||
Reference in New Issue
Block a user