Step 3: foods consolidator — cluster + merge dupes via Mealie's API
New bulk job that scans Mealie's foods table for duplicate-feeling
clusters, asks Sonnet to pick the canonical survivor + flag the rest
as merge candidates, and uses Mealie's PUT /api/foods/merge to
consolidate. After each successful merge, alias_additions get pushed
onto the survivor so Mealie's CRF/NLP parser fuzzy-matches the
discarded variant names from then on.
Architecture mirrors bulk_sterilize.py:
- Migrations 018+019 add cauldron_consolidate_jobs +
cauldron_consolidate_proposals (state machine: running → review →
applying → done/failed/cancelled)
- New consolidate_foods.py — daemon-thread runner with cancel-respect
and stuck-job recovery
- /api/foods/consolidate-{start,status,jobs,apply,cancel} for session
users + /api/admin/foods/consolidate-{start,jobs,cancel} for kayos
Sonnet integration:
- forge.cluster_decision(foods) → returns {merge, canonical_id,
canonical_name, discard_ids, alias_additions, reason}
- Conservative-by-default: when in doubt Sonnet returns merge=false
(the "olive oil vs olive" false-positive case from the prompt)
- Alias rules in the prompt explain why we want discarded names to
travel back to the survivor as aliases (parser future-proofing)
Mealie integration:
- mealie.merge_foods(from_id, to_id) → PUT /api/foods/merge
- mealie.update_food(food_id, body) → for pushing aliases onto the
survivor after merges land
- Apply path catches 403/permission errors and surfaces them as the
per-cluster apply_error (cross-household merge attempts will fail
here, same way as the sterilize cross-household path)
Clustering:
- rapidfuzz token_set_ratio ≥ 88 (slightly stricter than Mealie's
parser threshold of 85 to reduce false-positive clusters)
- Single-link agglomerative — O(n²), but Cobb's ~3000 foods = ~4.5M
pairwise comparisons, runs in seconds
- Singleton clusters (no merge candidates) are dropped, not stored
UI:
- /consolidate — same shape as /sterilize: progress bar → review grid
→ apply button. Cards show member chips with the canonical marked
★, discards marked × in red, alias_additions listed in green, plus
Sonnet's one-line reasoning. Mergeable approved by default; user
toggles individual clusters off if they disagree.
- Linked from /me → tools section, alongside bulk sterilize.
Total: ~600 LoC across 6 files. Foundation for the "Mealie owns
canonical names" architectural rule is now actually enforceable —
cobb runs this once, his foods table gets cleaned up, and Sonnet's
catalog-aware parser (Step 1) starts matching aliases for free.
This commit is contained in:
parent
5e62da2013
commit
69e05b1f92
7 changed files with 1141 additions and 1 deletions
300
cauldron/consolidate_foods.py
Normal file
300
cauldron/consolidate_foods.py
Normal file
|
|
@ -0,0 +1,300 @@
|
|||
"""Foods consolidator — clusters Mealie's food rows by similarity, asks
|
||||
Sonnet which clusters are dupes, and uses Mealie's PUT /api/foods/merge
|
||||
to consolidate. Operates household-scoped via the user's Mealie token.
|
||||
|
||||
The walk:
|
||||
1. Fetch all foods via GET /api/foods (group-wide; the user's
|
||||
household admin scope is enough)
|
||||
2. Filter to the user's own household (where edit permission exists); foods that carry no household tag are kept as well
|
||||
3. Cluster by rapidfuzz.token_set_ratio ≥ THRESHOLD (default 88, slightly
|
||||
stricter than Mealie's parser threshold of 85 to reduce false positives)
|
||||
4. Drop singleton clusters (no merge candidate)
|
||||
5. For each cluster, call forge.cluster_decision(...) — Sonnet
|
||||
decides merge/keep + picks the canonical survivor
|
||||
6. Persist the proposal
|
||||
|
||||
The apply:
|
||||
1. For each approved cluster: PUT /api/foods/merge from each discard_id
|
||||
into canonical_id
|
||||
2. After merges, optionally PUT /api/foods/{canonical_id} with the
|
||||
accumulated aliases array so the survivor learns the variant names
|
||||
|
||||
Same daemon-thread + cancel-respecting + stuck-job-recovery pattern as
|
||||
bulk_sterilize.py.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
from typing import Optional
|
||||
|
||||
from rapidfuzz import fuzz, process
|
||||
|
||||
from .db import DB
|
||||
from .forge import Forge, ForgeError
|
||||
from .mealie import Mealie, MealieError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
CLUSTER_THRESHOLD = 88
|
||||
|
||||
|
||||
def _household_id_for(mealie: Mealie) -> str | None:
    """Return the household id reported by ``mealie.who_am_i()``.

    The payload is probed for ``householdId``, then ``household_id``,
    and finally for the ``id`` of a nested ``household`` dict.
    """
    profile = mealie.who_am_i()
    direct = profile.get("householdId") or profile.get("household_id")
    if direct:
        return direct
    nested = profile.get("household")
    if isinstance(nested, dict):
        return nested.get("id")
    return direct
|
||||
|
||||
|
||||
def _all_foods(mealie: Mealie) -> list[dict]:
    """Collect food rows from the paginated ``/api/foods`` listing.

    Pages of 2000 are requested starting at page 1. Collection stops at
    the first empty page, at the last page the response reports
    (``total_pages`` / ``totalPages``, defaulting to 1), or after page 20.
    """
    collected: list[dict] = []
    for page_no in range(1, 21):
        payload = mealie._get("/api/foods", search="", perPage=2000, page=page_no)
        batch = payload.get("items") or payload.get("data") or []
        collected.extend(batch)
        last_page = payload.get("total_pages") or payload.get("totalPages") or 1
        if not batch or page_no >= last_page:
            break
    return collected
|
||||
|
||||
|
||||
def _foods_in_household(mealie: Mealie, household_id: str) -> list[dict]:
    """Return the foods that belong to ``household_id`` or carry no household.

    With a falsy ``household_id`` the full listing is returned unfiltered.
    A food's household is read from ``householdId``, ``household_id`` or
    the ``id`` of a nested ``household`` dict.
    """
    everything = _all_foods(mealie)
    if not household_id:
        return everything

    def _owner(food: dict):
        tagged = food.get("householdId") or food.get("household_id")
        if tagged:
            return tagged
        nested = food.get("household")
        return nested.get("id") if isinstance(nested, dict) else tagged

    def _keep(food: dict) -> bool:
        owner = _owner(food)
        # Untagged foods are kept: per the original author's note, some
        # Mealie versions scope foods to the group rather than the
        # household, and the job then consolidates across the whole group.
        return not owner or owner == household_id

    return [food for food in everything if _keep(food)]
|
||||
|
||||
|
||||
def _cluster(foods: list[dict], threshold: int = CLUSTER_THRESHOLD) -> list[list[dict]]:
    """Group foods whose names are transitively similar (single-link).

    Every unordered pair of non-empty, stripped, lower-cased names is
    scored with ``fuzz.token_set_ratio``; pairs scoring at least
    ``threshold`` are joined in a union-find forest. The pair scan is
    quadratic in the number of foods. Only groups with two or more
    members are returned.
    """
    count = len(foods)
    root_of = list(range(count))

    def _root(node):
        # Path halving keeps the trees shallow while walking to the root.
        while root_of[node] != node:
            root_of[node] = root_of[root_of[node]]
            node = root_of[node]
        return node

    labels = [(food.get("name") or "").strip().lower() for food in foods]
    named = [idx for idx, label in enumerate(labels) if label]
    for pos, left in enumerate(named):
        for right in named[pos + 1:]:
            if fuzz.token_set_ratio(labels[left], labels[right]) >= threshold:
                left_root, right_root = _root(left), _root(right)
                if left_root != right_root:
                    root_of[left_root] = right_root

    buckets: dict[int, list[dict]] = {}
    for idx, food in enumerate(foods):
        buckets.setdefault(_root(idx), []).append(food)
    return [members for members in buckets.values() if len(members) >= 2]
|
||||
|
||||
|
||||
def _cluster_key(cluster: list[dict]) -> str:
    """Return a stable key for ``cluster`` that fits in 255 characters.

    The key is the members' stripped, lower-cased names, sorted and
    joined with ``|``. When that string is longer than 255 characters it
    is cut to 242 characters and suffixed with ``#`` plus a 12-hex-digit
    SHA-1 digest of the full name string and the sorted member ids, so
    two large clusters sharing a long prefix do not end up with the same
    key. Keys that already fit are returned unchanged.
    """
    import hashlib

    names = sorted((f.get("name") or "").strip().lower() for f in cluster)
    joined = "|".join(names)
    if len(joined) <= 255:
        return joined
    ids = sorted(str(f.get("id") or "") for f in cluster)
    fingerprint = "\x00".join([joined, *ids]).encode("utf-8")
    digest = hashlib.sha1(fingerprint).hexdigest()[:12]
    # 242 + len("#") + 12 == 255
    return f"{joined[:242]}#{digest}"
|
||||
|
||||
|
||||
def run_walk(*, db: DB, job_id: int, mealie: Mealie, forge: Forge) -> None:
    """Scan phase of a consolidate job.

    Loads the foods visible through ``mealie``, clusters them, records
    the cluster count on the job row, then asks ``forge`` for a decision
    per cluster and stores one proposal for each. A ``ForgeError`` or
    ``RuntimeError`` from the decision call is stored on the proposal as
    its error text and counted as an error; the walk then moves on. The
    job state is read before every cluster and the walk returns as soon
    as it is 'cancelled', 'failed' or 'done'. A completed walk finalizes
    the job with state 'review'; any other exception is logged and the
    job is finalized with state 'failed'.
    """
    log.info("[consolidate:%s] walk start", job_id)
    stop_states = ("cancelled", "failed", "done")

    try:
        household = _household_id_for(mealie)
        foods = _foods_in_household(mealie, household)
        log.info("[consolidate:%s] household=%s foods=%d", job_id, household, len(foods))

        clusters = _cluster(foods)
        log.info("[consolidate:%s] clusters≥2: %d", job_id, len(clusters))

        with db.conn() as conn, conn.cursor() as cursor:
            cursor.execute(
                "UPDATE cauldron_consolidate_jobs SET total_clusters=%s WHERE id=%s",
                (len(clusters), job_id),
            )

        for members in clusters:
            if db.get_consolidate_job_state(job_id) in stop_states:
                log.info("[consolidate:%s] walk aborted (state changed)", job_id)
                return

            key = _cluster_key(members)
            db.update_consolidate_job_progress(job_id, current_cluster=key[:80])

            decision = None
            failure = None
            try:
                decision = forge.cluster_decision(members)
            except (ForgeError, RuntimeError) as exc:
                failure = str(exc)[:500]
                log.warning("[consolidate:%s] cluster_decision(%s): %s", job_id, key[:60], failure)

            db.insert_consolidate_proposal(
                job_id=job_id,
                cluster_key=key,
                cluster=members,
                decision=decision,
                error=failure,
            )
            if failure is None:
                db.update_consolidate_job_progress(job_id, processed_delta=1)
            else:
                db.update_consolidate_job_progress(job_id, error_delta=1, last_error=failure)

        db.finalize_consolidate_job(job_id, state="review")
        log.info("[consolidate:%s] walk done; awaiting review", job_id)
    except Exception:
        log.exception("[consolidate:%s] walk crashed", job_id)
        try:
            db.finalize_consolidate_job(job_id, state="failed")
        except Exception:
            log.exception("[consolidate:%s] couldn't mark failed", job_id)
|
||||
|
||||
|
||||
def _merged_aliases(survivor: dict, alias_additions: list) -> list | None:
    """Return the survivor's aliases plus unseen additions, or None if nothing is new."""
    current = survivor.get("aliases") or []
    seen: set = set()
    for alias in current:
        if isinstance(alias, str):
            seen.add(alias.strip().lower())
        elif isinstance(alias, dict):
            seen.add((alias.get("name") or "").strip().lower())
    # The survivor's own name must never be added as an alias of itself.
    seen.add((survivor.get("name") or "").strip().lower())

    merged = list(current)
    for candidate in alias_additions:
        if not isinstance(candidate, str):
            continue
        cleaned = candidate.strip()
        folded = cleaned.lower()
        if not cleaned or folded in seen:
            continue
        merged.append({"name": cleaned})
        seen.add(folded)
    return merged if len(merged) != len(current) else None


