from __future__ import annotations

from datetime import timedelta

from agent_gitea.config import LabelsConfig
from agent_gitea.models import TaskState, dt_to_db, utcnow
from agent_gitea.scanner import is_issue_eligible, scan_eligible_issues
from agent_gitea.state import validate_transition


def _make_repo(db, name="service"):
    """Upsert the enabled ``acme/<name>`` repository row shared by these tests."""
    return db.upsert_repository(
        owner="acme",
        name=name,
        clone_url="x",
        default_branch="main",
        enabled=True,
    )


def _make_issue(db, repo_id, *, number=1, title="Fix", labels=("agent:ready",)):
    """Upsert an open issue with an empty body and the given labels."""
    return db.upsert_issue(
        repo_id=repo_id,
        issue_number=number,
        title=title,
        body="",
        labels=list(labels),
        state="open",
        html_url="url",
    )


def test_issue_eligibility_filters_required_and_blocking_labels(db):
    """``agent:ready`` alone is eligible; adding ``risk:high`` is not."""
    repository = _make_repo(db)
    label_config = LabelsConfig()

    ready_issue = _make_issue(db, repository.id)
    assert is_issue_eligible(ready_issue, LabelsConfig())

    risky_issue = _make_issue(
        db,
        repository.id,
        number=2,
        title="Risk",
        labels=("agent:ready", "risk:high"),
    )
    assert not is_issue_eligible(risky_issue, label_config)


def test_scan_creates_task_unless_active_task_exists(db):
    """The first scan yields one result; an immediate rescan yields none."""
    repository = _make_repo(db)
    _make_issue(db, repository.id)

    initial_scan = scan_eligible_issues(db, LabelsConfig())
    repeat_scan = scan_eligible_issues(db, LabelsConfig())

    assert len(initial_scan) == 1
    assert repeat_scan == []


def test_scan_ignores_repositories_not_allowed_by_current_config(db):
    """A repository missing from ``allowed_repositories`` produces no results."""
    repository = _make_repo(db, name="old")
    _make_issue(db, repository.id)

    allowed = {"acme/new"}
    scan_result = scan_eligible_issues(db, LabelsConfig(), allowed_repositories=allowed)

    assert scan_result == []


def test_terminal_task_does_not_create_scan_loop(db):
    """After the task has been moved to BLOCKED, a rescan returns nothing."""
    repository = _make_repo(db)
    _make_issue(db, repository.id)

    # Unpacking into a 1-tuple also checks that exactly one id came back.
    (task_id,) = scan_eligible_issues(db, LabelsConfig())

    for next_state in (TaskState.CLAIMED, TaskState.PLANNING, TaskState.IMPLEMENTING):
        db.transition(task_id, next_state)
    db.transition(task_id, TaskState.BLOCKED, clear_lease=True)

    rescan = scan_eligible_issues(db, LabelsConfig())
    assert rescan == []


def test_claim_next_task_claims_expired_lease(db):
    """A task whose lease expiry lies in the past can be claimed by another worker."""
    repository = _make_repo(db)
    task = db.create_task(repository.id, 1)

    first_claim = db.claim_next_task("worker-a", 60)
    assert first_claim is not None
    assert first_claim.state == TaskState.CLAIMED

    # Rewrite the row directly: state IMPLEMENTING, lease expired 10 seconds ago.
    expired_at = dt_to_db(utcnow() - timedelta(seconds=10))
    db.conn.execute(
        "UPDATE tasks SET state = ?, lease_expires_at = ? WHERE id = ?",
        (TaskState.IMPLEMENTING.value, expired_at, task.id),
    )
    db.conn.commit()

    second_claim = db.claim_next_task("worker-b", 60)
    assert second_claim is not None
    assert second_claim.id == task.id
    assert second_claim.lease_owner == "worker-b"


def test_retry_task_clears_active_lease(db):
    """Retrying returns the task to DISCOVERED with lease and error fields unset."""
    repository = _make_repo(db)
    task = db.create_task(repository.id, 1)

    held = db.claim_next_task("worker-a", 60)
    assert held is not None
    db.transition(held.id, TaskState.PLANNING, error_message="stuck")

    after_retry = db.retry_task(task.id)

    assert after_retry.state == TaskState.DISCOVERED
    assert after_retry.lease_owner is None
    assert after_retry.lease_expires_at is None
    assert after_retry.error_message is None


def test_state_transition_validation():
    """DISCOVERED -> CLAIMED passes; DISCOVERED -> PR_OPENED raises ValueError."""
    validate_transition(TaskState.DISCOVERED, TaskState.CLAIMED)

    caught = None
    try:
        validate_transition(TaskState.DISCOVERED, TaskState.PR_OPENED)
    except ValueError as exc:
        caught = exc

    if caught is None:
        raise AssertionError("expected invalid transition")
    assert "DISCOVERED -> PR_OPENED" in str(caught)