test: cover manager behavior without live Gitea

This commit is contained in:
2026-05-06 15:24:56 +08:00
parent ba2c9d9f88
commit bd24a18c18
5 changed files with 518 additions and 0 deletions

122
tests/test_scanner_state.py Normal file
View File

@@ -0,0 +1,122 @@
from __future__ import annotations
from datetime import timedelta
from agent_gitea.config import LabelsConfig
from agent_gitea.models import TaskState, dt_to_db, utcnow
from agent_gitea.scanner import is_issue_eligible, scan_eligible_issues
from agent_gitea.state import validate_transition
def test_issue_eligibility_filters_required_and_blocking_labels(db):
repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True)
issue = db.upsert_issue(
repo_id=repo.id,
issue_number=1,
title="Fix",
body="",
labels=["agent:ready"],
state="open",
html_url="url",
)
assert is_issue_eligible(issue, LabelsConfig())
blocked = db.upsert_issue(
repo_id=repo.id,
issue_number=2,
title="Risk",
body="",
labels=["agent:ready", "risk:high"],
state="open",
html_url="url",
)
assert not is_issue_eligible(blocked, LabelsConfig())
def test_scan_creates_task_unless_active_task_exists(db):
repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True)
db.upsert_issue(
repo_id=repo.id,
issue_number=1,
title="Fix",
body="",
labels=["agent:ready"],
state="open",
html_url="url",
)
first = scan_eligible_issues(db, LabelsConfig())
second = scan_eligible_issues(db, LabelsConfig())
assert len(first) == 1
assert second == []
def test_scan_ignores_repositories_not_allowed_by_current_config(db):
repo = db.upsert_repository(owner="acme", name="old", clone_url="x", default_branch="main", enabled=True)
db.upsert_issue(
repo_id=repo.id,
issue_number=1,
title="Fix",
body="",
labels=["agent:ready"],
state="open",
html_url="url",
)
created = scan_eligible_issues(db, LabelsConfig(), allowed_repositories={"acme/new"})
assert created == []
def test_retry_after_terminal_task_allows_new_scan_task(db):
repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True)
db.upsert_issue(
repo_id=repo.id,
issue_number=1,
title="Fix",
body="",
labels=["agent:ready"],
state="open",
html_url="url",
)
[task_id] = scan_eligible_issues(db, LabelsConfig())
db.transition(task_id, TaskState.CLAIMED)
db.transition(task_id, TaskState.PLANNING)
db.transition(task_id, TaskState.IMPLEMENTING)
db.transition(task_id, TaskState.BLOCKED, clear_lease=True)
created = scan_eligible_issues(db, LabelsConfig())
assert len(created) == 1
assert created[0] != task_id
def test_claim_next_task_claims_expired_lease(db):
repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True)
task = db.create_task(repo.id, 1)
claimed = db.claim_next_task("worker-a", 60)
assert claimed is not None
assert claimed.state == TaskState.CLAIMED
db.conn.execute(
"UPDATE tasks SET state = ?, lease_expires_at = ? WHERE id = ?",
(TaskState.IMPLEMENTING.value, dt_to_db(utcnow() - timedelta(seconds=10)), task.id),
)
db.conn.commit()
reclaimed = db.claim_next_task("worker-b", 60)
assert reclaimed is not None
assert reclaimed.id == task.id
assert reclaimed.lease_owner == "worker-b"
def test_state_transition_validation():
validate_transition(TaskState.DISCOVERED, TaskState.CLAIMED)
try:
validate_transition(TaskState.DISCOVERED, TaskState.PR_OPENED)
except ValueError as exc:
assert "DISCOVERED -> PR_OPENED" in str(exc)
else:
raise AssertionError("expected invalid transition")