fix: prevent repeated task creation after failures
This commit is contained in:
@@ -207,6 +207,17 @@ class Database:
|
|||||||
).fetchall()
|
).fetchall()
|
||||||
return self._task(rows[0]) if rows else None
|
return self._task(rows[0]) if rows else None
|
||||||
|
|
||||||
|
def task_for_issue(self, repo_id: int, issue_number: int) -> TaskRecord | None:
|
||||||
|
row = self.conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT * FROM tasks
|
||||||
|
WHERE repo_id = ? AND issue_number = ?
|
||||||
|
ORDER BY id DESC LIMIT 1
|
||||||
|
""",
|
||||||
|
(repo_id, issue_number),
|
||||||
|
).fetchone()
|
||||||
|
return self._task(row) if row else None
|
||||||
|
|
||||||
def create_task(self, repo_id: int, issue_number: int) -> TaskRecord:
|
def create_task(self, repo_id: int, issue_number: int) -> TaskRecord:
|
||||||
now = dt_to_db(utcnow())
|
now = dt_to_db(utcnow())
|
||||||
self.conn.execute(
|
self.conn.execute(
|
||||||
@@ -322,13 +333,23 @@ class Database:
|
|||||||
return updated
|
return updated
|
||||||
|
|
||||||
def retry_task(self, task_id: int) -> TaskRecord:
|
def retry_task(self, task_id: int) -> TaskRecord:
|
||||||
return self.transition(
|
task = self.get_task(task_id)
|
||||||
task_id,
|
if task is None:
|
||||||
TaskState.DISCOVERED,
|
raise ValueError(f"task not found: {task_id}")
|
||||||
message="manual retry",
|
now = dt_to_db(utcnow())
|
||||||
error_message="",
|
self.conn.execute(
|
||||||
clear_lease=True,
|
"""
|
||||||
|
UPDATE tasks
|
||||||
|
SET state = ?, lease_owner = NULL, lease_expires_at = NULL, error_message = NULL, updated_at = ?
|
||||||
|
WHERE id = ?
|
||||||
|
""",
|
||||||
|
(TaskState.DISCOVERED.value, now, task_id),
|
||||||
)
|
)
|
||||||
|
self.conn.commit()
|
||||||
|
self.add_event(task_id, task.state, TaskState.DISCOVERED, "manual retry")
|
||||||
|
updated = self.get_task(task_id)
|
||||||
|
assert updated is not None
|
||||||
|
return updated
|
||||||
|
|
||||||
def cancel_task(self, task_id: int) -> TaskRecord:
|
def cancel_task(self, task_id: int) -> TaskRecord:
|
||||||
return self.transition(task_id, TaskState.CANCELLED, message="manual cancellation", clear_lease=True)
|
return self.transition(task_id, TaskState.CANCELLED, message="manual cancellation", clear_lease=True)
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ def scan_eligible_issues(
|
|||||||
for issue in db.list_open_issues(repo.id):
|
for issue in db.list_open_issues(repo.id):
|
||||||
if not is_issue_eligible(issue, labels):
|
if not is_issue_eligible(issue, labels):
|
||||||
continue
|
continue
|
||||||
if db.active_task_for_issue(repo.id, issue.issue_number):
|
if db.task_for_issue(repo.id, issue.issue_number):
|
||||||
continue
|
continue
|
||||||
task = db.create_task(repo.id, issue.issue_number)
|
task = db.create_task(repo.id, issue.issue_number)
|
||||||
created.append(task.id)
|
created.append(task.id)
|
||||||
|
|||||||
@@ -195,8 +195,8 @@ class TaskRunner:
|
|||||||
write_prompt(prompt_path, prompt)
|
write_prompt(prompt_path, prompt)
|
||||||
command = render_command(
|
command = render_command(
|
||||||
self.config.agents.implementer.command,
|
self.config.agents.implementer.command,
|
||||||
workspace_path=workspace,
|
workspace_path=workspace.resolve(),
|
||||||
prompt_path=prompt_path,
|
prompt_path=prompt_path.resolve(),
|
||||||
issue_number=issue.issue_number,
|
issue_number=issue.issue_number,
|
||||||
issue_title=issue.title,
|
issue_title=issue.title,
|
||||||
branch_name=branch_name,
|
branch_name=branch_name,
|
||||||
@@ -230,8 +230,8 @@ class TaskRunner:
|
|||||||
write_prompt(prompt_path, prompt)
|
write_prompt(prompt_path, prompt)
|
||||||
command = render_command(
|
command = render_command(
|
||||||
self.config.agents.reviewer.command,
|
self.config.agents.reviewer.command,
|
||||||
workspace_path=workspace,
|
workspace_path=workspace.resolve(),
|
||||||
prompt_path=prompt_path,
|
prompt_path=prompt_path.resolve(),
|
||||||
issue_number=issue.issue_number,
|
issue_number=issue.issue_number,
|
||||||
issue_title=issue.title,
|
issue_title=issue.title,
|
||||||
pr_number=pr_number,
|
pr_number=pr_number,
|
||||||
|
|||||||
@@ -15,8 +15,8 @@ def make_config(tmp_path: Path, **overrides: object) -> AppConfig:
|
|||||||
"scheduler": {"interval_seconds": 1, "concurrency": 1, "lease_seconds": 60},
|
"scheduler": {"interval_seconds": 1, "concurrency": 1, "lease_seconds": 60},
|
||||||
"workspace": {"root": tmp_path / "workspaces", "cleanup_on_success": False},
|
"workspace": {"root": tmp_path / "workspaces", "cleanup_on_success": False},
|
||||||
"agents": {
|
"agents": {
|
||||||
"implementer": {"command": ["implementer", "-"]},
|
"implementer": {"command": ["implementer", "--cd", "{workspace_path}", "-"]},
|
||||||
"reviewer": {"command": ["reviewer", "-"]},
|
"reviewer": {"command": ["reviewer", "--cd", "{workspace_path}", "-"]},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
data.update(overrides)
|
data.update(overrides)
|
||||||
|
|||||||
@@ -18,8 +18,8 @@ def make_config(tmp_path: Path, **overrides: object) -> AppConfig:
|
|||||||
"scheduler": {"interval_seconds": 1, "concurrency": 1, "lease_seconds": 60},
|
"scheduler": {"interval_seconds": 1, "concurrency": 1, "lease_seconds": 60},
|
||||||
"workspace": {"root": tmp_path / "workspaces", "cleanup_on_success": False},
|
"workspace": {"root": tmp_path / "workspaces", "cleanup_on_success": False},
|
||||||
"agents": {
|
"agents": {
|
||||||
"implementer": {"command": ["implementer", "-"]},
|
"implementer": {"command": ["implementer", "--cd", "{workspace_path}", "-"]},
|
||||||
"reviewer": {"command": ["reviewer", "-"]},
|
"reviewer": {"command": ["reviewer", "--cd", "{workspace_path}", "-"]},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
data.update(overrides)
|
data.update(overrides)
|
||||||
@@ -208,6 +208,9 @@ def test_run_task_success_posts_review_comments(db, tmp_path):
|
|||||||
assert task is not None
|
assert task is not None
|
||||||
assert task.state == TaskState.HUMAN_REVIEW_READY
|
assert task.state == TaskState.HUMAN_REVIEW_READY
|
||||||
assert task.pr_number == 5
|
assert task.pr_number == 5
|
||||||
|
command = json.loads(db.list_agent_runs(task.id)[0]["command_json"])
|
||||||
|
assert command[1] == "--cd"
|
||||||
|
assert Path(command[2]).is_absolute()
|
||||||
assert [path for _, path, _ in requests].count("/api/v1/repos/acme/service/issues/5/comments") == 2
|
assert [path for _, path, _ in requests].count("/api/v1/repos/acme/service/issues/5/comments") == 2
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -69,7 +69,7 @@ def test_scan_ignores_repositories_not_allowed_by_current_config(db):
|
|||||||
assert created == []
|
assert created == []
|
||||||
|
|
||||||
|
|
||||||
def test_retry_after_terminal_task_allows_new_scan_task(db):
|
def test_terminal_task_does_not_create_scan_loop(db):
|
||||||
repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True)
|
repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True)
|
||||||
db.upsert_issue(
|
db.upsert_issue(
|
||||||
repo_id=repo.id,
|
repo_id=repo.id,
|
||||||
@@ -88,8 +88,7 @@ def test_retry_after_terminal_task_allows_new_scan_task(db):
|
|||||||
|
|
||||||
created = scan_eligible_issues(db, LabelsConfig())
|
created = scan_eligible_issues(db, LabelsConfig())
|
||||||
|
|
||||||
assert len(created) == 1
|
assert created == []
|
||||||
assert created[0] != task_id
|
|
||||||
|
|
||||||
|
|
||||||
def test_claim_next_task_claims_expired_lease(db):
|
def test_claim_next_task_claims_expired_lease(db):
|
||||||
@@ -111,6 +110,21 @@ def test_claim_next_task_claims_expired_lease(db):
|
|||||||
assert reclaimed.lease_owner == "worker-b"
|
assert reclaimed.lease_owner == "worker-b"
|
||||||
|
|
||||||
|
|
||||||
|
def test_retry_task_clears_active_lease(db):
|
||||||
|
repo = db.upsert_repository(owner="acme", name="service", clone_url="x", default_branch="main", enabled=True)
|
||||||
|
task = db.create_task(repo.id, 1)
|
||||||
|
claimed = db.claim_next_task("worker-a", 60)
|
||||||
|
assert claimed is not None
|
||||||
|
db.transition(claimed.id, TaskState.PLANNING, error_message="stuck")
|
||||||
|
|
||||||
|
retried = db.retry_task(task.id)
|
||||||
|
|
||||||
|
assert retried.state == TaskState.DISCOVERED
|
||||||
|
assert retried.lease_owner is None
|
||||||
|
assert retried.lease_expires_at is None
|
||||||
|
assert retried.error_message is None
|
||||||
|
|
||||||
|
|
||||||
def test_state_transition_validation():
|
def test_state_transition_validation():
|
||||||
validate_transition(TaskState.DISCOVERED, TaskState.CLAIMED)
|
validate_transition(TaskState.DISCOVERED, TaskState.CLAIMED)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user