cauldron/cauldron/db.py
Kayos fb94da7cce discover: per-household imports + group-aware UI
Audit-driven cleanup of the multi-household design:

- migration 035: cauldron_households.mealie_group_id (was missing — blocked
  any cross-household reasoning)
- migration 036: cauldron_recipe_index.mealie_group_id (additive; current
  per-household keying preserved, the column lets future code key by group
  without a destructive PK rebuild)
- migration 037: drop dead cauldron_food_mapping (replaced by
  cauldron_food_metadata in foundation reset Step 2 / commit f74a627
  2026-04-30; zero callers in the codebase)
- migration 038: cauldron_discover_imports(discover_id, household_id,
  mealie_slug, imported_by_sub, imported_at) — per-household provenance.
  Replaces the global cauldron_discovered_recipes.status='imported' flag
  that incorrectly hid the import button from every user once anyone
  imported.

Code:
- sync_user_household reads who_am_i()["groupId"] (and nested group.id)
  and persists it on cauldron_households.mealie_group_id
- upsert_household accepts mealie_group_id with COALESCE-on-update
- new helpers: record_discover_import, get_discover_imports_for_group,
  discover_imported_by_household, get_household
- /api/discover/import: per-household idempotency (returns cached slug if
  this household already imported), records to the new join table; no
  longer flips global status='imported'
- /api/discover/search: decorates each row with imported_in_my_group +
  imported_by_my_household + mealie_slug + imported_by_household_name
- discover.html: card render uses imported_in_my_group; shows
  "✓ in your library as <slug>" when this household imported, or
  "✓ shared from <Other Household> as <slug>" when another household in
  the group imported (recipe is group-shared via Mealie's group-scope
  read so re-importing would create a duplicate)

The imported badge now correctly surfaces "imported by another household
in your group" rather than hiding the row entirely.
2026-05-01 20:40:56 -07:00

