"""SQLite ledger + migrations. Why SQLite: single-process, single-host service, no cross-host replication needed. The runner is the only writer; every HTTP worker reads. SQLite in WAL mode handles single-writer-many-readers cleanly. Trade-off documented in README. Why stdlib `sqlite3` + `run_in_executor` (not aiosqlite): one less dependency and the queries are tiny (fetchone / fetchall). The runner does its own log streaming via aiofiles-equivalent so we never block the loop on disk. Migration system: - Each entry in MIGRATIONS is (version_id, sql_text). Versions are date-tagged so they sort lexicographically. - Apply in order, INSERT OR IGNORE into schema_migrations to handle multi-worker boot races. - Migrations are append-only; never edit a landed migration, add a new one. """ from __future__ import annotations import asyncio import json import sqlite3 import time from contextlib import contextmanager from pathlib import Path # fmt: off MIGRATIONS: list[tuple[str, str]] = [ ( "001_schema_migrations", """ CREATE TABLE IF NOT EXISTS schema_migrations ( version TEXT PRIMARY KEY, applied_at INTEGER NOT NULL ); """, ), ( "002_tokens", """ CREATE TABLE IF NOT EXISTS tokens ( name TEXT PRIMARY KEY, bearer_hash TEXT NOT NULL UNIQUE, is_admin INTEGER NOT NULL DEFAULT 0, ip_allowlist_json TEXT, created_at INTEGER NOT NULL, last_used_at INTEGER, revoked_at INTEGER ); """, ), ( "003_projects", """ CREATE TABLE IF NOT EXISTS projects ( name TEXT PRIMARY KEY, git_url TEXT NOT NULL, default_branch TEXT NOT NULL DEFAULT 'main', recipe_json TEXT NOT NULL, owner_token TEXT NOT NULL, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, FOREIGN KEY (owner_token) REFERENCES tokens(name) ); """, ), ( "004_jobs", """ CREATE TABLE IF NOT EXISTS jobs ( id TEXT PRIMARY KEY, project_name TEXT NOT NULL, subproject_path TEXT NOT NULL, recipe TEXT NOT NULL, branch TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'queued', queued_at INTEGER NOT NULL, started_at INTEGER, finished_at INTEGER, exit_code INTEGER, log_path TEXT NOT NULL, findings_count INTEGER NOT NULL DEFAULT 0, recipe_snapshot_json TEXT NOT NULL, FOREIGN KEY (project_name) REFERENCES projects(name) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_jobs_project ON jobs(project_name); CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status); CREATE INDEX IF NOT EXISTS idx_jobs_queued_at ON jobs(queued_at); """, ), ( "005_findings", """ CREATE TABLE IF NOT EXISTS findings ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_id TEXT NOT NULL, kind TEXT NOT NULL, severity TEXT NOT NULL, file TEXT, line INTEGER, code TEXT, message TEXT NOT NULL, suggested_fix TEXT, raw_json TEXT, created_at INTEGER NOT NULL, fingerprint TEXT NOT NULL, FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_findings_job ON findings(job_id); CREATE INDEX IF NOT EXISTS idx_findings_fingerprint ON findings(fingerprint); """, ), ( "006_digest_runs", """ CREATE TABLE IF NOT EXISTS digest_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, date TEXT NOT NULL, project_name TEXT NOT NULL, sent_at INTEGER NOT NULL, recipient_count INTEGER NOT NULL, job_count INTEGER NOT NULL, UNIQUE(date, project_name) ); CREATE INDEX IF NOT EXISTS idx_digest_runs_date ON digest_runs(date); """, ), ( "007_patch_attempts", """ CREATE TABLE IF NOT EXISTS patch_attempts ( id INTEGER PRIMARY KEY AUTOINCREMENT, finding_id INTEGER NOT NULL, job_id TEXT NOT NULL, project_name TEXT NOT NULL, attempt_number INTEGER NOT NULL, status TEXT NOT NULL, branch_name TEXT, pr_url TEXT, diff_excerpt TEXT, session_id TEXT, error TEXT, created_at INTEGER NOT NULL, finished_at INTEGER, UNIQUE(finding_id, attempt_number), FOREIGN KEY (finding_id) REFERENCES findings(id) ON DELETE CASCADE, FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_patch_attempts_status ON patch_attempts(status); CREATE INDEX IF NOT EXISTS idx_patch_attempts_project ON patch_attempts(project_name); CREATE INDEX IF NOT EXISTS idx_patch_attempts_finding ON patch_attempts(finding_id); """, ), ] # fmt: on def _hash(token: str) -> str: import hashlib return hashlib.sha256(token.encode("utf-8")).hexdigest() class DB: """Synchronous SQLite wrapper. Async API methods wrap calls with run_in_executor so callers in the FastAPI loop stay non-blocking. """ def __init__(self, db_path: str | Path): self.db_path = str(db_path) Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) with self._conn() as c: # WAL = many readers + one writer without lock contention. # synchronous=NORMAL is the standard WAL pairing — durability # against process crash is preserved; only OS crash can drop # the most recent commit. c.execute("PRAGMA journal_mode=WAL") c.execute("PRAGMA synchronous=NORMAL") c.execute("PRAGMA foreign_keys=ON") self.migrate() @contextmanager def _conn(self): # isolation_level=None gives us autocommit; we wrap multi-stmt # operations in BEGIN / COMMIT explicitly when we need atomicity. conn = sqlite3.connect(self.db_path, isolation_level=None, timeout=10.0) conn.row_factory = sqlite3.Row try: conn.execute("PRAGMA foreign_keys=ON") yield conn finally: conn.close() # ---------- migrations --------------------------------------------------- def migrate(self) -> list[str]: """Apply any pending migrations. Returns the list of versions applied on this call (empty if up-to-date). Idempotent + race-safe.""" applied: list[str] = [] with self._conn() as c: # Migration 001 must run first since it creates the tracking table. # Use IF NOT EXISTS in every CREATE so repeat runs are no-ops. for version, sql in MIGRATIONS: c.executescript(sql) cur = c.execute( "INSERT OR IGNORE INTO schema_migrations (version, applied_at) VALUES (?, ?)", (version, int(time.time())), ) if cur.rowcount == 1: applied.append(version) return applied def applied_migrations(self) -> list[str]: with self._conn() as c: rows = c.execute( "SELECT version FROM schema_migrations ORDER BY version" ).fetchall() return [r["version"] for r in rows] # ---------- tokens ------------------------------------------------------- def insert_token( self, *, name: str, bearer: str, is_admin: bool, ip_cidrs: list[str] | None, ) -> None: ip_json = json.dumps(ip_cidrs) if ip_cidrs else None with self._conn() as c: c.execute( """ INSERT INTO tokens (name, bearer_hash, is_admin, ip_allowlist_json, created_at) VALUES (?, ?, ?, ?, ?) """, (name, _hash(bearer), 1 if is_admin else 0, ip_json, int(time.time())), ) def lookup_token_by_bearer(self, bearer: str) -> dict | None: h = _hash(bearer) with self._conn() as c: row = c.execute( """ SELECT name, is_admin, ip_allowlist_json, revoked_at FROM tokens WHERE bearer_hash=? """, (h,), ).fetchone() if not row: return None if row["revoked_at"] is not None: return None c.execute("UPDATE tokens SET last_used_at=? WHERE bearer_hash=?", (int(time.time()), h)) ip_cidrs = json.loads(row["ip_allowlist_json"]) if row["ip_allowlist_json"] else None return { "name": row["name"], "is_admin": bool(row["is_admin"]), "ip_cidrs": ip_cidrs, } def get_token(self, name: str) -> dict | None: with self._conn() as c: row = c.execute( """ SELECT name, is_admin, ip_allowlist_json, created_at, last_used_at, revoked_at FROM tokens WHERE name=? """, (name,), ).fetchone() if not row: return None d = dict(row) d["is_admin"] = bool(d["is_admin"]) d["ip_cidrs"] = json.loads(d.pop("ip_allowlist_json")) if d["ip_allowlist_json"] else None return d def list_tokens(self) -> list[dict]: with self._conn() as c: rows = c.execute( """ SELECT name, is_admin, ip_allowlist_json, created_at, last_used_at, revoked_at FROM tokens ORDER BY name """, ).fetchall() out = [] for r in rows: d = dict(r) d["is_admin"] = bool(d["is_admin"]) d["ip_cidrs"] = json.loads(d.pop("ip_allowlist_json")) if d["ip_allowlist_json"] else None out.append(d) return out def revoke_token(self, name: str) -> bool: with self._conn() as c: cur = c.execute( "UPDATE tokens SET revoked_at=? WHERE name=? AND revoked_at IS NULL", (int(time.time()), name), ) return cur.rowcount > 0 # ---------- projects ----------------------------------------------------- def upsert_project( self, *, name: str, git_url: str, default_branch: str, recipe_json: str, owner_token: str, ) -> dict: now = int(time.time()) with self._conn() as c: row = c.execute("SELECT created_at, owner_token FROM projects WHERE name=?", (name,)).fetchone() if row is None: c.execute( """ INSERT INTO projects (name, git_url, default_branch, recipe_json, owner_token, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, (name, git_url, default_branch, recipe_json, owner_token, now, now), ) created_at = now else: created_at = row["created_at"] c.execute( """ UPDATE projects SET git_url=?, default_branch=?, recipe_json=?, updated_at=? WHERE name=? """, (git_url, default_branch, recipe_json, now, name), ) return { "name": name, "git_url": git_url, "default_branch": default_branch, "recipe_json": recipe_json, "owner_token": owner_token, "created_at": created_at, "updated_at": now, } def get_project(self, name: str) -> dict | None: with self._conn() as c: row = c.execute( "SELECT * FROM projects WHERE name=?", (name,), ).fetchone() return dict(row) if row else None def list_projects(self, *, owner_token: str | None = None) -> list[dict]: with self._conn() as c: if owner_token is None: rows = c.execute("SELECT * FROM projects ORDER BY name").fetchall() else: rows = c.execute( "SELECT * FROM projects WHERE owner_token=? ORDER BY name", (owner_token,), ).fetchall() return [dict(r) for r in rows] def delete_project(self, name: str) -> bool: with self._conn() as c: cur = c.execute("DELETE FROM projects WHERE name=?", (name,)) return cur.rowcount > 0 # ---------- jobs --------------------------------------------------------- def insert_job( self, *, job_id: str, project_name: str, subproject_path: str, recipe: str, branch: str, log_path: str, recipe_snapshot_json: str, ) -> dict: now = int(time.time()) with self._conn() as c: c.execute( """ INSERT INTO jobs (id, project_name, subproject_path, recipe, branch, status, queued_at, log_path, recipe_snapshot_json) VALUES (?, ?, ?, ?, ?, 'queued', ?, ?, ?) """, (job_id, project_name, subproject_path, recipe, branch, now, log_path, recipe_snapshot_json), ) return { "id": job_id, "project_name": project_name, "subproject_path": subproject_path, "recipe": recipe, "branch": branch, "status": "queued", "queued_at": now, "started_at": None, "finished_at": None, "exit_code": None, "log_path": log_path, "findings_count": 0, } def get_job(self, job_id: str) -> dict | None: with self._conn() as c: row = c.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone() return dict(row) if row else None def list_jobs( self, *, project_name: str | None = None, status: str | None = None, owner_token: str | None = None, limit: int = 50, ) -> list[dict]: sql = """ SELECT j.* FROM jobs j JOIN projects p ON p.name = j.project_name WHERE 1=1 """ params: list = [] if project_name is not None: sql += " AND j.project_name=?" params.append(project_name) if status is not None: sql += " AND j.status=?" params.append(status) if owner_token is not None: sql += " AND p.owner_token=?" params.append(owner_token) sql += " ORDER BY j.queued_at DESC LIMIT ?" params.append(int(limit)) with self._conn() as c: rows = c.execute(sql, params).fetchall() return [dict(r) for r in rows] def mark_job_running(self, job_id: str) -> None: with self._conn() as c: c.execute( "UPDATE jobs SET status='running', started_at=? WHERE id=? AND status='queued'", (int(time.time()), job_id), ) def mark_job_finished( self, *, job_id: str, status: str, exit_code: int | None, ) -> None: with self._conn() as c: c.execute( """ UPDATE jobs SET status=?, finished_at=?, exit_code=? WHERE id=? """, (status, int(time.time()), exit_code, job_id), ) def mark_orphaned_jobs_failed(self, *, log_dir: Path | None = None) -> list[str]: """Sweep on boot — any job in 'running' state was orphaned by a process crash. Mark them failed with exit_code=-1 so callers see the terminal state and can re-queue if they want.""" ids: list[str] = [] now = int(time.time()) with self._conn() as c: rows = c.execute("SELECT id, log_path FROM jobs WHERE status='running'").fetchall() for r in rows: ids.append(r["id"]) c.execute( "UPDATE jobs SET status='failed', finished_at=?, exit_code=-1 WHERE id=?", (now, r["id"]), ) if log_dir is not None: try: Path(r["log_path"]).parent.mkdir(parents=True, exist_ok=True) with open(r["log_path"], "a", encoding="utf-8") as fh: fh.write("\n[crafting-table] runner restart, job orphaned\n") except OSError: pass return ids def increment_findings_count(self, job_id: str, n: int) -> None: with self._conn() as c: c.execute( "UPDATE jobs SET findings_count = findings_count + ? WHERE id=?", (n, job_id), ) # ---------- findings ----------------------------------------------------- def insert_finding( self, *, job_id: str, kind: str, severity: str, message: str, fingerprint: str, file: str | None = None, line: int | None = None, code: str | None = None, suggested_fix: str | None = None, raw_json: str | None = None, ) -> int: with self._conn() as c: cur = c.execute( """ INSERT INTO findings (job_id, kind, severity, file, line, code, message, suggested_fix, raw_json, created_at, fingerprint) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (job_id, kind, severity, file, line, code, message, suggested_fix, raw_json, int(time.time()), fingerprint), ) return cur.lastrowid def list_findings(self, job_id: str) -> list[dict]: with self._conn() as c: rows = c.execute( "SELECT * FROM findings WHERE job_id=? ORDER BY id", (job_id,), ).fetchall() return [dict(r) for r in rows] # ---------- digest runs -------------------------------------------------- def record_digest_run( self, *, date: str, project_name: str, sent_at: int, recipient_count: int, job_count: int, ) -> bool: """Record an attempted/successful digest send. UNIQUE(date, project_name) enforces idempotency — a second call for the same date+project is a no-op (returns False).""" with self._conn() as c: cur = c.execute( """ INSERT OR IGNORE INTO digest_runs (date, project_name, sent_at, recipient_count, job_count) VALUES (?, ?, ?, ?, ?) """, (date, project_name, sent_at, recipient_count, job_count), ) return cur.rowcount == 1 def digest_run_exists(self, date: str, project_name: str) -> bool: with self._conn() as c: row = c.execute( "SELECT 1 FROM digest_runs WHERE date=? AND project_name=?", (date, project_name), ).fetchone() return row is not None def list_digest_runs(self, date: str | None = None) -> list[dict]: with self._conn() as c: if date is None: rows = c.execute( "SELECT * FROM digest_runs ORDER BY date DESC, project_name" ).fetchall() else: rows = c.execute( "SELECT * FROM digest_runs WHERE date=? ORDER BY project_name", (date,), ).fetchall() return [dict(r) for r in rows] # ---------- patch attempts ---------------------------------------------- def get_finding(self, finding_id: int) -> dict | None: with self._conn() as c: row = c.execute( "SELECT * FROM findings WHERE id=?", (int(finding_id),) ).fetchone() return dict(row) if row else None def list_findings_for_job(self, job_id: str) -> list[dict]: """Alias matching list_findings — kept for callers that prefer the more explicit name.""" return self.list_findings(job_id) def count_patch_attempts(self, finding_id: int) -> int: with self._conn() as c: row = c.execute( "SELECT COUNT(*) AS n FROM patch_attempts WHERE finding_id=?", (int(finding_id),), ).fetchone() return int(row["n"]) if row else 0 def insert_patch_attempt( self, *, finding_id: int, job_id: str, project_name: str, attempt_number: int, status: str, branch_name: str | None = None, pr_url: str | None = None, diff_excerpt: str | None = None, session_id: str | None = None, error: str | None = None, ) -> int: with self._conn() as c: cur = c.execute( """ INSERT INTO patch_attempts (finding_id, job_id, project_name, attempt_number, status, branch_name, pr_url, diff_excerpt, session_id, error, created_at, finished_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( int(finding_id), job_id, project_name, int(attempt_number), status, branch_name, pr_url, diff_excerpt, session_id, error, int(time.time()), int(time.time()), ), ) return int(cur.lastrowid) def get_patch_attempt(self, attempt_id: int) -> dict | None: with self._conn() as c: row = c.execute( "SELECT * FROM patch_attempts WHERE id=?", (int(attempt_id),) ).fetchone() return dict(row) if row else None def list_patch_attempts( self, *, project_name: str | None = None, status: str | None = None, finding_id: int | None = None, owner_token: str | None = None, limit: int = 100, ) -> list[dict]: sql = """ SELECT pa.* FROM patch_attempts pa JOIN projects p ON p.name = pa.project_name WHERE 1=1 """ params: list = [] if project_name is not None: sql += " AND pa.project_name=?" params.append(project_name) if status is not None: sql += " AND pa.status=?" params.append(status) if finding_id is not None: sql += " AND pa.finding_id=?" params.append(int(finding_id)) if owner_token is not None: sql += " AND p.owner_token=?" params.append(owner_token) sql += " ORDER BY pa.created_at DESC LIMIT ?" params.append(int(limit)) with self._conn() as c: rows = c.execute(sql, params).fetchall() return [dict(r) for r in rows] def list_patch_attempts_in_window( self, *, window_start: int, window_end: int, project_name: str | None = None, statuses: tuple[str, ...] | None = None, ) -> list[dict]: """Patch attempts created within [window_start, window_end]. Used by the email digest to surface drafted patches in the daily summary.""" sql = "SELECT * FROM patch_attempts WHERE created_at >= ? AND created_at <= ?" params: list = [int(window_start), int(window_end)] if project_name is not None: sql += " AND project_name=?" params.append(project_name) if statuses: placeholders = ",".join("?" for _ in statuses) sql += f" AND status IN ({placeholders})" params.extend(statuses) sql += " ORDER BY created_at" with self._conn() as c: rows = c.execute(sql, params).fetchall() return [dict(r) for r in rows] # ---------- async wrappers ---------------------------------------------- async def arun(self, fn, *args, **kwargs): """Run a sync DB method in the default executor. Use from FastAPI request handlers / runner coroutines so we don't block the event loop on disk I/O. Most of these queries are <1ms but the pattern stays consistent for when we add bigger ones later. """ loop = asyncio.get_event_loop() return await loop.run_in_executor(None, lambda: fn(*args, **kwargs))