crafting-table/crafting_table/runner.py
Kayos 0ec3a04676 v0.1 wave 1 (steps 2+3+4): SQLite ledger + FastAPI skeleton + async job runner
- db.py: migrations + DAOs for tokens / projects / jobs / findings (SQLite WAL)
- auth.py: SHA-256 bearer hashing + LAN-CIDR allowlist + admin/app token tiers
- models.py: Pydantic shapes (Project, Subproject, Schedule, Notify, Job, CreateJobRequest)
- server.py: FastAPI on port 8810; /healthz, /admin/tokens/*, /projects/*, /jobs, /jobs/{id}, /jobs/{id}/log, /jobs/{id}/findings
- runner.py: bounded asyncio pool, per-job timeout with process-group SIGTERM→SIGKILL escalation, orphaned-job recovery on boot
- workspace.py: bare-clone + worktree materialization, gc
- config.py: env-driven
- 62 tests across db / auth / projects / jobs / runner / e2e — all green

Cross-token project access returns 404 (not 403) — existence-leak guard.
Bearer tokens hashed at rest; admin token bootstrapped on first boot.
Recipe subprocess uses start_new_session=True so killpg targets the
whole process tree on timeout — child processes can't escape SIGKILL.
Pump task guarded with wait_for(2s) + cancel fallback against any
orphan that survives the group kill.

Wave 2 (parsers + findings extraction + MCP + email digest) pending.

Spec: memory/spec-crafting-table.md
2026-04-29 08:17:41 -07:00

408 lines
16 KiB
Python

"""Async job runner — bounded asyncio pool that materializes workspaces and
runs recipe shell commands.
Lifecycle:
1. server.lifespan calls `runner.start()`:
- mark any 'running' jobs from a previous process as failed (orphaned)
- kick off the dispatcher loop + workspace gc loop
2. POST /projects/<name>/jobs:
- inserts a row in `jobs` (status=queued)
- calls `runner.enqueue(job_id)` — fast, just puts the id on a queue
3. dispatcher pulls ids off the queue, acquires a semaphore slot,
spawns `_run_job` — bounded by `max_concurrent`.
4. _run_job:
a. mark running
b. materialize workspace
c. exec recipe via /bin/sh -c
d. stream stdout+stderr to log file (live)
e. enforce per-job timeout
f. mark terminal status + exit code
g. emit a `jobs_finished` event hook for wave-2 parsers / wave-8 digest
5. server.lifespan calls `runner.stop()`, which cancels the dispatcher, the gc loop and any in-flight job tasks, then waits for them to finish.
Concurrency: asyncio.Semaphore(max_concurrent) caps in-flight subprocess
runs. The queue itself is unbounded — back-pressure is enforced by the
semaphore + caller can poll job status to know when to enqueue more.
Recipe security: shell strings are run via `create_subprocess_shell` (which
uses /bin/sh -c). Admins set them; this is documented loudly in the README.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import signal
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Awaitable, Callable
from .db import DB
from .workspace import WorkspaceManager, WorkspacePaths
# Module-level logger shared by everything in the runner.
log = logging.getLogger("crafting_table.runner")
# Hook signature: an async callable that is awaited with the finished-event
# dict once a job's terminal status has been recorded. A hook that raises is
# logged and skipped. Note that a job cancelled while its recipe is running
# re-raises before the hook fan-out, so hooks do not see that case.
# Wave 2 wires this to the parser pipeline; wave 8 to the email digest queue.
JobFinishedHook = Callable[[dict], Awaitable[None]]
@dataclass
class _JobContext:
    """Everything `_run_job` needs for one job, as gathered by `_load_context`."""

    # Id of the job, as pulled off the runner queue.
    job_id: str
    # Job row returned by `db.get_job`.
    job: dict
    # Project row returned by `db.get_project` for the job's project_name.
    project: dict
    # Recipe snapshot decoded from the job row's `recipe_snapshot_json`.
    recipe: dict
    # Entry of the snapshot's "subprojects" list whose "path" equals the
    # job's subproject_path; falls back to the first entry, or {} when the
    # list is empty.
    subproject: dict
class Runner:
def __init__(
self,
*,
db: DB,
workspace: WorkspaceManager,
log_dir: Path,
max_concurrent: int = 4,
default_timeout_secs: int = 1800,
gc_interval_secs: int = 3600,
gc_age_secs: int = 86400,
):
self.db = db
self.workspace = workspace
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
self.max_concurrent = max_concurrent
self.default_timeout_secs = default_timeout_secs
self.gc_interval_secs = gc_interval_secs
self.gc_age_secs = gc_age_secs
self.queue: asyncio.Queue[str] = asyncio.Queue()
self.semaphore = asyncio.Semaphore(max_concurrent)
self._tasks: set[asyncio.Task] = set()
self._dispatcher_task: asyncio.Task | None = None
self._gc_task: asyncio.Task | None = None
self._stopping = False
self._hooks: list[JobFinishedHook] = []
# Test introspection — lets test_runner assert on bounded-concurrency.
self.in_flight = 0
self.peak_in_flight = 0
# ---------- lifecycle ---------------------------------------------------
def add_hook(self, hook: JobFinishedHook) -> None:
    """Register `hook` to be awaited after jobs finish.

    Hooks are appended to an internal list; `_run_job` awaits them in
    registration order, passing the finished-event dict.
    """
    self._hooks.append(hook)
async def start(self) -> None:
    """Recover orphaned jobs, then launch the dispatcher and gc loops.

    Jobs still marked 'running' from a previous process are handed to
    `db.mark_orphaned_jobs_failed` (together with `log_dir`) instead of
    being resumed, since a half-executed recipe may have left partial
    state. Per the original notes that helper records exit_code=-1 and a
    synthetic log line; its implementation is not visible from here.
    """
    recovered = await self.db.arun(
        self.db.mark_orphaned_jobs_failed,
        log_dir=self.log_dir,
    )
    if recovered:
        log.warning(
            "marked %d orphaned running job(s) failed: %s",
            len(recovered),
            recovered,
        )

    # Background loops; both are cancelled again in stop().
    self._dispatcher_task = asyncio.create_task(self._dispatch_loop())
    self._gc_task = asyncio.create_task(self._gc_loop())
async def stop(self) -> None:
    """Shut the runner down by cancelling its loops and in-flight jobs.

    Order: set the stopping flag, cancel and await the dispatcher, cancel
    and await the gc loop, then cancel every tracked job task and await
    each one. Job ids still sitting in the queue are left untouched.

    Cancellation reaches each job as CancelledError inside `_run_job`;
    whether the recipe's process is signalled is decided further down in
    `_exec_recipe`, not here.
    """
    self._stopping = True

    # Background loops first, dispatcher before gc.
    for background in (self._dispatcher_task, self._gc_task):
        if background is None:
            continue
        background.cancel()
        try:
            await background
        except asyncio.CancelledError:
            pass

    # Snapshot the set: done-callbacks discard tasks from it while we await.
    in_flight = list(self._tasks)
    for job_task in in_flight:
        job_task.cancel()
    for job_task in in_flight:
        try:
            await job_task
        except (asyncio.CancelledError, Exception):
            # Best effort: a job failing during shutdown must not abort it.
            pass
# ---------- enqueue -----------------------------------------------------
async def enqueue(self, job_id: str) -> None:
    """Hand `job_id` to the dispatcher by putting it on the runner queue.

    The queue is created without a maxsize, so this only records the id;
    the job itself starts once the dispatcher obtains a semaphore slot.
    """
    await self.queue.put(job_id)
def stats(self) -> dict:
    """Return a snapshot of queue depth and concurrency counters.

    Keys: "queued" (ids waiting in the queue), "running" (jobs currently
    in flight), "max" (configured concurrency cap) and "peak" (highest
    in-flight count observed so far).
    """
    waiting = self.queue.qsize()
    snapshot = {
        "queued": waiting,
        "running": self.in_flight,
        "max": self.max_concurrent,
        "peak": self.peak_in_flight,
    }
    return snapshot
# ---------- dispatcher --------------------------------------------------
async def _dispatch_loop(self) -> None:
try:
while not self._stopping:
job_id = await self.queue.get()
# Acquire BEFORE spawning the task so we naturally block when
# the pool is full instead of building up an unbounded set of
# tasks that all immediately await the semaphore.
await self.semaphore.acquire()
if self._stopping:
self.semaphore.release()
break
t = asyncio.create_task(self._wrap_run(job_id))
self._tasks.add(t)
t.add_done_callback(self._tasks.discard)
except asyncio.CancelledError:
raise
async def _wrap_run(self, job_id: str) -> None:
self.in_flight += 1
if self.in_flight > self.peak_in_flight:
self.peak_in_flight = self.in_flight
try:
await self._run_job(job_id)
except Exception as e:
log.exception("runner: unhandled error for job %s: %s", job_id, e)
finally:
self.in_flight -= 1
self.semaphore.release()
# ---------- gc loop -----------------------------------------------------
async def _gc_loop(self) -> None:
try:
while not self._stopping:
await asyncio.sleep(self.gc_interval_secs)
try:
res = await self.workspace.gc(age_secs=self.gc_age_secs)
if res["removed"]:
log.info("workspace gc: %s", res)
except Exception as e:
log.warning("workspace gc failed: %s", e)
except asyncio.CancelledError:
raise
# ---------- core --------------------------------------------------------
async def _run_job(self, job_id: str) -> None:
ctx = await self._load_context(job_id)
if ctx is None:
return
await self.db.arun(self.db.mark_job_running, job_id)
log_path = Path(ctx.job["log_path"])
log_path.parent.mkdir(parents=True, exist_ok=True)
recipe_kind = ctx.job["recipe"]
cmd_str = ctx.subproject.get(recipe_kind)
timeout = int(ctx.subproject.get("timeout_secs") or self.default_timeout_secs)
terminal_status = "succeeded"
exit_code: int | None = None
with log_path.open("w", encoding="utf-8") as log_fh:
log_fh.write(f"[crafting-table] job {job_id}\n")
log_fh.write(f"[crafting-table] project={ctx.job['project_name']} subproject={ctx.job['subproject_path']}\n")
log_fh.write(f"[crafting-table] recipe={recipe_kind} branch={ctx.job['branch']}\n")
log_fh.write(f"[crafting-table] cmd={cmd_str!r} timeout={timeout}s\n")
log_fh.flush()
if not cmd_str:
log_fh.write(f"[crafting-table] subproject has no '{recipe_kind}' command\n")
terminal_status = "failed"
exit_code = -2
else:
try:
paths = await self.workspace.materialize(
project=ctx.job["project_name"],
job_id=job_id,
git_url=ctx.project["git_url"],
branch=ctx.job["branch"],
log_fh=log_fh,
)
except Exception as e:
log_fh.write(f"[crafting-table] workspace error: {e}\n")
terminal_status = "failed"
exit_code = -3
else:
sub_path = ctx.subproject.get("path", ".")
work_dir = paths.worktree_dir / sub_path
log_fh.write(f"[crafting-table] cwd={work_dir}\n")
log_fh.write("[crafting-table] --- recipe output begin ---\n")
log_fh.flush()
try:
exit_code, timed_out = await self._exec_recipe(
cmd=cmd_str, cwd=str(work_dir), log_fh=log_fh, timeout=timeout
)
if timed_out:
terminal_status = "timed_out"
elif exit_code == 0:
terminal_status = "succeeded"
else:
terminal_status = "failed"
except asyncio.CancelledError:
log_fh.write("[crafting-table] cancelled\n")
terminal_status = "cancelled"
exit_code = -4
# Re-raise so the dispatcher's task tracking sees cancellation.
await self.db.arun(
self.db.mark_job_finished,
job_id=job_id,
status=terminal_status,
exit_code=exit_code,
)
await self.workspace.cleanup(paths)
raise
log_fh.write(f"[crafting-table] --- recipe output end (exit={exit_code}) ---\n")
log_fh.flush()
await self.workspace.cleanup(paths)
await self.db.arun(
self.db.mark_job_finished,
job_id=job_id,
status=terminal_status,
exit_code=exit_code,
)
# Hook fan-out — wave 2 parsers + wave 8 digest hook into this.
finished_event = {
"job_id": job_id,
"project_name": ctx.job["project_name"],
"subproject_path": ctx.job["subproject_path"],
"recipe": recipe_kind,
"status": terminal_status,
"exit_code": exit_code,
"log_path": str(log_path),
"finished_at": int(time.time()),
}
for hook in self._hooks:
try:
await hook(finished_event)
except Exception as e:
log.warning("jobs_finished hook failed: %s", e)
async def _exec_recipe(
self, *, cmd: str, cwd: str, log_fh, timeout: int
) -> tuple[int, bool]:
"""Run cmd via /bin/sh -c, stream output to log_fh, return (exit, timed_out).
Uses create_subprocess_shell because recipe strings are operator-trusted
shell expressions (e.g. `cargo build && cargo test`). Stdout+stderr
merged into one stream to preserve interleaving order, which matters
for log readability.
Important asyncio detail: we wrap proc.wait() in a single task and
gate timeout with asyncio.wait() rather than wait_for(). wait_for
cancels the underlying coroutine on timeout, which on Python 3.11
marks the proc.wait() future as cancelled — so a SECOND wait_for on
the same proc would immediately raise CancelledError instead of
returning the post-terminate exit code. Wrapping once with a
long-lived task lets us await it twice cleanly.
"""
# start_new_session=True puts the shell in its own process group so
# we can signal the WHOLE group on timeout. Without this, terminate()
# only signals the shell; long-running children (sleep, cargo build,
# etc.) inherit init and keep stdout open, so the pump never EOFs.
proc = await asyncio.create_subprocess_shell(
cmd,
cwd=cwd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
start_new_session=True,
)
assert proc.stdout is not None
pgid = proc.pid # equals the new session/group id since we just created it
def _kill_group(sig: int) -> None:
try:
os.killpg(pgid, sig)
except ProcessLookupError:
pass
async def pump() -> None:
while True:
line = await proc.stdout.readline()
if not line:
break
log_fh.write(line.decode("utf-8", "replace"))
log_fh.flush()
pump_task = asyncio.create_task(pump())
wait_task = asyncio.create_task(proc.wait())
timed_out = False
# Phase 1 — give the process up to `timeout` seconds to finish
# naturally.
done, _ = await asyncio.wait({wait_task}, timeout=timeout)
if not done:
timed_out = True
log_fh.write(f"\n[crafting-table] timeout after {timeout}s — terminating\n")
log_fh.flush()
_kill_group(signal.SIGTERM)
# Phase 2 — graceful shutdown grace period after SIGTERM
done, _ = await asyncio.wait({wait_task}, timeout=10)
if not done:
# Phase 3 — escalate to SIGKILL on the group
log_fh.write("[crafting-table] grace expired — SIGKILL\n")
log_fh.flush()
_kill_group(signal.SIGKILL)
await wait_task
# wait_task is now done — pull the rc out
rc = wait_task.result()
# Drain pump. With process-group kill stdout will EOF cleanly;
# the wait_for guard is belt-and-braces against any orphan that
# somehow survived (e.g. a child that escaped its group).
try:
await asyncio.wait_for(pump_task, timeout=2)
except (asyncio.TimeoutError, Exception):
pump_task.cancel()
try:
await pump_task
except (asyncio.CancelledError, Exception):
pass
return int(rc), timed_out
# ---------- helpers -----------------------------------------------------
async def _load_context(self, job_id: str) -> _JobContext | None:
    """Collect the job row, recipe snapshot, subproject and project for a job.

    Returns None, after logging a warning, when either the job row or its
    project can no longer be found; the caller treats that as "nothing to
    run".
    """
    job_row = await self.db.arun(self.db.get_job, job_id)
    if job_row is None:
        log.warning("runner: job %s vanished before dispatch", job_id)
        return None

    snapshot = json.loads(job_row["recipe_snapshot_json"])
    candidates = snapshot.get("subprojects", [])
    # Pick the snapshot entry whose path matches the job's subproject.
    chosen = next(
        (
            entry
            for entry in candidates
            if entry.get("path") == job_row["subproject_path"]
        ),
        None,
    )
    if chosen is None:
        # Fallback to the first entry (or an empty dict). Per the original
        # notes this should not happen because enqueue-time validation
        # guarantees a match.
        chosen = candidates[0] if candidates else {}

    project_row = await self.db.arun(self.db.get_project, job_row["project_name"])
    if project_row is None:
        # The project was deleted while the job sat in the queue. Per the
        # original notes the schema's FK cascade removes the job row too,
        # but the id may have been popped before that landed.
        log.warning("runner: project for job %s gone", job_id)
        return None

    return _JobContext(
        job_id=job_id,
        job=job_row,
        project=project_row,
        recipe=snapshot,
        subproject=chosen,
    )