cauldron/cauldron/server.py
Kayos 69e05b1f92 Step 3: foods consolidator — cluster + merge dupes via Mealie's API
New bulk job that scans Mealie's foods table for duplicate-feeling
clusters, asks Sonnet to pick the canonical survivor + flag the rest
as merge candidates, and uses Mealie's PUT /api/foods/merge to
consolidate. After each successful merge, alias_additions get pushed
onto the survivor so Mealie's CRF/NLP parser fuzzy-matches the
discarded variant names from then on.

Architecture mirrors bulk_sterilize.py:
- Migrations 018+019 add cauldron_consolidate_jobs +
  cauldron_consolidate_proposals (state machine: running → review →
  applying → done/failed/cancelled)
- New consolidate_foods.py — daemon-thread runner with cancel-respect
  and stuck-job recovery
- /api/foods/consolidate-{start,status,jobs,apply,cancel} for session
  users + /api/admin/foods/consolidate-{start,jobs,cancel} for kayos

Sonnet integration:
- forge.cluster_decision(foods) → returns {merge, canonical_id,
  canonical_name, discard_ids, alias_additions, reason}
- Conservative-by-default: when in doubt Sonnet returns merge=false
  (the "olive oil vs olive" false-positive case from the prompt)
- Alias rules in the prompt explain why we want discarded names to
  travel back to the survivor as aliases (parser future-proofing)

Mealie integration:
- mealie.merge_foods(from_id, to_id) → PUT /api/foods/merge
- mealie.update_food(food_id, body) → for pushing aliases onto the
  survivor after merges land
- Apply path catches 403/permission errors and surfaces them as the
  per-cluster apply_error (cross-household merge attempts will fail
  here, same way as the sterilize cross-household path)

Clustering:
- rapidfuzz token_set_ratio ≥ 88 (slightly stricter than Mealie's
  parser threshold of 85 to reduce false-positive clusters)
- Single-link agglomerative — O(n²) but Cobb's ~3000 foods = ~9M
  comparisons, runs in seconds
- Singleton clusters (no merge candidates) are dropped, not stored

UI:
- /consolidate — same shape as /sterilize: progress bar → review grid
  → apply button. Cards show member chips with the canonical marked
  ★, discards marked × in red, alias_additions listed in green, plus
  Sonnet's one-line reasoning. Mergeable approved by default; user
  toggles individual clusters off if they disagree.
- Linked from /me → tools section, alongside bulk sterilize.

Total: ~600 LoC across 6 files. Foundation for the "Mealie owns
canonical names" architectural rule is now actually enforceable —
cobb runs this once, his foods table gets cleaned up, and Sonnet's
catalog-aware parser (Step 1) starts matching aliases for free.
2026-04-30 12:00:20 -07:00

1446 lines
54 KiB
Python

"""Flask app — v0.2 foundation.
Adds Authentik OIDC + sulkta-mariadb DB + Fernet crypto for per-user Mealie
tokens. v0.1 admin endpoints stay (still bearer-gated for now); user-facing
routes start using OIDC sessions.
Routes (partial list — all routes are registered inside create_app(); see
the decorators there for the full set):
GET  /healthz                          liveness, no auth
GET  /                                 redirects to /login if no session, else /me
GET  /login                            start OIDC flow
GET  /auth/callback                    OIDC callback
POST /logout                           clear session
GET  /me                               "who am I" page (HTML; JSON at /me.json)
GET  /connect-mealie                   prompt for Mealie token
POST /connect-mealie                   store encrypted token
POST /disconnect-mealie                delete encrypted token
GET  /api/recipes                      (admin bearer) proxy Mealie list
POST /api/sterilize/preview/<slug>     (admin bearer) v0.1 sterilizer
POST /api/sterilize/apply/<slug>       (admin bearer) v0.1 sterilizer
"""
from datetime import date, datetime, timedelta
from functools import wraps
import requests
from authlib.integrations.base_client.errors import MismatchingStateError, OAuthError
from flask import Flask, jsonify, redirect, render_template, request, session, url_for
from requests.exceptions import ConnectionError as RequestsConnectionError
from .config import load
from .crypto import TokenCrypto
from .db import DB
from .forge import Forge, ForgeError
from . import aggregator, bulk_sterilize, consolidate_foods, foods
from .mealie import Mealie, MealieError
from .oidc import init_oauth
from .recipe_index import flatten_recipe, refresh_household_index, search_index
from .sterilizer import Sterilizer
# ---- process-wide singletons, constructed once at import time ----
cfg = load()
db = DB(
    host=cfg.db_host,
    port=cfg.db_port,
    name=cfg.db_name,
    user=cfg.db_user,
    password=cfg.db_password,
)
crypto = TokenCrypto(cfg.fernet_key)
forge = Forge(
    base_url=cfg.clawdforge_url,
    token=cfg.clawdforge_token,
    default_model=cfg.default_model,
    default_timeout=cfg.default_timeout_secs,
)
# System-tier Mealie client, authenticated with the configured
# MEALIE_API_TOKEN (Cobb's "Cauldron" token); admin batch ops only.
system_mealie = Mealie(
    base_url=cfg.mealie_api_url,
    api_token=cfg.mealie_api_token,
)
system_sterilizer = Sterilizer(
    mealie=system_mealie,
    forge=forge,
    model=cfg.default_model,
)
def create_app() -> Flask:
app = Flask(__name__)
app.secret_key = cfg.secret_key
app.config.update(
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE="Lax",
# NOT setting SESSION_COOKIE_SECURE=True — LAN is plain HTTP for now.
# If we ever front this with TLS, flip secure=True.
)
# Apply migrations on startup
applied = db.migrate()
if applied:
app.logger.info("applied migrations: %s", applied)
# One-time backfill: re-key the legacy cauldron_foods (USDA + curated)
# densities into cauldron_food_metadata, keyed by Mealie food.id. Runs
# only when the new metadata table is empty. The system MEALIE_API_TOKEN
# may be expired (Cobb's "Cauldron" token was minted long ago); fall
# back to any stored per-user token since household admins can list
# all foods anyway.
def _resolve_backfill_mealie() -> "Mealie":
    """Choose a Mealie client for the one-time food-metadata backfill.

    The system client is probed with who_am_i(); if that probe fails for
    any reason, the first stored per-user token is decrypted and used
    instead.

    Raises:
        RuntimeError: the system probe failed and no stored user token
            is available.
    """
    try:
        system_mealie.who_am_i()
        system_ok = True
    except Exception:
        system_ok = False
    if system_ok:
        return system_mealie
    stash = db.first_usable_mealie_token()
    if not stash:
        raise RuntimeError("no usable Mealie token (system + user both unavailable)")
    return Mealie(
        base_url=cfg.mealie_api_url,
        api_token=crypto.decrypt(stash["encrypted_token"]),
    )
try:
if foods.metadata_count(db) == 0:
mealie_for_backfill = _resolve_backfill_mealie()
stats = foods.backfill_seed_from_legacy(db, mealie_for_backfill)
app.logger.info(
"cauldron_food_metadata backfill: matched=%d missed=%d total_mealie=%d",
stats.get("matched", 0),
stats.get("missed", 0),
stats.get("total_mealie", 0),
)
except Exception as e:
app.logger.warning("food metadata backfill failed: %s", e)
# Recover sterilize jobs whose worker died mid-run. A new run should
# produce no false positives: gunicorn-sync workers reconnect cleanly,
# the threshold is conservative (10 minutes of zero progress).
try:
n_failed = db.fail_stuck_sterilize_jobs(stale_minutes=10)
if n_failed:
app.logger.info("failed %d stuck sterilize jobs at boot", n_failed)
except Exception as e:
app.logger.warning("sterilize stuck-job recovery failed: %s", e)
try:
n_failed = db.fail_stuck_consolidate_jobs(stale_minutes=15)
if n_failed:
app.logger.info("failed %d stuck consolidate jobs at boot", n_failed)
except Exception as e:
app.logger.warning("consolidate stuck-job recovery failed: %s", e)
oauth = init_oauth(
app,
issuer=cfg.oidc_issuer,
client_id=cfg.oidc_client_id,
client_secret=cfg.oidc_client_secret,
)
# ---------- helpers --------------------------------------------------
def require_bearer(fn):
    """Decorator: require ``Authorization: Bearer <cfg.admin_bearer>``.

    Returns 401 ``missing bearer`` when the header is absent or is not a
    Bearer credential, and 403 ``forbidden`` when the token does not match.
    An empty presented token, or an unset/blank ``cfg.admin_bearer``, is
    always rejected so a blank credential can never satisfy a blank secret.
    """
    prefix = "Bearer "

    @wraps(fn)
    def w(*a, **kw):
        auth = request.headers.get("Authorization", "")
        if not auth.startswith(prefix):
            return jsonify({"error": "missing bearer"}), 401
        tok = auth[len(prefix):].strip()
        # Guard before the constant-time compare: an empty/unconfigured
        # secret must never authenticate anyone.
        if not tok or not cfg.admin_bearer or not _const_eq(tok, cfg.admin_bearer):
            return jsonify({"error": "forbidden"}), 403
        return fn(*a, **kw)
    return w
