cauldron/cauldron/forge.py
Kayos 4db447edad plan: cook history + per-serving macros + allergens + picker profiles
Tier-1 data additions for the planner — turning the AI from a title-
matching guesser into a structured-data consumer. ENRICH_VERSION bumped
2→3 so existing meta gets refreshed with the new fields on next walk.

(A) Cook history. db.household_recipe_history aggregates recipe slug
    → {last_planned_date, count_30d, count_long} from cauldron_meal_
    plan_slots over a 180-day window. The plan generator's pool prompt
    now renders each recipe with rotation context: "last:8w-ago 0×/30d
    1×/180d". New planner rule: ROTATION — demote recipes shown 2+
    times in 30d unless they're picks; never repeat the same slug
    within the 7-day plan. New planner rule: VARIETY — don't fill 5
    of 7 slots with the same primary_protein or cuisine.

(B) Per-serving macros in enrichment. forge.enrich_recipe now asks
    Sonnet for calories, protein_g, carbs_g, fat_g per serving (rough
    USDA-grade estimates from ingredient list + yields). Renders into
    the pool prompt as "~480cal protein=32g carbs=45g fat=18g". Lets
    "high protein week" become a quantitative filter instead of a
    title-keyword match.

(C) Allergen booleans. New contains.* block in enrichment:
    {dairy, gluten, nuts, peanuts, eggs, shellfish, fish, soy, sesame,
    pork} — bool per allergen, conservatively defaulting to TRUE when
    uncertain since false negatives can hurt people. Pool prompt
    renders as "has:dairy,gluten,eggs". Foundation for upcoming
    "no dairy this week" exclusion-list UI on /plan.

(D) Picker profiles. db.household_picker_profiles unions current
    cauldron_meal_picks + historical meal_plan_slots.picker_subs over
    365 days, joins with cauldron_recipe_meta, aggregates per-user:
        {display_name, total_picks, cuisines, proteins, comfort_tiers,
         tags} — top-N counters each. Plan generator includes a new
    PICKER PROFILES block in the prompt:
        - cobb (sub=cobb@sulkta.com, 24 picks):
            cuisines=[asian:6, mexican:4, italian:3] ·
            proteins=[chicken:8, beef:5, fish:2] ·
            tags=[weeknight:11, high-protein:9, spicy:7]
    Sonnet uses these to bias AI-chosen slots toward each member's
    actual demonstrated taste — golden signal that's been sitting in
    the database the whole time. Picks still override profile bias.

Cost: cook history is a single SQL aggregate (free, sub-100ms). New
macro+allergen fields fold into the existing ~5s/recipe Sonnet call
with maybe 30 more output tokens. Picker profiles are 2-3 SQL queries
totaling sub-200ms even at scale. No new network round-trips.

Net effect once Cobb runs /enrich-recipes against ENRICH_VERSION 3:
plan generator has structured macros + allergen flags + cook-history
rotation context + per-user preferences to work with. The free-form
preference textarea ("high protein, no dairy") becomes a real query
against actual data, not just a Sonnet vibe-prompt.
2026-04-30 20:23:13 -07:00

823 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Thin HTTP client for clawdforge — we're a consumer."""
import json
import re
import requests
class ForgeError(RuntimeError):
    """Raised for clawdforge failures: transport errors, upstream HTTP
    errors, and model output that does not pass validation."""
# Lowercase weekday names, in week order; a plan slot's "day" must be one of these.
_DAYS = (
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
)
class Forge:
def __init__(
    self,
    *,
    base_url: str,
    token: str,
    default_model: str,
    default_timeout: int,
):
    """Store connection settings for the clawdforge service.

    Trailing slashes are removed from ``base_url`` so endpoint paths can
    be appended with a single "/".
    """
    self.token = token
    self.default_model = default_model
    self.default_timeout = default_timeout
    self.base_url = base_url.rstrip("/")
def _headers(self) -> dict:
    """Return the bearer-token Authorization header built from ``self.token``."""
    bearer = f"Bearer {self.token}"
    return {"Authorization": bearer}
def healthz(self) -> dict:
    """GET /healthz and return the decoded JSON body.

    Uses a fixed 10-second timeout. HTTP error statuses surface through
    ``raise_for_status()`` (a ``requests`` exception, not ForgeError).
    """
    url = f"{self.base_url}/healthz"
    response = requests.get(url, headers=self._headers(), timeout=10)
    response.raise_for_status()
    return response.json()
def run(
    self,
    prompt: str,
    *,
    model: str | None = None,
    system: str | None = None,
    files: list[str] | None = None,
    timeout_secs: int | None = None,
) -> dict:
    """POST /run and return clawdforge's decoded JSON response.

    The 'result' field in the return is whatever clawdforge parsed out of
    `claude -p` — usually a dict (when the prompt asked for JSON),
    occasionally a string.

    Args:
        prompt: prompt text forwarded to clawdforge.
        model: model name; falls back to ``self.default_model``.
        system: optional system prompt, sent only when truthy.
        files: optional list of files, sent only when truthy.
        timeout_secs: optional subprocess timeout, sent only when truthy.

    Raises:
        ForgeError: on transport failure, on an HTTP status >= 400, or
            when the response body is not valid JSON.
    """
    body = {"prompt": prompt, "model": model or self.default_model}
    if system:
        body["system"] = system
    if files:
        body["files"] = files
    if timeout_secs:
        body["timeout_secs"] = timeout_secs
    # HTTP timeout = subprocess timeout + a 30s margin so we don't bail
    # while clawdforge is still doing work for us.
    http_timeout = (timeout_secs or self.default_timeout) + 30
    try:
        r = requests.post(
            f"{self.base_url}/run",
            headers=self._headers(),
            json=body,
            timeout=http_timeout,
        )
    except requests.RequestException as e:
        raise ForgeError(f"transport: {e}") from e
    if r.status_code >= 400:
        raise ForgeError(f"upstream {r.status_code}: {r.text[:500]}")
    # A 2xx with a non-JSON body (proxy error page, truncated response)
    # must surface as ForgeError like every other failure of this call;
    # the decode errors raised by Response.json() are ValueError subclasses.
    try:
        return r.json()
    except ValueError as e:
        raise ForgeError(f"upstream returned non-JSON body: {r.text[:500]}") from e
