recipe dedupe: cluster + Sonnet decide + DELETE via Mealie

New bulk job at /dedupe-recipes (and /api/recipes/dedupe-* + admin
variants). Mirrors the consolidate_foods pattern but for recipes
themselves rather than the foods table:

Walk:
- Pull all household recipes via list_recipes (paginated, ~250 for Cobb)
- Cluster by name token_set_ratio ≥ 85
- For each multi-recipe cluster, fetch full bodies + build a summary
  (slug, name, source_url, ingredient_summary, step_count, yields)
- Ask Sonnet via forge.recipe_dedupe_decision: are these the same
  dish? canonical_slug + delete_slugs + reason
- Persist proposal

Apply:
- Per approved proposal: DELETE each delete_slug via Mealie API
- Mark applied / record error per cluster

Schema: migrations 021+022 (cauldron_recipe_dedupe_jobs +
cauldron_recipe_dedupe_proposals). Same state machine: running →
review → applying → done/failed/cancelled. Same daemon-thread runner
with cancel-respect + stuck-recovery.

Sonnet integration:
- recipe_dedupe_decision prompt is conservative-by-default. duplicates=
  false on the slightest doubt (different ingredient sets, suggestive
  name differences, etc). Picks canonical = cleanest name + most
  complete data + lex-older slug as tiebreaker.

Mealie integration:
- mealie.delete_recipe(slug) → DELETE /api/recipes/<slug>. Permanent.
  Permission-scoped per-household (cross-household will 403).

UI:
- /dedupe-recipes — same shape as /consolidate but with side-by-side
  recipe cells (canonical marked ★ KEEP, deletes marked × DELETE in
  red). Source URLs link out so user can sanity-check.
- DEFAULT TO NOT-APPROVED — recipe deletion is destructive, user must
  opt in per cluster. Bulk "approve all dupes" is one click but the
  apply confirm explicitly counts how many recipes will die.
- Linked from /me alongside sterilize + consolidate.

Cobb confirmed earlier: "we can't lose recipe data" — answered by
(1) conservative Sonnet decisions, (2) opt-in default, (3) explicit
permanent-deletion confirm, (4) same-pattern logging + DB audit trail
on every attempt.
This commit is contained in:
Kayos 2026-04-30 18:16:56 -07:00
parent 30928b482f
commit d48f70603b
7 changed files with 1100 additions and 1 deletions

View file

