crafting-table/crafting_table/runner.py
Kayos d467b2f5be v0.1 wave 2A (steps 5+6): per-language parsers + findings extraction
- parsers/ package: rust / python / go / typescript / generic
- parser registry with language+recipe -> fallback resolution
- fingerprint hash (kind+file+line+code) for cross-run dedup
- runner.py post-exec hook: parse log, persist findings, count on job row
  (extraction runs before mark_job_finished so callers polling on terminal
  status see findings_count populated atomically)
- db.insert_finding / list_findings / increment_findings_count DAOs already
  shipped in wave 1; wired here
- GET /jobs/{id}/findings now returns real data (server route already
  shipped; was returning empty list because nothing populated the table)
- tests/test_parsers/: 6 modules + 11 fixtures (rust/python/go/typescript)
- tests/test_runner_findings.py: 3 integration tests
- README: tick steps 2-6, add Findings section

Suite: 108 passing (62 wave-1 + 46 new).
Spec: memory/spec-crafting-table.md
2026-04-29 08:36:16 -07:00

488 lines
19 KiB
Python

"""Async job runner — bounded asyncio pool that materializes workspaces and
runs recipe shell commands.
Lifecycle:
1. server.lifespan calls `runner.start()`:
- mark any 'running' jobs from a previous process as failed (orphaned)
- kick off the dispatcher loop + workspace gc loop
2. POST /projects/<name>/jobs:
- inserts a row in `jobs` (status=queued)
- calls `runner.enqueue(job_id)` — fast, just puts the id on a queue
3. dispatcher pulls ids off the queue, acquires a semaphore slot,
spawns `_run_job` — bounded by `max_concurrent`.
4. _run_job:
a. mark running
b. materialize workspace
c. exec recipe via /bin/sh -c
d. stream stdout+stderr to log file (live)
e. enforce per-job timeout
f. mark terminal status + exit code
g. emit a `jobs_finished` event hook for wave-2 parsers / wave-8 digest
5. server.lifespan stop() cancels the dispatcher, the gc loop and any
in-flight job tasks, then waits for each of them to exit.
Concurrency: asyncio.Semaphore(max_concurrent) caps in-flight subprocess
runs. The queue itself is unbounded — back-pressure is enforced by the
semaphore + caller can poll job status to know when to enqueue more.
Recipe security: shell strings are run via `create_subprocess_shell` (which
uses /bin/sh -c). Admins set them; this is documented loud in README.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import signal
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Awaitable, Callable
from .db import DB
from .parsers import Finding, find_parser, fingerprint
from .workspace import WorkspaceManager, WorkspacePaths
# Module-level logger shared by everything in the runner.
log = logging.getLogger("crafting_table.runner")
# Hook signature: called after every job reaches a terminal state.
# Wave 2 wires this to the parser pipeline; wave 8 to the email digest queue.
# A hook is an async callable that receives one dict (the finished-job event)
# and returns nothing.
# NOTE(review): the runner re-raises on cancellation before the hook fan-out,
# so cancelled jobs do not appear to reach the hooks — confirm that is intended.
JobFinishedHook = Callable[[dict], Awaitable[None]]
@dataclass
class _JobContext:
job_id: str
job: dict
project: dict
recipe: dict
subproject: dict
class Runner:
    """Bounded asyncio job runner: a queue, a dispatcher and a workspace gc loop."""

    def __init__(
        self,
        *,
        db: DB,
        workspace: WorkspaceManager,
        log_dir: Path,
        max_concurrent: int = 4,
        default_timeout_secs: int = 1800,
        gc_interval_secs: int = 3600,
        gc_age_secs: int = 86400,
    ):
        """Store collaborators and tunables; create the log directory.

        Args:
            db: database access object used for all job/finding persistence.
            workspace: manager that materializes, cleans up and gc's workspaces.
            log_dir: directory for job logs; created (with parents) if missing.
            max_concurrent: size of the semaphore gating in-flight jobs.
            default_timeout_secs: per-job timeout used when the subproject
                does not supply its own ``timeout_secs``.
            gc_interval_secs: sleep between workspace gc passes.
            gc_age_secs: age threshold handed to ``workspace.gc``.
        """
        # Collaborators.
        self.db = db
        self.workspace = workspace

        # Log directory is normalised to a Path and created eagerly.
        log_root = Path(log_dir)
        log_root.mkdir(parents=True, exist_ok=True)
        self.log_dir = log_root

        # Tunables.
        self.max_concurrent = max_concurrent
        self.default_timeout_secs = default_timeout_secs
        self.gc_interval_secs = gc_interval_secs
        self.gc_age_secs = gc_age_secs

        # Scheduling primitives: an unbounded id queue plus the semaphore
        # that caps how many jobs execute at once.
        self.queue: asyncio.Queue[str] = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(max_concurrent)

        # Background bookkeeping.
        self._tasks: set[asyncio.Task] = set()
        self._dispatcher_task: asyncio.Task | None = None
        self._gc_task: asyncio.Task | None = None
        self._stopping = False
        self._hooks: list[JobFinishedHook] = []

        # Test introspection — lets test_runner assert on bounded-concurrency.
        self.in_flight = 0
        self.peak_in_flight = 0
# ---------- lifecycle ---------------------------------------------------
def add_hook(self, hook: JobFinishedHook) -> None:
    """Register *hook*; hooks are kept (and later awaited) in registration order."""
    registered = self._hooks
    registered.append(hook)
async def start(self) -> None:
    """Recover orphaned jobs, then launch the dispatcher and gc loops."""
    # Anything still 'running' belongs to a previous process. The DB helper
    # marks those jobs failed; we never try to resume a job mid-execution
    # because recipe state could be partial.
    stale_ids = await self.db.arun(
        self.db.mark_orphaned_jobs_failed, log_dir=self.log_dir
    )
    if stale_ids:
        log.warning(
            "marked %d orphaned running job(s) failed: %s", len(stale_ids), stale_ids
        )

    # Both loops run as background tasks for the lifetime of the runner.
    spawn = asyncio.create_task
    self._dispatcher_task = spawn(self._dispatch_loop())
    self._gc_task = spawn(self._gc_loop())
async def stop(self) -> None:
    """Flag shutdown, cancel the background loops, then cancel in-flight jobs.

    Every cancelled task is awaited so its own cancellation handling gets a
    chance to run; whatever those tasks raise is deliberately swallowed.
    """
    self._stopping = True

    # Dispatcher first, then gc — each is cancelled and awaited in turn.
    for attr in ("_dispatcher_task", "_gc_task"):
        background = getattr(self, attr)
        if background is None:
            continue
        background.cancel()
        try:
            await background
        except asyncio.CancelledError:
            pass

    # Request cancellation of every job task before awaiting any of them,
    # so they all start unwinding at once.
    in_flight = list(self._tasks)
    for job_task in in_flight:
        job_task.cancel()
    for job_task in in_flight:
        try:
            await job_task
        except (asyncio.CancelledError, Exception):
            pass
# ---------- enqueue -----------------------------------------------------
async def enqueue(self, job_id: str) -> None:
    """Hand *job_id* to the dispatcher by putting it on the internal queue."""
    pending = self.queue
    await pending.put(job_id)
def stats(self) -> dict:
    """Return a snapshot of queue depth and concurrency counters."""
    return dict(
        queued=self.queue.qsize(),
        running=self.in_flight,
        max=self.max_concurrent,
        peak=self.peak_in_flight,
    )
# ---------- dispatcher --------------------------------------------------
async def _dispatch_loop(self) -> None:
try:
while not self._stopping:
job_id = await self.queue.get()
# Acquire BEFORE spawning the task so we naturally block when
# the pool is full instead of building up an unbounded set of
# tasks that all immediately await the semaphore.
await self.semaphore.acquire()
if self._stopping:
self.semaphore.release()
break
t = asyncio.create_task(self._wrap_run(job_id))
self._tasks.add(t)
t.add_done_callback(self._tasks.discard)
except asyncio.CancelledError:
raise
async def _wrap_run(self, job_id: str) -> None:
self.in_flight += 1
if self.in_flight > self.peak_in_flight:
self.peak_in_flight = self.in_flight
try:
await self._run_job(job_id)
except Exception as e:
log.exception("runner: unhandled error for job %s: %s", job_id, e)
finally:
self.in_flight -= 1
self.semaphore.release()
# ---------- gc loop -----------------------------------------------------
async def _gc_loop(self) -> None:
try:
while not self._stopping:
await asyncio.sleep(self.gc_interval_secs)
try:
res = await self.workspace.gc(age_secs=self.gc_age_secs)
if res["removed"]:
log.info("workspace gc: %s", res)
except Exception as e:
log.warning("workspace gc failed: %s", e)
except asyncio.CancelledError:
raise
# ---------- core --------------------------------------------------------
async def _run_job(self, job_id: str) -> None:
ctx = await self._load_context(job_id)
if ctx is None:
return
await self.db.arun(self.db.mark_job_running, job_id)
log_path = Path(ctx.job["log_path"])
log_path.parent.mkdir(parents=True, exist_ok=True)
recipe_kind = ctx.job["recipe"]
cmd_str = ctx.subproject.get(recipe_kind)
timeout = int(ctx.subproject.get("timeout_secs") or self.default_timeout_secs)
terminal_status = "succeeded"
exit_code: int | None = None
with log_path.open("w", encoding="utf-8") as log_fh:
log_fh.write(f"[crafting-table] job {job_id}\n")
log_fh.write(f"[crafting-table] project={ctx.job['project_name']} subproject={ctx.job['subproject_path']}\n")
log_fh.write(f"[crafting-table] recipe={recipe_kind} branch={ctx.job['branch']}\n")
log_fh.write(f"[crafting-table] cmd={cmd_str!r} timeout={timeout}s\n")
log_fh.flush()
if not cmd_str:
log_fh.write(f"[crafting-table] subproject has no '{recipe_kind}' command\n")
terminal_status = "failed"
exit_code = -2
else:
try:
paths = await self.workspace.materialize(
project=ctx.job["project_name"],
job_id=job_id,
git_url=ctx.project["git_url"],
branch=ctx.job["branch"],
log_fh=log_fh,
)
except Exception as e:
log_fh.write(f"[crafting-table] workspace error: {e}\n")
terminal_status = "failed"
exit_code = -3
else:
sub_path = ctx.subproject.get("path", ".")
work_dir = paths.worktree_dir / sub_path
log_fh.write(f"[crafting-table] cwd={work_dir}\n")
log_fh.write("[crafting-table] --- recipe output begin ---\n")
log_fh.flush()
try:
exit_code, timed_out = await self._exec_recipe(
cmd=cmd_str, cwd=str(work_dir), log_fh=log_fh, timeout=timeout
)
if timed_out:
terminal_status = "timed_out"
elif exit_code == 0:
terminal_status = "succeeded"
else:
terminal_status = "failed"
except asyncio.CancelledError:
log_fh.write("[crafting-table] cancelled\n")
terminal_status = "cancelled"
exit_code = -4
# Re-raise so the dispatcher's task tracking sees cancellation.
await self.db.arun(
self.db.mark_job_finished,
job_id=job_id,
status=terminal_status,
exit_code=exit_code,
)
await self.workspace.cleanup(paths)
raise
log_fh.write(f"[crafting-table] --- recipe output end (exit={exit_code}) ---\n")
log_fh.flush()
await self.workspace.cleanup(paths)
# Wave 2A — extract findings from the captured log BEFORE marking the
# job finished. Callers that poll on terminal status expect the
# findings_count and /findings rows to be populated by the time the
# job leaves the running state; doing the parse pass first keeps that
# invariant. Skipped on cancellation (no useful output) and on
# workspace failure (exit_code is a synthetic crafting-table sentinel,
# not a tool exit). Timed-out jobs ARE parsed: a timeout still
# produces real partial output, and recipe_fail-on-nonzero is useful.
findings_n = 0
if terminal_status not in {"cancelled"} and exit_code is not None and exit_code > -2:
findings_n = await self._extract_findings(
job_id=job_id,
ctx=ctx,
log_path=log_path,
exit_code=exit_code,
)
await self.db.arun(
self.db.mark_job_finished,
job_id=job_id,
status=terminal_status,
exit_code=exit_code,
)
# Hook fan-out — wave 2 parsers + wave 8 digest hook into this.
finished_event = {
"job_id": job_id,
"project_name": ctx.job["project_name"],
"subproject_path": ctx.job["subproject_path"],
"recipe": recipe_kind,
"status": terminal_status,
"exit_code": exit_code,
"log_path": str(log_path),
"findings_count": findings_n,
"finished_at": int(time.time()),
}
for hook in self._hooks:
try:
await hook(finished_event)
except Exception as e:
log.warning("jobs_finished hook failed: %s", e)
async def _extract_findings(
    self,
    *,
    job_id: str,
    ctx: _JobContext,
    log_path: Path,
    exit_code: int,
) -> int:
    """Parse the recipe's captured log into Finding rows and persist them.

    Failure-tolerant: any exception in the parser path is logged and
    swallowed — bad parser output never marks a job failed.

    Args:
        job_id: job the findings belong to.
        ctx: loaded job context; supplies the subproject language and the
            recipe kind used to pick a parser.
        log_path: captured log to parse.
        exit_code: recipe exit code, passed through to the parser.

    Returns:
        The number of findings actually persisted (may be zero). If an
        insert fails partway through, the rows written before the failure
        are still counted, so the job's findings_count stays in step with
        the rows in the findings table.
    """
    count = 0
    try:
        language = (ctx.subproject.get("language") or "").lower()
        recipe_kind = ctx.job["recipe"]
        parser_cls = find_parser(language=language, recipe=recipe_kind)
        log_text = self._read_log_safe(log_path)
        findings: list[Finding] = parser_cls.parse(
            log_text, exit_code, recipe_kind
        )
        for f in findings:
            fp = fingerprint(f.kind, f.file, f.line, f.code, f.message)
            # raw_json: prefer the parser-supplied raw_json; if absent
            # but extras has content, serialize that so callers don't
            # lose the per-CVE / per-advisory metadata.
            raw_json = f.raw_json
            if raw_json is None and f.extras:
                raw_json = json.dumps(f.extras)
            await self.db.arun(
                self.db.insert_finding,
                job_id=job_id,
                kind=f.kind,
                severity=f.severity,
                message=f.message,
                fingerprint=fp,
                file=f.file,
                line=f.line,
                code=f.code,
                suggested_fix=f.suggested_fix,
                raw_json=raw_json,
            )
            count += 1
    except Exception as e:
        log.warning("findings extraction failed for job %s: %s", job_id, e)
    # Record the count outside the try above: when extraction dies midway
    # the rows already inserted are real and must still be reflected on
    # the job row.
    if count:
        try:
            await self.db.arun(self.db.increment_findings_count, job_id, count)
        except Exception as e:
            log.warning("findings count update failed for job %s: %s", job_id, e)
    return count
def _read_log_safe(self, log_path: Path) -> str:
"""Read the captured log; return '' on any I/O error so a missing log
file doesn't crash the parser pipeline."""
try:
return log_path.read_text(encoding="utf-8", errors="replace")
except OSError as e:
log.warning("failed to read log %s: %s", log_path, e)
return ""
async def _exec_recipe(
self, *, cmd: str, cwd: str, log_fh, timeout: int
) -> tuple[int, bool]:
"""Run cmd via /bin/sh -c, stream output to log_fh, return (exit, timed_out).
Uses create_subprocess_shell because recipe strings are operator-trusted
shell expressions (e.g. `cargo build && cargo test`). Stdout+stderr
merged into one stream to preserve interleaving order, which matters
for log readability.
Important asyncio detail: we wrap proc.wait() in a single task and
gate timeout with asyncio.wait() rather than wait_for(). wait_for
cancels the underlying coroutine on timeout, which on Python 3.11
marks the proc.wait() future as cancelled — so a SECOND wait_for on
the same proc would immediately raise CancelledError instead of
returning the post-terminate exit code. Wrapping once with a
long-lived task lets us await it twice cleanly.
"""
# start_new_session=True puts the shell in its own process group so
# we can signal the WHOLE group on timeout. Without this, terminate()
# only signals the shell; long-running children (sleep, cargo build,
# etc.) inherit init and keep stdout open, so the pump never EOFs.
proc = await asyncio.create_subprocess_shell(
cmd,
cwd=cwd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
start_new_session=True,
)
assert proc.stdout is not None
pgid = proc.pid # equals the new session/group id since we just created it
def _kill_group(sig: int) -> None:
try:
os.killpg(pgid, sig)
except ProcessLookupError:
pass
async def pump() -> None:
while True:
line = await proc.stdout.readline()
if not line:
break
log_fh.write(line.decode("utf-8", "replace"))
log_fh.flush()
pump_task = asyncio.create_task(pump())
wait_task = asyncio.create_task(proc.wait())
timed_out = False
# Phase 1 — give the process up to `timeout` seconds to finish
# naturally.
done, _ = await asyncio.wait({wait_task}, timeout=timeout)
if not done:
timed_out = True
log_fh.write(f"\n[crafting-table] timeout after {timeout}s — terminating\n")
log_fh.flush()
_kill_group(signal.SIGTERM)
# Phase 2 — graceful shutdown grace period after SIGTERM
done, _ = await asyncio.wait({wait_task}, timeout=10)
if not done:
# Phase 3 — escalate to SIGKILL on the group
log_fh.write("[crafting-table] grace expired — SIGKILL\n")
log_fh.flush()
_kill_group(signal.SIGKILL)
await wait_task
# wait_task is now done — pull the rc out
rc = wait_task.result()
# Drain pump. With process-group kill stdout will EOF cleanly;
# the wait_for guard is belt-and-braces against any orphan that
# somehow survived (e.g. a child that escaped its group).
try:
await asyncio.wait_for(pump_task, timeout=2)
except (asyncio.TimeoutError, Exception):
pump_task.cancel()
try:
await pump_task
except (asyncio.CancelledError, Exception):
pass
return int(rc), timed_out
# ---------- helpers -----------------------------------------------------
async def _load_context(self, job_id: str) -> _JobContext | None:
    """Load the job row, its recipe snapshot and its project row.

    Returns:
        A populated _JobContext, or None (after logging a warning) when
        either the job row or its project row no longer exists.
    """
    job = await self.db.arun(self.db.get_job, job_id)
    if job is None:
        log.warning("runner: job %s vanished before dispatch", job_id)
        return None

    recipe = json.loads(job["recipe_snapshot_json"])
    # Pick the snapshot's subproject entry whose path matches the job.
    candidates = recipe.get("subprojects", [])
    chosen = next(
        (entry for entry in candidates if entry.get("path") == job["subproject_path"]),
        None,
    )
    if chosen is None:
        # Fallback to the first one — should never happen since we
        # validate at enqueue time.
        chosen = candidates[0] if candidates else {}

    project = await self.db.arun(self.db.get_project, job["project_name"])
    if project is None:
        # Project was deleted while job sat in queue. The FK cascade in
        # the schema would have nuked the job row too, but we may have
        # popped the id off the queue before the cascade landed.
        log.warning("runner: project for job %s gone", job_id)
        return None

    return _JobContext(
        job_id=job_id, job=job, project=project, recipe=recipe, subproject=chosen
    )