def generate_plan(
    self,
    *,
    picks: list[dict],
    recipes: list[dict],
    slots: int = 7,
    week_start: str,
    preference: str | None = None,
    picker_profiles: dict | None = None,
    model: str | None = None,
) -> list[dict]:
    """Ask Sonnet for a {slots}-day plan. Returns a list of slot dicts
    shaped like:
        {"day": "monday", "recipe_slug": "...", "recipe_name": "...",
         "picker_subs": [...], "reason": "...", "source": "pick"|"mealie"}

    Validates structure aggressively — wrong shape / wrong slot count /
    slug-not-in-pool → ForgeError. Caller surfaces a 502 to the user.

    recipes: [{slug, name, tags?}], picks: [{slug, name, picker_subs}].
    Picks are the family's pinned recipes; the prompt mandates each one
    appears exactly once when the pool is large enough.

    Raises:
        ForgeError: for a slot count outside 1..len(_DAYS), an empty
            recipe pool, or model output that fails validation.
    """
    # Every slot needs a distinct day from _DAYS, so a plan can never hold
    # more slots than there are weekdays. Reject up front instead of paying
    # for a model call whose output can only fail the duplicate-day check.
    if slots < 1 or slots > len(_DAYS):
        raise ForgeError(f"bad slot count: {slots}")
    if not recipes:
        raise ForgeError("recipe pool empty — cannot generate")

    # Build a slug → name map for validation. Use the recipe pool plus
    # picks (picks should already be in the pool, but be defensive).
    valid_by_slug: dict[str, str] = {}
    for r in recipes:
        slug = r.get("slug")
        if slug:
            valid_by_slug[slug] = r.get("name") or slug
    for p in picks:
        slug = p.get("slug")
        if slug:
            valid_by_slug.setdefault(slug, p.get("name") or slug)

    prompt = self._build_plan_prompt(
        picks=picks, recipes=recipes, slots=slots, week_start=week_start,
        preference=preference, picker_profiles=picker_profiles,
    )
    result = self.run(prompt, model=model or "sonnet")
    parsed = _extract_plan_slots(result)
    if not isinstance(parsed, list):
        raise ForgeError("model output: 'slots' must be a list")
    if len(parsed) != slots:
        raise ForgeError(f"model output: got {len(parsed)} slots, expected {slots}")

    # Pick attribution lookup keyed by slug → list[sub]
    pick_subs_by_slug: dict[str, list[str]] = {}
    for p in picks:
        slug = p.get("slug")
        if slug:
            pick_subs_by_slug[slug] = list(p.get("picker_subs") or [])

    out = []
    seen_days: set[str] = set()
    for raw in parsed:
        if not isinstance(raw, dict):
            raise ForgeError("model output: each slot must be an object")
        # Model output is untrusted: a non-string day/slug must land in the
        # ForgeError branches below, not blow up on .strip().
        day_raw = raw.get("day")
        day = day_raw.strip().lower() if isinstance(day_raw, str) else ""
        slug_raw = raw.get("recipe_slug")
        slug = slug_raw.strip() if isinstance(slug_raw, str) else ""
        if day not in _DAYS:
            raise ForgeError(f"model output: bad day '{day}'")
        if day in seen_days:
            raise ForgeError(f"model output: duplicate day '{day}'")
        seen_days.add(day)
        if not slug or slug not in valid_by_slug:
            raise ForgeError(f"model output: unknown recipe_slug '{slug}'")

        # Trust the model's picker_subs only when we have no ground truth.
        # pick_subs_by_slug is authoritative whenever it has an entry.
        real_pickers = pick_subs_by_slug.get(slug, [])
        model_pickers = raw.get("picker_subs") or []
        if not isinstance(model_pickers, list):
            model_pickers = []
        picker_subs = real_pickers if real_pickers else [
            s for s in model_pickers if isinstance(s, str)
        ]
        source = "pick" if real_pickers else "mealie"

        # The rationale is optional; anything that is not a string is dropped.
        reason_raw = raw.get("reason")
        reason = reason_raw[:500] if isinstance(reason_raw, str) else ""

        out.append({
            "day": day,
            "recipe_slug": slug,
            "recipe_name": valid_by_slug[slug],
            "picker_subs": picker_subs,
            "reason": reason,
            "source": source,
        })
    return out