def require_session(fn):
    """Decorator: bounce requests without a logged-in session to /login.

    The current request path is passed as ``next`` so the login flow can
    return the user to it afterwards.
    """
    @wraps(fn)
    def w(*a, **kw):
        if session.get("user"):
            return fn(*a, **kw)
        return redirect(url_for("login", next=request.path))
    return w
def current_user_mealie() -> Mealie | None:
    """Build a Mealie client from the session user's stored token.

    Returns None when nobody is logged in, no token is stored, or the
    stored blob cannot be decrypted.
    """
    user = session.get("user")
    blob = db.get_user_mealie_token_blob(user["sub"]) if user else None
    if not blob:
        return None
    try:
        api_token = crypto.decrypt(blob)
    except Exception:
        # An undecryptable blob is treated the same as "not connected".
        return None
    return Mealie(base_url=cfg.mealie_api_url, api_token=api_token)
def sync_user_household(sub: str) -> int | None:
    """Pull the user's Mealie household, upsert into cauldron, ensure
    membership. Returns local household_id or None.

    Mealie's user-self response shape varies across versions — the
    `household` field can be a dict (with id+name+slug), a plain
    slug-string, or absent. We also accept top-level householdId /
    householdSlug. If we can't resolve a real ID, fall back to slug.

    Idempotent: a user who is already a member is left untouched. The
    first member of a household is added as "admin", later joiners as
    "member". (Previously every call re-added the caller with
    role="member" whenever the household was non-empty — including when
    the caller was its sole admin — which could demote them on re-sync,
    depending on how db.add_household_member treats existing rows.)

    Note: the Mealie client is built from the *session* user's token
    (current_user_mealie()), so `sub` is expected to be that same user.
    """
    client = current_user_mealie()
    if not client:
        return None
    try:
        me = client.who_am_i()
    except Exception:
        return None
    h = me.get("household")
    h_id_mealie: str | None = None
    h_name: str | None = None
    if isinstance(h, dict):
        h_id_mealie = h.get("id") or h.get("slug")
        h_name = h.get("name") or h.get("slug")
    elif isinstance(h, str) and h:
        # newer Mealie versions return household as a slug string
        h_id_mealie = h
        h_name = h
    # Fall back to top-level fields
    h_id_mealie = (
        h_id_mealie
        or me.get("householdId")
        or me.get("household_id")
        or me.get("householdSlug")
        or me.get("household_slug")
    )
    h_name = h_name or me.get("householdName") or h_id_mealie or "default"
    if not h_id_mealie:
        return None
    local_id = db.upsert_household(mealie_household_id=str(h_id_mealie), name=str(h_name))
    existing = db.list_household_member_subs(local_id) or []
    if sub not in existing:
        role = "admin" if not existing else "member"
        db.add_household_member(local_id, sub, role=role)
    return local_id
def current_household_id() -> int | None:
    """Return the session user's local household id.

    Uses the stored membership when present; otherwise syncs it from
    Mealie via sync_user_household(). None when nobody is logged in.
    """
    user = session.get("user")
    if not user:
        return None
    household_id = db.get_user_household_id(user["sub"])
    if household_id is not None:
        return household_id
    return sync_user_household(user["sub"])
def monday_of(d: date) -> date:
    """Return the Monday of the week containing ``d`` (``d`` itself if Monday)."""
    days_since_monday = d.weekday()  # Monday == 0 ... Sunday == 6
    return d - timedelta(days=days_since_monday)
# ---------- public ---------------------------------------------------
@app.get("/healthz")
def healthz():
    """Liveness probe: always ``ok: true``, plus best-effort upstream status.

    Each upstream (clawdforge, DB) is probed independently; a failure is
    reported as ``<name>_error`` with the exception text instead of
    failing the probe.
    """
    upstream = {}
    try:
        upstream["clawdforge"] = forge.healthz()
    except Exception as exc:
        upstream["clawdforge_error"] = str(exc)
    try:
        with db.conn() as conn, conn.cursor() as cursor:
            cursor.execute("SELECT 1 AS ok")
            cursor.fetchone()
            upstream["db"] = "ok"
    except Exception as exc:
        upstream["db_error"] = str(exc)
    return jsonify({"ok": True, "upstream": upstream})
@app.get("/")
def index():
    """Root: logged-in users go to /me, everyone else to /login."""
    endpoint = "me" if session.get("user") else "login"
    return redirect(url_for(endpoint))
@app.get("/login")
def login():
    """Start the OIDC flow, remembering where to send the user afterwards.

    ``?next=`` is attacker-controllable (anyone can craft a /login link),
    and the stored value is later passed to ``redirect()`` by the OIDC
    callback. Only same-site absolute paths are kept; anything else falls
    back to ``/me`` to prevent an open redirect.
    """
    nxt = request.args.get("next") or "/me"
    if (
        not nxt.startswith("/")
        # "//host" is a protocol-relative URL pointing off-site.
        or nxt.startswith("//")
        # Browsers treat "\" like "/" in URLs, so "/\host" == "//host".
        or "\\" in nxt
        # Browsers strip tab/CR/LF, so "/\t/host" also becomes "//host".
        or any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in nxt)
    ):
        nxt = "/me"
    session["post_login_next"] = nxt
    return oauth.cauldron.authorize_redirect(cfg.oidc_redirect_uri)
@app.get("/auth/callback")
def auth_callback():
    """OIDC redirect target: exchange the code, upsert the user, start a session.

    Failure modes render ``auth_retry.html`` instead of a stack trace:
    upstream unreachable -> 503, stale state / OAuth errors -> 400. In
    every failure branch the stashed authlib state key is removed so the
    user's next attempt starts clean. On success the user row is upserted,
    ``session["user"]`` is set, and the browser is redirected to the
    stashed ``post_login_next`` (default ``/me``).
    """
    # Wrap the OIDC exchange so transient DNS/JWKS hiccups (resolver
    # blip on auth.sulkta.com → ConnectionError → 500) render a
    # friendly retry page instead of dumping a stack trace, AND
    # clear the stashed state so the user's retry doesn't trip the
    # MismatchingState CSRF guard from a stale state cookie.
    try:
        token = oauth.cauldron.authorize_access_token()
    except (RequestsConnectionError, requests.Timeout) as e:
        app.logger.warning("OIDC callback: upstream unreachable: %s", e)
        session.pop("_state_cauldron_authlib", None)
        return render_template(
            "auth_retry.html",
            reason="upstream",
            detail="couldn't reach the auth server — usually a momentary DNS or network blip.",
        ), 503
    # NOTE(review): MismatchingStateError is presumably a subclass of
    # OAuthError in authlib — keep this clause before the OAuthError one.
    except MismatchingStateError:
        # Stale state from a previous failed callback. Clear and ask
        # the user to start a fresh login.
        session.pop("_state_cauldron_authlib", None)
        return render_template(
            "auth_retry.html",
            reason="stale",
            detail="that login link expired (you probably retried after a blip). hit login again to start fresh.",
        ), 400
    except OAuthError as e:
        app.logger.warning("OIDC callback: oauth error: %s", e)
        session.pop("_state_cauldron_authlib", None)
        return render_template(
            "auth_retry.html",
            reason="oauth",
            detail=f"auth handshake failed: {e}",
        ), 400
    # Prefer userinfo embedded in the token response; otherwise fetch it.
    userinfo = token.get("userinfo") or oauth.cauldron.userinfo(token=token)
    # Identity fallbacks: sub <- email, email <- sub.
    sub = userinfo.get("sub") or userinfo.get("email")
    email = userinfo.get("email") or sub
    name = userinfo.get("name") or userinfo.get("preferred_username")
    if not sub or not email:
        return ("missing sub/email in userinfo", 400)
    db.upsert_user(sub=sub, email=email, display_name=name)
    session["user"] = {"sub": sub, "email": email, "name": name}
    # NOTE(review): post_login_next originates from /login's ?next= arg;
    # it must be a validated local path before reaching redirect().
    return redirect(session.pop("post_login_next", "/me"))