2516 lines
108 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""DB access + migrations against sulkta-mariadb.
Uses PyMySQL with a tiny per-request connection (no pool) — Cauldron is
LAN-only and family-internal, and traffic is single-digit qps. If load ever
grows, swap in DBUtils.PooledDB or SQLAlchemy.
"""
from contextlib import contextmanager
from pathlib import Path
import pymysql
import pymysql.cursors
# Ordered schema migrations. The version recorded in schema_migrations is the
# entry's 1-based position in this list, zero-padded to three digits (see
# DB.migrate), so entries must only ever be APPENDED — inserting, removing or
# reordering an entry would shift the version of everything after it. The
# "NNN —" labels below match those positions. Every statement should be safe
# to run twice (IF [NOT] EXISTS), because DB.migrate does not catch
# "already exists" / "doesn't exist" errors.
MIGRATIONS = [
# 001 — bookkeeping
"""
CREATE TABLE IF NOT EXISTS schema_migrations (
version VARCHAR(16) PRIMARY KEY,
applied_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 002 — users (Authentik subject is the PK)
"""
CREATE TABLE IF NOT EXISTS cauldron_users (
authentik_sub VARCHAR(190) PRIMARY KEY,
email VARCHAR(255) NOT NULL,
display_name VARCHAR(255),
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_seen DATETIME,
INDEX idx_email (email)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 003 — per-user encrypted Mealie tokens
"""
CREATE TABLE IF NOT EXISTS cauldron_user_mealie_tokens (
authentik_sub VARCHAR(190) PRIMARY KEY,
encrypted_token BLOB NOT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_validated DATETIME,
last_failure_at DATETIME,
last_failure_reason VARCHAR(500),
FOREIGN KEY (authentik_sub) REFERENCES cauldron_users(authentik_sub)
ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 004 — chat / AI run log (joins to clawdforge runs server-side)
"""
CREATE TABLE IF NOT EXISTS cauldron_chat_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
authentik_sub VARCHAR(190) NOT NULL,
ts DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
intent VARCHAR(64),
forge_duration_ms INT,
forge_model VARCHAR(64),
prompt_chars INT,
result_chars INT,
ok BOOLEAN NOT NULL DEFAULT TRUE,
error VARCHAR(500),
INDEX idx_user_ts (authentik_sub, ts)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 005 — meal picks: per-user list of recipes the user wants in the next
# AI meal plan run. Pre-populated wishlist that the planner respects.
"""
CREATE TABLE IF NOT EXISTS cauldron_meal_picks (
authentik_sub VARCHAR(190) NOT NULL,
recipe_slug VARCHAR(255) NOT NULL,
recipe_name VARCHAR(500) NOT NULL,
added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (authentik_sub, recipe_slug),
INDEX idx_user_added (authentik_sub, added_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 006 — households (cached mirror of Mealie's household). Keyed by
# Mealie's UUID. Multiple cauldron users join via the same Mealie
# household to share picks/plans.
"""
CREATE TABLE IF NOT EXISTS cauldron_households (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
mealie_household_id VARCHAR(64) UNIQUE NOT NULL,
name VARCHAR(255) NOT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 007 — household membership: which cauldron users belong to which
# household, with a role. Rows cascade away with either side.
"""
CREATE TABLE IF NOT EXISTS cauldron_household_members (
household_id BIGINT NOT NULL,
authentik_sub VARCHAR(190) NOT NULL,
role VARCHAR(32) NOT NULL DEFAULT 'member',
joined_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (household_id, authentik_sub),
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE,
FOREIGN KEY (authentik_sub) REFERENCES cauldron_users(authentik_sub) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 008 — meal plans (per household per week). Lock state + race metadata.
# week_start = Monday (date) of the week.
"""
CREATE TABLE IF NOT EXISTS cauldron_meal_plans (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
household_id BIGINT NOT NULL,
week_start DATE NOT NULL,
generated_by_sub VARCHAR(190),
generated_at DATETIME,
locked_by_sub VARCHAR(190),
locked_at DATETIME,
locked_reason ENUM('user','auto') DEFAULT NULL,
UNIQUE KEY uk_household_week (household_id, week_start),
INDEX idx_locked_by (locked_by_sub),
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 009 — local recipe index for fast in-process search. Mirrors enough
# of Mealie's recipe shape to fuzzy-rank without round-tripping to
# Mealie on every keystroke. Refreshed on demand (on first /recipes
# load, after pin/unpin, every 5min, or on /me 'refresh' button).
"""
CREATE TABLE IF NOT EXISTS cauldron_recipe_index (
household_id BIGINT NOT NULL,
slug VARCHAR(255) NOT NULL,
name VARCHAR(500) NOT NULL,
description TEXT,
tags_text TEXT,
cats_text TEXT,
foods_text TEXT,
ings_text TEXT,
date_updated DATETIME,
date_added DATETIME,
last_made DATETIME,
total_time VARCHAR(64),
recipe_yield VARCHAR(255),
raw_json JSON,
indexed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (household_id, slug),
FULLTEXT KEY ft_text (name, description, tags_text, cats_text, foods_text),
INDEX idx_household (household_id),
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 010 — refresh state per household
"""
CREATE TABLE IF NOT EXISTS cauldron_recipe_index_state (
household_id BIGINT PRIMARY KEY,
last_refreshed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
recipe_count INT NOT NULL DEFAULT 0,
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 011 — canonical foods table for the unit-aware aggregator. Each row is
# ONE food (e.g. "rice", "butter", "onion") with density + unit class.
# Seeded from USDA SR Legacy via scripts/build_foods_seed.py; will be
# extended with claude-curated entries in v0.3 step 2.
"""
CREATE TABLE IF NOT EXISTS cauldron_foods (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
canonical_name VARCHAR(255) NOT NULL,
plural_name VARCHAR(255),
category VARCHAR(64),
density_g_per_ml DECIMAL(6,3),
common_size_g DECIMAL(8,2),
default_unit_class ENUM('mass','volume','count','mixed') NOT NULL DEFAULT 'mass',
usda_fdc_id INT,
usda_description VARCHAR(500),
notes JSON,
source ENUM('usda','claude','manual') NOT NULL DEFAULT 'usda',
last_updated DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_canonical (canonical_name),
INDEX idx_category (category),
INDEX idx_usda (usda_fdc_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 012 — Mealie food_id → cauldron food_id mapping per household. The
# foods dedupe step (v0.3 A2) populates this. Aggregator joins through
# this to group ingredients across recipes by canonical food.
# (Dropped again by migration 037.)
"""
CREATE TABLE IF NOT EXISTS cauldron_food_mapping (
household_id BIGINT NOT NULL,
mealie_food_id VARCHAR(64) NOT NULL,
cauldron_food_id BIGINT NOT NULL,
confidence DECIMAL(4,2) NOT NULL DEFAULT 1.00,
mapped_by ENUM('exact','fuzzy','claude','manual') NOT NULL DEFAULT 'fuzzy',
mapped_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (household_id, mealie_food_id),
INDEX idx_canonical (cauldron_food_id),
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE,
FOREIGN KEY (cauldron_food_id) REFERENCES cauldron_foods(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 013 — AI-generated meal plan slots. One row per (plan, day). Created
# when a household member triggers /api/plan/generate. picker_subs JSON
# holds the authentik_subs of household members who pinned this slot's
# recipe (empty list if AI-chosen). reason is the AI's user-facing
# rationale. notes is reserved for future swap/edit history.
"""
CREATE TABLE IF NOT EXISTS cauldron_meal_plan_slots (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
plan_id BIGINT NOT NULL,
day VARCHAR(10) NOT NULL,
recipe_slug VARCHAR(255) NOT NULL,
recipe_name VARCHAR(500) NOT NULL,
source ENUM('mealie','pick') NOT NULL DEFAULT 'mealie',
picker_subs JSON,
reason VARCHAR(500),
notes JSON,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_plan_day (plan_id, day),
INDEX idx_plan (plan_id),
FOREIGN KEY (plan_id) REFERENCES cauldron_meal_plans(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 014 — pick-points ledger. 1pt awarded when a member's pick lands in
# a generated plan ('pick_used'). Reserved reasons for v0.4: first-to-
# lock + streak bonuses. Joins to households + plans + users so a row
# disappears cleanly if any of them are removed.
# (Dropped again by migration 020.)
"""
CREATE TABLE IF NOT EXISTS cauldron_pick_points (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
household_id BIGINT NOT NULL,
plan_id BIGINT NOT NULL,
authentik_sub VARCHAR(190) NOT NULL,
points INT NOT NULL,
reason ENUM('pick_used','first_to_lock','streak_bonus') NOT NULL,
awarded_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX idx_household_user (household_id, authentik_sub),
INDEX idx_plan (plan_id),
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE,
FOREIGN KEY (plan_id) REFERENCES cauldron_meal_plans(id) ON DELETE CASCADE,
FOREIGN KEY (authentik_sub) REFERENCES cauldron_users(authentik_sub) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 015 — bulk sterilizer jobs. One row per bulk-parse run kicked off by a
# household member. State machine:
# running → walking recipes, calling Sterilizer.preview_recipe
# review → walk done, awaiting human approve/reject in /sterilize UI
# applying → applying approved proposals back to Mealie
# done → complete
# failed → unrecoverable error or stuck-job recovery marked it
"""
CREATE TABLE IF NOT EXISTS cauldron_sterilize_jobs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
household_id BIGINT NOT NULL,
started_by_sub VARCHAR(190) NOT NULL,
started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_progress_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
finished_at DATETIME,
total_recipes INT NOT NULL DEFAULT 0,
processed_count INT NOT NULL DEFAULT 0,
skipped_count INT NOT NULL DEFAULT 0,
error_count INT NOT NULL DEFAULT 0,
current_slug VARCHAR(255),
last_error VARCHAR(500),
state ENUM('running','review','applying','done','failed','cancelled')
NOT NULL DEFAULT 'running',
INDEX idx_household_state (household_id, state),
INDEX idx_started_by (started_by_sub),
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE,
FOREIGN KEY (started_by_sub) REFERENCES cauldron_users(authentik_sub) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 016 — per-recipe proposals for a sterilize job. proposal_json is the
# full Sterilizer.preview_recipe() output. approved=NULL means awaiting
# review; TRUE = will apply; FALSE = skipped on apply. apply_error
# captures any failure when applying that specific recipe.
"""
CREATE TABLE IF NOT EXISTS cauldron_sterilize_proposals (
job_id BIGINT NOT NULL,
recipe_slug VARCHAR(255) NOT NULL,
recipe_name VARCHAR(500),
ingredient_count INT NOT NULL DEFAULT 0,
proposal_json JSON,
preview_error VARCHAR(500),
approved BOOLEAN,
applied_at DATETIME,
apply_error VARCHAR(500),
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (job_id, recipe_slug),
INDEX idx_approved (job_id, approved),
FOREIGN KEY (job_id) REFERENCES cauldron_sterilize_jobs(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 017 — Per-Mealie-food cooking metadata, keyed by Mealie's UUID food.id.
# Replaces the old parallel name catalog (cauldron_foods) — Mealie owns
# canonical food names, cauldron only owns density + unit class + common
# size for shopping list aggregation. Populated lazily by the /list
# foods_lookup path on the first sighting of each food (claude fallback).
# Carried-over USDA densities will get re-keyed to mealie_food_id during
# the Step 2 backfill, then cauldron_foods can be dropped (Step 4).
"""
CREATE TABLE IF NOT EXISTS cauldron_food_metadata (
mealie_food_id VARCHAR(64) PRIMARY KEY,
food_name VARCHAR(255),
density_g_per_ml DECIMAL(6,3),
common_size_g DECIMAL(8,2),
default_unit_class ENUM('mass','volume','count','mixed') NOT NULL DEFAULT 'mass',
category VARCHAR(64),
source ENUM('seed','claude','manual') NOT NULL DEFAULT 'claude',
notes JSON,
last_updated DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_food_name (food_name),
INDEX idx_category (category)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 018 — Foods consolidator jobs. Mirrors the sterilize job pattern but
# for the foods table itself: scan Mealie household foods, cluster by
# similarity, ask Sonnet which clusters are dupes, apply via Mealie's
# PUT /api/foods/merge endpoint. One job at a time per household.
"""
CREATE TABLE IF NOT EXISTS cauldron_consolidate_jobs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
household_id BIGINT NOT NULL,
started_by_sub VARCHAR(190) NOT NULL,
started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_progress_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
finished_at DATETIME,
total_clusters INT NOT NULL DEFAULT 0,
processed_count INT NOT NULL DEFAULT 0,
merged_count INT NOT NULL DEFAULT 0,
error_count INT NOT NULL DEFAULT 0,
current_cluster VARCHAR(255),
last_error VARCHAR(500),
state ENUM('running','review','applying','done','failed','cancelled')
NOT NULL DEFAULT 'running',
INDEX idx_household_state (household_id, state),
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE,
FOREIGN KEY (started_by_sub) REFERENCES cauldron_users(authentik_sub) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 019 — Per-cluster proposal. cluster_json holds the list of Mealie
# food IDs+names in the cluster; sonnet_decision stores
# {merge: bool, canonical_name: <picked>, canonical_food_id: <id>,
# discard_food_ids: [...]}.
"""
CREATE TABLE IF NOT EXISTS cauldron_consolidate_proposals (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
job_id BIGINT NOT NULL,
cluster_key VARCHAR(255) NOT NULL,
cluster_size INT NOT NULL DEFAULT 0,
cluster_json JSON,
sonnet_decision JSON,
approved BOOLEAN,
applied_at DATETIME,
apply_error VARCHAR(500),
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_job_cluster (job_id, cluster_key),
INDEX idx_approved (job_id, approved),
FOREIGN KEY (job_id) REFERENCES cauldron_consolidate_jobs(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 020 — Drop the game-system pick-points table. Stripped from /plan
# 2026-04-30; tables and methods kept around as dead code for a few
# days to let it bake. Now that the simpler base flow is proven,
# the ledger comes out. Recovery path: revert this migration's number
# and re-add the table from db.py history; the points data was never
# widely populated anyway.
"""
DROP TABLE IF EXISTS cauldron_pick_points
""",
# 021 — Recipe-dedupe bulk job state (mirrors the consolidate pattern
# but for recipes themselves — name + ingredient similarity → Sonnet
# decides → user-confirms → DELETE via Mealie API).
"""
CREATE TABLE IF NOT EXISTS cauldron_recipe_dedupe_jobs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
household_id BIGINT NOT NULL,
started_by_sub VARCHAR(190) NOT NULL,
started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_progress_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
finished_at DATETIME,
total_clusters INT NOT NULL DEFAULT 0,
processed_count INT NOT NULL DEFAULT 0,
deleted_count INT NOT NULL DEFAULT 0,
error_count INT NOT NULL DEFAULT 0,
current_cluster VARCHAR(255),
last_error VARCHAR(500),
state ENUM('running','review','applying','done','failed','cancelled')
NOT NULL DEFAULT 'running',
INDEX idx_household_state (household_id, state),
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE,
FOREIGN KEY (started_by_sub) REFERENCES cauldron_users(authentik_sub) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 022 — Per-cluster recipe-dedupe proposal. cluster_json holds the
# full recipe summaries (slug+name+ingredients+source); sonnet_decision
# = {duplicates: bool, canonical_slug, delete_slugs: [...], reason}.
"""
CREATE TABLE IF NOT EXISTS cauldron_recipe_dedupe_proposals (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
job_id BIGINT NOT NULL,
cluster_key VARCHAR(255) NOT NULL,
cluster_size INT NOT NULL DEFAULT 0,
cluster_json JSON,
sonnet_decision JSON,
approved BOOLEAN,
applied_at DATETIME,
apply_error VARCHAR(500),
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_job_cluster (job_id, cluster_key),
INDEX idx_approved (job_id, approved),
FOREIGN KEY (job_id) REFERENCES cauldron_recipe_dedupe_jobs(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 023 — Per-week diet/vibe preference on the plan generator. Free-form
# text the user types before generating ("high protein low carb",
# "carb load this week", "light recovery, no fish") that biases
# Sonnet's slot picks. Picks still take precedence — preference only
# influences AI-chosen slots when the pool is bigger than picks.
"""
ALTER TABLE cauldron_meal_plans
ADD COLUMN IF NOT EXISTS preference_prompt VARCHAR(1000)
""",
# 024 — Per-recipe AI-generated metadata. Sonnet looks at the full
# recipe (name, description, ingredients, steps, yields) and returns
# a structured blob: tags, cuisine, complexity, estimated_minutes,
# meal_type, primary_protein, primary_carb, veg_forward, comfort_tier,
# season_fit, summary, best_for. Plan generator uses this so "high
# protein week" becomes a real query, not just a vibe-prompt.
# enrich_version lets us bump the prompt and re-enrich without
# losing the prior data.
"""
CREATE TABLE IF NOT EXISTS cauldron_recipe_meta (
household_id BIGINT NOT NULL,
recipe_slug VARCHAR(255) NOT NULL,
meta_json JSON,
enrich_version INT NOT NULL DEFAULT 1,
last_enriched_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (household_id, recipe_slug),
INDEX idx_household (household_id),
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 025 — Recipe-enrichment bulk job state. Runs through every household
# recipe, calls Sonnet, persists meta. No apply/review step — meta is
# purely additive so we just write it. Same daemon-thread runner +
# cancel + stuck-recovery pattern.
"""
CREATE TABLE IF NOT EXISTS cauldron_enrich_jobs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
household_id BIGINT NOT NULL,
started_by_sub VARCHAR(190) NOT NULL,
started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_progress_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
finished_at DATETIME,
total_recipes INT NOT NULL DEFAULT 0,
enriched_count INT NOT NULL DEFAULT 0,
skipped_count INT NOT NULL DEFAULT 0,
error_count INT NOT NULL DEFAULT 0,
current_slug VARCHAR(255),
last_error VARCHAR(500),
state ENUM('running','done','failed','cancelled')
NOT NULL DEFAULT 'running',
INDEX idx_household_state (household_id, state),
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE,
FOREIGN KEY (started_by_sub) REFERENCES cauldron_users(authentik_sub) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 026 — Per-week structured planning constraints (first of two columns;
# exclusions_json follows in 027). daily_targets_json
# holds the macro budget the household wants to hit per day
# ({calories, protein_g, carbs_g, fat_g} — Sonnet sums per slot and
# divides by 7). exclusions_json is a list of contains.* allergen
# keys to exclude entirely from the AI-chosen slots
# (["dairy", "shellfish"]). Both nullable; the existing free-form
# preference_prompt is kept for vibe-only weeks where macros aren't
# explicit.
"""
ALTER TABLE cauldron_meal_plans
ADD COLUMN IF NOT EXISTS daily_targets_json JSON
""",
# 027 — exclusions_json column; semantics described under 026.
"""
ALTER TABLE cauldron_meal_plans
ADD COLUMN IF NOT EXISTS exclusions_json JSON
""",
# 028 — Multi-meal planning. Slots gain meal_type so a single plan
# row can hold breakfast + lunch + dinner across the same 7 days.
# Default 'dinner' so existing rows keep their semantics.
"""
ALTER TABLE cauldron_meal_plan_slots
ADD COLUMN IF NOT EXISTS meal_type
ENUM('breakfast','lunch','dinner','snack','dessert','side')
NOT NULL DEFAULT 'dinner'
""",
# 029 — Old unique key was (plan_id, day) — now needs to include
# meal_type so a Monday can have breakfast AND lunch AND dinner.
# Drop here, re-add in 030. IF EXISTS makes the drop re-runnable:
# DB.migrate does not catch "index doesn't exist", so without it a
# second worker racing through boot (or a re-run after a partial apply)
# would abort on this statement.
"""
ALTER TABLE cauldron_meal_plan_slots
DROP INDEX IF EXISTS uk_plan_day
""",
# 030 — Replacement unique key including meal_type. CREATE ... IF NOT
# EXISTS for the same re-runnability reason as 029.
"""
CREATE UNIQUE INDEX IF NOT EXISTS uk_plan_day_meal
ON cauldron_meal_plan_slots (plan_id, day, meal_type)
""",
# 031 — Per-plan selection of which meal types to generate. List of
# strings: ["breakfast","lunch","dinner"]. Default dinner-only so
# generation is back-compat for users who haven't opted in.
"""
ALTER TABLE cauldron_meal_plans
ADD COLUMN IF NOT EXISTS meal_types_json JSON
""",
# 032 — Hecate's narrative weekly reading. Stored alongside the plan
# so it persists with the slots and can be re-rendered. ~2KB ceiling.
"""
ALTER TABLE cauldron_meal_plans
ADD COLUMN IF NOT EXISTS hecate_reading TEXT
""",
# 033 — Discover v0.1 corpus: recipes scraped from external sources
# (allrecipes, BBC Good Food, smitten kitchen, etc) before any household
# has imported them. status walks raw → enriched → imported|rejected.
# Once a household clicks "import to Mealie", Mealie's recipe-from-URL
# endpoint pulls the recipe into THAT household and our sterilize+enrich
# pipelines run on the new Mealie row; the discover row stays as a
# provenance breadcrumb. source_url capped at 768 to stay under InnoDB's
# 3072-byte unique-key limit (768 × 4-byte utf8mb4 chars = 3072).
"""
CREATE TABLE IF NOT EXISTS cauldron_discovered_recipes (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
slug VARCHAR(255),
source_url VARCHAR(768) NOT NULL,
name VARCHAR(500),
description TEXT,
image_url VARCHAR(1024),
scraped_json JSON,
meta_json JSON,
enrich_version INT NOT NULL DEFAULT 0,
status ENUM('raw','enriched','imported','rejected')
NOT NULL DEFAULT 'raw',
scraped_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_action_at DATETIME,
UNIQUE KEY uk_source_url (source_url),
INDEX idx_status_scraped (status, scraped_at),
INDEX idx_slug (slug),
FULLTEXT KEY ft_search (name, description)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 034 — Discover scrape-job state. Same daemon-thread runner pattern as
# sterilize/enrich/foods-consolidate jobs: one job at a time per source
# seed, cancel-respect via state poll, finalize updates only if state
# is non-terminal. source_seed is e.g. 'allrecipes.com'.
"""
CREATE TABLE IF NOT EXISTS cauldron_discover_jobs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
started_by_sub VARCHAR(190),
started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_progress_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
finished_at DATETIME,
source_seed VARCHAR(255),
pages_scraped INT NOT NULL DEFAULT 0,
recipes_added INT NOT NULL DEFAULT 0,
skipped_count INT NOT NULL DEFAULT 0,
error_count INT NOT NULL DEFAULT 0,
last_error VARCHAR(500),
state ENUM('running','done','failed','cancelled')
NOT NULL DEFAULT 'running',
INDEX idx_state (state)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 035 — Track the Mealie GROUP a household belongs to. Until now cauldron
# only stored mealie_household_id, but Mealie's recipe / food / unit /
# category / tag / tool tables are GROUP-scoped on read (cauldron audit
# 2026-05-01). To reason about "is this discover import already in my
# group's library" we need the group id. Backfilled at boot via
# sync_user_household reading who_am_i()["groupId"].
"""
ALTER TABLE cauldron_households
ADD COLUMN IF NOT EXISTS mealie_group_id VARCHAR(64) NULL,
ADD INDEX IF NOT EXISTS idx_mealie_group_id (mealie_group_id)
""",
# 036 — Same column on cauldron_recipe_index. The index is per-household
# today (2x storage for a 2-household group) but the underlying recipes
# are group-shared. Adding this column doesn't change current write
# behavior; it lets future code de-duplicate the cache keyed by group.
"""
ALTER TABLE cauldron_recipe_index
ADD COLUMN IF NOT EXISTS mealie_group_id VARCHAR(64) NULL,
ADD INDEX IF NOT EXISTS idx_recipe_group_id (mealie_group_id)
""",
# 037 — Drop the dead cauldron_food_mapping table. Foundation-reset
# Step 2 (commit f74a627 2026-04-30) replaced it with cauldron_food_metadata
# which is keyed by Mealie's food UUID and is therefore already group-
# implicit. cauldron_food_mapping has zero callers in the codebase as of
# 2026-05-01.
"""
DROP TABLE IF EXISTS cauldron_food_mapping
""",
# 038 — Per-household provenance for /discover imports. Replaces the
# global `cauldron_discovered_recipes.status='imported'` flag, which
# incorrectly hid the import-button from every household once any user
# imported. The new model: a discover row stays at status='enriched'
# forever (or 'rejected' if explicitly skipped); imports are recorded
# per-household here. The /discover search endpoint joins this against
# the user's group's households so the UI can show "✓ already in your
# group's library" without forcing every household to re-import.
"""
CREATE TABLE IF NOT EXISTS cauldron_discover_imports (
discover_id BIGINT NOT NULL,
household_id BIGINT NOT NULL,
mealie_slug VARCHAR(255) NOT NULL,
imported_by_sub VARCHAR(190),
imported_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (discover_id, household_id),
INDEX idx_household (household_id),
INDEX idx_slug (mealie_slug),
FOREIGN KEY (discover_id) REFERENCES cauldron_discovered_recipes(id) ON DELETE CASCADE,
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
]
class DB:
def __init__(self, *, host: str, port: int, name: str, user: str, password: str):
    """Remember the PyMySQL connection settings; nothing is connected here.

    All arguments are keyword-only. ``name`` is the database (schema) name
    and is handed to PyMySQL as ``database``.
    """
    # conn() splats this dict into pymysql.connect() on every call.
    self.kwargs = {
        "host": host,
        "port": port,
        "user": user,
        "password": password,
        "database": name,
        "charset": "utf8mb4",
        # Rows come back as dicts keyed by column name.
        "cursorclass": pymysql.cursors.DictCursor,
        # Transactions are committed / rolled back explicitly by conn().
        "autocommit": False,
    }
@contextmanager
def conn(self):
    """Context manager yielding a brand-new PyMySQL connection.

    The body of the ``with`` runs as one transaction: it is committed when
    the body finishes normally, rolled back (and the exception re-raised)
    when the body — or the commit itself — raises an ``Exception``, and
    the connection is closed in every case.
    """
    connection = pymysql.connect(**self.kwargs)
    try:
        yield connection
        # Reached only when the with-body completed without raising.
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        connection.close()
def migrate(self) -> list[str]:
    """Run every migration not yet recorded in ``schema_migrations``.

    A migration's version is its 1-based position in ``MIGRATIONS``,
    zero-padded to three digits ("001", "002", ...). Returns the versions
    executed by this call, in order.
    """
    newly_applied: list[str] = []
    with self.conn() as connection:
        with connection.cursor() as cursor:
            # The bookkeeping table must exist before it can be queried.
            cursor.execute(MIGRATIONS[0])
            cursor.execute("SELECT version FROM schema_migrations")
            already_done = {row["version"] for row in cursor.fetchall()}
            for position, statement in enumerate(MIGRATIONS, start=1):
                version = f"{position:03d}"
                if version not in already_done:
                    cursor.execute(statement)
                    # IGNORE tolerates the multi-worker boot race where two
                    # gunicorn workers both bootstrap an empty migrations
                    # table and try to record the same version.
                    cursor.execute(
                        "INSERT IGNORE INTO schema_migrations (version) VALUES (%s)",
                        (version,),
                    )
                    newly_applied.append(version)
    return newly_applied
# --- user ops -----------------------------------------------------------
def upsert_user(self, *, sub: str, email: str, display_name: str | None) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""
INSERT INTO cauldron_users (authentik_sub, email, display_name, last_seen)
VALUES (%s, %s, %s, NOW())
ON DUPLICATE KEY UPDATE
email = VALUES(email),
display_name = COALESCE(VALUES(display_name), display_name),
last_seen = NOW()
""",
(sub, email, display_name),
)
def get_user(self, sub: str) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"SELECT authentik_sub, email, display_name, last_seen FROM cauldron_users WHERE authentik_sub=%s",
(sub,),
)
return cur.fetchone()
# --- mealie token ops ---------------------------------------------------
def first_usable_mealie_token(self) -> dict | None:
"""Return the freshest (last_validated DESC) per-user encrypted Mealie
token row. Used at boot as a fallback when the system MEALIE_API_TOKEN
env is dead — group admins can list foods, so any user's token works
for the backfill scan."""
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT authentik_sub, encrypted_token
FROM cauldron_user_mealie_tokens
WHERE last_failure_at IS NULL OR
last_validated > COALESCE(last_failure_at, '1970-01-01')
ORDER BY last_validated DESC, created_at DESC
LIMIT 1"""
)
row = cur.fetchone()
return dict(row) if row else None
def get_user_mealie_token_blob(self, sub: str) -> bytes | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"SELECT encrypted_token FROM cauldron_user_mealie_tokens WHERE authentik_sub=%s",
(sub,),
)
row = cur.fetchone()
return row["encrypted_token"] if row else None
def set_user_mealie_token_blob(self, sub: str, blob: bytes) -> None:
    """Store (or replace) the encrypted Mealie token for ``sub``.

    Either way the row is stamped as validated now; replacing an existing
    token also clears any recorded failure.
    """
    statement = """
INSERT INTO cauldron_user_mealie_tokens (authentik_sub, encrypted_token, last_validated)
VALUES (%s, %s, NOW())
ON DUPLICATE KEY UPDATE
encrypted_token = VALUES(encrypted_token),
last_validated = NOW(),
last_failure_at = NULL,
last_failure_reason = NULL
"""
    with self.conn() as connection, connection.cursor() as cursor:
        cursor.execute(statement, (sub, blob))
def delete_user_mealie_token(self, sub: str) -> None:
    """Remove the stored Mealie token row for ``sub`` (no-op if absent)."""
    statement = "DELETE FROM cauldron_user_mealie_tokens WHERE authentik_sub=%s"
    with self.conn() as connection, connection.cursor() as cursor:
        cursor.execute(statement, (sub,))
def mark_user_mealie_token_failure(self, sub: str, reason: str) -> None:
    """Stamp the user's token row with a failure time and reason.

    ``reason`` is cut to 500 characters, the width of the
    ``last_failure_reason`` column.
    """
    statement = """
UPDATE cauldron_user_mealie_tokens
SET last_failure_at = NOW(), last_failure_reason = %s
WHERE authentik_sub = %s
"""
    truncated_reason = reason[:500]
    with self.conn() as connection, connection.cursor() as cursor:
        cursor.execute(statement, (truncated_reason, sub))
# --- households ---------------------------------------------------------
def upsert_household(
self,
*,
mealie_household_id: str,
name: str,
mealie_group_id: str | None = None,
) -> int:
"""Create or update a household record. Returns local PK (id).
`mealie_group_id` is optional for back-compat with older callers but
should be supplied by sync_user_household so cross-household reasoning
works."""
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""
INSERT INTO cauldron_households (mealie_household_id, name, mealie_group_id)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE
name = VALUES(name),
mealie_group_id = COALESCE(VALUES(mealie_group_id), mealie_group_id),
id = LAST_INSERT_ID(id)
""",
(mealie_household_id, name, mealie_group_id),
)
return cur.lastrowid
def add_household_member(self, household_id: int, sub: str, role: str = "member") -> None:
    """Attach user `sub` to a household with the given role. Uses INSERT
    IGNORE, so an already-existing membership row is left untouched."""
    insert_sql = """
        INSERT IGNORE INTO cauldron_household_members
        (household_id, authentik_sub, role)
        VALUES (%s, %s, %s)
        """
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(insert_sql, (household_id, sub, role))
def get_user_household_id(self, sub: str) -> int | None:
    """Return one household_id that `sub` is a member of (LIMIT 1), or
    None when the user has no membership row."""
    query = "SELECT household_id FROM cauldron_household_members WHERE authentik_sub=%s LIMIT 1"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (sub,))
        membership = cursor.fetchone()
        if not membership:
            return None
        return membership["household_id"]
def get_household(self, household_id: int) -> dict | None:
    """Look up one household by its local id. The row carries id,
    mealie_household_id, mealie_group_id, name and created_at; the raw
    fetchone() result is returned (None when there is no such row)."""
    query = """SELECT id, mealie_household_id, mealie_group_id, name, created_at
        FROM cauldron_households WHERE id=%s"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (household_id,))
        household = cursor.fetchone()
        return household
def list_household_member_subs(self, household_id: int) -> list[str]:
    """Return the authentik_sub of every member row of the household."""
    query = "SELECT authentik_sub FROM cauldron_household_members WHERE household_id=%s"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (household_id,))
        subs = []
        for member in cursor.fetchall():
            subs.append(member["authentik_sub"])
        return subs
# --- meal plans (per household per week) -------------------------------
def get_or_create_plan(self, household_id: int, week_start) -> dict:
    """Return the plan row for (household_id, week_start), inserting an
    empty row first when none exists yet. week_start is a date (Monday).
    The insert is INSERT IGNORE, so an existing plan is never modified."""
    plan_key = (household_id, week_start)
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(
            """
            INSERT IGNORE INTO cauldron_meal_plans (household_id, week_start)
            VALUES (%s, %s)
            """,
            plan_key,
        )
        cursor.execute(
            "SELECT * FROM cauldron_meal_plans WHERE household_id=%s AND week_start=%s",
            plan_key,
        )
        plan_row = cursor.fetchone()
        return dict(plan_row)
def set_plan_preference(self, plan_id: int, preference: str | None) -> None:
"""Persist the per-week diet/vibe preference. Empty/whitespace-only
strings are stored as NULL so the prompt only includes the section
when the user actually filled it in."""
clean = (preference or "").strip()[:1000] or None
with self.conn() as c, c.cursor() as cur:
cur.execute(
"UPDATE cauldron_meal_plans SET preference_prompt=%s WHERE id=%s",
(clean, plan_id),
)
def set_plan_hecate_reading(self, plan_id: int, reading: str) -> None:
    """Store Hecate's narrative weekly reading on a plan so a later
    re-render of the plan view can show it. The text is stripped and
    capped at 2000 characters; empty/whitespace-only input becomes NULL."""
    stripped = (reading or "").strip()
    stored = stripped[:2000] if stripped else None
    update_sql = "UPDATE cauldron_meal_plans SET hecate_reading=%s WHERE id=%s"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(update_sql, (stored, plan_id))
def set_plan_meal_types(self, plan_id: int, meal_types: list | None) -> None:
"""Persist which meal types this plan should generate. None / empty
list defaults to ['dinner'] at generation time."""
import json as _json
clean: list | None = None
if isinstance(meal_types, list) and meal_types:
allowed = set(self.MEAL_ORDER)
ce = []
for m in meal_types:
if isinstance(m, str) and m.strip().lower() in allowed:
ce.append(m.strip().lower())
# Preserve order: breakfast, lunch, dinner, snack, dessert, side
order = {m: i for i, m in enumerate(self.MEAL_ORDER)}
ce = sorted(set(ce), key=lambda m: order[m])
clean = ce or None
with self.conn() as c, c.cursor() as cur:
cur.execute(
"UPDATE cauldron_meal_plans SET meal_types_json=%s WHERE id=%s",
(_json.dumps(clean) if clean else None, plan_id),
)
def set_plan_targets_and_exclusions(
self,
plan_id: int,
*,
targets: dict | None,
exclusions: list | None,
) -> None:
"""Persist the per-week macro targets ({calories, protein_g, carbs_g,
fat_g} — null fields mean 'no target') and allergen exclusion list
(subset of contains.* keys). Either or both can be None."""
import json as _json
# Normalize: drop empty/zero values so the prompt only renders the
# ones that matter
clean_targets: dict | None = None
if isinstance(targets, dict):
ct = {}
for k in ("calories", "protein_g", "carbs_g", "fat_g"):
v = targets.get(k)
try:
n = int(v) if v not in (None, "", 0) else 0
if n > 0:
ct[k] = n
except (TypeError, ValueError):
continue
clean_targets = ct or None
clean_exclusions: list | None = None
if isinstance(exclusions, list):
allowed = {"dairy", "gluten", "nuts", "peanuts", "eggs",
"shellfish", "fish", "soy", "sesame", "pork"}
ce = sorted({
str(x).strip().lower() for x in exclusions
if isinstance(x, str) and x.strip().lower() in allowed
})
clean_exclusions = ce or None
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""UPDATE cauldron_meal_plans
SET daily_targets_json=%s, exclusions_json=%s
WHERE id=%s""",
(
_json.dumps(clean_targets) if clean_targets else None,
_json.dumps(clean_exclusions) if clean_exclusions else None,
plan_id,
),
)
def reset_plan(self, plan_id: int) -> bool:
    """Wipe an UNLOCKED plan back to blank slate: delete slots, clear
    generated_at + generated_by_sub, clear preference_prompt +
    daily_targets_json + exclusions_json. Returns True if the row was
    eligible (unlocked) and reset; False if locked (untouched —
    locked plans are immutable history) or if no such plan exists."""
    with self.conn() as c, c.cursor() as cur:
        cur.execute(
            """UPDATE cauldron_meal_plans
            SET generated_at=NULL, generated_by_sub=NULL,
            preference_prompt=NULL, daily_targets_json=NULL,
            exclusions_json=NULL
            WHERE id=%s AND locked_at IS NULL""",
            (plan_id,),
        )
        if cur.rowcount == 0:
            # MySQL reports the number of rows actually CHANGED unless the
            # connection sets CLIENT_FOUND_ROWS, so 0 is also what we get
            # for an unlocked plan whose columns were already NULL. Look
            # at the row to tell that apart from "locked" / "missing".
            cur.execute(
                "SELECT locked_at FROM cauldron_meal_plans WHERE id=%s",
                (plan_id,),
            )
            row = cur.fetchone()
            if row is None or row["locked_at"] is not None:
                return False
        cur.execute(
            "DELETE FROM cauldron_meal_plan_slots WHERE plan_id=%s",
            (plan_id,),
        )
        return True
def lock_plan(self, plan_id: int, *, sub: str, reason: str = "user") -> dict:
    """Lock a plan unless it already is locked, then return the plan row
    as it now stands. The UPDATE only touches rows whose locked_at is
    NULL, so an existing lock (who / when / why) is never overwritten."""
    lock_sql = """
        UPDATE cauldron_meal_plans
        SET locked_by_sub = %s, locked_at = NOW(), locked_reason = %s
        WHERE id = %s AND locked_at IS NULL
        """
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(lock_sql, (sub, reason, plan_id))
        cursor.execute("SELECT * FROM cauldron_meal_plans WHERE id=%s", (plan_id,))
        current = cursor.fetchone()
        return dict(current)
def auto_lock_past_unlocked_plans(self, household_id: int, before_date) -> int:
    """Lock every still-unlocked plan of the household whose week_start is
    earlier than `before_date`, tagging it locked_reason='auto'. Returns
    the cursor's rowcount for the UPDATE."""
    auto_lock_sql = """
        UPDATE cauldron_meal_plans
        SET locked_at = NOW(), locked_reason = 'auto'
        WHERE household_id = %s AND week_start < %s AND locked_at IS NULL
        """
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(auto_lock_sql, (household_id, before_date))
        locked_count = cursor.rowcount
        return locked_count
# --- plan slots (v0.3 A4) ----------------------------------------------
# Day order is stable Mon..Sun. Used everywhere we need to render slots
# in calendar order.
PLAN_DAYS = ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")
# Meal types in display order. This tuple is also the allow-list of
# meal_type values: set_plan_meal_types drops anything not listed here and
# save_plan_slots falls back to "dinner" for unknown values.
MEAL_ORDER = ("breakfast", "lunch", "dinner", "snack", "dessert", "side")
def list_plan_slots(self, plan_id: int) -> list[dict]:
    """All slots for a plan, ordered Mon..Sun (PLAN_DAYS) then by
    MEAL_ORDER; unknown days / meal types sort last.
    picker_subs is always returned as a list: it is decoded from JSON when
    stored as text, and anything that is missing, NULL, undecodable or not
    a JSON array becomes []. notes is decoded from JSON when stored as
    text (None if it cannot be decoded) and otherwise left as-is."""
    import json as _json

    def _loads_or_none(raw):
        # JSONDecodeError and UnicodeDecodeError are both ValueError.
        try:
            return _json.loads(raw)
        except ValueError:
            return None

    with self.conn() as c, c.cursor() as cur:
        cur.execute(
            """
            SELECT id, plan_id, day, meal_type, recipe_slug, recipe_name, source,
            picker_subs, reason, notes, created_at
            FROM cauldron_meal_plan_slots
            WHERE plan_id = %s
            """,
            (plan_id,),
        )
        rows = [dict(r) for r in cur.fetchall()]
    for r in rows:
        ps = r.get("picker_subs")
        if isinstance(ps, (str, bytes, bytearray)):
            ps = _loads_or_none(ps)
        # Callers iterate this value, so never hand back None / a scalar
        # (e.g. a stored JSON `null`).
        r["picker_subs"] = ps if isinstance(ps, list) else []
        n = r.get("notes")
        if isinstance(n, (str, bytes, bytearray)):
            r["notes"] = _loads_or_none(n)
    day_order = {d: i for i, d in enumerate(self.PLAN_DAYS)}
    meal_order = {m: i for i, m in enumerate(self.MEAL_ORDER)}
    rows.sort(key=lambda r: (
        day_order.get((r.get("day") or "").lower(), 99),
        meal_order.get((r.get("meal_type") or "dinner").lower(), 99),
    ))
    return rows
def save_plan_slots(self, plan_id: int, slots: list[dict]) -> int:
    """Write every slot with INSERT IGNORE and return the summed rowcount,
    i.e. how many rows were actually inserted — callers can use this to
    detect race contention (zero rows = someone else already saved this
    plan). meal_type is lower-cased and falls back to 'dinner' when
    missing or not in MEAL_ORDER (back-compat). An empty/None `slots`
    returns 0 without touching the database."""
    import json as _json
    if not slots:
        return 0
    insert_sql = """
        INSERT IGNORE INTO cauldron_meal_plan_slots
        (plan_id, day, meal_type, recipe_slug, recipe_name,
        source, picker_subs, reason, notes)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
    total_inserted = 0
    with self.conn() as db, db.cursor() as cursor:
        for slot in slots:
            meal_type = (slot.get("meal_type") or "dinner").lower()
            if meal_type not in self.MEAL_ORDER:
                meal_type = "dinner"
            day = (slot.get("day") or "").lower()[:10]
            slug = slot["recipe_slug"][:255]
            display_name = (slot.get("recipe_name") or slot["recipe_slug"])[:500]
            # Explicit source wins; otherwise slots that carry pickers
            # count as "pick" and the rest as "mealie".
            source = slot.get("source") or ("pick" if slot.get("picker_subs") else "mealie")
            pickers_json = _json.dumps(slot.get("picker_subs") or [])
            reason = (slot.get("reason") or "")[:500] or None
            if slot.get("notes") is not None:
                notes_json = _json.dumps(slot["notes"])
            else:
                notes_json = None
            cursor.execute(
                insert_sql,
                (plan_id, day, meal_type, slug, display_name,
                 source, pickers_json, reason, notes_json),
            )
            total_inserted += cursor.rowcount
    return total_inserted
def delete_plan_slots(self, plan_id: int) -> int:
    """Delete all slots of a plan (used by re-roll). Returns the DELETE's
    rowcount."""
    delete_sql = "DELETE FROM cauldron_meal_plan_slots WHERE plan_id=%s"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(delete_sql, (plan_id,))
        removed = cursor.rowcount
        return removed
def mark_plan_generated(self, plan_id: int, sub: str) -> dict:
    """Stamp generated_by_sub + generated_at on a plan, but only when
    generated_at is still NULL, then return the plan row as it now
    stands. Calling it again leaves the first stamp in place."""
    stamp_sql = """
        UPDATE cauldron_meal_plans
        SET generated_by_sub = %s,
        generated_at = NOW()
        WHERE id = %s AND generated_at IS NULL
        """
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(stamp_sql, (sub, plan_id))
        cursor.execute("SELECT * FROM cauldron_meal_plans WHERE id=%s", (plan_id,))
        current = cursor.fetchone()
        return dict(current)
def clear_plan_generated(self, plan_id: int) -> None:
    """Re-roll path: null out generated_by_sub and generated_at so the
    next generate writes fresh metadata. Applies regardless of lock
    state (the UPDATE filters on id only)."""
    clear_sql = """
        UPDATE cauldron_meal_plans
        SET generated_by_sub = NULL,
        generated_at = NULL
        WHERE id = %s
        """
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(clear_sql, (plan_id,))
def enrich_plan_with_slots(self, plan: dict) -> dict:
    """Attach a `slots` key to `plan` in place and return the same dict
    for chaining. Slots come from list_plan_slots; a plan without a
    truthy `id` gets an empty list and no query is made."""
    plan_id = plan.get("id")
    if plan_id:
        plan["slots"] = self.list_plan_slots(plan_id)
    else:
        plan["slots"] = []
    return plan
# --- meal picks ---------------------------------------------------------
def add_meal_pick(self, sub: str, slug: str, name: str) -> bool:
    """Record that user `sub` picked recipe `slug`. The name is truncated
    to 500 characters. Uses INSERT IGNORE; returns True when a row was
    actually inserted and False when the insert was ignored."""
    insert_sql = """
        INSERT IGNORE INTO cauldron_meal_picks (authentik_sub, recipe_slug, recipe_name)
        VALUES (%s, %s, %s)
        """
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(insert_sql, (sub, slug, name[:500]))
        was_inserted = cursor.rowcount > 0
        return was_inserted
def remove_meal_pick(self, sub: str, slug: str) -> bool:
    """Delete the pick of recipe `slug` by user `sub`. Returns True when a
    row was deleted, False when there was nothing to remove."""
    delete_sql = "DELETE FROM cauldron_meal_picks WHERE authentik_sub=%s AND recipe_slug=%s"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(delete_sql, (sub, slug))
        was_removed = cursor.rowcount > 0
        return was_removed
def list_meal_picks(self, sub: str) -> list[dict]:
    """Return the picks of user `sub` (recipe_slug, recipe_name,
    added_at), most recently added first."""
    query = (
        "SELECT recipe_slug, recipe_name, added_at FROM cauldron_meal_picks "
        "WHERE authentik_sub=%s ORDER BY added_at DESC"
    )
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (sub,))
        picks = []
        for pick in cursor.fetchall():
            picks.append(dict(pick))
        return picks
def list_meal_pick_slugs(self, sub: str) -> set[str]:
    """Return the set of recipe slugs picked by user `sub`."""
    query = "SELECT recipe_slug FROM cauldron_meal_picks WHERE authentik_sub=%s"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (sub,))
        slugs = set()
        for pick in cursor.fetchall():
            slugs.add(pick["recipe_slug"])
        return slugs
def list_household_pick_slugs(self, household_id: int) -> set[str]:
    """Return the union of picked recipe slugs across every member of the
    household (picks joined to cauldron_household_members)."""
    query = """
        SELECT DISTINCT p.recipe_slug
        FROM cauldron_meal_picks p
        JOIN cauldron_household_members m ON m.authentik_sub = p.authentik_sub
        WHERE m.household_id = %s
        """
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (household_id,))
        slugs = set()
        for pick in cursor.fetchall():
            slugs.add(pick["recipe_slug"])
        return slugs
def list_household_picks_with_pickers(self, household_id: int) -> list[dict]:
    """Return one dict per picked recipe slug across the household, newest
    pick first, so the UI can show 'pinned by Cobb · Abby'.
    Keys: slug, name, pickers (display names, falling back to the local
    part of the email), picker_subs, last_pick_at, pick_count. The two
    picker columns arrive as '|'-joined GROUP_CONCAT strings and are split
    into lists here; NULL/empty becomes [].
    NOTE(review): both lists are DISTINCT-ed independently in SQL, so they
    are presumably not guaranteed to line up index-for-index (e.g. two
    members sharing a display name) — confirm before zipping them."""
    query = """
        SELECT
        p.recipe_slug AS slug,
        MIN(p.recipe_name) AS name,
        GROUP_CONCAT(
        DISTINCT COALESCE(NULLIF(u.display_name, ''),
        SUBSTRING_INDEX(u.email, '@', 1))
        ORDER BY p.added_at ASC
        SEPARATOR '|'
        ) AS pickers,
        GROUP_CONCAT(
        DISTINCT u.authentik_sub
        ORDER BY p.added_at ASC
        SEPARATOR '|'
        ) AS picker_subs,
        MAX(p.added_at) AS last_pick_at,
        COUNT(*) AS pick_count
        FROM cauldron_meal_picks p
        JOIN cauldron_household_members m ON m.authentik_sub = p.authentik_sub
        LEFT JOIN cauldron_users u ON u.authentik_sub = p.authentik_sub
        WHERE m.household_id = %s
        GROUP BY p.recipe_slug
        ORDER BY last_pick_at DESC
        """
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (household_id,))
        grouped = []
        for record in cursor.fetchall():
            entry = dict(record)
            for column in ("pickers", "picker_subs"):
                joined = entry[column]
                entry[column] = joined.split("|") if joined else []
            grouped.append(entry)
        return grouped
# --- recipe index -------------------------------------------------------
def get_index_state(self, household_id: int) -> dict | None:
    """Return the recipe-index bookkeeping row for a household
    (last_refreshed_at, recipe_count), or None if it has never been
    written. The raw fetchone() result is returned."""
    query = "SELECT last_refreshed_at, recipe_count FROM cauldron_recipe_index_state WHERE household_id=%s"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (household_id,))
        state_row = cursor.fetchone()
        return state_row
def replace_recipe_index(self, household_id: int, rows: list[dict]) -> int:
    """Atomic-ish replace of one household's recipe index: delete its
    existing index rows, insert `rows` one by one, then upsert the
    index-state row (last_refreshed_at=NOW(), recipe_count=len(rows)).
    Text fields are truncated before insert (name 500, the *_text and
    description fields 65000, total_time 64, recipe_yield 255) and the
    `raw` payload is JSON-encoded with default=str. Returns len(rows)."""
    import json as _json
    insert_sql = """
        INSERT INTO cauldron_recipe_index
        (household_id, slug, name, description, tags_text, cats_text,
        foods_text, ings_text, date_updated, date_added, last_made,
        total_time, recipe_yield, raw_json)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
        """
    state_sql = """
        INSERT INTO cauldron_recipe_index_state (household_id, last_refreshed_at, recipe_count)
        VALUES (%s, NOW(), %s)
        ON DUPLICATE KEY UPDATE last_refreshed_at=NOW(), recipe_count=VALUES(recipe_count)
        """
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute("DELETE FROM cauldron_recipe_index WHERE household_id=%s", (household_id,))
        for recipe in rows:
            values = (
                household_id,
                recipe["slug"],
                recipe["name"][:500],
                (recipe.get("description") or "")[:65000],
                (recipe.get("tags_text") or "")[:65000],
                (recipe.get("cats_text") or "")[:65000],
                (recipe.get("foods_text") or "")[:65000],
                (recipe.get("ings_text") or "")[:65000],
                recipe.get("date_updated"),
                recipe.get("date_added"),
                recipe.get("last_made"),
                (recipe.get("total_time") or "")[:64],
                (recipe.get("recipe_yield") or "")[:255],
                _json.dumps(recipe.get("raw") or {}, default=str),
            )
            cursor.execute(insert_sql, values)
        cursor.execute(state_sql, (household_id, len(rows)))
    return len(rows)
def find_indexed_recipe(self, household_id: int, slug: str) -> dict | None:
    """Fetch a single indexed recipe by household + slug, or None when the
    slug isn't in this household's catalog. Used by /api/plan/suggest/pin
    to validate a slug before adding it to picks."""
    query = """SELECT slug, name, description, tags_text, cats_text,
        foods_text, date_updated, date_added, last_made,
        total_time, recipe_yield, raw_json
        FROM cauldron_recipe_index
        WHERE household_id = %s AND slug = %s
        LIMIT 1"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (household_id, slug))
        match = cursor.fetchone()
        if not match:
            return None
        return dict(match)
def list_indexed_recipes(self, household_id: int, *, category: str | None = None,
order_by: str = "date_added", order_dir: str = "desc",
limit: int = 1000, offset: int = 0) -> list[dict]:
"""Pull the indexed recipe rows. Used both for non-search browse + as
the candidate set for in-process fuzzy ranking on search."""
order_col = {
"date_added": "date_added",
"date_updated": "date_updated",
"last_made": "last_made",
"name": "name",
}.get(order_by, "date_added")
order_dir_sql = "DESC" if order_dir.lower() != "asc" else "ASC"
sql = f"""
SELECT slug, name, description, tags_text, cats_text, foods_text,
date_updated, date_added, last_made, total_time, recipe_yield, raw_json
FROM cauldron_recipe_index
WHERE household_id = %s
"""
params: list = [household_id]
if category:
sql += " AND cats_text LIKE %s"
params.append(f"%{category}%")
sql += f" ORDER BY {order_col} {order_dir_sql} LIMIT %s OFFSET %s"
params += [limit, offset]
with self.conn() as c, c.cursor() as cur:
cur.execute(sql, params)
return [dict(r) for r in cur.fetchall()]
# --- chat log -----------------------------------------------------------
def log_chat(
    self,
    *,
    sub: str,
    intent: str,
    duration_ms: int,
    model: str,
    prompt_chars: int,
    result_chars: int,
    ok: bool,
    error: str | None = None,
) -> None:
    """Append one row to cauldron_chat_log describing a chat call: who
    made it, the intent, the forge duration/model, prompt and result
    sizes, and whether it succeeded. `error` is truncated to 500
    characters; None or empty is stored as NULL."""
    stored_error = (error or "")[:500] or None
    insert_sql = """
        INSERT INTO cauldron_chat_log
        (authentik_sub, intent, forge_duration_ms, forge_model,
        prompt_chars, result_chars, ok, error)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """
    values = (sub, intent, duration_ms, model, prompt_chars, result_chars, ok, stored_error)
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(insert_sql, values)
# --- bulk sterilizer ----------------------------------------------------
def create_sterilize_job(
    self, *, household_id: int, started_by_sub: str, total: int
) -> int:
    """Insert a new sterilize job in state 'running' with `total` as its
    total_recipes, and return the cursor's lastrowid (the new job id)."""
    insert_sql = """
        INSERT INTO cauldron_sterilize_jobs
        (household_id, started_by_sub, total_recipes, state)
        VALUES (%s, %s, %s, 'running')
        """
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(insert_sql, (household_id, started_by_sub, total))
        job_id = cursor.lastrowid
        return job_id
def get_sterilize_job(self, job_id: int) -> dict | None:
    """Return the full sterilize job row for `job_id` (raw fetchone()
    result), or None when there is no such job."""
    query = "SELECT * FROM cauldron_sterilize_jobs WHERE id=%s"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (job_id,))
        job = cursor.fetchone()
        return job
def get_sterilize_job_state(self, job_id: int) -> str | None:
    """Cheap fetch of just the `state` column, used by the runner to
    detect external cancels mid-loop without dragging the whole row over
    the wire. Returns None when the job does not exist."""
    query = "SELECT state FROM cauldron_sterilize_jobs WHERE id=%s"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (job_id,))
        job = cursor.fetchone()
        if not job:
            return None
        return job["state"]
def latest_sterilize_job_for_household(self, household_id: int) -> dict | None:
    """Return the household's most recent sterilize job by started_at
    (raw fetchone() result, None if it has none) — used by the UI to
    figure out what to render on /sterilize."""
    query = """SELECT * FROM cauldron_sterilize_jobs
        WHERE household_id=%s
        ORDER BY started_at DESC
        LIMIT 1"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (household_id,))
        newest = cursor.fetchone()
        return newest
def running_sterilize_job_for_household(self, household_id: int) -> dict | None:
    """Return the household's newest sterilize job whose state is
    'running' or 'applying' (None if there is none); used to gate
    concurrent starts."""
    query = """SELECT * FROM cauldron_sterilize_jobs
        WHERE household_id=%s AND state IN ('running','applying')
        ORDER BY started_at DESC
        LIMIT 1"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (household_id,))
        active = cursor.fetchone()
        return active
def update_sterilize_job_progress(
    self,
    job_id: int,
    *,
    processed_delta: int = 0,
    skipped_delta: int = 0,
    error_delta: int = 0,
    current_slug: str | None = None,
    last_error: str | None = None,
) -> None:
    """Bump a sterilize job's counters by the given deltas and refresh
    last_progress_at. current_slug is overwritten with whatever is passed
    (including None), whereas last_error is only replaced when a non-None
    value is supplied (COALESCE keeps the previous one)."""
    progress_sql = """
        UPDATE cauldron_sterilize_jobs
        SET processed_count = processed_count + %s,
        skipped_count = skipped_count + %s,
        error_count = error_count + %s,
        current_slug = %s,
        last_error = COALESCE(%s, last_error),
        last_progress_at = NOW()
        WHERE id=%s
        """
    values = (
        processed_delta,
        skipped_delta,
        error_delta,
        current_slug,
        last_error,
        job_id,
    )
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(progress_sql, values)
def finalize_sterilize_job(self, job_id: int, *, state: str) -> None:
    """Transition a sterilize job to `state`, unless it already sits in a
    terminal state (done / failed / cancelled) — that's the anti-zombie
    guard that keeps a user cancel from being silently replaced when the
    daemon thread limps to the finish line.
    Any non-terminal source state is allowed, which covers running,
    applying and review; review is part of the normal flow (walk done →
    review → user approves → applying), so transitions out of it must
    work. finished_at is stamped only when the new state is terminal;
    last_progress_at is refreshed and current_slug cleared either way."""
    finalize_sql = """
        UPDATE cauldron_sterilize_jobs
        SET state=%s,
        finished_at = CASE WHEN %s IN ('done','failed','cancelled')
        THEN NOW() ELSE finished_at END,
        last_progress_at = NOW(),
        current_slug = NULL
        WHERE id=%s
        AND state NOT IN ('done','failed','cancelled')
        """
    # `state` is bound twice: once for SET, once for the CASE test.
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(finalize_sql, (state, state, job_id))
def insert_sterilize_proposal(
    self,
    *,
    job_id: int,
    recipe_slug: str,
    recipe_name: str | None,
    ingredient_count: int,
    proposal_json: str | None,
    preview_error: str | None,
) -> None:
    """Insert a sterilize proposal for a job, or overwrite the existing
    one on duplicate key (name, ingredient count, proposal JSON and
    preview error are replaced). recipe_name is stored as '' when None
    and capped at 500 characters; preview_error is capped at 500 and
    stored as NULL when None/empty."""
    stored_name = (recipe_name or "")[:500]
    stored_error = (preview_error or "")[:500] or None
    upsert_sql = """
        INSERT INTO cauldron_sterilize_proposals
        (job_id, recipe_slug, recipe_name, ingredient_count,
        proposal_json, preview_error)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
        recipe_name=VALUES(recipe_name),
        ingredient_count=VALUES(ingredient_count),
        proposal_json=VALUES(proposal_json),
        preview_error=VALUES(preview_error)
        """
    values = (job_id, recipe_slug, stored_name,
              ingredient_count, proposal_json, stored_error)
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(upsert_sql, values)
def list_sterilize_proposals(self, job_id: int) -> list[dict]:
    """Return every proposal of a sterilize job, ordered by recipe_name,
    with its proposal payload, preview error and approval/apply status."""
    query = """SELECT recipe_slug, recipe_name, ingredient_count,
        proposal_json, preview_error,
        approved, applied_at, apply_error
        FROM cauldron_sterilize_proposals
        WHERE job_id=%s
        ORDER BY recipe_name ASC"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (job_id,))
        proposals = []
        for proposal in cursor.fetchall():
            proposals.append(dict(proposal))
        return proposals
def set_proposal_approval(
    self, job_id: int, recipe_slug: str, approved: bool
) -> bool:
    """Set the approved flag (stored as 1/0) on one sterilize proposal.
    Returns True when the proposal exists — including when it already had
    the requested value — and False when no proposal matches
    (job_id, recipe_slug)."""
    with self.conn() as c, c.cursor() as cur:
        cur.execute(
            """UPDATE cauldron_sterilize_proposals
            SET approved=%s
            WHERE job_id=%s AND recipe_slug=%s""",
            (1 if approved else 0, job_id, recipe_slug),
        )
        if cur.rowcount > 0:
            return True
        # MySQL counts only rows whose values actually changed (unless
        # CLIENT_FOUND_ROWS is set), so rowcount 0 also happens when the
        # flag was already at the requested value. Check existence so a
        # repeated toggle isn't reported as "proposal not found".
        cur.execute(
            """SELECT 1 AS found FROM cauldron_sterilize_proposals
            WHERE job_id=%s AND recipe_slug=%s LIMIT 1""",
            (job_id, recipe_slug),
        )
        return cur.fetchone() is not None
def bulk_set_proposal_approvals(
    self, job_id: int, approved_slugs: list[str]
) -> None:
    """Make the approval flags of a sterilize job match `approved_slugs`:
    approved=1 for the listed slugs, approved=0 for every other proposal
    on the job. Idempotent — safe to call before /bulk-apply."""
    with self.conn() as db, db.cursor() as cursor:
        # Step 1: clear the flag on every proposal of this job, whatever
        # its previous value (including NULL).
        cursor.execute(
            "UPDATE cauldron_sterilize_proposals SET approved=0 WHERE job_id=%s",
            (job_id,),
        )
        if not approved_slugs:
            return
        # Step 2: re-approve the listed slugs, one bind marker per slug.
        markers = ",".join("%s" for _ in approved_slugs)
        cursor.execute(
            f"""UPDATE cauldron_sterilize_proposals SET approved=1
            WHERE job_id=%s AND recipe_slug IN ({markers})""",
            (job_id, *approved_slugs),
        )
def mark_proposal_applied(
    self, job_id: int, recipe_slug: str, *, error: str | None = None
) -> None:
    """Record the outcome of applying one sterilize proposal.
    Success (no error): applied_at=NOW(), apply_error=NULL. Failure:
    only apply_error is written (capped at 500 characters) and applied_at
    stays NULL so a rerun can retry —
    list_approved_unapplied_proposals keys off applied_at IS NULL, so
    this directly drives retryability. Note the check is truthiness: an
    empty-string error is handled as success."""
    if error:
        statement = """UPDATE cauldron_sterilize_proposals
            SET apply_error=%s
            WHERE job_id=%s AND recipe_slug=%s"""
        values = (error[:500], job_id, recipe_slug)
    else:
        statement = """UPDATE cauldron_sterilize_proposals
            SET applied_at=NOW(), apply_error=NULL
            WHERE job_id=%s AND recipe_slug=%s"""
        values = (job_id, recipe_slug)
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(statement, values)
def list_approved_unapplied_proposals(self, job_id: int) -> list[dict]:
    """Return (recipe_slug, recipe_name) for the job's proposals that are
    approved but have no applied_at yet, i.e. the work still to apply."""
    query = """SELECT recipe_slug, recipe_name
        FROM cauldron_sterilize_proposals
        WHERE job_id=%s AND approved=1 AND applied_at IS NULL"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (job_id,))
        pending = []
        for proposal in cursor.fetchall():
            pending.append(dict(proposal))
        return pending
# --- consolidate (foods merge) jobs -------------------------------------
def create_consolidate_job(
    self, *, household_id: int, started_by_sub: str
) -> int:
    """Insert a new consolidate (foods merge) job in state 'running' and
    return the cursor's lastrowid (the new job id)."""
    insert_sql = """INSERT INTO cauldron_consolidate_jobs
        (household_id, started_by_sub, state)
        VALUES (%s, %s, 'running')"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(insert_sql, (household_id, started_by_sub))
        job_id = cursor.lastrowid
        return job_id
def get_consolidate_job(self, job_id: int) -> dict | None:
    """Return the full consolidate job row for `job_id` (raw fetchone()
    result), or None when there is no such job."""
    query = "SELECT * FROM cauldron_consolidate_jobs WHERE id=%s"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (job_id,))
        job = cursor.fetchone()
        return job
def get_consolidate_job_state(self, job_id: int) -> str | None:
    """Return only the `state` of a consolidate job, or None when the job
    does not exist."""
    query = "SELECT state FROM cauldron_consolidate_jobs WHERE id=%s"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (job_id,))
        job = cursor.fetchone()
        if not job:
            return None
        return job["state"]
def latest_consolidate_job_for_household(self, household_id: int) -> dict | None:
    """Return the household's most recent consolidate job by started_at
    (raw fetchone() result), or None if it has none."""
    query = """SELECT * FROM cauldron_consolidate_jobs
        WHERE household_id=%s ORDER BY started_at DESC LIMIT 1"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (household_id,))
        newest = cursor.fetchone()
        return newest
def running_consolidate_job_for_household(self, household_id: int) -> dict | None:
    """Return the household's newest consolidate job whose state is
    'running' or 'applying', or None if there is none."""
    query = """SELECT * FROM cauldron_consolidate_jobs
        WHERE household_id=%s AND state IN ('running','applying')
        ORDER BY started_at DESC LIMIT 1"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (household_id,))
        active = cursor.fetchone()
        return active
def update_consolidate_job_progress(
    self,
    job_id: int,
    *,
    processed_delta: int = 0,
    merged_delta: int = 0,
    error_delta: int = 0,
    current_cluster: str | None = None,
    last_error: str | None = None,
) -> None:
    """Bump a consolidate job's counters by the given deltas and refresh
    last_progress_at. current_cluster and last_error are only replaced
    when a non-None value is supplied (COALESCE keeps the previous
    one)."""
    progress_sql = """UPDATE cauldron_consolidate_jobs
        SET processed_count = processed_count + %s,
        merged_count = merged_count + %s,
        error_count = error_count + %s,
        current_cluster = COALESCE(%s, current_cluster),
        last_error = COALESCE(%s, last_error),
        last_progress_at = NOW()
        WHERE id=%s"""
    values = (
        processed_delta,
        merged_delta,
        error_delta,
        current_cluster,
        last_error,
        job_id,
    )
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(progress_sql, values)
def finalize_consolidate_job(self, job_id: int, *, state: str) -> None:
    """Transition a consolidate job to `state` with the same anti-zombie
    guard as finalize_sterilize_job: a job already in a terminal state
    (done/failed/cancelled) is left alone, while the normal
    running→review→applying→done flow goes through. finished_at is
    stamped only when the new state is terminal; last_progress_at is
    refreshed and current_cluster cleared either way."""
    finalize_sql = """UPDATE cauldron_consolidate_jobs
        SET state=%s,
        finished_at = CASE WHEN %s IN ('done','failed','cancelled')
        THEN NOW() ELSE finished_at END,
        last_progress_at = NOW(),
        current_cluster = NULL
        WHERE id=%s
        AND state NOT IN ('done','failed','cancelled')"""
    # `state` is bound twice: once for SET, once for the CASE test.
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(finalize_sql, (state, state, job_id))
def insert_consolidate_proposal(
    self,
    *,
    job_id: int,
    cluster_key: str,
    cluster: list[dict],
    decision: dict | None,
    error: str | None,
) -> None:
    """Insert a consolidate proposal for one cluster, or overwrite the
    existing one on duplicate key. cluster_key is capped at 255
    characters, cluster_size is len(cluster), and the cluster is stored
    as JSON (default=str for non-serializable values). A falsy `decision`
    is stored as NULL, otherwise as JSON; `error` goes to apply_error,
    capped at 500 characters, NULL when None/empty."""
    import json as _j
    cluster_payload = _j.dumps(cluster, ensure_ascii=False, default=str)
    decision_payload = _j.dumps(decision, ensure_ascii=False) if decision else None
    stored_error = (error or "")[:500] or None
    upsert_sql = """INSERT INTO cauldron_consolidate_proposals
        (job_id, cluster_key, cluster_size, cluster_json,
        sonnet_decision, apply_error)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
        cluster_size=VALUES(cluster_size),
        cluster_json=VALUES(cluster_json),
        sonnet_decision=VALUES(sonnet_decision),
        apply_error=VALUES(apply_error)"""
    values = (
        job_id,
        cluster_key[:255],
        len(cluster),
        cluster_payload,
        decision_payload,
        stored_error,
    )
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(upsert_sql, values)
def list_consolidate_proposals(self, job_id: int) -> list[dict]:
    """Return every proposal of a consolidate job, largest cluster first
    and then by cluster_key, with its payloads and approval/apply
    status."""
    query = """SELECT id, cluster_key, cluster_size, cluster_json,
        sonnet_decision, approved, applied_at, apply_error
        FROM cauldron_consolidate_proposals
        WHERE job_id=%s
        ORDER BY cluster_size DESC, cluster_key"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (job_id,))
        proposals = []
        for proposal in cursor.fetchall():
            proposals.append(dict(proposal))
        return proposals
def list_approved_unapplied_consolidate(self, job_id: int) -> list[dict]:
    """Return the job's consolidate proposals that are approved but have
    no applied_at yet (id, cluster_key, cluster_size, sonnet_decision)."""
    query = """SELECT id, cluster_key, cluster_size, sonnet_decision
        FROM cauldron_consolidate_proposals
        WHERE job_id=%s AND approved=1 AND applied_at IS NULL"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (job_id,))
        pending = []
        for proposal in cursor.fetchall():
            pending.append(dict(proposal))
        return pending
def bulk_set_consolidate_approvals(
    self, job_id: int, approved_ids: list[int]
) -> None:
    """Make the approval flags of a consolidate job match `approved_ids`:
    approved=1 for the listed proposal ids, approved=0 for every other
    proposal on the job."""
    with self.conn() as db, db.cursor() as cursor:
        # Clear everything on the job first, then re-approve the chosen ids.
        cursor.execute(
            "UPDATE cauldron_consolidate_proposals SET approved=0 WHERE job_id=%s",
            (job_id,),
        )
        if not approved_ids:
            return
        markers = ",".join("%s" for _ in approved_ids)
        cursor.execute(
            f"""UPDATE cauldron_consolidate_proposals SET approved=1
            WHERE job_id=%s AND id IN ({markers})""",
            (job_id, *approved_ids),
        )
def mark_consolidate_proposal_applied(
    self, proposal_id: int, *, error: str | None = None
) -> None:
    """Record the outcome of applying one consolidate proposal. Without
    an error: applied_at=NOW() and apply_error=NULL. With an error: only
    apply_error is written (capped at 500 characters) and applied_at is
    left untouched. The check is truthiness, so an empty-string error is
    handled as success."""
    if error:
        statement = """UPDATE cauldron_consolidate_proposals
            SET apply_error=%s
            WHERE id=%s"""
        values = (error[:500], proposal_id)
    else:
        statement = """UPDATE cauldron_consolidate_proposals
            SET applied_at=NOW(), apply_error=NULL
            WHERE id=%s"""
        values = (proposal_id,)
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(statement, values)
# --- recipe-dedupe jobs ------------------------------------------------
def create_recipe_dedupe_job(self, *, household_id: int, started_by_sub: str) -> int:
    """Insert a new recipe-dedupe job in state 'running' and return the
    cursor's lastrowid (the new job id)."""
    insert_sql = """INSERT INTO cauldron_recipe_dedupe_jobs
        (household_id, started_by_sub, state)
        VALUES (%s, %s, 'running')"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(insert_sql, (household_id, started_by_sub))
        job_id = cursor.lastrowid
        return job_id
def get_recipe_dedupe_job(self, job_id: int) -> dict | None:
    """Return the full recipe-dedupe job row for `job_id` (raw fetchone()
    result), or None when there is no such job."""
    query = "SELECT * FROM cauldron_recipe_dedupe_jobs WHERE id=%s"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (job_id,))
        job = cursor.fetchone()
        return job
def get_recipe_dedupe_job_state(self, job_id: int) -> str | None:
    """Return only the `state` of a recipe-dedupe job, or None when the
    job does not exist."""
    query = "SELECT state FROM cauldron_recipe_dedupe_jobs WHERE id=%s"
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (job_id,))
        job = cursor.fetchone()
        if not job:
            return None
        return job["state"]
def latest_recipe_dedupe_job_for_household(self, household_id: int) -> dict | None:
    """Return the household's most recent recipe-dedupe job by started_at
    (raw fetchone() result), or None if it has none."""
    query = """SELECT * FROM cauldron_recipe_dedupe_jobs
        WHERE household_id=%s ORDER BY started_at DESC LIMIT 1"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (household_id,))
        newest = cursor.fetchone()
        return newest
def running_recipe_dedupe_job_for_household(self, household_id: int) -> dict | None:
    """Return the household's newest recipe-dedupe job whose state is
    'running' or 'applying', or None if there is none."""
    query = """SELECT * FROM cauldron_recipe_dedupe_jobs
        WHERE household_id=%s AND state IN ('running','applying')
        ORDER BY started_at DESC LIMIT 1"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (household_id,))
        active = cursor.fetchone()
        return active
def update_recipe_dedupe_job_progress(
    self,
    job_id: int,
    *,
    processed_delta: int = 0,
    deleted_delta: int = 0,
    error_delta: int = 0,
    current_cluster: str | None = None,
    last_error: str | None = None,
) -> None:
    """Bump a recipe-dedupe job's counters by the given deltas and
    refresh last_progress_at. current_cluster and last_error are only
    replaced when a non-None value is supplied (COALESCE keeps the
    previous one)."""
    progress_sql = """UPDATE cauldron_recipe_dedupe_jobs
        SET processed_count = processed_count + %s,
        deleted_count = deleted_count + %s,
        error_count = error_count + %s,
        current_cluster = COALESCE(%s, current_cluster),
        last_error = COALESCE(%s, last_error),
        last_progress_at = NOW()
        WHERE id=%s"""
    values = (
        processed_delta,
        deleted_delta,
        error_delta,
        current_cluster,
        last_error,
        job_id,
    )
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(progress_sql, values)
def finalize_recipe_dedupe_job(self, job_id: int, *, state: str) -> None:
    """Transition a recipe-dedupe job to `state` with the same anti-zombie
    guard as the other job types: a job already in a terminal state
    (done/failed/cancelled) is never overwritten. finished_at is stamped
    only when the new state is terminal; last_progress_at is refreshed
    and current_cluster cleared either way."""
    finalize_sql = """UPDATE cauldron_recipe_dedupe_jobs
        SET state=%s,
        finished_at = CASE WHEN %s IN ('done','failed','cancelled')
        THEN NOW() ELSE finished_at END,
        last_progress_at = NOW(),
        current_cluster = NULL
        WHERE id=%s
        AND state NOT IN ('done','failed','cancelled')"""
    # `state` is bound twice: once for SET, once for the CASE test.
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(finalize_sql, (state, state, job_id))
def insert_recipe_dedupe_proposal(
    self,
    *,
    job_id: int,
    cluster_key: str,
    cluster: list[dict],
    decision: dict | None,
    error: str | None,
) -> None:
    """Insert a recipe-dedupe proposal for one cluster, or overwrite the
    existing one on duplicate key. cluster_key is capped at 255
    characters, cluster_size is len(cluster), and the cluster is stored
    as JSON (default=str for non-serializable values). A falsy `decision`
    is stored as NULL, otherwise as JSON; `error` goes to apply_error,
    capped at 500 characters, NULL when None/empty."""
    import json as _j
    cluster_payload = _j.dumps(cluster, ensure_ascii=False, default=str)
    decision_payload = _j.dumps(decision, ensure_ascii=False) if decision else None
    stored_error = (error or "")[:500] or None
    upsert_sql = """INSERT INTO cauldron_recipe_dedupe_proposals
        (job_id, cluster_key, cluster_size, cluster_json,
        sonnet_decision, apply_error)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
        cluster_size=VALUES(cluster_size),
        cluster_json=VALUES(cluster_json),
        sonnet_decision=VALUES(sonnet_decision),
        apply_error=VALUES(apply_error)"""
    values = (
        job_id,
        cluster_key[:255],
        len(cluster),
        cluster_payload,
        decision_payload,
        stored_error,
    )
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(upsert_sql, values)
def list_recipe_dedupe_proposals(self, job_id: int) -> list[dict]:
    """Return every proposal of a recipe-dedupe job, largest cluster first
    and then by cluster_key, with its payloads and approval/apply
    status."""
    query = """SELECT id, cluster_key, cluster_size, cluster_json,
        sonnet_decision, approved, applied_at, apply_error
        FROM cauldron_recipe_dedupe_proposals
        WHERE job_id=%s
        ORDER BY cluster_size DESC, cluster_key"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (job_id,))
        proposals = []
        for proposal in cursor.fetchall():
            proposals.append(dict(proposal))
        return proposals
def list_approved_unapplied_recipe_dedupe(self, job_id: int) -> list[dict]:
    """Return the job's recipe-dedupe proposals that are approved but have
    no applied_at yet (id, cluster_key, cluster_size, sonnet_decision)."""
    query = """SELECT id, cluster_key, cluster_size, sonnet_decision
        FROM cauldron_recipe_dedupe_proposals
        WHERE job_id=%s AND approved=1 AND applied_at IS NULL"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(query, (job_id,))
        pending = []
        for proposal in cursor.fetchall():
            pending.append(dict(proposal))
        return pending
def bulk_set_recipe_dedupe_approvals(self, job_id: int, approved_ids: list[int]) -> None:
    """Replace the approval set of ``job_id`` with exactly ``approved_ids``.

    Every proposal of the job is first reset to approved=0; then the listed
    ids (restricted to this job) are set to approved=1. With an empty list
    only the reset runs. Both statements use the same connection and cursor.
    """
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(
            "UPDATE cauldron_recipe_dedupe_proposals SET approved=0 WHERE job_id=%s",
            (job_id,),
        )
        if not approved_ids:
            return
        # One %s marker per id; the ids themselves are bound as parameters.
        markers = ",".join("%s" for _ in approved_ids)
        cursor.execute(
            f"""UPDATE cauldron_recipe_dedupe_proposals SET approved=1
                WHERE job_id=%s AND id IN ({markers})""",
            (job_id, *approved_ids),
        )
def mark_recipe_dedupe_proposal_applied(
self, proposal_id: int, *, error: str | None = None
) -> None:
if error:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"UPDATE cauldron_recipe_dedupe_proposals SET apply_error=%s WHERE id=%s",
(error[:500], proposal_id),
)
else:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"UPDATE cauldron_recipe_dedupe_proposals SET applied_at=NOW(), apply_error=NULL WHERE id=%s",
(proposal_id,),
)
# --- recipe enrichment ------------------------------------------------
# Version stamp for enriched recipe metadata. Bump it whenever the meta
# schema or the prompt changes meaningfully, so that existing rows get
# re-enriched on the next walk (or when a user clicks "force re-enrich").
# History:
# v2: added calories, protein_g, carbs_g, fat_g per-serving estimates
# v3: added contains.{dairy,gluten,nuts,peanuts,eggs,shellfish,fish,soy,
#     sesame,pork} allergen booleans
# v4: added pairings, mood scores, leftover_potential, plus a second-pass
#     allergen verification to clean false-positive contains.* booleans
# v5: split estimated_minutes → active+hands_off, equipment[], flavor_profile[],
#     kid_friendly_score, fiber_g + sodium_mg, cost_per_serving_estimate,
#     occasion_fit[], hecate_quip
ENRICH_VERSION = 5
def get_recipe_meta(self, household_id: int, recipe_slug: str) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT recipe_slug, meta_json, enrich_version, last_enriched_at
FROM cauldron_recipe_meta
WHERE household_id=%s AND recipe_slug=%s""",
(household_id, recipe_slug),
)
row = cur.fetchone()
return dict(row) if row else None
def household_picker_profiles(
    self, household_id: int, *, lookback_days: int = 365
) -> dict[str, dict]:
    """Summarise what each household member tends to pick.

    Collects (member sub, recipe slug) attributions from two sources:
    the rows in cauldron_meal_picks belonging to members of the household,
    and the ``picker_subs`` recorded on the household's meal-plan slots
    whose plan ``week_start`` lies within ``lookback_days``. The
    attributions are crossed with cauldron_recipe_meta to count, per
    member, the cuisines, proteins, comfort tiers and tags of the recipes
    they picked::

        {
          "<authentik_sub>": {
            "display_name": "cobb",
            "total_picks": 24,
            "cuisines": {"asian": 6, "mexican": 4, ...},        # top 5
            "proteins": {"chicken": 8, "beef": 5, ...},         # top 5
            "comfort_tiers": {"weeknight-easy": 15, ...},       # top 3
            "tags": {"high-protein": 9, "weeknight": 11, ...},  # top 8
          },
          ...
        }

    Intended as prompt input for the plan generator, so it can lean
    AI-chosen slots toward what each member has explicitly asked for.

    Notes:
      * A recipe attributed through both sources is counted once per
        attribution; ``total_picks`` counts attributions even when the
        recipe has no metadata.
      * Cuisines "unknown"/"other" and protein "none" are not counted.
      * Malformed data is skipped rather than raised: unparseable JSON,
        metadata that is not a JSON object, facet values that are not
        non-empty strings, and a ``tags`` value that is not a list.

    Args:
        household_id: household whose members and plans are inspected.
        lookback_days: how far back (by plan week_start) historical slots
            are considered.

    Returns:
        Mapping of authentik_sub to profile dict; empty when there are no
        attributions at all.
    """
    import json as _json
    from collections import Counter

    def _parse(blob):
        # JSON columns may arrive as text or already decoded; text that
        # does not parse yields None so callers can skip it.
        if not isinstance(blob, str):
            return blob
        try:
            return _json.loads(blob)
        except (ValueError, RecursionError):
            return None

    def _label(value):
        # Only non-empty strings are safe, meaningful Counter keys.
        return value if isinstance(value, str) and value else None

    # Step 1: gather (sub, recipe_slug) pairs from both sources.
    attributions: list[tuple[str, str]] = []
    with self.conn() as c, c.cursor() as cur:
        # Current picks.
        cur.execute(
            """SELECT mp.authentik_sub AS sub, mp.recipe_slug
               FROM cauldron_meal_picks mp
               JOIN cauldron_household_members m
                 ON m.authentik_sub = mp.authentik_sub
               WHERE m.household_id = %s""",
            (household_id,),
        )
        attributions.extend((r["sub"], r["recipe_slug"]) for r in cur.fetchall())
        # Historical picker_subs recorded on plan slots.
        cur.execute(
            """SELECT s.recipe_slug, s.picker_subs
               FROM cauldron_meal_plan_slots s
               JOIN cauldron_meal_plans p ON p.id = s.plan_id
               WHERE p.household_id = %s
                 AND p.week_start >= CURDATE() - INTERVAL %s DAY
                 AND s.picker_subs IS NOT NULL""",
            (household_id, lookback_days),
        )
        for r in cur.fetchall():
            pickers = _parse(r["picker_subs"])
            if not isinstance(pickers, list):
                continue
            attributions.extend(
                (sub, r["recipe_slug"])
                for sub in pickers
                if isinstance(sub, str) and sub
            )
    if not attributions:
        return {}

    # Step 2: pull recipe meta for every slug in the attribution set.
    # dict.fromkeys de-duplicates while keeping a stable order.
    slugs = list(dict.fromkeys(slug for _, slug in attributions))
    meta_by_slug: dict[str, dict] = {}
    placeholders = ",".join(["%s"] * len(slugs))
    with self.conn() as c, c.cursor() as cur:
        cur.execute(
            f"""SELECT recipe_slug, meta_json
                FROM cauldron_recipe_meta
                WHERE household_id=%s AND recipe_slug IN ({placeholders})""",
            (household_id, *slugs),
        )
        for r in cur.fetchall():
            meta = _parse(r["meta_json"])
            # Valid JSON that is not an object (list, scalar, null) cannot
            # carry facets, so it is ignored instead of crashing below.
            if isinstance(meta, dict):
                meta_by_slug[r["recipe_slug"]] = meta

    # Step 3: pull display names for the subs we found.
    subs = list(dict.fromkeys(sub for sub, _ in attributions))
    display_by_sub: dict[str, str] = {}
    placeholders = ",".join(["%s"] * len(subs))
    with self.conn() as c, c.cursor() as cur:
        cur.execute(
            f"""SELECT authentik_sub, email, display_name
                FROM cauldron_users
                WHERE authentik_sub IN ({placeholders})""",
            tuple(subs),
        )
        for r in cur.fetchall():
            # Fall back: display_name → email local part → raw sub.
            name = (
                r["display_name"]
                or (r["email"] or "").split("@")[0]
                or r["authentik_sub"]
            )
            display_by_sub[r["authentik_sub"]] = name

    # Step 4: aggregate per user.
    per_user: dict[str, dict] = {}
    for sub, slug in attributions:
        meta = meta_by_slug.get(slug) or {}
        prof = per_user.setdefault(sub, {
            "display_name": display_by_sub.get(sub, sub),
            "total_picks": 0,
            "cuisines": Counter(),
            "proteins": Counter(),
            "comfort_tiers": Counter(),
            "tags": Counter(),
        })
        prof["total_picks"] += 1
        cuisine = _label(meta.get("cuisine"))
        if cuisine and cuisine not in ("unknown", "other"):
            prof["cuisines"][cuisine] += 1
        protein = _label(meta.get("primary_protein"))
        if protein and protein != "none":
            prof["proteins"][protein] += 1
        tier = _label(meta.get("comfort_tier"))
        if tier:
            prof["comfort_tiers"][tier] += 1
        tags = meta.get("tags")
        # Only the first 8 tags of a recipe are considered.
        for tag in (tags if isinstance(tags, list) else [])[:8]:
            if _label(tag):
                prof["tags"][tag] += 1

    # Step 5: convert Counters to plain top-N dicts for prompt-friendly output.
    return {
        sub: {
            "display_name": prof["display_name"],
            "total_picks": prof["total_picks"],
            "cuisines": dict(prof["cuisines"].most_common(5)),
            "proteins": dict(prof["proteins"].most_common(5)),
            "comfort_tiers": dict(prof["comfort_tiers"].most_common(3)),
            "tags": dict(prof["tags"].most_common(8)),
        }
        for sub, prof in per_user.items()
    }
def household_recipe_history(
    self, household_id: int, *, lookback_days: int = 180
) -> dict[str, dict]:
    """Rotation history of recipes in this household's meal plans.

    Considers plan slots (meal_plan_slots joined to meal_plans) whose plan
    ``week_start`` lies within ``lookback_days`` and returns, per recipe
    slug::

        {recipe_slug: {"last_planned": <latest week_start>,
                       "count_30d": int,    # slots in the last 30 days
                       "count_long": int}}  # slots in the whole window

    Intended to give the plan generator rotation context (e.g. avoiding
    the same recipe several weeks running). NULL sums are reported as 0.
    """
    sql = """
        SELECT
            s.recipe_slug,
            MAX(p.week_start) AS last_planned,
            SUM(CASE WHEN p.week_start >= CURDATE() - INTERVAL 30 DAY THEN 1 ELSE 0 END) AS count_30d,
            SUM(CASE WHEN p.week_start >= CURDATE() - INTERVAL %s DAY THEN 1 ELSE 0 END) AS count_long
        FROM cauldron_meal_plan_slots s
        JOIN cauldron_meal_plans p ON p.id = s.plan_id
        WHERE p.household_id = %s
          AND p.week_start >= CURDATE() - INTERVAL %s DAY
        GROUP BY s.recipe_slug
        """
    # lookback_days is bound twice: in the count_long CASE and in the WHERE.
    params = (lookback_days, household_id, lookback_days)
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, params)
        return {
            record["recipe_slug"]: {
                "last_planned": record["last_planned"],
                "count_30d": int(record["count_30d"] or 0),
                "count_long": int(record["count_long"] or 0),
            }
            for record in cursor.fetchall()
        }
def list_recipe_meta_for_household(self, household_id: int) -> list[dict]:
    """Return all enrichment rows of a household as plain dicts.

    Each dict carries recipe_slug, meta_json and enrich_version. Intended
    for the plan generator, which splices the meta into its recipe-pool
    prompt.
    """
    sql = """SELECT recipe_slug, meta_json, enrich_version
             FROM cauldron_recipe_meta
             WHERE household_id=%s"""
    metas: list[dict] = []
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, (household_id,))
        for record in cursor.fetchall():
            metas.append(dict(record))
    return metas
def upsert_recipe_meta(
    self,
    *,
    household_id: int,
    recipe_slug: str,
    meta_json: str,
    version: int,
) -> None:
    """Store enrichment metadata for a recipe, replacing any existing row.

    On a duplicate-key conflict only ``meta_json`` and ``enrich_version``
    are overwritten with the new values.
    """
    sql = """INSERT INTO cauldron_recipe_meta
                 (household_id, recipe_slug, meta_json, enrich_version)
             VALUES (%s, %s, %s, %s)
             ON DUPLICATE KEY UPDATE
                 meta_json = VALUES(meta_json),
                 enrich_version = VALUES(enrich_version)"""
    params = (household_id, recipe_slug, meta_json, version)
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, params)
def create_enrich_job(self, *, household_id: int, started_by_sub: str) -> int:
    """Insert a new enrich job in state 'running' and return its row id.

    The id is whatever the cursor reports as ``lastrowid`` after the insert.
    """
    sql = """INSERT INTO cauldron_enrich_jobs
                 (household_id, started_by_sub, state)
             VALUES (%s, %s, 'running')"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, (household_id, started_by_sub))
        new_id = cursor.lastrowid
    return new_id
def get_enrich_job(self, job_id: int) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute("SELECT * FROM cauldron_enrich_jobs WHERE id=%s", (job_id,))
return cur.fetchone()
def get_enrich_job_state(self, job_id: int) -> str | None:
with self.conn() as c, c.cursor() as cur:
cur.execute("SELECT state FROM cauldron_enrich_jobs WHERE id=%s", (job_id,))
row = cur.fetchone()
return row["state"] if row else None
def latest_enrich_job_for_household(self, household_id: int) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT * FROM cauldron_enrich_jobs
WHERE household_id=%s ORDER BY started_at DESC LIMIT 1""",
(household_id,),
)
return cur.fetchone()
def running_enrich_job_for_household(self, household_id: int) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT * FROM cauldron_enrich_jobs
WHERE household_id=%s AND state='running'
ORDER BY started_at DESC LIMIT 1""",
(household_id,),
)
return cur.fetchone()
def update_enrich_job_progress(
self,
job_id: int,
*,
enriched_delta: int = 0,
skipped_delta: int = 0,
error_delta: int = 0,
current_slug: str | None = None,
last_error: str | None = None,
) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""UPDATE cauldron_enrich_jobs
SET enriched_count = enriched_count + %s,
skipped_count = skipped_count + %s,
error_count = error_count + %s,
current_slug = COALESCE(%s, current_slug),
last_error = COALESCE(%s, last_error),
last_progress_at = NOW()
WHERE id=%s""",
(enriched_delta, skipped_delta, error_delta,
current_slug, last_error, job_id),
)
def finalize_enrich_job(self, job_id: int, *, state: str) -> None:
    """Move an enrich job to ``state`` unless it is already terminal.

    ``finished_at`` is stamped only for done/failed/cancelled;
    ``current_slug`` is cleared and ``last_progress_at`` refreshed. Rows
    already in done/failed/cancelled are left untouched by the WHERE guard.
    """
    sql = """UPDATE cauldron_enrich_jobs
             SET state=%s,
                 finished_at = CASE WHEN %s IN ('done','failed','cancelled')
                                    THEN NOW() ELSE finished_at END,
                 last_progress_at = NOW(),
                 current_slug = NULL
             WHERE id=%s
               AND state NOT IN ('done','failed','cancelled')"""
    # `state` feeds both the SET and the CASE comparison.
    params = (state, state, job_id)
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, params)
def fail_stuck_enrich_jobs(self, *, stale_minutes: int = 15) -> int:
    """Fail enrich jobs that are 'running' but have gone quiet.

    A job qualifies when its ``last_progress_at`` is more than
    ``stale_minutes`` minutes old. An existing ``last_error`` is kept;
    otherwise a recovery message is stored. Returns the cursor's
    ``rowcount`` for the update.
    """
    sql = """UPDATE cauldron_enrich_jobs
             SET state='failed',
                 finished_at=NOW(),
                 last_error=COALESCE(last_error, 'recovery: worker exited mid-run')
             WHERE state='running'
               AND last_progress_at < NOW() - INTERVAL %s MINUTE"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, (stale_minutes,))
        failed = cursor.rowcount
    return failed
def fail_stuck_recipe_dedupe_jobs(self, *, stale_minutes: int = 15) -> int:
    """Fail recipe-dedupe jobs stuck in 'running' or 'applying'.

    A job qualifies when its ``last_progress_at`` is more than
    ``stale_minutes`` minutes old. An existing ``last_error`` is kept;
    otherwise a recovery message is stored. Returns the cursor's
    ``rowcount`` for the update.
    """
    sql = """UPDATE cauldron_recipe_dedupe_jobs
             SET state='failed',
                 finished_at=NOW(),
                 last_error=COALESCE(last_error,
                                     'recovery: worker exited mid-run')
             WHERE state IN ('running','applying')
               AND last_progress_at < NOW() - INTERVAL %s MINUTE"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, (stale_minutes,))
        failed = cursor.rowcount
    return failed
def fail_stuck_consolidate_jobs(self, *, stale_minutes: int = 15) -> int:
    """Fail consolidate jobs stuck in 'running' or 'applying'.

    A job qualifies when its ``last_progress_at`` is more than
    ``stale_minutes`` minutes old. An existing ``last_error`` is kept;
    otherwise a recovery message is stored. Returns the cursor's
    ``rowcount`` for the update.
    """
    sql = """UPDATE cauldron_consolidate_jobs
             SET state='failed',
                 finished_at=NOW(),
                 last_error=COALESCE(last_error,
                                     'recovery: worker exited mid-run')
             WHERE state IN ('running','applying')
               AND last_progress_at < NOW() - INTERVAL %s MINUTE"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, (stale_minutes,))
        failed = cursor.rowcount
    return failed
# --- sterilize stuck-job recovery (already used at boot) ---------------
def fail_stuck_sterilize_jobs(self, *, stale_minutes: int = 10) -> int:
    """Fail sterilize jobs stuck in 'running' or 'applying'.

    A job qualifies when its ``last_progress_at`` is more than
    ``stale_minutes`` minutes old (default 10). An existing ``last_error``
    is kept; otherwise a recovery message is stored. Per the section note
    this runs at app startup. Returns the number of jobs failed, as
    reported by the cursor's ``rowcount``.
    """
    sql = """UPDATE cauldron_sterilize_jobs
             SET state='failed',
                 finished_at=NOW(),
                 last_error=COALESCE(last_error,
                                     'recovery: worker exited mid-run')
             WHERE state IN ('running','applying')
               AND last_progress_at < NOW() - INTERVAL %s MINUTE"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, (stale_minutes,))
        failed = cursor.rowcount
    return failed
# --- discover (Discover v0.1) ------------------------------------------
def insert_discovered_recipe(
self,
*,
slug: str | None,
source_url: str,
name: str | None,
description: str | None,
image_url: str | None,
scraped_json: str,
) -> int | None:
"""INSERT a freshly-scraped recipe in 'raw' state. Returns the new
row id, or None if the source_url was already present (UNIQUE
violation = duplicate scrape, treat as skip)."""
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""INSERT IGNORE INTO cauldron_discovered_recipes
(slug, source_url, name, description, image_url,
scraped_json, status, scraped_at, last_action_at)
VALUES (%s, %s, %s, %s, %s, %s, 'raw', NOW(), NOW())""",
(slug, source_url[:768], name, description, image_url, scraped_json),
)
return cur.lastrowid or None
def update_discovered_meta(
    self, discover_id: int, *, meta_json: str, version: int
) -> None:
    """Save enriched metadata on a discovered recipe.

    Writes ``meta_json`` and ``enrich_version`` and refreshes
    ``last_action_at``. The status is promoted from 'raw' to 'enriched';
    any other status is left as it is.
    """
    sql = """UPDATE cauldron_discovered_recipes
             SET meta_json=%s,
                 enrich_version=%s,
                 status=CASE WHEN status='raw' THEN 'enriched'
                             ELSE status END,
                 last_action_at=NOW()
             WHERE id=%s"""
    params = (meta_json, version, discover_id)
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, params)
def set_discovered_status(self, discover_id: int, status: str) -> None:
    """Set the GLOBAL status of a discovered recipe.

    Meant for 'rejected' only: per-household 'imported' state lives in
    cauldron_discover_imports (see record_discover_import). Kept for
    backward compatibility. Also refreshes ``last_action_at``.
    """
    sql = """UPDATE cauldron_discovered_recipes
             SET status=%s, last_action_at=NOW()
             WHERE id=%s"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, (status, discover_id))
def record_discover_import(
self,
*,
discover_id: int,
household_id: int,
mealie_slug: str,
imported_by_sub: str | None,
) -> None:
"""Record that a household has imported a discovered recipe into its
Mealie library. INSERT IGNORE so a duplicate POST is a no-op rather
than a 409 — the user can click import twice and we just keep the
first slug we recorded. Composite PK (discover_id, household_id)."""
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""INSERT IGNORE INTO cauldron_discover_imports
(discover_id, household_id, mealie_slug, imported_by_sub)
VALUES (%s, %s, %s, %s)""",
(discover_id, household_id, mealie_slug[:255], imported_by_sub),
)
def get_discover_imports_for_group(
    self, *, mealie_group_id: str
) -> dict[int, dict]:
    """Map each discovered recipe to its first import within a Mealie group.

    Looks at imports made by ANY household whose ``mealie_group_id``
    matches. The /discover search endpoint uses this to mark cards as
    already present in the group's library, since a recipe imported by one
    household is readable by the others in the group.

    When several households imported the same recipe, the earliest import
    wins. The query orders by ``imported_at`` (household id as tie-break)
    so that choice is deterministic; without an ORDER BY the database may
    return rows in any order.

    Args:
        mealie_group_id: Mealie group to inspect; a falsy value returns {}
            without touching the database.

    Returns:
        ``{discover_id: row}`` where row holds discover_id, household_id,
        household_name, mealie_household_id, mealie_slug and imported_at
        (converted to an ISO-8601 string when it has ``isoformat``).
    """
    if not mealie_group_id:
        return {}
    with self.conn() as c, c.cursor() as cur:
        cur.execute(
            """SELECT i.discover_id, i.household_id, h.name AS household_name,
                      h.mealie_household_id, i.mealie_slug, i.imported_at
               FROM cauldron_discover_imports i
               JOIN cauldron_households h ON h.id = i.household_id
               WHERE h.mealie_group_id = %s
               ORDER BY i.imported_at ASC, i.household_id ASC""",
            (mealie_group_id,),
        )
        out: dict[int, dict] = {}
        for r in cur.fetchall() or []:
            row = dict(r)
            if hasattr(row.get("imported_at"), "isoformat"):
                row["imported_at"] = row["imported_at"].isoformat()
            # Rows arrive oldest first, so setdefault keeps the first import.
            out.setdefault(row["discover_id"], row)
    return out
def discover_imported_by_household(
self, *, discover_id: int, household_id: int
) -> dict | None:
"""Look up the import record for one (discover, household) pair.
Used by /api/discover/import to detect re-imports."""
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT discover_id, household_id, mealie_slug,
imported_by_sub, imported_at
FROM cauldron_discover_imports
WHERE discover_id=%s AND household_id=%s""",
(discover_id, household_id),
)
row = cur.fetchone()
if row and hasattr(row.get("imported_at"), "isoformat"):
row["imported_at"] = row["imported_at"].isoformat()
return row
def get_discovered_recipe(self, discover_id: int) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"SELECT * FROM cauldron_discovered_recipes WHERE id=%s",
(discover_id,),
)
return cur.fetchone()
def list_discovered_recipes(
self,
*,
status: str | list[str] | None = "enriched",
q: str | None = None,
cuisine: str | None = None,
complexity: str | None = None,
primary_protein: str | None = None,
meal_type: str | None = None,
kid_friendly_min: int | None = None,
max_minutes: int | None = None,
limit: int = 60,
offset: int = 0,
) -> list[dict]:
"""Browse discovered recipes with filters. Status defaults to
'enriched' so the /discover page surfaces only ready-to-import
rows. JSON path filters use MySQL JSON_EXTRACT against meta_json."""
where = []
args: list = []
if status is not None:
if isinstance(status, list):
if not status:
return []
placeholders = ",".join(["%s"] * len(status))
where.append(f"status IN ({placeholders})")
args.extend(status)
else:
where.append("status = %s")
args.append(status)
if q:
where.append("MATCH(name, description) AGAINST (%s IN NATURAL LANGUAGE MODE)")
args.append(q)
if cuisine:
where.append("JSON_UNQUOTE(JSON_EXTRACT(meta_json, '$.cuisine')) = %s")
args.append(cuisine)
if complexity:
where.append("JSON_UNQUOTE(JSON_EXTRACT(meta_json, '$.complexity')) = %s")
args.append(complexity)
if primary_protein:
where.append("JSON_UNQUOTE(JSON_EXTRACT(meta_json, '$.primary_protein')) = %s")
args.append(primary_protein)
if meal_type:
where.append("JSON_UNQUOTE(JSON_EXTRACT(meta_json, '$.meal_type')) = %s")
args.append(meal_type)
if kid_friendly_min is not None:
where.append("CAST(JSON_EXTRACT(meta_json, '$.kid_friendly_score') AS UNSIGNED) >= %s")
args.append(kid_friendly_min)
if max_minutes is not None:
where.append("CAST(JSON_EXTRACT(meta_json, '$.estimated_minutes') AS UNSIGNED) <= %s")
args.append(max_minutes)
sql = "SELECT * FROM cauldron_discovered_recipes"
if where:
sql += " WHERE " + " AND ".join(where)
# Relevance-rank when there's a search query, else newest-first
if q:
sql += " ORDER BY MATCH(name, description) AGAINST (%s IN NATURAL LANGUAGE MODE) DESC, scraped_at DESC"
args.append(q)
else:
sql += " ORDER BY scraped_at DESC"
sql += " LIMIT %s OFFSET %s"
args.extend([int(limit), int(offset)])
with self.conn() as c, c.cursor() as cur:
cur.execute(sql, args)
return list(cur.fetchall() or [])
def count_discovered_by_status(self) -> dict[str, int]:
    """Return ``{status: row count}`` over all discovered recipes.

    Statuses with no rows are simply absent from the result.
    """
    sql = """SELECT status, COUNT(*) AS n
             FROM cauldron_discovered_recipes GROUP BY status"""
    counts: dict[str, int] = {}
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql)
        for record in cursor.fetchall() or []:
            counts[record["status"]] = int(record["n"])
    return counts
