crafting-table/crafting_table/db.py
Cobb Hayes b335405c02 Public-flip audit: generalize internal hosts/paths + drop Sulkta-internal refs
URLs, mount paths, and LAN host bindings parameterized via env or relative paths
so the repo stands up from a clean clone anywhere. Drop cross-codebase refs
("mirrors clawdforge's pattern"), Sulkta-Coop client/merchant test fixtures,
and audit-changelog scaffolding from comments. README terser, technical content
preserved.
2026-05-27 11:25:47 -07:00

715 lines
25 KiB
Python

"""SQLite ledger + migrations.
Why SQLite: single-process, single-host service, no cross-host replication
needed. The runner is the only writer; every HTTP worker reads. SQLite in
WAL mode handles single-writer-many-readers cleanly. Trade-off documented
in README.
Why stdlib `sqlite3` + `run_in_executor` (not aiosqlite): one less dependency
and the queries are tiny (fetchone / fetchall). The runner does its own log
streaming via aiofiles-equivalent so we never block the loop on disk.
Migration system:
- Each entry in MIGRATIONS is (version_id, sql_text). Versions carry a
zero-padded numeric prefix (001_, 002_, ...) so they sort lexicographically.
- Apply in order, INSERT OR IGNORE into schema_migrations to handle
multi-worker boot races.
- Migrations are append-only; never edit a landed migration, add a new one.
"""
from __future__ import annotations
import asyncio
import json
import sqlite3
import time
from contextlib import contextmanager
from pathlib import Path
# fmt: off
# Ordered schema migrations consumed by DB.migrate(). Each entry is
# (version, sql_script): the script is run with sqlite3 executescript() and
# the version is then recorded in schema_migrations. Append new entries
# only; never edit one that has landed. Every script below uses
# IF NOT EXISTS, so running it against an existing schema is a no-op.
MIGRATIONS: list[tuple[str, str]] = [
# Tracking table for applied versions; DB.migrate() writes to it, so it
# must stay first.
(
"001_schema_migrations",
"""
CREATE TABLE IF NOT EXISTS schema_migrations (
version TEXT PRIMARY KEY,
applied_at INTEGER NOT NULL
);
""",
),
# API tokens. Only a hash of the bearer string is stored (see _hash);
# ip_allowlist_json holds a JSON list of CIDRs or NULL.
(
"002_tokens",
"""
CREATE TABLE IF NOT EXISTS tokens (
name TEXT PRIMARY KEY,
bearer_hash TEXT NOT NULL UNIQUE,
is_admin INTEGER NOT NULL DEFAULT 0,
ip_allowlist_json TEXT,
created_at INTEGER NOT NULL,
last_used_at INTEGER,
revoked_at INTEGER
);
""",
),
# Registered projects; owner_token references tokens(name).
(
"003_projects",
"""
CREATE TABLE IF NOT EXISTS projects (
name TEXT PRIMARY KEY,
git_url TEXT NOT NULL,
default_branch TEXT NOT NULL DEFAULT 'main',
recipe_json TEXT NOT NULL,
owner_token TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (owner_token) REFERENCES tokens(name)
);
""",
),
# Jobs; with foreign_keys=ON, deleting a project cascades to its jobs.
(
"004_jobs",
"""
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
project_name TEXT NOT NULL,
subproject_path TEXT NOT NULL,
recipe TEXT NOT NULL,
branch TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued',
queued_at INTEGER NOT NULL,
started_at INTEGER,
finished_at INTEGER,
exit_code INTEGER,
log_path TEXT NOT NULL,
findings_count INTEGER NOT NULL DEFAULT 0,
recipe_snapshot_json TEXT NOT NULL,
FOREIGN KEY (project_name) REFERENCES projects(name) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_jobs_project ON jobs(project_name);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
CREATE INDEX IF NOT EXISTS idx_jobs_queued_at ON jobs(queued_at);
""",
),
# Findings produced by a job; cascade-deleted with their job. Both job_id
# and fingerprint are indexed.
(
"005_findings",
"""
CREATE TABLE IF NOT EXISTS findings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT NOT NULL,
kind TEXT NOT NULL,
severity TEXT NOT NULL,
file TEXT,
line INTEGER,
code TEXT,
message TEXT NOT NULL,
suggested_fix TEXT,
raw_json TEXT,
created_at INTEGER NOT NULL,
fingerprint TEXT NOT NULL,
FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_findings_job ON findings(job_id);
CREATE INDEX IF NOT EXISTS idx_findings_fingerprint ON findings(fingerprint);
""",
),
# Digest sends; UNIQUE(date, project_name) is what makes
# DB.record_digest_run() idempotent.
(
"006_digest_runs",
"""
CREATE TABLE IF NOT EXISTS digest_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
project_name TEXT NOT NULL,
sent_at INTEGER NOT NULL,
recipient_count INTEGER NOT NULL,
job_count INTEGER NOT NULL,
UNIQUE(date, project_name)
);
CREATE INDEX IF NOT EXISTS idx_digest_runs_date ON digest_runs(date);
""",
),
# Patch attempts; attempt_number is unique per finding, and rows are
# cascade-deleted with either their finding or their job.
(
"007_patch_attempts",
"""
CREATE TABLE IF NOT EXISTS patch_attempts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
finding_id INTEGER NOT NULL,
job_id TEXT NOT NULL,
project_name TEXT NOT NULL,
attempt_number INTEGER NOT NULL,
status TEXT NOT NULL,
branch_name TEXT,
pr_url TEXT,
diff_excerpt TEXT,
session_id TEXT,
error TEXT,
created_at INTEGER NOT NULL,
finished_at INTEGER,
UNIQUE(finding_id, attempt_number),
FOREIGN KEY (finding_id) REFERENCES findings(id) ON DELETE CASCADE,
FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_patch_attempts_status ON patch_attempts(status);
CREATE INDEX IF NOT EXISTS idx_patch_attempts_project ON patch_attempts(project_name);
CREATE INDEX IF NOT EXISTS idx_patch_attempts_finding ON patch_attempts(finding_id);
""",
),
]
# fmt: on
def _hash(token: str) -> str:
    """Return the lowercase hex SHA-256 digest of *token* (UTF-8 encoded).

    This is the value stored in ``tokens.bearer_hash``; raw bearer strings
    are compared only through this digest.
    """
    from hashlib import sha256

    digest = sha256(token.encode("utf-8"))
    return digest.hexdigest()