@app.post("/logout")
def logout():
    """Drop everything in the Flask session and send the browser to /login."""
    session.clear()
    login_url = url_for("login")
    return redirect(login_url)
@app.get("/me")
@require_session
def me():
    """Render the account page: identity, Mealie link status, household size.

    When a Mealie token is stored, the Mealie profile is fetched
    (best-effort) and household membership is lazily synced so the
    member count can be shown.
    """
    user = session["user"]
    connected = db.get_user_mealie_token_blob(user["sub"]) is not None
    mealie_user = None
    household_size = 0
    if connected:
        client = current_user_mealie()
        if client:
            try:
                mealie_user = client.who_am_i()
            except Exception:
                mealie_user = None
        # Lazy-sync the household membership if it hasn't happened yet.
        household_id = current_household_id()
        if household_id:
            household_size = len(db.list_household_member_subs(household_id))
    return render_template(
        "me.html",
        user=user,
        connected=connected,
        mealie_user=mealie_user,
        household_size=household_size,
        active="me",
    )
@app.get("/me.json")
@require_session
def me_json():
    """JSON twin of /me: session identity plus whether a Mealie token is stored."""
    user = session["user"]
    has_token = db.get_user_mealie_token_blob(user["sub"]) is not None
    payload = {"user": user, "mealie_connected": has_token}
    return jsonify(payload)
# ---------- mealie connect flow --------------------------------------
@app.get("/connect-mealie")
@require_session
def connect_mealie_get():
    """Show the form where the user pastes a Mealie API token."""
    return render_template(
        "connect.html",
        user=session["user"],
        mealie_url=cfg.mealie_public_url,
        active="me",
    )
@app.post("/connect-mealie")
@require_session
def connect_mealie_post():
    """Validate a pasted Mealie token, store it encrypted, then sync household.

    The token is probed with who_am_i() first; a token Mealie rejects
    (MealieError) is answered with 400 and never persisted.
    """
    user = session["user"]
    token = (request.form.get("mealie_token") or "").strip()
    if token == "":
        return ("empty token", 400)
    probe = Mealie(base_url=cfg.mealie_api_url, api_token=token)
    try:
        probe.who_am_i()
    except MealieError as err:
        return (f"token rejected by Mealie: {err}", 400)
    encrypted = crypto.encrypt(token)
    db.set_user_mealie_token_blob(user["sub"], encrypted)
    # Join the household right away so the user immediately shares the
    # picks pool + scoreboard without a manual admin step.
    sync_user_household(user["sub"])
    return redirect(url_for("me"))
@app.post("/disconnect-mealie")
@require_session
def disconnect_mealie():
    """Delete the session user's stored Mealie token and return to /me."""
    sub = session["user"]["sub"]
    db.delete_user_mealie_token(sub)
    return redirect(url_for("me"))
