From 69e05b1f9265318b0c80bf91e032f0f72077b9d2 Mon Sep 17 00:00:00 2001 From: Kayos Date: Thu, 30 Apr 2026 12:00:20 -0700 Subject: [PATCH] =?UTF-8?q?Step=203:=20foods=20consolidator=20=E2=80=94=20?= =?UTF-8?q?cluster=20+=20merge=20dupes=20via=20Mealie's=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New bulk job that scans Mealie's foods table for duplicate-feeling clusters, asks Sonnet to pick the canonical survivor + flag the rest as merge candidates, and uses Mealie's PUT /api/foods/merge to consolidate. After each successful merge, alias_additions get pushed onto the survivor so Mealie's CRF/NLP parser fuzzy-matches the discarded variant names from then on. Architecture mirrors bulk_sterilize.py: - Migrations 018+019 add cauldron_consolidate_jobs + cauldron_consolidate_proposals (state machine: running → review → applying → done/failed/cancelled) - New consolidate_foods.py — daemon-thread runner with cancel-respect and stuck-job recovery - /api/foods/consolidate-{start,status,jobs,apply,cancel} for session users + /api/admin/foods/consolidate-{start,jobs,cancel} for kayos Sonnet integration: - forge.cluster_decision(foods) → returns {merge, canonical_id, canonical_name, discard_ids, alias_additions, reason} - Conservative-by-default: when in doubt Sonnet returns merge=false (the "olive oil vs olive" false-positive case from the prompt) - Alias rules in the prompt explain why we want discarded names to travel back to the survivor as aliases (parser future-proofing) Mealie integration: - mealie.merge_foods(from_id, to_id) → PUT /api/foods/merge - mealie.update_food(food_id, body) → for pushing aliases onto the survivor after merges land - Apply path catches 403/permission errors and surfaces them as the per-cluster apply_error (cross-household merge attempts will fail here, same way as the sterilize cross-household path) Clustering: - rapidfuzz token_set_ratio ≥ 88 (slightly stricter than Mealie's parser threshold 
of 85 to reduce false-positive clusters) - Single-link agglomerative — O(n²) but Cobb's ~3000 foods = ~9M comparisons, runs in seconds - Singleton clusters (no merge candidates) are dropped, not stored UI: - /consolidate — same shape as /sterilize: progress bar → review grid → apply button. Cards show member chips with the canonical marked ★, discards marked × in red, alias_additions listed in green, plus Sonnet's one-line reasoning. Mergeable clusters are approved by default; the user toggles individual clusters off if they disagree. - Linked from /me → tools section, alongside bulk sterilize. Total: ~1,140 lines added across 7 files. This lays the foundation for the "Mealie owns canonical names" architectural rule, which is now actually enforceable — Cobb runs this once, his foods table gets cleaned up, and Sonnet's catalog-aware parser (Step 1) starts matching aliases for free. --- cauldron/consolidate_foods.py | 300 ++++++++++++++++++++++++ cauldron/db.py | 231 +++++++++++++++++++ cauldron/forge.py | 90 ++++++++ cauldron/mealie.py | 9 + cauldron/server.py | 167 +++++++++++++- cauldron/templates/consolidate.html | 343 ++++++++++++++++++++++++++++ cauldron/templates/me.html | 2 + 7 files changed, 1141 insertions(+), 1 deletion(-) create mode 100644 cauldron/consolidate_foods.py create mode 100644 cauldron/templates/consolidate.html diff --git a/cauldron/consolidate_foods.py b/cauldron/consolidate_foods.py new file mode 100644 index 0000000..c7262ff --- /dev/null +++ b/cauldron/consolidate_foods.py @@ -0,0 +1,300 @@ +"""Foods consolidator — clusters Mealie's food rows by similarity, asks +Sonnet which clusters are dupes, and uses Mealie's PUT /api/foods/merge +to consolidate. Operates household-scoped via the user's Mealie token. + +The walk: + 1. Fetch all foods via GET /api/foods (group-wide; the user's + household admin scope is enough) + 2. Filter to the user's own household (where edit permission exists) + 3. 
Cluster by rapidfuzz.token_set_ratio ≥ THRESHOLD (default 88, slightly + stricter than Mealie's parser threshold of 85 to reduce false positives) + 4. Drop singleton clusters (no merge candidate) + 5. For each cluster, call forge.cluster_decision(...) — Sonnet + decides merge/keep + picks the canonical survivor + 6. Persist the proposal + +The apply: + 1. For each approved cluster: PUT /api/foods/merge from each discard_id + into canonical_id + 2. After merges, optionally PUT /api/foods/{canonical_id} with the + accumulated aliases array so the survivor learns the variant names + +Same daemon-thread + cancel-respecting + stuck-job-recovery pattern as +bulk_sterilize.py. +""" +from __future__ import annotations + +import json +import logging +import threading +from typing import Optional + +from rapidfuzz import fuzz, process + +from .db import DB +from .forge import Forge, ForgeError +from .mealie import Mealie, MealieError + +log = logging.getLogger(__name__) + +CLUSTER_THRESHOLD = 88 + + +def _household_id_for(mealie: Mealie) -> str | None: + me = mealie.who_am_i() + hid = me.get("householdId") or me.get("household_id") + if not hid: + h = me.get("household") + if isinstance(h, dict): + hid = h.get("id") + return hid + + +def _all_foods(mealie: Mealie) -> list[dict]: + out: list[dict] = [] + page = 1 + while page <= 20: + resp = mealie._get("/api/foods", search="", perPage=2000, page=page) + items = resp.get("items") or resp.get("data") or [] + for item in items: + out.append(item) + tp = resp.get("total_pages") or resp.get("totalPages") or 1 + if not items or page >= tp: + break + page += 1 + return out + + +def _foods_in_household(mealie: Mealie, household_id: str) -> list[dict]: + foods = _all_foods(mealie) + if not household_id: + return foods + out = [] + for f in foods: + hh = f.get("householdId") or f.get("household_id") + if not hh: + h = f.get("household") + if isinstance(h, dict): + hh = h.get("id") + if hh and hh == household_id: + out.append(f) + elif 
not hh: + # Some Mealie versions don't tag foods with householdId at the + # food level — they're group-scoped instead. In that case, we + # consolidate across the whole group (Cobb is admin so writes + # land regardless). + out.append(f) + return out + + +def _cluster(foods: list[dict], threshold: int = CLUSTER_THRESHOLD) -> list[list[dict]]: + """Single-link agglomerative clustering on token_set_ratio. O(n²) — fine + for ~3000 foods (~9M comparisons). Returns clusters of size ≥ 2.""" + n = len(foods) + parent = list(range(n)) + + def find(x): + while parent[x] != x: + parent[x] = parent[parent[x]] + x = parent[x] + return x + + def union(a, b): + ra, rb = find(a), find(b) + if ra != rb: + parent[ra] = rb + + names = [(f.get("name") or "").strip().lower() for f in foods] + for i in range(n): + if not names[i]: + continue + for j in range(i + 1, n): + if not names[j]: + continue + score = fuzz.token_set_ratio(names[i], names[j]) + if score >= threshold: + union(i, j) + + groups: dict[int, list[dict]] = {} + for i in range(n): + r = find(i) + groups.setdefault(r, []).append(foods[i]) + return [g for g in groups.values() if len(g) >= 2] + + +def _cluster_key(cluster: list[dict]) -> str: + """Stable key for the cluster — sorted name + ids.""" + names = sorted((f.get("name") or "").strip().lower() for f in cluster) + return "|".join(names)[:255] + + +def run_walk(*, db: DB, job_id: int, mealie: Mealie, forge: Forge) -> None: + log.info("[consolidate:%s] walk start", job_id) + + def _cancelled() -> bool: + s = db.get_consolidate_job_state(job_id) + return s in ("cancelled", "failed", "done") + + try: + hh = _household_id_for(mealie) + foods = _foods_in_household(mealie, hh) + log.info("[consolidate:%s] household=%s foods=%d", job_id, hh, len(foods)) + + clusters = _cluster(foods) + log.info("[consolidate:%s] clusters≥2: %d", job_id, len(clusters)) + + with db.conn() as c, c.cursor() as cur: + cur.execute( + "UPDATE cauldron_consolidate_jobs SET total_clusters=%s WHERE 
id=%s", + (len(clusters), job_id), + ) + + for cluster in clusters: + if _cancelled(): + log.info("[consolidate:%s] walk aborted (state changed)", job_id) + return + key = _cluster_key(cluster) + db.update_consolidate_job_progress(job_id, current_cluster=key[:80]) + + try: + decision = forge.cluster_decision(cluster) + except (ForgeError, RuntimeError) as e: + msg = str(e)[:500] + log.warning("[consolidate:%s] cluster_decision(%s): %s", job_id, key[:60], msg) + db.insert_consolidate_proposal( + job_id=job_id, cluster_key=key, + cluster=cluster, decision=None, error=msg, + ) + db.update_consolidate_job_progress(job_id, error_delta=1, last_error=msg) + continue + + db.insert_consolidate_proposal( + job_id=job_id, cluster_key=key, + cluster=cluster, decision=decision, error=None, + ) + db.update_consolidate_job_progress(job_id, processed_delta=1) + + db.finalize_consolidate_job(job_id, state="review") + log.info("[consolidate:%s] walk done; awaiting review", job_id) + except Exception: + log.exception("[consolidate:%s] walk crashed", job_id) + try: + db.finalize_consolidate_job(job_id, state="failed") + except Exception: + log.exception("[consolidate:%s] couldn't mark failed", job_id) + + +def run_apply(*, db: DB, job_id: int, mealie: Mealie) -> None: + log.info("[consolidate:%s] apply start", job_id) + + def _cancelled() -> bool: + s = db.get_consolidate_job_state(job_id) + return s in ("cancelled", "failed", "done") + + try: + approved = db.list_approved_unapplied_consolidate(job_id) + for row in approved: + if _cancelled(): + log.info("[consolidate:%s] apply aborted (state changed)", job_id) + return + decision = row.get("sonnet_decision") or {} + if isinstance(decision, str): + try: + decision = json.loads(decision) + except Exception: + decision = {} + + if not decision.get("merge"): + # User shouldn't have approved a non-merge cluster; defensive + db.mark_consolidate_proposal_applied( + row["id"], error="cluster decision was 'no merge' but row was approved", + ) 
+ continue + + canonical_id = decision.get("canonical_id") or "" + discard_ids = decision.get("discard_ids") or [] + alias_additions = decision.get("alias_additions") or [] + if not canonical_id or not discard_ids: + db.mark_consolidate_proposal_applied( + row["id"], error="missing canonical_id or discard_ids", + ) + continue + + db.update_consolidate_job_progress( + job_id, current_cluster=row.get("cluster_key", "")[:80] + ) + + err: str | None = None + for did in discard_ids: + if did == canonical_id: + continue + try: + mealie.merge_foods(from_id=did, to_id=canonical_id) + except MealieError as e: + err = f"merge {did} → {canonical_id}: {e}" + log.warning("[consolidate:%s] %s", job_id, err) + break + + if err is None and alias_additions: + # After successful merges, push the aliases onto the survivor + try: + survivor = mealie._get(f"/api/foods/{canonical_id}") + existing = survivor.get("aliases") or [] + existing_names: set = set() + for a in existing: + if isinstance(a, str): + existing_names.add(a.strip().lower()) + elif isinstance(a, dict): + existing_names.add((a.get("name") or "").strip().lower()) + new_aliases: list[dict] = list(existing) + for n in alias_additions: + if n.strip().lower() not in existing_names and n.strip().lower() != (survivor.get("name") or "").strip().lower(): + new_aliases.append({"name": n.strip()}) + existing_names.add(n.strip().lower()) + if new_aliases != existing: + body = dict(survivor) + body["aliases"] = new_aliases + mealie.update_food(canonical_id, body) + except MealieError as e: + err = f"alias update on {canonical_id}: {e}" + log.warning("[consolidate:%s] %s", job_id, err) + + if err: + db.mark_consolidate_proposal_applied(row["id"], error=err) + db.update_consolidate_job_progress( + job_id, error_delta=1, last_error=err, + ) + else: + db.mark_consolidate_proposal_applied(row["id"]) + db.update_consolidate_job_progress(job_id, merged_delta=1) + + db.finalize_consolidate_job(job_id, state="done") + 
log.info("[consolidate:%s] apply done", job_id) + except Exception: + log.exception("[consolidate:%s] apply crashed", job_id) + try: + db.finalize_consolidate_job(job_id, state="failed") + except Exception: + pass + + +def spawn_walk_thread(*, db: DB, job_id: int, mealie: Mealie, forge: Forge) -> threading.Thread: + t = threading.Thread( + target=run_walk, + kwargs={"db": db, "job_id": job_id, "mealie": mealie, "forge": forge}, + name=f"consolidate-walk-{job_id}", + daemon=True, + ) + t.start() + return t + + +def spawn_apply_thread(*, db: DB, job_id: int, mealie: Mealie) -> threading.Thread: + t = threading.Thread( + target=run_apply, + kwargs={"db": db, "job_id": job_id, "mealie": mealie}, + name=f"consolidate-apply-{job_id}", + daemon=True, + ) + t.start() + return t diff --git a/cauldron/db.py b/cauldron/db.py index d491696..a4f9574 100644 --- a/cauldron/db.py +++ b/cauldron/db.py @@ -300,6 +300,52 @@ MIGRATIONS = [ INDEX idx_category (category) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """, + # 018 — Foods consolidator jobs. Mirrors the sterilize job pattern but + # for the foods table itself: scan Mealie household foods, cluster by + # similarity, ask Sonnet which clusters are dupes, apply via Mealie's + # PUT /api/foods/merge endpoint. One job at a time per household. 
+ """ + CREATE TABLE IF NOT EXISTS cauldron_consolidate_jobs ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + household_id BIGINT NOT NULL, + started_by_sub VARCHAR(190) NOT NULL, + started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_progress_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + finished_at DATETIME, + total_clusters INT NOT NULL DEFAULT 0, + processed_count INT NOT NULL DEFAULT 0, + merged_count INT NOT NULL DEFAULT 0, + error_count INT NOT NULL DEFAULT 0, + current_cluster VARCHAR(255), + last_error VARCHAR(500), + state ENUM('running','review','applying','done','failed','cancelled') + NOT NULL DEFAULT 'running', + INDEX idx_household_state (household_id, state), + FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE, + FOREIGN KEY (started_by_sub) REFERENCES cauldron_users(authentik_sub) ON DELETE CASCADE + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """, + # 019 — Per-cluster proposal. cluster_json holds the list of Mealie + # food IDs+names in the cluster; sonnet_decision stores + # {merge: bool, canonical_id: str, canonical_name: str, discard_ids: [...], + # alias_additions: [...], reason: str}. 
+ """ + CREATE TABLE IF NOT EXISTS cauldron_consolidate_proposals ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + job_id BIGINT NOT NULL, + cluster_key VARCHAR(255) NOT NULL, + cluster_size INT NOT NULL DEFAULT 0, + cluster_json JSON, + sonnet_decision JSON, + approved BOOLEAN, + applied_at DATETIME, + apply_error VARCHAR(500), + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE KEY uk_job_cluster (job_id, cluster_key), + INDEX idx_approved (job_id, approved), + FOREIGN KEY (job_id) REFERENCES cauldron_consolidate_jobs(id) ON DELETE CASCADE + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """, ] @@ -1156,6 +1202,191 @@ class DB: ) return [dict(r) for r in cur.fetchall()] + # --- consolidate (foods merge) jobs ------------------------------------- + + def create_consolidate_job( + self, *, household_id: int, started_by_sub: str + ) -> int: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """INSERT INTO cauldron_consolidate_jobs + (household_id, started_by_sub, state) + VALUES (%s, %s, 'running')""", + (household_id, started_by_sub), + ) + return cur.lastrowid + + def get_consolidate_job(self, job_id: int) -> dict | None: + with self.conn() as c, c.cursor() as cur: + cur.execute("SELECT * FROM cauldron_consolidate_jobs WHERE id=%s", (job_id,)) + return cur.fetchone() + + def get_consolidate_job_state(self, job_id: int) -> str | None: + with self.conn() as c, c.cursor() as cur: + cur.execute("SELECT state FROM cauldron_consolidate_jobs WHERE id=%s", (job_id,)) + row = cur.fetchone() + return row["state"] if row else None + + def latest_consolidate_job_for_household(self, household_id: int) -> dict | None: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """SELECT * FROM cauldron_consolidate_jobs + WHERE household_id=%s ORDER BY started_at DESC LIMIT 1""", + (household_id,), + ) + return cur.fetchone() + + def running_consolidate_job_for_household(self, household_id: int) -> dict | None: + with self.conn() as c, c.cursor() as cur: + 
cur.execute( + """SELECT * FROM cauldron_consolidate_jobs + WHERE household_id=%s AND state IN ('running','applying') + ORDER BY started_at DESC LIMIT 1""", + (household_id,), + ) + return cur.fetchone() + + def update_consolidate_job_progress( + self, + job_id: int, + *, + processed_delta: int = 0, + merged_delta: int = 0, + error_delta: int = 0, + current_cluster: str | None = None, + last_error: str | None = None, + ) -> None: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """UPDATE cauldron_consolidate_jobs + SET processed_count = processed_count + %s, + merged_count = merged_count + %s, + error_count = error_count + %s, + current_cluster = COALESCE(%s, current_cluster), + last_error = COALESCE(%s, last_error), + last_progress_at = NOW() + WHERE id=%s""", + (processed_delta, merged_delta, error_delta, + current_cluster, last_error, job_id), + ) + + def finalize_consolidate_job(self, job_id: int, *, state: str) -> None: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """UPDATE cauldron_consolidate_jobs + SET state=%s, + finished_at = CASE WHEN %s IN ('done','failed','cancelled') + THEN NOW() ELSE finished_at END, + last_progress_at = NOW(), + current_cluster = NULL + WHERE id=%s + AND state IN ('running','applying')""", + (state, state, job_id), + ) + + def insert_consolidate_proposal( + self, + *, + job_id: int, + cluster_key: str, + cluster: list[dict], + decision: dict | None, + error: str | None, + ) -> None: + import json as _j + with self.conn() as c, c.cursor() as cur: + cur.execute( + """INSERT INTO cauldron_consolidate_proposals + (job_id, cluster_key, cluster_size, cluster_json, + sonnet_decision, apply_error) + VALUES (%s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + cluster_size=VALUES(cluster_size), + cluster_json=VALUES(cluster_json), + sonnet_decision=VALUES(sonnet_decision), + apply_error=VALUES(apply_error)""", + ( + job_id, cluster_key[:255], len(cluster), + _j.dumps(cluster, ensure_ascii=False, default=str), + 
_j.dumps(decision, ensure_ascii=False) if decision else None, + (error or "")[:500] or None, + ), + ) + + def list_consolidate_proposals(self, job_id: int) -> list[dict]: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """SELECT id, cluster_key, cluster_size, cluster_json, + sonnet_decision, approved, applied_at, apply_error + FROM cauldron_consolidate_proposals + WHERE job_id=%s + ORDER BY cluster_size DESC, cluster_key""", + (job_id,), + ) + return [dict(r) for r in cur.fetchall()] + + def list_approved_unapplied_consolidate(self, job_id: int) -> list[dict]: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """SELECT id, cluster_key, cluster_size, sonnet_decision + FROM cauldron_consolidate_proposals + WHERE job_id=%s AND approved=1 AND applied_at IS NULL""", + (job_id,), + ) + return [dict(r) for r in cur.fetchall()] + + def bulk_set_consolidate_approvals( + self, job_id: int, approved_ids: list[int] + ) -> None: + with self.conn() as c, c.cursor() as cur: + cur.execute( + "UPDATE cauldron_consolidate_proposals SET approved=0 WHERE job_id=%s", + (job_id,), + ) + if approved_ids: + placeholders = ",".join(["%s"] * len(approved_ids)) + cur.execute( + f"""UPDATE cauldron_consolidate_proposals SET approved=1 + WHERE job_id=%s AND id IN ({placeholders})""", + (job_id, *approved_ids), + ) + + def mark_consolidate_proposal_applied( + self, proposal_id: int, *, error: str | None = None + ) -> None: + if error: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """UPDATE cauldron_consolidate_proposals + SET apply_error=%s + WHERE id=%s""", + (error[:500], proposal_id), + ) + else: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """UPDATE cauldron_consolidate_proposals + SET applied_at=NOW(), apply_error=NULL + WHERE id=%s""", + (proposal_id,), + ) + + def fail_stuck_consolidate_jobs(self, *, stale_minutes: int = 15) -> int: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """UPDATE cauldron_consolidate_jobs + SET 
state='failed', + finished_at=NOW(), + last_error=COALESCE(last_error, + 'recovery: worker exited mid-run') + WHERE state IN ('running','applying') + AND last_progress_at < NOW() - INTERVAL %s MINUTE""", + (stale_minutes,), + ) + return cur.rowcount + + # --- sterilize stuck-job recovery (already used at boot) --------------- + def fail_stuck_sterilize_jobs(self, *, stale_minutes: int = 10) -> int: """Recover jobs stuck in 'running'/'applying' with no progress for N minutes. Called at app startup. Returns count of jobs failed.""" diff --git a/cauldron/forge.py b/cauldron/forge.py index 5087fb3..2e672dc 100644 --- a/cauldron/forge.py +++ b/cauldron/forge.py @@ -215,6 +215,65 @@ class Forge: ) + def cluster_decision( + self, foods: list[dict], *, model: str | None = None + ) -> dict: + """Ask Sonnet whether a cluster of similar-named foods are + actually duplicates. Input: list of {id, name, plural_name?, aliases?}. + Returns: + {"merge": bool, + "canonical_id": "", # the survivor (highest-quality name/aliases) + "canonical_name": "", # the survivor's name (echoed for the UI) + "discard_ids": ["", ...], # the ones to merge into canonical + "alias_additions": ["", ...], # discarded names worth keeping as aliases on the survivor + "reason": ""} + + merge=false means the cluster is a false positive (foods that look + similar but are distinct, e.g. "olive oil" vs "olive"). In that case + canonical_id may be empty and discard_ids must be empty. + """ + items = [ + { + "id": f.get("id"), + "name": f.get("name"), + "plural_name": f.get("pluralName") or f.get("plural_name"), + "aliases": [ + (a.get("name") if isinstance(a, dict) else a) + for a in (f.get("aliases") or []) + ], + } + for f in foods + ] + prompt = ( + "You are deciding whether a cluster of food rows from a recipe " + "database are duplicates that should be merged into one canonical " + "row. 
The names came from years of recipe imports + manual entry " + "so plural/case/wording variations are common.\n\n" + f"Cluster:\n{json.dumps(items, indent=2)}\n\n" + "Output JSON ONLY, no prose: " + '{"merge": true|false, ' + '"canonical_id": "", ' + '"canonical_name": "", ' + '"discard_ids": ["", ...], ' + '"alias_additions": ["", ...], ' + '"reason": ""}\n\n' + "Rules:\n" + "- Pick the survivor whose name is the cleanest canonical " + " (lowercase, singular when applicable, no brand, no clinical " + " qualifiers like 'raw' or 'unenriched').\n" + "- discard_ids are the OTHER cluster members — Mealie will rewrite " + " recipe references to point at canonical_id.\n" + "- alias_additions = the discarded NAMES (or any close variants you " + " noticed in plural_name/aliases) that the survivor should adopt as " + " aliases so the parser fuzzy-matches them in the future.\n" + "- merge=false ONLY when the cluster is a false positive (e.g. " + " 'olive oil' vs 'olive', 'butter' vs 'peanut butter'). In that " + " case canonical_id and discard_ids must both be empty.\n" + "- Be conservative — when in doubt, merge=false." + ) + result = self.run(prompt, model=model or "sonnet", timeout_secs=60) + return _extract_cluster_decision(result) + def fetch_food_info(self, name: str, *, model: str | None = None) -> dict: """Ask Sonnet for density + unit class + common size of a single food. 
Returns a dict shaped like: @@ -251,6 +310,37 @@ class Forge: return _extract_food_info(result) +def _extract_cluster_decision(forge_result: dict) -> dict: + if not isinstance(forge_result, dict): + raise ForgeError("forge result not a dict") + inner = forge_result.get("result", forge_result) + if isinstance(inner, str): + inner = _parse_json_blob(inner) + if not isinstance(inner, dict): + raise ForgeError(f"cluster decision not a dict: {str(inner)[:200]}") + + merge = bool(inner.get("merge")) + canonical_id = str(inner.get("canonical_id") or "") + canonical_name = str(inner.get("canonical_name") or "") + discard_raw = inner.get("discard_ids") or [] + discard_ids = [str(x) for x in discard_raw if isinstance(x, (str, int))] + aliases_raw = inner.get("alias_additions") or [] + alias_additions = [str(x) for x in aliases_raw if isinstance(x, str) and x.strip()] + reason = str(inner.get("reason") or "")[:500] + + if not merge: + canonical_id = "" + discard_ids = [] + return { + "merge": merge, + "canonical_id": canonical_id, + "canonical_name": canonical_name, + "discard_ids": discard_ids, + "alias_additions": alias_additions, + "reason": reason, + } + + def _extract_food_info(forge_result: dict) -> dict: """Normalize clawdforge wrapper → food info dict. Defensive on shapes.""" if not isinstance(forge_result, dict): diff --git a/cauldron/mealie.py b/cauldron/mealie.py index 0bf4d69..4f1f296 100644 --- a/cauldron/mealie.py +++ b/cauldron/mealie.py @@ -116,6 +116,15 @@ class Mealie: body["pluralName"] = plural_name return self._post("/api/foods", body) + def update_food(self, food_id: str, body: dict) -> dict: + return self._put(f"/api/foods/{food_id}", body) + + def merge_foods(self, *, from_id: str, to_id: str) -> dict: + """PUT /api/foods/merge — consolidates `from_id` into `to_id`. Mealie + rewrites every recipe_ingredient.food_id reference and deletes + from_id. 
Permission-scoped per-household.""" + return self._put("/api/foods/merge", {"fromFood": from_id, "toFood": to_id}) + def list_units(self, *, search: str | None = None, per_page: int = 200) -> dict: return self._get("/api/units", search=search or "", perPage=per_page) diff --git a/cauldron/server.py b/cauldron/server.py index a5fd2ab..42e59f6 100644 --- a/cauldron/server.py +++ b/cauldron/server.py @@ -33,7 +33,7 @@ from .config import load from .crypto import TokenCrypto from .db import DB from .forge import Forge, ForgeError -from . import aggregator, bulk_sterilize, foods +from . import aggregator, bulk_sterilize, consolidate_foods, foods from .mealie import Mealie, MealieError from .oidc import init_oauth from .recipe_index import flatten_recipe, refresh_household_index, search_index @@ -111,6 +111,13 @@ def create_app() -> Flask: except Exception as e: app.logger.warning("sterilize stuck-job recovery failed: %s", e) + try: + n_failed = db.fail_stuck_consolidate_jobs(stale_minutes=15) + if n_failed: + app.logger.info("failed %d stuck consolidate jobs at boot", n_failed) + except Exception as e: + app.logger.warning("consolidate stuck-job recovery failed: %s", e) + oauth = init_oauth( app, issuer=cfg.oidc_issuer, @@ -1016,6 +1023,153 @@ def create_app() -> Flask: db.finalize_sterilize_job(job_id, state="cancelled") return jsonify({"ok": True}) + # ---------- foods consolidator (Step 3) ------------------------------ + + @app.get("/consolidate") + @require_session + def consolidate_page(): + hid = current_household_id() + if not hid: + return redirect(url_for("connect_mealie_get")) + latest = db.latest_consolidate_job_for_household(hid) + return render_template( + "consolidate.html", active="consolidate", latest_job=latest, + ) + + @app.post("/api/foods/consolidate-start") + @require_session + def consolidate_start(): + u = session["user"] + hid = current_household_id() + if not hid: + return jsonify({"error": "no household"}), 409 + active = 
db.running_consolidate_job_for_household(hid) + if active: + return jsonify({"error": "already_running", "job_id": active["id"]}), 409 + client = current_user_mealie() + if client is None: + return redirect(url_for("connect_mealie_get")) + job_id = db.create_consolidate_job(household_id=hid, started_by_sub=u["sub"]) + consolidate_foods.spawn_walk_thread( + db=db, job_id=job_id, mealie=client, forge=forge, + ) + return jsonify({"ok": True, "job_id": job_id}) + + @app.get("/api/foods/consolidate-status") + @require_session + def consolidate_status(): + hid = current_household_id() + if not hid: + return jsonify({"error": "no household"}), 409 + job = db.latest_consolidate_job_for_household(hid) + if not job: + return jsonify({"job": None}) + return jsonify({"job": _consolidate_job_payload(job)}) + + @app.get("/api/foods/consolidate-jobs//proposals") + @require_session + def consolidate_proposals(job_id: int): + hid = current_household_id() + if not hid: + return jsonify({"error": "no household"}), 409 + job = db.get_consolidate_job(job_id) + if not job or job["household_id"] != hid: + return jsonify({"error": "not_found"}), 404 + rows = db.list_consolidate_proposals(job_id) + for p in rows: + for k in ("cluster_json", "sonnet_decision"): + v = p.get(k) + if isinstance(v, str): + try: + p[k] = _json_loads(v) + except Exception: + p[k] = None + return jsonify({ + "job": _consolidate_job_payload(job), + "proposals": rows, + }) + + @app.post("/api/foods/consolidate-apply/") + @require_session + def consolidate_apply(job_id: int): + hid = current_household_id() + if not hid: + return jsonify({"error": "no household"}), 409 + job = db.get_consolidate_job(job_id) + if not job or job["household_id"] != hid: + return jsonify({"error": "not_found"}), 404 + if job["state"] != "review": + return jsonify({"error": f"bad_state:{job['state']}"}), 409 + body = request.get_json(silent=True) or {} + approved_ids_raw = body.get("approved_ids") or [] + approved_ids = [int(x) for x in 
approved_ids_raw if isinstance(x, (int, str)) and str(x).isdigit()] + client = current_user_mealie() + if client is None: + return redirect(url_for("connect_mealie_get")) + db.bulk_set_consolidate_approvals(job_id, approved_ids) + db.finalize_consolidate_job(job_id, state="applying") + consolidate_foods.spawn_apply_thread(db=db, job_id=job_id, mealie=client) + return jsonify({"ok": True, "approved_count": len(approved_ids)}) + + @app.post("/api/foods/consolidate-cancel/") + @require_session + def consolidate_cancel(job_id: int): + hid = current_household_id() + if not hid: + return jsonify({"error": "no household"}), 409 + job = db.get_consolidate_job(job_id) + if not job or job["household_id"] != hid: + return jsonify({"error": "not_found"}), 404 + if job["state"] not in ("running", "review", "applying"): + return jsonify({"error": f"bad_state:{job['state']}"}), 409 + db.finalize_consolidate_job(job_id, state="cancelled") + return jsonify({"ok": True}) + + # admin variants for kayos kick-off + @app.post("/api/admin/foods/consolidate-start") + @require_bearer + def admin_consolidate_start(): + body = request.get_json(silent=True) or {} + sub = (body.get("started_by_sub") or "").strip() + if not sub: + return jsonify({"error": "started_by_sub required"}), 400 + hid = db.get_user_household_id(sub) + if not hid: + return jsonify({"error": "user has no household"}), 404 + active = db.running_consolidate_job_for_household(hid) + if active: + return jsonify({"error": "already_running", "job_id": active["id"]}), 409 + blob = db.get_user_mealie_token_blob(sub) + if not blob: + return jsonify({"error": "user_not_connected_to_mealie"}), 409 + try: + tok = crypto.decrypt(blob) + except Exception: + return jsonify({"error": "user_token_undecryptable"}), 500 + mealie = Mealie(base_url=cfg.mealie_api_url, api_token=tok) + job_id = db.create_consolidate_job(household_id=hid, started_by_sub=sub) + consolidate_foods.spawn_walk_thread(db=db, job_id=job_id, mealie=mealie, 
forge=forge) + return jsonify({"ok": True, "job_id": job_id}) + + @app.get("/api/admin/foods/consolidate-jobs/") + @require_bearer + def admin_consolidate_status(job_id: int): + job = db.get_consolidate_job(job_id) + if not job: + return jsonify({"error": "not_found"}), 404 + return jsonify({"job": _consolidate_job_payload(job)}) + + @app.post("/api/admin/foods/consolidate-cancel/") + @require_bearer + def admin_consolidate_cancel(job_id: int): + job = db.get_consolidate_job(job_id) + if not job: + return jsonify({"error": "not_found"}), 404 + if job["state"] not in ("running", "review", "applying"): + return jsonify({"error": f"bad_state:{job['state']}"}), 409 + db.finalize_consolidate_job(job_id, state="cancelled") + return jsonify({"ok": True}) + # ---------- admin sterilizer (bearer-auth, kick off on user's behalf) - @app.post("/api/admin/sterilize/bulk-start") @@ -1244,6 +1398,17 @@ def _job_payload(job: dict) -> dict: return j +def _consolidate_job_payload(job: dict) -> dict: + """Same shape as _job_payload — kept separate for clarity since the + consolidate job has different counter columns than sterilize.""" + j = dict(job) + for k in ("started_at", "last_progress_at", "finished_at"): + v = j.get(k) + if v is not None and hasattr(v, "isoformat"): + j[k] = v.isoformat() + return j + + def _plan_payload(plan: dict) -> dict: """JSON-serializable view of a plan dict (datetimes → iso strings).""" p = dict(plan) diff --git a/cauldron/templates/consolidate.html b/cauldron/templates/consolidate.html new file mode 100644 index 0000000..e143ccc --- /dev/null +++ b/cauldron/templates/consolidate.html @@ -0,0 +1,343 @@ +{% extends "_base.html" %} +{% block title %}Consolidate · Cauldron{% endblock %} +{% block content %} + + + +
+
// consolidate · merge duplicate foods in mealie
+

foods consolidate

+
+ scan your mealie household foods, cluster lookalikes by similarity, + let sonnet pick the survivor + the rest become aliases on it. mealie + rewrites every recipe ingredient to point at the canonical row. +
+
+ +
+
+

state

+ loading… + +
+ + + + + + + + + + +
+ + + +{% endblock %} diff --git a/cauldron/templates/me.html b/cauldron/templates/me.html index 9a8ab20..2f648ca 100644 --- a/cauldron/templates/me.html +++ b/cauldron/templates/me.html @@ -57,6 +57,8 @@

mealie's parser is per-recipe; this kicks off a bulk pass over your whole library. review proposals, apply the good ones.

🪄 bulk sterilize recipes →

+

scan your foods table for dupes, ask sonnet to pick canonicals, merge in mealie. one-time cleanup; aliases get attached to the survivors so the parser fuzzy-matches variants from now on.

+

🔮 consolidate foods table →

{% endif %}