"""Async job runner — bounded asyncio pool that materializes workspaces and runs recipe shell commands. Lifecycle: 1. server.lifespan calls `runner.start()`: - mark any 'running' jobs from a previous process as failed (orphaned) - kick off the dispatcher loop + workspace gc loop 2. POST /projects//jobs: - inserts a row in `jobs` (status=queued) - calls `runner.enqueue(job_id)` — fast, just puts the id on a queue 3. dispatcher pulls ids off the queue, acquires a semaphore slot, spawns `_run_job` — bounded by `max_concurrent`. 4. _run_job: a. mark running b. materialize workspace c. exec recipe via /bin/sh -c d. stream stdout+stderr to log file (live) e. enforce per-job timeout f. mark terminal status + exit code g. emit a `jobs_finished` event hook for wave-2 parsers / wave-8 digest 5. server.lifespan stop() drains in-flight tasks then closes. Concurrency: asyncio.Semaphore(max_concurrent) caps in-flight subprocess runs. The queue itself is unbounded — back-pressure is enforced by the semaphore + caller can poll job status to know when to enqueue more. Recipe security: shell strings are run via `create_subprocess_shell` (which uses /bin/sh -c). Admins set them; this is documented loud in README. """ from __future__ import annotations import asyncio import json import logging import os import signal import time from dataclasses import dataclass from pathlib import Path from typing import Awaitable, Callable from .db import DB from .parsers import Finding, find_parser, fingerprint from .workspace import WorkspaceManager, WorkspacePaths log = logging.getLogger("crafting_table.runner") # Hook signature: called after every job reaches a terminal state. # Wave 2 wires this to the parser pipeline; wave 8 to the email digest queue. 
JobFinishedHook = Callable[[dict], Awaitable[None]]


@dataclass
class _JobContext:
    """Everything `_run_job` needs for one job, loaded by `_load_context`."""

    job_id: str
    job: dict  # row returned by db.get_job
    project: dict  # row returned by db.get_project
    recipe: dict  # decoded job["recipe_snapshot_json"]
    subproject: dict  # entry of recipe["subprojects"] selected for this job