class DB:
    """Synchronous SQLite wrapper for the ledger.

    Each method opens a short-lived connection through ``_conn()`` and
    closes it before returning. Code running on an asyncio event loop
    (FastAPI handlers, runner coroutines) should call methods through
    ``arun()`` so the loop is not blocked on disk I/O.
    """

    def __init__(self, db_path: str | Path):
        """Open the database at *db_path*, enable WAL, and apply migrations.

        The parent directory is created if it does not exist. ``":memory:"``
        is not usable here: every ``_conn()`` call opens a new connection,
        and each in-memory connection is a separate, empty database.
        """
        self.db_path = str(db_path)
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        with self._conn() as c:
            # WAL lets readers run concurrently with the single writer. The
            # journal mode is persisted in the database file, so setting it
            # once here covers every later connection. Per-connection
            # pragmas (synchronous, foreign_keys) are applied in _conn().
            c.execute("PRAGMA journal_mode=WAL")
        self.migrate()

    @contextmanager
    def _conn(self):
        """Yield a fresh autocommit connection and close it on exit.

        Rows are returned as ``sqlite3.Row`` (indexable by column name).
        Lock waits give up after 10 s with ``sqlite3.OperationalError``.
        """
        # isolation_level=None gives us autocommit; read-then-write sequences
        # that need atomicity go through _write_txn() instead.
        conn = sqlite3.connect(self.db_path, isolation_level=None, timeout=10.0)
        conn.row_factory = sqlite3.Row
        try:
            # Both pragmas are per-connection in SQLite, so they must be set
            # on every connection, not once at startup.
            # synchronous=NORMAL is the standard WAL pairing: durability
            # against a process crash is preserved; only an OS crash or power
            # loss can drop the most recent commits.
            conn.execute("PRAGMA synchronous=NORMAL")
            # FK enforcement (including ON DELETE CASCADE) is off by default.
            conn.execute("PRAGMA foreign_keys=ON")
            yield conn
        finally:
            conn.close()

    @contextmanager
    def _write_txn(self):
        """Yield a connection inside ``BEGIN IMMEDIATE`` ... ``COMMIT``.

        BEGIN IMMEDIATE takes the database write lock up front, so a
        read-then-write sequence cannot interleave with another writer
        (e.g. another worker process). If the body raises, the transaction
        is rolled back and the exception propagates.
        """
        with self._conn() as c:
            c.execute("BEGIN IMMEDIATE")
            try:
                yield c
            except BaseException:
                # SQLite may already have rolled back on some errors; only
                # issue ROLLBACK while a transaction is still open.
                if c.in_transaction:
                    c.execute("ROLLBACK")
                raise
            c.execute("COMMIT")

    # ---------- migrations ---------------------------------------------------
    def migrate(self) -> list[str]:
        """Apply pending migrations in MIGRATIONS order.

        Versions already recorded in ``schema_migrations`` are skipped, so
        each script normally runs once per database. Without the skip, a
        future migration whose SQL is not naturally idempotent (for example
        ``ALTER TABLE ... ADD COLUMN``) would fail on every restart.

        Returns the versions recorded by this call (empty if up to date).

        Concurrency: when several workers boot against a fresh database at
        the same moment, each may run a pending script before
        ``INSERT OR IGNORE`` records it. Only one worker reports that version
        as applied. Migration SQL should therefore stay safe to re-run
        (``IF NOT EXISTS`` and the like).
        """
        applied: list[str] = []
        with self._conn() as c:
            # On a brand-new database the tracking table does not exist yet
            # (migration 001 creates it), so "no table" means "nothing applied".
            has_tracking = c.execute(
                "SELECT 1 FROM sqlite_master "
                "WHERE type='table' AND name='schema_migrations'"
            ).fetchone() is not None
            done: set[str] = set()
            if has_tracking:
                done = {
                    r["version"]
                    for r in c.execute("SELECT version FROM schema_migrations")
                }
            for version, sql in MIGRATIONS:
                if version in done:
                    continue
                # executescript() commits any open transaction before it runs,
                # which is why this is not wrapped in _write_txn().
                c.executescript(sql)
                cur = c.execute(
                    "INSERT OR IGNORE INTO schema_migrations (version, applied_at) VALUES (?, ?)",
                    (version, int(time.time())),
                )
                # rowcount == 0: another worker recorded this version first.
                if cur.rowcount == 1:
                    applied.append(version)
        return applied

    def applied_migrations(self) -> list[str]:
        """Return every recorded migration version, sorted ascending."""
        with self._conn() as c:
            rows = c.execute(
                "SELECT version FROM schema_migrations ORDER BY version"
            ).fetchall()
        return [r["version"] for r in rows]

    # ---------- tokens -------------------------------------------------------
    @staticmethod
    def _token_row(row: sqlite3.Row) -> dict:
        """Convert a tokens row to the public dict shape.

        ``is_admin`` becomes a bool. The stored ``ip_allowlist_json`` column
        is always replaced by a decoded ``ip_cidrs`` key, which is ``None``
        when no allowlist is stored.
        """
        d = dict(row)
        d["is_admin"] = bool(d["is_admin"])
        # Pop unconditionally so the raw column never leaks into the result,
        # whether or not an allowlist is set.
        raw = d.pop("ip_allowlist_json")
        d["ip_cidrs"] = json.loads(raw) if raw else None
        return d

    def insert_token(
        self,
        *,
        name: str,
        bearer: str,
        is_admin: bool,
        ip_cidrs: list[str] | None,
    ) -> None:
        """Create token *name*; only the SHA-256 hash of *bearer* is stored.

        An empty or ``None`` *ip_cidrs* is stored as NULL (no allowlist).
        Raises ``sqlite3.IntegrityError`` if *name* or the bearer hash is
        already present (PRIMARY KEY / UNIQUE constraints).
        """
        ip_json = json.dumps(ip_cidrs) if ip_cidrs else None
        with self._conn() as c:
            c.execute(
                """
                INSERT INTO tokens (name, bearer_hash, is_admin, ip_allowlist_json, created_at)
                VALUES (?, ?, ?, ?, ?)
                """,
                (name, _hash(bearer), 1 if is_admin else 0, ip_json, int(time.time())),
            )

    def lookup_token_by_bearer(self, bearer: str) -> dict | None:
        """Resolve a raw bearer string to its token record.

        Returns ``{"name", "is_admin", "ip_cidrs"}``, or ``None`` if the
        bearer is unknown or the token is revoked. A successful lookup also
        stamps ``last_used_at`` with the current time.
        """
        h = _hash(bearer)
        with self._conn() as c:
            row = c.execute(
                """
                SELECT name, is_admin, ip_allowlist_json, revoked_at
                FROM tokens WHERE bearer_hash=?
                """,
                (h,),
            ).fetchone()
            if not row:
                return None
            if row["revoked_at"] is not None:
                return None
            c.execute("UPDATE tokens SET last_used_at=? WHERE bearer_hash=?", (int(time.time()), h))
        ip_cidrs = json.loads(row["ip_allowlist_json"]) if row["ip_allowlist_json"] else None
        return {
            "name": row["name"],
            "is_admin": bool(row["is_admin"]),
            "ip_cidrs": ip_cidrs,
        }

    def get_token(self, name: str) -> dict | None:
        """Return token *name* (revoked or not) without the bearer hash, or None."""
        with self._conn() as c:
            row = c.execute(
                """
                SELECT name, is_admin, ip_allowlist_json, created_at, last_used_at, revoked_at
                FROM tokens WHERE name=?
                """,
                (name,),
            ).fetchone()
        return self._token_row(row) if row else None

    def list_tokens(self) -> list[dict]:
        """Return all tokens (revoked included), ordered by name."""
        with self._conn() as c:
            rows = c.execute(
                """
                SELECT name, is_admin, ip_allowlist_json, created_at, last_used_at, revoked_at
                FROM tokens ORDER BY name
                """,
            ).fetchall()
        return [self._token_row(r) for r in rows]

    def revoke_token(self, name: str) -> bool:
        """Revoke token *name*; False if it does not exist or is already revoked."""
        with self._conn() as c:
            cur = c.execute(
                "UPDATE tokens SET revoked_at=? WHERE name=? AND revoked_at IS NULL",
                (int(time.time()), name),
            )
        return cur.rowcount > 0

    # ---------- projects -----------------------------------------------------
    def upsert_project(
        self,
        *,
        name: str,
        git_url: str,
        default_branch: str,
        recipe_json: str,
        owner_token: str,
    ) -> dict:
        """Create project *name*, or update its git_url/branch/recipe.

        On update, ``created_at`` and ``owner_token`` keep their stored
        values. Ownership is never transferred here, and the returned dict
        reports the stored owner rather than the *owner_token* argument.
        The read and the write happen in one write transaction, so two
        concurrent upserts of a new name cannot both attempt the INSERT.
        """
        now = int(time.time())
        with self._write_txn() as c:
            row = c.execute(
                "SELECT created_at, owner_token FROM projects WHERE name=?", (name,)
            ).fetchone()
            if row is None:
                c.execute(
                    """
                    INSERT INTO projects (name, git_url, default_branch, recipe_json, owner_token, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    (name, git_url, default_branch, recipe_json, owner_token, now, now),
                )
                created_at, stored_owner = now, owner_token
            else:
                created_at, stored_owner = row["created_at"], row["owner_token"]
                c.execute(
                    """
                    UPDATE projects
                    SET git_url=?, default_branch=?, recipe_json=?, updated_at=?
                    WHERE name=?
                    """,
                    (git_url, default_branch, recipe_json, now, name),
                )
        return {
            "name": name,
            "git_url": git_url,
            "default_branch": default_branch,
            "recipe_json": recipe_json,
            "owner_token": stored_owner,
            "created_at": created_at,
            "updated_at": now,
        }

    def get_project(self, name: str) -> dict | None:
        """Return project *name* as a dict of all columns, or None."""
        with self._conn() as c:
            row = c.execute(
                "SELECT * FROM projects WHERE name=?",
                (name,),
            ).fetchone()
        return dict(row) if row else None

    def list_projects(self, *, owner_token: str | None = None) -> list[dict]:
        """Return projects ordered by name, optionally only those owned by *owner_token*."""
        with self._conn() as c:
            if owner_token is None:
                rows = c.execute("SELECT * FROM projects ORDER BY name").fetchall()
            else:
                rows = c.execute(
                    "SELECT * FROM projects WHERE owner_token=? ORDER BY name",
                    (owner_token,),
                ).fetchall()
        return [dict(r) for r in rows]

    def delete_project(self, name: str) -> bool:
        """Delete project *name*; True if a row was removed.

        With foreign_keys=ON, the schema's ON DELETE CASCADE also removes the
        project's jobs and, through them, their findings and patch attempts.
        """
        with self._conn() as c:
            cur = c.execute("DELETE FROM projects WHERE name=?", (name,))
        return cur.rowcount > 0

    # ---------- jobs ---------------------------------------------------------
    def insert_job(
        self,
        *,
        job_id: str,
        project_name: str,
        subproject_path: str,
        recipe: str,
        branch: str,
        log_path: str,
        recipe_snapshot_json: str,
    ) -> dict:
        """Insert a job in status 'queued' and return its initial state.

        The returned dict omits ``recipe_snapshot_json``.
        """
        now = int(time.time())
        with self._conn() as c:
            c.execute(
                """
                INSERT INTO jobs (id, project_name, subproject_path, recipe, branch,
                                  status, queued_at, log_path, recipe_snapshot_json)
                VALUES (?, ?, ?, ?, ?, 'queued', ?, ?, ?)
                """,
                (job_id, project_name, subproject_path, recipe, branch, now, log_path, recipe_snapshot_json),
            )
        return {
            "id": job_id,
            "project_name": project_name,
            "subproject_path": subproject_path,
            "recipe": recipe,
            "branch": branch,
            "status": "queued",
            "queued_at": now,
            "started_at": None,
            "finished_at": None,
            "exit_code": None,
            "log_path": log_path,
            "findings_count": 0,
        }

    def get_job(self, job_id: str) -> dict | None:
        """Return job *job_id* as a dict of all columns, or None."""
        with self._conn() as c:
            row = c.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
        return dict(row) if row else None

    def list_jobs(
        self,
        *,
        project_name: str | None = None,
        status: str | None = None,
        owner_token: str | None = None,
        limit: int = 50,
    ) -> list[dict]:
        """Return up to *limit* jobs, newest ``queued_at`` first.

        Each filter is applied only when not None. *owner_token* matches the
        owning project's ``owner_token``.
        """
        sql = """
            SELECT j.* FROM jobs j
            JOIN projects p ON p.name = j.project_name
            WHERE 1=1
        """
        params: list = []
        if project_name is not None:
            sql += " AND j.project_name=?"
            params.append(project_name)
        if status is not None:
            sql += " AND j.status=?"
            params.append(status)
        if owner_token is not None:
            sql += " AND p.owner_token=?"
            params.append(owner_token)
        sql += " ORDER BY j.queued_at DESC LIMIT ?"
        params.append(int(limit))
        with self._conn() as c:
            rows = c.execute(sql, params).fetchall()
        return [dict(r) for r in rows]

    def mark_job_running(self, job_id: str) -> None:
        """Move a 'queued' job to 'running' and stamp started_at.

        No-op unless the job is currently 'queued'.
        """
        with self._conn() as c:
            c.execute(
                "UPDATE jobs SET status='running', started_at=? WHERE id=? AND status='queued'",
                (int(time.time()), job_id),
            )

    def mark_job_finished(
        self,
        *,
        job_id: str,
        status: str,
        exit_code: int | None,
    ) -> None:
        """Set *status*, *exit_code*, and finished_at=now, whatever the current state."""
        with self._conn() as c:
            c.execute(
                """
                UPDATE jobs
                SET status=?, finished_at=?, exit_code=?
                WHERE id=?
                """,
                (status, int(time.time()), exit_code, job_id),
            )

    def mark_orphaned_jobs_failed(self, *, log_dir: Path | None = None) -> list[str]:
        """Boot-time sweep that fails every job still marked 'running'.

        A job left in 'running' at startup was orphaned by a process crash
        or restart. Each such job gets status='failed' and exit_code=-1, so
        callers see a terminal state and can re-queue if they want.

        When *log_dir* is not None, a one-line note is appended to each
        orphaned job's own ``log_path``; the *log_dir* value itself only acts
        as an on/off switch. These log writes are best-effort.

        Returns the ids of the jobs marked failed.
        """
        now = int(time.time())
        # Select and update under one write lock so the sweep is atomic and
        # commits once instead of once per job.
        with self._write_txn() as c:
            orphans = [
                (r["id"], r["log_path"])
                for r in c.execute(
                    "SELECT id, log_path FROM jobs WHERE status='running'"
                ).fetchall()
            ]
            c.executemany(
                "UPDATE jobs SET status='failed', finished_at=?, exit_code=-1 WHERE id=?",
                [(now, job_id) for job_id, _ in orphans],
            )
        if log_dir is not None:
            # File I/O is kept outside the transaction so it never holds the lock.
            for _, log_path in orphans:
                try:
                    Path(log_path).parent.mkdir(parents=True, exist_ok=True)
                    with open(log_path, "a", encoding="utf-8") as fh:
                        fh.write("\n[crafting-table] runner restart, job orphaned\n")
                except OSError:
                    # Best effort: an unwritable log must not block boot.
                    pass
        return [job_id for job_id, _ in orphans]

    def increment_findings_count(self, job_id: str, n: int) -> None:
        """Add *n* to the job's findings_count."""
        with self._conn() as c:
            c.execute(
                "UPDATE jobs SET findings_count = findings_count + ? WHERE id=?",
                (n, job_id),
            )

    # ---------- findings -----------------------------------------------------
    def insert_finding(
        self,
        *,
        job_id: str,
        kind: str,
        severity: str,
        message: str,
        fingerprint: str,
        file: str | None = None,
        line: int | None = None,
        code: str | None = None,
        suggested_fix: str | None = None,
        raw_json: str | None = None,
    ) -> int:
        """Insert a finding for *job_id* and return its row id.

        This does not touch ``jobs.findings_count``; use
        increment_findings_count() for that.
        """
        with self._conn() as c:
            cur = c.execute(
                """
                INSERT INTO findings (job_id, kind, severity, file, line, code, message,
                                      suggested_fix, raw_json, created_at, fingerprint)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (job_id, kind, severity, file, line, code, message,
                 suggested_fix, raw_json, int(time.time()), fingerprint),
            )
        return cur.lastrowid

    def list_findings(self, job_id: str) -> list[dict]:
        """Return all findings of *job_id* in insertion (id) order."""
        with self._conn() as c:
            rows = c.execute(
                "SELECT * FROM findings WHERE job_id=? ORDER BY id",
                (job_id,),
            ).fetchall()
        return [dict(r) for r in rows]

    # ---------- digest runs --------------------------------------------------
    def record_digest_run(
        self,
        *,
        date: str,
        project_name: str,
        sent_at: int,
        recipient_count: int,
        job_count: int,
    ) -> bool:
        """Record an attempted or successful digest send.

        UNIQUE(date, project_name) enforces idempotency: a second call for
        the same date and project is a no-op and returns False.
        """
        with self._conn() as c:
            cur = c.execute(
                """
                INSERT OR IGNORE INTO digest_runs
                    (date, project_name, sent_at, recipient_count, job_count)
                VALUES (?, ?, ?, ?, ?)
                """,
                (date, project_name, sent_at, recipient_count, job_count),
            )
        return cur.rowcount == 1

    def digest_run_exists(self, date: str, project_name: str) -> bool:
        """True if a digest run is recorded for *date* and *project_name*."""
        with self._conn() as c:
            row = c.execute(
                "SELECT 1 FROM digest_runs WHERE date=? AND project_name=?",
                (date, project_name),
            ).fetchone()
        return row is not None

    def list_digest_runs(self, date: str | None = None) -> list[dict]:
        """Return digest runs, either all (newest date first) or only for *date*."""
        with self._conn() as c:
            if date is None:
                rows = c.execute(
                    "SELECT * FROM digest_runs ORDER BY date DESC, project_name"
                ).fetchall()
            else:
                rows = c.execute(
                    "SELECT * FROM digest_runs WHERE date=? ORDER BY project_name",
                    (date,),
                ).fetchall()
        return [dict(r) for r in rows]

    # ---------- patch attempts ----------------------------------------------
    def get_finding(self, finding_id: int) -> dict | None:
        """Return finding *finding_id* as a dict of all columns, or None."""
        with self._conn() as c:
            row = c.execute(
                "SELECT * FROM findings WHERE id=?", (int(finding_id),)
            ).fetchone()
        return dict(row) if row else None

    def list_findings_for_job(self, job_id: str) -> list[dict]:
        """Alias of list_findings, kept for callers that prefer the explicit name."""
        return self.list_findings(job_id)

    def count_patch_attempts(self, finding_id: int) -> int:
        """Return how many patch attempts exist for *finding_id*."""
        with self._conn() as c:
            row = c.execute(
                "SELECT COUNT(*) AS n FROM patch_attempts WHERE finding_id=?",
                (int(finding_id),),
            ).fetchone()
        return int(row["n"]) if row else 0

    def insert_patch_attempt(
        self,
        *,
        finding_id: int,
        job_id: str,
        project_name: str,
        attempt_number: int,
        status: str,
        branch_name: str | None = None,
        pr_url: str | None = None,
        diff_excerpt: str | None = None,
        session_id: str | None = None,
        error: str | None = None,
    ) -> int:
        """Record a patch attempt and return its row id.

        ``created_at`` and ``finished_at`` are both set to the same "now"
        timestamp at insert time. Raises ``sqlite3.IntegrityError`` if
        (finding_id, attempt_number) already exists.
        """
        # One clock read so both timestamps are identical, even if the call
        # straddles a second boundary.
        now = int(time.time())
        with self._conn() as c:
            cur = c.execute(
                """
                INSERT INTO patch_attempts
                    (finding_id, job_id, project_name, attempt_number, status,
                     branch_name, pr_url, diff_excerpt, session_id, error,
                     created_at, finished_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    int(finding_id),
                    job_id,
                    project_name,
                    int(attempt_number),
                    status,
                    branch_name,
                    pr_url,
                    diff_excerpt,
                    session_id,
                    error,
                    now,
                    now,
                ),
            )
        return int(cur.lastrowid)

    def get_patch_attempt(self, attempt_id: int) -> dict | None:
        """Return patch attempt *attempt_id* as a dict of all columns, or None."""
        with self._conn() as c:
            row = c.execute(
                "SELECT * FROM patch_attempts WHERE id=?", (int(attempt_id),)
            ).fetchone()
        return dict(row) if row else None

    def list_patch_attempts(
        self,
        *,
        project_name: str | None = None,
        status: str | None = None,
        finding_id: int | None = None,
        owner_token: str | None = None,
        limit: int = 100,
    ) -> list[dict]:
        """Return up to *limit* patch attempts, newest ``created_at`` first.

        Each filter is applied only when not None. *owner_token* matches the
        owning project's ``owner_token``.
        """
        sql = """
            SELECT pa.* FROM patch_attempts pa
            JOIN projects p ON p.name = pa.project_name
            WHERE 1=1
        """
        params: list = []
        if project_name is not None:
            sql += " AND pa.project_name=?"
            params.append(project_name)
        if status is not None:
            sql += " AND pa.status=?"
            params.append(status)
        if finding_id is not None:
            sql += " AND pa.finding_id=?"
            params.append(int(finding_id))
        if owner_token is not None:
            sql += " AND p.owner_token=?"
            params.append(owner_token)
        sql += " ORDER BY pa.created_at DESC LIMIT ?"
        params.append(int(limit))
        with self._conn() as c:
            rows = c.execute(sql, params).fetchall()
        return [dict(r) for r in rows]

    def list_patch_attempts_in_window(
        self,
        *,
        window_start: int,
        window_end: int,
        project_name: str | None = None,
        statuses: tuple[str, ...] | None = None,
    ) -> list[dict]:
        """Return patch attempts with window_start <= created_at <= window_end.

        Results are ordered oldest first. Used by the email digest to surface
        drafted patches in the daily summary. A ``None`` or empty *statuses*
        applies no status filter.
        """
        sql = "SELECT * FROM patch_attempts WHERE created_at >= ? AND created_at <= ?"
        params: list = [int(window_start), int(window_end)]
        if project_name is not None:
            sql += " AND project_name=?"
            params.append(project_name)
        if statuses:
            # Only "?" placeholders are interpolated; values stay bound params.
            placeholders = ",".join("?" for _ in statuses)
            sql += f" AND status IN ({placeholders})"
            params.extend(statuses)
        sql += " ORDER BY created_at"
        with self._conn() as c:
            rows = c.execute(sql, params).fetchall()
        return [dict(r) for r in rows]

    # ---------- async wrappers ----------------------------------------------
    async def arun(self, fn, *args, **kwargs):
        """Run the sync callable ``fn(*args, **kwargs)`` in the default executor.

        Use from FastAPI request handlers and runner coroutines so the event
        loop is never blocked on disk I/O. Most queries take well under a
        millisecond, but keeping one pattern leaves room for bigger ones.
        """
        # Inside a coroutine the running loop always exists; get_running_loop
        # is the non-deprecated way to obtain it.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: fn(*args, **kwargs))