v0.3 step 6: bulk sterilizer — automate mealie's per-recipe Parse toil

Mealie's "Parse" button on every recipe is per-recipe; clicking through
226 of them is the toil Cobb explicitly hates. The bulk sterilizer wraps
cauldron's existing per-recipe Sterilizer (clawdforge → Sonnet, parses
free-form ingredient strings into structured form) into a household-wide
job with progress tracking, batch human review, and one-shot apply.

Flow:
  1. POST /api/sterilize/bulk-start — creates cauldron_sterilize_jobs row,
     spawns a daemon thread that walks every recipe in the user's
     household. Recipes whose ingredients all already have food.id are
     skipped (no point re-parsing Cobb's manual cleanup). For each
     recipe needing work, Sonnet returns a structured proposal that gets
     persisted to cauldron_sterilize_proposals.
  2. GET /api/sterilize/bulk-status — polled every 2s by the UI for
     {state, processed_count, skipped_count, error_count, current_slug}.
  3. After the walk completes, state moves to 'review'. UI loads
     /api/sterilize/bulk-jobs/<id>/proposals and renders one card per
     recipe with a was→becomes diff per ingredient. User toggles
     approve/skip per recipe.
  4. POST /api/sterilize/bulk-apply/<id> with {approved_slugs: [...]}.
     A second daemon thread iterates approved proposals, calls
     Sterilizer.apply_recipe (which resolves food/unit IDs in Mealie,
     creating any missing rows, then PUT /api/recipes/<slug>).

Job state machine: running → review → applying → done (or 'cancelled' /
'failed' along the way). At app startup, fail_stuck_sterilize_jobs()
recovers any 'running' / 'applying' rows older than 10 min with no
progress — covers the case where the daemon thread's gunicorn worker
died mid-job. New job state lives in DB; thread is just a runner.

Concurrency: the start endpoint blocks if the household already has a
running/applying job. Sterilize calls cost clawdforge time and parallel
jobs would race on Mealie writes, so one-at-a-time per household is the
right ceiling.

UI lives at /sterilize. Linked from /me → "tools → bulk sterilize" since
it's a one-off admin action, not part of the daily flow. Mobile-aware,
diff cards expand on click. Disabled "apply" button when no recipes
are selected; preview-error rows can't be approved.

DB: two new tables (migrations 015+016). cauldron_sterilize_jobs tracks
overall job state with last_progress_at for the stuck-job recovery;
cauldron_sterilize_proposals holds per-recipe JSON proposals with the
approval flag and the final apply_at/apply_error.
This commit is contained in:
Kayos 2026-04-29 22:26:10 -07:00
parent d649b99aef
commit 9368b64a81
5 changed files with 1101 additions and 1 deletions

220
cauldron/bulk_sterilize.py Normal file
View file

@ -0,0 +1,220 @@
"""Background runner for bulk-sterilize jobs.
Architecture: a daemon thread per job. The thread holds a Mealie client
built from the starting user's decrypted token (passed in at start), walks
all recipes in the user's household, and for each one calls
Sterilizer.preview_recipe to get a structured proposal. Proposals are
written to cauldron_sterilize_proposals as we go; the job row tracks
overall progress.
Failure modes:
- clawdforge times out / errors record `preview_error` on the proposal
row, increment `error_count`, keep going
- whole-job crash daemon thread dies; row stays in 'running' forever
until DB.fail_stuck_sterilize_jobs() runs at next app boot
The thread does NOT apply changes. After the walk completes the job
moves to state='review'; user reviews proposals in the UI and POSTs
/api/sterilize/bulk-apply/<id> to actually write back to Mealie.
"""
import json
import logging
import threading
from typing import Optional
from .db import DB
from .forge import ForgeError
from .mealie import Mealie, MealieError
from .sterilizer import Sterilizer
log = logging.getLogger(__name__)
def _ingredient_needs_sterilizing(ing: dict) -> bool:
"""Heuristic: an ingredient row needs work if it has display/note content
but no resolved food. Already-parsed rows (food.id present) are skipped
so we don't waste clawdforge calls or risk regressing Cobb's manual
cleanup."""
food = ing.get("food") or {}
food_name = food.get("name") if isinstance(food, dict) else None
if food_name:
return False
has_content = bool(ing.get("display") or ing.get("note") or ing.get("originalText"))
return has_content
def _recipe_needs_sterilizing(recipe: dict) -> bool:
ings = recipe.get("recipeIngredient") or []
if not ings:
return False
return any(_ingredient_needs_sterilizing(i) for i in ings)
def run_bulk_preview(
*,
db: DB,
job_id: int,
sterilizer: Sterilizer,
) -> None:
"""Walk all recipes; persist a proposal row per recipe that needs work.
Skip already-clean recipes. Move job state on completion."""
log.info("[bulk-sterilize:%s] starting walk", job_id)
try:
# Pull every recipe slug (paginated). Mealie's listing returns
# items with slug + name; we resolve full recipes one at a time.
slugs: list[tuple[str, str]] = []
page = 1
while True:
resp = sterilizer.mealie.list_recipes(page=page, per_page=100)
items = resp.get("items") or []
for r in items:
slug = r.get("slug")
name = r.get("name") or slug or ""
if slug:
slugs.append((slug, name))
total_pages = resp.get("total_pages") or resp.get("totalPages") or 1
if page >= total_pages:
break
page += 1
# The job row was created with the recipe count from the caller's
# initial Mealie page-1 fetch. If we discovered more, update.
with db.conn() as c, c.cursor() as cur:
cur.execute(
"UPDATE cauldron_sterilize_jobs SET total_recipes=%s WHERE id=%s",
(len(slugs), job_id),
)
for slug, name in slugs:
try:
# Cheap pre-check: skip if every ingredient is already parsed
recipe = sterilizer.mealie.get_recipe(slug)
except MealieError as e:
log.warning("[bulk-sterilize:%s] mealie get_recipe(%s): %s", job_id, slug, e)
db.update_sterilize_job_progress(
job_id,
error_delta=1,
current_slug=slug,
last_error=str(e)[:500],
)
db.insert_sterilize_proposal(
job_id=job_id,
recipe_slug=slug,
recipe_name=name,
ingredient_count=0,
proposal_json=None,
preview_error=str(e)[:500],
)
continue
if not _recipe_needs_sterilizing(recipe):
db.update_sterilize_job_progress(
job_id, skipped_delta=1, current_slug=slug
)
continue
db.update_sterilize_job_progress(job_id, current_slug=slug)
try:
proposal = sterilizer.preview_recipe(slug)
db.insert_sterilize_proposal(
job_id=job_id,
recipe_slug=slug,
recipe_name=proposal.get("name") or name,
ingredient_count=proposal.get("ingredient_count")
or len(proposal.get("proposals") or []),
proposal_json=json.dumps(proposal, ensure_ascii=False),
preview_error=None,
)
db.update_sterilize_job_progress(job_id, processed_delta=1)
except (ForgeError, RuntimeError, MealieError) as e:
msg = str(e)[:500]
log.warning("[bulk-sterilize:%s] preview(%s): %s", job_id, slug, msg)
db.insert_sterilize_proposal(
job_id=job_id,
recipe_slug=slug,
recipe_name=name,
ingredient_count=0,
proposal_json=None,
preview_error=msg,
)
db.update_sterilize_job_progress(
job_id, error_delta=1, current_slug=slug, last_error=msg
)
db.finalize_sterilize_job(job_id, state="review")
log.info("[bulk-sterilize:%s] walk complete; awaiting review", job_id)
except Exception:
log.exception("[bulk-sterilize:%s] unhandled crash", job_id)
try:
db.finalize_sterilize_job(job_id, state="failed")
except Exception:
log.exception("[bulk-sterilize:%s] could not mark failed", job_id)
def run_bulk_apply(
*,
db: DB,
job_id: int,
sterilizer: Sterilizer,
) -> None:
"""Apply approved proposals back to Mealie. Job must be in state='applying'
(caller transitions it). Each recipe runs through Sterilizer.apply_recipe;
any per-recipe failure is recorded but doesn't stop the loop."""
log.info("[bulk-sterilize:%s] starting apply", job_id)
try:
approved = db.list_approved_unapplied_proposals(job_id)
for row in approved:
slug = row["recipe_slug"]
try:
db.update_sterilize_job_progress(job_id, current_slug=slug)
sterilizer.apply_recipe(slug, create_missing=True)
db.mark_proposal_applied(job_id, slug)
except (ForgeError, RuntimeError, MealieError) as e:
msg = str(e)[:500]
log.warning("[bulk-sterilize:%s] apply(%s): %s", job_id, slug, msg)
db.mark_proposal_applied(job_id, slug, error=msg)
db.update_sterilize_job_progress(
job_id, error_delta=1, current_slug=slug, last_error=msg
)
db.finalize_sterilize_job(job_id, state="done")
log.info("[bulk-sterilize:%s] apply complete", job_id)
except Exception:
log.exception("[bulk-sterilize:%s] apply unhandled crash", job_id)
try:
db.finalize_sterilize_job(job_id, state="failed")
except Exception:
log.exception("[bulk-sterilize:%s] could not mark failed", job_id)
def spawn_preview_thread(
*,
db: DB,
job_id: int,
sterilizer: Sterilizer,
name: Optional[str] = None,
) -> threading.Thread:
t = threading.Thread(
target=run_bulk_preview,
kwargs={"db": db, "job_id": job_id, "sterilizer": sterilizer},
name=name or f"bulk-sterilize-preview-{job_id}",
daemon=True,
)
t.start()
return t
def spawn_apply_thread(
*,
db: DB,
job_id: int,
sterilizer: Sterilizer,
name: Optional[str] = None,
) -> threading.Thread:
t = threading.Thread(
target=run_bulk_apply,
kwargs={"db": db, "job_id": job_id, "sterilizer": sterilizer},
name=name or f"bulk-sterilize-apply-{job_id}",
daemon=True,
)
t.start()
return t

View file

@ -228,6 +228,56 @@ MIGRATIONS = [
FOREIGN KEY (authentik_sub) REFERENCES cauldron_users(authentik_sub) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 015 — bulk sterilizer jobs. One row per bulk-parse run kicked off by a
# household member. State machine:
# running → walking recipes, calling Sterilizer.preview_recipe
# review → walk done, awaiting human approve/reject in /sterilize UI
# applying → applying approved proposals back to Mealie
# done → complete
# failed → unrecoverable error or stuck-job recovery marked it
"""
CREATE TABLE IF NOT EXISTS cauldron_sterilize_jobs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
household_id BIGINT NOT NULL,
started_by_sub VARCHAR(190) NOT NULL,
started_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_progress_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
finished_at DATETIME,
total_recipes INT NOT NULL DEFAULT 0,
processed_count INT NOT NULL DEFAULT 0,
skipped_count INT NOT NULL DEFAULT 0,
error_count INT NOT NULL DEFAULT 0,
current_slug VARCHAR(255),
last_error VARCHAR(500),
state ENUM('running','review','applying','done','failed','cancelled')
NOT NULL DEFAULT 'running',
INDEX idx_household_state (household_id, state),
INDEX idx_started_by (started_by_sub),
FOREIGN KEY (household_id) REFERENCES cauldron_households(id) ON DELETE CASCADE,
FOREIGN KEY (started_by_sub) REFERENCES cauldron_users(authentik_sub) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
# 016 — per-recipe proposals for a sterilize job. proposal_json is the
# full Sterilizer.preview_recipe() output. approved=NULL means awaiting
# review; TRUE = will apply; FALSE = skipped on apply. apply_error
# captures any failure when applying that specific recipe.
"""
CREATE TABLE IF NOT EXISTS cauldron_sterilize_proposals (
job_id BIGINT NOT NULL,
recipe_slug VARCHAR(255) NOT NULL,
recipe_name VARCHAR(500),
ingredient_count INT NOT NULL DEFAULT 0,
proposal_json JSON,
preview_error VARCHAR(500),
approved BOOLEAN,
applied_at DATETIME,
apply_error VARCHAR(500),
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (job_id, recipe_slug),
INDEX idx_approved (job_id, approved),
FOREIGN KEY (job_id) REFERENCES cauldron_sterilize_jobs(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""",
]
@ -853,3 +903,202 @@ class DB:
""",
(sub, intent, duration_ms, model, prompt_chars, result_chars, ok, (error or "")[:500] or None),
)
# --- bulk sterilizer ----------------------------------------------------
def create_sterilize_job(
self, *, household_id: int, started_by_sub: str, total: int
) -> int:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""
INSERT INTO cauldron_sterilize_jobs
(household_id, started_by_sub, total_recipes, state)
VALUES (%s, %s, %s, 'running')
""",
(household_id, started_by_sub, total),
)
return cur.lastrowid
def get_sterilize_job(self, job_id: int) -> dict | None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"SELECT * FROM cauldron_sterilize_jobs WHERE id=%s", (job_id,)
)
return cur.fetchone()
def latest_sterilize_job_for_household(self, household_id: int) -> dict | None:
"""Most recent job (by started_at) for the household — used by the
UI to figure out what to render on /sterilize."""
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT * FROM cauldron_sterilize_jobs
WHERE household_id=%s
ORDER BY started_at DESC
LIMIT 1""",
(household_id,),
)
return cur.fetchone()
def running_sterilize_job_for_household(self, household_id: int) -> dict | None:
"""Active (running or applying) job; used to gate concurrent starts."""
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT * FROM cauldron_sterilize_jobs
WHERE household_id=%s AND state IN ('running','applying')
ORDER BY started_at DESC
LIMIT 1""",
(household_id,),
)
return cur.fetchone()
def update_sterilize_job_progress(
self,
job_id: int,
*,
processed_delta: int = 0,
skipped_delta: int = 0,
error_delta: int = 0,
current_slug: str | None = None,
last_error: str | None = None,
) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""
UPDATE cauldron_sterilize_jobs
SET processed_count = processed_count + %s,
skipped_count = skipped_count + %s,
error_count = error_count + %s,
current_slug = %s,
last_error = COALESCE(%s, last_error),
last_progress_at = NOW()
WHERE id=%s
""",
(processed_delta, skipped_delta, error_delta,
current_slug, last_error, job_id),
)
def finalize_sterilize_job(self, job_id: int, *, state: str) -> None:
"""Move job to a terminal state (review/done/failed/cancelled)."""
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""
UPDATE cauldron_sterilize_jobs
SET state=%s,
finished_at = CASE WHEN %s IN ('done','failed','cancelled')
THEN NOW() ELSE finished_at END,
last_progress_at = NOW(),
current_slug = NULL
WHERE id=%s
""",
(state, state, job_id),
)
def insert_sterilize_proposal(
self,
*,
job_id: int,
recipe_slug: str,
recipe_name: str | None,
ingredient_count: int,
proposal_json: str | None,
preview_error: str | None,
) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""
INSERT INTO cauldron_sterilize_proposals
(job_id, recipe_slug, recipe_name, ingredient_count,
proposal_json, preview_error)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
recipe_name=VALUES(recipe_name),
ingredient_count=VALUES(ingredient_count),
proposal_json=VALUES(proposal_json),
preview_error=VALUES(preview_error)
""",
(job_id, recipe_slug, (recipe_name or "")[:500],
ingredient_count, proposal_json, (preview_error or "")[:500] or None),
)
def list_sterilize_proposals(self, job_id: int) -> list[dict]:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT recipe_slug, recipe_name, ingredient_count,
proposal_json, preview_error,
approved, applied_at, apply_error
FROM cauldron_sterilize_proposals
WHERE job_id=%s
ORDER BY recipe_name ASC""",
(job_id,),
)
return [dict(r) for r in cur.fetchall()]
def set_proposal_approval(
self, job_id: int, recipe_slug: str, approved: bool
) -> bool:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""UPDATE cauldron_sterilize_proposals
SET approved=%s
WHERE job_id=%s AND recipe_slug=%s""",
(1 if approved else 0, job_id, recipe_slug),
)
return cur.rowcount > 0
def bulk_set_proposal_approvals(
self, job_id: int, approved_slugs: list[str]
) -> None:
"""Set approved=TRUE for the listed slugs, FALSE for everything else
on the job. Idempotent safe to call before /bulk-apply."""
with self.conn() as c, c.cursor() as cur:
# Reset everything to FALSE first (NULL stays NULL only on rows
# that aren't part of this job, which is none)
cur.execute(
"UPDATE cauldron_sterilize_proposals SET approved=0 WHERE job_id=%s",
(job_id,),
)
if approved_slugs:
placeholders = ",".join(["%s"] * len(approved_slugs))
cur.execute(
f"""UPDATE cauldron_sterilize_proposals SET approved=1
WHERE job_id=%s AND recipe_slug IN ({placeholders})""",
(job_id, *approved_slugs),
)
def mark_proposal_applied(
self, job_id: int, recipe_slug: str, *, error: str | None = None
) -> None:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""UPDATE cauldron_sterilize_proposals
SET applied_at=NOW(), apply_error=%s
WHERE job_id=%s AND recipe_slug=%s""",
((error or "")[:500] or None, job_id, recipe_slug),
)
def list_approved_unapplied_proposals(self, job_id: int) -> list[dict]:
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""SELECT recipe_slug, recipe_name
FROM cauldron_sterilize_proposals
WHERE job_id=%s AND approved=1 AND applied_at IS NULL""",
(job_id,),
)
return [dict(r) for r in cur.fetchall()]
def fail_stuck_sterilize_jobs(self, *, stale_minutes: int = 10) -> int:
"""Recover jobs stuck in 'running'/'applying' with no progress for
N minutes. Called at app startup. Returns count of jobs failed."""
with self.conn() as c, c.cursor() as cur:
cur.execute(
"""UPDATE cauldron_sterilize_jobs
SET state='failed',
finished_at=NOW(),
last_error=COALESCE(last_error,
'recovery: worker exited mid-run')
WHERE state IN ('running','applying')
AND last_progress_at < NOW() - INTERVAL %s MINUTE""",
(stale_minutes,),
)
return cur.rowcount