@staticmethod
def _build_plan_prompt(*, picks, recipes, slots, week_start, preference=None,
picker_profiles=None) -> str:
pool_lines = []
for r in recipes:
slug = r.get("slug") or ""
name = r.get("name") or slug
tags = r.get("tags") or []
meta = r.get("meta") or {}
extras: list[str] = []
# First 3 Mealie tags
if tags:
cleaned = []
for t in tags[:3]:
if isinstance(t, dict):
cleaned.append(t.get("name") or "")
elif isinstance(t, str):
cleaned.append(t)
cleaned = [c for c in cleaned if c]
if cleaned:
extras.append(", ".join(cleaned))
# Sonnet-generated meta — the actual high-signal stuff
if meta:
if meta.get("cuisine") and meta["cuisine"] not in ("unknown", "other"):
extras.append(meta["cuisine"])
if meta.get("complexity"):
extras.append(meta["complexity"])
em = meta.get("estimated_minutes")
if isinstance(em, int) and em > 0:
extras.append(f"{em}min")
if meta.get("primary_protein") and meta["primary_protein"] != "none":
extras.append(f"protein:{meta['primary_protein']}")
if meta.get("primary_carb") and meta["primary_carb"] != "none":
extras.append(f"carb:{meta['primary_carb']}")
if meta.get("veg_forward") and meta["veg_forward"] != "mixed":
extras.append(meta["veg_forward"])
meta_tags = meta.get("tags") or []
if meta_tags:
extras.append("/".join(meta_tags[:5]))
if meta.get("calories"):
extras.append(f"~{meta['calories']}cal")
if meta.get("protein_g"):
extras.append(f"protein={meta['protein_g']}g")
if meta.get("carbs_g"):
extras.append(f"carbs={meta['carbs_g']}g")
if meta.get("fat_g"):
extras.append(f"fat={meta['fat_g']}g")
# Allergen flags — short-circuit list of "what's in this"
contains = meta.get("contains") or {}
if isinstance(contains, dict):
flags = [k for k, v in contains.items() if v]
if flags:
extras.append("has:" + ",".join(flags))
# Rotation history — let Sonnet avoid 3-weeks-in-a-row repeats
history = r.get("history") or {}
if history:
wa = history.get("weeks_ago")
c30 = history.get("count_30d") or 0
cl = history.get("count_long") or 0
hist_bits = []
if wa is not None:
hist_bits.append(f"last:{wa}w-ago" if wa > 0 else "last:this-week")
if c30:
hist_bits.append(f"{c30}×/30d")
if cl:
hist_bits.append(f"{cl}×/180d")
if hist_bits:
extras.append(" ".join(hist_bits))
if meta and meta.get("summary"):
# Inline 1-line summary helps Sonnet match preferences
summary = str(meta["summary"])[:140]
pool_lines.append(f"- {slug}: {name} [{' · '.join(extras)}]\n {summary}")
continue
extra_str = f" [{' · '.join(extras)}]" if extras else ""
pool_lines.append(f"- {slug}: {name}{extra_str}")
pick_lines = []
for p in picks:
slug = p.get("slug") or ""
name = p.get("name") or slug
pickers = p.get("pickers") or []
picker_subs = p.get("picker_subs") or []
who = ", ".join(pickers) if pickers else "household"
subs_repr = json.dumps(picker_subs)
pick_lines.append(f"- {slug}: {name} (picked by [{who}], picker_subs={subs_repr})")
picks_block = "\n".join(pick_lines) if pick_lines else "(none)"
pool_block = "\n".join(pool_lines)
# Picker profiles: per-member historical picking patterns. Helps
# Sonnet bias AI-chosen slots toward each member's actual taste.
profile_block = ""
if picker_profiles:
lines: list[str] = []
for sub, prof in picker_profiles.items():
if not isinstance(prof, dict):
continue
name = prof.get("display_name") or sub
total = prof.get("total_picks") or 0
bits = []
cuisines = prof.get("cuisines") or {}
if cuisines:
bits.append("cuisines=[" + ", ".join(
f"{k}:{v}" for k, v in list(cuisines.items())[:4]
) + "]")
proteins = prof.get("proteins") or {}
if proteins:
bits.append("proteins=[" + ", ".join(
f"{k}:{v}" for k, v in list(proteins.items())[:4]
) + "]")
tags = prof.get("tags") or {}
if tags:
bits.append("tags=[" + ", ".join(
f"{k}:{v}" for k, v in list(tags.items())[:5]
) + "]")
tier = prof.get("comfort_tiers") or {}
if tier:
bits.append("tier=[" + ", ".join(
f"{k}:{v}" for k, v in list(tier.items())[:2]
) + "]")
if bits:
lines.append(f" - {name} (sub={sub}, {total} picks): " + " · ".join(bits))
if lines:
profile_block = (
"\nPICKER PROFILES — per-member historical picking patterns:\n"
+ "\n".join(lines) + "\n\n"
"Use these to bias AI-chosen slots toward each member's "
"preferences. e.g., if Cobb's profile shows cuisines=[asian:6, "
"mexican:4] and proteins=[chicken:8], lean toward asian-chicken "
"recipes for the AI-filled slots when other constraints permit. "
"Picks still take precedence over profile bias.\n"
)
pref_clean = (preference or "").strip()
pref_block = ""
if pref_clean:
pref_block = (
f"\nHOUSEHOLD PREFERENCE FOR THIS WEEK:\n \"{pref_clean}\"\n\n"
"When the preference is set, BIAS your AI-chosen slots toward "
"recipes from the pool that match it. The preference may describe "
"diet (\"high protein, low carb\"), occasion (\"light meals, "
"recovery week\"), shopping constraints (\"no fish, out of "
"season\"), or vibe (\"carb load, training hard\"). The "
"preference does NOT override picks — every pick still appears. "
"It DOES change which other recipes from the pool you choose to "
"fill the remaining slots.\n"
)
return (
f"You are a family meal planner. Build a {slots}-day dinner plan "
f"for the week of {week_start}.\n\n"
f"POOL (all available recipes):\n{pool_block}\n\n"
f"PICKS (recipes the family pre-selected — every pick MUST appear "
f"if pool size >= {slots}; no repeats unless pool < {slots}):\n"
f"{picks_block}\n"
f"{profile_block}"
f"{pref_block}"
"Output JSON ONLY, no prose: "
'{"slots": [{"day": "monday", "recipe_slug": "...", '
'"picker_subs": [...] or [], "reason": "..."}, ...]}\n\n'
"Rules:\n"
f"- Use exactly {slots} recipes\n"
"- Distribute picks evenly across the week — don't bunch them\n"
"- ROTATION: prefer recipes with `last:NNw-ago` further in the past "
"or no history shown. If a recipe has been served 2+ times in 30d "
"(`2×/30d` or higher), DEMOTE it strongly unless it's a household "
"pick. Never repeat the same recipe slug within this 7-day plan.\n"
"- VARIETY: don't fill 5 of 7 slots with the same primary_protein or "
"the same cuisine. Mix it up across the week.\n"
"- \"reason\" is a one-line user-facing rationale "
"(e.g., \"balances heavy and light meals\", \"honors abby's pick\", "
"\"high-protein lean — pairs with the gym week\", "
"\"haven't had this in 8 weeks — fresh on the rotation\")\n"
"- \"picker_subs\" is the array of authentik_sub strings of family "
"members who picked this recipe (empty list if AI-chosen)\n"
"- Day order: monday..sunday\n"
)
def recipe_dedupe_decision(
    self, recipes: list[dict], *, model: str | None = None
) -> dict:
    """Ask the model whether a cluster of similar-named recipes is one
    recipe stored several times or several distinct recipes.

    Input: list of recipe summaries — {slug, name, source_url,
    ingredient_summary, step_count, yields}. Missing optional fields are
    sent as "" / [] / 0.

    Returns the dict produced by ``_extract_recipe_dedupe_decision``:
        {"duplicates": bool,
         "canonical_slug": "<slug to keep>",
         "delete_slugs": ["<slug>", ...],
         "reason": "<one-line explanation>"}
    With duplicates=false, canonical_slug and delete_slugs are empty and
    nothing should be deleted.
    """
    items = []
    for recipe in recipes:
        items.append({
            "slug": recipe.get("slug"),
            "name": recipe.get("name"),
            "source_url": recipe.get("source_url") or "",
            "ingredient_summary": recipe.get("ingredient_summary") or [],
            "step_count": recipe.get("step_count") or 0,
            "yields": recipe.get("yields") or "",
        })
    cluster_json = json.dumps(items, indent=2)

    intro = (
        "You are deciding whether a cluster of similar-named recipes "
        "are actual duplicates (same recipe imported or hand-copied "
        "twice) or distinct recipes that share words in the title.\n\n"
    )
    output_format = (
        "Output JSON ONLY, no prose: "
        '{"duplicates": true|false, '
        '"canonical_slug": "<slug to keep, or empty>", '
        '"delete_slugs": ["<slug>", ...], '
        '"reason": "<one-line reasoning>"}\n\n'
    )
    rules = (
        "Rules:\n"
        "- duplicates=true ONLY when the recipes are clearly the same "
        " dish prepared the same way (matching ingredient sets, similar "
        " step counts, often shared source_url). Slight title variations "
        " ('Banana Bread' vs 'Best Banana Bread') with same body = dupes.\n"
        "- Pick canonical_slug = the recipe with the cleanest name, the "
        " most complete data (more steps + yields filled in beats less). "
        " When tied, pick the older one (lexicographic slug order is fine "
        " since Mealie slugs include date-ish suffixes for dupes).\n"
        "- delete_slugs = the OTHER cluster members. Mealie DELETE removes "
        " them permanently — only suggest deletion when you're confident.\n"
        "- duplicates=false when ingredient sets differ meaningfully, OR "
        " when names suggest distinct dishes ('Chicken Stir Fry' vs "
        " 'Chicken Fajitas'), OR when you genuinely cannot tell.\n"
        "- Be CONSERVATIVE — false negatives are recoverable (recipes "
        " stay), false positives delete data."
    )
    prompt = intro + f"Cluster:\n{cluster_json}\n\n" + output_format + rules

    chosen_model = model or "sonnet"
    raw = self.run(prompt, model=chosen_model, timeout_secs=60)
    return _extract_recipe_dedupe_decision(raw)