# ---------- recipes (user-tier) --------------------------------------
@app.get("/recipes")
@require_session
def recipes_list():
    """Render the first page (24 cards) of the recipe grid from the local index.

    The household's recipe index is refreshed first when it is missing or
    stale (refresh failures are logged, not raised). Category chips come
    straight from Mealie and are best-effort.
    """
    client = current_user_mealie()
    if not client:
        return redirect(url_for("connect_mealie_get"))
    sort = request.args.get("sort", "newest")
    category = (request.args.get("cat") or "").strip()
    per_page = 24
    household_id = current_household_id()

    state = None
    if household_id:
        state = db.get_index_state(household_id)
        if not state or _index_stale(state):
            try:
                refresh_household_index(mealie_client=client, db=db, household_id=household_id)
                state = db.get_index_state(household_id)
            except Exception as e:
                app.logger.warning("recipe index refresh failed: %s", e)

    # Category chips are per-household and still come from Mealie.
    try:
        categories: list[dict] = client.list_categories().get("items") or []
    except Exception:
        categories = []

    order_by_local, order_dir_local = _sort_to_local_order(sort)
    items: list[dict] = []
    total = 0
    if household_id:
        rows = db.list_indexed_recipes(
            household_id,
            category=category or None,
            order_by=order_by_local,
            order_dir=order_dir_local,
            limit=per_page,
            offset=0,
        )
        pick_slugs = db.list_household_pick_slugs(household_id)
        items = [_index_row_to_card(row, pick_slugs) for row in rows]
        total = (state or {}).get("recipe_count") or len(items)
    pages = max(1, (total + per_page - 1) // per_page)
    return render_template(
        "recipes.html",
        recipes=items,
        total=total,
        pages=pages,
        sort=sort,
        category=category,
        categories=categories,
        active="recipes",
    )
@app.get("/api/recipes.json")
@require_session
def recipes_json():
    """Recipes endpoint for the AJAX path, served from the local index.

    Two modes:
    1. SEARCH (``q`` given): loads up to 2000 indexed rows for the
       household (optionally filtered by ``cat``), fuzzy-ranks them with
       ``search_index`` (top 80 kept), and pages through that ranked list.
    2. BROWSE (no ``q``): reads one page from the local index ordered by
       the ``sort`` key; ``next`` is derived from a one-row look-ahead.

    In both modes the index is refreshed first when it is missing or
    ``_index_stale()`` says so. A failed refresh is logged and the
    existing (possibly empty) index is served — there is no direct
    Mealie fallback.

    Query params: ``page`` (1-based; malformed values fall back to 1),
    ``q``, ``sort`` (default "newest"), ``cat``. Returns 409 when the
    user has no Mealie connection.
    """
    client = current_user_mealie()
    if not client:
        return jsonify({"error": "not connected"}), 409
    # int() on a malformed ?page= used to raise ValueError (HTTP 500).
    try:
        page = max(1, int(request.args.get("page", "1")))
    except ValueError:
        page = 1
    search = (request.args.get("q") or "").strip()
    sort = request.args.get("sort", "newest")
    category = (request.args.get("cat") or "").strip() or None
    order_by_local, order_dir_local = _sort_to_local_order(sort)
    per_page = 24
    hid = current_household_id()
    if not hid:
        return jsonify({"items": [], "total": 0, "total_pages": 1, "next": None})
    # Lazy index refresh
    state = db.get_index_state(hid)
    if (not state) or _index_stale(state):
        try:
            refresh_household_index(mealie_client=client, db=db, household_id=hid)
            # Re-read so browse-mode `total` reflects the refreshed index
            # (same as /recipes). Using the pre-refresh state made a first
            # load report total=len(items), i.e. a single page.
            state = db.get_index_state(hid)
        except Exception as e:
            app.logger.warning("recipe index refresh failed: %s", e)
    pick_slugs = db.list_household_pick_slugs(hid)
    if search:
        # Pull all rows for the household, fuzzy-rank
        rows = db.list_indexed_recipes(hid, category=category, limit=2000, offset=0)
        ranked = search_index(rows, search, limit=80)
        start = (page - 1) * per_page
        slice_ = ranked[start:start + per_page]
        items = [_index_row_to_card(r, pick_slugs) for r in slice_]
        total = len(ranked)
        total_pages = max(1, (total + per_page - 1) // per_page)
        return jsonify({
            "items": items,
            "page": page,
            "total": total,
            "total_pages": total_pages,
            "next": page + 1 if page < total_pages else None,
            "scored": True,
        })
    # Browse mode: fetch one extra row to learn whether a next page exists.
    offset = (page - 1) * per_page
    rows = db.list_indexed_recipes(
        hid, category=category,
        order_by=order_by_local, order_dir=order_dir_local,
        limit=per_page + 1, offset=offset,
    )
    has_next = len(rows) > per_page
    rows = rows[:per_page]
    items = [_index_row_to_card(r, pick_slugs) for r in rows]
    # NOTE(review): recipe_count is household-wide, so with a `cat`
    # filter this total over-counts — confirm whether that is intended.
    total = (state or {}).get("recipe_count") or len(items)
    total_pages = max(1, (total + per_page - 1) // per_page)
    return jsonify({
        "items": items,
        "page": page,
        "total": total,
        "total_pages": total_pages,
        "next": page + 1 if has_next else None,
        "scored": False,
    })
@app.post("/api/index/refresh")
@require_session
def index_refresh():
    """Force a rebuild of the household's recipe index from Mealie.

    409 when the user has no Mealie client or no household yet; 502 with
    the exception text when the refresh raises.
    """
    client = current_user_mealie()
    household_id = current_household_id()
    if not (client and household_id):
        return jsonify({"error": "not ready"}), 409
    try:
        count = refresh_household_index(mealie_client=client, db=db, household_id=household_id)
        return jsonify({"ok": True, "count": count})
    except Exception as exc:
        return jsonify({"ok": False, "error": str(exc)}), 502
@app.post("/api/picks/<slug>")
@require_session
def add_pick(slug: str):
    """Add ``slug`` to the session user's meal picks.

    The display name comes from the request body (JSON ``{"name": ...}``
    or form field ``name``). When it is missing, blank, or not a string,
    it is looked up from Mealie, falling back to the slug itself.
    """
    u = session["user"]
    if request.is_json:
        # get_json(silent=True) yields None instead of raising on bad JSON,
        # and a non-object body (e.g. a list) has no .get().
        body = request.get_json(silent=True)
        raw_name = body.get("name", "") if isinstance(body, dict) else ""
    else:
        raw_name = request.form.get("name", "")
    # Ignore non-string JSON values (numbers, null) and whitespace-only names.
    name = raw_name.strip() if isinstance(raw_name, str) else ""
    if not name:
        # Look it up from Mealie if missing
        client = current_user_mealie()
        if client:
            try:
                r = client.get_recipe(slug)
                name = r.get("name") or slug
            except Exception:
                name = slug
        else:
            name = slug
    added = db.add_meal_pick(u["sub"], slug, name)
    return jsonify({"ok": True, "added": added, "slug": slug})
@app.delete("/api/picks/<slug>")
@require_session
def del_pick(slug: str):
    """Remove ``slug`` from the session user's picks.

    ``removed`` echoes whatever db.remove_meal_pick returns.
    """
    sub = session["user"]["sub"]
    removed = db.remove_meal_pick(sub, slug)
    return jsonify({"ok": True, "removed": removed, "slug": slug})
@app.get("/api/picks.json")
@require_session
def list_picks_json():
    """Return the session user's own meal picks as JSON."""
    picks = db.list_meal_picks(session["user"]["sub"])
    return jsonify({"picks": picks})
@app.get("/picks")
@require_session
def picks_view():
    """Render the household picks board, marking which picks are the viewer's."""
    user = session["user"]
    household_id = current_household_id()
    picks = db.list_household_picks_with_pickers(household_id) if household_id else []
    own_slugs = db.list_meal_pick_slugs(user["sub"])
    for pick in picks:
        pick["mine"] = pick["slug"] in own_slugs
    member_count = len(db.list_household_member_subs(household_id)) if household_id else 0
    return render_template(
        "picks.html",
        picks=picks,
        active="picks",
        household_size=member_count,
    )
@app.get("/plan")
@require_session
def plan_view():
    """Render this week's meal plan (week starting Monday) for the household.

    Earlier weeks that were never locked are auto-locked before reading.
    Display names for the locker, the generator and slot pickers are
    resolved in one helper call.
    """
    user = session["user"]
    household_id = current_household_id()
    if not household_id:
        return redirect(url_for("connect_mealie_get"))
    week_start = monday_of(date.today())
    db.auto_lock_past_unlocked_plans(household_id, week_start)
    plan = db.get_or_create_plan(household_id, week_start)
    db.enrich_plan_with_slots(plan)
    pick_count = len(db.list_household_pick_slugs(household_id))
    sub_display: dict[str, str] = _resolve_sub_displays(db, plan)

    def _display_of(field: str):
        # Map the sub stored in `field` to its display name (None if unset).
        sub = plan.get(field)
        return sub_display.get(sub) if sub else None

    locked_by_display = _display_of("locked_by_sub")
    generated_by_display = _display_of("generated_by_sub")
    return render_template(
        "plan.html",
        week_start=plan["week_start"],
        plan=plan,
        locked_by_display=locked_by_display,
        generated_by_display=generated_by_display,
        sub_display=sub_display,
        current_user_sub=user["sub"],
        pick_count=pick_count,
        active="plan",
    )
@app.post("/api/plan/lock")
@require_session
def plan_lock():
    """Lock this week's plan on behalf of the session user.

    If it is already locked, answers ``ok: false`` with
    ``already_locked`` and the sub that locked it.
    """
    user = session["user"]
    household_id = current_household_id()
    if not household_id:
        return jsonify({"error": "no household"}), 409
    plan = db.get_or_create_plan(household_id, monday_of(date.today()))
    if plan.get("locked_at"):
        return jsonify({"ok": False, "already_locked": True, "by": plan.get("locked_by_sub")})
    updated = db.lock_plan(plan["id"], sub=user["sub"], reason="user")
    locked_at = updated["locked_at"]
    return jsonify({"ok": True, "locked_at": locked_at.isoformat() if locked_at else None})
@app.post("/api/plan/generate")
@require_session
def plan_generate():
    """Generate this week's 7-slot plan via clawdforge for the household.

    Responses:
      409 ``no household`` / ``plan_locked`` / ``no_recipes_indexed``;
      409 ``plan_already_generated`` (with the existing plan) when slots
          already exist or another generator won the insert race;
      502 ``forge_failed`` when clawdforge raises ForgeError;
      200 ``{"ok": True, "plan": ...}`` on success.
    """
    u = session["user"]
    hid = current_household_id()
    if not hid:
        return jsonify({"error": "no household"}), 409
    today = date.today()
    this_monday = monday_of(today)
    plan = db.get_or_create_plan(hid, this_monday)
    if plan.get("locked_at"):
        return jsonify({
            "error": "plan_locked",
            "locked_by_sub": plan.get("locked_by_sub"),
            "locked_at": plan["locked_at"].isoformat() if plan.get("locked_at") else None,
        }), 409
    # Idempotency / race: if slots already exist, return them. Two
    # concurrent generators end up with the first writer's slots; the
    # second sees them and returns 409 with the existing plan.
    existing = db.list_plan_slots(plan["id"])
    if existing:
        db.enrich_plan_with_slots(plan)
        return jsonify({
            "error": "plan_already_generated",
            "plan": _plan_payload(plan),
        }), 409
    # Pull picks (with picker_subs) + recipe pool (slug+name+tags only)
    picks = db.list_household_picks_with_pickers(hid)
    rows = db.list_indexed_recipes(hid, limit=2000, offset=0)
    recipes = []
    for r in rows:
        tags = []
        # raw_json may arrive as a JSON string or an already-decoded dict;
        # undecodable strings just mean "no tags".
        raw = r.get("raw_json")
        if isinstance(raw, str):
            try:
                raw = _json_loads(raw)
            except Exception:
                raw = None
        if isinstance(raw, dict):
            tags = raw.get("tags") or []
        recipes.append({"slug": r["slug"], "name": r["name"], "tags": tags})
    if not recipes:
        return jsonify({"error": "no_recipes_indexed"}), 409
    try:
        slots = forge.generate_plan(
            picks=picks, recipes=recipes,
            slots=7, week_start=this_monday.isoformat(),
        )
    except ForgeError as e:
        return jsonify({"error": "forge_failed", "detail": str(e)}), 502
    # save_plan_slots returning 0 is treated as "another request already
    # inserted slots for this plan" (the insert race described above).
    inserted = db.save_plan_slots(plan["id"], slots)
    if inserted == 0:
        # Race lost — re-read and return the winner's plan
        db.enrich_plan_with_slots(plan)
        return jsonify({
            "error": "plan_already_generated",
            "plan": _plan_payload(plan),
        }), 409
    plan = db.mark_plan_generated(plan["id"], u["sub"])
    db.enrich_plan_with_slots(plan)
    return jsonify({"ok": True, "plan": _plan_payload(plan)})
@app.post("/api/plan/regenerate")
@require_session
def plan_regenerate():
    """Re-roll this week's plan.

    Only the user who generated the plan may re-roll it, and only before
    it is locked. The replacement slots are requested from clawdforge
    *before* the existing slots are wiped. A forge failure, a missing
    recipe pool, or a lock that lands while forge runs therefore leaves
    the current plan intact. (Previously slots were deleted first, so any
    of those failures destroyed the plan.)

    Responses:
      409 no household / plan_locked / plan_not_generated /
          no_recipes_indexed / plan_already_generated (lost insert race);
      403 not_generator; 502 forge_failed; 200 ``{"ok": True, "plan": ...}``.
    """
    u = session["user"]
    hid = current_household_id()
    if not hid:
        return jsonify({"error": "no household"}), 409
    this_monday = monday_of(date.today())
    plan = db.get_or_create_plan(hid, this_monday)
    if plan.get("locked_at"):
        return jsonify({"error": "plan_locked"}), 409
    if not plan.get("generated_at"):
        # Nothing to re-roll — caller probably wanted plain /generate
        return jsonify({"error": "plan_not_generated"}), 409
    if plan.get("generated_by_sub") != u["sub"]:
        return jsonify({"error": "not_generator"}), 403

    # Build the pool exactly as /generate does (slug + name + tags only).
    picks = db.list_household_picks_with_pickers(hid)
    recipes = []
    for r in db.list_indexed_recipes(hid, limit=2000, offset=0):
        tags = []
        raw = r.get("raw_json")
        if isinstance(raw, str):
            try:
                raw = _json_loads(raw)
            except Exception:
                raw = None
        if isinstance(raw, dict):
            tags = raw.get("tags") or []
        recipes.append({"slug": r["slug"], "name": r["name"], "tags": tags})
    if not recipes:
        return jsonify({"error": "no_recipes_indexed"}), 409
    try:
        slots = forge.generate_plan(
            picks=picks, recipes=recipes,
            slots=7, week_start=this_monday.isoformat(),
        )
    except ForgeError as e:
        return jsonify({"error": "forge_failed", "detail": str(e)}), 502

    # The forge call can be slow; re-check the lock before destroying
    # anything so a plan locked in the meantime is not rewritten.
    plan = db.get_or_create_plan(hid, this_monday)
    if plan.get("locked_at"):
        return jsonify({"error": "plan_locked"}), 409

    db.delete_plan_slots(plan["id"])
    db.clear_plan_generated(plan["id"])
    inserted = db.save_plan_slots(plan["id"], slots)
    if inserted == 0:
        # Once slots were cleared, a concurrent /generate could insert
        # first; return the winner's plan, matching /generate's behavior.
        db.enrich_plan_with_slots(plan)
        return jsonify({
            "error": "plan_already_generated",
            "plan": _plan_payload(plan),
        }), 409
    plan = db.mark_plan_generated(plan["id"], u["sub"])
    db.enrich_plan_with_slots(plan)
    return jsonify({"ok": True, "plan": _plan_payload(plan)})
@app.get("/list")
@require_session
def list_view():
    """Render the shopping list aggregated from this week's planned recipes.

    Each planned slot's recipe is fetched from Mealie; recipes that fail
    to load are listed as ``missing_recipes`` rather than aborting. Every
    ingredient line becomes an ``aggregator.Ingredient`` and the list is
    built by ``aggregator.aggregate`` with a per-request, food-id-keyed
    metadata lookup. With no slots planned, an empty list is rendered with
    ``empty_reason="no_plan"``.
    """
    u = session["user"]
    hid = current_household_id()
    if not hid:
        return redirect(url_for("connect_mealie_get"))
    today = date.today()
    this_monday = monday_of(today)
    plan = db.get_or_create_plan(hid, this_monday)
    db.enrich_plan_with_slots(plan)
    if not plan.get("slots"):
        return render_template(
            "list.html",
            plan=plan, lines=[], active="list",
            empty_reason="no_plan",
            missing_recipes=[],
        )
    client = current_user_mealie()
    if not client:
        return redirect(url_for("connect_mealie_get"))
    raw_ings: list[aggregator.Ingredient] = []
    missing: list[str] = []
    # One Mealie fetch per slot, performed sequentially.
    for s in plan["slots"]:
        try:
            recipe = client.get_recipe(s["recipe_slug"])
        except Exception:
            missing.append(s["recipe_slug"])
            continue
        for ri in recipe.get("recipeIngredient", []) or []:
            qty = ri.get("quantity")
            # unit/food may be objects or absent; only dicts yield a name/id.
            unit_obj = ri.get("unit") or {}
            unit = (unit_obj.get("name") if isinstance(unit_obj, dict) else "") or ""
            food_obj = ri.get("food") or {}
            food_name = (food_obj.get("name") if isinstance(food_obj, dict) else "") or ""
            food_id = (food_obj.get("id") if isinstance(food_obj, dict) else None) or None
            note = ri.get("note") or ""
            # Skip lines with neither a food nor a free-text note.
            if not food_name and not note:
                continue
            # Unparsed lines (no food) use the note as the food name; the
            # note field is only kept when a real food name exists.
            # NOTE(review): float(qty) raises ValueError if Mealie ever
            # sends a non-numeric quantity string — confirm it can't.
            raw_ings.append(aggregator.Ingredient(
                qty=float(qty) if qty not in (None, "") else None,
                unit=unit,
                food_name=food_name or note,
                mealie_food_id=food_id,
                note=note if food_name else None,
                source_recipe_slug=s["recipe_slug"],
                original_text=ri.get("display") or _ing_render(qty, unit, food_name, note),
            ))
    # foods_lookup is now id-keyed: takes (food_name, mealie_food_id),
    # primary lookup is by Mealie's UUID via cauldron_food_metadata.
    # On a miss with a known food_id, calls clawdforge and persists.
    # When food_id is missing (ingredient still in note form), returns
    # None — the aggregator will fall back to name-based grouping.
    # lookup_cache memoizes per request, including None results, so a
    # failed fetch is not retried for the same food_id in this request.
    lookup_cache: dict[str, dict | None] = {}
    def foods_lookup(name: str, food_id: str | None) -> dict | None:
        if not food_id:
            return None
        cache_key = food_id
        if cache_key in lookup_cache:
            return lookup_cache[cache_key]
        meta = foods.get_metadata_by_food_id(db, food_id)
        if meta:
            # Normalize shape — aggregator expects canonical_name key
            meta = {**meta, "canonical_name": meta.get("food_name") or (name or "").lower()}
            lookup_cache[cache_key] = meta
            return meta
        # Miss — clawdforge fetch, persist by id
        row = foods.fetch_and_persist(
            db, mealie_food_id=food_id, food_name=name or "", forge=forge
        )
        if row:
            row = {**row, "canonical_name": row.get("food_name") or (name or "").lower()}
        else:
            app.logger.warning("foods.fetch_and_persist failed for food_id=%s name=%r", food_id, name)
        lookup_cache[cache_key] = row
        return row
    lines = aggregator.aggregate(raw_ings, foods_lookup)
    return render_template(
        "list.html",
        plan=plan, lines=lines, active="list",
        empty_reason=None,
        missing_recipes=missing,
    )
@app.get("/api/recipes/<slug>.json")
@require_session
def recipe_detail_json(slug: str):
    """Return one Mealie recipe as JSON, annotated with household pick state.

    ``recipe["picked"]`` is True when anyone in the household picked it.
    ``mealie_url`` is the group-scoped web URL when the group slug is
    known, else the plain ``/recipe/<slug>`` form.
    """
    client = current_user_mealie()
    if not client:
        return jsonify({"error": "not connected"}), 409
    try:
        recipe = client.get_recipe(slug)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 502
    household_id = current_household_id()
    if household_id:
        recipe["picked"] = slug in db.list_household_pick_slugs(household_id)
    else:
        recipe["picked"] = False
    # Mealie's web URL: <public>/g/<group-slug>/r/<recipe-slug>
    base_url = cfg.mealie_public_url
    group_slug = _user_group_slug(client)
    if group_slug:
        mealie_url = f"{base_url}/g/{group_slug}/r/{slug}"
    else:
        mealie_url = f"{base_url}/recipe/{slug}"
    return jsonify({
        "recipe": recipe,
        "public_url": cfg.mealie_public_url,
        "mealie_url": mealie_url,
    })
@app.get("/recipes/<slug>")
@require_session
def recipe_detail(slug: str):
    """Render one recipe page.

    ``picked`` reflects the session user's *own* picks (not the
    household's). The "open in Mealie" link is group-scoped when the
    user's group slug is known.
    """
    client = current_user_mealie()
    if not client:
        return redirect(url_for("connect_mealie_get"))
    user = session["user"]
    try:
        recipe = client.get_recipe(slug)
    except Exception as exc:
        return (f"recipe load failed: {exc}", 502)
    picked = slug in db.list_meal_pick_slugs(user["sub"])
    base_url = cfg.mealie_public_url
    group_slug = _user_group_slug(client)
    if group_slug:
        mealie_url = f"{base_url}/g/{group_slug}/r/{slug}"
    else:
        mealie_url = f"{base_url}/recipe/{slug}"
    return render_template(
        "recipe_detail.html",
        recipe=recipe,
        public_url=cfg.mealie_public_url,
        mealie_url=mealie_url,
        picked=picked,
        active="recipes",
    )
# ---------- bulk sterilizer (Phase A1) ------------------------------
def _user_sterilizer() -> Sterilizer | None:
    """Return a Sterilizer using the session user's Mealie token.

    None when the user has not connected Mealie (or the token is unusable).
    """
    client = current_user_mealie()
    if client:
        return Sterilizer(mealie=client, forge=forge, model=cfg.default_model)
    return None
@app.get("/sterilize")
@require_session
def sterilize_page():
    """Render the bulk-sterilize page with the household's latest job, if any."""
    household_id = current_household_id()
    if household_id:
        latest = db.latest_sterilize_job_for_household(household_id)
        return render_template("sterilize.html", active="sterilize", latest_job=latest)
    return redirect(url_for("connect_mealie_get"))
@app.post("/api/sterilize/bulk-start")
@require_session
def sterilize_bulk_start():
    """Create a bulk-sterilize job for the household and spawn its preview thread.

    Responses:
      200 ``{"ok", "job_id", "total"}`` — job created, thread started;
      409 no household / not connected / a job is already running;
      502 Mealie unreachable while sizing the job.

    ``total`` is an upper bound read from Mealie's first list page; the
    preview thread refines it once it has walked all pages.
    """
    u = session["user"]
    hid = current_household_id()
    if not hid:
        return jsonify({"error": "no household"}), 409
    # Block concurrent jobs per household — sterilize calls are billed
    # against clawdforge time, and parallel jobs would race on writes
    # to the same Mealie recipes.
    active = db.running_sterilize_job_for_household(hid)
    if active:
        return jsonify({"error": "already_running", "job_id": active["id"]}), 409
    sterilizer = _user_sterilizer()
    if sterilizer is None:
        # JSON endpoint: answer in JSON (as /api/recipes.json does) rather
        # than a 302 to an HTML page, which fetch() would silently follow.
        return jsonify({"error": "not connected"}), 409
    try:
        page1 = sterilizer.mealie.list_recipes(page=1, per_page=1)
    except MealieError as e:
        return jsonify({"error": "mealie_unreachable", "detail": str(e)}), 502
    total = int(page1.get("total") or page1.get("totalItems") or 0)
    job_id = db.create_sterilize_job(
        household_id=hid, started_by_sub=u["sub"], total=total
    )
    bulk_sterilize.spawn_preview_thread(
        db=db, job_id=job_id, sterilizer=sterilizer
    )
    return jsonify({"ok": True, "job_id": job_id, "total": total})
@app.get("/api/sterilize/bulk-status")
@require_session
def sterilize_bulk_status():
    """Return the household's most recent sterilize job (or ``null``)."""
    household_id = current_household_id()
    if not household_id:
        return jsonify({"error": "no household"}), 409
    job = db.latest_sterilize_job_for_household(household_id)
    payload = _job_payload(job) if job else None
    return jsonify({"job": payload})
@app.get("/api/sterilize/bulk-jobs/<int:job_id>/proposals")
@require_session
def sterilize_bulk_proposals(job_id: int):
    """Return a sterilize job and its proposals, with proposal_json decoded.

    404 when the job is missing or belongs to another household. A
    proposal_json string that fails to decode is returned as ``null``.
    """
    household_id = current_household_id()
    if not household_id:
        return jsonify({"error": "no household"}), 409
    job = db.get_sterilize_job(job_id)
    if not job or job["household_id"] != household_id:
        return jsonify({"error": "not_found"}), 404
    proposals = db.list_sterilize_proposals(job_id)
    for proposal in proposals:
        blob = proposal.get("proposal_json")
        if not isinstance(blob, str):
            continue
        try:
            decoded = _json_loads(blob)
        except Exception:
            decoded = None
        proposal["proposal_json"] = decoded
    return jsonify({"job": _job_payload(job), "proposals": proposals})
@app.post("/api/sterilize/bulk-apply/<int:job_id>")
@require_session
def sterilize_bulk_apply(job_id: int):
    """Record approved proposals for a job in ``review`` and start applying them.

    Body: ``{"approved_slugs": [str, ...]}``; non-string entries are
    dropped. Responses: 409 no household / wrong state / not connected,
    404 unknown or foreign job, 400 malformed ``approved_slugs``, 200 with
    the approved count once the apply thread is spawned.
    """
    hid = current_household_id()
    if not hid:
        return jsonify({"error": "no household"}), 409
    job = db.get_sterilize_job(job_id)
    if not job or job["household_id"] != hid:
        return jsonify({"error": "not_found"}), 404
    if job["state"] not in ("review",):
        return jsonify({"error": f"bad_state:{job['state']}"}), 409
    body = request.get_json(silent=True) or {}
    approved_slugs = body.get("approved_slugs") or []
    if not isinstance(approved_slugs, list):
        return jsonify({"error": "approved_slugs must be a list"}), 400
    approved_slugs = [s for s in approved_slugs if isinstance(s, str)]
    sterilizer = _user_sterilizer()
    if sterilizer is None:
        # JSON endpoint: answer in JSON (as /api/recipes.json does) rather
        # than a 302 to an HTML page, which fetch() would silently follow.
        return jsonify({"error": "not connected"}), 409
    db.bulk_set_proposal_approvals(job_id, approved_slugs)
    db.finalize_sterilize_job(job_id, state="applying")
    bulk_sterilize.spawn_apply_thread(
        db=db, job_id=job_id, sterilizer=sterilizer
    )
    return jsonify({"ok": True, "job_id": job_id, "approved_count": len(approved_slugs)})
@app.post("/api/sterilize/bulk-cancel/<int:job_id>")
@require_session
def sterilize_bulk_cancel(job_id: int):
    """Mark one of the household's sterilize jobs as cancelled.

    Only jobs in running/review/applying can be cancelled; any other state
    gets a 409 naming it. 404 when the job is not this household's.
    """
    household_id = current_household_id()
    if not household_id:
        return jsonify({"error": "no household"}), 409
    job = db.get_sterilize_job(job_id)
    owned = bool(job) and job["household_id"] == household_id
    if not owned:
        return jsonify({"error": "not_found"}), 404
    current_state = job["state"]
    if current_state not in ("running", "review", "applying"):
        return jsonify({"error": f"bad_state:{current_state}"}), 409
    db.finalize_sterilize_job(job_id, state="cancelled")
    return jsonify({"ok": True})
# ---------- foods consolidator (Step 3) ------------------------------
@app.get("/consolidate")
@require_session
def consolidate_page():
    """Render the foods-consolidate page seeded with the household's latest job.

    Sessions without a household are redirected to the Mealie connect page.
    """
    household_id = current_household_id()
    if household_id:
        latest_job = db.latest_consolidate_job_for_household(household_id)
        return render_template(
            "consolidate.html", active="consolidate", latest_job=latest_job,
        )
    return redirect(url_for("connect_mealie_get"))
@app.post("/api/foods/consolidate-start")
@require_session
def consolidate_start():
    """Start a foods-consolidate walk for the session user's household.

    409 when there is no household or a consolidate job is already running;
    redirects to the Mealie connect page when the user has no Mealie client.
    Otherwise records a job row, spawns the walk thread and returns its id.
    """
    user = session["user"]
    household_id = current_household_id()
    if not household_id:
        return jsonify({"error": "no household"}), 409
    running = db.running_consolidate_job_for_household(household_id)
    if running:
        return jsonify({"error": "already_running", "job_id": running["id"]}), 409
    mealie_client = current_user_mealie()
    if mealie_client is None:
        return redirect(url_for("connect_mealie_get"))
    new_job_id = db.create_consolidate_job(
        household_id=household_id, started_by_sub=user["sub"]
    )
    consolidate_foods.spawn_walk_thread(
        db=db, job_id=new_job_id, mealie=mealie_client, forge=forge,
    )
    return jsonify({"ok": True, "job_id": new_job_id})
@app.get("/api/foods/consolidate-status")
@require_session
def consolidate_status():
    """Report the current household's most recent consolidate job.

    Responds {"job": <payload>} or {"job": null}; 409 without a household.
    """
    household_id = current_household_id()
    if not household_id:
        return jsonify({"error": "no household"}), 409
    latest = db.latest_consolidate_job_for_household(household_id)
    return jsonify({"job": _consolidate_job_payload(latest) if latest else None})
@app.get("/api/foods/consolidate-jobs/<int:job_id>/proposals")
@require_session
def consolidate_proposals(job_id: int):
    """Return a household-owned consolidate job plus its cluster proposals.

    The `cluster_json` and `sonnet_decision` columns, when stored as JSON
    strings, are decoded for the client; undecodable strings become null.
    404 when the job is missing or belongs to another household.
    """
    household_id = current_household_id()
    if not household_id:
        return jsonify({"error": "no household"}), 409
    job = db.get_consolidate_job(job_id)
    if not job or job["household_id"] != household_id:
        return jsonify({"error": "not_found"}), 404
    proposals = db.list_consolidate_proposals(job_id)
    json_columns = ("cluster_json", "sonnet_decision")
    for proposal in proposals:
        for column in json_columns:
            encoded = proposal.get(column)
            if not isinstance(encoded, str):
                continue
            try:
                proposal[column] = _json_loads(encoded)
            except Exception:
                proposal[column] = None
    payload = {"job": _consolidate_job_payload(job), "proposals": proposals}
    return jsonify(payload)
@app.post("/api/foods/consolidate-apply/<int:job_id>")
@require_session
def consolidate_apply(job_id: int):
    """Approve a subset of a reviewed consolidate job's proposals and apply them.

    Body: {"approved_ids": [<id>, ...]}. Each id may be a non-negative int
    or a string of decimal digits; anything else is dropped. Records the
    approvals, flips the job to "applying" and spawns the apply thread.

    Returns 400 when the body is not a JSON object or approved_ids is not a
    list, 404 when the job isn't this household's, 409 when the job isn't
    in "review", and redirects to the Mealie connect page when the user has
    no Mealie client.
    """
    hid = current_household_id()
    if not hid:
        return jsonify({"error": "no household"}), 409
    job = db.get_consolidate_job(job_id)
    if not job or job["household_id"] != hid:
        return jsonify({"error": "not_found"}), 404
    if job["state"] != "review":
        return jsonify({"error": f"bad_state:{job['state']}"}), 409
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        return jsonify({"error": "body must be a JSON object"}), 400
    approved_ids_raw = body.get("approved_ids") or []
    # Without this check a string like "123" is iterated char by char and
    # approves ids 1, 2 and 3 (same validation as sterilize_bulk_apply).
    if not isinstance(approved_ids_raw, list):
        return jsonify({"error": "approved_ids must be a list"}), 400
    # isdecimal(), not isdigit(): isdigit() accepts characters such as "²"
    # that int() rejects, which turned a bad id into a 500. Bools are
    # dropped because str(True) is "True".
    approved_ids = [
        int(x)
        for x in approved_ids_raw
        if isinstance(x, (int, str)) and str(x).isdecimal()
    ]
    client = current_user_mealie()
    if client is None:
        return redirect(url_for("connect_mealie_get"))
    db.bulk_set_consolidate_approvals(job_id, approved_ids)
    db.finalize_consolidate_job(job_id, state="applying")
    consolidate_foods.spawn_apply_thread(db=db, job_id=job_id, mealie=client)
    return jsonify({"ok": True, "approved_count": len(approved_ids)})
@app.post("/api/foods/consolidate-cancel/<int:job_id>")
@require_session
def consolidate_cancel(job_id: int):
    """Mark one of the household's consolidate jobs as cancelled.

    Only running/review/applying jobs can be cancelled (409 otherwise);
    404 when the job is not this household's.
    """
    household_id = current_household_id()
    if not household_id:
        return jsonify({"error": "no household"}), 409
    job = db.get_consolidate_job(job_id)
    owned = bool(job) and job["household_id"] == household_id
    if not owned:
        return jsonify({"error": "not_found"}), 404
    current_state = job["state"]
    if current_state not in ("running", "review", "applying"):
        return jsonify({"error": f"bad_state:{current_state}"}), 409
    db.finalize_consolidate_job(job_id, state="cancelled")
    return jsonify({"ok": True})
# admin (bearer-auth) variants so kayos can kick off jobs on a user's behalf
@app.post("/api/admin/foods/consolidate-start")
@require_bearer
def admin_consolidate_start():
    """Bearer-authed alternate to /api/foods/consolidate-start. Body:
    {"started_by_sub": "<user sub>"}

    Resolves that user's household, decrypts their stored Mealie token and
    spawns the walk thread, so operators can start a run without a Flask
    session.

    Returns 400 on a non-object body or a missing/non-string sub, 404 when
    the user has no household, 409 when a job is already running or the
    user never connected Mealie, and 500 when the stored token can't be
    decrypted.
    """
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        return jsonify({"error": "body must be a JSON object"}), 400
    raw_sub = body.get("started_by_sub")
    # Non-string values (numbers, lists) used to crash on .strip() with a 500.
    sub = raw_sub.strip() if isinstance(raw_sub, str) else ""
    if not sub:
        return jsonify({"error": "started_by_sub required"}), 400
    hid = db.get_user_household_id(sub)
    if not hid:
        return jsonify({"error": "user has no household"}), 404
    active = db.running_consolidate_job_for_household(hid)
    if active:
        return jsonify({"error": "already_running", "job_id": active["id"]}), 409
    blob = db.get_user_mealie_token_blob(sub)
    if not blob:
        return jsonify({"error": "user_not_connected_to_mealie"}), 409
    try:
        tok = crypto.decrypt(blob)
    except Exception:
        return jsonify({"error": "user_token_undecryptable"}), 500
    mealie = Mealie(base_url=cfg.mealie_api_url, api_token=tok)
    job_id = db.create_consolidate_job(household_id=hid, started_by_sub=sub)
    consolidate_foods.spawn_walk_thread(db=db, job_id=job_id, mealie=mealie, forge=forge)
    return jsonify({"ok": True, "job_id": job_id})
@app.get("/api/admin/foods/consolidate-jobs/<int:job_id>")
@require_bearer
def admin_consolidate_status(job_id: int):
    """Bearer-authed read of any consolidate job's state (404 if unknown)."""
    job = db.get_consolidate_job(job_id)
    if job:
        return jsonify({"job": _consolidate_job_payload(job)})
    return jsonify({"error": "not_found"}), 404
@app.post("/api/admin/foods/consolidate-cancel/<int:job_id>")
@require_bearer
def admin_consolidate_cancel(job_id: int):
    """Bearer-authed cancel of any consolidate job in running/review/applying."""
    job = db.get_consolidate_job(job_id)
    if not job:
        return jsonify({"error": "not_found"}), 404
    current_state = job["state"]
    cancellable = ("running", "review", "applying")
    if current_state not in cancellable:
        return jsonify({"error": f"bad_state:{current_state}"}), 409
    db.finalize_consolidate_job(job_id, state="cancelled")
    return jsonify({"ok": True})
# ---------- admin sterilizer (bearer-auth, kick off on user's behalf) -
@app.post("/api/admin/sterilize/bulk-start")
@require_bearer
def admin_sterilize_bulk_start():
    """Bearer-authed alternate to /api/sterilize/bulk-start. Body:
    {"started_by_sub": "cobb@sulkta.com"}

    Resolves that user's household, decrypts their stored Mealie token and
    spawns a preview thread. This lets cauldron operators kick off bulk runs
    without needing a Flask session. It uses the same job state and
    proposals the user will see in /sterilize.

    Returns 400 on a non-object body or a missing/non-string sub, 404 when
    the user has no household, 409 when a job is already running or the
    user never connected Mealie, 500 when the stored token can't be
    decrypted, and 502 when Mealie can't be reached for the recipe count.
    """
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        return jsonify({"error": "body must be a JSON object"}), 400
    raw_sub = body.get("started_by_sub")
    # Non-string values (numbers, lists) used to crash on .strip() with a 500.
    sub = raw_sub.strip() if isinstance(raw_sub, str) else ""
    if not sub:
        return jsonify({"error": "started_by_sub required"}), 400
    hid = db.get_user_household_id(sub)
    if not hid:
        return jsonify({"error": "user has no household"}), 404
    active = db.running_sterilize_job_for_household(hid)
    if active:
        return jsonify({"error": "already_running", "job_id": active["id"]}), 409
    blob = db.get_user_mealie_token_blob(sub)
    if not blob:
        return jsonify({"error": "user_not_connected_to_mealie"}), 409
    try:
        tok = crypto.decrypt(blob)
    except Exception:
        return jsonify({"error": "user_token_undecryptable"}), 500
    mealie = Mealie(base_url=cfg.mealie_api_url, api_token=tok)
    sterilizer = Sterilizer(mealie=mealie, forge=forge, model=cfg.default_model)
    # Upper-bound recipe count for the progress UI.
    try:
        page1 = sterilizer.mealie.list_recipes(page=1, per_page=1)
    except MealieError as e:
        return jsonify({"error": "mealie_unreachable", "detail": str(e)}), 502
    total = int(page1.get("total") or page1.get("totalItems") or 0)
    job_id = db.create_sterilize_job(
        household_id=hid, started_by_sub=sub, total=total
    )
    bulk_sterilize.spawn_preview_thread(
        db=db, job_id=job_id, sterilizer=sterilizer
    )
    return jsonify({"ok": True, "job_id": job_id, "total": total})
@app.get("/api/admin/sterilize/jobs/<int:job_id>")
@require_bearer
def admin_sterilize_job_status(job_id: int):
    """Bearer-authed read of any sterilize job's state, for polling from outside."""
    job = db.get_sterilize_job(job_id)
    if job:
        return jsonify({"job": _job_payload(job)})
    return jsonify({"error": "not_found"}), 404
@app.post("/api/admin/sterilize/bulk-cancel/<int:job_id>")
@require_bearer
def admin_sterilize_bulk_cancel(job_id: int):
    """Bearer-authed cancel of any sterilize job in running/review/applying."""
    job = db.get_sterilize_job(job_id)
    if not job:
        return jsonify({"error": "not_found"}), 404
    current_state = job["state"]
    cancellable = ("running", "review", "applying")
    if current_state not in cancellable:
        return jsonify({"error": f"bad_state:{current_state}"}), 409
    db.finalize_sterilize_job(job_id, state="cancelled")
    return jsonify({"ok": True})
# ---------- v0.1 admin endpoints (carry over) ------------------------
@app.get("/api/recipes")
@require_bearer
def list_recipes_api():
    """Bearer-authed passthrough to Mealie's recipe list via the system client.

    Query params:
      page      -- 1-based page number (default 1, floored at 1)
      per_page  -- page size (default 50, clamped to 1..200)

    Malformed numbers fall back to the defaults instead of raising a 500.
    """
    # type=int makes werkzeug return the default when the value doesn't
    # parse, instead of int() raising ValueError.
    page = max(request.args.get("page", 1, type=int), 1)
    # The lower bound matters: with only min(), a non-positive per_page such
    # as -1 slipped under the 200 cap (presumably Mealie treats -1 as "no
    # limit" — confirm against the Mealie version in use).
    per_page = min(max(request.args.get("per_page", 50, type=int), 1), 200)
    return jsonify(system_mealie.list_recipes(page=page, per_page=per_page))
@app.post("/api/sterilize/preview/<slug>")
@require_bearer
def sterilize_preview(slug: str):
    """Bearer-authed dry-run of the system sterilizer on one recipe.

    Any failure (including serialization) is reported as a 502 with the
    exception text.
    """
    try:
        response = jsonify(system_sterilizer.preview_recipe(slug))
    except Exception as exc:
        response = (jsonify({"error": str(exc)}), 502)
    return response
@app.post("/api/sterilize/apply/<slug>")
@require_bearer
def sterilize_apply(slug: str):
    """Bearer-authed apply of the system sterilizer to one recipe.

    `?create_missing=` defaults to "true"; any value other than a
    case-insensitive "true" disables it. Failures become a 502 carrying the
    exception text.
    """
    flag = request.args.get("create_missing", "true")
    create_missing = flag.lower() == "true"
    try:
        response = jsonify(
            system_sterilizer.apply_recipe(slug, create_missing=create_missing)
        )
    except Exception as exc:
        response = (jsonify({"error": str(exc)}), 502)
    return response
return app
def _user_group_slug(client) -> str | None:
"""Mealie's recipe permalink lives at /g/<group-slug>/r/<slug>. Pull
the group slug from /api/users/self. Cheap call (Mealie is on the
same docker bridge); could cache in session if it becomes hot."""
try:
me = client.who_am_i()
except Exception:
return None
g = me.get("group")
if isinstance(g, dict):
return g.get("slug") or g.get("name")
if isinstance(g, str) and g:
return g
return me.get("groupSlug") or me.get("group_slug") or me.get("groupName")
def _sort_to_order(sort: str) -> tuple[str, str]:
"""Map our sort keys to Mealie's orderBy + direction."""
return {
"newest": ("created_at", "desc"),
"oldest": ("created_at", "asc"),
"az": ("name", "asc"),
"za": ("name", "desc"),
"made": ("last_made", "desc"),
"updated": ("updated_at", "desc"),
}.get(sort, ("created_at", "desc"))
def _sort_to_local_order(sort: str) -> tuple[str, str]:
"""Same set, but mapped to our cauldron_recipe_index columns."""
return {
"newest": ("date_added", "desc"),
"oldest": ("date_added", "asc"),
"az": ("name", "asc"),
"za": ("name", "desc"),
"made": ("last_made", "desc"),
"updated": ("date_updated", "desc"),
}.get(sort, ("date_added", "desc"))
_INDEX_TTL_SECS = 5 * 60
def _index_stale(state: dict | None) -> bool:
if not state:
return True
last = state.get("last_refreshed_at")
if not last:
return True
from datetime import datetime
age = (datetime.utcnow() - last).total_seconds()
return age > _INDEX_TTL_SECS
def _index_row_to_card(row: dict, pick_slugs: set[str]) -> dict:
"""Index row → recipe card dict the JS frontend expects (matches the
Mealie recipe shape closely enough for renderCard)."""
import json as _json
raw = row.get("raw_json")
if isinstance(raw, str):
try:
raw = _json.loads(raw)
except Exception:
raw = {}
raw = raw or {}
return {
"slug": row["slug"],
"name": row["name"],
"totalTime": row.get("total_time") or raw.get("totalTime"),
"recipeYield": row.get("recipe_yield") or raw.get("recipeYield"),
"dateUpdated": (raw.get("dateUpdated") if raw else None) or (row["date_updated"].isoformat() if row.get("date_updated") else None),
"tags": raw.get("tags") or [],
"picked": row["slug"] in pick_slugs,
}
def _const_eq(a: str, b: str) -> bool:
if len(a) != len(b):
return False
diff = 0
for x, y in zip(a.encode(), b.encode()):
diff |= x ^ y
return diff == 0
def _json_loads(s):
import json as _j
return _j.loads(s)
def _resolve_sub_displays(db, plan: dict) -> dict[str, str]:
"""Return {sub: display_name} for every sub referenced by this plan's
slots + locked_by + generated_by. One DB round-trip; small set."""
subs: set[str] = set()
if plan.get("locked_by_sub"):
subs.add(plan["locked_by_sub"])
if plan.get("generated_by_sub"):
subs.add(plan["generated_by_sub"])
for s in plan.get("slots") or []:
for sub in s.get("picker_subs") or []:
if sub:
subs.add(sub)
if not subs:
return {}
placeholders = ", ".join(["%s"] * len(subs))
out: dict[str, str] = {}
with db.conn() as c, c.cursor() as cur:
cur.execute(
f"SELECT authentik_sub, display_name, email FROM cauldron_users "
f"WHERE authentik_sub IN ({placeholders})",
tuple(subs),
)
for r in cur.fetchall():
sub = r["authentik_sub"]
disp = r.get("display_name")
if not disp:
disp = (r.get("email") or "").split("@")[0]
out[sub] = disp or sub
return out
def _job_payload(job: dict) -> dict:
"""JSON-serializable view of a sterilize job row (datetimes → iso)."""
j = dict(job)
for k in ("started_at", "last_progress_at", "finished_at"):
v = j.get(k)
if v is not None and hasattr(v, "isoformat"):
j[k] = v.isoformat()
return j
def _consolidate_job_payload(job: dict) -> dict:
"""Same shape as _job_payload — kept separate for clarity since the
consolidate job has different counter columns than sterilize."""
j = dict(job)
for k in ("started_at", "last_progress_at", "finished_at"):
v = j.get(k)
if v is not None and hasattr(v, "isoformat"):
j[k] = v.isoformat()
return j
def _plan_payload(plan: dict) -> dict:
"""JSON-serializable view of a plan dict (datetimes → iso strings)."""
p = dict(plan)
for k in ("week_start", "generated_at", "locked_at"):
v = p.get(k)
if v is not None and hasattr(v, "isoformat"):
p[k] = v.isoformat()
slots = p.get("slots") or []
out_slots = []
for s in slots:
s2 = dict(s)
ca = s2.get("created_at")
if ca is not None and hasattr(ca, "isoformat"):
s2["created_at"] = ca.isoformat()
out_slots.append(s2)
p["slots"] = out_slots
return p
def _ing_render(qty, unit, food_name, note) -> str:
"""Tiny fallback for `display` when Mealie didn't render the line."""
parts: list[str] = []
if qty not in (None, ""):
parts.append(str(qty))
if unit:
parts.append(unit)
if food_name:
parts.append(food_name)
if note and not food_name:
parts.append(note)
return " ".join(parts).strip()
# gunicorn entrypoint: the module-level WSGI app object gunicorn loads.
# create_app() runs once, when this module is imported.
app = create_app()