Compare commits

..

2 Commits

9 changed files with 596 additions and 76 deletions

View File

@@ -2,7 +2,7 @@
[中文说明](README_CN.md) [中文说明](README_CN.md)
Local, auditable CLI service that scans configured Gitea repositories for eligible issues, runs an implementer agent in an isolated workspace, opens a pull request, runs a reviewer agent, and posts a standardized human review summary. Local, auditable CLI service that scans configured Gitea repositories for eligible issues, runs an implementer agent in an isolated workspace, opens a pull request, runs a reviewer agent, and posts a standardized human review summary. After the PR is opened, it also watches PR comments from humans and can continue the same task on the existing branch to push follow-up commits.
The MVP never merges pull requests. The MVP never merges pull requests.
@@ -18,12 +18,13 @@ agent-gitea --config config.yaml run-once
``` ```
`sync-repos` discovers repositories owned by the authenticated Gitea user from `/user/repos`; repositories are not listed in the config file. `sync-repos` discovers repositories owned by the authenticated Gitea user from `/user/repos`; repositories are not listed in the config file.
`worker` continuously syncs repositories, scans issues, and processes eligible tasks. `worker` continuously syncs repositories, scans issues, scans open agent PRs for new human comments, and processes eligible tasks.
## Commands ## Commands
- `agent-gitea sync-repos` - `agent-gitea sync-repos`
- `agent-gitea scan-issues` - `agent-gitea scan-issues`
- `agent-gitea scan-pr-feedback`
- `agent-gitea run-once` - `agent-gitea run-once`
- `agent-gitea worker` - `agent-gitea worker`
- `agent-gitea show-task <task_id>` - `agent-gitea show-task <task_id>`

View File

@@ -2,7 +2,7 @@
[English](README.md) [English](README.md)
一个本地运行、可审计的 CLI 服务,用于扫描已配置 Gitea 仓库中的合格 issue在隔离工作区中运行实现 agent创建 pull request运行评审 agent并发布标准化的人工评审摘要。 一个本地运行、可审计的 CLI 服务,用于扫描已配置 Gitea 仓库中的合格 issue在隔离工作区中运行实现 agent创建 pull request运行评审 agent并发布标准化的人工评审摘要。PR 创建后,它还会监控来自 human 的 PR 评论,并在同一个任务、已有分支和 workspace 上继续处理,推送后续 commit。
MVP 版本不会合并 pull request。 MVP 版本不会合并 pull request。
@@ -18,12 +18,13 @@ agent-gitea --config config.yaml run-once
``` ```
`sync-repos` 会通过 `/user/repos` 发现已认证 Gitea 用户拥有的仓库;仓库不会列在配置文件中。 `sync-repos` 会通过 `/user/repos` 发现已认证 Gitea 用户拥有的仓库;仓库不会列在配置文件中。
`worker` 会持续同步仓库、扫描 issue并处理符合条件的任务。 `worker` 会持续同步仓库、扫描 issue、扫描 agent 已创建 PR 中的新 human 评论,并处理符合条件的任务。
## 命令 ## 命令
- `agent-gitea sync-repos` - `agent-gitea sync-repos`
- `agent-gitea scan-issues` - `agent-gitea scan-issues`
- `agent-gitea scan-pr-feedback`
- `agent-gitea run-once` - `agent-gitea run-once`
- `agent-gitea worker` - `agent-gitea worker`
- `agent-gitea show-task <task_id>` - `agent-gitea show-task <task_id>`

View File

@@ -9,7 +9,12 @@ import typer
from .config import AppConfig, load_config from .config import AppConfig, load_config
from .db import Database from .db import Database
from .gitea import GiteaClient from .gitea import GiteaClient
from .service import TaskRunner, scan_issues as scan_issues_service, sync_repositories from .service import (
TaskRunner,
scan_issues as scan_issues_service,
scan_pull_request_feedback,
sync_repositories,
)
app = typer.Typer(no_args_is_help=True, help="Agentic Gitea issue-to-PR manager.") app = typer.Typer(no_args_is_help=True, help="Agentic Gitea issue-to-PR manager.")
@@ -74,6 +79,17 @@ def run_once(ctx: typer.Context) -> None:
typer.echo(f"task {task.id} -> {task.state.value}") typer.echo(f"task {task.id} -> {task.state.value}")
@app.command("scan-pr-feedback")
def scan_pr_feedback(ctx: typer.Context) -> None:
cli: CliContext = ctx.obj
client = cli.gitea()
try:
queued = scan_pull_request_feedback(cli.db, client)
finally:
client.close()
typer.echo(f"queued {queued} tasks")
@app.command("worker") @app.command("worker")
def worker(ctx: typer.Context) -> None: def worker(ctx: typer.Context) -> None:
cli: CliContext = ctx.obj cli: CliContext = ctx.obj