def cluster_decision(
    self, foods: list[dict], *, model: str | None = None
) -> dict:
    """Ask the model whether a cluster of similar-named food rows should
    be merged into one canonical row.

    Input: list of {id, name, plural_name?/pluralName?, aliases?}; alias
    entries may be plain strings or {"name": ...} dicts.

    Returns the dict produced by ``_extract_cluster_decision``:
        {"merge": bool,
         "canonical_id": "<id>",             # the survivor
         "canonical_name": "<str>",          # survivor's name, echoed for the UI
         "discard_ids": ["<id>", ...],       # rows to merge into the survivor
         "alias_additions": ["<name>", ...], # names worth keeping as aliases
         "reason": "<one-line explanation>"}
    merge=false marks a false-positive cluster (e.g. "olive oil" vs
    "olive"); canonical_id and discard_ids are then empty.
    """
    items = []
    for food in foods:
        alias_names = []
        for alias in food.get("aliases") or []:
            alias_names.append(alias.get("name") if isinstance(alias, dict) else alias)
        items.append({
            "id": food.get("id"),
            "name": food.get("name"),
            "plural_name": food.get("pluralName") or food.get("plural_name"),
            "aliases": alias_names,
        })
    cluster_json = json.dumps(items, indent=2)

    intro = (
        "You are deciding whether a cluster of food rows from a recipe "
        "database are duplicates that should be merged into one canonical "
        "row. The names came from years of recipe imports + manual entry "
        "so plural/case/wording variations are common.\n\n"
    )
    output_format = (
        "Output JSON ONLY, no prose: "
        '{"merge": true|false, '
        '"canonical_id": "<id of the survivor or empty>", '
        '"canonical_name": "<survivor name or empty>", '
        '"discard_ids": ["<id>", ...], '
        '"alias_additions": ["<name to add as alias on survivor>", ...], '
        '"reason": "<one-line reasoning>"}\n\n'
    )
    rules = (
        "Rules:\n"
        "- Pick the survivor whose name is the cleanest canonical "
        " (lowercase, singular when applicable, no brand, no clinical "
        " qualifiers like 'raw' or 'unenriched').\n"
        "- discard_ids are the OTHER cluster members — Mealie will rewrite "
        " recipe references to point at canonical_id.\n"
        "- alias_additions = the discarded NAMES (or any close variants you "
        " noticed in plural_name/aliases) that the survivor should adopt as "
        " aliases so the parser fuzzy-matches them in the future.\n"
        "- merge=false ONLY when the cluster is a false positive (e.g. "
        " 'olive oil' vs 'olive', 'butter' vs 'peanut butter'). In that "
        " case canonical_id and discard_ids must both be empty.\n"
        "- Be conservative — when in doubt, merge=false."
    )
    prompt = intro + f"Cluster:\n{cluster_json}\n\n" + output_format + rules

    chosen_model = model or "sonnet"
    raw = self.run(prompt, model=chosen_model, timeout_secs=60)
    return _extract_cluster_decision(raw)