View file

@ -30,7 +30,7 @@ from .config import load
from .crypto import TokenCrypto
from .db import DB
from .forge import Forge, ForgeError
from . import aggregator, foods
from . import aggregator, bulk_sterilize, foods
from .mealie import Mealie, MealieError
from .oidc import init_oauth
from .recipe_index import flatten_recipe, refresh_household_index, search_index
@ -74,6 +74,16 @@ def create_app() -> Flask:
except Exception as e:
app.logger.warning("foods seed load failed: %s", e)
# Recover sterilize jobs whose worker died mid-run. A new run should
# produce no false positives: gunicorn-sync workers reconnect cleanly,
# the threshold is conservative (10 minutes of zero progress).
try:
n_failed = db.fail_stuck_sterilize_jobs(stale_minutes=10)
if n_failed:
app.logger.info("failed %d stuck sterilize jobs at boot", n_failed)
except Exception as e:
app.logger.warning("sterilize stuck-job recovery failed: %s", e)
oauth = init_oauth(
app,
issuer=cfg.oidc_issuer,
@ -814,6 +824,142 @@ def create_app() -> Flask:
active="recipes",
)
# ---------- bulk sterilizer (Phase A1) ------------------------------
def _user_sterilizer() -> Sterilizer | None:
"""Build a Sterilizer bound to the current session user's Mealie
token. Returns None if the user hasn't connected Mealie yet."""
client = current_user_mealie()
if not client:
return None
return Sterilizer(mealie=client, forge=forge, model=cfg.default_model)
@app.get("/sterilize")
@require_session
def sterilize_page():
hid = current_household_id()
if not hid:
return redirect(url_for("connect_mealie_get"))
latest = db.latest_sterilize_job_for_household(hid)
return render_template(
"sterilize.html",
active="sterilize",
latest_job=latest,
)
@app.post("/api/sterilize/bulk-start")
@require_session
def sterilize_bulk_start():
u = session["user"]
hid = current_household_id()
if not hid:
return jsonify({"error": "no household"}), 409
# Block concurrent jobs per household — sterilize calls are billed
# against clawdforge time, and parallel jobs would race on writes
# to the same Mealie recipes.
active = db.running_sterilize_job_for_household(hid)
if active:
return jsonify({"error": "already_running", "job_id": active["id"]}), 409
sterilizer = _user_sterilizer()
if sterilizer is None:
return redirect(url_for("connect_mealie_get"))
# Get an upper-bound recipe count for the progress UI. The thread
# will refine this with the true total once it's walked all pages.
try:
page1 = sterilizer.mealie.list_recipes(page=1, per_page=1)
except MealieError as e:
return jsonify({"error": "mealie_unreachable", "detail": str(e)}), 502
total = int(page1.get("total") or page1.get("totalItems") or 0)
job_id = db.create_sterilize_job(
household_id=hid, started_by_sub=u["sub"], total=total
)
bulk_sterilize.spawn_preview_thread(
db=db, job_id=job_id, sterilizer=sterilizer
)
return jsonify({"ok": True, "job_id": job_id, "total": total})
@app.get("/api/sterilize/bulk-status")
@require_session
def sterilize_bulk_status():
hid = current_household_id()
if not hid:
return jsonify({"error": "no household"}), 409
job = db.latest_sterilize_job_for_household(hid)
if not job:
return jsonify({"job": None})
return jsonify({"job": _job_payload(job)})
@app.get("/api/sterilize/bulk-jobs/<int:job_id>/proposals")
@require_session
def sterilize_bulk_proposals(job_id: int):
hid = current_household_id()
if not hid:
return jsonify({"error": "no household"}), 409
job = db.get_sterilize_job(job_id)
if not job or job["household_id"] != hid:
return jsonify({"error": "not_found"}), 404
proposals = db.list_sterilize_proposals(job_id)
# Inflate proposal_json strings → dicts for the client
for p in proposals:
blob = p.get("proposal_json")
if isinstance(blob, str):
try:
p["proposal_json"] = _json_loads(blob)
except Exception:
p["proposal_json"] = None
return jsonify({
"job": _job_payload(job),
"proposals": proposals,
})
@app.post("/api/sterilize/bulk-apply/<int:job_id>")
@require_session
def sterilize_bulk_apply(job_id: int):
u = session["user"]
hid = current_household_id()
if not hid:
return jsonify({"error": "no household"}), 409
job = db.get_sterilize_job(job_id)
if not job or job["household_id"] != hid:
return jsonify({"error": "not_found"}), 404
if job["state"] not in ("review",):
return jsonify({"error": f"bad_state:{job['state']}"}), 409
body = request.get_json(silent=True) or {}
approved_slugs = body.get("approved_slugs") or []
if not isinstance(approved_slugs, list):
return jsonify({"error": "approved_slugs must be a list"}), 400
approved_slugs = [str(s) for s in approved_slugs if isinstance(s, str)]
sterilizer = _user_sterilizer()
if sterilizer is None:
return redirect(url_for("connect_mealie_get"))
db.bulk_set_proposal_approvals(job_id, approved_slugs)
db.finalize_sterilize_job(job_id, state="applying")
bulk_sterilize.spawn_apply_thread(
db=db, job_id=job_id, sterilizer=sterilizer
)
return jsonify({"ok": True, "job_id": job_id, "approved_count": len(approved_slugs)})
@app.post("/api/sterilize/bulk-cancel/<int:job_id>")
@require_session
def sterilize_bulk_cancel(job_id: int):
hid = current_household_id()
if not hid:
return jsonify({"error": "no household"}), 409
job = db.get_sterilize_job(job_id)
if not job or job["household_id"] != hid:
return jsonify({"error": "not_found"}), 404
if job["state"] not in ("running", "review", "applying"):
return jsonify({"error": f"bad_state:{job['state']}"}), 409
db.finalize_sterilize_job(job_id, state="cancelled")
return jsonify({"ok": True})
# ---------- v0.1 admin endpoints (carry over) ------------------------
@app.get("/api/recipes")
@ -964,6 +1110,16 @@ def _resolve_sub_displays(db, plan: dict) -> dict[str, str]:
return out
def _job_payload(job: dict) -> dict:
"""JSON-serializable view of a sterilize job row (datetimes → iso)."""
j = dict(job)
for k in ("started_at", "last_progress_at", "finished_at"):
v = j.get(k)
if v is not None and hasattr(v, "isoformat"):
j[k] = v.isoformat()
return j
def _plan_payload(plan: dict) -> dict:
"""JSON-serializable view of a plan dict (datetimes → iso strings)."""
p = dict(plan)