View File

@@ -83,6 +83,13 @@ CREATE TABLE IF NOT EXISTS agent_runs (
report TEXT, report TEXT,
created_at TEXT NOT NULL created_at TEXT NOT NULL
); );
CREATE TABLE IF NOT EXISTS pull_request_feedback (
task_id INTEGER PRIMARY KEY REFERENCES tasks(id),
last_seen_comment_id INTEGER NOT NULL DEFAULT 0,
pending INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL
);
""" """
@@ -271,6 +278,72 @@ class Database:
).fetchall() ).fetchall()
return [self._task(row) for row in rows] return [self._task(row) for row in rows]
def list_tasks_awaiting_pr_feedback(self) -> list[TaskRecord]:
rows = self.conn.execute(
"""
SELECT *
FROM tasks
WHERE state = ?
AND pr_number IS NOT NULL
ORDER BY id
""",
(TaskState.HUMAN_REVIEW_READY.value,),
).fetchall()
return [self._task(row) for row in rows]
def get_pr_feedback_cursor(self, task_id: int) -> int:
row = self.conn.execute(
"SELECT last_seen_comment_id FROM pull_request_feedback WHERE task_id = ?",
(task_id,),
).fetchone()
return int(row["last_seen_comment_id"]) if row else 0
def upsert_pr_feedback_state(
self,
task_id: int,
*,
last_seen_comment_id: int | None = None,
pending: bool | None = None,
) -> None:
row = self.conn.execute(
"SELECT last_seen_comment_id, pending FROM pull_request_feedback WHERE task_id = ?",
(task_id,),
).fetchone()
now = dt_to_db(utcnow())
if row is None:
self.conn.execute(
"""
INSERT INTO pull_request_feedback (task_id, last_seen_comment_id, pending, updated_at)
VALUES (?, ?, ?, ?)
""",
(task_id, last_seen_comment_id or 0, int(bool(pending)), now),
)
else:
next_cursor = int(row["last_seen_comment_id"]) if last_seen_comment_id is None else last_seen_comment_id
next_pending = bool(row["pending"]) if pending is None else pending
self.conn.execute(
"""
UPDATE pull_request_feedback
SET last_seen_comment_id = ?, pending = ?, updated_at = ?
WHERE task_id = ?
""",
(next_cursor, int(next_pending), now, task_id),
)
self.conn.commit()
def mark_pr_feedback_pending(self, task_id: int) -> None:
self.upsert_pr_feedback_state(task_id, pending=True)
def clear_pr_feedback_pending(self, task_id: int, *, last_seen_comment_id: int) -> None:
self.upsert_pr_feedback_state(task_id, last_seen_comment_id=last_seen_comment_id, pending=False)
def has_pending_pr_feedback(self, task_id: int) -> bool:
row = self.conn.execute(
"SELECT pending FROM pull_request_feedback WHERE task_id = ?",
(task_id,),
).fetchone()
return bool(row and row["pending"])
def claim_next_task(self, worker_id: str, lease_seconds: int) -> TaskRecord | None: def claim_next_task(self, worker_id: str, lease_seconds: int) -> TaskRecord | None:
now = utcnow() now = utcnow()
expires = now + timedelta(seconds=lease_seconds) expires = now + timedelta(seconds=lease_seconds)

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime
from typing import Any from typing import Any
import httpx import httpx
@@ -27,6 +28,16 @@ class GiteaPullRequest:
merged: bool merged: bool
@dataclass(frozen=True)
class GiteaComment:
id: int
body: str
author: str
html_url: str
created_at: datetime | None
updated_at: datetime | None
@dataclass(frozen=True) @dataclass(frozen=True)
class GiteaRepository: class GiteaRepository:
owner: str owner: str
@@ -140,12 +151,32 @@ class GiteaClient:
response.raise_for_status() response.raise_for_status()
return pull_request_from_payload(response.json()) return pull_request_from_payload(response.json())
def post_issue_comment(self, *, owner: str, name: str, issue_number: int, body: str) -> None: def list_issue_comments(self, *, owner: str, name: str, issue_number: int) -> list[GiteaComment]:
comments: list[GiteaComment] = []
page = 1
limit = 50
while True:
response = self.client.get(
f"/repos/{owner}/{name}/issues/{issue_number}/comments",
params={"page": page, "limit": limit},
)
response.raise_for_status()
payload = response.json()
if not payload:
break
comments.extend(comment_from_payload(item) for item in payload)
if len(payload) < limit:
break
page += 1
return comments
def post_issue_comment(self, *, owner: str, name: str, issue_number: int, body: str) -> GiteaComment:
response = self.client.post( response = self.client.post(
f"/repos/{owner}/{name}/issues/{issue_number}/comments", f"/repos/{owner}/{name}/issues/{issue_number}/comments",
json={"body": body}, json={"body": body},
) )
response.raise_for_status() response.raise_for_status()
return comment_from_payload(response.json())
def close_issue(self, *, owner: str, name: str, issue_number: int) -> None: def close_issue(self, *, owner: str, name: str, issue_number: int) -> None:
response = self.client.patch( response = self.client.patch(
@@ -192,3 +223,24 @@ def pull_request_from_payload(payload: dict[str, Any]) -> GiteaPullRequest:
state=payload.get("state") or "", state=payload.get("state") or "",
merged=merged, merged=merged,
) )
def comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
user_payload = payload.get("user") or payload.get("poster") or {}
author = user_payload.get("login") or user_payload.get("username") or ""
return GiteaComment(
id=int(payload["id"]),
body=payload.get("body") or "",
author=str(author),
html_url=payload.get("html_url") or payload.get("url") or "",
created_at=parse_gitea_dt(payload.get("created_at")),
updated_at=parse_gitea_dt(payload.get("updated_at")),
)
def parse_gitea_dt(value: str | None) -> datetime | None:
if not value:
return None
if value.endswith("Z"):
value = value[:-1] + "+00:00"
return datetime.fromisoformat(value)

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import re import re
from dataclasses import dataclass from dataclasses import dataclass
from .gitea import GiteaComment
from .models import IssueRecord, RepositoryRecord from .models import IssueRecord, RepositoryRecord
@@ -46,6 +47,48 @@ Keep the section headings exactly as written below, but write the section conten
""" """
def render_pr_feedback_prompt(
repo: RepositoryRecord,
issue: IssueRecord,
pr_number: int,
branch_name: str,
comments: list[GiteaComment],
) -> str:
return f"""# Agent PR Feedback Task
Repository: {repo.full_name}
Base branch: {repo.default_branch}
Working branch: {branch_name}
Issue: #{issue.issue_number} {issue.title}
Issue URL: {issue.html_url}
Pull request: #{pr_number}
## Issue Body
{issue.body or "(no issue body)"}
## Human PR Comments To Address
{render_comment_thread(comments)}
## Instructions
Continue work in the current workspace and on the existing PR branch.
Address the human PR comments above with the smallest scoped changes that satisfy the review.
Create code changes only when needed, run the relevant tests before finishing, and leave the branch ready for a new commit.
Write `.agent-output/AGENT_IMPLEMENTATION_REPORT.md` using this exact section contract.
Keep the section headings exactly as written below, but write the section content in Chinese:
- Summary
- Files changed
- Test commands run
- Test results
- Known risks
- Follow-up suggestions
"""
def render_reviewer_prompt(repo: RepositoryRecord, issue: IssueRecord, pr_number: int) -> str: def render_reviewer_prompt(repo: RepositoryRecord, issue: IssueRecord, pr_number: int) -> str:
return f"""# Agent Review Task return f"""# Agent Review Task
@@ -87,6 +130,20 @@ def render_pr_body(issue: IssueRecord, implementation_report: str) -> str:
""" """
def render_comment_thread(comments: list[GiteaComment]) -> str:
if not comments:
return "(no new human PR comments found)"
blocks: list[str] = []
for comment in comments:
created = comment.created_at.isoformat() if comment.created_at else "unknown time"
header = f"- Comment #{comment.id} by {comment.author or 'unknown'} at {created}"
if comment.html_url:
header += f"\n URL: {comment.html_url}"
body = "\n".join(f" {line}" for line in comment.body.strip().splitlines())
blocks.append(f"{header}\n\n{body or ' (empty comment)'}")
return "\n\n".join(blocks)
def parse_review_report(raw: str) -> ReviewReport: def parse_review_report(raw: str) -> ReviewReport:
verdict_match = re.search(r"(?im)^\s*(?:[-*]\s*)?(?:##\s*)?Verdict\s*:?\s*`?([A-Z_]+)`?", raw) verdict_match = re.search(r"(?im)^\s*(?:[-*]\s*)?(?:##\s*)?Verdict\s*:?\s*`?([A-Z_]+)`?", raw)
verdict = verdict_match.group(1) if verdict_match else "NEEDS_HUMAN_DECISION" verdict = verdict_match.group(1) if verdict_match else "NEEDS_HUMAN_DECISION"
@@ -123,5 +180,5 @@ def render_human_review_summary(review: ReviewReport) -> str:
## 需要人工处理 ## 需要人工处理
请人工审核该 PR。agent-manager 不会自动合并关闭 PR 或提交变更请求 请人工审核该 PR。agent-manager 不会自动合并关闭 PR;如果继续在该 PR 中留言agent-manager 会检测评论并尝试提交后续修复
""" """

