"""Runner — exec recipe, timeout, bounded concurrency, orphan recovery. Strategy: bypass the workspace materialization step (no real git URLs in tests) by stubbing WorkspaceManager.materialize to return a path the test controls. Recipes are stub shell strings (`echo`, `sleep`, `false`). """ from __future__ import annotations import asyncio import json import time from pathlib import Path import pytest from crafting_table.db import DB from crafting_table.runner import Runner from crafting_table.workspace import WorkspaceManager, WorkspacePaths # ---------- helper: stub materialize() so we don't need real git ---------- class _StubWorkspace(WorkspaceManager): def __init__(self, root: Path): super().__init__(root) self._cleanups: list[Path] = [] async def materialize(self, *, project, job_id, git_url, branch, log_fh): # Echo what would happen, write to log, return a path inside root. worktree = self.root / project / job_id worktree.mkdir(parents=True, exist_ok=True) log_fh.write(f"[stub-workspace] would clone {git_url}@{branch} to {worktree}\n") log_fh.flush() return WorkspacePaths( project_root=self.root / project, cache_dir=self.root / project / ".cache", worktree_dir=worktree, ) async def cleanup(self, paths): self._cleanups.append(paths.worktree_dir) def _seed_project_and_job( db: DB, *, recipe_cmd: str, timeout_secs: int = 5, recipe_kind: str = "test" ) -> str: if not db.get_token("o"): db.insert_token(name="o", bearer="t-runner", is_admin=False, ip_cidrs=None) sub = { "path": ".", "language": "python", "build": None, "test": None, "lint": None, "audit": None, "timeout_secs": timeout_secs, } sub[recipe_kind] = recipe_cmd snapshot = { "git_url": "stub://localhost", "default_branch": "main", "subprojects": [sub], "languages": ["python"], } db.upsert_project( name="proj", git_url="stub://localhost", default_branch="main", recipe_json=json.dumps(snapshot), owner_token="o", ) job_id = f"job-{int(time.time()*1000)}-{recipe_kind}" log_path = Path(db.db_path).parent / "jobs" / f"{job_id}.log" log_path.parent.mkdir(parents=True, exist_ok=True) db.insert_job( job_id=job_id, project_name="proj", subproject_path=".", recipe=recipe_kind, branch="main", log_path=str(log_path), recipe_snapshot_json=json.dumps(snapshot), ) return job_id # ---------- tests --------------------------------------------------------- @pytest.mark.asyncio async def test_run_succeeds(tmp_path): db = DB(str(tmp_path / "ct.db")) ws = _StubWorkspace(tmp_path / "ws") runner = Runner(db=db, workspace=ws, log_dir=tmp_path / "jobs", max_concurrent=2) await runner.start() try: job_id = _seed_project_and_job(db, recipe_cmd="echo hello && exit 0", recipe_kind="test") await runner.enqueue(job_id) await _wait_terminal(db, job_id) j = db.get_job(job_id) assert j["status"] == "succeeded" assert j["exit_code"] == 0 finally: await runner.stop() @pytest.mark.asyncio async def test_run_fails(tmp_path): db = DB(str(tmp_path / "ct.db")) ws = _StubWorkspace(tmp_path / "ws") runner = Runner(db=db, workspace=ws, log_dir=tmp_path / "jobs", max_concurrent=2) await runner.start() try: job_id = _seed_project_and_job(db, recipe_cmd="exit 1", recipe_kind="test") await runner.enqueue(job_id) await _wait_terminal(db, job_id) j = db.get_job(job_id) assert j["status"] == "failed" assert j["exit_code"] == 1 finally: await runner.stop() @pytest.mark.asyncio async def test_run_times_out(tmp_path): db = DB(str(tmp_path / "ct.db")) ws = _StubWorkspace(tmp_path / "ws") runner = Runner(db=db, workspace=ws, log_dir=tmp_path / "jobs", max_concurrent=2) await runner.start() try: job_id = _seed_project_and_job( db, recipe_cmd="sleep 60", timeout_secs=1, recipe_kind="audit" ) await runner.enqueue(job_id) await _wait_terminal(db, job_id, deadline_s=15) j = db.get_job(job_id) assert j["status"] == "timed_out" log_text = Path(j["log_path"]).read_text() assert "timeout" in log_text finally: await runner.stop() @pytest.mark.asyncio async def test_bounded_concurrency(tmp_path): """Queue 5 jobs with max_concurrent=2, assert peak in-flight stays at 2.""" db = DB(str(tmp_path / "ct.db")) ws = _StubWorkspace(tmp_path / "ws") runner = Runner(db=db, workspace=ws, log_dir=tmp_path / "jobs", max_concurrent=2) await runner.start() try: ids = [] for _ in range(5): job_id = _seed_project_and_job( db, recipe_cmd="sleep 0.5 && echo ok", timeout_secs=10, recipe_kind="test" ) ids.append(job_id) for j in ids: await runner.enqueue(j) for j in ids: await _wait_terminal(db, j, deadline_s=30) assert runner.peak_in_flight <= 2 assert runner.peak_in_flight >= 1 # at least one concurrent run happened for j in ids: assert db.get_job(j)["status"] == "succeeded" finally: await runner.stop() @pytest.mark.asyncio async def test_orphaned_running_marked_failed_on_start(tmp_path): """A row left in 'running' from a previous boot should be flipped to failed.""" db = DB(str(tmp_path / "ct.db")) db.insert_token(name="o", bearer="t-orph", is_admin=False, ip_cidrs=None) db.upsert_project( name="proj", git_url="stub://x", default_branch="main", recipe_json="{}", owner_token="o", ) db.insert_job( job_id="orph-1", project_name="proj", subproject_path=".", recipe="test", branch="main", log_path=str(tmp_path / "orph.log"), recipe_snapshot_json="{}", ) db.mark_job_running("orph-1") assert db.get_job("orph-1")["status"] == "running" ws = _StubWorkspace(tmp_path / "ws") runner = Runner(db=db, workspace=ws, log_dir=tmp_path / "jobs", max_concurrent=2) await runner.start() try: # start() should have flipped orph-1 to failed j = db.get_job("orph-1") assert j["status"] == "failed" assert j["exit_code"] == -1 finally: await runner.stop() @pytest.mark.asyncio async def test_jobs_finished_hook_fires(tmp_path): db = DB(str(tmp_path / "ct.db")) ws = _StubWorkspace(tmp_path / "ws") runner = Runner(db=db, workspace=ws, log_dir=tmp_path / "jobs", max_concurrent=2) seen: list[dict] = [] async def cap(event): seen.append(event) runner.add_hook(cap) await runner.start() try: job_id = _seed_project_and_job(db, recipe_cmd="echo h && true", recipe_kind="test") await runner.enqueue(job_id) await _wait_terminal(db, job_id) # Hook fires AFTER mark_job_finished so we have to give the loop a tick. for _ in range(50): if seen: break await asyncio.sleep(0.05) assert len(seen) == 1 assert seen[0]["job_id"] == job_id assert seen[0]["status"] == "succeeded" finally: await runner.stop() # ---------- helpers ------------------------------------------------------- async def _wait_terminal(db: DB, job_id: str, *, deadline_s: float = 15.0) -> None: deadline = time.monotonic() + deadline_s while time.monotonic() < deadline: j = db.get_job(job_id) if j and j["status"] in ("succeeded", "failed", "timed_out", "cancelled"): return await asyncio.sleep(0.1) raise AssertionError(f"job {job_id} did not reach terminal state within {deadline_s}s")