def create_discover_job(
    self, *, started_by_sub: str, source_seed: str
) -> int:
    """Insert a new discover job in state 'running' and return its row id.

    ``source_seed`` is capped at 255 characters. The id is the cursor's
    ``lastrowid`` after the insert.
    """
    sql = """INSERT INTO cauldron_discover_jobs
                 (started_by_sub, source_seed, state)
             VALUES (%s, %s, 'running')"""
    params = (started_by_sub, source_seed[:255])
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, params)
        new_id = cursor.lastrowid
    return new_id
def get_discover_job(self, job_id: int) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"SELECT * FROM cauldron_discover_jobs WHERE id=%s", (job_id,)
)
return cur.fetchone()
def get_discover_job_state(self, job_id: int) -> str | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"SELECT state FROM cauldron_discover_jobs WHERE id=%s", (job_id,)
)
row = cur.fetchone()
return row["state"] if row else None
def latest_discover_job(self) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT * FROM cauldron_discover_jobs
ORDER BY started_at DESC LIMIT 1"""
)
return cur.fetchone()
def running_discover_job(self) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT * FROM cauldron_discover_jobs
WHERE state='running' ORDER BY started_at DESC LIMIT 1"""
)
return cur.fetchone()
def update_discover_job_progress(
self,
job_id: int,
*,
pages_delta: int = 0,
added_delta: int = 0,
skipped_delta: int = 0,
error_delta: int = 0,
last_error: str | None = None,
) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""UPDATE cauldron_discover_jobs
SET pages_scraped = pages_scraped + %s,
recipes_added = recipes_added + %s,
skipped_count = skipped_count + %s,
error_count = error_count + %s,
last_error = COALESCE(%s, last_error),
last_progress_at = NOW()
WHERE id=%s""",
(pages_delta, added_delta, skipped_delta, error_delta,
last_error[:500] if last_error else None, job_id),
)
def finalize_discover_job(self, job_id: int, *, state: str) -> None:
    """Move a discover job to ``state`` unless it is already terminal.

    ``finished_at`` is stamped only for done/failed/cancelled and
    ``last_progress_at`` is refreshed. The WHERE guard leaves rows already
    in done/failed/cancelled untouched, so a late writer cannot overwrite a
    terminal state. Mirrors finalize_enrich_job.
    """
    sql = """UPDATE cauldron_discover_jobs
             SET state=%s,
                 finished_at = CASE WHEN %s IN ('done','failed','cancelled')
                                    THEN NOW() ELSE finished_at END,
                 last_progress_at = NOW()
             WHERE id=%s
               AND state NOT IN ('done','failed','cancelled')"""
    # `state` feeds both the SET and the CASE comparison.
    params = (state, state, job_id)
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, params)
def fail_stuck_discover_jobs(self, *, stale_minutes: int = 15) -> int:
    """Fail discover jobs that are 'running' but have gone quiet.

    A job qualifies when its ``last_progress_at`` is more than
    ``stale_minutes`` minutes old. An existing ``last_error`` is kept;
    otherwise a recovery message is stored. Returns the cursor's
    ``rowcount`` for the update.
    """
    sql = """UPDATE cauldron_discover_jobs
             SET state='failed',
                 finished_at=NOW(),
                 last_error=COALESCE(last_error,
                                     'recovery: worker exited mid-run')
             WHERE state='running'
               AND last_progress_at < NOW() - INTERVAL %s MINUTE"""
    with self.conn() as db, db.cursor() as cursor:
        cursor.execute(sql, (stale_minutes,))
        failed = cursor.rowcount
    return failed