View file

@ -49,6 +49,17 @@
{% endif %}
</section>
{% if connected %}
<section class="panel">
<div class="panel-head">
<h2>tools</h2>
<span class="ctx">household admin</span>
</div>
<p class="muted">mealie's parser is per-recipe; this kicks off a bulk pass over your whole library. review proposals, apply the good ones.</p>
<p><a class="btn" href="/sterilize">🪄 bulk sterilize recipes →</a></p>
</section>
{% endif %}
<hr>
<form method="post" action="/logout">
<button class="btn" type="submit">Sign out</button>

View file

@ -0,0 +1,464 @@
{% extends "_base.html" %}
{% block title %}Sterilize · Cauldron{% endblock %}
{% block content %}
<style>
.progress-rail {
width: 100%; height: 14px;
background: var(--bg-2);
border: 1px solid var(--line);
border-radius: 8px;
overflow: hidden;
margin: 12px 0 6px 0;
}
.progress-fill {
height: 100%;
background: linear-gradient(90deg, var(--purple-deep), var(--purple-bright));
transition: width .3s ease;
box-shadow: 0 0 12px -2px var(--purple-glow);
}
.progress-meta {
color: var(--bone-dim);
font-family: var(--mono); font-size: 12px;
letter-spacing: .1em;
display: flex; gap: 18px; flex-wrap: wrap;
}
.progress-meta strong { color: var(--bone); }
.review-bar {
position: sticky; top: 70px; z-index: 5;
display: flex; align-items: center; justify-content: space-between;
flex-wrap: wrap; gap: 12px;
padding: 12px 14px;
background: var(--bg-2);
border: 1px solid var(--line);
border-radius: 8px;
margin-bottom: 14px;
}
.review-bar .left { display: flex; gap: 14px; align-items: center; }
.review-bar .right { display: flex; gap: 8px; }
.proposals-grid { display: grid; grid-template-columns: 1fr; gap: 10px; }
.proposal-card {
background: var(--surface);
border: 1px solid var(--line);
border-left: 3px solid var(--purple-dim);
border-radius: 6px;
padding: 12px 14px;
}
.proposal-card.rejected { border-left-color: var(--muted); opacity: .55; }
.proposal-card.approved { border-left-color: var(--green-bright); }
.proposal-card.errored { border-left-color: var(--crit); }
.proposal-head {
display: flex; align-items: center; justify-content: space-between;
gap: 10px; flex-wrap: wrap;
cursor: pointer; user-select: none;
}
.proposal-name {
color: var(--bone); font-family: var(--serif);
font-size: 1.05em; line-height: 1.2;
}
.proposal-meta {
color: var(--muted); font-family: var(--mono);
font-size: 11px; letter-spacing: .15em; text-transform: uppercase;
}
.toggle {
display: inline-flex; align-items: center; gap: 6px;
padding: 4px 10px; border-radius: 999px;
border: 1px solid var(--line);
background: var(--bg-2);
color: var(--bone-dim);
font-family: var(--mono); font-size: 11px; letter-spacing: .1em;
cursor: pointer; min-height: 28px;
}
.toggle.on { background: rgba(110,168,72,.18); border-color: var(--green-dim); color: var(--green-bright); }
.toggle.off { background: rgba(232,96,106,.12); border-color: rgba(232,96,106,.3); color: var(--crit); }
.diff-table {
width: 100%; border-collapse: collapse;
margin-top: 10px; font-size: 13px;
display: none;
}
.proposal-card.expanded .diff-table { display: table; }
.diff-table th {
text-align: left; color: var(--purple); font-family: var(--mono);
font-size: 10px; letter-spacing: .15em; text-transform: uppercase;
padding: 4px 8px; border-bottom: 1px solid var(--line);
}
.diff-table td {
padding: 4px 8px; vertical-align: top;
border-bottom: 1px solid var(--line-soft);
color: var(--bone-dim);
}
.diff-table .orig { color: var(--bone-dim); font-style: italic; }
.diff-table .new { color: var(--bone); }
.pill-error { background: rgba(232,96,106,.15); color: var(--crit); }
.empty-state {
padding: 28px 14px; text-align: center;
color: var(--bone-dim);
}
</style>
<div class="page-head">
<div class="crumb">// sterilize · bulk recipe parser</div>
<h1>bulk <span class="accent">sterilize</span></h1>
<div class="lede">
walk every recipe in the household, send the unparsed ingredients to
sonnet, review the proposals, apply the good ones. mealie's food
table gets cleaner; the shopping list math gets sharper.
</div>
</div>
<section class="panel" id="sterilize-shell">
<div class="panel-head">
<h2>state</h2>
<span class="pill" id="state-pill">loading…</span>
<span class="ctx" id="state-ctx"></span>
</div>
<div id="empty-pane" style="display: none;">
<p>nothing's running. kick off a bulk parse?</p>
<button class="btn btn-purple" id="start-btn" type="button" onclick="startBulk()">🪄 start bulk sterilize</button>
<p class="muted" style="margin-top: 8px;">
cost: ~5s/recipe via clawdforge. your 226-recipe sweep takes ~15-20 min.
</p>
</div>
<div id="progress-pane" style="display: none;">
<div class="progress-rail"><div class="progress-fill" id="bar" style="width: 0%;"></div></div>
<div class="progress-meta">
<span><strong id="processed">0</strong> processed</span>
<span><strong id="skipped">0</strong> already-clean</span>
<span><strong id="errors">0</strong> errors</span>
<span>of <strong id="total">?</strong></span>
<span class="muted" id="current-slug"></span>
</div>
<div class="btn-row" style="margin-top: 12px;">
<button class="btn" type="button" onclick="cancelJob()">cancel</button>
</div>
</div>
<div id="review-pane" style="display: none;">
<div class="review-bar">
<div class="left">
<span><strong id="approved-count">0</strong> selected</span>
<span class="muted" id="review-meta"></span>
</div>
<div class="right">
<button class="btn" type="button" onclick="setAll(true)">select all</button>
<button class="btn" type="button" onclick="setAll(false)">clear</button>
<button class="btn btn-purple" type="button" id="apply-btn" onclick="applyApproved()">apply selected →</button>
</div>
</div>
<div class="proposals-grid" id="proposals-grid"></div>
</div>
<div id="done-pane" style="display: none;">
<p id="done-line"></p>
<button class="btn btn-purple" type="button" onclick="startBulk()">↻ start a new run</button>
</div>
<div id="failed-pane" style="display: none;">
<p style="color: var(--crit);" id="failed-line"></p>
<button class="btn btn-purple" type="button" onclick="startBulk()">↻ retry</button>
</div>
</section>
<script>
let job = {{ (latest_job | tojson) if latest_job else 'null' }};
let pollTimer = null;
let proposals = [];
function $(id) { return document.getElementById(id); }
function showPane(name) {
for (const p of ['empty', 'progress', 'review', 'done', 'failed']) {
$(`${p}-pane`).style.display = (p === name) ? '' : 'none';
}
}
function setStatePill(text, klass) {
const el = $('state-pill');
el.textContent = text;
el.className = 'pill ' + (klass || 'pill-mute');
}
function paintProgress() {
if (!job) return;
const total = job.total_recipes || 0;
const done = (job.processed_count || 0) + (job.skipped_count || 0) + (job.error_count || 0);
const pct = total > 0 ? Math.round((done / total) * 100) : 0;
$('bar').style.width = pct + '%';
$('processed').textContent = job.processed_count || 0;
$('skipped').textContent = job.skipped_count || 0;
$('errors').textContent = job.error_count || 0;
$('total').textContent = total || '?';
$('current-slug').textContent = job.current_slug ? `· ${job.current_slug}` : '';
}
async function fetchJob() {
try {
const r = await fetch('/api/sterilize/bulk-status');
const data = await r.json();
job = data.job || null;
route();
} catch (e) {
console.error('status poll failed', e);
}
}
function route() {
if (!job) {
stopPolling();
setStatePill('idle', 'pill-mute');
$('state-ctx').textContent = '';
showPane('empty');
return;
}
const state = job.state;
$('state-ctx').textContent = `started ${new Date(job.started_at).toLocaleString()}`;
if (state === 'running') {
setStatePill('walking', 'pill-ok');
paintProgress();
showPane('progress');
startPolling();
} else if (state === 'review') {
setStatePill('review', 'pill-ok');
paintProgress();
showPane('review');
stopPolling();
loadProposals();
} else if (state === 'applying') {
setStatePill('applying', 'pill-ok');
paintProgress();
showPane('progress');
startPolling();
} else if (state === 'done') {
setStatePill('done', 'pill-mute');
const total = job.processed_count || 0;
$('done-line').textContent = `applied ${total} recipe${total === 1 ? '' : 's'}. mealie should look cleaner.`;
showPane('done');
stopPolling();
} else if (state === 'failed') {
setStatePill('failed', 'pill-mute');
$('failed-line').textContent = job.last_error || 'job failed.';
showPane('failed');
stopPolling();
} else if (state === 'cancelled') {
setStatePill('cancelled', 'pill-mute');
$('done-line').textContent = 'job cancelled.';
showPane('done');
stopPolling();
}
}
function startPolling() {
if (pollTimer) return;
pollTimer = setInterval(fetchJob, 2000);
}
function stopPolling() {
if (pollTimer) { clearInterval(pollTimer); pollTimer = null; }
}
async function startBulk() {
const btn = $('start-btn');
if (btn) { btn.disabled = true; btn.textContent = 'kicking off…'; }
try {
const r = await fetch('/api/sterilize/bulk-start', { method: 'POST' });
if (!r.ok) {
const j = await r.json().catch(() => ({}));
throw new Error(j.error || r.status);
}
await fetchJob();
} catch (e) {
alert('start failed: ' + e.message);
if (btn) { btn.disabled = false; btn.textContent = '🪄 start bulk sterilize'; }
}
}
async function cancelJob() {
if (!job) return;
if (!confirm('cancel this bulk run?')) return;
try {
await fetch('/api/sterilize/bulk-cancel/' + job.id, { method: 'POST' });
await fetchJob();
} catch (e) {
alert('cancel failed: ' + e.message);
}
}
async function loadProposals() {
if (!job) return;
try {
const r = await fetch('/api/sterilize/bulk-jobs/' + job.id + '/proposals');
const data = await r.json();
proposals = data.proposals || [];
// default: approve everything that has no preview_error
for (const p of proposals) {
if (p.approved === null || p.approved === undefined) {
p.approved = !p.preview_error;
}
}
renderProposals();
} catch (e) {
console.error('proposals load failed', e);
}
}
function renderProposals() {
const grid = $('proposals-grid');
grid.innerHTML = '';
let approvedCount = 0;
if (!proposals.length) {
grid.innerHTML = '<div class="empty-state">no proposals — every recipe was already clean.</div>';
$('apply-btn').disabled = true;
$('approved-count').textContent = 0;
$('review-meta').textContent = '';
return;
}
for (const p of proposals) {
if (p.approved) approvedCount++;
grid.appendChild(renderOne(p));
}
$('approved-count').textContent = approvedCount;
const total = proposals.filter(p => !p.preview_error).length;
$('review-meta').textContent = `· ${total} parseable, ${proposals.length - total} errored`;
$('apply-btn').disabled = approvedCount === 0;
}
function renderOne(p) {
const card = document.createElement('div');
card.className = 'proposal-card';
if (p.preview_error) card.classList.add('errored');
else if (p.approved) card.classList.add('approved');
else card.classList.add('rejected');
const head = document.createElement('div');
head.className = 'proposal-head';
const left = document.createElement('div');
left.style.flex = '1';
const nm = document.createElement('div');
nm.className = 'proposal-name';
nm.textContent = p.recipe_name || p.recipe_slug;
const meta = document.createElement('div');
meta.className = 'proposal-meta';
if (p.preview_error) {
meta.innerHTML = '<span class="pill pill-error">parse failed</span> ' + escapeHtml(p.preview_error);
} else {
meta.textContent = `${p.ingredient_count} ingredient${p.ingredient_count === 1 ? '' : 's'}` +
(p.applied_at ? ` · applied ${new Date(p.applied_at).toLocaleString()}` : '');
}
left.appendChild(nm);
left.appendChild(meta);
const toggle = document.createElement('button');
toggle.type = 'button';
toggle.className = 'toggle ' + (p.approved ? 'on' : 'off');
toggle.textContent = p.approved ? 'approved' : 'skip';
toggle.disabled = !!p.preview_error;
toggle.onclick = (e) => { e.stopPropagation(); flipApproval(p, card, toggle); };
head.appendChild(left);
head.appendChild(toggle);
head.onclick = () => { card.classList.toggle('expanded'); };
card.appendChild(head);
if (!p.preview_error && p.proposal_json && p.proposal_json.proposals) {
const tbl = document.createElement('table');
tbl.className = 'diff-table';
tbl.innerHTML = `
<thead><tr>
<th>was</th><th></th><th>becomes</th>
</tr></thead>
<tbody>
${p.proposal_json.proposals.map(rowProposal).join('')}
</tbody>`;
card.appendChild(tbl);
}
return card;
}
function rowProposal(rp) {
const pa = rp.parsed || {};
const wasParts = [];
if (rp.original_quantity !== null && rp.original_quantity !== undefined) wasParts.push(rp.original_quantity);
if (rp.original_unit_name) wasParts.push(rp.original_unit_name);
if (rp.original_food_name) wasParts.push(rp.original_food_name);
if (rp.original_note) wasParts.push(`(${rp.original_note})`);
const wasStr = wasParts.length ? wasParts.join(' ') : (rp.original_display || '—');
const newParts = [];
if (pa.quantity !== null && pa.quantity !== undefined) newParts.push(pa.quantity);
if (pa.unit) newParts.push(pa.unit);
if (pa.food) newParts.push(pa.food);
if (pa.note) newParts.push(`(${pa.note})`);
const newStr = newParts.length ? newParts.join(' ') : '—';
return `<tr>
<td class="orig">${escapeHtml(wasStr)}</td>
<td></td>
<td class="new">${escapeHtml(newStr)}</td>
</tr>`;
}
function escapeHtml(s) {
return String(s).replace(/[&<>"']/g, m => ({
'&':'&amp;','<':'&lt;','>':'&gt;','"':'&quot;',"'":'&#39;'
}[m]));
}
function flipApproval(p, card, toggle) {
p.approved = !p.approved;
card.classList.toggle('approved', p.approved);
card.classList.toggle('rejected', !p.approved);
toggle.classList.toggle('on', p.approved);
toggle.classList.toggle('off', !p.approved);
toggle.textContent = p.approved ? 'approved' : 'skip';
const cnt = proposals.filter(x => x.approved).length;
$('approved-count').textContent = cnt;
$('apply-btn').disabled = cnt === 0;
}
function setAll(on) {
for (const p of proposals) {
if (p.preview_error) continue;
p.approved = on;
}
renderProposals();
}
async function applyApproved() {
const slugs = proposals.filter(p => p.approved && !p.preview_error).map(p => p.recipe_slug);
if (!slugs.length) return;
if (!confirm(`apply ${slugs.length} recipe${slugs.length === 1 ? '' : 's'} to mealie?`)) return;
const btn = $('apply-btn');
btn.disabled = true; btn.textContent = 'applying…';
try {
const r = await fetch('/api/sterilize/bulk-apply/' + job.id, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ approved_slugs: slugs }),
});
if (!r.ok) {
const j = await r.json().catch(() => ({}));
throw new Error(j.error || r.status);
}
await fetchJob();
} catch (e) {
alert('apply failed: ' + e.message);
btn.disabled = false; btn.textContent = 'apply selected →';
}
}
// Boot
route();
// If we landed on a page mid-run, start polling immediately
if (job && (job.state === 'running' || job.state === 'applying')) startPolling();
</script>
{% endblock %}