Compare commits

...

9 Commits

15 changed files with 1590 additions and 132 deletions

1
.gitignore vendored
View File

@@ -1,4 +1,5 @@
.agent-gitea/
.agent-output/
.env
.pytest_cache/
.ruff_cache/

View File

@@ -2,7 +2,7 @@
[中文说明](README_CN.md)
Local, auditable CLI service that scans configured Gitea repositories for eligible issues, runs an implementer agent in an isolated workspace, opens a pull request, runs a reviewer agent, and posts a standardized human review summary.
Local, auditable CLI service that scans configured Gitea repositories for eligible issues, runs an implementer agent in an isolated workspace, opens a pull request, runs a reviewer agent, and posts a standardized human review summary. After the PR is opened, it also watches PR comments from humans and can continue the same task on the existing branch to push follow-up commits.
The MVP never merges pull requests.
@@ -18,12 +18,13 @@ agent-gitea --config config.yaml run-once
```
`sync-repos` discovers repositories owned by the authenticated Gitea user from `/user/repos`; repositories are not listed in the config file.
`worker` continuously syncs repositories, scans issues, and processes eligible tasks.
`worker` continuously syncs repositories, scans issues, scans open agent PRs for new human comments, and processes eligible tasks.
## Commands
- `agent-gitea sync-repos`
- `agent-gitea scan-issues`
- `agent-gitea scan-pr-feedback`
- `agent-gitea run-once`
- `agent-gitea worker`
- `agent-gitea show-task <task_id>`

View File

@@ -2,7 +2,7 @@
[English](README.md)
一个本地运行、可审计的 CLI 服务,用于扫描已配置 Gitea 仓库中的合格 issue在隔离工作区中运行实现 agent创建 pull request运行评审 agent并发布标准化的人工评审摘要。
一个本地运行、可审计的 CLI 服务,用于扫描已配置 Gitea 仓库中的合格 issue在隔离工作区中运行实现 agent创建 pull request运行评审 agent并发布标准化的人工评审摘要。PR 创建后,它还会监控来自 human 的 PR 评论,并在同一个任务、已有分支和 workspace 上继续处理,推送后续 commit。
MVP 版本不会合并 pull request。
@@ -18,12 +18,13 @@ agent-gitea --config config.yaml run-once
```
`sync-repos` 会通过 `/user/repos` 发现已认证 Gitea 用户拥有的仓库;仓库不会列在配置文件中。
`worker` 会持续同步仓库、扫描 issue并处理符合条件的任务。
`worker` 会持续同步仓库、扫描 issue、扫描 agent 已创建 PR 中的新 human 评论,并处理符合条件的任务。
## 命令
- `agent-gitea sync-repos`
- `agent-gitea scan-issues`
- `agent-gitea scan-pr-feedback`
- `agent-gitea run-once`
- `agent-gitea worker`
- `agent-gitea show-task <task_id>`

1
run.sh Executable file
View File

@@ -0,0 +1 @@
uv run agent-gitea --config config.yaml worker

View File

@@ -7,8 +7,29 @@ from .models import AgentResult
class CommandRunner:
def run(self, command: list[str], cwd: str | Path, *, stdin: str | None = None) -> AgentResult:
result = subprocess.run(command, cwd=cwd, input=stdin, text=True, capture_output=True, check=False)
def run(
self,
command: list[str],
cwd: str | Path,
*,
stdin: str | None = None,
timeout_seconds: int | float | None = None,
) -> AgentResult:
try:
result = subprocess.run(
command,
cwd=cwd,
input=stdin,
text=True,
capture_output=True,
check=False,
timeout=timeout_seconds,
)
except subprocess.TimeoutExpired as exc:
stdout = exc.stdout if isinstance(exc.stdout, str) else (exc.stdout or b"").decode(errors="replace")
stderr = exc.stderr if isinstance(exc.stderr, str) else (exc.stderr or b"").decode(errors="replace")
message = f"command timed out after {timeout_seconds} seconds"
return AgentResult(exit_code=124, stdout=stdout or "", stderr=f"{stderr}\n{message}".strip())
return AgentResult(exit_code=result.returncode, stdout=result.stdout, stderr=result.stderr)

View File

@@ -9,7 +9,12 @@ import typer
from .config import AppConfig, load_config
from .db import Database
from .gitea import GiteaClient
from .service import TaskRunner, scan_issues as scan_issues_service, sync_repositories
from .service import (
TaskRunner,
scan_issues as scan_issues_service,
scan_pull_request_feedback,
sync_repositories,
)
app = typer.Typer(no_args_is_help=True, help="Agentic Gitea issue-to-PR manager.")
@@ -74,6 +79,17 @@ def run_once(ctx: typer.Context) -> None:
typer.echo(f"task {task.id} -> {task.state.value}")
@app.command("scan-pr-feedback")
def scan_pr_feedback(ctx: typer.Context) -> None:
cli: CliContext = ctx.obj
client = cli.gitea()
try:
queued = scan_pull_request_feedback(cli.db, client)
finally:
client.close()
typer.echo(f"queued {queued} tasks")
@app.command("worker")
def worker(ctx: typer.Context) -> None:
cli: CliContext = ctx.obj

View File