def enrich_recipe(self, recipe: dict, *, model: str | None = None) -> dict:
    """Generate structured metadata for a recipe so the plan generator
    can match preferences to actual recipe characteristics, not just
    names.

    Input: a Mealie recipe dict. Reads name, description, recipeYield,
    recipeIngredient (first 30 entries) and recipeInstructions (step text
    up to a 2000-character budget).

    Output: the dict produced by ``_extract_recipe_meta`` for the model's
    answer. The prompt asks for:
        tags, cuisine, complexity, estimated_minutes, meal_type,
        primary_protein, primary_carb, veg_forward, comfort_tier,
        season_fit, calories, protein_g, carbs_g, fat_g (per serving, or
        null), contains.{dairy, gluten, nuts, peanuts, eggs, shellfish,
        fish, soy, sesame, pork}, summary, best_for.

    Raises:
        ForgeError: propagated from ``run`` / ``_extract_recipe_meta``.
    """
    # Build a compact recipe summary for the prompt
    ings = recipe.get("recipeIngredient") or []
    if not isinstance(ings, list):
        ings = []
    ing_lines: list[str] = []
    for i in ings[:30]:
        # Skip malformed entries the same way non-dict steps are skipped.
        if not isinstance(i, dict):
            continue
        food = (i.get("food") or {}).get("name") if isinstance(i.get("food"), dict) else None
        qty = i.get("quantity")
        unit = (i.get("unit") or {}).get("name") if isinstance(i.get("unit"), dict) else None
        note = i.get("note") or ""
        line = ""
        if qty not in (None, ""):
            line += f"{qty} "
        if unit:
            line += f"{unit} "
        if food:
            line += food
        elif note:
            line += note
        if line.strip():
            ing_lines.append(line.strip())

    instructions = recipe.get("recipeInstructions") or []
    steps: list[str] = []
    char_budget = 2000
    for step in instructions:
        if char_budget <= 0:
            # Budget only ever shrinks, so nothing later can be added.
            break
        if not isinstance(step, dict):
            continue
        text = (step.get("text") or "").strip()
        if not text:
            continue
        if len(text) > char_budget:
            # Mark the cut so the model knows the step continues.
            text = text[:char_budget] + "…"
        steps.append(text)
        char_budget -= len(text)

    # str() guards against non-string values (e.g. a numeric yield).
    description = str(recipe.get("description") or "").strip()[:400]
    yields = str(recipe.get("recipeYield") or "").strip()[:80]
    ingredients_block = "\n - ".join(ing_lines or ["(none listed)"])
    steps_block = "\n - ".join(steps or ["(none listed)"])

    prompt = (
        "Given the following recipe, return structured metadata to help "
        "an AI meal planner pick recipes that match user preferences "
        "('high protein week', 'carb load', 'light recovery', etc).\n\n"
        f"NAME: {recipe.get('name') or '(unnamed)'}\n"
        f"DESCRIPTION: {description}\n"
        f"YIELDS: {yields}\n"
        f"INGREDIENTS:\n - {ingredients_block}\n"
        f"STEPS:\n - {steps_block}\n\n"
        "Output JSON ONLY, no prose:\n"
        "{\n"
        ' "tags": [<curated descriptor strings — pick 3-8 from these or invent close variants: '
        '"high-protein","low-carb","high-carb","low-fat","high-fiber",'
        '"vegetarian","vegan","gluten-free","dairy-free","keto","paleo",'
        '"weeknight","weekend","one-pan","one-pot","sheet-pan","slow-cooker","instant-pot",'
        '"freezer-friendly","leftovers-good","kid-friendly","spicy","mild",'
        '"hearty","light","fresh","comfort","fancy","quick","make-ahead">],\n'
        ' "cuisine": "<american|italian|asian|mexican|mediterranean|indian|french|middle-eastern|other|unknown>",\n'
        ' "complexity": "<easy|medium|involved>",\n'
        ' "estimated_minutes": <int total time including prep>,\n'
        ' "meal_type": "<breakfast|lunch|dinner|snack|dessert|side|sauce|drink>",\n'
        ' "primary_protein": "<chicken|beef|pork|fish|seafood|tofu|tempeh|beans|eggs|cheese|nuts|none|mixed>",\n'
        ' "primary_carb": "<rice|pasta|bread|potato|tortilla|quinoa|noodles|grain|none|mixed>",\n'
        ' "veg_forward": "<veg-forward|mixed|meat-forward>",\n'
        ' "comfort_tier": "<weeknight-easy|hearty-comfort|fancy-occasion|kid-friendly|date-night|crowd-pleaser>",\n'
        ' "season_fit": [<one or more of "spring","summer","fall","winter","year-round">],\n'
        ' "calories": <int per-serving estimate or null>,\n'
        ' "protein_g": <int per-serving estimate or null>,\n'
        ' "carbs_g": <int per-serving estimate or null>,\n'
        ' "fat_g": <int per-serving estimate or null>,\n'
        ' "contains": {\n'
        ' "dairy": <bool>, // milk/cream/butter/cheese/yogurt/whey\n'
        ' "gluten": <bool>, // wheat/barley/rye/regular pasta/bread/flour (not GF)\n'
        ' "nuts": <bool>, // tree nuts (almonds, cashews, pecans, walnuts, ...)\n'
        ' "peanuts": <bool>, // tracked separately from tree nuts\n'
        ' "eggs": <bool>,\n'
        ' "shellfish": <bool>, // shrimp, crab, lobster, scallops, ...\n'
        ' "fish": <bool>,\n'
        ' "soy": <bool>, // soy sauce, tofu, tempeh, edamame\n'
        ' "sesame": <bool>,\n'
        ' "pork": <bool> // for halal/kosher-ish filters\n'
        ' },\n'
        ' "summary": "<one-line vibe — what KIND of meal is this>",\n'
        ' "best_for": "<short phrase: when is this the right pick>"\n'
        "}\n\n"
        "Rules:\n"
        "- Return ONLY the JSON object, no markdown fences, no prose.\n"
        "- Be concrete: 'high-protein' goes in tags ONLY if the recipe genuinely "
        "qualifies (≥30g protein per serving is a useful threshold).\n"
        "- Macros (calories, protein_g, carbs_g, fat_g): best-effort PER-SERVING "
        "estimate from the ingredient list and yields. Use rough USDA averages — "
        "we want signal not precision. If yields aren't clear, assume 4 servings. "
        "If the recipe is a sauce/seasoning/drink with no useful per-serving notion, "
        "set them to null.\n"
        "- contains.* booleans: TRUE if the ingredient appears anywhere in the "
        "recipe (even small amounts — these drive allergen filters, not "
        "macro thresholds). dairy=true for butter, cream, cheese, milk, yogurt, "
        "ghee, whey, casein. gluten=true for regular flour/bread/pasta/soy "
        "sauce/beer/seitan; FALSE only when explicitly gluten-free or naturally "
        "GF. soy=true for soy sauce, tofu, tempeh, edamame, miso. Conservative "
        "default: when uncertain, set TRUE (false negatives can cause allergic "
        "reactions; false positives just narrow choices).\n"
        "- estimated_minutes: best guess from prep + cook implied by steps. Dishes "
        "needing rise/marinade time count that time.\n"
        "- complexity: 'easy' = ≤30 min + ≤7 ingredients + simple technique; "
        "'medium' = 30-90 min OR moderate technique; 'involved' = >90 min OR "
        "advanced technique (lamination, fermentation, multi-component).\n"
        "- summary should describe the vibe / use-case, not just restate the name. "
        "e.g. 'quick weeknight stir-fry with leftover-friendly portions' beats "
        "'chicken stir fry with rice'.\n"
        "- When uncertain on a categorical, use 'unknown' or 'other' rather than guessing."
    )
    result = self.run(prompt, model=model or "sonnet", timeout_secs=90)
    return _extract_recipe_meta(result)
