diff --git a/cauldron/db.py b/cauldron/db.py index 71c85f5..a2f19ae 100644 --- a/cauldron/db.py +++ b/cauldron/db.py @@ -355,6 +355,50 @@ MIGRATIONS = [ """ DROP TABLE IF EXISTS cauldron_pick_points """, + # 021 — Recipe-dedupe bulk job state (mirrors the consolidate pattern + # but for recipes themselves — name + ingredient similarity → Sonnet + # decides → user-confirms → DELETE via Mealie API). + """ + CREATE TABLE IF NOT EXISTS cauldron_recipe_dedupe_jobs ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + household_id BIGINT NOT NULL, + started_by_sub VARCHAR(190) NOT NULL, + started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_progress_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + finished_at DATETIME, + total_clusters INT NOT NULL DEFAULT 0, + processed_count INT NOT NULL DEFAULT 0, + deleted_count INT NOT NULL DEFAULT 0, + error_count INT NOT NULL DEFAULT 0, + current_cluster VARCHAR(255), + last_error VARCHAR(500), + state ENUM('running','review','applying','done','failed','cancelled') + NOT NULL DEFAULT 'running', + INDEX idx_household_state (household_id, state), + FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE, + FOREIGN KEY (started_by_sub) REFERENCES cauldron_users(authentik_sub) ON DELETE CASCADE + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """, + # 022 — Per-cluster recipe-dedupe proposal. cluster_json holds the + # full recipe summaries (slug+name+ingredients+source); sonnet_decision + # = {duplicates: bool, canonical_slug, delete_slugs: [...], reason}. 
+ """ + CREATE TABLE IF NOT EXISTS cauldron_recipe_dedupe_proposals ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + job_id BIGINT NOT NULL, + cluster_key VARCHAR(255) NOT NULL, + cluster_size INT NOT NULL DEFAULT 0, + cluster_json JSON, + sonnet_decision JSON, + approved BOOLEAN, + applied_at DATETIME, + apply_error VARCHAR(500), + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE KEY uk_job_cluster (job_id, cluster_key), + INDEX idx_approved (job_id, approved), + FOREIGN KEY (job_id) REFERENCES cauldron_recipe_dedupe_jobs(id) ON DELETE CASCADE + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """, ] @@ -1273,6 +1317,182 @@ class DB: (proposal_id,), ) + # --- recipe-dedupe jobs ------------------------------------------------ + + def create_recipe_dedupe_job(self, *, household_id: int, started_by_sub: str) -> int: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """INSERT INTO cauldron_recipe_dedupe_jobs + (household_id, started_by_sub, state) + VALUES (%s, %s, 'running')""", + (household_id, started_by_sub), + ) + return cur.lastrowid + + def get_recipe_dedupe_job(self, job_id: int) -> dict | None: + with self.conn() as c, c.cursor() as cur: + cur.execute("SELECT * FROM cauldron_recipe_dedupe_jobs WHERE id=%s", (job_id,)) + return cur.fetchone() + + def get_recipe_dedupe_job_state(self, job_id: int) -> str | None: + with self.conn() as c, c.cursor() as cur: + cur.execute("SELECT state FROM cauldron_recipe_dedupe_jobs WHERE id=%s", (job_id,)) + row = cur.fetchone() + return row["state"] if row else None + + def latest_recipe_dedupe_job_for_household(self, household_id: int) -> dict | None: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """SELECT * FROM cauldron_recipe_dedupe_jobs + WHERE household_id=%s ORDER BY started_at DESC LIMIT 1""", + (household_id,), + ) + return cur.fetchone() + + def running_recipe_dedupe_job_for_household(self, household_id: int) -> dict | None: + with self.conn() as c, c.cursor() as cur: + cur.execute( + 
"""SELECT * FROM cauldron_recipe_dedupe_jobs + WHERE household_id=%s AND state IN ('running','applying') + ORDER BY started_at DESC LIMIT 1""", + (household_id,), + ) + return cur.fetchone() + + def update_recipe_dedupe_job_progress( + self, + job_id: int, + *, + processed_delta: int = 0, + deleted_delta: int = 0, + error_delta: int = 0, + current_cluster: str | None = None, + last_error: str | None = None, + ) -> None: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """UPDATE cauldron_recipe_dedupe_jobs + SET processed_count = processed_count + %s, + deleted_count = deleted_count + %s, + error_count = error_count + %s, + current_cluster = COALESCE(%s, current_cluster), + last_error = COALESCE(%s, last_error), + last_progress_at = NOW() + WHERE id=%s""", + (processed_delta, deleted_delta, error_delta, + current_cluster, last_error, job_id), + ) + + def finalize_recipe_dedupe_job(self, job_id: int, *, state: str) -> None: + """Same anti-zombie guard as the others — won't overwrite terminal.""" + with self.conn() as c, c.cursor() as cur: + cur.execute( + """UPDATE cauldron_recipe_dedupe_jobs + SET state=%s, + finished_at = CASE WHEN %s IN ('done','failed','cancelled') + THEN NOW() ELSE finished_at END, + last_progress_at = NOW(), + current_cluster = NULL + WHERE id=%s + AND state NOT IN ('done','failed','cancelled')""", + (state, state, job_id), + ) + + def insert_recipe_dedupe_proposal( + self, + *, + job_id: int, + cluster_key: str, + cluster: list[dict], + decision: dict | None, + error: str | None, + ) -> None: + import json as _j + with self.conn() as c, c.cursor() as cur: + cur.execute( + """INSERT INTO cauldron_recipe_dedupe_proposals + (job_id, cluster_key, cluster_size, cluster_json, + sonnet_decision, apply_error) + VALUES (%s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + cluster_size=VALUES(cluster_size), + cluster_json=VALUES(cluster_json), + sonnet_decision=VALUES(sonnet_decision), + apply_error=VALUES(apply_error)""", + ( + job_id, 
cluster_key[:255], len(cluster), + _j.dumps(cluster, ensure_ascii=False, default=str), + _j.dumps(decision, ensure_ascii=False) if decision else None, + (error or "")[:500] or None, + ), + ) + + def list_recipe_dedupe_proposals(self, job_id: int) -> list[dict]: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """SELECT id, cluster_key, cluster_size, cluster_json, + sonnet_decision, approved, applied_at, apply_error + FROM cauldron_recipe_dedupe_proposals + WHERE job_id=%s + ORDER BY cluster_size DESC, cluster_key""", + (job_id,), + ) + return [dict(r) for r in cur.fetchall()] + + def list_approved_unapplied_recipe_dedupe(self, job_id: int) -> list[dict]: + with self.conn() as c, c.cursor() as cur: + cur.execute( + """SELECT id, cluster_key, cluster_size, sonnet_decision + FROM cauldron_recipe_dedupe_proposals + WHERE job_id=%s AND approved=1 AND applied_at IS NULL""", + (job_id,), + ) + return [dict(r) for r in cur.fetchall()] + + def bulk_set_recipe_dedupe_approvals(self, job_id: int, approved_ids: list[int]) -> None: + with self.conn() as c, c.cursor() as cur: + cur.execute( + "UPDATE cauldron_recipe_dedupe_proposals SET approved=0 WHERE job_id=%s", + (job_id,), + ) + if approved_ids: + placeholders = ",".join(["%s"] * len(approved_ids)) + cur.execute( + f"""UPDATE cauldron_recipe_dedupe_proposals SET approved=1 + WHERE job_id=%s AND id IN ({placeholders})""", + (job_id, *approved_ids), + ) + + def mark_recipe_dedupe_proposal_applied( + self, proposal_id: int, *, error: str | None = None + ) -> None: + if error: + with self.conn() as c, c.cursor() as cur: + cur.execute( + "UPDATE cauldron_recipe_dedupe_proposals SET apply_error=%s WHERE id=%s", + (error[:500], proposal_id), + ) + else: + with self.conn() as c, c.cursor() as cur: + cur.execute( + "UPDATE cauldron_recipe_dedupe_proposals SET applied_at=NOW(), apply_error=NULL WHERE id=%s", + (proposal_id,), + ) + + def fail_stuck_recipe_dedupe_jobs(self, *, stale_minutes: int = 15) -> int: + with 
self.conn() as c, c.cursor() as cur: + cur.execute( + """UPDATE cauldron_recipe_dedupe_jobs + SET state='failed', + finished_at=NOW(), + last_error=COALESCE(last_error, + 'recovery: worker exited mid-run') + WHERE state IN ('running','applying') + AND last_progress_at < NOW() - INTERVAL %s MINUTE""", + (stale_minutes,), + ) + return cur.rowcount + def fail_stuck_consolidate_jobs(self, *, stale_minutes: int = 15) -> int: with self.conn() as c, c.cursor() as cur: cur.execute( diff --git a/cauldron/dedupe_recipes.py b/cauldron/dedupe_recipes.py new file mode 100644 index 0000000..c2346dc --- /dev/null +++ b/cauldron/dedupe_recipes.py @@ -0,0 +1,297 @@ +"""Recipe dedupe — find and delete duplicate recipes household-scoped. + +Walk: + 1. Pull all recipe summaries for the user's household + 2. Cluster by name similarity (token_set_ratio >= NAME_THRESHOLD) + 3. For each multi-recipe cluster, fetch full bodies + build a summary + (slug, name, source_url, ingredient_summary, step_count, yields) + 4. Ask Sonnet via forge.recipe_dedupe_decision — get + {duplicates, canonical_slug, delete_slugs, reason} + 5. Persist the proposal + +Apply: + 1. For each approved proposal where duplicates=true: + 2. DELETE each delete_slug via Mealie API + 3. Mark proposal applied; on any failure record + continue + +Same daemon-thread / cancel-respect / stuck-recovery pattern as the +sterilize and consolidate runners. 
+""" +from __future__ import annotations + +import json +import logging +import threading +from typing import Optional + +from rapidfuzz import fuzz + +from .db import DB +from .forge import Forge, ForgeError +from .mealie import Mealie, MealieError + +log = logging.getLogger(__name__) + +NAME_THRESHOLD = 85 + + +def _household_id_for(mealie: Mealie) -> str | None: + me = mealie.who_am_i() + hid = me.get("householdId") or me.get("household_id") + if not hid: + h = me.get("household") + if isinstance(h, dict): + hid = h.get("id") + return hid + + +def _all_recipes(mealie: Mealie) -> list[dict]: + out: list[dict] = [] + page = 1 + while page <= 50: + resp = mealie.list_recipes(page=page, per_page=100) + items = resp.get("items") or [] + for item in items: + out.append(item) + tp = resp.get("total_pages") or resp.get("totalPages") or 1 + if not items or page >= tp: + break + page += 1 + return out + + +def _recipe_household_id(recipe: dict) -> str | None: + hid = recipe.get("householdId") or recipe.get("household_id") + if hid: + return hid + h = recipe.get("household") + if isinstance(h, dict): + return h.get("id") + return None + + +def _filter_to_household(recipes: list[dict], household_id: str) -> list[dict]: + if not household_id: + return recipes + out = [] + for r in recipes: + hh = _recipe_household_id(r) + if not hh or hh == household_id: + out.append(r) + return out + + +def _cluster_by_name(recipes: list[dict], threshold: int = NAME_THRESHOLD) -> list[list[dict]]: + """Single-link agglomerative on rapidfuzz token_set_ratio. Returns + clusters of size >= 2. 
~250 recipes = ~30K comparisons, runs instantly.""" + n = len(recipes) + parent = list(range(n)) + + def find(x): + while parent[x] != x: + parent[x] = parent[parent[x]] + x = parent[x] + return x + + def union(a, b): + ra, rb = find(a), find(b) + if ra != rb: + parent[ra] = rb + + names = [(r.get("name") or "").strip().lower() for r in recipes] + for i in range(n): + if not names[i]: + continue + for j in range(i + 1, n): + if not names[j]: + continue + score = fuzz.token_set_ratio(names[i], names[j]) + if score >= threshold: + union(i, j) + + groups: dict[int, list[dict]] = {} + for i in range(n): + r = find(i) + groups.setdefault(r, []).append(recipes[i]) + return [g for g in groups.values() if len(g) >= 2] + + +def _summarize_recipe(full: dict) -> dict: + """Build the lean summary we hand to Sonnet for the dedupe decision.""" + ings = full.get("recipeIngredient") or [] + ing_list: list[str] = [] + for i in ings[:30]: + food = (i.get("food") or {}).get("name") if isinstance(i.get("food"), dict) else None + if food: + ing_list.append(food) + else: + note = (i.get("note") or "").strip() + if note: + ing_list.append(note[:60]) + return { + "slug": full.get("slug"), + "name": full.get("name"), + "source_url": full.get("orgURL") or full.get("originalUrl") or "", + "ingredient_summary": ing_list, + "step_count": len(full.get("recipeInstructions") or []), + "yields": (full.get("recipeYield") or "").strip(), + } + + +def _cluster_key(cluster: list[dict]) -> str: + slugs = sorted((r.get("slug") or "") for r in cluster) + return "|".join(slugs)[:255] + + +def run_walk(*, db: DB, job_id: int, mealie: Mealie, forge: Forge) -> None: + log.info("[dedupe-recipes:%s] walk start", job_id) + + def _cancelled() -> bool: + s = db.get_recipe_dedupe_job_state(job_id) + return s in ("cancelled", "failed", "done") + + try: + hh = _household_id_for(mealie) + slim = _filter_to_household(_all_recipes(mealie), hh) + log.info("[dedupe-recipes:%s] household=%s recipes=%d", job_id, hh, 
def _decode_dedupe_decision(raw: object) -> dict:
    """Return the stored decision as a dict; anything unparseable becomes ``{}``."""
    if isinstance(raw, (str, bytes, bytearray)):
        try:
            raw = json.loads(raw)
        except ValueError:  # includes json.JSONDecodeError / UnicodeDecodeError
            return {}
    return raw if isinstance(raw, dict) else {}


def _validated_delete_slugs(decision: dict, cluster_key: str) -> tuple[list[str], str | None]:
    """Vet the slugs a decision wants deleted before anything is removed.

    Returns ``(slugs, None)`` when the deletion is safe to carry out, or
    ``([], problem)`` with a human-readable reason when it must be refused.
    Deletion is permanent, so a decision is refused outright if it lists the
    canonical recipe for deletion, names slugs that are not cluster members,
    or would delete every member of the cluster.
    """
    raw = decision.get("delete_slugs")
    slugs: list[str] = []
    for item in raw if isinstance(raw, list) else []:
        if not isinstance(item, str):
            continue
        slug = item.strip()
        if slug and slug not in slugs:
            slugs.append(slug)
    if not slugs:
        return [], "missing delete_slugs"

    canonical = str(decision.get("canonical_slug") or "").strip()
    if canonical and canonical in slugs:
        return [], "canonical_slug is also listed in delete_slugs"

    # cluster_key is the "|"-joined sorted member slugs cut to 255 chars, so
    # membership can only be verified when the key is shorter than the cut.
    if cluster_key and len(cluster_key) < 255:
        members = {part for part in cluster_key.split("|") if part}
        strangers = [s for s in slugs if s not in members]
        if strangers:
            return [], f"delete_slugs not in cluster: {', '.join(strangers)}"
        if members <= set(slugs):
            return [], "decision would delete every recipe in the cluster"
    return slugs, None


def run_apply(*, db: DB, job_id: int, mealie: Mealie) -> None:
    """Delete the duplicates of every approved, not-yet-applied proposal.

    For each proposal the stored decision is decoded and vetted; refused or
    empty decisions are recorded on the proposal and skipped. Slugs are then
    deleted one by one through ``mealie.delete_recipe``; the first failure
    stops that proposal (recorded as its error) and the run continues with
    the next one. The job state is re-read before each proposal so a cancel
    takes effect between clusters. The job ends in ``done``, or ``failed``
    if anything unexpected escapes.
    """
    log.info("[dedupe-recipes:%s] apply start", job_id)

    def _cancelled() -> bool:
        return db.get_recipe_dedupe_job_state(job_id) in ("cancelled", "failed", "done")

    try:
        for row in db.list_approved_unapplied_recipe_dedupe(job_id):
            if _cancelled():
                log.info("[dedupe-recipes:%s] apply aborted", job_id)
                return

            decision = _decode_dedupe_decision(row.get("sonnet_decision"))
            if not decision.get("duplicates"):
                db.mark_recipe_dedupe_proposal_applied(
                    row["id"], error="cluster decision was 'not duplicates' but row was approved",
                )
                continue

            cluster_key = row.get("cluster_key") or ""
            delete_slugs, problem = _validated_delete_slugs(decision, cluster_key)
            if problem:
                log.warning(
                    "[dedupe-recipes:%s] proposal %s refused: %s", job_id, row["id"], problem,
                )
                db.mark_recipe_dedupe_proposal_applied(row["id"], error=problem)
                continue

            db.update_recipe_dedupe_job_progress(job_id, current_cluster=cluster_key[:80])

            err: str | None = None
            for slug in delete_slugs:
                try:
                    mealie.delete_recipe(slug)
                except MealieError as e:
                    err = f"delete {slug}: {e}"
                    log.warning("[dedupe-recipes:%s] %s", job_id, err)
                    break
                db.update_recipe_dedupe_job_progress(job_id, deleted_delta=1)

            if err:
                db.mark_recipe_dedupe_proposal_applied(row["id"], error=err)
                db.update_recipe_dedupe_job_progress(
                    job_id, error_delta=1, last_error=err,
                )
            else:
                db.mark_recipe_dedupe_proposal_applied(row["id"])

        db.finalize_recipe_dedupe_job(job_id, state="done")
        log.info("[dedupe-recipes:%s] apply done", job_id)
    except Exception:
        log.exception("[dedupe-recipes:%s] apply crashed", job_id)
        try:
            db.finalize_recipe_dedupe_job(job_id, state="failed")
        except Exception:
            # Best effort only, but leave a trace: boot-time stuck-job
            # recovery is what eventually cleans this job up.
            log.exception("[dedupe-recipes:%s] could not mark job failed", job_id)
) + t.start() + return t + + +def spawn_apply_thread(*, db: DB, job_id: int, mealie: Mealie) -> threading.Thread: + t = threading.Thread( + target=run_apply, + kwargs={"db": db, "job_id": job_id, "mealie": mealie}, + name=f"dedupe-recipes-apply-{job_id}", + daemon=True, + ) + t.start() + return t diff --git a/cauldron/forge.py b/cauldron/forge.py index 2e672dc..3cb019f 100644 --- a/cauldron/forge.py +++ b/cauldron/forge.py @@ -215,6 +215,67 @@ class Forge: ) + def recipe_dedupe_decision( + self, recipes: list[dict], *, model: str | None = None + ) -> dict: + """Ask Sonnet whether a cluster of similar-named recipes are + actually duplicates (same recipe imported twice / hand-copied with + a slight title tweak / etc) versus distinct recipes that just + happen to look similar by name. + + Input: list of recipe summaries — {slug, name, source_url, + ingredient_summary (concise list), step_count, yields}. + + Returns: + {"duplicates": bool, + "canonical_slug": "", + "delete_slugs": ["", ...], + "reason": ""} + + duplicates=false means the cluster is a false positive and nothing + should be deleted. canonical_slug + delete_slugs must be empty in + that case. 
Be conservative — when in doubt return false.""" + items = [ + { + "slug": r.get("slug"), + "name": r.get("name"), + "source_url": r.get("source_url") or "", + "ingredient_summary": r.get("ingredient_summary") or [], + "step_count": r.get("step_count") or 0, + "yields": r.get("yields") or "", + } + for r in recipes + ] + prompt = ( + "You are deciding whether a cluster of similar-named recipes " + "are actual duplicates (same recipe imported or hand-copied " + "twice) or distinct recipes that share words in the title.\n\n" + f"Cluster:\n{json.dumps(items, indent=2)}\n\n" + "Output JSON ONLY, no prose: " + '{"duplicates": true|false, ' + '"canonical_slug": "", ' + '"delete_slugs": ["", ...], ' + '"reason": ""}\n\n' + "Rules:\n" + "- duplicates=true ONLY when the recipes are clearly the same " + " dish prepared the same way (matching ingredient sets, similar " + " step counts, often shared source_url). Slight title variations " + " ('Banana Bread' vs 'Best Banana Bread') with same body = dupes.\n" + "- Pick canonical_slug = the recipe with the cleanest name, the " + " most complete data (more steps + yields filled in beats less). " + " When tied, pick the older one (lexicographic slug order is fine " + " since Mealie slugs include date-ish suffixes for dupes).\n" + "- delete_slugs = the OTHER cluster members. Mealie DELETE removes " + " them permanently — only suggest deletion when you're confident.\n" + "- duplicates=false when ingredient sets differ meaningfully, OR " + " when names suggest distinct dishes ('Chicken Stir Fry' vs " + " 'Chicken Fajitas'), OR when you genuinely cannot tell.\n" + "- Be CONSERVATIVE — false negatives are recoverable (recipes " + " stay), false positives delete data." 
+ ) + result = self.run(prompt, model=model or "sonnet", timeout_secs=60) + return _extract_recipe_dedupe_decision(result) + def cluster_decision( self, foods: list[dict], *, model: str | None = None ) -> dict: @@ -310,6 +371,32 @@ class Forge: return _extract_food_info(result) +def _extract_recipe_dedupe_decision(forge_result: dict) -> dict: + if not isinstance(forge_result, dict): + raise ForgeError("forge result not a dict") + inner = forge_result.get("result", forge_result) + if isinstance(inner, str): + inner = _parse_json_blob(inner) + if not isinstance(inner, dict): + raise ForgeError(f"recipe dedupe decision not a dict: {str(inner)[:200]}") + + duplicates = bool(inner.get("duplicates")) + canonical_slug = str(inner.get("canonical_slug") or "") + delete_raw = inner.get("delete_slugs") or [] + delete_slugs = [str(x) for x in delete_raw if isinstance(x, str) and x.strip()] + reason = str(inner.get("reason") or "")[:500] + + if not duplicates: + canonical_slug = "" + delete_slugs = [] + return { + "duplicates": duplicates, + "canonical_slug": canonical_slug, + "delete_slugs": delete_slugs, + "reason": reason, + } + + def _extract_cluster_decision(forge_result: dict) -> dict: if not isinstance(forge_result, dict): raise ForgeError("forge result not a dict") diff --git a/cauldron/mealie.py b/cauldron/mealie.py index 4f1f296..c3bb246 100644 --- a/cauldron/mealie.py +++ b/cauldron/mealie.py @@ -105,6 +105,21 @@ class Mealie: def update_recipe(self, slug: str, body: dict) -> dict: return self._put(f"/api/recipes/{slug}", body) + def delete_recipe(self, slug: str) -> dict: + """DELETE /api/recipes/. Permanently removes the recipe and + its recipe_ingredient rows. Permission-scoped per-household. 
+ Returns Mealie's response dict (often the deleted recipe summary).""" + try: + r = self.session.delete(f"{self.base_url}/api/recipes/{slug}", timeout=30) + except requests.RequestException as e: + raise MealieError(f"DELETE /api/recipes/{slug} transport: {e}") from e + if r.status_code >= 400: + raise MealieError(f"DELETE /api/recipes/{slug} -> {r.status_code}: {r.text[:300]}") + try: + return r.json() + except Exception: + return {} + # --- foods / units ------------------------------------------------------ def list_foods(self, *, search: str | None = None, per_page: int = 200) -> dict: diff --git a/cauldron/server.py b/cauldron/server.py index 42e59f6..ed264b7 100644 --- a/cauldron/server.py +++ b/cauldron/server.py @@ -33,7 +33,7 @@ from .config import load from .crypto import TokenCrypto from .db import DB from .forge import Forge, ForgeError -from . import aggregator, bulk_sterilize, consolidate_foods, foods +from . import aggregator, bulk_sterilize, consolidate_foods, dedupe_recipes, foods from .mealie import Mealie, MealieError from .oidc import init_oauth from .recipe_index import flatten_recipe, refresh_household_index, search_index @@ -118,6 +118,13 @@ def create_app() -> Flask: except Exception as e: app.logger.warning("consolidate stuck-job recovery failed: %s", e) + try: + n_failed = db.fail_stuck_recipe_dedupe_jobs(stale_minutes=15) + if n_failed: + app.logger.info("failed %d stuck recipe-dedupe jobs at boot", n_failed) + except Exception as e: + app.logger.warning("recipe-dedupe stuck-job recovery failed: %s", e) + oauth = init_oauth( app, issuer=cfg.oidc_issuer, @@ -1023,6 +1030,139 @@ def create_app() -> Flask: db.finalize_sterilize_job(job_id, state="cancelled") return jsonify({"ok": True}) + # ---------- recipe dedupe ------------------------------------------ + + @app.get("/dedupe-recipes") + @require_session + def dedupe_recipes_page(): + hid = current_household_id() + if not hid: + return redirect(url_for("connect_mealie_get")) + latest = 
db.latest_recipe_dedupe_job_for_household(hid) + return render_template( + "dedupe_recipes.html", active="dedupe", latest_job=latest, + ) + + @app.post("/api/recipes/dedupe-start") + @require_session + def dedupe_recipes_start(): + u = session["user"] + hid = current_household_id() + if not hid: + return jsonify({"error": "no household"}), 409 + active = db.running_recipe_dedupe_job_for_household(hid) + if active: + return jsonify({"error": "already_running", "job_id": active["id"]}), 409 + client = current_user_mealie() + if client is None: + return redirect(url_for("connect_mealie_get")) + job_id = db.create_recipe_dedupe_job(household_id=hid, started_by_sub=u["sub"]) + dedupe_recipes.spawn_walk_thread(db=db, job_id=job_id, mealie=client, forge=forge) + return jsonify({"ok": True, "job_id": job_id}) + + @app.get("/api/recipes/dedupe-status") + @require_session + def dedupe_recipes_status(): + hid = current_household_id() + if not hid: + return jsonify({"error": "no household"}), 409 + job = db.latest_recipe_dedupe_job_for_household(hid) + if not job: + return jsonify({"job": None}) + return jsonify({"job": _consolidate_job_payload(job)}) + + @app.get("/api/recipes/dedupe-jobs//proposals") + @require_session + def dedupe_recipes_proposals(job_id: int): + hid = current_household_id() + if not hid: + return jsonify({"error": "no household"}), 409 + job = db.get_recipe_dedupe_job(job_id) + if not job or job["household_id"] != hid: + return jsonify({"error": "not_found"}), 404 + rows = db.list_recipe_dedupe_proposals(job_id) + for p in rows: + for k in ("cluster_json", "sonnet_decision"): + v = p.get(k) + if isinstance(v, str): + try: + p[k] = _json_loads(v) + except Exception: + p[k] = None + return jsonify({ + "job": _consolidate_job_payload(job), + "proposals": rows, + }) + + @app.post("/api/recipes/dedupe-apply/") + @require_session + def dedupe_recipes_apply(job_id: int): + hid = current_household_id() + if not hid: + return jsonify({"error": "no household"}), 
409 + job = db.get_recipe_dedupe_job(job_id) + if not job or job["household_id"] != hid: + return jsonify({"error": "not_found"}), 404 + if job["state"] != "review": + return jsonify({"error": f"bad_state:{job['state']}"}), 409 + body = request.get_json(silent=True) or {} + approved_ids_raw = body.get("approved_ids") or [] + approved_ids = [int(x) for x in approved_ids_raw if isinstance(x, (int, str)) and str(x).isdigit()] + client = current_user_mealie() + if client is None: + return redirect(url_for("connect_mealie_get")) + db.bulk_set_recipe_dedupe_approvals(job_id, approved_ids) + db.finalize_recipe_dedupe_job(job_id, state="applying") + dedupe_recipes.spawn_apply_thread(db=db, job_id=job_id, mealie=client) + return jsonify({"ok": True, "approved_count": len(approved_ids)}) + + @app.post("/api/recipes/dedupe-cancel/") + @require_session + def dedupe_recipes_cancel(job_id: int): + hid = current_household_id() + if not hid: + return jsonify({"error": "no household"}), 409 + job = db.get_recipe_dedupe_job(job_id) + if not job or job["household_id"] != hid: + return jsonify({"error": "not_found"}), 404 + if job["state"] not in ("running", "review", "applying"): + return jsonify({"error": f"bad_state:{job['state']}"}), 409 + db.finalize_recipe_dedupe_job(job_id, state="cancelled") + return jsonify({"ok": True}) + + @app.post("/api/admin/recipes/dedupe-start") + @require_bearer + def admin_dedupe_recipes_start(): + body = request.get_json(silent=True) or {} + sub = (body.get("started_by_sub") or "").strip() + if not sub: + return jsonify({"error": "started_by_sub required"}), 400 + hid = db.get_user_household_id(sub) + if not hid: + return jsonify({"error": "user has no household"}), 404 + active = db.running_recipe_dedupe_job_for_household(hid) + if active: + return jsonify({"error": "already_running", "job_id": active["id"]}), 409 + blob = db.get_user_mealie_token_blob(sub) + if not blob: + return jsonify({"error": "user_not_connected_to_mealie"}), 409 + try: + tok 
= crypto.decrypt(blob) + except Exception: + return jsonify({"error": "user_token_undecryptable"}), 500 + mealie = Mealie(base_url=cfg.mealie_api_url, api_token=tok) + job_id = db.create_recipe_dedupe_job(household_id=hid, started_by_sub=sub) + dedupe_recipes.spawn_walk_thread(db=db, job_id=job_id, mealie=mealie, forge=forge) + return jsonify({"ok": True, "job_id": job_id}) + + @app.get("/api/admin/recipes/dedupe-jobs/") + @require_bearer + def admin_dedupe_recipes_status(job_id: int): + job = db.get_recipe_dedupe_job(job_id) + if not job: + return jsonify({"error": "not_found"}), 404 + return jsonify({"job": _consolidate_job_payload(job)}) + # ---------- foods consolidator (Step 3) ------------------------------ @app.get("/consolidate") diff --git a/cauldron/templates/dedupe_recipes.html b/cauldron/templates/dedupe_recipes.html new file mode 100644 index 0000000..8917280 --- /dev/null +++ b/cauldron/templates/dedupe_recipes.html @@ -0,0 +1,337 @@ +{% extends "_base.html" %} +{% block title %}Dedupe Recipes · Cauldron{% endblock %} +{% block content %} + + + +
+
// dedupe · find duplicate recipes
+

recipe dedupe

+
+ scan your household recipes for duplicates by name similarity. sonnet + looks at ingredients + step counts + source URLs to decide whether + similar-named recipes are actually the same dish. you confirm per + cluster — DELETE in Mealie is permanent. +
+
+ +
+
+

state

+ loading… + +
+ + + + + + + + + + +
+ + + +{% endblock %} diff --git a/cauldron/templates/me.html b/cauldron/templates/me.html index 2f648ca..3d16dc6 100644 --- a/cauldron/templates/me.html +++ b/cauldron/templates/me.html @@ -59,6 +59,9 @@

🪄 bulk sterilize recipes →

scan your foods table for dupes, ask sonnet to pick canonicals, merge in mealie. one-time cleanup; aliases get attached to the survivors so the parser fuzzy-matches variants from now on.

🔮 consolidate foods table →

+ +

find duplicate recipes by name + ingredient similarity. sonnet picks the canonical to keep; you confirm per cluster before mealie deletes the others. permanent — review carefully.

+

🌀 dedupe recipes →

{% endif %}