def run_apply(*, db: DB, job_id: int, mealie: Mealie) -> None:
    """Apply phase of a consolidate job.

    For every approved, not-yet-applied proposal of ``job_id``: merge
    each discard id into the canonical id through ``mealie.merge_foods``
    (stopping at the first ``MealieError``), then, if all merges
    succeeded and the decision lists ``alias_additions``, fetch the
    survivor and push any new aliases with ``mealie.update_food``.
    Per-proposal failures are written to the proposal and counted on the
    job; successes mark the proposal applied. The job state is read
    before every proposal and the run returns as soon as it is
    'cancelled', 'failed' or 'done'. A completed run finalizes the job
    with state 'done'; an unexpected exception is logged and the job is
    finalized with state 'failed'.
    """
    log.info("[consolidate:%s] apply start", job_id)

    def _cancelled() -> bool:
        return db.get_consolidate_job_state(job_id) in ("cancelled", "failed", "done")

    try:
        for row in db.list_approved_unapplied_consolidate(job_id):
            if _cancelled():
                log.info("[consolidate:%s] apply aborted (state changed)", job_id)
                return

            decision = row.get("sonnet_decision") or {}
            if isinstance(decision, str):
                try:
                    decision = json.loads(decision)
                except ValueError:
                    decision = {}
            if not isinstance(decision, dict):
                # A stored value that decodes to a list/scalar is unusable;
                # treat it like an empty decision instead of crashing the job.
                decision = {}

            if not decision.get("merge"):
                # User shouldn't have approved a non-merge cluster; defensive
                db.mark_consolidate_proposal_applied(
                    row["id"], error="cluster decision was 'no merge' but row was approved",
                )
                continue

            canonical_id = decision.get("canonical_id") or ""
            discard_ids = decision.get("discard_ids") or []
            alias_additions = decision.get("alias_additions") or []
            if not canonical_id or not discard_ids:
                db.mark_consolidate_proposal_applied(
                    row["id"], error="missing canonical_id or discard_ids",
                )
                continue

            db.update_consolidate_job_progress(
                job_id, current_cluster=(row.get("cluster_key") or "")[:80]
            )

            err: str | None = None
            for did in discard_ids:
                if did == canonical_id:
                    continue
                try:
                    mealie.merge_foods(from_id=did, to_id=canonical_id)
                except MealieError as e:
                    err = f"merge {did} → {canonical_id}: {e}"
                    log.warning("[consolidate:%s] %s", job_id, err)
                    break

            if err is None and alias_additions:
                # After successful merges, push the aliases onto the survivor
                try:
                    survivor = mealie._get(f"/api/foods/{canonical_id}")
                    new_aliases = _merged_aliases(survivor, alias_additions)
                    if new_aliases is not None:
                        body = dict(survivor)
                        body["aliases"] = new_aliases
                        mealie.update_food(canonical_id, body)
                except MealieError as e:
                    err = f"alias update on {canonical_id}: {e}"
                    log.warning("[consolidate:%s] %s", job_id, err)

            if err:
                db.mark_consolidate_proposal_applied(row["id"], error=err)
                db.update_consolidate_job_progress(
                    job_id, error_delta=1, last_error=err,
                )
            else:
                db.mark_consolidate_proposal_applied(row["id"])
                db.update_consolidate_job_progress(job_id, merged_delta=1)

        db.finalize_consolidate_job(job_id, state="done")
        log.info("[consolidate:%s] apply done", job_id)
    except Exception:
        log.exception("[consolidate:%s] apply crashed", job_id)
        try:
            db.finalize_consolidate_job(job_id, state="failed")
        except Exception:
            # Same handling as run_walk: never hide a failed state write.
            log.exception("[consolidate:%s] couldn't mark failed", job_id)
|
||||
|
||||
|
||||
def spawn_walk_thread(*, db: DB, job_id: int, mealie: Mealie, forge: Forge) -> threading.Thread:
    """Start ``run_walk`` for ``job_id`` on a daemon thread and return the thread."""
    walk_args = {"db": db, "job_id": job_id, "mealie": mealie, "forge": forge}
    worker = threading.Thread(
        target=run_walk,
        kwargs=walk_args,
        name=f"consolidate-walk-{job_id}",
        daemon=True,
    )
    worker.start()
    return worker
|
||||
|
||||
|
||||
def spawn_apply_thread(*, db: DB, job_id: int, mealie: Mealie) -> threading.Thread:
    """Start ``run_apply`` for ``job_id`` on a daemon thread and return the thread."""
    apply_args = {"db": db, "job_id": job_id, "mealie": mealie}
    worker = threading.Thread(
        target=run_apply,
        kwargs=apply_args,
        name=f"consolidate-apply-{job_id}",
        daemon=True,
    )
    worker.start()
    return worker