class Runner:
    """Bounded asyncio job runner.

    Job ids are pushed onto an unbounded queue by `enqueue`; a single
    dispatcher task pops them and spawns one `_run_job` task per id, holding
    a semaphore slot for the lifetime of each task so that at most
    `max_concurrent` recipes execute at once. A second background task
    periodically asks the workspace manager to garbage-collect old
    workspaces.

    Synthetic exit codes written by the runner itself (never by a recipe):
      -2  subproject has no command for the requested recipe
      -3  workspace materialization failed
      -4  job was cancelled
      -5  the recipe process could not be started or supervised
    """

    def __init__(
        self,
        *,
        db: DB,
        workspace: WorkspaceManager,
        log_dir: Path,
        max_concurrent: int = 4,
        default_timeout_secs: int = 1800,
        gc_interval_secs: int = 3600,
        gc_age_secs: int = 86400,
    ):
        """Store collaborators and limits; creates `log_dir` if missing.

        No background work starts here — call `start()` for that.
        """
        self.db = db
        self.workspace = workspace
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self.max_concurrent = max_concurrent
        self.default_timeout_secs = default_timeout_secs
        self.gc_interval_secs = gc_interval_secs
        self.gc_age_secs = gc_age_secs
        self.queue: asyncio.Queue[str] = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self._tasks: set[asyncio.Task] = set()
        self._dispatcher_task: asyncio.Task | None = None
        self._gc_task: asyncio.Task | None = None
        self._stopping = False
        self._hooks: list[JobFinishedHook] = []
        # Test introspection — lets test_runner assert on bounded-concurrency.
        self.in_flight = 0
        self.peak_in_flight = 0

    # ---------- lifecycle ---------------------------------------------------

    def add_hook(self, hook: JobFinishedHook) -> None:
        """Register a coroutine function awaited with the finished-job event."""
        self._hooks.append(hook)

    async def start(self) -> None:
        """Fail orphaned jobs, then launch the dispatcher and gc loops."""
        # Recover orphaned 'running' jobs from a previous process — mark them
        # failed with exit_code=-1 and a synthetic log line. We do NOT try to
        # resume a job mid-execution; recipe state could be partial.
        orphaned = await self.db.arun(
            self.db.mark_orphaned_jobs_failed, log_dir=self.log_dir
        )
        if orphaned:
            log.warning(
                "marked %d orphaned running job(s) failed: %s",
                len(orphaned),
                orphaned,
            )
        self._dispatcher_task = asyncio.create_task(self._dispatch_loop())
        self._gc_task = asyncio.create_task(self._gc_loop())

    async def stop(self) -> None:
        """Cancel the background loops and every in-flight job task.

        Returns once all of them have finished unwinding. Errors raised by
        job tasks while unwinding are swallowed; shutdown is best-effort.
        """
        self._stopping = True
        for loop_task in (self._dispatcher_task, self._gc_task):
            if loop_task is None:
                continue
            loop_task.cancel()
            try:
                await loop_task
            except asyncio.CancelledError:
                pass
        # Cancel any in-flight job tasks. `_exec_recipe` reacts to the
        # cancellation by killing the recipe's process group, and `_run_job`
        # records the job as cancelled before re-raising.
        in_flight = list(self._tasks)
        for t in in_flight:
            t.cancel()
        for t in in_flight:
            try:
                await t
            except (asyncio.CancelledError, Exception):
                pass

    # ---------- enqueue -----------------------------------------------------

    async def enqueue(self, job_id: str) -> None:
        """Put `job_id` on the dispatch queue (unbounded, so this is fast)."""
        await self.queue.put(job_id)

    def stats(self) -> dict:
        """Return queue depth, running count, concurrency cap and peak."""
        return {
            "queued": self.queue.qsize(),
            "running": self.in_flight,
            "max": self.max_concurrent,
            "peak": self.peak_in_flight,
        }

    # ---------- dispatcher --------------------------------------------------

    async def _dispatch_loop(self) -> None:
        """Pop job ids forever, spawning one bounded `_wrap_run` task each."""
        while not self._stopping:
            job_id = await self.queue.get()
            # Acquire BEFORE spawning the task so we naturally block when
            # the pool is full instead of building up an unbounded set of
            # tasks that all immediately await the semaphore.
            await self.semaphore.acquire()
            if self._stopping:
                self.semaphore.release()
                break
            t = asyncio.create_task(self._wrap_run(job_id))
            self._tasks.add(t)
            t.add_done_callback(self._tasks.discard)

    async def _wrap_run(self, job_id: str) -> None:
        """Run one job with in-flight accounting; always frees the slot.

        The semaphore slot was acquired by the dispatcher; it is released
        here, whatever happens, so the pool cannot leak capacity.
        """
        self.in_flight += 1
        self.peak_in_flight = max(self.peak_in_flight, self.in_flight)
        try:
            await self._run_job(job_id)
        except Exception as e:
            log.exception("runner: unhandled error for job %s: %s", job_id, e)
        finally:
            self.in_flight -= 1
            self.semaphore.release()

    # ---------- gc loop -----------------------------------------------------

    async def _gc_loop(self) -> None:
        """Every `gc_interval_secs`, gc workspaces older than `gc_age_secs`.

        A failing gc pass is logged and does not stop the loop.
        """
        while not self._stopping:
            await asyncio.sleep(self.gc_interval_secs)
            try:
                res = await self.workspace.gc(age_secs=self.gc_age_secs)
                if res["removed"]:
                    log.info("workspace gc: %s", res)
            except Exception as e:
                log.warning("workspace gc failed: %s", e)

    # ---------- core --------------------------------------------------------

    async def _record_cancelled(self, job_id: str, log_fh) -> None:
        """Note the cancellation in the job log and mark the row cancelled."""
        log_fh.write("[crafting-table] cancelled\n")
        await self.db.arun(
            self.db.mark_job_finished,
            job_id=job_id,
            status="cancelled",
            exit_code=-4,
        )

    async def _run_job(self, job_id: str) -> None:
        """Execute one job end to end and record its terminal state.

        Steps: load context, mark running, write the log header, materialize
        the workspace, run the recipe, clean the workspace up, extract
        findings, mark the job finished, then fan the finished event out to
        the registered hooks.

        Raises:
            asyncio.CancelledError: re-raised after the job has been recorded
                as cancelled, so the task tracking sees the cancellation.
        """
        ctx = await self._load_context(job_id)
        if ctx is None:
            return
        await self.db.arun(self.db.mark_job_running, job_id)

        log_path = Path(ctx.job["log_path"])
        log_path.parent.mkdir(parents=True, exist_ok=True)
        recipe_kind = ctx.job["recipe"]
        cmd_str = ctx.subproject.get(recipe_kind)
        timeout = int(ctx.subproject.get("timeout_secs") or self.default_timeout_secs)
        terminal_status = "succeeded"
        exit_code: int | None = None

        with log_path.open("w", encoding="utf-8") as log_fh:
            log_fh.write(f"[crafting-table] job {job_id}\n")
            log_fh.write(f"[crafting-table] project={ctx.job['project_name']} subproject={ctx.job['subproject_path']}\n")
            log_fh.write(f"[crafting-table] recipe={recipe_kind} branch={ctx.job['branch']}\n")
            log_fh.write(f"[crafting-table] cmd={cmd_str!r} timeout={timeout}s\n")
            log_fh.flush()

            if not cmd_str:
                log_fh.write(f"[crafting-table] subproject has no '{recipe_kind}' command\n")
                terminal_status = "failed"
                exit_code = -2
            else:
                try:
                    paths = await self.workspace.materialize(
                        project=ctx.job["project_name"],
                        job_id=job_id,
                        git_url=ctx.project["git_url"],
                        branch=ctx.job["branch"],
                        log_fh=log_fh,
                    )
                except asyncio.CancelledError:
                    # Cancelled mid-materialize: record it so the row does
                    # not sit in 'running'. materialize() never returned, so
                    # there are no paths to hand to cleanup() here.
                    await self._record_cancelled(job_id, log_fh)
                    raise
                except Exception as e:
                    log_fh.write(f"[crafting-table] workspace error: {e}\n")
                    terminal_status = "failed"
                    exit_code = -3
                else:
                    sub_path = ctx.subproject.get("path", ".")
                    work_dir = paths.worktree_dir / sub_path
                    log_fh.write(f"[crafting-table] cwd={work_dir}\n")
                    log_fh.write("[crafting-table] --- recipe output begin ---\n")
                    log_fh.flush()
                    try:
                        exit_code, timed_out = await self._exec_recipe(
                            cmd=cmd_str, cwd=str(work_dir), log_fh=log_fh, timeout=timeout
                        )
                    except asyncio.CancelledError:
                        # Re-raise so the dispatcher's task tracking sees
                        # cancellation.
                        await self._record_cancelled(job_id, log_fh)
                        await self.workspace.cleanup(paths)
                        raise
                    except Exception as e:
                        # Spawn/supervision failure (e.g. cwd is missing).
                        # Without this the job would stay 'running' and the
                        # workspace would never be cleaned up.
                        log.exception("runner: recipe exec failed for job %s", job_id)
                        log_fh.write(f"[crafting-table] exec error: {e}\n")
                        terminal_status = "failed"
                        exit_code = -5
                    else:
                        if timed_out:
                            terminal_status = "timed_out"
                        elif exit_code == 0:
                            terminal_status = "succeeded"
                        else:
                            terminal_status = "failed"
                    log_fh.write(f"[crafting-table] --- recipe output end (exit={exit_code}) ---\n")
                    log_fh.flush()
                    await self.workspace.cleanup(paths)

        # Wave 2A — extract findings from the captured log BEFORE marking the
        # job finished. Callers that poll on terminal status expect the
        # findings_count and /findings rows to be populated by the time the
        # job leaves the running state; doing the parse pass first keeps that
        # invariant. Skipped on cancellation (no useful output) and whenever
        # exit_code is a synthetic crafting-table sentinel (<= -2) rather
        # than a tool exit. Timed-out jobs ARE parsed: a timeout still
        # produces real partial output, and recipe_fail-on-nonzero is useful.
        findings_n = 0
        if terminal_status not in {"cancelled"} and exit_code is not None and exit_code > -2:
            findings_n = await self._extract_findings(
                job_id=job_id,
                ctx=ctx,
                log_path=log_path,
                exit_code=exit_code,
            )

        await self.db.arun(
            self.db.mark_job_finished,
            job_id=job_id,
            status=terminal_status,
            exit_code=exit_code,
        )

        # Hook fan-out — wave 2 parsers + wave 8 digest hook into this.
        finished_event = {
            "job_id": job_id,
            "project_name": ctx.job["project_name"],
            "subproject_path": ctx.job["subproject_path"],
            "recipe": recipe_kind,
            "status": terminal_status,
            "exit_code": exit_code,
            "log_path": str(log_path),
            "findings_count": findings_n,
            "finished_at": int(time.time()),
        }
        for hook in self._hooks:
            try:
                await hook(finished_event)
            except Exception as e:
                # One misbehaving hook must not starve the others.
                log.warning("jobs_finished hook failed: %s", e)

    async def _extract_findings(
        self,
        *,
        job_id: str,
        ctx: _JobContext,
        log_path: Path,
        exit_code: int,
    ) -> int:
        """Parse the recipe's captured log into Finding rows and persist them.

        Failure-tolerant: any exception in the parser path is logged and
        swallowed — bad parser output never marks a job failed.

        Returns the number of findings persisted (may be zero).
        """
        try:
            language = (ctx.subproject.get("language") or "").lower()
            recipe_kind = ctx.job["recipe"]
            parser_cls = find_parser(language=language, recipe=recipe_kind)
            log_text = self._read_log_safe(log_path)
            findings: list[Finding] = parser_cls.parse(
                log_text, exit_code, recipe_kind
            )
            count = 0
            for f in findings:
                fp = fingerprint(f.kind, f.file, f.line, f.code, f.message)
                # raw_json: prefer the parser-supplied raw_json; if absent
                # but extras has content, serialize that so callers don't
                # lose the per-CVE / per-advisory metadata.
                raw_json = f.raw_json
                if raw_json is None and f.extras:
                    raw_json = json.dumps(f.extras)
                await self.db.arun(
                    self.db.insert_finding,
                    job_id=job_id,
                    kind=f.kind,
                    severity=f.severity,
                    message=f.message,
                    fingerprint=fp,
                    file=f.file,
                    line=f.line,
                    code=f.code,
                    suggested_fix=f.suggested_fix,
                    raw_json=raw_json,
                )
                count += 1
            if count:
                await self.db.arun(self.db.increment_findings_count, job_id, count)
            return count
        except Exception as e:
            log.warning("findings extraction failed for job %s: %s", job_id, e)
            return 0

    def _read_log_safe(self, log_path: Path) -> str:
        """Read the captured log; return '' on any I/O error so a missing log
        file doesn't crash the parser pipeline."""
        try:
            return log_path.read_text(encoding="utf-8", errors="replace")
        except OSError as e:
            log.warning("failed to read log %s: %s", log_path, e)
            return ""

    async def _exec_recipe(
        self, *, cmd: str, cwd: str, log_fh, timeout: int
    ) -> tuple[int, bool]:
        """Run cmd via /bin/sh -c, stream output to log_fh, return (exit, timed_out).

        Uses create_subprocess_shell because recipe strings are
        operator-trusted shell expressions (e.g. `cargo build && cargo test`).
        Stdout+stderr merged into one stream to preserve interleaving order,
        which matters for log readability.

        Output is copied in raw chunks through an incremental UTF-8 decoder
        rather than line by line: `StreamReader.readline()` raises ValueError
        once a single line exceeds the stream limit (64 KiB by default), which
        would kill the pump, let the pipe fill up and stall the recipe until
        its timeout.

        Important asyncio detail: we wrap proc.wait() in a single task and
        gate timeout with asyncio.wait() rather than wait_for(). wait_for
        cancels the underlying coroutine on timeout, which on Python 3.11
        marks the proc.wait() future as cancelled — so a SECOND wait_for on
        the same proc would immediately raise CancelledError instead of
        returning the post-terminate exit code. Wrapping once with a
        long-lived task lets us await it twice cleanly.

        Args:
            cmd: shell command line.
            cwd: working directory for the shell.
            log_fh: writable text file object receiving the decoded output.
            timeout: seconds the recipe may run before SIGTERM is sent to
                its process group (SIGKILL follows after a 10 s grace).

        Returns:
            (exit code of the shell, True if the timeout fired).

        Raises:
            OSError: if the shell cannot be spawned (e.g. cwd is missing).
            asyncio.CancelledError: if cancelled; the recipe's process group
                is SIGKILLed first so nothing outlives the job.
        """
        # Local import: only this method needs it.
        import codecs

        # start_new_session=True puts the shell in its own process group so
        # we can signal the WHOLE group on timeout. Without this, terminate()
        # only signals the shell; long-running children (sleep, cargo build,
        # etc.) inherit init and keep stdout open, so the pump never EOFs.
        proc = await asyncio.create_subprocess_shell(
            cmd,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            start_new_session=True,
        )
        assert proc.stdout is not None
        stdout = proc.stdout
        pgid = proc.pid  # equals the new session/group id since we just created it

        def _kill_group(sig: int) -> None:
            try:
                os.killpg(pgid, sig)
            except ProcessLookupError:
                pass

        # Incremental decoding keeps multi-byte characters that straddle a
        # chunk boundary intact.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

        async def pump() -> None:
            while True:
                chunk = await stdout.read(65536)
                if not chunk:
                    break
                text = decoder.decode(chunk)
                if text:
                    log_fh.write(text)
                    log_fh.flush()
            # Flush whatever an incomplete trailing sequence left behind.
            tail = decoder.decode(b"", final=True)
            if tail:
                log_fh.write(tail)
                log_fh.flush()

        pump_task = asyncio.create_task(pump())
        wait_task = asyncio.create_task(proc.wait())
        timed_out = False

        try:
            # Phase 1 — give the process up to `timeout` seconds to finish
            # naturally.
            done, _ = await asyncio.wait({wait_task}, timeout=timeout)
            if not done:
                timed_out = True
                log_fh.write(f"\n[crafting-table] timeout after {timeout}s — terminating\n")
                log_fh.flush()
                _kill_group(signal.SIGTERM)
                # Phase 2 — graceful shutdown grace period after SIGTERM
                done, _ = await asyncio.wait({wait_task}, timeout=10)
                if not done:
                    # Phase 3 — escalate to SIGKILL on the group
                    log_fh.write("[crafting-table] grace expired — SIGKILL\n")
                    log_fh.flush()
                    _kill_group(signal.SIGKILL)
                    await wait_task

            # wait_task is now done — pull the rc out
            rc = wait_task.result()

            # Drain pump. With process-group kill stdout will EOF cleanly;
            # the wait_for guard is belt-and-braces against any orphan that
            # somehow survived (e.g. a child that escaped its group).
            try:
                await asyncio.wait_for(pump_task, timeout=2)
            except (asyncio.TimeoutError, Exception):
                pump_task.cancel()
                try:
                    await pump_task
                except (asyncio.CancelledError, Exception):
                    pass
            return int(rc), timed_out
        except BaseException:
            # Cancellation (runner shutdown) or an unexpected error: do not
            # leave the recipe running or the helper tasks pending. Only
            # signal while the shell is unreaped, so a recycled pgid can
            # never be hit. Always re-raised.
            if proc.returncode is None:
                _kill_group(signal.SIGKILL)
            pump_task.cancel()
            wait_task.cancel()
            await asyncio.gather(pump_task, wait_task, return_exceptions=True)
            raise

    # ---------- helpers -----------------------------------------------------

    async def _load_context(self, job_id: str) -> _JobContext | None:
        """Load the job row, its recipe snapshot and its project row.

        Returns None (after logging a warning) when the job or its project no
        longer exists, in which case the caller skips the job.
        """
        job = await self.db.arun(self.db.get_job, job_id)
        if job is None:
            log.warning("runner: job %s vanished before dispatch", job_id)
            return None
        recipe = json.loads(job["recipe_snapshot_json"])
        # subproject inside the snapshot
        subprojects = recipe.get("subprojects", [])
        match = next(
            (s for s in subprojects if s.get("path") == job["subproject_path"]),
            None,
        )
        if match is None:
            # Fallback to the first one — should never happen since we
            # validate at enqueue time.
            match = subprojects[0] if subprojects else {}
        project = await self.db.arun(self.db.get_project, job["project_name"])
        if project is None:
            # Project was deleted while job sat in queue. The FK cascade in
            # the schema would have nuked the job row too, but we may have
            # popped the id off the queue before the cascade landed.
            log.warning("runner: project for job %s gone", job_id)
            return None
        return _JobContext(job_id=job_id, job=job, project=project, recipe=recipe, subproject=match)