def fetch_food_info(self, name: str, *, model: str | None = None) -> dict:
    """Ask the model for density, unit class, common size and category of
    a single food.

    Returns the dict produced by ``_extract_food_info``:
        {"density_g_per_ml": 1.04 | None,
         "default_unit_class": "mass"|"volume"|"count",
         "common_size_g": 150.0 | None,
         "category": "produce"|"dairy"|... | None}

    The prompt asks for a null density when the food is count-based
    (whole onions, eggs) and for common_size_g as the typical mass of one
    whole unit, null when the food is not naturally counted.
    """
    question = f"Give nutritional/cooking metadata for the food: {name!r}.\n\n"
    output_format = (
        "Output JSON ONLY, no prose: "
        '{"density_g_per_ml": float|null, '
        '"default_unit_class": "mass"|"volume"|"count", '
        '"common_size_g": float|null, '
        '"category": "produce"|"dairy"|"meat"|"grain"|"baking"|"pantry"'
        '|"spice"|"oil"|"beverage"|"other"|null}\n\n'
    )
    rules = (
        "Rules:\n"
        "- density_g_per_ml: typical packed/cooking density. Null if "
        "the food is count-based (whole onions, eggs).\n"
        "- default_unit_class: how this food is most often measured "
        "(salt=mass; milk=volume; egg=count).\n"
        "- common_size_g: the typical mass of one whole unit (1 onion "
        "≈ 150g; 1 egg ≈ 50g). Null if the food isn't naturally counted.\n"
        "- category: best single fit; null if uncertain.\n"
    )
    prompt = question + output_format + rules

    chosen_model = model or "sonnet"
    raw = self.run(prompt, model=chosen_model, timeout_secs=60)
    return _extract_food_info(raw)