|
||||
231
cauldron/db.py
231
cauldron/db.py
|
|
@ -300,6 +300,52 @@ MIGRATIONS = [
|
|||
INDEX idx_category (category)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||
""",
|
||||
# 018 — Foods consolidator jobs. Mirrors the sterilize job pattern but
|
||||
# for the foods table itself: scan Mealie household foods, cluster by
|
||||
# similarity, ask Sonnet which clusters are dupes, apply via Mealie's
|
||||
# PUT /api/foods/merge endpoint. One job at a time per household.
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS cauldron_consolidate_jobs (
|
||||
id BIGINT PRIMARY KEY AUTO_INCREMENT,
|
||||
household_id BIGINT NOT NULL,
|
||||
started_by_sub VARCHAR(190) NOT NULL,
|
||||
started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
last_progress_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
finished_at DATETIME,
|
||||
total_clusters INT NOT NULL DEFAULT 0,
|
||||
processed_count INT NOT NULL DEFAULT 0,
|
||||
merged_count INT NOT NULL DEFAULT 0,
|
||||
error_count INT NOT NULL DEFAULT 0,
|
||||
current_cluster VARCHAR(255),
|
||||
last_error VARCHAR(500),
|
||||
state ENUM('running','review','applying','done','failed','cancelled')
|
||||
NOT NULL DEFAULT 'running',
|
||||
INDEX idx_household_state (household_id, state),
|
||||
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY (started_by_sub) REFERENCES cauldron_users(authentik_sub) ON DELETE CASCADE
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||
""",
|
||||
# 019 — Per-cluster proposal. cluster_json holds the list of Mealie
|
||||
# food IDs+names in the cluster; sonnet_decision stores
|
||||
# {merge: bool, canonical_id: <id>, canonical_name: <picked>,
|
||||
# discard_ids: [...], alias_additions: [...], reason: <str>}.
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS cauldron_consolidate_proposals (
|
||||
id BIGINT PRIMARY KEY AUTO_INCREMENT,
|
||||
job_id BIGINT NOT NULL,
|
||||
cluster_key VARCHAR(255) NOT NULL,
|
||||
cluster_size INT NOT NULL DEFAULT 0,
|
||||
cluster_json JSON,
|
||||
sonnet_decision JSON,
|
||||
approved BOOLEAN,
|
||||
applied_at DATETIME,
|
||||
apply_error VARCHAR(500),
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
UNIQUE KEY uk_job_cluster (job_id, cluster_key),
|
||||
INDEX idx_approved (job_id, approved),
|
||||
FOREIGN KEY (job_id) REFERENCES cauldron_consolidate_jobs(id) ON DELETE CASCADE
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||
""",
|
||||
]
|
||||
|
||||
|
||||
|
|
@ -1156,6 +1202,191 @@ class DB:
|
|||
)
|
||||
return [dict(r) for r in cur.fetchall()]
|
||||
|
||||
# --- consolidate (foods merge) jobs -------------------------------------
|
||||
|
||||
def create_consolidate_job(
    self, *, household_id: int, started_by_sub: str
) -> int:
    """Insert a consolidate job in state 'running' and return the cursor's ``lastrowid``."""
    statement = """INSERT INTO cauldron_consolidate_jobs
                   (household_id, started_by_sub, state)
                   VALUES (%s, %s, 'running')"""
    with self.conn() as conn, conn.cursor() as cursor:
        cursor.execute(statement, (household_id, started_by_sub))
        return cursor.lastrowid
|
||||
|
||||
def get_consolidate_job(self, job_id: int) -> dict | None:
    """Return the first row of ``cauldron_consolidate_jobs`` matching ``job_id``, as given by ``fetchone()``."""
    query = "SELECT * FROM cauldron_consolidate_jobs WHERE id=%s"
    with self.conn() as conn, conn.cursor() as cursor:
        cursor.execute(query, (job_id,))
        return cursor.fetchone()
|
||||
|
||||
def get_consolidate_job_state(self, job_id: int) -> str | None:
    """Return the ``state`` column of job ``job_id``, or None when no row is fetched."""
    query = "SELECT state FROM cauldron_consolidate_jobs WHERE id=%s"
    with self.conn() as conn, conn.cursor() as cursor:
        cursor.execute(query, (job_id,))
        found = cursor.fetchone()
        if not found:
            return None
        return found["state"]
|
||||
|
||||
def latest_consolidate_job_for_household(self, household_id: int) -> dict | None:
    """Return the household's most recently started consolidate job, in any state."""
    query = """SELECT * FROM cauldron_consolidate_jobs
               WHERE household_id=%s ORDER BY started_at DESC LIMIT 1"""
    with self.conn() as conn, conn.cursor() as cursor:
        cursor.execute(query, (household_id,))
        return cursor.fetchone()
|
||||
|
||||
def running_consolidate_job_for_household(self, household_id: int) -> dict | None:
    """Return the household's newest job whose state is 'running' or 'applying'."""
    query = """SELECT * FROM cauldron_consolidate_jobs
               WHERE household_id=%s AND state IN ('running','applying')
               ORDER BY started_at DESC LIMIT 1"""
    with self.conn() as conn, conn.cursor() as cursor:
        cursor.execute(query, (household_id,))
        return cursor.fetchone()
|
||||
|
||||
def update_consolidate_job_progress(
    self,
    job_id: int,
    *,
    processed_delta: int = 0,
    merged_delta: int = 0,
    error_delta: int = 0,
    current_cluster: str | None = None,
    last_error: str | None = None,
) -> None:
    """Bump a job's counters and refresh ``last_progress_at``.

    The three ``*_delta`` values are added to their counters.
    ``current_cluster`` and ``last_error`` replace the stored values
    only when not None (the SQL uses ``COALESCE``).
    """
    statement = """UPDATE cauldron_consolidate_jobs
                   SET processed_count = processed_count + %s,
                       merged_count = merged_count + %s,
                       error_count = error_count + %s,
                       current_cluster = COALESCE(%s, current_cluster),
                       last_error = COALESCE(%s, last_error),
                       last_progress_at = NOW()
                   WHERE id=%s"""
    values = (
        processed_delta,
        merged_delta,
        error_delta,
        current_cluster,
        last_error,
        job_id,
    )
    with self.conn() as conn, conn.cursor() as cursor:
        cursor.execute(statement, values)
|
||||
|
||||
def finalize_consolidate_job(self, job_id: int, *, state: str) -> None:
    """Move job ``job_id`` to ``state`` unless it is already terminal.

    The update applies only while the job is in a non-terminal state
    ('running', 'review' or 'applying'), so a late worker cannot
    overwrite 'done', 'failed' or 'cancelled'. 'review' has to be part
    of that guard: the apply route moves a job from 'review' to
    'applying' and the cancel route accepts jobs in 'review'; without it
    both calls would match no row and the job would stay in 'review'.

    ``finished_at`` is stamped when ``state`` is 'done', 'failed' or
    'cancelled'; ``last_progress_at`` is refreshed and
    ``current_cluster`` cleared on every call that matches.
    """
    with self.conn() as c, c.cursor() as cur:
        cur.execute(
            """UPDATE cauldron_consolidate_jobs
               SET state=%s,
                   finished_at = CASE WHEN %s IN ('done','failed','cancelled')
                                      THEN NOW() ELSE finished_at END,
                   last_progress_at = NOW(),
                   current_cluster = NULL
               WHERE id=%s
                 AND state IN ('running','review','applying')""",
            (state, state, job_id),
        )
|
||||
|
||||
def insert_consolidate_proposal(
    self,
    *,
    job_id: int,
    cluster_key: str,
    cluster: list[dict],
    decision: dict | None,
    error: str | None,
) -> None:
    """Upsert one proposal row for ``(job_id, cluster_key)``.

    ``cluster`` is stored as JSON (non-serializable values go through
    ``str``), ``decision`` as JSON or NULL when falsy, and ``error`` is
    cut to 500 characters and stored in ``apply_error`` (NULL when
    empty). ``cluster_key`` is cut to 255 characters. An existing row
    with the same key has these columns overwritten.
    """
    import json

    statement = """INSERT INTO cauldron_consolidate_proposals
                   (job_id, cluster_key, cluster_size, cluster_json,
                    sonnet_decision, apply_error)
                   VALUES (%s, %s, %s, %s, %s, %s)
                   ON DUPLICATE KEY UPDATE
                     cluster_size=VALUES(cluster_size),
                     cluster_json=VALUES(cluster_json),
                     sonnet_decision=VALUES(sonnet_decision),
                     apply_error=VALUES(apply_error)"""
    cluster_blob = json.dumps(cluster, ensure_ascii=False, default=str)
    decision_blob = None
    if decision:
        decision_blob = json.dumps(decision, ensure_ascii=False)
    error_text = (error or "")[:500] or None
    values = (
        job_id,
        cluster_key[:255],
        len(cluster),
        cluster_blob,
        decision_blob,
        error_text,
    )
    with self.conn() as conn, conn.cursor() as cursor:
        cursor.execute(statement, values)
|
||||
|
||||
def list_consolidate_proposals(self, job_id: int) -> list[dict]:
    """Return every proposal of ``job_id`` as dicts, largest clusters first, then by key."""
    query = """SELECT id, cluster_key, cluster_size, cluster_json,
                      sonnet_decision, approved, applied_at, apply_error
               FROM cauldron_consolidate_proposals
               WHERE job_id=%s
               ORDER BY cluster_size DESC, cluster_key"""
    with self.conn() as conn, conn.cursor() as cursor:
        cursor.execute(query, (job_id,))
        fetched = cursor.fetchall()
        return list(map(dict, fetched))
|
||||
|
||||
def list_approved_unapplied_consolidate(self, job_id: int) -> list[dict]:
    """Return the proposals of ``job_id`` with ``approved=1`` and no ``applied_at``, as dicts."""
    query = """SELECT id, cluster_key, cluster_size, sonnet_decision
               FROM cauldron_consolidate_proposals
               WHERE job_id=%s AND approved=1 AND applied_at IS NULL"""
    with self.conn() as conn, conn.cursor() as cursor:
        cursor.execute(query, (job_id,))
        fetched = cursor.fetchall()
        return list(map(dict, fetched))
|
||||
|
||||
def bulk_set_consolidate_approvals(
    self, job_id: int, approved_ids: list[int]
) -> None:
    """Reset every proposal of ``job_id`` to ``approved=0``, then set ``approved=1`` for ``approved_ids``.

    Only placeholders are interpolated into the second statement; the
    ids themselves are passed as query parameters.
    """
    with self.conn() as conn, conn.cursor() as cursor:
        cursor.execute(
            "UPDATE cauldron_consolidate_proposals SET approved=0 WHERE job_id=%s",
            (job_id,),
        )
        if not approved_ids:
            return
        marks = ",".join("%s" for _ in approved_ids)
        cursor.execute(
            f"""UPDATE cauldron_consolidate_proposals SET approved=1
                WHERE job_id=%s AND id IN ({marks})""",
            (job_id, *approved_ids),
        )
|
||||
|
||||
def mark_consolidate_proposal_applied(
    self, proposal_id: int, *, error: str | None = None
) -> None:
    """Record the outcome of applying one proposal.

    With a truthy ``error`` only ``apply_error`` is written (cut to 500
    characters) and ``applied_at`` is left untouched. Otherwise
    ``applied_at`` is set to ``NOW()`` and ``apply_error`` is cleared.
    """
    if error:
        statement = """UPDATE cauldron_consolidate_proposals
                       SET apply_error=%s
                       WHERE id=%s"""
        values = (error[:500], proposal_id)
    else:
        statement = """UPDATE cauldron_consolidate_proposals
                       SET applied_at=NOW(), apply_error=NULL
                       WHERE id=%s"""
        values = (proposal_id,)
    with self.conn() as conn, conn.cursor() as cursor:
        cursor.execute(statement, values)
|
||||
|
||||
def fail_stuck_consolidate_jobs(self, *, stale_minutes: int = 15) -> int:
    """Mark stalled jobs as failed and return the cursor's ``rowcount``.

    A job is stalled when it is 'running' or 'applying' and its
    ``last_progress_at`` is older than ``stale_minutes`` minutes. An
    existing ``last_error`` is kept; otherwise a recovery note is stored.
    """
    statement = """UPDATE cauldron_consolidate_jobs
                   SET state='failed',
                       finished_at=NOW(),
                       last_error=COALESCE(last_error,
                                           'recovery: worker exited mid-run')
                   WHERE state IN ('running','applying')
                     AND last_progress_at < NOW() - INTERVAL %s MINUTE"""
    with self.conn() as conn, conn.cursor() as cursor:
        cursor.execute(statement, (stale_minutes,))
        return cursor.rowcount
|
||||
|
||||
# --- sterilize stuck-job recovery (already used at boot) ---------------
|
||||
|
||||
def fail_stuck_sterilize_jobs(self, *, stale_minutes: int = 10) -> int:
|
||||
"""Recover jobs stuck in 'running'/'applying' with no progress for
|
||||
N minutes. Called at app startup. Returns count of jobs failed."""
|
||||
|
|
|
|||
|
|
@ -215,6 +215,65 @@ class Forge:
|
|||
)
|
||||
|
||||
|
||||
def cluster_decision(
    self, foods: list[dict], *, model: str | None = None
) -> dict:
    """Ask the model whether a cluster of similar-named foods are duplicates.

    Each food is reduced to ``id``, ``name``, ``plural_name`` and a flat
    list of alias names, embedded as JSON in the prompt, and sent through
    ``self.run`` (model ``"sonnet"`` unless ``model`` is given, 60 second
    timeout). The reply is normalized by ``_extract_cluster_decision``.

    The prompt requests this shape:
        {"merge": bool,
         "canonical_id": "<id>",            # the survivor
         "canonical_name": "<str>",         # the survivor's name
         "discard_ids": ["<id>", ...],      # rows to merge into the survivor
         "alias_additions": ["<name>", ...],
         "reason": "<one-line explanation>"}

    ``merge`` false marks the cluster as a false positive (look-alike but
    distinct foods, e.g. "olive oil" vs "olive").
    """
    items = []
    for food in foods:
        alias_names = []
        for alias in food.get("aliases") or []:
            alias_names.append(alias.get("name") if isinstance(alias, dict) else alias)
        items.append(
            {
                "id": food.get("id"),
                "name": food.get("name"),
                "plural_name": food.get("pluralName") or food.get("plural_name"),
                "aliases": alias_names,
            }
        )

    intro = (
        "You are deciding whether a cluster of food rows from a recipe "
        "database are duplicates that should be merged into one canonical "
        "row. The names came from years of recipe imports + manual entry "
        "so plural/case/wording variations are common.\n\n"
    )
    cluster_section = f"Cluster:\n{json.dumps(items, indent=2)}\n\n"
    output_spec = (
        "Output JSON ONLY, no prose: "
        '{"merge": true|false, '
        '"canonical_id": "<id of the survivor or empty>", '
        '"canonical_name": "<survivor name or empty>", '
        '"discard_ids": ["<id>", ...], '
        '"alias_additions": ["<name to add as alias on survivor>", ...], '
        '"reason": "<one-line reasoning>"}\n\n'
    )
    rules = (
        "Rules:\n"
        "- Pick the survivor whose name is the cleanest canonical "
        " (lowercase, singular when applicable, no brand, no clinical "
        " qualifiers like 'raw' or 'unenriched').\n"
        "- discard_ids are the OTHER cluster members — Mealie will rewrite "
        " recipe references to point at canonical_id.\n"
        "- alias_additions = the discarded NAMES (or any close variants you "
        " noticed in plural_name/aliases) that the survivor should adopt as "
        " aliases so the parser fuzzy-matches them in the future.\n"
        "- merge=false ONLY when the cluster is a false positive (e.g. "
        " 'olive oil' vs 'olive', 'butter' vs 'peanut butter'). In that "
        " case canonical_id and discard_ids must both be empty.\n"
        "- Be conservative — when in doubt, merge=false."
    )
    prompt = intro + cluster_section + output_spec + rules
    reply = self.run(prompt, model=model or "sonnet", timeout_secs=60)
    return _extract_cluster_decision(reply)
|
||||
|
||||
def fetch_food_info(self, name: str, *, model: str | None = None) -> dict:
|
||||
"""Ask Sonnet for density + unit class + common size of a single
|
||||
food. Returns a dict shaped like:
|
||||
|
|
@ -251,6 +310,37 @@ class Forge:
|
|||
return _extract_food_info(result)
|
||||
|
||||
|
||||
def _extract_cluster_decision(forge_result: dict) -> dict:
    """Normalize a forge reply into a cluster-decision dict.

    The payload is ``forge_result["result"]`` when present, otherwise
    ``forge_result`` itself; a string payload is parsed with
    ``_parse_json_blob``.

    Returns a dict with keys ``merge`` (bool), ``canonical_id`` (str),
    ``canonical_name`` (str), ``discard_ids`` (list[str]),
    ``alias_additions`` (list[str]) and ``reason`` (str, at most 500
    characters).

    Normalization rules:
    - A string ``merge`` counts as true only for "true", "yes" or "1"
      (case-insensitive); ``bool("false")`` would otherwise read as a
      request to merge.
    - ``discard_ids`` keeps str/int entries only (bools excluded), as
      strings, without blanks, duplicates or the canonical id itself.
    - ``discard_ids`` / ``alias_additions`` that are not lists are
      treated as empty rather than iterated character by character.
    - With ``merge`` false, ``canonical_id`` and ``discard_ids`` are
      emptied.

    Raises:
        ForgeError: if ``forge_result`` or the decoded payload is not a dict.
    """
    if not isinstance(forge_result, dict):
        raise ForgeError("forge result not a dict")
    inner = forge_result.get("result", forge_result)
    if isinstance(inner, str):
        inner = _parse_json_blob(inner)
    if not isinstance(inner, dict):
        raise ForgeError(f"cluster decision not a dict: {str(inner)[:200]}")

    raw_merge = inner.get("merge")
    if isinstance(raw_merge, str):
        merge = raw_merge.strip().lower() in ("true", "yes", "1")
    else:
        merge = bool(raw_merge)

    canonical_id = str(inner.get("canonical_id") or "")
    canonical_name = str(inner.get("canonical_name") or "")

    discard_raw = inner.get("discard_ids")
    if not isinstance(discard_raw, list):
        discard_raw = []
    discard_ids: list[str] = []
    for raw in discard_raw:
        # bool is a subclass of int; True/False are never valid ids.
        if isinstance(raw, bool) or not isinstance(raw, (str, int)):
            continue
        did = str(raw)
        if did and did != canonical_id and did not in discard_ids:
            discard_ids.append(did)

    aliases_raw = inner.get("alias_additions")
    if not isinstance(aliases_raw, list):
        aliases_raw = []
    alias_additions = [str(x) for x in aliases_raw if isinstance(x, str) and x.strip()]
    reason = str(inner.get("reason") or "")[:500]

    if not merge:
        canonical_id = ""
        discard_ids = []
    return {
        "merge": merge,
        "canonical_id": canonical_id,
        "canonical_name": canonical_name,
        "discard_ids": discard_ids,
        "alias_additions": alias_additions,
        "reason": reason,
    }
|
||||
|
||||
|
||||
def _extract_food_info(forge_result: dict) -> dict:
|
||||
"""Normalize clawdforge wrapper → food info dict. Defensive on shapes."""
|
||||
if not isinstance(forge_result, dict):
|
||||
|
|
|
|||
|
|
@ -116,6 +116,15 @@ class Mealie:
|
|||
body["pluralName"] = plural_name
|
||||
return self._post("/api/foods", body)
|
||||
|
||||
def update_food(self, food_id: str, body: dict) -> dict:
    """PUT ``body`` to ``/api/foods/{food_id}`` and return the result of ``self._put``."""
    endpoint = f"/api/foods/{food_id}"
    return self._put(endpoint, body)
|
||||
|
||||
def merge_foods(self, *, from_id: str, to_id: str) -> dict:
    """PUT /api/foods/merge, asking Mealie to fold ``from_id`` into ``to_id``.

    The request body is ``{"fromFood": from_id, "toFood": to_id}``; the
    result of ``self._put`` is returned unchanged. According to the
    original author's note, Mealie repoints recipe ingredient references
    to ``to_id``, deletes ``from_id``, and enforces permissions per
    household — behavior of the remote API, not verifiable from here.
    """
    payload = {"fromFood": from_id, "toFood": to_id}
    return self._put("/api/foods/merge", payload)
|
||||
|
||||
def list_units(self, *, search: str | None = None, per_page: int = 200) -> dict:
    """GET /api/units with ``search`` (empty string when falsy) and ``perPage``."""
    term = search if search else ""
    return self._get("/api/units", search=term, perPage=per_page)
|
||||
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ from .config import load
|
|||
from .crypto import TokenCrypto
|
||||
from .db import DB
|
||||
from .forge import Forge, ForgeError
|
||||
from . import aggregator, bulk_sterilize, foods
|
||||
from . import aggregator, bulk_sterilize, consolidate_foods, foods
|
||||
from .mealie import Mealie, MealieError
|
||||
from .oidc import init_oauth
|
||||
from .recipe_index import flatten_recipe, refresh_household_index, search_index
|
||||
|
|
@ -111,6 +111,13 @@ def create_app() -> Flask:
|
|||
except Exception as e:
|
||||
app.logger.warning("sterilize stuck-job recovery failed: %s", e)
|
||||
|
||||
try:
|
||||
n_failed = db.fail_stuck_consolidate_jobs(stale_minutes=15)
|
||||
if n_failed:
|
||||
app.logger.info("failed %d stuck consolidate jobs at boot", n_failed)
|
||||
except Exception as e:
|
||||
app.logger.warning("consolidate stuck-job recovery failed: %s", e)
|
||||
|
||||
oauth = init_oauth(
|
||||
app,
|
||||
issuer=cfg.oidc_issuer,
|
||||
|
|
@ -1016,6 +1023,153 @@ def create_app() -> Flask:
|
|||
db.finalize_sterilize_job(job_id, state="cancelled")
|
||||
return jsonify({"ok": True})
|
||||
|
||||
# ---------- foods consolidator (Step 3) ------------------------------
|
||||
|
||||
@app.get("/consolidate")
@require_session
def consolidate_page():
    """Render the consolidate page with the household's latest job.

    Redirects to the Mealie connect page when the session has no household.
    """
    household = current_household_id()
    if household:
        return render_template(
            "consolidate.html",
            active="consolidate",
            latest_job=db.latest_consolidate_job_for_household(household),
        )
    return redirect(url_for("connect_mealie_get"))
|
||||
|
||||
@app.post("/api/foods/consolidate-start")
@require_session
def consolidate_start():
    """Create a consolidate job for the session's household and start its walk thread.

    Answers 409 when there is no household or when a job is already
    'running'/'applying'; redirects to the Mealie connect page when the
    user has no Mealie client.
    """
    user = session["user"]
    household = current_household_id()
    if not household:
        return jsonify({"error": "no household"}), 409

    in_flight = db.running_consolidate_job_for_household(household)
    if in_flight:
        conflict = {"error": "already_running", "job_id": in_flight["id"]}
        return jsonify(conflict), 409

    mealie_client = current_user_mealie()
    if mealie_client is None:
        return redirect(url_for("connect_mealie_get"))

    new_job = db.create_consolidate_job(
        household_id=household, started_by_sub=user["sub"]
    )
    consolidate_foods.spawn_walk_thread(
        db=db, job_id=new_job, mealie=mealie_client, forge=forge,
    )
    return jsonify({"ok": True, "job_id": new_job})
|
||||
|
||||
@app.get("/api/foods/consolidate-status")
@require_session
def consolidate_status():
    """Return the household's latest consolidate job as JSON (``{"job": null}`` when none)."""
    household = current_household_id()
    if not household:
        return jsonify({"error": "no household"}), 409
    latest = db.latest_consolidate_job_for_household(household)
    payload = _consolidate_job_payload(latest) if latest else None
    return jsonify({"job": payload})
|
||||
|
||||
@app.get("/api/foods/consolidate-jobs/<int:job_id>/proposals")
@require_session
def consolidate_proposals(job_id: int):
    """Return a job and its proposals as JSON.

    Answers 404 when the job does not exist or belongs to another
    household. ``cluster_json`` and ``sonnet_decision`` values that come
    back as strings are decoded; undecodable ones become null.
    """
    household = current_household_id()
    if not household:
        return jsonify({"error": "no household"}), 409
    job = db.get_consolidate_job(job_id)
    if not job or job["household_id"] != household:
        return jsonify({"error": "not_found"}), 404

    proposals = db.list_consolidate_proposals(job_id)
    for proposal in proposals:
        for field in ("cluster_json", "sonnet_decision"):
            raw = proposal.get(field)
            if not isinstance(raw, str):
                continue
            try:
                proposal[field] = _json_loads(raw)
            except Exception:
                proposal[field] = None

    return jsonify({
        "job": _consolidate_job_payload(job),
        "proposals": proposals,
    })
|
||||
|
||||
@app.post("/api/foods/consolidate-apply/<int:job_id>")
@require_session
def consolidate_apply(job_id: int):
    """Store the reviewer's approvals and start the apply thread.

    Only jobs of the caller's household in state 'review' are accepted.
    ``approved_ids`` from the JSON body is reduced to entries that are
    int/str and whose string form is all digits.
    """
    household = current_household_id()
    if not household:
        return jsonify({"error": "no household"}), 409
    job = db.get_consolidate_job(job_id)
    if not job or job["household_id"] != household:
        return jsonify({"error": "not_found"}), 404
    if job["state"] != "review":
        return jsonify({"error": f"bad_state:{job['state']}"}), 409

    payload = request.get_json(silent=True) or {}
    approved_ids = []
    for candidate in payload.get("approved_ids") or []:
        if isinstance(candidate, (int, str)) and str(candidate).isdigit():
            approved_ids.append(int(candidate))

    mealie_client = current_user_mealie()
    if mealie_client is None:
        return redirect(url_for("connect_mealie_get"))

    db.bulk_set_consolidate_approvals(job_id, approved_ids)
    db.finalize_consolidate_job(job_id, state="applying")
    consolidate_foods.spawn_apply_thread(db=db, job_id=job_id, mealie=mealie_client)
    return jsonify({"ok": True, "approved_count": len(approved_ids)})
|
||||
|
||||
@app.post("/api/foods/consolidate-cancel/<int:job_id>")
@require_session
def consolidate_cancel(job_id: int):
    """Mark one of the household's own consolidate jobs as cancelled.

    Only jobs in the ``running``, ``review`` or ``applying`` state can be
    cancelled; any other state yields HTTP 409, and an unknown or foreign
    job yields 404.
    """
    household = current_household_id()
    if not household:
        return jsonify({"error": "no household"}), 409

    job = db.get_consolidate_job(job_id)
    if not (job and job["household_id"] == household):
        return jsonify({"error": "not_found"}), 404

    if job["state"] in ("running", "review", "applying"):
        db.finalize_consolidate_job(job_id, state="cancelled")
        return jsonify({"ok": True})
    return jsonify({"error": f"bad_state:{job['state']}"}), 409
|
||||
|
||||
# admin variants for kayos kick-off
|
||||
@app.post("/api/admin/foods/consolidate-start")
@require_bearer
def admin_consolidate_start():
    """Start a consolidate walk on behalf of the user named in the body.

    Expects a JSON object with a non-empty string ``started_by_sub``. The
    user's stored Mealie token blob is decrypted and used to build the
    ``Mealie`` client for the walk thread.

    Responds with ``{"ok": True, "job_id": ...}`` on success; 400 when
    ``started_by_sub`` is missing or not a string, 404 when the user has no
    household, 409 when a job is already running or the user has no stored
    Mealie token, and 500 when the token cannot be decrypted.
    """
    # Guard the shape of the payload: a non-object body or a non-string sub
    # must produce the 400 below rather than an AttributeError.
    body = request.get_json(silent=True)
    raw_sub = body.get("started_by_sub") if isinstance(body, dict) else None
    sub = raw_sub.strip() if isinstance(raw_sub, str) else ""
    if not sub:
        return jsonify({"error": "started_by_sub required"}), 400
    hid = db.get_user_household_id(sub)
    if not hid:
        return jsonify({"error": "user has no household"}), 404
    active = db.running_consolidate_job_for_household(hid)
    if active:
        return jsonify({"error": "already_running", "job_id": active["id"]}), 409
    blob = db.get_user_mealie_token_blob(sub)
    if not blob:
        return jsonify({"error": "user_not_connected_to_mealie"}), 409
    try:
        tok = crypto.decrypt(blob)
    except Exception:
        return jsonify({"error": "user_token_undecryptable"}), 500
    mealie = Mealie(base_url=cfg.mealie_api_url, api_token=tok)
    job_id = db.create_consolidate_job(household_id=hid, started_by_sub=sub)
    consolidate_foods.spawn_walk_thread(db=db, job_id=job_id, mealie=mealie, forge=forge)
    return jsonify({"ok": True, "job_id": job_id})
|
||||
|
||||
@app.get("/api/admin/foods/consolidate-jobs/<int:job_id>")
@require_bearer
def admin_consolidate_status(job_id: int):
    """Return the payload of any consolidate job by id, or 404 if absent."""
    job = db.get_consolidate_job(job_id)
    if job:
        return jsonify({"job": _consolidate_job_payload(job)})
    return jsonify({"error": "not_found"}), 404
|
||||
|
||||
@app.post("/api/admin/foods/consolidate-cancel/<int:job_id>")
@require_bearer
def admin_consolidate_cancel(job_id: int):
    """Mark any consolidate job as cancelled, with no household check.

    Responds with 404 for an unknown job and 409 when the job is not in the
    ``running``, ``review`` or ``applying`` state.
    """
    job = db.get_consolidate_job(job_id)
    if not job:
        return jsonify({"error": "not_found"}), 404

    if job["state"] in ("running", "review", "applying"):
        db.finalize_consolidate_job(job_id, state="cancelled")
        return jsonify({"ok": True})
    return jsonify({"error": f"bad_state:{job['state']}"}), 409
|
||||
|
||||
# ---------- admin sterilizer (bearer-auth, kick off on user's behalf) -
|
||||
|
||||
@app.post("/api/admin/sterilize/bulk-start")
|
||||
|
|
@ -1244,6 +1398,17 @@ def _job_payload(job: dict) -> dict:
|
|||
return j
|
||||
|
||||
|
||||
def _consolidate_job_payload(job: dict) -> dict:
|
||||
"""Same shape as _job_payload — kept separate for clarity since the
|
||||
consolidate job has different counter columns than sterilize."""
|
||||
j = dict(job)
|
||||
for k in ("started_at", "last_progress_at", "finished_at"):
|
||||
v = j.get(k)
|
||||
if v is not None and hasattr(v, "isoformat"):
|
||||
j[k] = v.isoformat()
|
||||
return j
|
||||
|
||||
|
||||
def _plan_payload(plan: dict) -> dict:
|
||||
"""JSON-serializable view of a plan dict (datetimes → iso strings)."""
|
||||
p = dict(plan)
|
||||
|
|
|
|||
343
cauldron/templates/consolidate.html
Normal file
343
cauldron/templates/consolidate.html
Normal file
|
|
@ -0,0 +1,343 @@
|
|||
{% extends "_base.html" %}
|
||||
{% block title %}Consolidate · Cauldron{% endblock %}
|
||||
{% block content %}
|
||||
|
||||
<style>
|
||||
.progress-rail { width:100%; height:14px; background:var(--bg-2);
|
||||
border:1px solid var(--line); border-radius:8px; overflow:hidden;
|
||||
margin:12px 0 6px 0; }
|
||||
.progress-fill { height:100%;
|
||||
background:linear-gradient(90deg, var(--purple-deep), var(--purple-bright));
|
||||
transition:width .3s ease; box-shadow:0 0 12px -2px var(--purple-glow); }
|
||||
.progress-meta { color:var(--bone-dim); font-family:var(--mono); font-size:12px;
|
||||
letter-spacing:.1em; display:flex; gap:18px; flex-wrap:wrap; }
|
||||
.progress-meta strong { color:var(--bone); }
|
||||
|
||||
.review-bar { position:sticky; top:70px; z-index:5; display:flex;
|
||||
align-items:center; justify-content:space-between; flex-wrap:wrap; gap:12px;
|
||||
padding:12px 14px; background:var(--bg-2); border:1px solid var(--line);
|
||||
border-radius:8px; margin-bottom:14px; }
|
||||
.review-bar .left { display:flex; gap:14px; align-items:center; }
|
||||
|
||||
.cluster-card { background:var(--surface); border:1px solid var(--line);
|
||||
border-left:3px solid var(--purple-dim); border-radius:6px;
|
||||
padding:12px 14px; margin-bottom:10px; }
|
||||
.cluster-card.approved { border-left-color: var(--green-bright); }
|
||||
.cluster-card.rejected { border-left-color: var(--muted); opacity: .55; }
|
||||
.cluster-card.no-merge { border-left-color: var(--bone-dim); }
|
||||
.cluster-card.errored { border-left-color: var(--crit); }
|
||||
|
||||
.cluster-head { display:flex; align-items:center; justify-content:space-between;
|
||||
gap:10px; flex-wrap:wrap; }
|
||||
.cluster-name { color:var(--bone); font-family:var(--serif); font-size:1.05em;
|
||||
line-height:1.2; }
|
||||
.cluster-meta { color:var(--muted); font-family:var(--mono); font-size:11px;
|
||||
letter-spacing:.15em; text-transform:uppercase; }
|
||||
.toggle { display:inline-flex; align-items:center; gap:6px; padding:4px 10px;
|
||||
border-radius:999px; border:1px solid var(--line); background:var(--bg-2);
|
||||
color:var(--bone-dim); font-family:var(--mono); font-size:11px;
|
||||
letter-spacing:.1em; cursor:pointer; min-height:28px; }
|
||||
.toggle.on { background:rgba(110,168,72,.18); border-color:var(--green-dim);
|
||||
color:var(--green-bright); }
|
||||
.toggle.off { background:rgba(232,96,106,.12);
|
||||
border-color:rgba(232,96,106,.3); color:var(--crit); }
|
||||
.toggle:disabled { opacity:.4; cursor:not-allowed; }
|
||||
|
||||
.members-list { display:flex; gap:6px; flex-wrap:wrap; margin:8px 0; }
|
||||
.member-chip { display:inline-flex; align-items:center; gap:6px; padding:4px 8px;
|
||||
background:var(--bg-2); border:1px solid var(--line); border-radius:4px;
|
||||
font-family:var(--mono); font-size:12px; color:var(--bone-dim); }
|
||||
.member-chip.canonical {
|
||||
background:rgba(155,95,232,.15); border-color:var(--purple-dim);
|
||||
color:var(--purple-bright); }
|
||||
.member-chip.canonical::before { content:"★ "; color:var(--purple-bright); }
|
||||
.member-chip.discard { color:var(--muted); }
|
||||
.member-chip.discard::before { content:"× "; color:var(--crit); }
|
||||
|
||||
.reason { color:var(--bone-dim); font-style:italic; font-size:.9em;
|
||||
margin-top:6px; line-height:1.3; }
|
||||
.alias-add { color:var(--green-bright); font-size:.85em; font-family:var(--mono);
|
||||
margin-top:4px; }
|
||||
|
||||
.empty-state { padding:28px 14px; text-align:center; color:var(--bone-dim); }
|
||||
</style>
|
||||
|
||||
<div class="page-head">
|
||||
<div class="crumb">// consolidate · merge duplicate foods in mealie</div>
|
||||
<h1>foods <span class="accent">consolidate</span></h1>
|
||||
<div class="lede">
|
||||
scan your mealie household foods, cluster lookalikes by similarity,
|
||||
let sonnet pick the survivor + the rest become aliases on it. mealie
|
||||
rewrites every recipe ingredient to point at the canonical row.
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<section class="panel" id="consolidate-shell">
|
||||
<div class="panel-head">
|
||||
<h2>state</h2>
|
||||
<span class="pill" id="state-pill">loading…</span>
|
||||
<span class="ctx" id="state-ctx"></span>
|
||||
</div>
|
||||
|
||||
<div id="empty-pane" style="display:none;">
|
||||
<p>no run yet. kick one off?</p>
|
||||
<button class="btn btn-purple" id="start-btn" type="button" onclick="startRun()">🪄 scan + cluster mealie foods</button>
|
||||
<p class="muted" style="margin-top:8px;">
|
||||
walks your household catalog, clusters by name similarity, asks sonnet for the survivor of each cluster.
|
||||
first run might take 5-15 min depending on cluster count.
|
||||
</p>
|
||||
</div>
|
||||
|
||||
<div id="progress-pane" style="display:none;">
|
||||
<div class="progress-rail"><div class="progress-fill" id="bar" style="width:0%;"></div></div>
|
||||
<div class="progress-meta">
|
||||
<span><strong id="processed">0</strong> processed</span>
|
||||
<span><strong id="merged">0</strong> merged</span>
|
||||
<span><strong id="errors">0</strong> errors</span>
|
||||
<span>of <strong id="total">?</strong> clusters</span>
|
||||
<span class="muted" id="current-cluster"></span>
|
||||
</div>
|
||||
<div class="btn-row" style="margin-top:12px;">
|
||||
<button class="btn" type="button" onclick="cancelJob()">cancel</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div id="review-pane" style="display:none;">
|
||||
<div class="review-bar">
|
||||
<div class="left">
|
||||
<span><strong id="approved-count">0</strong> selected</span>
|
||||
<span class="muted" id="review-meta"></span>
|
||||
</div>
|
||||
<div class="right">
|
||||
<button class="btn" type="button" onclick="setAll(true)">approve all merges</button>
|
||||
<button class="btn" type="button" onclick="setAll(false)">clear</button>
|
||||
<button class="btn btn-purple" type="button" id="apply-btn" onclick="applyApproved()">apply selected →</button>
|
||||
</div>
|
||||
</div>
|
||||
<div id="proposals-grid"></div>
|
||||
</div>
|
||||
|
||||
<div id="done-pane" style="display:none;">
|
||||
<p id="done-line"></p>
|
||||
<button class="btn btn-purple" type="button" onclick="startRun()">↻ start new run</button>
|
||||
</div>
|
||||
|
||||
<div id="failed-pane" style="display:none;">
|
||||
<p style="color:var(--crit);" id="failed-line"></p>
|
||||
<button class="btn btn-purple" type="button" onclick="startRun()">↻ retry</button>
|
||||
</div>
|
||||
</section>
|
||||
|
||||
<script>
|
||||
let job = {{ (latest_job | tojson) if latest_job else 'null' }};
|
||||
let pollTimer = null;
|
||||
let proposals = [];
|
||||
// Shorthand for document.getElementById.
function $(id) {
  return document.getElementById(id);
}
|
||||
|
||||
// Show the pane called `${name}-pane` and hide the other four.
function showPane(name) {
  ['empty', 'progress', 'review', 'done', 'failed'].forEach((pane) => {
    const visible = pane === name;
    $(`${pane}-pane`).style.display = visible ? '' : 'none';
  });
}
|
||||
// Update the state pill's label and modifier class (defaults to 'pill-mute').
function setStatePill(text, klass) {
  const pill = $('state-pill');
  const modifier = klass || 'pill-mute';
  pill.textContent = text;
  pill.className = 'pill ' + modifier;
}
|
||||
// Copy the current job's counters into the progress bar and its meta row.
function paint() {
  if (!job) return;

  const processed = job.processed_count || 0;
  const errored = job.error_count || 0;
  const total = job.total_clusters || 0;

  // Errored clusters count as finished work for the bar's width.
  let pct = 0;
  if (total > 0) {
    pct = Math.round(((processed + errored) / total) * 100);
  }

  $('bar').style.width = pct + '%';
  $('processed').textContent = processed;
  $('merged').textContent = job.merged_count || 0;
  $('errors').textContent = errored;
  $('total').textContent = total || '?';
  $('current-cluster').textContent = job.current_cluster ? `· ${job.current_cluster}` : '';
}
|
||||
// Fetch the household's latest job from the status endpoint and re-route
// the UI. Failures are logged and leave the last known job state in place.
async function fetchJob() {
  try {
    const r = await fetch('/api/foods/consolidate-status');
    // An error response carries no `job` key; without this check a
    // transient failure would be read as "no job" and drop the UI back to
    // the empty pane in the middle of a run.
    if (!r.ok) throw new Error(`status ${r.status}`);
    const d = await r.json();
    job = d.job || null;
    route();
  } catch (e) {
    console.error('status poll failed', e);
  }
}
|
||||
// Pick the pane, the state pill and the polling behaviour that match the
// current job's state. An unrecognised state only updates the "started" line.
function route() {
  if (!job) {
    stopPoll();
    setStatePill('idle', 'pill-mute');
    $('state-ctx').textContent = '';
    showPane('empty');
    return;
  }

  $('state-ctx').textContent = `started ${new Date(job.started_at).toLocaleString()}`;

  switch (job.state) {
    case 'running':
      setStatePill('walking', 'pill-ok');
      paint();
      showPane('progress');
      startPoll();
      break;
    case 'review':
      setStatePill('review', 'pill-ok');
      paint();
      showPane('review');
      stopPoll();
      loadProposals();
      break;
    case 'applying':
      setStatePill('applying', 'pill-ok');
      paint();
      showPane('progress');
      startPoll();
      break;
    case 'done': {
      setStatePill('done', 'pill-mute');
      const m = job.merged_count || 0;
      $('done-line').textContent = `merged ${m} cluster${m===1?'':'s'} into canonical foods.`;
      showPane('done');
      stopPoll();
      break;
    }
    case 'failed':
      setStatePill('failed', 'pill-mute');
      $('failed-line').textContent = job.last_error || 'job failed';
      showPane('failed');
      stopPoll();
      break;
    case 'cancelled':
      setStatePill('cancelled', 'pill-mute');
      $('done-line').textContent = 'job cancelled.';
      showPane('done');
      stopPoll();
      break;
  }
}
|
||||
// Start polling the status endpoint every 2 s unless a timer already exists.
function startPoll() {
  if (!pollTimer) {
    pollTimer = setInterval(fetchJob, 2000);
  }
}
|
||||
// Stop the status poll if one is active.
function stopPoll() {
  if (!pollTimer) return;
  clearInterval(pollTimer);
  pollTimer = null;
}
|
||||
|
||||
// POST to the start endpoint, then refresh the job state. On failure the
// error is shown in an alert and the start button (if present) is re-enabled.
async function startRun() {
  const startButton = $('start-btn');
  if (startButton) {
    startButton.disabled = true;
    startButton.textContent = 'kicking off…';
  }

  try {
    const resp = await fetch('/api/foods/consolidate-start', { method: 'POST' });
    if (!resp.ok) {
      // Error bodies are JSON when the server produced them; fall back to
      // the bare status code otherwise.
      const payload = await resp.json().catch(() => ({}));
      throw new Error(payload.error || resp.status);
    }
    await fetchJob();
  } catch (err) {
    alert('start failed: ' + err.message);
    if (startButton) {
      startButton.disabled = false;
      startButton.textContent = '🪄 scan + cluster mealie foods';
    }
  }
}
|
||||
// Ask the server to cancel the current job after a confirm() prompt, then
// refresh the job state. A rejected cancel (e.g. 409 bad_state because the
// job already finished) is reported instead of being silently ignored.
async function cancelJob() {
  if (!job) return;
  if (!confirm('cancel this run?')) return;
  try {
    const r = await fetch('/api/foods/consolidate-cancel/' + job.id, { method: 'POST' });
    if (!r.ok) {
      const j = await r.json().catch(() => ({}));
      throw new Error(j.error || r.status);
    }
  } catch (e) {
    alert('cancel failed: ' + e.message);
  }
  // Refresh either way: after a rejected cancel the server's view of the
  // job is what the page should show. fetchJob logs its own failures.
  await fetchJob();
}
|
||||
// Load the current job's proposals into `proposals` and render the review
// grid. Proposals with no stored approval default to approved when sonnet
// recommended a merge and no apply error is recorded.
async function loadProposals() {
  if (!job) return;
  try {
    const r = await fetch('/api/foods/consolidate-jobs/' + job.id + '/proposals');
    // Without this check an error response (no `proposals` key) would be
    // rendered as "no clusters found — your foods table is already clean".
    if (!r.ok) {
      const j = await r.json().catch(() => ({}));
      throw new Error(j.error || r.status);
    }
    const d = await r.json();
    proposals = d.proposals || [];
    for (const p of proposals) {
      const dec = p.sonnet_decision || {};
      if (p.approved === null || p.approved === undefined) {
        // Default: approve mergeable clusters with no apply error
        p.approved = !!dec.merge && !p.apply_error;
      }
    }
    renderProposals();
  } catch (e) {
    console.error('proposals load failed', e);
    // Drop any proposals left over from an earlier load so their ids can
    // never be submitted against this job, and say what happened.
    proposals = [];
    const grid = $('proposals-grid');
    grid.innerHTML = '';
    const msg = document.createElement('div');
    msg.className = 'empty-state';
    msg.textContent = 'could not load proposals — reload the page to retry.';
    grid.appendChild(msg);
    $('approved-count').textContent = 0;
    $('apply-btn').disabled = true;
  }
}
|
||||
// Rebuild the review grid from `proposals` and refresh the selected count,
// the summary line and the apply button's enabled state.
function renderProposals() {
  const grid = $('proposals-grid');
  grid.innerHTML = '';

  if (proposals.length === 0) {
    grid.innerHTML = '<div class="empty-state">no clusters found — your foods table is already clean (or close to it).</div>';
    $('apply-btn').disabled = true;
    return;
  }

  proposals.forEach((p) => grid.appendChild(renderOne(p)));

  const approvedCount = proposals.filter((p) => p.approved).length;
  $('approved-count').textContent = approvedCount;

  const total = proposals.length;
  const noMerge = proposals.filter((p) => !(p.sonnet_decision || {}).merge).length;
  $('review-meta').textContent = `· ${total} clusters, ${noMerge} marked no-merge`;
  $('apply-btn').disabled = approvedCount === 0;
}
|
||||
// Build the review card for one proposal: header (name, summary, toggle),
// member chips, then optional alias, reason and apply-error lines.
function renderOne(p) {
  // Create an element; the class is only set when one is given so that
  // class-less elements stay free of a class attribute.
  const make = (tag, className) => {
    const node = document.createElement(tag);
    if (className) node.className = className;
    return node;
  };

  const dec = p.sonnet_decision || {};
  const cluster = p.cluster_json || [];

  // Card colour: error beats no-merge, which beats approved/rejected.
  const card = make('div', 'cluster-card');
  let stateClass = 'rejected';
  if (p.apply_error) stateClass = 'errored';
  else if (!dec.merge) stateClass = 'no-merge';
  else if (p.approved) stateClass = 'approved';
  card.classList.add(stateClass);

  // Header: title and summary on the left, merge/skip toggle on the right.
  const nm = make('div', 'cluster-name');
  nm.textContent = dec.canonical_name || `cluster of ${cluster.length}`;
  const meta = make('div', 'cluster-meta');
  meta.textContent = dec.merge ? `merge ${cluster.length} → 1` : `keep separate (${cluster.length} foods)`;
  const left = make('div');
  left.appendChild(nm);
  left.appendChild(meta);

  const tog = make('button', 'toggle ' + (p.approved ? 'on' : 'off'));
  tog.type = 'button';
  if (p.approved) tog.textContent = 'merge';
  else tog.textContent = dec.merge ? 'skip' : 'keep separate';
  tog.disabled = !dec.merge;
  tog.onclick = () => flip(p, card, tog);

  const head = make('div', 'cluster-head');
  head.appendChild(left);
  head.appendChild(tog);
  card.appendChild(head);

  // One chip per cluster member, marked as canonical or discard.
  const list = make('div', 'members-list');
  const canonId = dec.canonical_id;
  const discardSet = new Set(dec.discard_ids || []);
  cluster.forEach((m) => {
    const chip = make('span', 'member-chip');
    if (m.id === canonId && dec.merge) chip.classList.add('canonical');
    else if (discardSet.has(m.id)) chip.classList.add('discard');
    const plural = m.pluralName && m.pluralName !== m.name ? ` / ${m.pluralName}` : '';
    chip.textContent = m.name + plural;
    list.appendChild(chip);
  });
  card.appendChild(list);

  if (dec.alias_additions && dec.alias_additions.length) {
    const al = make('div', 'alias-add');
    al.textContent = `+ aliases: ${dec.alias_additions.join(', ')}`;
    card.appendChild(al);
  }

  if (dec.reason) {
    const why = make('div', 'reason');
    why.textContent = dec.reason;
    card.appendChild(why);
  }

  if (p.apply_error) {
    const err = make('div', 'reason');
    err.style.color = 'var(--crit)';
    err.textContent = 'apply error: ' + p.apply_error;
    card.appendChild(err);
  }

  return card;
}
|
||||
// Toggle one mergeable proposal between approved and skipped, updating its
// card, its toggle and the review bar in place. No-merge proposals are inert.
function flip(p, card, tog) {
  if (!(p.sonnet_decision || {}).merge) return;

  const nowApproved = !p.approved;
  p.approved = nowApproved;

  card.classList.toggle('approved', nowApproved);
  card.classList.toggle('rejected', !nowApproved);
  tog.classList.toggle('on', nowApproved);
  tog.classList.toggle('off', !nowApproved);
  tog.textContent = nowApproved ? 'merge' : 'skip';

  const selected = proposals.filter((x) => x.approved).length;
  $('approved-count').textContent = selected;
  $('apply-btn').disabled = selected === 0;
}
|
||||
// Approve (or clear) every proposal that sonnet marked as a merge, then
// re-render the grid. No-merge proposals are left as they are.
function setAll(on) {
  proposals.forEach((p) => {
    if ((p.sonnet_decision || {}).merge) {
      p.approved = on;
    }
  });
  renderProposals();
}
|
||||
// Submit the approved, mergeable proposal ids to the apply endpoint after a
// confirm() prompt, then refresh the job state. On failure the error is
// shown in an alert and the button is re-enabled.
async function applyApproved() {
  const ids = proposals.filter(p => p.approved && (p.sonnet_decision||{}).merge).map(p => p.id);
  if (!ids.length) return;
  if (!confirm(`merge ${ids.length} cluster${ids.length===1?'':'s'} in mealie? this rewrites recipe references.`)) return;

  const btn = $('apply-btn');
  const idleLabel = 'apply selected →';
  btn.disabled = true;
  btn.textContent = 'applying…';
  try {
    const r = await fetch('/api/foods/consolidate-apply/' + job.id, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ approved_ids: ids }),
    });
    if (!r.ok) {
      const j = await r.json().catch(() => ({}));
      throw new Error(j.error || r.status);
    }
    // Restore the label now that the request was accepted. Nothing else
    // resets it, so a later review in the same page session would otherwise
    // still read "applying…". The button stays disabled until the grid is
    // rendered again.
    btn.textContent = idleLabel;
    await fetchJob();
  } catch (e) {
    alert('apply failed: ' + e.message);
    btn.disabled = false;
    btn.textContent = idleLabel;
  }
}
|
||||
|
||||
route();
|
||||
if(job && (job.state === 'running' || job.state === 'applying')) startPoll();
|
||||
</script>
|
||||
|
||||
{% endblock %}
|
||||
|
|
@ -57,6 +57,8 @@
|
|||
</div>
|
||||
<p class="muted">mealie's parser is per-recipe; this kicks off a bulk pass over your whole library. review proposals, apply the good ones.</p>
|
||||
<p><a class="btn" href="/sterilize">🪄 bulk sterilize recipes →</a></p>
|
||||
<p class="muted" style="margin-top:14px;">scan your foods table for dupes, ask sonnet to pick canonicals, merge in mealie. one-time cleanup; aliases get attached to the survivors so the parser fuzzy-matches variants from now on.</p>
|
||||
<p><a class="btn" href="/consolidate">🔮 consolidate foods table →</a></p>
|
||||
</section>
|
||||
{% endif %}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue