cauldron/cauldron/forge.py
Kayos 8752fcd340 plan: Flavor B — 🔮 forgotten gems pulls from Discover too
The suggestion panel now folds enriched-but-unimported Discover entries
into the same Hecate-driven pool as library recipes. Pinning a discover
suggestion auto-imports it to the household's Mealie library and adds
the resulting slug to cauldron_meal_picks.

- db: list_discover_eligible_for_group(mealie_group_id, limit=80) — joins
  cauldron_discovered_recipes against cauldron_discover_imports to find
  enriched rows no household in the caller's group has imported yet
- forge.suggest_recipes: accepts a per-entry `source` field; when any
  entry is source='discover' the prompt gains an explicit hint so Hecate
  treats discover entries as "something new to try" with a soft cap on
  proportion (~half max unless library exhausted)
- /api/plan/suggest: builds a unified pool with prefixed IDs (lib:<slug>
  vs disc:<id>) so library and discover entries can coexist in Sonnet's
  validation map; decorates each suggestion with kind+image+meta_summary+
  hecate_quip; discover entries also carry source_url and source_host
- /api/plan/suggest/pin: new dispatch on body.kind:
  - kind=library (default — back-compat with existing Flavor A callers):
    same as before
  - kind=discover: looks up the discover row, short-circuits to cached
    Mealie slug if THIS household already imported it, else
    mealie.import_from_url() + record_discover_import() + add to picks.
    Returns {kind, mealie_slug, imported, added_to_picks}
- plan.html: card render is kind-aware. Discover entries get a
  "📬 from Discover · <host>" footer instead of last-planned recall, an
  "import + pin" button label, and use data-* attributes for the click
  payload so the JS doesn't need to template-interpolate slugs into
  onclick handlers
2026-05-01 20:57:44 -07:00

1451 lines
71 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Thin HTTP client for clawdforge — we're a consumer."""
import json
import re
import requests
class ForgeError(RuntimeError):
pass
_DAYS = ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")
class Forge:
def __init__(self, *, base_url: str, token: str, default_model: str, default_timeout: int):
self.base_url = base_url.rstrip("/")
self.token = token
self.default_model = default_model
self.default_timeout = default_timeout
def _headers(self) -> dict:
return {"Authorization": f"Bearer {self.token}"}
def healthz(self) -> dict:
r = requests.get(f"{self.base_url}/healthz", headers=self._headers(), timeout=10)
r.raise_for_status()
return r.json()
def run(
self,
prompt: str,
*,
model: str | None = None,
system: str | None = None,
files: list[str] | None = None,
timeout_secs: int | None = None,
) -> dict:
"""POST /run. Returns parsed result dict on success.
Raises ForgeError on transport or upstream failure. The 'result' field
in the return is whatever clawdforge parsed out of `claude -p` — usually
a dict (when the prompt asked for JSON), occasionally a string.
"""
body = {"prompt": prompt, "model": model or self.default_model}
if system:
body["system"] = system
if files:
body["files"] = files
if timeout_secs:
body["timeout_secs"] = timeout_secs
# HTTP timeout = subprocess timeout + a 30s margin so we don't bail
# while clawdforge is still doing work for us.
http_timeout = (timeout_secs or self.default_timeout) + 30
try:
r = requests.post(
f"{self.base_url}/run",
headers=self._headers(),
json=body,
timeout=http_timeout,
)
except requests.RequestException as e:
raise ForgeError(f"transport: {e}") from e
if r.status_code >= 400:
raise ForgeError(f"upstream {r.status_code}: {r.text[:500]}")
return r.json()
def generate_plan(
self,
*,
picks: list[dict],
recipes: list[dict],
slots: int = 7,
week_start: str,
preference: str | None = None,
picker_profiles: dict | None = None,
daily_targets: dict | None = None,
exclusions: list | None = None,
meal_types: list | None = None,
model: str | None = None,
) -> list[dict]:
"""Ask Sonnet for a {slots}-day plan. Returns a list of slot dicts
shaped like:
{"day": "monday", "recipe_slug": "...", "recipe_name": "...",
"picker_subs": [...], "reason": "...", "source": "pick"|"mealie"}
Validates structure aggressively — wrong shape / wrong slot count /
slug-not-in-pool → ForgeError. Caller surfaces a 502 to the user.
recipes: [{slug, name, tags?}], picks: [{slug, name, picker_subs}].
Picks are the family's pinned recipes; the prompt mandates each one
appears exactly once when the pool is large enough.
"""
if slots < 1 or slots > 14:
raise ForgeError(f"bad slot count: {slots}")
if not recipes:
raise ForgeError("recipe pool empty — cannot generate")
# Build a slug → name map for validation. Use the recipe pool plus
# picks (picks should already be in the pool, but be defensive).
valid_by_slug: dict[str, str] = {}
for r in recipes:
slug = r.get("slug")
if slug:
valid_by_slug[slug] = r.get("name") or slug
for p in picks:
slug = p.get("slug")
if slug:
valid_by_slug.setdefault(slug, p.get("name") or slug)
# Default to dinner-only if not specified — back-compat with the
# original single-meal-per-day behavior.
meal_types_clean: list[str] = []
allowed_meals = {"breakfast", "lunch", "dinner", "snack", "dessert", "side"}
if isinstance(meal_types, list):
for m in meal_types:
if isinstance(m, str) and m.strip().lower() in allowed_meals:
if m.strip().lower() not in meal_types_clean:
meal_types_clean.append(m.strip().lower())
if not meal_types_clean:
meal_types_clean = ["dinner"]
# Stable order: breakfast → lunch → dinner → snack → dessert → side
meal_order = ["breakfast", "lunch", "dinner", "snack", "dessert", "side"]
meal_types_clean.sort(key=lambda m: meal_order.index(m))
expected_total = slots * len(meal_types_clean)
prompt = self._build_plan_prompt(
picks=picks, recipes=recipes, slots=slots, week_start=week_start,
preference=preference, picker_profiles=picker_profiles,
daily_targets=daily_targets, exclusions=exclusions,
meal_types=meal_types_clean,
)
result = self.run(prompt, model=model or "sonnet")
parsed, reading = _extract_plan_payload(result)
if not isinstance(parsed, list):
raise ForgeError("model output: 'slots' must be a list")
if len(parsed) != expected_total:
raise ForgeError(
f"model output: got {len(parsed)} slots, expected {expected_total} "
f"({slots} days × {len(meal_types_clean)} meals)"
)
# Pick attribution lookup keyed by slug → list[sub]
pick_subs_by_slug: dict[str, list[str]] = {}
for p in picks:
slug = p.get("slug")
if slug:
pick_subs_by_slug[slug] = list(p.get("picker_subs") or [])
out = []
seen: set[tuple[str, str]] = set() # (day, meal_type) uniqueness
for raw in parsed:
if not isinstance(raw, dict):
raise ForgeError("model output: each slot must be an object")
day = (raw.get("day") or "").strip().lower()
meal = (raw.get("meal_type") or "dinner").strip().lower()
slug = (raw.get("recipe_slug") or "").strip()
if day not in _DAYS:
raise ForgeError(f"model output: bad day '{day}'")
if meal not in meal_types_clean:
raise ForgeError(f"model output: unexpected meal_type '{meal}' (expected one of {meal_types_clean})")
key = (day, meal)
if key in seen:
raise ForgeError(f"model output: duplicate slot {day}/{meal}")
seen.add(key)
if not slug or slug not in valid_by_slug:
raise ForgeError(f"model output: unknown recipe_slug '{slug}'")
# Trust the model's picker_subs only if they intersect the real
# set. We have ground truth in pick_subs_by_slug — prefer it.
real_pickers = pick_subs_by_slug.get(slug, [])
model_pickers = raw.get("picker_subs") or []
if not isinstance(model_pickers, list):
model_pickers = []
picker_subs = real_pickers if real_pickers else [
s for s in model_pickers if isinstance(s, str)
]
source = "pick" if real_pickers else "mealie"
out.append({
"day": day,
"meal_type": meal,
"recipe_slug": slug,
"recipe_name": valid_by_slug[slug],
"picker_subs": picker_subs,
"reason": (raw.get("reason") or "")[:500],
"source": source,
})
return {"slots": out, "reading": reading}
@staticmethod
def _build_plan_prompt(*, picks, recipes, slots, week_start, preference=None,
picker_profiles=None, daily_targets=None,
exclusions=None, meal_types=None) -> str:
pool_lines = []
for r in recipes:
slug = r.get("slug") or ""
name = r.get("name") or slug
tags = r.get("tags") or []
meta = r.get("meta") or {}
extras: list[str] = []
# First 3 Mealie tags
if tags:
cleaned = []
for t in tags[:3]:
if isinstance(t, dict):
cleaned.append(t.get("name") or "")
elif isinstance(t, str):
cleaned.append(t)
cleaned = [c for c in cleaned if c]
if cleaned:
extras.append(", ".join(cleaned))
# Sonnet-generated meta — the actual high-signal stuff
if meta:
if meta.get("cuisine") and meta["cuisine"] not in ("unknown", "other"):
extras.append(meta["cuisine"])
if meta.get("complexity"):
extras.append(meta["complexity"])
em = meta.get("estimated_minutes")
if isinstance(em, int) and em > 0:
extras.append(f"{em}min")
if meta.get("primary_protein") and meta["primary_protein"] != "none":
extras.append(f"protein:{meta['primary_protein']}")
if meta.get("primary_carb") and meta["primary_carb"] != "none":
extras.append(f"carb:{meta['primary_carb']}")
if meta.get("veg_forward") and meta["veg_forward"] != "mixed":
extras.append(meta["veg_forward"])
meta_tags = meta.get("tags") or []
if meta_tags:
extras.append("/".join(meta_tags[:5]))
if meta.get("calories"):
extras.append(f"~{meta['calories']}cal")
if meta.get("protein_g"):
extras.append(f"protein={meta['protein_g']}g")
if meta.get("carbs_g"):
extras.append(f"carbs={meta['carbs_g']}g")
if meta.get("fat_g"):
extras.append(f"fat={meta['fat_g']}g")
if meta.get("fiber_g"):
extras.append(f"fiber={meta['fiber_g']}g")
if meta.get("sodium_mg"):
extras.append(f"sodium={meta['sodium_mg']}mg")
# v5: active/hands-off split lets the planner match a slot's
# available active time (e.g., "Wednesday I have 25 min hands-on")
am = meta.get("active_minutes")
ho = meta.get("hands_off_minutes")
if am is not None and ho is not None:
extras.append(f"active={am}m+offhand={ho}m")
elif am is not None:
extras.append(f"active={am}m")
# Equipment for "no oven this week" / "grill week" filters
eq = meta.get("equipment") or []
if eq:
extras.append("eq:" + ",".join(eq[:3]))
# Flavor variety so the planner doesn't run 5 spicy nights
fp = meta.get("flavor_profile") or []
if fp:
extras.append("flavor:" + "/".join(fp[:3]))
# Kid-fit signal for households with kids (Cobb has Leia + Luna)
kf = meta.get("kid_friendly_score")
if isinstance(kf, int) and kf > 0:
extras.append(f"kid={kf}")
# Cost — for budget-week filtering
cs = meta.get("cost_per_serving_estimate")
if cs:
extras.append(f"~${cs}/svg")
# Occasion fit — when is this dish best?
of = meta.get("occasion_fit") or []
if of:
extras.append("for:" + "/".join(of[:3]))
# Allergen flags — short-circuit list of "what's in this"
contains = meta.get("contains") or {}
if isinstance(contains, dict):
flags = [k for k, v in contains.items() if v]
if flags:
extras.append("has:" + ",".join(flags))
# Per-user fit score (NEW): the recipe pool entry now carries
# a `fit` dict computed in server.py from picker_profiles ×
# this recipe's meta. {sub: 1-5}. Render as "fit:cobb=5,abby=2"
fit = r.get("fit") or {}
if isinstance(fit, dict) and fit:
pieces = [f"{k}={v}" for k, v in list(fit.items())[:4]]
extras.append("fit:" + ",".join(pieces))
# Rotation history — let Sonnet avoid 3-weeks-in-a-row repeats
history = r.get("history") or {}
if history:
wa = history.get("weeks_ago")
c30 = history.get("count_30d") or 0
cl = history.get("count_long") or 0
hist_bits = []
if wa is not None:
hist_bits.append(f"last:{wa}w-ago" if wa > 0 else "last:this-week")
if c30:
hist_bits.append(f"{c30}×/30d")
if cl:
hist_bits.append(f"{cl}×/180d")
if hist_bits:
extras.append(" ".join(hist_bits))
if meta and meta.get("summary"):
# Inline 1-line summary helps Sonnet match preferences
summary = str(meta["summary"])[:140]
pool_lines.append(f"- {slug}: {name} [{' · '.join(extras)}]\n {summary}")
continue
extra_str = f" [{' · '.join(extras)}]" if extras else ""
pool_lines.append(f"- {slug}: {name}{extra_str}")
pick_lines = []
for p in picks:
slug = p.get("slug") or ""
name = p.get("name") or slug
pickers = p.get("pickers") or []
picker_subs = p.get("picker_subs") or []
who = ", ".join(pickers) if pickers else "household"
subs_repr = json.dumps(picker_subs)
pick_lines.append(f"- {slug}: {name} (picked by [{who}], picker_subs={subs_repr})")
picks_block = "\n".join(pick_lines) if pick_lines else "(none)"
pool_block = "\n".join(pool_lines)
# Picker profiles: per-member historical picking patterns. Helps
# Sonnet bias AI-chosen slots toward each member's actual taste.
profile_block = ""
if picker_profiles:
lines: list[str] = []
for sub, prof in picker_profiles.items():
if not isinstance(prof, dict):
continue
name = prof.get("display_name") or sub
total = prof.get("total_picks") or 0
bits = []
cuisines = prof.get("cuisines") or {}
if cuisines:
bits.append("cuisines=[" + ", ".join(
f"{k}:{v}" for k, v in list(cuisines.items())[:4]
) + "]")
proteins = prof.get("proteins") or {}
if proteins:
bits.append("proteins=[" + ", ".join(
f"{k}:{v}" for k, v in list(proteins.items())[:4]
) + "]")
tags = prof.get("tags") or {}
if tags:
bits.append("tags=[" + ", ".join(
f"{k}:{v}" for k, v in list(tags.items())[:5]
) + "]")
tier = prof.get("comfort_tiers") or {}
if tier:
bits.append("tier=[" + ", ".join(
f"{k}:{v}" for k, v in list(tier.items())[:2]
) + "]")
if bits:
lines.append(f" - {name} (sub={sub}, {total} picks): " + " · ".join(bits))
if lines:
profile_block = (
"\nPICKER PROFILES — per-member historical picking patterns:\n"
+ "\n".join(lines) + "\n\n"
"Use these to bias AI-chosen slots toward each member's "
"preferences. e.g., if Cobb's profile shows cuisines=[asian:6, "
"mexican:4] and proteins=[chicken:8], lean toward asian-chicken "
"recipes for the AI-filled slots when other constraints permit. "
"Picks still take precedence over profile bias.\n"
)
# Numeric daily targets — sum up over the 7-day plan and aim
# for the cumulative budget. Per-recipe macros come from
# cauldron_recipe_meta (Sonnet-estimated).
targets_block = ""
if isinstance(daily_targets, dict) and daily_targets:
target_lines = []
for k, label in (("calories", "cal"), ("protein_g", "g protein"),
("carbs_g", "g carbs"), ("fat_g", "g fat")):
v = daily_targets.get(k)
try:
if v and int(v) > 0:
target_lines.append(f"{int(v)}{label}/day ({int(v) * slots}{label}/week)")
except (TypeError, ValueError):
continue
if target_lines:
targets_block = (
"\nDAILY MACRO TARGETS (sum across the 7-day plan):\n - "
+ "\n - ".join(target_lines) + "\n\n"
"Rule: build the plan so the SUM of per-serving macros across "
"all 7 slots lands within ±15% of the weekly totals shown above. "
"If recipe macros are null/missing, treat as average (use the "
"summary + name to estimate). Trade off slots if needed — push "
"high-protein dishes to make protein, push lighter dishes if "
"calories are running over. Picks STILL must appear; balance "
"with the AI-chosen slots.\n"
)
# Allergen / dietary exclusions — strict filter, not bias
excl_block = ""
if isinstance(exclusions, list) and exclusions:
excl_clean = sorted({
str(x).strip().lower() for x in exclusions
if isinstance(x, str) and x.strip()
})
if excl_clean:
excl_block = (
"\nSTRICT EXCLUSIONS — recipes containing any of these MUST be "
f"avoided in AI-chosen slots:\n {', '.join(excl_clean)}\n\n"
"Each pool entry shows allergen flags as 'has:dairy,gluten,...'. "
"Do NOT pick a recipe whose has: list intersects the exclusions "
"above. If a HOUSEHOLD PICK violates the exclusion, include it "
"anyway (picks are explicit user choices) but flag the conflict "
"in that slot's reason field (e.g., \"contains dairy — "
"household pick\").\n"
)
pref_clean = (preference or "").strip()
pref_block = ""
if pref_clean:
pref_block = (
f"\nHOUSEHOLD PREFERENCE FOR THIS WEEK:\n \"{pref_clean}\"\n\n"
"When the preference is set, BIAS your AI-chosen slots toward "
"recipes from the pool that match it. The preference may describe "
"diet (\"high protein, low carb\"), occasion (\"light meals, "
"recovery week\"), shopping constraints (\"no fish, out of "
"season\"), or vibe (\"carb load, training hard\"). The "
"preference does NOT override picks — every pick still appears. "
"It DOES change which other recipes from the pool you choose to "
"fill the remaining slots.\n"
)
# Multi-meal block: when meal_types is more than just dinner, the
# output is N×7 slots with explicit meal_type. Each pool entry has
# a meta.meal_type tag — Sonnet must match those (a "breakfast"
# slot gets a recipe whose meta.meal_type is "breakfast" or
# something close like "lunch" if no breakfast match).
meal_types_clean = list(meal_types) if isinstance(meal_types, list) and meal_types else ["dinner"]
is_multi_meal = len(meal_types_clean) > 1
meal_block = ""
if is_multi_meal:
meal_block = (
f"\nMEAL TYPES TO PLAN: {', '.join(meal_types_clean)}\n"
"Each day must have exactly one slot per meal type listed above.\n"
"When picking a recipe for a meal slot, prefer recipes whose "
"meta.meal_type matches the slot. e.g. for a 'breakfast' slot, "
"look at the pool entries with `meta meal_type=breakfast` or those "
"tagged 'breakfast'/'brunch'/'morning' in their meta tags. Don't "
"use a 'dessert' meal_type recipe for a dinner slot. If the pool "
"is thin for a meal type, fall back to general pool entries that "
"could plausibly fit (e.g. a hearty breakfast salad for lunch).\n"
)
output_shape = (
'{"slots": [{"day": "monday", "meal_type": "breakfast", '
'"recipe_slug": "...", "picker_subs": [...] or [], "reason": "..."}, ...], '
'"reading": "<one-paragraph weekly reading from Hecate>"}'
)
else:
output_shape = (
'{"slots": [{"day": "monday", "meal_type": "dinner", '
'"recipe_slug": "...", "picker_subs": [...] or [], "reason": "..."}, ...], '
'"reading": "<one-paragraph weekly reading from Hecate>"}'
)
expected_total = slots * len(meal_types_clean)
meal_types_label = "/".join(meal_types_clean) if is_multi_meal else "dinner"
return (
"You are Hecate, a Greek-mythology witch goddess of crossroads, "
"herbs, and magic — and the family's meal planner. You're advising "
"with the warm authority of someone who knows the household's tastes, "
"their week's rhythm, and what kind of nourishment fits the moment.\n\n"
f"Build a {slots}-day plan ({meal_types_label}) "
f"for the week of {week_start}.\n\n"
f"POOL (all available recipes):\n{pool_block}\n\n"
f"PICKS (recipes the family pre-selected — every pick MUST appear in "
f"some appropriate slot if pool size allows; no repeats):\n"
f"{picks_block}\n"
f"{profile_block}"
f"{meal_block}"
f"{pref_block}"
f"{targets_block}"
f"{excl_block}"
f"Output JSON ONLY, no prose: {output_shape}\n\n"
"Rules:\n"
f"- Use exactly {expected_total} slots ({slots} days × {len(meal_types_clean)} meals)\n"
"- Distribute picks evenly across the week — don't bunch them\n"
"- ROTATION: prefer recipes with `last:NNw-ago` further in the past "
"or no history shown. If a recipe has been served 2+ times in 30d "
"(`2×/30d` or higher), DEMOTE it strongly unless it's a household "
"pick. Never repeat the same recipe slug within this plan.\n"
"- VARIETY: don't fill 5 of 7 dinner slots with the same primary_protein "
"or the same cuisine. Mix it up across the week.\n"
"- meal_type MUST be one of the listed meal types and match the slot.\n"
"- \"reason\" is a one-line user-facing rationale "
"(e.g., \"balances heavy and light meals\", \"honors abby's pick\", "
"\"high-protein lean — pairs with the gym week\", "
"\"haven't had this in 8 weeks — fresh on the rotation\")\n"
"- \"picker_subs\" is the array of authentik_sub strings of family "
"members who picked this recipe (empty list if AI-chosen)\n"
"- Day order: monday..sunday\n"
"- \"reading\" is YOUR voice — Hecate addressing the household. One "
"paragraph (~3-5 sentences). Describe the arc of the week: what "
"you're leaning into, who you've honored with their picks, where "
"the leftovers thread through, what mood the week takes on. "
"Witch-goddess tone: confident, wise, a little theatrical. "
"Reference specific recipes and members by name when fitting. "
"Don't restate every slot's reason — give the WEEK's character.\n"
)
def recipe_dedupe_decision(
self, recipes: list[dict], *, model: str | None = None
) -> dict:
"""Ask Sonnet whether a cluster of similar-named recipes are
actually duplicates (same recipe imported twice / hand-copied with
a slight title tweak / etc) versus distinct recipes that just
happen to look similar by name.
Input: list of recipe summaries — {slug, name, source_url,
ingredient_summary (concise list), step_count, yields}.
Returns:
{"duplicates": bool,
"canonical_slug": "<slug to keep>",
"delete_slugs": ["<slug>", ...],
"reason": "<one-line explanation>"}
duplicates=false means the cluster is a false positive and nothing
should be deleted. canonical_slug + delete_slugs must be empty in
that case. Be conservative — when in doubt return false."""
items = [
{
"slug": r.get("slug"),
"name": r.get("name"),
"source_url": r.get("source_url") or "",
"ingredient_summary": r.get("ingredient_summary") or [],
"step_count": r.get("step_count") or 0,
"yields": r.get("yields") or "",
}
for r in recipes
]
prompt = (
"You are deciding whether a cluster of similar-named recipes "
"are actual duplicates (same recipe imported or hand-copied "
"twice) or distinct recipes that share words in the title.\n\n"
f"Cluster:\n{json.dumps(items, indent=2)}\n\n"
"Output JSON ONLY, no prose: "
'{"duplicates": true|false, '
'"canonical_slug": "<slug to keep, or empty>", '
'"delete_slugs": ["<slug>", ...], '
'"reason": "<one-line reasoning>"}\n\n'
"Rules:\n"
"- duplicates=true ONLY when the recipes are clearly the same "
" dish prepared the same way (matching ingredient sets, similar "
" step counts, often shared source_url). Slight title variations "
" ('Banana Bread' vs 'Best Banana Bread') with same body = dupes.\n"
"- Pick canonical_slug = the recipe with the cleanest name, the "
" most complete data (more steps + yields filled in beats less). "
" When tied, pick the older one (lexicographic slug order is fine "
" since Mealie slugs include date-ish suffixes for dupes).\n"
"- delete_slugs = the OTHER cluster members. Mealie DELETE removes "
" them permanently — only suggest deletion when you're confident.\n"
"- duplicates=false when ingredient sets differ meaningfully, OR "
" when names suggest distinct dishes ('Chicken Stir Fry' vs "
" 'Chicken Fajitas'), OR when you genuinely cannot tell.\n"
"- Be CONSERVATIVE — false negatives are recoverable (recipes "
" stay), false positives delete data."
)
result = self.run(prompt, model=model or "sonnet", timeout_secs=60)
return _extract_recipe_dedupe_decision(result)
def cluster_decision(
self, foods: list[dict], *, model: str | None = None
) -> dict:
"""Ask Sonnet whether a cluster of similar-named foods are
actually duplicates. Input: list of {id, name, plural_name?, aliases?}.
Returns:
{"merge": bool,
"canonical_id": "<id>", # the survivor (highest-quality name/aliases)
"canonical_name": "<str>", # the survivor's name (echoed for the UI)
"discard_ids": ["<id>", ...], # the ones to merge into canonical
"alias_additions": ["<name>", ...], # discarded names worth keeping as aliases on the survivor
"reason": "<one-line explanation>"}
merge=false means the cluster is a false positive (foods that look
similar but are distinct, e.g. "olive oil" vs "olive"). In that case
canonical_id may be empty and discard_ids must be empty.
"""
items = [
{
"id": f.get("id"),
"name": f.get("name"),
"plural_name": f.get("pluralName") or f.get("plural_name"),
"aliases": [
(a.get("name") if isinstance(a, dict) else a)
for a in (f.get("aliases") or [])
],
}
for f in foods
]
prompt = (
"You are deciding whether a cluster of food rows from a recipe "
"database are duplicates that should be merged into one canonical "
"row. The names came from years of recipe imports + manual entry "
"so plural/case/wording variations are common.\n\n"
f"Cluster:\n{json.dumps(items, indent=2)}\n\n"
"Output JSON ONLY, no prose: "
'{"merge": true|false, '
'"canonical_id": "<id of the survivor or empty>", '
'"canonical_name": "<survivor name or empty>", '
'"discard_ids": ["<id>", ...], '
'"alias_additions": ["<name to add as alias on survivor>", ...], '
'"reason": "<one-line reasoning>"}\n\n'
"Rules:\n"
"- Pick the survivor whose name is the cleanest canonical "
" (lowercase, singular when applicable, no brand, no clinical "
" qualifiers like 'raw' or 'unenriched').\n"
"- discard_ids are the OTHER cluster members — Mealie will rewrite "
" recipe references to point at canonical_id.\n"
"- alias_additions = the discarded NAMES (or any close variants you "
" noticed in plural_name/aliases) that the survivor should adopt as "
" aliases so the parser fuzzy-matches them in the future.\n"
"- merge=false ONLY when the cluster is a false positive (e.g. "
" 'olive oil' vs 'olive', 'butter' vs 'peanut butter'). In that "
" case canonical_id and discard_ids must both be empty.\n"
"- Be conservative — when in doubt, merge=false."
)
result = self.run(prompt, model=model or "sonnet", timeout_secs=60)
return _extract_cluster_decision(result)
def enrich_recipe(self, recipe: dict, *, model: str | None = None) -> dict:
"""Generate structured metadata for a recipe so the plan generator
can match preferences to actual recipe characteristics, not just
names.
Input: a Mealie recipe dict (uses name + description + ingredients
+ instructions + yields + recipeYield).
Output (validated):
{
"tags": [<curated descriptor strings>],
# e.g. "high-protein", "weeknight", "one-pan",
# "kid-friendly", "leftovers-good", "freezer-friendly"
"cuisine": "<american|italian|asian|mexican|...|other|unknown>",
"complexity": "easy|medium|involved",
"estimated_minutes": <int>,
"meal_type": "breakfast|lunch|dinner|snack|dessert|side",
"primary_protein": "<chicken|beef|pork|fish|tofu|beans|eggs|none|mixed>",
"primary_carb": "<rice|pasta|bread|potato|tortilla|quinoa|none|mixed>",
"veg_forward": "veg-forward|mixed|meat-forward",
"comfort_tier": "<weeknight-easy|comfort|fancy|kid-friendly|...>",
"season_fit": [<season strings>],
"summary": "<one-line vibe>",
"best_for": "<short phrase about when this is the right pick>"
}
Cheap call, idempotent — run once per recipe and cache forever
(or until enrich_version bumps)."""
# Build a compact recipe summary for the prompt
ings = recipe.get("recipeIngredient") or []
ing_lines: list[str] = []
for i in ings[:30]:
food = (i.get("food") or {}).get("name") if isinstance(i.get("food"), dict) else None
qty = i.get("quantity")
unit = (i.get("unit") or {}).get("name") if isinstance(i.get("unit"), dict) else None
note = i.get("note") or ""
line = ""
if qty not in (None, ""):
line += f"{qty} "
if unit:
line += f"{unit} "
if food:
line += food
elif note:
line += note
if line.strip():
ing_lines.append(line.strip())
instructions = recipe.get("recipeInstructions") or []
steps: list[str] = []
char_budget = 2000
for step in instructions:
if not isinstance(step, dict):
continue
text = (step.get("text") or "").strip()
if not text or char_budget <= 0:
continue
if len(text) > char_budget:
text = text[:char_budget] + ""
steps.append(text)
char_budget -= len(text)
prompt = (
"Given the following recipe, return structured metadata to help "
"an AI meal planner pick recipes that match user preferences "
"('high protein week', 'carb load', 'light recovery', etc).\n\n"
f"NAME: {recipe.get('name') or '(unnamed)'}\n"
f"DESCRIPTION: {(recipe.get('description') or '').strip()[:400]}\n"
f"YIELDS: {(recipe.get('recipeYield') or '').strip()[:80]}\n"
f"INGREDIENTS:\n - " + "\n - ".join(ing_lines or ['(none listed)']) + "\n"
f"STEPS:\n - " + "\n - ".join(steps or ['(none listed)']) + "\n\n"
"Output JSON ONLY, no prose:\n"
"{\n"
' "tags": [<curated descriptor strings — pick 3-8 from these or invent close variants: '
'"high-protein","low-carb","high-carb","low-fat","high-fiber",'
'"vegetarian","vegan","gluten-free","dairy-free","keto","paleo",'
'"weeknight","weekend","one-pan","one-pot","sheet-pan","slow-cooker","instant-pot",'
'"freezer-friendly","leftovers-good","kid-friendly","spicy","mild",'
'"hearty","light","fresh","comfort","fancy","quick","make-ahead">],\n'
' "cuisine": "<american|italian|asian|mexican|mediterranean|indian|french|middle-eastern|other|unknown>",\n'
' "complexity": "<easy|medium|involved>",\n'
' "estimated_minutes": <int total time including prep>,\n'
' "meal_type": "<breakfast|lunch|dinner|snack|dessert|side|sauce|drink>",\n'
' "primary_protein": "<chicken|beef|pork|fish|seafood|tofu|tempeh|beans|eggs|cheese|nuts|none|mixed>",\n'
' "primary_carb": "<rice|pasta|bread|potato|tortilla|quinoa|noodles|grain|none|mixed>",\n'
' "veg_forward": "<veg-forward|mixed|meat-forward>",\n'
' "comfort_tier": "<weeknight-easy|hearty-comfort|fancy-occasion|kid-friendly|date-night|crowd-pleaser>",\n'
' "season_fit": [<one or more of "spring","summer","fall","winter","year-round">],\n'
' "calories": <int per-serving estimate or null>,\n'
' "protein_g": <int per-serving estimate or null>,\n'
' "carbs_g": <int per-serving estimate or null>,\n'
' "fat_g": <int per-serving estimate or null>,\n'
' "macros_confidence": "<low|medium|high>",\n'
' "contains": {\n'
' "dairy": <bool>, // milk/cream/butter/cheese/yogurt/whey\n'
' "gluten": <bool>, // wheat/barley/rye/regular pasta/bread/flour (not GF)\n'
' "nuts": <bool>, // tree nuts (almonds, cashews, pecans, walnuts, ...)\n'
' "peanuts": <bool>, // tracked separately from tree nuts\n'
' "eggs": <bool>,\n'
' "shellfish": <bool>, // shrimp, crab, lobster, scallops, ...\n'
' "fish": <bool>,\n'
' "soy": <bool>, // soy sauce, tofu, tempeh, edamame\n'
' "sesame": <bool>,\n'
' "pork": <bool> // for halal/kosher-ish filters\n'
' },\n'
' "pairings": {\n'
' "serves_well_with": [<2-4 short phrases — sides/components>],\n'
' "drinks": [<1-3 drink suggestions: wine, beer, soda, tea>]\n'
' },\n'
' "mood": {\n'
' "cozy": <int 1-5: how cozy/cold-weather/cuddle this feels>,\n'
' "summer_fresh": <int 1-5: how light/refreshing/hot-weather this feels>,\n'
' "energizing": <int 1-5: how performance/gym-fuel this is>,\n'
' "comfort": <int 1-5: how nostalgic/comforting this is>\n'
' },\n'
' "leftover_potential": <int 1-5: 1=eat-now-only, 5=tomorrow lunch=this+++>,\n'
' "active_minutes": <int hands-on prep+cook time you must be present for>,\n'
' "hands_off_minutes": <int unattended time — rise/marinade/braise/bake>,\n'
' "equipment": [<from: "oven","stovetop","grill","instant-pot","slow-cooker",\n'
' "sheet-pan","cast-iron","air-fryer","food-processor","blender","mixer",\n'
' "smoker","sous-vide","no-cook">],\n'
' "flavor_profile": [<2-5 from: "spicy","sweet","savory","umami","tangy","smoky","herby","citrusy","rich","fresh","bitter","earthy">],\n'
' "kid_friendly_score": <int 1-5: 1=adult-only, 5=kids-love-it>,\n'
' "fiber_g": <int per-serving fiber estimate, or null>,\n'
' "sodium_mg": <int per-serving sodium estimate, or null>,\n'
' "cost_per_serving_estimate": <int rough USD per serving, or null>,\n'
' "occasion_fit": [<from: "weeknight","weekend","brunch","date-night","party","picnic","camping","holiday","game-day","kids-birthday","quiet-night-in">],\n'
' "hecate_quip": "<one-line voice description in Hecate\'s mythic-witch tone — what does this dish FEEL like? Sample: \\"Pure midwinter comfort — the kind of meal that asks for a fire and quiet.\\">",\n'
' "summary": "<one-line vibe — what KIND of meal is this>",\n'
' "best_for": "<short phrase: when is this the right pick>"\n'
"}\n\n"
"Rules:\n"
"- Return ONLY the JSON object, no markdown fences, no prose.\n"
"- Be concrete: 'high-protein' goes in tags ONLY if the recipe genuinely "
"qualifies (≥30g protein per serving is a useful threshold).\n"
"- Macros (calories, protein_g, carbs_g, fat_g, fiber_g, sodium_mg): "
"PER-SERVING estimate computed step-by-step. Don't just guess a round "
"number — internally list each major ingredient (the protein, carb, "
"fat, dairy, anything substantial), approximate its contribution per "
"serving in grams using rough USDA averages, sum, then output. "
"Cross-check that protein_g × 4 + carbs_g × 4 + fat_g × 9 ≈ calories. "
"If yields aren't clear, assume 4 servings. If the recipe is a sauce/"
"seasoning/drink with no useful per-serving notion, set them to null.\n"
"- macros_confidence: \"high\" when the recipe has explicit yields and "
"named cuts/quantities, \"medium\" when yields are inferred or some "
"ingredients are unspecified amounts, \"low\" when you had to guess "
"significantly (no yields, free-form 'to taste' amounts dominating, "
"exotic ingredients without clear USDA equivalents).\n"
"- contains.* booleans: TRUE if the ingredient appears anywhere in the "
"recipe. dairy=true for butter, cream, cheese, milk, yogurt, ghee, whey, "
"casein. gluten=true for regular flour/bread/pasta/soy sauce/beer/seitan; "
"FALSE when explicitly gluten-free or naturally GF. soy=true for soy "
"sauce, tofu, tempeh, edamame, miso.\n"
" IMPORTANT: do NOT set contains.X=TRUE unless you can name an actual "
"ingredient in the recipe that triggers it. False positives clutter the "
"data. For ANAPHYLAXIS-risk allergens (peanuts, tree nuts, shellfish, "
"eggs, fish, sesame), if a sauce or compound ingredient could plausibly "
"contain it, set TRUE conservatively. For non-allergic exclusions "
"(pork — religious/dietary), set FALSE unless an actual pork ingredient "
"is listed.\n"
"- pairings.serves_well_with: 2-4 short phrases describing sides or "
"components that pair well. Examples: 'crusty bread', 'green salad', "
"'jasmine rice', 'roasted vegetables'. Don't list ingredients already "
"in the recipe.\n"
"- pairings.drinks: 1-3 drink suggestions (specific or generic). "
"Examples: 'iced tea', 'pinot noir', 'cold lager', 'sparkling water "
"with lime'.\n"
"- mood scores (1-5): how does the dish FEEL? cozy=cold-day-cuddle, "
"summer_fresh=hot-day-light, energizing=workout-fuel, comfort=nostalgic-warm. "
"These are independent — a recipe can score high on multiple.\n"
"- leftover_potential 1-5: 1=eat-fresh-only (crispy fries, salads with "
"wilting lettuce), 3=fine the next day, 5=actually BETTER as leftovers "
"(stews, braises, lasagna).\n"
"- active_minutes vs hands_off_minutes: split the total cook time. "
"Active = chopping, sautéing, stirring, plating — you must be present. "
"Hands-off = rise, marinate, braise, bake unattended, chill, ferment. "
"Together they should sum to ~estimated_minutes. A 4-hour beef stew "
"might be 30 active + 210 hands-off.\n"
"- equipment: pick from the listed set what's REQUIRED to make the "
"recipe. A recipe that uses both stovetop AND oven gets both. Don't "
"include fundamental tools (knife, pan) — only the bigger appliances "
"or cooking modes that gate availability.\n"
"- flavor_profile: 2-5 dominant taste/mood notes. Don't overlist; "
"pick the ones that actually characterize this dish.\n"
"- kid_friendly_score 1-5: 1=adults-only (super spicy, weird textures, "
"olives + capers), 3=most kids will eat, 5=kids LOVE this (mac and "
"cheese, pancakes, chicken nuggets).\n"
"- fiber_g, sodium_mg: per-serving estimates from the ingredient list. "
"Same precision as the other macros — rough USDA averages.\n"
"- cost_per_serving_estimate: rough USD per serving. A bean-and-rice "
"bowl is $2-3, salmon for two is $8-12, a fancy roast is $15+. Best-effort, "
"current-ish (2026) US prices. Set null if too volatile (game meat, "
"regional specialty).\n"
"- occasion_fit: when is this dish AT HOME? A roast chicken can be "
"weeknight AND date-night. Mac and cheese is weeknight + kids-birthday. "
"Be generous but discerning.\n"
"- hecate_quip: one line, ~10-20 words, in Hecate's voice. Not a "
"description of WHAT the dish is — a description of what it FEELS "
"like. Mythic-witch tone, evocative, a little theatrical. Examples: "
"\"Pure midwinter comfort — asks for a fire and quiet.\" / \"Sharp "
"and bright like a Tuesday morning resolution.\" / \"The kind of "
"feast that announces itself.\"\n"
"- estimated_minutes: best guess from prep + cook implied by steps. Dishes "
"needing rise/marinade time count that time.\n"
"- complexity: 'easy' = ≤30 min + ≤7 ingredients + simple technique; "
"'medium' = 30-90 min OR moderate technique; 'involved' = >90 min OR "
"advanced technique (lamination, fermentation, multi-component).\n"
"- summary should describe the vibe / use-case, not just restate the name. "
"e.g. 'quick weeknight stir-fry with leftover-friendly portions' beats "
"'chicken stir fry with rice'.\n"
"- When uncertain on a categorical, use 'unknown' or 'other' rather than guessing."
)
result = self.run(prompt, model=model or "sonnet", timeout_secs=180)
return _extract_recipe_meta(result)
def verify_allergens(
self,
recipe: dict,
prior_contains: dict | None,
*,
model: str | None = None,
) -> dict:
"""Second-pass allergen check. Re-reads the ingredient list with a
strict prompt — must NAME the triggering ingredient or set FALSE.
Catches false-positive contains.* booleans from the initial enrich
(e.g. pork=true on a sweet potato recipe). Returns a corrected
contains dict with the same keys.
Cheap call (~2-3s, focused prompt). The strictness here is the
opposite of the conservative-default in the main enrichment —
if you can't point to the ingredient, the answer is FALSE."""
ings = recipe.get("recipeIngredient") or []
ing_lines: list[str] = []
for i in ings[:40]:
food = (i.get("food") or {}).get("name") if isinstance(i.get("food"), dict) else None
note = (i.get("note") or "").strip()
display = (i.get("display") or "").strip()
line = food or display or note
if line:
ing_lines.append(str(line).strip()[:120])
prompt = (
f"Verify the allergen flags for this recipe.\n\n"
f"NAME: {recipe.get('name') or '(unnamed)'}\n"
f"INGREDIENTS:\n - " + "\n - ".join(ing_lines or ['(none listed)']) + "\n\n"
f"PRIOR FLAGS (may be wrong): {json.dumps(prior_contains or {})}\n\n"
"Output JSON ONLY, no prose:\n"
"{\n"
' "contains": {\n'
' "dairy": <bool>, "gluten": <bool>, "nuts": <bool>,\n'
' "peanuts": <bool>, "eggs": <bool>, "shellfish": <bool>,\n'
' "fish": <bool>, "soy": <bool>, "sesame": <bool>,\n'
' "pork": <bool>\n'
" },\n"
' "evidence": {\n'
' "<allergen>": "<exact ingredient name that triggers it, or empty if FALSE>"\n'
" }\n"
"}\n\n"
"Rules:\n"
"- ONLY set TRUE if you can name a SPECIFIC ingredient in the list above\n"
" that contains it. e.g. dairy=TRUE → evidence.dairy='butter, heavy cream'.\n"
" Otherwise set FALSE and evidence.<allergen>=''.\n"
"- For ANAPHYLAXIS allergens (peanuts, tree nuts, shellfish, eggs, fish,\n"
" sesame, dairy): if a sauce/condiment 'might' contain the allergen but\n"
" it's not explicit, lean TRUE. Better safe than sorry.\n"
"- For NON-ALLERGIC exclusions (pork — religious/dietary): set FALSE\n"
" unless an actual pork ingredient is named (pork, ham, bacon,\n"
" pork sausage explicitly identified, prosciutto, pancetta, chorizo\n"
" if pork-based, lardo).\n"
"- gluten: TRUE for regular wheat flour/bread/pasta/soy sauce/beer/seitan/\n"
" semolina/couscous/bulgur. FALSE if explicitly gluten-free.\n"
"- soy: TRUE for soy sauce, tofu, tempeh, edamame, miso, soy oil.\n"
"- The prior flags are a hint, not authority. Override them based on\n"
" actual ingredient evidence."
)
try:
result = self.run(prompt, model=model or "sonnet", timeout_secs=60)
except ForgeError:
# Verification failed — fall back to prior contains; better than nothing
return prior_contains or {}
return _extract_allergen_verification(result, prior_contains or {})
def suggest_recipes(
self,
*,
eligible_pool: list[dict],
count: int = 3,
week_start: str,
preference: str | None = None,
daily_targets: dict | None = None,
exclusions: list | None = None,
picker_profiles: dict | None = None,
in_plan_slugs: list[str] | None = None,
model: str | None = None,
) -> list[dict]:
"""Ask Hecate to nominate `count` 'forgotten gems' from an already-
filtered pool (caller has dropped recipes seen in the last 90 days
and recipes already in the current week's plan). Returns:
[{"recipe_slug": "...", "recipe_name": "...", "fit_score": 1-5,
"reason": "<one-line in Hecate's voice>"}]
Each suggestion's slug MUST be in the eligible pool — Forge raises
ForgeError if the model hallucinates one. The caller surfaces the
ForgeError as a 502."""
if count < 1 or count > 6:
raise ForgeError(f"bad suggestion count: {count}")
if not eligible_pool:
raise ForgeError("eligible pool empty — nothing to suggest")
# Need at least `count` items to pick from, otherwise just return what we have
target = min(count, len(eligible_pool))
valid_by_slug: dict[str, str] = {}
for r in eligible_pool:
slug = r.get("slug")
if slug:
valid_by_slug[slug] = r.get("name") or slug
# Render pool entries the same way the planner does (compact, meta-rich)
pool_lines = []
for r in eligible_pool:
slug = r.get("slug") or ""
name = r.get("name") or slug
meta = r.get("meta") or {}
extras: list[str] = []
if meta.get("cuisine") and meta["cuisine"] not in ("unknown", "other"):
extras.append(meta["cuisine"])
if meta.get("complexity"):
extras.append(meta["complexity"])
em = meta.get("estimated_minutes")
if isinstance(em, int) and em > 0:
extras.append(f"{em}min")
if meta.get("primary_protein") and meta["primary_protein"] != "none":
extras.append(f"protein:{meta['primary_protein']}")
if meta.get("primary_carb") and meta["primary_carb"] != "none":
extras.append(f"carb:{meta['primary_carb']}")
if meta.get("veg_forward") and meta["veg_forward"] != "mixed":
extras.append(meta["veg_forward"])
if meta.get("comfort_tier"):
extras.append(f"comfort:{meta['comfort_tier']}")
if meta.get("kid_friendly"):
extras.append(f"kid:{meta['kid_friendly']}")
meta_tags = meta.get("tags") or []
if meta_tags:
extras.append("/".join(meta_tags[:5]))
quip = meta.get("hecate_quip")
if quip:
extras.append(f"vibe:{str(quip)[:80]}")
h = r.get("history") or {}
if h.get("weeks_ago") is not None:
extras.append(f"last:{h['weeks_ago']}w-ago")
else:
extras.append("never-planned")
fit = r.get("fit") or {}
if isinstance(fit, dict) and fit:
fit_str = ",".join(f"{n}:{s}" for n, s in fit.items())
extras.append(f"fit:{fit_str}")
# Source hint — Flavor B passes "library" or "discover" so Hecate
# can reason about whether an entry is already in the household's
# Mealie library (free pick) vs scraped from the open web (would
# need to be imported on pin).
src = r.get("source")
if src:
extras.append(f"source:{src}")
line = f"- {slug} | {name}"
if extras:
line += f" [{' · '.join(extras)}]"
pool_lines.append(line)
pool_block = "\n".join(pool_lines)
has_discover = any(r.get("source") == "discover" for r in eligible_pool)
# Picker profile block (same shape as planner uses, abbreviated)
profile_block = ""
if isinstance(picker_profiles, dict) and picker_profiles:
lines = []
for sub, prof in picker_profiles.items():
if not isinstance(prof, dict):
continue
disp = prof.get("display_name") or sub
cuisines = list((prof.get("cuisines") or {}).keys())[:3]
proteins = list((prof.get("proteins") or {}).keys())[:3]
bits = []
if cuisines:
bits.append("cuisines=" + "/".join(cuisines))
if proteins:
bits.append("proteins=" + "/".join(proteins))
lines.append(f" - {disp}: " + ", ".join(bits) if bits else f" - {disp}: (no signal yet)")
if lines:
profile_block = "\nFAMILY PICKER PROFILES (their demonstrated tastes):\n" + "\n".join(lines) + "\n"
pref_block = f"\nWEEK PREFERENCE: {preference}\n" if preference else ""
excl_block = ""
if isinstance(exclusions, list) and exclusions:
excl_block = f"\nMUST AVOID (allergens/exclusions): {', '.join(str(e) for e in exclusions)}\n"
targets_block = ""
if isinstance(daily_targets, dict) and any(daily_targets.get(k) for k in ("calories", "protein_g", "carbs_g", "fat_g")):
parts = []
for k, label in [("calories", "cal"), ("protein_g", "protein g"), ("carbs_g", "carbs g"), ("fat_g", "fat g")]:
v = daily_targets.get(k)
if v:
parts.append(f"{label}={v}")
if parts:
targets_block = f"\nDAILY TARGETS: {', '.join(parts)} (per day, on average)\n"
already_block = ""
if in_plan_slugs:
already_block = f"\n(For context: this week's plan already has {len(in_plan_slugs)} recipe(s). They're NOT in the pool above.)\n"
discover_hint = (
"\nNOTE: some entries below have `source:discover` — those are "
"RECIPES NEW TO THIS HOUSEHOLD, scraped from the open web and "
"enriched but not yet imported to the family's Mealie library. "
"Treat them as 'something new to try' — bias toward them when "
"the family's history is heavy on the same handful of cuisines/"
"proteins, but don't overdo it (max ~half the picks should be "
"discover-source unless the library is exhausted). Pinning a "
"discover entry will trigger an automatic import.\n"
if has_discover else ""
)
prompt = (
"You are Hecate, a Greek-mythology witch goddess of crossroads, "
"herbs, and magic — and the family's meal planner. The household "
"asked you to look through their recipe library and surface "
"'forgotten gems': recipes that fit who they are but haven't "
"been served recently (or ever).\n\n"
f"This is for the week of {week_start}.\n"
f"{discover_hint}\n"
f"ELIGIBLE POOL (already filtered: nothing seen in the last 90 days, "
f"nothing already on this week's plan):\n{pool_block}\n"
f"{profile_block}"
f"{pref_block}"
f"{targets_block}"
f"{excl_block}"
f"{already_block}"
f"\nPick exactly {target} recipe(s) from the pool above.\n\n"
"Output JSON ONLY, no prose:\n"
'{"suggestions": [{"recipe_slug": "...", "fit_score": 1-5, "reason": "..."}]}\n\n'
"Rules:\n"
f"- Exactly {target} suggestion(s); each recipe_slug MUST be in the pool above\n"
"- VARIETY: don't pick 3 of the same cuisine or 3 of the same primary_protein\n"
"- BIAS toward recipes whose meta best fits the family's picker profiles "
"and any week preference/targets stated above\n"
"- AVOID anything with an exclusion conflict (allergens, dietary)\n"
"- fit_score is YOUR 1-5 confidence this is a hit for THIS family THIS week "
"(5 = strong match, 1 = adventurous reach)\n"
"- reason is one line in Hecate's voice — a witch-goddess remarking "
"on the crossroads. Reference the meta you saw "
"(e.g., \"haven't crossed your table since June — light, herby, "
"fits the picker profile\", \"a quiet salmon pivot from the "
"chicken-heavy week you've been weaving\"). ~12-25 words.\n"
)
result = self.run(prompt, model=model or "sonnet")
suggestions = _extract_suggestions(result)
out = []
seen_slugs: set[str] = set()
for s in suggestions:
slug = (s.get("recipe_slug") or "").strip()
if not slug or slug not in valid_by_slug:
raise ForgeError(f"model output: unknown recipe_slug '{slug}'")
if slug in seen_slugs:
raise ForgeError(f"model output: duplicate suggestion '{slug}'")
seen_slugs.add(slug)
fit = s.get("fit_score")
try:
fit_int = int(fit) if fit is not None else 3
except (TypeError, ValueError):
fit_int = 3
fit_int = max(1, min(5, fit_int))
out.append({
"recipe_slug": slug,
"recipe_name": valid_by_slug[slug],
"fit_score": fit_int,
"reason": (s.get("reason") or "")[:500],
})
if not out:
raise ForgeError("model returned no usable suggestions")
return out
def fetch_food_info(self, name: str, *, model: str | None = None) -> dict:
"""Ask Sonnet for density + unit class + common size of a single
food. Returns a dict shaped like:
{"density_g_per_ml": 1.04 | null,
"default_unit_class": "mass"|"volume"|"count",
"common_size_g": 150.0 | null,
"category": "produce"|"dairy"|... | null}
density_g_per_ml is null when the food doesn't sensibly convert
between mass and volume (e.g., whole onions, eggs — these are
count-style). common_size_g lets the aggregator handle "1 onion"
as a count → mass conversion. Cheap call, cached forever once
persisted to cauldron_foods.
"""
prompt = (
f"Give nutritional/cooking metadata for the food: {name!r}.\n\n"
"Output JSON ONLY, no prose: "
'{"density_g_per_ml": float|null, '
'"default_unit_class": "mass"|"volume"|"count", '
'"common_size_g": float|null, '
'"category": "produce"|"dairy"|"meat"|"grain"|"baking"|"pantry"'
'|"spice"|"oil"|"beverage"|"other"|null}\n\n'
"Rules:\n"
"- density_g_per_ml: typical packed/cooking density. Null if "
"the food is count-based (whole onions, eggs).\n"
"- default_unit_class: how this food is most often measured "
"(salt=mass; milk=volume; egg=count).\n"
"- common_size_g: the typical mass of one whole unit (1 onion "
"≈ 150g; 1 egg ≈ 50g). Null if the food isn't naturally counted.\n"
"- category: best single fit; null if uncertain.\n"
)
result = self.run(prompt, model=model or "sonnet", timeout_secs=60)
return _extract_food_info(result)
def _extract_allergen_verification(forge_result: dict, prior: dict) -> dict:
"""Pull the corrected contains dict out of the verify_allergens reply.
Falls back to prior on any shape problem — verification is best-effort."""
if not isinstance(forge_result, dict):
return prior
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
try:
inner = _parse_json_blob(inner)
except Exception:
return prior
if not isinstance(inner, dict):
return prior
contains_raw = inner.get("contains") or {}
if not isinstance(contains_raw, dict):
return prior
return {
k: bool(contains_raw.get(k, prior.get(k, False)))
for k in ("dairy", "gluten", "nuts", "peanuts", "eggs",
"shellfish", "fish", "soy", "sesame", "pork")
}
def _extract_recipe_meta(forge_result: dict) -> dict:
"""Validate the recipe metadata blob from Sonnet. Coerces types,
normalizes enums to lowercase, drops fields not in the schema."""
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if not isinstance(inner, dict):
raise ForgeError(f"recipe meta not a dict: {str(inner)[:200]}")
def _str(v, default=""):
return str(v).strip().lower()[:64] if isinstance(v, str) and v.strip() else default
def _str_long(v, default=""):
return str(v).strip()[:300] if isinstance(v, str) and v.strip() else default
def _str_list(v) -> list[str]:
if not isinstance(v, list):
return []
out = []
for item in v:
if isinstance(item, str) and item.strip():
out.append(item.strip().lower()[:48])
return out[:12]
def _int(v, default=0):
try:
return max(0, int(v))
except (TypeError, ValueError):
return default
def _int_or_none(v):
if v is None:
return None
try:
n = int(v)
return n if n > 0 else None
except (TypeError, ValueError):
return None
contains_raw = inner.get("contains") or {}
if not isinstance(contains_raw, dict):
contains_raw = {}
contains = {
k: bool(contains_raw.get(k))
for k in ("dairy", "gluten", "nuts", "peanuts", "eggs",
"shellfish", "fish", "soy", "sesame", "pork")
}
pairings_raw = inner.get("pairings") or {}
if not isinstance(pairings_raw, dict):
pairings_raw = {}
pairings = {
"serves_well_with": [
str(x).strip()[:80] for x in (pairings_raw.get("serves_well_with") or [])
if isinstance(x, str) and x.strip()
][:6],
"drinks": [
str(x).strip()[:60] for x in (pairings_raw.get("drinks") or [])
if isinstance(x, str) and x.strip()
][:4],
}
def _score(v, default=3):
try:
n = int(v)
return max(1, min(5, n))
except (TypeError, ValueError):
return default
mood_raw = inner.get("mood") or {}
if not isinstance(mood_raw, dict):
mood_raw = {}
mood = {k: _score(mood_raw.get(k)) for k in ("cozy", "summer_fresh", "energizing", "comfort")}
# v5 additions: split-time, equipment, flavor, kid-fit, fiber/sodium/cost,
# occasion fit, hecate's quip
return {
"tags": _str_list(inner.get("tags")),
"cuisine": _str(inner.get("cuisine"), "unknown"),
"complexity": _str(inner.get("complexity"), "medium"),
"estimated_minutes": _int(inner.get("estimated_minutes")),
"active_minutes": _int_or_none(inner.get("active_minutes")),
"hands_off_minutes": _int_or_none(inner.get("hands_off_minutes")),
"equipment": _str_list(inner.get("equipment")),
"flavor_profile": _str_list(inner.get("flavor_profile")),
"meal_type": _str(inner.get("meal_type"), "dinner"),
"primary_protein": _str(inner.get("primary_protein"), "none"),
"primary_carb": _str(inner.get("primary_carb"), "none"),
"veg_forward": _str(inner.get("veg_forward"), "mixed"),
"comfort_tier": _str(inner.get("comfort_tier"), "weeknight-easy"),
"kid_friendly_score": _score(inner.get("kid_friendly_score")),
"season_fit": _str_list(inner.get("season_fit")) or ["year-round"],
"occasion_fit": _str_list(inner.get("occasion_fit")),
"calories": _int_or_none(inner.get("calories")),
"protein_g": _int_or_none(inner.get("protein_g")),
"carbs_g": _int_or_none(inner.get("carbs_g")),
"fat_g": _int_or_none(inner.get("fat_g")),
"macros_confidence": _str(inner.get("macros_confidence"), "medium"),
"fiber_g": _int_or_none(inner.get("fiber_g")),
"sodium_mg": _int_or_none(inner.get("sodium_mg")),
"cost_per_serving_estimate": _int_or_none(inner.get("cost_per_serving_estimate")),
"contains": contains,
"pairings": pairings,
"mood": mood,
"leftover_potential": _score(inner.get("leftover_potential")),
"summary": _str_long(inner.get("summary")),
"best_for": _str_long(inner.get("best_for")),
"hecate_quip": _str_long(inner.get("hecate_quip")),
}
def _extract_recipe_dedupe_decision(forge_result: dict) -> dict:
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if not isinstance(inner, dict):
raise ForgeError(f"recipe dedupe decision not a dict: {str(inner)[:200]}")
duplicates = bool(inner.get("duplicates"))
canonical_slug = str(inner.get("canonical_slug") or "")
delete_raw = inner.get("delete_slugs") or []
delete_slugs = [str(x) for x in delete_raw if isinstance(x, str) and x.strip()]
reason = str(inner.get("reason") or "")[:500]
if not duplicates:
canonical_slug = ""
delete_slugs = []
return {
"duplicates": duplicates,
"canonical_slug": canonical_slug,
"delete_slugs": delete_slugs,
"reason": reason,
}
def _extract_cluster_decision(forge_result: dict) -> dict:
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if not isinstance(inner, dict):
raise ForgeError(f"cluster decision not a dict: {str(inner)[:200]}")
merge = bool(inner.get("merge"))
canonical_id = str(inner.get("canonical_id") or "")
canonical_name = str(inner.get("canonical_name") or "")
discard_raw = inner.get("discard_ids") or []
discard_ids = [str(x) for x in discard_raw if isinstance(x, (str, int))]
aliases_raw = inner.get("alias_additions") or []
alias_additions = [str(x) for x in aliases_raw if isinstance(x, str) and x.strip()]
reason = str(inner.get("reason") or "")[:500]
if not merge:
canonical_id = ""
discard_ids = []
return {
"merge": merge,
"canonical_id": canonical_id,
"canonical_name": canonical_name,
"discard_ids": discard_ids,
"alias_additions": alias_additions,
"reason": reason,
}
def _extract_food_info(forge_result: dict) -> dict:
"""Normalize clawdforge wrapper → food info dict. Defensive on shapes."""
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if not isinstance(inner, dict):
raise ForgeError(f"forge result not a dict: {str(inner)[:200]}")
cls = (inner.get("default_unit_class") or "mass").strip().lower()
if cls not in ("mass", "volume", "count", "mixed"):
cls = "mass"
def _f(v):
if v is None:
return None
try:
x = float(v)
return x if x > 0 else None
except (TypeError, ValueError):
return None
return {
"density_g_per_ml": _f(inner.get("density_g_per_ml")),
"default_unit_class": cls,
"common_size_g": _f(inner.get("common_size_g")),
"category": (inner.get("category") or None) and str(inner["category"])[:64],
}
def _extract_plan_slots(forge_result: dict):
"""clawdforge wraps its return; the JSON we asked for can sit in a few
different shapes. Normalize aggressively. Returns (slots, reading)
where reading may be empty string."""
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
# `result` may be a string when claude returned non-JSON — try to scrape
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if isinstance(inner, dict) and "slots" in inner:
return inner["slots"]
if isinstance(inner, list):
return inner
raise ForgeError(f"forge result missing 'slots' key: {str(inner)[:200]}")
def _extract_plan_payload(forge_result: dict) -> tuple[list, str]:
"""Like _extract_plan_slots but ALSO pulls Hecate's weekly reading
text if present. Returns (slots_list, reading_str)."""
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if isinstance(inner, list):
return inner, ""
if isinstance(inner, dict) and "slots" in inner:
reading = inner.get("reading") or inner.get("weekly_reading") or ""
if not isinstance(reading, str):
reading = ""
return inner["slots"], reading.strip()[:2000]
raise ForgeError(f"forge result missing 'slots' key: {str(inner)[:200]}")
def _extract_suggestions(forge_result: dict) -> list[dict]:
"""Pull a list of {recipe_slug, fit_score, reason} dicts out of the
suggest_recipes reply. Caller validates each slug against the eligible
pool — this just normalizes the shape."""
if not isinstance(forge_result, dict):
raise ForgeError("forge result not a dict")
inner = forge_result.get("result", forge_result)
if isinstance(inner, str):
inner = _parse_json_blob(inner)
if isinstance(inner, dict) and "suggestions" in inner:
items = inner["suggestions"]
elif isinstance(inner, list):
items = inner
else:
raise ForgeError(f"forge result missing 'suggestions' key: {str(inner)[:200]}")
if not isinstance(items, list):
raise ForgeError("'suggestions' must be a list")
out: list[dict] = []
for s in items:
if isinstance(s, dict):
out.append(s)
return out
def _parse_json_blob(s: str):
"""Parse the FIRST balanced JSON value out of a string. Tolerates Sonnet
appending extra prose/notes after the JSON object (which violates the
'no prose' rule but happens occasionally). Also strips ```json fences."""
s = s.strip()
# Strip code fences if Sonnet wrapped its output
s = re.sub(r"^```(?:json)?\s*", "", s)
s = re.sub(r"\s*```$", "", s)
try:
# Plain decode first — fastest path when output is clean
return json.loads(s)
except Exception:
pass
# Fall back to raw_decode which extracts the first JSON value and
# tells us where it ended. Anything after gets ignored. Handles the
# "Extra data: line 54" failure mode where Sonnet appended notes.
try:
decoder = json.JSONDecoder()
# Skip any leading whitespace before scanning
idx = 0
while idx < len(s) and s[idx] in " \t\n\r":
idx += 1
obj, _end = decoder.raw_decode(s[idx:])
return obj
except Exception as e:
raise ForgeError(f"could not parse model JSON: {e}; head={s[:200]!r}") from e