def _extract_recipe_meta(forge_result: dict) -> dict:
    """Validate the recipe metadata blob from Sonnet. Coerces types,
    normalizes enums to lowercase, drops fields not in the schema.

    Accepts either the clawdforge wrapper ({"result": ...}) or the bare
    payload; a string payload is parsed as JSON first.

    Allergen flags follow the conservative policy stated in the enrichment
    prompt: a flag is False only when the model explicitly said so (False
    or numeric zero). A missing, null or unrecognized value is treated as
    uncertain and reported as True, because a false negative can hurt
    someone while a false positive only narrows choices.

    Raises:
        ForgeError: when the wrapper or the payload is not a dict, or the
            payload string is not valid JSON.
    """
    if not isinstance(forge_result, dict):
        raise ForgeError("forge result not a dict")
    inner = forge_result.get("result", forge_result)
    if isinstance(inner, str):
        inner = _parse_json_blob(inner)
    if not isinstance(inner, dict):
        raise ForgeError(f"recipe meta not a dict: {str(inner)[:200]}")

    def _str(v, default=""):
        # Short enum-like value: trimmed, lowercased, capped at 64 chars.
        return str(v).strip().lower()[:64] if isinstance(v, str) and v.strip() else default

    def _str_long(v, default=""):
        # Free text: trimmed, case preserved, capped at 300 chars.
        return str(v).strip()[:300] if isinstance(v, str) and v.strip() else default

    def _str_list(v) -> list[str]:
        if not isinstance(v, list):
            return []
        cleaned = [
            item.strip().lower()[:48]
            for item in v
            if isinstance(item, str) and item.strip()
        ]
        return cleaned[:12]

    def _int(v, default=0):
        # OverflowError covers int(float("inf")).
        try:
            return max(0, int(v))
        except (TypeError, ValueError, OverflowError):
            return default

    def _int_or_none(v):
        if v is None:
            return None
        try:
            n = int(v)
        except (TypeError, ValueError, OverflowError):
            return None
        return n if n > 0 else None

    def _allergen(v) -> bool:
        if isinstance(v, bool):
            return v
        if isinstance(v, (int, float)):
            return v != 0
        # Missing / null / anything else: uncertain → assume present.
        return True

    contains_raw = inner.get("contains") or {}
    if not isinstance(contains_raw, dict):
        contains_raw = {}
    contains = {
        k: _allergen(contains_raw.get(k))
        for k in ("dairy", "gluten", "nuts", "peanuts", "eggs",
                  "shellfish", "fish", "soy", "sesame", "pork")
    }

    return {
        "tags": _str_list(inner.get("tags")),
        "cuisine": _str(inner.get("cuisine"), "unknown"),
        "complexity": _str(inner.get("complexity"), "medium"),
        "estimated_minutes": _int(inner.get("estimated_minutes")),
        "meal_type": _str(inner.get("meal_type"), "dinner"),
        "primary_protein": _str(inner.get("primary_protein"), "none"),
        "primary_carb": _str(inner.get("primary_carb"), "none"),
        "veg_forward": _str(inner.get("veg_forward"), "mixed"),
        "comfort_tier": _str(inner.get("comfort_tier"), "weeknight-easy"),
        "season_fit": _str_list(inner.get("season_fit")) or ["year-round"],
        "calories": _int_or_none(inner.get("calories")),
        "protein_g": _int_or_none(inner.get("protein_g")),
        "carbs_g": _int_or_none(inner.get("carbs_g")),
        "fat_g": _int_or_none(inner.get("fat_g")),
        "contains": contains,
        "summary": _str_long(inner.get("summary")),
        "best_for": _str_long(inner.get("best_for")),
    }
def _extract_recipe_dedupe_decision(forge_result: dict) -> dict:
    """Normalize the model's recipe-dedupe verdict.

    Accepts either the clawdforge wrapper ({"result": ...}) or the bare
    payload; a string payload is parsed as JSON first.

    Deletion is permanent, so the verdict is only honoured when it is
    unambiguous: "duplicates" must be the JSON boolean true (a string such
    as "false" is truthy in Python and must not count), there must be a
    canonical slug, and at least one *other* slug to delete. Anything less
    is reported as duplicates=False with empty canonical_slug/delete_slugs.

    Returns:
        {"duplicates": bool, "canonical_slug": str,
         "delete_slugs": list[str], "reason": str (max 500 chars)}

    Raises:
        ForgeError: when the wrapper or the payload is not a dict, or the
            payload string is not valid JSON.
    """
    if not isinstance(forge_result, dict):
        raise ForgeError("forge result not a dict")
    inner = forge_result.get("result", forge_result)
    if isinstance(inner, str):
        inner = _parse_json_blob(inner)
    if not isinstance(inner, dict):
        raise ForgeError(f"recipe dedupe decision not a dict: {str(inner)[:200]}")

    duplicates = inner.get("duplicates") is True
    canonical_slug = str(inner.get("canonical_slug") or "")

    # A bare string here would otherwise be iterated character by character.
    delete_raw = inner.get("delete_slugs")
    if not isinstance(delete_raw, list):
        delete_raw = []
    # Never schedule the survivor itself for deletion.
    delete_slugs = [
        x for x in delete_raw
        if isinstance(x, str) and x.strip() and x != canonical_slug
    ]
    reason = str(inner.get("reason") or "")[:500]

    if duplicates and not (canonical_slug.strip() and delete_slugs):
        # "Duplicates" without a survivor or without victims is not actionable.
        duplicates = False
    if not duplicates:
        canonical_slug = ""
        delete_slugs = []

    return {
        "duplicates": duplicates,
        "canonical_slug": canonical_slug,
        "delete_slugs": delete_slugs,
        "reason": reason,
    }