@@ -24,6 +24,14 @@ class SchedulerConfig(BaseModel):
interval_seconds: int = Field(default=60, ge=1)
concurrency: int = Field(default=1, ge=1)
lease_seconds: int = Field(default=1800, ge=30)
lease_renewal_interval_seconds: int | None = Field(default=None, ge=1)
agent_timeout_seconds: int = Field(default=7200, ge=1)
@property
def effective_lease_renewal_interval_seconds(self) -> int:
if self.lease_renewal_interval_seconds is not None:
return self.lease_renewal_interval_seconds
return max(1, self.lease_seconds // 3)
class WorkspaceConfig(BaseModel):

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import json
import sqlite3
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from typing import Iterable
@@ -83,9 +84,25 @@ CREATE TABLE IF NOT EXISTS agent_runs (
report TEXT,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS pull_request_feedback (
task_id INTEGER PRIMARY KEY REFERENCES tasks(id),
last_seen_comment_id INTEGER NOT NULL DEFAULT 0,
last_seen_review_id INTEGER NOT NULL DEFAULT 0,
last_seen_review_comment_id INTEGER NOT NULL DEFAULT 0,
pending INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL
);
"""
@dataclass(frozen=True)
class PullRequestFeedbackCursor:
last_seen_comment_id: int = 0
last_seen_review_id: int = 0
last_seen_review_comment_id: int = 0
class Database:
def __init__(self, path: str | Path):
self.path = Path(path)
@@ -99,8 +116,21 @@ class Database:
def migrate(self) -> None:
self.conn.executescript(SCHEMA)
self._migrate_pull_request_feedback_columns()
self.conn.commit()
def _migrate_pull_request_feedback_columns(self) -> None:
rows = self.conn.execute("PRAGMA table_info(pull_request_feedback)").fetchall()
columns = {row["name"] for row in rows}
if "last_seen_review_id" not in columns:
self.conn.execute(
"ALTER TABLE pull_request_feedback ADD COLUMN last_seen_review_id INTEGER NOT NULL DEFAULT 0"
)
if "last_seen_review_comment_id" not in columns:
self.conn.execute(
"ALTER TABLE pull_request_feedback ADD COLUMN last_seen_review_comment_id INTEGER NOT NULL DEFAULT 0"
)
def upsert_repository(
self,
*,
@@ -195,6 +225,17 @@ class Database:
).fetchall()
return [self._issue(row) for row in rows]
def update_issue_state(self, repo_id: int, issue_number: int, state: str) -> None:
self.conn.execute(
"""
UPDATE issues
SET state = ?, updated_at = ?
WHERE repo_id = ? AND issue_number = ?
""",
(state, dt_to_db(utcnow()), repo_id, issue_number),
)
self.conn.commit()
def active_task_for_issue(self, repo_id: int, issue_number: int) -> TaskRecord | None:
placeholders = ",".join("?" for _ in ACTIVE_STATES)
rows = self.conn.execute(
@@ -245,52 +286,233 @@ class Database:
).fetchone()
return self._task(row) if row else None
def list_tasks_pending_issue_close(self) -> list[TaskRecord]:
rows = self.conn.execute(
"""
SELECT t.*
FROM tasks t
JOIN issues i ON i.repo_id = t.repo_id AND i.issue_number = t.issue_number
WHERE t.state != ?
AND t.pr_number IS NOT NULL
AND i.state = 'open'
AND (
t.state IN (?, ?)
OR t.lease_owner IS NULL
OR t.lease_expires_at IS NULL
OR t.lease_expires_at < ?
)
ORDER BY t.id
""",
(
TaskState.CANCELLED.value,
TaskState.HUMAN_REVIEW_READY.value,
TaskState.FAILED.value,
dt_to_db(utcnow()),
),
).fetchall()
return [self._task(row) for row in rows]
def list_tasks_awaiting_pr_feedback(self) -> list[TaskRecord]:
rows = self.conn.execute(
"""
SELECT *
FROM tasks
WHERE state = ?
AND pr_number IS NOT NULL
ORDER BY id
""",
(TaskState.HUMAN_REVIEW_READY.value,),
).fetchall()
return [self._task(row) for row in rows]
def get_pr_feedback_cursor(self, task_id: int) -> int:
row = self.conn.execute(
"SELECT last_seen_comment_id FROM pull_request_feedback WHERE task_id = ?",
(task_id,),
).fetchone()
return int(row["last_seen_comment_id"]) if row else 0
def get_pr_feedback_cursors(self, task_id: int) -> PullRequestFeedbackCursor:
row = self.conn.execute(
"""
SELECT last_seen_comment_id, last_seen_review_id, last_seen_review_comment_id
FROM pull_request_feedback
WHERE task_id = ?
""",
(task_id,),
).fetchone()
if row is None:
return PullRequestFeedbackCursor()
return PullRequestFeedbackCursor(
last_seen_comment_id=int(row["last_seen_comment_id"]),
last_seen_review_id=int(row["last_seen_review_id"]),
last_seen_review_comment_id=int(row["last_seen_review_comment_id"]),
)
def upsert_pr_feedback_state(
self,
task_id: int,
*,
last_seen_comment_id: int | None = None,
last_seen_review_id: int | None = None,
last_seen_review_comment_id: int | None = None,
pending: bool | None = None,
) -> None:
row = self.conn.execute(
"""
SELECT last_seen_comment_id, last_seen_review_id, last_seen_review_comment_id, pending
FROM pull_request_feedback
WHERE task_id = ?
""",
(task_id,),
).fetchone()
now = dt_to_db(utcnow())
if row is None:
self.conn.execute(
"""
INSERT INTO pull_request_feedback (
task_id,
last_seen_comment_id,
last_seen_review_id,
last_seen_review_comment_id,
pending,
updated_at
)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
task_id,
last_seen_comment_id or 0,
last_seen_review_id or 0,
last_seen_review_comment_id or 0,
int(bool(pending)),
now,
),
)
else:
next_cursor = int(row["last_seen_comment_id"]) if last_seen_comment_id is None else last_seen_comment_id
next_review_cursor = int(row["last_seen_review_id"]) if last_seen_review_id is None else last_seen_review_id
next_review_comment_cursor = (
int(row["last_seen_review_comment_id"])
if last_seen_review_comment_id is None
else last_seen_review_comment_id
)
next_pending = bool(row["pending"]) if pending is None else pending
self.conn.execute(
"""
UPDATE pull_request_feedback
SET last_seen_comment_id = ?,
last_seen_review_id = ?,
last_seen_review_comment_id = ?,
pending = ?,
updated_at = ?
WHERE task_id = ?
""",
(next_cursor, next_review_cursor, next_review_comment_cursor, int(next_pending), now, task_id),
)
self.conn.commit()
def mark_pr_feedback_pending(self, task_id: int) -> None:
self.upsert_pr_feedback_state(task_id, pending=True)
def clear_pr_feedback_pending(
self,
task_id: int,
*,
last_seen_comment_id: int | None = None,
last_seen_review_id: int | None = None,
last_seen_review_comment_id: int | None = None,
) -> None:
self.upsert_pr_feedback_state(
task_id,
last_seen_comment_id=last_seen_comment_id,
last_seen_review_id=last_seen_review_id,
last_seen_review_comment_id=last_seen_review_comment_id,
pending=False,
)
def has_pending_pr_feedback(self, task_id: int) -> bool:
row = self.conn.execute(
"SELECT pending FROM pull_request_feedback WHERE task_id = ?",
(task_id,),
).fetchone()
return bool(row and row["pending"])
def claim_next_task(self, worker_id: str, lease_seconds: int) -> TaskRecord | None:
now = utcnow()
expires = now + timedelta(seconds=lease_seconds)
row = self.conn.execute(
"""
SELECT * FROM tasks
WHERE state = ?
OR (state IN (?, ?, ?, ?, ?, ?, ?) AND lease_expires_at IS NOT NULL AND lease_expires_at < ?)
ORDER BY created_at
LIMIT 1
""",
(
TaskState.DISCOVERED.value,
TaskState.CLAIMED.value,
TaskState.PLANNING.value,
TaskState.IMPLEMENTING.value,
TaskState.TESTING.value,
TaskState.PR_OPENED.value,
TaskState.REVIEWING.value,
TaskState.DISCOVERED.value,
dt_to_db(now),
),
).fetchone()
if row is None:
return None
task = self._task(row)
self.conn.execute(
"""
UPDATE tasks
SET state = ?, lease_owner = ?, lease_expires_at = ?, updated_at = ?
WHERE id = ?
""",
(
TaskState.CLAIMED.value,
worker_id,
dt_to_db(expires),
dt_to_db(now),
task.id,
),
)
self.conn.commit()
try:
self.conn.execute("BEGIN IMMEDIATE")
row = self.conn.execute(
"""
SELECT * FROM tasks
WHERE state = ?
OR (state IN (?, ?, ?, ?, ?, ?, ?) AND lease_expires_at IS NOT NULL AND lease_expires_at < ?)
ORDER BY created_at
LIMIT 1
""",
(
TaskState.DISCOVERED.value,
TaskState.CLAIMED.value,
TaskState.PLANNING.value,
TaskState.IMPLEMENTING.value,
TaskState.TESTING.value,
TaskState.PR_OPENED.value,
TaskState.REVIEWING.value,
TaskState.DISCOVERED.value,
dt_to_db(now),
),
).fetchone()
if row is None:
self.conn.commit()
return None
task = self._task(row)
self.conn.execute(
"""
UPDATE tasks
SET state = ?, lease_owner = ?, lease_expires_at = ?, updated_at = ?
WHERE id = ?
""",
(
TaskState.CLAIMED.value,
worker_id,
dt_to_db(expires),
dt_to_db(now),
task.id,
),
)
self.conn.commit()
except Exception:
self.conn.rollback()
raise
self.add_event(task.id, task.state, TaskState.CLAIMED, f"claimed by {worker_id}")
claimed = self.get_task(task.id)
assert claimed is not None
return claimed
def renew_task_lease(self, task_id: int, worker_id: str, lease_seconds: int) -> bool:
now = utcnow()
expires = now + timedelta(seconds=lease_seconds)
placeholders = ",".join("?" for _ in ACTIVE_STATES)
cursor = self.conn.execute(
f"""
UPDATE tasks
SET lease_expires_at = ?, updated_at = ?
WHERE id = ?
AND lease_owner = ?
AND state IN ({placeholders})
""",
(
dt_to_db(expires),
dt_to_db(now),
task_id,
worker_id,
*[state.value for state in ACTIVE_STATES],
),
)
self.conn.commit()
return cursor.rowcount == 1
def transition(
self,
task_id: int,

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import Any
import httpx
@@ -23,6 +24,29 @@ class GiteaIssue:
class GiteaPullRequest:
number: int
html_url: str
state: str
merged: bool
@dataclass(frozen=True)
class GiteaComment:
id: int
body: str
author: str
html_url: str
created_at: datetime | None
updated_at: datetime | None
@dataclass(frozen=True)
class GiteaPullReview:
id: int
body: str
author: str
state: str
html_url: str
submitted_at: datetime | None
updated_at: datetime | None
@dataclass(frozen=True)
@@ -131,14 +155,95 @@ class GiteaClient:
)
response.raise_for_status()
payload = response.json()
return GiteaPullRequest(number=int(payload["number"]), html_url=payload.get("html_url") or "")
return pull_request_from_payload(payload)
def post_issue_comment(self, *, owner: str, name: str, issue_number: int, body: str) -> None:
def get_pull_request(self, *, owner: str, name: str, pr_number: int) -> GiteaPullRequest:
response = self.client.get(f"/repos/{owner}/{name}/pulls/{pr_number}")
response.raise_for_status()
return pull_request_from_payload(response.json())
def list_issue_comments(self, *, owner: str, name: str, issue_number: int) -> list[GiteaComment]:
comments: list[GiteaComment] = []
page = 1
limit = 50
while True:
response = self.client.get(
f"/repos/{owner}/{name}/issues/{issue_number}/comments",
params={"page": page, "limit": limit},
)
response.raise_for_status()
payload = response.json()
if not payload:
break
comments.extend(comment_from_payload(item) for item in payload)
if len(payload) < limit:
break
page += 1
return comments
def post_issue_comment(self, *, owner: str, name: str, issue_number: int, body: str) -> GiteaComment:
response = self.client.post(
f"/repos/{owner}/{name}/issues/{issue_number}/comments",
json={"body": body},
)
response.raise_for_status()
return comment_from_payload(response.json())
def list_pull_request_reviews(self, *, owner: str, name: str, pr_number: int) -> list[GiteaPullReview]:
reviews: list[GiteaPullReview] = []
page = 1
limit = 50
while True:
response = self.client.get(
f"/repos/{owner}/{name}/pulls/{pr_number}/reviews",
params={"page": page, "limit": limit},
)
if response.status_code == 404:
return reviews
response.raise_for_status()
payload = response.json()
if not payload:
break
reviews.extend(pull_review_from_payload(item) for item in payload)
if len(payload) < limit:
break
page += 1
return reviews
def list_pull_request_review_comments(
self,
*,
owner: str,
name: str,
pr_number: int,
review_id: int,
) -> list[GiteaComment]:
comments: list[GiteaComment] = []
page = 1
limit = 50
while True:
response = self.client.get(
f"/repos/{owner}/{name}/pulls/{pr_number}/reviews/{review_id}/comments",
params={"page": page, "limit": limit},
)
if response.status_code == 404:
return comments
response.raise_for_status()
payload = response.json()
if not payload:
break
comments.extend(review_comment_from_payload(item) for item in payload)
if len(payload) < limit:
break
page += 1
return comments
def close_issue(self, *, owner: str, name: str, issue_number: int) -> None:
response = self.client.patch(
f"/repos/{owner}/{name}/issues/{issue_number}",
json={"state": "closed"},
)
response.raise_for_status()
def clone_url_from_repo_payload(payload: dict[str, Any], fallback_base_url: str, owner: str, name: str) -> str:
@@ -168,3 +273,71 @@ def repository_from_payload(payload: dict[str, Any], fallback_base_url: str) ->
clone_url=clone_url_from_repo_payload(payload, fallback_base_url, str(owner), str(name)),
default_branch=payload.get("default_branch") or "main",
)
def pull_request_from_payload(payload: dict[str, Any]) -> GiteaPullRequest:
merged = bool(payload.get("merged") or payload.get("has_merged") or payload.get("merged_at"))
return GiteaPullRequest(
number=int(payload["number"]),
html_url=payload.get("html_url") or payload.get("url") or "",
state=payload.get("state") or "",
merged=merged,
)
def pull_review_from_payload(payload: dict[str, Any]) -> GiteaPullReview:
user_payload = payload.get("user") or {}
author = user_payload.get("login") or user_payload.get("username") or ""
return GiteaPullReview(
id=int(payload["id"]),
body=payload.get("body") or "",
author=str(author),
state=payload.get("state") or "",
html_url=payload.get("html_url") or payload.get("url") or "",
submitted_at=parse_gitea_dt(payload.get("submitted_at")),
updated_at=parse_gitea_dt(payload.get("updated_at")),
)
def comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
user_payload = payload.get("user") or payload.get("poster") or {}
author = user_payload.get("login") or user_payload.get("username") or ""
return GiteaComment(
id=int(payload["id"]),
body=payload.get("body") or "",
author=str(author),
html_url=payload.get("html_url") or payload.get("url") or "",
created_at=parse_gitea_dt(payload.get("created_at")),
updated_at=parse_gitea_dt(payload.get("updated_at")),
)
def review_comment_from_payload(payload: dict[str, Any]) -> GiteaComment:
comment = comment_from_payload(payload)
path = payload.get("path")
position = payload.get("position") or payload.get("original_position")
location_parts = []
if path:
location_parts.append(str(path))
if position:
location_parts.append(f"line {position}")
if not location_parts:
return comment
location = ":".join(location_parts)
body = f"Inline comment on {location}\n\n{comment.body}"
return GiteaComment(
id=comment.id,
body=body,
author=comment.author,
html_url=comment.html_url,
created_at=comment.created_at,
updated_at=comment.updated_at,
)
def parse_gitea_dt(value: str | None) -> datetime | None:
if not value:
return None
if value.endswith("Z"):
value = value[:-1] + "+00:00"
return datetime.fromisoformat(value)

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import re
from dataclasses import dataclass
from .gitea import GiteaComment
from .models import IssueRecord, RepositoryRecord
@@ -46,6 +47,48 @@ Keep the section headings exactly as written below, but write the section conten
"""
def render_pr_feedback_prompt(
repo: RepositoryRecord,
issue: IssueRecord,
pr_number: int,
branch_name: str,
comments: list[GiteaComment],
) -> str:
return f"""# Agent PR Feedback Task
Repository: {repo.full_name}
Base branch: {repo.default_branch}
Working branch: {branch_name}
Issue: #{issue.issue_number} {issue.title}
Issue URL: {issue.html_url}
Pull request: #{pr_number}
## Issue Body
{issue.body or "(no issue body)"}
## Human PR Comments To Address
{render_comment_thread(comments)}
## Instructions
Continue work in the current workspace and on the existing PR branch.
Address the human PR comments above with the smallest scoped changes that satisfy the review.
Create code changes only when needed, run the relevant tests before finishing, and leave the branch ready for a new commit.
Write `.agent-output/AGENT_IMPLEMENTATION_REPORT.md` using this exact section contract.
Keep the section headings exactly as written below, but write the section content in Chinese:
- Summary
- Files changed
- Test commands run
- Test results
- Known risks
- Follow-up suggestions
"""
def render_reviewer_prompt(repo: RepositoryRecord, issue: IssueRecord, pr_number: int) -> str:
return f"""# Agent Review Task
@@ -74,6 +117,8 @@ but write the section content and Suggested PR Comment in Chinese:
def render_pr_body(issue: IssueRecord, implementation_report: str) -> str:
return f"""关联 Issue#{issue.issue_number}
合并后自动关闭Closes #{issue.issue_number}
## 代理实现报告
{implementation_report.strip()}
@@ -85,6 +130,20 @@ def render_pr_body(issue: IssueRecord, implementation_report: str) -> str:
"""
def render_comment_thread(comments: list[GiteaComment]) -> str:
if not comments:
return "(no new human PR comments found)"
blocks: list[str] = []
for comment in comments:
created = comment.created_at.isoformat() if comment.created_at else "unknown time"
header = f"- Comment #{comment.id} by {comment.author or 'unknown'} at {created}"
if comment.html_url:
header += f"\n URL: {comment.html_url}"
body = "\n".join(f" {line}" for line in comment.body.strip().splitlines())
blocks.append(f"{header}\n\n{body or ' (empty comment)'}")
return "\n\n".join(blocks)
def parse_review_report(raw: str) -> ReviewReport:
verdict_match = re.search(r"(?im)^\s*(?:[-*]\s*)?(?:##\s*)?Verdict\s*:?\s*`?([A-Z_]+)`?", raw)
verdict = verdict_match.group(1) if verdict_match else "NEEDS_HUMAN_DECISION"
@@ -121,5 +180,5 @@ def render_human_review_summary(review: ReviewReport) -> str:
## 需要人工处理
请人工审核该 PR。agent-manager 不会自动合并关闭 PR 或提交变更请求
请人工审核该 PR。agent-manager 不会自动合并关闭 PR;如果继续在该 PR 中留言agent-manager 会检测评论并尝试提交后续修复
"""

View File

@@ -3,17 +3,20 @@ from __future__ import annotations
import socket
import time
import logging
import threading
from dataclasses import dataclass
from pathlib import Path
from .agents import CommandRunner, read_report, render_command, write_prompt
from .config import AppConfig
from .db import Database
from .gitea import GiteaClient
from .models import IssueRecord, RepositoryRecord, TaskRecord, TaskState
from .db import Database, PullRequestFeedbackCursor
from .gitea import GiteaClient, GiteaComment, GiteaPullReview
from .models import ACTIVE_STATES, IssueRecord, RepositoryRecord, TaskRecord, TaskState
from .rendering import (
parse_review_report,
render_human_review_summary,
render_implementer_prompt,
render_pr_feedback_prompt,
render_pr_body,
render_reviewer_prompt,
)
@@ -25,6 +28,53 @@ from .workspace import WorkspaceManager, safe_branch_name
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class PullRequestFeedbackSnapshot:
human_comments: list[GiteaComment]
handled_cursor: PullRequestFeedbackCursor
newest_cursor: PullRequestFeedbackCursor
class TaskLeaseRenewer:
def __init__(
self,
*,
db_path: Path,
task_id: int,
worker_id: str,
lease_seconds: int,
interval_seconds: int,
):
self.db_path = db_path
self.task_id = task_id
self.worker_id = worker_id
self.lease_seconds = lease_seconds
self.interval_seconds = interval_seconds
self._stop = threading.Event()
self._thread = threading.Thread(target=self._run, name=f"lease-renewer-{task_id}", daemon=True)
def __enter__(self) -> TaskLeaseRenewer:
self._thread.start()
return self
def __exit__(self, exc_type, exc, tb) -> None:
self._stop.set()
self._thread.join(timeout=max(1, self.interval_seconds))
def _run(self) -> None:
database = Database(self.db_path)
try:
while not self._stop.wait(self.interval_seconds):
renewed = database.renew_task_lease(self.task_id, self.worker_id, self.lease_seconds)
if not renewed:
logger.warning("stopping lease renewal for task %d; lease no longer belongs to %s", self.task_id, self.worker_id)
return
except Exception:
logger.exception("lease renewal failed for task %d", self.task_id)
finally:
database.close()
def sync_repositories(db: Database, config: AppConfig, client: GiteaClient) -> list[RepositoryRecord]:
synced: list[RepositoryRecord] = []
discovered = client.list_owned_repositories()
@@ -67,6 +117,65 @@ def scan_issues(db: Database, config: AppConfig, client: GiteaClient) -> list[in
return scan_eligible_issues(db, config.labels)
def close_issues_for_merged_pull_requests(db: Database, client: GiteaClient) -> int:
closed = 0
for task in db.list_tasks_pending_issue_close():
repo, issue = load_task_context(db, task)
assert task.pr_number is not None
pull_request = client.get_pull_request(owner=repo.owner, name=repo.name, pr_number=task.pr_number)
if not pull_request.merged:
continue
client.post_issue_comment(
owner=repo.owner,
name=repo.name,
issue_number=issue.issue_number,
body=f"关联 PR #{task.pr_number} 已合并agent-manager 自动关闭该 issue。",
)
client.close_issue(owner=repo.owner, name=repo.name, issue_number=issue.issue_number)
db.update_issue_state(task.repo_id, task.issue_number, "closed")
message = f"closed issue #{issue.issue_number} after merged PR #{task.pr_number}"
if task.state in ACTIVE_STATES:
db.clear_pr_feedback_pending(task.id)
db.transition(task.id, TaskState.CANCELLED, message=message, clear_lease=True)
else:
db.add_event(task.id, task.state, task.state, message)
closed += 1
return closed
def scan_pull_request_feedback(db: Database, client: GiteaClient) -> int:
tasks = db.list_tasks_awaiting_pr_feedback()
if not tasks:
return 0
username = client.get_authenticated_username()
queued = 0
for task in tasks:
repo, _issue = load_task_context(db, task)
assert task.pr_number is not None
pull_request = client.get_pull_request(owner=repo.owner, name=repo.name, pr_number=task.pr_number)
if pull_request.merged or pull_request.state != "open":
continue
cursors = db.get_pr_feedback_cursors(task.id)
feedback = load_new_human_pr_feedback(client, repo, task.pr_number, username, cursors)
if not feedback.human_comments:
db.upsert_pr_feedback_state(
task.id,
last_seen_comment_id=feedback.newest_cursor.last_seen_comment_id,
last_seen_review_id=feedback.newest_cursor.last_seen_review_id,
last_seen_review_comment_id=feedback.newest_cursor.last_seen_review_comment_id,
)
continue
db.mark_pr_feedback_pending(task.id)
db.transition(
task.id,
TaskState.DISCOVERED,
message=f"queued PR feedback from {len(feedback.human_comments)} human comment(s)",
clear_lease=True,
)
queued += 1
return queued
class TaskRunner:
def __init__(
self,
@@ -96,6 +205,10 @@ class TaskRunner:
try:
repos = sync_repositories(self.db, self.config, self.gitea)
logger.info("synced %d repositories", len(repos))
closed = close_issues_for_merged_pull_requests(self.db, self.gitea)
logger.info("closed %d issues for merged pull requests", closed)
feedback_tasks = scan_pull_request_feedback(self.db, self.gitea)
logger.info("queued %d tasks from pull request feedback", feedback_tasks)
task_ids = scan_issues(self.db, self.config, self.gitea)
logger.info("created %d tasks from issue scan", len(task_ids))
task = self.run_once()
@@ -110,72 +223,9 @@ class TaskRunner:
def run_claimed(self, task: TaskRecord) -> TaskRecord:
try:
repo, issue = self._load_context(task)
validate_transition(task.state, TaskState.PLANNING)
task = self.db.transition(task.id, TaskState.PLANNING, message="rendering implementation prompt")
branch_name = safe_branch_name(issue)
workspace = self.workspace_manager.prepare(repo, issue, branch_name)
task = self.db.transition(
task.id,
TaskState.IMPLEMENTING,
message="running implementer agent",
branch_name=branch_name,
workspace_path=workspace,
)
implementation_report = self._run_implementer(task, repo, issue, branch_name, workspace)
task = self.db.transition(task.id, TaskState.TESTING, message="checking implementation diff")
if not self.workspace_manager.has_diff(workspace, f"origin/{repo.default_branch}"):
return self.db.transition(
task.id,
TaskState.BLOCKED,
message="implementer produced no diff",
error_message="implementer produced no diff",
clear_lease=True,
)
commit_message = f"agent: implement issue #{issue.issue_number} - {issue.title}"
commit_id = self.workspace_manager.commit_changes(workspace, commit_message)
self.db.add_event(task.id, TaskState.TESTING, TaskState.TESTING, f"committed implementation {commit_id}")
self.workspace_manager.push_branch(workspace, branch_name)
pr_body = render_pr_body(issue, implementation_report)
pr = self.gitea.create_pull_request(
owner=repo.owner,
name=repo.name,
title=f"代理实现:{issue.title}",
body=pr_body,
head=branch_name,
base=repo.default_branch,
)
task = self.db.transition(
task.id,
TaskState.PR_OPENED,
message=f"opened PR #{pr.number}",
pr_number=pr.number,
)
validate_transition(task.state, TaskState.REVIEWING)
task = self.db.transition(task.id, TaskState.REVIEWING, message="running reviewer agent")
review_report_raw = self._run_reviewer(task, repo, issue, pr.number, workspace)
review = parse_review_report(review_report_raw)
self.gitea.post_issue_comment(
owner=repo.owner,
name=repo.name,
issue_number=pr.number,
body=review_report_raw.strip() or "Reviewer did not produce a report.",
)
self.gitea.post_issue_comment(
owner=repo.owner,
name=repo.name,
issue_number=pr.number,
body=render_human_review_summary(review),
)
final_task = self.db.transition(
task.id,
TaskState.HUMAN_REVIEW_READY,
message=f"human review summary posted with verdict {review.verdict}",
clear_lease=True,
)
if self.config.workspace.cleanup_on_success and final_task.workspace_path:
self.workspace_manager.cleanup(final_task.workspace_path)
return final_task
if self.db.has_pending_pr_feedback(task.id):
return self._run_pr_feedback(task)
return self._run_initial_implementation(task)
except Exception as exc:
return self.db.transition(
task.id,
@@ -185,6 +235,179 @@ class TaskRunner:
clear_lease=True,
)
def _run_initial_implementation(self, task: TaskRecord) -> TaskRecord:
repo, issue = self._load_context(task)
validate_transition(task.state, TaskState.PLANNING)
task = self.db.transition(task.id, TaskState.PLANNING, message="rendering implementation prompt")
branch_name = safe_branch_name(issue)
workspace = self.workspace_manager.prepare(repo, issue, branch_name)
task = self.db.transition(
task.id,
TaskState.IMPLEMENTING,
message="running implementer agent",
branch_name=branch_name,
workspace_path=workspace,
)
implementation_report = self._run_implementer(task, repo, issue, branch_name, workspace)
task = self.db.transition(task.id, TaskState.TESTING, message="checking implementation diff")
if not self.workspace_manager.has_diff(workspace, f"origin/{repo.default_branch}"):
return self.db.transition(
task.id,
TaskState.BLOCKED,
message="implementer produced no diff",
error_message="implementer produced no diff",
clear_lease=True,
)
commit_message = f"agent: implement issue #{issue.issue_number} - {issue.title}"
commit_id = self.workspace_manager.commit_changes(workspace, commit_message)
self.db.add_event(task.id, TaskState.TESTING, TaskState.TESTING, f"committed implementation {commit_id}")
self.workspace_manager.push_branch(workspace, branch_name)
pr_body = render_pr_body(issue, implementation_report)
pr = self.gitea.create_pull_request(
owner=repo.owner,
name=repo.name,
title=f"代理实现:{issue.title}",
body=pr_body,
head=branch_name,
base=repo.default_branch,
)
task = self.db.transition(
task.id,
TaskState.PR_OPENED,
message=f"opened PR #{pr.number}",
pr_number=pr.number,
)
validate_transition(task.state, TaskState.REVIEWING)
task = self.db.transition(task.id, TaskState.REVIEWING, message="running reviewer agent")
review_report_raw = self._run_reviewer(task, repo, issue, pr.number, workspace)
review = parse_review_report(review_report_raw)
reviewer_comment = self.gitea.post_issue_comment(
owner=repo.owner,
name=repo.name,
issue_number=pr.number,
body=review_report_raw.strip() or "Reviewer did not produce a report.",
)
summary_comment = self.gitea.post_issue_comment(
owner=repo.owner,
name=repo.name,
issue_number=pr.number,
body=render_human_review_summary(review),
)
self.db.clear_pr_feedback_pending(
task.id,
last_seen_comment_id=max(reviewer_comment.id, summary_comment.id),
)
final_task = self.db.transition(
task.id,
TaskState.HUMAN_REVIEW_READY,
message=f"human review summary posted with verdict {review.verdict}",
clear_lease=True,
)
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
def _run_pr_feedback(self, task: TaskRecord) -> TaskRecord:
repo, issue = self._load_context(task)
if task.pr_number is None:
raise ValueError(f"task {task.id} has pending PR feedback without a PR number")
if not task.branch_name:
raise ValueError(f"task {task.id} has pending PR feedback without a branch")
validate_transition(task.state, TaskState.PLANNING)
task = self.db.transition(task.id, TaskState.PLANNING, message="rendering PR feedback prompt")
workspace = self.workspace_manager.prepare_existing_branch(repo, issue, task.branch_name, task.workspace_path)
task = self.db.transition(
task.id,
TaskState.IMPLEMENTING,
message=f"running implementer agent for PR #{task.pr_number} feedback",
workspace_path=workspace,
)
cursors = self.db.get_pr_feedback_cursors(task.id)
username = self.gitea.get_authenticated_username()
feedback = load_new_human_pr_feedback(self.gitea, repo, task.pr_number, username, cursors)
if not feedback.human_comments:
self.db.clear_pr_feedback_pending(
task.id,
last_seen_comment_id=feedback.newest_cursor.last_seen_comment_id,
last_seen_review_id=feedback.newest_cursor.last_seen_review_id,
last_seen_review_comment_id=feedback.newest_cursor.last_seen_review_comment_id,
)
final_task = self.db.transition(
task.id,
TaskState.HUMAN_REVIEW_READY,
message="PR feedback queue was stale; no new human comments found",
clear_lease=True,
)
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
implementation_report = self._run_pr_feedback_implementer(
task,
repo,
issue,
task.branch_name,
task.pr_number,
feedback.human_comments,
workspace,
)
task = self.db.transition(task.id, TaskState.TESTING, message="checking PR feedback diff")
has_code_changes = self.workspace_manager.has_uncommitted_changes(workspace)
if has_code_changes:
commit_message = f"agent: address PR #{task.pr_number} feedback for issue #{issue.issue_number}"
commit_id = self.workspace_manager.commit_changes(workspace, commit_message)
self.db.add_event(task.id, TaskState.TESTING, TaskState.TESTING, f"committed PR feedback update {commit_id}")
self.workspace_manager.push_branch(workspace, task.branch_name)
self.gitea.post_issue_comment(
owner=repo.owner,
name=repo.name,
issue_number=task.pr_number,
body=f"## 代理已处理 PR 反馈\n\n{implementation_report.strip()}",
)
self.db.clear_pr_feedback_pending(
task.id,
last_seen_comment_id=feedback.handled_cursor.last_seen_comment_id,
last_seen_review_id=feedback.handled_cursor.last_seen_review_id,
last_seen_review_comment_id=feedback.handled_cursor.last_seen_review_comment_id,
)
if not has_code_changes:
final_task = self.db.transition(
task.id,
TaskState.HUMAN_REVIEW_READY,
message="PR feedback handled without code changes",
clear_lease=True,
)
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
task = self.db.transition(task.id, TaskState.PR_OPENED, message=f"updated PR #{task.pr_number}")
validate_transition(task.state, TaskState.REVIEWING)
task = self.db.transition(task.id, TaskState.REVIEWING, message="running reviewer agent after PR feedback")
review_report_raw = self._run_reviewer(task, repo, issue, task.pr_number, workspace)
review = parse_review_report(review_report_raw)
reviewer_comment = self.gitea.post_issue_comment(
owner=repo.owner,
name=repo.name,
issue_number=task.pr_number,
body=review_report_raw.strip() or "Reviewer did not produce a report.",
)
summary_comment = self.gitea.post_issue_comment(
owner=repo.owner,
name=repo.name,
issue_number=task.pr_number,
body=render_human_review_summary(review),
)
self.db.add_event(
task.id,
TaskState.REVIEWING,
TaskState.REVIEWING,
f"posted reviewer comments {reviewer_comment.id} and {summary_comment.id}",
)
final_task = self.db.transition(
task.id,
TaskState.HUMAN_REVIEW_READY,
message=f"PR feedback handled; human review summary posted with verdict {review.verdict}",
clear_lease=True,
)
self._cleanup_workspace_on_success(final_task, workspace)
return final_task
def _run_implementer(
self,
task: TaskRecord,
@@ -206,7 +429,7 @@ class TaskRunner:
issue_title=issue.title,
branch_name=branch_name,
)
result = self.command_runner.run(command, workspace, stdin=prompt)
result = self._run_agent_command(task, command, workspace, prompt)
report = read_report(output_dir / "AGENT_IMPLEMENTATION_REPORT.md")
self.db.add_agent_run(
task_id=task.id,
@@ -222,6 +445,46 @@ class TaskRunner:
raise RuntimeError(f"implementer failed with exit code {result.exit_code}")
return report or "(implementer did not produce AGENT_IMPLEMENTATION_REPORT.md)"
def _run_pr_feedback_implementer(
self,
task: TaskRecord,
repo: RepositoryRecord,
issue: IssueRecord,
branch_name: str,
pr_number: int,
comments: list[GiteaComment],
workspace: Path,
) -> str:
prompt = render_pr_feedback_prompt(repo, issue, pr_number, branch_name, comments)
output_dir = workspace / ".agent-output"
output_dir.mkdir(exist_ok=True)
prompt_path = output_dir / "AGENT_PR_FEEDBACK_PROMPT.md"
write_prompt(prompt_path, prompt)
command = render_command(
self.config.agents.implementer.command,
workspace_path=workspace.resolve(),
prompt_path=prompt_path.resolve(),
issue_number=issue.issue_number,
issue_title=issue.title,
pr_number=pr_number,
branch_name=branch_name,
)
result = self._run_agent_command(task, command, workspace, prompt)
report = read_report(output_dir / "AGENT_IMPLEMENTATION_REPORT.md")
self.db.add_agent_run(
task_id=task.id,
role="implementer_pr_feedback",
command=command,
prompt=prompt,
stdout=result.stdout,
stderr=result.stderr,
exit_code=result.exit_code,
report=report,
)
if not result.ok:
raise RuntimeError(f"implementer failed with exit code {result.exit_code}")
return report or "(implementer did not produce AGENT_IMPLEMENTATION_REPORT.md)"
def _run_reviewer(
self,
task: TaskRecord,
@@ -243,7 +506,7 @@ class TaskRunner:
issue_title=issue.title,
pr_number=pr_number,
)
result = self.command_runner.run(command, workspace, stdin=prompt)
result = self._run_agent_command(task, command, workspace, prompt)
report = read_report(output_dir / "AGENT_REVIEW_REPORT.md")
self.db.add_agent_run(
task_id=task.id,
@@ -259,12 +522,111 @@ class TaskRunner:
raise RuntimeError(f"reviewer failed with exit code {result.exit_code}")
return report
def _run_agent_command(self, task: TaskRecord, command: list[str], workspace: Path, prompt: str) -> AgentResult:
with TaskLeaseRenewer(
db_path=self.db.path,
task_id=task.id,
worker_id=self.worker_id,
lease_seconds=self.config.scheduler.lease_seconds,
interval_seconds=self.config.scheduler.effective_lease_renewal_interval_seconds,
):
return self.command_runner.run(
command,
workspace,
stdin=prompt,
timeout_seconds=self.config.scheduler.agent_timeout_seconds,
)
def _load_context(self, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]:
repo_row = self.db.conn.execute("SELECT * FROM repositories WHERE id = ?", (task.repo_id,)).fetchone()
if repo_row is None:
raise ValueError(f"repository not found for task {task.id}")
repo = self.db._repo(repo_row)
issue = self.db.get_issue(task.repo_id, task.issue_number)
if issue is None:
raise ValueError(f"issue not found for task {task.id}")
return repo, issue
return load_task_context(self.db, task)
def _cleanup_workspace_on_success(self, task: TaskRecord, workspace: Path) -> None:
if not self.config.workspace.cleanup_on_success:
return
try:
self.workspace_manager.cleanup(workspace)
except Exception as exc: # pragma: no cover - defensive best-effort cleanup
logger.warning("failed to cleanup workspace %s for task %d: %s", workspace, task.id, exc)
self.db.add_event(task.id, task.state, task.state, f"workspace cleanup failed: {exc}")
else:
self.db.add_event(task.id, task.state, task.state, f"cleaned workspace {workspace}")
def load_task_context(db: Database, task: TaskRecord) -> tuple[RepositoryRecord, IssueRecord]:
repo_row = db.conn.execute("SELECT * FROM repositories WHERE id = ?", (task.repo_id,)).fetchone()
if repo_row is None:
raise ValueError(f"repository not found for task {task.id}")
repo = db._repo(repo_row)
issue = db.get_issue(task.repo_id, task.issue_number)
if issue is None:
raise ValueError(f"issue not found for task {task.id}")
return repo, issue
def load_new_human_pr_feedback(
client: GiteaClient,
repo: RepositoryRecord,
pr_number: int,
username: str,
cursors: PullRequestFeedbackCursor,
) -> PullRequestFeedbackSnapshot:
issue_comments = client.list_issue_comments(owner=repo.owner, name=repo.name, issue_number=pr_number)
reviews = client.list_pull_request_reviews(owner=repo.owner, name=repo.name, pr_number=pr_number)
review_comments: list[GiteaComment] = []
for review in reviews:
review_comments.extend(
client.list_pull_request_review_comments(
owner=repo.owner,
name=repo.name,
pr_number=pr_number,
review_id=review.id,
)
)
bot = username.casefold()
human_issue_comments = [
comment
for comment in issue_comments
if comment.id > cursors.last_seen_comment_id and comment.author.casefold() != bot
]
human_reviews = [
review_as_comment(review)
for review in reviews
if review.id > cursors.last_seen_review_id and review.author.casefold() != bot and review.body.strip()
]
human_review_comments = [
comment
for comment in review_comments
if comment.id > cursors.last_seen_review_comment_id and comment.author.casefold() != bot
]
human_comments = [*human_issue_comments, *human_reviews, *human_review_comments]
handled_cursor = PullRequestFeedbackCursor(
last_seen_comment_id=max((comment.id for comment in human_issue_comments), default=cursors.last_seen_comment_id),
last_seen_review_id=max((review.id for review in reviews if review.id > cursors.last_seen_review_id), default=cursors.last_seen_review_id),
last_seen_review_comment_id=max(
(comment.id for comment in human_review_comments),
default=cursors.last_seen_review_comment_id,
),
)
newest_cursor = PullRequestFeedbackCursor(
last_seen_comment_id=max((comment.id for comment in issue_comments), default=cursors.last_seen_comment_id),
last_seen_review_id=max((review.id for review in reviews), default=cursors.last_seen_review_id),
last_seen_review_comment_id=max(
(comment.id for comment in review_comments),
default=cursors.last_seen_review_comment_id,
),
)
return PullRequestFeedbackSnapshot(human_comments, handled_cursor, newest_cursor)
def review_as_comment(review: GiteaPullReview) -> GiteaComment:
body = f"Pull request review ({review.state or 'COMMENT'})\n\n{review.body}"
return GiteaComment(
id=review.id,
body=body,
author=review.author,
html_url=review.html_url,
created_at=review.submitted_at,
updated_at=review.updated_at,
)

View File

@@ -8,7 +8,13 @@ ALLOWED_TRANSITIONS: dict[TaskState, set[TaskState]] = {
TaskState.CLAIMED: {TaskState.PLANNING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.PLANNING: {TaskState.IMPLEMENTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.IMPLEMENTING: {TaskState.TESTING, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.TESTING: {TaskState.PR_OPENED, TaskState.FAILED, TaskState.BLOCKED, TaskState.CANCELLED},
TaskState.TESTING: {
TaskState.PR_OPENED,
TaskState.HUMAN_REVIEW_READY,
TaskState.FAILED,
TaskState.BLOCKED,
TaskState.CANCELLED,
},
TaskState.PR_OPENED: {TaskState.REVIEWING, TaskState.FAILED, TaskState.CANCELLED},
TaskState.REVIEWING: {TaskState.HUMAN_REVIEW_READY, TaskState.FAILED, TaskState.CANCELLED},
TaskState.HUMAN_REVIEW_READY: {TaskState.DISCOVERED},

View File

@@ -35,11 +35,33 @@ class WorkspaceManager:
self.exclude_runtime_artifacts(path)
return path
def prepare_existing_branch(
self,
repo: RepositoryRecord,
issue: IssueRecord,
branch_name: str,
workspace_path: str | Path | None,
) -> Path:
path = Path(workspace_path) if workspace_path else self.task_workspace(repo, issue)
if not path.exists():
path.parent.mkdir(parents=True, exist_ok=True)
self._git(["clone", repo.clone_url, str(path)], Path.cwd())
self.exclude_runtime_artifacts(path)
self._git(["fetch", "origin"], path)
checkout = self._git(["checkout", branch_name], path, check=False)
if checkout.returncode != 0:
self._git(["checkout", "-B", branch_name, f"origin/{branch_name}"], path)
pull = self._git(["pull", "--ff-only", "origin", branch_name], path, check=False)
if pull.returncode != 0:
raise RuntimeError(f"git pull --ff-only origin {branch_name} failed: {pull.stderr.strip()}")
return path
def exclude_runtime_artifacts(self, workspace: str | Path) -> None:
exclude_path = Path(workspace) / ".git" / "info" / "exclude"
entries = {
".agent-output/",
"AGENT_IMPLEMENTER_PROMPT.md",
"AGENT_PR_FEEDBACK_PROMPT.md",
"AGENT_REVIEWER_PROMPT.md",
}
existing = exclude_path.read_text(encoding="utf-8") if exclude_path.exists() else ""
@@ -55,6 +77,10 @@ class WorkspaceManager:
diff = self._git(["diff", "--quiet", f"{base_ref}...HEAD"], Path(workspace), check=False)
return diff.returncode == 1
def has_uncommitted_changes(self, workspace: str | Path) -> bool:
result = self._git(["status", "--porcelain"], Path(workspace), check=False)
return bool(result.stdout.strip())
def commit_changes(self, workspace: str | Path, message: str) -> str:
workspace_path = Path(workspace)
self._git(["add", "-A"], workspace_path)

View File

@@ -1,14 +1,25 @@
from __future__ import annotations
import json
import sys
import threading
import time
from pathlib import Path
import httpx
from agent_gitea.agents import CommandRunner
from agent_gitea.config import AppConfig
from agent_gitea.db import Database
from agent_gitea.gitea import GiteaClient
from agent_gitea.models import AgentResult, TaskState
from agent_gitea.service import TaskRunner, scan_issues, sync_repositories
from agent_gitea.service import (
TaskRunner,
close_issues_for_merged_pull_requests,
scan_issues,
scan_pull_request_feedback,
sync_repositories,
)
def make_config(tmp_path: Path, **overrides: object) -> AppConfig:
@@ -167,15 +178,26 @@ class FakeWorkspaceManager:
self.root = root
self.diff = diff
self.pushed: list[str] = []
self.resumed: list[tuple[str, Path | None]] = []
self.cleaned: list[Path] = []
def prepare(self, repo, issue, branch_name):
path = self.root / branch_name.replace("/", "_")
path.mkdir(parents=True)
path.mkdir(parents=True, exist_ok=True)
return path
def prepare_existing_branch(self, repo, issue, branch_name, workspace_path):
self.resumed.append((branch_name, workspace_path))
path = Path(workspace_path) if workspace_path else self.root / branch_name.replace("/", "_")
path.mkdir(parents=True, exist_ok=True)
return path
def has_diff(self, workspace, base_ref="origin/HEAD"):
return self.diff
def has_uncommitted_changes(self, workspace):
return self.diff
def push_branch(self, workspace, branch_name):
self.pushed.append(branch_name)
@@ -184,14 +206,14 @@ class FakeWorkspaceManager:
return "abc1234"
def cleanup(self, workspace):
pass
self.cleaned.append(Path(workspace))
class FakeRunner:
def __init__(self, *, fail_role: str | None = None):
self.fail_role = fail_role
def run(self, command, cwd, *, stdin=None):
def run(self, command, cwd, *, stdin=None, timeout_seconds=None):
role = command[0]
assert stdin
if role == self.fail_role:
@@ -233,6 +255,136 @@ def seed_task(db):
return db.create_task(repo.id, 1)
def test_claim_next_task_allows_only_one_worker_during_race(db, tmp_path):
seed_task(db)
db.conn.create_function("sleep_ms", 1, lambda ms: time.sleep(ms / 1000))
db.conn.execute(
"""
CREATE TRIGGER slow_claim_update
BEFORE UPDATE OF state ON tasks
WHEN NEW.state = 'CLAIMED'
BEGIN
SELECT sleep_ms(150);
END
"""
)
db.conn.commit()
db.close()
barrier = threading.Barrier(2)
results: dict[str, int | None] = {}
errors: list[BaseException] = []
def claim(worker_id: str) -> None:
database = Database(tmp_path / "state.sqlite3")
database.conn.create_function("sleep_ms", 1, lambda ms: time.sleep(ms / 1000))
try:
barrier.wait()
task = database.claim_next_task(worker_id, 60)
results[worker_id] = task.id if task else None
except BaseException as exc: # pragma: no cover - assertion below reports thread failures
errors.append(exc)
finally:
database.close()
threads = [threading.Thread(target=claim, args=(worker_id,)) for worker_id in ("worker-a", "worker-b")]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
assert errors == []
assert list(results.values()).count(1) == 1
assert list(results.values()).count(None) == 1
def test_command_runner_returns_timeout_result(tmp_path):
result = CommandRunner().run(
[sys.executable, "-c", "import time; time.sleep(5)"],
tmp_path,
timeout_seconds=0.1,
)
assert result.exit_code == 124
assert "timed out" in result.stderr
class ObservingSlowRunner:
def __init__(self, db, task_id: int):
self.db = db
self.task_id = task_id
self.lease_before = None
self.lease_during = None
def run(self, command, cwd, *, stdin=None, timeout_seconds=None):
role = command[0]
if role == "implementer":
self.lease_before = self.db.get_task(self.task_id).lease_expires_at # type: ignore[union-attr]
time.sleep(1.2)
self.lease_during = self.db.get_task(self.task_id).lease_expires_at # type: ignore[union-attr]
output_dir = Path(cwd) / ".agent-output"
output_dir.mkdir(exist_ok=True)
(output_dir / "AGENT_IMPLEMENTATION_REPORT.md").write_text(
"## Summary\nImplemented\n\n## Test commands run\npytest\n",
encoding="utf-8",
)
if role == "reviewer":
output_dir = Path(cwd) / ".agent-output"
output_dir.mkdir(exist_ok=True)
(output_dir / "AGENT_REVIEW_REPORT.md").write_text(
"## Verdict\nAPPROVE\n\n## Suggested PR Comment\nLooks good.\n",
encoding="utf-8",
)
return AgentResult(exit_code=0, stdout="ok", stderr="")
def test_task_runner_renews_lease_while_agent_runs(db, tmp_path):
config = make_config(
tmp_path,
scheduler={
"interval_seconds": 1,
"concurrency": 1,
"lease_seconds": 60,
"lease_renewal_interval_seconds": 1,
},
)
task = seed_task(db)
runner = ObservingSlowRunner(db, task.id)
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
if request.url.path == "/api/v1/repos/acme/service/pulls":
return httpx.Response(201, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(201, json={"id": 1, "body": payload.get("body", ""), "user": {"login": "agent-bot"}})
return httpx.Response(404)
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=FakeWorkspaceManager(tmp_path / "work"),
command_runner=runner,
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert runner.lease_before is not None
assert runner.lease_during is not None
assert runner.lease_during > runner.lease_before
def transition_to_human_review_ready(db, task_id: int, *, pr_number: int = 5, branch_name: str | None = None):
db.transition(task_id, TaskState.CLAIMED)
db.transition(task_id, TaskState.PLANNING)
db.transition(task_id, TaskState.IMPLEMENTING, branch_name=branch_name)
db.transition(task_id, TaskState.TESTING)
db.transition(task_id, TaskState.PR_OPENED, pr_number=pr_number)
db.transition(task_id, TaskState.REVIEWING)
return db.transition(task_id, TaskState.HUMAN_REVIEW_READY, clear_lease=True)
def test_run_task_success_posts_review_comments(db, tmp_path):
config = make_config(tmp_path)
seed_task(db)
@@ -265,12 +417,420 @@ def test_run_task_success_posts_review_comments(db, tmp_path):
pull_requests = [payload for _, path, payload in requests if path == "/api/v1/repos/acme/service/pulls"]
assert pull_requests[0]["title"] == "代理实现Ready issue"
assert "代理实现报告" in pull_requests[0]["body"]
assert "Closes #1" in pull_requests[0]["body"]
command = json.loads(db.list_agent_runs(task.id)[0]["command_json"])
assert command[1] == "--cd"
assert Path(command[2]).is_absolute()
assert [path for _, path, _ in requests].count("/api/v1/repos/acme/service/issues/5/comments") == 2
def test_scan_pull_request_feedback_queues_task_for_new_human_comment(db):
task = seed_task(db)
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(
200,
json=[
{"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
{"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
{"id": 4, "body": "internal note", "user": {"login": "agent-bot"}},
],
)
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(handler))
task_after_scan = db.get_task(task.id)
assert queued == 1
assert task_after_scan is not None
assert task_after_scan.state == TaskState.DISCOVERED
assert db.has_pending_pr_feedback(task.id)
assert db.get_pr_feedback_cursor(task.id) == 2
def test_scan_pull_request_feedback_advances_cursor_without_human_comments(db):
task = seed_task(db)
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(200, json=[{"id": 3, "body": "summary", "user": {"login": "agent-bot"}}])
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(handler))
assert queued == 0
assert not db.has_pending_pr_feedback(task.id)
assert db.get_pr_feedback_cursor(task.id) == 3
assert db.get_task(task.id).state == TaskState.HUMAN_REVIEW_READY # type: ignore[union-attr]
def test_scan_pull_request_feedback_queues_task_for_inline_review_comment(db):
task = seed_task(db)
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(200, json=[])
if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews":
return httpx.Response(
200,
json=[
{
"id": 9,
"body": "",
"state": "REQUEST_CHANGES",
"user": {"login": "alice"},
}
],
)
if request.url.path == "/api/v1/repos/acme/service/pulls/5/reviews/9/comments":
return httpx.Response(
200,
json=[
{
"id": 31,
"body": "This branch misses the cleanup case.",
"path": "src/service.py",
"position": 42,
"user": {"login": "alice"},
}
],
)
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(handler))
cursors = db.get_pr_feedback_cursors(task.id)
assert queued == 1
assert db.has_pending_pr_feedback(task.id)
assert cursors.last_seen_comment_id == 2
assert cursors.last_seen_review_id == 0
assert cursors.last_seen_review_comment_id == 0
def test_run_task_with_pending_pr_feedback_updates_existing_pr(db, tmp_path):
config = make_config(tmp_path)
task = seed_task(db)
workspace_path = tmp_path / "work" / "existing"
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
db.mark_pr_feedback_pending(task.id)
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit()
requests: list[tuple[str, str, dict]] = []
next_comment_id = 4
def handler(request: httpx.Request) -> httpx.Response:
nonlocal next_comment_id
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
return httpx.Response(
200,
json=[
{"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
{"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
next_comment_id += 1
return httpx.Response(201, json={"id": next_comment_id, "body": payload["body"], "user": {"login": "agent-bot"}})
return httpx.Response(404)
workspace_manager = FakeWorkspaceManager(tmp_path / "work")
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=workspace_manager,
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert finished.pr_number == 5
assert workspace_manager.resumed == [("agent/issue-1-ready-issue", workspace_path)]
assert workspace_manager.pushed == ["agent/issue-1-ready-issue"]
assert workspace_manager.commit_message == "agent: address PR #5 feedback for issue #1"
assert not any(path == "/api/v1/repos/acme/service/pulls" for _, path, _ in requests)
assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback", "reviewer"]
assert db.get_pr_feedback_cursor(task.id) == 3
assert not db.has_pending_pr_feedback(task.id)
def rescan_handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments":
return httpx.Response(
200,
json=[
{"id": 3, "body": "Please add a regression test.", "user": {"login": "alice"}},
{"id": 4, "body": "Also cover the cleanup case.", "user": {"login": "alice"}},
{"id": 5, "body": "processed", "user": {"login": "agent-bot"}},
{"id": 6, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 7, "body": "summary", "user": {"login": "agent-bot"}},
],
)
return httpx.Response(404)
queued = scan_pull_request_feedback(db, make_client(rescan_handler))
assert queued == 1
assert db.has_pending_pr_feedback(task.id)
assert db.get_pr_feedback_cursor(task.id) == 3
def test_run_task_with_pending_pr_feedback_allows_no_code_changes(db, tmp_path):
config = make_config(tmp_path)
task = seed_task(db)
workspace_path = tmp_path / "work" / "existing"
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
db.mark_pr_feedback_pending(task.id)
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit()
requests: list[tuple[str, str, dict]] = []
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
return httpx.Response(
200,
json=[
{"id": 1, "body": "review report", "user": {"login": "agent-bot"}},
{"id": 2, "body": "summary", "user": {"login": "agent-bot"}},
{"id": 3, "body": "Can you confirm why this is enough?", "user": {"login": "alice"}},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}})
return httpx.Response(404)
workspace_manager = FakeWorkspaceManager(tmp_path / "work", diff=False)
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=workspace_manager,
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert workspace_manager.pushed == []
assert not hasattr(workspace_manager, "commit_message")
assert [run["role"] for run in db.list_agent_runs(task.id)] == ["implementer_pr_feedback"]
assert db.get_pr_feedback_cursor(task.id) == 3
assert not db.has_pending_pr_feedback(task.id)
def test_success_cleanup_does_not_block_later_pr_feedback(db, tmp_path):
config = make_config(
tmp_path,
workspace={"root": tmp_path / "workspaces", "cleanup_on_success": True},
)
task = seed_task(db)
workspace_path = tmp_path / "work" / "existing"
transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.transition(task.id, TaskState.DISCOVERED, clear_lease=True)
db.clear_pr_feedback_pending(task.id, last_seen_comment_id=2)
db.mark_pr_feedback_pending(task.id)
db.conn.execute("UPDATE tasks SET workspace_path = ? WHERE id = ?", (str(workspace_path), task.id))
db.conn.commit()
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
if request.url.path == "/api/v1/user":
return httpx.Response(200, json={"login": "agent-bot"})
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "GET":
return httpx.Response(
200,
json=[
{"id": 3, "body": "Please address the review.", "user": {"login": "alice"}},
],
)
if request.url.path == "/api/v1/repos/acme/service/issues/5/comments" and request.method == "POST":
return httpx.Response(201, json={"id": 4, "body": payload["body"], "user": {"login": "agent-bot"}})
return httpx.Response(404)
workspace_manager = FakeWorkspaceManager(tmp_path / "work")
finished = TaskRunner(
db=db,
config=config,
gitea=make_client(handler),
workspace_manager=workspace_manager,
command_runner=FakeRunner(),
worker_id="worker",
).run_once()
assert finished is not None
assert finished.state == TaskState.HUMAN_REVIEW_READY
assert workspace_manager.resumed == [("agent/issue-1-ready-issue", workspace_path)]
assert workspace_manager.cleaned == [workspace_path]
def test_close_issues_for_merged_pull_requests_closes_linked_issue(db):
repo = db.upsert_repository(
owner="acme",
name="service",
clone_url="https://gitea.test/acme/service.git",
default_branch="main",
enabled=True,
)
db.upsert_issue(
repo_id=repo.id,
issue_number=1,
title="Ready issue",
body="Body",
labels=["agent:ready"],
state="open",
html_url="https://gitea.test/acme/service/issues/1",
)
task = db.create_task(repo.id, 1)
db.transition(task.id, TaskState.CLAIMED)
db.transition(task.id, TaskState.PLANNING)
db.transition(task.id, TaskState.IMPLEMENTING)
db.transition(task.id, TaskState.TESTING)
db.transition(task.id, TaskState.PR_OPENED, pr_number=5)
db.transition(task.id, TaskState.REVIEWING)
db.transition(task.id, TaskState.HUMAN_REVIEW_READY, clear_lease=True)
requests: list[tuple[str, str, dict]] = []
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "closed", "merged": True})
if request.url.path == "/api/v1/repos/acme/service/issues/1/comments":
return httpx.Response(201, json={"id": 1})
if request.url.path == "/api/v1/repos/acme/service/issues/1":
return httpx.Response(200, json={"number": 1, "state": "closed"})
return httpx.Response(404)
closed = close_issues_for_merged_pull_requests(db, make_client(handler))
assert closed == 1
assert db.get_issue(repo.id, 1).state == "closed" # type: ignore[union-attr]
assert ("PATCH", "/api/v1/repos/acme/service/issues/1", {"state": "closed"}) in requests
def test_close_issues_for_merged_pull_requests_skips_unmerged_pr(db):
repo = db.upsert_repository(
owner="acme",
name="service",
clone_url="https://gitea.test/acme/service.git",
default_branch="main",
enabled=True,
)
db.upsert_issue(
repo_id=repo.id,
issue_number=1,
title="Ready issue",
body="Body",
labels=["agent:ready"],
state="open",
html_url="https://gitea.test/acme/service/issues/1",
)
task = db.create_task(repo.id, 1)
db.transition(task.id, TaskState.CLAIMED)
db.transition(task.id, TaskState.PLANNING)
db.transition(task.id, TaskState.IMPLEMENTING)
db.transition(task.id, TaskState.TESTING)
db.transition(task.id, TaskState.PR_OPENED, pr_number=5)
db.transition(task.id, TaskState.REVIEWING)
db.transition(task.id, TaskState.HUMAN_REVIEW_READY, clear_lease=True)
def handler(request: httpx.Request) -> httpx.Response:
assert request.url.path == "/api/v1/repos/acme/service/pulls/5"
return httpx.Response(200, json={"number": 5, "state": "open", "merged": False})
closed = close_issues_for_merged_pull_requests(db, make_client(handler))
assert closed == 0
assert db.get_issue(repo.id, 1).state == "open" # type: ignore[union-attr]
def test_close_issues_for_merged_pull_requests_handles_queued_feedback_task(db):
repo = db.upsert_repository(
owner="acme",
name="service",
clone_url="https://gitea.test/acme/service.git",
default_branch="main",
enabled=True,
)
db.upsert_issue(
repo_id=repo.id,
issue_number=1,
title="Ready issue",
body="Body",
labels=["agent:ready"],
state="open",
html_url="https://gitea.test/acme/service/issues/1",
)
task = db.create_task(repo.id, 1)
task = transition_to_human_review_ready(db, task.id, pr_number=5, branch_name="agent/issue-1-ready-issue")
db.mark_pr_feedback_pending(task.id)
db.transition(
task.id,
TaskState.DISCOVERED,
message="queued PR feedback from 1 human comment(s)",
clear_lease=True,
)
requests: list[tuple[str, str, dict]] = []
def handler(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content.decode() or "{}")
requests.append((request.method, request.url.path, payload))
if request.url.path == "/api/v1/repos/acme/service/pulls/5":
return httpx.Response(200, json={"number": 5, "state": "closed", "merged": True})
if request.url.path == "/api/v1/repos/acme/service/issues/1/comments":
return httpx.Response(201, json={"id": 1})
if request.url.path == "/api/v1/repos/acme/service/issues/1":
return httpx.Response(200, json={"number": 1, "state": "closed"})
return httpx.Response(404)
closed = close_issues_for_merged_pull_requests(db, make_client(handler))
updated_task = db.get_task(task.id)
assert closed == 1
assert db.get_issue(repo.id, 1).state == "closed" # type: ignore[union-attr]
assert updated_task is not None
assert updated_task.state == TaskState.CANCELLED
assert not db.has_pending_pr_feedback(task.id)
assert ("PATCH", "/api/v1/repos/acme/service/issues/1", {"state": "closed"}) in requests
def test_run_task_no_diff_becomes_blocked(db, tmp_path):
config = make_config(tmp_path)
seed_task(db)

View File

@@ -40,6 +40,7 @@ def test_prompt_and_pr_body_include_contract_sections(db):
assert ".agent-output/AGENT_IMPLEMENTATION_REPORT.md" in prompt
assert "关联 Issue#7" in body
assert "Closes #7" in body
assert "人工审核" in body