View File

@@ -8,12 +8,13 @@ from pathlib import Path
from .agents import CommandRunner, read_report, render_command, write_prompt from .agents import CommandRunner, read_report, render_command, write_prompt
from .config import AppConfig from .config import AppConfig
from .db import Database from .db import Database
from .gitea import GiteaClient from .gitea import GiteaClient, GiteaComment
from .models import IssueRecord, RepositoryRecord, TaskRecord, TaskState from .models import IssueRecord, RepositoryRecord, TaskRecord, TaskState
from .rendering import ( from .rendering import (
parse_review_report, parse_review_report,
render_human_review_summary, render_human_review_summary,
render_implementer_prompt, render_implementer_prompt,
render_pr_feedback_prompt,
render_pr_body, render_pr_body,
render_reviewer_prompt, render_reviewer_prompt,
) )
@@ -93,6 +94,41 @@ def close_issues_for_merged_pull_requests(db: Database, client: GiteaClient) ->
return closed return closed
def scan_pull_request_feedback(db: Database, client: GiteaClient) -> int:
tasks = db.list_tasks_awaiting_pr_feedback()
if not tasks:
return 0
username = client.get_authenticated_username()
queued = 0
for task in tasks:
repo, _issue = load_task_context(db, task)
assert task.pr_number is not None
pull_request = client.get_pull_request(owner=repo.owner, name=repo.name, pr_number=task.pr_number)
if pull_request.merged or pull_request.state != "open":
continue
comments = client.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=task.pr_number)
last_seen_comment_id = db.get_pr_feedback_cursor(task.id)
newest_comment_id = max((comment.id for comment in comments), default=last_seen_comment_id)
human_comments = [
comment
for comment in comments
if comment.id > last_seen_comment_id and comment.author.casefold() != username.casefold()
]
if not human_comments:
if newest_comment_id > last_seen_comment_id:
db.upsert_pr_feedback_state(task.id, last_seen_comment_id=newest_comment_id)
continue
db.mark_pr_feedback_pending(task.id)
db.transition(
task.id,
TaskState.DISCOVERED,
message=f"queued PR feedback from {len(human_comments)} human comment(s)",
clear_lease=True,
)
queued += 1
return queued
class TaskRunner: class TaskRunner:
def __init__( def __init__(
self, self,
@@ -124,6 +160,8 @@ class TaskRunner:
logger.info("synced %d repositories", len(repos)) logger.info("synced %d repositories", len(repos))
closed = close_issues_for_merged_pull_requests(self.db, self.gitea) closed = close_issues_for_merged_pull_requests(self.db, self.gitea)
logger.info("closed %d issues for merged pull requests", closed) logger.info("closed %d issues for merged pull requests", closed)
feedback_tasks = scan_pull_request_feedback(self.db, self.gitea)
logger.info("queued %d tasks from pull request feedback", feedback_tasks)
task_ids = scan_issues(self.db, self.config, self.gitea) task_ids = scan_issues(self.db, self.config, self.gitea)
logger.info("created %d tasks from issue scan", len(task_ids)) logger.info("created %d tasks from issue scan", len(task_ids))
task = self.run_once() task = self.run_once()
@@ -138,6 +176,19 @@ class TaskRunner:
def run_claimed(self, task: TaskRecord) -> TaskRecord: def run_claimed(self, task: TaskRecord) -> TaskRecord:
try: try:
if self.db.has_pending_pr_feedback(task.id):
return self._run_pr_feedback(task)
return self._run_initial_implementation(task)
except Exception as exc:
return self.db.transition(
task.id,
TaskState.FAILED,
message="task failed",
error_message=str(exc),
clear_lease=True,
)
def _run_initial_implementation(self, task: TaskRecord) -> TaskRecord:
repo, issue = self._load_context(task) repo, issue = self._load_context(task)
validate_transition(task.state, TaskState.PLANNING) validate_transition(task.state, TaskState.PLANNING)
task = self.db.transition(task.id, TaskState.PLANNING, message="rendering implementation prompt") task = self.db.transition(task.id, TaskState.PLANNING, message="rendering implementation prompt")
@@ -183,33 +234,108 @@ class TaskRunner:
task = self.db.transition(task.id, TaskState.REVIEWING, message="running reviewer agent") task = self.db.transition(task.id, TaskState.REVIEWING, message="running reviewer agent")
review_report_raw = self._run_reviewer(task, repo, issue, pr.number, workspace) review_report_raw = self._run_reviewer(task, repo, issue, pr.number, workspace)
review = parse_review_report(review_report_raw) review = parse_review_report(review_report_raw)
self.gitea.post_issue_comment( reviewer_comment = self.gitea.post_issue_comment(
owner=repo.owner, owner=repo.owner,
name=repo.name, name=repo.name,
issue_number=pr.number, issue_number=pr.number,
body=review_report_raw.strip() or "Reviewer did not produce a report.", body=review_report_raw.strip() or "Reviewer did not produce a report.",
) )
self.gitea.post_issue_comment( summary_comment = self.gitea.post_issue_comment(
owner=repo.owner, owner=repo.owner,
name=repo.name, name=repo.name,
issue_number=pr.number, issue_number=pr.number,
body=render_human_review_summary(review), body=render_human_review_summary(review),
) )
self.db.clear_pr_feedback_pending(
task.id,
last_seen_comment_id=max(reviewer_comment.id, summary_comment.id),
)
final_task = self.db.transition( final_task = self.db.transition(
task.id, task.id,
TaskState.HUMAN_REVIEW_READY, TaskState.HUMAN_REVIEW_READY,
message=f"human review summary posted with verdict {review.verdict}", message=f"human review summary posted with verdict {review.verdict}",
clear_lease=True, clear_lease=True,
) )
if self.config.workspace.cleanup_on_success and final_task.workspace_path:
self.workspace_manager.cleanup(final_task.workspace_path)
return final_task return final_task
except Exception as exc:
def _run_pr_feedback(self, task: TaskRecord) -> TaskRecord:
repo, issue = self._load_context(task)
if task.pr_number is None:
raise ValueError(f"task {task.id} has pending PR feedback without a PR number")
if not task.branch_name:
raise ValueError(f"task {task.id} has pending PR feedback without a branch")
validate_transition(task.state, TaskState.PLANNING)
task = self.db.transition(task.id, TaskState.PLANNING, message="rendering PR feedback prompt")
workspace = self.workspace_manager.prepare_existing_branch(repo, issue, task.branch_name, task.workspace_path)
task = self.db.transition(
task.id,
TaskState.IMPLEMENTING,
message=f"running implementer agent for PR #{task.pr_number} feedback",
workspace_path=workspace,
)
last_seen_comment_id = self.db.get_pr_feedback_cursor(task.id)
comments = self.gitea.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=task.pr_number)
username = self.gitea.get_authenticated_username()
human_comments = [
comment
for comment in comments
if comment.id > last_seen_comment_id and comment.author.casefold() != username.casefold()
]
if not human_comments:
newest_comment_id = max((comment.id for comment in comments), default=last_seen_comment_id)
self.db.clear_pr_feedback_pending(task.id, last_seen_comment_id=newest_comment_id)
return self.db.transition( return self.db.transition(
task.id, task.id,
TaskState.FAILED, TaskState.HUMAN_REVIEW_READY,
message="task failed", message="PR feedback queue was stale; no new human comments found",
error_message=str(exc), clear_lease=True,
)
implementation_report = self._run_pr_feedback_implementer(
task,
repo,
issue,
task.branch_name,
task.pr_number,
human_comments,
workspace,
)
task = self.db.transition(task.id, TaskState.TESTING, message="checking PR feedback diff")
commit_message = f"agent: address PR #{task.pr_number} feedback for issue #{issue.issue_number}"
commit_id = self.workspace_manager.commit_changes(workspace, commit_message)
self.db.add_event(task.id, TaskState.TESTING, TaskState.TESTING, f"committed PR feedback update {commit_id}")
self.workspace_manager.push_branch(workspace, task.branch_name)
self.gitea.post_issue_comment(
owner=repo.owner,
name=repo.name,
issue_number=task.pr_number,
body=f"## 代理已处理 PR 反馈\n\n{implementation_report.strip()}",
)
task = self.db.transition(task.id, TaskState.PR_OPENED, message=f"updated PR #{task.pr_number}")
validate_transition(task.state, TaskState.REVIEWING)
task = self.db.transition(task.id, TaskState.REVIEWING, message="running reviewer agent after PR feedback")
review_report_raw = self._run_reviewer(task, repo, issue, task.pr_number, workspace)
review = parse_review_report(review_report_raw)
reviewer_comment = self.gitea.post_issue_comment(
owner=repo.owner,
name=repo.name,
issue_number=task.pr_number,
body=review_report_raw.strip() or "Reviewer did not produce a report.",
)
summary_comment = self.gitea.post_issue_comment(
owner=repo.owner,
name=repo.name,
issue_number=task.pr_number,
body=render_human_review_summary(review),
)
newest_comment_id = max(
[comment.id for comment in comments] + [reviewer_comment.id, summary_comment.id],
default=max(reviewer_comment.id, summary_comment.id),
)
self.db.clear_pr_feedback_pending(task.id, last_seen_comment_id=newest_comment_id)
return self.db.transition(
task.id,
TaskState.HUMAN_REVIEW_READY,
message=f"PR feedback handled; human review summary posted with verdict {review.verdict}",
clear_lease=True, clear_lease=True,
) )
@@ -250,6 +376,46 @@ class TaskRunner:
raise RuntimeError(f"implementer failed with exit code {result.exit_code}") raise RuntimeError(f"implementer failed with exit code {result.exit_code}")
return report or "(implementer did not produce AGENT_IMPLEMENTATION_REPORT.md)" return report or "(implementer did not produce AGENT_IMPLEMENTATION_REPORT.md)"
def _run_pr_feedback_implementer(
self,
task: TaskRecord,
repo: RepositoryRecord,
issue: IssueRecord,
branch_name: str,
pr_number: int,
comments: list[GiteaComment],
workspace: Path,
) -> str:
prompt = render_pr_feedback_prompt(repo, issue, pr_number, branch_name, comments)
output_dir = workspace / ".agent-output"
output_dir.mkdir(exist_ok=True)
prompt_path = output_dir / "AGENT_PR_FEEDBACK_PROMPT.md"
write_prompt(prompt_path, prompt)
command = render_command(
self.config.agents.implementer.command,
workspace_path=workspace.resolve(),
prompt_path=prompt_path.resolve(),
issue_number=issue.issue_number,
issue_title=issue.title,
pr_number=pr_number,
branch_name=branch_name,
)
result = self.command_runner.run(command, workspace, stdin=prompt)
report = read_report(output_dir / "AGENT_IMPLEMENTATION_REPORT.md")
self.db.add_agent_run(
task_id=task.id,
role="implementer_pr_feedback",
command=command,
prompt=prompt,
stdout=result.stdout,
stderr=result.stderr,
exit_code=result.exit_code,
report=report,
)
if not result.ok:
raise RuntimeError(f"implementer failed with exit code {result.exit_code}")
return report or "(implementer did not produce AGENT_IMPLEMENTATION_REPORT.md)"
def _run_reviewer( def _run_reviewer(
self, self,
task: TaskRecord, task: TaskRecord,

View File

@@ -35,11 +35,33 @@ class WorkspaceManager:
self.exclude_runtime_artifacts(path) self.exclude_runtime_artifacts(path)
return path return path
def prepare_existing_branch(
self,
repo: RepositoryRecord,
issue: IssueRecord,
branch_name: str,
workspace_path: str | Path | None,
) -> Path:
path = Path(workspace_path) if workspace_path else self.task_workspace(repo, issue)
if not path.exists():
path.parent.mkdir(parents=True, exist_ok=True)
self._git(["clone", repo.clone_url, str(path)], Path.cwd())
self.exclude_runtime_artifacts(path)
self._git(["fetch", "origin"], path)
checkout = self._git(["checkout", branch_name], path, check=False)
if checkout.returncode != 0:
self._git(["checkout", "-B", branch_name, f"origin/{branch_name}"], path)
pull = self._git(["pull", "--ff-only", "origin", branch_name], path, check=False)
if pull.returncode != 0:
raise RuntimeError(f"git pull --ff-only origin {branch_name} failed: {pull.stderr.strip()}")
return path
def exclude_runtime_artifacts(self, workspace: str | Path) -> None: def exclude_runtime_artifacts(self, workspace: str | Path) -> None:
exclude_path = Path(workspace) / ".git" / "info" / "exclude" exclude_path = Path(workspace) / ".git" / "info" / "exclude"
entries = { entries = {
".agent-output/", ".agent-output/",
"AGENT_IMPLEMENTER_PROMPT.md", "AGENT_IMPLEMENTER_PROMPT.md",
"AGENT_PR_FEEDBACK_PROMPT.md",
"AGENT_REVIEWER_PROMPT.md", "AGENT_REVIEWER_PROMPT.md",
} }
existing = exclude_path.read_text(encoding="utf-8") if exclude_path.exists() else "" existing = exclude_path.read_text(encoding="utf-8") if exclude_path.exists() else ""

View File

@@ -8,7 +8,13 @@ import httpx
from agent_gitea.config import AppConfig from agent_gitea.config import AppConfig
from agent_gitea.gitea import GiteaClient from agent_gitea.gitea import GiteaClient
from agent_gitea.models import AgentResult, TaskState from agent_gitea.models import AgentResult, TaskState
from agent_gitea.service import TaskRunner, close_issues_for_merged_pull_requests, scan_issues, sync_repositories from agent_gitea.service import (
TaskRunner,
close_issues_for_merged_pull_requests,
scan_issues,
scan_pull_request_feedback,
sync_repositories,
)
def make_config(tmp_path: Path, **overrides: object) -> AppConfig: def make_config(tmp_path: Path, **overrides: object) -> AppConfig:
@@ -167,10 +173,17 @@ class FakeWorkspaceManager:
self.root = root self.root = root
self.diff = diff self.diff = diff
self.pushed: list[str] = [] self.pushed: list[str] = []
self.resumed: list[tuple[str, Path | None]] = []
def prepare(self, repo, issue, branch_name): def prepare(self, repo, issue, branch_name):
path = self.root / branch_name.replace("/", "_") path = self.root / branch_name.replace("/", "_")
path.mkdir(parents=True) path.mkdir(parents=True, exist_ok=True)
return path
def prepare_existing_branch(self, repo, issue, branch_name, workspace_path):
self.resumed.append((branch_name, workspace_path))
path = Path(workspace_path) if workspace_path else self.root / branch_name.replace("/", "_")
path.mkdir(parents=True, exist_ok=True)
return path return path
def has_diff(self, workspace, base_ref="origin/HEAD"): def has_diff(self, workspace, base_ref="origin/HEAD"):
@@ -233,6 +246,16 @@ def seed_task(db):
return db.create_task(repo.id, 1) return db.create_task(repo.id, 1)
def transition_to_human_review_ready(db, task_id: int, *, pr_number: int = 5, branch_name: str | None = None):
db.transition(task_id, TaskState.CLAIMED)
db.transition(task_id, TaskState.PLANNING)
db.transition(task_id, TaskState.IMPLEMENTING, branch_name=branch_name)
db.transition(task_id, TaskState.TESTING)
db.transition(task_id, TaskState.PR_OPENED, pr_number=pr_number)
db.transition(task_id, TaskState.REVIEWING)
return db.transition(task_id, TaskState.HUMAN_REVIEW_READY, clear_lease=True)
def test_run_task_success_posts_review_comments(db, tmp_path): def test_run_task_success_posts_review_comments(db, tmp_path):
config = make_config(tmp_path) config = make_config(tmp_path)
seed_task(db) seed_task(db)
@@ -272,6 +295,115 @@ def test_run_task_success_posts_review_comments(db, tmp_path):
assert [path for _, path, _ in requests].count("/api/v1/repos/acme/service/issues/5/comments") == 2 assert [path for _, path, _ in requests].count("/api/v1/repos/acme/service/issues/5/comments") == 2
def test_scan_pull_request_feedback_queues_task_for_new_human_comment(db):
task = seed_task(db)
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(
200,
json=[
{"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
{"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
{"id": 4, "body": "internal note", "user": {"login": "agent-bot"}},
],
)
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(handler))
task_after_scan = db.get_task(task.id)
assert queued == 1
assert task_after_scan is not None
assert task_after_scan.state == TaskState.DISCOVERED
assert db.has_pending_pr_feedback(task.id)
assert db.get_pr_feedback_cursor(task.id) == 2
def test_scan_pull_request_feedback_advances_cursor_without_human_comments(db):
task = seed_task(db)
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(200, json=[{"id": 3, "body": "summary", "user": {"login": "agent-bot"}}])
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(handler))
assert queued == 0
assert not db.has_pending_pr_feedback(task.id)
assert db.get_pr_feedback_cursor(task.id) == 3
assert db.get_task(task.id).state == TaskState.HUMAN_REVIEW_READY # type: ignore[union-attr]
def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
config = make_config(tmp_path)
task = seed_task(db)
workspace_path = tmp_path / "work" / "existing"
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
db.mark_pr_feedback_pending(task.id)
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit()
requests: list[tuple[str, str, dict]] = []
next_comment_id = 3
def handler(request: httpx.Request) -> httpx.Response:
nonlocal next_comment_id
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
return httpx.Response(
200,
json=[
{"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
{"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
next_comment_id += 1
return httpx.Response(201, json={"id": next_comment_id, "body": payload["body"], "user": {"login": "agent-bot"}})
return httpx.Response(404)
workspace_manager = FakeWorkspaceManager(tmp_path / "work")
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=workspace_manager,
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert finished.pr_number == 5
assert workspace_manager.resumed == [("agent/issue-1-ready-issue", workspace_path)]
assert workspace_manager.pushed == ["agent/issue-1-ready-issue"]
assert workspace_manager.commit_message == "agent: address PR #5 feedback for issue #1"
assert not any(path == "/api/v1/repos/acme/service/pulls" for _, path, _ in requests)
assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback", "reviewer"]
assert db.get_pr_feedback_cursor(task.id) == 6
assert not db.has_pending_pr_feedback(task.id)
def test_close_issues_for_merged_pull_requests_closes_linked_issue(db): def test_close_issues_for_merged_pull_requests_closes_linked_issue(db):
repo = db.upsert_repository( repo = db.upsert_repository(
owner="acme", owner="acme",