def _extract_cluster_decision(forge_result: dict) -> dict:
    """Normalize the model's food-cluster merge verdict.

    Accepts either the clawdforge wrapper ({"result": ...}) or the bare
    payload; a string payload is parsed as JSON first.

    A merge is only reported when it is unambiguous: "merge" must be the
    JSON boolean true (the string "false" is truthy in Python and must not
    count), a canonical id must be present, and at least one *other* id
    must be listed for discard. Otherwise merge=False with empty
    canonical_id/discard_ids. canonical_name and alias_additions are
    passed through in both cases.

    Returns:
        {"merge": bool, "canonical_id": str, "canonical_name": str,
         "discard_ids": list[str], "alias_additions": list[str],
         "reason": str (max 500 chars)}

    Raises:
        ForgeError: when the wrapper or the payload is not a dict, or the
            payload string is not valid JSON.
    """
    if not isinstance(forge_result, dict):
        raise ForgeError("forge result not a dict")
    inner = forge_result.get("result", forge_result)
    if isinstance(inner, str):
        inner = _parse_json_blob(inner)
    if not isinstance(inner, dict):
        raise ForgeError(f"cluster decision not a dict: {str(inner)[:200]}")

    merge = inner.get("merge") is True
    canonical_id = str(inner.get("canonical_id") or "")
    canonical_name = str(inner.get("canonical_name") or "")

    # A bare string here would otherwise be iterated character by character.
    discard_raw = inner.get("discard_ids")
    if not isinstance(discard_raw, list):
        discard_raw = []
    discard_ids = []
    for x in discard_raw:
        # bool is a subclass of int; True/False are never valid ids.
        if isinstance(x, bool) or not isinstance(x, (str, int)):
            continue
        sid = str(x)
        # Never merge the survivor into itself.
        if sid.strip() and sid != canonical_id:
            discard_ids.append(sid)

    aliases_raw = inner.get("alias_additions")
    if not isinstance(aliases_raw, list):
        aliases_raw = []
    alias_additions = [x for x in aliases_raw if isinstance(x, str) and x.strip()]
    reason = str(inner.get("reason") or "")[:500]

    if merge and not (canonical_id.strip() and discard_ids):
        # A merge without a survivor or without rows to fold in is not actionable.
        merge = False
    if not merge:
        canonical_id = ""
        discard_ids = []

    return {
        "merge": merge,
        "canonical_id": canonical_id,
        "canonical_name": canonical_name,
        "discard_ids": discard_ids,
        "alias_additions": alias_additions,
        "reason": reason,
    }
def _extract_food_info(forge_result: dict) -> dict:
    """Normalize clawdforge wrapper → food info dict. Defensive on shapes.

    Accepts either the wrapper ({"result": ...}) or the bare payload; a
    string payload is parsed as JSON first.

    Returns:
        {"density_g_per_ml": float | None,   # positive finite number only
         "default_unit_class": str,          # "mass"/"volume"/"count"/"mixed";
                                             # anything else becomes "mass"
         "common_size_g": float | None,      # positive finite number only
         "category": str | None}             # truncated to 64 chars

    Raises:
        ForgeError: when the wrapper or the payload is not a dict, or the
            payload string is not valid JSON.
    """
    if not isinstance(forge_result, dict):
        raise ForgeError("forge result not a dict")
    inner = forge_result.get("result", forge_result)
    if isinstance(inner, str):
        inner = _parse_json_blob(inner)
    if not isinstance(inner, dict):
        raise ForgeError(f"forge result not a dict: {str(inner)[:200]}")

    # The model may answer with a non-string here; fall back to the default
    # instead of crashing on .strip().
    cls_raw = inner.get("default_unit_class")
    cls = cls_raw.strip().lower() if isinstance(cls_raw, str) else "mass"
    if cls not in ("mass", "volume", "count", "mixed"):
        cls = "mass"

    def _f(v):
        # bool is an int subclass; True must not become a density of 1.0.
        if v is None or isinstance(v, bool):
            return None
        try:
            x = float(v)
        except (TypeError, ValueError, OverflowError):
            return None
        # Chained comparison rejects zero, negatives, NaN and infinity.
        return x if 0 < x < float("inf") else None

    category_raw = inner.get("category")
    category = str(category_raw)[:64] if category_raw else None

    return {
        "density_g_per_ml": _f(inner.get("density_g_per_ml")),
        "default_unit_class": cls,
        "common_size_g": _f(inner.get("common_size_g")),
        "category": category,
    }
def _extract_plan_slots(forge_result: dict):
    """Pull the slot list out of a clawdforge response.

    The payload is ``forge_result["result"]`` when that key exists,
    otherwise ``forge_result`` itself. A string payload is parsed as JSON
    first. A dict payload yields its "slots" value; a list payload is
    returned as-is.

    Raises:
        ForgeError: when ``forge_result`` is not a dict, or the payload is
            neither a list nor a dict carrying "slots".
    """
    if not isinstance(forge_result, dict):
        raise ForgeError("forge result not a dict")

    payload = forge_result.get("result", forge_result)
    if isinstance(payload, str):
        # `result` may be a string when claude returned non-JSON — try to scrape
        payload = _parse_json_blob(payload)

    has_slots = isinstance(payload, dict) and "slots" in payload
    if has_slots:
        return payload["slots"]
    if not isinstance(payload, list):
        raise ForgeError(f"forge result missing 'slots' key: {str(payload)[:200]}")
    return payload
def _parse_json_blob(s: str):
    """Decode a JSON document from model text.

    Surrounding whitespace is trimmed, then a leading ``` / ```json fence
    and a trailing ``` fence are removed before decoding.

    Raises:
        ForgeError: when the remaining text cannot be decoded; the message
            carries the decoder error and the first 200 characters.
    """
    cleaned = s.strip()
    # Strip code fences if Sonnet wrapped its output
    for fence in (r"^```(?:json)?\s*", r"\s*```$"):
        cleaned = re.sub(fence, "", cleaned)
    try:
        decoded = json.loads(cleaned)
    except Exception as e:
        raise ForgeError(f"could not parse model JSON: {e}; head={cleaned[:200]!r}") from e
    return decoded