cauldron/cauldron/db.py
Kayos 213801ca70 v0.2 foundation — Authentik OIDC + sulkta-mariadb DB + Fernet crypto
Adds the multi-user plumbing layer underneath v0.1's batch-only API:

- DB module (db.py) — PyMySQL against sulkta-mariadb, in-process migrations.
  Tables: cauldron_users, cauldron_user_mealie_tokens, cauldron_chat_log,
  schema_migrations.
- Crypto module (crypto.py) — thin Fernet wrapper. Master key in env,
  per-row encryption of stored Mealie tokens, decrypt only in-process.
- OIDC module (oidc.py) — Authlib-based Authentik integration. Issuer
  https://auth.sulkta.com/application/o/cauldron/, sub_mode=user_email,
  scopes openid+email+profile. App gated to 'Sulkta Family' group.
- Two-tier Mealie shape — system_mealie (env token, admin batch) +
  current_user_mealie() helper that loads + decrypts the calling user's
  token from DB. Per the v0.2 design (memory/spec-cauldron-v0.2.md).
- Connect flow — /connect-mealie pages walk users through minting their
  own Mealie API token and pasting it back. Validated against
  /api/users/self before encryption + storage.
- Routes — /, /login, /auth/callback, /logout, /me, /connect-mealie,
  /disconnect-mealie. v0.1 admin endpoints kept under bearer auth.
- Mealie.who_am_i() helper added.
- Auth flow uses Authentik subject (sub) as the canonical user key.

UI is minimal — connect-mealie page uses the locked palette
(forest #1f2d1f, panels #2d3a2a, meadow #6b8e5a/#88a87a, parchment text
#f0e6cc/#ddd4ba) and Cormorant Garamond serif headers. Strict palette.
The fuller dashboard / plan / list / recipes views land in subsequent
commits.

Authentik provider PK 24, client_id ZIwEugWWWZinR1KcVC9IT9hpGoTds9ps8XDDHPPN.
Group 'Sulkta Family' (pk 6d0c75e9-...) created with cobb member.

Foundation only — Abby's branded UI and the meal-plan / shopping-list
features land in subsequent v0.2 commits.
2026-04-28 19:47:47 -07:00

200 lines
6.9 KiB
Python

"""DB access + migrations against sulkta-mariadb.
Uses PyMySQL with a tiny per-request connection (no pool) — Cauldron is
LAN-only family-internal, traffic is single-digit qps. If load ever grows
swap in DBUtils.PooledDB or SQLAlchemy.
"""
from contextlib import contextmanager
from pathlib import Path
import pymysql
import pymysql.cursors
MIGRATIONS = [
# 001 — bookkeeping
"""
CREATE TABLE IF NOT EXISTS schema_migrations (
version VARCHAR(16) PRIMARY KEY,
applied_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 002 — users (Authentik subject is the PK)
"""
CREATE TABLE IF NOT EXISTS cauldron_users (
authentik_sub VARCHAR(190) PRIMARY KEY,
email VARCHAR(255) NOT NULL,
display_name VARCHAR(255),
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen DATETIME,
INDEX idx_email (email)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 003 — per-user encrypted Mealie tokens
"""
CREATE TABLE IF NOT EXISTS cauldron_user_mealie_tokens (
authentik_sub VARCHAR(190) PRIMARY KEY,
encrypted_token BLOB NOT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_validated DATETIME,
last_failure_at DATETIME,
last_failure_reason VARCHAR(500),
FOREIGN KEY (authentik_sub) REFERENCES cauldron_users(authentik_sub)
ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 004 — chat / AI run log (joins to clawdforge runs server-side)
"""
CREATE TABLE IF NOT EXISTS cauldron_chat_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
authentik_sub VARCHAR(190) NOT NULL,
ts DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
intent VARCHAR(64),
forge_duration_ms INT,
forge_model VARCHAR(64),
prompt_chars INT,
result_chars INT,
ok BOOLEAN NOT NULL DEFAULT TRUE,
error VARCHAR(500),
INDEX idx_user_ts (authentik_sub, ts)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
]
class DB:
def __init__(self, *, host: str, port: int, name: str, user: str, password: str):
self.kwargs = dict(
host=host,
port=port,
user=user,
password=password,
database=name,
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
autocommit=False,
)
@contextmanager
def conn(self):
c = pymysql.connect(**self.kwargs)
try:
yield c
c.commit()
except Exception:
c.rollback()
raise
finally:
c.close()
def migrate(self) -> list[str]:
"""Apply pending migrations. Returns list of versions applied."""
applied: list[str] = []
with self.conn() as c:
with c.cursor() as cur:
cur.execute(MIGRATIONS[0]) # bootstrap migrations table
cur.execute("SELECT version FROM schema_migrations")
done = {r["version"] for r in cur.fetchall()}
for i, sql in enumerate(MIGRATIONS, start=1):
ver = f"{i:03d}"
if ver in done:
continue
cur.execute(sql)
cur.execute(
"INSERT INTO schema_migrations (version) VALUES (%s)", (ver,)
)
applied.append(ver)
return applied
# --- user ops -----------------------------------------------------------
def upsert_user(self, *, sub: str, email: str, display_name: str | None) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""
INSERT INTO cauldron_users (authentik_sub, email, display_name, last_seen)
VALUES (%s, %s, %s, NOW())
ON DUPLICATE KEY UPDATE
email = VALUES(email),
display_name = COALESCE(VALUES(display_name), display_name),
last_seen = NOW()
""",
(sub, email, display_name),
)
def get_user(self, sub: str) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"SELECT authentik_sub, email, display_name, last_seen FROM cauldron_users WHERE authentik_sub=%s",
(sub,),
)
return cur.fetchone()
# --- mealie token ops ---------------------------------------------------
def get_user_mealie_token_blob(self, sub: str) -> bytes | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"SELECT encrypted_token FROM cauldron_user_mealie_tokens WHERE authentik_sub=%s",
(sub,),
)
row = cur.fetchone()
return row["encrypted_token"] if row else None
def set_user_mealie_token_blob(self, sub: str, blob: bytes) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""
INSERT INTO cauldron_user_mealie_tokens (authentik_sub, encrypted_token, last_validated)
VALUES (%s, %s, NOW())
ON DUPLICATE KEY UPDATE
encrypted_token = VALUES(encrypted_token),
last_validated = NOW(),
last_failure_at = NULL,
last_failure_reason = NULL
""",
(sub, blob),
)
def delete_user_mealie_token(self, sub: str) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"DELETE FROM cauldron_user_mealie_tokens WHERE authentik_sub=%s",
(sub,),
)
def mark_user_mealie_token_failure(self, sub: str, reason: str) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""
UPDATE cauldron_user_mealie_tokens
SET last_failure_at = NOW(), last_failure_reason = %s
WHERE authentik_sub = %s
""",
(reason[:500], sub),
)
# --- chat log -----------------------------------------------------------
def log_chat(
self,
*,
sub: str,
intent: str,
duration_ms: int,
model: str,
prompt_chars: int,
result_chars: int,
ok: bool,
error: str | None = None,
) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""
INSERT INTO cauldron_chat_log
(authentik_sub, intent, forge_duration_ms, forge_model,
prompt_chars, result_chars, ok, error)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""",
(sub, intent, duration_ms, model, prompt_chars, result_chars, ok, (error or "")[:500] or None),
)