@ -355,6 +355,50 @@ MIGRATIONS = [
"""
DROP TABLE IF EXISTS cauldron_pick_points
""",
# 021 — Recipe-dedupe bulk job state (mirrors the consolidate pattern
# but for recipes themselves — name + ingredient similarity → Sonnet
# decides → user-confirms → DELETE via Mealie API).
"""
CREATE TABLE IF NOT EXISTS cauldron_recipe_dedupe_jobs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
household_id BIGINT NOT NULL,
started_by_sub VARCHAR(190) NOT NULL,
started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_progress_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
finished_at DATETIME,
total_clusters INT NOT NULL DEFAULT 0,
processed_count INT NOT NULL DEFAULT 0,
deleted_count INT NOT NULL DEFAULT 0,
error_count INT NOT NULL DEFAULT 0,
current_cluster VARCHAR(255),
last_error VARCHAR(500),
state ENUM('running','review','applying','done','failed','cancelled')
NOT NULL DEFAULT 'running',
INDEX idx_household_state (household_id, state),
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE,
FOREIGN KEY (started_by_sub) REFERENCES cauldron_users(authentik_sub) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 022 — Per-cluster recipe-dedupe proposal. cluster_json holds the
# full recipe summaries (slug+name+ingredients+source); sonnet_decision
# = {duplicates: bool, canonical_slug, delete_slugs: [...], reason}.
"""
CREATE TABLE IF NOT EXISTS cauldron_recipe_dedupe_proposals (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
job_id BIGINT NOT NULL,
cluster_key VARCHAR(255) NOT NULL,
cluster_size INT NOT NULL DEFAULT 0,
cluster_json JSON,
sonnet_decision JSON,
approved BOOLEAN,
applied_at DATETIME,
apply_error VARCHAR(500),
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_job_cluster (job_id, cluster_key),
INDEX idx_approved (job_id, approved),
FOREIGN KEY (job_id) REFERENCES cauldron_recipe_dedupe_jobs(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
]
@ -1273,6 +1317,182 @@ class DB:
(proposal_id,),
)
# --- recipe-dedupe jobs ------------------------------------------------
def create_recipe_dedupe_job(self, *, household_id: int, started_by_sub: str) -> int:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""INSERT INTO cauldron_recipe_dedupe_jobs
(household_id, started_by_sub, state)
VALUES (%s, %s, 'running')""",
(household_id, started_by_sub),
)
return cur.lastrowid
def get_recipe_dedupe_job(self, job_id: int) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute("SELECT * FROM cauldron_recipe_dedupe_jobs WHERE id=%s", (job_id,))
return cur.fetchone()
def get_recipe_dedupe_job_state(self, job_id: int) -> str | None:
with self.conn() as c, c.cursor() as cur:
cur.execute("SELECT state FROM cauldron_recipe_dedupe_jobs WHERE id=%s", (job_id,))
row = cur.fetchone()
return row["state"] if row else None
def latest_recipe_dedupe_job_for_household(self, household_id: int) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT * FROM cauldron_recipe_dedupe_jobs
WHERE household_id=%s ORDER BY started_at DESC LIMIT 1""",
(household_id,),
)
return cur.fetchone()
def running_recipe_dedupe_job_for_household(self, household_id: int) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT * FROM cauldron_recipe_dedupe_jobs
WHERE household_id=%s AND state IN ('running','applying')
ORDER BY started_at DESC LIMIT 1""",
(household_id,),
)
return cur.fetchone()
def update_recipe_dedupe_job_progress(
self,
job_id: int,
*,
processed_delta: int = 0,
deleted_delta: int = 0,
error_delta: int = 0,
current_cluster: str | None = None,
last_error: str | None = None,
) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""UPDATE cauldron_recipe_dedupe_jobs
SET processed_count = processed_count + %s,
deleted_count = deleted_count + %s,
error_count = error_count + %s,
current_cluster = COALESCE(%s, current_cluster),
last_error = COALESCE(%s, last_error),
last_progress_at = NOW()
WHERE id=%s""",
(processed_delta, deleted_delta, error_delta,
current_cluster, last_error, job_id),
)
def finalize_recipe_dedupe_job(self, job_id: int, *, state: str) -> None:
"""Same anti-zombie guard as the others — won't overwrite terminal."""
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""UPDATE cauldron_recipe_dedupe_jobs
SET state=%s,
finished_at = CASE WHEN %s IN ('done','failed','cancelled')
THEN NOW() ELSE finished_at END,
last_progress_at = NOW(),
current_cluster = NULL
WHERE id=%s
AND state NOT IN ('done','failed','cancelled')""",
(state, state, job_id),
)
def insert_recipe_dedupe_proposal(
self,
*,
job_id: int,
cluster_key: str,
cluster: list[dict],
decision: dict | None,
error: str | None,
) -> None:
import json as _j
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""INSERT INTO cauldron_recipe_dedupe_proposals
(job_id, cluster_key, cluster_size, cluster_json,
sonnet_decision, apply_error)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
cluster_size=VALUES(cluster_size),
cluster_json=VALUES(cluster_json),
sonnet_decision=VALUES(sonnet_decision),
apply_error=VALUES(apply_error)""",
(
job_id, cluster_key[:255], len(cluster),
_j.dumps(cluster, ensure_ascii=False, default=str),
_j.dumps(decision, ensure_ascii=False) if decision else None,
(error or "")[:500] or None,
),
)
def list_recipe_dedupe_proposals(self, job_id: int) -> list[dict]:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT id, cluster_key, cluster_size, cluster_json,
sonnet_decision, approved, applied_at, apply_error
FROM cauldron_recipe_dedupe_proposals
WHERE job_id=%s
ORDER BY cluster_size DESC, cluster_key""",
(job_id,),
)
return [dict(r) for r in cur.fetchall()]
def list_approved_unapplied_recipe_dedupe(self, job_id: int) -> list[dict]:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT id, cluster_key, cluster_size, sonnet_decision
FROM cauldron_recipe_dedupe_proposals
WHERE job_id=%s AND approved=1 AND applied_at IS NULL""",
(job_id,),
)
return [dict(r) for r in cur.fetchall()]
def bulk_set_recipe_dedupe_approvals(self, job_id: int, approved_ids: list[int]) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"UPDATE cauldron_recipe_dedupe_proposals SET approved=0 WHERE job_id=%s",
(job_id,),
)
if approved_ids:
placeholders = ",".join(["%s"] * len(approved_ids))
cur.execute(
f"""UPDATE cauldron_recipe_dedupe_proposals SET approved=1
WHERE job_id=%s AND id IN ({placeholders})""",
(job_id, *approved_ids),
)
def mark_recipe_dedupe_proposal_applied(
self, proposal_id: int, *, error: str | None = None
) -> None:
if error:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"UPDATE cauldron_recipe_dedupe_proposals SET apply_error=%s WHERE id=%s",
(error[:500], proposal_id),
)
else:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"UPDATE cauldron_recipe_dedupe_proposals SET applied_at=NOW(), apply_error=NULL WHERE id=%s",
(proposal_id,),
)
def fail_stuck_recipe_dedupe_jobs(self, *, stale_minutes: int = 15) -> int:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""UPDATE cauldron_recipe_dedupe_jobs
SET state='failed',
finished_at=NOW(),
last_error=COALESCE(last_error,
'recovery: worker exited mid-run')
WHERE state IN ('running','applying')
AND last_progress_at < NOW() - INTERVAL %s MINUTE""",
(stale_minutes,),
)
return cur.rowcount
def fail_stuck_consolidate_jobs(self, *, stale_minutes: int = 15) -> int:
with self.conn() as c, c.cursor() as cur:
cur.execute(

297
cauldron/dedupe_recipes.py Normal file
View file

@ -0,0 +1,297 @@
"""Recipe dedupe — find and delete duplicate recipes household-scoped.
Walk:
1. Pull all recipe summaries for the user's household
2. Cluster by name similarity (token_set_ratio >= NAME_THRESHOLD)
3. For each multi-recipe cluster, fetch full bodies + build a summary
(slug, name, source_url, ingredient_summary, step_count, yields)
4. Ask Sonnet via forge.recipe_dedupe_decision get
{duplicates, canonical_slug, delete_slugs, reason}
5. Persist the proposal
Apply:
1. For each approved proposal where duplicates=true:
2. DELETE each delete_slug via Mealie API
3. Mark proposal applied; on any failure record + continue
Same daemon-thread / cancel-respect / stuck-recovery pattern as the
sterilize and consolidate runners.
"""
from __future__ import annotations
import json
import logging
import threading
from typing import Optional
from rapidfuzz import fuzz
from .db import DB
from .forge import Forge, ForgeError
from .mealie import Mealie, MealieError
log = logging.getLogger(__name__)
NAME_THRESHOLD = 85
def _household_id_for(mealie: Mealie) -> str | None:
me = mealie.who_am_i()
hid = me.get("householdId") or me.get("household_id")
if not hid:
h = me.get("household")
if isinstance(h, dict):
hid = h.get("id")
return hid
def _all_recipes(mealie: Mealie) -> list[dict]:
out: list[dict] = []
page = 1
while page <= 50:
resp = mealie.list_recipes(page=page, per_page=100)
items = resp.get("items") or []
for item in items:
out.append(item)
tp = resp.get("total_pages") or resp.get("totalPages") or 1
if not items or page >= tp:
break
page += 1
return out
def _recipe_household_id(recipe: dict) -> str | None:
hid = recipe.get("householdId") or recipe.get("household_id")
if hid:
return hid
h = recipe.get("household")
if isinstance(h, dict):
return h.get("id")
return None
def _filter_to_household(recipes: list[dict], household_id: str) -> list[dict]:
if not household_id:
return recipes
out = []
for r in recipes:
hh = _recipe_household_id(r)
if not hh or hh == household_id:
out.append(r)
return out
def _cluster_by_name(recipes: list[dict], threshold: int = NAME_THRESHOLD) -> list[list[dict]]:
"""Single-link agglomerative on rapidfuzz token_set_ratio. Returns
clusters of size >= 2. ~250 recipes = ~30K comparisons, runs instantly."""
n = len(recipes)
parent = list(range(n))
def find(x):
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(a, b):
ra, rb = find(a), find(b)
if ra != rb:
parent[ra] = rb
names = [(r.get("name") or "").strip().lower() for r in recipes]
for i in range(n):
if not names[i]:
continue
for j in range(i + 1, n):
if not names[j]:
continue
score = fuzz.token_set_ratio(names[i], names[j])
if score >= threshold:
union(i, j)
groups: dict[int, list[dict]] = {}
for i in range(n):
r = find(i)
groups.setdefault(r, []).append(recipes[i])
return [g for g in groups.values() if len(g) >= 2]
def _summarize_recipe(full: dict) -> dict:
"""Build the lean summary we hand to Sonnet for the dedupe decision."""
ings = full.get("recipeIngredient") or []
ing_list: list[str] = []
for i in ings[:30]:
food = (i.get("food") or {}).get("name") if isinstance(i.get("food"), dict) else None
if food:
ing_list.append(food)
else:
note = (i.get("note") or "").strip()
if note:
ing_list.append(note[:60])
return {
"slug": full.get("slug"),
"name": full.get("name"),
"source_url": full.get("orgURL") or full.get("originalUrl") or "",
"ingredient_summary": ing_list,
"step_count": len(full.get("recipeInstructions") or []),
"yields": (full.get("recipeYield") or "").strip(),
}
def _cluster_key(cluster: list[dict]) -> str:
slugs = sorted((r.get("slug") or "") for r in cluster)
return "|".join(slugs)[:255]
def run_walk(*, db: DB, job_id: int, mealie: Mealie, forge: Forge) -> None:
log.info("[dedupe-recipes:%s] walk start", job_id)
def _cancelled() -> bool:
s = db.get_recipe_dedupe_job_state(job_id)
return s in ("cancelled", "failed", "done")
try:
hh = _household_id_for(mealie)
slim = _filter_to_household(_all_recipes(mealie), hh)
log.info("[dedupe-recipes:%s] household=%s recipes=%d", job_id, hh, len(slim))
clusters_slim = _cluster_by_name(slim)
log.info("[dedupe-recipes:%s] name-clusters≥2: %d", job_id, len(clusters_slim))
with db.conn() as c, c.cursor() as cur:
cur.execute(
"UPDATE cauldron_recipe_dedupe_jobs SET total_clusters=%s WHERE id=%s",
(len(clusters_slim), job_id),
)
for slim_cluster in clusters_slim:
if _cancelled():
log.info("[dedupe-recipes:%s] walk aborted (state changed)", job_id)
return
# Fetch full recipe bodies for this cluster (cluster-sized,
# not catalog-sized — cheap)
full_cluster: list[dict] = []
for r in slim_cluster:
slug = r.get("slug")
if not slug:
continue
try:
full_cluster.append(mealie.get_recipe(slug))
except MealieError as e:
log.warning("[dedupe-recipes:%s] get_recipe(%s): %s", job_id, slug, e)
if len(full_cluster) < 2:
continue
key = _cluster_key(full_cluster)
db.update_recipe_dedupe_job_progress(job_id, current_cluster=key[:80])
summaries = [_summarize_recipe(r) for r in full_cluster]
try:
decision = forge.recipe_dedupe_decision(summaries)
except (ForgeError, RuntimeError) as e:
msg = str(e)[:500]
log.warning("[dedupe-recipes:%s] dedupe_decision: %s", job_id, msg)
db.insert_recipe_dedupe_proposal(
job_id=job_id, cluster_key=key, cluster=summaries,
decision=None, error=msg,
)
db.update_recipe_dedupe_job_progress(job_id, error_delta=1, last_error=msg)
continue
db.insert_recipe_dedupe_proposal(
job_id=job_id, cluster_key=key, cluster=summaries,
decision=decision, error=None,
)
db.update_recipe_dedupe_job_progress(job_id, processed_delta=1)
db.finalize_recipe_dedupe_job(job_id, state="review")
log.info("[dedupe-recipes:%s] walk done; awaiting review", job_id)
except Exception:
log.exception("[dedupe-recipes:%s] walk crashed", job_id)
try:
db.finalize_recipe_dedupe_job(job_id, state="failed")
except Exception:
pass
def run_apply(*, db: DB, job_id: int, mealie: Mealie) -> None:
log.info("[dedupe-recipes:%s] apply start", job_id)
def _cancelled() -> bool:
s = db.get_recipe_dedupe_job_state(job_id)
return s in ("cancelled", "failed", "done")
try:
approved = db.list_approved_unapplied_recipe_dedupe(job_id)
for row in approved:
if _cancelled():
log.info("[dedupe-recipes:%s] apply aborted", job_id)
return
decision = row.get("sonnet_decision") or {}
if isinstance(decision, str):
try:
decision = json.loads(decision)
except Exception:
decision = {}
if not decision.get("duplicates"):
db.mark_recipe_dedupe_proposal_applied(
row["id"], error="cluster decision was 'not duplicates' but row was approved",
)
continue
delete_slugs = decision.get("delete_slugs") or []
if not delete_slugs:
db.mark_recipe_dedupe_proposal_applied(
row["id"], error="missing delete_slugs",
)
continue
db.update_recipe_dedupe_job_progress(
job_id, current_cluster=row.get("cluster_key", "")[:80]
)
err: str | None = None
for slug in delete_slugs:
try:
mealie.delete_recipe(slug)
db.update_recipe_dedupe_job_progress(job_id, deleted_delta=1)
except MealieError as e:
err = f"delete {slug}: {e}"
log.warning("[dedupe-recipes:%s] %s", job_id, err)
break
if err:
db.mark_recipe_dedupe_proposal_applied(row["id"], error=err)
db.update_recipe_dedupe_job_progress(
job_id, error_delta=1, last_error=err,
)
else:
db.mark_recipe_dedupe_proposal_applied(row["id"])
db.finalize_recipe_dedupe_job(job_id, state="done")
log.info("[dedupe-recipes:%s] apply done", job_id)
except Exception:
log.exception("[dedupe-recipes:%s] apply crashed", job_id)
try:
db.finalize_recipe_dedupe_job(job_id, state="failed")
except Exception:
pass
def spawn_walk_thread(*, db: DB, job_id: int, mealie: Mealie, forge: Forge) -> threading.Thread:
t = threading.Thread(
target=run_walk,
kwargs={"db": db, "job_id": job_id, "mealie": mealie, "forge": forge},
name=f"dedupe-recipes-walk-{job_id}",
daemon=True,
)
t.start()
return t
def spawn_apply_thread(*, db: DB, job_id: int, mealie: Mealie) -> threading.Thread:
t = threading.Thread(
target=run_apply,
kwargs={"db": db, "job_id": job_id, "mealie": mealie},
name=f"dedupe-recipes-apply-{job_id}",
daemon=True,
)
t.start()
return t

View file

@ -215,6 +215,67 @@ class Forge:
)
def recipe_dedupe_decision(
self, recipes: list[dict], *, model: str | None = None
) -> dict:
"""Ask Sonnet whether a cluster of similar-named recipes are
actually duplicates (same recipe imported twice / hand-copied with
a slight title tweak / etc) versus distinct recipes that just
happen to look similar by name.
Input: list of recipe summaries {slug, name, source_url,
ingredient_summary (concise list), step_count, yields}.
Returns:
{"duplicates": bool,
"canonical_slug": "<slug to keep>",
"delete_slugs": ["<slug>", ...],
"reason": "<one-line explanation>"}
duplicates=false means the cluster is a false positive and nothing
should be deleted. canonical_slug + delete_slugs must be empty in
that case. Be conservative when in doubt return false."""
items = [
{
"slug": r.get("slug"),
"name": r.get("name"),
"source_url": r.get("source_url") or "",
"ingredient_summary": r.get("ingredient_summary") or [],
"step_count": r.get("step_count") or 0,
"yields": r.get("yields") or "",
}
for r in recipes
]
prompt = (
"You are deciding whether a cluster of similar-named recipes "
"are actual duplicates (same recipe imported or hand-copied "
"twice) or distinct recipes that share words in the title.\n\n"
f"Cluster:\n{json.dumps(items, indent=2)}\n\n"
"Output JSON ONLY, no prose: "
'{"duplicates": true|false, '
'"canonical_slug": "<slug to keep, or empty>", '
'"delete_slugs": ["<slug>", ...], '
'"reason": "<one-line reasoning>"}\n\n'
"Rules:\n"
"- duplicates=true ONLY when the recipes are clearly the same "
" dish prepared the same way (matching ingredient sets, similar "
" step counts, often shared source_url). Slight title variations "
" ('Banana Bread' vs 'Best Banana Bread') with same body = dupes.\n"
"- Pick canonical_slug = the recipe with the cleanest name, the "
" most complete data (more steps + yields filled in beats less). "
" When tied, pick the older one (lexicographic slug order is fine "
" since Mealie slugs include date-ish suffixes for dupes).\n"
"- delete_slugs = the OTHER cluster members. Mealie DELETE removes "
" them permanently — only suggest deletion when you're confident.\n"
"- duplicates=false when ingredient sets differ meaningfully, OR "
" when names suggest distinct dishes ('Chicken Stir Fry' vs "
" 'Chicken Fajitas'), OR when you genuinely cannot tell.\n"
"- Be CONSERVATIVE — false negatives are recoverable (recipes "
" stay), false positives delete data."
)
result = self.run(prompt, model=model or "sonnet", timeout_secs=60)
return _extract_recipe_dedupe_decision(result)
def cluster_decision(
self, foods: list[dict], *, model: str | None = None
) -> dict:
@ -310,6 +371,32 @@ class Forge:
return _extract_food_info(result)
def _extract_recipe_dedupe_decision(forge_result: dict) -> dict:
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if not isinstance(inner, dict):
raise ForgeError(f"recipe dedupe decision not a dict: {str(inner)[:200]}")
duplicates = bool(inner.get("duplicates"))
canonical_slug = str(inner.get("canonical_slug") or "")
delete_raw = inner.get("delete_slugs") or []
delete_slugs = [str(x) for x in delete_raw if isinstance(x, str) and x.strip()]
reason = str(inner.get("reason") or "")[:500]
if not duplicates:
canonical_slug = ""
delete_slugs = []
return {
"duplicates": duplicates,
"canonical_slug": canonical_slug,
"delete_slugs": delete_slugs,
"reason": reason,
}
def _extract_cluster_decision(forge_result: dict) -> dict:
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")

View file

@ -105,6 +105,21 @@ class Mealie:
def update_recipe(self, slug: str, body: dict) -> dict:
return self._put(f"/api/recipes/{slug}", body)
def delete_recipe(self, slug: str) -> dict:
"""DELETE /api/recipes/<slug>. Permanently removes the recipe and
its recipe_ingredient rows. Permission-scoped per-household.
Returns Mealie's response dict (often the deleted recipe summary)."""
try:
r = self.session.delete(f"{self.base_url}/api/recipes/{slug}", timeout=30)
except requests.RequestException as e:
raise MealieError(f"DELETE /api/recipes/{slug} transport: {e}") from e
if r.status_code >= 400:
raise MealieError(f"DELETE /api/recipes/{slug} -> {r.status_code}: {r.text[:300]}")
try:
return r.json()
except Exception:
return {}
# --- foods / units ------------------------------------------------------
def list_foods(self, *, search: str | None = None, per_page: int = 200) -> dict:

