crafting-table/crafting_table/db.py
Kayos 0ec3a04676 v0.1 wave 1 (steps 2+3+4): SQLite ledger + FastAPI skeleton + async job runner
- db.py: migrations + DAOs for tokens / projects / jobs / findings (SQLite WAL)
- auth.py: SHA-256 bearer hashing + LAN-CIDR allowlist + admin/app token tiers
- models.py: Pydantic shapes (Project, Subproject, Schedule, Notify, Job, CreateJobRequest)
- server.py: FastAPI on port 8810; /healthz, /admin/tokens/*, /projects/*, /jobs, /jobs/{id}, /jobs/{id}/log, /jobs/{id}/findings
- runner.py: bounded asyncio pool, per-job timeout with process-group SIGTERM→SIGKILL escalation, orphaned-job recovery on boot
- workspace.py: bare-clone + worktree materialization, gc
- config.py: env-driven
- 62 tests across db / auth / projects / jobs / runner / e2e — all green

Cross-token project access returns 404 (not 403) — existence-leak guard.
Bearer tokens hashed at rest; admin token bootstrapped on first boot.
Recipe subprocess uses start_new_session=True so killpg targets the
whole process tree on timeout — child processes can't escape SIGKILL.
Pump task guarded with wait_for(2s) + cancel fallback against any
orphan that survives the group kill.

Wave 2 (parsers + findings extraction + MCP + email digest) pending.

Spec: memory/spec-crafting-table.md
2026-04-29 08:17:41 -07:00

502 lines
18 KiB
Python

"""SQLite ledger + migrations.
Why SQLite (not MariaDB like clawdforge): single-process, single-host service,
no need for cross-host replication. The runner is the only writer; every
HTTP worker reads. SQLite in WAL mode handles single-writer-many-readers
cleanly. Trade-off documented in README.
Why stdlib `sqlite3` + `run_in_executor` (not aiosqlite): one less dependency
and the queries are tiny (fetchone / fetchall). The runner does its own log
streaming via aiofiles-equivalent so we never block the loop on disk.
Migration system:
- Each entry in MIGRATIONS is (version_id, sql_text). Version ids start with a
zero-padded sequence number ("001_", "002_", ...) so they sort lexicographically.
- Apply in order, INSERT OR IGNORE into schema_migrations to handle
multi-worker boot races (mirrors cauldron's pattern).
- Migrations are append-only; never edit a landed migration, add a new one.
"""
from __future__ import annotations
import asyncio
import json
import sqlite3
import time
from contextlib import contextmanager
from pathlib import Path
# fmt: off
# Ordered schema history consumed by DB.migrate(), which walks this list front
# to back. Each entry is (version_id, sql_text). Every CREATE below carries
# IF NOT EXISTS. All *_at columns are written by the DAOs as int(time.time()),
# i.e. whole seconds since the Unix epoch.
MIGRATIONS: list[tuple[str, str]] = [
# Tracking table: DB.migrate() records one row per migration version here.
(
"001_schema_migrations",
"""
CREATE TABLE IF NOT EXISTS schema_migrations (
version TEXT PRIMARY KEY,
applied_at INTEGER NOT NULL
);
""",
),
# API tokens. bearer_hash holds the value returned by _hash(bearer), never the
# bearer itself. is_admin is stored as 0/1. ip_allowlist_json is a JSON list
# written by DB.insert_token(), or NULL when no list was supplied. revoked_at
# stays NULL until DB.revoke_token() stamps it.
(
"002_tokens",
"""
CREATE TABLE IF NOT EXISTS tokens (
name TEXT PRIMARY KEY,
bearer_hash TEXT NOT NULL UNIQUE,
is_admin INTEGER NOT NULL DEFAULT 0,
ip_allowlist_json TEXT,
created_at INTEGER NOT NULL,
last_used_at INTEGER,
revoked_at INTEGER
);
""",
),
# Registered projects, keyed by name. owner_token points at tokens.name; there
# is no ON DELETE action on that foreign key.
(
"003_projects",
"""
CREATE TABLE IF NOT EXISTS projects (
name TEXT PRIMARY KEY,
git_url TEXT NOT NULL,
default_branch TEXT NOT NULL DEFAULT 'main',
recipe_json TEXT NOT NULL,
owner_token TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (owner_token) REFERENCES tokens(name)
);
""",
),
# One row per job. status starts as 'queued'; the DAOs in this file also write
# 'running' and 'failed', and DB.mark_job_finished() stores whatever status it
# is given. Rows are removed together with their project (ON DELETE CASCADE).
# The three indexes back the filters and ordering used by DB.list_jobs().
(
"004_jobs",
"""
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
project_name TEXT NOT NULL,
subproject_path TEXT NOT NULL,
recipe TEXT NOT NULL,
branch TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued',
queued_at INTEGER NOT NULL,
started_at INTEGER,
finished_at INTEGER,
exit_code INTEGER,
log_path TEXT NOT NULL,
findings_count INTEGER NOT NULL DEFAULT 0,
recipe_snapshot_json TEXT NOT NULL,
FOREIGN KEY (project_name) REFERENCES projects(name) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_jobs_project ON jobs(project_name);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
CREATE INDEX IF NOT EXISTS idx_jobs_queued_at ON jobs(queued_at);
""",
),
# Findings attached to a job; they are removed together with the job
# (ON DELETE CASCADE). Only job_id, kind, severity, message, created_at and
# fingerprint are mandatory.
(
"005_findings",
"""
CREATE TABLE IF NOT EXISTS findings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT NOT NULL,
kind TEXT NOT NULL,
severity TEXT NOT NULL,
file TEXT,
line INTEGER,
code TEXT,
message TEXT NOT NULL,
suggested_fix TEXT,
raw_json TEXT,
created_at INTEGER NOT NULL,
fingerprint TEXT NOT NULL,
FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_findings_job ON findings(job_id);
CREATE INDEX IF NOT EXISTS idx_findings_fingerprint ON findings(fingerprint);
""",
),
]
# fmt: on
def _hash(token: str) -> str:
import hashlib
return hashlib.sha256(token.encode("utf-8")).hexdigest()
class DB:
    """Synchronous SQLite data-access layer for the crafting-table ledger.

    Every method opens a short-lived connection through :meth:`_conn`, runs
    its statements in autocommit mode and closes the connection again, so an
    instance keeps no open handle between calls. Coroutines should go through
    :meth:`arun`, which runs the chosen method in the default executor.
    """

    def __init__(self, db_path: str | Path):
        """Create the parent directory, request WAL mode and run migrations.

        Args:
            db_path: Location of the SQLite database file.
        """
        self.db_path = str(db_path)
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        with self._conn() as c:
            # WAL = many readers + one writer without lock contention. The
            # journal mode is recorded in the database file, so asking for it
            # once at startup is enough. The per-connection pragmas
            # (synchronous, foreign_keys) are applied in _conn() instead.
            c.execute("PRAGMA journal_mode=WAL")
        self.migrate()

    @contextmanager
    def _conn(self):
        """Yield a fresh autocommit connection and close it on exit.

        ``isolation_level=None`` gives autocommit; multi-statement operations
        that need atomicity must issue BEGIN / COMMIT themselves. Rows come
        back as :class:`sqlite3.Row` so callers can index them by column name.
        """
        conn = sqlite3.connect(self.db_path, isolation_level=None, timeout=10.0)
        conn.row_factory = sqlite3.Row
        try:
            # Both settings live on the connection, not in the database file,
            # so they have to be re-applied every time a connection is opened.
            conn.execute("PRAGMA foreign_keys=ON")
            # synchronous=NORMAL is the standard WAL pairing: commits survive
            # a process crash; only an OS crash can drop the latest commit.
            conn.execute("PRAGMA synchronous=NORMAL")
            yield conn
        finally:
            conn.close()

    # ---------- migrations ---------------------------------------------------

    def migrate(self) -> list[str]:
        """Apply pending migrations in list order.

        Versions already recorded in ``schema_migrations`` are skipped, so a
        migration that is not naturally idempotent (for example an
        ``ALTER TABLE ... ADD COLUMN``) runs only once per database.

        Returns:
            The versions recorded by this call (empty when up to date).
        """
        applied: list[str] = []
        with self._conn() as c:
            # On a brand-new database the tracking table does not exist yet;
            # the first migration creates it.
            tracking_exists = c.execute(
                "SELECT 1 FROM sqlite_master WHERE type='table' AND name='schema_migrations'"
            ).fetchone()
            done: set[str] = set()
            if tracking_exists:
                done = {
                    r["version"]
                    for r in c.execute("SELECT version FROM schema_migrations").fetchall()
                }
            for version, sql in MIGRATIONS:
                if version in done:
                    continue
                c.executescript(sql)
                # INSERT OR IGNORE: if another process recorded the version in
                # the meantime, rowcount is 0 and we do not claim it.
                cur = c.execute(
                    "INSERT OR IGNORE INTO schema_migrations (version, applied_at) VALUES (?, ?)",
                    (version, int(time.time())),
                )
                if cur.rowcount == 1:
                    applied.append(version)
        return applied

    def applied_migrations(self) -> list[str]:
        """Return every recorded migration version, sorted by version id."""
        with self._conn() as c:
            rows = c.execute(
                "SELECT version FROM schema_migrations ORDER BY version"
            ).fetchall()
        return [r["version"] for r in rows]

    # ---------- tokens -------------------------------------------------------

    @staticmethod
    def _token_from_row(row) -> dict:
        """Convert a ``tokens`` row into the dict shape returned to callers.

        ``is_admin`` becomes a bool and ``ip_allowlist_json`` is always
        replaced by ``ip_cidrs`` (the decoded list, or None when unset), so
        the key set is the same for every token.
        """
        d = dict(row)
        d["is_admin"] = bool(d["is_admin"])
        raw = d.pop("ip_allowlist_json")
        d["ip_cidrs"] = json.loads(raw) if raw else None
        return d

    def insert_token(
        self,
        *,
        name: str,
        bearer: str,
        is_admin: bool,
        ip_cidrs: list[str] | None,
    ) -> None:
        """Store a new token; only the hash of *bearer* is persisted.

        Args:
            name: Primary key of the token.
            bearer: Plain bearer string; passed through ``_hash`` before storage.
            is_admin: Stored as 1 / 0.
            ip_cidrs: Allowlist entries; ``None`` or an empty list is stored
                as NULL.

        Raises:
            sqlite3.IntegrityError: If the name or the bearer hash already exists.
        """
        ip_json = json.dumps(ip_cidrs) if ip_cidrs else None
        with self._conn() as c:
            c.execute(
                """
                INSERT INTO tokens (name, bearer_hash, is_admin, ip_allowlist_json, created_at)
                VALUES (?, ?, ?, ?, ?)
                """,
                (name, _hash(bearer), 1 if is_admin else 0, ip_json, int(time.time())),
            )

    def lookup_token_by_bearer(self, bearer: str) -> dict | None:
        """Resolve a presented bearer string to its token.

        A successful lookup also stamps ``last_used_at``.

        Returns:
            ``{"name", "is_admin", "ip_cidrs"}`` for an active token, or
            ``None`` when the bearer is unknown or the token is revoked.
        """
        h = _hash(bearer)
        with self._conn() as c:
            row = c.execute(
                """
                SELECT name, is_admin, ip_allowlist_json, revoked_at
                FROM tokens WHERE bearer_hash=?
                """,
                (h,),
            ).fetchone()
            if not row:
                return None
            if row["revoked_at"] is not None:
                return None
            c.execute(
                "UPDATE tokens SET last_used_at=? WHERE bearer_hash=?",
                (int(time.time()), h),
            )
        ip_cidrs = json.loads(row["ip_allowlist_json"]) if row["ip_allowlist_json"] else None
        return {
            "name": row["name"],
            "is_admin": bool(row["is_admin"]),
            "ip_cidrs": ip_cidrs,
        }

    def get_token(self, name: str) -> dict | None:
        """Return the token named *name* (without its hash), or ``None``."""
        with self._conn() as c:
            row = c.execute(
                """
                SELECT name, is_admin, ip_allowlist_json, created_at, last_used_at, revoked_at
                FROM tokens WHERE name=?
                """,
                (name,),
            ).fetchone()
        if not row:
            return None
        return self._token_from_row(row)

    def list_tokens(self) -> list[dict]:
        """Return every token (revoked ones included), ordered by name."""
        with self._conn() as c:
            rows = c.execute(
                """
                SELECT name, is_admin, ip_allowlist_json, created_at, last_used_at, revoked_at
                FROM tokens ORDER BY name
                """,
            ).fetchall()
        return [self._token_from_row(r) for r in rows]

    def revoke_token(self, name: str) -> bool:
        """Stamp ``revoked_at`` on an active token.

        Returns:
            True when a row was updated; False when the token does not exist
            or was already revoked.
        """
        with self._conn() as c:
            cur = c.execute(
                "UPDATE tokens SET revoked_at=? WHERE name=? AND revoked_at IS NULL",
                (int(time.time()), name),
            )
            changed = cur.rowcount > 0
        return changed

    # ---------- projects -----------------------------------------------------

    def upsert_project(
        self,
        *,
        name: str,
        git_url: str,
        default_branch: str,
        recipe_json: str,
        owner_token: str,
    ) -> dict:
        """Create the project, or update its mutable fields if it exists.

        An update changes ``git_url``, ``default_branch``, ``recipe_json`` and
        ``updated_at`` only; ``created_at`` and ``owner_token`` keep their
        stored values, and *owner_token* is used solely when the row is
        created.

        Returns:
            The project as it is stored after the call.
        """
        now = int(time.time())
        with self._conn() as c:
            existing = c.execute(
                "SELECT created_at, owner_token FROM projects WHERE name=?", (name,)
            ).fetchone()
            if existing is None:
                c.execute(
                    """
                    INSERT INTO projects (name, git_url, default_branch, recipe_json, owner_token, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    (name, git_url, default_branch, recipe_json, owner_token, now, now),
                )
                created_at = now
                stored_owner = owner_token
            else:
                c.execute(
                    """
                    UPDATE projects
                    SET git_url=?, default_branch=?, recipe_json=?, updated_at=?
                    WHERE name=?
                    """,
                    (git_url, default_branch, recipe_json, now, name),
                )
                created_at = existing["created_at"]
                # The UPDATE never touches owner_token, so report the owner
                # that is actually stored rather than the caller's argument.
                stored_owner = existing["owner_token"]
        return {
            "name": name,
            "git_url": git_url,
            "default_branch": default_branch,
            "recipe_json": recipe_json,
            "owner_token": stored_owner,
            "created_at": created_at,
            "updated_at": now,
        }

    def get_project(self, name: str) -> dict | None:
        """Return the full project row as a dict, or ``None`` if absent."""
        with self._conn() as c:
            row = c.execute(
                "SELECT * FROM projects WHERE name=?",
                (name,),
            ).fetchone()
        return dict(row) if row else None

    def list_projects(self, *, owner_token: str | None = None) -> list[dict]:
        """List projects ordered by name, optionally limited to one owner."""
        with self._conn() as c:
            if owner_token is None:
                rows = c.execute("SELECT * FROM projects ORDER BY name").fetchall()
            else:
                rows = c.execute(
                    "SELECT * FROM projects WHERE owner_token=? ORDER BY name",
                    (owner_token,),
                ).fetchall()
        return [dict(r) for r in rows]

    def delete_project(self, name: str) -> bool:
        """Delete a project; its jobs and their findings go with it through
        the ON DELETE CASCADE foreign keys.

        Returns:
            True when a row was deleted.
        """
        with self._conn() as c:
            cur = c.execute("DELETE FROM projects WHERE name=?", (name,))
            deleted = cur.rowcount > 0
        return deleted

    # ---------- jobs ---------------------------------------------------------

    def insert_job(
        self,
        *,
        job_id: str,
        project_name: str,
        subproject_path: str,
        recipe: str,
        branch: str,
        log_path: str,
        recipe_snapshot_json: str,
    ) -> dict:
        """Insert a job in the 'queued' state.

        Returns:
            The new job as a dict. It carries every column of the row except
            ``recipe_snapshot_json``.
        """
        now = int(time.time())
        with self._conn() as c:
            c.execute(
                """
                INSERT INTO jobs (id, project_name, subproject_path, recipe, branch,
                                  status, queued_at, log_path, recipe_snapshot_json)
                VALUES (?, ?, ?, ?, ?, 'queued', ?, ?, ?)
                """,
                (job_id, project_name, subproject_path, recipe, branch, now, log_path, recipe_snapshot_json),
            )
        return {
            "id": job_id,
            "project_name": project_name,
            "subproject_path": subproject_path,
            "recipe": recipe,
            "branch": branch,
            "status": "queued",
            "queued_at": now,
            "started_at": None,
            "finished_at": None,
            "exit_code": None,
            "log_path": log_path,
            "findings_count": 0,
        }

    def get_job(self, job_id: str) -> dict | None:
        """Return the full job row as a dict, or ``None`` if absent."""
        with self._conn() as c:
            row = c.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
        return dict(row) if row else None

    def list_jobs(
        self,
        *,
        project_name: str | None = None,
        status: str | None = None,
        owner_token: str | None = None,
        limit: int = 50,
    ) -> list[dict]:
        """List jobs, newest ``queued_at`` first.

        Each filter applies only when it is not ``None``. ``owner_token`` is
        matched against the owning project's ``owner_token`` via the join.

        Args:
            project_name: Restrict to one project.
            status: Restrict to one status value.
            owner_token: Restrict to projects owned by this token name.
            limit: Maximum number of rows, passed to SQL ``LIMIT``.
        """
        sql = """
            SELECT j.* FROM jobs j
            JOIN projects p ON p.name = j.project_name
            WHERE 1=1
        """
        params: list = []
        if project_name is not None:
            sql += " AND j.project_name=?"
            params.append(project_name)
        if status is not None:
            sql += " AND j.status=?"
            params.append(status)
        if owner_token is not None:
            sql += " AND p.owner_token=?"
            params.append(owner_token)
        sql += " ORDER BY j.queued_at DESC LIMIT ?"
        params.append(int(limit))
        with self._conn() as c:
            rows = c.execute(sql, params).fetchall()
        return [dict(r) for r in rows]

    def mark_job_running(self, job_id: str) -> None:
        """Move a job from 'queued' to 'running' and stamp ``started_at``.

        A job that is not currently 'queued' is left untouched.
        """
        with self._conn() as c:
            c.execute(
                "UPDATE jobs SET status='running', started_at=? WHERE id=? AND status='queued'",
                (int(time.time()), job_id),
            )

    def mark_job_finished(
        self,
        *,
        job_id: str,
        status: str,
        exit_code: int | None,
    ) -> None:
        """Record *status* and *exit_code* for a job and stamp ``finished_at``.

        The update is unconditional: the job's current status is not checked.
        """
        with self._conn() as c:
            c.execute(
                """
                UPDATE jobs
                SET status=?, finished_at=?, exit_code=?
                WHERE id=?
                """,
                (status, int(time.time()), exit_code, job_id),
            )

    def mark_orphaned_jobs_failed(self, *, log_dir: Path | None = None) -> list[str]:
        """Sweep on boot: fail every job still marked 'running'.

        Such jobs were orphaned by a process crash. They are set to 'failed'
        with ``exit_code=-1`` so callers see a terminal state and can re-queue
        if they want.

        Args:
            log_dir: Acts only as a switch. When it is not ``None`` a marker
                line is appended to each job's own ``log_path``; the value
                itself is not used to build any path.

        Returns:
            The ids of the jobs that were marked failed.
        """
        ids: list[str] = []
        now = int(time.time())
        with self._conn() as c:
            rows = c.execute("SELECT id, log_path FROM jobs WHERE status='running'").fetchall()
            for r in rows:
                ids.append(r["id"])
                c.execute(
                    "UPDATE jobs SET status='failed', finished_at=?, exit_code=-1 WHERE id=?",
                    (now, r["id"]),
                )
                if log_dir is None:
                    continue
                try:
                    Path(r["log_path"]).parent.mkdir(parents=True, exist_ok=True)
                    with open(r["log_path"], "a", encoding="utf-8") as fh:
                        fh.write("\n[crafting-table] runner restart, job orphaned\n")
                except OSError:
                    # Best effort: an unwritable log must not block the sweep.
                    pass
        return ids

    def increment_findings_count(self, job_id: str, n: int) -> None:
        """Add *n* to the job's ``findings_count`` in a single UPDATE."""
        with self._conn() as c:
            c.execute(
                "UPDATE jobs SET findings_count = findings_count + ? WHERE id=?",
                (n, job_id),
            )

    # ---------- findings -----------------------------------------------------

    def insert_finding(
        self,
        *,
        job_id: str,
        kind: str,
        severity: str,
        message: str,
        fingerprint: str,
        file: str | None = None,
        line: int | None = None,
        code: str | None = None,
        suggested_fix: str | None = None,
        raw_json: str | None = None,
    ) -> int:
        """Insert one finding for *job_id*.

        This does not touch ``jobs.findings_count``; use
        :meth:`increment_findings_count` for that.

        Returns:
            The row id of the new finding.
        """
        with self._conn() as c:
            cur = c.execute(
                """
                INSERT INTO findings (job_id, kind, severity, file, line, code, message,
                                      suggested_fix, raw_json, created_at, fingerprint)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (job_id, kind, severity, file, line, code, message,
                 suggested_fix, raw_json, int(time.time()), fingerprint),
            )
            new_id = cur.lastrowid
        return new_id

    def list_findings(self, job_id: str) -> list[dict]:
        """Return all findings of a job in insertion (id) order."""
        with self._conn() as c:
            rows = c.execute(
                "SELECT * FROM findings WHERE job_id=? ORDER BY id",
                (job_id,),
            ).fetchall()
        return [dict(r) for r in rows]

    # ---------- async wrappers ----------------------------------------------

    async def arun(self, fn, *args, **kwargs):
        """Run a sync DB method in the default executor and await its result.

        Use from FastAPI request handlers / runner coroutines so disk I/O does
        not block the event loop, e.g. ``await db.arun(db.get_job, job_id)``.

        Args:
            fn: The callable to run, typically a bound method of this object.
            *args: Positional arguments forwarded to *fn*.
            **kwargs: Keyword arguments forwarded to *fn*.
        """
        # We are inside a coroutine, so a loop is guaranteed to be running.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: fn(*args, **kwargs))