From bd24a18c18c4eba7c060fd9ed8ddac441d6642cf Mon Sep 17 00:00:00 2001 From: Gahow Wang Date: Wed, 6 May 2026 15:24:56 +0800 Subject: [PATCH] test: cover manager behavior without live Gitea --- tests/conftest.py | 30 +++++ tests/test_config.py | 90 +++++++++++++ tests/test_gitea_service.py | 216 ++++++++++++++++++++++++++++++ tests/test_rendering_workspace.py | 60 +++++++++ tests/test_scanner_state.py | 122 +++++++++++++++++ 5 files changed, 518 insertions(+) create mode 100644 tests/conftest.py create mode 100644 tests/test_config.py create mode 100644 tests/test_gitea_service.py create mode 100644 tests/test_rendering_workspace.py create mode 100644 tests/test_scanner_state.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..a8cf4b4 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from pathlib import Path + +import pytest + +from agent_gitea.config import AppConfig +from agent_gitea.db import Database + + +def make_config(tmp_path: Path, **overrides: object) -> AppConfig: + data = { + "gitea": {"base_url": "https://gitea.test", "token_env": "GITEA_TOKEN"}, + "database_path": tmp_path / "state.sqlite3", + "scheduler": {"interval_seconds": 1, "concurrency": 1, "lease_seconds": 60}, + "workspace": {"root": tmp_path / "workspaces", "cleanup_on_success": False}, + "agents": { + "implementer": {"command": ["implementer", "{prompt_path}"]}, + "reviewer": {"command": ["reviewer", "{prompt_path}"]}, + }, + } + data.update(overrides) + return AppConfig.model_validate(data) + + +@pytest.fixture +def db(tmp_path: Path) -> Database: + database = Database(tmp_path / "state.sqlite3") + database.migrate() + return database diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..6e2e80d --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +import textwrap + +import pytest + +from agent_gitea.config import load_config + + +def test_load_config_validates_and_filters_enabled_repos(tmp_path, monkeypatch): + monkeypatch.setenv("GITEA_TOKEN", "secret") + config_path = tmp_path / "config.yaml" + config_path.write_text( + textwrap.dedent( + """ + gitea: + base_url: https://gitea.test + token_env: GITEA_TOKEN + agents: + implementer: + command: ["codex", "{prompt_path}"] + reviewer: + command: ["codex", "{prompt_path}"] + repositories: + - owner: acme + name: enabled + enabled: true + - owner: acme + name: disabled + enabled: false + """ + ), + encoding="utf-8", + ) + + config = load_config(config_path) + + assert config.gitea.token == "secret" + assert [repo.full_name for repo in config.enabled_repositories] == ["acme/enabled"] + + +def test_load_config_rejects_duplicate_repositories(tmp_path): + config_path = tmp_path / "config.yaml" + config_path.write_text( + textwrap.dedent( + """ + gitea: + base_url: https://gitea.test + agents: + implementer: + command: ["agent"] + reviewer: + command: ["agent"] + repositories: + - owner: acme + name: service + - owner: acme + name: service + """ + ), + encoding="utf-8", + ) + + with pytest.raises(ValueError, match="duplicate repository"): + load_config(config_path) + + +def test_load_config_reads_dotenv_next_to_config(tmp_path, monkeypatch): + monkeypatch.delenv("GITEA_TOKEN", raising=False) + (tmp_path / ".env").write_text("GITEA_TOKEN=from-dotenv\n", encoding="utf-8") + config_path = tmp_path / "config.yaml" + config_path.write_text( + textwrap.dedent( + """ + gitea: + base_url: https://gitea.test + agents: + implementer: + command: ["agent"] + reviewer: + command: ["agent"] + """ + ), + encoding="utf-8", + ) + + config = load_config(config_path) + + assert config.repositories == [] + assert config.gitea.token == "from-dotenv" diff --git a/tests/test_gitea_service.py b/tests/test_gitea_service.py new file mode 100644 index 0000000..68fff9a --- /dev/null +++ b/tests/test_gitea_service.py @@ -0,0 +1,216 @@ +from __future__ import annotations + +import json +from pathlib import Path + +import httpx + +from agent_gitea.config import AppConfig +from agent_gitea.gitea import GiteaClient +from agent_gitea.models import AgentResult, TaskState +from agent_gitea.service import TaskRunner, scan_issues, sync_repositories + + +def make_config(tmp_path: Path, **overrides: object) -> AppConfig: + data = { + "gitea": {"base_url": "https://gitea.test", "token_env": "GITEA_TOKEN"}, + "database_path": tmp_path / "state.sqlite3", + "scheduler": {"interval_seconds": 1, "concurrency": 1, "lease_seconds": 60}, + "workspace": {"root": tmp_path / "workspaces", "cleanup_on_success": False}, + "agents": { + "implementer": {"command": ["implementer", "{prompt_path}"]}, + "reviewer": {"command": ["reviewer", "{prompt_path}"]}, + }, + } + data.update(overrides) + return AppConfig.model_validate(data) + + +def make_client(handler): + return GiteaClient("https://gitea.test", "token", transport=httpx.MockTransport(handler)) + + +def test_sync_and_scan_with_mocked_gitea(db, tmp_path): + config = make_config(tmp_path) + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/api/v1/user": + return httpx.Response(200, json={"login": "acme"}) + if request.url.path == "/api/v1/user/repos": + return httpx.Response( + 200, + json=[ + { + "owner": {"login": "acme"}, + "name": "service", + "full_name": "acme/service", + "clone_url": "https://gitea.test/acme/service.git", + "default_branch": "main", + }, + { + "owner": {"login": "other"}, + "name": "shared", + "full_name": "other/shared", + "clone_url": "https://gitea.test/other/shared.git", + "default_branch": "main", + }, + ], + ) + if request.url.path == "/api/v1/repos/acme/service/issues": + return httpx.Response( + 200, + json=[ + { + "number": 1, + "title": "Ready issue", + "body": "Body", + "state": "open", + "html_url": "https://gitea.test/acme/service/issues/1", + "labels": [{"name": "agent:ready"}], + }, + { + "number": 2, + "title": "High risk", + "body": "Body", + "state": "open", + "labels": [{"name": "agent:ready"}, {"name": "risk:high"}], + }, + ], + ) + return httpx.Response(404) + + client = make_client(handler) + + repos = sync_repositories(db, config, client) + task_ids = scan_issues(db, config, client) + + assert repos[0].full_name == "acme/service" + assert db.get_repository("other/shared") is None + assert len(task_ids) == 1 + + +class FakeWorkspaceManager: + def __init__(self, root: Path, *, diff: bool = True): + self.root = root + self.diff = diff + self.pushed: list[str] = [] + + def prepare(self, repo, issue, branch_name): + path = self.root / branch_name.replace("/", "_") + path.mkdir(parents=True) + return path + + def has_diff(self, workspace, base_ref="origin/HEAD"): + return self.diff + + def push_branch(self, workspace, branch_name): + self.pushed.append(branch_name) + + def cleanup(self, workspace): + pass + + +class FakeRunner: + def __init__(self, *, fail_role: str | None = None): + self.fail_role = fail_role + + def run(self, command, cwd): + role = command[0] + if role == self.fail_role: + return AgentResult(exit_code=1, stdout="", stderr="failed") + if role == "implementer": + (Path(cwd) / "AGENT_IMPLEMENTATION_REPORT.md").write_text( + "## Summary\nImplemented\n\n## Test commands run\npytest\n", + encoding="utf-8", + ) + if role == "reviewer": + (Path(cwd) / "AGENT_REVIEW_REPORT.md").write_text( + "## Verdict\nAPPROVE\n\n## Suggested PR Comment\nLooks good.\n", + encoding="utf-8", + ) + return AgentResult(exit_code=0, stdout="ok", stderr="") + + +def seed_task(db): + repo = db.upsert_repository( + owner="acme", + name="service", + clone_url="https://gitea.test/acme/service.git", + default_branch="main", + enabled=True, + ) + db.upsert_issue( + repo_id=repo.id, + issue_number=1, + title="Ready issue", + body="Body", + labels=["agent:ready"], + state="open", + html_url="https://gitea.test/acme/service/issues/1", + ) + return db.create_task(repo.id, 1) + + +def test_run_task_success_posts_review_comments(db, tmp_path): + config = make_config(tmp_path) + seed_task(db) + requests: list[tuple[str, str, dict]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + payload = json.loads(request.content.decode() or "{}") + requests.append((request.method, request.url.path, payload)) + if request.url.path == "/api/v1/repos/acme/service/pulls": + return httpx.Response(201, json={"number": 5, "html_url": "pr-url"}) + if request.url.path == "/api/v1/repos/acme/service/issues/5/comments": + return httpx.Response(201, json={"id": 1}) + return httpx.Response(404) + + task = TaskRunner( + db=db, + config=config, + gitea=make_client(handler), + workspace_manager=FakeWorkspaceManager(tmp_path / "work"), + command_runner=FakeRunner(), + worker_id="worker", + ).run_once() + + assert task is not None + assert task.state == TaskState.HUMAN_REVIEW_READY + assert task.pr_number == 5 + assert [path for _, path, _ in requests].count("/api/v1/repos/acme/service/issues/5/comments") == 2 + + +def test_run_task_no_diff_becomes_blocked(db, tmp_path): + config = make_config(tmp_path) + seed_task(db) + + task = TaskRunner( + db=db, + config=config, + gitea=make_client(lambda request: httpx.Response(500)), + workspace_manager=FakeWorkspaceManager(tmp_path / "work", diff=False), + command_runner=FakeRunner(), + worker_id="worker", + ).run_once() + + assert task is not None + assert task.state == TaskState.BLOCKED + assert task.error_message == "implementer produced no diff" + + +def test_run_task_agent_failure_becomes_failed(db, tmp_path): + config = make_config(tmp_path) + seed_task(db) + + task = TaskRunner( + db=db, + config=config, + gitea=make_client(lambda request: httpx.Response(500)), + workspace_manager=FakeWorkspaceManager(tmp_path / "work"), + command_runner=FakeRunner(fail_role="implementer"), + worker_id="worker", + ).run_once() + + assert task is not None + assert task.state == TaskState.FAILED + assert "implementer failed" in (task.error_message or "") diff --git a/tests/test_rendering_workspace.py b/tests/test_rendering_workspace.py new file mode 100644 index 0000000..d7830ef --- /dev/null +++ b/tests/test_rendering_workspace.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from agent_gitea.rendering import ( + parse_review_report, + render_implementer_prompt, + render_pr_body, +) +from agent_gitea.workspace import safe_branch_name + + +def test_branch_name_is_stable_and_safe(db): + repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True) + issue = db.upsert_issue( + repo_id=repo.id, + issue_number=42, + title="Fix API: handle NULL / spaces!", + body="", + labels=["agent:ready"], + state="open", + html_url="url", + ) + + assert safe_branch_name(issue) == "agent/issue-42-fix-api-handle-null-spaces" + + +def test_prompt_and_pr_body_include_contract_sections(db): + repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True) + issue = db.upsert_issue( + repo_id=repo.id, + issue_number=7, + title="Add thing", + body="Please add thing", + labels=["agent:ready"], + state="open", + html_url="url", + ) + + prompt = render_implementer_prompt(repo, issue, "agent/issue-7-add-thing") + body = render_pr_body(issue, "## Summary\nDone") + + assert "AGENT_IMPLEMENTATION_REPORT.md" in prompt + assert "Closes #7" in body + assert "Human Review Gate" in body + + +def test_review_report_parsing_extracts_verdict_and_comment(): + report = """## Verdict +REQUEST_CHANGES + +## Summary +Needs work + +## Suggested PR Comment +Please add tests. +""" + + parsed = parse_review_report(report) + + assert parsed.verdict == "REQUEST_CHANGES" + assert parsed.suggested_pr_comment == "Please add tests." diff --git a/tests/test_scanner_state.py b/tests/test_scanner_state.py new file mode 100644 index 0000000..e780ba0 --- /dev/null +++ b/tests/test_scanner_state.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +from datetime import timedelta + +from agent_gitea.config import LabelsConfig +from agent_gitea.models import TaskState, dt_to_db, utcnow +from agent_gitea.scanner import is_issue_eligible, scan_eligible_issues +from agent_gitea.state import validate_transition + + +def test_issue_eligibility_filters_required_and_blocking_labels(db): + repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True) + issue = db.upsert_issue( + repo_id=repo.id, + issue_number=1, + title="Fix", + body="", + labels=["agent:ready"], + state="open", + html_url="url", + ) + assert is_issue_eligible(issue, LabelsConfig()) + + blocked = db.upsert_issue( + repo_id=repo.id, + issue_number=2, + title="Risk", + body="", + labels=["agent:ready", "risk:high"], + state="open", + html_url="url", + ) + assert not is_issue_eligible(blocked, LabelsConfig()) + + +def test_scan_creates_task_unless_active_task_exists(db): + repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True) + db.upsert_issue( + repo_id=repo.id, + issue_number=1, + title="Fix", + body="", + labels=["agent:ready"], + state="open", + html_url="url", + ) + + first = scan_eligible_issues(db, LabelsConfig()) + second = scan_eligible_issues(db, LabelsConfig()) + + assert len(first) == 1 + assert second == [] + + +def test_scan_ignores_repositories_not_allowed_by_current_config(db): + repo = db.upsert_repository(owner="acme", name="old", clone_url="x", default_branch="main", enabled=True) + db.upsert_issue( + repo_id=repo.id, + issue_number=1, + title="Fix", + body="", + labels=["agent:ready"], + state="open", + html_url="url", + ) + + created = scan_eligible_issues(db, LabelsConfig(), allowed_repositories={"acme/new"}) + + assert created == [] + + +def test_retry_after_terminal_task_allows_new_scan_task(db): + repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True) + db.upsert_issue( + repo_id=repo.id, + issue_number=1, + title="Fix", + body="", + labels=["agent:ready"], + state="open", + html_url="url", + ) + [task_id] = scan_eligible_issues(db, LabelsConfig()) + db.transition(task_id, TaskState.CLAIMED) + db.transition(task_id, TaskState.PLANNING) + db.transition(task_id, TaskState.IMPLEMENTING) + db.transition(task_id, TaskState.BLOCKED, clear_lease=True) + + created = scan_eligible_issues(db, LabelsConfig()) + + assert len(created) == 1 + assert created[0] != task_id + + +def test_claim_next_task_claims_expired_lease(db): + repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True) + task = db.create_task(repo.id, 1) + claimed = db.claim_next_task("worker-a", 60) + assert claimed is not None + assert claimed.state == TaskState.CLAIMED + + db.conn.execute( + "UPDATE tasks SET state = ?, lease_expires_at = ? WHERE id = ?", + (TaskState.IMPLEMENTING.value, dt_to_db(utcnow() - timedelta(seconds=10)), task.id), + ) + db.conn.commit() + + reclaimed = db.claim_next_task("worker-b", 60) + assert reclaimed is not None + assert reclaimed.id == task.id + assert reclaimed.lease_owner == "worker-b" + + +def test_state_transition_validation(): + validate_transition(TaskState.DISCOVERED, TaskState.CLAIMED) + + try: + validate_transition(TaskState.DISCOVERED, TaskState.PR_OPENED) + except ValueError as exc: + assert "DISCOVERED -> PR_OPENED" in str(exc) + else: + raise AssertionError("expected invalid transition")