View file

@ -33,7 +33,7 @@ from .config import load
from .crypto import TokenCrypto
from .db import DB
from .forge import Forge, ForgeError
from . import aggregator, bulk_sterilize, consolidate_foods, foods
from . import aggregator, bulk_sterilize, consolidate_foods, dedupe_recipes, foods
from .mealie import Mealie, MealieError
from .oidc import init_oauth
from .recipe_index import flatten_recipe, refresh_household_index, search_index
@ -118,6 +118,13 @@ def create_app() -> Flask:
except Exception as e:
app.logger.warning("consolidate stuck-job recovery failed: %s", e)
try:
n_failed = db.fail_stuck_recipe_dedupe_jobs(stale_minutes=15)
if n_failed:
app.logger.info("failed %d stuck recipe-dedupe jobs at boot", n_failed)
except Exception as e:
app.logger.warning("recipe-dedupe stuck-job recovery failed: %s", e)
oauth = init_oauth(
app,
issuer=cfg.oidc_issuer,
@ -1023,6 +1030,139 @@ def create_app() -> Flask:
db.finalize_sterilize_job(job_id, state="cancelled")
return jsonify({"ok": True})
# ---------- recipe dedupe ------------------------------------------
@app.get("/dedupe-recipes")
@require_session
def dedupe_recipes_page():
hid = current_household_id()
if not hid:
return redirect(url_for("connect_mealie_get"))
latest = db.latest_recipe_dedupe_job_for_household(hid)
return render_template(
"dedupe_recipes.html", active="dedupe", latest_job=latest,
)
@app.post("/api/recipes/dedupe-start")
@require_session
def dedupe_recipes_start():
u = session["user"]
hid = current_household_id()
if not hid:
return jsonify({"error": "no household"}), 409
active = db.running_recipe_dedupe_job_for_household(hid)
if active:
return jsonify({"error": "already_running", "job_id": active["id"]}), 409
client = current_user_mealie()
if client is None:
return redirect(url_for("connect_mealie_get"))
job_id = db.create_recipe_dedupe_job(household_id=hid, started_by_sub=u["sub"])
dedupe_recipes.spawn_walk_thread(db=db, job_id=job_id, mealie=client, forge=forge)
return jsonify({"ok": True, "job_id": job_id})
@app.get("/api/recipes/dedupe-status")
@require_session
def dedupe_recipes_status():
hid = current_household_id()
if not hid:
return jsonify({"error": "no household"}), 409
job = db.latest_recipe_dedupe_job_for_household(hid)
if not job:
return jsonify({"job": None})
return jsonify({"job": _consolidate_job_payload(job)})
@app.get("/api/recipes/dedupe-jobs/<int:job_id>/proposals")
@require_session
def dedupe_recipes_proposals(job_id: int):
hid = current_household_id()
if not hid:
return jsonify({"error": "no household"}), 409
job = db.get_recipe_dedupe_job(job_id)
if not job or job["household_id"] != hid:
return jsonify({"error": "not_found"}), 404
rows = db.list_recipe_dedupe_proposals(job_id)
for p in rows:
for k in ("cluster_json", "sonnet_decision"):
v = p.get(k)
if isinstance(v, str):
try:
p[k] = _json_loads(v)
except Exception:
p[k] = None
return jsonify({
"job": _consolidate_job_payload(job),
"proposals": rows,
})
@app.post("/api/recipes/dedupe-apply/<int:job_id>")
@require_session
def dedupe_recipes_apply(job_id: int):
hid = current_household_id()
if not hid:
return jsonify({"error": "no household"}), 409
job = db.get_recipe_dedupe_job(job_id)
if not job or job["household_id"] != hid:
return jsonify({"error": "not_found"}), 404
if job["state"] != "review":
return jsonify({"error": f"bad_state:{job['state']}"}), 409
body = request.get_json(silent=True) or {}
approved_ids_raw = body.get("approved_ids") or []
approved_ids = [int(x) for x in approved_ids_raw if isinstance(x, (int, str)) and str(x).isdigit()]
client = current_user_mealie()
if client is None:
return redirect(url_for("connect_mealie_get"))
db.bulk_set_recipe_dedupe_approvals(job_id, approved_ids)
db.finalize_recipe_dedupe_job(job_id, state="applying")
dedupe_recipes.spawn_apply_thread(db=db, job_id=job_id, mealie=client)
return jsonify({"ok": True, "approved_count": len(approved_ids)})
@app.post("/api/recipes/dedupe-cancel/<int:job_id>")
@require_session
def dedupe_recipes_cancel(job_id: int):
hid = current_household_id()
if not hid:
return jsonify({"error": "no household"}), 409
job = db.get_recipe_dedupe_job(job_id)
if not job or job["household_id"] != hid:
return jsonify({"error": "not_found"}), 404
if job["state"] not in ("running", "review", "applying"):
return jsonify({"error": f"bad_state:{job['state']}"}), 409
db.finalize_recipe_dedupe_job(job_id, state="cancelled")
return jsonify({"ok": True})
@app.post("/api/admin/recipes/dedupe-start")
@require_bearer
def admin_dedupe_recipes_start():
body = request.get_json(silent=True) or {}
sub = (body.get("started_by_sub") or "").strip()
if not sub:
return jsonify({"error": "started_by_sub required"}), 400
hid = db.get_user_household_id(sub)
if not hid:
return jsonify({"error": "user has no household"}), 404
active = db.running_recipe_dedupe_job_for_household(hid)
if active:
return jsonify({"error": "already_running", "job_id": active["id"]}), 409
blob = db.get_user_mealie_token_blob(sub)
if not blob:
return jsonify({"error": "user_not_connected_to_mealie"}), 409
try:
tok = crypto.decrypt(blob)
except Exception:
return jsonify({"error": "user_token_undecryptable"}), 500
mealie = Mealie(base_url=cfg.mealie_api_url, api_token=tok)
job_id = db.create_recipe_dedupe_job(household_id=hid, started_by_sub=sub)
dedupe_recipes.spawn_walk_thread(db=db, job_id=job_id, mealie=mealie, forge=forge)
return jsonify({"ok": True, "job_id": job_id})
@app.get("/api/admin/recipes/dedupe-jobs/<int:job_id>")
@require_bearer
def admin_dedupe_recipes_status(job_id: int):
job = db.get_recipe_dedupe_job(job_id)
if not job:
return jsonify({"error": "not_found"}), 404
return jsonify({"job": _consolidate_job_payload(job)})
# ---------- foods consolidator (Step 3) ------------------------------
@app.get("/consolidate")

View file

@ -0,0 +1,337 @@
{% extends "_base.html" %}
{% block title %}Dedupe Recipes · Cauldron{% endblock %}
{% block content %}
<style>
.progress-rail { width:100%; height:14px; background:var(--bg-2);
border:1px solid var(--line); border-radius:8px; overflow:hidden;
margin:12px 0 6px 0; }
.progress-fill { height:100%;
background:linear-gradient(90deg, var(--purple-deep), var(--purple-bright));
transition:width .3s ease; box-shadow:0 0 12px -2px var(--purple-glow); }
.progress-meta { color:var(--bone-dim); font-family:var(--mono); font-size:12px;
letter-spacing:.1em; display:flex; gap:18px; flex-wrap:wrap; }
.progress-meta strong { color:var(--bone); }
.review-bar { position:sticky; top:70px; z-index:5; display:flex;
align-items:center; justify-content:space-between; flex-wrap:wrap; gap:12px;
padding:12px 14px; background:var(--bg-2); border:1px solid var(--line);
border-radius:8px; margin-bottom:14px; }
.review-bar .left { display:flex; gap:14px; align-items:center; }
.cluster-card { background:var(--surface); border:1px solid var(--line);
border-left:3px solid var(--purple-dim); border-radius:6px;
padding:12px 14px; margin-bottom:10px; }
.cluster-card.approved { border-left-color: var(--green-bright); }
.cluster-card.rejected { border-left-color: var(--muted); opacity: .55; }
.cluster-card.no-merge { border-left-color: var(--bone-dim); }
.cluster-card.errored { border-left-color: var(--crit); }
.cluster-head { display:flex; align-items:center; justify-content:space-between;
gap:10px; flex-wrap:wrap; }
.cluster-name { color:var(--bone); font-family:var(--serif); font-size:1.05em; }
.cluster-meta { color:var(--muted); font-family:var(--mono); font-size:11px;
letter-spacing:.15em; text-transform:uppercase; }
.toggle { display:inline-flex; align-items:center; gap:6px; padding:4px 10px;
border-radius:999px; border:1px solid var(--line); background:var(--bg-2);
color:var(--bone-dim); font-family:var(--mono); font-size:11px;
letter-spacing:.1em; cursor:pointer; min-height:28px; }
.toggle.on { background:rgba(232,96,106,.15); border-color:rgba(232,96,106,.3); color:var(--crit); }
.toggle.off { background:rgba(110,168,72,.18); border-color:var(--green-dim); color:var(--green-bright); }
.toggle:disabled { opacity:.4; cursor:not-allowed; }
.recipe-row { display:grid; grid-template-columns: 1fr; gap:8px; margin-top:10px; }
@media (min-width: 720px) { .recipe-row { grid-template-columns: 1fr 1fr; } }
.rcell { padding:10px; border:1px solid var(--line); border-radius:4px; background:var(--bg-2); }
.rcell.canonical { border-color: var(--purple-dim); background:rgba(155,95,232,.08); }
.rcell.canonical::before { content:"★ KEEP"; color:var(--purple-bright); font-family:var(--mono); font-size:10px; letter-spacing:.2em; display:block; margin-bottom:4px; }
.rcell.delete { border-color: rgba(232,96,106,.4); background:rgba(232,96,106,.06); }
.rcell.delete::before { content:"× DELETE"; color:var(--crit); font-family:var(--mono); font-size:10px; letter-spacing:.2em; display:block; margin-bottom:4px; }
.rcell .rname { font-family:var(--serif); font-size:1em; color:var(--bone); }
.rcell .rmeta { color:var(--muted); font-family:var(--mono); font-size:10px; letter-spacing:.1em; margin-top:4px; }
.rcell .ringing { color:var(--bone-dim); font-size:.85em; margin-top:6px; line-height:1.3; max-height:48px; overflow:hidden; }
.rcell a { color:var(--purple-bright); }
.reason { color:var(--bone-dim); font-style:italic; font-size:.9em; margin-top:8px; }
.empty-state { padding:28px 14px; text-align:center; color:var(--bone-dim); }
</style>
<div class="page-head">
<div class="crumb">// dedupe · find duplicate recipes</div>
<h1>recipe <span class="accent">dedupe</span></h1>
<div class="lede">
scan your household recipes for duplicates by name similarity. sonnet
looks at ingredients + step counts + source URLs to decide whether
similar-named recipes are actually the same dish. you confirm per
cluster — DELETE in Mealie is permanent.
</div>
</div>
<section class="panel" id="dedupe-shell">
<div class="panel-head">
<h2>state</h2>
<span class="pill" id="state-pill">loading…</span>
<span class="ctx" id="state-ctx"></span>
</div>
<div id="empty-pane" style="display:none;">
<p>no run yet. scan now?</p>
<button class="btn btn-purple" id="start-btn" type="button" onclick="startRun()">🪄 scan recipes for duplicates</button>
<p class="muted" style="margin-top:8px;">walks your household recipes, clusters by name similarity, asks sonnet which clusters are real dupes. apply path uses Mealie's DELETE endpoint — irreversible. user-confirms per cluster.</p>
</div>
<div id="progress-pane" style="display:none;">
<div class="progress-rail"><div class="progress-fill" id="bar" style="width:0%;"></div></div>
<div class="progress-meta">
<span><strong id="processed">0</strong> processed</span>
<span><strong id="deleted">0</strong> deleted</span>
<span><strong id="errors">0</strong> errors</span>
<span>of <strong id="total">?</strong> clusters</span>
<span class="muted" id="current-cluster"></span>
</div>
<div class="btn-row" style="margin-top:12px;">
<button class="btn" type="button" onclick="cancelJob()">cancel</button>
</div>
</div>
<div id="review-pane" style="display:none;">
<div class="review-bar">
<div class="left">
<span><strong id="approved-count">0</strong> selected</span>
<span class="muted" id="review-meta"></span>
</div>
<div class="right">
<button class="btn" type="button" onclick="setAll(true)">approve all dupes</button>
<button class="btn" type="button" onclick="setAll(false)">clear</button>
<button class="btn btn-purple" type="button" id="apply-btn" onclick="applyApproved()">delete selected →</button>
</div>
</div>
<div id="proposals-grid"></div>
</div>
<div id="done-pane" style="display:none;">
<p id="done-line"></p>
<button class="btn btn-purple" type="button" onclick="startRun()">↻ start new scan</button>
</div>
<div id="failed-pane" style="display:none;">
<p style="color:var(--crit);" id="failed-line"></p>
<button class="btn btn-purple" type="button" onclick="startRun()">↻ retry</button>
</div>
</section>
<script>
let job = {{ (latest_job | tojson) if latest_job else 'null' }};
let pollTimer = null;
let proposals = [];
function $(id){return document.getElementById(id);}
function showPane(name){
for(const p of ['empty','progress','review','done','failed']){
$(`${p}-pane`).style.display = (p === name) ? '' : 'none';
}
}
function setStatePill(text, klass){
const el = $('state-pill'); el.textContent = text;
el.className = 'pill ' + (klass || 'pill-mute');
}
function paint(){
if(!job) return;
const total = job.total_clusters || 0;
const done = (job.processed_count || 0) + (job.error_count || 0);
const pct = total>0 ? Math.round((done/total)*100) : 0;
$('bar').style.width = pct+'%';
$('processed').textContent = job.processed_count || 0;
$('deleted').textContent = job.deleted_count || 0;
$('errors').textContent = job.error_count || 0;
$('total').textContent = total || '?';
$('current-cluster').textContent = job.current_cluster ? `· ${job.current_cluster.slice(0,40)}…` : '';
}
async function fetchJob(){
try {
const r = await fetch('/api/recipes/dedupe-status');
const d = await r.json();
job = d.job || null; route();
} catch(e){ console.error('status poll failed', e); }
}
function route(){
if(!job){ stopPoll(); setStatePill('idle','pill-mute'); $('state-ctx').textContent=''; showPane('empty'); return; }
$('state-ctx').textContent = `started ${new Date(job.started_at).toLocaleString()}`;
const s = job.state;
if(s === 'running'){ setStatePill('walking','pill-ok'); paint(); showPane('progress'); startPoll(); }
else if(s === 'review'){ setStatePill('review','pill-ok'); paint(); showPane('review'); stopPoll(); loadProposals(); }
else if(s === 'applying'){ setStatePill('applying','pill-ok'); paint(); showPane('progress'); startPoll(); }
else if(s === 'done'){ setStatePill('done','pill-mute');
const m = job.deleted_count || 0;
$('done-line').textContent = `deleted ${m} duplicate recipe${m===1?'':'s'}.`;
showPane('done'); stopPoll(); }
else if(s === 'failed'){ setStatePill('failed','pill-mute'); $('failed-line').textContent = job.last_error || 'job failed'; showPane('failed'); stopPoll(); }
else if(s === 'cancelled'){ setStatePill('cancelled','pill-mute'); $('done-line').textContent='job cancelled.'; showPane('done'); stopPoll(); }
}
function startPoll(){ if(pollTimer) return; pollTimer = setInterval(fetchJob, 2000); }
function stopPoll(){ if(pollTimer){ clearInterval(pollTimer); pollTimer=null; } }
async function startRun(){
const btn = $('start-btn');
if(btn){ btn.disabled = true; btn.textContent = 'kicking off…'; }
try {
const r = await fetch('/api/recipes/dedupe-start',{method:'POST'});
if(!r.ok){ const j = await r.json().catch(()=>({})); throw new Error(j.error || r.status); }
await fetchJob();
} catch(e){
alert('start failed: ' + e.message);
if(btn){ btn.disabled = false; btn.textContent = '🪄 scan recipes for duplicates'; }
}
}
async function cancelJob(){
if(!job) return;
if(!confirm('cancel?')) return;
try { await fetch('/api/recipes/dedupe-cancel/'+job.id,{method:'POST'}); await fetchJob(); }
catch(e){ alert('cancel failed: '+e.message); }
}
async function loadProposals(){
if(!job) return;
try {
const r = await fetch('/api/recipes/dedupe-jobs/'+job.id+'/proposals');
const d = await r.json();
proposals = d.proposals || [];
for(const p of proposals){
const dec = p.sonnet_decision || {};
if(p.approved === null || p.approved === undefined){
// Default to NOT approved — recipe deletion is destructive,
// user must opt-in per cluster
p.approved = false;
}
}
renderProposals();
} catch(e){ console.error('proposals load failed', e); }
}
function renderProposals(){
const grid = $('proposals-grid');
grid.innerHTML = '';
if(!proposals.length){
grid.innerHTML = '<div class="empty-state">no clusters found — your recipes are unique.</div>';
$('apply-btn').disabled = true; return;
}
let approvedCount = 0;
for(const p of proposals){
if(p.approved) approvedCount++;
grid.appendChild(renderOne(p));
}
$('approved-count').textContent = approvedCount;
const total = proposals.length;
const noDup = proposals.filter(p=>!(p.sonnet_decision||{}).duplicates).length;
$('review-meta').textContent = `· ${total} clusters, ${noDup} flagged not-duplicates`;
$('apply-btn').disabled = approvedCount === 0;
}
function escapeHtml(s){return String(s).replace(/[&<>"']/g, m=>({'&':'&amp;','<':'&lt;','>':'&gt;','"':'&quot;',"'":"&#39;"}[m]));}
function renderOne(p){
const card = document.createElement('div');
card.className = 'cluster-card';
const dec = p.sonnet_decision || {};
const cluster = p.cluster_json || [];
if(p.apply_error) card.classList.add('errored');
else if(!dec.duplicates) card.classList.add('no-merge');
else if(p.approved) card.classList.add('approved');
else card.classList.add('rejected');
const head = document.createElement('div');
head.className = 'cluster-head';
const left = document.createElement('div');
const nm = document.createElement('div');
nm.className = 'cluster-name';
nm.textContent = (cluster[0] && cluster[0].name) || `cluster of ${cluster.length}`;
const meta = document.createElement('div');
meta.className = 'cluster-meta';
meta.textContent = dec.duplicates
? `delete ${(dec.delete_slugs||[]).length}, keep 1`
: `keep all ${cluster.length} (sonnet says distinct)`;
left.appendChild(nm); left.appendChild(meta);
const tog = document.createElement('button');
tog.type = 'button';
tog.className = 'toggle ' + (p.approved ? 'on' : 'off');
tog.textContent = p.approved ? 'will delete' : (dec.duplicates ? 'skip' : 'keep all');
tog.disabled = !dec.duplicates;
tog.onclick = () => flip(p, card, tog);
head.appendChild(left); head.appendChild(tog);
card.appendChild(head);
const row = document.createElement('div');
row.className = 'recipe-row';
const canonSlug = dec.canonical_slug;
const deleteSet = new Set(dec.delete_slugs || []);
for(const r of cluster){
const cell = document.createElement('div');
cell.className = 'rcell';
if(r.slug === canonSlug && dec.duplicates) cell.classList.add('canonical');
else if(deleteSet.has(r.slug)) cell.classList.add('delete');
const link = r.source_url ? ` <a href="${escapeHtml(r.source_url)}" target="_blank"></a>` : '';
cell.innerHTML = `
<div class="rname">${escapeHtml(r.name || r.slug)}${link}</div>
<div class="rmeta">${escapeHtml(r.slug)} · ${r.step_count} steps · ${escapeHtml(r.yields || 'no yield set')}</div>
<div class="ringing">${escapeHtml((r.ingredient_summary||[]).slice(0,8).join(', ')) || 'no ingredients'}</div>
`;
row.appendChild(cell);
}
card.appendChild(row);
if(dec.reason){
const rs = document.createElement('div');
rs.className = 'reason';
rs.textContent = dec.reason;
card.appendChild(rs);
}
if(p.apply_error){
const e = document.createElement('div');
e.className = 'reason';
e.style.color = 'var(--crit)';
e.textContent = 'apply error: ' + p.apply_error;
card.appendChild(e);
}
return card;
}
function flip(p, card, tog){
const dec = p.sonnet_decision || {};
if(!dec.duplicates) return;
p.approved = !p.approved;
card.classList.toggle('approved', p.approved);
card.classList.toggle('rejected', !p.approved);
tog.classList.toggle('on', p.approved);
tog.classList.toggle('off', !p.approved);
tog.textContent = p.approved ? 'will delete' : 'skip';
const cnt = proposals.filter(x => x.approved).length;
$('approved-count').textContent = cnt;
$('apply-btn').disabled = cnt === 0;
}
function setAll(on){
for(const p of proposals){
const dec = p.sonnet_decision || {};
if(dec.duplicates) p.approved = on;
}
renderProposals();
}
async function applyApproved(){
const ids = proposals.filter(p => p.approved && (p.sonnet_decision||{}).duplicates).map(p => p.id);
if(!ids.length) return;
const total_to_delete = proposals
.filter(p => p.approved && (p.sonnet_decision||{}).duplicates)
.reduce((sum, p) => sum + ((p.sonnet_decision||{}).delete_slugs || []).length, 0);
if(!confirm(`PERMANENTLY DELETE ${total_to_delete} recipe${total_to_delete===1?'':'s'} from mealie? this cannot be undone.`)) return;
const btn = $('apply-btn'); btn.disabled = true; btn.textContent = 'deleting…';
try {
const r = await fetch('/api/recipes/dedupe-apply/'+job.id, {
method:'POST', headers:{'Content-Type':'application/json'},
body: JSON.stringify({approved_ids: ids}),
});
if(!r.ok){ const j = await r.json().catch(()=>({})); throw new Error(j.error || r.status); }
await fetchJob();
} catch(e){
alert('apply failed: '+e.message);
btn.disabled = false; btn.textContent = 'delete selected →';
}
}
route();
if(job && (job.state === 'running' || job.state === 'applying')) startPoll();
</script>
{% endblock %}

View file

@ -59,6 +59,9 @@
<p><a class="btn" href="/sterilize">🪄 bulk sterilize recipes →</a></p>
<p class="muted" style="margin-top:14px;">scan your foods table for dupes, ask sonnet to pick canonicals, merge in mealie. one-time cleanup; aliases get attached to the survivors so the parser fuzzy-matches variants from now on.</p>
<p><a class="btn" href="/consolidate">🔮 consolidate foods table →</a></p>
<p class="muted" style="margin-top:14px;">find duplicate recipes by name + ingredient similarity. sonnet picks the canonical to keep; you confirm per cluster before mealie deletes the others. permanent — review carefully.</p>
<p><a class="btn" href="/dedupe-recipes">🌀 dedupe recipes →</a></p>
</section>
{% endif %}